Apache Sparkは魔法の杖ではない

チューニングや導入の苦労も想像せずに,SparkがあればMapReduce1は過去のものだと思っている人もいそうですが...

特にSpark SQLは実装がまだ甘いこともあって,非常に単純なクエリだとHiveにあっさり負けます.理由のうちの一つは,Hive SQL on Sparkということで,SparkがHive側に寄ったこともあるのですが...

単純なクエリ.Word Countよりも単純な,タダのCOUNT(*).

Hiveのデータベースには,400Gバイトほどのデータがあるとします.

>hdfs dfs -du -h /user/hive/warehouse/metameta.db/
405.3 G  /user/hive/warehouse/metameta.db/user_log

ただし,Hiveの常套手段として,日別にパーティションが切られていて,これからクエリを投げる対象は60Gバイトくらいな感じです.

> hdfs dfs -du /user/hive/warehouse/metameta.db/user_log|grep 2014-02 | awk '{sum+=$1}END{print sum/1024/1024/1024" G"}'
60.8193 G

あるIntのカラムの総和を計算してみます.Hiveで.

> time hive --database=metameta -e "select count(*) from user_log where (dt >= '2014-02-01' and dt <= '2014-02-28') and tcnt>10"
MapReduce Total cumulative CPU time: 0 days 3 hours 4 minutes 15 seconds 540 msec
Ended Job = job_1401264313901_26799
MapReduce Jobs Launched: 
Job 0: Map: 1116  Reduce: 1   Cumulative CPU: 11055.54 sec   HDFS Read: 65304557940 HDFS Write: 7 SUCCESS
Total MapReduce CPU Time Spent: 0 days 3 hours 4 minutes 15 seconds 540 msec
OK
_c0
991398
Time taken: 178.672 seconds

real    3m4.654s
user    0m33.209s
sys     0m0.831s

3分位で一ヶ月の集計が終わりです.

同じことをSparkで,Hive on Sparkで.

> ./bin/spark-shell --master yarn-client --driver-class-path /usr/local/hive/lib/mysql-connector-java.jar --executor-cores 8 --num-executors 12

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@158f8cfe

scala> import hiveContext._
import hiveContext._

scala> val res = hql("select count(*) from metameta.user_log where (dt >= '2014-02-01' and dt <= '2014-02-28') and tcnt>10")

res: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[0] at RDD at SchemaRDD.scala:98
== Query Plan ==
Aggregate false, [], [SUM(PartialCount#17L) AS c_0#0L]
 Exchange SinglePartition
  Aggregate true, [], [COUNT(1) AS PartialCount#17L]
   Project []
    Filter (tt#12:0 > 10)
     HiveTableScan [tsumcnt#12], (MetastoreRelation metameta, user_log, None), Some(((dt#2 >= 2014-02-01) && (dt#2 <= 2014-02-28)))

scala> res.map(t=>t(0)).collect().foreach(println)
14/07/10 16:36:30 INFO spark.SparkContext: Job finished: collect at <console>:20, took 1274.304651956 s
991398

集計結果は等しく991398ですが,Hiveだと3分,Hive on Sparkだと20分以上かかりました.

Sparkの方が秒にして一桁遅い.約180秒のHiveと約1300秒のSpark.


もちろんこれは意地悪な例で,例えば,結果が複数出るクエリを投げる.これもシンプル過ぎる例ですが.

scala> val res = hql("select sum(tcnt),sum(addtcnt) from metameta.user_log where (dt >= '2014-02-01' and dt <= '2014-02-28') and tcnt>10")  

(中略)
14/07/10 21:11:19 INFO spark.SparkContext: Job finished: collect at <console>:20, took 1189.674418821 s
count: 13693172,60772

また20分近くかかってしまいましたが...
そしてこの結果から,さらに別な統計量を求める.

14/07/10 21:11:19 INFO spark.SparkContext: Job finished: collect at <console>:20, took 1189.674418821 s
count: 13693172,60772

scala> res.map(t=>"average: " + ((t(0).asInstanceOf[Long] + t(1).asInstanceOf[Long])/2)).collect().foreach(println)
14/07/10 21:17:33 INFO spark.SparkContext: Job finished: collect at <console>:20, took 1.613828991 s
average: 6876972

統計量といっても,2量の相加平均でしたが...結果がキャッシュされているので,1.6秒で出ます.
この場合は2数の値が出ているのだからSparkじゃなくても自分で計算すれば良いとかいろいろあるかと思いますが,ここは例えばScala shellを使うのだったら,Scalaのプログラムを書けばどんな計算でもできるので,例が悪いということでお許し下さい.

だいたい,Hiveで長い時間かけて計算させて,「クエリ間違った!」とか「一つ値求めるの忘れた!」とか,何度あったことか...Hiveなら最初からやり直しですが,Hive on Sparkなら,終わってからガッカリせずに,その後1秒で忘れたものを取り出すことが可能かもしれません(上記の例は,実体験に基づく...).

何より,既存のHadoopなりYARNなりHDFSなりHiveなりと完全共存できるのが素晴らしいです.なので導入が手軽です.

Spark SQL with Hadoop Hiveにハマる - なぜか数学者にはワイン好きが多い
(↑まぁ,手軽とはいえ悩んだのですが...)