Skip to main content
Version: 0.15

Event store

In order to isolate the core library from a particular way of storing events, Eventuous uses the IEventStore abstraction.

The IEventStore interface implements both IEventReader and IEventWriter interfaces. Each of those interfaces is focused on one specific task - reading events from streams, and appending events to streams. This separation is necessary for scenarios when you only need, for example, to read events from a specific store, but not to append them. In such case, you'd want to use the IEventReader interface only.

Eventuous has several implementations of event store abstraction, you can find them in the infrastructure section. The default implementation is EsdbEventStore, which uses EventStoreDB as the event store. It's a great product, and we're happy to provide first-class support for it. It's also a great product for learning about Event Sourcing and CQRS.

In addition, Eventuous has an in-memory event store, which is mostly used for testing purposes. It's not recommended to use it in production, as it doesn't provide any persistence guarantees.

Primitives

Event store works with a couple of primitives, which allow wrapping infrastructure-specific structures. Those primitives are:

Record typeWhat it's for
StreamReadPositionRepresent the stream revision, from there the event store will read the stream forwards or backwards.
ExpectedStreamVersionThe stream version (revision), which we expect to have in the database, when event store tries to append new events. Used for optimistic concurrency.
StreamEventA structure, which holds the event type as a string as well as serialised event payload and metadata.

All of those are immutable records.

Operations

Here are the operations supported by event stores:

FunctionInterfaceWhat's it for
AppendEventsIEventWriterAppend one or more events to a given stream.
ReadEventsIEventReaderRead events from a stream forwards from a given start position.
ReadEventsBackwardsIEventReaderRead events from a stream backwards from a given start position.
StreamExistsIEventStoreChecks if a given stream exists in the store.
TruncateStreamIEventStoreRemoves all events from the stream from the stream start up to a given truncation position.
DeleteStreamIEventStoreDeletes the stream. This operation could behave differently on different store implementations.

Available adapters

Eventuous has several implementations of the event store:

If you use one of the implementations provided, you won't need to know about the event store abstraction. It is required though if you want to implement it for your preferred database.

tip

Preferring EventStoreDB will save you lots of time! Remember to check Event Store Cloud.

Aggregate persistence

As Eventuous 0.15 stopped using the IAggregateStore abstraction, and all related API is marked obsolete.

Aggregate persistence is now supported by a few functions extending the IEventReader and IEventWriter interfaces, described on the Aggregate store page.

State persistence

The vent store interfaces are quite low-level, which allows supporting various underlying databases with relatively small amount of code. However, for higher-level persistence operations, this core API isn't very practical. This usability gap is covered by additional functions that extend code event store interfaces, providing easier to use API for application developers. Most of those functions are also used by Eventuous, particularly in command services.

Storing state updates

Persisting state updates is done by appending events to the entity stream. The low-level API for that is the AppendEvents function. The IEventWriter operation also has one higher-level function to do that easier.

Store events

Task<AppendEventsResult> IEventWriter.Store(
StreamName streamName,
ExpectedStreamVersion expectedStreamVersion,
IReadOnlyCollection<object> changes,
AmendEvent? amendEvent = null,
CancellationToken cancellationToken = default
) {

The Store function will take the list of new events from the changes collection, and then try to call the AppendEvents core function. If AppendEvents throw an exception with "wrong expected version" message, it will throw the OptimisticConcurrencyException.

It's possible to make some changes in events before they get persisted using the AmendEvent function provided as amendEvent argument. The AmendEvent type is equivalent to Func<NewStreamEvent, NewStreamEvent>. Since AmendEvent function deals with the NewStreamEvent instance, it allows adding metadata to the events before persisting them.

Event store with archive

Sometimes, you need to keep historical data but want to keep the operational store small for optimized performance. Eventuous supports using two stores and combine them to one, tiered store. There are two classes that support this:

  • TieredEventReader: uses two stores, one hot and one cold, to read events from streams. If it detects that the hot store stream has been truncated to save space, it will reach to the cold store and attempt to retrieve remaining events. All the retrieved events are combined in a single collection before returning the result.
  • TieredEventStore: implementation of IEventStore that uses the hot store for append, delete and truncate operations, but also reads from hot and cold store using TieredEventReader.

Archive setup

Eventuous is an application-level framework, so it cannot support the archival function directly because it requires a separate, continuously-running process for shoveling events from one store to another. Therefore, setting up an archive store cannot be done using Eventuous as a framework. However, we provide the Connector framework and pre-built container images to make the setup easier.

For example, a connector between EventStoreDB and Elasticsearch copies all the events from the hot store to the archive store real-time. Examples on this page use that connector.

note

Elastic is a particularly good candidate for the archive store because it has multi-tier architecture as a native feature, and you can easily set up a cluster with warm, cold, and frozen tiers, so that you pay less to keep the historical events. Eventuous Connector for Elasticsearch can set up an index with roll-over policies.

When the connector is running, it will replicate all events from one store to the other.

Replication processReplication process

You'd need to have the connector up and running before you can use Elasticsearch as the cold store in Eventuous-based application.

Application setup

When the connector is running, and you see events being replicated to the index, you can add a few things to the application to use Elasticsearch as the cold store.

First, you need to configure the Elasticsearch client and register it in the DI container. Make sure you configure serialization correctly if your events contain structures that are not natively supported by System.Text.Json serializer.

The example below configures Elasticsearch .NET client with custom serialization options:

Program.cs
// Elasticsearch
var options = new JsonSerializerOptions(JsonSerializerDefaults.Web)
.ConfigureForNodaTime(DateTimeZoneProviders.Tzdb);
var elasticConnectionString = "http://localhost:9200";
var settings = new ConnectionSettings(
new SingleNodeConnectionPool(new Uri(elasticConnectionString)),
(def, _) => new ElasticSerializer(def, options)
);
builder.Services.AddSingleton(new ElasticClient(settings));

// EventStoreDB
var esdbConnectionString = "esdb://localhost:2113?tls=false";
builder.Services.AddEventStoreClient(easbConnectionString);
note

You should always configure the client with custom serializer provided by Eventuous as it makes sure that the document format used to persist events in Elasticsearch matches the way Eventuous does it.

After that, replace the regular registration of IEventStore with:

builder.Services.AddEventStore<EsdbEventStore, ElasticEventStore>();

After that, the tiered event store can be used in the application as usual.

The process of reading events from the tiered store looks like this:

Reading from two storesReading from two stores

Here's the reading process described step-by-step:

  1. First, read the stream from the hot store
  2. If the stream doesn't exist, read all the stream events from the archive store
  3. If the stream exists, check the first retrieved event number
  4. If the number is greater than zero, it means that part of the stream was truncated, so read the rest of the stream from the archive store
  5. Combine events from both stores in one collection
  6. Select only distinct events using the event version
  7. Return the combined collection of events

As the result, the load operation would be as fast as it would be with the hot store, because when the hot store returns a full stream, the event store won't fall back to the archive store. However, when the event store discovers that the hot store contains an incomplete stream, it will attempt to load the historical events from the archive store. As this process is seamless for the user of the event store, there's no difference on the event store interface signature.

Impact on subscriptions

caution

There is a drawback to this approach. As events disappear from the hot store, you won't be able to replay them when you create new subscriptions that need to process all the historical events. However, you need to carefully examine if this is a problem in your application.

In many cases, replaying the full history is the opposite of what you want. If you add a new subscription to host a new read model, that new read model might only need to process the latest events, and processing the full history would just trigger useless database operations.

For example, if you add an Upcoming check-ins read model, it would need to process events from reservations during the booking period that hasn't completed yet. It's not often that you have reservations that are made five years back, in many cases it's just impossible as hotels set their prices only a year or so upfront. Therefore, projecting the full history of reservations would trigger adding and removing thousands of records to the database, which is not what you want. In fact, if you have only reservation events for the past year, and the rest in the archive, you will have the new read model rebuilt much faster, and the fact that all the historical events are archived won't be noticeable.

note

Remember that you might decide to use the archive function in your system if you expect it to produce tens of millions events per year, and keep them forever.

Cleaning up the hot store

Eventuous has no built-in support for truncating streams that can be archived. It might, however, be supported by the database you use as a hot store. For example, EventStoreDB supports stream truncation based on stream settings like maxCount and maxAge, so it will continuously remove events from the hot store during the scavenge operation.

If you use other implementations of event store, you'd need to take care of deleting old events yourself.