

Spark Custom Stream Sources by Nicolas A Perez


How can we extend Spark's Structured Streaming API so we can use our own streaming sources? Find the answer in this article by software data engineer Nicolas A Perez, where he tells us all about Spark!

 

'Apache Spark is one of the most versatile big data frameworks out there. Its ability to mix different kinds of workloads, its in-memory processing and its functional style make it attractive to anyone starting to write code in the data processing world.

One important aspect of Spark is that it has been built for extensibility. Writing new connectors for the RDD API or extending the DataFrame/Dataset API allows third parties to integrate with Spark with ease. Most people will use one of the built-in sources, such as Kafka for stream processing or JSON / CSV for file processing. However, there are times when we need more specific implementations, closer to our needs. For example, our company might use a proprietary database for which no connector exists. We can simply write one, as we explained in this previous post (Spark Data Source API. Extending Our Spark SQL Query Engine).

Starting with Spark 2.0, we can create sources from streams, which gave life to the Spark Structured Streaming API. As we could imagine, there are some built-in streaming sources, Kafka being one of them, alongside FileStreamSource, TextSocketSource, etc.

The new Structured Streaming API should be preferred over the old DStreams. However, the same problem as before arises again: how can we extend this new API so we can use our own streaming sources? The answer to this question is in this post.

 

Extensibility Points

Let’s start by reviewing the main components that we need to touch in order to create our own streaming source.

First, StreamSourceProvider tells Spark which Source to create as the stream reader, and which schema that source produces.

Second, DataSourceRegister lets us register our source with Spark under a short name (its shortName method), so it becomes available to stream processing.

Third, Source is the interface we implement to give our class the behavior of a streaming source: reporting the latest available offset and returning the data between two offsets.
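To make the division of labor between these pieces concrete, here is a deliberately simplified, Spark-free model of how a micro-batch engine drives a Source: ask for the latest offset, fetch the data between the last processed offset and that one, then commit. MiniSource, CountingSource and MiniEngine are hypothetical names, offsets are plain Longs, and the loop only approximates what Spark does internally; it is not Spark's actual code.

```scala
// A Spark-free model of the Source contract; all names here are hypothetical.
trait MiniSource {
  def getOffset: Option[Long]                               // latest available offset, None if no data yet
  def getBatch(start: Option[Long], end: Long): Seq[String] // records in (start, end]
  def commit(end: Long): Unit                               // offsets <= end will not be requested again
}

// A toy source whose record at offset i is simply s"record-$i".
class CountingSource(var available: Long) extends MiniSource {
  var committed: Long = -1L

  def getOffset: Option[Long] = if (available < 0) None else Some(available)

  def getBatch(start: Option[Long], end: Long): Seq[String] = {
    val from = start.getOrElse(-1L) + 1 // first offset after start (0 when start is None)
    (from to end).map(i => s"record-$i")
  }

  def commit(end: Long): Unit = { committed = end }
}

object MiniEngine {
  // Runs one trigger: returns the processed batch and the new "last processed" offset.
  def trigger(source: MiniSource, lastProcessed: Option[Long]): (Seq[String], Option[Long]) =
    source.getOffset match {
      case Some(latest) if lastProcessed.forall(_ < latest) =>
        val batch = source.getBatch(lastProcessed, latest)
        // Simplification: Spark commits only after the batch has been processed.
        source.commit(latest)
        (batch, Some(latest))
      case _ =>
        (Seq.empty, lastProcessed) // nothing new: skip this trigger
    }
}
```

A trigger that finds no new offset simply does nothing, which is why returning None (or an unchanged offset) from getOffset is enough to make the engine wait.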

 

Our Streaming Source

For the sake of this post, we will implement a rather simple streaming source, but the same concepts apply to any streaming source you need to implement on your own.

Our streaming source is called InMemoryRandomStrings. It generates a sequence of random strings and their lengths, which are exposed as a DataFrame with the columns value and ts (ts holds the length of each string).

To keep it simple, we store the generated data in memory and discard it once Spark commits it. InMemoryRandomStrings is not fault tolerant, since its data is generated at processing time, in contrast to the built-in Kafka source, where the data actually lives in a Kafka cluster.

We can start by defining our StreamSourceProvider, which defines how our Source is created.

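import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Source
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types.StructType

class DefaultSource extends StreamSourceProvider with DataSourceRegister {

  // Tells Spark the name and schema of the stream before it starts.
  override def sourceSchema(sqlContext: SQLContext,
                            schema: Option[StructType],
                            providerName: String,
                            parameters: Map[String, String]): (String, StructType) = {

    (shortName(), InMemoryRandomStrings.schema)
  }

  // Called by Spark to create the Source that feeds the query.
  override def createSource(sqlContext: SQLContext,
                            metadataPath: String,
                            schema: Option[StructType],
                            providerName: String,
                            parameters: Map[String, String]): Source = {

    new InMemoryRandomStrings(sqlContext)
  }

  // Alias exposed through DataSourceRegister.
  override def shortName(): String = "InMemoryRandomStrings"
}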

The class DefaultSource is our StreamSourceProvider, and we need to implement its two required functions, sourceSchema and createSource. Since it also mixes in DataSourceRegister, it overrides shortName as well.

InMemoryRandomStrings.schema is the fixed schema we use for this example, but a provider could also honor a schema passed in dynamically through the schema parameter.

The createSource function then returns an instance of InMemoryRandomStrings that is our actual Source.

 

InMemoryRandomStrings

Now let’s go through the InMemoryRandomStrings code in parts so we can focus on the details.

class InMemoryRandomStrings(sqlContext: SQLContext) extends Source {

  override def schema: StructType = InMemoryRandomStrings.schema

  override def getOffset: Option[Offset] = ???

  override def getBatch(start: Option[Offset], end: Offset): DataFrame = ???

  override def commit(end: Offset): Unit = ???

  override def stop(): Unit = ???
}

object InMemoryRandomStrings {

  lazy val schema = StructType(List(StructField("value", StringType), StructField("ts", LongType)))

}

'schema' returns the schema our source uses; in our case, the schema is fixed.

'getOffset' should return the latest offset available in our source, or None if no data is available yet.

class InMemoryRandomStrings(sqlContext: SQLContext) extends Source {
  private var offset: LongOffset = LongOffset(-1)

  override def schema: StructType = InMemoryRandomStrings.schema

  override def getOffset: Option[Offset] = this.synchronized {
    println(s"getOffset: $offset")

    if (offset.offset == -1) None else Some(offset)
  }

  override def getBatch(start: Option[Offset], end: Offset): DataFrame = ???

  override def commit(end: Offset): Unit = ???

  override def stop(): Unit = ???
}

object InMemoryRandomStrings {

  lazy val schema = StructType(List(StructField("value", StringType), StructField("ts", LongType)))

}

Notice that we added a variable called 'offset' that keeps track of the data generated so far. We return 'None' if our source has not produced any data yet, and 'Some(offset)' otherwise.

Now let’s see how our source produces data. We use a background thread for it; notice the dataGeneratorStartingThread function.
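The -1 sentinel and its translation into an Option can be checked on their own. This minimal sketch assumes a plain Long in place of Spark's LongOffset; OffsetTracker is a hypothetical name used only for illustration.

```scala
// Hypothetical stand-in for the offset bookkeeping in InMemoryRandomStrings,
// using a plain Long instead of Spark's LongOffset.
class OffsetTracker {
  private var offset: Long = -1L // -1 means "no data generated yet"

  // Called by the producer for every new record; returns that record's offset.
  def next(): Long = synchronized { offset += 1; offset }

  // Mirrors getOffset: None until the first record exists, then the latest offset.
  def latest: Option[Long] = synchronized { if (offset == -1L) None else Some(offset) }
}
```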

class InMemoryRandomStrings(sqlContext: SQLContext) extends Source {
  private var offset: LongOffset = LongOffset(-1)

  private var batches = collection.mutable.ListBuffer.empty[(String, Long)]

  private val incrementalThread = dataGeneratorStartingThread()

  override def schema: StructType = InMemoryRandomStrings.schema

  override def getOffset: Option[Offset] = this.synchronized {
    println(s"getOffset: $offset")

    if (offset.offset == -1) None else Some(offset)
  }

  override def getBatch(start: Option[Offset], end: Offset): DataFrame = ???

  override def commit(end: Offset): Unit = ???

  // Interrupt rather than the deprecated Thread.stop(); the generator exits on interruption.
  override def stop(): Unit = incrementalThread.interrupt()

  private def dataGeneratorStartingThread() = {
    val t = new Thread("increment") {
      setDaemon(true)
      override def run(): Unit = {
        try {
          while (true) {
            try {
              // A bare `this` here would be the Thread, so lock the source instance
              // explicitly: it is the same lock getOffset, getBatch and commit use.
              InMemoryRandomStrings.this.synchronized {
                offset = offset + 1

                val value = Random.nextString(Random.nextInt(5))

                batches.append((value, offset.offset))
              }
            } catch {
              case e: Exception => println(e)
            }

            Thread.sleep(100)
          }
        } catch {
          case _: InterruptedException => () // stop() was called: exit quietly
        }
      }
    }

    t.start()

    t
  }
}

object InMemoryRandomStrings {

  lazy val schema = StructType(List(StructField("value", StringType), StructField("ts", LongType)))

}

Here we have added a thread that, every 100 milliseconds, increments the offset and stores a new random value together with that offset in an internal buffer. The thread starts running as soon as our source is created, and the 'stop' function stops it.
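One subtlety in this kind of generator: inside an anonymous 'new Thread(...) { ... }' body, a bare 'this' refers to the Thread, not to the enclosing source, so 'this.synchronized' there would not share a lock with getOffset, getBatch and commit. The Spark-free sketch below (RandomStringGenerator is a hypothetical class) locks the enclosing instance explicitly and stops the thread with 'interrupt()' instead of the deprecated 'Thread.stop()'.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.Random

// Spark-free sketch of the background producer; RandomStringGenerator is hypothetical.
class RandomStringGenerator(intervalMs: Long) {
  private var offset: Long = -1L
  private val buffer = ListBuffer.empty[(String, Long)]

  private val thread = new Thread("increment") {
    setDaemon(true)
    override def run(): Unit =
      try {
        while (!isInterrupted()) {
          // A bare `this` here would be the Thread; lock the enclosing generator instead.
          RandomStringGenerator.this.synchronized {
            offset += 1
            buffer.append((Random.nextString(Random.nextInt(5)), offset))
          }
          Thread.sleep(intervalMs)
        }
      } catch {
        case _: InterruptedException => () // stop() was called while sleeping
      }
  }
  thread.start()

  // Consistent copy of the buffer, taken under the same lock the producer uses.
  def snapshot: List[(String, Long)] = synchronized(buffer.toList)

  // Ask the thread to finish and wait until it has.
  def stop(): Unit = { thread.interrupt(); thread.join() }
}
```

Because every read goes through the same monitor as the writes, a snapshot never sees an offset without its record, and offsets stay contiguous from 0.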

At this point, we are only two functions away from our goal.

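'getBatch' returns to Spark a DataFrame containing the data in the given offset range. Spark defines this range as (start, end]: every record after start, up to and including end; when start is None, the batch begins with the first record.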

  override def getBatch(start: Option[Offset], end: Offset): DataFrame = this.synchronized {

    // Spark asks for the records in (start, end]: first index after start, up to end inclusive.
    val s = start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset + 1
    val e = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset

    println(s"generating batch range $start ; $end")

    val data = batches
      .par
      .filter { case (_, idx) => idx >= s && idx <= e }
      .map { case (v, _) => (v, v.length) }
      .seq

    val rdd = sqlContext
      .sparkContext
      .parallelize(data)
      .map { case (v, l) => InternalRow(UTF8String.fromString(v), l.toLong) }

    // internalCreateDataFrame is private[sql] in Spark 2.x, so this call compiles only from a
    // package under org.apache.spark.sql; isStreaming = true marks the result as a streaming DataFrame.
    sqlContext.sparkSession.internalCreateDataFrame(rdd, schema, isStreaming = true)
  }

We select from our internal buffer the records whose indexes fall in that range. From there, we build the DataFrame that we send back to Spark, marking it as a streaming DataFrame with isStreaming = true.

Finally, 'commit' is how Spark tells us that it will not request offsets less than or equal to the one being passed. In other words, we can remove from our internal buffer all data with an offset less than or equal to the one passed to 'commit'. This way, we save memory and avoid running out of it.
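Because Spark asks for (start, end], consecutive batches should neither overlap nor skip records. The plain-Scala helper below (BatchRange.selectRange is a hypothetical name, and the buffer is a plain Seq of (value, offset) pairs rather than the source's ListBuffer) applies exactly that rule, which makes the boundary easy to test in isolation.

```scala
// Hypothetical helper: select the buffered records that belong to the batch (start, end].
object BatchRange {
  def selectRange(buffer: Seq[(String, Long)], start: Option[Long], end: Long): Seq[(String, Long)] = {
    val from = start.getOrElse(-1L) + 1 // first offset after start (0 when start is None)
    buffer.filter { case (_, idx) => idx >= from && idx <= end } // end is inclusive
  }
}
```

Using end + 1 as an inclusive upper bound instead would put the record at offset end + 1 into two consecutive batches.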

  override def commit(end: Offset): Unit = this.synchronized {

    val committed = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset

    val toKeep = batches.filter { case (_, idx) => idx > committed }

    println(s"after clean size ${toKeep.length}")

    println(s"deleted: ${batches.size - toKeep.size}")

    batches = toKeep
  }
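The pruning rule is also easy to exercise outside Spark. PrunedBuffer is a hypothetical class that applies the same "keep only offsets greater than the committed one" filter as commit and reports how many rows it deleted, showing that the buffer stays bounded as long as Spark keeps committing.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical in-memory store illustrating why commit keeps memory bounded.
class PrunedBuffer {
  private var rows = ListBuffer.empty[(String, Long)]

  def append(value: String, idx: Long): Unit = rows.append((value, idx))

  // Same rule as the source's commit: drop everything at or below the committed offset.
  // Returns the number of deleted rows.
  def commit(committed: Long): Int = {
    val before = rows.size
    rows = rows.filter { case (_, idx) => idx > committed }
    before - rows.size
  }

  def size: Int = rows.size
}
```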

Our source is now complete. Here is the entire code.

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

import scala.util.Random

class InMemoryRandomStrings(sqlContext: SQLContext) extends Source {
  // Offset of the latest generated record; -1 means no data yet.
  private var offset: LongOffset = LongOffset(-1)

  // Generated (value, offset) pairs that Spark has not committed yet.
  private var batches = collection.mutable.ListBuffer.empty[(String, Long)]

  private val incrementalThread = dataGeneratorStartingThread()

  override def schema: StructType = InMemoryRandomStrings.schema

  override def getOffset: Option[Offset] = this.synchronized {
    println(s"getOffset: $offset")

    if (offset.offset == -1) None else Some(offset)
  }

  override def getBatch(start: Option[Offset], end: Offset): DataFrame = this.synchronized {

    // Spark asks for the records in (start, end]: first index after start, up to end inclusive.
    val s = start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset + 1
    val e = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset

    println(s"generating batch range $start ; $end")

    val data = batches
      .par
      .filter { case (_, idx) => idx >= s && idx <= e }
      .map { case (v, _) => (v, v.length) }
      .seq

    val rdd = sqlContext
      .sparkContext
      .parallelize(data)
      .map { case (v, l) => InternalRow(UTF8String.fromString(v), l.toLong) }

    // internalCreateDataFrame is private[sql] in Spark 2.x, so this call compiles only from a
    // package under org.apache.spark.sql; isStreaming = true marks the result as a streaming DataFrame.
    sqlContext.sparkSession.internalCreateDataFrame(rdd, schema, isStreaming = true)
  }

  override def commit(end: Offset): Unit = this.synchronized {

    // Spark will not ask again for offsets <= committed, so they can be dropped.
    val committed = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset

    val toKeep = batches.filter { case (_, idx) => idx > committed }

    println(s"after clean size ${toKeep.length}")

    println(s"deleted: ${batches.size - toKeep.size}")

    batches = toKeep
  }

  // Interrupt rather than the deprecated Thread.stop(); the generator exits on interruption.
  override def stop(): Unit = incrementalThread.interrupt()

  private def dataGeneratorStartingThread() = {
    val t = new Thread("increment") {
      setDaemon(true)
      override def run(): Unit = {
        try {
          while (true) {
            try {
              // A bare `this` here would be the Thread, so lock the source instance
              // explicitly: it is the same lock getOffset, getBatch and commit use.
              InMemoryRandomStrings.this.synchronized {
                offset = offset + 1

                val value = Random.nextString(Random.nextInt(5))

                batches.append((value, offset.offset))
              }
            } catch {
              case e: Exception => println(e)
            }

            Thread.sleep(100)
          }
        } catch {
          case _: InterruptedException => () // stop() was called: exit quietly
        }
      }
    }

    t.start()

    t
  }
}

object InMemoryRandomStrings {
  lazy val schema = StructType(List(StructField("value", StringType), StructField("ts", LongType)))
}

 

Using Our Custom Source

Now we need to plug our source into the Spark Structured Streaming API by specifying the correct format.

val r = sparkSession
  .readStream
  .format("com.github.anicolaspp.spark.sql.streaming.DefaultSource")
  .load()

Here we use the regular '.readStream' API and specify that the stream format is our implementation of 'StreamSourceProvider', that is, com.github.anicolaspp.spark.sql.streaming.DefaultSource.

Now we can query our streaming source as any other DataFrame.

import org.apache.spark.sql.streaming.OutputMode

r.createTempView("w")

sparkSession
  .sql("select ts, count(*) as c from w group by ts order by ts, c desc")
  .writeStream
  .format("console")
  .outputMode(OutputMode.Complete())
  .start()
  .awaitTermination()

The output will look similar to this.

-------------------------------------------
Batch: 3
-------------------------------------------
+---+---+
| ts|  c|
+---+---+
|  0| 81|
|  1| 78|
|  2| 74|
|  3| 82|
|  4| 80|
+---+---+

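What we see is a continuous aggregation of the data generated by our source: since ts holds the length of each random string, the query counts how many strings of each length (0 to 4) have been generated so far.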
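In Complete output mode, each trigger prints the whole result table, recomputed over all data received so far, rather than only the rows of the newest batch, so each printed table reflects every string generated up to that trigger. The plain-Scala sketch below imitates the query above by folding each micro-batch's ts values into running counts. CompleteModeDemo is a hypothetical name, and the batches in the example are made-up inputs, not the data behind the output shown above.

```scala
// Plain-Scala imitation of "select ts, count(*) as c from w group by ts" in Complete mode.
object CompleteModeDemo {
  // Folds one micro-batch of ts values into the running counts kept across batches.
  def update(counts: Map[Long, Long], batch: Seq[Long]): Map[Long, Long] =
    batch.foldLeft(counts) { (acc, ts) => acc.updated(ts, acc.getOrElse(ts, 0L) + 1L) }

  // The full result table printed after each trigger, ordered by ts as in the query.
  def table(counts: Map[Long, Long]): Seq[(Long, Long)] = counts.toSeq.sortBy(_._1)
}
```

For example, folding the batches Seq(0L, 1L, 1L) and then Seq(1L, 4L) yields the tables (0,1), (1,2) and then (0,1), (1,3), (4,1): the second table still contains ts = 0 even though that batch had no such row.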

 

Conclusions

Apache Spark is the way to go when processing data at scale. Its features outperform those of almost any other tool out there. Also, it can be extended in many different ways and, as we have seen, we can write our own data sources and streaming sources so they can be plugged into our Spark code with ease.

The entire project and source code can be found here: SparkStreamSources.

Happy Coding.'

 

This article was written by Nicolas A Perez and originally published on Medium