雑食性雑感雑記

知識の整理場。ため込んだ知識をブログ記事として再構築します。

Python threading 処理で高速化

概要

  • Python のモジュール「Threading」を使って並列化処理を実装してみる。
  • 逐次処理版と速度比較してみる。

サンプル

サンプル概要

  • 2つの処理 A、B を順次動かす。
    • (処理A) 1行 ~ 10行のCSVファイルを作成。1行に (x, y, r)。
    • (処理B) CSVファイルを読み込み、(x, y, r) の円を行数分描いた画像を作成。

逐次処理版

  • 01.py
# -*- coding: utf-8 -*-

import csv
import Image
import ImageDraw
import os
import random as rnd
import sys


DEFAULT_TEXT_DIR  = "./texts"
DEFAULT_IMAGE_DIR = "./images"


##################################################
# Creator class                                  #
##################################################
class Creator :

    # Const.
    IMAGE_WIDTH  = 256
    IMAGE_HEIGHT = 256

    CIRCLES_NUM_MIN =  1
    CIRCLES_NUM_MAX = 10

    CIRCLES_RADIUS_MIN = 10
    CIRCLES_RADIUS_MAX = 50


    def __init__(self, text_dir = DEFAULT_TEXT_DIR, image_dir = DEFAULT_IMAGE_DIR) :
        self.text_dir  = text_dir
        self.image_dir = image_dir

        if not os.path.exists(text_dir) :
            os.mkdir(text_dir)

        if not os.path.exists(image_dir) :
            os.mkdir(image_dir)


    def create(self, count) :
        """
        count数だけデータ作成
        """

        ### テキスト作成
        filepaths = []
        for idx in xrange(count) :
            filepath = self._createText(idx + 1)
            filepaths.append(filepath)

        ### テキスト → 画像変換
        for filepath in filepaths :
            self._createImage(filepath)


    def _createText(self, text_id) :
        """
        画像情報テキスト作成
        作成完了後、テキストファイルパスを返す。
        """
        circle_count = rnd.randrange(self.CIRCLES_NUM_MIN, self.CIRCLES_NUM_MAX + 1)

        text  = ""
        count = 0
        while count < circle_count :

            x = rnd.randrange(0, self.IMAGE_WIDTH)
            y = rnd.randrange(0, self.IMAGE_HEIGHT)
            r = rnd.randrange(self.CIRCLES_RADIUS_MIN, self.CIRCLES_RADIUS_MAX)

            # はみ出たら作成し直し
            if x + r >= self.IMAGE_WIDTH or y + r >= self.IMAGE_HEIGHT :
                continue

            text  += "{0},{1},{2}\n".format(x, y, r)
            count += 1

        # 書き出し
        filepath = "{0}/{1}.txt".format(self.text_dir, text_id)
        f = open(filepath, 'w')
        f.writelines(text)
        f.close()

        return filepath


    def _createImage(self, filepath) :
        """
        画像情報ファイルを読み込み、画像ファイルを作成
        """
        basename  = os.path.basename(filepath)
        name, ext = os.path.splitext(basename)

        f      = open(filepath)
        reader = csv.reader(f, lineterminator = '\n')

        circles = []
        for row in reader :
            circles.append({'x': int(row[0]), 'y': int(row[1]), 'r': int(row[2])})
        f.close()

        ### 色決定
        background_color = _getRandomColor()
        foreground_color = _getRandomColor()
        while background_color == foreground_color :
            foreground_color = _getRandomColor()

        ### 画像作成
        img = Image.new('RGB', (self.IMAGE_WIDTH, self.IMAGE_HEIGHT), background_color)
        dr  = ImageDraw.Draw(img)

        for circle in circles :
            dr.ellipse(((circle['x'], circle['y']), (circle['x'] + circle['r'], circle['y'] + circle['r'])), outline = (0, 0, 0), fill = foreground_color)

        img.save("{0}/{1}.png".format(self.image_dir, name))


##################################################
# Utils                                          #
##################################################
def _getRandomColor() :
    return (rnd.randrange(0, 255), rnd.randrange(0, 255), rnd.randrange(0, 255))


##################################################
# Main                                           #
##################################################
if __name__ == '__main__' :

    argv = sys.argv
    argc = len(argv)

    if argc < 2 :
        print "Usage: python {0} <count>".format(argv[0])
        quit()

    count = int(argv[1])

    creator = Creator()
    creator.create(count)

並列処理版

  • 02.py (逐次処理版との差分のみ)
# Added
import Queue
import time
import threading

class Creator : 

    def __init__(self, text_dir = DEFAULT_TEXT_DIR, image_dir = DEFAULT_IMAGE_DIR) :
        self.text_dir  = text_dir
        self.image_dir = image_dir

        if not os.path.exists(text_dir) :
            os.mkdir(text_dir)

        if not os.path.exists(image_dir) :
            os.mkdir(image_dir)

        # Added
        self.shutdown = threading.Event()


    def create(self, count) :
        """
        count数だけデータ作成
        """

        ### Threads settings
        thread_count = 10

        ### 処理用Queue
        self.create_text_queue  = Queue.Queue()
        self.create_image_queue = Queue.Queue()

        ### 制御用 Flag
        self.create_text_flag  = threading.Event()
        self.create_image_flag = threading.Event()

        ### 結果取得用Queue
        self.create_text_result  = Queue.Queue()
        self.create_image_result = Queue.Queue()

        ### 処理用Queue にID追加
        for idx in xrange(count) :
            self.create_text_queue.put(idx + 1)

        ### Thread 開始
        for i in xrange(thread_count) :
            pt = threading.Thread(target=self._createText_t)
            pt.daemon = True
            pt.start()

            pi = threading.Thread(target=self._createImage_t)
            pi.daemon = True
            pi.start()

        # NOTE: フラグを立てることで、Queue が空になった時点で Thread 終了。
        self.create_text_flag.set()

        ### Thread 終了待ち
        wait_time = time.time()
        create_text_threads_done = 0
        create_image_threads_done = 0
        while create_image_threads_done < thread_count :

            if self.shutdown.is_set() :
                return False

            if time.time() - wait_time > 2 :
                print "Processed {0}/{1}".format(count - self.create_text_queue.qsize(), count)
                wait_time = time.time()

            ### Text 作成終わったら Image作成処理 Thread フラグ立てる
            ### → Queue が空になったら Thread 終了
            if not self.create_image_flag.is_set() and create_text_threads_done == thread_count :
                self.create_image_flag.set()

            ### Text 作成結果 Queue 格納時 ( = 1 thread 完了 )
            while not self.create_text_result.empty() :
                process_count = self.create_text_result.get()
                create_text_threads_done += 1

            ### Write 作成結果 Queue 格納時 ( = 1 thread 完了 )
            while not self.create_image_result.empty() :
                process_count = self.create_image_result.get()
                create_image_threads_done += 1

            try :
                time.sleep(0.2)
            except KeyboardInterrupt :
                self.shutdown.set()

        self.shutdown.set()
        return True


    def _createText_t(self) :

        process_count = 0
        while not self.create_text_flag.is_set() or not self.create_text_queue.empty() :
            if self.shutdown.is_set() :
                return

            text_id = None
            try :
                text_id = self.create_text_queue.get(True, 0.05)
            except Queue.Empty :
                continue

            ### Create text file
            filepath = self._createText(text_id)

            self.create_image_queue.put(filepath)
            process_count += 1

        self.create_text_result.put(process_count)
        return True


    def _createImage_t(self) :

        process_count = 0
        while not self.create_image_flag.is_set() or not self.create_image_queue.empty() :
            if self.shutdown.is_set() :
                return

            filepath = None
            try :
                filepath = self.create_image_queue.get(True, 0.05)
            except Queue.Empty :
                continue

            ### Create image
            self._createImage(filepath)

            process_count += 1

        self.create_image_result.put(process_count)
        return True

速度比較

  • 100枚、1,000枚、10,000枚、100,000枚で実行
$ time python 01.py 100
    real    0m0.260s
    user    0m0.243s
    sys     0m0.016s

$ time python 01.py 1000
    real    0m2.410s
    user    0m2.312s
    sys     0m0.099s

$ time python 01.py 10000
    real    0m24.124s
    user    0m23.176s
    sys     0m0.935s

$ time python 01.py 100000
    real    4m2.286s
    user    3m51.366s
    sys     0m8.095s

$ time python 02.py 100
    real    0m0.835s
    user    0m0.452s
    sys     0m0.048s

$ time python 02.py 1000
    real    0m1.455s
    user    0m4.338s
    sys     0m0.320s

$ time python 02.py 10000
    real    0m11.129s
    user    0m44.010s
    sys     0m3.043s

$ time python 02.py 100000
    real    1m46.127s
    user    7m11.919s
    sys     0m28.289s

  • real time だけグラフ化すると…

f:id:kazuki_nagasawa:20150502005523p:plain

備考

  • NVIDIA DiGiTS のソースコードを読んでいて、データインポート処理にThreading使っていたので、参考にしました。

所感

  • 簡単なマルチスレッド処理なら Pool を使う方が良いかも。
  • 細かなスレッド処理を構築するなら、こっちですね。もっと工夫して色々できそう。