Skip to content

Instantly share code, notes, and snippets.

@jbouwh
Last active December 28, 2023 11:37
Show Gist options
  • Select an option

  • Save jbouwh/8d9551a6d66887603b3534b5813c8a8d to your computer and use it in GitHub Desktop.

Select an option

Save jbouwh/8d9551a6d66887603b3534b5813c8a8d to your computer and use it in GitHub Desktop.
Script to remove emails from the postfix holdqueue after a fixed time and send an email notification. This script can be used with the `HoldQuarantinedMessages` setting set to `True` in `opendmarc`. Then script can be executed hourly from a cronjob on you mail system.
#! /usr/bin/python3
"""Postfix hold queue clean up tool."""
from datetime import datetime, timedelta
import json
from pathlib import Path
import subprocess
import logging
import os
import smtplib
from email.message import EmailMessage
from typing import Any
base_name = os.path.basename(__file__)
fh = logging.FileHandler(f"/var/log/{base_name}.log")
fh.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
_LOGGER = logging.getLogger(base_name)
_LOGGER.addHandler(fh)
_LOGGER.addHandler(ch)
_LOGGER.setLevel(logging.INFO)
_LOGGER.info("Checking mail queue")
CONF_POST_QUEUE_EXE = "postqueue"
CONF_POST_QUEUE_ARGS = "-j"
CONF_POST_SUPER_EXE = "postsuper"
CONF_POST_SUPER_ARGS = "-d"
CONF_POST_CAT_EXE = "postcat"
CONF_POST_CAT_ARGS = "-q"
CONF_SMTP_SERVER = "beta.jbsoft.nl"
CONF_SENDER_EMAIL = "noreply@jbsoft.nl"
CONF_NOTIFICATION_RECIPIENT = "postmaster@jbsoft.nl"
CONF_NOTIFIED_TEMP_FILE = "/tmp/postfix_hold_queue_notified"
JAIL_TIME_TO_DESTROY = timedelta(days=7)
MAIL_DETAIL_HEADERS = {"Authentication-Results", "Subject", "Received-SPF"}
# Create a secure SSL context
# context = ssl.create_default_context()
def get_mail_item_details(queue_id: str) -> str:
"""Get mail authentication details from a mail in the hold queue."""
def _include(line: bytes) -> bool:
return any(line.decode().startswith(f"{header}: ") for header in MAIL_DETAIL_HEADERS)
post_cat_proc = subprocess.Popen(
[CONF_POST_CAT_EXE,CONF_POST_CAT_ARGS, queue_id],
stdout=subprocess.PIPE
)
try:
lines = post_cat_proc.stdout.readlines()
except Exception:
_LOGGER.exception("Fetching message details for mail with queue id %s failed", queue_id)
return False
mail_details = [line.decode() for line in lines if _include(line)]
return "\n".join(mail_details)
def send_notification_mail(message: str, content_sender: str, content_recipients: str):
"""Send notification on planned removal."""
try:
server = smtplib.SMTP(CONF_SMTP_SERVER)
msg = EmailMessage()
msg.set_content(message)
# me == the sender's email address
# you == the recipient's email address
msg['Subject'] = f'Mail in quarantine for {content_recipients} from {content_sender}'
msg['From'] = CONF_SENDER_EMAIL
msg['To'] = CONF_NOTIFICATION_RECIPIENT
# server.starttls(context=context) # Secure the connection
# server.login(CONF_SMTP_USER_LOGIN, CONF_SMTP_USER_PASSWORD)
server.ehlo()
server.send_message(msg)
except Exception as exc:
_LOGGER.exception("Unexpected error sending notification: %s", exc)
raise
finally:
server.quit()
def remove_from_hold_queue(queue_id: str) -> bool:
"""Remove a message from the hold queue."""
post_super_proc = subprocess.Popen(
[CONF_POST_SUPER_EXE,CONF_POST_SUPER_ARGS, queue_id],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
try:
lines = post_super_proc.stdout.readlines()
_LOGGER.info("\n".join(line.decode("utf-8") for line in lines))
lines = post_super_proc.stderr.readlines()
_LOGGER.info("\n".join(line.decode("utf-8") for line in lines))
except Exception:
_LOGGER.exception("Removing message with queue id %s failed", queue_id)
return False
return True
postqueue_dict: dict[str, Any]
postqueue_proc = subprocess.Popen([CONF_POST_QUEUE_EXE,CONF_POST_QUEUE_ARGS],stdout=subprocess.PIPE)
hold_queue = {}
while True:
try:
postqueue_dict = json.loads(postqueue_proc.stdout.readline())
except json.decoder.JSONDecodeError:
postqueue_dict = None
break
else:
if postqueue_dict["queue_name"] != "hold":
continue
queue_id = postqueue_dict.pop("queue_id")
hold_queue[queue_id] = postqueue_dict
if postqueue_dict is None:
break
current_time = datetime.utcnow()
notified = set()
notified_file = Path(CONF_NOTIFIED_TEMP_FILE)
if os.path.isfile(notified_file):
try:
with open(notified_file, "r") as json_file:
json_content = json.load(json_file)
for id in json_content["queue_ids"]:
notified.add(id)
except Exception as exc:
_LOGGER.warning("Could not read file %s, overwriting: %s", notified_file, exc)
for queue_id, values in hold_queue.items():
arrived_epoch = values["arrival_time"]
arrived_time = datetime.fromtimestamp(arrived_epoch)
sender = values["sender"]
message_size: int = values["message_size"]
recipients = ", ".join([recipient["address"] for recipient in values["recipients"]])
planned_removal = arrived_time + JAIL_TIME_TO_DESTROY
if destroy := (current_time > planned_removal):
planned_removal = None
message = (
f"queue_id: {queue_id}, planned removal: {'now' if destroy else planned_removal}, "
f"arrived: {arrived_time}, size: {message_size}, sender: {sender}, recipients: {recipients}"
)
if not destroy:
if queue_id not in notified:
_LOGGER.info(message)
message_details = get_mail_item_details(queue_id)
send_notification_mail(f"{message}\n\n{message_details}", sender, recipients)
_LOGGER.info(f"Notification sent to {sender}")
notified.add(queue_id)
else:
if remove_from_hold_queue(queue_id) and queue_id in notified:
notified.remove(queue_id)
# Save the notified queue id's for next run
try:
with open(notified_file, "w") as json_file:
json.dump({"queue_ids": list(notified)}, json_file)
except Exception as exc:
_LOGGER.exception("Unexpected error writing notified ids: %s", exc)
raise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment