본문 바로가기
Airflow

[Airflow] Dynamic Task Mapping 사용해보기(w/UseCase) | LIM

by forestlim 2023. 6. 3.
728x90
반응형

Dynamic Task Mapping 은 DAG 작성자가 필요한 작업 수를 사전에 알 필요 없이 현재 데이터에 기반하여 실행 시점에 Task 를 생성하도록 한다. 

이는 기존에 for loop 을 이용하여 Task 를 정의하는 것과 유사하지만 다른 점이 있다면 기존엔 for loop 을 Task 바깥에 정의하여 돌리는 방식이었다면 Dynamic Task Mapping 의 경우에는 스케줄러가 실행하기 때문에 미리 정의해둘 필요가 없다. 

매핑된 작업의 출력을 기반으로 작업을 수행하고 이는 일반적으로 MapReduce 방식으로 알려져 있다. 

 

 

기존 For Loop 을 이용하여 DAG 를 생성한 결과

from datetime import datetime

from airflow import DAG
from airflow.decorators import task

numbers = [1, 2, 3]

with DAG(
	dag_id="simple_mapping",
    start_date=datetime(2022, 3, 4),
    schedule_interval='@once'
    ) as dag:
    @task
    def sum_it(**context):
        total = 0
        for num in numbers:
            val += context['ti'].xcom_pull(task_ids=f'task_{num}')
        print(f'Total was {total}')


    value = sum_it()

    for num in numbers:
        @task(task_id=f'task_{num}')
        def add_one(x: int):
            return x + 1

        add = add_one(num)
        add >> value

✅ numbers 배열을 global 하게 정의해줌으로써 계속해서 memory 에 올라가 있게 됨

✅ task_1, task_2, task_3를 모아서 sum_it 태스크로 전달할 때도 xcom_pull 을 for loop 으로 돌려서 받아와야 함
( 더 나은 방법이 있다면 알려주세요! )

 

Dynamic Task Mapping 을 사용하여 DAG를 생성한 결과

https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html

from datetime import datetime

from airflow import DAG
from airflow.decorators import task


with DAG(
	dag_id="simple_mapping",
    start_date=datetime(2022, 3, 4),
    schedule_interval='@once'
    ) as dag:

    @task
    def add_one(x: int):
        return x + 1

    @task
    def sum_it(values):
        total = sum(values)
        print(f"Total was {total}")

    added_values = add_one.expand(x=[1, 2, 3])
    sum_it(added_values)

 

 

 

Dynamic Task Mapping Use Case

1. Dict In List

from datetime import datetime

from airflow import DAG
from airflow.decorators import task


with DAG(
	dag_id="dict_in_list",
    start_date=datetime(2022, 3, 4),
    schedule_interval='@once'
    ) as dag:

    @task
    def get_data():
        student_list = [{'id': 1, 'name': 'Kim', 'height': 150, 'class': 3},
                        {'id': 2, 'name': 'Lim', 'height': 160, 'class': 5},
                        {'id': 3, 'name': 'June', 'height': 180, 'class': 7}]
        return student_list

    @task
    def add_teacher_attribute(**kwargs):
        student = kwargs.get('student') # {'id': 1, 'name': 'Kim', 'height': 150, 'class': 3}
        if student.get('class') == 3:
            student.update({'teacher': 'Gang'})
        elif student.get('class') == 5:
            student.update({'teacher': 'Lee'})
        else:
            student.update({'teacher': 'Go'})
        print(student)

    students = get_data()
    add_teacher_attribute.expand(student=students)

 

✅ expand 를 사용하여 List[Dict] 를 전달할 때 어떤 키값에 Value 를 매핑해줄 지 적어야 한다. 

add_teacher_attribute.expand(student=students)

 

✅ 그리고 다음 task 인 add_teacher_attribute 에서 student 키 값을 가져오면 해당 정보가 value 에 들어있다. 

 

 

expand_kwargs 를 사용해서 전달할 수도 있다

 

from datetime import datetime

from airflow import DAG
from airflow.decorators import task

numbers = [1, 2, 3]

with DAG(
	dag_id="dict_in_list",
    start_date=datetime(2022, 3, 4),
    schedule_interval='@once'
    ) as dag:

    @task
    def get_data():
        student_list = [{'id': 1, 'name': 'Kim', 'height': 150, 'class': 3},
                        {'id': 2, 'name': 'Lim', 'height': 160, 'class': 5},
                        {'id': 3, 'name': 'June', 'height': 180, 'class': 7}]
        return student_list

    @task
    def add_teacher_attribute(**student):
        if student.get('class') == 3:
            student.update({'teacher': 'Gang'})
        elif student.get('class') == 5:
            student.update({'teacher': 'Lee'})
        else:
            student.update({'teacher': 'Go'})
        print(student)

    students = get_data()
    add_teacher_attribute.expand_kwargs(students)

 

하지만 이 expand_kwargs 를 사용해서 전달하면 airflow 에서 기본으로 task에서 전달해주는 인자들도 같이 전달되게 된다. 

밑에와 같은 애들..

더보기

{'conf': <airflow.configuration.AirflowConfigParser object at 0x104220880>, 'dag': <DAG: dict_in_list>, 'dag_run': <DagRun dict_in_list @ 2023-01-01 00:00:00+00:00: backfill__2023-01-01T00:00:00+00:00, state:running, queued_at: None. externally triggered: False>, 'data_interval_end': DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC')), 'data_interval_start': DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC')), 'ds': '2023-01-01', 'ds_nodash': '20230101', 'execution_date': <Proxy at 0x107f6ddc0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x107c20f70>, 'execution_date', DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC')))>, 'inlets': [], 'logical_date': DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC')), 'macros': <module 'airflow.macros' from '/Users/ruby/.local/share/virtualenvs/dynamic_task_mapping-Y7EibYsy/lib/python3.8/site-packages/airflow/macros/__init__.py'>, 'next_ds': <Proxy at 0x107eccb80 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x107c20f70>, 'next_ds', None)>, 'next_ds_nodash': <Proxy at 0x107f59d00 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x107c20f70>, 'next_ds_nodash', None)>, 'next_execution_date': <Proxy at 0x107f53c40 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x107c20f70>, 'next_execution_date', None)>, 'outlets': [], 'params': {}, 'prev_data_interval_start_success': None, 'prev_data_interval_end_success': None, 'prev_ds': <Proxy at 0x107f71200 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x107c20f70>, 'prev_ds', None)>, 'prev_ds_nodash': <Proxy at 0x107f71380 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x107c20f70>, 'prev_ds_nodash', None)>, 'prev_execution_date': <Proxy at 0x107f6a240 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x107c20f70>, 'prev_execution_date', None)>, 'prev_execution_date_success': <Proxy at 0x107f6a100 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x107c20f70>, 'prev_execution_date_success', None)>, 'prev_start_date_success': None, 'run_id': 'backfill__2023-01-01T00:00:00+00:00', 'task': <Task(_PythonDecoratedOperator): add_teacher_attribute>, 'task_instance': <TaskInstance: dict_in_list.add_teacher_attribute backfill__2023-01-01T00:00:00+00:00 map_index=2 [running]>, 'task_instance_key_str': 'dict_in_list__add_teacher_attribute__20230101', 'test_mode': False, 'ti': <TaskInstance: dict_in_list.add_teacher_attribute backfill__2023-01-01T00:00:00+00:00 map_index=2 [running]>, 'tomorrow_ds': <Proxy at 0x107f6aac0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x107c20f70>, 'tomorrow_ds', '2023-01-02')>, 'tomorrow_ds_nodash': <Proxy at 0x107f6a380 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x107c20f70>, 'tomorrow_ds_nodash', '20230102')>, 'triggering_dataset_events': <Proxy at 0x107f59ec0 with factory <function TaskInstance.get_template_context.<locals>.get_triggering_events at 0x107f74160>>, 'ts': '2023-01-01T00:00:00+00:00', 'ts_nodash': '20230101T000000', 'ts_nodash_with_tz': '20230101T000000+0000', 'var': {'json': None, 'value': None}, 'conn': None, 'yesterday_ds': <Proxy at 0x107f6a480 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x107c20f70>, 'yesterday_ds', '2022-12-31')>, 'yesterday_ds_nodash': <Proxy at 0x107f6a9c0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x107c20f70>, 'yesterday_ds_nodash', '20221231')>, 'id': 3, 'name': 'June', 'height': 180, 'class': 7, 'templates_dict': None, 'teacher': 'Go'}

 

2. Task decorator를 사용하지 않고 Operator 를 사용하는 경우

예시는 PythonOperator 이다. Operator 에서는 바로 expand 를 호출할 수 없고 partial 함수를 통해 변환 후 expand 를 호출하여 Dynamic Task Mapping 을 가능하게하도록 한다. 

from datetime import datetime

from airflow import DAG, XComArg
from airflow.operators.python import PythonOperator


with DAG(
	dag_id="dict_in_list",
    start_date=datetime(2022, 3, 4), 
    schedule_interval='@once'
    ) as dag:

    def get_data():
        student_list = [{'id': 1, 'name': 'Kim', 'height': 150, 'class': 3},
                        {'id': 2, 'name': 'Lim', 'height': 160, 'class': 5},
                        {'id': 3, 'name': 'June', 'height': 180, 'class': 7}]
        return student_list

    def add_teacher_attribute(**student):
        if student.get('class') == 3:
            student.update({'teacher': 'Gang'})
        elif student.get('class') == 5:
            student.update({'teacher': 'Lee'})
        else:
            student.update({'teacher': 'Go'})
        print(student)


    get_data = PythonOperator(
        task_id='get_data',
        python_callable=get_data
    )

    add_teacher_attribute = PythonOperator.partial(
        task_id='add_teacher_attribute',
        python_callable=add_teacher_attribute
    ).expand(op_kwargs=get_data.output)

 

 

3. Dynamic Task Mapping To Dynamic Task Mapping

from airflow.decorators import task
from airflow import DAG

from datetime import datetime


with DAG(
	dag_id="repeated_mapping",
    start_date=datetime(2022, 3, 4),
    schedule_interval='@once'
    ) as dag:

    @task
    def add_one(x: int):
        return x + 1

    @task
    def add_two(x: int):
        return x + 2

    first = add_one.expand(x=[1, 2, 3])
    second = add_two.expand(x=first)

 

다른 예제들은 모두 공식 document 에 잘 나와있으니 같이 참고해서 해보시면 될 것 같습니다!

https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html

 

Dynamic Task Mapping — Airflow Documentation

 

airflow.apache.org

 

728x90
반응형

댓글