Python で Multi process (して、更に Signal で安全に終了させる)
概要
- Python のマルチプロセス実装を試してみた。
- 更に、signal を取り入れて「Ctrl + C」や「kill」で安全に全プロセスを終了させるようにしてみた。
実験環境
- Linux (CentOS 7)
- Python 2.7
サンプルコード概要
メインプロセス
- 3つのサブプロセスを立ち上げ、その後シグナル待ち。
- シグナルを受信したら、終了フラグを投げてサブプロセス終了待ち。
- サブプロセス全部終わったら終了。
サブプロセス 1
- inc queue に数字を詰め込み、sleep。を繰り返す。
- 終了フラグ受信したら終了。
サブプロセス 2
- inc queue から数字を取得し、掛け算して mul queue に詰め込んで、sleep。を繰り返す。
- 終了フラグ受信したら終了。
サブプロセス 3
- mul queue からデータを取得して print し、sleep。を繰り返す。
- 終了フラグ受信したら終了。
実行結果
通常実行し、Ctrl + C で終了してみる。
$ python execute.py Start subprocess 1... Start subprocess 3... Start subprocess 2... Value : 2 Value : 4 Value : 6 Value : 8 Value : 10 Value : 12 Value : 14 ( ここで Ctrl + C ) Stop subprocess 2. Stop subprocess 3. Stop subprocess 1. Complete !!!
- sleep のタイミング等あり、終わる順番は一意でない。
nohup でバックグラウンド実行し、後で kill してみる。
$ nohup python execute.py $ ( 別コンソール ) $ ps ax | grep python 17143 pts/0 S 0:00 python execute.py 17144 pts/0 Sl 0:00 python execute.py 17145 pts/0 Sl 0:00 python execute.py 17146 pts/0 S 0:00 python execute.py 17199 pts/1 S+ 0:00 grep --color=auto python $ kill 17143 $ ps ax | grep python 17199 pts/1 S+ 0:00 grep --color=auto python ( => 正常にスクリプト終了した!! )
ソースコード (概要)
サブプロセス
- クラス化した。子プロセス処理は「run」メソッド。
- self も使える。
- run メソッド内で、シグナルを無効にしておく。(メインからのstop_flagで終了させるので。)
import time class SubProcess(object) : def __init__(self) : # 初期化 pass def run(self, queue, stop_flag) : signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGTERM, signal.SIG_IGN) # 子プロセス処理、ループ回す。 while True : if stop_flag.is_set() : break time.sleep(0.01)
メインプロセス
- Queue、Event 定義、サブプロセス定義。
from multiprocessing import Process, Queue, Event if __name__ == '__main__' : queue = Queue() stop_flag = Event() sub = SubProcess() sub_process = Process(target = sub.run, args = (queue, stop_flag))
- サブプロセス実行してシグナル受信待ち。シグナル受信したら stop_flag 設定。
processes = [sub_process] for p in processes : p.start() def signalHandler(signal, handler) : stop_flag.set() signal.signal(signal.SIGINT, signalHandler) signal.signal(signal.SIGTERM, signalHandler) signal.pause()
- シグナル受信後のサブプロセス終了待ち。
for p in processes : p.join() for p in processes : p.terminate()
ソースコード全文
# -*- coding: utf-8 -*- """ Multi process サンプル 複数のプロセスを立ち上げ、Ctrl + C や kill で安全に終了するサンプル。 プロセス 1 inc_q に 数字を詰め込み、sleep する。 プロセス 2 inc_q からデータを取得し、掛け算して mul_q に詰め込む。sleep する。 プロセス 3 mul_q からデータを取得して、print する。 """ import signal import time from multiprocessing import Process, Queue, Event ############################## ### Settings ### ############################## # 休息時間 (秒) SLEEP_SEC = 1 ############################## ### Sub processes ### ############################## class SubProcess1(object) : def __init__(self) : self.first = 0 self.step = 1 def run(self, inc_q, stop_flag) : ### Signal disable signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGTERM, signal.SIG_IGN) print "Start subprocess 1..." count = 0 while True : if stop_flag.is_set() : break count += 1 inc_q.put(self.first + self.step * count) time.sleep(SLEEP_SEC) print "Stop subprocess 1." class SubProcess2(object) : def __init__(self) : self.coef = 2 def run(self, inc_q, mul_q, stop_flag) : ### Signal disable signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGTERM, signal.SIG_IGN) print "Start subprocess 2..." while True : if stop_flag.is_set() : break if not inc_q.empty() : value = inc_q.get() mul_q.put(value * self.coef) time.sleep(SLEEP_SEC) print "Stop subprocess 2." class SubProcess3(object) : def __init__(self) : self.temp = "Value : {0}" def run(self, mul_q, stop_flag) : ### Signal disable signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGTERM, signal.SIG_IGN) print "Start subprocess 3..." while True : if stop_flag.is_set() : break if not mul_q.empty() : value = mul_q.get() print self.temp.format(value) time.sleep(SLEEP_SEC) print "Stop subprocess 3." ############################## ### Main ### ############################## if __name__ == '__main__' : ### Queue inc_q = Queue() mul_q = Queue() ### Event stop_flag = Event() ### Sub processes sub1 = SubProcess1() sub2 = SubProcess2() sub3 = SubProcess3() sub1_process = Process(target = sub1.run, args = (inc_q, stop_flag)) sub2_process = Process(target = sub2.run, args = (inc_q, mul_q, stop_flag)) sub3_process = Process(target = sub3.run, args = (mul_q, stop_flag)) ### Start sub processes processes = [sub1_process, sub2_process, sub3_process] for p in processes : p.start() ### Signal settings def signalHandler(signal, handler) : stop_flag.set() signal.signal(signal.SIGINT, signalHandler) signal.signal(signal.SIGTERM, signalHandler) ### Wait subprocess stop # (2016/02/04) 追加。こっちで子プロセス終了待ちした方が賢明な気がしてる。 while True : alive_flag = False for p in processes : if p.is_alive() : alive_flag = True break if alive_flag : time.sleep(0.1) continue break """ # (2016/02/04) # 修正。join して terminate とかいれると終わらないことが orz... signal.pause() ### Wait subprocess join for p in processes : p.join() for p in processes : p.terminate() """ print "Complete !!!"
- (2016/02/04) 修正。サブプロセスの待ちを「pause」を使わず、ずっと監視するようにした。
- (join → terminate してたせい?) たまに終わらないことがあったり。
- 無限ループじゃない仕組みの場合、pause 入っていると Ctrl + C を入れないと終わらなかったり。。。