Apache Spark Quick Notes :: Part - 3
Bhaskar S | 10/24/2015 |
Hands-on with Spark Core
We will continue our session from where we left off in Part-2.
At the Python Spark shell prompt >>>, type the following command and press ENTER:
count_one = lines.flatMap(lambda a: a.split(' ')).map(lambda b: (b.lower().strip(",()."), 1)).filter(lambda c: len(c[0]) > 4)
We will be back to the prompt >>>, an indication that all was ok.
The flatMap() function creates a new RDD of words from the lines RDD, which in turn is transformed to a new RDD containing a collection of tuples (first element is the word and the second element is a count of 1 for the word) using the map() function. This new RDD containing tuples is further transformed using the filter() function to create yet another RDD with only those tuples that contain words greater than 4 characters.
In our example above, we create a new RDD called count_one to contain a collection of tuples whose first element is the word of size greater than 4 and the second element is a count of 1 for the word. What we have done here is create a key-value pair of the word and a count of 1.
At the Python Spark shell prompt >>>, type the following command and press ENTER:
count_all = count_one.reduceByKey(lambda a, b: a + b).filter(lambda c: c[1] > 1).sortByKey()
We will be back to the prompt >>>, an indication that all was ok.
The reduceByKey() function on an RDD containing tuples of the form (KEY, VALUE) is a transformation that applies the specified lambda function to VALUEs with the same KEY. The idea is to combine (or aggregate) the VALUEs for a given KEY (in this case a sum). It returns a new RDD containing a collection of tuples of the form (KEY, COUNT). This new RDD is transformed further by eliminating all the KEYs with COUNT of 1.
Finally, the sortByKey() function on an RDD containing tuples of the form (KEY, VALUE) is a transformation that sorts the collection by KEY.
In our example above, we create a new RDD called count_all by summing-up all the tuples of the form (WORD, 1) from the count_one RDD into tuples of the form (WORD, COUNT) where COUNT > 1 and sorting the tuples by the WORD.
At the Python Spark shell prompt >>>, type the following command and press ENTER:
count_all.foreach(lambda x: print(x[0], x[1]))
The following will be the typical output:
15/10/24 20:42:24 INFO SparkContext: Starting job: foreach at <stdin>:1 15/10/24 20:42:24 INFO DAGScheduler: Registering RDD 3 (reduceByKey at <stdin>:1) 15/10/24 20:42:24 INFO DAGScheduler: Got job 0 (foreach at <stdin>:1) with 1 output partitions (allowLocal=false) 15/10/24 20:42:24 INFO DAGScheduler: Final stage: ResultStage 1(foreach at <stdin>:1) 15/10/24 20:42:24 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0) 15/10/24 20:42:24 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0) 15/10/24 20:42:24 INFO DAGScheduler: Submitting ShuffleMapStage 0 (PairwiseRDD[3] at reduceByKey at <stdin>:1), which has no missing parents 15/10/24 20:42:24 INFO MemoryStore: ensureFreeSpace(9064) called with curMem=89387, maxMem=278019440 15/10/24 20:42:24 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 8.9 KB, free 265.0 MB) 15/10/24 20:42:24 INFO MemoryStore: ensureFreeSpace(5796) called with curMem=98451, maxMem=278019440 15/10/24 20:42:24 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 5.7 KB, free 265.0 MB) 15/10/24 20:42:24 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:37375 (size: 5.7 KB, free: 265.1 MB) 15/10/24 20:42:24 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:874 15/10/24 20:42:24 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (PairwiseRDD[3] at reduceByKey at <stdin>:1) 15/10/24 20:42:24 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks 15/10/24 20:42:24 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1419 bytes) 15/10/24 20:42:24 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) 15/10/24 20:42:25 INFO HadoopRDD: Input split: file:/home/spark/spark-1.4.1/text/Spark.txt:0+1369 15/10/24 20:42:25 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id 15/10/24 20:42:25 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id 15/10/24 20:42:25 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap 15/10/24 20:42:25 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition 15/10/24 20:42:25 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id 15/10/24 20:42:25 INFO PythonRDD: Times: total = 1011, boot = 928, init = 79, finish = 4 15/10/24 20:42:26 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2063 bytes result sent to driver 15/10/24 20:42:26 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1207 ms on localhost (1/1) 15/10/24 20:42:26 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 15/10/24 20:42:26 INFO DAGScheduler: ShuffleMapStage 0 (reduceByKey at <stdin>:1) finished in 1.227 s 15/10/24 20:42:26 INFO DAGScheduler: looking for newly runnable stages 15/10/24 20:42:26 INFO DAGScheduler: running: Set() 15/10/24 20:42:26 INFO DAGScheduler: waiting: Set(ResultStage 1) 15/10/24 20:42:26 INFO DAGScheduler: failed: Set() 15/10/24 20:42:26 INFO DAGScheduler: Missing parents for ResultStage 1: List() 15/10/24 20:42:26 INFO DAGScheduler: Submitting ResultStage 1 (PythonRDD[6] at foreach at <stdin>:1), which is now runnable 15/10/24 20:42:26 INFO MemoryStore: ensureFreeSpace(6968) called with curMem=104247, maxMem=278019440 15/10/24 20:42:26 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 6.8 KB, free 265.0 MB) 15/10/24 20:42:26 INFO MemoryStore: ensureFreeSpace(4488) called with curMem=111215, maxMem=278019440 15/10/24 20:42:26 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 4.4 KB, free 265.0 MB) 15/10/24 20:42:26 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:37375 (size: 4.4 KB, free: 265.1 MB) 15/10/24 20:42:26 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:874 15/10/24 20:42:26 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (PythonRDD[6] at foreach at <stdin>:1) 15/10/24 20:42:26 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks 15/10/24 20:42:26 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 1165 bytes) 15/10/24 20:42:26 INFO Executor: Running task 0.0 in stage 1.0 (TID 1) 15/10/24 20:42:26 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks 15/10/24 20:42:26 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 2 ms active 2 apache 4 cluster 4 distributed 4 foundation 2 hadoop 2 local 2 machine 2 software 2 source 2 spark 9 storage 3 supports 2 system 3 where 2 15/10/24 20:42:26 INFO PythonRDD: Times: total = 41, boot = -114, init = 154, finish = 1 15/10/24 20:42:26 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 964 bytes result sent to driver 15/10/24 20:42:26 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 60 ms on localhost (1/1) 15/10/24 20:42:26 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 15/10/24 20:42:26 INFO DAGScheduler: ResultStage 1 (foreach at <stdin>:1) finished in 0.060 s 15/10/24 20:42:26 INFO DAGScheduler: Job 0 finished: foreach at <stdin>:1, took 1.440699 s >>>
As can be seen from Output.1, we display a sorted list of words along with their respective counts by executing the foreach() function on the count_all RDD.
At the Python Spark shell prompt >>>, type the following command and press ENTER:
func = (lambda a, b: a + b)
count_all_1 = count_one.aggregateByKey(0, func, func).filter(lambda c: c[1] > 1).sortByKey()
We will be back to the prompt >>>, an indication that all was ok.
The aggregateByKey() function takes 3 parameters - an initial value (zero in our case as we are counting the number of word occurrences), a lambda function to merge (or aggregate) VALUEs within a partition for a given KEY, and a lambda function to merge (or aggregate) VALUEs across partitions for a given KEY. We use the same function func to aggregate VALUEs within and across pertition(s).
The aggregateByKey() function on an RDD containing tuples of the form (KEY, VALUE) is a transformation that aggregates the VALUEs for a given KEY (in this case a sum). It returns a new RDD containing a collection of tuples of the form (KEY, COUNT). This new RDD is transformed further by eliminating all the KEYs with COUNT of 1.
Using the aggregateByKey() function is an alternate to using the reduceByKey() function.
In our example above, we create a new RDD called count_all_1 by summing-up all the tuples of the form (WORD, 1) from the count_one RDD into tuples of the form (WORD, COUNT) where COUNT > 1 and sorting the tuples by the WORD.
At the Python Spark shell prompt >>>, type the following command and press ENTER:
count_all_1.foreach(lambda x: print(x[0], x[1]))
The following will be the typical output:
15/10/24 20:51:23 INFO SparkContext: Starting job: foreach at <stdin>:1 15/10/24 20:51:23 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 143 bytes 15/10/24 20:51:23 INFO DAGScheduler: Got job 1 (foreach at <stdin>:1) with 1 output partitions (allowLocal=false) 15/10/24 20:51:23 INFO DAGScheduler: Final stage: ResultStage 3(foreach at <stdin>:1) 15/10/24 20:51:23 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 2) 15/10/24 20:51:23 INFO DAGScheduler: Missing parents: List() 15/10/24 20:51:23 INFO DAGScheduler: Submitting ResultStage 3 (PythonRDD[11] at foreach at <stdin>:1), which has no missing parents 15/10/24 20:51:23 INFO MemoryStore: ensureFreeSpace(8144) called with curMem=115595, maxMem=278019440 15/10/24 20:51:23 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 8.0 KB, free 265.0 MB) 15/10/24 20:51:23 INFO MemoryStore: ensureFreeSpace(5255) called with curMem=123739, maxMem=278019440 15/10/24 20:51:23 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 5.1 KB, free 265.0 MB) 15/10/24 20:51:23 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:49913 (size: 5.1 KB, free: 265.1 MB) 15/10/24 20:51:23 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:874 15/10/24 20:51:23 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 3 (PythonRDD[11] at foreach at <stdin>:1) 15/10/24 20:51:23 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks 15/10/24 20:51:23 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 2, localhost, PROCESS_LOCAL, 1165 bytes) 15/10/24 20:51:23 INFO Executor: Running task 0.0 in stage 3.0 (TID 2) 15/10/24 20:51:23 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks 15/10/24 20:51:23 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms active 2 apache 4 cluster 4 distributed 4 foundation 2 hadoop 2 local 2 machine 2 software 2 source 2 spark 9 storage 3 supports 2 system 3 where 2 15/10/24 20:51:23 INFO PythonRDD: Times: total = 41, boot = -49035, init = 49075, finish = 1 15/10/24 20:51:23 INFO Executor: Finished task 0.0 in stage 3.0 (TID 2). 964 bytes result sent to driver 15/10/24 20:51:23 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 2) in 52 ms on localhost (1/1) 15/10/24 20:51:23 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool 15/10/24 20:51:23 INFO DAGScheduler: ResultStage 3 (foreach at <stdin>:1) finished in 0.052 s 15/10/24 20:51:23 INFO DAGScheduler: Job 1 finished: foreach at <stdin>:1, took 0.069153 s >>>
As can be seen from Output.2, we display a sorted list of words along with their respective counts by executing the foreach() function on the count_all RDD.
Let us create another simple text file called Quantity.txt under the directory /home/spark/spark-1.4.1/text with the following contents:
Product-A, 2 Product-B, 1 Product-C, 4 Product-A, 4 Product-B, 3 Product-C, 2 Product-A, 4 Product-B, 2 Product-C, 6 Product-B, 3 Product-C, 2 Product-A, 2 Product-C, 8
This is a simple text file containing the quantities of 3 abstract products sold in different regions.
At the Python Spark shell prompt >>>, type the following command and press ENTER:
products = sc.textFile('./text/Quantity.txt')
The following will be the typical output:
15/10/25 14:17:27 INFO MemoryStore: ensureFreeSpace(44592) called with curMem=89387, maxMem=278019440 15/10/25 14:17:27 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 43.5 KB, free 265.0 MB) 15/10/25 14:17:27 INFO MemoryStore: ensureFreeSpace(10629) called with curMem=133979, maxMem=278019440 15/10/25 14:17:27 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 10.4 KB, free 265.0 MB) 15/10/25 14:17:27 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:60441 (size: 10.4 KB, free: 265.1 MB) 15/10/25 14:17:27 INFO SparkContext: Created broadcast 3 from textFile at NativeMethodAccessorImpl.java:-2 >>>
In our example above, products is an RDD with the contents of the file text/Quantity.txt.
At the Python Spark shell prompt >>>, type the following command and press ENTER:
quantity = products.map(lambda a: tuple(a.replace(" ", "").split(','))).map(lambda b: (b[0], int(b[1])))
We will be back to the prompt >>>, an indication that all was ok.
In our example above, we create a new RDD called quantity by converting each line from the products RDD into a tuple, where the first element is the product name and second element is the quantity sold.
At the Python Spark shell prompt >>>, type the following command and press ENTER:
init_fn = (lambda a: (a, 1))
merge_fn = (lambda a, b: (a[0] + b, a[1] + 1))
combine_fn = (lambda a, b: (a[0] + b[0], a[1] + b[1]))
totals = quantity.combineByKey(init_fn, merge_fn, combine_fun)
We will be back to the prompt >>>, an indication that all was ok.
The combineByKey() function works on a collection of (KEY, VALUE) pairs and takes 3 function parameters - an initialization function, a merge function, and a combine function.
The initialization function is invoked only once for each KEY that is encountered the first time. The input argument for the initialization function is the VALUE of a given (KEY, VALUE) pair. In our example, the initialization function is init_fn and is used to create a tuple of the form (VALUE_SUM, COUNT_SUM) and is equal to (VALUE, 1) when we see the KEY the first time.
The merge function is invoked for each KEY that is encountered every time in the future after the first occurrence. The input arguments for the merge function is the current tuple (VALUE_SUM, COUNT_SUM) for a given KEY and the new VALUE just encountered. In our example, the merge function is merge_fn and is used to update (accumulate) the tuple (VALUE_SUM, COUNT_SUM) for the given KEY as we encounter new VALUEs for the same KEY. In other words, give a (KEY, VALUE) pair, we update VALUE_SUM = VALUE_SUM + VALUE and increment COUNT_SUM = COUNT_SUM + 1.
The combile function is used to combile (accumulate) the tuples (VALUE_SUM, COUNT_SUM) for the same KEY from across the partitions. In our example, the combile function is combile_fn and is used to combile (accumulate) the tuples (VALUE_SUM, COUNT_SUM) for the same KEY from across the partitions (in our example we just have one partition).
In our example above, we create a new RDD called totals that contains the key-value pairs in the form (KEY, (VALUE_SUM, COUNT_SUM)), where VALUE_SUM is the sum of all the VALUEs for a give KEY and COUNT_SUM is the number of times a KEY occurs.
At the Python Spark shell prompt >>>, type the following command and press ENTER:
averages = totals.map(lambda a: (a[0], a[1][0]//a[1][1]))
We will be back to the prompt >>>, an indication that all was ok.
In our example above, we create a new RDD called averages that contains average quantities for each product in the form (KEY, AVERAGE_VALUE). The AVERAGE_VALUE is computed using (VALUE_SUM / COUNT_SUM) for a given KEY.
At the Python Spark shell prompt >>>, type the following command and press ENTER:
averages.take(3)
The following will be the typical output:
15/10/25 14:09:38 INFO SparkContext: Starting job: runJob at PythonRDD.scala:366 15/10/25 14:09:38 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 2 is 143 bytes 15/10/25 14:09:38 INFO DAGScheduler: Got job 8 (runJob at PythonRDD.scala:366) with 1 output partitions (allowLocal=true) 15/10/25 14:09:38 INFO DAGScheduler: Final stage: ResultStage 14(runJob at PythonRDD.scala:366) 15/10/25 14:09:38 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 13) 15/10/25 14:09:38 INFO DAGScheduler: Missing parents: List() 15/10/25 14:09:38 INFO DAGScheduler: Submitting ResultStage 14 (PythonRDD[28] at RDD at PythonRDD.scala:43), which has no missing parents 15/10/25 14:09:38 INFO MemoryStore: ensureFreeSpace(5952) called with curMem=432675, maxMem=278019440 15/10/25 14:09:38 INFO MemoryStore: Block broadcast_15 stored as values in memory (estimated size 5.8 KB, free 264.7 MB) 15/10/25 14:09:38 INFO MemoryStore: ensureFreeSpace(3819) called with curMem=438627, maxMem=278019440 15/10/25 14:09:38 INFO MemoryStore: Block broadcast_15_piece0 stored as bytes in memory (estimated size 3.7 KB, free 264.7 MB) 15/10/25 14:09:38 INFO BlockManagerInfo: Added broadcast_15_piece0 in memory on localhost:60441 (size: 3.7 KB, free: 265.1 MB) 15/10/25 14:09:38 INFO SparkContext: Created broadcast 15 from broadcast at DAGScheduler.scala:874 15/10/25 14:09:38 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 14 (PythonRDD[28] at RDD at PythonRDD.scala:43) 15/10/25 14:09:38 INFO TaskSchedulerImpl: Adding task set 14.0 with 1 tasks 15/10/25 14:09:38 INFO TaskSetManager: Starting task 0.0 in stage 14.0 (TID 11, localhost, PROCESS_LOCAL, 1165 bytes) 15/10/25 14:09:38 INFO Executor: Running task 0.0 in stage 14.0 (TID 11) 15/10/25 14:09:38 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks 15/10/25 14:09:38 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms 15/10/25 14:09:38 INFO PythonRDD: Times: total = 7, boot = 3, init = 3, finish = 1 15/10/25 14:09:38 INFO Executor: Finished task 0.0 in stage 14.0 (TID 11). 1043 bytes result sent to driver 15/10/25 14:09:38 INFO TaskSetManager: Finished task 0.0 in stage 14.0 (TID 11) in 12 ms on localhost (1/1) 15/10/25 14:09:38 INFO TaskSchedulerImpl: Removed TaskSet 14.0, whose tasks have all completed, from pool 15/10/25 14:09:38 INFO DAGScheduler: ResultStage 14 (runJob at PythonRDD.scala:366) finished in 0.012 s 15/10/25 14:09:38 INFO DAGScheduler: Job 8 finished: runJob at PythonRDD.scala:366, took 0.018887 s [('Product-A', 3), ('Product-C', 4), ('Product-B', 2)] >>>
As can be seen from Output.4, we get the 3 products and their average quantities as a list by executing the take(3) function on the averages RDD.
To wrap up this part, let us summarize all the transformation functions we used thus far in this part.
The following is the summary of the RDD transformation functions we used in this part:
Tranformation Function | Description |
---|---|
reduceByKey | Works on a collection of (KEY, VALUE) pairs and applies the specified lambda function to VALUEs with the same KEY |
sortByKey | Works on a collection of (KEY, VALUE) pairs and sorts them by the KEY |
aggregateByKey | Works on a collection of (KEY, VALUE) pairs and aggregates the VALUEs for each KEY, using the given initial value, the merge function and combine function |
combineByKey | Works on a collection of (KEY, VALUE) pairs and is a generic function that combines the VALUEs for each KEY, using the given initial function, the merge function and combine function |
References