python で並行処理と並列処理のメモ

2022年9月15日

concurrent.futures を使ってみたので試したことを雑に書き残しておこうと思います。

参考

逐次処理

ワーカーは一つの処理を完了してから次の処理を実行する。

並行処理

処理に待ち時間が発生している際に他の処理を実行する。

どういうときに向いているか?
   → 待ち時間が長い処理。(I/O バウンドな処理)
     → 例: WebAPI の利用、ディスクへの書き込み。

並列処理

複数の処理を同時に実行する。

どういうときに向いているか?
   → CPU リソースを大量に消費するような処理。(CPU バウンドな処理)
     → 例: 大量の数値計算。

インポート

ThreadPoolExecutor → 並行処理
ProcessPoolExecutor → 並列処理

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

並行処理を試してみる

「executor.submit()」の引数に並列で処理したい function を渡せば良いとの事。

import os
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def func_1():
    for i in range(10):
        time.sleep(0.2)
        print(f'func_1: {i}')

def func_2():
    for i in range(10):
        time.sleep(0.5)
        print(f'func_2: {i}')

def main():
    with ThreadPoolExecutor() as executor:
        executor.submit(func_1)
        executor.submit(func_2)

if __name__ == '__main__':
    main()

実行してみると func_1 と func_2 は、まざって print されているよう。

func_1: 0
func_1: 1
func_2: 0
func_1: 2
func_1: 3
func_2: 1
func_1: 4
func_1: 5
func_1: 6
func_2: 2
func_1: 7
func_1: 8
func_2: 3
func_1: 9
func_2: 4
func_2: 5
func_2: 6
func_2: 7
func_2: 8
func_2: 9

それぞれの呼ばれた function のプロセス ID を os.getpid() で確認してみる。

import os
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def func_1():
    print(os.getpid())
    for i in range(10):
        time.sleep(0.2)
        print(f'func_1: {i}')

def func_2():
    print(os.getpid())
    for i in range(10):
        time.sleep(0.5)
        print(f'func_2: {i}')

def main():
    with ThreadPoolExecutor() as executor:
        executor.submit(func_1)
        executor.submit(func_2)

if __name__ == '__main__':
    main()

実行してみると、プロセス ID は同じになっている。

70
70
func_1: 0
func_1: 1
func_2: 0
func_1: 2
func_1: 3
func_2: 1
func_1: 4
func_1: 5
func_1: 6
func_2: 2
func_1: 7
func_1: 8
func_2: 3
func_1: 9
func_2: 4
func_2: 5
func_2: 6
func_2: 7
func_2: 8
func_2: 9

並列処理を試してみる

並列処理にするには、ThreadPoolExecutor を ProcessPoolExecutor に変更するだけで済むよう。
これを書き換えてもう一度実行してみるとプロセス ID が違っているのが分かる。

74
75
func_1: 0
func_1: 1
func_2: 0
func_1: 2
func_1: 3
func_2: 1
func_1: 4
func_1: 5
func_1: 6
func_2: 2
func_1: 7
func_1: 8
func_2: 3
func_1: 9
func_2: 4
func_2: 5
func_2: 6
func_2: 7
func_2: 8
func_2: 9

return を取得する

コードを下の通りに書き換えて func_1 func_2 から返り値を取得してプリントするコードは下記の通り。

import os
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def func_1():
    pid_1 = os.getpid()
    for i in range(10):
        time.sleep(0.2)
        print(f'func_1: {i}')
    return pid_1

def func_2():
    pid_2 = os.getpid()
    for i in range(10):
        time.sleep(0.5)
        print(f'func_2: {i}')
    return pid_2

def main():
    with ThreadPoolExecutor() as executor:
        feature_1 = executor.submit(func_1)
        feature_2 = executor.submit(func_2)
    print(feature_1.result())
    print(feature_2.result())

if __name__ == '__main__':
    main()

実行してみると return したプロセス ID がプリントされているが、func_1 と func_2 で返した ID が別になっているのがわかる。

func_1: 0
func_2: 0
func_1: 1
func_1: 2
func_2: 1
func_1: 3
func_1: 4
func_2: 2
func_1: 5
func_1: 6
func_2: 3
func_1: 7
func_1: 8
func_2: 4
func_1: 9
func_2: 5
func_2: 6
func_2: 7
func_2: 8
func_2: 9
100
101

引数の分だけ実行する

map を使って第二引数に配列を渡すとその要素数の分だけ処理をしてくれるよう。

import os
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def func_1(alphabet):
    print(f'START_{alphabet}_PROCESS_ID: {os.getpid()}')
    for _ in range(10):
        time.sleep(0.2)

def main():
    with ProcessPoolExecutor() as executor:
        executor.map(func_1, ['A', 'B', 'C', 'D'])

if __name__ == '__main__':
    main()

実行結果は次の通りで、4 つのプロセスがプリントされているのがわかる。

START_A_PROCESS_ID: 261
START_B_PROCESS_ID: 262
START_C_PROCESS_ID: 263
START_D_PROCESS_ID: 264

2022年9月15日