Airflow 관리 관련해서 쏘카 기술 블로그 글을 읽다가 반복되는 DAG 코드를 쉽게 작성하기 위해 CLI 도구를 만들었다는 것을 보고 우리 팀에도 적용되면 좋을 것 같아 구현해 보았다.
이 도구는 이럴 때 유용할 것 같다.
- DAG 작성자가 처음에 DAG 작성을 어떻게 할지 모를 때
- 팀 내 DAG Template을 공통되게 유지하고 싶을 때
우리 팀은 현재 이럴 때 이미 작성된 코드를 복붙 해서 쓰고 있었는데, 뼈대가 되는 코드를 어떤 걸 기준으로 삼을 건지가 명확하지 않았고 처음부터 다들 제각각 작성하고 있어서 코드가 통일되지 않았었다.
참고했던 쏘카 기술 블로그이다.
https://tech.socarcorp.kr/data/2021/06/01/data-engineering-with-airflow.html
먼저 CLI Arguments를 사용하기 위해서 Click 모듈과 PyInquirer 모듈을 사용하였다.
- Click https://click.palletsprojects.com/en/8.1.x/
- PyInquirer https://kimmj.github.io/python/python-beautiful-cli/
📚Python Click(Command Line Interface Creation Kit)
Command 환경에서 Python 스크립트에 인자를 넣을 수 있도록 도와주는 라이브러리이다.
먼저 click 모듈을 설치한 후 라이브러리를 불러온 뒤 @click. command() 데코레이터를 써주면 된다. 여기까지는 기존 스크립트와 동일하다.
click 은 argument와 option을 인자로 받을 수 있는데 argument의 경우 필수로 넣어주는 인자이고, option의 경우 말 그대로 option이다. 넣어도 되고 안 넣어도 되는 옵션을 구현할 때 쓰인다. 자세한 건 이 블로그를 참고하면 좋을 것 같다.
📚PyInquirer(Interactive Command Line User Interface)
PyInquirere는 command line interface를 이쁘게 제공해 주는 툴이다.
- 질문 리스트를 정의하고 이를 prompt에 전달한다
- prompt는 답변을 dictionary 형태로 리턴한다.
코드를 실행하면 다음과 같이 예쁜 CLI 툴이 완성된다.
👩💻 두 가지 모듈을 이용해서 DAG BoilerPlate 코드 작성
🚗 Shell 파일과 Makefile 을 이용하여 DAG 코드 자동완성
shell 파일
#!/bin/bash
dag_id="$1"
owner="$2";
retries="$3";
retry_delay="$4";
description="$5";
schedule_interval="$6";
start_date="$7";
catchup="$8";
tags="$9";
dir="$(pwd)"
cat > ${dir}/${dag_id}.py << EOF
import os
from datetime import timedelta, datetime
import pendulum
from airflow import DAG
seoul_time = pendulum.timezone('Asia/Seoul')
dag_name = os.path.basename(__file__).split('.')[0]
default_args = {
'owner': '${owner}',
'retries': ${retries},
'retry_delay': timedelta(minutes=${retry_delay})
}
with DAG(
dag_id=dag_name,
default_args=default_args,
description='${description}',
schedule_interval='${schedule_interval}',
start_date=datetime(${start_date}, tzinfo=seoul_time),
catchup=${catchup},
tags=${tags}
) as dag:
pass
EOF
Makefile
.PHONY: create
create:
@python dag_code_generator.py
코드 실행
>> make create
다음과 같이 cli 에서 실행하면 PyInquirer 모듈을 통해서 필요한 정보를 입력받은 후 그 정보를 토대로 DAG 를 생성할 수 있다!
'Airflow' 카테고리의 다른 글
[Airflow] Dynamic Task Mapping 사용해보기(w/UseCase) | LIM (0) | 2023.06.03 |
---|---|
[Airflow] Sensor - ExternalTaskSensor | LIM (0) | 2023.05.12 |
[Airflow] Connection 정보 Vault 이관 작업(feat.Plugin) | LIM (0) | 2023.01.13 |
[Airflow] Custom UI for Airflow | LIM (0) | 2022.12.29 |
[Operator] SimpleHttpOperator | LIM (0) | 2022.12.21 |
댓글