Apache Sparkを高性能並列処理シェルとして使う

例題.

こんな3行のファイル.

$ hdfs dfs -cat test.csv
20131007,PC,http://www.xxx.co.jp/test,100
20131007,PC,http://www.xxx.co.jp/test3,87
20131007,PC,http://www.xxx.co.jp/test2,1000

Sparkシェルは別にHDFSじゃなくてローカルなハードディスクのファイルも扱えますが,せっかくなのでYARNの上で構築したシステムでテストしてみます.

そのファイルを読み込む準備を.

$ ./bin/spark-shell --master yarn-client

scala> val file = sc.textFile("test.csv")

シェルのcat的なコマンド.

scala> file.collect().foreach(println)
20131007,PC,http://www.xxx.co.jp/test,100
20131007,PC,http://www.xxx.co.jp/test3,87
20131007,PC,http://www.xxx.co.jp/test2,1000

wc -l的なコマンド.

scala> file.collect().size
res9: Int = 3

head -2的なコマンド.

scala> file.collect().take(2).foreach(println)
20131007,PC,http://www.xxx.co.jp/test,100
20131007,PC,http://www.xxx.co.jp/test3,87

tail -2的なこまんど.

scala> file.collect().takeRight(2).foreach(println)
20131007,PC,http://www.xxx.co.jp/test3,87
20131007,PC,http://www.xxx.co.jp/test2,1000

grep的なコマンド.

scala> file.filter(i=>i.contains("test3")).collect().foreach(i=>println(i))
20131007,PC,http://www.xxx.co.jp/test3,87

あとはオマケです.

grep -v的なコマンド.

scala> file.filter(i => !i.contains("test,")).collect().foreach(i=>println(i))
20131007,PC,http://www.xxx.co.jp/test3,87
20131007,PC,http://www.xxx.co.jp/test2,1000

grep -v|head -1的なコマンド.

scala> file.filter(i => !i.contains("test,")).take(1).foreach(i=>println(i))
20131007,PC,http://www.xxx.co.jp/test3,87

grep -v|wc -l的なコマンド.

scala> file.filter(i => !i.contains("test,")).count()
res74: Long = 2

などなど.

分散処理で性能があがるのかどうかは,次回に報告します.