Scalable read model updates in Akka Persistence by Andrzej Ludwikowski

How do you effectively update the read model with Akka Persistence? In this post by Software Journeyman, Andrzej Ludwikowski looks at why choosing the right strategy highly depends on your domain and on the underlying database used for storing events, and why the solution should be scalable for each read model independently.

 

Akka Persistence is a pretty nice Event Sourcing implementation. There is no point in describing all its functionality, because (almost) everything is in the documentation. However, one thing is missing. When you choose an Event Sourcing architecture, the natural inclination is to also go with the CQRS approach. You don’t want to put additional work on your persistent actors with read queries; these should go through a separate access path, of course with possible eventual consistency as the price. So the big question is: how to effectively update the read model? Let’s start with some assumptions:

  1. read model should be updated based on persisted events,
  2. no event can be lost,
  3. events order must be preserved.
The first two points are quite obvious, and if you don’t care about event order, you don’t need Event Sourcing at all. Choosing the right strategy highly depends on your domain (how many persistent actors you have, how many events you produce, etc.) and on the underlying database used for storing events. At the time of writing this post, my weapon of choice for storing events (and snapshots) is Apache Cassandra — a highly scalable, distributed database. The existing Cassandra plugin has proven many times that it is stable and production ready. There are some rumours about Scylla being an even more efficient store, but that is still an R&D topic.

Usually, you have a lot of different read models, and some of them are more important than others. That’s why the solution should be scalable for each read model independently.

 

Akka Persistence Query

The first approach is very straightforward. We can use a built-in solution from the Akka stack: Persistence Query. The idea is as follows:

  1. Connect to the event journal and consume events as a stream.
  2. Update the read model.
  3. Save the processed event’s sequence number.
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.PersistenceQuery
import akka.stream.scaladsl.Sink

val eventJournal = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

eventJournal
  .eventsByPersistenceId(persistenceId, startingSequenceNr, Long.MaxValue)
  .via(updateSingleReadModel)
  .mapAsync(1)(saveSequenceNr)
  .runWith(Sink.ignore)

Saving the sequence number is necessary for the recovery phase, so that you will not start processing events from the beginning.
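
As an illustration only, that bookkeeping could live in a small store next to the read model. The SequenceNrStore trait and the "user-view" read model name below are invented for this sketch, and updateSingleReadModel is assumed to pass the EventEnvelope downstream after updating the view (Akka 2.6 materializes the stream from the implicit system):

import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.Sink
import scala.concurrent.Future

// Hypothetical store, e.g. a single Cassandra table keyed by (persistenceId, readModel).
trait SequenceNrStore {
  def save(persistenceId: String, readModel: String, sequenceNr: Long): Future[Done]
  def load(persistenceId: String, readModel: String): Future[Long] // 0 when nothing has been processed yet
}

// On (re)start, resume from the first unprocessed event instead of replaying everything.
def resumeProjection(store: SequenceNrStore, persistenceId: String)(implicit system: ActorSystem): Future[Done] = {
  import system.dispatcher
  store.load(persistenceId, "user-view").flatMap { lastProcessed =>
    eventJournal
      .eventsByPersistenceId(persistenceId, lastProcessed + 1, Long.MaxValue)
      .via(updateSingleReadModel)
      .mapAsync(1)(env => store.save(persistenceId, "user-view", env.sequenceNr))
      .runWith(Sink.ignore)
  }
}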

An easy and elegant solution, right? Unfortunately, not an entirely reactive one. Under the hood, the Source polls Cassandra at intervals, 3 seconds by default. At first this could be fine, but let’s say you have 10,000 Persistent Actors in production. That is actually quite a small number, yet big enough to kill your application: for each Persistent Actor you will need to launch a stream, and believe me, 10,000 streams is not the best idea. Moreover, if you want to update the read models independently, you have to multiply the number of Persistent Actors by the number of read models.

Instead of eventsByPersistenceId you should tag your events and use the eventsByTag query. In most cases this is fine, but you could face a problem with event distribution. Let’s say that the majority of events is generated by 1% of your Persistent Actors. This could lead to lags in the processing of the other 99% of Persistent Actors’ events, because all events will end up in the same single source. The solution could be some form of sharded tagging, like in the Lagom implementation, sketched below.
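
For illustration, a WriteEventAdapter can add the tags, and a Lagom-style sharded tag can be derived from the aggregate id carried by the event. The UserEvent type, tag names and shard count are invented for this sketch, and the adapter still has to be registered in the journal configuration:

import akka.persistence.journal.{ Tagged, WriteEventAdapter }
import akka.persistence.query.NoOffset

final case class UserEvent(userId: String, payload: String) // example event carrying its aggregate id

// Spreads one logical tag over several physical ones, so a few very busy
// Persistent Actors cannot delay the processing of everyone else's events.
class ShardedTaggingAdapter extends WriteEventAdapter {
  private val numberOfShards = 10 // illustrative value
  override def manifest(event: Any): String = ""
  override def toJournal(event: Any): Any = event match {
    case e: UserEvent =>
      // all events of one aggregate land in the same shard, so their order is preserved
      val shard = math.abs(e.userId.hashCode % numberOfShards)
      Tagged(e, Set(s"user-events-$shard"))
    case other => other
  }
}

// One stream (and one read model processor) per shard:
val shard0Events = eventJournal.eventsByTag("user-events-0", NoOffset)

Hashing the aggregate id keeps per-aggregate ordering inside a shard, while the read side simply runs one eventsByTag stream per shard.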

Unfortunately, no trick will fix the main problem with Persistence Query, which is polling for data. In some cases a 3-second lag is not a problem; in others even 0.5 s might not be acceptable. Too small an interval will also create unnecessary load on the underlying database. It’s time to analyse other options.

 

CDC

How about streaming data directly from the database? Cassandra (like most databases) supports a change data capture (CDC) mechanism. In theory, we could simply connect to the change log and use it to update read models. Great, but there is one gotcha here. Cassandra is a distributed database, so each node has a separate CDC log file, and managing these logs to ensure event order would be a nightmare. At this point, CDC should be “considered harmful”.

 

Kafka as a store

If efficient reading from Cassandra is such a big problem, maybe we could use a message queue like Kafka as an event store? Reading a stream of events from Kafka will be (by design) extremely efficient. Each read model will be updated by a different Kafka consumer, which could be a separate process: completely autonomous, isolated and independently scalable (a minimal consumer sketch follows the list below). The whole concept is described in detail in one of our previous blog posts. For some applications this approach could work smoothly. However, in some cases Kafka (or in fact any message queue) used as a database can bring a lot of additional problems:

  1. Snapshot management.
  2. Retention management (retention should be probably disabled).
  3. Kafka partitions — to keep order, all events from an aggregate must fit into a single partition, which must fit on a single node. In some heavy load cases this could be a blocker or a challenge to work around.
  4. Not supported by Akka Persistence.
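
Purely to illustrate the consumer-per-read-model idea, a projection process could look roughly like this with Alpakka Kafka; the topic, group id and updateReadModel function are placeholders:

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{ Committer, Consumer }
import akka.kafka.{ CommitterSettings, ConsumerSettings, Subscriptions }
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("user-view-updater")
import system.dispatcher

// Placeholder: write the event into the read model's own store.
def updateReadModel(serializedEvent: String): Future[Unit] = Future.unit

val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("user-view") // every read model gets its own consumer group

// The offset is committed only after the read model update succeeds.
Consumer
  .committableSource(consumerSettings, Subscriptions.topics("user-events"))
  .mapAsync(parallelism = 1) { msg =>
    updateReadModel(msg.record.value).map(_ => msg.committableOffset)
  }
  .runWith(Committer.sink(CommitterSettings(system)))

Because each read model has its own consumer group, it can be deployed, scaled and restarted independently of the others.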
 

Cassandra, Kafka and at least once delivery

How about combining the two concepts: Cassandra for storing events (the source of truth) and Kafka as an additional layer for the read model processors?

In theory, this is perfect. The only problem is how to feed Kafka with events effectively. Persistence Queries like eventsByPersistenceId or eventsByTag (described above) could be used to send events to Kafka instead of updating the read model directly, but the lag and distribution problems would still remain.
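
A rough sketch of such a bridge, again with Alpakka Kafka (the topic name and serialize function are placeholders), which of course inherits the polling lag discussed earlier:

import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.persistence.query.NoOffset
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

// Placeholder serialization of the journal event into the Kafka payload.
def serialize(event: Any): String = event.toString

val producerSettings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

// Using persistenceId as the message key keeps all events of one aggregate on one partition, preserving their order.
eventJournal
  .eventsByTag("user-events-0", NoOffset)
  .map(env => new ProducerRecord[String, String]("user-events", env.persistenceId, serialize(env.event)))
  .runWith(Producer.plainSink(producerSettings))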

Another approach might be to send events to Kafka right after the update-state phase. The algorithm for the Persistent Actor is simple (a sketch follows the list):

  1. receive command
  2. save event(s)
  3. update state
  4. send event(s) to Kafka
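
A minimal sketch of these four steps, with an invented UserActor and the KafkaSender child discussed below (classic Akka Persistence API; error handling omitted):

import akka.actor.{ ActorRef, Props }
import akka.persistence.PersistentActor

final case class AddUser(name: String)                       // command
final case class UserAdded(name: String)                     // event
final case class PublishEvent(sequenceNr: Long, event: Any)  // message to the KafkaSender child

class UserActor(kafkaSenderProps: Props) extends PersistentActor {
  override def persistenceId: String = "user-" + self.path.name

  // Sending to Kafka is delegated to a child so that the Persistent Actor is never blocked.
  private val kafkaSender: ActorRef = context.actorOf(kafkaSenderProps, "kafka-sender")
  private var users: List[String] = Nil

  override def receiveCommand: Receive = {
    case AddUser(name) =>                                     // 1. receive command
      persist(UserAdded(name)) { event =>                     // 2. save event
        users = event.name :: users                           // 3. update state
        kafkaSender ! PublishEvent(lastSequenceNr, event)     // 4. hand the event over for publishing
      }
  }

  override def receiveRecover: Receive = {
    case UserAdded(name) => users = name :: users
  }
}
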
Let’s take a look at possible problems. To preserve event order, sending to Kafka would have to block the whole actor, which is of course a bad idea, since it would degrade the Persistent Actor’s performance. We could delegate the Kafka producer to a separate child actor (let’s call it KafkaSender). Awesome, but then we must ensure message delivery between the actors. This could be done with the AtLeastOnceDelivery trait. You probably sense that nothing is for free. Exactly: more messages are circulating in our cluster and we lose the ordering guarantee:
At-least-once delivery implies that original message send order is not always retained.
Before you start thinking about some buffering mechanism for events in the KafkaSender actor to ensure event order, please stop! At-least-once delivery can be done in a different, more optimistic way. You can send events to the KafkaSender without delivery confirmation, but you will need to monitor the event sequence number. In case of any sequence number discrepancy, an additional action is necessary:
  1. in case of a sequence number lower than the current one — the event was already processed and can be skipped,
  2. in case of a gap greater than 1, stash incoming messages, launch eventsByPersistenceId to fill the gap, and then unstash the pending messages.
Of course, the current sequence number should be persisted and restored after a KafkaSender failure or a Persistent Actor restart. Sounds complicated? True, but no one said that high scalability is cheap.
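
A simplified sketch of that bookkeeping follows; the PublishEvent message mirrors the one from the previous sketch, while the Kafka producer, the eventsByPersistenceId replay and the persistence of the counter are stubbed out:

import akka.actor.{ Actor, Stash }

final case class PublishEvent(sequenceNr: Long, event: Any)
final case class GapFilled(nextExpectedSeqNr: Long)

class KafkaSender(persistenceId: String) extends Actor with Stash {

  private var expectedSeqNr: Long = 1L // in a real implementation: persisted and restored on restart

  override def receive: Receive = running

  private def running: Receive = {
    case PublishEvent(seqNr, _) if seqNr < expectedSeqNr =>
      () // already published before a restart: skip the duplicate
    case PublishEvent(seqNr, event) if seqNr == expectedSeqNr =>
      publishToKafka(seqNr, event)
      expectedSeqNr = seqNr + 1
    case PublishEvent(seqNr, _) => // gap detected: some events were lost between the actors
      stash()
      fillGap(from = expectedSeqNr, until = seqNr) // e.g. eventsByPersistenceId(persistenceId, expectedSeqNr, seqNr - 1)
      context.become(fillingGap)
  }

  private def fillingGap: Receive = {
    case GapFilled(nextSeqNr) =>
      expectedSeqNr = nextSeqNr
      unstashAll()
      context.become(running)
    case _: PublishEvent => stash() // keep buffering until the gap is closed
  }

  // Stubs: a real implementation would use a Kafka producer and an asynchronous Persistence Query stream,
  // replying with GapFilled once the missing events have been published.
  private def publishToKafka(seqNr: Long, event: Any): Unit = ()
  private def fillGap(from: Long, until: Long): Unit = self ! GapFilled(until)
}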
 

Conclusion

Before you jump to the last solution, which is the most awesome, effective and scalable, but also the hardest one to implement correctly, ask yourself whether you really need such a level of scalability (and, inevitably, complexity). I have seen Event Sourcing implementations with transactional event persistence and read model updates. As long as you meet the expected latency and throughput, such an approach is perfectly fine. However, if you need to squeeze the very last millisecond out of your Event Sourcing architecture, I hope that this article will help you find the optimal solution for your use case.

 

This article was written by Andrzej Ludwikowski and posted originally on SoftwareMill Blog.