Transformers: RDDs in Disguise

An Introduction to PySpark

Tiffany Jaya
Gab41

--

This blog post is intended as a practical introduction for anyone who has data and needs to reshape that data into another format for analysis. To do this, I will use Spark and its Resilient Distributed Dataset (RDD). Data that are encapsulated in RDD can be transformed from one form to another. I will focus on how RDD transformations can help you group, filter, and sort your data. Although we will take examples from Hermes (Lab 41’s exploration of recommender systems) to learn how these transformations are applied, you do not need to have an extensive knowledge of the project to follow along. Feel free to check out Hermes’s GitHub and its accompanying blog posts, starting with Anna’s post to get an overview of the project and of recommenders in general. You might also want to check out Kyle’s blog post on how to use Docker to build an IPython-driven Spark deployment, if you’re interested in setting up a Spark environment but don’t know how.

What is RDD?

RDDs are Spark’s main abstraction component. Think of it as a handler to your partitioned data after some computation. It addresses the data reuse problem found in existing distributed computing systems, i.e. Hadoop’s MapReduce. Instead of storing intermediate data after a computation into on-disk storage such as HDFS, these data can be run under multiple computations without additional creation of data unless a “reduce” (in Hadoop’s MapReduce term) or a “shuffle” (in Spark term) operation is performed. The ability to not create intermediate data during computation is one of the reasons why Spark is able to execute faster than Hadoop’s MapReduce.

Of course, there are other reasons why Spark is faster than Hadoop’s MapReduce. I will not cover them in this blog post — as that can be another blog post in itself — but this is the reason why I decided to use Spark in this tutorial.

Counting the number of distinct items

Let’s take data that has the following format: [(user_id, movie_id, rating)],where user_id refers to a particular user, movie_id refers to a particular movie, and rating is the rating that the user assigns to the movie. We want to determine what is the total number of users in this dataset. Since one user can rate multiple movies at any given time, we cannot just count the number of records to find the number of users. Instead, we use the distinct function to determine the correct number of users. Here’s how you do it in Python (which I will use throughout this article):

There are only 4 users: 5, 9, 25, and 78.

Grouping

If we are working with the same dataset as above and want to group all the ratings a movie has received — that is, transform data in the original format [(user_id, movie_id, rating)] into pairs of [(movie_id, [(user_id, rating)])] — we can use groupByKey and set movie_id as the key (or label) and user_id and rating as the values (or features). But Databricks, creator of Spark, suggests not to use groupByKey in general because it is computationally expensive. One way to overcome this is to use the transformation function reduceByKey.

Here’s why: the dataset that you provide to Spark is typically partitioned, so that when you assign a task, the task can be run in parallel across the dataset. Tasking reduceByKey on your dataset means that Spark performs the reduce operation at each partition before shuffling. Shuffling is Spark’s way of redistributing the data so that it is grouped differently across partitions. The shuffle step involves data serialization and I/O calls, which are expensive. By contrast, when you task groupByKey on your dataset, Spark shuffles each element in your dataset before executing the group operation. Although reduceByKey will not solve every grouping problem that you encounter, it is usually better to use reduceByKey, foldByKey, or combinebyKey than groupByKey. For this example, we will use both groupByKey and reduceByKey and compare their runtimes on our cluster.

groupByKey returns a ResultIterable as its value. To convert this to a Python list, we have to map the values to a list:

The movie with id 25 has 5 ratings. User 320 rates it a 3. User 333 rates it a 4 and etc.

This groupByKey operation ran on 1M elements on our cluster and returned in 10.76 seconds.

Now let’s see if reduceByKey really is faster. We have to wrap each value — that is, each (user_id, rating) tuple — in a list because lists are mutable. We then have to explicitly specify an append operation to concatenate each movie’s user-rating pairs.

The time it takes to execute this reduceByKey operation with 1M elements on our cluster is 5.50054502487 seconds.

As you can see, it is much faster to compute using reduceByKey than groupByKey as our example of reduceByKey takes approximately 5 seconds while groupByKey takes approximately 10 seconds.

The main limitation of reduceByKey is that the input and output value have to be the same type. If the input and output value are of different types, try combineByKey. We will first compare combineByKey with groupByKey and reduceByKey using the above example case. We will then go into detail on how combineByKey works when the output type is different from the input type.

The time it takes to execute this combineByKey operation with 1M elements is 9.91050696373 seconds.

Let’s go through each argument to combineByKey:

  1. The first argument, createCombiner, is lambda first_user_rating_pair: first_user_rating_pair. This callable handles the very first value encountered by combineByKey. In this case, since the first value is in the format of [(user_id, rating)], it is in the format we already desired. We return it unchanged. Remember, as with any lambda callable, you can substitute this with a defined function.
  2. The second argument, mergeValue, is specified as lambda combined_values, new_user_rating_pair: combined_values + new_user_rating_pair. The mergeValue callable tells combineByKey what to do when it encounters a new value other than the first value. In this case, we want to append the new value to the growing list of ratings tuples. If we encountered a second value in the format of [(user_id_2, rating_2)], we will append this to the first value we have already handled, so it will output [(user_id_1, rating_1), (user_id_2, rating_2)].
  3. The third argument, mergeCombiners, is lambda combined_values_1, combined_values_2: combined_values_1 + combined_values_2. This handles what combineByKey will do when it needs to merge two combiners, for instance after running two combiners on different partitions of the data and then aggregating their results. In this case, the function just appends the two lists together, similar to the mergeValue argument.

combineByKey is essentially what is implemented underneath reduceByKey, but does not constrain the input and output to be of the same type. Let’s take combineByKey and transform the input type into a differing output type which reduceByKey cannot do.

Let’s say we want to compute the average rating of each user by transforming [(user_id, movie_id, rating)] into [(user_id, mean_rating)]. Since calculating the average rating of a user requires only user_id and rating, we can transform the RDD from [(user_id, movie_id, rating)] into [(user_id, rating)] first.

Below we call combineByKey and convert the value rating from type float to type tuple. This tuple consists of the sum of all ratings a user has rated and the number of ratings a user has rated. These two values will be used to calculate the mean rating of a user because mean = sum/num.

User with id 1536 gives an average rating of 4.2. User with id 2304 gives an average rating of 3.7 and etc.

It is much faster to use combineByKey compared to groupByKey as this example takes less than 10 seconds to run on 1 million records. Unlike reduceByKey, combineByKey can have a different output type than its input. In this case, the input rating is of type float and the output (sum_rating, num_rating) is of type tuple.

Filtering

If you are keeping up to this point, great job! Let’s take a step further by extracting all the movies users enjoy. We will do so by retrieving all the movies which a user gave above his/her personal rating average.

All of the examples taken in this blog post are computations we have used in Hermes using the MovieLens dataset. In a recommendation systems, ratings should be tailored to the individual’s habit and preferences, and should probably take into account what their personal average rating is. If we wanted to get a list of movies that each user likes, one approach would be to pull out all the movies that users gave above-average rating to, adjusting that average to their personal preference.

Since we already have mean ratings for each user, let’s filter out all ratings that are less than the user’s mean rating so that we are left with a list of movies with positive weight for each user.

user_movieRating is in the format of [(user_id, (movie_id, rating))]. user_meanRating is in the format of [(user_id, user_mean_rating)]. When you join these two RDDs together, you will receive an output in the format of [(user_id, ((movie_id, rating), user_mean_rating))] because the transformation function join appends the two values together in a tuple when they have matching key; in this case, the matching key is user_id.

Sorting

Normally RDDs are unordered unless you explicitly apply transformations like sortBy, sortByKey, or takeOrdered:

  • sortBy: sort according to the function that is passed in
  • sortByKey: sort by a key
  • takeOrdered: sort according to the function that is passed in and take the first N elements

As an example, let’s say we want to get the 10 users with the highest mean rating. We also want to make sure that the user_id is sorted. To execute this computation, you can do the following:

What is this code doing?

  1. First, it sorts in descending order by the user’s mean_rating. To sort in descending order, you have to take the negative of the mean_rating.
  2. Second, it sorts by user_id in ascending order. If there are two users with the same mean rating, the user with the lower value user_id will be sorted ahead of the user with the higher value user_id. (We don’t actually see any such ties in the example above).
  3. Last, it gives a Python list of the first 10 elements in the RDD.

Grouping and Sorting

Yes, we are nearing completion! By this point, you have a general understanding of how to use transformation functions with RDDs. It is time to take everything that you have learned and roll it into one. As Optimus Prime from Transformers said, “Autobots, roll out!”

In this example, which is an actual use case that we encounter in Hermes, we want to extract the top 20 users who have the most similarity with a given user. The input RDD is in the format [((user_id_a, user_id_b), (pearson_correlation, num_shared_ratings))]. pearson_correlation is the measure of similarity (linear correlation) between two users. num_shared_ratings is the number of ratings user_id_a and user_id_b shared.

Ritesh Agrawal provided a clean solution to this problem in his blog post “PySpark: Top N Records in Each Group”. If you have not checked his blog out, please do so. He writes new ways on how to approach a problem in Spark.

The first thing that we want to do is to specify the key. Assume that we have a RDD named input with the following format: [((user_id_a, user_id_b), (pearson_correlation, num_shared_ratings))]. Since we wanted to know the top 20 users who have the most similarity with a given user, we can select the given user as user_id_a and user_id_b as other users who have similarity (or little similarity) with user_id_a. Because of this, we can specify user_id_a as the key.

Next, we want to group the top 20 user_id_b for each user_id_a and have the Pearson correlation between the two users sorted in descending order. We can use combineByKey function to do this by appending all user_id_b for each user_id_a in a list and sorting this list in the merge_value and merge_combiner functions to extract the top 20. We can employ heapq library’s nsmallest and nlargest functions to extract the smallest or the largest values the list contains respectively.

If you think about it, this is the same thing as takeOrdered but by key. Agrawal who proposed this solution aptly named it takeOrderedByKey.

You can then add this function to the RDD class so that you can call takeOrderedByKey like you would with any transformation function like takeOrdered.

We can call takeOrderedByKey to determine the top 20 users who have the most similarity with a given user by Pearson Correlation, sorted with the highest value first and the lowest value last.

If desired, we can then remove the key, user_id_a, since it is only needed during the computation.

RDD Transformation is not RDD Action

We must remember that RDD transformation is not the same as RDD action. Just like how RDD is created in two ways — either by referencing a dataset stored in external stable storage or by transforming an existing RDD — computing a RDD is also done in two ways: action or transformation. Action produces a value back to the driver program (the program that runs on the main node where you can see the output). Transformation, on the other hand, produces a new RDD. Its computation will not be executed until you call an action on that RDD. Remember that both RDD transformations and actions can only be called within the driver program and not within other RDD transformations and actions.

Remember the distinct example we looked at earlier? The distinct function is a transformation function, and the count function is an action function. Transformation functions do not run until an action function is called. In other words, Spark is a lazy in a sense that transformation function like distinct creates the transformed RDD and defers the actual computation until count is called. “This design enables Spark to run more efficiently — for example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.” It is another reasons why Spark can run faster than Hadoop’s MapReduce.

Conclusion

I hope you now have a basic understanding of RDD transformation functions. Please feel free to check out other transformation functions not covered in this blog post, such as aggregate, cogroup, foldByKey, and foreach. You can find a list of these transformation functions along with examples at the Spark’s docs.

Also, if you are interested in getting hands-on with the recommender system project we are carrying out, please feel free to either check out our GitHub page or read my colleagues’ blog posts:

Originally published at www.lab41.org on March 13, 2016.

--

--

Writer for

Passionate in communicating big data insights effectively to non-tech users. Data Vis Aficionado. UX Design Practitioner.