Skip to content
!pip install pyspark
Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
     |████████████████████████████████| 204.2MB 65kB/s 
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
     |████████████████████████████████| 204kB 37.0MB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=edc89a8f53022da82822d1b1bfb5ff85b57d72c5c4f9530d935fa927d268c34f
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
# Reuse the running SparkContext when there is one (so this cell can be
# re-executed), then build a SparkSession on top of that same context.
sc = SparkContext.getOrCreate()
spark = SparkSession(sparkContext=sc)

Question 1

Load the adjective–noun pairs file (adj_noun_pairs.txt) into Spark and use divide-and-conquer to find the first (adj, noun) pair in which the noun is 'unification'. Print the corresponding adjective. The skeleton code is provided below. One solution is to use filter() to find all pairs where the noun is 'unification' and then report the first one, but this is inefficient because it examines every pair in every partition. A better idea is to find, in parallel, the first such pair in each partition (if one exists), stopping each partition's scan at its first match, and then find the first partition that returns such a pair.

numPartitions = 10
# Location of the adjective-noun pairs file. It must be bound before
# textFile() below reads it; otherwise this cell raises NameError.
path_to_file = "/content/drive/My Drive/courses/HKUST/MSBD5003/data/adj_noun_pairs.txt"

lines = sc.textFile(path_to_file, numPartitions)
# Split each line into words; keep only well-formed (adjective, noun) pairs.
pairs = lines.map(lambda line: tuple(line.split())).filter(lambda p: len(p) == 2)
pairs.cache()

# FILL IN YOUR CODE HERE
numPartitions = 10

def find_word(iterator):
    """Yield the first (adj, noun) pair in ``iterator`` whose noun is 'unification'.

    Meant to be passed to ``RDD.mapPartitions``: it receives one partition's
    pairs and yields at most one pair. The scan of that partition stops as
    soon as a match is found, and a partition without a match yields nothing.
    """
    first_hit = next(
        ((adj, noun) for adj, noun in iterator if noun == "unification"),
        None,
    )
    if first_hit is not None:
        yield first_hit

path_to_file = "/content/drive/My Drive/courses/HKUST/MSBD5003/data/adj_noun_pairs.txt"
lines = sc.textFile(path_to_file, numPartitions)
# Split each line into words; keep only well-formed (adjective, noun) pairs.
pairs = lines.map(lambda line: tuple(line.split())).filter(lambda p: len(p) == 2)
pairs.cache()

# Divide: every partition reports only its own first match (at most one pair).
# Conquer: collect() returns the per-partition results in partition order,
# so the first element is the first matching pair in the whole file.
words = pairs.mapPartitions(find_word).collect()
if words:
    print(words[0][0])
else:
    # No partition matched; indexing words[0] here would raise IndexError.
    print("No (adj, noun) pair with noun 'unification' was found.")
several

Answer

numPartitions = 10

def find_word(iterator):
    """Generate at most one pair: the first (adj, noun) whose noun is 'unification'.

    Designed for ``RDD.mapPartitions``. Each partition's iterator is scanned
    only up to its first match; partitions with no match produce nothing.
    """
    for pair in iterator:
        adj, noun = pair
        if noun != "unification":
            continue
        yield (adj, noun)
        return

path_to_file = "/content/drive/My Drive/courses/HKUST/MSBD5003/data/adj_noun_pairs.txt"
lines = sc.textFile(path_to_file, numPartitions)
# Split each line into words; keep only well-formed (adjective, noun) pairs.
pairs = lines.map(lambda line: tuple(line.split())).filter(lambda p: len(p) == 2)
pairs.cache()

# Divide: every partition reports only its own first match (at most one pair).
# Conquer: collect() returns the per-partition results in partition order,
# so the first element is the first matching pair in the whole file.
words = pairs.mapPartitions(find_word).collect()
if words:
    print(words[0][0])
else:
    # No partition matched; indexing words[0] here would raise IndexError.
    print("No (adj, noun) pair with noun 'unification' was found.")

Question 2

Design a parallel divide-and-conquer algorithm for the following problem: Given two strings of equal length, compare them lexicographically. Output '<', '=', or '>', depending on the comparison result. The skeleton code is provided below. Your code should run on all partitions of the rdd in parallel.

x, y = (
    'abcccbcbcacaccacaabb',
    'abcccbcccacaccacaabb',
)

# Pair up the characters at each position and spread the pairs over partitions.
numPartitions = 4
rdd = sc.parallelize(zip(x, y), numSlices=numPartitions)

# FILL IN YOUR CODE HERE
# Scratch cell: ord() maps a one-character string to its integer code point.
ord('A')
65
x = 'abcccbcbcacaccacaabb'
y = 'abcccbcccacaccacaabb'

def compare_by_partition(iterator):
    """Compare one partition's character pairs lexicographically.

    Used with ``RDD.mapPartitions``: receives an iterator of ``(a, b)`` pairs
    taken from the same positions of the two strings and yields exactly one
    symbol. It yields '<' or '>' for the first position in this partition
    where the characters differ, or '=' if they agree everywhere in it
    (including an empty partition).
    """
    for a, b in iterator:
        if a != b:
            yield '<' if a < b else '>'
            return
    yield '='

numPartitions = 4
rdd = sc.parallelize(zip(x, y), numPartitions)

# Divide: each partition is scanned in parallel, stopping at its first mismatch.
# Conquer: parallelize() splits the sequence into contiguous slices and
# collect() returns the partition results in order, so the first non-'='
# result comes from the earliest differing position overall and decides the
# comparison. (Comparing sums of character codes would be wrong: 'ba' and
# 'ab' have equal sums, yet 'ba' > 'ab'.)
results = rdd.mapPartitions(compare_by_partition).collect()
print(next((r for r in results if r != '='), '='))
<

Answer

x = 'abcccbcbcacaccacaabb'
y = 'abcccbcccacaccacaabb'

def compare_by_partition(iterator):
    """Compare one partition's character pairs lexicographically.

    Used with ``RDD.mapPartitions``: receives an iterator of ``(a, b)`` pairs
    taken from the same positions of the two strings and yields exactly one
    symbol. It yields '<' or '>' for the first position in this partition
    where the characters differ, or '=' if they agree everywhere in it
    (including an empty partition).
    """
    for a, b in iterator:
        if a != b:
            yield '<' if a < b else '>'
            return
    yield '='

numPartitions = 4
rdd = sc.parallelize(zip(x, y), numPartitions)

# Divide: each partition is scanned in parallel, stopping at its first mismatch.
# Conquer: parallelize() splits the sequence into contiguous slices and
# collect() returns the partition results in order, so the first non-'='
# result comes from the earliest differing position overall and decides the
# comparison. (Comparing sums of character codes would be wrong: 'ba' and
# 'ab' have equal sums, yet 'ba' > 'ab'.)
results = rdd.mapPartitions(compare_by_partition).collect()
print(next((r for r in results if r != '='), '='))