Python threading 処理で高速化
概要
- Python のモジュール「Threading」を使って並列化処理を実装してみる。
- 逐次処理版と速度比較してみる。
情報
- Python threading
サンプル
サンプル概要
- 2つの処理 A、B を順次動かす。
- (処理A) 1行 ~ 10行のCSVファイルを作成。1行に (x, y, r)。
- (処理B) CSVファイルを読み込み、(x, y, r) の円を行数分描いた画像を作成。
逐次処理版
- 01.py
# -*- coding: utf-8 -*- import csv import Image import ImageDraw import os import random as rnd import sys DEFAULT_TEXT_DIR = "./texts" DEFAULT_IMAGE_DIR = "./images" ################################################## # Creator class # ################################################## class Creator : # Const. IMAGE_WIDTH = 256 IMAGE_HEIGHT = 256 CIRCLES_NUM_MIN = 1 CIRCLES_NUM_MAX = 10 CIRCLES_RADIUS_MIN = 10 CIRCLES_RADIUS_MAX = 50 def __init__(self, text_dir = DEFAULT_TEXT_DIR, image_dir = DEFAULT_IMAGE_DIR) : self.text_dir = text_dir self.image_dir = image_dir if not os.path.exists(text_dir) : os.mkdir(text_dir) if not os.path.exists(image_dir) : os.mkdir(image_dir) def create(self, count) : """ count数だけデータ作成 """ ### テキスト作成 filepaths = [] for idx in xrange(count) : filepath = self._createText(idx + 1) filepaths.append(filepath) ### テキスト → 画像変換 for filepath in filepaths : self._createImage(filepath) def _createText(self, text_id) : """ 画像情報テキスト作成 作成完了後、テキストファイルパスを返す。 """ circle_count = rnd.randrange(self.CIRCLES_NUM_MIN, self.CIRCLES_NUM_MAX + 1) text = "" count = 0 while count < circle_count : x = rnd.randrange(0, self.IMAGE_WIDTH) y = rnd.randrange(0, self.IMAGE_HEIGHT) r = rnd.randrange(self.CIRCLES_RADIUS_MIN, self.CIRCLES_RADIUS_MAX) # はみ出たら作成し直し if x + r >= self.IMAGE_WIDTH or y + r >= self.IMAGE_HEIGHT : continue text += "{0},{1},{2}\n".format(x, y, r) count += 1 # 書き出し filepath = "{0}/{1}.txt".format(self.text_dir, text_id) f = open(filepath, 'w') f.writelines(text) f.close() return filepath def _createImage(self, filepath) : """ 画像情報ファイルを読み込み、画像ファイルを作成 """ basename = os.path.basename(filepath) name, ext = os.path.splitext(basename) f = open(filepath) reader = csv.reader(f, lineterminator = '\n') circles = [] for row in reader : circles.append({'x': int(row[0]), 'y': int(row[1]), 'r': int(row[2])}) f.close() ### 色決定 background_color = _getRandomColor() foreground_color = _getRandomColor() while background_color == foreground_color : foreground_color = _getRandomColor() ### 画像作成 img = Image.new('RGB', (self.IMAGE_WIDTH, self.IMAGE_HEIGHT), background_color) dr = ImageDraw.Draw(img) for circle in circles : dr.ellipse(((circle['x'], circle['y']), (circle['x'] + circle['r'], circle['y'] + circle['r'])), outline = (0, 0, 0), fill = foreground_color) img.save("{0}/{1}.png".format(self.image_dir, name)) ################################################## # Utils # ################################################## def _getRandomColor() : return (rnd.randrange(0, 255), rnd.randrange(0, 255), rnd.randrange(0, 255)) ################################################## # Main # ################################################## if __name__ == '__main__' : argv = sys.argv argc = len(argv) if argc < 2 : print "Usage: python {0} <count>".format(argv[0]) quit() count = int(argv[1]) creator = Creator() creator.create(count)
並列処理版
- 02.py (逐次処理版との差分のみ)
# Added import Queue import time import threading class Creator : def __init__(self, text_dir = DEFAULT_TEXT_DIR, image_dir = DEFAULT_IMAGE_DIR) : self.text_dir = text_dir self.image_dir = image_dir if not os.path.exists(text_dir) : os.mkdir(text_dir) if not os.path.exists(image_dir) : os.mkdir(image_dir) # Added self.shutdown = threading.Event() def create(self, count) : """ count数だけデータ作成 """ ### Threads settings thread_count = 10 ### 処理用Queue self.create_text_queue = Queue.Queue() self.create_image_queue = Queue.Queue() ### 制御用 Flag self.create_text_flag = threading.Event() self.create_image_flag = threading.Event() ### 結果取得用Queue self.create_text_result = Queue.Queue() self.create_image_result = Queue.Queue() ### 処理用Queue にID追加 for idx in xrange(count) : self.create_text_queue.put(idx + 1) ### Thread 開始 for i in xrange(thread_count) : pt = threading.Thread(target=self._createText_t) pt.daemon = True pt.start() pi = threading.Thread(target=self._createImage_t) pi.daemon = True pi.start() # NOTE: フラグを立てることで、Queue が空になった時点で Thread 終了。 self.create_text_flag.set() ### Thread 終了待ち wait_time = time.time() create_text_threads_done = 0 create_image_threads_done = 0 while create_image_threads_done < thread_count : if self.shutdown.is_set() : return False if time.time() - wait_time > 2 : print "Processed {0}/{1}".format(count - self.create_text_queue.qsize(), count) wait_time = time.time() ### Text 作成終わったら Image作成処理 Thread フラグ立てる ### → Queue が空になったら Thread 終了 if not self.create_image_flag.is_set() and create_text_threads_done == thread_count : self.create_image_flag.set() ### Text 作成結果 Queue 格納時 ( = 1 thread 完了 ) while not self.create_text_result.empty() : process_count = self.create_text_result.get() create_text_threads_done += 1 ### Write 作成結果 Queue 格納時 ( = 1 thread 完了 ) while not self.create_image_result.empty() : process_count = self.create_image_result.get() create_image_threads_done += 1 try : time.sleep(0.2) except KeyboardInterrupt : self.shutdown.set() self.shutdown.set() return True def _createText_t(self) : process_count = 0 while not self.create_text_flag.is_set() or not self.create_text_queue.empty() : if self.shutdown.is_set() : return text_id = None try : text_id = self.create_text_queue.get(True, 0.05) except Queue.Empty : continue ### Create text file filepath = self._createText(text_id) self.create_image_queue.put(filepath) process_count += 1 self.create_text_result.put(process_count) return True def _createImage_t(self) : process_count = 0 while not self.create_image_flag.is_set() or not self.create_image_queue.empty() : if self.shutdown.is_set() : return filepath = None try : filepath = self.create_image_queue.get(True, 0.05) except Queue.Empty : continue ### Create image self._createImage(filepath) process_count += 1 self.create_image_result.put(process_count) return True
速度比較
- 100枚、1,000枚、10,000枚、100,000枚で実行
$ time python 01.py 100 real 0m0.260s user 0m0.243s sys 0m0.016s $ time python 01.py 1000 real 0m2.410s user 0m2.312s sys 0m0.099s $ time python 01.py 10000 real 0m24.124s user 0m23.176s sys 0m0.935s $ time python 01.py 100000 real 4m2.286s user 3m51.366s sys 0m8.095s $ time python 02.py 100 real 0m0.835s user 0m0.452s sys 0m0.048s $ time python 02.py 1000 real 0m1.455s user 0m4.338s sys 0m0.320s $ time python 02.py 10000 real 0m11.129s user 0m44.010s sys 0m3.043s $ time python 02.py 100000 real 1m46.127s user 7m11.919s sys 0m28.289s
- real time だけグラフ化すると…
備考
- NVIDIA DiGiTS のソースコードを読んでいて、データインポート処理にThreading使っていたので、参考にしました。
所感
- 簡単なマルチスレッド処理なら Pool を使う方が良いかも。
- 細かなスレッド処理を構築するなら、こっちですね。もっと工夫して色々できそう。