MPI Workload Performance on the MapR Data Platform, Part 1, by Nicolas A Perez

The MapR Data Platform has an important place in the big data world.

However, some questions need to be answered, and software engineer Nicolas A Perez can help us answer them. For example, can MapR be used to run classic computational libraries in the same way we run Spark? Find out here!

In the big data world, the MapR Data Platform occupies, without question, an important place given the technological advantages it offers. Its ability to run mixed workloads shows how MapR has become the technological marvel it is today. It handles continuous data processing with MapR Event Store and batch processing of humongous data sets on the scalable and performant MapR-Filesystem. It can also store virtually unlimited documents of any shape in MapR-DB, the NoSQL database. However, some questions arise. Is MapR able to run classic High-Performance Computing (HPC) workloads in a similar way to traditional HPC systems? Are these 20-year-old technologies able to keep up with new tools such as Apache Spark? Can we use MapR to run classic computational libraries in the same way we run Spark?

To answer these questions, we take concrete examples and compare how each technology behaves on them.

We use CPU-intensive examples so that we measure the performance of each tool on CPU-bound tasks. We believe this is a fair comparison, since some of the older libraries might not offer storage access capabilities, such as access to HDFS, in the way newer frameworks like Apache Spark do.

We use two specific examples. The first one is covered in this post and described below; the second one will come in a follow-up post.

In this post, we implement the Sieve of Eratosthenes, a classic and highly parallelizable algorithm for finding prime numbers. We write one implementation in C using MPI and a similar one using Apache Spark running on YARN. In both cases, we measure only the time taken to calculate the required primes, ignoring, for instance, the job-scheduling overhead that YARN adds.

Then, in a second blog post, we will implement matrix multiplication. It is another classic numerical problem that is CPU-intensive and can be highly distributed and parallelized. Again, we will use a pure C implementation with MPI and compare it with the default implementation offered by Spark MLlib. Of course, we could implement our own custom multiplication procedure in Spark, but the idea is to compare against what is already there.

Notice that we are using MPI and Spark, and there are reasons for that. MPI is the de facto standard for writing multiprocessor programs in the HPC world. On the other hand, Apache Spark has grown in the big data space and cannot be ignored when talking about distributed workloads.

Finding Prime Numbers

If there is a problem that Computer Science students enjoy solving, it is finding prime numbers. There are different ways to do it, from very naive solutions to very complex ones. In our example, we use the sieve of Eratosthenes, but we leave out some possible optimizations for the sake of code clarity. Both implementations, the one using MPI and the one using Apache Spark, skip these optimizations and focus on the raw algorithm.
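For comparison, here is a minimal single-process sieve that does include the usual optimizations left out of this post. Crossing off starts at k * k, and the outer loop stops once k * k reaches n. It is only a reference sketch, not part of the benchmark, and `count_primes_below` is a hypothetical name. It is handy for checking the distributed programs' output: there are 9592 primes below 100000.

```c
#include <stdlib.h>

/* Counts the primes strictly below n with a classic single-process sieve.
 * Unlike the code in this post, it applies two common optimizations:
 * crossing off starts at k*k, and the outer loop stops once k*k >= n. */
long count_primes_below(long n) {
    if (n < 3) {
        return 0; /* there are no primes below 2 */
    }

    /* composite[i] != 0 means i has been crossed off */
    char *composite = calloc((size_t) n, sizeof(char));
    if (composite == NULL) {
        return -1; /* allocation failure */
    }

    for (long k = 2; k * k < n; ++k) {
        if (!composite[k]) {
            /* smaller multiples of k were already crossed off by smaller primes */
            for (long m = k * k; m < n; m += k) {
                composite[m] = 1;
            }
        }
    }

    long count = 0;
    for (long i = 2; i < n; ++i) {
        if (!composite[i]) {
            ++count;
        }
    }

    free(composite);
    return count;
}
```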

The following image shows how the algorithm works when finding the primes less than 10 using 2 parallel processors. The same technique scales to a much larger number of processors.

Now, let's implement this idea using MPI.

Let’s start with some of the supporting functions we need. As we saw in the image above, each processor only holds a piece of the data; in other words, our data is partitioned across the processors participating in the calculation.

The following function is called by each process and initializes the processor’s data.

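```c
low = BLOCK_LOW(id, p, n);    /* first value owned by this process (set in main) */
size = BLOCK_SIZE(id, p, n);  /* number of values owned by this process */

/* Fills this process's block with the consecutive values low .. low + size - 1. */
long *init_local_data(void) {
    long *data = calloc((size_t) size, sizeof(long));

    for (long i = 0; i < size; ++i) {
        data[i] = low + i;
    }

    return data;
}
```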
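The snippet relies on `BLOCK_LOW` and `BLOCK_SIZE`, which are only defined in the complete program further down. As a quick illustration, the sketch below shows how they split the values 0..n-1 among p processes. The macros are copied from that program; `blocks_cover` is a hypothetical helper that is not part of the original code.

```c
/* Block decomposition macros, copied from the complete program:
 * process id (0 <= id < p) owns the values BLOCK_LOW..BLOCK_HIGH of 0..n-1. */
#define BLOCK_LOW(id, p, n)  ((id)*(n)/(p))
#define BLOCK_HIGH(id, p, n) (BLOCK_LOW((id)+1,p,n)-1)
#define BLOCK_SIZE(id, p, n) (BLOCK_HIGH(id,p,n)-BLOCK_LOW(id,p,n)+1)

/* Hypothetical helper: returns 1 if the p blocks are contiguous and
 * together cover exactly the values 0..n-1, and 0 otherwise. */
int blocks_cover(long n, long p) {
    long next_low = 0; /* where the next block is expected to start */

    for (long id = 0; id < p; ++id) {
        if (BLOCK_LOW(id, p, n) != next_low) {
            return 0;
        }
        next_low += BLOCK_SIZE(id, p, n);
    }

    return next_low == n;
}
```

With n = 10 and p = 2, as in the figure, process 0 owns 0..4 and process 1 owns 5..9. When n is not divisible by p, as with n = 10 and p = 3, the block sizes differ by at most one (3, 3 and 4 values). Because the values start at 0, the program finds the primes smaller than n.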

Next, each processor filters its own data based on a value 'k'. If a value is divisible by 'k' and is not 'k' itself, it is not prime, so it is eliminated by setting it to 0.

```c
/* Zeroes out every local value that is a multiple of k, except k itself. */
void filter_non_primes(long k, long *data) {
    for (long i = 0; i < size; ++i) {
        if (data[i] != k && data[i] % k == 0) {
            data[i] = 0;
        }
    }
}
```
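This is one of the optimizations the post deliberately leaves out. Filtering can jump straight to the multiples of k inside the block. That touches about size / k elements per call instead of all size of them, while eliminating exactly the same values as `filter_non_primes`. This is a minimal sketch: the function name is hypothetical, and `low` and `size` are passed as parameters instead of being read from the globals.

```c
/* Hypothetical optimized variant of filter_non_primes (not used in the post).
 * low and size describe this process's block, as in init_local_data(), and
 * data[i] holds either low + i or 0. Instead of testing every element with %,
 * it jumps to the first multiple of k in the block and then steps by k. */
void filter_multiples_strided(long k, long *data, long low, long size) {
    /* first multiple of k that is >= low, computed without overflowing
       even when k == LONG_MAX (the value k takes on the loop's last pass) */
    long rem = low % k;
    long first = (rem == 0) ? low : low + (k - rem);

    for (long v = first; v < low + size; v += k) {
        if (v != k) {          /* k itself is prime and must survive */
            data[v - low] = 0;
        }
    }
}
```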

After this, the next 'k' value must be selected globally. Each processor selects its own local candidate for 'k', and then all processors agree on a global 'k' value to be used in the next iteration. The following function selects the local candidate: the smallest surviving value greater than the current 'k', or LONG_MAX if there is none.

```c
/* Returns the smallest surviving local value greater than k, or LONG_MAX if none. */
long local_min(const long *data, long k) {
    long min = LONG_MAX;

    for (long i = 0; i < size; ++i) {
        if (data[i] > 0 && data[i] > k && data[i] < min) {
            min = data[i];
        }
    }
    return min;
}
```

With these pieces in place, the main loop of our application becomes the following.

```c
long k = 1;

long *data = init_local_data();

while (k <= n) {
    /* each process proposes its own candidate for the next k ... */
    long local_next_k = local_min(data, k);

    /* ... and all processes agree on the smallest candidate */
    MPI_Allreduce(&local_next_k, &k, 1, MPI_LONG, MPI_MIN, MPI_COMM_WORLD);

    filter_non_primes(k, data);
}
```

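Notice that MPI_Allreduce with the MPI_MIN operation is how the processes agree on the next 'k'. Every process contributes its local candidate, and every process receives the global minimum. When no process has a value left above 'k', they all contribute LONG_MAX, so 'k' becomes larger than 'n' and the loop ends.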
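To see the whole loop at work without an MPI installation, here is a single-process simulation. It is a sketch under one simplifying assumption: the p "processes" are just p blocks of one array, and MPI_Allreduce with MPI_MIN is replaced by a plain minimum over the p local candidates. `simulate_sieve` is a hypothetical name; the filtering and candidate selection mirror `filter_non_primes` and `local_min` above.

```c
#include <limits.h>

#define BLOCK_LOW(id, p, n)  ((id)*(n)/(p))
#define BLOCK_HIGH(id, p, n) (BLOCK_LOW((id)+1,p,n)-1)
#define BLOCK_SIZE(id, p, n) (BLOCK_HIGH(id,p,n)-BLOCK_LOW(id,p,n)+1)

/* Single-process simulation of the MPI main loop (hypothetical helper).
 * values[] must hold n longs; block id of p plays the role of process id.
 * Returns the number of reduction rounds; on return, values[v] is v if v
 * survived and 0 if it was eliminated. */
long simulate_sieve(long n, int p, long *values) {
    for (long v = 0; v < n; ++v) {
        values[v] = v; /* same initial data as init_local_data() */
    }

    long k = 1;
    long rounds = 0;

    while (k <= n) {
        long global_k = LONG_MAX; /* LONG_MAX is the identity for a minimum */

        for (int id = 0; id < p; ++id) {
            long low = BLOCK_LOW(id, p, n);
            long size = BLOCK_SIZE(id, p, n);

            /* local_min(): smallest surviving value greater than k in this block */
            long local_next_k = LONG_MAX;
            for (long i = low; i < low + size; ++i) {
                if (values[i] > 0 && values[i] > k && values[i] < local_next_k) {
                    local_next_k = values[i];
                }
            }

            /* the "Allreduce": keep the smallest candidate across all blocks */
            if (local_next_k < global_k) {
                global_k = local_next_k;
            }
        }

        k = global_k;
        ++rounds;

        /* filter_non_primes(): every block drops the multiples of k except k */
        for (long i = 0; i < n; ++i) {
            if (values[i] != k && values[i] % k == 0) {
                values[i] = 0;
            }
        }
    }

    return rounds;
}
```

With n = 10 and p = 2, the setting of the figure, the simulation needs five rounds: k = 2, 3, 5 and 7, then a final round in which every block reports LONG_MAX. It leaves 2, 3, 5 and 7, plus the value 1, which this algorithm never eliminates, so only survivors greater than 1 are primes. In general, there is one round per prime below n plus the final one. The n = 100000 run shown later therefore performs 9593 MPI_Allreduce calls.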

The complete MPI program is the following.

```c
#include <stdio.h>
#include <mpi.h>
#include <stdlib.h>
#include <limits.h>

/* Block decomposition: process id (of p) owns values BLOCK_LOW..BLOCK_HIGH of 0..n-1 */
#define BLOCK_LOW(id, p, n)  ((id)*(n)/(p))
#define BLOCK_HIGH(id, p, n) (BLOCK_LOW((id)+1,p,n)-1)
#define BLOCK_SIZE(id, p, n) (BLOCK_HIGH(id,p,n)-BLOCK_LOW(id,p,n)+1)

int p, id;          /* number of processes and this process's rank */
long low, size, n;  /* first local value, local block size, upper bound (exclusive) */
int print_numbers;  /* 1 to print the primes found */

long *init_local_data(void);
long local_min(const long *data, long k);

void filter_non_primes(long k, long *data);

int main(int argc, char *argv[]) {

    /* argv[1]: find the primes below n; argv[2]: 1 to print them */
    if (argc > 1) {
        n = atol(argv[1]);
    }

    if (argc > 2) {
        print_numbers = atoi(argv[2]);
    }

    MPI_Init(&argc, &argv);

    MPI_Barrier(MPI_COMM_WORLD);
    double elapsed_time = -MPI_Wtime();

    MPI_Comm_size(MPI_COMM_WORLD, &p);
    MPI_Comm_rank(MPI_COMM_WORLD, &id);

    low = BLOCK_LOW(id, p, n);
    size = BLOCK_SIZE(id, p, n);

    long k = 1;

    long *data = init_local_data();

    while (k <= n) {

        long local_next_k = local_min(data, k);

        /* all processes agree on the smallest candidate as the next k */
        MPI_Allreduce(&local_next_k, &k, 1, MPI_LONG, MPI_MIN, MPI_COMM_WORLD);

        filter_non_primes(k, data);
    }

    elapsed_time += MPI_Wtime();

    if (!id) {
        printf("elapsed time: %lf\n", elapsed_time);
    }

    /* print the prime numbers: 0 marks eliminated values and 1 is not prime */
    if (print_numbers == 1) {
        for (long i = 0; i < size; ++i) {
            if (data[i] > 1) {
                printf("[%d]: prime: %ld\n", id, data[i]);
            }
        }
    }

    free(data);

    MPI_Finalize();

    return 0;
}

void filter_non_primes(long k, long *data) {
    for (long i = 0; i < size; ++i) {
        if (data[i] != k && data[i] % k == 0) {
            data[i] = 0;
        }
    }
}

long *init_local_data(void) {
    long *data = calloc((size_t) size, sizeof(long));

    for (long i = 0; i < size; ++i) {
        data[i] = low + i;
    }
    return data;
}

long local_min(const long *data, long k) {
    long min = LONG_MAX;

    for (long i = 0; i < size; ++i) {
        if (data[i] > 0 && data[i] > k && data[i] < min) {
            min = data[i];
        }
    }

    return min;
}
```

We can run this application on a 6-node MapR cluster as follows:

```
mpirun -np 6 --oversubscribe sieves_of_eratosthenes 100000 0

elapsed time: 5.550563
```

As we can see, it calculates all prime numbers smaller than 100,000 in around 5.5 seconds.

Now let’s see what happens when we run the application on the same 6-node MapR cluster with more processes and a ten times larger input. Running 24 processes is possible because of the number of CPUs on each node.

```
mpirun -np 24 --oversubscribe sieves_of_eratosthenes 1000000 0

elapsed time: 7.580463
```

The application finishes in about 7.6 seconds. That is an impressive time, considering the ten times larger input and the number of calculations being executed.

Now, let's implement the same application in Spark. I will try to keep the code as simple as possible while going over its different parts.

The 'run' function creates the data, which is partitioned and distributed across the cluster, and then calls the 'clean' function.

```scala
def run(n: Int)(implicit sc: SparkContext): RDD[Int] = {

  // the numbers 1..n, partitioned and distributed across the cluster
  val data = sc.parallelize(1 to n)

  val k = 2

  clean(data, k, n, 0)
}
```

The 'clean' function filters the data based on the 'k' value, as we did in the MPI implementation. It then calls 'nextK' to find the next 'k' value.

```scala
def clean(data: RDD[Int], k: Long, n: Int, checkPointTime: Int)(implicit sc: SparkContext): RDD[Int] = {
  if (k > n) {
    data
  }
  else {
    // drop the multiples of k, keeping k itself and everything below it
    val filtered = data.filter(v => v <= k || v % k != 0)

    clean(filtered, nextK(filtered, k), n, checkPointTime + 1)
  }
}
```

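The 'nextK' function selects the next value for 'k'. Filtering preserves the order of the original 1 to n range, so the first element greater than 'k' is also the smallest one, which is the next prime.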

```scala
def nextK(data: RDD[Int], k: Long)(implicit sc: SparkContext) = {

  // surviving values greater than k; the first one is the next prime
  val xs = data.filter(_ > k)

  if (xs.isEmpty()) Int.MaxValue else xs.first()
}
```

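Notice that calling '.first' here, like '.isEmpty' just before it, is a Spark action. It plays the role of the synchronization step we implemented with MPI_Allreduce: it forces the pending filters to run and brings the next 'k' back to the driver before the following iteration can start.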

The complete Spark application is the following.

```scala
package com.github.anicolaspp.sieves

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.joda.time.DateTime

object App {

  def main(args: Array[String]): Unit = {

    val n = args(0).toInt

    val conf = new SparkConf()
      .setAppName("sieves")
      .set("spark.driver.extraJavaOptions", "-Xss40M")

    implicit val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    sc.setCheckpointDir("/Users/nperez/tmp/")

    val (primes, timeLapse) = time(run(n))

    println("time in MS: " + timeLapse)

    sc.stop()
  }

  def run(n: Int)(implicit sc: SparkContext): RDD[Int] = {

    val data = sc.parallelize(1 to n)

    val k = 2

    clean(data, k, n, 0)
  }

  def clean(data: RDD[Int], k: Long, n: Int, checkPointTime: Int)(implicit sc: SparkContext): RDD[Int] = {
    if (k > n) {
      data
    }
    else {

      if (checkPointTime % 1000 == 0) {
        // Cut the lineage: write the RDD out and read it back.
        // Each checkpoint gets its own directory because `data` may still be
        // reading from the previous one (old directories are left behind).
        val path = new Path(s"Users/nperez/tmp/data-$k")
        val fs = FileSystem.get(sc.hadoopConfiguration)
        if (fs.exists(path)) {
          fs.delete(path, true) // leftover from an earlier run
        }

        data.saveAsObjectFile(path.toString)

        val reload = sc.objectFile[Int](path.toString)

        // resume with counter 1, not 0, so the next call filters instead of checkpointing again
        clean(reload, k, n, 1)
      } else {

        val filtered = data.filter(v => v <= k || v % k != 0)

        clean(filtered, nextK(filtered, k), n, checkPointTime + 1)
      }
    }
  }

  def nextK(data: RDD[Int], k: Long)(implicit sc: SparkContext) = {

    val xs = data.filter(_ > k)

    if (xs.isEmpty()) Int.MaxValue else xs.first()
  }

  // runs fn and returns its result together with the elapsed wall-clock time in ms
  def time[A](fn: => A) = {

    val start = DateTime.now()

    val a = fn

    val end = DateTime.now()

    (a, end.getMillis - start.getMillis)
  }

}
```

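Notice that every 1,000 iterations we write the RDD to the file system and reload it. This is necessary because each recursive step adds another filter to the RDD's DAG lineage. That lineage keeps growing and eventually causes problems, and reloading the data starts from a fresh, short lineage. There is no simple way around this, but our solution works without major complications.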

As we can see, Spark gives us higher levels of abstraction, which ultimately makes our code simpler. But how fast is this code compared to our MPI code when executed on the same MapR cluster?

When running on the same MapR cluster, the Spark application is about 10x slower. Part of the difference comes from writing and reading data to and from HDFS, but that accounts for only a small part of the processing time.

Notes on MapR and MPI

We mentioned before that our tests focus on CPU-bound operations because MPI has some disadvantages when accessing HDFS data. However, since we are talking about MapR specifically, we could use its fully POSIX-compliant client instead of the HDFS interface. This means that, using pure C and MPI, we could access the distributed file system exposed by MapR without any problems. In a follow-up post, we will look at how well MPI handles large-scale data sets stored in MapR through the POSIX client.
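Here is a minimal sketch of what that access could look like, under two assumptions. First, the cluster is mounted through the POSIX client, so a path such as /mapr/my.cluster.com/data/numbers.bin (hypothetical) can be opened with plain `fopen`. Second, the input is a flat binary file of n `long` values. `read_block` is a hypothetical helper: each process seeks to its own block, using the same BLOCK_LOW/BLOCK_SIZE decomposition as the sieve, and reads only its share.

```c
#include <stdio.h>

#define BLOCK_LOW(id, p, n)  ((id)*(n)/(p))
#define BLOCK_HIGH(id, p, n) (BLOCK_LOW((id)+1,p,n)-1)
#define BLOCK_SIZE(id, p, n) (BLOCK_HIGH(id,p,n)-BLOCK_LOW(id,p,n)+1)

/* Reads process id's share of a binary file holding n longs, using only
 * standard C I/O. Returns the number of values read, or -1 on error.
 * With a POSIX client, f could come from something like
 * fopen("/mapr/my.cluster.com/data/numbers.bin", "rb")  (hypothetical path). */
long read_block(FILE *f, int id, int p, long n, long *out) {
    long low = BLOCK_LOW(id, p, n);
    long size = BLOCK_SIZE(id, p, n);

    /* skip the values owned by lower-ranked processes */
    if (fseek(f, low * (long) sizeof(long), SEEK_SET) != 0) {
        return -1;
    }
    return (long) fread(out, sizeof(long), (size_t) size, f);
}
```

In a real MPI program, MPI's own parallel I/O interface (for example `MPI_File_open` and `MPI_File_read_at`) would be the more idiomatic choice. Plain stdio keeps this sketch self-contained.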

Conclusions

If you thought that MPI was something from the past, you might want to reconsider. Even with lower-level abstractions, it is more than capable, especially when running on modern platforms like MapR. However, that raises a big question about classical HPC systems. The future is unknown, but MPI is a very well-built technology, and I would be surprised if it did not become part of the big data ecosystem soon enough. On the other hand, Apache Spark is the other monster in the room. With its higher abstractions, it is perfect for almost everything you can think of, so it makes a lot of sense to master such a tool, even if it sometimes performs worse than MPI.

The tradeoff between ease of use and high performance has been around for years and cannot be ignored; being aware of it helps us decide in every situation.

This article was written by Nicolas A Perez and posted originally on Medium.