
Akka Streams pitfalls to avoid — part 1 by Jakub Dziworski


Have you come across any Akka Streams pitfalls which you wish you had been able to avoid?

SoftwareMill developer Jakub Dziworski has done us all a favour and helped us learn from his mistakes. Find out exactly what to avoid!

 

'The Akka Streams library has proven to be very useful for implementing systems focused on data processing. However, if you use it without prior knowledge or with insufficient care, you may encounter some really weird behavior.

At SoftwareMill we’ve been using Akka Streams in many projects. During this time we’ve come across multiple quirks related to this library. With more experience, most of them make sense, but they are not intuitive at first glance, especially if you approach the problem with a procedural mindset.

This post is a collection of common Akka Streams pitfalls with explanations of how to avoid them. We found ourselves caught in these traps many times while learning the library. We hope you can learn from our mistakes.

 

1. Swallowed exceptions

The default behavior for most stages when an exception is thrown is to… swallow it.

In such a case the stream simply ends and nothing is logged. To actually log exceptions, there are two options:

  • log or rethrow the exception in a 'recover' stage. This way all exceptions from upstream will be caught and logged.
  • define a custom supervision strategy and use it in the stream attributes or in the materializer settings:
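import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import akka.stream.scaladsl.{Sink, Source}

// 'system' is assumed to be an implicit ActorSystem already in scope
val loggingMaterializer = ActorMaterializer(ActorMaterializerSettings(system).withSupervisionStrategy { e =>
  println(s"Exception: $e") // log every exception before stopping the stream
  Supervision.Stop
})

Source(1 to 10)
  .map(_ / 0) // fails with ArithmeticException
  .runWith(Sink.ignore)(loggingMaterializer)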
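
The first option, logging in a 'recover' stage, could be sketched roughly as follows. This is a minimal illustration rather than the author's original listing: the object name `RecoverLogging`, the `run` helper and the use of `println` instead of a real logger are assumptions, and it assumes an Akka 2.5-style `ActorMaterializer`.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

object RecoverLogging {
  // Runs a stream that always fails; 'recover' logs the error and rethrows it.
  def run()(implicit mat: Materializer): Future[Done] =
    Source(1 to 10)
      .map(_ / 0) // throws ArithmeticException on the first element
      .recover {
        case e: Throwable =>
          println(s"Exception: $e") // swap in a real logger here
          throw e                   // rethrow so the stream still fails
      }
      .runWith(Sink.ignore)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("recover-logging")
    implicit val mat: Materializer = ActorMaterializer()
    // The materialized Future fails with the rethrown exception.
    println(Try(Await.result(run(), 5.seconds)))
    system.terminate()
  }
}
```

Because 'recover' only sees failures coming from upstream, it has to be placed after the stages whose exceptions should be logged.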

 

2. Value, by-name and function parameters

Generally speaking, most methods in the Akka Streams API take a function that transforms emitted elements. After some time you will get used to transforming elements with lambdas, which makes it easy to misinterpret methods that take value parameters. Let’s imagine you need to consume elements from an infinite queue.
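
A minimal sketch of such code might look like the following. The in-memory `queue`, the `pollQueue()` helper and the `take(4)` that lets the sketch terminate are hypothetical stand-ins, not part of the original listing; an Akka 2.5-style `ActorMaterializer` is assumed.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}

import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object RepeatPitfall {
  // Hypothetical stand-in for an external queue, pre-filled with msg1..msg4.
  val queue = new ConcurrentLinkedQueue[String]()
  (1 to 4).foreach(i => queue.add(s"msg$i"))

  // Hypothetical helper: removes and returns the next message.
  def pollQueue(): String = queue.poll()

  // pollQueue() is evaluated once, when the source is built;
  // repeat then emits that same value over and over.
  def messages()(implicit mat: Materializer): Future[Seq[String]] =
    Source
      .repeat(pollQueue())
      .take(4) // only so that the sketch terminates
      .runWith(Sink.seq)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("repeat-pitfall")
    implicit val mat: Materializer = ActorMaterializer()
    Await.result(messages(), 5.seconds).foreach(println)
    system.terminate()
  }
}
```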

The code above seems to be fine, but the actual output is:

msg1
msg1
msg1
msg1
It turns out that 'repeat' takes its parameter by value ('def repeat[T](element: T)'), so it repeats the same element over and over again. It gets even more confusing if you are used to the Scala collection API: 'Stream.continually', the equivalent of 'Source.repeat', takes a by-name parameter ('def continually[A](elem: => A)'). To solve the issue, you can repeat an irrelevant value such as Unit and map it to something else in the next stage.
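
The fix described above could be sketched like this. As before, `queue`, `pollQueue()` and `take(4)` are hypothetical helpers used only for illustration, and an Akka 2.5-style `ActorMaterializer` is assumed.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}

import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object RepeatFixed {
  // Hypothetical stand-in for an external queue, pre-filled with msg1..msg4.
  val queue = new ConcurrentLinkedQueue[String]()
  (1 to 4).foreach(i => queue.add(s"msg$i"))

  // Hypothetical helper: removes and returns the next message.
  def pollQueue(): String = queue.poll()

  // Repeat a dummy unit value and poll inside map, so that
  // pollQueue() is called again for every element.
  def messages()(implicit mat: Materializer): Future[Seq[String]] =
    Source
      .repeat(())
      .map(_ => pollQueue())
      .take(4) // only so that the sketch terminates
      .runWith(Sink.seq)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("repeat-fixed")
    implicit val mat: Materializer = ActorMaterializer()
    Await.result(messages(), 5.seconds).foreach(println)
    system.terminate()
  }
}
```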

Pay extra attention to what kind of parameter is required by a method you use.

3. Exceptions thrown outside stages

A similar case applies to exceptions. Suppose you want to consume messages from a JMS queue. Communication with external services should always be considered unstable. To make such cases more resilient, Akka provides 'RestartSource', which restarts the inner source with a backoff each time it fails… at least it should.
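
A sketch of the kind of setup being described is shown below. The body of `connectToUnstableService()` and the backoff values are assumptions made for illustration, and the three-argument `RestartSource.withBackoff` overload is the Akka 2.5-style API (newer releases prefer a settings object).

```scala
import akka.NotUsed
import akka.stream.scaladsl.{RestartSource, Source}

import scala.concurrent.duration._

object RestartPitfall {
  // Hypothetical stand-in for a connection attempt that fails.
  def connectToUnstableService(): String =
    throw new RuntimeException("Failed to connect unstable jms service")

  // Only a blueprint: nothing is executed until the source is run.
  val source: Source[String, NotUsed] =
    RestartSource.withBackoff(minBackoff = 1.second, maxBackoff = 10.seconds, randomFactor = 0.2) { () =>
      // connectToUnstableService() is evaluated here, while the inner
      // source is being built, not while it is processing elements.
      Source.single(connectToUnstableService())
    }
}
```

Running the blueprint with any sink, for example `source.runWith(Sink.ignore)`, is what invokes the factory function.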

 

Calling 'connectToUnstableService()' could throw an exception. In theory, 'RestartSource' should automatically retry the inner source when that happens. However, if you run this example, the exception is thrown and the stream just terminates:

Error during preStart in [akka.stream.scaladsl.RestartWithBackoffSource@7174a4f4]: Failed to connect service
java.lang.RuntimeException: Failed to connect unstable jms service

Why? If you look closely, 'connectToUnstableService()' is called while the inner source is being created. 'Source.single' takes its parameter by value, so the exception is thrown not while the stream is processing elements, but before the inner source even exists. That is outside the jurisdiction of 'RestartSource'. The solution is similar to the previous pitfall: create a source with a single unit element and map this element to a connection, or to whatever dangerous operation you want to perform.
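
A minimal sketch of that solution follows. The flaky `connectToUnstableService()` (failing twice before succeeding), the `attempts` counter and the short backoff values are hypothetical, chosen only so that the retry can be observed quickly; the Akka 2.5-style `withBackoff` overload and `ActorMaterializer` are assumed.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{RestartSource, Sink, Source}

import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object RestartFixed {
  val attempts = new AtomicInteger(0)

  // Hypothetical flaky service: fails on the first two calls, then succeeds.
  def connectToUnstableService(): String =
    if (attempts.incrementAndGet() < 3)
      throw new RuntimeException("Failed to connect unstable jms service")
    else "connected"

  def firstConnection()(implicit mat: Materializer): Future[String] =
    RestartSource
      .withBackoff(minBackoff = 100.millis, maxBackoff = 1.second, randomFactor = 0.2) { () =>
        // The call now happens inside the map stage, while the inner source
        // is running, so a failure is visible to RestartSource.
        Source.single(()).map(_ => connectToUnstableService())
      }
      .runWith(Sink.head)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("restart-fixed")
    implicit val mat: Materializer = ActorMaterializer()
    println(Await.result(firstConnection(), 10.seconds))
    system.terminate()
  }
}
```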

 

4. mapAsync — keep everything in the Future

If you want to process something in parallel, there is a good chance that you will use 'mapAsync'. Good choice, just remember to put everything inside the 'Future'. Otherwise you will end up with blocking code.
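
The mistake could look roughly like the sketch below. The `slowAction` helper, its 5-second default sleep and the object name are assumptions made for illustration; an Akka 2.5-style `ActorMaterializer` is assumed as well.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}

import java.time.LocalTime
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BlockingMapAsync {
  // Hypothetical slow, blocking operation.
  def slowAction(i: Int, sleepMillis: Long): Int = {
    println(s"${LocalTime.now()} very slow action $i ...")
    Thread.sleep(sleepMillis)
    i
  }

  def run(sleepMillis: Long = 5000)(implicit mat: Materializer): Future[Seq[Int]] =
    Source(1 to 5)
      .mapAsync(parallelism = 5) { i =>
        // The blocking call finishes before the Future is even created,
        // so it runs on the thread that executes the stream itself.
        val result = slowAction(i, sleepMillis)
        Future.successful(result)
      }
      .runWith(Sink.seq)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("blocking-map-async")
    implicit val mat: Materializer = ActorMaterializer()
    Await.result(run(), 1.minute)
    println(s"${LocalTime.now()} done ...")
    system.terminate()
  }
}
```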

Such a source will finish not after about 5 seconds, as you might assume, but after 25 seconds:

2017-11-16 14:09:33,581 - very slow action 1 …
2017-11-16 14:09:38,585 - very slow action 2 …
2017-11-16 14:09:43,585 - very slow action 3 …
2017-11-16 14:09:48,585 - very slow action 4 …
2017-11-16 14:09:53,587 - very slow action 5 …
2017-11-16 14:09:58,588 - done …

Because the blocking code is not enclosed in the 'Future', it blocks the actor responsible for this stage. Remember that all stages not marked as asynchronous run in one single actor. With a corrected example, in which the blocking call is wrapped in the 'Future', everything works as expected:

2017-11-16 14:10:41,041 - very slow action 1 …
2017-11-16 14:10:41,041 - very slow action 2 …
2017-11-16 14:10:41,042 - very slow action 4 …
2017-11-16 14:10:41,042 - very slow action 3 …
2017-11-16 14:10:41,042 - very slow action 5 …
2017-11-16 14:10:46,043 - done …
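
A corrected version, with the blocking call moved inside the 'Future', might be sketched as follows. The `slowAction` helper is hypothetical, and running the futures on `ExecutionContext.global` with `blocking` is an assumption made to keep the sketch short; a dedicated dispatcher for blocking work is a common alternative.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}

import java.time.LocalTime
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

object FutureMapAsync {
  // Hypothetical slow, blocking operation.
  def slowAction(i: Int, sleepMillis: Long): Int = {
    println(s"${LocalTime.now()} very slow action $i ...")
    Thread.sleep(sleepMillis)
    i
  }

  def run(sleepMillis: Long = 5000)(implicit mat: Materializer, ec: ExecutionContext): Future[Seq[Int]] =
    Source(1 to 5)
      .mapAsync(parallelism = 5) { i =>
        // The blocking call runs inside the Future, on the execution
        // context, so up to five calls can be in flight at once.
        Future(blocking(slowAction(i, sleepMillis)))
      }
      .runWith(Sink.seq)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("future-map-async")
    implicit val mat: Materializer = ActorMaterializer()
    implicit val ec: ExecutionContext = ExecutionContext.global
    Await.result(run(), 1.minute)
    println(s"${LocalTime.now()} done ...")
    system.terminate()
  }
}
```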

 

5. tick as a scheduler

Let’s say you want to schedule some processing at 60-minute intervals. Usually the processing takes about 40 minutes to finish, but sometimes it needs more than the scheduled interval, e.g. 90 minutes. In such a case you would like the next run to start right after the previous one finishes. Unfortunately, if you use the 'tick' method, your scheduler will wait for the next tick instead.
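
A scaled-down sketch of such a scheduler is shown below, using a 5-second tick and a job that takes about 7 seconds (values read off the log that follows). The `process` helper, the use of `mapAsync(1)` to run one job at a time and the `take` that lets the sketch terminate are assumptions, not the author's original listing.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}

import java.time.LocalTime
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

object TickScheduler {
  // Hypothetical job that takes longer than the tick interval.
  def process(workMillis: Long)(implicit ec: ExecutionContext): Future[Unit] = Future {
    blocking {
      println(s"${LocalTime.now()} Processing started")
      Thread.sleep(workMillis)
      println(s"${LocalTime.now()} done!")
    }
  }

  def run(interval: FiniteDuration = 5.seconds, workMillis: Long = 7000, runs: Int = 3)(
      implicit mat: Materializer, ec: ExecutionContext): Future[Done] =
    Source
      .tick(0.seconds, interval, ())
      .mapAsync(1)(_ => process(workMillis)) // one job at a time
      .take(runs)                            // only so that the sketch terminates
      .runWith(Sink.ignore)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("tick-scheduler")
    implicit val mat: Materializer = ActorMaterializer()
    implicit val ec: ExecutionContext = ExecutionContext.global
    Await.result(run(), 1.minute)
    system.terminate()
  }
}
```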

2017-11-15 08:46:34,584 - Processing started
2017-11-15 08:46:41,586 - done!
2017-11-15 08:46:44,494 - Processing started
2017-11-15 08:46:51,495 - done!
2017-11-15 08:46:54,494 - Processing started
2017-11-15 08:47:01,495 - done!

As you can see, the interval between 'Processing started' lines is 10 seconds instead of 5 seconds. Since the actual processing needs more than 5 seconds, every second tick is skipped. This can be fixed by adding 'buffer(1, OverflowStrategy.backpressure)' after 'tick', so that a tick arriving while processing is still running is kept rather than dropped.'
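
The buffered variant could be sketched as follows, again with a 5-second tick and a job of about 7 seconds. The `process` helper, `mapAsync(1)` and `take` are illustrative assumptions; only the `buffer(1, OverflowStrategy.backpressure)` call comes from the text above.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}

import java.time.LocalTime
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

object BufferedTickScheduler {
  // Hypothetical job that takes longer than the tick interval.
  def process(workMillis: Long)(implicit ec: ExecutionContext): Future[Unit] = Future {
    blocking {
      println(s"${LocalTime.now()} Processing started")
      Thread.sleep(workMillis)
      println(s"${LocalTime.now()} done!")
    }
  }

  def run(interval: FiniteDuration = 5.seconds, workMillis: Long = 7000, runs: Int = 3)(
      implicit mat: Materializer, ec: ExecutionContext): Future[Done] =
    Source
      .tick(0.seconds, interval, ())
      // Holds on to one tick that arrives while a job is still running,
      // so the next job can start as soon as the current one is done.
      .buffer(1, OverflowStrategy.backpressure)
      .mapAsync(1)(_ => process(workMillis)) // one job at a time
      .take(runs)                            // only so that the sketch terminates
      .runWith(Sink.ignore)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("buffered-tick-scheduler")
    implicit val mat: Materializer = ActorMaterializer()
    implicit val ec: ExecutionContext = ExecutionContext.global
    Await.result(run(), 1.minute)
    system.terminate()
  }
}
```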
 
 
This article was written by Jakub Dziworski and posted originally on SoftwareMill Blog.