
Issue regarding streaming kafka topics to MapR HBase

Question asked by sohan on Jun 29, 2018

Hello guys,

 

I am working on a use case where I am trying to stream records from Kafka topics into MapR HBase. Kafka and Spark are installed on my local machine, while HBase runs on the MapR server. After running the Spark application, the records from Kafka never reach HBase on the server. Below I have posted my source code along with the MapR ZooKeeper and HBase logs. When I run the same Spark application on the MapR server itself, the records do appear in HBase.
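Since the application runs locally while HBase and ZooKeeper live on the server, a useful first check is whether the ZooKeeper port (5181) is reachable from the local machine at all. A minimal sketch (the hostname is a placeholder, since the real addresses are masked below):

```scala
import java.net.{InetSocketAddress, Socket}

// Returns true if a TCP connection to host:port succeeds within timeoutMs.
// A failure here corresponds to the "Connection refused" seen in the
// HBase master log further down.
def portReachable(host: String, port: Int, timeoutMs: Int = 3000): Boolean = {
  val socket = new Socket()
  try {
    socket.connect(new InetSocketAddress(host, port), timeoutMs)
    true
  } catch {
    case _: java.io.IOException => false
  } finally {
    socket.close()
  }
}

// Example (replace the masked host with the real server address):
// portReachable("mapr60.example.com", 5181)
```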

 

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val ssc = new StreamingContext(sc, Seconds(5))
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "***.**.**.**:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "usepddxste_",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("topicname")
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
val stream = KafkaUtils.createDirectStream[String, String](
  ssc, LocationStrategies.PreferConsistent, consumerStrategy)
val lines = stream.map(_.value())
lines.print()

lines.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    // One HBase connection per partition
    val hConf = HBaseConfiguration.create()
    val tableName = "/user/mapr/samplecustomer4"
    hConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    hConf.set("hbase.zookeeper.quorum", "***.**.**.**")
    hConf.set("hbase.zookeeper.property.clientPort", "5181")
    hConf.set("hbase.rootdir", "maprfs:///hbase")
    val hTable = new HTable(hConf, tableName)
    try {
      iter.foreach { record =>
        // Extract the "pay****" value once, then split it into fields
        val payload = record.substring(0, record.length - 1).split("\"pay****\":")(1)
        val fields = payload.split(",")
        val str1 = fields(0).split(":")(1)
        val str2 = fields(1).split(":")(1)
        val str3 = fields(2).split(":")(1)
        val str4 = fields(3).split(":")(1)
        val str5 = fields(4).split(":")(1)
        val str6 = fields(5).split(":")(1)
        val str7 = fields(6).split(":")(1)
        val str8 = fields(7).split(":")(1)
        val str9 = fields(8).split(":")(1)
        val str10 = fields(9).split(":")(1)
        val str11 = fields(10).split(":")(1).split("}")(0)
        val id = str1 + "_" + str2 + "_" + str3
        val thePut = new Put(Bytes.toBytes(id))
        // Column qualifiers are masked in this post
        thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_**"), Bytes.toBytes(str1))
        thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_**"), Bytes.toBytes(str2))
        thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_**"), Bytes.toBytes(str3))
        thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_**"), Bytes.toBytes(str4))
        thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_**"), Bytes.toBytes(str5))
        thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_**"), Bytes.toBytes(str6))
        thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_**"), Bytes.toBytes(str7))
        thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_**"), Bytes.toBytes(str8))
        thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_**"), Bytes.toBytes(str9))
        thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_**"), Bytes.toBytes(str10))
        thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_**"), Bytes.toBytes(str11))
        hTable.put(thePut)
      }
    } finally {
      hTable.close() // flush buffered puts and release the connection
    }
  }
}
ssc.start()
ssc.awaitTermination()
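As an aside, the chain of substring/split calls above is fragile if the payload field order ever changes. A hedged sketch of parsing the key:value pairs into a map instead (the payload shape is assumed from the splitting logic, since the real field names are masked):

```scala
// Sketch: parse a flat JSON-like payload of the form {"k1":v1,"k2":v2,...}
// into a Map, instead of relying on positional splits.
// Assumes no nested objects and no commas or colons inside values.
def parsePayload(payload: String): Map[String, String] = {
  payload
    .stripPrefix("{").stripSuffix("}")
    .split(",")
    .flatMap { pair =>
      pair.split(":", 2) match {
        case Array(k, v) => Some(k.trim.stripPrefix("\"").stripSuffix("\"") -> v.trim)
        case _           => None
      }
    }
    .toMap
}

// parsePayload("""{"a":1,"b":2}""")  // Map("a" -> "1", "b" -> "2")
```

With this, each column value could be looked up by field name rather than by position, so a reordered or missing field fails visibly instead of silently shifting every column.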

 

------------------------------------------------------------------------------------------------------------------------------------------------------------

hbase.root.master.log

java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:350)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1070)
2018-05-29 17:04:52,804 INFO [mapr60:16000.activeMasterManager-SendThread(mapr60.*****.com:5181)] client.ZooKeeperSaslClient: Client will use MAPR-SECURITY as SASL mechanism.
2018-05-29 17:04:52,804 INFO [mapr60:16000.activeMasterManager-SendThread(mapr60.*******.com:5181)] zookeeper.ClientCnxn: Opening socket connection to server mapr60.hexstream.com/172.**.**.**:5181. Will attempt to SASL-authenticate using Login Context section 'Client'
2018-05-29 17:04:52,805 WARN [mapr60:16000.activeMasterManager-SendThread(mapr60.********.com:5181)] zookeeper.ClientCnxn: Session 0x16399068d4c05bc for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:350)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1070)
2018-05-29 17:04:53,726 INFO [mapr60:16000.activeMasterManager-SendThread(mapr60.**********.com:5181)] client.ZooKeeperSaslClient: Client will use MAPR-SECURITY as SASL mechanism.
2018-05-29 17:04:53,727 INFO [mapr60:16000.activeMasterManager-SendThread(mapr60.************.com:5181)] zookeeper.ClientCnxn: Opening socket connection to server mapr60.***************.com/172.21.7.101:5181. Will attempt to SASL-authenticate using Login Context section 'Client'
2018-05-29 17:04:53,727 WARN [mapr60:16000.activeMasterManager-SendThread(mapr60.***********.com:5181)] zookeeper.ClientCnxn: Session 0x16399068d4c05bb for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:350)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1070)

 

---------------------------------------------------------------------------------------------------------------------------------------------------------------

zookeeper.log

 

2018-06-09 16:22:29,248 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:5181:ZooKeeperServer@935] - adding SASL authorization for authorizationID: mapr
2018-06-09 16:22:29,249 [myid:] - INFO [ProcessThread(sid:0 cport:-1)::PrepRequestProcessor@627] - Got user-level KeeperException when processing sessionid:0x163b6b3e73f0b23 type:create cxid:0x6 zxid:0x1d7c9d txntype:-1 reqpath:n/a Error Path:/services Error:KeeperErrorCode = NodeExists for /services
2018-06-09 16:22:29,250 [myid:] - INFO [ProcessThread(sid:0 cport:-1)::PrepRequestProcessor@627] - Got user-level KeeperException when processing sessionid:0x163b6b3e73f0b23 type:create cxid:0x7 zxid:0x1d7c9e txntype:-1 reqpath:n/a Error Path:/services/elasticsearch Error:KeeperErrorCode = NodeExists for /services/elasticsearch
2018-06-09 16:22:30,834 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:5181:NIOServerCnxnFactory@197] - Accepted socket connection from /172.**.**.**:20392
2018-06-09 16:22:30,835 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:5181:ZooKeeperServer@839] - Client attempting to establish new session at /172.**.**.**:20392
2018-06-09 16:22:30,836 [myid:] - INFO [SyncThread:0:ZooKeeperServer@595] - Established session 0x163b6b3e73f0b24 with negotiated timeout 30000 for client /172**.**.**:20392
2018-06-09 16:22:30,837 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:5181:ZooKeeperServer@935] - adding SASL authorization for authorizationID: mapr.
