Last Updated on July 23, 2021
Table of Contents:
- What is PySpark?
- What is Apache Spark?
- What is Apache Spark used for?
- What is PySpark used for?
- Is Apache Spark free?
- Why should I use Apache Spark?
- Why shouldn’t I use Apache Spark?
- Why should I use PySpark?
- Why shouldn’t I use PySpark?
- What are some Apache Spark alternatives?
- What are some Apache Spark clients?
- How to get started with Apache Spark?
- What are the main components of Apache Spark?
- What is the Apache Spark RDD?
- How to use PySpark in Jupyter Notebooks?
- Meet the Data
- How to start a PySpark session?
- How to load data in PySpark?
- What are the most common PySpark functions?
- How to convert an RDD to a DataFrame in PySpark?
- How to preprocess data with PySpark?
- How to run a Machine Learning model with PySpark?
- Full code
What is PySpark?
PySpark is a Python library that serves as an interface for Apache Spark.
What is Apache Spark?
Apache Spark is an open-source distributed computing engine that is used for Big Data processing.
It is a general-purpose engine as it supports Python, R, SQL, Scala, and Java.
What is Apache Spark used for?
Apache Spark is often used with Big Data as it allows for distributed computing and it offers built-in data streaming, machine learning, SQL, and graph processing. It is often used by data engineers and data scientists.
What is PySpark used for?
PySpark is used as an API for Apache Spark. This allows us to leave the Apache Spark terminal and enter our preferred Python programming IDE without losing what Apache Spark has to offer.
Is Apache Spark free?
Apache Spark is an open-source engine and thus it is completely free to download and use.
Why should I use Apache Spark?
- Apache Spark offers distributed computing
- Apache Spark is easy to use
- Apache Spark is free
- Offer advanced analytics
- Is a very powerful engine
- Offers machine learning, streaming, SQL, and graph processing modules
- Is applicable to various programming languages like Python, R, Java…
- Has a good community and is advancing as a product
Why shouldn’t I use Apache Spark?
- Apache Spark can have scaling problems with compute-intensive jobs
- It can consume a lot of memory
- Can have issues with small files
- Is constrained by the number of available ML algorithms
Why should I use PySpark?
- PySpark is easy to use
- PySpark can handle synchronization errors
- The learning curve isn’t steep as in other languages like Scala
- Can easily handle big data
- Has all the pros of Apache Spark added to it
Why shouldn’t I use PySpark?
- PySpark can be less efficient as it uses Python
- It is slow when compared to other languages like Scala
- It can be replaced with other libraries like Dask that easily integrate with Pandas (depends on the problem and dataset)
- Suffers from all the cons of Apache Spark
What are some Apache Spark alternatives?
Apache Spark can be replaced with some alternatives and they are the following:
- Apache Hadoop
- Google BigQuery
- Amazon EMR
- IBM Analytics Engine
- Apache Flink
- Apache Pig
What are some Apache Spark clients?
Some of the programming clients that has Apache Spark APIs are the following:
How to get started with Apache Spark?
In order to get started with Apache Spark and the PySpark library, we will need to go through multiple steps. This can be a bit confusing if you have never done something similar but don’t worry. We will do it together!
The first things that we need to take care of are the prerequisites that we need in order to make Apache Spark and PySpark work. These prerequisites are Java 8, Python 3, and something to extract .tar files.
Let’s see what Java version are you rocking on your computer. If you’re on Windows like me, go to Start, type
cmd, and enter the Command Prompt. When there, type the following command:
And you’ll get a message similar to this one that will specify your Java version:
java version "1.8.0_281"
If you didn’t get a response you don’t have Java installed. If your java is outdated ( < 8) or non-existent, go over to the following link and download the latest version.
Download and set-up
Go over to the following link and download the 3.0.3. Spark release that is pre-built for Apache Hadoop 2.7.
Now click the blue link that is written under number 3 and select one of the mirrors that you would like to download from. While it is downloading create a folder named Spark in your root drive (C:).
Go into that folder and extract the downloaded file into it. The next thing that you need to add is the winutils.exe file for the underlying Hadoop version that Spark will be utilizing.
To do this, go over to the following GitHub page and select the version of Hadoop that we downloaded. After that, scroll down until you see the winutils.exe file. Click on it and download it.
Now create a new folder in your root drive and name it “Hadoop”, then create a folder inside of that folder and name it “bin”. Inside the bin folder paste the winutils.exe file that we just downloaded.
Now for the final steps, we need to configure our environmental variables. Environmental variables allow us to add Spark and Hadoop to our system PATH. This way we can call Spark in Python as they will be on the same PATH.
Click Start and type “environment”. Then select the “Edit the system environment variables” option. A new window will pop up and in the lower right corner of it select “Environment Variables”.
A new window will appear that will show your environmental variables. In my case, I already have Spark there:
To add it there, click on “New”. Then set the name to be “SPARK_HOME” and for the Variable value add the path where you downloaded your spark. It should be something like this
C:\Spark\spark... Click OK.
For the next step be sure to be careful and not change your Path. Click on the “Path” in your user variables and then select “Edit”. A new window will appear, click on the “New” button and then write this
You’ve successfully added Spark to your PATH! Now, repeat this process for both Hadoop and Java. The only things that will change will be their locations and the end name that you give to them.
Your end product should look like this:
Now let us launch our Spark and see it in its full glory. Start a new command prompt and then enter spark-shell to launch Spark. A new window will appear with Spark up and running.
Now open up your browser and write
http://localhost:4040/ or whatever the name of your system is. This will open up the Apache Spark UI where you will be able to see all the information you might need.
What are the main components of Apache Spark?
There are several components that make Apache Spark and they are the following:
- Spark Core – is the main part of Apache Spark that provides in-built memory computing and does all the basic I/O functions, memory management, and much more.
- Spark Streaming – allows for data streaming that can go up to a couple of gigabytes per second.
- Spark SQL – allows the use of SQL (Structured Query Language) for easier data manipulation and analysis.
- MlLib – packs several machine learning models that can be used in several programming languages.
- GraphX – provides several methods for implementing graph theory to your dataset (i.e. network analysis).
What is the Apache Spark RDD?
Apache Spark RDD (Resilient Distributed Dataset) is a data structure that serves as the main building block. An RDD can be seen as an immutable and partitioned set of data values that can be processed on a distributed system.
To conclude, they are resilient because they are immutable, distributed as they have partitions that can be processed in a distributed manner, and datasets as they hold our data.
How to use PySpark in Jupyter Notebooks?
To use PySpark in your Jupyter notebook, all you need to do is to install the PySpark pip package with the following command:
pip install pyspark
As your Python is located on your system PATH it will work with your Apache Spark. If you want to use something like Google Colab you will need to run the following block of code that will set up Apache Spark for you:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null !wget -q https://www-us.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz !tar xf spark-3.0.3-bin-hadoop2.7.tgz !pip install -q findspark import os os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64" os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7" import findspark findspark.init()
If you want to use Kaggle like we’re going to do, you can just go straight to the “pip install pyspark” command as Apache Spark will be ready for use.
Meet the Data
The dataset that we are going to use for this article will be the Stock Market Data from 1996 to 2020 which is found on Kaggle. The dataset is 12.32 GB which exceeds the zone of being comfortable to use with pandas.
For the purpose of this article, we will go over the basics of Apache Spark that will set you up for future use. In the end, we’ll fit a simple regression algorithm to the data.
We’ll use Kaggle as our IDE. All that you need to do to follow along is to open up a new notebook on the main page of the dataset.
How to start a PySpark session?
To start a PySpark session you will need to specify the builder access, where the program will run, the name of the application, and the session creation parameter. All of that is done with the following lines of code:
!pip install pyspark from pyspark.sql import SparkSession # Create the Session spark = SparkSession.builder\ .master("local")\ .appName("PySpark Tutorial")\ .getOrCreate()
How to create an RDD in PySpark?
In order to create an RDD in PySpark, all we need to do is to initialize the
sparkContext with the data we want it to have. For example, the following code will create an RDD of the FB stock data and show the first two rows:
sc = spark.sparkContext rdd = sc.textFile('../input/stockmarketdatafrom1996to2020/Data/Data/FB/FB.csv') rdd.take(2)
['Date,Open,High,Low,Close,Adj Close,Volume', '2012-05-18,42.049999,45.000000,38.000000,38.230000,38.230000,573576400']
How to load data in PySpark?
To load data in PySpark you will often use the
.read.file_type() function with the specified path to your desired file. To import our dataset, we use the following command:
stock_1 = spark.read.csv('../input/stockmarketdatafrom1996to2020/Data/Data/AAPL/AAPL.csv',\ inferSchema=True, header=True) stock_1.show(5)
To find your data path you can simply navigate the Data section on the right side of your screen and copy the path to the desired file. In our case, I selected a random stock from the Data folder that has all stocks in it.
inferSchema parameter will automatically infer the input schema from our data and the
header parameter will use the first row as the column names. After the data is loaded we print out the first 5 rows.
You could try loading all the stocks from the Data file but that would take too long to wait and the goal of the article is to show you how to go around using Apache Spark.
To list all of them and their directories you can run the following code:
from pathlib import Path contents = list(Path('../input/stockmarketdatafrom1996to2020/Data/Data').iterdir()) print(contents)
Let’s get the second stock ready for when we do the regression:
stock_2 = spark.read.csv('../input/stockmarketdatafrom1996to2020/Data/Data/MSFT/MSFT.csv',\ inferSchema=True, header=True)
You can also check the schema of your data frame:
What are the most common PySpark functions?
Some of the most common PySpark functions that you will probably be using are the
map, and more. I’ll showcase each one of them in an easy-to-understand manner.
select function is often used when we want to see or create a subset of our data. In order to do this, we want to specify the column names. For example, let’s hone in on the closing prices of the APPL stock data:
filter function will apply a filter on the data that you have specified. For example, we can show only the top 10 APPL closing prices that are above $148 with their timestamps.
from pyspark.sql import functions as F stock_1.filter(F.col("Close")>148.00).select("Date","Close").show(10)
map function will allow us to parse the previously created RDD. For example, we can parse the values in it and create a list out of each row.
rdd = rdd.map(lambda line: line.split(",")) rdd.top(5)
reduce function will allow us to “reduce” the values by aggregating them aka by doing various calculations like counting, summing, dividing, and similar.
For example, let’s create an RDD with random numbers and sum them.
num = sc.parallelize([23, 1, 4, 5, 6, 7]) num_sum = num.reduce(lambda a,b:a+b) print(num_sum)
How to convert an RDD to a DataFrame in PySpark?
To convert an RDD to a DataFrame in PySpark, you will need to utilize the
toDF functions while specifying the column names and value lines. Let’s take our previously parsed FB stock RDD and convert it:
from pyspark.sql import Row header = rdd.first() stock_3 = rdd.filter(lambda line: line != header)\ .map(lambda line: Row(date=line, open=line, high=line, low=line, close=line, adj_close=line, volume=line)).toDF() stock_3.show(5)
Notice how I filtered out the first row from the RDD. This was done because the first row carried the column names and we didn’t want it in our values.
How to preprocess data with PySpark?
To preprocess data with PySpark there are several methods that depend on what you wish to do. For example, I will show you how to standardize the values for your analysis.
The first thing that we will do is to convert our Adj Close values to a float type. Then we will rename the columns that will make our analysis later on and merge the two data frames.
After that, we will need to convert those to a vector in order to be available to the standard scaler. We’ll print out the results after each step so that you can see the progression:
from pyspark.ml.feature import StandardScaler from pyspark.ml.feature import VectorAssembler input_1 = stock_1.select("Adj Close") input_1.show(5) input_2 = stock_2.select("Adj Close") ####################### input_1 = input_1.withColumnRenamed("Adj Close","label") input_2 = input_2.withColumnRenamed("Adj Close","feature") input_data = input_1.join(input_2) input_data.show(5)
And now for the scaling part:
assembler = VectorAssembler( inputCols=["feature"], outputCol="features") input_data = assembler.transform(input_data) standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled") scaler = standardScaler.fit(input_data.select("features")) df = scaler.transform(input_data) df.show(5)
How to run a Machine Learning model with PySpark?
To run a Machine Learning model in PySpark, all you need to do is to import the model from the
pyspark.ml library and initialize it with the parameters that you want it to have.
For example, let’s create a simple linear regression model and see if the prices of stock_1 can predict the prices of stock_2. To do this, we will first split the data into train and test sets ( 80-20% respectively).
We then fit the model to the train data. This might take several minutes to complete. As Apache Spark doesn’t have all the models you might need using Sklearn is a good option and it can easily work with Apache Spark.
Moreover, Sklearn sometimes speeds up the model fitting.
from pyspark.ml.regression import LinearRegression train_data, test_data = df.randomSplit([.8,.2], seed=42) reg = LinearRegression(labelCol="label",\ featuresCol="features_scaled", maxIter=5) model = reg.fit(train_data)
When the fitting is done we can do the predictions on the test data. Have in mind that we won’t optimize the hyperparameters in this article. We will zip the predictions and the true labels and print out the first five.
# Predict test_data predicted = model.transform(test_data) # Take predictions and the true label - zip them predictions = predicted.select("prediction").rdd.map(lambda x: x) labels = predicted.select("label").rdd.map(lambda x: x) pred_lab = predictions.zip(labels).collect() # Print out first 5 predictions pred_lab[:5]
Also, have in mind that this is a very x10 simple model that shouldn’t be used on data like this. The goal is to show you how to use the ML library. To access the model’s coefficients and useful statistics we can do the following:
# Model coefficients print(model.coefficients) # Intercept print(model.intercept) # RMSE print(model.summary.rootMeanSquaredError)