Distributed Event Store with NATS-Streaming
Event-sourcing is gold. That’s true in many ways, but it also cost me a lot of headaches to get on the right track. If you don’t know what Event-Sourcing is, here is a brief introduction:
With Event-Sourcing we want to make our system 100% deterministic. This means that all business-relevant changes, e.g. “OrderStatusUpdated” or “OrderCanceled”, are tracked as events in an immutable Event Store. In the end, you get a time machine for free: if you want to know the “current” state of a customer order “Order(ID=1)”, you only need to accumulate all order events related to “Order(ID=1)”.
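To make the accumulation idea concrete, here is a minimal sketch in Go. The `Event` and `Order` types and their fields are my own assumptions for illustration and are not taken from the reference implementation; the event names are the ones used later in this post.

```go
package main

import "fmt"

// Event is a hypothetical, simplified event record.
type Event struct {
	AggregateID string // e.g. "order-1"
	Type        string // e.g. "OrderCreated"
	Status      string // payload, only used by OrderStatusUpdated
}

// Order is the state derived from the events of one aggregate.
type Order struct {
	ID      string
	Status  string
	Version int // number of events applied so far
}

// Apply returns the order state after a single event.
func (o Order) Apply(e Event) Order {
	switch e.Type {
	case "OrderCreated":
		o.ID, o.Status = e.AggregateID, "created"
	case "OrderStatusUpdated":
		o.Status = e.Status
	case "OrderCanceled":
		o.Status = "canceled"
	}
	o.Version++
	return o
}

// Replay folds all events of the given aggregate, in order, into its state.
func Replay(aggregateID string, events []Event) Order {
	var o Order
	for _, e := range events {
		if e.AggregateID == aggregateID {
			o = o.Apply(e)
		}
	}
	return o
}

func main() {
	events := []Event{
		{AggregateID: "order-1", Type: "OrderCreated"},
		{AggregateID: "order-2", Type: "OrderCreated"},
		{AggregateID: "order-1", Type: "OrderStatusUpdated", Status: "shipped"},
	}
	fmt.Printf("%+v\n", Replay("order-1", events))
}
```

Replaying only a prefix of the event list yields the state at that earlier point, which is where the time-machine effect comes from.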
If you have never heard of Event-Sourcing, I highly recommend my collection:
- An In-Depth Look at Event Sourcing With CQRS (https://www.youtube.com/watch?v=EqpalkqJD8M)
- Go Back to the Future with Event Sourcing and CQRS (https://www.youtube.com/watch?v=iGt0DBOWDTs)
- Stream Processing, Event Sourcing, Reactive, CEP Explained (https://www.confluent.io/blog/making-sense-of-stream-processing/)
- Event-Sourcing implementation in Ruby with excellent documentation (https://github.com/envato/event_sourcery#core-concepts)
In this post, I would like to present a solution that allows you to bootstrap a language-agnostic distributed Event Store in only a few steps. I’m not going to go into much detail on how event sourcing is implemented. For that purpose, you can see here a reference implementation in Go.
The core component NATS-Streaming
NATS-Streaming (NATS) is a highly-available and durable message queue with an unbeatable combination of simplicity and messaging features:
- Lightweight, written in Go
- Single binary, zero runtime dependencies
- Ordered, log-based persistence
- At-least-once delivery model
- Automatic subscriber offset tracking
- Support to replay messages in a stream
- High availability cluster
You can run it in-memory or persist your messages in flat files or in SQL storage such as Postgres or MySQL. In this post, we will use the SQL store.
In the next chapter, I will describe why NATS perfectly fits into building an Event Store.
Issue a command
In order to produce events, a command is issued to express our intention like “CreateOrder”. This is highly simplified.
NATS Streaming as the source of truth
When a message arrives at NATS-Streaming, it is persisted in the SQL cluster. From this point on, the events can no longer be removed: an Event Store is immutable.
Already in this step, we can run into concurrency issues when two commands are executed concurrently on the same aggregate. NATS doesn’t protect us from that. Before events are published, we have to determine the next aggregate version of the “Order” entity to check whether we are operating on the latest state. If we are not, another command has changed the aggregate in the meantime, and we automatically retry the command with a backoff strategy. That operation is idempotent (safe to retry). This is called optimistic locking.
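The version check and the retry loop can be sketched roughly like this. The in-memory `Store`, the `Execute` helper and the error name are assumptions made up for this example; in the real setup the version would be read from the event store database.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// ErrVersionConflict signals that another command advanced the aggregate first.
var ErrVersionConflict = errors.New("aggregate version conflict")

// Store is a hypothetical in-memory stand-in for the event store.
type Store struct {
	mu       sync.Mutex
	versions map[string]int // aggregate ID -> current version
}

func NewStore() *Store { return &Store{versions: make(map[string]int)} }

// Version returns the current version of an aggregate (0 if unknown).
func (s *Store) Version(id string) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.versions[id]
}

// Append records one event only if the caller has seen the latest version.
func (s *Store) Append(id string, expected int) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.versions[id] != expected {
		return ErrVersionConflict
	}
	s.versions[id] = expected + 1
	return nil
}

// Execute reloads the version and retries on conflict, doubling the
// backoff after every failed attempt.
func Execute(s *Store, id string, attempts int, backoff time.Duration) error {
	for i := 0; i < attempts; i++ {
		expected := s.Version(id) // load the latest state
		// ... business rules would be evaluated against that state here ...
		err := s.Append(id, expected)
		if !errors.Is(err, ErrVersionConflict) {
			return err // nil on success
		}
		time.Sleep(backoff)
		backoff *= 2
	}
	return ErrVersionConflict
}

func main() {
	s := NewStore()
	fmt.Println(s.Append("order-1", 0)) // first writer succeeds
	fmt.Println(s.Append("order-1", 0)) // stale version is rejected
	fmt.Println(Execute(s, "order-1", 3, 10*time.Millisecond))
	fmt.Println(s.Version("order-1"))
}
```

The retry is only safe because the command is re-evaluated against the freshly loaded state on every attempt.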
Another solution is to make commands 100% synchronous and run only one instance of the event store. This would eliminate the concurrency issue completely but comes with a performance penalty. You could improve this situation by creating a worker queue per aggregate-id. That queue would allow commands for different aggregates to run in parallel while commands with the same aggregate-id still run in order.
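A worker queue per aggregate-id could look roughly like the following sketch, using one buffered channel and one goroutine per aggregate. The `Dispatcher` and `Command` types are hypothetical, and the sketch assumes that `Submit` is never called after or concurrently with `Close`.

```go
package main

import (
	"fmt"
	"sync"
)

// Command is a hypothetical unit of work addressed to one aggregate.
type Command struct {
	AggregateID string
	Run         func()
}

// Dispatcher runs commands for different aggregates in parallel, but
// commands for the same aggregate strictly one after another.
type Dispatcher struct {
	mu     sync.Mutex
	queues map[string]chan Command
	wg     sync.WaitGroup
}

func NewDispatcher() *Dispatcher {
	return &Dispatcher{queues: make(map[string]chan Command)}
}

// Submit enqueues a command on the queue of its aggregate and creates the
// queue with its single worker goroutine on first use.
func (d *Dispatcher) Submit(c Command) {
	d.mu.Lock()
	q, ok := d.queues[c.AggregateID]
	if !ok {
		q = make(chan Command, 64)
		d.queues[c.AggregateID] = q
		d.wg.Add(1)
		go func() {
			defer d.wg.Done()
			for cmd := range q { // one worker per aggregate keeps the order
				cmd.Run()
			}
		}()
	}
	d.mu.Unlock()
	q <- c // blocks when the queue of this aggregate is full
}

// Close closes all queues and waits until they are drained.
func (d *Dispatcher) Close() {
	d.mu.Lock()
	for _, q := range d.queues {
		close(q)
	}
	d.mu.Unlock()
	d.wg.Wait()
}

func main() {
	d := NewDispatcher()
	for i := 1; i <= 3; i++ {
		i := i
		d.Submit(Command{AggregateID: "order-1", Run: func() { fmt.Println("order-1 command", i) }})
		d.Submit(Command{AggregateID: "order-2", Run: func() { fmt.Println("order-2 command", i) }})
	}
	d.Close()
}
```

The output of the two orders may interleave, but the commands of each single order are always printed in submission order.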
Order of events
The order of events is business-critical. We have to ensure that concurrent updates won’t result in a broken “Order” state. Imagine the event “OrderCanceled” were processed before “OrderStatusUpdated”. The order would be unintentionally canceled.
NATS guarantees that all messages are delivered at least once and in the order they were published. The ordering guarantee does not hold when messages are redelivered due to an error, or when the client’s ACK gets lost in transit:
- Publish (1,2,3,4,5) | Subscriber ACKs (1,3,4,5) | Error (2)
- Publish (6,7,8) | Subscriber ACKs (6,2,7,8)
In the scenario above, the message with sequence-id “2” is redelivered after message “6” and therefore no longer appears in the expected order.
To ensure that events are redelivered in the same order, we have to rate-limit the subscription to handle only one event at a time. It’s similar to Kafka. In the NATS client, we can enforce strict order with a durable subscription that allows only a single unacknowledged message in flight, uses manual acknowledgement, and sets an ACK timeout of “30sec”. This means the message handler has “30sec” to finish its work before the message is redelivered. The message must be manually acknowledged, and the subscription state is tracked for the client even when the connection is closed.
While we can ensure strict ordering, we still have to deal with at-least-once semantics. The same message can arrive multiple times. We must check whether an event was already processed. In that case, we can acknowledge it as done and ignore it.
For every processed event, we store the “last_processed_event_id” in the database. If the operation crashes or the message ACK cannot be delivered, the event is redelivered.
If “ProcessingEvent()” can’t be executed due to an error, the message is not acknowledged and is processed a second time. This is the flaw of a two-step process, and we are trying to compensate for it with another system that guarantees ACID semantics.
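The de-duplication check can be sketched as follows. The `Projection` type and `Handle` function are hypothetical; the sketch keeps the last processed sequence in memory, whereas a real service would store it in the database, ideally in the same transaction as the projection update. It also relies on the strict ordering described above.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrTransient is a made-up error used to simulate a failing handler.
var ErrTransient = errors.New("transient processing error")

// Projection is a hypothetical consumer that remembers the sequence of the
// last event it processed successfully ("last_processed_event_id").
type Projection struct {
	LastProcessed uint64
}

// Handle reports whether the message may be acknowledged.
func (p *Projection) Handle(seq uint64, process func(seq uint64) error) bool {
	if seq <= p.LastProcessed {
		return true // duplicate: acknowledge it as done and ignore it
	}
	if err := process(seq); err != nil {
		return false // no ACK, so the message will be redelivered
	}
	p.LastProcessed = seq
	return true
}

func main() {
	p := &Projection{}
	failOnce := true
	process := func(seq uint64) error {
		if seq == 2 && failOnce {
			failOnce = false
			return ErrTransient
		}
		fmt.Println("processed event", seq)
		return nil
	}
	// Event 2 fails once, is redelivered, and later arrives as a duplicate.
	for _, seq := range []uint64{1, 2, 2, 2, 3} {
		fmt.Println("seq", seq, "ack:", p.Handle(seq, process))
	}
}
```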
Proxy events to Event Handlers
To distribute events that were previously published to “eventstore.events”, we create a single subscription on “eventstore.events”. This subscription is responsible for republishing all events to the projector/reactor subscriptions. Each subscriber has previously declared its interest in some of the event types. The publishing procedure is asynchronous. We only ensure that events are published to NATS. If an error occurs during the process, the event is redelivered to all subscribers. Every service is responsible for de-duplicating events.
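The routing step can be pictured with this small in-memory sketch. The `Router` type is an assumption for illustration only: in the real setup each projector or reactor would have its own NATS subscription, and plain function calls stand in for republishing here.

```go
package main

import "fmt"

// Event is a hypothetical, simplified event record.
type Event struct {
	Sequence uint64
	Type     string
}

// Handler stands in for a projector or reactor subscription.
type Handler func(Event)

// Router forwards each event to the handlers that declared interest in
// its event type.
type Router struct {
	byType map[string][]Handler
}

func NewRouter() *Router {
	return &Router{byType: make(map[string][]Handler)}
}

// Subscribe registers a handler for one or more event types.
func (r *Router) Subscribe(h Handler, eventTypes ...string) {
	for _, t := range eventTypes {
		r.byType[t] = append(r.byType[t], h)
	}
}

// Publish forwards the event and returns the number of handlers notified.
func (r *Router) Publish(e Event) int {
	handlers := r.byType[e.Type]
	for _, h := range handlers {
		h(e)
	}
	return len(handlers)
}

func main() {
	r := NewRouter()
	r.Subscribe(func(e Event) { fmt.Println("projector got", e.Type) }, "OrderCreated", "OrderCanceled")
	r.Subscribe(func(e Event) { fmt.Println("reactor got", e.Type) }, "OrderCanceled")

	r.Publish(Event{Sequence: 1, Type: "OrderCreated"})
	r.Publish(Event{Sequence: 2, Type: "OrderCanceled"})
}
```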
The great strength of managing an event store (single source of truth) is that projectors and reactors can be rebuilt at any time.
In order to replay events, we only need to start the service, i.e. the subscription to “eventstore.events”, with a different subscription option that defines which events are replayed. In the subscription handler, we call the order projection handler and apply it to each event. To avoid polluting the state of other subscriptions, we need to ensure that a separate NATS connection is created for every subscription. Every service will have a different Client-ID. In that way, NATS can track the state of a client when consuming historical events.
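How events are replayed can be defined in several ways, all of which are supported natively by NATS-Streaming: starting at a given sequence number, starting at a given point in time, starting with the last received message, or even replaying all available messages.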
NATS is scalable in terms of high-availability. We can combine multiple servers into a cluster. With the help of queue-groups, we can load balance traffic across multiple subscribers of the same subscription in NATS. This is only applicable for services where the order of events is negligible, because every member of the queue-group processes one event at a time independently of the other members.
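The trade-off of queue-groups can be illustrated with plain Go channels. This is only an analogy under my own assumptions and does not use the NATS client: every message is handed to exactly one worker, so the work is balanced, but no ordering across workers is guaranteed.

```go
package main

import (
	"fmt"
	"sync"
)

// Distribute hands every message to exactly one of n workers, similar to
// how a queue-group delivers each message to a single member. It returns
// the messages handled by each worker.
func Distribute(messages []int, n int) [][]int {
	ch := make(chan int)
	handled := make([][]int, n)
	var wg sync.WaitGroup
	for w := 0; w < n; w++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			for m := range ch {
				// Each worker only writes to its own slot.
				handled[w] = append(handled[w], m)
			}
		}(w)
	}
	for _, m := range messages {
		ch <- m
	}
	close(ch)
	wg.Wait()
	return handled
}

func main() {
	for w, msgs := range Distribute([]int{1, 2, 3, 4, 5, 6}, 3) {
		fmt.Println("worker", w, "handled", msgs)
	}
}
```

Which worker receives which message differs from run to run, which is exactly why this pattern does not fit order-sensitive projections.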
Event store storage
We modified the NATS-Streaming SQL-Store implementation. The fields like aggregate_id, event_data, event_type, event_version are extracted from the protobuf message payload and added to the SQL transaction. The changes can be seen here. In our application, we will handle the NATS “messages” table as the Event Store. The SQL schema of NATS-Streaming is provided in the repository here.
NATS-Streaming store limits
Using NATS-Streaming as an event store requires a special configuration. It is really important that we remove the “maxMessages” and “maxInactivity” limits of the event store channel. Otherwise, we will lose events over time. If you use the SQL storage, you must disable caching to ensure that events are flushed immediately to the database. For efficiency reasons, you can set specific limits for the channels of your services.
I have never worked with Event-Sourcing before but I’m really excited to learn more about it. You will get in touch with many interesting problems in distributed systems. Here are some other notes you should definitely consider:
- Publishing messages to NATS can fail. Run NATS in a cluster with at least 3 nodes.
- Run a highly available SQL database like CockroachDB.
- There are many NATS client libraries. The code examples can be adapted to almost any language.
- Log everything. A distributed system is hard enough, and lack of context is your death.
- Event-Sourcing embraces CQRS.
- If you have to deal with far more writes than reads, you should rethink your conflict strategy. https://www.continuousimprover.com/2017/02/the-good-of-event-sourcing-conflict.html
- Handle failures gracefully using retries, message re-delivery, locking, and two-phase commits.
With that toolkit in hand, you can build a solid foundation to experiment with Event-Sourcing in your next project.