Created
March 25, 2024 18:26
-
-
Save backpackerhh/c63bcb5e0b006ccbb912435553fd82c7 to your computer and use it in GitHub Desktop.
Event bus implemented in Ruby
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# dry-system provider wiring the domain-event infrastructure into the container.
# With `namespace: true` the keys registered here are resolved elsewhere in this
# code base as "domain_events.<key>" (e.g. "domain_events.subscribers").
YourApp.register_provider(:domain_events, namespace: true) do # dry-system
  prepare do
    # Load every subscriber file first so that the subclasses exist before
    # they are collected below.
    subscriber_files = Dir[target.root.join("path/to/**/*_event_subscriber.rb")]
    subscriber_files.each { |subscriber_file| require subscriber_file }

    # NOTE(review): Class#subclasses lists direct subclasses only; a subscriber
    # inheriting from another subscriber would not be registered - confirm this is intended.
    register("subscribers", EventSubscriber.subclasses)
  end

  start do
    register("bus", InMemoryEventBus.new)
    register("async_bus", SidekiqEventBus.new)
  end
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "securerandom"

# Base class for domain events.
#
# An event carries the id and attributes of the aggregate it refers to, the
# time it occurred and its own unique id. Instances are built through
# {.from_primitives} (`new` is private); the event name is the name of the
# concrete class.
class Event
  attr_reader :aggregate_id, :aggregate_attributes, :occurred_at, :id, :name

  private_class_method :new

  # Builds an event from a hash of primitive values.
  #
  # @param attributes [Hash] must contain :aggregate_id, :aggregate_attributes
  #   and :occurred_at; :id is optional and defaults to a fresh UUID
  # @return [Event] an instance of the class the method is called on
  # @raise [KeyError] when a required key is missing
  def self.from_primitives(attributes)
    new(
      # Block form: a UUID is only generated when no id was supplied.
      id: attributes.fetch(:id) { SecureRandom.uuid },
      aggregate_id: attributes.fetch(:aggregate_id),
      aggregate_attributes: attributes.fetch(:aggregate_attributes),
      occurred_at: attributes.fetch(:occurred_at)
    )
  end

  def initialize(id:, aggregate_id:, aggregate_attributes:, occurred_at:)
    @id = id
    @aggregate_id = aggregate_id
    @aggregate_attributes = aggregate_attributes
    @occurred_at = occurred_at
    @name = self.class.name
  end

  # Primitive representation of the event.
  #
  # @return [Hash] with :id, :type (the event name), :occurred_at formatted as
  #   "YYYY-MM-DD HH:MM:SS.NNNNNNNNN +ZZZZ", and :attributes (the aggregate id
  #   under :id merged with the aggregate attributes)
  def to_primitives
    {
      id:,
      type: name,
      occurred_at: occurred_at.strftime("%Y-%m-%d %H:%M:%S.%N %z"),
      attributes: {
        id: aggregate_id,
        **aggregate_attributes
      }
    }
  end

  # Two events are equal when name, occurrence time, aggregate id and aggregate
  # attributes match; the event's own id is not part of the comparison.
  # Comparing against anything that is not an Event returns false instead of
  # raising NoMethodError.
  def ==(other)
    other.is_a?(Event) &&
      name == other.name &&
      occurred_at == other.occurred_at &&
      aggregate_id == other.aggregate_id &&
      aggregate_attributes == other.aggregate_attributes
  end
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Base event bus: builds the map from event class to the subscriber instances
# interested in it. Concrete buses decide how an event is delivered.
class EventBus
  # Provides `event_subscribers` (the registered subscriber classes).
  include Deps[event_subscribers: "domain_events.subscribers"] # dry-auto_inject

  # @return [Hash{Class => Array}] subscriber instances keyed by event class;
  #   looking up an unknown class stores and returns an empty array
  attr_reader :event_subscriptions

  def initialize(...)
    super(...)

    @event_subscriptions = Hash.new { |subscriptions, event_klass| subscriptions[event_klass] = [] }
    event_subscribers.each { |subscriber_klass| subscribe(subscriber_klass.new) }
  end

  private

  # Adds one subscriber instance under every event class it declares interest in.
  def subscribe(subscriber)
    subscriber.subscribed_to.each do |event_klass|
      @event_subscriptions[event_klass] << subscriber
    end
  end
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| require "json-schema" | |
require "time" # Time.parse lives in the stdlib "time" extension

# Converts domain events to and from the JSON-compatible hash exchanged with
# the background job (`{ "data" => { "id", "type", "occurred_at", "attributes" } }`).
class EventSerializer
  # Anchored with \A/\z: in Ruby ^/$ match per line, which would let a value
  # with a trailing newline through.
  UUID_PATTERN = /\A[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\z/
  # Matches the "%Y-%m-%d %H:%M:%S.%N %z" format; the UTC offset may be
  # positive or negative.
  OCCURRED_AT_PATTERN = /\A\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{9} [+-]\d{4}\z/
  private_constant :UUID_PATTERN, :OCCURRED_AT_PATTERN

  # Validates the primitive form of the event against the schema and returns
  # it wrapped under "data" with string keys.
  #
  # @param event [#to_primitives]
  # @return [Hash] string-keyed hash (JSON round-tripped)
  # @raise [Domain::InvalidEventSchemaError] when the primitives violate the schema
  def self.serialize(event)
    primitives = event.to_primitives
    validation_errors = JSON::Validator.fully_validate(schema, primitives)
    raise Domain::InvalidEventSchemaError, validation_errors if validation_errors.any?

    # Round trip through JSON so that keys and values are plain JSON types.
    JSON.parse({ data: primitives }.to_json)
  end

  # Rebuilds an event from the hash produced by {.serialize}.
  #
  # @param raw_event [Hash] string-keyed payload
  # @return [Object] whatever `from_primitives` of the class named in
  #   "data" -> "type" returns
  def self.deserialize(raw_event)
    # NOTE(review): the class is resolved from the payload itself; if the payload
    # can come from an untrusted source, restrict this to known event classes.
    event_klass = Object.const_get(raw_event.dig("data", "type"))
    raw_attributes = raw_event.dig("data", "attributes")

    event_klass.from_primitives(
      aggregate_id: raw_attributes["id"],
      aggregate_attributes: raw_attributes.except("id").transform_keys(&:to_sym),
      occurred_at: Time.parse(raw_event.dig("data", "occurred_at")),
      id: raw_event.dig("data", "id")
    )
  end

  # JSON schema every serialized event must satisfy.
  def self.schema
    {
      id: "domain-events-serializer",
      type: "object",
      required: %w[id type occurred_at attributes],
      properties: {
        id: { type: "string", pattern: UUID_PATTERN },
        type: { type: "string" },
        occurred_at: { type: "string", pattern: OCCURRED_AT_PATTERN },
        attributes: {
          type: "object",
          required: ["id"],
          properties: {
            id: { type: "string", pattern: UUID_PATTERN }
          }
        }
      }
    }
  end
  private_class_method :schema
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Abstract base for event subscribers. Subclasses override both methods; the
# defaults raise so that a missing override is noticed immediately.
class EventSubscriber
  # Handles one received event.
  def on(event) = raise(NotImplementedError, "Define what will the event subscriber do upon receiving an event in #on method")

  # Returns the list of event classes the subscriber wants to receive.
  def subscribed_to = raise(NotImplementedError, "Define the list of events in #subscribed_to method")
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Event describing a newly created order.
class OrderCreatedEvent < Event
  # Builds the event from an order whose id, amount and created_at each
  # expose their primitive through `#value`.
  def self.from(order)
    aggregate_id = order.id.value
    aggregate_attributes = {
      amount: order.amount.value
      # other attributes...
    }
    occurred_at = order.created_at.value

    from_primitives(aggregate_id:, aggregate_attributes:, occurred_at:)
  end
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Creates the order commission whenever an OrderCreatedEvent is received.
class CreateOrderCommissionOnOrderCreatedEventSubscriber < EventSubscriber
  # Passes the order id and the :amount attribute of the event to the use case.
  def on(event)
    use_case = CreateOrderCommissionUseCase.new
    use_case.create(
      order_id: event.aggregate_id,
      order_amount: event.aggregate_attributes.fetch(:amount)
    )
  end

  # Event classes this subscriber listens to.
  def subscribed_to = [OrderCreatedEvent]
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Use case that creates an order and announces it on the asynchronous event bus.
class CreateOrderUseCase
  include Deps[event_bus: "domain_events.async_bus"]

  def create(attributes)
    # omitted...
    order_created = OrderCreatedEvent.from(order)
    event_bus.publish(order_created)
  end
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Event bus that calls every subscriber of the event's class directly, in the
# caller's thread, one after another.
class InMemoryEventBus < EventBus
  def publish(event)
    subscribers = event_subscriptions[event.class]
    subscribers.each { _1.on(event) }
  end
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| require "sidekiq" | |
# Sidekiq job that delivers one serialized event to one subscriber.
class PublishEventJob
  include Sidekiq::Job

  sidekiq_options queue: "domain_events", unique: true, retry_for: 3600 # 1 hour

  # @param subscriber_klass_name [String] name of the subscriber class to instantiate
  # @param raw_event [Hash] payload as produced by EventSerializer.serialize
  def perform(subscriber_klass_name, raw_event)
    event = EventSerializer.deserialize(raw_event)

    # NOTE(review): the class is resolved from a job argument; if the queue can
    # be written to by untrusted parties, restrict this to known subscribers.
    Object.const_get(subscriber_klass_name).new.on(event)

    # Logged after the subscriber has returned, so the message describes
    # delivery rather than enqueueing.
    logger.info("Event #{event.id} published to #{subscriber_klass_name}")
  end
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Event bus that enqueues one PublishEventJob per subscriber of the event's
# class, so each subscriber handles the event in its own background job.
class SidekiqEventBus < EventBus
  # @param event [Event]
  # @return [Array] the subscribers the event was enqueued for
  # @raise [Domain::InvalidEventSchemaError] when there is at least one
  #   subscriber and the event does not serialize to a valid payload
  def publish(event)
    subscribers = event_subscriptions[event.class]
    # Nothing to enqueue: skip serialization (and its validation) entirely.
    return subscribers if subscribers.empty?

    # The payload is identical for every subscriber, so validate and
    # serialize once instead of once per job.
    raw_event = EventSerializer.serialize(event)

    subscribers.each do |subscriber|
      PublishEventJob.perform_async(subscriber.class.name, raw_event)
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment