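Dynamic Task Mapping lets a DAG author create tasks at run time based on the current data, without knowing in advance how many tasks will be needed.
It resembles defining tasks in a for loop, with one difference: the for loop runs in the DAG file, outside any task, so the values it iterates over must already be defined there, whereas with Dynamic Task Mapping the scheduler performs the expansion at run time, so nothing has to be defined up front.
A downstream task can then work on the collected output of the mapped tasks, a pattern commonly known as MapReduce.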
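To make the map/reduce shape concrete, here is a minimal plain-Python sketch with no Airflow involved. The function names add_one and sum_it are borrowed from the DAG shown later in this post; the list comprehension only imitates the fan-out, and in Airflow each call in the map step would be a separate mapped task instance.

```python
# Plain-Python imitation of the map/reduce shape; this is not Airflow code.

def add_one(x: int) -> int:
    """Map step: runs once per input element."""
    return x + 1


def sum_it(values) -> int:
    """Reduce step: receives all mapped results at once."""
    return sum(values)


inputs = [1, 2, 3]                     # in a real DAG this may only be known at run time
mapped = [add_one(x) for x in inputs]  # one "task" per element
total = sum_it(mapped)
print(f"Total was {total}")
```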
The DAG built the old way, with a for loop
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

numbers = [1, 2, 3]

with DAG(
    dag_id="simple_mapping",
    start_date=datetime(2022, 3, 4),
    schedule_interval='@once'
) as dag:

    @task
    def sum_it(**context):
        total = 0
        for num in numbers:
            # Pull the return value of each upstream task_<num> from XCom
            total += context['ti'].xcom_pull(task_ids=f'task_{num}')
        print(f'Total was {total}')

    value = sum_it()

    for num in numbers:
        # One statically defined task per element of numbers
        @task(task_id=f'task_{num}')
        def add_one(x: int):
            return x + 1

        add = add_one(num)
        add >> value
✅ Because the numbers list is defined globally (at module level), it stays loaded in memory.
✅ To hand the results of task_1, task_2, and task_3 to the sum_it task, xcom_pull also has to be called in a for loop.
(If you know a better way, please let me know!)
The same DAG built with Dynamic Task Mapping
https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(
    dag_id="simple_mapping",
    start_date=datetime(2022, 3, 4),
    schedule_interval='@once'
) as dag:

    @task
    def add_one(x: int):
        return x + 1

    @task
    def sum_it(values):
        total = sum(values)
        print(f"Total was {total}")

    # expand() creates one add_one task instance per list element at run time
    added_values = add_one.expand(x=[1, 2, 3])
    sum_it(added_values)
Dynamic Task Mapping Use Case
1. Dict In List
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(
    dag_id="dict_in_list",
    start_date=datetime(2022, 3, 4),
    schedule_interval='@once'
) as dag:

    @task
    def get_data():
        student_list = [{'id': 1, 'name': 'Kim', 'height': 150, 'class': 3},
                        {'id': 2, 'name': 'Lim', 'height': 160, 'class': 5},
                        {'id': 3, 'name': 'June', 'height': 180, 'class': 7}]
        return student_list

    @task
    def add_teacher_attribute(**kwargs):
        student = kwargs.get('student')  # e.g. {'id': 1, 'name': 'Kim', 'height': 150, 'class': 3}
        if student.get('class') == 3:
            student.update({'teacher': 'Gang'})
        elif student.get('class') == 5:
            student.update({'teacher': 'Lee'})
        else:
            student.update({'teacher': 'Go'})
        print(student)

    students = get_data()
    add_teacher_attribute.expand(student=students)
✅ When passing a List[Dict] with expand, you have to name the keyword argument that each value is mapped to.
add_teacher_attribute.expand(student=students)
✅ The downstream task add_teacher_attribute then reads the student key from its keyword arguments, and the value is one dictionary from the list.
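The fan-out performed by expand(student=students) can be imitated in plain Python. This is only a sketch of the calling convention: fan_out is a hypothetical helper written for this post, not an Airflow function, and it handles exactly one mapped argument.

```python
from typing import Any, Callable, List


def fan_out(func: Callable[..., Any], **mapped: List[Any]) -> List[Any]:
    """Hypothetical helper: call func once per element of the single mapped argument."""
    name, values = next(iter(mapped.items()))
    # Each call receives one element under the keyword that was named in the call
    return [func(**{name: value}) for value in values]


def add_teacher_attribute(**kwargs):
    student = dict(kwargs["student"])  # copy so the input list is not mutated
    teachers = {3: "Gang", 5: "Lee"}
    student["teacher"] = teachers.get(student["class"], "Go")
    return student


students = [
    {"id": 1, "name": "Kim", "height": 150, "class": 3},
    {"id": 2, "name": "Lim", "height": 160, "class": 5},
    {"id": 3, "name": "June", "height": 180, "class": 7},
]

results = fan_out(add_teacher_attribute, student=students)
for row in results:
    print(row)
```

The keyword chosen on the caller side (student) is the only link between the list and the function, which is why the task has to look it up under that same name.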
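You can also pass the values with expand_kwargs (available from Airflow 2.4), which unpacks each dictionary in the list into keyword arguments.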
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(
    dag_id="dict_in_list",
    start_date=datetime(2022, 3, 4),
    schedule_interval='@once'
) as dag:

    @task
    def get_data():
        student_list = [{'id': 1, 'name': 'Kim', 'height': 150, 'class': 3},
                        {'id': 2, 'name': 'Lim', 'height': 160, 'class': 5},
                        {'id': 3, 'name': 'June', 'height': 180, 'class': 7}]
        return student_list

    @task
    def add_teacher_attribute(**student):
        # Each dict in the list arrives unpacked as keyword arguments
        if student.get('class') == 3:
            student.update({'teacher': 'Gang'})
        elif student.get('class') == 5:
            student.update({'teacher': 'Lee'})
        else:
            student.update({'teacher': 'Go'})
        print(student)

    students = get_data()
    add_teacher_attribute.expand_kwargs(students)
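However, when the values are passed this way, the arguments Airflow hands to every task by default (the context) arrive mixed in with the student fields. This happens because the function accepts **student, and Airflow passes the whole context to a callable that accepts arbitrary keyword arguments.
This is what gets printed: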
{'conf': <airflow.configuration.AirflowConfigParser object at 0x104220880>, 'dag': <DAG: dict_in_list>, 'dag_run': <DagRun dict_in_list @ 2023-01-01 00:00:00+00:00: backfill__2023-01-01T00:00:00+00:00, state:running, queued_at: None. externally triggered: False>, 'data_interval_end': DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC')), 'data_interval_start': DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC')), 'ds': '2023-01-01', 'ds_nodash': '20230101', 'execution_date': <Proxy at 0x107f6ddc0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x107c20f70>, 'execution_date', DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC')))>, 'inlets': [], 'logical_date': DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC')), 'macros': <module 'airflow.macros' from '/Users/ruby/.local/share/virtualenvs/dynamic_task_mapping-Y7EibYsy/lib/python3.8/site-packages/airflow/macros/__init__.py'>, 'next_ds': <Proxy at 0x107eccb80 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x107c20f70>, 'next_ds', None)>, 'next_ds_nodash': <Proxy at 0x107f59d00 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x107c20f70>, 'next_ds_nodash', None)>, 'next_execution_date': <Proxy at 0x107f53c40 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x107c20f70>, 'next_execution_date', None)>, 'outlets': [], 'params': {}, 'prev_data_interval_start_success': None, 'prev_data_interval_end_success': None, 'prev_ds': <Proxy at 0x107f71200 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x107c20f70>, 'prev_ds', None)>, 'prev_ds_nodash': <Proxy at 0x107f71380 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x107c20f70>, 'prev_ds_nodash', None)>, 'prev_execution_date': <Proxy at 
0x107f6a240 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x107c20f70>, 'prev_execution_date', None)>, 'prev_execution_date_success': <Proxy at 0x107f6a100 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x107c20f70>, 'prev_execution_date_success', None)>, 'prev_start_date_success': None, 'run_id': 'backfill__2023-01-01T00:00:00+00:00', 'task': <Task(_PythonDecoratedOperator): add_teacher_attribute>, 'task_instance': <TaskInstance: dict_in_list.add_teacher_attribute backfill__2023-01-01T00:00:00+00:00 map_index=2 [running]>, 'task_instance_key_str': 'dict_in_list__add_teacher_attribute__20230101', 'test_mode': False, 'ti': <TaskInstance: dict_in_list.add_teacher_attribute backfill__2023-01-01T00:00:00+00:00 map_index=2 [running]>, 'tomorrow_ds': <Proxy at 0x107f6aac0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x107c20f70>, 'tomorrow_ds', '2023-01-02')>, 'tomorrow_ds_nodash': <Proxy at 0x107f6a380 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x107c20f70>, 'tomorrow_ds_nodash', '20230102')>, 'triggering_dataset_events': <Proxy at 0x107f59ec0 with factory <function TaskInstance.get_template_context.<locals>.get_triggering_events at 0x107f74160>>, 'ts': '2023-01-01T00:00:00+00:00', 'ts_nodash': '20230101T000000', 'ts_nodash_with_tz': '20230101T000000+0000', 'var': {'json': None, 'value': None}, 'conn': None, 'yesterday_ds': <Proxy at 0x107f6a480 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x107c20f70>, 'yesterday_ds', '2022-12-31')>, 'yesterday_ds_nodash': <Proxy at 0x107f6a9c0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x107c20f70>, 'yesterday_ds_nodash', '20221231')>, 'id': 3, 'name': 'June', 'height': 180, 
'class': 7, 'templates_dict': None, 'teacher': 'Go'}
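The mixing shown above can be reproduced without Airflow. The sketch below is a simplified imitation, under the assumption that the context is passed in full when the callable accepts **kwargs and is otherwise filtered by parameter name. call_like_airflow, greedy, filtered, and STUDENT_KEYS are hypothetical names made up for this post, and the context is cut down to two entries.

```python
import inspect
from typing import Any, Callable, Dict


def call_like_airflow(func: Callable[..., Any],
                      op_kwargs: Dict[str, Any],
                      context: Dict[str, Any]) -> Any:
    """Hypothetical, simplified imitation of how context reaches a Python callable."""
    params = inspect.signature(func).parameters
    accepts_var_kwargs = any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()
    )
    if accepts_var_kwargs:
        extra = context  # **kwargs present: the whole context is passed
    else:
        extra = {k: v for k, v in context.items() if k in params}
    return func(**{**extra, **op_kwargs})


# Two entries standing in for the full context
context = {"ds": "2023-01-01", "run_id": "backfill__2023-01-01T00:00:00+00:00"}
student = {"id": 3, "name": "June", "height": 180, "class": 7}

STUDENT_KEYS = ("id", "name", "height", "class")


def greedy(**student):
    return student  # context keys and student keys end up in one dict


def filtered(**kwargs):
    # Keep only the keys that belong to the student record
    return {k: kwargs[k] for k in STUDENT_KEYS if k in kwargs}


greedy_result = call_like_airflow(greedy, student, context)
filtered_result = call_like_airflow(filtered, student, context)
print(sorted(greedy_result))
print(filtered_result)
```

Filtering by a known key list is one possible workaround. Naming each field as an explicit parameter is not an option for this record, because class is a reserved word in Python.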
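2. Using an Operator instead of the task decorator
The example uses PythonOperator. A classic operator cannot call expand directly: you first call partial with the arguments that stay the same for every task instance, and then call expand on the result to get Dynamic Task Mapping.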
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="dict_in_list",
    start_date=datetime(2022, 3, 4),
    schedule_interval='@once'
) as dag:

    def get_data():
        student_list = [{'id': 1, 'name': 'Kim', 'height': 150, 'class': 3},
                        {'id': 2, 'name': 'Lim', 'height': 160, 'class': 5},
                        {'id': 3, 'name': 'June', 'height': 180, 'class': 7}]
        return student_list

    def add_teacher_attribute(**student):
        if student.get('class') == 3:
            student.update({'teacher': 'Gang'})
        elif student.get('class') == 5:
            student.update({'teacher': 'Lee'})
        else:
            student.update({'teacher': 'Go'})
        print(student)

    # Separate variable names keep the operators from shadowing the callables
    get_data_task = PythonOperator(
        task_id='get_data',
        python_callable=get_data
    )

    # partial() fixes the constant arguments; expand() maps over op_kwargs
    add_teacher_attribute_task = PythonOperator.partial(
        task_id='add_teacher_attribute',
        python_callable=add_teacher_attribute
    ).expand(op_kwargs=get_data_task.output)
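The split between partial and expand is close in spirit to functools.partial from the standard library: fix the shared arguments once, then supply the varying one per run. The sketch below is only an analogy in plain Python; run_operator and describe are hypothetical stand-ins, not Airflow APIs.

```python
from functools import partial


def run_operator(task_id, python_callable, op_kwargs):
    """Hypothetical stand-in for a single operator run."""
    return task_id, python_callable(**op_kwargs)


def describe(**student):
    return f"{student['name']} is in class {student['class']}"


# "partial" step: arguments shared by every mapped instance
template = partial(
    run_operator,
    task_id="add_teacher_attribute",
    python_callable=describe,
)

students = [
    {"id": 1, "name": "Kim", "height": 150, "class": 3},
    {"id": 2, "name": "Lim", "height": 160, "class": 5},
    {"id": 3, "name": "June", "height": 180, "class": 7},
]

# "expand" step: one run per element, varying only op_kwargs
runs = [template(op_kwargs=student) for student in students]
for run in runs:
    print(run)
```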
3. Dynamic Task Mapping To Dynamic Task Mapping
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(
    dag_id="repeated_mapping",
    start_date=datetime(2022, 3, 4),
    schedule_interval='@once'
) as dag:

    @task
    def add_one(x: int):
        return x + 1

    @task
    def add_two(x: int):
        return x + 2

    # The output of one mapped task can be mapped over again
    first = add_one.expand(x=[1, 2, 3])
    second = add_two.expand(x=first)
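As a quick sanity check of what the chained mapping computes, here is the same arithmetic in plain Python. It only imitates the data flow, with list positions standing in for the index of each mapped task instance; no Airflow is involved.

```python
def add_one(x: int) -> int:
    return x + 1


def add_two(x: int) -> int:
    return x + 2


first = [add_one(x) for x in [1, 2, 3]]  # results of the first mapped task
second = [add_two(x) for x in first]     # one second-stage call per first-stage result

for index, (a, b) in enumerate(zip(first, second)):
    print(f"index={index}: add_one -> {a}, add_two -> {b}")
```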
The other examples are all covered well in the official documentation, so it is worth following along with it as a reference!
https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html