개발중/AirFlow

[Airflow] 🚀 고객사별 병렬 실행 + 내부 순차 작업 처리하기

Binsoo 2025. 4. 24. 09:21
728x90
반응형

 

Airflow에서 다수의 고객사에 대해 각각 배치를 수행하는 경우,

"고객사 간은 병렬로 처리하면서, 고객사 내부의 작업은 순차로 실행"되는 구조가 필요할 수 있습니다.

 

✨ 목표 시나리오

고객사별 작업 목록

1 번 고객사에 1,2,3,4 작업이 대기중이다.

13 번 고객사에 10,20 작업이 대기중이다.

14 번 고객사에 100,200 작업이 대기중이다.

{
    1: [1, 2, 3, 4],
    13: [10, 20],
    14: [100, 200],
}

 

기대 실행 구조

1번 고객사의 첫번째 작업과 13번 고객사의 첫번째 작업과 100번 고객사의 첫번째 작업을 병렬처리 한다.각 고객사의 첫번째 작업이 끝나면 고객사별 다음 작업을 순차적으로 실행한다.

Company_Batch_1_1 ➞ Company_Batch_1_2 ➞ Company_Batch_1_3 ➞ Company_Batch_1_4
Company_Batch_13_10 ➞ Company_Batch_13_20
Company_Batch_14_100 ➞ Company_Batch_14_200

 

 

✨ 구현 코드

이 구조에서는 KubernetesPodOperator를 사용하여 각 작업마다 별도의 Pod를 생성하고, 각 Pod에서 독립적으로 프로세스를 실행합니다. ( image 변경 필수 )

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from datetime import datetime, timedelta

def get_customer_work_map():
    return {
        1: [1, 2, 3, 4],
        13: [10, 20],
        14: [100, 200],
    }

with DAG(
    dag_id="MULTI_STAGE_BATCH",
    schedule_interval=None,
    start_date=datetime(2025, 4, 1),
    catchup=False,
    default_args={
        "depends_on_past": False,
        "email": ["soobin@naver.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    },
    tags=["multi-customer"],
) as dag:

    customer_work_map = get_customer_work_map()

    for company_id, work_list in customer_work_map.items():
        prev_task = None

        for work_id in work_list:
            current_task = KubernetesPodOperator(
                task_id=f"Company_Batch_{company_id}_{work_id}",
                name=f"pod-{company_id}-{work_id}",
                namespace="client-test",
                image="soobin.com/batch:0.0.1", 
                env_vars={
                    "COMPANY_ID": str(company_id),
                    "WORK_ID": str(work_id),
                    "PROFILE": "pre-production"
                },
                get_logs=True,
                is_delete_operator_pod=True,
                labels={"project_name": "client-test"},
            )

            if prev_task:
                prev_task >> current_task

            prev_task = current_task
 

✅ 결과

  • 고객사 별로 작업이 순차적으로 이어지며 실행됨
  • 고객사 간에는 병렬로 실행됨
  • task_id를 기준으로 의존성 설정 (>>)을 통해 순서를 보장함

 

🚫 단점

문제설명

UI 가독성 낮음 DAG View에서 모든 작업이 한 화면에 펼쳐짐
고객사 단위 식별 어려움 작업명 규칙 없이는 구분이 힘듦
확장 어려움 고객사/작업 수가 늘어나면 복잡도 급증

 

🚀 다음 단계: TaskGroup으로 개선하기

  • 고객사별 작업을 하나의 그룹으로 시각화 가능
  • DAG UI에서 가독성이 대폭 향상됨
  • 유지보수가 쉬워지고 작업 단위로 관리 가능
  • 그룹 간 의존성도 설정 가능 (group_a >> group_b)

다음은 위와 동일한 DAG을 TaskGroup을 적용하여 리팩토링한 예제입니다.

 

✨ TaskGroup 적용 예제

✅ TaskGroup 버전의 장점 요약
  • DAG View에서 작업 흐름이 고객사 단위로 정리되어 한눈에 보기 쉬움
  • 그룹별 tooltip 등으로 문서화 및 관리 용이
  • 그룹 간 관계 정의 가능해 복잡한 워크플로우에도 확장 가능

TaskGroup은 단순한 시각화 도구 그 이상으로, DAG 유지보수성과 가독성을 높이는 핵심 도구입니다.

 

from airflow.utils.task_group import TaskGroup

with DAG(...):
    customer_work_map = get_customer_work_map()

    for company_id, work_list in customer_work_map.items():
        with TaskGroup(group_id=f"group_{company_id}", tooltip=f"Company {company_id} Tasks") as company_group:
            prev_task = None

            for work_id in work_list:
                current_task = KubernetesPodOperator(
                    task_id=f"cb_{company_id}_{work_id}",
                    name=f"pod-{company_id}-{work_id}",
                    namespace="client-test",
                    image="ai-harbor.quettai.com/lucy3/pre-insight-batch:100.0.4",
                    env_vars={
                        "COMPANY_ID": str(company_id),
                        "WORK_ID": str(work_id),
                        "PROFILE": "pre-production"
                    },
                    get_logs=True,
                    is_delete_operator_pod=True,
                    labels={"project_name": "client-test"},
                )

                if prev_task:
                    prev_task >> current_task
                prev_task = current_task

 

728x90
반응형