PySpark: Overview and Key Features
PySpark is a Spark library written in Python that lets you run Python applications using Apache Spark capabilities. With PySpark, you can run applications in parallel on a distributed cluster (multiple nodes).
PySpark is a Python API for Apache Spark. Apache Spark is an analytical processing engine for large-scale, distributed data processing and machine learning applications.
Spark is written in Scala; due to its wide industry adoption, its Python API, PySpark, was later released using Py4J. Py4J is a Java library that is integrated within PySpark and allows Python to dynamically interface with JVM objects.
• PySpark is a general-purpose, in-memory, distributed processing engine that allows you to process data efficiently in a distributed fashion.
• For in-memory workloads, applications running on PySpark can be up to 100x faster than traditional disk-based systems such as Hadoop MapReduce.
• You get great benefits from using PySpark for data ingestion pipelines.
• Using PySpark, you can process data from Hadoop HDFS, AWS S3, and many other file systems.
• PySpark is also used to process real-time data using Streaming and Kafka.
• With PySpark Streaming, you can also stream files from the file system and stream data from a socket.
• PySpark natively has machine learning and graph libraries.
As of writing this Spark with Python (PySpark) tutorial, Spark supports the following cluster managers:
• Standalone – a simple cluster manager included with Spark that makes it easy to set up a cluster.
• Apache Mesos – a cluster manager that can also run Hadoop MapReduce and PySpark applications.
• Hadoop YARN – the resource manager in Hadoop 2. This is the most commonly used cluster manager.
• Kubernetes – an open-source system for automating deployment, scaling, and management of containerized applications.
• local – not really a cluster manager, but worth mentioning because we pass “local” to master() to run Spark on your laptop/computer.
Apache Spark provides a suite of Web UIs (Jobs, Stages, Tasks, Storage, Environment, Executors, and SQL) to monitor the status of your Spark application, the resource consumption of the Spark cluster, and Spark configurations. In the Spark Web UI, you can see how operations are executed.
[Link] true
[Link] [Link]
A SparkSession can be created using the SparkSession.builder pattern or the newSession() method of an existing SparkSession. A SparkSession internally creates a sparkContext variable of type SparkContext. You can create multiple SparkSession objects but only one SparkContext per JVM. If you want to create another SparkContext, you should stop the existing SparkContext (using stop()) before creating a new one.
# Import SparkSession
from pyspark.sql import SparkSession
# Create SparkSession
spark = SparkSession.builder \
.master("local[1]") \
.appName("[Link]") \
.getOrCreate()
SparkContext has several functions to use with RDDs. For example, its parallelize() method is used to create an RDD from a list.
# Create RDD from parallelize
dataList = [("Java", 20000), ("Python", 100000), ("Scala", 3000)]
rdd = spark.sparkContext.parallelize(dataList)
An RDD can also be created from a text file using the textFile() function of SparkContext.
# Create RDD from external Data source
rdd2 = spark.sparkContext.textFile("/path/[Link]")
RDD transformations – Transformations are lazy operations. When you run a transformation (for example, an update), instead of updating the current RDD, these operations return another RDD.
RDD actions – operations that trigger computation and return RDD values to the driver.
Transformations on a Spark RDD return another RDD, and transformations are lazy, meaning they don’t execute until you call an action on the RDD. Some transformations on RDDs are flatMap(), map(), reduceByKey(), filter(), and sortByKey(); they return a new RDD instead of updating the current one.
1.7.5. RDD Actions
An RDD action returns values from an RDD to the driver node. In other words, any RDD function that returns something other than RDD[T] is considered an action.
Some actions on RDDs are count(), collect(), first(), max(), reduce(), and more.
DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table
in a relational database or a data frame in R/Python. DataFrames can be constructed from a wide array of sources
such as structured data files, tables in Hive, external databases, or existing RDDs.
A PySpark DataFrame is mostly similar to a pandas DataFrame, except that PySpark DataFrames are distributed across the cluster (meaning the data in a DataFrame is stored on different machines in the cluster), and any operation in PySpark executes in parallel on all machines, whereas a pandas DataFrame is stored and operated on a single machine.
The simplest way to create a DataFrame is from a Python list of data. A DataFrame can also be created from an RDD and by reading files from several sources.
data = [('James','','Smith','1991-04-01','M',3000),
('Michael','Rose','','2000-05-19','M',4000),
('Robert','','Williams','1978-09-05','M',4000),
('Maria','Anne','Jones','1967-12-01','F',4000),
('Jen','Mary','Brown','1980-02-17','F',-1)
]
columns = ["firstname","middlename","lastname","dob","gender","salary"]
df = spark.createDataFrame(data=data, schema=columns)
[Link]()
Like RDDs, DataFrames also support transformations and actions.
In real-time applications, DataFrames are created from external sources such as files on the local system, HDFS, S3, Azure, HBase, MySQL tables, etc. Below is an example of how to read a CSV file from the local system.
df = spark.read.csv("/tmp/resources/[Link]")
[Link]()
DataFrame has a rich set of APIs that support reading and writing several file formats:
• csv
• text
• Avro
• Parquet
• tsv
• xml and many more
PySpark SQL is one of the most used PySpark modules; it is used for processing structured, columnar data. Once you have created a DataFrame, you can interact with the data using SQL syntax.
In other words, Spark SQL brings native raw SQL queries to Spark, meaning you can run traditional ANSI SQL queries on a Spark DataFrame.
Apache Spark provides a suite of Web UIs (Jobs, Stages, Tasks, Storage, Environment, Executors, and SQL) to monitor the status of your Spark/PySpark application, the resource consumption of the Spark cluster, and Spark configurations.
Your application code is the set of instructions that tells the driver to run a Spark job and lets the driver decide how to achieve it with the help of executors.
Instructions to the driver are called transformations, and an action triggers the execution.
• Spark Jobs
• Stages
• Tasks
• Storage
• Environment
• Executors
• SQL
2.1. Spark Jobs Tab
The details to be aware of under the Jobs section are the scheduling mode, the number of Spark jobs, the number of stages each job has, and the description of each Spark job.
• Standalone mode
• YARN mode
• Mesos
Keep in mind that the number of Spark jobs generally equals the number of actions in the application, and each Spark job has at least one stage.
In our application above, we ran 3 Spark jobs (0, 1, 2).
Each wide transformation results in a separate stage. In our case, Spark job 0 and Spark job 1 each have a single stage, but Spark job 2 has two stages because of the partitioning of data. Data is partitioned into two files by default.
2.1.4. Description
Description links to the complete details of the associated Spark job, such as the job status, DAG visualization, and completed stages.
• Select the Description of the respective Spark job (Shows stages only for the Spark job opted)
• On the top of Spark Job tab select Stages option (Shows all stages in Application)
The number of tasks in each stage is the number of partitions Spark is going to work on, and each task inside a stage does the same work, but on a different partition of data.
2.2.1. Stage detail
The stage details show the directed acyclic graph (DAG) of the stage, where vertices represent RDDs or DataFrames and edges represent operations to be applied.
FileScanRDD
MapPartitionsRDD
SQLExecutionRDD
SQLExecutionRDD is an internal Spark RDD used to track multiple Spark jobs that together constitute a single structured query execution.
Operations in Stage 2 and Stage 3 are:
[Link]
[Link]
[Link]
[Link]
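WholeStageCodegen – a physical query optimization in Spark SQL that fuses multiple physical operators into a single piece of generated Java code.
Exchange – the physical operator that performs a shuffle, exchanging data between stages.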
2.3. Tasks
2.4. Storage
The Storage tab displays the persisted RDDs and DataFrames, if any, in the application. The summary page shows the storage levels, sizes, and partitions of all RDDs, and the details page shows the sizes and the executors used for all partitions of an RDD or DataFrame.
2.5. Environment
The Environment page has five parts. It is a useful place to check whether your properties have been set correctly.
1. Runtime Information: contains runtime properties such as the versions of Java and Scala.
2. Spark Properties: lists the application properties like ‘[Link]’ and ‘[Link]’.
3. Hadoop Properties: displays properties relative to Hadoop and YARN. Note: Properties
like ‘[Link]’ are shown not in this part but in ‘Spark Properties’.
4. System Properties: shows more details about the JVM.
5. Classpath Entries: lists the classes loaded from different sources, which is very useful to resolve class
conflicts.
2.6. Executors
The Executors tab displays summary information about the executors that were created for the application, including memory and disk usage and task and shuffle information. The Storage Memory column shows the amount of memory used and reserved for caching data.
2.7. SQL Tab
If the application executes Spark SQL queries, the SQL tab displays information such as the duration, Spark jobs, and physical and logical plans for the queries.
In our application, we performed read and count operations on files and a DataFrame, so both the read and the count are listed in the SQL tab.
Since Spark 2.0, SparkSession has been the entry point to PySpark for working with RDDs and DataFrames. Prior to 2.0, SparkContext was the entry point.
What is SparkSession
SparkSession was introduced in version 2.0. It is an entry point to underlying PySpark functionality for programmatically creating PySpark RDDs and DataFrames. Its object, spark, is available by default in pyspark-shell, and it can be created programmatically using SparkSession.
3.1. SparkSession
With Spark 2.0, a new class, SparkSession (from pyspark.sql import SparkSession), was introduced. SparkSession combines all the different contexts we used to have prior to the 2.0 release (SQLContext, HiveContext, etc.). Since 2.0, SparkSession can be used in place of SQLContext, HiveContext, and other contexts defined prior to 2.0.
As mentioned at the beginning, SparkSession is the entry point to PySpark, and creating a SparkSession instance is the first statement you write to program with RDDs and DataFrames. A SparkSession is created using the SparkSession.builder pattern.
Though SparkContext was the entry point prior to 2.0, it has not been completely replaced by SparkSession; many features of SparkContext are still available and used in Spark 2.0 and later. You should also know that SparkSession internally creates a SparkConf and a SparkContext with the configuration provided to SparkSession.
• SparkContext
• SQLContext
• StreamingContext
• HiveContext
You can create as many SparkSession objects as you want in a PySpark application using either SparkSession.builder or spark.newSession(). Multiple SparkSession objects are required when you want to keep PySpark tables (relational entities) logically separated.
# Create SparkSession from builder
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
.appName('[Link]') \
.getOrCreate()
master() – If you are running on a cluster, pass your master name as the argument to master(). Usually it is either yarn or mesos, depending on your cluster setup.
Use local[x] when running in local mode. x should be an integer greater than 0; it sets the number of worker threads and therefore how many partitions are created by default for RDDs and DataFrames. Ideally, x should be the number of CPU cores you have.
getOrCreate() – returns the existing SparkSession if one already exists, and creates a new one otherwise.
version – returns the Spark version your application is running on, typically the version your cluster is configured with.
readStream – returns an instance of the DataStreamReader class, which is used to read streaming data into a DataFrame.
pyspark.SparkContext is an entry point to PySpark functionality that is used to communicate with the cluster and to create RDDs, accumulators, and broadcast variables. In this article, you will learn how to create a PySpark SparkContext with examples. Note that you can create only one SparkContext per JVM; to create another, you first need to stop the existing one using the stop() method.
The Spark driver program creates and uses SparkContext to connect to the cluster manager, submit PySpark jobs, and know which resource manager (YARN, Mesos, or Standalone) to communicate with. It is the heart of the PySpark application.
At any given time, only one SparkContext instance should be active per JVM. If you want to create another, stop the existing SparkContext using stop() before creating a new one.
# Create SparkSession from builder
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
.appName('[Link]') \
.getOrCreate()
print([Link])
print("Spark App Name : " + spark.sparkContext.appName)
# SparkContext stop() method
spark.sparkContext.stop()
As explained above, you can have only one SparkContext per JVM. If you want to create another, you need to shut down the existing one using the stop() method and then create a new SparkContext.
When you try to create multiple SparkContexts, PySpark raises a ValueError whose message starts with “Cannot run multiple SparkContexts at once”.
You can create a SparkContext programmatically using its constructor, passing at least the master and appName parameters, as these are mandatory. The example below creates a context with master local and app name Spark_Example_App.
# Create SparkContext
from pyspark import SparkContext
sc = SparkContext("local", "Spark_Example_App")
print([Link])
You can also create it using SparkContext.getOrCreate(). It returns the existing active SparkContext, or otherwise creates one with the specified master and app name.
# Create Spark Context
from pyspark import SparkConf, SparkContext
conf = SparkConf()
conf.setMaster("local").setAppName("Spark Example App")
sc = SparkContext.getOrCreate(conf)
print([Link])
# Create RDD
rdd = [Link](1, 5)
print([Link]())
accumulator(value[, accum_param]) – creates a PySpark accumulator variable with the specified initial value. Tasks can only add to an accumulator; only the driver can read its value.
broadcast(value) – creates a read-only PySpark broadcast variable, which is distributed to the entire cluster. The value is shipped to each node once rather than with every task.
SparkContext is the entry point to the PySpark execution engine, which communicates with the cluster. Using it, you can create RDDs, accumulators, and broadcast variables.
5. PySpark RDD Tutorial | Learn with Examples
RDD (Resilient Distributed Dataset) is a fundamental building block of PySpark: a fault-tolerant, immutable, distributed collection of objects. Immutable means that once you create an RDD, you cannot change it. The records of an RDD are divided into logical partitions, which can be computed on different nodes of the cluster.
In other words, RDDs are collections of objects similar to a list in Python, with the difference that an RDD is computed on several processes scattered across multiple physical servers (nodes) in a cluster, while a Python collection lives and is processed in just one process.
Additionally, RDDs provide a data abstraction for partitioning and distributing data, designed to run computations in parallel on several nodes. While doing transformations on an RDD, we don’t have to worry about parallelism, as PySpark provides it by default.
In-Memory Processing
PySpark loads data from disk, processes it in memory, and keeps it in memory; this is the main difference between PySpark and MapReduce (which is I/O intensive). Between transformations, we can also cache/persist an RDD in memory to reuse previous computations.
Immutability
PySpark RDDs are immutable, meaning that once RDDs are created, you cannot modify them. When we apply transformations on an RDD, PySpark creates a new RDD and maintains the RDD lineage.
Fault Tolerance
PySpark operates on fault-tolerant data stores such as HDFS and S3, so if a partition of an RDD is lost, PySpark recomputes it from the source data using the RDD lineage. Also, when PySpark applications run on a cluster, failed tasks are automatically retried a certain number of times (per the spark.task.maxFailures configuration) so that the application can finish seamlessly.
Lazy Evaluation
PySpark does not evaluate RDD transformations as the driver encounters them; instead, it records all transformations as it encounters them (as a DAG) and evaluates them when it sees the first RDD action.
Partitioning
When you create an RDD from data, PySpark partitions its elements by default. By default, the number of partitions equals the number of cores available.
PySpark RDDs are not well suited for applications that make updates to a state store, such as storage systems for a web application. For these applications, it is more efficient to use systems that perform traditional update logging and data checkpointing, such as databases. The goal of RDD is to provide an efficient programming model for batch analytics and leave these asynchronous applications to other systems.
Note: Creating a SparkSession object internally creates one SparkContext per JVM.
By using the parallelize() function of SparkContext (sparkContext.parallelize()), you can create an RDD. This function loads an existing collection from your driver program into a parallelized RDD. This is a basic method to create an RDD and is used when you already have data in memory, loaded either from a file or from a database; it requires all the data to be present on the driver program before the RDD is created.
#Create RDD from parallelize
data = [1,2,3,4,5,6,7,8,9,10,11,12]
rdd = spark.sparkContext.parallelize(data)
Using textFile() method we can read a text (.txt) file into RDD.
#Create RDD from external Data source
rdd2 = spark.sparkContext.textFile("/path/[Link]")
wholeTextFiles() returns a PairRDD with the key being the file path and the value being the file content.
# Reads each entire file into the RDD as a single record
rdd3 = spark.sparkContext.wholeTextFiles("/path/[Link]")
Using the emptyRDD() method on sparkContext, we can create an RDD with no data. This method creates an empty RDD with no partitions.
# Creates empty RDD with no partition
rdd = spark.sparkContext.emptyRDD()
Sometimes we may need to write an empty RDD to files by partition; in this case, you should create an empty RDD with partitions.
#Create empty RDD with partition
rdd2 = spark.sparkContext.parallelize([], 10)  # This creates 10 partitions
Sometimes we may need to repartition an RDD. PySpark provides two ways to do this: the repartition() method, which shuffles data from all nodes (also called a full shuffle), and the coalesce() method, which shuffles data from the minimum number of nodes. For example, if you have data in 4 partitions, coalesce(2) moves data from just 2 nodes.
reparRdd = rdd.repartition(4)
print("re-partition count:" + str(reparRdd.getNumPartitions()))
# Outputs: "re-partition count:4"
Note: repartition() and coalesce() also return a new RDD.
Transformations on a PySpark RDD return another RDD, and transformations are lazy, meaning they don’t execute until you call an action on the RDD. Some transformations on RDDs are flatMap(), map(), reduceByKey(), filter(), and sortByKey(); they return a new RDD instead of updating the current one.
flatMap – the flatMap() transformation flattens the RDD after applying the function and returns a new RDD. In the example below, it first splits each record in the RDD by space and then flattens the result. The resulting RDD contains a single word in each record.
rdd2 = rdd.flatMap(lambda x: x.split(" "))
map – the map() transformation is used to apply any complex operation, such as adding or updating a column; the output of a map transformation always has the same number of records as the input.
In our word count example, we add a new column with value 1 for each word. The result is a pair RDD containing key-value pairs, with the word (a string) as the key and 1 (an int) as the value.
rdd3 = rdd2.map(lambda x: (x, 1))
reduceByKey – reduceByKey() merges the values for each key using the specified function. In our example, it reduces by word, applying a sum function on the values. The resulting RDD contains unique words and their counts.
rdd5 = rdd3.reduceByKey(lambda a, b: a + b)
sortByKey – the sortByKey() transformation sorts RDD elements by key. In our example, we first convert RDD[(String, Int)] to RDD[(Int, String)] using a map transformation and then apply sortByKey, which sorts on the integer value. Finally, collect() returns all words in the RDD and their counts as key-value pairs, which we print to the console.
rdd6 = rdd5.map(lambda x: (x[1], x[0])).sortByKey()
# Print rdd6 result to console
print(rdd6.collect())
filter – the filter() transformation is used to filter the records in an RDD. In our example, we filter all words that start with “a”.
RDD action operations return values from an RDD to the driver program. In other words, any RDD function that returns a non-RDD value is considered an action.
PairRDDFunctions or PairRDD – a pair RDD holds key-value pairs; this is the most commonly used RDD type.
ShuffledRDD –
DoubleRDD –
SequenceFileRDD –
HadoopRDD –
ParallelCollectionRDD –
Shuffling is a mechanism PySpark uses to redistribute data across different executors and even across machines. PySpark shuffling is triggered when we perform certain transformation operations like groupByKey(), reduceByKey(), and join() on RDDs. Shuffling is an expensive operation because it involves:
• Disk I/O
• Involves data serialization and deserialization
• Network I/O
When creating an RDD, PySpark doesn’t necessarily store the data for all keys in the same partition, since at creation time there is no way to set the key for the data set.
Hence, when we run the reduceByKey() operation to aggregate data by key, PySpark needs to first run tasks to collect all the data from all partitions. For example, when we perform a reduceByKey() operation, PySpark does the following:
• PySpark first runs map tasks on all partitions, which group all values for a single key.
• The results of the map tasks are kept in memory.
• When the results do not fit in memory, PySpark stores the data on disk.
• PySpark shuffles the mapped data across partitions; sometimes it also stores the shuffled data on disk for reuse when it needs to recalculate.
• It runs garbage collection.
• Finally, it runs reduce tasks on each partition based on key.
PySpark RDD triggers a shuffle and repartition for several operations, like repartition() (and coalesce() when called with shuffle=True), groupByKey(), reduceByKey(), cogroup(), and join(), but not countByKey().
PySpark cache and persist are optimization techniques that improve the performance of RDD jobs that are iterative and interactive.
Using cache() and persist() methods, PySpark provides an optimization mechanism to store the intermediate
computation of an RDD so they can be reused in subsequent actions.
When you persist or cache an RDD, each worker node stores its partitioned data in memory or on disk and reuses it in other actions on that RDD. Spark’s persisted data on nodes is fault-tolerant, meaning that if any partition is lost, it is automatically recomputed using the original transformations that created it.
The PySpark RDD cache() method by default saves the RDD computation at storage level `MEMORY_ONLY`, meaning it stores the data in memory only. (On the Python side, PySpark always stores RDD data in serialized form.)
The PySpark cache() method in the RDD class internally calls the persist() method, which in turn uses [Link] to cache the RDD's result set. Let’s look at an example.
cachedRdd = rdd.cache()
The PySpark persist() method is used to store the RDD at one of the storage levels MEMORY_ONLY, MEMORY_AND_DISK, DISK_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK_2, and more. (The _SER variants such as MEMORY_ONLY_SER and MEMORY_AND_DISK_SER exist in the Scala/Java API but not in pyspark.StorageLevel.)
PySpark persist can be called in two ways: without an argument, which by default saves the RDD at the MEMORY_ONLY storage level, or with a StorageLevel argument to store it at a different storage level.
import pyspark
dfPersist = df.persist(pyspark.StorageLevel.MEMORY_ONLY)
dfPersist.show(truncate=False)
PySpark automatically monitors every persist() and cache() call you make, checks usage on each node, and drops persisted data that is not used, following a least-recently-used (LRU) policy. You can also remove it manually using the unpersist() method. unpersist() marks the RDD as non-persistent and removes all blocks for it from memory and disk.
When PySpark executes transformations using map() or reduce() operations, it executes them on a remote node using variables that are shipped with the tasks. These variables are not sent back to the PySpark driver, so there is no way to reuse and share variables across tasks. PySpark shared variables solve this problem. PySpark provides two types of shared variables:
• Broadcast variables (read-only shared variable)
• Accumulator variables (updatable shared variables)
PySpark parallelize() is a function in SparkContext and is used to create an RDD from a list collection.
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('[Link]').getOrCreate()
sparkContext = spark.sparkContext
rdd = sparkContext.parallelize([1,2,3,4,5])
rddCollect = rdd.collect()
print("Number of Partitions: " + str(rdd.getNumPartitions()))
print("Action: First element: " + str(rdd.first()))
print(rddCollect)
repartition() is used to increase or decrease the number of RDD/DataFrame partitions, whereas PySpark coalesce() is used only to decrease the number of partitions, in an efficient way.
One important point to note: PySpark repartition() and coalesce() can be very expensive operations because they move data across many partitions, so try to minimize their use as much as possible.
rdd1 = spark.sparkContext.parallelize(range(0, 20), 6)
print("parallelize : " + str(rdd1.getNumPartitions()))
rddFromFile = spark.sparkContext.textFile("src/main/resources/[Link]", 10)
print("TextFile : " + str(rddFromFile.getNumPartitions()))
rdd1.saveAsTextFile("/tmp/partition")
# Writes 6 part files, one for each partition
Partition 1 : 0 1 2
Partition 2 : 3 4 5
Partition 3 : 6 7 8 9
Partition 4 : 10 11 12
Partition 5 : 13 14 15
Partition 6 : 16 17 18 19
The Spark RDD repartition() method is used to increase or decrease the number of partitions. The example below decreases the partitions from 6 to 4 by moving data from all partitions.
rdd2 = rdd1.repartition(4)
print("Repartition size : " + str(rdd2.getNumPartitions()))
rdd2.saveAsTextFile("/tmp/re-partition")
Partition 1 : 1 6 10 15 19
Partition 2 : 2 3 7 11 16
Partition 3 : 4 8 12 13 17
Partition 4 : 0 5 9 14 18
Spark RDD coalesce() is used only to reduce the number of partitions. It is an optimized version of repartition() that moves less data across partitions.
rdd3 = rdd1.coalesce(4)
print("Coalesce size : " + str(rdd3.getNumPartitions()))
rdd3.saveAsTextFile("/tmp/coalesce")
Partition 1 : 0 1 2
Partition 2 : 3 4 5 6 7 8 9
Partition 4 : 10 11 12
Partition 5 : 13 14 15 16 17 18 19
Unlike with RDDs (where parallelize() accepts a partition count), you generally don’t specify the partitions/parallelism when creating a DataFrame. By default, a DataFrame internally uses the methods specified in Section 1 to determine the default number of partitions and splits the data for parallelism.
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('[Link]') \
.master("local[5]").getOrCreate()
df = spark.range(0, 20)
print(df.rdd.getNumPartitions())
df.write.mode("overwrite").csv("c:/tmp/[Link]")
The above example creates 5 partitions as specified in master("local[5]") and the data is distributed across all
these 5 partitions.
Partition 1 : 0 1 2 3
Partition 2 : 4 5 6 7
Partition 3 : 8 9 10 11
Partition 4 : 12 13 14 15
Partition 5 : 16 17 18 19
5.14.5. DataFrame repartition()
Similar to RDD, the PySpark DataFrame repartition() method is used to increase or decrease the partitions. The
below example increases the partitions from 5 to 6 by moving data from all partitions.
df2 = df.repartition(6)
print(df2.rdd.getNumPartitions())
Increasing the partitions by just one results in data movement from all partitions.
Partition 1 : 14 1 5
Partition 2 : 4 16 15
Partition 3 : 8 3 18
Partition 4 : 12 2 19
Partition 5 : 6 17 7 0
Partition 6 : 9 10 11 13
Even decreasing the partitions with repartition() results in moving data from all partitions; hence, when you want to decrease the number of partitions, the recommendation is to use coalesce().
Spark DataFrame coalesce() is used only to decrease the number of partitions. It is an optimized version of repartition() that moves less data across partitions.
df3 = df.coalesce(2)
print(df3.rdd.getNumPartitions())
Partition 1 : 0 1 2 3 8 9 10 11
Partition 2 : 4 5 6 7 12 13 14 15 16 17 18 19
Since we are reducing 5 partitions to 2, data moves from only 3 partitions into the remaining 2.
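Calling groupBy(), join(), and similar functions on a DataFrame results in shuffling data between multiple executors and even machines, and the shuffled data is repartitioned into 200 partitions by default. PySpark sets this default through the spark.sql.shuffle.partitions configuration. With adaptive query execution enabled (the default since Spark 3.2), Spark may coalesce these shuffle partitions into fewer partitions at runtime.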
You can manually create a PySpark DataFrame using the toDF() and createDataFrame() methods; both functions take different signatures to create a DataFrame from an existing RDD, list, or DataFrame.
You can also create a PySpark DataFrame from data sources in TXT, CSV, JSON, ORC, Avro, Parquet, and XML formats by reading from HDFS, S3, DBFS, Azure Blob file systems, etc.
Finally, a PySpark DataFrame can also be created by reading data from RDBMS and NoSQL databases.
createDataFrame(dataList) toDF(*cols)
createDataFrame(rowData,columns)
createDataFrame(dataList,schema)
columns = ["language","users_count"]
One easy way to manually create a PySpark DataFrame is from an existing RDD. First, let’s create a Spark RDD from a collection list by calling the parallelize() function from SparkContext. We need this rdd object for all the examples below.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('[Link]').getOrCreate()
rdd = spark.sparkContext.parallelize(data)
PySpark RDD’s toDF() method is used to create a DataFrame from an existing RDD. Since an RDD doesn’t have columns, the DataFrame is created with the default column names “_1” and “_2”, as we have two columns.
dfFromRDD1 = rdd.toDF()
If you want to provide column names to the DataFrame, use the toDF() method with the column names as arguments, as shown below.
columns = ["language","users_count"]
dfFromRDD1 = rdd.toDF(columns)
[Link]()
Using createDataFrame() from SparkSession is another way to create a DataFrame manually; it takes an rdd object as an argument, and you can chain it with toDF() to name the columns.
dfFromRDD2 = spark.createDataFrame(rdd).toDF(*columns)
Next, we will see how to create a PySpark DataFrame from a list. These examples are similar to those in the section above with RDD, but we use the list data object instead of the “rdd” object to create the DataFrame.
Calling createDataFrame() from SparkSession is another way to create a PySpark DataFrame manually; it takes a list object as an argument, and you can chain it with toDF() to name the columns.
dfFromData2 = spark.createDataFrame(data).toDF(*columns)
6.2.2. Using createDataFrame() with the Row type
createDataFrame() has another signature in PySpark that takes a collection of Row objects and a schema for the column names as arguments. To use it, we first need to convert our “data” object from a list to a list of Row.
from pyspark.sql import Row
rowData = map(lambda x: Row(*x), data)
dfFromData3 = spark.createDataFrame(rowData, columns)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([ \
StructField("firstname",StringType(),True), \
StructField("middlename",StringType(),True), \
StructField("lastname",StringType(),True), \
StructField("id", StringType(), True), \
StructField("gender", StringType(), True), \
StructField("salary", IntegerType(), True) \
])
df = spark.createDataFrame(data=data2,schema=schema)
df.printSchema()
df.show(truncate=False)
In practice, you mostly create DataFrames from data source files such as CSV, text, JSON, XML, etc.
PySpark supports many of these formats out of the box without importing any extra libraries. To create a
DataFrame, use the appropriate method of the DataFrameReader class (available as spark.read).
6.4.1. Creating DataFrame from CSV
Use the csv() method of the DataFrameReader object to create a DataFrame from a CSV file. You can also
provide options such as the delimiter to use, whether the data is quoted, date formats, schema inference, and
many more. For details, refer to PySpark Read CSV into DataFrame.
df2 = spark.read.csv("/src/resources/[Link]")
df2 = [Link]("/src/resources/[Link]")
df2 = [Link]("/src/resources/[Link]")
While working with files, we sometimes don’t receive a file for processing, but we still need to
create a DataFrame manually with the schema we expect. If we don’t create it with the same schema,
operations/transformations on the DataFrame (such as unions) fail because they refer to columns that may not
be present.
To handle situations like these, we always need to create a DataFrame with the same schema, meaning the
same column names and data types, regardless of whether the file exists or is empty.
#Displays
#EmptyRDD[188] at emptyRDD
Alternatively, you can also get an empty RDD by using spark.sparkContext.parallelize([]).
To create an empty PySpark DataFrame manually with a schema (column names & data types), first create
a schema using StructType and StructField.
#Create Schema
from pyspark.sql.types import StructType,StructField, StringType
schema = StructType([
StructField('firstname', StringType(), True),
StructField('middlename', StringType(), True),
StructField('lastname', StringType(), True)
])
Now pass the empty RDD created above to createDataFrame() of SparkSession, along with the schema that defines
the column names & data types.
You can also create an empty DataFrame by converting the empty RDD to a DataFrame using toDF().
So far we have created an empty DataFrame from an RDD. Here, we create it manually with a schema and
without an RDD.
#Create empty DataFrame directly.
df2 = spark.createDataFrame([], schema)
df2.printSchema()
To create an empty DataFrame without a schema (no columns), create an empty schema and use it when creating the
PySpark DataFrame.
structureData = [
(("James","","Smith"),"36636","M",3100),
(("Michael","Rose",""),"40288","M",4300),
(("Robert","","Williams"),"42114","M",1400),
(("Maria","Anne","Jones"),"39192","F",5500),
(("Jen","Mary","Brown"),"","F",-1)
]
structureSchema = StructType([
StructField('name', StructType([
StructField('firstname', StringType(), True),
StructField('middlename', StringType(), True),
StructField('lastname', StringType(), True)
])),
StructField('id', StringType(), True),
StructField('gender', StringType(), True),
StructField('salary', IntegerType(), True)
])
df2 = spark.createDataFrame(data=structureData,schema=structureSchema)
df2.printSchema()
df2.show(truncate=False)
root
|-- name: struct (nullable = true)
| |-- firstname: string (nullable = true)
| |-- middlename: string (nullable = true)
| |-- lastname: string (nullable = true)
|-- id: string (nullable = true)
|-- gender: string (nullable = true)
|-- salary: integer (nullable = true)
6.7.1. Adding & Changing struct of the DataFrame
updatedDF.printSchema()
updatedDF.show(truncate=False)
Here, it copies “gender”, “salary” and “id” (renamed to “identifier”) into the new struct “OtherInfo” and adds a new
field “Salary_Grade”.
root
|-- name: struct (nullable = true)
| |-- firstname: string (nullable = true)
| |-- middlename: string (nullable = true)
| |-- lastname: string (nullable = true)
|-- OtherInfo: struct (nullable = false)
| |-- identifier: string (nullable = true)
| |-- gender: string (nullable = true)
| |-- salary: integer (nullable = true)
| |-- Salary_Grade: string (nullable = false)
arrayStructureSchema = StructType([
StructField('name', StructType([
StructField('firstname', StringType(), True),
StructField('middlename', StringType(), True),
StructField('lastname', StringType(), True)
])),
StructField('hobbies', ArrayType(StringType()), True),
StructField('properties', MapType(StringType(),StringType()), True)
])
root
|-- name: struct (nullable = true)
| |-- firstname: string (nullable = true)
| |-- middlename: string (nullable = true)
| |-- lastname: string (nullable = true)
|-- hobbies: array (nullable = true)
| |-- element: string (containsNull = true)
|-- properties: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
print("firstname" in df.schema.fieldNames())
print(StructField("firstname",StringType(),True) in df.schema)
Before we start using it on RDDs & DataFrames, let’s understand some basics of the Row class.
The Row class extends tuple, so it takes a variable number of arguments; Row() is used to create a row object.
Once the row object is created, we can retrieve its data by index, as with a tuple, or by field name.
from pyspark.sql import Row
row=Row(name="Alice", age=11)
print(row.name)
We can also create a Row-like class, for example “Person”, and use it like a Row object. This is helpful
when you want to create objects with named fields and refer to their properties. For example, a Person
class can be created with Row and used in the same way as Row.
We can use the Row class on a PySpark RDD. When you use Row to create an RDD, you get the results back as Row
objects after collecting the data.
data = [Row(name="James,,Smith",lang=["Java","Scala","C++"],state="CA"),
Row(name="Michael,Rose,",lang=["Spark","Java","C++"],state="NJ"),
Row(name="Robert,,Williams",lang=["CSharp","VB"],state="NV")]
rdd=spark.sparkContext.parallelize(data)
print(rdd.collect())
This yields below output.
collData=rdd.collect()
for row in collData:
    print(row.name + "," +str(row.lang))
This yields below output.
Person=Row("name","lang","state")
data = [Person("James,,Smith",["Java","Scala","C++"],"CA"),
Person("Michael,Rose,",["Spark","Java","C++"],"NJ"),
Person("Robert,,Williams",["CSharp","VB"],"NV")]
6.10.4. Using Row class on PySpark DataFrame
Similarly, the Row class can also be used with a PySpark DataFrame; by default, the data in a DataFrame is represented
as Row objects. To demonstrate, I will use the same data that was created for the RDD.
Note that Row on DataFrame is not allowed to omit a named argument to represent that the value is None or
missing. This should be explicitly set to None in this case.
df=spark.createDataFrame(data)
df.printSchema()
df.show()
This yields the output below. Note that the DataFrame is able to take the column names from the Row objects.
root
|-- name: string (nullable = true)
|-- lang: array (nullable = true)
| |-- element: string (containsNull = true)
|-- state: string (nullable = true)
You can also change the column names by using the toDF() function.
columns = ["name","languagesAtSchool","currentState"]
df=spark.createDataFrame(data).toDF(*columns)
df.printSchema()
This yields the output below; note the renamed column “languagesAtSchool”.
root
|-- name: string (nullable = true)
|-- languagesAtSchool: array (nullable = true)
| |-- element: string (containsNull = true)
|-- currentState: string (nullable = true)
The example below creates a struct type using the Row class. Alternatively, you can also create a
struct type by providing a schema with PySpark StructType & StructField.
#Create DataFrame with struct using Row class
from pyspark.sql import Row
data=[Row(name="James",prop=Row(hair="black",eye="blue")),
Row(name="Ann",prop=Row(hair="grey",eye="black"))]
df=spark.createDataFrame(data)
df.printSchema()
Yields below schema
root
|-- name: string (nullable = true)
|-- prop: struct (nullable = true)
| |-- hair: string (nullable = true)
| |-- eye: string (nullable = true)
6.10.6. Complete Example of PySpark Row usage on RDD & DataFrame
from pyspark.sql import SparkSession, Row
row=Row("James",40)
print(row[0] +","+str(row[1]))
row2=Row(name="Alice", age=11)
print(row2.name)
#PySpark Example
spark = SparkSession.builder.appName('[Link]').getOrCreate()
data = [Row(name="James,,Smith",lang=["Java","Scala","C++"],state="CA"),
Row(name="Michael,Rose,",lang=["Spark","Java","C++"],state="NJ"),
Row(name="Robert,,Williams",lang=["CSharp","VB"],state="NV")]
#RDD Example 1
rdd=spark.sparkContext.parallelize(data)
collData=rdd.collect()
print(collData)
for row in collData:
    print(row.name + "," +str(row.lang))
# RDD Example 2
Person=Row("name","lang","state")
data = [Person("James,,Smith",["Java","Scala","C++"],"CA"),
Person("Michael,Rose,",["Spark","Java","C++"],"NJ"),
Person("Robert,,Williams",["CSharp","VB"],"NV")]
rdd=spark.sparkContext.parallelize(data)
collData=rdd.collect()
print(collData)
for person in collData:
    print(person.name + "," +str(person.lang))
#DataFrame Example 1
columns = ["name","languagesAtSchool","currentState"]
df=spark.createDataFrame(data)
df.printSchema()
df.show()
collData=df.collect()
print(collData)
for row in collData:
    print(row.name + "," +str(row.lang))
#DataFrame Example 2
columns = ["name","languagesAtSchool","currentState"]
df=spark.createDataFrame(data).toDF(*columns)
df.printSchema()
The pyspark.sql.Column class provides several functions to work with DataFrames: manipulating column values,
evaluating Boolean expressions to filter rows, retrieving a value or part of a value from a DataFrame column,
and working with list, map & struct columns.
Key Points:
• The PySpark Column class represents a single column in a DataFrame.
• It provides the functions most commonly used to manipulate DataFrame columns & rows.
• Some of these Column functions evaluate a Boolean expression that can be used
with the filter() transformation to filter DataFrame rows.
• It provides functions to get a value from a list column by index, a map value by key,
and a nested struct field.
• PySpark also provides additional functions in pyspark.sql.functions that take a Column
object and return a Column.
Note: Most of the functions in pyspark.sql.functions return a Column, so it is very important to know the operations
you can perform on the Column type.
One of the simplest ways to create a Column class object is by using PySpark lit() SQL function , this takes a
literal value and returns a Column object.
data=[("James",23),("Ann",40)]
df=spark.createDataFrame(data).toDF("[Link]","gender")
df.printSchema()
#root
# |-- [Link]: string (nullable = true)
# |-- gender: long (nullable = true)
PySpark column also provides a way to do arithmetic operations on columns using operators.
data=[(100,2,1),(200,3,4),(300,4,4)]
df=spark.createDataFrame(data).toDF("col1","col2","col3")
#Arithmetic operations
df.select(df.col1 + df.col2).show()
df.select(df.col1 - df.col2).show()
df.select(df.col1 * df.col2).show()
df.select(df.col1 / df.col2).show()
df.select(df.col1 % df.col2).show()
Let’s look at some of the most used Column functions. In the table below, related functions are grouped together
to make it easier to read.
COLUMN FUNCTION | FUNCTION DESCRIPTION
between(lowerBound, upperBound) | Checks whether the column’s values are between the lower and upper bounds (inclusive). Returns a Boolean value.
bitwiseAND(other), bitwiseOR(other), bitwiseXOR(other) | Compute the bitwise AND, OR & XOR of this expression with another expression, respectively.
isin(*cols) | A Boolean expression that evaluates to true if the value of this expression is contained in the evaluated values of the arguments.
when(condition, value), otherwise(value) | Similar to SQL CASE WHEN: evaluates a list of conditions and returns one of multiple possible result expressions.
Let’s create a simple DataFrame to work with PySpark SQL Column examples. For most of the examples below, I will
refer to columns through the DataFrame object (df.).
data=[("James","Bond","100",None),
("Ann","Varsa","200",'F'),
("Tom Cruise","XXX","400",''),
("Tom Brand",None,"400",'M')]
columns=["fname","lname","id","gender"]
df=spark.createDataFrame(data,columns)
In the example below, df.fname refers to a Column object, and alias() is a Column function that gives the column an
alternate name. Here, the fname column is renamed to first_name and lname to last_name.
In the second example, I use the PySpark expr() function to concatenate columns and name the result
fullName.
#alias
from pyspark.sql.functions import expr
df.select(df.fname.alias("first_name"), \
          df.lname.alias("last_name")
   ).show()
#Another example
df.select(expr(" fname ||','|| lname").alias("fullName") \
   ).show()
#cast
df.select(df.fname,df.id.cast("int")).printSchema()
7.4.4. between() – Returns a Boolean expression when a column’s values are between the lower and upper bounds
#between
df.filter(df.id.between(100,300)).show()
#contains
df.filter(df.fname.contains("Cruise")).show()
#startswith, endswith()
df.filter(df.fname.startswith("T")).show()
df.filter(df.fname.endswith("Cruise")).show()
#isNull & isNotNull
df.filter(df.lname.isNull()).show()
df.filter(df.lname.isNotNull()).show()
#like , rlike
df.select(df.fname,df.lname,df.id) \
  .filter(df.fname.like("%om")) \
  .show()
#substr
df.select(df.fname.substr(1,2).alias("substr")).show()
7.4.10. when() & otherwise() – It is similar to SQL CASE WHEN.
#isin
li=["100","200"]
df.select(df.fname,df.lname,df.id) \
  .filter(df.id.isin(li)) \
  .show()
The remaining functions below operate on list, map & struct data structures, so to demonstrate them I will use
another DataFrame with list, map and struct columns. For more on arrays, refer to PySpark
ArrayType Column on DataFrame Examples, and for maps, refer to PySpark MapType Examples.
schema = StructType([
StructField('name', StructType([
StructField('fname', StringType(), True),
StructField('lname', StringType(), True)])),
StructField('languages', ArrayType(StringType()),True),
StructField('properties', MapType(StringType(),StringType()),True)
])
df=spark.createDataFrame(data,schema)
df.printSchema()
#Displays to console
root
|-- name: struct (nullable = true)
| |-- fname: string (nullable = true)
| |-- lname: string (nullable = true)
|-- languages: array (nullable = true)
| |-- element: string (containsNull = true)
|-- properties: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
getField Example
#getField from MapType
df.select(df.properties.getField("hair")).show()
df.select("firstname","lastname").show()
df.select(df.firstname,df.lastname).show()
df.select(df["firstname"],df["lastname"]).show()
PySpark withColumn() is a DataFrame transformation function that is used to change the value of a column, convert
the datatype of an existing column, create a new column, and more.
The PySpark filter() function is used to filter rows from an RDD/DataFrame based on a given condition or SQL
expression. If you are coming from an SQL background, you can use where() instead of filter();
both functions operate exactly the same way.
from pyspark.sql.functions import col, array_contains
df.filter(df.state == "OH") \
    .show(truncate=False)
df.filter(col("state") == "OH") \
    .show(truncate=False)
df.filter("gender == 'M'") \
    .show(truncate=False)
df.filter(array_contains(df.languages,"Java")) \
    .show(truncate=False)
df.filter(df.name.lastname == "Williams") \
    .show(truncate=False)
PySpark distinct() function is used to drop/remove the duplicate rows (all columns) from DataFrame
and dropDuplicates() is used to drop rows based on selected (one or multiple) columns. In this article, you
will learn how to use distinct() and dropDuplicates() functions with PySpark example.
#Distinct
distinctDF = df.distinct()
print("Distinct count: "+str(distinctDF.count()))
distinctDF.show(truncate=False)
#Drop duplicates
df2 = df.dropDuplicates()
print("Distinct count: "+str(df2.count()))
df2.show(truncate=False)
#Drop duplicates on selected columns
dropDisDF = df.dropDuplicates(["department","salary"])
print("Distinct count of department salary : "+str(dropDisDF.count()))
dropDisDF.show(truncate=False)
You can use either the sort() or orderBy() function of a PySpark DataFrame to sort it in ascending or
descending order based on one or more columns. You can also sort using PySpark SQL sorting functions.
from pyspark.sql.functions import col
df.sort("department","state").show(truncate=False)
df.sort(col("department"),col("state")).show(truncate=False)
df.orderBy("department","state").show(truncate=False)
df.orderBy(col("department"),col("state")).show(truncate=False)
df.sort(df.department.asc(),df.state.asc()).show(truncate=False)
df.sort(col("department").asc(),col("state").asc()).show(truncate=False)
df.orderBy(col("department").asc(),col("state").asc()).show(truncate=False)
df.sort(df.department.asc(),df.state.desc()).show(truncate=False)
df.sort(col("department").asc(),col("state").desc()).show(truncate=False)
df.orderBy(col("department").asc(),col("state").desc()).show(truncate=False)
df.createOrReplaceTempView("EMP")
spark.sql("select employee_name,department,state,salary,age,bonus from EMP "
          "ORDER BY department asc").show(truncate=False)
PySpark UDF (a.k.a. User Defined Function) is a very useful feature of Spark SQL & DataFrame that is used to
extend PySpark’s built-in capabilities. In this article, I will explain what a UDF is, why we need it, and how to
create and use it on DataFrame select(), withColumn() and SQL using PySpark (Spark with Python) examples.
Note: UDFs are among the most expensive operations, so use them only when you have no other choice.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def convertCase(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
       resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr
""" Converting function to UDF """
convertUDF = udf(lambda z: convertCase(z),StringType())
""" Converting function to UDF
StringType() is by default hence not required """
convertUDF = udf(lambda z: convertCase(z))
def upperCase(str):
    return str.upper()
Registering PySpark UDF & using it in SQL
""" Using UDF on SQL """
spark.udf.register("convertUDF", convertCase,StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE") \
     .show(truncate=False)
While working with a PySpark DataFrame, we often need to replace null values, because certain operations on null values
return errors. We therefore need to handle nulls gracefully as the first step before processing. Also, when writing to a
file, it’s best practice to replace null values; otherwise, the nulls end up in the output file.
PySpark provides DataFrame.fillna() and DataFrameNaFunctions.fill() to replace NULL/None values. These two
are aliases of each other and return the same results.
df.fillna(value="").show()
df.na.fill(value="").show()
df.fillna("unknown",["city"]) \
    .fillna("",["type"]).show()
df.na.fill("unknown",["city"]) \
    .na.fill("",["type"]).show()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import approx_count_distinct,collect_list
from pyspark.sql.functions import collect_set,sum,avg,max,countDistinct,count
from pyspark.sql.functions import first, last, kurtosis, min, mean, skewness
# sumDistinct is deprecated since Spark 3.2 in favor of sum_distinct
from pyspark.sql.functions import stddev, stddev_samp, stddev_pop, sumDistinct
from pyspark.sql.functions import variance,var_samp, var_pop
spark = SparkSession.builder.appName('[Link]').getOrCreate()
print("approx_count_distinct: " + \
      str(df.select(approx_count_distinct("salary")).collect()[0][0]))
df.select(collect_list("salary")).show(truncate=False)
df.select(collect_set("salary")).show(truncate=False)
print("count: "+str(df.select(count("salary")).collect()[0]))
df.select(first("salary")).show(truncate=False)
df.select(last("salary")).show(truncate=False)
df.select(kurtosis("salary")).show(truncate=False)
df.select(max("salary")).show(truncate=False)
df.select(min("salary")).show(truncate=False)
df.select(mean("salary")).show(truncate=False)
df.select(skewness("salary")).show(truncate=False)
df.select(stddev("salary"), stddev_samp("salary"), \
    stddev_pop("salary")).show(truncate=False)
df.select(sum("salary")).show(truncate=False)
df.select(sumDistinct("salary")).show(truncate=False)
df.select(variance("salary"),var_samp("salary"),var_pop("salary")) \
  .show(truncate=False)
PYSPARK DATE FUNCTION | DATE FUNCTION DESCRIPTION
to_date(column, fmt) | Converts the column into a `DateType` with a specified format.
date_add(column, days) | Returns the date that is `days` days after the date in `column`.
date_sub(column, days) | Returns the date that is `days` days before the date in `column`.
months_between(end, start) | Returns the number of months between dates `end` and `start`. A whole number is returned if both inputs have the same day of month or both are the last day of their respective months. Otherwise, the difference is calculated assuming 31 days per month.
months_between(end, start, roundOff) | Returns the number of months between dates `end` and `start`. If `roundOff` is set to true, the result is rounded off to 8 digits; it is not rounded otherwise.
next_day(column, dayOfWeek) | Returns the first date which is later than the value of the `date` column and falls on the specified day of the week. For example, `next_day('2015-07-27', "Sunday")` returns 2015-08-02 because that is the first Sunday after 2015-07-27.
trunc(column, format) | Returns the date truncated to the unit specified by the format. For example, `trunc("2018-11-19 [Link]", "year")` returns 2018-01-01. format: 'year', 'yyyy', 'yy' to truncate by year; 'month', 'mon', 'mm' to truncate by month.
date_trunc(format, timestamp) | Returns the timestamp truncated to the unit specified by the format. For example, `date_trunc("year", "2018-11-19 [Link]")` returns 2018-01-01 00:00:00. format: 'year', 'yyyy', 'yy' to truncate by year; 'month', 'mon', 'mm' to truncate by month; 'day', 'dd' to truncate by day. Other options are: 'second', 'minute', 'hour', 'week', 'quarter'.
last_day(column) | Returns the last day of the month to which the given date belongs. For example, input "2015-07-27" returns "2015-07-31" since July 31 is the last day of July 2015.
unix_timestamp(column, p) | Converts a time string with the given pattern to a Unix timestamp (in seconds).
to_timestamp(column, fmt) | Converts a time string with the given pattern to a timestamp.
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains
spark = SparkSession.builder.appName('[Link]').getOrCreate()
df = spark.read.csv("/tmp/resources/[Link]")
df.printSchema()
df2 = spark.read.option("header",True) \
     .csv("/tmp/resources/[Link]")
df2.printSchema()
schema = StructType() \
.add("RecordNumber",IntegerType(),True) \
.add("Zipcode",IntegerType(),True) \
.add("ZipCodeType",StringType(),True)
df_with_schema = spark.read.format("csv") \
      .option("header", True) \
      .schema(schema) \
      .load("/tmp/resources/[Link]")
df_with_schema.printSchema()
df2.write.option("header",True) \
   .csv("/tmp/spark_output/zipcodes123")
When querying columnar storage, the engine skips non-relevant data very quickly, which makes query execution faster.
As a result, aggregation queries take less time than on row-oriented databases.
PySpark SQL supports both reading and writing Parquet files, which automatically capture the schema of
the original data. Parquet can also reduce data storage significantly (a figure of about 75% on average is often given,
but actual savings depend on the data). PySpark supports Parquet by default, so we don’t need to add any
dependency libraries.
df=spark.createDataFrame(data,columns)
df.write.mode("overwrite").parquet("/tmp/output/[Link]")
parDF1=spark.read.parquet("/tmp/output/[Link]")
parDF1.createOrReplaceTempView("parquetTable")
parDF1.printSchema()
parDF1.show(truncate=False)
df.write.partitionBy("gender","salary").mode("overwrite").parquet("/tmp/output/[Link]")
parDF2=spark.read.parquet("/tmp/output/[Link]/gender=M")
parDF2.show(truncate=False)
df_with_schema = spark.read.schema(schema) \
        .json("resources/[Link]")
df_with_schema.printSchema()
df_with_schema.show()
Apache Spark:
➡️ It is general purpose
➡️ in memory
➡️ compute engine
❗Compute Engine:
What does Hadoop provide?
Hadoop provides 3 things:
HDFS = storage
MapReduce = computation
YARN = resource manager
❗In Memory:
-- Each MapReduce job requires 2 disk accesses on HDFS, i.e. one for reading and one for writing.
-- In Spark, disk I/O is needed only for the initial read and the final write; intermediate results stay in memory.
-- Spark is said to be 10 to 100 times faster than MapReduce.
❗General Purpose:
❗In Hadoop, we use Pig for cleaning,
❗Hive for querying,
❗Mahout for machine learning,
❗and Sqoop for transferring data between relational databases and HDFS.
❗In MapReduce, we are bound to use only map and reduce.
❗In Spark, all of the above is possible.
❗To achieve this, we just need to learn one style of code.
❗These are the reasons Spark is the most preferred choice.
❗Spark also provides other operations, such as filter.
❗This is why Spark is called a general-purpose compute engine.
❗The basic unit which holds the data in Spark is called an RDD (Resilient Distributed Dataset).
PySpark RDD/DataFrame collect() is an action operation that retrieves all the elements of the
dataset (from all nodes) to the driver node. Use collect() only on small datasets, usually
after filter(), groupBy(), etc.
Calling collect() on an RDD/DataFrame with a large result set can cause an OutOfMemory error on the driver,
because it returns the entire dataset (from all workers) to the driver. Avoid calling collect() on large datasets.
collect() vs select()
select() is a transformation that returns a new DataFrame holding the selected columns, whereas
collect() is an action that returns the entire dataset to the driver as a list of Row objects.
Similar to the SQL GROUP BY clause, the PySpark groupBy() function is used to collect identical data into groups on a
DataFrame and perform aggregate functions on the grouped data. In this article, I will explain
several groupBy() examples using PySpark (Spark with Python).
When we perform groupBy() on a PySpark DataFrame, it returns a GroupedData object, which provides aggregate
functions such as count(), sum(), avg(), mean(), min(), max(), agg(), and pivot().
Similar to the SQL HAVING clause, on a PySpark DataFrame we can use either where() or filter() to filter
the rows of aggregated data.
schema = ["employee_name","department","state","salary","age","bonus"]
from pyspark.sql.functions import sum, avg, max, col
df.groupBy("department").sum("salary").show(truncate=False)
df.groupBy("department").count().show(truncate=False)
df.groupBy("department","state") \
    .sum("salary","bonus") \
    .show(truncate=False)
df.groupBy("department") \
    .agg(sum("salary").alias("sum_salary"), \
         avg("salary").alias("avg_salary"), \
         sum("bonus").alias("sum_bonus"), \
         max("bonus").alias("max_bonus") \
     ) \
    .show(truncate=False)
df.groupBy("department") \
    .agg(sum("salary").alias("sum_salary"), \
      avg("salary").alias("avg_salary"), \
      sum("bonus").alias("sum_bonus"), \
      max("bonus").alias("max_bonus")) \
    .where(col("sum_bonus") >= 50000) \
    .show(truncate=False)
PySpark Join is used to combine two DataFrames, and by chaining joins you can combine multiple DataFrames. It
supports all basic join types available in traditional SQL, such as INNER, LEFT OUTER, RIGHT
OUTER, LEFT ANTI, LEFT SEMI, CROSS, and SELF JOIN. PySpark joins are wide transformations that
typically involve shuffling data across the network.
PySpark SQL joins come with more optimization by default (thanks to DataFrames); however, there are still
some performance issues to consider when using them.
empColumns = ["emp_id","name","superior_emp_id","year_joined", \
"emp_dept_id","gender","salary"]
deptColumns = ["dept_name","dept_id"]
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id,"inner") \
     .show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id,"outer") \
     .show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id,"full") \
     .show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id,"fullouter") \
     .show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id,"left") \
     .show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id,"leftouter") \
     .show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id,"right") \
     .show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id,"rightouter") \
     .show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id,"leftsemi") \
     .show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id,"leftanti") \
     .show(truncate=False)
from pyspark.sql.functions import col
empDF.alias("emp1").join(empDF.alias("emp2"), \
    col("emp1.superior_emp_id") == col("emp2.emp_id"),"inner") \
    .select(col("emp1.emp_id"),col("emp1.name"), \
      col("emp2.emp_id").alias("superior_emp_id"), \
      col("emp2.name").alias("superior_emp_name")) \
    .show(truncate=False)
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")
PySpark union() and unionAll() transformations are used to merge two or more DataFrame’s of the same schema or
structure. In this PySpark article, I will explain both union transformations with PySpark examples.
DataFrame union() – the union() method of the DataFrame is used to merge two DataFrames with the same
structure/schema. It resolves columns by position, not by name, and fails if the DataFrames have a different number of columns.
DataFrame unionAll() – unionAll() is deprecated since Spark “2.0.0” and replaced with union().
Note: In SQL, UNION eliminates duplicates, whereas UNION ALL merges two datasets including
duplicate records. In PySpark, both union() and unionAll() keep duplicates (like UNION ALL); use the DataFrame
distinct() function to remove duplicate rows.
columns= ["employee_name","department","state","salary","age","bonus"]
columns2= ["employee_name","department","state","salary","age","bonus"]
unionDF = df.union(df2)
unionDF.show(truncate=False)
disDF = df.union(df2).distinct()
disDF.show(truncate=False)
27. PySpark map() Transformation
PySpark map() is an RDD transformation that applies a transformation function (lambda) to every
element of an RDD and returns a new RDD. In this article, you will learn the syntax and usage of the RDD
map() transformation with an example, and how to use it with a DataFrame.
The RDD map() transformation is used to apply complex operations such as adding a column, updating a column, or
transforming the data. The output of a map transformation always has the same number of records as the
input.
Note1: DataFrame doesn’t have a map() transformation, so you need to convert the DataFrame
to an RDD first (df.rdd).
Note2: If you have heavy initialization, use the PySpark mapPartitions() transformation instead of map(); with
mapPartitions(), the heavy initialization executes only once per partition instead of once per record.
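rdd=spark.sparkContext.parallelize(data)
rdd2=rdd.map(lambda x: (x,1))
for element in rdd2.collect():
    print(element)
rdd2=df.rdd.map(lambda x:
    (x[0]+","+x[1],x[2],x[3]*2)
    )
df2=rdd2.toDF(["name","gender","new_salary"] )
df2.show()
def func1(x):
    # Assumes columns firstname, lastname, gender, salary, in the order the lambda above relies on
    firstName=x.firstname
    lastName=x.lastname
    name=firstName+","+lastName
    return (name,x.gender,x.salary*2)
rdd2=df.rdd.map(lambda x: func1(x))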
Partitioning the data on the file system is a way to improve the performance of the query when dealing with a large
dataset in the Data lake. A Data Lake is a centralized repository of structured, semi-structured, unstructured, and
binary data that allows you to store a large amount of data as-is in its original raw format.
1. What is PySpark Partition?
PySpark partition is a way to split a large dataset into smaller datasets based on one or more partition keys. When
you create a DataFrame from a file/table, based on certain parameters PySpark creates the DataFrame with a
certain number of partitions in memory. This is one of the main advantages of PySpark DataFrame over Pandas
DataFrame. Transformations on partitioned data run faster because they execute in parallel for each
partition.
PySpark supports partition in two ways; partition in memory (DataFrame) and partition on the disk (File system).
Partition in memory: You can partition or repartition the DataFrame by calling repartition() or coalesce()
transformations.
Partition on disk: While writing the PySpark DataFrame back to disk, you can choose how to partition the data
based on columns using partitionBy() of pyspark.sql.DataFrameWriter. This is similar to Hive’s partitioning scheme.
2. Partition Advantages
PySpark is designed to process large datasets much faster than traditional processing (up to 100x is often quoted),
and this wouldn’t be possible without partitioning. Partitioning, in memory or on disk, lets PySpark work on smaller
chunks of data in parallel and, for partitioned files, read only the directories a query needs.
When creating partitions, be very careful about the number of partitions you create: too many
partitions create too many sub-directories on HDFS, which adds unnecessary overhead to the
NameNode (if you are using Hadoop), since it must keep all file system metadata in memory.
Let’s assume you have a US census table that contains zip code, city, state, and other columns. Partitioning
on state splits the table into around 50 partitions, so searching for a zip code within a state (state='CA' and
zipCode='92704') is faster because it only needs to scan the state=CA partition directory.
Partitioning on zipcode may not be a good option, as you might end up with too many partitions.
Another good example of partitioning is on a date column. Ideally, you should partition by year/month rather than by
individual date.
df.write.option("header",True) \
        .partitionBy("state") \
        .mode("overwrite") \
        .csv("/tmp/zipcodes-state")
#partitionBy() multiple columns
df.write.option("header",True) \
        .partitionBy("state","city") \
        .mode("overwrite") \
        .csv("/tmp/zipcodes-state")
Using repartition() and partitionBy() together
To further divide the data for each partition-column value into several files, use repartition() and
partitionBy() together.
repartition() creates the specified number of partitions in memory. partitionBy() then writes, for each memory
partition, one file into every partition-column directory for which that memory partition holds data.
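In PySpark, the combination typically looks like df.repartition(2).write.partitionBy("state").csv(...). The plain-Python model below (not Spark itself) shows why this multiplies the number of output files. It counts files per partition directory under two assumptions: each memory partition writes exactly one file into each directory it has rows for, and limits such as maxRecordsPerFile are ignored. The function name and sample data are hypothetical.

```python
def files_written(memory_partitions, partition_col):
    """Return {partition_value: number_of_files} for a write that uses
    repartition(n) followed by partitionBy(partition_col)."""
    counts = {}
    for rows in memory_partitions:
        # One file per distinct partition value held by this memory partition
        for value in {row[partition_col] for row in rows}:
            counts[value] = counts.get(value, 0) + 1
    return counts

# Two hypothetical memory partitions, as repartition(2) might produce
memory_partitions = [
    [{"state": "AL"}, {"state": "CA"}],
    [{"state": "AL"}, {"state": "TX"}],
]
print(sorted(files_written(memory_partitions, "state").items()))
```

The state=AL directory receives two files because both memory partitions hold AL rows.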
Reads are much faster on partitioned data. The following snippet retrieves the data for the specific partition
state=AL and city=SPRINGVILLE. It reads only that folder, instead of scanning the whole dataset as it would have
to if the data were not partitioned.
dfSinglePart = spark.read.option("header", True) \
    .csv("/tmp/zipcodes-state/state=AL/city=SPRINGVILLE")
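Reading the partition folder directly, as above, is one option. Another is to read the base path and filter on the partition columns, for example spark.read.option("header", True).csv("/tmp/zipcodes-state").filter("state = 'AL' AND city = 'SPRINGVILLE'"), which lets Spark skip directories that do not match. The plain-Python sketch below models only that directory-skipping step (partition pruning) for equality filters. The function name and directory listing are hypothetical.

```python
def prune(partition_dirs, filters):
    """Return the directories whose column=value segments satisfy every
    equality filter; columns without a filter are unconstrained."""
    selected = []
    for path in partition_dirs:
        # "state=AL/city=SPRINGVILLE" -> {"state": "AL", "city": "SPRINGVILLE"}
        values = dict(seg.split("=", 1) for seg in path.split("/") if "=" in seg)
        if all(values.get(col) == val for col, val in filters.items()):
            selected.append(path)
    return selected

# Hypothetical directory listing under /tmp/zipcodes-state
dirs = [
    "/tmp/zipcodes-state/state=AL/city=SPRINGVILLE",
    "/tmp/zipcodes-state/state=AL/city=MOODY",
    "/tmp/zipcodes-state/state=CA/city=SANTA ANA",
]
print(prune(dirs, {"state": "AL", "city": "SPRINGVILLE"}))
print(prune(dirs, {"state": "AL"}))
```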
29. Hive Table Types
By default, Hive creates an internal table, also known as a managed table. Hive owns the data/files of a managed
table: any data you insert or files you load into the table are managed by the Hive process, and when you drop the
table, the underlying data or files are deleted as well.
Using the EXTERNAL option, you can create an external table, which Hive does not manage. When you drop an external
table, only the table metadata is removed from the Metastore. The underlying files are not removed, and they can
still be accessed via HDFS commands, Pig, Spark, or any other Hadoop-compatible tool.
Let’s see this in action. Drop the table with the DROP TABLE emp.employee_external command, then check whether the
files still exist by running the hdfs -ls command above.
A temporary table is created using the TEMPORARY option. Such tables exist only within the current session. When
you exit the session, the temporary tables are removed, and they cannot be accessed from another session.
Hive also supports transactional tables, which support ACID operations such as INSERT, UPDATE, and DELETE.
A Databricks cluster is a set of computation resources that performs the heavy lifting of all of the data workloads
you run in Databricks. These workloads can be run as commands in notebooks, commands run from BI tools that
are connected to Databricks, or automated jobs that you’ve scheduled. Clusters perform the processing of these
workloads and then return results or save them out to data stores.
A cluster consists of multiple nodes (individual machines) that operate on your workloads in parallel. There is one
driver node for every cluster, which is the one that delegates tasks and oversees the execution of your specific
workload. There are also many worker nodes for every cluster that perform the processing. If a worker node in a
Databricks cluster is lost for any reason, the driver can reallocate remaining work to the remaining nodes.
In the cluster list, two columns on the left indicate whether the cluster has been pinned and the status of the
cluster:
• Pinned
• Starting, Terminating
• Standard cluster: Running, Terminated
• High concurrency cluster: Running, Terminated
• Access Denied: Running, Terminated
Databricks supports three cluster modes: Standard, High Concurrency, and Single Node. Most regular users use
Standard or Single Node clusters.
• Standard clusters are ideal for processing large amounts of data with Apache Spark.
• Single Node clusters are intended for jobs that use small amounts of data or non-distributed workloads
such as single-node machine learning libraries.
• High Concurrency clusters are ideal for groups of users who need to share resources or run ad-hoc jobs.
Administrators usually create High Concurrency clusters. Databricks recommends enabling autoscaling for
High Concurrency clusters.
Delta Lake is an open source project that enables building a Lakehouse architecture on top of data
lakes. Delta Lake provides ACID transactions, scalable metadata handling, and
unifies streaming and batch data processing on top of existing data lakes, such as S3, ADLS, GCS, and HDFS.
• ACID transactions on Spark: Serializable isolation levels ensure that readers never see inconsistent data.
• Scalable metadata handling: Leverages Spark distributed processing power to handle all the metadata for
petabyte-scale tables with billions of files at ease.
• Streaming and batch unification: A table in Delta Lake is a batch table as well as a streaming source and
sink. Streaming data ingest, batch historic backfill, interactive queries all just work out of the box.
• Schema enforcement: Automatically handles schema variations to prevent insertion of bad records during
ingestion.
• Time travel: Data versioning enables rollbacks, full historical audit trails, and reproducible machine
learning experiments.
• Upserts and deletes: Supports merge, update and delete operations to enable complex use cases like
change-data-capture, slowly-changing-dimension (SCD) operations, streaming upserts, and so on.
31.1. Create a table
To create a Delta table, write a DataFrame out in the delta format. You can use existing Spark SQL code and
change the format from parquet, csv, json, and so on, to delta.
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
You read data in your Delta table by specifying the path to the files: "/tmp/delta-table":
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()
Delta Lake supports several operations to modify tables using standard DataFrame APIs. This example runs a batch
job to overwrite the data in the table:
data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")
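from delta.tables import DeltaTable
from pyspark.sql.functions import col
# Upsert (merge) new data: update ids that already exist, insert the rest
newData = spark.range(0, 20)  # illustrative source data
deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")
deltaTable.alias("oldData") \
    .merge(
        newData.alias("newData"),
        "oldData.id = newData.id") \
    .whenMatchedUpdate(set = { "id": col("newData.id") }) \
    .whenNotMatchedInsert(values = { "id": col("newData.id") }) \
    .execute()
deltaTable.toDF().show()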
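The merge above updates rows whose id already exists and inserts the others. The plain-Python sketch below models those two clauses. It is not Delta Lake's implementation, and the function name is hypothetical. It replays the example with the table holding ids 5 to 9 after the overwrite and a hypothetical source holding ids 0 to 19. One difference from a real Delta merge: Delta raises an error when several source rows match the same target row, whereas this model simply applies them in order.

```python
def merge_upsert(target, source, key="id"):
    """Update target rows whose key appears in source (whenMatchedUpdate)
    and insert source rows with no match (whenNotMatchedInsert).
    Target rows without a match in source are kept unchanged."""
    by_key = {row[key]: dict(row) for row in target}
    for row in source:
        if row[key] in by_key:
            by_key[row[key]].update(row)
        else:
            by_key[row[key]] = dict(row)
    return sorted(by_key.values(), key=lambda r: r[key])

target = [{"id": i} for i in range(5, 10)]   # table contents after the overwrite
source = [{"id": i} for i in range(0, 20)]   # a hypothetical source with ids 0 to 19
result = merge_upsert(target, source)
print([r["id"] for r in result])
```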
You can also write to a Delta table using Structured Streaming. The Delta Lake transaction log guarantees exactly-
once processing, even when there are other streams or batch queries running concurrently against the table. By
default, streams run in append mode, which adds new records to the table:
streamingDf = spark.readStream.format("rate").load()
stream = streamingDf.selectExpr("value as id").writeStream.format("delta") \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .start("/tmp/delta-table")
You can stop the stream by running stream.stop() in the same terminal that started the stream.
Delta Lake supports most of the options provided by Apache Spark DataFrame read and write APIs for performing
batch reads and writes on tables.
Delta Lake supports creating two types of tables: tables defined in the metastore and tables defined by path.
To work with metastore-defined tables, you must enable integration with Apache Spark DataSourceV2 and Catalog
APIs by setting configurations when you create a new SparkSession. See Configure SparkSession.
• SQL DDL commands: You can use standard SQL DDL commands supported in Apache Spark (for
example, CREATE TABLE and REPLACE TABLE) to create Delta tables.
CREATE TABLE IF NOT EXISTS default.people10m (
id INT,
firstName STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
) USING DELTA
SQL also supports creating a table at a path, without creating an entry in the Hive metastore.
-- Create or replace table with path
CREATE OR REPLACE TABLE delta.`/tmp/delta/people10m` (
id INT,
firstName STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
) USING DELTA
• DataFrameWriter API: If you want to simultaneously create a table and insert data into it from Spark
DataFrames or Datasets, you can use the Spark DataFrameWriter (Scala or Java and Python).
# Create table in the metastore using DataFrame's schema and write data to it
df.write.format("delta").saveAsTable("default.people10m")
# Create or replace a table at a path using the DataFrame's schema and write/overwrite data to it
df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
You can also create Delta tables using the Spark DataFrameWriterV2 API.
• DeltaTableBuilder API: You can also use the DeltaTableBuilder API in Delta Lake to create
tables. Compared to the DataFrameWriter APIs, this API makes it easier to specify additional information
like column comments, table properties, and generated columns.
This feature is new and is in Preview.
from delta.tables import DeltaTable
# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
.tableName("default.people10m") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.execute()
You can partition data to speed up queries or DML that have predicates involving the partition columns. To
partition data when you create a Delta table, specify a partition by columns. The following example partitions by
gender.
-- Create table in the metastore
CREATE TABLE default.people10m (
id INT,
firstName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
USING DELTA
PARTITIONED BY (gender)
To determine whether a table contains a specific partition, query for rows with that partition value, for example
SELECT COUNT(*) > 0 FROM default.people10m WHERE gender = 'M'.
For tables defined in the metastore, you can optionally specify the LOCATION as a path. Tables created with a
specified LOCATION are considered unmanaged by the metastore. Unlike a managed table, where no path is
specified, an unmanaged table’s files are not deleted when you DROP the table.
When you run CREATE TABLE with a LOCATION that already contains data stored using Delta Lake, Delta
Lake does the following:
• If you specify only the table name and location, for example CREATE TABLE default.people10m USING DELTA
LOCATION '/tmp/delta/people10m', the table in the metastore automatically inherits the schema, partitioning, and
table properties of the existing data.
This functionality can be used to “import” data into the metastore.
• If you specify any configuration (schema, partitioning, or table properties), Delta Lake verifies that the
specification exactly matches the configuration of the existing data.
Important: If the specified configuration does not exactly match the configuration of the data, Delta Lake throws
an exception that describes the discrepancy.
Note: The metastore is not the source of truth about the latest information of a Delta table. In fact, the table
definition in the metastore may not contain all the metadata like schema and properties. It contains the location of
the table, and the table’s transaction log at the location is the source of truth. If you query the metastore from a
system that is not aware of this Delta-specific customization, you may see incomplete or stale table information.
Delta Lake supports generated columns, a special type of column whose values are automatically
generated based on a user-specified function over other columns in the Delta table. When you write to a table with
generated columns and you do not explicitly provide values for them, Delta Lake automatically computes the
values. For example, you can automatically generate a date column (for partitioning the table by date) from the
timestamp column; any writes into the table need only specify the data for the timestamp column. However, if you
explicitly provide values for them, the values must satisfy
the constraint (<value> <=> <generation expression>) IS TRUE or the write will fail with an
error.
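The rule above can be modeled in plain Python. This is not Delta Lake code. The model computes the generated value when it is missing. Otherwise it requires the supplied value to be null-safe equal to the generated one, which is what (<value> <=> <generation expression>) IS TRUE expresses. The function names are hypothetical, and cast_birthdate_as_date stands in for the CAST(birthDate AS DATE) expression used in the example that follows.

```python
from datetime import date, datetime

def cast_birthdate_as_date(row):
    """Stand-in for CAST(birthDate AS DATE); NULL stays NULL."""
    ts = row.get("birthDate")
    return ts.date() if ts is not None else None

def apply_generated_column(row, column, expression):
    """Fill in a generated column, or validate a value the writer supplied.

    Python's == treats None == None as True, which matches the null-safe
    <=> comparison in (<value> <=> <generation expression>) IS TRUE.
    """
    expected = expression(row)
    if column not in row:
        return {**row, column: expected}
    if row[column] != expected:
        raise ValueError(
            f"{column}={row[column]!r} violates generated value {expected!r}")
    return row

row = {"id": 1, "birthDate": datetime(1990, 5, 17, 8, 30)}
print(apply_generated_column(row, "dateOfBirth", cast_birthdate_as_date))
# An explicitly supplied value is accepted only if it matches
ok = dict(row, dateOfBirth=date(1990, 5, 17))
apply_generated_column(ok, "dateOfBirth", cast_birthdate_as_date)
```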
Important
Tables created with generated columns have a higher table writer protocol version than the default. See Table
protocol versioning to understand table protocol versioning and what it means for a table to have a higher protocol
version.
The following example shows how to create a table with generated columns:
from delta.tables import DeltaTable
from pyspark.sql.types import DateType
DeltaTable.create(spark) \
.tableName("default.people10m") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("dateOfBirth", DateType(), generatedAlwaysAs="CAST(birthDate AS DATE)") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.partitionedBy("gender") \
.execute()
Generated columns are stored as if they were normal columns. That is, they occupy storage.
• A generation expression can use any SQL functions in Spark that always return the same result when given
the same argument values, except the following types of functions:
o User-defined functions.
o Aggregate functions.
o Window functions.
o Functions returning multiple rows.
• For Delta Lake 1.1.0 and above, MERGE operations support generated columns when you
set spark.databricks.delta.schema.autoMerge.enabled to true.
Delta Lake may be able to generate partition filters for a query whenever a partition column is defined by certain
expressions over a base column, such as CAST(eventTime AS DATE), YEAR(eventTime), MONTH(eventTime), or
DAY(eventTime).
If a partition column is defined by one of the preceding expressions, and a query filters data using the underlying
base column of a generation expression, Delta Lake looks at the relationship between the base column and the
generated column, and populates partition filters based on the generated partition column if possible. For example,
given the following table:
from delta.tables import DeltaTable
DeltaTable.create(spark) \
    .tableName("default.events") \
.addColumn("eventId", "BIGINT") \
.addColumn("data", "STRING") \
.addColumn("eventType", "STRING") \
.addColumn("eventTime", "TIMESTAMP") \
.addColumn("eventDate", "DATE", generatedAlwaysAs="CAST(eventTime AS DATE)") \
.partitionedBy("eventType", "eventDate") \
.execute()
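If you then query this table with a filter on eventTime only (for example, WHERE eventTime >= '2020-10-01 00:00:00'
AND eventTime <= '2020-10-01 12:00:00'), Delta Lake automatically generates a partition filter so that the query
only reads the data in partition eventDate=2020-10-01, even if a partition filter is not specified.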
As another example, given the following table:
from delta.tables import DeltaTable
DeltaTable.create(spark) \
    .tableName("default.events") \
.addColumn("eventId", "BIGINT") \
.addColumn("data", "STRING") \
.addColumn("eventType", "STRING") \
.addColumn("eventTime", "TIMESTAMP") \
.addColumn("year", "INT", generatedAlwaysAs="YEAR(eventTime)") \
.addColumn("month", "INT", generatedAlwaysAs="MONTH(eventTime)") \
.addColumn("day", "INT", generatedAlwaysAs="DAY(eventTime)") \
.partitionedBy("eventType", "year", "month", "day") \
.execute()
If you then run the following query:
spark.sql('SELECT * FROM default.events WHERE eventTime >= "2020-10-01 00:00:00" AND eventTime <= "2020-10-01 12:00:00"')
Delta Lake automatically generates a partition filter so that the preceding query only reads the data in
partition year=2020/month=10/day=01 even if a partition filter is not specified.
You can use an EXPLAIN clause and check the provided plan to see whether Delta Lake automatically generates
any partition filters.
By default, special characters such as spaces and any of the characters ,;{}()\n\t= are not supported in table
column names. To include these special characters in a table’s column name, enable column mapping.
You can load a Delta table as a DataFrame by specifying a table name or a path:
SELECT * FROM default.people10m -- query table in the metastore
SELECT * FROM delta.`/tmp/delta/people10m` -- query table by path
Delta Lake time travel allows you to query an older snapshot of a Delta table. Time travel has many use cases,
including:
• Re-creating analyses, reports, or outputs (for example, the output of a machine learning model). This could
be useful for debugging or auditing, especially in regulated industries.
• Writing complex temporal queries.
• Fixing mistakes in your data.
• Providing snapshot isolation for a set of queries for fast changing tables.
31.7.1. Syntax
DataFrameReader options allow you to create a DataFrame from a Delta table that is fixed to a specific version of
the table.
df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/tmp/delta/people10m")
df2 = spark.read.format("delta").option("versionAsOf", version).load("/tmp/delta/people10m")
For timestamp_string, only date or timestamp strings are accepted, for example "2019-01-01" and
"2019-01-01T00:00:00.000Z".
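The plain-Python sketch below makes the timestampAsOf semantics concrete. It is not Delta Lake's implementation. It resolves a timestamp string, in either of the two accepted formats, to the latest version committed at or before that time. The commit history is hypothetical. The sketch ignores time zones and other edge cases Delta Lake handles, such as timestamps after the latest commit.

```python
import bisect
from datetime import datetime

def parse_timestamp(s):
    """Accept a date ("2019-01-01") or a timestamp ("2019-01-01T00:00:00.000Z")."""
    for fmt in ("%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%d"):
        try:
            return datetime.strptime(s, fmt)
        except ValueError:
            pass
    raise ValueError(f"unsupported timestamp string: {s!r}")

def version_as_of(history, timestamp_string):
    """history: list of (version, commit_time) sorted by version.
    Return the latest version committed at or before the timestamp."""
    ts = parse_timestamp(timestamp_string)
    times = [t for _, t in history]
    idx = bisect.bisect_right(times, ts) - 1
    if idx < 0:
        raise ValueError("timestamp is before the first commit")
    return history[idx][0]

# Hypothetical commit history
history = [
    (0, datetime(2018, 12, 30, 9, 0)),
    (1, datetime(2018, 12, 31, 23, 59)),
    (2, datetime(2019, 1, 1, 10, 0)),
]
print(version_as_of(history, "2019-01-01"))                # date means midnight
print(version_as_of(history, "2019-01-01T12:00:00.000Z"))
```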
A common pattern is to use the latest state of the Delta table throughout the execution of a job to update
downstream applications.
Because Delta tables auto update, a DataFrame loaded from a Delta table may return different results across
invocations if the underlying data is updated. By using time travel, you can fix the data returned by the DataFrame
across invocations:
history = spark.sql("DESCRIBE HISTORY delta.`/tmp/delta/people10m`")
latest_version = history.selectExpr("max(version)").collect()
df = spark.read.format("delta").option("versionAsOf", latest_version[0][0]).load("/tmp/delta/people10m")
31.7.2. Examples
• Query the number of new customers added over the last week.
last_week = spark.sql("SELECT CAST(date_sub(current_date(), 7) AS STRING)").collect()[0][0]
df = spark.read.format("delta").option("timestampAsOf", last_week).load("/tmp/delta/events")
last_week_count = df.select("userId").distinct().count()
count = spark.read.format("delta").load("/tmp/delta/events").select("userId").distinct().count()
new_customers_count = count - last_week_count
To time travel to a previous version, you must retain both the log and the data files for that version.
The data files backing a Delta table are never deleted automatically; data files are deleted only when you
run VACUUM. VACUUM does not delete Delta log files; log files are automatically cleaned up after checkpoints
are written.
By default, you can time travel to a version of a Delta table up to 30 days old, unless you have run VACUUM on the
table or changed its data or log file retention periods.
Note
Due to log entry cleanup, instances can arise where you cannot time travel to a version that is less than the
retention interval. Delta Lake requires all consecutive log entries since the previous checkpoint to time travel to a
particular version. For example, with a table initially consisting of log entries for versions [0, 19] and a checkpoint
at version 10, if the log entry for version 0 is cleaned up, then you cannot time travel to versions [1, 9]. Increasing
the table property delta.logRetentionDuration can help avoid these situations.
31.8.1. Append
To atomically add new data to an existing Delta table, use append mode:
INSERT INTO default.people10m SELECT * FROM morePeople
31.8.2. Overwrite
Using DataFrames, you can also selectively overwrite only the data that matches an arbitrary expression. This
feature is available in Delta Lake 1.1.0 and above. The following command atomically replaces events in January in
the target table, which is partitioned by start_date, with the data in df:
df.write \
.format("delta") \
.mode("overwrite") \
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'") \
.save("/tmp/delta/events")
This sample code writes out the data in df, validates that it all matches the predicate, and performs an atomic
replacement. If you want to write out data that doesn’t all match the predicate, to replace the matching rows in
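the target table, you can disable the constraint check by setting
spark.databricks.delta.replaceWhere.constraintCheck.enabled to false:
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)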
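The plain-Python model below illustrates the replaceWhere behavior described above. It is not Delta Lake code, and the function and sample rows are hypothetical. Target rows matching the predicate are removed and the new rows are appended in one step. With the constraint check enabled, every new row must also match the predicate. ISO date strings are compared as text, which orders them correctly.

```python
def replace_where(target, new_rows, predicate, constraint_check=True):
    """Delete target rows matching the predicate, then append new_rows.

    With constraint_check=True (the default behavior described above),
    every new row must also satisfy the predicate.
    """
    if constraint_check:
        bad = [r for r in new_rows if not predicate(r)]
        if bad:
            raise ValueError(
                f"{len(bad)} row(s) do not match the replaceWhere predicate")
    kept = [r for r in target if not predicate(r)]
    return kept + list(new_rows)

def january_2017(r):
    # start_date >= '2017-01-01' AND end_date <= '2017-01-31'
    return r["start_date"] >= "2017-01-01" and r["end_date"] <= "2017-01-31"

target = [
    {"event": "a", "start_date": "2016-12-20", "end_date": "2016-12-22"},
    {"event": "b", "start_date": "2017-01-05", "end_date": "2017-01-06"},
]
new_rows = [{"event": "b2", "start_date": "2017-01-05", "end_date": "2017-01-07"}]
print([r["event"] for r in replace_where(target, new_rows, january_2017)])
```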
In Delta Lake 1.0.0 and below, replaceWhere overwrites data matching a predicate over partition columns
only. The following command atomically replaces the month of January in the target table, which is partitioned
by birthDate, with the data in df:
df.write \
.format("delta") \
.mode("overwrite") \
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'") \
.save("/tmp/delta/people10m")
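In Delta Lake 1.1.0 and above, if you want to fall back to the old behavior, you can disable the
spark.databricks.delta.replaceWhere.dataColumns.enabled flag:
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)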
Dynamic Partition Overwrites
Delta Lake 2.0 and above supports dynamic partition overwrite mode for partitioned tables.
In dynamic partition overwrite mode, Delta Lake overwrites all existing data in each logical partition for which the
write commits new data. Any existing logical partitions for which the write does not contain data remain
unchanged. This mode is only applicable when data is being written in overwrite mode:
either INSERT OVERWRITE in SQL, or a DataFrame write with df.write.mode("overwrite").
Configure dynamic partition overwrite mode by setting the Spark session configuration
spark.sql.sources.partitionOverwriteMode to dynamic.
Dynamic partition overwrite conflicts with the option replaceWhere for partitioned tables.
If dynamic partition overwrite is enabled in the Spark session configuration, and replaceWhere is provided as
a DataFrameWriter option, data will be overwritten according to the replaceWhere expression (query-
specific options override session configurations).
If both dynamic partition overwrite and replaceWhere are enabled in the DataFrameWriter options, an
error will be thrown.
Important: Validate that the data being written with dynamic partition overwrite touches only the expected
partitions. A single row in the incorrect partition can lead to unintentionally overwriting an entire partition. We
strongly recommend using replaceWhere to explicitly specify which data to overwrite.
If a partition has been accidentally overwritten, you can use Restore a Delta table to an earlier state to
undo the change.
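The plain-Python model below shows both the rule and the risk the note warns about. It is not Delta Lake code, and the names and data are hypothetical. Every partition that receives at least one new row is replaced wholesale, while the other partitions are left untouched. As a result, a single stray row in the wrong partition wipes out that partition's existing data.

```python
def dynamic_overwrite(table, new_rows, partition_col):
    """table maps partition value -> list of rows.

    Partitions present in new_rows are replaced entirely; partitions
    absent from new_rows keep their existing data.
    """
    incoming = {}
    for row in new_rows:
        incoming.setdefault(row[partition_col], []).append(row)
    result = dict(table)      # shallow copy; untouched partitions are shared
    result.update(incoming)   # replace, not merge, each incoming partition
    return result

table = {
    "2017-01": [{"id": 1, "month": "2017-01"}, {"id": 2, "month": "2017-01"}],
    "2017-02": [{"id": 3, "month": "2017-02"}],
}
# Intended to rewrite January only, but one row carries the wrong month
new_rows = [{"id": 10, "month": "2017-01"}, {"id": 11, "month": "2017-02"}]
after = dynamic_overwrite(table, new_rows, "month")
print({k: [r["id"] for r in v] for k, v in after.items()})
```

Row 3 disappears from the 2017-02 partition, which is the kind of accident an explicit replaceWhere predicate would reject.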
You can use the SQL session configuration spark.sql.files.maxRecordsPerFile to specify the
maximum number of records to write to a single file for a Delta Lake table. Specifying a value of zero or a negative
value represents no limit.
You can also use the DataFrameWriter option maxRecordsPerFile when using the DataFrame APIs to write
to a Delta Lake table. When maxRecordsPerFile is specified, the value of the SQL session
configuration spark.sql.files.maxRecordsPerFile is ignored.
df.write.format("delta") \
.mode("append") \
.option("maxRecordsPerFile", "10000") \
.save("/tmp/delta/people10m")
Sometimes a job that writes data to a Delta table is restarted due to various reasons (for example, job encounters a
failure). The failed job may or may not have written the data to Delta table before terminating. In the case where
the data is written to the Delta table, the restarted job writes the same data to the Delta table which results in
duplicate data.
To address this, Delta tables support the following DataFrameWriter options to make the writes idempotent:
• txnAppId: A unique string that you can pass on each DataFrame write. For example, this can be the
name of the job.
• txnVersion: A monotonically increasing number that acts as transaction version. This number needs to
be unique for data that is being written to the Delta table(s). For example, this can be the epoch seconds of
the instant when the query is attempted for the first time. Any subsequent restarts of the same job need
to have the same value for txnVersion.
The combination of these options must be unique for each new batch of data ingested into the Delta table,
and txnVersion must be higher than that of the last data ingested into the Delta table.
Warning
This solution assumes that the data being written to Delta table(s) in multiple retries of the job is the same. If a write
attempt in a Delta table succeeds but due to some downstream failure there is a second write attempt with same
txn options but different data, then that second write attempt will be ignored. This can cause unexpected results.
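In PySpark, both options are passed on each write, for example df.write.format("delta").option("txnAppId", app_id).option("txnVersion", version).mode("append").save(path), where app_id and version are values you choose as described above. The plain-Python model below uses a hypothetical class and is not Delta Lake's implementation. It shows the bookkeeping that makes retries safe. It also shows why a retry with the same options but different data is silently skipped.

```python
class IdempotentTable:
    """Remember the highest txnVersion committed per txnAppId and skip
    any write whose txnVersion is not higher than that."""

    def __init__(self):
        self.rows = []
        self.last_version = {}  # txnAppId -> highest committed txnVersion

    def write(self, rows, txn_app_id, txn_version):
        last = self.last_version.get(txn_app_id)
        if last is not None and txn_version <= last:
            return False  # already ingested: skipped without error
        self.rows.extend(rows)
        self.last_version[txn_app_id] = txn_version
        return True

table = IdempotentTable()
print(table.write([1, 2], "daily-load", 1700000000))   # first attempt commits
print(table.write([1, 2], "daily-load", 1700000000))   # restarted job: skipped
print(table.write([3], "daily-load", 1700086400))      # next run commits
print(table.rows)
```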
You can specify user-defined strings as metadata in commits made by these operations, either using the
DataFrameWriter option userMetadata or the SparkSession
configuration spark.databricks.delta.commitInfo.userMetadata. If both of them have been
specified, then the option takes precedence. This user-defined metadata is readable in the history operation.
SET spark.databricks.delta.commitInfo.userMetadata=overwritten-for-fixing-incorrect-data
INSERT OVERWRITE default.people10m SELECT * FROM morePeople
Delta Lake automatically validates that the schema of the DataFrame being written is compatible with the schema
of the table. Delta Lake uses the following rules to determine whether a write from a DataFrame to a table is
compatible:
• All DataFrame columns must exist in the target table. If there are columns in the DataFrame not present in
the table, an exception is raised. Columns present in the table but not in the DataFrame are set to null.
• DataFrame column data types must match the column data types in the target table. If they don’t match,
an exception is raised.
• DataFrame column names cannot differ only by case. This means that you cannot have columns such as
“Foo” and “foo” defined in the same table. While you can use Spark in case sensitive or insensitive (default)
mode, Parquet is case sensitive when storing and returning column information. Delta Lake is case-
preserving but insensitive when storing the schema and has this restriction to avoid potential mistakes,
data corruption, or loss issues.
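The three rules can be sketched as a plain-Python check. The function is hypothetical, and schemas are reduced to name-to-type dictionaries. Delta Lake's real check also covers nested types and other details. The sketch matches names case-insensitively, as Delta Lake does, and returns the table columns that will be set to null.

```python
def check_write_schema(df_schema, table_schema):
    """df_schema, table_schema: dict of column name -> type name.

    Raises ValueError for an incompatible write; otherwise returns the
    table columns missing from the DataFrame (they will be null).
    """
    lowered = [c.lower() for c in df_schema]
    if len(set(lowered)) != len(lowered):
        raise ValueError("DataFrame has column names that differ only by case")
    table_by_lower = {c.lower(): (c, t) for c, t in table_schema.items()}
    for col, typ in df_schema.items():
        match = table_by_lower.get(col.lower())
        if match is None:
            raise ValueError(f"column {col!r} does not exist in the target table")
        if match[1] != typ:
            raise ValueError(f"type mismatch for {col!r}: {typ} vs {match[1]}")
    present = set(lowered)
    return [c for c in table_schema if c.lower() not in present]

table_schema = {"id": "int", "firstName": "string", "salary": "int"}
print(check_write_schema({"id": "int", "firstName": "string"}, table_schema))
```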
Delta Lake supports DDL to add new columns explicitly, as well as the ability to update the schema automatically.
If you specify other options, such as partitionBy, in combination with append mode, Delta Lake validates that
they match and throws an error for any mismatch. When partitionBy is not present, appends automatically
follow the partitioning of the existing data.
Delta Lake lets you update the schema of a table. Supported changes include adding new columns (at arbitrary
positions), reordering existing columns, and renaming existing columns.
Important
When you update a Delta table schema, streams that read from that table terminate. If you want the stream to
continue you must restart it.
31.10.1. Explicitly update schema
You can use the following DDL to explicitly change the schema of a table.
Add columns
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER
colA_name], ...)
By default, nullability is true.
Adding nested columns is supported only for structs. Arrays and maps are not supported.
Change column comment or ordering
ALTER TABLE table_name ALTER [COLUMN] col_name col_name data_type [COMMENT col_comment]
[FIRST|AFTER colA_name]
To change a column in a nested field, use:
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name nested_col_name data_type
[COMMENT col_comment] [FIRST|AFTER colA_name]
Replace columns
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
Rename columns
Note: This feature is available in Delta Lake 1.2.0 and above. This feature is currently experimental.
To rename columns without rewriting any of the columns’ existing data, you must enable column mapping for the
table. See enable column mapping.
To rename a column:
ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name
Drop columns
Note: This feature is available in Delta Lake 2.0 and above. This feature is currently experimental.
To drop columns as a metadata-only operation without rewriting any data files, you must enable column
mapping for the table. See enable column mapping.
Important: Dropping a column from metadata does not delete the underlying data for the column in files.
To drop a column:
ALTER TABLE table_name DROP COLUMN col_name
You can change a column’s type or name or drop a column by rewriting the table. To do this, use
the overwriteSchema option:
spark.read.table(...) \
.withColumnRenamed("dateOfBirth", "birthDate") \
.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.saveAsTable(...)
Delta Lake can automatically update the schema of a table as part of a DML transaction (either appending or
overwriting), and make the schema compatible with the data being written.
Add columns
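Columns that are present in the DataFrame but missing from the table are automatically added as part of a write
transaction when the write uses .option("mergeSchema", "true") or when
spark.databricks.delta.schema.autoMerge.enabled is set to true.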
NullType columns
Because Parquet doesn’t support NullType, NullType columns are dropped from the DataFrame when
writing into Delta tables, but are still stored in the schema. When a different data type is received for that column,
Delta Lake merges the schema to the new data type. If Delta Lake receives a NullType for an existing column,
the old schema is retained and the new column is dropped during the write.
NullType in streaming is not supported. Since you must set schemas when using streaming this should be very
rare. NullType is also not accepted for complex types such as ArrayType and MapType.
By default, overwriting the data in a table does not overwrite the schema. When overwriting a table
using mode("overwrite") without replaceWhere, you may still want to overwrite the schema of the data
being written. You replace the schema and partitioning of the table by setting the overwriteSchema option
to true:
df.write.option("overwriteSchema", "true")
Delta Lake supports the creation of views on top of Delta tables just like you might with a data source table.
The core challenge when you operate with views is resolving the schemas. If you alter a Delta table schema, you
must recreate derivative views to account for any additions to the schema. For instance, if you add a new column
to a Delta table, you must make sure that this column is available in the appropriate views built on top of that base
table.
You can store your own metadata as a table property using TBLPROPERTIES in CREATE and ALTER. You can
then SHOW that metadata. For example:
ALTER TABLE default.people10m SET TBLPROPERTIES ('department' = 'accounting',
'delta.appendOnly' = 'true');
In addition, to tailor behavior and performance, Delta Lake supports certain Delta table properties, such as
delta.appendOnly.
Note
Modifying a Delta table property is a write operation that will conflict with other concurrent write
operations, causing them to fail. We recommend that you modify a table property only when there are no
concurrent write operations on the table.
You can also set delta.-prefixed properties during the first commit to a Delta table using Spark configurations.
For example, to initialize a Delta table with the property delta.appendOnly=true, set the Spark
configuration spark.databricks.delta.properties.defaults.appendOnly to true:
spark.sql("SET spark.databricks.delta.properties.defaults.appendOnly = true")
• DESCRIBE DETAIL: Provides information about schema, partitioning, table size, and so on. For details, see
Retrieve Delta table details.
• DESCRIBE HISTORY: Provides provenance information, including the operation, user, and so on, and operation
metrics for each write to a table. Table history is retained for 30 days. For details, see Retrieve Delta table history.
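To enable Delta Lake's integration with the Apache Spark DataSourceV2 and Catalog APIs, configure the
SparkSession as follows:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("...") \
    .master("...") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()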
Alternatively, you can add configurations when submitting your Spark application using spark-submit or when
starting spark-shell or pyspark by specifying them as command-line parameters.
Delta Lake uses Hadoop FileSystem APIs to access the storage systems. The credentials for storage systems usually
can be set through Hadoop configurations. Delta Lake provides multiple ways to set Hadoop configurations similar
to Apache Spark.
When you start a Spark application on a cluster, you can set Spark configurations in the form
of spark.hadoop.* to pass your custom Hadoop configurations. For example, setting a value
for spark.hadoop.a.b.c will pass the value as a Hadoop configuration a.b.c, and Delta Lake will use it to
access Hadoop FileSystem APIs.
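The prefix rule is simple string handling. In PySpark you could pass such a setting with, for example, SparkSession.builder.config("spark.hadoop.a.b.c", "x.y.z"). The plain-Python sketch below uses a hypothetical function name. It only shows which Hadoop keys and values result from a given set of Spark configurations.

```python
HADOOP_PREFIX = "spark.hadoop."

def hadoop_conf_from_spark_conf(spark_conf):
    """Keep only spark.hadoop.* entries and strip the prefix, giving the
    Hadoop configuration keys that are passed through."""
    return {
        key[len(HADOOP_PREFIX):]: value
        for key, value in spark_conf.items()
        if key.startswith(HADOOP_PREFIX)
    }

spark_conf = {
    "spark.hadoop.a.b.c": "x.y.z",
    "spark.sql.shuffle.partitions": "200",  # not a Hadoop setting, ignored
}
print(hadoop_conf_from_spark_conf(spark_conf))
```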
Spark SQL will pass all of the current SQL session configurations to Delta Lake, and Delta Lake will use them to
access Hadoop FileSystem APIs. For example, SET a.b.c=x.y.z will tell Delta Lake to pass the
value x.y.z as a Hadoop configuration a.b.c, and Delta Lake will use it to access Hadoop FileSystem APIs.
Besides setting Hadoop file system configurations through the Spark (cluster) configurations or SQL session
configurations, Delta supports reading Hadoop file system configurations
from DataFrameReader and DataFrameWriter options (that is, option keys that start with
the fs. prefix) when the table is read or written, by
using [Link](path) or [Link](path).
For example, you can pass your storage credentials through DataFrame options:
df1 = [Link]("delta") \
    .option("[Link].<storage-account-name>.[Link]", "<storage-account-access-key-1>") \
    .load("...")
df2 = [Link]("delta") \
    .option("[Link].<storage-account-name>.[Link]", "<storage-account-access-key-2>") \
    .load("...")
[Link](df2).[Link]("delta") \
    .mode("overwrite") \
    .option("[Link].<storage-account-name>.[Link]", "<storage-account-access-key-3>") \
    .save("...")
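To make the layering concrete, here is a minimal pure-Python sketch (not Delta Lake or Spark code) of how the fs.-prefixed options of a single read or write could combine with session-level Hadoop settings. The function effective_hadoop_conf and the key fs.example.key are hypothetical, and the rule that an option overrides a session value for the same key is an assumption of this sketch.

```python
from typing import Dict


def effective_hadoop_conf(session_conf: Dict[str, str],
                          options: Dict[str, str]) -> Dict[str, str]:
    """Model the Hadoop settings seen by one read or write.

    Assumption: per-operation fs.* options override session values for the
    same key. Options without the fs. prefix are not Hadoop settings.
    """
    conf = dict(session_conf)  # copy so the session settings stay unchanged
    for key, value in options.items():
        if key.startswith("fs."):  # only fs.-prefixed option keys are picked up
            conf[key] = value
    return conf


session = {"fs.example.key": "session-value"}  # hypothetical key
df1_opts = {"fs.example.key": "key-1", "versionAsOf": "0"}
print(effective_hadoop_conf(session, df1_opts))  # {'fs.example.key': 'key-1'}
```

Because each call builds its own copy, df1 and df2 in the example above can carry different credentials without affecting each other or the session.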
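Delta Lake is integrated with Spark Structured Streaming through readStream and writeStream, and it addresses
limitations typically associated with streaming systems and files, including:
• Maintaining “exactly-once” processing with more than one stream (or concurrent batch jobs)
• Efficiently discovering which files are new when using files as the source for a stream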
For many Delta Lake operations on tables, you enable integration with Apache Spark DataSourceV2 and Catalog
APIs (since 3.0) by setting configurations when you create a new SparkSession. See Configure
SparkSession.
When you load a Delta table as a stream source and use it in a streaming query, the query processes all of the data
present in the table as well as any new data that arrives after the stream is started.
[Link]("delta") \
    .load("/tmp/delta/events")
In Scala:
import [Link]._
[Link]("/tmp/delta/events")
You can control the amount of data in each micro-batch with the following options:
• maxFilesPerTrigger: The maximum number of new files to consider in every micro-batch. The default is
1000.
• maxBytesPerTrigger: How much data gets processed in each micro-batch. This option sets a “soft
max”, meaning that a batch processes approximately this amount of data and may process more than the
limit in order to make the streaming query move forward in cases when the smallest input unit is larger
than this limit. If you use [Link] for your streaming, this option is ignored. This is not set by
default.
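A rough pure-Python model of how the two limits could shape micro-batches, under stated assumptions: files are admitted in order, maxFilesPerTrigger is a hard cap, maxBytesPerTrigger is checked before each file is added (so a batch can overshoot it, and a single file larger than the limit still forms its own batch), and when both are set, whichever limit is reached first closes the batch. The function plan_micro_batches is hypothetical and is not how Delta Lake is implemented.

```python
from typing import List, Optional


def plan_micro_batches(file_sizes: List[int],
                       max_files: int = 1000,
                       max_bytes: Optional[int] = None) -> List[List[int]]:
    """Split pending file sizes (in bytes) into micro-batches, in order."""
    batches: List[List[int]] = []
    current: List[int] = []
    current_bytes = 0
    for size in file_sizes:
        files_full = len(current) >= max_files  # hard cap on file count
        bytes_full = max_bytes is not None and current_bytes >= max_bytes  # soft cap
        if current and (files_full or bytes_full):
            batches.append(current)
            current, current_bytes = [], 0
        # Every batch holds at least one file, so the stream always moves forward.
        current.append(size)
        current_bytes += size
    if current:
        batches.append(current)
    return batches


print(plan_micro_batches([40, 40, 40, 40], max_bytes=100))  # [[40, 40, 40], [40]]
```

The first batch totals 120 bytes, above the 100-byte setting, which is what “soft max” means in this model.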
Note
In cases when the source table transactions are cleaned up due to
the logRetentionDuration configuration and the stream lags in processing, Delta Lake processes the
data corresponding to the latest available transaction history of the source table but does not fail the stream. This
can result in data being dropped.
Structured Streaming does not handle input that is not an append and throws an exception if any modifications
occur on the table being used as a source. There are two main strategies for dealing with changes that cannot be
automatically propagated downstream:
• You can delete the output and checkpoint and restart the stream from the beginning.
• You can set either of these two options:
o ignoreDeletes: ignore transactions that delete data at partition boundaries.
o ignoreChanges: re-process updates if files had to be rewritten in the source table due to a
data-changing operation such as UPDATE, MERGE INTO, DELETE (within partitions), or OVERWRITE.
Unchanged rows may still be emitted, so your downstream consumers should be able to handle
duplicates. Deletes are not propagated downstream. ignoreChanges subsumes ignoreDeletes.
Therefore, if you use ignoreChanges, your stream will not be disrupted by either deletions or updates
to the source table.
Example
For example, suppose you have a table user_events with date, user_email, and action columns that is
partitioned by date. You stream out of the user_events table and you need to delete data from it due to
GDPR.
When you delete at partition boundaries (that is, the WHERE is on a partition column), the files are already
segmented by value so the delete just drops those files from the metadata. Thus, if you just want to delete data
from some partitions, you can use:
[Link]("delta") \
    .option("ignoreDeletes", "true") \
    .load("/tmp/delta/user_events")
However, if you have to delete data based on user_email, then you will need to use:
[Link]("delta") \
    .option("ignoreChanges", "true") \
    .load("/tmp/delta/user_events")
If you update a user_email with the UPDATE statement, the file containing the user_email in question is
rewritten. When you use ignoreChanges, the new record is propagated downstream with all other unchanged
records that were in the same file. Your logic should be able to handle these incoming duplicate records.
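The duplicate behavior can be modeled in a few lines of plain Python (hypothetical data and helper names, not Delta Lake code): an UPDATE rewrites the whole file, so the rewritten file carries the updated row together with an unchanged one. Deduplicating on the full row, as the consumer does here, is just one possible strategy.

```python
from typing import Callable, Dict, List

Row = Dict[str, str]


def rewrite_file(rows: List[Row],
                 predicate: Callable[[Row], bool],
                 update: Callable[[Row], Row]) -> List[Row]:
    """Copy-on-write: the whole file is rewritten, matching rows updated."""
    return [update(r) if predicate(r) else r for r in rows]


def row_key(row: Row) -> tuple:
    """Hashable identity for a row, used for downstream deduplication."""
    return tuple(sorted(row.items()))


original_file = [
    {"date": "2024-01-01", "user_email": "a@example.com", "action": "click"},
    {"date": "2024-01-01", "user_email": "b@example.com", "action": "view"},
]

# Models: UPDATE user_events SET user_email = 'a2@example.com'
#         WHERE user_email = 'a@example.com'
emitted = rewrite_file(
    original_file,
    predicate=lambda r: r["user_email"] == "a@example.com",
    update=lambda r: {**r, "user_email": "a2@example.com"},
)

# With ignoreChanges the whole rewritten file is emitted; a consumer that
# already processed the original file drops rows it has seen before.
seen = {row_key(r) for r in original_file}
new_rows = [r for r in emitted if row_key(r) not in seen]
print(len(emitted), len(new_rows))  # 2 1
```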
You can use the following options to specify the starting point of the Delta Lake streaming source without
processing the entire table.
• startingVersion: The Delta Lake version to start from. All table changes starting from this version
(inclusive) will be read by the streaming source. You can obtain the commit versions from
the version column of the DESCRIBE HISTORY command output.
o To return only the latest changes, specify latest.
• startingTimestamp: The timestamp to start from. All table changes committed at or after the
timestamp (inclusive) will be read by the streaming source. One of:
o A timestamp string. For example, "2019-01-01T[Link].000Z".
o A date string. For example, "2019-01-01".
You cannot set both options at the same time; you can use only one of them. They take effect only when starting a
new streaming query. If a streaming query has started and the progress has been recorded in its checkpoint, these
options are ignored.
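A simplified pure-Python model of the two starting-point options, assuming the table history is a list of (version, commit timestamp) records and that a date string means midnight UTC (the timezone handling is an assumption). The helper names and sample history are hypothetical; the model only illustrates the inclusive bounds and the rule that the two options are mutually exclusive, and it ignores the latest setting and checkpoint recovery.

```python
from datetime import datetime, timezone
from typing import Dict, List, Optional


def parse_starting_timestamp(value: str) -> datetime:
    """Accept a timestamp string such as 2019-01-02T12:30:00.000Z or a date string."""
    if "T" in value:
        return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%f%z")
    return datetime.strptime(value, "%Y-%m-%d").replace(tzinfo=timezone.utc)


def select_commits(commits: List[Dict],
                   starting_version: Optional[str] = None,
                   starting_timestamp: Optional[str] = None) -> List[Dict]:
    """Return the commits a new stream would read (both bounds inclusive)."""
    if starting_version is not None and starting_timestamp is not None:
        raise ValueError("set only one of startingVersion and startingTimestamp")
    if starting_version is not None:
        first = int(starting_version)
        return [c for c in commits if c["version"] >= first]
    if starting_timestamp is not None:
        since = parse_starting_timestamp(starting_timestamp)
        return [c for c in commits if c["timestamp"] >= since]
    return list(commits)  # no option: the stream starts from the whole table


# Hypothetical history: version v was committed on 2019-01-(v + 1) at midnight UTC.
history = [{"version": v, "timestamp": datetime(2019, 1, v + 1, tzinfo=timezone.utc)}
           for v in range(4)]
print([c["version"] for c in select_commits(history, starting_version="2")])  # [2, 3]
```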
Important
Although you can start the streaming source from a specified version or timestamp, the schema of the streaming
source is always the latest schema of the Delta table. You must ensure there is no incompatible schema change to
the Delta table after the specified version or timestamp. Otherwise, the streaming source may return incorrect
results when reading the data with an incorrect schema.
Example
For example, suppose you have a table user_events. If you want to read changes since version 5, use:
[Link]("delta") \
    .option("startingVersion", "5") \
    .load("/tmp/delta/user_events")
You can also write data into a Delta table using Structured Streaming. The transaction log enables Delta Lake to
guarantee exactly-once processing, even when there are other streams or batch queries running concurrently
against the table.
Note
The Delta Lake VACUUM function removes all files not managed by Delta Lake but skips any directories that begin
with _. You can safely store checkpoints alongside other data and metadata for a Delta table using a directory
structure such as <table_name>/_checkpoints.
By default, streams run in append mode, which adds new records to the table.
You can also use Structured Streaming to replace the entire table with every batch. One example use case is to
compute a summary using aggregation:
([Link]
.format("delta")
.load("/tmp/delta/events")
.groupBy("customerId")
.count()
.writeStream
.format("delta")
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.start("/tmp/delta/eventsByCustomer")
)
The preceding example continuously updates a table that contains the aggregate number of events by customer.
For applications with more lenient latency requirements, you can save computing resources with one-time triggers.
Use these to update summary aggregation tables on a given schedule, processing only new data that has arrived
since the last update.
Note
The command foreachBatch allows you to specify a function that is executed on the output of every
micro-batch after arbitrary transformations in the streaming query. This allows implementing
a foreachBatch function that can write the micro-batch output to one or more target Delta table
destinations. However, foreachBatch does not make those writes idempotent, as those write attempts lack
information about whether the batch is being re-executed or not. For example, rerunning a failed batch could
result in duplicate data writes.
To address this, Delta tables support the following DataFrameWriter options to make the writes
idempotent:
• txnAppId: A unique string that you can pass on each DataFrame write. For example,
you can use the StreamingQuery ID as txnAppId.
• txnVersion: A monotonically increasing number that acts as transaction version.
Delta table uses the combination of txnAppId and txnVersion to identify duplicate writes and ignore them.
If a batch write is interrupted with a failure, rerunning the batch uses the same application and batch ID, which
would help the runtime correctly identify duplicate writes and ignore them. Application ID (txnAppId) can be
any user-generated unique string and does not have to be related to the stream ID.
Warning
If you delete the streaming checkpoint and restart the query with a new checkpoint, you must provide a
different txnAppId; otherwise, writes from the restarted query will be ignored because they will contain the
same txnAppId and the batch ID will start from 0.
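The deduplication rule can be modeled in plain Python. This is a sketch under the assumption that the table remembers, for each txnAppId, the highest txnVersion it has committed and skips any write whose txnVersion is not greater; the class IdempotentTable is hypothetical and is not the Delta Lake implementation. The last two writes show the situation described in the warning.

```python
from typing import Dict, List


class IdempotentTable:
    """Toy table that ignores repeated (txnAppId, txnVersion) writes."""

    def __init__(self) -> None:
        self.rows: List[dict] = []
        self.last_version: Dict[str, int] = {}  # txnAppId -> highest txnVersion

    def write(self, rows: List[dict], txn_app_id: str, txn_version: int) -> bool:
        """Append rows unless this txnVersion was already committed; True if written."""
        if txn_version <= self.last_version.get(txn_app_id, -1):
            return False  # re-executed batch: treat as duplicate and skip it
        self.rows.extend(rows)
        self.last_version[txn_app_id] = txn_version
        return True


table = IdempotentTable()
table.write([{"id": 1}], "query-a", 0)  # batch 0 commits
table.write([{"id": 1}], "query-a", 0)  # batch 0 retried after a failure: ignored
table.write([{"id": 2}], "query-a", 1)  # batch 1 commits
# New checkpoint, same txnAppId: batch IDs restart at 0, so the write is dropped.
table.write([{"id": 3}], "query-a", 0)
# New checkpoint with a different txnAppId: the write goes through.
table.write([{"id": 3}], "query-b", 0)
print([r["id"] for r in table.rows])  # [1, 2, 3]
```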
The same DataFrameWriter options can be used to achieve idempotent writes in non-streaming jobs.
31.20.1. Example
Delta Lake supports several statements to facilitate deleting data from and updating data in Delta tables.
You can remove data that matches a predicate from a Delta table. For instance, in a table named people10m or a
path at /tmp/delta/people-10m, to delete all rows corresponding to people with a value in
the birthDate column from before 1955, you can run the following:
DELETE FROM people10m WHERE birthDate < '1955-01-01'
Important
delete removes the data from the latest version of the Delta table but does not remove it from the physical
storage until the old versions are explicitly vacuumed. See vacuum for details.
Tip
When possible, provide predicates on the partition columns for a partitioned Delta table as such predicates can
significantly speed up the operation.
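Why partition predicates help can be shown with a small pure-Python model (not Delta Lake code). It assumes a table laid out as partition value, then files, then rows, and it ignores file-level statistics, which real engines can also use to skip files. A delete whose predicate is on the partition column only drops files from the metadata, while a delete on another column must read every file and rewrite those that contain matches. The helper names and data are hypothetical.

```python
from typing import Callable, Dict, List, Tuple

Row = Dict[str, str]
Table = Dict[str, List[List[Row]]]  # partition value -> files -> rows


def delete_partitions(table: Table,
                      drop: Callable[[str], bool]) -> Tuple[Table, int]:
    """Predicate on the partition column: whole files are removed unread."""
    kept = {part: files for part, files in table.items() if not drop(part)}
    return kept, 0  # no rows had to be read


def delete_rows(table: Table,
                matches: Callable[[Row], bool]) -> Tuple[Table, int]:
    """Predicate on a data column: scan every file, rewrite files with matches."""
    rows_read = 0
    result: Table = {}
    for part, files in table.items():
        new_files = []
        for rows in files:
            rows_read += len(rows)
            remaining = [r for r in rows if not matches(r)]
            if remaining:  # a file whose rows were all deleted disappears
                new_files.append(remaining)
        result[part] = new_files
    return result, rows_read


events: Table = {
    "2024-01-01": [[{"user_email": "a@example.com"}, {"user_email": "b@example.com"}]],
    "2024-01-02": [[{"user_email": "a@example.com"}]],
}
_, read_by_partition = delete_partitions(events, lambda d: d < "2024-01-02")
_, read_by_row = delete_rows(events, lambda r: r["user_email"] == "a@example.com")
print(read_by_partition, read_by_row)  # 0 3
```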
You can update data that matches a predicate in a Delta table. For example, in a table named people10m or a
path at /tmp/delta/people-10m, to change an abbreviation in the gender column
from M or F to Male or Female, you can run the following:
UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';
Similar to delete, update operations can get a significant speedup with predicates on partitions.
You can upsert data from a source table, view, or DataFrame into a target Delta table by using the MERGE SQL
operation. Delta Lake supports inserts, updates and deletes in MERGE, and it supports extended syntax beyond the
SQL standards to facilitate advanced use cases.
Suppose you have a source table named people10mupdates or a source path at /tmp/delta/people-
10m-updates that contains new data for a target table named people10m or a target path
at /tmp/delta/people-10m. Some of these new records may already be present in the target data. To
merge the new data, you want to update rows where the person’s id is already present and insert the new rows
where no matching id is present. You can run the following:
MERGE INTO people10m
USING people10mupdates
ON [Link] = [Link]
WHEN MATCHED THEN
UPDATE SET
id = [Link],
firstName = [Link],
middleName = [Link],
lastName = [Link],
gender = [Link],
birthDate = [Link],
ssn = [Link],
salary = [Link]
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
[Link],
[Link],
[Link],
[Link],
[Link],
[Link],
[Link],
[Link]
)
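The matching logic of this MERGE can be sketched in plain Python over lists of dictionaries. This illustrates the semantics only, not how Delta Lake executes a merge; the helper merge_upsert and the sample rows are hypothetical. Matched target rows are replaced by their source row, unmatched source rows are appended, and, as in Delta Lake, several source rows matching the same target row are rejected.

```python
from typing import Dict, List


def merge_upsert(target: List[Dict], source: List[Dict], key: str = "id") -> List[Dict]:
    """WHEN MATCHED THEN UPDATE SET every column, WHEN NOT MATCHED THEN INSERT."""
    target_keys = {row[key] for row in target}
    updates: Dict = {}
    inserts: List[Dict] = []
    for row in source:
        k = row[key]
        if k in target_keys:
            if k in updates:
                raise ValueError(f"multiple source rows match target row {key}={k!r}")
            updates[k] = dict(row)
        else:
            inserts.append(dict(row))  # unmatched rows are inserted as they are
    return [updates.get(row[key], row) for row in target] + inserts


people10m = [{"id": 1, "salary": 100}, {"id": 2, "salary": 200}]
people10mupdates = [{"id": 2, "salary": 250}, {"id": 3, "salary": 300}]
print(merge_upsert(people10m, people10mupdates))
# [{'id': 1, 'salary': 100}, {'id': 2, 'salary': 250}, {'id': 3, 'salary': 300}]
```

Note that unmatched duplicates in the source are all inserted, which is why new data should be deduplicated before an insert-only merge.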
See Configure SparkSession for the steps to enable support for SQL commands.
See the Delta Lake APIs for Scala, Java, and Python syntax details.
Delta Lake merge operations typically require two passes over the source data. If your source data contains
nondeterministic expressions, multiple passes on the source data can produce different rows, causing incorrect
results. Some common examples of nondeterministic expressions include
the current_date and current_timestamp functions. If you cannot avoid using nondeterministic
functions, consider saving the source data to storage, for example as a temporary Delta table. Caching the source
data may not address this issue, as cache invalidation can cause the source data to be recomputed partially or
completely (for example, when a cluster loses some of its executors when scaling down).
merge automatically validates that the schema of the data generated by insert and update expressions is
compatible with the schema of the table. It uses the following rules to determine whether the merge operation is
compatible:
• For update and insert actions, the specified target columns must exist in the target Delta table.
• For updateAll and insertAll actions, the source dataset must have all the columns of the target
Delta table. The source dataset can have extra columns and they are ignored.
• If you do not want the extra columns to be ignored and instead want to update the target table schema to
include new columns, see Automatic schema evolution.
• For all actions, if the data type generated by the expressions producing the target columns differs
from the corresponding column in the target Delta table, merge tries to cast it to the type in the
table.
By default, updateAll and insertAll assign all the columns in the target Delta table with columns of the
same name from the source dataset. Any columns in the source dataset that don’t match columns in the target
table are ignored. However, in some use cases, it is desirable to automatically add source columns to the target
Delta table. To automatically update the table schema during a merge operation
with updateAll and insertAll (at least one of them), you can set the Spark session
configuration [Link] to true before running
the merge operation.
Note
• Schema evolution occurs only when there is either an updateAll (UPDATE SET *) or
an insertAll (INSERT *) action, or both.
• update and insert actions cannot explicitly refer to target columns that do not already exist in the
target table (even if there are updateAll or insertAll as one of the clauses).
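The column rules above can be expressed as two small pure-Python checks. This is a sketch of the documented rules only: the function names are hypothetical, and the functions operate on lists of column names rather than real schemas, data types, or data.

```python
from typing import List


def columns_after_star_merge(target_cols: List[str], source_cols: List[str],
                             auto_merge: bool) -> List[str]:
    """Target columns after an updateAll/insertAll merge."""
    extra = [c for c in source_cols if c not in target_cols]
    # Without schema evolution, extra source columns are ignored;
    # with it, they are appended to the target schema.
    return list(target_cols) + extra if auto_merge else list(target_cols)


def check_explicit_columns(target_cols: List[str], assigned_cols: List[str]) -> None:
    """update/insert actions may only assign columns that already exist,
    whether or not schema evolution is enabled."""
    unknown = [c for c in assigned_cols if c not in target_cols]
    if unknown:
        raise ValueError(f"columns not in target table: {unknown}")


print(columns_after_star_merge(["id", "name"], ["id", "name", "email"], auto_merge=True))
# ['id', 'name', 'email']
```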
You can reduce the time taken by merge using the following approaches:
• Reduce the search space for matches: By default, the merge operation searches the entire Delta
table to find matches in the source table. One way to speed up merge is to reduce the search space by
adding known constraints in the match condition. For example, suppose you have a table that is partitioned
by country and date and you want to use merge to update information for the last day and a specific
country. Adding the condition [Link] = current_date() AND [Link] = 'USA' will make the query
faster as it looks for matches only in the relevant partitions. Furthermore, it will also reduce the chances of
conflicts with other concurrent operations. See Concurrency control for more details.
• Compact files: If the data is stored in many small files, reading the data to search for matches can
become slow. You can compact small files into larger files to improve read throughput. See Compact
files for details.
• Control the shuffle partitions for writes: The merge operation shuffles data multiple times to
compute and write the updated data. The number of tasks used to shuffle is controlled by the Spark session
configuration [Link]. Setting this parameter not only controls the
parallelism but also determines the number of output files. Increasing the value increases parallelism but
also generates a larger number of smaller data files.
• Repartition output data before write: For partitioned tables, merge can produce a much larger
number of small files than the number of shuffle partitions, because every shuffle task can write
multiple files in multiple partitions. This can become a performance bottleneck. In many cases, it helps to
repartition the output data by the table’s partition columns before writing it. You enable this by setting the
Spark session configuration [Link] to true.
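A back-of-the-envelope pure-Python model shows why the number of output files can exceed the number of shuffle partitions. It assumes one output file per distinct (writer task, table partition) pair and, without repartitioning, rows spread round-robin over tasks; the function files_written and the data are hypothetical.

```python
from typing import List


def files_written(partition_values: List[str], num_shuffle_partitions: int,
                  repartition_by_partition_col: bool) -> int:
    """Count distinct (writer task, table partition) pairs, one file each."""
    pairs = set()
    for i, value in enumerate(partition_values):
        if repartition_by_partition_col:
            task = hash(value) % num_shuffle_partitions  # same value, same task
        else:
            task = i % num_shuffle_partitions  # rows spread regardless of value
        pairs.add((task, value))
    return len(pairs)


# 200 rows over 10 dates, with 8 shuffle partitions.
rows = [f"2024-01-{day:02d}" for day in range(1, 11)] * 20
print(files_written(rows, 8, False), files_written(rows, 8, True))  # 40 10
```

Without repartitioning, 8 tasks produce 40 files; after repartitioning by the partition column, each date is written by a single task, giving one file per date.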
A common ETL use case is to collect logs into a Delta table by appending them to a table. However, often the sources
can generate duplicate log records, and downstream deduplication steps are needed to take care of them.
With merge, you can avoid inserting the duplicate records.
MERGE INTO logs
USING newDedupedLogs
ON [Link] = [Link]
WHEN NOT MATCHED
THEN INSERT *
Note
The dataset containing the new logs needs to be deduplicated within itself. By the SQL semantics of merge, it
matches and deduplicates the new data with the existing data in the table, but if there is duplicate data within
the new dataset, it is inserted. Hence, deduplicate the new data before merging into the table.
If you know that you may get duplicate records only for a few days, you can optimize your query further by
partitioning the table by date, and then specifying the date range of the target table to match on.
MERGE INTO logs
USING newDedupedLogs
ON [Link] = [Link] AND [Link] > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND [Link] > current_date() - INTERVAL 7 DAYS
THEN INSERT *
This is more efficient than the previous command as it looks for duplicates only in the last 7 days of logs, not the
entire table. Furthermore, you can use this insert-only merge with Structured Streaming to perform continuous
deduplication of the logs.
• In a streaming query, you can use merge operation in foreachBatch to continuously write any
streaming data to a Delta table with deduplication. See the following streaming example for more
information on foreachBatch.
• In another streaming query, you can continuously read deduplicated data from this Delta table. This is
possible because an insert-only merge only appends new data to the Delta table.
Another common operation is SCD Type 2, which maintains history of all changes made to each key in a
dimensional table. Such operations require updating existing rows to mark previous values of keys as old, and then
inserting the new rows as the latest values. Given a source table with updates and the target table with the
dimensional data, SCD Type 2 can be expressed with merge.
Here is a concrete example of maintaining the history of addresses for a customer along with the active date range
of each address. When a customer’s address needs to be updated, you have to mark the previous address as not
the current one, update its active date range, and add the new address as the current one.
val customersTable: DeltaTable = ... // table with schema (customerId, address, current, effectiveDate, endDate)
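The intended outcome can be modeled in plain Python on rows with the schema (customerId, address, current, effectiveDate, endDate). The function scd2_apply is hypothetical, applies updates one at a time in list order, and does not use the DeltaTable API; it only shows which rows an SCD Type 2 merge should leave behind.

```python
from datetime import date
from typing import Dict, List, Optional


def scd2_apply(table: List[Dict], updates: List[Dict]) -> List[Dict]:
    """Expire the current row of each customer whose address changed and
    append the new address as the current row."""
    result = [dict(r) for r in table]  # do not mutate the input table
    for u in updates:
        current: Optional[Dict] = next(
            (r for r in result if r["customerId"] == u["customerId"] and r["current"]),
            None)
        if current is not None and current["address"] == u["address"]:
            continue  # address unchanged: nothing to record
        if current is not None:
            current["current"] = False
            current["endDate"] = u["effectiveDate"]  # close the old date range
        result.append({"customerId": u["customerId"], "address": u["address"],
                       "current": True, "effectiveDate": u["effectiveDate"],
                       "endDate": None})
    return result


customers = [{"customerId": 1, "address": "Old St", "current": True,
              "effectiveDate": date(2020, 1, 1), "endDate": None}]
updates = [{"customerId": 1, "address": "New St", "effectiveDate": date(2021, 6, 1)},
           {"customerId": 2, "address": "First Ave", "effectiveDate": date(2021, 6, 1)}]
for row in scd2_apply(customers, updates):
    print(row)
```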
Similar to SCD, another common use case, often called change data capture (CDC), is to apply all data changes
generated from an external database into a Delta table. In other words, a set of updates, deletes, and inserts
applied to an external table needs to be applied to a Delta table. You can do this using merge as follows.
val deltaTable: DeltaTable = ... // DeltaTable with schema (key, value)
// Find the latest change for each key based on the timestamp
// Note: For nested structs, max on struct is computed as
// max on first struct field, if equal fall back to second fields, and so on.
val latestChangeForEachKey = changesDF
.selectExpr("key", "struct(time, newValue, deleted) as otherCols" )
.groupBy("key")
.agg(max("otherCols").as("latest"))
.selectExpr("key", "latest.*")
[Link]("t")
.merge(
[Link]("s"),
"[Link] = [Link]")
.whenMatched("[Link] = true")
.delete()
.whenMatched()
.updateExpr(Map("key" -> "[Link]", "value" -> "[Link]"))
.whenNotMatched("[Link] = false")
.insertExpr(Map("key" -> "[Link]", "value" -> "[Link]"))
.execute()
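The same two steps, picking the latest change per key and then applying it, can be modeled in plain Python. This sketch compares changes by time only (the Scala code above falls back to later struct fields on ties), and the function apply_cdc and the sample changes are hypothetical.

```python
from typing import Dict, List


def apply_cdc(table: Dict[str, str], changes: List[Dict]) -> Dict[str, str]:
    """Apply the latest change for each key to a key -> value table."""
    latest: Dict[str, Dict] = {}
    for c in changes:  # keep only the most recent change per key
        if c["key"] not in latest or c["time"] > latest[c["key"]]["time"]:
            latest[c["key"]] = c
    result = dict(table)
    for key, c in latest.items():
        if c["deleted"]:
            result.pop(key, None)  # matched: delete; not matched: nothing to do
        else:
            result[key] = c["newValue"]  # matched: update; not matched: insert
    return result


table = {"k1": "v1", "k2": "v2"}
changes = [
    {"key": "k1", "time": 1, "newValue": "x", "deleted": False},
    {"key": "k1", "time": 3, "newValue": "y", "deleted": False},
    {"key": "k2", "time": 2, "newValue": None, "deleted": True},
    {"key": "k3", "time": 1, "newValue": "z", "deleted": False},
    {"key": "k4", "time": 1, "newValue": None, "deleted": True},
]
print(apply_cdc(table, changes))  # {'k1': 'y', 'k3': 'z'}
```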
You can use a combination of merge and foreachBatch (see foreachbatch for more information) to write
complex upserts from a streaming query into a Delta table. For example:
• Write streaming aggregates in Update Mode: This is much more efficient than
Complete Mode.
import [Link].*
Note
Make sure that your merge statement inside foreachBatch is idempotent as restarts of the streaming query
can apply the operation on the same batch of data multiple times.
When merge is used in foreachBatch, the input data rate of the streaming query (reported
through StreamingQueryProgress and visible in the notebook rate graph) may be reported as a multiple
of the actual rate at which data is generated at the source. This is because merge reads the input data multiple
times causing the input metrics to be multiplied. If this is a bottleneck, you can cache the batch DataFrame
before merge and then uncache it after merge.
For many Delta Lake operations, you enable integration with Apache Spark DataSourceV2 and Catalog APIs (since
3.0) by setting configurations when you create a new SparkSession. See Configure SparkSession.
You can remove files that are no longer referenced by a Delta table and are older than the retention threshold by
running the vacuum command on the table. vacuum is not triggered automatically. The default retention
threshold for the files is 7 days. To change this behavior, see Data retention.
Important
• vacuum removes all files from directories not managed by Delta Lake, ignoring directories beginning
with _. If you are storing additional metadata like Structured Streaming checkpoints within a Delta table
directory, use a directory name such as _checkpoints.
• vacuum deletes only data files, not log files. Log files are deleted automatically and asynchronously after
checkpoint operations. The default retention period of log files is 30 days, configurable through
the [Link] property which you set with
the ALTER TABLE SET TBLPROPERTIES SQL method. See Table properties.
• The ability to time travel back to a version older than the retention period is lost after running vacuum.
from [Link] import *
[Link]()  # vacuum files not required by versions older than the default retention period
[Link](100) # vacuum files not required by versions more than 100 hours old
Note
When using VACUUM, to configure Spark to delete files in parallel (based on the number of shuffle partitions), set
the session configuration "[Link]" to "true".
See the Delta Lake APIs for Scala, Java, and Python syntax details.
Warning
It is recommended that you set a retention interval to be at least 7 days, because old snapshots and
uncommitted files can still be in use by concurrent readers or writers to the table. If VACUUM cleans up active
files, concurrent readers can fail or, worse, tables can be corrupted when VACUUM deletes files that have not yet
been committed. You must choose an interval that is longer than the longest running concurrent transaction and
the longest period that any stream can lag behind the most recent update to the table.
Delta Lake has a safety check to prevent you from running a dangerous VACUUM command. If you are certain
that there are no operations being performed on this table that take longer than the retention interval you plan
to specify, you can turn off this safety check by setting the Spark configuration
property [Link] to false.
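A simplified pure-Python model of which files a vacuum would delete, assuming each candidate file is tagged with the time it stopped being referenced by the table. It reproduces three points from this section: files younger than the retention threshold are kept, directories beginning with _ are skipped, and a retention interval below 7 days is refused unless the safety check is turned off. The function files_to_vacuum and the file names are hypothetical.

```python
from datetime import datetime, timedelta
from typing import Dict, List

DEFAULT_RETENTION_HOURS = 168.0  # 7 days


def files_to_vacuum(candidates: Dict[str, datetime], now: datetime,
                    retention_hours: float = DEFAULT_RETENTION_HOURS,
                    check_enabled: bool = True) -> List[str]:
    """Return the candidate paths a vacuum would delete (model only)."""
    if check_enabled and retention_hours < DEFAULT_RETENTION_HOURS:
        raise ValueError("retention below 7 days; disable the safety check only "
                         "if no transaction or stream lags longer than this")
    cutoff = now - timedelta(hours=retention_hours)
    return sorted(
        path for path, unreferenced_since in candidates.items()
        if unreferenced_since < cutoff
        # directories whose names begin with "_" (such as _checkpoints) are skipped
        and not any(part.startswith("_") for part in path.split("/")[:-1])
    )


now = datetime(2024, 1, 31)
candidates = {
    "part-0001.parquet": datetime(2024, 1, 1),
    "part-0002.parquet": datetime(2024, 1, 30),
    "_checkpoints/offsets/0": datetime(2024, 1, 1),
}
print(files_to_vacuum(candidates, now))  # ['part-0001.parquet']
```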
You can retrieve information on the operations, user, timestamp, and so on for each write to a Delta table by
running the history command. The operations are returned in reverse chronological order. By default table
history is retained for 30 days.
from [Link] import *
History schema
Column               Type       Description
timestamp            timestamp  When this version was committed.
operationParameters  map        Parameters of the operation (for example, predicates).
notebook             struct     Details of notebook from which the operation was run.
readVersion          long       Version of the table that was read to perform the write operation.
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+
|      5|  2019-07-29 [Link]|  null|    null|   DELETE|[predicate -> ["(...|null|    null|     null|          4|  Serializable|        false|[numTotalRows -> ...|
|      4|  2019-07-29 [Link]|  null|    null|   UPDATE|[predicate -> (id...|null|    null|     null|          3|  Serializable|        false|[numTotalRows -> ...|
|      3|  2019-07-29 [Link]|  null|    null|   DELETE|[predicate -> ["(...|null|    null|     null|          2|  Serializable|        false|[numTotalRows -> ...|
|      2|  2019-07-29 [Link]|  null|    null|   UPDATE|[predicate -> (id...|null|    null|     null|          1|  Serializable|        false|[numTotalRows -> ...|
|      1|  2019-07-29 [Link]|  null|    null|   DELETE|[predicate -> ["(...|null|    null|     null|          0|  Serializable|        false|[numTotalRows -> ...|
|      0|  2019-07-29 [Link]|  null|    null|    WRITE|[mode -> ErrorIfE...|null|    null|     null|       null|  Serializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+
Note
Some of the columns may be nulls because the corresponding information may not be available in your
environment.
Columns added in the future will always be added after the last column.
The history operation returns a collection of operation metrics in the operationMetrics column map.
Operations that report metrics include STREAMING UPDATE, DELETE, TRUNCATE, MERGE, UPDATE, OPTIMIZE, and
RESTORE.
You can retrieve detailed information about a Delta table (for example, number of files, data size)
using DESCRIBE DETAIL.
DESCRIBE DETAIL '/data/events/'
Detail schema
The output of this operation has only one row with the following schema.
Column            Type               Description
partitionColumns  array of strings   Names of the partition columns if the table is partitioned.
numFiles          long               Number of the files in the latest version of the table.
sizeInBytes       int                The size of the latest snapshot of the table in bytes.
properties        string-string map  All the properties set for this table.
minReaderVersion  int                Minimum version of readers (according to the log protocol) that can read the table.
minWriterVersion  int                Minimum version of writers (according to the log protocol) that can write to the table.
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+
|format|                  id|              name|description|            location|           createdAt|       lastModified|partitionColumns|numFiles|sizeInBytes|properties|minReaderVersion|minWriterVersion|
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+
| delta|d31f82d2-a69f-42e...|            [Link]|       null|file:/Users/tuor/...|2020-06-05 [Link]...|  2020-06-05 [Link]|              []|      10|      12345|        []|               1|               2|
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+
31.25.4. Generate a manifest file
You can generate a manifest file for a Delta table that can be used by processing engines other than
Apache Spark to read the Delta table. For example, to generate a manifest file that can be used by Presto and
Athena to read a Delta table, run the following:
deltaTable = [Link](<path-to-delta-table>)
[Link]("symlink_format_manifest")
Convert a Parquet table to a Delta table in-place. This command lists all the files in the directory, creates a Delta
Lake transaction log that tracks these files, and automatically infers the data schema by reading the footers of all
Parquet files. If your data is partitioned, you must specify the schema of the partition columns as a DDL-formatted
string (that is, <column-name1> <type>, <column-name2> <type>, ...).
Note
If a Parquet table was created by Structured Streaming, the listing of files can be avoided by using
the _spark_metadata sub-directory as the source of truth for files contained in the table, by setting the SQL
configuration [Link] to true.
from [Link] import *
You can easily convert a Delta table back to a Parquet table using the following steps:
1. If you have performed Delta Lake operations that can change the data files (for
example, delete or merge), run vacuum with retention of 0 hours to delete all data
files that do not belong to the latest version of the table.
2. Delete the _delta_log directory in the table directory.
You can restore a Delta table to its earlier state by using the RESTORE command. A Delta table internally
maintains historic versions of the table that enable it to be restored to an earlier state. A version corresponding to
the earlier state or a timestamp of when the earlier state was created are supported as options by
the RESTORE command.
Important
The timestamp format for restoring to an earlier state is yyyy-MM-dd HH:mm:ss. Providing only a
date (yyyy-MM-dd) string is also supported.
from [Link] import *
Restore is considered a data-changing operation. Delta Lake log entries added by the RESTORE command
contain dataChange set to true. If there is a downstream application, such as a Structured Streaming job
that processes the updates to a Delta Lake table, the data change log entries added by the restore operation are
considered as new data updates, and processing them may result in duplicate data.
For example:
Table version   Operation   Delta log updates   Records in data change log updates
In the preceding example, the RESTORE command results in updates that were already seen when reading
Delta table versions 0 and 1. If a streaming query was reading this table, then these files will be considered as newly
added data and will be processed again.
Restore metrics
RESTORE reports the following metrics as a single row DataFrame once the operation is complete:
• removed_files_size: Total size in bytes of the files that are removed from the
table.
31.26. Constraints
Delta tables support standard SQL constraint management clauses that ensure that the quality and integrity of data
added to a table is automatically verified. When a constraint is violated, Delta Lake throws
an InvariantViolationException to signal that the new data can’t be added.
Important
Adding a constraint automatically upgrades the table writer protocol version. See Table protocol versioning to
understand table protocol versioning and what it means to upgrade the protocol version.
You specify NOT NULL constraints in the schema when you create a table and
drop NOT NULL constraints using the ALTER TABLE CHANGE COLUMN command.
> CREATE TABLE default.people10m (
id INT NOT NULL,
firstName STRING,
middleName STRING NOT NULL,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
) USING DELTA;
> ALTER TABLE default.people10m CHANGE COLUMN middleName DROP NOT NULL;
If you specify a NOT NULL constraint on a column nested within a struct, the parent struct is also constrained to
not be null. However, columns nested within array or map types do not accept NOT NULL constraints.
31.26.2. CHECK constraint
> ALTER TABLE default.people10m ADD CONSTRAINT dateWithinRange CHECK (birthDate > '1900-01-01');
> ALTER TABLE default.people10m DROP CONSTRAINT dateWithinRange;
CHECK constraints are listed as table properties in the output of
the DESCRIBE DETAIL and SHOW TBLPROPERTIES commands.
> ALTER TABLE default.people10m ADD CONSTRAINT validIds CHECK (id > 1 and id < 99999999);
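The effect of NOT NULL and CHECK constraints on an append can be modeled in plain Python. In this sketch, ConstraintViolation is a hypothetical stand-in for InvariantViolationException, the CHECK constraints are Python predicates mirroring dateWithinRange and validIds, and the whole batch is validated before anything is appended, so a single bad row rejects the entire write.

```python
from datetime import date
from typing import Callable, Dict, List


class ConstraintViolation(Exception):
    """Hypothetical stand-in for Delta Lake's InvariantViolationException."""


def append_checked(table: List[Dict], rows: List[Dict], not_null: List[str],
                   checks: Dict[str, Callable[[Dict], bool]]) -> None:
    """Validate every row first, then append all rows (all-or-nothing)."""
    for row in rows:
        for col in not_null:
            if row.get(col) is None:
                raise ConstraintViolation(f"NOT NULL constraint violated for column: {col}")
        for name, predicate in checks.items():
            if not predicate(row):
                raise ConstraintViolation(f"CHECK constraint {name} violated by row: {row}")
    table.extend(rows)


checks = {
    "dateWithinRange": lambda r: r["birthDate"] > date(1900, 1, 1),
    "validIds": lambda r: 1 < r["id"] < 99999999,
}
people: List[Dict] = []
append_checked(people, [{"id": 2, "birthDate": date(1980, 5, 1)}], ["id"], checks)
try:
    # The second row fails validIds, so neither row is appended.
    append_checked(people, [{"id": 3, "birthDate": date(1990, 1, 1)},
                            {"id": 1, "birthDate": date(1990, 1, 1)}], ["id"], checks)
except ConstraintViolation as err:
    print(err)
print(len(people))  # 1
```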
Delta Lake ACID guarantees are predicated on the atomicity and durability guarantees of the storage system.
Specifically, Delta Lake relies on the following when interacting with storage systems:
• Atomic visibility: There must be a way for a file to be visible in its entirety or not visible at
all.
• Mutual exclusion: Only one writer must be able to create (or rename) a file at the
final destination.
• Consistent listing: Once a file has been written in a directory, all future listings for
that directory must return that file.
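The first two requirements can be illustrated on a local POSIX file system with a pure-Python sketch (not a Delta Lake LogStore): the commit is written in full to a temporary file and then published with os.link, which fails if the destination already exists, so a reader never sees a partial file and only one writer can create a given version. The function try_commit and the zero-padded file name are hypothetical. Hard links are not available on every file system, so treat this as an illustration rather than a portable implementation.

```python
import os
import tempfile


def try_commit(log_dir: str, version: int, payload: bytes) -> bool:
    """Publish <version>.json only if no other writer has created it yet."""
    final_path = os.path.join(log_dir, f"{version:020d}.json")
    fd, tmp_path = tempfile.mkstemp(dir=log_dir, prefix=".tmp-")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(payload)  # the content is complete before it becomes visible
        try:
            os.link(tmp_path, final_path)  # atomic; never overwrites an existing file
        except FileExistsError:
            return False  # another writer already committed this version
        return True
    finally:
        os.remove(tmp_path)  # the temporary name is never left behind


with tempfile.TemporaryDirectory() as log_dir:
    print(try_commit(log_dir, 0, b'{"writer": "A"}'))  # True
    print(try_commit(log_dir, 0, b'{"writer": "B"}'))  # False
```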
Because storage systems do not necessarily provide all of these guarantees out-of-the-box, Delta Lake transactional
operations typically go through the LogStore API instead of accessing the storage system directly. To provide the
ACID guarantees for different storage systems, you may have to use different LogStore implementations. This
article covers how to configure Delta Lake for various storage systems. There are two categories of storage
systems:
• Storage systems with built-in support: For some storage systems, you do not need
additional configurations. Delta Lake uses the scheme of the path (that
is, s3a in s3a://path) to dynamically identify the storage system and use the
corresponding LogStore implementation that provides the transactional
guarantees. However, for S3, there are additional caveats on concurrent writes. See
the section on S3 for details.
• Other storage systems: The LogStore, similar to Apache Spark, uses
Hadoop FileSystem API to perform reads and writes. So Delta Lake supports
concurrent reads on any storage system that provides an implementation
of FileSystem API. For concurrent writes with transactional guarantees, there are
two cases based on the guarantees provided by FileSystem implementation. If the
implementation provides consistent listing and atomic renames-without-overwrite
(that is, rename(... , overwrite = false) will either generate the target file
atomically or fail if it already exists
with [Link]), then the
default LogStore implementation using renames will allow concurrent writes with
guarantees. Otherwise, you must configure a custom implementation of LogStore by
setting the following Spark configuration
[Link].<scheme>.impl=<full-qualified-class-name>
where <scheme> is the scheme of the paths of your storage system. This configures Delta Lake to dynamically
use the given LogStore implementation only for those paths. You can have multiple such configurations for
different schemes in your application, thus allowing it to simultaneously read and write from different storage
systems.
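The "atomic rename without overwrite" contract described above (publish a file under its final name atomically, or fail if that name already exists) can be sketched with the Python standard library: write the data to a temporary file first, then use `os.link` to create the final name, which raises `FileExistsError` if the target exists. This is a minimal sketch assuming a POSIX local file system that supports hard links; `rename_no_overwrite` and `publish` are hypothetical names, not Hadoop or Delta APIs.

```python
import os
import tempfile


def rename_no_overwrite(src: str, dst: str) -> bool:
    """Move src to dst atomically, failing (returning False) if dst already exists."""
    try:
        os.link(src, dst)  # atomic on POSIX; raises FileExistsError if dst exists
    except FileExistsError:
        return False
    os.unlink(src)         # drop the temporary name; dst keeps the data
    return True


def publish(directory: str, name: str, payload: str) -> bool:
    """Write payload to a temp file, then expose it under its final name."""
    fd, tmp = tempfile.mkstemp(dir=directory, prefix=".tmp-")
    with os.fdopen(fd, "w") as f:
        f.write(payload)
    if rename_no_overwrite(tmp, os.path.join(directory, name)):
        return True
    os.unlink(tmp)         # lost the race: clean up our temp file
    return False


with tempfile.TemporaryDirectory() as d:
    a = publish(d, "00000000000000000000.json", '{"writer": "A"}\n')
    b = publish(d, "00000000000000000000.json", '{"writer": "B"}\n')
    with open(os.path.join(d, "00000000000000000000.json")) as f:
        content = f.read()
    leftovers = [n for n in os.listdir(d) if n.startswith(".tmp-")]
print(a, b, content.strip(), leftovers)  # True False {"writer": "A"} []
```

Because the content is complete before the final name appears, readers never see a half-written file, and the second writer cannot overwrite the first.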
Note
Delta Lake on local file system may not support concurrent transactional writes. This is because the local file system
may or may not provide atomic renames. So you should not use the local file system for testing concurrent writes.
Before version 1.0, Delta Lake supported configuring LogStores by setting [Link].
This approach is now deprecated. Setting this configuration will use the configured LogStore for all paths,
thereby disabling the dynamic scheme-based delegation.
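Scheme-based delegation means the LogStore is chosen per path from the path's URI scheme, instead of one class for every path. A minimal model of that lookup, assuming a plain dictionary from scheme to class name (the class names are placeholders, not real Delta classes) and assuming that paths with no configured scheme fall back to a default:

```python
from urllib.parse import urlparse


def resolve_logstore(path: str, per_scheme: dict, default: str) -> str:
    """Return the LogStore class name configured for the path's URI scheme.

    per_scheme maps a scheme such as "s3a" to a fully qualified class name;
    paths whose scheme has no entry (or that have no scheme) use `default`.
    """
    scheme = urlparse(path).scheme.lower()
    return per_scheme.get(scheme, default)


configured = {
    "s3a": "com.example.logstore.S3LogStore",  # placeholder class names
    "gs": "com.example.logstore.GcsLogStore",
}
DEFAULT = "com.example.logstore.DefaultLogStore"
print(resolve_logstore("s3a://bucket/events", configured, DEFAULT))
print(resolve_logstore("gs://bucket/events", configured, DEFAULT))
print(resolve_logstore("/tmp/delta/events", configured, DEFAULT))
```

The deprecated single-class setting corresponds to ignoring `per_scheme` and returning one class for every path.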
Delta Lake has built-in support for the various Azure storage systems with full transactional guarantees for
concurrent reads and writes from multiple clusters.
Delta Lake relies on Hadoop FileSystem APIs to access Azure storage services. Specifically, Delta Lake requires
the implementation of [Link]() to be atomic, which is only supported in newer Hadoop
versions (Hadoop-15156 and Hadoop-15086). For this reason, you may need to build Spark with newer Hadoop
versions and use them for deploying your application. See Specifying the Hadoop Version and Enabling YARN for
building Spark with a specific Hadoop version and Quickstart for setting up Spark with Delta Lake.
Here are the steps to configure Delta Lake on Azure Blob storage.
1. Include hadoop-azure JAR in the classpath. See the requirements above for version details.
2. Set up credentials.
We recommend that you use a SAS token. In Scala, you can use the following:
[Link](
"[Link].<your-container-name>.<your-storage-account-name>.[Link]",
"<complete-query-string-of-your-sas-for-the-container>")
[Link](5).[Link]("delta").save("wasbs://<your-container-name>@<your-storage-account-name>.[Link]/<path-to-delta-table>")
[Link]("delta").load("wasbs://<your-container-name>@<your-storage-account-name>.[Link]/<path-to-delta-table>").show()
Here are the steps to configure Delta Lake on Azure Data Lake Storage Gen1.
1. Include hadoop-azure-datalake JAR in the classpath. See the requirements above for
version details.
2. Set up Azure Data Lake Storage Gen1 credentials.
You can set the following Hadoop configurations with your credentials (in Scala):
[Link]("[Link]", "ClientCredential")
[Link]("[Link]", "<your-oauth2-client-id>")
[Link]("[Link]", "<your-oauth2-credential>")
[Link]("[Link]", "[Link]directory-id>/oauth2/token")
[Link](5).[Link]("delta").save("adl://<your-adls-account>.[Link]/<path-to-delta-table>")
[Link]("delta").load("adl://<your-adls-account>.[Link]/<path-to-delta-table>").show()
Here are the steps to configure Delta Lake on Azure Data Lake Storage Gen2.
1. Include the JAR of the Maven artifact hadoop-azure-datalake in the classpath. See
the requirements for version details. In addition, you may also have to include JARs for Maven
artifacts hadoop-azure and wildfly-openssl.
[Link](5).[Link]("delta").save("abfss://<container-name>@<storage-account-name>.[Link]/<path-to-delta-table>")
[Link]("delta").load("abfss://<container-name>@<storage-account-name>.[Link]/<path-to-delta-table>").show()
HDFS
Delta Lake has built-in support for HDFS with full transactional guarantees on concurrent reads and writes from
multiple clusters. See Hadoop and Spark documentation for configuring credentials.
32. End to End Industrial IoT (IIoT) on Azure Databricks
1. Data Ingest - stream real-time raw sensor data from Azure IoT Hubs into the Delta format in
Azure Storage
2. Data Processing - stream process sensor data from raw (Bronze) to silver (aggregated) to
gold (enriched) Delta tables on Azure Storage
# AzureML Workspace info (name, region, resource group and subscription ID) for model deployment
[Link]("Storage Account","<your ADLS Gen 2 account name>","Storage Account")
[Link]("Event Hub Name","<your IoT Hub's Event Hub Compatible Name>","Event Hub Name")
• 3-node (min) Databricks Cluster running DBR 7.0+ and the following libraries:
o Azure Event Hubs Connector for Databricks - Maven
coordinates [Link]:azure-eventhubs-spark_2.12:2.3.17
• The following Secrets defined in scope iot
o iothub-cs - Connection string for your IoT Hub (Important - use the Event Hub
Compatible connection string)
o adls_key - Access Key to ADLS storage account (Important - use the Access
Key)
o synapse_cs - JDBC connect string to your Synapse SQL Pool (Important - use
the SQL Authentication with username/password connection string)
• The following notebook widgets populated:
o Storage Account - Name of your storage account
# Setup access to storage account for temp data when pushing to Synapse
storage_account = [Link]("Storage Account")
[Link](f"[Link].{storage_account}.[Link]",
[Link]("iot","adls_key"))
# Other initializations
IOT_CS = [Link]('iot','iothub-cs')  # IoT Hub connection string (Event Hub Compatible), read from the "iot" secret scope
ehConf = {
  '[Link]': sc._jvm.[Link](IOT_CS),
  'ehName': [Link]("Event Hub Name")
}
%sql
-- Clean up tables & views
DROP TABLE IF EXISTS turbine_raw;
DROP TABLE IF EXISTS weather_raw;
DROP TABLE IF EXISTS turbine_agg;
DROP TABLE IF EXISTS weather_agg;
DROP TABLE IF EXISTS turbine_enriched;
DROP TABLE IF EXISTS turbine_power;
DROP TABLE IF EXISTS turbine_maintenance;
DROP VIEW IF EXISTS turbine_combined;
DROP VIEW IF EXISTS feature_view;
DROP TABLE IF EXISTS turbine_life_predictions;
DROP TABLE IF EXISTS turbine_power_predictions;
Step 2 - Data Ingest from IoT Hubs
Azure Databricks provides a native connector to IoT and Event Hubs. Below, we will use PySpark
Structured Streaming to read from an IoT Hub stream of data and write the data in its raw format
directly into Delta.
Make sure that your IoT Simulator is sending payloads to IoT Hub.
We split out the two payloads into separate streams and write them both into Delta locations on
Azure Storage. We are able to query these two Bronze tables immediately as the data streams
in.
from pyspark.sql import functions as F
# Schema of incoming data from IoT hub
schema = "timestamp timestamp, deviceId string, temperature double, humidity double, windspeed double, winddirection string, rpm double, angle double"
# Read directly from IoT Hub using the EventHubs library for Databricks
iot_stream = (
  [Link]("eventhubs")                                                       # Read from IoT Hubs directly
  .options(**ehConf)                                                          # Use the Event-Hub-enabled connect string
  .load()                                                                     # Load the data
  .withColumn('reading', F.from_json([Link]('body').cast('string'), schema))  # Extract the "body" payload from the messages
  .select('reading.*', F.to_date('[Link]').alias('date'))                     # Create a "date" field for partitioning
)
# Split our IoT Hub stream into separate streams and write them both into their own Delta locations
write_turbine_to_delta = (
  iot_stream.filter('temperature is null')                        # Keep only turbine telemetry (turbine payloads have no temperature)
  .select('date','timestamp','deviceId','rpm','angle')            # Extract the fields of interest
  .[Link]('delta')                                                  # Write our stream to the Delta format
  .partitionBy('date')                                             # Partition our data by Date for performance
  .option("checkpointLocation", CHECKPOINT_PATH + "turbine_raw")   # Checkpoint so we can restart streams gracefully
  .start(BRONZE_PATH + "turbine_raw")                              # Stream the data into an ADLS Path
)
write_weather_to_delta = (
  iot_stream.filter(iot_stream.[Link]())                           # Keep only weather telemetry
  .select('date','deviceid','timestamp','temperature','humidity','windspeed','winddirection')
  .[Link]('delta')                                                  # Write our stream to the Delta format
  .partitionBy('date')                                             # Partition our data by Date for performance
  .option("checkpointLocation", CHECKPOINT_PATH + "weather_raw")   # Checkpoint so we can restart streams gracefully
  .start(BRONZE_PATH + "weather_raw")                              # Stream the data into an ADLS Path
)
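The split above relies on the two payload types carrying different fields: turbine readings have rpm and angle but no temperature, while weather readings have temperature. A small standard-library model of the same routing on raw JSON message bodies, assuming the field names from `schema` above (the sample messages and the `WeatherCapture` device name are made up; `route` is a hypothetical helper):

```python
import json
from datetime import datetime


def route(body: str):
    """Parse one message body and return (target_table, record).

    Mirrors the PySpark logic: rows with no temperature go to turbine_raw,
    all others to weather_raw, and a 'date' field is derived for partitioning.
    """
    reading = json.loads(body)
    reading["date"] = datetime.fromisoformat(reading["timestamp"]).date().isoformat()
    if reading.get("temperature") is None:
        keep = ("date", "timestamp", "deviceId", "rpm", "angle")
        return "turbine_raw", {k: reading.get(k) for k in keep}
    keep = ("date", "deviceId", "timestamp", "temperature", "humidity", "windspeed", "winddirection")
    return "weather_raw", {k: reading.get(k) for k in keep}


messages = [
    '{"timestamp": "2020-08-07T10:15:00", "deviceId": "WindTurbine-1", "rpm": 7.2, "angle": 6.1}',
    '{"timestamp": "2020-08-07T10:15:00", "deviceId": "WeatherCapture", "temperature": 25.1, '
    '"humidity": 60.0, "windspeed": 5.3, "winddirection": "NW"}',
]
for m in messages:
    print(route(m))
```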
We will use the following schema for Silver and Gold data sets:
The first step of our processing pipeline will clean and aggregate the measurements to 1 hour
intervals.
Since we are aggregating time-series values and there is a likelihood of late-arriving data and
data changes, we will use the MERGE functionality of Delta to upsert records into target tables.
MERGE allows us to upsert source records into a target storage location. This is useful for time-series
data because late-arriving or corrected readings update aggregates that already exist in the target instead
of creating duplicate rows.
When streaming source data, foreachBatch() can be used to perform merges on micro-batches of data.
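The effect of the MERGE used in `merge_delta` below can be modeled in a few lines: rows are keyed on (date, window, deviceid); a matching key is overwritten (UPDATE SET *) and a new key is inserted (INSERT *). This is plain Python with made-up values, not Delta code; `merge_upsert` is a hypothetical name. Duplicate keys within one batch are collapsed first, because a Delta MERGE can fail when several source rows match the same target row.

```python
def merge_upsert(target: dict, batch: list, keys=("date", "window", "deviceid")) -> None:
    """Upsert batch rows into target, a dict from key tuple to row.

    Models MERGE ... WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *.
    Within one batch, the last row for a key wins.
    """
    deduped = {tuple(row[k] for k in keys): row for row in batch}
    for key, row in deduped.items():
        target[key] = dict(row)  # update if present, insert otherwise


silver = {}
merge_upsert(silver, [{"date": "2020-08-07", "window": "10:00", "deviceid": "WindTurbine-1", "rpm": 7.0}])
# A late reading changes the aggregate for the same window: the row is updated, not duplicated.
merge_upsert(silver, [
    {"date": "2020-08-07", "window": "10:00", "deviceid": "WindTurbine-1", "rpm": 7.4},
    {"date": "2020-08-07", "window": "10:05", "deviceid": "WindTurbine-1", "rpm": 6.9},
])
print(len(silver))  # 2
print(silver[("2020-08-07", "10:00", "WindTurbine-1")]["rpm"])  # 7.4
```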
# Create functions to merge turbine and weather data into their target Delta tables
def merge_delta(incremental, target):
  [Link](['date','window','deviceid']).createOrReplaceTempView("incremental")
  try:
    # MERGE records into the target table using the specified join key
    incremental._jdf.sparkSession().sql(f"""
      MERGE INTO delta.`{target}` t
      USING incremental i
      ON [Link]=[Link] AND [Link] = [Link] AND [Link] = [Link]
      WHEN MATCHED THEN UPDATE SET *
      WHEN NOT MATCHED THEN INSERT *
    """)
  except Exception:
    # If the MERGE fails (for example, because the target table does not exist yet), create the table
    [Link]("delta").partitionBy("date").save(target)
turbine_b_to_s = (
  [Link]('delta').table("turbine_raw")                          # Read data as a stream from our source Delta table
  .groupBy('deviceId','date',[Link]('timestamp','5 minutes'))     # Aggregate readings into 5-minute windows
  .agg([Link]('rpm').alias('rpm'), [Link]("angle").alias("angle"))
  .writeStream                                                    # Write the resulting stream
  .foreachBatch(lambda i, b: merge_delta(i, SILVER_PATH + "turbine_agg"))  # Pass each micro-batch to a function
  .outputMode("update")                                           # Merge works with update mode
  .option("checkpointLocation", CHECKPOINT_PATH + "turbine_agg")  # Checkpoint so we can restart streams gracefully
  .start()
)
weather_b_to_s = (
  [Link]('delta').table("weather_raw")                          # Read data as a stream from our source Delta table
  .groupBy('deviceid','date',[Link]('timestamp','5 minutes'))     # Aggregate readings into 5-minute windows
  .agg({"temperature":"avg","humidity":"avg","windspeed":"avg","winddirection":"last"})
  .selectExpr('date','window','deviceid','`avg(temperature)` as temperature','`avg(humidity)` as humidity',
              '`avg(windspeed)` as windspeed','`last(winddirection)` as winddirection')
  .writeStream                                                    # Write the resulting stream
  .foreachBatch(lambda i, b: merge_delta(i, SILVER_PATH + "weather_agg"))  # Pass each micro-batch to a function
  .outputMode("update")                                           # Merge works with update mode
  .option("checkpointLocation", CHECKPOINT_PATH + "weather_agg")  # Checkpoint so we can restart streams gracefully
  .start()
)
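The `'5 minutes'` time window in the two aggregations above assigns each reading to a fixed, non-overlapping (tumbling) window whose start is the timestamp rounded down to a multiple of the window length; by default Spark aligns these windows to the Unix epoch. The grouping and averaging can be sketched with the standard library, assuming in-memory readings with UTC timestamps (sample values are illustrative; `window_start` and `aggregate` are hypothetical names):

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(minutes=5)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_start(ts: datetime, width: timedelta = WINDOW) -> datetime:
    """Round ts down to the start of its tumbling window (aligned to the Unix epoch)."""
    return ts - (ts - EPOCH) % width


def aggregate(readings):
    """Average rpm and angle per (deviceId, date, window start), like turbine_b_to_s."""
    groups = defaultdict(list)
    for r in readings:
        groups[(r["deviceId"], r["timestamp"].date(), window_start(r["timestamp"]))].append(r)
    return {
        key: {
            "rpm": sum(r["rpm"] for r in rows) / len(rows),
            "angle": sum(r["angle"] for r in rows) / len(rows),
        }
        for key, rows in groups.items()
    }


utc = timezone.utc
readings = [
    {"deviceId": "WindTurbine-1", "timestamp": datetime(2020, 8, 7, 10, 1, tzinfo=utc), "rpm": 7.0, "angle": 6.0},
    {"deviceId": "WindTurbine-1", "timestamp": datetime(2020, 8, 7, 10, 4, tzinfo=utc), "rpm": 8.0, "angle": 7.0},
    {"deviceId": "WindTurbine-1", "timestamp": datetime(2020, 8, 7, 10, 6, tzinfo=utc), "rpm": 6.0, "angle": 5.0},
]
for key, value in sorted(aggregate(readings).items()):
    print(key[2].strftime("%H:%M"), value)
# 10:00 {'rpm': 7.5, 'angle': 6.5}
# 10:05 {'rpm': 6.0, 'angle': 5.0}
```

Changing `WINDOW` to `timedelta(hours=1)` gives the hourly intervals described in the prose.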
%sql
-- As data gets merged in real-time to our hourly table, we can query it immediately
SELECT * FROM turbine_agg t JOIN weather_agg w ON ([Link]=[Link] AND
[Link]=[Link]) WHERE [Link]='WindTurbine-1' ORDER BY [Link] DESC
[Chart: rpm and windspeed by window for WindTurbine-1, 2020-08-07]
Next we perform a streaming join of weather and turbine readings to create one enriched dataset
we can use for data science and model training.
# Read streams from Delta Silver tables and join them together on common columns (date & window)
turbine_agg = [Link]('delta').option("ignoreChanges", True).table('turbine_agg')
weather_agg = [Link]('delta').option("ignoreChanges", True).table('weather_agg').drop('deviceid')
turbine_enriched = turbine_agg.join(weather_agg, ['date','window'])
# Write the stream to a foreachBatch function which performs the MERGE as before
merge_gold_stream = (
  turbine_enriched
  .selectExpr('date','deviceid','[Link] as window','rpm','angle','temperature','humidity','windspeed','winddirection')
  .writeStream
  .foreachBatch(lambda i, b: merge_delta(i, GOLD_PATH + "turbine_enriched"))
  .option("checkpointLocation", CHECKPOINT_PATH + "turbine_enriched")
  .start()
)
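The Gold join attaches to each turbine aggregate the weather aggregate for the same (date, window). Because `deviceid` is dropped from the weather side, every turbine in a window receives that window's weather values, and turbine windows without a weather row are dropped (inner join). A plain-Python sketch of that row-level result, with illustrative values; `enrich` is a hypothetical name:

```python
def enrich(turbine_rows, weather_rows):
    """Inner-join turbine and weather aggregates on (date, window)."""
    weather_by_window = {}
    for w in weather_rows:
        weather_by_window.setdefault((w["date"], w["window"]), []).append(w)
    out = []
    for t in turbine_rows:
        for w in weather_by_window.get((t["date"], t["window"]), []):
            extra = {k: v for k, v in w.items() if k not in ("date", "window")}
            out.append({**t, **extra})
    return out


turbines = [
    {"date": "2020-08-07", "window": "10:00", "deviceid": "WindTurbine-1", "rpm": 7.5},
    {"date": "2020-08-07", "window": "10:00", "deviceid": "WindTurbine-2", "rpm": 6.8},
    {"date": "2020-08-07", "window": "10:05", "deviceid": "WindTurbine-1", "rpm": 6.0},
]
weather = [{"date": "2020-08-07", "window": "10:00", "temperature": 25.0, "windspeed": 5.0}]
for row in enrich(turbines, weather):
    print(row)
```

If more than one weather row existed per window, each turbine row would be repeated once per weather row, which is why the weather side should hold one aggregate per window.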
Synapse Analytics provides on-demand SQL directly on Data Lake source formats. Databricks
can also directly stream data to Synapse SQL Pools for Data Warehousing workloads like BI
dashboarding and reporting.
# Use COPY INTO for faster loads to Synapse from Databricks
[Link]("[Link]", "copy")
write_to_synapse = (
  [Link]('delta').option('ignoreChanges',True).table('turbine_enriched')  # Read in Gold turbine readings from Delta as a stream
  .[Link]("[Link]")                                    # Write to Synapse (SQL DW connector)
  .option("url",[Link]("iot","synapse_cs"))               # SQL Pool JDBC connection (with SQL Auth) string
  .option("tempDir", SYNAPSE_PATH)                        # Temporary ADLS path to stage the data (with forwarded permissions)
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", "turbine_enriched")                  # Table in Synapse to write to
  .option("checkpointLocation", CHECKPOINT_PATH+"synapse")  # Checkpoint for resilient streaming
  .start()
)
In order to train a model, we will need to backfill our streaming data with historical data. The cell
below generates 1 year of historical turbine and weather data at 5-minute intervals and inserts it into
our Gold Delta table.
import pandas as pd
import numpy as np
# Function to simulate generating time-series data given a baseline, slope, and some seasonality
def generate_series(time_index, baseline, slope=0.01, period=365*24*12):
  rnd = [Link](time_index)
  season_time = (time_index % period) / period
  seasonal_pattern = [Link](season_time < 0.4, [Link](season_time * 2 * [Link]), 1 / [Link](3 * season_time))
  return baseline * (1 + 0.1 * seasonal_pattern + 0.1 * [Link](len(time_index)))
# Get the baseline readings for each sensor for backfilling data
turbine_enriched_pd = [Link]('turbine_enriched').toPandas()
baselines = turbine_enriched_pd.min()[3:8]  # positions 3-7: rpm, angle, temperature, humidity, windspeed
devices = turbine_enriched_pd['deviceid'].unique()
# Iterate through each device to generate historical data for that device
# (assumes `dates`, holding the start and end of the backfill range, was defined in an earlier cell)
print("---Generating Historical Enriched Turbine Readings---")
for deviceid in devices:
  print(f'Backfilling device {deviceid}')
  windows = pd.date_range(start=dates['start'][0], end=dates['end'][0], freq='5T')  # Generate a list of 5-minute timestamps from start to end date
  historical_values = [Link]({
    'date': [Link],
    'window': windows,
    'winddirection': [Link](['N','NW','W','SW','S','SE','E','NE'], size=len(windows)),
    'deviceId': deviceid
  })
  time_index = historical_values.index.to_numpy()  # Generate a time index
  # Simulate each sensor column from its baseline reading
  for v in baselines.keys():
    historical_values[v] = generate_series(time_index, baselines[v])
  [Link](historical_values).[Link]("delta").mode("append").saveAsTable("turbine_enriched")
# Create power readings based on weather and operating conditions
print("---Generating Historical Turbine Power Readings---")
[Link](f'''CREATE TABLE turbine_power USING DELTA PARTITIONED BY (date) LOCATION
"{GOLD_PATH + "turbine_power"}" AS SELECT date, window, deviceId, 0.1 *
(temperature/humidity) * (3.1416 * 25) * windspeed * rpm AS power FROM
turbine_enriched''')
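Two formulas drive the backfill: `generate_series` scales a baseline by a seasonal pattern (a cosine for the first 40% of each period, then a decaying exponential) plus 10% Gaussian noise, and `turbine_power` derives power as 0.1 × (temperature / humidity) × (3.1416 × 25) × windspeed × rpm. The standard-library sketch below reproduces both without the noise term so the outputs are deterministic; `seasonal_pattern`, `noiseless_value` and `power` are hypothetical names.

```python
import math

PERIOD = 365 * 24 * 12  # number of 5-minute steps in a year, as in generate_series


def seasonal_pattern(t: int, period: int = PERIOD) -> float:
    """Seasonal factor used by generate_series: cosine first, then exponential decay."""
    season_time = (t % period) / period
    if season_time < 0.4:
        return math.cos(season_time * 2 * math.pi)
    return 1 / math.exp(3 * season_time)


def noiseless_value(t: int, baseline: float) -> float:
    """generate_series without its random term: baseline * (1 + 0.1 * seasonal)."""
    return baseline * (1 + 0.1 * seasonal_pattern(t))


def power(temperature: float, humidity: float, windspeed: float, rpm: float) -> float:
    """Same expression as the turbine_power CREATE TABLE ... AS SELECT."""
    return 0.1 * (temperature / humidity) * (3.1416 * 25) * windspeed * rpm


print(round(noiseless_value(0, 10.0), 6))      # 11.0 (cos(0) = 1)
print(round(power(25.0, 50.0, 4.0, 10.0), 3))  # 157.08
```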
%sql
-- Optimize all 3 tables for querying and model training performance
OPTIMIZE turbine_enriched WHERE date<current_date() ZORDER BY deviceid, window;
OPTIMIZE turbine_power ZORDER BY deviceid, window;
OPTIMIZE turbine_maintenance ZORDER BY deviceid;
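ZORDER BY co-locates rows that are close in several columns at once. The classic construction behind the name is the Morton (Z-order) curve: interleave the bits of the column values so that sorting by the combined key keeps nearby (deviceid, window) pairs together in the same files. The sketch shows only that bit interleaving on small integer codes; it illustrates the idea and is not how Databricks' OPTIMIZE computes its ordering internally. `morton2` and the integer codes are hypothetical.

```python
def morton2(x: int, y: int, bits: int = 16) -> int:
    """Interleave the low `bits` bits of x and y: x takes even positions, y odd ones."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z


# Hypothetical integer codes: device index 0-3 and window index 0-3.
cells = [(d, w) for d in range(4) for w in range(4)]
ordered = sorted(cells, key=lambda c: morton2(*c))
print(ordered[:4])  # [(0, 0), (1, 0), (0, 1), (1, 1)]
```

The first four keys form a 2×2 block, so a file holding them covers only devices 0-1 and windows 0-1, and a filter on either column can skip files whose ranges do not match.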
A key component of this architecture is the Azure Data Lake Store (ADLS), which enables the
write-once, access-often analytics pattern in Azure. However, Data Lakes alone do not solve
challenges that come with time-series streaming data. The Delta storage format provides a layer
of resiliency and performance on all data sources stored in ADLS. Specifically for time-series
data, Delta provides the following advantages over other storage formats on ADLS:
• Unified batch & streaming
o Other formats on ADLS: Data Lakes are often used in conjunction with a streaming store like CosmosDB, resulting in a complex architecture.
o Delta format on ADLS: ACID-compliant transactions enable data engineers to perform streaming ingest and historical batch loads into the same locations on ADLS.
• Schema enforcement and evolution
o Other formats on ADLS: Data Lakes do not enforce schema, requiring all data to be pushed into a relational database for reliability.
o Delta format on ADLS: Schema is enforced by default. As new IoT devices are added to the data stream, schemas can be evolved safely so downstream applications don't fail.
• Efficient upserts
o Other formats on ADLS: Data Lakes do not support in-line updates and merges, requiring deletion and insertion of entire partitions to perform updates.
o Delta format on ADLS: MERGE commands are effective for handling delayed IoT readings, modified dimension tables used for real-time enrichment, or data that needs to be reprocessed.
• File compaction
o Other formats on ADLS: Streaming time-series data into Data Lakes generates hundreds or even thousands of tiny files.
o Delta format on ADLS: Auto-compaction in Delta optimizes file sizes to increase throughput and parallelism.
• Multi-dimensional clustering
o Other formats on ADLS: Data Lakes provide push-down filtering on partitions only.
o Delta format on ADLS: ZORDERing time-series on fields like timestamp or sensor ID allows Databricks to filter and join on those columns up to 100x faster than simple partitioning techniques.
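The file-compaction comparison is about rewriting many small files into fewer larger ones. A minimal sketch of the grouping step, assuming a hypothetical 1024 MB target file size and a simple first-fit-decreasing bin-packing rule; Delta's OPTIMIZE and auto-compaction use their own heuristics and defaults, and `plan_compaction` is a hypothetical name.

```python
def plan_compaction(file_sizes_mb, target_mb=1024):
    """Group small files into bins of at most target_mb using first-fit decreasing.

    Each returned bin would be rewritten as one larger file.
    """
    bins = []  # each bin is [total_size, [member sizes]]
    for size in sorted(file_sizes_mb, reverse=True):
        for b in bins:
            if b[0] + size <= target_mb:
                b[0] += size
                b[1].append(size)
                break
        else:
            bins.append([size, [size]])
    return [sizes for _, sizes in bins]


small_files = [8] * 300 + [120, 64]  # e.g. hundreds of tiny files from a streaming job
plan = plan_compaction(small_files)
print(len(small_files), "files ->", len(plan), "files")  # 302 files -> 3 files
```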
DELTA LAKE WITH SPARK SQL
Delta Lake is an open source storage layer that brings ACID transactions to Apache Spark™ and big data workloads.
[Link] | Documentation | GitHub | Delta Lake on Databricks
CREATE AND QUERY DELTA TABLES
Create and use managed database
-- Managed database is saved in the Hive metastore. Default database is named "default".
DROP DATABASE IF EXISTS dbName;
CREATE DATABASE dbName;
USE dbName -- This command avoids having to specify [Link] every time instead of just tableName.
Query Delta Lake table by table name (preferred)
/* You can refer to Delta Tables by table name, or by path. Table name is the preferred way, since named tables
are managed in the Hive Metastore (i.e., when you DROP a named table, the data is dropped also; this is not the
case for path-based tables.) */
SELECT * FROM [dbName.] tableName
Query Delta Lake table by path
SELECT * FROM delta.`path/to/delta_table` -- note backticks
Convert Parquet table to Delta Lake format in place
-- by table name
CONVERT TO DELTA [dbName.]tableName
[PARTITIONED BY (col_name1 col_type1, col_name2 col_type2)]
-- path-based tables
CONVERT TO DELTA parquet.`/path/to/table` -- note backticks
[PARTITIONED BY (col_name1 col_type1, col_name2 col_type2)]
Create Delta Lake table as SELECT * with no upfront schema definition
CREATE TABLE [dbName.] tableName
USING DELTA
[LOCATION "/path/to/table"] -- using location = unmanaged table
AS SELECT * FROM tableName | parquet.`path/to/data`
Create table, define schema explicitly with SQL DDL
CREATE TABLE [dbName.] tableName (
  id INT [NOT NULL],
  name STRING,
  date DATE,
  int_rate FLOAT)
USING DELTA
[PARTITIONED BY (date)] -- optional
Copy new data into Delta Lake table (with idempotent retries)
COPY INTO [dbName.] targetTable
FROM "/path/to/table"
FILEFORMAT = DELTA -- or CSV, Parquet, ORC, JSON, etc.
DELTA LAKE DDL/DML: UPDATE, DELETE, MERGE, ALTER TABLE
Update rows that match a predicate condition
UPDATE tableName SET event = 'click' WHERE event = 'clk'
Delete rows that match a predicate condition
DELETE FROM tableName WHERE date < '2017-01-01'
Insert values directly into table
INSERT INTO TABLE tableName VALUES
  (8003, "Kim Jones", "2020-12-18", 3.875),
  (8004, "Tim Jones", "2020-12-20", 3.750);
-- Insert using SELECT statement
INSERT INTO tableName SELECT * FROM sourceTable
-- Atomically replace all data in table with new values
INSERT OVERWRITE loan_by_state_delta VALUES (...)
Upsert (update + insert) using MERGE
MERGE INTO target
USING updates
ON [Link] = [Link]
WHEN MATCHED AND target.delete_flag = "true" THEN
  DELETE
WHEN MATCHED THEN
  UPDATE SET * -- star notation means all columns
WHEN NOT MATCHED THEN
  INSERT (date, Id, data) -- or, use INSERT *
  VALUES (date, Id, data)
Insert with Deduplication using MERGE
MERGE INTO logs
USING newDedupedLogs
ON [Link] = [Link]
WHEN NOT MATCHED
  THEN INSERT *
Alter table schema: add columns
ALTER TABLE tableName ADD COLUMNS (
  col_name data_type
  [FIRST|AFTER colA_name])
Alter table: add constraint
-- Add "Not null" constraint:
ALTER TABLE tableName CHANGE COLUMN col_name SET NOT NULL
-- Add "Check" constraint:
ALTER TABLE tableName
ADD CONSTRAINT dateWithinRange CHECK (date > "1900-01-01")
-- Drop constraint:
ALTER TABLE tableName DROP CONSTRAINT dateWithinRange
TIME TRAVEL
View transaction log (aka Delta Log)
DESCRIBE HISTORY tableName
Query historical versions of Delta Lake tables
SELECT * FROM tableName VERSION AS OF 0
SELECT * FROM tableName@v0 -- equivalent to VERSION AS OF 0
SELECT * FROM tableName TIMESTAMP AS OF "2020-12-18"
Find changes between 2 versions of a table
SELECT * FROM tableName VERSION AS OF 12
EXCEPT ALL SELECT * FROM tableName VERSION AS OF 11
TIME TRAVEL (CONTINUED)
Rollback a table to an earlier version
-- RESTORE requires Delta Lake version 0.7.0+ & DBR 7.4+.
RESTORE tableName VERSION AS OF 0
RESTORE tableName TIMESTAMP AS OF "2020-12-18"
UTILITY METHODS
View table details
DESCRIBE DETAIL tableName
DESCRIBE FORMATTED tableName
Delete old files with Vacuum
VACUUM tableName [RETAIN num HOURS] [DRY RUN]
Clone a Delta Lake table
-- Deep clones copy data from source, shallow clones don't.
CREATE TABLE [dbName.] targetName
[SHALLOW | DEEP] CLONE sourceName [VERSION AS OF 0]
[LOCATION "path/to/table"] -- specify location only for path-based tables
Interoperability with Python / DataFrames
# Read name-based table from Hive metastore into DataFrame
df = [Link]("tableName")
# Read path-based table into DataFrame
df = [Link]("delta").load("/path/to/delta_table")
Run SQL queries from Python
[Link]("SELECT * FROM tableName")
[Link]("SELECT * FROM delta.`/path/to/delta_table`")
Modify data retention settings for Delta Lake table
-- logRetentionDuration -> how long transaction log history is kept, deletedFileRetentionDuration -> how long ago
-- a file must have been deleted before being a candidate for VACUUM.
ALTER TABLE tableName
SET TBLPROPERTIES(
  [Link] = "interval 30 days",
  [Link] = "interval 7 days"
);
SHOW TBLPROPERTIES tableName;
PERFORMANCE OPTIMIZATIONS
Compact data files with Optimize and Z-Order
*Databricks Delta Lake feature
OPTIMIZE tableName
[ZORDER BY (colNameA, colNameB)]
Auto-optimize tables
*Databricks Delta Lake feature
ALTER TABLE [table_name | delta.`path/to/delta_table`]
SET TBLPROPERTIES ([Link] = true)
Cache frequently queried data in Delta Cache
*Databricks Delta Lake feature
CACHE SELECT * FROM tableName
-- or:
CACHE SELECT colA, colB FROM tableName WHERE colNameA > 0
Provided to the open source community by Databricks
© Databricks 2021. All rights reserved. Apache, Apache Spark, Spark and the Spark logo are
trademarks of the Apache Software Foundation.
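Time travel, DESCRIBE HISTORY and RESTORE all rest on the transaction log: each version is a commit that adds and removes data files, and the table as of version N is the set of files produced by replaying commits 0 through N. A minimal model with made-up file names; real commits also carry metaData, protocol and commitInfo actions, which are ignored here, and `snapshot` is a hypothetical name.

```python
def snapshot(log, version):
    """Return the set of live data files after replaying commits 0..version."""
    files = set()
    for commit in log[: version + 1]:
        for action in commit:
            if "add" in action:
                files.add(action["add"])
            elif "remove" in action:
                files.discard(action["remove"])
    return files


log = [
    [{"add": "part-000.parquet"}],                                   # v0: initial write
    [{"add": "part-001.parquet"}],                                   # v1: append
    [{"remove": "part-000.parquet"}, {"remove": "part-001.parquet"},
     {"add": "part-002.parquet"}],                                   # v2: compaction rewrite
]
print(sorted(snapshot(log, 1)))  # ['part-000.parquet', 'part-001.parquet']
print(sorted(snapshot(log, 2)))  # ['part-002.parquet']
```

Reading VERSION AS OF 1 needs the files removed in version 2 to still exist on storage, which is why VACUUM's retention window limits how far back time travel can reach.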
DELTA LAKE WITH PYTHON
[Link] | Documentation | GitHub | API reference | Databricks
READS AND WRITES WITH DELTA LAKE
Read data from pandas DataFrame
df = [Link](pdf)
# where pdf is a pandas DF
# then save DataFrame in Delta Lake format as shown below
Read data using Apache Spark™
# read by path
df = ([Link]("parquet"|"csv"|"json"|etc.)
  .load("/path/to/delta_table"))
# read by table name
df = [Link]("events")
Save DataFrame in Delta Lake format
([Link]("delta")
  .mode("append"|"overwrite")
  .partitionBy("date") # optional
  .option("mergeSchema", "true") # optional: evolve schema
  .saveAsTable("events") | .save("/path/to/delta_table")
)
Streaming reads (Delta table as streaming source)
# by path or by table name
df = ([Link]
  .format("delta")
  .schema(schema)
  .table("events") | .load("/delta/events")
)
Streaming writes (Delta table as a sink)
([Link]("delta")
  .outputMode("append"|"update"|"complete")
  .option("checkpointLocation", "/path/to/checkpoints")
  .trigger(once=True|processingTime="10 seconds")
  .table("events") | .start("/delta/events")
)
CONVERT PARQUET TO DELTA LAKE
Convert Parquet table to Delta Lake format in place
from [Link] import *
deltaTable = [Link](spark,
  "parquet.`/path/to/parquet_table`")
partitionedDeltaTable = [Link](spark,
  "parquet.`/path/to/parquet_table`", "part int")
WORKING WITH DELTATABLES
# A DeltaTable is the entry point for interacting with tables programmatically in Python, for example,
# to perform updates or deletes.
from [Link] import *
deltaTable = [Link](spark, tableName)
deltaTable = [Link](spark, delta.`path/to/table`)
DELTA LAKE DDL/DML: UPDATES, DELETES, INSERTS, MERGES
Delete rows that match a predicate condition
# predicate using SQL formatted string
[Link]("date < '2017-01-01'")
# predicate using Spark SQL functions
[Link](col("date") < "2017-01-01")
Update rows that match a predicate condition
# predicate using SQL formatted string
[Link](condition = "eventType = 'clk'",
  set = { "eventType": "'click'" } )
# predicate using Spark SQL functions
[Link](condition = col("eventType") == "clk",
  set = { "eventType": lit("click") } )
Upsert (update + insert) using MERGE
# Available options for merges [see docs for details]:
# .whenMatchedUpdate(...) | .whenMatchedUpdateAll(...) |
# .whenNotMatchedInsert(...) | .whenMatchedDelete(...)
([Link]("target").merge(
    source = [Link]("updates"),
    condition = "[Link] = [Link]")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsert(
    values = {
      "date": "[Link]",
      "eventId": "[Link]",
      "data": "[Link]",
      "count": "1"
    }
  ).execute()
)
Insert with Deduplication using MERGE
([Link]("logs").merge(
    [Link]("newDedupedLogs"),
    "[Link] = [Link]")
  .whenNotMatchedInsertAll()
  .execute()
)
TIME TRAVEL
View transaction log (aka Delta Log)
fullHistoryDF = [Link]()
Query historical versions of Delta Lake tables
# choose only one option: versionAsOf, or timestampAsOf
df = ([Link]("delta")
  .option("versionAsOf", 0)
  .option("timestampAsOf", "2020-12-18")
  .load("/path/to/delta_table"))
TIME TRAVEL (CONTINUED)
Find changes between 2 versions of a table
df1 = [Link]("delta").load(pathToTable)
df2 = [Link]("delta").option("versionAsOf", 2).load("/path/to/delta_table")
[Link](df2).show()
Rollback a table by version or timestamp
[Link](0)
[Link]('2020-12-01')
UTILITY METHODS
Run Spark SQL queries in Python
[Link]("SELECT * FROM tableName")
[Link]("SELECT * FROM delta.`/path/to/delta_table`")
[Link]("DESCRIBE HISTORY tableName")
Compact old files with Vacuum
[Link]() # vacuum files older than default retention period (7 days)
[Link](100) # vacuum files not required by versions more than 100 hours old
Clone a Delta Lake table
[Link](target="/path/to/delta_table/", isShallow=True, replace=True)
Get DataFrame representation of a Delta Lake table
df = [Link]()
Run SQL queries on Delta Lake tables
[Link]("SELECT * FROM tableName")
[Link]("SELECT * FROM delta.`/path/to/delta_table`")
PERFORMANCE OPTIMIZATIONS
Compact data files with Optimize and Z-Order
*Databricks Delta Lake feature
[Link]("OPTIMIZE tableName [ZORDER BY (colA, colB)]")
Auto-optimize tables
*Databricks Delta Lake feature. For existing tables:
[Link]("ALTER TABLE [table_name | delta.`path/to/delta_table`] SET TBLPROPERTIES ([Link] = true)")
To enable auto-optimize for all new Delta Lake tables:
[Link]("SET [Link].[Link] = true")
Cache frequently queried data in Delta Cache
*Databricks Delta Lake feature
[Link]("CACHE SELECT * FROM tableName")
# or:
[Link]("CACHE SELECT colA, colB FROM tableName WHERE colNameA > 0")
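The Vacuum calls above (default 7-day retention, or an explicit 100 hours) delete data files that the current table version no longer references and that were removed longer ago than the retention threshold. A sketch of that candidate selection in plain Python, assuming each removed file is tracked with its removal timestamp (file names and times are illustrative; `vacuum_candidates` is a hypothetical name):

```python
from datetime import datetime, timedelta


def vacuum_candidates(live_files, removed_at, now, retention_hours=7 * 24):
    """Files not referenced by the current version and removed before the cutoff."""
    cutoff = now - timedelta(hours=retention_hours)
    return sorted(
        path for path, ts in removed_at.items()
        if path not in live_files and ts < cutoff
    )


now = datetime(2020, 12, 18, 12, 0)
removed_at = {
    "part-000.parquet": datetime(2020, 12, 1, 9, 0),   # removed about 17 days earlier
    "part-001.parquet": datetime(2020, 12, 15, 9, 0),  # removed about 3 days earlier
}
live = {"part-002.parquet"}
print(vacuum_candidates(live, removed_at, now))                      # ['part-000.parquet']
print(vacuum_candidates(live, removed_at, now, retention_hours=24))  # ['part-000.parquet', 'part-001.parquet']
```

A shorter retention frees storage sooner but also removes the files that older versions need, so time travel to those versions stops working.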