Data ingestion with Logstash and EventStore

In this post, the challenge is to load a CSV file and ingest it into EventStoreDB through its HTTP API.

To be precise, we want to convert this:

Europe,Italy,Clothes,Online,M,12/17/2013,278155219,1/10/2014,1165,109.28,35.84,127311.20,41753.60,85557.60

To this (which is an example of an HTTP POST to the Event Store HTTP API):

[
    {
        "eventId": "fbf4b1a1-b4a3-4dfe-a01f-ec52c34e16e4",
        "eventType": "InboundDataReceived",
        "data": {
            "message": "Europe,Italy,Clothes,Online,M,12/17/2013,278155219,1/10/2014,1165,109.28,35.84,127311.20,41753.60,85557.60"
        },
        "metadata": {
            "host": "box-1",
            "path": "/usr/data/sales.csv"
        }
    }
]

In this example we set several different parts of the HTTP POST, including a unique eventId, and enrich each message with metadata (like the host and the file path).

This data will be invaluable later, both for finding the specific mapping for that particular data source and for keeping track of its origin.
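
If you want to test the endpoint by hand before wiring up Logstash, you can send the same payload with curl from a Unix-style shell. This is a minimal sketch: the stream name inbound-data is just my choice for this example, 2113 is the default Event Store HTTP port, and the eventId must be a freshly generated UUID for each event.

curl -i "http://127.0.0.1:2113/streams/inbound-data" \
  -H "Content-Type: application/vnd.eventstore.events+json" \
  -d '[{
        "eventId": "fbf4b1a1-b4a3-4dfe-a01f-ec52c34e16e4",
        "eventType": "InboundDataReceived",
        "data": { "message": "Europe,Italy,Clothes,Online,M,12/17/2013,278155219,1/10/2014,1165,109.28,35.84,127311.20,41753.60,85557.60" },
        "metadata": { "host": "box-1", "path": "/usr/data/sales.csv" }
      }]'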

The API Gateway is our internal data collection endpoint. In my opinion, it should be a simple HTTP endpoint: there is no need for a heavyweight framework that adds complexity while providing little value.

In this case, we are using the existing Event Store HTTP API as the API Gateway, and Logstash to monitor folders and convert the CSV lines into JSON Messages.

Logstash is a powerful tool that keeps track of the last ingested position, so ingestion can resume correctly if something goes wrong or the file changes.
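
To make the moving parts concrete, here is a minimal sketch of such a pipeline. It is not the exact configuration from my gist (linked in the steps below): the paths, the inbound-data stream name, and the field layout are illustrative, but the plugins (file input, uuid filter, http output) are standard Logstash ones.

input {
  file {
    path => "C:/inbound/*/*.csv"            # Logstash accepts forward slashes on Windows
    start_position => "beginning"
    sincedb_path => "C:/inbound/.sincedb"   # where the last-read position is persisted
  }
}

filter {
  uuid { target => "eventId" }              # give every line a unique eventId
}

output {
  http {
    url => "http://127.0.0.1:2113/streams/inbound-data"
    http_method => "post"
    format => "message"
    content_type => "application/vnd.eventstore.events+json"
    # Build the Event Store events array; host and path are added by the file input
    message => '[{"eventId":"%{eventId}","eventType":"InboundDataReceived","data":{"message":"%{message}"},"metadata":{"host":"%{host}","path":"%{path}"}}]'
  }
}

One caveat: because the raw CSV line is embedded in a JSON string, this naive template assumes the data contains no double quotes.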

There are other tools available for this task, such as Telegraf, but I personally have more experience using Logstash.

Once the messages are flowing through the API Gateway, we can implement Domain Components or Microservices that subscribe to these messages and provide validation, mapping, and domain business logic. The results are then saved in their own streams as Domain Events.

To avoid keeping the data in the inbound stream forever, you can also set an optional retention period after which the messages are automatically removed from the system.
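
In Event Store this is done with the $maxAge stream metadata property, expressed in seconds. Here is a sketch of setting a seven-day retention on the inbound stream over HTTP, following the stream-metadata examples in the Event Store docs; the eventId must be a freshly generated UUID, the eventType label here is illustrative, and admin/changeit are the default development credentials.

curl -i "http://127.0.0.1:2113/streams/inbound-data/metadata" \
  -u admin:changeit \
  -H "Content-Type: application/vnd.eventstore.events+json" \
  -d '[{
        "eventId": "c0e0d9d3-2f3a-4b6e-9f3c-1a2b3c4d5e6f",
        "eventType": "settings",
        "data": { "$maxAge": 604800 }
      }]'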

Try it yourself:

  1. Download Logstash and Event Store
  2. Create a folder called “inbound”, then create a logstash.yml file inside it and paste in the pipeline configuration (here’s my gist for reference/inspiration: https://gist.github.com/riccardone/18c176dab737631c9a216e89b79acdb7)
  3. If you are on a Windows computer, create a logstash.bat file so you can easily run logstash. Its contents will depend on your file paths but in my case it’s: C:\services\logstash-6.2.4\bin\logstash -f C:\inbound\logstash.yml
  4. Then, from a command line, run logstash
  5. Create a client subdirectory under inbound, e.g. C:\inbound\RdnSoftware
  6. Copy a CSV file into the folder and let the ingestion begin

With this Logstash input configuration, when you have a new client you can just add another folder and start ingesting the data. For production environments, you can combine Logstash with Filebeat to distribute part of the ingestion workload to separate boxes.
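
If you go the Filebeat route, Filebeat tails the files on the client boxes and ships the lines to a central Logstash, which listens with its beats input instead of reading files directly. A minimal sketch of the edge-side filebeat.yml (the hostname and paths are illustrative, and on older 6.x releases the inputs section is named filebeat.prospectors):

filebeat.inputs:
  - type: log
    paths:
      - /usr/data/inbound/*/*.csv

output.logstash:
  hosts: ["logstash-box:5044"]

On the Logstash side you would then replace the file input with input { beats { port => 5044 } } and keep the filter and output sections unchanged.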

In some scenarios - especially when the shape of the data is not yet known and you don’t have a mapper for it - it can be easier to direct the data straight into a semi-structured data lake for further analysis.

I hope you enjoyed my little data ingestion example.

Here’s the 15-minute webinar if you’d like to watch me go through everything above and find out a little more: https://youtu.be/2nyzvrIdnPg

