Akka vs ZIO vs Monix, part 2: communication by Adam Warski

Can you implement multiple communicating actors using ZIO or Monix? Adam Warski, CTO at SoftwareMill, explores this question through two examples, a web crawler and a sockets server, implemented with Akka, Akka Typed, Monix and ZIO.

If you haven't yet read Part 1, check it out on our Sunday Reads here.

 

In part 1, we explored how to implement a process which manages some non-trivial state using Akka, Akka Typed, Monix and ZIO. However, as a popular saying by Carl Hewitt goes, “one actor is no actor, they come in systems”. Hence, let’s explore examples which use multiple communicating actors, and see if it’s still possible and practical to implement them using ZIO or Monix.

 

Crawler

Our first example will be an implementation of the popular master-worker pattern, where we have a single master process distributing work to a number of worker processes. When a worker finishes a work unit, it sends the results to the master process, which gathers them and includes them in the overall result.

More concretely, the task will be to create a web crawler. Starting from a given URL, it should traverse all the links, counting which hosts are most popular. HTTP requests should be executed in parallel; however, there’s one additional constraint: we don’t want our crawling to look suspicious, so at any given time at most one request to any given host should be in progress (requests to different hosts can still be executed in parallel).

As we are interested in the way a process is defined, not in the actual crawling, we’ll use a stub HTTP service, along with stub functions which extract interesting links (which we want to crawl) from the site’s content:

type Host = String
case class Url(host: Host, path: String)

trait Http[F[_]] {
  def get(url: Url): F[String]
}

type LinkParser = String => List[Url]

 

Traditional Akka

Like in part 1, let’s start with a “traditional” Akka solution, and then move to other implementations. Again, only crucial snippets will be included, but the full source code is available on GitHub.

We’ll have to define two actors: 'Crawler' and 'Worker'. To construct a 'Crawler' actor, we need an interface for executing HTTP requests ('http: Http[Future]', as we are in Akka-land and everything is 'Future'-based), a way to parse links ('parseLinks'), and a 'Promise' waiting to be completed with the final result (once all pages have been crawled; we’re assuming that it’s a finite process, and 'parseLinks' gives only “interesting” links, for example from a set of “interesting” hosts):

class Crawler(http: Http[Future],
              parseLinks: String => List[Url],
              result: Promise[Map[Host, Int]]) extends Actor {

  var referenceCount = Map[Host, Int]()
  var visitedLinks = Set[Url]()
  var inProgress = Set[Url]()
  var workers = Map[Host, ActorRef]()

  // ...
}


The internal state of the actor consists of:

  • 'referenceCount' — the current host popularity
  • 'visitedLinks' — which URLs have already been processed or are currently being processed, so that we don’t process them again
  • 'inProgress' — the set of URLs which are currently being processed. Once this becomes empty (after starting the process), the crawling is done and the 'result' promise can be completed with 'referenceCount'
  • 'workers' — each host will have a dedicated worker actor, which will ensure that at most one request is done for each host at any time

There are two messages that the crawler actor can receive:

sealed trait CrawlerMessage
/**
  * Start the crawling process for the given URL. Should be sent only once.
  */
case class Start(url: Url) extends CrawlerMessage
case class CrawlResult(url: Url, links: List[Url]) extends CrawlerMessage

The 'Start' message should be sent only once, to kickstart the whole process. 'CrawlResult' messages will be sent by worker actors, once they have completed crawling the given URL and parsing the links.

Let’s start by looking at the 'crawlUrl' method in the actor:

private def crawlUrl(url: Url): Unit = {
  if (!visitedLinks.contains(url)) {
    visitedLinks += url
    inProgress += url
    actorFor(url.host) ! Crawl(url)
  }
}

private def actorFor(host: Host): ActorRef = {
  workers.get(host) match {
    case None =>
      val workerActor = context.actorOf(Props(new Worker(http, parseLinks, self)))
      workers += host -> workerActor
      workerActor

    case Some(ar) => ar
  }
}

The method checks if the URL has already been visited; if not, the 'visitedLinks' and 'inProgress' structures are updated. We create or look up a worker actor using 'actorFor', and tell it to 'Crawl' the given address.

Notice that when creating a new worker, we’re passing the 'self: ActorRef' reference so that the worker can send messages back to the crawler.

As we mentioned before, the actor can receive two types of messages:

override def receive: Receive = {
  case Start(start) =>
    crawlUrl(start)

  case CrawlResult(url, links) =>
    inProgress -= url

    links.foreach { link =>
      crawlUrl(link)
      referenceCount = referenceCount.updated(link.host,
        referenceCount.getOrElse(link.host, 0) + 1)
    }

    if (inProgress.isEmpty) {
      result.success(referenceCount)
      context.stop(self)
    }
}

The worker actors are expected to reply to the crawler actor with a 'CrawlResult' message. Once this message is received, the 'inProgress' and 'referenceCount' structures are again updated, and all the linked URLs are crawled. If at the end nothing is being crawled, we are done!

The worker isn’t complicated either. It’s parametrised with a reference ('ActorRef') to the master actor, which allows it to send messages back, as well as the 'Http[Future]' interface and a way to parse links:

class Worker(http: Http[Future],
             parseLinks: String => List[Url],
             master: ActorRef) extends Actor with ActorLogging {

  var urlsPending: Vector[Url] = Vector.empty
  var getInProgress = false

  // ...
}

The internal state of the actor consists of a list of URLs that should be crawled ('urlsPending'), and a flag indicating if there’s a request in progress ('getInProgress'). This is needed to ensure that there’s at most one request to a given domain executing at any time.

There are also two messages which the worker will receive:

sealed trait WorkerMessage
case class Crawl(url: Url) extends WorkerMessage
case class HttpGetResult(url: Url, result: Try[String]) extends WorkerMessage

The first one is sent, as we’ve seen, by the crawler actor:

override def receive: Receive = {
  case Crawl(url) =>
    urlsPending = urlsPending :+ url
    startHttpGetIfPossible()

  // ...
}

private def startHttpGetIfPossible(): Unit = {
  urlsPending match {
    case url +: tail if !getInProgress =>
      getInProgress = true
      urlsPending = tail

      import context.dispatcher
      http.get(url).onComplete(r => self ! HttpGetResult(url, r))

    case _ =>
  }
}

Once we get a new URL to crawl, we add it to the list of pending requests ('urlsPending'). If possible (that is, if there are no requests in progress), we start executing a new HTTP request in the 'startHttpGetIfPossible' method. Once this completes, we send an 'HttpGetResult' message to ourselves (the worker actor). Note that this is an asynchronous operation, and you always have to be cautious not to access or mutate the actor’s state from within such callbacks.
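
To make the hazard concrete, here’s a hypothetical fragment (not from the repository) contrasting what must not be done inside the 'onComplete' callback with the message-to-self pattern used above:

// Hypothetical fragment, for illustration only. The callback runs on the
// dispatcher's thread, outside the actor's single-threaded message handling,
// so touching the actor's vars there would be a race condition.
import context.dispatcher

http.get(url).onComplete { r =>
  // getInProgress = false      // unsafe: mutates actor state from another thread
  self ! HttpGetResult(url, r)  // safe: state is only modified when the message is handled
}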

override def receive: Receive = {
  case Crawl(url) =>
    // ...

  case HttpGetResult(url, Success(body)) =>
    getInProgress = false
    startHttpGetIfPossible()

    val links = parseLinks(body)
    master ! CrawlResult(url, links)

  case HttpGetResult(url, Failure(e)) =>
    getInProgress = false
    startHttpGetIfPossible()

    log.error(s"Cannot get contents of $url", e)
    master ! CrawlResult(url, Nil)
}

Once the worker actor receives the 'HttpGetResult' message, it sends a notification to the master with the results ('CrawlResult'), and starts another request, if there’s one pending.

Overall, it’s not a complicated process, but there’s some communication happening: both between the master and the worker, and the other way round. There are tests for the implementation (see 'AkkaCrawlerTest'), which verify that indeed we get correct answers.
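
The repository’s tests aren’t reproduced here, but to give an idea of how such a test can be set up, here’s a sketch of in-memory stubs satisfying the 'Http[Future]' and link-parser interfaces (hypothetical data and names, not the actual 'AkkaCrawlerTest' code):

import scala.concurrent.Future

object StubSite {
  // A tiny, fixed link graph; each page's "body" is just its outgoing links, comma-separated.
  val links: Map[Url, List[Url]] = Map(
    Url("a.com", "/")      -> List(Url("b.com", "/"), Url("a.com", "/about")),
    Url("a.com", "/about") -> List(Url("b.com", "/")),
    Url("b.com", "/")      -> Nil
  )

  val http: Http[Future] = new Http[Future] {
    def get(url: Url): Future[String] =
      Future.successful(links.getOrElse(url, Nil).map(u => s"${u.host}${u.path}").mkString(","))
  }

  val parseLinks: String => List[Url] = body =>
    if (body.isEmpty) Nil
    else body.split(',').toList.map { s =>
      val (host, path) = s.span(_ != '/')
      Url(host, path)
    }
}

With the crawler above, starting from 'Url("a.com", "/")' this graph should yield 'Map("a.com" -> 1, "b.com" -> 2)': every occurrence of a link increments its host’s counter, while 'visitedLinks' ensures each page is fetched only once.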

 

Akka Typed

With Akka Typed, instead of writing actors directly, we’ll be defining actor behaviors. The messages sent between the actors/behaviors will be exactly the same, however we’ll additionally encapsulate the whole state in a case class:

case class CrawlerData(referenceCount: Map[Host, Int],
                       visitedLinks: Set[Url],
                       inProgress: Set[Url],
                       workers: Map[Host, ActorRef[WorkerMessage]])

The behaviors are parametrised with an interface for executing HTTP requests, a function to parse the links, and an actor to which the result should be sent once it’s available (this used to be a 'Promise' in the previous example, but an actor reference is more natural here):

class Crawler(http: Http[Future],
              parseLinks: String => List[Url],
              reportTo: ActorRef[Map[Host, Int]]) {
  def crawlerBehavior: Behavior[CrawlerMessage] = ???
}

The crawler behavior that we’ll define will use the actor’s context, so we’ll wrap the methods which define the message-processing behavior in a factory method which obtains the context. The context, as everything in Akka Typed, is parameterised with the type of the messages that the actor handles. That’s needed, for example, to obtain a well-typed 'self' actor reference, which needs to know what kind of messages it accepts:

def crawlerBehavior: Behavior[CrawlerMessage] =
  Behaviors.setup[CrawlerMessage] { ctx =>
    def receive(data: CrawlerData): Behavior[CrawlerMessage] = ???
    def crawlUrl(data: CrawlerData, url: Url): CrawlerData = ???
    def workerFor(data: CrawlerData,
                  host: Host): (CrawlerData, ActorRef[WorkerMessage]) = ???
  }

Let’s start from the end, with the method for looking up a worker actor for a given host. Since in this implementation we’re not using mutable state, but instead returning modified actor behaviors which wrap the state ('CrawlerData'), all methods will:

  • as a parameter, take a 'CrawlerData' instance
  • return the modified 'CrawlerData' as part of the return type

def workerFor(data: CrawlerData, host: Host): (CrawlerData, ActorRef[WorkerMessage]) = {
  data.workers.get(host) match {
    case None =>
      val workerActor = ctx.spawn(workerBehavior(ctx.self), s"worker-$host")
      (data.copy(workers = data.workers + (host -> workerActor)), workerActor)

    case Some(ar) => (data, ar)
  }
}

If there’s no worker for the given domain yet, we’re spawning a new child actor, using the worker behavior, and returning an updated actor state, together with the created actor reference.

As the type of the behavior which we are passing to 'spawn' is 'Behavior[WorkerMessage]', the result of this method will be 'ActorRef[WorkerMessage]'.

The 'crawlUrl' and 'receive' methods are quite similar to the “traditional” Akka implementation, with the significant difference being that we need to thread through the modified actor state. Sometimes there are a couple of modifications in a row, hence we get a chain of 'data', 'data2', 'data3' references:

def receive(data: CrawlerData): Behavior[CrawlerMessage] = Behaviors.receiveMessage {
  case Start(start) =>
    receive(crawlUrl(data, start))

  case CrawlResult(url, links) =>
    val data2 = data.copy(inProgress = data.inProgress - url)

    val data3 = links.foldLeft(data2) {
      case (d, link) =>
        val d2 = d.copy(referenceCount = d.referenceCount.updated(
          link.host, d.referenceCount.getOrElse(link.host, 0) + 1))
        crawlUrl(d2, link)
    }

    if (data3.inProgress.isEmpty) {
      reportTo ! data3.referenceCount
      Behavior.stopped
    } else {
      receive(data3)
    }
}

def crawlUrl(data: CrawlerData, url: Url): CrawlerData = {
  if (!data.visitedLinks.contains(url)) {
    val (data2, worker) = workerFor(data, url.host)
    worker ! Crawl(url)
    data2.copy(
      visitedLinks = data.visitedLinks + url,
      inProgress = data.inProgress + url
    )
  } else data
}

Communication in both Akka variants looks the same: we use the '!' ('tell') method to send a message to a (typed) actor. Don’t be mistaken, though: here everything is well-typed. You won’t be able to send a message of an incorrect type to an actor.
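
A small sketch of what this buys us, assuming the definitions above are in scope:

// The worker reference only accepts WorkerMessage, so the protocol is checked at compile time.
val worker: ActorRef[WorkerMessage] = ??? // e.g. obtained from workerFor
worker ! Crawl(Url("example.com", "/"))   // ok: Crawl extends WorkerMessage
// worker ! Start(Url("example.com", "/")) // does not compile: Start is a CrawlerMessage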

The worker behavior also corresponds closely to what we’ve seen before, again with the exception that we’re not using mutable state (and hence there’s no possibility of accidentally modifying it within callbacks):

def workerBehavior(master: ActorRef[CrawlResult]): Behavior[WorkerMessage] =
  Behaviors.setup[WorkerMessage] { ctx =>

    def receive(urlsPending: Vector[Url], getInProgress: Boolean): Behavior[WorkerMessage] =
      Behaviors.receiveMessage {
        case Crawl(url) =>
          startHttpGetIfPossible(urlsPending :+ url, getInProgress)

        case HttpGetResult(url, Success(body)) =>
          val links = parseLinks(body)
          master ! CrawlResult(url, links)

          startHttpGetIfPossible(urlsPending, getInProgress = false)

        case HttpGetResult(url, Failure(e)) =>
          ctx.log.error(s"Cannot get contents of $url", e)
          master ! CrawlResult(url, Nil)

          startHttpGetIfPossible(urlsPending, getInProgress = false)
      }

    def startHttpGetIfPossible(urlsPending: Vector[Url],
                               getInProgress: Boolean): Behavior[WorkerMessage] =
      urlsPending match {
        case url +: tail if !getInProgress =>
          import ctx.executionContext
          http.get(url).onComplete(r => ctx.self ! HttpGetResult(url, r))

          receive(tail, getInProgress = true)

        case _ =>
          receive(urlsPending, getInProgress)
      }

    receive(Vector.empty, getInProgress = false)
  }

 

ZIO

Once again, let’s leave the eager 'scala.concurrent.Future' world, and venture into the lazy land of 'IO'. In the example from the previous article we used an 'IOQueue' to communicate with the process from the outside world. Here, we’ll be using multiple 'IOQueue's.

The 'Crawler' process will also use a 'CrawlerData' case class for storing the current state, but instead of a map from the domain to the worker’s 'ActorRef', it will contain an 'IOQueue':

case class CrawlerData(referenceCount: Map[Host, Int],
                       visitedLinks: Set[Url],
                       inProgress: Set[Url],
                       workers: Map[Host, IOQueue[Url]])

Instead of actor classes, we’ll be defining methods which return 'IO' instances: descriptions of how to compute the host popularity counts. The method will take an 'Http[IO]' interface, but this time, when executing the request, we won’t get a 'Future[String]' but, as we’re in ZIO-world, an 'IO[Throwable, String]'. That is, we’ll get back a description of how to execute a 'GET' request to the given address:

def crawl(crawlUrl: Url,
          http: Http[IO[Throwable, ?]],
          parseLinks: String => List[Url]): IO[Nothing, Map[Host, Int]] = {

  def crawler(crawlerQueue: IOQueue[CrawlerMessage],
              data: CrawlerData): IO[Nothing, Map[Host, Int]] = // ...

  def worker(workerQueue: IOQueue[Url],
             crawlerQueue: IOQueue[CrawlerMessage]
            ): IO[Nothing, Fiber[Nothing, Unit]] = // ...

  // ...
}


In Akka Typed we had to define two behaviors, for the crawler and the worker; here we’ll be defining two process descriptions. The first one, the crawler, contains the same parts as in the previous implementation:

def crawler(crawlerQueue: IOQueue[CrawlerMessage],
            data: CrawlerData): IO[Nothing, Map[Host, Int]] = {

  def handleMessage(msg: CrawlerMessage,
                    data: CrawlerData): IO[Nothing, CrawlerData] = ???

  def crawlUrl(data: CrawlerData,
               url: Url): IO[Nothing, CrawlerData] = ???

  def workerFor(data: CrawlerData,
                host: Host): IO[Nothing, (CrawlerData, IOQueue[Url])] = ???

  ???
}

Let’s again start from the bottom, with the description of how to obtain a worker for a given host. Even though we’ve travelled from Akka to Scalaz, we still need a way to ensure that there’s at most one request to a given host done at any given time. A separate asynchronous process which makes sure that’s the case is a good fit:

def workerFor(data: CrawlerData,
              host: Host): IO[Nothing, (CrawlerData, IOQueue[Url])] = {

  data.workers.get(host) match {
    case None =>
      for {
        workerQueue <- IOQueue.make[Nothing, Url](32)
        _ <- worker(workerQueue, crawlerQueue)
      } yield {
        (data.copy(workers = data.workers + (host -> workerQueue)), workerQueue)
      }
    case Some(queue) => IO.now((data, queue))
  }
}

Here of course we also don’t have any mutable state, so we need to take in the 'CrawlerData' as a parameter, and return an updated copy. If there’s no worker for a given address yet, we first create a (bounded) queue which will be used to communicate with that worker, then create the worker process (we’ll get to the definition of 'worker' soon), and finally store the queue in our data structure. Again, that is not that different from the Akka Typed implementation.

The 'crawlUrl' method should look familiar as well:

def crawlUrl(data: CrawlerData, url: Url): IO[Nothing, CrawlerData] = {
  if (!data.visitedLinks.contains(url)) {
    workerFor(data, url.host).flatMap {
      case (data2, workerQueue) =>
        workerQueue.offer(url).map { _ =>
          data2.copy(
            visitedLinks = data.visitedLinks + url,
            inProgress = data.inProgress + url
          )
        }
    }
  } else IO.now(data)
}

 

The major difference is that sending a message to a worker is no longer a side-effecting operation, as it was before. Instead, we use the 'workerQueue.offer' method, which returns a description of how to send a message to the queue. We need to combine this description with the overall description of how our code should run, or it will never be executed. Hence the need for the 'flatMap'/'map'.
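
A minimal illustration of the difference, using the same 'IOQueue' API as the snippets above (hypothetical values, not code from the repository):

// With an ActorRef, 'workerActor ! Crawl(url)' sends the message immediately, as a side effect.
// With an IOQueue, 'offer' only builds a description of the send:
val send = workerQueue.offer(url) // nothing has been enqueued yet

// The send only happens when this description becomes part of the IO that is
// eventually interpreted, e.g. by sequencing it with map/flatMap as in crawlUrl:
val sendAndUpdate = send.map(_ => data.copy(inProgress = data.inProgress + url))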

The 'handleMessage' method corresponds to 'receive' from the Akka Typed implementation and should return the crawler data modified after handling a single, given message:

def handleMessage(msg: CrawlerMessage,
                  data: CrawlerData
                 ): IO[Nothing, CrawlerData] = msg match {
  case Start(url) =>
    crawlUrl(data, url)

  case CrawlResult(url, links) =>
    val data2 = data.copy(inProgress = data.inProgress - url)

    links.foldM(data2) {
      case (d, link) =>
        val d2 = d.copy(referenceCount = d.referenceCount.updated(
          link.host, d.referenceCount.getOrElse(link.host, 0) + 1))
        crawlUrl(d2, link)
    }
}

While before, when handling the 'CrawlResult' message, we did a simple 'foldLeft' over the resulting links, updating the data structure and running the side-effecting 'crawlUrl' method, here we need to combine all the 'IO's returned by every 'crawlUrl' invocation into one big description. That’s what the monadic fold ('foldM' in the snippet above) does: starting from 'data2', it threads the accumulated 'CrawlerData' through each 'IO'-returning step, giving us a final 'IO[Nothing, CrawlerData]' which composes all the side effects into a single description.
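
If the monadic fold looks unfamiliar: it’s essentially a 'foldLeft' over 'IO' values, chaining each step with 'flatMap'. A sketch of an equivalent formulation, using the same 'crawlUrl' as above:

// Equivalent to links.foldM(data2) { ... }: thread the CrawlerData through a
// chain of flatMaps, one per link, producing a single composed IO.
def crawlAll(links: List[Url], initial: CrawlerData): IO[Nothing, CrawlerData] =
  links.foldLeft(IO.now(initial): IO[Nothing, CrawlerData]) { (acc, link) =>
    acc.flatMap { d =>
      val d2 = d.copy(referenceCount = d.referenceCount.updated(
        link.host, d.referenceCount.getOrElse(link.host, 0) + 1))
      crawlUrl(d2, link)
    }
  }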

But that’s not the end! We have helper methods to handle the messages, but what about the main loop? Unlike an actor, which, as we’ve summarized before, is a pre-defined recipe for an asynchronous process reading messages from its inbox in a loop, here we need to create the loop by hand:

crawlerQueue.take.flatMap { msg =>
  handleMessage(msg, data).flatMap { data2 =>
    if (data2.inProgress.isEmpty) {
      IO.now(data2.referenceCount)
    } else {
      crawler(crawlerQueue, data2)
    }
  }
}

The loop takes the form of recursive invocations of the main 'crawler' method, with the updated crawler data. Unless, of course, there are no more requests in progress: then we simply return the result.

Having the crawler ready, let’s look at the worker process. It can in fact be simpler than in the Akka implementations. The key observation is that we are in full control over when we take a new message from the queue. An actor has the mailbox-read loop baked in: we cannot postpone receiving the next message until some condition is satisfied (it is possible to stash messages, but that requires additional logic). Here, however, we have that possibility.

Hence the worker, after getting a new request to crawl a URL from its queue, can simply execute the request and only take the next URL after the request completes:

def worker(workerQueue: IOQueue[Url],
           crawlerQueue: IOQueue[CrawlerMessage]
          ): IO[Nothing, Fiber[Nothing, Unit]] = {

  def handleUrl(url: Url): IO[Nothing, Unit] = {
    http
      .get(url)
      .attempt[Nothing]
      .map {
        case Left(t) =>
          logger.error(s"Cannot get contents of $url", t)
          List.empty[Url]
        case Right(b) => parseLinks(b)
      }
      .flatMap(r => crawlerQueue.offer(CrawlResult(url, r))
                                .fork[Nothing].toUnit)
  }

  workerQueue
    .take[Nothing]
    .flatMap(handleUrl)
    .forever
    .fork
}

The worker process is an infinite loop (created with 'forever'), which takes a message from the queue and handles it. It is also forked into a fiber, so that it runs asynchronously in the background. The fiber instance is returned, but it’s never used by the crawler process.

There’s a very important detail here, however. Notice that when we send the crawl result to the message queue, we fork the operation into a fiber ('crawlerQueue.offer(…).fork'). Why is that?

Recall that unlike the mailboxes of actors, the 'IOQueue' that we are using in ZIO is bounded, and when the queue is full, the 'offer' operation blocks. That’s good on one side — it gives a bound on memory usage, and also provides back-pressure. However, it can also lead to deadlocks.

In our example, imagine that there are a lot of links from one page to a single host (but with different paths), so we’ll be sending a lot of messages from the crawler process to a single worker process. If the number of links (URLs) is higher than the queue capacity, then at some point the crawler will become blocked and won’t be able to send any more URLs, as the queue will be full. The worker will slowly work through the requests, replying with results and processing messages from its queue, but its queue can get immediately filled up again with new 'Crawl' messages.

If the total number of URLs sent from the crawler to one worker during a single 'crawlUrl' invocation exceeds the combined capacities of the crawler and worker queues, at some point the crawler’s queue will fill up as well: the crawler will still be sending 'Crawl' messages, and won’t get a chance to process the 'CrawlResult' messages it receives; now the worker will block as well. Hence the deadlock.

However, if we send the replies in the background, in a forked fiber, the worker will be able to continue working through the 'Crawl' requests. All of the spawned 'offer(CrawlResult(...))' fibers might block until the crawler finishes enqueueing all 'Crawl' requests, but that’s not a problem.

EDIT: as pointed out by John De Goes, another way to solve the problem is to use an unbounded queue with 'IOQueue.make(Int.MaxValue)'. That way with ZIO we have a choice between bounded queues — which require more caution — and unbounded queues.

That way our memory usage is still bounded (by the total size of the queues) and we won’t get a deadlock; however, we do need to carefully design the way the processes interact to avoid such situations.

If the processes form a hierarchy, as here, where there’s a parent process (the crawler) and a number of child processes (the workers), a good rule might be to send messages directly only from parent processes to child processes (down the hierarchy tree). Any replies, going up the hierarchy tree, should be sent in the background, using a forked fiber.

Finally, we need to bootstrap the whole process: create the queue to communicate with the crawler, enqueue the initial message, and create the 'IO' which describes the crawling process:

val crawl = for {
  crawlerQueue <- IOQueue.make[Nothing, CrawlerMessage](32)
  _ <- crawlerQueue.offer[Nothing](Start(crawlUrl))
  r <- crawler(crawlerQueue, CrawlerData(Map(), Set(), Set(), Map()))
} yield r

IO.supervise(crawl, new RuntimeException)

There’s one small but important feature here: the 'IO.supervise' call which wraps the whole process. What this method does is instruct the interpreter that when the wrapped computation ('crawl') completes, all fibers created by it should be interrupted (and terminated). And that’s exactly what we want: any forked worker fibers should be terminated once we have the final result, as they won’t ever be used.

This closely resembles a hierarchy of actors in Akka: once a parent actor is stopped, all child actors are stopped as well. In ZIO it’s not the default, but the option is there. When defining a computation which spawns multiple fibers, it’s very handy not to have to worry about the cleanup, but delegate the task to 'supervise'.

 

Monix

Finally, let’s move to Monix. As we noted in the previous installment of the series, Monix and ZIO solutions are very closely related. Here the situation is the same. There are two important differences however.

First of all, we cannot use 'MVar's (which behave like bounded queues of size 1) to communicate between the crawler and the worker. As putting a value into a full 'MVar' is a blocking operation, it could very quickly lead to a deadlock (as described above).

That’s why we need a proper queue. Monix does have an unbounded async queue implementation, 'monix.execution.misc.AsyncQueue', but it’s 'Future'-based, so we’ll create a thin 'Task'-wrapper around it:

class MQueue[T](q: AsyncQueue[T]) {
  def take: Task[T] = {
    Task.deferFuture(q.poll())
  }
  def offer(t: T): Task[Unit] = {
    Task.eval(q.offer(t))
  }
}
object MQueue {
  def make[T]: MQueue[T] = new MQueue(AsyncQueue.empty)
}

The interface to our 'MQueue' is the same as to Scalaz’s 'IOQueue', but with an important difference: 'IOQueue' is bounded, and when the queue is full, 'IOQueue.offer' will (asynchronously) block. Here we have an unbounded queue, which corresponds to unbounded actor mailboxes in Akka. Hence, we won’t have problems with deadlocks (but we also don’t get a bound on memory usage).
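
A quick usage sketch of the wrapper (assuming 'monix.eval.Task' and the 'MQueue' defined above):

import monix.eval.Task

val queue: MQueue[Url] = MQueue.make[Url]

// Both operations are lazy Task descriptions; offer never blocks, because the
// underlying AsyncQueue is unbounded.
val roundTrip: Task[Url] =
  queue.offer(Url("example.com", "/")).flatMap(_ => queue.take)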

The second difference is that there’s no construct analogous to 'IO.supervise' in Monix, so we have to manage fibers manually. That means that we are storing the fibers in the 'CrawlerData' data structure, next to the worker queues:

case class WorkerData(queue: MQueue[Url],
                      fiber: Fiber[Unit])
case class CrawlerData(referenceCount: Map[Host, Int],
                       visitedLinks: Set[Url],
                       inProgress: Set[Url],
                       workers: Map[Host, WorkerData])

When a new worker process is created, we have to store the fiber on which it is running:

def workerFor(data: CrawlerData, url: Host): Task[(CrawlerData, MQueue[Url])] = {
  data.workers.get(url) match {
    case None =>
      val workerQueue = MQueue.make[Url]
      worker(workerQueue, crawlerQueue).map { workerFiber =>
        val workerData = WorkerData(workerQueue, workerFiber)
        val data2 = data.copy(workers = data.workers + (url -> workerData))
        (data2, workerQueue)
      }
    case Some(wd) => Task.now((data, wd.queue))
  }
}


And once the computation is done, all fibers need to be cancelled. This manual fiber management slightly complicates the 'Task' construction when we know that we are done with the crawling and want to return the result:

crawlerQueue.take.flatMap { msg =>
  handleMessage(msg, data).flatMap { data2 =>
    if (data2.inProgress.isEmpty) {
      data2.workers.values.map(_.fiber.cancel).toList.sequence_
        .map(_ => data2.referenceCount)
    } else {
      crawler(crawlerQueue, data2)
    }
  }
}

The 'data2.workers.values.map(_.fiber.cancel).toList.sequence_' creates a 'Task' description which cancels all the fibers ('Fiber.cancel: Task[Unit]') in sequence, and then returns the final result.
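
Note that 'sequence_' is not a method on 'List' itself; the snippet assumes that 'Foldable' syntax and an applicative instance for 'Task' are in scope, for example via something like the following (the exact imports depend on the Monix and cats versions used):

// Assumption: cats' Foldable syntax provides sequence_ on List[Task[Unit]]
import cats.implicits._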

Otherwise the code is very similar to the ZIO implementation. Here’s the full source for you to browse.

Both the Scalaz and Monix implementations come with tests which simulate deep and wide chains of crawled links. This way we can verify that the solutions are not only correct, but also stack-safe.
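
As an illustration, a “deep” stub site for such a test could be generated along these lines (a hypothetical helper, not the repository’s actual test code):

// Generates a chain of n hosts, each linking to the next one, so crawling it
// requires n sequential steps and exercises the stack-safety of the recursion.
def deepChain(n: Int): Map[Url, List[Url]] =
  (0 until n).map { i =>
    val next = if (i + 1 < n) List(Url(s"host${i + 1}.com", "/")) else Nil
    Url(s"host$i.com", "/") -> next
  }.toMap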

 

Sockets example

The repository also contains another example, called sockets. It shows how to deal with two common problems:

  1. interfacing with a legacy, blocking API. Here, we have a server socket ('Socket') with a blocking and exception-throwing 'accept' method, and client sockets ('ConnectedSocket') with blocking 'send'/'receive' methods (a rough sketch of these interfaces is shown below).
  2. broadcasting messages to a large number of clients. This is a common requirement, e.g. when dealing with websockets.

All examples use several processes:

  • the router process ('Actor'/'Behavior'/'Task'/'IO') manages the server socket and broadcasts messages received from any connected client sockets to all other connected client sockets
  • the socket process accepts new client connections, which result in new instances of a 'ConnectedSocket'
  • the client send/receive processes are created for each client 'ConnectedSocket' and send messages or listen for new ones

If at any time a 'SocketTerminatedException' is thrown by a client socket send/receive operation, the client socket needs to be closed and removed from the router.
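
For reference, the blocking interfaces described above might look roughly like this (a sketch based on the description; the exact definitions, including the message type, are in the repository):

// A sketch only; the message type String is an assumption.
class SocketTerminatedException extends Exception

trait Socket {
  def accept(): ConnectedSocket // blocks until a client connects; may throw
}

trait ConnectedSocket {
  def send(msg: String): Unit // blocks; throws SocketTerminatedException when the socket is closed
  def receive(): String       // blocks; throws SocketTerminatedException when the socket is closed
}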

The code is constructed in the same way as before, no significant new ideas are introduced. Still, it might be educational to explore the code on your own. As in the other examples, there’s also a test suite which might be useful for verifying that the code actually works.

 

Summary

In this part we’ve built on the ideas presented in the introductory article, adding communication to our asynchronous processes. As in the last part, the overall structure of the code isn’t that different across the implementations. There are significant differences in type safety and in the exact semantics of the constructed objects, but the way communication is performed, via asynchronous message passing, is the same.

It’s quite easy to identify how concepts from actors can be mirrored in the ZIO/Monix worlds. In Akka, each actor is associated with a mailbox, which is a queue of incoming messages. In Monix/ZIO, if we need to model communication, we need to create a queue ourselves.

While in Akka we pass around (typed or untyped) 'ActorRef's, so that one actor can send messages to another actor, in ZIO we pass around (typed) 'IOQueue's and in Monix 'MQueue's or 'MVar's, depending on the use-case. Again, this is not that different.

This gives us another piece of the answer to the questions stated in the first part of the series: can ZIO/Monix offer an alternative to Akka/Akka Typed? As far as state management and communication are concerned: yes. Keep in mind, however, that we are looking at a small portion of what Akka is: while things like remoting, clustering or persistence could be implemented using the ZIO/Monix approach as well, there are no libraries which implement these functionalities (at least not yet).

In the final part, we’ll look at failure management, supervision and cancellation. Stay tuned!

 

This article was written by Adam Warski and posted originally on SoftwareMill Blog.