개발중632 Apache Airflow에서의 Task Group 활용법 Apache Airflow는 복잡한 데이터 워크플로우를 자동화하는 데 매우 유용한 도구입니다. 이 글에서는 Airflow의 Task Group을 효과적으로 활용하는 방법에 대해 알아보겠습니다. 1. Task Group이란?Task Group은 여러 태스크를 논리적으로 묶어주는 역할을 합니다. 이를 통해 복잡한 워크플로우를 더 작은 단위로 나누어 관리할 수 있습니다. Task Group 내의 태스크들은 순차적 또는 병렬로 실행될 수 있으며, >> 또는 >>> 연산자를 사용하여 태스크 간의 종속성을 정의할 수 있습니다. 2. Task Group의 사용 방법Step 1: TaskGroup 객체 생성여기서 dag은 이미 정의된 DAG 객체입니다. from airflow.models import TaskGroup.. 개발중/AirFlow 2025. 4. 24. [Airflow] 🚀 고객사별 병렬 실행 + 내부 순차 작업 처리하기 Airflow에서 다수의 고객사에 대해 각각 배치를 수행하는 경우, "고객사 간은 병렬로 처리하면서, 고객사 내부의 작업은 순차로 실행"되는 구조가 필요할 수 있습니다. ✨ 목표 시나리오고객사별 작업 목록1 번 고객사에 1,2,3,4 작업이 대기중이다.13 번 고객사에 10,20 작업이 대기중이다. 14 번 고객사에 100,200 작업이 대기중이다. { 1: [1, 2, 3, 4], 13: [10, 20], 14: [100, 200],} 기대 실행 구조1번 고객사의 첫번째 작업과 13번 고객사의 첫번째 작업과 100번 고객사의 첫번째 작업을 병렬처리 한다.각 고객사의 첫번째 작업이 끝나면 고객사별 다음 작업을 순차적으로 실행한다.Company_Batch_1_1 ➞ Company_Batch.. 개발중/AirFlow 2025. 4. 24. [Airflow] 🎯 Airflow에서 사용자별 DAG 접근 제어 + REST API로 계정 생성까지! 요즘 Airflow를 쓰면서 하나하나 세팅을 직접 해보고 있는데,그 중에서도 "사용자마다 볼 수 있는 DAG을 제한하고 싶다"는 니즈가 생겼다.그리고 아예 REST API로 계정도 자동으로 만들 수 있게 해보자! 라는 마음으로 삽질 시작! 🔐 사용자별 DAG 권한 제어는 어떻게 하지?Airflow는 Roles라는 개념으로 사용자 권한을 나눈다.기본적으로는 다음과 같다:Admin: 모든 DAG, 모든 권한 가능User: 할당된 DAG만 볼 수 있음(추가로 Viewer, Op, 커스텀 Role도 만들 수 있음)DAG에 접근 제어를 하려면 DAG 파일에 아래처럼 access_control을 명시해주면 된다. # dag_example.pyaccess_control = { "example_dag_id": .. 개발중/AirFlow 2025. 4. 21. [Airflow] DAG 의 Task 상태 Airflow Dag 에는 다양한 Task 상태가 존재함. deferred태스크가 어떤 특정 이벤트 또는 조건이 충족될 때까지 실행을 연기하는 상태를 나타냅니다. failed태스크가 실행 중 오류가 발생하여 실패한 상태입니다. 이 상태가 되면, 태스크의 재시도(retry) 설정에 따라 다시 시도될 수 있습니다. queued태스크가 실행될 준비가 되었지만, 실행할 수 있는 리소스를 기다리고 있는 상태입니다. removed해당 태스크가 삭제된 상태를 나타냅니다. 더 이상 DAG에서 사용되지 않거나 불필요한 태스크일 때 나타날 수 있습니다. restarting태스크가 실패한 후 재시작 중인 상태를 의미합니다. 재시도 설정에 따라 자동으로 재시작될 때 이 상태가 표시됩니다. running태스크가 현재 실행 중.. 개발중/AirFlow 2025. 4. 21. [Airflow] DAG 실행 🟡 DAG 저장소 위치 확인DAG 저장소는 검색하면 나오는데, 실제로 없을 수 있으니 없다면 생성하길 바람$ airflow config get-value core dags_folder/root/airflow/dags 🟡 DAG 저장소 위치 확인DAG 기본 저장소 위치 변경 가능 !/root/airflow/dags 경로 하위에 DOG 파일을 저장한다. 🟡 DAG 파일 생성from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime# DAG 정의with DAG( 'hello_world_dag', default_args={ 'owner': 'airflow', .. 개발중/AirFlow 2025. 4. 21. [Airflow] DAG Objuect 작성 방법 DOG 예제from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta# default_args 설정default_args = { 'owner': 'airflow', 'depends_on_past': False, 'email': ['example@example.com'], 'email_on_failure': True, 'retries': 3, 'retry_delay': timedelta(minutes=5), 'start_date': datetime(2023, 9, 1),}# DAG 정의with DAG( dag_id='examp.. 개발중/AirFlow 2025. 4. 21. [Airflow] DAG (Directed Acyclic Graph) 란 ? 🟡 특징Airflow의 핵심 개념으로, 작업의 흐름을 나타내는 그래프 구조입니다. Directed Acyclic Graph 라는 이름에서 알 수 있듯이, DAG는 방향성이 있는 그래프이며, 순환(cycle)이 없는 구조입니다.이 구조는 작업(Task) 간의 의존 관계를 정의하고, 어떻게 순차적으로 실행되는지를 결정합니다.DAG는 작업 간의 흐름과 의존성을 정의하는 데 사용되며, 각 작업은 테스크(Task) 로 표현됩니다.DAG 자체는 작업이 어떻게 서로 연결되어 있는지를 나타내고, Airflow는 이 DAG를 따라 작업을 실행합니다. 🟡 기본 구성 요소⚙ 작업(Task)DAG에서 실행해야 하는 하나의 개별 작업을 의미합니다.Python 함수, SQL 쿼리, Shell 명령어 등 다양한 형태로 정의할 .. 개발중/AirFlow 2025. 4. 21. [Airflow] Apache Airflow 특징 Apache Airflow 특징Apache Airflow는 워크플로우 관리 및 스케줄링 플랫폼으로, 데이터 파이프라인의 실행 및 자동화를 지원합니다. 주로 복잡한 데이터 파이프라인, ETL(Extract, Transform, Load) 작업을 관리하는 데 사용되며, 작업 간의 종속성을 쉽게 정의하고 실행할 수 있는 환경을 제공합니다.⚙ DAG (Directed Acyclic Graph) 기반 워크플로우Airflow에서 워크플로우는 **DAG(Directed Acyclic Graph, 방향성 비순환 그래프)**로 정의됩니다. DAG는 각 작업(Task)이 어떤 순서로 실행되어야 하는지 정의한 구조로, 작업 간의 종속성을 명확히 관리할 수 있습니다.⚙ Python으로 정의Airflow 워크플로우는 Pytho.. 개발중/AirFlow 2025. 4. 21. Airflow DAG에서 모듈 임포트가 안 될 때: 상대경로 vs 절대경로 해결 Airflow DAG를 만들면서 `PythonOperator`에서 내가 만든 파이썬 함수를 import하려고 했다. 근데 생각대로 되지 않았기에 기록을 남기려 한다. ㅠㅠ 예를 들어 이런 구조로 작업 중이었다. dags/ └── repo/ └── client-test/ └── soobin/ ├── tutorial.py ├── update_pending_records.py 처음에는 `tutorial.py`에서 상대경로로 import를 시도했었다.시도 1from .update_pending_records import update_pending_records 시도 2from ape.update_pending_records import update_pending_records 하지만 .. 개발중/AirFlow 2025. 4. 7. [Airflow] 아주 간단한 DAG 실행시키기 목적say_hello1 > say_hello2 > say_hello3 함수를 차례대로 실행시켜보자. say_hello1.pydef say_hello1(): print("Hello from say_hello1!") say_hello2.pydef say_hello2(): print("Hello from say_hello2!") say_hello1.pydef say_hello3(): print("Hello from say_hello3!") hello_airflow_test.pyfrom airflow import DAGfrom airflow.operators.python import PythonOperatorfrom datetime import datetimefrom say_hello1 imp.. 개발중/AirFlow 2025. 4. 3. [MYSQL] 이벤트 스케쥴러 (Event Scheduler) 사용법 🎉 안녕하세요!MySQL 이벤트 스케줄러를 사용하면, 정기적으로 반복되는 작업(예: 로그 정리, 데이터 백업, 오래된 레코드 업데이트 등)을 데이터베이스 내부에서 자동으로 실행할 수 있다고 해요!이번 포스팅에서는 이벤트 스케줄러를 조회, 생성, 삭제하는 방법과 이벤트 쿼리 작성법에 대해 알아보도록 할게요. 🚀 1. 이벤트 스케줄러 조회 🔍1-1. 이벤트 스케줄러 상태 확인먼저 이벤트 스케줄러가 활성화되어 있는지 확인해봅시다.MySQL 클라이언트에서 아래 명령어를 입력해 보세요.SHOW VARIABLES LIKE 'event_scheduler'; 결과가 ON이면 이벤트 스케줄러가 활성화된 상태예요. 만약 OFF라면, 다음 명령어로 활성화할 수 있어요. SET GLOBAL event_scheduler = .. 개발중/MYSQL 2025. 3. 26. Dockerfile 에서 Test 코드 실행하지 않고 Build 하는 방법 ( Gradlew, Maven ) Docker 이미지를 빌드할 때, 테스트 코드를 실행하지 않고 빠르게 빌드할 필요가 있을 때가 있습니다. 여기서는 Gradle Wrapper(gradlew)와 Maven을 사용하는 경우 각각 테스트를 건너뛰는 방법을 소개합니다.Gradle Wrapper (gradlew)Gradle로 프로젝트를 빌드할 때, 모든 테스트를 실행하지 않고 빌드를 진행하려면 -x test 옵션을 사용합니다. Dockerfile 내에서는 다음과 같이 명령어를 작성할 수 있습니다. RUN ./gradlew build -x test 위 명령어는 프로젝트의 빌드 작업을 수행하면서 테스트 태스크를 제외합니다. 이를 통해 테스트 실행에 소요되는 시간을 절약하고, 빠른 이미지 빌드가 가능합니다. MavenMaven 프로젝트의 경우, 테스트.. 개발중/Docker 2025. 3. 19. 이전 1 2 3 4 ··· 53 다음