How to Use a Table Load Tool to Batch Puts into HBase/MapR-DB

by Terry He | Jan 18, 2017

 

Apache HBase is an in-Hadoop database that delivers wide-column schema flexibility with strongly consistent reads and writes. Clients can access HBase data through a native Java API, a Thrift or REST gateway, or, more recently, a C API, making the data easy to reach. MapR-DB, another in-Hadoop database, exposes the same HBase APIs but adds enterprise-grade features for production deployments of HBase applications.

Put, Get, and Scan are among the most prominent APIs used in HBase applications. Under write-heavy workloads, individual Put operations can become slow, so batching them is a common technique for increasing the overall throughput of the system. The program below illustrates a table load tool: a utility for batching Puts into an HBase/MapR-DB table. It creates a simple table with a single column in a single column family and inserts 100,000 rows of 100 bytes each; the batch size for the Puts is set to 500 in this example.
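
At its core, the technique is simply to accumulate Put objects in a list and submit them with a single table.put(List) call instead of issuing one round trip per row. As a quick orientation before the full tool, here is a minimal, self-contained sketch of that pattern; the table name, column family, and column are hypothetical placeholders, and it assumes the same HBase 0.98-era client API that the tool below uses.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class BatchPutSketch {
    public static void main(String[] args) throws Exception {
        // "demoTable", "f0", and "c0" are placeholder names for illustration
        HTable table = new HTable(HBaseConfiguration.create(), "demoTable");
        table.setAutoFlush(false); // let the client buffer writes locally
        List<Put> puts = new ArrayList<Put>();
        for (int i = 0; i < 100000; i++) {
            Put p = new Put(Bytes.toBytes("row" + i));
            p.add(Bytes.toBytes("f0"), Bytes.toBytes("c0"),
                    Bytes.toBytes("value" + i));
            puts.add(p);
            if (puts.size() == 500) { // one round trip per 500 Puts
                table.put(puts);
                puts.clear();
            }
        }
        if (!puts.isEmpty()) {
            table.put(puts); // send the remainder
        }
        table.flushCommits();
        table.close();
    }
}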

To get free training on HBase, including Data Modeling & Architecture and Schema Design, please visit the Free Hadoop On Demand Training pages.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.zip.CRC32;


import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

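/**
 * Multi-threaded load tool that batches Puts into an HBase/MapR-DB table.
 * Worker threads each write their share of rows with table.put(List<Put>),
 * while the main thread reports insert rates and latencies once per second.
 */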
public class LoadTableMTBatch {

    static long uniqueSeed = System.currentTimeMillis();
    static long[] count;
    static long[] latency;
    static int[] keySizes;
    public static long printPerNum = 10000;
    public static boolean noCRC = false;
    public static long keySize = 8;
    public static long startRow = 0;
    public static int batchSize = 500;
    public static int preSplit = 1; // Used as a demo - not an accurate key distribution
    public static boolean flush = false;
    public static boolean autoFlush = false;
    public static final String KEY_PREFIX = "user";
    public static final long startKey = 0L;
    public static final long endKey = 999999999999999L;
    public static final String HBASE_RESOURCE_NAME = "/opt/mapr/hbase/hbase-0.98.9/conf/hbase-site.xml";
    public static String ZOOKEEPER_NODES = "localhost"; //Default to localhost, only needed for accessing HBase
    public static final Pair<String, String> ZOOKEEPER_SETTINGS = new Pair<String, String>(
            "hbase.zookeeper.quorum", ZOOKEEPER_NODES);

    public static void usage(String arg) {
        System.err.println("bad token: " + arg);
        System.err.println("loadMT -rows <100000> -valuesize <100 bytes> -debug"
                + " -path <table path> -threads <10> -batchSize <500>"
                + " -numCF <1> -numC <1> -preSplit <1>"
                + " -zookeeperNodes <hosts> -AutoFlush -flush");
        System.exit(1);
    }

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        String tableName = null;
        long numRows = 100000;
        long numCF = 1;
        long numC = 1;
        long valueSize = 100;
        int numThreads = 10;
        boolean augment = false;

        for (int i = 0; i < args.length; ++i) {
            if (args[i].equals("-rows")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]); // value missing; report the flag itself
                numRows = Long.parseLong(args[i]);
            } else if (args[i].equals("-path")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                tableName = args[i];
            } else if (args[i].equals("-debug")) {
                conf.set("fs.mapr.trace", "debug");
            } else if (args[i].equals("-valuesize")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                valueSize = Long.parseLong(args[i]);
            } else if (args[i].equals("-threads")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                numThreads = Integer.parseInt(args[i]);
            } else if (args[i].equals("-p")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                printPerNum = Long.parseLong(args[i]);
            } else if (args[i].equals("-hbase")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                conf.addResource(new Path(args[i]));
            } else if (args[i].equals("-numCF")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                numCF = Long.parseLong(args[i]);
            } else if (args[i].equals("-numC")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                numC = Long.parseLong(args[i]);
            } else if (args[i].equals("-batchSize")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                batchSize = Integer.parseInt(args[i]);
            } else if (args[i].equals("-preSplit")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                preSplit = Integer.parseInt(args[i]);
            } else if (args[i].equals("-zookeeperNodes")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                ZOOKEEPER_NODES = args[i];
            } else if (args[i].equals("-AutoFlush")) {
                autoFlush = true;
            } else if (args[i].equals("-flush")) {
                flush = true;
            } else {
                usage(args[i]);
            }
        }
        if (tableName == null) {
            System.out.println("Must specify a table path with -path");
            usage("-path");
        }
        LoadTableMTBatch lt = new LoadTableMTBatch();
        try {
            LoadTableMTBatch.init(conf, tableName, numRows, numCF, numC,
                    valueSize, augment);
            lt.loadTable(conf, tableName, numRows, numCF, numC, valueSize,
                    numThreads);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }
    }
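    /**
     * Fills ten buckets with random key sizes, from a few bytes up to 32 KB;
     * used only when a fixed keySize is not in effect (keySize < 1).
     */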
    public void generateKeySizes() {
        Random rand = new Random(uniqueSeed);
        keySizes = new int[10];
        keySizes[0] = rand.nextInt(5) + 5;
        keySizes[1] = rand.nextInt(40) + 10;
        keySizes[2] = rand.nextInt(50) + 50;
        keySizes[3] = rand.nextInt(400) + 100;
        keySizes[4] = rand.nextInt(500) + 500;
        keySizes[5] = rand.nextInt(4000) + 1000;
        keySizes[6] = rand.nextInt(5000) + 5000;
        keySizes[7] = rand.nextInt(10000) + 10000;
        keySizes[8] = rand.nextInt(12000) + 20000;
        keySizes[9] = rand.nextInt(32 * 1024 - 1) + 1;
    }
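    /**
     * Spawns numThreads workers, then polls their shared count and latency
     * arrays once per second to print current and overall insert rates until
     * every worker has finished.
     */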
    public void loadTable(Configuration conf, String tableName, long numRows,
            long numCF, long numC, long valueSize, int numThreads)
            throws Exception {
        Thread[] loadThreads = new Thread[numThreads];
        count = new long[numThreads];
        latency = new long[numThreads];

        if (keySize < 1) {
            generateKeySizes();
        }

        long offset = (endKey - startKey) / numThreads;
        for (int i = 0; i < loadThreads.length; i++) {
            latency[i] = 0;
            if (preSplit <= 1) {
                 loadThreads[i] = new Thread(new LoadTableRunnable(conf,
                        tableName, numRows, numCF, numC, valueSize, i,
                        numThreads, batchSize));
            } else {
                loadThreads[i] = new Thread(new LoadTableRunnable(conf,
                        tableName, numRows, numCF, numC, valueSize, i,
                        numThreads, batchSize, startKey + i * offset, startKey
                                + ((i + 1) * offset) - 1));
            }
        }
        for (int i = 0; i < loadThreads.length; i++) {
            loadThreads[i].start();
        }
        long inserts = 0, insertsOld = 0, rate = 0, overallRate = 0, tA = 0, tB = 0, t0 = 0, elapsedTime = 0;
        long averageLatency = 0;
        long minLatency = 0;
        long maxLatency = 0;
        boolean alive = true;
        t0 = System.currentTimeMillis() - 1;
        tA = t0;
        tB = t0;
        while (true) {
            insertsOld = inserts;
            inserts = 0;
            tB = tA;
            tA = System.currentTimeMillis();
            alive = false;
            for (int i = 0; i < loadThreads.length; i++) {
                inserts += count[i];
                if (loadThreads[i].isAlive())
                    alive = true;
            }
            rate = (inserts - insertsOld) * 1000 / (tA - tB);
            elapsedTime = (tA - t0);
            overallRate = inserts * 1000 / elapsedTime;
            // Min/max/average latency over each thread's most recent batch
            synchronized (latency) {
                long[] snapshot = latency.clone(); // sort a copy so per-thread slots keep their meaning
                Arrays.sort(snapshot);
                minLatency = snapshot[0];
                maxLatency = snapshot[numThreads - 1];
                averageLatency = getSum(snapshot) / snapshot.length;
            }
            System.out.println("Elapsed time: " + elapsedTime / 1000
                    + "; Inserts: " + inserts + "; current rate " + rate
                    + " inserts/sec; overall rate " + overallRate
                    + " inserts/sec; BatchSize " + batchSize + "; Min Latency "
                    + minLatency / 1000000L + "ms;" + " Max Latency "
                    + maxLatency / 1000000L + "ms;" + " Average Latency "
                    + averageLatency / 1000000L + "ms");
            if (!alive)
                break;
            // Print out interval
            Thread.sleep(1000);
        }
        for (int i = 0; i < loadThreads.length; i++) {
            loadThreads[i].join();
        }
    }

    public static long getSum(long[] array) {
        long sum = 0;
        for (long l : array) {
            sum += l;
        }
        return sum;
    }
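    /**
     * Creates the target table with numCF column families (f0, f1, ...).
     * With preSplit > 1 the table is pre-split into that many regions across
     * the KEY_PREFIX + startKey .. KEY_PREFIX + endKey range.
     */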
    public static void createTable(Configuration conf, String tableName,
            long numCF) throws Exception {
        HBaseAdmin admin = new HBaseAdmin(conf);
        System.out.println("created admin object");
        HTableDescriptor des = new HTableDescriptor(tableName.getBytes());
        for (int i = 0; i < numCF; i++) {
            des.addFamily(new HColumnDescriptor("f" + i));
        }
        try {
            if (preSplit <= 1)
                admin.createTable(des);
            else {
                byte[] startKeyByte = Bytes.toBytes(KEY_PREFIX+startKey);
                byte[] endKeyByte = Bytes.toBytes(KEY_PREFIX+endKey);
                admin.createTable(des, startKeyByte, endKeyByte, preSplit);
            }
        } catch (TableExistsException te) {
            te.printStackTrace();
        } catch (IOException ie) {
            ie.printStackTrace();
        } finally {
            admin.close();
        }
    }
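    /**
     * Writes a "homeRow" metadata row recording the load parameters, or, in
     * augment mode, reads those parameters back and advances startRow so a
     * new run appends after the rows loaded previously.
     */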
    public static void init(Configuration conf, String tableName, long numRows,
            long numCF, long numC, long valueSize, boolean augment)
            throws IOException, Exception {
        if (augment) {
            HTable inTable = new HTable(conf, tableName);
            Result infoRes = inTable.get(new Get("homeRow".getBytes()));
            startRow = inTable.incrementColumnValue("homeRow".getBytes(),
                    "f0".getBytes(), "c0".getBytes(), numRows)
                    - numRows;
            numCF = Bytes.toLong(infoRes.getValue("f0".getBytes(),
                    "c1".getBytes()));
            numC = Bytes.toLong(infoRes.getValue("f0".getBytes(),
                    "c2".getBytes()));
            uniqueSeed = Bytes.toLong(infoRes.getValue("f0".getBytes(),
                    "c3".getBytes()));
            keySize = Bytes.toLong(infoRes.getValue("f0".getBytes(),
                    "c4".getBytes()));
            inTable.close();
        } else {
            createTable(conf, tableName, numCF);
            HTable inTable = new HTable(conf, tableName);
            Put info = new Put("homeRow".getBytes());
            info.add("f0".getBytes(), "c0".getBytes(), Bytes.toBytes(numRows));
            info.add("f0".getBytes(), "c1".getBytes(), Bytes.toBytes(numCF));
            info.add("f0".getBytes(), "c2".getBytes(), Bytes.toBytes(numC));
            info.add("f0".getBytes(), "c3".getBytes(), Bytes.toBytes(uniqueSeed));
            info.add("f0".getBytes(), "c4".getBytes(), Bytes.toBytes(keySize));
            inTable.put(info);
            inTable.flushCommits();
            inTable.close();
        }
    }
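    /**
     * Worker body: thread threadNum strides through the row range in steps
     * of numThreads, building batchSize Puts per step and sending them with
     * a single table.put(List) call; per-batch latency is recorded in
     * nanoseconds for the reporting loop.
     */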
    public static void load(Configuration conf, String tableName, long numRows,
            long numCF, long numC, long valueSize, int threadNum,
            int numThreads, int batchSize, long startKey, long endKey)
            throws IOException {

        if (preSplit <= 1)
            System.out.println("Starting load thread " + threadNum);
        else
            System.out.println("Starting load thread " + threadNum
                    + " start key : " + (KEY_PREFIX + startKey) + "; end key :" + (KEY_PREFIX + endKey));
        String family;
        String column;
        Put p = null;
        long counter = 0;
        HTable table = null;
        Random rand = new Random(uniqueSeed);
        incrementRandom(rand, (int) startRow);
        incrementRandom(rand, threadNum);
        long endRow = startRow + numRows;

        try {
            table = new HTable(createHBaseConfiguration(), tableName.getBytes());
            table.setAutoFlush(autoFlush);
            for (int i = threadNum + (int) startRow; i < endRow; i += numThreads) {
                byte[][] rowKeys = new byte[batchSize][];
                byte[][] families = new byte[batchSize][];
                byte[][] columns = new byte[batchSize][];
                byte[][] values = new byte[batchSize][];

                for (int batch = 0; batch < batchSize; batch++) {
                    // Key: allocate after deciding between fixed and random sizes
                    byte[] rowKey;
                    if (keySize < 1) {
                        int randSize = keySizes[rand.nextInt(Integer.MAX_VALUE) % 10];
                        incrementRandom(rand, numThreads - 1);
                        rowKey = new byte[randSize + 1];
                    } else {
                        rowKey = new byte[(int) keySize];
                    }
                    if (preSplit <= 1) {
                        StringBuilder keyBuilder = new StringBuilder();
                        keyBuilder.append(i);
                        keyBuilder.append(batch);
                        createKey(rowKey, Long.valueOf(keyBuilder.toString())
                                ^ uniqueSeed);
                        rowKeys[batch] = rowKey;
                    } else {
                        // Generate random long key
                        rowKey = createKeyForRegion(rowKey, startKey, endKey);
                        rowKeys[batch] = rowKey;
                    }
                    // Value
                    byte[] value = new byte[(int) valueSize];
                    fillBuffer(valueSize, value, batch);
                    values[batch] = value;
                    // CF + C
                    family = "f" + (numCF - 1);
                    families[batch] = family.getBytes();
                    column = "c" + (numC - 1);
                    columns[batch] = column.getBytes();
                }

                List<Put> puts = new ArrayList<Put>();
                long startTime = System.nanoTime();
                for (int batch = 0; batch < batchSize; batch++) {
                    p = new Put(rowKeys[batch]);
                    p.add(families[batch], columns[batch], values[batch]);
                    puts.add(p);
                }
                try {
                    table.put(puts);
                    if (flush) {
                        table.flushCommits();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                long endTime = System.nanoTime();
                latency[threadNum] = (endTime - startTime);
                counter += batchSize;
                count[threadNum] = counter;
            }
        } finally {
            if (table != null)
                table.close();
        }
    }

    public static void incrementRandom(Random rand, int num) {
        for (int i = 0; i < num; i++) {
            rand.nextInt();
        }
    }

    public static void createKey(byte[] buffer, long seed) {
        // Fill the key buffer from a deterministic, seed-based RNG
        Random rand = new Random(seed);
        rand.nextBytes(buffer);
    }

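    /**
     * Builds a key of the form KEY_PREFIX + (startKey + random offset) so
     * that it falls inside the key range assigned to this thread's regions.
     */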
    public static byte[] createKeyForRegion(byte[] buffer, long startKey,
            long endKey) {
        long key = new LongRandom().nextLong(endKey - startKey);
        // Parenthesized so startKey + key is numeric addition, not string concatenation
        buffer = Bytes.toBytes(KEY_PREFIX + (startKey + key));
        return buffer;
    }

    public static void fillBufferNoCRC(long valueSize, byte[] buffer, int seed) {
        long newSeed = seed + System.currentTimeMillis();
        Random rand = new Random(newSeed);
        rand.nextBytes(buffer);
    }

    public static long fillBuffer(long valueSize, byte[] buffer, int seed) {
        long newSeed = seed + System.currentTimeMillis();
        Random rand = new Random(newSeed);
        CRC32 chksum = new CRC32();
        rand.nextBytes(buffer);
        chksum.update(buffer);
        return chksum.getValue();
    }

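    /**
     * Builds a client Configuration from the local hbase-site.xml plus the
     * ZooKeeper quorum setting (only needed when targeting HBase; MapR-DB
     * table paths are resolved without it).
     */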
    public static Configuration createHBaseConfiguration() {
        Configuration conf = HBaseConfiguration.create();
        conf.addResource(new Path(HBASE_RESOURCE_NAME));
        // Read the live quorum value; the Pair captured only the default at class-load time
        conf.set(ZOOKEEPER_SETTINGS.getFirst(), ZOOKEEPER_NODES);
        return conf;
    }
    public class LoadTableRunnable implements Runnable {
        private Configuration conf;
        private String tableName;
        private long numRows, numCF, numC, valueSize;
        private int numThreads, threadNum;
        private int batchSize;
        private long startKey, endKey = -1;

        LoadTableRunnable(Configuration conf, String tableName, long numRows,
                long numCF, long numC, long valueSize, int threadNum,
                int numThreads, int batchSize) {
            this.conf = conf;
            this.tableName = tableName;
            this.numRows = numRows;
            this.numCF = numCF;
            this.numC = numC;
            this.valueSize = valueSize;
            this.threadNum = threadNum;
            this.numThreads = numThreads;
            this.batchSize = batchSize;
        }

        LoadTableRunnable(Configuration conf, String tableName, long numRows,
                long numCF, long numC, long valueSize, int threadNum,
                int numThreads, int batchSize, long startKey, long endKey) {
            this.conf = conf;
            this.tableName = tableName;
            this.numRows = numRows;
            this.numCF = numCF;
            this.numC = numC;
            this.valueSize = valueSize;
            this.threadNum = threadNum;
            this.numThreads = numThreads;
            this.batchSize = batchSize;
            this.startKey = startKey;
            this.endKey = endKey;
        }

        public void run() {
            try {
                if (endKey == -1) {
                    LoadTableMTBatch.load(conf, tableName, numRows, numCF,
                        numC, valueSize, threadNum, numThreads, batchSize,
                        0, 0);
                } else {
                    LoadTableMTBatch.load(conf, tableName, numRows, numCF,
                        numC, valueSize, threadNum, numThreads, batchSize,
                        startKey, endKey);
                }
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(-1);
            }
        }

    }
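    /**
     * Random helper producing a bounded long: older JDKs have no
     * Random.nextLong(bound), so large bounds are approximated by combining
     * two nextInt() draws.
     */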
    static class LongRandom extends Random {

        private static final long serialVersionUID = 1L;

        /**
         * Generates a long in the range 0 <= value < n (only approximately
         * uniform when n exceeds Integer.MAX_VALUE).
         *
         * @param n the exclusive upper bound; must be positive
         * @return a non-negative long less than n
         */
        public long nextLong(long n) {
            if (n <= 0L)
                throw new IllegalArgumentException();

            // for small n use nextInt and cast
            if (n <= Integer.MAX_VALUE) {
                return (long) nextInt((int) n);
            }

            // for large n use nextInt for both high and low ints
            int highLimit = (int) (n >> 32);
            long high = (long) nextInt(highLimit) << 32;
            long low = ((long) nextInt()) & 0xffffffffL;
            return (high | low);
        }
    }

}
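
To run the tool, compile it against the HBase client libraries that ship with your cluster and pass at least the -path argument; every other flag falls back to the defaults shown in usage(). A hypothetical invocation (adjust the classpath and the table path for your environment) could look like this:

java -cp .:$(hbase classpath) LoadTableMTBatch -path /tables/loadtest -rows 100000 -valuesize 100 -threads 10 -batchSize 500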

Content originally posted in the MapR Converge Blog.