from pyspark.context import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext.getOrCreate()
# The DataFrame examples further down also need a SparkSession
spark = SparkSession.builder.getOrCreate()
Finding Prime Numbers
Algorithm:
- take every number from 2 to n
- find all multiples of these numbers that are smaller than or equal to n (duplicates are produced, but that's OK)
- subtract these composite numbers from the set of all numbers; what remains are the primes

Version 1, before repartitioning: no extra shuffle, but less efficient. When we run it, all tasks but one finish quickly, while one straggler takes a long time, because small numbers generate far more multiples and the flatMap output is heavily skewed across partitions.
n = 5000
allnumbers = sc.parallelize(range(2, n), 8).cache()
# every multiple of every number: the composites (with duplicates, which is fine)
composite = allnumbers.flatMap(lambda x: range(x*2, n, x))
# primes = all numbers minus the composites
prime = allnumbers.subtract(composite)
print(composite.take(10))
print(prime.take(10))
[4, 6, 8, 10, 12, 14, 16, 18, 20, 22]
[17, 97, 113, 193, 241, 257, 337, 353, 401, 433]
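As a quick sanity check, the same primes can be computed on the driver with an ordinary sieve and compared with the RDD result (a small sketch; simple_sieve is just an illustrative helper, not part of the lecture code):

# Driver-side sieve of Eratosthenes, for cross-checking only (illustrative helper)
def simple_sieve(n):
    is_prime = [True] * n
    is_prime[0] = is_prime[1] = False
    for i in range(2, int(n ** 0.5) + 1):
        if is_prime[i]:
            for j in range(i * i, n, i):
                is_prime[j] = False
    return [i for i in range(2, n) if is_prime[i]]

# subtract() does not preserve order, so sort before comparing
assert sorted(prime.collect()) == simple_sieve(n)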
# Find the number of elements in each partition
def partitionsize(it):
    yield len(list(it))
print(allnumbers.mapPartitions(partitionsize).collect())
print(composite.mapPartitions(partitionsize).collect())
print(prime.mapPartitions(partitionsize).collect())
print(prime.glom().collect()[2][0:4])
[624, 625, 625, 625, 624, 625, 625, 625]
[4174, 4160, 4170, 4170, 4170, 4164, 4170, 4181]
[0, 81, 1, 84, 0, 81, 0, 87, 0, 80, 0, 84, 0, 87, 0, 84]
[2]
Version 2, after repartitioning: more efficient in wall-clock time, but more work in total because of the extra shuffle.
allnumbers = sc.parallelize(range(2, n), 8).cache()
# repartition(8) shuffles the skewed flatMap output into evenly sized partitions
composite = allnumbers.flatMap(lambda x: range(x*2, n, x)).repartition(8)
prime = allnumbers.subtract(composite)
print(composite.take(10))
print(prime.take(10))
[44, 46, 48, 50, 52, 54, 56, 58, 60, 62]
[17, 97, 113, 193, 241, 257, 337, 353, 401, 433]
print(allnumbers.mapPartitions(partitionsize).collect())
print(composite.mapPartitions(partitionsize).collect())
print(prime.mapPartitions(partitionsize).collect())
print(prime.glom().collect()[1][0:4])
[624, 625, 625, 625, 624, 625, 625, 625]
[4174, 4160, 4170, 4170, 4170, 4164, 4170, 4181]
[0, 81, 1, 84, 0, 81, 0, 87, 0, 80, 0, 84, 0, 87, 0, 84]
[17, 97, 113, 193]
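To see the trade-off concretely, the two versions can be timed on the driver. A rough sketch (time_primes is an illustrative helper; it measures the wall-clock time of the action only, and the actual numbers depend on the cluster):

import time

def time_primes(repartition):
    nums = sc.parallelize(range(2, n), 8)
    comp = nums.flatMap(lambda x: range(x*2, n, x))
    if repartition:
        comp = comp.repartition(8)   # rebalance the skewed flatMap output
    start = time.time()
    result = nums.subtract(comp).count()
    return result, time.time() - start

print(time_primes(False))   # skewed: one straggler task dominates the stage
print(time_primes(True))    # balanced tasks: shorter wall-clock time, but an extra shuffle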
Data Partitioning
data = [8, 96, 240, 400, 1, 800, 4, 12]
rdd = sc.parallelize(zip(data, data), 4)
print(rdd.partitioner)                   # freshly parallelized data has no partitioner
print(rdd.glom().collect())
rdd = rdd.reduceByKey(lambda x, y: x + y)
print(rdd.glom().collect())
print(rdd.partitioner)                   # reduceByKey attaches a hash partitioner
print(rdd.partitioner.partitionFunc)
rdd1 = rdd.map(lambda x: (x[0], x[1]+1))
print(rdd1.glom().collect())
print(rdd1.partitioner)                  # map may change the keys, so the partitioner is dropped
rdd2 = rdd.mapValues(lambda x: x+1)
print(rdd2.partitioner.partitionFunc)    # mapValues keeps the keys, so it is preserved
rdd = rdd.sortByKey()
print(rdd.glom().collect())
print(rdd.partitioner.partitionFunc)     # sortByKey installs a range partitioner
rdd3 = rdd.mapValues(lambda x: x+1)
print(rdd3.partitioner.partitionFunc)    # which mapValues again preserves
None
[[(8, 8), (96, 96)], [(240, 240), (400, 400)], [(1, 1), (800, 800)], [(4, 4), (12, 12)]]
[[(8, 8), (96, 96), (240, 240), (400, 400), (800, 800), (4, 4), (12, 12)], [(1, 1)], [], []]
<pyspark.rdd.Partitioner object at 0x7fd9be009198>
<function portable_hash at 0x7fd9cc338488>
[[(8, 9), (96, 97), (240, 241), (400, 401), (800, 801), (4, 5), (12, 13)], [(1, 2)], [], []]
None
<function portable_hash at 0x7fd9cc338488>
[[(1, 1), (4, 4), (8, 8)], [(12, 12), (96, 96)], [(240, 240), (400, 400)], [(800, 800)]]
<function RDD.sortByKey.<locals>.rangePartitioner at 0x7fd9be0b68c8>
<function RDD.sortByKey.<locals>.rangePartitioner at 0x7fd9be0b68c8>
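A common pattern that builds on this: partition an RDD explicitly once, cache it, and let later key-based operations reuse that layout so they do not shuffle again. A minimal sketch, assuming the default portable_hash partitioner:

pairs = sc.parallelize(zip(range(100), range(100)), 4)
pre = pairs.partitionBy(4).cache()     # shuffle once, then keep this layout
print(pre.partitioner is not None)     # True: a hash partitioner is now attached

# reduceByKey with the same number of partitions and the same hash function
# finds a matching partitioner on its parent and can skip the shuffle
summed = pre.reduceByKey(lambda x, y: x + y, 4)
print(summed.partitioner == pre.partitioner)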
def partitionsize(it): yield len(list(it))
n = 40000
def f(x):
    return x % 9
data1 = list(range(0, n, 16)) + list(range(0, n, 16))
data2 = range(0, n, 8)
rdd1 = sc.parallelize(zip(data1, data2), 8)
print(rdd1.mapPartitions(partitionsize).collect())
rdd2 = rdd1.reduceByKey(lambda x,y: x+y)
print(rdd2.mapPartitions(partitionsize).collect())
rdd3 = rdd2.partitionBy(8, f)
print(rdd3.mapPartitions(partitionsize).collect())
rdd4 = rdd1.reduceByKey(lambda x,y: x+y, partitionFunc=f)
print(rdd4.mapPartitions(partitionsize).collect())
[625, 625, 625, 625, 625, 625, 625, 625]
[2500, 0, 0, 0, 0, 0, 0, 0]
[556, 278, 277, 278, 277, 278, 278, 278]
[556, 278, 277, 278, 277, 278, 278, 278]
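The [2500, 0, 0, ...] result for rdd2 comes from the default hash partitioner: every key in data1 is a multiple of 16, hash(k) equals k for these integers, and k % 8 == 0, so reduceByKey sends every key to partition 0. The custom function f(x) = x % 9 breaks this alignment, which is why rdd3 and rdd4 are balanced. A small illustration run through partitionBy, reusing the partitionsize helper defined above:

keys = sc.parallelize([(k, k) for k in range(0, 128, 16)], 1)
# default portable_hash: all multiples of 16 land in partition 0
print(keys.partitionBy(8).mapPartitions(partitionsize).collect())
# custom partitioner f: the same keys are spread across partitions
print(keys.partitionBy(8, f).mapPartitions(partitionsize).collect())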
a = sc.parallelize(zip(range(10000), range(10000)), 8)
b = sc.parallelize(zip(range(10000), range(10000)), 8)
print(a.partitioner)
a = a.reduceByKey(lambda x,y: x+y)
print(a.partitioner.partitionFunc)
b = b.reduceByKey(lambda x,y: x+y)
print(b.partitioner.partitionFunc)
c = a.join(b)
print(c.getNumPartitions())
print(c.partitioner.partitionFunc)
print(c.glom().first()[0:4])
None
<function portable_hash at 0x7fd9cc338488>
<function portable_hash at 0x7fd9cc338488>
8
<function portable_hash at 0x7fd9cc338488>
[(0, (0, 0)), (8, (8, 8)), (16, (16, 16)), (24, (24, 24))]
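Since a and b carry the same partitioner (same hash function, same number of partitions), the join can match keys partition by partition without shuffling either input again. One way to see this is the RDD's lineage (a quick check; the output format differs between Spark versions):

# the lineage of c: with matching partitioners the join adds no extra repartitioning step
debug = c.toDebugString()
print(debug.decode() if isinstance(debug, bytes) else debug)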
Partitioning in DataFrames
data1 = [1, 1, 1, 2, 2, 2, 3, 3, 3, 4]
data2 = [2, 2, 3, 4, 5, 3, 1, 1, 2, 3]
df = spark.createDataFrame(zip(data1, data2))
print(df.rdd.getNumPartitions())
print(df.rdd.glom().collect())
48
[[], [], [], [], [Row(_1=1, _2=2)], [], [], [], [], [Row(_1=1, _2=2)], [], [], [], [], [Row(_1=1, _2=3)], [], [], [], [], [Row(_1=2, _2=4)], [], [], [], [Row(_1=2, _2=5)], [], [], [], [], [Row(_1=2, _2=3)], [], [], [], [], [Row(_1=3, _2=1)], [], [], [], [], [Row(_1=3, _2=1)], [], [], [], [], [Row(_1=3, _2=2)], [], [], [], [Row(_1=4, _2=3)]]
df1 = df.repartition(6, df._1)
print(df1.rdd.glom().collect())
df1.show()
[[], [], [Row(_1=2, _2=4), Row(_1=2, _2=5), Row(_1=2, _2=3), Row(_1=4, _2=3)], [Row(_1=3, _2=1), Row(_1=3, _2=1), Row(_1=3, _2=2)], [], [Row(_1=1, _2=2), Row(_1=1, _2=2), Row(_1=1, _2=3)]]
+---+---+
| _1| _2|
+---+---+
| 2| 4|
| 2| 5|
| 2| 3|
| 4| 3|
| 3| 1|
| 3| 1|
| 3| 2|
| 1| 2|
| 1| 2|
| 1| 3|
+---+---+
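The co-location can also be checked per row with spark_partition_id(), which reports the partition each row currently lives in (the pid column name is just an illustrative alias):

from pyspark.sql.functions import spark_partition_id

# after repartition(6, df._1), all rows with the same _1 value share a partition id
df1.withColumn('pid', spark_partition_id()).orderBy('_1').show()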
# A 'real' example from SF Express
# Prepare three relational tables
from pyspark.sql.functions import *
num_waybills = 1000
num_customers = 100
rdd = sc.parallelize((i, ) for i in range(num_waybills))
waybills = spark.createDataFrame(rdd) \
    .select(floor(rand()*num_waybills).alias('waybill'),
            floor(rand()*num_customers).alias('customer')) \
    .repartition('waybill') \
    .cache()
waybills.show()
print(waybills.count())
rdd = sc.parallelize((i, i) for i in range(num_customers))
customers = spark.createDataFrame(rdd, ['customer', 'phone']).cache()
customers.show()
print(customers.count())
rdd = sc.parallelize((i, ) for i in range(num_waybills))
waybill_status = spark.createDataFrame(rdd) \
    .select(floor(rand()*num_waybills).alias('waybill'),
            floor(rand()*10).alias('version')) \
    .groupBy('waybill').max('version').cache()
waybill_status.show()
print(waybill_status.count())
+-------+--------+
|waybill|customer|
+-------+--------+
| 964| 90|
| 474| 10|
| 26| 73|
| 26| 66|
| 191| 56|
| 191| 89|
| 541| 73|
| 541| 2|
| 938| 12|
| 278| 78|
| 720| 93|
| 705| 11|
| 367| 22|
| 442| 12|
| 442| 91|
| 367| 1|
| 367| 48|
| 296| 62|
| 926| 86|
| 965| 9|
+-------+--------+
only showing top 20 rows
1000
+--------+-----+
|customer|phone|
+--------+-----+
| 0| 0|
| 1| 1|
| 2| 2|
| 3| 3|
| 4| 4|
| 5| 5|
| 6| 6|
| 7| 7|
| 8| 8|
| 9| 9|
| 10| 10|
| 11| 11|
| 12| 12|
| 13| 13|
| 14| 14|
| 15| 15|
| 16| 16|
| 17| 17|
| 18| 18|
| 19| 19|
+--------+-----+
only showing top 20 rows
100
+-------+------------+
|waybill|max(version)|
+-------+------------+
| 474| 4|
| 964| 4|
| 29| 8|
| 191| 7|
| 541| 5|
| 293| 3|
| 270| 7|
| 938| 3|
| 730| 8|
| 243| 3|
| 278| 9|
| 367| 5|
| 442| 6|
| 54| 9|
| 19| 1|
| 965| 9|
| 926| 6|
| 296| 0|
| 0| 5|
| 287| 4|
+-------+------------+
only showing top 20 rows
635
# We want to join 3 tables together.
# Knowing how each table is partitioned helps optimize the join order.
# waybills.join(customers, 'customer').join(waybill_status, 'waybill').show()
waybills.join(waybill_status, 'waybill').join(customers, 'customer').show()
+--------+-------+------------+-----+
|customer|waybill|max(version)|phone|
+--------+-------+------------+-----+
| 90| 964| 4| 90|
| 10| 474| 4| 10|
| 56| 191| 7| 56|
| 89| 191| 7| 89|
| 73| 541| 5| 73|
| 2| 541| 5| 2|
| 12| 938| 3| 12|
| 78| 278| 9| 78|
| 22| 367| 5| 22|
| 12| 442| 6| 12|
| 91| 442| 6| 91|
| 1| 367| 5| 1|
| 48| 367| 5| 48|
| 62| 296| 0| 62|
| 86| 926| 6| 86|
| 9| 965| 9| 9|
| 22| 19| 1| 22|
| 45| 54| 9| 45|
| 73| 926| 6| 73|
| 10| 926| 6| 10|
+--------+-------+------------+-----+
only showing top 20 rows
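Since customers is tiny (100 rows), another option is to broadcast it so that the large tables are never shuffled for the customer join at all; explain() shows which strategy the optimizer actually chooses (a sketch for comparison; physical plans vary with Spark version and configuration):

from pyspark.sql.functions import broadcast

# broadcast the small dimension table; the waybill join can still reuse the
# existing partitioning of `waybills`
joined = waybills.join(waybill_status, 'waybill').join(broadcast(customers), 'customer')
joined.explain()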
Threading
import threading
import random
partitions = 20
n = 5000000 * partitions
# Use different seeds in different threads and in different partitions.
# A bit ugly: mapPartitionsWithIndex expects a function whose only parameters are
# the partition index and the iterator, so we define five copies with different seeds.
def f1(index, it):
    random.seed(index + 987231)
    for i in it:
        x = random.random() * 2 - 1
        y = random.random() * 2 - 1
        yield 1 if x ** 2 + y ** 2 < 1 else 0

def f2(index, it):
    random.seed(index + 987232)
    for i in it:
        x = random.random() * 2 - 1
        y = random.random() * 2 - 1
        yield 1 if x ** 2 + y ** 2 < 1 else 0

def f3(index, it):
    random.seed(index + 987233)
    for i in it:
        x = random.random() * 2 - 1
        y = random.random() * 2 - 1
        yield 1 if x ** 2 + y ** 2 < 1 else 0

def f4(index, it):
    random.seed(index + 987234)
    for i in it:
        x = random.random() * 2 - 1
        y = random.random() * 2 - 1
        yield 1 if x ** 2 + y ** 2 < 1 else 0

def f5(index, it):
    random.seed(index + 987245)
    for i in it:
        x = random.random() * 2 - 1
        y = random.random() * 2 - 1
        yield 1 if x ** 2 + y ** 2 < 1 else 0
f = [f1, f2, f3, f4, f5]
# the function executed in each thread/job
def dojob(i):
    count = sc.parallelize(range(1, n + 1), partitions) \
        .mapPartitionsWithIndex(f[i]).reduce(lambda a, b: a + b)
    print("Worker", i, "reports: Pi is roughly", 4.0 * count / n)
# create and execute the threads
threads = []
for i in range(5):
    t = threading.Thread(target=dojob, args=(i,))
    threads += [t]
    t.start()
# wait for all threads to complete
for t in threads:
    t.join()
Worker 1 reports: Pi is roughly 3.14160468
Worker 0 reports: Pi is roughly 3.14166108
Worker 2 reports: Pi is roughly 3.141534
Worker 3 reports: Pi is roughly 3.14153212
Worker 4 reports: Pi is roughly 3.1413932
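The five near-identical functions can also be produced by one parameterized factory, and concurrent.futures gives a slightly tidier way to run the jobs concurrently. A sketch of that alternative (make_sampler and dojob2 are illustrative names, not part of the code above):

from concurrent.futures import ThreadPoolExecutor

def make_sampler(seed_base):
    # returns an (index, iterator) -> hits generator with its own seed offset
    def sample(index, it):
        random.seed(index + seed_base)
        for _ in it:
            x = random.random() * 2 - 1
            y = random.random() * 2 - 1
            yield 1 if x ** 2 + y ** 2 < 1 else 0
    return sample

def dojob2(i):
    count = sc.parallelize(range(1, n + 1), partitions) \
        .mapPartitionsWithIndex(make_sampler(987231 + i)).reduce(lambda a, b: a + b)
    return 4.0 * count / n

# each call to dojob2 launches its own Spark job; the pool threads submit them concurrently
with ThreadPoolExecutor(max_workers=5) as pool:
    for i, pi in enumerate(pool.map(dojob2, range(5))):
        print("Worker", i, "reports: Pi is roughly", pi)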