본문 바로가기
Airflow

[Operator] SimpleHttpOperator | LIM

by forestlim 2022. 12. 21.
728x90
반응형

Intro

현재 회사에서 만들어진 DAG 들을 보니 다른 서비스에 만들어진 API 요청 후 그 API 에 대해서 status code 를 확인하고 결과를 반환하는 Task 들이 너무 많았다. 예전에도 이러한 코드를 단순화시키기 위해 GetOperator, PostOperator 를 만들어 본 경험이 있었다. 

 

[Airflow] CustomOperator 생성 | LIM

회사에서 task를 생성할 때 task가 단순 request만 요청하는 작업이 많아졌고 기존에는 requests.get()하고 return값을 처리하는 함수를 따로 만들었는데 이 함수가 간단하게 operator로 생성되면 좋을 것 같

amazelimi.tistory.com

예전에는 Operator 한 개에 여러 개의 기능을 담으려 하다 보니 난관에 봉착했었는데 Airflow 개발철학이 1개의 Task에 1개의 기능만 있는 것이라고 한다. 그래서 다양한 종류의 Operator 가 많은 거지 않을까 싶다. 

 

이번엔 Airflow Operator 에서 기본으로 제공하는 SimpleHttpOperator 가 있어서 사용해보았다. 내가 만든 GetOperator, PostOperator 의 기능 + response 에 대한 처리에 대한 기능. 이전에 이 response 처리 기능 구현을 어떻게 해야 할 지 몰라서 아쉬웠었는데 이번 기회에 내부 구현이 어떻게 되어있나 뜯어보고자 한다.

 

How To Use

1. 먼저 import 해보자

from airflow.providers.http.operators.http import SimpleHttpOperator

 

2. Argument 파악

** 기본적으로 airflow 의 모든 operator는 baseoperator를 상속받아 만들어진다. 

endpoint: Optional[str] = None
method: str = 'POST'
data: Any = None
headers: Optional[Dict[str, str]] = None
response_check: Optional[Callable[..., bool]] = None
response_filter: Optional[Callable[..., Any]] = None
extra_options: Optional[Dict[str, Any]] = None
http_conn_id: str = 'http_default'
log_response: bool = False
auth_type: Type[AuthBase] = HTTPBasicAuth

✔️ http_conn_id 에는 connection id 를 넣어주면 된다. 

     connection type은 HTTP 이며, Host에 해당 API 를 넣어주면 된다.

 

✔️ response_check: lambda function 을 사용하거나 정해진 함수를 실행하는데 결과값을 True or False 로 반환되게 해야 한다. 

ex) lambda response: response.status_code == 200

 

✔️ response_filter: response 에 대한 처리를 해준다. lambda function 을 사용하거나 정해진 함수를 실행한고 실행된 값을 return 한다. 

 

 

3. Example

기존 TaskFlowAPI

@task(task_id='test')
def request_to_preprocess_data():
    res = requests.get(endpoint, data=data, headers=headers)
    if res["status_code"] != 200:
        raise BrokenPipeError('retry should be started')
    return res.text

 

SimpleHttpOperator

SimpleHttpOperator(
    task_id='test',
    http_conn_id='api',
    endpoint='/endpoint',
    method='GET',
    data=data,
    response_check=lambda response: response.status_code == 200
)

 

Operator 로 통합하니 코드가 훨씬 깔끔해졌다!

 

📌 주의

SimpleHttpOperator 의 경우 기본 method 는 'Post' 다.

코드를 보면

method 가 'GET' 일 경우 SimpleHttpOperator의 인자로 넘겨준 data 가 자동으로 parameter 로 들어가게 되지만,

'POST' 의 경우 data 는 request body로 들어가게 되고 parameter 를 받는 부분이 따로 없다.

 

method가 POST 인데 data 인자에 parameter 로 넘겨줄 부분을 적어주게 되면 

422 Unprocessable Entity

API 를 FastAPI 로 구현했다면 다음과 같은 에러를 맞닥뜨릴 수 있다. 

 

따러서 request 의 method가 POST 이고 parameter 를 넘기고 싶다면 endpoint 에 직접 parameter 를 적어주도록 하자.

endpoint='/endpoint?type=1&date=2022-12-22'
728x90
반응형

댓글