hive.auto.convert.join

以前
Hiveのjoinの最適化 - wyukawa’s blog
で書きましたがHive 0.7からjoinの最適化が図られています。

本家のドキュメントはこちら
https://cwiki.apache.org/Hive/joinoptimization.html
なぜか画像が無い。。。

どういうものかというとjoinする際に片方のテーブルのデータ量がメモリに収まるくらい小さいならメモリに読み込んでmapフェーズだけでjoinするというのもです。

hive.auto.convert.joinというプロパティをtrueにすればjoin最適化が有効になります。デフォルトはfalseです。

<property>
  <name>hive.auto.convert.join</name>
  <value>false</value>
  <description>Whether Hive enable the optimization about converting common join into mapjoin based on the input file size</description>
</property>

hive.auto.convert.joinをtrueにして実行するとジョブが起動する前にメモリにテーブルのデータを読み込みます。
読み込んだデータをハッシュテーブル形式にしてシリアライズして分散キャッシュ経由で配布します。

2012-03-29 12:15:06     Starting to launch local task to process map join;     maximum memory = 1013645312
2012-03-29 12:15:09     Processing rows:        100     Hashtable size: 100    Memory usage:    1968576 rate:   0.002
2012-03-29 12:15:09     Dump the hashtable into file: file:/tmp/wyukawa/hive_2012-03-29_00-14-46_291_3265299269295350973/-local-10002/HashTable-Stage-3/MapJoin-1--.hashtable
2012-03-29 12:15:09     Upload 1 File to: file:/tmp/wyukawa/hive_2012-03-29_00-14-46_291_3265299269295350973/-local-10002/HashTable-Stage-3/MapJoin-1--.hashtable File size: 4500
2012-03-29 12:15:09     End of local task; Time Taken: 2.663 sec.
Mapred Local Task Succeeded . Convert the Join into MapJoin
Launching Job 2 out of 3

Hiveのソースで言うとMapredLocalTask#executeFromChildJVMのあたりかな。

これもメモリ依存なチューニングですが
in-mapper combining - wyukawa’s blog
とは異なり使うメモリはmapred.child.java.optsではなくHADOOP_HEAPSIZEです。デフォルトは1Gです。

どのマシンのHADOOP_HEAPSIZEかというとHiveクライアント(TaskTrackerが動いているとは限らない)が動いているマシンのHADOOP_HEAPSIZEになります。

例えば/usr/lib/hadoop/conf/hadoop-env.shでHADOOP_HEAPSIZEを100Mにすると下記のようになります。

maximum memoryのところが1桁変わってますね。

2012-03-29 12:20:22     Starting to launch local task to process map join;     maximum memory = 101384192
2012-03-29 12:20:24     Processing rows:        100     Hashtable size: 100    Memory usage:    1949520 rate:   0.019
2012-03-29 12:20:24     Dump the hashtable into file: file:/tmp/wyukawa/hive_2012-03-29_00-20-00_288_662828123769621916/-local-10003/HashTable-Stage-4/MapJoin-1--.hashtable
2012-03-29 12:20:24     Upload 1 File to: file:/tmp/wyukawa/hive_2012-03-29_00-20-00_288_662828123769621916/-local-10003/HashTable-Stage-4/MapJoin-1--.hashtable File size: 3900
2012-03-29 12:20:24     End of local task; Time Taken: 2.497 sec.
Mapred Local Task Succeeded . Convert the Join into MapJoin

メモリに依存するため下記のようにOutOfMemoryErrorが出る可能性がありますが、その場合でもBackupTaskが動いて全体としてはエラーが起きないような設計になっています。スバラシ。ただうまく動かないときもあるようなのでパフォーマンスより安定性を重視するならjoin最適化は使わない方がいいかも。

Exception in thread "Thread-0" java.lang.OutOfMemoryError: Java heap space
        at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:71)
        at java.util.zip.ZipFile$1.<init>(ZipFile.java:212)
        at java.util.zip.ZipFile.getInputStream(ZipFile.java:212)
        at java.util.zip.ZipFile.getInputStream(ZipFile.java:180)
        at java.util.jar.JarFile.getInputStream(JarFile.java:383)
        at sun.misc.URLClassPath$JarLoader$2.getInputStream(URLClassPath.java:689)
        at sun.misc.Resource.cachedInputStream(Resource.java:59)
        at sun.misc.Resource.getByteBuffer(Resource.java:154)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:249)
        at java.net.URLClassLoader.access$000(URLClassLoader.java:56)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:195)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:188)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:307)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:252)
        at java.lang.ClassLoader.loadClassInternal(ClassLoader.java:320)
        at org.apache.hadoop.util.RunJar$1.run(RunJar.java:156)
Exception in thread "Thread-1" java.lang.OutOfMemoryError: Java heap space
        at org.apache.hadoop.fs.FileSystem$Cache.closeAll(FileSystem.java:1599)
        at org.apache.hadoop.fs.FileSystem$Cache$ClientFinalizer.run(FileSystem.java:1586)
FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapredLocalTask
ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.MapRedTask

またhive.mapjoin.localtask.max.memory.usageというプロパティもあります。どれぐらいの割合のメモリをハッシュテーブルに使うかというものです。デフォルトは0.9です。この割合を超えると自分でabortするようです。

<property>
  <name>hive.mapjoin.localtask.max.memory.usage</name>
  <value>0.90</value>
  <description>This number means how much memory the local task can take to hold the key/value into in-memory hash table; If the local task's memory usage is more than this number, the local task will be abort by themself. It means the data of small table is too large to be hold in the memory.</description>
</property>

メモリ使用のイメージはこんな感じかな。