python で並行処理と並列処理のメモ
concurrent.futures を使ってみたので試したことを雑に書き残しておこうと思います。
参考
- Python の並行処理を理解したい [マルチスレッド編]
※ 逐次処理、並行処理、並列処理については、図解されているので参考の記事を見た方が良さそうです。
逐次処理
ワーカーは一つの処理を完了してから次の処理を実行する。
並行処理
処理に待ち時間が発生している際に他の処理を実行する。
どういうときに向いているか?
→ 待ち時間が長い処理。(I/O バウンドな処理)
→ 例: WebAPI の利用、ディスクへの書き込み。
並列処理
複数の処理を同時に実行する。
どういうときに向いているか?
→ CPU リソースを大量に消費するような処理。(CPU バウンドな処理)
→ 例: 大量の数値計算。
インポート
ThreadPoolExecutor → 並行処理
ProcessPoolExecutor → 並列処理
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
並行処理を試してみる
「executor.submit()」の引数に並列で処理したい function を渡せば良いとの事。
import os
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def func_1():
for i in range(10):
time.sleep(0.2)
print(f'func_1: {i}')
def func_2():
for i in range(10):
time.sleep(0.5)
print(f'func_2: {i}')
def main():
with ThreadPoolExecutor() as executor:
executor.submit(func_1)
executor.submit(func_2)
if __name__ == '__main__':
main()
実行してみると func_1 と func_2 は、まざって print されているよう。
func_1: 0
func_1: 1
func_2: 0
func_1: 2
func_1: 3
func_2: 1
func_1: 4
func_1: 5
func_1: 6
func_2: 2
func_1: 7
func_1: 8
func_2: 3
func_1: 9
func_2: 4
func_2: 5
func_2: 6
func_2: 7
func_2: 8
func_2: 9
それぞれの呼ばれた function のプロセス ID を os.getpid() で確認してみる。
import os
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def func_1():
print(os.getpid())
for i in range(10):
time.sleep(0.2)
print(f'func_1: {i}')
def func_2():
print(os.getpid())
for i in range(10):
time.sleep(0.5)
print(f'func_2: {i}')
def main():
with ThreadPoolExecutor() as executor:
executor.submit(func_1)
executor.submit(func_2)
if __name__ == '__main__':
main()
実行してみると、プロセス ID は同じになっている。
70
70
func_1: 0
func_1: 1
func_2: 0
func_1: 2
func_1: 3
func_2: 1
func_1: 4
func_1: 5
func_1: 6
func_2: 2
func_1: 7
func_1: 8
func_2: 3
func_1: 9
func_2: 4
func_2: 5
func_2: 6
func_2: 7
func_2: 8
func_2: 9
並列処理を試してみる
並列処理にするには、ThreadPoolExecutor を ProcessPoolExecutor に変更するだけで済むよう。
これを書き換えてもう一度実行してみるとプロセス ID が違っているのが分かる。
74
75
func_1: 0
func_1: 1
func_2: 0
func_1: 2
func_1: 3
func_2: 1
func_1: 4
func_1: 5
func_1: 6
func_2: 2
func_1: 7
func_1: 8
func_2: 3
func_1: 9
func_2: 4
func_2: 5
func_2: 6
func_2: 7
func_2: 8
func_2: 9
return を取得する
コードを下の通りに書き換えて func_1 func_2 から返り値を取得してプリントするコードは下記の通り。
import os
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def func_1():
pid_1 = os.getpid()
for i in range(10):
time.sleep(0.2)
print(f'func_1: {i}')
return pid_1
def func_2():
pid_2 = os.getpid()
for i in range(10):
time.sleep(0.5)
print(f'func_2: {i}')
return pid_2
def main():
with ThreadPoolExecutor() as executor:
feature_1 = executor.submit(func_1)
feature_2 = executor.submit(func_2)
print(feature_1.result())
print(feature_2.result())
if __name__ == '__main__':
main()
実行してみると return したプロセス ID がプリントされているが、func_1 と func_2 で返した ID が別になっているのがわかる。
func_1: 0
func_2: 0
func_1: 1
func_1: 2
func_2: 1
func_1: 3
func_1: 4
func_2: 2
func_1: 5
func_1: 6
func_2: 3
func_1: 7
func_1: 8
func_2: 4
func_1: 9
func_2: 5
func_2: 6
func_2: 7
func_2: 8
func_2: 9
100
101
引数の分だけ実行する
map を使って第二引数に配列を渡すとその要素数の分だけ処理をしてくれるよう。
import os
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def func_1(alphabet):
print(f'START_{alphabet}_PROCESS_ID: {os.getpid()}')
for _ in range(10):
time.sleep(0.2)
def main():
with ProcessPoolExecutor() as executor:
executor.map(func_1, ['A', 'B', 'C', 'D'])
if __name__ == '__main__':
main()
実行結果は次の通りで、4 つのプロセスがプリントされているのがわかる。
START_A_PROCESS_ID: 261
START_B_PROCESS_ID: 262
START_C_PROCESS_ID: 263
START_D_PROCESS_ID: 264
ディスカッション
コメント一覧
まだ、コメントがありません