Hiveのソースを見てみた。

対象はCloudera版の0.7

どこがエントリポイントかっていう話がまずあるわけだが、hiveコマンドをたたくとこうなりますよね。

$ hive
Hive history file=/tmp/wyukawa/hive_job_log_wyukawa_201107032314_538554893.txt
>

hiveコマンドの実態である$HIVE_HOME/bin/hiveをみると、hiveコマンドをたたくとcliって最後に実行してますね。

cliっていうのはシェルの関数で実態は$HIVE_HOME/bin/ext/cli.shに書かれてます。

で、最終的にはorg.apache.hadoop.hive.cli.CliDriverを実行します。ここにmainメソッドがあります。

>!pwd;

みたいな!ではじまるものが指定された場合はそのコマンドがそのまま実行されます。

HiveQLが指定された場合はDriverクラスに処理がうつります。

compileメソッドの中でANTLRを用いた構文解析を行い、その後executeメソッドでMapReduceを実行しているようです。

たぶんTaskRunner経由でExecDriver#executeしていると思われます。

MapperとReducerは下記のとおり。

  • ExecMapper
  • ExecReducer

なおselect * from page_viewのように条件指定なしのselect *はMapReduceが走りません。

ちなみにexplainするとこんな感じで違ってきます。

hive> explain select * from page_view;
OK
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME page_view))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF))))

STAGE DEPENDENCIES:
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-0
    Fetch Operator
      limit: -1


Time taken: 4.395 seconds
hive> explain select ip from page_view;
OK
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME page_view))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL ip)))))

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        page_view 
          TableScan
            alias: page_view
            Select Operator
              expressions:
                    expr: ip
                    type: string
              outputColumnNames: _col0
              File Output Operator
                compressed: false
                GlobalTableId: 0
                table:
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

  Stage: Stage-0
    Fetch Operator
      limit: -1


Time taken: 0.19 seconds

SemanticAnalyzer#genMapRedTasksがその辺の判定をしているようです。

    // Does this query need reduce job
    if (qb.isSelectStarQuery() && qbParseInfo.getDestToClusterBy().isEmpty()
        && qbParseInfo.getDestToDistributeBy().isEmpty()
        && qbParseInfo.getDestToOrderBy().isEmpty()
        && qbParseInfo.getDestToSortBy().isEmpty()) {
      boolean noMapRed = false;

      Iterator<Map.Entry<String, Table>> iter = qb.getMetaData()
          .getAliasToTable().entrySet().iterator();
      Table tab = (iter.next()).getValue();
      if (!tab.isPartitioned()) {
        if (qbParseInfo.getDestToWhereExpr().isEmpty()) {
          fetch = new FetchWork(tab.getPath().toString(), Utilities
              .getTableDesc(tab), qb.getParseInfo().getOuterQueryLimit());
          noMapRed = true;
          inputs.add(new ReadEntity(tab));
        }
      } else {

        if (topOps.size() == 1) {
          TableScanOperator ts = (TableScanOperator) topOps.values().toArray()[0];

          // check if the pruner only contains partition columns
          if (PartitionPruner.onlyContainsPartnCols(topToTable.get(ts),
              opToPartPruner.get(ts))) {

            PrunedPartitionList partsList = null;
            try {
              partsList = opToPartList.get(ts);
              if (partsList == null) {
                partsList = PartitionPruner.prune(topToTable.get(ts),
                    opToPartPruner.get(ts), conf, (String) topOps.keySet()
                    .toArray()[0], prunedPartitions);
                opToPartList.put(ts, partsList);
              }
            } catch (HiveException e) {
              // Has to use full name to make sure it does not conflict with
              // org.apache.commons.lang.StringUtils
              LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
              throw new SemanticException(e.getMessage(), e);
            }

            // If there is any unknown partition, create a map-reduce job for
            // the filter to prune correctly
            if ((partsList.getUnknownPartns().size() == 0)) {
              List<String> listP = new ArrayList<String>();
              List<PartitionDesc> partP = new ArrayList<PartitionDesc>();

              Set<Partition> parts = partsList.getConfirmedPartns();
              Iterator<Partition> iterParts = parts.iterator();
              while (iterParts.hasNext()) {
                Partition part = iterParts.next();

                listP.add(part.getPartitionPath().toString());
                try {
                  partP.add(Utilities.getPartitionDesc(part));
                } catch (HiveException e) {
                  throw new SemanticException(e.getMessage(), e);
                }
                inputs.add(new ReadEntity(part));
              }

              fetch = new FetchWork(listP, partP, qb.getParseInfo()
                  .getOuterQueryLimit());
              noMapRed = true;
            }
          }
        }
      }

      if (noMapRed) {
        if (fetch.getTblDesc() != null) {
          PlanUtils.configureTableJobPropertiesForStorageHandler(
            fetch.getTblDesc());
        }
        fetchTask = (FetchTask) TaskFactory.get(fetch, conf);
        setFetchTask(fetchTask);

        // remove root tasks if any
        rootTasks.clear();
        return;
      }
    }


相変わらず取りとめがないエントリですが(汗、いじょ

こちらも参考まで
Hiveのコードを読んでみた - Bangucs de guanxinqu