
MapR-DB (or HBase) and PySpark can't convert to Python types

Question asked by imichaeldotorg on Nov 19, 2014
Latest reply on Nov 21, 2014 by imichaeldotorg
I'm attempting to query MapR-DB from Spark using Python. The query itself works, but the results I get back are not converted into proper Python types. It looks like the fix is to specify a keyConverter and a valueConverter when creating the RDD, like this:
    conf = {"hbase.mapreduce.inputtable": table_name}
    rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat",
                             "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
                             "org.apache.hadoop.hbase.client.Result",
                             keyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
                             valueConverter="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter",
                             conf=conf)
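
With the converters in place, my understanding is that each record should come back as a (row key, value) pair of plain Python strings, so a quick check like this hypothetical sketch ought to show readable output (I can't get far enough to confirm the exact value format the converter produces):

    # Hypothetical check, assuming the converters load correctly:
    # each record should be a (row key, value) pair of plain Python strings.
    for rowkey, value in rdd.take(5):
        print rowkey, value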

This is what most of the examples show. However, with MapR's Spark 1.1.0, I'm getting this error:

    /opt/mapr/spark/spark-1.1.0/python/pyspark/context.py in newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter, valueConverter, conf, batchSize)
        469         jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
        470                                                    valueClass, keyConverter, valueConverter,
    --> 471                                                    jconf, batchSize)
        472         return RDD(jrdd, self, ser)
        473
    
    /opt/mapr/spark/spark-1.1.0/python/build/py4j/java_gateway.py in __call__(self, *args)
        536         answer = self.gateway_client.send_command(command)
        537         return_value = get_return_value(answer, self.gateway_client,
    --> 538                 self.target_id, self.name)
        539
        540         for temp_arg in temp_args:
    
    /opt/mapr/spark/spark-1.1.0/python/build/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
        298                 raise Py4JJavaError(
        299                     'An error occurred while calling {0}{1}{2}.\n'.
    --> 300                     format(target_id, '.', name), value)
        301             else:
        302                 raise Py4JError(
    
    Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
    : java.lang.ClassNotFoundException: org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter
     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
     at java.security.AccessController.doPrivileged(Native Method)
     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
     at java.lang.Class.forName0(Native Method)
     at java.lang.Class.forName(Class.java:274)
     at org.apache.spark.util.Utils$.classForName(Utils.scala:150)
     at org.apache.spark.api.python.Converter$$anonfun$getInstance$1$$anonfun$1.apply(PythonHadoopUtil.scala:46)
     at org.apache.spark.api.python.Converter$$anonfun$getInstance$1$$anonfun$1.apply(PythonHadoopUtil.scala:45)
     at scala.util.Try$.apply(Try.scala:161)
     at org.apache.spark.api.python.Converter$$anonfun$getInstance$1.apply(PythonHadoopUtil.scala:45)
     at org.apache.spark.api.python.Converter$$anonfun$getInstance$1.apply(PythonHadoopUtil.scala:44)
     at scala.Option.map(Option.scala:145)
     at org.apache.spark.api.python.Converter$.getInstance(PythonHadoopUtil.scala:44)
     at org.apache.spark.api.python.PythonRDD$.getKeyValueConverters(PythonRDD.scala:574)
     at org.apache.spark.api.python.PythonRDD$.convertRDD(PythonRDD.scala:587)
     at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:439)
     at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:606)
     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
     at py4j.Gateway.invoke(Gateway.java:259)
     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
     at py4j.commands.CallCommand.execute(CallCommand.java:79)
     at py4j.GatewayConnection.run(GatewayConnection.java:207)
     at java.lang.Thread.run(Thread.java:745)

That converter class is in /opt/mapr/spark/spark-1.1.0/lib/spark-examples-1.1.0-hadoop2.4.1-mapr-1408.jar. However, when I add that jar to my SPARK_DAEMON_CLASSPATH, I get a different error:

    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/opt/mapr/spark/spark-1.1.0/lib/spark-examples-1.1.0-hadoop2.4.1-mapr-1408.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/opt/mapr/spark/spark-1.1.0/lib/spark-assembly-1.1.0-hadoop2.4.1-mapr-1408.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    14/11/19 12:42:03 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-2] shutting down ActorSystem [sparkDriver]
    java.lang.VerifyError: (class: org/jboss/netty/channel/socket/nio/NioWorkerPool, method: createWorker signature: (Ljava/util/concurrent/Executor;)Lorg/jboss/netty/channel/socket/nio/AbstractNioWorker;) Wrong return type in function
            at akka.remote.transport.netty.NettyTransport.<init>(NettyTransport.scala:282)
            at akka.remote.transport.netty.NettyTransport.<init>(NettyTransport.scala:239)
            at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
            at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
            at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
            at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
            at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
            at scala.util.Try$.apply(Try.scala:161)
            at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
            at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
            at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
            at scala.util.Success.flatMap(Try.scala:200)
            at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
            at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:618)
            at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:610)
            at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
            at scala.collection.Iterator$class.foreach(Iterator.scala:727)
            at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
            at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
            at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
            at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
            at akka.remote.EndpointManager.akka$remote$EndpointManager$$listens(Remoting.scala:610)
            at akka.remote.EndpointManager$$anonfun$receive$2.applyOrElse(Remoting.scala:450)
            at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
            at akka.actor.ActorCell.invoke(ActorCell.scala:456)
            at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
            at akka.dispatch.Mailbox.run(Mailbox.scala:219)
            at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
            at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
            at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
            at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
            at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
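
For what it's worth, I believe the more targeted way to expose the converter classes is to hand the examples jar to the driver at launch rather than through SPARK_DAEMON_CLASSPATH, something along these lines (untested on my end against the MapR build, and I don't know whether it would sidestep the Netty conflict):

    # Hedged sketch: pass the examples jar only to this session, not the daemons.
    /opt/mapr/spark/spark-1.1.0/bin/pyspark \
        --jars /opt/mapr/spark/spark-1.1.0/lib/spark-examples-1.1.0-hadoop2.4.1-mapr-1408.jar \
        --driver-class-path /opt/mapr/spark/spark-1.1.0/lib/spark-examples-1.1.0-hadoop2.4.1-mapr-1408.jar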

Any help would be greatly appreciated.

