Skip to content

Spark AWS Exercise2

Jason Baldridge edited this page Apr 8, 2013 · 8 revisions

Introduction

The previous exercise provides an end-to-end guide to interactively computing word counts on a file on HDFS using Spark, but it doesn't cover writing your own precompiled code or working files that are actually large. This exercise takes us to the next step: running standalone jobs on an EC2 cluster for larger files.

Note: I will use the address for a cluster I obtained for testing this exercise out. You should of course substitute your cluster's address as appropriate.

Launch a cluster and compute pi on it

See the previous exercise as a reminder for how to launch a cluster. This time, request four worker nodes.

Before, we only ran Spark using the Spark interactive shell. However, we generally need to run standalone jobs on the cluster from the command line. To start this, let's run the example spark.examples.SparkPi, which computes the value of pi.

Login to your cluster, go to the spark directory, and use the run command to run the main method of SparkPi.

[root@ip-10-169-18-49 spark]# ./run spark.examples.SparkPi
Usage: SparkPi <master> [<slices>]

The first (and only required) argument is the URL of the master node. Spark stores this in the file /root/spark-ec2/cluster-url. Cat that to get the URL.

[root@ip-10-159-62-80 spark]# cat /root/spark-ec2/cluster-url 
spark://ec2-54-242-12-195.compute-1.amazonaws.com:7077

We can supply that as the argument to SparkPi.

[root@ip-10-159-62-80 spark]# ./run spark.examples.SparkPi spark://ec2-54-242-12-195.compute-1.amazonaws.com:7077

You'll see lots of output, and if everything went well, you'll have an estimate of the value of pi as one of the last lines of output.

Run standalone jobs

Now let's compile our own code and run it on the cluster. Let's use the code in the spark-tutorial repository. Clone the repository, then build the code.

[root@ip-10-169-18-49 ~]# git clone git://github.com/jasonbaldridge/spark-tutorial.git
[root@ip-10-169-18-49 spark-tutorial]# cd spark-tutorial
[root@ip-10-169-18-49 spark-tutorial]# ./build package

We need to put a file on HDFS to work with.

root@ip-10-169-18-49 rawd]# ~/ephemeral-hdfs/bin/hadoop fs -mkdir input
[root@ip-10-169-18-49 spark-tutorial]# ~/ephemeral-hdfs/bin/hadoop fs -copyFromLocal ~/spark/README.md input

Now boot up SBT and run the main method of the word count program.

root@ip-10-169-18-49 spark-tutorial]# ./build 

root@ip-10-159-62-80 spark-tutorial]# ./build 
> run-main sparktutorial.WordCount spark://ec2-54-242-12-195.compute-1.amazonaws.com:7077 /root/spark target/scala-2.9.2/spark-tutorial_2.9.2-0.1.jar hdfs://ec2-54-242-12-195.compute-1.amazonaws.com:9000/user/root/input/README.md hdfs://ec2-54-242-12-195.compute-1.amazonaws.com:9000/user/root/readme-counts

Check the Spark web UI to see how the job and nodes are doing.

http://ec2-54-242-12-195.compute-1.amazonaws.com:8080/

You'll see that the WordCount program has been run in the "Completed Applications" part of the page, though it was such a trivial job you won't have seen the worker nodes getting fired up. We need more data to do that, so let's do that next.

Create EBS Volume with Wikipedia WEX data and attach it to the cluster

Amazon has public datasets available. This matters because it costs money to upload data to Amazon storage, to keep it there, to transfer it, etc. Even with free files, you need to make sure you are running instances in the same region as where the datasets are held.

Here, we'll work with Freebase's Wikipedia Extraction (WEX) dataset, which has all the articles from Wikipedia from mid-2009.

To get access to the WEX data, we need to create an EBS volume with the snapshot and attach it to our cluster. This is actually quite straightforward to do. To create the volume, go to the volumes screen, and select the "Create Volume" button at the top of the screen. Fill out the information:

  • Volume type: select "Standard"
  • Size: The WEX data requires 66GB, but grab 100GB for this volume so that there is some space to create and work with subsets of the data.
  • Availability zone: Use the same region as where your cluster is, e.g. if your cluster is on us-east-1b, then the EBS volume must be on us-east-1b (and not us-east-1a, etc) in order to be attached to your cluster.
  • Snapshot: choose snap-1781757e WEX (Linux)

The EBS volume we created could be attached to any of several EC2 instances you might have running. To attach a volume means is that you are making the data on the volume available on the file system for an EC2 instance. We can then do all the standard stuff one does with files, including putting them from the node to HDFS.

So, let's attach the volume we created above to our master node. Look at your instances page and find which node is the master node of your cluster---if you look at the "Security Groups" column, it will have the label SparkTutorial-master. Next, back in the volumes screen, right click on the EBS volume that you created with the WEX snapshot, and then choose "Attach Volume" and select the instance label of the master node from the drop down menu. This makes the volume available on the master node, usually as device /dev/xvdf (but look at the messages on the AWS interface to see if it might be another device). We can now mount that device on the file system as follows.

[root@ip-10-169-18-49 dev]# mkdir /mnt/wex
[root@ip-10-169-18-49 dev]# mount /dev/xvdf /mnt/wex

That's all you need for now, but if you want more information, there are many more details in this AWS article about EBS.

Attach the volume to the master node and put data on HDFS

Now, go to the directory to inspect the data.

[root@ip-10-169-18-49 ~]# cd /mnt/wex/
[root@ip-10-169-18-49 wex]# ls
fixd  freebase_wex.icss.yaml  log  lost+found  pkgd  rawd  README  README-credits  README-dirs  README-license  ripd  tmp

The file we are most interested in is /mnt/wex/rawd/freebase-wex-2009-01-12-articles.tsv, which has all the pages of Wikipedia from 2009, one per line. Each line has five fields, separated by tabs:

  • page id
  • page title
  • last date the page was modified
  • XML version of the page
  • plain text version of the page

For more information, see the full documentation for WEX.

Here are the first three columns of the head of the file.

[root@ip-10-169-18-49 rawd]# head freebase-wex-2009-01-12-articles.tsv | awk -F "\\t" '{print $1"\t"$2"\t"$3}'
12	Anarchism	2008-12-30 06:23:05
25	Autism	2008-12-24 20:41:05
39	Albedo	2008-12-29 18:19:09
290	A	2008-12-27 04:33:16
303	Alabama	2008-12-29 08:15:47
305	Achilles	2008-12-30 06:18:01
307	Abraham Lincoln	2008-12-28 20:18:23
308	Aristotle	2008-12-29 23:54:48
309	An American in Paris	2008-09-27 19:29:28
324	Academy Award	2008-12-28 17:50:43

The XML and plain text of the articles are obviously quite large, so I won't show them here.

So, even though our cluster can now manage the full data set, let's cut it down a bit to allow the exercise to move more quickly. A side note: it isn't ideal to be working with plain text since it is uncompressed and the larger the file, the longer it takes to transfer from the local filesystem to HDFS. But, for present purposes, this makes other things easy, so we'll stick with plain text.

To grab the first 50,000 articles, and getting just the plain text and not the XML, do the following.

root@ip-10-169-18-49 rawd]# head -50000 freebase-wex-2009-01-12-articles.tsv | awk -F "\t" '{print $1"\t"$2"\t"$3"\t"$5}' > subset.tsv

This takes about 1-2 minutes and creates a file that is 363MB. Now put it on HDFS.

[root@ip-10-169-18-49 rawd]# ~/ephemeral-hdfs/bin/hadoop fs -copyFromLocal subset.tsv input

You now have data---time to play!

First, let's use the Spark shell on the cluster. Previously, we were just using Spark on the local machine: though it had access to HDFS, it did not spread the tasks out to the workers and instead computed everything on the master node. To kick the Spark shell off on the cluster, call it as follows:

[root@ip-10-169-18-49 spark]# MASTER=spark://ec2-54-242-12-195.compute-1.amazonaws.com:7077 ./spark-shell

Once everything is booted up, go into paste mode and put in the following code:

scala> :paste // Entering paste mode (ctrl-D to finish)

val wex = sc.textFile("hdfs://ec2-54-242-12-195.compute-1.amazonaws.com:9000/user/root/input/subset.tsv")
val wexCounts = wex
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
wexCounts.saveAsTextFile("hdfs://ec2-54-242-12-195.compute-1.amazonaws.com:9000/user/root/wex-counts")

// Exiting paste mode, now interpreting.

The output is now in three separate files.

root@ip-10-169-18-49 spark]# ~/ephemeral-hdfs/bin/hadoop fs -ls wex-counts
Found 3 items
-rw-r--r--   3 root supergroup   13532440 2013-04-08 17:19 /user/root/wex-counts/part-00000
-rw-r--r--   3 root supergroup   13529297 2013-04-08 17:19 /user/root/wex-counts/part-00001
-rw-r--r--   3 root supergroup   13519273 2013-04-08 17:19 /user/root/wex-counts/part-00002

As an exercise, get these as a single file on the local machine with a single command.

Exercise: I was sloppy with how I computed word counts, given the format of the WEX file. Fix this and recompute the counts.

Exercise: Adapt the WordCount.scala code so that it works with the WEX data, run it as a stand-alone program to get the word counts.

Exercise: Work with the Google Books Ngrams data. This can be done either via EBS or directly from the Google Books Ngrams S3 bucket (see the AWS page for the Ngrams data). Write a program that computes the histogram of counts per year for a given word.

Exercise: Use the XML version of the pages and create a link graph between pages based on the links and the page titles. (E.g. such that you could run PageRank.)

Wrap up

Make sure to delete unneeded files and volumes, which otherwise will cost you. Detach the EBS volume you created for the WEX snapshot, and delete it.

Clone this wiki locally