!pip install pyspark
Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=bd505a7a219c64c9cf329fc493b951b33584b8631e2c6893141869ec65dcc4d0
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1
# Create a SparkContext and wrap it in a SparkSession.
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
Prefix Sums
x = [1, 4, 3, 5, 6, 7, 0, 1]
rdd = sc.parallelize(x, 4).cache()

# Step 1: sum the elements of each partition.
def f(iterator):
    yield sum(iterator)

sums = rdd.mapPartitions(f).collect()
print(sums)

# Step 2: turn the per-partition sums into cumulative offsets on the driver.
for i in range(1, len(sums)):
    sums[i] += sums[i-1]
print(sums)
# Step 3: each partition starts from the offset of the partitions before it
# and emits a running prefix sum for its own elements.
def g(index, iterator):
    global sums
    if index == 0:
        s = 0
    else:
        s = sums[index-1]
    for i in iterator:
        s += i
        yield s

prefix_sums = rdd.mapPartitionsWithIndex(g)
print(prefix_sums.collect())
[5, 8, 13, 1]
[5, 13, 26, 27]
[1, 5, 8, 13, 19, 26, 26, 27]
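As a quick sanity check (a minimal sketch, assuming the same list x as above), the distributed prefix sums should agree with a single-machine reference computed with itertools.accumulate:
from itertools import accumulate

# Single-machine reference prefix sums for the same list x.
expected = list(accumulate(x))
print(expected)  # [1, 5, 8, 13, 19, 26, 26, 27]
assert prefix_sums.collect() == expected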
Monotonicity checking
x = [1, 3, 4, 5, 7, 3, 10, 14, 16, 20, 21, 24, 24, 26, 27, 30]
rdd = sc.parallelize(x, 4).cache()

# Each partition reports whether it is non-decreasing internally,
# together with its first and last elements.
def f(it):
    first = next(it)
    last = first
    increasing = True
    for i in it:
        if i < last:
            increasing = False
        last = i
    yield increasing, first, last

results = rdd.mapPartitions(f).collect()
print(results)
# Combine on the driver: every partition must be non-decreasing, and each
# partition's first element must not be smaller than the previous partition's last.
increasing = True
if results[0][0] == False:
    increasing = False
else:
    for i in range(1, len(results)):
        if results[i][0] == False or results[i][1] < results[i-1][2]:
            increasing = False

if increasing:
    print("Monotone")
else:
    print("Not monotone")
[(True, 1, 5), (False, 7, 14), (True, 16, 24), (True, 24, 30)]
Not monotone
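As a sanity check (a minimal sketch over the same list x), the distributed verdict should match a plain single-machine scan:
# Single-machine check: monotone means every element is <= its successor.
print("Monotone" if all(x[i] <= x[i+1] for i in range(len(x)-1)) else "Not monotone")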
Maximum Subarray Problem
# Classical divide-and-conquer algorithm: split at the midpoint q, solve both
# halves recursively, and combine with the best subarray that crosses q.
A = [-3, 2, 1, -4, 5, 2, -1, 3, -1]

def MaxSubarray(A, p, r):
    if p == r:
        return A[p]
    q = (p+r)//2
    M1 = MaxSubarray(A, p, q)
    M2 = MaxSubarray(A, q+1, r)
    # Lm: best sum ending at q (scan leftwards); Rm: best sum starting at q+1.
    Lm = -float('inf')
    Rm = Lm
    V = 0
    for i in range(q, p-1, -1):
        V += A[i]
        if V > Lm:
            Lm = V
    V = 0
    for i in range(q+1, r+1):
        V += A[i]
        if V > Rm:
            Rm = V
    return max(M1, M2, Lm+Rm)

print(MaxSubarray(A, 0, len(A)-1))
# Linear-time (Kadane-style) algorithm, written as a generator over an
# iterator so that the same function can later be applied to each partition.
def linear_time(it):
    Vmax = -float('inf')
    V = 0
    for Ai in it:
        V += Ai
        if V > Vmax:
            Vmax = V
        # A negative running sum can never help a later subarray; reset it.
        if V < 0:
            V = 0
    yield Vmax

print(next(linear_time(A)))
# The Spark algorithm: each partition reports its total sum, its best suffix
# sum Lm (over suffixes that drop at least the first element, so the empty
# suffix counts) and its best non-empty prefix sum Rm.
def compute_sum(it):
    yield sum(it)

def compute_LmRm(index, it):
    Lm = -float('inf')
    Rm = -float('inf')
    L = sums[index]
    R = 0
    for Ai in it:
        L -= Ai
        R += Ai
        if L > Lm:
            Lm = L
        if R > Rm:
            Rm = R
    yield (Lm, Rm)

num_partitions = 4
rdd = sc.parallelize(A, num_partitions).cache()
sums = rdd.mapPartitions(compute_sum).collect()
print(sums)
LmRms = rdd.mapPartitionsWithIndex(compute_LmRm).collect()
print(LmRms)
# Best subarray contained within a single partition.
best = max(rdd.mapPartitions(linear_time).collect())
# Crossing subarrays: suffix of partition i + all of i+1..j-1 + prefix of partition j.
for i in range(num_partitions-1):
    for j in range(i+1, num_partitions):
        x = LmRms[i][0] + sum(sums[i+1:j]) + LmRms[j][1]
        if x > best:
            best = x
# Lm omits the full-partition suffix, so also cover subarrays starting at the very first element.
for j in range(1, num_partitions):
    x = sum(sums[:j]) + LmRms[j][1]
    if x > best:
        best = x
print(best)
[-1, -3, 7, 1]
[(2, -1), (0, 1), (2, 7), (2, 2)]
9
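As a final sanity check (a minimal sketch, not an efficient method), a brute-force scan over all contiguous subarrays of A should give the same answer as both the divide-and-conquer and the Spark versions:
# Brute-force O(n^2) reference: try every contiguous subarray of A.
brute = -float('inf')
for i in range(len(A)):
    total = 0
    for j in range(i, len(A)):
        total += A[j]
        if total > brute:
            brute = total
print(brute)  # 9 for this A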