Spark basics and RDD¶
!pip install pyspark
Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
     |████████████████████████████████| 204.2MB 66kB/s
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
     |████████████████████████████████| 204kB 45.5MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=4cb1ed4d5de7d731528bf244d817e4b9dd399567b667d0ffe949093308211f45
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1
from pyspark.context import SparkContext
sc = SparkContext.getOrCreate()
fruits = sc.textFile('/content/drive/MyDrive/courses/HKUST/MSBD5003/data/fruits.txt', 4)  # RDD of lines (one fruit name per line), with at least 4 partitions
Why is Map/Reduce bad?¶
- Programming model too restricted
- Iterative jobs involve a lot of disk I/O
Why Spark?¶
A fast and expressive cluster computing engine, compatible with Apache Hadoop.
Efficient
- General execution graphs
- In-memory storage
Usable
- Rich APIs in Java, Scala, Python
- Interactive shell
| | Hadoop | Spark |
|---|---|---|
| Storage | Disk only | In-memory or on disk |
| Operations | Map/Reduce | Many transformations and actions, including Map and Reduce |
| Execution model | Batch | Batch, interactive, streaming |
| Languages | Java | Scala, Java, R, and Python |
RDD: Resilient Distributed Datasets¶
- A real or virtual file consisting of records
- Partitioned into partitions
- Created through deterministic transformations on:
  - Data in persistent storage
  - Other RDDs
- RDDs do not need to be materialized
- Users can control two other aspects:
  - Persistence
  - Partitioning (e.g. key of the record)
- The programmer specifies the number of partitions for an RDD (a default value is used if unspecified)
  - More partitions: more parallelism, but also more overhead (see the sketch below)
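As a quick illustration (a minimal sketch; nums is an illustrative RDD and the partition sizes in the comment are only indicative):
nums = sc.parallelize(range(100), 8)   # explicitly ask for 8 partitions
print(nums.getNumPartitions())         # 8
print(fruits.getNumPartitions())       # at least the 4 requested when fruits was created
print(nums.glom().map(len).collect())  # glom() exposes the partitions, e.g. eight chunks of ~12-13 elements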
Lineage Graph¶
Fault Tolerance Mechanism
An RDD has enough information about how it was derived from other datasets to recompute its partitions from data in stable storage.
For example, if a partition of errors (an RDD obtained by filtering lines) is lost, Spark rebuilds it by applying the filter to only the corresponding partition of lines.
Partitions can be recomputed in parallel on different nodes, without having to roll back the whole program.
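As a small sketch of the lines/errors example above (the file path here is hypothetical; toDebugString() prints an RDD's lineage):
lines = sc.textFile("/path/to/log.txt", 4)   # hypothetical input file
errors = lines.filter(lambda line: line.startswith("ERROR"))
# The lineage records that errors was derived from lines by a filter,
# which is all Spark needs to rebuild a lost partition of errors
print(errors.toDebugString())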
Spark recomputes transformations¶
# Without cache(): the file is re-read and re-filtered for each action
lines = sc.textFile("...", 4)
comments = lines.filter(isComment)   # isComment: a user-defined predicate (not shown)
print(lines.count(), comments.count())
# With cache(): lines is materialized once and reused by both actions
lines = sc.textFile("...", 4)
lines.cache()
comments = lines.filter(isComment)
print(lines.count(), comments.count())
Execution is pipelined and parallel. No need to store intermediate results. Lazy execution allows optimization.
RDD Persistence¶
- Make an RDD persist using persist() or cache() (see the sketch below)
- Different storage levels are available; the default is MEMORY_ONLY. Persistence allows faster reuse and fault recovery
- Spark also automatically persists some intermediate data in shuffle operations
- Spark automatically drops old data partitions using an LRU policy. You can also unpersist() an RDD manually.
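A minimal sketch of this API (the storage level here is chosen just for illustration):
from pyspark import StorageLevel

nums = sc.parallelize(range(1000), 4)
squares = nums.map(lambda x: x * x)
squares.persist(StorageLevel.MEMORY_AND_DISK)  # cache() is shorthand for persist() with MEMORY_ONLY
print(squares.count())   # the first action materializes and persists the partitions
print(squares.sum())     # reuses the persisted partitions instead of re-running the map
squares.unpersist()      # manually release the storage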
RDD Basics¶
lines = sc.textFile("README.md")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
The first line defines a base RDD from an external file. This dataset is not loaded in memory or otherwise acted on: lines is merely a pointer to the file. The second line defines lineLengths as the result of a map transformation. Again, lineLengths is not immediately computed, due to laziness. Finally, we run reduce, which is an action. At this point Spark breaks the computation into tasks to run on separate machines, and each machine runs both its part of the map and a local reduction, returning only its answer to the driver program.
Where code runs¶
Most Python code runs in the driver, except for code passed to transformations. Transformations run at the executors; actions run at the executors and the driver (the sketch below makes this visible).
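A small sketch of this split (assuming a local PySpark setup, where functions passed to transformations execute in separate Python worker processes):
import os

nums = sc.parallelize(range(8), 4)
driver_pid = os.getpid()                                             # evaluated in the driver
worker_pids = nums.map(lambda x: os.getpid()).distinct().collect()   # the lambda runs at the executors
print(driver_pid, worker_pids)   # worker PIDs typically differ from the driver's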
Closure¶
- A task’s closure consists of the variables and methods that must be visible for the executor to perform its computations on the RDD:
  - the functions that run on RDDs at the executors
  - any global variables used by those functions
- The variables within the closure sent to each executor are copies.
- The closure is serialized and sent to each executor from the driver when an action is invoked.
counter = 0
rdd = sc.parallelize(range(10))
# Wrong: Don't do this!!
def increment_counter(x):
global counter
counter += x
print(rdd.collect())
rdd.foreach(increment_counter)
print(f"Counter: {counter}")
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Counter: 0
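The idiomatic way to get this total is an action that aggregates at the executors and returns the result to the driver, e.g. reduce() or sum() (a minimal sketch):
from operator import add

print(f"Total: {rdd.reduce(add)}")   # 45 for range(10); rdd.sum() would also work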
Closure and persistence¶
# RDD variables are references
A = sc.parallelize(range(10))
B = A.map(lambda x: x*2)
A = B.map(lambda x: x+1)
A.take(10)
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
A = sc.parallelize(range(10))
x = 5
B = A.filter(lambda z: z < x)
# B.cache()
print(B.count())
x = 3
print(B.count())
5
3

Because B is not cached, the filter is re-evaluated for the second count(), and its closure picks up the new value x = 3.
A = sc.parallelize(range(10))
x = 5
B = A.filter(lambda z: z < x)
# B.cache()
B.unpersist()
# print(B.take(10))
print(B.collect())
x = 3
#print(B.take(10))
print(B.collect())
# collect() doesn't always re-collect data - bad design!
# Always use take() instead of collect()
[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4]
Accumulators¶
- Accumulators are variables that are only “added” to through an associative and commutative operation.
- Created from an initial value v by calling SparkContext.accumulator(v).
- Tasks running on a cluster can then add to it using the add method or the += operator.
- Only the driver program can read the accumulator’s value, using its value method.
Note:
Accumulators do not change the lazy evaluation model of Spark. If they are being updated within an operation on an RDD, their value is only updated once that RDD is computed as part of an action. Consequently, accumulator updates are not guaranteed to be executed when made within a lazy transformation like map().
Updates in transformations may be applied more than once if tasks or job stages are re-executed (see the example below).
Suggestion: Avoid using accumulators whenever possible. Use reduce() instead.
rdd = sc.parallelize(range(10))
accum = sc.accumulator(0)
def g(x):
global accum
accum += x
a = rdd.foreach(g)
print(f"Accumulator value: {accum.value}")
Accumulator value: 45
rdd = sc.parallelize(range(10))
accum = sc.accumulator(0)
def g(x):
global accum
accum += x
return x * x
a = rdd.map(g)
print(f"Accumulator value: {accum.value}")
a.cache()
tmp = a.count()
print(f"Accumulator value: {accum.value}")
print(f"Reduce: {rdd.reduce(lambda x, y: x+y)}")
# If a.cache() were commented out, the value would become 90 after the second
# count(), since the accumulator updates in g would be executed again
tmp = a.count()
print(f"Accumulator value: {accum.value}")
print(f"Reduce: {rdd.reduce(lambda x, y: x+y)}")
Accumulator value: 0
Accumulator value: 45
Reduce: 45
Accumulator value: 45
Reduce: 45
Broadcast variables¶
- Broadcast variables allow the programmer to keep a read-only variable cached on each machine (not each task)
- More efficient than sending closures to tasks
# Using join
products = sc.parallelize([(1, "Apple"), (2, "Orange"), (3, "TV"), (5, "Computer")])
trans = sc.parallelize([(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")), (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))])
print(trans.join(products).take(20))
[(1, ((134, 'OK'), 'Apple')), (1, ((135, 'OK'), 'Apple')), (1, ((45, 'OK'), 'Apple')), (5, ((162, 'Error'), 'Computer')), (2, ((53, 'OK'), 'Orange')), (3, ((34, 'OK'), 'TV'))]
# Using broadcast
products = {1: "Apple", 2: "Orange", 3: "TV", 5: "Computer"}
trans = sc.parallelize([(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")), (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))])
broadcasted_products = sc.broadcast(products)
results = trans.map(lambda x: (x[0], broadcasted_products.value[x[0]], x[1]))
# results = trans.map(lambda x: (x[0], products[x[0]], x[1]))
print(results.take(20))
[(1, 'Apple', (134, 'OK')), (3, 'TV', (34, 'OK')), (5, 'Computer', (162, 'Error')), (1, 'Apple', (135, 'OK')), (2, 'Orange', (53, 'OK')), (1, 'Apple', (45, 'OK'))]
RDD Operations¶
Map¶
fruitsReversed = fruits.map(lambda fruit: fruit[::-1])
fruitsReversed.collect()
['elppa',
'ananab',
'nolem yranac',
'parg',
'nomel',
'egnaro',
'elppaenip',
'yrrebwarts']
Filter¶
shortFruits = fruits.filter(lambda fruit: len(fruit) <= 5)
shortFruits.collect()
['apple', 'grap', 'lemon']
FlatMap¶
characters = fruits.flatMap(lambda fruit: list(fruit))
print(characters.collect())
['a', 'p', 'p', 'l', 'e', 'b', 'a', 'n', 'a', 'n', 'a', 'c', 'a', 'n', 'a', 'r', 'y', ' ', 'm', 'e', 'l', 'o', 'n', 'g', 'r', 'a', 'p', 'l', 'e', 'm', 'o', 'n', 'o', 'r', 'a', 'n', 'g', 'e', 'p', 'i', 'n', 'e', 'a', 'p', 'p', 'l', 'e', 's', 't', 'r', 'a', 'w', 'b', 'e', 'r', 'r', 'y']
Union¶
new_fruits = fruits.union(fruits)
new_fruits.collect()
['apple',
'banana',
'canary melon',
'grap',
'lemon',
'orange',
'pineapple',
'strawberry',
'apple',
'banana',
'canary melon',
'grap',
'lemon',
'orange',
'pineapple',
'strawberry']
Intersection¶
new_fruits = fruits.intersection(fruits)
new_fruits.collect()
['orange',
'pineapple',
'canary melon',
'lemon',
'banana',
'apple',
'grap',
'strawberry']
RDD Actions¶
collect¶
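collect() returns the entire RDD to the driver as a Python list, so it is only appropriate when the result is known to be small (the fruits file here has just a handful of lines):
fruits.collect()   # the full list of fruit names, as also seen in the union example below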
take¶
first3Fruits = fruits.take(3)
print(first3Fruits)
['apple', 'banana', 'canary melon']
count¶
fruits.count()
8
reduce¶
fruits.map(lambda fruit: set(fruit)).reduce(lambda x, y: x.union(y))
{' ',
'a',
'b',
'c',
'e',
'g',
'i',
'l',
'm',
'n',
'o',
'p',
'r',
's',
't',
'w',
'y'}
PageRank¶
import re
from operator import add
def computeContribs(urls, rank):
# Calculates URL contributions to the rank of other URLs.
num_urls = len(urls)
for url in urls:
yield (url, rank / num_urls)
def parseNeighbors(urls):
    # Parses a "url neighbor_url" line into a (url, neighbor_url) tuple
parts = urls.split(' ')
return parts[0], parts[1]
# Loads in input file. It should be in format of:
# URL neighbor URL
# URL neighbor URL
# URL neighbor URL
# ...
# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/*
lines = sc.textFile("/content/drive/MyDrive/courses/HKUST/MSBD5003/homeworks/hw2/pagerank_data.txt", 2)
# lines = sc.textFile("../data/dblp.in", 5)
numOfIterations = 10
# Loads all URLs from input file and initialize their neighbors.
links = lines.map(lambda urls: parseNeighbors(urls)) \
.groupByKey()
# Loads all URLs with other URL(s) link to from input file
# and initialize ranks of them to one.
ranks = links.mapValues(lambda neighbors: 1.0)
print('ranks', ranks.collect())
print('links', links.collect())
# Calculates and updates URL ranks continuously using PageRank algorithm.
for iteration in range(numOfIterations):
# Calculates URL contributions to the rank of other URLs.
contribs = links.join(ranks) \
.flatMap(lambda url_urls_rank:
computeContribs(url_urls_rank[1][0],
url_urls_rank[1][1]))
# After the join, each element in the RDD is of the form
# (url, (list of neighbor urls, rank))
# Re-calculates URL ranks based on neighbor contributions.
# ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
ranks = contribs.reduceByKey(add).map(lambda t: (t[0], t[1] * 0.85 + 0.15))
print(ranks.top(5, lambda x: x[1]))
ranks [('1', 1.0), ('4', 1.0), ('2', 1.0), ('3', 1.0)]
links [('1', <pyspark.resultiterable.ResultIterable object at 0x7f6ea9564710>), ('4', <pyspark.resultiterable.ResultIterable object at 0x7f6ea95647f0>), ('2', <pyspark.resultiterable.ResultIterable object at 0x7f6ea95647b8>), ('3', <pyspark.resultiterable.ResultIterable object at 0x7f6ea9564828>)]
[('1', 1.2981882732854677), ('4', 0.9999999999999998), ('3', 0.9999999999999998), ('2', 0.7018117267145316)]
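A side note on the commented-out variant in the loop above (the claim about partitioning is general Spark behavior, not something this output demonstrates):
# Variant using mapValues: unlike map, mapValues preserves the partitioning of
# contribs.reduceByKey(add), which can avoid a shuffle when ranks is joined
# with links again in the next iteration
ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)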
K-means clustering¶
import numpy as np
def parseVector(line):
return np.array([float(x) for x in line.split()])
def closestPoint(p, centers):
bestIndex = 0
closest = float("+inf")
for i in range(len(centers)):
tempDist = np.sum((p - centers[i]) ** 2)
if tempDist < closest:
closest = tempDist
bestIndex = i
return bestIndex
# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/kmeans_data.txt
lines = sc.textFile('/content/drive/MyDrive/courses/HKUST/MSBD5003/data/kmeans_data.txt', 5)
# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/kmeans_bigdata.txt
# lines = sc.textFile('../data/kmeans_bigdata.txt', 5)
# lines is an RDD of strings
K = 3
convergeDist = 0.01
# terminate algorithm when the total distance from old center to new centers is less than this value
data = lines.map(parseVector).cache() # data is an RDD of arrays
kCenters = data.takeSample(False, K, 1)  # initial centers as a list of arrays
tempDist = 1.0 # total distance from old centers to new centers
while tempDist > convergeDist:
closest = data.map(lambda p: (closestPoint(p, kCenters), (p, 1)))
# for each point in data, find its closest center
# closest is an RDD of tuples (index of closest center, (point, 1))
pointStats = closest.reduceByKey(lambda p1, p2: (p1[0] + p2[0], p1[1] + p2[1]))
# pointStats is an RDD of tuples (index of center,
# (array of sums of coordinates, total number of points assigned))
newCenters = pointStats.map(lambda st: (st[0], st[1][0] / st[1][1])).collect()
# compute the new centers
tempDist = sum(np.sum((kCenters[i] - p) ** 2) for (i, p) in newCenters)
    # compute the total distance from old centers to new centers
for (i, p) in newCenters:
kCenters[i] = p
print("Final centers: ", kCenters)
Final centers: [array([0.1 , 0.33333333, 0.23333333]), array([9.05, 3.05, 4.65]), array([9.2, 2.2, 9.2])]
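With the final centers computed, each point can be assigned to its cluster using the same closestPoint helper (a small usage sketch; kCenters is captured by the closure):
assignments = data.map(lambda p: (closestPoint(p, kCenters), p))   # (cluster index, point) pairs
print(assignments.take(5))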
Linear-time selection¶
Problem:¶
- Input: an array A of n numbers (unordered), and k
- Output: the k-th smallest number (counting from 0)
Algorithm¶
1. \(x = A[0]\); partition \(A\) into \(A[0..mid-1] < A[mid] = x < A[mid+1..n-1]\)
2. If \(mid = k\), return \(x\)
3. If \(k < mid\), set \(A = A[0..mid-1]\); if \(k > mid\), set \(A = A[mid+1..n-1]\) and \(k = k - mid - 1\)
4. Go to step 1
# Linear-time selection
data = [34, 67, 21, 56, 47, 89, 12, 44, 74, 43, 26]
A = sc.parallelize(data,2)
k = 4
while True:
x = A.first()
A1 = A.filter(lambda z: z < x)
A2 = A.filter(lambda z: z > x)
A1.cache()
A2.cache()
mid = A1.count()
if mid == k:
print(x)
break
if k < mid:
A = A1
else:
A = A2
k = k - mid - 1
43
Key-value Pairs¶
While most Spark operations work on RDDs containing any type of objects, a few special operations are only available on RDDs of key-value pairs. In Python, these operations work on RDDs containing built-in Python tuples such as (1, 2). Simply create such tuples and then call your desired operation. For example, the following code uses the reduceByKey operation on key-value pairs to count how many times each line of text occurs in a file:
lines = sc.textFile("README.md")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
We could also use counts.sortByKey(), for example, to sort the pairs alphabetically, and finally counts.collect() to bring them back to the driver program as a list of objects.
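Continuing the snippet above (a sketch; the actual pairs depend on the contents of README.md):
sortedCounts = counts.sortByKey()   # sort the (line, count) pairs alphabetically by line
print(sortedCounts.take(5))         # bring a few of them back to the driver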
PMI¶
PMI (pointwise mutual information) is a measure of association used in information theory and statistics.
Given a list of pairs (x, y), where
- \(p(x)\): probability of x
- \(p(y)\): probability of y
- \(p(x,y)\): joint probability of x and y

the pointwise mutual information is \(\mathrm{pmi}(x;y) = \log_2 \frac{p(x,y)}{p(x)\,p(y)}\).
Example: p(x=0) = 0.8, p(x=1) = 0.2, p(y=0) = 0.25, p(y=1) = 0.75 (checked numerically in the sketch after this list)
- pmi(x=0;y=0) = −1
- pmi(x=0;y=1) = 0.222392
- pmi(x=1;y=0) = 1.584963
- pmi(x=1;y=1) = -1.584963
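As a quick check of these numbers (a sketch; the joint probabilities below are not given in the example and are inferred here so as to be consistent with the marginals and PMI values above):
from math import log2

p_xy = {(0, 0): 0.1, (0, 1): 0.7, (1, 0): 0.15, (1, 1): 0.05}   # inferred joint distribution
p_x = {0: 0.8, 1: 0.2}
p_y = {0: 0.25, 1: 0.75}
for (x, y), pxy in sorted(p_xy.items()):
    print(f"pmi(x={x};y={y}) = {log2(pxy / (p_x[x] * p_y[y])):.6f}")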
lines = sc.textFile('/content/drive/MyDrive/courses/HKUST/MSBD5003/data/adj_noun_pairs.txt', 4)
# Converting lines into word pairs.
# Data is dirty: some lines have more than 2 words, so filter them out.
pairs = lines.map(lambda l: tuple(l.split())).filter(lambda p: len(p)==2)
pairs.cache()
pairs.take(5)
[('early', 'radical'),
('french', 'revolution'),
('pejorative', 'way'),
('violent', 'means'),
('positive', 'label')]
N = pairs.count()
N
3162674
# Compute the frequency of each pair.
# Ignore pairs that are not frequent enough
pair_freqs = pairs.map(lambda p: (p,1)).reduceByKey(lambda f1, f2: f1 + f2) \
.filter(lambda pf: pf[1] >= 100)
pair_freqs.take(5)
[(('human', 'society'), 154),
(('16th', 'century'), 950),
(('first', 'man'), 166),
(('civil', 'war'), 2236),
(('social', 'class'), 155)]
# Computing the frequencies of the adjectives and the nouns
a_freqs = pairs.map(lambda p: (p[0],1)).reduceByKey(lambda x,y: x+y)
n_freqs = pairs.map(lambda p: (p[1],1)).reduceByKey(lambda x,y: x+y)
# Broadcasting the adjective and noun frequencies.
#a_dict = a_freqs.collectAsMap()
#a_dict = sc.parallelize(a_dict).map(lambda x: x)
n_dict = sc.broadcast(n_freqs.collectAsMap())
a_dict = sc.broadcast(a_freqs.collectAsMap())
a_dict.value['violent']
1191
from math import *
# Computing the PMI for a pair.
def pmi_score(pair_freq):
w1, w2 = pair_freq[0]
f = pair_freq[1]
pmi = log(float(f)*N/(a_dict.value[w1]*n_dict.value[w2]), 2)
return pmi, (w1, w2)
# Computing the PMI for all pairs.
scored_pairs = pair_freqs.map(pmi_score)
# Printing the most strongly associated pairs.
scored_pairs.top(10)
[(14.41018838546462, ('magna', 'carta')),
(13.071365888694997, ('polish-lithuanian', 'Commonwealth')),
(12.990597616733414, ('nitrous', 'oxide')),
(12.64972604311254, ('latter-day', 'Saints')),
(12.50658937509916, ('stainless', 'steel')),
(12.482331020687814, ('pave', 'runway')),
(12.19140721768055, ('corporal', 'punishment')),
(12.183248694293388, ('capital', 'punishment')),
(12.147015483562537, ('rush', 'yard')),
(12.109945794428935, ('globular', 'cluster'))]