Hello! This is a series of posts about Aecor — a library for building eventsourced applications in Scala in purely functional way. If this is the first time you see this series, I highly recommend to start with Introduction.
In previous post we finally deployed our eventsourced behavior on a cluster. Now we can issue booking commands, but there’s still nothing that our end users can query.
So today we’re going to build an eventsourced view with the help of Aecor and discuss CQRS in the context of eventsourcing.
I decided to split this post in 2 pieces. Otherwise it would become too huge. In Part 4b we’ll look at real world problems you face when running projections as well as best practices and experience, collected over the years at Evotor.
I hope you’re familiar with the value proposition of CQRS. Before we start to write code, let’s understand why CQRS is especially valuable in eventsourcing and take a closer look at underlying reasoning and mechanics.
In eventsourcing your write-side is the event log. Since it’s an append-only log, it can potentially provide extremely high throughput for incoming events. On the other hand, it would be awkward and inconvenient to query and fold events from the log every time you need current entity state. So, obviously, journal is an awfully poor fit for the read side backend.
Thanks to single writer we discussed in Part 2, events are strictly ordered (at least within each particular entity). It’s common sense: unless you’re dealing with something special (like CRDTs), you can’t build properly working behaviors without events being ordered.
So we can view log as a bunch of frozen event streams (one per each entity). Given that events are constantly appended to the log, these streams are effectively infinite.
Now we come to the notion of projection, which is nothing more than an arbitrary fold of an entity event stream. An example of such fold is a view: a representation of the entity state which is optimized for querying. This view is going to be the read-side of our CQRS setup.
We shouldn’t limit projections to views though. When I say projection is an arbitrary fold, it’s what I mean. Our log is the source of truth which stores facts happened in our system. Using projections, you can derive secondary facts in any format you want. For example, you can project your events into an ElasticSearch index, while still building a regular view in parallel.
Most of the time projections fold into a database table of some sort. This is no coincidence, because streams and tables are closely related. This duality is just too cool to not be mentioned, so I couldn’t miss the opportunity.
I personally love the idea of using events as the source of truth and use streaming to derive any secondary information in any format you want. Confluent, the maintainers of Kafka, have done an amazing job popularizing and explaining this paradigm to the community. I’ll just quickly pay them some respects by recommending their free book, which I learned a lot from.
Log => Stream
To fold an event stream we first have to run it somehow. Usually it’s the journal module that has to provide streaming capabilities to build projections.
Aecor follows this approach. In Part 3 we created a journal for booking events using aecor-postgres-journal:
To get CQRS capability package, just call
queries extension on the journal instance, like this:
In return you get a read-side journal API with just a couple of methods:
If you used akka-persistence before, these should look familiar. Indeed, like in akka-persistence, the only difference between the two streams is that
eventsByTag always stays live and polls for new events, while
currentEventsByTag halts after all currently known events are processed. Since usually projections have to be real-time,
eventsByTag is used in most cases.
FS2 is chosen here as a streaming library, which should be of no surprise.
eventsByTag stream is constructed using following parameters:
pollingInterval, which we specified when constructing the read journal. It controls how often the journal should look for new events, matching the stream criteria;
offset, which is just a
Longunder the hood. It allows to start the stream from an arbitrary position in the log;
tag, which serves stream partitioning purposes.
Let’s talk about tagging, because it’s really interesting.
Tagging is simply marking events with additional tag or tags. It allows you to get an alternative slicing of your log. One very common tagging approach is to tag all events of particular entity type with a common tag. For example, mark all
But why? We don’t need it on the write-side: there we just process each entity separately. Why now we need to join them using tags?
Well, we have to. Processing each entity separately on the read side is not practical. Here’s why.
On the write side, we know exactly when a command happens, so we can wake up the corresponding entity instance, process the command, store the event and then shut the instance down until further commands, effectively releasing resources. It’s highly unlikely that all the entities get commands at the same time, therefore it’s practical.
To stream events on the read-side though, we have to constantly poll the journal for new events, because read-side has no clue of when next event is going to happen. It’s clear that polling the journal separately for each entity instance will just drown the database with requests and nothing will work.
This is where tagging comes to the rescue. You slice the log into bigger chunks in a way that won’t disrupt the event order of any particular entity. After that you can launch a polling stream for each chunk and it won’t crush the whole system.
Single tag per entity
There’s a simple natural way to tag events, such that for each particular entity all it’s events go into single chunk. It’s just the tag’em’all approach I mentioned earlier: “mark all
In this case for each projection of booking events there will be a single stream running the fold. To setup such simple tagging configuration, use
Tagging.const from Aecor. With bookings we would do it this way:
We create tagging config using a single tag and pass it to the journal. It’s required on the write-side either, because journal has to know how to tag events when it stores them.
Then we just use the tag to launch the read-side stream. Nice and easy.
But throughput of a single stream is limited. What if it’s not enough? In this case partitioned tagging should be chosen.
The idea is to partition the single tag chunk further while keeping each entity within a single sub-partition. And because entities are identified by a unique
key, this key is a perfect candidate to partition the log by.
It’s done conceptually the same way Kafka partitions messages by key: partition is determined by taking a hash of the key modulo the number of partitions.
Tagging.partitioned does exactly that:
In this example we’ll get 10 partitions, each marked with a distinct tag:
Booking-9. Here, to project all partitions we’ll need to launch 10 streams (one per partition):
At this moment, we won’t push further and stay happy with just somehow launching the streams. To make it work properly in reality we’d need them to be distributed over the cluster, but this is a topic for a future post.
So let’s stop here and talk about offsets instead.
Consumers and offsets
Let’s for a moment zoom back out and look at the whole projection again. Say we’re projecting a view where each event updates something in the view table. We don’t want to process each event more than once, so projection needs to track it’s progress somehow.
Offset is used for that purpose. In comparison to
sequenceNumber that defines order withing a single entity instance,
offset is a global (usually time-based) ordering withing a journal. Since no tagging scheme can escape the journal, we can safely say, that for any tag,
offset will still provide a total order within this tag.
So for projection it’s enough to store the offset of the last processed event. After next event is processed, the new offset is committed. When projection is restarted, it can use the stored offset to continue from where it stopped last time.
Again, resemblance to Kafka is very high here:
- Projection <=> Kafka consumer group
- Projection stream (for single tag) <=> Kafka consumer
- Tag <=> Kafka partition
- Offset <=> Kafka consumer offset
This analogy helped me immensely when I was trying to understand how Aecor works. Hope it helps you as well.
Kafka as a journal?
Since it’s all that similar to Kafka… why not use it as a journal then?
It’s a good question and a short answer is “It’s tricky”. There’s some discussion on the topic going on in Aecor’s issue tracker.
There’s also a project of my former colleagues from Evolution Gaming. It is a combo journal technology, that uses both Kafka and Cassandra.
Let’s get back to projections mechanics. So far we have:
- a partitioned stream of events (each event has an offset attached);
- some storage for projection offsets, let’s call it an offset store;
- projection folding logic itself, e.g. populating a view table.
There’s a small design issue though. Seems like that every projection has to know how to commit the latest offset to the offset store. This is required, because only projection knows when an event can be considered processed.
On the other hand, it seems like too much for projection to know about offset stores. Also, it would require a lot of repetition to interact with offset store in every projection. Conceptually, projection just needs a way to tell the journal that it’s done processing a particular event.
Aecor journal provides you with such abstraction. For this to work, you need to define and supply an offset store to the journal first:
As you can see, a
KeyValueStore[F, TagConsumer, Offset]has enough power to be an offset store.
TagConsumerhere is just a
tagpaired with a
bookingQueries to produce streams that allow consumer to commit offsets. Here’s how
eventsByTag would look now:
Now, every event is wrapped into
Committable, which is a simple container that attaches a commit hook to the payload:
For journal with an offset store attached the
commit action would commit the corresponding offset when executed. Projection now doesn’t even know any offsets are involved: it can just rely on
commit to save projection progress, whatever it takes. It’s quite beautiful and we’ll see it in action soon.
To make a complete demonstration, let’s build a view projection for our booking entity. In this case we need a very simple view, that will allow users to see their own bookings.
Let’s start with a storage (or repository) algebra:
set will be used by the projection to construct the view, and both
byClient will serve data to clients.
We can write an interpreter for this repo using doobie and a PostgreSQL table. It’s mostly boilerplate, so I won’t put it here, you can look it up on github.
Now let’s define projection logic: how it’s going to fold events into the view. It’s mostly straightforward, but there are couple of tricks there, so let’s not skip it entirely.
So projection can:
- get current state of the view that corresponds to currently processed event. It’s allows to properly do the fold and also to deduplicate events;
- run the fold: produce a new version of the view according to happened event;
- and update the view.
version is nothing more than a counter of processed events. If an event comes in and
event.sequenceNr <= version, then this event was already processed before and projection can skip it.
This deduplication is required, because offset commit is asynchronous and there’s a possibility for the projection to fail after an event was processed, but before the offset was committed.
Of course, if your projection is idempotent, there’s no need in deduplication. We’re looking at a more general case.
Implementation is rather simple and I won’t put it here as well. Curious readers can find it on github as well.
So we have another component for our read-side setup. Let’s now plug it into the event stream.
Let’s now look how events will flow through the projection. We’ll design this flow as a simple
fs2.Sink. Implementation is a little verbose, but rather simple:
Let’s start from the returned value. It’s has type
fs2.Sink[F, Committable[F, Event]], which is an alias for
fs2.Stream[F, Committable[F, Event] => fs2.Stream[F, Unit]. So basically we do something with every event in the stream, and throw away the results.
Every event goes through two stages:
- Execute the fold, which in turn has following steps:
- Fetch current version and state of the view.
- Ignore events that were already processed (by comparing
- Fold the event, and if it affected the view then save the new state to the repo.
- Commit the offset.
Everything works as we previously discussed. An interesting detail is the
traverse in the first
evalMap. It’s called on
Committable envelope and works the same way as it does for
List. We just run the projection “inside” the committable wrapper and then pull the effect out, so that in the end we get an
F[Committable[F, A]] instead of
Now we have all the LEGO bricks we need to launch our view. Let’s just compose them together.
Wiring the view
Let’s wire everything together:
Translating into English: for each tag partition (“Booking-1”, “Booking-2”, etc.) we launch an event stream with a fixed consumer id, that uniquely identifies this projection.
Each stream is then directed into our projection sink, which on every event updates the view through the repo and commits offset to the offset store.
What’s left is to actually launch these streams at the “end of the world” and distribute them over the cluster. As I said, we’ll talk about this in the future post. If you’re eager to check it out now, github repo is always there waiting.
See you soon!
So we got ourselves a really nice view!
A lot is hidden from sight though. We’ll discuss underlying assumptions, design choices, various problems and best practices related to projections in Part 4b. And, as I promised, we’ll distribute projection streams over the cluster.
Hope you had a great time reading and see you in the next post! 🙂