
!pip install pyspark
Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=bd505a7a219c64c9cf329fc493b951b33584b8631e2c6893141869ec65dcc4d0
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1
# Create a SparkContext and wrap it in a SparkSession
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

Prefix Sums

Compute the running totals of a list in parallel: sum each partition, turn those sums into running offsets on the driver, then have each partition emit its own prefix sums starting from its offset.

x = [1, 4, 3, 5, 6, 7, 0, 1]

# Distribute the list over 4 partitions
rdd = sc.parallelize(x, 4).cache()

# Step 1: sum the elements of each partition
def f(iterator):
    yield sum(iterator)

sums = rdd.mapPartitions(f).collect()

print(sums)

# Step 2: on the driver, turn the per-partition sums into running totals,
# so sums[i] is the total of everything in partitions 0..i
for i in range(1, len(sums)):
    sums[i] += sums[i-1]

print(sums)

# Step 3: each partition starts from the running total of the partitions
# before it (captured from the driver via the sums list) and emits the
# prefix sums of its own elements
def g(index, iterator):
    if index == 0:
        s = 0
    else:
        s = sums[index-1]
    for i in iterator:
        s += i
        yield s

prefix_sums = rdd.mapPartitionsWithIndex(g)
print(prefix_sums.collect())
[5, 8, 13, 1]
[5, 13, 26, 27]
[1, 5, 8, 13, 19, 26, 26, 27]
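
As a quick sanity check (a plain-Python sketch, independent of Spark), itertools.accumulate produces the same prefix sums sequentially:

from itertools import accumulate

x = [1, 4, 3, 5, 6, 7, 0, 1]
print(list(accumulate(x)))   # [1, 5, 8, 13, 19, 26, 26, 27]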

Monotonicity Checking

Check whether the list is non-decreasing: each partition reports whether it is internally non-decreasing along with its first and last elements, and the driver then checks across partition boundaries.

x = [1, 3, 4, 5, 7, 3, 10, 14, 16, 20, 21, 24, 24, 26, 27, 30]

rdd = sc.parallelize(x, 4).cache()

# Each partition reports whether it is non-decreasing, plus its first and
# last elements (assumes every partition is non-empty)
def f(it):
    first = next(it)
    last = first
    increasing = True
    for i in it:
        if i < last:
            increasing = False
        last = i
    yield increasing, first, last

results = rdd.mapPartitions(f).collect()

print(results)

# Combine on the driver: every partition must be non-decreasing, and each
# partition's first element must be at least the previous partition's last
increasing = True
if not results[0][0]:
    increasing = False
else:
    for i in range(1, len(results)):
        if not results[i][0] or results[i][1] < results[i-1][2]:
            increasing = False
if increasing:
    print("Monotone")
else:
    print("Not monotone")
[(True, 1, 5), (False, 7, 14), (True, 16, 24), (True, 24, 30)]
Not monotone
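
The same check written sequentially in plain Python (a sketch for comparison, not part of the Spark version):

x = [1, 3, 4, 5, 7, 3, 10, 14, 16, 20, 21, 24, 24, 26, 27, 30]
print("Monotone" if all(a <= b for a, b in zip(x, x[1:])) else "Not monotone")   # Not monotone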

Maximum Subarray Problem

Given an array of numbers, find the largest possible sum of a contiguous subarray.

# Classical divide-and-conquer algorithm: split at the midpoint q, recurse on
# both halves, and also consider the best subarray that crosses the midpoint
# (best suffix of the left half plus best prefix of the right half)

A = [-3, 2, 1, -4, 5, 2, -1, 3, -1]

def MaxSubarray(A, p, r):
    if p == r:
        return A[p]
    q = (p+r)//2
    M1 = MaxSubarray(A, p, q)
    M2 = MaxSubarray(A, q+1, r)
    # Lm: best sum of a suffix of A[p..q] (ending at q)
    # Rm: best sum of a prefix of A[q+1..r] (starting at q+1)
    Lm = -float('inf')
    Rm = -float('inf')
    V = 0
    for i in range(q, p-1, -1):
        V += A[i]
        if V > Lm:
            Lm = V
    V = 0
    for i in range(q+1, r+1):
        V += A[i]
        if V > Rm:
            Rm = V
    return max(M1, M2, Lm+Rm)

print(MaxSubarray(A, 0, len(A)-1))
9
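For reference, a brute-force O(n^2) check over every contiguous subarray gives the same answer on this small input:

best = -float('inf')
for i in range(len(A)):
    total = 0
    for j in range(i, len(A)):
        total += A[j]
        best = max(best, total)
print(best)   # 9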
# Linear-time algorithm (Kadane's algorithm)
# Written as a generator over an iterator so that it can also be applied
# to each partition of an RDD

def linear_time(it):
    Vmax = -float('inf')
    # V is the best sum of a subarray ending at the current element;
    # once it drops below zero it can never help, so reset it to 0
    V = 0
    for Ai in it:
        V += Ai
        if V > Vmax:
            Vmax = V
        if V < 0:
            V = 0
    yield Vmax

print(next(linear_time(A)))
9
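Because linear_time consumes an iterator and yields one value, it can be applied to any chunk of the data. A plain-Python illustration on the same 4-way split that Spark produces below (as the printed partition sums confirm):

chunks = [A[0:2], A[2:4], A[4:6], A[6:9]]
print([next(linear_time(iter(c))) for c in chunks])   # [2, 1, 7, 3]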
# The Spark algorithm: compute, for every partition, its total sum, its best
# suffix/prefix sums, and its best internal subarray, then combine these on
# the driver

def compute_sum(it):
    yield sum(it)

def compute_LmRm(index, it):
    # Lm: best suffix sum of this partition. Start from the suffix that is
    # the whole partition and peel elements off the front one by one,
    # ending with the empty suffix (sum 0).
    # Rm: best non-empty prefix sum of this partition.
    L = sums[index]
    Lm = L
    R = 0
    Rm = -float('inf')
    for Ai in it:
        L -= Ai
        R += Ai
        if L > Lm:
            Lm = L
        if R > Rm:
            Rm = R
    yield (Lm, Rm)

num_partitions = 4
rdd = sc.parallelize(A, num_partitions).cache()
# Per-partition totals
sums = rdd.mapPartitions(compute_sum).collect()
print(sums)
# Per-partition best suffix and prefix sums
LmRms = rdd.mapPartitionsWithIndex(compute_LmRm).collect()
print(LmRms)
# Best subarray lying entirely inside a single partition
best = max(rdd.mapPartitions(linear_time).collect())

# Combine: for each pair of partitions i < j, a crossing candidate is the
# best suffix of partition i, plus partitions i+1..j-1 in full, plus the
# best prefix of partition j
for i in range(num_partitions-1):
    for j in range(i+1, num_partitions):
        x = LmRms[i][0] + sum(sums[i+1:j]) + LmRms[j][1]
        if x > best:
            best = x

print(best)
[-1, -3, 7, 1]
[(2, -1), (0, 1), (7, 7), (2, 2)]
9
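
For example, the pair (i, j) = (1, 3) gives LmRms[1][0] + sum(sums[2:3]) + LmRms[3][1] = 0 + 7 + 2 = 9: the empty suffix of partition 1, all of partition 2 ([5, 2]), and the prefix [-1, 3] of partition 3, i.e. the subarray [5, 2, -1, 3]. This matches the divide-and-conquer and linear-time answers above.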