
Spark Performance Tuning Techniques

Uploaded by

riponmahmud8088
Copyright
© All Rights Reserved

Spark Performance Tuning, Stream Processing Fundamentals, Event-Time and Stateful
Processing - Event Time, Stateful Processing, Windows on Event Time - Tumbling
Windows, Handling Late Data with Watermarks, Dropping Duplicates in a Stream,
Structured Streaming Basics - Core Concepts, Structured Streaming in Action,
Transformations on Streams, Input and Output.

Spark Performance Tuning

Spark Performance tuning is a process to improve the performance of the Spark and PySpark
applications by adjusting and optimizing system resources (CPU cores and memory), tuning
some configurations, and following some framework guidelines and best practices.

Spark application performance can be improved in several ways. The guidelines and best
practices below usually improve the performance of an application, and most of them apply
equally to Spark with Scala and to PySpark (Python).
Spark Performance Tuning – Best Guidelines & Practices

Spark performance tuning and optimization is a large topic that spans several techniques and
configurations (resources such as memory and cores). The main guidelines covered here are:
• Use DataFrame/Dataset over RDD
• Use coalesce() over repartition()
• Use mapPartitions() over map()
• Use serialized data formats
• Avoid UDFs (User Defined Functions)
• Cache data in memory
• Reduce expensive shuffle operations
• Disable DEBUG & INFO logging

1. Use DataFrame/Dataset over RDD

For Spark jobs, prefer Dataset/DataFrame over RDD, because Datasets and DataFrames include
several optimization modules that improve the performance of Spark workloads. In PySpark, use
DataFrame over RDD, since Datasets are not supported in PySpark applications.
The RDD is the building block of Spark programming: even when you use DataFrame/Dataset,
Spark internally uses RDDs to execute operations/queries, but it does so in an efficient and
optimized way, by analyzing your query and creating an execution plan.
Why are RDDs slow?

Using RDDs directly leads to performance issues because Spark cannot apply its optimization
techniques to them, and RDDs serialize and deserialize the data whenever it is distributed
across the cluster (repartition & shuffling).
Serialization and deserialization are very expensive operations for Spark applications, as for
any distributed system; a large share of the run time can be spent serializing data rather than
executing the operations, so try to avoid using RDDs.
Is DataFrame Faster?

Since a Spark DataFrame maintains the structure of the data and the column types (like an RDBMS
table), Spark can store and manage the data more efficiently.
The DataFrame API does two things that help with this. First, it can store data off-heap in a
binary format. Second, it generates encoder code on the fly to work with this binary format for
your specific objects.
Because Spark/PySpark DataFrames store data internally in this binary format, distributing the
data across a cluster avoids most of the cost of Java serialization and deserialization, so you
would see a performance improvement.
Spark Dataset/DataFrame includes Project Tungsten, which optimizes Spark jobs for memory and
CPU efficiency. Tungsten is a Spark SQL component that improves performance by generating
bytecode for Spark operations at run time and by working close to the bare metal to use CPU and
memory efficiently.
Since a DataFrame is a columnar structure that carries additional metadata (its schema), Spark
can perform certain optimizations on a query. Before your query is run, the Catalyst Optimizer
creates and optimizes a logical plan, which is then executed by the Tungsten execution engine.
What is Catalyst?

The Catalyst Optimizer is the query optimizer integrated into Spark SQL and used for
Datasets/DataFrames. Catalyst is where Spark improves the speed of your code execution by
logically improving the query.
Catalyst can refactor complex queries and decide the order of your query execution by applying
rule-based and cost-based optimization.
Additionally, if you want type safety at compile time, prefer Dataset. For example, if you refer
to a field that doesn't exist (when fields are accessed through typed lambdas), a Dataset
generates a compile-time error, whereas a DataFrame compiles fine but returns an error at run time.
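To see what Catalyst does with a query, you can ask Spark to print the plans it produces. The following is a minimal sketch that assumes a local SparkSession and a hypothetical Employee case class with made-up rows: explain(true) prints the parsed, analyzed and optimized logical plans plus the physical plan, and the typed filter at the end is the kind of code where a Dataset rejects a misspelled field at compile time.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical record type used only for this illustration.
final case class Employee(name: String, state: String, salary: Int)

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("catalyst-explain-sketch")
  .getOrCreate()
import spark.implicits._

val employees = Seq(
  Employee("James", "NY", 3000),
  Employee("Maria", "PR", 4000),
  Employee("Robert", "PR", 4500)
).toDS()

// Column expressions are visible to Catalyst, so it can optimize this query.
val prEmployees = employees.filter($"state" === "PR").select($"name", $"salary")

// Prints the parsed, analyzed and optimized logical plans and the physical plan.
prEmployees.explain(true)
val prCount = prEmployees.count()

// Typed access: a misspelled field such as e.salry fails at compile time,
// whereas $"salry" would only fail at run time, during analysis.
val totalPrSalary = employees
  .filter(e => e.state == "PR")
  .map(e => e.salary.toLong)
  .collect()
  .sum

spark.stop()
```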
2. Use coalesce() over repartition()

When you want to reduce the number of partitions, prefer coalesce(): it merges existing
partitions instead of performing a full shuffle, so less data moves across partitions than with
repartition(), which usually performs better on bigger datasets.
Note: use repartition() when you want to increase the number of partitions; coalesce() cannot
increase it.
Example repartition()

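// 20 elements (0 to 19) split into 6 partitions
val rdd1 = spark.sparkContext.parallelize(Range(0, 20), 6)
println("parallelize : " + rdd1.partitions.size)
// full shuffle into 4 partitions
val rdd2 = rdd1.repartition(4)
println("Repartition size : " + rdd2.partitions.size)
rdd2.saveAsTextFile("/tmp/re-partition")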
This prints parallelize : 6 and Repartition size : 4. repartition() redistributes the data from
all partitions (as shown below), which is a full shuffle and becomes very expensive when dealing
with billions or trillions of rows. Tuning the number of partitions to an optimal value can
improve the performance of the Spark application.

Partition 1 : 1 6 10 15 19
Partition 2 : 2 3 7 11 16
Partition 3 : 4 8 12 13 17
Partition 4 : 0 5 9 14 18
Example coalesce()

// merge the 6 partitions into 4 without a full shuffle
val rdd3 = rdd1.coalesce(4)
println("Coalesce size : " + rdd3.partitions.size)
rdd3.saveAsTextFile("/tmp/coalesce")
Compared with the six partitions created by parallelize() ([0 1 2], [3 4 5], [6 7 8 9], [10 11 12],
[13 14 15], [16 17 18 19]), the output below shows that partition 3 has been merged into partition 2
and partition 6 into partition 5, so data moved from just 2 partitions.

Partition 1 : 0 1 2
Partition 2 : 3 4 5 6 7 8 9
Partition 4 : 10 11 12
Partition 5 : 13 14 15 16 17 18 19
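The same comparison can be made with the DataFrame API, where the physical plan makes the difference visible. This is a sketch assuming a local SparkSession: spark.range builds a 20-row DataFrame with 6 partitions, explain() should show an Exchange (shuffle) node for repartition() and a Coalesce node for coalesce(), and the partition counts confirm that only repartition() can increase the number of partitions.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("coalesce-vs-repartition-sketch")
  .getOrCreate()

// 20 rows (ids 0 to 19) spread over 6 partitions.
val df = spark.range(0, 20, 1, 6).toDF("id")

// repartition(4): full shuffle, rows are redistributed round-robin.
val repartitioned = df.repartition(4)
// coalesce(4): merges existing partitions, no full shuffle.
val coalesced = df.coalesce(4)
// Only repartition() can raise the partition count; coalesce() keeps at most 6 here.
val increased = df.repartition(12)
val notIncreased = df.coalesce(12)

repartitioned.explain() // physical plan contains an Exchange node
coalesced.explain()     // physical plan contains a Coalesce node instead

val sourceParts = df.rdd.getNumPartitions
val repartitionedParts = repartitioned.rdd.getNumPartitions
val coalescedParts = coalesced.rdd.getNumPartitions
val increasedParts = increased.rdd.getNumPartitions
val notIncreasedParts = notIncreased.rdd.getNumPartitions

println(s"source=$sourceParts repartition(4)=$repartitionedParts coalesce(4)=$coalescedParts")
println(s"repartition(12)=$increasedParts coalesce(12)=$notIncreasedParts")

spark.stop()
```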
3. Use mapPartitions() over map()

Spark map() applies a function to each element/record/row of the DataFrame/Dataset and returns
a new DataFrame/Dataset, while mapPartitions() applies a function to the iterator of rows of each
partition. mapPartitions() provides a performance improvement over map() when you have heavy
initializations, like initializing classes, database connections, etc.
With mapPartitions(), a heavy initialization (for example, a database connection) happens once
per partition instead of once per DataFrame row. This helps the performance of Spark jobs when
you are dealing with heavyweight initialization on larger datasets.
Example map()

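import spark.implicits._
// df: input DataFrame with name parts in columns 0-2, id in column 3, salary in column 5.
// Util is a user-defined helper class (not shown); combine() is a hypothetical method.
val df3 = df.map(row => {
  val util = new Util() // initialization happens for every record
  val fullName = util.combine(row.getString(0), row.getString(1), row.getString(2))
  (fullName, row.getString(3), row.getInt(5))
})
val df3Map = df3.toDF("fullName", "id", "salary")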
Example mapPartitions()

val df4 = df.mapPartitions(iterator => {
  val util = new Util() // initialization happens once per partition
  val res = iterator.map(row => {
    val fullName = util.combine(row.getString(0), row.getString(1), row.getString(2))
    (fullName, row.getString(3), row.getInt(5))
  })
  res
})
val df4part = df4.toDF("fullName", "id", "salary")
Note: one key point to remember is that both transformations return a Dataset[U], not a
DataFrame (since Spark 2.0, DataFrame = Dataset[Row]).
4. Use serialized data formats

Many Spark jobs run as a pipeline where one Spark job writes data into a file, and another Spark
job reads the data, processes it, and writes to another file for the next Spark job to pick up.
In such use cases, prefer writing intermediate files in serialized and optimized formats like
Avro or Parquet; transformations on these formats perform better than on text, CSV, and JSON.
What is Parquet?

Apache Parquet is a columnar file format that provides optimizations to speed up queries and
is a far more efficient file format than CSV or JSON, supported by many data processing
systems.
It is compatible with most of the data processing frameworks in the Hadoop ecosystem. It
provides efficient data compression and encoding schemes with enhanced performance to handle
complex data in bulk.
val parquetPath = "/tmp/output/example.parquet" // example path
df.write.parquet(parquetPath)                   // write a Parquet file
val parquetDf = spark.read.parquet(parquetPath) // read the Parquet file back

What is Avro?

Apache Avro is an open-source, row-based data serialization and data exchange framework for
Hadoop projects. Spark reads and writes Avro through the spark-avro module, originally developed
by Databricks as an open-source library. Avro is widely used with Apache Spark, especially in
Kafka-based data pipelines. When Avro data is stored in a file, its schema is stored with it, so
the files can be processed later by any program.
It was built to serialize and exchange big data between different Hadoop-based projects. It
serializes data in a compact binary format, and its schema, written in JSON, defines the field
names and data types.
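// needs the spark-avro module on the classpath, e.g. --packages org.apache.spark:spark-avro_<scala version>:<spark version>
val avroPath = "/tmp/output/example.avro" // example path
df.write.format("avro").save(avroPath)                // write an Avro file
val avroDf = spark.read.format("avro").load(avroPath) // read Avro files
// Avro with Spark SQL
spark.sql(s"""CREATE TEMPORARY VIEW PERSON USING avro OPTIONS (path "$avroPath")""")
spark.sql("SELECT * FROM PERSON").show()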
5. Avoid UDFs (User Defined Functions)

Avoid Spark/PySpark UDFs whenever you can, and use them only when no existing Spark built-in
function fits. UDFs are a black box to Spark, so it cannot apply its optimizations to them, and
you lose the optimizations Spark performs on DataFrames/Datasets. When possible, use Spark SQL
built-in functions, which Catalyst can optimize.
Before you create any UDF, check whether a similar function is already available in Spark SQL
Functions. Spark SQL provides many predefined common functions, and new ones are added with
every release, so it is best to check before you reinvent the wheel.
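As an illustration, here is the same transformation written once as a UDF and once with built-in functions. It is a sketch assuming a local SparkSession and made-up name data; comparing the two explain() outputs shows the UDF as an opaque call, while the built-in version is made of upper() and concat_ws() expressions that Catalyst can reason about.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, concat_ws, udf, upper}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("udf-vs-builtin-sketch")
  .getOrCreate()
import spark.implicits._

val people = Seq(("james", "smith"), ("maria", "jones")).toDF("first", "last")

// UDF: Catalyst cannot look inside this function, so it cannot optimize it,
// and each value is converted to a JVM object and back.
val fullNameUdf = udf((first: String, last: String) => s"$first $last".toUpperCase)
val viaUdf = people.select(fullNameUdf(col("first"), col("last")).as("fullName"))

// Built-in functions: same result, expressed as expressions Catalyst understands.
val viaBuiltins = people.select(upper(concat_ws(" ", col("first"), col("last"))).as("fullName"))

viaUdf.explain()      // plan shows an opaque UDF call
viaBuiltins.explain() // plan shows upper(concat_ws(...))

val udfResult = viaUdf.as[String].collect().sorted.toSeq
val builtinResult = viaBuiltins.as[String].collect().sorted.toSeq

spark.stop()
```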
6. Persisting & Caching data in memory

Spark persisting/caching is one of the best techniques to improve the performance of the
Spark workloads. Spark Cache and Persist are optimization techniques in DataFrame /
Dataset for iterative and interactive Spark applications to improve the performance of Jobs.
Using cache() and persist() methods, Spark provides an optimization mechanism to store the
intermediate computation of a Spark DataFrame so they can be reused in subsequent actions.
When you persist a dataset, each node stores its partitioned data in memory and reuses it in
other actions on that dataset. Spark's persisted data on nodes is also fault-tolerant: if any
partition of a Dataset is lost, it is automatically recomputed using the original transformations
that created it.
df.where(col("State") === "PR").cache()
When caching, Spark SQL stores the data in an in-memory columnar format; tuning its compression
and batchSize properties can also improve Spark performance.
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", true)
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", 10000)
Spark provides several storage levels for cached data; use the one that suits your cluster.
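A minimal sketch of choosing a storage level explicitly, assuming a local SparkSession and the same kind of State column used above (the rows are made up). It persists the filtered DataFrame, runs two actions that reuse it, records the storage level, and releases the cache with unpersist().

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("persist-sketch")
  .getOrCreate()
import spark.implicits._

val df = Seq(("James", "NY"), ("Maria", "PR"), ("Robert", "PR")).toDF("name", "State")

// For a DataFrame, cache() is shorthand for persist(StorageLevel.MEMORY_AND_DISK).
// MEMORY_ONLY recomputes partitions that do not fit; DISK_ONLY keeps them on disk only.
val prDf = df.where(col("State") === "PR").persist(StorageLevel.MEMORY_ONLY)

// persist() is lazy: the first action fills the cache, later actions reuse it.
val prCount = prDf.count()
val prNames = prDf.select("name").as[String].collect().sorted.toSeq
val levelWhileCached = prDf.storageLevel

// Free the cached blocks once the data is no longer needed.
prDf.unpersist()

spark.stop()
```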
7. Reduce expensive Shuffle operations

Shuffling is the mechanism Spark uses to redistribute data across executors and even across
machines. Spark triggers a shuffle when you perform certain transformations, like groupByKey(),
reduceByKey(), and join() on RDDs, or groupBy() and join() on DataFrames.
Spark shuffle is an expensive operation since it involves:
• Disk I/O
• Data serialization and deserialization
• Network I/O
We cannot completely avoid shuffle operations, but when possible, reduce the number of
shuffles and remove any unused operations.
Spark provides the spark.sql.shuffle.partitions configuration (default 200) to control the
number of shuffle partitions; tuning this property can improve Spark performance.
spark.conf.set("spark.sql.shuffle.partitions", 100)
sqlContext.setConf("spark.sql.shuffle.partitions", "100") // older versions
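One common way to shrink a shuffle on RDDs is to aggregate before the data leaves each partition. In this sketch, which assumes a local SparkSession and a few made-up words, reduceByKey() combines values on the map side so only partial sums cross the network, while groupByKey() shuffles every single (word, 1) pair before summing. The last lines set spark.sql.shuffle.partitions at run time, as described above.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("shuffle-reduction-sketch")
  .getOrCreate()
val sc = spark.sparkContext

val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"), 3).map(w => (w, 1))

// groupByKey: every (word, 1) pair is shuffled, then summed on the reduce side.
val viaGroup = words.groupByKey().mapValues(_.sum).collect().toMap

// reduceByKey: partial sums are computed inside each partition first
// (map-side combine), so less data crosses the shuffle.
val viaReduce = words.reduceByKey(_ + _).collect().toMap

// For DataFrame shuffles, a smaller value avoids many tiny tasks on small data.
spark.conf.set("spark.sql.shuffle.partitions", 8)
val shufflePartitions = spark.conf.get("spark.sql.shuffle.partitions")

spark.stop()
```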
8. Disable DEBUG & INFO Logging

This is one of the simplest ways to improve the performance of Spark jobs, and the problem can
easily be avoided by following good coding principles. During the development phase of a
Spark/PySpark application, we usually write debug/info messages to the console using println()
and log to a file using a logging framework (log4j).
Both methods result in I/O operations and hence cause performance issues when you run Spark
jobs with greater workloads. Before promoting your jobs to production, review your code and take
care of the following.
Remove or convert all println() statements to log4j info/debug.
logger.debug("Debug logging messages")
logger.info("Info logging messages")
Disable DEBUG/INFO by enabling ERROR/WARN/FATAL logging
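If you are using log4j.properties, use the following, or use the appropriate configuration for
your logging framework and configuration method (XML vs properties vs YAML).
log4j.rootLogger=warn, stdout
Spark 3.3 and later use Log4j 2, where the equivalent line in log4j2.properties is
rootLogger.level = warn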
I've personally seen this in a project where our team wrote 5 log statements in a map()
transformation; processing 2 million records resulted in 10 million I/O operations and kept the
job running for hours. After disabling DEBUG & INFO logging, the jobs ran in a few minutes.
Note: Spark workloads are increasingly bottlenecked by CPU and memory use rather than by I/O and
network, but avoiding unnecessary I/O operations is still good practice.
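When you cannot change the logging configuration file, for example in a notebook or shell, SparkContext.setLogLevel overrides the configured level from inside the application. A minimal sketch, assuming a local SparkSession; the properties-file setting above remains the durable option for production jobs.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("log-level-sketch")
  .getOrCreate()

// Overrides the configured log level at run time.
// Valid levels: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN.
spark.sparkContext.setLogLevel("WARN")

// This job now runs without the INFO messages Spark would otherwise print.
val total = spark.range(0, 1000000).selectExpr("sum(id)").first().getLong(0)

spark.stop()
```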

Spark performance tuning refers to the process of adjusting the settings for the memory, cores,
and instances used by the system. Done well, it keeps Spark performing well and prevents
resource bottlenecks.
What is Data Serialization?

To reduce memory usage, you might have to store Spark RDDs in serialized form. Data
serialization also plays a big role in network performance. You can obtain good Spark
performance by:
• Terminating jobs that run too long.
• Ensuring that jobs run on the right execution engine.
• Using all resources efficiently.
• Improving the system's performance time.
Spark supports two serialization libraries:
• Java serialization
• Kryo serialization

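Switching to Kryo is a configuration change. The sketch below only builds a SparkConf (no cluster is started) and uses two hypothetical case classes, Reading and Sensor, to show class registration; registered classes let Kryo write a small numeric ID instead of the full class name with every object. DataFrames/Datasets use Tungsten's binary format internally, so this setting mainly affects RDD-based code and shuffled RDD data.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

// Hypothetical classes, used only to show registration.
final case class Reading(sensorId: Long, value: Double)
final case class Sensor(id: Long, name: String)

val conf = new SparkConf()
  .setAppName("kryo-sketch")
  // Use Kryo instead of Java serialization for RDD data, shuffles and serialized caching.
  .set("spark.serializer", classOf[KryoSerializer].getName)
  // Registered classes are written as small numeric IDs rather than full class names.
  .registerKryoClasses(Array(classOf[Reading], classOf[Sensor]))

val serializerSetting = conf.get("spark.serializer")
val registeredClasses = conf.get("spark.kryo.classesToRegister").split(",").toSeq
```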
What is Memory Tuning?


While tuning memory usage, three aspects stand out:
• The amount of memory used by your objects: you may want the entire dataset to fit in memory.
• The overhead of garbage collection, which matters when there is a high turnover of objects.
• The cost of accessing those objects.

What is Data Structure Tuning?


One way to reduce memory consumption is to avoid Java features that add overhead. Here are a
few ways to do this:
• If you have less than 32 GB of RAM, set the JVM flag -XX:+UseCompressedOops to make pointers
four bytes instead of eight.
• Avoid nested structures with many small objects and pointers.
• Use numeric IDs or enumeration objects instead of strings for keys.

What is Garbage Collection Tuning?


JVM garbage collection can become a problem when a program has a large “churn” of stored RDDs:
to make room for new objects, Java has to evict old ones, tracing through all objects to find the
unused ones. Because this cost grows with the number of Java objects, data structures with fewer
objects greatly reduce it; for example, use an array of Ints instead of a linked list.
Alternatively, persist objects in serialized form, so there is only a single object (a byte array)
for each RDD partition.
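The JVM flags mentioned above reach the executors through spark.executor.extraJavaOptions. A minimal sketch that only builds a SparkConf: the flag set (G1, compressed oops, GC logging) is an example rather than a recommendation, and on modern 64-bit JVMs G1 and compressed oops are often already the defaults for heaps under 32 GB. The last line shows the serialized storage level to pass to rdd.persist(...) when you want one byte array per partition.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("gc-tuning-sketch")
  // Example executor JVM flags: G1 collector, 4-byte compressed pointers,
  // and GC logging so collection time can be measured before and after a change.
  .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC -XX:+UseCompressedOops -verbose:gc")

// Serialized storage level: each cached RDD partition is kept as one byte array.
val serializedLevel = StorageLevel.MEMORY_ONLY_SER
```

In client mode, driver JVM flags have to be passed with spark-submit's --driver-java-options instead, because the driver JVM is already running when the application builds its configuration.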

What is Memory Management?


An efficient memory use is essential to good performance. Spark uses memory mainly for
storage and execution. Storage memory is used to cache data that will be reused later. On the
other hand, execution memory is used for computation in shuffles, sorts, joins, and aggregations.
Memory contention poses three challenges for Apache Spark:
• How to arbitrate memory between execution and storage?
• How to arbitrate memory across tasks running simultaneously?
• How to arbitrate memory across operators running within the same task?
Instead of statically reserving memory in advance, Spark deals with memory contention when it
arises by forcing memory consumers to spill to disk.
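The split between execution and storage memory is controlled by two settings of Spark's unified memory manager. The sketch below builds a SparkConf with an example 4 GB executor heap and the default fractions, then works out the approximate region sizes. Spark actually uses the JVM's reported maximum heap, which is a little smaller than the configured value, so the numbers are estimates.

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("memory-management-sketch")
  .set("spark.executor.memory", "4g") // example heap size
  // Fraction of (heap - 300 MB reserved) shared by execution and storage (default 0.6).
  .set("spark.memory.fraction", "0.6")
  // Part of that region where cached blocks are immune to eviction (default 0.5).
  .set("spark.memory.storageFraction", "0.5")

// Approximate region sizes for a 4096 MB heap.
val heapMb = 4096.0
val reservedMb = 300.0
val unifiedMb = (heapMb - reservedMb) * conf.get("spark.memory.fraction").toDouble
val protectedStorageMb = unifiedMb * conf.get("spark.memory.storageFraction").toDouble

println(f"execution + storage: $unifiedMb%.1f MB, eviction-protected storage: $protectedStorageMb%.1f MB")
```

With these values the shared region is about (4096 - 300) × 0.6 ≈ 2277.6 MB. When execution needs more memory it can evict cached blocks, but only down to the protected share (≈ 1138.8 MB here); storage can never evict execution memory.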

Stream Processing Fundamentals

Apache Spark Streaming is the previous generation of Apache Spark’s streaming engine. It no
longer receives updates and is a legacy project. Apache Spark has a newer, easier-to-use
streaming engine called Structured Streaming, which you should use for your streaming
applications and pipelines.

What is Spark Streaming?


Apache Spark Streaming is a scalable, fault-tolerant stream processing system that natively
supports both batch and streaming workloads. Spark Streaming is an extension of the core Spark
API that allows data engineers and data scientists to process real-time data from various sources
including (but not limited to) Kafka, Flume, and Amazon Kinesis. This processed data can be
pushed out to file systems, databases, and live dashboards. Its key abstraction is a Discretized
Stream or, in short, a DStream, which represents a stream of data divided into small batches.
DStreams are built on RDDs, Spark’s core data abstraction. This allows Spark Streaming to
seamlessly integrate with any other Spark components like MLlib and Spark SQL. Spark
Streaming is different from other systems that either have a processing engine designed only for
streaming, or have similar batch and streaming APIs but compile internally to different engines.
Spark’s single execution engine and unified programming model for batch and streaming lead to
some unique benefits over other traditional streaming systems.

Four Major Aspects of Spark Streaming


• Fast recovery from failures and stragglers
• Better load balancing and resource usage
• Combining of streaming data with static datasets and interactive queries
• Native integration with advanced processing libraries (SQL, machine learning, graph processing)

This unification of disparate data processing capabilities is the key reason behind Spark
Streaming’s rapid adoption. It makes it very easy for developers to use a single framework to
satisfy all their processing needs.

Event-Time and Stateful Processing

Event-time is the timestamp embedded in data when the event occurred, while processing-time is
when the system processes it; stateful processing in Spark maintains state across batches to
handle operations like aggregations or joins, often using event time to ensure accuracy even with
late-arriving data. Spark's stateful operations use event time to manage and update state for
specific time windows, and fault tolerance is ensured through checkpoints to a reliable location.
Event-Time
• Definition: The timestamp that is part of the data record itself, representing when the event
originally occurred at its source.
• Purpose: It allows for correct processing of events in the order they happened, regardless of
network delays or other processing latencies.
• Example: For a billing event, event-time is the time the transaction occurred, not the time it was
received by the system.
Stateful Processing
• Definition: A processing method where the system keeps a memory of past events to update
intermediate results as new data arrives.
• Purpose: Enables more advanced analytics like running totals, sessionization, deduplication, and
windowed aggregations by maintaining and updating state across batches.
• How it works: Spark maintains a distributed state in the executors' memory, which is updated
incrementally with each batch. State changes, together with the processed offsets, are saved to a
checkpoint location to ensure fault tolerance.
• Example: Calculating the average temperature over a specific time window requires stateful
processing to keep a running total of temperatures and counts. If a data point arrives late, the
state must be updated for the correct past window.
Combining Event-Time and Stateful Processing
• Stateful processing in Spark uses event-time to update the state according to when each event
occurred.
• When a late-arriving event is processed, Spark uses its event-time to update the state for the
corresponding past window, ensuring the final results are accurate.
• This is crucial for applications where the exact timing of events is critical, but managing state
indefinitely can be a challenge for very late data, so some state needs to be discarded over time.
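The interaction between event-time and state can be modeled in a few lines of plain Scala. This is an illustrative sketch, not Spark's implementation: each event is assigned to a 10-second tumbling window from its own timestamp, and a per-window count plays the role of state. The event times are made-up values; note that the late event still lands in the correct past window.

```scala
// Plain-Scala model of event-time windowing with per-window state (not Spark code).
final case class Event(eventTimeMs: Long, payload: String)

val windowSizeMs = 10000L

// Start of the tumbling window an event belongs to, based only on its event time.
def windowStart(eventTimeMs: Long): Long = eventTimeMs - (eventTimeMs % windowSizeMs)

// Arrival order differs from event order: the last event is late.
val arrivals = Seq(
  Event(1000L, "a"),
  Event(12000L, "b"),
  Event(15000L, "c"),
  Event(4000L, "d") // occurred before "b" and "c" but arrived last
)

// State: window start -> running count, updated incrementally per arriving event.
val countsByWindow = arrivals.foldLeft(Map.empty[Long, Int]) { (state, event) =>
  val w = windowStart(event.eventTimeMs)
  state.updated(w, state.getOrElse(w, 0) + 1)
}

countsByWindow.toSeq.sortBy(_._1).foreach { case (start, count) =>
  println(s"window [$start, ${start + windowSizeMs}) -> $count")
}
```

The late event "d" increments the count of window [0, 10000) even though window [10000, 20000) had already received data, which is exactly the update a stateful engine must be able to make.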
Structured Streaming
Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark
SQL engine. You can express your streaming computation the same way you would express a
batch computation on static data. The Spark SQL engine will take care of running it
incrementally and continuously and updating the final result as streaming data continues to
arrive. You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming
aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on
the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once
fault-tolerance guarantees through checkpointing and Write-Ahead Logs. In short, Structured
Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing
without the user having to reason about streaming.
Internally, by default, Structured Streaming queries are processed using a micro-batch
processing engine, which processes data streams as a series of small batch jobs thereby
achieving end-to-end latencies as low as 100 milliseconds and exactly-once fault-tolerance
guarantees. However, since Spark 2.3, we have introduced a new low-latency processing mode
called Continuous Processing, which can achieve end-to-end latencies as low as 1 millisecond
with at-least-once guarantees. Without changing the Dataset/DataFrame operations in your
queries, you will be able to choose the mode based on your application requirements.
In this guide, we are going to walk you through the programming model and the APIs. We are
going to explain the concepts mostly using the default micro-batch processing model, and
then later discuss Continuous Processing model. First, let’s start with a simple example of a
Structured Streaming query - a streaming word count.
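A minimal sketch of that word count, modeled on the example in the Spark documentation and assuming a local SparkSession plus a text server on localhost:9999 (for instance one started with nc -lk 9999). The lines DataFrame is the streaming input, wordCounts is the aggregated result, and complete mode prints the whole updated table on every trigger.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, split}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("StructuredNetworkWordCount")
  .getOrCreate()
import spark.implicits._

// Input table: a streaming DataFrame with one "value" row per line received on the socket.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Result table: split each line into words and count them, as on a static DataFrame.
val wordCounts = lines
  .select(explode(split($"value", " ")).as("word"))
  .groupBy("word")
  .count()

// Complete mode: print the entire updated result table after every trigger.
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
```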
Programming Model
The key idea in Structured Streaming is to treat a live data stream as a table that is being
continuously appended. This leads to a new stream processing model that is very similar to a
batch processing model. You will express your streaming computation as standard batch-like
query as on a static table, and Spark runs it as an incremental query on the unbounded input
table. Let’s understand this model in more detail.
Basic Concepts
Consider the input data stream as the “Input Table”. Every data item that is arriving on the
stream is like a new row being appended to the Input Table.

A query on the input will generate the “Result Table”. Every trigger interval (say, every 1
second), new rows get appended to the Input Table, which eventually updates the Result Table.
Whenever the result table gets updated, we would want to write the changed result rows to an
external sink.

The “Output” is defined as what gets written out to the external storage. The output can be
defined in different modes:
• Complete Mode - The entire updated Result Table will be written to the external storage.
It is up to the storage connector to decide how to handle writing of the entire table.
• Append Mode - Only the new rows appended in the Result Table since the last trigger
will be written to the external storage. This is applicable only to queries where
existing rows in the Result Table are not expected to change.
• Update Mode - Only the rows that were updated in the Result Table since the last trigger
will be written to the external storage (available since Spark 2.1.1). Note that this is
different from the Complete Mode in that this mode only outputs the rows that have
changed since the last trigger. If the query doesn’t contain aggregations, it will be
equivalent to Append mode.
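To make the modes concrete, the sketch below runs a small aggregation in update mode with a 1-second processing-time trigger. It assumes a local SparkSession and uses the rate test source and the console sink; the even/odd grouping is made up for the example, and the query is stopped after a few seconds only so that the sketch terminates.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("output-mode-sketch")
  .config("spark.sql.shuffle.partitions", "4") // small value for a local run
  .getOrCreate()

// Test source that emits (timestamp, value) rows, 10 per second.
val rates = spark.readStream.format("rate").option("rowsPerSecond", 10).load()

// Aggregation whose result rows keep changing: how many even and odd values so far.
val parityCounts = rates.groupBy((col("value") % 2).as("parity")).count()

// Update mode: every 1-second trigger prints only the rows whose counts changed.
// Append mode would be rejected here, because the query aggregates without a watermark.
val query = parityCounts.writeStream
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("1 second"))
  .format("console")
  .start()

query.awaitTermination(5000) // let a few triggers run
val failure = query.exception
query.stop()
spark.stop()
```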
Note that each mode is applicable to certain types of queries. This is discussed in detail later.
To illustrate the use of this model, consider the streaming word count example: the lines
DataFrame (lines of text read from a socket) is the input table, and the final wordCounts
DataFrame is the result table. Note that the query on the streaming lines DataFrame that generates
wordCounts is exactly the same as it would be for a static DataFrame. However, when this query is
started, Spark will continuously check for new data from the socket connection. If there is new
data, Spark will run an “incremental” query that combines the previous running counts with the
new data to compute updated counts.

Note that Structured Streaming does not materialize the entire table. It reads the latest
available data from the streaming data source, processes it incrementally to update the result, and
then discards the source data. It only keeps around the minimal intermediate state data as
required to update the result (e.g. intermediate counts in the earlier example).
This model is significantly different from many other stream processing engines. Many
streaming systems require the user to maintain running aggregations themselves, thus having to
reason about fault-tolerance, and data consistency (at-least-once, or at-most-once, or exactly-
once). In this model, Spark is responsible for updating the Result Table when there is new data,
thus relieving the users from reasoning about it. As an example, let’s see how this model handles
event-time based processing and late arriving data.
Handling Event-time and Late Data
Event-time is the time embedded in the data itself. For many applications, you may want to
operate on this event-time. For example, if you want to get the number of events generated by
IoT devices every minute, then you probably want to use the time when the data was generated
(that is, event-time in the data), rather than the time Spark receives them. This event-time is very
naturally expressed in this model – each event from the devices is a row in the table, and event-
time is a column value in the row. This allows window-based aggregations (e.g. number of
events every minute) to be just a special type of grouping and aggregation on the event-time
column – each time window is a group and each row can belong to multiple windows/groups.
Therefore, such event-time-window-based aggregation queries can be defined consistently on
both a static dataset (e.g. from collected device events logs) as well as on a data stream, making
the life of the user much easier.
Furthermore, this model naturally handles data that has arrived later than expected based on its
event-time. Since Spark is updating the Result Table, it has full control over updating old
aggregates when there is late data, as well as cleaning up old aggregates to limit the size of
intermediate state data. Since Spark 2.1, we have support for watermarking which allows the
user to specify the threshold of late data, and allows the engine to accordingly clean up old state.
These are explained later in more detail in the Window Operations section.
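A minimal sketch of a tumbling-window count with a watermark, assuming a local SparkSession. The rate source's timestamp column stands in for a real event-time column (its values are generated on arrival, so nothing is actually late here); the 5-second window and the 10-second lateness threshold are example values.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("watermark-sketch")
  .config("spark.sql.shuffle.partitions", "4") // small value for a local run
  .getOrCreate()

// "timestamp" plays the role of the event-time column in this sketch.
val events = spark.readStream.format("rate").option("rowsPerSecond", 5).load()

val windowedCounts = events
  // Accept events up to 10 seconds late; state for older windows can then be dropped.
  .withWatermark("timestamp", "10 seconds")
  // 5-second tumbling windows over event time.
  .groupBy(window(col("timestamp"), "5 seconds"))
  .count()

// With a watermark, append mode is allowed: each window is emitted once, after the
// watermark passes the end of the window, and its state is then cleaned up.
val query = windowedCounts.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", "false")
  .start()

query.awaitTermination(10000) // let a few triggers run
val failure = query.exception
query.stop()
spark.stop()
```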
Fault Tolerance Semantics
Delivering end-to-end exactly-once semantics was one of the key goals behind the design of
Structured Streaming. To achieve that, we have designed the Structured Streaming sources, the
sinks and the execution engine to reliably track the exact progress of the processing so that it can
handle any kind of failure by restarting and/or reprocessing. Every streaming source is assumed
to have offsets (similar to Kafka offsets, or Kinesis sequence numbers) to track the read position
in the stream. The engine uses checkpointing and write-ahead logs to record the offset range of
the data being processed in each trigger. The streaming sinks are designed to be idempotent for
handling reprocessing. Together, using replayable sources and idempotent sinks, Structured
Streaming can ensure end-to-end exactly-once semantics under any failure.
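Checkpointing is switched on per query with the checkpointLocation option. This sketch assumes a local SparkSession and hypothetical /tmp paths, writes the rate test source to a Parquet file sink, and then reads back whatever has been committed. Running it a second time with the same checkpoint directory should resume from the recorded offsets instead of starting over.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("checkpoint-sketch")
  .getOrCreate()

// Hypothetical local paths; in production use reliable storage such as HDFS or S3.
val outputPath = "/tmp/structured-streaming-sketch/output"
val checkpointPath = "/tmp/structured-streaming-sketch/checkpoint"

val rates = spark.readStream.format("rate").option("rowsPerSecond", 10).load()

// The checkpoint directory records the offsets of every trigger (and any state).
// The file sink's own commit log (_spark_metadata) keeps a batch that is
// replayed after a failure from being written twice.
val query = rates.writeStream
  .format("parquet")
  .option("path", outputPath)
  .option("checkpointLocation", checkpointPath)
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()

query.awaitTermination(10000) // let a few batches commit
val failure = query.exception
query.stop()

// A batch read of the sink directory only sees files from committed batches.
val committedRows = spark.read.schema(rates.schema).parquet(outputPath).count()
spark.stop()
```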
API using Datasets and DataFrames
Since Spark 2.0, DataFrames and Datasets can represent static, bounded data, as well as
streaming, unbounded data. Similar to static Datasets/DataFrames, you can use the common
entry point SparkSession to create streaming DataFrames/Datasets
from streaming sources, and apply the same operations on them as static DataFrames/Datasets. If
you are not familiar with Datasets/DataFrames, you are strongly advised to familiarize yourself
with them using the DataFrame/Dataset Programming Guide.
Creating streaming DataFrames and streaming Datasets
Streaming DataFrames can be created through the DataStreamReader interface returned by
SparkSession.readStream(). In R, use the read.stream() method. Similar to the read interface for
creating static DataFrames, you can specify the details of the source: data format, schema,
options, etc.
Input Sources
There are a few built-in sources.
• File source - Reads files written in a directory as a stream of data. Files will be processed
in the order of file modification time. If latestFirst is set, the order will be reversed.
Supported file formats are text, CSV, JSON, ORC, Parquet. See the docs of the
DataStreamReader interface for a more up-to-date list, and supported options for each file
format. Note that the files must be atomically placed in the given directory, which in
most file systems can be achieved by file move operations.
• Kafka source - Reads data from Kafka. It’s compatible with Kafka broker versions
0.10.0 or higher.
• Socket source (for testing) - Reads UTF8 text data from a socket connection. The
listening server socket is at the driver. Note that this should be used only for testing, as
it does not provide end-to-end fault-tolerance guarantees.
• Rate source (for testing) - Generates data at the specified number of rows per second;
each output row contains a timestamp and value, where timestamp is a Timestamp type
containing the time of message dispatch, and value is of Long type containing the
message count, starting from 0 as the first row. This source is intended for testing and
benchmarking.
• Rate Per Micro-Batch source (for testing) - Generates data at the specified number of
rows per micro-batch; each output row contains a timestamp and value,
where timestamp is a Timestamp type containing the time of message dispatch,
and value is of Long type containing the message count, starting from 0 as the first row.
Unlike the rate data source, this data source provides a consistent set of input rows per micro-batch
regardless of query execution (configuration of trigger, query lagging, etc.);
for example, batch 0 will produce 0~999 and batch 1 will produce 1000~1999, and so on. The same
applies to the generated time. This source is intended for testing and benchmarking.
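A minimal sketch of creating streaming DataFrames from three of these sources, assuming a local SparkSession and Spark 3.3 or later (for rate-micro-batch). The CSV schema and the /tmp/streaming-input directory are hypothetical; the file source needs a schema up front unless streaming schema inference is enabled, and the directory must exist before the DataFrame is defined. Nothing is read until a query is started with writeStream.

```scala
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("input-sources-sketch")
  .getOrCreate()

// Rate source: (timestamp, value) rows generated at a fixed rate per second.
val rateDf = spark.readStream.format("rate").option("rowsPerSecond", 100).load()

// Rate-per-micro-batch source: a fixed number of rows in every micro-batch.
val perBatchDf = spark.readStream.format("rate-micro-batch").option("rowsPerBatch", 1000).load()

// File source: needs the schema up front, and the directory must already exist.
val inputDir = "/tmp/streaming-input" // hypothetical directory
Files.createDirectories(Paths.get(inputDir))
val userSchema = new StructType().add("name", StringType).add("age", IntegerType)
val csvDf = spark.readStream
  .option("sep", ";")
  .schema(userSchema)
  .csv(inputDir)

// All three are streaming DataFrames; no data is read until a query starts.
val streamingFlags = Seq(rateDf, perBatchDf, csvDf).map(_.isStreaming)
val rateColumns = rateDf.schema.fieldNames.toSeq
rateDf.printSchema()

spark.stop()
```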

Defining a Stateful Processor


A stateful processor is the core of the user-defined logic used to operate on the input events. A
stateful processor is defined by extending the StatefulProcessor class and implementing a few
methods.
A typical stateful processor deals with the following constructs:

• Input Records - Input records received by the stream
• State Variables - Zero or more class-specific members used to store user state
• Output Records - Output records produced by the processor. Zero or more output records
may be produced by the processor.

A stateful processor uses the object-oriented paradigm to define the stateful logic. The stateful
logic is defined by implementing the following methods:

• init - Initialize the stateful processor and define any state variables as needed
• handleInputRows - Process input rows belonging to a grouping key and emit output if
needed
• handleExpiredTimer - Handle expired timers and emit output if needed
• close - Perform any cleanup operations if needed
• handleInitialState - Optionally handle the initial state batch dataframe

The methods above will be invoked by the Spark query engine when the operator is executed as
part of a streaming query.
Note also that not all types of operations are supported in each of the methods. For example,
users cannot register timers in the init method. Similarly, they cannot operate on input rows in
the handleExpiredTimer method. The engine detects unsupported or incompatible operations
and fails the query if needed.
Using the StatefulProcessorHandle
Many operations within the methods above can be performed using
the StatefulProcessorHandle object. The StatefulProcessorHandle object provides methods to
interact with the underlying state store. This object can be retrieved within the StatefulProcessor
by invoking the getHandle method.
Using State Variables
State variables are class-specific members used to store user state. They need to be declared once
and initialized within the init method of the stateful processor.
Initializing a state variable typically involves the following steps:

• Provide a unique name for the state variable (unique within the stateful processor
definition)
• Provide a type for the state variable (ValueState, ListState, MapState) - depending on the
type, the appropriate method on the handle needs to be invoked
• Provide a state encoder for the state variable (in Scala, this can be skipped if implicit
encoders are available)
• Provide an optional TTL config for the state variable

Types of state variables


State variables can be of the following types:

• Value State
• List State
• Map State

Similar to collections in popular programming languages, the state types can be used to model
data structures optimized for various types of operations on the underlying storage layer. For
example, appends are optimized for ListState and point lookups are optimized for MapState.
Providing state encoders
State encoders are used to serialize and deserialize the state variables. In Scala, the state encoders
can be skipped if implicit encoders are available. In Java and Python, the state encoders need to
be provided explicitly. Built-in encoders for primitives, case classes and Java Bean classes are
provided by default via the Spark SQL encoders.
Providing implicit encoders in Scala
In Scala, implicit encoders can be provided for case classes and primitive types.
The implicits object is provided as part of the StatefulProcessor class. Within the
StatefulProcessor definition, the user can simply add import implicits._ and then does not
need to pass the encoder type explicitly.
Providing TTL for state variables
State variables can be configured with an optional TTL (Time-To-Live) value. The TTL value is
used to automatically evict the state variable after the specified duration. The TTL value can be
provided as a Duration.
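The eviction behavior can be pictured with a small plain-Python model of a value state with a TTL. This is a sketch under stated assumptions, not Spark's ValueState. The class name is hypothetical, time comes from an explicit now_ms argument rather than the engine's clock, and the TTL is assumed to restart on every update.

```python
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


class TTLValueState(Generic[T]):
    """Toy value state with a time-to-live (hypothetical model, not Spark's API)."""

    def __init__(self, ttl_ms: int) -> None:
        self.ttl_ms = ttl_ms
        self._value: Optional[T] = None
        self._expires_at_ms: Optional[int] = None

    def update(self, value: T, now_ms: int) -> None:
        # Assumption: each update restarts the TTL countdown.
        self._value = value
        self._expires_at_ms = now_ms + self.ttl_ms

    def get(self, now_ms: int) -> Optional[T]:
        # Evict the value once its TTL has elapsed, then report it as absent.
        if self._expires_at_ms is not None and now_ms >= self._expires_at_ms:
            self._value = None
            self._expires_at_ms = None
        return self._value


state = TTLValueState(ttl_ms=60_000)   # a 1-minute TTL
state.update("last-login", now_ms=0)
print(state.get(now_ms=30_000))  # last-login
print(state.get(now_ms=60_000))  # None
```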
Handling input rows
The handleInputRows method is used to process input rows belonging to a grouping key and
emit output if needed. The method is invoked by the Spark query engine for each grouping key
value received by the operator. If multiple rows belong to the same grouping key, the provided
iterator will include all those rows.
Handling expired timers
Within the handleInputRows or handleExpiredTimer methods, the stateful processor can register
timers to be triggered at a later time. The handleExpiredTimer method is invoked by the Spark
query engine when a timer set by the stateful processor has expired. This method is invoked once
for each expired timer. Here are a few timer properties that are supported:

• Multiple timers associated with the same grouping key can be registered
• The engine provides the ability to list/add/remove timers as needed
• Timers are also checkpointed as part of the query checkpoint and can be triggered on
query restart as well.

Handling initial state


The handleInitialState method is used to optionally handle the initial state batch dataframe. The
initial state batch dataframe is used to pre-populate the state for the stateful processor. The
method is invoked by the Spark query engine when the initial state batch dataframe is available.
This method is only called once in the lifetime of the query. This is invoked before any input
rows are processed by the stateful processor.
Putting it all together
Here is an example of a StatefulProcessor that implements a downtime detector. Each time a new
value is seen for a given key, it updates the lastSeen state value, clears any existing timers, and
resets a timer for the future.
When a timer expires, the application emits the elapsed time since the last observed event for the
key. It then sets a new timer to emit an update 10 seconds later.
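The following plain-Python model walks through the same downtime-detector logic without Spark. It is not the StatefulProcessor API. The class and method names, the integer millisecond clock passed in as now_ms, and the timeout_ms delay before the first timer are hypothetical. Only the behavior follows the description above: update lastSeen, clear existing timers, and re-arm 10 seconds after each expiry.

```python
from collections import defaultdict


class DowntimeDetector:
    """Plain-Python model of the downtime detector (not Spark's StatefulProcessor API).

    Times are integer milliseconds; event times and the processing clock share one scale.
    """

    def __init__(self, timeout_ms: int, repeat_ms: int = 10_000) -> None:
        self.timeout_ms = timeout_ms      # delay before the first timer fires (hypothetical)
        self.repeat_ms = repeat_ms        # re-arm interval after a timer fires (10 s)
        self.last_seen = {}               # ValueState analogue: key -> latest event time
        self.timers = defaultdict(set)    # key -> registered timer deadlines

    def handle_input_rows(self, key, event_times, now_ms):
        # Update lastSeen, clear existing timers for this key, and register a fresh one.
        latest = max(event_times)
        self.last_seen[key] = max(latest, self.last_seen.get(key, latest))
        self.timers[key].clear()
        self.timers[key].add(now_ms + self.timeout_ms)
        return []                         # new data produces no output here

    def handle_expired_timers(self, now_ms):
        # Handle each expired timer once: emit elapsed time, then re-arm 10 s later.
        output = []
        for key, deadlines in self.timers.items():
            for deadline in sorted(d for d in deadlines if d <= now_ms):
                deadlines.discard(deadline)
                output.append((key, now_ms - self.last_seen[key]))
                deadlines.add(now_ms + self.repeat_ms)
        return output


detector = DowntimeDetector(timeout_ms=5_000)
detector.handle_input_rows("host-1", [1_000, 2_000], now_ms=2_000)
print(detector.handle_expired_timers(now_ms=6_000))   # []
print(detector.handle_expired_timers(now_ms=7_000))   # [('host-1', 5000)]
print(detector.handle_expired_timers(now_ms=17_000))  # [('host-1', 15000)]
```

Each expiry reports a growing downtime (5 s, then 15 s) until new input for the key arrives and resets both lastSeen and the timer.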

State Schema Evolution


TransformWithState also allows for performing schema evolution of the managed state. There
are 2 parts here:
• evolution across state variables
• evolution within a state variable

Note that schema evolution is only supported on the value side. Key side state schema evolution
is not supported.
Evolution across state variables
This operator allows for state variables to be added and removed across different runs of the
same streaming query. In order to remove a variable, we also need to inform the engine so that
the underlying state can be purged. Users can achieve this by invoking the deleteIfExists method
for a given state variable within the init method of the StatefulProcessor.
Evolution within a state variable
This operator also allows the state schema of a specific state variable to be evolved. For
example, if you are using a case class to store the state within a ValueState variable, then it’s
possible for you to evolve this case class by adding/removing/widening fields. We support such
schema evolution only when the underlying encoding format is set to Avro. To enable
this, set the following Spark config:
spark.conf.set("spark.sql.streaming.stateStore.encodingFormat", "avro")
The following evolution operations are supported within Avro rules:

• Adding a new field
• Removing a field
• Type widening
• Reordering fields

The following evolution operations are not supported:

• Renaming a field
• Type narrowing

Integration with State Data Source


TransformWithState is a stateful operator that allows users to maintain arbitrary state across
batches. In order to read this state, the user needs to provide some additional options in the state
data source reader query. This operator allows for multiple state variables to be used within the
same query. However, because they could be of different composite types and encoding formats,
they need to be read within a batch query one variable at a time. In order to allow this, the user
needs to specify the stateVarName for the state variable they are interested in reading.
Timers can be read by setting the option readRegisteredTimers to true. This returns all the
registered timers across grouping keys.
We also allow for composite type variables to be read in 2 formats:

• Flattened: This is the default format where the composite types are flattened out into
individual columns.
• Non-flattened: This is where the composite types are returned as a single column of
Array or Map type in Spark SQL.
Depending on your memory requirements, you can choose the format that best suits your use
case.
In Apache Spark's Structured Streaming, "windows on event time" refer to the ability to perform
aggregations over time-based windows defined by the event time of the data records, rather than
the processing time when Spark receives them. This is crucial for handling out-of-order data and
ensuring correct results in streaming applications.
Key Concepts:
• Event Time:
The time when an event actually occurred, as recorded within the data itself (e.g., a timestamp
column in a log record).
• Processing Time:
The time when Spark processes the event. This can differ significantly from event time due to
network delays, system load, or other factors.
• Windowed Aggregations:
Grouping data into fixed or variable-length time intervals (windows) and performing
aggregations (e.g., count, sum, average) within those windows.
• Watermarking:
A mechanism in Structured Streaming to handle late-arriving data. Watermarks define a
threshold for how late data can be before it's considered too old to be processed and its state is
cleaned up. This prevents unbounded state growth in long-running streaming queries.
Types of Windows on Event Time:
(In the snippets below, df is a streaming DataFrame with a timestamp column eventTime, and
window and session_window are imported from pyspark.sql.functions.)
• Tumbling Windows: Fixed-size, non-overlapping windows. Each event belongs to exactly one
window.
df.groupBy(window("eventTime", "10 minutes")).count()
• Sliding Windows: Fixed-size, overlapping windows. Each event can belong to multiple
windows.
df.groupBy(window("eventTime", "10 minutes", "5 minutes")).count()
• Session Windows: Variable-length windows defined by a period of inactivity (gap
duration). Events are grouped into a session if they occur within a specified time gap of each
other.
df.groupBy(session_window("eventTime", "5 minutes")).count()
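Which tumbling or sliding windows an event lands in follows from simple arithmetic. The plain-Python sketch below is a model, not Spark code. It assumes windows are aligned to time 0, which matches Spark's default when no start-time offset is given, and it uses integer minutes. windows_for is a hypothetical helper name.

```python
from typing import List, Optional, Tuple


def windows_for(ts: int, size: int, slide: Optional[int] = None) -> List[Tuple[int, int]]:
    """Return the [start, end) windows that contain event time ts.

    slide=None gives a tumbling window (slide == size); a smaller slide gives
    overlapping sliding windows. Assumes non-negative integer times aligned to 0.
    """
    slide = size if slide is None else slide
    start = ts - (ts % slide)        # latest aligned window start <= ts
    windows = []
    while start > ts - size:         # window [start, start + size) still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


print(windows_for(12, 10))      # tumbling: [(10, 20)]
print(windows_for(12, 10, 5))   # sliding:  [(5, 15), (10, 20)]
```

With a 10-minute window and a 5-minute slide, every event falls into size / slide = 2 windows. This is why sliding windows multiply the amount of state Spark has to keep compared with tumbling windows.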
Handling Late Data with Watermarks:
When performing windowed aggregations on event time, late data (records arriving after their
corresponding window has been processed) can cause incorrect results without proper
handling. Watermarks address this by allowing Spark to track the progress of event time and
determine when it's safe to finalize and output results for a particular window.
df.withWatermark("eventTime", "10 minutes") \
    .groupBy(window("eventTime", "1 hour")) \
    .count()
In this example, withWatermark("eventTime", "10 minutes") tells Spark to accept data that arrives
up to 10 minutes late, measured against the maximum event time seen so far. Spark therefore
keeps the state of each 1-hour window until the watermark passes the window's end, so that
late-arriving data is incorporated before final results are emitted and the state is cleaned up.
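The watermark arithmetic can be checked by hand. The plain-Python sketch below is not Spark code, and its function names and dates are hypothetical. It computes the watermark as the maximum event time seen minus the delay threshold, and it treats a window as final once the watermark has reached the window's end. This is a simplified version of the rule Spark applies.

```python
from datetime import datetime, timedelta


def watermark(max_event_time: datetime, delay: timedelta) -> datetime:
    # Watermark = maximum event time seen so far minus the delay threshold.
    return max_event_time - delay


def window_is_final(window_end: datetime, wm: datetime) -> bool:
    # Simplified rule: once the watermark reaches a window's end, no record that
    # could still be accepted belongs to it, so its result can be emitted.
    return wm >= window_end


delay = timedelta(minutes=10)
window_end = datetime(2024, 1, 1, 13, 0)   # end of the window [12:00, 13:00)

wm = watermark(datetime(2024, 1, 1, 12, 40), delay)
print(wm, window_is_final(window_end, wm))   # 2024-01-01 12:30:00 False

wm = watermark(datetime(2024, 1, 1, 13, 10), delay)
print(wm, window_is_final(window_end, wm))   # 2024-01-01 13:00:00 True
```

In other words, the 1-hour window ending at 13:00 closes only after Spark has seen an event with an event time of 13:10 or later.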
Tumbling windows in Apache Spark, particularly within Spark Structured Streaming, represent a
method of aggregating data over fixed-size, non-overlapping, and contiguous time intervals.
Key Characteristics of Tumbling Windows:
• Fixed Size:
Each tumbling window has a predefined, constant duration (e.g., 5 minutes, 1 hour).
• Non-Overlapping:
The windows do not share any data points. Once a window ends, a new one begins immediately,
and events are assigned to only one window.
• Contiguous:
Tumbling windows form a continuous sequence in time, covering all events within the specified
period without gaps.
How they work in Spark:
In Spark Structured Streaming, tumbling windows are typically used to perform aggregations on
incoming data streams. You define a window function on a timestamp column within your
DataFrame or Dataset, specifying the window duration. Spark then groups the data into these
fixed-size, non-overlapping time segments and applies the desired aggregation (e.g., count, sum,
average) within each window.
Example in PySpark Structured Streaming:
from pyspark.sql.functions import window, col, count

# Assuming 'df' is a streaming DataFrame with a timestamp column 'event_time'
# and a 'value' column
windowed_df = df.groupBy(
    window(col("event_time"), "10 minutes")
).agg(
    count("value").alias("event_count")
)

# You can then write this 'windowed_df' to a sink
# windowed_df.writeStream.format("console").outputMode("complete").start()
In this example, the data is grouped into 10-minute tumbling windows based on
the event_time column, and the count of events within each window is calculated. The output
would include the start and end times of each 10-minute window along with the aggregated
count.
Use Cases:
Tumbling windows are valuable for scenarios requiring analysis of data in discrete, isolated time
segments, such as:
• Calculating hourly website traffic.
• Generating minute-by-minute metrics for system performance.
• Aggregating sales data for fixed intervals.
• Monitoring sensor data in fixed time batches.

Handling late data in Spark Structured Streaming, especially in stateful operations like
aggregations and joins, is primarily achieved using watermarks. Watermarks provide a
mechanism to manage the state size and ensure timely results by defining how long Spark should
wait for late-arriving data based on event time.
Here's how watermarks work and how to implement them, starting with defining the watermark.
You define a watermark on an event-time column in your streaming DataFrame using
the withWatermark function. This function takes two arguments: the name of the event-time
column and a "delay threshold" (e.g., "10 minutes").
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window

spark = SparkSession.builder.appName("WatermarkExample").getOrCreate()

# The rate source emits 'timestamp' (used here as the event time) and 'value'
streamingDF = spark.readStream.format("rate").load() \
    .withColumnRenamed("timestamp", "eventTime") \
    .withColumn("id", col("value") % 10)  # example grouping key derived from 'value'

windowedCounts = streamingDF \
    .withWatermark("eventTime", "10 minutes") \
    .groupBy(window("eventTime", "5 minutes"), "id") \
    .count()
How Watermarks Manage Late Data:
• Spark keeps track of the maximum event time observed so far in the data.
• The watermark is calculated by subtracting the specified delayThreshold from this maximum
observed event time.
• Any data record with an event time older than the current watermark is considered "too late" and
is typically discarded or not included in aggregations for already-closed windows. This prevents
unbounded state growth and resource exhaustion.
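Spark advances the watermark between micro-batches, not after every record, and this detail is easier to follow with a concrete trace. The plain-Python model below is not Spark code. process_batches is a hypothetical name, and times are integer minutes. Each batch is filtered against the watermark computed from the batches before it, using the "older than the watermark" rule described above.

```python
from typing import List, Optional


def process_batches(batches: List[List[int]], delay: int) -> List[List[int]]:
    """Return, per micro-batch, the event times that survive the late-data check."""
    max_seen: Optional[int] = None
    kept_per_batch = []
    for batch in batches:
        # The watermark used for this batch comes from the batches before it.
        wm = None if max_seen is None else max_seen - delay
        kept_per_batch.append([t for t in batch if wm is None or t >= wm])
        if batch:
            batch_max = max(batch)
            max_seen = batch_max if max_seen is None else max(max_seen, batch_max)
    return kept_per_batch


print(process_batches([[10, 12], [25, 11], [14, 30]], delay=10))
# [[10, 12], [25, 11], [30]]
```

Event 11 survives in the second batch even though it is more than 10 minutes behind event 25, because that batch still uses the watermark of 2 fixed after the first batch. Event 14 is dropped in the third batch, where the watermark has advanced to 15.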
Impact on Stateful Operations:
• Aggregations: Watermarks allow Spark to finalize and emit results for windows even if some
late data might still arrive. Once a window's end time plus the watermark delay has passed,
Spark considers that window complete and can clean up its state.
• Joins: In streaming joins, watermarks help manage the state of records waiting for their
corresponding join partners. Records older than the watermark for a particular side of the join
can be discarded if their partner is unlikely to arrive within the allowed delay.
Output Modes and Watermarks:
• Update Mode: Watermarks work well with the update output mode. Spark emits updated rows
on every trigger and uses the watermark to drop the state of windows that can no longer change,
which keeps memory usage bounded.
• Append Mode: When using append mode, Spark waits for the watermark to pass before
emitting the final results of a window to the sink. This ensures that all potentially late data within
the watermark threshold is included before the window is finalized.
• Complete Mode: Watermarks do not drop aggregation state in complete mode, because the
entire result table must be output on every trigger.
Key Considerations:
• Delay Threshold:
The choice of the delay threshold is crucial. A longer threshold allows for more late data to be
included but increases latency and state size. A shorter threshold reduces latency and state but
might miss more late data. This is a trade-off that needs to be evaluated based on your
application's requirements.
• Event Time vs. Processing Time:
Watermarks operate on event time, which is the time the event actually occurred, rather than
processing time, which is when Spark processed the event. This distinction is vital for accurate
handling of late data.

Dropping Duplicates in a Stream

Dropping duplicates in Spark Structured Streaming involves using dropDuplicates() in
conjunction with withWatermark() to manage state and ensure correctness.
Here is how to drop duplicates in a Spark Structured Stream:
• Define a Watermark: Before applying dropDuplicates(), define a watermark on a
timestamp column in your streaming DataFrame. The watermark helps Spark manage state by
defining how late data can arrive and still be considered for deduplication. Without a watermark,
Spark has to keep every record it has seen in state, so the state grows indefinitely.
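from pyspark.sql.functions import col

# Kafka rows carry a binary key/value plus a record 'timestamp'. This sketch assumes
# the message key identifies the event and uses the Kafka timestamp as event time.
streamingDF = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "mytopic") \
    .load() \
    .select(col("key").cast("string").alias("id"),
            col("timestamp").alias("timestamp_col")) \
    .withWatermark("timestamp_col", "10 minutes")  # 10 minutes of late data allowed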
• Apply dropDuplicates(): After defining the watermark, you can use dropDuplicates() to remove
duplicate rows. You can specify a subset of columns to consider when identifying duplicates.
deduplicatedDF = streamingDF.dropDuplicates(["id", "timestamp_col"])
• dropDuplicates() keeps track of the seen values for the specified columns within the
watermark window.
• Any new record with the same values for the specified columns that falls within the watermark
window is considered a duplicate and dropped.
• Records older than the watermark are evicted from the state, preventing unbounded state
growth.
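The interplay between the seen-values state and the watermark can be traced with a plain-Python model of dropDuplicates(["id", "timestamp_col"]) over a few micro-batches. This is a simplified sketch, not Spark's implementation. dedup_stream is a hypothetical name, rows are (id, event_time) tuples with integer minutes, and the watermark advances between batches.

```python
from typing import List, Optional, Set, Tuple

Row = Tuple[str, int]   # (id, event_time in minutes)


def dedup_stream(batches: List[List[Row]], delay: int) -> Tuple[List[Row], Set[Row]]:
    """Return the rows emitted and the dedup state left after the last batch."""
    seen: Set[Row] = set()
    max_seen: Optional[int] = None
    output: List[Row] = []
    for batch in batches:
        wm = None if max_seen is None else max_seen - delay
        for row in batch:
            if wm is not None and row[1] < wm:
                continue                  # older than the watermark: dropped as late
            if row in seen:
                continue                  # same (id, event_time) already seen: duplicate
            seen.add(row)
            output.append(row)
        if batch:
            batch_max = max(t for _, t in batch)
            max_seen = batch_max if max_seen is None else max(max_seen, batch_max)
        if max_seen is not None:
            # Evict state entries that are older than the next batch's watermark.
            seen = {r for r in seen if r[1] >= max_seen - delay}
    return output, seen


batches = [
    [("a", 10), ("a", 10), ("b", 12)],   # in-batch duplicate of a
    [("a", 10), ("c", 25)],              # a is still in state, so it is dropped
    [("b", 12), ("d", 40)],              # b is now older than the watermark (15)
]
emitted, state = dedup_stream(batches, delay=10)
print(emitted)  # [('a', 10), ('b', 12), ('c', 25), ('d', 40)]
print(state)    # {('d', 40)}
```

Only the newest entry remains in state at the end, which shows how the watermark keeps deduplication state bounded.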
• Write the Stream: Finally, write the deduplicated stream to your desired sink.
query = deduplicatedDF.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()
Important Considerations:
• Event Time vs. Processing Time:
Watermarking and deduplication in Structured Streaming rely on event time, not processing
time. Ensure your timestamp column accurately reflects the event time.
• Watermark Delay:
The watermark delay should be chosen carefully based on the expected maximum delay of your
incoming data. A larger delay allows for more late data but increases the state size.
• State Management:
Deduplication in streaming is a stateful operation. Spark manages this state, and checkpointing is
crucial for fault tolerance.
• Ordering and Latest Records:
If you need to keep the latest record among duplicates (e.g., based on
an updated_at timestamp), dropDuplicates() alone is not sufficient, because it keeps whichever
duplicate arrives first. Non-time-based window functions such as row_number() are not supported
directly on streaming DataFrames, so the latest record is usually selected per micro-batch inside
foreachBatch (for example, with row_number() over a partition by key) and then merged into the
target table, or tracked with a stateful operator.
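The merge step of that approach can be modeled in plain Python. The model keeps the newest row per key within each micro-batch, which is what filtering on row_number() = 1 would select. It then updates the target only when the incoming row is newer. upsert_latest and the id/updated_at field names are hypothetical. In Spark, this logic would typically live in a foreachBatch function.

```python
from typing import Any, Dict, List

Record = Dict[str, Any]


def upsert_latest(table: Dict[Any, Record], batch: List[Record],
                  key: str = "id", order_by: str = "updated_at") -> Dict[Any, Record]:
    """Merge one micro-batch into `table`, keeping the newest record per key."""
    newest: Dict[Any, Record] = {}
    for row in batch:
        k = row[key]
        # Within the batch, keep only the newest version of each key.
        if k not in newest or row[order_by] > newest[k][order_by]:
            newest[k] = row
    for k, row in newest.items():
        # Against the table, an older or equal version never overwrites a newer one.
        if k not in table or row[order_by] > table[k][order_by]:
            table[k] = row
    return table


table: Dict[Any, Record] = {}
upsert_latest(table, [{"id": 1, "updated_at": 5, "v": "a"},
                      {"id": 1, "updated_at": 7, "v": "b"}])
upsert_latest(table, [{"id": 1, "updated_at": 6, "v": "stale"},
                      {"id": 2, "updated_at": 1, "v": "c"}])
print({k: r["v"] for k, r in table.items()})  # {1: 'b', 2: 'c'}
```

The stale version of id 1 in the second batch is ignored, even though it arrived later, because its updated_at value is older than the stored one.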

Structured Streaming Basics


Structured Streaming is a high-level API for stream processing that became production-ready in
Spark 2.2. Structured Streaming allows you to take the same operations that you perform in
batch mode using Spark’s structured APIs, and run them in a streaming fashion. This can reduce
latency and allow for incremental processing. The best thing about Structured Streaming is that it
lets you get value out of streaming systems quickly, with virtually no code changes. It is also
easy to reason about, because you can prototype your logic as a batch job and then convert it
to a streaming job. All of this works by processing the data incrementally.
Spark Structured Streaming offers a high-level API for stream processing built on the Spark SQL
engine, allowing users to express streaming computations using familiar DataFrame/Dataset
APIs. The core concepts include:
• Treating Streams as Unbounded Tables:
Structured Streaming views a continuous stream of data as an unbounded table that is
continuously appended with new rows. This allows users to apply standard batch-like queries
and operations on this "streaming table," and Spark incrementally and continuously updates the
results.
• Micro-batch Processing:
By default, Structured Streaming processes data internally in small, time-based batches called
micro-batches. This provides near real-time processing by dividing the continuous stream into
manageable chunks, enabling efficient execution and fault tolerance.
• Unified API for Batch and Streaming:
Structured Streaming uses the same DataFrame/Dataset API for both batch and streaming
computations. This unification simplifies development, reduces the need for separate codebases,
and facilitates the migration of existing batch jobs to streaming.
• Fault Tolerance and Exactly-Once Guarantees:
The engine provides end-to-end exactly-once fault-tolerance guarantees, given replayable
sources and idempotent sinks, through mechanisms like checkpointing and Write-Ahead Logs.
Checkpointing stores the progress and state of the streaming query, allowing recovery from
failures without data loss or duplication.
• Sources and Sinks:
Structured Streaming supports various data sources (e.g., Kafka, files, Delta Lake) and sinks
(e.g., Delta Lake, files, Kafka, console, memory). Users define the input source to read from and
the output sink to write processed data to.
• Streaming Queries:
A streaming query represents the continuous computation defined on the streaming data. Users
start a streaming query by calling start() on a DataFrame's writeStream, which returns a
StreamingQuery object for monitoring and control.
• Output Modes:
Structured Streaming offers different output modes (Append, Complete, Update) to control
how the results of the streaming query are written to the sink: appending only new result rows,
rewriting the entire result table, or writing only the rows that changed since the last trigger.
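The difference between Complete and Update mode is easiest to see on a small running count. The plain-Python model below is not Spark code, and run_counts is a hypothetical name. It updates a result table incrementally per micro-batch and returns what each mode would write on each trigger. Append mode is left out because, for aggregations, it also depends on a watermark.

```python
from collections import Counter
from typing import Dict, List


def run_counts(batches: List[List[str]], mode: str) -> List[Dict[str, int]]:
    """Return the rows each trigger would write for a running count per word."""
    if mode not in ("complete", "update"):
        raise ValueError("only 'complete' and 'update' are modeled here")
    result_table: Counter = Counter()
    written = []
    for batch in batches:
        before = dict(result_table)
        result_table.update(batch)                     # incremental update of the result table
        if mode == "complete":
            written.append(dict(result_table))        # the whole table, every trigger
        else:
            written.append({w: c for w, c in result_table.items()
                            if before.get(w) != c})   # only rows that changed
    return written


batches = [["a", "b", "a"], ["b", "c"]]
print(run_counts(batches, "complete"))  # [{'a': 2, 'b': 1}, {'a': 2, 'b': 2, 'c': 1}]
print(run_counts(batches, "update"))    # [{'a': 2, 'b': 1}, {'b': 2, 'c': 1}]
```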

Spark Structured Streaming provides a high-level API for stream processing that treats live data
streams as continuous, unbounded tables. This allows you to express streaming computations
using the same DataFrame and Dataset APIs used for batch processing, simplifying development
and enabling code reuse.
Key Concepts and How It Works:
• Treating Streams as Tables:
The core idea is to view incoming data as rows continuously appended to a table. You write your
streaming queries as if you were querying a static table, and Spark handles the incremental and
continuous execution.
• Micro-batch Processing:
Structured Streaming internally processes data in small, time-based batches called
micro-batches. This allows for near real-time processing while benefiting from Spark SQL's
optimizations and fault-tolerance mechanisms.
• Unified API:
You can use the familiar DataFrame API (in Scala, Java, Python, or R) and Dataset API (in Scala
or Java) for transformations and aggregations on streaming data, including operations like
filtering, joining, and windowing.
• Sources and Sinks:
  • Sources: Structured Streaming can read data from various sources, including file systems (e.g.,
HDFS, S3), Kafka, Azure Event Hubs, and TCP sockets.
  • Sinks: Processed streaming data can be written to various sinks, such as file systems or table
formats (e.g., Delta Lake), databases, or the console for debugging.
• Fault Tolerance and Exactly-Once Guarantees:
Structured Streaming provides end-to-end exactly-once fault-tolerance guarantees through
checkpointing and Write-Ahead Logs, ensuring no data loss or duplication even in case of
failures.
Example (PySpark):
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as spark_sum

# Create a SparkSession
spark = SparkSession.builder \
    .appName("StructuredStreamingExample") \
    .getOrCreate()

# Read streaming data from a file source
# (e.g., a directory where new CSV files are added)
streaming_df = spark.readStream \
    .format("csv") \
    .option("header", "true") \
    .schema("id INT, name STRING, value DOUBLE") \
    .load("input_data_directory")

# Apply transformations (e.g., calculate sum of 'value' per 'name')
aggregated_df = streaming_df \
    .groupBy("name") \
    .agg(spark_sum("value").alias("total_value"))

# Write the processed data to a sink (e.g., console for debugging)
query = aggregated_df.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# Wait for the termination of the query
query.awaitTermination()
In this example:
• spark.readStream defines the streaming source.
• groupBy and agg perform a streaming aggregation.
• writeStream defines the output sink (console in this case) and the output mode.
• start() initiates the streaming query.
• awaitTermination() keeps the application running until the query is manually stopped or an error
occurs.

Transformations on Streams, Input and Output.


Input DStreams and Receivers
Note that this part covers the older DStream-based Spark Streaming API rather than Structured
Streaming. Input DStreams are DStreams representing the stream of input data received from
streaming sources. For example, in a network word count that reads text from a netcat server,
the lines DStream is an input DStream because it represents the stream of data received from
that server. Every input DStream (except file stream, discussed later in this section) is
associated with a Receiver object, which receives the data from a source and stores it in
Spark’s memory for processing.
Spark Streaming provides two categories of built-in streaming sources.

• Basic sources: Sources directly available in the StreamingContext API. Examples: file
systems and socket connections.
• Advanced sources: Sources like Kafka, Kinesis, etc. are available through extra utility
classes. These require linking against extra dependencies, as discussed in the Linking section
of the Spark Streaming programming guide.

We are going to discuss some of the sources present in each category later in this section.
Note that, if you want to receive multiple streams of data in parallel in your streaming
application, you can create multiple input DStreams (discussed further in the Performance
Tuning section). This will create multiple receivers which will simultaneously receive multiple
data streams. But note that each receiver runs as a long-running task inside a Spark
worker/executor, so it permanently occupies one of the cores allocated to the Spark Streaming
application. Therefore, a Spark Streaming application needs to be allocated enough cores (or
threads, if running locally) to process the received data, as well as to run the receiver(s).
Points to remember
• When running a Spark Streaming program locally, do not use “local” or “local[1]” as the
master URL. Either of these means that only one thread will be used for running tasks
locally. If you are using an input DStream based on a receiver (e.g. sockets, Kafka, etc.),
then the single thread will be used to run the receiver, leaving no thread for processing
the received data. Hence, when running locally, always use “local[n]” as the master URL,
where n > number of receivers to run (see Spark Properties for information on how to set
the master).
• Extending the logic to running on a cluster, the number of cores allocated to the Spark
Streaming application must be more than the number of receivers. Otherwise the system
will receive data, but not be able to process it.
Basic Sources
The StreamingContext method ssc.socketTextStream(...), used in the Spark Streaming quick
example, creates a DStream from text data received over a TCP socket connection. Besides
sockets, the StreamingContext API provides methods for creating DStreams from files as input sources.
File Streams
For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3,
NFS, etc.), a DStream can be created via StreamingContext.fileStream[KeyClass, ValueClass,
InputFormatClass].
File streams do not require running a receiver, so there is no need to allocate any cores for
receiving file data.
For simple text files, the easiest method is StreamingContext.textFileStream(dataDirectory).

fileStream is not available in the Python API; only textFileStream is available.
streamingContext.textFileStream(dataDirectory)
How Directories are Monitored
Spark Streaming will monitor the directory dataDirectory and process any files created in that
directory.
• A simple directory can be monitored, such as "hdfs://namenode:8040/logs/". All files
directly under such a path will be processed as they are discovered.
• A POSIX glob pattern can be supplied, such as "hdfs://namenode:8040/logs/2017/*".
Here, the DStream will consist of all files in the directories matching the pattern. That is:
it is a pattern of directories, not of files in directories.
• All files must be in the same data format.
• A file is considered part of a time period based on its modification time, not its creation
time.
• Once processed, changes to a file within the current window will not cause the file to be
reread. That is: updates are ignored.
• The more files under a directory, the longer it will take to scan for changes, even if no
files have been modified.
• If a wildcard is used to identify directories, such as "hdfs://namenode:8040/logs/2016-*",
renaming an entire directory to match the path will add the directory to the list of
monitored directories. Only the files in the directory whose modification time is within
the current window will be included in the stream.
• Calling FileSystem.setTimes() to fix the timestamp is a way to have the file picked up in
a later window, even if its contents have not changed.

Using Object Stores as a source of data


“Full” Filesystems such as HDFS tend to set the modification time on their files as soon as the
output stream is created. When a file is opened, even before data has been completely written, it
may be included in the DStream - after which updates to the file within the same window will be
ignored. That is: changes may be missed, and data omitted from the stream.
To guarantee that changes are picked up in a window, write the file to an unmonitored directory,
then, immediately after the output stream is closed, rename it into the destination directory.
Provided the renamed file appears in the scanned destination directory during the window of its
creation, the new data will be picked up.
In contrast, Object Stores such as Amazon S3 and Azure Storage usually have slow rename
operations, as the data is actually copied. Furthermore, a renamed object may have the time of
the rename() operation as its modification time, so it may not be considered part of the window
that its original creation time implied.
Careful testing is needed against the target object store to verify that the timestamp behavior of
the store is consistent with that expected by Spark Streaming. It may be that writing directly into
a destination directory is the appropriate strategy for streaming data via the chosen object store.
For more details on this topic, consult the Hadoop Filesystem Specification.
Streams based on Custom Receivers
DStreams can be created with data streams received through custom receivers. See the Custom
Receiver Guide for more details.
Queue of RDDs as a Stream
For testing a Spark Streaming application with test data, one can also create a DStream based on
a queue of RDDs, using streamingContext.queueStream(queueOfRDDs). Each RDD pushed into
the queue will be treated as a batch of data in the DStream, and processed like a stream.
Transformations on DStreams
Similar to RDDs, transformations allow the data from the input DStream to be modified.
DStreams support many of the transformations available on normal Spark RDDs. Some of the
common ones are as follows.
Transformation Meaning

Return a new DStream by


passing each element of the
map(func)
source DStream through a
function func.

Similar to map, but each input


flatMap(func) item can be mapped to 0 or more
output items.

Return a new DStream by


selecting only the records of the
filter(func)
source DStream on
which func returns true.

Changes the level of parallelism


repartition(numPartition
in this DStream by creating more
s)
or fewer partitions.

Return a new DStream that


contains the union of the
union(otherStream)
elements in the source DStream
and otherDStream.

Return a new DStream of single-


element RDDs by counting the
count()
number of elements in each
RDD of the source DStream.

reduce(func) Return a new DStream of single-


element RDDs by aggregating
the elements in each RDD of the
source DStream using a
function func (which takes two
arguments and returns one). The
function should be associative
and commutative so that it can
Transformation Meaning

be computed in parallel.

When called on a DStream of


elements of type K, return a new
DStream of (K, Long) pairs
countByValue()
where the value of each key is its
frequency in each RDD of the
source DStream.

When called on a DStream of


(K, V) pairs, return a new
DStream of (K, V) pairs where
the values for each key are
aggregated using the given
reduce function. Note: By
default, this uses Spark's default
reduceByKey(func, number of parallel tasks (2 for
[numTasks]) local mode, and in cluster mode
the number is determined by the
config
property [Link]
m) to do the grouping. You can
pass an
optional numTasks argument to
set a different number of tasks.

When called on two DStreams of


(K, V) and (K, W) pairs, return a
join(otherStream,
new DStream of (K, (V, W))
[numTasks])
pairs with all pairs of elements
for each key.

When called on a DStream of


cogroup(otherStream, (K, V) and (K, W) pairs, return a
[numTasks]) new DStream of (K, Seq[V],
Seq[W]) tuples.

transform(func) Return a new DStream by


applying a RDD-to-RDD
Transformation Meaning

function to every RDD of the


source DStream. This can be
used to do arbitrary RDD
operations on the DStream.

updateStateByKey(func): Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.
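The table describes what each operator computes on every batch. As a rough illustration, the plain-Python sketch below models a single batch RDD as a list and mimics countByValue, reduceByKey, and cogroup on it. The helper names are hypothetical and this is not the Spark API. Partitioning and parallelism are ignored, and cogroup's result is modeled as (key, (left values, right values)).

```python
from collections import Counter, defaultdict
from functools import reduce


def count_by_value(batch):
    """Like countByValue(): (element, frequency) pairs for one batch."""
    return sorted(Counter(batch).items())


def reduce_by_key(func, pairs):
    """Like reduceByKey(func): combine the values of each key with func.

    func should be associative and commutative, as the table requires,
    so that the result does not depend on the order values are combined in.
    """
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return sorted((key, reduce(func, values)) for key, values in grouped.items())


def cogroup(left, right):
    """Like cogroup(): per key, the values from the left and from the right."""
    keys = {k for k, _ in left} | {k for k, _ in right}
    return sorted(
        (k, ([v for lk, v in left if lk == k], [w for rk, w in right if rk == k]))
        for k in keys
    )


batch = ["a", "b", "a", "c", "a"]
print(count_by_value(batch))                                      # [('a', 3), ('b', 1), ('c', 1)]
print(reduce_by_key(lambda x, y: x + y, [(w, 1) for w in batch]))  # [('a', 3), ('b', 1), ('c', 1)]
print(cogroup([("a", 1), ("b", 2)], [("a", "x")]))                 # [('a', ([1], ['x'])), ('b', ([2], []))]
```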

A few of these transformations are worth discussing in more detail.


UpdateStateByKey Operation
The updateStateByKey operation allows you to maintain arbitrary state while continuously updating it with new information. Using it takes two steps.

1. Define the state - The state can be an arbitrary data type.


2. Define the state update function - Specify with a function how to update the state using
the previous state and the new values from an input stream.

In every batch, Spark will apply the state update function for all existing keys, regardless of
whether they have new data in a batch or not. If the update function returns None then the key-
value pair will be eliminated.
Let’s illustrate this with an example. Say you want to maintain a running count of each word
seen in a text data stream. Here, the running count is the state and it is an integer. We define the
update function as:

def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    # add the new values to the previous running count to get the new count
    return sum(newValues, runningCount)
This is applied on a DStream containing words (say, the pairs DStream containing (word,
1) pairs in the earlier example).
runningCounts = [Link](updateFunction)
The update function will be called for each word, with newValues having a sequence of 1’s
(from the (word, 1) pairs) and the runningCount having the previous count. For the complete
Python code, take a look at the example stateful_network_wordcount.py.
Note that using updateStateByKey requires the checkpoint directory to be configured, which is
discussed in detail in the checkpointing section.
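The rules described earlier can be modeled in plain Python: the function runs for every existing key in every batch, and returning None drops the key. This is a hypothetical sketch of the semantics only, not how Spark implements the operator. `apply_update_state_by_key` and `drop_idle` are illustrative names, and each batch is modeled as a list of (key, value) pairs.

```python
from collections import defaultdict


def update_function(new_values, running_count):
    """Same logic as updateFunction above: add new values to the previous count."""
    if running_count is None:
        running_count = 0
    return sum(new_values, running_count)


def drop_idle(new_values, running_count):
    """Hypothetical variant: forget a key as soon as a batch brings no new values."""
    if not new_values:
        return None  # returning None removes the key from the state
    return sum(new_values, running_count or 0)


def apply_update_state_by_key(state, batch, func):
    """Return the state after one batch, mimicking updateStateByKey semantics."""
    new_values = defaultdict(list)
    for key, value in batch:
        new_values[key].append(value)
    next_state = {}
    # func is called for every key that has state OR new data,
    # even when a key received nothing in this batch.
    for key in set(state) | set(new_values):
        updated = func(new_values.get(key, []), state.get(key))
        if updated is not None:
            next_state[key] = updated
    return next_state


batches = [[("spark", 1), ("stream", 1)], [("spark", 1)], []]
state = {}
for batch in batches:
    state = apply_update_state_by_key(state, batch, update_function)
    print(dict(sorted(state.items())))
# prints {'spark': 1, 'stream': 1}, then {'spark': 2, 'stream': 1} twice

print(apply_update_state_by_key(state, [], drop_idle))  # {}
```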
Transform Operation
The transform operation (along with its variations like transformWith) allows arbitrary RDD-to-
RDD functions to be applied on a DStream. It can be used to apply any RDD operation that is
not exposed in the DStream API. For example, the functionality of joining every batch in a data
stream with another dataset is not directly exposed in the DStream API. However, you can easily
use transform to do this. This enables very powerful possibilities. For example, one can do real-
time data cleaning by joining the input data stream with precomputed spam information (maybe
generated with Spark as well) and then filtering based on it.

spamInfoRDD = [Link](...)  # RDD containing spam information

# join data stream with spam information to do data cleaning
cleanedDStream = [Link](lambda rdd: [Link](spamInfoRDD).filter(...))
Note that the supplied function gets called in every batch interval. This allows you to do time-
varying RDD operations, that is, RDD operations, number of partitions, broadcast variables, etc.
can be changed between batches.
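The spam-filtering idea can be sketched in plain Python to show what the per-batch function does. In this sketch each batch is a list of (user, event) pairs and the precomputed spam information is a dict. `spam_info`, `clean_batch`, the 0.5 threshold, and the sample data are all hypothetical. In Spark, the function passed to transform would receive an RDD and use RDD operations such as join and filter instead of list comprehensions.

```python
# Hypothetical precomputed spam scores keyed by user id.
spam_info = {"u1": 0.1, "u2": 0.9}


def clean_batch(batch):
    """RDD-to-RDD style function: join the batch with spam_info, keep non-spam."""
    # Inner join: only users present in both the batch and spam_info survive.
    joined = [(user, (event, spam_info[user])) for user, event in batch if user in spam_info]
    # Filter on the joined score, then drop the score again.
    return [(user, event) for user, (event, score) in joined if score < 0.5]


def transform(batches, func):
    """Apply func to every batch, the way transform calls it once per batch interval."""
    return [func(batch) for batch in batches]


batches = [[("u1", "click"), ("u2", "click")], [("u2", "view"), ("u1", "view")]]
print(transform(batches, clean_batch))  # [[('u1', 'click')], [('u1', 'view')]]
```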
Window Operations
Spark Streaming also provides windowed computations, which allow you to apply
transformations over a sliding window of data. The following figure illustrates this sliding
window.

As shown in the figure, every time the window slides over a source DStream, the source RDDs
that fall within the window are combined and operated upon to produce the RDDs of the
windowed DStream. In this specific case, the operation is applied over the last 3 time units of
data, and slides by 2 time units. This shows that any window operation needs to specify two
parameters.

• window length - The duration of the window (3 in the figure).
• sliding interval - The interval at which the window operation is performed (2 in the figure).

These two parameters must be multiples of the batch interval of the source DStream (1 in the
figure).
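A small calculation shows how the batch interval, window length, and sliding interval relate. This plain-Python sketch lists which batch numbers each window covers. The function name `windows` is hypothetical. It assumes batches are numbered from 1, that all durations share one unit, and that the first window ends after one sliding interval. Spark's actual alignment of window end times depends on when the streaming context started.

```python
def windows(num_batches, window_length, slide_interval, batch_interval=1):
    """Return, for each window evaluation, the batch numbers it covers."""
    # Both window parameters must be multiples of the batch interval.
    if window_length % batch_interval or slide_interval % batch_interval:
        raise ValueError("window length and sliding interval must be "
                         "multiples of the batch interval")
    length = window_length // batch_interval  # window size in batches
    slide = slide_interval // batch_interval  # slide step in batches
    result = []
    # A window is evaluated every `slide` batches and covers the last `length` batches.
    for end in range(slide, num_batches + 1, slide):
        start = max(1, end - length + 1)
        result.append(list(range(start, end + 1)))
    return result


print(windows(6, window_length=3, slide_interval=2))  # [[1, 2], [2, 3, 4], [4, 5, 6]]
```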
Let’s illustrate the window operations with an example. Say you want to extend the earlier
example by generating word counts over the last 30 seconds of data, every 10 seconds. To do
this, we have to apply the reduceByKey operation on the pairs DStream of (word, 1) pairs over
the last 30 seconds of data. This is done using the operation reduceByKeyAndWindow.

# Reduce last 30 seconds of data, every 10 seconds
windowedWordCounts = [Link](lambda x, y: x + y, lambda x, y: x - y, 30, 10)
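The inverse function (`lambda x, y: x - y`) lets the windowed counts be updated incrementally rather than recomputed from scratch. The plain-Python sketch below compares the two approaches and checks that they agree. All names are hypothetical, batches are modeled as lists of (word, 1) pairs, and window length and slide are measured in batches. For example, 30 seconds every 10 seconds corresponds to 3 and 1, assuming a 10-second batch interval. The sketch drops words whose count falls to zero. Spark's real operator also requires checkpointing and works per partition.

```python
from collections import Counter


def recompute_windows(batches, length, slide):
    """Recount every word from scratch for each window."""
    results = []
    for end in range(slide, len(batches) + 1, slide):
        counts = Counter()
        for batch in batches[max(0, end - length):end]:
            counts.update(word for word, _ in batch)
        results.append(+counts)
    return results


def incremental_windows(batches, length, slide):
    """Add batches entering the window and subtract batches leaving it."""
    results = []
    counts = Counter()
    prev_start, prev_end = 0, 0
    for end in range(slide, len(batches) + 1, slide):
        start = max(0, end - length)
        for batch in batches[prev_end:end]:       # reduce the new data (x + y)
            for word, one in batch:
                counts[word] += one
        for batch in batches[prev_start:start]:   # inverse-reduce the old data (x - y)
            for word, one in batch:
                counts[word] -= one
        prev_start, prev_end = start, end
        results.append(+counts)  # unary + drops words whose count reached zero
    return results


batches = [
    [("spark", 1), ("stream", 1)],
    [("spark", 1)],
    [("window", 1)],
    [("spark", 1)],
]
print(incremental_windows(batches, 3, 1) == recompute_windows(batches, 3, 1))  # True
```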
Some of the common window operations are as follows. All of these operations take the two parameters described above: windowLength and slideInterval.
Transformation Meaning

window(windowLength, slideInterval): Return a new DStream which is computed based on windowed batches of the source DStream.

countByWindow(windowLength, slideInterval): Return a sliding window count of elements in the stream.

reduceByWindow(func, windowLength, slideInterval): Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func. The function should be associative and commutative so that it can be computed correctly in parallel.

reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function func over batches in a sliding window. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property [Link]) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.

reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enters the sliding window, and “inverse reducing” the old data that leaves the window. An example would be that of “adding” and “subtracting” counts of keys as the window slides. However, it is applicable only to “invertible reduce functions”, that is, those reduce functions which have a corresponding “inverse reduce” function (taken as parameter invFunc). Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument. Note that checkpointing must be enabled for using this operation.
countByValueAndWindow(windowLength, slideInterval, [numTasks]): When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument.

Join Operations
Finally, it’s worth highlighting how easily you can perform different kinds of joins in Spark
Streaming.
Stream-stream joins
Streams can be very easily joined with other streams.

stream1 = ...
stream2 = ...
joinedStream = [Link](stream2)
Here, in each batch interval, the RDD generated by stream1 will be joined with the RDD
generated by stream2. You can also do leftOuterJoin, rightOuterJoin, fullOuterJoin. Furthermore,
it is often very useful to do joins over windows of the streams. That is pretty easy as well.

windowedStream1 = [Link](20)
windowedStream2 = [Link](60)
joinedStream = [Link](windowedStream2)
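Within each interval, a stream-stream join behaves like an ordinary pair join of the two batches from that interval. This plain-Python sketch shows the difference between join and leftOuterJoin on each interval. The helper names are hypothetical and batches are modeled as lists of (key, value) pairs. As in PySpark, a key with no match on the right side is paired with None in the outer join.

```python
def inner_join(left, right):
    """Like join: (K, (V, W)) for every matching pair of elements per key."""
    return [(k, (v, w)) for k, v in left for rk, w in right if k == rk]


def left_outer_join(left, right):
    """Like leftOuterJoin: unmatched left keys are kept with None on the right."""
    result = []
    for k, v in left:
        matches = [w for rk, w in right if rk == k]
        if matches:
            result.extend((k, (v, w)) for w in matches)
        else:
            result.append((k, (v, None)))
    return result


def join_streams(stream1, stream2, how=inner_join):
    """Join the i-th batch of stream1 with the i-th batch of stream2."""
    return [how(b1, b2) for b1, b2 in zip(stream1, stream2)]


stream1 = [[("a", 1), ("b", 2)], [("c", 3)]]
stream2 = [[("a", "x")], [("a", "y")]]
print(join_streams(stream1, stream2))                       # [[('a', (1, 'x'))], []]
print(join_streams(stream1, stream2, how=left_outer_join))  # [[('a', (1, 'x')), ('b', (2, None))], [('c', (3, None))]]
```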
Stream-dataset joins
This has already been shown earlier while explaining the [Link] operation. Here is yet
another example of joining a windowed stream with a dataset.

dataset = ... # some RDD
windowedStream = [Link](20)
joinedStream = [Link](lambda rdd: [Link](dataset))
In fact, you can also dynamically change the dataset you want to join against. The function
provided to transform is evaluated every batch interval and therefore will use the current dataset
that dataset reference points to.
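The function passed to transform is re-evaluated every batch interval, so it sees whatever the dataset reference points to at that moment. The plain-Python sketch below imitates this with a mutable holder. The `DatasetRef` class and its contents are hypothetical. In a Spark application, the reference would typically be an RDD variable on the driver that is reassigned between batches.

```python
class DatasetRef:
    """Hypothetical holder for the dataset that each batch is joined against."""

    def __init__(self, data):
        self.data = data


ref = DatasetRef({"a": "v1"})


def join_with_current(batch):
    # Reads ref.data at call time, so a swapped dataset affects later batches.
    return [(k, (v, ref.data[k])) for k, v in batch if k in ref.data]


results = []
for i, batch in enumerate([[("a", 1)], [("a", 2)], [("a", 3)]]):
    if i == 2:
        ref.data = {"a": "v2"}  # replace the dataset between batches
    results.append(join_with_current(batch))
print(results)  # [[('a', (1, 'v1'))], [('a', (2, 'v1'))], [('a', (3, 'v2'))]]
```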
The complete list of DStream transformations is available in the API documentation. For the
Python API, see DStream. For the Scala API, see DStream and PairDStreamFunctions. For the
Java API, see JavaDStream and JavaPairDStream.
Output Operations on DStreams
Output operations allow DStream’s data to be pushed out to external systems like a database or a
file system. Since the output operations actually allow the transformed data to be consumed by
external systems, they trigger the actual execution of all the DStream transformations (similar to
actions for RDDs). Currently, the following output operations are defined:
Output Operation Meaning

print(): Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging. Python API: This is called pprint() in the Python API.

saveAsTextFiles(prefix, [suffix]): Save this DStream's contents as text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".

saveAsObjectFiles(prefix, [suffix]): Save this DStream's contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". Python API: This is not available in the Python API.

saveAsHadoopFiles(prefix, [suffix]): Save this DStream's contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". Python API: This is not available in the Python API.

foreachRDD(func): The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.
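The "prefix-TIME_IN_MS[.suffix]" rule used by the save operations can be expressed as a short function. This plain-Python sketch only restates the documented pattern. The helper name `batch_output_name` and the sample values are hypothetical, and Spark builds these names internally.

```python
def batch_output_name(prefix, batch_time_ms, suffix=None):
    """Build "prefix-TIME_IN_MS[.suffix]" for one batch interval."""
    name = f"{prefix}-{batch_time_ms}"
    # The ".suffix" part is only appended when a suffix is given.
    return f"{name}.{suffix}" if suffix else name


print(batch_output_name("words", 1700000000000))         # words-1700000000000
print(batch_output_name("words", 1700000000000, "txt"))  # words-1700000000000.txt
```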

Design Patterns for using foreachRDD


[Link] is a powerful primitive that allows data to be sent out to external systems.
However, it is important to understand how to use this primitive correctly and efficiently. Some
of the common mistakes to avoid are as follows.
Often writing data to external systems requires creating a connection object (e.g. TCP connection
to a remote server) and using it to send data to a remote system. For this purpose, a developer
may inadvertently try creating a connection object at the Spark driver, and then try to use it in a
Spark worker to save records in the RDDs. For example,

def sendRecord(rdd):
    connection = createNewConnection()  # executed at the driver
    [Link](lambda record: [Link](record))
    [Link]()

[Link](sendRecord)
This is incorrect as this requires the connection object to be serialized and sent from the driver to
the worker. Such connection objects are rarely transferable across machines. This error may
manifest as serialization errors (connection object not serializable), initialization errors
(connection object needs to be initialized at the workers), etc. The correct solution is to create the
connection object at the worker.
However, this can lead to another common mistake - creating a new connection for every record.
For example,

def sendRecord(record):
    connection = createNewConnection()
    [Link](record)
    [Link]()

[Link](lambda rdd: [Link](sendRecord))


Typically, creating a connection object has time and resource overheads. Therefore, creating and
destroying a connection object for each record can incur unnecessarily high overheads and can
significantly reduce the overall throughput of the system. A better solution is to
use [Link]: create a single connection object and send all the records in an RDD partition using that connection.

def sendPartition(iter):
    connection = createNewConnection()
    for record in iter:
        [Link](record)
    [Link]()

[Link](lambda rdd: [Link](sendPartition))


This amortizes the connection creation overheads over many records.
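Counting connections makes the cost difference between the two patterns visible. In this plain-Python sketch, `FakeConnection` and `create_new_connection` are hypothetical stand-ins for the createNewConnection() used above. An RDD is modeled as a list of partitions, each a list of records.

```python
class FakeConnection:
    """Hypothetical connection that only records what it was sent."""

    opened = 0  # total number of connections created so far

    def __init__(self):
        FakeConnection.opened += 1
        self.sent = []

    def send(self, record):
        self.sent.append(record)

    def close(self):
        pass


def create_new_connection():
    return FakeConnection()


def send_record(record):
    # Anti-pattern: one connection per record.
    connection = create_new_connection()
    connection.send(record)
    connection.close()


def send_partition(records):
    # Better: one connection per partition, reused for all of its records.
    connection = create_new_connection()
    for record in records:
        connection.send(record)
    connection.close()


rdd = [[1, 2, 3, 4], [5, 6, 7, 8]]  # 2 partitions, 8 records

FakeConnection.opened = 0
for partition in rdd:
    for record in partition:
        send_record(record)
print(FakeConnection.opened)  # 8

FakeConnection.opened = 0
for partition in rdd:
    send_partition(partition)
print(FakeConnection.opened)  # 2
```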
Finally, this can be further optimized by reusing connection objects across multiple
RDDs/batches. One can maintain a static pool of connection objects that can be reused as RDDs
of multiple batches are pushed to the external system, thus further reducing the overheads.

def sendPartition(iter):
    # ConnectionPool is a static, lazily initialized pool of connections
    connection = [Link]()
    for record in iter:
        [Link](record)
    # return to the pool for future reuse
    [Link](connection)

[Link](lambda rdd: [Link](sendPartition))


Note that the connections in the pool should be lazily created on demand and timed out if not
used for a while. This achieves the most efficient sending of data to external systems.
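Here is a minimal sketch of such a pool. It assumes a single worker process calls the pool from one thread at a time and that connections are cheap Python objects. The `ConnectionPool` name mirrors the pseudocode above but is hypothetical: Spark itself does not ship this class, so a real application would supply its own, thread-safe pool. Connections are created lazily on demand and discarded once they have been idle longer than `idle_timeout` seconds.

```python
import time


class Connection:
    """Hypothetical connection; counts how many were ever created."""

    created = 0

    def __init__(self):
        Connection.created += 1
        self.sent = []

    def send(self, record):
        self.sent.append(record)


class ConnectionPool:
    """Static, lazily populated pool; idle connections expire after idle_timeout seconds."""

    _idle = []  # list of (connection, time it was returned)
    idle_timeout = 60.0

    @classmethod
    def getConnection(cls):
        now = time.monotonic()
        # Forget connections unused for too long (a real pool would also close them).
        cls._idle = [(c, t) for c, t in cls._idle if now - t <= cls.idle_timeout]
        if cls._idle:
            connection, _ = cls._idle.pop()
            return connection
        return Connection()  # created lazily, only when none is idle

    @classmethod
    def returnConnection(cls, connection):
        cls._idle.append((connection, time.monotonic()))


def sendPartition(records):
    connection = ConnectionPool.getConnection()
    for record in records:
        connection.send(record)
    ConnectionPool.returnConnection(connection)  # return to the pool for reuse


# Three batches of two partitions each, processed one partition at a time.
for batch in range(3):
    for partition in ([1, 2], [3, 4]):
        sendPartition(partition)
print(Connection.created)  # 1
```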
Other points to remember:
• DStreams are executed lazily by the output operations, just like RDDs are lazily executed by RDD actions. Specifically, RDD actions inside the DStream output operations force the processing of the received data. Hence, if your application does not have any output operation, or has output operations like [Link]() without any RDD action inside them, then nothing will get executed. The system will simply receive the data and discard it.
• By default, output operations are executed one at a time, in the order they are defined in the application.

DataFrame and SQL Operations


You can easily use DataFrames and SQL operations on streaming data. You have to create a
SparkSession using the SparkContext that the StreamingContext is using. Furthermore, this has
to be done such that it can be restarted on driver failures. This is done by creating a lazily
instantiated singleton instance of SparkSession. This is shown in the following example. It
modifies the earlier word count example to generate word counts using DataFrames and SQL.
Each RDD is converted to a DataFrame, registered as a temporary table and then queried using
SQL.

from pyspark.sql import Row, SparkSession

# Lazily instantiated global instance of SparkSession
def getSparkSessionInstance(sparkConf):
    if "sparkSessionSingletonInstance" not in globals():
        globals()["sparkSessionSingletonInstance"] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()["sparkSessionSingletonInstance"]

...

# DataFrame operations inside your streaming program

words = ...  # DStream of strings

def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # Get the singleton instance of SparkSession
        spark = getSparkSessionInstance([Link]())

        # Convert RDD[String] to RDD[Row] to DataFrame
        rowRdd = [Link](lambda w: Row(word=w))
        wordsDataFrame = [Link](rowRdd)

        # Creates a temporary view using the DataFrame
        [Link]("words")

        # Do word count on table using SQL and print it
        wordCountsDataFrame = [Link]("select word, count(*) as total from words group by word")
        [Link]()
    except Exception:
        # Skip batches that cannot be processed (for example, an empty RDD,
        # where schema inference fails)
        pass

[Link](process)
See the full source code.
You can also run SQL queries on tables defined on streaming data from a different thread (that
is, asynchronous to the running StreamingContext). Just make sure that you set the
StreamingContext to remember a sufficient amount of streaming data such that the query can
run. Otherwise the StreamingContext, which is unaware of any of the asynchronous SQL
queries, will delete old streaming data before the query can complete. For example, if you
want to query the last batch, but your query can take 5 minutes to run, then
call [Link](Minutes(5)) (in Scala, or equivalent in other languages).
See the DataFrames and SQL guide to learn more about DataFrames.

MLlib Operations
You can also easily use machine learning algorithms provided by MLlib. First of all, there are
streaming machine learning algorithms (e.g. Streaming Linear Regression, Streaming KMeans,
etc.) which can simultaneously learn from the streaming data as well as apply the model on the
streaming data. Beyond these, for a much larger class of machine learning algorithms, you can
train a model offline (i.e. using historical data) and then apply the model online on
streaming data. See the MLlib guide for more details.

Caching / Persistence
Similar to RDDs, DStreams also allow developers to persist the stream’s data in memory. That
is, using the persist() method on a DStream will automatically persist every RDD of that
DStream in memory. This is useful if the data in the DStream will be computed multiple times
(e.g., multiple operations on the same data). For window-based operations
like reduceByWindow and reduceByKeyAndWindow and state-based operations
like updateStateByKey, this is implicitly true. Hence, DStreams generated by window-based
operations are automatically persisted in memory, without the developer calling persist().
For input streams that receive data over the network (such as Kafka, sockets, etc.), the default
persistence level is set to replicate the data to two nodes for fault-tolerance.
Note that, unlike RDDs, the default persistence level of DStreams keeps the data serialized in
memory. This is further discussed in the Performance Tuning section.
