본문 바로가기
반응형

airflow15

[Operator] SimpleHttpOperator | LIM Intro 현재 회사에서 만들어진 DAG 들을 보니 다른 서비스에 만들어진 API 요청 후 그 API 에 대해서 status code 를 확인하고 결과를 반환하는 Task 들이 너무 많았다. 예전에도 이러한 코드를 단순화시키기 위해 GetOperator, PostOperator 를 만들어 본 경험이 있었다. [Airflow] CustomOperator 생성 | LIM 회사에서 task를 생성할 때 task가 단순 request만 요청하는 작업이 많아졌고 기존에는 requests.get()하고 return값을 처리하는 함수를 따로 만들었는데 이 함수가 간단하게 operator로 생성되면 좋을 것 같 amazelimi.tistory.com 예전에는 Operator 한 개에 여러 개의 기능을 담으려 하다 보니.. 2022. 12. 21.
Airflow 특정 Task 재실행 | LIM Airflow를 이용할 때, 실패한 특정 task를 다시 실행하고 싶은 경우가 있다. 이전까지는.. 그냥 맨 처음부터 실행하는 방법만 있는 줄 알았다.. Task 가 적은 경우 괜찮지만 Task 도 많고 시간도 오래 걸리는 작업은... 하지만 특정 Task 가 실패 후 코드를 수정해서 업데이트 한 다음, 실패한 Task 만 다시 실행해서 올바르게 수행이 되었는지 확인하는 방법이 있다! 즉, Airflow는 DAG run 단위로 재수행 하는 방법, Task 단위로 재수행 하는 방법이 있는 것이다. 왜 이걸 이제야 찾아보고 정리하는 걸까..⭐️ Airflow UI 이용하는 방법 실패한 위 Task는 해당 DAG의 거의 마지막 Task 였다. 이 Task 가 계속 실패하고 테스트하면서 앞에 수많은 Task 들.. 2022. 12. 7.
[Airflow] SFTPToGCSOperator 개념 및 사용방법 | LIM ✔️ [SFTP 개념] ssh의 파일 전송 버전이라고 생각하면 됩니다. ssh 방식을 이용하여 안전하게 암호화된 구간에서 ftp 기능을 이용할 수 있게 됩니다. ftp, ftps, sftp(ssh)에 관한 자세한 설명은 이 블로그를 참고했습니다. https://nhj12311.tistory.com/76 ftp, ftps, sftp(ssh) 개념 정리 IT를 전공하거나 아니면 실무를 접하다보면 이 놈의 ftp, ftps, sftp(ssh) 프로토콜에 대해서 이야길 하게 되거나 듣거나 사용하게 됩니다만 개념이라도 좀 정확하게 갈무리하고 접해야한다는 생각이 nhj12311.tistory.com ✔️ [SFTPToGCSOperator - SFTP to Google Cloud Storage Transfer Ope.. 2022. 11. 12.
[Airflow] Branch를 이용해 상황에 맞는 task 실행(BranchPythonOperator) | LIM Airflow 내에서 이전 작업의 결과에 따라서 다음 작업이 달라야 할 때 Branch 를 통해 나눌 수 있다. 예를 들어서 조회하고자 하는 connection 이 있는 경우 -> 기존 값을 업데이트 connection 이 없는 경우 -> 새롭게 생성 나같은 경우 이렇게 사용하였다. 보통 데이터를 처리할 때 데이터가 있으면 전처리 수행, 없으면 가져오기 이렇게 많이들 사용하는 것 같다. 하지만 현재는 팀내에서 connection을 관리하는게 주 목적이어서 connection 관리를 위한 목적으로 BranchPythonOperator를 사용했다. 예시코드를 보자. 다음코드는 from airflow import DAG from airflow.decorators import task from airflow.o.. 2022. 7. 19.
[Airflow] Airflow Connection with Google Secret Manager | LIM 현재 회사에서 Airflow를 Google Composer(composer v1)로 사용하고 있다. 하지만 Cloud Composer내에 Version Upgrade가 아직 Beta version이라 완벽하게 진행되지 않고 롤백된 경험이 있다..😔 이때 새로운 환경을 만들어서 기존 환경에 있던 Connection들을 수동으로 모두 다시 기입했었는데 이러한 사태를 막고자 개인 로컬에 Connection 정보를 저장해두고 새로운 환경으로 이관할 때마다 API를 통해 이관하기로 했다. 그러나 알다시피 개인 로컬에 계정 정보를 저장한다는게 사실 안전하지 않기도 하고 누군가가 추가할때마다 로컬에서 정확하게 관리가 되어야 하기 때문에 관리적인 측면에서도 비효율적이었다. 어떻게 할까 찾아보던 와중 Google에 Se.. 2022. 6. 16.
[Airflow] CustomOperator 생성 | LIM 회사에서 task를 생성할 때 task가 단순 request만 요청하는 작업이 많아졌고 기존에는 requests.get()하고 return값을 처리하는 함수를 따로 만들었는데 이 함수가 간단하게 operator로 생성되면 좋을 것 같아서 CustomOperator 생성하는 방법을 찾아보게 되었다. airflow 공식 document와 BaseOperator, PythonOperator를 참조해서 만들었다. ✅ airflow 공식 document https://airflow.apache.org/docs/apache-airflow/2.2.3/howto/custom-operator.html airflow를 사용하면서 느끼는 거지만 공식 document 설명이 좀 불친절한 것 같다. 특히 원래 CustomDec.. 2022. 6. 12.
반응형