Skip to content

Cephalon.EventSourcing.Redis

Maturity: M1 · Ownership: provider-managed · Family: event-sourcing · See audit, matrix.

Cephalon.EventSourcing.Redis is the Redis Streams event-store provider for Cephalon, following the same provider pattern as Cephalon.EventSourcing.MongoDB. It delivers the IEventStore contract against Redis Streams (XADD/XRANGE) instead of a document collection or relational table.

  • a Redis Streams-backed implementation of IEventStore registered through AddCephalonRedisEventSourcing()
  • stream key naming via RedisEventSourcingConfiguration.StreamKey(keyPrefix, streamId){keyPrefix}stream:{streamId}
  • optimistic-version append semantics: reads the current stream version before every AppendAsync, compares against expectedVersion, and throws EventStreamConcurrencyException before writing if they differ
  • AppendAsync validates that each event’s StreamId matches the target stream and that declared StreamVersion values are sequential from the expected version before issuing any XADD commands
  • System.Text.Json serialization for event payloads using the concrete event CLR type
  • event type round-tripping through AssemblyQualifiedName — the type name is stored in the stream entry EventType field and resolved back via Type.GetType() on read
  • stream replay through ReadStreamAsync returning events filtered by StreamVersion >= fromVersion in ascending stream-entry order
  • GetVersionAsync returning -1 for a stream that does not exist yet, by reading the last Redis Stream entry in descending order
  • RedisEventStore.cs
  • RedisEventSourcingConfiguration.cs
  • Hosting/RedisEventSourcingServiceCollectionExtensions.cs

This pack sits on top of Cephalon.EventSourcing, not in place of it. Cephalon.EventSourcing owns the IEventStore contract, the IDomainEvent marker, and EventStreamConcurrencyException. Cephalon.EventSourcing.Redis supplies the Redis Streams implementation of that contract so event-sourced aggregates can keep the same IEventStore injection point while using Redis as the backing store.

The slice is intentionally narrow: it proves append, read, and optimistic concurrency against a Redis Stream and nothing more. Snapshot persistence, projection rebuild, archival, and background replay workers are honest later additions.

builder.Services.AddCephalonRedisEventSourcing(
configuration: "localhost:6379");

The keyPrefix parameter defaults to "cephalon:" and can be overridden:

builder.Services.AddCephalonRedisEventSourcing(
configuration: "localhost:6379",
keyPrefix: "myapp:");

The method registers IConnectionMultiplexer and IEventStore using TryAdd semantics — a host that already registered a shared IConnectionMultiplexer keeps its own instance.

Each domain event is stored as one Redis Stream entry under the key {keyPrefix}stream:{streamId}.

FieldTypeNotes
StreamVersionstring (long)Per-stream monotonic version assigned by the append logic
EventTypestringAssemblyQualifiedName of the concrete event CLR type
PayloadstringSystem.Text.Json-serialized event body using the concrete type
OccurredAtUtcstringISO 8601 UTC timestamp from IDomainEvent.OccurredAtUtc
AppendedAtUtcstringISO 8601 UTC wall-clock time of the XADD batch

The Redis Stream entry ID (auto-generated by XADD *) is a monotonic timestamp-sequence pair managed by Redis and is not used for stream versioning. Stream version is tracked explicitly in the StreamVersion field.

ScenarioBehaviour
GetVersionAsync on empty streamReturns -1
AppendAsync(..., expectedVersion: -1) on empty streamSucceeds — assigns versions starting at 0
AppendAsync(..., expectedVersion: N) when stream is at NSucceeds — appends events at versions N+1, N+2, ...
AppendAsync with wrong expectedVersionEventStreamConcurrencyException thrown before any XADD
Concurrent writer commits same version (race after version read)The second writer’s pre-insert check catches the mismatch on the next AppendAsync call; the race window between the version read and the XADD calls is a known limitation — see below
Event’s StreamVersion does not match expected sequential assignmentInvalidOperationException thrown — events must declare the version the provider will assign
Event’s StreamId does not match the streamId argumentInvalidOperationException thrown

Known concurrency limitation: unlike the MongoDB provider which has an atomic unique index on (StreamId, StreamVersion) as a secondary guard, the Redis provider performs an optimistic pre-check but has no atomic test-and-set. A narrow concurrent race between the GetVersionAsync read and the XADD commands can allow two writers to commit conflicting versions to the same stream. This is an explicit, documented tradeoff for this slice. Hardening with a Lua script or Redis transactions (MULTI/EXEC) is an honest follow-up slice.

ReadStreamAsync(streamId, fromVersion) issues a XRANGE {streamKey} - + command to read all entries, then filters client-side by StreamVersion >= fromVersion. It returns an IAsyncEnumerable<IDomainEvent>, yielding events in the order returned by XRANGE (ascending Redis Stream entry ID order, which corresponds to append order). The CLR type is resolved from EventType via Type.GetType(throwOnError: false) — a missing type throws InvalidOperationException with a message that names the unresolvable type and the stream.

This provider intentionally does not claim:

  • snapshot persistence (ISnapshotStore is not implemented)
  • projection rebuild orchestration
  • archival or retention management
  • background stream replay workers
  • Redis Pub/Sub or Consumer Group integration
  • atomic concurrency via Lua scripting or WATCH/MULTI/EXEC
  • multi-tenancy discriminator population
  • transport or event-bus integration