Connecting...

W1siziisimnvbxbpbgvkx3rozw1lx2fzc2v0cy9zawduawz5lxrly2hub2xvz3kvanbnl2jhbm5lci1kzwzhdwx0lmpwzyjdxq

MapR-DB Spark Connector Performance Tests by Nicolas A Perez

W1siziisijiwmtkvmdmvmjevmtevmdavmzuvndczl2fkdmvudhvyzs1jaxr5lwnvdw50cnktmjqwodm0lmpwzyjdlfsiccisinrodw1iiiwiotawedkwmfx1mdazzsjdxq

You know that there can be some issues with the official MapR-DB Connector for Apache Spark, but how can they be overcome?

Software Engineer, Nicolas A Perez tells us how creating a MapR-DB table can show us how two pieces of software compare to each other performance wise.

 

'In a previous post, we have described some of the issues with the official MapR-DB Connector for Apache Spark. We have also introduced an alternative way to overcome the known issues by implementing our own connector.

We have pushed our new connector out to Maven Central and it can be downloaded and used with ease.

<dependency>
  <groupId>com.github.anicolaspp</groupId>
  <artifactId>maprdbconnector_2.11</artifactId>
  <version>1.0.2</version>
</dependency>
libraryDependencies += "com.github.anicolaspp" % "maprdbconnector_2.11" % "1.0.2"

The corresponding GitHub repository is here and we accept improvements and Pull Request so we can take advantage of all the features MapR-DB offers.

One of the questions we had during the implementation of our connector was related to how fast our connector is compared to the official connector.

In order to see how these two pieces of software compare to each other performance wise, we have created a MapR-DB table and set it in the way described below. Remember that the main idea of our connector is to gain performance by using the secondary indexes defined in the MapR-DB table, a feature not being used by the official connector nowadays.

 

Set up

MapR-DB tables with a path '/user/mapr/tables/from_parquet' and with two 'string' fields:
  • '_id'
  • 'payload'

The table has '100000000' rows, that is, '100M' records and there is an index called 'payload_idx' indexing the field 'payload'.

The average 'payload' size can be calculated as follows.

val df = spark.loadFromMapRDB("/user/mapr/tables/from_parquet")

df.agg(mean(length(col("payload")))).show

+--------------------+                                                          
|avg(length(payload))|
+--------------------+
|  264.84391354843916|
+--------------------+

The table information shows that the table occupies '32.11 GB' of logical size and '28.79 GB' of space in disk.

maprcli table info -path /user/mapr/tables/from_parquet -json

{
    "timestamp":1551951201329,
    "timeofday":"2019-03-07 01:33:21.329 GMT-0800 AM",
    "status":"OK",
    "total":10,
    "data":[
        {
            "path":"/user/mapr/tables/from_parquet",
            "numregions":10,
            "totallogicalsize":34481758208,
            "totalphysicalsize":30907727872,
            "totalcopypendingsize":0,
            "totalrows":99999999,
            "totalnumberofspills":14840,
            "totalnumberofsegments":14841,
            "autosplit":true,
            "bulkload":false,
            "tabletype":"json",
            "regionsizemb":4096,
            "hasindex":true,
            ...
            "uuid":"a7095a04-da06-0101-cdde-05394c805c00"
        }
    ]
}

The 'payload_idx' index information is the following. Interesting to notice the index size.

maprcli table index list -path /user/mapr/tables/from_parquet -json

{
    "timestamp":1551951454409,
    "timeofday":"2019-03-07 01:37:34.409 GMT-0800 AM",
    "status":"OK",
    "total":1,
    "data":[
        {
            "cluster":"nico.cluster",
            "type":"maprdb.si",
            "indexFid":"2242.45.131206",
            "indexName":"payload_idx",
            "hashed":false,
            "indexState":"REPLICA_STATE_REPLICATING",
            "idx":1,
            "indexedFields":"payload:ASC",
            "isUptodate":true,
            "minPendingTS":0,
            "maxPendingTS":0,
            "bytesPending":0,
            "putsPending":0,
            "bucketsPending":0,
            "copyTableCompletionPercentage":100,
            "numTablets":32,
            "numRows":96824224,
            "totalSize":37746925568
        }
    ]
}

Now, let’s run some queries.

 

Querying

Using The MapRDB Official Connector

  • EQUAL
val f1 =  com.mapr.db.spark.sql.SparkSessionFunctions(sparkSession)
      .loadFromMapRDB("/user/mapr/tables/from_parquet", schema)
      .filter("payload = '67-3485-8-3299-11113-118111126100-10-702283-6189-73-90-1125649-3-34668459-30734-8-342345-616665-108105-105-88311-60-59-46-105-100-10265-39-67-111-125-8559-28-22-10233-84-119-127-29951751100-116-202488-90-8966-69105-8310234-6811842-49-39-13-103-73-9866-119116-8-721130-98-50'")
      .count()
println(s"RESULT: ${time(f1)}")
TIME: 11012
RESULT: 1
  • LIKE
val f1 =  com.mapr.db.spark.sql.SparkSessionFunctions(sparkSession)
      .loadFromMapRDB("/user/mapr/tables/from_parquet", schema)
      .filter("payload like '67-3485%'")
      .select("_id")
      .count()
    println(s"RESULT: ${time(f1)}")
TIME: 12011
RESULT: 7
  • LESS THAN
val f1 =  com.mapr.db.spark.sql.SparkSessionFunctions(sparkSession)
      .loadFromMapRDB("/user/mapr/tables/from_parquet", schema)
      .filter("payload <= '67-3485-8-3299-11113-118111126100-10-702283-6189-73-90-1125649-3-34668459-30734-8-342345-616665-108105-105-88311-60-59-46-105-100-10265-39-67-111-125-8559-28-22-10233-84-119-127-29951751100-116-202488-90-8966-69105-8310234-6811842-49-39-13-103-73-9866-119116-8-721130-98-50'")
      .select("_id")
      .count()
    println(s"RESULT: ${time(f1)}")
    
    TIME: 752273            -- around 12.5 minutes
RESULT: 85975334
  • OR LIKE
val f1 =  com.mapr.db.spark.sql.SparkSessionFunctions(sparkSession)
      .loadFromMapRDB("/user/mapr/tables/from_parquet", schema)
      .filter("payload like '67-3485-8-3299%' or payload like '3485-8-3299%'")
      .select("_id")
      .count()
IME: 162803
RESULT: 9
  • AND LIKE
val f1 =  com.mapr.db.spark.sql.SparkSessionFunctions(sparkSession)
      .loadFromMapRDB("/user/mapr/tables/from_parquet", schema)
      .filter("payload like '67-3485-8-3299%' and payload like '3485-8-3299%'")
      .select("_id")
      .count()
      
      TIME: 21837
RESULT: 0
  • AND
val f1 =  com.mapr.db.spark.sql.SparkSessionFunctions(sparkSession)
      .loadFromMapRDB("/user/mapr/tables/from_parquet", schema)
      .filter("payload <= '67-3485-8-3299' and payload >= '3485-8-3299'")
      .select("_id")
      .count()
      
TIME: 363296
RESULT: 13819336

 

Using The Independent MapRDBConnector

  • EQUAL
val f2 =  sparkSession
      .loadFromMapRDB("/user/mapr/tables/from_parquet", schema)
      .filter("payload = '67-3485-8-3299-11113-118111126100-10-702283-6189-73-90-1125649-3-34668459-30734-8-342345-616665-108105-105-88311-60-59-46-105-100-10265-39-67-111-125-8559-28-22-10233-84-119-127-29951751100-116-202488-90-8966-69105-8310234-6811842-49-39-13-103-73-9866-119116-8-721130-98-50'")
      .count()
    println(s"RESULT: ${time(f2)}")
    
 TIME: 889
RESULT: 1
  • LIKE
val f2 =  sparkSession
      .loadFromMapRDB("/user/mapr/tables/from_parquet", schema)
       .filter("payload like '67-3485%'")
      .count()
    println(s"RESULT: ${time(f2)}")
TIME: 878
RESULT: 70
  • LESS THAN
val f2 =  sparkSession
      .loadFromMapRDB("/user/mapr/tables/from_parquet", schema)
      .filter("payload <= '67-3485-8-3299-11113-118111126100-10-702283-6189-73-90-1125649-3-34668459-30734-8-342345-616665-108105-105-88311-60-59-46-105-100-10265-39-67-111-125-8559-28-22-10233-84-119-127-29951751100-116-202488-90-8966-69105-8310234-6811842-49-39-13-103-73-9866-119116-8-721130-98-50'")
      .select("_id")
      .count()
    println(s"RESULT: ${time(f2)}")
    
    TIME: 688387            ------ 11.4 
RESULT: 85975334
  • OR LIKE
val f2 =  sparkSession
      .loadFromMapRDB("/user/mapr/tables/from_parquet", schema)
      .filter("payload like '67-3485-8-3299%' or payload like '3485-8-3299%'")
      .select("_id")
      .count()
      
      TIME: 231398
RESULT: 9
  • AND LIKE
val f2 =  sparkSession
      .loadFromMapRDB("/user/mapr/tables/from_parquet", schema)
      .filter("payload like '67-3485-8-3299%' and payload like '3485-8-3299%'")
      .select("_id")
      .count()
      
      TIME: 31257
RESULT: 0
  • AND
val f2 =  sparkSession
      .loadFromMapRDB("/user/mapr/tables/from_parquet", schema)
      .filter("payload <= '67-3485-8-3299' and payload >= '3485-8-3299'")
      .select("_id")
      .count()
TIME: 280387
RESULT: 13819336

We can summarize the result in the following chart (BLUE is our connector; RED is the official connector). Less is better since the chart shows time of running queries.

 

As we can see, our connector beats the official one in most of the queries. Those results are given by that our connector uses MapR-DB secondary indexes as much as it can.

There are certain situations where using secondary indexes is not feasible which implies that the queries issued to MapR-DB are similar to the ones issued by the official connector, which in those cases we are in the same time range of the official connector.

On the other hand, when secondary indexes are a good choice to be used, our connector is clearly faster while keeping the same functionality from an API perspective.'

 

This article was written by Nicolas A Perez and originally posted on Medium