Last active
January 20, 2026 03:06
-
-
Save myui/7cd6f993a9aee6a468ce32640cc37a68 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from __future__ import annotations | |
| import asyncio | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| from typing import Any, Callable, Dict, List, Optional, Tuple | |
| # ---------------------------- | |
| # History model | |
| # ---------------------------- | |
@dataclass(frozen=True)
class Event:
    """One immutable entry in a workflow's history log.

    The dataclass is frozen, so ``type`` and ``payload`` cannot be rebound
    after construction. The ``payload`` dict itself is still an ordinary
    mutable dict.
    """

    # Event kind; this file records "WorkflowStarted", "ActivityCompleted"
    # and "WorkflowCompleted".
    type: str
    # Event-specific data, e.g. {"name": ..., "args": ..., "result": ...}
    # for an "ActivityCompleted" event.
    payload: Dict[str, Any]
class Mode(str, Enum):
    """Execution mode of a runtime.

    Members are also ``str`` instances, so ``Mode.LIVE == "live"`` holds.
    """

    # Run activities for real and append what happened to the history.
    LIVE = "live"
    # Do not run activities; read their recorded results back from history.
    REPLAY = "replay"
class HistoryMismatch(RuntimeError):
    """Raised when replayed code disagrees with the recorded history.

    This covers an exhausted history, an unexpected event type, and a
    recorded payload value that differs from what the running code expects.
    """
class History:
    """
    Minimal in-memory event log with a read cursor.

    - LIVE runs call ``append`` to record events.
    - REPLAY runs call ``next`` to consume the recorded events in order.
    """

    def __init__(self, events: Optional[List[Event]] = None) -> None:
        """Start from a copy of ``events`` (or empty), cursor at the beginning."""
        # Copy so later changes to the caller's list do not leak into the log.
        self._events: List[Event] = [] if not events else list(events)
        self._pos: int = 0

    def events(self) -> List[Event]:
        """Return a shallow copy of every recorded event, consumed or not."""
        return self._events[:]

    def append(self, event: Event) -> None:
        """Record ``event`` at the end of the log; the read cursor is untouched."""
        self._events.append(event)

    def next(self) -> Event:
        """Return the event under the cursor and advance past it.

        Raises:
            HistoryMismatch: if every recorded event was already consumed.
        """
        cursor = self._pos
        if cursor < len(self._events):
            self._pos = cursor + 1
            return self._events[cursor]
        raise HistoryMismatch("history exhausted")
| # ---------------------------- | |
| # Runtime (a tiny Temporal-like runner) | |
| # ---------------------------- | |
class Runtime:
    """Pairs an execution mode with the history that mode writes to or reads from.

    In LIVE mode the workflow boundary events are appended to the history.
    In any other mode the next recorded event is consumed and checked
    against what the running code expects.
    """

    def __init__(self, mode: Mode, history: History) -> None:
        """Store the mode and the history this runtime operates on."""
        self.mode = mode
        self.history = history

    def start_workflow(self, workflow_name: str, input: Dict[str, Any]) -> None:
        """Record (LIVE) or verify (REPLAY) the ``WorkflowStarted`` event.

        On replay only the event type and workflow name are compared; the
        recorded ``input`` is not checked.

        Raises:
            HistoryMismatch: on replay, if the next event does not match.
        """
        self._record_or_check(
            "WorkflowStarted",
            workflow_name,
            {"name": workflow_name, "input": input},
        )

    def complete_workflow(self, workflow_name: str, result: Any) -> None:
        """Record (LIVE) or verify (REPLAY) the ``WorkflowCompleted`` event.

        On replay only the event type and workflow name are compared; the
        recorded ``result`` is not checked.

        Raises:
            HistoryMismatch: on replay, if the next event does not match.
        """
        self._record_or_check(
            "WorkflowCompleted",
            workflow_name,
            {"name": workflow_name, "result": result},
        )

    def _record_or_check(self, type_: str, workflow_name: str, payload: Dict[str, Any]) -> None:
        """Append ``Event(type_, payload)`` when LIVE, else verify the next event."""
        if self.mode == Mode.LIVE:
            self.history.append(Event(type_, payload))
            return
        recorded = self.history.next()
        self._assert_event(recorded, type_, name=workflow_name)

    def _assert_event(self, event: Event, type_: str, **expected_payload: Any) -> None:
        """Check that ``event`` has type ``type_`` and the given payload values.

        Only the keys passed in ``expected_payload`` are compared; a key
        missing from the event's payload is treated as ``None``.

        Raises:
            HistoryMismatch: on the first type or payload difference found.
        """
        if event.type != type_:
            raise HistoryMismatch(f"expected {type_}, got {event.type}: {event}")
        for key, wanted in expected_payload.items():
            actual = event.payload.get(key)
            if actual != wanted:
                raise HistoryMismatch(
                    f"history mismatch for {type_}.{key}: expected {wanted!r}, got {actual!r}"
                )
| # ---------------------------- | |
| # Decorator: @activity | |
| # ---------------------------- | |
def activity(name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """
    Build a decorator that makes an async function replayable.

    The wrapped coroutine takes the runtime as its first argument, followed
    by positional arguments only:
    - LIVE: execute the real function, then record
      ActivityCompleted(name, args, result).
    - REPLAY: read the next ActivityCompleted event, verify its name and
      args, and return the recorded result without calling the function.

    Args:
        name: Activity name stored in, and checked against, the history.

    Returns:
        A decorator. Applying it to a non-async function raises TypeError.

    Raises:
        HistoryMismatch: from the wrapped coroutine on replay, when the next
            event is not an ActivityCompleted with this name and these args.
    """
    # Imported here so the block does not depend on edits to the file header.
    import functools
    import inspect

    def deco(fn: Callable[..., Any]) -> Callable[..., Any]:
        # inspect.iscoroutinefunction replaces asyncio.iscoroutinefunction,
        # which is deprecated as of Python 3.14.
        if not inspect.iscoroutinefunction(fn):
            raise TypeError("@activity can wrap async functions only")

        # functools.wraps carries over __name__, __qualname__, __doc__ and
        # __module__, and sets __wrapped__, instead of copying __name__ only.
        @functools.wraps(fn)
        async def wrapper(rt: Runtime, *args: Any) -> Any:
            if rt.mode == Mode.REPLAY:
                e = rt.history.next()
                if e.type != "ActivityCompleted":
                    raise HistoryMismatch(f"expected ActivityCompleted, got {e.type}: {e}")
                if e.payload.get("name") != name:
                    raise HistoryMismatch(
                        f"activity name mismatch: expected {name}, got {e.payload.get('name')}"
                    )
                # Normalise to a tuple so recorded args stored as a list
                # still compare equal to the call's *args tuple.
                recorded_args = tuple(e.payload.get("args", ()))
                if recorded_args != args:
                    raise HistoryMismatch(
                        f"activity args mismatch: expected {args}, got {recorded_args!r}"
                    )
                return e.payload["result"]

            # LIVE: run the real activity and record the outcome only after
            # it has returned successfully.
            result = await fn(rt, *args)
            rt.history.append(
                Event("ActivityCompleted", {"name": name, "args": args, "result": result})
            )
            return result

        return wrapper

    return deco
| # ---------------------------- | |
| # Example "ask" activity | |
| # ---------------------------- | |
@activity(name="ask")
async def ask(rt: Runtime, q: str) -> str:
    """Answer question ``q`` from a fixed table; anything else is "unknown".

    Stand-in for a real side-effecting call (LLM, API, DB query). The
    ``rt`` argument is not used by the body itself.
    """
    canned_answers = {
        "Is it an animal?": "yes",
        "Is it a pet?": "yes",
        "Is it a cat?": "yes",
    }
    # Yield to the event loop once so this behaves like a real coroutine.
    await asyncio.sleep(0)
    if q in canned_answers:
        return canned_answers[q]
    return "unknown"
| # ---------------------------- | |
| # Workflow code | |
| # ---------------------------- | |
def decide_next_question(questions: List[Tuple[str, str]]) -> str:
    """Pick the next question from how many Q/A pairs exist so far.

    Toy policy: the first two turns use fixed openers, and every later turn
    asks "Is it a cat?". The content of ``questions`` is not inspected.
    """
    openers = ("Is it an animal?", "Is it a pet?")
    asked = len(questions)
    return openers[asked] if asked < len(openers) else "Is it a cat?"
def check_solved(questions: List[Tuple[str, str]]) -> bool:
    """Return True once at least three turns exist and the last one confirms "cat"."""
    if len(questions) < 3:
        return False
    winning_turn = ("Is it a cat?", "yes")
    return questions[-1] == winning_turn
async def twenty_questions_workflow(
    rt: Runtime, *, max_questions: int = 20
) -> List[Tuple[str, str]]:
    """Ask questions through the ``ask`` activity until solved or out of turns.

    The original loop ran until ``check_solved`` returned True, so it never
    terminated if the activity stopped returning the winning answer (for
    example once ``ask`` is backed by a real service). The loop is now
    bounded by ``max_questions``.

    Args:
        rt: Runtime that decides whether events are recorded or replayed.
        max_questions: Upper bound on the number of ``ask`` calls. Replay
            must use the same value as the recorded run, otherwise the
            event sequence will not line up.

    Returns:
        The (question, answer) pairs in the order they were asked. This is
        also the result passed to ``complete_workflow``.

    Raises:
        HistoryMismatch: on replay, when the recorded history does not match
            the events this code produces.
    """
    rt.start_workflow("twenty_questions", {"secret": "cat"})
    questions: List[Tuple[str, str]] = []
    solved = False
    while not solved and len(questions) < max_questions:
        q = decide_next_question(questions)
        a = await ask(rt, q)  # the @activity wrapper handles LIVE vs REPLAY
        questions.append((q, a))
        solved = check_solved(questions)
    rt.complete_workflow("twenty_questions", questions)
    return questions
| # ---------------------------- | |
| # Demo: LIVE then REPLAY | |
| # ---------------------------- | |
async def main() -> None:
    """Run the workflow once LIVE, then replay it from the recorded history."""
    # 1) LIVE run: activities execute for real and every event is recorded.
    recorded = History()
    live_result = await twenty_questions_workflow(Runtime(mode=Mode.LIVE, history=recorded))
    print("LIVE result:", live_result)
    print("LIVE history:")
    for event in recorded.events():
        print(" ", event)

    # 2) REPLAY run: activities are NOT executed. A fresh History seeded with
    #    the recorded events is consumed to rebuild the same state.
    replay_rt = Runtime(mode=Mode.REPLAY, history=History(events=recorded.events()))
    replay_result = await twenty_questions_workflow(replay_rt)
    print("\nREPLAY result:", replay_result)


if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment