Synchronous calls (REST/gRPC) are great — but they tightly couple services in time.
- If the downstream is slow, your request is slow.
- If it’s down, you’re down.
👉 Enter Event-Driven Microservices.
- Producers publish events and move on.
- Consumers process them asynchronously, at their own pace.
- With outbox pattern, retries, DLQ, and idempotency → your system stays reliable.
0️⃣ Scenario
- 🛒 Order Service → publishes
OrderCreatedafter saving an order - 📦 Inventory Service → subscribes to decrease stock
- 💳 Payment Service → subscribes to charge the customer
We guarantee:
✅ No event is lost (Outbox pattern)
✅ Consumers handle duplicates (Idempotency)
✅ Poison messages don’t block queues (DLQ)
1️⃣ Messaging Stack with Docker
RabbitMQ (simple & great for routing + DLQ)
📄 docker-compose.rabbit.yml
version: "3.9"
services:
rabbitmq:
image: rabbitmq:3.13-management
ports:
- "5672:5672" # AMQP
- "15672:15672" # UI http://localhost:15672 (user: guest / pass: guest)
Run:
docker compose -f docker-compose.rabbit.yml up -d
👉 RabbitMQ Management UI → http://localhost:15672
Kafka (optional alternative)
📄 docker-compose.kafka.yml
version: "3.9"
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.6.1
ports: ["2181:2181"]
kafka:
image: confluentinc/cp-kafka:7.6.1
ports: ["9092:9092"]
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Run:
docker compose -f docker-compose.kafka.yml up -d
2️⃣ Event Contract
Keep events schema-driven & versioned:
{
"eventId": "1f09f4f7a1d24575a2b4b2a1b35c0b1a",
"eventType": "order.created.v1",
"occurredAtUtc": "2025-08-22T09:30:00Z",
"payload": {
"orderId": 12045,
"customerId": 77,
"totalAmount": 1499.00,
"currency": "INR",
"items": [
{ "productId": 101, "qty": 1 },
{ "productId": 205, "qty": 2 }
]
},
"metadata": {
"traceId": "c4c7…",
"correlationId": "5a7e…",
"producer": "order-service",
"schemaVersion": 1
}
}
3️⃣ RabbitMQ Topology
- Exchange:
orders.exchange(topic) - Routing key:
order.created.v1 - Queues:
inventory.order-created.qpayment.order-created.q
- DLQs:
inventory.order-created.dlqpayment.order-created.dlq
👉 Use x-dead-letter-exchange + x-message-ttl for retries.
4️⃣ Outbox Pattern (Producer Side)
Problem:
If you save an order in DB but crash before publishing the event → drift!
Solution:
Outbox = Write both order + event in same DB transaction. A background dispatcher later publishes events reliably.
Schema
CREATE TABLE Orders (
Id INT IDENTITY PRIMARY KEY,
CustomerId INT NOT NULL,
TotalAmount DECIMAL(18,2) NOT NULL,
CreatedAtUtc DATETIME2 NOT NULL
);
CREATE TABLE OutboxMessages (
Id UNIQUEIDENTIFIER PRIMARY KEY,
OccurredAtUtc DATETIME2 NOT NULL,
Type NVARCHAR(200) NOT NULL,
Payload NVARCHAR(MAX) NOT NULL,
PublishedAtUtc DATETIME2 NULL,
Attempts INT NOT NULL DEFAULT 0,
LastError NVARCHAR(1000) NULL
);
CREATE INDEX IX_OutboxMessages_Unpublished
ON OutboxMessages(PublishedAtUtc) WHERE PublishedAtUtc IS NULL;
EF Core Models
public class Order { ... }
public class OutboxMessage { ... }
public class OrdersDb : DbContext
{
public DbSet<Order> Orders => Set<Order>();
public DbSet<OutboxMessage> OutboxMessages => Set<OutboxMessage>();
}
Create Order + Stage Outbox
app.MapPost("/orders", async (OrdersDb db, CreateOrderDto dto) =>
{
using var tx = await db.Database.BeginTransactionAsync();
var order = new Order(dto.CustomerId, dto.TotalAmount);
db.Orders.Add(order);
await db.SaveChangesAsync();
var evt = new { /* event payload */ };
db.OutboxMessages.Add(new OutboxMessage
{
Type = "order.created.v1",
Payload = JsonSerializer.Serialize(evt)
});
await db.SaveChangesAsync();
await tx.CommitAsync();
return Results.Created($"/orders/{order.Id}", new { order.Id });
});
Outbox Dispatcher (BackgroundService)
public class OutboxDispatcher : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
using var scope = _sp.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<OrdersDb>();
var bus = scope.ServiceProvider.GetRequiredService<IEventBus>();
var batch = await db.OutboxMessages
.Where(m => m.PublishedAtUtc == null && m.Attempts < 10)
.OrderBy(m => m.OccurredAtUtc)
.Take(50)
.ToListAsync(ct);
foreach (var msg in batch)
{
try
{
await bus.PublishAsync(msg.Type, msg.Payload, ct);
msg.PublishedAtUtc = DateTime.UtcNow;
}
catch (Exception ex)
{
msg.Attempts++;
msg.LastError = ex.Message;
}
}
await db.SaveChangesAsync(ct);
await Task.Delay(1000, ct);
}
}
}
5️⃣ RabbitMQ Publisher (IEventBus)
public interface IEventBus
{
Task PublishAsync(string eventType, string jsonPayload, CancellationToken ct = default);
}
public sealed class RabbitMqEventBus : IEventBus, IDisposable
{
private readonly IModel _ch;
private const string Exchange = "orders.exchange";
public RabbitMqEventBus()
{
var factory = new ConnectionFactory { HostName = "localhost", DispatchConsumersAsync = true };
var conn = factory.CreateConnection();
_ch = conn.CreateModel();
_ch.ExchangeDeclare(Exchange, ExchangeType.Topic, durable: true);
}
public Task PublishAsync(string eventType, string jsonPayload, CancellationToken ct = default)
{
var props = _ch.CreateBasicProperties();
props.DeliveryMode = 2; // persistent
var body = Encoding.UTF8.GetBytes(jsonPayload);
_ch.BasicPublish(Exchange, eventType, true, props, body);
return Task.CompletedTask;
}
}
6️⃣ Consumer (Inventory Service)
- Ensures idempotency
- Retries & dead-letters failed messages
Topology Setup
public static class RabbitTopology
{
public static void Ensure(IModel ch)
{
const string exchange = "orders.exchange";
const string queue = "inventory.order-created.q";
const string dlx = "inventory.order-created.dlx";
const string dlq = "inventory.order-created.dlq";
ch.ExchangeDeclare(exchange, ExchangeType.Topic, durable: true);
ch.ExchangeDeclare(dlx, ExchangeType.Fanout, durable: true);
ch.QueueDeclare(dlq, durable: true);
var args = new Dictionary<string, object>
{
{ "x-dead-letter-exchange", dlx },
{ "x-message-ttl", 60000 }
};
ch.QueueDeclare(queue, durable: true, arguments: args);
ch.QueueBind(queue, exchange, "order.created.*");
ch.QueueBind(dlq, dlx, "");
}
}
Consumer with Idempotency
public class OrderCreatedConsumer : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken ct)
{
var consumer = new AsyncEventingBasicConsumer(_ch);
consumer.Received += async (_, ea) =>
{
try
{
var json = Encoding.UTF8.GetString(ea.Body.ToArray());
var envelope = JsonSerializer.Deserialize<OrderCreatedEnvelope>(json)!;
using var scope = _sp.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<InventoryDb>();
// ✅ Idempotency check
if (await db.ProcessedEvents.FindAsync(envelope.eventId) is not null)
{
_ch.BasicAck(ea.DeliveryTag, false);
return;
}
await UpdateInventoryAsync(db, envelope.payload.items, ct);
db.ProcessedEvents.Add(new ProcessedEvent { EventId = envelope.eventId });
await db.SaveChangesAsync(ct);
_ch.BasicAck(ea.DeliveryTag, false);
}
catch
{
_ch.BasicNack(ea.DeliveryTag, false, requeue: false);
}
};
_ch.BasicConsume("inventory.order-created.q", autoAck: false, consumer);
}
}
7️⃣ Kafka Notes
- Use
topic: order.created.v1 - Key =
orderIdto preserve order per aggregate - Consumers in a group auto-balance
- DLQ = dead-letter topic
- Use Confluent.Kafka in .NET
8️⃣ Observability
- Trace propagation (
traceparent, correlation IDs) - Metrics: publish/consume rate, retry count, DLQ depth
- Dashboards: Prometheus + Grafana
- Alerts: DLQ > 0, lag > SLO
9️⃣ Putting It Together
Order Service
builder.Services.AddDbContext<OrdersDb>(...);
builder.Services.AddSingleton<IEventBus, RabbitMqEventBus>();
builder.Services.AddHostedService<OutboxDispatcher>();
Inventory Service
builder.Services.AddDbContext<InventoryDb>(...);
builder.Services.AddHostedService<OrderCreatedConsumer>();
🔟 Checklist for Production
- [x] Outbox pattern
- [x] Idempotent consumers
- [x] Retry + DLQ
- [x] Observability (metrics/traces/logs)
- [x] Schema evolution (event versioning)
- [x] Backpressure handling (prefetch/partitions)
- [x] Security (TLS, creds, policies)
- [x] DR (broker replication, snapshots)
🎯 Recap
You now have a robust event-driven backbone in .NET 8:
- 📝 Outbox → DB & event consistency
- 🛡 Idempotency → no duplicates
- 🔄 Retry + DLQ → resilience
- 🐇 RabbitMQ / Kafka → async decoupling
Link 👉 Next → Part 11: Deploying to Kubernetes with Docker, Probes, HPA, Ingress & CI/CD
Comments
Post a Comment