현재 회사에서 Airflow를 Google Composer(composer v1)로 사용하고 있다.
하지만 Cloud Composer내에 Version Upgrade가 아직 Beta version이라 완벽하게 진행되지
않고 롤백된 경험이 있다..😔
이때 새로운 환경을 만들어서 기존 환경에 있던 Connection들을 수동으로 모두 다시 기입했었는데 이러한 사태를 막고자 개인 로컬에 Connection 정보를 저장해두고 새로운 환경으로 이관할 때마다 API를 통해 이관하기로 했다.
그러나 알다시피 개인 로컬에 계정 정보를 저장한다는게 사실 안전하지 않기도 하고 누군가가 추가할때마다 로컬에서 정확하게 관리가 되어야 하기 때문에 관리적인 측면에서도 비효율적이었다.
어떻게 할까 찾아보던 와중 Google에 Secret Manager와 Cloud Composer내에 Connection 을 uri 형태로 저장할 수 있다는 공식 문서를 찾아보게 되었다.
https://cloud.google.com/composer/docs/secret-manager
📌 Secret Manager를 사용하기 전에 설정해야 하는 Airflow Configuration Option이다.
✔️ DAG Serialization이 만족되어야 한다.(Airflow 2.x 버전에서는 이 설정이 default인 것 같다.)
✔️ backend 설정
✔️ backend_kwargs 설정
- connection_prefix: Airflow내에서 사용할 connection prefix를 정의하면 된다. 반드시 secret manager에 저장하는 key값은 이 prefix로 시작해야 한다. (default: 'airflow-connections')
- sep: connection_prefix 와 conn_id를 분리시켜주는 것(default: '-')
- project_id: composer가 위치한 해당 project-id
{"project_id": "<project id>", "connections_prefix":"example-connections", "variables_prefix":"example-variables", "sep":"-"}
Airflow Connection의 prefix와 sep 만 정의해주면 뒤에 [prefix][sep][connection_id] 로
connection을 생성하면 된다.
📌 Connection 정보가 포함된 uri 생성
uri 형태를 만드는 방법은 Airflow 공식문서내에 정의되어 있다.
https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html#generating-connection-uri
>>> import json
>>> from airflow.models.connection import Connection
>>> c = Connection(
... conn_id="some_conn",
... conn_type="mysql",
... description="connection description",
... host="myhost.com",
... login="myname",
... password="mypassword",
... extra=json.dumps(dict(this_param="some val", that_param="other val*")),
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.get_uri()}'")
AIRFLOW_CONN_SOME_CONN='mysql://myname:mypassword@myhost.com?this_param=some+val&that_param=other+val%2A'
📌 Airflow DAG 내에서 SecretManager에 저장된 Connection 정보 가져오기
Airflow 내에서 Connection 정보를 가져올 때는 BaseHook 을 이용해 가져오면 된다.
Connection정보를 조회하는 순서는 이렇다.
정의한 connection_id로 SecretManager 에 해당하는 connection 조회
-> 없을 시 Airflow DB에 저장된 connection 조회
get_connection의 connection_id는 secret manager에 저장해놓은
[prefix][sep][connection_id] 에서 connection_id 만 적어주면 된다.
from airflow.hooks.base import BaseHook
hook = BaseHook()
mysql_connection_info = hook.get_connection('{connection_id}')
host = mysql_connection_info.host
user = mysql_connection_info.login
password = mysql_connection_info.get_password()
db = mysql_connection_info.schema
connection 정보를 SecretManager를 이용해 관리하게 되면 새로운 환경이 만들어졌을 때 옮겨야 되는 일이 없기 때문에 password를 따로 관리할 필요가 없어서 더 안전해진다.
앞으로 팀원들끼리 어떤 connection을 생성했는지에 대한 history 만 서로 잘 공유되면 될 것 같다.
'Airflow' 카테고리의 다른 글
[Airflow] SFTPToGCSOperator 개념 및 사용방법 | LIM (0) | 2022.11.12 |
---|---|
[Airflow] Branch를 이용해 상황에 맞는 task 실행(BranchPythonOperator) | LIM (0) | 2022.07.19 |
[Airflow] CustomOperator 생성 | LIM (0) | 2022.06.12 |
Mac Airflow local 설치 (0) | 2022.05.14 |
[Airflow] Datetime (feat. 자동실행 방지) (0) | 2022.05.14 |
댓글