Programmatic ways to access MapR-DB using HTable in a multithreaded application

Document created by snayeem Employee on Feb 27, 2016
Version 1Show Document
  • View in full screen mode

Author: Sanjamala Nayeem

Original Publication Date: April 3, 2015

 

Env:

  • MapR 4.0.1
  • HBase 0.98
  • JDK 6/7

Symptom:

Because the HBase API is used to access MapR-DB, programmers are often unsure how to use it correctly from a multithreaded program.

Goal:

This article shows how to use the HTable API to access MapR-DB, both with one table per thread and with one table shared by all threads.

Solutions:

HTable is a client-side resource, just like any Java stream, and MUST be closed once it is no longer needed. There is no way around it. An HTable instance created over a MapR-DB table can be safely shared among multiple threads. So in this example, if all the threads were working on a single table instead of each having a separate table of its own, you could create the HTable instance once in the main method and hand it to each of the threads to perform various operations. Note that this sharing guarantee applies to MapR-DB tables: Apache HBase documents its own HTable class as not thread-safe, so against an HBase cluster each thread needs its own instance.

 

Basic elements of MapR-DB access

Now let’s go through the steps of accessing a MapR-DB table from a Java program.

  • Hadoop has a generic configuration object, which we will use to construct the HBase configuration object. So first import the Configuration and HBaseConfiguration classes, and then create an HBaseConfiguration object to tell your program (client) where to connect. HBaseConfiguration reads hbase-site.xml from the CLASSPATH and also factors in hbase-default.xml if found (hbase-default.xml ships inside the hbase.X.X.X.jar).
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.hbase.HBaseConfiguration;
  • Now create a conf object
Configuration conf = HBaseConfiguration.create();
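  • Create an HTable object for the table you want to access (in the snippet below, conf_ holds the configuration object created above and myTable_ holds the table name)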
HTable  table = new HTable(conf_, myTable_);
  • Then insert data by building a Put and passing it to the HTable object. A Put describes the insertion of one row: it is instantiated with the row key, and column family / qualifier / value cells are added to it with add().
Put put = new Put((getName() + "_row_" + i).getBytes()); 
put.add(FAMILY, (myTable_+"col").getBytes(), (getId() + "_value_" + i).getBytes());
table.put(put);

Sample: accessing different tables from multiple threads, where each thread writes to its own table.

 

import java.io.IOException;

import java.util.HashMap;

import java.util.Map;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.client.HTable;

import org.apache.hadoop.hbase.client.Put;

 

public class TableFactoryDemo {

  private static TableFactoryDemo instance_ = null;

  private static Configuration conf_;

  private Map<String, HTable> tableMap_;

 

  static {

  instance_ = new TableFactoryDemo();

  }

 

  private TableFactoryDemo() {

  try {

  conf_ = HBaseConfiguration.create();

  tableMap_ = new HashMap<String, HTable>();

  } catch (Exception ex) {

  System.out.println("in exception while initilization ");

  }

  }

 

  static byte[] FAMILY = "f".getBytes();

 

  static class MyThreads extends Thread {

  String myTable_;

  MyThreads(String myTable) {

  myTable_ = myTable;

  }

 

  @Override

  public void run() {

  try {

  HTable table = new HTable(conf_, myTable_);

  for (int i = 0; i < 1000; i++) {

  Put put = new Put((getName() + "_row_" + i).getBytes());

  put.add(FAMILY, (myTable_+"col").getBytes(), (getId() + "_value_" + i).getBytes());

  table.put(put);

  }

  table.close();

  } catch (IOException e) {

  

  e.printStackTrace();

  }

  }

  }

 

  // run this with number of tables followed by table name

  // that each thread should access

  public static void main(String[] args) throws InterruptedException {

  int numThreads = 0;

  if (args.length < 2

  || (numThreads = Integer.valueOf(args[0])) != args.length-1) {

  usage();

  }

 

  MyThreads threads[] = new MyThreads[numThreads];

  for (int i = 0; i < threads.length; i++) {

  threads[i] = new MyThreads(args[i+1]);

  threads[i].start();

  }

 

  

  }

 

  private static void usage() {

  System.err.println("Usage:\nTableFactoryDemo <numThread> <tablesNames>...");

  System.exit(1);

  }

}

  • In the sample above, each thread is given a different table when it is constructed. The table names are passed as program arguments: the first argument is the number of threads, and the remaining arguments are the table names, one per thread. For example, with 2 threads you pass 2 table names, for three arguments in total.
MyThreads threads[] = new MyThreads[numThreads]; 
for (int i = 0; i < threads.length; i++) {
threads[i] = new MyThreads(args[i+1]);
threads[i].start();

Sample: accessing a single table from multiple threads, where every thread writes to the same table.

 

import java.io.IOException;

import java.util.HashMap;

import java.util.Map;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.client.HTable;

import org.apache.hadoop.hbase.client.Put;

 

public class TableMultiThreadSample {

private static TableMultiThreadSample instance_ = null;

private Configuration conf_;

private Map<String, HTable> tableMap_;

 

static {

instance_ = new TableMultiThreadSample();

}

 

private TableMultiThreadSample() {

try {

conf_ = HBaseConfiguration.create();

tableMap_ = new HashMap<String, HTable>();

} catch (Exception ex) {

System.out.println("in exception while initilization ");

}

}

 

public static HTable getHTable(String tableName) throws IOException {

return instance_.getHTableInternal(tableName);

}

 

public static void close() {

instance_.closeInternal();

}

 

private void closeInternal() {

for (HTable table : tableMap_.values()) {

try {

table.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

 

private HTable getHTableInternal(String tableName) throws IOException {

HTable table = tableMap_.get(tableName);

if (table == null) {

synchronized (this) {

table = tableMap_.get(tableName);

if (table == null) {

table = new HTable(conf_, tableName);

tableMap_.put(tableName, table);

}

}

}

return table;

}

 

static byte[] FAMILY = "f".getBytes();

 

static class MyThreads extends Thread {

String myTable_;

MyThreads(String myTable) {

myTable_ = myTable;

}

 

@Override

public void run() {

try {

HTable table = TableMultiThreadSample.getHTable(myTable_);

for (int i = 0; i < 1000; i++) {

Put put = new Put((getName() + "_row_" + i).getBytes());

put.add(FAMILY, (myTable_+"col").getBytes(), (getId() + "_value_" + i).getBytes());

table.put(put);

}

} catch (IOException e) {

e.printStackTrace();

}

}

}

 

// run this with number of tables followed by table name

// that each thread should access

public static void main(String[] args) throws InterruptedException {

int numThreads = 0;

if (args.length != 2 ) {

usage();

}

 

numThreads = Integer.valueOf(args[0]);

MyThreads threads[] = new MyThreads[numThreads];

for (int i = 0; i < threads.length; i++) {

threads[i] = new MyThreads(args[1]);

threads[i].start();

}

 

for (int i = 0; i < threads.length; i++) {

threads[i].join();

}

 

TableMultiThreadSample.close();

}

 

private static void usage() {

System.err.println("Usage:\nTableFactoryDemo <numThread> <tablesName>");

System.exit(1);

}

}

  • In the sample above, the same table name is passed to every thread. The first argument is the number of threads and the second is the table name, so there are always exactly two arguments. main() waits for all threads to finish and only then closes the shared table.

numThreads = Integer.valueOf(args[0]);

MyThreads threads[] = new MyThreads[numThreads];

for (int i = 0; i < threads.length; i++) {

threads[i] = new MyThreads(args[1]);

threads[i].start();

}

Attachments

    Outcomes