Hello Batch Processing with Apache Flink by Marko Švaljek

It's Saturday! Today we have a post from Marko Švaljek, 'Hello Batch Processing with Apache Flink', in which he discusses his motivation and how he got started. We hope you enjoy reading.

 

'Just recently I had a chance to visit the Big Data, Berlin v 10.0 meetup. I really liked a very inspiring talk from Stephan Ewen about Apache Flink. I'm aware that the technology has been around for some time now and that it's used by some very big players.

This technology is mostly about streaming, but what I liked is that it doesn't perform that badly in batch operations either. There are better technologies for batch, but since I really liked the talk (and one slide was about Flink not totally sucking in batch), I decided to write a blog post on Apache Flink in a simple case of batch processing. Just as a heads up: this technology is super new to me and I have just started to get my feet wet, but it might be useful to people who want to try it out.

 

Motivation

For the past 2-3 years I have been involved in a very cool IoT project that was mostly about vehicle telemetry data. Since the beginning of the year I have been gathering telemetry data about my own car drives. My approach to the problem was simple and totally uncool. I just wanted to have some telemetry data, so I gathered it by using a simple Android app, GPSLogger.

There was no online processing, just CSV files that I sent to my Google Drive after I ended the drives. I didn't know what I was going to do with the data. Then, during the talk mentioned in the introduction, it hit me that I might do a heat map of all the places where my car was not moving. The idea was kind of semi cool, but it's worth a shot. Just as a side note, I won't release the data set to the public because it's private data. But I guess data about my car not moving might identify possible bottlenecks in Croatian and/or Zagreb traffic.

 

Getting started

To kick-start my Hello Apache Flink project I used giter8. You can find out a lot more about it online, and it's pretty much a topic of its own. I'll just stick with the commands to get you through this blog post.
$ brew install giter8
$ g8 tillrohrmann/flink-project

You can fill in your own data or use something along the lines of:

A Flink Application Project Using sbt

name [Flink Project]: testinglocations
organization [org.example]: com.msvaljek
version [0.1-SNAPSHOT]:
scala_version [2.11.7]:
flink_version [1.1.4]:

Template applied in ./testinglocations

I guess I'm opinionated when it comes to IDEs: I rely heavily on IntelliJ IDEA. Importing the project is very simple:

File -> New -> Project from Existing Sources...

sbt project ...

Navigate the sources a bit and you will find very interesting examples. WordCount is one of them; it's the usual hello world in the Big Data ecosystem. Try to run it and you will get an exception. After you see the exception, go to the run configuration that IntelliJ created and modify it so that you can try out the WordCount example. You have to do the following:

Run -> Edit Configurations...

and then choose mainRunner from the "Use classpath of module" dropdown of your current configuration.

You will need to repeat this setup step for the class we are going to create.
 

Filtering out locations with Apache Flink

The code that I used is pretty straightforward: just create a FindAndGroupStandingLocations object and run it against the previously mentioned files:

package com.msvaljek
 
import java.io.File
 
import org.apache.flink.api.scala._
 
object FindAndGroupStandingLocations {
 
  def main(args: Array[String]): Unit = {
 
    val dir = "/examples/scala/flink/GPSLogger/"
 
    val env = ExecutionEnvironment.getExecutionEnvironment
 
    val files = getListOfFiles(
     new File(dir), List("csv")).map(_.getAbsolutePath)
 
    // sorry, the line is a bit long, but I need it :)
    getAllStandingLocations(env, files)
      .map(loc => 
           s"new ol.Feature({geometry: new ol.geom.Point(ol.proj.fromLonLat([${loc.lon}, ${loc.lat}]))}),")
      .print()
 
    // you could also map it to some simple json
    // s"{lat: ${loc.lat}, lon: ${loc.lon}},"
  }
 
  // some small snippet to help us load all the csv files
  def getListOfFiles(
   dir: File, extensions: List[String]): List[File] = {
   
    dir.listFiles.filter(_.isFile).toList.filter { file =>
      extensions.exists(file.getName.endsWith(_))
    }
  }
 
  def getAllStandingLocations(env: ExecutionEnvironment,
        files: List[String]): DataSet[Location] = {
   
    def traverseStandingLocations(
     fileList: List[String], locations: DataSet[Location]
    ): DataSet[Location] = fileList match {
        case h :: t =>
          locations.union(
            traverseStandingLocations(
             t, getStandingLocations(env, h)))
        case Nil =>
          locations
      }
 
    traverseStandingLocations(
     files, env.fromElements[Location]())
  }
 
  def getStandingLocations(
   env: ExecutionEnvironment, filePath: String
  ): DataSet[Location] = {
    //csv file format
    //_1=time; _2=lat; _3=lon; _4=elevation; _5=accuracy; 
    //_6=bearing; _7=speed; _8=satellites; _9=provider
 
    env.readCsvFile[Tuple9[String, String, String, String, String, String, String, String, String]](
      filePath = filePath,
      fieldDelimiter = ";",
      ignoreFirstLine = true
    )
    // just locations where we don't move
    .filter(line => strToDouble(line._7) == 0)
    // just coordinates
    .map(line => 
         Location(strToDouble(line._2),
                  strToDouble(line._3)))
  }
 
  // I had files in European csv format,
  // meaning doubles use , as the decimal separator,
  // so out of the box parsing didn't work
  def strToDouble(a: String): Double = 
    a.replace(',', '.').toDouble
 
  // nothing too fancy, just interested in coordinates
  case class Location(lat: Double, lon: Double)
}
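
One detail worth knowing about getListOfFiles: java.io.File.listFiles returns null when the path is not a directory or cannot be read, so the helper above throws a NullPointerException if dir is wrong. A minimal null-safe sketch follows; the object and method names are made up for illustration, and matching on "." + extension (so that a file named "notcsv" is skipped) is a small variation of mine, not what the original code does:

```scala
import java.io.File

// Hypothetical variant of getListOfFiles; names are illustrative only.
object FileListing {

  // Returns the regular files in `dir` whose names end with one of the
  // given extensions. A missing or unreadable directory yields Nil,
  // because Option(null) is None.
  def listFilesWithExtensions(dir: File, extensions: List[String]): List[File] =
    Option(dir.listFiles)
      .map(_.toList)
      .getOrElse(Nil)
      .filter { file =>
        file.isFile && extensions.exists(ext => file.getName.endsWith("." + ext))
      }
}
```

With this variant, a typo in the directory path simply produces an empty list of files (and so an empty result) rather than a crash.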

Here is a small snippet of what a CSV file looks like:

time;lat;lon;elevation;accuracy;bearing;speed;satellites;provider
2016-04-07T04:09:43Z;46,156319;15,87539;253;10;61,900002;0,374433;13;gps
2016-04-07T04:10:50Z;46,156273;15,875374;223;10;132,800003;2,146719;17;gps
2016-04-07T04:10:57Z;46,156045;15,875469;226;5;120,099998;3,563846;9;gps
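
To make the per-row logic easier to follow outside of Flink, here is a plain Scala sketch of what the filter and map steps do to a single line: split on ";", treat column 7 as speed, and keep the coordinates only when the speed is exactly zero. The object name and the standingLocation helper are hypothetical. Unlike the Flink job, which skips the header via ignoreFirstLine, the sketch just returns None for any row it cannot parse:

```scala
import scala.util.Try

// Illustrative only: mirrors the filter/map logic of the Flink job
// on a single CSV line, without any Flink dependency.
object CsvLineSketch {

  case class Location(lat: Double, lon: Double)

  // Same conversion as in the post: decimal comma to decimal point.
  def strToDouble(a: String): Double = a.replace(',', '.').toDouble

  // Some(location) when the row has 9 fields and speed == 0,
  // None for moving rows, the header, or malformed input.
  def standingLocation(line: String): Option[Location] = {
    val fields = line.split(";")
    if (fields.length != 9) None
    else Try {
      val speed = strToDouble(fields(6))
      if (speed == 0) Some(Location(strToDouble(fields(1)), strToDouble(fields(2))))
      else None
    }.toOption.flatten
  }
}
```

Note that none of the three sample rows above would pass the filter, since their speeds are all greater than zero; only rows with a speed of exactly 0 count as standing.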

 

The Result

Here is the result, standing points shown on a map:

 
 

Observations

In the beginning I tried to follow the hello world Apache Flink examples, but they kind of didn't work. I guess it's due to the very fast pace of project development. Some things mentioned in the official docs are not working at all:

curl https://raw.githubusercontent.com/apache/incubator-flink/master/flink-quickstart/quickstart.sh | bash

The template I used to kick-start the project contains references in the generated code to pages like http://flink.apache.org/docs/latest/examples.html, but the pages those links point to are not there (as you can see if you click the previous link).

I had trouble parsing the European CSV format, so I had to write my own parsing method for floating point numbers and read everything from the file as a string.
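
As an alternative to replacing the comma by hand, the JDK can parse decimal-comma numbers through java.text.NumberFormat with a locale that uses that convention. This is only a sketch: the object and method names are made up, and Locale.GERMANY is an assumption chosen because it uses "," as the decimal separator, not something the original data specifies. NumberFormat.parse is also lenient (it parses a numeric prefix and ignores trailing text), so it is not a strict validator:

```scala
import java.text.NumberFormat
import java.util.Locale
import scala.util.Try

// Hypothetical helper, not part of the original project.
object EuropeanNumbers {

  // Parses strings such as "46,156319" into a Double.
  // Returns None when the text does not start with a number.
  // A new NumberFormat is created per call because instances
  // are not thread-safe.
  def parseDecimalComma(text: String): Option[Double] =
    Try(NumberFormat.getInstance(Locale.GERMANY).parse(text.trim).doubleValue()).toOption
}
```

Returning an Option instead of throwing makes it easy to drop unparsable rows in a filter or flatMap step.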

I couldn't persuade Flink to output the results to a file, so I just used print and copy-pasted everything from the console output into an OpenLayers map.
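
One possible explanation, offered as an assumption rather than a diagnosis of what happened here: in the DataSet API, print() triggers execution by itself, while file sinks such as writeAsText are lazy and only run once env.execute() is called. A minimal sketch, assuming Flink 1.1.x with the Scala API on the classpath; the object name, the output path and the job name are placeholders, and the two coordinates are taken from the CSV sample shown earlier:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

// Illustrative sketch only; names and the output path are placeholders.
object WriteLocationsSketch {

  case class Location(lat: Double, lon: Double)

  // Placeholder path, adjust to your environment.
  val outputPath = "/tmp/standing-locations.txt"

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    env
      .fromElements(Location(46.156319, 15.87539), Location(46.156273, 15.875374))
      .map(loc => s"{lat: ${loc.lat}, lon: ${loc.lon}},")
      // Lazy sink: nothing is written until env.execute() runs.
      .writeAsText(outputPath, WriteMode.OVERWRITE)
      // Parallelism 1 on the sink so the result is a single file
      // instead of a directory of part files.
      .setParallelism(1)

    // Unlike print(), a file sink needs an explicit execute().
    env.execute("write standing locations")
  }
}
```

The same two steps (a sink plus env.execute()) should slot in where the post calls print(), but I have not verified that against the original project.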

I guess I could do pull requests to resolve some of this stuff, but the path of least resistance is just mentioning it here. So I'll take it.'

 

This article was written by Marko Švaljek and originally posted on msvaljek.blogspot.com