ข้ามไปยังเนื้อหา

Cephalon.EventSourcing.MongoDB

เนื้อหานี้ยังไม่ได้แปลเป็นภาษาไทย แสดงเป็นภาษาอังกฤษแทน

Maturity: M1 · Ownership: provider-managed · Family: event-sourcing · See audit, matrix.

Cephalon.EventSourcing.MongoDB is the MongoDB event-store provider for Cephalon, following the same provider pattern as Cephalon.EventSourcing.EntityFramework. It delivers the IEventStore contract against a MongoDB collection instead of a relational table, with optimistic concurrency enforced by a compound unique index on (StreamId, StreamVersion).

  • a MongoDB-backed implementation of IEventStore registered through AddCephalonMongoDbEventSourcing()
  • the MongoDbEventEntry document model for append-only event stream documents
  • MongoDbEventSourcingConfiguration that creates the compound unique index on (StreamId, StreamVersion) using lazy double-check semantics so indexes are created on first use, not at startup
  • optimistic-version append semantics: reads the current stream version before every AppendAsync, compares against expectedVersion, and throws EventStreamConcurrencyException before writing if they differ
  • a fallback concurrency guard via InsertManyAsync — if a concurrent writer commits the same version between the version read and the insert, MongoDB raises error code 11000 and the provider re-reads the actual version before rethrowing EventStreamConcurrencyException
  • stream replay through ReadStreamAsync returning events ordered by StreamVersion ascending
  • GetVersionAsync returning -1 for a stream that does not exist yet
  • System.Text.Json serialization for event payloads using the concrete event CLR type
  • event type round-tripping through AssemblyQualifiedName — the type name is stored as written by the CLR and resolved back via Type.GetType() on read
  • MongoDbEventEntry.cs
  • MongoDbEventStore.cs
  • MongoDbEventSourcingConfiguration.cs
  • Hosting/MongoDbEventSourcingServiceCollectionExtensions.cs

This pack sits on top of Cephalon.EventSourcing, not in place of it. Cephalon.EventSourcing owns the IEventStore contract, the IDomainEvent marker, and EventStreamConcurrencyException. Cephalon.EventSourcing.MongoDB supplies the MongoDB document-store implementation of that contract so event-sourced aggregates can keep the same IEventStore injection point while swapping the backing store without changing application code.

The slice is intentionally narrow: it proves append, read, and optimistic concurrency against a MongoDB collection and nothing more. Snapshot persistence, projection rebuild, archival, and background replay workers are honest later additions.

builder.Services.AddCephalonMongoDbEventSourcing(
connectionString: "mongodb://localhost:27017",
databaseName: "myapp");

The collectionName parameter defaults to "event_streams" and can be overridden:

builder.Services.AddCephalonMongoDbEventSourcing(
connectionString: connectionString,
databaseName: "myapp",
collectionName: "domain_events");

The method registers IMongoClient, IMongoDatabase, and the typed IMongoCollection<MongoDbEventEntry> using TryAdd semantics — a host that already registered a shared IMongoClient keeps its own instance.

Event stream collection schema (event_streams)

Section titled “Event stream collection schema (event_streams)”
FieldBSON typeNotes
_idObjectIdAuto-generated surrogate key
StreamIdstringLogical aggregate / stream identifier
StreamVersionlongPer-stream monotonic version (1-based; stream starts at version 1)
EventTypestringAssemblyQualifiedName of the concrete event CLR type
PayloadstringSystem.Text.Json-serialized event body using the concrete type
OccurredAtUtcDateTimeIDomainEvent.OccurredAtUtc as stored by the domain event
AppendedAtUtcDateTimeUTC wall-clock time of the InsertManyAsync call
CorrelationIdstring?Optional; not populated in this slice
TenantIdstring?Optional; not populated in this slice

Index: compound unique index on (StreamId ascending, StreamVersion ascending). This index is the primary concurrency guard and makes per-stream range queries efficient without a full collection scan.

ScenarioBehaviour
GetVersionAsync on empty streamReturns -1
AppendAsync(..., expectedVersion: -1) on empty streamSucceeds — assigns versions starting at 1
AppendAsync(..., expectedVersion: N) when stream is at NSucceeds — appends events at versions N+1, N+2, ...
AppendAsync with wrong expectedVersionEventStreamConcurrencyException thrown before insert
Concurrent writer commits same version (race after version read)InsertManyAsync raises error code 11000; provider re-reads actual version and throws EventStreamConcurrencyException
Event’s StreamVersion does not match expected sequential assignmentInvalidOperationException thrown — events must declare the version the provider will assign
Event’s StreamId does not match the streamId argumentInvalidOperationException thrown

ReadStreamAsync(streamId, fromVersion) filters the collection with:

StreamId == streamId AND StreamVersion >= fromVersion

and sorts by StreamVersion ascending. It returns an IAsyncEnumerable<IDomainEvent>, yielding events one by one as the cursor advances. The CLR type is resolved from EventType via Type.GetType(throwOnError: false) — a missing type throws InvalidOperationException with a message that names the unresolvable type and the stream.

MongoDbEventStore uses a volatile bool _indexesCreated flag checked before every operation. On first access it calls MongoDbEventSourcingConfiguration.EnsureIndexesAsync(), which calls CreateOneAsync with CreateIndexOptions { Background = false }. Subsequent calls skip the check via the volatile read. This avoids startup cost if the store is never accessed in a given process lifetime.

This provider intentionally does not claim:

  • snapshot persistence (ISnapshotStore is not implemented)
  • projection rebuild orchestration
  • archival or retention management
  • background stream replay workers
  • change-stream subscription support
  • transport or event-bus integration
  • multi-tenancy discriminator population (TenantId field is present but not filled)