Skip to content

Instantly share code, notes, and snippets.

@ascopes
Last active July 13, 2025 09:46
Show Gist options
  • Select an option

  • Save ascopes/e30ce5afb63ca9fdf72bc3177be596e7 to your computer and use it in GitHub Desktop.

Select an option

Save ascopes/e30ce5afb63ca9fdf72bc3177be596e7 to your computer and use it in GitHub Desktop.
Crude implementation of the actor model in Python using asyncio.
import abc
import asyncio
import dataclasses
import traceback
import typing as t
import weakref
@t.final
@dataclasses.dataclass(frozen=True, slots=True, kw_only=True)
class StopMessage:
reason: str
cause: BaseException | None = None
# Anything that can sit in an actor's postbox: a payload of type ``M`` or a
# ``StopMessage`` control message.
type Message[M] = M | StopMessage
class UnhandledActorException(Exception):
    """Actor error that carries the message it relates to."""

    # The message associated with the failure, or ``None`` if there is none.
    message: Message[t.Any] | None

    def __init__(self, reason: str, message: Message[t.Any] | None) -> None:
        """Store *message* and use *reason* as the exception text."""
        self.message = message
        super().__init__(reason)
class Actor[M](abc.ABC):
_child_actors: t.Final[t.MutableSet["Actor[t.Any]"]]
_parent_actor: "Actor[t.Any] | None"
_poison_pill: StopMessage | None
_postbox: t.Final[asyncio.Queue[Message[M]]]
_task: asyncio.Task[None] | None
def __init__(
self,
*,
max_backpressure: int = 0,
) -> None:
self._child_actors = weakref.WeakSet()
self._parent_actor = None
self._poison_pill = None
self._postbox = asyncio.Queue(max_backpressure)
self._task = None
def init_child_actor[A: Actor[t.Any]](self, actor: A) -> A:
self._child_actors.add(actor)
actor._start()
return actor
async def tell(self, message: M) -> None:
if self._poison_pill is not None:
raise UnhandledActorException("Actor was poisoned", self._poison_pill)
await self._postbox.put(message)
async def tell_stop(self) -> None:
await self.tell(StopMessage(reason="User requested the actor to stop"))
def __await__(self) -> t.Generator[t.Any, t.Any, None]:
if (task := self._task) is not None:
yield from task
def _start(self) -> None:
if self._task is not None:
raise RuntimeError("Actor has already run")
loop = asyncio.get_running_loop()
self._task = loop.create_task(self._run())
async def _run(self) -> None:
try:
await self.post_construct()
while True:
message = await self._postbox.get()
if isinstance(message, StopMessage):
break
try:
await self.handle_message(message)
except BaseException as ex:
await self.handle_error(message, ex)
except Exception as ex:
self._poision_pill = StopMessage(
reason=f"An unhandled exception was raised: {ex!r}",
cause=ex,
)
finally:
try:
await self.pre_destroy(self._poison_pill)
except Exception:
traceback.print_exc()
finally:
await self._terminate()
async def _terminate(self) -> None:
for child_actor in self._child_actors:
await child_actor.tell_stop()
await child_actor
self._child_actors.clear()
# Remove the parent reference; this allows the child task
# to be garbage collected.
self._parent_actor = None
async def post_construct(self) -> None:
pass
async def pre_destroy(self, message: StopMessage, /) -> None:
pass
async def handle_error(self, message: M, exception: Exception, /) -> None:
traceback.print_exc()
raise UnhandledActorException(
"An unhandled exception was raised: {exception!r}",
message
) from exception
@abc.abstractmethod
async def handle_message(self, message: M) -> None: ...
# Signature of a handler callable: it receives the actor it runs in and one
# message, and returns an awaitable.
type MessageHandlerFunc[M] = t.Callable[[Actor[M], M], t.Awaitable[None]]
class StatelessActor[M](Actor[M]):
_handler: t.Final[MessageHandlerFunc[M]]
def __init__(
self,
*,
handler: MessageHandlerFunc[M],
max_backpressure: int = 0,
) -> None:
super().__init__(max_backpressure=max_backpressure)
self._handler = handler
def handle_message(self, message: M) -> t.Awaitable[None]:
return self._handler(self, message)
def actor[M](handler: MessageHandlerFunc[M], /) -> StatelessActor[M]:
return StatelessActor(handler=handler)
def run_actor[M](actor: Actor[M], message: M) -> None:
async def main() -> None:
actor._start()
await actor.tell(message)
await actor
if actor._poison_pill is not None and actor._poison_pill.cause is not None:
raise actor._poison_pill.cause
asyncio.run(main())
#######
@actor
async def message_producer(self: Actor[None], _: None) -> None:
    """Demo handler: start a printer child, send it each word, then stop."""

    async def message_printer(self: Actor[str], message: str) -> None:
        # Child handler: echo every message it receives.
        print("received:", message)

    # Wrap the handler in an actor and start it as a child of this producer.
    message_printer = self.init_child_actor(actor(message_printer))

    words = "Hello world, this is working!".split()
    for word in words:
        await message_printer.tell(word)
    await self.tell_stop()


run_actor(message_producer, None)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment