개발중/AirFlow

[Airflow] DAG Objuect 작성 방법

Binsoo 2025. 4. 21.
728x90
반응형

DOG 예제

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

# default_args 설정
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['example@example.com'],
    'email_on_failure': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2023, 9, 1),
}

# DAG 정의
with DAG(
    dag_id='example_dag',
    default_args=default_args,
    description='An example DAG',
    schedule_interval='@daily',
    catchup=False,
    max_active_runs=1,
    tags=['example', 'tutorial']
) as dag:

    # 태스크 정의
    hello_task = BashOperator(
        task_id='say_hello',
        bash_command='echo "Hello, World!"'
    )

 

DOG 인자

필수 인자

  1. dag_id:
    • DAG의 고유 식별자입니다.
    • 각 DAG마다 유일해야 하며, Airflow에서 이 ID를 기반으로 DAG를 식별합니다.
    • 예시: 'example_dag'
  2. start_date:
    • 이 DAG이 언제부터 실행될지를 결정하는 날짜입니다.
    • datetime 객체를 사용하여 지정하며, 종종 default_args에 설정됩니다.
    • 예시: start_date=datetime(2023, 9, 1)

선택 인자

  1. schedule_interval:
    • DAG이 얼마나 자주 실행될지를 결정합니다. 여러 가지 표현 방법이 있습니다.
      • @once: 한 번만 실행합니다.
      • @daily, @hourly, @weekly: Airflow에서 제공하는 예약어를 사용하여 주기를 간단히 설정할 수 있습니다.
      • 크론 표현식('0 12 * * *'): 좀 더 세밀한 스케줄을 설정할 수 있습니다.
    • 기본값은 None이며, 이 경우 DAG이 스케줄되지 않습니다.
  2. default_args:
    • DAG 내의 모든 태스크에 공통적으로 적용할 기본 매개변수의 딕셔너리입니다.
    • 여기에는 다음과 같은 인자들이 포함될 수 있습니다.
      • owner: 태스크 소유자를 나타냅니다.
      • depends_on_past: 이전 실행이 성공적으로 완료되어야 현재 실행을 할 수 있는지 여부를 나타냅니다. (기본값: False)
      • email: 태스크 실패 시 알림을 보낼 이메일 주소 목록입니다.
      • retries: 태스크 실패 시 재시도 횟수입니다.
      • retry_delay: 태스크 실패 후 재시도까지 기다리는 시간으로, timedelta 객체로 지정됩니다.
      • start_date: 위에서 설명한 DAG의 시작일입니다.
  3. catchup:
    • 기본값: True
    • True로 설정하면 start_date부터 현재까지의 모든 예약을 수행하려고 합니다.
    • False로 설정하면 현재 시점 이후부터만 DAG이 실행됩니다.
  4. tags:
    • DAG에 태그를 추가하여 여러 DAG을 그룹화하거나 쉽게 찾을 수 있도록 합니다.
    • 예시: tags=['data-pipeline', 'etl']
  5. max_active_runs:
    • 동시에 실행될 수 있는 최대 활성화된 DAG 실행 수입니다.
    • 예시: max_active_runs=1은 동시에 한 번만 DAG 실행이 가능하도록 제한합니다.
  6. description:
    • DAG에 대한 설명을 추가합니다.
    • 예시: description="This is an example DAG for data processing"
  7. concurrency:
    • 이 DAG에서 동시에 실행될 수 있는 최대 태스크 수입니다.
    • 예시: concurrency=4는 동시에 최대 4개의 태스크가 실행될 수 있도록 제한합니다.
  8. is_paused_upon_creation:
    • DAG이 처음 로드될 때 자동으로 일시 중지(pause) 상태가 되도록 설정합니다.
    • 기본값: True
    • 예시: is_paused_upon_creation=False는 DAG이 로드될 때 자동으로 실행되도록 합니다.
  9. params:
    • DAG에 파라미터를 전달할 때 사용됩니다.
    • 예시: params={"param1": "value1", "param2": "value2"}
728x90
반응형

댓글