Supervision & error handling in ZIO, Akka and Monix (part 3) + series summary by Adam Warski

It's time for Part 3! In this final part, CTO Adam Warski looks at the final feature that makes actors stand out: error handling, supervision and the actor hierarchy. Find out what it's all about, and read the final summary round-up.

We hope you've enjoyed reading this series with us.

 

In the previous parts we’ve seen:

In this last part we’ll look at the final feature that makes actors stand out: error handling, supervision and the actor hierarchy.

What is this all about? Things fail all the time. Whenever you communicate with an external system, there might be a network error; the service might have a bug; requests can be malformed; servers might be down; etc.

Error handling is often a significant part of our application. Because its surface area is so large, it has a tendency to creep into each corner of our code and make it harder to read and understand. That’s why there are numerous efforts to contain the situation and separate the error handling code from the business logic.

If successful, we’ll get clear, readable business logic, but also clear and readable error handling logic. Another part of the challenge is to get a degree of certainty that our error handling actually works!

Akka borrows from Erlang’s “let it crash” philosophy. The key idea is not to try to handle all errors in a process. Firstly, this leads to error code getting tangled with the business logic. Secondly, an actor can simply lack context to be able to fix the error.

For example, if there’s an actor whose sole responsibility is to read from a queueing system, and the connection to the queue breaks, what should the actor do? Re-create the connection? But how, if that’s not the responsibility of the actor?

That’s what supervision hierarchies in Akka (and Erlang, and other actor implementations) are for. Each actor has a parent; if an error is not handled by the actor, it is propagated to the parent. The parent can decide if the child process should be resumed, restarted, stopped, or if the error should be escalated to its parent.

Parent, grandparent and further ancestor actors have progressively more context, and hence might be able to run the appropriate logic, e.g. re-creating the connection to the queue and creating a new child actor which will read from the queue.

Using supervision hierarchies we also achieve the separation of concerns that we were after: the business logic is in the actor, while the error handling logic is in the supervisor.
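
To make the parent's decision more tangible, here is a minimal, library-free sketch of what a supervision decision boils down to: a function from the child's error to one of four directives. The names 'Directive', 'SupervisionSketch' and 'decide', as well as the particular mapping of exceptions to directives, are assumptions made for illustration only; this is not Akka's API.

```scala
// Hypothetical model of a supervisor's decision; not Akka's actual API.
sealed trait Directive
case object Resume extends Directive   // keep the child and its state, carry on
case object Restart extends Directive  // replace the child with a fresh instance
case object Stop extends Directive     // terminate the child for good
case object Escalate extends Directive // let our own parent decide

object SupervisionSketch {
  // The mapping below is an arbitrary example policy.
  def decide(error: Throwable): Directive = error match {
    case _: java.io.IOException      => Restart  // e.g. broken connection: start over
    case _: IllegalArgumentException => Resume   // e.g. one malformed request: skip it
    case _: InterruptedException     => Stop
    case _                           => Escalate // not enough context to handle it here
  }
}
```

The business logic never sees this function; it lives with the supervisor, which is the separation of concerns described above.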

 

Akka

As in the previous parts, we will start with an Akka example and see how to implement the same logic using Akka Typed, Monix and ZIO. In this example we’ll:

  1. connect to an external queueing system through a 'QueueConnector' trait
  2. after obtaining a connected 'Queue' instance, read from the queue
  3. forward any messages to interested consumers
  4. upon any error, attempt to 'close()' the old queue connection and then re-connect.

Here are the base traits we’ll be working with:

trait Queue[F[_]] {
  def read(): F[String]
  def close(): F[Unit]
}

trait QueueConnector[F[_]] {
  def connect: F[Queue[F]]
}

The traits are parametrised with a wrapper (higher-kinded) type 'F[_]', which should be capable of representing both successful and failed computations. In the Akka example, we’ll be using 'QueueConnector[Future]' and 'Queue[Future]', as that’s the container type that Akka works with best.
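
To experiment with failures without a real queueing system, an in-memory implementation of these traits can be handy. The sketch below is only an assumption of what such a test double might look like: 'InMemoryQueue', 'InMemoryConnector' and 'InMemoryQueueDemo' are hypothetical names, and the behaviour (serve the given messages, then fail every read) is chosen purely to trigger the error path.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// The traits from the article, repeated so that the sketch is self-contained.
trait Queue[F[_]] {
  def read(): F[String]
  def close(): F[Unit]
}

trait QueueConnector[F[_]] {
  def connect: F[Queue[F]]
}

// Hypothetical test double: serves the given messages, then fails every read.
class InMemoryQueue(messages: List[String]) extends Queue[Future] {
  private val remaining = new java.util.concurrent.ConcurrentLinkedQueue[String]()
  messages.foreach(m => remaining.add(m))

  def read(): Future[String] = Option(remaining.poll()) match {
    case Some(msg) => Future.successful(msg)
    case None      => Future.failed(new RuntimeException("connection lost"))
  }

  def close(): Future[Unit] = Future.successful(())
}

class InMemoryConnector(messages: List[String]) extends QueueConnector[Future] {
  def connect: Future[Queue[Future]] = Future.successful(new InMemoryQueue(messages))
}

object InMemoryQueueDemo {
  // Connects and reads a single message, blocking only for the sake of the demo.
  def firstMessage(): String = {
    val queue = Await.result(new InMemoryConnector(List("a", "b")).connect, 1.second)
    Await.result(queue.read(), 1.second)
  }
}
```

Because 'F[_]' is a type parameter, the same traits can later be instantiated with other wrapper types.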

We’ll implement a pattern that’s also known as the “error kernel”. We’ll keep the important state safe and protected in a parent actor: here the state will be the set of registered message consumers (to which the messages read from the queue should be forwarded). The risky operations which might fail (connecting to and consuming from the queue) will be delegated to a child actor. That way, even if the child actor fails, the state will not be lost.

The parent actor will receive two types of messages:

case class Subscribe(actor: ActorRef)
case class Received(msg: String)

'Subscribe' adds an actor to the set of consumers interested in receiving messages, and 'Received' is sent by the child actor when a new message has been received from the queue. The subscribe and received-message handling logic is quite straightforward:

class BroadcastActor(connector: QueueConnector[Future])
  extends Actor with ActorLogging {

  private var consumers: Set[ActorRef] = Set()

  override def receive: Receive = {
    case Subscribe(actor) => consumers += actor
    case Received(msg) =>
      consumers.foreach(_ ! msg)
  }
}

But with this definition alone, nothing will really happen, as we never try to connect to the queue. That’s why when the broadcast (parent) actor starts, we’ll spawn a child actor:

class BroadcastActor(connector: QueueConnector[Future])
  extends Actor with ActorLogging {

  override def preStart(): Unit = {
    context.actorOf(Props(new ConsumeQueueActor(connector)))
  }

  // ...
}

What does the child actor do? Its internal state will consist of the currently connected 'Queue' instance (if any). Once again we’ll use the 'preStart' callback to try to connect to the queue immediately after the actor starts. As this is an asynchronous operation, we’ll 'pipeTo' the result to the actor. That way the result of the connect operation will be sent as a message to the actor:

class ConsumeQueueActor(connector: QueueConnector[Future])
  extends Actor with ActorLogging {

  import context.dispatcher

  private var currentQueue: Option[Queue[Future]] = None

  override def preStart(): Unit = {
    log.info("[queue-start] connecting")
    connector.connect.pipeTo(self)
  }

  // ...
}

Once the connected queue is received, we can start reading messages from it. Each message, once available, will be forwarded to the parent actor, wrapped in 'Received'. After a message is received, we can receive the next one by sending the queue to 'self' (the current actor):

class ConsumeQueueActor(connector: QueueConnector[Future])
  extends Actor with ActorLogging {

  import context.dispatcher

  private var currentQueue: Option[Queue[Future]] = None

  // ...

  override def receive: Receive = {
    case queue: Queue[Future] =>
      if (currentQueue.isEmpty) {
        log.info("[queue-start] connected")
        currentQueue = Some(queue)
      }
      log.info("[queue] receiving message")
      queue
        .read()
        .pipeTo(self) // forward message to self
        .andThen { case Success(_) => self ! queue } // receive next message

    case msg: String =>
      context.parent ! Received(msg)

    case Failure(e) =>
      log.info(s"[queue] failure: ${e.getMessage}")
      throw e
  }
}

But what if there’s an error? If either 'connector.connect.pipeTo(self)' or 'queue.read().pipeTo(self)' fails, the actor will receive a 'Failure(e)' message. We don’t really know what to do with that, so we are taking the easiest route: re-throwing the error — which will cause the actor to fail — and hence propagating the error to the parent.

Whatever the reason for the child actor to be stopped (either failure or a regular shutdown of the application), we make one last effort to clean up in the 'postStop' method:

class ConsumeQueueActor(connector: QueueConnector[Future])
  extends Actor with ActorLogging {

  import context.dispatcher

  private var currentQueue: Option[Queue[Future]] = None

  override def preStart(): Unit = // ...

  override def postStop(): Unit = {
    log.info("[queue-stop] stopping queue actor")
    currentQueue.foreach { queue =>
      log.info("[queue-stop] closing")
      Await.result(queue.close(), 1.minute)
      log.info("[queue-stop] closed")
    }
  }

  // ...
}

 

If there’s any connected 'Queue' instance (there might not be, if connecting failed), we try to invoke its 'close()' method. As this is an asynchronous process, and the 'postStop' method is synchronous, we have no other choice but to use 'Await.result'.

And that’s all there is to the child actor; notice that there’s almost no error handling code at all (except for re-throwing any exceptions).

What will the parent do once a child fails? That depends on the supervision strategy. The default one is to 'Restart' a child on “normal” exceptions. The strategy is defined in the parent actor as an overridable method:

class BroadcastActor(connector: QueueConnector[Future])
  extends Actor with ActorLogging {

  // ...

  // optional - the default one is identical
  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case _: ActorInitializationException => Stop
    case _: ActorKilledException         => Stop
    case _: DeathPactException           => Stop
    case e: Exception =>
      log.info(s"[broadcast] exception in child actor: ${e.getMessage}, restarting")
      Restart
  }

  // ...
}

Here we have a simple hierarchy with one child actor, but in more complex examples, besides restarting the actor (one-for-one), there is also the possibility of restarting all child actors if only one fails (all-for-one).

In addition, there’s also some flexibility in how the child actor is restarted. One option is to use backoff, that is, not to restart the child actor immediately, but after a (growing) delay. If a system is down, it’s quite possible that it will still be down if we try again right after the failure. But if we wait a bit, it has a higher chance of recovering. This is possible by wrapping the child actor in a 'BackoffSupervisor'.
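
The “growing delay” can be pictured with a small, library-free sketch. The function below doubles the delay with each restart and caps it at a maximum; 'BackoffSketch', 'delay' and the parameter names are hypothetical, and this is an illustration of the idea rather than the way 'BackoffSupervisor' computes its delays (real implementations typically also add some random jitter).

```scala
import scala.annotation.tailrec
import scala.concurrent.duration._

object BackoffSketch {
  // Delay before the restart number `restartCount` (0 for the first one):
  // starts at minBackoff, doubles each time, never exceeds maxBackoff.
  @tailrec
  def delay(restartCount: Int,
            minBackoff: FiniteDuration,
            maxBackoff: FiniteDuration): FiniteDuration =
    if (minBackoff >= maxBackoff) maxBackoff
    else if (restartCount <= 0) minBackoff
    // doubling would exceed the cap (checked before multiplying, to avoid overflow)
    else if (minBackoff > maxBackoff / 2L) maxBackoff
    else delay(restartCount - 1, minBackoff * 2L, maxBackoff)
}
```

With a minimum of 1 second and a maximum of 30 seconds, successive restarts would wait 1, 2, 4, 8 and 16 seconds, and 30 seconds from then on.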

The example above is available in the GitHub repository, together with tests which simulate failures at various stages of the application. There’s quite a lot of logging going on, so you can observe what happens at each moment, when and if the actors are created and restarted.

 

Akka Typed

The Akka Typed implementation is slightly different in two aspects. First of all, failure handling is not tied to the parent actor. Instead, it’s a wrapper for a behavior, which gives us more flexibility. Failure handling can either be defined in the parent, or come pre-defined with the child actor behavior.

Secondly, if a parent actor spawns multiple child actors, each of them can have different supervision handling, unlike the “global” configuration of the supervision strategy in the “traditional” Akka approach.

To implement our example we’ll define 'broadcastBehavior' which will describe how the parent actor should behave. It will handle the same two types of messages as before, but because we need to parametrize the behavior with a single type, we introduce a common trait:

sealed trait BroadcastActorMessage
case class Subscribe(actor: ActorRef[String]) extends BroadcastActorMessage
case class Received(msg: String) extends BroadcastActorMessage

The message handling logic won’t have any mutable state. Instead, once again it will be a method parametrised with the state (the set of consumer actors), which is called recursively:

def handleMessage(consumers: Set[ActorRef[String]]): Behavior[BroadcastActorMessage] =
  Behaviors.receiveMessage {
    case Subscribe(actor) => handleMessage(consumers + actor)
    case Received(msg) =>
      consumers.foreach(_ ! msg)
      handleMessage(consumers)
  }

But, before handling any 'BroadcastActorMessage', we should try to connect to the queue and start receiving messages. We’ll do that in a separate actor, spawned when the broadcast behavior is first created:

def broadcastBehavior(
  connector: QueueConnector[Future]): Behavior[BroadcastActorMessage] =
  Behaviors.setup { ctx =>
    val connectBehavior = Behaviors
      .supervise[Nothing](connectToQueueBehavior(connector, ctx.self))
      .onFailure[RuntimeException](SupervisorStrategy.restart)
    ctx.spawn[Nothing](connectBehavior, "connect-queue")

    def handleMessage(
      consumers: Set[ActorRef[String]]): Behavior[BroadcastActorMessage] = // ...

    handleMessage(Set())
  }

We’re using 'Behaviors.supervise' to wrap the child actor behavior ('connectToQueueBehavior', which we’ll define next) so that whenever a 'RuntimeException' happens, the actor will be restarted. Note that 'supervise' is a wrapper for any 'Behavior', yielding a new 'Behavior'. We could have defined it completely separately and outside of the parent actor. Depending on the use-case, it might be more logical to define it inside, or outside of the supervisor.

Even easier than before, we can also use delayed restarts with a backoff by using 'SupervisorStrategy.restartWithBackoff' (and others), instead of 'SupervisorStrategy.restart' as in this case.

There’s an important difference between “traditional” Akka and Akka Typed. In the previous approach, we’ve seen that the default supervisor strategy for “normal” exceptions is to restart the child actor. In Akka Typed, the default is to stop the child actor. That’s why we need to explicitly specify what to do on child failures using 'onFailure'.

The second difference from the previous implementation is that the child actor will in fact consist of two actors: one for connecting to the queue, the other for a connected queue. The reason why we need not only two behaviors but also two actors is that both of them will handle different types of messages. That’s the small price we’ll need to pay for type safety.

We won’t be sending any messages from the parent actor to the child actor, hence its type, as viewed by the parent actor, will be 'Behavior[Nothing]'. Inside the actor, however, we are sending a message containing the connected queue, so we’ll need to create a behavior which accepts a 'Try[Queue[Future]]' and then hide that fact from the parent using 'narrow':

def connectToQueueBehavior(connector: QueueConnector[Future],
                           msgSink: ActorRef[Received]): Behavior[Nothing] = {
  Behaviors.setup[Try[Queue[Future]]] { ctx =>
    import ctx.executionContext

    ctx.log.info("[queue-start] connecting")
    connector.connect.andThen { case result => ctx.self ! result }

    Behaviors.receiveMessage {
      case Success(queue) =>
        ctx.log.info("[queue-start] connected")

        val consumeActor = ctx.spawn(consumeQueueBehavior(queue, msgSink),
                                     "consume-queue")
        ctx.watch(consumeActor)

        // we can either not handle Terminated, which will cause
        // DeathPactException to be thrown and propagated or rethrow the
        // original exception
        Behaviors.receiveSignal {
          case (_, t @ Terminated(_)) =>
            t.failure.foreach(throw _)
            Behaviors.empty
        }
      case Failure(e) =>
        ctx.log.info("[queue-start] failure")
        throw e
    }
  }
}.narrow[Nothing]

Using the self-reference from the context, we are sending a message to self once the queue is connected ('connector.connect.andThen { case result => ctx.self ! result }'). If it’s a failure, we rethrow the error which will cause the supervisor in the parent to be invoked. If it’s a success, we spawn a child actor with the queue-consuming behavior ('consumeQueueBehavior', defined below).

Note that instead of the 'preStart' callback, in Akka Typed we simply create a behavior which runs the desired code when the actor is set up (using 'Behaviors.setup'), and then returns the “proper” behavior. There’s no looping in this actor: it only ever receives one message.

But that’s not the end. If the queue-consuming actor fails, we need to propagate that error to the parent. That’s not done automatically: we need to watch the new child actor (using 'ctx.watch'). Then, the only thing left to do in the actor is to wait for the child’s termination signal (when things go wrong), and propagate that to the parent.

Termination signals are sent through a different channel than normal actor messages, hence the dedicated behavior factory ('Behaviors.receiveSignal', instead of the usual 'Behaviors.receiveMessage').

Finally, we get to the behavior of the queue consumer:

def consumeQueueBehavior(queue: Queue[Future],
                         msgSink: ActorRef[Received]): Behavior[Try[String]] =
  Behaviors.setup { ctx =>
    import ctx.executionContext

    ctx.log.info("[queue] receiving message")
    queue.read().andThen { case result => ctx.self ! result }

    Behaviors
      .receiveMessage[Try[String]] {
        case Success(msg) =>
          msgSink ! Received(msg)
          consumeQueueBehavior(queue, msgSink)

        case Failure(e) =>
          ctx.log.info(s"[queue] failure: ${e.getMessage}")
          throw e
      }
      .receiveSignal {
        case (_, PostStop) =>
          ctx.log.info("[queue-stop] closing")
          Await.result(queue.close(), 1.minute)
          ctx.log.info("[queue-stop] closed")
          Behaviors.same
      }
  }

Similarly to the “traditional” Akka implementation, we invoke reading from the queue and once the message is ready, we forward it to self ('queue.read().andThen { case result => ctx.self ! result }'). Once a message is received, we send it to the sink (that will be the parent actor) and recursively call the same behavior.

If it’s a failure, we simply throw the exception. That will cause the queue-connecting actor to be notified, which will in turn notify the parent actor.

What about closing the queue before the queue-consumer actor finishes (for whatever reason)? There’s no 'postStop' method to override here like before. Instead, we modify the created behavior adding a 'receiveSignal' handler. If we get a 'PostStop' signal, we try to close the queue. Again, we need to synchronously return a new behavior, but the closing action is asynchronous — hence the need for the 'Await'.

It’s important to note here that once again we are leveraging the fact that 'Behavior's, just like Monix’s 'Task's and ZIO’s 'IO's, are lazy. This allows modifying the (recursive) behavior by adding additional handlers or meta-data. Here, we are modifying 'Behaviors.receiveMessage[Try[String]]' so that the signal handler is installed as well. If the behaviors were eagerly executed, the 'receiveSignal' would never be called.

This is one more case where separating the description of a computation from its interpretation is beneficial.

 

Monix

Let’s start examining the Monix implementation from the end, that is from the description of the task which will connect to the queue, consume messages from it and close it in the end (either due to normal termination or an error).

Instead of using lifecycle hooks ('preStart', 'postStop' in Akka), we’ll simply define a process which performs the connect-consume-close steps in sequence.

As in the previous parts, to communicate with the parent process we’ll use an 'MVar' (a bounded, 1-element queue) which will store elements of type 'BroadcastMessage':

sealed trait BroadcastMessage
case class Subscribe(consumer: String => Task[Unit]) extends BroadcastMessage
case class Received(msg: String) extends BroadcastMessage

Next, we’ll define three separate tasks which connect to the queue, consume elements from the queue and finally close it:

val connect: Task[Queue[Task]] = Task
  .eval(logger.info("[queue-start] connecting"))
  .flatMap(_ => connector.connect)
  .map { q =>
    logger.info("[queue-start] connected")
    q
  }

def consumeQueue(queue: Queue[Task]): Task[Unit] =
  Task
    .eval(logger.info("[queue] receiving message"))
    .flatMap(_ => queue.read())
    .flatMap(msg => inbox.put(Received(msg)))
    .cancelable
    .restartUntil(_ => false)

def releaseQueue(queue: Queue[Task]): Task[Unit] =
  Task
    .eval(logger.info("[queue-stop] closing"))
    .flatMap(_ => queue.close())
    .map(_ => logger.info("[queue-stop] closed"))

The task definitions are pretty straightforward: they simply invoke the appropriate methods on the 'connector' or a connected 'queue' instance and perform some additional logging. Note that 'consumeQueue' will never end normally, as after reading a single message and sending it to the parent process (using 'inbox.put(Received(msg))'), it’s always restarted to read another message ('restartUntil(_ => false)').

'Task[Queue[Task]]' might look weird, but well … it’s a task which, when run, creates a 'Queue' which in turn, wraps the results of its method in a 'Task'.

How to combine these three tasks into a whole? We’ll use 'bracket':

def consume(connector: QueueConnector[Task],
            inbox: MVar[BroadcastMessage]): Task[Unit] = {

  val connect: Task[Queue[Task]] = // ...
  def consumeQueue(queue: Queue[Task]): Task[Unit] = // ...
  def releaseQueue(queue: Queue[Task]): Task[Unit] = // ...

  connect.bracket(consumeQueue)(releaseQueue)
}

Note that we are using 'inbox' as the name for the communication channel between the consume and broadcast processes to avoid name clashes, as 'Queue' is already taken by our domain class.

'bracket' is an operator that forms one of the basic building blocks of error handling in Monix (and ZIO as well). It’s equivalent to the well-known 'try ... catch ... finally' construct from Java/Scala. The 'connect' task should allocate the resources; then the first bracket parameter is the resource usage. Regardless of the way the resource usage part ends (the task completes, there is an error, or the fiber is cancelled), it’s guaranteed that the third task, which releases the resources, will be evaluated as well.

And that’s exactly what we need! Using 'bracket', we can ensure that an attempt to close the queue will be made, however the queue consumption ends.

It’s not quite clear what will happen when both resource-usage and resource-release parts throw an error. Which error will the user get? It’s the first one, and the second will be discarded. An important detail to keep in mind.
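
The “first error wins” rule can be illustrated with a synchronous, library-free stand-in for 'bracket', built on 'Try'. This is only a sketch under simplifying assumptions: 'BracketSketch' is a hypothetical name, there is no asynchrony or cancellation, and what happens when only the release step fails (here: the release error is surfaced) is a choice made for this example, not a statement about Monix.

```scala
import scala.util.{Failure, Success, Try}

object BracketSketch {
  // Acquire a resource, use it, and always attempt to release it.
  // If both `use` and `release` fail, the error from `use` is reported
  // and the error from `release` is discarded.
  def bracket[R, A](acquire: => R)(use: R => A)(release: R => Unit): Try[A] =
    Try(acquire).flatMap { resource =>
      val result   = Try(use(resource))
      val released = Try(release(resource)) // evaluated however `use` ended
      result match {
        case Failure(_) => result                        // the first error wins
        case Success(_) => released.flatMap(_ => result) // assumption: surface a release-only error
      }
    }
}
```

For example, if 'use' throws an 'IllegalStateException' and 'release' throws a 'RuntimeException', the caller only sees the 'IllegalStateException'.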

The above guarantees proper behavior when an error happens. But what if we just want to end the process gracefully? We might be no longer interested in consuming the queue. With Akka it was enough to stop the actor. Here we have to use cancellation.

In the previous parts we’ve used 'Fiber.cancel' as well, to end a forked (asynchronous) process. Here the consumption logic will also be run asynchronously (as we’ll see below soon). If the user decides that queue consumption should stop, cancellation is the only hope to break the infinite consumption loop.

However, there’s a catch: by default a lot of things aren’t cancellable. For example, the infinite 'flatMap' chain in 'consumeQueue' (if we unfold the recursive invocations) will never be cancelled. That’s why we need to add a cancellation boundary using 'cancelable'. This will allow the flat-map chain to be stopped mid-way.

What 'cancelable' does, in essence, is to instruct the interpreter of the task that when it receives a cancellation request for a fiber (lightweight thread), and there’s an opportunity to stop executing the task (for example, because the interpreter has just finished one 'flatMap' operation and is about to start another one), the task will be cancelled.

So far we’ve talked only about connecting to the queue. What about the rest? We still need to define the message-broadcasting process which will send the read messages to interested consumers. For that, we create a task which will handle both 'Subscribe' and 'Received' messages:

def processMessages(inbox: MVar[BroadcastMessage],
                    consumers: Set[String => Task[Unit]]): Task[Unit] =
  inbox.take
    .flatMap {
      case Subscribe(consumer) => processMessages(inbox, consumers + consumer)
      case Received(msg) =>
        consumers
          .map(consumer => consumer(msg).fork)
          .toList
          .sequence_
          .flatMap(_ => processMessages(inbox, consumers))
    }

Nothing out of the ordinary that we haven’t seen before. We describe a never-ending process which reads messages from a queue, maps them to the appropriate tasks (updating the internal state — the set of 'consumer's — if necessary) and recursively calls itself.

We still need to define how and when the consume process should be restarted:

def consumeForever(inbox: MVar[BroadcastMessage]): Task[Unit] =
  consume(connector, inbox).attempt
    .map {
      case Left(e) =>
        logger.info("[broadcast] exception in queue consumer, restarting", e)
      case Right(()) =>
        logger.info("[broadcast] queue consumer completed, restarting")
    }
    .restartUntil(_ => false)

That part of the broadcast process definition corresponds to the supervisor strategy. When a 'consume' task fails (which can only happen due to an error), we have to decide what to do. Here we simply log the result and restart the process, just like the supervisor’s 'Restart'.

While not built-in, that’s the place where we might use backoff or a limited retry mechanism; however, we’d have to code that by hand.
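
As a rough idea of what such hand-written logic involves, here is a blocking, library-free sketch of a limited retry with a doubling delay. 'RetrySketch' and 'retry' are hypothetical names, and the sketch deliberately uses 'Try' and 'Thread.sleep' instead of 'Task'; a 'Task'-based version would need a non-blocking delay and is not shown here.

```scala
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.{Failure, Try}

object RetrySketch {
  // Runs `action` until it succeeds or `maxAttempts` attempts have been made.
  // Sleeps between attempts, doubling the delay every time.
  @tailrec
  def retry[A](maxAttempts: Int, delay: FiniteDuration)(action: () => A): Try[A] =
    Try(action()) match {
      case Failure(_) if maxAttempts > 1 =>
        Thread.sleep(delay.toMillis) // blocking: acceptable only in a sketch
        retry(maxAttempts - 1, delay * 2L)(action)
      case result => result // success, or the last failure
    }
}
```

Calling 'RetrySketch.retry(3, 100.millis)(() => connectSomehow())', where 'connectSomehow' stands for any failing operation, would make at most three attempts, waiting 100 and then 200 milliseconds in between.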

We’ve also managed to maintain the separation between the business and error-handling logic, however here it’s not enforced through a special mechanism. Instead, we are separating the 'Task' description into a “single” 'consume' task and a task which manages the restarts. Creating fine-grained, single-responsibility task descriptions is one way of creating readable, maintainable code when using Monix.

Finally, we need to tie all the parts together and kick-start the background processes:

def broadcast(connector: QueueConnector[Task]): Task[BroadcastResult] = {
  def processMessages(inbox: MVar[BroadcastMessage],
                      consumers: Set[String => Task[Unit]]): Task[Unit] = // ...

  def consumeForever(inbox: MVar[BroadcastMessage]): Task[Unit] = // ...

  for {
    inbox <- MVar.empty[BroadcastMessage]
    f1 <- consumeForever(inbox).fork
    f2 <- processMessages(inbox, Set()).fork
  } yield BroadcastResult(inbox, f1.cancel *> f2.cancel)
}

To start the broadcast, we start two asynchronous processes: one consuming from the queue in a loop, the other processing messages. The two processes communicate through the 'inbox' 'MVar'.

The return type of the method consists of both the inbox, so that external clients have the possibility to subscribe new consumers, and a task which, when run, will cancel the whole process. Note how the fact that 'Task' is lazy allows us to simply create the description of the cancellation logic, 'f1.cancel *> f2.cancel' ('*>' flat-maps the two tasks, discarding the result of the first), without fear of running the cancellation prematurely.
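
The role laziness plays here can be shown with a tiny stand-in for a lazy task. 'Described' and 'LazyCancelSketch' are hypothetical names made up for this sketch (this is not the Monix API): a 'Described' value only wraps a function, so combining two of them with '*>' builds a bigger description and runs nothing.

```scala
// Hypothetical stand-in for a lazy task; not the Monix API.
final case class Described[A](run: () => A) {
  // Sequence two descriptions, keeping only the second result.
  def *>[B](next: Described[B]): Described[B] =
    Described(() => { run(); next.run() })
}

object LazyCancelSketch {
  // Records which "cancellations" have actually been executed.
  val log = scala.collection.mutable.ListBuffer.empty[String]

  val cancelF1: Described[Unit] = Described(() => { log += "f1 cancelled"; () })
  val cancelF2: Described[Unit] = Described(() => { log += "f2 cancelled"; () })

  // Building the combined description executes neither of the two steps.
  val cancelAll: Described[Unit] = cancelF1 *> cancelF2
}
```

Only an explicit 'LazyCancelSketch.cancelAll.run()' executes both steps, in order. With an eagerly evaluated type such as 'Future', the equivalent expression would start the cancellation as soon as it was constructed.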

Cancelling 'f1' will invoke the bracket’s release, while cancelling 'f2' will cause messages to no longer be read from the 'inbox'.

 

ZIO

Finally, let’s see how ZIO handles errors. As expected, the implementation is quite similar to Monix, however this might be deceptive at times: there are some very important differences, especially in the cancellation model.

However, the overall structure of the solution is the same as before. We’ll be using the same two messages to communicate with the broadcast process:

sealed trait BroadcastMessage
case class Subscribe(consumer: String => IO[Nothing, Unit]) extends BroadcastMessage
case class Received(msg: String) extends BroadcastMessage

The only difference is that in 'Subscribe', the consumer results in an 'IO' instead of a 'Task' or a 'Future'. Following the same order as in the previous section, the description of how queue consumption should work looks familiar:

def consume(connector: QueueConnector[IO[Throwable, ?]],
            inbox: IOQueue[BroadcastMessage]): IO[Throwable, Unit] = {

  val connect: IO[Throwable, Queue[IO[Throwable, ?]]] = IO
    .syncThrowable(logger.info("[queue-start] connecting"))
    .flatMap(_ => connector.connect)
    .map { q =>
      logger.info("[queue-start] connected")
      q
    }

  def consumeQueue(queue: Queue[IO[Throwable, ?]]): IO[Throwable, Unit] =
    IO.syncThrowable(logger.info("[queue] receiving message"))
      .flatMap(_ => queue.read())
      .flatMap(msg => inbox.offer(Received(msg)))
      .forever

  def releaseQueue(queue: Queue[IO[Throwable, ?]]): IO[Void, Unit] =
    IO.syncThrowable(logger.info("[queue-stop] closing"))
      .flatMap(_ => queue.close())
      .map(_ => logger.info("[queue-stop] closed"))
      .catchAll[Nothing](e => IO.now(
                  logger.info("[queue-stop] exception while closing", e)))

  connect.bracket(releaseQueue)(consumeQueue)
}

The 'bracket' operator works the same way as in Monix (though the release-resource and use-resource arguments are reversed): it guarantees that, if the 'connect' action succeeds, 'releaseQueue' will be evaluated (closing an open queue connection), whether 'consumeQueue' finishes normally or due to an error.

There are two important differences in the code, though. First of all, in the Monix version of 'consumeQueue' we had to explicitly mark the 'flatMap' chain as 'cancelable' so that it’s possible to stop queue consumption from the outside. Here, that’s not needed: 'flatMap' chains are auto-cancellable by default.

Secondly, the release-resource part in 'bracket' must handle all errors, and that’s enforced through the type system, as the type of the release-resource parameter is 'A => IO[Void, Unit]'. That’s why there’s no question of what to do in case the release action results in an error: normal errors aren’t possible (as the type states), and if the action does throw an exception (which is always possible), this is considered a programming defect and will be reported to the fiber’s supervisor and/or logged.

The 'broadcast' process implementation corresponds directly to the Monix implementation without significant differences:

def broadcast(connector: QueueConnector[IO[Throwable, ?]]
              ): IO[Void, BroadcastResult] = {

  def processMessages(inbox: IOQueue[BroadcastMessage],
                      consumers: Set[String => IO[Void, Unit]]
                      ): IO[Void, Unit] =
    inbox
      .take[Nothing]
      .flatMap {
        case Subscribe(consumer) => processMessages(inbox, consumers + consumer)
        case Received(msg) =>
          consumers
            .map(consumer => consumer(msg).fork[Nothing])
            .toList
            .sequence_
            .flatMap(_ => processMessages(inbox, consumers))
      }

  def consumeForever(inbox: IOQueue[BroadcastMessage]): IO[Void, Unit] =
    consume(connector, inbox).attempt.map {
      case Left(e) =>
        logger.info("[broadcast] exception in queue consumer, restarting", e)
      case Right(()) =>
        logger.info("[broadcast] queue consumer completed, restarting")
    }.forever

  for {
    inbox <- IOQueue.make[Void, BroadcastMessage](32)
    f1 <- consumeForever(inbox).fork
    f2 <- processMessages(inbox, Set()).fork
  } yield BroadcastResult(inbox,
            f1.interrupt(new RuntimeException) *>
            f2.interrupt(new RuntimeException))
}

 

To reiterate on the previous description: we create two processes, one which tries to connect to the queue and consume messages from it ('consumerForever'), restarting the whole procedure if necessary. The second one ('processMessages') maintains the state — the set of current subscribers (and hence implements the Error Kernel pattern). As a result of the whole action, we return:

  • a queue to which new subscribers can be sent
  • a way to stop the whole process

Stopping the process involves, as before, interrupting the fiber which tries to connect to the queue and consumes messages from it, and another fiber which broadcasts the incoming messages.
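The state handling in 'processMessages' can be illustrated with a library-independent model. The sketch below is an assumption-laden simplification, not the article's implementation: consumers are plain 'String => Unit' callbacks instead of 'String => IO[Void, Unit]', the inbox is a finite list instead of a queue, and the names 'BroadcastModel', 'step' and 'run' are made up for illustration.

```scala
object BroadcastModel {
  // Simplified stand-ins for the article's messages (hypothetical shapes):
  // consumers are side-effecting callbacks rather than IO actions.
  sealed trait BroadcastMessage
  final case class Subscribe(consumer: String => Unit) extends BroadcastMessage
  final case class Received(msg: String) extends BroadcastMessage

  // One iteration of the loop: Subscribe grows the state,
  // Received delivers the message to every current subscriber.
  def step(consumers: Set[String => Unit],
           message: BroadcastMessage): Set[String => Unit] =
    message match {
      case Subscribe(consumer) => consumers + consumer
      case Received(msg) =>
        consumers.foreach(consumer => consumer(msg))
        consumers
    }

  // Processing a finite inbox is a left fold, starting with no subscribers.
  def run(inbox: List[BroadcastMessage]): Set[String => Unit] =
    inbox.foldLeft(Set.empty[String => Unit])(step)
}
```

The set of subscribers is never shared or mutated: each step receives the current state and returns the next one, which is what the recursive 'processMessages' call does with its 'consumers' parameter.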
 

Interruption in ZIO and cancellation in Monix

The way interruption and cancellation work in ZIO and Monix is one of their distinguishing differences, so it might make sense to compare them side-by-side.

 

Creating cancelable actions

In Monix, cancelable actions can be created using:

  • 'Task.create', where the user needs to provide a 'Cancelable' instance which should stop (or try to stop) the asynchronous computation. Upon cancellation, this callback might run concurrently with the cancelled action
  • 'cancelable' operator, which causes 'flatMap' chains in the 'Task' to become cancelable (by default they are not)

In ZIO we have:

  • 'IO.async0', where the user needs to provide a 'Canceler' which will be run when the action is cancelled. The canceller might be run concurrently with the cancelled action
  • 'flatMap' chains are cancellable by default, no need to explicitly mark them as such

Both libraries offer an operator, 'uncancelable' in Monix and 'uninterruptibly' in ZIO, which prevents the described action from being cancelled, even if it’s built out of cancellable operations.

Neither library will attempt to interrupt or cancel atomic actions (such as a single 'flatMap' step, or wrapped synchronous code), e.g. using 'Thread.interrupt'.

 

Cancelling fibers

The way fiber cancellation/interruption tasks work is another important difference. In Monix, fibers can be interrupted by evaluating a task returned by the 'Fiber.cancel: Task[Unit]' method. This task will complete once the cancellation is sent.

In ZIO, we have the 'Fiber.interrupt(t: Throwable): IO[E, Unit]' method. It’s similar, as when evaluated, it will interrupt the target fiber. But it also differs in two respects. First, we can specify an interruption reason (an exception). That reason will then be reported to any action that attempts to join the interrupted fiber, or to the fiber’s supervisor, allowing logging or restarts.

The second important difference is that the action returned by 'interrupt' will only complete once the interruption is successful or the fiber has ended. If we need the interrupt-and-forget semantics from Monix, this can be achieved by forking the fiber interruption into a fiber ('.interrupt(...).fork').
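To make the two completion semantics concrete, here is a toy model built only on the JVM standard library. It is not how Monix or ZIO implement fibers: 'ToyFiber', 'cancelAndForget' and 'interruptAndWait' are hypothetical names, and 'Thread.interrupt' is used purely as a convenient stand-in for the interruption signal (as noted above, neither library relies on it).

```scala
import java.util.concurrent.CountDownLatch

object InterruptModel {
  // A toy "fiber": a thread that blocks until it is interrupted,
  // then runs its finalizer and marks itself as finished.
  final class ToyFiber(finalizer: () => Unit) {
    private val done = new CountDownLatch(1)
    private val thread = new Thread(new Runnable {
      def run(): Unit =
        try Thread.sleep(Long.MaxValue)
        catch { case _: InterruptedException => () }
        finally {
          try finalizer()
          finally done.countDown()
        }
    })
    thread.setDaemon(true)
    thread.start()

    // Monix-like semantics: returns as soon as the signal has been sent;
    // the finalizer may still be running (or not have started) afterwards.
    def cancelAndForget(): Unit = thread.interrupt()

    // ZIO-like semantics: returns only once the fiber has ended,
    // including its finalizer.
    def interruptAndWait(): Unit = {
      thread.interrupt()
      done.await()
    }
  }
}
```

With 'interruptAndWait', any code that runs afterwards can rely on the finalizer having completed; with 'cancelAndForget' it cannot, which is the trade-off described above.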

 

What can be cancelled

When can cancellation be invoked? Both Monix and ZIO provide a way to cancel/interrupt a running fiber, as described above.

Additionally, when a Monix task is run asynchronously, e.g. using 'runAsync', it returns a 'CancelableFuture'. That’s an extension to the regular 'Future' which can additionally cancel a running computation through the side-effecting 'cancel()' method.

ZIO has no direct counterpart; however, the same effect might be achieved by forking the 'IO' action to a fiber and obtaining (through the synchronous 'unsafePerformIO') a 'Fiber' instance, which can then be interrupted.

 

Cleaning up

Both Monix and ZIO have a 'bracket' operator which works the same way: when applied to a resource-create action, it guarantees that a resource-release action will be run once the resource-use action completes successfully, fails with an error, or is cancelled.

ZIO also has some handy aliases, like 'ensuring' (corresponds to 'finally') and 'bracketOnError'.
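The guarantee that 'bracket' gives can be sketched with a small synchronous model. This is only an illustration of the acquire/use/release contract under simplifying assumptions: it uses plain functions and 'Try' instead of 'Task' or 'IO', it does not model cancellation, and the name 'BracketModel' and the parameter order are made up rather than taken from either library.

```scala
import scala.util.Try

object BracketModel {
  // Acquire a resource, use it, and release it no matter how `use` ends.
  // A failure in `use` is captured in the returned Try; the release
  // action runs in both the success and the failure case.
  def bracket[R, A](acquire: => R)(release: R => Unit)(use: R => A): Try[A] = {
    val resource = acquire
    try Try(use(resource))
    finally release(resource)
  }
}
```

For example, 'BracketModel.bracket(openConnection())(_.close())(conn => conn.read())' (with hypothetical 'openConnection', 'close' and 'read') would close the connection whether or not 'read' throws.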

 

Cancellation callbacks

Is it possible to find out that an action has been cancelled within the action itself?

Monix has two such operators. Firstly, 'doOnCancel(cb: Task[Unit])' runs the given task when cancellation occurs (there’s also a counterpart which runs when the task ends normally, 'doOnFinish'). Hence, it’s a “partial bracket”.

The second, 'onCancelRaiseError', causes the action to fail with the given exception, instead of becoming non-terminating on cancel. There’s no way to specify the cancellation reason from the cancelling fiber, but it’s possible to specify it in the cancelled fiber. On the other hand, in ZIO it’s only possible to specify the reason in the interrupting fiber, and the interrupted fiber always terminates with that exception.

ZIO has no operators which would allow the interrupted fiber to find out that an interruption happened. Instead, interruption will be reported to any actions that attempt to 'join' the fiber that is being interrupted.

 

Fiber supervisors

ZIO has two additional mechanisms for fiber supervision which have no counterpart in Monix.

The first is fiber supervisors. When forking an 'IO' to a fiber, it’s possible to specify a handler which will be invoked on any exception not handled by the fiber: 'fork0[E2](handler: Throwable => Infallible[Unit])'. If this resembles supervisors in actors, it should!

If no supervisor is specified, a default one is used which logs the exception.
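The idea of a per-fiber supervisor can be modelled with plain threads. The sketch below is not ZIO's 'fork0': 'forkSupervised' is a hypothetical helper that runs a body on a new thread and passes any non-fatal exception the body does not handle itself to the supplied handler.

```scala
import scala.util.control.NonFatal

object SupervisorModel {
  // Runs `body` on a new daemon thread. Non-fatal exceptions that escape
  // `body` are not lost: they are passed to `handler`, which plays the
  // role of the supervisor.
  def forkSupervised(body: => Unit)(handler: Throwable => Unit): Thread = {
    val thread = new Thread(new Runnable {
      def run(): Unit =
        try body
        catch { case NonFatal(e) => handler(e) }
    })
    thread.setDaemon(true)
    thread.start()
    thread
  }
}
```

A default supervisor in this model would simply be a handler that logs the exception, while a custom one could restart the failed process.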

The second mechanism is the 'IO.supervised(t: Throwable)' method which causes any fibers forked as part of evaluation of the given action to be interrupted with the given exception, once this action completes. Again, this is similar to all child actors being stopped when the parent actor is stopped, however here it’s optional, not mandatory.

We’ve seen an example of using this feature in part 2, where the worker fibers were automatically interrupted once the crawler finished.

 

Summary

Is Monix or ZIO an alternative to Akka actors? Yes: state encapsulation, communication and error handling/supervision can all be implemented using 'Task's or 'IO' actions, without much effort, at the same time keeping the code readable and maintainable, in a more type-safe way.

However, Akka is not lagging behind, as there’s an alternative to “traditional” Akka actors in Akka itself: Akka typed, which is definitely a viable alternative as well.

Whichever approach we choose, as we have seen in the examples presented in the 3 parts of the series, the overall structure of solutions written using the four approaches is the same:

  • all of them use asynchronous message passing
  • all of them communicate using queues: implicit actor mailboxes or explicit queues
  • all of them use concurrently running, independent light-weight processes: actors or fibers

However, as the saying goes, the devil is in the details: the level of type-safety, the model of evaluation, supervision, cancelling and error handling differ significantly. Below is a summary of the various features that we have covered in the series (also available in textual format on Google Sheets).

An important difference is the choice of primitives. In Akka the basic construct is an actor, while in Monix/ZIO there are 'Task'/'IO' actions, which are more low-level, but hence also more flexible. Actors are a pre-defined recipe for creating an asynchronous process.

Is this a limitation of Akka? To some degree yes, however actors are also a very natural way to think about concurrent systems.

Actors which:

  • mind their own business
  • protect their internal state
  • communicate with others using fire-and-forget messages
  • form supervision hierarchies

constrain the ways in which we can define asynchronous processes, but also provide a great framework to model, understand and talk about concurrent systems, which is always a challenging task. Whatever helps in taming the inherent complexity of a distributed system is worth paying attention to.

That’s why even when describing a process using Monix or ZIO, in addition to many other “recipes” that can be defined using 'Task's/'IO's, the notion of an actor is still very useful and that’s why we try to emulate it.

If you’d like to further explore and experiment with the examples, or take Akka/ZIO/Monix for a test drive using the examples as a skeleton, the code is available, together with tests, on GitHub. Just import it into your favorite IDE and try to implement your use-case using the four approaches!

 

Summary: the short version

  • Akka: the most mature, popular solution, with a big ecosystem. However, its basic construct, the actor, lacks type safety, which apart from causing programming errors might make it difficult to understand how communication in a system is organized.
  • Akka typed: a new approach to defining and running actors, using a type-safe, composable and lazy construct — behaviors. Communication patterns are easier to browse. Integrates well with the Akka ecosystem. Some side-effecting operations remain, such as scheduling or message sending.
  • Monix: a pretty well-established library, which provides a type-safe way to describe and manipulate lazy concurrent processes using a rich set of combinators. In addition to fully encapsulating side effects, it also has a module which implements reactive streaming.
  • ZIO: the newest contender which further develops the Monix approach of lazy computation descriptions by offering type-safe errors and supervision. Puts an emphasis on well-behaved interruption, fibers and resource safety constructs.

The code we write is increasingly concurrent, which creates new challenges and lifts the importance of code readability to the next level. The Scala ecosystem offers a wide range of solutions, which differ in choice of primitive constructs, programming styles and type-safety. It’s great to have a choice, though, and as the recent developments in ZIO, Monix and Akka Typed have shown, competition really drives innovation forward.

Good time to be a Scala programmer.'

 

This article was written by Adam Warski and posted originally on SoftwareMill Blog.