Dataware for data-driven transformation

MPI Workloads Performance on the MapR Data Platform, Part 2 – Matrix Multiplication

Contributed by

12 min read

Originally published 2/12/19 on Medium

In part one of this blog series, we showed how we can use MPI on top of the MapR Data Platform to successfully find prime numbers within a rather large range. Also, we compared our sieve of Eratosthenes implementation in MPI with the same algorithm in Apache Spark to discover how they both behave while looking at some interesting implementation details.

In the second part of the series, we are going to implement matrix multiplication in both MPI and Apache Spark. Again, we will look at how each implementation behaves when running on MapR. However, our MPI implementation will be based on Cannon's algorithm, while in Spark we will use the MLlib BlockMatrix functions for multiplying matrices.

Cannon's Algorithm

There are implicit implications in parallel computing. The memory requirements tend to increase as we increase the number of processors participating in the computation. This is not trivial but proven.

Cannon's algorithm is extremely scalable. That means that by increasing the number of processors, we keep the memory requirements low. The following is a scalability analysis on matrix multiplication using matrix to matrix multiplication against the block decomposition matrix multiplication used by Cannon's algorithm.

As we can see, in the first part (matrix-matrix) we don't get any scalability, since the memory depends on P – the number of processors. In the second part (Cannon's algorithm), the memory requirement is independent of the number of processors; more specifically, it is a constant which allows us to scale way better.

Cannon's Algorithm Implementation Using MPI

Now, we are going to show the main pieces of the implementation, its core. Then, we will link the entire source code to be reviewed for those interested in it.

First of all, let's look at the functions we are going to use, so we can get a better idea about the application flow.

/* set this parameter to reflect the cache line size of the particular
   machine you're running this program on */
#define CACHE_SIZE 1024

/* in case later we decide to use another data type */
#define mpitype MPI_DOUBLE
typedef double datatype;

void sum(int n, datatype *const *c, datatype *const *partial_c_matrix);


void check_input_files(char *const *argv);

/* initializes local block matrix */
datatype **init_partial_c_matrix(int n);

/* initializes local block matrix on a strip format */
datatype **init_local_c(int n, datatype **c, datatype *sc);

/* checks if a file exists */
int cfileexists(const char *filename);

/* re-construct a matrix from a strip format */
void reconstruct_matrix(int ma, int na, datatype *const *a, const datatype *sa);

/* block decomposition macros */
#define BLOCK_HIGH(id, p, n) (BLOCK_LOW((id)+1,p,n)-1)
#define BLOCK_LOW(id, p, n)  ((id)*(n)/(p))
#define BLOCK_SIZE(id, p, n) (BLOCK_HIGH(id,p,n)-BLOCK_LOW(id,p,n)+1)
#define BLOCK_OWNER(j, p, n) (((p)*((j)+1)-1)/(n))

/* Read a matrix from a file. Each processor reads only its portion of the data */
void read_checkerboard_matrix(
        char *s,              /* IN - File name */
        void ***subs,         /* OUT - 2D array */
        void **storage,       /* OUT - Array elements */
        MPI_Datatype dtype,   /* IN - Element type */
        int *rows,            /* OUT - Array rows */
        int *cols,            /* OUT - Array cols */
        MPI_Comm grid_comm);  /* IN - Communicator */

/*
 * Write a matrix distributed in checkerboard fashion to a file.
 */
void write_checkerboard_matrix(
        char *s,                /* IN -File name */
        void **a,               /* IN -2D matrix */
        MPI_Datatype dtype,     /* IN -Matrix element type */
        int m,                  /* IN -Matrix rows */
        int n,                  /* IN -Matrix columns */
        MPI_Comm grid_comm);    /* IN -Communicator */

/* recursive, block-oriented, sequential matrix multiplication */
void my_matmul(int crow, int ccol, /* corner of C block */
               int arow, int acol, /* corner of A block */
               int brow, int bcol, /* corner of B block */
               int l, int m, int n, /* block A is l*m, block B is m*n, block C is l*n */
               int N, /* matrices are N*N */
               datatype **a, datatype **b, datatype **c);  /* 2D matrices */

Notice that reading and writing the matrices from/to file happens in parallel and is distributed. Each processor is in charge or its own block (piece) of the matrix in question.

Our application then becomes the following:

int main(int argc, char *argv[]) {
    int p, p_sqrt;
    int id, coord[2];
    int dim[2], period[2];
    MPI_Comm comm;
    int ma, na, mb, nb, n;
    datatype **a, *sa;
    datatype **b, *sb;
    datatype **c, *sc;

    /* initialize MPI */
    MPI_Init(&argc, &argv);

    /* start counting time */
    MPI_Barrier(MPI_COMM_WORLD);
    double elapsed_time = -MPI_Wtime();

    /* make sure the number of arguments is correct */
    if (argc != 4) {
        my_abort("Usage: %s fileA fileB fileC\n", argv[0]);
    }

    /* create 2D cartesian communicator and obtain the system configurations */
    MPI_Comm_size(MPI_COMM_WORLD, &p);

    p_sqrt = (int) sqrt(p);

    if (p_sqrt * p_sqrt != p) {
        my_abort("Error: number of processors (p=%d) must be a square number", p);
    }

    dim[0] = dim[1] = p_sqrt;
    period[0] = period[1] = 1;
    MPI_Cart_create(MPI_COMM_WORLD, 2, dim, period, 0, &comm);
    MPI_Comm_rank(comm, &id);
    MPI_Cart_coords(comm, id, 2, coord);

    /* checking input files are accessible */
    check_input_files(argv);

    /* read the submatrix of A managed by this process */
    read_checkerboard_matrix(argv[1], (void ***) &a, (void **) &sa, mpitype, &ma, &na, comm);
    printf("id=%d, coord[%d,%d]: read submatrix of A of dims %dx%d\n", id, coord[0], coord[1], ma, na);

    if (sqrt(ma * na) != ma || sqrt(ma * na) != na) {
        my_abort("id = %d, matrix A is not squared", id);
    }

    /* read the submatrix of B managed by this process */
    read_checkerboard_matrix(argv[2], (void ***) &b, (void **) &sb, mpitype, &mb, &nb, comm);
    printf("id=%d, coord[%d,%d]: read submatrix of B of dims %dx%d\n", id, coord[0], coord[1], mb, nb);

    /* Sanity checks as necessary (such as matrix compatibility) */

    if (sqrt(mb * nb) != mb || sqrt(mb * nb) != nb) {
        my_abort("id = %d, matrix B is not squared", id);
    }

    if (ma != mb || na != nb) {
        my_abort("id = %d, matrix A and B have different dimensions", id);
    }

    if (ma % p_sqrt != 0) {
        my_abort("id = %d, sqrt(%d) = %d cannot divide the number of rows %d", id, p, p_sqrt, ma);
    }

    if (na % p_sqrt != 0) {
        my_abort("id = %d, sqrt(%d) = %d cannot divide the number of columns %d", id, p, p_sqrt, na);
    }

    /* THE CANNON’S ALGORITHM STARTS HERE */

    n = ma / p_sqrt; /* IMPORTANT: we don't have the entire matrix, only the sub */

    int source, dest;

    datatype **partial_c_matrix = init_partial_c_matrix(n);

    MPI_Cart_shift(comm, 1, coord[0], &source, &dest);
    MPI_Sendrecv_replace(sa, n * n, mpitype, dest, 0, source, 0, comm, MPI_STATUS_IGNORE);

    MPI_Cart_shift(comm, 0, coord[1], &source, &dest);
    MPI_Sendrecv_replace(sb, n * n, mpitype, dest, 0, source, 0, comm, MPI_STATUS_IGNORE);

    reconstruct_matrix(n, n, a, sa);
    reconstruct_matrix(n, n, b, sb);

    for (int i = 0; i < p_sqrt; ++i) {
        c = init_local_c(n, c, sc);

        mat_mul(n, a, b, c);

        sum(n, c, partial_c_matrix);

        MPI_Cart_shift(comm, 1, 1, &source, &dest);
        MPI_Sendrecv_replace(sa, n * n, mpitype, dest, 0, source, 0, comm, MPI_STATUS_IGNORE);

        MPI_Cart_shift(comm, 0, 1, &source, &dest);
        MPI_Sendrecv_replace(sb, n * n, mpitype, dest, 0, source, 0, comm, MPI_STATUS_IGNORE);

        reconstruct_matrix(n, n, a, sa);
        reconstruct_matrix(n, n, b, sb);
    }

    /* write the submatrix of C managed by this process */
    write_checkerboard_matrix(argv[3], (void **) partial_c_matrix, mpitype, ma, mb, comm);

    /* final timing */
    elapsed_time += MPI_Wtime();

    if (!id) {
        printf("elapsed time: %lf\n", elapsed_time);
    }

    MPI_Finalize();

    return 0;
}

As we can see, this is not a simple piece of code, but it presents the core elements of Cannon's algorithm. The entire source code can be found in this GitHub Repository.

In Spark, the same can be achieved by simple constructs. The following code snippet shows this process:

def multiply(input: String)(implicit sc: SparkContext) = {
    val mARows = sc
      .textFile(input)
      .zipWithIndex()
      .map { case (line, idx) =>
        val values = line.split(" ").map(_.toDouble)

        IndexedRow(idx, Vectors.dense(values))
      }
      .repartition(16)

    val blockMat = new IndexedRowMatrix(mARows).toBlockMatrix()

    time(blockMat.multiply(blockMat).blocks.count())._2
  }

As we can see, it is very simplistic, but let's see how fast it is compared to our MPI version.

Using MPI

1000x1000 matrix multiplication

mpirun -np 16 multiply 1000x1000.in 1000x1000.in 1000x1000.out  
elapsed time: 2.082910 // around 2 seconds

10000x10000 matrix multiplication

mpirun -np 16 multiply 10000x10000.in 10000x10000.in 10000x10000.out  
elapsed time: 307200 // around 5 mins

In Spark

1000x1000 matrix multiplication

multiply("1000x1000.txt")(sc)  
res19: Long= 4118 // around 4 seconds

10000x10000 matrix multiplication

multiply("10000x10000.txt")(sc)  
res0: Long= 973943 // around 16 mins

As we can see, the time taken by Spark increases incredibly fast, yet MPI is able to run the process with a small time penalty. At the same time, when multiplying 10000x10000 matrices, I ran out of heap space in Spark, and it had to be increased to 12 g per executor to run this operation.

Again, there is a trade-off we cannot avoid. That is simplicity and reliability from Spark, compared to high performance but difficulty of coding and maintaining from MPI.

At the end of the day, it is all about choosing the right tool for the job, and the MapR Data Platform is able to run without any problems workloads build on Spark or MPI.

Notes on MPI Reading from Files

In order to multiply matrices, our MPI application must read the corresponding input from files. These files are stored in the MapR XD Distributed File and Object Store (MapR XD), which supports different interfaces to interact with it, such as HDFS, POSIX, and others.

Our C code for MPI uses regular POSIX calls to read the matrices, so that each process reads only a part of the matrix (partitions) that we call blocks. This is a parallel process since all processors execute this operation at the same time, ultimately parallelizing the I/O as much as the underlying hardware allows it.

Spark, on the other hand, uses, when possible, data locality, which helps with speeding up the process, since each processor reads its data locally from the node it is running on. In MPI, that is dependable on the MapR POSIX client and its internal implementation.

Conclusions

Even though MPI presents some limitations when interacting with MapR XD, the overall processing time is quite impressive when compared to what Spark offers for matrix multiplication. Spark, on the other hand, offers constructs for doing these operations with ease while keeping the entire process reliable and fault-tolerant, features lacking on MPI.

The trade-off between simple usage and high performance has been there for years and cannot be ignored; being aware of it helps us to decide on every situation.

All parts of this blog post series:


This blog post was published March 07, 2019.
Categories

50,000+ of the smartest have already joined!

Stay ahead of the bleeding edge...get the best of Big Data in your inbox.


Get our latest posts in your inbox

Subscribe Now