Aecor — Purely functional event sourcing in Scala. Part 4b

Hello! This is a series of posts about Aecor, a library for building eventsourced applications in Scala in a purely functional way. If this is the first time you’re seeing this series, I highly recommend starting with the Introduction.

We’ve built a streamed view for the Booking entity and discussed projections a little in Part 4a. One piece is still missing, though: we haven’t made it work in a cluster yet.

If you haven’t read Part 4a, I encourage you to start there, since this post relies heavily on the material it covers.

Today we’ll look at Aecor’s Distributed Processing, which lets you run any kind of computation (including stateful ones, like projections) on top of an Akka cluster.

Once the demo example is complete, we’ll discuss some non-trivial practical topics around building projections in an eventsourced system.

Why bother with a cluster?

For some it might not be clear straight away, so let’s look at the partitioned tagging picture from the previous post:

Which node should run each stream?
If a node dies, will another node pick it up?

So we have N streams to run in our cluster, where N is 1 or more.
First, we definitely don’t want duplicates running at the same time. Having two concurrent streams for, say, tag “Booking-2” within the same view sounds like a nightmare: both would read the same events and race each other while updating the view and committing offsets. You’ll get something, but definitely not something you want.

Improperly handled concurrency can be fun. But not in production.

So we need exactly one stream per tag. We could launch them all on a single node, but what if that node experiences an outage? The view quickly becomes outdated, which doesn’t sound fault tolerant at all.

It’s actually the same consensus problem we faced before: the cluster has to agree on which streams each node runs, and this distributed state requires consensus.

As we already learned, akka-cluster offers a solution. With a small number of streams to run, you can get away with a simple cluster singleton. It’s the same “all streams on one node” setup, but with failover.

When shit gets real, you need to distribute your streams over all nodes, and this is a task for akka-sharding. Aecor’s Distributed Processing wraps this solution in a generic purely functional interface.
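To make the requirement concrete, here is a toy sketch of the invariant both setups maintain: every tag runs on exactly one live node, and a dead node’s tags move to the survivors. It is a naive round-robin assignment written for illustration only (`TagAssignment` and `assign` are hypothetical names); akka-sharding uses its own allocation and rebalancing strategies, and this is not how it decides placement.

```scala
// Hypothetical illustration only: akka-sharding has its own shard allocation
// strategy. This naive round-robin just demonstrates the invariant
// "every tag runs on exactly one live node".
object TagAssignment {
  // Assigns each tag to exactly one live node. Recomputing it after a node
  // dies moves that node's tags to the remaining nodes (failover).
  def assign(tags: Seq[String], liveNodes: Seq[String]): Map[String, String] = {
    require(liveNodes.nonEmpty, "at least one live node is needed")
    tags.zipWithIndex.map { case (tag, i) =>
      tag -> liveNodes(i % liveNodes.size)
    }.toMap
  }
}
```

With a single live node, `assign` degenerates into the cluster-singleton setup: all streams on one node, moved elsewhere only when that node goes away. Unlike real sharding, this sketch reshuffles every tag on each membership change instead of rebalancing minimally.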

Distributed Processing

Let’s see how we can deploy our projection on a cluster:

Yep, it’s that simple. We give our bunch of processes a common name and just call distributedProcessing.start. The name is not completely arbitrary: you can have multiple deployments, and each has to be uniquely named, because the name is needed to set up the underlying sharding properly.

Watchful readers will ask: “What is fs2Process?” It’s just some plumbing that makes raw fs2 streams work with the more generic distributed processing. Let’s go over it step by step.

A unit of a distributed processing deployment is called a Process. It’s a long-running computation that is expected to be restarted in case of failure.

So a Process here is nothing more than a recipe for launching a particular computation. A running instance of that computation, called a RunningProcess, essentially represents a Fiber, a primitive heavily used in the effect systems of both cats-effect and ZIO.

For those unfamiliar with fibers: a RunningProcess provides a shutdown hook and watchTermination, a way to subscribe to the computation completing or failing.

Now we have everything we need to describe in plain words how a distributed processing deployment works. When you hand it a process, Aecor calls process.run to launch the computation. It then watches for the process to terminate, and when that happens, calls process.run again to restart the computation. The shutdown hook is used when the deployment’s kill switch is triggered externally.
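The deployment loop described above can be modeled, very roughly, in plain Scala. This is a sequential simplification written for illustration: `ProcessSketch`, `supervise` and the shapes of `Process` and `RunningProcess` are assumptions, not Aecor’s actual signatures, and the real deployment runs processes concurrently on top of sharding rather than in a blocking loop.

```scala
import scala.util.Try

// Simplified, hypothetical stand-ins for the concepts described above.
// These are not Aecor's actual Process/RunningProcess signatures.
object ProcessSketch {
  final case class RunningProcess(
      watchTermination: () => Unit, // blocks until the computation completes or fails
      shutdown: () => Unit          // stops the computation on demand
  )

  // A Process is only a recipe: calling `run` launches a new instance.
  final case class Process(run: () => RunningProcess)

  // Sequential simplification of a deployment: launch, wait for termination,
  // relaunch. If the kill switch is triggered, the current instance is shut
  // down through its hook and supervision stops. Returns the number of launches.
  def supervise(process: Process, killSwitch: () => Boolean): Int = {
    var launches = 0
    var stopped = false
    while (!stopped) {
      val running = process.run()
      launches += 1
      if (killSwitch()) {
        running.shutdown()
        stopped = true
      } else {
        // Completion and failure are treated alike: the loop restarts the process.
        Try(running.watchTermination())
      }
    }
    launches
  }
}
```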

As you can see, although projection streams are a perfect fit for a processing deployment, the concept of a Process is much broader. You can distribute almost any kind of computation with it, which is nice.

For now, though, we’d be happy with just a bunch of streams for a single projection. To connect all the dots we need to transform an fs2.Stream into a DistributedProcessing.Process. Cats-effect makes it rather simple:

We launch the stream inside a fiber, using a signal so that the stream can be terminated externally. The fiber’s join can be used to watch for stream termination.

And that’s it. You just got yourself a partitioned distributed projection.
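Aecor’s adapter is built on cats-effect fibers and an fs2 signal. As a rough analogue using only the JDK, the sketch below runs an endless loop on a dedicated thread (standing in for the fiber), stops it with an `AtomicBoolean` (standing in for the signal), and uses `Thread#join` where the original uses the fiber’s join. `StreamAsProcess`, `launch` and `Running` are hypothetical names.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// JDK-only analogue of the fs2/cats-effect adapter described above (hypothetical
// names): a Thread plays the fiber, an AtomicBoolean plays the stop signal and
// Thread#join plays Fiber#join. This is not Aecor or cats-effect code.
object StreamAsProcess {
  final case class Running(watchTermination: () => Unit, shutdown: () => Unit)

  // Keeps pulling `step` (one element of an endless "stream") on a dedicated
  // thread until the stop signal is raised or `step` throws.
  def launch(step: () => Unit): Running = {
    val stopSignal = new AtomicBoolean(false)
    val worker = new Thread(() => {
      while (!stopSignal.get()) step()
    })
    worker.start()
    Running(
      watchTermination = () => worker.join(),                  // wait for the loop to end
      shutdown = () => { stopSignal.set(true); worker.join() } // raise the signal, then wait
    )
  }
}
```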

Things to know

We’re done with the code, so let’s now discuss some related questions I consider important. I hope this section saves someone from a poorly designed system or an unexpected production issue.

View as a sanity check for your entity

Designing entities and aggregate boundaries is far from easy. There are many tools you can employ to validate and improve your design; covering them would take a separate post.

I just want to mention one, related to the topic of Part 4: views. It wasn’t stated explicitly in Part 4a, but the deduplication mechanism relied on an atomic update of both the view data and the version. In our case we get it for free, by virtue of having both in the same database row.

If, for some reason, we had to update several rows in reaction to a single event, things would get trickier. In particular, we’d need some kind of database transaction to pull that off, which hurts scalability and performance.

I consider it a very strong smell if a view for a single entity doesn’t fit into one database row. In other words:

If a view of your entity doesn’t fit a single database row (or it does but it feels too wrong), there are really two or more entities behind it.

I made this mistake once and it caused a lot of trouble down the road.

Know your journal well

Although eventsourcing is all about eventual consistency, the underlying machinery actually places much stronger requirements on the storage technology. One particular requirement, related to projections, is the order of events.

As we discussed in the previous post, most projections rely on events being causally and temporally ordered (within a single entity). This means that if entity A writes events E1 and E2, the read side should never see E2 before E1. This turns out to be quite challenging to guarantee in real life.

For example, let’s take PostgreSQL as a journal. Using an auto-incremented serial column as an offset is a simple and powerful solution: you get a strict journal-wide order for free. But does it satisfy the requirement above?

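It turns out that with concurrent writes and default settings, you can read row N+1 before row N, where N and N+1 are values of the offset column. Serial values are assigned when a row is inserted, but a row only becomes visible when its transaction commits, and transactions can commit out of order. Once the projection has moved past offset N+1, it won’t go back for row N when that row finally becomes visible. There you go: the projection has missed an event.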

The problem can be addressed in several ways, but if you don’t know about such quirks, you can end up scrambling for a solution while your production is down.

The Aecor Postgres journal we used in this series currently supports only the simplest write strategy: serializing writes by locking the journal table.

While not the most efficient approach, its cost is heavily amortized by the fact that you can (and probably should) have a separate journal table for each entity type.
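The anomaly and the locking fix can be reproduced with a toy in-memory model. It is not PostgreSQL: `OffsetGapDemo` simply assumes that offsets are assigned at insert time and that rows become visible to the reader only on commit, which is the mechanism described above.

```scala
// A toy, in-memory model (not PostgreSQL): offsets are handed out when a row
// is inserted, but rows only become visible to readers once committed.
object OffsetGapDemo {
  final case class Row(offset: Long, payload: String, committed: Boolean)

  // A reader that polls for committed rows with offset > lastSeen.
  def poll(rows: Seq[Row], lastSeen: Long): Seq[Row] =
    rows.filter(r => r.committed && r.offset > lastSeen).sortBy(_.offset)

  // T1 and T2 insert concurrently and get offsets 1 and 2, but T2 commits
  // first and the reader polls between the two commits.
  // Returns the payloads the reader ends up seeing.
  def concurrentWrites(): Seq[String] = {
    val inserted = Seq(Row(1, "E1", committed = false), Row(2, "E2", committed = false))
    val afterT2Commit = inserted.map(r => if (r.offset == 2) r.copy(committed = true) else r)
    val firstPoll = poll(afterT2Commit, lastSeen = 0)
    val lastSeen = firstPoll.map(_.offset).max      // reader advances to offset 2
    val afterT1Commit = afterT2Commit.map(_.copy(committed = true))
    val secondPoll = poll(afterT1Commit, lastSeen)  // offset 1 is never > 2: E1 is lost
    (firstPoll ++ secondPoll).map(_.payload)
  }

  // With writes serialized (e.g. by locking the table), T2 cannot insert
  // until T1 has committed, so rows become visible in offset order.
  def serializedWrites(): Seq[String] = {
    val afterT1 = Seq(Row(1, "E1", committed = true))
    val firstPoll = poll(afterT1, lastSeen = 0)
    val afterT2 = afterT1 :+ Row(2, "E2", committed = true)
    val secondPoll = poll(afterT2, firstPoll.map(_.offset).max)
    (firstPoll ++ secondPoll).map(_.payload)
  }
}
```

`concurrentWrites()` yields only `E2`, while `serializedWrites()` yields `E1` followed by `E2`.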

Be prepared to replay views

It’s well known that eventsourcing systems are especially hard to operate. Whereas any kind of local data corruption can be easily fixed in a CRUD-based system by just updating the data in place, it’s much more complex with eventsourcing.

Even if you can update an event in the journal (which can be tricky on its own if you use a binary storage format like protobuf), your projections will have already processed the old, corrupted event, and the fix won’t propagate by itself. To fix your views, you’d need to replay projections from some earlier offset.

One of our projections, complaining about replays.

If a projection is not idempotent, you can’t just replay from an earlier offset: some events will end up processed twice. You’re in for a real challenge here, and most likely the projection will have to be replayed from scratch.

Idempotency is a very handy property, so don’t miss the chance to get it.
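A small illustration of the difference, with hypothetical events and views (`ReplayDemo`, `countChanges` and `latestStatus` are illustrative names): both projections see events 2 and 3 twice after a replay from offset 2. The counting projection ends up with wrong numbers, while the “latest status” projection converges to the same state it had before the replay.

```scala
// Hypothetical example: two projections of the same events, replayed from an
// earlier offset after a fix.
object ReplayDemo {
  final case class Event(offset: Long, bookingId: String, status: String)

  // Non-idempotent: counts status changes per booking.
  def countChanges(view: Map[String, Int], e: Event): Map[String, Int] =
    view.updated(e.bookingId, view.getOrElse(e.bookingId, 0) + 1)

  // Idempotent: stores the latest status; applying an event twice is harmless.
  def latestStatus(view: Map[String, String], e: Event): Map[String, String] =
    view.updated(e.bookingId, e.status)

  val events = List(
    Event(1, "b1", "Pending"),
    Event(2, "b1", "Confirmed"),
    Event(3, "b2", "Pending")
  )

  // Everything processed once, then replayed from offset 2 (events 2 and 3 again).
  val replayed = events ++ events.filter(_.offset >= 2)

  val counts = replayed.foldLeft(Map.empty[String, Int])(countChanges)       // b1 -> 3, b2 -> 2 (wrong)
  val statuses = replayed.foldLeft(Map.empty[String, String])(latestStatus)  // unaffected by the replay
}
```

Note that this kind of idempotency assumes replayed events arrive in their original order, which the journal guarantees within an entity.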

There are a handful of other reasons for a partial or complete projection replay:

  • a bug in projection fold;
  • missed events (hardware glitch or journal bug);
  • a need to migrate view(s) to another database system.
The Swiss army knife of an eventsourced system developer.

This is inevitable, so better be prepared. Actual scenarios differ depending on your setup and the SLAs your service has to meet. Make sure your team knows what to do when an emergency replay is required.

Congratulations!

One month and six posts later, we have finally covered the bread and butter of eventsourcing with Aecor. I hope you enjoyed the journey. Together we learned how to:

  • define an eventsourced behavior;
  • deploy it into the cluster;
  • build a view for an eventsourced entity;
  • distribute the view projection over the cluster.

That’s enough to go and build a working app! But there’s still one valuable pattern I haven’t discussed: Process Managers.

You already know everything you need to develop a process manager with Aecor, so in Part 5 we’ll quickly go over an example and discuss some theory.

See you there!

Aecor — Purely functional event sourcing in Scala. Part 4a


In the previous post we finally deployed our eventsourced behavior on a cluster. Now we can issue booking commands, but there’s still nothing our end users can query.

So today we’re going to build an eventsourced view with the help of Aecor and discuss CQRS in the context of eventsourcing.

I decided to split this post into two parts; otherwise it would have become too long. In Part 4b we’ll look at real-world problems you face when running projections, as well as best practices and experience collected over the years at Evotor.

Projections

I hope you’re familiar with the value proposition of CQRS. Before we start to write code, let’s understand why CQRS is especially valuable in eventsourcing and take a closer look at underlying reasoning and mechanics.

In eventsourcing your write side is the event log. Since it’s an append-only log, it can potentially provide extremely high throughput for incoming events. On the other hand, it would be awkward and inconvenient to query and fold events from the log every time you need the current entity state. So, obviously, the journal is an awfully poor fit as a read-side backend.

Thanks to the single writer we discussed in Part 2, events are strictly ordered (at least within each particular entity). It’s common sense: unless you’re dealing with something special (like CRDTs), you can’t build properly working behaviors without ordered events.

So we can view the log as a bunch of frozen event streams (one per entity). Given that events are constantly appended to the log, these streams are effectively infinite.

Now we come to the notion of projection, which is nothing more than an arbitrary fold of an entity event stream. An example of such fold is a view: a representation of the entity state which is optimized for querying. This view is going to be the read-side of our CQRS setup.

We shouldn’t limit projections to views, though. When I say a projection is an arbitrary fold, I mean it. Our log is the source of truth, storing facts that happened in our system. Using projections, you can derive secondary facts in any format you want. For example, you can project your events into an ElasticSearch index while still building a regular view in parallel.
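To make “arbitrary fold” concrete, here is a minimal sketch with hypothetical booking events (`ProjectionAsFold`, `BookingEvent`, `project` and the field names are illustrative, not the series’ model). The same log is folded twice: once into a view with the latest state per booking, and once into a client-to-bookings index, the kind of secondary fact you might push into a search index.

```scala
// Hypothetical booking events; field names are illustrative only.
object ProjectionAsFold {
  final case class BookingEvent(bookingId: String, clientId: String, kind: String)

  // A projection is just a fold of the event stream into some state.
  def project[S](events: Seq[BookingEvent], initial: S)(update: (S, BookingEvent) => S): S =
    events.foldLeft(initial)(update)

  val eventLog = Seq(
    BookingEvent("b1", "alice", "Placed"),
    BookingEvent("b2", "bob", "Placed"),
    BookingEvent("b1", "alice", "Confirmed")
  )

  // Projection 1: a query-friendly view with the latest state per booking.
  val view: Map[String, String] =
    project(eventLog, Map.empty[String, String])((s, e) => s.updated(e.bookingId, e.kind))

  // Projection 2: a secondary fact derived from the same log in parallel,
  // an index from each client to the bookings they own.
  val byClient: Map[String, Set[String]] =
    project(eventLog, Map.empty[String, Set[String]]) { (s, e) =>
      s.updated(e.clientId, s.getOrElse(e.clientId, Set.empty[String]) + e.bookingId)
    }
}
```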

Duality of tables and streams. Image from confluent.io.

Most of the time projections fold into a database table of some sort. This is no coincidence, because streams and tables are closely related. This duality is just too cool not to mention, so I couldn’t miss the opportunity.

I personally love the idea of using events as the source of truth and using streaming to derive any secondary information in any format you want. Confluent, the maintainers of Kafka, have done an amazing job popularizing and explaining this paradigm to the community. I’ll just quickly pay my respects by recommending their free book, which I learned a lot from.

Log => Stream

To fold an event stream we first have to run it somehow. Usually it’s the journal module that has to provide streaming capabilities to build projections.

Aecor follows this approach. In Part 3 we created a journal for booking events using aecor-postgres-journal:

To get the CQRS capabilities package, just call the queries extension on the journal instance, like this:

In return you get a read-side journal API with just a couple of methods:

If you’ve used akka-persistence before, these should look familiar. Indeed, just like in akka-persistence, the only difference between the two streams is that eventsByTag always stays live and polls for new events, while currentEventsByTag halts after all currently known events are processed. Since projections usually have to be real-time, eventsByTag is used in most cases.

FS2 is the streaming library of choice here, which should come as no surprise. The eventsByTag stream is constructed from the following parameters:

  • pollingInterval, which we specified when constructing the read journal. It controls how often the journal should look for new events, matching the stream criteria;
  • offset, which is just a Long under the hood. It allows starting the stream from an arbitrary position in the log;
  • and tag, which serves stream partitioning purposes.
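A toy, in-memory model of the two query styles may help. It is an illustration under stated assumptions, not Aecor’s read journal: there is no F[_] or fs2, `ReadJournalSketch` and `InMemoryJournal` are hypothetical, an Iterator plays the role of the infinite stream, and the offset argument is treated as exclusive (events strictly after it), which may not match the real API.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy read journal: names mirror the prose, signatures are hypothetical.
object ReadJournalSketch {
  final case class Entry(offset: Long, tag: String, payload: String)

  final class InMemoryJournal {
    private val entries = ArrayBuffer.empty[Entry]

    def append(entry: Entry): Unit = synchronized { entries += entry; () }

    // Finite: emits the events currently known for `tag`, then halts.
    def currentEventsByTag(tag: String, offset: Long): List[Entry] = synchronized {
      entries.filter(e => e.tag == tag && e.offset > offset).toList
    }

    // Live: never halts. When it runs out of known events, it sleeps for
    // `pollingIntervalMs` and queries again from the last emitted offset.
    def eventsByTag(tag: String, offset: Long, pollingIntervalMs: Long): Iterator[Entry] =
      new Iterator[Entry] {
        private var lastSeen = offset
        private var buffered: List[Entry] = Nil

        def hasNext: Boolean = true

        def next(): Entry = {
          while (buffered.isEmpty) {
            buffered = currentEventsByTag(tag, lastSeen)
            if (buffered.isEmpty) Thread.sleep(pollingIntervalMs)
          }
          val entry = buffered.head
          buffered = buffered.tail
          lastSeen = entry.offset
          entry
        }
      }
  }
}
```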

Let’s talk about tagging, because it’s really interesting.

Tagging

Tagging is simply marking events with an additional tag or tags. It gives you an alternative slicing of your log. One very common tagging approach is to tag all events of a particular entity type with a common tag. For example, mark all BookingEvents with Booking tag.

But why? We don’t need it on the write side: there we just process each entity separately. Why do we now need to join them using tags?

Well, we have to. Processing each entity separately on the read side is not practical. Here’s why.

On the write side, we know exactly when a command arrives, so we can wake up the corresponding entity instance, process the command, store the event and then shut the instance down until further commands, effectively releasing resources. It’s highly unlikely that all entities receive commands at the same time, so this approach is practical.

To stream events on the read side, though, we have to constantly poll the journal for new events, because the read side has no idea when the next event is going to happen. Clearly, polling the journal separately for each entity instance will just drown the database in requests, and nothing will work.

This is where tagging comes to the rescue. You slice the log into bigger chunks in a way that doesn’t disrupt the event order of any particular entity. After that you can launch a polling stream for each chunk without overwhelming the whole system.
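A back-of-the-envelope calculation shows the scale of the difference. The numbers are hypothetical: one million booking instances, ten tag partitions and a 100 ms polling interval, with one journal query per polling stream per interval.

```latex
\text{queries per second} = \frac{\text{number of polling streams}}{\Delta t},
\qquad
\frac{10^{6}}{0.1\ \text{s}} = 10^{7}\ \text{s}^{-1}
\quad\text{vs.}\quad
\frac{10}{0.1\ \text{s}} = 100\ \text{s}^{-1}
```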

Single tag per entity

There’s a simple, natural way to tag events so that all events of each particular entity go into a single chunk. It’s just the tag’em’all approach I mentioned earlier: “mark all BookingEvents with Booking tag”.

In this case, for each projection of booking events there will be a single stream running the fold. To set up such a simple tagging configuration, use Tagging.const from Aecor. With bookings we would do it this way:

We create a tagging config using a single tag and pass it to the journal. It’s required on the write side as well, because the journal has to know how to tag events when it stores them.

Then we just use the tag to launch the read-side stream. Nice and easy.

Single tag partitioning. Simple, but throughput is limited.
Still valuable in cases where multiple entity types share the same journal table.

But the throughput of a single stream is limited. What if it’s not enough? In that case, partitioned tagging is the way to go.

Partitioned tagging

The idea is to partition the single tag chunk further while keeping each entity within a single sub-partition. And because entities are identified by a unique key, this key is a perfect candidate to partition the log by.

It’s done conceptually the same way Kafka partitions messages by key: the partition is determined by taking a hash of the key modulo the number of partitions.

Partitioned tagging at work. A more scalable solution.
Like single-tag partitioning, it allows multiple entity types to share the same journal.
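The partitioning idea can be sketched in a few lines. `TaggingSketch` is hypothetical, and the hash function (`MurmurHash3.stringHash`) and the `Tag-N` naming are assumptions for illustration; Aecor’s Tagging.const and Tagging.partitioned may differ in those details. The property that matters is that the same key always maps to the same tag.

```scala
import scala.util.hashing.MurmurHash3

// Hypothetical sketch of the two tagging schemes. The hash function and the
// "Tag-N" naming are assumptions; Aecor's Tagging may differ in both.
object TaggingSketch {
  // Single tag: every event of the entity type gets the same tag.
  def const(tag: String): String => String = _ => tag

  // Partitioned: the entity key is hashed modulo the number of partitions,
  // so all events of one entity always land in the same partition.
  def partitioned(partitions: Int, tag: String): String => String = { key =>
    val partition = Math.floorMod(MurmurHash3.stringHash(key), partitions)
    s"$tag-$partition"
  }

  // Every tag a projection needs one stream for, e.g. Booking-0 .. Booking-9.
  def allTags(partitions: Int, tag: String): Seq[String] =
    (0 until partitions).map(i => s"$tag-$i")
}
```

`Math.floorMod` keeps the partition non-negative even when the hash is negative, which a plain `%` would not.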

Tagging.partitioned does exactly that:

In this example we’ll get 10 partitions, each marked with a distinct tag: Booking-0, Booking-1, …, Booking-9. Here, to project all partitions we’ll need to launch 10 streams (one per partition):

For now, we won’t push further and will be happy just launching the streams somehow. To make it work properly in reality, we’d need them distributed over the cluster, but that’s a topic for a future post.

So let’s stop here and talk about offsets instead.

Consumers and offsets

Let’s for a moment zoom back out and look at the whole projection again. Say we’re projecting a view where each event updates something in the view table. We don’t want to process each event more than once, so the projection needs to track its progress somehow.

Offset is used for that purpose. Whereas sequenceNumber defines the order within a single entity instance, offset is a global (usually time-based) ordering within a journal. Since every tag is just a slice of the same journal, we can safely say that offset still provides a total order within any tag.

SequenceNumber is a strictly serial order within specific entity instance.
Offsets provide global order within the whole journal. It’s ok to have gaps in offset history.

So for a projection it’s enough to store the offset of the last processed event. After the next event is processed, the new offset is committed. When the projection is restarted, it can use the stored offset to continue from where it left off.
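The resume-from-offset idea in miniature, with hypothetical names (`ResumeFromOffset`, `Projection`, `crashAfter`): offsets have gaps, the projection records the last processed one after each event, and a restart continues strictly after it.

```scala
// Hypothetical sketch: a projection that stores the offset of the last
// processed event and resumes after it. Offsets have gaps, as in a journal.
object ResumeFromOffset {
  final case class Event(offset: Long, payload: String)

  val journal = Vector(Event(3, "E1"), Event(7, "E2"), Event(8, "E3"), Event(15, "E4"))

  final class Projection {
    var storedOffset: Long = 0L                 // persisted progress
    var processed: Vector[String] = Vector.empty

    // Processes events after the stored offset, committing each offset right
    // after its event is handled. `crashAfter` simulates a failure mid-run.
    def run(crashAfter: Int = Int.MaxValue): Unit =
      journal.filter(_.offset > storedOffset).take(crashAfter).foreach { e =>
        processed :+= e.payload
        storedOffset = e.offset
      }
  }
}
```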

Again, resemblance to Kafka is very high here:

  • Projection <=> Kafka consumer group
  • Projection stream (for single tag) <=> Kafka consumer
  • Tag <=> Kafka partition
  • Offset <=> Kafka consumer offset

This analogy helped me immensely when I was trying to understand how Aecor works. Hope it helps you as well.

Kafka as a journal?

Since it’s all that similar to Kafka… why not use it as a journal then?

It’s a good question, and the short answer is “It’s tricky”. There’s some discussion on the topic going on in Aecor’s issue tracker.

There’s also a project by my former colleagues from Evolution Gaming: a combined journal technology that uses both Kafka and Cassandra.

Committable

Let’s get back to projections mechanics. So far we have:

  • a partitioned stream of events (each event has an offset attached);
  • some storage for projection offsets, let’s call it an offset store;
  • projection folding logic itself, e.g. populating a view table.

There’s a small design issue, though. It seems that every projection has to know how to commit the latest offset to the offset store. This is required because only the projection knows when an event can be considered processed.

On the other hand, it seems like too much for projection to know about offset stores. Also, it would require a lot of repetition to interact with offset store in every projection. Conceptually, projection just needs a way to tell the journal that it’s done processing a particular event.

Aecor’s journal provides exactly such an abstraction. For this to work, you first need to define an offset store and supply it to the journal:

As you can see, a KeyValueStore[F, TagConsumer, Offset] has enough power to be an offset store. TagConsumer here is just a tag paired with a consumerId.

This upgrades bookingQueries to produce streams that allow the consumer to commit offsets. Here’s how eventsByTag looks now:

Now, every event is wrapped into Committable, which is a simple container that attaches a commit hook to the payload:

For a journal with an offset store attached, the commit action commits the corresponding offset when executed. The projection doesn’t even need to know that offsets are involved: it can just rely on commit to save its progress, whatever that takes. It’s quite beautiful, and we’ll see it in action soon.
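Here is a simplified sketch of that idea, assuming plain thunks instead of F[_] and an in-memory map instead of a real KeyValueStore. `CommittableSketch`, `InMemoryOffsetStore` and `committableEvents` are hypothetical; only the concepts (TagConsumer, Committable, a commit hook that stores the offset) come from the text above.

```scala
// Simplified, hypothetical version of the idea: the journal wraps each event
// in a Committable whose commit hook writes the event's offset into an offset
// store keyed by (tag, consumerId). Not Aecor's actual types.
object CommittableSketch {
  final case class TagConsumer(tag: String, consumerId: String)

  final case class Committable[A](commit: () => Unit, value: A) {
    def map[B](f: A => B): Committable[B] = Committable(commit, f(value))
  }

  final class InMemoryOffsetStore {
    private var offsets = Map.empty[TagConsumer, Long]
    def get(key: TagConsumer): Option[Long] = synchronized(offsets.get(key))
    def set(key: TagConsumer, offset: Long): Unit =
      synchronized { offsets = offsets.updated(key, offset) }
  }

  // The journal's side: attach a commit hook that knows the offset, so the
  // projection never has to deal with offsets itself.
  def committableEvents[A](
      events: Seq[(Long, A)],
      store: InMemoryOffsetStore,
      consumer: TagConsumer
  ): Seq[Committable[A]] =
    events.map { case (offset, event) =>
      Committable(() => store.set(consumer, offset), event)
    }
}
```

The consumer id used in the usage check below (`BookingViewProjection`) is made up for illustration.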

View

To make the demonstration complete, let’s build a view projection for our booking entity. In this case we need a very simple view that allows users to see their own bookings.

Let’s start with a storage (or repository) algebra:

get and set will be used by the projection to construct the view, and both get and byClient will serve data to clients.

We can write an interpreter for this repo using doobie and a PostgreSQL table. It’s mostly boilerplate, so I won’t include it here; you can look it up on GitHub.

Now let’s define the projection logic: how it’s going to fold events into the view. It’s mostly straightforward, but there are a couple of tricks in it, so let’s not skip it entirely.

So projection can:

  • get the current state of the view that corresponds to the currently processed event. This allows it to run the fold properly and to deduplicate events;
  • run the fold: produce a new version of the view according to the event that happened;
  • and update the view.

View version is nothing more than a counter of processed events. If an event comes in and event.sequenceNr <= version, then this event was already processed before and projection can skip it.

This deduplication is required, because offset commit is asynchronous and there’s a possibility for the projection to fail after an event was processed, but before the offset was committed.

Of course, if your projection is idempotent, there’s no need for deduplication. We’re looking at the more general case.
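The deduplication rule can be sketched as a pure function over hypothetical types (`DedupFold`, `Event`, `BookingView` and `applyEvent` are illustrative, not the series’ code). It assumes sequence numbers start at 1 and have no gaps within an entity, so storing the last applied sequenceNr as the version is the same as counting processed events.

```scala
// Hypothetical types illustrating version-based deduplication.
object DedupFold {
  final case class Event(bookingId: String, sequenceNr: Long, status: String)
  final case class BookingView(bookingId: String, status: String, version: Long)

  // Returns Some(updated view), or None when the event was already applied
  // (its sequenceNr is not greater than the stored version).
  def applyEvent(current: Option[BookingView], e: Event): Option[BookingView] = {
    val version = current.map(_.version).getOrElse(0L)
    if (e.sequenceNr <= version) None
    else Some(BookingView(e.bookingId, e.status, version = e.sequenceNr))
  }
}
```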

The implementation is rather simple, so I won’t include it here either. Curious readers can find it on GitHub.

The whole setup in action, demonstrating
why deduplication is required for a non-idempotent projection.

So we have another component for our read-side setup. Let’s now plug it into the event stream.

Projection Flow

Let’s now look at how events flow through the projection. We’ll design this flow as a simple fs2.Sink. The implementation is a little verbose, but rather simple:

Let’s start with the returned value. It has type fs2.Sink[F, Committable[F, Event]], which is an alias for fs2.Stream[F, Committable[F, Event]] => fs2.Stream[F, Unit]. So basically we do something with every event in the stream and throw away the results.

Every event goes through two stages:

  1. Execute the fold, which in turn has following steps:
    1. Fetch current version and state of the view.
    2. Ignore events that were already processed (by comparing seqNr and version).
    3. Fold the event, and if it affected the view then save the new state to the repo.
  2. Commit the offset.
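The same steps, end to end, in plain Scala instead of fs2 and F[_]. Everything here is hypothetical (`ProjectionFlow`, `InMemoryRepo`, `handle`), and the “crash” is simulated by a commit hook that throws. It walks through the scenario described earlier: the view is saved but the offset commit fails, the event is redelivered, and the version check prevents a second write.

```scala
import scala.collection.mutable

// Hypothetical end-to-end sketch of the flow above with plain Scala in place
// of fs2/F[_]: fetch the view, skip duplicates, fold and save, then commit.
object ProjectionFlow {
  final case class Event(bookingId: String, sequenceNr: Long, status: String)
  final case class View(status: String, version: Long)
  final case class Committable[A](commit: () => Unit, value: A)

  final class InMemoryRepo {
    val rows = mutable.Map.empty[String, View]
    var writes = 0
    def get(id: String): Option[View] = rows.get(id)
    def set(id: String, view: View): Unit = { rows(id) = view; writes += 1 }
  }

  def handle(repo: InMemoryRepo)(c: Committable[Event]): Unit = {
    val e = c.value
    val version = repo.get(e.bookingId).map(_.version).getOrElse(0L) // 1.1 fetch
    if (e.sequenceNr > version)                                       // 1.2 skip duplicates
      repo.set(e.bookingId, View(e.status, e.sequenceNr))             // 1.3 fold and save
    c.commit()                                                        // 2. commit the offset
  }
}
```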

Everything works as we previously discussed. An interesting detail is the traverse in the first evalMap. It’s called on Committable envelope and works the same way as it does for Option or List. We just run the projection “inside” the committable wrapper and then pull the effect out, so that in the end we get an F[Committable[F, A]] instead of Committable[F, F[A]].
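For readers unfamiliar with traverse, here is what it does for Committable, sketched with Option standing in for F and a hand-written method instead of a cats Traverse instance (`CommittableTraverse` and `parse` are hypothetical): the effectful function runs on the payload, and the effect ends up outside the wrapper while the commit hook is preserved.

```scala
import scala.util.Try

// Hypothetical sketch: Option stands in for F, and traverse is a hand-written
// method rather than cats' Traverse instance for Aecor's Committable.
object CommittableTraverse {
  final case class Committable[A](commit: () => Unit, value: A) {
    // Committable[A] with (A => Option[B]) gives Option[Committable[B]]
    // instead of Committable[Option[B]]; the commit hook is carried over.
    def traverse[B](f: A => Option[B]): Option[Committable[B]] =
      f(value).map(b => Committable(commit, b))
  }

  // Example "effectful projection": parse the payload, failing as None.
  def parse(s: String): Option[Int] = Try(s.toInt).toOption
}
```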

As many other eventsourcing concepts, projections map really nicely into real world.
Game score is a projection of match events.
Account balance is a projection of all successful transactions.
Code is a projection of all the keystrokes team members have made. And so on.

Now we have all the LEGO bricks we need to launch our view. Let’s just compose them together.

Wiring the view

Let’s wire everything together:

Translating into English: for each tag partition (“Booking-1”, “Booking-2”, etc.) we launch an event stream with a fixed consumer id, that uniquely identifies this projection.

Each stream is then directed into our projection sink, which on every event updates the view through the repo and commits offset to the offset store.

What’s left is to actually launch these streams at the “end of the world” and distribute them over the cluster. As I said, we’ll talk about this in a future post. If you’re eager to check it out now, the GitHub repo is always there waiting.

See you soon!

So we got ourselves a really nice view!

A lot is hidden from sight though. We’ll discuss underlying assumptions, design choices, various problems and best practices related to projections in Part 4b. And, as I promised, we’ll distribute projection streams over the cluster.

Hope you had a great time reading and see you in the next post! 🙂