In Part-3, Part-4, and Part-5, we got our hands dirty with the Hadoop MapReduce 1.x
framework. But how does the Hadoop MapReduce 1.x framework really work?
In this part, we will try to unravel the mechanics behind a Hadoop
MapReduce 1.x program.
How Hadoop MapReduce 1.x Works
In our previous exercises, we kicked off a Hadoop
MapReduce process by:
Creating an instance of a Job object
Specifying the Mapper class
Specifying the Reducer class
Pointing to the HDFS location of the input file(s)
Pointing to the HDFS location of the output directory
And, finally, submitting the MapReduce job (a minimal driver sketch follows this list)
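To make the above steps concrete, here is a minimal driver sketch against the hadoop-1.2.1 API; the class names WordCountDriver, WordCountMapper, and WordCountReducer are hypothetical stand-ins for the classes from the earlier parts:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Create an instance of a Job object
        Job job = new Job(conf, "word-count");
        job.setJarByClass(WordCountDriver.class);

        // Specify the Mapper and Reducer classes (hypothetical names)
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Point to the HDFS locations of the input file(s) and the output directory
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Finally, submit the MapReduce job and wait for completion
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}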
When a Job is submitted, it is put in a
queue to be picked up and processed by the JobTracker.
Here is what the JobTracker does:
Checks for the existence of the output directory in HDFS.
If it already exists, terminates with an error
Uses an implementation of org.apache.hadoop.mapreduce.InputFormat
that validates the existence of the input directory in HDFS.
If it does not exist, terminates with an error
Uses the InputFormat to determine all
the input file(s) to be processed under the specified HDFS
input directory and returns a list of org.apache.hadoop.mapreduce.InputSplit
(encapsulates the size of the data block as well as the list of
nodes in the cluster where the data block resides)
Copies all the resources needed to execute the job (the Java jar,
the configuration information, and the list of InputSplits)
into HDFS
Based on the information from each InputSplit,
directs the TaskTracker
closest to the input (data block) to start a Mapper
task. The number of Mapper tasks is
controlled by the system property mapred.map.tasks
(default is 2; see the configuration sketch after this list)
Periodically gets a heartbeat from each of the TaskTrackers
on the status of the Mapper tasks
Once all the Mapper tasks complete
successfully, directs the available TaskTrackers
to start the Reducer tasks. The number of
Reducer tasks is controlled by the system
property mapred.reduce.tasks (default
is 1)
Periodically gets a heartbeat from the TaskTrackers
on the status of the Reducer tasks
Once all the Reducer tasks finish, the
submitted Job is complete
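As a quick illustration (the property values here are arbitrary), the task-count properties above can be set on the job configuration in the driver before the job is submitted:

// inside the driver, before the job is submitted
Configuration conf = new Configuration();

// a hint only in MapReduce 1.x; the actual Mapper count is driven by the number of InputSplits
conf.setInt("mapred.map.tasks", 4);

// the number of Reducer tasks
conf.setInt("mapred.reduce.tasks", 2);

Job job = new Job(conf, "my-job");

// equivalent convenience setter for mapred.reduce.tasks
job.setNumReduceTasks(2);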
The following Figure-1 illustrates the end-to-end picture of how it
works:
Figure-1
The picture in Figure-1 is from the O'Reilly book Hadoop:
The Definitive Guide by Tom White.
In the above paragraph describing the JobTracker,
we mentioned a few classes such as InputFormat,
InputSplit, etc. Let us elaborate on them in
the following paragraph(s).
InputSplit
Used by the JobTracker
Is typically a data block from HDFS,
with a byte-oriented view of the data
Encapsulates the size of the input data block as well as the list
of nodes in the cluster where the data block(s) reside
Used as the input for the Mapper task
The default implementation for InputSplit
is org.apache.hadoop.mapreduce.lib.input.FileSplit
The following is the source code for InputSplit,
essentially as it appears in the hadoop-1.2.1 source (Javadoc comments trimmed):
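package org.apache.hadoop.mapreduce;

import java.io.IOException;

public abstract class InputSplit {
  // the size of the split in bytes, used to sort the splits
  public abstract long getLength() throws IOException, InterruptedException;

  // the list of nodes where the data for the split is local
  public abstract String[] getLocations() throws IOException, InterruptedException;
}

Listing-1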
As can be inferred from Listing-1 above, InputSplit
is an abstract class with no implementation.
The concrete implementation for InputSplit
is the class FileSplit.
InputFormat
Used by the JobTracker
Validates the specified HDFS input
directory
Based on the input file(s) to be processed, returns a list of org.apache.hadoop.mapreduce.InputSplit
objects
Is a factory for org.apache.hadoop.mapreduce.RecordReader
The default implementation for InputFormat
is org.apache.hadoop.mapreduce.lib.input.TextInputFormat.
TextInputFormat is used for processing
text file(s)
The following is the source code for InputFormat,
essentially as it appears in the hadoop-1.2.1 source (Javadoc comments trimmed):
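package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.util.List;

public abstract class InputFormat<K, V> {
  // logically split the input file(s) for the job into InputSplits
  public abstract List<InputSplit> getSplits(JobContext context)
      throws IOException, InterruptedException;

  // factory method for the RecordReader that will read the given split
  public abstract RecordReader<K, V> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException, InterruptedException;
}

Listing-2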
As can be inferred from Listing-2 above, InputFormat
is an abstract class with no implementation.
One of the concrete implementations of InputFormat
for file based input is the class TextInputFormat.
The class TextInputFormat, however, extends
the abstract class FileInputFormat.
The abstract class FileInputFormat
provides the implementation for the method getSplits(),
while the class TextInputFormat implements
the factory method createRecordReader().
The following is an abridged excerpt (the method getSplits())
of the source code for FileInputFormat from the hadoop-1.2.1 source:
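public List<InputSplit> getSplits(JobContext job) throws IOException {
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  long maxSize = getMaxSplitSize(job);

  // generate splits
  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job);
  for (FileStatus file : files) {
    Path path = file.getPath();
    FileSystem fs = path.getFileSystem(job.getConfiguration());
    long length = file.getLen();
    BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
    if ((length != 0) && isSplitable(job, path)) {
      long blockSize = file.getBlockSize();
      long splitSize = computeSplitSize(blockSize, minSize, maxSize);

      // one FileSplit per chunk of splitSize bytes, placed on the
      // nodes that hold the corresponding data block
      long bytesRemaining = length;
      while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
        int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
        splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
                                 blkLocations[blkIndex].getHosts()));
        bytesRemaining -= splitSize;
      }

      if (bytesRemaining != 0) {
        splits.add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
                   blkLocations[blkLocations.length - 1].getHosts()));
      }
    } else if (length != 0) {
      // the file is not splitable: a single split covers the whole file
      splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
    } else {
      // zero-length file
      splits.add(new FileSplit(path, 0, length, new String[0]));
    }
  }
  return splits;
}

Listing-3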
From Listing-3 above, in the method getSplits(),
the logic is to go through each of the input file(s) and return a list
of FileSplits.
Notice that the method getSplits()
references the class org.apache.hadoop.fs.FileSystem
to determine the splits.
For HDFS, the FileSystem
used is an instance of org.apache.hadoop.hdfs.DistributedFileSystem.
Now that we have peeked into the functionality of some of the important
classes above, it is time to look at the TaskTracker.
Here is what the TaskTracker does:
Fetches all the resources (the Java jar, the configuration information,
and the allocated InputSplit) from HDFS
Periodically sends a heartbeat to the JobTracker
to report status on the assigned task
On start of the Mapper task, creates a
local temporary directory to store the output from the Mapper
task
For the Mapper task, uses RecordReader
to fetch the next record, and invokes the map()
function with the input record as the input value
The map() function emits KEY-VALUE
pairs as output, which are run through the org.apache.hadoop.mapreduce.Partitioner
so that outputs with the same KEY are combined into a set of
KEY-LIST&lt;VALUE&gt; pairs. The KEY-LIST&lt;VALUE&gt; pairs are
then sorted by the KEY (see the Mapper and Reducer sketch after this list)
When the Mapper task completes, all the
sorted KEY-LIST<VALUE> pairs are saved to a file as output in
the local temporary directory and the JobTracker
is notified of the completion
For the Reducer task, it fetches the
output file (containing the sorted KEY-LIST<VALUE> pairs) from
the nodes that executed the Mapper tasks
and invokes the reduce() function for
each KEY-LIST<VALUE> pair
The Reducer task uses an implementation
of org.apache.hadoop.mapreduce.OutputFormat
to write to the output directory in HDFS
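To tie the map() and reduce() invocations above back to code, here is a minimal sketch of the hypothetical WordCountMapper and WordCountReducer referenced in the earlier driver sketch (shown together for brevity; each would normally live in its own source file):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// invoked once per record handed over by the RecordReader
public class WordCountMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {
  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    for (String token : value.toString().split("\\s+")) {
      if (!token.isEmpty()) {
        word.set(token);
        context.write(word, ONE); // KEY-VALUE pair that flows through the Partitioner
      }
    }
  }
}

// invoked once per KEY-LIST<VALUE> pair fetched from the Mapper outputs
public class WordCountReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable value : values) {
      sum += value.get();
    }
    context.write(key, new IntWritable(sum)); // written to HDFS via the OutputFormat
  }
}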
In the above paragraph describing the TaskTracker,
we mentioned a few classes such as RecordReader,
Partitioner, OutputFormat,
etc. Let us elaborate on them in the following paragraph(s).
RecordReader
Used by the Mapper task in the TaskTracker
An InputSplit provides a raw
byte-oriented view of the input data block, while the org.apache.hadoop.mapreduce.RecordReader
provides a record-oriented view of the input data
Provides input value into the map()
function of the Mapper
The default implementation for RecordReader
is org.apache.hadoop.mapreduce.lib.input.LineRecordReader
The LineRecordReader is what provides
each line (terminated by a newline) as a record to the Mapper
The following is the source code for RecordReader,
essentially as it appears in the hadoop-1.2.1 source (Javadoc comments trimmed):
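package org.apache.hadoop.mapreduce;

import java.io.Closeable;
import java.io.IOException;

public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
  // called once at initialization with the split to read
  public abstract void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException;

  // advances to the next key/value pair; returns false when the split is exhausted
  public abstract boolean nextKeyValue() throws IOException, InterruptedException;

  public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;

  public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;

  // the current progress of the record reader through its data
  public abstract float getProgress() throws IOException, InterruptedException;

  public abstract void close() throws IOException;
}

Listing-4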
As can be inferred from Listing-4 above, RecordReader
is an abstract class with no implementation.
The concrete implementation for RecordReader
for reading text input files is the class LineRecordReader.
The following is an abridged excerpt (the method initialize())
of the source code for LineRecordReader from the hadoop-1.2.1 source:
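public void initialize(InputSplit genericSplit,
                       TaskAttemptContext context) throws IOException {
  FileSplit split = (FileSplit) genericSplit;
  Configuration job = context.getConfiguration();
  this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                                  Integer.MAX_VALUE);
  start = split.getStart();
  end = start + split.getLength();
  final Path file = split.getPath();
  compressionCodecs = new CompressionCodecFactory(job);
  final CompressionCodec codec = compressionCodecs.getCodec(file);

  // open the file and seek to the start of the split
  FileSystem fs = file.getFileSystem(job);
  FSDataInputStream fileIn = fs.open(split.getPath());
  boolean skipFirstLine = false;
  if (codec != null) {
    in = new LineReader(codec.createInputStream(fileIn), job);
    end = Long.MAX_VALUE;
  } else {
    if (start != 0) {
      skipFirstLine = true;
      --start;
      fileIn.seek(start);
    }
    in = new LineReader(fileIn, job);
  }
  // skip the first line of a non-initial split: it belongs to the
  // previous split, which reads one line past its own end
  if (skipFirstLine) {
    start += in.readLine(new Text(), 0,
                         (int) Math.min((long) Integer.MAX_VALUE, end - start));
  }
  this.pos = start;
}

Listing-5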
From Listing-5 above, in the method initialize(),
one of the steps is to open the data input stream for the
specified InputSplit. In other words, it references the
class org.apache.hadoop.fs.FSDataInputStream
for the data input stream.
For HDFS, the FSDataInputStream
used is an instance of org.apache.hadoop.hdfs.DFSClient$DFSDataInputStream.
DFSClient$DFSDataInputStream internally
uses an instance of org.apache.hadoop.hdfs.DFSClient$DFSInputStream.
It is DFSClient$DFSInputStream that
handles all the communication with the NameNode
and various DataNodes for the various
splits (data blocks) of an input file.
Partitioner
Used in the TaskTracker after the Mapper task produces output
Helps in the distribution of output from the Mapper
to the Reducer
The default implementation for Partitioner
used is org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.
It uses the hash code of the KEY to distribute the output (see the sketch below)
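The heart of HashPartitioner is essentially the following one-liner (reproduced from the hadoop-1.2.1 source, Javadoc comments trimmed):

package org.apache.hadoop.mapreduce.lib.partition;

import org.apache.hadoop.mapreduce.Partitioner;

public class HashPartitioner<K, V> extends Partitioner<K, V> {
  // masks off the sign bit so the result is a valid partition index
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}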
OutputFormat
Used by the Reducer task in the TaskTracker
Used for writing output to HDFS from the
Reducer task
The default implementation for OutputFormat
is org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.
Is a factory for org.apache.hadoop.mapreduce.RecordWriter
RecordWriter
Used by the Reducer task in the TaskTracker
The default implementation for RecordWriter
is org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.LineRecordWriter
The TextOutputFormat.LineRecordWriter is
what outputs each line of KEY-VALUE pairs (terminated by a newline)
as a record from the Reducer (the RecordWriter declaration itself follows below)
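For reference, the RecordWriter contract itself is small; the following is its declaration, essentially as it appears in the hadoop-1.2.1 source (Javadoc comments trimmed):

package org.apache.hadoop.mapreduce;

import java.io.IOException;

public abstract class RecordWriter<K, V> {
  // writes one KEY-VALUE pair to the output
  public abstract void write(K key, V value)
      throws IOException, InterruptedException;

  // called once the task is done writing
  public abstract void close(TaskAttemptContext context)
      throws IOException, InterruptedException;
}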
The following Figure-2 illustrates the end-to-end picture of the
classes:
Figure-2
Note that the sort step is really not a
class and hence is depicted as a box with a dotted boundary.