
Spark fileStream issue

Question asked by zaheer.shaik2 on May 18, 2015
Latest reply on May 29, 2015 by andylerner
I have the following program to read data from an HDFS file stream. It works with Spark 1.3.1 on a non-MapR cluster, but it fails with Spark 1.2.1 on MapR. Any help would be appreciated.

I see that this issue has been resolved [here][1] (SPARK-5297), but I cannot use Spark 1.3.0 on MapR, as it is not supported yet. Are there any workarounds? I sketch the one I am considering at the end of this post.

    //Initialize the Spark context
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    //Initialize the streaming context with a 5-second batch interval
    JavaStreamingContext jssc = new JavaStreamingContext(jsc, new Duration(5000));

    //Create a file stream over the HDFS directory that the Flume sink writes to
    JavaPairInputDStream<LongWritable, Text> textStream = jssc.fileStream("hdfs://localhost:9000/tmp/spark-sink");
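
For context, my understanding is that the fix in [SPARK-5297][1] added a fileStream overload that takes the key, value, and InputFormat classes explicitly, so no ClassTag has to be inferred on the Java side. On Spark 1.3.0 the call would look roughly like this (untested on my side, since I cannot run 1.3.0 on MapR):

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    //Spark 1.3.0+ only: the classes are passed explicitly instead of inferred
    JavaPairInputDStream<LongWritable, Text> textStream = jssc.fileStream(
        "hdfs://localhost:9000/tmp/spark-sink",
        LongWritable.class, Text.class, TextInputFormat.class);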


With the 1.2.1 code above, I get the following error:

2015-05-18 18:41:35,221 ERROR [sparkDriver-akka.actor.default-dispatcher-14] scheduler.JobScheduler: Error generating jobs for time 1431999695000 ms
**java.lang.ClassCastException: java.lang.Object cannot be cast to org.apache.hadoop.mapreduce.InputFormat**
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:91)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:236)
at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:234)
at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:128)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:301)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:301)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:300)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:285)
at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:301)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:301)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:300)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:285)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:232)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:230)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:230)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:78)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1.aroundReceive(JobGenerator.scala:76)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Exception in thread "main" java.lang.ClassCastException: java.lang.Object cannot be cast to org.apache.hadoop.mapreduce.InputFormat
(followed by a stack trace identical to the one above)


  [1]: https://issues.apache.org/jira/browse/SPARK-5297
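
In the meantime, the workaround I am leaning towards (an untested sketch; it assumes I only need the file contents as lines of text, since the (LongWritable, Text) pairs are dropped) is to avoid the broken fileStream overload entirely and use textFileStream, which constructs the underlying stream on the Scala side, where the InputFormat ClassTag is supplied correctly:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class TextFileStreamWorkaround {
      public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf().setAppName("TextFileStreamWorkaround");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        JavaStreamingContext jssc = new JavaStreamingContext(jsc, new Duration(5000));

        //textFileStream builds fileStream[LongWritable, Text, TextInputFormat]
        //in Scala, where real ClassTags are in scope, so the Java-side
        //ClassCastException is avoided; each record is one line of text
        JavaDStream<String> lines = jssc.textFileStream("hdfs://localhost:9000/tmp/spark-sink");
        lines.print();

        jssc.start();
        jssc.awaitTermination();
      }
    }

Is that a reasonable approach on 1.2.1, or is there a better option?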
