Aecor — Purely functional event sourcing in Scala. Part 3

Hello! This is a series of posts about Aecor, a library for building eventsourced applications in Scala in a purely functional way. If this is the first time you see this series, I highly recommend starting with the Introduction.

In previous posts we learned everything about eventsourced behaviors, constructed and refined one for the Booking entity. Now it’s finally time to see it in action. In this post we’ll:

  • see how you can launch your behavior with Aecor;
  • get a quick overview of how it works on the surface;
  • discuss some theoretical topics related to running eventsourced entities in a distributed system.

Overview

Let’s start with a high-level picture.

Aecor has a notion of a Runtime, which is just an infrastructure platform, where behaviors can be deployed. For example, you would need to store the events somewhere — this is one of the responsibilities of an Aecor runtime.

As you have seen in previous posts, Aecor behaviors are completely unaware of any infrastructure concerns. This is pretty much in line with the famous onion architecture: behaviors belong to the domain layer and the runtime is out there in the infrastructure layer.

I think it’s pretty valuable, because you don’t always have such clean separation of layers. If you ever tried to do eventsourcing with naked akka-persistence, then you know what I mean.

When you deploy an entity onto a runtime, it looks like this:

This diagram should help you visualize things as we go through the post. 

At the time of writing, Aecor provides an akka-cluster based runtime, which comes in two flavors:

  • Akka Persistence Runtime, which is an older one and uses akka-persistence under the hood.
  • Generic Akka Runtime, which is more flexible and depends on akka-cluster only.

While it can be easier to start with the Akka Persistence runtime (especially if you have done akka-persistence before), I would recommend looking at the Generic runtime first. It has some nice advantages that we’ll talk about later.

Now let’s break the runtimes down into essential components. There are three of them: the journal, consensus and wire protocol.

The Log

Eventsourcing can’t exist without a log, so the first thing you’ll need is the place to store events — the journal.

Event logs existed long before computers were invented

There are a lot of options for the underlying storage technology: it can be a specialized solution (e.g. EventStore), or you can pick a general-purpose database like PostgreSQL or Cassandra. The choice depends on the performance and operational requirements of your system. What’s important is that, semantically, the journal has to behave as an append-only log.
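To make the append-only requirement concrete, here is a minimal in-memory sketch. It is not Aecor’s EventJournal interface: the InMemoryJournal class, its methods and the event names in the usage note are all made up for illustration.

```scala
// Hypothetical in-memory journal; names are illustrative, not Aecor's API.
object JournalSketch {
  final class InMemoryJournal[K, E] {
    private var store: Map[K, Vector[E]] = Map.empty

    // Appending is the only write operation: no updates, no deletes.
    def append(key: K, events: Vector[E]): Unit = synchronized {
      store = store.updated(key, store.getOrElse(key, Vector.empty) ++ events)
    }

    // Reads return the events of one entity in the order they were appended.
    def read(key: K): Vector[E] = synchronized {
      store.getOrElse(key, Vector.empty)
    }
  }
}
```

Appending "BookingPlaced" and then "BookingConfirmed" under key "42" and reading that key back returns both events in that order. A real journal additionally has to survive restarts, which is where the storage choice above comes in.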

The journal will need some kind of serialization protocol for your events. It’s a very important topic in eventsourcing: events are immutable, so schema evolution has to be taken care of. And with akka, serialization is especially easy to get wrong.

Comprehensive guide to Akka serialization

So, storage and serialization. If you go with the akka persistence runtime, there’s a variety of plugins for different storage backends. You won’t need anything special to make it work with Aecor: just configure the plugin in line with its documentation. Aecor will pick it all up the same way a raw akka-persistence app would.

On the other hand, generic akka runtime doesn’t know anything about storage or eventsourcing. It’s actually orthogonal: you can partition any kind of tagless final behavior over an akka-cluster using this runtime. 

To deploy eventsourced behaviors on a generic runtime you’ll still need a journal, and Aecor has something to offer here. We’ll see later how you can wire it all up.

Single Writer

I go into some theory here, so feel free to skip this section if you’re familiar with this concept.

The next thing we’d like to have from our runtime is a single writer guarantee. In simple words, it means that at any point in time there is at most one instance of each specific entity in the whole cluster that can process commands and write events to the log. There should be no concurrent command processing and no concurrent writes to the log for any single entity instance.

For example, if we issue (possibly in parallel) some commands for the booking with id “42”, such a runtime guarantees that all of them will be handled by a single node, as long as it’s alive. Also, the “booking 42” command handler on this node has to process commands strictly one by one, with no concurrency involved.

It’s a very strong guarantee, which is called serializability of changes (or serializable isolation in the database world). It’s extremely hard to provide globally, but here it’s very local and thus reasonable: we require serializability only within a single entity instance. Such a small area of consistency is often called a consistency boundary.

This is why it is so important to think about partitioning when you design your entities. If you want your system to scale, what should grow is the number of consistency boundaries, not their size.

To get a single writer in a distributed system you need consensus: machines in the cluster have to agree on who is going to be the writer for each entity instance. And this is where akka-cluster comes into the picture. It uses a gossip-based consensus protocol to track node membership, which in turn makes it possible to distribute (or shard) entity processing over the cluster.
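The sharding idea can be sketched in a few lines. This is a deliberately simplified illustration, not how akka-cluster allocates shards internally: the function names, the fixed number of shards and the shard-to-node map are all assumptions.

```scala
// Simplified illustration of key -> shard -> node routing (hypothetical names).
object ShardingSketch {
  // Map an entity key to one of a fixed number of shards.
  // floorMod keeps the result non-negative even for negative hash codes.
  def shardOf(entityKey: String, numberOfShards: Int): Int =
    Math.floorMod(entityKey.hashCode, numberOfShards)

  // Given the shard -> node allocation the cluster has agreed on,
  // find the single node that owns this entity.
  def ownerOf(
      entityKey: String,
      numberOfShards: Int,
      allocation: Map[Int, String]
  ): Option[String] =
    allocation.get(shardOf(entityKey, numberOfShards))
}
```

As long as every node sees the same allocation, every node resolves booking “42” to the same owner, which is what makes a single writer possible.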

A lot of new research is going on around consensus protocols these days, thanks to crypto-currency and blockchain frenzy.

After selecting a single node to run the handler, the “no concurrency” property is further ensured by wrapping the handler into a single actor. Actor mailbox guarantees that messages are processed one by one.

Just to clarify, actors are not the only way to do this — you can use an in-memory message queue to do the same thing in a more functional and typesafe manner. And Aecor uses akka not because of actors: it’s the Scala-native consensus which makes akka-cluster invaluable here.
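Here is a minimal sketch of the “one by one” property without actors. It is not Aecor code: EntityMailbox is a made-up name, and a single-threaded executor stands in for the mailbox or in-memory queue.

```scala
import java.util.concurrent.{Callable, Executors, Future, TimeUnit}

object SingleWriterSketch {
  // One single-threaded executor plays the role of the mailbox
  // for one entity instance (hypothetical, not Aecor's implementation).
  final class EntityMailbox {
    private val executor = Executors.newSingleThreadExecutor()
    // Entity state: only ever touched from the executor thread.
    private var counter = 0

    // Commands can be submitted from any thread;
    // the executor's queue runs them strictly one at a time.
    def increment(): Future[Int] =
      executor.submit(new Callable[Int] {
        def call(): Int = { counter += 1; counter }
      })

    def shutdown(): Unit = {
      executor.shutdown()
      executor.awaitTermination(5, TimeUnit.SECONDS)
      ()
    }
  }
}
```

Because the state is never read or written outside the queue, no locks are needed and no update is lost, no matter how many threads submit commands.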


Let’s wrap this section up. There’s a single important reason for Aecor to use akka-cluster under the hood: it lets you retain strong consistency for every entity instance even in a distributed deployment.

Both production-ready runtimes of Aecor rely heavily on akka-cluster. But luckily, except for cluster configuration, it’s completely opaque. Aecor wraps it up nicely in a purely functional, typesafe API.

Wire protocol

The last requirement for a distributed eventsourcing runtime is a messaging protocol. The following scenario illustrates why it’s needed.

Let’s say node A receives a command for booking “42”. The latest cluster gossip says that the booking “42” handler should be executed on another node, B. Now node A has to transmit the command to node B, where it can be handled. When B is done, it has to hand the result back to A, so that A can fulfill the original request.

So nodes will inevitably have to communicate commands and responses between each other. All of this messaging is nicely handled by akka-sharding, but you’ll need to teach it how to properly encode and decode these calls over the wire.

Aecor doesn’t enforce any specific protocol format, so you can use any format as long as it’s consistent. To do it you just define an instance of WireProtocol for your algebra, where WireProtocol is defined by Aecor as:

Conceptually it’s clear that it’s an encoder/decoder pair. Encoder and Decoder are from the scodec library, which is a nice tool for working with binary data in Scala.
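To illustrate just the encoder/decoder idea from the node A and node B scenario, here is a dependency-free sketch. It is not Aecor’s WireProtocol and does not use scodec: the Command type, the text format and the encode/decode functions are all assumptions made up for this example.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.util.Try

object WireSketch {
  // Hypothetical commands for the booking entity.
  sealed trait Command
  final case class PlaceBooking(seats: Int) extends Command
  case object CancelBooking extends Command

  // Encoder, used on the sending node (A).
  def encode(command: Command): Array[Byte] = {
    val text = command match {
      case PlaceBooking(seats) => s"place:$seats"
      case CancelBooking       => "cancel"
    }
    text.getBytes(UTF_8)
  }

  // Decoder, used on the receiving node (B).
  // Unknown or malformed input is reported as an error, never guessed.
  def decode(bytes: Array[Byte]): Either[String, Command] =
    new String(bytes, UTF_8).split(":", 2).toList match {
      case "place" :: n :: Nil =>
        Try(n.toInt).toEither.left
          .map(_ => s"Invalid seat count: $n")
          .map(PlaceBooking(_))
      case "cancel" :: Nil => Right(CancelBooking)
      case other           => Left(s"Cannot decode: ${other.mkString(":")}")
    }
}
```

The only contract that matters is that decoding an encoded command gives back the same command on every node. A real protocol also has to encode the response travelling back from B to A.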

Other details are probably not transparent, but we’ll leave them for a later post. We don’t need them right now, because Aecor provides a really nice default wire protocol, which should be enough most of the time.

To get it for your algebra, just mark it with @boopickleWireProtocol macro annotation:

It will generate a WireProtocol instance, using the boopickle serialization library. Since these messages are not persisted, serialization can focus on speed and message size. So boopickle is a sane choice here.

If your algebra accepts or returns something special that boopickle can’t derive a serializer for, the macro will fail with an error and you’ll have to manually provide a pickler for that type.

Also, since the target algebra that we’re going to deploy is an EitherK, we’ll need to define a scodec.Codec for our rejection type. Given a WireProtocol[M] for algebra M and a scodec.Codec[R] for rejection R, you get a WireProtocol[EitherK[M, R, ?[_]]] for free.

The following are all manual definitions we need to do for wire protocol to work in our example app:

To see a complete setup, please refer to the github repo.

Deploying to a generic akka runtime

So we went over the components of an Aecor runtime. Let’s now finally deploy something.

We’ll start with the recommended approach, which is to use the Generic Akka Runtime. We’ve done some preparation: defined a WireProtocol and configured an akka-cluster. Since we’re not using akka persistence, we’ll need to set up a journal ourselves.

Entity key

But first, it’s finally time to define booking identity. We discussed this a little bit in previous posts: the behavior itself doesn’t usually need its own identifier to run domain logic. But the runtime is a totally different story, so let’s define a trivial BookingKey:

Entity key doesn’t have to be a value class and can easily be composite. But for us a String will work here.

Both the journal we’re going to define and the Aecor runtime have to know how to encode and decode the key. That is required for both storage and sharding to work properly. You’ll just need to define a pair of codecs:

It’s very similar to how you define codecs for Map keys in circe. In this case the codecs are trivial, but for composite keys Aecor provides a small Composer utility (usage example).
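A rough sketch of what a key and its codec pair could look like. The BookingKey shape follows the description above, but the KeyEncoder and KeyDecoder traits here are hypothetical stand-ins that mirror the shape of circe’s Map key codecs. They are not Aecor’s own types, and rejecting empty keys is my own assumption.

```scala
object BookingKeySketch {
  // A value class wrapping a String identifier, as described in the text.
  final case class BookingKey(value: String) extends AnyVal

  // Hypothetical stand-ins, shaped like circe's KeyEncoder / KeyDecoder.
  trait KeyEncoder[K] { def apply(key: K): String }
  trait KeyDecoder[K] { def apply(raw: String): Option[K] }

  // Encoding a single-field key is just unwrapping it.
  implicit val bookingKeyEncoder: KeyEncoder[BookingKey] =
    new KeyEncoder[BookingKey] {
      def apply(key: BookingKey): String = key.value
    }

  // Decoding can fail, hence the Option (empty keys are rejected here
  // purely as an illustration of a failing case).
  implicit val bookingKeyDecoder: KeyDecoder[BookingKey] =
    new KeyDecoder[BookingKey] {
      def apply(raw: String): Option[BookingKey] =
        if (raw.nonEmpty) Some(BookingKey(raw)) else None
    }
}
```

The property both storage and sharding rely on is the round trip: decoding an encoded key must give back the original key.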

Aecor journal

Journals are a nice extension point for Aecor: there’s a simple EventJournal interface that you can implement to define and plug in your own journal. We’ll use aecor-postgres-journal, which works on top of doobie:

We pass in:

  • a doobie transactor, configured with a PostgreSQL backend;
  • a table name;
  • and a serializer, which in my example uses protobuf. You can check it out here, but it’s just boilerplate that can be automated with a macro.

There is also a tagging parameter, which configures the read side of our entity, so we’ll discuss it in Part 4, dedicated to CQRS.

There’s one nice advantage of Aecor journals over akka persistence. While most akka persistence plugins store all events of all entities into a single table, Aecor entities have completely independent journals.

You can have several entities with journals storing data in different tables, database instances or even different database engines — all deployed to a single Aecor runtime.

Configuring such a setup with akka-persistence plugins would be quite a challenge.

Deployment

Now we have everything we need to deploy the entity. Let’s see how it’s done:

Ok, what is going on here?

First we define a handle for our runtime, which is kinda trivial. But the next line is interesting. Using the Eventsourced helper we join together the behavior and the journal. What we get back is a createBehavior function.

It can be viewed as a constructor for an entity behavior: you give it a key, and in return it gives you a behavior handle for the corresponding entity instance. You can then use the handle to run commands against this particular instance.
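To show the shape of such a constructor, here is a heavily simplified, synchronous sketch. It does not use Aecor’s Eventsourced helper and has no effect type: the Event, State, Booking and Journal definitions and the rejection string are all hypothetical.

```scala
object CreateBehaviorSketch {
  // Hypothetical, minimal domain: a booking can only be confirmed once.
  sealed trait Event
  case object Confirmed extends Event

  final case class State(confirmed: Boolean) {
    def applyEvent(e: Event): State = e match {
      case Confirmed => State(confirmed = true)
    }
  }

  // Handle for one entity instance.
  trait Booking { def confirm(): Either[String, Unit] }

  // In-memory stand-in for a journal.
  final class Journal {
    private var events: Map[String, Vector[Event]] = Map.empty
    def read(key: String): Vector[Event] =
      synchronized(events.getOrElse(key, Vector.empty))
    def append(key: String, e: Event): Unit =
      synchronized { events = events.updated(key, read(key) :+ e) }
  }

  // Joining behavior and journal yields a "constructor": key => handle.
  def createBehavior(journal: Journal): String => Booking = key =>
    new Booking {
      def confirm(): Either[String, Unit] = {
        // Recover the current state by folding the stored events.
        val state = journal
          .read(key)
          .foldLeft(State(confirmed = false))((s, e) => s.applyEvent(e))
        if (state.confirmed) Left("BookingAlreadyConfirmed")
        else Right(journal.append(key, Confirmed))
      }
    }
}
```

Calling createBehavior(journal)("42") gives a handle that only ever reads and writes the events of booking “42”. Confirming it twice yields a rejection the second time, while booking “43” is unaffected.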

You can also specify a snapshotting policy if you want the runtime to periodically save snapshots of entity state to speed up recovery.

There’s some machinery involved in the Eventsourced smart constructor, but I will save the details for a later post as well.

Now that we have a behavior constructor, what’s left is to distribute it over a cluster. The last statement does exactly that. You can tune sharding-related parameters (e.g. number of shards or passivation timeout) with the settings argument.

In return we get a suspended behavior gateway. While the signature is similar to the behavior constructor we’ve seen before, the gateway is much smarter. With a gateway on hand, you can send commands to any node of the cluster, and the runtime will route those commands to a different node if it has to. Of course, responses will be routed back to the original node you called the handler from.

The gateway function is suspended, so you just execute that F effect somewhere “at the end of the world” where the application starts. And that’s it — you have an entry point to a fully operational, distributed eventsourced behavior!

It’s a good time to play around with it. The demo repository defines HTTP endpoints to do just that.

Deploying to an Akka persistence runtime

Deployment to akka persistence runtime is almost identical to the generic solution above, just a little simpler — it doesn’t use Eventsourced to run the actions, relying on akka-persistence machinery instead.

Because it is so similar, and less relevant compared to the generic runtime, I won’t spend too much time here. You can refer to an example wiring in the series repo if you want to. Prior to the generic runtime, Aecor was running on the akka-persistence-cassandra plugin, version 0.59. For simplicity, the same version is used in the example. You can pick any other plugin at almost no additional cost.

Getting rid of EitherK

Last thing I would like to show today is a small generalization you can do to your deployment.

As you have seen already, the entity gateway we obtain from the Aecor runtime returns an EitherK. Let’s quickly refresh what it is: it’s just an algebra transformer that wraps all the behavior actions into EitherT.

While perfectly usable, it feels too heavy. Since many parts of our system are going to access the booking entity, it doesn’t feel right to make them all work with so many transformers.

To make it better, we’ll just need to unwrap EitherK, and then unwrap the EitherT in the algebra results. There’s a small conversion function in Aecor that does exactly that, called Eventsourced.Entities.fromEitherK:

After this transformation the raw type requires a type-lambda to be properly written. So Aecor has an Entities.Rejectable alias that makes it all look nicer.

And in the end it’s reasonable to define another short alias for the whole entity gateway type. Since it grants access to all instances of this entity, it makes sense to name it in plural (see Bookings[F[_]] alias in the snippet above).
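For intuition only, here is a dependency-free sketch of what “unwrapping the EitherT in the algebra results” means. It is not Aecor’s fromEitherK: the tiny EitherT stand-in, the Booking algebra with a single status method, the Wrapped and Plain aliases and the unwrap function are all made up, and the effect is fixed to Id to keep things short.

```scala
object UnwrapSketch {
  type Id[A] = A

  // Minimal stand-in for cats.data.EitherT, to avoid a dependency.
  final case class EitherT[F[_], L, A](value: F[Either[L, A]])

  sealed trait Rejection
  case object BookingNotFound extends Rejection

  // A tiny hypothetical algebra.
  trait Booking[F[_]] { def status: F[String] }

  // Before: every result is wrapped in the transformer.
  type Wrapped[A] = EitherT[Id, Rejection, A]
  // After: callers see a plain Either in the result.
  type Plain[A] = Either[Rejection, A]

  // The conversion only peels the wrapper off each result.
  def unwrap(wrapped: Booking[Wrapped]): Booking[Plain] =
    new Booking[Plain] {
      def status: Either[Rejection, String] = wrapped.status.value
    }

  // Example wrapped handle that always rejects.
  val alwaysMissing: Booking[Wrapped] =
    new Booking[Wrapped] {
      def status: Wrapped[String] =
        EitherT[Id, Rejection, String](Left(BookingNotFound))
    }
}
```

Callers of the unwrapped handle can pattern match on Left and Right directly, without knowing that a transformer was ever involved.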

What’s next?

We successfully deployed an eventsourced Booking entity to our cluster. Now, in the spirit of CQRS, we need to project booking events into a view that is optimized for doing simple and efficient queries.

We’ll do that in Part 4, which is coming next. See you there and thanks for reading!