1. Python Multiprocessing에 대한 간단한 설명
2. Pool.map() 과 Pool.imap() 의 차이
3. Pool.imap_unordered()
🖥 Python MultiProcessing
멀티 프로세싱을 활용하면 여러 작업을 별도의 프로세스를 생성 후 병렬처리해서 더 빠르게 결과를 얻어낼 수 있다. 다만 멀티 프로세싱은 메모리 사용률이 높아진다는 단점이 존재한다.
멀티 프로세싱과 멀티 스레드 두 가지 방법이 존재하는데, 파이썬에서는 GIL(Global Interpreter Lock) 정책 때문에 cpu 위주의 작업에서는 멀티 스레딩은 오히려 성능을 떨어뜨린다.
즉 정리하자면,
쓰레드는 가볍지만 파이썬의 GIL정책으로 인해 I/O 처리를 하는 경우에만 주로 효과적이고
프로세스는 각자가 고유한 메모리 영역을 가지기 때문에 처음 프로세스를 만들 때 시간이 조금 필요하고 더 많은 메모리를 필요로 하지만 병렬적으로 cpu작업을 할 수 있어서 빠르다
Multithreading vs. Multiprocessing — Image by author
🛠 MultiProcessing 을 사용했을 때와 사용하지 않았을 때
사용하지 않았을 때
import time
import multiprocessing as mp
def my_awesome_foo(n):
print(f'Process {mp.current_process().name} started working on task {n}', flush=True)
time.sleep(1)
print(f'Process {mp.current_process().name} ended working on task {n}', flush=True)
return n
if __name__ == '__main__':
start = time.time()
tasks = range(30)
result = []
for task in tasks:
my_awesome_foo(task)
end = time.time()
print(f'총 걸린시간 - {end-start}')
Process 4개 사용했을 때
import time
import multiprocessing as mp
def my_awesome_foo(n):
print(f'Process {mp.current_process().name} started working on task {n}', flush=True)
time.sleep(1)
print(f'Process {mp.current_process().name} ended working on task {n}', flush=True)
return n
if __name__ == '__main__':
start = time.time()
tasks = range(30)
pool = mp.Pool(processes=4)
pool.map(my_awesome_foo, tasks)
pool.close()
pool.join()
end = time.time()
print(f'총 걸린시간 - {end-start}')
📌 Pool.map()
위에서는 MultiProcess 를 사용할 때 Multiprocess.Pool.map() 을 사용하였다. 하지만 예를 들어 이런 코드가 있다고 가정해보자
import time
import multiprocessing as mp
def my_awesome_foo(n):
print(f'Process {mp.current_process().name} started working on task {n}', flush=True)
if n == 0:
1 / 0
time.sleep(1)
print(f'Process {mp.current_process().name} ended working on task {n}', flush=True)
return n
if __name__ == '__main__':
start = time.time()
tasks = range(30)
pool = mp.Pool(processes=4)
pool.map(my_awesome_foo, tasks)
pool.close()
pool.join()
end = time.time()
print(f'총 걸린시간 - {end-start}')
처음부터 에러가 발생했는데 다른 모든 작업도 처리되고 마지막엔 결과도 받지 못하는 끔찍한 일이 벌어지게 된다..
위 예제는 간단한 연산이니 문제가 없지만 몇시간 걸리는 코드를 실행시켜놓고 몇시간 뒤에 와서 확인했을 때 저렇게 에러가 나있으면 맘이 착잡할 것 같다.
Python에 MulitiProcessing.Pool.map() 의 경우에는 프로세스가 연산을 모두 처리하고 한번에 결과를 보여주기 때문에 한 프로세스에서 에러가 나더라고 exception handling 이 쉽지 않고 바로 캐치해낼 수 없다.
이러한 Exception Handling 을 하기 위해서 Pool.imap() 을 사용해보았다.
📌 Pool.imap()
Multiprocessing.Pool.imap() 또한 Pool.map() 과 동일하게 여러 프로세스를 이용하여 병렬 연산을 처리할 때 사용되는 건 동일하다.
차이가 있다면 Pool.imap()은 Pool.map()의 lazy parallel map function 으로 표현되는데 그 이유는
-> imap() function은 한 번에 하나의 작업을 Process Pool 에 넘기고, map() 함수는 한 번에 모든 작업을 Process Pool 에 넘긴다.
-> imap() function은 반환 값을 반복할 때 각 작업이 완료될 때까지 차단되고 map() 함수는 반환 값을 반복할 때 모든 작업이 완료될때까지 차단된다.
즉 정리하면 다음과 같다.
imap() 함수는 작업을 하나씩 실행하고, 사용 가능한 순서대로 작업 결과를 처리하는데 사용해야 한다.
map() 함수는 모든 작업을 한 번에 Pool 에 넘기고 넘긴 작업이 완료된 후에만 결과를 순서대로 처리하는데 사용해야 한다.
코드를 통해 보면 더 이해가 쉽다
import time
import multiprocessing as mp
def my_awesome_foo(n):
print(f'Process {mp.current_process().name} started working on task {n}', flush=True)
if n == 0:
1 / 0
time.sleep(1)
print(f'Process {mp.current_process().name} ended working on task {n}', flush=True)
return n
if __name__ == '__main__':
tasks = range(30)
start = time.time()
pool = mp.Pool(processes=4)
result = list(pool.imap(my_awesome_foo, tasks))
end = time.time()
print(f'총 걸린시간 - {end-start}')
에러가 나더라도 다른 작업을 실행하는 map() 과는 다르게 imap()은 에러가 발생 시 바로 작업이 멈추고 exception 이 발생하게 된다.
하지만 에러가 발생하더라도 결과값을 받고 싶은 경우가 있을 수 있는데 그때는 다음과 같이 실행될때마다 결과값을 저장하면 된다.
import time
import multiprocessing as mp
def my_awesome_foo(n):
print(f'Process {mp.current_process().name} started working on task {n}', flush=True)
if n == 0:
1 / 0
time.sleep(1)
print(f'Process {mp.current_process().name} ended working on task {n}', flush=True)
return n
if __name__ == '__main__':
result = []
tasks = range(30)
start = time.time()
pool = mp.Pool(processes=4)
iterator = pool.imap(my_awesome_foo, tasks)
while True:
try:
result.append(next(iterator))
except StopIteration:
break
except Exception as e:
result.append(e)
end = time.time()
print(f'result - {result}')
print(f'총 걸린시간 - {end-start}')
📌 Pool.imap_unordered
Pool.imap_unordered() 도 Pool.imap() 과 거의 동일하다. Pool.map() 과는 다르게 작업을 하나씩 실행하고, 사용 가능한 순서대로 작업 결과를 처리한다.
하지만 다른점이 있다면 Pool.imap() 같은 경우 Process에 작업이 들어가고 끝나는 순서가 일정하지만, Pool.imap_unordered() 의 경우 빨리 끝나는 순서대로 처리해서 넘겨준다.
time.sleep()을 짝수인 경우 0, 홀수인 경우 1초 sleep 하도록 설정했다.
import time
import multiprocessing as mp
def my_awesome_foo(n):
print(f'Process {mp.current_process().name} started working on task {n}', flush=True)
if n == 0:
1 / 0
time.sleep(n % 2)
# print(f'Process {mp.current_process().name} ended working on task {n}', flush=True)
return n
if __name__ == '__main__':
result = []
tasks = range(30)
start = time.time()
pool = mp.Pool(processes=4)
iterator = pool.imap(my_awesome_foo, tasks)
while True:
try:
result.append(next(iterator))
except StopIteration:
break
except Exception as e:
result.append(e)
end = time.time()
print(f'result - {result}')
print(f'총 걸린시간 - {end-start}')
Pool.imap() 의 경우 결과값은 이러하다. 순서대로 result에 결과값이 쌓이는 걸 확인할 수 있다.
동일한 코드를 Pool.imap_unordered() 를 적용했을 때 다음과 같다. 순서대로 쌓이지 않고 처리되는 대로 결과값에 쌓이는 것을 볼 수 있다.
📙 참고
https://superfastpython.com/multiprocessing-pool-imap/
'Language > Python' 카테고리의 다른 글
[Python] 일급객체(First Class Object)란? | LIM (0) | 2022.12.11 |
---|---|
[Python] Super() 기초 개념 및 활용 예제 | LIM (0) | 2022.11.08 |
[Python] 코루틴으로 짜여있지 않은 함수 비동기적으로 이용하기(feat.run_in_executor) | LIM (0) | 2022.07.14 |
[Python] 비동기 프로그래밍 | LIM (0) | 2022.07.09 |
[Python] functools 의 partial | LIM (0) | 2022.07.09 |
댓글