본문 바로가기
Airflow

[Airflow] DAG Creation Boilerplate CLI 도구 생성 | LIM

by forestlim 2023. 1. 14.
728x90
반응형

Airflow 관리 관련해서 쏘카 기술 블로그 글을 읽다가 반복되는 DAG 코드를 쉽게 작성하기 위해 CLI 도구를 만들었다는 것을 보고 우리 팀에도 적용되면 좋을 것 같아 구현해 보았다.

 

이 도구는 이럴 때 유용할 것 같다.

  • DAG 작성자가 처음에 DAG 작성을 어떻게 할지 모를 때
  • 팀 내 DAG Template을 공통되게 유지하고 싶을 때

우리 팀은 현재 이럴 때 이미 작성된 코드를 복붙 해서 쓰고 있었는데, 뼈대가 되는 코드를 어떤 걸 기준으로 삼을 건지가 명확하지 않았고 처음부터 다들 제각각 작성하고 있어서 코드가 통일되지 않았었다.

 

참고했던 쏘카 기술 블로그이다.

https://tech.socarcorp.kr/data/2021/06/01/data-engineering-with-airflow.html

 

쏘카 데이터 그룹 - Airflow와 함께한 데이터 환경 구축기(feat. Airflow on Kubernetes)

지난 3년간 Airflow 구축 및 운영기록

tech.socarcorp.kr


먼저 CLI Arguments를 사용하기 위해서 Click 모듈과 PyInquirer 모듈을 사용하였다. 

 

📚Python Click(Command Line Interface Creation Kit)

Command 환경에서 Python 스크립트에 인자를 넣을 수 있도록 도와주는 라이브러리이다. 

먼저 click 모듈을 설치한 후 라이브러리를 불러온 뒤 @click. command() 데코레이터를 써주면 된다. 여기까지는 기존 스크립트와 동일하다.

click 은 argument와 option을 인자로 받을 수 있는데 argument의 경우 필수로 넣어주는 인자이고, option의 경우 말 그대로 option이다. 넣어도 되고 안 넣어도 되는 옵션을 구현할 때 쓰인다. 자세한 건 이 블로그를 참고하면 좋을 것 같다. 

 

Click _ argument, option으로 인자 전달하기

Click 이란? click은 커맨드 명령 환경에서 Python 스크립트에 인자를 넣을 수 있도록 도와주는 라이브러리이다. 예를 들어 하나의 스크립트가 날짜에 따라 동작이 달라져야 한다고 가정해보자. 우리

daco2020.tistory.com

 

📚PyInquirer(Interactive Command Line User Interface)

PyInquirere는 command line interface를 이쁘게 제공해 주는 툴이다. 

  • 질문 리스트를 정의하고 이를 prompt에 전달한다
  • prompt는 답변을 dictionary 형태로 리턴한다.

 

코드를 실행하면 다음과 같이 예쁜 CLI 툴이 완성된다.

 

👩‍💻 두 가지 모듈을 이용해서 DAG BoilerPlate 코드 작성

 

🚗 Shell 파일과 Makefile 을 이용하여 DAG 코드 자동완성

shell 파일

#!/bin/bash
dag_id="$1"
owner="$2";
retries="$3";
retry_delay="$4";
description="$5";
schedule_interval="$6";
start_date="$7";
catchup="$8";
tags="$9";
dir="$(pwd)"

cat > ${dir}/${dag_id}.py << EOF
import os
from datetime import timedelta, datetime
import pendulum
from airflow import DAG

seoul_time = pendulum.timezone('Asia/Seoul')
dag_name = os.path.basename(__file__).split('.')[0]

default_args = {
    'owner': '${owner}',
    'retries': ${retries}, 
    'retry_delay': timedelta(minutes=${retry_delay})
}

with DAG(
    dag_id=dag_name,
    default_args=default_args,
    description='${description}',
    schedule_interval='${schedule_interval}',
    start_date=datetime(${start_date}, tzinfo=seoul_time),
    catchup=${catchup},
    tags=${tags}
) as dag:
    pass
EOF

 

Makefile

.PHONY: create
create:
	@python dag_code_generator.py

 

코드 실행

>> make create

다음과 같이 cli 에서 실행하면 PyInquirer 모듈을 통해서 필요한 정보를 입력받은 후 그 정보를 토대로 DAG 를 생성할 수 있다!

728x90
반응형

댓글