
⚡ Part 10: Event-Driven Microservices in .NET 8 (RabbitMQ + Outbox + DLQ)

Synchronous calls (REST/gRPC) are great — but they tightly couple services in time.

  • If the downstream is slow, your request is slow.
  • If it’s down, you’re down.

👉 Enter Event-Driven Microservices.

  • Producers publish events and move on.
  • Consumers process them asynchronously, at their own pace.
  • With outbox pattern, retries, DLQ, and idempotency → your system stays reliable.

0️⃣ Scenario

  • 🛒 Order Service → publishes OrderCreated after saving an order
  • 📦 Inventory Service → subscribes to decrease stock
  • 💳 Payment Service → subscribes to charge the customer

We guarantee:

✅ No event is lost (Outbox pattern)
✅ Consumers handle duplicates (Idempotency)
✅ Poison messages don’t block queues (DLQ)


1️⃣ Messaging Stack with Docker

RabbitMQ (simple & great for routing + DLQ)

📄 docker-compose.rabbit.yml

version: "3.9"
services:
  rabbitmq:
    image: rabbitmq:3.13-management
    ports:
      - "5672:5672"   # AMQP
      - "15672:15672" # UI http://localhost:15672 (user: guest / pass: guest)

Run:

docker compose -f docker-compose.rabbit.yml up -d

👉 RabbitMQ Management UI → http://localhost:15672


Kafka (optional alternative)

📄 docker-compose.kafka.yml

version: "3.9"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.1
    ports: ["2181:2181"]
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181   # required by the cp-zookeeper image

  kafka:
    image: confluentinc/cp-kafka:7.6.1
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      # localhost works for clients on the host machine only
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Run:

docker compose -f docker-compose.kafka.yml up -d

2️⃣ Event Contract

Keep events schema-driven & versioned:

{
  "eventId": "1f09f4f7a1d24575a2b4b2a1b35c0b1a",
  "eventType": "order.created.v1",
  "occurredAtUtc": "2025-08-22T09:30:00Z",
  "payload": {
    "orderId": 12045,
    "customerId": 77,
    "totalAmount": 1499.00,
    "currency": "INR",
    "items": [
      { "productId": 101, "qty": 1 },
      { "productId": 205, "qty": 2 }
    ]
  },
  "metadata": {
    "traceId": "c4c7…",
    "correlationId": "5a7e…",
    "producer": "order-service",
    "schemaVersion": 1
  }
}
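One way to model this contract in C# is with records and System.Text.Json. This is a minimal sketch: the type names (EventEnvelope, OrderCreatedPayload, OrderItem, EventMetadata, EventJson) are assumptions for illustration, not part of any library.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical types mirroring the JSON contract above; names are illustrative.
public record OrderItem(int ProductId, int Qty);

public record OrderCreatedPayload(
    int OrderId, int CustomerId, decimal TotalAmount, string Currency,
    IReadOnlyList<OrderItem> Items);

public record EventMetadata(
    string TraceId, string CorrelationId, string Producer, int SchemaVersion);

public record EventEnvelope<TPayload>(
    string EventId, string EventType, DateTime OccurredAtUtc,
    TPayload Payload, EventMetadata Metadata);

public static class EventJson
{
    // Web defaults: camelCase names on write, case-insensitive matching on read.
    private static readonly JsonSerializerOptions Options =
        new(JsonSerializerDefaults.Web);

    public static string Serialize<T>(EventEnvelope<T> envelope) =>
        JsonSerializer.Serialize(envelope, Options);

    public static EventEnvelope<T> Deserialize<T>(string json) =>
        JsonSerializer.Deserialize<EventEnvelope<T>>(json, Options)
        ?? throw new JsonException("Empty event envelope.");
}
```

Keeping the envelope generic means a future order.created.v2 only needs a new payload record, while eventId, eventType and metadata stay stable for every consumer.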

3️⃣ RabbitMQ Topology

  • Exchange: orders.exchange (topic)
  • Routing key: order.created.v1
  • Queues:
    • inventory.order-created.q
    • payment.order-created.q
  • DLQs:
    • inventory.order-created.dlq
    • payment.order-created.dlq

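👉 Set x-dead-letter-exchange on each work queue so rejected or expired messages land in its DLQ. For delayed retries, add a separate retry queue whose x-message-ttl expires messages back to the work queue.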
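A common way to get delayed retries out of these two arguments is a consumer-less "wait" queue: a message parked there expires after the TTL and is dead-lettered straight back to the work queue. The sketch below only builds the queue arguments; the class and method names are hypothetical, and you would pass the dictionaries to QueueDeclare yourself.

```csharp
using System.Collections.Generic;

// Hypothetical helper: builds RabbitMQ queue arguments for a retry setup.
public static class RetryQueueArgs
{
    // Work queue: messages rejected with requeue: false are routed
    // to the given dead-letter exchange.
    public static Dictionary<string, object> WorkQueue(string deadLetterExchange) => new()
    {
        ["x-dead-letter-exchange"] = deadLetterExchange
    };

    // Retry (wait) queue: nothing consumes from it. Each message expires after
    // delayMs and is dead-lettered via the default exchange ("") directly to
    // the work queue, so other services' queues never see the retry.
    public static Dictionary<string, object> RetryQueue(string workQueue, int delayMs) => new()
    {
        ["x-message-ttl"] = delayMs,
        ["x-dead-letter-exchange"] = "",
        ["x-dead-letter-routing-key"] = workQueue
    };
}
```

Routing the retry through the default exchange with the queue name as routing key keeps it private to one consumer. Dead-lettering back to orders.exchange instead would redeliver the event to every bound queue, including services that already processed it.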


4️⃣ Outbox Pattern (Producer Side)

Problem:
If you save an order in DB but crash before publishing the event → drift!

Solution:
Outbox = Write both order + event in same DB transaction. A background dispatcher later publishes events reliably.

Schema

CREATE TABLE Orders (
  Id INT IDENTITY PRIMARY KEY,
  CustomerId INT NOT NULL,
  TotalAmount DECIMAL(18,2) NOT NULL,
  CreatedAtUtc DATETIME2 NOT NULL
);

CREATE TABLE OutboxMessages (
  Id UNIQUEIDENTIFIER PRIMARY KEY,
  OccurredAtUtc DATETIME2 NOT NULL,
  Type NVARCHAR(200) NOT NULL,
  Payload NVARCHAR(MAX) NOT NULL,
  PublishedAtUtc DATETIME2 NULL,
  Attempts INT NOT NULL DEFAULT 0,
  LastError NVARCHAR(1000) NULL
);

CREATE INDEX IX_OutboxMessages_Unpublished
ON OutboxMessages(OccurredAtUtc) WHERE PublishedAtUtc IS NULL;

EF Core Models

public class Order { ... }
public class OutboxMessage { ... }

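// AddDbContext<OrdersDb>(...) needs a constructor that accepts DbContextOptions.
public class OrdersDb(DbContextOptions<OrdersDb> options) : DbContext(options)
{
    public DbSet<Order> Orders => Set<Order>();
    public DbSet<OutboxMessage> OutboxMessages => Set<OutboxMessage>();
}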
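The two entity bodies are elided above. A plausible shape, derived from the SQL schema and from the way the endpoint constructs an Order, might look like the sketch below. The property defaults (new Guid, current UTC time) are assumptions, not something the schema dictates.

```csharp
#nullable enable
using System;

// Sketch of the entities, mirroring the Orders and OutboxMessages tables.
public class Order
{
    public int Id { get; private set; }              // IDENTITY, assigned by the database
    public int CustomerId { get; private set; }
    public decimal TotalAmount { get; private set; }
    public DateTime CreatedAtUtc { get; private set; } = DateTime.UtcNow;

    // Parameter names match property names so EF Core can bind this constructor.
    public Order(int customerId, decimal totalAmount)
    {
        CustomerId = customerId;
        TotalAmount = totalAmount;
    }
}

public class OutboxMessage
{
    public Guid Id { get; set; } = Guid.NewGuid();   // assumed default
    public DateTime OccurredAtUtc { get; set; } = DateTime.UtcNow;
    public string Type { get; set; } = "";
    public string Payload { get; set; } = "";
    public DateTime? PublishedAtUtc { get; set; }    // null means "not published yet"
    public int Attempts { get; set; }
    public string? LastError { get; set; }
}
```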

Create Order + Stage Outbox

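app.MapPost("/orders", async (OrdersDb db, CreateOrderDto dto, CancellationToken ct) =>
{
    // One transaction covers both writes: the order and its outbox row
    // commit or roll back together.
    await using var tx = await db.Database.BeginTransactionAsync(ct);

    var order = new Order(dto.CustomerId, dto.TotalAmount);
    db.Orders.Add(order);
    await db.SaveChangesAsync(ct); // assigns order.Id, still uncommitted

    var eventId = Guid.NewGuid();
    var occurredAtUtc = DateTime.UtcNow;

    // Envelope fields follow the event contract from section 2;
    // add items and metadata as your DTO allows.
    var evt = new
    {
        eventId = eventId.ToString("N"),
        eventType = "order.created.v1",
        occurredAtUtc,
        payload = new { orderId = order.Id, customerId = dto.CustomerId, totalAmount = dto.TotalAmount }
    };

    db.OutboxMessages.Add(new OutboxMessage
    {
        Id = eventId,
        OccurredAtUtc = occurredAtUtc,
        Type = "order.created.v1",
        Payload = JsonSerializer.Serialize(evt)
    });

    await db.SaveChangesAsync(ct);
    await tx.CommitAsync(ct);

    return Results.Created($"/orders/{order.Id}", new { order.Id });
});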

Outbox Dispatcher (BackgroundService)

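public class OutboxDispatcher(IServiceProvider sp, ILogger<OutboxDispatcher> log) : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            try
            {
                // DbContext is scoped, so each polling pass gets its own scope.
                using var scope = sp.CreateScope();
                var db = scope.ServiceProvider.GetRequiredService<OrdersDb>();
                var bus = scope.ServiceProvider.GetRequiredService<IEventBus>();

                var batch = await db.OutboxMessages
                    .Where(m => m.PublishedAtUtc == null && m.Attempts < 10)
                    .OrderBy(m => m.OccurredAtUtc)
                    .Take(50)
                    .ToListAsync(ct);

                foreach (var msg in batch)
                {
                    try
                    {
                        await bus.PublishAsync(msg.Type, msg.Payload, ct);
                        msg.PublishedAtUtc = DateTime.UtcNow;
                    }
                    catch (Exception ex)
                    {
                        msg.Attempts++;
                        // LastError is NVARCHAR(1000), so truncate long messages.
                        msg.LastError = ex.Message.Length > 1000 ? ex.Message[..1000] : ex.Message;
                    }
                }

                // Publish-then-mark is at-least-once: a crash before this save
                // re-publishes the batch, which is why consumers must be idempotent.
                await db.SaveChangesAsync(ct);
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                // A failed pass (e.g. database unavailable) must not stop the loop.
                log.LogError(ex, "Outbox dispatch pass failed");
            }

            await Task.Delay(1000, ct);
        }
    }
}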
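The dispatcher above retries a failed message on every one-second pass until Attempts reaches 10. If you would rather space the retries out, an exponential backoff check can be layered on top. The sketch below is an assumption, not part of the original design: it presumes a hypothetical LastAttemptUtc column that the schema above does not have.

```csharp
using System;

// Hypothetical helper: exponential backoff for outbox retries.
public static class OutboxBackoff
{
    // Delay before the next attempt: 2^attempts seconds, capped at maxDelay
    // (5 minutes when no cap is given).
    public static TimeSpan DelayFor(int attempts, TimeSpan? maxDelay = null)
    {
        var cap = maxDelay ?? TimeSpan.FromMinutes(5);
        if (attempts <= 0) return TimeSpan.Zero;

        // Clamp the exponent so the shift cannot overflow.
        var seconds = 1L << Math.Min(attempts, 20);
        var delay = TimeSpan.FromSeconds(seconds);
        return delay < cap ? delay : cap;
    }

    // True when enough time has passed since the last failed attempt.
    // lastAttemptUtc would come from the assumed LastAttemptUtc column.
    public static bool IsDue(DateTime lastAttemptUtc, int attempts, DateTime nowUtc) =>
        nowUtc - lastAttemptUtc >= DelayFor(attempts);
}
```

With this in place the dispatcher would skip rows where IsDue returns false, so a broker outage produces a handful of spaced attempts instead of ten failures in ten seconds.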

5️⃣ RabbitMQ Publisher (IEventBus)

public interface IEventBus
{
    Task PublishAsync(string eventType, string jsonPayload, CancellationToken ct = default);
}

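// Targets the RabbitMQ.Client 6.x API (IModel).
public sealed class RabbitMqEventBus : IEventBus, IDisposable
{
    private const string Exchange = "orders.exchange";
    private readonly IConnection _conn;
    private readonly IModel _ch;
    private readonly object _gate = new(); // IModel is not thread-safe

    public RabbitMqEventBus()
    {
        var factory = new ConnectionFactory { HostName = "localhost" };
        _conn = factory.CreateConnection();
        _ch = _conn.CreateModel();
        _ch.ExchangeDeclare(Exchange, ExchangeType.Topic, durable: true);
        _ch.ConfirmSelect(); // publisher confirms: the broker acknowledges each publish
    }

    public Task PublishAsync(string eventType, string jsonPayload, CancellationToken ct = default)
    {
        var body = Encoding.UTF8.GetBytes(jsonPayload);
        lock (_gate)
        {
            var props = _ch.CreateBasicProperties();
            props.DeliveryMode = 2; // persistent
            props.ContentType = "application/json";
            // mandatory: unroutable messages come back through the BasicReturn event.
            _ch.BasicPublish(Exchange, eventType, true, props, body);

            // Without a confirm, the outbox row must stay unpublished, so throw.
            var acked = _ch.WaitForConfirms(TimeSpan.FromSeconds(5), out var timedOut);
            if (!acked || timedOut)
                throw new InvalidOperationException("Broker did not confirm the publish.");
        }
        return Task.CompletedTask;
    }

    public void Dispose()
    {
        _ch.Dispose();
        _conn.Dispose();
    }
}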

6️⃣ Consumer (Inventory Service)

  • Ensures idempotency
  • Dead-letters failed messages so they can be inspected or retried later

Topology Setup

public static class RabbitTopology
{
    public static void Ensure(IModel ch)
    {
        const string exchange = "orders.exchange";
        const string queue = "inventory.order-created.q";
        const string dlx = "inventory.order-created.dlx";
        const string dlq = "inventory.order-created.dlq";

        ch.ExchangeDeclare(exchange, ExchangeType.Topic, durable: true);
        ch.ExchangeDeclare(dlx, ExchangeType.Fanout, durable: true);

        // The 6.x QueueDeclare helper defaults exclusive and autoDelete to true,
        // which would delete the queue when the connection closes. Set both explicitly.
        ch.QueueDeclare(dlq, durable: true, exclusive: false, autoDelete: false);

        var args = new Dictionary<string, object>
        {
            { "x-dead-letter-exchange", dlx }, // rejected or expired messages go to the DLX
            { "x-message-ttl", 60000 }         // messages not consumed within 60 s expire into the DLQ
        };

        ch.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false, arguments: args);
        ch.QueueBind(queue, exchange, "order.created.*");
        ch.QueueBind(dlq, dlx, "");
    }
}

Consumer with Idempotency

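public class OrderCreatedConsumer : BackgroundService
{
    private readonly IServiceProvider _sp;
    private readonly IConnection _conn;
    private readonly IModel _ch;

    public OrderCreatedConsumer(IServiceProvider sp)
    {
        _sp = sp;
        // DispatchConsumersAsync is required for AsyncEventingBasicConsumer (RabbitMQ.Client 6.x).
        var factory = new ConnectionFactory { HostName = "localhost", DispatchConsumersAsync = true };
        _conn = factory.CreateConnection();
        _ch = _conn.CreateModel();
        RabbitTopology.Ensure(_ch);
        // Backpressure: at most 10 unacknowledged messages in flight.
        _ch.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false);
    }

    protected override Task ExecuteAsync(CancellationToken ct)
    {
        var consumer = new AsyncEventingBasicConsumer(_ch);
        consumer.Received += async (_, ea) =>
        {
            try
            {
                var json = Encoding.UTF8.GetString(ea.Body.ToArray());
                var envelope = JsonSerializer.Deserialize<OrderCreatedEnvelope>(json)!;

                using var scope = _sp.CreateScope();
                var db = scope.ServiceProvider.GetRequiredService<InventoryDb>();

                // ✅ Idempotency check: skip events that were already applied
                if (await db.ProcessedEvents.FindAsync(envelope.eventId) is not null)
                {
                    _ch.BasicAck(ea.DeliveryTag, false);
                    return;
                }

                // Stock change and ProcessedEvent row are saved in one SaveChanges call,
                // so they commit together.
                await UpdateInventoryAsync(db, envelope.payload.items, ct);
                db.ProcessedEvents.Add(new ProcessedEvent { EventId = envelope.eventId });
                await db.SaveChangesAsync(ct);

                _ch.BasicAck(ea.DeliveryTag, false);
            }
            catch
            {
                // requeue: false sends the message to the dead-letter exchange
                // instead of redelivering it in a tight loop.
                _ch.BasicNack(ea.DeliveryTag, false, requeue: false);
            }
        };

        _ch.BasicConsume("inventory.order-created.q", autoAck: false, consumer);
        return Task.CompletedTask; // the consumer keeps running on the open channel
    }

    public override void Dispose()
    {
        _ch.Dispose();
        _conn.Dispose();
        base.Dispose();
    }
}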
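The check-then-record step is easier to see in isolation. The sketch below swaps the database for in-memory collections: the HashSet stands in for the ProcessedEvents table and the dictionary for the stock table. IdempotentHandler is a hypothetical name, not a type from the services above.

```csharp
using System.Collections.Generic;

// In-memory illustration of an idempotent consumer (hypothetical type).
public sealed class IdempotentHandler
{
    private readonly HashSet<string> _processed = new(); // stands in for ProcessedEvents
    private readonly Dictionary<int, int> _stock;        // productId -> units in stock

    public IdempotentHandler(Dictionary<int, int> stock) => _stock = stock;

    // Returns true when the event was applied, false when it was a duplicate.
    public bool Handle(string eventId, IEnumerable<(int ProductId, int Qty)> items)
    {
        // Add returns false if the id is already present, so a redelivered
        // event never decrements stock a second time.
        if (!_processed.Add(eventId)) return false;

        foreach (var (productId, qty) in items)
            _stock[productId] = _stock.GetValueOrDefault(productId) - qty;

        return true;
    }
}
```

Delivering the same eventId twice changes the stock only once. In the real consumer the same guarantee comes from writing the ProcessedEvent row and the stock update in one database transaction, with EventId as the primary key.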

7️⃣ Kafka Notes

  • Use topic: order.created.v1
  • Key = orderId to preserve order per aggregate
  • Consumers in a group auto-balance
  • DLQ = dead-letter topic
  • Use Confluent.Kafka in .NET

8️⃣ Observability

  • Trace propagation (traceparent, correlation IDs)
  • Metrics: publish/consume rate, retry count, DLQ depth
  • Dashboards: Prometheus + Grafana
  • Alerts: DLQ > 0, lag > SLO
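Trace propagation can be sketched with System.Diagnostics.Activity, which uses the W3C traceparent format by default on modern .NET. The helper below is an assumption for illustration (TracePropagation is not a library type): the producer copies its activity id into the message headers, and the consumer starts a child activity from that header.

```csharp
#nullable enable
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;

// Hypothetical helper for carrying a W3C traceparent through message headers.
public static class TracePropagation
{
    public const string HeaderName = "traceparent";

    // Producer side: copy the started activity's id into the headers.
    public static void Inject(Activity? activity, IDictionary<string, object> headers)
    {
        if (activity?.Id is { } id) headers[HeaderName] = id;
    }

    // Consumer side: start an activity whose parent is the producer's span.
    public static Activity StartConsumerActivity(string name, IDictionary<string, object> headers)
    {
        var activity = new Activity(name);
        if (headers.TryGetValue(HeaderName, out var raw))
        {
            // RabbitMQ.Client hands string header values to consumers as byte[].
            var parentId = raw switch
            {
                string s => s,
                byte[] bytes => Encoding.UTF8.GetString(bytes),
                _ => null
            };
            if (parentId is not null) activity.SetParentId(parentId);
        }
        return activity.Start();
    }
}
```

On the publishing side the headers dictionary would be assigned to the message's basic properties, and on the consuming side it would be read from the delivery's properties. Both activities then share one trace id, so a single trace covers the HTTP request, the publish, and the stock update.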

9️⃣ Putting It Together

Order Service

builder.Services.AddDbContext<OrdersDb>(...);
builder.Services.AddSingleton<IEventBus, RabbitMqEventBus>();
builder.Services.AddHostedService<OutboxDispatcher>();

Inventory Service

builder.Services.AddDbContext<InventoryDb>(...);
builder.Services.AddHostedService<OrderCreatedConsumer>();

🔟 Checklist for Production

  • [x] Outbox pattern
  • [x] Idempotent consumers
  • [x] Retry + DLQ
  • [x] Observability (metrics/traces/logs)
  • [x] Schema evolution (event versioning)
  • [x] Backpressure handling (prefetch/partitions)
  • [x] Security (TLS, creds, policies)
  • [x] DR (broker replication, snapshots)

🎯 Recap

You now have a robust event-driven backbone in .NET 8:

  • 📝 Outbox → DB & event consistency
  • 🛡 Idempotency → duplicate deliveries have no side effects
  • 🔄 Retry + DLQ → resilience
  • 🐇 RabbitMQ / Kafka → async decoupling

👉 Next → Part 11: Deploying to Kubernetes with Docker, Probes, HPA, Ingress & CI/CD
