in-mapper combining

並列分散処理の常識をHadoopファミリから学ぶ (3/3):ビッグデータ処理の常識をJavaで身につける(2) - @ITが興味深かったのでここで紹介されている「in-mapper combining」について書いてみたいと思います。

Hadoop MapReduce デザインパターン ―MapReduceによる大規模テキストデータ処理の「3.1 ローカル集計」で示されている「in-mapper combining」というMapReduceのパターンがあります。


#そういえばHadoop MapReduce デザインパターンの3章まで読んでみた。 - wyukawa’s blogでは
#「in-mapper combining」のエントリを書かなかったなー。


これは何かというとmapper内で集約してしまおうというものです。
しかしsetCombinerClassで集約関数を指定するのとは異なります。

集約関数を指定してもそれが実際に呼ばれるかどうかはわかりません。

しかし「in-mapper combining」ではその部分を完全に制御できます。

具体例を見てみましょう。おなじみのワードカウントです。

疑似コードはこんな感じですね。

「in-mapper combining」だとこうなります。要は毎回emitするのではなくてMAP関数が呼ばれるたびにハッシュにためて最後にまとめてemitするというものです。

なおこの図は
Hadoop MapReduce デザインパターン ―MapReduceによる大規模テキストデータ処理の原書である
http://www.umiacs.umd.edu/~jimmylin/MapReduce-book-final.pdf
から取ってきました。


実際にコードを書いてみます。

なお
Java(HashMap版)WordCount - ひしだまの変更履歴
も参考になると思います。

データは@ITの記事にならって郵便番号データを使います。

環境はVirtualBox上のCentOS5.7(64bit)でCDH3u3で疑似分散で試してます。

まず郵便番号データを取ってきます。

wget http://www.post.japanpost.jp/zipcode/dl/roman/ken_all_rome.lzh
# lhaはyum install lha -y --enablerepo=rpmforgeとかしてrpmforge経由でinstallしました。
lha -e ken_all_rome.lzh

データはこんな感じです。5列目に都道府県名があります。

$ wc -l ken_all_rome.csv 
123206 ken_all_rome.csv
$ head ken_all_rome.csv 
01101,"0600000","IKANIKEISAIGANAIBAAI","CHUO-KU SAPPORO-SHI","HOKKAIDO",0,0,0,0,0,0
01101,"0640941","ASAHIGAOKA","CHUO-KU SAPPORO-SHI","HOKKAIDO",0,0,1,0,0,0
01101,"0600041","ODORIHIGASHI","CHUO-KU SAPPORO-SHI","HOKKAIDO",0,0,1,0,0,0
01101,"0600042","ODORINISHI(1-19-CHOME)","CHUO-KU SAPPORO-SHI","HOKKAIDO",1,0,1,0,0,0
01101,"0640820","ODORINISHI(20-28-CHOME)","CHUO-KU SAPPORO-SHI","HOKKAIDO",1,0,1,0,0,0
01101,"0600031","KITA1-JOHIGASHI","CHUO-KU SAPPORO-SHI","HOKKAIDO",0,0,1,0,0,0
01101,"0600001","KITA1-JONISHI(1-19-CHOME)","CHUO-KU SAPPORO-SHI","HOKKAIDO",1,0,1,0,0,0
01101,"0640821","KITA1-JONISHI(20-28-CHOME)","CHUO-KU SAPPORO-SHI","HOKKAIDO",1,0,1,0,0,0
01101,"0600032","KITA2-JOHIGASHI","CHUO-KU SAPPORO-SHI","HOKKAIDO",0,0,1,0,0,0
01101,"0600002","KITA2-JONISHI(1-19-CHOME)","CHUO-KU SAPPORO-SHI","HOKKAIDO",1,0,1,0,0,0

都道府県ごとにどれだけ郵便番号があるかを考えてみます。
SQLでいうと

SELECT todoufuken_name, count(todoufuken_name) FROM postdata GROUP BY todoufuken_name

です。というか後でHiveでこのクエリを実行します。
終結果はこんな風になります。

"AICHI" 7520
"AKITA" 2156
"AOMORI"        2510
"CHIBA" 3579
"EHIME" 1738
"FUKUI" 2254
"FUKUOKA"       3275
"FUKUSHIMA"     3923
"GIFU"  3359
"GUMMA" 1499
"HIROSHIMA"     2150
"HOKKAIDO"      8242
"HYOGO" 5215
"IBARAKI"       2855
"ISHIKAWA"      2538
"IWATE" 1936
"KAGAWA"        710
"KAGOSHIMA"     1454
"KANAGAWA"      2280
"KOCHI" 1693
"KUMAMOTO"      1877
"KYOTO" 6661
"MIE"   2473
"MIYAGI"        3325
"MIYAZAKI"      873
"NAGANO"        1684
"NAGASAKI"      1892
"NARA"  1932
"NIIGATA"       5409
"OITA"  1840
"OKAYAMA"       2188
"OKINAWA"       794
"OSAKA" 3782
"SAGA"  871
"SAITAMA"       2928
"SHIGA" 1842
"SHIMANE"       1179
"SHIZUOKA"      2932
"TOCHIGI"       1830
"TOKUSHIMA"     1426
"TOKYO" 3654
"TOTTORI"       1396
"TOYAMA"        3250
"WAKAYAMA"      1597
"YAMAGATA"      1946
"YAMAGUCHI"     1796
"YAMANASHI"     943

普通のワードカウントとして実装するとMap input recordsとMap output recordsはどちらも入力ファイルの行数と同じ123206になります。

では「in-mapper combining」を実装してみましょう。こんな感じです。なおcleanupメソッドはorg.apache.hadoop.mapreduce.Mapperを使う新APIでないと提供されません。org.apache.hadoop.mapred.Mapperを使う古いAPIだとcloseメソッドがありますが、これだと引数が無くて、Contextに相当するOutputCollectorが渡ってこないのでやりにくいです。

	private Map<String, Integer> countMap;

	@Override
	protected void setup(Context context) throws IOException,
			InterruptedException {
		super.setup(context);
		countMap = new HashMap<String, Integer>();
	}

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		String[] records = value.toString().split(",");
		String todoufuken = records[4];
		if (countMap.containsKey(todoufuken)) {
			countMap.put(todoufuken, countMap.get(todoufuken) + 1);
		} else {
			countMap.put(todoufuken, 1);
		}
	}

	@Override
	protected void cleanup(Context context) throws IOException,
			InterruptedException {
		for (Entry<String, Integer> e : countMap.entrySet()) {
			context.write(new Text(e.getKey()), new IntWritable(e.getValue()));
		}
		super.cleanup(context);
	}

これを実行するとMap output recordsが47と大幅に減ります。

このようにmapの出力を抑制することによってシャッフルの負荷が減り全体のパフォーマンスが上がります。

しかしメモリを使っているので入力スプリットが大量データだった場合はOut Of Memoryになる危険性があります。

そのためメモリ使用量のしきい値を超えたらフラッシュするなどの対策が必要となってきます。

この辺はHadoopのMapReduceのシャッフル - wyukawa’s blogで書いた循環メモリバッファと似ています。

Hiveではこの辺も考慮して「in-mapper combining」を実装しています。実際に見てみましょう。

まずテーブルを作ります。

CREATE TABLE IF NOT EXISTS postdata (
    chihou_code     string,
    post_code       string,
    choiki_name     string,
    choson_name     string,
    todoufuken_name string,
    flag1   int,
    flag2   int,
    flag3   int,
    flag4   int,
    flag5   int,
    flag6   int
)
ROW FORMAT DELIMITED
   FIELDS TERMINATED BY ','
;

次にデータをloadします。

LOAD DATA LOCAL INPATH 'ken_all_rome.csv' INTO TABLE postdata; 

集計してみましょう。

SELECT todoufuken_name, count(todoufuken_name) FROM postdata GROUP BY todoufuken_name;

実行するとMap output recordsが47となり、Javaで「in-mapper combining」を実装した結果と同じになります。

しかしデータ量が多い場合は

Out of memory due to hash maps used in map-side aggregation.

のように言われてhive.map.aggr.hash.percentmemoryを低くするように勧められます(例:0.5→0.25)。

メモリのしきい値を考慮しているので本来ならこのエラーは出ないと思うのですが、出ることもあるようです。。。

map-side aggregationというのはHiveで「in-mapper combining」を実装したものでmap側で集計処理を少しやるというものです。
公式Wikiにも少し説明があります。
https://cwiki.apache.org/Hive/languagemanual-groupby.html#LanguageManualGroupBy-MapsideAggregationforGroupBy

map-side aggregationの有効、無効はhive.map.aggrプロパティで設定します。Wikiだとデフォルトはfalseと書いてありますが実際はtrueですw つまりmap-side aggregationは有効です。

なおhive.map.aggrプロパティをfalseにして実行するとmap-side aggregationは無効になりMap output recordsは入力ファイルの行数と同じ123206になります。

hive.map.aggr.hash.percentmemoryというのは使用メモリのうちどれだけmap-side aggregationで使うかを設定する割合です。
デフォルトは0.5です。

Hiveのソースを見る限りGroupByOperatorクラスがmap-side aggregationを実装していてhashAggregationsという変数に集計結果を保存しているようです。

  // Used by hash-based GroupBy: Mode = HASH, PARTIALS
  protected transient HashMap<KeyWrapper, AggregationBuffer[]> hashAggregations;

結局のところパフォーマンスと安定性のトレードオフの話になってきます。デフォルト設定つまりmap-side aggregationが有効でも大半はうまくいくと思いますが、ときとしてOut of memoryの危険性があります。その場合はhive.map.aggr.hash.percentmemoryを低くしたりmapred.child.java.optsを大きくしたり、はたまたhive.map.aggrをfalseにする必要があります。

メモリ使用のイメージはこんな感じかな。

個人的にはアドホックな処理だったらメモリに依存するチューニングはありだと思いますが、定常的に流すジョブでそれをやるのは微妙かなと。なぜならデータ量が増えたときにジョブが失敗する可能性が高くなるからです。定常ジョブはスピードより安定性が重視かなあと思いますね。


今回のエントリで紹介したソース一式は下記にあります。

https://github.com/wyukawa/InMapperCombining

いじょ。