Apache Sparkを高性能並列処理シェルとして使う
例題.
こんな3行のファイル.
$ hdfs dfs -cat test.csv 20131007,PC,http://www.xxx.co.jp/test,100 20131007,PC,http://www.xxx.co.jp/test3,87 20131007,PC,http://www.xxx.co.jp/test2,1000
Sparkシェルは別にHDFSじゃなくてローカルなハードディスクのファイルも扱えますが,せっかくなのでYARNの上で構築したシステムでテストしてみます.
そのファイルを読み込む準備を.
$ ./bin/spark-shell --master yarn-client scala> val file = sc.textFile("test.csv")
シェルのcat的なコマンド.
scala> file.collect().foreach(println) 20131007,PC,http://www.xxx.co.jp/test,100 20131007,PC,http://www.xxx.co.jp/test3,87 20131007,PC,http://www.xxx.co.jp/test2,1000
wc -l的なコマンド.
scala> file.collect().size res9: Int = 3
head -2的なコマンド.
scala> file.collect().take(2).foreach(println) 20131007,PC,http://www.xxx.co.jp/test,100 20131007,PC,http://www.xxx.co.jp/test3,87
tail -2的なこまんど.
scala> file.collect().takeRight(2).foreach(println) 20131007,PC,http://www.xxx.co.jp/test3,87 20131007,PC,http://www.xxx.co.jp/test2,1000
grep的なコマンド.
scala> file.filter(i=>i.contains("test3")).collect().foreach(i=>println(i)) 20131007,PC,http://www.xxx.co.jp/test3,87
あとはオマケです.
grep -v的なコマンド.
scala> file.filter(i => !i.contains("test,")).collect().foreach(i=>println(i)) 20131007,PC,http://www.xxx.co.jp/test3,87 20131007,PC,http://www.xxx.co.jp/test2,1000
grep -v|head -1的なコマンド.
scala> file.filter(i => !i.contains("test,")).take(1).foreach(i=>println(i)) 20131007,PC,http://www.xxx.co.jp/test3,87
grep -v|wc -l的なコマンド.
scala> file.filter(i => !i.contains("test,")).count() res74: Long = 2
などなど.
分散処理で性能があがるのかどうかは,次回に報告します.