読者です 読者をやめる 読者になる 読者になる

雑食性雑感雑記

知識の整理場。ため込んだ知識をブログ記事として再構築します。

Python で Multi process (して、更に Signal で安全に終了させる)

概要

  • Python のマルチプロセス実装を試してみた。
  • 更に、signal を取り入れて「Ctrl + C」や「kill」で安全に全プロセスを終了させるようにしてみた。

実験環境

  • Linux (CentOS 7)
  • Python 2.7

サンプルコード概要

メインプロセス

  • 3つのサブプロセスを立ち上げ、その後シグナル待ち。
  • シグナルを受信したら、終了フラグを投げてサブプロセス終了待ち。
  • サブプロセス全部終わったら終了。

サブプロセス 1

  • inc queue に数字を詰め込み、sleep。を繰り返す。
  • 終了フラグ受信したら終了。

サブプロセス 2

  • inc queue から数字を取得し、掛け算して mul queue に詰め込んで、sleep。を繰り返す。
  • 終了フラグ受信したら終了。

サブプロセス 3

  • mul queue からデータを取得して print し、sleep。を繰り返す。
  • 終了フラグ受信したら終了。

実行結果

通常実行し、Ctrl + C で終了してみる。

$ python execute.py
Start subprocess 1...
Start subprocess 3...
Start subprocess 2...
Value : 2
Value : 4
Value : 6
Value : 8
Value : 10
Value : 12
Value : 14
( ここで Ctrl + C )
Stop subprocess 2.
Stop subprocess 3.
Stop subprocess 1.
Complete !!!
  • sleep のタイミング等あり、終わる順番は一意でない。

nohup でバックグラウンド実行し、後で kill してみる。

$ nohup python execute.py $

( 別コンソール )
$ ps ax | grep python
17143 pts/0    S      0:00 python execute.py
17144 pts/0    Sl     0:00 python execute.py
17145 pts/0    Sl     0:00 python execute.py
17146 pts/0    S      0:00 python execute.py
17199 pts/1    S+     0:00 grep --color=auto python

$ kill 17143

$ ps ax | grep python
17199 pts/1    S+     0:00 grep --color=auto python
( => 正常にスクリプト終了した!! )

ソースコード (概要)

サブプロセス

  • クラス化した。子プロセス処理は「run」メソッド。
  • self も使える。
  • run メソッド内で、シグナルを無効にしておく。(メインからのstop_flagで終了させるので。)
import time

class SubProcess(object) :

    def __init__(self) :
        # 初期化
        pass

    def run(self, queue, stop_flag) :
        signal.signal(signal.SIGINT,  signal.SIG_IGN)
        signal.signal(signal.SIGTERM, signal.SIG_IGN)

        # 子プロセス処理、ループ回す。
        while True :
            if stop_flag.is_set() :
                break
            time.sleep(0.01)

メインプロセス

  • Queue、Event 定義、サブプロセス定義。
from multiprocessing import Process, Queue, Event

if __name__ == '__main__' :

    queue     = Queue()
    stop_flag = Event()

    sub = SubProcess()
    sub_process = Process(target = sub.run, args = (queue, stop_flag))
  • サブプロセス実行してシグナル受信待ち。シグナル受信したら stop_flag 設定。
    processes = [sub_process]
    for p in processes :
        p.start()

    def signalHandler(signal, handler) :
        stop_flag.set()

    signal.signal(signal.SIGINT,  signalHandler)
    signal.signal(signal.SIGTERM, signalHandler)
    signal.pause()
  • シグナル受信後のサブプロセス終了待ち。
    for p in processes :
        p.join()
    for p in processes :
        p.terminate()

ソースコード全文

# -*- coding: utf-8 -*-

"""
Multi process サンプル
複数のプロセスを立ち上げ、Ctrl + C や kill で安全に終了するサンプル。

プロセス 1
    inc_q に 数字を詰め込み、sleep する。

プロセス 2
    inc_q からデータを取得し、掛け算して mul_q に詰め込む。sleep する。

プロセス 3
    mul_q からデータを取得して、print する。
"""

import signal
import time

from multiprocessing import Process, Queue, Event


##############################
### Settings               ###
##############################

# 休息時間 (秒)
SLEEP_SEC = 1


##############################
### Sub processes          ###
##############################
class SubProcess1(object) :

    def __init__(self) :
        self.first = 0
        self.step  = 1

    def run(self, inc_q, stop_flag) :

        ### Signal disable
        signal.signal(signal.SIGINT,  signal.SIG_IGN)
        signal.signal(signal.SIGTERM, signal.SIG_IGN)

        print "Start subprocess 1..."

        count = 0
        while True :
            if stop_flag.is_set() :
                break

            count += 1
            inc_q.put(self.first + self.step * count)

            time.sleep(SLEEP_SEC)

        print "Stop subprocess 1."


class SubProcess2(object) :

    def __init__(self) :
        self.coef = 2

    def run(self, inc_q, mul_q, stop_flag) :

        ### Signal disable
        signal.signal(signal.SIGINT,  signal.SIG_IGN)
        signal.signal(signal.SIGTERM, signal.SIG_IGN)

        print "Start subprocess 2..."

        while True :
            if stop_flag.is_set() :
                break

            if not inc_q.empty() :
                value = inc_q.get()
                mul_q.put(value * self.coef)

            time.sleep(SLEEP_SEC)

        print "Stop subprocess 2."


class SubProcess3(object) :

    def __init__(self) :
        self.temp = "Value : {0}"

    def run(self, mul_q, stop_flag) :

        ### Signal disable
        signal.signal(signal.SIGINT,  signal.SIG_IGN)
        signal.signal(signal.SIGTERM, signal.SIG_IGN)

        print "Start subprocess 3..."

        while True :
            if stop_flag.is_set() :
                break

            if not mul_q.empty() :
                value = mul_q.get()
                print self.temp.format(value)

            time.sleep(SLEEP_SEC)

        print "Stop subprocess 3."


##############################
### Main                   ###
##############################
if __name__ == '__main__' :

    ### Queue
    inc_q = Queue()
    mul_q = Queue()

    ### Event
    stop_flag = Event()


    ### Sub processes
    sub1 = SubProcess1()
    sub2 = SubProcess2()
    sub3 = SubProcess3()

    sub1_process = Process(target = sub1.run, args = (inc_q, stop_flag))
    sub2_process = Process(target = sub2.run, args = (inc_q, mul_q, stop_flag))
    sub3_process = Process(target = sub3.run, args = (mul_q, stop_flag))


    ### Start sub processes
    processes = [sub1_process, sub2_process, sub3_process]
    for p in processes :
        p.start()


    ### Signal settings
    def signalHandler(signal, handler) :
        stop_flag.set()

    signal.signal(signal.SIGINT,  signalHandler)
    signal.signal(signal.SIGTERM, signalHandler)

    ### Wait subprocess stop
    # (2016/02/04) 追加。こっちで子プロセス終了待ちした方が賢明な気がしてる。
    while True :
        alive_flag = False
        for p in processes :
            if p.is_alive() :
                alive_flag = True
                break
        if alive_flag :
            time.sleep(0.1)
            continue
        break

    """
    # (2016/02/04)
    # 修正。join して terminate とかいれると終わらないことが orz...
    signal.pause()


    ### Wait subprocess join
    for p in processes :
        p.join()
    for p in processes :
        p.terminate()
    """

    print "Complete !!!"

  • (2016/02/04) 修正。サブプロセスの待ちを「pause」を使わず、ずっと監視するようにした。
    • (join → terminate してたせい?) たまに終わらないことがあったり。
    • 無限ループじゃない仕組みの場合、pause 入っていると Ctrl + C を入れないと終わらなかったり。。。