본문 바로가기
Airflow

[Airflow] Sensor - ExternalTaskSensor | LIM

by forestlim 2023. 5. 12.
728x90
반응형

Intro

Airflow의 DAG안에 Task들 간의 의존성을 통해 Task의 실행 여부 및 순서를 결정할 수 있듯이 DAG 간에도 가능하다.

예로, DAG A의 마지막 TaskC의 실행이 성공으로 끝나면 DAG B의 Sensor가 이를 인지하고 Task D가 시작되는 이러한 흐름이다.

 

 

ExternalTaskSensor의 구성요소

✔️ task_id - 현재 dag( 위의 예시에서 Dag B )의 task 이름

✔️ external_dag_id - sensing 하는 dag( 위의 예시에서 Dag A )의 이름

✔️ external_task_id - sensing 하는 dag의 최종적인 task 이름( 위의 예시에서 Dag A - Task C )

✔️ execution_date_fn - sensing 대상의 execution_date( 스케줄러에 Dag 가 올라가는 시간 )

✔️ allowed_states - 다음 Dag가 실행될 조건

✔️ failed_states - 다음 Dag가 실행되지 않는 조건

 

위 구성요소 중 execution_date_fn 파라미터만 주의하면 쉽게 사용할 수 있다. 

 

Airflow 내에서는 Scheduler 에 올라가는 시간과 실행되는 시간이 다르다.


처음 Airflow 를 사용한다면 이 Scheduler와 실행되는 시간 사이에 혼돈이 올 수 있다. Airflow Datetime에 대해서 정리해 둔 글이다.

https://amazelimi.tistory.com/entry/Airflow-datetime-feat-%EC%9E%90%EB%8F%99%EC%8B%A4%ED%96%89-%EB%B0%A9%EC%A7%80

 

[Airflow] Datetime (feat. 자동실행 방지)

📌 Airflow 실행날짜 실행 날짜는 batch job 의 특성을 가진 Airflow에서 매우 중요하다. Airflow에서 start_date 는 실행 시작 날짜가 아니라 스케줄이 시작되는 날짜라는 것을 기억해야한다. 예를 들어서

amazelimi.tistory.com

 

 

예시를 통해 살펴보기

A Dag 의 경우 오전 9시 25분, 오후 4시 25분 이렇게 하루에 2차례 실행된다.

오전 9시 25분에 실행되는 dag run의 경우 전날 오후 4시 25분에 2차가 실행되면서 다음날 1차 Dag가 Scheduler Queue에 올라가게 된다.

 

아래 그림에서 RunId의 경우 UTC로 되어 있어서 5월 9일 9시 25분이라고 생각하면 된다. Local:KST 시간을 보면 16:25분이다.

즉, 16시 25분에 실행되는 dag run의 경우 scheduler queue 에 올라가는 시간은 오전 9시 25분인 것이다.

 

 

🧐 그렇다면 ExternalTaskSensor 의 경우 execution_date_fn을 어떻게 맞춰야 할까?

B Dag 의 경우 오후 4시 25분 하루에 한 번만 실행된다. 즉, A Dag의 오후 4시 25분 Dag run을 poking 하고 있다가 끝난 후 실행된다.

 

A Dag. 오후 4시 25분에 실행되는 A Dag의 경우 오전 9시 25분에 scheduler 에 올라가게 됨

B Dag.  오후 4시 25분에 하루에 한 번 실행되는 B Dag의 경우 전날 오후 4시 25분에 scheduler 에 올라가게 됨

 

따라서 A Dag의 execution_date 와 B Dag의 execution_date의 차이를 구하면 된다.

즉, B Dag가 전날 오후 4시 25분이 execution_date 이고, A Dag가 다음날 오전 9시 25분이므로 아래와 같이 설정해야 한다.

 

execution_date_fn = lambda x: x + timedelta(hours=17)

 

 

📌 가장 중요한 schedule_interval

A Dag를 B Dag 가 Poking 하다가 A Dag 의 특정 Task가 정의해 둔 Status 가 되었을 때 B Dag 가 실행되는 것이기 때문에 schedule_interval 같은 경우 다음처럼 정의되어야 한다. 

 

A Dag

schedule_interval=25 09,16 * * *

 

B Dag

schedule_interval=25 16 * * *

 


ExternalTaskSensor 의 경우 1분에 한 번씩 이전 DAG의 Task 상태를 Poking 한다. 만약 이전 Task 가 너무 오래 걸리거나 에러가 발생해 안 끝나는 경우 Poking 하고 있는 Sensor 가 계속해서 Poking 하면서 리소스를 점유하고 있기 때문에 주의해서 사용하는 것이 좋다. 

728x90
반응형

댓글