How Event Sourcing Can Power Machine Learning

Kaan Can Fidan  |  26 June 2024

In this guest blog, Chief Architect at Vispera, Kaan Can Fidan explains how they've developed a machine learning pipeline using event sourcing with EventStoreDB.

Vispera offers image recognition-based retail execution and tracking services to grocery retailers and suppliers so they can radically improve the efficiency and effectiveness of their field operations. 

Why would anyone create an event-sourced machine learning pipeline?

I consider myself lucky to have worked at Vispera since its beginnings, and I have been its Chief Architect for some time. For a little more than 3 years, we have been running an event-sourced machine learning pipeline in production, at a scale of millions of images per month. As its lead engineer, I wanted to share our experience.

I will first explain the motivation behind it, then describe how we modeled the domain, and finally point out where we had to make some compromises to fit our use-case to event sourcing, which was designed with e-commerce, finance or healthcare in mind.

Requirements for machine learning at scale

Vispera has a bold vision to provide image recognition solutions to any problem one can encounter in a retail scene. We started our journey by detecting products in supermarket shelf images to create stock availability, planogram compliance, and shelf share reports. Now, the reports we create are based on fully customized KPI sets that also include (but are not limited to) detecting and reading price tags, detecting branding materials, detecting people, parsing menus, and recognizing types of retailer assets. As one can imagine, these tasks require running a lot of machine learning algorithms, and while some use common models, most are fine-tuned against the specific customer’s data to maximize accuracy.

At this point in time (circa 2024), Vispera has about 1000 actively used machine learning models. This means we must:

  1. Perform inference with combinations of these models at a rate of about 100 images per second at peak load.
  2. Quality check their results using human operators and track the accuracy of each model over time.
  3. Gather and manually annotate more data for future training sessions.
  4. Clean already annotated data, because human errors exist.
  5. Train new models to increase accuracy and prevent model drift.
  6. Rinse and repeat.

Image processing and inference at scale

This was the first engineering challenge we focused on. Even before our state-of-the-art event-driven system, we had to separate each image processing requirement (e.g. object detection, recognition, reading, etc.) into its own service implementation simply because they required dedicated hardware (GPU for low latency results, vector processing-compatible CPUs, high amounts of memory).

The first architectural design of our system basically included:

  • Monolithic business logic backend with a web API and a relational database as persistence layer
  • Message queues to distribute processing load
  • Image processing services that consume said message queues

 

Fig. 1: Simplified monolithic processing architecture

Once we had these implemented, we JUST needed accurate machine learning models to perform inference!

Data gathering, annotation and model quality assurance at scale

This is the expensive aspect of operating supervised machine learning systems, as they are very data-hungry and require continuous human labour. Especially in our domain, where new visuals appear on product packages every day, model drift from the real world is easily observable. Data gathering and model quality assurance require:

  • Keeping a record of every visual task received from customers with all their intermediate results
  • Implementing an operator interface to mark objects of interest, label them, input text for objects to read…etc.
  • Efficiently distributing annotation tasks to operators
  • Keeping a record of every addition or correction an operator makes on a visual task
  • Extracting annotated data whenever an operator corrects / validates a result
  • Measuring accuracy of each model by comparing its results to operator provided ground truth

At this point, some prefer crowd-sourcing the annotation work and make multiple people annotate the same examples to find consensus between them (think of CAPTCHAs). We went another way and hired our own full-time operators, because our annotation specifications are a bit more complicated than just reading words or finding traffic lights. This adds another requirement on top:

  • Keep track of operator efficiency and accuracy to provide feedback.

To satisfy these needs, we implemented operator and model accuracy tracking, data extraction, operator task distribution responsibilities into our good old monolithic backend. We also had to introduce management APIs to list and filter all past visual tasks and observe their statuses, some commands to interact with them (e.g. re-process with latest models, re-send to operator for cleaning and correction…etc.) and a web UI to go along with it.

Fig. 2: Simplified machine learning operation architecture

Adding all these transactional and analytical responsibilities to the same relational database caused scalability concerns. In addition, every change to a visual task’s state required side-effect tables, such as model accuracies and operator sessions, to be updated for consistency. This meant we either had to update side-effects in mission-critical paths (higher coupling between concerns and higher latency) or queue messages for them, which in retrospect feels like a poor man’s events.

Training models at scale

Machine learning researchers/engineers are very good at writing training scripts, running them to create new models, and measuring the output accuracy against a validation dataset. Making them do these steps by running python train.py for each of those thousands of training sessions can be very expensive in engineering time. Training models should be a one-click trigger that automatically runs a training pipeline and deploys the newly created model to be used in production.

Once your “Train” button is ready, you click it and start waiting for the data to be gathered to feed the hungry GPU. But downloading thousands to millions of images and transforming or cropping them into objects of interest can take a significant amount of time. Also, if you are using the same data in multiple sessions, redoing all that is wasteful and inefficient. Ideally, the data should be ready even before the training is triggered to minimize model lead time.

Once again, this ahead-of-time data extraction needed some messages to be queued and processed behind the scenes whenever an operator generated a supervised result. And once again, it was a transient message that resembled an event that was lost forever once it was handled, like dandelion seeds in the wind.

Where the good old monolith with relational database failed us

Maybe “failure” is a stronger word than what actually happened, but it did start creating friction.

The code, and especially the relational data model, started to feel stretched by many requirements, which in turn made implementing new features slower and more difficult and drove engineering cost up. It was 2019, the microservices hype was as strong as ever, and we were feeling a strong pull to divide and conquer our problems.

The final nail in the coffin of our monolith came as a feature request: the operations team wanted to only annotate some subtasks and hide the other - already high accuracy - ones from the operator to increase efficiency. The problem was some subtasks also affected other ones down the line, so if any one of their results was corrected, the dependent subtasks had to be re-processed. This effectively changed our strategy of first processing the visual task in servers and then filling in the blanks by operators to a ping-pong match between the two. Whenever the backend or the annotation frontend application received a partially processed task, it needed to understand what parts were previously processed and deduce what must be done next. We had essentially started coding logic to infer past events from the last state.

As the implementation of this feature dragged on, I had a nagging feeling that we were on the wrong path. I had read about event-driven architecture and sensed that it was a decent way to decouple our concerns. I even tried to create a proof-of-concept using Apache Kafka (because that’s what I thought was THE event log) but could not wrap my head around how to publish changes to a visual task without looking at an eventually consistent copy and creating concurrency problems.

When I was looking for other solutions, I came across a talk by Greg Young titled “Why Event Sourcing?”, and through a YouTube video, he spoke to me directly. Without even knowing about us, he knew what we needed and explained it very clearly.

I understood the idea of folding past events to create last state, perform checks on that state to run a command and publish the resulting event with a version expectation to get optimistic concurrency control. Thinking of all the complexities like monitoring model accuracies and operator efficiency, distributing operator tasks, extracting training data…etc. as queries to read-copy databases enlightened me. I pulled the Docker image for EventStoreDB and never looked back.
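The fold-check-append cycle described above can be sketched in a few lines of Python. This is a minimal illustration under stated assumptions, not our production code and not the EventStoreDB client API: `InMemoryEventStore`, `fold`, `assign_operator`, and the event field names are all hypothetical stand-ins.

```python
from dataclasses import dataclass, field


class WrongExpectedVersion(Exception):
    """Raised when the stream moved on since the caller last read it."""


@dataclass
class InMemoryEventStore:
    # Hypothetical stand-in for an event store: stream name -> list of events.
    streams: dict = field(default_factory=dict)

    def read(self, stream):
        return list(self.streams.get(stream, []))

    def append(self, stream, event, expected_version):
        events = self.streams.setdefault(stream, [])
        current_version = len(events) - 1  # -1 means the stream is empty
        if current_version != expected_version:
            raise WrongExpectedVersion(
                f"expected {expected_version}, stream is at {current_version}"
            )
        events.append(event)
        return current_version + 1


def fold(events):
    """Rebuild the last state of a task by replaying its events in order."""
    state = {"status": "none", "version": -1}
    for event in events:
        if event["type"] == "TaskCreated":
            state["status"] = "created"
        elif event["type"] == "OperatorAssigned":
            state["status"] = "assigned"
            state["operator"] = event["operator"]
        state["version"] += 1
    return state


def assign_operator(store, stream, operator):
    """Command handler: fold, check the state, append with a version expectation."""
    state = fold(store.read(stream))
    if state["status"] != "created":
        raise ValueError(f"cannot assign a task in status {state['status']!r}")
    event = {"type": "OperatorAssigned", "operator": operator}
    return store.append(stream, event, expected_version=state["version"])
```

If two handlers fold the same stream concurrently, only the first append matches the expected version; the second raises `WrongExpectedVersion` and can re-read, re-check, and retry.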

The domain model and execution

The Vispera image recognition system generates a wide variety of results, which correspond to many different types of visual tasks. Each of these visual task recipes has a different set of associated events. For example, a task event stream that includes price tag reading has events related to price tag detection and reading steps.

For the sake of brevity, I want to highlight one of the simplest possible tasks: product detection. Let’s say the client wants us to list all the available products in the following image:

Fig. 3: Example image request for product detection

The client also specifies what products they are looking for in the task request. This is called a class group and it has a key and a list of classes associated with it. In this case, the class group contains a bunch of different ice cream products.

The desired result looks like the following:

Fig. 4: Example image with object boundaries marked

Where each of the marked product boundaries has been recognized with the correct class:

Fig. 5: Example recognized class

Event storming

It seems appropriate to simulate an event storming session to analyze what the event stream looks like. So, first things first, here’s the legend for it:

Fig. 6: Event storming legend

Disclaimer: This is of course not the exact model of our system, but a tastefully simplified version that hopefully demonstrates the design principles.

The event stream starts with the client issuing a Create Task command that raises a Task Created event. This event carries all the necessary parameters (task type, class group, image URLs…etc.) to process the task.

Fig. 7: Create Task command and Task Created event on the Task aggregate

At this point, our Task Executor service kicks in and reacts to the Task Created event and requests object detection (Detection Requested) following the task recipe.

Let’s say this happens during the setup phase of the project, so we do not have an already trained model which can find and recognize client’s products, and hence automatic task execution cannot continue after a Model Not Found event.

Fig. 8: New project task stream up to Model Not Found event

We can’t continue and generate an automatic result, but we might as well use the image to annotate it for future model training sessions. Task Executor raises a white flag and asks for assistance from a human operator.

Fig. 9: Assistance requested from a human operator and images are manually annotated

The Assistance Requested event is the point where the task execution is handed over to the Operator Manager service. It reads the event and updates the task’s status in its own read-copy database in MongoDB. The task waits in this state until an operator who is part of this project’s team comes online and asks for a task to be assigned with Assign Task command. Operator Manager finds the task in MongoDB when it hits the operator’s search criteria and raises an Operator Assigned event so that the task’s status is updated in its own read-copy to avoid assigning the same task to multiple people.
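As a rough illustration of this hand-over, the sketch below keeps the read-copy in a plain dictionary instead of MongoDB. The class, function, and field names (`OperatorTaskReadCopy`, `assign_task`, `task_id`, `project`) are assumptions made for the example, not the actual service code.

```python
class OperatorTaskReadCopy:
    """In-memory stand-in for the Operator Manager's read-copy of task statuses."""

    def __init__(self):
        self.tasks = {}  # task id -> {"project": ..., "status": ...}

    def apply(self, event):
        # Project each event into the task's current status.
        if event["type"] == "AssistanceRequested":
            self.tasks[event["task_id"]] = {
                "project": event["project"],
                "status": "waiting",
            }
        elif event["type"] == "OperatorAssigned":
            task = self.tasks[event["task_id"]]
            task["status"] = "assigned"
            task["operator"] = event["operator"]

    def find_waiting(self, projects):
        # Return the first waiting task in one of the operator's projects.
        for task_id, task in self.tasks.items():
            if task["status"] == "waiting" and task["project"] in projects:
                return task_id
        return None


def assign_task(read_copy, operator, projects):
    """Handle Assign Task: pick a waiting task and raise Operator Assigned."""
    task_id = read_copy.find_waiting(projects)
    if task_id is None:
        return None
    event = {"type": "OperatorAssigned", "task_id": task_id, "operator": operator}
    # Simplification: a real system would first append the event to the task
    # stream and let the subscription update the read-copy.
    read_copy.apply(event)
    return event
```

Because the read-copy is only eventually consistent, the append of Operator Assigned to the task stream would still need a version expectation so that two operators racing for the same task cannot both win.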

Once the task is assigned, the task events are then serialized and sent over wire to the operator annotation frontend. The annotation tool reads the event stream and forms a UI workflow depending on which steps were performed previously. In this case, there is a single object detection annotation page that is shown in figures 4 and 5.

The operator manually draws boundaries around objects of interest (baskets of ice cream products in this case) and finds the correct product label for them. These changes are included in Objects Added, Objects Labeled events and the operator session is sealed with Operator Finished.

Fig. 10: Side-effects of Operator Finished and Task Result Generated

The Operator Finished event is important for several concerns:

  • Operator Manager service updates its own read-copy to mark the operator task as finished
  • Task Executor service checks if there are further subtasks to be done (only product detection in this case, so no) and generates the task result by combining results from previous subtasks and raises Task Result Generated
  • Data Extractor service extracts the annotated example and makes it easily queryable in a Training Data Repository to feed into training GPUs
  • Operator Monitor service populates a PostgreSQL star-schema database that has operator session statistics (how much work has been done and in how much time) to be consumed in a PowerBI dashboard.
  • Engine Monitor service populates another PostgreSQL star-schema database that has trained model statistics (number of true positives, false positives, false negatives) by comparing the automatic result to operator ground truth. In the example above, there was no model yet, but the same operator events are fired when results are corrected by operators during quality checks.

After the result is generated:

  • Our standard way of returning results is posting them to a call-back URL in the client application (Task Reported).
  • Any other result sink integrations can be implemented through listening to the Task Result Generated event.
  • Billing service updates its own yet another star-schema PostgreSQL database to extract billing information at the end of the month.

Once enough annotated images accumulate over many tasks processed by operators, we can train the first model by querying the Training Data Repository read-copy database. After the model is ready and deployed to production, automatic results can now be generated without operator assistance. After the result is generated, human operators are once again called to duty but this time for quality checking.

During quality checks, operators raise Objects Removed events for any false positive objects they remove and Objects Added events for any missing (false negative) objects they add. Any mistakes in class label recognition are corrected with, once again, the Objects Labeled event. These corrections are then used to evaluate model accuracy.
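One way to turn these corrections into detection statistics is sketched below. It assumes each event carries an `objects` list and that every removed object came from the automatic result; the event field names and both helper functions are hypothetical, and label corrections (Objects Labeled) are left out for brevity.

```python
def detection_counts(events):
    """Count true positives, false positives and false negatives for one task."""
    detected = removed = added = 0
    for event in events:
        count = len(event.get("objects", []))
        if event["type"] == "ObjectsDetected":
            detected += count   # objects proposed by the model
        elif event["type"] == "ObjectsRemoved":
            removed += count    # operator rejected them: false positives
        elif event["type"] == "ObjectsAdded":
            added += count      # operator had to add them: false negatives
    return {"tp": detected - removed, "fp": removed, "fn": added}


def precision_recall(counts):
    """Derive precision and recall from the counts, guarding against empty results."""
    tp, fp, fn = counts["tp"], counts["fp"], counts["fn"]
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return precision, recall
```

For a task where the model detected 10 objects and the operator removed 2 and added 2, this yields 8 true positives, 2 false positives and 2 false negatives.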

Fig. 11: Automatic result generation and quality check request

All these events are also consumed by a service named Task Manager that has a queryable read-copy database (MongoDB) for tasks and their statuses and provides some commands to interact with them.

Although this is a simple enough example with one subtask (product detection), I think it demonstrates enough to stir imagination to understand how it can be extended to also include asset brand recognition, perspective correction for planogram comparisons, panorama generation from multiple images, price tag detection and reading…etc.

Operators and servers can also easily process tasks in ping-pong fashion when necessary. Neither side has to infer what the other added on top, because that is trivial to see by reading the past events.

The final architecture diagram looks like the following:

Fig. 12: Event-driven architecture diagram

Where we had to stretch event sourcing

Our use-case is probably not what Greg Young had in mind when he coined the term “event sourcing”. We had to extend on the existing ideas and discover some uncharted territory.

Fat events

There are two important design principles for event sourcing:

  • Having small, single-responsible events
  • Having short event streams (let’s say less than 100 events)

These two principles make the byte sizes of individual streams smaller so that our applications can read them fast to fold into the last state and stay responsive to commands.

In our domain, some events must either be quite fat or be separated into a thousand smaller pieces. A good example of this is the Objects Detected event. When the task has many images, and the searched objects are small and numerous even in a single image, this event sometimes carries more than a thousand objects with their corresponding boundary coordinates and recognized classes. So, it must either be a thousand Object Detected events or one fat Objects Detected event that includes them all. In this “fat vs. many” tradeoff, we chose to go with fat.

To keep the maximum event body size low, and hence keep traffic between services fast, we chose to encode the event body using Protocol Buffers. In our experiments, protobuf events were on average 25% the size of their JSON counterparts. On the flip side, this unfortunately disabled the EventStoreDB integrated queries, as they only understand JSON, but we put extra queryable fields in the metadata of the event and use those when we need to query directly on EventStoreDB. Protobuf is also quite nice for generating data structures for different languages, so it has a side-benefit for polyglot systems.
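The shape of such an event, a binary body plus JSON metadata that stays queryable, can be illustrated as follows. To keep the example self-contained, it uses Python's `struct` module as a stand-in for Protocol Buffers, so the byte layout and any size ratio it produces are not those of protobuf; the field names are also assumptions.

```python
import json
import struct

# Hypothetical layout per object: four float32 box coordinates and one uint32 class id.
OBJECT_FORMAT = struct.Struct("<4fI")


def encode_body(objects):
    """Pack objects into a compact binary body (stand-in for a protobuf message)."""
    return b"".join(OBJECT_FORMAT.pack(*obj["box"], obj["class_id"]) for obj in objects)


def decode_body(body):
    """Unpack the binary body back into a list of objects."""
    return [
        {"box": list(fields[:4]), "class_id": fields[4]}
        for fields in OBJECT_FORMAT.iter_unpack(body)
    ]


def build_event(task_id, objects):
    """Build an event whose body is binary and whose metadata stays queryable JSON."""
    metadata = {"task_id": task_id, "object_count": len(objects)}
    return {
        "type": "ObjectsDetected",
        "body": encode_body(objects),
        "metadata": json.dumps(metadata),
    }
```

The metadata duplicates only the handful of fields worth filtering on, so the server can inspect them without understanding the binary body.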

Single command, many events

This may not be a stretch, but it certainly is different than usual CRUD-like examples. The usual event sourcing examples are in the realm of Add User command resulting in a single User Added event, or Change Shipping Address command resulting in Shipping Address Changed.

In our case, 1 CreateTask command is usually correlated with ~18 events on average. When we first started modeling the domain, this discrepancy made us think that we must have been doing something wrong. Eventually, we made peace with it and now think it’s just that our domain is more background-process-heavy.

Single event-sourced aggregate type

We only have one aggregate type in EventStoreDB, which is the Task, because it made sense to us to let only one thread change the task state at any given time.

We had a few options for others:

Models

Another aggregate is the Model, which is the trained machine learning model, but models are segregated to a subsystem with a CRUD database that is not event-sourced. We could have had Model streams that were projected into the search database, but that seemed to achieve the same effect with extra steps.

Class groups

Many tasks share the same class group definition, and currently these duplicate definitions are included in Task Created events. We could have a Class Group aggregate type and track class group changes in their own streams. There are even some parts of the system where the last state of each class group is projected from the last corresponding task which is not ideal.

Changing the class group definition has weird effects when a task is being processed (e.g. what happens if the operator had labeled an object using a class that is no longer part of the class group). We can still go back on this design decision and introduce it, but for now we think that the benefits do not outweigh potential complications.

Subtasks

We could have subtasks as separate aggregate types to allow independent steps to be processed in parallel. For example, we could have ProductDetection and TagReading streams and refer to those from the main Task stream. The synchronization at the end to generate the task result is a bit more complicated in this case. We thought the added complication would not be worth it, as the task throughput of the system would not change significantly; only individual tasks would be processed with lower latency. We chose to accept a higher but negligible latency in exchange for less overall complication.

Summarized commands

Our annotation UI is designed to work with intermittent connectivity, so it does not try to send all individual changes to the servers one-by-one. It keeps all the changes locally in a stack (coincidentally the undo stack) and sends them to the server in bulk.

I have previously glossed over the annotation related commands and events like Add Objects command and Objects Added event. But when you think about it in detail, the operators can only draw a single object boundary using their mouse and issue a single Add Object command at a time. As there can be a thousand objects in a task result, this means they issue at least a thousand commands for them including removed and redrawn ones. Since we had already established we do not want a thousand events in our task streams, we have to somehow summarize all similar changes in an operator session to singular commands (many Add Object commands become single Add Objects).

The Session Summarizer implementation needs to:

  1. Join the same commands together
  2. Eliminate any commands that counteract each other (Add Object and Remove Object for the same object).
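A heavily simplified sketch of these two steps is shown below. It only handles add and remove commands, and the command shapes (`AddObject`, `RemoveObject`, `object_id`) are assumptions for illustration; the real implementation deals with many more command types and edge cases.

```python
def summarize_session(commands):
    """Collapse per-object commands from one operator session into bulk commands."""
    added = {}    # object id -> object drawn in this session (insertion order kept)
    removed = []  # ids of pre-existing objects the operator removed
    for command in commands:
        if command["type"] == "AddObject":
            obj = command["object"]
            added[obj["id"]] = obj
        elif command["type"] == "RemoveObject":
            object_id = command["object_id"]
            if object_id in added:
                # Drawn and deleted within the same session: the two cancel out.
                del added[object_id]
            elif object_id not in removed:
                removed.append(object_id)
    summary = []
    if added:
        summary.append({"type": "AddObjects", "objects": list(added.values())})
    if removed:
        summary.append({"type": "RemoveObjects", "object_ids": removed})
    return summary
```

A session in which an operator draws three objects, deletes one of them again, and removes one pre-existing object therefore collapses into one Add Objects command with two objects and one Remove Objects command with a single id.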

This is by far the most complicated piece of code we have in our system and has been a fertile breeding ground for bugs. We have tried to come up with different designs, but even after 4 years, it lives on as a necessary evil.

Different cardinalities on read models

Having a read model for tasks themselves is very straight-forward. For example, the Task Summary model that the Task Manager populates into its own database is a selected subset of fields and some calculations on them.

The not-so-straight-forward projections happen when the relationship of the read model is not one-to-one with the aggregate. For example:

  • Each task can have multiple operator sessions, which means the Operator Monitor populates a table with these sessions that has one-to-many relationship with the tasks. This is not a huge problem as many session rows can be inserted in 1 transaction with a concurrency check on the tasks table (ACID 🎉).
  • When a task is annotated, multiple image documents are extracted as training data to MongoDB. So, 1 event may touch multiple documents that cannot be inserted, updated or deleted in an atomic fashion. We have a separate collection for task versions for concurrency checks and update it if all the changes are reflected in full. This in turn means that in some edge cases, we can have partially updated training data until the event processing is retried and succeeds (a more granular eventual consistency). We choose to live with it. 🧿
  • As previously mentioned, class groups have the other kind of many-to-one relationship with tasks. We currently live with projecting the last task’s definition and use projection stream’s ($ce-Task) link event number for concurrency checks.

Read-copies where version checks are not possible

Guaranteeing eventual consistency of read-copies with the event stream requires the read-copy updates to have preconditions on event versions to avoid event ordering and redelivery related problems. That being said, sometimes it’s not possible to specify a precondition on the update because the technology used for the read-copy does not allow it.
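The kind of precondition meant here can be sketched with an in-memory read-copy that stores the last applied event version per task. `TaskSummaryProjection` and its return values are hypothetical; in a real database the same check would typically be a conditional update on the stored version.

```python
class TaskSummaryProjection:
    """Read-copy that only applies an event if it is the next expected version."""

    def __init__(self):
        self.rows = {}  # task id -> {"version": int, "last_event": str}

    def handle(self, task_id, version, event_type):
        row = self.rows.get(task_id, {"version": -1, "last_event": None})
        if version <= row["version"]:
            return "skipped"  # redelivered event, already reflected in the row
        if version != row["version"] + 1:
            return "retry"    # an earlier event is missing, so do not apply yet
        self.rows[task_id] = {"version": version, "last_event": event_type}
        return "applied"
```

With this check, redelivering an event is harmless and an out-of-order event is postponed until the gap before it has been filled.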

A concrete example in our case is the Training Data Repository. The annotated images have a MongoDB document for search and an actual JPEG file that is written as a file to storage. The storage API we use does not have any version precondition for updates, so we had a few options:

  • Write separate files for each version. This is infeasibly expensive in our case because the data size gets multiplied quite fast.
  • Write the JPEG bytes into MongoDB document itself and not use separate storage. This is once again very expensive.
  • Have a version check around both the document and storage updates using a separate version document. This means there could be edge cases where the document is not consistent with the stored file. This is the cheapest option.

Yes, we went with the cheapest option and live with it. 🧿

Where we had to stretch EventStoreDB

We had to accept some bitter trade-offs with EventStoreDB, as it was not necessarily designed with our use-case in mind.

Overuse of persistent subscriptions

Our system has to have competing consumers on almost all previously mentioned services, as single-threaded catch-up subscriptions lag behind significantly. This means that although we are using a 3-node highly-available EventStoreDB cluster, the system shuts down completely when the cluster leader goes down and waits until a new leader is elected (hopefully in seconds). This is a single point-of-failure and I wish that each persistent subscription had a leader of its own and a ready-to-go replica (similar to RabbitMQ highly-available queues) so a node going down would have a smaller blast radius.

The cost of not archiving streams

We have yet to delete any streams from the past, and our storage and backup bills have been ever-growing. It is very hard for us to pinpoint the cost of losing old data, as we reuse old annotations for new projects, so a feature that automatically archives old streams to cheaper storage would be greatly appreciated.

Proto event queries

I have previously mentioned that we had to forgo the ability to query our event data on the EventStoreDB server because we could not use JSON event bodies. JSON metadata is often useful, but of course is not the same as querying the whole data. When we have to list streams that fit certain criteria in the events, usually we have to write a script that reads the events and filters them on the client-side. Although this is still doable, it is certainly much slower.

Conclusion

We have been quite happy with the way things worked out after the event sourcing transition. Of course, we had to accept and live with some trade-offs, but the overall net saving in engineering cost is undeniable.

If machine learning is core business for you, in such a way that you need to implement the whole pipeline yourself, you should consider if event sourcing would also work for you. If machine learning is not your core business, I think you should just buy an off-the-shelf solution for it (e.g. buy Vispera services if you are in retail) and avoid the whole headache. 😊

If you have nothing to do with machine learning but are trying to apply event sourcing outside the usual example domains, I hope this post encourages you and guides you in some way.


Kaan Can Fidan is the Chief Architect at Vispera which was founded in 2014 and is focused on helping large grocers execute excellent displays in their stores and franchises. They use image processing and machine learning to make better processes for their clients to monitor stock levels, staff productivity and analyze business performance. Their clients include multinational names such as Unilever, Coca-Cola, Carlsberg, Danone, and many more.


https://vispera.co