Guide to Using HDFS and Spark
In addition to other resources made available to PhD students at Northeastern, the security group has access to a cluster of machines specifically designed to run compute-intensive tasks on large datasets. Our cluster uses Hadoop HDFS as the storage layer for large datasets, and Apache Spark as the programming and execution environment for computing over these large datasets. This guide will briefly explain what these software platforms are and how to use them.
Please note that this Spark cluster is not a publicly available resource. This cluster is run by and reserved for PhD students working with Christo Wilson, Alan Mislove, and Dave Choffnes. If you are a Northeastern PhD student, and you have a compelling need to access these resources, contact Professor Wilson and we may be able to accommodate your request.
What are HDFS and Spark
HDFS is a distributed file system designed to store large files spread across multiple physical machines and hard drives. Spark is a tool for running distributed computations over large datasets. Spark is a successor to the popular Hadoop MapReduce computation framework. Together, Spark and HDFS offer powerful capabilities for writing simple code that can quickly compute over large amounts of data in parallel.
From a user's perspective, HDFS looks like a typical Unix file system. There is a root directory, users have home directories under /user, etc. However, behind the scenes all files stored in HDFS are split apart and spread out over multiple physical machines and hard drives. As a user, these details are transparent; you don't need to know how your files are broken apart or where they are stored.
Spark is two things: 1) a set of programming tools for writing parallelizable, data crunching code, and 2) a framework for executing this code in parallel across many physical machines and CPUs. Spark supports code written in Java, Scala, and Python. Any library that can be used by these languages can also be used in a Spark "job". Furthermore, Spark comes with several libraries that implement parallel machine learning, graph processing, SQL-like querying of data, and data stream processing.
We have installed and configured HDFS and Spark on a cluster of machines known as the Decepticons. The Decepticons cluster is currently composed of 20 Dell R530 servers, plus two additional servers that serve as the Master and Secondary Master. All 20 machines are located on a private, 10 Gbps Ethernet network that is only accessible from achtung02-27.ccs.neu.edu. Once you have sshed into one of the Achtungs, the Decepticons are accessible as [name].ccs.neu.edu in the 192.168.5.* IP range.
Each server in the cluster is provisioned with either 32 or 40 CPU cores, 192 GB of RAM, 24 TB of hard drive space, and 10 Gbps Ethernet. In total, the cluster has 704 CPUs. The servers are running Ubuntu 20.04, Hadoop 3.2.1, and Spark 3.0.0. Common Python modules have been pre-installed. Should you require additional software tools, contact Prof. Wilson.
Setting Up Your Environment
In order to use HDFS and Spark, you first need to configure your environment so that you have access to the required tools. The easiest way to do this is to modify the .bashrc configuration file in your home directory on Achtung. Specifically, you should add the following lines to your .bashrc file:
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export HADOOP_HOME=/usr/local/hadoop
export PATH=$PATH:/usr/local/hadoop/bin:/usr/local/spark/bin
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native/
After modifying .bashrc, you should log out of Achtung and then back in to make the changes take effect. At this point, you should have access to the hdfs and spark-submit programs.
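If you want to confirm that the new settings took effect after logging back in, a short check along the following lines may help. This is only a sketch: the function name missing_setup is made up for illustration, and it assumes nothing beyond the variables and programs named above.

```python
import os
import shutil

# Variables and tools named in the .bashrc lines above.
REQUIRED_VARS = ("JAVA_HOME", "HADOOP_HOME")
REQUIRED_TOOLS = ("hdfs", "spark-submit")


def missing_setup(env=None, which=shutil.which):
    """Return a list of problems found; an empty list means the setup looks fine."""
    env = os.environ if env is None else env
    problems = [f"environment variable {name} is not set"
                for name in REQUIRED_VARS if not env.get(name)]
    path = env.get("PATH", "")
    problems += [f"{tool} was not found on PATH"
                 for tool in REQUIRED_TOOLS if which(tool, path=path) is None]
    return problems


if __name__ == "__main__":
    for problem in missing_setup():
        print("WARNING:", problem)
```

If the script prints nothing, both variables are set and both programs can be found on your PATH.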
How to Have Good Manners
Before discussing how to use the Spark cluster, we first need to discuss how to have good manners when using these resources. The fundamental point to keep in mind is that the Decepticons cluster is a shared resource, meaning that if you abuse it, you will have a direct, negative impact on your colleagues. Thus, it is important to plan your usage of the cluster carefully and conscientiously.
First: disk space. The roughly 60 TB of usable space available in HDFS is shared across all users of the cluster. The administrators reserve the right to delete your data from HDFS at any time if we find you are taking up too much space, and we are not responsible if you lose critical data due to negligence or hardware failures.
Second: CPU time. We have configured Spark to run jobs in FIFO order. This means that if you submit a job that takes hours to complete, nobody else can use the cluster until your job is finished. Thus, please be mindful when submitting large jobs to the cluster. If other students are working on a deadline and need cluster resources, be a good coworker and wait until they are done before starting large jobs. The administrators reserve the right to kill any job at any time, should the need arise.
In order to use Spark, you need to have a home directory on HDFS. Being able to log in to Achtung does not mean that you have a home directory in HDFS. If you would like to use Spark and you don't have a home directory in HDFS, email Prof. Wilson and a home directory will be created for you.
Most Spark jobs will be doing computations over large datasets. Thus, before you run a Spark job, the data should be moved onto the cluster's HDFS storage. To access HDFS, use the hdfs tool provided by Hadoop. We have preconfigured the hdfs tool so that it will automatically connect to the HDFS storage offered by the Decepticons cluster.
To see what files are stored in your user directory on HDFS, you can use the following command:
[cbw@achtung02 ~] hdfs dfs -ls .
Found 3 items
drwxr-xr-x   - cbw cbw          0 2014-09-03 15:35 .sparkStaging
-rw-r--r--   2 cbw cbw 4728744788 2014-09-03 11:24 a1_tweets.txt
drwxr-xr-x   - cbw cbw          0 2014-09-03 23:28 a1_word_counts
In this case, user cbw has three items in their home directory: two directories and one file. Notice how the hdfs utility takes -ls as a command line parameter; this tells it to list the files in an HDFS directory, just like the ls command lists files in a local directory.
Just like any other POSIX filesystem, the root directory in HDFS is /. Similarly, as shown above, all files and directories have 1) an owner:group, and 2) read/write/execute permissions for the user, group, and everybody. The following commands show examples of the root directory and /user directory on the HDFS storage:
[cbw@achtung02 ~] hdfs dfs -ls /
Found 3 items
drwxrwxrwx   - hduser supergroup          0 2014-09-03 23:27 /spark_logs
drwxrwxrwx   - hduser supergroup          0 2014-09-02 18:25 /tmp
drwxr-xr-x   - hduser supergroup          0 2014-09-03 13:17 /user
[cbw@achtung02 ~] hdfs dfs -ls /user
Found 3 items
drwxr-xr-x   - amislove amislove          0 2014-09-03 11:12 /user/amislove
drwxr-xr-x   - cbw cbw                    0 2014-09-03 23:27 /user/cbw
drwxr-xr-x   - choffnes choffnes          0 2014-09-03 11:12 /user/choffnes
In addition to -ls, the hdfs utility also includes versions of many common Unix file utilities. These commands include:
- -mv and -cp: Move and copy files on HDFS storage
- -mkdir: Make a directory on HDFS
- -rm and -rm -r: Delete files and directories from HDFS
- -chown and -chmod: Change owner:group and permissions of files on HDFS
In addition to these standard commands, the hdfs utility can also upload files from local storage into HDFS, and download files from HDFS into local storage:
- -put: Takes local file(s) and uploads them to the specified location on HDFS
- -get: Downloads file(s) from HDFS into the specified location on local storage
Other commands are also available; run hdfs dfs with no parameters to see a list of possible commands. Note that file name parameters to hdfs may contain wildcards (*) just like parameters on the Linux command line. Also note that the same file permission rules that apply on Linux also apply to files on HDFS: by default, you have total ownership of files in /user/[your name] on HDFS, but you only have read access to other people's files and the root directories. You can give people greater access to your files by changing their ownership, or by changing their permissions.
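If you find yourself issuing the same hdfs dfs commands repeatedly, you can also drive the tool from Python. The following is a minimal sketch that simply shells out to the hdfs program described above; the helper names hdfs_argv and hdfs are made up for illustration.

```python
import subprocess


def hdfs_argv(command, *args):
    """Build the argument list for: hdfs dfs -<command> <args...>"""
    return ["hdfs", "dfs", f"-{command}", *args]


def hdfs(command, *args):
    """Run an hdfs dfs command and return its standard output as text.

    Raises subprocess.CalledProcessError if the command exits with an error.
    """
    result = subprocess.run(hdfs_argv(command, *args),
                            check=True, capture_output=True, text=True)
    return result.stdout
```

For example, hdfs("put", "tweets.txt", "experiments/") would run hdfs dfs -put tweets.txt experiments/ (both paths are hypothetical), and print(hdfs("ls", ".")) would list your HDFS home directory. Because the arguments are passed as a list rather than through a shell, a wildcard such as *.txt reaches the hdfs tool unexpanded, so it is matched against files on HDFS rather than local files.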
Finally, a note on data formats. When files are uploaded to HDFS, they are automatically split up into smaller blocks and distributed throughout the cluster. When Spark later reads a text file, it divides the data into records based on line breaks ("\n"), even when a line happens to straddle two blocks. Thus, you should be able to upload any data that you have stored in simple text files to HDFS.
However, this style of processing is not well suited for binary data files. Since binary files are not organized around line breaks, reading them as text ends up splitting the binary data arbitrarily, which will probably produce meaningless records. The one exception to this rule is bzip2 compressed files: Hadoop does support splitting bzip2 compressed files in a way that preserves the integrity of the compressed data. In fact, you can directly load bzip2 compressed data into Spark jobs, and the framework will automatically handle decompressing the data on-the-fly.
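Since bzip2 is the one compressed format recommended above, you may want to compress large text files before uploading them. The sketch below uses only Python's standard bz2 module; the function names are made up for illustration, and running the bzip2 command line tool on Achtung would work equally well.

```python
import bz2
import shutil


def compress_to_bz2(src_path, dst_path=None):
    """Compress a local text file to bzip2 and return the output path."""
    dst_path = dst_path or src_path + ".bz2"
    with open(src_path, "rb") as src, bz2.open(dst_path, "wb") as dst:
        shutil.copyfileobj(src, dst)  # streams in chunks, so large files are fine
    return dst_path


def read_lines(bz2_path):
    """Yield decoded lines from a bzip2 file, e.g. to spot-check it before uploading."""
    with bz2.open(bz2_path, "rt", encoding="utf-8") as handle:
        for line in handle:
            yield line.rstrip("\n")
```

The resulting .bz2 file can then be uploaded with hdfs dfs -put like any other file.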
Spark is a framework for writing parallel data processing code and running it across a cluster of machines. Spark supports code written in Java, Scala, and Python. This tutorial will use examples written in Python; online resources are available for writing Spark code in Java and Scala. If you are interested in how we have configured Spark, you can examine the file /usr/local/spark/conf/spark-defaults.conf on Achtung.
Note that by default, all of the Achtung and Decepticon machines are configured to use Python 3.8. Spark is also configured to use Python 3.8. Python 2 is deprecated, and we urge all Spark users to adopt Python 3 as their baseline for Spark scripts.
Users can access the Decepticons Spark cluster from a special Spark command line, or by writing standalone programs/scripts. The Scala-based and Python-based Spark command lines are very similar to the command line offered by Python, and can be accessed using the following commands:
[cbw@achtung02 ~] spark-shell
[cbw@achtung02 ~] pyspark
In both cases, you should give the shell a minute or so to connect to the cluster and set up the necessary remote processes. Once you receive a command prompt, Spark can be accessed by using the predefined sc (Spark Context) object. We have pre-configured the Spark command line utilities to automatically connect to the Decepticons cluster and use 32 CPU cores/100 GB RAM per machine. Thus, you do not need to specify the --master when you run the command line, unless you want to test your code using a local instantiation of Spark (i.e. run your code on Achtung, instead of on the Decepticons). More information about the Spark command line is available from the Spark documentation.
The other way to access the Spark cluster is by writing your own code and submitting it to the Decepticons cluster. This is done using the following command:
[cbw@achtung02 ~] spark-submit [your_program]
By default, your job will be submitted to the Decepticons cluster (so you do not need to specify a --master) and will use all available CPU and RAM resources (so you do not need to specify the number of executors, memory per executor, cores per worker, etc.).
Example Spark Code
Spark's programming model is centered around Resilient Distributed Datasets (RDDs). An RDD is simply a bunch of data that your program will compute over. RDDs can be hard-coded, generated dynamically in-memory, loaded from a local file, or loaded from HDFS. The following example snippet of Python code gives four examples of RDDs:
from pyspark import SparkContext
import sys

if __name__ == "__main__":
    sc = SparkContext(appName="RDD_Examples")
    my_list = sc.parallelize([1, 2, 3, 4, 5])
    # sys.argv[1] is the first command line argument passed to the script
    my_other_list = sc.parallelize([x for x in range(0, int(sys.argv[1]))])
    tweets = sc.textFile("hdfs:///user/cbw/a1_tweets.txt")  # load data from HDFS
    words = sc.textFile("hdfs://megatron.ccs.neu.edu/user/cbw/words*.txt")  # load data from HDFS
Every Spark script includes a SparkContext object. This object connects to the Decepticons cluster and allows your code to be parallelized. In this example, the sc object is being used to create four RDDs, which can then be computed over. The first two RDDs are created in-memory, based on a static list and a dynamically generated list. The third and fourth examples load data from HDFS. Note the two different URL formats for loading data from HDFS: the former begins with a triple slash, and uses the default settings that we have preconfigured; the latter explicitly sets the URL. Both URL formats are equivalent.
There are several subtle things to notice about the last two examples shown in the above code. First, as shown in the fourth example, wildcards (*) are allowed. This will load data from all files that match the given pattern. Second, in both file loading cases, the data is treated as text and is split apart based on line breaks ("\n"). bzip2 compressed archives can also be loaded from HDFS.
Note that you must place your Spark initialization and RDD manipulation code inside Python's __name__ == "__main__" block. When you run a Spark job, your script gets copied to and executed on all of the Decepticons. Any code that is outside the "__main__" block will be executed on all 20 machines. Clearly, you don't want each machine to create a new SparkContext, attempt to open RDDs, initialize data processing, etc. You only want those steps to occur on the driver, which is Achtung. Thus, take care to protect sensitive regions of your code by placing them in "__main__", to ensure that the given code will only execute on Achtung.
Once your code has loaded one or more RDDs, Spark provides various functions for manipulating, transforming, filtering, and aggregating the data. The following example does a filtered word-count on a file of JSON-encoded tweets and saves the results to HDFS. Note that the output of a Spark job is a directory full of partial results, not a single file containing all results.
import json, operator
from pyspark import SparkContext

if __name__ == "__main__":
    def json_to_words(tweet):
        # Return the words in the tweet's text, or an empty list if the line is unusable
        try:
            d = json.loads(tweet)
        except ValueError:
            return []
        if 'text' in d:
            return d['text'].split()
        return []

    sc = SparkContext(appName="TweetWordCount")
    tweets = sc.textFile("hdfs://megatron.ccs.neu.edu/user/cbw/a1_tweets.txt")
    counts = tweets.flatMap(json_to_words) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(operator.add) \
        .filter(lambda pair: pair[1] >= 10)  # pair is (word, count)
    counts.saveAsTextFile("hdfs://megatron.ccs.neu.edu/user/cbw/a1_word_counts")
    sc.stop()
As the example shows, data processing proceeds as a series of steps, where each step performs some action on the data. The actions themselves can be specified using custom functions (e.g. json_to_words above), Python lambda functions, or common operator functions from Python's standard library (e.g. operator.add above). In the previous example, the different data processing steps are:
- flatMap: Takes a single object and transforms it into a list of objects
- map: Takes a single object and transforms it into another single object
- reduceByKey: Combines together all objects that share the same key
- filter: Removes objects based on a given true/false condition
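To make the behavior of each step concrete, the following sketch imitates the same pipeline on an ordinary Python list, without Spark. The helpers flat_map and reduce_by_key are made-up stand-ins that mimic what the corresponding Spark functions compute; they are not part of Spark, and the three sample tweets are invented for illustration.

```python
import json
import operator
from collections import defaultdict
from functools import reduce


def flat_map(func, items):
    """Like flatMap: apply func to each item and concatenate the resulting lists."""
    return [out for item in items for out in func(item)]


def reduce_by_key(func, pairs):
    """Like reduceByKey: combine all values that share a key using func."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return [(key, reduce(func, values)) for key, values in groups.items()]


def json_to_words(tweet):
    """Return the words of the tweet's text, or an empty list for unusable lines."""
    try:
        d = json.loads(tweet)
    except ValueError:
        return []
    return d["text"].split() if isinstance(d, dict) and "text" in d else []


tweets = ['{"text": "spark is fast"}', 'not json', '{"text": "spark is fun"}']
words = flat_map(json_to_words, tweets)               # one flat list of all words
pairs = [(word, 1) for word in words]                 # map: word -> (word, 1)
counts = reduce_by_key(operator.add, pairs)           # sum the 1s for each word
frequent = [pair for pair in counts if pair[1] >= 2]  # filter on the count
print(frequent)  # [('spark', 2), ('is', 2)]
```

The important difference is that Spark performs each of these steps in parallel across the cluster, on partitions of the data, rather than on a single list in memory.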
Spark offers a large number of data processing functions, which you can read about in the Spark programming guide. The Spark documentation also provides a number of examples in various languages. Finally, Spark comes with several higher-level data processing libraries, including:
- Spark SQL: a language for running distributed, SQL-like data queries
- Spark Streaming: functions for working with streaming (live) data
- Spark MLlib: distributed machine learning algorithms
- Spark GraphX: functions for graph processing
Monitoring Your Jobs and Debugging
Because Spark is a distributed computation platform, understanding the execution of your Spark jobs is more complicated than debugging a single-threaded, local process. When you run a Spark job or open a Spark command-line, it automatically spawns a driver that runs on Achtung, as well as executors that run your program in parallel on the Decepticon nodes. You can override this default behavior and execute your job locally, on Achtung, by changing the Spark configuration to use "local" as the master, like this:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("")
sc = SparkContext(conf=conf)
Note that this functionality is meant to help you debug your code: only load a small subset of your data when you debug locally on Achtung!
The driver on Achtung automatically spawns a webserver that you can visit to see stats about your program. This webserver runs on whichever Achtung is running the driver; for example, if you run the driver on achtung02, then the URL will be http://achtung02.ccs.neu.edu:4040/. However, note that the Achtungs are hidden behind achtung-login, so you will need to use an SSH tunnel to browse sites running on achtung02-11. Furthermore, this web server is only available as long as your driver is running; if your program closes, crashes, or is killed, the web server will also go offline. Finally, note that if multiple users are running Spark jobs simultaneously, your web server may spawn on a different port in the 404* range. The exact address of your driver's web server is displayed to the console when you run Spark jobs.
There is a command line utility that allows you to check on the status of the Spark Cluster, including how many machines are alive, what jobs are currently running or waiting to run, and historical jobs that have completed.
[cbw@achtung02 ~] spark-status
This utility has several command line options; to learn more pass in the --help parameter.
In addition to the web server associated with your program's driver, there are also three web servers that are always available that report stats on the Decepticons cluster. Two of these servers are running on megatron.ccs.neu.edu, which is not accessible from the public internet; you must set up a tunnel through Achtung in order to view these sites. One way to do this is to set up an ssh tunnel (ssh -D [port] achtung-login.ccs.neu.edu) and then configure your browser to use localhost:[port] as a SOCKS proxy.
Once you have a tunnel through Achtung, the following three sites become accessible:
- Ganglia: This site monitors the overall health of the Achtungs and the Decepticons cluster, including overall CPU, RAM, and disk utilization, as well as utilization per machine.
- Spark Master: This site lists all the Spark jobs that are currently running or have run recently. All of the standard output/error logs from Spark jobs are available here, which are very useful for debugging your code.
- HDFS Health: This site gives statistics about HDFS, including how much space is being used, lists of files, etc.
Managing Memory Usage in Spark
One of the most challenging things about writing and optimizing Spark code is managing Spark's use of memory. By default, Spark places the results of RDD computations into an in-memory cache. As the cache fills up, Spark uses an LRU policy to evict old data. This default policy works fine for computations on relatively small datasets; however, for large datasets, or scripts with deep data pipelines using heavyweight operations (e.g. flatMap(), union(), cogroup(), or cartesian()), Spark can quickly run out of memory. If Spark runs out of memory and discards useful data, it will attempt to regenerate that data on demand by re-executing portions of your code. In the Spark monitoring websites, this behavior often manifests itself as failed tasks that are repeatedly re-executed. If you see this behavior, it means your code is running out of cache memory and will probably never complete.
The way to solve cache memory related issues is to explicitly modify the persistence of RDDs. Suppose you have an RDD named rdd. The default persistence level, which is used by rdd.cache() and by rdd.persist() with no arguments, is StorageLevel.MEMORY_ONLY. You can modify the persistence level of this RDD by calling either rdd.persist(StorageLevel.MEMORY_AND_DISK) or rdd.persist(StorageLevel.DISK_ONLY) instead. Obviously, DISK_ONLY is the slowest option since it forces all I/O to be disk-bound. However, for extremely large RDDs this may be the best option overall: forcing a large RDD to be placed on disk frees up cache memory for other, smaller RDDs, rather than forcing the small and large RDDs to compete for space in the LRU cache.
Lastly, note that you can force Spark to discard an RDD by calling rdd.unpersist(). This is useful when you know you will not be using a particular RDD any more.
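As a rough illustration of the advice above, the following sketch picks a persistence level from a size estimate. The function names and the thresholds (half of the cache budget, and the full budget) are made up for illustration; they are not tuned values for the Decepticons cluster, and you would need to supply your own size estimates.

```python
def choose_storage_level(rdd_size_gb, cache_budget_gb):
    """Pick a StorageLevel name from a rough size estimate (hypothetical rule of thumb)."""
    if rdd_size_gb <= 0.5 * cache_budget_gb:
        return "MEMORY_ONLY"        # small: keep it in the in-memory cache
    if rdd_size_gb <= cache_budget_gb:
        return "MEMORY_AND_DISK"    # borderline: spill to disk when memory is tight
    return "DISK_ONLY"              # very large: keep it out of the cache entirely


def persist_with_estimate(rdd, rdd_size_gb, cache_budget_gb):
    """Persist rdd at the chosen level; call rdd.unpersist() once it is no longer needed."""
    # Imported here so that choose_storage_level can be used without Spark installed.
    from pyspark import StorageLevel
    level = getattr(StorageLevel, choose_storage_level(rdd_size_gb, cache_budget_gb))
    return rdd.persist(level)
```

For example, persist_with_estimate(big_rdd, 500, 100) would place a hypothetical 500 GB RDD on disk, leaving the in-memory cache available for smaller RDDs.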
A Note on User Defined Functions (UDFs) in Spark
Recent versions of Spark have begun to move away from the low-level RDD interface to a higher-level, more Pandas-style interface that defines dataframes and operations over them. Many of these operations are like SQL commands, and just like SQL, the programmer can define new functions that can then be invoked over the dataframes.
While this functionality is quite powerful, it's also a bit finicky to get working. If you plan on using UDFs, note that you should include the following in your script to avoid receiving strange exceptions:
import findspark
from pyspark.sql import SparkSession
findspark.os.environ["PYSPARK_PYTHON"]="python"
spark = SparkSession.builder.appName("YOUR APPNAME").config("PYSPARK_PYTHON","python").getOrCreate()
This snippet helps Spark find the correct Python executable to run your UDF. Hat tip to Kenny Joseph and Nir Grinberg for figuring this out.
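For readers who have not written a UDF before, here is a minimal sketch of what one might look like. The function count_words, the helper add_word_counts, and the column names text and word_count are all made up for illustration; the sketch assumes a dataframe with a string column.

```python
def count_words(text):
    """Plain Python function that Spark will call once per row; None stands for SQL NULL."""
    return 0 if text is None else len(text.split())


def add_word_counts(df, column="text"):
    """Return df with an extra integer column holding count_words(column)."""
    # Imported here so that count_words can be tested without Spark installed.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType

    count_words_udf = udf(count_words, IntegerType())
    return df.withColumn("word_count", count_words_udf(df[column]))
```

Keeping the logic in an ordinary function like count_words makes it easy to test on Achtung before wrapping it as a UDF and running it on the cluster.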
Python Utilities for HDFS
Once you get comfortable with Spark, you will quickly realize that you need to spend a lot of time managing files in HDFS. For example, whenever you run a Spark job, the results get placed in a folder in HDFS. However, if that folder already exists your job will fail (i.e. Spark will not overwrite existing directories or files). Thus, it may be convenient for your Spark scripts to automatically remove existing directories from HDFS. Furthermore, once your Spark job completes, you will need to copy the results back to Achtung; it is convenient if this task can be automated as part of your Spark script.
To simplify managing HDFS, we have installed the Python pydoop module on Achtung and the cluster. Pydoop gives you programmatic access to HDFS; anything you can do with the hdfs dfs command line you can do with pydoop. The following code is an example Spark script that uses pydoop to 1) clear existing results out of HDFS before the job is run, and 2) copy the results to local storage after the job completes. More information on pydoop HDFS methods can be found here.
from pyspark import SparkContext
import hdfs
import shutil, os

if __name__ == "__main__":
    client = hdfs.client.InsecureClient('http://megatron:9870')

    # Remove old results from HDFS
    try:
        client.delete('/user/cbw/join_results', recursive=True)
    except:
        pass

    # Remove old results from local storage
    try:
        shutil.rmtree('join_results')
    except:
        pass

    sc = SparkContext(appName="JoinTest")
    list1 = sc.parallelize([x for x in range(0,101)])
    list2 = sc.parallelize([str(x) for x in range(1000,1101)])
    list1.cartesian(list2).saveAsTextFile("hdfs://megatron.ccs.neu.edu/user/cbw/join_results")
    sc.stop()

    # Copy the results to local storage
    client.download('/user/cbw/join_results', 'join_results')  # (remote path, local path)
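Because the output of a Spark job is a directory of partial results, the downloaded join_results folder will contain files named part-00000, part-00001, and so on. If you would prefer a single file, a sketch like the following can concatenate them; the function name merge_parts is made up for illustration.

```python
import glob
import os
import shutil


def merge_parts(results_dir, merged_path):
    """Concatenate results_dir/part-* (in name order) into merged_path.

    Returns the number of part files that were merged.
    """
    part_files = sorted(glob.glob(os.path.join(results_dir, "part-*")))
    with open(merged_path, "wb") as merged:
        for part in part_files:
            with open(part, "rb") as handle:
                shutil.copyfileobj(handle, merged)
    return len(part_files)
```

For example, merge_parts('join_results', 'join_results.txt') would combine the downloaded parts into one local file. Other files in the directory, such as Spark's _SUCCESS marker, are ignored because they do not match the part-* pattern.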
Useful Python Tricks
It's very common to use Python modules like re and json in Spark jobs, but these modules are not optimized for speed. There are drop-in replacement modules (re2 and ujson) that have the same APIs, but are natively compiled and thus much, much faster than the originals. For example, the following small changes can dramatically increase the speed of your jobs if you use re or json:
import ujson as json
import re2 as re