RDD

!pip install pyspark
Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
     |████████████████████████████████| 204.2MB 67kB/s
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
     |████████████████████████████████| 204kB 43.8MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=2e05d41450b321a95e338481ca93d3ac73ca8949d26874ce7b1a793684007856
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1
from google.colab import drive
drive.mount('/content/drive')
Mounted at /content/drive
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

How do I make an RDD?

RDDs can be created from stable storage or by transforming other RDDs. Run the cells below to create RDDs from files on the local drive. All data files can be downloaded from https://www.cse.ust.hk/msbd5003/data/; for example, https://www.cse.ust.hk/msbd5003/data/fruits.txt
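An RDD can also be built directly from a local Python collection with sc.parallelize, which later cells use as well. A minimal sketch (the values here are illustrative only):

# Create an RDD from a driver-side Python list, split into 2 partitions
nums = sc.parallelize([1, 2, 3, 4, 5], 2)
# Transformations are lazy; nothing executes until an action such as collect() runs
squares = nums.map(lambda x: x * x)
print(squares.collect())   # [1, 4, 9, 16, 25]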

# Read data from local file system:
print(sc.version)

fruits = sc.textFile('../data/fruits.txt')
yellowThings = sc.textFile('../data/yellowthings.txt')
print(fruits.collect())
print(yellowThings.collect())
3.0.0
['apple', 'banana', 'canary melon', 'grape', 'lemon', 'orange', 'pineapple', 'strawberry']
['banana', 'bee', 'butter', 'canary melon', 'gold', 'lemon', 'pineapple', 'sunflower']
# Read data from HDFS:
fruits = sc.textFile('hdfs://url:9000/pathname/fruits.txt')
fruits.collect()

RDD operations

# map
fruitsReversed = fruits.map(lambda fruit: fruit[::-1])
# fruitsReversed = fruits.map(lambda fruit: fruit[::-1])
fruitsReversed.persist()
# try changing the file and re-execute with and without cache
print(fruitsReversed.collect())
# What happens when you uncomment the first line and run the whole program again with cache()?
['elppab', 'ananab', 'nolem yranac', 'parg', 'nomel', 'egnaro', 'elppaenip', 'yrrebwarts']
# filter
shortFruits = fruits.filter(lambda fruit: len(fruit) <= 5)
print(shortFruits.collect())
['grap', 'lemon']
# flatMap
characters = fruits.flatMap(lambda fruit: list(fruit))
print(characters.collect())
['b', 'a', 'p', 'p', 'l', 'e', 'b', 'a', 'n', 'a', 'n', 'a', 'c', 'a', 'n', 'a', 'r', 'y', ' ', 'm', 'e', 'l', 'o', 'n', 'g', 'r', 'a', 'p', 'l', 'e', 'm', 'o', 'n', 'o', 'r', 'a', 'n', 'g', 'e', 'p', 'i', 'n', 'e', 'a', 'p', 'p', 'l', 'e', 's', 't', 'r', 'a', 'w', 'b', 'e', 'r', 'r', 'y']
# union
fruitsAndYellowThings = fruits.union(yellowThings)
print(fruitsAndYellowThings.collect())
['bapple', 'banana', 'canary melon', 'grap', 'lemon', 'orange', 'pineapple', 'strawberry', 'banana', 'bee', 'butter', 'canary melon', 'gold', 'lemon', 'pineapple', 'sunflower']
# intersection
yellowFruits = fruits.intersection(yellowThings)
print(yellowFruits.collect())
['pineapple', 'canary melon', 'lemon', 'banana']
# distinct
distinctFruitsAndYellowThings = fruitsAndYellowThings.distinct()
print(distinctFruitsAndYellowThings.collect())
['orange', 'pineapple', 'canary melon', 'lemon', 'bee', 'banana', 'butter', 'gold', 'sunflower', 'apple', 'grap', 'strawberry']

RDD actions

The following are examples of some of the common actions available. For a detailed list, see the RDD Actions section of the Spark programming guide.

Run some actions below to understand this better. Place the cursor in the cell and press SHIFT + ENTER.

# collect
fruitsArray = fruits.collect()
yellowThingsArray = yellowThings.collect()
print(fruitsArray)
['apple', 'banana', 'canary melon', 'grap', 'lemon', 'orange', 'pineapple', 'strawberry']
# count
numFruits = fruits.count()
print(numFruits)
8
# take
first3Fruits = fruits.take(3)
print(first3Fruits)
['apple', 'banana', 'canary melon']
# reduce
letterSet = fruits.map(lambda fruit: set(fruit)).reduce(lambda x, y: x.union(y))
print(letterSet)
{'o', 'r', 'a', 'i', 'p', 'g', 'c', ' ', 'l', 'y', 'e', 'w', 'n', 'b', 'm', 't', 's'}
letterSet = fruits.flatMap(lambda fruit: list(fruit)).distinct().collect()
print(letterSet)
['p', 'l', 'b', 'c', 'r', 'y', 'g', 'i', 's', 'a', 'e', 'n', ' ', 'm', 'o', 't', 'w']
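A few other common actions, applied to the fruits RDD defined above (a sketch; the exact output depends on the current contents of the data file):

# first: return the first element of the RDD
print(fruits.first())
# takeOrdered: return the n smallest elements (alphabetical order for strings)
print(fruits.takeOrdered(3))
# countByValue: return a dict mapping each distinct element to its count
print(fruits.countByValue())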

Closure

counter = 0
rdd = sc.parallelize(range(10))

# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x

print(rdd.collect())
rdd.foreach(increment_counter)

print(counter)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
0
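The driver-side counter stays 0 because foreach ships a copy of the closure to each executor; every task increments its own copy, and those updates never come back to the driver. A minimal sketch of the idiomatic fix is to let an action do the aggregation (or use an accumulator, as in the next cell):

# Aggregate on the cluster and return the result to the driver
print(rdd.sum())                          # 45
print(rdd.reduce(lambda x, y: x + y))     # 45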
rdd = sc.parallelize(range(10))
accum = sc.accumulator(0)

def g(x):
    global accum
    accum += x

a = rdd.foreach(g)

print(accum.value)
45
rdd = sc.parallelize(range(10))
accum = sc.accumulator(0)

def g(x):
    global accum
    accum += x
    return x * x

a = rdd.map(g)
print(accum.value)
#print(a.reduce(lambda x, y: x+y))
a.cache()
tmp = a.count()
print(accum.value)
print(rdd.reduce(lambda x, y: x+y))

tmp = a.count()
print(accum.value)
print(rdd.reduce(lambda x, y: x+y))
0
45
45
45
45

Computing Pi using Monte Carlo simulation
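
The fraction of uniformly random points in the unit square that fall inside the quarter circle x**2 + y**2 < 1 approaches pi/4, so 4.0 * count / n estimates pi.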

# From the official spark examples.

import random
import time

partitions = 1000
n = 1000 * partitions

def f(_):
    x = random.random()
    y = random.random()
    return 1 if x ** 2 + y ** 2 < 1 else 0

count = sc.parallelize(range(1, n + 1), partitions) \
          .map(f).sum()

print("Pi is roughly", 4.0 * count / n)
Pi is roughly 3.140944
# Example: glom
import sys
import random

def f(_):
    random.seed(time.time())
    return random.random()

a = sc.parallelize(range(0,100),10)
print(a.collect())
print(a.glom().collect())
print(a.map(f).glom().collect())

# Weird behavior: initially the random numbers are synced across all workers, but they get
# out of sync after a large number (e.g., 1,000,000) of random numbers have been generated.
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [20, 21, 22, 23, 24, 25, 26, 27, 28, 29], [30, 31, 32, 33, 34, 35, 36, 37, 38, 39], [40, 41, 42, 43, 44, 45, 46, 47, 48, 49], [50, 51, 52, 53, 54, 55, 56, 57, 58, 59], [60, 61, 62, 63, 64, 65, 66, 67, 68, 69], [70, 71, 72, 73, 74, 75, 76, 77, 78, 79], [80, 81, 82, 83, 84, 85, 86, 87, 88, 89], [90, 91, 92, 93, 94, 95, 96, 97, 98, 99]]
[[0.6608713426170987, 0.698767024318554, 0.1874105777790005, 0.7623702078433652, 0.8851287594440702, 0.31740294580255735, 0.19323310102732394, 0.42450071105921683, 0.5933781451859748, 0.7458943680790939], [0.7261502175930336, 0.22659503054053598, 0.9192074261481535, 0.4774662604141523, 0.7974422880272903, 0.2584976474338707, 0.6055611352765481, 0.5244790798752513, 0.6861813792912159, 0.5652815222674437], [0.27860057141024697, 0.27383515025078553, 0.9176819782462265, 0.417689753313761, 0.6135860183360143, 0.8162090147099693, 0.39224804876974406, 0.543173888187219, 0.3098912544023783, 0.633182881742779], [0.0952563896474653, 0.7477071810186972, 0.5004564582092008, 0.2614834043253954, 0.5982982446751687, 0.8544002333592715, 0.26000819037953216, 0.40177311792144454, 0.03851083747397188, 0.05167636277510712], [0.9726302497724043, 0.42432064255976365, 0.9305610323744404, 0.771694551386715, 0.6789841281422876, 0.9487832709253969, 0.4943030306526911, 0.22888583384514705, 0.6165263440265218, 0.8948635092093183], [0.9816006872849989, 0.3233518004555158, 0.6660672115030636, 0.9921564654020117, 0.9574487554669273, 0.00033642413291157247, 0.5729463981674527, 0.63676146970985, 0.1068707761119706, 0.4974835849045728], [0.6877782810075579, 0.11000878013616322, 0.6630366287015564, 0.0320757478156235, 0.5550374523078817, 0.11429763248899893, 0.7746616174182379, 0.6935564378314162, 0.6081187039755812, 0.3594774747771995], [0.3402744125431225, 0.8533066685831103, 0.18605963113570156, 0.9700428171414653, 0.9046533776474858, 0.4199976147427207, 0.01833313615444565, 0.5003118405702941, 0.9167261953361863, 0.6543553598701435], [0.5463089308369264, 0.19187434980340723, 0.5311179490604816, 0.7210872364087648, 0.25848050944241396, 0.9138829006068386, 0.5015098582184656, 0.9245322749204768, 0.4746635193819774, 0.733561516539988], [0.5924804586325896, 0.44157691425623313, 0.06474182310396659, 0.3705313104712945, 0.218280453275444, 0.911250263493956, 0.4908690024649712, 0.031427016100674665, 0.3749922950484815, 0.29534800562581187]]
# Example: mapPartitions and mapPartitionsWithIndex
a = sc.parallelize(range(0,20),4)
print(a.glom().collect())

def f(it):
    s = 0
    for i in it:
        s += i
        yield s

print(a.mapPartitions(f).collect())

def f(index, it):
    s = index
    for i in it:
        s += i
        yield s

print(a.mapPartitionsWithIndex(f).collect())
[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10, 11, 12, 13, 14], [15, 16, 17, 18, 19]]
[0, 1, 3, 6, 10, 5, 11, 18, 26, 35, 10, 21, 33, 46, 60, 15, 31, 48, 66, 85]
[0, 1, 3, 6, 10, 6, 12, 19, 27, 36, 12, 23, 35, 48, 62, 18, 34, 51, 69, 88]
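Note how the running sum restarts at every partition boundary: mapPartitions applies f once per partition, and mapPartitionsWithIndex additionally passes the partition index. The corrected Pi estimator below uses that index to give each partition its own random seed.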
# Correct version
import random
import time

partitions = 1000
n = 1000 * partitions

seed = time.time()

def f(index, it):
    random.seed(index + seed)
    for i in it:
        x = random.random()
        y = random.random()
        yield 1 if x ** 2 + y ** 2 < 1 else 0

count = sc.parallelize(range(1, n + 1), partitions) \
          .mapPartitionsWithIndex(f).sum()

print("Pi is roughly", 4.0 * count / n)
Pi is roughly 3.141832

Closure and Persistence

# RDD variables are references
A = sc.parallelize(range(10))
B = A.map(lambda x: x*2)
A = B.map(lambda x: x+1)
A.take(10)
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
# Linear-time selection

data = [34, 67, 21, 56, 47, 89, 12, 44, 74, 43, 26]
A = sc.parallelize(data,2)
k = 4

while True:
    x = A.first()
    A1 = A.filter(lambda z: z < x)
    A2 = A.filter(lambda z: z > x)
    A1.cache()
    A2.cache()
    mid = A1.count()
    if mid == k:
        print(x)
        break

    if k < mid:
        A = A1
    else:
        A = A2
        k = k - mid - 1
43
sorted(data)
[12, 21, 26, 34, 43, 44, 47, 56, 67, 74, 89]
A = sc.parallelize(range(10))

x = 5
B = A.filter(lambda z: z < x)
# B.cache()
print(B.count())
x = 3
print(B.count())
5
5
A = sc.parallelize(range(10))

x = 5
B = A.filter(lambda z: z < x)
# B.cache()
B.unpersist()
# print(B.take(10))
print(B.collect())

x = 3
#print(B.take(10))
print(B.collect())
# collect() doesn't always re-collect data - bad design!
# Always use take() instead of collect()
[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4]

Key-Value Pairs

# reduceByKey
numFruitsByLength = fruits.map(lambda fruit: (len(fruit), 1)).reduceByKey(lambda x, y: x + y)
print(numFruitsByLength.take(10))
from operator import add

lines = sc.textFile('../data/course.txt')
counts = lines.flatMap(lambda x: x.split()) \
              .map(lambda x: (x, 1)) \
              .reduceByKey(add)
print(counts.sortByKey().take(20))
print(counts.sortBy(lambda x: x[1], False).take(20))
# Simple join example

products = sc.parallelize([(1, "Apple"), (2, "Orange"), (3, "TV"), (5, "Computer")])
#trans = sc.parallelize([(1, 134, "OK"), (3, 34, "OK"), (5, 162, "Error"), (1, 135, "OK"), (2, 53, "OK"), (1, 45, "OK")])
trans = sc.parallelize([(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")), (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))])

print(products.join(trans).take(20))
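Pair-RDD operations such as join treat each element as a (key, value) 2-tuple, with element [0] as the key and element [1] as the value. That is why the commented-out 3-tuple version of trans above is replaced by one that nests the amount and status into a single value.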

K-means clustering

import numpy as np

def parseVector(line):
    return np.array([float(x) for x in line.split()])

def closestPoint(p, centers):
    bestIndex = 0
    closest = float("+inf")
    for i in range(len(centers)):
        tempDist = np.sum((p - centers[i]) ** 2)
        if tempDist < closest:
            closest = tempDist
            bestIndex = i
    return bestIndex

# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/kmeans_data.txt
lines = sc.textFile('../data/kmeans_data.txt', 5)  

# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/kmeans_bigdata.txt
# lines = sc.textFile('../data/kmeans_bigdata.txt', 5)  
# lines is an RDD of strings
K = 3
convergeDist = 0.01  
# terminate the algorithm when the total distance from the old centers to the new centers is less than this value

data = lines.map(parseVector).cache() # data is an RDD of arrays

kCenters = data.takeSample(False, K, 1)  # initial centers as a list of arrays
tempDist = 1.0  # total distance from old centers to new centers

while tempDist > convergeDist:
    closest = data.map(lambda p: (closestPoint(p, kCenters), (p, 1)))
    # for each point in data, find its closest center
    # closest is an RDD of tuples (index of closest center, (point, 1))

    pointStats = closest.reduceByKey(lambda p1, p2: (p1[0] + p2[0], p1[1] + p2[1]))
    # pointStats is an RDD of tuples (index of center,
    # (array of sums of coordinates, total number of points assigned))

    newCenters = pointStats.map(lambda st: (st[0], st[1][0] / st[1][1])).collect()
    # compute the new centers

    tempDist = sum(np.sum((kCenters[i] - p) ** 2) for (i, p) in newCenters)
    # compute the total distance from old centers to new centers

    for (i, p) in newCenters:
        kCenters[i] = p

print("Final centers: ", kCenters)

PageRank

import re
from operator import add

def computeContribs(urls, rank):
    # Calculates URL contributions to the rank of other URLs.
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)

def parseNeighbors(urls):
    # Parses a pair of URLs from one line of the input file.
    parts = urls.split(' ')
    return parts[0], parts[1]

# Loads the input file. It should be in the format:
#     URL         neighbor URL
#     URL         neighbor URL
#     URL         neighbor URL
#     ...

# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/*
lines = sc.textFile("/content/drive/My Drive/课程/HKUST/MSBD5003/homeworks/hw2/pagerank_data.txt", 2)
# lines = sc.textFile("../data/dblp.in", 5)

numOfIterations = 10

# Loads all URLs from the input file and initializes their neighbor lists.
links = lines.map(lambda urls: parseNeighbors(urls)) \
             .groupByKey()

# Initializes the rank of every URL that has outgoing links to one.
ranks = links.mapValues(lambda neighbors: 1.0)
print('ranks', ranks.collect())
print('links', links.collect())
# Calculates and updates URL ranks iteratively using the PageRank algorithm.
for iteration in range(numOfIterations):
    # Calculates URL contributions to the rank of other URLs.
    contribs = links.join(ranks) \
                    .flatMap(lambda url_urls_rank:
                             computeContribs(url_urls_rank[1][0],
                                             url_urls_rank[1][1]))
    # After the join, each element in the RDD is of the form
    # (url, (list of neighbor urls, rank))

    # Re-calculates URL ranks based on neighbor contributions.
    # ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
    ranks = contribs.reduceByKey(add).map(lambda t: (t[0], t[1] * 0.85 + 0.15))

print(ranks.top(5, lambda x: x[1]))
ranks [('1', 1.0), ('4', 1.0), ('2', 1.0), ('3', 1.0)]
links [('1', <pyspark.resultiterable.ResultIterable object at 0x7f8b12a20ef0>), ('4', <pyspark.resultiterable.ResultIterable object at 0x7f8b12a20a58>), ('2', <pyspark.resultiterable.ResultIterable object at 0x7f8b12a20780>), ('3', <pyspark.resultiterable.ResultIterable object at 0x7f8b12a207f0>)]
[('1', 1.2981882732854677), ('4', 0.9999999999999998), ('3', 0.9999999999999998), ('2', 0.7018117267145316)]

Join vs. Broadcast Variables

products = sc.parallelize([(1, "Apple"), (2, "Orange"), (3, "TV"), (5, "Computer")])
trans = sc.parallelize([(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")), (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))])

print(trans.join(products).take(20))
products = {1: "Apple", 2: "Orange", 3: "TV", 5: "Computer"}
trans = sc.parallelize([(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")), (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))])

broadcasted_products = sc.broadcast(products)

results = trans.map(lambda x: (x[0], broadcasted_products.value[x[0]], x[1]))
#  results = trans.map(lambda x: (x[0], products[x[0]], x[1]))
print(results.take(20))
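Broadcasting the small products table ships it to every executor once and lets the lookup happen map-side, avoiding the shuffle that join requires; this trade-off only works while the lookup table comfortably fits in executor memory.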