
kafka.common.ConsumerRebalanceFailedException

Question asked by karthikSpark on Apr 22, 2017
Latest reply on May 25, 2017 by maprcommunity

Hi all,

I'm trying to read a Kafka topic from a different host using a Spark Streaming application.

I have two hosts, A and B.

A has ZooKeeper and Kafka (0.9.0.1) installed. B has Spark installed on it.
I'm trying to read the Kafka topic on A through Spark Streaming running on B, and persist the data to B.
Here is the issue: when I run the Spark app, it continuously throws the errors below,

17/04/20 05:22:16 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - kafka.common.ConsumerRebalanceFailedException: consumergroupname_HOSTA-1492680125896-b52bf576 can't rebalance after 4 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:660)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:967)
at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:254)
at kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:156)
at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:111)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:597)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:587)
at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1979)
at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1979)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

 

17/04/20 05:22:16 WARN TaskSetManager: Lost task 0.0 in stage 8.0 (TID 80, prd-mapr-node1.fishbowl.com): kafka.common.ConsumerRebalanceFailedException: consumergroupname_HOSTA-1492680125896-b52bf576 can't rebalance after 4 retries
(same stack trace as above)

 

**Below are the steps I tried to resolve the issue:**

I set the following values in consumer.properties and zookeeper.properties:

zookeeper.connection.timeout.ms=1000000
zookeeper.session.timeout.ms=10000
rebalance.backoff.ms=10000
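One more knob that may matter here (an assumption on my part, based on the Kafka 0.9 old-consumer configs): the error says "can't rebalance after 4 retries", which corresponds to rebalance.max.retries (default 4). The usual advice is to make rebalance.backoff.ms times rebalance.max.retries comfortably larger than zookeeper.session.timeout.ms, so a dead consumer's ephemeral znodes can expire before the retries run out. A sketch:

```properties
# consumer.properties -- sketch; property names are from the Kafka 0.9 old (high-level) consumer.
# "can't rebalance after 4 retries" maps to rebalance.max.retries (default 4).
rebalance.max.retries=10
# backoff (10000 ms) * retries (10) = 100 s, well above the 10 s session timeout below.
rebalance.backoff.ms=10000
zookeeper.session.timeout.ms=10000
```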


Hosts A and B can reach each other via both telnet and ping.

Then I tried to print the schema of the topic message, but the output is only

root

The Spark app was creating a _SUCCESS file in the output directory, though.

Here is a snippet of the ZooKeeper logs:

[2017-04-20 09:49:23,648] INFO Got user-level KeeperException when processing sessionid:0x15b8aa804290015 type:create cxid:0x79 zxid:0x1a1f txntype:-1 reqpath:n/a
Error Path:/consumers/test-group/owners/topic_test/0 Error:KeeperErrorCode = InvalidACL for /consumers/test-group/owners/topic_test/0 (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-04-20 09:49:25,978] INFO Got user-level KeeperException when processing sessionid:0x15b8aa804290015 type:create cxid:0xea zxid:0x1a20 txntype:-1 reqpath:n/a
Error Path:/consumers/test-group/owners/topic_test/0 Error:KeeperErrorCode = InvalidACL for /consumers/test-group/owners/topic_test/0 (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-04-20 09:49:28,320] INFO Got user-level KeeperException when processing sessionid:0x15b8aa804290015 type:create cxid:0x15b zxid:0x1a21 txntype:-1 reqpath:n/a
Error Path:/consumers/test-group/owners/topic_test/0 Error:KeeperErrorCode = InvalidACL for /consumers/test-group/owners/topic_test/0 (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-04-20 09:49:30,653] INFO Got user-level KeeperException when processing sessionid:0x15b8aa804290015 type:create cxid:0x1cc zxid:0x1a22 txntype:-1 reqpath:n/a
Error Path:/consumers/test-group/owners/topic_test/0 Error:KeeperErrorCode = InvalidACL for /consumers/test-group/owners/topic_test/0 (org.apache.zookeeper.server.PrepRequestProcessor)
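The InvalidACL errors make me suspect the /consumers znodes carry ACLs my consumer's identity cannot satisfy (for example, on a secured cluster, or if the znodes were created by a different principal). A diagnostic sketch using the stock zkCli.sh that ships with ZooKeeper (paths taken from the error above):

```
# Connect to the same ZooKeeper the consumer uses
$ ./bin/zkCli.sh -server zk:2181
[zk: zk:2181] getAcl /consumers
[zk: zk:2181] getAcl /consumers/test-group/owners/topic_test
# If the ACL is not one the consumer can satisfy (e.g. 'world,anyone: cdrwa'
# on an unsecured cluster), the old consumer cannot create its owner znodes
# and the rebalance keeps failing.
```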

Here "Consumergroupname" is the consumer group id I created.
Here is the Spark application code snippet:
```scala
// Imports assumed for this snippet:
import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

sparkConf.set("spark.eventLog.enabled", "true")
val sc = new SparkContext(sparkConf)
sc.hadoopConfiguration.set("avro.enable.summary-metadata", "false")

val ssc = new StreamingContext(sc, Seconds(300))
val kafkaConf = Map[String, String](
  "metadata.broker.list" -> "broker1:9092,broker2:9093",
  "zookeeper.connect" -> "zk:2181",
  "group.id" -> "Consumergroupname",
  "zookeeper.connection.timeout.ms" -> "1000000")
val topicMaps = Map("topic_test" -> 1)

// Receiver-based stream: each record is a (key, value) pair
val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)

messages.foreachRDD { rdd =>
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  // Parse each message value as JSON and append to Parquet
  val dataframe = sqlContext.read.json(rdd.map(_._2))
  dataframe.printSchema()
  dataframe.write.format("parquet").mode(SaveMode.Append).save("maprfs://path")
}

ssc.start()
ssc.awaitTermination()
ssc.stop(stopSparkContext = true, stopGracefully = true)
```
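For what it's worth, since the rebalance happens inside the ZooKeeper-based high-level consumer that KafkaUtils.createStream uses, one workaround I'm considering is the receiver-less direct approach: createDirectStream talks to the brokers directly and never registers a consumer group in ZooKeeper, so it cannot hit ConsumerRebalanceFailedException at all. A sketch, assuming the same topic and broker list as above:

```scala
// Sketch: direct (receiver-less) Kafka stream for the 0.8/0.9 integration.
// No ZooKeeper consumer registration, so no rebalance and no owner znodes.
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val directKafkaParams = Map[String, String](
  "metadata.broker.list" -> "broker1:9092,broker2:9093")
val topics = Set("topic_test")
val directMessages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, directKafkaParams, topics)
```

Note that with the direct approach offsets are tracked by Spark checkpoints rather than ZooKeeper, so tools like kafka-consumer-groups.sh will not show this group.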

When I try to describe the consumer group, I get the following result:

[root@host kafka]# ./bin/kafka-consumer-groups.sh --zookeeper zk:2181 --describe --group Consumergroupname
No topic available for consumer group provided
GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER

What does "No topic available for consumer group provided" mean?


What could be the issue? Any help is appreciated.

