본문 바로가기
Language/Python

[Python] Ray 를 활용한 병렬처리 | LIM

by forestlim 2023. 1. 21.
728x90
반응형

기존에 병렬처리에 대해 학습하고 Python의 기본 라이브러리인 Multiprocessing Module을 이용하여 시간을 많이 단축했던 적이 있다.

https://amazelimi.tistory.com/50

 

[Python] MultiProcessing map() vs imap() | LIM

Intro 빅데이터를 다루다보면 계산을 parallel 하게 진행해야할 필요성이 생기게 된다. 파이썬에서는 multiprocessing 모듈을 사용하여 진행할 수 있다. 파이썬의 multiprocessing 모듈 내에는 Process, Pool, map,

amazelimi.tistory.com

 

Multiprocessing 보다 더 빠르고 간단하게 병렬처리를 구현할 수 있는 라이브러리가 있다고 하여 공부해보았다. 

https://docs.ray.io/en/latest/

목차

1. Ray 를 사용하게 된 계기


2. Ray 의 사용방법


3. Ray 동작방식 및 원리


4. Ray 의 구성요소


5. MultiProcessing vs Ray 실험

 

🧐 Ray 를 사용하게 된 계기

현재 회사 내에서 날씨 데이터를 받아서 처리하는 작업을 진행하고 있다. 데이터가 너무너무 많이 들어온다..

위도, 경도, point index에 따라 다른 데이터를 모두 받고 적재하면서 처리하고 있다. 사실 평소엔 10분마다 한 번씩 쌓인 파일들을 처리하고 있기 때문에 처리해야 하는 파일 양이 많진 않다. 그러나 기존에 쌓인 몇일의 파일들을 한 번에 변환해서 저장해야 하는 작업을 해야 했고 무조건 병렬처리를 써야겠다고 생각했다. 

 

처음엔 그냥 MultiProcessing 을 이용해서 작업하면 되겠다고 생각했다. 혹시 또 다른 방법이 있을까 하여 Python 병렬처리로 검색해 보았고, multiprocessing 외에 dask, ray 가 있다는 것을 알게 되었다. 그 중 ray 가 기존 코드를 거의 변형하지 않고 적용할 수 있고 빠르다고 해서 써보게 되었다.

 

기존 코드에 적용하기 너무 간편해서 써 본 결과 대만족이었다..

 

 

👍 무엇이 만족스러웠는가

  • 일단 위에서 말한 것처럼 기존 코드에 적용하기가 너무 간편하다
  • 대시보드도 제공해준다
  • 코어가 늘어날수록 처리속도가 압도적으로 좋다.
  • 머신러닝/딥러닝에서 활용하기 좋다.

 

🖥 Ray의 사용방법

설치

  • ray [default] 를 설치해주어야 ray dashboard 를 볼 수 있음
pip3 install ray[default]

모듈 import와 init

import ray

ray.init()

ray.init() 옵션

  • num_cpus -> cpu 개수를 몇 개 할당할 건지
  • dashboard_host -> default: localhost, 외부에서 접속하고 싶은 경우 0.0.0.0 으로 변경해야 함
  • dashboard_port -> default: 8265

다른 옵션들에 대해서도 차차 공부하고 알아봐야겠다.

 

🧐 그렇다면 Ray 동작방식 및 원리는

Ray는 Multiprocessing에서 발생하는 직렬화 오버헤드를 해결하기 위해 Apache Arrow를 사용합니다. Apache Arror는 행기반이 아닌 칼럼 기반의 인메모리 포맷으로 Zero-Copy 직렬화를 수행합니다. 또한 직렬화된 데이터를 인메모리 객체 저장소인 Plasma를 이용해 직렬화된 데이터를 빠르게 공유합니다.

Multiprocessing의 경우 큰 데이터를 다른 프로세스에 전달할 때 pickle을 사용하여 직렬화한 뒤 전달. 하지만, 이 접근 방식을 사용하기 위해서는 모든 프로세스가 데이터에 대한 복사본을 만들어야 하며, 큰 메모리를 할당하고 역직렬화에서 발생하는 큰 오버헤드를 가질 수밖에 없다. 

 

코드를 통해 간단하게 살펴보자.

import time
import multiprocessing as mp
import ray
import os

ray.init(num_cpus=16, dashboard_host='0.0.0.0')

@ray.remote
def my_awesome_foo_ray(n):
    print(os.getpid())
    time.sleep(1)
    return n
    
 if __name__ == '__main__':
    start = time.time()
    tasks = range(30)
    result_ids = []
    [result_ids.append(my_awesome_foo_ray.remote(task)) for task in tasks]
    result = ray.get(result_ids)
    end = time.time()
    print(f'총 걸린시간 - {end-start}')
    print(f'results - {result}')

 

  • 먼저, ray.init()을 통해 ray를 실행해 준다. 그러면 다음과 같이 ray process와 ray dashboard 가 실행됐다는 문구가 뜬다.
    • 2023-01-21 12:46:10,247 INFO worker.py:1529 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
  • 병렬로 실행되는 함수 위에 @ray. remote를 써준다. 이 함수가 ray의 Task 가 된다.
  • 함수를 호출할 때 remote() 메서드를 통해 호출한다.
  • ray.get()을 활용해 결괏값을 받아온다.

 

💡 Ray의 구성 요소

Task

  • 호출하는 곳과 다른 프로세스에서 실행되는 함수 또는 크래스
  • 함수를 @ray. remote()라는 데코레이터로 감쌌다면 그 함수는 Task이다.
  • remote() 메서드를 사용해서 호출가능하며 ObjectRef라는 값을 반환. ray.get()을 활용하여 Task를 실행하고 값을 반환할 수 있다.

Object

  • Task 를 통해서 반환되거나 ray.put()을 통해 생성되는 값
  • 데이터의 크기가 큰 경우 ray.put()을 활용해 Object 로 만들어 Ray 에서 빠르게 사용할 수 있다.
  • 또한 큰 데이터를 반복적으로 사용하게 된다면 ray.put() 을 활용해 메모리 사용을 줄일 수 있다.
    • ray.put() 은 데이터를 공유 메모리에 저장하여 복사본을 만들지 않고 모든 프로세스에 접근할 수 있다.

Actor

  • stateful 한 워커
  • @ray. remote()로 감싼 파이썬 클래스 인스턴스

 

📝 Non Mulitprocessing vs Multiprocessing vs Ray

그렇다면 과연 Ray의 성능이 얼마나 좋은지 테스트를 해보자.

Non MultiProcessing

def my_awesome_foo(n):
    print(f'Process {mp.current_process().name} started working on task {n}', flush=True)
    time.sleep(1)
    print(f'Process {mp.current_process().name} ended working on task {n}', flush=True)
    return n


if __name__ == '__main__':
    start = time.time()
    tasks = range(100)
    result = []
    for task in tasks:
        my_awesome_foo(task)
    end = time.time()
    print(f'총 걸린시간 - {end - start}')

 

Use MultiProcessing

def my_awesome_foo(n):
    print(f'Process {mp.current_process().name} started working on task {n}', flush=True)
    time.sleep(1)
    print(f'Process {mp.current_process().name} ended working on task {n}', flush=True)
    return n
    
 if __name__ == '__main__':
    start = time.time()
    tasks = range(100)
    pool = mp.Pool(processes=16)
    pool.map(my_awesome_foo, tasks)
    pool.close()
    pool.join()
    end = time.time()
    print(f'총 걸린시간 - {end-start}')

 

Use Ray

@ray.remote
def my_awesome_foo_ray(n):
    print(os.getpid())
    time.sleep(1)
    return n


if __name__ == '__main__':
    start = time.time()
    tasks = range(100)
    result_ids = []
    [result_ids.append(my_awesome_foo_ray.remote(task)) for task in tasks]
    result = ray.get(result_ids)
    end = time.time()
    print(f'총 걸린시간 - {end-start}')
    print(f'results - {result}')

멀티프로세싱을 사용하지 않는 경우 CPU 개수와 상관없이 100s 이기 때문에 결과에서 생략함

 

1) CPU 2개

더보기

MultiProcessing: 52.094s

Ray: 50.440s

 

차이: 대략 2초

 

2) CPU 4개

더보기

MultiProcessing: 28.076s

Ray: 25.308s

 

차이: 대략 3초

 

3) CPU 8개

더보기

MultiProcessing: 16.086s

Ray: 13.238s

 

차이: 대략 3초

 

4) CPU 16개

더보기

MultiProcessing: 8.121s

Ray: 7.245s

 

차이: 대략 1초

 


단순 time.sleep() 으로는 Ray의 진면모를 알아볼 수 없었다. 좀 더 복잡한 연산을 Ray를 사용해 봐야겠다!

728x90
반응형

댓글