
MapR-DB Spark Connector with Secondary Indexes by Nicolas A Perez


MapR-DB is a powerful tool!

How powerful? Find out how it interacts with the Spark Connector in this interesting article by software engineer Nicolas A Perez.


The MapR Data Platform offers significant advantages over other tools in the big data space. MapR-DB, one of the core components of the platform, offers state-of-the-art capabilities that surpass most of the NoSQL databases out there.

An important add-on to MapR-DB is the ability to write to it and query it with Apache Spark through the Connector for Apache Spark. This connector comes in very handy, since it can read and write between Spark and MapR-DB using the different Spark APIs such as RDDs, DataFrames, and Streams.

Using the connector we can issue queries like the following one.

val df: DataFrame = sparkSession.loadFromMapRDB("/tmp/user_profiles", someSchema)

The resulting type is a 'DataFrame' that we can use like any other DataFrame from any other source, as we normally do in Spark.

If we then filter our data set, problems start to emerge. For instance, let’s look at the following query.

val df = sparkSession.loadFromMapRDB("/tmp/user_profiles")

val filteredDF = df.filter("first_name = 'Bill'")

The filter is being pushed down, so MapR-DB does the filtering and only sends back the data that satisfies the filter, reducing the amount of data transferred between MapR-DB and Spark. However, if there is an index created on the field 'first_name', the index is ignored and the table is fully scanned to find the rows that satisfy the filter.

By creating an index on a field, we expect it to be used so that queries on that field are optimized, ultimately speeding up the computation. The provided connector simply does not use this capability.

 

Necessity

Our team, MapR Professional Services, knows that filtering with MapR-DB secondary indexes is huge for performance. Since many of our customers do try to take advantage of secondary indexes, we have taken different approaches to force the use of the indexes when using Spark.

A fellow coworker wrote a post, How to use secondary indexes in Spark with OJAI, where he explains some ways to overcome the issue at hand.

Even when we can take some shortcuts, we have to give up some of the nice constructs the default connector has, such as '.loadFromMapRDB(...)'.

 

An Independent Connector

In the past, I have extended Apache Spark in many ways. I have written my own custom data sources and, most recently, a custom streaming source for Spark Structured Streaming.

Once again, I have embarked on the adventure of writing my own Spark data source, but this time for MapR-DB, so we get the full advantage of secondary indexes while keeping the same API the current MapR-DB Connector for Apache Spark has.

By the end of this post, we will be able to write a query in the following way while fully using secondary indexes.

val schema = StructType(Seq(StructField("_id", StringType), StructField("uid", StringType)))

val data = sparkSession
  .loadFromMapRDB("/user/mapr/tables/data", schema)
  .filter("uid = '101'")
  .select("_id")

data.take(3).foreach(println)

Spark Data Sources Version 2

The following data source implementation targets Spark 2.3.1 and uses the Data Source API V2.

Let’s start by looking at the things we need.

  1. ReadSupportWithSchema allows us to create a DataSourceReader.
  2. DataSourceReader allows us to define the schema of our data and specify how to create a DataReaderFactory.
  3. SupportsPushDownFilters allows us to intercept the query filters so we can push them down to MapR-DB.
  4. SupportsPushDownRequiredColumns allows us to intercept the query projections so we can push them down to MapR-DB.

Let’s start by implementing ReadSupportWithSchema.

class Reader extends ReadSupportWithSchema {

  override def createReader(schema: StructType, options: DataSourceOptions): DataSourceReader = {

    val tablePath = options.get("path").get()

    new MapRDBDataSourceReader(schema, tablePath)
  }
}

As we can see, we simply get the table path and the schema we want to use when reading from MapR-DB. Then we pass them to MapRDBDataSourceReader.

 

MapRDBDataSourceReader

MapRDBDataSourceReader implements 'DataSourceReader', and we also mix in 'SupportsPushDownFilters' and 'SupportsPushDownRequiredColumns' to indicate that we want to push filters and projections down to MapR-DB.

class MapRDBDataSourceReader(schema: StructType, tablePath: String)
  extends DataSourceReader
    with SupportsPushDownFilters
    with SupportsPushDownRequiredColumns {

  private var projections: Option[StructType] = None

  override def readSchema(): StructType = ???

  override def pushFilters(filters: Array[Filter]): Array[Filter] = ???

  override def pushedFilters(): Array[Filter] = ???

  override def pruneColumns(requiredSchema: StructType): Unit = ???

  override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] = ???
}

The 'projections' variable will hold the schema we want to project, if any. In case we don’t explicitly project fields with a '.select', we will project all the fields in the 'schema' variable.

'readSchema' works in conjunction with 'projections' and 'pruneColumns'. If our Spark query specifies a 'select', the selected fields are passed to 'pruneColumns', and those are the only fields we bring back from MapR-DB.

class MapRDBDataSourceReader(schema: StructType, tablePath: String)
  extends DataSourceReader
    with SupportsPushDownFilters
    with SupportsPushDownRequiredColumns {

  private var projections: Option[StructType] = None

  override def readSchema(): StructType = projections match {
    case None                  => schema
    case Some(fieldsToProject) => fieldsToProject
  }

  override def pruneColumns(requiredSchema: StructType): Unit = projections = Some(requiredSchema)

  override def pushFilters(filters: Array[Filter]): Array[Filter] = ???

  override def pushedFilters(): Array[Filter] = ???

  override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] = ???
}
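The interplay between 'pruneColumns' and 'readSchema' is easy to exercise in isolation. Here is a small, self-contained sketch; it uses a simplified stand-in for Spark's 'StructType' (just a list of field names), since the logic itself is plain Scala.

```scala
// Simplified stand-in for Spark's StructType: just a list of field names.
case class SchemaLike(fieldNames: Seq[String])

val fullSchema = SchemaLike(Seq("_id", "first_name", "uid"))

// Same state and logic as in MapRDBDataSourceReader.
var projections: Option[SchemaLike] = None

def pruneColumns(requiredSchema: SchemaLike): Unit = projections = Some(requiredSchema)

def readSchema(): SchemaLike = projections.getOrElse(fullSchema)

// Before Spark pushes a projection down, the full schema is read.
val before = readSchema()

// After a .select("_id"), Spark calls pruneColumns with only the required columns.
pruneColumns(SchemaLike(Seq("_id")))
val after = readSchema()
// before == SchemaLike(Seq("_id", "first_name", "uid"))
// after  == SchemaLike(Seq("_id"))
```

'projections.getOrElse(fullSchema)' is equivalent to the pattern match in the real implementation: use the pruned schema if one was pushed down, otherwise fall back to the full one.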

'pushFilters' receives the filters we have specified in the 'where' or 'filter' clauses of our Spark query. We have to decide which of them we want to push down to MapR-DB; the remaining ones will be applied by Spark after the data is in memory.

class MapRDBDataSourceReader(schema: StructType, tablePath: String)
  extends DataSourceReader
    with SupportsPushDownFilters
    with SupportsPushDownRequiredColumns {

  private var projections: Option[StructType] = None

  override def readSchema(): StructType = projections match {
    case None                  => schema
    case Some(fieldsToProject) => fieldsToProject
  }

  override def pruneColumns(requiredSchema: StructType): Unit = projections = Some(requiredSchema)

  private var supportedFilters: List[Filter] = List.empty

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    val (supported, unsupported) = filters.partition {
      case EqualTo(_, _)     => true
      case GreaterThan(_, _) => true

      case _ => false
    }

    supportedFilters = supported.toList

    unsupported
  }

  override def pushedFilters(): Array[Filter] = supportedFilters.toArray

  override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] = ???
}

In the snippet above, we indicate that we are going to push down only two types of filters: 'EqualTo' and 'GreaterThan'. Any other filter will not be pushed down, and the corresponding filtering will happen in Spark's memory after the data is loaded from MapR-DB.

We are working on adding more filters to match the current MapR-DB Connector.
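The partitioning logic inside 'pushFilters' can be tested on its own. The sketch below uses simplified stand-ins for Spark's filter classes (the real ones live in 'org.apache.spark.sql.sources'), so it runs without a Spark dependency.

```scala
// Simplified stand-ins for org.apache.spark.sql.sources filter classes,
// used only so this sketch runs without Spark on the classpath.
sealed trait Filter
case class EqualTo(attribute: String, value: Any) extends Filter
case class GreaterThan(attribute: String, value: Any) extends Filter
case class IsNull(attribute: String) extends Filter

// Same rule as pushFilters: keep EqualTo and GreaterThan for pushdown,
// hand everything else back to Spark for in-memory filtering.
def splitFilters(filters: Array[Filter]): (List[Filter], Array[Filter]) = {
  val (supported, unsupported) = filters.partition {
    case EqualTo(_, _)     => true
    case GreaterThan(_, _) => true
    case _                 => false
  }
  (supported.toList, unsupported)
}

val (pushed, remaining) = splitFilters(Array(EqualTo("uid", "101"), IsNull("first_name")))
// pushed contains only EqualTo("uid", "101"); remaining contains IsNull("first_name")
```

Returning the unsupported filters is the contract of 'pushFilters': whatever the data source hands back, Spark evaluates itself after loading.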

'createDataReaderFactories' creates a list of data readers that do the heavy work of reading from our source, MapR-DB. In our case, we are creating only one data reader, but ideally we would have one reader per MapR-DB region/partition so we could take advantage of the parallelism MapR-DB offers.

class MapRDBDataSourceReader(schema: StructType, tablePath: String)
  extends DataSourceReader
    with SupportsPushDownFilters
    with SupportsPushDownRequiredColumns {

  import scala.collection.JavaConverters._

  private var projections: Option[StructType] = None

  override def readSchema(): StructType = projections match {
    case None                  => schema
    case Some(fieldsToProject) => fieldsToProject
  }

  override def pruneColumns(requiredSchema: StructType): Unit = projections = Some(requiredSchema)

  private var supportedFilters: List[Filter] = List.empty

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    val (supported, unsupported) = filters.partition {
      case EqualTo(_, _)     => true
      case GreaterThan(_, _) => true

      case _ => false
    }

    supportedFilters = supported.toList

    unsupported
  }

  override def pushedFilters(): Array[Filter] = supportedFilters.toArray

  // The API expects a java.util.List, so we convert the Scala List with .asJava.
  override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] =
    List[DataReaderFactory[Row]](new MapRDBDataReaderFactory(tablePath, supportedFilters, readSchema())).asJava
}

 

MapRDBDataReaderFactory

We are almost done, yet the most important part is about to come.

The 'MapRDBDataReaderFactory' is where we actually build the MapR-DB query and execute it against our MapR-DB table. Notice we are passing in the table we are going to read from and the filters and projections we want to push down.

class MapRDBDataReaderFactory(table: String, filters: List[Filter], schema: StructType)
  extends DataReaderFactory[Row] {

  override def createDataReader(): DataReader[Row] = ???
}

Now we need to connect to MapR-DB by opening a connection and creating a document store object.

class MapRDBDataReaderFactory(table: String, filters: List[Filter], schema: StructType)
  extends DataReaderFactory[Row] {

  import org.ojai.store._
  import scala.collection.JavaConverters._

  @transient private lazy val connection = DriverManager.getConnection("ojai:mapr:")

  @transient private lazy val store: DocumentStore = connection.getStore(table)

  override def createDataReader(): DataReader[Row] = ???
}

'createFilterCondition' builds the query condition we want to execute against MapR-DB. This is the most important part of our entire implementation.

class MapRDBDataReaderFactory(table: String, filters: List[Filter], schema: StructType)
  extends DataReaderFactory[Row] {

  import org.ojai.store._
  import scala.collection.JavaConverters._

  @transient private lazy val connection = DriverManager.getConnection("ojai:mapr:")

  @transient private lazy val store: DocumentStore = connection.getStore(table)

  override def createDataReader(): DataReader[Row] = ???

  private def createFilterCondition(filters: List[Filter]): QueryCondition = {
    filters.foldLeft(connection.newCondition().and()) { (condition, filter) =>
      filter match {
        case EqualTo(field, value: String)     => condition.is(field, QueryCondition.Op.EQUAL, value)
        case EqualTo(field, value: Int)        => condition.is(field, QueryCondition.Op.EQUAL, value)
        case GreaterThan(field, value: String) => condition.is(field, QueryCondition.Op.GREATER, value)
        case GreaterThan(field, value: Int)    => condition.is(field, QueryCondition.Op.GREATER, value)
      }
    }
      .close()
      .build()
  }
}

Here we combine all the filters. As we can see, we implement our two supported filters for only two data types, but we are working on extending this implementation to match the current MapR-DB Connector.
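The foldLeft-based composition can be illustrated with a toy condition builder. The real 'QueryCondition' comes from the OJAI API; this stand-in just accumulates clauses so the folding logic is visible and testable on its own.

```scala
// Simplified stand-ins for Spark's filter classes.
sealed trait Filter
case class EqualTo(field: String, value: Any) extends Filter
case class GreaterThan(field: String, value: Any) extends Filter

// Toy stand-in for OJAI's QueryCondition builder: each `is` call appends
// a clause; `build` joins them with AND, mirroring newCondition().and().
case class ToyCondition(clauses: List[String] = Nil) {
  def is(field: String, op: String, value: Any): ToyCondition =
    copy(clauses :+ s"($field $op $value)")
  def build: String = clauses.mkString(" AND ")
}

// Same shape as createFilterCondition: fold the filters into one condition.
def buildCondition(filters: List[Filter]): String =
  filters.foldLeft(ToyCondition()) { (condition, filter) =>
    filter match {
      case EqualTo(field, value)     => condition.is(field, "=", value)
      case GreaterThan(field, value) => condition.is(field, ">", value)
    }
  }.build

val condition = buildCondition(List(EqualTo("uid", "101"), GreaterThan("age", 21)))
// condition == "(uid = 101) AND (age > 21)"
```

The fold starts from an empty AND block and threads the growing condition through each filter, which is exactly why a single compound condition comes out at the end.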

'query' creates the final command to be sent to MapR-DB. It is simply a matter of applying the query condition and the projections to our 'connection' object.

class MapRDBDataReaderFactory(table: String, filters: List[Filter], schema: StructType)
  extends DataReaderFactory[Row] {

  import org.ojai.store._
  import scala.collection.JavaConverters._

  @transient private lazy val connection = DriverManager.getConnection("ojai:mapr:")

  @transient private lazy val store: DocumentStore = connection.getStore(table)

  override def createDataReader(): DataReader[Row] = ???

  private def createFilterCondition(filters: List[Filter]): QueryCondition = {
    filters.foldLeft(connection.newCondition().and()) { (condition, filter) =>
      filter match {
        case EqualTo(field, value: String)     => condition.is(field, QueryCondition.Op.EQUAL, value)
        case EqualTo(field, value: Int)        => condition.is(field, QueryCondition.Op.EQUAL, value)
        case GreaterThan(field, value: String) => condition.is(field, QueryCondition.Op.GREATER, value)
        case GreaterThan(field, value: Int)    => condition.is(field, QueryCondition.Op.GREATER, value)
      }
    }
      .close()
      .build()
  }

  private def query = {
    val condition = createFilterCondition(filters)

    val query = connection
      .newQuery()
      .select(schema.fields.map(_.name): _*)  // push projections down
      .where(condition)                       // push filters down
      .build()

    query
  }
}

It is very important to notice that, since we are using OJAI, it will automatically use any secondary indexes for the fields that are part of the filters we are applying. Make sure you check the output at the end of this post.

'documents' is a stream of data coming from MapR-DB based on 'query'.

class MapRDBDataReaderFactory(table: String, filters: List[Filter], schema: StructType)
  extends DataReaderFactory[Row] {

  import org.ojai.store._
  import scala.collection.JavaConverters._

  @transient private lazy val connection = DriverManager.getConnection("ojai:mapr:")

  @transient private lazy val store: DocumentStore = connection.getStore(table)

  override def createDataReader(): DataReader[Row] = ???

  private def createFilterCondition(filters: List[Filter]): QueryCondition = {
    filters.foldLeft(connection.newCondition().and()) { (condition, filter) =>
      filter match {
        case EqualTo(field, value: String)     => condition.is(field, QueryCondition.Op.EQUAL, value)
        case EqualTo(field, value: Int)        => condition.is(field, QueryCondition.Op.EQUAL, value)
        case GreaterThan(field, value: String) => condition.is(field, QueryCondition.Op.GREATER, value)
        case GreaterThan(field, value: Int)    => condition.is(field, QueryCondition.Op.GREATER, value)
      }
    }
      .close()
      .build()
  }

  private def query = {
    val condition = createFilterCondition(filters)

    val query = connection
      .newQuery()
      .select(schema.fields.map(_.name): _*)  // push projections down
      .where(condition)                       // push filters down
      .build()

    query
  }

  @transient private lazy val documents = {
    val queryResult = store.find(query)

    println(s"QUERY PLAN: ${queryResult.getQueryPlan}")

    queryResult.asScala.iterator
  }
}

'createDataReader' uses the stream we have created ('documents') to do the actual reading and return the data back to Spark.

class MapRDBDataReaderFactory(table: String, filters: List[Filter], schema: StructType)
  extends DataReaderFactory[Row] {

  import org.ojai.store._
  import scala.collection.JavaConverters._

  @transient private lazy val connection = DriverManager.getConnection("ojai:mapr:")

  @transient private lazy val store: DocumentStore = connection.getStore(table)

  override def createDataReader(): DataReader[Row] = new DataReader[Row] {

    override def next(): Boolean = documents.hasNext

    override def get(): Row = {
      val document = documents.next()
      val values = schema.fields
        .map(_.name)
        .foldLeft(List.empty[String])((xs, name) => document.getString(name) :: xs)
        .reverse

      Row.fromSeq(values)
    }

    override def close(): Unit = {
      store.close()
      connection.close()
    }
  }

  private def createFilterCondition(filters: List[Filter]): QueryCondition = {
    filters.foldLeft(connection.newCondition().and()) { (condition, filter) =>
      filter match {
        case EqualTo(field, value: String)     => condition.is(field, QueryCondition.Op.EQUAL, value)
        case EqualTo(field, value: Int)        => condition.is(field, QueryCondition.Op.EQUAL, value)
        case GreaterThan(field, value: String) => condition.is(field, QueryCondition.Op.GREATER, value)
        case GreaterThan(field, value: Int)    => condition.is(field, QueryCondition.Op.GREATER, value)
      }
    }
      .close()
      .build()
  }

  private def query = {
    val condition = createFilterCondition(filters)

    val query = connection
      .newQuery()
      .select(schema.fields.map(_.name): _*)  // push projections down
      .where(condition)                       // push filters down
      .build()

    query
  }

  @transient private lazy val documents = {
    val queryResult = store.find(query)

    println(s"QUERY PLAN: ${queryResult.getQueryPlan}")

    queryResult.asScala.iterator
  }
}
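The field extraction inside 'get()' can be checked in isolation. The sketch below uses a plain map as a stand-in for an OJAI 'Document' and reproduces the foldLeft-plus-reverse walk over the projected field names.

```scala
// Stand-in for an OJAI Document: a simple field-name -> value map.
type Document = Map[String, String]

// Extract the projected fields in schema order, exactly as get() does:
// prepend each value (cheap on a List) and reverse once at the end.
def rowValues(document: Document, fieldNames: Seq[String]): Seq[String] =
  fieldNames
    .foldLeft(List.empty[String])((xs, name) => document.getOrElse(name, null) :: xs)
    .reverse

val doc: Document = Map("_id" -> "user0001", "uid" -> "101")
val values = rowValues(doc, Seq("_id", "uid"))
// values == List("user0001", "101")
```

A straight 'fieldNames.map(document.getOrElse(_, null))' would be equivalent; the foldLeft/reverse form mirrors the connector code above.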

 

Using our Connector

At this point, we are ready to plug our custom data source into Spark in the following way.

sparkSession
  .read
  .format("com.github.anicolaspp.spark.sql.Reader")
  .schema(schema)
  .load(path)

This gives us our own read path for MapR-DB, so that any pushed-down filter on a field with a secondary index on the physical table will use that index to optimize the read.
 

Syntax

To maintain an API similar to the one offered by the default MapR-DB Connector, we added some syntax to our library in the following way.

object MapRDB {

  implicit class ExtendedSession(sparkSession: SparkSession) {

    def loadFromMapRDB(path: String, schema: StructType): DataFrame = {

      // maybe find table partition information here and send it down to the data source

      sparkSession
        .read
        .format("com.github.anicolaspp.spark.sql.Reader")
        .schema(schema)
        .load(path)
    }
  }
}
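The implicit-class enrichment used by 'ExtendedSession' is plain Scala. Here is a minimal, standalone example of the same pattern, bolting a new method onto a type we can test without Spark (the 'loadReversed' method is purely illustrative):

```scala
// Same enrichment pattern as ExtendedSession: an implicit class adds a
// method to an existing type without modifying it. Importing the object's
// members brings the extension method into scope.
object Syntax {
  implicit class ExtendedString(s: String) {
    def loadReversed: String = s.reverse
  }
}

import Syntax._

val result = "mapr".loadReversed
// result == "rpam"
```

Once 'import MapRDB._' is in scope, 'sparkSession.loadFromMapRDB(...)' resolves the same way: the compiler wraps the SparkSession in the implicit class and calls the added method.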

We can now use our connector in the same way we used the default connector.

val schema = StructType(Seq(StructField("_id", StringType), StructField("uid", StringType)))

val data = sparkSession
  .loadFromMapRDB("/user/mapr/tables/data", schema)
  .filter("uid = '101'")
  .select("_id")

data.take(3).foreach(println)

 

Using MapR-DB Secondary Indexes

When we run the code above, the TRACE output from OJAI looks similar to the following.

QUERY PLAN: {"QueryPlan":[
  [{
    "streamName":"DBDocumentStream",
    "parameters":{
      "queryConditionPath":false,
      "indexName":"uid_idx",
      "projectionPath":[
        "uid",
        "_id"
      ],
      "primaryTable":"/user/mapr/tables/data"
    }
  }
  ]
]}

Notice that it automatically uses the index called 'uid_idx', an index on the field 'uid', which is exactly the field used in the Spark filter.

 

Conclusions

MapR-DB is a powerful tool that runs as part of the MapR Data Platform. The Spark Connector offers an interesting way to interact with MapR-DB, since it allows us to use all Spark constructs at scale when working with this NoSQL system. However, sometimes the default connector falls short because it does not use the secondary index capabilities of MapR-DB when we need them the most.

Our implementation, on the other hand, mimics the Connector API and ensures that the implemented Spark data source uses MapR-DB secondary indexes, since it relies on pure OJAI queries that support secondary indexes out of the box.


Our library code can be found here: MapRDBConnector.


Disclaimer: This is an independent effort to improve querying MapR-DB. This library is not a substitute for the official Connector for Apache Spark offered by MapR as part of its distribution.

 

 

This article was written by Nicolas A Perez and posted originally on hackernoon.com