Batched Puts into HBase

The following program is a table-load tool for batching puts into an HBase/M7 table. By default it creates a simple HBase table with a single column in a single column family, then uses 10 loader threads to insert 100000 rows, each with a 100-byte value. The puts are sent in batches of 500. All of these defaults can be changed on the command line (-rows, -valuesize, -threads, -numCF, -numC, -batchSize).

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.zip.CRC32;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

public class LoadTableMTBatch {

    static long uniqueSeed = System.currentTimeMillis();
    static long[] count;
    static long[] latency;
    static int[] keySizes;
    public static long printPerNum = 10000;
    public static boolean noCRC = false;
    public static long keySize = 8;
    public static long startRow = 0;
    public static int batchSize = 500;
    public static int preSplit = 1;
    public static boolean flush = false;
    public static boolean autoFlush = false;
    public static final long startKey = 0L;
    public static final long endKey = 999999999999999L;
    public static final String HBASE_RESOURCE_NAME = "/opt/mapr/hbase/hbase-0.94.5/conf/hbase-site.xml";
    public static String ZOOKEEPER_NODES = "localhost";
    public static final Pair <string, string="">ZOOKEEPER_SETTINGS = new Pair<string, string="">(
            "hbase.zookeeper.quorum", ZOOKEEPER_NODES);

    public static void usage(String arg) {
        System.err.println("bad token: " + arg);
        System.err
                .println("loadMT -rows <100000> -valuesize <100 bytes="">  -debug -path <path></path> -threads <10> -batchSize <500> -numCF <1> -numC <1> -preSplit <1> -zookeeperNodes <localhost>-AutoFlush -flush");
        System.exit(1);
    }

    /**
     * Command-line entry point: parses the options listed by {@link #usage(String)}, initializes
     * the table and runs the multi-threaded batched load.
     *
     * <p>An option that needs a value but is the last token on the command line is reported via
     * {@link #usage(String)} with the option's name, rather than failing with an
     * {@link ArrayIndexOutOfBoundsException}. Non-positive thread counts and batch sizes are
     * rejected the same way.
     *
     * @param args command-line arguments
     * @throws java.io.IOException declared for compatibility; setup and load failures are printed
     *     and terminate the JVM with status -1
     */
    public static void main(String[] args) throws java.io.IOException {
        Configuration conf = HBaseConfiguration.create();
        String tableName = null;
        long numRows = 100000;
        long numCF = 1;
        long numC = 1;
        long valueSize = 100;
        int numThreads = 10;
        // No command-line option sets this, so it is always false here.
        boolean augment = false;

        for (int i = 0; i < args.length; ++i) {
            String arg = args[i];
            // Options taking a value advance i onto that value via ++i.
            if (arg.equals("-rows")) {
                numRows = Long.parseLong(optionValue(args, ++i, arg));
            } else if (arg.equals("-path")) {
                tableName = optionValue(args, ++i, arg);
            } else if (arg.equals("-debug")) {
                conf.set("fs.mapr.trace", "debug");
            } else if (arg.equals("-valuesize")) {
                valueSize = Long.parseLong(optionValue(args, ++i, arg));
            } else if (arg.equals("-threads")) {
                numThreads = Integer.parseInt(optionValue(args, ++i, arg));
            } else if (arg.equals("-p")) {
                printPerNum = Long.parseLong(optionValue(args, ++i, arg));
            } else if (arg.equals("-hbase")) {
                conf.addResource(new Path(optionValue(args, ++i, arg)));
            } else if (arg.equals("-numCF")) {
                numCF = Integer.parseInt(optionValue(args, ++i, arg));
            } else if (arg.equals("-numC")) {
                numC = Integer.parseInt(optionValue(args, ++i, arg));
            } else if (arg.equals("-batchSize")) {
                batchSize = Integer.parseInt(optionValue(args, ++i, arg));
            } else if (arg.equals("-preSplit")) {
                preSplit = Integer.parseInt(optionValue(args, ++i, arg));
            } else if (arg.equals("-zookeeperNodes")) {
                ZOOKEEPER_NODES = optionValue(args, ++i, arg);
            } else if (arg.equals("-AutoFlush")) {
                autoFlush = true;
            } else if (arg.equals("-flush")) {
                flush = true;
            } else {
                usage(arg);
            }
        }
        if (tableName == null) {
            System.out.println("Must specify path");
            usage("path");
        }
        // Thread count divides the key space and sizes per-thread arrays; batch size sizes
        // per-batch arrays. Neither makes sense below 1.
        if (numThreads < 1) {
            usage("-threads " + numThreads);
        }
        if (batchSize < 1) {
            usage("-batchSize " + batchSize);
        }
        LoadTableMTBatch lt = new LoadTableMTBatch();
        try {
            LoadTableMTBatch.init(conf, tableName, numRows, numCF, numC,
                    valueSize, augment);
            lt.loadTable(conf, tableName, numRows, numCF, numC, valueSize,
                    numThreads);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }
    }

    /**
     * Returns {@code args[index]} as the value of {@code option}, or prints usage and exits when
     * {@code option} was the last command-line token and therefore has no value.
     */
    private static String optionValue(String[] args, int index, String option) {
        if (index >= args.length) {
            usage(option); // calls System.exit(1); does not return
        }
        return args[index];
    }

    /**
     * Fills {@code keySizes} with ten row-key lengths, one drawn from each of ten increasing
     * ranges. The draws come from a {@link Random} seeded with {@code uniqueSeed}, so the same
     * seed always produces the same table. The last entry may be anywhere in [1, 32767].
     */
    public void generateKeySizes() {
        // Entry k is floors[k] + nextInt(spans[k]); drawn in index order.
        final int[] spans = {5, 40, 50, 400, 500, 4000, 5000, 10000, 12000, 32 * 1024 - 1};
        final int[] floors = {5, 10, 50, 100, 500, 1000, 5000, 10000, 20000, 1};
        Random generator = new Random(uniqueSeed);
        int[] sizes = new int[spans.length];
        for (int k = 0; k < spans.length; k++) {
            sizes[k] = generator.nextInt(spans[k]) + floors[k];
        }
        keySizes = sizes;
    }

    /**
     * Starts {@code numThreads} loader threads for {@code tableName} and reports their progress.
     *
     * <p>The intact lines do the following:
     * <ul>
     *   <li>allocate the shared per-thread {@code count} and {@code latency} arrays;</li>
     *   <li>call {@link #generateKeySizes()} when {@code keySize < 1};</li>
     *   <li>divide {@code endKey - startKey} into {@code numThreads} slices of width
     *       {@code offset}.</li>
     * </ul>
     *
     * <p>NOTE(review): starting inside the thread-creation loop, an HTML extraction mangled the
     * rest of this method, the following getSum/createTable/init methods and the first half of
     * load(). Statements became lowercased {@code token=""} attributes, and repeated tokens
     * appear to have been dropped. The text does not compile and cannot be reconstructed
     * reliably from what remains. Restore it from the original LoadTableMTBatch source.
     */
    public void loadTable(Configuration conf, String tableName, long numRows,
            long numCF, long numC, long valueSize, int numThreads)
            throws Exception {
        Thread[] loadThreads = new Thread[numThreads];
        count = new long[numThreads];
        latency = new long[numThreads];

        if (keySize < 1) {
            generateKeySizes();
        }

        // Width of each thread's slice of the key space [startKey, endKey].
        long offset = (endKey - startKey) / numThreads;
        for (int i = 0; i < loadThreads.length; i++) {
            latency[i] = 0;
            // NOTE(review): extraction damage begins on the next line and continues into load().
            if (preSplit <= 1000="" 1)="" {="" loadthreads[i]="new" thread(new="" loadtablerunnable(conf,="" tablename,="" numrows,="" numcf,="" numc,="" valuesize,="" i,="" numthreads,="" batchsize));="" }="" else="" batchsize,="" startkey="" +="" i="" *="" offset,="" ((i="" offset)="" -="" 1));="" for="" (int="" <="" loadthreads.length;="" i++)="" loadthreads[i].start();="" long="" inserts="0," insertsold="0," rate="0," overallrate="0," ta="0," tb="0," t0="0," elapsedtime="0;" averagelatency="0;" minlatency="0;" maxlatency="0;" boolean="" alive="true;" 1;="" while="" (true)="" if="" (loadthreads[i].isalive())="" insertsold)="" (ta="" tb);="" t0);="" elapsedtime;="" min="" max="" average="" latency="" synchronized="" (latency)="" arrays.sort(latency);="" 1];="" latency.length);="" system.out.println("elapsed="" time:="" "="" ";="" inserts:="" current="" sec;="" overall="" batchsize="" 1000000l="" "ms;"="" "ms");="" (!alive)="" break;="" print="" out="" interval="" thread.sleep(1000);="" loadthreads[i].join();="" public="" static="" getsum(long[]="" array)="" sum="0;" (long="" l="" :="" return="" sum;="" void="" createtable(configuration="" conf,="" string="" numcf)="" throws="" exception="" hbaseadmin="" admin="new" hbaseadmin(conf);="" system.out.println("created="" object");="" htabledescriptor="" des="new" htabledescriptor(tablename.getbytes());="" numcf;="" des.addfamily(new="" hcolumndescriptor("f"="" i));="" try="" (presplit="" admin.createtable(des);="" byte[]="" startkeybyte="Bytes.toBytes(startKey);" endkeybyte="Bytes.toBytes(endKey);" admin.createtable(des,="" startkeybyte,="" endkeybyte,="" presplit);="" catch="" (tableexistsexception="" te)="" te.printstacktrace();="" (ioexception="" ie)="" ie.printstacktrace();="" init(configuration="" augment)="" ioexception,="" (augment)="" htable="" intable="new" htable(conf,="" tablename);="" result="" infores="inTable.get(new" get("homerow".getbytes()));="" 
startrow="inTable.incrementColumnValue("homeRow".getBytes()," "f0".getbytes(),="" "c0".getbytes(),="" numrows)="" numrows;="" numcf="Bytes.toLong(infoRes.getValue("f0".getBytes()," "c1".getbytes()));="" numc="Bytes.toLong(infoRes.getValue("f0".getBytes()," "c2".getbytes()));="" uniqueseed="Bytes.toLong(infoRes.getValue("f0".getBytes()," "c3".getbytes()));="" keysize="Bytes.toLong(infoRes.getValue("f0".getBytes()," "c4".getbytes()));="" createtable(conf,="" numcf);="" put="" info="new" put("homerow".getbytes());="" info.add("f0".getbytes(),="" bytes.tobytes(numrows));="" "c1".getbytes(),="" bytes.tobytes(numcf));="" "c2".getbytes(),="" bytes.tobytes(numc));="" "c3".getbytes(),="" bytes.tobytes(uniqueseed));="" "c4".getbytes(),="" bytes.tobytes(keysize));="" intable.put(info);="" intable.flushcommits();="" load(configuration="" int="" threadnum,="" startkey,="" endkey)="" ioexception="" system.out.println("starting="" load="" thread="" threadnum);="" threadnum="" start="" key="" end="" :"="" endkey);="" family;="" column;="" p="null;" counter="0;" table="null;" random="" rand="new" random(uniqueseed);="" incrementrandom(rand,="" (int)="" startrow);="" endrow="startRow" htable(createhbaseconfiguration(),="" tablename.getbytes());="" table.setautoflush(autoflush);="" startrow;="" endrow;="" byte[][]="" rowkeys="new" byte[batchsize][];="" families="new" columns="new" values="new" batch="0;" batchsize;="" batch++)="" rowkey="new" byte[(int)="" keysize];="" (keysize="" 0)="" randsize="keySizes[rand.nextInt(Integer.MAX_VALUE)" %="" 10];="" numthreads="" 1);="" byte[randsize="" stringbuilder="" keybuilder="new" stringbuilder();="" keybuilder.append(i);="" keybuilder.append(batch);="" createkey(rowkey,="" long.valueof(keybuilder.tostring())="" ^="" uniqueseed);="" rowkeys[batch]="rowKey;" generate="" value="" valuesize];="" fillbuffer(valuesize,="" value,="" batch);="" values[batch]="value;" cf="" c="" family="f" (numcf="" families[batch]="family.getBytes();" column="c" 
(numc="" columns[batch]="column.getBytes();" list <put="">puts = new ArrayList<put>();
                // Turn this batch's pre-generated rows into Puts and send them in one call.
                long startTime = System.nanoTime();
                for (int batch = 0; batch < batchSize; batch++) {
                    p = new Put(rowKeys[batch]);
                    p.add(families[batch], columns[batch], values[batch]);
                    puts.add(p);
                }
                // NOTE(review): a failed batch is only printed; counter still advances by
                // batchSize below, so count[] includes rows that may not have been written.
                try {
                    table.put(puts);
                    if (flush) {
                        table.flushCommits();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                long endTime = System.nanoTime();
                // Duration of this batch in nanoseconds; only the latest batch is kept.
                // NOTE(review): loadTable's monitor loop appears to Arrays.sort(latency) in
                // place, which reorders the per-thread slots written here; check when restoring.
                latency[threadNum] = (endTime - startTime);
                counter += batchSize;
                count[threadNum] = counter;
            }
        } finally {
            // Release the table even if the load loop throws.
            if (table != null)
                table.close();
        }
    }

    /**
     * Advances {@code rand} by drawing and discarding {@code num} successive {@code nextInt()}
     * values. A zero or negative {@code num} leaves the generator untouched.
     *
     * @param rand generator to advance
     * @param num  number of {@code int} draws to skip
     */
    public static void incrementRandom(Random rand, int num) {
        int remaining = num;
        while (remaining > 0) {
            rand.nextInt();
            remaining--;
        }
    }

    /**
     * Fills {@code buffer} with pseudo-random bytes derived deterministically from {@code seed}.
     * The same seed and buffer length always produce the same key bytes.
     *
     * @param buffer destination for the key; its length is the key length
     * @param seed   seed for the {@link Random} that generates the bytes
     */
    public static void createKey(byte[] buffer, long seed) {
        new Random(seed).nextBytes(buffer);
    }

    /**
     * Returns an 8-byte row key encoding {@code startKey} plus a random offset drawn by
     * {@link LongRandom#nextLong(long)} with bound {@code endKey - startKey}.
     *
     * @param buffer   ignored (never read); kept only for signature compatibility
     * @param startKey lower bound of the key range
     * @param endKey   upper bound of the key range; must be greater than {@code startKey}
     * @return {@code Bytes.toBytes(startKey + offset)}
     * @throws IllegalArgumentException if {@code endKey - startKey} is not positive, including
     *     when the subtraction overflows
     */
    public static byte[] createKeyForRegion(byte[] buffer, long startKey,
            long endKey) {
        long span = endKey - startKey;
        if (span <= 0L) {
            throw new IllegalArgumentException("endKey (" + endKey
                    + ") must be greater than startKey (" + startKey + ")");
        }
        long offset = new LongRandom().nextLong(span);
        return Bytes.toBytes(startKey + offset);
    }

    /**
     * Overwrites {@code buffer} with pseudo-random bytes. The generator is seeded with
     * {@code seed} plus the current wall-clock time, so output differs between calls.
     *
     * @param valueSize unused; the buffer's own length decides how many bytes are written
     * @param buffer    destination to fill
     * @param seed      offset added to the time-based seed
     */
    public static void fillBufferNoCRC(long valueSize, byte[] buffer, int seed) {
        Random source = new Random(System.currentTimeMillis() + seed);
        source.nextBytes(buffer);
    }

    /**
     * Overwrites {@code buffer} with pseudo-random bytes and returns their CRC32 checksum. The
     * generator is seeded with {@code seed} plus the current wall-clock time.
     *
     * @param valueSize unused; the buffer's own length decides how many bytes are written
     * @param buffer    destination to fill
     * @param seed      offset added to the time-based seed
     * @return CRC32 of the bytes written into {@code buffer}
     */
    public static long fillBuffer(long valueSize, byte[] buffer, int seed) {
        new Random(seed + System.currentTimeMillis()).nextBytes(buffer);
        CRC32 crc = new CRC32();
        crc.update(buffer, 0, buffer.length);
        return crc.getValue();
    }

    /**
     * Builds a client configuration from the HBase defaults, the site file at
     * {@link #HBASE_RESOURCE_NAME} and the ZooKeeper quorum.
     *
     * <p>The quorum value is read from {@link #ZOOKEEPER_NODES} at call time. It is not taken
     * from {@code ZOOKEEPER_SETTINGS.getSecond()}, because that pair captured the value during
     * class initialization and so ignores the {@code -zookeeperNodes} option parsed by
     * {@code main}.
     *
     * @return a new configuration instance
     */
    public static Configuration createHBaseConfiguration() {
        Configuration conf = HBaseConfiguration.create();
        conf.addResource(new Path(HBASE_RESOURCE_NAME));
        conf.set(ZOOKEEPER_SETTINGS.getFirst(), ZOOKEEPER_NODES);
        return conf;
    }

    /**
     * Runnable that executes {@link LoadTableMTBatch#load} for one loader thread.
     *
     * <p>Without an explicit key range (or with {@code endKey == -1}) it passes {@code 0, 0} as
     * the key bounds to {@code load}; otherwise it passes the supplied bounds. An
     * {@link IOException} from the load is printed and terminates the JVM with status -1.
     */
    // NOTE(review): uses no state of the enclosing instance; could be a static nested class if
    // no caller relies on the outer.new LoadTableRunnable(...) form.
    public class LoadTableRunnable implements Runnable {
        /** Value of {@code endKey} meaning "no explicit key range". */
        private static final long NO_RANGE = -1L;

        private final Configuration conf;
        private final String tableName;
        private final long numRows, numCF, numC, valueSize;
        private final int numThreads, threadNum;
        private final int batchSize;
        private final long startKey, endKey;

        LoadTableRunnable(Configuration conf, String tableName, long numRows,
                long numCF, long numC, long valueSize, int threadNum,
                int numThreads, int batchSize) {
            this(conf, tableName, numRows, numCF, numC, valueSize, threadNum,
                    numThreads, batchSize, 0L, NO_RANGE);
        }

        LoadTableRunnable(Configuration conf, String tableName, long numRows,
                long numCF, long numC, long valueSize, int threadNum,
                int numThreads, int batchSize, long startKey, long endKey) {
            this.conf = conf;
            this.tableName = tableName;
            this.numRows = numRows;
            this.numCF = numCF;
            this.numC = numC;
            this.valueSize = valueSize;
            this.threadNum = threadNum;
            this.numThreads = numThreads;
            this.batchSize = batchSize;
            this.startKey = startKey;
            this.endKey = endKey;
        }

        @Override
        public void run() {
            boolean ranged = endKey != NO_RANGE;
            long rangeStart = ranged ? startKey : 0L;
            long rangeEnd = ranged ? endKey : 0L;
            try {
                LoadTableMTBatch.load(conf, tableName, numRows, numCF,
                        numC, valueSize, threadNum, numThreads, batchSize,
                        rangeStart, rangeEnd);
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(-1);
            }
        }

    }

    static class LongRandom extends Random {

        private static final long serialVersionUID = 1L;

        /**
         * Generating a long in the range of 0<=value<=n *="" @param="" n="" @return="" public="" long="" nextlong(long="" n)="" {="" if="" (n="" <="0L)" throw="" new="" illegalargumentexception();="" for="" small="" use="" nextint="" and="" cast="" return="" (long)="" nextint((int)="" n);="" }="" large="" both="" high="" low="" ints="" int="" highlimit="(int)">> 32);
            long high = (long) nextInt(highLimit) << 32;
            long low = ((long) nextInt()) & 0xffffffffL;
            return (high | low);
        }
    }

}</put></put=""></localhost></string,></string,>