
Spark 1.4.1 on MapR 5 - crashing Spark application

Question asked by dafox on Sep 23, 2015
Latest reply on Oct 15, 2015 by akrd
Hello,

I have a Spark application that crashes:

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    object RunSpark {
        def main(args: Array[String]) {
            val sparkContext: SparkContext = new SparkContext()
            val data: RDD[String] = sparkContext.textFile("banana-big.tsv")
            val repartitioned: RDD[String] = data.repartition(5)
            // Group lines by the second tab-separated column and average the group sizes
            val mean: Double = repartitioned
                .groupBy((s: String) => s.split("\t")(1))
                .mapValues((strings: Iterable[String]) => strings.size)
                .values.mean()
            println(mean)
        }
    }

It crashes with this error:

    Exception in thread "main" java.lang.IllegalArgumentException: argument type mismatch
     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
     at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
     at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$instantiateClass(ClosureCleaner.scala:330)
     at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$22.apply(ClosureCleaner.scala:268)
     at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$22.apply(ClosureCleaner.scala:262)
     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
     at scala.collection.immutable.List.foreach(List.scala:318)
     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
     at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:262)
     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
     at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:700)
     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:699)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
     at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:699)
     at org.apache.spark.rdd.RDD$$anonfun$coalesce$1.apply(RDD.scala:381)
     at org.apache.spark.rdd.RDD$$anonfun$coalesce$1.apply(RDD.scala:367)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
     at org.apache.spark.rdd.RDD.coalesce(RDD.scala:366)
     at org.apache.spark.rdd.RDD$$anonfun$repartition$1.apply(RDD.scala:342)
     at org.apache.spark.rdd.RDD$$anonfun$repartition$1.apply(RDD.scala:342)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
     at org.apache.spark.rdd.RDD.repartition(RDD.scala:341)
     at repartitionissue.RunSpark$.main(RunSpark.scala:10)
     at repartitionissue.RunSpark.main(RunSpark.scala)
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:497)
     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

The application is executed with spark-submit on YARN (Hadoop 2.7) in yarn-client mode. When I run the same code from spark-shell, it completes successfully. I suspect the problem lies in Spark's ClosureCleaner (visible in the stack trace): it behaves differently in the REPL (spark-shell) than under spark-submit, although it may also be related to the MapR distribution of Spark 1.4.1. When the application is executed on Spark 1.3.1, no error occurs and the computation succeeds, even with spark-submit.
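
Since the exception is raised while cleaning the closure that repartition builds internally (repartition is coalesce with shuffle = true), one thing I can try is to bypass that code path and ask textFile for the partition count up front. A minimal sketch of that idea, just as an assumption on my side, not a confirmed fix:

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    object RunSparkNoRepartition {
        def main(args: Array[String]) {
            val sparkContext: SparkContext = new SparkContext()
            // Ask for (at least) 5 partitions when reading, so the explicit
            // repartition(5) call -- where the ClosureCleaner failure is
            // triggered -- is no longer needed.
            val data: RDD[String] = sparkContext.textFile("banana-big.tsv", minPartitions = 5)
            val mean: Double = data
                .groupBy((s: String) => s.split("\t")(1))
                .mapValues((strings: Iterable[String]) => strings.size)
                .values.mean()
            println(mean)
        }
    }

This at least tells me whether only the repartition/coalesce path is affected or whether closure cleaning fails for the other transformations as well.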

The Spark 1.4.1 assembly JAR used by YARN is named `spark-assembly-1.4.1-hadoop2.5.1-mapr-1501.jar` (from the MapR Spark 1.4.1 RPM). Judging by the file name, this JAR was built against Hadoop 2.5.1, while MapR 5 ships with YARN/Hadoop 2.7.0. Does the spark-assembly JAR work properly with YARN 2.7.0? The behaviour is really strange, and the same question is asked in https://issues.apache.org/jira/browse/SPARK-10773. Could the crash be related to the YARN version mismatch, i.e. an assembly built against 2.5.1 but run on 2.7.0?
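
To see which Hadoop classes actually end up on the driver classpath, I was thinking of printing the bundled Hadoop version from a small application (VersionInfo is part of hadoop-common; whether this tells the whole story with the MapR packaging is just my assumption):

    import org.apache.hadoop.util.VersionInfo

    object PrintHadoopVersion {
        def main(args: Array[String]) {
            // Reports the version of whichever hadoop-common classes are first
            // on the classpath (e.g. the ones inside the spark-assembly JAR).
            println("Hadoop version:       " + VersionInfo.getVersion)
            println("Hadoop build version: " + VersionInfo.getBuildVersion)
        }
    }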

Any ideas or hints on how to debug or investigate this issue in more detail?

Thanks.
