HBaseのバルクロードというかHFile生成方法

HBaseにデータをロードする場合はバルクロードを使うのが性能的には良いと思います。

HTable#putだとWALをオフにしても遅いんですよね。

バルクロードに関しては馬本の12.2.3を読むと良いと思います。

バルクロードを使うためにはHBase用のデータファイル、HFileを生成する必要があります。
MapReduceジョブでHFileを生成するためには出力フォーマットとしてHFileOutputFormatを使います。
MapReduceジョブを効率的に動作させるためには出力したHFileを1つのリージョンに収まるようにするわけですが、その辺の処理はHFileOutputFormat#configureIncrementalLoadを呼べばいいです。呼べばTotalOrderPartitionerをpartitionerとしてセットしてよろしくやってくれます。

MapReduceでどうやってHFileを生成するかに関してはHADOOP HACKS の#28の具体的なサンプルコードがあるのでそれを参考にすると良いでしょう。

簡単にポイントだけいってしまうとMapper側ではmapの出力を以下のようにImmutableBytesWritableをkeyにPutをvalueにします。

ImmutableBytesWritable key = ...
Put put = ...

context.write(key, put)

ReducerはなくてあとはDriver側で以下のようにすればよいです。

job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);

HTable table = ...
HFileOutputFormat.configureIncrementalLoad(job, table);

HFileOutputFormat#configureIncrementalLoad内がどうなっているかというと以下のようになっています。対象バージョンはHBase 0.94.3です。mapのoutputのvalueのclassがPutならPutSortReducerをReducerとして使っています。なおmapのoutputのvalueのclassがKeyValueならKeyValueSortReducerを使っています。この2つのクラスは先日少し紹介しました。

入力データの性質によってはOutOfMemoryになってしまうHBaseへのデータ投入用のReduceタスクをどうすべきか? - wyukawa’s blog

  public static void configureIncrementalLoad(Job job, HTable table)
  throws IOException {
    Configuration conf = job.getConfiguration();
    Class<? extends Partitioner> topClass;
    try {
      topClass = getTotalOrderPartitionerClass();
    } catch (ClassNotFoundException e) {
      throw new IOException("Failed getting TotalOrderPartitioner", e);
    }
    job.setPartitionerClass(topClass);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(KeyValue.class);
    job.setOutputFormatClass(HFileOutputFormat.class);

    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(KeyValueSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
    }

    LOG.info("Looking up current regions for table " + table);
    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
        "to match current region count");
    job.setNumReduceTasks(startKeys.size());

    Path partitionsPath = new Path(job.getWorkingDirectory(),
                                   "partitions_" + UUID.randomUUID());
    LOG.info("Writing partition information to " + partitionsPath);

    FileSystem fs = partitionsPath.getFileSystem(conf);
    writePartitions(conf, partitionsPath, startKeys);
    partitionsPath.makeQualified(fs);

    URI cacheUri;
    try {
      // Below we make explicit reference to the bundled TOP.  Its cheating.
      // We are assume the define in the hbase bundled TOP is as it is in
      // hadoop (whether 0.20 or 0.22, etc.)
      cacheUri = new URI(partitionsPath.toString() + "#" +
        org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner.DEFAULT_PATH);
    } catch (URISyntaxException e) {
      throw new IOException(e);
    }
    DistributedCache.addCacheFile(cacheUri, conf);
    DistributedCache.createSymlink(conf);

    // Set compression algorithms based on column families
    configureCompression(table, conf);

    TableMapReduceUtil.addDependencyJars(job);
    LOG.info("Incremental table output configured.");
  }

PutSortReducerは以下のようになっています。対象バージョンはHBase 0.94.3です。

public class PutSortReducer extends
    Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {
  
  @Override
  protected void reduce(
      ImmutableBytesWritable row,
      java.lang.Iterable<Put> puts,
      Reducer<ImmutableBytesWritable, Put,
              ImmutableBytesWritable, KeyValue>.Context context)
      throws java.io.IOException, InterruptedException
  {
    // although reduce() is called per-row, handle pathological case
    long threshold = context.getConfiguration().getLong(
        "putsortreducer.row.threshold", 2L * (1<<30));
    Iterator<Put> iter = puts.iterator();
    while (iter.hasNext()) {
      TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
      long curSize = 0;
      // stop at the end or the RAM threshold
      while (iter.hasNext() && curSize < threshold) {
        Put p = iter.next();
        for (List<KeyValue> kvs : p.getFamilyMap().values()) {
          for (KeyValue kv : kvs) {
            map.add(kv);
            curSize += kv.getLength();
          }
        }
      }
      context.setStatus("Read " + map.size() + " entries of " + map.getClass()
          + "(" + StringUtils.humanReadableInt(curSize) + ")");
      int index = 0;
      for (KeyValue kv : map) {
        context.write(row, kv);
        if (index > 0 && index % 100 == 0)
          context.setStatus("Wrote " + index);
      }

      // if we have more entries to process
      if (iter.hasNext()) {
        // force flush because we cannot guarantee intra-row sorted order
        context.write(null, null);
      }
    }
  }
}

最終的にはImmutableBytesWritableをkeyに、KeyValueをvalueとしてcontext.writeします。
HFileはキーでsortされている必要があるためTreeSet(KeyValue.COMPARATOR)を使ってsortしています。

sortされていないとAbstractHFileWriter#checkKeyのチェックで引っかかって

java.io.IOException: Added a key not lexically larger than previous key=...

といわれます。

context.writeの部分のソースは下記のようになっています。context.write(null, null)だとflushしていますね。

      public void write(ImmutableBytesWritable row, KeyValue kv)
      throws IOException {
        // null input == user explicitly wants to flush
        if (row == null && kv == null) {
          rollWriters();
          return;
        }

        byte [] rowKey = kv.getRow();
        long length = kv.getLength();
        byte [] family = kv.getFamily();
        WriterLength wl = this.writers.get(family);

        // If this is a new column family, verify that the directory exists
        if (wl == null) {
          fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
        }

        // If any of the HFiles for the column families has reached
        // maxsize, we need to roll all the writers
        if (wl != null && wl.written + length >= maxsize) {
          this.rollRequested = true;
        }

        // This can only happen once a row is finished though
        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
          rollWriters();
        }

        // create a new HLog writer, if necessary
        if (wl == null || wl.writer == null) {
          wl = getNewWriter(family, conf);
        }

        // we now have the proper HLog writer. full steam ahead
        kv.updateLatestStamp(this.now);
        trt.includeTimestamp(kv);
        wl.writer.append(kv);
        wl.written += length;

        // Copy the row so we know when a row transition.
        this.previousRow = rowKey;
      }

PutSortReducerではTreeSetにどんどんKeyValueをaddしていってputsortreducer.row.thresholdを超えたらflushするためにcontext.write(null, null)を呼んでいます。で、次からは別ファイルに書き込んでいるようですね。なので1つのHFile内ではちゃんとsortされているとそういうことだと思います。


ちなみにHFileですが0.92からVersion 2なるフォーマットになりました。詳細は下記を参照。インデックスがモノリシックじゃなくてマルチレベルになったので性能が向上したとか書いてあります。