본문 바로가기
반응형

Airflow17

[Airflow] KubernetesExecutor vs CeleryExecutor | LIM KubernetesPodOperator 를 사용하게 되면서 자동으로 Executor 는 어떻게 설정해야 하는 것인가에 대한 고민이 시작되었다. 물론 현재 설정되어 있는 CeleryExecutor로도 KubernetesPodOperator는 정상 동작한다. 그럼 왜, KubernetesExecutor 를 사용하는 것일까? 기본 차이점 CeleryExecutor의 경우 여러대의 워커 머신을 사용해서 동시에 여러 작업을 분산 실행할 수 있다. Celery는 메시지 브로커(RabbitMQ, Redis 등)를 사용하여 메시지를 전달하고, 워커 머신들은 이러한 메시지를 받아 작업을 실행한다. 아키텍처는 다음과 같다. KubernetesExecutor의 경우 각 Airflow 작업을 Kubernetes의 개별 파드로.. 2023. 9. 17.
[Airflow] KubernetesPodOperator 로컬 테스트를 위한 환경 구축(feat. KIND(Kubernetes In Docker) | LIM docker image를 활용해 airflow에서 실행시키기 위해서 KubernetesPodOperator를 사용하게 되었는데 이는 기존 로컬 환경에서 테스트가 바로 안 된다는 단점이 있었다. 이번에 한국데이터 엔지니어 모임 3회에 참여해 발표를 들었던 내용 중에 로컬 환경에서 KubernetesPodOperator 를 사용할 수 있게끔 로컬 Airflow on K8S 구축에 Vault 곁들이기 설명을 들었는데 그 블로그를 참조해서 테스트해보았다. 🚀 KIND(Kubernetes in Docker) 활용하기 Kubernetes In Docker는 Kubernetes 클러스터를 Docker 컨테이너 내에서 실행할 수 있게 해주는 도구다. 이를 활용해 빠르고 쉽게 로컬 환경에서 Kubernetes 클러스터를.. 2023. 9. 17.
[Airflow] LocalExecutor 사용 시 PostgreSQL 설치 (Mac M1) | LIM Airflow 를 설치하면 가장 기본적으로 설정되어 있는 executor 는 Sequential Executor 이다. 하지만 이 Executor는 병렬처리를 진행할 수 없기 때문에 잘 사용하지 않고 보통 LocalExecutor나 CeleryExecutor 를 많이 사용한다. 이 때 LocalExecutor 를 사용하기 위해 기존 sqlite 로 되어있던 것에서 postgresql 로 변경해야 한다. 설치 환경은 Mac M1 기준이다. Postgresql 설치 brew install postgresql psql 을 입력하여 접속 >> psql 여기서 잘 접속된다면 해피하지만 보통은 그런 일이 잘 없다. 다음과 같은 에러 발생! 에러 핸들링 psql: error: connection to server o.. 2023. 6. 4.
[Airflow] Dynamic Task Mapping 사용해보기(w/UseCase) | LIM Dynamic Task Mapping 은 DAG 작성자가 필요한 작업 수를 사전에 알 필요 없이 현재 데이터에 기반하여 실행 시점에 Task 를 생성하도록 한다. 이는 기존에 for loop 을 이용하여 Task 를 정의하는 것과 유사하지만 다른 점이 있다면 기존엔 for loop 을 Task 바깥에 정의하여 돌리는 방식이었다면 Dynamic Task Mapping 의 경우에는 스케줄러가 실행하기 때문에 미리 정의해둘 필요가 없다. 매핑된 작업의 출력을 기반으로 작업을 수행하고 이는 일반적으로 MapReduce 방식으로 알려져 있다. 기존 For Loop 을 이용하여 DAG 를 생성한 결과 from datetime import datetime from airflow import DAG from airfl.. 2023. 6. 3.
[Airflow] Sensor - ExternalTaskSensor | LIM Intro Airflow의 DAG안에 Task들 간의 의존성을 통해 Task의 실행 여부 및 순서를 결정할 수 있듯이 DAG 간에도 가능하다. 예로, DAG A의 마지막 TaskC의 실행이 성공으로 끝나면 DAG B의 Sensor가 이를 인지하고 Task D가 시작되는 이러한 흐름이다. ExternalTaskSensor의 구성요소 ✔️ task_id - 현재 dag( 위의 예시에서 Dag B )의 task 이름 ✔️ external_dag_id - sensing 하는 dag( 위의 예시에서 Dag A )의 이름 ✔️ external_task_id - sensing 하는 dag의 최종적인 task 이름( 위의 예시에서 Dag A - Task C ) ✔️ execution_date_fn - sensing 대.. 2023. 5. 12.
[Airflow] DAG Creation Boilerplate CLI 도구 생성 | LIM Airflow 관리 관련해서 쏘카 기술 블로그 글을 읽다가 반복되는 DAG 코드를 쉽게 작성하기 위해 CLI 도구를 만들었다는 것을 보고 우리 팀에도 적용되면 좋을 것 같아 구현해 보았다. 이 도구는 이럴 때 유용할 것 같다. DAG 작성자가 처음에 DAG 작성을 어떻게 할지 모를 때 팀 내 DAG Template을 공통되게 유지하고 싶을 때 우리 팀은 현재 이럴 때 이미 작성된 코드를 복붙 해서 쓰고 있었는데, 뼈대가 되는 코드를 어떤 걸 기준으로 삼을 건지가 명확하지 않았고 처음부터 다들 제각각 작성하고 있어서 코드가 통일되지 않았었다. 참고했던 쏘카 기술 블로그이다. https://tech.socarcorp.kr/data/2021/06/01/data-engineering-with-airflow.ht.. 2023. 1. 14.
반응형