Skip to content

Instantly share code, notes, and snippets.

@ndy40
Last active January 29, 2025 17:12
Show Gist options
  • Select an option

  • Save ndy40/785c7354006b8bee0a975f476c66e966 to your computer and use it in GitHub Desktop.

Select an option

Save ndy40/785c7354006b8bee0a975f476c66e966 to your computer and use it in GitHub Desktop.
EventBus class
import importlib
import inspect
import os
from functools import wraps
from typing import List, Callable, Dict, Type, ClassVar
class EventBus:
listeners: Dict[str, List[Callable]]
def __init__(self):
self.listeners = {}
def register(self, event: str | Type['Event']):
def decorator(func: Callable):
from .events import Event
name = event.__name__ if isinstance(event, type) and issubclass(event, Event) else event
self.listeners.setdefault(name, []).append(func)
setattr(func, '_is_registered_listener', True)
return func
return decorator
def dispatch(self, event: str | Type['Event'], **kwargs):
from deal_finder.events import Event
event_data = {"event": event}
handlers = self.get_handlers(event.__class__.__name__ if isinstance(event, Event) else event)
for listener in handlers:
print(f"Dispatching event {event} to {listener}")
listener(**event_data, **kwargs)
def get_handlers(self, event: str | Type['Event']):
from deal_finder.events import Event
if isinstance(event, type) and issubclass(event, Event):
name = event.__class__.__name__
elif isinstance(event, str):
name = event
return self.listeners.get(name, [])
@staticmethod
def scan_for_listeners():
global bus
# Scan the current directory for Python files
for root, _, files in os.walk(os.getcwd()):
for file in files:
try:
if file.endswith('handlers.py'):
module_name = os.path.splitext(file)[0]
module = importlib.import_module(module_name)
for name, obj in inspect.getmembers(module):
if inspect.isfunction(obj) and hasattr(obj, '_is_registered_listener'):
bus.register(obj)
except Exception as e:
...
bus = EventBus()
from dataclasses import dataclass, field
from uuid import UUID, uuid4
@dataclass(frozen=True)
class Event:
...
@dataclass(frozen=True)
class DealFound(Event):
deal_url: str
user_id: UUID
@dataclass(frozen=True)
class UserRegistered(Event):
user_id: UUID
email: str
interest_tags: list[str] = field(default_factory=list)
from deal_finder import register_listener
from deal_finder.events import UserRegistered
@register_listener(UserRegistered)
def log_user_registered(event: UserRegistered, logger, **kwargs):
logger.info(f"User {event.user_id} registered")
@register_listener(UserRegistered)
def send_welcome_email(event: UserRegistered, logger, **kwargs):
logger.info(f"Welcome email sent to {event.email}")
import uuid
from deal_finder.event_bus import event_bus
from deal_finder.events import UserRegistered
def main():
event = UserRegistered(
user_id=uuid.uuid4(),
interest_tags=['travel', 'food', 'sports'],
email='user@email.com',
)
event_bus.scan_for_listeners()
event_bus.dispatch(event, logger_name='user_service')
if __name__ == '__main__':
main()
from dataclasses import dataclass
from unittest import TestCase
from deal_finder.event_bus import EventBus
from deal_finder.events import Event
class TestEventBus(TestCase):
def test_registering_of_handler_for_event(self):
event_bus = EventBus()
@dataclass(frozen=True)
class HelloEvent(Event):
name: str = "hello"
@event_bus.register(HelloEvent)
def hello_handler(event: Event):
assert event.name == "hello"
assert len(event_bus.listeners) == 1
def test_registering_multiple_handlers_for_event(self):
event_bus = EventBus()
@dataclass(frozen=True)
class DummyEvent(Event):
...
@event_bus.register(DummyEvent)
def handler1(*args, **kwargs):
print('Handler 1')
#
@event_bus.register(DummyEvent)
def handler2(*args, **kwargs):
print('Handler 2')
assert len(event_bus.get_handlers(DummyEvent)) == 2
def test_both_handlers_called_for_dummy_event(self):
event_bus = EventBus()
@dataclass(frozen=True)
class DummyEvent(Event):
...
handler_calls = []
@event_bus.register(DummyEvent)
def handler1(*args, **kwargs):
handler_calls.append('handler1_called')
@event_bus.register(DummyEvent)
def handler2(*args, **kwargs):
handler_calls.append('handler2_called')
event_bus.dispatch(DummyEvent())
assert 'handler1_called' in handler_calls
assert 'handler2_called' in handler_calls
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment