Created
February 21, 2026 16:22
-
-
Save KaminariOS/369e8ab9ffca9c426513ae78ec43413c to your computer and use it in GitHub Desktop.
# Codex `notify.py` with `ntfy` (topic: codex-example)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # /// script | |
| # requires-python = ">=3.13" | |
| # dependencies = [] | |
| # /// | |
| import argparse | |
| import base64 | |
| import json | |
| import os | |
| import select | |
| import subprocess | |
| import sys | |
| import tempfile | |
| import time | |
| import urllib.parse | |
# Default per-step timeout, in seconds, for the ``--timeout-seconds`` option.
DEFAULT_TIMEOUT_SECONDS = 600
# ntfy topic used when no topic environment variable is set.
DEFAULT_NTFY_TOPIC = "codex-example"
# Title used when no usable title text is available.
DEFAULT_NTFY_TITLE = "Notification"
# Titles longer than this many characters are clipped.
NTFY_TITLE_MAX_LEN = 120
# Client identification sent with the app-server ``initialize`` request.
CLIENT_INFO = dict(
    name="notify_reply",
    title="Codex Notify Reply",
    version="0.1.0",
)
| def _run_command_capture_text(cmd): | |
| try: | |
| result = subprocess.run( | |
| cmd, | |
| capture_output=True, | |
| text=True, | |
| timeout=2, | |
| check=False, | |
| ) | |
| except Exception: | |
| return None | |
| if result.returncode != 0: | |
| return None | |
| output = result.stdout.strip() | |
| if not output: | |
| return None | |
| return output | |
| def _resolve_tmux_session_name(): | |
| if os.getenv("CODEX_NOTIFY_DISABLE_TMUX_SESSION") == "1": | |
| return None | |
| override = (os.getenv("CODEX_NOTIFY_TMUX_SESSION") or "").strip() | |
| if override: | |
| return override | |
| if not os.getenv("TMUX"): | |
| return None | |
| tmux_pane = os.getenv("TMUX_PANE") | |
| if tmux_pane: | |
| pane_session = _run_command_capture_text( | |
| ["tmux", "display-message", "-p", "-t", tmux_pane, "#{session_name}"] | |
| ) | |
| if pane_session: | |
| return pane_session | |
| return _run_command_capture_text(["tmux", "display-message", "-p", "#{session_name}"]) | |
| def _tmux_open_command_lines(session_name): | |
| import shlex | |
| quoted = shlex.quote(session_name) | |
| return [ | |
| f"tmux attach-session -t {quoted}", | |
| f"tmux switch-client -t {quoted}", | |
| ] | |
| def _resolve_ntfy_config(): | |
| server = ( | |
| os.getenv("CODEX_NOTIFY_NTFY_SERVER") | |
| or os.getenv("NTFY_SERVER") | |
| or "https://ntfy.sh" | |
| ).rstrip("/") | |
| topic = ( | |
| os.getenv("CODEX_NOTIFY_NTFY_TOPIC") | |
| or os.getenv("NTFY_TOPIC") | |
| or DEFAULT_NTFY_TOPIC | |
| ) | |
| if not topic: | |
| return None | |
| return { | |
| "server": server, | |
| "topic": topic, | |
| "token": os.getenv("CODEX_NOTIFY_NTFY_TOKEN") or os.getenv("NTFY_TOKEN"), | |
| "username": os.getenv("CODEX_NOTIFY_NTFY_USERNAME") or os.getenv("NTFY_USERNAME"), | |
| "password": os.getenv("CODEX_NOTIFY_NTFY_PASSWORD") or os.getenv("NTFY_PASSWORD"), | |
| "tags": os.getenv("CODEX_NOTIFY_NTFY_TAGS", "robot"), | |
| "priority": os.getenv("CODEX_NOTIFY_NTFY_PRIORITY", "high"), | |
| } | |
def _clip_title(text):
    """Normalize *text* into a single-line title of bounded length.

    All whitespace runs (including line breaks) collapse to single spaces.
    Empty input yields ``DEFAULT_NTFY_TITLE``. Titles longer than
    ``NTFY_TITLE_MAX_LEN`` are cut so that, with the trailing ellipsis, they
    do not exceed that length.
    """
    words = (text or "").split()
    if not words:
        return DEFAULT_NTFY_TITLE
    title = " ".join(words)
    if len(title) <= NTFY_TITLE_MAX_LEN:
        return title
    # Reserve one character for the ellipsis appended below.
    head = title[: NTFY_TITLE_MAX_LEN - 1].rstrip()
    return f"{head}…"
def _fallback_ntfy_title(notification):
    """Derive a title from *notification* without calling any external tool.

    The first non-blank line of ``last-assistant-message`` is used when
    present; otherwise the title is ``"<event>: update"``. A value that is
    missing or JSON ``null`` is treated as absent rather than being
    stringified to the literal text ``"None"``.

    Args:
        notification: Decoded notification payload (a dict).

    Returns:
        The title, normalized and length-limited by ``_clip_title``.
    """
    raw_event = notification.get("event")
    event = "unknown" if raw_event is None else str(raw_event).strip()

    raw_message = notification.get("last-assistant-message")
    last_message = "" if raw_message is None else str(raw_message).strip()

    first_line = next(
        (ln.strip() for ln in last_message.splitlines() if ln.strip()), ""
    )
    if first_line:
        return _clip_title(first_line)
    return _clip_title(f"{event}: update")
def _summarize_ntfy_title(notification):
    """Produce a short notification title, preferring a ``codex exec`` summary.

    Setting ``CODEX_NOTIFY_DISABLE_TITLE_SUMMARY=1`` skips the summary. When
    the summary is skipped, fails, times out (20 seconds) or comes back
    empty, the title from ``_fallback_ntfy_title`` is used instead.

    Payload values that are missing or JSON ``null`` are treated as absent
    rather than being stringified to ``"None"`` inside the prompt.

    Args:
        notification: Decoded notification payload (a dict).

    Returns:
        A title normalized by ``_clip_title``.
    """
    # Allow opting out if title generation is too slow or unavailable.
    if os.getenv("CODEX_NOTIFY_DISABLE_TITLE_SUMMARY") == "1":
        return _fallback_ntfy_title(notification)

    def field(key, default):
        value = notification.get(key)
        return default if value is None else str(value).strip()

    last_message = field("last-assistant-message", "")
    event = field("event", "unknown")
    turn_id = field("turn-id", "unknown")
    cwd = field("cwd", "unknown")

    # Keep the prompt small; the title only needs the start of the message.
    if len(last_message) > 1200:
        last_message = f"{last_message[:1200]}..."

    prompt = (
        "Write one short notification title (max 80 chars) for this update.\n"
        "Return title text only. No markdown, no quotes, no punctuation at the end.\n\n"
        f"event: {event}\n"
        f"turn: {turn_id}\n"
        f"cwd: {cwd}\n"
        "assistant_message:\n"
        f"{last_message}\n"
    )

    try:
        with tempfile.NamedTemporaryFile(prefix="codex-title-", suffix=".txt") as output_file:
            cmd = [
                "codex",
                "exec",
                "--skip-git-repo-check",
                "--color",
                "never",
                "--output-last-message",
                output_file.name,
                "-c",
                # Empty notify list for the nested run, presumably so it does
                # not invoke this hook again.
                "notify=[]",
                "-c",
                'model_reasoning_effort="low"',
                "-",
            ]
            result = subprocess.run(
                cmd,
                input=prompt,
                text=True,
                capture_output=True,
                timeout=20,
            )
            if result.returncode == 0:
                with open(output_file.name, "r", encoding="utf-8") as f:
                    title = f.read().strip()
                if title:
                    return _clip_title(title)
    except Exception:
        # Deliberately best-effort: a missing binary, timeout or I/O error
        # must never block the notification, so fall through to the fallback.
        pass

    return _fallback_ntfy_title(notification)
| def _build_ntfy_body(notification): | |
| thread_id = str(notification.get("thread-id", "unknown")) | |
| event = str(notification.get("event", "unknown")) | |
| cwd = str(notification.get("cwd", "unknown")) | |
| turn_id = str(notification.get("turn-id", "unknown")) | |
| last_message = str(notification.get("last-assistant-message", "")).strip() | |
| input_messages = notification.get("input-messages", []) | |
| table_fields = { | |
| k: v | |
| for k, v in notification.items() | |
| if k | |
| not in ( | |
| "thread-id", | |
| "event", | |
| "turn-id", | |
| "cwd", | |
| "last-assistant-message", | |
| "input-messages", | |
| ) | |
| } | |
| if len(last_message) > 3000: | |
| last_message = f"{last_message[:3000]}..." | |
| lines = [] | |
| if last_message: | |
| lines.extend(["", "## Assistant Message", "", last_message]) | |
| if input_messages: | |
| lines.extend(["", "## User Prompt", ""]) | |
| lines.append(f"- {str(input_messages[-1])}") | |
| lines.extend( | |
| [ | |
| "", | |
| "## Metadata", | |
| f"- Thread: `{thread_id}`", | |
| f"- Event: `{event}`", | |
| f"- Turn: `{turn_id}`", | |
| f"- CWD: `{cwd}`", | |
| ] | |
| ) | |
| for key, value in table_fields.items(): | |
| if isinstance(value, (dict, list)): | |
| pretty = json.dumps(value, ensure_ascii=False, indent=2) | |
| lines.extend(["", f"### {key}", "```json", pretty, "```"]) | |
| else: | |
| lines.append(f"- {key}: `{value}`") | |
| lines.extend( | |
| [ | |
| "", | |
| "## Resume in Codex", | |
| "", | |
| "```bash", | |
| f"codex resume {thread_id}", | |
| "```", | |
| ] | |
| ) | |
| return "\n".join(lines).strip() + "\n" | |
def _post_to_ntfy(cmd, body, label):
    """Run the curl command *cmd* with *body* on stdin; raise on failure.

    *label* prefixes the error message so callers can tell requests apart.
    """
    result = subprocess.run(cmd, input=body, capture_output=True)
    if result.returncode == 0:
        return
    stderr = result.stderr.decode("utf-8", errors="replace").strip()
    if stderr:
        raise RuntimeError(f"{label} failed: {stderr}")
    raise RuntimeError(f"{label} failed with exit code {result.returncode}")


def send_notification_ntfy(notification):
    """Publish *notification* to ntfy using ``curl``.

    Two messages are posted to the configured topic: the full Markdown
    notification, and, when the payload carries a thread id, a second
    message containing only the ``codex resume <thread-id>`` command.

    Args:
        notification: Decoded notification payload (a dict).

    Raises:
        RuntimeError: If either curl invocation exits with a non-zero status.
        OSError: If ``curl`` cannot be started.
    """
    cfg = _resolve_ntfy_config()
    if cfg is None:
        return

    title = _summarize_ntfy_title(notification)
    body = _build_ntfy_body(notification).encode("utf-8")
    topic = urllib.parse.quote(cfg["topic"], safe="")
    url = f"{cfg['server']}/{topic}"

    cmd = [
        "curl",
        "-fsS",
        "--max-time",
        "10",
        "-X",
        "POST",
        "-H",
        "Content-Type: text/markdown; charset=utf-8",
        "-H",
        "X-Markdown: yes",
        "-H",
        "Markdown: yes",
        "-H",
        f"Title: {title}",
        "-H",
        f"Tags: {cfg['tags']}",
        "-H",
        f"Priority: {cfg['priority']}",
        "--data-binary",
        "@-",
        url,
    ]

    # NOTE(review): credentials passed via -H appear in the process argument
    # list while curl runs; consider a curl config file if that is a concern.
    if cfg["token"]:
        cmd.extend(["-H", f"Authorization: Bearer {cfg['token']}"])
    elif cfg["username"] and cfg["password"]:
        raw = f"{cfg['username']}:{cfg['password']}".encode("utf-8")
        cmd.extend(["-H", f"Authorization: Basic {base64.b64encode(raw).decode('ascii')}"])

    _post_to_ntfy(cmd, body, "ntfy curl")

    # A JSON null thread id must not turn into "codex resume None".
    raw_thread_id = notification.get("thread-id")
    thread_id = "" if raw_thread_id is None else str(raw_thread_id).strip()
    if thread_id:
        resume_body = f"codex resume {thread_id}".encode("utf-8")
        _post_to_ntfy(cmd.copy(), resume_body, "ntfy curl (resume command)")
class AppServerClient:
    """Drive a ``codex app-server`` child process over newline-delimited JSON.

    Each outgoing message is written to the child's stdin as one JSON object
    per line; incoming messages are read from its stdout the same way.
    Requests initiated by the server (approvals, tool calls, user-input
    prompts) are answered automatically with a refusal or an empty result.
    """

    # Bytes read from the child's stdout that have not been split into lines
    # yet. Immutable ``bytes`` so this class-level default is never shared
    # mutable state; instances rebind the attribute instead of mutating it.
    _rx_buffer = b""

    # Canned results for server-initiated requests, keyed by method name.
    _AUTO_RESULTS = {
        "item/commandExecution/requestApproval": {"decision": "decline"},
        "item/fileChange/requestApproval": {"decision": "decline"},
        "execCommandApproval": {"decision": "denied"},
        "applyPatchApproval": {"decision": "denied"},
        "item/tool/requestUserInput": {"answers": {}},
        "item/tool/call": {"success": False, "contentItems": []},
    }

    def __init__(self, codex_bin, timeout_seconds):
        """Start ``<codex_bin> app-server`` with piped stdin/stdout.

        Args:
            codex_bin: Path or name of the codex executable.
            timeout_seconds: Per-step timeout used for every request and for
                waiting on turn completion.

        Raises:
            RuntimeError: If the child's stdin or stdout pipe is unavailable.
            OSError: If the executable cannot be started.
        """
        self.timeout_seconds = timeout_seconds
        self._rx_buffer = b""
        self.proc = subprocess.Popen(
            [codex_bin, "app-server"],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            text=True,
            bufsize=1,
        )
        if self.proc.stdin is None or self.proc.stdout is None:
            raise RuntimeError("failed to acquire codex app-server stdio")

    def close(self):
        """Stop the child process (terminate, then kill) and close its pipes."""
        try:
            if self.proc.poll() is None:
                self.proc.terminate()
                try:
                    self.proc.wait(timeout=1.5)
                except subprocess.TimeoutExpired:
                    self.proc.kill()
                    self.proc.wait(timeout=1.5)
        finally:
            for stream in (self.proc.stdin, self.proc.stdout):
                if stream is None:
                    continue
                try:
                    stream.close()
                except OSError:
                    # Closing flushes; a dead child can make that fail with
                    # a broken pipe, which is irrelevant during shutdown.
                    pass

    def _write(self, payload):
        """Send *payload* to the child as one compact JSON line."""
        raw = json.dumps(payload, separators=(",", ":"))
        self.proc.stdin.write(raw + "\n")
        self.proc.stdin.flush()

    def _read_message(self, deadline):
        """Return the next JSON message from the child, waiting until *deadline*.

        Bytes are read straight from the stdout file descriptor into
        ``_rx_buffer`` and split into lines here. Using ``select()`` with a
        buffered ``readline()`` is unreliable: when several lines arrive in
        one chunk the extra lines sit in Python's buffer where ``select()``
        cannot see them, so the reader would stall until more output (or the
        timeout) arrived.

        Args:
            deadline: ``time.monotonic()`` value after which to give up.

        Raises:
            TimeoutError: If no complete message is available by *deadline*.
            RuntimeError: If the child closes stdout/exits, or sends a line
                that is not valid JSON.
        """
        fd = self.proc.stdout.fileno()
        while True:
            # Serve lines already buffered before touching the pipe again.
            line, newline, rest = self._rx_buffer.partition(b"\n")
            if newline:
                self._rx_buffer = rest
                text = line.decode("utf-8", errors="replace").strip()
                if not text:
                    continue
                try:
                    return json.loads(text)
                except json.JSONDecodeError as exc:
                    raise RuntimeError(f"invalid JSON from app-server: {text}") from exc

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError("timed out waiting for app-server response")

            ready, _, _ = select.select([fd], [], [], remaining)
            if not ready:
                continue  # next iteration reports the timeout

            chunk = os.read(fd, 65536)
            if not chunk:
                # End of stream: pending output has been drained, so report
                # why the stream ended.
                if self.proc.poll() is not None:
                    raise RuntimeError(
                        f"codex app-server exited unexpectedly with code {self.proc.returncode}"
                    )
                raise RuntimeError("codex app-server closed stdout unexpectedly")
            self._rx_buffer += chunk

    def _send_error_response(self, request_id, message):
        """Reply to request *request_id* with error code -32601 and *message*."""
        self._write({"id": request_id, "error": {"code": -32601, "message": message}})

    def _handle_server_request(self, msg):
        """Answer a server-initiated request with a canned result or an error.

        Messages without an ``id`` or a ``method`` are ignored. Methods not
        listed in ``_AUTO_RESULTS`` receive an error response.
        """
        request_id = msg.get("id")
        method = msg.get("method")
        if request_id is None or not method:
            return

        canned = self._AUTO_RESULTS.get(method)
        if canned is not None:
            self._write({"id": request_id, "result": canned})
            return

        self._send_error_response(
            request_id,
            f"notify.py does not support server request method `{method}`",
        )

    def _wait_for_response(self, request_id, deadline):
        """Block until the response to *request_id* arrives and return its result.

        Server-initiated requests seen in the meantime are answered;
        notifications and unrelated responses are skipped.

        Raises:
            RuntimeError: If the response carries an error or has neither
                ``result`` nor ``error``.
            TimeoutError: If *deadline* passes first.
        """
        while True:
            msg = self._read_message(deadline)
            if not isinstance(msg, dict):
                continue
            # Anything with a "method" is a server request or notification,
            # never a response. Check this first: a server request may carry
            # an id equal to ours and must not be mistaken for our reply.
            if "method" in msg:
                if "id" in msg:
                    self._handle_server_request(msg)
                continue
            if msg.get("id") != request_id:
                continue
            if "result" in msg:
                return msg["result"]
            if "error" in msg:
                raise RuntimeError(f"request failed: {msg['error']}")
            raise RuntimeError(f"malformed response: {msg}")

    def _default_deadline(self):
        """Return the monotonic deadline ``timeout_seconds`` from now."""
        return time.monotonic() + self.timeout_seconds

    def initialize(self):
        """Send ``initialize``, await its response, then send ``initialized``."""
        req_id = 1
        self._write(
            {
                "id": req_id,
                "method": "initialize",
                "params": {"clientInfo": CLIENT_INFO},
            }
        )
        self._wait_for_response(req_id, self._default_deadline())
        self._write({"method": "initialized", "params": {}})

    def resume_thread(self, thread_id):
        """Send ``thread/resume`` for *thread_id* and wait for the response."""
        req_id = 2
        self._write(
            {
                "id": req_id,
                "method": "thread/resume",
                "params": {"threadId": thread_id},
            }
        )
        self._wait_for_response(req_id, self._default_deadline())

    def start_turn(self, thread_id, message):
        """Start a turn on *thread_id* with the text *message*.

        Returns:
            The id of the started turn.

        Raises:
            RuntimeError: If the response does not contain a turn id.
        """
        req_id = 3
        self._write(
            {
                "id": req_id,
                "method": "turn/start",
                "params": {
                    "threadId": thread_id,
                    "input": [{"type": "text", "text": message}],
                },
            }
        )
        result = self._wait_for_response(req_id, self._default_deadline())
        # Guard the shape so an unexpected result gives the error below
        # instead of an AttributeError.
        turn = result.get("turn") if isinstance(result, dict) else None
        turn_id = turn.get("id") if isinstance(turn, dict) else None
        if not turn_id:
            raise RuntimeError(f"turn/start response missing turn id: {result}")
        return turn_id

    def wait_for_turn_completion(self, turn_id):
        """Consume messages until ``turn/completed`` arrives for *turn_id*.

        Returns:
            A dict with ``status``, ``assistant_text`` and ``error``. The
            text is the first completed agent message if any completed,
            otherwise the concatenation of all streamed deltas.

        Raises:
            TimeoutError: If the turn does not complete within the timeout.
            RuntimeError: If the child exits or sends invalid JSON.
        """
        deadline = self._default_deadline()
        deltas_by_item = {}
        completed_messages = {}

        while True:
            msg = self._read_message(deadline)
            if not isinstance(msg, dict):
                continue

            if "method" in msg and "id" in msg:
                self._handle_server_request(msg)
                continue

            method = msg.get("method")
            params = msg.get("params", {})
            if not isinstance(params, dict):
                continue

            if method == "item/agentMessage/delta":
                item_id = params.get("itemId")
                delta = params.get("delta")
                if isinstance(item_id, str) and isinstance(delta, str):
                    deltas_by_item[item_id] = deltas_by_item.get(item_id, "") + delta
                continue

            if method == "item/completed":
                item = params.get("item", {})
                if (
                    isinstance(item, dict)
                    and item.get("type") == "agentMessage"
                    and isinstance(item.get("id"), str)
                    and isinstance(item.get("text"), str)
                ):
                    completed_messages[item["id"]] = item["text"]
                continue

            if method == "turn/completed":
                turn = params.get("turn", {})
                if not isinstance(turn, dict) or turn.get("id") != turn_id:
                    continue
                status = turn.get("status", "unknown")
                error = turn.get("error")
                assistant_text = ""
                if completed_messages:
                    # NOTE(review): this picks the FIRST completed agent
                    # message; confirm whether the last one is intended when
                    # a turn produces several.
                    assistant_text = next(iter(completed_messages.values()))
                elif deltas_by_item:
                    assistant_text = "".join(deltas_by_item.values())
                return {
                    "status": status,
                    "assistant_text": assistant_text,
                    "error": error,
                }
def _positive_int(value):
    """argparse ``type=`` converter that accepts only integers greater than zero."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
    if number <= 0:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {number}")
    return number


def parse_args(argv):
    """Parse the command line for the ``reply`` subcommand.

    Args:
        argv: Argument list without the program name.

    Returns:
        The parsed ``argparse.Namespace``; ``command`` names the subcommand.
        For ``reply`` it also holds ``thread_id``, exactly one of
        ``message`` / ``message_file`` / ``message_stdin``, ``codex_bin`` and
        ``timeout_seconds``.

    Raises:
        SystemExit: Raised by argparse on invalid arguments or ``--help``.
    """
    parser = argparse.ArgumentParser(
        # The hook delivers through ntfy, not email.
        description="Codex notify hook: send ntfy notifications and reply via app-server."
    )
    subparsers = parser.add_subparsers(dest="command", required=True)

    reply = subparsers.add_parser(
        "reply", help="Send a reply to an existing Codex thread via app-server."
    )
    reply.add_argument(
        "--thread-id",
        required=True,
        help="Codex thread id from notification payload (UUID).",
    )

    message_group = reply.add_mutually_exclusive_group(required=True)
    message_group.add_argument("--message", help="Reply text to send.")
    message_group.add_argument(
        "--message-file", help="Path to a UTF-8 text file containing the reply."
    )
    message_group.add_argument(
        "--message-stdin", action="store_true", help="Read reply text from stdin."
    )

    reply.add_argument(
        "--codex-bin",
        default="codex",
        help="Path to codex binary (default: codex in PATH).",
    )
    reply.add_argument(
        "--timeout-seconds",
        # A zero or negative timeout would make every step time out at once.
        type=_positive_int,
        default=DEFAULT_TIMEOUT_SECONDS,
        help=f"Per-step timeout in seconds (default: {DEFAULT_TIMEOUT_SECONDS}).",
    )
    return parser.parse_args(argv)
def read_reply_text(args):
    """Return the stripped reply text selected by the parsed arguments.

    The text comes from ``args.message``, the UTF-8 file named by
    ``args.message_file``, or stdin when ``args.message_stdin`` is set,
    checked in that order.

    Raises:
        RuntimeError: If no source is given or the text is blank.
    """

    def load_raw():
        if args.message is not None:
            return args.message
        if args.message_file is not None:
            with open(args.message_file, "r", encoding="utf-8") as handle:
                return handle.read()
        if args.message_stdin:
            return sys.stdin.read()
        raise RuntimeError("reply text source is missing")

    reply = load_raw().strip()
    if reply:
        return reply
    raise RuntimeError("reply text is empty")
def handle_reply_mode(args):
    """Send the reply described by *args* to a Codex thread and report the result.

    An app-server client is started, the thread is resumed, a turn is started
    with the reply text and its completion is awaited. The client is always
    closed. The thread id, turn id, status and any assistant message are
    printed to stdout.

    Returns:
        ``1`` when the turn status is ``"failed"``, otherwise ``0``.
    """
    message = read_reply_text(args)
    client = AppServerClient(args.codex_bin, args.timeout_seconds)
    try:
        client.initialize()
        client.resume_thread(args.thread_id)
        turn_id = client.start_turn(args.thread_id, message)
        outcome = client.wait_for_turn_completion(turn_id)
    finally:
        client.close()

    status = outcome.get("status", "unknown")
    for summary_line in (
        f"thread={args.thread_id}",
        f"turn={turn_id}",
        f"status={status}",
    ):
        print(summary_line)

    reply = outcome.get("assistant_text") or ""
    if reply:
        print("\nassistant_message:")
        print(reply)

    if status != "failed":
        return 0

    failure = outcome.get("error")
    if failure:
        print(f"\nerror={failure}")
    return 1
def handle_legacy_notify_mode(raw_notification):
    """Handle ``notify.py '<NOTIFICATION_JSON>'``: decode and deliver via ntfy.

    Args:
        raw_notification: The notification payload as a JSON string.

    Returns:
        ``0`` on successful delivery; ``1`` if the input is not valid JSON,
        is not a JSON object, or delivery fails. Diagnostics go to stderr.
    """
    try:
        notification = json.loads(raw_notification)
    except json.JSONDecodeError:
        print("Invalid JSON input", file=sys.stderr)
        return 1

    # The delivery code calls .get() on the payload; reject other JSON types
    # here with a clear message instead of an AttributeError later on.
    if not isinstance(notification, dict):
        print("Invalid notification payload: expected a JSON object", file=sys.stderr)
        return 1

    try:
        send_notification_ntfy(notification)
    except Exception as exc:
        # Top-level boundary: report any delivery problem and signal failure.
        print(f"Warning: notification delivery failed for ntfy: {exc}", file=sys.stderr)
        return 1
    return 0
def main():
    """Entry point: dispatch to legacy notify mode or the ``reply`` subcommand.

    Returns:
        The process exit status (``0`` on success, ``1`` on failure).
    """
    argv = sys.argv[1:]

    # Backward-compatible path used by Codex notify hook:
    #   notify.py '<NOTIFICATION_JSON>'
    # A lone subcommand name (e.g. ``notify.py reply``) is not a payload; let
    # argparse handle it so the user gets a usage error instead of
    # "Invalid JSON input".
    subcommands = ("reply",)
    if len(argv) == 1 and not argv[0].startswith("-") and argv[0] not in subcommands:
        return handle_legacy_notify_mode(argv[0])

    try:
        args = parse_args(argv)
        if args.command == "reply":
            return handle_reply_mode(args)
        print("Unknown command")
        return 1
    except Exception as exc:
        # Top-level boundary: turn any failure into an error message + status.
        print(f"Error: {exc}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment