반응형 Airflow17 [Airflow] Connection 정보 Vault 이관 작업(feat.Plugin) | LIM 🧐 Vault 란 HashiCorp 사에서 만든 비밀정보, 즉 공개되면 안 되는 비밀번호나(DB 연결 정보), API 키, 토큰 등을 저장하고 관리하는 플랫폼이다. 👩💻 현재 조직 내에서 Airflow 관리 방안 지금 우리 팀은 데이터엔지니어 수가 많지 않아 Airflow 를 GCP의 managed service 인 Composer를 통해 관리하고 있다. Composer로 Airflow를 운영하면 Connection을 관리할 수 있는 방법이 기본적으로 2가지가 있다. Secret Manager 에 airflow prefix를 통한 저장 동일하게 airflow connection_id 를 통해 가져올 수 있다. 단순 Airflow UI에서 Connection 을 생성한다. 물론 API 로도 생성할 수 있.. 2023. 1. 13. [Airflow] Custom UI for Airflow | LIM 에어플로우는 Plugin 을 이용해서 Webserver 에 메뉴를 수정할 수 있다. 기본적으로 에어플로우를 설치 후 Webserver 를 띄우면 이렇게 생겼다. Airflow Version: 2.2.4 Python Version: 3.8 😩 AS-IS 회사 내에서 Airflow 를 개발/ 운영 을 나누어서 2개를 사용하고 있었는데 해당 UI 두개가 동일하여 현재 Airflow 가 Dev인지 Prod 인지 헷갈렸었기 때문에 Plugin 을 이용해 단순 메뉴를 추가함으로써 Dev/ Prod 를 구분하고자 했다. ✨ TO-BE 추가로 원하는 Template 을 생성해 UI로 expose 시킬 수도 있다. 현재는 단순 Text 만 보이게 해두었다. 📝 Plugin 을 통해 Webserver 에 Menu Item.. 2022. 12. 29. [Operator] SimpleHttpOperator | LIM Intro 현재 회사에서 만들어진 DAG 들을 보니 다른 서비스에 만들어진 API 요청 후 그 API 에 대해서 status code 를 확인하고 결과를 반환하는 Task 들이 너무 많았다. 예전에도 이러한 코드를 단순화시키기 위해 GetOperator, PostOperator 를 만들어 본 경험이 있었다. [Airflow] CustomOperator 생성 | LIM 회사에서 task를 생성할 때 task가 단순 request만 요청하는 작업이 많아졌고 기존에는 requests.get()하고 return값을 처리하는 함수를 따로 만들었는데 이 함수가 간단하게 operator로 생성되면 좋을 것 같 amazelimi.tistory.com 예전에는 Operator 한 개에 여러 개의 기능을 담으려 하다 보니.. 2022. 12. 21. Airflow 특정 Task 재실행 | LIM Airflow를 이용할 때, 실패한 특정 task를 다시 실행하고 싶은 경우가 있다. 이전까지는.. 그냥 맨 처음부터 실행하는 방법만 있는 줄 알았다.. Task 가 적은 경우 괜찮지만 Task 도 많고 시간도 오래 걸리는 작업은... 하지만 특정 Task 가 실패 후 코드를 수정해서 업데이트 한 다음, 실패한 Task 만 다시 실행해서 올바르게 수행이 되었는지 확인하는 방법이 있다! 즉, Airflow는 DAG run 단위로 재수행 하는 방법, Task 단위로 재수행 하는 방법이 있는 것이다. 왜 이걸 이제야 찾아보고 정리하는 걸까..⭐️ Airflow UI 이용하는 방법 실패한 위 Task는 해당 DAG의 거의 마지막 Task 였다. 이 Task 가 계속 실패하고 테스트하면서 앞에 수많은 Task 들.. 2022. 12. 7. [Airflow] SFTPToGCSOperator 개념 및 사용방법 | LIM ✔️ [SFTP 개념] ssh의 파일 전송 버전이라고 생각하면 됩니다. ssh 방식을 이용하여 안전하게 암호화된 구간에서 ftp 기능을 이용할 수 있게 됩니다. ftp, ftps, sftp(ssh)에 관한 자세한 설명은 이 블로그를 참고했습니다. https://nhj12311.tistory.com/76 ftp, ftps, sftp(ssh) 개념 정리 IT를 전공하거나 아니면 실무를 접하다보면 이 놈의 ftp, ftps, sftp(ssh) 프로토콜에 대해서 이야길 하게 되거나 듣거나 사용하게 됩니다만 개념이라도 좀 정확하게 갈무리하고 접해야한다는 생각이 nhj12311.tistory.com ✔️ [SFTPToGCSOperator - SFTP to Google Cloud Storage Transfer Ope.. 2022. 11. 12. [Airflow] Branch를 이용해 상황에 맞는 task 실행(BranchPythonOperator) | LIM Airflow 내에서 이전 작업의 결과에 따라서 다음 작업이 달라야 할 때 Branch 를 통해 나눌 수 있다. 예를 들어서 조회하고자 하는 connection 이 있는 경우 -> 기존 값을 업데이트 connection 이 없는 경우 -> 새롭게 생성 나같은 경우 이렇게 사용하였다. 보통 데이터를 처리할 때 데이터가 있으면 전처리 수행, 없으면 가져오기 이렇게 많이들 사용하는 것 같다. 하지만 현재는 팀내에서 connection을 관리하는게 주 목적이어서 connection 관리를 위한 목적으로 BranchPythonOperator를 사용했다. 예시코드를 보자. 다음코드는 from airflow import DAG from airflow.decorators import task from airflow.o.. 2022. 7. 19. 이전 1 2 3 다음 반응형