Event Store Blog

Event Sourcing and CQRS - Event Store Blog

Written by Alexey Zimarev | Jun 17, 2020 11:00:00 PM

CQRS stands for Command-Query Segregation Principle. Greg Young described (and named) the pattern thoroughly in 2010, but the idea existed way before that time. On a high level, CQRS states the fact that operations that trigger state transitions should be described as commands and any data retrieval that goes beyond the need of the command execution, should be named a query. Because the operational requirements for executing commands and queries are very often different, developers should consider using different persistence techniques for handling commands and queries, therefore segregating them.

Operations vs Reporting

Most software systems need to persist data. Since for the scope of this guide we assume that the business logic of a software system is represented and executed by the Domain Model, we can say that application persistence focuses on persisting domain objects, like aggregates and entities. Because domain objects often have complex structures, developers might need to apply advanced persistence techniques to overcome the impedance mismatch. The Domain Model-oriented persistence is optimised for transactional performance, because the essential function of the Domain Model is to execute operations that trigger state transitions in the system, according to the logic and rules of the model.

Such an approach, however, could create challenges when it comes to retrieving data for reporting purposes.

Don’t see reporting as an act of creating a PDF or CSV file. Each query to the database that aims to get some data in order to show it to the user or return the result of an API call is also reporting.

The challenge surfaces from the fact that reporting needs are often drastically different from the needs of executing transactional operations. In addition, many systems have a clear imbalance between the number of writes and reads. For example, a typical user-facing application has much less writes than reads, because users normally read the information provided by the system and rarely execute operations in the systems. Some systems, however, have the opposite imbalance with prevailing writes. Think of a back-office of your bank, or a vehicle tracking system as examples.

One of the first traces of the original discussion around the reporting issue when using the Domain Model pattern happened during the The challenge of executing DDD panel with Martin Fowler, Eric Evans and others, back in 2004. Also in 2004, Martin Fowler published the ReportingDatabase pattern in Bliki. The pattern suggests using a separate database for reporting purposes and to push changes from the operational database to that reporting database.

Source: Martin Fowler’s bliki

The Reporting Database pattern is similar to CQRS but not exactly the same. It aims to solve the similar issue, but gets confused with the term reporting. In the Reporting Database pattern context, reporting has a more traditional meaning.

The kick-off question of the aforementioned panel discussion was:

On the other hand business execs want / crave / require “ad-hoc” reporting, with the degree of “ad-hoc-ness” varying from simple reports to data warehouses. You’d like to simply write some SQL and stuff the results into a report, but this is impossible when you have the domain model supporting all of your complex logic. So, what do you do?

Here, reporting only covers something that is accessed now and then. Such a report is only actual at the moment it gets produced, just as a printed report.

CQRS, however, assumes that any query is some sort of report, wether it is moved, printed out, or shown to the user inside the application.

Commands vs Queries

Developers often confuse CQRS with the Reporting Database pattern, although there is nothing in the name (Command-Query Responsibility Segregation) to suggest that.

The essence of CQRS is to separate command flow from the query flow, based on fundamental characteristics of those concepts.

Commands

The main attribute of a command is that when the command gets successfully executed, the system transitions to a new state.

Another important attribute of a command is that it conveys the intent of the user. Requests like UpdateCustomer are not exactly commands according to CQRS. Within the application boundaries, handling a command should result in one transaction on one Aggregate.

As a consequence, the command flow in CQRS goes hand in hand with the Task-Based User Interface pattern. In contrast with the CRUD-based user interface, which has four basic operations available for users - Create, Edit, Save and Delete, a task-based UI makes each operation explicit, like Check Out, Add Item or Cancel Order. By doing this, it makes operations available to the user easily translatable to commands that the UI sends to the Domain Model via an API.

Queries

Queries, as the name suggest, allow getting data from the persistent store, so it can be shown to the user, sent to another system or used for any other purpose.

Although the image shows all the system elements stored in what looks like a single database, it is just an example and doesn’t have to be that way.

Unlike commands, queries do not need to involve the Domain Model because queries do not execute any operations and should not contain any business logic. In addition, queries have no side effects and are completely idempotent, so it doesn’t matter how many times the query gets executed - it will always return the same result, unless the system state changed in the meantime.

Therefore, when implementing a query handler, developers don’t need to use the same way to access data as they do when persisting domain objects. For example, if the Domain Model persistence uses some ORM framework that distribute state of domain objects across tables in a relational database, a query could fire an SQL statement that ignores the ORM and gets the data directly from those tables.

CQRS also suggests that queries target specific use cases and return a pre-composed data set that can be shown in its entirety on the screen or in a cohesive part of the screen.

CQRS with Event Sourcing

Back in 2016 in his talk A Decade of DDD, CQRS, Event Sourcing Greg Young said the following:

“You need to look at CQRS not as being the main thing. CQRS was a product of its time and meant to be a stepping stone towards the ideas of Event Sourcing.”

It is not a coincidence that Greg proposed the CQRS pattern at the same time as he introduced Event Sourcing to the public. Unlike state-based persistence, where there might be a way to avoid using the domain model for queries, such an approach is hard to impossible to use in event-sourced systems. It is because there is no place where the state of a domain object is stored entirely.

As described in the previous article, domain entities in event-sourced systems are stored as event streams, essentially each entity is a sequence of events from the persistence point of view. There is more to it yet. Domain events in event store alone do not allow reconstructing the entity state without knowing the logic that the entity uses to rehydrate its own state from events and this logic is in the code of the Domain Model.

Projections

We need a way to project events to an alternative store, which we can easily query. It could be a relational or document database, or cache, or any other type of persistence that is applicable for a specific use case.

The idea is that a software component called a Projection subscribes to the live event feed of the events database and when it receives an event, it could project the information in that event to a query model in a dedicated reporting database.

In the context of CQRS you often hear write side, read side and read model. The write side is the command side, since that’s the place where state mutations, also referred as writes, take place. By comparison, the query side is often being referred as the read side, and the query model gets by name read model.

By definition, a projection is a representation of an object using a different perspective. For example, isometric and orthographic projections allow us to represent a 3D object on paper using different points of view.

When we talk about projecting data, we usually mean representing the data differently from the form it was originally stored. For example, a relational database view is a projection. It uses a transformation query to represent a subset of data that is already available in the database, producing a derived result from that data. The view, however, is stateless and has no side effects - it doesn’t change the original data in any way.

Projecting events

The same applies to event projections with one difference. When projecting events, we have to store the state, so we have the latest snapshot of the projected state. It works somewhat similarly to materialised views that some relational databases support. Still, outside of its own state, such a projection doesn’t manipulate the original data (events) in any way.

In CQRS, the query side is always a projection, even if it is not implemented as a stateful manner. When it comes to projecting events, we really want to keep the state of our read models in a queryable store to solve the issue of events streams being hard to query ad-hoc.

The idea is that the projection will receive all the events that it is able to project and will do the normal CRUD operations on the read model it controls, using the normal CRUD operations provided by the read model database (which could be anything).

Projections do the same thing as the When function in the entity code. Just as the entity state, the read model state is the left fold of all the event it processes.

Unlike the entity state, which is only applying events for that single entity, projections aren’t limited to only process events of a single entity and can assemble and aggregate data for multiple entities, even for different types of entities.

Keeping all this in mind, we can write code for a simple projection.

public class OrderOverviewProjection {
    Database db;

    OrderOverviewProjection(Database db) => this.db = db;

    public void Project(object event) {
        var dbOp = GetOperation(event);
        db.Execute(dbOp);
    }

    DbOperation GetOperation(object event) {
        return event switch {
            OrderCreated e => new CreateOperation(
                new OrderOverview(e.OrderId, e.CustomerId)),
            ItemAdded e => Update(e.OrderId).With(x => x.Items.Add(MapItem(e.Item))),
            PaymentRequested e => Update(e.OrderId).With(x => x.Status = AwaitingPayment),
            PaymentReceived e => Update(e.OrderId).With(x => x.Status = Paid)
        }

        UpdateOperation<OrderOverview> Update(string id)
        {
            return new UpdateOperation<OrderOverview>(id);
        }
    }
}

Projections are usually dependant on specific infrastructure interfaces, since they work directly with databases that keep their read models.

Subscriptions

Since the query model is used almost every time the application handles any type of query, whether a GET HTTP request or something else, we must ensure that the information stored in the read model is up-to-date. Therefore, we need to establish a real-time connection to the event store, so the projection receives events immediately after they are stored.

Event Store in particular provides a way to achieve that requirement. The most common implementation for client-side projections is to use catch-up subscriptions. The term “catch-up” comes from the fact that such subscriptions, when first connected to the server, will read all the historical events (catch up) and then automatically switch to real-time event processing.

With the Event Store .NET client, you can create a subscription like this:

// Presumably we got an IEventStoreConnection instance from somewhere

var subscription = connection.SubscribeToStreamFrom(
    stream: "mystream",
    lastCheckpoint: StreamCheckpoint.StreamStart,
    settings: CatchUpSubscriptionSettings.Default,
    eventAppeared: EventAppeared);

When you call the SubscribeToStreamFrom method, the subscription activates immediately. The EventAppeared function will be called for each event that already exists in the mystream stream. When all the historical events will be processed, the subscription will get to the real-time mode and starts receiving events as they are appended to the stream.

The AppendStream function could look like this, if we want to use our projection:

Task EventSppeared(EventStoreCatchUpSubscription _, ResolvedEvent evt) {
    var domainEvent = DeserialzeEvent(resolvedEvent);
    projection.Project(domainEvent);
}

Of course, there’s more plumbing involved in places like the DeserializeEvent function, which isn’t covered here, since the aim of this article is to describe the concept.

Checkpoints

The SubscribeToStreamFrom function requires you to specify the stream checkpoint in the lastCheckpoint parameter. In the example above, we used the StreamCheckpoint.StreamStart constant, which instructs the subscription to start reading events from the beginning of time (for that stream).

It will work, but it’s not practical. When an application that hosts this subscription eventually stops and then starts again, the subscription will start catching up from the first event in the stream again. It defeats the purpose of having the read model state persisted in a database. If it’s feasible for a system to re-project all the events each time the projection starts, we could just keep all the read models in memory.

It is, actually, a valid technique for caching and keeping the aggregate state snapshot available for command processing. Keep in mind how much time will be required to read all the events at the startup.

In order to avoid re-projecting the whole history all over again, we can store the event offset (a position the event in the stream) after projecting the event. By doing that, we let the system to load the stored checkpoint when the application starts again, so it can subscribe from the last known position instead of the stream start.

With this in mind, the previous code snippet can be refactored to handle the checkpoint as well.

// Startup code
var lastCheckpoint = checkpointStore.Load("mystream-checkpoint");
var subscription = connection.SubscribeToStreamFrom(
    stream: "mystream",
    lastCheckpoint: lastCheckpoint ?? StreamCheckpoint.StreamStart,
    settings: CatchUpSubscriptionSettings.Default,
    eventAppeared: EventAppeared);

In addition to the checkpoint loading code, we need to persist the checkpoint for processed events.

Task EventSppeared(EventStoreCatchUpSubscription _, ResolvedEvent evt) {
    var domainEvent = DeserialzeEvent(resolvedEvent);
    projection.Project(domainEvent);
    checkpointStore.Save(evt.OriginalPosition.Value);
}

Now, when the application suddenly stops and then starts again, it will load the last known checkpoint and will subscribe from that stream position.

The code snippet above uses two database operations for each processed event. It might introduce an issue known as two-phase commit that has a possibility to execute only the first operation (read model update) but not the second (store the checkpoint) due to some transient failure. There are two ways to overcome the issue:

  • Use a database transaction to wrap both operations.
  • Make projections idempotent, so applying the same event twice won’t bring the read model to an invalid state.

This is the basics of projections but might’ve noticed that subscribing to a single stream won’t deliver much of a value. Projections normally handle events of many entities. This is where the $all stream becomes important to create projections that can handle events from multiple streams.

Global event stream

Subscribing to a single event stream that represents an entity isn’t always useful. More often than not the projection code needs to handle any event that comes from the domain model and build sophisticated read models that serve a variety of needs.

For example, we might need to show user a page that contains order details, including payment and shipping information. It might be that the domain model splits those concerns to different aggregates with an independent lifecycle, since they address different concerns. One common approach is to compose such a page on the front-end side and call multiple API endpoints to collect the information from different parts of the system. Albeit such an approach can be useful, with Event Sourcing we have an option to build a read model, which would represent all the information for that page. By doing so, the need to call multiple API endpoints and do somewhat complex data composition in the UI disappears.

Conceptually, to achieve the goal like that, a projection needs to receive events from different streams.

The projection will then create a read model for each order and project both order information and payment information to it.

In this case, the subscription that feeds the projection with events has to subscribe to some stream that contains all the events from entities of different types, with different ids.

‘$all’ stream

In Event Store, the concept of individual stream builds on top of the single event sequence, which is called the $all stream.

Streams with names starting with $ are considered system streams, but it doesn’t mean you can’t use them.

It means that each new event gets appended to the global event sequence. The stream name for that event serves for event indexing, so you can read a subset of events by using the stream name. However, the stream name doesn’t tell Event Store where the event needs to be persisted, since all the events go to the global append-only store. Because of this internal structure of the persistence, Event Store allows subscribing to the global stream of events. Client libraries have a special method for subscriptions to connect to that stream.

var subscription = connection.SubscribeToAllFrom(
    lastCheckpoint: lastCheckpoint ?? AllCheckpoint.AllStart,
    settings: CatchUpSubscriptionSettings.Default,
    eventAppeared: EventAppeared);

The API for the SubscribeToAllFrom is almost identical to SubscribeToStreamFrom but the checkpoint is not represented as a long integer number but as a struct with two long integers - prepare position and commit position. Therefore, the checkpoint store needs to support storing the Position struct instead of a simple number.

The most useful thing about the global event stream is that all events in it are ordered and any subscription that uses the $all stream will get events in the same order as they were written to Event Store, even if events were appended to different logical streams. That way projections can be reassured that things that must happen in order will do so, as long as the write-side of the application behaves correctly.

The illustration above shows that one projection handles events for two different event types to construct the read model with the full order overview. In order for the projection to handle payment events, those events need to contain the order id, so the projection could understand which overview record needs updating, since those records are created for each individual order.

Read model scope

When developers come to Event Sourcing with a substantial experience of persisting domain objects in traditional databases, they rarely resist an urge to build projections that build the current entity state as a read model. By doing so, they return to the comfort zone of being able to check the current state of any domain object, at any time by looking into the database. However, such an approach doesn’t really deliver the full power of CQRS combined with Event Sourcing.

One of the great outcomes of having an event-sourced system is the ability to create new read models at will, at any time, without affecting anything else. For example, if you find your read models in MongoDB to be not fitting the full-text search requirement, you can build a new projection targeting ElasticSearch with a limited set of fields that the full-text search function needs. Similarly, it is possible to build new aggregations or sets of denormalized data with the goal to have pre-calculated pages available for the UI to show, instead of running expensive queries every time a user opens that page.

Continuing the previous illustrations and snippets, our eCommerce system might one day require a page that shows all the orders for one single customer. It is possible to build an API endpoint that runs a query for that purpose. There are drawbacks for adding queries though since each query potentially introduces side effects on the database, like space used for indexes, degraded performance and so on. Some databases won’t even let you query without changing the persistence model significantly. Building a new read model, however, is relatively straightforward.

One thing to remember is that all the events included in such aggregation need to have a field that conveys the aggregation id (customer id in this case). It might be problematic if the original event schema didn’t contain enough data. It’s not exactly straightforward, but this problem has several solutions, like stream joins (enrichment), upcasting, migration and so on. Those patterns are out of scope for this article.

Here are some references for further reading: