Intro
Airflow의 DAG안에 Task들 간의 의존성을 통해 Task의 실행 여부 및 순서를 결정할 수 있듯이 DAG 간에도 가능하다.
예로, DAG A의 마지막 TaskC의 실행이 성공으로 끝나면 DAG B의 Sensor가 이를 인지하고 Task D가 시작되는 이러한 흐름이다.
ExternalTaskSensor의 구성요소
✔️ task_id - 현재 dag( 위의 예시에서 Dag B )의 task 이름
✔️ external_dag_id - sensing 하는 dag( 위의 예시에서 Dag A )의 이름
✔️ external_task_id - sensing 하는 dag의 최종적인 task 이름( 위의 예시에서 Dag A - Task C )
✔️ execution_date_fn - sensing 대상의 execution_date( 스케줄러에 Dag 가 올라가는 시간 )
✔️ allowed_states - 다음 Dag가 실행될 조건
✔️ failed_states - 다음 Dag가 실행되지 않는 조건
위 구성요소 중 execution_date_fn 파라미터만 주의하면 쉽게 사용할 수 있다.
Airflow 내에서는 Scheduler 에 올라가는 시간과 실행되는 시간이 다르다.
처음 Airflow 를 사용한다면 이 Scheduler와 실행되는 시간 사이에 혼돈이 올 수 있다. Airflow Datetime에 대해서 정리해 둔 글이다.
예시를 통해 살펴보기
A Dag 의 경우 오전 9시 25분, 오후 4시 25분 이렇게 하루에 2차례 실행된다.
오전 9시 25분에 실행되는 dag run의 경우 전날 오후 4시 25분에 2차가 실행되면서 다음날 1차 Dag가 Scheduler Queue에 올라가게 된다.
아래 그림에서 RunId의 경우 UTC로 되어 있어서 5월 9일 9시 25분이라고 생각하면 된다. Local:KST 시간을 보면 16:25분이다.
즉, 16시 25분에 실행되는 dag run의 경우 scheduler queue 에 올라가는 시간은 오전 9시 25분인 것이다.
🧐 그렇다면 ExternalTaskSensor 의 경우 execution_date_fn을 어떻게 맞춰야 할까?
B Dag 의 경우 오후 4시 25분 하루에 한 번만 실행된다. 즉, A Dag의 오후 4시 25분 Dag run을 poking 하고 있다가 끝난 후 실행된다.
A Dag. 오후 4시 25분에 실행되는 A Dag의 경우 오전 9시 25분에 scheduler 에 올라가게 됨
B Dag. 오후 4시 25분에 하루에 한 번 실행되는 B Dag의 경우 전날 오후 4시 25분에 scheduler 에 올라가게 됨
따라서 A Dag의 execution_date 와 B Dag의 execution_date의 차이를 구하면 된다.
즉, B Dag가 전날 오후 4시 25분이 execution_date 이고, A Dag가 다음날 오전 9시 25분이므로 아래와 같이 설정해야 한다.
execution_date_fn = lambda x: x + timedelta(hours=17)
📌 가장 중요한 schedule_interval
A Dag를 B Dag 가 Poking 하다가 A Dag 의 특정 Task가 정의해 둔 Status 가 되었을 때 B Dag 가 실행되는 것이기 때문에 schedule_interval 같은 경우 다음처럼 정의되어야 한다.
A Dag
schedule_interval=25 09,16 * * *
B Dag
schedule_interval=25 16 * * *
ExternalTaskSensor 의 경우 1분에 한 번씩 이전 DAG의 Task 상태를 Poking 한다. 만약 이전 Task 가 너무 오래 걸리거나 에러가 발생해 안 끝나는 경우 Poking 하고 있는 Sensor 가 계속해서 Poking 하면서 리소스를 점유하고 있기 때문에 주의해서 사용하는 것이 좋다.
'Airflow' 카테고리의 다른 글
[Airflow] LocalExecutor 사용 시 PostgreSQL 설치 (Mac M1) | LIM (0) | 2023.06.04 |
---|---|
[Airflow] Dynamic Task Mapping 사용해보기(w/UseCase) | LIM (0) | 2023.06.03 |
[Airflow] DAG Creation Boilerplate CLI 도구 생성 | LIM (0) | 2023.01.14 |
[Airflow] Connection 정보 Vault 이관 작업(feat.Plugin) | LIM (0) | 2023.01.13 |
[Airflow] Custom UI for Airflow | LIM (0) | 2022.12.29 |
댓글