Aecor — Purely functional event sourcing in Scala. Part 6

Hello! This is a series of posts about Aecor — a library for building eventsourced applications in Scala in a purely functional way. If this is the first time you're seeing this series, I highly recommend starting with the Introduction.

This is the closing post of the series, where we'll have some fun studying Aecor internals. The tagless final approach is what shines the most there, and we'll see a couple of really beautiful examples.

Reading parts 1, 2 and 3 of the series is recommended before starting this one.

Aecor and Tagless Final

As a warm-up, let's revisit how and where an Aecor user encounters tagless final. It happens at the most important and interesting place — the entity behavior. For example:
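A sketch of such a behavior algebra, loosely reconstructed from the Booking entity used in earlier parts of the series (the exact type names are assumptions):

```scala
// A tagless final behavior algebra: each command is a method in an
// abstract effect F. Type names approximate those from earlier parts.
final case class ClientId(value: String)
final case class ConcertId(value: String)
final case class Seat(row: Int, number: Int)

sealed trait BookingStatus
object BookingStatus {
  case object AwaitingConfirmation extends BookingStatus
  case object Confirmed extends BookingStatus
  case object Denied extends BookingStatus
}

trait Booking[F[_]] {
  def place(client: ClientId, concert: ConcertId, seats: List[Seat]): F[Unit]
  def confirm(seats: List[Seat]): F[Unit]
  def deny(reason: String): F[Unit]
  def status: F[BookingStatus]
}
```

Note that nothing here fixes F: the same algebra can be interpreted into many different effects, which is exactly what Aecor exploits.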

Aecor reaps some immediate and obvious benefits from such a definition format: it can alter the effect type depending on the use case.

The ability to run behaviors in the ActionT effect, and to do it in MTL style (without premature coupling to a specific data structure), is a direct consequence of having a tagless final behavior definition.

Next, depending on whether your entity has rejections or not, Aecor can extend the effect to EitherT[ActionT[F, Option[S], E, ?], R, ?]. We also learned about EitherK, a handy wrapper Aecor offers for algebras with an effect of this shape.

When a behavior is deployed, the Aecor runtime handles the ActionT part of the effect. What's left is a pretty simple effect: F[Either[R, ?]] for entities with rejectable commands and plain F for behaviors that accept commands unconditionally. These effects are what the client faces when it sends commands to a deployed entity.

So all those effects are mixed and matched within a single behavior algebra definition. Isn't it cool? The library user has to write zero additional code to support all these kinds of execution semantics — it all works out of the box with a single tagless final behavior.

Aecor can use a behavior you defined in many contexts and for many purposes.
It’s really simple, thanks to tagless final format.

But it's not the only place where it shines. Yes, we used several different effect types, which means that taking the tagless final approach has already paid off. But all of these were familiar monad transformer-ish effects, which everyone is kinda used to.

The Aecor runtime has a component that leverages the same tagless final behavior in a much more unconventional way. On the surface this component might look boring, but thanks to clever design it's actually quite remarkable.

It’s called WireProtocol.

Wire Protocol

We briefly discussed the wire protocol in Part 3, but let's revisit its main purpose.

Entities are deployed into a cluster runtime, and it can easily happen that the node handling the request (node A) isn’t the node that runs the entity instance (node B). In this case:

  1. The original node A has to encode the command and send it to node B to handle.
  2. Node B has to decode the command on arrival and run it through the entity.
  3. Then node B has to encode the command execution result and send it back to node A.
  4. Node A has to decode the received result and continue handling the original request.

As you can see, there's a lot of encoding and decoding going on — all of that is the responsibility of the wire protocol. By providing an instance of the wire protocol for an algebra, we gain the ability to execute commands over the wire.

Here's what the typeclass looks like with all its dependencies:
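A simplified sketch of its shape (the real definition lives inside Aecor; exact method names and signatures may differ slightly):

```scala
import scodec.{ Decoder, Encoder }
import scodec.bits.BitVector

// A partially applied call to a method of algebra M, returning A;
// the algebra instance is supplied later via `run`
trait Invocation[M[_[_]], A] {
  def run[F[_]](mf: M[F]): F[A]
}

// One value in two contexts; the value type is an existential type member
trait PairE[F[_], G[_]] {
  type A
  def first: F[A]
  def second: G[A]
}

trait WireProtocol[M[_[_]]] {
  // interprets M into (encoded command, decoder for the command result)
  def encoder: M[({ type T[A] = (BitVector, Decoder[A]) })#T]
  // decodes a command into an invocation paired with a result encoder
  def decoder: Decoder[PairE[Invocation[M, ?], Encoder]]
}
```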

It's a very dense definition that took me quite a while to digest. So let's go step by step.

Invocation

Invocation is just a partially applied call to some specific method on a behavior M that returns a value of type A. It's "partial" in an unusual way though: it has all the arguments applied, but the instance of the algebra itself will be selected later, when we run the invocation.

For example, let's take the place method on our Booking behavior and create a sample invocation:
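A sketch of such an invocation, assuming the Invocation shape described above (domain types follow the Booking example from earlier parts):

```scala
// An invocation for the `place` command: all arguments are captured,
// but the Booking instance is only supplied later, via `run`
def placeInvocation(
  client: ClientId,
  concert: ConcertId,
  seats: List[Seat]
): Invocation[Booking, Unit] =
  new Invocation[Booking, Unit] {
    def run[F[_]](booking: Booking[F]): F[Unit] =
      booking.place(client, concert, seats)
  }
```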

If we further draw the analogy to actor-based eventsourcing, an invocation is nothing more than a command object. There we create a command message by filling in all the required data, and then send it to some entity actor.

Same thing here — we're creating an Invocation object, but instead of sending it we use run to execute it on our entity behavior instance. Of course, Invocation is defined in a much more generic way, so I'm intentionally overspecializing it here to commands and entities to give a better understanding of the intent.

To summarize and make it simple: an Invocation is a specific command that can be executed on some entity later.

PairE

Not a lot to say here, actually. PairE is just a wrapper class that holds one value in two contexts at the same time. The E here means "existential": the value type is a type member. Later we'll see why that's important.

By the way, a context here is not always an effect — it can be a typeclass instance. A simple example would be PairE[List, Ordering]: for some type A it provides a list of values coupled with an Ordering instance for that same type.
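The List/Ordering example in code (a sketch; the real PairE lives in Aecor's internals):

```scala
trait PairE[F[_], G[_]] {
  type A // existential: the value type is hidden from the outside
  def first: F[A]
  def second: G[A]
}

// A list of values together with an Ordering for the same (hidden) type
val pair: PairE[List, Ordering] = new PairE[List, Ordering] {
  type A = Int
  val first = List(3, 1, 2)
  val second = Ordering.Int
}

// The two sides can still be meaningfully combined,
// because the compiler knows they share the same A:
// pair.first.sorted(pair.second) == List(1, 2, 3)
```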

scodec

The Encoder and Decoder typeclasses here, as well as the BitVector data structure, are taken from the scodec library, as you can see from the imports. Don't worry if you're not familiar with this library — it's designed the way you'd probably expect it to be. If you have used circe, just swap Json for BitVector in your head, and you get the scodec typeclasses, roughly.

Client wire protocol

Now that we've discussed all the components, let's fight the boss — the wire protocol typeclass itself. First of all, for the sake of better understanding, I would prefer to rename the methods of this typeclass the following way:
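The renaming, as a sketch (these are the names used in the walkthrough later in this post; the real typeclass calls them differently):

```scala
trait WireProtocol[M[_[_]]] {
  // the client side: an interpreter of M producing, for each command,
  // its encoded form plus a decoder for the command result
  def client: M[({ type T[A] = (BitVector, Decoder[A]) })#T]

  // the server side: a decoder from command bytes into an invocation
  // paired with an encoder for the invocation result
  def server: Decoder[PairE[Invocation[M, ?], Encoder]]
}
```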

So the protocol works both on the client, which sends the command, and on the server, which runs the entity instance, handles the command and sends back the result.

IMO, these names are much closer to how things work. Although overall naming is not a huge concern here — it's mostly internal API when it comes to real-world use.

Let's look at the client, since command execution starts there. The wire protocol client is just a custom interpreter for our behavior algebra with a very unusual "effect": (BitVector, Decoder[?]). What the hell is happening here?

To answer this question, let's see how the client interpreter works in the case of our example booking behavior:
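Written by hand, the client interpreter could look roughly like this (Aecor derives it for you; the codec helpers here are hypothetical stand-ins):

```scala
type ClientResult[A] = (BitVector, Decoder[A])

// Hypothetical codecs for commands and results
def encodePlace(client: ClientId, concert: ConcertId, seats: List[Seat]): BitVector = ???
def encodeStatus: BitVector = ???
def unitDecoder: Decoder[Unit] = ???
def statusDecoder: Decoder[BookingStatus] = ???

// Each command produces (encoded command, decoder for its result)
val bookingClient: Booking[ClientResult] = new Booking[ClientResult] {
  def place(client: ClientId, concert: ConcertId, seats: List[Seat]) =
    (encodePlace(client, concert, seats), unitDecoder)
  def confirm(seats: List[Seat]) = (???, unitDecoder)
  def deny(reason: String)       = (???, unitDecoder)
  def status                     = (encodeStatus, statusDecoder)
}
```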

So for each command the client interpreter returns:

  1. An encoded version of the command in the form of a BitVector. It can be sent over the wire to the server.
  2. A decoder for the command result. Later, when the server responds with the encoded command result, the client will be able to decode it.

In our simple case, most of the commands return Unit, so the result decoder is kinda useless for them. But when rejections come into play, we have to disambiguate success and rejection, so it has its valid use even for commands that result in Unit values.

So here you go — another place where it paid off to have the behavior in the form of a tagless final algebra. But we're not done yet; let's try to understand the server part.

Server wire protocol

On the server side of the wire protocol typeclass we have a single decoder, but not a trivial one.

First, it decodes the command into an Invocation. It makes sense — as we discussed, an invocation is a command object that can be executed on a behavior. The client has all the command data, but not the entity instance, so it encodes all the data into an invocation and sends it over the wire. The server has an entity instance, so it can now run the invocation.

Additionally, the server attaches a result encoder to each command invocation: after the entity responds, the response has to be encoded before being sent back to the client.

The invocation and the result encoder are connected using PairE. And this is where it becomes important that PairE is existential in the value type. Let's imagine it had a value type parameter instead. Then what type would we put there for the server?

Let's remember that we have a single server for the whole behavior algebra. It means that, depending on the command, the type of the value is going to vary: Unit for commands like place or confirm and something else for "reading" commands like status.

So hiding the value type of PairE as a type member allows the server to use a different underlying type for each command, while still having a compile-time proof that the invocation result on the left of the pair can be encoded with the encoder on the right.

Wire protocol review

Let's zoom out and see the whole command handling process in types. Say we're calling the status command from node A (the client), and the corresponding entity instance is running on node B (the server). Here's what happens:

  1. Both nodes have an instance W: WireProtocol[Booking].
  2. Node A calls val (commandBytes, responseDecoder) = W.client.status.
  3. It sends the command commandBytes: BitVector to the shard region and keeps the decoder for later.
  4. Node B receives the command BitVector and runs W.server.decode(bytes). If decoding is successful, it gets a PairE of an invocation and a result encoder.
  5. Node B obtains the proper instance of the booking entity and runs the invocation: val res: F[BookingStatus] = pair.first.run(entity).
  6. Then it encodes the result and sends it back to node A (by replying to the shard region): res.map(pair.second.encode).flatMap(sendToNodeA).
  7. Node A receives the result and uses responseDecoder to decode it back into BookingStatus. It is then handed back to the original caller.

Having our Booking entity in the form of a tagless final algebra helped twice here:

  1. We were able to completely reuse it on the client.
  2. It allowed Invocation to be defined in such a nice generic way while keeping all the types safe.

Wrap up

I hope this post demonstrated how much Aecor gains from using tagless final. It's also a great reminder of how much power this pattern gives. If you ever hear someone say that TF is just a modern fancy replacement for Java interfaces, show them this series and this post in particular.

This concludes my post series. It was a great journey for me, and I hope you liked it as well. And if it convinced someone to try Aecor on a pet project or even at work — I would be immensely happy to hear that. Please reach out to me on Twitter or in the comments to share your experience or ask questions.

Thanks for reading!
Peace. Love. Referential Transparency.

Aecor — Purely functional event sourcing in Scala. Part 5

Hello! This is a series of posts about Aecor — a library for building eventsourced applications in Scala in a purely functional way. If this is the first time you're seeing this series, I highly recommend starting with the Introduction.

In parts 1–4 we covered Aecor from A to Z. In this post we're going to look at the Process Manager pattern and how it can be implemented with Aecor.

What is a Process?

Now, that's an overloaded term we have here. We even discussed processes in the context of Distributed Processing in the previous post! This time I want to look at business processes.

So what's a business process? There are many legitimate ways to answer this question.

A generally known definition of a business process is a sequence of actions that leads to some meaningful, valuable result for the business. These actions might have to be performed by different people, machines, business entities or even different companies. They can also be distributed in time and location.

If we get back to our booking system, a business process of booking tickets would look like this:

  1. a client places a booking;
  2. the internal booking confirmation system either confirms or denies the booking;
  3. when the booking is confirmed, the client is allowed to pay for it;
  4. when payment is received, the tickets are considered purchased and the booking is settled;
  5. if payment is not received before the expiration deadline, the booking expires and the tickets are released.

It’s quite simplified: a real system would, for example, have to add several notifications along the way. But this process is good enough for the purposes of the post.

A business process can be really complex, but what’s important is that it can be represented as a sequence (or even a directed graph) of steps. Steps can be different in nature:

A process may involve external actions that might never happen. Receiving a payment from a client is a good example. In case it doesn't happen, the process is either stuck or should continue through another branch (send a reminder to pay, or expire the booking). Such boundaries are often good points to split your process into sub-processes.

Another kind of step is a reaction to a particular event. Such a step is often easy to identify — it's described using the following pattern:

When A happens, then B should be done.

By the way, in eventstorming such process steps are called policies. In our booking process above there are two steps of this kind:

  1. When a booking is placed, then the system has to confirm it.
  2. When the expiration deadline is reached and the booking is not yet paid, then the system has to expire the booking.

Such policies are the glue that wires up different independent entities and services into meaningful, continuous workflows. If you have tried IFTTT or similar services, you've seen a very good demonstration of why such glue is useful.

Processes in eventsourcing

Let's narrow down our scope to eventsourcing. What's cool about it is that it allows you to be really explicit about business processes. Reacting to events is essential to properly defining a process, and by employing eventsourcing we make it as natural and simple as it can be.

Let me show you, in my opinion, the best intuition of a process in eventsourcing.

If we put aside external systems, all we have in an eventsourced application are entities. Entities issue events in reaction to commands. A process step is a natural complement to that: it issues commands in reaction to events.

Here’s a possible example from our ticket booking domain. I’m using simplified eventstorming notation here:

Booking payment process wires up the Payment and Booking entities into a workflow that has business value

The booking payment process listens to payment events, and whenever a payment for a booking succeeds, it issues a command to the Booking entity to settle the corresponding booking.

So now we have a sufficient set of building blocks to develop workflows of arbitrary length and complexity in an eventsourced system. Nothing changes fundamentally if we add external systems to the mix: all the collaboration can be adapted to commands and events.

Process manager

What I just described is one interpretation of the Process Manager pattern. A process manager can have several input events, issue commands to several different entities and even track its own internal state to make decisions.

A process manager allows entities to be properly decoupled. Without the Booking payment process in the example above, the Booking entity would have to know about the PaymentSucceeded event and handle it on its own.

A process manager can also decouple services, although it's not as simple as with entity boundaries. If we have only payment and booking services, then the one running the process will be coupled to the other.

If this coupling is a problem, then extracting the process manager into a separate service can solve it. But it comes at an obvious cost of having another service in your system.

This trade-off at the service level is known as Orchestration vs. Choreography. It's a huge and fairly well-known topic, so I won't go deeper into it today. I'll just point out that the following Aecor-based example implementation is more on the choreography side of things.

If you want to learn more about the Process Manager pattern in general, I can recommend this talk.

Process managers with Aecor

Let's add a couple of process steps to our Aecor app. The booking confirmation process is going to be first.

If at any point you feel there's not enough context, please refer to the series' GitHub repo.

Booking confirmation process

After someone places a booking, our app has to reserve the corresponding tickets in the ticket management system. If the reservation is successful, the actual price of the tickets is returned as well.

The process step simply listens for the BookingPlaced event, then tries to reserve the tickets and sends either a confirm or a deny command to the Booking entity, depending on the result.

Let's define the step logic first. Essentially, a process step is just a function from a coproduct of events to F[Unit]. Most of the time you can be explicit about it. Here we have just a single event as an input:
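A sketch of the step (gateway and service names loosely follow the series repo; the logging dependency and exact types are assumptions):

```scala
import cats.Monad
import cats.implicits._

// Entity gateway from Part 3, approximated; BookingKey, Tickets,
// BookingRejection, BookingPlaced and TicketReservationService
// are assumed to match the series repo
trait Bookings[F[_]] {
  def confirm(key: BookingKey, tickets: Tickets): F[Either[BookingRejection, Unit]]
  def deny(key: BookingKey, reason: String): F[Either[BookingRejection, Unit]]
}

final class BookingConfirmationProcess[F[_]: Monad](
  bookings: Bookings[F],
  reservations: TicketReservationService[F],
  logError: String => F[Unit] // assumed logging dependency
) {
  // BookingPlaced => F[Unit], with the entity key alongside the event
  def apply(key: BookingKey, e: BookingPlaced): F[Unit] =
    reservations.reserve(key, e.concertId, e.seats).flatMap {
      case Right(tickets) => orLog(bookings.confirm(key, tickets))
      case Left(reason)   => orLog(bookings.deny(key, reason.toString))
    }

  // we don't expect rejections here, so we just log them
  private def orLog(fa: F[Either[BookingRejection, Unit]]): F[Unit] =
    fa.flatMap {
      case Left(rejection) => logError(s"Unexpected rejection: $rejection")
      case Right(())       => ().pure[F]
    }
}
```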

Let’s go through the process logic step by step:

  1. The BookingPlaced event is our input. We also require the entity key to be able to communicate with the same booking later down the process.
  2. First we try to reserve the selected seats for the specified concert. The booking key is used as a unique reservation identifier.
  3. The reservation service returns an Either, so depending on the result, we issue a deny or confirm command to the same booking instance. Note how we use the Booking entity gateway we developed in Part 3.
  4. We don't expect either command to be rejected, so we log an error if it happens.

Just for reference, the ticket reservation service contract looks like this:
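A hypothetical reconstruction of the contract's shape (the real one lives in the series repo):

```scala
trait TicketReservationService[F[_]] {
  // reserve seats for a concert, using the booking key as the
  // reservation identifier; on success, returns the tickets
  // together with their actual price
  def reserve(
    bookingId: BookingKey,
    concertId: ConcertId,
    seats: List[Seat]
  ): F[Either[ReservationFailure, Tickets]]

  // release a previously made reservation
  def release(bookingId: BookingKey): F[Unit]
}
```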

In real life it would be a separate service, but in our example we’ll just stub it out. You can look up the stub interpreter in the series repo.

So the process step is defined; let's now subscribe to events and bring it to life! It's usually a good practice to put all the process wiring into a separate class. This allows your constructor-based DI to be nicely distributed over a tree of small, readable files. Otherwise you can end up with a huge monstrous file where all the DI happens.

So to wire up our process step, we need a source of booking events. It's provided by the eventSource parameter, which is a function. A closer look at the signature of this function reveals that it's just an abstraction over the read-side journal's eventsByTag interface.

So for each tag we launch a stream of events, collect only the BookingPlaced ones and feed them into our process step handler. Once an event is processed, we commit the offset to the offset store.

In case you want your process to have at-most-once delivery semantics, you just flip process and commit: first commit the offset and then process the event.

Such semantics might be useful for something like email notifications.

Notice that we give our process a distinct consumerId, which has to be unique (at least within the offset store).

What we create in the end is a list of DistributedProcessing processes, where each process is a tag-based event stream running through our process handler. Some upper-level wiring file will just have to deploy these processes on the cluster, and we'll have a running process manager.
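Put together, the wiring described above could be sketched like this (fs2-based; Committable, EntityEvent, ConsumerId, EventTag and the fs2Process adapter are assumed to match the series repo):

```scala
import cats.Applicative
import cats.implicits._
import fs2.Stream

// one process per tag: stream events, handle BookingPlaced, commit offset
def processes[F[_]: Applicative](
  tags: List[EventTag],
  eventSource: (EventTag, ConsumerId) => Stream[F, Committable[F, EntityEvent[BookingKey, BookingEvent]]],
  step: (BookingKey, BookingPlaced) => F[Unit]
): List[DistributedProcessing.Process[F]] =
  tags.map { tag =>
    fs2Process(
      eventSource(tag, ConsumerId("booking-confirmation-process"))
        .evalMap { committable =>
          val handled = committable.value match {
            case EntityEvent(key, _, e: BookingPlaced) => step(key, e)
            case _                                     => Applicative[F].unit // other events are skipped
          }
          // at-least-once semantics: process first, then commit the offset
          handled *> committable.commit
        }
    )
  }
```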

Visit Part 4 if you’d like a refresher on Aecor distributed processing.

Booking expiration process

This one is going to be a little different. The expiration process periodically queries the booking view to see if there are bookings that should be expired. For each booking found, the process issues an expire command.

Conceptually it's the same event => command transformation, but the events are not explicit here: the process mines new facts by constantly analyzing the outside world.

As with the confirmation process, we start by defining the process step logic.

Separating process logic from infrastructure (like streaming, concurrency and offset management) has a nice benefit: such an isolated piece of logic is much easier to test — it's just a function.

If you find yourself testing streams or offset management, consider refactoring your process.

So this process is quite simple:

  1. It starts by receiving the current time parameter.
  2. The booking repository provides a stream of still-active bookings that should have expired by now.
  3. For each such booking, the process issues an expire command to the aggregate.

And that’s it. Here’s the BookingViewRepository contract for your reference. It’s an extended algebra of the booking view from Part 4. You can look up the implementation in the repo.

As you might guess, the only task of the wiring code is to periodically call the process. Fs2 has nice helpers for this:
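The periodic wiring can be sketched as follows (expireBookings stands in for the expiration step defined above; frequency is the configurable interval; cats-effect 2 and fs2 APIs):

```scala
import java.time.Instant
import scala.concurrent.duration._
import cats.Monad
import cats.effect.{ Clock, Timer }
import cats.implicits._
import fs2.Stream

// emit a tick every `frequency`, read the clock and invoke the step
def expirationStream[F[_]: Monad: Timer: Clock](
  frequency: FiniteDuration,
  expireBookings: Instant => F[Unit]
): Stream[F, Unit] =
  Stream
    .fixedDelay[F](frequency)
    .evalMap { _ =>
      Clock[F]
        .realTime(MILLISECONDS)
        .map(Instant.ofEpochMilli)
        .flatMap(expireBookings)
    }
```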

We use fixedDelay to get an infinite stream of elements with the specified time interval between them. The process invocation interval is configurable via the frequency parameter. And to get the current time we use Clock[F] from cats.

The higher-level wiring looks too similar to show again: just wire it up and deploy it to distributed processing.

Why not use a scheduler?

An alternative approach to implementing this process would involve scheduling expiration actions instead of constantly polling the view. It's a viable alternative, but it's definitely more complex: running a consistent scheduler in a distributed environment is a hell of a task.

There can be several reasons to use a scheduler. One is when you're running at volumes or frequencies that make polling impractical. Another is when there's no simple or reliable way to derive a particular time-based fact at the process level.

By the way, a heavy load on the view is not that strong a reason — you can always build another view just to support your periodical process.

So, if there's a sane way to avoid a scheduler, I'd usually take it. Aecor has a deprecated schedule module that provides a distributed entity-based scheduler. Today its use is discouraged, not only because of complexity, but for design reasons as well.

post.commit

In his great talk "A Decade of DDD, CQRS, Event Sourcing", Greg Young names the lack of Process Managers one of the biggest problems in event-driven systems these days.

There was a time in my career when I overlooked them. Having had that experience, today I completely agree with Greg on this point. I think it's really important to talk about processes explicitly, both in conversations and in code.

Hope this post was a useful one. That’s it for today, thanks for reading!

Aecor — Purely functional event sourcing in Scala. Part 4b

Hello! This is a series of posts about Aecor — a library for building eventsourced applications in Scala in a purely functional way. If this is the first time you're seeing this series, I highly recommend starting with the Introduction.

We've built a streamed view for the Booking entity and discussed projections a little bit in Part 4a. One piece is missing though: we haven't made it work in a cluster yet.

If you haven’t read Part 4a, I encourage you to start there, since this post depends heavily on things covered there.

Today we'll look at Aecor's Distributed Processing, which allows running any kind of computation (including stateful ones, like projections) on top of akka-cluster.

After the demo example is complete, we'll discuss some non-trivial practical topics around building projections in an eventsourced system.

Why bother with a cluster?

For some it might not be clear straight away, so let's look at the partitioned tagging picture from the previous post:

Which node should run each stream?
If a node dies, will another node pick it up?

So we have N streams to run in our cluster, where N is 1 or bigger.
First, we definitely don't want duplicates running at the same time. Having two concurrent streams for, say, tag "Booking-2" within the same view sounds like a nightmare: you'll get something, but definitely not what you want.

Improperly handled concurrency can be fun. But not in production.

So we need exactly one stream per tag. We could launch them all on a single node, but what if that node experiences an outage? The view quickly becomes outdated, which doesn't sound fault-tolerant at all.

It's actually the same consensus problem we faced before. The cluster has to agree on which streams each node runs. This distributed state requires consensus.

As we already learned, akka-cluster offers a solution. With a small number of streams to run, you can get away with a simple cluster singleton. It's the same "all streams on one node", but with failover.

When shit gets real, you need to distribute your streams over all the nodes, and this is a task for akka-sharding. Aecor's Distributed Processing wraps this solution in a generic, purely functional interface.

Distributed Processing

Let’s see how we can deploy our projection on a cluster:
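As a sketch, the deployment call looks roughly like this (helper names and the return type are approximations; fs2Process is the stream adapter discussed below):

```scala
// one stream per tag, adapted into processes and deployed under a unique name
val processes: List[DistributedProcessing.Process[F]] =
  tags.map(tag => fs2Process(bookingViewProjectionStream(tag)))

val deployment: F[DistributedProcessing.KillSwitch[F]] =
  distributedProcessing.start("BookingViewProjection", processes)
```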

Yep, it's that simple. We give a common name to our bunch of processes and just call distributedProcessing.start. The name is not completely arbitrary: you can have multiple deployments, and each has to be uniquely named — it's required to properly set up the underlying sharding.

Watchful readers will ask: "What is fs2Process?" It's just some plumbing to make raw fs2 streams work with the more generic distributed processing. Let's go over it step by step.

A unit of distributed processing deployment is called a Process. It's some kind of long-lasting computation that is expected to be restarted in case of failure.

So a Process here is nothing more than a recipe for launching a particular computation. An instance of a running computation, called RunningProcess, essentially represents a Fiber — a primitive that is heavily used in the effect systems of both cats-effect and ZIO.

For those unfamiliar with fibers, RunningProcess provides a shutdown hook and a way to watchTermination — to subscribe to the fact that the computation completed or failed.
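The two shapes described above can be sketched as (a simplification of the real definitions):

```scala
// a recipe for (re)starting a long-running computation
final case class Process[F[_]](run: F[RunningProcess[F]])

// a fiber-like handle to a running computation
final case class RunningProcess[F[_]](
  watchTermination: F[Unit], // completes when the computation ends or fails
  shutdown: F[Unit]          // asks the computation to stop
)
```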

Now we have everything we need to describe how a distributed processing deployment works in plain words. When you hand it a process, Aecor uses process.run to launch the computation. It then watches for process termination and, if it happens, uses process.run to restart the computation. The shutdown hook is used if the process deployment killswitch is triggered externally.

You can see that, although projection streams are a perfect fit for a processing deployment, the concept of a Process is much broader. You can distribute almost any kind of computation with it, which is nice.

But currently we'd be happy with just a bunch of streams for a single projection. To connect all the dots, we need to transform an fs2.Stream into a DistributedProcessing.Process. Cats-effect makes it rather simple:
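A sketch of that transformation, assuming the Process/RunningProcess shapes described above (cats-effect 2 and fs2 APIs):

```scala
import cats.effect.Concurrent
import cats.implicits._
import fs2.Stream
import fs2.concurrent.SignallingRef

def fs2Process[F[_]: Concurrent](stream: Stream[F, Unit]): Process[F] =
  Process(
    for {
      // signal used to interrupt the stream from the outside
      stopSignal <- SignallingRef[F, Boolean](false)
      // launch the stream inside a fiber
      fiber      <- stream.interruptWhen(stopSignal).compile.drain.start
    } yield RunningProcess(
      watchTermination = fiber.join,         // completes when the stream ends
      shutdown         = stopSignal.set(true) // triggers the interruption
    )
  )
```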

We launch the stream inside a fiber, using a signal to be able to terminate the stream externally. The fiber's join can be used to watch for stream termination.

And that’s it. You just got yourself a partitioned distributed projection.

Things to know

We're done with the code, so let's now discuss some related questions I consider important. I hope this section will save someone from a maldesigned system or an unexpected production issue.

View as a sanity check for your entity

Designing entities and aggregate boundaries is by no means easy. There are many tools you can employ to validate and improve your design — it would take a separate post to cover them.

I just want to mention one, related to the Part 4 topic: views. It wasn't mentioned explicitly in the post, but the deduplication mechanism relied on an atomic update of both the view data and the version. In our case we got it for free, by virtue of having them sit in the same database row.

If, for some reason, we had to update several rows in reaction to a single event, that would become trickier. In particular, we'd have to use some kind of database transaction to pull that off. This hurts scalability and performance.

I consider it a very strong smell if a view for a single entity doesn't fit into one database row. In other words:

If a view of your entity doesn't fit into a single database row (or it does, but it feels too wrong), there are really two or more entities behind it.

I made this mistake once and it caused a lot of trouble down the road.

Know your journal well

Although eventsourcing is all about eventual consistency, the underlying machinery actually puts much stronger requirements on the storage technology. One particular thing, related to projections, is the order of events.

As we discussed in the previous post, most projections rely on events being causally and temporally ordered (within a single entity). This means that if entity A writes events E1 and E2, the read side should never see E2 before E1. This happens to be quite challenging to guarantee in real life.

For example, let's take PostgreSQL as a journal. Using an auto-incremented serial column as an offset is a simple and powerful solution — you get a strict journal-wide order for free. But does it uphold the requirement above?

It turns out that when doing concurrent writes with default settings, you can read row N+1 before row N, where N and N+1 are values of the offset column. Here you go — the projection missed an event.

The problem can be addressed in several ways, but without knowing such quirks you can end up in a rush, looking for a solution while your production is down.

The Aecor postgres journal we used in this series currently supports only the simplest write strategy, which is to serialize the writes by locking the journal table.

While it's not the most efficient approach, its cost is heavily amortized by the fact that you can (and probably should) have a separate journal table for each entity.

Be prepared to replay views

It's well known that eventsourcing systems are especially hard to operate. Whereas any kind of local data corruption can easily be fixed in a CRUD-based system by just updating the data in place, it's much more complex with eventsourcing.

Even if you can update an event in the journal (which can be tricky on its own, if you use a binary storage format like protobuf), your projections will have already processed the old corrupted event, and the fix won't propagate by itself. To fix your views, you'd need to replay the projections from some earlier offset.

One of our projections, complaining about replays.

If a projection is not idempotent, then you can't just replay it from an earlier offset — some events would end up processed twice. You're up against a real challenge here, and most probably the projection will have to be replayed from scratch.

Idempotency is a very handy property, so don't miss the chance if you can get it.

There are a handful of other reasons for a partial or complete projection replay:

  • a bug in the projection fold;
  • missed events (hardware glitch or journal bug);
  • a need to migrate view(s) to another database system.
The Swiss army knife of an eventsourced system developer.

This is inevitable, so you'd better be prepared. Actual scenarios differ depending on your setup and the SLAs your service has to obey. Make sure your team knows what to do when an emergency replay is required.

Congratulations!

One month, six posts, and we have finally covered the bread and butter of eventsourcing with Aecor. I hope you liked the journey. Together we learned how to:

  • define an eventsourced behavior;
  • deploy it into the cluster;
  • build a view for an eventsourced entity;
  • distribute the view projection over the cluster.

It's enough to go and build a working app! But there's still one valuable pattern I haven't discussed, which is Process Managers.

You already know everything you need to develop a process manager with Aecor, so in Part 5 we'll quickly go over an example and discuss some theory.

See you there!

Aecor — Purely functional event sourcing in Scala. Part 4a

Hello! This is a series of posts about Aecor — a library for building eventsourced applications in Scala in a purely functional way. If this is the first time you're seeing this series, I highly recommend starting with the Introduction.

In the previous post we finally deployed our eventsourced behavior on a cluster. Now we can issue booking commands, but there's still nothing that our end users can query.

So today we’re going to build an eventsourced view with the help of Aecor and discuss CQRS in the context of eventsourcing.

I decided to split this post into two pieces; otherwise it would have become too huge. In Part 4b we'll look at real-world problems you face when running projections, as well as best practices and experience collected over the years at Evotor.

Projections

I hope you’re familiar with the value proposition of CQRS. Before we start to write code, let’s understand why CQRS is especially valuable in eventsourcing and take a closer look at underlying reasoning and mechanics.

In eventsourcing your write side is the event log. Since it’s an append-only log, it can potentially provide extremely high throughput for incoming events. On the other hand, it would be awkward and inconvenient to query and fold events from the log every time you need the current entity state. So, obviously, the journal is an awfully poor fit for the read-side backend.

Thanks to the single writer we discussed in Part 2, events are strictly ordered (at least within each particular entity). It’s common sense: unless you’re dealing with something special (like CRDTs), you can’t build properly working behaviors without events being ordered.

So we can view the log as a bunch of frozen event streams (one per entity). Given that events are constantly appended to the log, these streams are effectively infinite.

Now we come to the notion of projection, which is nothing more than an arbitrary fold of an entity event stream. An example of such fold is a view: a representation of the entity state which is optimized for querying. This view is going to be the read-side of our CQRS setup.

We shouldn’t limit projections to views though. When I say a projection is an arbitrary fold, that’s exactly what I mean. Our log is the source of truth, which stores the facts that happened in our system. Using projections, you can derive secondary facts in any format you want. For example, you can project your events into an ElasticSearch index, while still building a regular view in parallel.

Duality of tables and streams. Image from confluent.io.

Most of the time projections fold into a database table of some sort. This is no coincidence: streams and tables are closely related. This duality is just too cool not to be mentioned, so I couldn’t miss the opportunity.

I personally love the idea of using events as the source of truth and using streaming to derive any secondary information in any format you want. Confluent, the maintainers of Kafka, have done an amazing job popularizing and explaining this paradigm to the community. I’ll just quickly pay them some respect by recommending their free book, which I learned a lot from.

Log => Stream

To fold an event stream, we first have to run it somehow. Usually it’s the journal module that has to provide streaming capabilities for building projections.

Aecor follows this approach. In Part 3 we created a journal for booking events using aecor-postgres-journal:
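As a reminder, the setup looked roughly like this (a sketch with illustrative names; `transactor` and `BookingEventSerializer` are assumptions, check the repo for exact signatures):

```scala
// A sketch of the Part 3 setup: a Postgres-backed event journal
// for booking events, built with aecor-postgres-journal.
val journal =
  PostgresEventJournal(
    transactor,                  // doobie Transactor[F] for the journal database
    tableName = "booking_event",
    tagging,                     // Tagging[BookingKey], discussed below
    BookingEventSerializer       // codec between events and their stored representation
  )
```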

To get the CQRS capability package, just call the queries extension on the journal instance, like this:
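A sketch, assuming the `journal` value from Part 3 (the parameter name is illustrative):

```scala
import scala.concurrent.duration._

// Turn the write-side journal into a read-side one.
// pollingInterval controls how often the journal is polled for new events.
val bookingQueries =
  journal.queries(pollingInterval = 1.second)
```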

In return you get a read-side journal API with just a couple of methods:
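The API has roughly the following shape (a sketch of the signatures, not the exact Aecor source):

```scala
// Read-side journal API, roughly. Both streams emit events with a given tag,
// starting from a given offset, paired with the offset of each event.
trait JournalQueries[F[_], K, E] {
  // Live stream: keeps polling for new events after reaching the log head.
  def eventsByTag(tag: EventTag, offset: Offset): fs2.Stream[F, (Offset, EntityEvent[K, E])]

  // Finite stream: completes once all currently known events are emitted.
  def currentEventsByTag(tag: EventTag, offset: Offset): fs2.Stream[F, (Offset, EntityEvent[K, E])]
}
```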

If you’ve used akka-persistence before, these should look familiar. Indeed, just like in akka-persistence, the only difference between the two streams is that eventsByTag stays live and polls for new events, while currentEventsByTag halts after all currently known events are processed. Since projections usually have to be real-time, eventsByTag is used in most cases.

FS2 is the streaming library of choice here, which should be no surprise. The eventsByTag stream is constructed from the following parameters:

  • pollingInterval, which we specified when constructing the read journal. It controls how often the journal should look for new events matching the stream criteria;
  • offset, which is just a Long under the hood. It allows the stream to start from an arbitrary position in the log;
  • and tag, which serves stream partitioning purposes.

Let’s talk about tagging, because it’s really interesting.

Tagging

Tagging is simply marking events with an additional tag (or tags). It gives you an alternative slicing of your log. One very common approach is to tag all events of a particular entity type with a common tag. For example, mark all BookingEvents with a Booking tag.

But why? We don’t need it on the write side: there we just process each entity separately. Why do we now need to group entities using tags?

Well, we have to. Processing each entity separately on the read side is not practical. Here’s why.

On the write side, we know exactly when a command happens, so we can wake up the corresponding entity instance, process the command, store the event, and then shut the instance down until further commands arrive, effectively releasing resources. It’s highly unlikely that all the entities get commands at the same time, so this is practical.

To stream events on the read side, though, we have to constantly poll the journal for new events, because the read side has no clue when the next event is going to happen. It’s clear that polling the journal separately for each entity instance would just drown the database in requests, and nothing would work.

This is where tagging comes to the rescue. You slice the log into bigger chunks in a way that doesn’t disrupt the event order of any particular entity. After that you can launch a polling stream per chunk without crushing the whole system.

Single tag per entity

There’s a simple, natural way to tag events such that for each particular entity all of its events go into a single chunk. It’s just the tag-’em-all approach I mentioned earlier: “mark all BookingEvents with a Booking tag”.

In this case, for each projection of booking events there will be a single stream running the fold. To set up such a simple tagging configuration, use Tagging.const from Aecor. With bookings we would do it this way:
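A sketch of the single-tag setup (`Offset.zero` and the surrounding names are assumptions for illustration):

```scala
// Single-tag config: every booking event gets the "Booking" tag.
val tagging: Tagging[BookingKey] =
  Tagging.const[BookingKey](EventTag("Booking"))

// On the read side, a single stream covers the whole entity type:
val events =
  bookingQueries.eventsByTag(EventTag("Booking"), Offset.zero)
```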

We create a tagging config using a single tag and pass it to the journal. It’s required on the write side as well, because the journal has to know how to tag events when it stores them.

Then we just use the tag to launch the read-side stream. Nice and easy.

Single tag partitioning. Simple, but throughput is limited.
Still valuable in cases where multiple entity types share the same journal table.

But the throughput of a single stream is limited. What if it’s not enough? In that case, partitioned tagging should be chosen.

Partitioned tagging

The idea is to partition the single-tag chunk further while keeping each entity within a single sub-partition. And since entities are identified by a unique key, this key is a perfect candidate to partition the log by.

It’s done conceptually the same way Kafka partitions messages by key: the partition is determined by taking a hash of the key modulo the number of partitions.
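Conceptually the assignment looks like this (a self-contained illustration of the idea, not Aecor’s actual code):

```scala
// Key-based partitioning, the same idea Kafka uses:
// hash the key, take it modulo the number of partitions.
def partitionFor(key: String, partitionCount: Int): Int =
  math.abs(key.hashCode % partitionCount)

// Every event of a given entity always lands in the same partition,
// so per-entity event order is preserved.
def tagFor(key: String, partitionCount: Int): String =
  s"Booking-${partitionFor(key, partitionCount)}"
```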

Partitioned tagging at work. A more scalable solution.
Like the single-tag approach, it allows multiple entity types to share the same journal.

Tagging.partitioned does exactly that:
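With bookings it could look like this (a sketch; the exact signature may differ):

```scala
// 10 partitions derived from the base "Booking" tag: events get tagged
// Booking-0 .. Booking-9 based on a hash of the entity key.
val tagging: Tagging.Partitioned[BookingKey] =
  Tagging.partitioned(10)(EventTag("Booking"))
```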

In this example we get 10 partitions, each marked with a distinct tag: Booking-0, Booking-1, …, Booking-9. To project all partitions we’ll need to launch 10 streams (one per partition):
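A sketch of launching them (`tagging.tags`, `Offset.zero` and `processEvents` are assumed names; `processEvents` is a hypothetical pipe standing in for the projection logic):

```scala
// One polling stream per partition tag.
val projectionStreams: List[fs2.Stream[F, Unit]] =
  tagging.tags.map { tag =>
    bookingQueries
      .eventsByTag(tag, Offset.zero)
      .through(processEvents) // hypothetical projection pipe
  }
```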

For now, we won’t push further and will stay happy with just launching the streams somehow. To make this work properly in reality, we’d need them to be distributed over the cluster, but that’s a topic for a future post.

So let’s stop here and talk about offsets instead.

Consumers and offsets

Let’s zoom back out for a moment and look at the whole projection again. Say we’re projecting a view where each event updates something in the view table. We don’t want to process any event more than once, so the projection needs to track its progress somehow.

The offset is used for that purpose. In contrast to sequenceNumber, which defines order within a single entity instance, the offset is a global (usually time-based) ordering within the journal. Since no tagging scheme can escape the journal, we can safely say that for any tag, the offset will still provide a total order within that tag.

SequenceNumber is a strictly serial order within a specific entity instance.
Offsets provide global order within the whole journal. It’s ok to have gaps in the offset history.

So it’s enough for a projection to store the offset of the last processed event. After the next event is processed, the new offset is committed. When the projection is restarted, it can use the stored offset to continue from where it left off.

Again, resemblance to Kafka is very high here:

  • Projection <=> Kafka consumer group
  • Projection stream (for single tag) <=> Kafka consumer
  • Tag <=> Kafka partition
  • Offset <=> Kafka consumer offset

This analogy helped me immensely when I was trying to understand how Aecor works. Hope it helps you as well.

Kafka as a journal?

Since it’s all that similar to Kafka… why not use it as a journal then?

It’s a good question, and the short answer is “it’s tricky”. There’s some discussion on the topic going on in Aecor’s issue tracker.

There’s also a project by my former colleagues from Evolution Gaming: a combined journal technology that uses both Kafka and Cassandra.

Committable

Let’s get back to projections mechanics. So far we have:

  • a partitioned stream of events (each event has an offset attached);
  • some storage for projection offsets, let’s call it an offset store;
  • projection folding logic itself, e.g. populating a view table.

There’s a small design issue though. It seems that every projection has to know how to commit the latest offset to the offset store. This is required because only the projection knows when an event can be considered processed.

On the other hand, it seems like too much for a projection to know about offset stores. It would also require a lot of repetition to interact with the offset store in every projection. Conceptually, a projection just needs a way to tell the journal that it’s done processing a particular event.

The Aecor journal provides you with exactly such an abstraction. For this to work, you need to define an offset store and supply it to the journal first:
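A sketch of what that could look like; `PostgresOffsetStore` and `withOffsetStore` are assumed names, not necessarily the real Aecor API:

```scala
import scala.concurrent.duration._

// An offset store is just a key-value store from (tag, consumer) to offset.
val offsetStore: KeyValueStore[F, TagConsumer, Offset] =
  PostgresOffsetStore(transactor) // hypothetical constructor

// Attach it to the read-side journal (method name assumed):
val bookingQueries =
  journal.queries(pollingInterval = 1.second).withOffsetStore(offsetStore)
```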

As you can see, a KeyValueStore[F, TagConsumer, Offset] has enough power to be an offset store. TagConsumer here is just a tag paired with a consumerId.

This upgrades bookingQueries to produce streams that allow the consumer to commit offsets. Here’s how eventsByTag looks now:
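Roughly like this (a sketch of the shape, not the exact signature): instead of a raw offset the consumer passes its consumerId, and the journal resolves the starting offset from the offset store.

```scala
// Each element now carries a commit hook alongside the event.
def eventsByTag(
  tag: EventTag,
  consumerId: ConsumerId
): fs2.Stream[F, Committable[F, (Offset, EntityEvent[BookingKey, BookingEvent])]]
```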

Now, every event is wrapped into Committable, which is a simple container that attaches a commit hook to the payload:
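It looks roughly like this:

```scala
// A payload plus an action that marks it as processed.
final case class Committable[F[_], A](commit: F[Unit], value: A)
```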

For a journal with an offset store attached, the commit action commits the corresponding offset when executed. The projection now doesn’t even know any offsets are involved: it can just rely on commit to save its progress, whatever that takes. It’s quite beautiful, and we’ll see it in action soon.

View

To make the demonstration complete, let’s build a view projection for our booking entity. In this case we need a very simple view that will allow users to see their own bookings.

Let’s start with a storage (or repository) algebra:
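A minimal sketch (the names BookingView, BookingKey and ClientId are illustrative):

```scala
trait BookingViewRepository[F[_]] {
  // Used by the projection to run the fold.
  def get(bookingId: BookingKey): F[Option[BookingView]]
  def set(bookingId: BookingKey, view: BookingView): F[Unit]
  // Serves read requests: all bookings of a given client.
  def byClient(clientId: ClientId): F[List[BookingView]]
}
```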

get and set will be used by the projection to construct the view, while get and byClient will serve data to clients.

We can write an interpreter for this repository using doobie and a PostgreSQL table. It’s mostly boilerplate, so I won’t put it here; you can look it up on GitHub.

Now let’s define the projection logic: how it’s going to fold events into the view. It’s mostly straightforward, but there are a couple of tricks, so let’s not skip it entirely.

So the projection can:

  • get the current state of the view that corresponds to the currently processed event. This allows it to properly run the fold and also to deduplicate events;
  • run the fold: produce a new version of the view according to the event that happened;
  • and update the view.
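The three steps above can be sketched as follows (a hedged sketch: `BookingView.version`, `entityKey` and the fold function are illustrative names, not the actual repo code):

```scala
import cats.Monad
import cats.syntax.all._

class BookingViewProjection[F[_]: Monad](repo: BookingViewRepository[F]) {

  def handle(event: EntityEvent[BookingKey, BookingEvent]): F[Unit] =
    repo.get(event.entityKey).flatMap { current =>
      val version = current.fold(0L)(_.version)
      if (event.sequenceNr <= version)
        ().pure[F] // already processed before: deduplicate
      else
        fold(current, event.payload) match {
          case Some(updated) =>
            repo.set(event.entityKey, updated.copy(version = event.sequenceNr))
          case None =>
            ().pure[F] // the event doesn't affect this view
        }
    }

  // The actual fold: how each event changes the view. Omitted here.
  private def fold(view: Option[BookingView], event: BookingEvent): Option[BookingView] = ???
}
```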

The view version is nothing more than a counter of processed events. If an event comes in and event.sequenceNr <= version, then this event has already been processed and the projection can skip it.

This deduplication is required because the offset commit is asynchronous, and there’s a possibility for the projection to fail after an event was processed but before the offset was committed.

Of course, if your projection is idempotent, there’s no need for deduplication. We’re looking at the more general case.

The implementation is rather simple, and I won’t put it here either; curious readers can find it on GitHub.

The whole setup in action, demonstrating
why deduplication is required for a non-idempotent projection.
(click for a larger pic)

So we have another component for our read-side setup. Let’s now plug it into the event stream.

Projection Flow

Let’s now look at how events flow through the projection. We’ll design this flow as a simple fs2.Sink. The implementation is a little verbose, but rather simple:
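A sketch of the sink, under the same assumed names as in the earlier sketches:

```scala
import cats.Monad
import cats.syntax.all._

// Fold each event into the view, then commit its offset.
def projectionSink[F[_]: Monad](
  projection: BookingViewProjection[F]
): fs2.Sink[F, Committable[F, EntityEvent[BookingKey, BookingEvent]]] =
  _.evalMap { committable =>
    // Run the fold "inside" the Committable and pull the effect out:
    // Committable[F, Event] => F[Committable[F, Unit]]
    committable.traverse(projection.handle)
  }.evalMap(_.commit) // only commit after the fold has succeeded
```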

Let’s start from the returned value. It has type fs2.Sink[F, Committable[F, Event]], which is an alias for fs2.Stream[F, Committable[F, Event]] => fs2.Stream[F, Unit]. So basically we do something with every event in the stream and throw away the results.

Every event goes through two stages:

  1. Execute the fold, which in turn has following steps:
    1. Fetch current version and state of the view.
    2. Ignore events that were already processed (by comparing seqNr and version).
    3. Fold the event, and if it affected the view then save the new state to the repo.
  2. Commit the offset.

Everything works as we previously discussed. An interesting detail is the traverse in the first evalMap. It’s called on the Committable envelope and works the same way as it does for Option or List. We just run the projection “inside” the committable wrapper and then pull the effect out, so that in the end we get an F[Committable[F, A]] instead of a Committable[F, F[A]].

Like many other eventsourcing concepts, projections map really nicely onto the real world.
A game score is a projection of match events.
An account balance is a projection of all successful transactions.
Code is a projection of all the keystrokes team members have made. And so on.

Now we have all the LEGO bricks we need to launch our view. Let’s just compose them together.

Wiring the view

Let’s wire everything together:
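A sketch under the assumed names from the earlier sections (`tagging.tags`, `projectionSink` and `projection` are illustrative):

```scala
// One projection stream per tag partition, all sharing a consumer id
// that uniquely identifies this projection.
val consumerId = ConsumerId("BookingViewProjection")

val bookingViewStreams: List[fs2.Stream[F, Unit]] =
  tagging.tags.map { tag =>
    bookingQueries
      .eventsByTag(tag, consumerId)
      .through(projectionSink(projection))
  }
```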

Translating into English: for each tag partition (“Booking-0”, “Booking-1”, etc.) we launch an event stream with a fixed consumer id that uniquely identifies this projection.

Each stream is then directed into our projection sink, which updates the view through the repo on every event and commits the offset to the offset store.

What’s left is to actually launch these streams at the “end of the world” and distribute them over the cluster. As I said, we’ll talk about this in a future post. If you’re eager to check it out now, the GitHub repo is always there waiting.


So we got ourselves a really nice view!

A lot is hidden from sight though. We’ll discuss the underlying assumptions, design choices, various problems, and best practices related to projections in Part 4b. And, as I promised, we’ll distribute the projection streams over the cluster.

Hope you had a great time reading and see you in the next post! 🙂