ETLフレームワークとジョブ管理

Treasure Dataが面白い記事を書いていたのでこれに関連してETLフレームワークとジョブ管理について僕の経験、意見を書いてみようと思います。
Managing the Data Pipeline with Git + Luigi - Treasure Data Blog

リンク先の記事を僕なりに要約すると、

データやそれを加工するスクリプトがちらばって管理が辛くなり、エラーが起きた時のリカバリが難しい。

それを解決するETLツールというのもあって、例えばGUIフローチャートみたいなのを書いてデータの加工処理を行うことができる。

それだとバージョン管理できないし、ビッグデータにフィットしないケースもある。

そこでGitとLuigiを使ったData Pipelineが良いよ!


紹介されているコードの例がこちら。
Hiveで集計してTDのテーブルにinsertするのがTask1で、そのテーブルにPrestoクエリをなげるのがTask2で、その結果をCSVファイルとしてダウンロードするのがTask3です。Task1 -> Task2 -> Task3と処理が進みます。requiresで依存関係を定義します。

import luigi
import luigi_td
 
# Issue Hive query and insert result into test_db.test_table on TD
class Task1(luigi_td.Query):
    type = 'hive'
    database = 'sample_datasets'
    def query(self):
        return "SELECT path, COUNT(1) cnt FROM www_access GROUP BY path ORDER BY cnt"
    def output(self):
        return luigi_td.ResultTarget('tmp/Task1', 'td://@/test_db/test_table?mode=replace')
 
# Issue Presto query against test_db.test_table
class Task2(luigi_td.Query):
    type = 'presto'
    database = 'test_db'
    def requires(self):
        return Task1()
    def query(self):
        return "SELECT COUNT(1) FROM test_db.test_table"
    def output(self):
        return luigi_td.ResultTarget('tmp/Task2')
 
# Download the result, and format as CSV
class Task3(luigi.Task):
    def requires(self):
        return Task2()
    def output(self):
        return luigi.LocalTarget('tmp/Task3.csv')
    def run(self):
        target = self.input()
        with self.output().open('w') as f:
            target.result.to_csv(f)
 
if __name__ == '__main__':
    luigi.run()

これならETL処理がコード化されてバージョン管理できるし良いよねという話です。

これをこんな感じに実行します。

$ python tasks.py Task3 --local-scheduler

ちなみに

--local-scheduler

を指定して実行すると単体で動きWeb UIにジョブが表示されずテスト用という位置づけです。
productionではCentral Schedulerをサーバーとして起動してそこに投げるという形になります。
Central SchedulerがWeb UIを提供します。

ちなみにLuigiというのはSpotifyOSSで公開しているETLフレームワークです。
https://github.com/spotify/luigi

ドキュメントにあったコード例はこんな感じで、requiresで依存関係を定義して、outputで出力先を指定し、runがメイン処理という感じです。

class AggregateArtists(luigi.Task):
    date_interval = luigi.DateIntervalParameter()

    def output(self):
        return luigi.LocalTarget("data/artist_streams_%s.tsv" % self.date_interval)

    def requires(self):
        return [Streams(date) for date in self.date_interval]

    def run(self):
        artist_count = defaultdict(int)

        for input in self.input():
            with input.open('r') as in_file:
                for line in in_file:
                    timestamp, artist, track = line.strip().split()
                    artist_count[artist] += 1

        with self.output().open('w') as out_file:
            for artist, count in artist_count.iteritems():
                print >> out_file, artist, count

Treasure Dataのコード例はLuigi-TDというTreasure DataのサービスとLuigiをつなげるPythonライブラリを使っていますが、ETL処理自体はLuigiそのままです。

僕はLuigiはAzkaban, Rundeck, JP1のようなジョブ管理ツールだと最初思ったのですが、ドキュメントを軽く読んだ限りではETLフレームワークでむしろembulkに近いのかなと思いました。

そう思った理由は並列実行機能があるわりにはスケジュール機能自体はもってなくてcron頼りみたいだし、スケジュール一覧画面なさそうだし、履歴はexperimentalだっていってるし、Web UIもそっけないからです。あとブラウザからぽちっとボタンを押して実行できるのかな、これ。出来なくてサーバーにログインしないといけないんだったらちと辛いなと思いました。

以上の理由から僕が仮に新規にログ解析基盤作るとしてもLuigiをジョブ管理ツールとしては採用しないだろうなあと思いました。ETLフレームワークとしてならありかなと思ったけど、それはそれで別途ジョブ管理ツール入れるのも微妙なのでやっぱないかな。

ジョブのretry周りがどうなるのか気になったのですが、どうやらLuigiは出力先にファイルがあると処理を実行しないので2回やっても結果は変わらないということで冪等性を保っているようです。

Running the command again will do nothing because the output file is already created. In that sense, any task in Luigi is idempotent because running it many times gives the same outcome as running it once.

Example – Top Artists — Luigi 2.7.8 documentation

なのでTreasure Dataのコード例だとTask2が失敗して再実行した場合はTask1は実行されないという流れのようです。


ちなみに僕のHadoop, Hive環境ではどうやっているかというと内製の薄いPython ETLフレームワークがあって、こんな感じで処理を記述します。

class Task1(Job):

    def validate_before(self):
        hive.exists("access_log", "yyyymmdd='%s'" % (...))

    def process(self):
        insert_query = """
                   INSERT OVERWRITE TABLE aggregate PARTITION(yyyymmdd='%s')
                   SELECT ...
                   FROM access_log
                   WHERE ...
                   GROUP BY ...
         """ % (...)
         hiveCli.query(insert_query)

    def validate_after(self):
        hive.exists("aggregate", "yyyymmdd='%s'" % (...))

validate_before -> process -> validate_afterと処理が進みます。
validate_beforeで入力データのチェック、processでメインの加工処理、validate_afterで集計結果のチェックを行います。

後続の処理がある場合も同じように書きます。他と共有しないんだったら1個のpython処理で良いのですが、共通的なテーブルを作る場合は共有する必要があるので別ファイルに切り出します。

切り出して下記のようなジョブフローになったとします。task1.py, task3.pyがcommon.pyに依存して、もちろんcommon.pyは1回しか実行しないようにします。ちなみにこういうケースでLuigiはどうするのか気になる。Luigiだとジョブは全部1ファイルにするっぽいから。まさか共通処理の部分をコピペしないよね?

common.py -> task1.py -> task2.py
                   -> task3.py -> task4.py

これをAzkabanのジョブとして登録します。
Azkabanの場合は独自のジョブ定義ファイルがあるのでそこに記述します。

雰囲気としては以下のような感じです。

#common.py
type=command
command=python ...
#task1.py
type=command
dependencies=common
command=python ...

ジョブ定義ファイルを作ってzipでかためてAzkabanにアップロードすればジョブフローをGUIで見ることができます。
ジョブ実行はAzkabanから行います。ブラウザでぽちっとすれば実行できます。スケジューリングも出来ます。出来るんですが罠があるので僕はブラウザからはジョブのスケジューリングはせずAPI経由で登録してます。どんな罠かはここに書きました。AzkabanのCLIツールeboshiを書きました - wyukawa’s blog

ジョブが失敗した場合のretryはブラウザから行います。Luigiのような冪等性はありません。
成功したジョブはdisableにして失敗したジョブをenableにしてAzkabanのジョブを実行するとdisableのジョブはスキップされるのでそうやって必要なジョブだけretryしてます。

こう書くと冪等性があった方が良いような気がしますが、入力データが間違っていたので再実行したいケースなどはLuigiだと出力先ファイルを削除しないといけません。僕はすべてHiveでINSERT OVERWRITE TABLEしてるので出力先ファイルを削除する必要はありません。ま、この辺はAzkabanとLuigiの守備範囲の違いですね。Luigiの方が低いレイヤーをサポートしています。

なお偉そうに書いてますが、上記のシステムは僕が構築したものだけど、一から作ったわけじゃなくて前の現場のやり方をほぼそのまま引き継いでます。

以上がざっと僕の感想です。もっと詳しい話はTreasure Data Tech Talk 〜クラウドサービスを支える技術〜|IT勉強会ならTECH PLAY[テックプレイ]で聞けるかも。