目次
QueueHandlerを用いた並列処理ログ
Pythonで並列処理を行うのに基本となるモジュールがmultiprocessing。
並列処理はそれだけで他のモジュールの1000倍くらい(適当)必要な前提知識と特有の留意点があって深追いするとキリが無いため、 あまり深入りせずいくつかの話題を掻い摘まむ程度でまとめたい。
- multiprocessingについてまとまっている日本語記事
前提環境
- Python3.9.9
- Windows 10
並列処理でのログ
並列処理で困ることの1つが、複数のプロセスが同時に1つのログファイルに書き込もうとしたらレコードが消えたりファイルが壊れたりする問題。
解決策としては、
- プロセスごとにファイルを分ける
- ファイルが分散してしまうけど、被らない命名規則を作れるなら手っ取り早い
- ログファイル出力時に排他処理する
- ログの頻度が低い&プロセス数の高が知れているなら1つの方法ではある
- プロセスが多いと書き込み待ちによる速度低下が心配
- ログ出力専用のスレッドを立ち上げ、他のプロセスからのログは全部そこに送る
- ログを1つに集約できるし、速度低下も抑えられる
- プロセス間通信の仕組み作りがややこしい
可能なら3番目がスマート。
multiprocessingでは、3番目を比較的簡単に行える仕組みが存在する。1)
まず、multiprocessingには、プロセス間通信で共有データを扱うための multiprocessing.Queue
というFIFO形式のデータ構造が存在する。
(特にログに用途を限定したものではない)
これをログの出力先にするハンドラが、QueueHandler。
ファイルを出力先にするFileHandler、コンソール等を出力先にするStreamHandlerと同じ感じ。
で、溜められたログを取り出して書き出すのがQueueListener。
単独スレッドとして立ち上げておき、外部への出力はこいつだけが行うように集約することで、プロセス間の交通整理ができる。
Queueの共有
QueueHandler, QueueListenerは、インスタンス生成時に具体的なQueueインスタンスを指定してやる必要がある。
まぁListenerの方は親プロセスで指定してやれば子プロセスからどうこうする必要は無いのだが、
Handlerの方は子プロセスが扱うloggerに登録して使うので、当然プロセス間で同じQueueを指定しないといけない。
何らかの方法で共有する必要がある。
OSによる新規プロセスの立ち上げ方の違い
前提として、multiprocessingでは、WindowsとLinuxでプロセスの立ち上げ方が異なる。
WindowsとMacは spawn
で、Linuxは fork
で新規プロセスが立ち上がる。
この違いは、以下のサイトにさくっと説明されている。
ざっくりいうと、
- spawn
- 立ち上げに必要最低限の情報だけ渡して、新規プロセスを立ち上げる
- メモリは食わないんだけど、立ち上げが遅い
- グローバル変数は共有されない
- fork
- 親プロセスをコピーする
- メモリは食うけど、立ち上げは速い
- グローバル変数も共有される
特にグローバル変数の違いはたまにハマる。
適当なサイトでのサンプルコードをコピーしても、それがLinux用に書かれたもので、 グローバル変数の共有を前提としていた場合、Windowsでは動かなかったりする。
QueueHandlerにおけるQueueの共有
その上でQueueHandlerに話を戻すと、指定するQueueインスタンスに関して、
- Linuxだったら一度親プロセスで登録してやれば、暗黙的に共有される?(挙動未確認)
- Windowsなら、引数を介してQueueを明示的に共有、サブプロセス内部で改めてQueueHandlerを生成、loggerに登録する必要がある
という違いが生じる。
別にLinuxでも2番目の方法で動くので、多少冗長にはなるが、Windowsでも動く方法で書いておくと保守性が高まる。
Python公式ドキュメントのサンプルは、Windowsでも動く書き方となっている。
ただ、いろいろできることを示すため一般的には不要な部分もあるサンプルとなっているため、よく使いそうな構成に絞ると、以下の感じだろうか。
# You'll need these imports in your own code import logging import logging.handlers import multiprocessing # Next import lines for this demo only from random import random import sys import time # サブプロセス def worker_process(i: int, log_queue: multiprocessing.Queue): # ログの取得・Queueの登録 logger = logging.getLogger() # loggerの取得 handler = logging.handlers.QueueHandler(log_queue) # QueueHandlerの生成 logger.addHandler(handler) # loggerへhandlerを登録 logger.setLevel(logging.DEBUG) # loggerへレベル設定(任意) name = multiprocessing.current_process().name logger.info(f'Worker started: {name}') for j in range(10): time.sleep(random()) logger.info(f'{name} {i} {j}') logger.info(f'Worker finished: {name}') def main(): log_queue = multiprocessing.Queue() # ハンドラの生成と設定 # ファイル mptest4.log と標準エラー stderr の2つへ出力する file_handler = logging.handlers.RotatingFileHandler('mptest4.log', 'a', 3000, 5) console_handler = logging.StreamHandler(sys.stderr) file_formatter = logging.Formatter('%(asctime)s %(processName)-10s %(levelname)-8s %(message)s') file_handler.setFormatter(file_formatter) console_formatter = logging.Formatter("%(processName)s %(message)s") console_handler.setFormatter(console_formatter) # QueueListenerを、QueueとHandlerを指定して生成 listener = logging.handlers.QueueListener(log_queue, console_handler, file_handler) # 待受の開始 listener.start() workers = [] for i in range(10): worker = multiprocessing.Process(target=worker_process, args=(i, log_queue)) workers.append(worker) worker.start() for w in workers: w.join() listener.stop() if __name__ == '__main__': main()
なお、log_queue
へ追加されていくログの実態は LogRecord クラスとなっているため、ハンドラを介さず自前でLogRecordを生成して log_queue
にputしても出力される(あまり意味は無い)。
その他
Poolで使う
はじめに処理させたい引数群をごそっと作っておいて、使うプロセス数上限を指定しつつPoolくんに丸投げすると、 1つの処理が終わり次第、残っている中から次の1つの処理を開始するよう制御してくれる便利なやつ。
ただ、PoolでQueueを引数に与えるとエラーになる。(これは特にQueueHandler用のQueueに限らないが)
def worker_process(i, log_queue): # 略 def main(): log_queue = multiprocessing.Queue() # ハンドラの生成、QueueListenerの設定、待受の開始 # 略 args = [(i, log_queue) for i in range(10)] with multiprocessing.Pool(10) as pool: pool.starmap(worker_process, args)
RuntimeError: Queue objects should only be shared between processes through inheritance
multiprocessing.Manager().Queue()
を使うとよい。
def worker_process(i, log_queue): # 略 def main(): log_queue = multiprocessing.Manager().Queue() # ハンドラの生成、QueueListenerの設定、待受の開始 # 略 args = [(i, log_queue) for i in range(10)] with multiprocessing.Pool(10) as pool: pool.starmap(worker_process, args)