Skip to content

Instantly share code, notes, and snippets.

@syuraj
Created March 4, 2026 20:48
Show Gist options
  • Select an option

  • Save syuraj/634e502dcb3c200e3d412989f3a5eba1 to your computer and use it in GitHub Desktop.

Select an option

Save syuraj/634e502dcb3c200e3d412989f3a5eba1 to your computer and use it in GitHub Desktop.
import json
import logging
log = logging.getLogger()
log.setLevel(logging.INFO)
def _iter_messages(event):
"""
Supports these trigger shapes:
1) Direct S3 -> Lambda (Records[])
2) SNS -> Lambda (Records[].Sns.Message)
3) SQS -> Lambda (Records[].body) where body may be:
- raw JSON
- SNS-wrapped JSON (body is JSON with "Message")
"""
# Direct invoke / EventBridge / S3 direct
if isinstance(event, dict) and "Records" not in event:
yield event
return
records = (event or {}).get("Records") or []
for r in records:
# SNS trigger
if "Sns" in r and "Message" in r["Sns"]:
msg = r["Sns"]["Message"]
yield msg
continue
# SQS trigger
if "body" in r:
body = r["body"]
yield body
continue
# S3 direct trigger
yield r
def _json_load_maybe(x):
if isinstance(x, (dict, list)):
return x
if not isinstance(x, str):
return {"raw": repr(x)}
try:
return json.loads(x)
except Exception:
return {"raw": x}
def lambda_handler(event, context):
log.info("raw_event=%s", json.dumps(event, default=str))
for msg in _iter_messages(event):
obj = _json_load_maybe(msg)
# If SQS body is SNS-wrapped, unwrap once
if isinstance(obj, dict) and "Message" in obj and isinstance(obj["Message"], str):
inner = _json_load_maybe(obj["Message"])
obj = inner
# Print replication-failure-ish payloads plainly
# (Exact fields vary by delivery method/config; we log everything.)
log.info("notification=%s", json.dumps(obj, default=str))
# Optional: highlight likely replication failure event names
# Common places: obj["detail"]["eventName"], obj["EventName"], etc.
event_name = None
if isinstance(obj, dict):
event_name = (
obj.get("EventName")
or obj.get("eventName")
or (obj.get("detail") or {}).get("eventName")
or (obj.get("detail") or {}).get("eventName")
)
if event_name:
log.info("event_name=%s", event_name)
return {"ok": True}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment