How do you effectively update the read model with Akka Persistence? In this post from Software Journeyman, Andrzej Ludwikowski looks at why the right strategy depends heavily on your domain and on the underlying database used for storing events, and why the solution should scale for each read model independently.
Akka Persistence is a pretty nice Event Sourcing implementation. There is no point in describing all of its functionality, because (almost) everything is in the documentation. However, one thing is missing. When you choose an Event Sourcing architecture, the natural inclination is to also go with the CQRS approach. You don't want to burden your persistence actors with additional read queries; reads should go through a separate access path, of course with possible eventual consistency as the price. So the big question is: how to effectively update the read model? Let's start with some assumptions:
- the read model should be updated based on persisted events,
- no event can be lost,
- event order must be preserved.
Usually, you have a lot of different read models, and some of them are more important than the others. That's why the solution should be scalable for each read model independently.
Akka Persistence Query
The first approach is very straightforward: use the built-in solution from the Akka stack, Persistence Query. The idea is as follows:
- Connect to the event journal and consume events as a stream.
- Update read model.
- Save the processed event's sequence number.
```scala
import akka.persistence.query.PersistenceQuery
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal

val eventJournal = PersistenceQuery(system)
  .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

eventJournal
  .eventsByPersistenceId(persistenceId, startingSequenceNr, Long.MaxValue)
  .via(updateSingleReadModel)
  .mapAsync(1)(saveSequenceNr)
```
Saving the sequence number is necessary for the recovery phase, so that after a restart you do not start processing events from the beginning.
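To make the recovery idea concrete, here is a minimal, purely illustrative sketch of an offset store (the class and method names are my own, not an Akka API): after each processed event the sequence number is saved, and on restart the stream resumes from the next one.

```scala
// Hypothetical in-memory offset store illustrating why the processed
// sequence number must be persisted. A real implementation would write
// the offset to durable storage.
class OffsetStore {
  private var offsets = Map.empty[String, Long]

  // Called after an event has been applied to the read model.
  def save(readModelId: String, sequenceNr: Long): Unit =
    offsets += readModelId -> sequenceNr

  // Where eventsByPersistenceId should resume from after a restart.
  // Akka sequence numbers start at 1, so an empty store yields 1.
  def startingSequenceNr(readModelId: String): Long =
    offsets.getOrElse(readModelId, 0L) + 1L
}
```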
An easy and elegant solution, right? Unfortunately, not an entirely reactive one. Under the hood, the Source is polling Cassandra at intervals, 3 seconds by default. At first this could be fine, but let's say you have 10,000 Persistent Actors on production. That is actually quite a small number, but big enough to kill your application: for each Persistent Actor you will need to launch a stream, and believe me, 10,000 streams is not the best idea. Actually, if you want to update read models independently, you should multiply the number of Persistent Actors by the number of read models.
Instead of eventsByPersistenceId, you should tag your events and use the eventsByTag query. In most cases this is fine, but you could face a problem with event distribution. Let's say the majority of events is generated by 1% of your Persistent Actors. This could lead to lags in processing the events of the other 99% of Persistent Actors, because all events will flow through the same single source. The solution could be some form of sharded tagging, as in the Lagom implementation.
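The sharded-tagging idea can be sketched in a few lines. This is not Lagom's actual code, just an illustration of the principle: instead of one tag shared by everyone, each event gets one of N shard tags derived from its persistenceId, so N independent eventsByTag streams can split the load while events of any single actor still stay in order within one shard.

```scala
// Lagom-style sharded tagging, simplified for illustration.
// All events of a given persistenceId deterministically map to the
// same shard tag, so per-actor order is preserved within a shard.
def shardedTag(baseTag: String, persistenceId: String, numShards: Int): String = {
  val shard = math.abs(persistenceId.hashCode % numShards)
  s"$baseTag-$shard"
}
```

A read-model processor would then run one eventsByTag stream per shard tag (`baseTag-0` … `baseTag-(N-1)`), each of which can be scaled and restarted independently.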
Unfortunately, no trick will fix the main problem with Persistence Queries, which is polling the data. For some cases a 3-second lag is not a problem; in others, even 0.5 s might not be acceptable. Too small an interval will also create unnecessary load on the underlying database. It's time to analyse other options.
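For completeness, the polling interval is configurable. Assuming the akka-persistence-cassandra plugin of that era, the setting lived under the `cassandra-query-journal` block (newer releases moved it under `akka.persistence.cassandra.query`):

```hocon
# application.conf: how often the Cassandra read journal polls for new events.
# Default is 3s; lowering it reduces lag at the cost of more database load.
cassandra-query-journal.refresh-interval = 500ms
```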
How about streaming data directly from the database? Cassandra (like most databases) supports a CDC (Change Data Capture) mechanism. In theory, we could simply connect to the change log and use it for updating read models. Great, but there is one gotcha. Cassandra is a distributed database, so each node has a separate CDC log file, and managing these logs to preserve event order would be a nightmare. At this point, CDC should be "considered harmful".
Kafka as a store
If efficient reading from Cassandra is such a big problem, maybe we could use a message queue like Kafka as the event store? Reading a stream of events from Kafka will be (by design) extremely effective. Each read model would be updated by a different Kafka consumer, which could be a separate process: completely autonomous, isolated and independently scalable. The whole concept is described in detail in one of our previous blog posts. For some applications this approach could work smoothly. However, in some cases Kafka (or in fact any message queue) as a database brings a lot of additional problems:
- Snapshot management.
- Retention management (retention should probably be disabled).
- Kafka partitions: to keep order, all events from an aggregate must go to a single partition, which must fit on a single node. Under heavy load this could be a blocker, or at least a challenge to work around.
- Not supported by Akka Persistence.
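As a sketch of the retention point above: if Kafka is the source of truth, the topic must never expire events. With standard Kafka topic-level settings, that means disabling both time- and size-based retention, roughly:

```hocon
# Topic-level settings for an event-store topic (illustrative):
# -1 disables time-based and size-based retention, so events are kept forever.
retention.ms = -1
retention.bytes = -1
```

Note that this makes the partition-sizing problem above permanent: a partition only ever grows, and it must still fit on a single node.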
Cassandra, Kafka and at-least-once delivery
How about combining the two concepts: Cassandra for storing events (the source of truth) and Kafka as an additional layer for read model processors?
In theory, this is perfect. The only problem is how to effectively feed Kafka with events. The Persistence Queries described above (eventsByPersistenceId or eventsByTag) could be used for sending events to Kafka instead of updating the read model, but the lag and distribution problems remain.
Another approach might be to send events to Kafka right after the update-state phase. The algorithm for the Persistent Actor is simple:
- receive a command,
- save event(s),
- update state,
- send event(s) to Kafka.
This gives at-least-once delivery, so the consumer must track event sequence numbers and handle duplicates and gaps:
- if the sequence number is lower than the expected one, the event was already processed and can be skipped,
- in case of a gap greater than 1, stash incoming messages, launch eventsByPersistenceId to fill the gap, then unstash the pending messages.
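The consumer-side bookkeeping described above can be sketched as a small decision function (the type names are illustrative, not an Akka or Kafka API): compare the incoming event's sequence number with the last one applied to the read model.

```scala
// Consumer-side handling of at-least-once delivery, simplified.
sealed trait Action
case object Skip extends Action     // duplicate: already applied to the read model
case object Process extends Action  // the next expected event
case object FillGap extends Action  // events are missing: stash and replay
                                    // via eventsByPersistenceId, then unstash

def decide(lastProcessedSeqNr: Long, incomingSeqNr: Long): Action =
  if (incomingSeqNr <= lastProcessedSeqNr) Skip
  else if (incomingSeqNr == lastProcessedSeqNr + 1) Process
  else FillGap
```

The FillGap branch is what keeps the read model consistent despite Kafka redeliveries or lost sends: the event journal in Cassandra remains the source of truth to replay from.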
Before you jump to the last solution, which is the most awesome, effective and scalable, but also the hardest one to implement correctly, ask yourself whether you really need such a level of scalability (and, inevitably, complexity). I have seen Event Sourcing implementations with transactional event persistence and read model updates; as long as you meet the expected latency and throughput, such an approach is perfectly fine. However, if you need to squeeze the Event Sourcing architecture to the very last millisecond, I hope this article will help you find the optimal solution for your use case.