Shuttle.Recall is an event-sourcing mechanism for .NET that provides a flexible way to persist and retrieve event streams.
```shell
dotnet add package Shuttle.Recall
```

To register Shuttle.Recall, use the `AddRecall` extension method:

```csharp
services.AddRecall(builder =>
{
    builder.AddProjection("ProjectionName", projection =>
    {
        projection.AddEventHandler<SomeEvent>((context, evt) =>
        {
            // handle event
        });
    });
});
```

The following types are registered:

- `IEventStore` (Scoped): Used to retrieve and save event streams.
- `IEventProcessor` (Singleton): Used to process projections.
- `IEventMethodInvoker` (Singleton): Invokes event handling methods on aggregate roots.
- `ISerializer` (Singleton): Serializes and deserializes events.
- `IConcurrencyExceptionSpecification` (Singleton): Detects concurrency exceptions.

```csharp
services.AddRecall(options =>
{
    options.EventProcessing.ProjectionThreadCount = 5;
    options.EventProcessing.IncludedProjections.Add("ProjectionName");
    options.EventProcessing.ExcludedProjections.Add("ExcludeMe");
    options.EventStore.CompressionAlgorithm = "gzip";
    options.EventStore.EncryptionAlgorithm = "aes";
});
```

Event processing options (`options.EventProcessing`):

| Property | Default | Description |
|---|---|---|
| `ProjectionThreadCount` | `5` | Number of threads for projection processing |
| `IncludedProjections` | `[]` | List of projection names to include |
| `ExcludedProjections` | `[]` | List of projection names to exclude |
| `ProjectionProcessorIdleDurations` | varies | Idle durations for processor polling |

Event store options (`options.EventStore`):

| Property | Default | Description |
|---|---|---|
| `CompressionAlgorithm` | `""` | Compression algorithm (e.g., `"gzip"`) |
| `EncryptionAlgorithm` | `""` | Encryption algorithm (e.g., `"aes"`) |
| `EventHandlingMethodName` | `"On"` | Method name invoked on aggregate roots |
| `BindingFlags` | `Instance \| NonPublic` | Binding flags for event method discovery |

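
The `EventHandlingMethodName` and `BindingFlags` defaults suggest that events are applied to aggregate roots by looking up non-public instance methods named `On`. The following is a minimal sketch of that kind of reflection-based dispatch using only `System.Reflection`. It is not the library's implementation; `OrderPlaced`, `Order`, and `OnMethodDispatcher` are hypothetical names introduced for illustration.

```csharp
using System;
using System.Reflection;

// Hypothetical event and aggregate types, used only for this illustration.
public sealed record OrderPlaced(Guid OrderId, decimal Amount);

public sealed class Order
{
    public decimal Total { get; private set; }

    // Private instance method named "On": matches the documented defaults
    // (method name "On", binding flags Instance | NonPublic).
    private void On(OrderPlaced evt) => Total += evt.Amount;
}

// Hypothetical helper, not part of Shuttle.Recall.
public static class OnMethodDispatcher
{
    // Looks for a method called methodName that takes a single parameter of the
    // event's runtime type and invokes it. Returns false when no such method exists.
    public static bool TryApply(
        object aggregate,
        object evt,
        string methodName = "On",
        BindingFlags bindingFlags = BindingFlags.Instance | BindingFlags.NonPublic)
    {
        var method = aggregate.GetType().GetMethod(
            methodName, bindingFlags, binder: null, types: new[] { evt.GetType() }, modifiers: null);

        if (method is null)
        {
            return false;
        }

        method.Invoke(aggregate, new[] { evt });
        return true;
    }
}
```

Keeping the `On` methods private means state changes can only happen through events, which is presumably why `NonPublic` is part of the default binding flags.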
```csharp
// Load a stream, append an event, and save it
var eventStore = serviceProvider.GetRequiredService<IEventStore>();
var streamId = Guid.NewGuid();
var stream = await eventStore.GetAsync(streamId);
stream.Add(new SomeEvent { Data = "example" });
await eventStore.SaveAsync(stream);
```

```csharp
// Add a custom header when retrieving the stream
var stream = await eventStore.GetAsync(streamId, builder =>
{
    builder.AddHeader("key", "value");
});
stream.Add(new SomeEvent { Data = "example" });
await eventStore.SaveAsync(stream);
```

```csharp
var stream = await eventStore.GetAsync(streamId);
stream.Add(new SomeEvent { Data = "example" });
stream.ConcurrencyInvariant(5); // throws EventStreamConcurrencyException if version != 5
await eventStore.SaveAsync(stream);
```

```csharp
// Set a correlation ID on the stream
var stream = await eventStore.GetAsync(streamId);
stream
    .WithCorrelationId(correlationId)
    .Add(new SomeEvent { Data = "example" });
await eventStore.SaveAsync(stream);
```

```csharp
var stream = await eventStore.GetAsync(streamId);
// Apply committed events to an aggregate root or state object
stream.Apply(someAggregateRoot);
```

```csharp
var stream = await eventStore.GetAsync(streamId);
// Get only committed events
var committedEvents = stream.GetEvents(EventStream.EventRegistrationType.Committed);
// Get only appended events
var appendedEvents = stream.GetEvents(EventStream.EventRegistrationType.Appended);
// Get all events
var allEvents = stream.GetEvents(EventStream.EventRegistrationType.All);
```

```csharp
var stream = await eventStore.GetAsync(streamId);
stream.Add(new SomeEvent { Data = "example" });
// Events are only applied after commit
stream.Commit();
// Now Apply() will include the committed events
stream.Apply(someAggregateRoot);
```

```csharp
var stream = await eventStore.GetAsync(streamId);
stream.Remove();
await eventStore.RemoveAsync(streamId);
```

Implement the `IEventHandler<T>` interface to handle events:

```csharp
using System.Threading;
using System.Threading.Tasks;
using Shuttle.Recall;

public class OrderProjection : IEventHandler<OrderPlaced>
{
    public async Task HandleAsync(IEventHandlerContext<OrderPlaced> context, CancellationToken cancellationToken = default)
    {
        var evt = context.Event;
        var projection = context.Projection;
        var primitiveEvent = context.PrimitiveEvent;
        // Process the event (SaveToReadModelAsync is application-specific and not shown here)
        await SaveToReadModelAsync(evt.OrderId, evt.Amount, cancellationToken);
        // Optionally defer for retry
        // context.Defer(TimeSpan.FromSeconds(5));
    }
}
```

```csharp
// Register the handler class with a projection
services.AddRecall(builder =>
{
    builder.AddProjection("OrderProjection", projection =>
    {
        projection.AddEventHandler<OrderProjection>();
    });
});
```

```csharp
// Register an inline delegate handler
services.AddRecall(builder =>
{
    builder.AddProjection("OrderProjection", projection =>
    {
        projection.AddEventHandler((IEventHandlerContext<OrderPlaced> context) =>
        {
            var evt = context.Event;
            // handle event inline
        });
    });
});
```

```csharp
builder.AddProjection("ProjectionName", (IEventHandlerContext<SomeEvent> context) =>
{
    // handle event
});
```

```csharp
var processor = serviceProvider.GetRequiredService<IEventProcessor>();
await processor.StartAsync();
// ... application runs ...
await processor.StopAsync();
```

The `EventEnvelope` class contains metadata about each event:

| Property | Description |
|---|---|
| `EventId` | Unique identifier for the event |
| `EventType` | Full type name of the event |
| `AssemblyQualifiedName` | Assembly-qualified type name |
| `Event` | The serialized event bytes |
| `RecordedAt` | When the event was recorded |
| `Version` | Event version in the stream |
| `CorrelationId` | Optional correlation ID |
| `CompressionAlgorithm` | Compression algorithm used |
| `EncryptionAlgorithm` | Encryption algorithm used |
| `Headers` | Custom key-value headers |

The `EventStream` class exposes the following properties:

| Property | Description |
|---|---|
| `Id` | The stream's unique identifier |
| `Version` | Current stream version |
| `CorrelationId` | Correlation ID (if set) |
| `Removed` | Whether the stream has been removed |
| `IsEmpty` | Whether the stream has no events |
| `Count` | Total number of events |

- `EventStreamConcurrencyException`: Thrown when a concurrent modification is detected.
- `EventProcessingException`: Thrown when projection event processing fails.

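
To make the idea behind a concurrency invariant concrete, here is a small self-contained sketch of an expected-version check. It assumes that "version" means the version of the last committed event; `SketchStream` and `SketchConcurrencyException` are hypothetical stand-ins and are not Shuttle.Recall types.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for illustration; these are NOT Shuttle.Recall types.
public sealed class SketchConcurrencyException : Exception
{
    public SketchConcurrencyException(string message) : base(message) { }
}

public sealed class SketchStream
{
    private readonly List<object> _appended = new();

    public SketchStream(int version) => Version = version;

    // Version of the last committed event (assumed meaning of "version").
    public int Version { get; private set; }

    // Appended events do not change the version until they are committed.
    public void Add(object evt) => _appended.Add(evt);

    // Fails fast when the stream is not at the version the caller expects.
    public void ConcurrencyInvariant(int expectedVersion)
    {
        if (Version != expectedVersion)
        {
            throw new SketchConcurrencyException(
                $"Expected version {expectedVersion} but the stream is at version {Version}.");
        }
    }

    // Committing assigns one new version number per appended event.
    public void Commit()
    {
        Version += _appended.Count;
        _appended.Clear();
    }
}
```

A caller that made a decision based on version 5 would call `ConcurrencyInvariant(5)` before saving; if another writer has already moved the stream forward, the check throws and the caller can reload the stream and retry.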
Please visit the Shuttle.Recall documentation for more information.