Bulk Loading Data into HBase with Spark

There are two options for bulk loading data into HBase with Spark:

Note: The bulk load operation is currently not supported for MapR Database.
Basic bulk load functionality
The basic bulk load functionality works for cases where your rows have millions of columns and cases where your columns are not consolidated.
Thin-record bulk load option
The thin-record bulk load option with Spark is designed for tables that have fewer then 10,000 columns per row. The advantage of this option is higher throughput and less overall load on the Spark shuffle operation.

Both implementations work more or less like the MapReduce bulk load process. A partitioner partitions the RowKeys based on region splits, and the RowKeys are sent to the reducers in order, so that HFiles can be written directly from the reduce phase.

In Spark terms, the bulk load is implemented around a SparkrepartitionAndSortWithinPartitions followed by a Spark foreachPartition. Here is an example of using the basic bulk load functionality:

Bulk Loading Example

Note: Before executing the following example by using Spark Shell, you must create a table in HBase Shell. Run the code in :paste mode.
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.mapreduce.{LoadIncrementalHFiles, TableInputFormat}
import org.apache.hadoop.hbase.spark._
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.util.Bytes._
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.client.{HBaseAdmin, HConnectionManager}
 val tableName = "table1"
    val stagingFolder = "/home/mapr"
    val columnFamily1 = "cf1"
    @transient val conf = HBaseConfiguration.create()
    val hbaseContext = new HBaseContext(sc, conf)
    conf.set(TableInputFormat.INPUT_TABLE, tableName)
    conf.set("hbase.zookeeper.quorum", "node1.cluster.com")
    conf.setInt("hbase.zookeeper.property.clientPort", 5181)
    val rdd = sc.parallelize(Array(
      (toBytes("1"), (toBytes(columnFamily1), toBytes("a"), toBytes("foo1"))),
      (toBytes("3"), (toBytes(columnFamily1), toBytes("b"), toBytes("foo2.b")))
    ))
    rdd.hbaseBulkLoad(hbaseContext,
      TableName.valueOf(tableName),
      t => {
        val rowKey = t._1
        val family: Array[Byte] = t._2._1
        val qualifier = t._2._2
        val value: Array[Byte] = t._2._3
        val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)
        Seq((keyFamilyQualifier, value)).iterator
      },
      stagingFolder)
    val connection = HConnectionManager.createConnection(conf)
    val table = connection.getTable(TableName.valueOf(tableName))
    val load = new LoadIncrementalHFiles(conf)
    load.doBulkLoad(
      new Path(stagingFolder),
      connection.getAdmin,
      table,
      connection.getRegionLocator(TableName.valueOf(tableName)))

Required Parameters for Bulk Loading with Spark

The hbaseBulkLoad function takes three required parameters:
  • The name of the table you intend to bulk load to.
  • A function that converts a record in the RDD to a tuple key-value pair, with the tuple key being a KeyFamilyQualifer object and the value being the cell value. The KeyFamilyQualifer object holds the RowKey, Column Family, and Column Qualifier. The shuffle partitions on the RowKey but sorts by all three values.
  • The temporary path for the HFile to be written out to. Following the Spark bulk load command, use the HBase LoadIncrementalHFiles object to load the newly created HFiles into HBase.

Additional Parameters for Bulk Loading with Spark

You can set the following attributes with additional parameter options on hbaseBulkLoad:
  • Max file size of the HFiles
  • A flag to exclude HFiles from compactions
  • Column Family settings for compression, bloomType, blockSize, and dataBlockEncoding
The following example shows the use of additional parameters:
Note: Before executing the following example by using Spark Shell, you must create a table in HBase Shell. Run the code in :paste mode.
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.HConnectionManager
import org.apache.hadoop.hbase.mapreduce.{LoadIncrementalHFiles, TableInputFormat}
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.spark.sql.SparkSession
  val tableName = "table2"
    val stagingFolder = "/home/mapr"
    val columnFamily1 = "cf1"
    val sc = spark.sparkContext
    @transient val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, tableName)
    conf.set("hbase.zookeeper.quorum", "node1.cluster.com")
    conf.setInt("hbase.zookeeper.property.clientPort", 5181)
    val hbaseContext = new HBaseContext(sc, conf)
    val rdd = sc.parallelize(Array(
      (Bytes.toBytes("1"),
        (Bytes.toBytes(columnFamily1),
          Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
      (Bytes.toBytes("3"),
        (Bytes.toBytes(columnFamily1),
          Bytes.toBytes("b"),
          Bytes.toBytes("foo2.b")))))
    val familyHBaseWriterOptions =
      new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
    val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX")
    familyHBaseWriterOptions.put(Bytes.toBytes(columnFamily1), f1Options)
    rdd.hbaseBulkLoad(hbaseContext,
      TableName.valueOf(tableName),
      t => {
        val rowKey = t._1
        val family:Array[Byte] = t._2._1
        val qualifier = t._2._2
        val value = t._2._3
        val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)
        Seq((keyFamilyQualifier, value)).iterator
      },
      stagingFolder,
      familyHBaseWriterOptions,
      compactionExclude = false,
      HConstants.DEFAULT_MAX_FILE_SIZE)
    val connection = HConnectionManager.createConnection(conf)
    val table = connection.getTable(TableName.valueOf(tableName))
    val load = new LoadIncrementalHFiles(conf)
    load.doBulkLoad(new Path(stagingFolder),
      connection.getAdmin, table, connection.getRegionLocator(TableName.valueOf(tableName)))

Thin-Record Bulk Load Example

The following example shows how to call the thin-record bulk load implementation:
Note: Before executing the following example by using Spark Shell, you must create a table in HBase Shell. Run the code in :paste mode.
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.HConnectionManager
import org.apache.hadoop.hbase.mapreduce.{LoadIncrementalHFiles, TableInputFormat}
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.spark.{HBaseContext, _}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.sql.SparkSession
val tableName = "table3"
    val stagingFolder = "/home/mapr"
    val columnFamily1 = "cf1"
    @transient val conf = HBaseConfiguration.create()
    val hbaseContext = new HBaseContext(sc, conf)
    conf.set(TableInputFormat.INPUT_TABLE, tableName)
    conf.set("hbase.zookeeper.quorum", "node1.cluster.com")
    conf.setInt("hbase.zookeeper.property.clientPort", 5181)
    val rdd = sc.parallelize(Array(
      ("1",  List(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
      ("3", List(Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b")))))
    rdd.hbaseBulkLoadThinRows(hbaseContext,
      TableName.valueOf(tableName),
      t => {
        val rowKey = t._1
        val familyQualifiersValues = new FamiliesQualifiersValues
        val q = t._2
        val family:Array[Byte] = q.head
        val qualifier = q(1)
        val value:Array[Byte] = q(2)
        println(s"family: $family")
        println(s"qualifier: $qualifier")
        println(s"value: $value")
        familyQualifiersValues +=(family, qualifier, value)
        (new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues)}, stagingFolder, new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions], compactionExclude = false, 20)
    val connection = HConnectionManager.createConnection(conf)
    val table = connection.getTable(TableName.valueOf(tableName))
    val load = new LoadIncrementalHFiles(conf)
    load.doBulkLoad(
      new Path(stagingFolder),
      connection.getAdmin,
      table,
      connection.getRegionLocator(TableName.valueOf(tableName)))

The big difference in using bulk load for thin rows is that the function returns a tuple with the first value being the RowKey and the second value being an object of FamiliesQualifiersValues. FamiliesQualifiersValues contains all the values for this row for all column families.