
Algorithm Design

!pip install pyspark
Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=c2149ee5f1b68200a671dfab446e477b77a733c24b319c6e977cd2618ae1a79e
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1

Divide-and-Conquer

Classical D&C

  • Divide the problem into 2 parts
  • Recursively solve each part
  • Combine the results together
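
A minimal local sketch of the classical scheme, using summation as the running example (the function name dnc_sum and the sample input are illustrative only, not from the original notebook):

def dnc_sum(xs):
    # Classical divide-and-conquer: split into two halves,
    # solve each half recursively, then combine the two results.
    if len(xs) == 1:
        return xs[0]
    mid = len(xs) // 2
    return dnc_sum(xs[:mid]) + dnc_sum(xs[mid:])

print(dnc_sum([1, 4, 3, 5, 6, 7, 0, 1]))  # 27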

D&C under big data systems

  • Divide the problem into \(p\) partitions, where (ideally) \(p\) is the number of executors in the system
  • Solve the problem on each partition
  • Combine the results together

Example: sum(), reduce()
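
For instance, Spark's built-in sum() and reduce() follow this pattern: each partition is reduced locally by its executor, and the per-partition results are then combined on the driver. A minimal sketch (the RDD contents and partition count are illustrative only):

from pyspark.context import SparkContext

sc = SparkContext.getOrCreate()
rdd = sc.parallelize([1, 4, 3, 5, 6, 7, 0, 1], 4)

# Each partition is reduced locally; the driver combines the partial results.
print(rdd.sum())                       # 27
print(rdd.reduce(lambda a, b: a + b))  # 27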

Prefix sums

Input: Sequence x of n elements, binary associative operator +

Output: Sequence y of n elements, with \(y_k = x_1 + x_2 + \cdots + x_k\)

Example: x = [1, 4, 3, 5, 6, 7, 0, 1] y = [1, 5, 8, 13, 19, 26, 26, 27]

Algorithm:

  • Compute the sum of each partition
  • Compute the prefix sums of these \(p\) partition sums
  • Compute the prefix sums within each partition, offsetting each partition by the prefix sum of the preceding partitions

Time: \(O(2n)\) total work, i.e. two passes over the data (one to compute the partition sums, one to compute the within-partition prefix sums).

from pyspark.context import SparkContext
sc = SparkContext.getOrCreate()
x = [1, 4, 3, 5, 6, 7, 0, 1]
rdd = sc.parallelize(x, 4).cache()
def f(i):
    # Step 1: sum the elements of one partition
    yield sum(i)

sums = rdd.mapPartitions(f).collect()

print(sums)

# Step 2: prefix sums of the p partition sums, computed on the driver
for i in range(1, len(sums)):
    sums[i] += sums[i - 1]

print(sums)
def g(index, iter):
    # Step 3: prefix sums within each partition, starting from the total
    # of all preceding partitions (sums[index - 1])
    global sums
    if index == 0:
        s = 0
    else:
        s = sums[index - 1]
    for i in iter:
        s += i
        yield s

prefix_sums = rdd.mapPartitionsWithIndex(g)
print(prefix_sums.collect())
[5, 8, 13, 1]
[5, 13, 26, 27]
[1, 5, 8, 13, 19, 26, 26, 27]

Given a sequence of integers, check whether these numbers are monotonically increasing.

x = [1, 3, 5, 6, 7, 8, 3]
rdd = sc.parallelize(x, 4).cache()

def f(it):
    # For each partition, report whether it is locally non-decreasing,
    # plus its first and last elements for the cross-partition check
    first = next(it)
    last = first
    increasing = True
    for i in it:
        if i < last:
            increasing = False
        last = i
    yield increasing, first, last

results = rdd.mapPartitions(f).collect()
print(results)

# Combine: the sequence is monotonically increasing iff every partition is
# locally non-decreasing and each partition starts no lower than the
# previous one ends
increasing = True
if not results[0][0]:
    increasing = False
else:
    for i in range(1, len(results)):
        if not results[i][0] or results[i][1] < results[i - 1][2]:
            increasing = False

print(increasing)
[(True, 1, 1), (True, 3, 5), (True, 6, 7), (False, 8, 3)]
False

Maximum subarray

Split the array in half: the maximum subarray lies entirely in the left half, entirely in the right half, or crosses the split point.

Level 1:

  • Naively, only 2 executors are working (one per half) while all the others are idle; time = \(O(n/2)\)
  • Smarter: \(L_m\) (the maximum suffix sum of the left half) and \(R_m\) (the maximum prefix sum of the right half) can be found by the prefix-sum algorithm; see the sketch after this analysis
  • This uses all executors; time = \(O(n/p)\)

Level 2:

  • We now have 4 subarrays and solve two prefix-sum problems for each subarray
  • Each subarray has size \(n/4\), and we make sure that each has the same number of partitions
  • Time = \(O(n/p)\)

Level 3:

  • Time = \(O(n/p)\)
  • Stop the recursion when each subarray is a single partition

Total time: \(O(\frac{n}{p}\log p)\)
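
To make the recursion concrete, here is a minimal local (non-Spark) sketch of this scheme; the helper names max_prefix_sum / max_suffix_sum and the sample input are illustrative, not from the original notebook:

def max_prefix_sum(xs):
    # Best sum of a non-empty prefix (R_m for the right half):
    # one prefix-sum pass, keeping the running maximum.
    best, s = float("-inf"), 0
    for v in xs:
        s += v
        best = max(best, s)
    return best

def max_suffix_sum(xs):
    # Best sum of a non-empty suffix (L_m for the left half):
    # the same pass over the reversed half.
    return max_prefix_sum(list(reversed(xs)))

def max_subarray(xs):
    # The answer lies entirely in the left half, entirely in the right half,
    # or crosses the split point, where it equals L_m + R_m.
    if len(xs) == 1:
        return xs[0]
    mid = len(xs) // 2
    left, right = xs[:mid], xs[mid:]
    crossing = max_suffix_sum(left) + max_prefix_sum(right)
    return max(max_subarray(left), max_subarray(right), crossing)

print(max_subarray([1, -4, 3, 5, -6, 7, 0, -1]))  # 9  (3 + 5 - 6 + 7)

On a cluster, each call to max_prefix_sum / max_suffix_sum would instead be a parallel prefix-sum pass over the partitions of the corresponding subarray, which is what bounds every level at \(O(n/p)\).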