Aggregate stream
Concept
So far, we figured out that an Aggregate is the transaction and consistency boundary within the domain model.
The aggregate store (application-level abstraction) uses an event store (infrastructure) to store events in streams. Each aggregate instance has its own stream, so the event store needs to be capable to read and write events from/to the correct stream.
When appending events to a stream, the append operation for a single stream must be transactional to ensure that the stream is consistent. Eventuous handles commands using the command service, and one command handler is the unit of work. All the events generated by the aggregate instance during the unit of work are appended to the stream as the final step in the command handling process.
Stream name
By default, Eventuous uses the AggregateType.Name
combined with the aggregate id as the stream name. For example, the Booking
aggregate with id 1
has a stream name Booking-1
. That's what StreamName.For<Booking>(1)
returns.
However, you might want to have more fine-grained control over the stream name. For example, you might want to include the tenant id in the stream name. It's possible to override the default convention by configuring the stream name mapping. The stream map contains a mapping between the aggregate identity type (derived from AggregateId
) and the stream name generation function. Therefore, any additional property of the aggregate identity type can be used to generate the stream name.
For example, the following code registers a stream name mapping for the Booking
aggregate:
public record BookingId : AggregateId {
public BookingId(string id, string tenantId) : base(id) {
TenantId = tenantId;
}
public string TenantId { get; }
}
Create a StreamNameMap
and register in the container:
var streamNameMap = new StreamNameMap();
streamNameMap.Register<BookingId>(
id => new StreamName($"Booking-{id.TenantId}:{id.Value}") // Split in example with : if you use a Guid as identifier.
);
builder.Services.AddSingleton(streamNameMap);
builder.Services.AddCommandService<BookingService, Booking>();
Then, use the registered StreamNameMap
in the CommandService
:
public class BookingService : CommandService<Booking, BookingState, BookingId> {
public BookingService(IAggregateStore store, StreamNameMap streamNameMap)
: base(store, streamNameMap: streamNameMap) {
// command handlers registered here
}
}
In your projections you can retrieve the Id
and TenantId
from the StreamName
in the IMessageConsumeContext<out T>
:
static UpdateDefinition<BookingDocument> HandleRoomBooked(
IMessageConsumeContext<V1.RoomBooked> ctx,
UpdateDefinitionBuilder<BookingDocument> update
) {
var evt = ctx.Message;
// Get Id and TenantId
var (id, tenantId) = ctx.Stream.ExtractMultiTenantIds();
return update
.SetOnInsert(x => x.Id, id)
.SetOnInsert(x => x.TenantId, tenantId)
.Set(x => x.GuestId, evt.GuestId)
.Set(x => x.RoomId, evt.RoomId)
.Set(x => x.CheckInDate, evt.CheckInDate)
.Set(x => x.CheckOutDate, evt.CheckOutDate)
.Set(x => x.BookingPrice, evt.BookingPrice)
.Set(x => x.Outstanding, evt.OutstandingAmount);
}
The snippet above uses the following extension method to extract the Id
and TenantId
from the StreamName
:
public static class StreamNameExtensions
{
/// <summary>
/// Split the StreamName into multiple parts for multi tenant stream id.
/// </summary>
/// <param name="stream">The streamname</param>
/// <param name="separator">The seperator for splitting. Default is ':'.</param>
/// <returns>A tuple with TenantId and Id property.</returns>
/// <exception cref="InvalidStreamName">When stream id can't be split in 2 sections.</exception>
public static (string TenantId, string Id) ExtractMultiTenantIds(this StreamName stream, char separator = ':')
{
string streamId = stream.GetId();
var streamIdParts = streamId.Split(separator);
if (streamIdParts.Length != 2)
{
throw new InvalidStreamName(streamId);
}
return (streamIdParts[0], streamIdParts[1]);
}
}