Internal
!pip install pyspark
Code at internal.ipynb
Data Partitioning¶
- RDDs are stored in partitions. When performing computations on RDDs, these partitions can be operated on in parallel.
- You get better parallelism when the partitions are balanced.
- When RDDs are first created, the partitions are balanced. However, partitions may get out of balance after certain transformations.
Hash Partitioning¶
We can view the contents of each partition:
- e.g., prime.glom().collect()[1][0:4]
- We see that it hashed all numbers x such that x mod 8 = 1 to partition #1

In general, hash partitioning allocates tuple (k, v) to partition p where
- p = k.hashCode() % numPartitions
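Hash partitioning usually works well, but beware of bad inputs: if many keys have hash codes that are congruent modulo numPartitions (for example, keys that are all multiples of numPartitions), they all land in the same partition.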
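The allocation rule can be tried out without a cluster. The sketch below is plain Python, not Spark code: `hash_partition` is a hypothetical helper, and it uses Python's built-in `hash` in place of `hashCode()` (for small non-negative integers, `hash(k) == k`). It shows how keys that are all multiples of the partition count end up in a single partition.

```python
from collections import defaultdict

def hash_partition(keys, num_partitions):
    """Assign each key to partition hash(key) % num_partitions (toy model)."""
    partitions = defaultdict(list)
    for k in keys:
        partitions[hash(k) % num_partitions].append(k)
    # Return one list per partition, including empty ones.
    return [partitions[p] for p in range(num_partitions)]

# Well-spread keys: consecutive integers fill all 8 partitions evenly.
balanced = hash_partition(range(80), 8)

# Bad input: every key is a multiple of 8, so all of them land in partition 0.
skewed = hash_partition(range(0, 640, 8), 8)

print([len(p) for p in balanced])
print([len(p) for p in skewed])
```

Both inputs contain 80 keys. Only the relationship between the keys and the number of partitions differs.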
Shuffle¶
Spark uses shuffles to implement wide dependencies.
- Examples: reduceByKey, repartition, coalesce, join (on RDDs not partitioned using the same partitioner)

Spark generates sets of tasks: map tasks to organize the data, and a set of reduce tasks to group/aggregate it.
Internally, Spark builds a hash table within each task to perform the grouping.
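If the hash table is too large, Spark will spill these tables to disk, incurring the additional overhead of disk I/O.
Spark automatically persists some intermediate data from shuffle operations (e.g., reduceByKey), even if persist is never called, so that a node failure during the shuffle does not force recomputation of the entire input.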
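The map/reduce split behind a shuffle can be illustrated with a small plain-Python model. This is a sketch under simplifying assumptions, not Spark's implementation: `map_task`, `reduce_task`, and `toy_reduce_by_key` are hypothetical names, everything stays in memory (no spilling to disk), and Python's `hash` stands in for the partitioner.

```python
from collections import defaultdict

def map_task(records, num_reducers):
    """Map side: bucket (key, value) records by target reduce partition."""
    buckets = defaultdict(list)
    for key, value in records:
        buckets[hash(key) % num_reducers].append((key, value))
    return buckets

def reduce_task(incoming, func):
    """Reduce side: build an in-memory hash table keyed by record key."""
    table = {}
    for key, value in incoming:
        table[key] = func(table[key], value) if key in table else value
    return table

def toy_reduce_by_key(input_partitions, func, num_reducers):
    # Run one map task per input partition.
    map_outputs = [map_task(part, num_reducers) for part in input_partitions]
    results = []
    for r in range(num_reducers):
        # Each reduce task fetches its bucket from every map output.
        incoming = [rec for out in map_outputs for rec in out.get(r, [])]
        results.append(reduce_task(incoming, func))
    return results

parts = [[(1, 10), (2, 20)], [(1, 5), (3, 7)], [(2, 1)]]
result = toy_reduce_by_key(parts, lambda a, b: a + b, num_reducers=2)
print(result)
```

Every reduce task reads from every map output, which is why a shuffle moves data across the whole cluster and is comparatively expensive.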
Range partitioning¶
Range partitioning applies to data types that have an ordering defined.
- Examples: Int, Char, String, …
- Internally, Spark samples the data so as to produce more balanced partitions.
- Used by default after sorting

Example:
- An RDD with keys [8, 96, 240, 400, 401, 800]
- Number of partitions: 4
- In this case, hash partitioning distributes the keys as follows among the partitions:
  - partition 0: [8, 96, 240, 400, 800]
  - partition 1: [401]
  - partition 2: []
  - partition 3: []

Range partitioning would improve the partitioning significantly: an even split into contiguous key ranges would put at most two of the six keys in each partition.
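The difference can be checked on the six keys from the example. The sketch below is a plain-Python toy model: `range_bounds` and `range_partition` are hypothetical helpers, and the bounds are computed from all keys rather than from a sample as Spark does, so the exact split Spark produces may differ.

```python
from bisect import bisect_left

def range_bounds(keys, num_partitions):
    """Pick num_partitions - 1 upper bounds from the sorted keys (toy model).

    Spark samples the data; here we sort all keys since the input is tiny.
    """
    ordered = sorted(keys)
    n = len(ordered)
    return [ordered[i * n // num_partitions] for i in range(1, num_partitions)]

def range_partition(keys, num_partitions):
    bounds = range_bounds(keys, num_partitions)
    partitions = [[] for _ in range(num_partitions)]
    for k in keys:
        # Keys <= bounds[0] go to partition 0, keys <= bounds[1] to 1, etc.
        partitions[bisect_left(bounds, k)].append(k)
    return partitions

keys = [8, 96, 240, 400, 401, 800]
# Hash partitioning for small non-negative ints reduces to k % 4.
hashed = [[k for k in keys if k % 4 == p] for p in range(4)]
ranged = range_partition(keys, 4)
print(hashed)
print(ranged)
```

With these bounds, every partition receives one or two keys, whereas hashing leaves two partitions empty.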
Partitioner inheritance¶
Operations on Pair RDDs that hold on to (and propagate) a partitioner:
- mapValues (if parent has a partitioner)
- flatMapValues (if parent has a partitioner)
- filter (if parent has a partitioner)
Partitioning Data Using Transformations¶
map loses the partitioner because it may change the keys. mapValues lets us apply map-style transformations to the values while leaving the keys, and therefore the partitioner, unchanged.
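The reasoning can be made concrete with a toy model. `ToyPairRDD` below is a hypothetical stand-in written in plain Python, not a Spark class. It only tracks a partitioner tag to show which operations may keep it and why.

```python
class ToyPairRDD:
    """Toy stand-in for a pair RDD: (key, value) pairs plus a partitioner tag."""

    def __init__(self, pairs, partitioner=None):
        self.pairs = list(pairs)
        self.partitioner = partitioner

    def map(self, f):
        # f may return a different key, so the old placement cannot be trusted.
        return ToyPairRDD([f(kv) for kv in self.pairs], partitioner=None)

    def map_values(self, f):
        # Keys are untouched, so the partitioner is still valid.
        return ToyPairRDD([(k, f(v)) for k, v in self.pairs], self.partitioner)

    def filter(self, pred):
        # Removing records never moves a key to another partition.
        return ToyPairRDD([kv for kv in self.pairs if pred(kv)], self.partitioner)

rdd = ToyPairRDD([(1, "a"), (2, "b")], partitioner="hash(8)")
via_map = rdd.map(lambda kv: (kv[0], kv[1].upper()))
via_map_values = rdd.map_values(str.upper)
print(via_map.partitioner, via_map_values.partitioner)
```

Both calls produce the same pairs, but only the `map_values` result still carries the partitioner, so a later key-based operation on it could skip a shuffle.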
Job Scheduling¶
- Job: a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action.
- Multiple parallel jobs can run simultaneously if they are submitted from separate threads.
- Spark’s scheduler runs jobs in FIFO fashion (default).
- Each job consists of multiple stages.
- A stage can only start after all its parent stages have completed.
- Each stage has many tasks. Spark assigns tasks to machines based on data locality.
- Different levels of locality are used:
  - PROCESS_LOCAL: data is in the same JVM as the running code
  - NODE_LOCAL: data is on the same node
  - RACK_LOCAL: data is on the same rack of servers
  - ANY: data is elsewhere on the network, not in the same rack
- Spark waits briefly (configurable via spark.locality.wait) for a free executor at the current locality level before switching to the next one.
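The rule that a stage starts only after all of its parent stages have completed amounts to processing the stage graph in topological order. The sketch below illustrates that ordering with Python's standard `graphlib` module (Python 3.9+). The stage names and dependencies are made up for illustration, and this is not how Spark's scheduler is implemented.

```python
from graphlib import TopologicalSorter

# Hypothetical job: stage -> set of parent stages it depends on.
stage_parents = {
    "stage0": set(),
    "stage1": set(),
    "stage2": {"stage0", "stage1"},  # e.g. a join of two shuffled inputs
    "stage3": {"stage2"},
}

sorter = TopologicalSorter(stage_parents)
sorter.prepare()

waves = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # stages whose parents are all done
    waves.append(ready)
    sorter.done(*ready)                 # mark the whole wave as completed

print(waves)
```

Stages in the same wave have no dependency on each other, so in this model they could run at the same time.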