개발중/AirFlow

[Airflow] 🎯 Airflow에서 사용자별 DAG 접근 제어 + REST API로 계정 생성까지!

Binsoo 2025. 4. 21. 13:49
728x90
반응형

 

요즘 Airflow를 쓰면서 하나하나 세팅을 직접 해보고 있는데,
그 중에서도 "사용자마다 볼 수 있는 DAG을 제한하고 싶다"는 니즈가 생겼다.
그리고 아예 REST API로 계정도 자동으로 만들 수 있게 해보자! 라는 마음으로 삽질 시작!

 

🔐 사용자별 DAG 권한 제어는 어떻게 하지?

Airflow는 Roles라는 개념으로 사용자 권한을 나눈다.
기본적으로는 다음과 같다:

  • Admin: 모든 DAG, 모든 권한 가능
  • User: 할당된 DAG만 볼 수 있음
  • (추가로 Viewer, Op, 커스텀 Role도 만들 수 있음)

DAG에 접근 제어를 하려면 DAG 파일에 아래처럼 access_control을 명시해주면 된다.

 

# dag_example.py
access_control = {
    "example_dag_id": {"can_read": ["User"], "can_edit": ["Admin"]}
}

 

이 설정이 있으면 User 역할의 사용자는 읽기만 가능,
Admin은 편집까지 가능하다. 간단하면서도 강력한 구조다.

 

⚙️ 먼저 환경 설정: RBAC 활성화

Airflow 기본 설정 파일인 airflow.cfg에 가서 아래 옵션을 추가해준다.

rbac = True
 

이걸 켜야 UI에서 사용자/권한을 따로 나눌 수 있다.
Airflow 2.x부터는 RBAC 기반이 기본이라 안 켜도 되긴 하지만, 확실히 적어놓는 게 마음이 편하다.

 

🛠️ REST API로 사용자 만들기

Airflow는 사용자 계정을 REST API로도 관리할 수 있다.
아래는 내가 만든 계정을 curl로 등록한 예제다.

 

curl -X POST "http://localhost:8080/api/v1/users" \
  -H "Content-Type: application/json" \
  -H "Authorization: Basic <encoded_credentials>" \
  -d '{
        "username": "new_user",
        "first_name": "New",
        "last_name": "User",
        "email": "new_user@example.com",
        "role": "User",
        "password": "user_password"
      }'

 

 

여기서 <encoded_credentials>는 Base64로 인코딩한 admin:비밀번호 형식이다.
ex) echo -n 'admin:1234' | base64

 

🔄 사용자 조회/수정도 물론 가능

사용자 목록 조회

curl -X GET "http://localhost:8080/api/v1/users" \
  -H "Authorization: Basic <encoded_credentials>"

 

역할(Role) 목록 조회

curl -X GET "http://localhost:8080/api/v1/roles" \
  -H "Authorization: Basic <encoded_credentials>"

 

 

사용자 수정 (예: 역할 변경)

curl -X PATCH "http://localhost:8080/api/v1/users/{user_id}" \
  -H "Content-Type: application/json" \
  -H "Authorization: Basic <encoded_credentials>" \
  -d '{
        "first_name": "Updated",
        "last_name": "User",
        "role": "Admin"
      }'

 

☕ Java로 호출하고 싶을 때는?

회사 시스템이 Java 기반이라면 아래처럼 HttpURLConnection을 써서 호출할 수도 있다.

 

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class AirflowUserCreator {
    public static void main(String[] args) {
        String airflowUrl = "http://localhost:8080/api/v1/users";
        String adminUsername = "admin";
        String adminPassword = "admin_password";
        
        // New user details
        String newUsername = "sbjung@xxxx.com";
        String newPassword = "1234";
        String firstName = "Soobin";
        String lastName = "Jung";
        String email = "sbjung@xxxx.com";
        String roleName = "User"; // Role name

        try {
            // Airflow API URL 설정
            URL url = new URL(airflowUrl);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            
            // POST 요청 설정
            conn.setRequestMethod("POST");
            conn.setRequestProperty("Content-Type", "application/json");
            
            // Basic Authentication을 위한 Header 설정
            String encodedCredentials = Base64.getEncoder().encodeToString(
                (adminUsername + ":" + adminPassword).getBytes(StandardCharsets.UTF_8)
            );
            conn.setRequestProperty("Authorization", "Basic " + encodedCredentials);
            
            // 요청 본문에 포함할 JSON 데이터 생성
            String jsonInputString = String.format(
                "{" +
                    "\"username\": \"%s\"," +
                    "\"password\": \"%s\"," +
                    "\"first_name\": \"%s\"," +
                    "\"last_name\": \"%s\"," +
                    "\"email\": \"%s\"," +
                    "\"roles\": [{\"name\": \"%s\"}]" +
                "}",
                newUsername, newPassword, firstName, lastName, email, roleName
            );

            // 요청 본문을 OutputStream에 작성
            conn.setDoOutput(true);
            try (OutputStream os = conn.getOutputStream()) {
                byte[] input = jsonInputString.getBytes(StandardCharsets.UTF_8);
                os.write(input, 0, input.length);
            }
            
            // 응답 코드 확인
            int responseCode = conn.getResponseCode();
            if (responseCode == HttpURLConnection.HTTP_CREATED) {
                System.out.println("User created successfully!");
            } else {
                System.out.println("Failed to create user. Response code: " + responseCode);
                try (java.util.Scanner s = new java.util.Scanner(conn.getErrorStream()).useDelimiter("\\A")) {
                    String responseBody = s.hasNext() ? s.next() : "";
                    System.out.println("Response content: " + responseBody);
                }
            }
            
            // 연결 종료
            conn.disconnect();
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
 

(→ 코드 전문은 따로 첨부해도 좋고, 깃허브 링크를 걸어도 좋음)

 

🐍 Python으로 자동화할 땐 이렇게!

Python은 requests와 HTTPBasicAuth만 있으면 끝.

import requests
from requests.auth import HTTPBasicAuth
import json

def create_airflow_user():
    # Airflow API URL 및 관리자 계정 정보
    airflow_url = "http://localhost:8080/api/v1/users"
    admin_username = "sbjung@XXXX.com"
    admin_password = "1234"
    
    # 새로운 사용자 정보
    new_username = "sbjung@XXXX.com"
    new_password = "1234"
    first_name = "Soobin"
    last_name = "Jung"
    email = "sbjung@XXXX.com"
    roles = [{"name": "User"}]  # 역할 설정 (예: "Admin", "User")

    # 요청에 사용할 JSON 데이터 생성
    payload = {
        "username": new_username,
        "password": new_password,
        "first_name": first_name,
        "last_name": last_name,
        "email": email,
        "roles": roles
    }

    try:
        # 새로운 사용자 생성을 위한 POST 요청 전송
        response = requests.post(
            airflow_url,
            auth=HTTPBasicAuth(admin_username, admin_password),  # Basic 인증을 위한 관리자 계정 정보
            headers={"Content-Type": "application/json"},  # 요청 헤더에 Content-Type 설정
            data=json.dumps(payload)  # JSON 데이터를 문자열로 변환하여 전송
        )
        
        # 응답 코드 확인
        if response.status_code == 201:
            print("User created successfully!")  # 사용자 생성 성공 시 출력
        else:
            print(f"Failed to create user. Response code: {response.status_code}")  # 실패 시 응답 코드 출력
            print("Response content:", response.content)  # 응답 내용 출력
    
    except Exception as e:
        print(f"An error occurred: {e}")  # 예외 발생 시 오류 내용 출력

# 함수를 호출하여 사용자 생성 실행
create_airflow_user()
 

자동 계정 생성이 필요한 팀 환경이라면 간단하게 스크립트로 짜두면 편하다.

 

✍️ 정리하며

이번 작업을 하면서 느낀 건, Airflow는 RBAC 구조만 이해하면 권한 제어도 깔끔하고, 계정 관리도 API로 확장 가능하다는 점이 마음에 들었다. 개인적으로는 UI로 일일이 클릭하는 것보다, 자동화 가능한 API가 있다는 게 제일 큰 장점이었다.

728x90
반응형