Functional Programming with Kafka Streams and Scala by Joan Goyeau

Have you ever wondered how functional programming can power a social network? This article by Joan Goyeau looks at how a scalable system can be achieved without a matching rise in complexity.

 

"Context

I’ve been working for a social network lately where we faced interesting business use cases. Because the network was created by famous people, we anticipated from the early design of the architecture that the co-founders’ fans would drive a surge of traffic to the site as soon as it launched. The system therefore has to scale from every point of view, since we have a huge load of people viewing, posting, commenting, liking...

Power usually comes at the price of complexity, and since we are a startup with a small backend team we also need the system to be simple to run and maintain: we don’t have the data team, the operations team, the dev team, the whatever team… We are more like a single team of around 4 devs, each of whom understands every part of the system.

So how can we achieve such a scalable system without going high in complexity? This is what we’ll see in this story.

Use case

For the sake of simplicity, in this story I will boil down the use case to a minimal set of functionality.

As on any social network, it is possible to publish posts:

And as you can see it is also possible to interact with a post through comments and likes. Let’s have a closer look at one of these posts:


We can see that this little card already contains quite a lot of information:

  • The post itself with the text “Money can’t buy taste” and the boat picture,
  • But also information about the author, with his name and his avatar,
  • And some counters about the number of likes and comments this post received.
 

Model

Let’s define our model representation of the above UI.

We will have ids everywhere in our system, so we define a case class for them. Extending AnyVal avoids boxing whenever possible, and the type parameter makes ids more type safe than plain Strings:

case class Id[Resource](value: String) extends AnyVal
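
To illustrate what the type parameter buys us, here is a minimal sketch. The User and Post marker traits and the findPost function are hypothetical stand-ins, and extends AnyVal is left out so that the snippet can be pasted anywhere:

```scala
object IdSketch {
  // Marker types standing in for the real model classes.
  trait User
  trait Post

  // Same shape as the Id above; the type parameter is only used as a tag.
  final case class Id[Resource](value: String)

  // Hypothetical lookup that only accepts post ids.
  def findPost(id: Id[Post]): String = s"post:${id.value}"

  val userId = Id[User]("user0")
  val postId = Id[Post]("post0")

  val found = findPost(postId)
  // findPost(userId) // does not compile: found Id[User], required Id[Post]
}
```

With plain Strings, passing a user id where a post id is expected would only be noticed at runtime, if at all.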

And here are all the case classes for the User, Post, Like and Comment:

import java.time.Instant
import java.net.URI

case class User(id: Id[User], updatedOn: Instant, image: URI, nickname: String, verified: Boolean, deleted: Boolean)
case class Post(id: Id[Post], updatedOn: Instant, author: Id[User], text: String, image: URI, deleted: Boolean)
case class Like(userId: Id[User], postId: Id[Post], updatedOn: Instant, unliked: Boolean)
case class Comment(id: Id[Comment], postId: Id[Post], updatedOn: Instant, author: Id[User], text: String, deleted: Boolean)

 

CQRS model

Our main requirement is that the system should scale horizontally on reads and writes.

 

Read optimised approach

A naive approach is to store all the data in some database and generate the post views by querying the post itself, the user’s name and avatar with the id of the author and calculating the number of likes and comments, all of that at read time.
Even though you can optimise the reads with indices and caching, this will not scale if we have too many reads.
 

Write optimised approach

Another way is to denormalise at write time. We could create a DenormalisedPost case class that contains all the information needed for the reads:

case class DenormalisedPost(post: Post, author: User, interactions: Interactions)
case class Interactions(likes: Set[Like], comments: Int)

We could then update this DenormalisedPost at write time, for example by incrementing the like counter when someone likes the post.
This would make the reads very fast, but it wouldn’t scale if we have many writes. For example, if a post is very popular and many people rush to like it, we would be locking the denormalised post far too often.

 

CQRS approach

A real solution is to implement the CQRS (Command Query Responsibility Segregation) model:


This basically separates the reads and the writes to let them scale. We would therefore be able from the API to write at high throughput new users, posts, comments and likes in the command model and read the denormalised posts in the query model.

One thing is missing though: if we separate the reads and the writes, how will the data we write in the command model end up in the query model?
This is what Kafka Streams is for. It’s a data processing library that can consume data from topics, transform it and sink it into other topics.

So here is how we can implement the CQRS approach with Kafka and Kafka Streams:

The API will write its commands to some Kafka topics. Those topics will be consumed by Kafka Streams, transformed and sunk into some other topics. Those other topics will be connected to databases like Elasticsearch, MongoDB, Redis… whichever is best for the API to query.

Note that this architecture is eventually consistent: if I like a post and straight away query the number of likes on this post, it might not be incremented yet. We usually solve this issue by having the frontend fake the increment of the counter.
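
The separation between the two models, and the eventual consistency it brings, can be sketched without any Kafka at all. In this simplified illustration the Command hierarchy, PostView and project are hypothetical names; a plain List stands in for the command topics and a fold stands in for the stream processing:

```scala
object CqrsSketch {
  sealed trait Command
  final case class PostCreated(postId: String, text: String) extends Command
  final case class PostLiked(postId: String, userId: String) extends Command

  final case class PostView(text: String, likedBy: Set[String])

  // Query model: a projection folded from the command log.
  def project(view: Map[String, PostView], command: Command): Map[String, PostView] =
    command match {
      case PostCreated(postId, text) => view.updated(postId, PostView(text, Set.empty))
      case PostLiked(postId, userId) =>
        view.get(postId).fold(view)(p => view.updated(postId, p.copy(likedBy = p.likedBy + userId)))
    }

  // Command model: an append-only log, standing in for the Kafka topics.
  val log: List[Command] =
    List(PostCreated("post0", "Some text"), PostLiked("post0", "user1"), PostLiked("post0", "user2"))

  // Eventual consistency: a projection that has only processed part of the log shows fewer likes.
  val staleView = log.take(2).foldLeft(Map.empty[String, PostView])(project)
  val freshView = log.foldLeft(Map.empty[String, PostView])(project)
}
```

The stale view is not wrong, it is simply late: once the remaining commands are processed it converges to the fresh one.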

 

Command Model

Let’s see what it looks like in our social network:
We will have 4 Kafka topics for the command model which, you might have already guessed, will map to our model classes:
  • users
  • posts
  • likes
  • comments

So the API will be able to write the case classes previously defined in their corresponding topics:


 

Query model

But now you might wonder what we are going to write in the databases that the API reads from, and how.

Kafka Streams will consume the posts, users, comments and likes command topics to produce the DenormalisedPost we’ve seen in the Write optimised approach. These go into a denormalised-posts topic, which will be connected to a database for the API to query:

 

Circe and Kafka Serdes

We’ve come this far almost without touching a line of code. Having plans is good but implementing them is better.
As we can see in the diagrams, there is a lot of writing/producing and reading/consuming going on. So before diving into any implementation let me introduce Circe, probably the best JSON library in Scala.

Very simply, Circe defines 2 type classes for you, Encoder[T] and Decoder[T], which respectively know how to encode and decode T. All of that is well documented on https://circe.github.io/circe/.

Kafka has much the same concept, except that the types are called Serializer[T] and Deserializer[T]. Again, each of them knows how to serialise or deserialise T, but not specifically to JSON! They work on raw bytes, so any format can be used.

Kafka Streams goes a bit further with the Serde, which essentially just groups the Serializer[T] and Deserializer[T] of the same type into one type Serde[T] that knows how to both serialise and deserialise T.
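
To make the relationship between the three types concrete, here is a minimal sketch using simplified stand-ins. These traits are not the real Kafka interfaces (whose methods also receive the topic name, among other things); they only illustrate the idea of bundling both directions into one value:

```scala
import java.nio.charset.StandardCharsets.UTF_8

object SerdeSketch {
  // Simplified stand-ins for the Kafka interfaces.
  trait Serializer[T]   { def serialize(value: T): Array[Byte] }
  trait Deserializer[T] { def deserialize(bytes: Array[Byte]): T }

  // A Serde simply bundles both directions for one type.
  final case class Serde[T](serializer: Serializer[T], deserializer: Deserializer[T])

  val stringSerde: Serde[String] = Serde(
    new Serializer[String]   { def serialize(value: String): Array[Byte] = value.getBytes(UTF_8) },
    new Deserializer[String] { def deserialize(bytes: Array[Byte]): String = new String(bytes, UTF_8) }
  )

  // Round trip: serialising then deserialising gives back the original value.
  val roundTripped: String =
    stringSerde.deserializer.deserialize(stringSerde.serializer.serialize("Some text"))
}
```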

So why are we looking at those concepts? Well, we probably want to serialise our records to JSON, as this is an easy and commonly used format, and therefore use Circe. But we surely don’t want to write a Kafka Serde for every (automatically generated?) Circe Encoder/Decoder.
And this is exactly what Kafka Streams Circe can do for you. All you need is to add one import, which brings in a Kafka Serde for every type that has a Circe Encoder/Decoder:

import com.goyeau.kafka.streams.circe.CirceSerdes._

Kafka Producer

We saw how to serialise and deserialise some Scala object to JSON. Now it’s time to use this ability to produce data in the Command model topics.

Let’s have a look at the Kafka Producer that we will be using in the API server code:

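import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import com.goyeau.kafka.streams.circe.CirceSerdes

val config = new Properties()
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092")
val postProducer = new KafkaProducer(
  config,
  CirceSerdes.serializer[Id[Post]],
  CirceSerdes.serializer[Post]
)

val post = Post(Id[Post]("post0"), Instant.now(), Id[User]("user0"), "Some text", URI.create("https://some-uri"), false)
// Arguments: topic, partition (null lets Kafka pick one), timestamp in epoch millis, key, value
postProducer.send(new ProducerRecord[Id[Post], Post]("posts", null, post.updatedOn.toEpochMilli, post.id, post)).get()
postProducer.close()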

In this snippet we are creating a post producer and then sending a Post to the posts topic.
As you can see, sending a post involves quite a lot of boilerplate. Every time we want to send a record, we need to attach some configuration to it. Not only does this add overhead when we just want to send a record, it can also lead to mistakes such as sending a post to the wrong topic or using the wrong serialiser. It would be better if we could involve the compiler to check that for us.

Indeed this can be greatly improved by creating a type class Record that will contain all the config about a record. This type class will then be used by a new send() function that takes only the object to send (Post in this example) and automatically configures the topic, key and timestamp of this record:

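import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.Serializer
import scala.concurrent.Future
// Future { ... } needs an implicit ExecutionContext; the global one is assumed here
import scala.concurrent.ExecutionContext.Implicits.global

// Everything the producer needs to know about records of type V
trait Record[K, V] {
  def topic: String
  def key(value: V): K
  def timestamp(value: V): Long
}

object Producer {
  // Two-step apply so that only V has to be given explicitly: Producer[Post](config)
  def apply[V] = new ProducerBuilder[V]
  class ProducerBuilder[V] {
    def apply[K](config: Properties)(implicit record: Record[K, V],
                                     keySerializer: Serializer[K],
                                     valueSerializer: Serializer[V]): KafkaProducer[K, V] =
      new KafkaProducer(config, keySerializer, valueSerializer)
  }
}

implicit class KafkaProducerOps[K, V](kafkaProducer: KafkaProducer[K, V]) {
  // Topic, key and timestamp all come from the implicit Record; get() blocks until the broker acknowledges
  def send(value: V)(implicit record: Record[K, V]): Future[RecordMetadata] = Future {
    kafkaProducer.send(new ProducerRecord(record.topic, null, record.timestamp(value), record.key(value), value)).get()
  }
}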

All we need to do now is to add an implicit Record[Id[Post], Post] in the companion object of Post:

object Post {
  implicit val record: Record[Id[Post], Post] = new Record[Id[Post], Post] {
    val topic = "posts"
    def key(post: Post): Id[Post] = post.id
    def timestamp(post: Post): Long = post.updatedOn.toEpochMilli
  }
}

We can then create the producer with Producer[Post](config) and send the post directly with postProducer.send(post); the topic, key and timestamp will be found automatically by implicit resolution.

This looks much better, easier to understand and free of mistakes since the compiler will do the work for us.
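
The Record pattern can be tried out without a broker. The following is only a sketch: InMemoryProducer is a hypothetical stand-in for KafkaProducer that remembers what was sent, Post is reduced to three fields with a plain String id, and the implicit is placed in lexical scope rather than in the companion object to keep the snippet short:

```scala
import java.time.Instant
import scala.collection.mutable.ListBuffer

object RecordSketch {
  final case class Post(id: String, updatedOn: Instant, text: String)

  // Same type class as above: everything the producer needs to know about a record.
  trait Record[K, V] {
    def topic: String
    def key(value: V): K
    def timestamp(value: V): Long
  }

  implicit val postRecord: Record[String, Post] = new Record[String, Post] {
    val topic = "posts"
    def key(post: Post): String = post.id
    def timestamp(post: Post): Long = post.updatedOn.toEpochMilli
  }

  // Hypothetical stand-in for KafkaProducer: it just remembers what was sent.
  final class InMemoryProducer[K, V] {
    val sent = ListBuffer.empty[(String, Long, K, V)]
    def send(value: V)(implicit record: Record[K, V]): Unit = {
      sent += ((record.topic, record.timestamp(value), record.key(value), value))
      ()
    }
  }

  val producer = new InMemoryProducer[String, Post]
  val post = Post("post0", Instant.ofEpochMilli(1527690000000L), "Some text")
  producer.send(post) // topic, key and timestamp all come from the implicit Record
}
```

Trying to send a type that has no Record instance in scope fails at compile time, which is exactly the check we wanted from the compiler.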

 

Kafka Streams

Now that the Command model topics are being produced, we need to consume them to generate the Query model. That’s where Kafka Streams kicks in: as a data processing framework, it will let us do stateful transformations.

 

Streams

First let’s create those streams:

val streamsBuilder = new StreamsBuilder()
val postsStream = streamsBuilder.stream[Id[Post], Post]("posts")

As we can see, there is again some boilerplate here that may lead to runtime errors, as we saw when creating the Kafka Producer. Not only do we have to write the same 3 pieces of information again (key type, value type, topic), but if the types do not match the topic there is no compile-time check: it would blow up at runtime!
Fortunately there is a way around it, reusing the previously created Record[K, V] type class:

implicit class StreamsBuilderOps(streamsBuilder: StreamsBuilderS) {
  def streamFromRecord[V] = new StreamBuilder[V]

  class StreamBuilder[V] {
    def apply[K]()(implicit record: Record[K, V], consumed: Consumed[K, V]): KStreamS[K, V] =
      streamsBuilder.stream[K, V](record.topic)
  }
}

This can be used as below to create all the streams we will need:

val usersStream = streamsBuilder.streamFromRecord[User]()
val postsStream = streamsBuilder.streamFromRecord[Post]()
val commentsStream = streamsBuilder.streamFromRecord[Comment]()
val likesStream = streamsBuilder.streamFromRecord[Like]()

Pretty neat, but don’t forget the empty parentheses at the end.

 

KTables

Alright, you’ve been reading for a long time and it’s finally time to see the first stream processing code that will power our social network. Be careful, Mark Zuckerberg!
The goal here is to generate DenormalisedPost as we receive Users, Posts, Likes and Comments. For that we will need to build KTables that will represent our state and then we will be able to join them on their common key.

So we want a KTable of users by their id to join with a KTable of the posts by author id (user id).

You might wonder why we use a KTable for posts. Why not use the original KStream, rekeyed by author id, to join with the KTable of users by their id, essentially looking up a user for each post that arrives?
The reason is that if a user gets updated after posting, we need to find all the posts they created to update the user there too. This can only be done if we keep the posts in a KTable, effectively building a queryable state for them.

 

Commutativity

This brings us to the concept of commutativity. A simple example is:

1 + 2 + 3 = 6
3 + 2 + 1 = 6

Therefore we can say that the + operator is commutative, since it doesn’t matter in which order the numbers arrive.

This is exactly what we want here with the Posts and Users: it shouldn’t matter in which order they arrive, the output should always be the same DenormalisedPost.

Alright, let’s have a look at the code that creates these 2 KTables:


val usersByKey = usersStream
  .groupByKey
  .reduce((first, second) => if (first.updatedOn.isAfter(second.updatedOn)) first else second)

val postsByAuthor = postsStream
  .groupBy((_, post) => post.author)
  .aggregate(
    () => Map.empty[Id[Post], Post],
    (_, post: Post, posts: Map[Id[Post], Post]) =>
      if (posts.get(post.id).exists(_.updatedOn.isAfter(post.updatedOn))) posts
      else posts + (post.id -> post),
    Materialized.`with`[Id[User], Map[Id[Post], Post], KeyValueStore[Bytes, Array[Byte]]](
      CirceSerdes.serde[Id[User]],
      CirceSerdes.serde[Map[Id[Post], Post]]
    )
  )
  .mapValues(_.values.toSet)
  • usersByKey groups by the key (user id) and reduces all the users we get for the same key to keep only the latest version. The check on isAfter ensures commutativity: it doesn’t matter in which order the reduce receives the users, the result will be the same.
  • postsByAuthor groups by the author of the post (user id) and aggregates all the posts we get for the same author id. We also do the check on isAfter to stay commutative. One important point here is that we are grouping not by key but by another field of the document (the author). This operation is more expensive than grouping by key, as it involves repartitioning the stream. It is required here though because, as we will see later, the 2 KTables or KStreams we are joining need to be co-partitioned.
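
The order independence of the usersByKey reduce can be checked on plain collections. This is a sketch with a simplified User (String id, made-up nicknames) in which the same reduce function is applied to every possible arrival order of three versions of one user:

```scala
import java.time.Instant

object CommutativitySketch {
  final case class User(id: String, updatedOn: Instant, nickname: String)

  // Same rule as the reduce above: keep whichever version was updated last.
  def latest(first: User, second: User): User =
    if (first.updatedOn.isAfter(second.updatedOn)) first else second

  val versions = List(
    User("user0", Instant.parse("2018-05-20T00:00:00Z"), "joan"),
    User("user0", Instant.parse("2018-05-01T00:00:00Z"), "jo"),
    User("user0", Instant.parse("2018-05-28T00:00:00Z"), "joan.goyeau")
  )

  // Reduce every possible arrival order and collect the distinct outcomes.
  val outcomes: Set[User] = versions.permutations.map(_.reduce(latest)).toSet
}
```

All six orderings end up with the version from 2018-05-28. Note that two versions carrying exactly the same updatedOn would make the function return its second argument, so ties would need a deterministic tie-breaker to stay order independent.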
 

Associativity

It is important here that we also respect the concept of associativity. For example, an expression that mixes the + and * operators is not associative:

(3*5)+6 = 21
3*(5+6) = 33

As we can see, the result depends on how the operations are grouped, even though the order of the elements is the same.

If we look at the reduce operation of usersByKey, it doesn’t matter how we group its applications with regard to the updatedOn field:

(2018-05-20, 2018-05-01), 2018-05-28 = 2018-05-28
2018-05-20, (2018-05-01, 2018-05-28) = 2018-05-28

The result will always be the same.
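
The two groupings above can be written out as a small sketch, using LocalDate in place of the updatedOn Instant for readability:

```scala
import java.time.LocalDate

object AssociativitySketch {
  // Keep the later of two dates, mirroring the updatedOn comparison in the reduce.
  def latest(first: LocalDate, second: LocalDate): LocalDate =
    if (first.isAfter(second)) first else second

  val a = LocalDate.parse("2018-05-20")
  val b = LocalDate.parse("2018-05-01")
  val c = LocalDate.parse("2018-05-28")

  val leftGrouped  = latest(latest(a, b), c) // (a, b), c
  val rightGrouped = latest(a, latest(b, c)) // a, (b, c)
}
```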

Let’s add the KTables for Likes and Comments:

val likesByKey = likesStream
  .groupByKey
  .aggregate(
    () => Set.empty[Like],
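    (_, like: Like, likes: Set[Like]) => {
      // An unlike event never equals the original Like (different updatedOn and unliked flag),
      // so we match on the user instead of relying on case class equality.
      val others = likes.filterNot(_.userId == like.userId)
      if (like.unliked) others else others + like
    },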
    Materialized.`with`[Id[Post], Set[Like], KeyValueStore[Bytes, Array[Byte]]](
      CirceSerdes.serde[Id[Post]],
      CirceSerdes.serde[Set[Like]]
    )
  )

val commentCountByKey = commentsStream
  .groupByKey
  .aggregate(
    () => 0,
    (_, comment: Comment, count: Int) => if (comment.deleted) count - 1 else count + 1,
    Materialized.`with`[Id[Post], Int, KeyValueStore[Bytes, Array[Byte]]](
      CirceSerdes.serde[Id[Post]],
      CirceSerdes.serde[Int]
    )
  )

Pretty straightforward: we add or remove likes depending on whether it’s a like or an unlike, and for comments we add 1 to or subtract 1 from the counter.

 

Idempotence

But there is one issue with the comment counter: it’s not idempotent. Imagine a situation where the user clicks Publish comment twice (like my mother, because she thinks it’s a file browser); then 2 events might get triggered. In that case we will wrongly count 2 comments instead of 1.

If the API is well designed, the id should be generated by the frontend, and therefore the duplicate event will have the same id. A solution would then be to use a Set[Id[Comment]] instead of the Int:

val commentCountByKey = commentsStream
  .groupByKey
  .aggregate(
    () => Set.empty[Id[Comment]],
    (_, comment: Comment, commentIds: Set[Id[Comment]]) =>
      if (comment.deleted) commentIds - comment.id else commentIds + comment.id,
    Materialized.`with`[Id[Post], Set[Id[Comment]], KeyValueStore[Bytes, Array[Byte]]](
      CirceSerdes.serde[Id[Post]],
      CirceSerdes.serde[Set[Id[Comment]]]
    )
  )
  .mapValues(_.size)

The Set does indeed guarantee idempotence.
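
The difference between the two aggregations shows up as soon as a duplicate event is replayed. A minimal sketch on plain collections, with a simplified Comment and String ids:

```scala
object IdempotenceSketch {
  final case class Comment(id: String, deleted: Boolean)

  // Counter-based aggregation: every event moves the count, duplicates included.
  def countWithInt(count: Int, comment: Comment): Int =
    if (comment.deleted) count - 1 else count + 1

  // Set-based aggregation: adding an id that is already present changes nothing.
  def countWithSet(ids: Set[String], comment: Comment): Set[String] =
    if (comment.deleted) ids - comment.id else ids + comment.id

  // The same comment published twice, e.g. after a double click.
  val events = List(Comment("comment0", deleted = false), Comment("comment0", deleted = false))

  val intCount = events.foldLeft(0)(countWithInt)
  val setCount = events.foldLeft(Set.empty[String])(countWithSet).size
}
```

The Int version counts 2 comments, while the Set version counts 1.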

 

Semilattice

If you combine Commutativity, Associativity and Idempotence you obtain what we call a Semilattice. This is a concept to bear in mind when designing robust data pipelines.

It guarantees that no matter in which order the events arrive, how we group our operations or whether there are duplicate events, the result will always be the same.
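
The three laws can be spelled out in code. In this sketch the Semilattice trait and the lawsHold helper are hypothetical names introduced for illustration (they do not come from a library), and the laws are only checked on a handful of sample values rather than proven:

```scala
object SemilatticeSketch {
  // Hypothetical type class: a combine operation expected to obey the three laws.
  trait Semilattice[A] { def combine(x: A, y: A): A }

  val intMax: Semilattice[Int] = new Semilattice[Int] {
    def combine(x: Int, y: Int): Int = math.max(x, y)
  }
  def setUnion[T]: Semilattice[Set[T]] = new Semilattice[Set[T]] {
    def combine(x: Set[T], y: Set[T]): Set[T] = x union y
  }
  val intPlus: Semilattice[Int] = new Semilattice[Int] {
    def combine(x: Int, y: Int): Int = x + y
  }

  // Check idempotence, commutativity and associativity on every combination of the samples.
  def lawsHold[A](s: Semilattice[A], samples: List[A]): Boolean =
    samples.forall { a =>
      s.combine(a, a) == a && samples.forall { b =>
        s.combine(a, b) == s.combine(b, a) && samples.forall { c =>
          s.combine(s.combine(a, b), c) == s.combine(a, s.combine(b, c))
        }
      }
    }

  val maxIsSemilattice   = lawsHold(intMax, List(1, 2, 3))
  val unionIsSemilattice = lawsHold(setUnion[String], List(Set("a"), Set("b"), Set("a", "c")))

  // Plain addition is commutative and associative but not idempotent (1 + 1 != 1).
  val plusIsSemilattice = lawsHold(intPlus, List(1, 2, 3))
}
```

Keeping the latest version and adding ids to a Set behave like max and union here, which is why they are safe; a plain counter behaves like addition, which is why it is not.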

 

Joins

We’ve started with KStreams, from them we built up a state with KTables, now it’s time to join all of them to create the long awaited DenormalisedPosts:

postsByAuthor
  .join(usersByKey,
        (posts: Set[Post], author: User) =>
          posts.map(DenormalisedPost(_, author, Interactions(Set.empty, 0))))
  .toStream
  .flatMapValues(identity)
  .groupBy((_, denormalisedPost) => denormalisedPost.post.id)
  .reduce((first, second) => if (first.post.updatedOn.isAfter(second.post.updatedOn)) first else second)
  .leftJoin(likesByKey,
            (denormalisedPost: DenormalisedPost, likes: Set[Like]) =>
              Option(likes).fold(denormalisedPost)(denormalisedPost.lens(_.interactions.likes).set(_)))
  .leftJoin(commentCountByKey,
            (denormalisedPost: DenormalisedPost, commentCount: Int) =>
              denormalisedPost.lens(_.interactions.comments).set(commentCount))
  .toStream
  .to("denormalised-posts")

Ok, a lot going on here!

  • We first join postsByAuthor with usersByKey creating our DenormalisedPost with an empty Interactions.
  • We convert to a stream so that we can flatten the DenormalisedPosts and create again a KTable of DenormalisedPosts but keyed by post id this time.
  • Then we leftJoin with likesByKey where we add the likes to the Interactions. We use leftJoin here as there might not be any likes.
  • We do the same with commentCountByKey.
  • And finally we convert to a stream to persist into the denormalised-posts topic.
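
The shape of these steps can be followed on plain immutable Maps. This is a deliberately simplified sketch and not how Kafka Streams works internally: the Maps are static snapshots rather than continuously updated KTables, the model is reduced to a few fields, and likes are kept as a count instead of a Set[Like]:

```scala
object JoinSketch {
  final case class User(id: String, nickname: String)
  final case class Post(id: String, author: String, text: String)
  final case class DenormalisedPost(post: Post, author: User, likes: Int, comments: Int)

  // In-memory stand-ins for the KTables, keyed like their Kafka counterparts.
  val usersByKey        = Map("user0" -> User("user0", "joan"))
  val postsByAuthor     = Map("user0" -> Set(Post("post0", "user0", "Some text"), Post("post1", "user0", "Hello")))
  val likeCountByKey    = Map("post0" -> 3)
  val commentCountByKey = Map("post0" -> 1)

  // Inner join on author id, then flatten and re-key by post id.
  val joinedByPostId: Map[String, DenormalisedPost] =
    postsByAuthor.toList.flatMap { case (authorId, posts) =>
      usersByKey.get(authorId).toList.flatMap { author =>
        posts.toList.map(post => post.id -> DenormalisedPost(post, author, likes = 0, comments = 0))
      }
    }.toMap

  // Left joins: posts without likes or comments keep their zero counters.
  val denormalisedPosts: Map[String, DenormalisedPost] = joinedByPostId.map { case (postId, dp) =>
    postId -> dp.copy(
      likes = likeCountByKey.getOrElse(postId, dp.likes),
      comments = commentCountByKey.getOrElse(postId, dp.comments)
    )
  }
}
```

post1 has neither likes nor comments and still appears in the result, which is the reason for using leftJoin rather than join in the last two steps.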

Can we improve something here?
We can always improve something… In this case the .to("denormalised-posts") doesn’t look very safe, for exactly the same reasons we saw with the producers and the stream builder. Even though it’s not too verbose here, there is no compile-time guarantee that the given topic is the one that contains the right type of document.

Again we can decorate the KStream type with a toTopic function that will reuse our Record type and give the right topic for the output type of the stream:

implicit class KStreamOps[K, V](stream: KStreamS[K, V]) {
  def toTopic(implicit record: Record[K, V], produced: Produced[K, V]) = stream.to(record.topic)
}

The usage becomes simply:

...
  .toStream
  .toTopic

Is there anything else we can improve?

 

Monocle

I don’t much like the nested .copy() calls that the joins with likes and comments would otherwise need:

...
  .join(likesByKey,
        (denormalisedPost: DenormalisedPost, likes: Set[Like]) =>
          denormalisedPost.copy(interactions = denormalisedPost.interactions.copy(likes = likes)))
  .join(commentCountByKey,
        (denormalisedPost: DenormalisedPost, commentCount: Int) =>
          denormalisedPost.copy(interactions = denormalisedPost.interactions.copy(comments = commentCount)))

For each level of nesting we need to manually copy the object, modifying the part we want. This is tedious compared to a mutable structure, where we could just do something like denormalisedPost.interactions.likes = likes.

Fortunately we can use lenses from Monocle, which let us write:

...
  .join(likesByKey,
        (denormalisedPost: DenormalisedPost, likes: Set[Like]) =>
          denormalisedPost.lens(_.interactions.likes).set(likes))
  .join(commentCountByKey,
        (denormalisedPost: DenormalisedPost, commentCount: Int) =>
          denormalisedPost.lens(_.interactions.comments).set(commentCount))

Even though we didn’t reach the simplicity of a mutable structure, this is much easier to use.
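
To give an intuition of what a lens does, here is a hand-rolled sketch. The Lens class below is a hypothetical minimal version written for this illustration and is not Monocle's API, and the model is reduced to a couple of fields:

```scala
object LensSketch {
  final case class Interactions(likes: Int, comments: Int)
  final case class DenormalisedPost(text: String, interactions: Interactions)

  // Hypothetical minimal lens: a getter and a copy-based setter that can be chained.
  final case class Lens[S, A](get: S => A, set: (S, A) => S) {
    def andThen[B](next: Lens[A, B]): Lens[S, B] =
      Lens[S, B](s => next.get(get(s)), (s, b) => set(s, next.set(get(s), b)))
  }

  val interactions = Lens[DenormalisedPost, Interactions](_.interactions, (p, i) => p.copy(interactions = i))
  val likes        = Lens[Interactions, Int](_.likes, (i, n) => i.copy(likes = n))

  // The nested copy is written once, inside the composed lens.
  val postLikes = interactions.andThen(likes)

  val post    = DenormalisedPost("Some text", Interactions(likes = 0, comments = 2))
  val updated = postLikes.set(post, 5)
}
```

The original post is left untouched and updated is a new value, so we keep immutability while writing the nesting only once.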

 

Conclusion

This story showed, on a reduced example, how we can build a fully scalable system that is still easy to run and maintain.

Hopefully you understood the 3 properties of Commutativity, Associativity and Idempotence, which together form a Semilattice, an important concept for building robust data pipelines.

We’ve also learnt about some nice libraries other than Kafka Streams, like Circe and Monocle."

 

This article was written by Joan Goyeau and originally posted on Medium.com