CQRS + Read-model projections

This tutorial walks through CQRS (Command Query Responsibility Segregation) in a CephalonEngine app: the write model owns commands + state, the read model owns optimized projections, and Wolverine events bridge the two.

You’ll build:

  • A Products write model on Postgres (transactional, normalized).
  • A Products read model on ClickHouse (denormalized, analytics-friendly).
  • A Wolverine event bus that publishes ProductCreated / ProductPriceChanged / ProductDiscontinued from the write side and projects them into the read side.
  • Two transport behaviors: POST /products writes via the write model, GET /products/search?... reads via the read model.
  • Reasoning about eventual consistency between the two sides + how to measure the projection lag.

Time: ~60 minutes. Prerequisite: Build your first app — this tutorial picks up where step 3 (data) ends.

What is CQRS, and when do you actually want it?

CQRS = different models for reads vs writes. It’s not about different code paths — every web app already has those. It’s about different storage shapes:

| Side | Optimizes for | Typical shape |
|---|---|---|
| Write | Transactional consistency, invariants, FK integrity | Normalized relational (3NF) |
| Read | Query latency, aggregation, full-text, faceting | Denormalized: wide rows, materialized views, search indexes |

Use CQRS when:

  • Read traffic > 10× write traffic and reads are doing expensive joins/aggregations.
  • The same data is queried in many shapes (per-customer summary, search, dashboards, exports).
  • You need independent scaling: read replicas / read-only nodes that can be added without touching the write side.
  • Eventual consistency is acceptable (the read side lags writes by milliseconds to seconds).

Skip CQRS when:

  • Traffic is low: a single Postgres can serve both sides for years.
  • Strong-consistency reads are required, e.g. checkout flows that depend on the latest write within the same request.
  • There is a single query shape: if every read has the same shape as the write entity, you’re just adding lag for nothing.

CephalonEngine doesn’t force CQRS — it just makes it cheap when you need it.

Lightweight CQRS (state-based projections). The write model stores entity state. When state changes, a behavior publishes a *Changed event. A handler on the read side updates a denormalized table.

Command ──→ Write model (Postgres) ──→ Event ──→ Projection handler ──→ Read model (ClickHouse)
                                         └──→ (also: external integrations, audit, search index)

Pros: simple mental model, easy to debug, current state is in one place. Cons: read-side rebuild requires re-emitting all current state (no historical replay).
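
The flow above can be sketched in memory, with no engine or database involved. This is only an illustration under stated assumptions: `ProductState`, `ProductRow`, `PriceChanged`, and `StateProjection` are hypothetical names made up for the sketch, and two dictionaries stand in for Postgres and ClickHouse.

```csharp
using System;
using System.Collections.Generic;

// Two dictionaries stand in for the write store and the read store.
var write = new Dictionary<string, ProductState>
{
    ["p1"] = new ProductState("p1", "Blue widget", 12.50m, "c1"),
};
var read = new Dictionary<string, ProductRow>
{
    ["p1"] = new ProductRow("p1", "Blue widget", 12.50m, "Tools"),
};

// Command -> write model -> event -> projection -> read model.
var evt = StateProjection.ChangePrice(write, "p1", 9.99m);
StateProjection.Project(read, evt);
Console.WriteLine(read["p1"].Price == 9.99m); // True

public sealed record ProductState(string Id, string Name, decimal Price, string CategoryId);
public sealed record ProductRow(string Id, string Name, decimal Price, string CategoryName);
public sealed record PriceChanged(string Id, decimal OldPrice, decimal NewPrice);

public static class StateProjection
{
    // Write side: update current state, then describe what changed.
    public static PriceChanged ChangePrice(
        Dictionary<string, ProductState> write, string id, decimal newPrice)
    {
        var current = write[id];
        write[id] = current with { Price = newPrice };
        return new PriceChanged(id, current.Price, newPrice);
    }

    // Read side: apply the event to the denormalized row.
    public static void Project(Dictionary<string, ProductRow> read, PriceChanged e)
        => read[e.Id] = read[e.Id] with { Price = e.NewPrice };
}
```

Note that the read row is only ever touched through the event, which is what lets the two stores use different shapes.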

Full event sourcing. Events are the source of truth. There is no “current state” table on the write side, just an append-only event stream. Read projections rebuild by replaying events.

Command ──→ Validate against event-replayed state ──→ Append events ──→ Read projection rebuilds from events

Pros: full audit trail, time-travel queries, rebuild any projection from history. Cons: bigger learning curve, event schema evolution is a real problem, debugging needs event-store tooling.
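
Replaying events to obtain state is a left fold over the stream. The following is a minimal sketch of that idea, not the Cephalon.EventSourcing API: the event records and the `Replay` helper are hypothetical and exist only for this example.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

var history = new List<IProductEvent>
{
    new Created("p1", "Blue widget", 12.50m),
    new PriceChanged("p1", 9.99m),
    new Discontinued("p1"),
};

// Current state is derived, never stored: fold the events in order.
var state = Replay.Fold(history);
Console.WriteLine($"{state.Name} discontinued={state.IsDiscontinued}");
// Blue widget discontinued=True

public interface IProductEvent { string Id { get; } }
public sealed record Created(string Id, string Name, decimal Price) : IProductEvent;
public sealed record PriceChanged(string Id, decimal NewPrice) : IProductEvent;
public sealed record Discontinued(string Id) : IProductEvent;
public sealed record ProductState(string Id, string Name, decimal Price, bool IsDiscontinued);

public static class Replay
{
    // Applies one event to the previous state and returns the next state.
    public static ProductState Apply(ProductState? s, IProductEvent e) => e switch
    {
        Created c => new ProductState(c.Id, c.Name, c.Price, false),
        PriceChanged p when s is not null => s with { Price = p.NewPrice },
        Discontinued when s is not null => s with { IsDiscontinued = true },
        _ => throw new InvalidOperationException($"Cannot apply {e.GetType().Name}"),
    };

    // Rebuilds state from the full history.
    public static ProductState Fold(IEnumerable<IProductEvent> events)
    {
        ProductState? state = null;
        foreach (var e in events) state = Apply(state, e);
        return state ?? throw new InvalidOperationException("Empty stream");
    }
}
```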

This tutorial does lightweight CQRS. We cover full event-sourcing at the end with pointers to the Cephalon.EventSourcing family.

Pick up from first-app step 3 — you should have an Acme.Store.Modules.Products module with Product and Category EF Core entities on Postgres.

# At the repo root
docker run -d --name acme-store-clickhouse -p 9000:9000 -p 8123:8123 \
  -e CLICKHOUSE_DB=acme_read \
  -e CLICKHOUSE_USER=acme \
  -e CLICKHOUSE_PASSWORD=acme \
  clickhouse/clickhouse-server:24.3
docker run -d --name acme-store-rabbit -p 5672:5672 -p 15672:15672 \
  rabbitmq:3.13-management

You should already have Postgres running from step 3. After this, you have 3 containers: Postgres (write), ClickHouse (read), RabbitMQ (eventing).

Add ClickHouse + Wolverine packages:

cd src/Acme.Store.Host
dotnet add package Cephalon.Data.ClickHouse --prerelease
dotnet add package Cephalon.Eventing --prerelease
dotnet add package Cephalon.Eventing.Wolverine --prerelease

Then configure appsettings.json:

appsettings.json
{
  "Engine": {
    "Data": {
      "IdStrategy": "Sfid",
      "Provider": null,
      "WriteModel": {
        "Provider": "Postgres",
        "Migrations": { "ApplyOn": "Startup" }
      },
      "ReadModel": {
        "Provider": "ClickHouse",
        "Migrations": { "ApplyOn": "Startup" }
      }
    },
    "Messaging": {
      "Enabled": true,
      "Provider": "Wolverine",
      "Wolverine": {
        "Transport": "RabbitMQ",
        "ConnectionString": "amqp://guest:guest@localhost:5672",
        "Concurrency": 4,
        "Outbox": { "Enabled": true }
      }
    }
  },
  "ConnectionStrings": {
    "ProductsWrite": "Host=localhost;Port=5432;Database=acme_store;Username=acme;Password=acme",
    "ProductsRead": "Host=localhost;Port=8123;Database=acme_read;Username=acme;Password=acme"
  }
}

What’s happening:

  • Engine:Data:WriteModel:Provider="Postgres" + ReadModel:Provider="ClickHouse" tells the data capability that this app has split storage.
  • ConnectionStrings:ProductsWrite / ProductsRead follow the {Module}{Side} convention so the engine routes each DbContext to the right backend.
  • Messaging:Wolverine:Outbox:Enabled=true enables the transactional outbox: each event is stored in the same database transaction as the write and relayed to the broker after the commit. Delivery is at-least-once, so read-side projections eventually see every committed write, even if the broker is briefly down.

Full schema: Reference → Configuration → Data + Messaging.

The write model is the source of truth for commands and invariants. Keep it normalized and small.

src/Acme.Store.Modules.Products/Write/ProductsWriteDbContext.cs
using Cephalon.Data.EntityFramework;
using Microsoft.EntityFrameworkCore;

namespace Acme.Store.Modules.Products.Write;

public sealed class ProductsWriteDbContext(DbContextOptions<ProductsWriteDbContext> options)
    : CephalonDbContext(options)
{
    public DbSet<Product> Products => Set<Product>();

    // Category is the entity from first-app step 3; CreateProductBehavior reads it via db.Categories.
    public DbSet<Category> Categories => Set<Category>();

    protected override void OnModelCreating(ModelBuilder b)
    {
        base.OnModelCreating(b);
        b.Entity<Product>(e =>
        {
            e.UseSfidPrimaryKey();
            e.Property(p => p.Sku).HasMaxLength(64).IsRequired();
            e.HasIndex(p => p.Sku).IsUnique();   // SKU uniqueness is a write-side invariant
            e.Property(p => p.Name).HasMaxLength(200).IsRequired();
            e.Property(p => p.Price).HasPrecision(18, 2);
            e.Property(p => p.IsDiscontinued).HasDefaultValue(false);
            e.Property(p => p.UpdatedAt).IsRowVersion();
        });
    }
}

public sealed class Product
{
    public Sfid Id { get; init; }
    public required string Sku { get; init; }
    public required string Name { get; set; }
    public required string Description { get; set; }
    public required decimal Price { get; set; }
    public required Sfid CategoryId { get; init; }
    public bool IsDiscontinued { get; set; }
    public byte[]? UpdatedAt { get; init; } // optimistic concurrency
}

Register in Program.cs:

Program.cs
var builder = WebApplication.CreateBuilder(args);

builder.Services
    .AddCephalonAspNetCore()
    .AddModulesFromAssemblies(typeof(Program).Assembly)
    .AddCephalonEntityFramework<ProductsWriteDbContext>((sp, opts) =>
    {
        var conn = sp.GetRequiredService<IConfiguration>().GetConnectionString("ProductsWrite");
        opts.UseNpgsql(conn);
    })
    .Build(builder);

The read model is denormalized for queries. Cram the per-product details + category name + computed aggregations into wide rows.

src/Acme.Store.Modules.Products/Read/ProductsReadDbContext.cs
using Cephalon.Data.EntityFramework;
using Microsoft.EntityFrameworkCore;

namespace Acme.Store.Modules.Products.Read;

public sealed class ProductsReadDbContext(DbContextOptions<ProductsReadDbContext> options)
    : CephalonDbContext(options)
{
    public DbSet<ProductView> Products => Set<ProductView>();

    protected override void OnModelCreating(ModelBuilder b)
    {
        base.OnModelCreating(b);
        b.Entity<ProductView>(e =>
        {
            e.HasKey(v => v.Id);
            e.Property(v => v.Sku).HasMaxLength(64);
            e.Property(v => v.Name).HasMaxLength(200);
            e.Property(v => v.CategoryName).HasMaxLength(200);
            // ClickHouse-specific MergeTree config via annotation
            e.HasAnnotation("ClickHouse:Engine", "MergeTree");
            e.HasAnnotation("ClickHouse:OrderBy", "(CategoryId, Sku)");
        });
    }
}

public sealed class ProductView
{
    public Sfid Id { get; set; }
    public string Sku { get; set; } = "";
    public string Name { get; set; } = "";
    public string Description { get; set; } = "";
    public decimal Price { get; set; }
    public Sfid CategoryId { get; set; }
    public string CategoryName { get; set; } = ""; // denormalized: no JOIN needed
    public bool IsDiscontinued { get; set; }
    public int ViewCount { get; set; } // denormalized aggregation
    public DateTimeOffset CreatedAt { get; set; }
    public DateTimeOffset LastUpdatedAt { get; set; }
    public DateTimeOffset ProjectedAt { get; set; } // when this row was last projected
}

Register the read DbContext:

Program.cs
builder.Services
    .AddCephalonAspNetCore()
    .AddModulesFromAssemblies(typeof(Program).Assembly)
    .AddCephalonEntityFramework<ProductsWriteDbContext>((sp, opts) =>
    {
        opts.UseNpgsql(sp.GetRequiredService<IConfiguration>().GetConnectionString("ProductsWrite"));
    })
    .AddCephalonEntityFramework<ProductsReadDbContext>((sp, opts) =>
    {
        opts.UseConnection(sp.GetRequiredService<IConfiguration>().GetConnectionString("ProductsRead"));
        // (Cephalon.Data.ClickHouse registers UseConnection on top of EF Core)
    })
    .Build(builder);

Denormalize aggressively in the read model. Every JOIN you avoid on the read side is a query you don’t have to optimize later. CategoryName being in the read row means category filtering and display are zero-join. ViewCount being denormalized means “popular products” queries are one column scan, not a left-join + count.

Events are the public contract between write and read sides. Keep them small and stable.

src/Acme.Store.Modules.Products/Events.cs
namespace Acme.Store.Modules.Products;

[Event]
public sealed record ProductCreated(
    Sfid Id, string Sku, string Name, string Description, decimal Price,
    Sfid CategoryId, string CategoryName, DateTimeOffset CreatedAt);

[Event]
public sealed record ProductPriceChanged(
    Sfid Id, decimal OldPrice, decimal NewPrice, DateTimeOffset ChangedAt);

[Event]
public sealed record ProductRenamed(
    Sfid Id, string OldName, string NewName, DateTimeOffset ChangedAt);

[Event]
public sealed record ProductDiscontinued(
    Sfid Id, DateTimeOffset DiscontinuedAt);

Conventions:

  • Past-tense names. Events describe what happened, not commands.
  • Self-contained payload. The read side should not need to fetch additional data — include everything needed for the projection.
  • Id first. Convention so handlers can dispatch by aggregate.
  • [Event] attribute registers the type with Cephalon.Eventing for typed routing.

Step 5 — Write-side command behavior with outbox publishing

The command handler does the transactional write and publishes the event in the same transaction. The outbox ensures broker outages don’t cause lost events.

src/Acme.Store.Modules.Products/Behaviors/CreateProductBehavior.cs
using Microsoft.EntityFrameworkCore; // FirstAsync

public sealed record CreateProductCommand(
    string Sku, string Name, string Description, decimal Price, Sfid CategoryId);

public sealed class CreateProductBehavior(
    ProductsWriteDbContext db,
    IMessagePublisher events,
    IIdGenerator<Sfid> ids)
    : IRestBehavior<CreateProductCommand, Sfid>
{
    public RestRoute Route => RestRoute.Post("/products");

    [BehaviorPipeline(typeof(WithRequireScope), Scopes = "products:write")]
    [BehaviorPipeline(typeof(WithOutbox))] // ← outbox decorator
    public async Task<Sfid> Handle(CreateProductCommand cmd, CancellationToken ct)
    {
        // Throws if the category does not exist; the name is copied into the event payload.
        var category = await db.Categories.FirstAsync(c => c.Id == cmd.CategoryId, ct);

        var product = new Product
        {
            Id = ids.NewId(),
            Sku = cmd.Sku,
            Name = cmd.Name,
            Description = cmd.Description,
            Price = cmd.Price,
            CategoryId = cmd.CategoryId,
        };
        db.Products.Add(product);

        // Publish in the SAME transaction as the DB write.
        // With WithOutbox, the event is only enqueued here; nothing reaches the broker yet.
        await events.PublishAsync(new ProductCreated(
            product.Id, product.Sku, product.Name, product.Description, product.Price,
            product.CategoryId, category.Name, DateTimeOffset.UtcNow), ct);

        // Commits the product row and the outbox row together; the outbox then relays to the broker.
        await db.SaveChangesAsync(ct);
        return product.Id;
    }
}

Key points:

  • IMessagePublisher is the Cephalon.Eventing interface — broker-neutral.
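  • WithOutbox decorator wraps the handler so that PublishAsync enqueues into the outbox table instead of going directly to the broker. The outbox row commits atomically with the DB write in SaveChanges; the relay to the broker happens after the commit, so a broker outage delays the event rather than losing it.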
  • The handler returns only the Sfid — the read model isn’t consulted here.
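
To make the outbox mechanics concrete, here is a minimal in-memory sketch. It is not how WithOutbox is implemented: `FakeDb`, `OutboxMessage`, and `OutboxRelay` are hypothetical names, and a single method call stands in for a database transaction.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var db = new FakeDb();
var sent = new List<string>();
var brokerUp = false;

// Stand-in for the broker client: returns false while the broker is down.
Func<string, bool> publish = payload =>
{
    if (!brokerUp) return false;
    sent.Add(payload);
    return true;
};

db.Commit(product: "WIDGET-001", outboxPayload: "ProductCreated:WIDGET-001");

OutboxRelay.Flush(db, publish); // broker down: the row stays pending
Console.WriteLine($"pending={db.Outbox.Count(m => !m.Sent)} sent={sent.Count}"); // pending=1 sent=0

brokerUp = true;
OutboxRelay.Flush(db, publish); // broker back: the pending row is relayed
Console.WriteLine($"pending={db.Outbox.Count(m => !m.Sent)} sent={sent.Count}"); // pending=0 sent=1

public sealed class OutboxMessage
{
    public OutboxMessage(string payload) => Payload = payload;
    public string Payload { get; }
    public bool Sent { get; set; }
}

public sealed class FakeDb
{
    public List<string> Products { get; } = new();
    public List<OutboxMessage> Outbox { get; } = new();

    // Stands in for one transaction: the entity and the outbox row are stored together.
    public void Commit(string product, string outboxPayload)
    {
        Products.Add(product);
        Outbox.Add(new OutboxMessage(outboxPayload));
    }
}

public static class OutboxRelay
{
    // Relays pending rows in insertion order and stops at the first failure.
    public static int Flush(FakeDb db, Func<string, bool> publish)
    {
        var relayed = 0;
        foreach (var msg in db.Outbox.Where(m => !m.Sent))
        {
            if (!publish(msg.Payload)) break;
            msg.Sent = true;
            relayed++;
        }
        return relayed;
    }
}
```

If the process crashes after `publish` succeeds but before `Sent` is recorded, the row is relayed again on the next flush. That is why consumers of an outbox need to tolerate duplicates.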

The projection handler subscribes to events and updates the read model. Idempotent + at-least-once-safe.

src/Acme.Store.Modules.Products/Projections/ProductProjection.cs
using Microsoft.Extensions.Logging; // ILogger

public sealed class ProductProjection(ProductsReadDbContext read, ILogger<ProductProjection> log) :
    IMessageHandler<ProductCreated>,
    IMessageHandler<ProductPriceChanged>,
    IMessageHandler<ProductRenamed>,
    IMessageHandler<ProductDiscontinued>
{
    [BehaviorPipeline(typeof(WithInbox))] // dedupe by event id
    [BehaviorPipeline(typeof(WithRetry), MaxAttempts = 5, Backoff = "00:00:30")]
    public async Task Handle(ProductCreated e, MessageContext ctx)
    {
        var view = new ProductView
        {
            Id = e.Id,
            Sku = e.Sku,
            Name = e.Name,
            Description = e.Description,
            Price = e.Price,
            CategoryId = e.CategoryId,
            CategoryName = e.CategoryName,
            IsDiscontinued = false,
            CreatedAt = e.CreatedAt,
            LastUpdatedAt = e.CreatedAt,
            ProjectedAt = DateTimeOffset.UtcNow,
        };
        await read.Products.AddAsync(view, ctx.CancellationToken);
        await read.SaveChangesAsync(ctx.CancellationToken);

        // Lag = time between the write-side timestamp and the projection.
        log.LogInformation("Projected ProductCreated for {ProductId} in {Lag} ms",
            e.Id, (DateTimeOffset.UtcNow - e.CreatedAt).TotalMilliseconds);
    }

    [BehaviorPipeline(typeof(WithInbox))]
    // Retry also covers the case where this event arrives before ProductCreated was projected.
    [BehaviorPipeline(typeof(WithRetry), MaxAttempts = 5, Backoff = "00:00:30")]
    public async Task Handle(ProductPriceChanged e, MessageContext ctx)
    {
        var view = await read.Products.FindAsync([e.Id], ctx.CancellationToken)
            ?? throw new InvalidOperationException($"ProductView {e.Id} not found");
        view.Price = e.NewPrice;
        view.LastUpdatedAt = e.ChangedAt;
        view.ProjectedAt = DateTimeOffset.UtcNow;
        await read.SaveChangesAsync(ctx.CancellationToken);
    }

    // Handle(ProductRenamed) and Handle(ProductDiscontinued) follow the same shape.
}

Key points:

  • One handler class per aggregate (Product). It implements IMessageHandler<T> once per event type.
  • WithInbox decorator deduplicates by event id, so if the broker redelivers (network blip, lost ack), the projection applies each logical event only once. At-least-once delivery plus deduplication gives effectively-once processing.
  • WithRetry handles transient ClickHouse failures.
  • The handler logs the projection lag — useful for measuring eventual-consistency window.
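
The deduplication idea behind an inbox can be shown in a few lines. This is a sketch under assumptions, not the WithInbox implementation: `InboxProjection` and the `EventId` field are hypothetical, and an in-memory set replaces what would be a persisted inbox table.

```csharp
using System;
using System.Collections.Generic;

var projection = new InboxProjection();
var e = new PriceChanged(EventId: "evt-1", ProductId: "p1", NewPriceCents: 999);

Console.WriteLine(projection.Handle(e)); // True  (first delivery is applied)
Console.WriteLine(projection.Handle(e)); // False (redelivery is skipped)

public sealed record PriceChanged(string EventId, string ProductId, long NewPriceCents);

public sealed class InboxProjection
{
    // Ids of events already applied. A real inbox would persist these in the
    // same transaction as the projection write so the two cannot diverge.
    private readonly HashSet<string> _seen = new();

    public Dictionary<string, long> PriceCentsByProduct { get; } = new();
    public int AppliedCount { get; private set; }

    // Returns true if the event was applied, false if it was a duplicate.
    public bool Handle(PriceChanged e)
    {
        if (!_seen.Add(e.EventId)) return false;
        PriceCentsByProduct[e.ProductId] = e.NewPriceCents;
        AppliedCount++;
        return true;
    }
}
```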

The read behavior queries the read model directly. No write-side dependency, no JOIN to category.

src/Acme.Store.Modules.Products/Behaviors/SearchProductsBehavior.cs
using Microsoft.EntityFrameworkCore; // CountAsync, ToListAsync

public sealed record ProductSearchInput(
    string? Query, Sfid? CategoryId, decimal? MinPrice, decimal? MaxPrice,
    int Limit = 20, int Offset = 0);

public sealed record ProductSearchResult(
    IReadOnlyList<ProductView> Items, int Total);

public sealed class SearchProductsBehavior(ProductsReadDbContext read)
    : IRestBehavior<ProductSearchInput, ProductSearchResult>
{
    public RestRoute Route => RestRoute.Get("/products/search");

    public async Task<ProductSearchResult> Handle(ProductSearchInput q, CancellationToken ct)
    {
        // Build the filter incrementally; each optional input adds one predicate.
        var query = read.Products.Where(p => !p.IsDiscontinued);
        if (!string.IsNullOrWhiteSpace(q.Query))
            query = query.Where(p => p.Name.Contains(q.Query) || p.Sku.Contains(q.Query));
        if (q.CategoryId is { } cat)
            query = query.Where(p => p.CategoryId == cat);
        if (q.MinPrice is { } min) query = query.Where(p => p.Price >= min);
        if (q.MaxPrice is { } max) query = query.Where(p => p.Price <= max);

        // Total counts all matches; Items is one page of them.
        var total = await query.CountAsync(ct);
        var items = await query
            .OrderByDescending(p => p.ViewCount) // pre-denormalized aggregation
            .Skip(q.Offset).Take(q.Limit)
            .ToListAsync(ct);
        return new ProductSearchResult(items, total);
    }
}

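ClickHouse can serve this with low latency across millions of rows because:

  • No JOIN: CategoryName is in the row.
  • Category filters hit the leading column of the table’s MergeTree sort key (CategoryId, Sku), so only the matching ranges are read.
  • ViewCount is denormalized, so there is no aggregation at query time.

The sort on ViewCount and the Contains substring match do not use the sort key; they run over the rows that survive the other filters.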

Run the host + place a request:

dotnet run --project src/Acme.Store.Host
# Place a write
curl -X POST http://localhost:5000/products \
-H "Content-Type: application/json" \
-H "Authorization: Bearer <token-with-products:write>" \
-d '{"sku":"WIDGET-001","name":"Blue widget","description":"...","price":12.50,"categoryId":"01HQ..."}'
# → {"id": "01HQ8RVKSXM4Y8RBVPDC0E9YHZ"}
# Wait ~50-200ms for projection...
sleep 0.5
# Read back
curl 'http://localhost:5000/products/search?query=widget'
# → {"items": [{ "id": "01HQ...", "name": "Blue widget", "categoryName": "Tools", ... }], "total": 1}

You can also verify via the engine manifest:

curl http://localhost:5000/engine/manifest | jq '.capabilities'
# → ["Data", "Eventing", "Identity", "Observability"]

Reads see writes after the projection runs. Typical lag:

| Scenario | Lag p50 | Lag p99 |
|---|---|---|
| In-memory transport (dev) | <1 ms | <5 ms |
| RabbitMQ, same data center | 10-30 ms | 50-150 ms |
| RabbitMQ across regions | 50-100 ms | 300-500 ms |
| Kafka, single partition | 5-15 ms | 30-80 ms |

Acceptable for: catalog browsing, search, dashboards, reporting. Not acceptable for: checkout reading the cart you just wrote (use the write model directly for that), idempotency checks against the same request.

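The projection handler logs the projection lag: the time the row was projected minus the timestamp carried by the event (CreatedAt, ChangedAt, or DiscontinuedAt, depending on the event type). Surface this in observability:

// `e` is the handled event; `event` is a reserved word in C# and cannot be used as a name.
log.LogInformation("Projected {Event} in {Lag} ms", e.GetType().Name,
    (DateTimeOffset.UtcNow - e.CreatedAt).TotalMilliseconds);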

Then in Grafana / Tempo, alert when p99 > 500ms.
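
If you want to sanity-check the percentile an alert is based on, the nearest-rank method is easy to compute by hand. The sketch below is illustrative only: `LagStats` is a hypothetical helper, the sample values are made up, and a real deployment would let the metrics backend compute percentiles.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Made-up lag samples in milliseconds, one per projected event.
var lagsMs = new double[] { 12, 18, 25, 30, 41, 55, 62, 80, 140, 620 };

var p50 = LagStats.Percentile(lagsMs, 50);
var p99 = LagStats.Percentile(lagsMs, 99);
Console.WriteLine($"p50={p50} p99={p99} alert={p99 > 500}"); // p50=41 p99=620 alert=True

public static class LagStats
{
    // Nearest-rank percentile: the smallest sample such that at least
    // p percent of all samples are less than or equal to it.
    public static double Percentile(IReadOnlyCollection<double> samples, double p)
    {
        if (samples.Count == 0) throw new ArgumentException("No samples", nameof(samples));
        if (p <= 0 || p > 100) throw new ArgumentOutOfRangeException(nameof(p));

        var sorted = samples.OrderBy(x => x).ToArray();
        var rank = (int)Math.Ceiling(p / 100.0 * sorted.Length); // 1-based rank
        return sorted[rank - 1];
    }
}
```

A single slow projection dominates p99 with only ten samples, which is why percentile alerts are normally evaluated over a window with many more events.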

The lightweight CQRS above stores current state on the write side. To go full event-sourced — events ARE the state, replay rebuilds projections:

appsettings.json
{
  "Engine": {
    "EventSourcing": {
      "Enabled": true,
      "Provider": "EntityFramework", // or "Cassandra", "ClickHouse", etc.
      "Snapshots": { "Every": 100 },
      "Projections": { "RebuildOnSchemaChange": true }
    }
  }
}

Then refactor the write side from Product (state) to ProductAggregate (event-replayer):

public sealed class ProductAggregate : EventSourcedAggregate<Sfid>
{
    public string Sku { get; private set; } = "";
    public decimal Price { get; private set; }
    public bool IsDiscontinued { get; private set; }

    // Apply methods rebuild state from history; they must not validate or raise events.
    public void Apply(ProductCreated e) { /* update state from event */ }
    public void Apply(ProductPriceChanged e) { Price = e.NewPrice; }
    public void Apply(ProductDiscontinued e) { IsDiscontinued = true; }

    // Command methods check invariants against replayed state, then raise an event.
    public void ChangePrice(decimal newPrice)
    {
        if (IsDiscontinued) throw new InvariantException("Cannot price a discontinued product");
        RaiseEvent(new ProductPriceChanged(Id, Price, newPrice, DateTimeOffset.UtcNow));
    }
}

Read projections subscribe to the event stream and rebuild from history (not just from new events). Full coverage in Reference → Components → Event Sourcing.

| Don’t | Do |
|---|---|
| Query the read model from inside a command handler | Read-after-write inside a command = consistency bug. Use the write DbContext if you need read-after-write. |
| Use the write model for queries that don’t touch the writes | If the query doesn’t need invariants, it shouldn’t pay the write-model cost (JOINs, locks). Project to the read side. |
| Skip the outbox because “in-memory transport doesn’t drop messages” | Without an outbox, a crash or broker outage between the DB commit and the publish loses the event. Always enable the outbox. |
| Denormalize the read model from the write model directly via DB triggers | DB triggers tie write and read storage tightly. Use events; they’re decoupled. |
| Make every event a *Changed summary of the full entity | Event payloads should be what changed, not the full entity. Smaller events = cleaner audit trail + cheaper to publish. |
| Run write and read on the same Postgres “for now” | If you’re going to deploy read replicas later, set up the read model on day 1 with the projection pipeline. Switching later is expensive. |
| Allow handlers to read external services (other APIs, files, etc.) | Projections should be deterministic. Bring the data into the event payload OR call the external service from a separate behavior. |

When the read schema changes (or a bug is fixed), rebuild from scratch:

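using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

public sealed class ProductsReadModelRebuilder(IServiceScopeFactory scopes) : IHostedService
{
    public async Task StartAsync(CancellationToken ct)
    {
        // Hosted services are singletons, so resolve the scoped DbContext from a fresh scope.
        await using var scope = scopes.CreateAsyncScope();
        var write = scope.ServiceProvider.GetRequiredService<ProductsWriteDbContext>();
        var events = scope.ServiceProvider.GetRequiredService<IMessagePublisher>();

        // Re-emit "synthetic" events from the current state of the write model.
        // Assumes Product exposes a Category navigation and a CreatedAt column (neither is shown
        // on the write entity earlier), and that the read table was emptied first so rows are
        // not duplicated.
        var products = await write.Products.Include(p => p.Category).ToListAsync(ct);
        foreach (var p in products)
        {
            await events.PublishAsync(new ProductCreated(
                p.Id, p.Sku, p.Name, p.Description, p.Price,
                p.CategoryId, p.Category.Name, p.CreatedAt), ct);
        }
    }

    // Required by IHostedService; nothing to clean up.
    public Task StopAsync(CancellationToken ct) => Task.CompletedTask;
}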

Run once, then disable. Better: keep an event store (full event-sourcing) and replay from there.

Nothing forces 1:1 between aggregate and read model. Common pattern: 3 read models per aggregate, one per consumer:

ProductAggregate (write)
├── ProductSearchView (ClickHouse — for full-text search)
├── ProductDashboardView (Postgres — for admin UI)
└── ProductPricingView (Redis — for fast cart calculations)

Each projection handler subscribes to the same events but writes to its own optimized shape.
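
A minimal sketch of that fan-out, with in-memory dictionaries in place of ClickHouse and Redis. All names here (`IProjection`, `SearchProjection`, `PricingProjection`, `FanOut`) are hypothetical; in the engine, the message bus plays the role of `FanOut`.

```csharp
using System;
using System.Collections.Generic;

var search = new SearchProjection();
var pricing = new PricingProjection();
var bus = new FanOut(search, pricing);

// One event, two independently shaped read models.
bus.Publish(new ProductCreated("p1", "Blue widget", 1250));
Console.WriteLine(search.IdByName["blue widget"]); // p1
Console.WriteLine(pricing.PriceCentsById["p1"]);   // 1250

public sealed record ProductCreated(string Id, string Name, long PriceCents);

public interface IProjection { void Handle(ProductCreated e); }

// Shaped for lookups by name.
public sealed class SearchProjection : IProjection
{
    public Dictionary<string, string> IdByName { get; } = new();
    public void Handle(ProductCreated e) => IdByName[e.Name.ToLowerInvariant()] = e.Id;
}

// Shaped for fast price lookups by id.
public sealed class PricingProjection : IProjection
{
    public Dictionary<string, long> PriceCentsById { get; } = new();
    public void Handle(ProductCreated e) => PriceCentsById[e.Id] = e.PriceCents;
}

// Delivers every event to every registered projection.
public sealed class FanOut
{
    private readonly IProjection[] _projections;
    public FanOut(params IProjection[] projections) => _projections = projections;

    public void Publish(ProductCreated e)
    {
        foreach (var p in _projections) p.Handle(e);
    }
}
```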

Events are the public contract. Breaking them breaks consumers. Two patterns:

  1. Additive only. Add optional fields, never remove. Consumers ignore unknown fields.
  2. Versioned envelope. ProductCreatedV2 with a new schema; handlers explicitly opt into the new version.

Cephalon.Eventing supports both — pick one and stick to it.
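
The additive pattern relies on the serializer tolerating both unknown and missing fields. The sketch below shows this with System.Text.Json, whose default behavior is to skip JSON properties that have no matching member. The two consumer classes are hypothetical, and whether Cephalon.Eventing serializes with System.Text.Json is an assumption not confirmed by this page.

```csharp
#nullable enable
using System;
using System.Text.Json;

// An old consumer reads a newer payload: the unknown "Currency" field is ignored.
var newerPayload = "{\"Id\":\"p1\",\"Name\":\"Blue widget\",\"Currency\":\"EUR\"}";
var asOld = JsonSerializer.Deserialize<OldConsumerShape>(newerPayload)!;
Console.WriteLine(asOld.Name); // Blue widget

// A new consumer reads an older payload: the missing optional field stays null.
var olderPayload = "{\"Id\":\"p1\",\"Name\":\"Blue widget\"}";
var asNew = JsonSerializer.Deserialize<NewConsumerShape>(olderPayload)!;
Console.WriteLine(asNew.Currency ?? "(not set)"); // (not set)

// The event as an older consumer knows it.
public sealed class OldConsumerShape
{
    public string Id { get; set; } = "";
    public string Name { get; set; } = "";
}

// The same event after an additive change: one optional field was added.
public sealed class NewConsumerShape
{
    public string Id { get; set; } = "";
    public string Name { get; set; } = "";
    public string? Currency { get; set; } // optional, so old payloads still deserialize
}
```

Removing or renaming `Name` would break the old consumer silently, since it would deserialize to an empty string. That is the case the versioned-envelope pattern exists for.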

If you replay events at startup, snapshots avoid the long warmup:

{ "Engine": { "EventSourcing": { "Snapshots": { "Every": 100 } } } }

Stores a snapshot every 100 events. Recovery: load latest snapshot + replay events after it.
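
The recovery rule can be worked through with concrete numbers. This is a sketch, not the engine’s snapshot store: `PriceSet`, `Snapshot`, and `SnapshotStore` are hypothetical, and the aggregate state is reduced to a single price.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// 250 events with versions 1..250; each one sets the price to version * 10 cents.
var events = Enumerable.Range(1, 250).Select(v => new PriceSet(v, v * 10L)).ToList();

// With a snapshot every 100 events, the latest snapshot is at version 200.
var snapshot = SnapshotStore.LatestSnapshot(events, every: 100);
var (state, replayed) = SnapshotStore.Recover(snapshot, events);

Console.WriteLine($"snapshot@{snapshot.Version} replayed={replayed} price={state.PriceCents}");
// snapshot@200 replayed=50 price=2500

public sealed record PriceSet(int Version, long PriceCents);
public sealed record Snapshot(int Version, long PriceCents);

public static class SnapshotStore
{
    // Returns the snapshot a store would hold if it snapshotted every `every` events.
    public static Snapshot LatestSnapshot(IReadOnlyList<PriceSet> events, int every)
    {
        var version = events.Count / every * every; // integer division, e.g. 250 -> 200
        return version == 0
            ? new Snapshot(0, 0)
            : new Snapshot(version, events[version - 1].PriceCents);
    }

    // Starts from the snapshot and replays only the events recorded after it.
    public static (Snapshot State, int Replayed) Recover(
        Snapshot snapshot, IReadOnlyList<PriceSet> events)
    {
        var state = snapshot;
        var replayed = 0;
        foreach (var ev in events.Where(x => x.Version > snapshot.Version))
        {
            state = new Snapshot(ev.Version, ev.PriceCents);
            replayed++;
        }
        return (state, replayed);
    }
}
```

Without the snapshot, recovery would replay all 250 events; with it, only the 50 events after version 200.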