HiveのSequenceFileとかパーティションとか

Hiveに関するまとまった情報源として書籍なら徹底入門があります。

とはいえそんなに突っ込んだことは書かれていないので、本家のWikiを参照することになると思います。

https://cwiki.apache.org/confluence/display/Hive/Home

最近Confluenceに変わりましたね。

ここではSequenceFileとかパーティションとかその辺について書いてみたいと思います。

が、まずはテキストファイル形式から。なお動作環境はMac上のVirtualBox上のCentOS5.6です。yumでCDH3を疑似分散でいれてます。

下記は本家のサンプルにもあるテーブルです。dtとcountryがパーティション用のカラムです。

CREATE TABLE page_view (
  viewTime INT,
  userid BIGINT,
  page_url STRING,
  referrer_url STRING,
  ip STRING COMMENT 'IP Address of the User')
 COMMENT 'This is the page view table'
 PARTITIONED BY(dt STRING, country STRING)
 ROW FORMAT DELIMITED
 FIELDS TERMINATED BY ','
 STORED AS TEXTFILE;

投入するcsvデータはこんな感じ。

1,1,http://hadoop.apache.org/,http://www.google.co.jp/search?q=hadoop,192.168.0.1
1,2,http://hive.apache.org/,http://www.google.co.jp/search?q=hive,192.168.0.2
1,3,http://hbase.apache.org/,http://www.google.co.jp/search?q=hbase,192.168.0.3
LOAD DATA LOCAL INPATH '2011-07-03_us.csv' INTO TABLE page_view PARTITION(dt='2011-07-03', country='US');

でデータ投入して、

select * from page_view;

でselectすれば

[wyukawa@localhost textfile]$ hive -f select.hql 
Hive history file=/tmp/wyukawa/hive_job_log_wyukawa_201107022151_698271647.txt
OK
1       1       http://hadoop.apache.org/       http://www.google.co.jp/search?q=hadoop 192.168.0.1     2011-07-03      US
1       2       http://hive.apache.org/ http://www.google.co.jp/search?q=hive   192.168.0.2     2011-07-03      US
1       3       http://hbase.apache.org/        http://www.google.co.jp/search?q=hbase  192.168.0.3     2011-07-03      US
Time taken: 4.483 seconds

とデータが取得できます。見てわかるとおりこのケースではMapReduceは実行されません。*を使わずにカラム指定ならMapReduceが実行されます。

で、次にSequenceFile形式の場合です。

ちなみに参照するWikiは下記なのですが、注意点としてはio.seqfile.compression.typeじゃなくてmapred.output.compression.typeです。

https://cwiki.apache.org/confluence/display/Hive/CompressedStorage

このWikiを見るとわかるとおり、いったんTEXTFILE形式のテーブルからselectしてinsertという流れになります。

ここではinsert先のDDLは下記のとおりです。

CREATE TABLE page_view_seq (
  viewTime INT,
  userid BIGINT,
  page_url STRING,
  referrer_url STRING,
  ip STRING COMMENT 'IP Address of the User')
 COMMENT 'This is the page view table'
 PARTITIONED BY(dt STRING, country STRING)
 ROW FORMAT DELIMITED
 FIELDS TERMINATED BY ','
 STORED AS SEQUENCEFILE;

実行するクエリは下記です。また圧縮機能を使うのでhadoop-nativeもインストールしたほうがいいです。

set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
set mapred.output.compression.type=BLOCK;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;

INSERT OVERWRITE TABLE page_view_seq PARTITION(dt='2011-07-03', country='US')
SELECT
  viewTime,
  userid,
  page_url,
  referrer_url,
  ip
FROM
  page_view
;

これでデータが投入されたので、

select * from page_view_seq;

でselectすればデータがとれます。

ですが、見てわかるとおりPARTITION(dt='2011-07-03', country='US')の部分がハードコードされています。

別のデータをinsertするたびに毎回手で書くのは大変ですよね。

例えばdt='2011-07-04', country='CA'の下記のデータがある場合は

1,4,http://pig.apache.org/,http://www.google.co.jp/search?q=pig,192.168.0.4
1,5,http://mahout.apache.org/,http://www.google.co.jp/search?q=mahout,192.168.0.5
1,6,http://zookeeper.apache.org/,http://www.google.co.jp/search?q=zookeeper,192.168.0.6

INSERT OVERWRITE TABLE page_view_seq PARTITION(dt='2011-07-04', country='CA')と書く必要があります。

2、3日ならいいですけど、1か月とか1年とかだったら大変ですよね。

そんなあなたにDynamic-partition Insertです。

https://cwiki.apache.org/confluence/display/Hive/Tutorial#Tutorial-DynamicpartitionInsert

クエリはこちら。これで動的にinsertできます。

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

FROM page_view pv
INSERT OVERWRITE TABLE page_view_seq PARTITION(dt, country)
SELECT
  pv.viewTime,
  pv.userid,
  pv.page_url,
  pv.referrer_url,
  pv.ip,
  pv.dt,
  pv.country
DISTRIBUTE BY pv.dt, pv.country;

ま、基本はこれなんでしょうけど、ちょっと番外編も紹介します。

テキストファイルからSequenceFileに変換してHDFSに突っ込んで後でパーティションを追加するという方法です。

テキストファイルからSequenceFileに変換してHDFSに突っ込むJavaソースはこちら。

package sample.sequencefile;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class TextFile2SequenceFile {
	
	private static final BytesWritable EMPTY_KEY = new BytesWritable();

	public static void main(String[] args) throws IOException {
		
		String textfile = args[0];

		String uri = args[1];
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(uri), conf);
		Path path = new Path(uri);

		Text value = new Text();
		SequenceFile.Writer writer = null;
		BufferedReader br = null;
		try {
			GzipCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
			writer = SequenceFile.createWriter(fs, conf, path, EMPTY_KEY.getClass(),
					value.getClass(), CompressionType.BLOCK, codec);
			br = new BufferedReader(new FileReader(textfile));
			String line = null;
			while( (line=br.readLine()) != null) {
				value.set(line);
				writer.append(EMPTY_KEY, value);
			}
		} finally {
			IOUtils.closeStream(writer);
			IOUtils.closeStream(br);
		}
	}
}

valueがテキストファイルの1行になってます。keyはなんでもいいと思いますw

というのもHiveのソース(ExecMapper)を見る限り読み飛ばしているように見えるからです。

Hiveに突っ込まれたSequenceFileを調べてもKeyに値は入っていないです。

上記のJavaソースのようにkeyを値をセットしないBytesWritableしたのはHiveのHiveSequenceFileOutputFormatがそうなっていたから真似したたけです。

実行方法はこんな感じ。

$ export HADOOP_CLASSPATH=target/classes/
$ hadoop sample.sequencefile.TextFile2SequenceFile /home/wyukawa/git-work/hive-sample/textfile/2011-07-03_us.csv /user/hive/warehouse/page_view_seq/dt=2011-07-03/country=US/2011-07-03_us.seq
11/07/02 22:56:30 INFO util.NativeCodeLoader: Loaded the native-hadoop library
11/07/02 22:56:30 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
11/07/02 22:56:30 INFO compress.CodecPool: Got brand-new compressor

HDFSに直に見にいくとちゃんと入っていることがわかります。

$ hadoop fs -text  /user/hive/warehouse/page_view_seq/dt=2011-07-03/country=US/2011-07-03_us.seq
11/07/02 22:56:52 INFO util.NativeCodeLoader: Loaded the native-hadoop library
11/07/02 22:56:52 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
11/07/02 22:56:52 INFO compress.CodecPool: Got brand-new decompressor
11/07/02 22:56:52 INFO compress.CodecPool: Got brand-new decompressor
11/07/02 22:56:52 INFO compress.CodecPool: Got brand-new decompressor
11/07/02 22:56:52 INFO compress.CodecPool: Got brand-new decompressor
        1,1,http://hadoop.apache.org/,http://www.google.co.jp/search?q=hadoop,192.168.0.1
        1,2,http://hive.apache.org/,http://www.google.co.jp/search?q=hive,192.168.0.2
        1,3,http://hbase.apache.org/,http://www.google.co.jp/search?q=hbase,192.168.0.3

あとは

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-AddPartitions

にあるように

ALTER TABLE page_view_seq ADD IF NOT EXISTS PARTITION(dt='2011-07-03', country='US');

としてパーティションを追加すれば、selectして見れるようになります。

一連のソースは下記にあります。

https://github.com/wyukawa/hive-sample
https://github.com/wyukawa/hadoop-sample

そんじゃーね!