Originally published Feb 22, 2017 on github.io.
The product matching and taxonomy engine at Jet.com is a large event-driven distributed system consisting of nearly 400 microservices that process a half million messages per minute on a quiet Sunday. Clustered linear data stores, such as Kafka and EventStore, are used to retain and distribute many of these messages.
For auditing and diagnostic purposes, finding an individual message in these datastores could entail scanning a huge number of events in linear time, algorithmically on the order of
O(n). In practice, this could take days.
Although recent versions of these technologies maintain both timestamps and sequence numbers, we may want to search based on the value of monotonically increasing fields inside the event’s Json document. Any field containing a date, incremental ID or value other than the message commit date and sequence number. This is often possible because of ordering guarantees among upstream microservices.
The goal is to locate the precise offset (and partition in the case of Kafka) of messages at the lower bound of the desired date time, or ID. And, this needs to be done quickly without drawing down and processing the vast number of messages in the data store.
Any solution should support technical PMs who need to quickly locate such a message. There may be an urgent need to trace down the contents of an event that could have been transmitted during a large window two or three days earlier. It needs to be able to accept different path specifiers to identify the search field for different kinds of documents and be able to query arbitrary topics and streams. It’s possible that not all messages in a stream even contain the desired search field, so it must operate within an environment of partial information.
Thus, the solution is to be friendly, fast, and flexible.
We assume that the target search field’s contents are monotonically increasing. Imagine the following sample from a stream of data:
Note that the
target field value is trending upward but not necessarily one-to-one with the offset value. It may be missing
, and it may appear several times
Binary Search Algorithm
Fortunately, this problem looks like a slight variation of the childhood number guessing game. In that game, you ask a child to guess what number you’re thinking of between
100. When they say “4”, you respond, “Nope, it’s bigger than 4!” An astute child will then guess some number between
100 to earn a cookie.
In introductory computer science, the binary search algorithm solves that game in logarithmic time, or
If you’re looking for a message with
"target": 296971532, you might start by probing offset
31418. Seeing that
296967168 is too low, you might then look at offset
31420 and then
31421 to find the lower bound of
However, if you probe partition
31419 above for the
target field, you will be unable to extract the
target value. Thankfully, most clients for Kafka and EventStore are designed to efficiently stream subsequent events. In this case, the algorithm needs to be modified to also consider
target values later in the stream including offsets
31420 and above. It should also be able to handle situations where the tail of the event stream entirely excludes this
To expand on this, consider also the rate of elimination and the time that it takes to probe an offset:
For your first guess, you may be able to eliminate half of the entire data store, say, a billion events. On your second guess, you may even eliminate an additional quarter: five hundred million. If it takes a second to complete each of those probes, then your information gain will definitely exceed your cost.
Within the next 20 probes, you could potentially narrow the range of possibilities to 2000 events at a cost of, say, 20 seconds.
To precisely locate the lower bound could cost another 11 seconds. However, depending on the batch size of your driver, you may already be streaming 500 events at a time. Simply iterating all remaining 2000 events could be completed in far less time.
As a result, you want an algorithm with both seek and scanning capability, transitioning from seek to scanning at some point in the search. In practice, this threshold is around 3-5000 messages depending on the size of the cluster and the availability of the data.
Since Jet.com uses F# as its predominant language, there are a wide variety of options available to query both EventStore and Kafka as well as to operate on Json formatted messages. Designing sophisticated user interfaces is also quite easy with F# and Xaml.
Querying a Json document using JsonPath makes it easy for an end user to specify different
target fields depending on their needs.
A selection of clients exists for streaming data:
- EventStore’s official .NET Client
- Kafka support from a number of clients, including:
In this application, I used the Nata.IO library, which provides a common abstraction over stable versions of the above clients. It includes a consistent mechanism to query the range of available offsets, as well as to stream from arbitrary positions in the target event log.
To build a modern WPF user interface in F#, I also used the FsXaml library. Although the documentation is sparse, it is a powerful tool with nearly the same flexibility and capability as the well-known C# tooling in Visual Studio. In fact, it supports the same interactive visual designer for Xaml.
A video screencast of the final interface:
Many of these features work together to solve the problem outlined above. These include:
- Selecting either Kafka or EventStore with custom host name, and topic or stream.
- Targeting a
Numericvalue from a field that increases over time:
- Custom JsonPath queries such as
$.envelope.timestampto extract this field.
- Custom transition threshold from
- Custom JsonPath queries such as
Full controls to Start, Stop and Resume multiple searches simultaneously.
- Visibility into the complete search state and progress.
- Copy/Paste support into Excel for subsequent analysis.
The actual application source is broadly separated into two areas, the Algorithm and the User Interface:
The search algorithm is initialized with connection information to query the
indexOf bounds within the topic or stream, and
readFrom arbitrary positions within those bounds. When asked to execute it performs the search and emits periodic updates to the search state.
Extraction of the
target is performed using a codec that transforms a raw byte stream into a stream of the desired Json field.
This algorithm started its life as a simple F#
- User Interface
Originally created using C#, the FsXaml UI retains all of the original design except for some unneeded code-behind (e.g.
The primary difference in F# is the use of type providers to generate type information from the XAML to use in subsequent F# code.
Once assembled, the application is wired up and compiled as an F# Windows Application.
This is paired with a user control to query the latest version and provide an unobtrusive notification in the event of a new release.
The resulting application demonstrates how a simple and clean F# stream-processing algorithm can be exposed with a complete and modern user interface. It pulls together technology for EventStore and Kafka, query capability with JsonPath, view layout with
FsXaml type providers, and a host of other elements designed to make this kind of application easy.
It performs extremely well. In scenarios, such as the screencast above, it can locate the lower bound of a timestamp among four million messages in less than three seconds. Moreover, the logarithmic nature of the algorithm lets us scale to many, many times that amount of data with a few extra seek operations. For example, a billion events requires only 8 extra seeks (typically on the order of a few milliseconds each).
To play with the source code or download a copy, visit me on GitHub:
Note that while I’ve used this tool at Jet, the company makes no warranty with respect to its use in your own environment. It’s open source. Review and edit to meet your own unique needs! And have fun!
If you’d like to learn more about technology careers at Jet.com, visit https://tech.jet.com/. ☺