MPI Workloads Performance on the MapR Data Platform, Part 1

Originally posted 2/11/19 on Medium.

In the big data world, the MapR Data Platform occupies, without question, an important place, given the technology advantages it offers. The ability to run mixed workloads is one of the reasons: continuous data processing with MapR Event Store for Apache Kafka, batch processing at enormous scale on the scalable and performant MapR XD Distributed File and Object Store (MapR XD), and storing virtually unlimited documents of any shape in MapR Database (a NoSQL database) are only a few examples of how MapR has risen to become the technological marvel it is today. However, some questions arise: is MapR able to run classic high-performance computing (HPC) workloads in a similar way to traditional HPC systems? Are these 20-year-old technologies able to keep up with newer tools such as Apache Spark? Can we use MapR to run traditional computational libraries in the same way we run Spark?

To answer these questions, we will take concrete examples and compare their behavior, which will ultimately settle the matter.

At the same time, we will use examples that are CPU-intensive, so that we measure tooling performance on CPU-bound tasks. We believe that is a fair comparison, since some of the older libraries may not offer the same storage access capabilities, such as access to HDFS, that newer frameworks like Apache Spark provide.

We will use two specific examples: one in this blog post, and the second example in a follow-up post.

In this post, we are going to implement the Sieve of Eratosthenes, a classic and highly parallelizable algorithm for calculating prime numbers. Our implementation is written in C, using MPI, the de facto standard for writing multi-process programs in the HPC world. We will then compare it to a similar implementation using Apache Spark running on YARN. In both cases, we will measure strictly the time taken to calculate the required primes, ignoring the overhead that job scheduling on YARN adds, for instance.

Then, in a second blog post, we will implement a matrix multiplication algorithm, which is another classic numerical problem that is CPU-intensive, highly distributed, and parallelizable. In this case, we will, again, use a pure C implementation using MPI and compare it with the default implementation offered by Spark MLlib. Of course, we could implement our own custom multiplication procedure using Spark, but the idea is to compare to what is already out there.

Notice that we are using MPI and Spark, and there are reasons for it: MPI is considered the standard in the HPC world, while Apache Spark has grown in the big data space and cannot be ignored when talking about distributed workloads.

Finding Prime Numbers

If there is a problem that computer science students enjoy solving, it is finding prime numbers. There are different ways to do so, from very naive solutions to very sophisticated ones. In our example, we will use the Sieve of Eratosthenes, leaving out some possible optimizations for the sake of code clarity. Either way, both implementations – the one using MPI and the one using Apache Spark – will be unoptimized and will focus on the raw algorithm.

The following image shows how the algorithm works for finding primes less than 10 using 2 parallel processors. Think about scaling to a really large number of processors using the same technique.
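Since the image does not reproduce well in text, here is a minimal, purely sequential sketch of the same idea (our own illustration, not the code we benchmark below): each round picks the next surviving value k and crosses out its multiples; with 2 processors, the range would simply be split into two blocks, and each processor would cross out the multiples of k within its own block.

/* A minimal sequential sketch of the sieve idea (illustration only):
   repeatedly take the next surviving value k and cross out its multiples. */
#include <stdio.h>

int main(void) {
    int n = 10;
    int crossed[10] = {0};              /* crossed[v] != 0 means v is eliminated */

    for (int k = 2; k < n; ++k) {
        if (crossed[k]) continue;       /* k survived every earlier round */
        for (int m = 2 * k; m < n; m += k) {
            crossed[m] = 1;             /* cross out the multiples of k */
        }
    }

    for (int v = 2; v < n; ++v) {
        if (!crossed[v]) printf("%d ", v);   /* prints: 2 3 5 7 */
    }
    printf("\n");

    return 0;
}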

Now, let's implement this idea using MPI.

Let’s first start with some of the supporting functions we need. As we saw in the image above, each processor only has a piece of the data; in other words, our data is partitioned over the number of processors participating in the calculation.

Each process first computes the lower bound and the size of its block of data, using block decomposition macros that are defined in the full listing below:

low = BLOCK_LOW(id, p, n);
size = BLOCK_SIZE(id, p, n);

The following function is then called by each process to initialize its local data.

long *init_local_data() {
    long *data = calloc((size_t) size, sizeof(long));
    for (long i = 0; i < size; ++i) {
        data[i] = low + i;
    }

    return data;
}
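To make the decomposition concrete, here is a small sketch (our own illustration) that uses the same block decomposition macros from the full listing further down to print the block each rank would own for n = 10 and 2 processors:

#include <stdio.h>

#define BLOCK_LOW(id, p, n)  ((id)*(n)/(p))
#define BLOCK_HIGH(id, p, n) (BLOCK_LOW((id)+1,p,n)-1)
#define BLOCK_SIZE(id, p, n) (BLOCK_HIGH(id,p,n)-BLOCK_LOW(id,p,n)+1)

int main(void) {
    long n = 10, p = 2;

    for (long id = 0; id < p; ++id) {
        /* rank 0 -> low = 0, size = 5; rank 1 -> low = 5, size = 5 */
        printf("rank %ld: low = %ld, size = %ld\n",
               id, BLOCK_LOW(id, p, n), BLOCK_SIZE(id, p, n));
    }

    return 0;
}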

Now, each processor needs to filter its own dataset based on a value k; in other words, if a value is divisible by k (and is not k itself), then it is not prime and should be eliminated.

void filter_non_primes(long k, long *data) {
    for (int i = 0; i < size; ++i) {
        if (data[i] != k && data[i] % k == 0) {
            data[i] = 0;
        }
    }
}

After this, the next k value must be selected globally: each processor selects its own local candidate, and then all processors agree on the global k value to be used in the next iteration. The following function selects the local candidate.

long local_min(const long *data, long k) {
    long min = LONG_MAX;
    for (int i = 0; i < size; ++i) {
        if (data[i] > 0 && data[i] > k && data[i] < min) {
            min = data[i];
        }
    }
    return min;
}

Now, our application becomes the following:

long k = 1;

long *data = init_local_data();

while(k <= n) {
    long local_next_k = local_min(data, k);

    MPI_Allreduce(&local_next_k, &k, 1, MPI_LONG, MPI_MIN, MPI_COMM_WORLD);

    filter_non_primes(k, data);
}

Notice that MPI_Allreduce is used to agree on the global minimum, which becomes the next k value.
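As a quick, self-contained sketch of that synchronization step (our own illustration, separate from the program above), every rank proposes a local candidate and MPI_Allreduce with MPI_MIN hands the smallest one back to every rank:

#include <mpi.h>
#include <stdio.h>

int main(int argc, char *argv[]) {
    MPI_Init(&argc, &argv);

    int id;
    MPI_Comm_rank(MPI_COMM_WORLD, &id);

    long local_candidate = 5 + 2 * id;   /* ranks propose 5, 7, 9, ... */
    long global_min;

    /* every rank sends its candidate and receives the global minimum */
    MPI_Allreduce(&local_candidate, &global_min, 1, MPI_LONG, MPI_MIN,
                  MPI_COMM_WORLD);

    printf("[%d] agreed global minimum: %ld\n", id, global_min);   /* always 5 */

    MPI_Finalize();
    return 0;
}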

The entire MPI code is the following:

#include <stdio.h>
#include <mpi.h>
#include <stdlib.h>
#include <limits.h>

#define BLOCK_LOW(id, p, n)  ((id)*(n)/(p))
#define BLOCK_HIGH(id, p, n) (BLOCK_LOW((id)+1,p,n)-1)
#define BLOCK_SIZE(id, p, n) (BLOCK_HIGH(id,p,n)-BLOCK_LOW(id,p,n)+1)

int p, id;
long low, size, n;
int print_numbers;

long *init_local_data();
long local_min(const long *data, long k);

void filter_non_primes(long k, long *data);

int main(int argc, char *argv[]) {

    if (argc > 1) {
        n = atol(argv[1]);
    }

    if (argc > 2) {
        print_numbers = atoi(argv[2]);
    }

    MPI_Init(&argc, &argv);

    MPI_Barrier(MPI_COMM_WORLD);
    double elapsed_time = -MPI_Wtime();

    MPI_Comm_size(MPI_COMM_WORLD, &p);
    MPI_Comm_rank(MPI_COMM_WORLD, &id);

    low = BLOCK_LOW(id, p, n);
    size = BLOCK_SIZE(id, p, n);

    long k = 1;

    long *data = init_local_data();

    while(k <= n) {

        long local_next_k = local_min(data, k);

        MPI_Allreduce(&local_next_k, &k, 1, MPI_LONG, MPI_MIN, MPI_COMM_WORLD);

        filter_non_primes(k, data);
    }

    elapsed_time += MPI_Wtime();

    if (!id) {
        printf("elapsed time: %lf\n", elapsed_time);
    }

    //print the prime numbers
    if (print_numbers == 1) {
        for (int i = 0; i < size; ++i) {
            if (data[i] != 0) {
                printf("[%d]: prime: %ld\n", id, data[i]);
            }
        }
    }

    free(data);

    MPI_Finalize();

    return 0;
}

void filter_non_primes(long k, long *data) {
    for (int i = 0; i < size; ++i) {
        if (data[i] != k && data[i] % k == 0) {
            data[i] = 0;
        }
    }
}

long *init_local_data() {
    long *data = calloc((size_t) size, sizeof(long));

    for (long i = 0; i < size; ++i) {
        data[i] = low + i;
    }
    return data;
}

long local_min(const long *data, long k) {
    long min = LONG_MAX;

    for (int i = 0; i < size; ++i) {
        if (data[i] > 0 && data[i] > k && data[i] < min) {
            min = data[i];
        }
    }

    return min;
}

We can try running this application on a 6-node MapR cluster by doing:

mpirun -np 6 --oversubscribe sieves_of_eratosthenes 100000 0  

elapsed time: 5.550563

As we can see, it calculates all prime numbers smaller than 100,000 in around 5.5 seconds.

Let's see what happens if we run the same application on the same 6-node MapR cluster, but with more processes (we can do this because of the number of CPUs on each node) and a ten-times-larger input.

mpirun -np 24 --oversubscribe sieves_of_eratosthenes 1000000 0  

elapsed time: 7.580463

As we can see, the application finds all primes below 1,000,000 in about 7.5 seconds, which is impressive considering the number of calculations being executed.

Now, the same application can be implemented in Spark. I will try to keep the code as simple as possible while going over the different parts of it.

The run function creates the data that is partitioned and distributed across the cluster. Then it calls the clean function.

def run(n: Int)(implicit sc: SparkContext): RDD[Int] = {

    val data = sc.parallelize(1 to n)

    val k = 2

    clean(data, k, n, 0)
  }

The clean function filters the data based on the k value, as we did in the MPI implementation. Then it calls nextK to find the next k value.

def clean(data: RDD[Int], k: Long, n: Int, checkPointTime: Int)(implicit sc: SparkContext): RDD[Int] = {
    if (k > n) {
      data
    }
    else {
      val filtered = data.filter(v => v <= k || v % k != 0)

      clean(filtered, nextK(filtered, k), n, checkPointTime + 1)
    }
  }

The nextK function just selects the next possible value for k.

def nextK(data: RDD[Int], k: Long)(implicit sc: SparkContext) = {

    val xs = data.filter(_ > k)

    if (xs.isEmpty()) Int.MaxValue else xs.first()
}

It is important to notice that here we are calling .first, which is semantically equivalent to the synchronization step we implemented earlier using MPI_Allreduce.

The entire code looks like the following snippet:

package com.github.anicolaspp.sieves

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.joda.time.DateTime

object App {

  def main(args: Array[String]): Unit = {

    val n = args(0).toInt

    val conf = new SparkConf()
      .setAppName("sieves")
      .set("spark.driver.extraJavaOptions", "-Xss40M")

    implicit val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    sc.setCheckpointDir("/Users/nperez/tmp/")


    val (primes, timeLapse) = time(run(n))

    println("time in MS: " + timeLapse)

    sc.stop()
  }


  def run(n: Int)(implicit sc: SparkContext): RDD[Int] = {

    val data = sc.parallelize(1 to n)

    val k = 2

    clean(data, k, n, 0)
  }

  def clean(data: RDD[Int], k: Long, n: Int, checkPointTime: Int)(implicit sc: SparkContext): RDD[Int] = {
    if (k > n) {
      data
    }
    else {

      if (checkPointTime > 0 && checkPointTime % 1000 == 0) {
        val fs = FileSystem.get(sc.hadoopConfiguration)
        if (fs.exists(new Path("Users/nperez/tmp/data"))) {
          fs.delete(new Path("Users/nperez/tmp/data"), true)
        }

        data.saveAsObjectFile("Users/nperez/tmp/data")

        val reload = sc.objectFile[Int]("Users/nperez/tmp/data")

        clean(reload, k, n, 0)
      } else {

        val filtered = data.filter(v => v <= k || v % k != 0)

        clean(filtered, nextK(filtered, k), n, checkPointTime + 1)
      }
    }
  }

  def nextK(data: RDD[Int], k: Long)(implicit sc: SparkContext) = {

    val xs = data.filter(_ > k)

    if (xs.isEmpty()) Int.MaxValue else xs.first()

  }

  def time[A](fn: => A) = {

    val start = DateTime.now()

    val a = fn

    val end = DateTime.now()

    (a, end.getMillis - start.getMillis)
  }

}

Notice that there is a piece where we write our RDD down and reload it. This is necessary because the recursive filtering keeps growing the DAG lineage, which eventually causes issues (such as stack overflows) when the plan is evaluated. There is no simple way around it, but periodically saving and reloading the RDD truncates the lineage and solves the problem without major complications.

As we can see, in Spark we have higher levels of abstraction, which ultimately makes our code simpler, but how fast is this code compared to our MPI code when executed on the same MapR cluster?

When running on the same MapR cluster, the Spark application is about 10x slower. Part of the difference comes from reading and writing data to and from HDFS, but that accounts for only a small part of the processing time.

Notes on MapR and MPI

We mentioned before that our tests focus on CPU-bound operations, since MPI has some disadvantages when accessing HDFS data. However, since we are talking about MapR specifically, we could use the fully POSIX-compliant client instead of the HDFS interface. This means that, using pure C and MPI, we could access the distributed file system exposed by MapR without any problems. In a follow-up blog post, we will look at how well MPI can access large-scale data sets stored in MapR using the POSIX client.
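As a hedged illustration of what that could look like, the sketch below has each MPI rank write a small file through a MapR POSIX mount using nothing but standard C file I/O; the mount point /mapr/my.cluster.com is hypothetical and depends on your cluster name.

#include <mpi.h>
#include <stdio.h>

int main(int argc, char *argv[]) {
    MPI_Init(&argc, &argv);

    int id;
    MPI_Comm_rank(MPI_COMM_WORLD, &id);

    /* hypothetical POSIX mount point; adjust to your cluster */
    char path[256];
    snprintf(path, sizeof(path), "/mapr/my.cluster.com/user/mpi/out-%d.txt", id);

    FILE *f = fopen(path, "w");
    if (f != NULL) {
        fprintf(f, "hello from rank %d\n", id);
        fclose(f);
    }

    MPI_Finalize();
    return 0;
}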

Conclusion

If you thought that MPI was something from the past, you might want to reconsider. Even with its lower-level abstractions, it is more than capable, especially when running on modern platforms like MapR. However, that raises a big question about classic HPC systems. The future is unknown, but MPI is a very well-built technology, and I would be surprised if it is not included as part of the big data ecosystem soon enough. On the other hand, Apache Spark is the other monster in the room, and with its higher-level abstractions it is a great fit for almost everything you can think of; thus, it makes a lot of sense to master such a tool, even when it might sometimes perform worse than MPI.

The trade-off between ease of use and raw performance has been there for years and cannot be ignored; being aware of it helps us make the right decision in each situation.
