728x90
반응형
Airflow 내에서 이전 작업의 결과에 따라서 다음 작업이 달라야 할 때 Branch 를 통해 나눌 수 있다.
예를 들어서
조회하고자 하는 connection 이 있는 경우 -> 기존 값을 업데이트
connection 이 없는 경우 -> 새롭게 생성
나같은 경우 이렇게 사용하였다. 보통 데이터를 처리할 때 데이터가 있으면 전처리 수행, 없으면 가져오기 이렇게 많이들 사용하는 것 같다.
하지만 현재는 팀내에서 connection을 관리하는게 주 목적이어서 connection 관리를 위한 목적으로 BranchPythonOperator를 사용했다.
예시코드를 보자. 다음코드는
from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import BranchPythonOperator
from datetime import timedelta, datetime
dag_id = 'branch_operator_test'
default_args = {
'owner': 'LIM',
'retries': 0,
'retry_delay': timedelta(minutes=0),
}
def is_even(number):
next_task = 'print_number'
if int(number) % 2 != 0:
next_task = 'find_closet_even_number'
return next_task
@task
def print_number(**context):
input_number = context['dag_run'].conf['number']
print(input_number)
@task
def find_closet_even_number(**context):
input_number = context['dag_run'].conf['number']
print(f'small_even_number:{input_number-1}, large_even_number: {input_number+1}')
check_number = BranchPythonOperator(
task_id='is_even',
python_callable=is_even,
op_kwargs={"number": "{{ dag_run.conf['number']}}"}
)
with DAG(
dag_id=dag_id,
default_args=default_args,
schedule_interval=None,
start_date=datetime(2022, 1, 1),
catchup=False,
tags=['test']
) as dag:
check_number >> print_number()
check_number >> find_closet_even_number()
jinja_template을 이용해서 number를 입력받고 그 숫자가 짝수인지 홀수인지에 따라서 branch 로 나눠지도록 작업했다.
예를 들어서 숫자 9를 입력했을 때 짝수가 아니기 때문에 가장 가까운 짝수를 찾는 task 로 이동하고 print_number task는 skip 된다.
728x90
반응형
'Airflow' 카테고리의 다른 글
Airflow 특정 Task 재실행 | LIM (0) | 2022.12.07 |
---|---|
[Airflow] SFTPToGCSOperator 개념 및 사용방법 | LIM (0) | 2022.11.12 |
[Airflow] Airflow Connection with Google Secret Manager | LIM (0) | 2022.06.16 |
[Airflow] CustomOperator 생성 | LIM (0) | 2022.06.12 |
Mac Airflow local 설치 (0) | 2022.05.14 |
댓글