
Mapreduce Design Patterns Implemented In Apache Spark


This blog post is the first in a series that discusses some design patterns from the book MapReduce Design Patterns and shows how these patterns can be implemented in Apache Spark.

When writing MapReduce or Spark programs, it is useful to think about the data flows needed to perform a job. Even if Pig, Hive, Apache Drill, and Spark DataFrames make it easier to analyze your data, there is value in understanding the flow at a lower level, just as there is value in using Explain to understand a query plan. One way to think about this is in groupings of types of patterns, which are templates for solving common and general data manipulation problems. Below is the list of the types of MapReduce patterns in the MapReduce book:

  • Summarization Patterns
  • Filtering Patterns
  • Data Organization Patterns
  • Join Patterns
  • Metapatterns
  • Input and Output Patterns

In this post we will go over one of the summarization patterns, namely numerical summarizations.

NUMERICAL SUMMARIZATIONS

Numerical summarizations are a pattern for calculating aggregate statistical values over data. The intent is to group records by a key field and calculate aggregates per group such as min, max, median. The figure below from the MapReduce design patterns book shows the general execution of this pattern in MapReduce.
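As a rough illustration, here is a minimal sketch of the pattern using plain Scala collections (not Hadoop MapReduce API code): the map phase emits a (group key, numeric value) pair per record, the shuffle groups the pairs by key, and the reduce phase computes the aggregates for each group.

// toy records of (group key, numeric value)
val records = Seq(("a", 1.0), ("a", 3.0), ("b", 2.0))
// group the values by key, then compute min, max, and average per group
val summaries = records.groupBy { case (key, _) => key }.map { case (key, pairs) =>
  val values = pairs.map { case (_, v) => v }
  (key, (values.min, values.max, values.sum / values.size))
}
// Map(a -> (1.0,3.0,2.0), b -> (2.0,2.0,2.0))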

This aggregation pattern corresponds to using GROUP BY in SQL, for example:

SELECT MIN(numericalcol1), MAX(numericalcol1),
   COUNT(*) FROM table GROUP BY groupcol2;

In Pig this corresponds to:

b = GROUP a BY groupcol2;
c = FOREACH b GENERATE group, MIN(a.numericalcol1),
MAX(a.numericalcol1), COUNT_STAR(a);

 

In Spark, key-value pair RDDs are commonly used to group by a key in order to perform aggregations, as shown in the MapReduce diagram; however, with Spark pair RDDs, you have many more functions available than just map and reduce.

We will go through some aggregation examples using the dataset from a previous blog on Spark DataFrames. The dataset is a .csv file that consists of online auction data. Each auction has an auction id associated with it and can have multiple bids; each row represents a bid. For each bid, we have the following information (corresponding to the fields of the Auction case class defined below):

  • auctionid - the unique identifier of the auction
  • bid - the bid amount
  • bidtime - the time of the bid
  • bidder - the bidder's username
  • bidderrate - the bidder's rating
  • openbid - the opening bid
  • price - the final selling price
  • item - the type of item
  • daystolive - the number of days the auction is live

(In the code boxes, comments begin with //; sample output is also shown in // comments, following the command that produces it.)

Below we load the data from the ebay.csv file, use a Scala case class to define the Auction schema corresponding to that file, and then apply map() transformations to each element to create an auctionRDD of Auction objects.

<font color="green">// SQLContext entry point for working with structured data</font>
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
<font color="green">// this is used to implicitly convert an RDD to a DataFrame.</font>
import sqlContext.implicits._
<font color="green">// Import Spark SQL data types and Row.</font>
import org.apache.spark.sql._
<font color="green">//define the schema using a case class</font>
case class Auction(auctionid: String, bid: Double, bidtime: Double, bidder: String, bidderrate: Integer, openbid: Double, price: Double, item: String, daystolive: Integer)
<font color="green">// create an RDD of Auction objects</font>
val auctionRDD= sc.textFile("ebay.csv").map(_.split(",")).map(p => Auction(p(0),p(1).toDouble,p(2).toDouble,p(3),p(4).toInt,p(5).toDouble,p(6).toDouble,p(7),p(8).toInt ))
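
Since auctionRDD is reused in several of the examples below, it can be worth caching it and doing a quick sanity check on the parsing (an optional step, not part of the original post):

// keep the parsed RDD in memory because it is reused by the examples below
auctionRDD.cache()
// look at the first Auction object to confirm the parsing worked
auctionRDD.first()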

The figure below shows the general execution in Spark to calculate the average bid per auction for an item.

The corresponding code is shown below. First, a key-value pair is created with the auction id and item as the key and the bid amount and a 1 as the value, e.g. ((id, item), (bid amount, 1)). Next, a reduceByKey sums the bid amounts and sums the ones, giving the total bid amount and the count per key. Finally, a mapValues calculates the average, which is the total bid amount divided by the count of bids.

<font color="green">// create key value pairs of ( (auctionid, item) , (bid, 1))</font>
val apair = auctionRDD.map(auction=>((auction.auctionid,auction.item), (auction.bid, 1)))
<font color="green">// reducebyKey to get the sum of bids and count sum</font>
val atotalcount = apair.reduceByKey((x,y) => (x._1 + y._1, x._2 + y._2))
<font color="green">// get a couple results</font>
atotalcount.take(2)
<font color="blue">// Array(((1641062012,cartier),(4723.99,3)), ((2920322392,palm),(3677.96,32)))</font>
<font color="green">// avg = total/count</font>
val avgs = atotalcount.mapValues{ case (total, count) => total.toDouble / count }
<font color="green">// get a couple results</font>
avgs.take(2)
<font color="blue">// Array(((1641062012,cartier),1574.6633333333332), ((2920322392,palm),114.93625))</font>

<font color="green">// This could also be written like this</font>
val avgs =auctionRDD.map(auction=>((auction.auctionid,auction.item), (auction.bid, 1))).reduceByKey((x,y) => (x._1 + y._1, x._2 + y._2)).mapValues{ case (total, count) => total.toDouble / count }
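
As a side note (a sketch that is not part of the original post; the names sumCounts and avgs2 are used only for illustration), the same averages can be computed with aggregateByKey, one of the many pair RDD functions beyond map and reduce; it builds a (sum, count) pair per key directly from the bid values:

// build a (sum, count) pair per (auctionid, item) key, then divide to get the average
val sumCounts = auctionRDD.map(auction => ((auction.auctionid, auction.item), auction.bid)).aggregateByKey((0.0, 0))(
  (acc, bid) => (acc._1 + bid, acc._2 + 1),   // add one bid within a partition
  (a, b) => (a._1 + b._1, a._2 + b._2))       // merge partial (sum, count) pairs
val avgs2 = sumCounts.mapValues { case (total, count) => total / count }
// produces the same per-key averages as avgs above
avgs2.take(2)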

It is also possible to use the Java Math class or the Spark StatCounter class to calculate statistics, as shown below. For these examples, each (auctionid, item) key is paired with just the bid amount as the value.

import java.lang.Math

// create key value pairs of ( (auctionid, item), bid )
val abidpair = auctionRDD.map(auction => ((auction.auctionid, auction.item), auction.bid))
// Calculate the minimum bid per auction
val amin = abidpair.reduceByKey(Math.min)
// get a couple results
amin.take(2)
// Array(((1641062012,cartier),1524.99), ((2920322392,palm),1.0))

import org.apache.spark.util.StatCounter
// Calculate statistics on the bid amount per auction
val astats = abidpair.groupByKey().mapValues(list => StatCounter(list))
// get a result
astats.take(1)
// Array(((1641062012,cartier),(count: 3, mean: 1574.663333, stdev: 35.126723, max: 1600.000000, min: 1524.990000)))

Spark DataFrames provide a domain-specific language for distributed data manipulation, making it easier to perform aggregations. DataFrame queries can also perform better than hand-coded pair RDD operations, because their execution is automatically optimized by a query optimizer. Here is an example of using DataFrames to get the min, max, and average bid by auctionid and item:

val auctionDF = auctionRDD.toDF()
// import the min, max, and avg aggregate functions
import org.apache.spark.sql.functions.{min, max, avg}
// get the max, min, average bid by auctionid and item
auctionDF.groupBy("auctionid", "item").agg($"auctionid", $"item", max("bid"), min("bid"), avg("bid")).show

// auctionid  item MAX(bid) MIN(bid) AVG(bid)
// 3016429446 palm 193.0    120.0    167.54900054931642
// 8211851222 xbox 161.0    51.0     95.98892879486084
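
Since the introduction mentioned that there is value in using Explain to understand a query plan, you can also ask Spark to print the plan the query optimizer produces for this aggregation (a sketch, not part of the original post):

// print the plan chosen by the query optimizer for this aggregation
auctionDF.groupBy("auctionid", "item").agg($"auctionid", $"item", max("bid"), min("bid"), avg("bid")).explain()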

You can also use SQL while working with DataFrames, via Spark SQL. This example gets the max, min, and average bid by auctionid and item.

<font color="green">// register as a temp table inorder to use sql</font>
auctionDF .registerTempTable("auction")
<font color="green">// get the max, min, average bid by auctionid and item</font>
val aStatDF = sqlContext.sql("SELECT auctionid, item, MAX(bid) as maxbid, min(bid) as minbid, avg(bid) as avgbid FROM auction GROUP BY auctionid, item")

 

<font color="green">// show some results</font>
aStatDF.show
<font color="blue">auctionid item maxbid minbid avgbid
3016429446 palm 193.0 120.0 167.549
8211851222 xbox 161.0 51.0 95.98892857142857</font>

SUMMARY

This concludes the first post in a series that will discuss some MapReduce design patterns implemented with Spark. The discussion here was very condensed; for more information on the patterns, refer to the MapReduce Design Patterns book, and for more information on Spark pair RDDs, refer to the Key/Value Pairs chapter of Learning Spark.


Content originally posted in the MapR Converge Blog on November 02, 2015, by Carol McDonald.
