概要
- Python のマルチプロセス実装を試してみた。
- 更に、signal を取り入れて「Ctrl + C」や「kill」で安全に全プロセスを終了させるようにしてみた。
実験環境
- Linux (CentOS 7)
- Python 2.7
サンプルコード概要
メインプロセス
- 3つのサブプロセスを立ち上げ、その後シグナル待ち。
- シグナルを受信したら、終了フラグを投げてサブプロセス終了待ち。
- サブプロセス全部終わったら終了。
サブプロセス 1
- inc queue に数字を詰め込み、sleep。を繰り返す。
- 終了フラグ受信したら終了。
サブプロセス 2
- inc queue から数字を取得し、掛け算して mul queue に詰め込んで、sleep。を繰り返す。
- 終了フラグ受信したら終了。
サブプロセス 3
- mul queue からデータを取得して print し、sleep。を繰り返す。
- 終了フラグ受信したら終了。
実行結果
通常実行し、Ctrl + C で終了してみる。
$ python execute.py
Start subprocess 1...
Start subprocess 3...
Start subprocess 2...
Value : 2
Value : 4
Value : 6
Value : 8
Value : 10
Value : 12
Value : 14
( ここで Ctrl + C )
Stop subprocess 2.
Stop subprocess 3.
Stop subprocess 1.
Complete !!!
- sleep のタイミング等あり、終わる順番は一意でない。
nohup でバックグラウンド実行し、後で kill してみる。
$ nohup python execute.py $
( 別コンソール )
$ ps ax | grep python
17143 pts/0 S 0:00 python execute.py
17144 pts/0 Sl 0:00 python execute.py
17145 pts/0 Sl 0:00 python execute.py
17146 pts/0 S 0:00 python execute.py
17199 pts/1 S+ 0:00 grep --color=auto python
$ kill 17143
$ ps ax | grep python
17199 pts/1 S+ 0:00 grep --color=auto python
( => 正常にスクリプト終了した!! )
ソースコード (概要)
サブプロセス
- クラス化した。子プロセス処理は「run」メソッド。
- self も使える。
- run メソッド内で、シグナルを無効にしておく。(メインからのstop_flagで終了させるので。)
import time
class SubProcess(object) :
def __init__(self) :
pass
def run(self, queue, stop_flag) :
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
while True :
if stop_flag.is_set() :
break
time.sleep(0.01)
メインプロセス
from multiprocessing import Process, Queue, Event
if __name__ == '__main__' :
queue = Queue()
stop_flag = Event()
sub = SubProcess()
sub_process = Process(target = sub.run, args = (queue, stop_flag))
- サブプロセス実行してシグナル受信待ち。シグナル受信したら stop_flag 設定。
processes = [sub_process]
for p in processes :
p.start()
def signalHandler(signal, handler) :
stop_flag.set()
signal.signal(signal.SIGINT, signalHandler)
signal.signal(signal.SIGTERM, signalHandler)
signal.pause()
for p in processes :
p.join()
for p in processes :
p.terminate()
ソースコード全文
"""
Multi process サンプル
複数のプロセスを立ち上げ、Ctrl + C や kill で安全に終了するサンプル。
プロセス 1
inc_q に 数字を詰め込み、sleep する。
プロセス 2
inc_q からデータを取得し、掛け算して mul_q に詰め込む。sleep する。
プロセス 3
mul_q からデータを取得して、print する。
"""
import signal
import time
from multiprocessing import Process, Queue, Event
SLEEP_SEC = 1
class SubProcess1(object) :
def __init__(self) :
self.first = 0
self.step = 1
def run(self, inc_q, stop_flag) :
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
print "Start subprocess 1..."
count = 0
while True :
if stop_flag.is_set() :
break
count += 1
inc_q.put(self.first + self.step * count)
time.sleep(SLEEP_SEC)
print "Stop subprocess 1."
class SubProcess2(object) :
def __init__(self) :
self.coef = 2
def run(self, inc_q, mul_q, stop_flag) :
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
print "Start subprocess 2..."
while True :
if stop_flag.is_set() :
break
if not inc_q.empty() :
value = inc_q.get()
mul_q.put(value * self.coef)
time.sleep(SLEEP_SEC)
print "Stop subprocess 2."
class SubProcess3(object) :
def __init__(self) :
self.temp = "Value : {0}"
def run(self, mul_q, stop_flag) :
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
print "Start subprocess 3..."
while True :
if stop_flag.is_set() :
break
if not mul_q.empty() :
value = mul_q.get()
print self.temp.format(value)
time.sleep(SLEEP_SEC)
print "Stop subprocess 3."
if __name__ == '__main__' :
inc_q = Queue()
mul_q = Queue()
stop_flag = Event()
sub1 = SubProcess1()
sub2 = SubProcess2()
sub3 = SubProcess3()
sub1_process = Process(target = sub1.run, args = (inc_q, stop_flag))
sub2_process = Process(target = sub2.run, args = (inc_q, mul_q, stop_flag))
sub3_process = Process(target = sub3.run, args = (mul_q, stop_flag))
processes = [sub1_process, sub2_process, sub3_process]
for p in processes :
p.start()
def signalHandler(signal, handler) :
stop_flag.set()
signal.signal(signal.SIGINT, signalHandler)
signal.signal(signal.SIGTERM, signalHandler)
while True :
alive_flag = False
for p in processes :
if p.is_alive() :
alive_flag = True
break
if alive_flag :
time.sleep(0.1)
continue
break
"""
# (2016/02/04)
# 修正。join して terminate とかいれると終わらないことが orz...
signal.pause()
### Wait subprocess join
for p in processes :
p.join()
for p in processes :
p.terminate()
"""
print "Complete !!!"
- (2016/02/04) 修正。サブプロセスの待ちを「pause」を使わず、ずっと監視するようにした。
- (join → terminate してたせい?) たまに終わらないことがあったり。
- 無限ループじゃない仕組みの場合、pause 入っていると Ctrl + C を入れないと終わらなかったり。。。