目次

Luigi

世界で2番目に有名な配管工の名を持つ、「バッチジョブパイプライン管理フレームワーク」

つまり、ある大きな処理をするのに複数の小さなタスクがあり、タスクBをするためにはタスクAの出力が必要となり、タスクCをするにはタスクB,D,Eの出力が必要となり、けどタスクDの結果がこうだったら代わりにタスクFを実行し……というのを、

などをパパッと行ってくれる(?)。

Windowsのバッチファイルやタスクスケジューラ、Linuxのcronを、より設定を容易に、多機能にした感じと捉えればいいのかな。

詳細はまた調べる。

類似のフレームワーク

データパイプライン管理ツールは、まだまだ決定版がない、というか状況によって適するのが変わる。

なので、いくつかのフレームワークが存在する。(自分で調べてないのでこの辺の特徴の捉え方は間違ってるかも)

(追い切れないほど様々なフレームワークが現れては消えつつつトレンドが移り変わっていく様は、Web開発周りと似た空気を感じなくもない)

一応、Luigi特有のメリット・デメリットは以下が言われている。

最後について、もちろん、オンメモリで全て処理すると1度のエラーや記述ミスで吹っ飛ぶので、要所要所で中間ファイルを残すのは大事。だが、せっかくタスク管理を使うのに、タスクの内容でなく、中間ファイルの容量を気にして分割しなきゃいけないのは本末転倒感がある。必要に応じてオンメモリでの受け渡しも出来るとこの問題は解消される。

一応、オンメモリの受け渡しもluigi.mockでできるらしいが、とても遅いらしい。luigiのon-memoryでのデータ受け渡しの非効率さについて - Qiita

使い方

基本的な使い方

スクリプトの書き方

「Task」が最も基本の1単位のタスク。Taskは内部に「requires」「output」「run」の3つのメソッドを持つ。

こんな感じ

import luigi
from somewhere import PreliminaryTask1, PreliminaryTask2

class FinallyWantedTask(luigi.Task):
    def requires(self):
        return [PreliminaryTask1(), PreliminaryTask2()]

        # (または、以下の書き方でもよい)

        yield PreliminaryTask1()
        yield PreliminaryTask2()
    
    def output(self):
        return luigi.Target('FinallyResult.csv')
    
    def run(self):
        with self.input()[0].open() as tin1:
            # PreliminaryTask1の結果の読込
        with self.input()[1].open() as tin2:
            # PreliminaryTask2の結果の読込
        # 何か処理
        with self.output().open('w') as tout:
            # 結果の出力

if __name__ == '__main__':
    luigi.run()

実行の仕方

コマンドプロンプトからの実行時に、第1引数に「最終的に結果が欲しいタスク名」を指定する。--local-schedulerはおまじない(後述)。

> python luigi_run.py FinallyWantedTask --local-scheduler

もしくは、スクリプト内でluigi.run()に引数を与える。開発段階ではこっちの方が楽かな。

if __name__ == '__main__':

    luigi.run(main_task_cls=FinallyWantedTask, local_scheduler=True)
    

処理の流れ

各タスクは、以下の要領で処理を行う

既存のoutputがあるかどうかは、デフォルトではファイル名で照合される。与えるパラメータが異なっても同名ファイルがあれば存在すると認識されるため、パラメータを変えた結果を両方保持する場合は異なる名称で保存するか、照合処理をオーバーライドする必要がある。

2つのスケジューラ

Local SchedulerとCentral Schedulerがある。

時間のかかる処理をいくつも同時に起動する場合、中央の司令塔のような存在がいると、以下のことが出来る。

これがCentral Schedulerであり、あらかじめサーバを起動しておき、各タスクは実行時にまずそのサーバにお伺いを立てる。主に開発済みの処理フローをがんがん複数回していく時に有用。

しかし、開発中だし同時に実行する人も自分しかいないよ、なんて場合にはちょっと大げさで面倒。そんな時はLocal Schedulerを使えば他との連携を考慮しない自身の処理だけのスケジュール管理が行われるようになる。

調整した使い方

デフォルト設定では少しずつ感じる不便を解消していく。

既存の処理済みファイルの確認で、ファイル名だけでなく更新日時を見る

FIXME

依存タスクを柔軟に変更できるようにする

依存タスクをパラメータとして持つと、依存タスクが変わっても別のクラスを作る必要なく、外部から注入できる。

以下の例。TaskBは通常はTaskA1に依存するが、たまにTaskA2に依存して実行させたいとき、パラメータとして持って生成時に指定できるようにしておく方法がある。

class TaskA1(luigi.Task):
    pass

class TaskA2(luigi.Task):
    pass

class TaskB(luigi.Task):
    required_tasks = luigi.Parameter(default=TaskA1)
    required_param = luigi.Parameter(default=dict(param1 = True))
    
    def requires(self):
        yield self.require_tasks(**required_param)

class All(luigi.WrapperTask):
    def requires(self):
        TaskB(required_tasks = TaskA2, required_param = dict(param2 = True))

所感

少し使って見たが、自分の場合、試行錯誤してデータ処理の方法を固める段階なので、あまり合わないかなあ、という印象。

具体的に仕様が決まってる内容を流すのはいいかも知れないけどね。

大まかに言うと、今の技術ではノイズ混じりにしか取得できないデータだけど、上手く処理を施して使えるレベルまで持っていきたい。そのためにどういった処理がいいか、試行錯誤して実証していきましょう、という段階なので、新しい試行的な処理フローの発生が多い。

この場合、試行のたびに毎回頭から処理を流すのは悲しくなるので、適当に中間ファイルを作るのだが、 かなり意識的に管理しないと、「どこまでの処理を」「どのアルゴリズム・パラメータで」実行した中間ファイルか、割とすぐにごっちゃになる。

これをすっきり整理できる一助となることをLuigiに期待していた。

A→B→Cという処理フローの、Bを変更してDにする、ということが頻繁に起こるが、LuigiではCの実行によってCの中間ファイルができあがると、その後依存先がBからDに変わろうが、気にせず過去のCの中間ファイルを使おうとする。もちろん作り込めば、依存先ファイルの生成日時比較等でいけるのだが、フロー毎にわざわざそれを作り込む必要があるのなら、使わなくても一緒かなあ。

もうちょっと自動でいろいろやってくれたらありがたかったが、今回の作業では見送り。