Cleaning Data with PySpark Notes
PySpark is lazy by default: transformations are only recorded when you call them, and they run when an action method (such as .count() or .show()) is executed.
Use this method to get the number of partitions:
df.rdd.getNumPartitions()
To optimize performance, cache the DataFrame with the .cache() method:
df.cache()
To check whether the DataFrame is cached, read the is_cached attribute (it is a property, so no parentheses):
df.is_cached
To remove it from the cache, use the .unpersist() method:
df.unpersist()
To read a configuration setting, call spark.conf.get() with the name of the setting as the argument. To write a configuration setting, call spark.conf.set() with the name of the setting and the actual value as the function arguments.
Cluster types
Spark deployments can vary depending on the exact needs of the users. One large component of a deployment is the cluster management mechanism. Spark clusters can be:
- Single node clusters, deploying all components on a single system (physical / VM / container).
- Standalone clusters, with dedicated machines as the driver and workers.
- Managed clusters, meaning that the cluster components are handled by a third-party cluster manager such as YARN, Mesos, or Kubernetes.
Reading Spark configurations
Name of the Spark application instance:
app_name = spark.conf.get('spark.app.name')
Driver TCP port:
driver_tcp_port = spark.conf.get('spark.driver.port')
Number of partitions used when shuffling data for joins and aggregations:
num_partitions = spark.conf.get('spark.sql.shuffle.partitions')
Explaining execution plan
To understand performance implications of Spark, you must be able to see what it's doing under the hood. The easiest way to do this is to use the DataFrame's .explain() method:
df.explain()
How to limit shuffling?
It can be tricky to remove shuffling operations entirely, but a few things can limit them. In its simplest form, the DataFrame .repartition() function takes a single argument, the number of partitions requested. We used this in an earlier chapter to illustrate the effect of partitions with the monotonically_increasing_id() function. Repartitioning requires a full shuffle of data between nodes and processes, and is quite costly. If you need to reduce the number of partitions, use the .coalesce() function instead. It takes a number of partitions smaller than the current one and consolidates the data without requiring a full data shuffle. Note: calling .coalesce() with a larger number of partitions has no effect; the DataFrame keeps its current partition count.
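The .join() function is a great use of Spark and provides a lot of power. However, calling .join() indiscriminately often causes shuffle operations, leading to increased cluster load and slower processing times. To avoid some of these shuffles when joining Spark DataFrames, wrap the smaller DataFrame in the broadcast() function from pyspark.sql.functions. Spark then sends a full copy of that DataFrame to every worker, so the larger DataFrame does not have to be shuffled for the join.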
Ex (the join column 'id' is a placeholder for whatever key the two DataFrames share):
from pyspark.sql.functions import broadcast
joined_df = df1.join(broadcast(df2), on='id')
Examples
Import the data to a DataFrame
departures_df = spark.read.csv('2015-departures.csv.gz', header=True)
Remove any duration of 0
departures_df = departures_df.filter(departures_df['duration'] > 0)
Add an ID column
from pyspark.sql import functions as F
departures_df = departures_df.withColumn('id', F.monotonically_increasing_id())
Write the file out to JSON format
departures_df.write.json('output.json', mode='overwrite')