Skip to content

Instantly share code, notes, and snippets.

@moostafaa
Created December 30, 2025 14:15
Show Gist options
  • Select an option

  • Save moostafaa/f8df6da06d7a519695fa8c502ecbcca4 to your computer and use it in GitHub Desktop.

Select an option

Save moostafaa/f8df6da06d7a519695fa8c502ecbcca4 to your computer and use it in GitHub Desktop.
Saga With outbox pattern
//Saga Events
public record OrderCreatedEvent(Guid OrderId, int ProductId, int Quantity, decimal Amount, string CustomerId);
public record StockReservedEvent(Guid OrderId, int ProductId, int Quantity);
public record StockReservationFailedEvent(Guid OrderId, string Reason);
public record OrderConfirmedEvent(Guid OrderId);
public record OrderCancelledEvent(Guid OrderId, string Reason);
public record StockReleaseRequestedEvent(Guid OrderId, int ProductId, int Quantity);
public class OutboxMessage
{
public Guid Id { get; set; }
public string EventType { get; set; }
public string EventPayload { get; set; }
public DateTime CreatedAt { get; set; }
public bool IsPublished { get; set; }
public DateTime? PublishedAt { get; set; }
}
//GENERIC OUTPUX PUBLISHER
public interface IEventBus
{
Task PublishAsync<TEvent>(TEvent @event, CancellationToken ct = default);
}
public class OutboxPublisher<TDbContext> : BackgroundService where TDbContext : DbContext
{
private readonly IServiceProvider _serviceProvider;
private readonly IEventBus _eventBus;
private readonly ILogger<OutboxPublisher<TDbContext>> _logger;
private readonly string _serviceName;
public OutboxPublisher(IServiceProvider serviceProvider, IEventBus eventBus, ILogger<OutboxPublisher<TDbContext>> logger, string serviceName)
{
_serviceProvider = serviceProvider;
_eventBus = eventBus;
_logger = logger;
_serviceName = serviceName;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Outbox Publisher started for {ServiceName}", _serviceName);
while (!stoppingToken.IsCancellationRequested)
{
try
{
await using var scope = _serviceProvider.CreateAsyncScope();
await using var dbContext = scope.ServiceProvider.GetRequiredService<TDbContext>();
var messages = await dbContext.Set<OutboxMessage>()
.Where(m => !m.IsPublished)
.OrderBy(m => m.CreatedAt)
.Take(20)
.ToListAsync(stoppingToken);
foreach (var message in messages)
{
try
{
var eventType = Type.GetType(message.EventType);
var jsonOptions = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};
var @event = JsonSerializer.Deserialize(message.EventPayload, eventType!, jsonOptions);
await _eventBus.PublishAsync(@event!, stoppingToken);
message.IsPublished = true;
message.PublishedAt = DateTime.UtcNow;
await dbContext.SaveChangesAsync(stoppingToken);
_logger.LogInformation("Published outbox message {MessageId}", message.Id);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to publish outbox message {MessageId}", message.Id);
//Retry - exponential backoff
}
}
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in Outbox Publisher loop");
await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
}
}
}
}
//ORDERS
public class OrderDbContext : DbContext
{
public DbSet<Order> Orders { get; set; }
public DbSet<OutboxMessage> OutboxMessages { get; set; }
public OrderDbContext() : base() { }
public OrderDbContext(DbContextOptions<OrderDbContext> options) : base(options) { }
protected override void OnConfiguring(DbContextOptionsBuilder options)
{
if (!options.IsConfigured)
options.UseSqlServer("Server=.;Database=OrderDB;Trusted_Connection=True;TrustServerCertificate=True;");
}
}
public class Order
{
public Guid Id { get; set; }
public int ProductId { get; set; }
public int Quantity { get; set; }
public decimal Amount { get; set; }
public string CustomerId { get; set; }
public string Status { get; set; } // Pending, Confirmed, Cancelled
public DateTime CreatedAt { get; set; }
}
public class OrderSagaService
{
private readonly OrderDbContext _dbContext;
private readonly ILogger<OrderSagaService> _logger;
public OrderSagaService(OrderDbContext dbContext, ILogger<OrderSagaService> logger)
{
_dbContext = dbContext;
_logger = logger;
}
public async Task<Guid> CreateOrderAsync(int productId, int quantity, decimal amount, string customerId)
{
using var transaction = await _dbContext.Database.BeginTransactionAsync();
try
{
var orderId = Guid.NewGuid();
var order = new Order
{
Id = orderId,
ProductId = productId,
Quantity = quantity,
Amount = amount,
CustomerId = customerId,
Status = "Pending",
CreatedAt = DateTime.UtcNow
};
_dbContext.Orders.Add(order);
var orderCreatedEvent = new OrderCreatedEvent(orderId, productId, quantity, amount, customerId);
var outboxMessage = new OutboxMessage
{
Id = Guid.NewGuid(),
EventType = typeof(OrderCreatedEvent).AssemblyQualifiedName!,
EventPayload = JsonSerializer.Serialize(orderCreatedEvent, new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
})
};
_dbContext.OutboxMessages.Add(outboxMessage);
await _dbContext.SaveChangesAsync();
await transaction.CommitAsync();
_logger.LogInformation("Order created and event stored in outbox: {OrderId}", orderId);
return orderId;
}
catch (Exception ex)
{
await transaction.RollbackAsync();
_logger.LogError(ex, "Failed to create order");
throw;
}
}
//Compensation
public async Task CancelOrderAsync(Guid orderId, string reason)
{
var order = await _dbContext.Orders.FindAsync(orderId);
if (order != null && order.Status == "Pending")
{
order.Status = "Cancelled";
var cancelEvent = new OrderCancelledEvent(orderId, reason);
var outboxMessage = new OutboxMessage
{
Id = Guid.NewGuid(),
EventType = typeof(OrderCancelledEvent).AssemblyQualifiedName!,
EventPayload = JsonSerializer.Serialize(cancelEvent)
};
_dbContext.OutboxMessages.Add(outboxMessage);
await _dbContext.SaveChangesAsync();
_logger.LogInformation("Order cancelled: {OrderId}", orderId);
}
}
}
//INVENTORY
public class InventorySagaService
{
private readonly InventoryDbContext _dbContext;
private readonly ILogger<InventorySagaService> _logger;
public InventorySagaService(InventoryDbContext dbContext, ILogger<InventorySagaService> logger)
{
_dbContext = dbContext;
_logger = logger;
}
public async Task ReserveStockAsync(OrderCreatedEvent @event)
{
using var transaction = await _dbContext.Database.BeginTransactionAsync();
try
{
var stock = await _dbContext.ProductStocks.FirstOrDefaultAsync(s => s.ProductId == @event.ProductId);
if (stock == null || stock.AvailableQuantity < @event.Quantity)
{
var failedEvent = new StockReservationFailedEvent(@event.OrderId, "Insufficient stock");
var outboxMessage = new OutboxMessage
{
Id = Guid.NewGuid(),
EventType = typeof(StockReservationFailedEvent).AssemblyQualifiedName!,
EventPayload = JsonSerializer.Serialize(failedEvent)
};
_dbContext.OutboxMessages.Add(outboxMessage);
await _dbContext.SaveChangesAsync();
await transaction.CommitAsync();
_logger.LogWarning("Stock reservation failed for order {OrderId}", @event.OrderId);
return;
}
stock.AvailableQuantity -= @event.Quantity;
stock.ReservedQuantity += @event.Quantity;
var reservedEvent = new StockReservedEvent(@event.OrderId, @event.ProductId, @event.Quantity);
var outboxMessage = new OutboxMessage
{
Id = Guid.NewGuid(),
EventType = typeof(StockReservedEvent).AssemblyQualifiedName!,
EventPayload = JsonSerializer.Serialize(reservedEvent)
};
_dbContext.OutboxMessages.Add(outboxMessage);
await _dbContext.SaveChangesAsync();
await transaction.CommitAsync();
_logger.LogInformation("Stock reserved for order {OrderId}", @event.OrderId);
}
catch (Exception ex)
{
await transaction.RollbackAsync();
_logger.LogError(ex, "Error reserving stock for order {OrderId}", @event.OrderId);
throw;
}
}
// Compensation
public async Task ReleaseStockAsync(OrderCancelledEvent @event)
{
var order = await _dbContext.Orders.FindAsync(@event.OrderId);
if (order != null)
{
var stock = await _dbContext.ProductStocks.FirstOrDefaultAsync(s => s.ProductId == order.ProductId);
if (stock != null)
{
stock.AvailableQuantity += order.Quantity;
stock.ReservedQuantity -= order.Quantity;
var releaseEvent = new StockReleaseRequestedEvent(@event.OrderId, order.ProductId, order.Quantity);
var outboxMessage = new OutboxMessage
{
Id = Guid.NewGuid(),
EventType = typeof(StockReleaseRequestedEvent).AssemblyQualifiedName!,
EventPayload = JsonSerializer.Serialize(releaseEvent)
};
_dbContext.OutboxMessages.Add(outboxMessage);
await _dbContext.SaveChangesAsync();
_logger.LogInformation("Stock released for cancelled order {OrderId}", @event.OrderId);
}
}
}
}
//ORCHESTRATION
public class OrderOrchestratorService
{
private readonly IEventBus _eventBus;
private readonly ILogger<OrderOrchestratorService> _logger;
public OrderOrchestratorService(IEventBus eventBus, ILogger<OrderOrchestratorService> logger)
{
_eventBus = eventBus;
_logger = logger;
}
public async Task<Guid> ProcessNewOrderAsync(int productId, int quantity, decimal amount, string customerId)
{
var orderId = Guid.NewGuid();
var orderService = new OrderSagaService(new OrderDbContext(), NullLogger<OrderSagaService>.Instance);
await orderService.CreateOrderAsync(productId, quantity, amount, customerId);
_logger.LogInformation("Orchestrator initiated order process: {OrderId}", orderId);
return orderId;
}
public async Task HandleStockReservedAsync(StockReservedEvent @event)
{
try
{
using var dbContext = new OrderDbContext();
var order = await dbContext.Orders.FindAsync(@event.OrderId);
if (order != null && order.Status == "Pending")
{
order.Status = "Confirmed";
var confirmedEvent = new OrderConfirmedEvent(@event.OrderId);
var outboxMessage = new OutboxMessage
{
Id = Guid.NewGuid(),
EventType = typeof(OrderConfirmedEvent).AssemblyQualifiedName!,
EventPayload = JsonSerializer.Serialize(confirmedEvent)
};
dbContext.OutboxMessages.Add(outboxMessage);
await dbContext.SaveChangesAsync();
_logger.LogInformation("Order confirmed: {OrderId}", @event.OrderId);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error confirming order {OrderId}", @event.OrderId);
throw;
}
}
public async Task HandleStockReservationFailedAsync(StockReservationFailedEvent @event)
{
var orderService = new OrderSagaService(new OrderDbContext(),NullLogger<OrderSagaService>.Instance);
await orderService.CancelOrderAsync(@event.OrderId, @event.Reason);
_logger.LogInformation("Order cancelled due to stock failure: {OrderId}", @event.OrderId);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment