Skip to content

Instantly share code, notes, and snippets.

@myui
Last active January 20, 2026 03:06
Show Gist options
  • Select an option

  • Save myui/7cd6f993a9aee6a468ce32640cc37a68 to your computer and use it in GitHub Desktop.

Select an option

Save myui/7cd6f993a9aee6a468ce32640cc37a68 to your computer and use it in GitHub Desktop.
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple
# ----------------------------
# History model
# ----------------------------
@dataclass(frozen=True)
class Event:
    """One immutable record in a workflow history: a type tag plus its payload."""

    # Event kind tag, e.g. "WorkflowStarted" or "ActivityCompleted".
    type: str
    # Event-specific data; the keys depend on the event type.
    payload: Dict[str, Any]
class Mode(str, Enum):
    """Execution mode of a runtime; members also compare equal to their string values."""

    # Activities really execute and their outcomes are appended to the history.
    LIVE = "live"
    # Nothing executes; outcomes are read back from a previously recorded history.
    REPLAY = "replay"
class HistoryMismatch(RuntimeError):
    """Raised when a replayed history runs out or disagrees with the running code."""
class History:
    """
    Minimal ordered event log with a read cursor.

    - LIVE runs grow the log through ``append``.
    - REPLAY runs walk the log front to back through ``next``.
    """

    def __init__(self, events: Optional[List[Event]] = None) -> None:
        # Copy the caller's list so later changes to it cannot alter this log.
        self._events: List[Event] = list(events) if events else []
        # Index of the next event that ``next`` will hand out.
        self._pos: int = 0

    def events(self) -> List[Event]:
        """Return a shallow copy of every event in the log, in order."""
        return self._events[:]

    def append(self, event: Event) -> None:
        """Add ``event`` to the end of the log."""
        self._events.append(event)

    def next(self) -> Event:
        """
        Return the event under the cursor and advance the cursor by one.

        Raises:
            HistoryMismatch: if every event has already been consumed.
        """
        cursor = self._pos
        if cursor < len(self._events):
            self._pos = cursor + 1
            return self._events[cursor]
        raise HistoryMismatch("history exhausted")
# ----------------------------
# Runtime (a tiny Temporal-like runner)
# ----------------------------
class Runtime:
    """
    Pairs an execution mode with a history log.

    In LIVE mode the workflow lifecycle calls append events to the history;
    in any other mode they consume the next recorded event and verify it.
    """

    def __init__(self, mode: Mode, history: History) -> None:
        self.mode = mode
        self.history = history

    def start_workflow(self, workflow_name: str, input: Dict[str, Any]) -> None:
        """
        Record (LIVE) or verify (REPLAY) the start of a workflow.

        On replay only the event type and the workflow name are checked;
        ``input`` is not compared against the recorded input.

        Raises:
            HistoryMismatch: on replay, if the next event does not match.
        """
        if self.mode != Mode.LIVE:
            recorded = self.history.next()
            self._assert_event(recorded, "WorkflowStarted", name=workflow_name)
            return
        started = Event("WorkflowStarted", {"name": workflow_name, "input": input})
        self.history.append(started)

    def complete_workflow(self, workflow_name: str, result: Any) -> None:
        """
        Record (LIVE) or verify (REPLAY) the completion of a workflow.

        On replay only the event type and the workflow name are checked;
        ``result`` is not compared against the recorded result.

        Raises:
            HistoryMismatch: on replay, if the next event does not match.
        """
        if self.mode != Mode.LIVE:
            recorded = self.history.next()
            self._assert_event(recorded, "WorkflowCompleted", name=workflow_name)
            return
        completed = Event("WorkflowCompleted", {"name": workflow_name, "result": result})
        self.history.append(completed)

    def _assert_event(self, event: Event, type_: str, **expected_payload: Any) -> None:
        """
        Check that ``event`` has type ``type_`` and the given payload entries.

        Each keyword argument names a payload key and the value it must hold;
        a key missing from the payload is compared as ``None``.

        Raises:
            HistoryMismatch: on the first type or payload difference found.
        """
        if event.type != type_:
            raise HistoryMismatch(f"expected {type_}, got {event.type}: {event}")
        for key, wanted in expected_payload.items():
            actual = event.payload.get(key)
            if actual != wanted:
                raise HistoryMismatch(
                    f"history mismatch for {type_}.{key}: expected {wanted!r}, got {actual!r}"
                )
# ----------------------------
# Decorator: @activity
# ----------------------------
def activity(name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """
    Build a decorator that routes an async function through the history log.

    The decorated function is called as ``wrapper(rt, *args)``; only
    positional arguments are supported.

    - LIVE: execute the real function, then record
      ActivityCompleted(name, args, result).
    - REPLAY: consume the next history event, verify it, and return the
      recorded result without calling the real function.

    Args:
        name: Activity name stored in, and checked against, the history.

    Raises:
        TypeError: at decoration time, if the target is not a coroutine function.
        HistoryMismatch: on replay, if the next event is not an
            ActivityCompleted for this activity with the same arguments.
    """
    # Imported locally so the block does not rely on extra module-level imports.
    import functools
    import inspect

    def deco(fn: Callable[..., Any]) -> Callable[..., Any]:
        # inspect.iscoroutinefunction is the supported check;
        # asyncio.iscoroutinefunction is deprecated since Python 3.14.
        if not inspect.iscoroutinefunction(fn):
            raise TypeError("@activity can wrap async functions only")

        # functools.wraps carries over __name__, __qualname__, __doc__,
        # __module__ and sets __wrapped__, instead of copying __name__ alone.
        @functools.wraps(fn)
        async def wrapper(rt: Runtime, *args: Any) -> Any:
            if rt.mode == Mode.REPLAY:
                e = rt.history.next()
                if e.type != "ActivityCompleted":
                    raise HistoryMismatch(f"expected ActivityCompleted, got {e.type}: {e}")
                if e.payload.get("name") != name:
                    raise HistoryMismatch(
                        f"activity name mismatch: expected {name}, got {e.payload.get('name')}"
                    )
                # Normalise to a tuple so recorded args stored as a list still compare equal.
                recorded_args = tuple(e.payload.get("args", ()))
                if recorded_args != args:
                    raise HistoryMismatch(
                        f"activity args mismatch: expected {args}, got {recorded_args!r}"
                    )
                return e.payload["result"]

            # LIVE: run the real activity, then log its outcome for later replays.
            result = await fn(rt, *args)
            rt.history.append(
                Event("ActivityCompleted", {"name": name, "args": args, "result": result})
            )
            return result

        return wrapper

    return deco
# ----------------------------
# Example "ask" activity
# ----------------------------
@activity(name="ask")
async def ask(rt: Runtime, q: str) -> str:
    """
    Answer question ``q``; stands in for a real LLM / API / DB call.

    The three scripted questions are answered "yes"; anything else is
    answered "unknown". ``rt`` is not used by the body itself.
    """
    # Deterministic placeholder answers, for illustration only.
    scripted_answers = dict.fromkeys(
        ("Is it an animal?", "Is it a pet?", "Is it a cat?"),
        "yes",
    )
    await asyncio.sleep(0)  # keep it async
    return scripted_answers.get(q, "unknown")
# ----------------------------
# Workflow code
# ----------------------------
def decide_next_question(questions: List[Tuple[str, str]]) -> str:
    """
    Pick the next question from how many Q/A pairs exist so far.

    Toy policy: zero pairs asks about animals, one pair asks about pets,
    and two or more pairs always asks about cats.
    """
    script = ("Is it an animal?", "Is it a pet?", "Is it a cat?")
    # Clamp to the last entry once two or more answers have been collected.
    return script[min(len(questions), len(script) - 1)]
def check_solved(questions: List[Tuple[str, str]]) -> bool:
    """Return True once at least three Q/A pairs exist and the last one confirms a cat."""
    if len(questions) < 3:
        return False
    return questions[-1] == ("Is it a cat?", "yes")
async def twenty_questions_workflow(rt: Runtime) -> List[Tuple[str, str]]:
    """
    Ask questions until ``check_solved`` accepts the transcript.

    Brackets the run with start/complete workflow calls on ``rt`` and
    returns the list of (question, answer) pairs collected along the way.
    """
    rt.start_workflow("twenty_questions", {"secret": "cat"})
    transcript: List[Tuple[str, str]] = []
    while True:
        question = decide_next_question(transcript)
        # The @activity wrapper on ``ask`` decides between LIVE execution and REPLAY.
        answer = await ask(rt, question)
        transcript.append((question, answer))
        if check_solved(transcript):
            break
    rt.complete_workflow("twenty_questions", transcript)
    return transcript
# ----------------------------
# Demo: LIVE then REPLAY
# ----------------------------
async def main() -> None:
    """Run the workflow LIVE to record a history, then REPLAY it from a copy of that history."""
    # 1) LIVE run: execute activities and record history
    recorded = History()
    live_runtime = Runtime(mode=Mode.LIVE, history=recorded)
    live_result = await twenty_questions_workflow(live_runtime)
    print("LIVE result:", live_result)
    print("LIVE history:")
    for event in recorded.events():
        print(" ", event)

    # 2) REPLAY run: do NOT execute activities; consume history and reconstruct state
    replay_log = History(events=recorded.events())
    replay_runtime = Runtime(mode=Mode.REPLAY, history=replay_log)
    replay_result = await twenty_questions_workflow(replay_runtime)
    print("\nREPLAY result:", replay_result)
if __name__ == "__main__":
    # Run the demo only when executed as a script, not when imported.
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment