Hadoop Quick Notes :: Part - 4
Bhaskar S | 12/30/2013
Introduction
In Part-3, we explored the Hadoop MapReduce framework by executing the wordcount example (that is bundled with the Hadoop 1.x distribution in the hadoop-examples-1.2.1.jar file).
Also, we took a peek at the source code of wordcount to get an understanding of how it works.
Now we will continue from where we left off and implement our own Hadoop MapReduce program.
Hands-on with Hadoop MapReduce 1.x - Part 2
Let us implement our own version of the word count program called EnhancedWordCount. Our implementation will be a cleaner version (we will strip out punctuation characters and convert all words to lowercase) and will count only words that are 5 letters or more.
The first step is to implement our custom org.apache.hadoop.mapreduce.Mapper.
The following is the Java class called com.polarsparc.hadoop.hmr.EnhancedWordCountMapper that implements our Mapper:
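A minimal sketch of what such a Mapper (Listing-1) could look like follows; the exact set of punctuation delimiters used for tokenizing is an assumption:

package com.polarsparc.hadoop.hmr;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EnhancedWordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
        // Tokenize on whitespace and common punctuation characters (assumed delimiter set)
        StringTokenizer tokenizer = new StringTokenizer(value.toString(), " \t\n\r\f.,:;?!'\"()[]{}-");
        while (tokenizer.hasMoreTokens()) {
            String token = tokenizer.nextToken();
            // Count only words that are at least 5 letters long, in lowercase
            if (token.length() >= 5) {
                word.set(token.toLowerCase());
                context.write(word, ONE);
            }
        }
    }
}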
As can be inferred from Listing-1 above, we tokenize using the common punctuation characters, check that the word is at least 5 letters long, convert the identified word to lowercase, and write it out with a count of 1.
The second step is to implement our custom org.apache.hadoop.mapreduce.Reducer.
The following is the Java class called com.polarsparc.hadoop.hmr.EnhancedWordCountReducer that implements our Reducer:
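A minimal sketch of what such a Reducer (Listing-2) could look like follows:

package com.polarsparc.hadoop.hmr;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class EnhancedWordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
        // Sum up all the 1's for this word from the input value list
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}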
As can be inferred from Listing-2 above, we sum up all the 1's from the input value list and output the word and its corresponding count.
The final step is to implement the main Hadoop MapReduce job.
The following is the main Java class called com.polarsparc.hadoop.hmr.EnhancedWordCount:
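A minimal sketch of what the main class (Listing-3) could look like follows; the use of GenericOptionsParser to pick up the -conf option (used later for the local run) is an assumption:

package com.polarsparc.hadoop.hmr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class EnhancedWordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Honor generic options such as -conf and return the remaining arguments
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: EnhancedWordCount <input path> <output path>");
            System.exit(2);
        }

        // Create the Job and specify the Mapper and Reducer classes
        Job job = new Job(conf, "enhanced word count");
        job.setJarByClass(EnhancedWordCount.class);
        job.setMapperClass(EnhancedWordCountMapper.class);
        job.setReducerClass(EnhancedWordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Specify the input and output paths
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        // Submit the Hadoop MapReduce job and wait for it to finish
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}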
As can be inferred from Listing-3 above, in the main() function, we create a Job object and specify the Mapper class (EnhancedWordCountMapper) and Reducer class (EnhancedWordCountReducer). Next, we specify the input and output paths. And finally, we invoke the waitForCompletion() method on the Job object to submit the Hadoop MapReduce job and wait for it to finish.
Hooray !!! We have implemented our first Hadoop MapReduce program.
Wait a minute. How do we know what we have implemented will work ???
How does one unit test this ???
Despair Not. Enter MRUnit.
MRUnit is a unit testing framework that is based on JUnit and can be used to unit test Hadoop MapReduce programs.
We will leverage MRUnit to test our EnhancedWordCount program.
Download the latest distribution of MRUnit from the Apache MRUnit website and use it in the project.
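If the project happens to be Maven based, the equivalent test dependency (assuming MRUnit 1.0.0 with the hadoop1 classifier for the Hadoop 1.x line) would look something like this:

<dependency>
    <groupId>org.apache.mrunit</groupId>
    <artifactId>mrunit</artifactId>
    <version>1.0.0</version>
    <!-- The hadoop1 classifier targets the Hadoop 1.x API -->
    <classifier>hadoop1</classifier>
    <scope>test</scope>
</dependency>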
The MRUnit framework includes a driver for unit testing just the Mapper, another driver for unit testing just the Reducer, and yet another driver for unit testing the Mapper and Reducer together.
The following is the Java class called com.polarsparc.hadoop.hmr.test.EnhancedWordCountUnitTests that demonstrates the use of the MRUnit unit testing framework:
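A condensed sketch of what such a test class (Listing-4) could look like follows. Only a representative subset of the test methods is shown (the assert-style variants for the Reducer and for the combined MapReduce follow the same pattern), and the sample inputs and expected outputs are assumptions:

package com.polarsparc.hadoop.hmr.test;

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import com.polarsparc.hadoop.hmr.EnhancedWordCountMapper;
import com.polarsparc.hadoop.hmr.EnhancedWordCountReducer;

public class EnhancedWordCountUnitTests {
    private MapDriver<Object, Text, Text, IntWritable> mapDriver;
    private ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
    private MapReduceDriver<Object, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver;

    @Before
    public void setup() {
        // Create instances of our Mapper and Reducer
        EnhancedWordCountMapper mapper = new EnhancedWordCountMapper();
        EnhancedWordCountReducer reducer = new EnhancedWordCountReducer();

        // Initialize the three MRUnit drivers with our Mapper and/or Reducer
        mapDriver = MapDriver.newMapDriver(mapper);
        reduceDriver = ReduceDriver.newReduceDriver(reducer);
        mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
    }

    @Test
    public void testEnhancedWordCountMapper() throws Exception {
        // Specify the input as well as the expected output and let MRUnit verify
        mapDriver.withInput(new LongWritable(1), new Text("Hello world, hello Hadoop"))
                 .withOutput(new Text("hello"), new IntWritable(1))
                 .withOutput(new Text("world"), new IntWritable(1))
                 .withOutput(new Text("hello"), new IntWritable(1))
                 .withOutput(new Text("hadoop"), new IntWritable(1))
                 .runTest();
    }

    @Test
    public void assertEnhancedWordCountMapper() throws Exception {
        // Specify the input, run the Mapper, and assert on the returned Pair(s)
        mapDriver.withInput(new LongWritable(1), new Text("Hello Hadoop"));
        List<Pair<Text, IntWritable>> results = mapDriver.run();
        Assert.assertEquals(2, results.size());
        Assert.assertEquals(new Text("hello"), results.get(0).getFirst());
        Assert.assertEquals(new IntWritable(1), results.get(0).getSecond());
    }

    @Test
    public void testEnhancedWordCountReducer() throws Exception {
        // The input to reduce() is a key and a list of values
        reduceDriver.withInput(new Text("hello"),
                               Arrays.asList(new IntWritable(1), new IntWritable(1)))
                    .withOutput(new Text("hello"), new IntWritable(2))
                    .runTest();
    }

    @Test
    public void testEnhancedWordCountMapReduce() throws Exception {
        // Exercise the Mapper and Reducer together
        mapReduceDriver.withInput(new LongWritable(1), new Text("Hadoop hadoop HADOOP"))
                       .withOutput(new Text("hadoop"), new IntWritable(3))
                       .runTest();
    }
}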
Let us try to understand the source code from Listing-4 above.
The Java class org.apache.hadoop.mrunit.mapreduce.MapDriver is the driver to unit test the Mapper.
The Java class org.apache.hadoop.mrunit.mapreduce.ReduceDriver is the driver to unit test the Reducer.
The Java class org.apache.hadoop.mrunit.mapreduce.MapReduceDriver is the driver to unit test both the Mapper and Reducer together.
In the method setup(), we first create an instance of our Mapper EnhancedWordCountMapper and an instance of our Reducer EnhancedWordCountReducer.
Then, we initialize an instance of the MapDriver specifying our instance of Mapper, an instance of the ReduceDriver specifying our instance of Reducer, and an instance of the MapReduceDriver specifying our instances of Mapper and Reducer.
The generic parameter types of the MapDriver must match the generic parameter types of our Mapper, which is Object,Text,Text,IntWritable in our case.
Also, the generic parameter types of the ReduceDriver must match the generic parameter types of our Reducer, which is Text,IntWritable,Text,IntWritable in our case.
Last but not least, there are 6 generic parameter types for the MapReduceDriver and they must match the 4 generic parameter types of our Mapper followed by the last 2 generic parameter types of our Reducer. The end result is Object,Text,Text,IntWritable,Text,IntWritable in our case.
In Listing-4 above, we demonstrate unit testing using two approaches: first by specifying the input and the expected output result, and second by specifying the input and testing the output using assertions.
In the method testEnhancedWordCountMapper(), we unit test our Mapper. This approach specifies the input as well as the expected result. The input is provided through the method withInput() and the expected output through the method withOutput().
In the method assertEnhancedWordCountMapper(), again we unit test our Mapper. In this approach, we specify the input and assert the expected results. Note that the result comes in a org.apache.hadoop.mrunit.types.Pair, which encapsulates both the key and the value. To get the key, invoke the method getFirst() and to get the value invoke the method getSecond().
Now for the Reducer.
In the method testEnhancedWordCountReducer(), we unit test our Reducer. Recollect from Part-3 that the input to the reduce() method is a list of values. This approach specifies the input as well as the expected result.
In the method assertEnhancedWordCountReducer(), again we unit test our Reducer. In this approach, we specify the input and assert the expected results.
And, finally for the Mapper and Reducer together.
In the method testEnhancedWordCountMapReduce(), we unit test our Mapper and Reducer together. This approach specifies the input as well as the expected result.
In the method assertEnhancedWordCountMapReduce(), again we unit test our Mapper and Reducer together. In this approach, we specify the input and assert the expected results.
Executing the above program from Listing-4 in Eclipse results in a successful execution as shown in Figure-1 below:
From Figure-1 above, it is clear that we have successfully written and executed unit tests for our first Hadoop MapReduce program.
What we have not yet done is an end-to-end integration test with a small input to prove that our Hadoop MapReduce program really works.
Interestingly, there is a way to perform the end-to-end integration test locally without spinning up the 3-node cluster.
Create a file called hadoop-local.xml in the directory $HADOOP_PREFIX/conf whose contents look as shown below:
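The following minimal configuration points Hadoop at the local file system and the local job runner (the standard Hadoop 1.x standalone settings):

<?xml version="1.0"?>
<configuration>
    <property>
        <!-- Use the local file system instead of HDFS -->
        <name>fs.default.name</name>
        <value>file:///</value>
    </property>
    <property>
        <!-- Run MapReduce jobs in-process with the local job runner -->
        <name>mapred.job.tracker</name>
        <value>local</value>
    </property>
</configuration>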
Package the classes of our EnhancedWordCount program into a jar file called hadoop-polarsparc-1.0.jar in the directory /tmp.
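One way to do this (assuming the compiled .class files are under the project's build output directory, as an Eclipse project would produce) is:

cd <path-to-compiled-classes>
jar cvf /tmp/hadoop-polarsparc-1.0.jar com/polarsparc/hadoop/hmr/*.class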
To perform the end-to-end integration test locally, create a small text file in the directory /tmp/hadoop/input and issue the following command:
$HADOOP_PREFIX/bin/hadoop jar /tmp/hadoop-polarsparc-1.0.jar com.polarsparc.hadoop.hmr.EnhancedWordCount -conf $HADOOP_PREFIX/conf/hadoop-local.xml /tmp/hadoop/input /tmp/hadoop/output
The following will be the output:
13/12/31 16:50:06 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/12/31 16:50:07 INFO input.FileInputFormat: Total input paths to process : 1
13/12/31 16:50:07 WARN snappy.LoadSnappy: Snappy native library not loaded
13/12/31 16:50:07 INFO mapred.JobClient: Running job: job_local1285465663_0001
13/12/31 16:50:07 INFO mapred.LocalJobRunner: Waiting for map tasks
13/12/31 16:50:07 INFO mapred.LocalJobRunner: Starting task: attempt_local1285465663_0001_m_000000_0
13/12/31 16:50:07 INFO util.ProcessTree: setsid exited with exit code 0
13/12/31 16:50:07 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@4b2d5c2b
13/12/31 16:50:07 INFO mapred.MapTask: Processing split: file:/tmp/hadoop/input/short-story.txt:0+604
13/12/31 16:50:07 INFO mapred.MapTask: io.sort.mb = 100
13/12/31 16:50:07 INFO mapred.MapTask: data buffer = 79691776/99614720
13/12/31 16:50:07 INFO mapred.MapTask: record buffer = 262144/327680
13/12/31 16:50:07 INFO mapred.MapTask: Starting flush of map output
13/12/31 16:50:07 INFO mapred.MapTask: Finished spill 0
13/12/31 16:50:07 INFO mapred.Task: Task:attempt_local1285465663_0001_m_000000_0 is done. And is in the process of commiting
13/12/31 16:50:07 INFO mapred.LocalJobRunner:
13/12/31 16:50:07 INFO mapred.Task: Task 'attempt_local1285465663_0001_m_000000_0' done.
13/12/31 16:50:07 INFO mapred.LocalJobRunner: Finishing task: attempt_local1285465663_0001_m_000000_0
13/12/31 16:50:07 INFO mapred.LocalJobRunner: Map task executor complete.
13/12/31 16:50:07 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@2921ef9
13/12/31 16:50:07 INFO mapred.LocalJobRunner:
13/12/31 16:50:07 INFO mapred.Merger: Merging 1 sorted segments
13/12/31 16:50:07 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 476 bytes
13/12/31 16:50:07 INFO mapred.LocalJobRunner:
13/12/31 16:50:07 INFO mapred.Task: Task:attempt_local1285465663_0001_r_000000_0 is done. And is in the process of commiting
13/12/31 16:50:07 INFO mapred.LocalJobRunner:
13/12/31 16:50:07 INFO mapred.Task: Task attempt_local1285465663_0001_r_000000_0 is allowed to commit now
13/12/31 16:50:07 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1285465663_0001_r_000000_0' to /tmp/hadoop/output
13/12/31 16:50:07 INFO mapred.LocalJobRunner: reduce > reduce
13/12/31 16:50:07 INFO mapred.Task: Task 'attempt_local1285465663_0001_r_000000_0' done.
13/12/31 16:50:08 INFO mapred.JobClient: map 100% reduce 100%
13/12/31 16:50:08 INFO mapred.JobClient: Job complete: job_local1285465663_0001
13/12/31 16:50:08 INFO mapred.JobClient: Counters: 20
13/12/31 16:50:08 INFO mapred.JobClient: File Output Format Counters
13/12/31 16:50:08 INFO mapred.JobClient: Bytes Written=346
13/12/31 16:50:08 INFO mapred.JobClient: File Input Format Counters
13/12/31 16:50:08 INFO mapred.JobClient: Bytes Read=604
13/12/31 16:50:08 INFO mapred.JobClient: FileSystemCounters
13/12/31 16:50:08 INFO mapred.JobClient: FILE_BYTES_READ=32336
13/12/31 16:50:08 INFO mapred.JobClient: FILE_BYTES_WRITTEN=135280
13/12/31 16:50:08 INFO mapred.JobClient: Map-Reduce Framework
13/12/31 16:50:08 INFO mapred.JobClient: Reduce input groups=35
13/12/31 16:50:08 INFO mapred.JobClient: Map output materialized bytes=480
13/12/31 16:50:08 INFO mapred.JobClient: Combine output records=35
13/12/31 16:50:08 INFO mapred.JobClient: Map input records=8
13/12/31 16:50:08 INFO mapred.JobClient: Reduce shuffle bytes=0
13/12/31 16:50:08 INFO mapred.JobClient: Physical memory (bytes) snapshot=0
13/12/31 16:50:08 INFO mapred.JobClient: Reduce output records=35
13/12/31 16:50:08 INFO mapred.JobClient: Spilled Records=70
13/12/31 16:50:08 INFO mapred.JobClient: Map output bytes=437
13/12/31 16:50:08 INFO mapred.JobClient: Total committed heap usage (bytes)=504365056
13/12/31 16:50:08 INFO mapred.JobClient: CPU time spent (ms)=0
13/12/31 16:50:08 INFO mapred.JobClient: Virtual memory (bytes) snapshot=0
13/12/31 16:50:08 INFO mapred.JobClient: SPLIT_RAW_BYTES=103
13/12/31 16:50:08 INFO mapred.JobClient: Map output records=38
13/12/31 16:50:08 INFO mapred.JobClient: Combine input records=38
13/12/31 16:50:08 INFO mapred.JobClient: Reduce input records=35
Voila !!! Did it really work ???
Let us check the output directory /tmp/hadoop/output by issuing the following command:
ls -l /tmp/hadoop/output
The following will be the output:
total 4
-rwxrwxrwx 1 bswamina bswamina 334 Dec 31 16:50 part-r-00000
-rwxrwxrwx 1 bswamina bswamina   0 Dec 31 16:50 _SUCCESS
Excellent !!! Our Hadoop MapReduce program did complete successfully.
Let us tail the contents of the file /tmp/hadoop/output/part-r-00000 by issuing the following command:
tail /tmp/hadoop/output/part-r-00000
The following will be the output:
their 1
there 1
three 1
times 1
truly 1
truth 1
village 1
villagers 1
watched 1
whole 1
References