본문 바로가기
Airflow

[Airflow] CustomOperator 생성 | LIM

by forestlim 2022. 6. 12.
728x90
반응형

회사에서 task를 생성할 때 task가 단순 request만 요청하는 작업이 많아졌고 기존에는 requests.get()하고 return값을 처리하는 함수를 따로 만들었는데 이 함수가 간단하게 operator로 생성되면 좋을 것 같아서 CustomOperator 생성하는 방법을 찾아보게 되었다.

 

airflow 공식 document와 BaseOperator, PythonOperator를 참조해서 만들었다.

✅ airflow 공식 document

https://airflow.apache.org/docs/apache-airflow/2.2.3/howto/custom-operator.html

airflow를 사용하면서 느끼는 거지만 공식 document 설명이 좀 불친절한 것 같다. 특히 원래 CustomDecorator를 생성하려 했는데 그건 공식 document만 보고서는 너무 설명이 부족해서 아직 도전하지 못했다..

 

📌 GetOperator 생성

이 Operator의 주 목적은 python의 requests.get()을 수행하는 것이다. 공식 document에는 다음과 같이 나와있다. 

from airflow.models.baseoperator import BaseOperator


class HelloOperator(BaseOperator):
    def __init__(self, name: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context):
        message = f"Hello {self.name}"
        print(message)
        return message

 

위 document를 참고해서 내가 맨 처음에 만든 GetOperator이다.

 

from airflow.models.baseoperator import BaseOperator
import requests

class GetOperator(BaseOperator):
    def __init__(self, endpoint: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.endpoint = endpoint

    def execute(self, context):
    	res = requests.get(self.endpoint)
        return {
            endpoint: endpoint,
            status_code: res.status_code,
            data: res.json()
            }

execute부분이 실제 실행되는 부분이다. 

 

이 GetOperator 실제로 dag내에서 task로 사용하는 코드는 이렇다.

get_op = GetOperator(
        task_id="test",
        endpoint="http://www.naver.com"
    )

 

해당 task에서 return 되는 값은 xcom에 저장된다. 여기서 의문점이 들었다. requests.get()을 할 때 endpoint만 넣는 게 아니고 params도 같이 넣어야 하는데 params는 어떻게 넣어야 하지..생각하다가 PythonOperator가 어떻게 만들어졌는 지 참고하기로 했다.

 

🧐 PythonOperator도 BaseOperator를 상속받아서 정의한 건데 어떻게 만들었을까?

PythonOperator가 정의된 코드를 살펴보기 시작했다. 

공식 document에 정의된 PythonOperator이다.

 

https://airflow.apache.org/docs/apache-airflow/2.2.3/_api/airflow/operators/python/index.html?highlight=pythonoperator#airflow.operators.python.PythonOperator

 

airflow.operators.python — Airflow Documentation

 

airflow.apache.org

 

PythonOperator에서 op_kwargs를 통해 object를 전달한 기억이 나서 op_kwargs대신 params를 넣어보기로 했다.

class GetOperator(BaseOperator):
    template_fields = ('parameters')
    template_fields_renderers = {"parameters": "json"}
    LAVENDER = "#E6E6FA" # 해당 operator의 컬러를 설정할 수 있다. 
    ui_color = LAVENDER

    def __init__(
            self,
            endpoint: str,
            params: Optional[Dict] = None,
            **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.endpoint = endpoint
        self.params = params or {}

    def execute(self, context: Dict):
        logging.info(f'endpoint - {self.endpoint}')
        try:
            logging.info(f'params - {self.params}')
            r = requests.get(
                url=self.endpoint,
                params=self.params,
            )
            return {
                "endpoint": self.endpoint,
                "status_code": r.status_code,
                "data": r.json()
            }
        except Exception as e:
            logging.warning(f"failed to request - {e}")
            return {
                "endpoint": self.endpoint,
                "status_code": 501
            }

 

최종적으로 위와 같이 GetOperator를 생성하니 내가 원하는 대로 request의 parameter를 받아서 정상적으로 실행이 되었다. 

위 방식으로 PostOperator도 생성했다. PostOperator는 params, data 모두 받아올 수 있도록 변경한 것 뿐이다.

(ex. requests.post(endpoint, params=params, data=data)

 

 

기분이 너무 좋았다...이거 만들려고 진짜 하루 통째로 투자했다..

하지만 이렇게 좋아할 수만은 없었다..역시 여기서 끝나면 섭섭하다

이제 request후 return 된 데이터가 GetOperator를 실행한 task의 xcom에 저장되어 있는데 다음 task에서 저걸 어떻게 가져갈까 하는 것이었다. 

 

여기서부터 또 삽질이 시작되었다..

 

이건 또 다음에 정리하도록 하겠다..⭐️

728x90
반응형

댓글