Parallel and Iterative Processing for Machine Learning Recommendations with Spark

Blog Post created by maprcommunity Employee on Mar 1, 2017

by Carol McDonald


Recommendation systems help narrow your choices to those that best meet your particular needs, and they are among the most popular applications of big data processing. In this post we are going to discuss building a recommendation model from movie ratings, similar to these posts: An Inside Look at the Components of a Recommendation Engine and Recommender System with Mahout and Elasticsearch, but this time using an iterative algorithm and parallel processing with Apache Spark MLlib.

In this post we’ll cover:

  1. A key difference between Spark and MapReduce, which makes Spark much faster for iterative algorithms.
  2. Collaborative filtering for recommendations with Spark.
  3. Loading and exploring the sample data set with Spark.
  4. Using Spark MLlib’s Alternating Least Squares algorithm to make movie recommendations.
  5. Testing the results of the recommendations.

This post is the third in a series. If you are new to Spark, please read these first:

  1. Getting Started with Spark on MapR Sandbox
  2. Using Apache Spark DataFrames for Processing of Tabular Data
  3. Getting Started with the Spark Web UI

A Key Difference between Spark and MapReduce

Spark is especially useful for parallel processing of distributed data with iterative algorithms. As discussed in The 5-Minute Guide to Understanding the Significance of Apache Spark, Spark tries to keep things in memory, whereas MapReduce involves more reading and writing from disk. As shown in the image below, for each MapReduce Job, data is read from an HDFS file for a mapper, written to and from a SequenceFile in between, and then written to an output file from a reducer. When a chain of multiple jobs is needed, Spark can execute much faster by keeping data in memory. For the record, there are benefits to writing to disk, as disk is more fault tolerant than memory.

RDDs Data Partitions Read from RAM Instead of Disk

Spark’s Resilient Distributed Datasets (RDDs) are collections of elements that are partitioned across the nodes of a cluster and can be operated on in parallel. RDDs can be created from HDFS files and can be cached, allowing reuse across parallel operations.

The diagram below shows a Spark application running on an example Hadoop cluster. A task applies its unit of work to the RDD elements in its partition, and outputs a new partition. Because iterative algorithms apply operations repeatedly to the same data, they benefit from the in-memory caching of RDDs across iterations.

Collaborative Filtering with Spark

Collaborative filtering algorithms recommend items (this is the filtering part) based on preference information from many users (this is the collaborative part). The collaborative filtering approach is based on similarity; the basic idea is people who liked similar items in the past will like similar items in the future. In the example below, Ted likes movies A, B, and C. Carol likes movies B and C. Bob likes movie B. To recommend a movie to Bob, we calculate that users who liked B also liked C, so C is a possible recommendation for Bob. Of course, this is a tiny example. In real situations, we would have much more data to work with.
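The Ted, Carol, and Bob example can be sketched in a few lines of plain Scala (no Spark needed). This is only an illustration of the "users who liked B also liked C" idea, not the algorithm MLlib uses; the object name CoOccurrenceSketch and the recommend function are made up for this sketch.

```scala
object CoOccurrenceSketch {
  // The likes from the example: user -> movies the user liked.
  val likes: Map[String, Set[String]] = Map(
    "Ted"   -> Set("A", "B", "C"),
    "Carol" -> Set("B", "C"),
    "Bob"   -> Set("B")
  )

  // For one user, count how often each movie the user has not liked yet
  // was liked by other users who share at least one liked movie.
  def recommend(user: String, likes: Map[String, Set[String]]): Seq[(String, Int)] = {
    val mine = likes.getOrElse(user, Set.empty[String])
    val candidates = for {
      (other, theirs) <- likes.toSeq
      if other != user && (theirs & mine).nonEmpty
      movie <- theirs -- mine
    } yield movie
    candidates
      .groupBy(identity)
      .map { case (movie, hits) => (movie, hits.size) }
      .toSeq
      .sortBy { case (movie, count) => (-count, movie) } // most co-liked first
  }

  def main(args: Array[String]): Unit =
    println(recommend("Bob", likes))
}
```

For Bob, movie C comes first because both Ted and Carol liked it together with B, while A is supported only by Ted.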

Spark MLlib implements a collaborative filtering algorithm called Alternating Least Squares (ALS).

ALS approximates the sparse user-item rating matrix of size U×I as the product of two dense matrices of rank K: the User and Item factor matrices, of size U×K and I×K (see picture below). The factor matrices are also called latent feature models. They represent hidden features which the algorithm tries to discover. One matrix tries to describe the latent or hidden features of each user, and the other tries to describe the latent properties of each movie.

ALS is an iterative algorithm. In each iteration, the algorithm alternately fixes one factor matrix and solves for the other, and this process continues until it converges. This alternation between which matrix to optimize is where the "alternating" in the name comes from.
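To make the alternation concrete, here is a minimal sketch of ALS with K = 1 in plain Scala. With a single latent factor, each least-squares solve reduces to one division per user or per item. It is a toy under stated assumptions, not MLlib's implementation: the object name AlsRank1Sketch, the sample ratings, and the plain regularization constant lambda are all made up for this sketch, and MLlib's distributed solver handles general rank K and scales its regularization differently.

```scala
object AlsRank1Sketch {
  // One observed rating: (user index, item index, rating).
  type Obs = (Int, Int, Double)

  // Least-squares update for one side while the other side is held fixed.
  // With rank 1 every factor is a single number, so the regularized
  // normal equations reduce to one division per row.
  def solveSide(n: Int, fixed: Array[Double], obs: Seq[Obs], lambda: Double): Array[Double] = {
    val num = new Array[Double](n)
    val den = Array.fill(n)(lambda)
    for ((self, other, r) <- obs) {
      num(self) += r * fixed(other)
      den(self) += fixed(other) * fixed(other)
    }
    Array.tabulate(n)(i => num(i) / den(i))
  }

  // Sum of squared errors over the observed ratings only.
  def squaredError(u: Array[Double], v: Array[Double], obs: Seq[Obs]): Double =
    obs.map { case (i, j, r) => val e = r - u(i) * v(j); e * e }.sum

  def train(numUsers: Int, numItems: Int, obs: Seq[Obs],
            iterations: Int, lambda: Double): (Array[Double], Array[Double]) = {
    require(lambda > 0, "lambda must be positive to avoid division by zero")
    var u = Array.fill(numUsers)(1.0)
    var v = Array.fill(numItems)(1.0)
    // Items see the same ratings with the two indices swapped.
    val byItem = obs.map { case (i, j, r) => (j, i, r) }
    for (_ <- 1 to iterations) {
      u = solveSide(numUsers, v, obs, lambda)    // fix items, solve for users
      v = solveSide(numItems, u, byItem, lambda) // fix users, solve for items
    }
    (u, v)
  }

  // Made-up ratings; user 1 has not rated item 2.
  val sample: Seq[Obs] =
    Seq((0, 0, 1.0), (0, 1, 2.0), (0, 2, 2.5), (1, 0, 2.0), (1, 1, 4.0))

  def main(args: Array[String]): Unit = {
    val (u, v) = train(2, 3, sample, 50, 0.01)
    println(f"predicted rating for user 1, item 2: ${u(1) * v(2)}%.2f")
  }
}
```

In the sample data, user 1 rates every shared item twice as high as user 0, so the missing entry for user 1 and item 2 is predicted to be close to 5, twice user 0's rating of 2.5.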


Typical Machine Learning Workflow

A typical machine learning workflow is shown below.

In this tutorial we will perform the following steps:

  1. Load the sample data.
  2. Parse the data into the input format for the ALS algorithm.
  3. Split the data into two parts, one for building the model and one for testing the model.
  4. Run the ALS algorithm to build/train a user product matrix model.
  5. Make predictions with the training data and observe the results.
  6. Test the model with the test data.

The Sample Data Set

The table below shows the Rating data fields with some sample data:

The table below shows the Movie data fields with some sample data:

First we will explore the data using Spark DataFrames with queries like:

  • Get the max and min ratings along with the number of users who have rated a movie.
  • Display the titles of the movies that a user rated higher than 4.

Loading Data into Spark DataFrames

Log into the MapR Sandbox, as explained in Getting Started with Spark on MapR Sandbox, using userid user01, password mapr. Copy the sample data files to your sandbox home directory /user/user01 using scp. Start the Spark shell with:

$ spark-shell 

First we will import some packages and instantiate a sqlContext, which is the entry point for working with structured data (rows and columns) in Spark and allows the creation of DataFrame objects.

(In the code boxes, lines starting with // are comments; shell output follows the statement that produced it.)

// SQLContext entry point for working with structured data
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Import Spark SQL data types
import org.apache.spark.sql._
// Import mllib recommendation data types
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}

Below we use Scala case classes to define the Movie and User schemas corresponding to the movies.dat and users.dat files.

// input format is MovieID::Title::Genres
// (only the ID and title are kept; genres are not used in this post)
case class Movie(movieId: Int, title: String)
// input format is UserID::Gender::Age::Occupation::Zip-code
case class User(userId: Int, gender: String, age: Int, occupation: Int, zip: String)

The functions below parse a line from the movies.dat and users.dat files into the corresponding Movie and User classes.

// function to parse input into Movie class
def parseMovie(str: String): Movie = {
  val fields = str.split("::")
  assert(fields.size == 3)
  Movie(fields(0).toInt, fields(1))
}
// function to parse input into User class
def parseUser(str: String): User = {
  val fields = str.split("::")
  assert(fields.size == 5)
  User(fields(0).toInt, fields(1), fields(2).toInt, fields(3).toInt, fields(4))
}

Below we load the data from the ratings.dat file into a Resilient Distributed Dataset (RDD). RDDs support transformations and actions. The first() action returns the first element in the RDD, which is the String “1::1193::5::978300760”.

// load the data into an RDD
val ratingText = sc.textFile("/user/user01/moviemed/ratings.dat")
// MapPartitionsRDD[1] at textFile
// Return the first element in this RDD
ratingText.first()
// String = 1::1193::5::978300760

We use the org.apache.spark.mllib.recommendation.Rating class for parsing the ratings.dat file. Later we will use the Rating class as input for the ALS run method.

Then we use the map transformation on ratingText, which will apply the parseRating function to each element in ratingText and return a new RDD of Rating objects. We cache the ratings data, since we will use this data to build the matrix model. Then we get the counts for the number of ratings, movies and users.

// function to parse input UserID::MovieID::Rating
// into org.apache.spark.mllib.recommendation.Rating class
def parseRating(str: String): Rating = {
  val fields = str.split("::")
  Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
}
// create an RDD of Rating objects and cache it for reuse
val ratingsRDD = ratingText.map(parseRating).cache()
// ratingsRDD: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating] = MapPartitionsRDD
// count number of total ratings
val numRatings = ratingsRDD.count()
// numRatings: Long = 1000209
// count number of movies rated
val numMovies = ratingsRDD.map(_.product).distinct().count()
// numMovies: Long = 3706
// count number of users who rated a movie
val numUsers = ratingsRDD.map(_.user).distinct().count()
// numUsers: Long = 6040
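The parse function above throws an exception on any malformed line, which would fail the whole Spark job. As a hedged alternative, the sketch below returns an Option so that bad lines can be skipped. It is plain Scala; the Rating case class here is a local stand-in with the same three fields as MLlib's Rating, and the names SafeParseSketch and parseRatingOpt are made up for this sketch.

```scala
import scala.util.Try

object SafeParseSketch {
  // Local stand-in with the same fields as MLlib's Rating (an assumption
  // made so that this sketch runs without Spark on the classpath).
  case class Rating(user: Int, product: Int, rating: Double)

  // Parses UserID::MovieID::Rating[::Timestamp]; returns None instead of
  // throwing when a field is missing or is not a number.
  def parseRatingOpt(line: String): Option[Rating] =
    line.split("::") match {
      case Array(user, product, rating, _*) =>
        Try(Rating(user.trim.toInt, product.trim.toInt, rating.trim.toDouble)).toOption
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    println(parseRatingOpt("1::1193::5::978300760"))
    println(parseRatingOpt("not a rating"))
  }
}
```

In the Spark shell, a function like this could be used with flatMap instead of map, so that lines which fail to parse are dropped rather than aborting the job.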

Explore and Query the Movie Lens Data with Spark DataFrames

Spark SQL provides a programming abstraction called DataFrames. A DataFrame is a distributed collection of data organized into named columns. Spark supports automatically converting an RDD containing case classes to a DataFrame with the method toDF, and the case class defines the schema of the table.


Below we load the data from the users and movies data files into an RDD, use the map transformation with the parse functions, and then call toDF(), which returns a DataFrame for the RDD. Then we register the DataFrames as temp tables so that we can use the tables in SQL statements.

// load the data into DataFrames
val usersDF = sc.textFile("/user/user01/moviemed/users.dat").map(parseUser).toDF()
val moviesDF = sc.textFile("/user/user01/moviemed/movies.dat").map(parseMovie).toDF()
// create a DataFrame from the ratingsRDD
val ratingsDF = ratingsRDD.toDF()
// register the DataFrames as temp tables
ratingsDF.registerTempTable("ratings")
moviesDF.registerTempTable("movies")
usersDF.registerTempTable("users")

DataFrame printSchema() prints the schema to the console in a tree format.

// Return the schema of this DataFrame
usersDF.printSchema()
root
 |-- userId: integer (nullable = false)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = false)
 |-- occupation: integer (nullable = false)
 |-- zip: string (nullable = true)
moviesDF.printSchema()
root
 |-- movieId: integer (nullable = false)
 |-- title: string (nullable = true)
ratingsDF.printSchema()
root
 |-- user: integer (nullable = false)
 |-- product: integer (nullable = false)
 |-- rating: double (nullable = false)

Here are some example queries using Spark SQL with DataFrames on the Movie Lens data. The first query gets the maximum and minimum ratings along with the count of users who have rated a movie.

// Get the max, min ratings along with the count of users who have rated a movie.
val results = sqlContext.sql("""select movies.title, movierates.maxr, movierates.minr, movierates.cntu
  from (SELECT ratings.product, max(ratings.rating) as maxr,
    min(ratings.rating) as minr, count(distinct user) as cntu
    FROM ratings group by ratings.product) movierates
  join movies on movierates.product = movies.movieId
  order by movierates.cntu desc""")
// DataFrame show() displays the top 20 rows in tabular form
results.show()
title                maxr minr cntu
American Beauty (... 5.0  1.0  3428
Star Wars: Episod... 5.0  1.0  2991
Star Wars: Episod... 5.0  1.0  2990
Star Wars: Episod... 5.0  1.0  2883
Jurassic Park (1993) 5.0  1.0  2672
Saving Private Ry... 5.0  1.0  2653
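To see what the inner part of that query computes, here is the same aggregation written against an ordinary Scala collection: group by product, take the max and min rating, count the distinct users, and sort by that count in descending order. The Rating and Stats case classes, the object name, and the data used to try it out are hypothetical stand-ins for this sketch, not part of the sample data set.

```scala
object RatingStatsSketch {
  // Local stand-in with the same fields as MLlib's Rating.
  case class Rating(user: Int, product: Int, rating: Double)
  // One output row: max rating, min rating, count of distinct users.
  case class Stats(maxr: Double, minr: Double, cntu: Int)

  // Equivalent of: group by product, then max/min/count(distinct user),
  // ordered by the user count in descending order.
  def statsByProduct(ratings: Seq[Rating]): Seq[(Int, Stats)] =
    ratings
      .groupBy(_.product)
      .map { case (product, rs) =>
        val values = rs.map(_.rating)
        (product, Stats(values.max, values.min, rs.map(_.user).distinct.size))
      }
      .toSeq
      .sortBy { case (_, stats) => -stats.cntu }

  def main(args: Array[String]): Unit = {
    // Made-up ratings, for illustration only.
    val ratings = Seq(Rating(1, 10, 5.0), Rating(2, 10, 1.0), Rating(3, 20, 4.0))
    statsByProduct(ratings).foreach(println)
  }
}
```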

The query below finds the users who rated the most movies, then finds which movies the most active user rated higher than 4. We will get recommendations for this user later.

// Show the top 10 most-active users and how many times they rated a movie
val mostActiveUsersSchemaRDD = sqlContext.sql("""SELECT ratings.user, count(*) as ct from ratings
  group by ratings.user order by ct desc limit 10""")
println(mostActiveUsersSchemaRDD.collect().mkString("\n"))
[4169,2314]
[1680,1850]
[4277,1743]
. . .
// Find the movies that user 4169 rated higher than 4
val results = sqlContext.sql("""SELECT ratings.user, ratings.product, ratings.rating, movies.title
  FROM ratings JOIN movies ON movies.movieId = ratings.product
  where ratings.user = 4169 and ratings.rating > 4""")
results.show()
user product rating title
4169 1231    5.0    Right Stuff, The ...
4169 232     5.0    Eat Drink Man Wom...
4169 3632    5.0    Monsieur Verdoux ...
4169 2434    5.0    Down in the Delta...
4169 1834    5.0    Spanish Prisoner,...
…

Using ALS to Build a MatrixFactorizationModel with the Movie Ratings data

Now we will use the MLlib ALS algorithm to learn the latent factors that can be used to predict missing entries in the user-item association matrix. First we separate the ratings data into training data (80%) and test data (20%). We will get recommendations for the training data, then we will evaluate the predictions with the test data. This process of taking a subset of the data to build the model and then verifying the model with the remaining data is known as cross validation; the goal is to estimate how accurately a predictive model will perform in practice. To improve the model, this process is often done multiple times with different subsets, but we will only do it once.

We run ALS on the input trainingRatingsRDD of Rating (user, product, rating) objects with the rank and iterations parameters:

  • rank is the number of latent factors in the model.
  • iterations is the number of iterations to run.

The ALS run(trainingRatingsRDD) method will build and return a MatrixFactorizationModel, which can be used to make product predictions for users.

// Randomly split ratings RDD into training data RDD (80%) and test data RDD (20%)
val splits = ratingsRDD.randomSplit(Array(0.8, 0.2), 0L)
val trainingRatingsRDD = splits(0).cache()
val testRatingsRDD = splits(1).cache()
val numTraining = trainingRatingsRDD.count()
val numTest = testRatingsRDD.count()
println(s"Training: $numTraining, test: $numTest.")
// Training: 800702, test: 199507.
// build an ALS user product matrix model with rank=20, iterations=10
val model = new ALS().setRank(20).setIterations(10).run(trainingRatingsRDD)
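We only split the data once here. For readers curious about what "multiple times with different subsets" would look like, the following is a minimal k-fold sketch over an ordinary Scala collection. The object and function names are made up for this sketch; it is not what randomSplit does internally. For RDDs, MLlib's MLUtils.kFold offers a comparable utility.

```scala
import scala.util.Random

object KFoldSketch {
  // Shuffle once with a fixed seed, then deal the items into k folds
  // round-robin. Each fold serves as the test set exactly once, and the
  // remaining items form the matching training set.
  def kFold[T](data: Seq[T], k: Int, seed: Long): Seq[(Seq[T], Seq[T])] = {
    require(k >= 2, "need at least two folds")
    val shuffled = new Random(seed).shuffle(data).zipWithIndex
    (0 until k).map { fold =>
      val (test, train) = shuffled.partition { case (_, i) => i % k == fold }
      (train.map(_._1), test.map(_._1))
    }
  }

  def main(args: Array[String]): Unit =
    kFold((1 to 10).toList, 5, 0L).foreach { case (train, test) =>
      println(s"train=${train.size} test=${test.size}")
    }
}
```

With k folds, the model would be trained k times and the error averaged over the k test sets, which gives a steadier estimate than a single 80/20 split at the cost of k times the training work.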

Making Predictions with the MatrixFactorizationModel

Now we can use the MatrixFactorizationModel to make predictions. First we will get movie predictions for the most active user, 4169, with the recommendProducts() method, which takes as input the userid and the number of products to recommend. Then we print out the recommended movie titles.

// Get the top 4 movie predictions for user 4169
val topRecsForUser = model.recommendProducts(4169, 4)
// get movie titles to show with recommendations
val movieTitles = moviesDF.rdd.map(row => (row.getInt(0), row.getString(1))).collectAsMap()
// print out top recommendations for user 4169 with titles
topRecsForUser.map(rating => (movieTitles(rating.product), rating.rating)).foreach(println)
(Other Side of Sunday) (1996),5.481923568209796)
(Shall We Dance? (1937),5.435728723311838)
(42 Up (1998),5.3596886655841995)
(American Dream (1990),5.291663089739282)
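Conceptually, a matrix factorization model scores a (user, movie) pair as the dot product of the user's factor vector and the movie's factor vector, and a top-N recommendation sorts the movies by that score. The sketch below illustrates the idea in plain Scala with made-up factor values; the object and function names are hypothetical and this is not MLlib's code.

```scala
object FactorScoringSketch {
  // Predicted rating = dot product of the two latent factor vectors.
  def dot(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length, "factor vectors must have the same rank")
    a.zip(b).map { case (x, y) => x * y }.sum
  }

  // Score every item for one user and keep the n highest scores.
  def topN(userFactor: Array[Double],
           itemFactors: Map[Int, Array[Double]],
           n: Int): Seq[(Int, Double)] =
    itemFactors.toSeq
      .map { case (item, factor) => (item, dot(userFactor, factor)) }
      .sortBy { case (_, score) => -score }
      .take(n)

  def main(args: Array[String]): Unit = {
    // Made-up rank-2 factors, for illustration only.
    val user = Array(1.0, 0.5)
    val items = Map(10 -> Array(4.0, 0.0), 20 -> Array(1.0, 2.0), 30 -> Array(2.0, 6.0))
    topN(user, items, 2).foreach(println)
  }
}
```

Because a dot product is not clamped to the rating scale, predicted values can fall outside the 1 to 5 range, which is why the recommendations above show scores such as 5.48.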

Evaluating the Model

Next we will compare predictions from the model with actual ratings in the testRatingsRDD. First we get the user product pairs from the testRatingsRDD to pass to the MatrixFactorizationModel predict(usersProducts: RDD[(Int, Int)]) method, which will return predictions as Rating (user, product, rating) objects.

// get user product pair from testRatings
val testUserProductRDD = testRatingsRDD.map {
  case Rating(user, product, rating) => (user, product)
}
// get predicted ratings to compare to test ratings
val predictionsForTestRDD = model.predict(testUserProductRDD)
predictionsForTestRDD.take(10).mkString("\n")
Rating(5874,1084,4.096802264684769)
Rating(6002,1084,4.884270180173981)

Now we will compare the test predictions to the actual test ratings. First we put the predictions and the test RDDs in this key, value pair format for joining: ((user, product), rating). Then we print out the (user, product), (test rating, predicted rating) for comparison.

// prepare predictions for comparison
val predictionsKeyedByUserProductRDD = predictionsForTestRDD.map {
  case Rating(user, product, rating) => ((user, product), rating)
}
// prepare test for comparison
val testKeyedByUserProductRDD = testRatingsRDD.map {
  case Rating(user, product, rating) => ((user, product), rating)
}
// Join the test with predictions
val testAndPredictionsJoinedRDD = testKeyedByUserProductRDD.join(predictionsKeyedByUserProductRDD)
// print the (user, product), (test rating, predicted rating)
testAndPredictionsJoinedRDD.take(3).mkString("\n")
((455,1254),(4.0,4.48399986469759))
((2119,1101),(4.0,3.83955683816239))
((1308,1042),(4.0,2.616444598335322))

The example below finds false positives by finding predicted ratings which were >= 4 when the actual test rating was <= 1. There were 557 false positives out of 199,507 test ratings.

val falsePositives = testAndPredictionsJoinedRDD.filter {
  case ((user, product), (ratingT, ratingP)) => ratingT <= 1 && ratingP >= 4
}
falsePositives.take(2)
Array[((Int, Int), (Double, Double))] =
((3842,2858),(1.0,4.106488210964762)),
((6031,3194),(1.0,4.790778049100913))
falsePositives.count
res23: Long = 557

Next we evaluate the model using Mean Absolute Error (MAE). MAE is the mean of the absolute differences between the predicted and actual ratings.

// Evaluate the model using Mean Absolute Error (MAE) between test and predictions
val meanAbsoluteError = testAndPredictionsJoinedRDD.map {
  case ((user, product), (testRating, predRating)) =>
    val err = (testRating - predRating)
    Math.abs(err)
}.mean()
meanAbsoluteError: Double = 0.7244940545944053
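The same metric is easy to check by hand on a small collection. The sketch below computes MAE in plain Scala on the three (test rating, predicted rating) pairs printed earlier, and adds root mean squared error (RMSE), a commonly used companion metric that this post does not otherwise use and that penalizes large misses more heavily. The object and function names are made up for this sketch.

```scala
object ErrorMetricsSketch {
  // Mean of |actual - predicted| over all pairs.
  def mae(pairs: Seq[(Double, Double)]): Double = {
    require(pairs.nonEmpty, "need at least one pair")
    pairs.map { case (actual, predicted) => math.abs(actual - predicted) }.sum / pairs.size
  }

  // Square root of the mean squared difference over all pairs.
  def rmse(pairs: Seq[(Double, Double)]): Double = {
    require(pairs.nonEmpty, "need at least one pair")
    math.sqrt(pairs.map { case (actual, predicted) =>
      val err = actual - predicted
      err * err
    }.sum / pairs.size)
  }

  // The three (test rating, predicted rating) pairs shown earlier in the post.
  val sample: Seq[(Double, Double)] = Seq(
    (4.0, 4.48399986469759),
    (4.0, 3.83955683816239),
    (4.0, 2.616444598335322)
  )

  def main(args: Array[String]): Unit =
    println(f"MAE = ${mae(sample)}%.3f, RMSE = ${rmse(sample)}%.3f")
}
```

On these three pairs the MAE is roughly 0.68, and the RMSE is higher because the third prediction misses by almost 1.4. For RDDs, MLlib's RegressionMetrics class reports both values.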


This concludes the tutorial on Parallel and Iterative processing for Machine Learning Recommendations with Spark. If you have any further questions, please ask them in the comments section below.

Content originally posted on the MapR Converge Blog.
