지난 포스팅에서는 Airflow의 기본적인 개념과 DAG을 작성하는 틀에 대해 정리해보았다.
이번에는 Airflow에서 쓰이는 각종 날짜, 시간 개념을 이야기해보려고 하는데,
실제로 DAG을 만들다보면 가장 헷갈리는 부분이기 때문이다.
의도한대로 내 DAG이 잘 돌아가게 하려면 이 부분에 대해 제대로 이해하는 것이 필수적이다.
이 글에서는 엔지니어가 아닌 Airflow 초보 활용자의 관점에서 이해한 내용을 정리해보려고 한다.
1. Timezone
Timezone: Airflow는 기본적으로 UTC 기준이다
아무래도 처음 만나는 장벽은 TimeZone 이슈인 것 같다. Airflow는 기본적으로 UTC를 사용하는데, UTC는 KST 보다 9시간 느리다(KST = UTC + 9 HOUR). start_date과 schedule_interval, execution_date까지 고려하게 되면 UTC로 코드를 작성하고 DAG을 사용하기가 무척이나 헷갈리게 되는 것이다.
KST로 변환하기
pendulum 라이브러리를 사용해서 TimeZone을 KST로 지정할 수 있다.
아래의 예시 코드처럼 DAG을 정의할 때 start_date에 pendulum으로 만든 타임존 KST를 지정할 수 있다. (end_date이 있을 경우에도 같은 방법으로 지정한다.)
•
타임존: KST = pendulum.timezone("Asia/Seoul")
•
start_time에 타임존 적용: start_date = datetime(2023, 3, 16, tzinfo=KST)
import pendulum
KST = pendulum.timezone("Asia/Seoul")
# DAG default arguments
default_args = {
'owner': '{dag_owner}',
'depends_on_past': False,
'email': ['{dag_owner@example.com}'],
'email_on_failure': False,
'email_on_retry': False,
'email_on_success': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# DAG 정의
dag = DAG(
dag_id='{dag_id}',
description='{description}',
tags=['{tag1}', '{tag2}'],
default_args=default_args,
max_active_runs=1,
start_date=datetime(2023, 3, 16, tzinfo=KST), # KST 적용
schedule_interval="5 10 * * 1", # 분 시 일 월 요일
catchup=True # start_date부터 backfill 여부
)
Python
복사
이렇게 하면 한국 시간으로 코드를 작성하고, Web UI에서도 KST 시간을 확인할 수 있다. 하지만 Airflow의 기본시간은 여전히 UTC고 KST 시간을 참고할 수 있는 정도로 이해해야할 것 같다.
Web UI에서 확인할 수 있는 KST 실행시간 예시
Web UI의 Calendar상에서도 UTC 기준의 날짜로 찍히기 때문에, DAG 실행 시간이 오전 9시 이전이라면 UTC 시간을 기준으로 -1일의 날짜에 실행 기록이 찍히게 되어 헷갈릴 수 있다.
실제 아래 예시를 보면 매주 월요일 아침 실행되는 weekly DAG인데, 어느 시점에 실행시간을 9시 이후에서 9시 이전으로 변경하자, Calendar에서 일요일에 실행기록이 찍히게 된 것을 볼 수 있다.
2. start_date
•
start_date: DAG의 스케줄이 시작되는 날짜
◦
execution_date과 같이 데이터 interval의 시작점을 의미하는데, 이 의미는 execution_date 파트에서 상세히 살펴보자.
•
과거의 날짜를 start_date으로 지정하면, 그 때부터 현재까지 job이 생성됨 (end_date 미지정시)
•
정의 방법: DAG을 정의할 때 start_date 파라미터에 지정
◦
[참고] end_date
▪
end_date을 지정하면 end_date - 1 interval 까지 job이 생성되므로,
원하는 마지막 execution_date + 1 interval로 지정한다.
▪
ex) 2023-12-31 데이터까지 DAG을 실행하고 싶다면, end_date은 2024-01-01로 지정
# DAG 정의
dag = DAG(
dag_id='{dag_id}',
description='{description}',
tags=['{tag1}', '{tag2}'],
default_args=default_args,
max_active_runs=1,
start_date=datetime(2023, 3, 16, tzinfo=KST), # KST 적용
# end_date=datetime(2024, 1, 1, tzinfo=KST), # 마지막 execution_date + 1 interval로 지정
schedule_interval="5 10 * * 1", # 분 시 일 월 요일
catchup=True # start_date부터 backfill 여부
)
Python
복사
•
주의점
◦
동적(dynamic) start_date을 사용하지 말 것
▪
예를 들어, start_date=dt.datetime.today() 를 사용하게되면 Airflow는 start_date + 1 interval 시점부터 실행되기 때문에 DAG이 계속해서 미래의 날짜로 이동하며 영원히(?) 실행되지 않게 된다.
◦
start_date의 날짜를 최초 DAG을 생성한 후에 수정하지 말 것. 수정이 필요한 경우 dag id를 바꾸어 올린다. (suffix _v.# 를 붙이는 형태로 버전업하는 방법을 쓸 수 있다)
•
catchup: 과거 날짜의 start_date을 과거의 날짜로 설정할 경우, catchup=True라면 생성된 과거의 task를 차례로 실행한다(backfill). Backfill을 원치 않을 경우 catchup=False로 지정
3. schedule_interval
•
schedule_interval: DAG의 실행주기를 결정
•
cron 형식, 혹은 미리 정의된 문자열 사용 가능
•
start_date에서 설정한 timezone이 적용됨
•
사용 방법: DAG을 정의할 때 schedule_interval 파라미터에 지정
# DAG 정의
dag = DAG(
dag_id='{dag_id}',
description='{description}',
tags=['{tag1}', '{tag2}'],
default_args=default_args,
max_active_runs=1,
start_date=datetime(2023, 3, 16, tzinfo=KST), # KST 적용
# end_date=datetime(2024, 1, 1, tzinfo=KST), # 마지막 execution_date + 1 interval로 지정
schedule_interval="5 10 * * 1", # 분 시 일 월 요일 -> 매주 월요일 10:05에 실행
catchup=True # start_date부터 backfill 여부
)
Python
복사
4. execution_date
execution_date
execution_date은 Airflow의 활용에 있어 아주 핵심적인 부분이면서도 헷갈리는 개념 중 하나이다.
execution_date은 실제로 task가 실행되는 시간이 아니라, 처리하려는 데이터의 시작 시점을 나타낸다.
이해를 위해 Time window의 개념을 알아보자.
Time window 개념
Daily로 원천 데이터를 집계해서 특정 마트에 저장하는 ETL 작업을 한다고 가정해보자.
•
2023-01-01의 데이터를 집계한다는 것은 2023-01-01 00시부터 2023-01-02 00시까지 발생한 데이터를 집계한다는 의미일 것이다. (Time window)
•
실제 task는 해당 기간의 데이터가 모두 쌓인 2023-01-02 00시 이후에 실행된다.
•
여기서 이 DAG의 execution_date은 2023-01-01이다. 집계하려는 데이터 time window의 시작점, 집계하려는 데이터의 기준점이라고 생각해야하는 것이다. 실제로 task가 실행되는 시간과는 다르다.
실제 상황에서는 schedule_interval에 따라 다음의 패턴으로 batch job이 실행될 것이다.
•
daily batch면 execution_date + 1일에 실제로 task가 실행
•
weekly batch라면 execution_date + 1주에 실제 task가 실행
•
monthly batch라면 execution_date + 1개월에 실제 task가 실행
start_date & execution_date
•
start_date과 execution_date 모두 처리하려는 데이터의 시작 지점을 가리킨다.
•
하지만 start_date은 처음 DAG에 정의된 것에서 변하지 않는데 반해, execution_date은 task에 parameter로 전달되어 DAG이 실행될 때마다 변하게 된다.
execution_date 활용
execution_date은 Python datetime 객체로 제공되며, 아래와 같이 다양한 방식으로 활용 가능하다.
이를 sensor operator에 파라미터로 전달하여 센싱하고자하는 테이블의 파티션 날짜로 사용하거나, 쿼리를 실행하는 operator에 파라미터로 전달하여 원하는 날짜에 대한 쿼리를 실행할 수도 있다.
# 직접 사용
'{{ execution_date.strftime("%Y-%m-%d") }}'
# ds 매크로 사용 (YYYY-MM-DD 형식)
'{{ ds }}'
# timedelta를 이용한 날짜 계산
'{{ (execution_date - macros.timedelta(days=5)).strftime("%Y-%m-%d") }}'
# macros를 이용한 날짜 계산
'{{ macros.ds_add(ds, -5) }}'
Python
복사
이상 Airflow를 사용하기 위해 기본적으로 알아야하는 날짜, 시간 개념에 대해 이렇게 정리해봤다.
오늘은 여기까지
.
참고 자료