You might already know Apache Spark as a fast and general engine for big data processing, with built-in modules for streaming, SQL, machine learning and graph processing. It’s well known for its speed, ease of use, generality and its ability to run virtually everywhere. And even though Spark is one of the most requested tools for data engineers, data scientists can also benefit from it when doing exploratory data analysis, feature extraction, supervised learning and model evaluation.
Today’s post will introduce you to some basic Spark in Python topics, based on 9 of the most frequently asked questions, such as
What language to pick when you’re working with Spark: Python or Scala? What are the benefits of using one over the other?
Next, you’ll see how you can work with Spark in Python: locally or via the Jupyter Notebook. You’ll learn how to install Spark and how to run Spark applications with Jupyter notebooks, either by adding PySpark like any other library, by working with a kernel, or by running PySpark with Jupyter in Docker containers.
Now that you have made sure that you can work with Spark in Python, you’ll get to know one of the basic building blocks that you will frequently use when you’re working with PySpark: the RDD. You’ll learn how the RDD differs from the DataFrame API and the DataSet API and when you should use which structure.
Then, you’ll learn more about the differences between Spark DataFrames and Pandas DataFrames and how you can switch from one to the other. If you want to go further into Pandas DataFrames, consider DataCamp’s Pandas Foundations course.
When you’re working with RDDs, it’s very important to understand the differences between RDD actions and transformations, why you need to cache or persist RDDs, and when you need to broadcast a variable.
To round up, you’ll get introduced to some of the best practices in Spark, like using DataFrames and the Spark UI, and you’ll also see how you can turn off the logging for PySpark.
Spark: Python or Scala?
When you start out, you’ll probably read a lot about using Spark with Python or with Scala. There has been some discussion about it on forums.
Spark Performance: Scala or Python?
In general, most developers seem to agree that Scala wins in terms of performance and concurrency: it’s definitely faster than Python when you’re working with Spark, and when you’re talking about concurrency, Scala and the Play framework make it easy to write clean and performant async code that is easy to reason about. Play is fully asynchronous, which makes it possible to handle many concurrent connections without dealing with threads. It becomes easier to make I/O calls in parallel to improve performance, and it enables the use of real-time, streaming, and server push technologies.
Note that asynchronous code allows for non-blocking I/O when making calls to remote services. Let’s state it differently with an example: say you have two lines of code, of which the first one queries a database and the second prints something to the console. Synchronous programming will wait for the query to finish before printing anything; your program is (momentarily) blocked. If your programming language doesn’t support asynchronous programming, you’ll need to create threads to execute lines of code in parallel. Asynchronous programming, on the other hand, will already print to the console while the database is being queried: the query is processed in the background.
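To make that concrete, here’s a minimal sketch in Python 3 with the standard asyncio module; the query_database coroutine is a hypothetical stand-in that just simulates a slow remote call with a sleep:

import asyncio

async def query_database():
    # Hypothetical stand-in for a slow remote call
    await asyncio.sleep(2)
    return ["movie1", "movie2"]

async def main():
    # Start the query without blocking
    query = asyncio.ensure_future(query_database())
    # This line prints immediately, while the query runs in the background
    print("Query submitted, doing other work...")
    print(await query)  # only now do we wait for the result

asyncio.get_event_loop().run_until_complete(main())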
In short, the above explains why it’s still strongly recommended to use Scala over Python when you’re working with streaming data, even though structured streaming in Spark seems to reduce the gap already.
But streaming data is not the only performance consideration that you might make.
When you’re working with the DataFrame API, there isn’t really much of a difference between Python and Scala, but you do need to be wary of User Defined Functions (UDFs), which are less efficient than their Scala equivalents. That’s why you should favor built-in expressions if you’re working with Python. When you’re working with Python, also make sure not to pass your data between DataFrames and RDDs unnecessarily, as the serialization and deserialization of the data transfer is particularly expensive.
Remember that serialization is the process of converting an object into a sequence of bytes that can be persisted to disk or a database, or sent through a stream. The reverse process, creating an object from a sequence of bytes, is called deserialization. Take a movie application, for example, with a server and clients: whenever a client sends a query to the server to retrieve, say, a list of movies, the server needs to pass a list of available Movie objects back, so those objects need to be serialized.
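As a rough illustration in Python, you can serialize and deserialize such an object with the standard pickle module (which is also what PySpark uses by default to ship Python objects to the workers); the Movie class here is just a made-up example:

import pickle

class Movie(object):
    def __init__(self, title, year):
        self.title = title
        self.year = year

# Serialization: convert the Movie object into a sequence of bytes
movie_bytes = pickle.dumps(Movie("The Matrix", 1999))

# Deserialization: rebuild the object from that sequence of bytes
movie = pickle.loads(movie_bytes)
print(movie.title, movie.year)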
Learning Spark: Python or Scala?
Python seems to be a better choice in terms of learning curve and ease of use, as it’s less verbose and more readable than Scala. It’s ideal for those who don’t have much programming experience.
But even for those who already have some programming experience, working with Spark in Python isn’t far-fetched at all, as you’ll see in the following paragraphs.
Spark and Type Safety: Scala or Python?
Two things that are somewhat in the middle are the type safety and the amount of advanced features that you get when you’re working with either Python or Scala. When it comes to type safety, you can say that Python is a good choice when you’re doing smaller ad hoc experiments, while Scala is great when you’re working on bigger projects in production. This is mostly because statically typed languages like Scala are much easier and more hassle-free when you’re refactoring.
Remember that when a language is statically typed, every variable name is bound both to a type and an object, and type checking happens at compile time. Typical examples are Java or Scala. Note that, in Scala’s case, the type system can deduce the type of a variable, so there is a form of type inference that will make your work a bit quicker. In dynamically typed languages, every variable name is bound only to an object, and type checking happens at run time. As a developer, this generally means that you can work quicker because you don’t have to specify types every time. Typical examples here are Python or Ruby.
Let’s make this visual. Consider the following Java and Python code chunks:
// Java
String str = "Hello";
str = 5;
# Python
str = "Hello"
str = 5
Assigning the integer 5 to str in Java will give an error, since you declared it to be a String. In Python, this won’t be a problem.
Note that the Spark DataSets, which are statically typed, don’t really have much of a place in Python. You’ll read more about this later on.
Spark and Advanced Features: Python or Scala?
And, lastly, there are some advanced features that might sway you to use either Python or Scala. Here, you would have to argue that Python has the main advantage if you’re talking about data science, as it provides the user with a lot of great tools for machine learning and natural language processing, such as Spark MLlib.
Conclusion
In a nutshell, both languages have their advantages and disadvantages when you’re working with Spark. Deciding for one or the other depends on your projects’ needs, your own or your team’s capabilities, and so on. The general advice that is given is to use Scala unless you’re already proficient in Python or you don’t have much programming experience. That means that, in the end, it’s important that you know how to work with both!
PS. If you want to get started with PySpark, don’t miss DataCamp’s PySpark cheat sheet.
How To Install Spark
Installing Spark and getting to work with it can be a daunting task. This section will go deeper into how you can install it and what your options are to start working with it.
First, check that you have the Java JDK installed. Then, go to the Spark download page. Keep the default options in the first three steps and you’ll find a download link in step 4. Click it to download Spark.
Next, make sure that you untar the downloaded archive in your “Downloads” folder. Then, move the untarred folder to /usr/local/spark:
$ mv spark-2.1.0-bin-hadoop2.7 /usr/local/spark
Now that you’re all set to go, open the README file in /usr/local/spark. You’ll see that you need to run a command to build Spark if your version has not been built yet. So, make sure you run:
$ build/mvn -DskipTests clean package
This might take a while, but after this, you’re all set to go!
Interactive Spark Shell
Next, you can immediately start working in the Spark shell by typing ./bin/pyspark in the same folder in which you left off at the end of the last section. It can take a bit of time, but eventually, you’ll see something like this:
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.1.0
/_/
Using Python version 2.7.13 (v2.7.13:a06454b1afa1, Dec 17 2016 12:39:47)
SparkSession available as 'spark'.
You’re ready to start working interactively!
Note that the SparkContext has already been initialized. You don’t need to import SparkContext from pyspark to begin working.
Of course, you can adjust the command to start the Spark shell according to the options that you want to change. In the following command, you see that the --master argument allows you to specify which master the SparkContext connects to. In this case, the local mode is activated. The value between the brackets designates the number of cores to use: local[*] uses all available cores, while local[4] would only make use of four cores.
$ ./bin/pyspark --master local[*]
Note that the application UI is available at localhost:4040. Open up a browser, paste in this location and you’ll get to see a dashboard with tabs designating jobs, stages, storage, etc. This will definitely come in handy when you’re executing jobs and looking to tune them. You’ll read more about this further on.
Running Spark Applications Using Jupyter Notebooks
When you have downloaded a Spark distribution, you can also start working with Jupyter Notebook. If you want to try it out first, go here and make sure you click on the “Welcome to Spark with Python” notebook. The demo will show you how you can interactively train two classifiers to predict survivors in the Titanic data set with Spark MLlib.
There are various options to get Spark in your Jupyter Notebook: you can run PySpark notebooks in your Docker container, you can set up your Jupyter Notebook with Spark or you can make sure you add a kernel to work with it in your notebook.
In any case, make sure you have the Jupyter Notebook application ready. If you don’t, consider installing Anaconda, which includes the application, or install it with pip via pip3 install jupyter. For more information on the installation process or on running specific notebooks with Spark in Python in a Docker container, consult DataCamp’s Definitive Guide to Jupyter Notebook.
Spark in Jupyter Notebook
Now that you have all that you need to get started, you can launch the Jupyter Notebook Application by typing the following:
PYSPARK_DRIVER_PYTHON="jupyter" PYSPARK_DRIVER_PYTHON_OPTS="notebook" pyspark
Or you can launch Jupyter Notebook normally with jupyter notebook
and run the following code before importing PySpark:
! pip install findspark
With findspark, you can add PySpark to sys.path at runtime. Next, you can import pyspark just like any other regular library:
import findspark
findspark.init()
# Or, if Spark is not in a default location, point init() at your Spark home:
# findspark.init("/path/to/spark_home")
from pyspark import SparkContext, SparkConf
Note that if you haven’t installed Spark in accordance with the instructions listed above (for example, because you installed it with brew or in another location), you may need to pass the path to SPARK_HOME to findspark.init(). If you’re still in doubt about where SPARK_HOME is located, you can call findspark.find() to automatically detect where Spark is installed.
Tip: you can read more about findspark here.
Jupyter Notebook with Spark Kernel
Next, if you want to install a kernel, you want to make sure you get Apache Toree installed. Install Toree via pip with pip install toree. Then, install the Toree kernels for Jupyter:
jupyter toree install --spark_home=/usr/local/bin/apache-spark/ --interpreters=Scala,PySpark
Make sure that you fill out the spark_home argument correctly: this path should point to the unzipped directory that you downloaded earlier from the Spark download page. Also note that if you don’t specify PySpark in the interpreters argument, the Scala kernel will be installed by default. Next, verify whether the kernel is included in the following list:
jupyter kernelspec list
Start Jupyter Notebook as usual with jupyter notebook, or configure Spark even further with, for example, the following line, in which you specify that Spark should run locally with 4 threads:
SPARK_OPTS='--master=local[4]' jupyter notebook
Running PySpark with Jupyter in Docker Containers
One of the other options to run the Jupyter Notebook application is to run it in Docker containers. All you need to do is set up Docker and download a Docker image that best fits your project. First, consult this section for the Docker installation instructions if you haven’t gotten around to installing Docker yet.
Once you have Docker set up, go to Docker Hub and pick an image like jupyter/pyspark-notebook to kickstart your journey with PySpark in Jupyter. For other images, check out this repository.
Do you want to get a better overview of the evolution of the Jupyter project and the components of the Jupyter Notebook? Consider reading up on our IPython or Jupyter? post.
Spark APIs: RDD, Dataset and DataFrame
These three APIs can seem very confusing for anyone who’s just getting acquainted with Spark. You will agree with me when I say that it’s sometimes hard to see the forest for the trees… This section will focus on making the distinction between these three a little bit more clear and also explains when you should be using which API.
RDDs
RDDs are the building blocks of Spark. The RDD is the original API that Spark exposed, and pretty much all the higher-level APIs decompose to RDDs. From a developer’s perspective, an RDD is simply a set of Java or Scala objects representing data.
RDDs have three main characteristics: they are compile-time type safe (they have a type!), they are lazy and they are based on the Scala collections API.
The advantages of RDDs are manifold, but there are also some problems: for example, it’s easy to build inefficient transformation chains, they are slow with non-JVM languages such as Python, and they cannot be optimized by Spark. Lastly, it’s difficult to understand what is going on when you’re working with them, because the transformation chains are not very readable: you don’t immediately see what the result will be, only how you are computing it.
DataFrames
Because of the disadvantages that you can experience while working with RDDs, the DataFrame API was conceived: it provides you with a higher-level abstraction that allows you to use a query language to manipulate the data. This higher-level abstraction is a logical plan that represents data and a schema, which makes interacting with your data a lot easier. Because the logical plan is converted to a physical plan for execution, you describe what you want to do and let Spark figure out the most efficient way to do it, rather than spelling out how to do it yourself.
Remember though that DataFrames are still built on top of RDDs!
And exactly because you let Spark worry about the most efficient way to do things, DataFrames are optimized: more intelligent decisions will be made when you’re transforming data and that also explains why they are faster than RDDs.
More specifically, the performance improvements are due to two things, which you’ll often come across when you’re reading up on DataFrames: custom memory management (project Tungsten), which makes your Spark jobs much faster given CPU constraints, and optimized execution plans (the Catalyst optimizer), of which the logical plan of the DataFrame is a part.
Datasets
The only downside to using DataFrames is that you lose compile-time type safety, which makes your code more prone to errors. This is part of the reason why Spark has moved to the notion of Datasets: they bring back some type safety and allow the use of lambda functions, which means that you get back some of the advantages that RDDs have to offer without losing the optimizations that DataFrames offer.
The notion of the Dataset has developed into the second main Spark API, besides the RDD API. As a result, the Dataset can take on two distinct characteristics: a strongly-typed API and an untyped API. The DataFrame is still there conceptually, as a synonym for a Dataset: any DataFrame is now a synonym for Dataset[Row] in Scala, where Row is a generic untyped JVM object. The Dataset, by contrast, is a collection of strongly-typed JVM objects.
Note that, since Python has no compile-time type safety, only the untyped DataFrame API is available. Or, in other words, Spark Datasets are statically typed, while Python is a dynamically typed programming language. That explains why the DataFrame, or untyped API, is what you use when you want to work with Spark in Python. Also, remember that Datasets are built on top of RDDs, just like DataFrames.
To summarize, the clear advantages of working with the Dataset API (which includes both Datasets and DataFrames) are the static typing and runtime type safety, the higher-level abstraction over the data, and the performance and optimization. Of course, it also helps that the Dataset API basically forces you to work with more structured data, which adds to the ease of use of the API itself.
When to use which?
All of the above explains why it’s generally advised to use DataFrames when you’re working with PySpark, also because they are so close to the DataFrame structure that you might already know from the pandas library. Also note what was pointed out in the section above: there is not really a place for Datasets in Python because of the lack of compile-time type safety in the language. However, that doesn’t mean that the Dataset API, which includes both a strongly-typed and an untyped API, can’t come in handy: it is ideal when you want to use high-level expressions, SQL queries, columnar access or lambda functions on semi-structured data. And if you still want more control, you can always fall back on RDDs.
You can use RDDs when you want to perform low-level transformations and actions on your unstructured data. This means that you don’t care about imposing a schema while processing or accessing the attributes by name or column. In addition, you don’t necessarily need the optimization and performance benefits that DataFrames and DataSets can offer for (semi-) structured data. Also, you usually use RDDs when you want to manipulate the data with functional programming constructs rather than domain specific expressions.
In short, use the API that best fits your use case, but also remember that it’s fairly easy to switch from a DataFrame to an RDD, as you will see in the next section!
The Difference Between Spark DataFrames and Pandas DataFrames
DataFrames are often compared to tables in a relational database or a data frame in R or Python: they have a schema, with column names and types, and logic for rows and columns. This mimics the implementation of DataFrames in Pandas!
Note that, even though the Spark, Python and R data frames can be very similar, there are also a lot of differences: as you have read above, Spark DataFrames carry specific optimizations under the hood and can use distributed memory to handle big data, while Pandas DataFrames and R data frames can only run on a single computer.
However, these differences don’t mean that the two of them can’t work together: you can reuse your existing Pandas DataFrames to scale up to larger data sets. If you want to convert your Spark DataFrame to a Pandas DataFrame and you expect the resulting Pandas DataFrame to be small, you can use the following line of code:
df.toPandas()
You see, the two integrate very well: you can parallelize the workload thanks to the Spark DataFrame, and you can make use of the wealth of libraries that Python and R DataFrames have to offer, which makes visualization or machine learning a whole lot easier!
Note that you do need to make sure that the DataFrame is small enough, because all the data is loaded into the driver’s memory!
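As a minimal sketch, assuming a SparkSession is available as spark (as it is in the PySpark shell) and that pandas is installed, the round trip looks like this; the example data is made up:

import pandas as pd

# A small Pandas DataFrame with made-up data
pandas_df = pd.DataFrame({"name": ["Ada", "Grace"], "score": [95, 99]})

# Scale up: distribute the Pandas DataFrame as a Spark DataFrame
spark_df = spark.createDataFrame(pandas_df)

# Scale down: pull a (small!) Spark DataFrame back into the driver as Pandas
result_df = spark_df.toPandas()
print(result_df)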
RDD Actions versus Transformations
RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map() is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce() is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program. Note, however, that there is also a reduceByKey() that returns a distributed dataset.
All transformations in Spark are lazy, in that they do not compute their results right away: instead, they just remember the transformations applied to some base dataset. The transformations are only computed when an action requires a result to be returned to the driver program.
With these two types of RDD operations, Spark can run more efficiently: a dataset created through a map() operation will be used in a subsequent reduce() operation, and only the result of that last reduce is returned to the driver. That way, the reduced data set rather than the larger mapped data set is returned to the user. This is more efficient, without a doubt!
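Here’s a small sketch of the difference, assuming a SparkContext is available as sc (as in the PySpark shell); the numbers and pairs are just example data:

rdd = sc.parallelize([1, 2, 3, 4, 5])

# map() is a transformation: nothing is computed yet, Spark only records it
squared = rdd.map(lambda x: x * x)

# reduce() is an action: it triggers the computation and returns a value to the driver
print(squared.reduce(lambda a, b: a + b))  # 55

# reduceByKey() is a transformation that returns a distributed dataset of (key, value) pairs
pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
print(pairs.reduceByKey(lambda a, b: a + b).collect())  # [('a', 3), ('b', 1)] (order may vary)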
Why Do You Need To Cache or Persist RDDs?
By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.
A couple of use cases for caching or persisting RDDs are the use of iterative algorithms and fast interactive RDD use.
If you’re going to persist RDDs, take a look at the level of persistence that you’re going to define. You can find more information here .
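As a short sketch, again assuming a SparkContext available as sc, you could persist an RDD that you reuse across several actions like this; MEMORY_AND_DISK is just one possible level:

from pyspark import StorageLevel

transformed = sc.parallelize(range(100000)).map(lambda x: x * 2)

# persist() takes an explicit storage level; cache() is shorthand for
# persist(StorageLevel.MEMORY_ONLY). A storage level can only be set once per RDD.
transformed.persist(StorageLevel.MEMORY_AND_DISK)

transformed.count()  # first action: computes and materializes the partitions
transformed.count()  # second action: reuses the persisted partitions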
Persist or Broadcast Variable?
RDDs are divided into partitions: each partition can be considered an immutable subset of the entire RDD. When you execute your Spark program, each partition gets sent to a worker. This means that each worker operates on a subset of the data. Each worker can cache the data if the RDD needs to be re-iterated: the partitions that it processes are stored in memory and will be reused in other actions. As you read in the above paragraph, by persisting, Spark will have faster access to that data partition the next time an operation makes use of it.
But you need to keep in mind that when you pass a function to a Spark operation, it is executed on separate cluster nodes. Every node receives a copy of the variables used inside the function, and changes to those local copies are not propagated back to the driver program.
A typical use case in which this might happen is when you have to redistribute intermediate results of operations, such as trained models, or as static lookup tables in cases where you want to perform lookups against a small table to join it together with your bigger data set.
Instead of shipping a copy of the variable with every task, you use broadcast variables to send some immutable state once to each worker. Broadcast variables allow the programmer to keep a cached, read-only variable on every machine. In short, you use these variables when you want each worker to have its own local, read-only copy of a variable.
You can create a broadcast variable with SparkContext.broadcast(variable). This call returns a reference to the broadcast variable.
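A minimal sketch, assuming a SparkContext available as sc and a small, made-up lookup table:

# Broadcast the lookup table once; every worker gets a read-only cached copy
country_lookup = sc.broadcast({"BE": "Belgium", "US": "United States"})

codes = sc.parallelize(["BE", "US", "BE"])

# Tasks read the value through .value instead of shipping a copy with every task
print(codes.map(lambda code: country_lookup.value[code]).collect())
# ['Belgium', 'United States', 'Belgium']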
As you can see, persisting an RDD or using a broadcast variable are two different solutions to different problems.
What Are The Best Practices in Spark?
There are tons of possibilities when you’re working with PySpark, but that doesn’t mean that there aren’t some simple and general best practices that you can follow:
Use Spark DataFrames
Consider the section above to see whether you should use RDDs or DataFrames. As you already read above, Spark DataFrames are optimized and therefore also faster than RDDs. Especially when you’re working with structured data, you should really consider switching your RDD to a DataFrame.
RDD Best Practices
Don’t call collect() on large RDDs
By calling collect() on any RDD, you drag data back into your application from the nodes. Each RDD element is copied onto the single driver program, which can run out of memory and crash. Given the fact that you want to make use of Spark in the most efficient way possible, it’s not a good idea to call collect() on large RDDs.
Other functions that you can use to inspect your data are take() or takeSample(), but also countByKey(), countByValue() or collectAsMap() can help you out. If you really need to take a look at the complete data, you can always write out the RDD to files or export it to a database that is large enough to keep your data.
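For example, assuming a SparkContext available as sc and a small, made-up pair RDD, the safer inspection functions look like this:

rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])

print(rdd.take(2))                        # only the first two elements
print(rdd.takeSample(False, 2, seed=42))  # a small random sample, without replacement
print(rdd.countByKey())                   # {'a': 2, 'b': 1}
print(rdd.countByValue())                 # counts per (key, value) element
print(rdd.collectAsMap())                 # dict of key -> value; duplicate keys keep only one value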
Reduce Your RDD Before Joining
The fact that you can chain operations comes in handy when you’re working with Spark RDDs, but what you might not realize is that you have a responsibility to build efficient transformation chains, too. Taking care of this is also a way of tuning your Spark jobs’ efficiency and performance.
One of the most basic rules that you can apply when you’re revising the chain of operations that you have written down is to make sure that you filter or reduce your data before joining it. This way, you avoid sending too much data over the network that you’ll throw away after the join, which is already a good reason, right?
But there is more. The join operation is one of the most expensive operations that you can use in Spark, so that’s why it makes sense to be wary of this. When you reduce the data before the join, you avoid shuffling your data around too much.
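A small sketch with made-up data, assuming a SparkContext available as sc:

orders = sc.parallelize([(1, 250.0), (2, 80.0), (3, 10.0)])           # (customer_id, amount)
customers = sc.parallelize([(1, "Ada"), (2, "Grace"), (3, "Anita")])  # (customer_id, name)

# Reduce first: keep only the rows you actually need before the expensive join
big_orders = orders.filter(lambda pair: pair[1] > 50.0)

# Far less data is now shuffled over the network during the join
print(big_orders.join(customers).collect())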
Avoid groupByKey() on large RDDs
On big data sets, you’re better off making use of other functions, such as reduceByKey(), combineByKey() or foldByKey(). When you use groupByKey(), all key-value pairs are shuffled around the cluster, and a lot of unnecessary data is transferred over the network. Additionally, this also means that if more data is shuffled onto a single machine than can fit in memory, the data will be spilled to disk. This heavily impacts the performance of your Spark job.
When you make use of reduceByKey(), for example, the pairs with the same key are already combined within each partition before the data is shuffled. As a result, you have to send much less data over the network. Next, the reduce function is applied once more so that all the values from each partition are combined.
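A word count sketch makes the difference concrete, again assuming a SparkContext available as sc:

words = sc.parallelize(["spark", "python", "spark", "scala", "spark"]).map(lambda w: (w, 1))

# groupByKey() shuffles every single (key, 1) pair before anything is combined
grouped_counts = words.groupByKey().mapValues(sum)

# reduceByKey() combines values per partition first, so much less data is shuffled
reduced_counts = words.reduceByKey(lambda a, b: a + b)

print(sorted(reduced_counts.collect()))  # [('python', 1), ('scala', 1), ('spark', 3)]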
Broadcast Variables
Since you already know what broadcast variables are and in which situations they can come in handy, you’ll also have gathered that this is one of the best practices for when you’re working with Spark because you can reduce the cost of launching a job over the cluster.
Avoid the flatMap(), join() and groupBy() Pattern
When you have two datasets that are grouped by key and you want to join them, but still keep them grouped, use cogroup() instead of the above pattern.
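A short sketch with made-up data, assuming a SparkContext available as sc:

clicks = sc.parallelize([(1, "home"), (1, "pricing"), (2, "docs")])
purchases = sc.parallelize([(1, 9.99), (2, 4.99)])

# cogroup() joins both datasets while keeping the values grouped per key
grouped = clicks.cogroup(purchases)

# Each key maps to a pair of iterables: (clicks for that key, purchases for that key)
print([(k, (list(c), list(p))) for k, (c, p) in grouped.collect()])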
Spark UI
You already read about it in one of the sections above, but making use of the Spark UI is really something that you can not miss. This web interface allows you to monitor and inspect the execution of your jobs in a web browser, which is extremely important if you want to exercise control over your jobs.
The Spark UI allows you to maintain an overview of your active, completed and failed jobs. You can see when you submitted a job and how long it took to run. Besides the schematic overview, you can also see the event timeline section in the “Jobs” tab. Make sure to also find out more about your jobs by clicking on them; you’ll get more information on the stages of tasks inside them.
The “Stages” tab in the UI shows you the current stage of all stages of all jobs in a Spark application, while the “Storage” tab will give you more insights on the RDD size and the memory use.
How To Turn Off PySpark Logging
Go to the Spark directory and execute the following command:
cp conf/log4j.properties.template conf/log4j.properties
Note that this command copies the file log4j.properties.template into the same conf folder, but under a different name, namely log4j.properties, instead of the original name log4j.properties.template. Next, you should also know that Apache Log4j is a Java-based logging utility. In short, you’re copying a logging template.
Next, before you start editing anything, take your time to inspect the template file log4j.properties.template with a text editor. You’ll see a section like this:
# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
You’ll see that the log4j rootCategory attribute is set to INFO by default. That means that you’ll get to see logs at the INFO level, which offer “informational messages that highlight the progress of the application at coarse-grained level”. It’s not hard to imagine that this level of logging could offer you many, if not too many, logs. However, turning the logging off entirely is usually not a good option either, because you usually do want to stay in the loop with whatever your application is doing: this makes debugging or spotting anomalies early a lot easier.
Consider setting the level to WARN, which will indicate potentially harmful situations in the logs, or ERROR, which indicates error events that might not block your application from running. You can find the other levels here.
You know now what to do: edit log4j.properties with nano log4j.properties, vim or another editor and replace INFO with the desired level of logging.
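If you settle on WARN, for example, the edited line would simply read:

log4j.rootCategory=WARN, console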
Save the file and restart your shell. Run the application again and you’ll see that the logging has been reduced to the level that you defined in the properties file.
Spark Up Your Data Analyses!
You’ve reached the end of this tutorial, so now it’s time for you to start working with Spark, if you haven’t done so already while reading along!
If you are interested in learning more about PySpark, consider taking DataCamp’s Introduction to PySpark course.