
How to read 1.7 billion Reddit comments with Spark and Python Part 1: Setting up a local cluster

In this post, I’ll provide a walkthrough of how to set up a Spark cluster locally and run some simple queries on a month of Reddit comment data. In the next post, we will look at scaling up the Spark cluster using Amazon EMR and S3 buckets to query ~1.7 billion Reddit comments from 2007 to 2015, representing approximately 1 terabyte of data.

What is Apache Spark?

Apache Spark is an open-source cluster-computing framework. Spark allows for fast and efficient analysis of large data sets with Resilient Distributed Datasets (RDDs). RDDs represent a collection of objects that can be split across a computing cluster. Both the data and the operations on the data can be distributed across the workers in a cluster and executed in parallel.

Hadoop vs. Spark

One of the main differences between Spark and Hadoop is that Hadoop’s MapReduce reads and writes from disk, while Spark has an in-memory data engine. From the Spark documentation:

[Spark] has been used to sort 100 TB of data 3X faster than Hadoop MapReduce on 1/10th of the machines, winning the 2014 Daytona GraySort Benchmark, as well as to sort 1 PB. Several production workloads use Spark to do ETL and data analysis on PBs of data.

Spark is significantly easier to develop for and deploy, as it provides RDD APIs for Python, Scala, Java and R. The framework also supports SQL queries, streaming data and machine learning.

Step 1: Running an Ubuntu VM

For a single-node Spark cluster, we’ll use Ubuntu 18.04 LTS (Desktop) running as a virtual machine in VMware Workstation Player.

In this example, I increased the hard disk size to 50 GB and the default memory to 3072 MB.

Step 2: Install the required packages

We need to update the package index:
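On Ubuntu 18.04 this is done with apt:

sudo apt-get update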

Next we install the latest Java Development Kit with:
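On Ubuntu 18.04 the simplest option is the default OpenJDK package:

sudo apt-get install default-jdk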

It’s worth checking which version of the JDK gets installed during this step. On one of my virtual machines the default was a build for AMD processors, which caused issues later on. To be sure, download the JDK from here and check that it is the right build for your machine.

The 64-bit Linux version of the JDK is the correct one for our VM.

Once the file is downloaded, we unzip and move it to the correct folder with:
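Assuming the archive is the Linux x64 tarball and using /usr/lib/jvm as the destination (both the file name and the target folder below are placeholders to adjust for your download), the commands look something like:

tar -xzf jdk-&lt;version&gt;-linux-x64.tar.gz    # extract the JDK archive you downloaded
sudo mkdir -p /usr/lib/jvm                 # a common location for manually installed JDKs
sudo mv jdk&lt;version&gt; /usr/lib/jvm/         # move the extracted directory into place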

Now we get Scala, the programming language that Spark is written in:
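Scala is available from the standard Ubuntu repositories:

sudo apt-get install scala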

Finally, we install a series of packages related to Python (we will go through what these are for later on):
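At a minimum this needs to include pip, the Python package manager (the notebook later in this post uses Python 2, so the Python 2 packages are assumed here):

sudo apt-get install python-pip python-dev    # pip plus the headers some Python packages need to build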

Now that the Python package manager pip is installed, we can install the required Python packages with:
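The Python packages used later in this walkthrough are Jupyter and findspark, so a minimal set is:

pip install --no-cache-dir jupyter findspark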

Pip’s caching mechanism tries to read the entire file into memory before caching it, which can cause problems on VMs and other limited-memory environments. Running pip with --no-cache-dir avoids using the cache.

Step 3: Download and install Spark

Download the latest version of Spark here and make sure the package type is ‘Pre-built for Apache Hadoop 2.7 and later’.

Navigate to the folder where Spark was downloaded. We will unzip the file and move it to /usr/local with:
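Assuming the downloaded archive is spark-&lt;version&gt;-bin-hadoop2.7.tgz (substitute the version you chose), and renaming the directory to /usr/local/spark to keep later paths short:

tar -xzf spark-&lt;version&gt;-bin-hadoop2.7.tgz               # extract the pre-built Spark distribution
sudo mv spark-&lt;version&gt;-bin-hadoop2.7 /usr/local/spark   # move it to /usr/local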

Another big advantage of Spark is the excellent support it provides for external APIs, particularly the Python API, PySpark. In this example, we’re going to use PySpark and a Jupyter notebook for our queries. To make things easier, we’ll add the following commands to the end of our .bashrc file:
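A minimal sketch, assuming the JDK was unpacked under /usr/lib/jvm and Spark lives in /usr/local/spark as above (adjust both paths to your own install):

export JAVA_HOME=/usr/lib/jvm/jdk&lt;version&gt;   # point to the JDK directory you created earlier
export PATH=$PATH:$JAVA_HOME/bin
export SPARK_HOME=/usr/local/spark           # where the Spark distribution was moved
export PATH=$PATH:$SPARK_HOME/bin            # puts the pyspark command on the PATH
export PYSPARK_DRIVER_PYTHON=jupyter         # use Jupyter as the PySpark driver
export PYSPARK_DRIVER_PYTHON_OPTS='notebook' # open a notebook when pyspark starts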

The first two lines set the JAVA_HOME path, which is required for the Spark shell; make sure the path matches the version of the JDK you downloaded. The last two lines configure the PySpark driver to use Jupyter Notebook. Save and close the gedit window. You may also need to log out and back in for the changes to take effect. When we run pyspark in the terminal, Spark will start and a Jupyter notebook will automatically open.

If you would prefer to use an IDE other than Jupyter Notebook, the findspark Python package makes pyspark importable by adding it to sys.path at runtime, so a SparkContext can be created from any Python session.
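A minimal sketch of that approach (the application name is arbitrary):

import findspark
findspark.init()    # locates Spark via SPARK_HOME and adds pyspark to sys.path

import pyspark
sc = pyspark.SparkContext(appName='reddit-comments')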

Step 4: Getting the Reddit comments

Reddit user /u/Stuck_In_the_Matrix provided a massive dataset in their post ‘I have every publicly available Reddit comment for research. ~ 1.7 billion comments @ 250 GB compressed. Any interest in this?’. To start, we’ll download a subset of the data: one month of comments.

Step 5: Loading the data into a Spark DataFrame

Restart the terminal, then run:
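pyspark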

You should see Spark start in the terminal and a Jupyter notebook open.

With the Jupyter notebook open, select ‘New’ and open a new Python 2 notebook.

DataFrames

DataFrames in Spark are inspired by data frames in R and the Python library pandas. In simple terms, a DataFrame is a collection of rows under named columns, similar to a SQL table or an Excel sheet.

The advantage of Spark DataFrames is that, like RDDs, they are distributed. This means they can scale from kilobytes of data running on basic hardware to petabytes on a cluster of machines.

Add the following lines to the Jupyter notebook:

import pyspark

# 'sc' is the SparkContext created automatically when the notebook is launched via pyspark
data_file = '/home/thomas/Downloads/RC_2015-01.bz2'
raw_data = sc.textFile(data_file)

Spark can read both uncompressed and compressed files. Files compressed in the .gz format are read into a single partition, so we would lose the benefits of Spark’s parallel processing. The .bz2 format can be split across partitions, but decompressing it is very CPU-intensive. According to this article, uncompressed text files run fastest; however, for this example we’ll just use the .bz2 file.

Schema specification and Spark SQL

Spark can infer a schema by sampling the whole dataset. For this example, though, we’ll specify the schema ourselves. Knowing how to do this is also useful when generating a schema for a complex dataset, for example one with nested schemas.

Schemas are specified with a StructType containing a list of StructField objects, where each element of the list corresponds to a column in the schema. The Reddit comments have a known structure, which we define as follows:

from pyspark.sql.types import StructType, StructField, StringType, BooleanType, LongType

fields = [StructField("archived", BooleanType(), True), 
          StructField("author", StringType(), True), 
          StructField("author_flair_css_class", StringType(), True), 
          StructField("body", StringType(), True), 
          StructField("controversiality", LongType(), True), 
          StructField("created_utc", StringType(), True), 
          StructField("distinguished", StringType(), True), 
          StructField("downs", LongType(), True), 
          StructField("edited", StringType(), True), 
          StructField("gilded", LongType(), True), 
          StructField("id", StringType(), True), 
          StructField("link_id", StringType(), True), 
          StructField("name", StringType(), True), 
          StructField("parent_id", StringType(), True), 
          StructField("retrieved_on", LongType(), True), 
          StructField("score", LongType(), True), 
          StructField("score_hidden", BooleanType(), True), 
          StructField("subreddit", StringType(), True), 
          StructField("subreddit_id", StringType(), True), 
          StructField("ups", LongType(), True)] 
schema = StructType(fields)

Once the raw data and schema have been specified, we can build our DataFrame with the sqlContext.read.json function. To check that the data has loaded correctly, we’ll query the top 20 rows of the author column.

df = sqlContext.read.json(raw_data, schema)
df.select("author").show()

The sqlContext.sql function allows us to run SQL queries and returns the result as a DataFrame. We’ll use Spark SQL to get the top 5 most up-voted comment authors for the month.

df_most_ups = df.select("author", "ups")
# register the DataFrame as a temporary table so it can be queried with SQL
df_most_ups.registerTempTable("most_ups")
most_ups_query = sqlContext.sql(
"""
SELECT author
,sum(ups) as sum_ups 
FROM most_ups 
GROUP BY author 
ORDER BY sum_ups desc
""")
most_ups_query.take(5)

In the next post, we’ll look at running Spark on AWS, with the full Reddit comment dataset.