Parallel and Iterative Processing for Machine Learning Recommendations with Spark

Recommendation systems help narrow your choices to those that best meet your particular needs, and they are among the most popular applications of big data processing. In this post we are going to discuss building a recommendation model from movie ratings, similar to these posts: An Inside Look at the Components of a Recommendation Engine and Recommender System with Mahout and Elasticsearch, but this time using an iterative algorithm and parallel processing with Apache Spark MLlib.

In this post we’ll cover:

  1. A key difference between Spark and MapReduce, which makes Spark much faster for iterative algorithms.
  2. Collaborative filtering for recommendations with Spark.
  3. Loading and exploring the sample data set with Spark.
  4. Using Spark MLlib’s Alternating Least Squares algorithm to make movie recommendations.
  5. Testing the results of the recommendations.

This post is the third in a series. If you are new to Spark, please read these first:

  1. Getting Started with Spark on MapR Sandbox
  2. Using Apache Spark DataFrames for Processing of Tabular Data
  3. Getting Started with the Spark Web UI

A Key Difference between Spark and MapReduce

Spark is especially useful for parallel processing of distributed data with iterative algorithms. As discussed in The 5-Minute Guide to Understanding the Significance of Apache Spark, Spark tries to keep things in memory, whereas MapReduce involves more reading and writing from disk. As shown in the image below, for each MapReduce Job, data is read from an HDFS file for a mapper, written to and from a SequenceFile in between, and then written to an output file from a reducer. When a chain of multiple jobs is needed, Spark can execute much faster by keeping data in memory. For the record, there are benefits to writing to disk, as disk is more fault tolerant than memory.

RDDs Data Partitions Read from RAM Instead of Disk

Spark’s Resilient Distributed Datasets, RDDs, are a collection of elements partitioned across the nodes of a cluster and can be operated on in parallel. RDDs can be created from HDFS files and can be cached, allowing reuse across parallel operations.

The diagram below shows a Spark application running on an example Hadoop cluster. A task applies its unit of work to the RDD elements in its partition and outputs a new partition. Because iterative algorithms apply operations repeatedly to the same data, they benefit from caching RDDs in memory across iterations.
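
As a minimal sketch of what caching buys you (using the ratings file loaded later in this tutorial; the variable names here are illustrative), the RDD below is parsed once and kept in memory, so each subsequent action reuses the in-memory partitions instead of re-reading and re-parsing the file:

<font color="#00DB00">// minimal sketch: cache a parsed RDD so repeated actions reuse in-memory partitions</font>
val lines = sc.textFile("/user/user01/moviemed/ratings.dat")
val stars = lines.map(_.split("::")(2).toDouble).cache()
<font color="#00DB00">// each action below reuses the cached partitions instead of re-reading the file</font>
stars.count()
stars.mean()
stars.max()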

Collaborative Filtering with Spark

Collaborative filtering algorithms recommend items (this is the filtering part) based on preference information from many users (this is the collaborative part). The collaborative filtering approach is based on similarity; the basic idea is people who liked similar items in the past will like similar items in the future. In the example below, Ted likes movies A, B, and C. Carol likes movies B and C. Bob likes movie B. To recommend a movie to Bob, we calculate that users who liked B also liked C, so C is a possible recommendation for Bob. Of course, this is a tiny example. In real situations, we would have much more data to work with.

Spark MLlib implements a collaborative filtering algorithm called Alternating Least Squares (ALS).

ALS approximates the sparse U×I user-item rating matrix as the product of two dense matrices: a User factor matrix of size U×K and an Item factor matrix of size I×K, where K is the number of latent factors (see picture below). The factor matrices are also called latent feature models. They represent hidden features which the algorithm tries to discover: one matrix describes the latent features of each user, and the other describes the latent properties of each movie.

ALS is an iterative algorithm. In each iteration, the algorithm alternately fixes one factor matrix and solves for the other, and this process continues until it converges. This alternation between which matrix to optimize is where the "alternating" in the name comes from.
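
For reference, the objective ALS minimizes in the standard explicit-feedback formulation (MLlib uses a similar regularized form, with the regularization weight λ exposed as a parameter) can be written as:

\min_{U,V} \sum_{(u,i)\,\mathrm{observed}} \left( r_{ui} - \mathbf{u}_u^{\top} \mathbf{v}_i \right)^2 + \lambda \left( \sum_u \lVert \mathbf{u}_u \rVert^2 + \sum_i \lVert \mathbf{v}_i \rVert^2 \right)

where u_u and v_i are the K-dimensional factor vectors for user u and item i. With the item factors fixed, this breaks into an independent least-squares problem per user (and vice versa), which is exactly what each half of an iteration solves.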

Typical Machine Learning Workflow

A typical machine learning workflow is shown below.

In this tutorial we will perform the following steps:

  1. Load the sample data.
  2. Parse the data into the input format for the ALS algorithm.
  3. Split the data into two parts, one for building the model and one for testing the model.
  4. Run the ALS algorithm to build/train a user product matrix model.
  5. Make predictions with the training data and observe the results.
  6. Test the model with the test data.

The Sample Data Set

The Rating data in ratings.dat has the fields UserID::MovieID::Rating::Timestamp, for example:

1::1193::5::978300760

The Movie data in movies.dat has the fields MovieID::Title::Genres, for example:

1::Toy Story (1995)::Animation|Children's|Comedy

First we will explore the data using Spark DataFrames with questions like:

  • Get the maximum and minimum ratings, along with the number of users who have rated a movie.
  • Display the titles of movies with ratings > 4.

Loading Data into Spark Dataframes

Log into the MapR Sandbox, as explained in Getting Started with Spark on MapR Sandbox, using userid user01, password mapr. Copy the sample data files to your sandbox home directory /user/user01 using scp. Start the spark shell with

$ spark-shell

First we will import some packages and instantiate a sqlContext, which is the entry point for working with structured data (rows and columns) in Spark and allows the creation of DataFrame objects.

(In the code boxes, comments are in Green and output is in Blue)

<font color="#00DB00">// SQLContext entry point for working with structured data</font>
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
<font color="#00DB00">// This is used to implicitly convert an RDD to a DataFrame.</font>
import sqlContext.implicits._
<font color="#00DB00">// Import Spark SQL data types</font>
import org.apache.spark.sql._
<font color="#00DB00">// Import mllib recommendation data types</font>
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}

Below we use Scala case classes to define the Movie and User schemas, corresponding to the movies.dat and users.dat files.

<font color="#00DB00">// input format MovieID::Title::Genres</font>
case class Movie(movieId: Int, title: String, genres: Seq[String])

<font color="#00DB00">// input format is UserID::Gender::Age::Occupation::Zip-code</font>
case class User(userId: Int, gender: String, age: Int, occupation: Int, zip: String)

The functions below parse a line from the movies.dat and users.dat files into the corresponding Movie and User classes.

<font color="#00DB00">// function to parse input into Movie class</font>
def parseMovie(str: String): Movie = {
      val fields = str.split("::")
      assert(fields.size == 3)
      Movie(fields(0).toInt, fields(1))
 }
<font color="#00DB00">// function to parse input into User class</font>
def parseUser(str: String): User = {
      val fields = str.split("::")
      assert(fields.size == 5)
      User(fields(0).toInt, fields(1), fields(2).toInt, fields(3).toInt, fields(4))
 }
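
As a quick sanity check, the parse functions can be applied to single lines in the spark shell. The sample lines below follow the MovieLens 1M file format (the values are taken from that public data set):

<font color="#00DB00">// quick check of the parse functions on sample input lines</font>
parseMovie("1::Toy Story (1995)::Animation|Children's|Comedy")
<font color="blue">// Movie = Movie(1,Toy Story (1995))</font>
parseUser("1::F::1::10::48067")
<font color="blue">// User = User(1,F,1,10,48067)</font>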

Below we load the data from the ratings.dat file into a Resilient Distributed Dataset (RDD). RDDs can have transformations and actions. The first() action returns the first element in the RDD, which is the String “1::1193::5::978300760”.

<font color="#00DB00">// load the data into a  RDD</font>
val ratingText = sc.textFile("/user/user01/moviemed/ratings.dat")
<font color="blue">// MapPartitionsRDD[1] at textFile</font>

<font color="#00DB00">// Return the first element in this RDD</font>
ratingText.first()
<font color="blue">// String = 1::1193::5::978300760</font>

We use the org.apache.spark.mllib.recommendation.Rating class for parsing the ratings.dat file. Later we will use the Rating class as input for the ALS run method.

Then we use the map transformation on ratingText, which will apply the parseRating function to each element in ratingText and return a new RDD of Rating objects. We cache the ratings data, since we will use this data to build the matrix model. Then we get the counts for the number of ratings, movies and users.

<font color="#00DB00">// function to parse input UserID::MovieID::Rating</font>
<font color="#00DB00">//  Into org.apache.spark.mllib.recommendation.Rating class</font>
def parseRating(str: String): Rating= {
      val fields = str.split("::")
      Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
}

<font color="#00DB00">// create an RDD of Ratings objects</font>
val ratingsRDD = ratingText.map(parseRating).cache()
<font color="blue">//ratingsRDD: org.apache.spark.mllib.recommendation.Rating] = MapPartitionsRDD</font>

<font color="#00DB00">// count number of total ratings</font>
val numRatings = ratingsRDD.count()
<font color="blue">//numRatings: Long = 1000209</font>

<font color="#00DB00">// count number of movies rated</font>
val numMovies = ratingsRDD.map(_.product).distinct().count()
<font color="blue">//numMovies: Long = 3706</font>

<font color="#00DB00">// count number of users who rated a movie</font>
val numUsers = ratingsRDD.map(_.user).distinct().count()
<font color="blue">//numUsers: Long = 6040</font>

Explore and Query the MovieLens Data with Spark DataFrames

Spark SQL provides a programming abstraction called DataFrames. A DataFrame is a distributed collection of data organized into named columns. Spark supports automatically converting an RDD containing case classes to a DataFrame with the toDF method; the case class defines the schema of the table.

Below we load the data from the users and movies data files into an RDD, use the map transformation with the parse functions, and then call toDF(), which returns a DataFrame for the RDD. Then we register the DataFrames as temp tables so that we can use them in SQL statements.

<font color="#00DB00">// load the data into DataFrames</font>
val usersDF = sc.textFile("/user/user01/moviemed/users.dat").map(parseUser).toDF()
val moviesDF = sc.textFile("/user/user01/moviemed/movies.dat").map(parseMovie).toDF()

<font color="#00DB00">// create a DataFrame from the ratingsRDD</font>
 val ratingsDF = ratingsRDD.toDF()

<font color="#00DB00">// register the DataFrames as a temp table</font>
ratingsDF.registerTempTable("ratings")
moviesDF.registerTempTable("movies")
usersDF.registerTempTable("users")

The DataFrame printSchema() method prints the schema to the console in a tree format.


<font color="#00DB00">// Return the schema of this DataFrame</font>
usersDF.printSchema()
<font color="blue">root
 |-- userId: integer (nullable = false)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = false)
 |-- occupation: integer (nullable = false)
 |-- zip: string (nullable = true)</font>

moviesDF.printSchema()
<font color="blue">root
 |-- movieId: integer (nullable = false)
 |-- title: string (nullable = true)</font>

ratingsDF.printSchema()
<font color="blue">root
 |-- user: integer (nullable = false)
 |-- product: integer (nullable = false)
 |-- rating: double (nullable = false)</font>

Here are some example queries using Spark SQL with DataFrames on the MovieLens data. The first query gets the maximum and minimum ratings along with the count of users who have rated a movie.

<font color="#00DB00">// Get the max, min ratings along with the count of users who have rated a movie.</font>
val results =sqlContext.sql("select movies.title, movierates.maxr, movierates.minr, movierates.cntu from(SELECT ratings.product, max(ratings.rating) as maxr, min(ratings.rating) as minr,count(distinct user) as cntu FROM ratings group by ratings.product ) movierates join movies on movierates.product=movies.movieId order by movierates.cntu desc ")

<font color="#00DB00">// DataFrame show() displays the top 20 rows in  tabular form</font>
results.show()
<font color="blue">title                maxr minr cntu
American Beauty (... 5.0  1.0  3428
Star Wars: Episod... 5.0  1.0  2991
Star Wars: Episod... 5.0  1.0  2990
Star Wars: Episod... 5.0  1.0  2883
Jurassic Park (1993) 5.0  1.0  2672
Saving Private Ry... 5.0  1.0  2653</font>

The query below finds the users who rated the most movies, then finds which movies the most active user rated higher than 4. We will get recommendations for this user later.

<font color="#00DB00">// Show the top 10 most-active users and how many times they rated a movie</font>
val mostActiveUsersSchemaRDD = sqlContext.sql("SELECT ratings.user, count(*) as ct from ratings group by ratings.user order by ct desc limit 10")

println(mostActiveUsersSchemaRDD.collect().mkString("\n"))
<font color="blue">[4169,2314]
[1680,1850]
[4277,1743]</font>
. . .
<font color="#00DB00">// Find the movies that user 4169 rated higher than 4</font>
val results =sqlContext.sql("SELECT ratings.user, ratings.product, ratings.rating, movies.title FROM ratings JOIN movies ON movies.movieId=ratings.product where ratings.user=4169 and ratings.rating > 4")

results.show
user product rating title
<font color="blue">4169 1231    5.0    Right Stuff, The ...
4169 232     5.0    Eat Drink Man Wom...
4169 3632    5.0    Monsieur Verdoux ...
4169 2434    5.0    Down in the Delta...
4169 1834    5.0    Spanish Prisoner,... …</font>

Using ALS to Build a MatrixFactorizationModel with the Movie Ratings data

Now we will use the MLlib ALS algorithm to learn the latent factors that can be used to predict missing entries in the user-item association matrix. First we separate the ratings data into training data (80%) and test data (20%). We will get recommendations for the training data, then evaluate the predictions with the test data. This process of taking a subset of the data to build the model and then verifying the model with the remaining data is known as cross-validation; the goal is to estimate how accurately a predictive model will perform in practice. To improve the model, this process is often repeated with different subsets; we will only do it once.

We run ALS on the input trainingRDD of Rating (user, product, rating) objects with the rank and iterations parameters:

  • rank is the number of latent factors in the model.
  • iterations is the number of iterations to run.

The ALS run(trainingRDD) method will build and return a MatrixFactorizationModel, which can be used to make product predictions for users.

<font color="#00DB00">// Randomly split ratings RDD into training data RDD (80%) and test data RDD (20%)</font>
val splits = ratingsRDD.randomSplit(Array(0.8, 0.2), 0L)

 val trainingRatingsRDD = splits(0).cache()
 val testRatingsRDD = splits(1).cache()

 val numTraining = trainingRatingsRDD.count()
 val numTest = testRatingsRDD.count()
 println(s"Training: $numTraining, test: $numTest.")
<font color="blue">//Training: 800702, test: 199507.</font>

<font color="#00DB00">// build a ALS user product matrix model with rank=20, iterations=10</font>
val model = (new ALS().setRank(20).setIterations(10).run(trainingRatingsRDD))
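
The ALS builder also exposes other parameters worth experimenting with, such as the regularization weight. A minimal sketch (the lambda value below is illustrative, not tuned for this data set):

<font color="#00DB00">// optionally build a model with an explicit regularization parameter (illustrative value)</font>
val tunedModel = new ALS().setRank(20).setIterations(10).setLambda(0.01).run(trainingRatingsRDD)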

Making Predictions with the MatrixFactorizationModel

Now we can use the MatrixFactorizationModel to make predictions. First we will get movie predictions for the most active user, 4169, with the recommendProducts() method, which takes as input the userid and the number of products to recommend. Then we print out the recommended movie titles.

<font color="#00DB00">// Get the top 4 movie predictions for user 4169</font>
val topRecsForUser = model.recommendProducts(4169, 5)
<font color="#00DB00">// get movie titles to show with recommendations</font>
val movieTitles=moviesDF.map(array => (array(0), array(1))).collectAsMap()
<font color="#00DB00">// print out top recommendations for user 4169 with titles</font>
topRecsForUser.map(rating => (movieTitles(rating.product), rating.rating)).foreach(println)
<font color="blue">(Other Side of Sunday) (1996),5.481923568209796)
(Shall We Dance? (1937),5.435728723311838)
(42 Up (1998),5.3596886655841995)
(American Dream (1990),5.291663089739282)</font>
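
If your Spark version supports model export (Spark 1.3 and later), the trained MatrixFactorizationModel can be persisted and reloaded later instead of retraining; the path below is illustrative:

<font color="#00DB00">// persist the trained model (path is illustrative)</font>
model.save(sc, "/user/user01/moviemed/alsModel")
<font color="#00DB00">// reload it later without retraining</font>
val sameModel = MatrixFactorizationModel.load(sc, "/user/user01/moviemed/alsModel")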

Evaluating the Model

Next we will compare predictions from the model with actual ratings in the testRatingsRDD. First we get the user product pairs from the testRatingsRDD to pass to the MatrixFactorizationModel predict method, which takes an RDD of (user, product) pairs and returns predictions as Rating (user, product, rating) objects.


<font color="#00DB00">// get user product pair from testRatings</font>
val testUserProductRDD = testRatingsRDD.map {
  case Rating(user, product, rating) => (user, product)
}
<font color="#00DB00">// get predicted ratings to compare to test ratings</font>
val predictionsForTestRDD  = model.predict(testUserProductRDD)

predictionsForTestRDD.take(10).mkString("\n")
<font color="blue">Rating(5874,1084,4.096802264684769)
Rating(6002,1084,4.884270180173981)</font>

Now we will compare the test predictions to the actual test ratings. First we put the predictions and the test RDDs in this key, value pair format for joining: ((user, product), rating). Then we print out the (user, product), (test rating, predicted rating) for comparison.

<font color="#00DB00">// prepare  predictions for comparison</font>
val predictionsKeyedByUserProductRDD = predictionsForTestRDD.map{
  case Rating(user, product, rating) => ((user, product), rating)
}
<font color="#00DB00">// prepare  test for comparison</font>
val testKeyedByUserProductRDD = testRatingsRDD.map{
  case Rating(user, product, rating) => ((user, product), rating)
}

<font color="#00DB00">//Join the  test with  predictions</font>
val testAndPredictionsJoinedRDD = testKeyedByUserProductRDD.join(predictionsKeyedByUserProductRDD)

<font color="#00DB00">// print the (user, product),( test rating, predicted rating)</font>
testAndPredictionsJoinedRDD.take(3).mkString("\n")
<font color="blue">((455,1254),(4.0,4.48399986469759))
((2119,1101),(4.0,3.83955683816239))
((1308,1042),(4.0,2.616444598335322))</font>

The example below finds false positives by finding predicted ratings which were >= 4 when the actual test rating was <= 1. There were 557 false positives out of 199,507 test ratings.

val falsePositives = testAndPredictionsJoinedRDD.filter {
  case ((user, product), (ratingT, ratingP)) => ratingT <= 1 && ratingP >= 4
}
falsePositives.take(2)
<font color="blue">Array[((Int, Int), (Double, Double))] =
((3842,2858),(1.0,4.106488210964762)),
((6031,3194),(1.0,4.790778049100913))</font>

falsePositives.count
<font color="blue">res23: Long = 557</font>

Next we evaluate the model using Mean Absolute Error (MAE). MAE is the mean of the absolute differences between the predicted and actual ratings.

<font color="#00DB00">//Evaluate the model using Mean Absolute Error (MAE) between test and predictions</font>
val meanAbsoluteError = testAndPredictionsJoinedRDD.map {
  case ((user, product), (testRating, predRating)) =>
    val err = (testRating - predRating)
    Math.abs(err)
}.mean()
<font color="blue">meanAbsoluteError: Double = 0.7244940545944053</font>

Summary

This concludes the tutorial on Parallel and Iterative processing for Machine Learning Recommendations with Spark. If you have any further questions, please ask them in the comments section below.
