Created
December 30, 2025 14:15
-
-
Save moostafaa/f8df6da06d7a519695fa8c502ecbcca4 to your computer and use it in GitHub Desktop.
Saga With outbox pattern
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| //Saga Events | |
| public record OrderCreatedEvent(Guid OrderId, int ProductId, int Quantity, decimal Amount, string CustomerId); | |
| public record StockReservedEvent(Guid OrderId, int ProductId, int Quantity); | |
| public record StockReservationFailedEvent(Guid OrderId, string Reason); | |
| public record OrderConfirmedEvent(Guid OrderId); | |
| public record OrderCancelledEvent(Guid OrderId, string Reason); | |
| public record StockReleaseRequestedEvent(Guid OrderId, int ProductId, int Quantity); | |
| public class OutboxMessage | |
| { | |
| public Guid Id { get; set; } | |
| public string EventType { get; set; } | |
| public string EventPayload { get; set; } | |
| public DateTime CreatedAt { get; set; } | |
| public bool IsPublished { get; set; } | |
| public DateTime? PublishedAt { get; set; } | |
| } | |
| //GENERIC OUTPUX PUBLISHER | |
| public interface IEventBus | |
| { | |
| Task PublishAsync<TEvent>(TEvent @event, CancellationToken ct = default); | |
| } | |
| public class OutboxPublisher<TDbContext> : BackgroundService where TDbContext : DbContext | |
| { | |
| private readonly IServiceProvider _serviceProvider; | |
| private readonly IEventBus _eventBus; | |
| private readonly ILogger<OutboxPublisher<TDbContext>> _logger; | |
| private readonly string _serviceName; | |
| public OutboxPublisher(IServiceProvider serviceProvider, IEventBus eventBus, ILogger<OutboxPublisher<TDbContext>> logger, string serviceName) | |
| { | |
| _serviceProvider = serviceProvider; | |
| _eventBus = eventBus; | |
| _logger = logger; | |
| _serviceName = serviceName; | |
| } | |
| protected override async Task ExecuteAsync(CancellationToken stoppingToken) | |
| { | |
| _logger.LogInformation("Outbox Publisher started for {ServiceName}", _serviceName); | |
| while (!stoppingToken.IsCancellationRequested) | |
| { | |
| try | |
| { | |
| await using var scope = _serviceProvider.CreateAsyncScope(); | |
| await using var dbContext = scope.ServiceProvider.GetRequiredService<TDbContext>(); | |
| var messages = await dbContext.Set<OutboxMessage>() | |
| .Where(m => !m.IsPublished) | |
| .OrderBy(m => m.CreatedAt) | |
| .Take(20) | |
| .ToListAsync(stoppingToken); | |
| foreach (var message in messages) | |
| { | |
| try | |
| { | |
| var eventType = Type.GetType(message.EventType); | |
| var jsonOptions = new JsonSerializerOptions | |
| { | |
| PropertyNamingPolicy = JsonNamingPolicy.CamelCase | |
| }; | |
| var @event = JsonSerializer.Deserialize(message.EventPayload, eventType!, jsonOptions); | |
| await _eventBus.PublishAsync(@event!, stoppingToken); | |
| message.IsPublished = true; | |
| message.PublishedAt = DateTime.UtcNow; | |
| await dbContext.SaveChangesAsync(stoppingToken); | |
| _logger.LogInformation("Published outbox message {MessageId}", message.Id); | |
| } | |
| catch (Exception ex) | |
| { | |
| _logger.LogError(ex, "Failed to publish outbox message {MessageId}", message.Id); | |
| //Retry - exponential backoff | |
| } | |
| } | |
| await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken); | |
| } | |
| catch (Exception ex) | |
| { | |
| _logger.LogError(ex, "Error in Outbox Publisher loop"); | |
| await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken); | |
| } | |
| } | |
| } | |
| } | |
| //ORDERS | |
| public class OrderDbContext : DbContext | |
| { | |
| public DbSet<Order> Orders { get; set; } | |
| public DbSet<OutboxMessage> OutboxMessages { get; set; } | |
| public OrderDbContext() : base() { } | |
| public OrderDbContext(DbContextOptions<OrderDbContext> options) : base(options) { } | |
| protected override void OnConfiguring(DbContextOptionsBuilder options) | |
| { | |
| if (!options.IsConfigured) | |
| options.UseSqlServer("Server=.;Database=OrderDB;Trusted_Connection=True;TrustServerCertificate=True;"); | |
| } | |
| } | |
| public class Order | |
| { | |
| public Guid Id { get; set; } | |
| public int ProductId { get; set; } | |
| public int Quantity { get; set; } | |
| public decimal Amount { get; set; } | |
| public string CustomerId { get; set; } | |
| public string Status { get; set; } // Pending, Confirmed, Cancelled | |
| public DateTime CreatedAt { get; set; } | |
| } | |
| public class OrderSagaService | |
| { | |
| private readonly OrderDbContext _dbContext; | |
| private readonly ILogger<OrderSagaService> _logger; | |
| public OrderSagaService(OrderDbContext dbContext, ILogger<OrderSagaService> logger) | |
| { | |
| _dbContext = dbContext; | |
| _logger = logger; | |
| } | |
| public async Task<Guid> CreateOrderAsync(int productId, int quantity, decimal amount, string customerId) | |
| { | |
| using var transaction = await _dbContext.Database.BeginTransactionAsync(); | |
| try | |
| { | |
| var orderId = Guid.NewGuid(); | |
| var order = new Order | |
| { | |
| Id = orderId, | |
| ProductId = productId, | |
| Quantity = quantity, | |
| Amount = amount, | |
| CustomerId = customerId, | |
| Status = "Pending", | |
| CreatedAt = DateTime.UtcNow | |
| }; | |
| _dbContext.Orders.Add(order); | |
| var orderCreatedEvent = new OrderCreatedEvent(orderId, productId, quantity, amount, customerId); | |
| var outboxMessage = new OutboxMessage | |
| { | |
| Id = Guid.NewGuid(), | |
| EventType = typeof(OrderCreatedEvent).AssemblyQualifiedName!, | |
| EventPayload = JsonSerializer.Serialize(orderCreatedEvent, new JsonSerializerOptions | |
| { | |
| PropertyNamingPolicy = JsonNamingPolicy.CamelCase | |
| }) | |
| }; | |
| _dbContext.OutboxMessages.Add(outboxMessage); | |
| await _dbContext.SaveChangesAsync(); | |
| await transaction.CommitAsync(); | |
| _logger.LogInformation("Order created and event stored in outbox: {OrderId}", orderId); | |
| return orderId; | |
| } | |
| catch (Exception ex) | |
| { | |
| await transaction.RollbackAsync(); | |
| _logger.LogError(ex, "Failed to create order"); | |
| throw; | |
| } | |
| } | |
| //Compensation | |
| public async Task CancelOrderAsync(Guid orderId, string reason) | |
| { | |
| var order = await _dbContext.Orders.FindAsync(orderId); | |
| if (order != null && order.Status == "Pending") | |
| { | |
| order.Status = "Cancelled"; | |
| var cancelEvent = new OrderCancelledEvent(orderId, reason); | |
| var outboxMessage = new OutboxMessage | |
| { | |
| Id = Guid.NewGuid(), | |
| EventType = typeof(OrderCancelledEvent).AssemblyQualifiedName!, | |
| EventPayload = JsonSerializer.Serialize(cancelEvent) | |
| }; | |
| _dbContext.OutboxMessages.Add(outboxMessage); | |
| await _dbContext.SaveChangesAsync(); | |
| _logger.LogInformation("Order cancelled: {OrderId}", orderId); | |
| } | |
| } | |
| } | |
| //INVENTORY | |
| public class InventorySagaService | |
| { | |
| private readonly InventoryDbContext _dbContext; | |
| private readonly ILogger<InventorySagaService> _logger; | |
| public InventorySagaService(InventoryDbContext dbContext, ILogger<InventorySagaService> logger) | |
| { | |
| _dbContext = dbContext; | |
| _logger = logger; | |
| } | |
| public async Task ReserveStockAsync(OrderCreatedEvent @event) | |
| { | |
| using var transaction = await _dbContext.Database.BeginTransactionAsync(); | |
| try | |
| { | |
| var stock = await _dbContext.ProductStocks.FirstOrDefaultAsync(s => s.ProductId == @event.ProductId); | |
| if (stock == null || stock.AvailableQuantity < @event.Quantity) | |
| { | |
| var failedEvent = new StockReservationFailedEvent(@event.OrderId, "Insufficient stock"); | |
| var outboxMessage = new OutboxMessage | |
| { | |
| Id = Guid.NewGuid(), | |
| EventType = typeof(StockReservationFailedEvent).AssemblyQualifiedName!, | |
| EventPayload = JsonSerializer.Serialize(failedEvent) | |
| }; | |
| _dbContext.OutboxMessages.Add(outboxMessage); | |
| await _dbContext.SaveChangesAsync(); | |
| await transaction.CommitAsync(); | |
| _logger.LogWarning("Stock reservation failed for order {OrderId}", @event.OrderId); | |
| return; | |
| } | |
| stock.AvailableQuantity -= @event.Quantity; | |
| stock.ReservedQuantity += @event.Quantity; | |
| var reservedEvent = new StockReservedEvent(@event.OrderId, @event.ProductId, @event.Quantity); | |
| var outboxMessage = new OutboxMessage | |
| { | |
| Id = Guid.NewGuid(), | |
| EventType = typeof(StockReservedEvent).AssemblyQualifiedName!, | |
| EventPayload = JsonSerializer.Serialize(reservedEvent) | |
| }; | |
| _dbContext.OutboxMessages.Add(outboxMessage); | |
| await _dbContext.SaveChangesAsync(); | |
| await transaction.CommitAsync(); | |
| _logger.LogInformation("Stock reserved for order {OrderId}", @event.OrderId); | |
| } | |
| catch (Exception ex) | |
| { | |
| await transaction.RollbackAsync(); | |
| _logger.LogError(ex, "Error reserving stock for order {OrderId}", @event.OrderId); | |
| throw; | |
| } | |
| } | |
| // Compensation | |
| public async Task ReleaseStockAsync(OrderCancelledEvent @event) | |
| { | |
| var order = await _dbContext.Orders.FindAsync(@event.OrderId); | |
| if (order != null) | |
| { | |
| var stock = await _dbContext.ProductStocks.FirstOrDefaultAsync(s => s.ProductId == order.ProductId); | |
| if (stock != null) | |
| { | |
| stock.AvailableQuantity += order.Quantity; | |
| stock.ReservedQuantity -= order.Quantity; | |
| var releaseEvent = new StockReleaseRequestedEvent(@event.OrderId, order.ProductId, order.Quantity); | |
| var outboxMessage = new OutboxMessage | |
| { | |
| Id = Guid.NewGuid(), | |
| EventType = typeof(StockReleaseRequestedEvent).AssemblyQualifiedName!, | |
| EventPayload = JsonSerializer.Serialize(releaseEvent) | |
| }; | |
| _dbContext.OutboxMessages.Add(outboxMessage); | |
| await _dbContext.SaveChangesAsync(); | |
| _logger.LogInformation("Stock released for cancelled order {OrderId}", @event.OrderId); | |
| } | |
| } | |
| } | |
| } | |
| //ORCHESTRATION | |
| public class OrderOrchestratorService | |
| { | |
| private readonly IEventBus _eventBus; | |
| private readonly ILogger<OrderOrchestratorService> _logger; | |
| public OrderOrchestratorService(IEventBus eventBus, ILogger<OrderOrchestratorService> logger) | |
| { | |
| _eventBus = eventBus; | |
| _logger = logger; | |
| } | |
| public async Task<Guid> ProcessNewOrderAsync(int productId, int quantity, decimal amount, string customerId) | |
| { | |
| var orderId = Guid.NewGuid(); | |
| var orderService = new OrderSagaService(new OrderDbContext(), NullLogger<OrderSagaService>.Instance); | |
| await orderService.CreateOrderAsync(productId, quantity, amount, customerId); | |
| _logger.LogInformation("Orchestrator initiated order process: {OrderId}", orderId); | |
| return orderId; | |
| } | |
| public async Task HandleStockReservedAsync(StockReservedEvent @event) | |
| { | |
| try | |
| { | |
| using var dbContext = new OrderDbContext(); | |
| var order = await dbContext.Orders.FindAsync(@event.OrderId); | |
| if (order != null && order.Status == "Pending") | |
| { | |
| order.Status = "Confirmed"; | |
| var confirmedEvent = new OrderConfirmedEvent(@event.OrderId); | |
| var outboxMessage = new OutboxMessage | |
| { | |
| Id = Guid.NewGuid(), | |
| EventType = typeof(OrderConfirmedEvent).AssemblyQualifiedName!, | |
| EventPayload = JsonSerializer.Serialize(confirmedEvent) | |
| }; | |
| dbContext.OutboxMessages.Add(outboxMessage); | |
| await dbContext.SaveChangesAsync(); | |
| _logger.LogInformation("Order confirmed: {OrderId}", @event.OrderId); | |
| } | |
| } | |
| catch (Exception ex) | |
| { | |
| _logger.LogError(ex, "Error confirming order {OrderId}", @event.OrderId); | |
| throw; | |
| } | |
| } | |
| public async Task HandleStockReservationFailedAsync(StockReservationFailedEvent @event) | |
| { | |
| var orderService = new OrderSagaService(new OrderDbContext(),NullLogger<OrderSagaService>.Instance); | |
| await orderService.CancelOrderAsync(@event.OrderId, @event.Reason); | |
| _logger.LogInformation("Order cancelled due to stock failure: {OrderId}", @event.OrderId); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment