This is part 3 and part 4 of Marko Švaljek's blog series on stream processing with Spring, Kafka, Spark and Cassandra. If you missed part 1 and part 2, read them here.
Part 3 - Writing a Spring Boot Kafka Producer
We'll go over the steps necessary to write a simple producer for a Kafka topic using Spring Boot. The application will essentially be a simple proxy: it receives a JSON payload containing the value that's going to be sent to the Kafka topic. Pretty simple, but enough to get us going. We'll use IntelliJ IDEA to set everything up; the easiest way to get started is with Spring Initializr.
Setting up a project
- Project SDK: Java 8
- Initializr Service URL: https://start.spring.io
- Next
- Name: spring-boot-kafka-example
- Type: Gradle Project
- Packaging: Jar
- Java Version: 1.8
- Language: Java
- Group: com.example
- Artifact: spring-boot-kafka-example
- Version: 0.0.1-SNAPSHOT
- Description: Spring Boot Kafka Example
- Package: com.example
- Next
- Spring Boot Version: 1.3
- Core - Web
- Next
- Project name: spring-boot-kafka-example
- The rest is just fine ...
- Finish
- After creating the project, check the SDK setting; it should be Java 8
build.gradle dependencies
compile('org.apache.kafka:kafka_2.11:0.9.0.0')
compile('org.apache.zookeeper:zookeeper:3.4.7')
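For context, these two lines go inside the dependencies block of the generated build.gradle, next to the web starter that Initializr adds for the Web selection (a sketch; your generated file will also contain the Spring Boot plugin setup):

dependencies {
    compile('org.springframework.boot:spring-boot-starter-web')
    compile('org.apache.kafka:kafka_2.11:0.9.0.0')
    compile('org.apache.zookeeper:zookeeper:3.4.7')
}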
application.properties
# Kafka broker(s) the producer connects to
brokerList=localhost:9092
# "sync" makes sends block until the broker acknowledges; any other value sends asynchronously
sync=sync
# topic the votes are published to
topic=votes
SpringBootKafkaProducer
This is the class where all the important stuff happens:
package com.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

@Configuration
public class SpringBootKafkaProducer {

    @Value("${brokerList}")
    private String brokerList;

    @Value("${sync}")
    private String sync;

    @Value("${topic}")
    private String topic;

    private Producer<String, String> producer;

    public SpringBootKafkaProducer() {
    }

    @PostConstruct
    public void initIt() {
        Properties kafkaProps = new Properties();

        kafkaProps.put("bootstrap.servers", brokerList);
        kafkaProps.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // wait for the leader (but not all replicas) to acknowledge each write
        kafkaProps.put("acks", "1");
        kafkaProps.put("retries", "1");
        // allow the producer to batch records for up to 5 ms before sending
        kafkaProps.put("linger.ms", 5);

        producer = new KafkaProducer<>(kafkaProps);
    }

    public void send(String value) throws ExecutionException,
            InterruptedException {
        if ("sync".equalsIgnoreCase(sync)) {
            sendSync(value);
        } else {
            sendAsync(value);
        }
    }

    // blocks until the broker acknowledges the record
    private void sendSync(String value) throws ExecutionException,
            InterruptedException {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
        producer.send(record).get();
    }

    // fire-and-forget; errors are only reported through the callback
    private void sendAsync(String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);

        producer.send(record, (RecordMetadata recordMetadata, Exception e) -> {
            if (e != null) {
                e.printStackTrace();
            }
        });
    }
}
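One thing the example leaves out is closing the producer on shutdown, which flushes any buffered records. A minimal sketch of a method that could be added to the class above (the @PreDestroy hook is my addition, not part of the original post):

// requires: import javax.annotation.PreDestroy;

// hypothetical addition: flush buffered records and release resources on shutdown
@PreDestroy
public void cleanUp() {
    if (producer != null) {
        producer.close();
    }
}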
SpringBootKafkaExampleApplication
This one will be automatically generated.
package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootKafkaExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootKafkaExampleApplication.class, args);
    }
}
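At this point the application can be started from the IDE, or from the command line through the Spring Boot Gradle plugin that Initializr configures (assuming the default Gradle wrapper setup):

./gradlew bootRun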
AppBeans
Set up the beans for the controller.
package com.example;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppBeans {

    @Bean
    public SpringBootKafkaProducer initProducer() {
        return new SpringBootKafkaProducer();
    }
}
Helper beans
A status to return to clients; we'll just send "ok" every time.
package com.example;

public class Status {
    private String status;

    public Status(String status) {
        this.status = status;
    }

    public Status() {
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }
}
This will be the input to our app:
package com.example;

public class Vote {
    private String name;

    public Vote(String name) {
        this.name = name;
    }

    public Vote() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
SpringBootKafkaController
This is the controller; after starting the app, we should have an active endpoint available at http://localhost:8080/vote
package com.example;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

@RestController
public class SpringBootKafkaController {

    @Autowired
    SpringBootKafkaProducer springBootKafkaProducer;

    @RequestMapping("/vote")
    public Status vote(@RequestBody Vote vote) throws ExecutionException, InterruptedException {

        springBootKafkaProducer.send(vote.getName());

        return new Status("ok");
    }
}
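One small note: as written, @RequestMapping("/vote") accepts any HTTP method. Since the endpoint reads a request body, you may want to pin it to POST; a minor variation of the mapping (my suggestion, not from the original post; it also needs an import for org.springframework.web.bind.annotation.RequestMethod):

@RequestMapping(value = "/vote", method = RequestMethod.POST)
public Status vote(@RequestBody Vote vote) throws ExecutionException, InterruptedException {
    springBootKafkaProducer.send(vote.getName());
    return new Status("ok");
}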
Checking everything
There should still be an active console consumer from the previous post, so we won't cover that here. After running the SpringBootKafkaExampleApplication, simply open a REST client application like Postman and try to send the following JSON to http://localhost:8080/vote:
{
    "name": "Test"
}
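If you prefer the command line over Postman, a curl equivalent would look something like this (assuming the app runs on the default port 8080):

curl -X POST -H "Content-Type: application/json" -d '{"name": "Test"}' http://localhost:8080/vote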
If everything went fine, you should see the name you sent in this JSON appear in the console consumer. In Part 4 we're going to go over how to pick up the data from Kafka with Spark Streaming, combine it with data in Cassandra, and push it back to Cassandra.
Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra
Cassandra
Nothing fancy here: just the name of the entity being voted for and the number of votes.
CREATE KEYSPACE voting
    WITH REPLICATION = {
        'class' : 'SimpleStrategy',
        'replication_factor' : 1
    };

USE voting;

CREATE TABLE votes (name text PRIMARY KEY, votes int);
Let's create a simple Java project with Gradle for stream processing
- File, New Project, Gradle
- Project SDK: Java 8
- Java
- Next
- GroupId: spark-kafka-streaming-example
- ArtifactId: spark-kafka-streaming-example
- Version: 1.0-SNAPSHOT
- Next
- Use default gradle wrapper
- Next
- Project name: spark-kafka-streaming-example
- The rest is just fine ...
- Finish
- After creating the project, check the SDK setting; it should be Java 8
Let's have a look at the dependencies. Note that all the Spark and connector artifacts share the Scala 2.10 suffix; mixing Scala versions here is a common source of errors:
group 'spark-kafka-streaming-example'
version '1.0-SNAPSHOT'

apply plugin: 'java'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    compile('org.apache.spark:spark-core_2.10:1.5.2')
    compile('org.apache.spark:spark-streaming_2.10:1.5.2')
    compile('org.apache.spark:spark-streaming-kafka_2.10:1.5.2')
    compile('com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M3')
    compile('com.datastax.spark:spark-cassandra-connector-java_2.10:1.5.0-M3')

    testCompile group: 'junit', name: 'junit', version: '4.11'
}
Simple Voting Class to go with the Cassandra Table
We'll use this class to store data in Cassandra. Its name and votes properties line up with the columns of the votes table, which is what mapToRow(Vote.class) relies on later.
import java.io.Serializable;

public class Vote implements Serializable {
    private String name;
    private Integer votes;

    public Vote(String name, Integer votes) {
        this.name = name;
        this.votes = votes;
    }

    public Vote() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getVotes() {
        return votes;
    }

    public void setVotes(Integer votes) {
        this.votes = votes;
    }
}
Spark Streaming with Kafka
And finally, the code that accepts the votes as they come in, combines them with the counts already stored in Cassandra, and writes the new totals back to Cassandra. I didn't spend much time on making the class configurable through external parameters, but for an example it's good enough:
import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.rdd.CassandraTableScanJavaRDD;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

public class SparkStreamingExample {

    public static JavaSparkContext sc;

    public static void main(String[] args) throws IOException {

        String brokers = "localhost:9092,localhost:9093";
        String topics = "votes";

        // run locally with two threads and point the connector at a local Cassandra
        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster("local[2]");
        sparkConf.setAppName("SparkStreamingExample");
        sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");

        // process the stream in 10-second batches
        JavaStreamingContext jssc = new JavaStreamingContext(
                sparkConf,
                Durations.seconds(10));

        HashSet<String> topicsSet = new HashSet<>(
                Arrays.asList(topics.split(",")));
        HashMap<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers);

        // receiverless direct stream: one RDD partition per Kafka partition
        JavaPairInputDStream<String, String> messages =
                KafkaUtils.createDirectStream(
                        jssc,
                        String.class,
                        String.class,
                        StringDecoder.class,
                        StringDecoder.class,
                        kafkaParams,
                        topicsSet);

        // we only care about the message values, not the Kafka keys
        JavaDStream<String> lines =
                messages.map(
                        (Function<Tuple2<String, String>, String>) Tuple2::_2);

        // classic word count: one (name, 1) pair per vote, summed per batch
        JavaPairDStream<String, Integer> voteCount = lines
                .mapToPair(
                        (PairFunction<String, String, Integer>) s ->
                                new Tuple2<>(s, 1))
                .reduceByKey(
                        (Function2<Integer, Integer, Integer>)
                                (i1, i2) -> i1 + i2);

        sc = jssc.sparkContext();

        voteCount.foreachRDD((v1, v2) -> {
            v1.foreach((x) -> {
                // read the current total for this name from Cassandra
                CassandraTableScanJavaRDD<CassandraRow> previousVotes =
                        javaFunctions(sc)
                                .cassandraTable("voting", "votes")
                                .where("name = '" + x._1() + "'");

                Integer oldVotes = 0;
                if (previousVotes.count() > 0) {
                    oldVotes = previousVotes.first().getInt("votes");
                }

                Integer newVotes = oldVotes + x._2();

                // write the updated total back to Cassandra
                List<Vote> votes = Arrays.asList(
                        new Vote(x._1(), newVotes));
                JavaRDD<Vote> rdd = sc.parallelize(votes);

                javaFunctions(rdd)
                        .writerBuilder("voting", "votes", mapToRow(Vote.class))
                        .saveToCassandra();
            });

            return null;
        });

        voteCount.print();

        jssc.start();
        jssc.awaitTermination();
    }
}
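To check the whole pipeline end to end, send a few votes through the REST endpoint from part 3 and then query the table in cqlsh; the totals should grow with every processed batch (a quick sanity check, not from the original post):

SELECT * FROM voting.votes;

Note that each update does a read-modify-write per name, which is fine for a demo running in local mode but wouldn't be the pattern of choice under real load.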
And that's it!