SparkRをEC2上で動かして分散処理してみる
高柳 慎一
こんにちは!美味しいコーヒーを飲むために、毎朝早くにデスクでコーヒーミル回してます、アナリティクスチームの高柳です。
アナリティクスチームでは、じゃらんnetやホットペッパービューティーをはじめとしたリクルートライフスタイルのサービスに対して、基礎集計やレポーティング、また、データマイニング(データ分析)を活用し、高速にサービスを改善していくというミッションを担っています。
本記事では、データ分析環境としてのApache Spark、特にver 1.4から利用可能になったSparkRを、当チームのAWS上の分析環境に導入検討していたので、その辺について書きたいと思います。
SparkRは、まだリリースから日が浅いことから、日本語の記事が相当に少ないので、この記事がみなさんの"Happy SparkR ライフ"のお役に立つことを願っております。
Apache Spark導入の背景
サービスを改善するために、俗にいうアドホックな分析をするケースが多々あるのですが、その際、分析・レポーティング作業のPDCAサイクルを、より高速により多くのデータで行うための環境が欲しいと考えていました。
Sparkでは、分散処理技術に加えて、インメモリのキャッシング技術を実装することで高速処理が可能になっており、さらに、メモリ上で同じデータに対して異なるデータ分析/機械学習の手法適用を繰り返し処理したり、データの縦・横持ち変換や欠損の処理などのデータの前処理を繰り返し行う用途にも向いているということのなので、導入検討を開始しました。
最近アナウンスがあった、
【AWS発表】Elastic MapReduceリリース4.0.0がアプリケーションのアップデートと共に利用可能に
を見ると、AWSコンソールのAmazon EMRからSparkのクラスタをGUI経由で簡単に起動できるようになっており、主にサイト改善への施策提言をデータ・ドリブン(データ側から考えるという意味で、ボトムアップ・アプローチとも呼んでいます)な視点から行うデータアナリストの同僚にも簡単にSpark上での分析環境(Spark Sharkの使用を想定)を展開できそうな点も要注目なポイントでした。
始める前に
本記事では、Apache Sparkの環境をAWSのEC2上に構築するため、あらかじめAWSのアカウントを作成しておくほか、環境変数としてアクセスキー・シークレットアクセスキーを設定しておく必要があります。
1
2
export AWS_ACCESS_KEY_ID=<YOUR_AWS_ACCESS_KEY_ID>
export AWS_SECRET_ACCESS_KEY=<YOUR_AWS_SECRET_ACCESS_KEY>
(<>にご自身のキーを入力してください)
ここでは、AWSのEC2上に一台サーバー(OS: Amazon Linux)を立て、そこからSparkが使用できる環境を構築します。以下の操作はそのEC2インスタンス上から実行しています。
SparkをEC2上に導入する
まず、Sparkの公式サイトからPre-buildされた圧縮ファイルを取得し、それを解凍&解凍したディレクトリに移動します。
1
2
3
4
5
6
# ミラーサイトからsparkをダウンロード
wget http://ftp.kddilabs.jp/infosystems/apache/spark/spark-1.4.1/spark-1.4.1-bin-without-hadoop.tgz
# 解凍&移動
tar zxvf spark-1.4.1-bin-without-hadoop
cd spark-1.4.1-bin-without-hadoop/ec2
次に、AWSで起動するために以下のようにオプションを設定したうえで、起動スクリプトを実行します。みなさんのAWS環境で試される際には、<>内の値を自身の環境・設定に応じて適宜読み替えてください。
1
./spark-ec2 --key-pair=<key-pair> --identity-file=<identity-file> --slaves 4 --vpc-id=<vcp-id> --subnet-id=<subnet-id> --region=ap-northeast-1 --zone=ap-northeast-1c --private-ips --delete-groups --additional-security-group=<sg-id> launch MySparkR
この後、master/slaveとなるEC2のプロビジョニングが実行されます。プロビジョニングには10~20分程度かかるので、美味しいコーヒーでも飲みながら気長に待ちましょう。
ここで設定しているオプションについて、いくつか言及すると、
- アナリティクスチームのAWS環境はプライベートサブネット上に構築しているため、
--private-ips
を使用(VPC、subnetも該当するidを設定) - デフォルトの設定ではmaster nodeにssh出来るようにならなかったため、
--additional-security-group
でssh出来るように設定したセキュリティグループを追加 - リージョンはap-northeast-1
- slave nodeは4つ作成
MySparkR
という名前でクラスタを作成
という設定をしています。
master nodeへのログイン
master nodeへログインする際には、同様にspark-ec2
を使い、
1
2
# master nodeにログイン
./spark-ec2 -k <key-pair> -i <identity-file> --region=ap-northeast-1 --private-ips login MySparkR
とします。
データを用意する
せっかくAWSを使っているので、AWSのS3に分析・集計対象としたいファイルを上げておく……のが通常運用としては良さそうですが、ここでは、
Data Expo 2009でコンテストの題材とされた米国のフライトデータ
をデータとして使用しました。(2006年~2008年のデータを使っています。合計は3GB程度とSparkで扱うには小さいです。)
これを適当にダウンロード&解凍させ、1ファイルに繋げたのち、/root/data/airline.csv
というファイル名で保存しておきました。
1
2
3
4
5
6
7
8
# 結果を一部だけ抜粋
~$head airline.csv
Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
2006,1,11,3,743,745,1024,1018,US,343,N657AW,281,273,223,6,-2,ATL,PHX,1587,45,13,0,,0,0,0,0,0,0
2006,1,11,3,1053,1053,1313,1318,US,613,N834AW,260,265,214,-5,0,ATL,PHX,1587,27,19,0,,0,0,0,0,0,0
2006,1,11,3,1915,1915,2110,2133,US,617,N605AW,235,258,220,-23,0,ATL,PHX,1587,4,11,0,,0,0,0,0,0,0
2006,1,11,3,1753,1755,1925,1933,US,300,N312AW,152,158,126,-8,-2,AUS,PHX,872,16,10,0,,0,0,0,0,0,0
2006,1,11,3,824,832,1015,1015,US,765,N309AW,171,163,132,0,-8,AUS,PHX,872,27,12,0,,0,0,0,0,0,0
そして、作成したこのファイルをHDFSにロードしておきます。
1
2
3
4
5
6
7
8
9
# HDFS上にディレクトリを作成
cd /root/ephemeral-hdfs/bin
./hadoop fs -mkdir /root/data/
# ファイルをHDFSへとロード(元のデータはmasterの/root/dataにいれておく)
./hadoop fs -put /root/data/airline.csv /root/data/airline.csv
# verify that is worked by printing the file contents
./hadoop fs -tail /root/data/airline.csv
SparkRを起動する
さて、SparkRを起動してみましょう。
1
2
3
# sparkRの起動
cd /root/spark/bin
./sparkR --packages com.databricks:spark-csv_2.10:1.0.3
引数の--packages com.databricks:spark-csv_2.10:1.0.3
は、後述するcsvファイルを読み込む際に必要になるライブラリを追加するオプションです。
これで、slaveへの分散も含めてよしなにやってくれるSparkR環境が立ち上がりました!
あとは、ガツンと分析・集計用のコードを実行させるだけです。
SparkRで処理を記述する
ここでは、作成したエアライン(Airline)データに対して、
- ジョン・F・ケネディ国際空港(JFK)発の飛行機の、目的地別の発着数
を計算してみましょう。(magrittrパッケージを使っているのは、趣味の問題であり、必須ではありません。)
1
2
3
4
5
6
7
8
9
10
11
12
13
# magrittrパッケージのインストール
install.packages("magrittr")
library(magrittr)
# HDFSからcsvファイルをロード
airline <- read.df(sqlContext, "/root/data/airline.csv", "com.databricks.spark.csv", header="true")
# ジョン・F・ケネディ国際空港(JFK)発の飛行機の目的地別の発着数(の一部)
airline %>%
filter(airline$Origin == "JFK") %>%
group_by(airline$Dest) %>%
agg(count=n(airline$Dest)) %>%
head
結果は以下のようになりました。このように簡単に集計用のコードを実行することができます。
1
2
3
4
5
6
7
8
Dest count
1 IAH 3202
2 TUS 592
3 STL 2281
4 CMH 3616
5 MSP 3421
6 STT 1128
...(以下省略)...
SparkRを終了する
SparkRからは、通常、Rを終了するのと同様にquit()
でOKです。その後、master nodeからexit
し、以下のコマンドを打てばSparkに関連するインスタンスを終了、または削除できます。
1
2
3
4
5
# 起動終了
./spark-ec2 --region=ap-northeast-1 stop MySparkR
# 起動終了(インスタンスごと削除)
./spark-ec2 --region=ap-northeast-1 destroy MySparkR
おまけ:SparkRextで"モダン"なRの書き方を体験する
普段、分析でR、特にデータハンドリングにdplyrパッケージを使われている方は、このSparkRの
1
2
3
4
5
airline %>%
filter(airline$Origin == "JFK") %>%
group_by(airline$Dest) %>%
agg(count=n(airline$Dest)) %>%
head
というコードのairline$Origin, airline$Dest
の部分がイケてないなと思われるかと思います。ここをより美しく書き換えるために、Rに搭載されているの"遅延評価"の機能を活用することができます。
SparkRに対して、この遅延評価を活用し、よりスマートにコードを書くことができるようにSparkRを拡張するパッケージがGithub上で開発されています。
このパッケージを用いることで、先ほどのコードは、
1
2
3
4
5
airline %>%
filter(Origin == "JFK") %>%
group_by(Dest) %>%
summarize(count=n(Dest)) %>%
head
と書くことができ、より美しいモダンなRの書き方で分散処理を実行することができます!
……が、Sparkに組み込まれているRが3.1.1であり、dplyrに非対応(dplyrは3.1.2から使用可能!)であるため、SparkRextをEC2で簡単に分散環境で実行するのはもう少し先のことになりそうです。(ローカル環境では使用可能です。)こちらも期待大です!
まとめ
本記事では、AWS上にSparkの環境を構築するところから始め、SparkRというRのコードをSpark上で分散処理させるための方法について紹介しました。
「Sparkをローカルに導入して、Rを動かす」記事はいくつかありますが、実際にEC2上で複数のインスタンスを起動して、分散処理する内容に踏み込んだ記事はあまりないように思うので、この記事が、AWS上にSpark環境構築される際や、あるいはR言語ユーザのみなさんのSpark自身へのとっかかりになれば幸いです。
私が担当する次回以降の記事では、
- 実際に行った分析の事例紹介
- データ分析手法・環境の紹介
- データ分析用のインハウス・ライブラリ開発方法
などについて投稿していく予定です。
また、アナリティクスチームの仲間からも、最先端の分析手法や分析・基盤ノウハウ、外部カンファレンスの事例など共有していく予定ですので、お楽しみに!