Event Sourcing is Awesome!

I wanted to share a simple and powerful example of why I think event sourcing is great. In doing so, I realized that we never described event sourcing on this blog or how we use it. I will describe the general concept of event sourcing, how we employ it at Jet, and close with my real-world example that led to my writing this article.

What is Event Sourcing?

Event sourcing is a pattern for documenting the state of your system. As people or machines effect changes on the system, the events that occur are individually appended to an event stream. Events are named in the past tense, with the event name acting as a descriptive identifier of what happened and the payload carrying the details of the event. With these two pieces, it becomes easy to audit the state of the system based on the events that occurred, or on the absence of events.

Let’s take an example of an order that is ready for shipment. Order number 123456789 just received the AwaitingShipment event, meaning that all of the order’s items were packed in a box, scanned and are sitting on a conveyor belt waiting to be shipped. The event is added to the CustomerOrder stream, which includes the order number as an identifier.

Stream – CustomerOrder-123456789
Event Name – AwaitingShipment
Payload –
{
    "scanned_on" : "2016-08-08T20:05:00.0000000Z",
    "scanned_by" : "thor.odinson@jet.com",
    "dock" : "12"
}

From the event, it is possible to know a few things: order number 123456789 is now awaiting shipment, it is sitting on dock 12, and it was scanned by Thor Odinson on August 8 at 8:05 PM UTC. A few hours later, the shipment is scanned and the carrier truck departs. This generates the OrderShipped event.

Stream – CustomerOrder-123456789
Event Name – OrderShipped
Payload –
{
    "fulfillment_center" : "NJ1",
    "carrier" : "fedex",
    "tracking_number" : "abc123",
    "shipped_to" : "123 Main St., Hoboken, NJ, 07030",
    "shipped_on" : "2016-08-08T22:10:00.0000000Z"
}

Now that we know that the order shipped, we can let the customer know that his or her order is on its way. We send an email to the customer and save an event that the email was sent.

Stream – CustomerOrder-123456789
Event Name – ShipmentEmailSent
Payload – 
{
    "order_id" : "123456789",
    "total" : "100.00",
    "customer_email" : "awesome.user@jet.com",
    "subject" : "Your Jet.com order shipped!",
    "message" : "Your order is on its way!!!"
}

We now have a good history of the status of an order that shipped, as well as our communication with the customer. It is also possible to recreate the status of the order as an aggregate of all of the events that occurred on the stream: we read the events in the order they were written and apply each one to the aggregate.

Figure 1 – Example of domain events getting added to the event stream and then aggregated into a business-level view
open System

// The aggregated view of shipment status for the order
type ShipmentStatus = {
    OrderId : string
    IsShipped : bool
    ShippedOn : DateTime
    ShipmentEmailSent : bool
} with
    // the initial (empty) state that events get folded onto
    static member zero = {
        OrderId = null
        IsShipped = false
        ShippedOn = DateTime(1776, 7, 4, 0, 0, 0) // sentinel value for "not yet shipped"
        ShipmentEmailSent = false
    }

The figure above shows how events for a CustomerOrder are written to the CustomerOrder stream. The events contain only the information needed to describe what happened. As the example shows, we only need to know where and when the order was shipped. The aggregate is a view over a set of events. Notice how in this example we built a ShipmentStatus aggregate, and not a CustomerOrder aggregate? The aggregate is just a view into the stream data; it only needs to fit the purpose of the application requesting it.

So, how do we build the aggregate? We use an apply function to create the aggregated view. The apply function takes the current state and a single event and returns the updated state; folding it over the events in the CustomerOrder stream, in order, produces our ShipmentStatus aggregate. This is very similar to how a general ledger works. In a general ledger, financial deposits and withdrawals are summed up from the opening of the account to the most recent entry. New entries can correct earlier ones (e.g. adding another line item for a missing penny, or subtracting an amount to fix an earlier accounting error).
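Before looking at apply, here is a minimal sketch of the record the OrderShipped payload could deserialize into. The field names mirror the JSON keys from the earlier example; the type name and the flat record shape are assumptions for illustration, not the exact production contract.

// hypothetical payload type for the OrderShipped event; field names mirror
// the JSON payload shown earlier in the post
type OrderShipped = {
    fulfillment_center : string
    carrier : string
    tracking_number : string
    shipped_to : string
    shipped_on : DateTime
}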

// our event envelope that we map to when reading from our event store
type DomainEvent = { Name : string ; Payload : byte [] }

// apply a single event to the current state and return the updated state
let apply (state:ShipmentStatus) (event:DomainEvent) =
    // deserialize the payload based on the name of the event
    match event.Name with
    | "AwaitingShipment" ->
        state
    | "OrderShipped" ->
        // deserializeFromBytes is our helper for deserializing event payloads
        let os = deserializeFromBytes<OrderShipped> event.Payload
        { state with IsShipped = true; ShippedOn = os.shipped_on }
    | "ShipmentEmailSent" ->
        { state with ShipmentEmailSent = true }
    | _ -> state // ignore any other events

// read the stream for the order and fold the events into a ShipmentStatus
let getShippingStatus orderId = async {
    let stream = "CustomerOrder-" + orderId
    let! events = readStreamFromEventStore stream
    let initial = { ShipmentStatus.zero with OrderId = orderId }
    let orderStatus = Seq.fold apply initial events
    return orderStatus
}

To get the ShipmentStatus for an order, we read the event stream from the event store, which returns a sequence of events. We then fold over each event, starting with an initial (zero) ShipmentStatus object. The apply function takes the event, applies the details of the event to the ShipmentStatus state object, and returns the updated state. The result of the fold function is the final ShipmentStatus object.
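As a quick usage sketch (with readStreamFromEventStore and deserializeFromBytes standing in for the real event store client calls), building and printing the view for the example order looks like this:

// build the view for an order and print a short summary
let status = getShippingStatus "123456789" |> Async.RunSynchronously
printfn "Order %s shipped: %b on %O (email sent: %b)"
    status.OrderId status.IsShipped status.ShippedOn status.ShipmentEmailSent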

Event sourcing gives us a detailed history of the actions that occurred on our systems. The current state of a stream can be rebuilt at any time by folding an apply function over its events. From an auditing perspective it is very powerful, since every transaction carries its detailed history. Another benefit of event sourcing is that we can replay the events over a period of time to recreate the state of the system up to that point, which means we can reproduce the conditions that led to a bug. That is very helpful when tracking down errors in distributed systems.
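As a small sketch of that kind of replay, reusing the apply function and zero state from above, we can rebuild the view from only the first n events of the stream:

// rebuild the ShipmentStatus view from only the first n events in the stream
let replayUpTo n orderId = async {
    let! events = readStreamFromEventStore ("CustomerOrder-" + orderId)
    let initial = { ShipmentStatus.zero with OrderId = orderId }
    return Seq.fold apply initial (Seq.truncate n events)
}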

How We Use It

At Jet, we use event sourcing as our main log of truth for historical transactions. Most teams use Event Store to store the events that their systems generate. The Event Store log is append-only: events written to the log are immutable, and changes to a stream are made by appending new events that describe the stream's updated state.

Figure 2 – CustomerOrder stream details in Event Store

Events are written to streams, which are collections of correlated events. We think of a stream as a specific domain type (e.g. Customer Order), and the events that are applied to the stream are the Domain Events. The stream name has the syntax StreamName-{Id}, where in this case the Id is the ID of the customer order. The hyphen in the stream name is a special character that Event Store uses to split stream names for its projections; I’ll discuss projections later in this post. In Figure 2, above, the CustomerOrder stream for order Id 15150a…0369c has 17 events that describe the state of the order. Clicking on the JSON for an event would show the actual event details. As you can see, we try to be clear and explicit with our event names; it makes the streams easier to read and understand.

Projections

Projections are a feature of Event Store that generate new streams based on existing streams. There are two projections that we use frequently at Jet: the category projection and the event topic projection. The category projection is shown below in Figure 3. It is a projection of all of the events that occur for all streams in a given category. In the example, all events in the CustomerOrder stream are projected onto the $ce-CustomerOrder stream. This is useful when we need to know all of the events happening across all customer orders.

Figure 3 – CustomerOrder category stream in Event Store

The event topic projection is like a filter on the category projection. It is a projection of all events of a specific type found in a category stream. In Figure 4 below, the OrderCreated event is projected onto the $et-OrderCreated stream. This type of stream is useful when we want to know when a certain event is added to the log. Consider the AwaitingShipment event in the earlier example. We might want to create a microservice that subscribes to the $et-AwaitingShipment stream and triggers the “Ship package to customer” workflow.

Figure 4 – Event Topic stream in Event Store, with OrderCreated events projected onto the $et-OrderCreated stream
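As a rough sketch of such a subscriber (our production services go through an internal F# wrapper, so the raw client calls and the shipPackageToCustomer handler here are illustrative assumptions), a microservice reacting to the AwaitingShipment topic projection might look like this:

open EventStore.ClientAPI

// subscribe to the $et-AwaitingShipment topic projection and hand each event
// to a (hypothetical) workflow trigger
let subscribeToAwaitingShipment (conn: IEventStoreConnection) (shipPackageToCustomer: byte [] -> unit) =
    conn.SubscribeToStreamAsync(
        "$et-AwaitingShipment",
        true, // resolveLinkTos: follow the projection's link events back to the CustomerOrder stream
        (fun _ resolved -> shipPackageToCustomer resolved.Event.Data),
        (fun _ reason ex -> printfn "subscription dropped: %A - %O" reason ex))
    |> Async.AwaitTask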

Checkpoints

Our microservices can subscribe to these projection streams and react to the events appended to them. Sometimes a microservice fails, or the VM it lives on is rebooted for maintenance. To avoid losing our place in the stream, we write the number of the last event we read and processed to a checkpoint stream once the full decode -> handle -> interpret pipeline completes for that event.

The checkpoint stream is just the position of the last read event. If you look at Figure 4 above, you’ll see the first column contains the event number. That is the number that is tracked and written to the checkpoint stream for the microservice.

When a microservice subscribes to a stream using checkpoints, it creates a new checkpoint stream for itself. The checkpoint stream is explicitly managed by our client as the service reads and processes events from the stream. The checkpoint streams also follow a convention that we set, where they are named according to the stream name and the microservice name: StreamName_MicroserviceName_checkpoints. Consider a microservice named EmailOnOrderCreated that listens to the $et-OrderCreated stream. The microservice would subscribe to the stream starting with the last position found in the etOrderCreated_EmailOnOrderCreated_checkpoints stream.

Stream – etOrderCreated_EmailOnOrderCreated_checkpoints
Event Name – checkpoint
Payload – { "pos" : 90170 }
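Here is a minimal sketch of how a service might manage its checkpoint. The readLastEvent and appendEvent helpers are hypothetical stand-ins for calls through our event store client, and the name normalization simply mirrors the convention above.

// hypothetical checkpoint payload plus helpers for reading and writing checkpoints
type Checkpoint = { pos : int64 }

// "$et-OrderCreated" + "EmailOnOrderCreated" -> "etOrderCreated_EmailOnOrderCreated_checkpoints"
let checkpointStream (sourceStream: string) serviceName =
    let normalized = sourceStream.Replace("$", "").Replace("-", "")
    sprintf "%s_%s_checkpoints" normalized serviceName

// read the last checkpoint, or start from position 0 if the service has never run
// (readLastEvent is a hypothetical wrapper over the event store client)
let startPosition sourceStream serviceName = async {
    let! last = readLastEvent (checkpointStream sourceStream serviceName)
    match last with
    | Some event -> return (deserializeFromBytes<Checkpoint> event.Payload).pos
    | None -> return 0L
}

// after a successful decode -> handle -> interpret pass, record the new position
// (appendEvent is a hypothetical wrapper over the event store client)
let writeCheckpoint sourceStream serviceName pos =
    appendEvent (checkpointStream sourceStream serviceName) "checkpoint" { pos = pos }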

Idempotent Writes

We are very conscientious at Jet about making sure our microservices are idempotent: for a given input, a microservice produces the same output, and any side-effects happen only once. For instance, it would be very confusing for a customer if we sent a confirmation email for an order that was previously fulfilled. Event Store provides optimistic concurrency control features that give us one level of idempotency.

When appending the first event in a new stream, we can set the expected version for the write in the client as ExpectedVersion.NoStream. If the stream already exists, the write fails with a WrongExpectedVersionException, which tells us the event was already written. We could log that the event already occurred, or simply continue processing the next set of commands. We use this when a stream is always created with the same event type, e.g. OrderCreated in stream CustomerOrder-{Id}.
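A sketch of that first-write guard, using the Event Store .NET client types (the connection setup and the createdEvent value are assumed):

open EventStore.ClientAPI
open EventStore.ClientAPI.Exceptions

// append the OrderCreated event only if the stream does not exist yet
let createOrderStream (conn: IEventStoreConnection) orderId (createdEvent: EventData) = async {
    try
        do! conn.AppendToStreamAsync("CustomerOrder-" + orderId, ExpectedVersion.NoStream, createdEvent)
            |> Async.AwaitTask
            |> Async.Ignore
    with :? WrongExpectedVersionException ->
        // the stream (and its OrderCreated event) already exists, so skip the write
        printfn "CustomerOrder-%s already created, skipping" orderId
}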

Another variant uses the stream's event number for the concurrency check. We pass the last event number we have seen as the expected version; if the stream's actual last event number differs, because another writer got there in the meantime, Event Store throws a WrongExpectedVersionException. We might use this approach when multiple instances of a microservice are all trying to write to the same stream.
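A quick sketch of that check (again treating the connection and the event construction as assumptions): we keep the last event number we read from the stream and pass it as the expected version when we append.

// append newEvent only if the stream's last event number still matches what we read;
// the expected-version parameter is an int in older client versions and an int64 in newer ones
let appendWithConcurrencyCheck (conn: IEventStoreConnection) stream (lastEventNumber: int64) (newEvent: EventData) =
    conn.AppendToStreamAsync(stream, lastEventNumber, newEvent)
    |> Async.AwaitTask // throws WrongExpectedVersionException if another writer got there first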

The last Event Store feature we use to ensure idempotency is the EventData object itself. The EventData object is what actually gets appended to the stream; it carries an event ID along with the event's name and payload. When appending the EventData object to a stream, Event Store checks the EventId field. If the event ID matches a previously written event ID, Event Store will simply ignore the write request and return the next expected write position in the stream. We just have to make sure that the ExpectedVersion is set to something other than ExpectedVersion.Any, as that flag disables all optimistic concurrency checks in Event Store.

// a simplified view of the fields on the client's EventData object
type EventData = {
    EventId  : Guid        // unique identifier of the event
    Type     : string      // the name of the event (e.g. OrderCreated)
    IsJson   : bool        // true if the data is a json-serialized object
    Data     : byte []     // byte array of the event payload
    MetaData : byte []     // byte array of any extra metadata for the event
}

Teams at Jet typically use a message queue, like Kafka, to communicate between microservices. Kafka guarantees at-least-once delivery of messages, which means that a microservice can receive the same message one or more times. If the GUID of the command from the message queue is used as the EventId that we write to Event Store, then the microservice is guarded against duplicate writes for the same command. This also works well when replaying commands from the message queue; however, it only works if the commands that created the events are still present in the queue. Our Kafka instance stores commands for a few days. If we had to recreate the commands, they would likely get new unique identifiers, which would in turn create new events on the event stream.
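A sketch of that pattern, assuming the command we pull off the queue carries its own GUID (the Command type and toEventData helper are illustrative, not our production contracts), reusing the EventData shape shown above:

open System

// a command as it might arrive from the queue, carrying its own unique id
type Command = { CommandId : Guid ; Name : string ; Body : byte [] }

// reuse the command's GUID as the EventId so that replays of the same command
// are deduplicated by Event Store instead of producing duplicate events
let toEventData (cmd: Command) = {
    EventId  = cmd.CommandId
    Type     = cmd.Name
    IsJson   = true
    Data     = cmd.Body
    MetaData = [||]
}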

The final strategy is to check against the domain state. Basically, we read the event stream and get the aggregate for the stream. If the event is already applied to the aggregate, then we ignore the command and any side-effects it might create. Reading the stream and getting the latest version of the aggregate can be slow. In cases where it is critical to know the exact state of the stream, we would incur the time it takes to read the stream and generate the aggregate. Otherwise, we can read from a cached state that is stored in ElasticSearch, DocumentDB, or even Redis. And, yes, we would create a separate microservice whose sole purpose is to keep the cache up-to-date.
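A sketch of the domain-state check, reusing the ShipmentStatus aggregate from earlier; sendShipmentEmail and appendShipmentEmailSentEvent are hypothetical stand-ins for the side-effect and the event write that should record it:

// only send the shipment email if the aggregate shows it has not been sent yet
let handleSendShipmentEmail orderId = async {
    let! status = getShippingStatus orderId
    if status.ShipmentEmailSent then
        printfn "shipment email already sent for order %s, skipping" orderId
    else
        do! sendShipmentEmail orderId            // hypothetical side-effect
        do! appendShipmentEmailSentEvent orderId // hypothetical event append
}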

Real-World Story

We are finally at the point where I can share my simple story! Most teams create a snapshot, or projection, of their event store database into a SQL database. It is akin to making a View or ViewModel of the system state. We do that to make it easy to run business queries against our data, as well as to connect with third-party data tools. I was on-call when one of the projection databases for our team showed a very noticeable inconsistency with the related production event store database. We found that the microservice that updates the projection had failed to write to the database and had given up after a fixed number of retry attempts. We also didn’t have an alert on the failure, so it went unnoticed.

When it was pointed out to me that the database was last known to be in sync 8 days prior, my heart sank. When was the last time you had a SQL database with 8 days of mismatched data that was easy to fix? This is where event sourcing and our microservices came to the rescue!

Amazing thing #1: Since we have a culture of microservices, there was only one place I had to go to disable writes to the projected database: the projection microservice.

Amazing thing #2: Azure SQL premium servers can be restored from any point in time for up to 35 days! I created a restore from the last known good point in time, which created a new database for me in Azure.

Amazing thing #3: Once the new database was available, I was able to look into the event log and quickly correlate which event was the last event that was projected out to the database.

Amazing thing #4: I was able to set the projection microservice’s checkpoint to the event number of the last known projected event. Then all I had to do was re-enable the projection microservice, pointing all writes to the restored database.

Amazing thing #5: Another cultural edict is that we write idempotent microservices. If an action was already taken, then we don’t repeat it. Even if I set the checkpoint too far back, I was confident that only new data would be written to the projection database.

The outcome: In a matter of 20 minutes the projection microservice read the event log, processing a little over 50k events. At the end of those 20 minutes, both production values and the projection database were in perfect harmony. Amazing.

Many thanks to Jeremy Kimball, Scott Havens, Krishna Vagapandu, Troy Kershaw, Skanda Mohan and all of the other amazing Jet engineers that introduced me to event sourcing and how we employ it at Jet.

24 comments

  1. Hi,

    Thank you very much for this. It helps me wrap my head around ES, and seriously, your explanation is the BEST ever.

    Thank you

  2. Question:
    You mention that GES allows different microservices to subscribe to the same stream or a projection (which is a partition and/or aggregation of events). I’m assuming that these microservices are all closely related to each other. So in many ways GES can function as a message bus too it seems (especially with the scavenge function).

    In another blog post jet talks about using kafka for communicating between microservices (https://tech.jet.com/blog/2016/08-17-visual-learning-kafka-problem-solves-got/).

    I would like to know how/why you choose to use kafka for some things and ES for others (other than for long term persistence available in GES), etc.

    1. karl, to quickly answer your question, we are investigating the use of kafka for inter-service messaging. In this way we are able to separate our concerns between our internal database structures (event store), and the contracts that we expose (kafka) to other teams.

      As for the historical reasoning, we initially built our systems with the concepts that I outlined in the article above. That’s why I talk about consuming the streams as if they were a message queue. However, we find that as we grow as a company, we need to handle more consumers of our streams and we want to better handle failure, which includes running Event Store in a cluster. Since we don’t run projections in a clustered configuration, we can’t get the operational safety that we want. Kafka is a distributed service by nature, and it can handle multiple producers and consumers effortlessly. For that reason, we are interested in the idea of publishing certain events to Kafka for intra-service and inter-service communication. And when I say intra-, I mean internal to the team/project vs. inter-, which I consider to be across teams or across projects.

      We also have an interest in Kafka for the way that we think of microservices. Some teams use Kafka to send commands to their microservices to perform a function. The side effect of that function is logged in event store, and the output might be another command to “do the next thing”. So instead of reacting to an event, the service can react to a command.

      I hope that explains why we’re writing about kafka and event store.

      1. Thanks for the great article. Interesting how you combine the use of Event Store and Kafka, taking advantage of their respective different capabilities.

  3. Thanks for an awesome blog post! Really in depth and easy to digest and great to hear about your real world success stories.

  4. How do you map streams (name+id) to kafka topics?
    * one on one?
    * or stream name = kafka topic name, and id = kafka key?
    * or … ?

    1. We treat Kafka a little more like an event store projection. The topics store specific events, or group of events, that we are interested in publishing both intra-service (internal project) and inter-service (between teams). So we typically name our topics “teamname”-“eventname”.

      If we wanted to project the “OrderShipped” event to a topic, we’d name it “gambit-order-shipped”. If we wanted to project a category projection, it would be “gambit-ce-customer-order”.

      For single event projections, we can use the order id as the key, which allows kafka to distribute the events across the partitions.

  5. Very interesting blog post. You mention, “Notice how in this example we built a ShipmentStatus aggregate, and not a CustomerOrder aggregate?” What if you do need to view the details of a customer order, including order, shipping, and payment details, involving for example, 50-100 properties that have been updated over various events. Is creating an aggregate of stream data still appropriate in that case?

  6. I’m curious in what domain you use event sourcing. Is this used for the shopping cart as well?

    I haven’t been able to understand how the eventually consistent nature of message -> processing -> update a view or SQL table works in contexts where you need to serve up a page in real time that reflects the action the customer just took.

    Thanks for a great post!

      We use event sourcing as our log of truth for everything at Jet, and I mean EVERYTHING. So, if we have a cart, it has an event stream; if we have an order process, it has an event stream; if we have a pricing engine, it has an event stream. The aggregates from these streams are the up-to-date representations of what happened in the system. We use the SQL projections for things like analytics or to help the business folks with their processes and workflows.

      Most of our event processes do happen asynchronously. That does raise issues like you said where an action’s results can not be immediately displayed on the screen. In those cases, we have to figure out what is best and acceptable to display. As for the cart, we create a cart stream when you start adding items to the cart. It has item added and item removed events in the stream. The aggregate is what we display to the user. So, in effect, it is the most up-to-date version of the cart, since it is what we stored in the event database.

      When a user checks out and submits their payment information, the payment authorization has to happen synchronously. We wait for the authorization and write the results to the stream. From there, the order is picked up by the order management team and then fulfilled by the fulfillment team.

  7. Great write up!
    Thank you for sharing with us.

    I didn’t know Event Store has checkpoint streams. That is a great feature. It would make my life easier if I went the Event Store way.

    1. Hey Zeljko, we wrote our own F# client wrapper for the .NET Event Store client API. In that wrapper, we wrote a function that lets us subscribe to a stream as an observable. When we set up that subscription, we create the checkpoint stream as per the convention I described in the article.

      If the stream exists, we get the last event on the stream to extract the last read event for that microservice. If not, we start from position 0. Also, with checkpoints, we only increment the checkpoint on a successful read from the event stream. That is done at the end of a decode -> handle -> interpret process.

      Checkpoints are not a feature that is offered by event store out of the box, but it is fairly easy to extend the client to add them into your workflow.

      1. Thanks for explanation.
        When I started with event sourcing Event Store was mentioned on some groups and blogs as, well, event store but I didn’t understand benefits of using it.
        Your blog post cleared that up for me.

  8. How many instances of a projection microservice do you have writing to the SQL database? How do you manage multiple instances writing their aggregate to the database?

    1. Victor, we typically have one microservice writing to a single projection node. For the project I’m working on right now, we project updates on a stream’s category event ($ce-Task) to our DocumentDB database. For the same project, we’re currently building up the reporting database, and for that we need to read from multiple streams (both category and event type projection streams). In this case, we’ll still use a single service, but we read from the multiple streams in the same service and merge them using reactive extensions. So in the end, we still have a single service writing to the projection node and we don’t have to manage updates from disparate services.

      1. From where do you display the data that you show the user? How do you choose between a traditional database and your event store directly? And I’d also like to know if you have a single Event Store for all of Jet or do you have each team/project have their own event store? Are there any trade-offs?

        1. Each team manages their own event stores. It’s mostly one event store instance or cluster per system. I would advise separating your event stores across teams or projects just as you would a SQL database; this way you can separate your team’s concerns from other teams’. At Jet, all systems do event sourcing, and we traditionally use Event Store to store those events. We might use other technology, like SQL, Azure Table Storage, or DocumentDB, for non-critical scenarios. For instance, we might want to display reference data on the UI of an application, which fits better in something like SQL. Also, we might want to display a cache of what is in Event Store, for which we would project our events as views through DocumentDB.

          For things where we need to know the absolute current state of an object, or affect the current state of an object, that is where we will read and write to event store. On our main site, the cart will be a stream that needs to be in Event Store, but the product details can be pulled from a cache for faster load times.

          1. These are very good explanations and use cases for event sourcing. It would have been nice to see that sort of detail in the article or perhaps a future article. That being said, is there any case in which teams would share an event store, or is that something you would not allow? I would assume that if teams do need to share data, you do it via the service apis and not the event stores.

            Are there going to be more articles using event sourcing and microservices on azure from you guys? I love these topics and you guys seem to be doing pretty well with this pattern.

  9. I think that another article explaining how we design our systems would better answer your questions around event store usage. To answer your immediate question, we do not share our event stores across teams. Again, it is one event store service per project. We have, in the past, allowed external consumers (i.e. different teams) to read from the projection nodes of a service. However, we are moving to using Kafka for inter-service notifications and for replicating our projection streams. That is a fairly new pattern within the company and we are still experimenting with it.

    1. You know what I just realized? What I really want to know is do you do CQRS/ES and how do you shard your reads?
