AnsweredAssumed Answered

Spark Stream and Flme Integration

Question asked by mickey on Jan 29, 2015
Latest reply on Jan 29, 2015 by mickey
My test environment is Spark Stream v1.1.0 + Flume v1.5.0 on MapR v4.0.1
I am using Flume Avro sink to send the events to Spark Stream.
It is ok when i test   ./bin/run-example streaming.JavaFlumeEventCount sw02 12345,
but it is not ok when i test after coding the exact same source

Appreciate any support for escaping this problem.

And i am getting the following exception:

15/01/29 12:43:49 WARN Responder: user error
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to org.apache.flume.source.avro.AvroFlumeEvent
        at org.apache.spark.streaming.flume.FlumeEventServer$$anonfun$appendBatch$1.apply(FlumeInputDStream.scala:131)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at org.apache.spark.streaming.flume.FlumeEventServer.appendBatch(FlumeInputDStream.scala:131)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:88)
        at org.apache.avro.ipc.Responder.respond(Responder.java:149)
        at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
        at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
        at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
        at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:310)
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
        at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
        at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
15/01/29 12:43:49 WARN Responder: system error
org.apache.avro.AvroRuntimeException: Unknown datum type: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to org.apache.flume.source.avro.AvroFlumeEvent
        at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:632)
        at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:264)
        at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:597)
        at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
        at org.apache.avro.ipc.specific.SpecificResponder.writeError(SpecificResponder.java:71)
        at org.apache.avro.ipc.Responder.respond(Responder.java:167)
        at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
        at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
        at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
        at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:310)
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
        at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
        at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Outcomes