Step up your Scala skills!
Software Engineer Danny Kay was new to Scala last year but has taken his knowledge to a new level by learning all about Kafka Streams with Scala. Want to learn with him?
Check out his article.
'This month, I’m learning Kafka Streams…with a Scala twist.
Most of the Kafka Streams examples you come across on the web are in Java, so I thought I’d write some in Scala.
I decided to start learning Scala seriously at the back end of 2018. I’d written a bit of Scala in the past but wanted to step it up a notch. I have a Java background and my current role involves writing Kotlin, but I’m still learning Scala, so this adds complexity to this month’s learning.
This isn’t an introduction to the subject of Kafka Streams; it is an introduction to the Streams DSL for Scala, so I won’t be going over theory such as what Kafka Streams is, processor topology, etc. I’ll be covering the basic structure of a Kafka Streams application and some of the more common operations. If you want an introduction, I advise you to buy Kafka Streams in Action by William P. Bejeck Jr. It’s an amazing book and I’ve learnt a ton from it.
A basic Kafka Streams application
The Kafka Streams DSL is built on top of another part of Kafka Streams, the Processor API. Confluent recommend users start their Kafka Streams journey with the DSL. The Processor API is low level, whereas the DSL allows users to write business logic in a declarative and functional format.
The DSL allows us to perform operations on records with a few lines of code.
Let’s get stuck in.
The first thing we do is import 'org.apache.kafka.streams.scala.ImplicitConversions._' and 'org.apache.kafka.streams.scala.Serdes._'. The reason for this is explained here: it brings the primitive SerDes and the implicit conversion methods into scope, so we don’t have to keep providing them explicitly whenever they are needed.
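As a rough illustration of what those two imports buy us, the sketch below asks the compiler to resolve a SerDes and the 'Consumed' and 'Produced' wrappers implicitly. It assumes a kafka-streams-scala 2.x release, roughly what was current when this was written; I believe newer releases move the implicit SerDes to 'org.apache.kafka.streams.scala.serialization.Serdes'. The object name is made up for the example.

```scala
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.{Consumed, Produced}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

object ImplicitsInScope {
  // Serdes._ supplies implicit SerDes for primitives such as String and Long.
  val stringSerde: Serde[String] = implicitly[Serde[String]]

  // ImplicitConversions._ derives Consumed and Produced from whatever
  // key and value SerDes are implicitly available.
  val consumed: Consumed[String, Long] = implicitly[Consumed[String, Long]]
  val produced: Produced[String, Long] = implicitly[Produced[String, Long]]

  def main(args: Array[String]): Unit =
    println(s"Resolved ${stringSerde.getClass.getSimpleName} without passing it explicitly")
}
```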
Next, we load the configuration.
I use a package called config, which is a configuration library. It allows you to store configuration values in a '.conf' file in the 'resources' folder and then access those values in your code by first creating a reference to the config file then specifying which property you want the value of.
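A minimal sketch of reading configuration, assuming the library in question is Typesafe (Lightbend) Config. The property names and values are invented for the example, and the file content is inlined with 'parseString' so the snippet runs on its own; in a real project it would sit in a '.conf' file under 'resources' and be loaded with 'ConfigFactory.load()'.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object ConfigSketch {
  // Hypothetical settings; normally these live in src/main/resources/application.conf.
  val config: Config = ConfigFactory.parseString(
    """
      |kafka.bootstrap.servers = "localhost:9092"
      |kafka.application.id = "transaction-masker"
      |kafka.topics.input = "credit.card.transactions"
      |kafka.topics.output = "credit.card.transactions.masked"
      |""".stripMargin)

  // Look up individual values by their path.
  val bootstrapServers: String = config.getString("kafka.bootstrap.servers")
  val outputTopic: String = config.getString("kafka.topics.output")

  def main(args: Array[String]): Unit =
    println(s"Broker: $bootstrapServers, output topic: $outputTopic")
}
```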
Next, we create an instance of the Transaction SerDes.
This is a custom SerDes that I created that takes an instance of a Transaction class and serializes it to a byte array and vice versa. We could have SerDes for all kinds of scenarios, including SerDes for JSON, Avro etc.
What is also noticeable is that the 'transactionSerde' is declared as 'implicit'. The reason for this is so that we don’t have to specify it when calling the 'to()' method of the 'KStream' class and the 'stream()' method of the 'StreamsBuilder' class.
Next we build up the Properties. This is where we use the config file I mentioned earlier to set the broker URL and the id of the application. I love config files; hard-coded values make me feel uneasy.
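One possible shape for such a SerDes, sketched with 'Serdes.fromFn' from the Scala API. The cut-down 'Transaction' class and the pipe-delimited encoding are assumptions made for the example and not the format used in the original application; a real one would more likely use JSON or Avro.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.util.Try
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.scala.Serdes

// Hypothetical, cut-down Transaction; the real class has more fields.
final case class Transaction(cardNumber: String, cardType: String, amount: Double)

object TransactionSerdeSketch {
  // Encode as "number|type|amount" purely for illustration.
  private val serialize: Transaction => Array[Byte] =
    t => s"${t.cardNumber}|${t.cardType}|${t.amount}".getBytes(UTF_8)

  // Return None for anything that does not parse back into a Transaction.
  private val deserialize: Array[Byte] => Option[Transaction] =
    bytes => new String(bytes, UTF_8).split('|') match {
      case Array(number, cardType, amount) =>
        Try(Transaction(number, cardType, amount.toDouble)).toOption
      case _ => None
    }

  // Declared implicit so stream() and to() can pick it up without it being passed.
  implicit val transactionSerde: Serde[Transaction] =
    Serdes.fromFn[Transaction](serialize, deserialize)

  def main(args: Array[String]): Unit = {
    val original = Transaction("1234567812345678", "Visa", 42.5)
    val bytes = transactionSerde.serializer().serialize("any-topic", original)
    println(transactionSerde.deserializer().deserialize("any-topic", bytes))
  }
}
```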
We can configure many more properties in here, but as this is an introduction the two properties above will suffice.
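Building those two properties might look roughly like this. The config paths are hypothetical and Typesafe Config is assumed as the configuration library; 'setProperty' is used in place of 'put' simply to keep the values typed as strings.

```scala
import java.util.Properties
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.kafka.streams.StreamsConfig

object StreamsProperties {
  // Copy the application id and broker URL from the config into Properties.
  def fromConfig(config: Config): Properties = {
    val props = new Properties()
    props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, config.getString("kafka.application.id"))
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.getString("kafka.bootstrap.servers"))
    props
  }

  def main(args: Array[String]): Unit = {
    // Inlined stand-in for a .conf file in the resources folder.
    val config = ConfigFactory.parseString(
      """
        |kafka.application.id = "transaction-masker"
        |kafka.bootstrap.servers = "localhost:9092"
        |""".stripMargin)
    println(fromConfig(config))
  }
}
```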
Next we create an instance of the 'StreamsBuilder' class.
The 'StreamsBuilder' class provides us with a new Kafka Streams topology. We then are able to attach nodes to this topology. The nodes carry out a particular function such as streaming records from Kafka or performing processing related operations.
Nodes and mapValues
The first node we create is the source node. This node is responsible for consuming records from Kafka for our application to process. We call the 'stream()' method passing in the name of the Topic we want to consume from.
The 'stream()' method also takes an 'implicit', but due to having imported 'ImplicitConversions._' and 'Serdes._' and specifying the Transaction Serdes ('transactionSerde') as 'implicit', we don’t have to pass in a value.
Next we call the 'mapValues()' method. In doing this we are creating another node which is referred to as a processor node. This method transforms the value of each input into a new value, potentially even a new type…but not in our case.
It takes an anonymous function as an argument, with either the key and value types or just the value type as parameters. So here we are passing the current transaction into the function and calling the 'maskCreditCard()' method which takes a credit card number and masks it so only the last 4 digits are visible and returns the transaction object.
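A sketch of the chained form: a source node followed by a 'mapValues()' processor node. The 'Transaction' fields, the delimited SerDes, the input topic name and the free-standing 'maskCreditCard' helper are all assumptions made so the example compiles on its own. Building the topology does not need a running broker.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.util.Try
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KStream

// Hypothetical, cut-down Transaction.
final case class Transaction(cardNumber: String, cardType: String, amount: Double)

object MaskingTopology {
  // Illustrative SerDes using a "number|type|amount" encoding.
  implicit val transactionSerde: Serde[Transaction] = fromFn[Transaction](
    (t: Transaction) => s"${t.cardNumber}|${t.cardType}|${t.amount}".getBytes(UTF_8),
    (bytes: Array[Byte]) => new String(bytes, UTF_8).split('|') match {
      case Array(number, cardType, amount) =>
        Try(Transaction(number, cardType, amount.toDouble)).toOption
      case _ => None
    })

  // Hypothetical helper: hide everything except the last four digits.
  def maskCreditCard(t: Transaction): Transaction = {
    val visible = t.cardNumber.takeRight(4)
    t.copy(cardNumber = "*" * (t.cardNumber.length - visible.length) + visible)
  }

  val builder = new StreamsBuilder()

  // Source node and processor node chained together; the SerDes are resolved implicitly.
  val maskedStream: KStream[String, Transaction] =
    builder
      .stream[String, Transaction]("credit.card.transactions")
      .mapValues(transaction => maskCreditCard(transaction))

  def main(args: Array[String]): Unit = println(builder.build().describe())
}
```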
Now our application has two nodes: a source node and a processor node. It might not be straightforward to see this from the block of code above.
The code below might help in understanding this concept.
Here the 'transactionsKStream' is the source node and the 'maskedCreditCardStream' is the processor node. In the first block of code the operations are chained to make the code more concise.
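Splitting the chain into two values makes the two nodes easier to see. This is a sketch under the same kind of assumptions as any example here: the 'Transaction' fields, the SerDes encoding, the topic name and the 'maskCreditCard' helper are hypothetical.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.util.Try
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KStream

// Hypothetical, cut-down Transaction.
final case class Transaction(cardNumber: String, cardType: String, amount: Double)

object TwoNodeTopology {
  // Illustrative SerDes using a "number|type|amount" encoding.
  implicit val transactionSerde: Serde[Transaction] = fromFn[Transaction](
    (t: Transaction) => s"${t.cardNumber}|${t.cardType}|${t.amount}".getBytes(UTF_8),
    (bytes: Array[Byte]) => new String(bytes, UTF_8).split('|') match {
      case Array(number, cardType, amount) =>
        Try(Transaction(number, cardType, amount.toDouble)).toOption
      case _ => None
    })

  // Hypothetical helper: keep only the last four digits visible.
  def maskCreditCard(t: Transaction): Transaction =
    t.copy(cardNumber = "*" * math.max(0, t.cardNumber.length - 4) + t.cardNumber.takeRight(4))

  val builder = new StreamsBuilder()

  // Source node: consumes records from the input topic.
  val transactionsKStream: KStream[String, Transaction] =
    builder.stream[String, Transaction]("credit.card.transactions")

  // Processor node: transforms each value, leaving the key untouched.
  val maskedCreditCardStream: KStream[String, Transaction] =
    transactionsKStream.mapValues(transaction => maskCreditCard(transaction))

  def main(args: Array[String]): Unit = println(builder.build().describe())
}
```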
We are now ready to send records back into Kafka.
So now our 'transactionsKStream' contains 'Transaction' objects with a masked credit card number. The object also contains a few other properties such as card type, expiry, amount spent, location of transaction and account number but we aren't going to do anything with these values.
The Transaction records are now ready to be sent back to Kafka.
We call the 'to()' method specifying the Topic name we want to send the records to. Again, we are getting this value from the configuration file. Calling the 'to()' method generates another node which is referred to as the sink node.
Now we have 3 nodes. A source node that’s responsible for consuming records from Kafka. A processor node that masks the credit card number. Lastly, a sink node which sends the records back into Kafka.
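Put together, the three nodes could be sketched as below. The topic names are written inline here for brevity where the application reads them from configuration; the 'Transaction' class, SerDes and masking helper are again hypothetical. Printing the topology description is a handy way to check which nodes were created.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.util.Try
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder

// Hypothetical, cut-down Transaction.
final case class Transaction(cardNumber: String, cardType: String, amount: Double)

object ThreeNodeTopology {
  // Illustrative SerDes using a "number|type|amount" encoding.
  implicit val transactionSerde: Serde[Transaction] = fromFn[Transaction](
    (t: Transaction) => s"${t.cardNumber}|${t.cardType}|${t.amount}".getBytes(UTF_8),
    (bytes: Array[Byte]) => new String(bytes, UTF_8).split('|') match {
      case Array(number, cardType, amount) =>
        Try(Transaction(number, cardType, amount.toDouble)).toOption
      case _ => None
    })

  // Hypothetical helper: keep only the last four digits visible.
  def maskCreditCard(t: Transaction): Transaction =
    t.copy(cardNumber = "*" * math.max(0, t.cardNumber.length - 4) + t.cardNumber.takeRight(4))

  def build(): Topology = {
    val builder = new StreamsBuilder()
    builder
      .stream[String, Transaction]("credit.card.transactions") // source node
      .mapValues(transaction => maskCreditCard(transaction))   // processor node
      .to("credit.card.transactions.masked")                   // sink node
    builder.build()
  }

  def main(args: Array[String]): Unit = println(build().describe())
}
```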
Next we create a Kafka Streams client, passing in the properties object and the instance of the 'builder' we created at the beginning and calling the 'build()' method.
The 'build()' method returns a Topology which is an acyclic graph of the 3 nodes we have created. Passing the Topology into the 'KafkaStreams' constructor allows us to use the nodes in our application.
We are then able to start our Kafka Streams application…
And last but not least we tap into the 'ShutdownHookThread'.
This allows us to gracefully shut down the application. If the JVM detects that the application is to shut down, it calls the 'streams.close()' method for us. We give the threads 10 seconds to join up.
If you have Kafka and Zookeeper running you should have no trouble running the Kafka Streams application.
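The client creation, start-up and shutdown hook could be sketched like this. To keep it short, the topology is a plain String pass-through and the topic names, application id and broker address are placeholders. 'close(Duration)' assumes Kafka 2.1 or later. Running 'main' expects a broker at the configured address.

```scala
import java.time.Duration
import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig, Topology}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder

object StreamsRunner {
  // Placeholder topology: copy records from one topic to another.
  def topology(): Topology = {
    val builder = new StreamsBuilder()
    builder.stream[String, String]("input-topic").to("output-topic")
    builder.build()
  }

  // Placeholder settings; the application reads these from configuration.
  def properties(): Properties = {
    val props = new Properties()
    props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "transaction-masker")
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props
  }

  def main(args: Array[String]): Unit = {
    // The client takes the built Topology and the Properties.
    val streams = new KafkaStreams(topology(), properties())
    streams.start()

    // On JVM shutdown, close the client and wait up to 10 seconds for its threads.
    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }
  }
}
```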
Now we have a working Kafka Streams application, go you…well done!
The only way to tell that it is working (at the moment) is to run a console consumer to check that the 'credit.card.transactions.masked' Topic is receiving records.
There is nothing wrong with this approach: the data won’t be readable, but we’ll be able to see records being consumed. There is also a way we can get our masked credit card stream writing records to stdout so we can see the actual values. We can use the 'KStream.print()' method to accomplish this. The 'print()' method takes an instance of the 'Printed' class. This class provides two static methods, 'Printed.toSysOut()' and 'Printed.toFile(filePath)'.
With this processing node in our topology, we’ll now see records printed out when the application is running.
We’ll now see records printed out with a label when the application is running.
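Both variants of 'print()' might look as follows: once with plain 'Printed.toSysOut()' and once with a label added through 'withLabel()'. The label text, the 'Transaction' class and its SerDes are assumptions for the example.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.util.Try
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.Printed
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KStream

// Hypothetical, cut-down Transaction.
final case class Transaction(cardNumber: String, cardType: String, amount: Double)

object PrintSketch {
  // Illustrative SerDes using a "number|type|amount" encoding.
  implicit val transactionSerde: Serde[Transaction] = fromFn[Transaction](
    (t: Transaction) => s"${t.cardNumber}|${t.cardType}|${t.amount}".getBytes(UTF_8),
    (bytes: Array[Byte]) => new String(bytes, UTF_8).split('|') match {
      case Array(number, cardType, amount) =>
        Try(Transaction(number, cardType, amount.toDouble)).toOption
      case _ => None
    })

  val builder = new StreamsBuilder()

  val maskedStream: KStream[String, Transaction] =
    builder.stream[String, Transaction]("credit.card.transactions.masked")

  // Print every record to stdout.
  maskedStream.print(Printed.toSysOut[String, Transaction]())

  // Same again, but each line is prefixed with a label.
  maskedStream.print(Printed.toSysOut[String, Transaction]().withLabel("masked-transactions"))

  def main(args: Array[String]): Unit = println(builder.build().describe())
}
```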
We can also use 'peek()'. The problem with 'print()' is that it is a terminal operation: it does not return a KStream, so we are unable to chain further processing operations after it.
The 'peek()' method allows us to perform an action such as 'println()' on each record and carry on adding processing nodes.
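A small sketch of 'peek()' sitting in the middle of a chain. Because it returns the stream, the 'to()' call can follow it directly. Topic names, the 'Transaction' class and its SerDes are hypothetical.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.util.Try
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder

// Hypothetical, cut-down Transaction.
final case class Transaction(cardNumber: String, cardType: String, amount: Double)

object PeekSketch {
  // Illustrative SerDes using a "number|type|amount" encoding.
  implicit val transactionSerde: Serde[Transaction] = fromFn[Transaction](
    (t: Transaction) => s"${t.cardNumber}|${t.cardType}|${t.amount}".getBytes(UTF_8),
    (bytes: Array[Byte]) => new String(bytes, UTF_8).split('|') match {
      case Array(number, cardType, amount) =>
        Try(Transaction(number, cardType, amount.toDouble)).toOption
      case _ => None
    })

  val builder = new StreamsBuilder()

  builder
    .stream[String, Transaction]("credit.card.transactions")
    // Side effect only: the record passes through unchanged.
    .peek((key, transaction) => println(s"$key -> $transaction"))
    // The chain carries on after peek().
    .to("credit.card.transactions.masked")

  def main(args: Array[String]): Unit = println(builder.build().describe())
}
```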
There may be times when we need to filter out records that we aren't interested in. We can use the 'filter()' method for this. This method takes one parameter which has to be a predicate. This is a function that returns a Boolean. We can define a predicate in two different ways.
We call the 'filter()' method, passing in an anonymous function as an argument, which keeps the records where the amount is over 100.00.
We now have a new KStream containing only the records that passed the filter.
Alternatively, we can create an anonymous function, assign it to a variable and pass that predicate into the 'filter()' method.
This is useful if the predicate needs to be used in more than one place.
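The two ways of supplying a predicate might be sketched like this. The 'amount' field, the predicate name and the topic name are assumptions for the example.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.util.Try
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KStream

// Hypothetical, cut-down Transaction.
final case class Transaction(cardNumber: String, cardType: String, amount: Double)

object FilterSketch {
  // Illustrative SerDes using a "number|type|amount" encoding.
  implicit val transactionSerde: Serde[Transaction] = fromFn[Transaction](
    (t: Transaction) => s"${t.cardNumber}|${t.cardType}|${t.amount}".getBytes(UTF_8),
    (bytes: Array[Byte]) => new String(bytes, UTF_8).split('|') match {
      case Array(number, cardType, amount) =>
        Try(Transaction(number, cardType, amount.toDouble)).toOption
      case _ => None
    })

  val builder = new StreamsBuilder()
  val transactions: KStream[String, Transaction] =
    builder.stream[String, Transaction]("credit.card.transactions")

  // Option 1: pass an anonymous function straight into filter().
  val inlineFiltered: KStream[String, Transaction] =
    transactions.filter((_, transaction) => transaction.amount > 100.00)

  // Option 2: name the predicate so it can be reused elsewhere.
  val overOneHundred: (String, Transaction) => Boolean =
    (_, transaction) => transaction.amount > 100.00

  val namedFiltered: KStream[String, Transaction] =
    transactions.filter(overOneHundred)

  def main(args: Array[String]): Unit = println(builder.build().describe())
}
```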
For each record that we process, there may be an operation that we want to perform that isn’t a part of the DSL, such as adding the record to a database, sending it to a 3rd party API, passing it through an enrichment service, etc.
For this, we can use the 'foreach()' method.
Here we’re filtering for transactions where the card type is Visa, and for each record we’re passing the transaction into a method which inserts it into a database and also prints it out.
Alternatively, we could have used anonymous functions again.
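A sketch of 'filter()' followed by 'foreach()'. The database is faked with an in-memory buffer, and 'insertTransaction' is a hypothetical stand-in for whatever persistence code a real application would call.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.collection.mutable.ListBuffer
import scala.util.Try
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder

// Hypothetical, cut-down Transaction.
final case class Transaction(cardNumber: String, cardType: String, amount: Double)

object ForeachSketch {
  // Illustrative SerDes using a "number|type|amount" encoding.
  implicit val transactionSerde: Serde[Transaction] = fromFn[Transaction](
    (t: Transaction) => s"${t.cardNumber}|${t.cardType}|${t.amount}".getBytes(UTF_8),
    (bytes: Array[Byte]) => new String(bytes, UTF_8).split('|') match {
      case Array(number, cardType, amount) =>
        Try(Transaction(number, cardType, amount.toDouble)).toOption
      case _ => None
    })

  // Stand-in for a real database table.
  val fakeDatabase: ListBuffer[Transaction] = ListBuffer.empty[Transaction]

  // Hypothetical side-effecting method: store the record, then print it.
  def insertTransaction(transaction: Transaction): Unit = {
    fakeDatabase += transaction
    println(s"Inserted $transaction")
  }

  val builder = new StreamsBuilder()

  builder
    .stream[String, Transaction]("credit.card.transactions")
    .filter((_, transaction) => transaction.cardType == "Visa")
    // foreach() is terminal: it returns Unit, so nothing can be chained after it.
    .foreach((_, transaction) => insertTransaction(transaction))

  def main(args: Array[String]): Unit = println(builder.build().describe())
}
```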
Our stream application isn’t always going to be receiving records, processing them and sending them back into a Topic. There may be instances where the stream needs to branch into several child streams depending on a particular predicate.
We can use the 'branch()' method for this.
The 'branch()' method takes a number of predicates and returns an array of KStream instances branched from the original stream based on the supplied predicates.
So here we are creating two predicates, one for the MasterCard type and another for the Visa type. We pass these two predicates into the 'branch()' method.
The 'branchedTransactionKStream' is an array containing two KStreams. 'branchedTransactionKStream(0)' contains records where the type is MasterCard and 'branchedTransactionKStream(1)' contains records where the type is Visa.
We then can perform processing operations as usual with the newly created streams.
Alternatively, we could have used anonymous functions again instead of creating predicates and assigning them to variables.
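Branching with two named predicates could be sketched as below. The predicate names, card type strings and output topic names are assumptions. Each record goes to the first branch whose predicate matches, and records matching none are dropped. As far as I know, 'branch()' is deprecated from Kafka 2.8 onwards in favour of 'split()', so this reflects the API as it was when the article was written.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.util.Try
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KStream

// Hypothetical, cut-down Transaction.
final case class Transaction(cardNumber: String, cardType: String, amount: Double)

object BranchSketch {
  // Illustrative SerDes using a "number|type|amount" encoding.
  implicit val transactionSerde: Serde[Transaction] = fromFn[Transaction](
    (t: Transaction) => s"${t.cardNumber}|${t.cardType}|${t.amount}".getBytes(UTF_8),
    (bytes: Array[Byte]) => new String(bytes, UTF_8).split('|') match {
      case Array(number, cardType, amount) =>
        Try(Transaction(number, cardType, amount.toDouble)).toOption
      case _ => None
    })

  // One predicate per card type.
  val isMasterCard: (String, Transaction) => Boolean = (_, t) => t.cardType == "MasterCard"
  val isVisa: (String, Transaction) => Boolean = (_, t) => t.cardType == "Visa"

  val builder = new StreamsBuilder()

  // The array has one child stream per predicate, in the order they were passed.
  val branchedTransactionKStream: Array[KStream[String, Transaction]] =
    builder
      .stream[String, Transaction]("credit.card.transactions")
      .branch(isMasterCard, isVisa)

  // Carry on processing each child stream as usual.
  branchedTransactionKStream(0).to("credit.card.transactions.mastercard")
  branchedTransactionKStream(1).to("credit.card.transactions.visa")

  def main(args: Array[String]): Unit = println(builder.build().describe())
}
```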
In this article I’ve covered a basic Kafka Streams application and 5 useful methods, 'mapValues()', 'print()', 'filter()', 'foreach()' and 'branch()'.
There are many more methods available and they are documented here.
Many thanks as always for reading my articles; it’s really appreciated. If you have any thoughts, comments or questions, drop me a tweet.