This is part 5 of the blog series by Marko Švaljek on Stream Processing With Spring, Kafka, Spark and Cassandra. If you missed part 3 and part 4, read them here.
'Part 5 - Displaying Cassandra Data With Spring Boot
Now that we have our voting data in Cassandra, let's write a simple Spring Boot project that gathers all the data from Cassandra, sorts it, and displays it to the user.
Setting up a project
- Project SDK: Java 8
- Initializr Service URL: https://start.spring.io
- Next
- Name: boot-cassandra-data-show
- Type: Gradle Project
- Packaging: Jar
- Java Version: 1.8
- Language: Java
- Group: com.example
- Artifact: boot-cassandra-data-show
- Version: 0.0.1-SNAPSHOT
- Description: Spring Boot Display Cassandra Data
- Package: com.example
- Next
- Spring Boot Version: 1.3
- Core - Web
- Template Engines - Mustache
- Next
- Project name: boot-cassandra-data-show
- The rest is just fine ...
- Finish
- After creating the project, check the SDK setting; it should be Java 8
Cassandra dependencies
compile('com.datastax.cassandra:cassandra-driver-core:2.1.9')
Vote class
We'll use this class to map rows from Cassandra.
package com.example;

import java.io.Serializable;

public class Vote implements Serializable {
    private String name;
    private Integer votes;

    public Vote(String name, Integer votes) {
        this.name = name;
        this.votes = votes;
    }

    public Vote() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getVotes() {
        return votes;
    }

    public void setVotes(Integer votes) {
        this.votes = votes;
    }
}
application.properties
server.port = 8090
contactPoint = 127.0.0.1
keyspace = voting
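The contactPoint and keyspace entries are the ones the @Value("${contactPoint}") and @Value("${keyspace}") fields of CassandraSessionManager pick up. As a rough illustration of how such key = value lines parse, here is a minimal sketch using plain java.util.Properties. It is not Spring Boot's own loading mechanism, and the class name PropertiesSketch is made up for this example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class PropertiesSketch {

    // Parses properties text with the standard java.util.Properties rules:
    // whitespace around the '=' separator is not part of the key or the value.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // Not expected when reading from an in-memory string.
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Same three entries as in application.properties.
        String text = "server.port = 8090\n"
                + "contactPoint = 127.0.0.1\n"
                + "keyspace = voting\n";
        Properties props = parse(text);
        System.out.println(props.getProperty("contactPoint"));
        System.out.println(props.getProperty("keyspace"));
    }
}
```

The point is only that the spaces around = are harmless: the keys and values come out trimmed.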
CassandraSessionManager
This bean sets up the connection to Cassandra.
package com.example;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

@Configuration
public class CassandraSessionManager {

    private Session session;
    private Cluster cluster;

    @Value("${contactPoint}")
    private String contactPoint;

    @Value("${keyspace}")
    private String keyspace;

    public CassandraSessionManager() {

    }

    public Session getSession() {
        return session;
    }

    // Runs once the properties are injected: connect to the configured keyspace.
    @PostConstruct
    public void initIt() {
        cluster = Cluster.builder().addContactPoint(
                contactPoint).build();
        session = cluster.connect(keyspace);
    }

    // Runs on shutdown: release the session first, then the cluster.
    @PreDestroy
    public void destroy() {
        if (session != null) {
            session.close();
        }
        if (cluster != null) {
            cluster.close();
        }
    }
}
BootCassandraDataShowApplication
Automatically generated ...
package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BootCassandraDataShowApplication {

    public static void main(String[] args) {
        SpringApplication.run(
                BootCassandraDataShowApplication.class, args);
    }
}
AppBeans
Bean for holding configured objects.
package com.example;

import com.datastax.driver.core.Session;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppBeans {

    @Bean
    public Session session() {
        return sessionManager().getSession();
    }

    @Bean
    public CassandraSessionManager sessionManager() {
        return new CassandraSessionManager();
    }
}
Web Controller
package com.example;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;

@Configuration
@Controller
public class WelcomeController {

    @Autowired
    Session session;

    @RequestMapping("/")
    public String welcome(Map<String, Object> model) {

        final ResultSet rows = session.execute("SELECT * FROM votes");

        ArrayList<Vote> results = new ArrayList<>();

        // Map every Cassandra row to a Vote object.
        for (Row row : rows.all()) {
            results.add(new Vote(
                    row.getString("name"),
                    row.getInt("votes")
            ));
        }

        // Sort descending by vote count, so the leader comes first.
        Collections.sort(results, (a, b) ->
                b.getVotes().compareTo(a.getVotes()));

        model.put("results", results);

        // Name of the Mustache template to render.
        return "welcome";
    }
}
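The descending sort in WelcomeController can be tried out without Cassandra or Spring. The following is a minimal sketch, assuming a reduced stand-in for the Vote class (the names VoteSortSketch and Tally are made up for this example). It uses Comparator.comparing(...).reversed() from Java 8, which orders the list the same way as the lambda in the controller.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class VoteSortSketch {

    // Hypothetical stand-in for Vote, reduced to what the sort needs.
    static final class Tally {
        final String name;
        final Integer votes;

        Tally(String name, Integer votes) {
            this.name = name;
            this.votes = votes;
        }

        Integer getVotes() {
            return votes;
        }
    }

    // Returns a new list ordered by vote count, highest first.
    // The input list is left untouched.
    static List<Tally> sortByVotesDescending(List<Tally> input) {
        List<Tally> sorted = new ArrayList<>(input);
        sorted.sort(Comparator.comparing(Tally::getVotes).reversed());
        return sorted;
    }

    public static void main(String[] args) {
        // Sample data invented for the example.
        List<Tally> sample = Arrays.asList(
                new Tally("first", 3),
                new Tally("second", 7),
                new Tally("third", 5));
        for (Tally t : sortByVotesDescending(sample)) {
            System.out.println(t.name + " " + t.votes);
        }
    }
}
```

Sorting a copy rather than the list in place is a choice made for this sketch only; the controller sorts its own freshly built list, so in-place sorting is fine there.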
Template to show the results
<!DOCTYPE html>
<html lang="en">
<body>

<h1>Voting results:</h1>

{{#results}}
<strong>{{this.name}}</strong> {{this.votes}}

{{/results}}

</body>
</html>
That's all folks
This app might not seem like much, but behind it there's a Kafka cluster that receives messages from a Spring Boot app exposing a REST interface. Messages coming in from Kafka are processed with Spark Streaming and then sent to Cassandra. Another Spring Boot app sorts the results and displays them to the users. This small tutorial covers most of the cool Java/big data technologies in use nowadays.'