How to Use a Table Load Tool to Batch Puts into HBase/MapR Database


Apache HBase is an in-Hadoop database that delivers wide-column schema flexibility with strongly consistent reads and writes. Clients can access HBase data through the native Java API, a Thrift or REST gateway, or, more recently, a C API, making the data easy to reach from many environments. MapR Database, another in-Hadoop database, exposes the same HBase APIs but adds enterprise-grade features for production deployments of HBase applications.
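
For a sense of what the native Java API looks like, here is a minimal read sketch using the same 0.98-era client classes as the tool below. The table name "mytable" and the "f0"/"c0" family and column are illustrative only, not part of the tool:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class GetExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "mytable"); // illustrative table name
        // fetch one row and read a single cell from it
        Result result = table.get(new Get(Bytes.toBytes("row1")));
        byte[] value = result.getValue(Bytes.toBytes("f0"), Bytes.toBytes("c0"));
        System.out.println(value == null ? "(no value)" : Bytes.toString(value));
        table.close();
    }
}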

Put, Get, and Scan are the most prominent programming APIs used in HBase applications. For write-heavy workloads, individual Put operations can become a bottleneck, so batching Puts is a common technique for increasing the overall throughput of the system. The program below is a table load tool, a utility for batching Puts into an HBase/MapR Database table. It creates a simple table with a single column within one column family and inserts 100,000 rows of 100 bytes each; the batch size for the Puts is set to 500 in this example.
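
Stripped to its essence, the batching technique is: turn off autoflush so the client buffers writes, build a List of Puts, and submit the list in a single call. A minimal sketch, assuming a pre-existing table named "mytable" with a family "f0" (both names are illustrative):

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class BatchPutExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "mytable"); // illustrative table name
        table.setAutoFlush(false); // buffer Puts client-side instead of one RPC per Put
        List<Put> puts = new ArrayList<Put>();
        for (int i = 0; i < 500; i++) { // one batch of 500 Puts
            Put p = new Put(Bytes.toBytes("user" + i));
            p.add(Bytes.toBytes("f0"), Bytes.toBytes("c0"), Bytes.toBytes("value" + i));
            puts.add(p);
        }
        table.put(puts);      // submit the whole batch in one call
        table.flushCommits(); // push any still-buffered Puts to the server
        table.close();
    }
}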

To get free training on HBase, including Data Modeling & Architecture and Schema Design, please visit the Free Hadoop On Demand Training pages.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.zip.CRC32;

import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

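// Multi-threaded load tool that batches Puts into an HBase/MapR Database
// table and reports throughput and latency while it runs.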
public class LoadTableMTBatch {

    static long uniqueSeed = System.currentTimeMillis();
    static long[] count;
    static long[] latency;
    static int[] keySizes;
    public static long printPerNum = 10000;
    public static boolean noCRC = false;
    public static long keySize = 8;
    public static long startRow = 0;
    public static int batchSize = 500;
    public static int preSplit = 1; // Used as a demo - not an accurate key distribution
    public static boolean flush = false;
    public static boolean autoFlush = false;
    public static final String KEY_PREFIX = "user";
    public static final long startKey = 0L;
    public static final long endKey = 999999999999999L;
    public static final String HBASE_RESOURCE_NAME = "/opt/mapr/hbase/hbase-0.98.9/conf/hbase-site.xml";
    public static String ZOOKEEPER_NODES = "localhost"; //Default to localhost, only needed for accessing HBase
    public static final Pair<String, String> ZOOKEEPER_SETTINGS = new Pair<String, String>(
            "hbase.zookeeper.quorum", ZOOKEEPER_NODES);

    public static void usage(String arg) {
        System.err.println("bad token: " + arg);
        System.err
                .println("loadMT -rows <100000> -valuesize <100 bytes> -debug -path <table> -threads <10> -batchSize <500> -numCF <1> -numC <1> -preSplit <1> -zookeeperNodes <nodes> -AutoFlush -flush");
        System.exit(1);
    }

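    // Parses the command-line flags, creates/initializes the table, then
    // starts the load threads and the progress monitor.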
    public static void main(String[] args) throws java.io.IOException {
        Configuration conf = HBaseConfiguration.create();
        String tableName = null;
        long numRows = 100000;
        long numCF = 1;
        long numC = 1;
        long valueSize = 100;
        int numThreads = 10;
        boolean augment = false;

        for (int i = 0; i < args.length; ++i) {
            if (args[i].equals("-rows")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                numRows = Long.parseLong(args[i]);
            } else if (args[i].equals("-path")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                tableName = args[i];
            } else if (args[i].equals("-debug")) {
                conf.set("fs.mapr.trace", "debug");
            } else if (args[i].equals("-valuesize")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                valueSize = Long.parseLong(args[i]);
            } else if (args[i].equals("-threads")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                numThreads = Integer.parseInt(args[i]);
            } else if (args[i].equals("-p")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                printPerNum = Long.parseLong(args[i]);
            } else if (args[i].equals("-hbase")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                conf.addResource(new Path(args[i]));
            } else if (args[i].equals("-numCF")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                numCF = Integer.parseInt(args[i]);
            } else if (args[i].equals("-numC")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                numC = Integer.parseInt(args[i]);
            } else if (args[i].equals("-batchSize")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                batchSize = Integer.parseInt(args[i]);
            } else if (args[i].equals("-preSplit")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                preSplit = Integer.parseInt(args[i]);
            } else if (args[i].equals("-zookeeperNodes")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                ZOOKEEPER_NODES = args[i];
            } else if (args[i].equals("-AutoFlush")) {
                autoFlush = true;
            } else if (args[i].equals("-flush")) {
                flush = true;
            } else {
                usage(args[i]);
            }
        }
        if (tableName == null) {
            System.out.println("Must specify path");
            usage("path");
        }
        LoadTableMTBatch lt = new LoadTableMTBatch();
        try {
            LoadTableMTBatch.init(conf, tableName, numRows, numCF, numC,
                    valueSize, augment);
            lt.loadTable(conf, tableName, numRows, numCF, numC, valueSize,
                    numThreads);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }
    }

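    // Pre-computes ten bucketed random key sizes (a few bytes up to ~32 KB);
    // used only when keySize < 1 to exercise variable-length row keys.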
    public void generateKeySizes() {
        Random rand = new Random(uniqueSeed);
        keySizes = new int[10];
        keySizes[0] = rand.nextInt(5) + 5;
        keySizes[1] = rand.nextInt(40) + 10;
        keySizes[2] = rand.nextInt(50) + 50;
        keySizes[3] = rand.nextInt(400) + 100;
        keySizes[4] = rand.nextInt(500) + 500;
        keySizes[5] = rand.nextInt(4000) + 1000;
        keySizes[6] = rand.nextInt(5000) + 5000;
        keySizes[7] = rand.nextInt(10000) + 10000;
        keySizes[8] = rand.nextInt(12000) + 20000;
        keySizes[9] = rand.nextInt(32 * 1024 - 1) + 1;
    }

    public void loadTable(Configuration conf, String tableName, long numRows,
            long numCF, long numC, long valueSize, int numThreads)
            throws Exception {
        Thread[] loadThreads = new Thread[numThreads];
        count = new long[numThreads];
        latency = new long[numThreads];

        if (keySize < 1) {
            generateKeySizes();
        }

        long offset = (endKey - startKey) / numThreads;
        for (int i = 0; i < loadThreads.length; i++) {
            latency[i] = 0;
            if (preSplit <= 1) {
                loadThreads[i] = new Thread(new LoadTableRunnable(conf,
                        tableName, numRows, numCF, numC, valueSize, i,
                        numThreads, batchSize));
            } else {
                loadThreads[i] = new Thread(new LoadTableRunnable(conf,
                        tableName, numRows, numCF, numC, valueSize, i,
                        numThreads, batchSize, startKey + i * offset,
                        (startKey + (i + 1) * offset) - 1));
            }
        }
        for (int i = 0; i < loadThreads.length; i++) {
            loadThreads[i].start();
        }

        // Monitor the load threads once a second, reporting insert counts,
        // throughput, and per-batch latency until all threads finish.
        long inserts = 0, insertsOld = 0, rate = 0, overallRate = 0;
        long ta = 0, tb = 0, t0 = 0, elapsedTime = 0;
        long averageLatency = 0, minLatency = 0, maxLatency = 0;
        boolean alive = true;
        t0 = System.currentTimeMillis();
        tb = t0;
        while (true) {
            alive = false;
            for (int i = 0; i < loadThreads.length; i++) {
                if (loadThreads[i].isAlive()) {
                    alive = true;
                }
            }
            ta = System.currentTimeMillis();
            inserts = getSum(count);
            rate = (inserts - insertsOld) * 1000 / Math.max(1, ta - tb);
            overallRate = inserts * 1000 / Math.max(1, ta - t0);
            insertsOld = inserts;
            tb = ta;
            elapsedTime = (ta - t0) / 1000;
            // min, max and average latency across the load threads
            synchronized (latency) {
                long[] sorted = latency.clone();
                Arrays.sort(sorted);
                minLatency = sorted[0];
                maxLatency = sorted[sorted.length - 1];
                averageLatency = getSum(sorted) / sorted.length;
            }
            System.out.println("Elapsed time: " + elapsedTime + " sec; inserts: "
                    + inserts + "; current rate: " + rate + " rows/sec; overall rate: "
                    + overallRate + " rows/sec; latency per batch of " + batchSize
                    + " Puts (avg/min/max): " + averageLatency / 1000000L + "ms/"
                    + minLatency / 1000000L + "ms/" + maxLatency / 1000000L + "ms");
            if (!alive) {
                break;
            }
            Thread.sleep(1000); // print-out interval
        }
        for (int i = 0; i < loadThreads.length; i++) {
            loadThreads[i].join();
        }
    }

    public static long getSum(long[] array) {
        long sum = 0;
        for (long l : array) {
            sum += l;
        }
        return sum;
    }

    public static void createTable(Configuration conf, String tableName,
            long numCF) throws Exception {
        HBaseAdmin admin = new HBaseAdmin(conf);
        System.out.println("Created admin object");
        HTableDescriptor des = new HTableDescriptor(tableName.getBytes());
        for (int i = 0; i < numCF; i++) {
            des.addFamily(new HColumnDescriptor("f" + i));
        }
        try {
            if (preSplit <= 1) {
                admin.createTable(des);
            } else {
                byte[] startKeyByte = Bytes.toBytes(KEY_PREFIX + startKey);
                byte[] endKeyByte = Bytes.toBytes(KEY_PREFIX + endKey);
                admin.createTable(des, startKeyByte, endKeyByte, preSplit);
            }
        } catch (TableExistsException te) {
            te.printStackTrace();
        } catch (IOException ie) {
            ie.printStackTrace();
        }
    }

    public static void init(Configuration conf, String tableName, long numRows,
            long numCF, long numC, long valueSize, boolean augment)
            throws IOException, Exception {
        if (augment) {
            // The table already exists: read the load parameters back from
            // the "homeRow" metadata row and continue where we left off.
            HTable inTable = new HTable(conf, tableName);
            Result infoRes = inTable.get(new Get("homeRow".getBytes()));
            startRow = inTable.incrementColumnValue("homeRow".getBytes(),
                    "f0".getBytes(), "c0".getBytes(), numRows) - numRows;
            numCF = Bytes.toLong(infoRes.getValue("f0".getBytes(), "c1".getBytes()));
            numC = Bytes.toLong(infoRes.getValue("f0".getBytes(), "c2".getBytes()));
            uniqueSeed = Bytes.toLong(infoRes.getValue("f0".getBytes(), "c3".getBytes()));
            keySize = Bytes.toLong(infoRes.getValue("f0".getBytes(), "c4".getBytes()));
        } else {
            createTable(conf, tableName, numCF);
            // Record the load parameters in a "homeRow" metadata row so a
            // later run can augment the table consistently.
            HTable inTable = new HTable(conf, tableName);
            Put info = new Put("homeRow".getBytes());
            info.add("f0".getBytes(), "c0".getBytes(), Bytes.toBytes(numRows));
            info.add("f0".getBytes(), "c1".getBytes(), Bytes.toBytes(numCF));
            info.add("f0".getBytes(), "c2".getBytes(), Bytes.toBytes(numC));
            info.add("f0".getBytes(), "c3".getBytes(), Bytes.toBytes(uniqueSeed));
            info.add("f0".getBytes(), "c4".getBytes(), Bytes.toBytes(keySize));
            inTable.put(info);
            inTable.flushCommits();
        }
    }

    public static void load(Configuration conf, String tableName, long numRows,
            long numCF, long numC, long valueSize, int threadNum,
            int numThreads, int batchSize, long startKey, long endKey)
            throws IOException {
        System.out.println("Starting load thread " + threadNum + "; start key: "
                + (KEY_PREFIX + startKey) + "; end key: " + (KEY_PREFIX + endKey));
        Put p = null;
        long counter = 0;
        HTable table = null;
        Random rand = new Random(uniqueSeed);
        incrementRandom(rand, (int) startRow);
        long endRow = startRow + numRows;
        try {
            table = new HTable(createHBaseConfiguration(), tableName.getBytes());
            table.setAutoFlush(autoFlush);
            // Each thread owns an interleaved slice of the row range and
            // assembles one batch of Puts per iteration.
            for (long i = startRow + threadNum; i < endRow; i += numThreads) {
                byte[][] rowKeys = new byte[batchSize][];
                byte[][] families = new byte[batchSize][];
                byte[][] columns = new byte[batchSize][];
                byte[][] values = new byte[batchSize][];
                for (int batch = 0; batch < batchSize; batch++) {
                    byte[] rowKey = new byte[(int) keySize];
                    if (keySize < 1) {
                        // variable-length keys: pick one of the ten size buckets
                        int randSize = keySizes[rand.nextInt(Integer.MAX_VALUE) % 10];
                        rowKey = new byte[randSize];
                    }
                    StringBuilder keyBuilder = new StringBuilder();
                    keyBuilder.append(i);
                    keyBuilder.append(batch);
                    createKey(rowKey, Long.valueOf(keyBuilder.toString()) ^ uniqueSeed);
                    rowKeys[batch] = rowKey;
                    // generate the value
                    byte[] value = new byte[(int) valueSize];
                    fillBuffer(valueSize, value, batch);
                    values[batch] = value;
                    // pick a column family and a column
                    String family = "f" + (rand.nextInt(Integer.MAX_VALUE) % numCF);
                    families[batch] = family.getBytes();
                    String column = "c" + (rand.nextInt(Integer.MAX_VALUE) % numC);
                    columns[batch] = column.getBytes();
                }
                List<Put> puts = new ArrayList<Put>();
                long startTime = System.nanoTime();
                try {
                    for (int batch = 0; batch < batchSize; batch++) {
                        p = new Put(rowKeys[batch]);
                        p.add(families[batch], columns[batch], values[batch]);
                        puts.add(p);
                        counter++;
                    }
                    table.put(puts);
                    if (flush) {
                        table.flushCommits();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                long endTime = System.nanoTime();
                latency[threadNum] = (endTime - startTime);
                count[threadNum] = counter;
            }
        } finally {
            if (table != null) {
                table.close();
            }
        }
    }

    public static void incrementRandom(Random rand, int num) {
        // advance the generator so augmented runs do not repeat old values
        for (int i = 0; i < num; i++) {
            rand.nextInt();
        }
    }

    public static void createKey(byte[] buffer, long seed) {
        Random rand = new Random(seed);
        CRC32 chksum = new CRC32();
        rand.nextBytes(buffer);
        chksum.update(buffer);
    }

    public static byte[] createKeyForRegion(long startKey, long endKey) {
        // pick a random key inside a region's [startKey, endKey) range
        long key = startKey + new LongRandom().nextLong(endKey - startKey);
        return Bytes.toBytes(KEY_PREFIX + key);
    }

    public static void fillBufferNoCRC(long valueSize, byte[] value, long seed) {
        long newSeed = seed + System.currentTimeMillis();
        Random rand = new Random(newSeed);
        rand.nextBytes(value);
    }

    public static void fillBuffer(long valueSize, byte[] value, long seed) {
        if (noCRC) {
            fillBufferNoCRC(valueSize, value, seed);
            return;
        }
        Random rand = new Random(seed);
        CRC32 chksum = new CRC32();
        rand.nextBytes(value);
        chksum.update(value);
        chksum.getValue();
    }

    public static Configuration createHBaseConfiguration() {
        Configuration conf = HBaseConfiguration.create();
        conf.addResource(new Path(HBASE_RESOURCE_NAME));
        conf.set(ZOOKEEPER_SETTINGS.getFirst(), ZOOKEEPER_SETTINGS.getSecond());
        return conf;
    }

    static class LoadTableRunnable implements Runnable {
        private Configuration conf;
        private String tableName;
        private long numRows;
        private long numCF;
        private long numC;
        private long valueSize;
        private int threadNum;
        private int numThreads;
        private int batchSize;
        private long startKey = 0;
        private long endKey = -1;

        LoadTableRunnable(Configuration conf, String tableName, long numRows,
                long numCF, long numC, long valueSize, int threadNum,
                int numThreads, int batchSize) {
            this.conf = conf;
            this.tableName = tableName;
            this.numRows = numRows;
            this.numCF = numCF;
            this.numC = numC;
            this.valueSize = valueSize;
            this.threadNum = threadNum;
            this.numThreads = numThreads;
            this.batchSize = batchSize;
        }

        LoadTableRunnable(Configuration conf, String tableName, long numRows,
                long numCF, long numC, long valueSize, int threadNum,
                int numThreads, int batchSize, long startKey, long endKey) {
            this(conf, tableName, numRows, numCF, numC, valueSize, threadNum,
                    numThreads, batchSize);
            this.startKey = startKey;
            this.endKey = endKey;
        }

        public void run() {
            try {
                if (endKey == -1) {
                    LoadTableMTBatch.load(conf, tableName, numRows, numCF,
                            numC, valueSize, threadNum, numThreads, batchSize,
                            0, 0);
                } else {
                    LoadTableMTBatch.load(conf, tableName, numRows, numCF,
                            numC, valueSize, threadNum, numThreads, batchSize,
                            startKey, endKey);
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(-1);
            }
        }
    }

    static class LongRandom extends Random {
        private static final long serialVersionUID = 1L;

        /**
         * Generates a random long in the range 0 <= value < n.
         * @param n the (exclusive) upper bound
         * @return the generated value
         */
        public long nextLong(long n) {
            if (n <= 0) {
                throw new IllegalArgumentException();
            }
            // small n: use nextInt and cast
            if (n <= Integer.MAX_VALUE) {
                return (long) nextInt((int) n);
            }
            // large n: combine high and low ints
            int highLimit = (int) (n >>> 32);
            long high = (long) nextInt(highLimit) << 32;
            long low = ((long) nextInt()) & 0xffffffffL;
            return (high | low);
        }
    }

}
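
To try the tool, compile it against the HBase client libraries and run it with the flags shown in usage(). A hypothetical invocation (the classpath setup, table path, and flag values are examples; adjust them for your cluster):

javac -cp `hbase classpath` LoadTableMTBatch.java
java -cp `hbase classpath`:. LoadTableMTBatch -path /user/mapr/loadtest -rows 100000 -valuesize 100 -threads 10 -batchSize 500

On MapR Database, -path takes a filesystem path to the table; on Apache HBase, pass a table name and point -zookeeperNodes at your ZooKeeper quorum.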


This blog post was published April 22, 2015.