개발중/AirFlow15 Apache Airflow에서의 Task Group 활용법 Apache Airflow는 복잡한 데이터 워크플로우를 자동화하는 데 매우 유용한 도구입니다. 이 글에서는 Airflow의 Task Group을 효과적으로 활용하는 방법에 대해 알아보겠습니다. 1. Task Group이란?Task Group은 여러 태스크를 논리적으로 묶어주는 역할을 합니다. 이를 통해 복잡한 워크플로우를 더 작은 단위로 나누어 관리할 수 있습니다. Task Group 내의 태스크들은 순차적 또는 병렬로 실행될 수 있으며, >> 또는 >>> 연산자를 사용하여 태스크 간의 종속성을 정의할 수 있습니다. 2. Task Group의 사용 방법Step 1: TaskGroup 객체 생성여기서 dag은 이미 정의된 DAG 객체입니다. from airflow.models import TaskGroup.. 개발중/AirFlow 2025. 4. 24. [Airflow] 🚀 고객사별 병렬 실행 + 내부 순차 작업 처리하기 Airflow에서 다수의 고객사에 대해 각각 배치를 수행하는 경우, "고객사 간은 병렬로 처리하면서, 고객사 내부의 작업은 순차로 실행"되는 구조가 필요할 수 있습니다. ✨ 목표 시나리오고객사별 작업 목록1 번 고객사에 1,2,3,4 작업이 대기중이다.13 번 고객사에 10,20 작업이 대기중이다. 14 번 고객사에 100,200 작업이 대기중이다. { 1: [1, 2, 3, 4], 13: [10, 20], 14: [100, 200],} 기대 실행 구조1번 고객사의 첫번째 작업과 13번 고객사의 첫번째 작업과 100번 고객사의 첫번째 작업을 병렬처리 한다.각 고객사의 첫번째 작업이 끝나면 고객사별 다음 작업을 순차적으로 실행한다.Company_Batch_1_1 ➞ Company_Batch.. 개발중/AirFlow 2025. 4. 24. [Airflow] 🎯 Airflow에서 사용자별 DAG 접근 제어 + REST API로 계정 생성까지! 요즘 Airflow를 쓰면서 하나하나 세팅을 직접 해보고 있는데,그 중에서도 "사용자마다 볼 수 있는 DAG을 제한하고 싶다"는 니즈가 생겼다.그리고 아예 REST API로 계정도 자동으로 만들 수 있게 해보자! 라는 마음으로 삽질 시작! 🔐 사용자별 DAG 권한 제어는 어떻게 하지?Airflow는 Roles라는 개념으로 사용자 권한을 나눈다.기본적으로는 다음과 같다:Admin: 모든 DAG, 모든 권한 가능User: 할당된 DAG만 볼 수 있음(추가로 Viewer, Op, 커스텀 Role도 만들 수 있음)DAG에 접근 제어를 하려면 DAG 파일에 아래처럼 access_control을 명시해주면 된다. # dag_example.pyaccess_control = { "example_dag_id": .. 개발중/AirFlow 2025. 4. 21. [Airflow] DAG 의 Task 상태 Airflow Dag 에는 다양한 Task 상태가 존재함. deferred태스크가 어떤 특정 이벤트 또는 조건이 충족될 때까지 실행을 연기하는 상태를 나타냅니다. failed태스크가 실행 중 오류가 발생하여 실패한 상태입니다. 이 상태가 되면, 태스크의 재시도(retry) 설정에 따라 다시 시도될 수 있습니다. queued태스크가 실행될 준비가 되었지만, 실행할 수 있는 리소스를 기다리고 있는 상태입니다. removed해당 태스크가 삭제된 상태를 나타냅니다. 더 이상 DAG에서 사용되지 않거나 불필요한 태스크일 때 나타날 수 있습니다. restarting태스크가 실패한 후 재시작 중인 상태를 의미합니다. 재시도 설정에 따라 자동으로 재시작될 때 이 상태가 표시됩니다. running태스크가 현재 실행 중.. 개발중/AirFlow 2025. 4. 21. [Airflow] DAG 실행 🟡 DAG 저장소 위치 확인DAG 저장소는 검색하면 나오는데, 실제로 없을 수 있으니 없다면 생성하길 바람$ airflow config get-value core dags_folder/root/airflow/dags 🟡 DAG 저장소 위치 확인DAG 기본 저장소 위치 변경 가능 !/root/airflow/dags 경로 하위에 DOG 파일을 저장한다. 🟡 DAG 파일 생성from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime# DAG 정의with DAG( 'hello_world_dag', default_args={ 'owner': 'airflow', .. 개발중/AirFlow 2025. 4. 21. [Airflow] DAG Objuect 작성 방법 DOG 예제from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta# default_args 설정default_args = { 'owner': 'airflow', 'depends_on_past': False, 'email': ['example@example.com'], 'email_on_failure': True, 'retries': 3, 'retry_delay': timedelta(minutes=5), 'start_date': datetime(2023, 9, 1),}# DAG 정의with DAG( dag_id='examp.. 개발중/AirFlow 2025. 4. 21. [Airflow] DAG (Directed Acyclic Graph) 란 ? 🟡 특징Airflow의 핵심 개념으로, 작업의 흐름을 나타내는 그래프 구조입니다. Directed Acyclic Graph 라는 이름에서 알 수 있듯이, DAG는 방향성이 있는 그래프이며, 순환(cycle)이 없는 구조입니다.이 구조는 작업(Task) 간의 의존 관계를 정의하고, 어떻게 순차적으로 실행되는지를 결정합니다.DAG는 작업 간의 흐름과 의존성을 정의하는 데 사용되며, 각 작업은 테스크(Task) 로 표현됩니다.DAG 자체는 작업이 어떻게 서로 연결되어 있는지를 나타내고, Airflow는 이 DAG를 따라 작업을 실행합니다. 🟡 기본 구성 요소⚙ 작업(Task)DAG에서 실행해야 하는 하나의 개별 작업을 의미합니다.Python 함수, SQL 쿼리, Shell 명령어 등 다양한 형태로 정의할 .. 개발중/AirFlow 2025. 4. 21. [Airflow] Apache Airflow 특징 Apache Airflow 특징Apache Airflow는 워크플로우 관리 및 스케줄링 플랫폼으로, 데이터 파이프라인의 실행 및 자동화를 지원합니다. 주로 복잡한 데이터 파이프라인, ETL(Extract, Transform, Load) 작업을 관리하는 데 사용되며, 작업 간의 종속성을 쉽게 정의하고 실행할 수 있는 환경을 제공합니다.⚙ DAG (Directed Acyclic Graph) 기반 워크플로우Airflow에서 워크플로우는 **DAG(Directed Acyclic Graph, 방향성 비순환 그래프)**로 정의됩니다. DAG는 각 작업(Task)이 어떤 순서로 실행되어야 하는지 정의한 구조로, 작업 간의 종속성을 명확히 관리할 수 있습니다.⚙ Python으로 정의Airflow 워크플로우는 Pytho.. 개발중/AirFlow 2025. 4. 21. Airflow DAG에서 모듈 임포트가 안 될 때: 상대경로 vs 절대경로 해결 Airflow DAG를 만들면서 `PythonOperator`에서 내가 만든 파이썬 함수를 import하려고 했다. 근데 생각대로 되지 않았기에 기록을 남기려 한다. ㅠㅠ 예를 들어 이런 구조로 작업 중이었다. dags/ └── repo/ └── client-test/ └── soobin/ ├── tutorial.py ├── update_pending_records.py 처음에는 `tutorial.py`에서 상대경로로 import를 시도했었다.시도 1from .update_pending_records import update_pending_records 시도 2from ape.update_pending_records import update_pending_records 하지만 .. 개발중/AirFlow 2025. 4. 7. [Airflow] 아주 간단한 DAG 실행시키기 목적say_hello1 > say_hello2 > say_hello3 함수를 차례대로 실행시켜보자. say_hello1.pydef say_hello1(): print("Hello from say_hello1!") say_hello2.pydef say_hello2(): print("Hello from say_hello2!") say_hello1.pydef say_hello3(): print("Hello from say_hello3!") hello_airflow_test.pyfrom airflow import DAGfrom airflow.operators.python import PythonOperatorfrom datetime import datetimefrom say_hello1 imp.. 개발중/AirFlow 2025. 4. 3. [Airflow] Airflow 병렬 처리의 함정: Worker 분산 없이 처리하면? Airflow에서 병렬 처리를 구현할 때 많은 사람들이 성능이 크게 향상될 것으로 기대하지만, Worker 분산 처리가 설정되어 있지 않은 경우에는 오히려 순차 처리보다 더 느려질 수 있는 상황이 발생합니다. 이번 포스팅에서는 Airflow의 병렬 처리와 순차 처리의 테스트 결과를 비교하고, 그 이유를 살펴보겠습니다. 테스트 결과 비교Task 순차 처리전체 데이터 수300,000태스크별 데이터 수300,000전체 소요 시간1시간 56분 Task 병렬 처리전체 데이터 수300,000태스크별 데이터 수75,000전체 소요 시간2시간 1분 테스트 결과를 보면, 병렬 처리의 전체 소요 시간이 순차 처리보다 오히려 더 길어졌음을 알 수 있습니다. 병렬 처리를 했음에도 불구하고 성능이 기대만큼 향상되지 않은 이유는.. 개발중/AirFlow 2024. 10. 10. [AirFlow] 특정 Role 만 특정 Dag 권한주고 싶다. 난 Admin Role 만 읽고 수정 권한을 주고 싶다,from airflow import DAGfrom airflow.operators.dummy_operator import DummyOperatorfrom datetime import datetimewith DAG( 'TEST_1', default_args={'start_date': datetime(2023, 10, 7)}, schedule_interval='@daily', catchup=False, access_control={ 'Admin': {'can_read', 'can_edit'}, # Admin 역할에만 읽기 및 수정 권한 부여 },) as dag: start = DummyOperator(.. 개발중/AirFlow 2024. 10. 8. 이전 1 2 다음