Last active
July 13, 2025 09:46
-
-
Save ascopes/e30ce5afb63ca9fdf72bc3177be596e7 to your computer and use it in GitHub Desktop.
Crude implementation of the actor model in Python using asyncio.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import abc | |
| import asyncio | |
| import dataclasses | |
| import traceback | |
| import typing as t | |
| import weakref | |
| @t.final | |
| @dataclasses.dataclass(frozen=True, slots=True, kw_only=True) | |
| class StopMessage: | |
| reason: str | |
| cause: BaseException | None = None | |
# Anything that can sit in an actor's postbox: either a payload of the actor's
# own message type ``M`` or the ``StopMessage`` control message.
type Message[M] = M | StopMessage
class UnhandledActorException(Exception):
    """Signals that an actor could not accept or process a message.

    Args:
        reason: Human-readable description; becomes the exception text.
        message: The message associated with the failure, or ``None``.
    """

    # The message associated with the failure, if any.
    message: Message[t.Any] | None

    def __init__(self, reason: str, message: Message[t.Any] | None) -> None:
        self.message = message
        super().__init__(reason)
| class Actor[M](abc.ABC): | |
| _child_actors: t.Final[t.MutableSet["Actor[t.Any]"]] | |
| _parent_actor: "Actor[t.Any] | None" | |
| _poison_pill: StopMessage | None | |
| _postbox: t.Final[asyncio.Queue[Message[M]]] | |
| _task: asyncio.Task[None] | None | |
| def __init__( | |
| self, | |
| *, | |
| max_backpressure: int = 0, | |
| ) -> None: | |
| self._child_actors = weakref.WeakSet() | |
| self._parent_actor = None | |
| self._poison_pill = None | |
| self._postbox = asyncio.Queue(max_backpressure) | |
| self._task = None | |
| def init_child_actor[A: Actor[t.Any]](self, actor: A) -> A: | |
| self._child_actors.add(actor) | |
| actor._start() | |
| return actor | |
| async def tell(self, message: M) -> None: | |
| if self._poison_pill is not None: | |
| raise UnhandledActorException("Actor was poisoned", self._poison_pill) | |
| await self._postbox.put(message) | |
| async def tell_stop(self) -> None: | |
| await self.tell(StopMessage(reason="User requested the actor to stop")) | |
| def __await__(self) -> t.Generator[t.Any, t.Any, None]: | |
| if (task := self._task) is not None: | |
| yield from task | |
| def _start(self) -> None: | |
| if self._task is not None: | |
| raise RuntimeError("Actor has already run") | |
| loop = asyncio.get_running_loop() | |
| self._task = loop.create_task(self._run()) | |
| async def _run(self) -> None: | |
| try: | |
| await self.post_construct() | |
| while True: | |
| message = await self._postbox.get() | |
| if isinstance(message, StopMessage): | |
| break | |
| try: | |
| await self.handle_message(message) | |
| except BaseException as ex: | |
| await self.handle_error(message, ex) | |
| except Exception as ex: | |
| self._poision_pill = StopMessage( | |
| reason=f"An unhandled exception was raised: {ex!r}", | |
| cause=ex, | |
| ) | |
| finally: | |
| try: | |
| await self.pre_destroy(self._poison_pill) | |
| except Exception: | |
| traceback.print_exc() | |
| finally: | |
| await self._terminate() | |
| async def _terminate(self) -> None: | |
| for child_actor in self._child_actors: | |
| await child_actor.tell_stop() | |
| await child_actor | |
| self._child_actors.clear() | |
| # Remove the parent reference; this allows the child task | |
| # to be garbage collected. | |
| self._parent_actor = None | |
| async def post_construct(self) -> None: | |
| pass | |
| async def pre_destroy(self, message: StopMessage, /) -> None: | |
| pass | |
| async def handle_error(self, message: M, exception: Exception, /) -> None: | |
| traceback.print_exc() | |
| raise UnhandledActorException( | |
| "An unhandled exception was raised: {exception!r}", | |
| message | |
| ) from exception | |
| @abc.abstractmethod | |
| async def handle_message(self, message: M) -> None: ... | |
# Signature of a handler callable: it receives the actor and one message of
# type ``M`` and returns an awaitable that resolves to ``None``.
type MessageHandlerFunc[M] = t.Callable[[Actor[M], M], t.Awaitable[None]]
| class StatelessActor[M](Actor[M]): | |
| _handler: t.Final[MessageHandlerFunc[M]] | |
| def __init__( | |
| self, | |
| *, | |
| handler: MessageHandlerFunc[M], | |
| max_backpressure: int = 0, | |
| ) -> None: | |
| super().__init__(max_backpressure=max_backpressure) | |
| self._handler = handler | |
| def handle_message(self, message: M) -> t.Awaitable[None]: | |
| return self._handler(self, message) | |
| def actor[M](handler: MessageHandlerFunc[M], /) -> StatelessActor[M]: | |
| return StatelessActor(handler=handler) | |
| def run_actor[M](actor: Actor[M], message: M) -> None: | |
| async def main() -> None: | |
| actor._start() | |
| await actor.tell(message) | |
| await actor | |
| if actor._poison_pill is not None and actor._poison_pill.cause is not None: | |
| raise actor._poison_pill.cause | |
| asyncio.run(main()) | |
| ####### | |
@actor
async def message_producer(self: Actor[None], _: None) -> None:
    """Demo handler: start a printing child, feed it words, then stop."""

    # The decorators turn the function into a started child actor of ``self``.
    @self.init_child_actor
    @actor
    async def message_printer(_printer: Actor[str], message: str) -> None:
        print("received:", message)

    words = "Hello world, this is working!".split()
    for word in words:
        await message_printer.tell(word)

    await self.tell_stop()


run_actor(message_producer, None)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment