본문 바로가기
Language/Python

[Python] MultiProcessing map() vs imap() | LIM

by forestlim 2022. 9. 19.
728x90
반응형

1. Python Multiprocessing에 대한 간단한 설명
2. Pool.map() 과 Pool.imap() 의 차이
3. Pool.imap_unordered()

 

🖥 Python MultiProcessing

멀티 프로세싱을 활용하면 여러 작업을 별도의 프로세스를 생성 후 병렬처리해서 더 빠르게 결과를 얻어낼 수 있다. 다만 멀티 프로세싱은 메모리 사용률이 높아진다는 단점이 존재한다

멀티 프로세싱과 멀티 스레드 두 가지 방법이 존재하는데, 파이썬에서는 GIL(Global Interpreter Lock) 정책 때문에 cpu 위주의 작업에서는 멀티 스레딩은 오히려 성능을 떨어뜨린다. 

 

즉 정리하자면, 

쓰레드는 가볍지만 파이썬의 GIL정책으로 인해 I/O 처리를 하는 경우에만 주로 효과적이고

프로세스는 각자가 고유한 메모리 영역을 가지기 때문에 처음 프로세스를 만들 때 시간이 조금 필요하고 더 많은 메모리를 필요로 하지만 병렬적으로 cpu작업을 할 수 있어서 빠르다

 

Multithreading vs. Multiprocessing — Image by author

 

🛠 MultiProcessing 을 사용했을 때와 사용하지 않았을 때

사용하지 않았을 때

import time
import multiprocessing as mp


def my_awesome_foo(n):
    print(f'Process {mp.current_process().name} started working on task {n}', flush=True)
    time.sleep(1)
    print(f'Process {mp.current_process().name} ended working on task {n}', flush=True)
    return n


if __name__ == '__main__':
    start = time.time()
    tasks = range(30)
    result = []
    for task in tasks:
        my_awesome_foo(task)
    end = time.time()
    print(f'총 걸린시간 - {end-start}')

 

Process 4개 사용했을 때

import time
import multiprocessing as mp


def my_awesome_foo(n):
    print(f'Process {mp.current_process().name} started working on task {n}', flush=True)
    time.sleep(1)
    print(f'Process {mp.current_process().name} ended working on task {n}', flush=True)
    return n


if __name__ == '__main__':
    start = time.time()
    tasks = range(30)
    pool = mp.Pool(processes=4)
    pool.map(my_awesome_foo, tasks)
    pool.close()
    pool.join()
    end = time.time()
    print(f'총 걸린시간 - {end-start}')

 

 

📌 Pool.map()

위에서는 MultiProcess 를 사용할 때 Multiprocess.Pool.map() 을 사용하였다. 하지만 예를 들어 이런 코드가 있다고 가정해보자

import time
import multiprocessing as mp


def my_awesome_foo(n):
    print(f'Process {mp.current_process().name} started working on task {n}', flush=True)
    if n == 0:
        1 / 0
    time.sleep(1)
    print(f'Process {mp.current_process().name} ended working on task {n}', flush=True)
    return n


if __name__ == '__main__':
    start = time.time()
    tasks = range(30)
    pool = mp.Pool(processes=4)
    pool.map(my_awesome_foo, tasks)
    pool.close()
    pool.join()
    end = time.time()
    print(f'총 걸린시간 - {end-start}')

처음부터 에러가 발생했는데 다른 모든 작업도 처리되고 마지막엔 결과도 받지 못하는 끔찍한 일이 벌어지게 된다..

위 예제는 간단한 연산이니 문제가 없지만 몇시간 걸리는 코드를 실행시켜놓고 몇시간 뒤에 와서 확인했을 때 저렇게 에러가 나있으면 맘이 착잡할 것 같다.

 

Python에 MulitiProcessing.Pool.map() 의 경우에는 프로세스가 연산을 모두 처리하고 한번에 결과를 보여주기 때문에 한 프로세스에서 에러가 나더라고 exception handling 이 쉽지 않고 바로 캐치해낼 수 없다. 

 

이러한 Exception Handling 을 하기 위해서 Pool.imap() 을 사용해보았다. 

 

 

📌 Pool.imap()

Multiprocessing.Pool.imap() 또한 Pool.map() 과 동일하게 여러 프로세스를 이용하여 병렬 연산을 처리할 때 사용되는 건 동일하다. 

차이가 있다면 Pool.imap()은 Pool.map()의 lazy parallel map function 으로 표현되는데 그 이유는 

 

  -> imap() function은 한 번에 하나의 작업을 Process Pool 에 넘기고, map() 함수는 한 번에 모든 작업을 Process Pool 에 넘긴다.

  -> imap() function은 반환 값을 반복할 때 각 작업이 완료될 때까지 차단되고 map() 함수는 반환 값을 반복할 때 모든 작업이 완료될때까지 차단된다. 

 

즉 정리하면 다음과 같다. 

imap() 함수는 작업을 하나씩 실행하고, 사용 가능한 순서대로 작업 결과를 처리하는데 사용해야 한다.

map() 함수는  모든 작업을 한 번에 Pool 에 넘기고 넘긴 작업이 완료된 후에만 결과를 순서대로 처리하는데 사용해야 한다.

 

코드를 통해 보면 더 이해가 쉽다

import time
import multiprocessing as mp


def my_awesome_foo(n):
    print(f'Process {mp.current_process().name} started working on task {n}', flush=True)
    if n == 0:
        1 / 0
    time.sleep(1)
    print(f'Process {mp.current_process().name} ended working on task {n}', flush=True)
    return n


if __name__ == '__main__':
    tasks = range(30)
    start = time.time()
    pool = mp.Pool(processes=4)
    result = list(pool.imap(my_awesome_foo, tasks))
    end = time.time()
    print(f'총 걸린시간 - {end-start}')

에러가 나더라도 다른 작업을 실행하는 map() 과는 다르게 imap()은 에러가 발생 시 바로 작업이 멈추고 exception 이 발생하게 된다

 

하지만 에러가 발생하더라도 결과값을 받고 싶은 경우가 있을 수 있는데 그때는 다음과 같이 실행될때마다 결과값을 저장하면 된다.

import time
import multiprocessing as mp


def my_awesome_foo(n):
    print(f'Process {mp.current_process().name} started working on task {n}', flush=True)
    if n == 0:
        1 / 0
    time.sleep(1)
    print(f'Process {mp.current_process().name} ended working on task {n}', flush=True)
    return n


if __name__ == '__main__':
    result = []
    tasks = range(30)
    start = time.time()
    pool = mp.Pool(processes=4)
    iterator = pool.imap(my_awesome_foo, tasks)
    while True:
        try:
            result.append(next(iterator))
        except StopIteration:
            break
        except Exception as e:
            result.append(e)
    end = time.time()
    print(f'result - {result}')
    print(f'총 걸린시간 - {end-start}')

 

📌 Pool.imap_unordered

Pool.imap_unordered() 도 Pool.imap() 과 거의 동일하다. Pool.map() 과는 다르게 작업을 하나씩 실행하고, 사용 가능한 순서대로 작업 결과를 처리한다. 

하지만 다른점이 있다면 Pool.imap() 같은 경우 Process에 작업이 들어가고 끝나는 순서가 일정하지만, Pool.imap_unordered() 의 경우 빨리 끝나는 순서대로 처리해서 넘겨준다. 

 

time.sleep()을 짝수인 경우 0, 홀수인 경우 1초 sleep 하도록 설정했다.

import time
import multiprocessing as mp


def my_awesome_foo(n):
    print(f'Process {mp.current_process().name} started working on task {n}', flush=True)
    if n == 0:
        1 / 0
    time.sleep(n % 2)
    # print(f'Process {mp.current_process().name} ended working on task {n}', flush=True)
    return n


if __name__ == '__main__':
    result = []
    tasks = range(30)
    start = time.time()
    pool = mp.Pool(processes=4)
    iterator = pool.imap(my_awesome_foo, tasks)
    while True:
        try:
            result.append(next(iterator))
        except StopIteration:
            break
        except Exception as e:
            result.append(e)
    end = time.time()
    print(f'result - {result}')
    print(f'총 걸린시간 - {end-start}')

Pool.imap() 의 경우 결과값은 이러하다. 순서대로 result에 결과값이 쌓이는 걸 확인할 수 있다. 

 

동일한 코드를 Pool.imap_unordered() 를 적용했을 때 다음과 같다. 순서대로 쌓이지 않고 처리되는 대로 결과값에 쌓이는 것을 볼 수 있다. 

 


📙 참고

https://superfastpython.com/multiprocessing-pool-imap/

 

Multiprocessing Pool.imap() in Python

You can issue tasks to the process pool one-by-one and execute them in parallel via the imap() function. In this tutorial you will discover how to use the imap() function to issue tasks to the proc…

superfastpython.com

 

728x90
반응형

댓글