
MapR Streams via PySpark Exception

Question asked by ihijazi on Jan 27, 2017
Latest reply on Jan 27, 2017 by ihijazi

I'm trying to execute the following in PySpark and I'm hitting an exception:

 

Python code (the dict literal below had stray leading commas and semicolons in my first paste; this is the intended code):

from pyspark.streaming.kafka import KafkaUtils
from pyspark import StorageLevel

strLoc = '/Path1:Stream1'
APA = KafkaUtils.createDirectStream(ssc, [strLoc], kafkaParams={
    "zookeeper.connect": "maprdemo:5181",
    "metadata.broker.list": "this.will.be.ignored:9092",
    "group.id": "New_Mapping_2_Physical"},
    fromOffsets=None, messageHandler=None)

 

Exception:

 

Traceback (most recent call last):
File "/tmp/New_Mapping_2_Physical.py", line 77, in <module>
    ,"group.id" : "New_Mapping_2_Physical"}, fromOffsets=None, messageHandler=None)
  File "/opt/mapr/spark/spark-1.6.1/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 152, in createDirectStream
py4j.protocol.Py4JJavaError: An error occurred while calling o58.createDirectStreamWithoutMessageHandler.
: org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
    at scala.util.Either.fold(Either.scala:97)
    at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
    at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
    at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:720)
    at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStreamWithoutMessageHandler(KafkaUtils.scala:688)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)

 

Of course the path and topic exist, and I've tested them with the console producer/consumer.
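In case the topic string format matters: MapR Streams expects the "/stream-path:topic" form rather than a bare Kafka topic name. A quick sanity check I ran on the string before passing it in (split_stream_topic is a hypothetical helper of my own, not part of any Spark or MapR API):

def split_stream_topic(loc):
    # Split a MapR Streams location of the form "/stream-path:topic"
    # into its stream path and topic name.
    stream, sep, topic = loc.rpartition(':')
    if not sep or not stream.startswith('/'):
        raise ValueError("expected '/stream/path:topic', got %r" % loc)
    return stream, topic

print(split_stream_topic('/Path1:Stream1'))  # ('/Path1', 'Stream1')

The string parses as expected, so the format itself doesn't look like the problem.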

 

Running the same code in Scala works with no issues.

 

Spark version 1.6.1

 

Any idea?
