개발중/AirFlow
[Airflow] 🚀 고객사별 병렬 실행 + 내부 순차 작업 처리하기
Binsoo
2025. 4. 24. 09:21
728x90
반응형
Airflow에서 다수의 고객사에 대해 각각 배치를 수행하는 경우,
"고객사 간은 병렬로 처리하면서, 고객사 내부의 작업은 순차로 실행"되는 구조가 필요할 수 있습니다.
✨ 목표 시나리오
고객사별 작업 목록
1 번 고객사에 1,2,3,4 작업이 대기중이다.
13 번 고객사에 10,20 작업이 대기중이다.
14 번 고객사에 100,200 작업이 대기중이다.
{
1: [1, 2, 3, 4],
13: [10, 20],
14: [100, 200],
}
기대 실행 구조
1번 고객사의 첫번째 작업과 13번 고객사의 첫번째 작업과 100번 고객사의 첫번째 작업을 병렬처리 한다.각 고객사의 첫번째 작업이 끝나면 고객사별 다음 작업을 순차적으로 실행한다.
Company_Batch_1_1 ➞ Company_Batch_1_2 ➞ Company_Batch_1_3 ➞ Company_Batch_1_4
Company_Batch_13_10 ➞ Company_Batch_13_20
Company_Batch_14_100 ➞ Company_Batch_14_200
✨ 구현 코드
이 구조에서는 KubernetesPodOperator를 사용하여 각 작업마다 별도의 Pod를 생성하고, 각 Pod에서 독립적으로 프로세스를 실행합니다. ( image 변경 필수 )
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from datetime import datetime, timedelta
def get_customer_work_map():
return {
1: [1, 2, 3, 4],
13: [10, 20],
14: [100, 200],
}
with DAG(
dag_id="MULTI_STAGE_BATCH",
schedule_interval=None,
start_date=datetime(2025, 4, 1),
catchup=False,
default_args={
"depends_on_past": False,
"email": ["soobin@naver.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
},
tags=["multi-customer"],
) as dag:
customer_work_map = get_customer_work_map()
for company_id, work_list in customer_work_map.items():
prev_task = None
for work_id in work_list:
current_task = KubernetesPodOperator(
task_id=f"Company_Batch_{company_id}_{work_id}",
name=f"pod-{company_id}-{work_id}",
namespace="client-test",
image="soobin.com/batch:0.0.1",
env_vars={
"COMPANY_ID": str(company_id),
"WORK_ID": str(work_id),
"PROFILE": "pre-production"
},
get_logs=True,
is_delete_operator_pod=True,
labels={"project_name": "client-test"},
)
if prev_task:
prev_task >> current_task
prev_task = current_task
✅ 결과
- 고객사 별로 작업이 순차적으로 이어지며 실행됨
- 고객사 간에는 병렬로 실행됨
- task_id를 기준으로 의존성 설정 (>>)을 통해 순서를 보장함
🚫 단점
문제설명
UI 가독성 낮음 | DAG View에서 모든 작업이 한 화면에 펼쳐짐 |
고객사 단위 식별 어려움 | 작업명 규칙 없이는 구분이 힘듦 |
확장 어려움 | 고객사/작업 수가 늘어나면 복잡도 급증 |
🚀 다음 단계: TaskGroup으로 개선하기
- 고객사별 작업을 하나의 그룹으로 시각화 가능
- DAG UI에서 가독성이 대폭 향상됨
- 유지보수가 쉬워지고 작업 단위로 관리 가능
- 그룹 간 의존성도 설정 가능 (group_a >> group_b)
다음은 위와 동일한 DAG을 TaskGroup을 적용하여 리팩토링한 예제입니다.
✨ TaskGroup 적용 예제
✅ TaskGroup 버전의 장점 요약
- DAG View에서 작업 흐름이 고객사 단위로 정리되어 한눈에 보기 쉬움
- 그룹별 tooltip 등으로 문서화 및 관리 용이
- 그룹 간 관계 정의 가능해 복잡한 워크플로우에도 확장 가능
TaskGroup은 단순한 시각화 도구 그 이상으로, DAG 유지보수성과 가독성을 높이는 핵심 도구입니다.
from airflow.utils.task_group import TaskGroup
with DAG(...):
customer_work_map = get_customer_work_map()
for company_id, work_list in customer_work_map.items():
with TaskGroup(group_id=f"group_{company_id}", tooltip=f"Company {company_id} Tasks") as company_group:
prev_task = None
for work_id in work_list:
current_task = KubernetesPodOperator(
task_id=f"cb_{company_id}_{work_id}",
name=f"pod-{company_id}-{work_id}",
namespace="client-test",
image="ai-harbor.quettai.com/lucy3/pre-insight-batch:100.0.4",
env_vars={
"COMPANY_ID": str(company_id),
"WORK_ID": str(work_id),
"PROFILE": "pre-production"
},
get_logs=True,
is_delete_operator_pod=True,
labels={"project_name": "client-test"},
)
if prev_task:
prev_task >> current_task
prev_task = current_task
728x90
반응형