Notes from reading the HBase compaction source

I've been reading the HBase source around compaction, so I'm jotting down some notes.

I also referred to section 8.2.3.5 "Compaction" in the "horse book" (HBase: The Definitive Guide) and to the Togetter thread below.

Digging into HBase compaction (HBaseのコンパクションまわりを調べてみた件。) - Togetter

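The source I read is HBase 0.94.2, which is newer than what the book and the Togetter thread cover, but my impression is that the main addition is the JIRA below. The overall design hasn't changed.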

[HBASE-5199] Delete out of TTL store files before compaction selection - ASF JIRA

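Previously, data past its TTL was only dropped after a compaction had run, which is inefficient. With this JIRA, store files that are entirely past their TTL are deleted first, before the normal compaction selection.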

The files to compact are gathered in Store#compactSelection.

Here is the source. It's long...

  /**
   * Algorithm to choose which files to compact
   *
   * Configuration knobs:
   *  "hbase.hstore.compaction.ratio"
   *    normal case: minor compact when file <= sum(smaller_files) * ratio
   *  "hbase.hstore.compaction.min.size"
   *    unconditionally compact individual files below this size
   *  "hbase.hstore.compaction.max.size"
   *    never compact individual files above this size (unless splitting)
   *  "hbase.hstore.compaction.min"
   *    min files needed to minor compact
   *  "hbase.hstore.compaction.max"
   *    max files to compact at once (avoids OOM)
   *
   * @param candidates candidate files, ordered from oldest to newest
   * @return subset copy of candidate list that meets compaction criteria
   * @throws IOException
   */
  CompactSelection compactSelection(List<StoreFile> candidates, int priority)
      throws IOException {
    // ASSUMPTION!!! filesCompacting is locked when calling this function

    /* normal skew:
     *
     *         older ----> newer
     *     _
     *    | |   _
     *    | |  | |   _
     *  --|-|- |-|- |-|---_-------_-------  minCompactSize
     *    | |  | |  | |  | |  _  | |
     *    | |  | |  | |  | | | | | |
     *    | |  | |  | |  | | | | | |
     */
    CompactSelection compactSelection = new CompactSelection(conf, candidates);

    boolean forcemajor = this.forceMajor && filesCompacting.isEmpty();
    if (!forcemajor) {
      // Delete the expired store files before the compaction selection.
      if (conf.getBoolean("hbase.store.delete.expired.storefile", true)
          && (ttl != Long.MAX_VALUE) && (this.scanInfo.minVersions == 0)) {
        CompactSelection expiredSelection = compactSelection
            .selectExpiredStoreFilesToCompact(
                EnvironmentEdgeManager.currentTimeMillis() - this.ttl);

        // If there is any expired store files, delete them  by compaction.
        if (expiredSelection != null) {
          return expiredSelection;
        }
      }
      // do not compact old files above a configurable threshold
      // save all references. we MUST compact them
      int pos = 0;
      while (pos < compactSelection.getFilesToCompact().size() &&
             compactSelection.getFilesToCompact().get(pos).getReader().length()
               > maxCompactSize &&
             !compactSelection.getFilesToCompact().get(pos).isReference()) ++pos;
      compactSelection.clearSubList(0, pos);
    }

    if (compactSelection.getFilesToCompact().isEmpty()) {
      LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
        this.storeNameStr + ": no store files to compact");
      compactSelection.emptyFileList();
      return compactSelection;
    }

    // Force a major compaction if this is a user-requested major compaction,
    // or if we do not have too many files to compact and this was requested
    // as a major compaction
    boolean majorcompaction = (forcemajor && priority == PRIORITY_USER) ||
      (forcemajor || isMajorCompaction(compactSelection.getFilesToCompact())) &&
      (compactSelection.getFilesToCompact().size() < this.maxFilesToCompact
    );
    LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
      this.getColumnFamilyName() + ": Initiating " +
      (majorcompaction ? "major" : "minor") + "compaction");

    if (!majorcompaction &&
        !hasReferences(compactSelection.getFilesToCompact())) {
      // we're doing a minor compaction, let's see what files are applicable
      int start = 0;
      double r = compactSelection.getCompactSelectionRatio();

      // remove bulk import files that request to be excluded from minors
      compactSelection.getFilesToCompact().removeAll(Collections2.filter(
          compactSelection.getFilesToCompact(),
          new Predicate<StoreFile>() {
            public boolean apply(StoreFile input) {
              return input.excludeFromMinorCompaction();
            }
          }));

      // skip selection algorithm if we don't have enough files
      if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
        if(LOG.isDebugEnabled()) {
          LOG.debug("Not compacting files because we only have " +
            compactSelection.getFilesToCompact().size() +
            " files ready for compaction.  Need " + this.minFilesToCompact + " to initiate.");
        }
        compactSelection.emptyFileList();
        return compactSelection;
      }

      /* TODO: add sorting + unit test back in when HBASE-2856 is fixed
      // Sort files by size to correct when normal skew is altered by bulk load.
      Collections.sort(filesToCompact, StoreFile.Comparators.FILE_SIZE);
       */

      // get store file sizes for incremental compacting selection.
      int countOfFiles = compactSelection.getFilesToCompact().size();
      long [] fileSizes = new long[countOfFiles];
      long [] sumSize = new long[countOfFiles];
      for (int i = countOfFiles-1; i >= 0; --i) {
        StoreFile file = compactSelection.getFilesToCompact().get(i);
        fileSizes[i] = file.getReader().length();
        // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
        int tooFar = i + this.maxFilesToCompact - 1;
        sumSize[i] = fileSizes[i]
                   + ((i+1    < countOfFiles) ? sumSize[i+1]      : 0)
                   - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
      }

      /* Start at the oldest file and stop when you find the first file that
       * meets compaction criteria:
       *   (1) a recently-flushed, small file (i.e. <= minCompactSize)
       *      OR
       *   (2) within the compactRatio of sum(newer_files)
       * Given normal skew, any newer files will also meet this criteria
       *
       * Additional Note:
       * If fileSizes.size() >> maxFilesToCompact, we will recurse on
       * compact().  Consider the oldest files first to avoid a
       * situation where we always compact [end-threshold,end).  Then, the
       * last file becomes an aggregate of the previous compactions.
       */
      while(countOfFiles - start >= this.minFilesToCompact &&
            fileSizes[start] >
              Math.max(minCompactSize, (long)(sumSize[start+1] * r))) {
        ++start;
      }
      int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
      long totalSize = fileSizes[start]
                     + ((start+1 < countOfFiles) ? sumSize[start+1] : 0);
      compactSelection = compactSelection.getSubList(start, end);

      // if we don't have enough files to compact, just wait
      if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Skipped compaction of " + this.storeNameStr
            + ".  Only " + (end - start) + " file(s) of size "
            + StringUtils.humanReadableInt(totalSize)
            + " have met compaction criteria.");
        }
        compactSelection.emptyFileList();
        return compactSelection;
      }
    } else {
      if(majorcompaction) {
        if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
          LOG.debug("Warning, compacting more than " + this.maxFilesToCompact +
            " files, probably because of a user-requested major compaction");
          if(priority != PRIORITY_USER) {
            LOG.error("Compacting more than max files on a non user-requested compaction");
          }
        }
      } else if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
        // all files included in this compaction, up to max
        int pastMax = compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;
        compactSelection.getFilesToCompact().subList(0, pastMax).clear();
      }
    }
    return compactSelection;
  }

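First, it collects the files that are past their TTL and can be deleted, in the part below. This only happens when the delete-expired setting is on, a TTL is set (ttl != Long.MAX_VALUE), and the family's minVersions is 0. If any expired files are found, compactSelection returns right there with just those files, so the compaction that follows handles only them.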

      if (conf.getBoolean("hbase.store.delete.expired.storefile", true)
          && (ttl != Long.MAX_VALUE) && (this.scanInfo.minVersions == 0)) {
        CompactSelection expiredSelection = compactSelection
            .selectExpiredStoreFilesToCompact(
                EnvironmentEdgeManager.currentTimeMillis() - this.ttl);

        // If there is any expired store files, delete them  by compaction.
        if (expiredSelection != null) {
          return expiredSelection;
        }
      }
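To make the gate and the expiry test concrete, here is a minimal standalone sketch. FileInfo and selectExpired are hypothetical names, not HBase APIs; the sketch only models the three preconditions above (it skips the forced-major check) and the `maxTimestamp < now - ttl` test that selectExpiredStoreFilesToCompact applies below, returning null when nothing applies, as the original does.

```java
import java.util.ArrayList;
import java.util.List;

public class ExpiredSelectionSketch {
    // Hypothetical stand-in for a StoreFile: only the path and the max cell timestamp.
    public record FileInfo(String path, long maxTimestamp) {}

    /**
     * Returns the files whose max timestamp is older than now - ttl, or null when
     * the check does not apply or no file has expired (the original also returns null).
     */
    public static List<FileInfo> selectExpired(List<FileInfo> files, boolean deleteExpiredEnabled,
                                               long ttl, int minVersions, long now) {
        // Same preconditions as Store#compactSelection: feature on, TTL set, MIN_VERSIONS == 0.
        if (!deleteExpiredEnabled || ttl == Long.MAX_VALUE || minVersions != 0) {
            return null;
        }
        long maxExpiredTimeStamp = now - ttl;
        List<FileInfo> expired = new ArrayList<>();
        for (FileInfo f : files) {
            // Strictly less than: a file whose newest cell sits exactly at the cutoff is kept.
            if (f.maxTimestamp() < maxExpiredTimeStamp) {
                expired.add(f);
            }
        }
        return expired.isEmpty() ? null : expired;
    }

    public static void main(String[] args) {
        long now = 10_000L;
        long ttl = 3_000L; // cutoff = 7_000
        List<FileInfo> files = List.of(
            new FileInfo("a", 5_000L),
            new FileInfo("b", 6_999L),
            new FileInfo("c", 7_000L),
            new FileInfo("d", 9_500L));
        // Prints the two expired entries, a and b.
        System.out.println(selectExpired(files, true, ttl, 0, now));
    }
}
```

Returning the expired files as a separate selection is what lets them be dropped without touching the still-live files.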

This checks the hbase.store.delete.expired.storefile property, which, per the JIRA below, defaults to true as of HBase 0.94.1.
[HBASE-6267] hbase.store.delete.expired.storefile should be true by default - ASF JIRA

CompactSelection#selectExpiredStoreFilesToCompact looks like this:

  /**
   * Select the expired store files to compact
   * 
   * @param maxExpiredTimeStamp
   *          The store file will be marked as expired if its max time stamp is
   *          less than this maxExpiredTimeStamp.
   * @return A CompactSelection contains the expired store files as
   *         filesToCompact
   */
  public CompactSelection selectExpiredStoreFilesToCompact(
      long maxExpiredTimeStamp) {
    if (filesToCompact == null || filesToCompact.size() == 0)
      return null;
    ArrayList<StoreFile> expiredStoreFiles = null;
    boolean hasExpiredStoreFiles = false;
    CompactSelection expiredSFSelection = null;

    for (StoreFile storeFile : this.filesToCompact) {
      if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
        LOG.info("Deleting the expired store file by compaction: "
            + storeFile.getPath() + " whose maxTimeStamp is "
            + storeFile.getReader().getMaxTimestamp()
            + " while the max expired timestamp is " + maxExpiredTimeStamp);
        if (!hasExpiredStoreFiles) {
          expiredStoreFiles = new ArrayList<StoreFile>();
          hasExpiredStoreFiles = true;
        }
        expiredStoreFiles.add(storeFile);
      }
    }

    if (hasExpiredStoreFiles) {
      expiredSFSelection = new CompactSelection(conf, expiredStoreFiles);
    }
    return expiredSFSelection;
  }

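A file counts as expired when its max timestamp is older than the current time minus the TTL. If the regionserver log contains "Deleting the expired store file by compaction: ", you can confirm that files past their TTL were deleted this way.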

After the expired files have been removed by that compaction, the next compaction goes through the normal selection. The key part of that logic is this:

      while(countOfFiles - start >= this.minFilesToCompact &&
            fileSizes[start] >
              Math.max(minCompactSize, (long)(sumSize[start+1] * r))) {
        ++start;
      }
      int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
      long totalSize = fileSizes[start]
                     + ((start+1 < countOfFiles) ? sumSize[start+1] : 0);
      compactSelection = compactSelection.getSubList(start, end);

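Compaction happens once the number of store files reaches the value of the hbase.hstore.compaction.min property (formerly hbase.hstore.compactionThreshold; default 3). In the source, this.minFilesToCompact holds this value, and if fewer eligible files than that remain, the minor compaction is skipped.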

minCompactSize is the value of the hbase.hstore.compaction.min.size property, which defaults to the value of hbase.hregion.memstore.flush.size (128MB by default).
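A sketch of how these defaults resolve, using a plain Map as a hypothetical stand-in for HBase's Configuration. The fallback chains (hbase.hstore.compaction.min to hbase.hstore.compactionThreshold to 3, and hbase.hstore.compaction.min.size to the flush size to 128MB) follow the description above; the real code may also consult per-table settings for the flush size, which this sketch ignores.

```java
import java.util.Map;

public class CompactionConfigSketch {
    static final long DEFAULT_FLUSH_SIZE = 128L * 1024 * 1024; // 128MB, as described above

    // Hypothetical helper: read a long from a Map standing in for the HBase Configuration.
    static long getLong(Map<String, String> conf, String key, long dflt) {
        String v = conf.get(key);
        return v == null ? dflt : Long.parseLong(v);
    }

    public static int minFilesToCompact(Map<String, String> conf) {
        // New property name first, falling back to the old name, then to 3.
        long legacy = getLong(conf, "hbase.hstore.compactionThreshold", 3);
        return (int) getLong(conf, "hbase.hstore.compaction.min", legacy);
    }

    public static long minCompactSize(Map<String, String> conf) {
        // Unset min.size falls back to the memstore flush size.
        long flushSize = getLong(conf, "hbase.hregion.memstore.flush.size", DEFAULT_FLUSH_SIZE);
        return getLong(conf, "hbase.hstore.compaction.min.size", flushSize);
    }

    public static void main(String[] args) {
        Map<String, String> conf = Map.of("hbase.hstore.compactionThreshold", "5");
        System.out.println(minFilesToCompact(conf)); // 5
        System.out.println(minCompactSize(conf));    // 134217728
    }
}
```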

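sumSize is an array of running totals of store file sizes, accumulated from the newest file backwards: sumSize[i] is the total size of file i plus the newer files after it, capped at a window of maxFilesToCompact - 1 files. r is the value of the hbase.hstore.compaction.ratio property, which defaults to 1.2.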
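The backward loop that builds sumSize can be isolated and run on a toy input. This sketch copies the loop's arithmetic into a hypothetical sumSizes helper: sumSize[i] covers fileSizes over [i, i + maxFilesToCompact - 1), so sumSize[start + 1] in the selection loop is the total of the newer files that would be compacted together with file start.

```java
import java.util.Arrays;

public class SumSizeSketch {
    /**
     * sumSize[i] = fileSizes[i] + ... + fileSizes[i + maxFilesToCompact - 2], built from
     * the newest file backwards: add the running total of the next index, then subtract
     * the file that just fell out of the window.
     */
    public static long[] sumSizes(long[] fileSizes, int maxFilesToCompact) {
        int n = fileSizes.length;
        long[] sumSize = new long[n];
        for (int i = n - 1; i >= 0; --i) {
            int tooFar = i + maxFilesToCompact - 1;
            sumSize[i] = fileSizes[i]
                + ((i + 1 < n) ? sumSize[i + 1] : 0)
                - ((tooFar < n) ? fileSizes[tooFar] : 0);
        }
        return sumSize;
    }

    public static void main(String[] args) {
        long[] sizes = {100, 50, 20, 10, 5}; // oldest -> newest
        // With maxFilesToCompact = 3 each window holds two files: [150, 70, 30, 15, 5]
        System.out.println(Arrays.toString(sumSizes(sizes, 3)));
    }
}
```

The sliding window keeps the loop O(n) while still bounding each sum to the files that could actually be compacted in one go.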

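At first all the files are small, so every file meets the criteria and all of them get selected; the compaction therefore always ends up being a major compaction. Once files grow, the loop walks from the oldest file and skips any file larger than the greater of hbase.hstore.compaction.min.size and (total size of the newer files × hbase.hstore.compaction.ratio). The first file that is not larger, together with the newer files after it (up to maxFilesToCompact files in total), becomes the compaction target.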
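Putting the pieces together, here is a simplified standalone model of the minor-compaction branch. It ignores references, bulk-load exclusions, expired files and the major-compaction path, and RatioSelectionSketch/select are hypothetical names. The one intentional difference from the original is a bounds guard on sumSize[start + 1], which would only matter if minFiles were 1.

```java
import java.util.Arrays;

public class RatioSelectionSketch {
    /**
     * fileSizes is ordered oldest -> newest. Returns {start, end} (end exclusive) of the
     * files to compact, or null when fewer than minFiles files qualify.
     */
    public static int[] select(long[] fileSizes, int minFiles, int maxFiles,
                               long minCompactSize, double ratio) {
        int n = fileSizes.length;
        if (n < minFiles) {
            return null; // not enough files to even try
        }
        // sumSize[i] = sum of fileSizes over [i, i + maxFiles - 1), as in compactSelection.
        long[] sumSize = new long[n];
        for (int i = n - 1; i >= 0; --i) {
            int tooFar = i + maxFiles - 1;
            sumSize[i] = fileSizes[i]
                + ((i + 1 < n) ? sumSize[i + 1] : 0)
                - ((tooFar < n) ? fileSizes[tooFar] : 0);
        }
        // Skip old files larger than both minCompactSize and ratio * (newer files).
        int start = 0;
        while (n - start >= minFiles) {
            long newer = (start + 1 < n) ? sumSize[start + 1] : 0; // bounds guard for the sketch
            if (fileSizes[start] <= Math.max(minCompactSize, (long) (newer * ratio))) {
                break; // first file that meets the criteria
            }
            ++start;
        }
        int end = Math.min(n, start + maxFiles);
        return (end - start < minFiles) ? null : new int[] {start, end};
    }

    public static void main(String[] args) {
        long[] sizes = {500, 120, 40, 30, 20, 12}; // MB, oldest -> newest
        System.out.println(Arrays.toString(select(sizes, 3, 10, 10, 1.2))); // [1, 6]
    }
}
```

In the example, 500 is skipped because it exceeds 1.2 × 222, while 120 is within 1.2 × 102 ≈ 122, so the five newest files are selected. With three files all below minCompactSize, every file is selected, which is the "all files are small" case above.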

The "normal skew" diagram in the source comment and Figure 8-4 in the horse book illustrate this.