Connecting...

Pexels Photo 460621

Stream Processing With Spring, Kafka, Spark and Cassandra - Part 5 by Marko Švaljek

Pexels Photo 460621

This is part 5 from the series of blogs from Marko Švaljek regarding Stream Processing With Spring, Kafka, Spark and Cassandra. If you missed part 3 and part 4 read it here.

 

'Part 5 - Displaying Cassandra Data With Spring Boot

Now that we have our voting data in Cassandra let's write a simple Spring Boot project that simply gathers all the data from cassandra sorts them and displays to user.

Setting up a project

  1. Project SDK: Java 8
  2. Initializr Service URL: https://start.spring.io
  3. Next
  4. Name: boot-cassandra-data-show
  5. Type: Gradle Project
  6. Packaging: Jar
  7. Java Version: 1.8
  8. Language: Java
  9. Group: com.example
  10. Artifact: boot-cassandra-data-show
  11. Vesion: 0.0.1-SNAPSHOT
  12. Description: Spring Boot Display Cassandra Data
  13. Package: com.example
  14. Next
  15. Spring Boot Version: 1.3
  16. Core - Web
  17. Template Engines - Mustache
  18. Next
  19. Project name: boot-cassandra-data-show
  20. The rest is just fine ...
  21. Finish
  22. After creating project check sdk setting, it should be java 8
 

Cassandra dependencies

1 compile('com.datastax.cassandra:cassandra-driver-core:2.1.9') 

 

Vote class

We'll use this class to map rows from cassandra.

1 package com.example;
2
3 import java.io.Serializable;
4
5 public class Vote implements Serializable {
6    private String name;
7    private Integer votes;
8
9    public Vote(String name, Integer votes) {
10        this.name = name;
11        this.votes = votes;
12    }
13
14    public Vote() {
15    }
16
17    public String getName() {
18        return name;
19    }
20
21    public void setName(String name) {
22        this.name = name;
23    }
24
25    public Integer getVotes() {
26        return votes;
27    }
28
29    public void setVotes(Integer votes) {
30        this.votes = votes;
31    }
32 }

 

application.properties

1 server.port = 8090
2 contactPoint = 127.0.0.1
3 keyspace = voting

 

CassandraSessionManager

This bean is used to setup connection towards Cassandra
 

1 package com.example;
2
3 import com.datastax.driver.core.Cluster;
4 import com.datastax.driver.core.Session;
5 import org.springframework.beans.factory.annotation.Value;
6 import org.springframework.context.annotation.Configuration;
7
8 import javax.annotation.PostConstruct;
9 import javax.annotation.PreDestroy;
10
11 @Configuration
12 public class CassandraSessionManager {
13
14    private Session session;
15    private Cluster cluster;
16
17    @Value("${contactPoint}")
18    private String contactPoint;
19
20    @Value("${keyspace}")
21    private String keyspace;
22
23    public CassandraSessionManager() {
24
25    }
26
27    public Session getSession() {
28        return session;
29    }
30
31    @PostConstruct
32    public void initIt() {
33        cluster = Cluster.builder().addContactPoint(
34            contactPoint).build();
35        session = cluster.connect(keyspace);
36    }
37
38    @PreDestroy
39    public void destroy() {
40        if (session != null) {
41            session.close();
42        }
43        if (cluster != null) {
44            cluster.close();
45        }
46    }
47}

 

BootCassandraDataShowApplication

Automatically generated ...

1 package com.example;
2
3 import org.springframework.boot.SpringApplication;
4 import org.springframework.boot.autoconfigure.SpringBootApplication;
5
6 @SpringBootApplication
7 public class BootCassandraDataShowApplication {
8
9    public static void main(String[] args) {
10        SpringApplication.run(
11        BootCassandraDataShowApplication.class, args);
12    }
13 }

 

AppBeans

Bean for holding configured objects.

 

1 package com.example;
2
3 import com.datastax.driver.core.Session;
4 import org.springframework.context.annotation.Bean;
5 import org.springframework.context.annotation.Configuration;
6
7 @Configuration
8 public class AppBeans {
9
10    @Bean
11    public Session session() {
12        return sessionManager().getSession();
13    }
14
15    @Bean
16    public CassandraSessionManager sessionManager() {
17        return new CassandraSessionManager();
18    }
19 }

 

Web Controller

1 package com.example;
2
3 import com.datastax.driver.core.ResultSet;
4 import com.datastax.driver.core.Row;
5 import com.datastax.driver.core.Session;
6 import org.springframework.beans.factory.annotation.Autowired;
7 import org.springframework.context.annotation.Configuration;
8 import org.springframework.stereotype.Controller;
9 import org.springframework.web.bind.annotation.RequestMapping;
10
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.Map;
14
15 @Configuration
16 @Controller
17 public class WelcomeController {
18
19    @Autowired
20    Session session;
21
22    @RequestMapping("/")
23    public String welcome(Map<String, Object> model) {
24
25        final ResultSet rows = session.execute("SELECT * FROM votes");
26
27        ArrayList<vote> results = new ArrayList<>();
28
29        for (Row row : rows.all()) {
30            results.add(new Vote(
31                row.getString("name"),
32                row.getInt("votes")
33            ));
34        }
35
36        Collections.sort(results, (a, b) ->
37        b.getVotes().compareTo(a.getVotes()));
38
39        model.put("results", results);
40
41        return "welcome";
42    }
43 }
44    </vote>

 

Template to show the results

1 <!DOCTYPE html>
2 <html lang="en">
3 <body>
4
5 <h1>Voting results:</h1>
6
7
8 {{#results}}
9    <strong>{{this.name}}</strong> {{this.votes}} 
10
11 {{/results}}
12
13 </body>
14 </html>

 

That's all folks

Now this app might not seem as a lot, but there's a kafka cluster that receives messages coming in from a spring boot app that exposes REST interface. Messages that come in from kafka are then processed with Spark Streaming and then sent to Cassandra. There is another Spring Boot app that sorts and displays results to the users. This small tutorial covers most of the cool java/big data technologies now-days.'

 

This article was written by Marko Švaljek and originally posted onto msvaljek.blogspot.com