
Spark-csv failing when connecting to MapR via SparkR/RStudio

Question asked by brett_ on Dec 14, 2016
Latest reply on Jan 19, 2017 by dmeng

We are running SparkR in RStudio Server on a remote client node. We're doing this to support another team so they can log into the client node and spin up an RStudio session on the fly. They log in with their credentials (shown below as "dataScientist").

 

R-base is installed on each node, and df <- createDataFrame(sqlContext, faithful) (the example Old Faithful data set) works.
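
For reference, the working sanity check looks like this (a minimal sketch; the head() call is illustrative and not part of the original session):

# Build a Spark DataFrame from a local R data.frame (the built-in Old Faithful data)
df <- createDataFrame(sqlContext, faithful)
# Peek at the first few rows to confirm the DataFrame is usable
head(df)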

 

We have a CSV file in HDFS that we'd like to use to demonstrate different libraries in Apache Spark. Here is our launch script, run after logging into the RStudio environment:

 

# Point SparkR at the MapR-provided Spark install ----
Sys.setenv(SPARK_HOME = "/opt/mapr/spark/spark-1.6.1/")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))

# Load SparkR and initialize the context, pulling in the spark-csv package ----
library("SparkR")
sc <- sparkR.init(master = "spark://omahaNode5:7077",
                  sparkPackages = "com.databricks:spark-csv_2.11:1.5.0")

# Create a Hive context
hiveContext <- sparkRHive.init(sc)

# Create a SQL context
sqlContext <- sparkRSQL.init(sc)

However, when we try connecting to the data source (in this case the CSV file) using the Spark package com.databricks.spark.csv, we get an error:

> df2 <- read.df(sqlContext, "hdfs:///user/dataScientist/WireData.csv",source = "com.databricks.spark.csv", inferSchema = "true")

16/12/14 16:11:38 ERROR GPLNativeCodeLoader: Could not load native gpl library
java.lang.UnsatisfiedLinkError: no gplcompression in java.library.path
     at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1878)
     at java.lang.Runtime.loadLibrary0(Runtime.java:849)
     at java.lang.System.loadLibrary(System.java:1087)
     at com.hadoop.compression.lzo.GPLNativeCodeLoader.<clinit>(GPLNativeCodeLoader.java:32)
     at com.hadoop.compression.lzo.LzoCodec.<clinit>(LzoCodec.java:71)
     at java.lang.Class.forName0(Native Method)
     at java.lang.Class.forName(Class.java:270)
     at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2147)
     at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2112)
     at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:132)
     at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:179)
     at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:38)
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:606)
     at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
     at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:78)
     at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:136)
     at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:185)
     at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:198)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
     at scala.Option.getOrElse(Option.scala:120)
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
     at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
     at scala.Option.getOrElse(Option.scala:120)
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
     at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
     at scala.Option.getOrElse(Option.scala:120)
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
     at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1307)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
     at org.apache.spark.rdd.RDD.take(RDD.scala:1302)
     at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1342)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
     at org.apache.spark.rdd.RDD.first(RDD.scala:1341)
     at com.databricks.spark.csv.CsvRelation.firstLine$lzycompute(CsvRelation.scala:269)
     at com.databricks.spark.csv.CsvRelation.firstLine(CsvRelation.scala:265)
     at com.databricks.spark.csv.CsvRelation.inferSchema(CsvRelation.scala:242)
     at com.databricks.spark.csv.CsvRelation.<init>(CsvRelation.scala:74)
     at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:171)
     at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:44)
     at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
     at org.apache.spark.sql.api.r.SQLUtils$.loadDF(SQLUtils.scala:160)
     at org.apache.spark.sql.api.r.SQLUtils.loadDF(SQLUtils.scala)
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:606)
     at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:141)
     at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:86)
     at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:38)
     at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
     at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
     at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
     at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
     at java.lang.Thread.run(Thread.java:724)
16/12/14 16:11:38 ERROR LzoCodec: Cannot load native-lzo without native-hadoop
16/12/14 16:11:40 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
16/12/14 16:11:40 ERROR RBackendHandler: loadDF on org.apache.spark.sql.api.r.SQLUtils failed
Error in invokeJava(isStatic = TRUE, className, methodName, ...) :
   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, omahaNode9): java.io.IOException: Error getting user info for current user, dataScientist
     at com.mapr.fs.MapRFileSystem.lookupClient(MapRFileSystem.java:665)
     at com.mapr.fs.MapRFileSystem.lookupClient(MapRFileSystem.java:702)
     at com.mapr.fs.MapRFileSystem.open(MapRFileSystem.java:976)
     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:803)
     at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:102)
     at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:58)
     at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:237)
     at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:208)
     at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:101)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
     a

Is this because the user is not logged into the RStudio session as the MapR user? I changed the ownership of this particular CSV file to the user dataScientist,

hadoop fs -chown dataScientist /user/dataScientist/WireData.csv

and running the same command generated this output:

16/12/14 16:14:06 ERROR TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job
16/12/14 16:14:06 ERROR RBackendHandler: loadDF on org.apache.spark.sql.api.r.SQLUtils failed
Error in invokeJava(isStatic = TRUE, className, methodName, ...) :
   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 7, omahaNode2): java.io.IOException: Error getting user info for current user, dataScientist
     at com.mapr.fs.MapRFileSystem.lookupClient(MapRFileSystem.java:665)
     at com.mapr.fs.MapRFileSystem.lookupClient(MapRFileSystem.java:702)
     at com.mapr.fs.MapRFileSystem.open(MapRFileSystem.java:976)
     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:803)
     at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:102)
     at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:58)
     at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:237)
     at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:208)
     at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:101)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
     a
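
For completeness, the ownership change can be double-checked without leaving the R session (standard hadoop fs usage wrapped in system(); not part of the original session):

# Confirm the chown took effect on the CSV file, from within R
system("hadoop fs -ls /user/dataScientist/WireData.csv")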

So this could be a Hadoop LZO codec issue. I'm stuck here and not sure if others have experience with common SparkR connectivity issues on the MapR platform; any help is much appreciated. Thanks.
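
If it helps narrow this down, one isolation step (a sketch, assuming Spark 1.6's built-in "text" data source) is to read the same file without spark-csv; if this fails with the same "Error getting user info for current user" message, the problem is with filesystem/user access rather than the spark-csv package:

# Read the same file with the built-in text source instead of spark-csv
dfTxt <- read.df(sqlContext, "hdfs:///user/dataScientist/WireData.csv", source = "text")
# Triggering an action reproduces (or rules out) the user-info error
head(dfTxt)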
