Did you catch part 1 of the Akka Streams pitfalls to avoid?
Developers Jakub Dziworski and Andrzej Ludwikowski have joined forces to share further mistakes we should avoid making. There's a lot to learn, so let's get started!
Let’s continue the journey across Akka Streams pitfalls started in the first part.
6. withAttributes order
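A minimal sketch of this pitfall (element values and stage contents are illustrative, not from the original post): `withAttributes` applies to the stages defined before the call, so the failing `map` added after it is not covered by the decider:

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("with-attributes-order")

val result = Source(1 to 5)
  .map(identity)
  // Covers only the stages above this call -- here, just the source and the first map.
  .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
  .map(n => if (n == 3) throw new RuntimeException("boom") else n) // NOT covered
  .runWith(Sink.seq)

// The failure is not resumed; the whole stream fails with "boom".
println(Await.ready(result, 5.seconds).value)
system.terminate()
```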
In the example above, 'resumingDecider' is applied only to the first 'map' stage. As a result, an exception actually stops the stream instead of resuming it. Be careful with the 'withAttributes' method. Usually you want to use it at the end of the stream definition, unless you are changing async boundaries.
7. Finding which stage causes issues
Let’s try to implement a simple blockchain using Akka Streams. There is an infinite stream of transactions which is the input of the system (in the real world this could be Apache Kafka). Your job is to group incoming transactions into blocks, hash them, and save them to some storage.
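A hedged reconstruction of such a pipeline (the block size, the "hash", and the storage step are illustrative stand-ins, and the input is finite here so the sketch terminates):

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("blockchain")

final case class Transaction(description: String)

// Stand-in for the infinite input (e.g. a Kafka consumer).
val transactions = Source(1 to 20).map(i => Transaction(s"tx-$i"))

val savedHashes = transactions
  .grouped(5)                   // group transactions into a block
  .map(block => block.hashCode) // "hash" the block (illustrative)
  .map { hash => println(s"saved block with hash:$hash"); hash } // save to storage
  .runWith(Sink.seq)

val hashes = Await.result(savedHashes, 5.seconds)
system.terminate()
```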
If you run this stream — everything looks great, but all of a sudden it stops processing elements:
saved block with hash:0
saved block with hash:1201059031
saved block with hash:1835264323
saved block with hash:-1214204587
saved block with hash:-597626070
How do you debug such a problem? A good start would be to add a log (or a breakpoint) after the stage that is expected to emit elements. That’s the approach we’ve been using for decades.
We get some logs but after a while, there are no more entries:
[got transaction] Element: mark sent 7551 BTC to andrew
[got transaction] Element: mark sent 1691 BTC to andrew
[got transaction] Element: mark sent 8233 BTC to mark
[got transaction] Element: steve sent 4616 BTC to mark
[got transaction] Element: mark sent 6529 BTC to mark
Problem found: we do not receive transactions. Now you spend the rest of your day debugging Kafka, or whatever the input to your system is. When it comes to Akka Streams, this approach is wrong. The enemy of such a debugging approach is the backpressure mechanism. Stages in Akka Streams do not emit elements if the downstream (the stages below them) is busy for some reason. If a stage does not receive demand, it will not emit an element. This way no buffering is required, which significantly reduces resource usage. There could be two reasons why we do not receive new transactions:
- there is indeed something wrong with the service pushing transactions to our app,
- there is no demand from downstream.
The second reason is definitely faster to check. Let’s do that by adding logs between all stages:
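Such instrumentation could look like this (a sketch; the log names mirror the log entries in this section, while the stages themselves are illustrative). The built-in `log` stage does the work:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("stream-debug")

final case class Transaction(description: String)

val done = Source(1 to 20)
  .map(i => Transaction(s"tx-$i"))
  .log("got transaction")       // logs every element that passes through
  .grouped(5)
  .log("grouped transactions")
  .map(_.hashCode)
  .log("hashed block")
  .map(hash => println(s"saved block with hash:$hash"))
  .runWith(Sink.ignore)

Await.result(done, 5.seconds)
system.terminate()
```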
[got transaction] Element: steve sent 7030 BTC to john
[got transaction] Element: steve sent 5580 BTC to john
[got transaction] Element: mark sent 9178 BTC to andrew
[got transaction] Element: john sent 8189 BTC to steve
[got transaction] Element: steve sent 5600 BTC to mark
[grouped transactions] Element: Vector(steve sent 934 BTC to mark, …)
[hashed block] Element: -546445259
The last log entry is a 'hashed block' element. Since saving the hash is the last stage, we know there is constant demand for elements generated by that stage. Technically the last stage is 'Sink.ignore', but it never backpressures. Therefore, the saving stage is the one backpressuring, which prevents the stages at the beginning of the stream from emitting elements. As it turns out, the problem is the database becoming randomly unavailable.
Conclusion: do not focus entirely on the stage you think is problematic. Always keep backpressure in mind and treat the stream as a whole. Start debugging issues like the one above by focusing on the first and last stages, then narrow the circle of suspects.
Additional tip: use the log stage almost everywhere; it logs at debug level by default. If you encounter an issue in production, you can temporarily switch the logger to debug via JMX. If you can’t use JMX, you can set the log level with stream attributes. This way you can locate exactly which part of the stream is the root cause of the problem.
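For example, the per-element log level of a 'log' stage can be raised from the default debug via attributes (a sketch; the stage name is illustrative):

```scala
import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.Attributes
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("log-levels")

val done = Source(1 to 3)
  .log("my-stage")
  // Raise per-element logging from the default debug to info for this stage.
  .withAttributes(Attributes.logLevels(
    onElement = Logging.InfoLevel,
    onFinish  = Logging.InfoLevel,
    onFailure = Logging.ErrorLevel))
  .runWith(Sink.ignore)

Await.result(done, 5.seconds)
system.terminate()
```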
8. alsoTo
elem:1 map
elem:1 to
elem:1 alsoTo
The truth is that the message is passed to the 'alsoTo' sink and then processing continues immediately. Why is this important? Imagine that you are processing temperature readings from multiple rooms. There is a requirement to store rooms and readings in a relational database. In order to save a temperature reading, the room has to be present in the database (to satisfy the foreign key).
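A hedged sketch of such a pipeline (the 'saveRoom' and 'saveReading' functions are hypothetical stand-ins for async database calls): 'alsoTo' hands the element to its sink and immediately passes it downstream, without waiting for the sink's write to complete:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("alsoTo-race")
import system.dispatcher

final case class Reading(roomId: Int, temperature: Double)

// Hypothetical async DB calls.
def saveRoom(roomId: Int): Future[Unit]   = Future { /* INSERT INTO rooms ... */ }
def saveReading(r: Reading): Future[Unit] = Future { /* INSERT INTO readings ... */ }

val done = Source(List(Reading(7, 21.5), Reading(8, 19.0)))
  .alsoTo(Sink.foreachAsync(1)(r => saveRoom(r.roomId))) // fire and continue
  .mapAsync(1)(saveReading) // may run before saveRoom's insert has committed
  .runWith(Sink.ignore)

Await.result(done, 5.seconds)
system.terminate()
```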
Such an implementation causes a race condition that makes 'saveReading' fail occasionally with:
Exception: java.lang.RuntimeException: No room for id 7 found!
Conclusion: if you use 'alsoTo', make sure that the downstream does not depend on the sink's results.
9. Failures in non-linear graphs
If you want to simply ignore an exception and continue processing the stream, you will probably use 'Supervision.resumingDecider':
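A minimal sketch consistent with the printed output below (the element values are a guess: a non-numeric third element that makes 'toInt' throw):

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("resuming")

val parsed = Source(List("1", "2", "a", "4", "5"))
  .map(_.toInt) // throws NumberFormatException on "a"
  .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
  .map { n => println(n); n }
  .runWith(Sink.seq)

val elements = Await.result(parsed, 5.seconds)
system.terminate()
```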
Running the above code will print:
1
2
4
5
The third element, 'a', was simply skipped and the stream resumed processing elements. Let’s examine a more complex, non-linear graph:
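A hedged reconstruction of such a graph (the stage names follow the text; the failing element and the exact graph layout are assumptions): 'dangerousFlow' fails on element 5, 'safeFlow' passes elements through, both branches of a 'Broadcast' are zipped back together, and the resuming decider is attached to the composed flow:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, FlowShape, Supervision}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Source, Zip}

implicit val system: ActorSystem = ActorSystem("non-linear")

val dangerousFlow: Flow[Int, Int, NotUsed] =
  Flow[Int].map(n => if (n == 5) throw new RuntimeException(s"failed on $n") else n)

val safeFlow: Flow[Int, Int, NotUsed] = Flow[Int]

val nonLinearFlow: Flow[Int, (Int, Int), NotUsed] =
  Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    val broadcast = b.add(Broadcast[Int](2))
    val zip       = b.add(Zip[Int, Int]())
    broadcast.out(0) ~> dangerousFlow ~> zip.in0
    broadcast.out(1) ~> safeFlow      ~> zip.in1
    FlowShape(broadcast.in, zip.out)
  })
  .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))

val done = Source(1 to 9).via(nonLinearFlow).runForeach(println)
```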
The expected output is:
(1,1)
(2,2)
(3,3)
(4,4)
(6,6) //(5,5) should be skipped
(7,7)
(8,8)
(9,9)
In fact, this stream will process just 4 elements, after which we observe a deadlock:
(1,1)
(2,2)
(3,3)
(4,4)
After (4,4) is emitted, the stream neither emits more elements nor completes. The reason is attributes inheritance. After adding the 'Supervision' attribute to the flow, we expect the resuming strategy to apply only to the top-level 'nonLinearFlow'. This way, when 'dangerousFlow' receives element '5' and fails, the failure should bubble up until it reaches the 'nonLinearFlow' decider, which would then resume the whole flow by demanding the next element (6).
The reality is different: because of attributes inheritance, all stages within the graph are set to resume on failure (including 'dangerousFlow'). The consequences are as follows:
- 'broadcast' emits element '5' to 'dangerousFlow' and 'safeFlow'.
- 'safeFlow' emits element '5' to 'Zip'.
- 'dangerousFlow' fails and does not emit an element to 'Zip'. It then resumes, demanding the next element from 'broadcast'. However, for 'broadcast' to emit an element, demand must be signaled from all of its outputs.
- 'Zip' receives only one element (from 'safeFlow') and waits forever for the second one, since 'Zip' emits only when both inputs have a value.
There is not much you can do about it, other than wrapping the dangerous parts between 'broadcast' and 'Zip' in the 'Try' monad.
Afterwards, you can filter out the failures with stages like 'collect'.
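Both steps together might look like this, shown on a linear flow for brevity (in the non-linear graph the same wrapping goes on the dangerous branch, and 'collect' comes after 'Zip'); the names and the failing element are illustrative:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{Success, Try}

implicit val system: ActorSystem = ActorSystem("try-wrap")

// Failures become values instead of stream errors...
val dangerousFlow: Flow[Int, Try[Int], NotUsed] =
  Flow[Int].map(n => Try(if (n == 5) throw new RuntimeException(s"failed on $n") else n))

// ...and are dropped afterwards with collect.
val survivors = Source(1 to 9)
  .via(dangerousFlow)
  .collect { case Success(n) => n }
  .runWith(Sink.seq)

val elements = Await.result(survivors, 5.seconds) // 5 is skipped
system.terminate()
```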
Conclusion: pay attention when applying a custom recovery strategy in non-linear graphs.
10. Materialization and flatMapConcat
Suppose you want to send messages to Kafka in the middle of the flow:
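A sketch of what this could look like with Alpakka Kafka ('Producer.flow' is named in the text; the topic name, bootstrap servers, and message values are placeholders, and the stream is only defined here, not run):

```scala
import akka.actor.ActorSystem
import akka.kafka.{ProducerMessage, ProducerSettings}
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

implicit val system: ActorSystem = ActorSystem("kafka-flow")

val producerSettings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

// One Producer.flow in the middle of the stream: a single Kafka producer
// is created when the stream is materialized and reused for every element.
val stream = Source(1 to 100)
  .map(i => ProducerMessage.Message(
    new ProducerRecord[String, String]("transactions", s"msg-$i"), i))
  .via(Producer.flow(producerSettings))
  .map(_.message.passThrough)
// stream.runForeach(println) would establish the connection once

system.terminate()
```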
The connection is established (in 'Producer.flow'), messages are sent, everything works as expected.
Code is constantly evolving, so after some refactoring, you could end up using the 'flatMapConcat' stage due to some complex processing requirements.
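The effect can be reproduced without Kafka: the function given to 'flatMapConcat' returns a fresh 'Source' per element, and each one is materialized separately. Here a counter (a stand-in for the "connection established" cost) shows one materialization per element:

```scala
import java.util.concurrent.atomic.AtomicInteger
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("flat-map-concat")

val materializations = new AtomicInteger(0)

// Stand-in for an expensive-to-materialize source (e.g. one holding a connection).
def expensiveSource(elem: Int): Source[Int, NotUsed] =
  Source.single(elem).mapMaterializedValue { _ =>
    materializations.incrementAndGet() // "connection established"
    NotUsed
  }

val done = Source(1 to 5)
  .flatMapConcat(expensiveSource) // a fresh source, and materialization, per element
  .runWith(Sink.ignore)

Await.result(done, 5.seconds)
println(s"materialized ${materializations.get()} times")
system.terminate()
```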
Should work pretty much the same, right? At first glance it does, but if you look closely at the logs, you can see that the connection to Kafka is continually dropped and reestablished:
Shutdown of Kafka producer I/O thread has completed.
Kafka producer with client id producer-8 created
In this case, the source will connect to Kafka on every message. Why? It is easy to overlook that the parameter passed to 'flatMapConcat' is a function returning a new 'Source' (one for each incoming element). This source must be materialized at some point, and each materialization establishes a new connection. Luckily, Akka Streams Kafka provides a way to create a flow with an already created producer, thus reusing the same connection.
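A sketch of the fix (the exact API depends on the Alpakka Kafka version; this sketch assumes 'ProducerSettings.createKafkaProducer' and 'ProducerSettings.withProducer', and older releases instead had a 'Producer.flow' overload accepting an existing producer). The producer is created once, outside the per-element sources:

```scala
import akka.actor.ActorSystem
import akka.kafka.{ProducerMessage, ProducerSettings}
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

implicit val system: ActorSystem = ActorSystem("kafka-reuse")

val producerSettings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

// Create the producer once...
val kafkaProducer   = producerSettings.createKafkaProducer()
val reusingSettings = producerSettings.withProducer(kafkaProducer)

// ...and reuse it in every source materialized by flatMapConcat.
val stream = Source(1 to 100)
  .flatMapConcat { i =>
    Source.single(ProducerMessage.Message(
      new ProducerRecord[String, String]("transactions", s"msg-$i"), i))
      .via(Producer.flow(reusingSettings))
  }

kafkaProducer.close()
system.terminate()
```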
Conclusion: watch out for sources that are costly to materialize (most likely IO-related) when used in stages like 'flatMapConcat' or 'flatMapMerge'. Always look for a way to share the expensive object between materializations.
These were 10 Akka Streams pitfalls we have recently stumbled upon. There are definitely more, so if you have encountered any interesting case, please leave a comment. We’d love to hear from you.
This article was written by Jakub Dziworski and Andrzej Ludwikowski and originally posted on SoftwareMill Blog.