
PySpark Cheat Sheet: Spark in Python

This PySpark cheat sheet with code samples covers the basics like initializing Spark in Python, loading data, sorting, and repartitioning.
Jul 2021 · 6 min read

Apache Spark is generally known as a fast, general-purpose, open-source engine for big data processing, with built-in modules for streaming, SQL, machine learning, and graph processing. It can speed up analytic applications by up to 100 times compared to other technologies on the market today. You can interface Spark with Python through "PySpark", the Spark Python API that exposes the Spark programming model to Python.

Even though working with Spark will remind you in many ways of working with Pandas DataFrames, you'll also see that it can be tough to get familiar with all the functions you can use to query, transform, and inspect your data. What's more, if you've never worked with any other programming language or if you're new to the field, it might be hard to distinguish between RDD operations.

Let's face it, map() and flatMap() are different enough, but it can still be a challenge to decide which one you really need when you're faced with them in your analysis. And what about other functions, like reduce() and reduceByKey()?


Even though the documentation is very elaborate, it never hurts to have a cheat sheet by your side, especially when you're just getting into it.

This PySpark cheat sheet covers the basics, from initializing Spark and loading your data, to retrieving RDD information, sorting, filtering and sampling your data. But that's not all. You'll also see that topics such as repartitioning, iterating, merging, saving your data and stopping the SparkContext are included in the cheat sheet. 

Note that the examples in the document use small data sets to illustrate the effect of specific functions on your data. In real-life data analysis, you'll be using Spark to analyze big data.

Are you hungry for more? Don't miss our other Python cheat sheets for data science that cover topics such as Python basics, NumPy, Pandas, Pandas Data Wrangling, and much more!

PySpark is the Spark Python API that exposes the Spark programming model to Python.

Initializing Spark 

SparkContext 

>>> from pyspark import SparkContext
>>> sc = SparkContext(master = 'local[2]')

Inspect SparkContext 

>>> sc.version #Retrieve SparkContext version
>>> sc.pythonVer #Retrieve Python version
>>> sc.master #Master URL to connect to
>>> str(sc.sparkHome) #Path where Spark is installed on worker nodes
>>> str(sc.sparkUser()) #Retrieve name of the Spark User running SparkContext
>>> sc.appName #Return application name
>>> sc.applicationId #Retrieve application ID
>>> sc.defaultParallelism #Return default level of parallelism
>>> sc.defaultMinPartitions #Default minimum number of partitions for RDDs

Configuration 

>>> from pyspark import SparkConf, SparkContext
>>> conf = (SparkConf()
...         .setMaster("local")
...         .setAppName("My app")
...         .set("spark.executor.memory", "1g"))
>>> sc = SparkContext(conf = conf)

Using the Shell 

In the PySpark shell, a special interpreter-aware SparkContext is already created in the variable called sc.

$ ./bin/spark-shell --master local[2]
$ ./bin/pyspark --master local[4] --py-files code.py

Set which master the context connects to with the --master argument, and add Python .zip, .egg, or .py files to the runtime path by passing a comma-separated list to --py-files.

Loading Data 

Parallelized Collections 

>>> rdd = sc.parallelize([('a',7),('a',2),('b',2)])
>>> rdd2 = sc.parallelize([('a',2),('d',1),('b',1)])
>>> rdd3 = sc.parallelize(range(100))
>>> rdd4 = sc.parallelize([("a",["x","y","z"]),
...                        ("b",["p","r"])])

External Data 

Read either one text file from HDFS, a local file system or any Hadoop-supported file system URI with textFile(), or read in a directory of text files with wholeTextFiles(). 

>>> textFile = sc.textFile("/my/directory/*.txt")
>>> textFile2 = sc.wholeTextFiles("/my/directory")

Retrieving RDD Information 

Basic Information 

>>> rdd.getNumPartitions() #List the number of partitions
>>> rdd.count() #Count RDD instances
3
>>> rdd.countByKey() #Count RDD instances by key
defaultdict(<class 'int'>,{'a':2,'b':1})
>>> rdd.countByValue() #Count RDD instances by value
defaultdict(<class 'int'>,{('b',2):1,('a',2):1,('a',7):1})
>>> rdd.collectAsMap() #Return (key,value) pairs as a dictionary
{'a': 2, 'b': 2}
>>> rdd3.sum() #Sum of RDD elements
4950
>>> sc.parallelize([]).isEmpty() #Check whether RDD is empty
True
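As a rough cross-check of the outputs above, the same counts can be reproduced without Spark. This is a plain-Python sketch on ordinary lists, not the RDD API; the variable names are illustrative assumptions.

```python
from collections import Counter

# The same sample data as rdd and rdd3, held in ordinary Python objects
pairs = [('a', 7), ('a', 2), ('b', 2)]
numbers = range(100)

count = len(pairs)                               # analogous to count()
count_by_key = Counter(key for key, _ in pairs)  # analogous to countByKey()
count_by_value = Counter(pairs)                  # analogous to countByValue()
as_map = dict(pairs)                             # analogous to collectAsMap()
total = sum(numbers)                             # analogous to sum()

print(count, dict(count_by_key), as_map, total)
```

Building a dictionary from the pairs keeps only the last value seen for each key, which is why 'a' maps to 2 rather than 7.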

Summary 

>>> rdd3.max() #Maximum value of RDD elements
99
>>> rdd3.min() #Minimum value of RDD elements
0
>>> rdd3.mean() #Mean value of RDD elements
49.5
>>> rdd3.stdev() #Standard deviation of RDD elements
28.866070047722118
>>> rdd3.variance() #Compute variance of RDD elements
833.25
>>> rdd3.histogram(3) #Compute histogram by bins
([0,33,66,99],[33,33,34])
>>> rdd3.stats() #Summary statistics (count, mean, stdev, max & min)
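The figures above are population statistics (dividing by n rather than n - 1), and the histogram uses evenly spaced buckets whose last bucket includes the maximum. A minimal sketch with the standard library, assuming the same values as rdd3, illustrates both points; the histogram helper is a hypothetical stand-in, not Spark's implementation.

```python
import statistics

values = list(range(100))  # same elements as rdd3

mean = statistics.mean(values)
variance = statistics.pvariance(values)  # population variance
stdev = statistics.pstdev(values)        # population standard deviation

def histogram(data, buckets):
    """Count elements in evenly spaced buckets between min and max."""
    lo, hi = min(data), max(data)
    width = (hi - lo) / buckets
    edges = [lo + i * width for i in range(buckets + 1)]
    counts = [0] * buckets
    for v in data:
        # Clamp the index so the maximum lands in the last bucket
        idx = min(int((v - lo) / width), buckets - 1)
        counts[idx] += 1
    return edges, counts

edges, counts = histogram(values, 3)
print(mean, variance, stdev, edges, counts)
```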

Applying Functions 

>>> #Apply a function to each RDD element
>>> rdd.map(lambda x: x+(x[1],x[0])).collect()
[('a',7,7,'a'),('a',2,2,'a'),('b',2,2,'b')]
>>> #Apply a function to each RDD element and flatten the result
>>> rdd5 = rdd.flatMap(lambda x: x+(x[1],x[0]))
>>> rdd5.collect()
['a',7,7,'a','a',2,2,'a','b',2,2,'b']
>>> #Apply a flatMap function to each (key,value) pair of rdd4 without changing the keys
>>> rdd4.flatMapValues(lambda x: x).collect()
[('a','x'),('a','y'),('a','z'),('b','p'),('b','r')]
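The difference between the three calls is easier to see on ordinary lists: map yields one result per element, flatMap concatenates the items of each result, and flatMapValues flattens only the values while repeating the key. The sketch below models that behaviour in plain Python; it is not Spark code, and the names are illustrative assumptions.

```python
from itertools import chain

# Sample data matching the flat and nested pairs used in this cheat sheet
pairs = [('a', 7), ('a', 2), ('b', 2)]
nested = [('a', ['x', 'y', 'z']), ('b', ['p', 'r'])]

def extend(x):
    """Append the swapped pair to the original tuple."""
    return x + (x[1], x[0])

# map: exactly one output element per input element
mapped = [extend(x) for x in pairs]

# flatMap: iterate over each result and concatenate the items
flat_mapped = list(chain.from_iterable(extend(x) for x in pairs))

# flatMapValues: flatten the values only, repeating the key for each item
flat_values = [(key, item) for key, items in nested for item in items]

print(mapped)
print(flat_mapped)
print(flat_values)
```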

Selecting Data

Getting

>>> rdd.collect() #Return a list with all RDD elements
[('a', 7), ('a', 2), ('b', 2)]
>>> rdd.take(2) #Take first 2 RDD elements
[('a', 7), ('a', 2)]
>>> rdd.first() #Take first RDD element
('a', 7)
>>> rdd.top(2) #Take top 2 RDD elements
[('b', 2), ('a', 7)]
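Note that take() follows the existing element order while top() ranks elements in descending order, which for tuples means comparing the key first and the value second. A small plain-Python sketch of that distinction, using list slicing and heapq as stand-ins for the RDD calls:

```python
import heapq

pairs = [('a', 7), ('a', 2), ('b', 2)]

first_two = pairs[:2]               # analogous to take(2): keeps input order
first = pairs[0]                    # analogous to first()
top_two = heapq.nlargest(2, pairs)  # analogous to top(2): largest tuples first

print(first_two, first, top_two)
```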

Sampling

>>> rdd3.sample(False, 0.15, 81).collect() #Return sampled subset of rdd3
[3,4,27,31,40,41,42,43,60,76,79,80,86,97]

Filtering

>>> rdd.filter(lambda x: "a" in x).collect() #Filter the RDD
[('a',7),('a',2)]
>>> rdd5.distinct().collect() #Return distinct RDD values
['a',2,'b',7]
>>> rdd.keys().collect() #Return (key,value) RDD's keys
['a','a','b']
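In the filter above, "a" in x is a membership test on the whole tuple, so a pair is kept if either its key or its value equals "a". The plain-Python sketch below mirrors the three calls on ordinary lists; the flattened list is assumed to hold the same elements as rdd5, and the order of the distinct values may differ from what Spark returns.

```python
pairs = [('a', 7), ('a', 2), ('b', 2)]
flattened = ['a', 7, 7, 'a', 'a', 2, 2, 'a', 'b', 2, 2, 'b']

filtered = [x for x in pairs if "a" in x]  # analogous to filter()
distinct = list(dict.fromkeys(flattened))  # analogous to distinct()
keys = [key for key, _ in pairs]           # analogous to keys()

print(filtered, distinct, keys)
```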

Iterating 

>>> def g(x): print(x)
...
>>> rdd.foreach(g) #Apply a function to all RDD elements
('a', 7)
('b', 2)
('a', 2)

Reshaping Data 

Reducing

>>> rdd.reduceByKey(lambda x,y : x+y).collect() #Merge the rdd values for each key
[('a',9),('b',2)]
>>> rdd.reduce(lambda a, b: a+b) #Merge the rdd values
('a',7,'a',2,'b',2)
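The two results differ because reduceByKey combines only the values that share a key, whereas reduce combines whole elements, here by concatenating the tuples. A minimal plain-Python model of both follows; reduce_by_key is a hypothetical helper written for illustration, not part of PySpark.

```python
from functools import reduce

pairs = [('a', 7), ('a', 2), ('b', 2)]

def reduce_by_key(func, items):
    """Fold the values of each key with func, one running result per key."""
    merged = {}
    for key, value in items:
        merged[key] = func(merged[key], value) if key in merged else value
    return sorted(merged.items())

per_key = reduce_by_key(lambda x, y: x + y, pairs)  # values summed per key
whole = reduce(lambda a, b: a + b, pairs)           # tuples concatenated

print(per_key)
print(whole)
```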

 

Grouping by

>>> #Return RDD of grouped values
>>> rdd3.groupBy(lambda x: x % 2).mapValues(list).collect()
>>> #Group rdd by key
>>> rdd.groupByKey().mapValues(list).collect()
[('a',[7,2]),('b',[2])]
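groupBy derives the key from each element with a function, while groupByKey uses the key already present in each (key, value) pair. The sketch below models both with a dictionary of lists; the helper names are assumptions, and a shorter range is used to keep the output readable.

```python
from collections import defaultdict

def group_by(key_func, items):
    """Group whole elements under the key computed by key_func."""
    groups = defaultdict(list)
    for item in items:
        groups[key_func(item)].append(item)
    return sorted(groups.items())

def group_by_key(pairs):
    """Group the values of (key, value) pairs under their key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return sorted(groups.items())

parity_groups = group_by(lambda x: x % 2, range(10))
key_groups = group_by_key([('a', 7), ('a', 2), ('b', 2)])

print(parity_groups)
print(key_groups)
```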

Aggregating

>>> from operator import add
>>> seqOp = (lambda x,y: (x[0]+y,x[1]+1))
>>> combOp = (lambda x,y: (x[0]+y[0],x[1]+y[1]))
>>> #Aggregate RDD elements of each partition and then the results
>>> rdd3.aggregate((0,0),seqOp,combOp)
(4950,100)
>>> #Aggregate values of each RDD key
>>> rdd.aggregateByKey((0,0),seqOp,combOp).collect()
[('a',(9,2)),('b',(2,1))]
>>> #Aggregate the elements of each partition, and then the results
>>> rdd3.fold(0,add)
4950
>>> #Merge the values for each key
>>> rdd.foldByKey(0,add).collect()
[('a',9),('b',2)]
>>> #Create tuples of RDD elements by applying a function
>>> rdd3.keyBy(lambda x: x+x).collect()
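aggregate() works in two stages: seqOp folds the elements of each partition into an accumulator, and combOp merges the per-partition accumulators. The sketch below models this in plain Python with a (sum, count) accumulator; the two-way split of the data is a hypothetical partitioning chosen for illustration, and the function is a stand-in rather than Spark's implementation.

```python
from functools import reduce

def aggregate(partitions, zero, seq_op, comb_op):
    """Fold each partition with seq_op, then merge the results with comb_op."""
    partials = [reduce(seq_op, part, zero) for part in partitions]
    return reduce(comb_op, partials, zero)

# Accumulator is (running sum, running count)
seq_op = lambda acc, value: (acc[0] + value, acc[1] + 1)
comb_op = lambda left, right: (left[0] + right[0], left[1] + right[1])

# Hypothetical split of range(100) into two partitions
partitions = [range(0, 50), range(50, 100)]

total, count = aggregate(partitions, (0, 0), seq_op, comb_op)
print(total, count, total / count)
```

Carrying the count alongside the sum is what makes it possible to compute the mean in a single pass, regardless of how the data is partitioned.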

Mathematical Operations 

>>> rdd.subtract(rdd2).collect() #Return each rdd value not contained in rdd2
[('b',2),('a',7)]
>>> #Return each (key,value) pair of rdd2 with no matching key in rdd
>>> rdd2.subtractByKey(rdd).collect()
[('d',1)]
>>> rdd.cartesian(rdd2).collect() #Return the Cartesian product of rdd and rdd2
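subtract compares whole elements, while subtractByKey compares keys only, which is why ('b', 1) is dropped from the second result even though it does not appear in rdd. A plain-Python sketch of the three operations on lists with the same contents as rdd and rdd2:

```python
from itertools import product

left = [('a', 7), ('a', 2), ('b', 2)]   # same elements as rdd
right = [('a', 2), ('d', 1), ('b', 1)]  # same elements as rdd2

# Elements of left that do not appear in right (whole-element comparison)
subtracted = [x for x in left if x not in right]

# Pairs of right whose key does not appear in left (key-only comparison)
left_keys = {key for key, _ in left}
subtracted_by_key = [(k, v) for k, v in right if k not in left_keys]

# Every combination of one element from left with one from right
cartesian = list(product(left, right))

print(subtracted, subtracted_by_key, len(cartesian))
```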

Sort 

>>> rdd2.sortBy(lambda x: x[1]).collect() #Sort RDD by given function
[('d',1),('b',1),('a',2)]
>>> rdd2.sortByKey().collect() #Sort (key, value) RDD by key
[('a',2),('b',1),('d',1)]
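The same orderings can be sketched with Python's built-in sorted(), assuming a list with the same contents as rdd2. Python's sort is stable, so the two pairs with value 1 keep their input order here; that tie order should not be relied on in a distributed sort.

```python
pairs2 = [('a', 2), ('d', 1), ('b', 1)]

by_value = sorted(pairs2, key=lambda x: x[1])  # analogous to sortBy(lambda x: x[1])
by_key = sorted(pairs2, key=lambda x: x[0])    # analogous to sortByKey()

print(by_value)
print(by_key)
```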

Repartitioning 

>>> rdd.repartition(4) #New RDD with 4 partitions
>>> rdd.coalesce(1) #Decrease the number of partitions in the RDD to 1

Saving 

>>> rdd.saveAsTextFile("rdd.txt")
>>> rdd.saveAsHadoopFile("hdfs://namenodehost/parent/child",
...                      'org.apache.hadoop.mapred.TextOutputFormat')

Stopping SparkContext 

>>> sc.stop()

Execution 

$ ./bin/spark-submit examples/src/main/python/pi.py

Learn more about PySpark

Introduction to PySpark

Big Data Fundamentals with PySpark

Cleaning Data with PySpark

Feature Engineering with PySpark

Machine Learning with PySpark

Building Recommendation Engines with PySpark
