Hadoop Quick Notes :: Part - 3
Bhaskar S | 12/27/2013 |
Introduction
In Part-2 we started our 3-node Hadoop 1.x cluster and explored the Hadoop Distributed File System (HDFS).
Now, we will explore the Hadoop MapReduce framework.
Hands-on with Hadoop MapReduce 1.x
Hadoop MapReduce is a distributed execution framework for developing applications that process large data sets in parallel on a cluster of slave nodes (running on commodity hardware) in a reliable and fault-tolerant way.
The unit of work in Hadoop MapReduce is a job. A job has a map phase and a reduce phase.
During the map phase, the input data read from HDFS is split into chunks for processing by the map function executing on the slave nodes of the cluster. During the reduce phase, the results from the map function are fed as input to the reduce function, which also executes on the slave nodes of the cluster. The reduce function consolidates its input into the final result and writes the output to HDFS.
In other words, the map phase performs filtering and sorting, while the reduce phase performs aggregation.
When we learn a new programming language, it is a tradition to first implement and test a simple "Hello World" program. In the case of Hadoop MapReduce, it is the "Word Count" program. The purpose of this simple Hadoop MapReduce job is to count the occurrences of each distinct word in a document. The map phase emits a count for every word in each line of the document, while the reduce phase aggregates those counts into word counts across the whole document.
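To make the two phases concrete before touching the cluster, here is a minimal single-process sketch in plain Java. It does not use any Hadoop classes; the class name WordCountSketch and its methods are hypothetical and only imitate the shape of a word count job: a map step that emits (word, 1) pairs per line, a grouping step, and a reduce step that sums the counts.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical in-memory imitation of a word count job (no Hadoop involved).
public class WordCountSketch {

    // Map step: emit (word, 1) for every whitespace-separated token in one line.
    public static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        StringTokenizer tokens = new StringTokenizer(line);
        while (tokens.hasMoreTokens()) {
            pairs.add(new AbstractMap.SimpleEntry<>(tokens.nextToken(), 1));
        }
        return pairs;
    }

    // Reduce step: add up all the counts collected for one word.
    public static int reduce(String word, List<Integer> counts) {
        int sum = 0;
        for (int c : counts) {
            sum += c;
        }
        return sum;
    }

    // Driver: map every line, group the pairs by word, then reduce each group.
    public static Map<String, Integer> countWords(List<String> lines) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> pair : map(line)) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                       .add(pair.getValue());
            }
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            result.put(e.getKey(), reduce(e.getKey(), e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("ye merry gentlemen", "rest ye merry");
        System.out.println(countWords(lines));
    }
}
```

In a real job the grouping step is done by the framework between the two phases, and the map and reduce steps run on different nodes; the sketch only shows the data flow.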
The Hadoop 1.x distribution already comes with a wordcount sample program. The wordcount sample program is part of the hadoop-examples-1.2.1.jar file located in the directory $HADOOP_PREFIX.
In order to see the wordcount Hadoop MapReduce program in action, we need a reasonably large text file.
We can download a variety of books in text format from the Project Gutenberg site. For our test, we downloaded the book Christmas in Poetry and called it by the name Book-ChristmasInPoetry.txt.
Let us copy this file to the /data directory in HDFS. To store this file in HDFS, issue the following command:
hadoop fs -put ./Downloads/Book-ChristmasInPoetry.txt /data
This command will not generate any output.
Issue the following command to list all the file(s) under the /data directory of HDFS:
hadoop fs -ls /data
The following will be the output:
Found 1 items
-rw-r--r-- 2 hadoop supergroup 63009 2013-12-27 17:12 /data/Book-ChristmasInPoetry.txt
We are now all set to execute our first Hadoop MapReduce program in our 3-node cluster. Issue the following command to execute the wordcount sample Hadoop MapReduce program:
hadoop jar $HADOOP_PREFIX/hadoop-examples-1.2.1.jar wordcount /data /out
The following will be the output:
13/12/27 17:15:54 INFO input.FileInputFormat: Total input paths to process : 1
13/12/27 17:15:54 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/12/27 17:15:54 WARN snappy.LoadSnappy: Snappy native library not loaded
13/12/27 17:15:55 INFO mapred.JobClient: Running job: job_201312271637_0002
13/12/27 17:15:56 INFO mapred.JobClient: map 0% reduce 0%
13/12/27 17:16:01 INFO mapred.JobClient: map 100% reduce 0%
13/12/27 17:16:10 INFO mapred.JobClient: map 100% reduce 33%
13/12/27 17:16:11 INFO mapred.JobClient: map 100% reduce 100%
13/12/27 17:16:12 INFO mapred.JobClient: Job complete: job_201312271637_0002
13/12/27 17:16:12 INFO mapred.JobClient: Counters: 29
13/12/27 17:16:12 INFO mapred.JobClient:   Job Counters
13/12/27 17:16:12 INFO mapred.JobClient:     Launched reduce tasks=1
13/12/27 17:16:12 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=5381
13/12/27 17:16:12 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
13/12/27 17:16:12 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
13/12/27 17:16:12 INFO mapred.JobClient:     Launched map tasks=1
13/12/27 17:16:12 INFO mapred.JobClient:     Data-local map tasks=1
13/12/27 17:16:12 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=9573
13/12/27 17:16:12 INFO mapred.JobClient:   File Output Format Counters
13/12/27 17:16:12 INFO mapred.JobClient:     Bytes Written=30043
13/12/27 17:16:12 INFO mapred.JobClient:   FileSystemCounters
13/12/27 17:16:12 INFO mapred.JobClient:     FILE_BYTES_READ=43029
13/12/27 17:16:12 INFO mapred.JobClient:     HDFS_BYTES_READ=63132
13/12/27 17:16:12 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=200963
13/12/27 17:16:12 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=30043
13/12/27 17:16:12 INFO mapred.JobClient:   File Input Format Counters
13/12/27 17:16:12 INFO mapred.JobClient:     Bytes Read=63009
13/12/27 17:16:12 INFO mapred.JobClient:   Map-Reduce Framework
13/12/27 17:16:12 INFO mapred.JobClient:     Map output materialized bytes=43029
13/12/27 17:16:12 INFO mapred.JobClient:     Map input records=2015
13/12/27 17:16:12 INFO mapred.JobClient:     Reduce shuffle bytes=43029
13/12/27 17:16:12 INFO mapred.JobClient:     Spilled Records=6554
13/12/27 17:16:12 INFO mapred.JobClient:     Map output bytes=91362
13/12/27 17:16:12 INFO mapred.JobClient:     Total committed heap usage (bytes)=177016832
13/12/27 17:16:12 INFO mapred.JobClient:     CPU time spent (ms)=970
13/12/27 17:16:12 INFO mapred.JobClient:     Combine input records=9373
13/12/27 17:16:12 INFO mapred.JobClient:     SPLIT_RAW_BYTES=123
13/12/27 17:16:12 INFO mapred.JobClient:     Reduce input records=3277
13/12/27 17:16:12 INFO mapred.JobClient:     Reduce input groups=3277
13/12/27 17:16:12 INFO mapred.JobClient:     Combine output records=3277
13/12/27 17:16:12 INFO mapred.JobClient:     Physical memory (bytes) snapshot=184672256
13/12/27 17:16:12 INFO mapred.JobClient:     Reduce output records=3277
13/12/27 17:16:12 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=771518464
13/12/27 17:16:12 INFO mapred.JobClient:     Map output records=9373
For the wordcount sample Hadoop MapReduce program, we specify as input the HDFS directory where the text file to be processed resides (/data in our case), followed by the HDFS directory where we want the results to be stored (/out in our case). Make sure the directory /out does not already exist in HDFS; the job fails if the output directory is already present. The wordcount program will create it for us.
When the wordcount sample Hadoop MapReduce program completes, issue the following command to list all the file(s) under the /out directory of HDFS:
hadoop fs -ls /out
The following will be the output:
Found 3 items
-rw-r--r-- 2 hadoop supergroup 0 2013-12-27 17:16 /out/_SUCCESS
drwxr-xr-x - hadoop supergroup 0 2013-12-27 17:15 /out/_logs
-rw-r--r-- 2 hadoop supergroup 30043 2013-12-27 17:16 /out/part-r-00000
In the listing above, we see a file named _SUCCESS. The presence of this file indicates that the wordcount sample Hadoop MapReduce program completed successfully.
The word count results will be in the file named part-r-00000.
Rather than display the entire contents of this file, we will peek at its tail. Issue the following command to tail the contents of the file /out/part-r-00000 in HDFS:
hadoop fs -tail /out/part-r-00000
The following will be the output:
www.gutenberg.org 4
www.gutenberg.org/contact 1
www.gutenberg.org/donate 2
www.gutenberg.org/license. 1
yard, 1
ye 9
ye, 3
year 1
year's 1
year; 2
yearly 1
years, 1
yet 1
yonder 1
you 69
you!) 1
you, 4
you. 1
young 2
your 30
yours 1
yourselves 1
The 1
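Each record in the result file is a word followed by its count. As a rough illustration of how such records could be consumed once copied out of HDFS, the following plain-Java sketch parses lines of that shape and picks the most frequent word. The class WordCountOutputParser and its methods are hypothetical, and the parser assumes only that the count is the last whitespace-separated (tab or space) field on the line.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for reading "word<whitespace>count" records.
public class WordCountOutputParser {

    // One parsed record: a word and its count.
    public static final class WordCount {
        public final String word;
        public final int count;

        WordCount(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }

    // Parse a single line; the count is whatever follows the last tab or space.
    public static WordCount parseLine(String line) {
        String trimmed = line.trim();
        int split = Math.max(trimmed.lastIndexOf('\t'), trimmed.lastIndexOf(' '));
        if (split < 0) {
            throw new IllegalArgumentException("no separator in: " + line);
        }
        String word = trimmed.substring(0, split).trim();
        int count = Integer.parseInt(trimmed.substring(split + 1));
        return new WordCount(word, count);
    }

    // Return the record with the highest count (the first one wins on ties).
    public static WordCount mostFrequent(List<String> lines) {
        WordCount best = null;
        for (String line : lines) {
            if (line.trim().isEmpty()) {
                continue; // skip blank lines
            }
            WordCount wc = parseLine(line);
            if (best == null || wc.count > best.count) {
                best = wc;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList("ye\t9", "you\t69", "your\t30");
        WordCount top = mostFrequent(sample);
        System.out.println(top.word + " " + top.count);
    }
}
```

Notice from the tail output that tokens such as "you", "you," and "you." are counted separately, since the sample program splits on whitespace only and does not strip punctuation.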
Let us take a peek at the source code of the wordcount sample Hadoop MapReduce program.
The following is the listing for the Java class WordCount:
Let us try to understand the source code from Listing-1 above.
For the map phase, we need a map() function, which can be implemented by extending the base class org.apache.hadoop.mapreduce.Mapper.
For the reduce phase, we need a reduce() function, which can be implemented by extending the base class org.apache.hadoop.mapreduce.Reducer.
A Hadoop MapReduce program works on key-value pairs. The input from HDFS is split into key-value pairs (K1, V1), which are processed by the map() function.
The output of the map() function from the different slave nodes in the cluster is then sorted and grouped by key into key-value pairs (K2, List<V2>). Realize that the map() function on different slave nodes can output a value for the same key K2. Hence, each key is paired with a List of the values output for it by the map() function across all the slave nodes.
The key-value pairs (K2, List<V2>) are then processed by the reduce() function to produce the key-value pairs (K3, V3).
The following Figure-1 illustrates the Hadoop MapReduce process:
Both the Mapper and Reducer are generic classes that take four type parameters, which specify the input key, input value, output key, and output value types. The input types of the reduce() method must match the output types of the map() method.
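The type relationship described above can be sketched without Hadoop. In the following plain-Java example, the interfaces SimpleMapper and SimpleReducer and the run() method are hypothetical stand-ins, not the Hadoop API. Because run() shares the type parameters K2 and V2 between its mapper and reducer arguments, a reducer whose input types do not match the mapper's output types would be rejected at compile time. The line index is used as K1 purely for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

// Hypothetical typed pipeline: (K1, V1) -> (K2, List<V2>) -> (K3, V3).
public class TypedPipelineSketch {

    // Stand-in for a Mapper: consumes (K1, V1), emits (K2, V2) pairs.
    public interface SimpleMapper<K1, V1, K2, V2> {
        void map(K1 key, V1 value, BiConsumer<K2, V2> emit);
    }

    // Stand-in for a Reducer: consumes (K2, List<V2>), emits (K3, V3) pairs.
    public interface SimpleReducer<K2, V2, K3, V3> {
        void reduce(K2 key, List<V2> values, BiConsumer<K3, V3> emit);
    }

    // K2 and V2 appear in both parameter types, so the compiler enforces the match.
    public static <K1, V1, K2 extends Comparable<K2>, V2, K3, V3> Map<K3, V3> run(
            Map<K1, V1> input,
            SimpleMapper<K1, V1, K2, V2> mapper,
            SimpleReducer<K2, V2, K3, V3> reducer) {
        // Sort and group: collect every V2 under its K2, keys kept in sorted order.
        Map<K2, List<V2>> grouped = new TreeMap<>();
        for (Map.Entry<K1, V1> record : input.entrySet()) {
            mapper.map(record.getKey(), record.getValue(),
                    (k, v) -> grouped.computeIfAbsent(k, x -> new ArrayList<>()).add(v));
        }
        // Reduce each group; results keep the sorted key order.
        Map<K3, V3> output = new LinkedHashMap<>();
        for (Map.Entry<K2, List<V2>> group : grouped.entrySet()) {
            reducer.reduce(group.getKey(), group.getValue(), output::put);
        }
        return output;
    }

    // Word count expressed with the stand-in interfaces.
    public static Map<String, Integer> wordCount(List<String> lines) {
        Map<Long, String> input = new TreeMap<>();
        for (int i = 0; i < lines.size(); i++) {
            input.put((long) i, lines.get(i)); // K1 = line index, V1 = line text
        }
        SimpleMapper<Long, String, String, Integer> mapper = (index, line, emit) -> {
            for (String word : line.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    emit.accept(word, 1);
                }
            }
        };
        SimpleReducer<String, Integer, String, Integer> reducer = (word, ones, emit) -> {
            int sum = 0;
            for (int one : ones) {
                sum += one;
            }
            emit.accept(word, sum);
        };
        return run(input, mapper, reducer);
    }

    public static void main(String[] args) {
        System.out.println(wordCount(Arrays.asList("a b a", "b a")));
    }
}
```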
The key and value objects must be serializable by the Hadoop MapReduce framework. Hence, the key and value objects have to implement the org.apache.hadoop.io.Writable interface. Keys must additionally implement org.apache.hadoop.io.WritableComparable, since the framework sorts the map output by key.
As can be inferred from the code, the class org.apache.hadoop.io.Text is the serializable counterpart of the java.lang.String type and the class org.apache.hadoop.io.IntWritable is the serializable counterpart of the primitive int type.
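The Writable interface declares two methods, write(DataOutput) and readFields(DataInput). To show what implementing that contract looks like without pulling in the Hadoop jars, the following sketch defines a hypothetical SimpleWritable interface with the same two method signatures and a hypothetical WordAndCount type that serializes its fields one by one and reads them back in the same order.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical imitation of the Writable contract (no Hadoop classes used).
public class WritableSketch {

    // Mirrors the two methods of org.apache.hadoop.io.Writable.
    public interface SimpleWritable {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }

    // A word plus a count, serialized field by field.
    public static class WordAndCount implements SimpleWritable {
        public String word = "";
        public int count;

        // No-arg constructor so that an empty instance can be filled by readFields().
        public WordAndCount() {
        }

        public WordAndCount(String word, int count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(word);
            out.writeInt(count);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // Fields must be read in exactly the order they were written.
            word = in.readUTF();
            count = in.readInt();
        }
    }

    // Serialize to bytes, then deserialize into a fresh instance.
    public static WordAndCount roundTrip(WordAndCount original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            WordAndCount copy = new WordAndCount();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        WordAndCount copy = roundTrip(new WordAndCount("yonder", 1));
        System.out.println(copy.word + " " + copy.count);
    }
}
```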
The map() method writes its output through the passed-in parameter of type org.apache.hadoop.mapreduce.Mapper.Context.
Similarly, the reduce() method writes its result through the passed-in parameter of type org.apache.hadoop.mapreduce.Reducer.Context.
In the map() method, the input is a line of text from the input document. We tokenize the line into words and for each word from the line, output the number 1 as the count.
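As can be inferred from Figure-1 above, the map() method outputs the pair (word, 1) for each word it encounters. After the framework groups these pairs by key, the reduce() method receives a word (key K2) and a list of ones (value List<V2>).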
In the reduce() method, we sum up all the ones from the input list value and output the word (key K3) and the sum (value V3).
An instance of the class org.apache.hadoop.mapreduce.Job specifies the Hadoop MapReduce job to be run in our 3-node cluster.
In the main() method, we create a Job object and specify the Mapper class (TokenizerMapper) and Reducer class (IntSumReducer).
Notice that the Reducer class (IntSumReducer) has also been specified as the Combiner class. The Combiner is an optimization step that runs on the node that runs the Mapper. Recall that the Mapper outputs each tokenized word (from the input text) with a count of 1, and that the same word can appear multiple times in the input text. Rather than send every occurrence with a count of 1 to the Reducer node and process it there, the Combiner sums the counts for each word locally, so that less data is transferred to the Reducer.
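The effect of the Combiner is visible in the job counters shown earlier: Combine input records=9373 but Combine output records=3277, so only 3277 records reached the Reducer. The following plain-Java sketch imitates this with two pretend nodes. The class CombinerSketch and its methods are hypothetical; the point is only that summing locally first and then summing the partial sums gives the same totals, because addition is associative and commutative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of local combining before the reduce step.
public class CombinerSketch {

    // Map step on one node: each token stands for one (word, 1) record.
    public static List<String> mapTokens(List<String> lines) {
        List<String> words = new ArrayList<>();
        for (String line : lines) {
            for (String w : line.trim().split("\\s+")) {
                if (!w.isEmpty()) {
                    words.add(w);
                }
            }
        }
        return words;
    }

    // Combine step: sum the ones locally so each word leaves the node only once.
    public static Map<String, Integer> combine(List<String> words) {
        Map<String, Integer> partial = new TreeMap<>();
        for (String w : words) {
            partial.merge(w, 1, Integer::sum);
        }
        return partial;
    }

    // Reduce step: add up the partial sums arriving from every node.
    public static Map<String, Integer> reduce(List<Map<String, Integer>> partials) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((w, n) -> totals.merge(w, n, Integer::sum));
        }
        return totals;
    }

    public static void main(String[] args) {
        List<String> nodeA = mapTokens(Arrays.asList("ye ye you", "you ye"));
        List<String> nodeB = mapTokens(Arrays.asList("you your you"));
        Map<String, Integer> partA = combine(nodeA);
        Map<String, Integer> partB = combine(nodeB);
        System.out.println("records without combiner: " + (nodeA.size() + nodeB.size()));
        System.out.println("records with combiner: " + (partA.size() + partB.size()));
        System.out.println(reduce(Arrays.asList(partA, partB)));
    }
}
```

With this sample input the two nodes would send 8 records without combining and 4 with it, while the final totals stay the same.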
Next, we specify the input and output paths.
Finally, we invoke the waitForCompletion() method on the Job object to submit the Hadoop MapReduce job and wait for it to finish. The return value from the waitForCompletion() method is the completion status indicating success (true) or failure (false).
One can get information about the JobTracker by typing in the following URL in a web browser:
http://hadoop-master:50030
The following diagram in Figure-2 shows the result:
One can get information about the completed Hadoop MapReduce jobs by clicking on Job Tracker History as shown in Figure-2 above. The following is the result of clicking the link:
Clicking on job_201312271637_0002 as shown in Figure-3 above, we see the following: