Skip to content

Internal

3
from pyspark.context import SparkContext
# Obtain the SparkContext used by every RDD example below (getOrCreate returns
# an already-running context if there is one instead of constructing a second).
sc = SparkContext.getOrCreate()

Finding Prime Numbers

Algorithm:

  • take every number from 2 to n
  • find all multiples of these numbers that are smaller than or equal to n (containing duplicates, but that’s ok)
  • subtract from all numbers these composite numbers
  • We see that all tasks but one finished quickly, while the last one takes a long time.

Before: without repartitioning (simpler, but less efficient because one task does most of the work)

# Upper bound of the search. NOTE(review): range() stops before n, so the
# candidates are 2..n-1 although the prose above says "2 to n" — confirm intent.
n = 5000
# Candidate numbers in 8 partitions; cached because the RDD is used twice below.
allnumbers = sc.parallelize(range(2, n), 8).cache()
# For each x, every multiple 2x, 3x, ... below n (duplicates are expected).
composite = allnumbers.flatMap(lambda x: range(x*2, n, x))
# Primes are the candidates that never appear as a multiple.
prime = allnumbers.subtract(composite)
print(composite.take(10))
print(prime.take(10))
[4, 6, 8, 10, 12, 14, 16, 18, 20, 22]
[17, 97, 113, 193, 241, 257, 337, 353, 401, 433]
# Find the number of elements in each partition
def partitionsize(it):
    """Yield one integer: the number of elements produced by ``it``.

    Intended for ``RDD.mapPartitions``, which calls it once per partition with
    an iterator over that partition's elements.

    The elements are counted as they stream past instead of being copied into
    a list first, so counting a partition does not require holding all of it
    in memory.
    """
    yield sum(1 for _ in it)

# Report how many elements each partition of the three RDDs holds.
print(allnumbers.mapPartitions(partitionsize).collect())
print(composite.mapPartitions(partitionsize).collect())
print(prime.mapPartitions(partitionsize).collect())
# Peek at (up to) the first four elements of the partition at index 2.
print(prime.glom().collect()[2][0:4])
[624, 625, 625, 625, 624, 625, 625, 625]
[4174, 4160, 4170, 4170, 4170, 4164, 4170, 4181]
[0, 81, 1, 84, 0, 81, 0, 87, 0, 80, 0, 84, 0, 87, 0, 84]
[2]

After: with repartitioning (shorter wall-clock time, but more total compute time across all tasks)

# Same pipeline as the first version, with one change: the multiples are
# redistributed over 8 partitions (repartition) before the subtract.
allnumbers = sc.parallelize(range(2, n), 8).cache()
composite = allnumbers.flatMap(lambda x: range(x*2, n, x)).repartition(8)
prime = allnumbers.subtract(composite)
print(composite.take(10))
print(prime.take(10))
[44, 46, 48, 50, 52, 54, 56, 58, 60, 62]
[17, 97, 113, 193, 241, 257, 337, 353, 401, 433]
# Report how many elements each partition of the three RDDs holds.
print(allnumbers.mapPartitions(partitionsize).collect())
print(composite.mapPartitions(partitionsize).collect())
print(prime.mapPartitions(partitionsize).collect())
# Peek at (up to) the first four elements of the partition at index 1.
print(prime.glom().collect()[1][0:4])
[624, 625, 625, 625, 624, 625, 625, 625]
[4174, 4160, 4170, 4170, 4170, 4164, 4170, 4181]
[0, 81, 1, 84, 0, 81, 0, 87, 0, 80, 0, 84, 0, 87, 0, 84]
[17, 97, 113, 193]

Data Partitioning

# Build a pair RDD whose key and value are the same number, in 4 partitions.
data = [8, 96, 240, 400, 1, 800, 4, 12]
rdd = sc.parallelize(zip(data, data),4)
# A freshly parallelized RDD has no partitioner (the recorded output is None).
print(rdd.partitioner)
print(rdd.glom().collect())
# reduceByKey shuffles by key; afterwards the RDD carries a partitioner whose
# partition function is portable_hash in the recorded output.
rdd = rdd.reduceByKey(lambda x,y: x+y)
print(rdd.glom().collect())
print(rdd.partitioner)
print(rdd.partitioner.partitionFunc)

# map() returns whole (key, value) pairs, so the keys could have changed:
# the recorded output shows the partitioner of the result is None.
rdd1 = rdd.map(lambda x: (x[0], x[1]+1))
print(rdd1.glom().collect())
print(rdd1.partitioner)

# mapValues() only touches the values: the recorded output shows the
# partition function is still portable_hash.
rdd2 = rdd.mapValues(lambda x: x+1)
print(rdd2.partitioner.partitionFunc)

# sortByKey() repartitions by key range; the recorded partition function is
# sortByKey's local rangePartitioner, and mapValues() keeps it as well.
rdd = rdd.sortByKey()
print(rdd.glom().collect())
print(rdd.partitioner.partitionFunc)
rdd3 = rdd.mapValues(lambda x: x+1)
print(rdd3.partitioner.partitionFunc)
None
[[(8, 8), (96, 96)], [(240, 240), (400, 400)], [(1, 1), (800, 800)], [(4, 4), (12, 12)]]
[[(8, 8), (96, 96), (240, 240), (400, 400), (800, 800), (4, 4), (12, 12)], [(1, 1)], [], []]
<pyspark.rdd.Partitioner object at 0x7fd9be009198>
<function portable_hash at 0x7fd9cc338488>
[[(8, 9), (96, 97), (240, 241), (400, 401), (800, 801), (4, 5), (12, 13)], [(1, 2)], [], []]
None
<function portable_hash at 0x7fd9cc338488>
[[(1, 1), (4, 4), (8, 8)], [(12, 12), (96, 96)], [(240, 240), (400, 400)], [(800, 800)]]
<function RDD.sortByKey.<locals>.rangePartitioner at 0x7fd9be0b68c8>
<function RDD.sortByKey.<locals>.rangePartitioner at 0x7fd9be0b68c8>
def partitionsize(it):
    """Yield one integer: the number of elements produced by ``it``.

    Intended for ``RDD.mapPartitions``. The elements are counted as they
    stream past rather than being copied into a list first, so a partition
    never has to fit in memory just to be counted.
    """
    yield sum(1 for _ in it)

# Exclusive upper bound of the key and value ranges generated below.
n = 40000

def f(x):
    """Custom partition function: return the remainder of ``x`` divided by 9.

    NOTE(review): it is used below with 8 partitions, so results 0 and 8
    presumably end up in the same partition (the recorded sizes show the
    first partition about twice as large as the others) — confirm whether
    ``% 8`` was intended.
    """
    remainder = x % 9
    return remainder

# Keys: every multiple of 16 below n, listed twice so each key occurs twice.
data1 = list(range(0, n, 16)) + list(range(0, n, 16))
# Values: every multiple of 8 below n.
data2 = range(0, n, 8)
rdd1 = sc.parallelize(zip(data1, data2), 8)
print(rdd1.mapPartitions(partitionsize).collect())
# Default partitioning after reduceByKey: in the recorded output all 2500
# distinct keys end up in a single partition.
rdd2 = rdd1.reduceByKey(lambda x,y: x+y)
print(rdd2.mapPartitions(partitionsize).collect())
# Re-partition the reduced data with the custom function f (a second shuffle).
rdd3 = rdd2.partitionBy(8, f)
print(rdd3.mapPartitions(partitionsize).collect())
# Alternative: hand f to reduceByKey directly; the recorded partition sizes
# are the same as for rdd3.
rdd4 = rdd1.reduceByKey(lambda x,y: x+y, partitionFunc=f)
print(rdd4.mapPartitions(partitionsize).collect())
[625, 625, 625, 625, 625, 625, 625, 625]
[2500, 0, 0, 0, 0, 0, 0, 0]
[556, 278, 277, 278, 277, 278, 278, 278]
[556, 278, 277, 278, 277, 278, 278, 278]
# Two identical pair RDDs (key == value, 0..9999) in 8 partitions each.
a = sc.parallelize(zip(range(10000), range(10000)), 8)
b = sc.parallelize(zip(range(10000), range(10000)), 8)
# No partitioner before the first shuffle (recorded output: None).
print(a.partitioner)
# After reduceByKey both RDDs report the same partition function.
a = a.reduceByKey(lambda x,y: x+y)
print(a.partitioner.partitionFunc)
b = b.reduceByKey(lambda x,y: x+y)
print(b.partitioner.partitionFunc)
# Join the two RDDs on their keys and inspect how the result is partitioned:
# the recorded output shows 8 partitions and the same partition function.
c = a.join(b)
print(c.getNumPartitions())
print(c.partitioner.partitionFunc)
# First four records of the first partition of the join result.
print(c.glom().first()[0:4])
None
<function portable_hash at 0x7fd9cc338488>
<function portable_hash at 0x7fd9cc338488>
8
<function portable_hash at 0x7fd9cc338488>
[(0, (0, 0)), (8, (8, 8)), (16, (16, 16)), (24, (24, 24))]

Partitioning in DataFrames

# Two parallel columns of ten values; zip() pairs them into ten rows.
data1 = [1, 1, 1, 2, 2, 2, 3, 3, 3, 4]
data2 = [2, 2, 3, 4, 5, 3, 1, 1, 2, 3]
# No column names are given; the recorded rows use the names _1 and _2.
# NOTE(review): `spark` is not defined in the visible code — presumably the
# notebook environment provides the SparkSession; confirm.
df = spark.createDataFrame(zip(data1, data2))
# Show how many partitions back the DataFrame and which rows sit in each.
print(df.rdd.getNumPartitions())
print(df.rdd.glom().collect())
48
[[], [], [], [], [Row(_1=1, _2=2)], [], [], [], [], [Row(_1=1, _2=2)], [], [], [], [], [Row(_1=1, _2=3)], [], [], [], [], [Row(_1=2, _2=4)], [], [], [], [Row(_1=2, _2=5)], [], [], [], [], [Row(_1=2, _2=3)], [], [], [], [], [Row(_1=3, _2=1)], [], [], [], [], [Row(_1=3, _2=1)], [], [], [], [], [Row(_1=3, _2=2)], [], [], [], [Row(_1=4, _2=3)]]
# Repartition into 6 partitions by column _1; in the recorded output rows
# with the same _1 value end up in the same partition.
df1 = df.repartition(6, df._1)
print(df1.rdd.glom().collect())
df1.show()
[[], [], [Row(_1=2, _2=4), Row(_1=2, _2=5), Row(_1=2, _2=3), Row(_1=4, _2=3)], [Row(_1=3, _2=1), Row(_1=3, _2=1), Row(_1=3, _2=2)], [], [Row(_1=1, _2=2), Row(_1=1, _2=2), Row(_1=1, _2=3)]]
+---+---+
| _1| _2|
+---+---+
|  2|  4|
|  2|  5|
|  2|  3|
|  4|  3|
|  3|  1|
|  3|  1|
|  3|  2|
|  1|  2|
|  1|  2|
|  1|  3|
+---+---+
# A 'real' example from SF Express
# Prepare three relational tables

from pyspark.sql.functions import *

# Table sizes: number of generated waybill rows and of distinct customers.
num_waybills = 1000
num_customers = 100

# waybills: one row per input element, but the input column itself is not
# selected — each row gets a random waybill id (floor(rand()*num_waybills))
# and a random customer id (floor(rand()*num_customers)). The table is
# repartitioned by 'waybill' and cached.
rdd = sc.parallelize((i, ) for i in range(num_waybills))
waybills = spark.createDataFrame(rdd).select(floor(rand()*num_waybills).alias('waybill'), 
                                             floor(rand()*num_customers).alias('customer')) \
                .repartition('waybill')\
                .cache()
waybills.show()
print(waybills.count())

# customers: one row per customer id, with 'phone' set to the same number.
rdd = sc.parallelize((i, i) for i in range(num_customers))
customers = spark.createDataFrame(rdd, ['customer', 'phone']).cache()
customers.show()
print(customers.count())

# waybill_status: random (waybill, version) rows with version drawn from
# floor(rand()*10), reduced to the highest version per waybill. Grouping by
# 'waybill' leaves one row per distinct waybill id (635 in the recorded run).
rdd = sc.parallelize((i, ) for i in range(num_waybills))
waybill_status = spark.createDataFrame(rdd).select(floor(rand()*num_waybills).alias('waybill'), 
                                                   floor(rand()*10).alias('version')) \
                      .groupBy('waybill').max('version').cache()
waybill_status.show()
print(waybill_status.count())
+-------+--------+
|waybill|customer|
+-------+--------+
|    964|      90|
|    474|      10|
|     26|      73|
|     26|      66|
|    191|      56|
|    191|      89|
|    541|      73|
|    541|       2|
|    938|      12|
|    278|      78|
|    720|      93|
|    705|      11|
|    367|      22|
|    442|      12|
|    442|      91|
|    367|       1|
|    367|      48|
|    296|      62|
|    926|      86|
|    965|       9|
+-------+--------+
only showing top 20 rows

1000
+--------+-----+
|customer|phone|
+--------+-----+
|       0|    0|
|       1|    1|
|       2|    2|
|       3|    3|
|       4|    4|
|       5|    5|
|       6|    6|
|       7|    7|
|       8|    8|
|       9|    9|
|      10|   10|
|      11|   11|
|      12|   12|
|      13|   13|
|      14|   14|
|      15|   15|
|      16|   16|
|      17|   17|
|      18|   18|
|      19|   19|
+--------+-----+
only showing top 20 rows

100
+-------+------------+
|waybill|max(version)|
+-------+------------+
|    474|           4|
|    964|           4|
|     29|           8|
|    191|           7|
|    541|           5|
|    293|           3|
|    270|           7|
|    938|           3|
|    730|           8|
|    243|           3|
|    278|           9|
|    367|           5|
|    442|           6|
|     54|           9|
|     19|           1|
|    965|           9|
|    926|           6|
|    296|           0|
|      0|           5|
|    287|           4|
+-------+------------+
only showing top 20 rows

635
# We want to join the 3 tables together.
# Knowing how each table is partitioned helps to choose the join order:
# waybills was explicitly repartitioned by 'waybill' and waybill_status was
# grouped by 'waybill', so the active version joins those two on 'waybill'
# first and joins customers on 'customer' last.

# Alternative order, kept for comparison (customers first, then status):
# waybills.join(customers, 'customer').join(waybill_status, 'waybill').show()
waybills.join(waybill_status, 'waybill').join(customers, 'customer').show()
+--------+-------+------------+-----+
|customer|waybill|max(version)|phone|
+--------+-------+------------+-----+
|      90|    964|           4|   90|
|      10|    474|           4|   10|
|      56|    191|           7|   56|
|      89|    191|           7|   89|
|      73|    541|           5|   73|
|       2|    541|           5|    2|
|      12|    938|           3|   12|
|      78|    278|           9|   78|
|      22|    367|           5|   22|
|      12|    442|           6|   12|
|      91|    442|           6|   91|
|       1|    367|           5|    1|
|      48|    367|           5|   48|
|      62|    296|           0|   62|
|      86|    926|           6|   86|
|       9|    965|           9|    9|
|      22|     19|           1|   22|
|      45|     54|           9|   45|
|      73|    926|           6|   73|
|      10|    926|           6|   10|
+--------+-------+------------+-----+
only showing top 20 rows

Threading

import threading
import random

# Number of partitions each job's input RDD is split into.
partitions = 20
# Total number of random points sampled per job: 5,000,000 per partition.
n = 5000000 * partitions

# Use different seeds in different threads and different partitions.
# mapPartitionsWithIndex calls its function with only (index, iterator), so
# every worker gets its own thin wrapper that adds a worker-specific base seed
# to the partition index and delegates to one shared sampling helper.
#
# NOTE(review): the base seeds of f1..f4 differ by 1 (f5's is 11 above f4's),
# so "index + base" values overlap between workers — e.g. f1 at index 1 and f2
# at index 0 both use seed 987232 and therefore draw identical points. The
# seeds are kept unchanged here so results stay reproducible; confirm whether
# fully disjoint seeds were intended.
def _monte_carlo_hits(seed, it):
    """Yield 1 or 0 for each element of ``it``: 1 if a random point is a hit.

    For every element (its value is ignored) a point (x, y) is drawn with both
    coordinates in [-1, 1); the point counts as a hit when x**2 + y**2 < 1.

    A private ``random.Random(seed)`` is used instead of reseeding the
    process-global generator, so the stream depends only on ``seed``: other
    users of the ``random`` module are not disturbed, and several of these
    generators can be consumed interleaved without corrupting each other.
    ``random.Random(seed)`` produces the same sequence as ``random.seed(seed)``
    followed by module-level ``random.random()`` calls.
    """
    rng = random.Random(seed)
    for _ in it:
        x = rng.random() * 2 - 1
        y = rng.random() * 2 - 1
        yield 1 if x ** 2 + y ** 2 < 1 else 0


def f1(index, it):
    """Hit indicators for partition ``index``, seeded with index + 987231."""
    return _monte_carlo_hits(index + 987231, it)


def f2(index, it):
    """Hit indicators for partition ``index``, seeded with index + 987232."""
    return _monte_carlo_hits(index + 987232, it)


def f3(index, it):
    """Hit indicators for partition ``index``, seeded with index + 987233."""
    return _monte_carlo_hits(index + 987233, it)


def f4(index, it):
    """Hit indicators for partition ``index``, seeded with index + 987234."""
    return _monte_carlo_hits(index + 987234, it)


def f5(index, it):
    """Hit indicators for partition ``index``, seeded with index + 987245."""
    return _monte_carlo_hits(index + 987245, it)


# Worker i uses f[i] as its partition function.
f = [f1, f2, f3, f4, f5]

# the function executed in each thread/job
def dojob(i):
    """Run one Monte Carlo estimate of Pi as a Spark job and print it.

    ``i`` selects the partition function ``f[i]``. The job maps the numbers
    1..n (split into ``partitions`` partitions) through that function, sums
    the resulting 0/1 values, and prints 4 * sum / n.
    """
    points = sc.parallelize(range(1, n + 1), partitions)
    hits = points.mapPartitionsWithIndex(f[i])
    count = hits.reduce(lambda a, b: a + b)
    print("Worker", i, "reports: Pi is roughly", 4.0 * count / n)

# create one thread per job (worker ids 0..4), then launch them in order
threads = [threading.Thread(target=dojob, args=(i,)) for i in range(5)]
for t in threads:
    t.start()

# block until every job has reported its result
for t in threads:
    t.join()
Worker 1 reports: Pi is roughly 3.14160468
Worker 0 reports: Pi is roughly 3.14166108
Worker 2 reports: Pi is roughly 3.141534
Worker 3 reports: Pi is roughly 3.14153212
Worker 4 reports: Pi is roughly 3.1413932