# Install PySpark, fetch the data file used in this lab, and start a SparkContext
!pip install pyspark
import requests
from pyspark.context import SparkContext

r = requests.get('https://www.cse.ust.hk/msbd5003/data/fruits.txt')
open('fruits.txt', 'wb').write(r.content)

sc = SparkContext.getOrCreate()
Question 1¶
The following piece of code computes the frequencies of the words in a text file:
from operator import add

lines = sc.textFile('README.md')
counts = lines.flatMap(lambda x: x.split()) \
              .map(lambda x: (x, 1)) \
              .reduceByKey(add)
Add one line to find the most frequent word. Output this word and its frequency.
Hint: Use sortBy(), reduce(), or max()
from operator import add

lines = sc.textFile('sample_data/README.md')
counts = lines.flatMap(lambda x: x.split()) \
              .map(lambda x: (x, 1)) \
              .reduceByKey(add) \
              .max(key=lambda x: x[1])   # added line: take the (word, count) pair with the largest count
print(counts)
('is', 4)
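The hint also allows sortBy() or reduce(). A sketch of the sortBy() variant (same file as above; note that it sorts the whole RDD, so max() is the cheaper single-pass choice):

top = lines.flatMap(lambda x: x.split()) \
           .map(lambda x: (x, 1)) \
           .reduceByKey(add) \
           .sortBy(lambda x: x[1], ascending=False) \
           .first()
print(top)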
Question 2¶
Modify the word count example above, so that we only count the frequencies of those words consisting of 5 or more characters.
from operator import add

lines = sc.textFile('sample_data/README.md')
counts = lines.flatMap(lambda x: x.split()) \
              .map(lambda x: (x, 1)) \
              .reduceByKey(add) \
              .filter(lambda x: len(x[0]) >= 5)   # keep only words with 5 or more characters
print(counts.take(10))
[('directory', 1), ('datasets', 1), ('`california_housing_data*.csv`', 1), ('housing', 1), ('https://developers.google.com/machine-learning/crash-course/california-housing-data-description', 1), ('`mnist_*.csv`', 1), ("[Anscombe's", 1), ('originally', 1), ('Anscombe,', 1), ("'Graphs", 1)]
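The same length check can also be applied to the words before the shuffle; a small variant (same file and imports as above) that drops short words before reduceByKey has to move any data:

counts = lines.flatMap(lambda x: x.split()) \
              .filter(lambda w: len(w) >= 5) \
              .map(lambda w: (w, 1)) \
              .reduceByKey(add)
print(counts.take(10))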
Question 3¶
Consider the following piece of code:
A = sc.parallelize(range(1, 100))
t = 50
B = A.filter(lambda x: x < t)
print(B.count())
t = 10
C = B.filter(lambda x: x > t)
print(C.count())
What's its output? (Yes, you can just run it.)
A = sc.parallelize(range(1, 100))
t = 50
B = A.filter(lambda x: x < t)   # transformation only -- nothing is computed yet
print(B.count())                # 49: the filter runs now, with t = 50
t = 10
C = B.filter(lambda x: x > t)
print(C.count())                # 0: B is recomputed with t = 10, so B is 1..9 and no element exceeds 10
49
0
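The 0 comes from lazy evaluation combined with Python's late-binding closures: B is recomputed from scratch for every action, so when C.count() runs, B's filter re-reads the current t = 10, leaving only 1..9 in B and nothing above 10 for C. The late-binding part can be seen without Spark:

t = 50
below = lambda x: x < t
t = 10
print(below(20))   # False -- the lambda reads the current value of t, not the value at definition time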
Question 4¶
The intent of the code above is to get all numbers below 50 from A and put them into B, and then get all numbers above 10 from B and put them into C. Fix the code so that it produces the desired behavior by adding one line of code. You are not allowed to change the existing code.
A = sc.parallelize(range(1, 100))
t = 50
B = A.filter(lambda x: x < t)
B.cache()                       # the added line: cache B so that B.count() materializes 1..49,
                                # and later actions reuse it instead of re-running the filter with t = 10
print(B.count())
t = 10
C = B.filter(lambda x: x > t)
print(C.count())
49
39
Question 5¶
Modify the PMI example by sending a_dict and n_dict inside the closure. Do not use broadcast variables.
Change the broadcast variables

n_dict = sc.broadcast(n_freqs.collectAsMap())
a_dict = sc.broadcast(a_freqs.collectAsMap())

to plain Python dictionaries

n_dict = n_freqs.collectAsMap()
a_dict = a_freqs.collectAsMap()

and replace the n_dict.value / a_dict.value lookups in the PMI code with n_dict / a_dict. The dictionaries are then captured in the task closures and serialized with every task, instead of being shipped once per executor as broadcast variables.
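A minimal sketch of the two styles, using a made-up freqs dictionary in place of the real n_dict/a_dict (not the actual PMI code):

freqs = {'apple': 3, 'pear': 1}                 # hypothetical stand-in for n_dict / a_dict
words = sc.parallelize(['apple', 'pear', 'apple'])

# Broadcast version: one read-only copy per executor, accessed through .value
bc = sc.broadcast(freqs)
print(words.map(lambda w: bc.value.get(w, 0)).collect())   # [3, 1, 3]

# Closure version: the plain dict is captured by the lambda and shipped with every task
print(words.map(lambda w: freqs.get(w, 0)).collect())      # [3, 1, 3]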
Question 6¶
The following code creates an RDD with 4 partitions: partition 0, 1, 2, and 3.
A = sc.parallelize(range(100), 4)
For each item in the RDD, add its partition number to it, and write the results to another RDD, i.e., the resulting RDD should contain:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102]
def f(splitIndex, iterator):
    # add the partition index to every element of that partition
    for i in iterator:
        yield i + splitIndex

A = sc.parallelize(range(100), 4)
A.mapPartitionsWithIndex(f).collect()
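To check where the partition boundaries fall (and why 25, 51, and 77 are missing from the result), glom() collects each partition as a list; assuming the default even split of parallelize, this gives:

print(A.glom().map(len).collect())   # [25, 25, 25, 25] -- partitions hold 0-24, 25-49, 50-74, 75-99
print(A.mapPartitionsWithIndex(f).glom().map(lambda p: (p[0], p[-1])).collect())
# [(0, 24), (26, 50), (52, 76), (78, 102)]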