Last active
December 28, 2023 11:37
-
-
Save jbouwh/8d9551a6d66887603b3534b5813c8a8d to your computer and use it in GitHub Desktop.
Script to remove emails from the Postfix hold queue after a fixed time and send an email notification. This script can be used with the `HoldQuarantinedMessages` setting set to `True` in `opendmarc`. The script can then be executed hourly from a cron job on your mail system.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #! /usr/bin/python3 | |
| """Postfix hold queue clean up tool.""" | |
| from datetime import datetime, timedelta | |
| import json | |
| from pathlib import Path | |
| import subprocess | |
| import logging | |
| import os | |
| import smtplib | |
| from email.message import EmailMessage | |
| from typing import Any | |
# The logger and its log file are both named after this script file.
base_name = os.path.basename(__file__)
_LOGGER = logging.getLogger(base_name)
_LOGGER.setLevel(logging.INFO)

# File handler: timestamped records at INFO and above go to /var/log.
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh = logging.FileHandler(f"/var/log/{base_name}.log")
fh.setLevel(logging.INFO)
fh.setFormatter(formatter)
_LOGGER.addHandler(fh)

# Stream handler: only ERROR and above, using the default record format.
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
_LOGGER.addHandler(ch)

_LOGGER.info("Checking mail queue")
# --- External Postfix commands (executable name + the single flag passed) ---
# Queue listing; its output is parsed as one JSON document per line.
CONF_POST_QUEUE_EXE = "postqueue"
CONF_POST_QUEUE_ARGS = "-j"
# Used to remove a message, addressed by queue id.
CONF_POST_SUPER_EXE = "postsuper"
CONF_POST_SUPER_ARGS = "-d"
# Used to dump a queued message, addressed by queue id.
CONF_POST_CAT_EXE = "postcat"
CONF_POST_CAT_ARGS = "-q"

# --- Notification mail settings ---
CONF_SMTP_SERVER = "beta.jbsoft.nl"
CONF_SENDER_EMAIL = "noreply@jbsoft.nl"
CONF_NOTIFICATION_RECIPIENT = "postmaster@jbsoft.nl"

# JSON file remembering which queue ids were already notified about.
CONF_NOTIFIED_TEMP_FILE = "/tmp/postfix_hold_queue_notified"

# How long a message stays in the hold queue before it is removed.
JAIL_TIME_TO_DESTROY = timedelta(weeks=1)

# Header lines copied from the held message into the notification mail.
MAIL_DETAIL_HEADERS = {"Authentication-Results", "Subject", "Received-SPF"}
| # Create a secure SSL context | |
| # context = ssl.create_default_context() | |
def get_mail_item_details(queue_id: str) -> str:
    """Get mail authentication details from a mail in the hold queue.

    Runs ``CONF_POST_CAT_EXE CONF_POST_CAT_ARGS <queue_id>`` and keeps only
    the output lines that start with one of ``MAIL_DETAIL_HEADERS`` followed
    by ``": "``.

    Args:
        queue_id: Queue id of the message to inspect.

    Returns:
        The selected header lines joined with newlines, or an empty string
        when the command could not be started (the failure is logged). An
        empty string keeps the declared ``str`` return type, so callers that
        interpolate the result never end up with the text ``False``.
    """
    # Match on bytes so the (arbitrary, possibly non-UTF-8) mail content is
    # only decoded for the few lines that are actually kept.
    prefixes = tuple(f"{header}: ".encode() for header in MAIL_DETAIL_HEADERS)
    try:
        # subprocess.run waits for the child and closes its pipe, so no
        # unreaped process or open file descriptor is left behind.
        result = subprocess.run(
            [CONF_POST_CAT_EXE, CONF_POST_CAT_ARGS, queue_id],
            stdout=subprocess.PIPE,
            check=False,
        )
    except OSError:
        _LOGGER.exception("Fetching message details for mail with queue id %s failed", queue_id)
        return ""
    if result.returncode != 0:
        _LOGGER.warning(
            "%s exited with status %d for queue id %s",
            CONF_POST_CAT_EXE, result.returncode, queue_id,
        )
    # splitlines() drops the line terminators, so joining with "\n" does not
    # produce doubled blank lines; undecodable bytes are replaced, not fatal.
    return "\n".join(
        line.decode("utf-8", errors="replace")
        for line in result.stdout.splitlines()
        if line.startswith(prefixes)
    )
def send_notification_mail(message: str, content_sender: str, content_recipients: str) -> None:
    """Send notification on planned removal.

    The mail is sent from ``CONF_SENDER_EMAIL`` to
    ``CONF_NOTIFICATION_RECIPIENT`` through ``CONF_SMTP_SERVER``.

    Args:
        message: Plain-text body of the notification.
        content_sender: Sender of the quarantined mail (used in the subject).
        content_recipients: Recipients of the quarantined mail (used in the
            subject).

    Raises:
        Exception: Any error raised while building or sending the mail is
            logged and then re-raised unchanged.
    """
    try:
        msg = EmailMessage()
        msg.set_content(message)
        msg['Subject'] = f'Mail in quarantine for {content_recipients} from {content_sender}'
        msg['From'] = CONF_SENDER_EMAIL
        msg['To'] = CONF_NOTIFICATION_RECIPIENT
        # The context manager issues QUIT on exit. It is only entered once
        # the connection exists, so a failed connect surfaces as the real
        # error instead of an UnboundLocalError from a cleanup step.
        # NOTE: the session is plain, unauthenticated SMTP (no STARTTLS/login).
        with smtplib.SMTP(CONF_SMTP_SERVER) as server:
            # send_message() performs EHLO/HELO itself when not yet done.
            server.send_message(msg)
    except Exception as exc:
        _LOGGER.exception("Unexpected error sending notification: %s", exc)
        raise
def remove_from_hold_queue(queue_id: str) -> bool:
    """Remove a message from the hold queue.

    Runs ``CONF_POST_SUPER_EXE CONF_POST_SUPER_ARGS <queue_id>`` and logs
    whatever the command wrote to stdout and stderr.

    Args:
        queue_id: Queue id of the message to remove.

    Returns:
        ``True`` when the command ran and exited with status 0, ``False``
        when it could not be started or reported a failure.
    """
    try:
        # subprocess.run drains stdout and stderr concurrently and waits for
        # the child; reading one pipe to EOF before the other can deadlock
        # once the unread pipe's buffer fills up.
        result = subprocess.run(
            [CONF_POST_SUPER_EXE, CONF_POST_SUPER_ARGS, queue_id],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=False,
        )
    except OSError:
        _LOGGER.exception("Removing message with queue id %s failed", queue_id)
        return False

    for stream in (result.stdout, result.stderr):
        text = stream.decode("utf-8", errors="replace").strip()
        if text:
            _LOGGER.info("%s", text)

    if result.returncode != 0:
        # Report failure so the caller keeps the queue id in its bookkeeping.
        _LOGGER.error(
            "%s exited with status %d while removing queue id %s",
            CONF_POST_SUPER_EXE, result.returncode, queue_id,
        )
        return False
    return True
# Collect all messages currently in the hold queue, keyed by queue id.
# The queue listing is consumed as one JSON object per output line.
postqueue_dict: dict[str, Any]
hold_queue: dict[str, dict[str, Any]] = {}
# The context manager closes the pipe and waits for the child on exit, so
# the process is always reaped.
with subprocess.Popen(
    [CONF_POST_QUEUE_EXE, CONF_POST_QUEUE_ARGS],
    stdout=subprocess.PIPE,
) as postqueue_proc:
    # Iterating the pipe ends cleanly at EOF; no decode error is needed to
    # terminate the loop.
    for raw_line in postqueue_proc.stdout:
        if not raw_line.strip():
            continue
        try:
            postqueue_dict = json.loads(raw_line)
        except ValueError:
            # Covers json.JSONDecodeError and UnicodeDecodeError. A single
            # bad line must not silently hide the rest of the queue.
            _LOGGER.warning("Skipping unparsable %s output line: %r", CONF_POST_QUEUE_EXE, raw_line)
            continue
        if postqueue_dict.get("queue_name") != "hold":
            continue
        queue_id = postqueue_dict.pop("queue_id")
        hold_queue[queue_id] = postqueue_dict
if postqueue_proc.returncode != 0:
    _LOGGER.warning("%s exited with status %d", CONF_POST_QUEUE_EXE, postqueue_proc.returncode)
# Process every held message: notify once about the planned removal, and
# remove the message once JAIL_TIME_TO_DESTROY has passed since its arrival.
#
# datetime.fromtimestamp() yields a naive *local* time, so "now" must be a
# naive local time as well. Comparing against naive UTC would shift the
# retention period by the host's UTC offset.
current_time = datetime.now()

# Queue ids that were already notified about in an earlier run.
notified: set[str] = set()
notified_file = Path(CONF_NOTIFIED_TEMP_FILE)
if notified_file.is_file():
    try:
        with open(notified_file, "r") as json_file:
            notified.update(json.load(json_file)["queue_ids"])
    except Exception as exc:
        _LOGGER.warning("Could not read file %s, overwriting: %s", notified_file, exc)

try:
    for queue_id, values in hold_queue.items():
        arrived_time = datetime.fromtimestamp(values["arrival_time"])
        sender = values["sender"]
        message_size: int = values["message_size"]
        recipients = ", ".join(recipient["address"] for recipient in values["recipients"])
        planned_removal = arrived_time + JAIL_TIME_TO_DESTROY
        destroy = current_time > planned_removal
        message = (
            f"queue_id: {queue_id}, planned removal: {'now' if destroy else planned_removal}, "
            f"arrived: {arrived_time}, size: {message_size}, sender: {sender}, recipients: {recipients}"
        )
        if destroy:
            # Only forget the id when the removal actually succeeded.
            if remove_from_hold_queue(queue_id):
                notified.discard(queue_id)
            continue
        if queue_id in notified:
            continue
        _LOGGER.info(message)
        message_details = get_mail_item_details(queue_id)
        send_notification_mail(f"{message}\n\n{message_details}", sender, recipients)
        _LOGGER.info("Notification sent to %s", sender)
        notified.add(queue_id)
finally:
    # Save the notified queue id's for next run. This runs even when a
    # notification failed, so mails already notified in this run are not
    # announced a second time.
    try:
        with open(notified_file, "w") as json_file:
            json.dump({"queue_ids": list(notified)}, json_file)
    except Exception as exc:
        _LOGGER.exception("Unexpected error writing notified ids: %s", exc)
        raise
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment