
SparkR from R Interactive Shell

Blog Post created by vmeghraj Employee on May 25, 2017

SparkR is an R package that provides a lightweight front end for using MapR Spark from R. In MapR-Spark 2.1.0, SparkR provides a distributed DataFrame implementation that supports operations like selection, filtering, and aggregation. The entry point into SparkR is the SparkSession, which connects your R program to a MapR Spark cluster.

 

SparkContext, SQLContext, and SparkSession - In Spark 1.x, SparkContext and SQLContext were the entry points for accessing Spark. In Spark 2.x, SparkSession becomes the primary entry point. You can create a SparkSession with sparkR.session and pass in options such as the application name, any Spark packages the application depends on, and so on.
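
For example, assuming SPARK_HOME is set and the SparkR library is loaded as in the examples below, a session with a few common options can be created like this (the application name here is an illustrative placeholder; the package coordinate is the one used later in this post):

# Create (or reuse) a SparkSession; appName, sparkConfig, and sparkPackages are optional.
# The application name below is a placeholder chosen for this sketch.
sparkR.session(appName = "SparkR-example",
               master = "local[*]",
               sparkConfig = list(spark.driver.memory = "2g"),
               sparkPackages = "com.databricks:spark-avro_2.11:3.0.0")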

 

Here are a few examples covering different use cases for connecting an R program to a MapR cluster from the R shell. In each case, set SPARK_HOME, load the SparkR package, create a SparkR session with the required arguments, and execute the program.

 

The following Spark driver properties can be set in sparkConfig with sparkR.session from R:

 

Property Name                   Property Group           spark-submit Equivalent
spark.master                    Application Properties   --master
spark.yarn.keytab               Application Properties   --keytab
spark.yarn.principal            Application Properties   --principal
spark.driver.memory             Application Properties   --driver-memory
spark.driver.extraClassPath     Runtime Environment      --driver-class-path
spark.driver.extraJavaOptions   Runtime Environment      --driver-java-options
spark.driver.extraLibraryPath   Runtime Environment      --driver-library-path
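
For example, several of these driver properties can be passed together through sparkConfig when the session is created; a minimal sketch, where the memory size and JVM option are placeholder values:

# Placeholder values, shown only to illustrate setting driver properties via sparkConfig
sparkR.session(master = "local[*]",
               sparkConfig = list(spark.driver.memory = "2g",
                                  spark.driver.extraJavaOptions = "-XX:+UseG1GC"))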

 

Use Case 1: Local DataFrames - Convert a local R data frame into a SparkDataFrame. The following creates a SparkDataFrame from the faithful dataset that ships with R.

 

Sys.setenv(SPARK_HOME="/opt/mapr/spark/spark-2.1.0")
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))
# df is now a SparkDataFrame
df <- as.DataFrame(faithful)
head(df)
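
Once df exists, the selection, filtering, and aggregation operations mentioned earlier can be applied directly; a brief sketch using the columns of the faithful dataset:

# Select only the eruptions column
head(select(df, df$eruptions))
# Keep rows with waiting times shorter than 50 minutes
head(filter(df, df$waiting < 50))
# Count observations for each waiting time
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))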

 

Use Case 2: From Data Sources - The method for creating SparkDataFrames from data sources is read.df. It takes the path of the file to load and the type of data source; the currently active SparkSession is used automatically. SparkR natively supports reading JSON, CSV, and Parquet files.

 

Sys.setenv(SPARK_HOME="/opt/mapr/spark/spark-2.1.0")
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))
# people is now a SparkDataFrame
people <- read.df("file:///opt/mapr/spark/spark-2.1.0/examples/src/main/resources/people.json", "json")
head(people)
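
The same API covers the other supported formats. A minimal sketch, where the output and CSV paths are placeholders: write the people SparkDataFrame out as Parquet and read it back; CSV is handled the same way via options such as header and inferSchema.

# Placeholder output path, used only to illustrate write.df/read.df
write.df(people, path = "file:///tmp/people.parquet", source = "parquet", mode = "overwrite")
parquetDf <- read.df("file:///tmp/people.parquet", "parquet")
head(parquetDf)
# CSV reads pass options the same way, e.g.:
# csvDf <- read.df("file:///path/to/people.csv", "csv", header = "true", inferSchema = "true")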

 

Use Case 3: From Data Sources with YARN as Master - SparkR supports reading JSON, CSV, and Parquet files natively, and through packages available from sources such as Third Party Projects you can find data source connectors for popular file formats like Avro. These packages can be added either by specifying --packages with the spark-submit or sparkR commands, or by passing the sparkPackages parameter when initializing the SparkSession in an interactive R shell.

 

sparkR.session(sparkPackages = "com.databricks:spark-avro_2.11:3.0.0")

 

Sys.setenv(SPARK_HOME="/opt/mapr/spark/spark-2.1.0")
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "yarn", sparkConfig = list(spark.driver.memory = "2g"), sparkPackages = "com.databricks:spark-avro_2.11:3.0.0")
# people is now a SparkDataFrame
people <- read.df("file:///opt/mapr/spark/spark-2.1.0/examples/src/main/resources/people.json", "json")
head(people)
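
With the spark-avro package loaded, Avro files can be read through the same read.df call; a minimal sketch, where the Avro file path is a placeholder:

# Placeholder Avro path; the source name is provided by the spark-avro package
avroDf <- read.df("file:///tmp/users.avro", "com.databricks.spark.avro")
head(avroDf)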

 

Use Case 4: Create SparkDataFrames from Hive Tables - By default, SparkR attempts to create a SparkSession with Hive support enabled (enableHiveSupport = TRUE).

 

Sys.setenv(SPARK_HOME="/opt/mapr/spark/spark-2.1.0")
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session()
# Queries can be expressed in HiveQL.
sql("CREATE TABLE IF NOT EXISTS mapr(id INT, name STRING)")
sql("LOAD DATA LOCAL INPATH '/opt/mapr/spark/spark-2.1.0/examples/src/main/resources/kv1.txt' INTO TABLE mapr")
employeeCount <- sql("SELECT count(*) FROM mapr")
# employeeCount is now a SparkDataFrame
head(employeeCount)
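
Results of HiveQL queries are themselves SparkDataFrames, so they can be brought back into a local R data frame with collect; a short sketch against the mapr table created above:

# Query the Hive table and collect the result as a local R data.frame
employees <- sql("SELECT id, name FROM mapr")
localEmployees <- collect(employees)
head(localEmployees)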

 

Prerequisites:
1) yum install R
2) yum install mapr-spark-2.1.0.201703271134-1.noarch
3) yum install mapr-spark-master-2.1.0.201703271134-1.noarch

 

The MapR-Spark-2.1.0 packages can be downloaded from http://package.mapr.com/releases/MEP/MEP-3.0.0/redhat/.

 

Program Execution Snippet: Running the Program from the R Shell

 

[mapr@sn2 bin]$ R

R version 3.3.3 (2017-03-06) -- "Another Canoe"
Copyright (C) 2017 The R Foundation for Statistical Computing
Platform: x86_64-redhat-linux-gnu (64-bit)

R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.

  Natural language support but running in an English locale

R is a collaborative project with many contributors.
Type 'contributors()' for more information and
'citation()' on how to cite R or R packages in publications.

Type 'demo()' for some demos, 'help()' for on-line help, or
'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.

> Sys.setenv(SPARK_HOME="/opt/mapr/spark/spark-2.1.0")
> library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))

Attaching package: ‘SparkR’

The following objects are masked from ‘package:stats’:

    cov, filter, lag, na.omit, predict, sd, var, window

The following objects are masked from ‘package:base’:

    as.data.frame, colnames, colnames<-, drop, endsWith, intersect,
    rank, rbind, sample, startsWith, subset, summary, transform, union

> sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))
Spark package found in SPARK_HOME: /opt/mapr/spark/spark-2.1.0
Launching java with spark-submit command /opt/mapr/spark/spark-2.1.0/bin/spark-submit   --driver-memory "2g" sparkr-shell /tmp/RtmpzKbIS6/backend_port310323a10d39
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Java ref type org.apache.spark.sql.SparkSession id 1
> df <- as.DataFrame(faithful)
> head(df)
  eruptions waiting
1     3.600      79
2     1.800      54
3     3.333      74
4     2.283      62
5     4.533      85
6     2.883      55 

 

Conclusion

In this blog post, you have learned how R can be used with the MapR Spark packages through a few examples. Thanks for reading; if you have any questions, please leave a comment in the comments section below.

 

 
