Airflow DAG을 만들때, 특정 조건에 따라 특정 task의 실행 여부를 결정하거나 실행할 task를 결정하고 싶을 때가 있다.
예를 들어 나는 execution date의 요일이나 날짜에 따라 특정 task만 실행되도록 하는 DAG을 실제로 사용하고 있다. 이렇게 하면, 살짝만 내용이 다른 daily, weekly, monthly task를 각각의 DAG이 아닌 하나의 DAG으로 관리하는 것이 가능해진다.
분기와 조건부 논리를 구현할 때는 아래의 operator들을 사용할 수 있다.
•
ShortCircuitOperator : 조건을 만족하는 경우에만 다음 task 실행
•
BranchPythonOperator : 조건에 따라 반환한 task_id에 해당하는 task 실행
•
BranchSQLOperator : SQL 쿼리 결과의 True/False 여부로 실행할 Task 분기
•
BranchDayOfWeekOperator : 현재 요일 = 지정된 week_day 매개변수와 동일한지에 따라 실행할 task 분기
•
BranchDateTimeOperator : 현재 시각이 target_lower 와 target_upper 매개변수 사이의 시각인지에 따라 실행할 task 분기
이 operator들의 사용법을 차근차근 정리해보도록 하자.
목차 (화살표 눌러서 열어보기)
1. ShortCircuitOperator
ShorCircuitOperator는 특정 조건(condition)을 만족했을 때 downstream task를 실행하는 operator이다.
즉, 조건에 따라 task의 실행 여부를 결정할 때 사용할 수 있다.
1.1 특징
•
python_callable로 전달받은 함수가 True를 반환하면 downstream task를 실행
•
False를 반환하면 downstream task를 모두 skip
1.2 활용
•
DAG의 일부 task만 조건부로 실행해야 할 때
•
DAG은 daily로 실행되지만 특정 task는 특정 요일이나 날짜마다 시행되어야하는 경우
•
데이터 품질 검사 후 후속 작업 실행 여부 결정
1.3 사용법
기본 코드
from airflow.operators.python import ShortCircuitOperator
# True, False를 return하는 함수
def check_condition():
# 조건 로직 구현
return True # 또는 False
short_circuit_task = ShortCircuitOperator(
task_id='short_circuit_task',
python_callable=check_condition,
dag=dag
)
Python
복사
예시
from airflow.operators.python import ShortCircuitOperator
from airflow.operators.dummy import DummyOperator
def check_condition(execution_date):
# 월요일만 실행
return execution_date.weekday() == 0 # 0=월요일 -> True, 아니면 False
short_circuit_task = ShortCircuitOperator(
task_id='short_circuit_task',
python_callable=check_condition,
dag=dag
)
daily_task = DummyOperator(task_id='daily_task', dag=dag)
monday_task = DummyOperator(task_id='monday_task', dag=dag)
# tasks
daily_task >> short_circuit_task >> monday_task # daily_task 실행 후, 월요일에만 monday_task 실행
Python
복사
2. BranchPythonOperator
BranchPythonOperator는 조건(condition)에 따라 다음에 실행할 task를 선택한다.
2.1 특징
•
python_callable로 전달받은 함수는 실행시킬 Downstream Task ID를 반환
•
반환된 task_id에 해당하는 task만 실행되고 나머지는 skip
2.2 활용
•
요일이나 날짜에 따라 실행할 Task가 달라질 때
•
데이터 크기나 유형에 따라 다른 처리 경로를 선택할 때
•
에러 발생시 별도의 처리 경로 실행
2.3 사용법
from airflow.operators.python import BranchPythonOperator
from airflow.operators.dummy import DummyOperator
# 조건에 따라 실행시킬 task_id를 return하는 함수
def choose_branch(**context):
# 1일에는 monthly_task를, 아니면 daily_task를 return
if context['execution_date'].day == 1:
return 'monthly_task'
else:
return 'daily_task'
branch_task = BranchPythonOperator(
task_id='branch_task',
python_callable=choose_branch,
provide_context=True,
dag=dag
)
daily_task = DummyOperator(task_id='daily_task', dag=dag)
monthly_task = DummyOperator(task_id='monthly_task', dag=dag)
# 아래처럼 task를 설정하면 조건에 맞는 task만 실행
branch_task >> [daily_task, monthly_task]
Python
복사
3. BranchSQLOperator
BranchSQLOperator는 SQL 쿼리의 결과에 따라 실행할 Task를 분기한다.
3.1 특징
•
SQL 쿼리 결과의 True/False 여부에 따라 실행할 task 분기
3.2 활용
•
데이터베이스 상태나 조건에 따른 분기
3.3 사용법
from airflow.operators.sql import BranchSQLOperator
from airflow.operators.dummy import DummyOperator
branch_sql_task = BranchSQLOperator(
task_id='branch_sql_task',
sql="SELECT COUNT(*) > 100 FROM my_table", # true/false가 나오는 쿼리
follow_task_ids_if_true='high_volume_task',
follow_task_ids_if_false='low_volume_task',
conn_id='my_db_conn',
dag=dag
)
high_volume_task = DummyOperator(task_id='high_volume_task', dag=dag)
low_volume_task = DummyOperator(task_id='low_volume_task', dag=dag)
# task relations
branch_sql_task >> [high_volume_task, low_volume_task]
Python
복사
4. BranchDayOfWeekOperator
BranchDayOfWeekOperator는 현재 요일에 따라 실행할 task를 동적으로 선택한다. 이 operator를 사용하면 특정 요일에 따라 다른 작업 흐름을 쉽게 구현할 수 있다.
4.1 특징
•
현재 요일이 지정된 week_day 매개변수와 동일한지 확인
•
일치 여부에 따라 follow_task_ids_if_true 또는 follow_task_ids_if_false에 지정된 task를 실행
4.2 활용
•
요일별로 다른 작업을 수행해야 할 경우
4.3 사용법
from airflow.operators.weekday import BranchDayOfWeekOperator
from airflow.operators.dummy import DummyOperator
monday_branch = BranchDayOfWeekOperator(
task_id='monday_branch',
follow_task_ids_if_true='monday_task', # week_day와 일치할 때 실행할 task ID
follow_task_ids_if_false='others_task', # week_day와 일치하지 않을 때 실행할 task ID
week_day='Monday', # 비교할 요일 (예: 'Monday', 'Tuesday' 등으로 정확히 입력)
use_task_execution_day=True, # True: task 실행 날짜를 기준으로 요일 확인 (기본값: False)
dag=dag
)
monday_task = DummyOperator(task_id='monday_task', dag=dag)
others_task = DummyOperator(task_id='others_task', dag=dag)
# task relations
monday_branch >> [monday_task, others_task]
Python
복사
5. BranchDateTimeOperator
BranchDateTimeOperator는 현재 시각에 따라 실행할 task를 분기한다.
5.1 특징
•
현재 시각이 target_lower와 target_upper 매개변수 사이의 시각인지에 따라 실행할 task 분기
5.2 활용
•
특정 시간대에 따른 작업 분기에 유용
5.3 사용법
from airflow.operators.datetime import BranchDateTimeOperator
from datetime import time
time_branch = BranchDateTimeOperator(
task_id='time_branch',
follow_task_ids_if_true='business_hours_task',
follow_task_ids_if_false='after_hours_task',
target_lower=time(9, 0),
target_upper=time(17, 0),
dag=dag
)
Python
복사
6. task 조건 분기시 주의사항
6.1 트리거 규칙(Trigger Rules) 설정
Task 조건분기시, 후속 task의 트리거 규칙 기본값은 all_success로, 모든 상위 작업이 성공해야 하위 작업이 실행된다. 따라서 분기 후에 task를 재결합할 때는 필수로 trigger rules를 적절하게 설정해야 한다.
설정 방법
•
task의 매개변수로 전달
•
주요 Trigger Rules
◦
all_success (기본값): 모든 상위 task가 성공해야 실행
◦
all_failed: 모든 상위 task가 실패해야 실행
◦
all_done: 모든 상위 task가 완료되면 실행 (성공/실패 무관)
◦
one_success: 최소 하나의 상위 task가 성공하면 실행
◦
one_failed: 최소 하나의 상위 task가 실패하면 실행
◦
none_failed: 실패한 상위 task가 없으면 실행 (일부 skipped 허용)
◦
none_failed_or_skipped: 실패나 skipped된 상위 task가 없으면 실행
◦
none_skipped: skipped된 상위 task가 없으면 실행
•
예시
from airflow.operators.dummy import DummyOperator
join_task = DummyOperator(
task_id='join_task',
trigger_rule='one_success',
dag=dag
)
Python
복사
6.2 기타
•
분기 로직을 잘 테스트하고 로깅하여 오작동하지 않도록 한다.
•
과도하게 복잡한 분기는 지양하고, Task Group을 활용하거나 독립적인 DAG으로 분리한다.
→ Task Group에 대해서는 더 알아보기
지금까지 살펴본 다섯가지 operator의 특징을 간단히 정리해보면 아래와 같다.
운영자 | 반환 값 | 하위 작업 영향 | 사용 사례 |
ShortCircuitOperator | boolean | 전체 하위 작업 | 조건부 전체 실행 차단 |
BranchPythonOperator | task_id (문자열) | 선택된 작업만 | 다중 경로 분기 처리 |
BranchSQLOperator | SQL 결과 기반 | 선택된 작업만 | DB 기반 조건 분기 |
BranchDayOfWeekOperator | 요일 조건 | 선택된 작업만 | 요일별 스케줄링 |
BranchDateTimeOperator | 시간 범위 조건 | 선택된 작업만 | 시간대별 작업 처리 |
이 operator들을 사용하면, Airflow DAG을 만들 때 복잡한 조건부 로직을 효과적으로 구현할 수 있게 된다.
아직 나도 업무에서 이중에서 몇가지만 사용해봤는데, 상황과 필요에 따라 적절한 operator를 선택해서 사용하면 좋을 것 같다. 