beelineでHiveからのデータ転送にハマる

hiveserverを立てて,jdbcでリモートからデータを取得してローカルのhiveに落とす,というシステムを組んだら,結構ハマりました.

Javaか何かでjdbc経由でデータを取得するプログラムを作る時間が無かったので,コマンドラインシェルのbeelineで引っ張ってくればよいだろうと次のような感じでデータを取りました.

$ beeline --outputformat=tsv -u jdbc:hive://hiveserver.example.com:10000/default -e 'SELECT * FROM my_database'

落ちてきたのはこんな感じのデータです.

'name' 'level' 'dt'
'foo' '3' '2013-12-25'
'bar' '5' '2013-12-25'

テーブルスキーマはこんな感じで.

$ beeline --outputformat=tsv -u jdbc:hive://hiveserver.example.com:10000/default -e 'DESCRIBE my_database'
+-----------+------------+----------+
| col_name  | data_type  | comment  |
+-----------+------------+----------+
| name      | string     |          |
| level     | int        |          |
| dt        | string     |          |
+-----------+------------+----------+

まず,何も考えずに落としたtsvデータをhiveに流し込んでみました.

$ hive -e "LOAD DATA LOCAL INPATH '/home/hadoop/my_database.tsv' INTO TABLE local_my_table PARTITION(dt='2013-12-25')"
$ hive -e "SELECT * FROM local_my_table PARTITION(dt='2013-12-25')"
'name'     'level'        'dt'       NULL     2013-12-25
'foo'       '3'        '2013-12-25'  NULL     2013-12-25
'bar'       '5'        '2013-12-25'  NULL     2013-12-25

悲惨...

タブ区切りのデータを,Hiveデフォルトの\001区切りのテーブルに押し込んだので,最初のname:stringカラムにタブ区切りデータが全部入り,残りはNULLで埋められて,パーティションのカラムは自動的に入るので綺麗になってる感じでした.

なので,Hiveのカラムの区切り文字をタブに変換.

$ hive -e "ALTER TABLE local_my_table SET SERDEPROPERTIES('field.delim'='\t')"

一度さっきのゴミパーティションを削除して,改めてロードしてみます.

$ hive -e "ALTER TABLE local_my_table DROP PARTITION(dt='2013-12-25')"
$ hive -e "LOAD DATA LOCAL INPATH '/home/hadoop/my_database.tsv' INTO TABLE local_my_table PARTITION(dt='2013-12-25')"
$ hive -e "SELECT * FROM local_my_table PARTITION(dt='2013-12-25')"
'name'     NULL         2013-12-25
'foo'      NULL         2013-12-25
'bar'      NULL         2013-12-25

ダメダメ...数値intのカラムに'3'とかを入れたから文字列代入だとみなされたようです.まてよそれなら,'foo'もfooって入るのが正しいのではないかー?
実際,HDFSの中を見てみると,元のTSVファイルが完全にそのまま入っているだけです.

$ hdfs dfs -cat /user/hive/warehouse/local_my_database/dt=2013-12-25/my_database.tsv
'name'     'level'     'dt'
'foo'       '3'        '2013-12-25'
'bar'       '5'        '2013-12-25'

良く見たら,ヘッダも邪魔!というわけで,ヘッダとシングルクオートを取ったファイルを作ります.

$ cat my_datase2.tsv
foo       3        2013-12-25
bar       5        2013-12-25

ゴミパーティションを削除してやり直し.

$ hive -e "ALTER TABLE local_my_table DROP PARTITION(dt='2013-12-25')"
$ hive -e "LOAD DATA LOCAL INPATH '/home/hadoop/my_database2.tsv' INTO TABLE local_my_table PARTITION(dt='2013-12-25')"
$ hive -e "SELECT * FROM local_my_table PARTITION(dt='2013-12-25')"
foo        3         2013-12-25
bar        5         2013-12-25

イケた感じですが,本当は,tsvダンプしたものにはパーティションのカラムが含まれていて,SELECTなんかでは意識しないでカラムとして使えるのですが,CREATEの時に指定したカラムではないので,LOAD DATAするときはパーティションのカラムも削るのが正しいようです.

$ cat my_datase3.tsv
foo       3
bar       5

自動化するために,ヘッダを取ってパーティションカラムを取ってシングルクォートも削除するスクリプトを作成.

#!/usr/local/bin/ruby

# 0:string, 1:int, 9:partition
columns = [0, 1, 9];

STDIN.each_line do |line|
  line.chomp!;
  items = line.split("\t");
  next if items[items.size() - 1] == "'dt'"; # skip header
  output = "";
  (0..(items.size() - 1)).each{|i|
    if columns[i] == 9 then
      # skip
    else
      if i == 0 then
        output = "#{items[i].delete('\'')}"
      else
        output += "\t#{items[i].delete('\'')}"
      end;
    end;
  }
  puts output;
end;

実行速度の遅さを誤魔化すために,hadoop streamingで実行.

$ hdfs dfs -put my_database3.tsv .
$ hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.0.0-cdh4.4.0.jar -input my_database3.tsv -output output -file formatter.rb -mapper formatter.rb

hadoop streamingの結果はHDFSに入るので,LOAD DATAにLOCALが要らなくなる感じですね.

$ hive -e "LOAD DATA INPATH 'output/part-00000.deflate' INTO TABLE local_my_database PARTITION(dt='2013-12-25')"