✔️ [SFTP 개념]
ssh의 파일 전송 버전이라고 생각하면 됩니다. ssh 방식을 이용하여 안전하게 암호화된 구간에서 ftp 기능을 이용할 수 있게 됩니다.
ftp, ftps, sftp(ssh)에 관한 자세한 설명은 이 블로그를 참고했습니다.
https://nhj12311.tistory.com/76
✔️ [SFTPToGCSOperator - SFTP to Google Cloud Storage Transfer Operator]
Airflow 내에서 FTP 서버에 있는 파일을 GCS에 저장하는 Operator입니다.
사용법은 다음과 같습니다.
- task_id = task_id
- sftp_conn_id = sftp connection 정보를 이용하여 생성한 connection_id 를 적어주면 됩니다.
- source_path = sftp 서버 내부에 파일 위치입니다.
- destination_path = gcs 에 저장할 파일 경로입니다.
- destination_bucket = gcs bucket 이름을 적어주면 됩니다.
- move_object(default=False)
- Linux command 의 mv 명령어라고 생각하면 됩니다.
- True로 설정하면 GCS로 파일을 옮긴 후 FTP 서버 내 파일을 삭제합니다.
📝 실행 전 준비사항
- SFTPToGCSOperator 를 사용하기 위해선 먼저 library를 설치해야 합니다.
💡 pip3 install apache-airflow-providers-ftp
- 접속할 sftp 정보를 airflow connection 에 생성해두어야 합니다. sftp airflow connection을 생성하는 방법은 두 가지입니다.
저 같은 경우 1번 방법을 사용하여 connection_id를 생성해주었습니다.
참고) sftp의 기본 port는 22로 설정되어 있습니다.
이 두가지만 하면 설정은 끝입니다!
✔️[SFTPToGCSOperator 내부 작동]
전 새로운 Operator를 써볼 때 내부가 어떻게 구현되었는지 보는 걸 좋아하는데 이번에도 어떻게 GCS로 파일을 전달하는지 보고자 살펴보았습니다.
https://github.com/apache/airflow/blob/main/airflow/providers/google/cloud/transfers/sftp_to_gcs.py
일단 기본적으로 sftp_hook과 gcs_hook 을 이용하여 데이터를 전달합니다.
Airflow Hook은 외부 플랫폼, 데이터베이스에 접근할 수 있도록 만든 인터페이스입니다. 대부분 Operator 가 실행되기 전에 Hook을 통해 통신합니다.
파일을 주고받을 때 파일을 잠시 어디에 저장하는지 궁금했는데 python의 tempfile.NamedTemporaryFile 라이브러리를 사용하고 있었습니다.
tempfile 라이브러리 내에 여러가지 옵션이 있는데 그 중 tempfile.NamedTemporaryFile 은 이름이 있는 임시 파일을 반환하고 임시 파일을 자동 삭제합니다.
이상 SFTPToGCSOperator 를 사용하면서 정리한 글이었습니다.
'Airflow' 카테고리의 다른 글
[Operator] SimpleHttpOperator | LIM (0) | 2022.12.21 |
---|---|
Airflow 특정 Task 재실행 | LIM (0) | 2022.12.07 |
[Airflow] Branch를 이용해 상황에 맞는 task 실행(BranchPythonOperator) | LIM (0) | 2022.07.19 |
[Airflow] Airflow Connection with Google Secret Manager | LIM (0) | 2022.06.16 |
[Airflow] CustomOperator 생성 | LIM (0) | 2022.06.12 |
댓글