Parallel Processing Using the MapReduce Programming Model

Let’s say we want to find the highest-rated movie of each year. As input to this program, we can download movie data as a text file from IMDB’s alternative interface. We need to work on it a bit before we use it. For now, let us focus on English movies alone, so we shall remove all foreign films, get rid of the unwanted columns and do a few other formatting tasks. Finally, we prepare a tab-delimited file of the format [<Total Votes> <Rating> <Movie Name> <Year>]. (If you want to use the worked file for your own mashup, you can get it from here.)

We can then write a Perl script (given below) that reads the above file and gives us the highest-rated movie of each year as the output.

The script maintains a hash each for the movie, rating and votes, with the year as the key. We iterate line by line to find the highest-rated movie of each particular year, considering only movies whose total votes exceed 30,000. This removes movies whose ratings are ‘fixed’. In case of a tie in the rating, we break it using the total votes, which favours the movies that are more popular and well known. The output is of the format [<Year> <Movie Name> <Rating>].
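A minimal sketch of such a script, assuming the tab-delimited field order described above (the actual script may differ in its details):

        #!/usr/bin/perl
        use strict;
        use warnings;

        # best movie per year: name, rating and votes, each keyed by year
        my (%movie, %rating, %votes);

        while (my $line = <>) {
            chomp $line;
            my ($v, $r, $name, $year) = split /\t/, $line;
            next unless defined $year;
            next if $v <= 30_000;    # ignore movies with 30,000 votes or fewer
            if (   !exists $rating{$year}
                || $r > $rating{$year}
                || ($r == $rating{$year} && $v > $votes{$year}))   # tie: more votes wins
            {
                ($movie{$year}, $rating{$year}, $votes{$year}) = ($name, $r, $v);
            }
        }

        # print [<Year> <Movie Name> <Rating>], one year per line
        print "$_\t$movie{$_}\t$rating{$_}\n" for sort keys %movie;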

We can then run the program by issuing the following command (the command is the same under Windows):

         raj@ubuntu:~$perl ratings.pl ratings_worked.list

This should print the results to the terminal. If we need the output in a file, we can redirect it using the ‘>’ operator. Sample output for the last five years is given below.

         2007    No Country for Old Men    8.2
         2008    The Dark Knight           8.9
         2009    Inglourious Basterds      8.4
         2010    Inception                 8.9
         2011    X-Men: First Class        8

Next, let us assume we have a much larger dataset that consists of movies in all languages and their details. Running the same program on this new file, we find that it takes a lot of time, and we want it to run faster. There are two ways to go about this. One, we hack the hell out of Perl. Although this sounds promising, it might not be the best path. Or, we try to run parts of the program in parallel, thereby speeding up the execution. But going this route, we might open up a Pandora’s box of issues that accompany parallel processing.

How many processes are we going to run? How do we co-ordinate them? How do we combine the output of all the processes? What if a process fails? How do we control access to a shared resource? What if the dataset outgrows our single machine? These are some of the questions that need answering before we start writing parallel programs. Although we can get the program to run faster, it often gets messy in real life.

What if there were a programming model that abstracted away these details and let us concentrate on our business logic alone? A model that automatically takes care of all the finer details of parallelism, freeing us from thinking about them. It turns out there is such a model.

Enter: MapReduce.

MapReduce is a programming model for processing and generating large data sets. Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key. Although this might sound constraining, lots of real-world problems can be expressed this way.

Programs written in this functional style are automatically parallelized and executed on a large cluster of commodity machines (We will be running our program on a single node). The run-time system takes care of the details of partitioning the input data, scheduling the program’s execution across a set of machines, handling machine failures, and managing the required inter-machine communication. This allows programmers without any experience with parallel and distributed systems to easily utilize the resources of a large distributed system.

MapReduce consists of two phases – the map phase and the reduce phase. Each phase takes <key, value> pairs as input and gives <key, value> pairs as output. As a programmer, you implement the map and the reduce interfaces in your program. You then create a new job for your program and pass the job on to the framework, which runs it and returns the results.
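In the notation of the original MapReduce paper, the two phases have the following shapes, where the intermediate key/value types produced by map are the ones consumed by reduce:

         map:     (k1, v1)       → list(k2, v2)
         reduce:  (k2, list(v2)) → list(k3, v3)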

The MapReduce programming model is very useful since it can run on top of the Hadoop framework (I will not be going into Hadoop’s details in this post). So you can store huge amounts of data on Hadoop’s distributed file system and then run MapReduce programs to process them. And Hadoop is designed to run on a variety of machines – from your commodity cluster to Amazon EC2 instances. Hadoop and MapReduce have combined to solve problems in various domains like CRM, machine learning, social analysis etc. For more problems solved using Hadoop/MapReduce, check here. (Although Hadoop/MapReduce is powerful, as they say in investing, please read the fine print as to what it can and cannot do.)

If you want to install & configure Hadoop on a single machine, check out this post. If you want a VM with Hadoop installed and configured, Cloudera’s VM should be alright. Both use Ubuntu as their OS, but if you want to install Hadoop on Windows, read the instructions on this page.

Now, coming back to the problem, we need to write our map and reduce interfaces. The input to our map interface is the file that contains the movie details, which we read line by line as text. The key is the line number and the value is the line of text (strictly speaking, Hadoop’s default input format supplies the byte offset of the line as the key, but line numbers are easier to follow, and we never use the key in our analysis anyway). The part of the code that checks whether a movie has registered more than 30,000 votes can be put into the map function. This greatly reduces the data going into the reduce function.

To see how the mapper and reducer interfaces work, consider the following lines (format: [<Votes> <Rating> <Movie Name> <Year>]) as input to the MapReduce program:

         193718    8      Casino Royale     2006
         313960    8.3    Batman Begins     2005
          90848    7.7    Munich            2005
         270071    8.2    V for Vendetta    2006
         168557    8      Shutter Island    2010
         313004    8.5    The Departed      2006

So the input to the map interface would be:

         1    193718    8      Casino Royale     2006
         2    313960    8.3    Batman Begins     2005
         3     90848    7.7    Munich            2005
         4    270071    8.2    V for Vendetta    2006
         5    168557    8      Shutter Island    2010
         6    313004    8.5    The Departed      2006

The keys to the map interface are the line numbers, and the values are the lines of text. The output of the map function is of the form [<Year>, <Votes> <Rating> <Movie Name>] (all the movies are included, since each has more than 30,000 votes):

         [2006,    193718    8      Casino Royale]
         [2005,    313960    8.3    Batman Begins]
         [2005,     90848    7.7    Munich]
         [2006,    270071    8.2    V for Vendetta]
         [2010,    168557    8      Shutter Island]
         [2006,    313004    8.5    The Departed]

The output of the map interface is processed midway by the framework, which sorts the <key, value> pairs and groups them by key. The sorted and grouped pairs are then sent to the reduce interface. The input that is passed to the reduce interface is:

         [2005,    ([313960  8.3  Batman Begins], [90848  7.7  Munich])]
         [2006,    ([193718  8  Casino Royale], [270071  8.2  V for Vendetta], [313004  8.5  The Departed])]
         [2010,    ([168557  8  Shutter Island])]

When the reduce interface gets the above input, it just needs to iterate over the movies of each year and pick the highest-rated one. The final output of the MapReduce program is the output of the reduce interface. For the above example it would be:

         2005    Batman Begins     8.3
         2006    The Departed      8.5
         2010    Shutter Island    8

The Java code (other languages are not covered in this post) that implements the mapper and reducer interfaces of MapReduce is given below.

The main class (imdbrating) holds the implementations of Hadoop’s mapper and reducer interfaces. Inside the main class, you have the static class imdbmapper, which extends the Mapper class of Hadoop. The input types of the mapper (<LongWritable, Text>) and its output types (<Text, Text>) are passed as type parameters to the Mapper class. Types like LongWritable and Text are Hadoop-specific types that map to Java types: LongWritable maps to a Java long and Text maps to a String. Hadoop’s types are optimized for network serialization; details about the various Hadoop types are given here. imdbmapper needs to implement the map interface of the Mapper class. The parameters to the interface are the key and the value, along with a Context object. The Context object collects the output of the interface and passes it on to the next stage in the framework. Here, we do not do anything in the map interface except filter out the movies that have 30,000 votes or fewer. The output finally produced by the map interface is a <Text, Text> pair.
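A sketch of the mapper along these lines, as a minimal reconstruction rather than the exact code, assuming the input field order [<Votes> <Rating> <Movie Name> <Year>]:

        // at the top of imdbrating.java:
        import java.io.IOException;
        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Mapper;

        // nested inside the imdbrating class:
        public static class imdbmapper extends Mapper<LongWritable, Text, Text, Text> {
            @Override
            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // input line format: <Votes>\t<Rating>\t<Movie Name>\t<Year>
                String[] fields = value.toString().split("\t");
                if (fields.length < 4) return;             // skip malformed lines
                long votes = Long.parseLong(fields[0].trim());
                if (votes > 30000) {                       // drop sparsely voted movies early
                    // key: the year; value: votes, rating and name, still tab-separated
                    context.write(new Text(fields[3].trim()),
                            new Text(fields[0].trim() + "\t" + fields[1].trim() + "\t" + fields[2].trim()));
                }
            }
        }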

We follow the same process for implementing the reducer. We create a static class imdbreducer, which extends the Reducer class of Hadoop. The input types from the previous map interface (<Text, Text>) and the output types of the reducer (<Text, Text>) are passed as type parameters to the Reducer class. imdbreducer needs to implement the reduce interface of the Reducer class. The parameters to the interface are again a key and the values, along with the Context object. Here the Context object collects the output of the interface and stores it in a file as the final output. The whole business logic is contained in the reduce interface: we iterate through the input received at this stage, break it up by tabs and compare each movie against the current maximum for that year. In case of a tie, it is again broken by comparing the total votes.
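And a matching sketch of the reducer, again a minimal reconstruction; the imports mirror the mapper’s, with org.apache.hadoop.mapreduce.Reducer in place of Mapper:

        public static class imdbreducer extends Reducer<Text, Text, Text, Text> {
            @Override
            public void reduce(Text year, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                double bestRating = -1.0;
                long bestVotes = -1;
                String best = null;                        // "<Movie Name>\t<Rating>"
                for (Text value : values) {
                    // value format: <Votes>\t<Rating>\t<Movie Name>
                    String[] fields = value.toString().split("\t");
                    long votes = Long.parseLong(fields[0]);
                    double rating = Double.parseDouble(fields[1]);
                    // a higher rating wins; a tie is broken in favour of more votes
                    if (rating > bestRating || (rating == bestRating && votes > bestVotes)) {
                        bestRating = rating;
                        bestVotes = votes;
                        best = fields[2] + "\t" + fields[1];
                    }
                }
                if (best != null)
                    context.write(year, new Text(best));   // <Year>\t<Movie Name>\t<Rating>
            }
        }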

In the main function, we first check that the right number of arguments has been passed to the program. Then we create a new MapReduce job for this task and set the entry point of the program using the setJarByClass() method. We must next set up the paths to the input and the output. Since paths in the local file system and in Hadoop’s distributed file system are different, we need to construct a Path specific to Hadoop. One thing to note is that the input path should point to a file and the output path to a directory, and that directory must not already exist. Otherwise Hadoop will spit out an error, when running the job, saying the directory is already present. We set the mapper and reducer classes using setMapperClass() and setReducerClass() respectively. We then specify the output key class and output value class. Since these are the same for both the mapper and the reducer, we can set them using setOutputKeyClass() and setOutputValueClass(). If they were not the same, the map output types could be set using setMapOutputKeyClass() and setMapOutputValueClass(). Finally, we wait for the MapReduce job to complete and then exit.
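A sketch of such a main function, using the new-API Job class (FileInputFormat and FileOutputFormat here are the ones from org.apache.hadoop.mapreduce.lib.input and lib.output, and Path comes from org.apache.hadoop.fs):

        public static void main(String[] args) throws Exception {
            if (args.length != 2) {
                System.err.println("Usage: imdbrating <input file> <output dir>");
                System.exit(-1);
            }
            Job job = new Job();
            job.setJarByClass(imdbrating.class);                     // entry point of the job
            FileInputFormat.addInputPath(job, new Path(args[0]));    // input file
            FileOutputFormat.setOutputPath(job, new Path(args[1]));  // must not exist yet
            job.setMapperClass(imdbmapper.class);
            job.setReducerClass(imdbreducer.class);
            job.setOutputKeyClass(Text.class);     // same types for map and reduce output
            job.setOutputValueClass(Text.class);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }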

Before running the MapReduce job on Hadoop, we need to prepare the environment for execution. First, we need to make a jar out of our class files. This can be done by issuing the following command (assuming the class files are in the current directory):

        raj@ubuntu:~$jar cvf imdbrating.jar *.class
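(If the class files have not been built yet, the sources must first be compiled against the Hadoop core jar. The jar name depends on your Hadoop version, so the path below is only indicative:)

        raj@ubuntu:~$javac -classpath /usr/local/hadoop/hadoop-core-*.jar imdbrating.java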

We now need to start Hadoop’s services. We shall start all of them for now, by issuing the following command:

        raj@ubuntu:/usr/local/hadoop$ bin/start-all.sh

You can check if all the services are running by issuing the command:

        raj@ubuntu:/usr/local/hadoop$ jps
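On a healthy single-node setup, jps should list the five Hadoop daemons along with itself; the process ids below are only placeholders:

        2287 NameNode
        2402 DataNode
        2529 SecondaryNameNode
        2611 JobTracker
        2727 TaskTracker
        2841 Jps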

If some of the services are not running, check the log files for errors. They should be present under $HADOOP_DIR/logs, where $HADOOP_DIR is the base directory of the Hadoop installation. Next, we need to copy the input dataset (the text file) onto Hadoop’s file system. This can be done by issuing the following command:

        raj@ubuntu:~$hadoop dfs -copyFromLocal ratings_worked.list /user/HADOOP_USER_NAME/

Above, /user/HADOOP_USER_NAME is the home directory of that user on Hadoop’s file system. Hadoop supports some basic UNIX-style commands along with its own set; -copyFromLocal copies a local file to the given Hadoop file system.
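You can verify that the file arrived by listing the directory with the -ls command:

        raj@ubuntu:~$hadoop dfs -ls /user/HADOOP_USER_NAME/

After finishing the above tasks, we can run the MapReduce job on Hadoop. This can be done by: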

        raj@ubuntu:~/Projects/Java/Hadoop$ hadoop jar imdbrating.jar imdbrating /user/HADOOP_USER_NAME/ratings_worked.list /user/HADOOP_USER_NAME/imdbrating-output

imdbrating.jar is the jar that we created above, and imdbrating is the entry point into the job. The input is the dataset, ratings_worked.list, given with the right path. The output is a directory that does not already exist on Hadoop. You should get a sample output similar to the one shown below:

         11/08/05 14:23:04 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
         11/08/05 14:23:04 INFO input.FileInputFormat: Total input paths to process : 1
         11/08/05 14:23:04 INFO mapred.JobClient: Running job: job_201108051418_0001
         11/08/05 14:23:05 INFO mapred.JobClient:  map 0% reduce 0%
         11/08/05 14:23:20 INFO mapred.JobClient:  map 100% reduce 0%
         11/08/05 14:23:35 INFO mapred.JobClient:  map 100% reduce 100%
         11/08/05 14:23:40 INFO mapred.JobClient: Job complete: job_201108051418_0001
         11/08/05 14:23:40 INFO mapred.JobClient: Counters: 25
         11/08/05 14:23:40 INFO mapred.JobClient:   Job Counters
         11/08/05 14:23:40 INFO mapred.JobClient:     Launched reduce tasks=1
         11/08/05 14:23:40 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=11406
         11/08/05 14:23:40 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
         11/08/05 14:23:40 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
         11/08/05 14:23:40 INFO mapred.JobClient:     Launched map tasks=1
         11/08/05 14:23:40 INFO mapred.JobClient:     Data-local map tasks=1
         11/08/05 14:23:40 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=12695
         11/08/05 14:23:40 INFO mapred.JobClient:   File Output Format Counters
         11/08/05 14:23:40 INFO mapred.JobClient:     Bytes Written=2270
         11/08/05 14:23:40 INFO mapred.JobClient:   FileSystemCounters
         11/08/05 14:23:40 INFO mapred.JobClient:     FILE_BYTES_READ=48090
         11/08/05 14:23:40 INFO mapred.JobClient:     HDFS_BYTES_READ=7791294
         11/08/05 14:23:40 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=137855
         11/08/05 14:23:40 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=2270
         11/08/05 14:23:40 INFO mapred.JobClient:     Bytes Read=7791175
         11/08/05 14:23:40 INFO mapred.JobClient:   Map-Reduce Framework
         11/08/05 14:23:40 INFO mapred.JobClient:     Reduce input groups=76
         11/08/05 14:23:40 INFO mapred.JobClient:     Map output materialized bytes=48090
         11/08/05 14:23:40 INFO mapred.JobClient:     Combine output records=0
         11/08/05 14:23:40 INFO mapred.JobClient:     Map input records=233997
         11/08/05 14:23:40 INFO mapred.JobClient:     Reduce shuffle bytes=48090
         11/08/05 14:23:40 INFO mapred.JobClient:     Reduce output records=76
         11/08/05 14:23:40 INFO mapred.JobClient:     Spilled Records=2806
         11/08/05 14:23:40 INFO mapred.JobClient:     Map output bytes=45278
         11/08/05 14:23:40 INFO mapred.JobClient:     Combine input records=0
         11/08/05 14:23:40 INFO mapred.JobClient:     Map output records=1403
         11/08/05 14:23:40 INFO mapred.JobClient:     SPLIT_RAW_BYTES=119
         11/08/05 14:23:40 INFO mapred.JobClient:     Reduce input records=140

The above output means the MapReduce job has run successfully without any problems. If the job is not successful, you can check the terminal and the logs for the exact nature of the error. Or you can check the web interface of the MapReduce job tracker at http://localhost:50030/ (localhost and port 50030 are the defaults) for errors and their descriptions. The output of the MapReduce job should be present in /user/HADOOP_USER_NAME/imdbrating-output/part-r-00000. Issuing the following command should give us the same last five entries as the Perl output above:

         raj@ubuntu:~$hadoop dfs -cat /user/HADOOP_USER_NAME/imdbrating-output/part-r-00000 | tail

The MapReduce job returns the output in the same format as the above Perl script ([<Year> <Movie Name> <Rating>]). You can transfer the output file to your local file system by running the following command:

         raj@ubuntu:~$hadoop dfs -copyToLocal /user/HADOOP_USER_NAME/imdbrating-output/part-r-00000 Output.list

That is it. If you have any issues running the program, leave a comment or email/tweet me. And sorry for the long post.

If you read this far, you should follow me on twitter.

Also feel free to follow my projects on github. You can get the above code from the same place.