
Stream Processing With Spring, Kafka, Spark and Cassandra - Part 3 & 4


This is part 3 and part 4 of Marko Švaljek's blog series on Stream Processing With Spring, Kafka, Spark and Cassandra. If you missed part 1 and part 2, read them here.

 

Part 3 - Writing a Spring Boot Kafka Producer

We'll go over the steps necessary to write a simple producer for a Kafka topic using Spring Boot. The application will essentially be a simple proxy: it will receive a JSON payload containing the value that's going to be sent to the Kafka topic. Pretty simple, but enough to get us going. We'll use IntelliJ IDEA to set everything up. The easiest way to get started is with Spring Initializr.

 

Setting up a project

  1. Project SDK: Java 8
  2. Initializr Service URL: https://start.spring.io
  3. Next
  4. Name: spring-boot-kafka-example
  5. Type: Gradle Project
  6. Packaging: Jar
  7. Java Version: 1.8
  8. Language: Java
  9. Group: com.example
  10. Artifact: spring-boot-kafka-example
  11. Version: 0.0.1-SNAPSHOT
  12. Description: Spring Boot Kafka Example
  13. Package: com.example
  14. Next
  15. Spring Boot Version: 1.3
  16. Core - Web
  17. Next
  18. Project name: spring-boot-kafka-example
  19. The rest is just fine ...
  20. Finish
  21. After creating the project, check the SDK setting; it should be Java 8
 

build.gradle dependencies

compile('org.apache.kafka:kafka_2.11:0.9.0.0')
compile('org.apache.zookeeper:zookeeper:3.4.7')

 

application.properties

brokerList=localhost:9092
sync=sync
topic=votes

 

SpringBootKafkaProducer

This is the class where all the important stuff happens.

package com.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

@Configuration
public class SpringBootKafkaProducer {

    @Value("${brokerList}")
    private String brokerList;

    @Value("${sync}")
    private String sync;

    @Value("${topic}")
    private String topic;

    private Producer<String, String> producer;

    public SpringBootKafkaProducer() {
    }

    @PostConstruct
    public void initIt() {
        Properties kafkaProps = new Properties();

        kafkaProps.put("bootstrap.servers", brokerList);
        kafkaProps.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("acks", "1");
        kafkaProps.put("retries", "1");
        kafkaProps.put("linger.ms", 5);

        producer = new KafkaProducer<>(kafkaProps);
    }

    public void send(String value) throws ExecutionException,
            InterruptedException {
        if ("sync".equalsIgnoreCase(sync)) {
            sendSync(value);
        } else {
            sendAsync(value);
        }
    }

    private void sendSync(String value) throws ExecutionException,
            InterruptedException {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
        producer.send(record).get();
    }

    private void sendAsync(String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);

        producer.send(record, (RecordMetadata recordMetadata, Exception e) -> {
            if (e != null) {
                e.printStackTrace();
            }
        });
    }
}
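The sync/async toggle above comes down to whether we block on the Future returned by producer.send(). Stripped of Kafka, the same pattern can be sketched with only the JDK; here CompletableFuture stands in for Kafka's send Future, and SendModes, fakeSend, sendSync and sendAsync are made-up names, purely illustrative:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public class SendModes {

    // Stand-in for producer.send(record): the result arrives on another thread.
    static CompletableFuture<String> fakeSend(String value) {
        return CompletableFuture.supplyAsync(() -> "ack:" + value);
    }

    // Sync mode: block until acknowledged, so errors surface to the caller.
    static String sendSync(String value) throws Exception {
        return fakeSend(value).get();
    }

    // Async mode: register a callback and return immediately.
    static void sendAsync(String value, BiConsumer<String, Throwable> callback) {
        fakeSend(value).whenComplete(callback);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sendSync("Test")); // ack:Test
        sendAsync("Test", (ack, err) -> {
            if (err != null) {
                err.printStackTrace();
            }
        });
    }
}
```

Sync sends trade throughput for immediate error reporting; that's exactly the trade-off the producer class above lets you pick with the sync property.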

 

SpringBootKafkaExampleApplication

This one will be automatically generated.

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootKafkaExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootKafkaExampleApplication.class, args);
    }
}

 

AppBeans

Set up the beans for the controller.

package com.example;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppBeans {

    @Bean
    public SpringBootKafkaProducer initProducer() {
        return new SpringBootKafkaProducer();
    }
}

 

Helper beans

The status to return to clients; we'll just send "ok" every time.

package com.example;

public class Status {
    private String status;

    public Status(String status) {
        this.status = status;
    }

    public Status() {
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }
}

This will be the input to our app:

package com.example;

public class Vote {
    private String name;

    public Vote(String name) {
        this.name = name;
    }

    public Vote() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
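Spring binds the incoming JSON body to this POJO automatically (via Jackson, using the no-arg constructor and the setter). As a rough mental model only, a toy extraction of the name field might look like this; VoteBindingSketch and extractName are made-up names, and real binding is far more robust than a regex:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class VoteBindingSketch {

    // Toy stand-in for what @RequestBody does with {"name": "Test"}:
    // pull out the "name" value, which would then be handed to Vote's setter.
    static String extractName(String json) {
        Matcher m = Pattern.compile("\"name\"\\s*:\\s*\"([^\"]*)\"").matcher(json);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        System.out.println(extractName("{ \"name\": \"Test\" }")); // Test
    }
}
```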

 

SpringBootKafkaController

This is the controller; after starting the app we should have an active endpoint available at http://localhost:8080/vote

package com.example;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

@RestController
public class SpringBootKafkaController {

    @Autowired
    SpringBootKafkaProducer springBootKafkaProducer;

    @RequestMapping("/vote")
    public Status vote(@RequestBody Vote vote) throws ExecutionException, InterruptedException {

        springBootKafkaProducer.send(vote.getName());

        return new Status("ok");
    }
}

Checking everything

There should be an active console consumer from the previous post, so we won't cover that here. After running the SpringBootKafkaExampleApplication, simply open a REST client application like Postman and try sending the following JSON to http://localhost:8080/vote:

{
    "name": "Test"
}
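If you prefer the command line to Postman, the same check can be done with curl (assuming the app is running on the default port 8080):

```shell
curl -X POST http://localhost:8080/vote \
     -H 'Content-Type: application/json' \
     -d '{"name": "Test"}'
```

The endpoint should respond with the JSON-serialized Status, i.e. {"status":"ok"}.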

If everything went fine, you should see the name you sent in this JSON appear in the console consumer. In Part 4 we are going to go over how to pick up the data from Kafka with Spark Streaming, combine it with data in Cassandra and push the results back to Cassandra.

 

Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra

In this section we are going to use Spark Streaming to read the data coming in from Kafka. We'll combine it with the data already in Cassandra, do some computation with it, and put the results back into Cassandra. The best practice would be to have a Spark cluster running, but for the sake of simplicity we are going to launch a local Spark context from a Java application and do the processing there. We won't go into getting Cassandra running; there is plenty of documentation out there and it takes just minutes to set up.
 

Cassandra

Nothing fancy here: just the name of the entity being voted for and the number of votes.

CREATE KEYSPACE voting
    WITH REPLICATION = {
        'class' : 'SimpleStrategy',
        'replication_factor' : 1
    };

USE voting;

CREATE TABLE votes (name text PRIMARY KEY, votes int);

 

Let's create a simple Java project with Gradle for stream processing:

  1. File, New Project, Gradle
  2. Project SDK: Java 8
  3. Java
  4. Next
  5. GroupId: spark-kafka-streaming-example
  6. ArtifactId: spark-kafka-streaming-example
  7. Version: 1.0-SNAPSHOT
  8. Next
  9. Use default gradle wrapper
  10. Next
  11. Project name: spark-kafka-streaming-example
  12. The rest is just fine ...
  13. Finish
  14. After creating the project, check the SDK setting; it should be Java 8

Let's have a look at the dependencies:

group 'spark-kafka-streaming-example'
version '1.0-SNAPSHOT'

apply plugin: 'java'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    compile('org.apache.spark:spark-core_2.10:1.5.2')
    compile('org.apache.spark:spark-streaming_2.10:1.5.2')
    compile('org.apache.spark:spark-streaming-kafka_2.10:1.5.2')
    compile('com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M3')
    compile('com.datastax.spark:spark-cassandra-connector-java_2.10:1.5.0-M3')

    testCompile group: 'junit', name: 'junit', version: '4.11'
}

 

Simple Voting Class to go with Cassandra Table

We'll use this class for storing data into Cassandra.

import java.io.Serializable;

public class Vote implements Serializable {
    private String name;
    private Integer votes;

    public Vote(String name, Integer votes) {
        this.name = name;
        this.votes = votes;
    }

    public Vote() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getVotes() {
        return votes;
    }

    public void setVotes(Integer votes) {
        this.votes = votes;
    }
}

 

Spark Streaming with Kafka

And finally, the code that accepts the votes coming in, combines them with the data already in Cassandra and then writes the updated counts back to Cassandra. I didn't spend much time on making the class configurable through external parameters, but for this example it's good enough:

import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.rdd.CassandraTableScanJavaRDD;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

public class SparkStreamingExample {

    public static JavaSparkContext sc;

    public static void main(String[] args) throws IOException {

        String brokers = "localhost:9092,localhost:9093";
        String topics = "votes";

        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster("local[2]");
        sparkConf.setAppName("SparkStreamingExample");
        sparkConf.set("spark.cassandra.connection.host",
            "127.0.0.1");

        JavaStreamingContext jssc = new JavaStreamingContext(
            sparkConf,
            Durations.seconds(10));

        HashSet<String> topicsSet = new HashSet<>(
                Arrays.asList(topics.split(",")));
        HashMap<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers);

        JavaPairInputDStream<String, String> messages =
                KafkaUtils.createDirectStream(
                        jssc,
                        String.class,
                        String.class,
                        StringDecoder.class,
                        StringDecoder.class,
                        kafkaParams,
                        topicsSet
                );

        JavaDStream<String> lines =
                messages.map(
                        (Function<Tuple2<String, String>, String>) Tuple2::_2);

        JavaPairDStream<String, Integer> voteCount = lines
            .mapToPair(
                (PairFunction<String, String, Integer>) s ->
                        new Tuple2<>(s, 1)).reduceByKey(
                (Function2<Integer, Integer, Integer>)
                    (i1, i2) -> i1 + i2);

        sc = jssc.sparkContext();

        voteCount.foreachRDD((v1, v2) -> {
            v1.foreach((x) -> {
                CassandraTableScanJavaRDD<CassandraRow> previousVotes =
                    javaFunctions(sc)
                        .cassandraTable("voting", "votes")
                        .where("name = '" + x._1() + "'");

                Integer oldVotes = 0;
                if (previousVotes.count() > 0) {
                    oldVotes =
                        previousVotes.first().getInt("votes");
                }

                Integer newVotes = oldVotes + x._2();

                List<Vote> votes = Arrays.asList(
                    new Vote(x._1(), newVotes));
                JavaRDD<Vote> rdd = sc.parallelize(votes);

                javaFunctions(rdd)
                    .writerBuilder("voting", "votes", mapToRow(Vote.class))
                    .saveToCassandra();
            });

            return null;
        });

        voteCount.print();

        jssc.start();
        jssc.awaitTermination();
    }
}
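The heart of the job above is the per-batch counting (map each vote to a (name, 1) pair, then reduce by key) followed by adding the batch counts onto the totals already stored in Cassandra. Stripped of Spark and Cassandra, that logic can be sketched in plain Java; VoteCountSketch, countBatch and mergeWithStored are made-up names for illustration:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class VoteCountSketch {

    // mapToPair + reduceByKey: each name becomes (name, 1), summed per key.
    static Map<String, Integer> countBatch(List<String> batch) {
        Map<String, Integer> counts = new HashMap<>();
        for (String name : batch) {
            counts.merge(name, 1, Integer::sum);
        }
        return counts;
    }

    // The foreachRDD body: add the batch counts onto the stored totals
    // (here an in-memory map stands in for the Cassandra votes table).
    static Map<String, Integer> mergeWithStored(Map<String, Integer> stored,
                                                Map<String, Integer> batch) {
        Map<String, Integer> merged = new HashMap<>(stored);
        for (Map.Entry<String, Integer> e : batch.entrySet()) {
            merged.merge(e.getKey(), e.getValue(), Integer::sum);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Integer> stored = new HashMap<>();
        stored.put("Test", 5); // pretend Cassandra already has 5 votes for Test

        Map<String, Integer> batch =
            countBatch(Arrays.asList("Test", "Test", "Other"));
        Map<String, Integer> totals = mergeWithStored(stored, batch);

        System.out.println(totals.get("Test"));  // 7
        System.out.println(totals.get("Other")); // 1
    }
}
```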

And that's it!
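Once some votes have flowed through the pipeline, the running totals can be checked directly in cqlsh; the 'Test' row here is just whichever name you posted earlier:

```sql
SELECT * FROM voting.votes WHERE name = 'Test';
```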

 

Stay tuned for Part 5.

 
This article was written by Marko Švaljek and originally posted on msvaljek.blogspot.com