
MapR, Mesos, Spark.

Question asked by mandoskippy on Apr 3, 2016
Latest reply on Apr 3, 2016 by mandoskippy

Hey all, I've posted about MapR, Mesos, and Spark before:

 

Spark on MapR - Using Hadoop Provided

 

And I am circling back around to the MapR/Spark/Mesos combination now.  Basically, I'd like to run Spark on Mesos on top of the MapR platform.  I could use some help understanding how the classpath affects things; as a user, it can be very confusing to know exactly what needs to be set.  I started with this:

 

Spark 1.6.0 Developer Preview - MapR 5.0 Documentation - doc.mapr.com

 

Now, to be fair, I am using Spark 1.6.1 (which shouldn't be much different from Spark 1.6.0, other than bug fixes) and MapR 5.1 rather than the documented MapR 5.0, which is a real difference.

 

To simplify, I used the Hadoop-provided build (spark-1.6.1-bin-without-hadoop) and updated the env files per the documentation.  I kept getting an slf4j error, which was resolved by changing:

MAPR_HADOOP_CLASSPATH=`hadoop classpath`:/opt/mapr/lib/slf4j-log4j12-1.7.5.jar: #(from the docs)

to

 

MAPR_HADOOP_CLASSPATH="`hadoop classpath`:/opt/mapr/lib/slf4j-log4j12-1.7.12.jar:/opt/mapr/lib/slf4j-api-1.7.12.jar:"

 

(I needed both jars.)
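For anyone following along, here is roughly what that part of my conf/spark-env.sh looks like now. The slf4j jar versions are just whatever sits in /opt/mapr/lib on my MapR 5.1 nodes, and I'm assuming the hadoop-free Spark build picks these up via SPARK_DIST_CLASSPATH; if the env file from the docs already wires MAPR_HADOOP_CLASSPATH in differently, keep that wiring instead:

# conf/spark-env.sh (sketch -- jar versions are from my /opt/mapr/lib, verify on your own nodes)
MAPR_HADOOP_CLASSPATH="`hadoop classpath`:/opt/mapr/lib/slf4j-log4j12-1.7.12.jar:/opt/mapr/lib/slf4j-api-1.7.12.jar:"
# assumption: the "without-hadoop" build reads its Hadoop jars from SPARK_DIST_CLASSPATH
export SPARK_DIST_CLASSPATH=$MAPR_HADOOP_CLASSPATH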

 

Then my next issue was a set of Hadoop simple-authentication login errors; this was resolved by adding:

 

 

SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djava.security.auth.login.config=/opt/mapr/conf/mapr.login.conf"

 

OK, now I have the driver working. However, the executors on Mesos would fail right away. I am running in coarse-grained mode, so Spark tried to start the executors as soon as pyspark launched.  When I looked at the executor logs, I saw the same slf4j error.  So, in spark-defaults.conf, I used spark.executor.extraClassPath to add the same classpath I had built in spark-env.sh (I echoed it so I could see the expanded value). That worked, but then the executors failed again with the same Hadoop simple login error, so I also added spark.executor.extraJavaOptions with the -D setting pointing at the mapr.login.conf file.
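Concretely, here is a sketch of what I ended up with in conf/spark-defaults.conf. Note that spark-defaults.conf is not a shell script, so backticks don't expand there; I pasted the literal string that `echo` printed from spark-env.sh (shown as a placeholder below). I'm also showing the coarse-grained flag I already had set:

# conf/spark-defaults.conf (sketch -- replace the <...> placeholder with the full echoed classpath)
spark.mesos.coarse               true
spark.executor.extraClassPath    <expanded output of `hadoop classpath`>:/opt/mapr/lib/slf4j-log4j12-1.7.12.jar:/opt/mapr/lib/slf4j-api-1.7.12.jar:
spark.executor.extraJavaOptions  -Djava.security.auth.login.config=/opt/mapr/conf/mapr.login.conf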

 

Now the executors start with no errors. Great!

 

So I try something simple:

 

>>> tfile = sc.textFile("maprfs:///path/to/my.csv")

16/04/03 12:29:27 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 148.3 KB, free 148.3 KB)

16/04/03 12:29:27 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 16.0 KB, free 164.3 KB)

16/04/03 12:29:27 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.0.100:53379 (size: 16.0 KB, free: 143.2 MB)

16/04/03 12:29:27 INFO spark.SparkContext: Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:-2

 

This seems promising. So let's count the lines...

The full traceback is below, but the error I am getting is:

: java.io.IOException: No FileSystem for scheme: maprfs

 

And this confuses me; I would expect maprfs:/// to just work. When I self-mount each node via NFS and point at file://mapr/mycluster/path/to/my.csv instead, the count returns properly.  So I just need to understand how the hadoop-provided build interacts with the Hadoop installation on each node... Any thoughts would be welcome here.
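In case it helps with diagnosis, my working theory is that the executors (running the hadoop-provided build) aren't picking up whichever MapR jar registers the maprfs scheme, even though it should appear in the output of `hadoop classpath` on every node. A quick sanity check I've been running on the Mesos agents (the grep pattern is just a guess at the relevant jar names):

# On each Mesos agent: does the Hadoop classpath actually include the MapR filesystem jars?
hadoop classpath | tr ':' '\n' | grep -i mapr

# And is the same classpath actually reaching the executors via spark-defaults.conf?
grep extraClassPath /mapr/brewpot/mesos/prod/spark/spark-1.6.1-bin-without-hadoop/conf/spark-defaults.conf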

 

John

 

 

 

 

>>> tfile.count()

Traceback (most recent call last):

  File "<stdin>", line 1, in <module>

  File "/mapr/brewpot/mesos/prod/spark/spark-1.6.1-bin-without-hadoop/python/pyspark/rdd.py", line 1004, in count

    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()

  File "/mapr/brewpot/mesos/prod/spark/spark-1.6.1-bin-without-hadoop/python/pyspark/rdd.py", line 995, in sum

    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)

  File "/mapr/brewpot/mesos/prod/spark/spark-1.6.1-bin-without-hadoop/python/pyspark/rdd.py", line 869, in fold

    vals = self.mapPartitions(func).collect()

  File "/mapr/brewpot/mesos/prod/spark/spark-1.6.1-bin-without-hadoop/python/pyspark/rdd.py", line 771, in collect

    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())

  File "/mapr/brewpot/mesos/prod/spark/spark-1.6.1-bin-without-hadoop/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__

  File "/mapr/brewpot/mesos/prod/spark/spark-1.6.1-bin-without-hadoop/python/pyspark/sql/utils.py", line 45, in deco

    return f(*a, **kw)

  File "/mapr/brewpot/mesos/prod/spark/spark-1.6.1-bin-without-hadoop/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.

: java.io.IOException: No FileSystem for scheme: maprfs

  at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2680)

  at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2687)

  at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)

  at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2723)

  at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2705)

  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:407)

  at org.apache.hadoop.fs.Path.getFileSystem(Path.java:309)

  at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:258)

  at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)

  at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:317)

  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)

  at scala.Option.getOrElse(Option.scala:120)

  at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)

  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)

  at scala.Option.getOrElse(Option.scala:120)

  at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)

  at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:58)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)

  at scala.Option.getOrElse(Option.scala:120)

  at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)

  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)

  at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)

  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)

  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)

  at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)

  at org.apache.spark.rdd.RDD.collect(RDD.scala:926)

  at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)

  at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)

  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

  at java.lang.reflect.Method.invoke(Method.java:498)

  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)

  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)

  at py4j.Gateway.invoke(Gateway.java:259)

  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)

  at py4j.commands.CallCommand.execute(CallCommand.java:79)

  at py4j.GatewayConnection.run(GatewayConnection.java:209)

  at java.lang.Thread.run(Thread.java:745)
