ファイルベースのKVSがHadoop Streamingで使えるか確認してみた

MapReduceはキー・バリューで処理するので,キーのIDを最終的にLook up tableなんかを引いて,IDから何らかの意味のある値なり文字列にして出力したいことがよくあります.

ただ,分散処理されちゃうので,LUTはHBaseなりCassandraに置くとかが普通な感じです.私はKyotoTycoonやmemcachedを立てていましたが,何かあるとSPoFになるし何か無くても負荷のSPoFになるので,ちょっと工夫してみました.

ルックアップテーブルだったら,読み出し専門で行けるので,バーンシュタインのConstant Databaseの発想でいいのじゃないか?
cdb - Wikipedia

というわけで,DBの書き換えはないという仮定で,とりあえず実行が可能かどうか確認しました.
大丈夫なはずなんですが,念のため.

レコードが一つだけのしょぼいKVSを作ります.

kcpolymgr create kvstest.kch
kcpolymgr set kvstest.kch "testkey" "testval"

Rubyで簡単なMapperを書きます.

require "kyotocabinet"
include KyotoCabinet

db = DB::new
unless db.open("kvstest.kch", DB::OREADER)
    STDERR.printf("open error: %s\n", db.error)
    exit(1);
end

STDIN.each_line do |line|
  key, val = line.chomp.split("\t");
  if db.get(key) then
    print key,"\t",db.get(key),"\n"
  else
    print key,"\t","NULL","\n"
  end;
end

標準入力からのキーがKVSにあればKVSからgetした値を標準出力に出し,KVSになければNULLという文字列を標準出力に渡す感じです.
レコードはこれだけなので,

 kcpolymgr list -pv kvstest.kch
testkey testval

コマンドプロンプトで試すとこうなります.

$ echo "test" | ./kvstest.rb
test    NULL
$ echo "testkey" | ./kvstest.rb
testkey testval

入力を,ファイルにまとめてみます.

$ echo -e "test\tdummy" > infile.tsv
$ echo -e "testkey\tdummy" >> infile.tsv

そしてまとめてスクリプトに入れてみます.

$ cat infile.tsv | ./kvstest.rb | sort | cat
testkey testval
test    NULL

これがHadoop-Streamingで再現できることを確認したいです.

$ hdfs dfs -rm input/*
$ hdfs dfs -put infile.tsv input/
$ hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.0.0-cdh4.4.0.jar -verbose  -input input/infile.tsv -output output -mapper kvstest.rb -file kvstest.rb -file kvstest.kch 
$ hdfs dfs -cat  output/part-00000
test    NULL
testkey testval

そーとの順番が違うのが気になりますが,大丈夫なようです.
ログを見ると,

packageJobJar: [kvstest.rb, kvstest.kch, /home/hadoop/tmp/hadoop-hadoop/hadoop-unjar3072529028158201168/] [] /tmp/streamjob9147930955841844512.jar tmpDir=null

スクリプトもKVSファイルも同じ場所に配されるみたいですね.
これで中央集権的なDBを立てなくてもすみます.
あとは,処理速度のベンチマークをやってみたいですね.