Scan Functionality on HBase

This sample program illustrates scan functionality on HBase. The table scan tool shown below is a simple program that scans a table for the puts written to it during the last x seconds.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.rest.client.Client;
import org.apache.hadoop.hbase.rest.client.Cluster;
import org.apache.hadoop.hbase.rest.client.RemoteHTable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

public class ScanTable {

    // Time range in secs
    public static long timeRange = 10;
    public static byte[] startRow = Bytes.toBytes(LoadTableMTBatch.startKey);
    public static byte[] endRow = Bytes.toBytes(LoadTableMTBatch.endKey);
    public static String path;
    public static String restHost = "localhost";
    public static int restPort = 8080;
    public static boolean rest = false;
    public static String ZOOKEEPER_NODES = "localhost";
    public static Configuration conf = HBaseConfiguration.create();
    public static final Pair <string, string="">ZOOKEEPER_SETTINGS = new Pair<string, string="">(
            "hbase.zookeeper.quorum", ZOOKEEPER_NODES);

    public static void main(final String... args) throws IOException {
        loadArgs(args);
        scanTable();
    }

    public static void usage(String arg) {
        System.err.println("bad token: " + arg);
        System.err
                .println("Scan -path <path></path> -startRow -endRow -timeRange<10> -restHost <localhost>-restPort <8080> -zookeeperNodes <localhost>-rest -debug");
        System.exit(1);
    }

    public static void loadArgs(String... args) {
        for (int i = 0; i < args.length; ++i) {
            if (args[i].equals("-path")) {
                i++;
                if (i >= args.length)
                    usage(args[i]);
                path = args[i].trim();
            } else if (args[i].equals("-startRow")) {
                i++;
                if (i >= args.length)
                    usage(args[i]);
                startRow = Bytes.toBytes(args[i].trim());
            } else if (args[i].equals("-endRow")) {
                i++;
                if (i >= args.length)
                    usage(args[i]);
                endRow = Bytes.toBytes(args[i].trim());
            } else if (args[i].equals("-timeRange")) {
                i++;
                if (i >= args.length)
                    usage(args[i]);
                timeRange = Long.parseLong(args[i].trim());
            } else if (args[i].equals("-restHost")) {
                i++;
                if (i >= args.length)
                    usage(args[i]);
                restHost = args[i].trim();
            } else if (args[i].equals("-restPort")) {
                i++;
                if (i >= args.length)
                    usage(args[i]);
                restPort = Integer.parseInt(args[i].trim());
            } else if (args[i].equals("-zookeeperNodes")) {
                i++;
                if (i >= args.length)
                    usage(args[i]);
                ZOOKEEPER_NODES = args[i];
                conf.set(ZOOKEEPER_SETTINGS.getFirst(),
                        ZOOKEEPER_SETTINGS.getSecond());
            } else if (args[i].equals("-debug")) {
                conf.set("fs.mapr.trace", "debug");
            } else if (args[i].equals("-rest")) {
                rest = true;
            } else {
                usage(args[i]);
            }
        }
        if (path == null) {
            System.out.println("Must specify path");
            usage("path");
        }
    }

    public static void scanTable() throws IOException {

        HTable table = new HTable(conf, path);
        ResultScanner scanner = null;
        try {
            while (true) {
                Scan scan = new Scan();
                if (timeRange != -1) {
                    long currentTime = System.currentTimeMillis();
                    scan.setTimeRange(currentTime - (timeRange * 100),
                            System.currentTimeMillis());
                }
                if (rest) {
                    Cluster cluster = new Cluster();
                    cluster.add(restHost, restPort);
                    Client client = new Client(cluster);
                    RemoteHTable restTable = new RemoteHTable(client, path);
                    scanner = restTable.getScanner(scan);
                    long startTime = System.nanoTime();
                    long recs = 0;
                    for (Result res : scanner) {
                        recs++;
                        res = null;
                    }
                    long endTime = System.nanoTime();
                    System.out.println("Rest Scan latency: " + (endTime - startTime)
                            / 1000000 + "ms " + "Record count: " + recs);
                    scanner.close();
                    restTable.close();
                } else {
                    // Key range scan
                    // scan.setStartRow(startRow);
                    // scan.setStopRow(endRow);
                    // System.out.println("Start row " +
                    // Bytes.toString(startRow));
                    // System.out.println("End row " + Bytes.toString(endRow));
                    scanner = table.getScanner(scan);
                    long startTime = System.nanoTime();
                    long recs = 0;
                    for (Result res : scanner) {
                        recs++;
                        res = null;
                    }
                    long endTime = System.nanoTime();
                    System.out.println("Scan latency: " + (endTime - startTime)
                            / 1000000 + "ms " + "Record count: " + recs);
                    scanner.close();
                }
            }
        } finally {
            if (scanner != null) {
                scanner.close();
            }
            if (table != null)
                table.close();
        }
    }
}</localhost></localhost></string,></string,>