기존에 병렬처리에 대해 학습하고 Python의 기본 라이브러리인 Multiprocessing Module을 이용하여 시간을 많이 단축했던 적이 있다.
https://amazelimi.tistory.com/50
Multiprocessing 보다 더 빠르고 간단하게 병렬처리를 구현할 수 있는 라이브러리가 있다고 하여 공부해보았다.
목차
🧐 Ray 를 사용하게 된 계기
현재 회사 내에서 날씨 데이터를 받아서 처리하는 작업을 진행하고 있다. 데이터가 너무너무 많이 들어온다..
위도, 경도, point index에 따라 다른 데이터를 모두 받고 적재하면서 처리하고 있다. 사실 평소엔 10분마다 한 번씩 쌓인 파일들을 처리하고 있기 때문에 처리해야 하는 파일 양이 많진 않다. 그러나 기존에 쌓인 몇일의 파일들을 한 번에 변환해서 저장해야 하는 작업을 해야 했고 무조건 병렬처리를 써야겠다고 생각했다.
처음엔 그냥 MultiProcessing 을 이용해서 작업하면 되겠다고 생각했다. 혹시 또 다른 방법이 있을까 하여 Python 병렬처리로 검색해 보았고, multiprocessing 외에 dask, ray 가 있다는 것을 알게 되었다. 그 중 ray 가 기존 코드를 거의 변형하지 않고 적용할 수 있고 빠르다고 해서 써보게 되었다.
기존 코드에 적용하기 너무 간편해서 써 본 결과 대만족이었다..
👍 무엇이 만족스러웠는가
- 일단 위에서 말한 것처럼 기존 코드에 적용하기가 너무 간편하다
- 대시보드도 제공해준다
- 코어가 늘어날수록 처리속도가 압도적으로 좋다.
- 머신러닝/딥러닝에서 활용하기 좋다.
🖥 Ray의 사용방법
설치
- ray [default] 를 설치해주어야 ray dashboard 를 볼 수 있음
pip3 install ray[default]
모듈 import와 init
import ray
ray.init()
ray.init() 옵션
- num_cpus -> cpu 개수를 몇 개 할당할 건지
- dashboard_host -> default: localhost, 외부에서 접속하고 싶은 경우 0.0.0.0 으로 변경해야 함
- dashboard_port -> default: 8265
다른 옵션들에 대해서도 차차 공부하고 알아봐야겠다.
🧐 그렇다면 Ray 동작방식 및 원리는
Ray는 Multiprocessing에서 발생하는 직렬화 오버헤드를 해결하기 위해 Apache Arrow를 사용합니다. Apache Arror는 행기반이 아닌 칼럼 기반의 인메모리 포맷으로 Zero-Copy 직렬화를 수행합니다. 또한 직렬화된 데이터를 인메모리 객체 저장소인 Plasma를 이용해 직렬화된 데이터를 빠르게 공유합니다.
Multiprocessing의 경우 큰 데이터를 다른 프로세스에 전달할 때 pickle을 사용하여 직렬화한 뒤 전달. 하지만, 이 접근 방식을 사용하기 위해서는 모든 프로세스가 데이터에 대한 복사본을 만들어야 하며, 큰 메모리를 할당하고 역직렬화에서 발생하는 큰 오버헤드를 가질 수밖에 없다.
코드를 통해 간단하게 살펴보자.
import time
import multiprocessing as mp
import ray
import os
ray.init(num_cpus=16, dashboard_host='0.0.0.0')
@ray.remote
def my_awesome_foo_ray(n):
print(os.getpid())
time.sleep(1)
return n
if __name__ == '__main__':
start = time.time()
tasks = range(30)
result_ids = []
[result_ids.append(my_awesome_foo_ray.remote(task)) for task in tasks]
result = ray.get(result_ids)
end = time.time()
print(f'총 걸린시간 - {end-start}')
print(f'results - {result}')
- 먼저, ray.init()을 통해 ray를 실행해 준다. 그러면 다음과 같이 ray process와 ray dashboard 가 실행됐다는 문구가 뜬다.
- 2023-01-21 12:46:10,247 INFO worker.py:1529 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
- 병렬로 실행되는 함수 위에 @ray. remote를 써준다. 이 함수가 ray의 Task 가 된다.
- 함수를 호출할 때 remote() 메서드를 통해 호출한다.
- ray.get()을 활용해 결괏값을 받아온다.
💡 Ray의 구성 요소
Task
- 호출하는 곳과 다른 프로세스에서 실행되는 함수 또는 크래스
- 함수를 @ray. remote()라는 데코레이터로 감쌌다면 그 함수는 Task이다.
- remote() 메서드를 사용해서 호출가능하며 ObjectRef라는 값을 반환. ray.get()을 활용하여 Task를 실행하고 값을 반환할 수 있다.
Object
- Task 를 통해서 반환되거나 ray.put()을 통해 생성되는 값
- 데이터의 크기가 큰 경우 ray.put()을 활용해 Object 로 만들어 Ray 에서 빠르게 사용할 수 있다.
- 또한 큰 데이터를 반복적으로 사용하게 된다면 ray.put() 을 활용해 메모리 사용을 줄일 수 있다.
- ray.put() 은 데이터를 공유 메모리에 저장하여 복사본을 만들지 않고 모든 프로세스에 접근할 수 있다.
Actor
- stateful 한 워커
- @ray. remote()로 감싼 파이썬 클래스 인스턴스
📝 Non Mulitprocessing vs Multiprocessing vs Ray
그렇다면 과연 Ray의 성능이 얼마나 좋은지 테스트를 해보자.
Non MultiProcessing
def my_awesome_foo(n):
print(f'Process {mp.current_process().name} started working on task {n}', flush=True)
time.sleep(1)
print(f'Process {mp.current_process().name} ended working on task {n}', flush=True)
return n
if __name__ == '__main__':
start = time.time()
tasks = range(100)
result = []
for task in tasks:
my_awesome_foo(task)
end = time.time()
print(f'총 걸린시간 - {end - start}')
Use MultiProcessing
def my_awesome_foo(n):
print(f'Process {mp.current_process().name} started working on task {n}', flush=True)
time.sleep(1)
print(f'Process {mp.current_process().name} ended working on task {n}', flush=True)
return n
if __name__ == '__main__':
start = time.time()
tasks = range(100)
pool = mp.Pool(processes=16)
pool.map(my_awesome_foo, tasks)
pool.close()
pool.join()
end = time.time()
print(f'총 걸린시간 - {end-start}')
Use Ray
@ray.remote
def my_awesome_foo_ray(n):
print(os.getpid())
time.sleep(1)
return n
if __name__ == '__main__':
start = time.time()
tasks = range(100)
result_ids = []
[result_ids.append(my_awesome_foo_ray.remote(task)) for task in tasks]
result = ray.get(result_ids)
end = time.time()
print(f'총 걸린시간 - {end-start}')
print(f'results - {result}')
멀티프로세싱을 사용하지 않는 경우 CPU 개수와 상관없이 100s 이기 때문에 결과에서 생략함
1) CPU 2개
MultiProcessing: 52.094s
Ray: 50.440s
차이: 대략 2초
2) CPU 4개
MultiProcessing: 28.076s
Ray: 25.308s
차이: 대략 3초
3) CPU 8개
MultiProcessing: 16.086s
Ray: 13.238s
차이: 대략 3초
4) CPU 16개
MultiProcessing: 8.121s
Ray: 7.245s
차이: 대략 1초
단순 time.sleep() 으로는 Ray의 진면모를 알아볼 수 없었다. 좀 더 복잡한 연산을 Ray를 사용해 봐야겠다!
'Language > Python' 카테고리의 다른 글
[Python] Pipenv 설치 및 가상환경 관리(Mac 기준) | LIM (0) | 2023.05.12 |
---|---|
[Python] csv 모듈을 이용하여 파일 읽고, 쓰기 | LIM (0) | 2023.05.10 |
[Python] Module dependcy 새로운 관리 툴 Poetry | LIM (0) | 2022.12.17 |
[Python] Staticmethod vs Classmethod | LIM (0) | 2022.12.16 |
[Python] 일급객체(First Class Object)란? | LIM (0) | 2022.12.11 |
댓글