Spark Streaming¶
- Supports real-time processing of streaming data
- Uses the Spark core for parallelization & fault tolerance
- Combines streaming with batch and interactive queries.
DStream: a continuous stream of data.
- DStreams can be created
  - from input data streams
  - by applying high-level operations on other DStreams.
- Any operation applied on a DStream translates to operations on the underlying RDDs
Steps of a Spark Streaming application¶
- Define the input sources by creating input DStreams.
- Define the streaming computations by applying transformation and output operations to DStreams.
- Start receiving data and processing it using streamingContext.start().
- Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
- The processing can be manually stopped using streamingContext.stop().
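Putting these steps together, here is a minimal sketch of a streaming word count. It assumes a local socket source fed on port 9999 (e.g. via nc -lk 9999); the application name and variable names are illustrative.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# At least 2 local cores: one for the receiver, one for processing.
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 5)                      # 5-second micro-batches

lines = ssc.socketTextStream("localhost", 9999)    # step 1: input DStream
counts = (lines.flatMap(lambda l: l.split(" "))    # step 2: transformations
               .map(lambda w: (w, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()                                    # step 2: output operation

ssc.start()                                        # step 3: start processing
ssc.awaitTermination()                             # step 4: wait (step 5: ssc.stop())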
Keep in mind¶
- Once a context has been started, no new streaming computations can be set up or added to it.
- Once a context has been stopped, it cannot be restarted.
- Only one StreamingContext can be active in a JVM at the same time.
- stop() on a StreamingContext also stops the SparkContext.
- To stop only the StreamingContext, set the optional stop() parameter stopSparkContext to False.
- A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.
- Interrupting the streaming program in the shell / Jupyter does not properly clean up the streaming tasks (even after running ssc.stop()). You will have to shut down the kernel and re-open the notebook.
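For example, to stop only the StreamingContext and then create a new one on the same SparkContext (a sketch, assuming sc and ssc from the example above):
ssc.stop(stopSparkContext=False)    # keeps the underlying SparkContext alive
ssc2 = StreamingContext(sc, 10)     # a new StreamingContext on the same SparkContext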
- Input DStreams are DStreams representing the stream of input data received from streaming sources.
- Every input DStream is associated with a Receiver.
  - Note: Each Receiver requires a core, so you need to allocate more cores than the number of input DStreams.
- Two categories of built-in streaming sources:
  - Basic sources: Sources directly available in the StreamingContext API. Examples: file systems, socket connections, RDD queues (sketched below).
  - Advanced sources: Other streaming systems like Kafka, Flume, Kinesis, etc.
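A sketch of two basic sources (the directory path and variable names are illustrative):
files = ssc.textFileStream("/path/to/dir")   # file-system source: picks up new files in the directory
# Queue of RDDs: handy for testing; each queued RDD is treated as one batch of data.
rdd_queue = [sc.parallelize(range(i * 10, (i + 1) * 10)) for i in range(5)]
queued = ssc.queueStream(rdd_queue)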
Transformations on DStreams¶
DStreams support many of the transformations available on normal Spark RDDs.
transform(): Invoke any RDD operation on a DStream.
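For example, transform() can apply per-batch RDD operations that DStreams do not expose directly, such as removing blacklisted keys from each batch (a sketch; the blacklist RDD is illustrative and counts is the pair DStream from the earlier sketch):
blacklist = sc.parallelize([("spam", True)])
# Apply an arbitrary RDD operation to every batch.
cleaned = counts.transform(lambda rdd: rdd.subtractByKey(blacklist))
cleaned.pprint()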
UpdateStateByKey¶
Allows you to maintain arbitrary state while continuously updating it with new information.
Steps:
1. Define the initial state: an RDD of (key, value) pairs.
2. Define the state update function: specify how to update the value using the previous state and the new values from the input stream.
- In every batch, Spark will apply the state update function for all existing keys, regardless of whether they have new data in that batch or not.
- If the update function returns None, then the key-value pair will be eliminated.
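A sketch of a running word count with updateStateByKey (stateful transformations require checkpointing; the checkpoint directory and names are illustrative, and lines is the input DStream from the earlier sketch):
ssc.checkpoint("/tmp/streaming-checkpoint")   # required for stateful transformations

def update_count(new_values, running_count):
    # new_values: values seen for this key in the current batch (possibly empty)
    # running_count: previous state, or None if the key is new
    return sum(new_values) + (running_count or 0)

pairs = lines.flatMap(lambda l: l.split(" ")).map(lambda w: (w, 1))
running_counts = pairs.updateStateByKey(update_count)
running_counts.pprint()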
Reservoir Sampling¶
from random import randint

def reservoir_sample(inputs):
    # Keep a uniform random sample of k items from a sequence scanned once.
    k = 10
    r = []
    # Fill the reservoir with the first k items.
    for i in range(k):
        r.append(inputs[i])
    # Keep each later item with probability k / (i + 1),
    # replacing a uniformly chosen slot in the reservoir.
    for i in range(k, len(inputs)):
        j = randint(0, i)
        if j < k:
            r[j] = inputs[i]
    return r
reservoir_sample(range(100))
[0, 25, 84, 46, 33, 82, 15, 48, 69, 38]
Misra-Gries Algorithm¶
class MisraGries:
    # Track up to `count` candidate heavy hitters over a stream of items.

    def __init__(self, count):
        self.count = count
        self.entries = {}

    def add(self, entry):
        # Increment the counter for this item (or create it).
        if entry in self.entries:
            self.entries[entry] = self.entries[entry] + 1
        else:
            self.entries[entry] = 1
        # If more than `count` items are tracked, decrement every counter
        # and drop the ones that reach zero.
        if len(self.entries) > self.count:
            for e in self.entries:
                self.entries[e] = self.entries[e] - 1
            self.entries = {k: v for k, v in self.entries.items() if v > 0}
        return self.entries

agl = MisraGries(3)
entries = "ABCEAAADFEFFFBBCCDFFF"
for c in entries:
    print(agl.add(c))
{'A': 1}
{'A': 1, 'B': 1}
{'A': 1, 'B': 1, 'C': 1}
{}
{'A': 1}
{'A': 2}
{'A': 3}
{'A': 3, 'D': 1}
{'A': 3, 'D': 1, 'F': 1}
{'A': 2}
{'A': 2, 'F': 1}
{'A': 2, 'F': 2}
{'A': 2, 'F': 3}
{'A': 2, 'F': 3, 'B': 1}
{'A': 2, 'F': 3, 'B': 2}
{'A': 1, 'F': 2, 'B': 1}
{'F': 1}
{'F': 1, 'D': 1}
{'F': 2, 'D': 1}
{'F': 3, 'D': 1}
{'F': 4, 'D': 1}
Structured Streaming¶
Structured Streaming = Spark Streaming on top of Spark SQL: a stream is treated as an unbounded table that is queried with the DataFrame/Dataset API.
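A minimal Structured Streaming sketch of the same word count, reading from a socket source on port 9999 (variable and column names are illustrative):
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession.builder.appName("StructuredWordCount").getOrCreate()

# The socket source exposes one string column named "value".
lines = (spark.readStream.format("socket")
         .option("host", "localhost").option("port", 9999).load())

words = lines.select(explode(split(lines.value, " ")).alias("word"))
counts = words.groupBy("word").count()

# Stream the running counts to the console.
query = counts.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()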
Flink¶
- DataStream: the central data structure in Flink; an unbounded, continuous stream of data.
- DataSet: a special case of stream processing where we have a finite data source. The batch application is also executed on the streaming runtime.
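A minimal PyFlink DataStream sketch (assumes the apache-flink Python package; a bounded collection source is used here, which Flink still executes on the streaming runtime):
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# A bounded (finite) source, still run as a stream.
ds = env.from_collection(["a", "b", "a", "c", "b", "a"])

counts = (ds.map(lambda w: (w, 1))
            .key_by(lambda kv: kv[0])
            .reduce(lambda a, b: (a[0], a[1] + b[1])))

counts.print()
env.execute("word_count")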
Stateful stream processing¶
Local: Flink state is kept local to the machine that processes it
Durable: Flink state is fault-tolerant, i.e., it is automatically checkpointed at regular intervals, and is restored upon failure
Vertically scalable: Flink state can be kept in embedded RocksDB instances that scale by adding more local disk
Horizontally scalable: Flink state is redistributed as your cluster grows and shrinks
Queryable: Flink state can be queried externally
Flink vs Spark¶