
Apache Spark And MapR-DB JSON Integration

Blog Post created by maprcommunity Employee on Jun 12, 2017

by Hanumath Rao Maduri

Apache Spark is an open source big data processing framework, which is being widely used for analytics on streaming and batch workloads. Spark is fully supported on MapR, and it typically uses data in the form of large files. With the Spark/MapR-DB connectors, you can use MapR-DB as a data source and as a data destination for Spark jobs.

MapR-DB is a high performance NoSQL database, which supports two primary data models: JSON documents and wide column tables. A Spark connector is available for each data model. The Native Spark Connector for MapR-DB JSON provides APIs to access MapR-DB JSON documents from Apache Spark, using the Open JSON Application Interface (OJAI) API. To access the wide column data model, which is often referred to as “MapR-DB Binary,” the Spark HBase and MapR-DB Binary Connector should be used. This blog article will describe examples of the connector for MapR-DB JSON.

BACKGROUND

Big data applications are moving towards data sets with flexible (or no predefined) schemas. Hence, the OJAI API was introduced in the MapR-DB 5.1 release. The OJAI API is the set of interfaces that allows the application to manipulate structured, semi-structured, or unstructured data. Please refer to the following GitHub repository for more details on the OJAI API: https://github.com/ojai/ojai.
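
To give a flavor of the API, here is a minimal sketch of building a JSON document with OJAI; the field names are illustrative:

import org.ojai.json.Json

// Build an OJAI document field by field; "address.city" is a nested field path.
val doc = Json.newDocument()
  .set("first_name", "David")
  .set("address.city", "milpitas")
println(doc.asJsonString())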

With the new release (MapR 5.2, MEP 3.0), a new connector was developed to integrate MapR-DB JSON tables with Spark. This connector uses the OJAI API internally to access and mutate the tables. It is this connector API that will be explored further in this blog post.

THE SPARK/MAPR-DB JSON CONNECTOR API

In the MapR Ecosystem Pack (MEP) 3.0 release, the Native Spark Connector for MapR-DB JSON supports loading data from a MapR-DB table as a Spark Resilient Distributed Dataset (RDD) of OJAI documents and saving a Spark RDD into a MapR-DB JSON table. (An RDD is Spark’s core data abstraction: an immutable, partitioned collection of elements that can be operated on in parallel.)

Here are the interfaces for loading the JSON table into an RDD:

loadFromMapRDB(<path-of-mapr-db-json-table>)

The above function also supports another variant wherein one can directly load the documents as an RDD of Scala objects:

loadFromMapRDB[<BeanClass>](<path-of-mapr-db-json-table>)
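
For example, assuming a Person bean class like the one sketched later in this post, the table can be loaded directly as Person objects:

// Load each document as an instance of the Person bean (defined below).
val users = sc.loadFromMapRDB[Person]("/tmp/UserInfo")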

Below is the API for saving the objects into a MapR-DB table:

rdd.saveToMapRDB(<path-of-mapr-db-json-table>)

The saveToMapRDB function also accepts several optional, self-explanatory parameters, which are combined in the sketch after this list:

createTable – Create the table before saving the documents, and throw an exception if the table already exists. The default value is false.

bulkInsert – Save a group of rows of data at once into a MapR-DB table. The default value is false.

idFieldPath – The field to be used as the document key. The default value is "_id".
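
For example, the options can be combined in a single call; the table path and the user_id key field below are illustrative:

rdd.saveToMapRDB("/tmp/people",
  createTable = true,       // throw if /tmp/people already exists
  bulkInsert = true,        // write the documents in bulk
  idFieldPath = "user_id")  // use the user_id field as the document key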

Similar to loading documents into a Scala bean class, one can save an RDD of user-specified Scala class objects into a MapR-DB JSON table.

SAVING OBJECTS IN A MAPR-DB JSON TABLE

To access the connector API, import the Scala package com.mapr.db.spark._; all the required implicit definitions are included in this package.

Below is the code that saves an RDD of Person objects into the MapR-DB JSON table:

import org.apache.spark.{SparkConf, SparkContext}
import com.mapr.db.spark._

val conf = new SparkConf().setAppName("json app").setMaster("local[*]")
val sc = new SparkContext(conf)
val people = sc.parallelize(getUsers())
people.saveToMapRDB("/tmp/UserInfo", createTable = true)

The getUsers function allocates the Person objects to be saved.
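
A minimal sketch is shown below, assuming Person and Address bean classes whose field names mirror the documents printed in the next section; the class shapes and the _id values are illustrative:

// Bean classes assumed for this example; field names mirror the JSON
// documents shown in the next section.
case class Address(Pin: Int, city: String, street: String)

case class Person(_id: String,          // document key used by MapR-DB
                  first_name: String,
                  last_name: String,
                  dob: String,
                  interests: Seq[String],
                  address: Address)

def getUsers(): Seq[Person] = Seq(
  Person("david.jones", "David", "Jones", "1947-11-29",
         Seq("football", "books", "movies"),
         Address(95035, "milpitas", "350 holger way")),
  Person("indiana.jones", "Indiana", "Jones", "1987-05-04",
         Seq("squash", "comics", "movies"),
         Address(95985, "sunnyvale", "35 town way"))
  // ...the remaining three users follow the same pattern
)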

LOADING DATA FROM A MAPR-DB JSON TABLE

The code provided below will load the documents from the "/tmp/UserInfo" table into an RDD:

val usersInfo = sc.loadFromMapRDB("/tmp/UserInfo").collect


Here is the result of printing the usersInfo documents:

usersInfo.foreach(println(_))

{
  "address":{"Pin":95035,"city":"milpitas","street":"350 holger way"},
  "dob":"1947-11-29",
  "first_name":"David",
  "interests":["football","books","movies"],
  "last_name":"Jones"
}

{
  "address":{"Pin":95985,"city":"sunnyvale","street":"35 town way"},
  "dob":"1987-05-04",
  "first_name":"Indiana",
  "interests":["squash","comics","movies"],
  "last_name":"Jones"
}

{
  "address":{"Pin":67765,"city":"phoenix","street":"358 pond way"},
  "dob":"1968-10-02",
  "first_name":"James",
  "interests":["tennis","painting","music"],
  "last_name":"junior"
}

{
  "address":{"Pin":95652,"city":"san jose","street":"305 city way"},
  "dob":"1976-01-09",
  "first_name":"Jimmy",
  "interests":["cricket","sketching"],
  "last_name":"gill"
}

{
  "address":{"Pin":89898,"city":"salt lake","street":"351 lake way"},
  "dob":"1974-01-29",
  "first_name":"Peter",
  "interests":["boxing","music","movies"],
  "last_name":"pan"
}
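
Each element of usersInfo is an OJAI document, so individual fields can also be read by path; a small sketch, assuming the returned documents expose the standard org.ojai.Document accessors such as getString:

// Extract a single field from every collected document.
val firstNames = usersInfo.map(_.getString("first_name"))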


PROJECTION PUSHDOWN AND PREDICATE PUSHDOWN FOR THE LOAD API

The “load” API of the connector also supports “select” and “where” clauses. The “select” clause pushes the projection of a subset of fields down to the database, while the “where” clause filters out documents with a condition before they reach Spark.

Here is an example of how to use the “where” clause to restrict the rows:

val usersLivingInMilpitas = sc.loadFromMapRDB("/tmp/UserInfo")
  .where(field("address.city") === "milpitas")

Similarly, if one wants to project only the first_name and last_name fields, the following code will generate the required output:

val namesOfUsers = sc.loadFromMapRDB("/tmp/UserInfo")
  .select("first_name", "last_name")
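
The two clauses can also be chained; as a quick sketch, the following returns only the names of the users living in milpitas:

// Combine predicate and projection pushdown in a single load.
val milpitasNames = sc.loadFromMapRDB("/tmp/UserInfo")
  .where(field("address.city") === "milpitas")
  .select("first_name", "last_name")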


SETTING UP THE PROJECT TO USE THE SPARK/MAPR-DB CONNECTOR

To access the loadFromMapRDB and saveToMapRDB APIs, the following Maven coordinates (groupId, artifactId, and version) are required in the project’s pom.xml file:

        <dependency>
            <groupId>com.mapr.db</groupId>
            <artifactId>maprdb-spark</artifactId>
            <version>5.2.1-mapr</version>
        </dependency>


Also add the Spark core dependency to the pom.xml file:

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0-mapr-1703</version>
        </dependency>


MapR-specific jars are located in the mapr-releases repository. The following repository information should be included in the pom.xml file so that Maven can download the dependencies:

        <repository>
            <id>mapr-releases</id>
            <url>http://repository.mapr.com/maven/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
       </repository>


The code for the example application can be accessed here.

SUMMARY

Once the data is loaded as an RDD of either OJAI documents or an RDD of a Scala bean class, it can be processed further using Spark transformations. The data loaded from MapR-DB tables can be enriched using the data from other data sources.

The Spark connector will be further enhanced to support the DataFrame and Dataset APIs, which will enable you to use Spark SQL and Spark Streaming to transform data from MapR-DB JSON tables seamlessly.
