
My HBase coprocessor does not take effect

Question asked by mahdi62b on Dec 4, 2016
Latest reply on Dec 5, 2016 by MichaelSegel

Hi,

 

I am developing a Spark Streaming application that creates an HBase table and then attaches a coprocessor to it. The aim is that every update on the original table results in a row being added to another table.

 

I am creating the original table inside the Spark application as follows:

 

def main(args: Array[String]): Unit = {

  val config = HBaseConfiguration.create()
  //val connection = HConnectionManager.createConnection(config)
  val admin = new HBaseAdmin(config)
  // Jar that contains the coprocessor class; it has to be readable by the region servers
  val path = new org.apache.hadoop.fs.Path("/mapr/cellos-mapr/user/mbazarganigilani/SparkStreaming1/target/scala-2.10/sparkstreaming_2.10-1.0.2.jar")

  if (!admin.tableExists("/tmp/AggReconTable")) {
    val tableDescriptor = new HTableDescriptor(TableName.valueOf("/tmp/AggReconTable"))

    // Same column family settings throughout; only the name and max versions differ
    for ((family, maxVersions) <- Seq(("row_key", 300), ("episodes", 300),
                                      ("nw_data", 1), ("chg_data", 1), ("vol_data", 1))) {
      tableDescriptor.addFamily(new HColumnDescriptor(Bytes.toBytes(family), maxVersions,
        org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE.toString(), true, true, Int.MaxValue,
        org.apache.hadoop.hbase.regionserver.BloomType.NONE.toString()))
    }

    // Register the observer on the descriptor before the table is created
    tableDescriptor.addCoprocessor(classOf[AccessControlCoprocessor].getCanonicalName, path, Coprocessor.PRIORITY_USER, null)

    admin.createTable(tableDescriptor)
  }

  ...

}
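
To rule out a registration problem, I also check afterwards whether the coprocessor really ended up on the table descriptor. A small sketch of that check (untested, reusing the same admin handle as above):

val descriptor = admin.getTableDescriptor(TableName.valueOf("/tmp/AggReconTable"))
// hasCoprocessor expects the fully qualified class name
if (descriptor.hasCoprocessor(classOf[AccessControlCoprocessor].getCanonicalName))
  println("coprocessor is attached")
else
  println("coprocessor is NOT attached, found only: " + descriptor.getCoprocessors)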


The coprocessor class is part of my Spark application's modules:

 

import java.io.IOException
import java.util.List
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.Cell
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.client.Durability
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver
import org.apache.hadoop.hbase.coprocessor.ObserverContext
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment
import org.apache.hadoop.hbase.regionserver.wal.WALEdit
import org.apache.hadoop.hbase.util.Bytes

class AccessControlCoprocessor extends BaseRegionObserver {
  @throws[IOException]
  override def postPut(e: ObserverContext[RegionCoprocessorEnvironment], put: Put, edit: WALEdit, durability: Durability): Unit = {
    // Open the counter table from inside the observer
    val configuration: Configuration = HBaseConfiguration.create
    val hTable2: HTable = new HTable(configuration, "/tmp/TableCount2")
    // Use the value of episodes:last from the incoming Put as the new row key
    val p: Put = new Put(put.get(Bytes.toBytes("episodes"), Bytes.toBytes("last")).get(0).getValue)
    p.add(Bytes.toBytes("cf1"), Bytes.toBytes("count"), Bytes.toBytes("1"))
    hTable2.put(p)
    hTable2.close()
  }
}
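
I also wondered whether I should be getting the table from the coprocessor environment instead of opening a fresh client connection inside postPut, since creating an HTable from within a region observer is often discouraged. Something like this sketch (untested; assumes the getTable(TableName) method the coprocessor environment provides in HBase 0.98/1.x):

class AccessControlCoprocessorViaEnv extends BaseRegionObserver {
  @throws[IOException]
  override def postPut(e: ObserverContext[RegionCoprocessorEnvironment], put: Put, edit: WALEdit, durability: Durability): Unit = {
    // Let the region server hand out the table reference instead of creating our own connection
    val table = e.getEnvironment.getTable(org.apache.hadoop.hbase.TableName.valueOf("/tmp/TableCount2"))
    try {
      val p = new Put(put.get(Bytes.toBytes("episodes"), Bytes.toBytes("last")).get(0).getValue)
      p.add(Bytes.toBytes("cf1"), Bytes.toBytes("count"), Bytes.toBytes("1"))
      table.put(p)
    } finally {
      table.close()
    }
  }
}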


I build the whole project with sbt and then add the coprocessor as above. After running my application there are updates on AggReconTable, but TableCount2 still has zero size. I am sure I am updating the original table, yet nothing is added to the second table.
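
One assumption in the observer is that /tmp/TableCount2 already exists with a cf1 family; it is created up front along these lines (a sketch from memory, reusing the same admin handle as above):

if (!admin.tableExists("/tmp/TableCount2")) {
  // Minimal target table with just the cf1 family the observer writes to
  val countDescriptor = new HTableDescriptor(TableName.valueOf("/tmp/TableCount2"))
  countDescriptor.addFamily(new HColumnDescriptor(Bytes.toBytes("cf1")))
  admin.createTable(countDescriptor)
}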

 Can someone help me figure out the problem?
