Search

Airflow 조건에 따른 task 분기

Created at
2024/12/23
Updated at
2025/02/06
Tags
Keywords
Airflow
3 more properties
Airflow DAG을 만들때, 특정 조건에 따라 특정 task의 실행 여부를 결정하거나 실행할 task를 결정하고 싶을 때가 있다.
예를 들어 나는 execution date의 요일이나 날짜에 따라 특정 task만 실행되도록 하는 DAG을 실제로 사용하고 있다. 이렇게 하면, 살짝만 내용이 다른 daily, weekly, monthly task를 각각의 DAG이 아닌 하나의 DAG으로 관리하는 것이 가능해진다.
분기와 조건부 논리를 구현할 때는 아래의 operator들을 사용할 수 있다.
ShortCircuitOperator : 조건을 만족하는 경우에만 다음 task 실행
BranchPythonOperator : 조건에 따라 반환한 task_id에 해당하는 task 실행
BranchSQLOperator : SQL 쿼리 결과의 True/False 여부로 실행할 Task 분기
BranchDayOfWeekOperator : 현재 요일 = 지정된 week_day 매개변수와 동일한지에 따라 실행할 task 분기
BranchDateTimeOperator : 현재 시각이 target_lower 와 target_upper 매개변수 사이의 시각인지에 따라 실행할 task 분기
이 operator들의 사용법을 차근차근 정리해보도록 하자.
목차 (화살표 눌러서 열어보기)

1. ShortCircuitOperator

ShorCircuitOperator는 특정 조건(condition)을 만족했을 때 downstream task를 실행하는 operator이다.
즉, 조건에 따라 task의 실행 여부를 결정할 때 사용할 수 있다.

1.1 특징

python_callable로 전달받은 함수가 True를 반환하면 downstream task를 실행
False를 반환하면 downstream task를 모두 skip

1.2 활용

DAG의 일부 task만 조건부로 실행해야 할 때
DAG은 daily로 실행되지만 특정 task는 특정 요일이나 날짜마다 시행되어야하는 경우
데이터 품질 검사 후 후속 작업 실행 여부 결정

1.3 사용법

기본 코드
from airflow.operators.python import ShortCircuitOperator # True, False를 return하는 함수 def check_condition(): # 조건 로직 구현 return True # 또는 False short_circuit_task = ShortCircuitOperator( task_id='short_circuit_task', python_callable=check_condition, dag=dag )
Python
복사
예시
from airflow.operators.python import ShortCircuitOperator from airflow.operators.dummy import DummyOperator def check_condition(execution_date): # 월요일만 실행 return execution_date.weekday() == 0 # 0=월요일 -> True, 아니면 False short_circuit_task = ShortCircuitOperator( task_id='short_circuit_task', python_callable=check_condition, dag=dag ) daily_task = DummyOperator(task_id='daily_task', dag=dag) monday_task = DummyOperator(task_id='monday_task', dag=dag) # tasks daily_task >> short_circuit_task >> monday_task # daily_task 실행 후, 월요일에만 monday_task 실행
Python
복사

2. BranchPythonOperator

BranchPythonOperator는 조건(condition)에 따라 다음에 실행할 task를 선택한다.

2.1 특징

python_callable로 전달받은 함수는 실행시킬 Downstream Task ID를 반환
반환된 task_id에 해당하는 task만 실행되고 나머지는 skip

2.2 활용

요일이나 날짜에 따라 실행할 Task가 달라질 때
데이터 크기나 유형에 따라 다른 처리 경로를 선택할 때
에러 발생시 별도의 처리 경로 실행

2.3 사용법

from airflow.operators.python import BranchPythonOperator from airflow.operators.dummy import DummyOperator # 조건에 따라 실행시킬 task_id를 return하는 함수 def choose_branch(**context): # 1일에는 monthly_task를, 아니면 daily_task를 return if context['execution_date'].day == 1: return 'monthly_task' else: return 'daily_task' branch_task = BranchPythonOperator( task_id='branch_task', python_callable=choose_branch, provide_context=True, dag=dag ) daily_task = DummyOperator(task_id='daily_task', dag=dag) monthly_task = DummyOperator(task_id='monthly_task', dag=dag) # 아래처럼 task를 설정하면 조건에 맞는 task만 실행 branch_task >> [daily_task, monthly_task]
Python
복사

3. BranchSQLOperator

BranchSQLOperator는 SQL 쿼리의 결과에 따라 실행할 Task를 분기한다.

3.1 특징

SQL 쿼리 결과의 True/False 여부에 따라 실행할 task 분기

3.2 활용

데이터베이스 상태나 조건에 따른 분기

3.3 사용법

from airflow.operators.sql import BranchSQLOperator from airflow.operators.dummy import DummyOperator branch_sql_task = BranchSQLOperator( task_id='branch_sql_task', sql="SELECT COUNT(*) > 100 FROM my_table", # true/false가 나오는 쿼리 follow_task_ids_if_true='high_volume_task', follow_task_ids_if_false='low_volume_task', conn_id='my_db_conn', dag=dag ) high_volume_task = DummyOperator(task_id='high_volume_task', dag=dag) low_volume_task = DummyOperator(task_id='low_volume_task', dag=dag) # task relations branch_sql_task >> [high_volume_task, low_volume_task]
Python
복사

4. BranchDayOfWeekOperator

BranchDayOfWeekOperator는 현재 요일에 따라 실행할 task를 동적으로 선택한다. 이 operator를 사용하면 특정 요일에 따라 다른 작업 흐름을 쉽게 구현할 수 있다.

4.1 특징

현재 요일이 지정된 week_day 매개변수와 동일한지 확인
일치 여부에 따라 follow_task_ids_if_true 또는 follow_task_ids_if_false에 지정된 task를 실행

4.2 활용

요일별로 다른 작업을 수행해야 할 경우

4.3 사용법

from airflow.operators.weekday import BranchDayOfWeekOperator from airflow.operators.dummy import DummyOperator monday_branch = BranchDayOfWeekOperator( task_id='monday_branch', follow_task_ids_if_true='monday_task', # week_day와 일치할 때 실행할 task ID follow_task_ids_if_false='others_task', # week_day와 일치하지 않을 때 실행할 task ID week_day='Monday', # 비교할 요일 (예: 'Monday', 'Tuesday' 등으로 정확히 입력) use_task_execution_day=True, # True: task 실행 날짜를 기준으로 요일 확인 (기본값: False) dag=dag ) monday_task = DummyOperator(task_id='monday_task', dag=dag) others_task = DummyOperator(task_id='others_task', dag=dag) # task relations monday_branch >> [monday_task, others_task]
Python
복사

5. BranchDateTimeOperator

BranchDateTimeOperator는 현재 시각에 따라 실행할 task를 분기한다.

5.1 특징

현재 시각이 target_lower와 target_upper 매개변수 사이의 시각인지에 따라 실행할 task 분기

5.2 활용

특정 시간대에 따른 작업 분기에 유용

5.3 사용법

from airflow.operators.datetime import BranchDateTimeOperator from datetime import time time_branch = BranchDateTimeOperator( task_id='time_branch', follow_task_ids_if_true='business_hours_task', follow_task_ids_if_false='after_hours_task', target_lower=time(9, 0), target_upper=time(17, 0), dag=dag )
Python
복사

6. task 조건 분기시 주의사항

6.1 트리거 규칙(Trigger Rules) 설정

Task 조건분기시, 후속 task의 트리거 규칙 기본값은 all_success로, 모든 상위 작업이 성공해야 하위 작업이 실행된다. 따라서 분기 후에 task를 재결합할 때는 필수로 trigger rules를 적절하게 설정해야 한다.
설정 방법
task의 매개변수로 전달
주요 Trigger Rules
all_success (기본값): 모든 상위 task가 성공해야 실행
all_failed: 모든 상위 task가 실패해야 실행
all_done: 모든 상위 task가 완료되면 실행 (성공/실패 무관)
one_success: 최소 하나의 상위 task가 성공하면 실행
one_failed: 최소 하나의 상위 task가 실패하면 실행
none_failed: 실패한 상위 task가 없으면 실행 (일부 skipped 허용)
none_failed_or_skipped: 실패나 skipped된 상위 task가 없으면 실행
none_skipped: skipped된 상위 task가 없으면 실행
예시
from airflow.operators.dummy import DummyOperator join_task = DummyOperator( task_id='join_task', trigger_rule='one_success', dag=dag )
Python
복사

6.2 기타

분기 로직을 잘 테스트하고 로깅하여 오작동하지 않도록 한다.
과도하게 복잡한 분기는 지양하고, Task Group을 활용하거나 독립적인 DAG으로 분리한다.
→ Task Group에 대해서는 더 알아보기
지금까지 살펴본 다섯가지 operator의 특징을 간단히 정리해보면 아래와 같다.
운영자
반환 값
하위 작업 영향
사용 사례
ShortCircuitOperator
boolean
전체 하위 작업
조건부 전체 실행 차단
BranchPythonOperator
task_id (문자열)
선택된 작업만
다중 경로 분기 처리
BranchSQLOperator
SQL 결과 기반
선택된 작업만
DB 기반 조건 분기
BranchDayOfWeekOperator
요일 조건
선택된 작업만
요일별 스케줄링
BranchDateTimeOperator
시간 범위 조건
선택된 작업만
시간대별 작업 처리
이 operator들을 사용하면, Airflow DAG을 만들 때 복잡한 조건부 로직을 효과적으로 구현할 수 있게 된다.
아직 나도 업무에서 이중에서 몇가지만 사용해봤는데, 상황과 필요에 따라 적절한 operator를 선택해서 사용하면 좋을 것 같다.