Search

Airflow 왕왕기초

Created at
2023/03/03
Updated at
2023/03/04
Tags
Keywords
Airflow
3 more properties
이번 이직으로 오늘의집에 조인하게 되면서 AWS를 사용하는 데이터 인프라에 적응하고 있다. 특히 오늘의집에서는 DA도 필요에 따라 직접 마트를 생성하는 등 ETL 업무에 Airflow를 직접 사용하고 있어 공부하는 중이다.
오늘의집에서는 2020년 후반부터 데이터 플랫폼 개선 작업의 일환으로 Airflow를 도입했다고 하는데, 자세한 내용은 아래의 블로그를 참고할 것.
우선은 Airflow의 왕왕 기초적인 내용부터 정리해보려고 한다. Airflow에는 다양한 개념과 구성요소가 있어 한 눈에 파악하기 어려운 듯 보이지만, 공식 documentation과 다양한 글들을 참고하며 기초적인 부분을 이해해볼 수 있다.
목차

Airflow란?

Airbnb에서 개발한 워크플로우 관리 오픈소스 플랫폼
쉽게 말해, 일련의 일들을 순차적으로 진행시켜주는 작업관리자
주기적으로 실행되는 batch 작업에 적합
Python으로 워크플로우를 작성하고 스케쥴링, 모니터링 → “Workflows as code”
.py 파일 하나로 워크플로우 작성이 가능하다는 것이 장점

Airflow 이해하기

주요 개념

Workflow: 의존성으로 연결된 작업들의 집합
예를 들어 저녁 식사를 준비한다면, 메뉴를 결정한다 → 재료를 준비한다 → 재료를 손질한다 → 요리한다 와 같은 흐름. 뒤의 작업은 앞 단계 작업에 의존적이다.
DAG(Directed Acyclic Graph)
A DAG represents a workflow, a collection of tasks”
순환하지 않고(acyclic), 방향성을 갖고(directed) 진행되는 워크플로우 구조
실행하고 싶은 Task들의 관계와 dependency를 표현하고 있는 Task들의 모음
어떤 순서와 dependency로 실행할지, 어떤 스케줄로 실행할지에 대한 정보를 가지고 있음
DAG를 정확하게 정의하고 설정해야 원하는 Task들을 스케쥴에 맞게 정확하게 실행할 수 있음
Task: DAG를 이루는 단위 작업

구성 요소

Database: DAG과 Task들의 메타데이터 저장소
예를 들어 task status(queued, scheduled, running, success, failed, etc) 가 저장
Web server: Airflow의 웹 UI 서버로, 실행 중인 작업을 한 눈에 볼 수 있는 다양한 View 기능 제공
Scheduler: 모든 DAG과 Task를 모니터링 및 관리하고, 실행해야 할 Task를 스케줄링
Executor: 스케쥴러와 함께 동작하는 구성요소
status가 queued인 task를 확인하며 실제 어떤 리소스가 투입되어 실행이 될 것인지를 결정
Local Executor, Celery Executor, Kubernetes Executor 등이 흔히 사용
Worker: 실제 Task를 실행하는 구성요소
Executor 및 airflow.cfg 에 의해 작업 환경 구성이 완성
Operator: Task를 어떻게 실행시킬지 나타냄

간단한 동작 방식

사용자는 DAG를 만든다.
Airflow는 Python DAG을 읽고, 거기에 맞춰 Scheduler가 Task를 스케줄링한다.
Worker가 Task를 가져가 실행한다.
Task의 실행상태는 Database에 저장된다.
사용자는 UI를 통해서 각 Task의 실행 상태, 성공 여부 등을 확인하고 관리한다.

오퍼레이터

개념
각 DAG은 여러 Task로 이루어져 있다. Operator나 Sensor가 하나의 Task로 만들어지는데, Airflow는 기본적인 Task를 위해 다양한 Operator를 제공한다.
쉽게 말해 원하는 Task를 실행시키는 기계라고 이해하자. 오퍼레이터를 실행(세팅)하면 Task가 된다. 즉, 오퍼레이터에 특정 인풋과 조건을 넣어주면 특정 태스크(작업)가 된다.
오퍼레이터의 큰 분류
Action Operator : 간단한 연산 수행 오퍼레이터, airflow.operators 모듈 아래에 존재
Transfer Operator : 데이터를 옮기는 오퍼레이터, <출발>To<도착>Operator 형태
Sensor : Task를 실행시킬 Trigger(이벤트)를 기다리는 특별한 타입의 오퍼레이터 (예를 들어 어떤 폴더에 데이터가 쌓여지기를 기다린다든지, 요청에 대한 응답이 확인되기를 기다린다든지)
주요 오퍼레이터
PythonOperator : python 함수를 실행
BashOperator : bash command를 실행
EmailOperator : Email을 발송
MySqlOperator : sql 쿼리를 수행

DAG 만들기

DAG 만들기

~/airflow/dags 폴더 안에 {dag_name}.py 파일 생성. 이 파일에 하나의 DAG을 코딩
Task마다 사용할 방법이나 툴이 달라지고 그에 맞는 오퍼레이터를 사용해 DAG을 작성

DAG의 기본적인 구성

DAG 정의 파일 작성
DAG의 기본적인 틀, 실행 주기 등을 설정
DAG의 기본 설정(대그 아이디(유일하게), 대그를 돌릴 주기, 태그 등 설정)을 완성한다.
이후 DAG 작업의 오퍼레이터들을 만들고, 맨 마지막에 연결하는 코드를 작성
실행 주기와 DAG 시작 날짜 등 기본적인 정보를 설정
필요한 오퍼레이터를 어떤 순서로 연결할지 설계해서 하나의 DAG로 작성
맨 마지막에 Task들을 순서대로 연결해서 파이프라인을 구성해준다
다음은 간단한 bash_operator를 사용하는 DAG 파일의 구성 예시이다.
# import basic libraries from airflow import DAG from datetime import datetime import pendulum # import operators from airflow.operators.bash_operator import BashOperator # import sensors # local timezone kst = pendulum.timezone("Asia/Seoul") # BASE_DT 기준일자 base_dt = '{{ dag.timezone.convert(execution_date).strftime("%Y-%m-%d") }}' # DAG args 정의 default_args = { 'owner': '{owner_name}', 'depends_on_past': False, 'email': '{owner_email}', 'email_on_failure': True, 'email_on_retry': True, 'retries': 0 } # DAG 인스턴스화: DAG_ID와 default_args, description, schedule_interaval 등을 지정하여 DAG 인스턴스를 생성 dag = DAG( dag_id='{file명과 통일}', description='설명', tags=['{tag1}', '{tag2}'], default_args=default_args, max_active_runs=1, start_date=datetime(2023, 3, 15, tzinfo=kst), # Dag 생성 이후 바꾸면 안됨. 바꾸려면 Dag 새로 생성 or dag_id 변경 (kst) schedule_interval="5 10 * * 1", # 분 시 일 월 요일 catchup=True # start_date부터 backfill 여부 / True : 백필 / False: 백필 무시 ) # task 정의: operator를 이용해 task 정의 t1 = BashOperator( task_id='{task_id}', bash_command='{bash_command}', dag=dag ) # task relation: Task 파이프라인 구성 t1
Python
복사
DAG 정의 파일 구성
그런데 위와 같이 실제로 DAG을 세팅하려고 해보면, 무척 헷갈리는 것이 Airflow에 쓰이는 다양한 날짜 개념이다.
다음 포스팅에서는 Airflow의 날짜들에 대해 정리해보려고 한다.
참고자료