Where is the llama for FS2? by Michał Matłoka

When you deal with data and end up with many gigabytes processed by several systems, things can get complicated. Yet integrating different data sources is a daily task for many developers.

In this article, Senior Software Engineer Michał Matłoka looks at the different system integration tools and libraries.

 

'Integrating different data sources is quite a common task in everyday developer work. Is there anyone who has never needed to read information from one place, transform it, and save it somewhere else? If you are dealing with a small amount of data and not too many formats, this can be quite a simple task, but if you encounter hundreds of gigabytes processed by several systems, it can become a much more complicated problem.

In this post we are going to take a look at the different system integration tools and libraries, check whether there is an Alpakka equivalent for FS2 (let’s call it Llama for now), and see what other options exist for FS2.

 

Apache Camel

Over the years many tools, methods and technologies have been created to make such transformations easier, safer, or just “enterprise ready”. One of the terms you may have heard of is Enterprise Integration Patterns (EIP). They are implemented, for example, by Apache Camel. Its features cover really complicated scenarios of integrating various systems. However, in many cases Camel turns out to be too powerful a tool for the problem, or its learning curve is simply too steep.

Despite its long history (over 10 years!) Apache Camel is still actively developed. Quite recently a new variation called Apache Camel K was published. Its role is to bring Camel features to the Kubernetes, serverless and microservices worlds.

 

Akka Streams & Alpakka

In a totally different part of the developers’ world we quite often encounter the Akka Streams library. It’s designed to handle “streams” of data in an efficient, robust and asynchronous manner. Whenever you hear about Akka Streams, something called “back-pressure” is usually mentioned as well. It describes a situation in which one stream processing element influences the others: a downstream element passes information “back” to indicate how fast it can accept data, so that a fast producer does not overwhelm a slow consumer. The real power of Akka Streams comes with Alpakka, an initiative to implement reactive integration pipelines for various systems and libraries. Alpakka allows you to integrate with various databases, with AWS, Azure and GCP products, and also with different messaging systems. If you’d like to learn more about Akka Streams and Alpakka, take a look at the presentation by Jacek Kunicki.
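
To make the idea of back-pressure more tangible, here is a minimal sketch that uses only the standard library, not Akka Streams. It shows the simplest, blocking form of back-pressure: a bounded buffer whose 'put' blocks the producer whenever the slow consumer falls behind. Akka Streams achieves a similar effect asynchronously, without blocking threads. The names 'BackPressureSketch' and 'run' are made up for this illustration.

```scala
import java.util.concurrent.ArrayBlockingQueue

object BackPressureSketch {
  // Hypothetical helper: pushes the numbers 1..total through a buffer with
  // `capacity` slots. Returns the sum of the consumed numbers and the largest
  // buffer occupancy observed by the consumer.
  def run(total: Int, capacity: Int): (Int, Int) = {
    val buffer = new ArrayBlockingQueue[Int](capacity)

    // Fast producer: put() blocks as soon as the buffer is full,
    // so the producer is slowed down to the pace of the consumer.
    val producer = new Thread(() => (1 to total).foreach(i => buffer.put(i)))
    producer.start()

    var sum = 0
    var consumed = 0
    var maxSeen = 0
    while (consumed < total) {
      maxSeen = math.max(maxSeen, buffer.size())
      sum += buffer.take() // blocks while the buffer is empty
      Thread.sleep(1)      // simulate a slow consumer
      consumed += 1
    }
    producer.join()
    (sum, maxSeen)
  }
}
```

Calling 'BackPressureSketch.run(50, 4)' delivers all 50 elements, yet the buffer never holds more than 4 of them, no matter how fast the producer is.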

Did you know that Alpakka Kafka was originally named Reactive Kafka and was created by SoftwareMill? One of its main contributors is our colleague — Krzysiek Ciesielski.

So, when do we really need Alpakka and “back-pressure”? Just imagine that you are receiving data from Apache Kafka, which is then decoded and processed. Messages may be split into multiple separate entries and then, at some point you may need to save them to a database.

How would you do that without Akka? In one scenario you could simply call the 'KafkaConsumer#poll' method to obtain multiple records, make some transformations using the Scala collections API, and then execute a separate database insert for each row. At the end you would need to commit the processed offsets to Kafka.

import java.time.Duration
import scala.collection.JavaConverters._ // scala.jdk.CollectionConverters._ on Scala 2.13+
import scala.concurrent.{Await, Future}

def processAndInsert(...): Future[...] = // decode, insert, commit ...

while (true) {
  // fetch a batch of records, waiting up to one minute
  val records = consumer.poll(Duration.ofMinutes(1)).asScala
  // start processing every record at once, with no limit on parallelism
  val future = Future.sequence(records.map(processAndInsert))
  Await.ready(future, ...) // ugly ;)
}

What can go wrong? Actually, everything. A single message could be invalid, so you need error handling. The database may be down, so you need retries. You may suddenly receive 20k rows to be inserted into the database at once. A basic 'Future.sequence' could then fail in multiple ways. Your database could crash, and even if it survived such a sudden batch of rows, some single inserts could still fail. Then, of course, you need to handle retries of individual inserts or implement a custom mechanism to prevent such situations in the future.

What about Alpakka? You get a source that emits single messages, you can map them asynchronously while limiting parallelism (e.g. 100 parallel inserts instead of 20k), and you can simply define a policy for error handling. Much less error-prone.
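
Just to show what such a custom mechanism could look like, here is a minimal sketch based only on the standard library. It is an illustration, not a recommendation: the names 'BoundedInserts' and 'insertAll' are made up, and the 'insert' function stands in for a real database call. The rows are cut into batches and the next batch is started only after the previous one has completed, so no more than 'parallelism' inserts are in flight at any time.

```scala
import scala.concurrent.{ExecutionContext, Future}

object BoundedInserts {
  // Runs `insert` for every row, but never starts more than `parallelism`
  // inserts at once: a batch is started only after the previous batch is done.
  def insertAll[A, B](rows: Seq[A], parallelism: Int)(insert: A => Future[B])(
      implicit ec: ExecutionContext): Future[Vector[B]] =
    rows
      .grouped(parallelism)
      .foldLeft(Future.successful(Vector.empty[B])) { (acc, batch) =>
        acc.flatMap { done =>
          // `insert` is called here, i.e. only once `acc` has completed
          Future.sequence(batch.map(insert)).map(results => done ++ results)
        }
      }
}
```

Note that this is coarser than a streaming solution: one slow insert holds back its whole batch, and the first failed insert fails the entire result. Retries and error policies would still have to be written by hand.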

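import akka.kafka.Subscriptions
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.alpakka.cassandra.scaladsl.CassandraFlow
import akka.stream.scaladsl.Keep

// Cassandra insert flow that passes each element through, with at most 100 inserts in flight
val cassandraInsertFlow = CassandraFlow.createWithPassThrough(parallelism = 100, preparedStatement, statementBinder)

// skip messages that cannot be decoded, stop the stream on any other failure
val decider: Supervision.Decider = {
  case _: BadMessageFormat => Supervision.Resume
  case _ => Supervision.Stop
}

Consumer
  .committableSource(consumerSettings, Subscriptions.topics(topic))
  .mapAsync(parallelism = 100)(process) // at most 100 messages processed concurrently
  .via(cassandraInsertFlow)
  .map(_.committableOffset)
  .toMat(Committer.sink(committerSettings))(Keep.both)
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
  .mapMaterializedValue(DrainingControl.apply)
  .run()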
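
What does such an error-handling policy actually do? The sketch below mimics the idea in plain Scala, without Akka: a decider function looks at each failure and either drops the failing element and carries on ('Resume') or aborts the whole run ('Stop'). All names here ('DeciderSketch', 'Directive', 'runWith' and this local 'BadMessageFormat') are invented for the illustration and only resemble the Akka Streams supervision API.

```scala
import scala.util.{Failure, Success, Try}

object DeciderSketch {
  sealed trait Directive
  case object Resume extends Directive // drop the failing element and continue
  case object Stop extends Directive   // abort the whole run with the failure

  // Hypothetical error type, standing in for a decoding failure
  final case class BadMessageFormat(raw: String)
      extends RuntimeException(s"bad message: $raw")

  // Applies `f` to every item; on a failure the `decider` chooses what happens next.
  def runWith[A, B](items: Seq[A])(f: A => B)(
      decider: Throwable => Directive): Either[Throwable, Vector[B]] = {
    val out = Vector.newBuilder[B]
    val it = items.iterator
    while (it.hasNext) {
      Try(f(it.next())) match {
        case Success(b) => out += b
        case Failure(e) =>
          decider(e) match {
            case Resume => ()             // skip this element
            case Stop   => return Left(e) // give up immediately
          }
      }
    }
    Right(out.result())
  }
}
```

With a decider that resumes on 'BadMessageFormat' and stops on everything else, a malformed message is simply skipped, while any other exception ends processing with a 'Left'.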
 

FS2

What about other Scala streaming-related libraries? In a more “pure” world FS2 (previously Scalaz-Stream) is quite popular. It integrates nicely with the cats-effect IO monad. You can build a whole, nicely structured application with FS2, http4s, cats-effect and e.g. doobie.

But what about integrating FS2 with various systems? Well, there is a list of related projects. Maybe it’s not as impressive as the one for Alpakka, but it includes databases like Apache Cassandra, messaging systems like Apache Kafka, RabbitMQ, JMS and others. It’s not that bad. On the other hand, unfortunately, the quality of the libraries varies. In some cases the code is very nice, but documentation is lacking. Some libraries are outdated (e.g. they support only old versions of queuing systems). You may also find, on GitHub, multiple actively developed libraries that support the same system.

But does it mean that you can’t use FS2 in a production system? Not really. The Kafka pipeline from the previous section, for instance, can be written with an FS2 Kafka connector:

import cats.data.NonEmptyList
import fs2.kafka._
import scala.concurrent.duration._

for {
  executionContext <- consumerExecutionContextStream[F]
  consumer <- consumerStream[F].using(consumerSettings(executionContext))
  _ <- consumer.subscribe(NonEmptyList.one(topic))
  stream <- consumer.stream
    .mapAsync(100)(process) // at most 100 messages processed concurrently
    .map(_.committableOffset)
    .to(commitBatchWithin[F](10, 15.seconds)) // commit after 10 offsets or 15 seconds
} yield {
  stream
}

However, if the existing connectors are not good enough for you, there is another way.
 

Streamz

The Streamz library lets you integrate FS2 with Akka Streams (& Alpakka!) and… Apache Camel! This, of course, means that you can also integrate Akka Streams with Camel itself. This way, all of the communities can share integrations. You may start with FS2 and, if you don’t find the connector you need, use Streamz to get the one from Alpakka. And when you reach the point where something more powerful is needed, you can integrate with Apache Camel! Sounds great (but keep in mind that Camel is synchronous and blocking).

Streamz can convert an Akka 'Source', 'Sink' and also 'Flow' to their FS2 equivalents.

import streamz.converter._

val akkaSource: akka.stream.scaladsl.Source[Int, NotUsed] = ...
val fs2Source: fs2.Stream[IO, Int] = akkaSource.toStream[IO]()

val akkaSink: akka.stream.scaladsl.Sink[Int, Future[Done]] = ...
val fs2Sink: fs2.Sink[IO, Int] = akkaSink.toSink[IO]()

fs2Source.to(fs2Sink).compile.drain.unsafeRunSync() // works!

We can modify the previous FS2 Kafka example so that it uses an Alpakka 'CassandraFlow' for the processing (insertions).

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

val alpakkaFlow = CassandraFlow.createWithPassThrough(...) // Akka
val convertedCassandraFlow = alpakkaFlow.toPipe() // FS2

for {
  executionContext <- consumerExecutionContextStream[F]
  consumer <- consumerStream[F].using(consumerSettings(executionContext))
  _ <- consumer.subscribe(NonEmptyList.one(topic))
  stream <- consumer.stream
    .through(convertedCassandraFlow) // fs2 -> akka -> fs2
    .map(_.committableOffset)
    .to(commitBatchWithin[F](1, 15.seconds))
} yield {
  stream
}

Such an approach has, unfortunately, some drawbacks. The required conversions certainly carry a performance penalty, and in order to use Alpakka from FS2 you also need to bring an 'ActorSystem' into your project, which comes with a few thread pools (and some people simply consider it harmful).

 

Reactive Streams

What about the Reactive Streams initiative? Since both Akka Streams and FS2 implement the proper interfaces, it should be possible to integrate them this way. In FS2 you need to include the separate 'fs2-reactive-streams' module, but after that you can easily convert an 'fs2.Stream' to an 'org.reactivestreams.Publisher' using the 'toUnicastPublisher' method.

On the other hand, 'akka.stream.scaladsl.Source' includes the 'asPublisher' and 'fromPublisher' methods. This means that you can do the following:

Source.fromPublisher(fs2stream.toUnicastPublisher())

Similarly, 'akka.stream.scaladsl.Sink' also includes the 'asSubscriber' and 'fromSubscriber' methods.

Drawbacks? The Reactive Streams classes are mutable and based on Java interfaces. What is more, it can be challenging to convert an Akka Streams 'Flow'. However, with Reactive Streams you are not limited to Akka Streams and FS2. There is also Monix, where you can, for example, convert a 'monix.reactive.Observer' to an 'org.reactivestreams.Subscriber' using 'Observer.toReactiveSubscriber(observer)'. Unfortunately, the Reactive Streams interfaces are not integrated with RxScala; they only work with RxJava.
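
To give a feeling of what “mutable and based on Java interfaces” means in practice, here is a small sketch of a hand-written subscriber. To stay free of dependencies it uses the JDK's 'java.util.concurrent.Flow' interfaces (Java 9+), which mirror the 'org.reactivestreams' ones, together with the JDK's 'SubmissionPublisher'. The 'FlowSketch', 'OneAtATime' and 'collect' names are invented for this example; it does not use FS2 or Akka Streams.

```scala
import java.util.concurrent.{CountDownLatch, Flow, SubmissionPublisher, TimeUnit}
import scala.collection.mutable.ListBuffer

object FlowSketch {
  // Subscriber that requests elements one at a time. The requested demand
  // is how back-pressure is signalled to the publisher.
  final class OneAtATime[A] extends Flow.Subscriber[A] {
    // mutable state: the subscription has to be stored for later request() calls
    private var subscription: Flow.Subscription = null
    val received = ListBuffer.empty[A]
    val done = new CountDownLatch(1)

    override def onSubscribe(s: Flow.Subscription): Unit = {
      subscription = s
      s.request(1) // ask for the first element
    }
    override def onNext(item: A): Unit = {
      received += item
      subscription.request(1) // ask for the next element only when ready
    }
    override def onError(t: Throwable): Unit = done.countDown()
    override def onComplete(): Unit = done.countDown()
  }

  // Publishes `items` and returns what the subscriber received, in order.
  def collect[A](items: Seq[A]): List[A] = {
    val publisher = new SubmissionPublisher[A]()
    val subscriber = new OneAtATime[A]
    publisher.subscribe(subscriber)
    items.foreach(item => publisher.submit(item))
    publisher.close() // signals onComplete after the pending items
    subscriber.done.await(5, TimeUnit.SECONDS)
    subscriber.received.toList
  }
}
```

Even this tiny subscriber needs a mutable field and careful ordering of calls, which is exactly why converters such as 'toUnicastPublisher' or 'Source.fromPublisher' are nicer to work with than the raw interfaces.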

 

Summary

Does FS2 need its Llama, similar to Camel or Alpakka? Well, it would certainly be nice to have a common place for all the “connectors” working with FS2. They could have a common documentation format and verified quality. Also, developers could join forces to create single implementations supporting specific systems. However, at the same time, thanks to the Reactive Streams initiative and libraries like Streamz, the FS2 community can benefit from connectors created for other libraries. This makes FS2 a really great piece of code that can be used in stable production systems.

Of course, FS2 and Akka Streams are not the only choices for streaming in Scala projects. There is Monix (you can find a comparison between Akka Streams and Monix in Jacek Kunicki's blog series and presentation), and the new ZIO Streams are coming!'

 

This article was written by Michał Matłoka and posted originally on SoftwareMill Blog.