Hiveのソースを見てみた。
対象はCloudera版の0.7
どこがエントリポイントかっていう話がまずあるわけだが、hiveコマンドをたたくとこうなりますよね。
$ hive Hive history file=/tmp/wyukawa/hive_job_log_wyukawa_201107032314_538554893.txt >
hiveコマンドの実態である$HIVE_HOME/bin/hiveをみると、hiveコマンドをたたくとcliって最後に実行してますね。
cliっていうのはシェルの関数で実態は$HIVE_HOME/bin/ext/cli.shに書かれてます。
で、最終的にはorg.apache.hadoop.hive.cli.CliDriverを実行します。ここにmainメソッドがあります。
>!pwd;
みたいな!ではじまるものが指定された場合はそのコマンドがそのまま実行されます。
HiveQLが指定された場合はDriverクラスに処理がうつります。
compileメソッドの中でANTLRを用いた構文解析を行い、その後executeメソッドでMapReduceを実行しているようです。
たぶんTaskRunner経由でExecDriver#executeしていると思われます。
MapperとReducerは下記のとおり。
- ExecMapper
- ExecReducer
なおselect * from page_viewのように条件指定なしのselect *はMapReduceが走りません。
ちなみにexplainするとこんな感じで違ってきます。
hive> explain select * from page_view; OK ABSTRACT SYNTAX TREE: (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME page_view))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)))) STAGE DEPENDENCIES: Stage-0 is a root stage STAGE PLANS: Stage: Stage-0 Fetch Operator limit: -1 Time taken: 4.395 seconds hive> explain select ip from page_view; OK ABSTRACT SYNTAX TREE: (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME page_view))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL ip))))) STAGE DEPENDENCIES: Stage-1 is a root stage Stage-0 is a root stage STAGE PLANS: Stage: Stage-1 Map Reduce Alias -> Map Operator Tree: page_view TableScan alias: page_view Select Operator expressions: expr: ip type: string outputColumnNames: _col0 File Output Operator compressed: false GlobalTableId: 0 table: input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat Stage: Stage-0 Fetch Operator limit: -1 Time taken: 0.19 seconds
SemanticAnalyzer#genMapRedTasksがその辺の判定をしているようです。
// Does this query need reduce job if (qb.isSelectStarQuery() && qbParseInfo.getDestToClusterBy().isEmpty() && qbParseInfo.getDestToDistributeBy().isEmpty() && qbParseInfo.getDestToOrderBy().isEmpty() && qbParseInfo.getDestToSortBy().isEmpty()) { boolean noMapRed = false; Iterator<Map.Entry<String, Table>> iter = qb.getMetaData() .getAliasToTable().entrySet().iterator(); Table tab = (iter.next()).getValue(); if (!tab.isPartitioned()) { if (qbParseInfo.getDestToWhereExpr().isEmpty()) { fetch = new FetchWork(tab.getPath().toString(), Utilities .getTableDesc(tab), qb.getParseInfo().getOuterQueryLimit()); noMapRed = true; inputs.add(new ReadEntity(tab)); } } else { if (topOps.size() == 1) { TableScanOperator ts = (TableScanOperator) topOps.values().toArray()[0]; // check if the pruner only contains partition columns if (PartitionPruner.onlyContainsPartnCols(topToTable.get(ts), opToPartPruner.get(ts))) { PrunedPartitionList partsList = null; try { partsList = opToPartList.get(ts); if (partsList == null) { partsList = PartitionPruner.prune(topToTable.get(ts), opToPartPruner.get(ts), conf, (String) topOps.keySet() .toArray()[0], prunedPartitions); opToPartList.put(ts, partsList); } } catch (HiveException e) { // Has to use full name to make sure it does not conflict with // org.apache.commons.lang.StringUtils LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e)); throw new SemanticException(e.getMessage(), e); } // If there is any unknown partition, create a map-reduce job for // the filter to prune correctly if ((partsList.getUnknownPartns().size() == 0)) { List<String> listP = new ArrayList<String>(); List<PartitionDesc> partP = new ArrayList<PartitionDesc>(); Set<Partition> parts = partsList.getConfirmedPartns(); Iterator<Partition> iterParts = parts.iterator(); while (iterParts.hasNext()) { Partition part = iterParts.next(); listP.add(part.getPartitionPath().toString()); try { partP.add(Utilities.getPartitionDesc(part)); } catch (HiveException e) { throw new SemanticException(e.getMessage(), e); } inputs.add(new ReadEntity(part)); } fetch = new FetchWork(listP, partP, qb.getParseInfo() .getOuterQueryLimit()); noMapRed = true; } } } } if (noMapRed) { if (fetch.getTblDesc() != null) { PlanUtils.configureTableJobPropertiesForStorageHandler( fetch.getTblDesc()); } fetchTask = (FetchTask) TaskFactory.get(fetch, conf); setFetchTask(fetchTask); // remove root tasks if any rootTasks.clear(); return; } }
相変わらず取りとめがないエントリですが(汗、いじょ