Algorithm Design¶
!pip install pyspark
Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=c2149ee5f1b68200a671dfab446e477b77a733c24b319c6e977cd2618ae1a79e
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1
Divide-and-Conquer¶
Classical D&C:

- Divide the problem into 2 parts
- Recursively solve each part
- Combine the results together
D&C under big data systems:

- Divide the problem into \(p\) partitions, where (ideally) \(p\) is the number of executors in the system
- Solve the problem on each partition
- Combine the results together
Example: sum(), reduce()
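A minimal sketch of this pattern for sum(): each partition is summed independently (solve), and the partial sums are combined on the driver; 4 partitions stand in for \(p\) executors here, and the helper names are illustrative.

from pyspark.context import SparkContext

sc = SparkContext.getOrCreate()

# Divide: 4 partitions stand in for p executors.
nums = sc.parallelize(range(1, 101), 4)

# Solve each part: sum every partition independently.
partial_sums = nums.mapPartitions(lambda it: [sum(it)]).collect()

# Combine: add the partial sums on the driver.
print(sum(partial_sums))                # 5050

# The built-in actions follow the same divide-and-conquer pattern:
print(nums.sum())                       # 5050
print(nums.reduce(lambda a, b: a + b))  # 5050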
Prefix sums¶
Input: a sequence x of n elements and a binary associative operator +

Output: a sequence y of n elements, with \(y_k = x_1 + x_2 + \dots + x_k\)

Example: x = [1, 4, 3, 5, 6, 7, 0, 1], y = [1, 5, 8, 13, 19, 26, 26, 27]

Algorithm:

- Compute the sum of each partition
- Compute the prefix sums of the \(p\) partition sums
- Compute the prefix sums within each partition, offset by the prefix sum of the preceding partitions

Time: \(O(2n)\), i.e. two passes over the data
from pyspark.context import SparkContext

sc = SparkContext.getOrCreate()

x = [1, 4, 3, 5, 6, 7, 0, 1]
rdd = sc.parallelize(x, 4).cache()

# Step 1: compute the sum of each partition.
def f(it):
    yield sum(it)

sums = rdd.mapPartitions(f).collect()
print(sums)

# Step 2: compute the prefix sums of the partition sums (on the driver).
for i in range(1, len(sums)):
    sums[i] += sums[i - 1]
print(sums)

# Step 3: compute the prefix sums within each partition, starting from the
# prefix sum of all preceding partitions. `sums` (computed on the driver)
# is captured in the closure shipped to each task.
def g(index, it):
    if index == 0:
        s = 0
    else:
        s = sums[index - 1]
    for i in it:
        s += i
        yield s

prefix_sums = rdd.mapPartitionsWithIndex(g)
print(prefix_sums.collect())
[5, 8, 13, 1]
[5, 13, 26, 27]
[1, 5, 8, 13, 19, 26, 26, 27]
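As a quick driver-side sanity check, assuming x fits in memory, the distributed result can be compared against Python's itertools.accumulate (x and prefix_sums come from the cells above):

from itertools import accumulate

expected = list(accumulate(x))      # sequential reference computation
assert prefix_sums.collect() == expected
print(expected)                     # [1, 5, 8, 13, 19, 26, 26, 27]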
Given a sequence of integers, check whether these numbers are monotonically increasing (non-decreasing).
x = [1, 3, 5, 6, 7, 8, 3]
rdd = sc.parallelize(x, 4).cache()

# For each partition, report whether it is sorted, plus its first and last element.
# Assumes every partition is non-empty (true here: 7 elements over 4 partitions).
def f(it):
    first = next(it)
    last = first
    increasing = True
    for i in it:
        if i < last:
            increasing = False
        last = i
    yield increasing, first, last

results = rdd.mapPartitions(f).collect()
print(results)

# Combine on the driver: every partition must be sorted, and each partition's
# first element must be no smaller than the previous partition's last element.
increasing = True
if not results[0][0]:
    increasing = False
else:
    for i in range(1, len(results)):
        if not results[i][0] or results[i][1] < results[i - 1][2]:
            increasing = False
print(increasing)
[(True, 1, 1), (True, 3, 5), (True, 6, 7), (False, 8, 3)]
False
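The driver-side loop above can also be written as a single fold over the partition summaries; a minimal sketch, reusing the (is_sorted, first, last) triples in `results` and an illustrative `combine` helper. functools.reduce folds left to right, which matters here because `combine` is associative but not commutative.

from functools import reduce

# Merge two adjacent partition summaries (is_sorted, first, last) into one.
def combine(a, b):
    return (a[0] and b[0] and a[2] <= b[1], a[1], b[2])

print(reduce(combine, results)[0])  # False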
Maximum subarray¶
Level 1:

- Naively: only 2 executors are working and all others are idle, time = \(O(n/2)\)
- Smarter: \(L_m\) and \(R_m\) (the maximum suffix sum of the left half and the maximum prefix sum of the right half) can be found by the prefix-sum algorithm, so all executors can be used, time = \(O(n/p)\)

Level 2:

- We have 4 subarrays, and solve two prefix-sums for each subarray
- Each subarray has size \(n/4\), and we make sure that each has the same number of partitions
- Time = \(O(n/p)\)

Level 3:

- Time = \(O(n/p)\)
- Stop the recursion when each subarray is a single partition.

Total time: \(O(\frac{n}{p} \times \log p)\)
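A minimal sketch of a closely related single-pass formulation (not the level-by-level recursion itself; the helper names summarize and combine are illustrative): each partition is summarized as (total, best prefix sum, best suffix sum, best subarray sum), allowing the empty subarray (sum 0) for simplicity, and the summaries are merged on the driver with an associative combine.

from functools import reduce

# Merge the summaries of two adjacent ranges.
def combine(a, b):
    a_tot, a_pre, a_suf, a_best = a
    b_tot, b_pre, b_suf, b_best = b
    return (a_tot + b_tot,
            max(a_pre, a_tot + b_pre),            # best prefix of the concatenation
            max(b_suf, b_tot + a_suf),            # best suffix of the concatenation
            max(a_best, b_best, a_suf + b_pre))   # best subarray may cross the boundary

# Summarize one partition by folding its elements into the merge.
def summarize(it):
    acc = (0, 0, 0, 0)  # summary of the empty sequence
    for v in it:
        acc = combine(acc, (v, max(v, 0), max(v, 0), max(v, 0)))
    yield acc

x = [-2, 1, -3, 4, -1, 2, 1, -5, 4]
rdd = sc.parallelize(x, 4).cache()

summaries = rdd.mapPartitions(summarize).collect()  # one summary per partition, in order
best = reduce(combine, summaries)[3]                # left-to-right merge on the driver
print(best)  # 6, from the subarray [4, -1, 2, 1]

Because combine is associative and the per-partition summaries come back in partition order, the driver-side fold reproduces the sequential answer while each partition is processed in \(O(n/p)\) time.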