
Spark-streaming with legacy Apache Kafka java.io.EOFException

Question asked by catverdier on Aug 24, 2016
Latest reply on Aug 30, 2016 by catverdier


I am currently trying to compare two different streaming architectures on the MapR Community Edition:

First one:

Kafka producer ==> Kafka ==> Spark Streaming ==> HBase

Second one:

MapR Streams producer ==> MapR Streams ==> Spark Streaming ==> MapR-DB (HBase API)

 

For the first one, I have started the Kafka server supplied with the Community Edition. The Kafka server is connected to the MapR global ZooKeeper (port 5181 by default).

Kafka runs properly: I can produce and consume messages with the kafka-console-producer.sh and kafka-console-consumer.sh tools.
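
For reference, pointing the stock Kafka broker at the MapR ZooKeeper only takes the zookeeper.connect setting in server.properties; a minimal sketch, where the broker id and port are assumed defaults:

# config/server.properties (sketch; broker.id and port are assumed defaults)
broker.id=0
port=9092
zookeeper.connect=localhost:5181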

 

My topic contains messages:

$ bin/kafka-console-consumer.sh --zookeeper localhost:5181 --topic aphptopic --from-beginning
{"key":"ke1ArE", "value":"fgcuVf7ljcYC"}
{"key":"EOEOQa", "value":"itiuEDhQ0GVx"}
{"key":"5lGhl6", "value":"TSrjgMhS3FaB"}
{"key":"Oe0RwR", "value":"fXRrwlqUiIDR"}
{"key":"DwrngU", "value":"5FjeASb6bnQH"}
{"key":"fsGE4N", "value":"YCbJtgHSdvYr"}
{"key":"IsRW5W", "value":"wo48GRGPcbDH"}
{"key":"8356lY", "value":"wMWVXprf32bI"}
{"key":"LFrujW", "value":"Dm6US4GECAel"}
{"key":"5bOfvl", "value":"8AfTIgCvLG8l"}
{"key":"U2jhO0", "value":"Q18Efof7tkPF"}
{"key":"n4oAjT", "value":"sssrKncyBbNi"}
{"key":"lneXRo", "value":"mdAMLgX0vQ8V"}
{"key":"CILY4s", "value":"E7o3LHuTlW7H"}
{"key":"L2Umfj", "value":"TRpaNfjNojnD"}
{"key":"akY1dP", "value":"le8B6DP6DFiS"}
{"key":"7eWRjf", "value":"A2TGKCW6iXC6"}
{"key":"hW5jLh", "value":"ojI2hihUSKhH"}
{"key":"TM0nj4", "value":"HottcJp5OQAI"}
{"key":"8nWKRQ", "value":"SWarsuLY81Bk"}
{"key":"MFQbU0", "value":"FNh0S2m41rYQ"}
{"key":"RutXbP", "value":"lk4xo2bcO1Pf"}
{"key":"1eOBBK", "value":"JnEQXJtkQFaq"}
{"key":"VLkfFS", "value":"hpgB8IPbSG44"}
{"key":"gKS17l", "value":"8HB0kLTVvOmh"}
{"key":"o0oeWt", "value":"drbTxyDYRVPA"}
{"key":"rr8YWM", "value":"nt8W3jLO3pnd"}
{"key":"kKMFyU", "value":"rPRfb1UIt1ta"}
{"key":"DoOxca", "value":"sNjEvxyogvXe"}
{"key":"2TpOfn", "value":"XvPXyhtI0KER"}
 

I am now trying to consume these messages from Spark Streaming, but without success.

 

I have added the spark-streaming-kafka_2.10-1.6.1.jar library to spark-shell.
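
The jar is attached when launching the shell, along these lines (the exact jar path is installation-specific, so treat it as a placeholder):

spark-shell --jars /path/to/spark-streaming-kafka_2.10-1.6.1.jar

My job is the following: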

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

val ssc = new StreamingContext(sc, Seconds(2))
val topicsSet = "aphptopic".split(",").toSet
val brokers = "localhost:5181"
val kafkaParams = Map[String, String](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
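
For completeness, once the stream is created the plan is simply to print the records before wiring in the HBase sink. A minimal sketch of that consumption step, which is never reached since createDirectStream itself fails as shown below:

// createDirectStream yields (key, value) pairs; keep the JSON values only
messages.map(_._2).print()
ssc.start()
ssc.awaitTermination()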

 

The createDirectStream call fails with the following exception:

org.apache.spark.SparkException: java.io.EOFException
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
        at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:52)
        at $iwC$$iwC$$iwC.<init>(<console>:54)
        at $iwC$$iwC.<init>(<console>:56)
        at $iwC.<init>(<console>:58)
        at <init>(<console>:60)
        at .<init>(<console>:64)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:752)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

 

Any ideas?
