회사에서 task를 생성할 때 task가 단순 request만 요청하는 작업이 많아졌고 기존에는 requests.get()하고 return값을 처리하는 함수를 따로 만들었는데 이 함수가 간단하게 operator로 생성되면 좋을 것 같아서 CustomOperator 생성하는 방법을 찾아보게 되었다.
airflow 공식 document와 BaseOperator, PythonOperator를 참조해서 만들었다.
✅ airflow 공식 document
https://airflow.apache.org/docs/apache-airflow/2.2.3/howto/custom-operator.html
airflow를 사용하면서 느끼는 거지만 공식 document 설명이 좀 불친절한 것 같다. 특히 원래 CustomDecorator를 생성하려 했는데 그건 공식 document만 보고서는 너무 설명이 부족해서 아직 도전하지 못했다..
📌 GetOperator 생성
이 Operator의 주 목적은 python의 requests.get()을 수행하는 것이다. 공식 document에는 다음과 같이 나와있다.
from airflow.models.baseoperator import BaseOperator
class HelloOperator(BaseOperator):
def __init__(self, name: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
def execute(self, context):
message = f"Hello {self.name}"
print(message)
return message
위 document를 참고해서 내가 맨 처음에 만든 GetOperator이다.
from airflow.models.baseoperator import BaseOperator
import requests
class GetOperator(BaseOperator):
def __init__(self, endpoint: str, **kwargs) -> None:
super().__init__(**kwargs)
self.endpoint = endpoint
def execute(self, context):
res = requests.get(self.endpoint)
return {
endpoint: endpoint,
status_code: res.status_code,
data: res.json()
}
execute부분이 실제 실행되는 부분이다.
이 GetOperator 실제로 dag내에서 task로 사용하는 코드는 이렇다.
get_op = GetOperator(
task_id="test",
endpoint="http://www.naver.com"
)
해당 task에서 return 되는 값은 xcom에 저장된다. 여기서 의문점이 들었다. requests.get()을 할 때 endpoint만 넣는 게 아니고 params도 같이 넣어야 하는데 params는 어떻게 넣어야 하지..생각하다가 PythonOperator가 어떻게 만들어졌는 지 참고하기로 했다.
🧐 PythonOperator도 BaseOperator를 상속받아서 정의한 건데 어떻게 만들었을까?
PythonOperator가 정의된 코드를 살펴보기 시작했다.
공식 document에 정의된 PythonOperator이다.
PythonOperator에서 op_kwargs를 통해 object를 전달한 기억이 나서 op_kwargs대신 params를 넣어보기로 했다.
class GetOperator(BaseOperator):
template_fields = ('parameters')
template_fields_renderers = {"parameters": "json"}
LAVENDER = "#E6E6FA" # 해당 operator의 컬러를 설정할 수 있다.
ui_color = LAVENDER
def __init__(
self,
endpoint: str,
params: Optional[Dict] = None,
**kwargs
) -> None:
super().__init__(**kwargs)
self.endpoint = endpoint
self.params = params or {}
def execute(self, context: Dict):
logging.info(f'endpoint - {self.endpoint}')
try:
logging.info(f'params - {self.params}')
r = requests.get(
url=self.endpoint,
params=self.params,
)
return {
"endpoint": self.endpoint,
"status_code": r.status_code,
"data": r.json()
}
except Exception as e:
logging.warning(f"failed to request - {e}")
return {
"endpoint": self.endpoint,
"status_code": 501
}
최종적으로 위와 같이 GetOperator를 생성하니 내가 원하는 대로 request의 parameter를 받아서 정상적으로 실행이 되었다.
위 방식으로 PostOperator도 생성했다. PostOperator는 params, data 모두 받아올 수 있도록 변경한 것 뿐이다.
(ex. requests.post(endpoint, params=params, data=data)
기분이 너무 좋았다...이거 만들려고 진짜 하루 통째로 투자했다..
하지만 이렇게 좋아할 수만은 없었다..역시 여기서 끝나면 섭섭하다
이제 request후 return 된 데이터가 GetOperator를 실행한 task의 xcom에 저장되어 있는데 다음 task에서 저걸 어떻게 가져갈까 하는 것이었다.
여기서부터 또 삽질이 시작되었다..
이건 또 다음에 정리하도록 하겠다..⭐️
'Airflow' 카테고리의 다른 글
[Airflow] Branch를 이용해 상황에 맞는 task 실행(BranchPythonOperator) | LIM (0) | 2022.07.19 |
---|---|
[Airflow] Airflow Connection with Google Secret Manager | LIM (0) | 2022.06.16 |
Mac Airflow local 설치 (0) | 2022.05.14 |
[Airflow] Datetime (feat. 자동실행 방지) (0) | 2022.05.14 |
[Airflow] Airflow 기본 개념 (2) | 2021.08.14 |
댓글