とりあえずStormをローカルモードで動かしてみた。

身近なところでStormがちょっとブームなので話題についていくためにも軽く素振りしたいと思います。

日本語の資料でざっくり中身をつかむには下記がいいんじゃないでしょうか。

StormにはNimbus, Supervisor, Worker, Spout, Bolt, Topologyと用語が出てきますが、
ざっくりHadoop用語に当てはめるとそれぞれJobTracker, TaskTracker, task, map, reduce, jobって感じでしょうか。

上記スライド資料もそうですが、日本だとAcroquest TechnologyさんがいろいろとStormの調査資料をアウトプットしてくれているので見てみるといいんじゃないでしょうか。下記ブログのStorm関連記事もそうですね。

http://d.hatena.ne.jp/acro-engineer/archive?word=%2A%5BStorm%5D


なお本家は下記GitHubプロジェクトですね。
https://github.com/nathanmarz/storm


今回はとりあえずStormをローカルモードで動かしてみたという話です。

storm-starterというのがStormのHello, Worldプロジェクトのようです。
こいつを試したいと思います。

試すにはLeiningenというClojureのビルドツールが必要っぽいのでインストールします。

brew install leiningen

で、storm-starterのソースを取得してビルドして実行する流れはこんな感じです。

git clone git://github.com/nathanmarz/storm-starter.git
cd storm-starter/
lein deps
lein compile
java -cp $(lein classpath) storm.starter.ExclamationTopology

Eclipseにアタッチしたい場合は下記のように実行します。

mvn -f m2-pom.xml -DdownloadSources=true eclipse:eclipse

今回実行したExclamationTopologyのソースは下記の通りです。

package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.Map;

/**
 * This is a basic example of a Storm topology.
 */
public class ExclamationTopology {
    
    public static class ExclamationBolt extends BaseRichBolt {
        OutputCollector _collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
            _collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }


    }
    
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        
        builder.setSpout("word", new TestWordSpout(), 10);        
        builder.setBolt("exclaim1", new ExclamationBolt(), 3)
                .shuffleGrouping("word");
        builder.setBolt("exclaim2", new ExclamationBolt(), 2)
                .shuffleGrouping("exclaim1");
                
        Config conf = new Config();
        conf.setDebug(true);
        
        if(args!=null && args.length > 0) {
            conf.setNumWorkers(3);
            
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
        
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();    
        }
    }
}

Spout, Boltを組み合わせているのが下記の部分です。word→exclaim1→exclaim2という順番で処理が流れます。

        builder.setSpout("word", new TestWordSpout(), 10);        
        builder.setBolt("exclaim1", new ExclamationBolt(), 3)
                .shuffleGrouping("word");
        builder.setBolt("exclaim2", new ExclamationBolt(), 2)
                .shuffleGrouping("exclaim1");

まず最初のword、今回使うSpoutつまりTestWordSpoutのnextTupleメソッドは下記のようになっています。
"nathan", "mike", "jackson", "golda", "bertels"の5単語をランダムににemitします。

    public void nextTuple() {
        Utils.sleep(100);
        final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        _collector.emit(new Values(word));
    }

これを受け取るexclaim1つまりExclamationBoltは単語の後ろに!!!をつけるBoltです。

_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));

exclaim2もexclaim1と同様に単語の後ろに!!!をつけるBoltです。
つまりword(TestWordSpout)→exclaim1(ExclamationBolt)→exclaim2(ExclamationBolt)と処理が進むに従い下記のようにデータが変わっていくわけです。
nathan→nathan!!!→nathan!!!!!!

ExclamationTopologyを実行するとログがいっぱい出力されます。

ログの見方はまだよくわかりませんが、以下のようにnathan→nathan!!!→nathan!!!!!!と変化していく様子がわかります。

3319 [Thread-42] INFO  backtype.storm.daemon.task  - Emitting: word default [nathan]
3320 [Thread-18] INFO  backtype.storm.daemon.executor  - Processing received message source: word:14, stream: default, id: {}, [nathan]
3320 [Thread-18] INFO  backtype.storm.daemon.task  - Emitting: exclaim1 default [nathan!!!]
3320 [Thread-24] INFO  backtype.storm.daemon.executor  - Processing received message source: exclaim1:2, stream: default, id: {}, [nathan!!!]
3321 [Thread-24] INFO  backtype.storm.daemon.task  - Emitting: exclaim2 default [nathan!!!!!!]