
Snapshots in Event Sourcing

Oskar Dudycz  |  20 May 2021

[Diagram: ES-Snapshots-1]


Introduction

One of the great benefits of Event Sourcing is that you don't lose any business data. Each business operation ends with a new event appended to the event store.

The business object is represented by the sequence of events called a stream. When we want to execute business logic, we're reading all events from a specific stream. We're recreating the current state by applying all the events one by one in order of appearance. Based on the current state, we verify and execute the business logic.

[Diagram: ES-Snapshots-2]

But isn't loading more than one event a performance issue? Frankly, it's not. Downloading even a dozen, or several dozens of small events is not a significant overhead.

Events are concise, containing only the information needed. EventStoreDB is optimised for such operations, and the reads scale well.

Still, you can't disagree that loading a few events will take longer than loading a single one. This is where snapshotting can help.

What Are Snapshots In Event Sourcing?

In Event Sourcing, snapshots are used when the number of events that need to be replayed to restore the state of an aggregate (the logical unit of data) needs to be reduced.

Snapshots are a way of storing the current state of an aggregate at a particular point in time, and can be used to skip over the previous events when loading the aggregate. This can help improve the efficiency and performance of an event-native application.

[Diagram: ES-Snapshots-3]

Let's look at an example to explain this concept in more detail.

Suppose I opened a bank account at the age of 18 and have used it for the 17 years since. Let's assume that I was making three transactions a day. If we multiply these numbers (3 x 17 x 365), we get 18,615 transactions.

If we follow the Event Sourcing pattern literally, we'd need to get all these transactions to calculate the current account's balance. This won't be efficient. Your first thought to make this more efficient may be caching the latest state somewhere.

Instead of retrieving all these events, we could retrieve one record and use it for our business logic. This is a snapshot. 

[Diagram: ES-Snapshots-4]

Another way to describe a snapshot is by using the cash register example.

Is the balance in the cash register calculated based on all transactions since the shop was created?

No.

Usually cashiers create summaries at the end of their shift. They verify whether the state in the POS system is consistent with the actual amount of money in the cash register.

The next employee starts a new shift, at the end of which a separate summary is made. It is the same with a bank account. The billing data is opened and closed in a regular cycle. Old data is archived, and we start again with the summarised balance.

When To Take A Snapshot

Assessing when to take a snapshot is essential. Popular tactics are:

  1. Snapshot after each event. With this, you don't need to read the events at all; you always base your business logic on the latest snapshot. The crucial decision is whether storing the snapshot happens in the same process as storing the event or asynchronously in a background process. The latter may provide stale data; the former impacts write performance. It's also worth noting that if we're doing snapshots as a performance optimisation, an additional write on every change can degrade it even more.
  2. Snapshot every N number of events. You may not need a snapshot after each operation, only after a set number of events (see the sketch after this list). In that case, besides reading the snapshot, you also need to read the events that happened after the snapshot was created (at most N of them).
  3. Snapshot when an event of a specified type was stored. This is similar to "closing the books", or the first step to migrating a long-living stream to that process. The snapshot could be stored when, e.g. CashierShiftEnded was registered.
  4. Snapshot at a selected time interval. Storing the snapshot can be scheduled, for example, once a day, every hour, etc. The risk is that spikes in event processing may occur between the scheduled snapshots, which may reduce the benefit of doing it.
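
As a rough illustration of the second tactic, a predicate deciding whether to snapshot every N events can compare the new stream revision with the revision of the last snapshot. The snapshotFrequency constant and the function below are assumptions for this sketch, not part of any client API:

// A minimal sketch of the "snapshot every N events" rule (tactic 2 above).
// The frequency and the revision-based check are illustrative assumptions.
const snapshotFrequency = 100n;

function shouldSnapshotEveryNEvents(
  newStreamRevision: bigint,
  lastSnapshotRevision?: bigint
): boolean {
  // how many events were appended since the last snapshot (or since stream start)
  const eventsSinceLastSnapshot =
    newStreamRevision - (lastSnapshotRevision ?? -1n);

  return eventsSinceLastSnapshot >= snapshotFrequency;
}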

Disadvantages Of Snapshots

The need to use snapshots may hint at a design flaw in the model.

Snapshots can be used as a tactical hotfix or optimisation. Adding them should not stop us from evaluating the design correctness. We should plan it, not just satisfy ourselves with the quick win.

We must also remember that we will run into a versioning problem when using snapshots. Our object will live long (as we have not shortened its lifecycle), so the risk of changing the schema is greater.

When we change our business object structure, we'll need to perform a data migration. As you probably know, it's always complicated, and can get even worse if we use snapshots as read models. It's pretty tempting to do that when we have the latest entity version stored.

Write and read models tend to diverge more and more as time passes. Snapshots are an optimisation technique for the write model. It's purely technical. If we conflate that with other aspects like read models, we're introducing coupling that may be hard to untangle.

Where And How To Store Snapshots?

That is only limited by your imagination and the technologies used in your project. You can save them, for example, as:

  • events in the same or separate stream,
  • in a separate database,
  • in-memory (popular in actor-based systems),
  • in cache such as Redis.

Using a cache or in-memory storage provides the option of setting a maximum lifetime (TTL). We can easily define that the snapshot will live only one day; then the cache entry will be invalidated. That helps reduce the need for migration. However, after the snapshot is removed from the cache, we need to rebuild it. Also, my running joke is: if you solved your problem by using a cache, you usually have two issues afterwards.
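
As an illustration of the cache option, a naive in-memory snapshot cache with a TTL could look like the sketch below; it's only a demonstration of the idea, not a recommendation of a specific library:

// Naive in-memory snapshot cache with a TTL - a sketch, not production code.
type CachedSnapshot<State> = { state: State; expiresAt: number };

class SnapshotCache<State> {
  private entries = new Map<string, CachedSnapshot<State>>();

  constructor(private ttlMs: number) {}

  set(streamName: string, state: State): void {
    this.entries.set(streamName, { state, expiresAt: Date.now() + this.ttlMs });
  }

  get(streamName: string): State | undefined {
    const entry = this.entries.get(streamName);

    // an expired entry is a cache miss; the caller rebuilds the snapshot from events
    if (!entry || entry.expiresAt < Date.now()) {
      this.entries.delete(streamName);
      return undefined;
    }

    return entry.state;
  }
}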

Our blog, Snapshot Strategies, goes into storing snapshots in more detail and outlines the implementation with code examples.

Alternatives To Snapshots

Shorter Streams

Each data storage model has its specifics. Relational databases have normalisation. Document databases are denormalised. Key-value stores have strategies for keys definition. Event stores also have their specifics.

Traditionally, we do not pay much attention to the number of operations made on a business object. All of them will be condensed to a single record. In Event Sourcing, thanks to the history of events, we gain auditability and diagnostics. We also have an additional modelling aspect to consider explicitly: lifecycle over time.

Let's get back to our shopping example. Instead of modelling our stream as all the events that happened for a specific cash register (e.g. transactions), we could break it down into smaller, shorter-lived entities. For example:

  • billing day (i.e. from opening to closing the shopping day),
  • cashier's shift,
  • each receipt separately.

If we ask "business", it may turn out that such a break-down reflects the reality. "Closing the books/end of business day" is a typical pattern for many industries. Very often, our technical assumptions are an oversimplification. That is why it is worth digging down and asking business to bring problems, not solutions.

By modelling the stream as the events on a given cashier's shift, we can simplify the solution. Streams will contain fewer events.

The stream's lifecycle affects not only performance; most of all, a shorter-lived stream is easier to maintain. I wrote about it in How to (not) do event versioning.

If our stream is short-lived, schema versioning will be easier. We rarely care about records that are deleted or archived. Therefore, when we deploy new changes and have events with the old schema, we will have to support them only as long as their streams exist.

Thanks to this, we can break our deployment into "two steps". First, we deploy a version that supports both schemas and mark the old one as obsolete. Then, when no streams with the old schema are active anymore, we can remove the old code and create a new version.

[Diagram: ES-Snapshots-5]

If you're going to use the "closing the books" process, how should you approach it?

  1. Talk with the business to find out if such a pattern exists in the workflow. Search for keywords like the end of the day, day of work, summary, shift, etc.
  2. Define the start and end of the business workflow. It can be, for example, the beginning and end of the cashier shift, opening and closing the shop, the first and the final whistle of a game. Map them to events.
  3. When the lifecycle is finished, store the "summary event", e.g. CashierShiftEnded. The event should contain all the needed business summaries. It can be triggered by the business operation or a cron-based scheduler (see more in the Passage of time pattern). The selected strategy depends on the specific use case.
  4. If closing the books has an immediate follow-up, publish a new event to the new stream (see the sketch after this list). E.g. closing the month for the financial account should open a new period. This event may look similar to a snapshot, but it's not the same. It represents a specific business operation context. It does not require all the data from the previous period; it only needs a minimum set to operate (e.g. the account balance at the end of the month, cash in the register after the last shift).
  5. Depending on the archiving strategy, events from the old stream can be marked for archiving. If we don't have any business logic for the old data, we can safely schedule a task to move it to "cold storage" and delete it from the event store. Thanks to this, we still keep the data for auditing purposes while maintaining our storage size.
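
A minimal sketch of steps 3 and 4 could look as follows. The event shapes and the openNextShift helper are illustrative assumptions, not code from the article's sample project:

// Sketch of a "closing the books" summary event and the opening event of the
// next period. Shapes and names are illustrative assumptions.
type CashierShiftEnded = {
  type: 'cashier-shift-ended';
  data: {
    cashierShiftId: string;
    cashRegisterId: string;
    finalFloat: number; // the summary the next period needs
    finishedAt: Date;
  };
};

type CashierShiftStarted = {
  type: 'cashier-shift-started';
  data: {
    cashierShiftId: string;
    cashRegisterId: string;
    cashierId: string;
    startingFloat: number; // only the minimum carried over, not the whole history
    startedAt: Date;
  };
};

function openNextShift(
  ended: CashierShiftEnded,
  nextShiftId: string,
  cashierId: string
): CashierShiftStarted {
  return {
    type: 'cashier-shift-started',
    data: {
      cashierShiftId: nextShiftId,
      cashRegisterId: ended.data.cashRegisterId,
      cashierId,
      startingFloat: ended.data.finalFloat,
      startedAt: new Date(),
    },
  };
}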

What are the disadvantages of breaking streams into smaller ones?

It can sometimes be artificial. If we were to break our stream down to reflect, say, each hour of work, it may turn out that this does not reflect the actual business flow. Streams that are too small also cause more significant management overhead. If we add tight performance requirements, we may need to cut every potential overhead.

In this situation, snapshots can help. However, I would suggest that you treat them as a last resort when nothing else helps.

Snapshots Implementation

Let's now look at some of the strategies you can use for implementing snapshots. However, as outlined above, there are pros and cons to snapshots, so my aim is to explain how to do it when you really have to.

I'll use EventStoreDB gRPC client, TypeScript and NodeJS, but I think that it should be easy to translate to other dev environments.

Snapshots Storage

Snapshots are the representation of the current state at a certain "point in time". I'm using quotes as it doesn't have to be a precise time. Technically, the "point in time" is a stream revision, i.e. the position of the last event included when the snapshot was made. The time can be correlated using the event (meta)data.

[Diagram: Snapshotting-Strategies-1]

We can store snapshots in any storage. It can be:

  • a record in durable storage (e.g. relational or document database, key-value store, etc.),
  • in-memory cache entry,
  • distributed cache entry (e.g. in Redis),
  • event in the event store.

All of them have pros and cons, for example:

  • using additional durable storage introduces more moving pieces, increasing the complexity of the system.
  • by using a cache, we're risking that the cache will be invalidated. We need to define a fallback scenario and prepare for the peaks when we need to rebuild snapshots. When not appropriately tuned, an in-memory cache can eat your service resources and negatively impact the solution's performance. Plus, if you're trying to fix an issue with a cache, you usually end up with two problems.

You should choose your strategy wisely based on your use case.

Reading

The premise of making snapshots is to speed up reading. In Event Sourcing, we retrieve the state by reading all stream events and applying them one by one on the state object. For instance, having the events BankAccountCreated, DepositRecorded, CashWithdrawn, we can add the amount from the deposit (e.g. $100) and subtract the amount from the withdrawal (e.g. $80). By applying events in this way, we can calculate the current account balance (e.g. $20).
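
A minimal sketch of that aggregation (the event names and shapes here are illustrative, not taken from a specific model):

// Calculating the balance by applying events one by one.
type BankAccountEvent =
  | { type: 'bank-account-created'; data: { accountId: string } }
  | { type: 'deposit-recorded'; data: { amount: number } }
  | { type: 'cash-withdrawn'; data: { amount: number } };

function currentBalance(events: BankAccountEvent[]): number {
  return events.reduce((balance, event) => {
    switch (event.type) {
      case 'deposit-recorded':
        return balance + event.data.amount;
      case 'cash-withdrawn':
        return balance - event.data.amount;
      default:
        return balance;
    }
  }, 0);
}

// e.g. account created, deposit of 100, withdrawal of 80 => balance of 20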

If we created a snapshot, then in theory, we could use it as the current state. Why "in theory"? A snapshot is a representation of the state at a certain point in time. It may happen that, between creation and the next read, new events occurred. It's a common scenario. Plus, as I'll describe in the next paragraph, snapshots might not be stored after each change.

Not having the latest state doesn't have to be an issue. Depending on our scenario, we can live with that. Though, typically we want to make the business decisions using the latest state, not to make choices on obsolete information. Therefore, we usually load the snapshot and events that happened after the snapshot was created. What's more, if we represent the snapshot as an event, we can also use it in the regular state aggregation process.

If we define the event type as:

export type Event<
  EventType extends string = string,
  EventData extends object = object,
  EventMetadata extends object = object
> = {
  type: EventType;
  data: EventData;
  metadata?: EventMetadata;
};

Having that, we can derive the snapshot event type:

type SnapshotMetadata = {
  snapshottedStreamRevision: string;
};

type SnapshotEvent<
  EventType extends string = string,
  EventData extends object = object,
  EventMetadata extends SnapshotMetadata = SnapshotMetadata
> = Event<EventType, EventData, EventMetadata> & {
  metadata: Readonly<EventMetadata>;
};

To retrieve the snapshot follow-up events, we need to get the stream revision on which the snapshot was made. The most convenient is to store it in the metadata.


If this syntax looks weird and you are not familiar with TypeScript quirks, don't worry, I will explain it. TypeScript allows specifying restrictions for generic type parameters. EventType extends string means that the provided type has to be a string. TypeScript allows type aliases even for primitive types, so this syntax lets us restrict the generic type parameter to be a string or a string type alias.

We can also assign a default type with EventType extends string = string. This is useful to simplify the usage and the generic logic.
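
For example, we can pass concrete type parameters or rely on the defaults when the exact shape doesn't matter (the DepositRecorded shape below is just an illustration):

// Using the generic Event type with concrete parameters...
type DepositRecorded = Event<
  'deposit-recorded',
  { accountId: string; amount: number }
>;

// ...or falling back to the defaults when we don't care about the exact shape.
const someEvent: Event = {
  type: 'deposit-recorded',
  data: { accountId: 'account-123', amount: 100 },
};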

The above event type definition written in C# could look like:

public abstract class Event
{
    public abstract string Type { get; set; }
    public object Data { get; set; }
    public object Metadata { get; set; }
}

public abstract class Event<EventData>: Event
    where EventData : class
{
    public new EventData Data
    {
        get { return base.Data as EventData; }
        set { base.Data = value; }
    }
}

public abstract class Event<EventData, EventMetadata>: Event<EventData>
    where EventData : class
    where EventMetadata : class
{
    public new EventMetadata Metadata
    {
        get { return base.Metadata as EventMetadata; }
        set { base.Metadata = value; }
    }
}

We could also make the event type fully immutable by using Readonly and defining it as:

export type Event<
  EventType extends string = string,
  EventData extends Record<string, unknown> = Record<string, unknown>,
  EventMetadata extends Record<string, unknown> = Record<string, unknown>
> = Readonly<{
  type: Readonly<EventType>;
  data: Readonly<EventData>;
  metadata?: Readonly<EventMetadata>;
}>;

However, let's leave it for now and ignore read-only syntax for brevity. I don't want to focus on the TypeScript syntax but on the Snapshotting techniques.


Let's take the example from the previous article: a cash register domain. We modelled our stream as all the events (e.g. transactions) registered for the cash register since it was placed at the workstation. We'll use snapshots to tactically resolve performance issues related to loading streams with thousands of events.

With that, we can define the Cash Register entity and its snapshot event type (CashRegisterSnapshoted) as:

type CashRegister = {
  id: string;
  float: number;
  workstation: string;
  currentCashierId?: string;
};

type CashRegisterSnapshoted = SnapshotEvent<
  'cash-register-snapshoted',
  CashRegister
>;

type CashRegisterEvent =
  | PlacedAtWorkStation
  | ShiftStarted
  | TransactionRegistered
  | ShiftFinished
  | CashRegisterSnapshoted;
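
The individual event types (PlacedAtWorkStation, ShiftStarted, etc.) aren't shown in this article. A minimal sketch, consistent with the fields the when function below reads from them, could look like this (the exact shapes are assumptions):

// Illustrative definitions of the individual cash register events.
type PlacedAtWorkStation = Event<
  'placed-at-workstation',
  { cashRegisterId: string; workstation: string }
>;

type ShiftStarted = Event<
  'shift-started',
  { cashRegisterId: string; cashierId: string }
>;

type TransactionRegistered = Event<
  'transaction-registered',
  { cashRegisterId: string; amount: number }
>;

type ShiftFinished = Event<
  'shift-finished',
  { cashRegisterId: string; finishedAt: Date }
>;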

We also defined the union type containing all the cash register events. We can use it to define our state application logic (read more details about this process in my other article):

function when(
  currentState: Partial<CashRegister>,
  event: CashRegisterEvent
): Partial<CashRegister> {
  switch (event.type) {
    case 'placed-at-workstation':
      return {
        id: event.data.cashRegisterId,
        workstation: event.data.workstation,
        float: 0,
      };
    case 'shift-started':
      return {
        ...currentState,
        currentCashierId: event.data.cashierId,
      };
    case 'transaction-registered':
      return {
        ...currentState,
        float: (currentState.float ?? 0) + event.data.amount,
      };
    case 'shift-finished':
      return {
        ...currentState,
        currentCashierId: undefined,
      };
    case 'cash-register-snapshoted':
      return {
        ...event.data,
      };
    default:
      // Unexpected event type
      return {
        ...currentState,
      };
  }
}

Even if we don't store the snapshot as an event but in a separate database or cache, we can map it to this structure while reading. Thanks to that, our state aggregation logic will be exactly the same as if we didn't use snapshots.
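
For example, a snapshot record loaded from a hypothetical cache or document database (the record shape below is an assumption) could be mapped back to the snapshot event structure like this:

// Mapping a snapshot record from external storage back to the event shape.
type CashRegisterSnapshotRecord = CashRegister & {
  snapshottedStreamRevision: string;
};

function toSnapshotEvent(
  record: CashRegisterSnapshotRecord
): CashRegisterSnapshoted {
  const { snapshottedStreamRevision, ...state } = record;

  return {
    type: 'cash-register-snapshoted',
    data: state,
    metadata: { snapshottedStreamRevision },
  };
}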

[Diagram: Snapshotting-Strategies-2]

Reading the snapshot from a separate stream

Let's define a simple read-events wrapper that maps the results and returns an error value instead of throwing an exception. We'll need that later.

type STREAM_NOT_FOUND = 'STREAM_NOT_FOUND';

async function readFromStream<StreamEvent extends Event>(
  eventStore: EventStoreDBClient,
  streamName: string,
  options?: ReadStreamOptions
): Promise<StreamEvent[] | STREAM_NOT_FOUND> {
  try {
    const events: StreamEvent[] = [];

    for await (const resolvedEvent of eventStore.readStream(
      streamName,
      options
    )) {
      if (!resolvedEvent.event) continue;

      events.push(<StreamEvent>{
        type: resolvedEvent.event!.type,
        data: resolvedEvent.event!.data,
        metadata: resolvedEvent.event?.metadata,
      });
    }

    return events;
  } catch (error) {
    if (error.type === ErrorType.STREAM_NOT_FOUND) {
      return 'STREAM_NOT_FOUND';
    }
    throw error;
  }
}

async function readLastEventFromStream<StreamEvent extends Event>(
  eventStore: EventStoreDBClient,
  streamName: string
): Promise<StreamEvent | STREAM_NOT_FOUND> {
  const events = await readFromStream<StreamEvent>(eventStore, streamName, {
    maxCount: 1,
    fromRevision: END,
    direction: 'backwards',
  });

  if (events === 'STREAM_NOT_FOUND') {
    return 'STREAM_NOT_FOUND';
  }

  return events[0];
}

The general logic to read the last snapshot and follow-up events can be defined as:

  1. Read the last snapshot (if it exists).
  2. Read events from EventStoreDB:
  • If a snapshot exists, read events starting from the stream revision at which the snapshot was created.
  • Otherwise, read all events.
  3. Return the stream events preceded by the snapshot.

Code for that can look like:

async function readEventsFromExternalSnapshot<
  StreamEvent extends Event,
  SnapshotStreamEvent extends SnapshotEvent = StreamEvent & SnapshotEvent
>(
  getLastSnapshot: (
    streamName: string
  ) => Promise<SnapshotStreamEvent | undefined>,
  eventStore: EventStoreDBClient,
  streamName: string
): Promise<{
  events: (StreamEvent | SnapshotStreamEvent)[];
  lastSnapshotRevision?: bigint;
}> {
  const snapshot = await getLastSnapshot(streamName);

  const lastSnapshotRevision = snapshot
    ? BigInt(snapshot.metadata.snapshottedStreamRevision)
    : undefined;

  const streamEvents = await readFromStream<StreamEvent>(
    eventStore,
    streamName,
    {
      fromRevision: lastSnapshotRevision,
    }
  );

  if (streamEvents === 'STREAM_NOT_FOUND') throw 'STREAM_NOT_FOUND';

  const events = snapshot ? [snapshot, ...streamEvents] : streamEvents;

  return {
    events,
    lastSnapshotRevision,
  };
}

We can pass any logic for getting the last snapshot through the getLastSnapshot function. We can load it from a cache, another database, or a separate stream.

If we want to read it from a separate stream, then the code can look like:

function addSnapshotPrefix(streamName: string): string {
  return `snapshot-${streamName}`;
}

async function readSnapshotFromSeparateStream<
  SnapshotStreamEvent extends SnapshotEvent
>(
  eventStore: EventStoreDBClient,
  streamName: string
): Promise<SnapshotStreamEvent | undefined> {
  const snapshotStreamName = addSnapshotPrefix(streamName);

  const snapshot = await readLastEventFromStream<SnapshotStreamEvent>(
    eventStore,
    snapshotStreamName
  );

  return snapshot !== 'STREAM_NOT_FOUND' ? snapshot : undefined;
}

We're reading only the last event from the stream, as we're interested in the latest snapshot. If the stream does not exist, we return an empty value instead of throwing an exception. It's a valid case that means that no snapshot was stored yet.

Reading the snapshot from the same stream

[Diagram: Snapshotting-Strategies-3]

We could also store and read a snapshot in the same stream as the regular events. Having that, we won't read all stream events, only the snapshot and the events that follow it. To do that, we need to keep the snapshotted revision somewhere. The most convenient way is to put it at the stream level: in the stream metadata. With that, we can read the last snapshot event position and read events from there.

The code to read the last snapshot revision will look like this:

async function readStreamMetadata<
  StreamMetadata extends Record<string, unknown>
>(
  eventStore: EventStoreDBClient,
  streamName: string,
  options?: GetStreamMetadataOptions
): Promise<StreamMetadata | undefined> {
  const result = await eventStore.getStreamMetadata<StreamMetadata>(
    streamName,
    options
  );

  return result.metadata;
}

async function getLastSnapshotRevisionFromStreamMetadata(
  eventStore: EventStoreDBClient,
  streamName: string
): Promise<bigint | undefined> {
  const streamMetadata = await readStreamMetadata<SnapshotMetadata>(
    eventStore,
    streamName
  );

  return streamMetadata
    ? BigInt(streamMetadata.snapshottedStreamRevision)
    : undefined;
}

Based on that, we can build the main read logic:

  1. Read the snapshot event position from the stream metadata.
  2. Return events from the last snapshot position, or all events if no snapshot was made.

The code for such logic:

async function readEventsFromSnapshotInTheSameStream<
  StreamEvent extends Event,
  SnapshotStreamEvent extends SnapshotEvent = SnapshotEvent & StreamEvent
>(
  eventStore: EventStoreDBClient,
  streamName: string
): Promise<(StreamEvent | SnapshotStreamEvent)[]> {
  const lastSnapshotRevision = await getLastSnapshotRevisionFromStreamMetadata(
    eventStore,
    streamName
  );

  const events = await readFromStream<StreamEvent>(eventStore, streamName, {
    fromRevision: lastSnapshotRevision,
  });

  if (events === 'STREAM_NOT_FOUND') throw 'STREAM_NOT_FOUND';

  return events;
}

Storing snapshots in a separate stream

We already know how to read events using two methods: external storage and the same stream. Now let's discuss how to store them.

In Event Sourcing, events are logically grouped into streams that represent entities. All the state mutations end up persisted as events.

A snapshot should be created after the event is appended. For the external storage, it will mean making an additional call. EventStoreDB operations are atomic on the stream level, so storing a snapshot to a different stream will require a separate call.

Let's then define the simple code for storing the snapshot:

async function appendEventAndExternalSnapshot<
  State extends object = object,
  StreamEvent extends Event = Event,
  SnapshotStreamEvent extends SnapshotEvent = StreamEvent & SnapshotEvent
>(
  tryBuildSnapshot: (
    newEvent: StreamEvent,
    currentState: State,
    newStreamRevision: bigint
  ) => SnapshotStreamEvent | undefined,
  appendSnapshot: (
    snapshot: SnapshotStreamEvent,
    streamName: string,
    lastSnapshotRevision?: bigint
  ) => Promise<AppendResult>,
  eventStore: EventStoreDBClient,
  streamName: string,
  newEvent: StreamEvent,
  currentState: State,
  lastSnapshotRevision?: bigint
): Promise<AppendResult> {
  const appendResult = await appendToStream(eventStore, streamName, [newEvent]);

  const snapshot = tryBuildSnapshot(
    newEvent,
    currentState,
    appendResult.nextExpectedRevision
  );

  if (snapshot) {
    await appendSnapshot(snapshot, streamName, lastSnapshotRevision);
  }

  return appendResult;
}

The function takes the current state, a new event generated by the business logic, and the last snapshot revision. Once the event is successfully appended, we try to build a snapshot.

We're also injecting two functions:

  • tryBuildSnapshot: responsible for constructing the snapshot from the new event and the current state.
  • appendSnapshot: allows injecting custom storage location logic (separate stream, database or cache).

I'll discuss the building strategies in more detail, but let's grasp the main idea first. We're using snapshots to increase read performance by reading less data. However, if we did them after each event, we might degrade write performance. We could, for example, do a snapshot only after a specific event type. That would reduce the amount of additional data and traffic generated by snapshot creation.

For instance, code for building a snapshot at the end of each cashier's shift:

function shouldDoSnapshot(newEvent: CashRegisterEvent): boolean {
  return newEvent.type === 'shift-finished';
}

function buildCashierSnapshot(
  currentState: CashRegister,
  newStreamRevision: bigint
): CashRegisterSnapshoted {
  return {
    type: 'cash-register-snapshoted',
    data: currentState,
    metadata: { snapshottedStreamRevision: newStreamRevision.toString() },
  };
}

function tryBuildCashierSnapshot(
  newEvent: CashRegisterEvent,
  currentState: CashRegister,
  newStreamRevision: bigint
): CashRegisterSnapshoted | undefined {
  if (!shouldDoSnapshot(newEvent)) return undefined;

  return buildCashierSnapshot(currentState, newStreamRevision);
}

If the snapshot should be created, we build it and pass it to the appendSnapshot method. This can use any preferred storage or cache. If we'd like to store it in a separate stream in EventStoreDB, see below:

async function appendSnapshotToSeparateStream<
  SnapshotStreamEvent extends SnapshotEvent
>(
  eventStore: EventStoreDBClient,
  snapshot: SnapshotStreamEvent,
  streamName: string,
  lastSnapshotRevision?: bigint
): Promise<AppendResult> {
  const snapshotStreamName = addSnapshotPrefix(streamName);

  if (lastSnapshotRevision === undefined) {
    await eventStore.setStreamMetadata(snapshotStreamName, { maxCount: 1 });
  }

  return appendToStream(eventStore, snapshotStreamName, [snapshot]);
}

The logic is simple: we're adding the snapshot prefix to the stream name and appending the snapshot event.

I also applied an additional optimisation here. Usually, we don't need to keep all snapshots, as we just care about the latest one. We can use the $maxCount stream metadata to ensure that there won't be more events in the stream than the defined threshold. If there are more, EventStoreDB will delete the old events. Setting $maxCount to 1 makes sure that there is only one snapshot event.

We want to set the stream metadata only once (when the first snapshot is created). We can verify it using the last snapshot revision. If it's not set, then it means that there was no snapshot before.

Storing snapshots in the same stream

Storing snapshots in the same stream does not require an additional call. Well, almost: it requires a further call to update the stream metadata.

async function appendEventAndSnapshotToTheSameStream<
  State extends object = object,
  StreamEvent extends Event = Event
>(
  tryBuildSnapshot: (
    newEvent: StreamEvent,
    currentState: State
  ) => StreamEvent | undefined,
  eventStore: EventStoreDBClient,
  streamName: string,
  newEvent: StreamEvent,
  currentState: State
): Promise<AppendResult> {
  const snapshot = tryBuildSnapshot(newEvent, currentState);

  const eventsToAppend = snapshot ? [newEvent, snapshot] : [newEvent];

  const appendResult = await appendToStream(
    eventStore,
    streamName,
    eventsToAppend
  );

  const snapshottedStreamRevision = appendResult.nextExpectedRevision.toString();

  await eventStore.setStreamMetadata<SnapshotMetadata>(streamName, {
    snapshottedStreamRevision,
  });

  return appendResult;
}

As in the external storage example, we're trying to build a snapshot. If the snapshot was built, we append it together with the new event. Note that the snapshot should be appended after the event.

Afterwards, we need to update the stream metadata with the snapshotted stream revision. This step may be redundant if we're always appending a snapshot after each event.

The main difference to external storage is: we don't need to store the snapshotted revision in the event metadata, as we're keeping it in the stream metadata.

Separate stream vs the same stream considerations

As described above, there are two main strategies for reading and storing snapshots:

  1. Separate storage (different database, cache, but also a separate stream).
  2. The same stream.

Both of them have design implications.

Keeping it separate makes the process more vulnerable to transient errors. We need to make additional calls. Because of that, we have to think about scenarios where one of the calls fails: "Should I revert the previous one?"

Nevertheless, for snapshots, those considerations might not be critical. What's the worst that can happen if the snapshot wasn't stored? We'll read the whole stream and store a snapshot the next time. As mentioned, we should treat snapshots as a technical performance optimisation. Our system should be designed to stay operational even if the optimisation wasn't applied.

When making a decision, we should also consider the snapshots' lifetime. A snapshot's structure tends to change quite often. Each new event type or update to how the event is interpreted may change the snapshot schema. For example, initially we just kept the transactions count in the cash register snapshot, as it was enough for the business logic. Then a new requirement came, and now we'd like to keep the collection of all transactions.

Now we need to reapply the events, and the old snapshot becomes obsolete. If we keep snapshots in the same stream, then we'll need to read all events and ignore snapshots while applying the events. In the worst case, if we were storing a snapshot after each event, this can double the stream size.

In my opinion, this is much more complicated than just storing a single, latest snapshot in a separate stream. If we set the $maxCount metadata on the snapshot stream, we don't need to keep the redundant snapshots. Rebuild is more straightforward, as we just remove the last snapshot stream event.

The same applies to the external storage or cache. Additionally, we'll also need to consider that we're increasing complexity by adding new pieces.

I recommend using a separate stream for snapshots, but you should evaluate the strategy based on your use case.

Storing the snapshot during command handling

The examples I explained above assume that the snapshotting happens together with appending the event. The typical flow of the event sourcing command (request) handling is:

  • read the stream events,
  • rebuild the aggregate/entity state from events,
  • perform the business logic that generates a new event,
  • store the event.

If we want to store the snapshot in the same processing pipeline, we should add an additional step: storing a snapshot.

As explained in my other article, the process of rebuilding the state based on events is called stream aggregation. The generic method for that can be defined as:

export function aggregateStream<Aggregate, StreamEvent extends Event>(
  events: StreamEvent[],
  when: (
    currentState: Partial<Aggregate>,
    event: StreamEvent
  ) => Partial<Aggregate>,
  check: (state: Partial<Aggregate>) => state is Aggregate
): Aggregate {
  const state = events.reduce<Partial<Aggregate>>(when, {});

  return assertStateIsValid(state, check);
}

function applyEvent<Aggregate, StreamEvent extends Event>(
  currentState: Aggregate,
  event: StreamEvent,
  when: (
    currentState: Partial<Aggregate>,
    event: StreamEvent
  ) => Partial<Aggregate>,
  check: (state: Partial<Aggregate>) => state is Aggregate
): Aggregate {
  return assertStateIsValid(when(currentState, event), check);
}

export function assertStateIsValid<Aggregate>(
  state: Partial<Aggregate>,
  check: (state: Partial<Aggregate>) => state is Aggregate
) {
  if (!check(state)) throw 'Aggregate state is not valid';

  return state;
}

Besides the stream aggregation method, I also defined two others:

  • applyEvent: applies a single event to the current state (to get the new state).
  • assertStateIsValid: a type check assertion to make sure that the state is correct after aggregation.

I'm using the TypeScript type guards mechanism. An example type guard can be defined as:

function isNotEmptyString(value: any): boolean {
  return typeof value === 'string' && value.length > 0;
}

function isPositiveNumber(value: any): boolean {
  return typeof value === 'number' && value >= 0;
}

export function isCashRegister(
  cashRegister: any
): cashRegister is CashRegister {
  return (
    cashRegister !== undefined &&
    isNotEmptyString(cashRegister.id) &&
    isPositiveNumber(cashRegister.float) &&
    isNotEmptyString(cashRegister.workstation) &&
    (cashRegister.currentCashierId === undefined ||
      isNotEmptyString(cashRegister.currentCashierId))
  );
}

Even if you're not programming in TypeScript and this syntax looks weird, it's always worth making sure that the rebuilt state follows the business rules. It's essential to ensure that we handled all events properly. As the application evolves, our business logic and events schema may change. Thus, it is worth having the state check and unit tests to reduce unexpected errors related to an invalid state.
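
A minimal sketch of such a test, assuming a Jest-style test runner (it's not part of the article's sample code), could look like:

// Unit tests for the type guard, assuming Jest-style globals.
describe('isCashRegister', () => {
  it('accepts a fully built state', () => {
    expect(
      isCashRegister({ id: 'cr-1', float: 0, workstation: 'ws-1' })
    ).toBe(true);
  });

  it('rejects a state with a negative float', () => {
    expect(
      isCashRegister({ id: 'cr-1', float: -10, workstation: 'ws-1' })
    ).toBe(false);
  });
});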


Getting back to snapshotting: having all of that, we can define the example command handling logic for ending the cashier's shift as:

function endShift(
  events: CashRegisterEvent[],
  command: EndShift
): {
  newState: CashRegister;
  newEvent: ShiftFinished;
} {
  const cashRegister = aggregateStream(events, when, isCashRegister);

  if (cashRegister.currentCashierId === undefined) {
    throw 'SHIFT_NOT_STARTED';
  }

  const newEvent: ShiftFinished = {
    type: 'shift-finished',
    data: {
      cashRegisterId: cashRegister.id,
      finishedAt: new Date(),
    },
  };

  return {
    newState: applyEvent(cashRegister, newEvent, when, isCashRegister),
    newEvent,
  };
}

Then the application code for getting and storing events can be defined as:

async function handleEndShift(command: EndShift): Promise<void> {
  const eventStore = EventStoreDBClient.connectionString(
    `esdb://localhost:2113?tls=false`
  );

  const streamName = `cashregister-${command.data.cashRegisterId}`;

  // 1. Read events and snapshot from the separate stream
  const { events, lastSnapshotRevision } = await readCashRegisterEvents(
    eventStore,
    streamName
  );

  // 2. Perform business logic handling the command
  const { newState, newEvent } = endShift(events, command);

  // 3. Append the new event and snapshot
  await storeCashRegister(
    eventStore,
    streamName,
    newEvent,
    newState,
    lastSnapshotRevision
  );
}

If your strategy is to store snapshots in a separate stream, the helper functions can be defined as:

async function readCashRegisterEvents(
  eventStore: EventStoreDBClient,
  streamName: string
) {
  return readEventsFromExternalSnapshot<CashRegisterEvent>(
    (streamName) => readSnapshotFromSeparateStream(eventStore, streamName),
    eventStore,
    streamName
  );
}

async function storeCashRegister(
  eventStore: EventStoreDBClient,
  streamName: string,
  newEvent: ShiftFinished,
  newState: CashRegister,
  lastSnapshotRevision?: bigint
) {
  return appendEventAndExternalSnapshot(
    tryBuildCashierSnapshot,
    (snapshot, streamName, lastSnapshotRevision) =>
      appendSnapshotToSeparateStream(
        eventStore,
        snapshot,
        streamName,
        lastSnapshotRevision
      ),
    eventStore,
    streamName,
    newEvent,
    newState,
    lastSnapshotRevision
  );
}

If we store them in the same stream, the code will look slightly different (as we don't need to pass the last snapshot revision):

async function handleEndShiftSameSnapshotStream(
  command: EndShift
): Promise<void> {
  const eventStore = EventStoreDBClient.connectionString(
    `esdb://localhost:2113?tls=false`
  );

  const streamName = `cashregister-${command.data.cashRegisterId}`;

  // 1. Read events and snapshot from the same stream
  const events = await readCashRegisterEvents(eventStore, streamName);

  // 2. Perform business logic handling the command
  const { newState, newEvent } = endShift(events, command);

  // 3. Append the new event and snapshot
  await storeCashRegister(eventStore, streamName, newEvent, newState);
}

async function readCashRegisterEvents(
  eventStore: EventStoreDBClient,
  streamName: string
) {
  return readEventsFromSnapshotInTheSameStream<CashRegisterEvent>(
    eventStore,
    streamName
  );
}

async function storeCashRegister(
  eventStore: EventStoreDBClient,
  streamName: string,
  newEvent: ShiftFinished,
  newState: CashRegister
) {
  return appendEventAndSnapshotToTheSameStream<CashRegister, CashRegisterEvent>(
    tryBuildCashierSnapshot,
    eventStore,
    streamName,
    newEvent,
    newState
  );
}

It should be clear now that we can hide the implementation detail of the snapshotting strategy by adding wrapper methods. What's more, we can use snapshotting only for entities with the largest number of events. For the others, we can use the traditional approach. By utilising the tactic presented above, we can keep the primary code snapshotting-agnostic.

Storing snapshots asynchronously with subscriptions

Storing snapshots together in the command handling process seems to be a decent approach. However, like everything, it has pros and cons. The main benefit is that we are sure that we will have a snapshot created after successful processing. This can be useful if we're creating a snapshot after each event.

Yet, as I noted above, this may improve the event read performance but can slow down writes. Additionally, we risk transient errors because of the multiple calls. As our intention is to increase performance, we should also consider doing that asynchronously. That way, we don't put additional load on the writes.

To do that, we can use subscriptions. EventStoreDB lets us subscribe to notifications about new events either in a specific stream or in the $all stream. The general recommendation is to use $all stream subscriptions (together with server-side filtering), as they're the most performant. Stream subscriptions are also valid, but having too many of them may impact database performance. You can also consider subscribing to the event type projection stream. The biggest benefit of EventStoreDB subscriptions is that they are push-based. That enables an event-driven flow and has a positive performance impact (especially compared to traditional Change Data Capture).

Doing snapshots with a subscription doesn't limit us to a specific pattern. We can still store snapshots in external storage, a separate stream, or the same stream. This choice is about when we do it, not how. The general steps to perform are:

  1. Start a long-living or background process.
  2. Subscribe to events notifications.
  3. On a new event, check if you need to do a snapshot.
  4. If yes, then do it. Otherwise, skip the event handling.
  5. Wait for upcoming events, and repeat points 3 and 4 when they appear.

The simplest way to run a long-living process in NodeJS is to run an anonymous async block wrapped in a Promise. We can pass the resolve and reject methods to complete the promise when the task finishes or fails.

(async () => {
  return new Promise<void>(async (resolve, reject) => {
    try {
      await subscribeToAll(reject, resolve);
    } catch (error) {
      reject(error);
    }
  });
})();

Having that, we can define a subscription to $all logic as:

  1. Start an EventStoreDB connection.
  2. Read the checkpoint (last processed event position).
  3. Subscribe to $all stream excluding system events (we don't need them for snapshots processing).
  4. Try to do a snapshot when data is received from a subscription.
  5. Store new checkpoint.
  6. When the subscription finishes, complete processing.
  7. When it fails, reject with an error.

The code for that can look like:

async function subscribeToAll(
  reject: (error: any) => void,
  resolve: () => void
) {
  const subscriptionId = 'SnapshottingSubscriptionToAll';

  const eventStore = EventStoreDBClient.connectionString(
    `esdb://localhost:2113?tls=false`
  );

  const lastCheckpoint = await loadCheckPoint(subscriptionId);

  eventStore
    .subscribeToAll({
      fromPosition: lastCheckpoint ?? START,
      filter: excludeSystemEvents(),
    })
    .on('data', async function (resolvedEvent) {
      // 4. Try to do a snapshot when data is received from the subscription
      await tryDoSnapshot(eventStore, resolvedEvent);

      // 5. Store the new checkpoint (the last processed event position)
      await storeCheckpoint(subscriptionId, resolvedEvent.event!.position);
    })
    .on('error', (error) => {
      // 6. End asynchronous process with error
      reject(error);
    })
    // 7. When subscription finished end the process
    .on('close', () => resolve())
    .on('end', () => resolve());
}

Let's skip the considerations about checkpointing for now, as subscriptions are a broader topic for another article. For now, assume that you need to store the last processed event position somewhere (e.g. as an event in EventStoreDB, or a record in another database). That is necessary to be able to resubscribe from the previous position instead of reprocessing all events.
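
The loadCheckPoint and storeCheckpoint functions used above aren't shown in this article. A naive in-memory sketch, enough for a single-process demo but not for production, could look like this:

// Naive in-memory checkpoint store - a sketch only. In a real system the
// checkpoint would be persisted (e.g. as an event or a record in a database).
// The position shape matches what the subscription handler passes in above.
type Checkpoint = { commit: bigint; prepare: bigint };

const checkpoints = new Map<string, Checkpoint>();

async function loadCheckPoint(
  subscriptionId: string
): Promise<Checkpoint | undefined> {
  return checkpoints.get(subscriptionId);
}

async function storeCheckpoint(
  subscriptionId: string,
  position: Checkpoint
): Promise<void> {
  checkpoints.set(subscriptionId, position);
}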

Snapshot creation gets a bit trickier than in the command handling style. We don't have the current stream state; we need to retrieve it. The simplest way to do that is to read all events since the last snapshot and aggregate them, for example:

async function snapshotCashRegisterOnSubscription(
  eventStore: EventStoreDBClient,
  resolvedEvent: ResolvedEvent
): Promise<void> {
  const event = {
    type: resolvedEvent.event!.type,
    data: resolvedEvent.event!.data,
    metadata: resolvedEvent.event!.metadata,
  } as CashRegisterEvent;

  if (!shouldDoSnapshot(event)) return;

  const streamName = resolvedEvent.event!.streamId;

  const { events, lastSnapshotRevision } = await readCashRegisterEvents(
    eventStore,
    streamName
  );

  const currentState = aggregateStream(events, when, isCashRegister);

  const snapshot = buildCashierSnapshot(
    currentState,
    resolvedEvent.event!.revision
  );

  await appendSnapshotToSeparateStream(
    eventStore,
    snapshot,
    streamName,
    lastSnapshotRevision
  );
}

We'd like to avoid reading events constantly. So, checking if a snapshot should be made needs to happen first.

Once that's done, and the event should trigger snapshotting, we have to read the stream events. We're reusing the readCashRegisterEvents method defined before, which loads the snapshot and the following stream events. There is one thing to consider here: subscription handling happens with a delay relative to when the event was appended. Because of that, when we read events, we may also get those that occurred after the handled event. See the image below.

[Diagram: Snapshotting-Strategies-4]

If we receive the ShiftFinished event and try to read the snapshot and all following events, then we'll also get a ShiftStarted event that happened later. That doesn't have to be an issue if we assume that it's just a technical optimisation; we'll just snapshot the latest state. However, if we make assumptions about what we expect the snapshot state to contain, this may be an issue.

We should also consider the snapshotting frequency. If we're doing snapshots too often, we may degrade database performance (especially if we subscribe per stream). We'll have a continuous pattern of actions:

  • listen for an event,
  • read events,
  • append new snapshot.

It's essential to run tests with a production-like load and tune the condition that decides whether to store a snapshot. It's a game of trade-offs.

We can also try strategies like caching the current stream state. We can apply each event from the subscription to keep the current state up to date. Once an event comes that should trigger snapshotting, we can get the current state from the cache instead of reading the events. Beware of keeping too much state in memory; in the edge case, it can eat all the memory.
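
A rough sketch of that idea, keeping the per-stream state in memory and updating it with each incoming event (purely illustrative, with no eviction policy):

// Keep the current state per stream in memory, update it with each event from
// the subscription, and read it when a snapshot is triggered.
// Purely illustrative - there is no eviction, so memory can grow unbounded.
const currentStates = new Map<string, Partial<CashRegister>>();

function applyEventToCachedState(
  streamName: string,
  event: CashRegisterEvent
): Partial<CashRegister> {
  const newState = when(currentStates.get(streamName) ?? {}, event);
  currentStates.set(streamName, newState);

  return newState;
}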

Conclusion

Snapshots are a valid pattern but shouldn't be treated as a foundational part of the system architecture. They should be a performance optimisation. As with other optimisations, we should apply them to the critical business parts, not by default.

If we have to use snapshots, we should analyse the specifics of our data and the expected traffic characteristics. Based on that, we can define our strategy. We should verify our hypothesis, run benchmarks and compare the resulting metrics with the expected ones.

Before deciding to use snapshots, we should re-evaluate our stream design. It's essential to first make sure that we cannot make our streams shorter-lived (e.g. by using the "closing the books" pattern).

Even if we're in urgent need and have to act tactically, it's worth going back to the drawing board with the business and focusing on the modelling. It may turn out that we don't need snapshotting at all if we shape our domain model differently.



Oskar Dudycz continues to champion Event Store. His focus is on helping to create applications closer to business needs. He believes that Event Sourcing, CQRS, and Event-Driven Design are enablers to achieving that. Oskar is an Open Source contributor and co-maintainer of the Marten library, and always focuses on practical, hands-on development experience. You can check Oskar's blog at event-driven.io and follow him on LinkedIn.


https://event-driven.io