핵심 내용
안녕하세요. Airflow로 데이터 적재 파이프라인을 만드는 튜토리얼을 만들어봤습니다. 사이드 프로젝트로 데이터를 수집하고, 수집한 내용을 DB에 저장하고 활용해보려는 분들께 도움이 될 것 같습니다. 본격적으로 시작하기 전에 몇 가지 재료가 필요합니다. 우선, 서버는 GCP를 이용하여 구축했습니다. IDE는 GCP에서 구축한 Jupyter lab서버를 이용했고, MySQL서버, Airflow서버 모두 GCP 우분투 서버에 설치했습니다.
https://gibles-deepmind.tistory.com/116?category=954919
https://gibles-deepmind.tistory.com/129?category=954919
https://gibles-deepmind.tistory.com/131?category=954919
https://gibles-deepmind.tistory.com/132?category=954919
각 내용들을 순차적으로 따라해보시면 본 튜토리얼에 필요한 재료들을 모두? 수집해보실 수 있을 겁니다.
본 글에서 다루는 내용은 다음과 같습니다.(스압주의)
1. 튜토리얼을 위한 몇 가지 기본적인 Airflow 개념
- DAG, Operator, 시간 개념
2. DAG테스트 해보기
- Airflow CLI설정, MySQL Connection생성하기, 테스트 코드, Execution_date
3. 서울시 지하철호선별 역별 승하차 인원 정보 API 사용법
- api요청 샘플 코드, 전처리, 사용시 주의점
4. 데이터 적재하기
- 스키마 생성, DAG파라미터 구성, 중복 일자 데이터 제거 (MySqlOperator), API요청(SimpleHttpOperator), 전처리후 MySQL적재(PythonOperator)
튜토리얼을 위한 몇 가지 기본적인 Airflow개념
공홈의 concepts를 참고했습니다.
https://airflow.apache.org/docs/apache-airflow/stable/concepts/index.html
DAG & Operator
가장 중요한 것은? DAG라는 개념입니다. DAG란 어떤 작업들의 집합이라고 할 수 있는데요. 여기서 중요한 건 이 작업들 간의 실행 순서를 정할 수 있다는 것인데요. 아래와 같이 각 작업들 간에 화살표가 있습니다. 그런데, 화살표의 방향이 한 방향인게 혹시 눈에 띄시나요? 그래서, DAG를 번역할 때 "비순환"이라는 단어를 사용하는 것 같습니다. 어쨌든, DAG는 어떤 작업들의 집합이고 작업들 간의 실행 순서를 인과적으로 설정할 수 있습니다. 그리고, DAG를 만들면서 DAG에 id를 부여하게 되는데요. 이 id는 구동하고 있는 airflow서버 내에서 "유일한" 값을 가집니다. 또한, 작업은 blarblarOperator함수를 통해 진행하며, 각 작업에도 task_id가 있습니다. 아래 그림안의 문자들은 task_id입니다.
그렇다면, DAG는 어떻게 생성할까요? 공홈 가이드에 따르면 크게 세 가지 방법이 있습니다.
1.with구문 활용 / 2.데코레이터 활용 / 3.DAG함수 활용
본 글에서는 3번 DAG함수를 이용하여 튜토리얼을 진행합니다.
공 홈의 예시 코드를 그대로 가져왔습니다. 안에 파라미터는 신경쓰지 마시고, 구조만 보시면 됩니다. 우선, DAG를 먼저 생성하고 그리고 하위에 EmptyOperator메서드가 보이시나요? 이 Operator가 task가 됩니다. 즉, 하나의 dag_id에 여러 Operator를 만들 수 있다는 뜻입니다. 여기서, dag_id가 airflow ui상에 표시됩니다. 그렇기 때문에 dag_id를 잘 적어주어야 합니다. 이렇게 DAG함수를 통해 dag를 만들고, Operator를 통해 하위 task들을 설정할 수 있습니다. 앞으로 나올 Operator어쩌고 하는 친구들은 = task다 라고 생각하시면 됩니다. 예를 들어, PythonOperator는 python task를 MysqlOperator는 mysql task를 다룹니다. 또한, Operator에는 task_id를 지정해주어야 합니다. 상기 그림 박스안 문자들이 task_id입니다.
from airflow import DAG
my_dag = DAG(dag_id= "my_dag_name", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule_interval="@daily", catchup=False)
op = EmptyOperator(task_id="task", dag=my_dag)
시간 개념
우선, 언제부터 시작할지 시작시간이 필요합니다. 시작 시간은 start_date이며, 아래와 같이 default_args에 만들어두거나 혹은 DAG함수 내에 start_date파라미터로 직접 선언할 수도 있습니다. 그리고, 타입은 datatime입니다.
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2020, 1, 1, 11, 0),
'schedule_interval': '0 11 * * *',
}
dag = DAG(
dag_id= """ddongmin_220612_pipeline_subway_passenger_cnt""",
default_args= default_args,
# start_date= datetime(2020, 1, 1, 11, 0),
)
다음으로, schedule_interval이 있습니다. 며칠, 몇 시간, 혹은 몇 분 간격으로 배치를 돌릴 것이냐?를 결정하는 파라미터입니다. * * * * *이 의미하는 바는 아래 글을 살펴보시길. 0 11 * * *은 매일 11시에 배치를 돌리겠다는 뜻입니다.
https://www.wikiwand.com/ko/Cron
crontab의 표현법을 이용하거나 아니면 아래와 같이 데코레이터를 변수에 할당할 수도 있습니다.
그리고, start_date와 마찬가지로 end_date도 datetime으로 선언할 수 있는데요. 작성하지 않으면 따로 end_date기간을 두지 않고 매번 interval 기간이 될 때마다 배치를 돌리겠다는 의미입니다.
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2020, 1, 1, 11, 0),
'schedule_interval': '0 11 * * *',
# 'end_data': datetime(2020, 2, 1, 11, 0),
}
dag = DAG(
dag_id= """ddongmin_220612_pipeline_subway_passenger_cnt""",
default_args= default_args,
# start_date= datetime(2020, 1, 1, 11, 0),
)
이외에 execution_date가 있는데요. 단순히 실행시간?이라고 생각하시면 안됩니다. 이건 뒤에서 DAG를 직접 생성할 때 다시 한번 더 말씀 드리겠습니다. 마지막으로 Airflow의 time zone은 UTC 기준입니다. 즉, 0 11 * * * 를 입력하면 KST기준 오후 8시에 실행된다고 할 수 있습니다. 왜냐하면 UTC + 9시간 = KST이기 때문입니다. timezone을 Asia/Seoul로 바꾸는 방법도 있으나, 이 글에서는 다루지 않겠습니다.
DAG테스트해보기
cli상에서 DAG를 테스트 해보겠습니다. cli환경에서 airflow 명령어를 실행할 수 있도록 공홈 가이드에 따라 몇 가지 작업을 해줍니다.
CLI설정
https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html
cd airflow-docker
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.3.2/airflow.sh'
chmod +x airflow.sh
도커 컴포즈 파일이 있는 폴더에 다운 받아줍니다.
실행은 다음과 같이 해주면 됩니다.
./airflow.sh bash
그럼 이런 화면이 나오게 됩니다. exit명령어를 이용하면 나갈 수 있습니다.
jupyterlab 서버에서 dag작성하기
이건 진짜 별건 없고 jupyterlab서버 상에서 DAG폴더로 이동해 코드를 작성해 주면 됩니다.
사실, 이건 jupyterlab서버 설치도 필요없고 jupyter notebook설치 후 dags폴더로 이동해주면 됩니다. 왜냐하면, dags폴더에 있는 파일이 airflow서버로 마운트(동기화)되기 때문입니다.
아래 설치 글에 관련된 내용이 나옵니다.
https://gibles-deepmind.tistory.com/129?category=954919
MySQL Connection생성하기
Mysql로 테스트를 하기 위해 Ariflow Admin페이지에서 Connection을 추가해줍니다.
Connection_id를 파라미터에 반복적으로 작성할 것이기 때문에 가독성이 좋은 이름으로 작성해줍니다. 그리고, Connection Type을 MySQL로 선택해줍니다. Host에는 GCP외부 고정 IP를 입력해줍니다. GCP에서 고정 IP를 만드는 방법은 아래 글에서 살펴보실 수 있습니다.
https://gibles-deepmind.tistory.com/131?category=954919
마지막으로 포트 번호 3306을 입력해줍니다.
아래 글을 보시면 MySQL서버를 세팅하실 수 있습니다.
https://gibles-deepmind.tistory.com/132
테스트 코드
테스트 코드는 불곰님의 글을 참고 했습니다.
https://brownbears.tistory.com/590
# task 테스트
airflow tasks test dag_id task_id execution_date
# dag 테스트
airflow dags test dag_id execution_date
위와 같이 테스크 단위로 테스트를 하거나 덱 단위로 테스트를 할 수 있습니다. execution_date가 뭔지는 모르겠으나, 우선 그냥 한번 실행해봅시다.
from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
}
dag = DAG(
dag_id= 'ddongmin_220611_test_mysql_connect',
start_date= datetime(2022, 6, 8, 2, 0),
end_date= datetime(2022, 6, 12, 2, 0),
schedule_interval= '* 2 * * *',
default_args= default_args
)
# 테이블 생성
qr_create_test_table = r"""
CREATE TABLE airflow_test_db.test_220605 (
col1 VARCHAR(10) NOT NULL,
col2 INT NOT NULL
) ;
"""
# 앞서 만든 mysql_connection으로 연결
mysql_task_create_table = MySqlOperator(
task_id= 'mysql_create_table_test',
mysql_conn_id= 'mysql_connection',
sql= qr_create_test_table,
dag= dag,
)
# airflow taks test dag_id task_id excution_date
airflow tasks test ddongmin_220611_test_mysql_connect mysql_create_table_test 2022-06-10
불곰님의 설명대로 파라미터를 차례대로 입력해줍니다. 성공하면 아래와 같은 SUCCESS 문구를 볼 수 있습니다.
Execution_date
Execution_date란? 정확한 실행 시간이 아니고, logical_date입니다.
공홈의 설명에 따르면 왜 logical date라고 생각해야 하는지 그 예시를 들어주고 있는데요. 만약, 16/02/20에 전일 배치를 돌린다면 execution_date은 논리적으로 2016-02-19가 되어야 하기 때문에 실제 실행 시간이 아니라 논리적으로 들어맞는 시간이라는 의미를 가집니다.
https://m.blog.naver.com/gyrbsdl18/221561318823
좀 더 알고싶은 분들은 위 글을 한번 살펴보시면 좋을 것 같습니다.
서울시 지하철호선별 역별 승하차 인원 정보 API 사용법
http://data.seoul.go.kr/dataList/OA-12914/S/1/datasetView.do
해당 링크로 우선 들어가줍니다.
쭉쭉, 내려가서 미리보기에서 Open API를 클릭하면 인증키 신청화면이 보입니다. 인증키를 신청후 신청한 인증키를 복사합니다. 따로 대기 시간 없이 바로 사용 가능했습니다.
인증키 사용 가이드 페이집니다.
http://data.seoul.go.kr/together/guide/useGuide.do#sample-code-4
샘플 예제 코드입니다. 아래 페이지를 참조했습니다.
https://gunn.kim/post/2020-06-13-seoul-metro-data-openapi/
import requests
import json
import pandas as pd
# 최대 1,000건까지 가능
url = 'http://openapi.seoul.go.kr:8088/인증키요기에/json/CardSubwayStatsNew/1/1000/20220608'
response = requests.get(url)
data_sample = pd.json_normalize(json.loads(response.text)['CardSubwayStatsNew']['row'])
json_normalize를 이용하면 아래와 같이 출력됩니다.
아래와 같은 스키마로 되어 있습니다.
USE_DT: 실제 날짜
LINE_NUM: 호선
SUB_STA_NM: 역이름
RIDE_PASGR_NUM: 탑승객
ALIGHT_PASGR_NUM: 하차고객
WORK_DT: 출력 날짜
여기서 잠깐, 주의할점이 몇 가지 있습니다.
1. 신분당선은 없습니다.
2. 매일 오후 6시 40분 경 3일전의 데이터를 적재합니다.
따라서, 저는 배치 시간을 넉넉하게 오후 8시로 잡았습니다.
DAG생성 - 데이터 적재하기
적재는 크게 세 가지 파트로 나누어 진행하였습니다. 즉, 하나의 DAG에 3개의 task를 두었습니다.
1. 데이터 중복이 없도록 일자 조회후 동일 날짜 중복 데이터가 있는 경우 삭제
2. API 요청
3. 요청한 API 데이터를 변환하여 MySQL에 적재
스키마 생성
본격적으로 시작하기 전에 저는 MySQL Server에 따로 테이블을 생성하였습니다. 스키마는 json파일과 동일합니다.
CREATE TABLE airflow_test_db.subway_table (
USE_DT DATE NOT NULL,
LINE_NUM VARCHAR(100),
SUB_STA_NM VARCHAR(100),
RIDE_PASGR_NUM INT,
ALIGHT_PASGR_NUM INT,
WORK_DT DATE NOT NULL
) default character set utf8 collate utf8_general_ci;
DAG파라미터 구성
20년 1월 1일 11시 + 9시간 = 20시에 시작하여 매일 오후8시에 실행되도록 파라미터를 구성하였습니다.
import json
from datetime import datetime
from airflow import DAG, macros
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python_operator import PythonOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator
import sqlalchemy
import pandas as pd
default_args = {
'owner': 'airflow',
'start_date': datetime(2020, 1, 1, 11, 0),
'schedule_interval': '0 11 * * *',
}
dag = DAG(
dag_id= """ddongmin_220612_pipeline_subway_passenger_cnt""",
default_args= default_args,
)
중복 일자 데이터 제거 (MySqlOperator)
일자별로 동일한 데이터를 중복으로 적재하는 일이 없도록 하는 task를 먼저 만들어 줍니다. 예를 들어, 특정 일자에 오류가 생겨 데이터를 다시 적재해야 하는데 이때 delete 작업을 먼저 지정해두면 데이터가 중복 적재 되는 일을 방지해줍니다.
# DELETE WHERE
sql_task_delete_table = MySqlOperator(
task_id= 't1_check_duplicate',
mysql_conn_id= 'mysql_connection',
sql= "DELETE FROM airflow_test_db.subway_table WHERE USE_DT = '{date}'".format(
date='{{ (execution_date - macros.timedelta(days=3)).strftime("%Y-%m-%d") }}'),
dag= dag,
)
https://stackoverflow.com/questions/52717043/using-ds-add-and-macros-with-airflow
여기서 잠깐, {{ (execution_date - macros.timedelta(days=3)).strftime("%Y-%m-%d") }} 이렇게 데이터 파라미터를 설정해줬는데요. 대충, -macros.timedelta(days=3)만 보면 3일을 빼준 것을 알 수 있습니다. 왜냐하면, 2. 매일 오후 6시 40분 경 3일전의 데이터를 적재합니다. 이렇게 3일전의 데이터가 적재되기 때문인데요. execution_date를 논리에 맞게 다시 구성해줬다고 생각하시면 될 것 같습니다.
macros설명은 공홈 가이드를 참조 부탁드립니다.
https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
API요청(SimpleHttpOperator)
(해당 블로그를 참조했습니다.)
https://velog.io/@jjongbumeee/Airflow4
다음으로, API로 데이터를 요청 후 저장해봅시다. 그전에 API 호출 주소를 저장해야 합니다. 이렇게 세팅해주시면 됩니다.
그러면 http_conn_id + endpoint = url이 됩니다. 여기서 endpoint는 두 가지 규칙을 충족해야 하는데요
1. yyyymmdd형식일것
2. 집계일자보다 3일 전 일 것
그래서 아래와 같이 endpoint를 구성하였습니다.
endpoint= '{date}'.format(
date='{{ (execution_date - macros.timedelta(days=3)).strftime("%Y%m%d") }}'),
get_api = SimpleHttpOperator(
task_id= 't2_get_request_response',
http_conn_id= 'http_subway',
endpoint= '{date}'.format(
date='{{ (execution_date - macros.timedelta(days=3)).strftime("%Y%m%d") }}'),
method= 'GET',
response_filter= lambda response: json.loads(response.text),
log_response= True,
dag= dag,
)
전처리후 MySQL적재(PythonOperator)
다음으로, 데이터 전처리 후 pd.to_sql을 이용하여 MySQL서버에 적재해줍니다.
# python 실행 함수
def dataframe_to_sql(ti):
# json preprocessing
subway_json= ti.xcom_pull(task_ids=['t2_get_request_response'])
subway_json_decode= subway_json[0]['CardSubwayStatsNew']['row']
subway_json_normalize= pd.json_normalize(subway_json_decode)
# db info
user_name= ''
pass_my = ''
host_my = ''
db_name = ''
# to_sql
connection= sqlalchemy.create_engine(f"mysql+mysqlconnector://{user_name}:{pass_my}@{host_my}/{db_name}")
table_name= 'subway_table'
subway_json_normalize.to_sql(name = table_name
,con = connection
,index = False
,if_exists = 'append')
# python_operator
df_to_sql = PythonOperator(
task_id= 't3_pd_to_sql',
python_callable= dataframe_to_sql,
dag= dag
)
아래와 같이 작업 순서도 지정해줍니다.
sql_task_delete_table >> get_api >> df_to_sql
여기서 xcom_pull을 이용해 이전 단계의 작업 내용을 받아올 수 있습니다. ti는 task instance의 약자이구요.
https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html
마무리
다적고 나니 스크롤이 상당히 기네요.. 저 같은 초보자 분이 보신다고 생각하고 최대한 상세하게 적어 봤습니다. 위 과정까지 완료하면 아래와 같이 DAG이 생성됩니다. 맨 왼쪽 버튼을 눌러주면 ON이 되구요
다들 초록불이 들어오는 걸 보니 정상적으로 잘 실행되고 있네요.
Mysql workbench에서 쿼리를 날려보니 데이터도 정상적으로 잘 들어와있네요!
이상입니다. 긴 글 읽어주셔서 감사합니다.
'딥상어동의 딥한 프로그래밍 > 엔지니어링' 카테고리의 다른 글
오라클 클라우드(1) - 계정 생성, 공짜를 누리려는 자 그 무게를 견뎌라 (0) | 2022.08.07 |
---|---|
Ganglia Web Interface 관련 링크 모음 (0) | 2022.07.29 |
[ubuntu] Mysql 원격 접속 허용후 Python(pd.to_sql)과 연동해보기 (0) | 2022.06.04 |
[ubuntu]에서 jupyterlab background 서버 구축하기 (0) | 2022.05.24 |
[Airflow] 리눅스 도커를 이용한 설치 삽질기 (1) | 2022.05.10 |
제 블로그에 와주셔서 감사합니다! 다들 오늘 하루도 좋은 일 있으시길~~
포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!