Skip to content

Instantly share code, notes, and snippets.

@KaminariOS
Created February 21, 2026 16:22
Show Gist options
  • Select an option

  • Save KaminariOS/369e8ab9ffca9c426513ae78ec43413c to your computer and use it in GitHub Desktop.

Select an option

Save KaminariOS/369e8ab9ffca9c426513ae78ec43413c to your computer and use it in GitHub Desktop.
# Codex `notify.py` with `ntfy` (topic: codex-example)
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.13"
# dependencies = []
# ///
import argparse
import base64
import json
import os
import select
import subprocess
import sys
import tempfile
import time
import urllib.parse
# Default per-step timeout, in seconds, for the `reply` subcommand
# (the default of its --timeout-seconds option).
DEFAULT_TIMEOUT_SECONDS = 600
# ntfy topic used when neither CODEX_NOTIFY_NTFY_TOPIC nor NTFY_TOPIC is set.
DEFAULT_NTFY_TOPIC = "codex-example"
# Title used when a notification title would otherwise be empty.
DEFAULT_NTFY_TITLE = "Notification"
# Maximum title length in characters; longer titles are clipped.
NTFY_TITLE_MAX_LEN = 120
# Sent as `clientInfo` in the app-server `initialize` request.
CLIENT_INFO = {
    "name": "notify_reply",
    "title": "Codex Notify Reply",
    "version": "0.1.0",
}
def _run_command_capture_text(cmd):
    """Run *cmd* and return its stripped stdout, or ``None`` when unusable.

    ``None`` is returned when the command cannot be started, exceeds the
    two-second timeout, exits with a non-zero status, or prints nothing
    but whitespace.
    """
    try:
        completed = subprocess.run(
            cmd,
            check=False,
            timeout=2,
            text=True,
            capture_output=True,
        )
    except Exception:
        # Best-effort probe: any failure simply means "no answer".
        return None
    if completed.returncode != 0:
        return None
    return completed.stdout.strip() or None
def _resolve_tmux_session_name():
    """Return the tmux session name for this process, or ``None``.

    Resolution order:
      1. ``CODEX_NOTIFY_DISABLE_TMUX_SESSION=1`` disables lookup entirely.
      2. A non-blank ``CODEX_NOTIFY_TMUX_SESSION`` is returned stripped.
      3. Without ``TMUX`` in the environment there is no session.
      4. Otherwise tmux is asked, first for the pane in ``TMUX_PANE`` (if
         set) and then without an explicit target.
    """
    env = os.environ.get
    if env("CODEX_NOTIFY_DISABLE_TMUX_SESSION") == "1":
        return None

    forced = (env("CODEX_NOTIFY_TMUX_SESSION") or "").strip()
    if forced:
        return forced

    if not env("TMUX"):
        return None

    session_format = "#{session_name}"
    pane_id = env("TMUX_PANE")
    if pane_id:
        from_pane = _run_command_capture_text(
            ["tmux", "display-message", "-p", "-t", pane_id, session_format]
        )
        if from_pane:
            return from_pane
    return _run_command_capture_text(["tmux", "display-message", "-p", session_format])
def _tmux_open_command_lines(session_name):
    """Return shell commands that open the tmux session *session_name*.

    The list holds an ``attach-session`` command followed by a
    ``switch-client`` command; the session name is shell-quoted in both.
    """
    from shlex import quote

    target = quote(session_name)
    return [f"tmux {verb} -t {target}" for verb in ("attach-session", "switch-client")]
def _resolve_ntfy_config():
    """Build the ntfy delivery settings from environment variables.

    Server, topic, token, username and password are read from a
    ``CODEX_NOTIFY_NTFY_*`` variable first and the matching ``NTFY_*``
    variable second. The server defaults to ``https://ntfy.sh`` (trailing
    slashes removed) and the topic to ``DEFAULT_NTFY_TOPIC``. Tags and
    priority are read only from their ``CODEX_NOTIFY_NTFY_*`` variables.

    Returns:
        A dict with keys ``server``, ``topic``, ``token``, ``username``,
        ``password``, ``tags`` and ``priority``, or ``None`` when no topic
        can be determined.
    """

    def setting(suffix):
        # Script-specific variable wins over the generic ntfy one.
        return os.getenv(f"CODEX_NOTIFY_NTFY_{suffix}") or os.getenv(f"NTFY_{suffix}")

    server = (setting("SERVER") or "https://ntfy.sh").rstrip("/")
    topic = setting("TOPIC") or DEFAULT_NTFY_TOPIC
    if not topic:
        return None

    config = {"server": server, "topic": topic}
    for name in ("token", "username", "password"):
        config[name] = setting(name.upper())
    config["tags"] = os.getenv("CODEX_NOTIFY_NTFY_TAGS", "robot")
    config["priority"] = os.getenv("CODEX_NOTIFY_NTFY_PRIORITY", "high")
    return config
def _clip_title(text):
    """Normalize *text* into a single-line title of bounded length.

    Every run of whitespace (including line breaks) collapses to one
    space. An empty result yields ``DEFAULT_NTFY_TITLE``. A title longer
    than ``NTFY_TITLE_MAX_LEN`` is cut to one character less than that
    limit, right-stripped, and suffixed with an ellipsis.
    """
    # str.split() with no argument already treats "\r" and "\n" as
    # whitespace and discards leading/trailing blanks.
    collapsed = " ".join((text or "").split())
    if not collapsed:
        return DEFAULT_NTFY_TITLE
    if len(collapsed) <= NTFY_TITLE_MAX_LEN:
        return collapsed
    head = collapsed[: NTFY_TITLE_MAX_LEN - 1].rstrip()
    return f"{head}…"
def _fallback_ntfy_title(notification):
    """Derive a title from the notification without calling any tool.

    The first non-blank line of ``last-assistant-message`` is used when
    there is one; otherwise the title is ``"<event>: update"``. The result
    is always passed through ``_clip_title``.

    Args:
        notification: Parsed notification payload (a dict).

    Returns:
        The title string.
    """

    def text_field(key, default):
        # dict.get(key, default) only applies the default when the key is
        # absent; an explicit JSON null would otherwise become the literal
        # text "None" and end up as the title.
        value = notification.get(key)
        return default if value is None else str(value).strip()

    event = text_field("event", "unknown")
    last_message = text_field("last-assistant-message", "")
    for candidate in last_message.splitlines():
        candidate = candidate.strip()
        if candidate:
            return _clip_title(candidate)
    return _clip_title(f"{event}: update")
def _summarize_ntfy_title(notification):
    """Return a notification title, summarized by ``codex exec`` if possible.

    A prompt built from the event, turn id, working directory and (at most
    1200 characters of) the last assistant message is piped to
    ``codex exec``, which writes its final message to a temporary file.
    The content of that file becomes the title. Whenever summarization is
    disabled, fails, times out (20 seconds) or yields nothing, the title
    from ``_fallback_ntfy_title`` is returned instead.

    Args:
        notification: Parsed notification payload (a dict).

    Returns:
        The title string, already clipped to the allowed length.
    """
    # Allow opting out if title generation is too slow or unavailable.
    if os.getenv("CODEX_NOTIFY_DISABLE_TITLE_SUMMARY") == "1":
        return _fallback_ntfy_title(notification)

    def text_field(key, default="unknown"):
        # An explicit JSON null must not reach the prompt as the text "None".
        value = notification.get(key)
        return default if value is None else str(value).strip()

    last_message = text_field("last-assistant-message", "")
    event = text_field("event")
    turn_id = text_field("turn-id")
    cwd = text_field("cwd")
    if len(last_message) > 1200:
        last_message = f"{last_message[:1200]}..."
    prompt = (
        "Write one short notification title (max 80 chars) for this update.\n"
        "Return title text only. No markdown, no quotes, no punctuation at the end.\n\n"
        f"event: {event}\n"
        f"turn: {turn_id}\n"
        f"cwd: {cwd}\n"
        "assistant_message:\n"
        f"{last_message}\n"
    )
    try:
        with tempfile.NamedTemporaryFile(prefix="codex-title-", suffix=".txt") as output_file:
            cmd = [
                "codex",
                "exec",
                "--skip-git-repo-check",
                "--color",
                "never",
                "--output-last-message",
                output_file.name,
                "-c",
                # Empty notify list for the nested run, so the summarizer
                # does not invoke this hook again.
                "notify=[]",
                "-c",
                'model_reasoning_effort="low"',
                "-",
            ]
            result = subprocess.run(
                cmd,
                input=prompt,
                text=True,
                capture_output=True,
                timeout=20,
                check=False,
            )
            if result.returncode == 0:
                # Re-open by name: the child writes to the path, not to our handle.
                with open(output_file.name, "r", encoding="utf-8") as f:
                    title = f.read().strip()
                if title:
                    return _clip_title(title)
    except Exception:
        # Deliberately best-effort: a missing codex binary, a timeout or an
        # unreadable output file must never block delivery of the
        # notification, so every failure falls through to the fallback.
        pass
    return _fallback_ntfy_title(notification)
def _build_ntfy_body(notification):
    """Render the notification payload as a Markdown message body.

    Sections, in order: the last assistant message (truncated to 3000
    characters), the most recent user prompt, a metadata list (thread,
    event, turn, cwd plus every other payload key), and a ``codex resume``
    snippet. Empty sections are omitted; the resume snippet is omitted
    when the payload has no thread id, because there is nothing to resume.

    Args:
        notification: Parsed notification payload (a dict).

    Returns:
        The Markdown text, ending with exactly one newline.
    """
    # NOTE(review): this reads the event name from the "event" key; confirm
    # against the Codex notify payload documentation that this is the key
    # Codex actually sends.

    def text_field(key, default="unknown"):
        # dict.get(key, default) keeps an explicit JSON null, which str()
        # would render as the literal text "None".
        value = notification.get(key)
        return default if value is None else str(value)

    thread_id = text_field("thread-id")
    event = text_field("event")
    cwd = text_field("cwd")
    turn_id = text_field("turn-id")
    last_message = text_field("last-assistant-message", "").strip()
    if len(last_message) > 3000:
        last_message = f"{last_message[:3000]}..."

    # Only a real sequence is indexed; indexing a plain string with [-1]
    # would yield just its final character.
    input_messages = notification.get("input-messages")
    if isinstance(input_messages, (list, tuple)):
        latest_prompt = input_messages[-1] if input_messages else None
    else:
        latest_prompt = input_messages or None

    known_keys = (
        "thread-id",
        "event",
        "turn-id",
        "cwd",
        "last-assistant-message",
        "input-messages",
    )
    extra_fields = {k: v for k, v in notification.items() if k not in known_keys}

    lines = []
    if last_message:
        lines.extend(["", "## Assistant Message", "", last_message])
    if latest_prompt is not None:
        lines.extend(["", "## User Prompt", "", f"- {latest_prompt}"])
    lines.extend(
        [
            "",
            "## Metadata",
            f"- Thread: `{thread_id}`",
            f"- Event: `{event}`",
            f"- Turn: `{turn_id}`",
            f"- CWD: `{cwd}`",
        ]
    )
    for key, value in extra_fields.items():
        if isinstance(value, (dict, list)):
            # Structured values get their own fenced JSON block.
            pretty = json.dumps(value, ensure_ascii=False, indent=2)
            lines.extend(["", f"### {key}", "```json", pretty, "```"])
        else:
            lines.append(f"- {key}: `{value}`")

    resume_target = str(notification.get("thread-id") or "").strip()
    if resume_target:
        lines.extend(
            [
                "",
                "## Resume in Codex",
                "",
                "```bash",
                f"codex resume {resume_target}",
                "```",
            ]
        )
    return "\n".join(lines).strip() + "\n"
def send_notification_ntfy(notification):
    """Publish *notification* to ntfy by shelling out to ``curl``.

    Two messages are posted to ``<server>/<topic>``: the Markdown body
    and, when the payload carries a thread id, a second message holding
    only the ``codex resume <thread-id>`` command. Both use the same
    headers (title, tags, priority and optional authorization).

    Does nothing when no ntfy configuration can be resolved.

    Raises:
        RuntimeError: If either ``curl`` invocation exits non-zero.
    """
    cfg = _resolve_ntfy_config()
    if cfg is None:
        return

    title = _summarize_ntfy_title(notification)
    payload = _build_ntfy_body(notification).encode("utf-8")
    endpoint = f"{cfg['server']}/{urllib.parse.quote(cfg['topic'], safe='')}"

    headers = [
        "Content-Type: text/markdown; charset=utf-8",
        "X-Markdown: yes",
        "Markdown: yes",
        f"Title: {title}",
        f"Tags: {cfg['tags']}",
        f"Priority: {cfg['priority']}",
    ]
    cmd = ["curl", "-fsS", "--max-time", "10", "-X", "POST"]
    for header in headers:
        cmd.extend(["-H", header])
    cmd.extend(["--data-binary", "@-", endpoint])

    # A bearer token takes precedence over basic auth. Either credential
    # is passed as a curl command-line argument.
    if cfg["token"]:
        cmd.extend(["-H", f"Authorization: Bearer {cfg['token']}"])
    elif cfg["username"] and cfg["password"]:
        credentials = f"{cfg['username']}:{cfg['password']}".encode("utf-8")
        encoded = base64.b64encode(credentials).decode("ascii")
        cmd.extend(["-H", f"Authorization: Basic {encoded}"])

    def post(data, label):
        # Run curl with *data* on stdin; raise with curl's stderr on failure.
        outcome = subprocess.run(cmd.copy(), input=data, capture_output=True)
        if outcome.returncode == 0:
            return
        details = outcome.stderr.decode("utf-8", errors="replace").strip()
        if details:
            raise RuntimeError(f"{label} failed: {details}")
        raise RuntimeError(f"{label} failed with exit code {outcome.returncode}")

    post(payload, "ntfy curl")

    thread_id = str(notification.get("thread-id", "")).strip()
    if thread_id:
        post(f"codex resume {thread_id}".encode("utf-8"), "ntfy curl (resume command)")
class AppServerClient:
    """Line-delimited JSON client for a ``codex app-server`` child process.

    Requests are written to the child's stdin as one JSON object per line
    and messages are read the same way from its stdout. Requests that the
    server sends to the client (approvals, tool calls, user-input prompts)
    are answered automatically with a refusal or an empty result.
    """

    # Canned results for server-initiated requests, keyed by method name.
    _AUTO_RESPONSES = {
        "item/commandExecution/requestApproval": {"decision": "decline"},
        "item/fileChange/requestApproval": {"decision": "decline"},
        "execCommandApproval": {"decision": "denied"},
        "applyPatchApproval": {"decision": "denied"},
        "item/tool/requestUserInput": {"answers": {}},
        "item/tool/call": {"success": False, "contentItems": []},
    }

    def __init__(self, codex_bin, timeout_seconds):
        """Start ``<codex_bin> app-server`` with piped stdin/stdout.

        Args:
            codex_bin: Name or path of the codex executable.
            timeout_seconds: Per-step timeout used for every request.

        Raises:
            RuntimeError: If the child's stdio pipes are unavailable.
        """
        self.timeout_seconds = timeout_seconds
        # Bytes received from the child that are not yet split into lines.
        self._rx_buffer = bytearray()
        self.proc = subprocess.Popen(
            [codex_bin, "app-server"],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            text=True,
            bufsize=1,
        )
        if self.proc.stdin is None or self.proc.stdout is None:
            raise RuntimeError("failed to acquire codex app-server stdio")

    def close(self):
        """Stop the child process (terminate, then kill) and close its pipes."""
        try:
            if self.proc.poll() is None:
                self.proc.terminate()
                try:
                    self.proc.wait(timeout=1.5)
                except subprocess.TimeoutExpired:
                    self.proc.kill()
                    self.proc.wait(timeout=1.5)
        finally:
            # Release the pipe file descriptors even if the child had
            # already exited or refused to die.
            for stream in (self.proc.stdin, self.proc.stdout):
                if stream is None:
                    continue
                try:
                    stream.close()
                except OSError:
                    # Flushing into a dead child can raise BrokenPipeError;
                    # nothing useful can be done during cleanup.
                    pass

    def _write(self, payload):
        """Send *payload* to the server as one compact JSON line."""
        raw = json.dumps(payload, separators=(",", ":"))
        self.proc.stdin.write(raw + "\n")
        self.proc.stdin.flush()

    def _take_buffered_line(self):
        """Pop one complete line from the receive buffer, or return ``None``."""
        newline_at = self._rx_buffer.find(b"\n")
        if newline_at < 0:
            return None
        line = bytes(self._rx_buffer[:newline_at])
        del self._rx_buffer[: newline_at + 1]
        return line

    def _read_message(self, deadline):
        """Return the next JSON message from the server.

        Args:
            deadline: ``time.monotonic()`` value after which to give up.

        Raises:
            TimeoutError: If the deadline passes before a message arrives.
            RuntimeError: If the server exits, closes stdout, or sends a
                line that is not valid JSON.
        """
        # Read straight from the file descriptor into our own buffer.
        # select() only sees the descriptor, so mixing it with the text
        # wrapper's readline() can stall: lines already pulled into
        # Python's buffer never make the descriptor readable again.
        fd = self.proc.stdout.fileno()
        while True:
            if time.monotonic() >= deadline:
                raise TimeoutError("timed out waiting for app-server response")
            raw_line = self._take_buffered_line()
            if raw_line is None:
                timeout = max(0.0, deadline - time.monotonic())
                ready, _, _ = select.select([fd], [], [], timeout)
                if not ready:
                    continue
                chunk = os.read(fd, 65536)
                if chunk:
                    self._rx_buffer += chunk
                    continue
                # End of stream. Pending output is drained before an exit
                # is reported, so a final message is not lost.
                if not self._rx_buffer:
                    if self.proc.poll() is not None:
                        raise RuntimeError(
                            f"codex app-server exited unexpectedly with code {self.proc.returncode}"
                        )
                    raise RuntimeError("codex app-server closed stdout unexpectedly")
                # Trailing data without a final newline is the last line.
                raw_line = bytes(self._rx_buffer)
                self._rx_buffer.clear()
            line = raw_line.decode("utf-8", errors="replace").strip()
            if not line:
                continue
            try:
                return json.loads(line)
            except json.JSONDecodeError as exc:
                raise RuntimeError(f"invalid JSON from app-server: {line}") from exc

    def _send_error_response(self, request_id, message):
        """Reply to request *request_id* with error code -32601 and *message*."""
        self._write({"id": request_id, "error": {"code": -32601, "message": message}})

    def _handle_server_request(self, msg):
        """Answer a request initiated by the server.

        Known methods get their canned result; any other method gets an
        error response. Messages lacking an id or a method are ignored.
        """
        request_id = msg.get("id")
        method = msg.get("method")
        if request_id is None or not method:
            return
        # Non-string methods cannot be dictionary keys; treat as unknown.
        result = self._AUTO_RESPONSES.get(method) if isinstance(method, str) else None
        if result is not None:
            self._write({"id": request_id, "result": result})
            return
        self._send_error_response(
            request_id,
            f"notify.py does not support server request method `{method}`",
        )

    def _wait_for_response(self, request_id, deadline):
        """Read messages until the response to *request_id* arrives.

        Server-initiated requests seen meanwhile are answered; every other
        message is ignored.

        Returns:
            The ``result`` member of the matching response.

        Raises:
            RuntimeError: If the response carries an error or has neither
                ``result`` nor ``error``.
        """
        while True:
            msg = self._read_message(deadline)
            if isinstance(msg, dict) and msg.get("id") == request_id:
                if "result" in msg:
                    return msg["result"]
                if "error" in msg:
                    raise RuntimeError(f"request failed: {msg['error']}")
                raise RuntimeError(f"malformed response: {msg}")
            if isinstance(msg, dict) and "method" in msg and "id" in msg:
                self._handle_server_request(msg)

    def _default_deadline(self):
        """Return the monotonic deadline for a step that starts now."""
        return time.monotonic() + self.timeout_seconds

    def initialize(self):
        """Send ``initialize`` followed by the ``initialized`` notification."""
        req_id = 1
        self._write(
            {
                "id": req_id,
                "method": "initialize",
                "params": {"clientInfo": CLIENT_INFO},
            }
        )
        self._wait_for_response(req_id, self._default_deadline())
        self._write({"method": "initialized", "params": {}})

    def resume_thread(self, thread_id):
        """Send ``thread/resume`` for *thread_id* and wait for its response."""
        req_id = 2
        self._write(
            {
                "id": req_id,
                "method": "thread/resume",
                "params": {"threadId": thread_id},
            }
        )
        self._wait_for_response(req_id, self._default_deadline())

    def start_turn(self, thread_id, message):
        """Start a turn with *message* as text input and return the turn id.

        Raises:
            RuntimeError: If the response does not contain a turn id.
        """
        req_id = 3
        self._write(
            {
                "id": req_id,
                "method": "turn/start",
                "params": {
                    "threadId": thread_id,
                    "input": [{"type": "text", "text": message}],
                },
            }
        )
        result = self._wait_for_response(req_id, self._default_deadline())
        turn = result.get("turn", {})
        turn_id = turn.get("id")
        if not turn_id:
            raise RuntimeError(f"turn/start response missing turn id: {result}")
        return turn_id

    def wait_for_turn_completion(self, turn_id):
        """Collect assistant output until turn *turn_id* completes.

        Returns:
            A dict with ``status``, ``assistant_text`` and ``error``. The
            text is that of a completed agent message when one was seen,
            otherwise the concatenation of all streamed deltas.
        """
        deadline = self._default_deadline()
        deltas_by_item = {}
        completed_messages = {}
        while True:
            msg = self._read_message(deadline)
            if not isinstance(msg, dict):
                continue
            if "method" in msg and "id" in msg:
                self._handle_server_request(msg)
                continue
            method = msg.get("method")
            params = msg.get("params", {})
            if not isinstance(params, dict):
                continue
            if method == "item/agentMessage/delta":
                item_id = params.get("itemId")
                delta = params.get("delta")
                if isinstance(item_id, str) and isinstance(delta, str):
                    deltas_by_item[item_id] = deltas_by_item.get(item_id, "") + delta
                continue
            if method == "item/completed":
                item = params.get("item", {})
                if (
                    isinstance(item, dict)
                    and item.get("type") == "agentMessage"
                    and isinstance(item.get("id"), str)
                    and isinstance(item.get("text"), str)
                ):
                    completed_messages[item["id"]] = item["text"]
                continue
            if method == "turn/completed":
                turn = params.get("turn", {})
                if not isinstance(turn, dict) or turn.get("id") != turn_id:
                    continue
                status = turn.get("status", "unknown")
                error = turn.get("error")
                assistant_text = ""
                if completed_messages:
                    # NOTE(review): this picks the FIRST completed agent
                    # message of the turn; confirm the final one is not
                    # what callers want when a turn has several.
                    assistant_text = next(iter(completed_messages.values()))
                elif deltas_by_item:
                    assistant_text = "".join(deltas_by_item.values())
                return {
                    "status": status,
                    "assistant_text": assistant_text,
                    "error": error,
                }
def parse_args(argv):
    """Parse the command line for the subcommand interface.

    The only subcommand is ``reply``. It requires ``--thread-id`` and
    exactly one of ``--message``, ``--message-file`` or
    ``--message-stdin``, and accepts ``--codex-bin`` and
    ``--timeout-seconds``.

    Args:
        argv: Argument list without the program name.

    Returns:
        The populated ``argparse.Namespace``.

    Raises:
        SystemExit: Raised by argparse on invalid arguments or ``--help``.
    """

    def positive_int(value):
        # A zero or negative timeout would make every step time out at
        # once, so it is rejected with a clear usage error instead.
        try:
            number = int(value)
        except ValueError:
            raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
        if number <= 0:
            raise argparse.ArgumentTypeError(f"must be a positive integer, got {number}")
        return number

    parser = argparse.ArgumentParser(
        # The script delivers notifications through ntfy, not by email.
        description="Codex notify hook: send ntfy notifications and reply via app-server."
    )
    subparsers = parser.add_subparsers(dest="command", required=True)
    reply = subparsers.add_parser(
        "reply", help="Send a reply to an existing Codex thread via app-server."
    )
    reply.add_argument(
        "--thread-id",
        required=True,
        help="Codex thread id from notification payload (UUID).",
    )
    message_group = reply.add_mutually_exclusive_group(required=True)
    message_group.add_argument("--message", help="Reply text to send.")
    message_group.add_argument(
        "--message-file", help="Path to a UTF-8 text file containing the reply."
    )
    message_group.add_argument(
        "--message-stdin", action="store_true", help="Read reply text from stdin."
    )
    reply.add_argument(
        "--codex-bin",
        default="codex",
        help="Path to codex binary (default: codex in PATH).",
    )
    reply.add_argument(
        "--timeout-seconds",
        type=positive_int,
        default=DEFAULT_TIMEOUT_SECONDS,
        help=f"Per-step timeout in seconds (default: {DEFAULT_TIMEOUT_SECONDS}).",
    )
    return parser.parse_args(argv)
def read_reply_text(args):
    """Return the stripped reply text selected by the parsed arguments.

    The text comes from ``args.message``, else from the UTF-8 file named
    by ``args.message_file``, else from stdin when ``args.message_stdin``
    is set.

    Raises:
        RuntimeError: If no source is selected or the text is blank.
    """

    def load():
        if args.message is not None:
            return args.message
        if args.message_file is not None:
            with open(args.message_file, "r", encoding="utf-8") as handle:
                return handle.read()
        if args.message_stdin:
            return sys.stdin.read()
        raise RuntimeError("reply text source is missing")

    reply = load().strip()
    if reply:
        return reply
    raise RuntimeError("reply text is empty")
def handle_reply_mode(args):
    """Send a reply into an existing thread and print the outcome.

    Starts an app-server client, resumes ``args.thread_id``, starts a turn
    with the reply text and waits for it to complete. The thread id, turn
    id, status and any assistant message are printed to stdout.

    Returns:
        ``1`` when the turn status is ``"failed"``, otherwise ``0``.
    """
    message = read_reply_text(args)
    client = AppServerClient(args.codex_bin, args.timeout_seconds)
    try:
        client.initialize()
        client.resume_thread(args.thread_id)
        turn_id = client.start_turn(args.thread_id, message)
        outcome = client.wait_for_turn_completion(turn_id)
    finally:
        # The child process is stopped whether or not the turn succeeded.
        client.close()

    status = outcome.get("status", "unknown")
    failed = status == "failed"
    report = [
        f"thread={args.thread_id}",
        f"turn={turn_id}",
        f"status={status}",
    ]
    reply_text = outcome.get("assistant_text") or ""
    if reply_text:
        report += ["", "assistant_message:", reply_text]
    if failed:
        failure = outcome.get("error")
        if failure:
            report += ["", f"error={failure}"]
    print("\n".join(report))
    return 1 if failed else 0
def handle_legacy_notify_mode(raw_notification):
    """Deliver the notification passed as a JSON string on the command line.

    Args:
        raw_notification: JSON text; it must decode to an object.

    Returns:
        ``0`` on successful delivery, ``1`` when the payload is invalid or
        delivery fails. All diagnostics are written to stderr.
    """
    try:
        notification = json.loads(raw_notification)
    except json.JSONDecodeError:
        # Diagnostics go to stderr, like every other failure in this script.
        print("Invalid JSON input", file=sys.stderr)
        return 1
    if not isinstance(notification, dict):
        # The formatting helpers call .get() on the payload, so a list,
        # string or number is rejected here with a clear message.
        print("Invalid notification payload: expected a JSON object", file=sys.stderr)
        return 1
    try:
        send_notification_ntfy(notification)
    except Exception as exc:
        # Top-level boundary: report the failure through the exit status.
        print(f"Warning: notification delivery failed for ntfy: {exc}", file=sys.stderr)
        return 1
    return 0
def main():
    """Dispatch between the legacy notify hook and the subcommand interface.

    Returns:
        The process exit status: ``0`` on success, ``1`` on failure.
    """
    argv = sys.argv[1:]
    # Backward-compatible path used by Codex notify hook:
    # notify.py '<NOTIFICATION_JSON>'
    # A lone subcommand name is not a payload; it goes to argparse so the
    # user gets a usage error rather than "Invalid JSON input".
    known_commands = ("reply",)
    if len(argv) == 1 and not argv[0].startswith("-") and argv[0] not in known_commands:
        return handle_legacy_notify_mode(argv[0])
    try:
        args = parse_args(argv)
        if args.command == "reply":
            return handle_reply_mode(args)
        print("Unknown command", file=sys.stderr)
        return 1
    except Exception as exc:
        # Top-level boundary: report the failure and exit non-zero.
        print(f"Error: {exc}", file=sys.stderr)
        return 1
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment