Apache Sparkで即席データベース作成&分析

Spark SQLというのがあります.
Spark SQL & DataFrames | Apache Spark
これがまた,速度も機能も凄まじいのです.
リレーショナルデータベースなしで,即席でそれっぽいことができてしまいます.

こんな感じの巨大なファイルがあるとします(HadoopHDFS上に).

$ hdfs dfs -cat sample.log
ts loc
2014-08-06T17:41:09+09:00 HK
2014-08-06T17:41:10+09:00 HK
2014-08-06T17:41:11+09:00 HK
2014-08-06T17:41:12+09:00 MO
2014-08-06T17:41:13+09:00 HK
2014-08-06T17:41:15+09:00 TW
2014-08-06T17:41:15+09:00 TW
2014-08-06T17:41:15+09:00 HK
2014-08-06T17:41:16+09:00 HK
2014-08-06T17:41:16+09:00 SG
2014-08-06T17:41:16+09:00 US

海外からのアクセスを,国ごとにログに落とした例です.
これを国ごとにアクセス数を出したい場合.

ファイルサイズが小さければ,シェルではこんな感じ?

$ cat sample.log | awk -F '\t' '{ sum[$2]+=1}END{ for(i in sum) print i" -> "sum[i]}'
HK -> 6
MO -> 1
US -> 1
loc -> 1
TW -> 2
SG -> 1

今回はレコード数というかファイル容量がテラなので,Sparkを使います.

起動.

./bin/spark-shell --master yarn-client --executor-cores 2 --num-executors 4

準備.

// 決まり文句
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 決まり文句
scala> import sqlContext._
// 用途に応じて.これは別にSparkじゃなくてscalaの構文なので,scalaコマンドで練習すると良いです
scala> case class AccessCount(time:String, loc:String)
// ファイル名は,ワイルドカードも使えます
scala> val table = sc.textFile("hdfs://mycluster/user/hadoop/sample.log").map(_.split("\t")).map(q=>AccessCount(q(0),q(1))
// 用途に応じて
scala> table.registerAsTable("accessCount")
// カスタマイズして下さい
scala> val country=sql("select loc,count(loc) as c from accessCount group by loc order by c")
// ここでSparkが実行されますので,データ量が大きいときはチェックを!
scala> country.map(t=>"Country:"+t(0)+" -> " + t(1)).collect().foreach(println)

Country:'PH' -> 1257
Country:'MY' -> 2383
Country:'DE' -> 2617
Country:'SG' -> 5244
Country:'CA' -> 7003
Country:'AU' -> 7767
Country:'GB' -> 7883
Country:'KR' -> 8865
Country:'MO' -> 15195
Country:'US' -> 36241
Country:'TW' -> 67860
Country:'HK' -> 637678

簡単.瞬速.データベース・マネージメント・システムを葬る恐るべきSpark.