Bulk Loading Data into HBase with Spark

There are two options for bulk loading data into HBase with Spark:

Note: The bulk load operation is currently not supported for MapR-DB.

Basic bulk load functionality
The basic bulk load functionality works for cases where your rows have millions of columns and for cases where your columns are not consolidated before the bulk load.

Thin-record bulk load option
The thin-record bulk load option with Spark is designed for tables that have fewer than 10,000 columns per row. The advantage of this second option is higher throughput and less overall load on the Spark shuffle operation.

Both implementations work more or less like the MapReduce bulk load process: a partitioner partitions the row keys based on region splits, and the row keys are sent to the reducers in order so that HFiles can be written out directly from the reduce phase. In Spark terms, the bulk load is implemented around a Spark repartitionAndSortWithinPartitions followed by a Spark foreachPartition.
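
To make that concrete, the following standalone sketch shows the same shuffle pattern in plain Spark. The RegionSplitPartitioner class, its hard-coded split point, and the sample (rowKey, value) pairs are illustrative assumptions only; the real bulk load builds its partitioner from the table's region boundaries and writes HFiles rather than printing.

import org.apache.spark.{Partitioner, SparkContext}

// Hypothetical stand-in for the partitioner that the bulk load derives from the
// table's region boundaries; here the split points are just hard-coded strings.
class RegionSplitPartitioner(splits: Array[String]) extends Partitioner {
  override def numPartitions: Int = splits.length + 1
  override def getPartition(key: Any): Int =
    // A key belongs to the partition whose start key is the greatest split point
    // that is still less than or equal to the key.
    splits.count(split => key.toString >= split)
}

val sc = new SparkContext("local", "test")

// (rowKey, value) pairs arriving in no particular order.
val cells = sc.parallelize(Seq(("3", "foo2.b"), ("1", "foo1"), ("2", "bar")))

cells
  // Route each row key to the partition for its region and sort within each
  // partition, so every partition sees its keys in HFile order.
  .repartitionAndSortWithinPartitions(new RegionSplitPartitioner(Array("2")))
  // In the real bulk load, this is where each sorted partition is written out as HFiles.
  .foreachPartition(partition =>
    partition.foreach { case (rowKey, value) => println(s"$rowKey -> $value") })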

Here is an example of using the basic bulk load functionality:

Bulk Loading Example

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
import org.apache.hadoop.hbase.spark.{HBaseConnectionCache, HBaseContext, KeyFamilyQualifier}
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext

val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()

val hbaseContext = new HBaseContext(sc, config)

val tableName = "/my_table"
val stagingFolder = "/home/mapr"
val columnFamily1 = "cf1"

// Each record is (rowKey, (columnFamily, qualifier, value)).
val rdd = sc.parallelize(Array(
  (Bytes.toBytes("1"), (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
  (Bytes.toBytes("3"), (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b")))
))

// Shuffle, sort, and write the HFiles to the staging folder.
rdd.hbaseBulkLoad(hbaseContext,
  TableName.valueOf(tableName),
  t => {
    val rowKey = t._1
    val family: Array[Byte] = t._2._1
    val qualifier = t._2._2
    val value: Array[Byte] = t._2._3

    val keyFamilyQualifier = new KeyFamilyQualifier(rowKey, family, qualifier)

    Seq((keyFamilyQualifier, value)).iterator
  },
  stagingFolder)

// Load the newly created HFiles into the table.
val connection = HBaseConnectionCache.getConnection(config)
val table = connection.getTable(TableName.valueOf(tableName))
val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(
  new Path(stagingFolder),
  connection.getAdmin,
  table,
  connection.getRegionLocator(TableName.valueOf(tableName)))

Required Parameters for Bulk Loading with Spark

In addition to the HBaseContext, the hbaseBulkLoad function takes three required parameters:
  • The table name of the table you intend to bulk load to.
  • A function that converts a record in the RDD to a tuple key-value pair, with the tuple key being a KeyFamilyQualifier object and the value being the cell value. The KeyFamilyQualifier object holds the RowKey, Column Family, and Column Qualifier. The shuffle partitions on the RowKey but sorts by all three values (see the sketch after this list).
  • The temporary path for the HFile to be written out to. Following the Spark bulk load command, use the HBase LoadIncrementalHFiles object to load the newly created HFiles into HBase.
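
For illustration, the conversion function from the basic example can also be written as a named method. The toKeyValue name and the record type below are assumptions that match the RDD used in that example; any record type works as long as the function produces an iterator of (KeyFamilyQualifier, value) pairs.

// Hypothetical helper: converts one record of the shape used in the basic example,
// (rowKey, (family, qualifier, value)), into the (KeyFamilyQualifier, value) pairs
// that hbaseBulkLoad shuffles, sorts, and writes out.
def toKeyValue(record: (Array[Byte], (Array[Byte], Array[Byte], Array[Byte]))):
    Iterator[(KeyFamilyQualifier, Array[Byte])] = {
  val (rowKey, (family, qualifier, value)) = record
  Seq((new KeyFamilyQualifier(rowKey, family, qualifier), value)).iterator
}

// Used in place of the inline lambda:
// rdd.hbaseBulkLoad(hbaseContext, TableName.valueOf(tableName), toKeyValue, stagingFolder)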

Additional Parameters for Bulk Loading with Spark

You can set the following attributes with additional parameter options on hbaseBulkLoad (see the sketch after this list):
  • Max file size of the HFiles
  • A flag to exclude HFiles from compactions
  • Column Family settings for compression, bloomType, blockSize, and dataBlockEncoding
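
As a quick illustration of the column family settings, the following sketch builds the options map on its own; the family name cf1 and the 64 KB block size are arbitrary illustration values.

// Per-column-family HFile write settings, keyed by the column family bytes.
// FamilyHFileWriteOptions takes, in order: compression codec ("GZ"), bloom filter
// type ("ROW"), block size in bytes, and data block encoding ("PREFIX").
val familyOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
familyOptions.put(Bytes.toBytes("cf1"),
  new FamilyHFileWriteOptions("GZ", "ROW", 65536, "PREFIX"))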

The following example shows the use of additional parameters:

val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()

val hbaseContext = new HBaseContext(sc, config)

val stagingFolder = ...
val rdd = sc.parallelize(Array(
  (Bytes.toBytes("1"),
    (Bytes.toBytes(columnFamily1),
      Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
  (Bytes.toBytes("3"),
    (Bytes.toBytes(columnFamily1),
      Bytes.toBytes("b"),
      Bytes.toBytes("foo2.b"))), ...

val familyHBaseWriterOptions =
  new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX")

familyHBaseWriterOptions.put(Bytes.toBytes(columnFamily1), f1Options)

rdd.hbaseBulkLoad(hbaseContext,
  TableName.valueOf(tableName),
  t => {
    val rowKey = t._1
    val family: Array[Byte] = t._2._1
    val qualifier = t._2._2
    val value = t._2._3

    val keyFamilyQualifier = new KeyFamilyQualifier(rowKey, family, qualifier)

    Seq((keyFamilyQualifier, value)).iterator
  },
  stagingFolder.getPath,
  familyHBaseWriterOptions,
  compactionExclude = false,
  HConstants.DEFAULT_MAX_FILE_SIZE)

val conn = HBaseConnectionCache.getConnection(config)
val table = conn.getTable(TableName.valueOf(tableName))
val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(new Path(stagingFolder.getPath),
  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))

Thin-Record Bulk Load Example

The following example shows how to call the thin-record bulk load implementation:

val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()

val hbaseContext = new HBaseContext(sc, config)

val stagingFolder = ...
// Each record holds a row key plus a collection of all (family, qualifier, value)
// cells for that row, so the row is consolidated before the shuffle.
val rdd = sc.parallelize(Array(
      ("1",
        List((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1")))),
      ("3",
        List((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b")))), ...

rdd.hbaseBulkLoadThinRows(hbaseContext,
      TableName.valueOf(tableName),
      t => {
        val rowKey = t._1

        val familyQualifiersValues = new FamiliesQualifiersValues
        t._2.foreach(f => {
          val family:Array[Byte] = f._1
          val qualifier = f._2
          val value:Array[Byte] = f._3

          familyQualifiersValues += (family, qualifier, value)
        })
        (new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues)
      },
      stagingFolder.getPath,
      new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions],
      compactionExclude = false,
      20) // maxSize: the maximum HFile size, in bytes

val conn = HBaseConnectionCache.getConnection(config)
val table = conn.getTable(TableName.valueOf(tableName))
val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(
  new Path(stagingFolder.getPath),
  conn.getAdmin,
  table,
  conn.getRegionLocator(TableName.valueOf(tableName)))

The big difference when bulk loading thin rows is that the conversion function returns a tuple whose first value is the row key and whose second value is a FamiliesQualifiersValues object. FamiliesQualifiersValues contains all the values for this row, across all column families.
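
For illustration, the following sketch (with made-up family, qualifier, and value bytes) shows how one thin-record pairs a wrapped row key with a FamiliesQualifiersValues object that consolidates several cells, possibly across column families:

// Collect every cell that belongs to row "1" into one FamiliesQualifiersValues object.
val familyQualifiersValues = new FamiliesQualifiersValues
familyQualifiersValues += (Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
familyQualifiersValues += (Bytes.toBytes("cf1"), Bytes.toBytes("b"), Bytes.toBytes("foo2"))
familyQualifiersValues += (Bytes.toBytes("cf2"), Bytes.toBytes("c"), Bytes.toBytes("bar"))

// The tuple handed back to hbaseBulkLoadThinRows: the wrapped row key plus all of its cells.
val record = (new ByteArrayWrapper(Bytes.toBytes("1")), familyQualifiersValues)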