Hadoop Quick Notes :: Part - 5
Bhaskar S | 01/01/2014
Introduction
In Part-4, we implemented and tested our first Hadoop MapReduce program called EnhancedWordCount.
Also, we demonstrated how to use the MRUnit framework to unit test our Hadoop MapReduce program.
Now we will continue our journey to implement a more advanced Hadoop MapReduce program with custom Writable classes.
Hands-on with Hadoop MapReduce 1.x - Part 3
Let us implement a Hadoop MapReduce program to analyze stock data from 2009 and 2010 and determine when each stock hit its lowest and highest closing prices.
We need some historical stock data for this, which we can get from the website Historical Data for S&P 500 Stocks.
Download the full historical data set and save it to a file called historical.txt.
Each line in this file is a comma-delimited record with the format: Ticker, Date, Open, High, Low, Close, Volume.
In the Mapper, we want each line to be parsed and the relevant fields encapsulated into a custom Writable object.
The following is the Java class, com.polarsparc.hadoop.hmr.StockMapOutputWritable, that encapsulates this Stock Ticker information and implements the Writable interface:
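A minimal sketch of such a Writable (the field names, accessors, and serialization order below are assumptions) would look along these lines:

package com.polarsparc.hadoop.hmr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class StockMapOutputWritable implements Writable {
    private String symbol = "";
    private String date = "";
    private double close;
    private long volume;

    // Getters and setters for each of the fields
    public String getSymbol() { return symbol; }
    public void setSymbol(String symbol) { this.symbol = symbol; }
    public String getDate() { return date; }
    public void setDate(String date) { this.date = date; }
    public double getClose() { return close; }
    public void setClose(double close) { this.close = close; }
    public long getVolume() { return volume; }
    public void setVolume(long volume) { this.volume = volume; }

    // Serialize the fields in a fixed order
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(symbol);
        out.writeUTF(date);
        out.writeDouble(close);
        out.writeLong(volume);
    }

    // Deserialize the fields in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        symbol = in.readUTF();
        date = in.readUTF();
        close = in.readDouble();
        volume = in.readLong();
    }

    @Override
    public String toString() {
        return symbol + "," + date + "," + close + "," + volume;
    }
}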
As can be inferred from Listing-1 above, the Stock Ticker class has fields for the symbol, the date of the close price, the close price, and the volume.
Next, it is time to implement our custom org.apache.hadoop.mapreduce.Mapper.
The following is the Java class called com.polarsparc.hadoop.hmr.StockAnalysisMapper that implements our Mapper:
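A minimal sketch of such a Mapper, assuming the 7-token record layout described earlier (the token positions used below are assumptions), would look along these lines:

package com.polarsparc.hadoop.hmr;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class StockAnalysisMapper
        extends Mapper<LongWritable, Text, Text, StockMapOutputWritable> {
    private final Text symbol = new Text();
    private final StockMapOutputWritable stock = new StockMapOutputWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Tokenize the input record using comma as the delimiter
        String[] tokens = value.toString().split(",");
        // Expect 7 tokens: Ticker, Date, Open, High, Low, Close, Volume
        if (tokens.length == 7) {
            try {
                stock.setSymbol(tokens[0].trim());
                stock.setDate(tokens[1].trim());
                stock.setClose(Double.parseDouble(tokens[5].trim()));
                stock.setVolume(Long.parseLong(tokens[6].trim()));
                symbol.set(tokens[0].trim());
                // Emit the ticker symbol as the key and the custom Writable as the value
                context.write(symbol, stock);
            } catch (NumberFormatException ex) {
                // Skip records (such as a header line) that do not parse cleanly
            }
        }
    }
}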
As can be inferred from Listing-2 above, we tokenize the input text using the comma delimiter, check that the token count is 7, create and populate an instance of StockMapOutputWritable, and finally write the symbol as the output key from the Mapper and our custom stock object as the output value.
In the Reducer, we want to iterate over the list of StockMapOutputWritable values and determine the low close and high close information for the stock symbol.
The following is the Java class, com.polarsparc.hadoop.hmr.StockReduceOutputWritable, that encapsulates this Stock Analysis information and implements the Writable interface:
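A minimal sketch of such a Writable (again, the field names and serialization order below are assumptions) would look along these lines:

package com.polarsparc.hadoop.hmr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class StockReduceOutputWritable implements Writable {
    private String symbol = "";
    private String lowDate = "";
    private double lowClose;
    private long lowVolume;
    private String highDate = "";
    private double highClose;
    private long highVolume;

    public void setSymbol(String symbol) { this.symbol = symbol; }

    // Capture the date, close price and volume for the low price point
    public void setLow(String date, double close, long volume) {
        this.lowDate = date;
        this.lowClose = close;
        this.lowVolume = volume;
    }

    // Capture the date, close price and volume for the high price point
    public void setHigh(String date, double close, long volume) {
        this.highDate = date;
        this.highClose = close;
        this.highVolume = volume;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(symbol);
        out.writeUTF(lowDate);
        out.writeDouble(lowClose);
        out.writeLong(lowVolume);
        out.writeUTF(highDate);
        out.writeDouble(highClose);
        out.writeLong(highVolume);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        symbol = in.readUTF();
        lowDate = in.readUTF();
        lowClose = in.readDouble();
        lowVolume = in.readLong();
        highDate = in.readUTF();
        highClose = in.readDouble();
        highVolume = in.readLong();
    }

    // TextOutputFormat calls toString() to write the value to the output file
    @Override
    public String toString() {
        return symbol + "," + lowDate + "," + lowClose + "," + lowVolume + ","
                + highDate + "," + highClose + "," + highVolume;
    }
}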
As can be inferred from Listing-3 above, the custom class encapsulates the symbol, along with the date, the close price, and the volume for the low price point, and similarly the date, the close price, and the volume for the high price point.
Next, it is time to implement our custom org.apache.hadoop.mapreduce.Reducer.
The following is the Java class called com.polarsparc.hadoop.hmr.StockAnalysisReducer that implements our Reducer:
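A minimal sketch of such a Reducer, building on the Writable sketches above, would look along these lines:

package com.polarsparc.hadoop.hmr;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class StockAnalysisReducer
        extends Reducer<Text, StockMapOutputWritable, Text, StockReduceOutputWritable> {
    @Override
    protected void reduce(Text key, Iterable<StockMapOutputWritable> values, Context context)
            throws IOException, InterruptedException {
        // Track the lowest and highest close prices seen so far for this symbol
        double lowClose = Double.MAX_VALUE;
        double highClose = -Double.MAX_VALUE;

        StockReduceOutputWritable result = new StockReduceOutputWritable();
        result.setSymbol(key.toString());

        // Scan all the records for the symbol, remembering the low and high price points
        for (StockMapOutputWritable stock : values) {
            if (stock.getClose() < lowClose) {
                lowClose = stock.getClose();
                result.setLow(stock.getDate(), stock.getClose(), stock.getVolume());
            }
            if (stock.getClose() > highClose) {
                highClose = stock.getClose();
                result.setHigh(stock.getDate(), stock.getClose(), stock.getVolume());
            }
        }

        // Emit the symbol as the key and the analysis summary as the value
        context.write(key, result);
    }
}

Note that the sketch copies the primitive field values out of each StockMapOutputWritable rather than holding a reference to it, since Hadoop reuses the same value object instance across iterations of the values iterable.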
As can be inferred from Listing-4 above, we create an instance of StockReduceOutputWritable. As we iterate over the list of StockMapOutputWritable values to determine the low and high price points, we populate the corresponding information in the instance of StockReduceOutputWritable. Finally, we write the symbol as the output key from the Reducer and our custom stock object as the output value.
The final step is to implement the main Hadoop MapReduce job.
The following is the main Java class called com.polarsparc.hadoop.hmr.StockAnalysis:
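A minimal sketch of such a driver class (the use of GenericOptionsParser to support options like -conf is an assumption) would look along these lines:

package com.polarsparc.hadoop.hmr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class StockAnalysis {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // GenericOptionsParser handles generic options such as -conf <file> and
        // returns the remaining (input and output path) arguments
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: StockAnalysis <input-path> <output-path>");
            System.exit(2);
        }

        Job job = new Job(conf, "Stock Analysis");
        job.setJarByClass(StockAnalysis.class);

        // Mapper and its output key/value classes
        job.setMapperClass(StockAnalysisMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(StockMapOutputWritable.class);

        // Reducer and its output key/value classes
        job.setReducerClass(StockAnalysisReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(StockReduceOutputWritable.class);

        // Input and output paths
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        // Submit the job and wait for it to finish
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}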
As can be inferred from Listing-5 above, in the main() function we create a Job object, specify the Mapper class (StockAnalysisMapper), the output key class (Text) and value class (StockMapOutputWritable) from the Mapper, the Reducer class (StockAnalysisReducer), and the output key class (Text) and value class (StockReduceOutputWritable) from the Reducer. Next, we specify the input and output paths. Finally, we invoke the waitForCompletion() method on the Job object to submit the Hadoop MapReduce job and wait for it to finish.
We will leverage MRUnit to test our StockAnalysis program.
The following is the Java class called com.polarsparc.hadoop.hmr.test.StockMapReduceUnitTests that demonstrates the use of the MRUnit unit testing framework:
In Listing-6 above, we demonstrate unit testing using assertions.
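A minimal sketch of such a test class, using MRUnit's MapDriver and ReduceDriver together with JUnit assertions (the sample input values below are hypothetical), would look along these lines:

package com.polarsparc.hadoop.hmr.test;

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import com.polarsparc.hadoop.hmr.StockAnalysisMapper;
import com.polarsparc.hadoop.hmr.StockAnalysisReducer;
import com.polarsparc.hadoop.hmr.StockMapOutputWritable;
import com.polarsparc.hadoop.hmr.StockReduceOutputWritable;

public class StockMapReduceUnitTests {
    private MapDriver<LongWritable, Text, Text, StockMapOutputWritable> mapDriver;
    private ReduceDriver<Text, StockMapOutputWritable, Text, StockReduceOutputWritable> reduceDriver;

    @Before
    public void setUp() {
        mapDriver = MapDriver.newMapDriver(new StockAnalysisMapper());
        reduceDriver = ReduceDriver.newReduceDriver(new StockAnalysisReducer());
    }

    @Test
    public void testMapper() throws Exception {
        // Hypothetical input record: Ticker, Date, Open, High, Low, Close, Volume
        mapDriver.withInput(new LongWritable(1),
                new Text("AAPL,20100618,271.58,275.00,270.00,274.07,28000"));
        List<Pair<Text, StockMapOutputWritable>> output = mapDriver.run();

        // Assert on the emitted key and the close price carried in the custom Writable
        Assert.assertEquals(1, output.size());
        Assert.assertEquals("AAPL", output.get(0).getFirst().toString());
        Assert.assertEquals(274.07, output.get(0).getSecond().getClose(), 0.001);
    }

    @Test
    public void testReducer() throws Exception {
        // Two hypothetical records for the same symbol with different close prices
        StockMapOutputWritable low = new StockMapOutputWritable();
        low.setSymbol("AAPL");
        low.setDate("20100702");
        low.setClose(246.94);
        low.setVolume(30000);

        StockMapOutputWritable high = new StockMapOutputWritable();
        high.setSymbol("AAPL");
        high.setDate("20100618");
        high.setClose(274.07);
        high.setVolume(28000);

        reduceDriver.withInput(new Text("AAPL"), Arrays.asList(low, high));
        List<Pair<Text, StockReduceOutputWritable>> output = reduceDriver.run();

        // Expect exactly one summary record per symbol
        Assert.assertEquals(1, output.size());
        Assert.assertEquals("AAPL", output.get(0).getFirst().toString());
    }
}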
Executing the above program from Listing-6 in Eclipse results in a successful execution, as shown in Figure-1 below:
From Figure-1 above, it is clear that we have successfully written and executed unit tests for our Stock Analysis Hadoop MapReduce program.
Next, we will perform an end-to-end integration test locally.
Package the classes of our StockAnalysis program into a jar file called hadoop-polarsparc-1.0.jar in the directory /tmp.
To perform the end-to-end integration test locally, create a smaller historical.txt file (we copied records up to the symbol AAPL, about 731 records) in the directory /tmp/hadoop/input, and issue the following command:
$HADOOP_PREFIX/bin/hadoop jar /tmp/hadoop-polarsparc-1.0.jar com.polarsparc.hadoop.hmr.StockAnalysis -conf $HADOOP_PREFIX/conf/hadoop-local.xml /tmp/hadoop/input /tmp/hadoop/output
The following will be the output:
14/01/01 21:30:26 INFO util.NativeCodeLoader: Loaded the native-hadoop library
14/01/01 21:30:26 INFO input.FileInputFormat: Total input paths to process : 1
14/01/01 21:30:26 WARN snappy.LoadSnappy: Snappy native library not loaded
14/01/01 21:30:27 INFO mapred.JobClient: Running job: job_local1821376773_0001
14/01/01 21:30:27 INFO mapred.LocalJobRunner: Waiting for map tasks
14/01/01 21:30:27 INFO mapred.LocalJobRunner: Starting task: attempt_local1821376773_0001_m_000000_0
14/01/01 21:30:27 INFO util.ProcessTree: setsid exited with exit code 0
14/01/01 21:30:27 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@146816b7
14/01/01 21:30:27 INFO mapred.MapTask: Processing split: file:/tmp/hadoop/input/historical.txt:0+31887
14/01/01 21:30:27 INFO mapred.MapTask: io.sort.mb = 100
14/01/01 21:30:27 INFO mapred.MapTask: data buffer = 79691776/99614720
14/01/01 21:30:27 INFO mapred.MapTask: record buffer = 262144/327680
14/01/01 21:30:27 INFO mapred.MapTask: Starting flush of map output
14/01/01 21:30:27 INFO mapred.MapTask: Finished spill 0
14/01/01 21:30:27 INFO mapred.Task: Task:attempt_local1821376773_0001_m_000000_0 is done. And is in the process of commiting
14/01/01 21:30:27 INFO mapred.LocalJobRunner:
14/01/01 21:30:27 INFO mapred.Task: Task 'attempt_local1821376773_0001_m_000000_0' done.
14/01/01 21:30:27 INFO mapred.LocalJobRunner: Finishing task: attempt_local1821376773_0001_m_000000_0
14/01/01 21:30:27 INFO mapred.LocalJobRunner: Map task executor complete.
14/01/01 21:30:27 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@6eb1b8de
14/01/01 21:30:27 INFO mapred.LocalJobRunner:
14/01/01 21:30:27 INFO mapred.Merger: Merging 1 sorted segments
14/01/01 21:30:27 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 26061 bytes
14/01/01 21:30:27 INFO mapred.LocalJobRunner:
14/01/01 21:30:27 INFO mapred.Task: Task:attempt_local1821376773_0001_r_000000_0 is done. And is in the process of commiting
14/01/01 21:30:27 INFO mapred.LocalJobRunner:
14/01/01 21:30:27 INFO mapred.Task: Task attempt_local1821376773_0001_r_000000_0 is allowed to commit now
14/01/01 21:30:27 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1821376773_0001_r_000000_0' to /tmp/hadoop/output
14/01/01 21:30:27 INFO mapred.LocalJobRunner: reduce > reduce
14/01/01 21:30:27 INFO mapred.Task: Task 'attempt_local1821376773_0001_r_000000_0' done.
14/01/01 21:30:28 INFO mapred.JobClient: map 100% reduce 100%
14/01/01 21:30:28 INFO mapred.JobClient: Job complete: job_local1821376773_0001
14/01/01 21:30:28 INFO mapred.JobClient: Counters: 20
14/01/01 21:30:28 INFO mapred.JobClient: File Output Format Counters
14/01/01 21:30:28 INFO mapred.JobClient: Bytes Written=161
14/01/01 21:30:28 INFO mapred.JobClient: File Input Format Counters
14/01/01 21:30:28 INFO mapred.JobClient: Bytes Read=31887
14/01/01 21:30:28 INFO mapred.JobClient: FileSystemCounters
14/01/01 21:30:28 INFO mapred.JobClient: FILE_BYTES_READ=122125
14/01/01 21:30:28 INFO mapred.JobClient: FILE_BYTES_WRITTEN=188467
14/01/01 21:30:28 INFO mapred.JobClient: Map-Reduce Framework
14/01/01 21:30:28 INFO mapred.JobClient: Reduce input groups=3
14/01/01 21:30:28 INFO mapred.JobClient: Map output materialized bytes=26065
14/01/01 21:30:28 INFO mapred.JobClient: Combine output records=0
14/01/01 21:30:28 INFO mapred.JobClient: Map input records=731
14/01/01 21:30:28 INFO mapred.JobClient: Reduce shuffle bytes=0
14/01/01 21:30:28 INFO mapred.JobClient: Physical memory (bytes) snapshot=0
14/01/01 21:30:28 INFO mapred.JobClient: Reduce output records=3
14/01/01 21:30:28 INFO mapred.JobClient: Spilled Records=1462
14/01/01 21:30:28 INFO mapred.JobClient: Map output bytes=24597
14/01/01 21:30:28 INFO mapred.JobClient: Total committed heap usage (bytes)=504365056
14/01/01 21:30:28 INFO mapred.JobClient: CPU time spent (ms)=0
14/01/01 21:30:28 INFO mapred.JobClient: Virtual memory (bytes) snapshot=0
14/01/01 21:30:28 INFO mapred.JobClient: SPLIT_RAW_BYTES=102
14/01/01 21:30:28 INFO mapred.JobClient: Map output records=731
14/01/01 21:30:28 INFO mapred.JobClient: Combine input records=0
14/01/01 21:30:28 INFO mapred.JobClient: Reduce input records=731
Let us check the output directory /tmp/hadoop/output by issuing the following command:
ls -l /tmp/hadoop/output
The following will be the output:
total 4
-rwxrwxrwx 1 bswamina bswamina 149 Jan 1 21:30 part-r-00000
-rwxrwxrwx 1 bswamina bswamina 0 Jan 1 21:30 _SUCCESS
Let us tail the contents of the file /tmp/hadoop/output/part-r-00000 by issuing the following command:
tail /tmp/hadoop/output/part-r-00000
The following will be the output:
A A,20090902,25.22,64614,20100429,37.23,23156
AA AA,20100702,10.0,154945,20100429,37.23,23156
AAPL AAPL,20100702,10.0,154945,20100618,274.074,280221
We have successfully implemented and tested our Stock Analysis Hadoop MapReduce program locally.
Time to deploy and test in our 3-node Hadoop cluster.
First, copy the file historical.txt into HDFS under /data/stocks.
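Assuming historical.txt is available locally under /tmp (the local path is an assumption), commands along these lines can be used:

hadoop fs -mkdir /data/stocks
hadoop fs -put /tmp/historical.txt /data/stocks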
Now let us execute our Stock Analysis Hadoop MapReduce job by issuing the following command:
$HADOOP_PREFIX/bin/hadoop jar /tmp/hadoop-polarsparc-1.0.jar com.polarsparc.hadoop.hmr.StockAnalysis /data/stocks /data/output
The following will be the output:
14/01/01 21:51:11 INFO input.FileInputFormat: Total input paths to process : 1
14/01/01 21:51:11 INFO util.NativeCodeLoader: Loaded the native-hadoop library
14/01/01 21:51:11 WARN snappy.LoadSnappy: Snappy native library not loaded
14/01/01 21:51:11 INFO mapred.JobClient: Running job: job_201401012020_0001
14/01/01 21:51:12 INFO mapred.JobClient: map 0% reduce 0%
14/01/01 21:51:21 INFO mapred.JobClient: map 100% reduce 0%
14/01/01 21:51:29 INFO mapred.JobClient: map 100% reduce 33%
14/01/01 21:51:31 INFO mapred.JobClient: map 100% reduce 100%
14/01/01 21:51:33 INFO mapred.JobClient: Job complete: job_201401012020_0001
14/01/01 21:51:33 INFO mapred.JobClient: Counters: 29
14/01/01 21:51:33 INFO mapred.JobClient: Job Counters
14/01/01 21:51:33 INFO mapred.JobClient: Launched reduce tasks=1
14/01/01 21:51:33 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=9408
14/01/01 21:51:33 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
14/01/01 21:51:33 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
14/01/01 21:51:33 INFO mapred.JobClient: Launched map tasks=1
14/01/01 21:51:33 INFO mapred.JobClient: Data-local map tasks=1
14/01/01 21:51:33 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=10273
14/01/01 21:51:33 INFO mapred.JobClient: File Output Format Counters
14/01/01 21:51:33 INFO mapred.JobClient: Bytes Written=26854
14/01/01 21:51:33 INFO mapred.JobClient: FileSystemCounters
14/01/01 21:51:33 INFO mapred.JobClient: FILE_BYTES_READ=4539170
14/01/01 21:51:33 INFO mapred.JobClient: HDFS_BYTES_READ=5326654
14/01/01 21:51:33 INFO mapred.JobClient: FILE_BYTES_WRITTEN=9193733
14/01/01 21:51:33 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=26854
14/01/01 21:51:33 INFO mapred.JobClient: File Input Format Counters
14/01/01 21:51:33 INFO mapred.JobClient: Bytes Read=5326536
14/01/01 21:51:33 INFO mapred.JobClient: Map-Reduce Framework
14/01/01 21:51:33 INFO mapred.JobClient: Map output materialized bytes=4539170
14/01/01 21:51:33 INFO mapred.JobClient: Map input records=122574
14/01/01 21:51:33 INFO mapred.JobClient: Reduce shuffle bytes=4539170
14/01/01 21:51:33 INFO mapred.JobClient: Spilled Records=245148
14/01/01 21:51:33 INFO mapred.JobClient: Map output bytes=4294016
14/01/01 21:51:33 INFO mapred.JobClient: Total committed heap usage (bytes)=177016832
14/01/01 21:51:33 INFO mapred.JobClient: CPU time spent (ms)=1810
14/01/01 21:51:33 INFO mapred.JobClient: Combine input records=0
14/01/01 21:51:33 INFO mapred.JobClient: SPLIT_RAW_BYTES=118
14/01/01 21:51:33 INFO mapred.JobClient: Reduce input records=122574
14/01/01 21:51:33 INFO mapred.JobClient: Reduce input groups=524
14/01/01 21:51:33 INFO mapred.JobClient: Combine output records=0
14/01/01 21:51:33 INFO mapred.JobClient: Physical memory (bytes) snapshot=230092800
14/01/01 21:51:33 INFO mapred.JobClient: Reduce output records=524
14/01/01 21:51:33 INFO mapred.JobClient: Virtual memory (bytes) snapshot=771518464
14/01/01 21:51:33 INFO mapred.JobClient: Map output records=122574
Let us list all the file(s) under the /data/output directory of HDFS by issuing the following command:
hadoop fs -ls /data/output
The following will be the output:
Found 3 items
-rw-r--r-- 2 hadoop supergroup 0 2014-01-01 21:51 /data/output/_SUCCESS
drwxr-xr-x - hadoop supergroup 0 2014-01-01 21:51 /data/output/_logs
-rw-r--r-- 2 hadoop supergroup 26854 2014-01-01 21:51 /data/output/part-r-00000
From Output-5 above, we see a file named _SUCCESS. The presence of this file indicates that the Stock Analysis Hadoop MapReduce program completed successfully.
The stock analysis results will be in the file named part-r-00000.
We are not going to display the entire contents of this file; instead, we will peek at its tail. Issue the following command to tail the contents of the file /data/output/part-r-00000 in HDFS:
hadoop fs -tail /data/output/part-r-00000
The following will be the output:
WMT WMT,20090825,1.35,266104,20100104,626.75,19579
WPI WPI,20090825,1.35,266104,20100104,626.75,19579
WPO WPO,20090825,1.35,266104,20100104,626.75,19579
WU WU,20090825,1.35,266104,20100104,626.75,19579
WY WY,20090825,1.35,266104,20100104,626.75,19579
WYE WYE,20090825,1.35,266104,20100104,626.75,19579
WYN WYN,20090825,1.35,266104,20100104,626.75,19579
WYNN WYNN,20090825,1.35,266104,20100104,626.75,19579
X X,20090825,1.35,266104,20100104,626.75,19579
XEL XEL,20090825,1.35,266104,20100104,626.75,19579
XL XL,20090825,1.35,266104,20100104,626.75,19579
XLNX XLNX,20090825,1.35,266104,20100104,626.75,19579
XOM XOM,20090825,1.35,266104,20100104,626.75,19579
XRAY XRAY,20090825,1.35,266104,20100104,626.75,19579
XRX XRX,20090825,1.35,266104,20100104,626.75,19579
XTO XTO,20090825,1.35,266104,20100104,626.75,19579
YHOO YHOO,20090825,1.35,266104,20100104,626.75,19579
YUM YUM,20090825,1.35,266104,20100104,626.75,19579
ZION ZION,20090825,1.35,266104,20100104,626.75,19579
ZMH ZMH,20090825,1.35,266104,20100104,626.75,19579
References