
Spark Streaming

  • Supports real-time processing of streaming data

  • Uses Spark Core for parallelization & fault tolerance

  • Combines streaming with batch and interactive queries.

DStream: a continuous stream of data.

  • DStreams can be created

    • from input data streams
    • by applying high-level operations on other DStreams.
  • Any operation applied on a DStream translates to operations on the underlying RDDs

Steps of a Spark Streaming application

  1. Define the input sources by creating input DStreams.
  2. Define the streaming computations by applying transformation and output operations to DStreams.
  3. Start receiving data and processing it using streamingContext.start().
  4. Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
  5. The processing can be manually stopped using streamingContext.stop().
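
A minimal sketch of these five steps, assuming a local cluster and a text stream on localhost:9999 (host, port, and batch interval are illustrative):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "NetworkWordCount")   # at least 2 cores: 1 receiver + 1 for processing
ssc = StreamingContext(sc, 1)                       # 1-second batch interval

# 1. Define the input source
lines = ssc.socketTextStream("localhost", 9999)

# 2. Define the streaming computation and an output operation
counts = (lines.flatMap(lambda line: line.split(" "))
               .map(lambda w: (w, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()

# 3. Start receiving and processing data
ssc.start()

# 4. Wait for the processing to stop (5. or call ssc.stop() manually)
ssc.awaitTermination()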

Keep in mind

  • Once a context has been started, no new streaming computations can be set up or added to it.

  • Once a context has been stopped, it cannot be restarted.

  • Only one StreamingContext can be active in a JVM at the same time.

  • stop() on StreamingContext also stops the SparkContext.

  • To stop only the StreamingContext, set the optional parameter of stop() called stopSparkContext to false.

  • A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.

  • Interrupting the streaming program in the shell / jupyter cannot properly clean up the streaming tasks (even after running ssc.stop). You will have to shut down the kernel and re-open the notebook.
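
A short sketch of the stop/re-use rules above (application name and batch interval are illustrative):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "lifecycle-demo")

ssc1 = StreamingContext(sc, 1)
# ... set up DStreams on ssc1, ssc1.start(), process ...
ssc1.stop(stopSparkContext=False)   # stop only the StreamingContext

ssc2 = StreamingContext(sc, 1)      # the SparkContext can be re-used
# ... set up DStreams on ssc2, ssc2.start(), process ...
ssc2.stop()                         # default: also stops the SparkContext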

  • Input DStreams are DStreams representing the stream of input data received from streaming sources.

  • Every input DStream (except file streams) is associated with a Receiver

    • Note: Each Receiver requires a core, so you need to allocate more cores than the number of input DStreams.
  • Two categories of built-in streaming sources:

  • Basic sources: Sources directly available in the StreamingContext API. Examples: file systems, socket connections, RDD queues.

  • Advanced sources: Other streaming systems like Kafka, Flume, Kinesis, etc.
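
A sketch of creating input DStreams from the basic sources, assuming an existing StreamingContext ssc and SparkContext sc (the directory path, host, and port are placeholders):

file_stream = ssc.textFileStream("hdfs:///some/input/dir")     # file system
socket_stream = ssc.socketTextStream("localhost", 9999)        # socket connection
rdd_queue = [sc.parallelize(range(i * 10, (i + 1) * 10)) for i in range(5)]
queue_stream = ssc.queueStream(rdd_queue)                      # RDD queue (handy for testing)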

Transformations on DStreams

DStreams support many of the transformations available on normal Spark RDDs.

transform(): Invoke any RDD operation on a DStream
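
A sketch of transform(), assuming a DStream of words called words, an existing SparkContext sc, and a static blacklist RDD (names and contents are illustrative):

blacklist = sc.parallelize([("spam", True), ("ads", True)])

def drop_blacklisted(rdd):
    # Arbitrary RDD-to-RDD operations, e.g. a join with a static RDD,
    # are applied to every batch of the DStream.
    return (rdd.map(lambda w: (w, 1))
               .leftOuterJoin(blacklist)
               .filter(lambda kv: kv[1][1] is None)
               .map(lambda kv: kv[0]))

clean_words = words.transform(drop_blacklisted)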

UpdateStateByKey

Allows you to maintain arbitrary state while continuously updating it with new information. Steps:

  1. Define the initial state: an RDD of (key, value) pairs.
  2. Define the state update function: specify how to update a value using the previous value and the new values from the input stream.

  • In every batch, Spark will apply the state update function for all existing keys, regardless of whether they have new data in a batch or not
    • If the update function returns None then the key-value pair will be eliminated.
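
A sketch of a running word count with updateStateByKey, assuming a DStream of (word, 1) pairs called pairs; the checkpoint directory is a placeholder (checkpointing is required for stateful transformations):

ssc.checkpoint("checkpoint_dir")

def update_count(new_values, previous):
    # new_values: values for this key in the current batch (possibly empty)
    # previous: the previous state, or None if the key is new
    return (previous or 0) + sum(new_values)   # returning None would drop the key

running_counts = pairs.updateStateByKey(update_count)
running_counts.pprint()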

Reservoir Sampling

from random import randint

def reservoir_sample(inputs):
    # Keep a uniform random sample of k items from `inputs` in a single pass.
    k = 10
    r = []
    # Fill the reservoir with the first k items.
    for i in range(k):
        r.append(inputs[i])

    # Each later item replaces a uniformly chosen reservoir slot
    # with probability k / (i + 1).
    for i in range(k, len(inputs)):
        j = randint(0, i)
        if j < k:
            r[j] = inputs[i]
    return r
reservoir_sample(range(100))
[0, 25, 84, 46, 33, 82, 15, 48, 69, 38]

Misra-Gries Algorithm

class MisraGries:
    # Approximate heavy hitters: keep at most `count` candidate counters.
    def __init__(self, count):
        self.count = count
        self.entries = {}

    def add(self, entry):
        if entry in self.entries:
            # Known candidate: increment its counter.
            self.entries[entry] = self.entries[entry] + 1
        else:
            # New candidate: insert it, and if the counter budget is exceeded,
            # decrement every counter and drop those that reach zero.
            self.entries[entry] = 1
            if len(self.entries) > self.count:
                for e in self.entries:
                    self.entries[e] = self.entries[e] - 1
                new_entries = {}
                for k, v in self.entries.items():
                    if v > 0:
                        new_entries[k] = v
                self.entries = new_entries
        return self.entries
agl = MisraGries(3)
entries = "ABCEAAADFEFFFBBCCDFFF"

for c in entries:
    print(agl.add(c))
{'A': 1}
{'A': 1, 'B': 1}
{'A': 1, 'B': 1, 'C': 1}
{}
{'A': 1}
{'A': 2}
{'A': 3}
{'A': 3, 'D': 1}
{'A': 3, 'D': 1, 'F': 1}
{'A': 2}
{'A': 2, 'F': 1}
{'A': 2, 'F': 2}
{'A': 2, 'F': 3}
{'A': 2, 'F': 3, 'B': 1}
{'A': 2, 'F': 3, 'B': 2}
{'A': 1, 'F': 2, 'B': 1}
{'F': 1}
{'F': 1, 'D': 1}
{'F': 2, 'D': 1}
{'F': 3, 'D': 1}
{'F': 4, 'D': 1}

Structured Streaming

Structured Streaming = stream processing on top of the Spark SQL engine: a stream is treated as an unbounded table and queried with the DataFrame/Dataset API.
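
A minimal Structured Streaming sketch of the same word count, assuming a socket source on localhost:9999 (host and port are illustrative):

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession.builder.appName("StructuredWordCount").getOrCreate()

lines = (spark.readStream
              .format("socket")
              .option("host", "localhost")
              .option("port", 9999)
              .load())

words = lines.select(explode(split(lines.value, " ")).alias("word"))
counts = words.groupBy("word").count()

query = (counts.writeStream
               .outputMode("complete")
               .format("console")
               .start())
query.awaitTermination()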

Flink

  • DataStream: the central data structure in Flink: an unbounded, continuous stream of data.

  • DataSet: a special case of stream processing with a finite data source; the batch application is also executed on the streaming runtime.
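
A minimal PyFlink sketch of a DataStream over a bounded collection (still executed on the streaming runtime); assumes the pyflink package is installed, and the element values and job name are illustrative:

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# A finite collection becomes a (bounded) DataStream and runs on the streaming runtime.
ds = env.from_collection([1, 2, 3, 4, 5])
ds.map(lambda x: x * 2).print()

env.execute("bounded-datastream-demo")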

Stateful stream processing

Local: Flink state is kept local to the machine that processes it

Durable: Flink state is fault-tolerant, i.e., it is automatically checkpointed at regular intervals, and is restored upon failure

Vertically scalable: Flink state can be kept in embedded RocksDB instances that scale by adding more local disk

Horizontally scalable: Flink state is redistributed as your cluster grows and shrinks

Queryable: Flink state can be queried externally
