Intro
현재 회사에서 만들어진 DAG 들을 보니 다른 서비스에 만들어진 API 요청 후 그 API 에 대해서 status code 를 확인하고 결과를 반환하는 Task 들이 너무 많았다. 예전에도 이러한 코드를 단순화시키기 위해 GetOperator, PostOperator 를 만들어 본 경험이 있었다.
예전에는 Operator 한 개에 여러 개의 기능을 담으려 하다 보니 난관에 봉착했었는데 Airflow 개발철학이 1개의 Task에 1개의 기능만 있는 것이라고 한다. 그래서 다양한 종류의 Operator 가 많은 거지 않을까 싶다.
이번엔 Airflow Operator 에서 기본으로 제공하는 SimpleHttpOperator 가 있어서 사용해보았다. 내가 만든 GetOperator, PostOperator 의 기능 + response 에 대한 처리에 대한 기능. 이전에 이 response 처리 기능 구현을 어떻게 해야 할 지 몰라서 아쉬웠었는데 이번 기회에 내부 구현이 어떻게 되어있나 뜯어보고자 한다.
How To Use
1. 먼저 import 해보자
from airflow.providers.http.operators.http import SimpleHttpOperator
2. Argument 파악
** 기본적으로 airflow 의 모든 operator는 baseoperator를 상속받아 만들어진다.
endpoint: Optional[str] = None
method: str = 'POST'
data: Any = None
headers: Optional[Dict[str, str]] = None
response_check: Optional[Callable[..., bool]] = None
response_filter: Optional[Callable[..., Any]] = None
extra_options: Optional[Dict[str, Any]] = None
http_conn_id: str = 'http_default'
log_response: bool = False
auth_type: Type[AuthBase] = HTTPBasicAuth
✔️ http_conn_id 에는 connection id 를 넣어주면 된다.
connection type은 HTTP 이며, Host에 해당 API 를 넣어주면 된다.
✔️ response_check: lambda function 을 사용하거나 정해진 함수를 실행하는데 결과값을 True or False 로 반환되게 해야 한다.
ex) lambda response: response.status_code == 200
✔️ response_filter: response 에 대한 처리를 해준다. lambda function 을 사용하거나 정해진 함수를 실행한고 실행된 값을 return 한다.
3. Example
기존 TaskFlowAPI
@task(task_id='test')
def request_to_preprocess_data():
res = requests.get(endpoint, data=data, headers=headers)
if res["status_code"] != 200:
raise BrokenPipeError('retry should be started')
return res.text
SimpleHttpOperator
SimpleHttpOperator(
task_id='test',
http_conn_id='api',
endpoint='/endpoint',
method='GET',
data=data,
response_check=lambda response: response.status_code == 200
)
Operator 로 통합하니 코드가 훨씬 깔끔해졌다!
📌 주의
SimpleHttpOperator 의 경우 기본 method 는 'Post' 다.
코드를 보면
method 가 'GET' 일 경우 SimpleHttpOperator의 인자로 넘겨준 data 가 자동으로 parameter 로 들어가게 되지만,
'POST' 의 경우 data 는 request body로 들어가게 되고 parameter 를 받는 부분이 따로 없다.
method가 POST 인데 data 인자에 parameter 로 넘겨줄 부분을 적어주게 되면
422 Unprocessable Entity
API 를 FastAPI 로 구현했다면 다음과 같은 에러를 맞닥뜨릴 수 있다.
따러서 request 의 method가 POST 이고 parameter 를 넘기고 싶다면 endpoint 에 직접 parameter 를 적어주도록 하자.
endpoint='/endpoint?type=1&date=2022-12-22'
'Airflow' 카테고리의 다른 글
[Airflow] Connection 정보 Vault 이관 작업(feat.Plugin) | LIM (0) | 2023.01.13 |
---|---|
[Airflow] Custom UI for Airflow | LIM (0) | 2022.12.29 |
Airflow 특정 Task 재실행 | LIM (0) | 2022.12.07 |
[Airflow] SFTPToGCSOperator 개념 및 사용방법 | LIM (0) | 2022.11.12 |
[Airflow] Branch를 이용해 상황에 맞는 task 실행(BranchPythonOperator) | LIM (0) | 2022.07.19 |
댓글