Pythonで並列処理を行うのに基本となるモジュールがmultiprocessing。
並列処理はそれだけで他のモジュールの1000倍くらい(適当)必要な前提知識と特有の留意点があって深追いするとキリが無いため、 あまり深入りせずいくつかの話題を掻い摘まむ程度でまとめたい。
並列処理で困ることの1つが、複数のプロセスが同時に1つのログファイルに書き込もうとしたらレコードが消えたりファイルが壊れたりする問題。
解決策としては、
可能なら3番目がスマート。
multiprocessingでは、3番目を比較的簡単に行える仕組みが存在する。1)
まず、multiprocessingには、プロセス間通信で共有データを扱うための multiprocessing.Queue
というFIFO形式のデータ構造が存在する。
(特にログに用途を限定したものではない)
これをログの出力先にするハンドラが、QueueHandler。
ファイルを出力先にするFileHandler、コンソール等を出力先にするStreamHandlerと同じ感じ。
で、溜められたログを取り出して書き出すのがQueueListener。
単独スレッドとして立ち上げておき、外部への出力はこいつだけが行うように集約することで、プロセス間の交通整理ができる。
QueueHandler, QueueListenerは、インスタンス生成時に具体的なQueueインスタンスを指定してやる必要がある。
まぁListenerの方は親プロセスで指定してやれば子プロセスからどうこうする必要は無いのだが、
Handlerの方は子プロセスが扱うloggerに登録して使うので、当然プロセス間で同じQueueを指定しないといけない。
何らかの方法で共有する必要がある。
前提として、multiprocessingでは、WindowsとLinuxでプロセスの立ち上げ方が異なる。
WindowsとMacは spawn
で、Linuxは fork
で新規プロセスが立ち上がる。
この違いは、以下のサイトにさくっと説明されている。
ざっくりいうと、
特にグローバル変数の違いはたまにハマる。
適当なサイトでのサンプルコードをコピーしても、それがLinux用に書かれたもので、 グローバル変数の共有を前提としていた場合、Windowsでは動かなかったりする。
その上でQueueHandlerに話を戻すと、指定するQueueインスタンスに関して、
という違いが生じる。
別にLinuxでも2番目の方法で動くので、多少冗長にはなるが、Windowsでも動く方法で書いておくと保守性が高まる。
Python公式ドキュメントのサンプルは、Windowsでも動く書き方となっている。
ただ、いろいろできることを示すため一般的には不要な部分もあるサンプルとなっているため、よく使いそうな構成に絞ると、以下の感じだろうか。
# You'll need these imports in your own code import logging import logging.handlers import multiprocessing # Next import lines for this demo only from random import random import sys import time # サブプロセス def worker_process(i: int, log_queue: multiprocessing.Queue): # ログの取得・Queueの登録 logger = logging.getLogger() # loggerの取得 handler = logging.handlers.QueueHandler(log_queue) # QueueHandlerの生成 logger.addHandler(handler) # loggerへhandlerを登録 logger.setLevel(logging.DEBUG) # loggerへレベル設定(任意) name = multiprocessing.current_process().name logger.info(f'Worker started: {name}') for j in range(10): time.sleep(random()) logger.info(f'{name} {i} {j}') logger.info(f'Worker finished: {name}') def main(): log_queue = multiprocessing.Queue() # ハンドラの生成と設定 # ファイル mptest4.log と標準エラー stderr の2つへ出力する file_handler = logging.handlers.RotatingFileHandler('mptest4.log', 'a', 3000, 5) console_handler = logging.StreamHandler(sys.stderr) file_formatter = logging.Formatter('%(asctime)s %(processName)-10s %(levelname)-8s %(message)s') file_handler.setFormatter(file_formatter) console_formatter = logging.Formatter("%(processName)s %(message)s") console_handler.setFormatter(console_formatter) # QueueListenerを、QueueとHandlerを指定して生成 listener = logging.handlers.QueueListener(log_queue, console_handler, file_handler) # 待受の開始 listener.start() workers = [] for i in range(10): worker = multiprocessing.Process(target=worker_process, args=(i, log_queue)) workers.append(worker) worker.start() for w in workers: w.join() listener.stop() if __name__ == '__main__': main()
なお、log_queue
へ追加されていくログの実態は LogRecord クラスとなっているため、ハンドラを介さず自前でLogRecordを生成して log_queue
にputしても出力される(あまり意味は無い)。
はじめに処理させたい引数群をごそっと作っておいて、使うプロセス数上限を指定しつつPoolくんに丸投げすると、 1つの処理が終わり次第、残っている中から次の1つの処理を開始するよう制御してくれる便利なやつ。
ただ、PoolでQueueを引数に与えるとエラーになる。(これは特にQueueHandler用のQueueに限らないが)
def worker_process(i, log_queue): # 略 def main(): log_queue = multiprocessing.Queue() # ハンドラの生成、QueueListenerの設定、待受の開始 # 略 args = [(i, log_queue) for i in range(10)] with multiprocessing.Pool(10) as pool: pool.starmap(worker_process, args)
RuntimeError: Queue objects should only be shared between processes through inheritance
multiprocessing.Manager().Queue()
を使うとよい。
def worker_process(i, log_queue): # 略 def main(): log_queue = multiprocessing.Manager().Queue() # ハンドラの生成、QueueListenerの設定、待受の開始 # 略 args = [(i, log_queue) for i in range(10)] with multiprocessing.Pool(10) as pool: pool.starmap(worker_process, args)