AnsweredAssumed Answered

Issue with kafka spark streaming

Question asked by sniper99 on Jun 20, 2018
Latest reply on Jun 20, 2018 by maprcommunity

When running kafka spark streaming application on Mapr 6.0.1 we are getting the below error.  The subscribe method in createDirectStream takes a list but the underlying client subscribe method is expecting a collection. I am using the latest jar dependencies.

 

<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-9_2.11</artifactId>
<version>2.2.1-mapr-1803</version>

 

Exception in thread "streaming-start" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V
at org.apache.spark.streaming.kafka09.Subscribe.onStart(ConsumerStrategy.scala:85)
at org.apache.spark.streaming.kafka09.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:72)
at org.apache.spark.streaming.kafka09.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:242)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Outcomes