본문 바로가기
Airflow

[Airflow] Branch를 이용해 상황에 맞는 task 실행(BranchPythonOperator) | LIM

by forestlim 2022. 7. 19.
728x90
반응형

Airflow 내에서 이전 작업의 결과에 따라서 다음 작업이 달라야 할 때 Branch 를 통해 나눌 수 있다. 

 

예를 들어서 

조회하고자 하는 connection 이 있는 경우 -> 기존 값을 업데이트
connection 이 없는 경우 -> 새롭게 생성

나같은 경우 이렇게 사용하였다. 보통 데이터를 처리할 때 데이터가 있으면 전처리 수행, 없으면 가져오기 이렇게 많이들 사용하는 것 같다.

하지만 현재는 팀내에서 connection을 관리하는게 주 목적이어서 connection 관리를 위한 목적으로 BranchPythonOperator를 사용했다.

 

예시코드를 보자. 다음코드는 

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import BranchPythonOperator

from datetime import timedelta, datetime

dag_id = 'branch_operator_test'

default_args = {
    'owner': 'LIM',
    'retries': 0,
    'retry_delay': timedelta(minutes=0),
}


def is_even(number):
    next_task = 'print_number'
    if int(number) % 2 != 0:
        next_task = 'find_closet_even_number'
    return next_task


@task
def print_number(**context):
    input_number = context['dag_run'].conf['number']
    print(input_number)


@task
def find_closet_even_number(**context):
    input_number = context['dag_run'].conf['number']
    print(f'small_even_number:{input_number-1}, large_even_number: {input_number+1}')


check_number = BranchPythonOperator(
    task_id='is_even',
    python_callable=is_even,
    op_kwargs={"number": "{{ dag_run.conf['number']}}"}
)

with DAG(
    dag_id=dag_id,
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=['test']
) as dag:

    check_number >> print_number()
    check_number >> find_closet_even_number()

 

jinja_template을 이용해서 number를 입력받고 그 숫자가 짝수인지 홀수인지에 따라서 branch 로 나눠지도록 작업했다.

예를 들어서 숫자 9를 입력했을 때 짝수가 아니기 때문에 가장 가까운 짝수를 찾는 task 로 이동하고 print_number task는 skip 된다.

728x90
반응형

댓글