Caching
INTRODUCTION TO SPARK SQL IN PYTHON
Mark Plutowski
Data Scientist
What is caching?
Keeping data in memory
Spark tends to unload memory aggressively
Eviction Policy
Least Recently Used (LRU)
Eviction happens independently on each worker
Depends on memory available to each worker
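The LRU policy above can be illustrated with a toy, single-process cache. This is a sketch of the policy only, not Spark's storage code. The `LRUCache` class and the partition keys are hypothetical. Each simulated worker holds its own cache with its own capacity, which is why eviction decisions are independent per worker.

```python
from collections import OrderedDict

class LRUCache:
    """Toy LRU cache (hypothetical): when full, evict the least recently used entry."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._items = OrderedDict()  # least recently used entry first

    def get(self, key):
        if key not in self._items:
            return None
        self._items.move_to_end(key)  # mark as most recently used
        return self._items[key]

    def put(self, key, value):
        """Store value; return the evicted key, or None if nothing was evicted."""
        if key in self._items:
            self._items.move_to_end(key)
        self._items[key] = value
        if len(self._items) > self.capacity:
            evicted, _ = self._items.popitem(last=False)  # drop the LRU entry
            return evicted
        return None

# One cache per worker: capacity models the memory available to that worker.
worker_a = LRUCache(capacity=2)
worker_a.put("p0", "partition 0")
worker_a.put("p1", "partition 1")
worker_a.get("p0")                         # p0 becomes most recently used
evicted = worker_a.put("p2", "partition 2")
print(evicted)                             # p1
```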
Caching a dataframe
TO CACHE A DATAFRAME:
df.cache()
TO UNCACHE IT:
df.unpersist()
Determining whether a dataframe is cached
df.is_cached
False
df.cache()
df.is_cached
True
Uncaching a dataframe
df.unpersist()
df.is_cached
False
Storage level
df.unpersist()
df.cache()
df.storageLevel
StorageLevel(True, True, False, True, 1)
In the storage level above the following hold:
1. useDisk = True
2. useMemory = True
3. useOffHeap = False
4. deserialized = True
5. replication = 1
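PySpark's `pyspark.StorageLevel` takes these five values positionally in exactly this order: `useDisk, useMemory, useOffHeap, deserialized, replication`. The stdlib stand-in below mirrors that order so the repr above can be decoded field by field. `StorageLevelFields` is a hypothetical name and is not the PySpark class.

```python
from collections import namedtuple

# Hypothetical stand-in mirroring the positional order of pyspark.StorageLevel;
# it is NOT the PySpark class, just a way to label the five values.
StorageLevelFields = namedtuple(
    "StorageLevelFields",
    ["useDisk", "useMemory", "useOffHeap", "deserialized", "replication"],
)

level = StorageLevelFields(True, True, False, True, 1)  # values from df.storageLevel above
for name, value in level._asdict().items():
    print(f"{name} = {value}")
```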
Persisting a dataframe
The following are equivalent in Spark 2.1+:
df.persist()
df.persist(storageLevel=pyspark.StorageLevel.MEMORY_AND_DISK)
df.cache() is the same as df.persist()
Caching a table
df.createOrReplaceTempView('df')
spark.catalog.isCached(tableName='df')
False
spark.catalog.cacheTable('df')
spark.catalog.isCached(tableName='df')
True
Uncaching a table
spark.catalog.uncacheTable('df')
spark.catalog.isCached(tableName='df')
False
spark.catalog.clearCache()  # removes all cached tables
Tips
Caching is lazy
Only cache if more than one operation is to be performed
Unpersist when you no longer need the object
Cache selectively
The Spark UI
Use the Spark UI to inspect execution
Spark task: a unit of execution that runs on a single CPU
Spark stage: a group of tasks that perform the same computation in parallel, each task typically running on a different subset of the data
Spark job: a computation triggered by an action, sliced into one or more stages
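The three terms above can be illustrated with a single-machine analogy in plain Python, with no Spark involved. In this analogy, each partition gets one "task" running the same function, the parallel map over all partitions plays the role of a "stage", and combining the results plays the role of the action that triggers the "job". All function names are hypothetical, and this is not how Spark schedules work internally.

```python
from concurrent.futures import ThreadPoolExecutor

def split_into_partitions(data, num_partitions):
    # Hypothetical helper: round-robin split of a list into partitions.
    return [data[i::num_partitions] for i in range(num_partitions)]

def count_task(partition):
    # One "task": the same computation applied to one subset of the data.
    return len(partition)

def run_count_job(data, num_partitions=4):
    partitions = split_into_partitions(data, num_partitions)
    # One "stage": run the same task on every partition in parallel.
    with ThreadPoolExecutor(max_workers=num_partitions) as pool:
        partial_counts = list(pool.map(count_task, partitions))
    # The "action": combine partial results and hand them back to the caller.
    return sum(partial_counts), partial_counts

total, per_partition = run_count_job(list(range(10)), num_partitions=4)
print(total, per_partition)  # 10 [3, 3, 2, 2]
```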
Finding the Spark UI
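The UI is served by the driver, on port 4040 by default; if that port is taken (for example, by another running SparkContext), Spark tries the next port up:
1. http://<driver-host>:4040
2. http://<driver-host>:4041
3. http://<driver-host>:4042
4. http://<driver-host>:4043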
...
Spark catalog operations
spark.catalog.cacheTable('table1')
spark.catalog.uncacheTable('table1')
spark.catalog.isCached('table1')
spark.catalog.dropTempView('table1')
Spark Catalog
spark.catalog.listTables()
[Table(name='text', database=None, description=None, tableType='TEMPORARY', isTemporary=
Spark UI Storage Tab
Shows where data partitions exist (in memory or on disk) across the cluster, at a snapshot in time.
Spark UI SQL tab
query3agg = """
SELECT w1, w2, w3, COUNT(*) as count FROM (
SELECT
word AS w1,
LEAD(word,1) OVER(PARTITION BY part ORDER BY id ) AS w2,
LEAD(word,2) OVER(PARTITION BY part ORDER BY id ) AS w3
FROM df
)
GROUP BY w1, w2, w3
ORDER BY count DESC
"""
spark.sql(query3agg).show()
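To make the window logic in `query3agg` concrete, here is a plain-Python sketch of the same computation on a small in-memory list. It sorts rows by `part`, then `id` (`PARTITION BY part ORDER BY id`). It takes the next two words in the same partition (`LEAD(word, 1)` and `LEAD(word, 2)`, which become `None` past the end of the partition, as SQL `NULL` would). It then counts each `(w1, w2, w3)` triple and sorts by count, descending. The `(part, id, word)` tuple layout and the sample rows are assumptions made for illustration. This does not use Spark.

```python
from collections import Counter
from itertools import groupby

def trigram_counts(rows):
    """rows: iterable of (part, id, word) tuples (hypothetical layout of df)."""
    counts = Counter()
    # PARTITION BY part ORDER BY id
    ordered = sorted(rows, key=lambda r: (r[0], r[1]))
    for _, group in groupby(ordered, key=lambda r: r[0]):
        words = [word for _, _, word in group]
        for i, w1 in enumerate(words):
            # LEAD(word, k) is NULL (None here) past the end of the partition.
            w2 = words[i + 1] if i + 1 < len(words) else None
            w3 = words[i + 2] if i + 2 < len(words) else None
            counts[(w1, w2, w3)] += 1
    # GROUP BY w1, w2, w3 ... ORDER BY count DESC
    return counts.most_common()

rows = [(0, i, w) for i, w in enumerate("a b c a b c".split())]
rows += [(1, 0, "a"), (1, 1, "b")]  # a second partition: LEAD does not cross into it
result = trigram_counts(rows)
print(result[0])  # (('a', 'b', 'c'), 2)
```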
Logging
Logging primer
import logging
import sys
logging.basicConfig(stream=sys.stdout, level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logging.info("Hello %s", "world")
logging.debug("Hello, take %d", 2)
2019-03-14 [Link],359 - INFO - Hello world
Logging with DEBUG level
import logging
import sys
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logging.info("Hello %s", "world")
logging.debug("Hello, take %d", 2)
2018-03-14 [Link],000 - INFO - Hello world
2018-03-14 [Link],001 - DEBUG - Hello, take 2
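The two slides differ only in the `level` threshold. The sketch below runs the same two calls at both thresholds and captures what gets emitted. It uses a dedicated logger writing to an in-memory stream, not `logging.basicConfig`, because `basicConfig` configures the root logger only once per process. The `run_demo` helper and the `demo.*` logger names are hypothetical. Timestamps are left out of the format so the output is reproducible.

```python
import io
import logging

def run_demo(level):
    # Dedicated logger + in-memory stream: independent of the root logger.
    stream = io.StringIO()
    logger = logging.getLogger("demo.%s" % logging.getLevelName(level))
    logger.setLevel(level)          # records below this level are dropped
    logger.propagate = False        # do not also send records to the root logger
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter('%(levelname)s - %(message)s'))
    logger.handlers = [handler]
    logger.info("Hello %s", "world")
    logger.debug("Hello, take %d", 2)
    return stream.getvalue().splitlines()

print(run_demo(logging.INFO))   # ['INFO - Hello world']
print(run_demo(logging.DEBUG))  # ['INFO - Hello world', 'DEBUG - Hello, take 2']
```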
Debugging lazy evaluation
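lazy evaluation: transformations run only when an action triggers them, so errors and costs can surface far from the line that caused them
distributed execution: the work runs as tasks on the workers, not in the driver's Python process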
A simple timer
t = timer()
t.elapsed()
1. elapsed: 0.0 sec
time.sleep(2)  # Do something that takes 2 seconds
t.elapsed()
2. elapsed: 2.0 sec
# Do something else that takes time, then reset
t.reset()
t.elapsed()
3. elapsed: 0.0 sec
class timer
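import time

class timer:
    def __init__(self):
        # Set per instance: a class attribute would be evaluated only once,
        # when the class is defined, not when each timer is created.
        self.start_time = time.time()
        self.step = 0
    def elapsed(self, reset=True):
        self.step += 1
        print("%d. elapsed: %.1f sec"
              % (self.step, time.time() - self.start_time))
        if reset:
            self.reset()
    def reset(self):
        self.start_time = time.time()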
Stealth CPU wastage
import logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
# < create dataframe df here >
t = timer()
logging.info("No action here.")
t.elapsed()
logging.debug("df has %d rows.", df.count())
t.elapsed()
2018-12-23 [Link],472 - INFO - No action here.
1. elapsed: 0.0 sec
2. elapsed: 2.0 sec
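The 2.0 seconds at step 2 is spent even though the DEBUG message is never printed. Python evaluates `df.count()` before `logging.debug` is even entered, and the logging module only defers the `%`-formatting, not the evaluation of the arguments. The stdlib sketch below shows this with a call counter. `expensive_count` is a hypothetical stand-in for `df.count()`, and the returned row count is just the figure quoted later in this section.

```python
import logging

calls = {"count": 0}

def expensive_count():
    # Hypothetical stand-in for df.count(): an expensive call we can observe.
    calls["count"] += 1
    return 1107014

logger = logging.getLogger("stealth_demo")
logger.setLevel(logging.INFO)  # DEBUG records will be discarded

# Nothing is logged, but expensive_count() has already run by the time
# logger.debug decides to drop the record.
logger.debug("df has %d rows.", expensive_count())
print(calls["count"])  # 1
```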
Disable actions
ENABLED = False
t = timer()
logging.info("No action here.")
t.elapsed()
if ENABLED:
    logging.info("df has %d rows.", df.count())
t.elapsed()
2019-03-14 [Link],789 - Pyspark - INFO - No action here.
1. elapsed: 0.0 sec
2. elapsed: 0.0 sec
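The `ENABLED` switch works because the expensive argument is never evaluated when the `if` is false. The standard library also offers `Logger.isEnabledFor(level)`, which ties the guard to the logger's configured level instead of a separate flag. Using it here is an alternative to the slide's approach, not part of the original. `expensive_count` is again a hypothetical stand-in for `df.count()`.

```python
import logging

calls = {"count": 0}

def expensive_count():
    # Hypothetical stand-in for an expensive Spark action such as df.count().
    calls["count"] += 1
    return 1107014

logger = logging.getLogger("guard_demo")
logger.setLevel(logging.INFO)

# Option 1: an explicit switch, as on the slide.
ENABLED = False
if ENABLED:
    logger.info("df has %d rows.", expensive_count())

# Option 2: only evaluate the argument if DEBUG output would actually be emitted.
if logger.isEnabledFor(logging.DEBUG):
    logger.debug("df has %d rows.", expensive_count())

print(calls["count"])  # 0: the expensive call never ran
```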
Enabling actions
Rerunning the previous example with ENABLED = True triggers the action:
2019-03-14 [Link],789 - INFO - No action here.
1. elapsed: 0.0 sec
2019-03-14 [Link],789 - INFO - df has 1107014 rows.
2. elapsed: 2.0 sec
Query Plans
Explain
EXPLAIN SELECT * FROM table1
Load dataframe and register
df = spark.read.load('/temp/[Link]')
df.createOrReplaceTempView('df')
Running an EXPLAIN query
spark.sql('EXPLAIN SELECT * FROM df').first()
Row(plan='== Physical Plan ==\n
*FileScan parquet [word#1928,id#1929L,title#1930,part#1931]
Batched: true,
Format: Parquet,
Location: InMemoryFileIndex[file:/temp/[Link]],
PartitionFilters: [],
PushedFilters: [],
ReadSchema: struct<word:string,id:bigint,title:string,part:int>')
Interpreting an EXPLAIN query
== Physical Plan ==
FileScan parquet [word#1928,id#1929L,title#1930,part#1931]
Batched: true,
Format: Parquet,
Location: InMemoryFileIndex[file:/temp/[Link]],
PartitionFilters: [],
PushedFilters: [],
ReadSchema: struct<word:string,id:bigint,title:string,part:int>
df.explain()
df.explain()
== Physical Plan ==
FileScan parquet [word#963,id#964L,title#965,part#966]
Batched: true, Format: Parquet,
Location: InMemoryFileIndex[file:/temp/[Link]],
PartitionFilters: [], PushedFilters: [],
ReadSchema: struct<word:string,id:bigint,title:string,part:int>
spark.sql("SELECT * FROM df").explain()
== Physical Plan ==
FileScan parquet [word#712,id#713L,title#714,part#715]
Batched: true, Format: Parquet,
Location: InMemoryFileIndex[file:/temp/[Link]],
PartitionFilters: [], PushedFilters: [],
ReadSchema: struct<word:string,id:bigint,title:string,part:int>
df.explain(), on cached dataframe
df.cache()
df.explain()
== Physical Plan ==
InMemoryTableScan [word#0, id#1L, title#2, part#3]
+- InMemoryRelation [word#0, id#1L, title#2, part#3], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- FileScan parquet [word#0,id#1L,title#2,part#3]
Batched: true, Format: Parquet, Location:
InMemoryFileIndex[file:/temp/[Link]],
PartitionFilters: [], PushedFilters: [],
ReadSchema: struct<word:string,id:bigint,title:string,part:int>
spark.sql("SELECT * FROM df").explain()
== Physical Plan ==
InMemoryTableScan [word#0, id#1L, title#2, part#3]
+- InMemoryRelation [word#0, id#1L, title#2, part#3], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- FileScan parquet [word#0,id#1L,title#2,part#3]
Batched: true, Format: Parquet,
Location: InMemoryFileIndex[file:/temp/[Link]],
PartitionFilters: [], PushedFilters: [],
ReadSchema: struct<word:string,id:bigint,title:string,part:int>
Words sorted by frequency query
SELECT word, COUNT(*) AS count
FROM df
GROUP BY word
ORDER BY count DESC
Equivalent dot notation approach:
from pyspark.sql.functions import desc
(df.groupBy('word')
   .count()
   .sort(desc('count'))
   .explain())
Same query using dataframe dot notation
== Physical Plan ==
*Sort [count#1040L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(count#1040L DESC NULLS LAST, 200)
+- *HashAggregate(keys=[word#963], functions=[count(1)])
+- Exchange hashpartitioning(word#963, 200)
+- *HashAggregate(keys=[word#963], functions=[partial_count(1)])
+- InMemoryTableScan [word#963]
+- InMemoryRelation [word#963, id#964L, title#965, part#966],
true,10000, StorageLevel(disk, memory, deserialized,
1 replicas)
+- *FileScan parquet [word#963,id#964L,title#965,part#966]
Batched: true, Format: Parquet,
Location: InMemoryFileIndex[file:/temp/[Link]],
PartitionFilters: [], PushedFilters: [],
ReadSchema: struct<word:string,id:bigint,title:string,part:int>
Reading from bottom up
FileScan parquet
InMemoryRelation
InMemoryTableScan
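HashAggregate(keys=[word#963], functions=[partial_count(1)])
Exchange hashpartitioning(word#963, 200)
HashAggregate(keys=[word#963], functions=[count(1)])
Exchange rangepartitioning(count#1040L DESC NULLS LAST, 200)
Sort [count#1040L DESC NULLS LAST]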
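To practice reading a plan from the leaves up, a small helper can pull the operator names out of `explain()` text and reverse them. This is a hypothetical sketch. It assumes a single-chain plan with one line per node, as in the condensed copy of the dot-notation plan below. Wrapped continuation lines (such as `Batched: ...`) and branching plans such as joins would need real tree parsing.

```python
import re

# Condensed copy of the dot-notation plan above: one line per node.
PLAN = """\
== Physical Plan ==
*Sort [count#1040L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(count#1040L DESC NULLS LAST, 200)
   +- *HashAggregate(keys=[word#963], functions=[count(1)])
      +- Exchange hashpartitioning(word#963, 200)
         +- *HashAggregate(keys=[word#963], functions=[partial_count(1)])
            +- InMemoryTableScan [word#963]
                  +- InMemoryRelation [word#963, id#964L, title#965, part#966], true, 10000
                        +- *FileScan parquet [word#963,id#964L,title#965,part#966]
"""

# Optional indentation, optional "+- " tree marker, optional "*" prefix,
# then the operator name (which starts with an uppercase letter).
NODE = re.compile(r"^\s*(?:\+-\s*)?\*?([A-Z][A-Za-z]*)")

def operators_bottom_up(plan_text):
    """Operator names of a single-chain physical plan, leaf first."""
    names = [m.group(1) for m in map(NODE.match, plan_text.splitlines()) if m]
    return names[::-1]

print(operators_bottom_up(PLAN))
# ['FileScan', 'InMemoryRelation', 'InMemoryTableScan', 'HashAggregate',
#  'Exchange', 'HashAggregate', 'Exchange', 'Sort']
```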
Query plan
== Physical Plan ==
*Sort [count#1160L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(count#1160L DESC NULLS LAST, 200)
+- *HashAggregate(keys=[word#963], functions=[count(1)])
+- Exchange hashpartitioning(word#963, 200)
+- *HashAggregate(keys=[word#963], functions=[partial_count(1)])
+- *FileScan parquet [word#963] Batched: true, Format: Parquet,
Location: InMemoryFileIndex[file:/temp/[Link]], PartitionFilters: [],
PushedFilters: [], ReadSchema: struct<word:string>
The previous plan had the following lines, which are missing from the plan above:
...
+- InMemoryTableScan [word#963]
+- InMemoryRelation [word#963, id#964L, title#965, part#966], true, 10000,
StorageLevel(disk, memory, deserialized, 1 replicas)
...
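Spotting what changed between two plans can be mechanized by diffing their operator names. The sketch below hand-copies the operator names, top-down, from the cached-dataframe plan and from the plan above, then uses `difflib.ndiff` to list the operators that disappeared. The variable names and the `missing_operators` helper are hypothetical.

```python
import difflib

# Operator names read top-down off the two physical plans (hand-copied).
cached_plan_ops = ["Sort", "Exchange", "HashAggregate", "Exchange", "HashAggregate",
                   "InMemoryTableScan", "InMemoryRelation", "FileScan"]
uncached_plan_ops = ["Sort", "Exchange", "HashAggregate", "Exchange", "HashAggregate",
                     "FileScan"]

def missing_operators(before, after):
    """Operators present in `before` but absent from `after`, in plan order."""
    return [line[2:] for line in difflib.ndiff(before, after) if line.startswith("- ")]

print(missing_operators(cached_plan_ops, uncached_plan_ops))
# ['InMemoryTableScan', 'InMemoryRelation']
```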