In this article, I'm going to examine the usage of Event Sourcing and the Command Query Responsibility Segregation (CQRS) architectural style through a sample telehealth medicine application. For tooling, I'm going to use the Marten library (https://martendb.io), which provides robust support for Event Sourcing on top of the Postgresql database engine.
Before I move on to Event Sourcing though, let's think about the typical role of a database within your systems that don't use Event Sourcing. For most of my early career as a software professional, I assumed that system state would be persisted in a relational database that would act as the source of truth for the system. Very frequently, I've visualized systems with something like the simple layered view shown in Figure 1.
With this typical architecture, almost all input and output of the system will involve reading or writing to this one database. Different use cases will have different needs, so at various times I might need to write explicit code to:
- Map the middle tier model to the database tables
- Map incoming input from outside the system to the database tables
- Translate the internal database to different structures for outgoing data in Web services
The point I'm trying to make here is that the single database model can easily bring with it a fair amount of mechanical effort in translating the one database structure to the specific needs of various system input or output - and I'll need to weigh this effort when I compare the one database model to the Event Sourcing and CQRS approach I'll examine later.
I've also long known that in the industry, no one database structure can be optimized for both reading and writing, so I might very well try to support a separate, denormalized database specifically for reporting. That reporting database will need to be synchronized somehow from the main transactional database. This is just to say that the idea of having essentially the same information in multiple databases within the software architectures is not exactly new.
Alternative approaches using Event Sourcing or CQRS can look scary or intimidating upon your first exposure. Myself, I walked out after a very early software conference presentation in 2008 on what later became known as CQRS shaking my head and thinking that it was completely crazy and would never catch on, yet here I am, recommending this pattern for some systems.
Telehealth System Example
Before diving into the nomenclature or concepts around Event Sourcing or CQRS architectures, I want to consider a sample problem domain. Hastened by the COVID pandemic, “telehealth” approaches to health care, where you can speak to a health care provider online without having to make an in-office visit, rapidly became widespread. Imagine that I'm tasked with building a new website application that allows potential patients to request an appointment, connect with a provider (physician, nurse, nurse practitioner, etc.), and host the online appointments.
The system will need to provide functionality to coordinate the on-call providers. In this case, I'm going to attempt to schedule the appointments as soon as a suitable provider is available, so I'll need to be able to estimate expected wait times. I do care about patient happiness and want the providers working with the system and to have a good experience with the system, so it's going to be important to be able to collect a lot of metrics to help adjust staffing. Moreover, I need to plan for problems during a normal business day and give the administrative users the ability to understand what transpired during the day that might have caused patient wait times to escalate.
Moreover, because this is related to health care, I should plan on having some pretty stringent requirements for auditing all activity within the system through a formal audit log.
This sample problem domain is based on a project I was a part of where I successfully used Event Sourcing, but on a very different technical stack.
Event Sourcing
Event Sourcing is an architectural approach to data persistence that captures all incoming data as an append-only event log. In effect, you're making the logical change log into the single source of truth in your system. Rather than modeling the current state of the system and trying to map every incoming input and outgoing query to this centralized state, event sourcing explicitly models the changes to the system.
So how does it work? First, let's do some modeling of the online telehealth system. Just considering events around the online appointments I might model events for:
- Appointment Requested
- Appointment Scheduled
- Appointment Started
- Appointment Finished
- Appointment Cancelled
These events are small types carrying data that models the change of state whenever they're recorded. Do note that the event type names are expressed in the past tense and are directly related to the business domain. The event name by itself can be important in understanding the system behavior. As an example, here's a C# version of AppointmentScheduled
that models whether the appointment is assigned to a certain provider (medical professional):
public record AppointmentScheduled(Guid ProviderId, DateTimeOffset EstimatedTime);
Taking Marten as a relatively typical approach, these event objects are persisted in the underlying database as serialized JSON in a single table that's ordered sequentially by the order in which the events are appended. Like other event stores, Marten also tracks metadata about the events, like the time the event was captured, and potentially more data related to distributed tracing, like correlation identifiers.
The events are organized into streams of related events that model a single workflow within the system. In the telehealth system, there are event streams for:
- Appointments
- Provider Shift to model the activity of a single provider during a single day
Although events in an Event Sourcing approach are the source of truth, you do still need to understand the current system state to support incoming system commands or supply clients with queries against the system state. This is where the concept of a projection comes into play. A projection is a view of the underlying events suitable for providing the write model to validate or apply incoming commands or a read model that's suitable for usage by system queries. If you're familiar with the concept of materialized views in relational database engines, a projection in a system based on Event Sourcing plays a very similar role.
The advantages, or maybe just the applicability, of Event Sourcing are:
- It creates a rich audit log of business activity.
- It supports the concept of “Time Travel” or temporal querying to be able to analyze the state of the system in the past by selectively replaying events.
- Using Event Sourcing makes it possible to retrofit potentially valuable metrics about the system after the fact by again replaying the events.
- Event Sourcing fits well with asynchronous programming models and event-driven architectures.
- Having the event log can often clarify system behavior.
Command Query Responsibility Segregation
CQRS is an architectural pattern that calls for the system state to be divided into separate models. The write model is optimized for transactions and is updated by incoming commands. The read model is optimized for queries to the system. As you'll rightly surmise, something has to synchronize and transform the incoming write model to the outgoing read model. That leads to the architectural diagram in Figure 2, which I think of as the “scary view of CQRS.”
In this common usage of CQRS, there's some kind of background process that's asynchronously and continuously applying data updates to the write model to update the read model. This can lead to extra complexity through more necessary infrastructure compared to the classic “one database model” system model. I don't believe this is necessarily more code than using the traditional one database model. Rather, I would say that the hidden mapping and translation code in the one database model is much more apparent in the CQRS approach.
Event Sourcing and CQRS can be used independently of each other, but you'll very frequently see these two techniques used together. Fortunately, as I'll show in the remainder of this article, Marten can help you create a simpler architecture for CQRS with Event Sourcing than the diagram above.
Requirements through Event Storming
Event Storming (https://www.eventstorming.com/) is a very effective requirements workshop format to help a development team and their collaborating business partners understand the requirements for a software system. As the name suggests, Event Storming is a natural fit with Event Sourcing (and CQRS architectures).
Although there are software tools to do Event Storming sessions online, the easiest way to get started with Event Storming is to grab some colored sticky notes, a couple of markers, and convene a session with both the development team and the business domain experts near a big whiteboard or a blank wall.
The first step is to start brainstorming on the domain events within the business processes. As you discover these logical events, you'll write the event name down on an orange card and stick it on the board. As an example from the telehealth problem domain, some events might be “Appointment Requested” or “Appointment Scheduled” or “Appointment Cancelled.” Note that these events are named tersely and are expressed in the past tense. As much as possible, you want to try to organize the events in the sequential order in which they occur within the system. If using a whiteboard, I also like to add some ad hoc arrows to delineate possible branching or relationships, but that's not part of the formal Event Storming approach.
The next step - but don't think for a minute that this must be a linear flow and that you shouldn't iterate between steps at any time - is to identify the commands or input to the system that will cause the previously identified events in the system. These commands are recorded as blue notes just to the left of the event or events that the command may cause in the system. The nomenclature, in this case, is in the present tense, like “Request Appointment.”
In the third step, try to identify the business entities you'll need in order to process the incoming command inputs and decide which events should be raised. In Event Storming (and Event Sourcing) nomenclature, these are referred to as “Aggregates.” In the case of the telehealth, I've identified the need to have an Appointment aggregate that reflects the current state of an ongoing or requested patient appointment and a “Provider Shift” to track the time and activity of a provider during a particular day. These aggregates are captured in yellow cards and posted to the board.
Beyond that, you can optionally use:
- Green cards to denote informational views that users of the system need to access to carry out their work. In the case of the telehealth system, I'm calling out the need for a Board view that represents a related group of appointments and providers during a single workday. For example, pediatric appointments in the state of Texas on July 18, 2022 are a single Board.
- Significant business logic processes that potentially create one or more domain events are recorded in purple notes. In the telehealth example, there's going to be some kind of “matching logic” that tries to match appropriate providers with the incoming appointments based on a combination of availability, specialty, and the licensure of the provider.
- External system dependencies can be written down on pink cards to record their existence. In this case, I'll probably use Twilio, or something similar, to host any kind of embedded chat or teleconferencing, so I'm noting that in the Event Storming session.
Figure 3 shows a sample for what an Event Storming session on the telehealth system might look like.
I'm a big fan of Event Storming to discover requirements and to create a common understanding of the business domain. Event Storming stands apart from many traditional requirements elicitation techniques by directly pointing the way toward the artifacts in your code. Event Storming sessions are a great way to discover the ubiquitous language for the system that is a necessary element of doing domain-driven development (DDD).
Getting Started with Marten
To get started with Marten as your event store, you'll first need a Postgresql database. My preference for local development is to use Docker to run the development database, and this is a copy of a docker-compose.yaml
file that will get you started:
version: '3'
services:
postgresql:
image: "clkao/postgres-plv8:latest"
ports:
- "5433:5432"
Assuming that you have Docker Desktop installed on your local development computer, you just need to type this in your command line at the same location as the file above:
docker compose up -d
The command above starts the Docker container in the background. Next, let's start a brand new ASP.NET Core Web API project with this command:
dotnet new webapi
And let's add a reference to Marten with some extra command line utilities you'll use later with:
dotnet add package Marten.CommandLine
Switching to the application bootstrapping in the Program file created by the dotnet new template that I used, I'll add the following code:
builder.Services.AddMarten(opts =>
{
var connString = builder.Configuration.GetConnectionString("marten");
opts.Connection(connString);
// There will be more here later...
});
Last, I'll add an entry to the appsettings.json
file for the database connection string:
{
"ConnectionStrings": {
"marten": "connection string"
}
}
To enable some administrative command line tooling that I'll use later, replace the last line of code in the generated Program file with this call:
// This is using the Oakton library
await app.RunOaktonCommands(args);
Appending Events with Marten
Marten (https://martendb.io) started its life as a library to allow .NET developers to exploit the robust JSON support in the Postgresql database engine as a full-fledged document database with a small event sourcing capability bolted onto the side. As Marten and Marten's community have grown, the event sourcing functionality has matured and probably drives most of the growth of Marten at this point.
In the telehealth system, I'll write the very simplest possible code to append events for the start of a new ProviderShift
stream. First though, let's add some event types for the ProviderShift
workflow:
public record ProviderAssigned(Guid AppointmentId);
public record ProviderJoined(Guid BoardId, Guid ProviderId);
public record ProviderReady();
public record ProviderPaused();
public record ProviderSignedOff();
public record ChartingFinished();
public record ChartingStarted();
I'm assuming the usage of .NET 6 or above here, so it's legal to use C# record types. That isn't mandatory for Marten usage, but it's convenient because events should never change during the lifetime of the system. Mostly for me though, using C# records just makes the code very terse and easily readable.
If you're interested, the underlying table structure for streams and events that Marten generates is shown in Listing 1 and Listing 2.
Listing 1: Stream Table
CREATE TABLE mt_streams (
id uuid NOT NULL,
type varchar NULL,
version bigint NULL,
timestamp timestamptz NOT NULL DEFAULT (now()),
snapshot jsonb NULL,
snapshot_version integer NULL,
created timestamptz NOT NULL DEFAULT (now()),
tenant_id varchar NULL DEFAULT '*DEFAULT*',
is_archived bool NULL DEFAULT FALSE,
CONSTRAINT pkey_mt_streams_id PRIMARY KEY (id)
);
Listing 2: Events Table
CREATE TABLE mt_events (
seq_id bigint NOT NULL,
id uuid NOT NULL,
stream_id uuid NULL,
version bigint NOT NULL,
data jsonb NOT NULL,
type varchar(500) NOT NULL,
timestamp timestamp with time zone NOT NULL DEFAULT '(now())',
tenant_id varchar NULL DEFAULT '*DEFAULT*',
mt_dotnet_type varchar NULL,
is_archived boolNULL DEFAULT FALSE,
CONSTRAINT pkey_mt_events_seq_id PRIMARY KEY (seq_id)
);
ALTER TABLE mt_events
ADD CONSTRAINT fkey_mt_events_stream_id FOREIGN KEY(stream_id)
REFERENCES cli.mt_streams(id)ON DELETE CASCADE;
You'll also notice that I'm not adding a lot of members to most of the events. As you'll see in the next code sample, Marten tags all these captured events to the provider shift ID anyway. Just the name of the event type by itself denotes a domain event, so that's informative. In addition, Marten tags each event captured with metadata like the event type, the version within the stream, and, potentially, correlation and causation identifiers.
Now, on to appending events with Marten. In the following code sample, I spin up a new Marten DocumentStore that's the root of any Marten usage, then start a new ProviderShift
stream with a couple initial events:
// This would be an input
var boardId = Guid.NewGuid();
var store = DocumentStore.For("connection string");
using var session = store.LightweightSession();
session.Events.StartStream<ProviderShift>(
new ProviderJoined(boardId),
new ProviderReady()
);
await session.SaveChangesAsync();
Similar to Entity Framework Core's DbContext
type, the Marten IDocumentSession
represents a unit of work that I can use to organize transactional boundaries by gathering up work that should be done inside of a single transaction, then helping to commit that work in one native Postgresql transaction. From the Marten side of things, it's perfectly possible to capture events for multiple event streams and even a mix of document updates within one IDocumentSession
.
Projections with Marten
Now that you know how to append events, the next step is to have the provider events projected into a write model representing the state of the ProviderShift
that you'll need later. That's where Marten's projection model comes into play.
As a simple example, let's say that you want all of the provider events for a single ProviderShift
rolled up into this data structure:
public class ProviderShift
{
public Guid Id { get; set; }
public int Version { get; set; }
public Guid BoardId { get; private set; }
public Guid ProviderId { get; init; }
public ProviderStatus Status { get; private set; }
public string Name { get; init; }
public Guid? AppointmentId { get; set; }
// More here in just a minute...
}
Hopefully you'll be able to trace how all of this information could be gleaned from the event records like ProviderReady
that I defined earlier. In essence, what you need to do is to apply the “left fold” concept from functional programming to combine all the events for a single ProviderShift
event stream into that structure above.
The one exception is the ProviderShift.Version
property. One of Marten's built-in naming conventions (which can be overridden) is to treat any public member of an aggregated type with the name “Version” as the stream version, such that when Marten applies the events to update the projected document, this member is set by Marten to be the most recent version number of the stream itself. To make that concrete, if a ProviderShift
stream contains four events, then the version of the stream itself is 4.
As the simplest possible example, I'm going to use Marten's self-aggregate feature to add the updates by event directly to the ProviderShift
type up above. Do note that it's possible to use an immutable aggregate type for this inside of Marten, but I'm choosing to use a mutable object type just because that leads to simpler code. In real usage, be aware that opting for immutable aggregate types works the garbage collection in your system harder by spawning more object allocations. Also, be careful with immutable aggregates because that can occasionally bump into JSON serialization issues that are easily avoidable with mutable aggregate types.
In this case, the event stream within the application should be started with the ProviderJoined
event, so I'll add a method to the ProviderShift
type up above that creates a new ProviderShift
object to match that initial ProviderJoined
event, like so:
public static async Task<ProviderShift>
Create(ProviderJoined joined, IQuerySession session)
{
var p = await session.LoadAsync<Provider>(joined.ProviderId);
return new ProviderShift
{
Name = $"{p.FirstName} {p.LastName}",
Status = ProviderStatus.Ready,
ProviderId = joined.ProviderId,
BoardId = joined.BoardId
};
}
A couple notes about the code above:
- There's no interface or mandatory base class of any kind from Marten in this usage, just naming conventions.
- The method name
Create()
with the first argument type beingProviderJoined
exercises a naming convention in Marten to identify this method as taking part in the projection. - The Marten team urges some caution with this, but it's possible to query Marten for additional information inside the
Create()
method by passing in the MartenIQuerySession
object. - As implied by this code, it's quite possible with Marten to store reference or relatively static data like basic information about a provider (name, phone number, qualifications) in a persisted document type while also using the Marten event store capabilities.
Now let's add some additional methods to handle other event types. The easiest thing to do is to add more methods named Apply(event type) like this one:
public void Apply(ProviderReady ready)
{
AppointmentId = null;
Status = ProviderStatus.Ready;
}
public void Apply(ProviderAssigned assigned)
{
Status = ProviderStatus.Assigned;
AppointmentId = assigned.AppointmentId;
}
Or even better, if the resulting method can be a one line, use the newer C# method expression option:
// This is kind of a catch all for any paperwork
// the provider has to do after an appointment
// for the just concluded appointment
public void Apply(ChartingStarted charting) =>
Status = ProviderStatus.Charting;
Again, to be clear, these methods are added directly to the ProviderShift
class to teach Marten how to apply events to the ProviderShift
aggregate.
Let's move on to applying the aggregate with Marten's “live aggregation” mode:
public async Task access_live_aggregation(IQuerySession session, Guid shiftId)
{
// Fetch all the events for the stream, and
// apply them to a ProviderShift aggregate
var shift = await session
.Events
.AggregateStreamAsync<ProviderShift>(shiftId);
}
In the code above, IQuerySession
is a read-only version of Marten's IDocumentSession
that's available in your application's Dependency Injection container in a typical .NET Core application. The code above is fetching all the captured events for the stream identified by shiftId
, then passed one at a time, in order, to the ProviderShift
aggregate to create the current state from the events.
This usage queries for every single event for the stream, and deserializes each event object from persisted JSON in the database, so it could conceivably get slow as the event stream grows. Offhand, I'm guessing that I'm probably okay with the ProviderShift
aggregation only happening “live,” but I do have other options.
The second option is to use Marten's “inline” lifecycle to apply changes to the projection at the time that events are captured. To use this, I'm going to need to do just a little bit of configuration in the Marten set up:
var store = DocumentStore.For(opts =>
{
opts.Connection("connection string");
opts.Projections.SelfAggregate<ProviderShift>
(ProjectionLifecycle.Inline);
});
Now, when I capture events against a ProviderShift
event stream, Marten applies the new events to the persisted ProviderShift
aggregate for that stream, and updates the aggregated document and appends the events in the same transaction for strong consistency:
var shiftId = session.Events.StartStream<ProviderShift>(
new ProviderJoined(boardId),
new ProviderReady()
).Id;
// The ProviderShift aggregate will be updated at this time
await session.SaveChangesAsync();
// Load the persisted ProviderShift right out of the database
var shift = await session.LoadAsync<ProviderShift>(shiftId);
Right here, you can hopefully see the benefit of Marten coming with both a document database feature set and the event store functionality. Without any additional configuration, Marten can store the projected ProviderShift
documents directly to the underlying Postgresql database.
Lastly, there's one last choice. I can use eventual consistency and allow the ProviderShift
aggregate to be built in an asynchronous manner in background threads. This is going to require a little more configuration, though, as I need to be using the full application bootstrapping, as shown below:
builder.Services.AddMarten(opts =>
{
// This would typically come from config
opts.Connection("connection string");
opts.Projections
.SelfAggregate<ProviderShift>(
ProjectionLifecycle.Async);
})
// This adds a hosted service to run
// asynchronous projections in the background
.AddAsyncDaemon(DaemonMode.HotCold);
As shown in Figure 4, Marten has an optional subsystem called the “async daemon” that's used to process asynchronous projections with an eventual consistency model in a background process.
The async daemon runs as a .NET IHostedService
in a background thread. The daemon constantly scans the underlying event store tables and applies new events to the registered projections. In the case of the ProviderShift
aggregation, the async daemon applies new incoming events like the ProviderReady
or ProviderAssigned
events that are handled by the ProviderShift
aggregate to update the ProviderShift
aggregate documents and persists them using Marten's document database functionality. The async daemon comes with guarantees to:
- Apply events in sequential order
- Apply all events at least once
The async daemon is an example of eventual consistency where the query model (the ProviderShift
aggregate in this case) is updated to match the incoming events rather than the strong consistency model allowed by Marten's inline projection lifecycle.
To summarize the projection lifecycles in Marten and their applicability, refer to Table 1.
Time Travel
One of the advantages of using Event Sourcing is the ability to use “time travel” to replay events up to a certain time to recreate the state of the system at a certain time or at a certain revision. In the sample below, I'm going to recreate the state of a given ProviderShift
at a time in the past:
public async Task time_travel(
IQuerySession session,
Guid shiftId,
DateTimeOffset atTime)
{
// Fetch all the events for the stream, and
// apply them to a ProviderShift aggregate
var shift = await session
.Events
.AggregateStreamAsync<ProviderShift>(shiftId, timestamp:atTime);
}
In this usage, Marten queries for all the events for the given ProviderShift
stream up to the point in time expressed by the atTime
argument and calculating the projected state at that time. Inside of this fictional telehealth system, it might very well be valuable for the business to replay events throughout the day to understand how the appointments and provider interaction played out and diagnose scheduling delays.
Projecting Events to a Flat Table
One of the advantages of Marten is that it allows you to be flexible in your persistence approach within a single database engine without having to introduce yet more infrastructure. Marten was originally built to be a document database with a nascent event store capability over the top of the existing Postgresql database engine, but the event store functionality has matured greatly since then. In addition, Postgresql is a great relational database engine, so I can even take advantage of that and write projections that write some of the events to a plain old SQL table.
Back to the fictional telehealth system, one of the features I'll absolutely need is the ability to predict the wait times that patients should expect when they request an appointment. To support that calculation, the system needs to track statistics about how long appointments last during different times of the day. To that end, I'm going to add another projection against the same events I'm already capturing, but this time, I'm going to use Marten's EventProjection
recipe that allows me to be more explicit about how the projection handles events.
First, I'm going to start a new class for this projection and define through Marten itself what the table structure is:
public AppointmentDurationProjection()
{
// Defining an extra table so Marten can manage it for us
var table = new Table("appointment_duration");
table.AddColumn<Guid>("id").AsPrimaryKey();
table.AddColumn<DateTimeOffset>("start");
table.AddColumn<DateTimeOffset>("end");
SchemaObjects.Add(table);
}
// more later...
Next, using Marten's naming conventions, I'm going to add a method that handles the AppointmentStarted
event in this projection:
public void Apply(
IEvent<AppointmentStarted> @event,
IDocumentOperations ops)
{
var sql = "insert into appointment_duration"
+ " (id, start) values (?, ?)";
ops.QueueSqlCommand(sql, @event.Id, @event.Timestamp);
}
And an additional method for the AppointmentFinished
event:
public void Apply(
IEvent<AppointmentFinished> @event,
IDocumentOperations ops)
{
var sql = "update appointment_duration "
+ "set end = ? where id = ?";
ops.QueueSqlCommand(sql, @event.Timestamp, @event.Id);
}
The next step is to add this new projection to the system by revisiting the AddMarten
() section of the Program file and adding that projection like so:
builder.Services.AddMarten(opts =>
{
// other configuration...
opts.Projections.Add<AppointmentDurationProjection>
(ProjectionLifecycle.Async);
// OR ???
opts.Projections.Add<AppointmentDurationProjection>
(ProjectionLifecycle.Inline);
});
There's a decision to be made about the new AppointmentDurationProjection
that I'm adding to a system that's already in production. If I make the AppointmentDurationProjection
asynchronous and deploy that change to production, the Marten async daemon attempts to run every historical event from the beginning of the system through this new projection until it has eventually reached what Marten calls the “high water mark” of the event store, and then continues to process new incoming events at a normal pace.
There's the concept of stream archival in Marten that you can use to avoid the potential performance problem of having to replay every event from the beginning of the system.
If, instead, I decide to make the new AppointmentDurationProjection
run inline with event capture transactions, that new table only reflects events that are captured from that point on. And maybe that's perfectly okay for the purposes here.
But what if, instead, I want that new projection to run inline and also want it applied to every historical event? That's the topic of the next section.
Replaying Events or Rebuilding Projections
It's an imperfect world, and there will occasionally be reasons to rebuild the stored document or data from a projection against the persisted events. Maybe I had a reason to change how the projection was created or structured? Maybe I've added a new projection? Maybe, due to intermittent errors of some sort, the async daemon had to skip over some missing events or there was some sort of “poison pill” event that Marten had to skip over due to errors in the projection code?
The point is that the events are the single source of truth, the stored projection data is a read only view of that raw data, and I can rebuild the projections from the raw events later.
Here's an example of doing this rebuild programmatically:
public async Task rebuild_projection(
IDocumentStore store,
CancellationToken cancellation)
{
// create a new instance of the async daemon
// as configured in the document store
using var daemon = await store.BuildProjectionDaemonAsync();
await daemon.RebuildProjection
<AppointmentDurationProjection>(cancellation);
}
That code deletes any existing data in the appointment_duration
table, reset Marten's record of the progress of the existing projection, and start to replay all non-archived events in the system from event #1 all the way to the known “high water mark” of the event store at the beginning of this operation.
This can function, simultaneously with the running application, as long as the projection being rebuilt isn't also running in the application.
To make this functionality easier to access and apply at deployment time, Marten comes with some command line extensions to your .NET application with the Marten.CommandLine library. Marten.CommandLine works with the related Oakton (https://jasperfx.github.io/oakton) library that allows .NET developers to expose additional command line tools directly to their .NET applications.
Assuming that your application has a reference to Marten.CommandLine, you can opt into the extended command line options with this line of code in your Program file:
// This is using the Oakton library
await app.RunOaktonCommands(args);
From the command line at the root of your project using the Marten.CommandLine library, type:
dotnet run -- help projections
This lets you access the built-in usage help for the Oakton commands active in your system. With Marten.CommandLine, you should see some text output like this:
projections - Marten's asynchronous projection...
└── Marten's asynchronous projection...
└── dotnet run -- projections
├── [-i, --interactive]
├── [-r, --rebuild]
├── [-p, --projection <projection>]
├── [-s, --store <store>]
├── [-l, --list]
├── [-d, --database <database>]
├── [-l, --log <log>]
├── [-e, --environment <environment>]
├── [-v, --verbose]
├── [-l, --log-level <loglevel>]
└── [----config:<prop> <value>]
To rebuild only the new AppointmentDurationProjection
from the command line, type this at the command line at the root of the telehealth system:
dotnet run -- projections --rebuild -p AppointmentDurationProjection
This command line usage was intended for both development or testing time, but also for scripting production deployments.
The Marten team and community, of course, looks forward to the day when Marten is able to support a “zero downtime” projection rebuild model.
Command Handlers with Marten
I've spent a lot of time talking about Event Sourcing so far, but little about CQRS, so let's amend that by considering the code that you'd need to write as a command handler. As part of the telehealth system, the providers need to perform a business activity called “charting” at the end of each patient appointment where they record whatever notes or documentation is required to close out the appointment. The telehealth system absolutely needs to track the time that providers spend charting.
To mark the end of the charting activity, the system needs to accept a command message from the provider's user interface client that might look something like this:
public record CompleteCharting(Guid ShiftId, int Version);
To write the simplest possible ASP.NET Core controller endpoint method that handles this incoming command, verifies the request against the current state of the ProviderShift, and raises a new ChartingFinished
event, I'll write this code:
public async Task CompleteCharting(
[FromBody] CompleteCharting charting,
[FromServices] IDocumentSession session)
{
var shift = await session.LoadAsync<ProviderShift>(charting.ShiftId);
// Validate the incoming data before making the status transition
if (shift.Status != ProviderStatus.Charting)
{
throw new Exception("invalid request");
}
var finished = new ChartingFinished();
session.Events.Append(charting.ShiftId, finished);
await session.SaveChangesAsync();
}
The big thing I missed up there is any kind of concurrency protection to verify that either I'm not erroneously receiving duplicate commands for the same ProviderShift
or that I want to force the commands against a single ProviderShift
to be processed sequentially.
First, let's try to solve the potential concurrency issues with optimistic concurrency, meaning that I'm going to start by telling Marten what initial version of the ProviderShift
stream the command thinks the stream should be at. If, at the time of saving the changes on the IDocumentSession
, Marten determines that the event stream in the database has moved on from that version, Marten throws a concurrency exception and rollback the transaction.
Recent enhancements to Marten make this workflow much simpler. The following code rewrites the Web service method above to incorporate optimistic concurrency control based on the CompleteCharting.Version
value that's assumed to be the initial stream version:
public async Task CompleteCharting(
[FromBody] CompleteCharting charting,
[FromServices] IDocumentSession session)
{
var stream = await session
.Events
.FetchForWriting<ProviderShift>(charting.ShiftId, charting.Version);
// Validation code...
var finished = new ChartingFinished();
stream.AppendOne(finished);
await session.SaveChangesAsync();
}
And, for another alternative, if you're comfortable with a functional programming inspired “continuation passing style” usage of Lambdas:
return session
.Events
.WriteToAggregate<ProviderShift>(
charting.ShiftId,
charting.Version,
stream =>
{
// validation code...
var finished = new ChartingFinished();
stream.AppendOne(finished);
});
Optimistic concurrency checks are very efficient, assuming that actual concurrent access is rare, because it avoids any kind of potential expensive database locking. However, this requires some kind of exception-handling process that may include selective retries. That's outside the scope of this article.
Because Marten is built on top of the full-fledged Postgresql database, Marten can take advantage of Postgresql row locking to wait for exclusive access to write to a specific event stream. I'll rewrite the code in the previous sample to instead use exclusive locking:
return session
.Events
.WriteExclusivelyToAggregate
<ProviderShift>(charting.ShiftId, stream =>
{
// validation code...
var finished = new ChartingFinished();
stream.AppendOne(finished);
});
This usage uses the database itself to order concurrent operations against a single event stream, but be aware that this usage can also throw exceptions if Marten is unable to attain a write lock on the event stream before timing out.
Summary
Marten is one of the most robust and feature-complete tools for Event Sourcing on the .NET stack. Arguably, Marten is an easy solution for Event Sourcing within CQRS solutions because of its “event store in a box” inclusion of both the event store and asynchronous projection model within one single library and database engine.
Event Sourcing is quite different from the traditional approach of persisting system state in a single database structure, but has strengths that may well fit business domains better than the traditional approach. CQRS can be done without necessarily having a complicated infrastructure.
Table 1: Marten Projection Lifecycles
Task | Description |
---|---|
Live | The projected documents are evaluated from the raw events on demand. This lifecycle is recommended for short event streams or in cases where you want to optimize much more for fast writes with few reads. |
Inline | The projected documents are updated and persisted at the time of event capture, and in the same database transaction for a strong consistency model. |
Async | Projections are updated from new events in a background process. This lifecycle should be used any time there's a concern about concurrent updates to a projected document and should almost always be used for projections that span multiple event streams. |