
Problem Consuming  Records From MaprStream using Spark-Streaming API

Question asked by PETER.EDIKE on Dec 27, 2017
Latest reply on Jan 12, 2018 by maprcommunity

Hello everyone 

 

I am experiencing the following error when trying to read messages from MapR Streams:

13921 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - Starting the Kafka consumer
13963 [main] DEBUG com.mapr.streams.impl.listener.MarlinListener - Starting Streams Listener
13973 [main] DEBUG com.mapr.streams.impl.listener.MarlinListenerImpl - Starting Streams Listener
Exception 'java.lang.RuntimeException' occurred in thread 'main' at org.apache.spark.streaming.kafka.v09.KafkaUtils$.getFromOffsets(KafkaUtils.scala:424)
Exception in thread "main" java.lang.RuntimeException: Error occurred while instantiating com.mapr.streams.impl.listener.MarlinListener.
==> java.lang.NoSuchMethodError: com.mapr.streams.impl.listener.MarlinListenerImpl.OpenListener(Ljava/lang/String;Ljava/lang/String;IZZJJIIILjava/lang/String;Ljava/lang/String;J)J.
at org.apache.kafka.clients.mapr.GenericHFactory.getImplementorInstance(GenericHFactory.java:41)
at org.apache.kafka.clients.consumer.KafkaConsumer.initializeConsumer(KafkaConsumer.java:591)
at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1563)
at org.apache.spark.streaming.kafka.v09.KafkaCluster$$anonfun$getPartitions$1$$anonfun$1.apply(KafkaCluster.scala:54)
at org.apache.spark.streaming.kafka.v09.KafkaCluster$$anonfun$getPartitions$1$$anonfun$1.apply(KafkaCluster.scala:54)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.kafka.v09.KafkaCluster$$anonfun$getPartitions$1.apply(KafkaCluster.scala:53)
at org.apache.spark.streaming.kafka.v09.KafkaCluster$$anonfun$getPartitions$1.apply(KafkaCluster.scala:52)
at org.apache.spark.streaming.kafka.v09.KafkaCluster.withConsumer(KafkaCluster.scala:164)
at org.apache.spark.streaming.kafka.v09.KafkaCluster.getPartitions(KafkaCluster.scala:52)
at org.apache.spark.streaming.kafka.v09.KafkaUtils$.getFromOffsets(KafkaUtils.scala:419)
at org.apache.spark.streaming.kafka.v09.KafkaUtils$.createDirectStream(KafkaUtils.scala:292)
at org.apache.spark.streaming.kafka.v09.KafkaUtils$.createDirectStream(KafkaUtils.scala:397)
at org.apache.spark.streaming.kafka.v09.KafkaUtils.createDirectStream(KafkaUtils.scala)
at com.interswitch.bigdata.jobs.consumer.SparkConsumerJob.runJob(SparkConsumerJob.java:98)
at com.interswitch.bigdata.jobs.consumer.SparkConsumerJob.main(SparkConsumerJob.java:63)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.kafka.clients.mapr.GenericHFactory.getImplementorInstance(GenericHFactory.java:38)
... 19 more
Caused by: java.lang.NoSuchMethodError: com.mapr.streams.impl.listener.MarlinListenerImpl.OpenListener(Ljava/lang/String;Ljava/lang/String;IZZJJIIILjava/lang/String;Ljava/lang/String;J)J
at com.mapr.streams.impl.listener.MarlinListenerImpl.<init>(MarlinListenerImpl.java:156)
at com.mapr.streams.impl.listener.MarlinListener.<init>(MarlinListener.java:58)
... 24 more




The exception arises from the last line in the following block of code

 

Map<String, String> kafkaParams = getKafkaParamsFromConfig(parameters).entrySet().stream()
        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
String topicName = parameters.get(ConfigKeys.KAFKA_TOPIC).toString();
Set<String> collections = new HashSet<>();
collections.add(topicName);
JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
        streamingContext, String.class, String.class, kafkaParams, collections);
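For context, the parameter-map conversion in the first two lines can be sketched standalone. The helper `getKafkaParamsFromConfig` and the key/value pairs below are assumptions for illustration (my real config differs); the point is that the v09 `KafkaUtils` API wants a `Map<String, String>`, so heterogeneous values are flattened with `toString()`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class KafkaParamsSketch {

    // Hypothetical stand-in for getKafkaParamsFromConfig(parameters):
    // returns heterogeneous values (String, Boolean, ...) as in the post.
    static Map<String, Object> getKafkaParamsFromConfig() {
        Map<String, Object> raw = new HashMap<>();
        raw.put("group.id", "example-consumer-group"); // assumed value
        raw.put("auto.offset.reset", "earliest");      // assumed value
        raw.put("enable.auto.commit", Boolean.FALSE);  // deliberately non-String
        return raw;
    }

    public static void main(String[] args) {
        // Same conversion as in the question: flatten every value to a String,
        // since KafkaUtils.createDirectStream (v09) expects Map<String, String>.
        Map<String, String> kafkaParams = getKafkaParamsFromConfig().entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));

        System.out.println(kafkaParams.get("enable.auto.commit")); // prints "false"
    }
}
```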

 

 

The relevant dependencies in my pom.xml are as follows:

 

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-v09_2.10</artifactId>
    <version>0.9.0.0-mapr-1602</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.mapr.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>com.mapr.streams</groupId>
    <artifactId>mapr-streams</artifactId>
    <version>5.1.0-mapr</version>
</dependency>
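For completeness, the `${kafka.mapr.version}` property referenced above is declared in a `<properties>` block in the same pom; a minimal sketch looks like this (the value shown is a placeholder, not a verified MapR release number):

```xml
<properties>
    <!-- Placeholder: set this to the kafka-clients version shipped with the cluster -->
    <kafka.mapr.version>0.9.0.0-mapr-XXXX</kafka.mapr.version>
</properties>
```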

 

 

Any pointers as to what I am doing wrong will be deeply appreciated.
