入力データの性質によってはOutOfMemoryになってしまうHBaseへのデータ投入用のReduceタスクをどうすべきか?

今週ずっと考えているんだけど今のところの結論としてはデータを捨てるしかないかなと思ってる。
ちょっと整理されていない、というかオレの頭が混乱している感がありありだけどラフなメモ書き。

例をあげるとこんな感じのユーザの行動ログがHDFSにあるとしましょう。ほんとは他にも情報あったりログをパースする必要があったりすると思うけどそこは省略。

userID timestamp sentence
user1 2012-12-12 12:12:11 あと1秒!
user1 2012-12-12 12:12:12 ktkr>121212121212
user1 2012-12-12 15:00:00 三時のおやつは文明堂

複数のサーバーからログがくるのでtimestampが同じという可能性もある。sentenceは違ってもね。

やりたいことはユーザごとにまとめてsortしてHBaseに突っ込むこと。このHDFS上にあるログをMapReduceで処理した後にバルクロードでHBaseに突っ込むわけです。

なんでHBaseかというとデータが多いから。Hiveだとユーザごとに検索するのにもMapReduceの処理が必要なので時間がかかる。
そうじゃなくてちゃちゃっと検索したい。その代わりといってはなんだが検索に特化しているので別に集計機能とかいらない。なのでHBase。
ユーザ単位で検索したときに時系列に表示されるようにHBaseのcolumnにはtimestampの情報も含めます。row keyはuserIDのイメージ。

HBaseへのデータ投入用のMapReduceジョブのmap側のkeyはuserIDでvalueはそれ以外すべてを使う。

そうするとreduce側ではuserIDごとにデータがまとめられてくる。ここまではいい。

問題はまとめられたデータをsortしたいということ。なんでsortするか?それはHBaseはキーでsortされている必要があるから。

sortされてないと

java.io.IOException: Added a key not lexically larger than previous key=...

とか言われる。

で、そのような用途に使えるサンプルとしてHBaseにはorg.apache.hadoop.hbase.mapreduce.KeyValueSortReducerというクラスがあります。
こんな感じ。

/**
 * Emits sorted KeyValues.
 * Reads in all KeyValues from passed Iterator, sorts them, then emits
 * KeyValues in sorted order.  If lots of columns per row, it will use lots of
 * memory sorting.
 * @see HFileOutputFormat
 */
public class KeyValueSortReducer extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
  protected void reduce(ImmutableBytesWritable row, java.lang.Iterable<KeyValue> kvs,
      org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
  throws java.io.IOException, InterruptedException {
    TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
    for (KeyValue kv: kvs) {
      map.add(kv.clone());
    }
    context.setStatus("Read " + map.getClass());
    int index = 0;
    for (KeyValue kv: map) {
      context.write(row, kv);
      if (index > 0 && index % 100 == 0) context.setStatus("Wrote " + index);
    }
  }
}

KeyValueというのがHBaseのセルに相当するクラスで、row keyだったりcolumnだったりvalueの情報など全部ある。
この例の場合はrow単位でまとめられたkvsというリスト(Iterableだけどリストということにしておこう)がreduce側にわたってくる。

上のログの例だとrowがuser1でkvsが下記の3つというイメージかな。

2012-12-12 12:12:11 あと1秒!
2012-12-12 12:12:12 ktkr>121212121212
2012-12-12 15:00:00 三時のおやつは文明堂

TreeSet(KeyValue.COMPARATOR)というセットを使ってsortすることになる。

kvsのデータが大量にあったらOutOfMemoryになっちゃう。

普通のユーザは問題ないけどspamユーザが大量にアクセスしている場合なんかはkvsが大量になる可能性がある。

え、セカンダリソートとか使えばreduce側でsortしなくても良くね?っていう意見もありそうですが、reduce側で処理を入れたい場合もあるわけです。

同じユーザで同じtimestampだけど値が違うログがあったとしてその重複をさけるために連番を入れたいとかね。

2012-12-12 12:12:12-01
2012-12-12 12:12:12-02
みたいなね。
要は一意にしないとHBase上でデータが上書きされちゃうからね。
こういうことをやるためには状態を持つ必要があるのでメモリに依存してしまうというオチ。

つまりreduce側のメモリが必要であり、そのため入力データによってはOutOfMemoryになる。ちょっと詰んでる感がありますね。

そういうケースを考慮してかHBaseにはPutSortReducerというクラスがあってこれを見ると指定した閾値を超えたらデータを捨ててます。
下記の例だと2Gを超えたら捨ててますね。



2012/12/17追記
2Gを超えたら捨ててる云々は嘘。ソースコメントにあるようにflushしているだけ。
いくらなんでも勝手に捨てるとか無いよね。。。
早とちりしすぎオレ。
thanks @ueshin さん!

/**
 * Emits sorted Puts.
 * Reads in all Puts from passed Iterator, sorts them, then emits
 * Puts in sorted order.  If lots of columns per row, it will use lots of
 * memory sorting.
 * @see HFileOutputFormat
 * @see KeyValueSortReducer
 */
public class PutSortReducer extends
    Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {
  
  @Override
  protected void reduce(
      ImmutableBytesWritable row,
      java.lang.Iterable<Put> puts,
      Reducer<ImmutableBytesWritable, Put,
              ImmutableBytesWritable, KeyValue>.Context context)
      throws java.io.IOException, InterruptedException
  {
    // although reduce() is called per-row, handle pathological case
    long threshold = context.getConfiguration().getLong(
        "putsortreducer.row.threshold", 2L * (1<<30));
    Iterator<Put> iter = puts.iterator();
    while (iter.hasNext()) {
      TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
      long curSize = 0;
      // stop at the end or the RAM threshold
      while (iter.hasNext() && curSize < threshold) {
        Put p = iter.next();
        for (List<KeyValue> kvs : p.getFamilyMap().values()) {
          for (KeyValue kv : kvs) {
            map.add(kv);
            curSize += kv.getLength();
          }
        }
      }
      context.setStatus("Read " + map.size() + " entries of " + map.getClass()
          + "(" + StringUtils.humanReadableInt(curSize) + ")");
      int index = 0;
      for (KeyValue kv : map) {
        context.write(row, kv);
        if (index > 0 && index % 100 == 0)
          context.setStatus("Wrote " + index);
      }

      // if we have more entries to process
      if (iter.hasNext()) {
        // force flush because we cannot guarantee intra-row sorted order
        context.write(null, null);
      }
    }
  }
}

こういう風にしてデータを捨てないとダメなのかなあ。