Using the Custom Partitioner

In any distributed computing system, partitioning data is crucial to achieving the best performance. Spark provides a mechanism for registering a custom partitioner to control how the data in a pipeline is partitioned.
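
For background, a Spark custom partitioner is a class that extends org.apache.spark.Partitioner. The following is a minimal, generic sketch of that contract (not the connector's implementation): it maps each key to one of a fixed number of buckets based on an array of split points.

import org.apache.spark.Partitioner

// Generic sketch of Spark's partitioner contract: a key belongs to the first
// range whose upper bound is greater than the key.
class RangeBySplits(splits: Array[String]) extends Partitioner {
  override def numPartitions: Int = splits.length + 1
  override def getPartition(key: Any): Int = {
    val k = key.toString
    val idx = splits.indexWhere(split => k < split)
    if (idx == -1) splits.length else idx
  }
}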

The MapR-DB OJAI Connector for Apache Spark includes a custom partitioner that takes the following classes as keys:

  • String
  • ByteBuffer (as serializable ByteBuffer)
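
For orientation, the following minimal sketch (assuming a /srctable table and the connector import com.mapr.db.spark._) shows how keys of each supported type are typically produced, using the same calls that appear in the examples below:

// Keys as Strings: use the document's _id field
val byString = sc.loadFromMapRDB("/srctable").keyBy(doc => doc._id[String])

// Keys as serializable ByteBuffers: use getIdBinarySerializable()
val byBinary = sc.loadFromMapRDB("/srctable").keyBy(doc => doc.getIdBinarySerializable())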

This custom partitioner can be registered with either the partitionBy function or the repartitionAndSortWithinPartitions function.

There are two versions of the custom partitioner. One version takes the name of a MapR-DB JSON table as input. The partition information of that table is used to partition the data, so the saveToMapRDB call can use bulkInsert to store the data. bulkInsert requires the data to already be sorted on the _id key for a full bulkInsert.

The other version of the custom partitioner takes an array of splits as input. The following examples show the use of both versions of the custom partitioner.

Specifying tablename for the Partitioner

If you already have a table that has been created and partitioned based on a set of keys, you can specify that the RDD be partitioned in the same way (using the same set of keys). In the following example, /dsttable provides the reference partitioning for the data loaded from /srctable:

sc.loadFromMapRDB("/srctable")
  .keyBy(doc => doc._id[String])
  .repartitionAndSortWithinPartitions(MapRDBSpark.newPartitioner[String]("/dsttable"))
  .saveToMapRDB("/dsttable", createTable = false, bulkInsert = true)

Specifying a String Seq as Splits

In the following example, the first line creates an array of three splits: id1, id4, and id7. The rest of the example partitions the RDD based on those splits:

val dstSplits: Array[String] = (1 to 9 by 3).map("id" + _).toArray  // Array("id1", "id4", "id7")
sc.loadFromMapRDB("/srctable")
  .keyBy(doc => doc._id[String])
  .repartitionAndSortWithinPartitions(MapRDBSpark.newPartitioner[String](dstSplits))
  .saveToMapRDB("/dsttable", createTable = true, bulkInsert = true)

Specifying a ByteBuffer Seq as Splits

Suppose you have an array of byte buffers to be given as splits to the partitioner. The byte buffers must be converted to serializable byte buffers before they are passed to the partitioner:

// Convert each ByteBuffer to a serializable ByteBuffer
val dstSplits = arrayOfByteBuffer.map(x => MapRDBSpark.serializableBinaryValue(x))
sc.loadFromMapRDB("/srctable")
  // Key each document by a binary field as a serializable ByteBuffer
  .keyBy(doc => doc.getBinarySerializable(binaryField))
  .repartitionAndSortWithinPartitions(MapRDBSpark.newPartitioner(dstSplits))
  .saveToMapRDB("/dsttable", createTable = true, bulkInsert = true)

Specifying tablename for the Partitioner with ByteBuffer as Id Fields

Suppose you have a table whose keys are binary (ByteBuffer), and an RDD that you want to repartition based on the partitions of that table. The following example reads the documents from /srctable, but you could provide any table. The second line keys each document by its _id as a serializable ByteBuffer. In the last line, /dsttable supplies the reference partitioning for the RDD keyed by serializable ByteBuffers:

sc.loadFromMapRDB("/srctable")
  .keyBy(doc => doc.getIdBinarySerializable())
  .repartitionAndSortWithinPartitions(MapRDBSpark.newPartitioner[ByteBuffer]("/dsttable"))

Note: You must provide the key type of the pair RDD on which the partitioning is specified. If the ids are serializable ByteBuffers, specify ByteBuffer. Otherwise, specify String.
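
For example, the two forms differ only in the type parameter:

// String keys
val stringPartitioner = MapRDBSpark.newPartitioner[String]("/dsttable")

// Serializable ByteBuffer keys (requires java.nio.ByteBuffer in scope)
val binaryPartitioner = MapRDBSpark.newPartitioner[ByteBuffer]("/dsttable")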

Once the data is partitioned with the custom partitioner, all downstream transformations should be non-partition-changing transformations, so the partitioning is preserved. Here is the code for applying the partitioner to an RDD:

user_profiles.repartitionAndSortWithinPartitions(MapRDBSpark.newPartitioner[String](<table-name>))

Or you can use the partitionBy function on the RDD:

user_profiles.partitionBy(MapRDBSpark.newPartitioner[String](<table-name>))
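
Whether a transformation preserves the partitioning can be checked on the RDD itself. In standard Spark, for example, mapValues preserves the partitioner while map discards it. A minimal sketch, assuming user_profiles is keyed by String and /dsttable exists:

val partitioned = user_profiles.partitionBy(MapRDBSpark.newPartitioner[String]("/dsttable"))

val kept = partitioned.mapValues(identity) // mapValues preserves the partitioner
val lost = partitioned.map(identity)       // map discards the partitioner

assert(kept.partitioner == partitioned.partitioner)
assert(lost.partitioner.isEmpty)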

The key of the data supplied to this partitioner should be of the same type as the key of the table. The number of partitions is calculated from the table's existing tablet information, so this partitioner yields a single partition if the supplied table is not pre-split.

For a table created with the bulkInsert option set to true:

  • If the table is pre-split, the resulting number of partitions can be greater than 1.
  • If the table is not pre-split, the resulting number of partitions will be 1 if no partition information is available from the RDD lineage (see the sketch below).
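
As a quick way to verify this behavior, the following minimal sketch (table paths assumed) prints the resulting partition count:

val partitionedRDD = sc.loadFromMapRDB("/srctable")
  .keyBy(doc => doc._id[String])
  .partitionBy(MapRDBSpark.newPartitioner[String]("/dsttable"))

// 1 if /dsttable is not pre-split; otherwise the number of tablets
println(s"Number of partitions: ${partitionedRDD.getNumPartitions}")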