Apache Sparkで即席データベース作成&分析
Spark SQLというのがあります.
Spark SQL & DataFrames | Apache Spark
これがまた,速度も機能も凄まじいのです.
リレーショナルデータベースなしで,即席でそれっぽいことができてしまいます.
こんな感じの巨大なファイルがあるとします(HadoopのHDFS上に).
$ hdfs dfs -cat sample.log
ts loc
2014-08-06T17:41:09+09:00 HK
2014-08-06T17:41:10+09:00 HK
2014-08-06T17:41:11+09:00 HK
2014-08-06T17:41:12+09:00 MO
2014-08-06T17:41:13+09:00 HK
2014-08-06T17:41:15+09:00 TW
2014-08-06T17:41:15+09:00 TW
2014-08-06T17:41:15+09:00 HK
2014-08-06T17:41:16+09:00 HK
2014-08-06T17:41:16+09:00 SG
2014-08-06T17:41:16+09:00 US
海外からのアクセスを,国ごとにログに落とした例です.
これを国ごとにアクセス数を出したい場合.
ファイルサイズが小さければ,シェルではこんな感じ?
$ cat sample.log | awk -F '\t' '{ sum[$2]+=1}END{ for(i in sum) print i" -> "sum[i]}' HK -> 6 MO -> 1 US -> 1 loc -> 1 TW -> 2 SG -> 1
今回はレコード数というかファイル容量がテラなので,Sparkを使います.
起動.
./bin/spark-shell --master yarn-client --executor-cores 2 --num-executors 4
準備.
// 決まり文句 scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc) // 決まり文句 scala> import sqlContext._ // 用途に応じて.これは別にSparkじゃなくてscalaの構文なので,scalaコマンドで練習すると良いです scala> case class AccessCount(time:String, loc:String) // ファイル名は,ワイルドカードも使えます scala> val table = sc.textFile("hdfs://mycluster/user/hadoop/sample.log").map(_.split("\t")).map(q=>AccessCount(q(0),q(1)) // 用途に応じて scala> table.registerAsTable("accessCount") // カスタマイズして下さい scala> val country=sql("select loc,count(loc) as c from accessCount group by loc order by c") // ここでSparkが実行されますので,データ量が大きいときはチェックを! scala> country.map(t=>"Country:"+t(0)+" -> " + t(1)).collect().foreach(println) Country:'PH' -> 1257 Country:'MY' -> 2383 Country:'DE' -> 2617 Country:'SG' -> 5244 Country:'CA' -> 7003 Country:'AU' -> 7767 Country:'GB' -> 7883 Country:'KR' -> 8865 Country:'MO' -> 15195 Country:'US' -> 36241 Country:'TW' -> 67860 Country:'HK' -> 637678
簡単.瞬速.データベース・マネージメント・システムを葬る恐るべきSpark.