HiveのSequenceFileとかパーティションとか
Hiveに関するまとまった情報源として書籍なら徹底入門があります。
とはいえそんなに突っ込んだことは書かれていないので、本家のWikiを参照することになると思います。
https://cwiki.apache.org/confluence/display/Hive/Home
最近Confluenceに変わりましたね。
ここではSequenceFileとかパーティションとかその辺について書いてみたいと思います。
が、まずはテキストファイル形式から。なお動作環境はMac上のVirtualBox上のCentOS5.6です。yumでCDH3を疑似分散でいれてます。
下記は本家のサンプルにもあるテーブルです。dtとcountryがパーティション用のカラムです。
CREATE TABLE page_view ( viewTime INT, userid BIGINT, page_url STRING, referrer_url STRING, ip STRING COMMENT 'IP Address of the User') COMMENT 'This is the page view table' PARTITIONED BY(dt STRING, country STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE;
投入するcsvデータはこんな感じ。
1,1,http://hadoop.apache.org/,http://www.google.co.jp/search?q=hadoop,192.168.0.1 1,2,http://hive.apache.org/,http://www.google.co.jp/search?q=hive,192.168.0.2 1,3,http://hbase.apache.org/,http://www.google.co.jp/search?q=hbase,192.168.0.3
LOAD DATA LOCAL INPATH '2011-07-03_us.csv' INTO TABLE page_view PARTITION(dt='2011-07-03', country='US');
でデータ投入して、
select * from page_view;
でselectすれば
[wyukawa@localhost textfile]$ hive -f select.hql Hive history file=/tmp/wyukawa/hive_job_log_wyukawa_201107022151_698271647.txt OK 1 1 http://hadoop.apache.org/ http://www.google.co.jp/search?q=hadoop 192.168.0.1 2011-07-03 US 1 2 http://hive.apache.org/ http://www.google.co.jp/search?q=hive 192.168.0.2 2011-07-03 US 1 3 http://hbase.apache.org/ http://www.google.co.jp/search?q=hbase 192.168.0.3 2011-07-03 US Time taken: 4.483 seconds
とデータが取得できます。見てわかるとおりこのケースではMapReduceは実行されません。*を使わずにカラム指定ならMapReduceが実行されます。
で、次にSequenceFile形式の場合です。
ちなみに参照するWikiは下記なのですが、注意点としてはio.seqfile.compression.typeじゃなくてmapred.output.compression.typeです。
https://cwiki.apache.org/confluence/display/Hive/CompressedStorage
このWikiを見るとわかるとおり、いったんTEXTFILE形式のテーブルからselectしてinsertという流れになります。
ここではinsert先のDDLは下記のとおりです。
CREATE TABLE page_view_seq ( viewTime INT, userid BIGINT, page_url STRING, referrer_url STRING, ip STRING COMMENT 'IP Address of the User') COMMENT 'This is the page view table' PARTITIONED BY(dt STRING, country STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS SEQUENCEFILE;
実行するクエリは下記です。また圧縮機能を使うのでhadoop-nativeもインストールしたほうがいいです。
set hive.exec.compress.intermediate=true; set hive.exec.compress.output=true; set mapred.output.compression.type=BLOCK; set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec; INSERT OVERWRITE TABLE page_view_seq PARTITION(dt='2011-07-03', country='US') SELECT viewTime, userid, page_url, referrer_url, ip FROM page_view ;
これでデータが投入されたので、
select * from page_view_seq;
でselectすればデータがとれます。
ですが、見てわかるとおりPARTITION(dt='2011-07-03', country='US')の部分がハードコードされています。
別のデータをinsertするたびに毎回手で書くのは大変ですよね。
例えばdt='2011-07-04', country='CA'の下記のデータがある場合は
1,4,http://pig.apache.org/,http://www.google.co.jp/search?q=pig,192.168.0.4 1,5,http://mahout.apache.org/,http://www.google.co.jp/search?q=mahout,192.168.0.5 1,6,http://zookeeper.apache.org/,http://www.google.co.jp/search?q=zookeeper,192.168.0.6
INSERT OVERWRITE TABLE page_view_seq PARTITION(dt='2011-07-04', country='CA')と書く必要があります。
2、3日ならいいですけど、1か月とか1年とかだったら大変ですよね。
そんなあなたにDynamic-partition Insertです。
https://cwiki.apache.org/confluence/display/Hive/Tutorial#Tutorial-DynamicpartitionInsert
クエリはこちら。これで動的にinsertできます。
set hive.exec.dynamic.partition=true; set hive.exec.dynamic.partition.mode=nonstrict; FROM page_view pv INSERT OVERWRITE TABLE page_view_seq PARTITION(dt, country) SELECT pv.viewTime, pv.userid, pv.page_url, pv.referrer_url, pv.ip, pv.dt, pv.country DISTRIBUTE BY pv.dt, pv.country;
ま、基本はこれなんでしょうけど、ちょっと番外編も紹介します。
テキストファイルからSequenceFileに変換してHDFSに突っ込んで後でパーティションを追加するという方法です。
テキストファイルからSequenceFileに変換してHDFSに突っ込むJavaソースはこちら。
package sample.sequencefile; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.util.ReflectionUtils; public class TextFile2SequenceFile { private static final BytesWritable EMPTY_KEY = new BytesWritable(); public static void main(String[] args) throws IOException { String textfile = args[0]; String uri = args[1]; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri), conf); Path path = new Path(uri); Text value = new Text(); SequenceFile.Writer writer = null; BufferedReader br = null; try { GzipCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf); writer = SequenceFile.createWriter(fs, conf, path, EMPTY_KEY.getClass(), value.getClass(), CompressionType.BLOCK, codec); br = new BufferedReader(new FileReader(textfile)); String line = null; while( (line=br.readLine()) != null) { value.set(line); writer.append(EMPTY_KEY, value); } } finally { IOUtils.closeStream(writer); IOUtils.closeStream(br); } } }
valueがテキストファイルの1行になってます。keyはなんでもいいと思いますw
というのもHiveのソース(ExecMapper)を見る限り読み飛ばしているように見えるからです。
Hiveに突っ込まれたSequenceFileを調べてもKeyに値は入っていないです。
上記のJavaソースのようにkeyを値をセットしないBytesWritableしたのはHiveのHiveSequenceFileOutputFormatがそうなっていたから真似したたけです。
実行方法はこんな感じ。
$ export HADOOP_CLASSPATH=target/classes/ $ hadoop sample.sequencefile.TextFile2SequenceFile /home/wyukawa/git-work/hive-sample/textfile/2011-07-03_us.csv /user/hive/warehouse/page_view_seq/dt=2011-07-03/country=US/2011-07-03_us.seq 11/07/02 22:56:30 INFO util.NativeCodeLoader: Loaded the native-hadoop library 11/07/02 22:56:30 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library 11/07/02 22:56:30 INFO compress.CodecPool: Got brand-new compressor
HDFSに直に見にいくとちゃんと入っていることがわかります。
$ hadoop fs -text /user/hive/warehouse/page_view_seq/dt=2011-07-03/country=US/2011-07-03_us.seq 11/07/02 22:56:52 INFO util.NativeCodeLoader: Loaded the native-hadoop library 11/07/02 22:56:52 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library 11/07/02 22:56:52 INFO compress.CodecPool: Got brand-new decompressor 11/07/02 22:56:52 INFO compress.CodecPool: Got brand-new decompressor 11/07/02 22:56:52 INFO compress.CodecPool: Got brand-new decompressor 11/07/02 22:56:52 INFO compress.CodecPool: Got brand-new decompressor 1,1,http://hadoop.apache.org/,http://www.google.co.jp/search?q=hadoop,192.168.0.1 1,2,http://hive.apache.org/,http://www.google.co.jp/search?q=hive,192.168.0.2 1,3,http://hbase.apache.org/,http://www.google.co.jp/search?q=hbase,192.168.0.3
あとは
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-AddPartitions
にあるように
ALTER TABLE page_view_seq ADD IF NOT EXISTS PARTITION(dt='2011-07-03', country='US');
としてパーティションを追加すれば、selectして見れるようになります。
一連のソースは下記にあります。
https://github.com/wyukawa/hive-sample
https://github.com/wyukawa/hadoop-sample
そんじゃーね!