들어가며
최근 들어, 팀 내에서도 그리고 글또에서도 새로운 환경에서 ETL 파이프라인을 구축해 볼 기회가 생김에 따라 Airflow를 이전보다는 조금 더 깊게 알아야 할 필요성이 생겼다. 이에 따라, Airflow를 공부하며 관련된 내용들을 정리해보는 중인데, 오늘은 나와 같은 초보자의 관점에서 Airflow를 테스트 환경을 구성하고 DAG을 만들고 실행하는 과정에서 몇 가지 기본적인 개념들을 다루어보고자 한다.
DAG란?
Airflow is a platform that lets you build and run workflows. A workflow is represented as a DAG (a Directed Acyclic Graph), and contains individual pieces of work called Tasks, arranged with dependencies and data flows taken into account.
- Architecture Overview -
공식 문서의 Architecture Overview문서에 따르면 Airflow는 workflows를 만들고 실행할 수 있는 플랫폼이고, 여기서 workflows라는 개념은 여러개의 Task들의 집합인 DAG라는 개념으로 표현된다. 이 DAG의 장점은 순서에 맞게 여러 작업(Task)들을 실행해서 원하는 결과를 얻을 수 있게 해준다는 것이다.
혹시, 이 글을 보는 분들이 드래곤볼?에 대해 알고 있는지 모르겠다. 죽은 사람을 살려내는 소원도 들어주는 7개의 구슬을 모으면서 발생하는 각종 스토리를 다룬 애니메이션인데, 여기에 보면 아래와 같이 두 캐릭터가 하나로 합체하는 퓨전이라는 신기한 장면이 나온다.
그냥 보면 얘들이 장난 치는 것처럼 보이는 것 같지만, 그 안에는 여러 동작들이 있고, 원하는 결과를 얻기 위해서는 정확한 순서와 동작을 지켜야 한다.
왜냐하면, 정확한 순서와 동작을 지키지 않으면, 사진1과 2처럼 잘못 합체 되기 때문이다. 그리고, 정확한 순서와 동작을 지키면 사진3처럼 굉장히 강력한 전투력을 지닌 오천크스로 재탄생한다. 손오천과 트랭크스가 퓨전하는 과정에서 각각의 동작들의 집합을 하나의 DAG으로 생각해보면 DAG은 아래와 같이 정의해볼 수 있다.
DAG?
- 여러 작업(동작)들의 집합
- 원하는 결과를 얻기 위해 진행하는 여러 작업들 간에 정확한 순서와 동작을 보장해줌
- 그리고, 내가 원하는 일자마다 그 작업들을 실행해줌
우선은 이정도로 정리하고 넘어가자.
그렇다면 DAG는 어떻게 원하는 순서와 동작을 보장하고 원하는 일자마다 작업을 실행해줄까? 아래 단락에서 Airflow를 같이 설치하고, DAG을 만들어보며 몇 가지 핵심적인 개념들에 대해 익혀보자.
Airflow 실행해보기
그전에, Airflow를 설치하고 실행해보자. pip로 설치하는 방법도 있고, docker로 설치하는 방법도 있는데 Airflow의 강의에 나오는 여러 예제들을 실행해보기 위해서는 docker로 설치하는 것을 추천한다. (pip로 설치 시 나중에 DB연결 등 다른 작업을 할 때 제한 사항이 있을 수 있음.)
<간략한 설명 - 자세한 설명은 Airflow공식 문서 에서>
0. docker desktop을 설치해준다. 정상적으로 설치 후, docker desktop을 실행해주면 아래와 같이 docker 버전을 확인할 수 있다.
1. Airflow 테스트에 사용할 폴더를 만들어 준다.
mkdir airflow
2. 해당 폴더 안에서 docker-compose 파일을 다운 받아준다.
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.10.2/docker-compose.yaml'
파일을 다운 받으면 폴더에 docker-compose.yaml파일이 생기는데, 원래 파일안에는 postgres DB의 포트 번호가 없다.(왼쪽 사진) 본 글에서는 postgres DB에 데이터를 적재하고 당겨오는 과정을 설명하지는 않지만, 다른 예제들에서 종종 설명하므로 아래와 같이 docker-compose.yaml 파일 안에 5432포트 번호만 추가해주자(오른쪽 사진).
3. airflow폴더 안에서 아래 명령어를 실행한다. (.은 현재 디렉터리를 의미)
mkdir -p ./dags ./logs ./plugins ./config
여기서, dags폴더는 앞서 우리가 말한 DAG(=여러 작업들의 집합)이 모여있는 폴더이다. 그럼, Aiflow는 어떻게 dags안의 파일들을 읽어들일까? Architecture Overview 에는 아래와 같이 가장 간단한 버전의 Airflow 배포 과정이 설명되어 있다.
나는 위 그림에서 어떻게 Airflow로 DAG을 넘기는지 단번에 이해가 되지는 않았다.
위 도식을 건설사의 건설 작업 현장으로 다시 생각해보자. User는 건설사에 DAG files는 각종 작업 목록들에, Scheduler는 현장 감독관에, Metadata DB는 작업 관련 사항 일체에, Webserver는 공사현장에 비유해볼 수 있을 것 같다(Plugin은 당장 이해할 필요는 없어 가림막으로 가림) 내가 여기서 설명하고 싶은 내용은 각종 작업 목록(DAG files)들을 어떻게 현장감독관(Scheduler)한테 넘기는지이다.
docker-compose.yaml파일을 열어보면 위와 같이 docker 컨테이너와 나의 로컬 환경 저장소를 연결(마운트)하는 구문이 있다. 이걸 하나씩 뜯어보면 어떻게 각종 작업 목록들을 현장감독관에게 넘기는지 이해할 수 있다.
1. ${AIRFLOW_PROJ_DIR:-.} = AIRFLOW_PROJ_DIR라는 환경 변수에 폴더 경로를 선언했으면 선언한 경로를 사용하고 없으면 현재 경로(.)를 사용
2. ${AIRFLOW_PROJ_DIR:-.}/dags = ./dags 즉, 현재 경로 하위의 dags라는 경로를 사용
3. ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags = 도커 컨테이너 안의 폴더(/opt/airflow/dags)와 나의 로컬 폴더(./dags)를 연결 시킴
이렇게, 내가 로컬(./dags)에서 작성한 dag.py들이 docker container의 폴더 안에 저장되고 각종 작업 목록(dag files)들을 현장 감독관(scheduler)에게 넘기면 이 정보들이 Metadata DB에 저장된다. 그리고, Webserver에서 이를 실행하면 그때마다 logs폴더에 실행 기록들이 남는다.
그럼, Airflow 웹서버는 이러한 정보들을 참조하여 프론트 단에서 유저한테 작업 목록과 현황을 제공한다.
4. AIRFLOW_UID 환경변수 지정
echo -e "AIRFLOW_UID=$(id -u)" > .env
AIRFLOW 도커 컨테이너를 실행하는 사용자 UID를 지정하는 과정이다. docker-compose.yaml의 하단 부에 보면, 세팅하지 않을 시 아래와 같이 경고 메시지를 노출하도록 설정되어 있으니, 세팅해주고 넘어가자.
5. Airflow 사용을 위한 유저 생성 및 DB세팅을 해준다.
docker compose up airflow-init
6. 두두등장. Airflow docker 컨테이너를 띄워보자.
docker compose up
아래와 같이 나오면, 정상적으로 실행 된 것이다.
docker ps를 입력하면, 실행한 컨테이너들의 id가 나오는데, 여기서 webserver(=공사현장)의 컨테이너 아이디를 복사해주자.
그리고, 아래와 같은 명령어를 통해 airflow 웹서버 도커 컨테이너 안으로 들어가서, /opt/airflow/dags 폴더 안에 아래와 같이 로컬 .(=현재경로)/dags폴더 하위와 똑같은 파일들이 있는 것을 알 수 있다. (세부 항목 안의 파일들은 실리콘벨리 엔지니어와 함께하는 Apache Airflow 강의의 예제 파일들 입니다.)
docker exec -it b0ca8e85b0ca bash
http://0.0.0.0:8080/home을 크롬에서 실행 >> 유저네임/패스워드 airflow입력 하면, 아래와 같은 Airflow 웹서버에 접속할 수 있게 된다.
처음에는 Airflow 자체적으로 만들어져 있는 샘플 파일들이 한 가득 노출된다. 만약, 본인이 작성한 dag만 노출하고 싶으면, 아래와 같이 docker-compose.yaml파일 안에서 AIRFLOW__CORE__LOAD_EXAMPLES를 false로 바꿔주고 docker compose up을 실행하면 된다.
DAG 만들어보기
공식 문서에서는 with구문(context manager)으로 선언하는 방법, DAG클래스를 변수에 할당하는 방법, @dag 데코레이터를 사용하는 방법 세 가지를 안내해준다.
아래와 같이 DAG 클래스를 변수에 할당할 경우, Operator의 dag파라미터 안에 dag=my_dag처럼 매번 생성한 Dag을 계속 할당해주어야 하지만
import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
my_dag = DAG(
dag_id="my_dag_name",
start_date=datetime.datetime(2021, 1, 1),
schedule="@daily",
)
EmptyOperator(task_id="task", dag=my_dag)
with구문을 사용하면 그럴 필요가 없어 조금 더 편리하다.
import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
with DAG(
dag_id="my_dag_name",
start_date=datetime.datetime(2021, 1, 1),
schedule="@daily",
):
EmptyOperator(task_id="task")
Operator란?
그런데, 여기서 Operator라는 처음 보는 용어가 등장한다. DAG을 공사착공준비로 보면 여기에는 설계도면, 인허가, 인력 구하기 등 여러 Task들이 있을 것이다.
Operator는 이런 task들을 쉽게 만들 수 있는 templates라고 공식 문서에서 설명하고 있다.
Operators, predefined task templates that you can string together quickly to build most parts of your DAGs.
그 안에는 BashOperator, PythonOperator, SimpleHttpOperator등.. 다양한 template이 있다. 예를 들어, 아래와 같이 pythonOperator를 사용하면, 큰 어려움 없이 DAG안에서 python함수를 실행할 수 있게 된다.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def print_hello():
print("Hello from PythonOperator")
with DAG(dag_id="sample_dag", start_date=datetime(2024, 10, 1), schedule='@daily') as dag:
python_task = PythonOperator(task_id="print_hello", python_callable=print_hello)
bash명령어를 사용하고 싶으면 BashOperator를, 단순한 API 호출 후 결과값을 받아보고 싶으면 SimpleHttpOperator를 사용하면 된다. 이외에 다양한 Operator들이 있으니, 공식 문서를 참조해보면 좋을 것 같다.
참고로 Airflow 웹 서버에는 dag파일 이름(*.py)이 아니라, with DAG안의 dag_id 파라미터안의 이름이 노출된다.
DAG안의 파라미터들
DAG안에는 여러 가지 파라미터들을 설정할 수 있다. 앞선 예시에서는 아래와 같이 dag_id="sample_dag", start_date=datetime(2024, 10, 1) 2가지 파라미터만 설정해주었다. 말 그대로, 웹서버에 노출되는 dag_id이름은 sample_dag이고, 24년 10월 1일 부터, schedule=@daily 매일 해당 dag을 실행하라는 의미이다.
이렇게 세팅을 하고 Pause/Unpause DAG버튼을 눌러, Unpause(=실행 상태)로 바꾸면, 13개가 동시에 실행되는 것을 알 수 있다.
그 이유는 Catchup이라는 기능 때문인데, 실행 일자(=이 글을 쓰는 시점)가 10/13일 이므로 start_date부터 실행 일자까지 일별로 dag을 한번씩 실행했기 때문이다.
이를 방지하기 위해서는 catchup을 False로 설정해주거나, start_date를 오늘로 설정하는 방법이 있다. (물론, 꼭 방지할 필요도 없고 catchup은 상당히 유용한 기능이다. 다만, DAG을 실행할 때마다 ./logs폴더 안에 기록들이 쌓이는데, 너무 많이 쌓이면 테스트 도중에 docker Container가 멈출 수도 있어 테스트 중에는 방지해주는 것이 좋다.)
# catchup을 False로 세팅
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def print_hello():
print("Hello from PythonOperator")
with DAG(
dag_id="sample_dag",
start_date=datetime(2024, 10, 1),
schedule="@daily",
catchup=False,
) as dag:
python_task = PythonOperator(task_id="print_hello", python_callable=print_hello)
# days_ago를 이용하여 실행 일자를 오늘로 지정, n이 0이면 오늘, n이 1이면 하루전 부터 오늘까지.. 이런식
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
def print_hello():
print("Hello world")
with DAG(dag_id="sample_dag_v2", schedule='@daily', start_date=days_ago(n=0)) as dag:
python_task = PythonOperator(task_id="print_hello", python_callable=print_hello)
이외에도 schedule을 '@daily'가 아니라 '0 0 * * *'이렇게 cron으로 정해줄 수도 있다.
스케쥴링이나, catchup등에 대해 더 자세히 알고 싶으면 DAG Runs를 읽어보면 좋다.
task간 실행 순서 결정하기
그럼, 작업을 어떤 순서로 진행할지는 어떻게 결정할 수 있을까? 이를 위해 작업 간의 관계는 나타낼 수 있지만, 실제로는 아무런 동작을 하지 않는, EmptyOperator를 사용해서 DAG을 정의해보겠다.
from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
with DAG(dag_id="visualize_task_dependancy",
start_date=datetime(2024, 10, 1),
schedule='@daily',
catchup=False):
task1 = EmptyOperator(task_id="task1")
task2 = EmptyOperator(task_id="task2")
task3 = EmptyOperator(task_id="third_task")
task4 = EmptyOperator(task_id="task4")
task5 = EmptyOperator(task_id="task5")
task1 >> task2 >> task3 >> task4 >> task5
task1다음에, task2를, .... task4다음에 task5를 마지막으로 실행하려면 위와 같이 >>으로 작업 간의 관계를 표시해주면 된다. https://0.0.0.0:8080에 접속후 visualize_task_dependancy를 클릭후, Graph를 눌러주면 아래와 같이 실행 순서가 표시된 것을 알 수 있다.
여기서도 한 가지 함정카드가 숨어 잇는데, task dependancy는 task파라미터 (task1, task2, task3,...)로 표현하지만 그 안의 task이름은 task_id의 파라미터 값으로 선언되어 있는 것을 알 수 있다. (task_id=third_task)
third_task의 입장에서 task1과 task2는 upstream이 되고, task4와 task5는 downstream이 된다. 즉, 아래와 같이 된다.
task2(third_task의 upstream) >> third_task >> task4(third_task의 downstream)
Airflow는 webserver안에서 유연한 데이터 재처리를 지원한다. third_task를 클릭 후 clear task를 누르면 Clear and Retry화면이 나온다.
여기서, Upstream을 체크하면 Affected Tasks가 3개가 되고, Clear를 누르면 third_task포함 상위 task(=이전에 실행되어야 할 task)들만 재실행 하는 것을 알 수 있다.
반대로, Downstream을 체크하고 Clear를 누르면 third_task포함 하위 task(=이후에 실행되어야 할 task)들만 재처리 하는 것을 알 수 있다.
마치며
이상, Airflow를 설치하고 DAG을 실행하는 과정에서 DAG, task, operator, task dependancy등에 대한 개념을 다루어봤다. 다음 시간에는 조금 더 자세한 예제를 다루어볼까 한다. 그럼 이만.
'딥상어동의 딥한 프로그래밍 > 엔지니어링' 카테고리의 다른 글
MY FIRST DBT - (1) Bigquery 연결해보기 + 폴더 구조에 대한 가벼운 이해 (5) | 2024.11.10 |
---|---|
우당탕탕 슬랙 메시지 저장기(2) - 게시글과 쓰레드 조회하기 (2) | 2023.05.21 |
우당탕탕 슬랙 메시지 저장기(1) - 슬랙 메시지 넌 누구냐? (8) | 2023.05.07 |
데이터 마트에서는 뭘 파나요?(feat. OLTP, OLAP) (14) | 2023.03.26 |
[Bigquery] 지난 며칠 간 Python과 연동하여 사용한 소감 (0) | 2023.01.18 |
제 블로그에 와주셔서 감사합니다! 다들 오늘 하루도 좋은 일 있으시길~~
포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!