Welcome to EventFlow’s documentation!¶

EventFlow is a basic CQRS+ES framework designed to be easy to use.
Have a look at our getting started guide, the do’s and don’ts and the FAQ.
Getting started¶
Initializing EventFlow always starts with an EventFlowOptions.New
as this
performs the initial bootstrap and starts the fluent configuration API. The
very minimum initialization of EventFlow can be done in a single line, but
wouldn’t serve any purpose as no domain has been configured.
var resolver = EventFlowOptions.New.CreateResolver();
The above line configures several important defaults:
- Custom internal IoC container
- In-memory event store
- Console logger
- A “null” snapshot store, that merely writes a warning if used (no need to do anything before going to production if you aren’t planning to use snapshots)
- And lastly, default implementations of all the internal parts of EventFlow
Important
If you’re using ASP.NET Core, you should install the EventFlow.AspNetCore package and invoke AddAspNetCoreMetadataProviders in Startup.
public void ConfigureServices(IServiceCollection services)
{
    services.AddEventFlow(ef =>
    {
        ef.AddDefaults(typeof(Startup).Assembly);
        ef.AddAspNetCoreMetadataProviders();
    });
}
Important
Before using EventFlow in a production environment, you should configure an alternative event store, an alternative IoC container and another logger that sends log messages to your production log store.
To start using EventFlow, a domain must be configured which consists of the following parts
- Aggregate
- Aggregate identity
- Aggregate events
- Commands and command handlers (optional, but highly recommended)
In addition to the above, EventFlow provides several optional features. Whether or not these features are utilized depends on the application in which EventFlow is used.
Example application¶
The example application includes one of each of the required parts: aggregate, event, aggregate identity, command and a command handler. Further down we will go through each of the individual parts.
Note
The example code provided here is located within the EventFlow code base
exactly as shown, so if you would like to debug and step through the
entire flow, checkout the code and execute the GettingStartedExample
test.
https://github.com/eventflow/Documentation/tree/master/Source/EventFlow.Documentation/GettingStarted
All classes created for the example application are prefixed with Example.
// We wire up EventFlow with all of our classes. Instead of adding events,
// commands, etc. explicitly, we could have used the simpler
// AddDefaults(Assembly) instead.
using (var resolver = EventFlowOptions.New
    .AddEvents(typeof(ExampleEvent))
    .AddCommands(typeof(ExampleCommand))
    .AddCommandHandlers(typeof(ExampleCommandHandler))
    .UseInMemoryReadStoreFor<ExampleReadModel>()
    .CreateResolver())
{
    // Create a new identity for our aggregate root
    var exampleId = ExampleId.New;

    // Define some important value
    const int magicNumber = 42;

    // Resolve the command bus and use it to publish a command
    var commandBus = resolver.Resolve<ICommandBus>();
    var executionResult = await commandBus.PublishAsync(
        new ExampleCommand(exampleId, magicNumber),
        CancellationToken.None)
        .ConfigureAwait(false);

    // Verify that we didn't trigger our domain validation
    executionResult.IsSuccess.Should().BeTrue();

    // Resolve the query handler and use the built-in query for fetching
    // read models by identity to get our read model representing the
    // state of our aggregate root
    var queryProcessor = resolver.Resolve<IQueryProcessor>();
    var exampleReadModel = await queryProcessor.ProcessAsync(
        new ReadModelByIdQuery<ExampleReadModel>(exampleId),
        CancellationToken.None)
        .ConfigureAwait(false);

    // Verify that the read model has the expected magic number
    exampleReadModel.MagicNumber.Should().Be(42);
}
The above example publishes the ExampleCommand to the aggregate with the exampleId identity, carrying the magical value of 42. After the command has been published, the accompanying read model ExampleReadModel is fetched and we verify that the magical number has reached it.
During the execution of the example application, a single event is emitted and stored in the in-memory event store. The JSON for the event is shown here.
{
"MagicNumber": 42
}
The event data itself is straightforward, as it is merely the JSON serialization of an instance of the type ExampleEvent with the value we defined. A bit more interesting is the metadata that EventFlow stores alongside the event, which is used by the EventFlow event store.
{
"timestamp": "2016-11-09T20:56:28.5019198+01:00",
"aggregate_sequence_number": "1",
"aggregate_name": "ExampleAggrenate",
"aggregate_id": "example-c1d4a2b1-c75b-4c53-ae44-e67ee1ddfd79",
"event_id": "event-d5622eaa-d1d3-5f57-8023-4b97fabace90",
"timestamp_epoch": "1478721389",
"batch_id": "52e9d7e9-3a98-44c5-926a-fc416e20556c",
"source_id": "command-69176516-07b7-4142-beaf-dba82586152c",
"event_name": "example",
"event_version": "1"
}
All the built-in metadata is available on each instance of IDomainEvent<,,>, which is accessible from event handlers, e.g. for read models or subscribers. It is also possible to create your own metadata providers or add additional EventFlow built-in providers as needed.
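As an illustration, here is a hedged sketch of how a read model Apply method might access both the typed event and its metadata through the IDomainEvent<,,> wrapper (the local variable names are purely illustrative):
public void Apply(
    IReadModelContext context,
    IDomainEvent<ExampleAggregate, ExampleId, ExampleEvent> domainEvent)
{
    // Strongly typed access to the event itself
    var magicNumber = domainEvent.AggregateEvent.MagicNumber;

    // Built-in metadata surfaced as properties on the wrapper
    var sequenceNumber = domainEvent.AggregateSequenceNumber;
    var timestamp = domainEvent.Timestamp;

    // The raw metadata key/value pairs stored with the event
    var eventName = domainEvent.Metadata["event_name"];
}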
Aggregate identity¶
The aggregate ID in EventFlow is represented as a value object that inherits
from the IIdentity
interface. You can provide your own implementation, but
EventFlow provides a convenient implementation that will suit most needs. Be
sure to read the section about the Identity<> class
for details on how to use it.
For our example application we use the built-in class, which makes the implementation very simple.
/// Represents the aggregate identity (ID)
public class ExampleId :
    Identity<ExampleId>
{
    public ExampleId(string value) : base(value) { }
}
Aggregate¶
Now we’ll take a look at the ExampleAggregate. It is rather simple, as the only thing it can do is apply the magic number once.
public class ExampleAggregate :
    AggregateRoot<ExampleAggregate, ExampleId>,
    IEmit<ExampleEvent>
{
    private int? _magicNumber;

    public ExampleAggregate(ExampleId id) : base(id) { }

    // Method invoked by our command
    public IExecutionResult SetMagicNumber(int magicNumber)
    {
        if (_magicNumber.HasValue)
            return ExecutionResult.Failed("Magic number already set");

        Emit(new ExampleEvent(magicNumber));
        return ExecutionResult.Success();
    }

    // We apply the event as part of the event sourcing system. EventFlow
    // provides several different methods for doing this, e.g. state objects;
    // the Apply method is merely the simplest
    public void Apply(ExampleEvent aggregateEvent)
    {
        _magicNumber = aggregateEvent.MagicNumber;
    }
}
Be sure to read the section on aggregates to get all the details right. For now, the most important thing to note is that the state of the aggregate (updating the _magicNumber variable) happens in the Apply(ExampleEvent) method. This is the event sourcing part of EventFlow in effect. As state changes are only saved as events, mutating the aggregate state must happen in such a way that the state changes are replayed the next time the aggregate is loaded. EventFlow has a set of different approaches that you can select from. In this example we use the Apply methods, as they are the simplest.
Important
The Apply(ExampleEvent) method is invoked by the Emit(...) method, so after the event has been emitted, the aggregate state has changed.
The ExampleAggregate exposes the SetMagicNumber(int) method, which is used to expose the business rules for changing the magic number. If the magic number hasn’t been set before, the event ExampleEvent is emitted and the aggregate state is mutated.
If the magic number was already set, we return a failed IExecutionResult with an error message. Returning a failed execution result will make EventFlow disregard any events the aggregate has emitted.
If you need to return something more useful than a bool in an execution result, merely create a new class that implements the IExecutionResult interface and specify the type as generic arguments for the command and command handler.
Note
While possible, do not use execution results as a method of reading values from the aggregate; that’s what the IQueryProcessor and read models are for.
Event¶
Next up is the event which represents something that has happened in our domain. In this example, it’s merely that some magic number has been set. Normally these events should have a really, really good name and represent something in the ubiquitous language for the domain.
/// A basic event containing some information
[EventVersion("example", 1)]
public class ExampleEvent :
    AggregateEvent<ExampleAggregate, ExampleId>
{
    public ExampleEvent(int magicNumber)
    {
        MagicNumber = magicNumber;
    }

    public int MagicNumber { get; }
}
We have applied the [EventVersion("example", 1)] attribute to our event, marking it as the example event, version 1, which directly corresponds to the event_name and event_version in the metadata stored alongside the event mentioned above. The information is used by EventFlow to tie the name and version to a specific .NET type.
Important
Even though using the EventVersion attribute is optional, it is highly recommended. EventFlow will infer the information if it isn’t provided, thus making it vulnerable to type renames among other things.
Important
Once you have aggregates in your production environment that have emitted an event, you should never change the .NET implementation. You can deprecate it, but you should never change the type or the data stored in the event store.
Command¶
Commands are the entry point to the domain and, if you remember from the example application, they are published using the ICommandBus as shown here.
var commandBus = resolver.Resolve<ICommandBus>();
var executionResult = await commandBus.PublishAsync(
    new ExampleCommand(exampleId, magicNumber),
    CancellationToken.None)
    .ConfigureAwait(false);
In EventFlow, commands are simple value objects that merely house the arguments for the command execution. All commands implement the ICommand<,> interface, but EventFlow provides an easy-to-use base class that you can use.
/// Command for updating the magic number
public class ExampleCommand :
    Command<ExampleAggregate, ExampleId, IExecutionResult>
{
    public ExampleCommand(
        ExampleId aggregateId,
        int magicNumber)
        : base(aggregateId)
    {
        MagicNumber = magicNumber;
    }

    public int MagicNumber { get; }
}
A command doesn’t do anything without a command handler. In fact, EventFlow will throw an exception if a command doesn’t have exactly one command handler registered.
Command handler¶
The command handler provides the glue between the command, the aggregate and the IoC container as it defines how a command is executed. Typically they are rather simple, but they could contain more complex logic. How much is up to you.
/// Command handler for our command
public class ExampleCommandHandler :
    CommandHandler<ExampleAggregate, ExampleId, IExecutionResult, ExampleCommand>
{
    public override Task<IExecutionResult> ExecuteCommandAsync(
        ExampleAggregate aggregate,
        ExampleCommand command,
        CancellationToken cancellationToken)
    {
        var executionResult = aggregate.SetMagicNumber(command.MagicNumber);
        return Task.FromResult(executionResult);
    }
}
The ExampleCommandHandler in our case merely invokes SetMagicNumber(int) on the aggregate and returns the execution result. Remember, if a command handler returns a failed execution result, EventFlow will disregard any events the aggregate has emitted.
Important
Everything inside the ExecuteCommandAsync(...) method of a command handler may be executed more than once if there’s an optimistic concurrency exception, i.e., something else has happened to the aggregate since it was loaded from the event store and it is therefore automatically reloaded by EventFlow. It is therefore essential that the command handler doesn’t mutate anything other than the aggregate.
Read model¶
If you ever need to access the data in your aggregates efficiently, it’s important that read models are used. Loading aggregates from the event store takes time, and it’s impossible to query for e.g. aggregates that have a specific value in their state.
In our example we merely use the built-in in-memory read model store. It is useful in many cases, e.g. executing automated domain tests in a CI build.
/// Read model for our aggregate
public class ExampleReadModel :
    IReadModel,
    IAmReadModelFor<ExampleAggregate, ExampleId, ExampleEvent>
{
    public int MagicNumber { get; private set; }

    public void Apply(
        IReadModelContext context,
        IDomainEvent<ExampleAggregate, ExampleId, ExampleEvent> domainEvent)
    {
        MagicNumber = domainEvent.AggregateEvent.MagicNumber;
    }
}
Notice the IDomainEvent<ExampleAggregate, ExampleId, ExampleEvent> domainEvent argument. It’s merely a wrapper around the specific event we implemented earlier. The IDomainEvent<,,> provides additional information, e.g. any metadata stored alongside the event.
The main difference between the event instance emitted in the aggregate and the instance wrapped here is that the event has been committed to the event store.
Next steps¶
Although the implementation in this guide enables you to create a complete application, there are several topics that are recommended as next steps.
- Read the do’s and don’ts section
- Use value objects to produce cleaner JSON
- If your application needs to act on an emitted event, create a subscriber
- Check the configuration to make sure everything is as you would like it
- Set up a persistent event store using e.g. Microsoft SQL Server
- Create read models for efficient querying
- Consider the use of specifications to ease creation of business rules
Identity¶
The Identity<> value object provides generic functionality to create and validate the IDs of aggregate roots. It is basically a wrapper around a Guid.
Let’s say we want to create a new identity named TestId. We could do it like this.
public class TestId : Identity<TestId>
{
public TestId(string value)
: base(value)
{
}
}
- The identity follows the form {class name without "Id"}-{guid}, e.g. test-c93fdb8c-5c9a-4134-bbcd-87c0644ca34f for the above TestId example
- The internal Guid can be generated using one of the following methods/properties. Note that you can access the Guid factories directly via the static methods on the GuidFactories class
  - New: Uses the standard Guid.NewGuid()
  - NewDeterministic(...): Creates a name-based Guid using the algorithm from RFC 4122 §4.3, which allows identities to be generated based on known data (e.g. a user e-mail). It always returns the same identity for the same arguments
  - NewComb(): Creates a sequential Guid that can be used to avoid database fragmentation
- A string can be tested to see if it’s a valid identity using the static bool IsValid(string) method
- Any validation errors can be gathered using the static IEnumerable<string> Validate(string) method
Important
It’s very important to name the constructor argument value, as it is significant when the identity type is deserialized.
Here are some examples of how we can use our newly created TestId.
// Uses the default Guid.NewGuid()
var testId = TestId.New;

// Create a namespace, put this in a constant somewhere
var emailNamespace = Guid.Parse("769077C6-F84D-46E3-AD2E-828A576AAAF3");

// Creates an identity with the value "test-9181a444-af25-567e-a866-c263b6f6119a"
var deterministicTestId = TestId.NewDeterministic(emailNamespace, "test@example.com");

// Creates a new identity every time, but one that minimizes index
// fragmentation when used in database indexes
var combTestId = TestId.NewComb();
Note
Be sure to read the section about value objects, as the Identity<> is basically a value object.
Aggregates¶
Before you can create an aggregate, you need to create its identity. You can create your own implementation by implementing the IIdentity interface, or you can use the Identity<> base class that EventFlow provides, like this.
public class TestId : Identity<TestId>
{
public TestId(string value) : base(value)
{
}
}
The Identity<> value object
provides generic functionality to create and validate aggregate root
IDs. Please read the documentation regarding the bundled Identity<>
type as it provides several useful features, e.g. several different
schemes for ID generation, including one that minimizes MSSQL database
fragmentation.
Next, to create a new aggregate, simply inherit from AggregateRoot<,> like this, making sure to pass the aggregate’s own type as the first generic argument and the identity as the second.
public class TestAggregate : AggregateRoot<TestAggregate, TestId>
{
public TestAggregate(TestId id)
: base(id)
{
}
}
Events¶
In an event-sourced system like EventFlow, aggregate root data is stored in events.
public class PingEvent : AggregateEvent<TestAggregate, TestId>
{
public string Data { get; }
public PingEvent(string data)
{
Data = data;
}
}
Please make sure to read the section on value objects and events for some important notes on creating events.
Emitting events¶
In order to emit an event from an aggregate, call the protected
Emit(...)
method which applies the event and adds it to the list of
uncommitted events.
public void Ping(string data)
{
    // Fancy domain logic here that validates aggregate state...
    if (string.IsNullOrEmpty(data))
    {
        throw DomainError.With("Ping data empty");
    }

    Emit(new PingEvent(data));
}
Remember not to make any changes to the aggregate state in these methods; as the state is only stored through events, all mutation must happen when the events are applied.
Applying events¶
Currently EventFlow offers the following ways of applying events to the aggregate when they are emitted or loaded from the event store. Which you choose is up to you. Implementing IEmit<SomeEvent> is the most convenient, but will expose public Apply methods.
- Create a method called Apply that takes the event as an argument. To get the method signature right, implement IEmit<SomeEvent> on your aggregate. This is the default fallback, and you will get an exception if no other strategies are configured. Although you can implement IEmit<SomeEvent>, it’s optional; the Apply methods can be protected or private
- Create a state object by inheriting from AggregateState<,,> and registering it using the protected Register(...) in the aggregate root constructor
- Register a specific handler for an event using the protected Register<SomeEvent>(e => Handler(e)) from within the constructor (see the sketch after this list)
- Register an event applier using Register(IEventApplier eventApplier), which could e.g. be a state object
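As a small illustration of the handler registration option, here is a hedged sketch that registers an event handler directly in the constructor, reusing the TestAggregate and PingEvent from the surrounding examples (the _lastPingData field is purely illustrative):
public class TestAggregate : AggregateRoot<TestAggregate, TestId>
{
    private string _lastPingData;

    public TestAggregate(TestId id) : base(id)
    {
        // Register a specific handler for PingEvent instead of
        // implementing IEmit<PingEvent> with a public Apply method
        Register<PingEvent>(e => _lastPingData = e.Data);
    }
}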
Commands¶
Commands are the basic value objects, or models, that represent the write operations you can perform in your domain. As described in more detail below, a command is the “thing” to be done; a command handler does the “thing”.
As an example, one might implement this command for updating user passwords.
public class UserUpdatePasswordCommand : Command<UserAggregate, UserId>
{
    public Password NewPassword { get; }
    public Password OldPassword { get; }

    public UserUpdatePasswordCommand(
        UserId id,
        Password newPassword,
        Password oldPassword)
        : base(id)
    {
        NewPassword = newPassword;
        OldPassword = oldPassword;
    }
}
Note that the Password class is merely a value object created to hold the password and do basic validation. Read the article regarding value objects for more information. Also, you don’t have to use the default EventFlow Command<,> implementation; you can create your own, it merely has to implement the ICommand<,> interface.
A command by itself doesn’t do anything and will throw an exception if published. To make a command work, you need to implement one (and only one) command handler which is responsible for invoking the aggregate.
public class UserUpdatePasswordCommandHandler :
    CommandHandler<UserAggregate, UserId, IExecutionResult, UserUpdatePasswordCommand>
{
    public override Task<IExecutionResult> ExecuteCommandAsync(
        UserAggregate aggregate,
        UserUpdatePasswordCommand command,
        CancellationToken cancellationToken)
    {
        var executionResult = aggregate.UpdatePassword(
            command.OldPassword,
            command.NewPassword);
        return Task.FromResult(executionResult);
    }
}
Execution results¶
If the aggregate detects a domain error and wants to abort execution and return an error back, execution results are used. EventFlow ships with a basic implementation that allows returning success or failure and, optionally, an error message, as shown here.
ExecutionResult.Success();
ExecutionResult.Failed();
ExecutionResult.Failed("With some error");
However, you can create your own custom execution results to allow aggregates to e.g. provide detailed validation results. Merely implement the IExecutionResult interface and use the type as generic arguments on the command and command handler.
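As a hedged sketch (the class and property names here are purely illustrative), a custom execution result carrying validation messages could look like this; IsSuccess is the member required by IExecutionResult:
public class ValidationExecutionResult : IExecutionResult
{
    public ValidationExecutionResult(IReadOnlyCollection<string> validationErrors)
    {
        ValidationErrors = validationErrors;
    }

    public IReadOnlyCollection<string> ValidationErrors { get; }

    // Required by IExecutionResult
    public bool IsSuccess => ValidationErrors.Count == 0;
}
The command and command handler would then use ValidationExecutionResult as the execution result generic argument, e.g. Command<UserAggregate, UserId, ValidationExecutionResult>.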
Note
While possible, do not use execution results as a method of reading values from the aggregate; that’s what the IQueryProcessor and read models are for.
Ensure idempotency¶
Detecting duplicate operations can be hard, especially if you have a distributed application, or simply a web application. Consider the following simplified scenario.
- The user wants to change his password
- The user fills in the “change password form”
- The user submits the form twice, either accidentally or impatiently
- The first web request completes and the password is changed. However, as the browser is waiting on the second web request, this result is ignored
- The second web request throws a domain error as the “old password” doesn’t match as the current password has already been changed
- The user is presented with an error on the web page
Handling this is simple: merely ensure that the aggregate is idempotent in regard to password changes. But instead of implementing this yourself, EventFlow has support for it that is simple to utilize and is done per command.
To use the functionality, merely ensure that commands that represent the same operation have the same ISourceId (which implements IIdentity), like the example below.
public class UserUpdatePasswordCommand : Command<UserAggregate, UserId>
{
    public Password NewPassword { get; }
    public Password OldPassword { get; }

    public UserUpdatePasswordCommand(
        UserId id,
        ISourceId sourceId,
        Password newPassword,
        Password oldPassword)
        : base(id, sourceId)
    {
        NewPassword = newPassword;
        OldPassword = oldPassword;
    }
}
Note the use of the protected constructor of Command<,> that takes an ISourceId in addition to the aggregate root identity.
If a duplicate command is detected, a DuplicateOperationException
is
thrown. The application could then ignore the exception or report the
problem to the end user.
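A minimal sketch of handling the duplicate at the publish site could look like this:
try
{
    await commandBus.PublishAsync(command, cancellationToken)
        .ConfigureAwait(false);
}
catch (DuplicateOperationException)
{
    // The operation was already performed; ignore it or inform the user
}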
The default ISourceId history size of the aggregate root is ten, but it can be configured using the SetSourceIdHistory(...) method in the aggregate root constructor.
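A hedged sketch of changing the history size from the aggregate root constructor:
public class UserAggregate : AggregateRoot<UserAggregate, UserId>
{
    public UserAggregate(UserId id) : base(id)
    {
        // Keep the last 100 source IDs instead of the default ten
        SetSourceIdHistory(100);
    }
}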
Easier ISourceId calculation¶
Ensuring the correct calculation of the command ISourceId
can be
somewhat cumbersome, which is why EventFlow provides another base
command you can use, the DistinctCommand<,>
. By using the
DistinctCommand<,>
you merely have to implement the
GetSourceIdComponents()
and providing the IEnumerable<byte[]>
that makes the command unique. The bytes are used to create a
deterministic GUID to be used as an ISourceId
.
public class UserUpdatePasswordCommand :
DistinctCommand<UserAggregate, UserId>
{
public Password NewPassword { get; }
public Password OldPassword { get; }
public UserUpdatePasswordCommand(
UserId id,
Password newPassword,
Password oldPassword)
: base(id)
{
NewPassword = newPassword;
OldPassword = oldPassword;
}
protected override IEnumerable<byte[]> GetSourceIdComponents()
{
yield return NewPassword.GetBytes();
yield return OldPassword.GetBytes();
}
}
The GetBytes()
merely returns the Encoding.UTF8.GetBytes(...)
of
the password.
Caution
Don’t use GetHashCode(), as its implementation can differ between 32-bit and 64-bit .NET (e.g. for string).
Subscribers¶
Whenever your application needs to perform an action when a specific event is emitted from your domain, you create a class that implements one of the following two interfaces:
- ISubscribeSynchronousTo<TAggregate,TIdentity,TEvent>: Executed synchronously
- ISubscribeAsynchronousTo<TAggregate,TIdentity,TEvent>: Executed asynchronously
Any subscribers that you implement need to be registered using either AddSubscriber(...), AddSubscribers(...) or AddDefaults(...) during initialization. If you have configured a custom IoC container, you can register the implementations using it instead.
Note
The synchronous and asynchronous here have nothing to do with the .NET framework keywords async and await or the Task Parallel Library. They refer to how the subscribers are executed. Read below for details.
Synchronous subscribers¶
Synchronous subscribers in EventFlow are executed one at a time for each
emitted domain event in order. This e.g. guarantees that all subscribers
have been executed when the ICommandBus.PublishAsync(...)
returns.
The ISubscribeSynchronousTo<,,>
interface is shown here.
public interface ISubscribeSynchronousTo<TAggregate, in TIdentity, in TEvent>
where TAggregate : IAggregateRoot<TIdentity>
where TIdentity : IIdentity
where TEvent : IAggregateEvent<TAggregate, TIdentity>
{
Task HandleAsync(
IDomainEvent<TAggregate, TIdentity, TEvent> domainEvent,
CancellationToken cancellationToken);
}
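As an illustration, a synchronous subscriber for the PingEvent from the earlier examples might look like this (the subscriber name and its reaction are hypothetical):
public class PingSubscriber :
    ISubscribeSynchronousTo<TestAggregate, TestId, PingEvent>
{
    public Task HandleAsync(
        IDomainEvent<TestAggregate, TestId, PingEvent> domainEvent,
        CancellationToken cancellationToken)
    {
        // React to the event, e.g. notify another system
        Console.WriteLine($"Received ping: {domainEvent.AggregateEvent.Data}");
        return Task.FromResult(0);
    }
}
Remember to register the subscriber, e.g. using AddSubscribers(...) as described above.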
Out of order events¶
As synchronous subscribers are by their very nature executed synchronously, emitting multiple events from an aggregate and letting subscribers publish new commands based on them can lead to unexpected behavior, as “innermost” subscribers will be executed before the next “outer” event is handled.
- Aggregate emits events Event 1 and Event 2
- Subscriber handles Event 1 and publishes a command that results in Event 3 being emitted
- Subscriber handles Event 3 (doesn’t affect the domain)
- Subscriber handles Event 2
In the above example the subscriber will handle the events in the following order: Event 1, Event 3 and then Event 2. While this could occur in a distributed system or when executing subscribers on different threads, it is a certainty when using synchronous subscribers.
Exceptions swallowed by default¶
By default, any exceptions thrown by a subscriber are swallowed by EventFlow after they have been logged as errors. Depending on the application this might be the preferred behavior, but in some cases it isn’t. If subscriber exceptions should be thrown, thus allowing them to be caught in e.g. command handlers, the behavior can be changed by setting ThrowSubscriberExceptions to true as illustrated here:
using (var resolver = EventFlowOptions.New
.Configure(c => c.ThrowSubscriberExceptions = true)
.CreateResolver())
{
...
}
Asynchronous subscribers¶
Asynchronous subscribers in EventFlow are executed using a scheduled job.
Important
Asynchronous subscribers are disabled by default and must be enabled using the following configuration.
eventFlowOptions.Configure(c => c.IsAsynchronousSubscribersEnabled = true);
Important
Since asynchronous subscribers are executed using a job, it’s important to configure proper job scheduling. The EventFlow.Hangfire NuGet package integrates with the Hangfire job scheduler (https://www.hangfire.io) and provides a usable solution to this requirement.
The ISubscribeAsynchronousTo<,,>
is shown here and is, besides its
name, identical to its synchronous counterpart.
public interface ISubscribeAsynchronousTo<TAggregate, in TIdentity, in TEvent>
where TAggregate : IAggregateRoot<TIdentity>
where TIdentity : IIdentity
where TEvent : IAggregateEvent<TAggregate, TIdentity>
{
Task HandleAsync(
IDomainEvent<TAggregate, TIdentity, TEvent> domainEvent,
CancellationToken cancellationToken);
}
Note
Setting ThrowSubscriberExceptions = true
has no effect
on asynchronous subscribers.
Subscribe to every event¶
Instead of subscribing to every single domain event type, you can register an implementation of ISubscribeSynchronousToAll, which is defined as shown here.
public interface ISubscribeSynchronousToAll
{
Task HandleAsync(
IReadOnlyCollection<IDomainEvent> domainEvents,
CancellationToken cancellationToken);
}
Any registered implementations will be notified for every domain event emitted.
RabbitMQ¶
See RabbitMQ setup for details on how to get started using RabbitMQ.
After RabbitMQ has been configured, all domain events are published
to an exchange named eventflow
with routing keys in the following
format.
eventflow.domainevent.[Aggregate name].[Event name].[Event version]
For an event named CreateUser, version 1, on the MyUserAggregate, the routing key will be the following.
eventflow.domainevent.my-user.create-user.1
Note the lowercasing and the insertion of - whenever there’s a capital letter.
All the above is the default behavior. If you don’t like it, replace the
registered message factory service IRabbitMqMessageFactory
to
customize what routing key or exchange to use. Have a look at how
EventFlow has done its
implementation to get started.
Metadata¶
Metadata is all the “additional” information that resides with an emitted event, some of which is required information.
In EventFlow, metadata is merely an IEnumerable of KeyValuePair<string,string>, each of which is a metadata entry.
Out of the box these metadata keys are added to each aggregate event.
- event_name and event_version: A name and version for the event which is used during event deserialization
- timestamp: A DateTimeOffset for when the event was emitted from the aggregate
- aggregate_sequence_number: The version of the aggregate after the event was emitted, e.g. 1 for the very first event emitted
Custom metadata provider¶
If you require additional information to be stored along with each event, you can implement the IMetadataProvider interface and register the class using e.g. .AddMetadataProvider(...) on EventFlowOptions.
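Here is a hedged sketch of a custom provider that stamps every event with a hypothetical application version; the ProvideMetadata signature below reflects the IMetadataProvider interface, but verify it against the EventFlow version you use:
public class AppVersionMetadataProvider : IMetadataProvider
{
    public IEnumerable<KeyValuePair<string, string>> ProvideMetadata<TAggregate, TIdentity>(
        TIdentity id,
        IAggregateEvent aggregateEvent,
        IMetadata metadata)
        where TAggregate : IAggregateRoot<TIdentity>
        where TIdentity : IIdentity
    {
        // "app_version" is an illustrative key; the value would come
        // from wherever your application exposes its version
        yield return new KeyValuePair<string, string>("app_version", "1.2.3");
    }
}
The provider would then be registered with .AddMetadataProvider(...) as mentioned above.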
Additional built-in providers¶
EventFlow ships with a collection of ready-to-use providers in some of its NuGet packages.
EventFlow¶
- AddEventTypeMetadataProvider
  - event_type_assembly_name - Assembly name of the assembly containing the event
  - event_type_assembly_version - Assembly version of the assembly containing the event
  - event_type_fullname - Full name of the event, corresponding to Type.FullName for the aggregate event type
- AddGuidMetadataProvider
  - guid - A new Guid for each event
- AddMachineNameMetadataProvider
  - environment_machinename - Adds the machine name handling the event, from Environment.MachineName
EventFlow.Owin¶
- AddRequestHeadersMetadataProvider
  - request_header[HEADER] - Adds all headers from the OWIN request as metadata, each as a separate entry in which HEADER is replaced with the name of the header, e.g. request_header[Connection] might contain the value Keep-Alive
- AddUriMetadataProvider
  - request_uri - OWIN request URI
  - request_method - OWIN request method
- AddUserHostAddressMetadataProvider
  - user_host_address - The provider tries to find the correct user host address by inspecting request headers, i.e., if you have a load balancer in front of your application, the request IP is not the real user address, but the load balancer should send the user IP as a header
  - user_host_address_source_header - The header from which the user host address was taken
  - remote_ip_address - The remote IP address; note that this might be the IP address of your load balancer
Queries¶
Creating queries in EventFlow is simple.
First create a value object that contains the data required for the query. In this example we want to search for users based on their username.
public class GetUserByUsernameQuery : IQuery<User>
{
public string Username { get; }
public GetUserByUsernameQuery(string username)
{
Username = username;
}
}
Next create a query handler that implements how the query is processed.
public class GetUserByUsernameQueryHandler :
    IQueryHandler<GetUserByUsernameQuery, User>
{
    private readonly IUserReadModelRepository _userReadModelRepository;

    public GetUserByUsernameQueryHandler(
        IUserReadModelRepository userReadModelRepository)
    {
        _userReadModelRepository = userReadModelRepository;
    }

    public Task<User> ExecuteQueryAsync(
        GetUserByUsernameQuery query,
        CancellationToken cancellationToken)
    {
        return _userReadModelRepository.GetByUsernameAsync(
            query.Username,
            cancellationToken);
    }
}
The last step is to register the query handler in EventFlow. Here we show the simple but cumbersome version; you should use one of the overloads that scans an entire assembly.
...
EventFlowOptions.New
.AddQueryHandler<GetUserByUsernameQueryHandler, GetUserByUsernameQuery, User>()
...
Then in order to use the query in your application, you need a reference
to the IQueryProcessor
, which in our case is stored in the
_queryProcessor
field.
...
var user = await _queryProcessor.ProcessAsync(
    new GetUserByUsernameQuery("root"),
    cancellationToken)
    .ConfigureAwait(false);
...
Queries shipped with EventFlow¶
- ReadModelByIdQuery<TReadModel>: Supported by both the in-memory and MSSQL read model stores automatically, as soon as you define the read model using the EventFlow options for that store
- InMemoryQuery<TReadModel>: Takes a Predicate<TReadModel> and returns IEnumerable<TReadModel>, making it possible to search all of your in-memory read models based on any predicate (see the usage sketch after this list)
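A short usage sketch of InMemoryQuery<TReadModel>, assuming a UserReadModel registered with the in-memory read store and the _queryProcessor reference from above:
var users = await _queryProcessor.ProcessAsync(
    new InMemoryQuery<UserReadModel>(rm => rm.Username == "root"),
    cancellationToken)
    .ConfigureAwait(false);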
Sagas¶
EventFlow provides a simple saga system to coordinate messages between bounded contexts and aggregates. The saga system consists of the following parts:
- Saga identity
- Saga
- Saga locator
- Zero or more aggregates
This example is based on the chapter “A Saga on Sagas” from the CQRS Journey by Microsoft, in which we want to model the process of placing an order.
- The user sends the PlaceOrder command to the OrderAggregate
- The OrderAggregate emits an OrderCreated event
- The OrderSaga handles OrderCreated by sending a MakeReservation command to the ReservationAggregate
- The ReservationAggregate emits a SeatsReserved event
- The OrderSaga handles SeatsReserved by sending a MakePayment command to the PaymentAggregate
- The PaymentAggregate emits a PaymentAccepted event
- The OrderSaga handles PaymentAccepted by emitting an OrderConfirmed event with all the details, which via subscribers updates the user, the OrderAggregate and the ReservationAggregate
Next we need an ISagaLocator, which basically maps domain events to a saga identity, allowing EventFlow to find the saga in its store.
In our case we will add the order ID to the event metadata of all events related to a specific order.
public class OrderSagaLocator : ISagaLocator
{
public Task<ISagaId> LocateSagaAsync(
IDomainEvent domainEvent,
CancellationToken cancellationToken)
{
var orderId = domainEvent.Metadata["order-id"];
var orderSagaId = new OrderSagaId($"ordersaga-{orderId}");
return Task.FromResult<ISagaId>(orderSagaId);
}
}
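The OrderSagaId type used above isn’t shown in this walkthrough; a minimal sketch, assuming ISagaId adds no members beyond those already provided by the Identity<> base class, could be:
public class OrderSagaId : Identity<OrderSagaId>, ISagaId
{
    public OrderSagaId(string value) : base(value) { }
}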
Alternatively the order identity could be added to every domain event
emitted from the OrderAggregate
, ReservationAggregate
and
PaymentAggregate
aggregates that the OrderSaga
subscribes to,
but this would depend on whether or not the order identity is part of
the ubiquitous language for your domain.
public class OrderSaga
: AggregateSaga<OrderSaga, OrderSagaId, OrderSagaLocator>,
ISagaIsStartedBy<OrderAggregate, OrderId, OrderCreated>
{
public Task HandleAsync(
IDomainEvent<OrderAggregate, OrderId, OrderCreated> domainEvent,
ISagaContext sagaContext,
CancellationToken cancellationToken)
{
// Update saga state with useful details.
Emit(new OrderStarted(/*...*/));
// Make the reservation
Publish(new MakeReservation(/*...*/));
return Task.FromResult(0);
}
public void Apply(OrderStarted e)
{
// Update our aggregate state with relevant order details
}
}
Important
Even though the method for publishing commands is named
Publish
, the commands are only published to the command bus
after the aggregate has been successfully committed to the event
store (just like events). If an unexpected exception is thrown by this
command publish, it should be handled by a custom implementation of
ISagaErrorHandler
.
The next few events and commands are omitted in this example, but at last the
PaymentAggregate
emits its PaymentAccepted
event and the saga
completes and emits the final OrderConfirmed
event.
public class OrderSaga
    : AggregateSaga<OrderSaga, OrderSagaId, OrderSagaLocator>,
    ...
    ISagaHandles<PaymentAggregate, PaymentId, PaymentAccepted>
{
    ...
    public Task HandleAsync(
        IDomainEvent<PaymentAggregate, PaymentId, PaymentAccepted> domainEvent,
        ISagaContext sagaContext,
        CancellationToken cancellationToken)
    {
        Emit(new OrderConfirmed(/*...*/));
        return Task.FromResult(0);
    }

    public void Apply(OrderConfirmed e)
    {
        // As this is the last event, we complete the saga by calling Complete()
        Complete();
    }
}
Note
An AggregateSaga<,,> is only considered to be in its running state if there has been an event and it hasn’t been marked as completed (by invoking the protected Complete() method on the AggregateSaga<,,>).
Alternative saga store¶
By default EventFlow is configured to use event sourcing and aggregate
roots for storage of sagas. However, you can implement your own storage
system by implementing ISagaStore
and registering it.
Jobs¶
A job is basically a task that you want to execute outside of the current context, on another server or at a later time. EventFlow provides basic functionality for jobs.
There are areas where you might find jobs very useful; here are some examples.
- Publish a command at a specific time in the future
- Transient error handling
var jobScheduler = resolver.Resolve<IJobScheduler>();
var job = PublishCommandJob.Create(new SendEmailCommand(id), resolver);
await jobScheduler.ScheduleAsync(
job,
TimeSpan.FromDays(7),
CancellationToken.None)
.ConfigureAwait(false);
In the above example the SendEmailCommand
command will be published
in seven days.
Important
When working with jobs, you should be aware of the following
- The default implementation executes the job immediately (completely ignoring the runAt/delay parameters) and in the current context. To get support for scheduled jobs, inject another implementation of IJobScheduler, e.g. by installing EventFlow.Hangfire (read below for details)
- Your jobs should serialize to JSON properly; see the section on value objects for more information
- If you use the provided PublishCommandJob, make sure that your commands serialize properly as well
Create your own jobs¶
To create your own jobs, your job merely needs to implement the IJob interface and be registered in EventFlow. Here’s an example of a job implementing IJob.
[JobVersion("LogMessage", 1)]
public class LogMessageJob : IJob
{
public LogMessageJob(string message)
{
Message = message;
}
public string Message { get; }
public Task ExecuteAsync(
IResolver resolver,
CancellationToken cancellationToken)
{
var log = resolver.Resolve<ILog>();
log.Debug(Message);
}
}
Note that the JobVersion
attribute specifies the job name and
version to EventFlow and this is how EventFlow distinguishes between the
different job types. This makes it possible for you to reorder your
code, even rename the job type. As long as you keep the same attribute
values it is considered the same job in EventFlow. If the attribute is
omitted, the name will be the type name and version will be 1
.
Here’s how the job is registered in EventFlow.
var resolver = EventFlowOptions.New
.AddJobs(typeof(LogMessageJob))
...
.CreateResolver();
Then to schedule the job
var jobScheduler = resolver.Resolve<IJobScheduler>();
var job = new LogMessageJob("Great log message");
await jobScheduler.ScheduleAsync(
job,
TimeSpan.FromDays(7),
CancellationToken.None)
.ConfigureAwait(false);
Hangfire¶
To use Hangfire as the job scheduler, install
the NuGet package EventFlow.Hangfire
and configure EventFlow to use
the scheduler like this.
var resolver = EventFlowOptions.New
.UseHangfireJobScheduler() // This line
...
.CreateResolver();
Note
The UseHangfireJobScheduler()
doesn’t do any Hangfire
configuration, but merely registers the proper scheduler in EventFlow.
Event upgrade¶
At some point you might find the need to replace an event with zero or more events. Some use cases might be
- A previous application version introduced a domain error in the form of a wrong event being emitted from the aggregate
- Domain has changed, either from a change in requirements or simply from a better understanding of the domain
EventFlow event upgraders are invoked whenever the event stream is loaded from the event store. Each event upgrader receives the entire event stream one event at a time.
A new instance of an event upgrader is created each time an aggregate is loaded. This enables you to store information from previous events on the upgrader instance to be used later, e.g. to determine an action to take on a event or to provide additional information for a new event.
Note that the ordering of event upgraders is important as you might implement two upgraders, one to upgrade an event from V1 to V2 and then another upgrading V2 to V3. EventFlow orders the event upgraders by name before starting the event upgrade.
Caution
Be careful when working with event upgraders that return zero or more than one event, as this has an influence on the aggregate version and you need to make sure that the aggregate sequence number on upgraded events are valid in regard to the aggregate history.
Example - removing a damaged event¶
To remove an event, simply check and only return the event if it’s not the event you want to remove.
public class DamagedEventRemover : IEventUpgrader<MyAggregate, MyId>
{
    public IEnumerable<IDomainEvent<MyAggregate, MyId>> Upgrade(
        IDomainEvent<MyAggregate, MyId> domainEvent)
    {
        var damagedEvent = domainEvent as IDomainEvent<MyAggregate, MyId, DamagedEvent>;
        if (damagedEvent == null)
        {
            yield return domainEvent;
        }
    }
}
Example - replace event¶
To upgrade one event to another, you should use the
IDomainEventFactory.Upgrade
to help migrate metadata and create the
new event.
public class UpgradeMyEventV1ToMyEventV2 : IEventUpgrader<MyAggregate, MyId>
{
    private readonly IDomainEventFactory _domainEventFactory;

    public UpgradeMyEventV1ToMyEventV2(IDomainEventFactory domainEventFactory)
    {
        _domainEventFactory = domainEventFactory;
    }

    public IEnumerable<IDomainEvent<MyAggregate, MyId>> Upgrade(
        IDomainEvent<MyAggregate, MyId> domainEvent)
    {
        var myEventV1 = domainEvent as IDomainEvent<MyAggregate, MyId, MyEventV1>;
        yield return myEventV1 == null
            ? domainEvent
            : _domainEventFactory.Upgrade<MyAggregate, MyId>(
                domainEvent, new MyEventV2());
    }
}
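Event upgraders must also be registered during initialization; a hedged sketch, assuming the type-based AddEventUpgraders(...) overload:
var resolver = EventFlowOptions.New
    ...
    .AddEventUpgraders(typeof(DamagedEventRemover), typeof(UpgradeMyEventV1ToMyEventV2))
    ...
    .CreateResolver();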
Event stores¶
By default EventFlow uses an in-memory event store. But EventFlow provides support for alternatives.
- In-memory (for test)
- Microsoft SQL Server
- Mongo DB
- Files (for test)
In-memory¶
Important
The in-memory event store shouldn’t be used in production environments, only for tests.
Using the in-memory event store is easy as it’s enabled by default, no need to do anything.
MSSQL event store¶
See MSSQL setup for details on how to get started using Microsoft SQL Server in EventFlow.
To configure EventFlow to use MSSQL as the event store, simply add the
UseMssqlEventStore()
as shown here.
IRootResolver rootResolver = EventFlowOptions.New
...
.UseMssqlEventStore()
...
.CreateResolver();
Create and migrate required MSSQL databases¶
Before you can use the MSSQL event store, the required database and tables must be created. The database specified in your MSSQL connection will not be automatically created, you have to do this yourself.
To make EventFlow create the required tables, execute the following code.
var msSqlDatabaseMigrator = rootResolver.Resolve<IMsSqlDatabaseMigrator>();
EventFlowEventStoresMsSql.MigrateDatabase(msSqlDatabaseMigrator);
You should do this either on application start or preferably upon application install or update, e.g., when the web site is installed.
Important
If you utilize user permission in your application, then you
need to grant the event writer access to the user defined table type
eventdatamodel_list_type
. EventFlow uses this type to pass entire
batches of events to the database.
PostgreSql event store¶
Basically everything above regarding the MSSQL event store also applies to the PostgreSQL store. See the MSSQL setup for setup documentation.
Mongo DB¶
See Mongo DB setup for details on how to get started using Mongo DB in EventFlow.
To configure EventFlow to use Mongo DB as the event store, simply add the UseMongoDbEventStore()
as shown here.
IRootResolver rootResolver = EventFlowOptions.New
...
.UseMongoDbEventStore()
...
.CreateResolver();
Files¶
Important
The Files event store shouldn’t be used for production environments, only for tests.
The file based event store is useful if you have a set of events that represents a certain scenario and would like to create a test that verifies that the domain handles it correctly.
To use the file-based event store, simply invoke .UseFilesEventStore("...") with the path containing the files.
var storePath = @"c:\eventstore";
var rootResolver = EventFlowOptions.New
...
.UseFilesEventStore(FilesEventStoreConfiguration.Create(storePath))
...
.CreateResolver();
Read model stores¶
In order to create query handlers that perform well and enable search across multiple fields, read models, or projections, are used.
To get started you can use the built-in in-memory read model store, but EventFlow supports a few others as well.
Creating read models¶
Read models are a flattened view of a subset or all aggregate domain events created specifically for efficient queries.
Here’s a simple example of how a read model for doing searches for
usernames could look. The read model handles the UserCreated
domain
event to get the username and user ID.
public class UserReadModel : IReadModel,
IAmReadModelFor<UserAggregate, UserId, UserCreated>
{
public string UserId { get; set; }
public string Username { get; set; }
public void Apply(
IReadModelContext context,
IDomainEvent<UserAggregate, UserId, UserCreated> domainEvent)
{
UserId = domainEvent.AggregateIdentity.Value;
Username = domainEvent.AggregateEvent.Username.Value;
}
}
The read model applies all UserCreated
events and thereby merely saves
the latest value instead of the entire history, which makes it much easier to
store in an efficient manner.
Read model locators¶
Typically the ID of a read model is the aggregate identity, but sometimes this isn’t the case. Here are some examples.
- Items from a collection on the aggregate root
- Deterministic ID created from event data
- Entity within the aggregate
To create read models in these cases, use the EventFlow concept of read model locators, which is basically a mapping from a domain event to a read model ID.
As an example, consider if we could add several nicknames to a user. We
might have a domain event called UserNicknameAdded
similar to this.
public class UserNicknameAdded : AggregateEvent<UserAggregate, UserId>
{
public Nickname Nickname { get; set; }
}
We could then create a read model locator that would return the ID for each nickname we add via the event like this.
public class UserNicknameReadModelLocator : IReadModelLocator
{
public IEnumerable<string> GetReadModelIds(IDomainEvent domainEvent)
{
var userNicknameAdded = domainEvent as
IDomainEvent<UserAggregate, UserId, UserNicknameAdded>;
if (userNicknameAdded == null)
{
yield break;
}
yield return userNicknameAdded.Nickname.Id;
}
}
And then use a read model similar to this that represents each nickname.
public class UserNicknameReadModel : IReadModel,
IAmReadModelFor<UserAggregate, UserId, UserNicknameAdded>
{
public string UserId { get; set; }
public string Nickname { get; set; }
public void Apply(
IReadModelContext context,
IDomainEvent<UserAggregate, UserId, UserNicknameAdded> domainEvent)
{
UserId = domainEvent.AggregateIdentity.Value;
Nickname = domainEvent.AggregateEvent.Nickname.Value;
}
}
You may need to assign the ID of your read model from a batch of nicknames assigned on the creation event of the username. You would then read the assigned read model ID acquired from the locator using the context parameter:
public class UserNicknameReadModel : IReadModel,
IAmReadModelFor<UserAggregate, UserId, UserCreatedEvent>
{
public string Id { get; set; }
public string UserId { get; set; }
public string Nickname { get; set; }
public void Apply(
IReadModelContext context,
IDomainEvent<UserAggregate, UserId, UserCreatedEvent> domainEvent)
{
var id = context.ReadModelId;
UserId = domainEvent.AggregateIdentity.Value;
var nickname = domainEvent.AggregateEvent.Nicknames.Single(n => n.Id == id);
Id = nickname.Id;
Nickname = nickname.Nickname;
}
}
We could then use this nickname read model to query all the nicknames for a given user by searching for read models that have a specific UserId.
Read store implementations¶
EventFlow has built-in support for several different read model stores.
In-memory¶
The in-memory read store is easy to use and easy to configure. All read models are stored in-memory, so if EventFlow is restarted all read models are lost.
To configure the in-memory read model store, simply call
UseInMemoryReadStoreFor<>
or UseInMemoryReadStoreFor<,>
with
your read model as the generic argument.
var resolver = EventFlowOptions.New
...
.UseInMemoryReadStoreFor<UserReadModel>()
.UseInMemoryReadStoreFor<UserNicknameReadModel,UserNicknameReadModelLocator>()
...
.CreateResolver();
Microsoft SQL Server¶
To configure the MSSQL read model store, simply call
UseMssqlReadModel<>
or UseMssqlReadModel<,>
with your read model
as the generic argument.
var resolver = EventFlowOptions.New
...
.UseMssqlReadModel<UserReadModel>()
.UseMssqlReadModel<UserNicknameReadModel,UserNicknameReadModelLocator>()
...
.CreateResolver();
By convention, EventFlow uses a table named ReadModel-[CLASS NAME] to store the read model rows. If you need to change this, use the Table attribute from the System.ComponentModel.DataAnnotations.Schema namespace. So in the above example, the read model UserReadModel would be stored in a table called ReadModel-UserReadModel unless stated otherwise.
To allow EventFlow to find the read models stored, a single column is
required to have the MsSqlReadModelIdentityColumn
attribute. This
will be used to store the read model ID.
You should also create an int
column that has the
MsSqlReadModelVersionColumn
attribute to tell EventFlow which column
the read model version is stored in.
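Putting the pieces together, a hedged sketch of a read model class prepared for the MSSQL store could look like this (the property names are illustrative; EventFlow only requires the attributes to be present):
[Table("ReadModel-User")]
public class UserReadModel : IReadModel,
    IAmReadModelFor<UserAggregate, UserId, UserCreated>
{
    [MsSqlReadModelIdentityColumn]
    public string AggregateId { get; set; }

    [MsSqlReadModelVersionColumn]
    public int LastAggregateSequenceNumber { get; set; }

    public string Username { get; set; }

    public void Apply(
        IReadModelContext context,
        IDomainEvent<UserAggregate, UserId, UserCreated> domainEvent)
    {
        Username = domainEvent.AggregateEvent.Username.Value;
    }
}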
Important
EventFlow expects the read model to exist, and thus any maintenance of the database schema for the read models must be handled before EventFlow is initialized. Or, at least before the read models are used in EventFlow.
Elasticsearch¶
To configure the
Elasticsearch read
model store, simply call UseElasticsearchReadModel<>
or
UseElasticsearchReadModel<,>
with your read model as the generic
argument.
var resolver = EventFlowOptions.New
...
.ConfigureElasticsearch(new Uri("http://localhost:9200/"))
...
.UseElasticsearchReadModel<UserReadModel>()
.UseElasticsearchReadModel<UserNicknameReadModel,UserNicknameReadModelLocator>()
...
.CreateResolver();
Overloads of ConfigureElasticsearch(...)
are available for
alternative Elasticsearch configurations.
Important
Make sure to create any mapping the read model requires in Elasticsearch before using the read model in EventFlow.
If EventFlow receives a request to purge a specific read model, it does so by deleting the index. This means that a separate index should be created for each read model.
If you want to control the index a specific read model is stored in,
create an implementation of IReadModelDescriptionProvider
and
register it in the EventFlow IoC.
Mongo DB¶
To configure the Mongo DB read model store, call UseMongoDbReadModel<>
or
UseMongoDbReadModel<,>
with your read model as the generic
argument.
var resolver = EventFlowOptions.New
...
.UseMongoDbReadModel<UserReadModel>()
.UseMongoDbReadModel<UserNicknameReadModel,UserNicknameReadModelLocator>()
...
.CreateResolver();
Microsoft SQL Server¶
To setup EventFlow Microsoft SQL Server integration, install the NuGet
package EventFlow.MsSql
and add this to your EventFlow setup.
IRootResolver rootResolver = EventFlowOptions.New
.ConfigureMsSql(MsSqlConfiguration.New
.SetConnectionString(@"Server=.\SQLEXPRESS;Database=MyApp;User Id=sa;Password=???"))
...
.CreateResolver();
After setting up Microsoft SQL Server support in EventFlow, you can continue to configure it.
PostgreSql¶
To setup EventFlow PostgreSql integration, install the NuGet
package EventFlow.PostgreSql (https://www.nuget.org/packages/EventFlow.PostgreSql)
and add this to your EventFlow setup.
IRootResolver rootResolver = EventFlowOptions.New
.ConfigurePostgreSql(PostgreSqlConfiguration.New
.SetConnectionString(@"User ID=me;Password=???;Host=localhost;Port=5432;Database=MyApp"))
.UsePostgreSqlEventStore()
.UsePostgreSqlSnapshotStore()
.UsePostgreSqlReadModel<UserReadModel>()
.UsePostgreSqlReadModel<UserNicknameReadModel,UserNicknameReadModelLocator>()
...
.CreateResolver();
This code block configures EventFlow to store events, snapshots and read models in PostgreSQL. It’s not mandatory to use all three; you can mix and match, e.g. storing events in PostgreSQL, read models in Elasticsearch and not using snapshots at all.
- Event store: one big table, EventFlow, holding all events for all aggregates
- Read model store: a table, ReadModel-[ClassName], per read model type
- Snapshot store: one big table, EventFlowSnapshots, for all aggregates
RabbitMQ¶
To setup EventFlow’s RabbitMQ integration, install the NuGet package
EventFlow.RabbitMQ
and add this to your EventFlow setup.
var uri = new Uri("amqp://localhost");
var resolver = EventFlowOptions.New
.PublishToRabbitMq(RabbitMqConfiguration.With(uri))
...
.CreateResolver();
After setting up RabbitMQ support in EventFlow, you can continue to configure it.
Configuration¶
EventFlow can be configured by invoking eventFlowOptions.Configure(c => ...), or by providing a custom implementation of IEventFlowConfiguration.
Each configuration option is documented via XML documentation. The default values should be good enough for most production setups.
IoC container¶
EventFlow has a custom minimal IoC container implementation, but before using EventFlow in a production environment, it’s recommended to change to Autofac or provide another implementation.
Autofac¶
EventFlow provides the NuGet package EventFlow.Autofac
that allows
you to set the internal ContainerBuilder
used during EventFlow
initialization.
Pass the ContainerBuilder
to EventFlow and call
CreateContainer()
when configuration is done to create the
container.
var containerBuilder = new ContainerBuilder();
var container = EventFlowOptions.New
.UseAutofacContainerBuilder(containerBuilder) // Must be the first line!
...
.CreateContainer();
Log¶
The default log implementation of EventFlow logs to the console. To get another behavior, register an implementation of ILog; use the Log class as a base class to make the implementation easier.
Snapshots¶
When working with long-lived aggregates, performance when loading aggregates, and thereby making changes to them, becomes a real concern. Consider aggregates that are composed of several thousand events, some of which need to go through a rigorous update process before they are applied to the aggregate.
EventFlow supports aggregate snapshots, which are basically a capture of the entire aggregate state every few events. So instead of loading the entire aggregate event history, the latest snapshot is loaded and applied to the aggregate, and then the remaining events that were not captured in the snapshot are applied.
To configure an aggregate root to support snapshots, inherit from
SnapshotAggregateRoot<,,>
and define a serializable snapshot type
that is marked with the ISnapshot
interface.
[SnapshotVersion("user", 1)]
public class UserSnapshot : ISnapshot
{
...
}
public class UserAggregate :
SnapshotAggregateRoot<UserAggregate, UserId, UserSnapshot>
{
protected override Task<UserSnapshot> CreateSnapshotAsync(
CancellationToken cancellationToken)
{
// Create a UserSnapshot based on the current aggregate state
...
}
protected override Task LoadSnapshotAsync(
UserSnapshot snapshot,
ISnapshotMetadata metadata,
CancellationToken cancellationToken)
{
// Load the UserSnapshot into the current aggregate
...
}
}
When using aggregate snapshots there are several important details to remember
- Aggregates must not make any assumptions regarding the existence of snapshots
- Aggregates must not assume that snapshots are created with increasing aggregate sequence numbers
- Snapshots must be created in such a way, that they represent the entire history up to the point of snapshot creation
Snapshot strategy¶
When implementing an aggregate root that inherits from
SnapshotAggregateRoot<,,>
, you need to pass the base class an
implementation of ISnapshotStrategy
. The strategy is used to
determine when a snapshot should be created, e.g. every 100 events.
EventFlow ships with two strategies that should be enough for most purposes, as both can be configured (see the sketch after this list):
- SnapshotEveryFewVersionsStrategy: snapshots are created after a predefined number of events; the default is 100, but another frequency can be specified
- SnapshotRandomlyStrategy: snapshots are created randomly with a predefined chance; the default is 1%, but another chance can be specified
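As a sketch of how a strategy is passed to the base class, a snapshot-enabled aggregate could look like this. The SnapshotEveryFewVersionsStrategy.With(...) factory method is an assumption, so check the strategy class for its exact members.
public class UserAggregate :
    SnapshotAggregateRoot<UserAggregate, UserId, UserSnapshot>
{
    public UserAggregate(UserId id)
        // Create a snapshot every 100 events (assumed factory method)
        : base(id, SnapshotEveryFewVersionsStrategy.With(100))
    {
    }

    // ... plus the CreateSnapshotAsync and LoadSnapshotAsync overrides
    // shown above
}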
Upgrading snapshots¶
As an application grows over time, the data required to be stored within a snapshot will change, either because some of it becomes obsolete or merely because a better way of storing the aggregate state is found. If this happens, the snapshots persisted in the snapshot store could become useless, as aggregates would be unable to apply them. The easy solution would be to make changes by addition only and ensure that old snapshots can still be deserialized into the new version.
EventFlow provides an alternative solution, which basically allows developers to upgrade snapshots, similar to how events are upgraded.
Let’s say we have an application that has developed three snapshot versions over time.
[SnapshotVersion("user", 1)]
public class UserSnapshotV1 : ISnapshot
{
...
}
[SnapshotVersion("user", 2)]
public class UserSnapshotV2 : ISnapshot
{
...
}
[SnapshotVersion("user", 3)]
public class UserSnapshot : ISnapshot
{
...
}
Note how version three of the UserAggregate snapshot is called UserSnapshot and not UserSnapshotV3; this is basically to help developers tell which snapshot version is the current one.
Remember to add the [SnapshotVersion] attribute, as it enables control of the snapshot definition name. If left out, EventFlow will make a guess that is tied to the name of the class.
The next step will be to implement upgraders, or mappers, that can upgrade one snapshot to another.
public class UserSnapshotV1ToV2Upgrader :
ISnapshotUpgrader<UserSnapshotV1, UserSnapshotV2>
{
public Task<UserSnapshotV2> UpgradeAsync(
UserSnapshotV1 userSnapshotV1,
CancellationToken cancellationToken)
{
// Map from V1 to V2 and return
}
}
public class UserSnapshotV2ToV3Upgrader :
ISnapshotUpgrader<UserSnapshotV2, UserSnapshot>
{
public Task<UserSnapshot> UpgradeAsync(
UserSnapshotV2 userSnapshotV2,
CancellationToken cancellationToken)
{
// Map from V2 to V3 and return
}
}
The snapshot types and upgraders then only need to be registered with EventFlow.
var resolver = EventFlowOptions.New
...
.AddSnapshotUpgraders(myAssembly)
.AddSnapshots(myAssembly)
...
.CreateResolver();
Now, whenever a snapshot is loaded from the snapshot store, it is automatically upgraded to the latest version and the aggregate only needs to concern itself with the latest version.
Snapshot store implementations¶
EventFlow has built-in support for some snapshot stores (more will be implemented).
Null (or none)¶
The default implementation used by EventFlow does absolutely nothing besides logging a warning if used. It exists only to prompt developers to select a proper snapshot store. Making the in-memory store the default could cause problems if snapshots were configured but the snapshot store configuration was forgotten.
In-memory¶
For testing, or small applications, the in-memory snapshot store is
configured by merely calling UseInMemorySnapshotStore()
.
var resolver = EventFlowOptions.New
...
.UseInMemorySnapshotStore()
...
.CreateResolver();
Microsoft SQL Server¶
To use the MSSQL snapshot store you need to install the NuGet package
EventFlow.MsSql
.
Configuration¶
Configure the MSSQL connection and snapshot store as shown here.
var rootResolver = EventFlowOptions.New
...
.ConfigureMsSql(MsSqlConfiguration.New
.SetConnectionString(@"Server=.\SQLEXPRESS;Database=MyApp;User Id=sa;Password=???"))
.UseMsSqlSnapshotStore()
...
.CreateResolver();
Note that if you already use MSSQL for the event store or read model store, you only need to invoke the ConfigureMsSql extension once.
Create and migrate required MSSQL databases¶
Before you can use the MSSQL snapshot store, the required database and tables must be created. The database specified in your MSSQL connection string will not be created automatically; you have to do this yourself.
To make EventFlow create the required tables, execute the following code.
var msSqlDatabaseMigrator = rootResolver.Resolve<IMsSqlDatabaseMigrator>();
EventFlowSnapshotStoresMsSql.MigrateDatabase(msSqlDatabaseMigrator);
You should do this either on application start or preferably upon application install or update, e.g., when the web site is installed.
Custom¶
If none of the above stores are adequate, a custom implementation is
possible by implementing the interface ISnapshotPersistence
.
However, there are some rules that the snapshot persistence store must follow:
- It’s valid to store snapshots in any order, e.g. first version 3, then version 2
- It’s valid to overwrite existing snapshot versions, e.g. storing version 3, then version 3 again
- Falling back to old snapshots is allowed
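A minimal sketch of wiring up such a store, where MySnapshotPersistence is a hypothetical class implementing ISnapshotPersistence:
// MySnapshotPersistence is a hypothetical ISnapshotPersistence
// implementation, e.g. one backed by a document database
var resolver = EventFlowOptions.New
    ...
    .RegisterServices(sr => sr.Register<ISnapshotPersistence, MySnapshotPersistence>())
    ...
    .CreateResolver();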
Customize¶
If you are looking for how to configure EventFlow, look at the configuration documentation.
Whenever EventFlow doesn’t meet your needs, e.g. if you want to collect statistics on each command execution time, you can customize EventFlow.
Basically EventFlow relies on an IoC container to allow developers to customize the different parts of EventFlow.
Note: Read the section “Changing IoC container” for details on how to change the IoC container used if you have specific needs, e.g. integrating EventFlow into an Owin application.
You have two options when you want to customize EventFlow:
- Decorate an implementation
- Replace an implementation
Decorating implementations¶
In the case of collecting statistics, you might want to wrap the
existing ICommandBus
with a decorator class that can collect
statistics on command execution times.
void ConfigureEventFlow()
{
var resolver = EventFlowOptions.New
.RegisterServices(DecorateCommandBus)
...
.CreateResolver();
}
void DecorateCommandBus(IServiceRegistration sr)
{
sr.Decorate<ICommandBus>((r, cb) => new StatsCommandBus(cb));
}
class StatsCommandBus : ICommandBus
{
private readonly ICommandBus _internalCommandBus;
public StatsCommandBus(ICommandBus commandBus)
{
_internalCommandBus = commandBus;
}
// Here follow implementations of ICommandBus that call the
// internal command bus and log statistics
...
}
Registering new implementations¶
The more drastic step is to completely replace an implementation. For this you use the Register(...) and related methods on IServiceRegistration instead of the Decorate(...) method.
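A short hedged sketch, where MyCommandBus is a hypothetical replacement for the built-in command bus:
void ConfigureEventFlow()
{
    var resolver = EventFlowOptions.New
        // MyCommandBus is a hypothetical ICommandBus implementation
        // that completely replaces the built-in command bus
        .RegisterServices(sr => sr.Register<ICommandBus, MyCommandBus>())
        ...
        .CreateResolver();
}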
Event serialization and value objects¶
One of the important parts of creating an event sourced application is to ensure that you can always read your event streams. It seems simple enough, but it can become a real problem, especially for large applications that undergo refactoring or domain changes.
The basic idea is to store events in a structure that’s easy to access and migrate if the need should arise. EventFlow, like many other event sourced systems, stores its events using JSON.
Making pretty and clean JSON¶
You might wonder “but, why?”, and the reason is somewhat similar to the reasoning behind semantic URLs.
Consider the following value object used to validate and contain usernames in an application.
public class Username
{
public string Value { get; }
public Username(string value)
{
if (string.IsNullOrEmpty(value) || value.Length <= 4)
{
throw DomainError.With($"Invalid username '{value}'");
}
Value = value;
}
}
First we do some cleanup and rewrite it using EventFlow’s SingleValueObject<>.
public class Username : SingleValueObject<string>
{
public Username(string value) : base(value)
{
if (string.IsNullOrEmpty(value) || value.Length <= 4)
{
throw DomainError.With($"Invalid username '{value}'");
}
}
}
Now it looks simple and we might think we can use this value object directly in our domain events. We could, but the resulting JSON will look like this.
{
"Username" : {
"Value": "my-awesome-username",
}
}
This doesn’t look very good. The extra property doesn’t make the JSON easier to read, and it takes up more space when serializing and transmitting the event.
In addition, if you use the value object in a web API, consumers of the API will need to wrap the properties in their DTOs in a similar way. What we would like is to modify our serialized event to look like this instead, while still using the value object in our events.
{
"Username" : "my-awesome-username"
}
To do this, we use the custom JSON serializer EventFlow has for single
value objects called SingleValueObjectConverter
on our Username
class like this.
[JsonConverter(typeof(SingleValueObjectConverter))] // Only this line added
public class Username : SingleValueObject<string>
{
public Username(string value) : base(value)
{
if (string.IsNullOrEmpty(value) || value.Length <= 4)
{
throw DomainError.With($"Invalid username '{value}'");
}
}
}
The JSON converter understands the single value object and will serialize and deserialize it correctly.
Using this converter also enables you to replace e.g. raw string and int properties with value objects on existing events, as they will be “JSON compatible”.
Note
Consider applying this to any classes that inherit from Identity<>
.
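As a sketch, the converter can be applied to an identity type like this, following EventFlow’s standard identity pattern (UserId is a hypothetical identity class):
// Applying the converter to an identity class so it serializes as a
// plain JSON string instead of a wrapping object
[JsonConverter(typeof(SingleValueObjectConverter))]
public class UserId : Identity<UserId>
{
    public UserId(string value) : base(value) { }
}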
Do’s and don’ts¶
Whenever creating an application that uses CQRS+ES, there are several things you need to keep in mind to make development easier and to minimize potential bugs. This guide gives you some details on typical problems and how EventFlow can help you minimize the risk.
Business rules¶
Specifications¶
Consider moving complex business rules to specifications. This improves readability, testability and re-use.
Events¶
Produce clean JSON¶
Make sure that when your aggregate events are JSON serialized, they produce clean JSON as it makes it easier to work with and enables easier deserialization of events in the future.
- No type information
- No hints of value objects (see value objects)
Here’s an example of good clean event JSON produced from a create user event.
{
"Username": "root",
"PasswordHash": "1234567890ABCDEF",
"EMail": "root@example.org",
}
Keep old event types¶
Keep in mind that you need to keep the event types in your code for as long as these events are in the event store, which in most cases is forever, as storage is cheap and information, i.e. your domain events, is expensive.
However, you should still clean your code. Have a look at how you can upgrade and version your events for details on how EventFlow supports you in this.
Subscribers and out of order events¶
Be very careful if aggregates emit multiple events for a single command; subscribers will almost certainly receive these out of order.
Specifications¶
EventFlow ships with an implementation of the specification pattern which could be used to e.g. make complex business rules easier to read and test.
To use the specification implementation shipped with EventFlow, simply create a
class that inherits from Specification<T>
.
public class BelowFiveSpecification : Specification<int>
{
protected override IEnumerable<string> IsNotSatisfiedBecause(int i)
{
if (5 <= i)
{
yield return string.Format("{0} is not below five", i);
}
}
}
Note that instead of simply returning a bool
to indicate whether or not the
specification is satisfied, this implementation requires a reason (or reasons)
why the specification is not satisfied.
The ISpecification<T>
interface has two methods defined, the traditional
IsSatisfiedBy
as well as WhyIsNotSatisfiedBy
, which returns an
empty enumerable if the specification was indeed satisfied.
public interface ISpecification<in T>
{
bool IsSatisfiedBy(T obj);
IEnumerable<string> WhyIsNotSatisfiedBy(T obj);
}
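To illustrate, the BelowFiveSpecification above could be exercised like this:
var specification = new BelowFiveSpecification();

// Satisfied: no reasons are returned
specification.IsSatisfiedBy(4);        // true
specification.WhyIsNotSatisfiedBy(4);  // empty enumerable

// Not satisfied: the reason explains why
specification.IsSatisfiedBy(7);        // false
specification.WhyIsNotSatisfiedBy(7);  // [ "7 is not below five" ]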
Specifications really become powerful when they are combined. EventFlow also
ships with a series of extension methods for the ISpecification<T>
interface
that allows easy combination of implemented specifications.
// Throws a `DomainError` exception if obj doesn't satisfy the specification
spec.ThrowDomainErrorIfNotSatisfied(obj);
// Builds a new specification that requires all input specifications to be
// satisfied
var allSpec = specEnumerable.All();
// Builds a new specification that requires a predefined number of the
// input specifications to be satisfied
var atLeastSpec = specEnumerable.AtLeast(4);
// Builds a new specification that requires the two input specifications
// to be satisfied
var andSpec = spec1.And(spec2);
// Builds a new specification that requires one of the two input
// specifications to be satisfied
var orSpec = spec1.Or(spec2);
// Builds a new specification that requires the input specification
// not to be satisfied
var notSpec = spec.Not();
If you need a simple expression to combine with other more complex specifications
you can use the bundled ExpressionSpecification<T>
, which is a specification
wrapper for an expression.
var spec = new ExpressionSpecification<int>(i => 1 < i && i < 3);
// 'str' will contain the value "i => ((1 < i) && (i < 3))"
var str = spec.ToString();
If the specification isn’t satisfied, a string representation of the expression is returned as the reason.
FAQ - frequently asked questions¶
How can I ensure that only specific users can execute commands?¶
You should implement a decorator for the ICommandBus
that does the
authentication. Have a look at the decorator documentation
to see how this can be achieved.
Why isn’t there a “global sequence number” on domain events?¶
While this is easy to support in some event stores like MSSQL, it doesn’t really make sense from a domain perspective. Greg Young also has this to say on the subject:
Order is only assured per a handler within an aggregate root boundary. There is no assurance of order between handlers or between aggregates. Trying to provide those things leads to the dark side.
– Greg Young
Why doesn’t EventFlow have a unit of work concept?¶
Short answer, you shouldn’t need it. But Mike has a way better answer:
In the Domain, everything flows in one direction: forward. When something bad happens, a correction is applied. The Domain doesn’t care about the database and UoW is very coupled to the db. In my opinion, it’s a pattern which is usable only with data access objects, and in probably 99% of the cases you won’t be needing it. As with the Singleton, there are better ways but everything depends on proper domain design.
– Mike Mogosanu
If your case falls within the 1% case, write a decorator for the
ICommandBus
that starts a transaction, use MSSQL as event store and
make sure your read models are stored in MSSQL as well.
Why are subscribers receiving events out of order?¶
It might be that your aggregates are emitting multiple events. Read about subscribers and out of order events.