
Spark basics and RDD

!pip install pyspark
Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
     |████████████████████████████████| 204.2MB 66kB/s
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
     |████████████████████████████████| 204kB 45.5MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=4cb1ed4d5de7d731528bf244d817e4b9dd399567b667d0ffe949093308211f45
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1
from pyspark.context import SparkContext
sc = SparkContext.getOrCreate()
fruits = sc.textFile('/content/drive/MyDrive/courses/HKUST/MSBD5003/data/fruits.txt', 4)

Why is Map/Reduce bad?

Programming model too restricted

Iterative jobs involve a lot of disk I/O

Why Spark?

Fast and Expressive Cluster Computing Engine Compatible with Apache Hadoop

Efficient

  • General execution graphs
  • In-memory storage

Usable

  • Rich APIs in Java, Scala, Python
  • Interactive shell

                  Hadoop        Spark
Storage           Disk only     In-memory or on disk
Operations        Map/Reduce    Many transformations and actions, including Map and Reduce
Execution model   Batch         Batch, interactive, streaming
Languages         Java          Scala, Java, R, and Python

RDD: Resilient Distributed Datasets

A real or virtual file consisting of records

Partitioned into partitions

Created through deterministic transformations on:
  • Data in persistent storage
  • Other RDDs

RDDs do not need to be materialized

Users can control two other aspects:
  • Persistence
  • Partitioning (e.g. by the key of the record)

The programmer specifies the number of partitions for an RDD (a default value is used if unspecified); see the sketch after this list.

  • more partitions: more parallelism but also more overhead
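As a small illustration (a sketch, assuming the local SparkContext sc created above; the numbers are arbitrary), the partition count can be requested at creation time and inspected afterwards:

rdd = sc.parallelize(range(100), 8)    # ask for 8 partitions
print(rdd.getNumPartitions())          # 8
print(rdd.glom().map(len).collect())   # number of elements in each partition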

Lineage Graph

Fault Tolerance Mechanism

An RDD has enough information about how it was derived from other datasets (its lineage) to recompute its partitions from data in stable storage.


Example: if a partition of errors is lost, Spark rebuilds it by applying the filter to only the corresponding partition of lines, as sketched below.

Partitions can be recomputed in parallel on different nodes, without having to roll back the whole program.
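A minimal sketch of the lines/errors example referred to above (the file path and the "ERROR" prefix are placeholders, not from this notebook):

lines = sc.textFile("log.txt", 4)                        # base RDD backed by stable storage
errors = lines.filter(lambda l: l.startswith("ERROR"))   # lineage: textFile -> filter
# If a partition of errors is lost, Spark re-runs the filter
# on just the corresponding partition of lines.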

Spark recomputes transformations

lines = sc.textFile("...",4)
comments = lines.filter(isComment)
print(lines.count(),comments.count())

[Figure: without caching, the file is re-read and the filter recomputed for each of the two actions]

lines = sc.textFile("...",4)
lines.cache()
comments = lines.filter(isComment)
print(lines.count(),comments.count())

[Figure: with cache(), lines is materialized in memory by the first action and reused by the second]

Execution is pipelined and parallel. No need to store intermediate results. Lazy execution allows optimization.


RDD Persistence

  • Make an RDD persist using persist() or cache()

  • Different storage levels are available; the default is MEMORY_ONLY. Persisting allows faster reuse and faster fault recovery.

  • Spark also automatically persists some intermediate data in shuffle operations

  • Spark automatically drops old data partitions using an LRU policy. You can also unpersist() an RDD manually.
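A brief sketch of choosing a non-default storage level (StorageLevel is part of the standard PySpark API; the RDD itself is just an example):

from pyspark import StorageLevel

rdd = sc.parallelize(range(1000))
rdd.persist(StorageLevel.MEMORY_AND_DISK)   # spill partitions to disk if they don't fit in memory
rdd.count()       # first action materializes and persists the RDD
rdd.count()       # reuses the persisted partitions
rdd.unpersist()   # manually remove it from the cache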

RDD Basics

lines = sc.textFile("README.md")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)

The first line defines a base RDD from an external file. This dataset is not loaded in memory or otherwise acted on: lines is merely a pointer to the file. The second line defines lineLengths as the result of a map transformation. Again, lineLengths is not immediately computed, due to laziness. Finally, we run reduce, which is an action. At this point Spark breaks the computation into tasks to run on separate machines, and each machine runs both its part of the map and a local reduction, returning only its answer to the driver program.

Where code runs

Most Python code runs in the driver, except for the functions passed to transformations. Transformations run on the executors; actions run on the executors and the driver.
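For instance (an illustrative sketch, not from the original notes):

data = sc.parallelize(range(4))
squared = data.map(lambda x: x * x)   # the lambda is shipped to and runs on executors
result = squared.collect()            # the action gathers the results back to the driver
print(result)                         # this print runs in the driver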

Closure

  • A task’s closure is those variables and methods which must be visible for the executor to perform its computations on the RDD.
    • Functions that run on RDDs at executors
    • Any global variables referenced by those functions
  • The variables within the closure sent to each executor are copies.
  • This closure is serialized and sent to each executor from the driver when an action is invoked.
counter = 0
rdd = sc.parallelize(range(10))

# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x

print(rdd.collect())
rdd.foreach(increment_counter)
print(f"Counter: {counter}")
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Counter: 0
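The idiomatic fix (a sketch) is to let Spark perform the aggregation with an action such as reduce(), or to use an accumulator as described below:

rdd = sc.parallelize(range(10))
total = rdd.reduce(lambda a, b: a + b)   # aggregation happens on the executors
print(total)                             # 45, printed in the driver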

Closure and persistence

# RDD variables are references
A = sc.parallelize(range(10))
B = A.map(lambda x: x*2)
A = B.map(lambda x: x+1)
A.take(10)
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
A = sc.parallelize(range(10))

x = 5
B = A.filter(lambda z: z < x)
# B.cache()
print(B.count())
x = 3
print(B.count())
5
3
A = sc.parallelize(range(10))

x = 5
B = A.filter(lambda z: z < x)
# B.cache()
B.unpersist()
# print(B.take(10))
print(B.collect())

x = 3
#print(B.take(10))
print(B.collect())
# collect() doesn't always re-collect data - bad design!
# Always use take() instead of collect()
[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4]

Accumulators

  • Accumulators are variables that are only “added” to through an associative and commutative operation.

  • Created from an initial value v by calling SparkContext.accumulator(v).

  • Tasks running on a cluster can then add to it using the add method or the += operator

  • Only the driver program can read the accumulator’s value, using its value method.


Note:

Accumulators do not change the lazy evaluation model of Spark. If they are being updated within an operation on an RDD, their value is only updated once that RDD is computed as part of an action. Consequently, accumulator updates are not guaranteed to be executed when made within a lazy transformation like map().

Updates made in transformations may be applied more than once if tasks or job stages are re-executed; see the example below.

Suggestion: Avoid using accumulators whenever possible. Use reduce() instead.

rdd = sc.parallelize(range(10))
accum = sc.accumulator(0)

def g(x):
    global accum
    accum += x

a = rdd.foreach(g)

print(f"Accumulator value: {accum.value}")
Accumulator value: 45
rdd = sc.parallelize(range(10))
accum = sc.accumulator(0)

def g(x):
    global accum
    accum += x
    return x * x

a = rdd.map(g)
print(f"Accumulator value: {accum.value}")
a.cache()
tmp = a.count()
print(f"Accumulator value: {accum.value}")
print(f"Reduce: {rdd.reduce(lambda x, y: x+y)}")
# If we comment out a.cache() above, the value will become 90, since
# the map (and thus the accumulator updates) will be executed twice
tmp = a.count()
print(f"Accumulator value: {accum.value}")
print(f"Reduce: {rdd.reduce(lambda x, y: x+y)}")
Accumulator value: 0
Accumulator value: 45
Reduce: 45
Accumulator value: 45
Reduce: 45

Broadcast variables

  • Broadcast variables allow the programmer to keep a read-only variable cached on each machine (not each task)

  • More efficient than shipping the variable to every task inside its closure

# Using join
products = sc.parallelize([(1, "Apple"), (2, "Orange"), (3, "TV"), (5, "Computer")])
trans = sc.parallelize([(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")), (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))])

print(trans.join(products).take(20))
[(1, ((134, 'OK'), 'Apple')), (1, ((135, 'OK'), 'Apple')), (1, ((45, 'OK'), 'Apple')), (5, ((162, 'Error'), 'Computer')), (2, ((53, 'OK'), 'Orange')), (3, ((34, 'OK'), 'TV'))]
# Using broadcast
products = {1: "Apple", 2: "Orange", 3: "TV", 5: "Computer"}
trans = sc.parallelize([(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")), (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))])

broadcasted_products = sc.broadcast(products)

results = trans.map(lambda x: (x[0], broadcasted_products.value[x[0]], x[1]))
#  results = trans.map(lambda x: (x[0], products[x[0]], x[1]))
print(results.take(20))
[(1, 'Apple', (134, 'OK')), (3, 'TV', (34, 'OK')), (5, 'Computer', (162, 'Error')), (1, 'Apple', (135, 'OK')), (2, 'Orange', (53, 'OK')), (1, 'Apple', (45, 'OK'))]

RDD Operations

Map

fruitsReversed = fruits.map(lambda fruit: fruit[::-1])
fruitsReversed.collect()
['elppa',
 'ananab',
 'nolem yranac',
 'parg',
 'nomel',
 'egnaro',
 'elppaenip',
 'yrrebwarts']

Filter

shortFruits = fruits.filter(lambda fruit: len(fruit) <= 5)
shortFruits.collect()
['apple', 'grap', 'lemon']

FlatMap

characters = fruits.flatMap(lambda fruit: list(fruit))
print(characters.collect())
['a', 'p', 'p', 'l', 'e', 'b', 'a', 'n', 'a', 'n', 'a', 'c', 'a', 'n', 'a', 'r', 'y', ' ', 'm', 'e', 'l', 'o', 'n', 'g', 'r', 'a', 'p', 'l', 'e', 'm', 'o', 'n', 'o', 'r', 'a', 'n', 'g', 'e', 'p', 'i', 'n', 'e', 'a', 'p', 'p', 'l', 'e', 's', 't', 'r', 'a', 'w', 'b', 'e', 'r', 'r', 'y']

Union

new_fruits = fruits.union(fruits)
new_fruits.collect()
['apple',
 'banana',
 'canary melon',
 'grap',
 'lemon',
 'orange',
 'pineapple',
 'strawberry',
 'apple',
 'banana',
 'canary melon',
 'grap',
 'lemon',
 'orange',
 'pineapple',
 'strawberry']

Intersection

new_fruits = fruits.intersection(fruits)
new_fruits.collect()
['orange',
 'pineapple',
 'canary melon',
 'lemon',
 'banana',
 'apple',
 'grap',
 'strawberry']

RDD Actions

collect
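For completeness, a minimal example (collect() returns the entire RDD to the driver as a Python list, so it should only be used on small datasets):

allFruits = fruits.collect()
print(allFruits)   # all 8 fruit strings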

take

first3Fruits = fruits.take(3)
print(first3Fruits)
['apple', 'banana', 'canary melon']

count

fruits.count()
8

reduce

fruits.map(lambda fruit: set(fruit)).reduce(lambda x, y: x.union(y))
{' ',
 'a',
 'b',
 'c',
 'e',
 'g',
 'i',
 'l',
 'm',
 'n',
 'o',
 'p',
 'r',
 's',
 't',
 'w',
 'y'}

PageRank
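The loop below implements the usual update rule with damping factor 0.85, where each page splits its rank evenly among its out-links:

\[rank(u) = 0.15 + 0.85 \sum_{v \rightarrow u} \frac{rank(v)}{outdegree(v)}\]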

import re
from operator import add

def computeContribs(urls, rank):
    # Calculates URL contributions to the rank of other URLs.
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)

def parseNeighbors(urls):
    # Parses a "url neighbor_url" pair string into a (url, neighbor) tuple.
    parts = urls.split(' ')
    return parts[0], parts[1]

# Loads in input file. It should be in format of:
#     URL         neighbor URL
#     URL         neighbor URL
#     URL         neighbor URL
#     ...

# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/*
lines = sc.textFile("/content/drive/MyDrive/courses/HKUST/MSBD5003/homeworks/hw2/pagerank_data.txt", 2)
# lines = sc.textFile("../data/dblp.in", 5)

numOfIterations = 10

# Loads all URLs from input file and initialize their neighbors. 
links = lines.map(lambda urls: parseNeighbors(urls)) \
             .groupByKey()

# Initializes the rank of every URL that has outgoing links to one.
ranks = links.mapValues(lambda neighbors: 1.0)
print('ranks', ranks.collect())
print('links', links.collect())
# Calculates and updates URL ranks continuously using PageRank algorithm.
for iteration in range(numOfIterations):
    # Calculates URL contributions to the rank of other URLs.
    contribs = links.join(ranks) \
                    .flatMap(lambda url_urls_rank:
                             computeContribs(url_urls_rank[1][0],
                                             url_urls_rank[1][1]))
    # After the join, each element in the RDD is of the form
    # (url, (list of neighbor urls, rank))

    # Re-calculates URL ranks based on neighbor contributions.
    # ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
    ranks = contribs.reduceByKey(add).map(lambda t: (t[0], t[1] * 0.85 + 0.15))

print(ranks.top(5, lambda x: x[1]))
ranks [('1', 1.0), ('4', 1.0), ('2', 1.0), ('3', 1.0)]
links [('1', <pyspark.resultiterable.ResultIterable object at 0x7f6ea9564710>), ('4', <pyspark.resultiterable.ResultIterable object at 0x7f6ea95647f0>), ('2', <pyspark.resultiterable.ResultIterable object at 0x7f6ea95647b8>), ('3', <pyspark.resultiterable.ResultIterable object at 0x7f6ea9564828>)]
[('1', 1.2981882732854677), ('4', 0.9999999999999998), ('3', 0.9999999999999998), ('2', 0.7018117267145316)]

K-means clustering

import numpy as np

def parseVector(line):
    return np.array([float(x) for x in line.split()])

def closestPoint(p, centers):
    bestIndex = 0
    closest = float("+inf")
    for i in range(len(centers)):
        tempDist = np.sum((p - centers[i]) ** 2)
        if tempDist < closest:
            closest = tempDist
            bestIndex = i
    return bestIndex

# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/kmeans_data.txt
lines = sc.textFile('/content/drive/MyDrive/courses/HKUST/MSBD5003/data/kmeans_data.txt', 5)  

# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/kmeans_bigdata.txt
# lines = sc.textFile('../data/kmeans_bigdata.txt', 5)  
# lines is an RDD of strings
K = 3
convergeDist = 0.01  
# terminate algorithm when the total distance from old center to new centers is less than this value

data = lines.map(parseVector).cache() # data is an RDD of arrays

kCenters = data.takeSample(False, K, 1)  # initial centers as a list of arrays
tempDist = 1.0  # total distance from old centers to new centers

while tempDist > convergeDist:
    closest = data.map(lambda p: (closestPoint(p, kCenters), (p, 1)))
    # for each point in data, find its closest center
    # closest is an RDD of tuples (index of closest center, (point, 1))

    pointStats = closest.reduceByKey(lambda p1, p2: (p1[0] + p2[0], p1[1] + p2[1]))
    # pointStats is an RDD of tuples (index of center,
    # (array of sums of coordinates, total number of points assigned))

    newCenters = pointStats.map(lambda st: (st[0], st[1][0] / st[1][1])).collect()
    # compute the new centers

    tempDist = sum(np.sum((kCenters[i] - p) ** 2) for (i, p) in newCenters)
    # compute the total distance from old centers to new centers

    for (i, p) in newCenters:
        kCenters[i] = p

print("Final centers: ", kCenters)
Final centers:  [array([0.1       , 0.33333333, 0.23333333]), array([9.05, 3.05, 4.65]), array([9.2, 2.2, 9.2])]
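As a small usage sketch (the query point here is made up for illustration), the closestPoint helper defined above can assign a new point to its nearest final center:

query = np.array([9.0, 3.0, 4.5])     # hypothetical new point
idx = closestPoint(query, kCenters)   # index of the nearest center
print(idx, kCenters[idx])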

Linear-time selection

Problem:

— Input: an array A of n numbers (unordered), and k

— Output: the k-th smallest number (counting from 0)

Algorithm

  1. \(x = A[0]\); partition \(A\) into \(A[0..mid-1] < A[mid] = x < A[mid+1..n-1]\)
  2. if \(mid = k\) then return \(x\)
  3. if \(k < mid\) then \(A = A[0..mid-1]\); if \(k > mid\) then \(A = A[mid+1..n-1]\) and \(k = k - mid - 1\)
  4. go to step 1
# Linear-time selection

data = [34, 67, 21, 56, 47, 89, 12, 44, 74, 43, 26]
A = sc.parallelize(data,2)
k = 4

while True:
    x = A.first()
    A1 = A.filter(lambda z: z < x)
    A2 = A.filter(lambda z: z > x)
    A1.cache()
    A2.cache()
    mid = A1.count()
    if mid == k:
        print(x)
        break

    if k < mid:
        A = A1
    else:
        A = A2
        k = k - mid - 1
43

Key-value Pairs

While most Spark operations work on RDDs containing any type of objects, a few special operations are only available on RDDs of key-value pairs. In Python, these operations work on RDDs containing built-in Python tuples such as (1, 2). Simply create such tuples and then call your desired operation. For example, the following code uses the reduceByKey operation on key-value pairs to count how many times each line of text occurs in a file:

lines = sc.textFile("README.md")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

We could also use counts.sortByKey(), for example, to sort the pairs alphabetically, and finally counts.collect() to bring them back to the driver program as a list of objects.
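A brief sketch of that last step (same README.md example as above):

sortedCounts = counts.sortByKey()        # sort the (line, count) pairs alphabetically
for line, n in sortedCounts.collect():   # bring them back to the driver as a list
    print(n, line)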

PMI

PMI (pointwise mutual information) is a measure of association used in information theory and statistics.

Given a list of pairs (x, y)

\[pmi(x, y) = \log\frac{p(x,y)}{p(x)\,p(y)}\]

where
  • \(p(x)\): probability of x
  • \(p(y)\): probability of y
  • \(p(x, y)\): joint probability of x and y

Example (using log base 2): p(x=0) = 0.8, p(x=1) = 0.2, p(y=0) = 0.25, p(y=1) = 0.75, with joint probabilities p(x=0,y=0) = 0.1, p(x=0,y=1) = 0.7, p(x=1,y=0) = 0.15, p(x=1,y=1) = 0.05

  • pmi(x=0;y=0) = −1
  • pmi(x=0;y=1) = 0.222392
  • pmi(x=1;y=0) = 1.584963
  • pmi(x=1;y=1) = -1.584963
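These values can be checked directly (a quick sketch using base-2 logarithms and the joint probabilities listed above):

from math import log2

print(log2(0.1 / (0.8 * 0.25)))    # pmi(x=0; y=0) = -1.0
print(log2(0.15 / (0.2 * 0.25)))   # pmi(x=1; y=0) = 1.584963...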
lines = sc.textFile('/content/drive/MyDrive/courses/HKUST/MSBD5003/data/adj_noun_pairs.txt', 4)
# Converting lines into word pairs. 
# Data is dirty: some lines have more than 2 words, so filter them out.
pairs = lines.map(lambda l: tuple(l.split())).filter(lambda p: len(p)==2)
pairs.cache()
pairs.take(5)
[('early', 'radical'),
 ('french', 'revolution'),
 ('pejorative', 'way'),
 ('violent', 'means'),
 ('positive', 'label')]
N = pairs.count()
N
3162674
# Compute the frequency of each pair.
# Ignore pairs that are not frequent enough (fewer than 100 occurrences)
pair_freqs = pairs.map(lambda p: (p,1)).reduceByKey(lambda f1, f2: f1 + f2) \
                  .filter(lambda pf: pf[1] >= 100)
pair_freqs.take(5)
[(('human', 'society'), 154),
 (('16th', 'century'), 950),
 (('first', 'man'), 166),
 (('civil', 'war'), 2236),
 (('social', 'class'), 155)]
# Computing the frequencies of the adjectives and the nouns
a_freqs = pairs.map(lambda p: (p[0],1)).reduceByKey(lambda x,y: x+y)
n_freqs = pairs.map(lambda p: (p[1],1)).reduceByKey(lambda x,y: x+y)
# Broadcasting the adjective and noun frequencies. 
#a_dict = a_freqs.collectAsMap()
#a_dict = sc.parallelize(a_dict).map(lambda x: x)
n_dict = sc.broadcast(n_freqs.collectAsMap())
a_dict = sc.broadcast(a_freqs.collectAsMap())
a_dict.value['violent']
1191
from math import *

# Computing the PMI for a pair.
def pmi_score(pair_freq):
    w1, w2 = pair_freq[0]
    f = pair_freq[1]
    pmi = log(float(f)*N/(a_dict.value[w1]*n_dict.value[w2]), 2)
    return pmi, (w1, w2)
# Computing the PMI for all pairs.
scored_pairs = pair_freqs.map(pmi_score)
# Printing the most strongly associated pairs. 
scored_pairs.top(10)
[(14.41018838546462, ('magna', 'carta')),
 (13.071365888694997, ('polish-lithuanian', 'Commonwealth')),
 (12.990597616733414, ('nitrous', 'oxide')),
 (12.64972604311254, ('latter-day', 'Saints')),
 (12.50658937509916, ('stainless', 'steel')),
 (12.482331020687814, ('pave', 'runway')),
 (12.19140721768055, ('corporal', 'punishment')),
 (12.183248694293388, ('capital', 'punishment')),
 (12.147015483562537, ('rush', 'yard')),
 (12.109945794428935, ('globular', 'cluster'))]