What is Jet’s product catalog?
At Jet, we aggregate product data from many different sources. Typically these sources include 3rd party merchant catalogs (provided by direct merchants via a public API that Jet exposes to the world) and streams of data collected by ETL-like processes. All of this data is routed into Jet’s matching engine, which performs a process called data fusion: matching pieces of data that we believe describe the same product and merging them into single entities called SKUs.
Every SKU is a stream
There are many ways to model the catalog, and different marketplaces do this in a variety of ways. The approach we use is called event sourcing, where every SKU is represented by an ordered sequence of immutable events with every next event in the sequence carrying a “delta” – a change in the SKU’s state.
If you want to understand the state of a given SKU at a given point in time, you have to:
- Get all the events for this stream.
- Make sure the events are ordered by the time they were appended to the stream.
- Fold events into one object:
- We start with an empty SKU called Sku.Empty.
- We have a folding function that knows how to apply an event to an existing SKU. The result is the modified SKU:
let apply: Sku -> Event -> Sku
- The actual folding looks very simple:
let aggegateEvents: Event seq -> Sku = Seq.fold apply Sku.Empty
Where do we store the event log?
In the very beginning we used a single technology, EventStore, both for storing events and for passing them as messages between services. Life was good. Then, as Jet grew, we reached a point where scalability and reliability became a huge concern for us, and that put additional requirements on the underlying technologies that we used. These included clustering, geo-replication, and support for rapidly growing data volumes, all with the assumption that everything had to work nicely in a cloud environment where any part of your infrastructure can temporarily disappear at any second. Ultimately, we ended up with a mix of technologies that gave us some of the things we needed, although there is still a lot of work to do.
In the interest of brevity, we will simplify things a little in this blog post. We will assume that the technology we use for storing events supports three operations:
- Get all events for a stream (by stream Id, for example: SKU id).
- Append an event to a stream (example: SKU update).
- Support optimistic concurrency:
- All streams have a version associated with them.
- Every time a stream is updated (and the only way to change a stream is to append events to it), the version is changed.
- The append operation accepts expected version as an argument. The operation succeeds only if the current version of the stream equals the expected version.
Here is an API we will assume our log storage technology is giving us:
let getEvents: StreamId -> Async<Event seq * Version>
let appendEvent: StreamId -> Event -> Version -> Async<bool>
We need to design a framework that will allows us to write events concurrently from all microservices that touch the catalog (or any other part of the platform, for that matter):
- In a uniform way: When you have a few dozen microservices, uniformity becomes a feature that you cannot ignore. It’s a very simple thing that can make or break you, especially in a world where constant refactoring is something many of us take for granted.
- In a safe way: Every time some service updates the catalog, we want to make sure data remains consistent. This is where enforcing optimistic concurrency becomes important.
- In a concise way: If you have to repeat a single line of code in 50 different services, it’s 50 lines of code right there. In the long run the size of your codebase is the ultimate factor that defines how much you can do with it.
Catalog service as a function (with benefits)
Every service has a different set of inputs that trigger business logic, and generally speaking, you cannot assume your inputs having any particular structure. There are two assumptions that you can make, though:
- Every input has a reference to a SKU that has to be affected by this input, which means we can request the id for that SKU:
let getSkuId: 'Input -> SkuId
- Every service needs the current state of the SKU in order to make a decision if an event has to be created or not (i.e. if the event actually causes something to change on the SKU). In its most general form, every business function in every service can be encapsulated by the following function:
let processInput: 'Input -> Sku -> Async<Event option>
The service takes a data input and corresponding SKU as parameters and potentially produces an event that has to be appended to the SKU’s stream if needed.
Two important properties that we heavily rely on that every catalog function has to provide are:
- Idempotence: Every catalog function has to produce the same result for the same input regardless of whether it’s applied one or multiple times.
- No side effects: Catalog functions have no long-range side effects. For example, it’s typically okay to write something to the log but not okay to send an email.
What about concurrency?
The approach described above can unify the way your services apply changes to the SKU and remove boilerplate code needed to handle input and output. What it doesn’t give you is protection against a situation where multiple processes/threads/handlers try to update the same SKU at the same time:
- Thread 1 starts processing command A for SKU 1
- Thread 2 starts processing command B for SKU 1
- Thread 1 loads SKU aggregate from Log store
- Thread 2 loads SKU aggregate from Log Store
- Based on SKU aggregate, Thread 1 decodes to append event E1
- Based on SKU aggregate, Thread 2 decodes to append event E2
=> Both events are appended to the SKU’s stream.
Such behavior could lead to data loss or data corruption. An example would be a service that checks if a SKU has a coupon applied to it, and if it does not, the service applies the coupon by appending a special event to the stream. Without any protection in place we could apply the same coupon twice and probably lose money as a result.
Typical approaches to handle this issue include:
- Locking-based mechanisms
- Pessimistic: Before loading the SKU aggregate, every thread will have to acquire a lock, and it is released after the operation is completed.
- Optimistic: Operate under the assumption that you can update a SKU, but before updating there is a check to see if it was modified by someone else since the time you loaded it. If it was modified, the update fails.
- Software transactional memory (STM): In-memory mechanisms are inspired by database transactions where a process logs all read and write operations, and if it detects any conflicts, it re-executes them.
Software transactional memory of a distributed and functional flavor
We would like to end up with a solution that combines properties from both approaches:
- From STM: We want to be able to automatically rerun any transaction that was cancelled due to a collision. Typical STM systems do that only for in-memory transactions, which means that they cannot be used in a distributed system.
- From locking: We can easily use optimistic concurrency in a distributed system as long as there is a special process/store that reliably handles SKU versions.
This is where we are getting huge benefits from the properties of service functions we postulated above:
- No side effects: This makes implementation of STM trivial because we do not need to keep a log of all read/write operation, and the output of such a function is solely defined by its input.
- Idempotance: It is safe to rerun any service function as many times as needed if a concurrency collision is detected.
Let’s wrap concurrency checking and retry functionality into a stand-alone function using the Log Store API we described above.
One helper function that we will need is the one that extracts SkuId from the command:
let getSkuId (cmd: CheckSkuAssortment) = cmd.skuId
Now it’s time to write a function that will contain all of the business functionality that the new catalog service needs:
This implementation is unaware of concurrency and requires you to load the SKU first, so let’s combine it with our optimistic concurrency/STM blend:
let checkAssortment = changeSku getSkuId checkAssortment
Now the checkAssortment function has the following signature:
let checkAssortment: CheckSkuAssortment -> Async<unit>
This mean that it supports out-of-the-box:
- Event loading/aggregation/saving
- Collision handling
- Retry functionality
Where to go next
Now that we have a function all changes go through, it is very easy to add logic that is supposed to affect all catalog services. Some examples include:
- Unified validation: You can consider putting general business rules all services have to follow into changeSku function. One thing to keep in mind is that such an approach might not be scalable if you have too many rules. It is also worth mentioning that rules have to be truly general and devoid of microservice-specific logic.
- Unified versioning: If all modification requests go through one function, we can easily add object versioning. In the simplest case, the SKU version could be the number of events it has.
- Caching: Once you have versioning, it’s easy to add proper caching. Simply, ask for a cache first with a SKU snapshot. If it’s not there, go to the (much slower) log store. If a SKU is modified, save the latest snapshot to the cache so that subsequent calls can get it.