[Airflow] 🎯 Airflow에서 사용자별 DAG 접근 제어 + REST API로 계정 생성까지!
요즘 Airflow를 쓰면서 하나하나 세팅을 직접 해보고 있는데,
그 중에서도 "사용자마다 볼 수 있는 DAG을 제한하고 싶다"는 니즈가 생겼다.
그리고 아예 REST API로 계정도 자동으로 만들 수 있게 해보자! 라는 마음으로 삽질 시작!
🔐 사용자별 DAG 권한 제어는 어떻게 하지?
Airflow는 Roles라는 개념으로 사용자 권한을 나눈다.
기본적으로는 다음과 같다:
- Admin: 모든 DAG, 모든 권한 가능
- User: 할당된 DAG만 볼 수 있음
- (추가로 Viewer, Op, 커스텀 Role도 만들 수 있음)
DAG에 접근 제어를 하려면 DAG 파일에 아래처럼 access_control을 명시해주면 된다.
# dag_example.py
access_control = {
"example_dag_id": {"can_read": ["User"], "can_edit": ["Admin"]}
}
이 설정이 있으면 User 역할의 사용자는 읽기만 가능,
Admin은 편집까지 가능하다. 간단하면서도 강력한 구조다.
⚙️ 먼저 환경 설정: RBAC 활성화
Airflow 기본 설정 파일인 airflow.cfg에 가서 아래 옵션을 추가해준다.
rbac = True
이걸 켜야 UI에서 사용자/권한을 따로 나눌 수 있다.
Airflow 2.x부터는 RBAC 기반이 기본이라 안 켜도 되긴 하지만, 확실히 적어놓는 게 마음이 편하다.
🛠️ REST API로 사용자 만들기
Airflow는 사용자 계정을 REST API로도 관리할 수 있다.
아래는 내가 만든 계정을 curl로 등록한 예제다.
curl -X POST "http://localhost:8080/api/v1/users" \
-H "Content-Type: application/json" \
-H "Authorization: Basic <encoded_credentials>" \
-d '{
"username": "new_user",
"first_name": "New",
"last_name": "User",
"email": "new_user@example.com",
"role": "User",
"password": "user_password"
}'
여기서 <encoded_credentials>는 Base64로 인코딩한 admin:비밀번호 형식이다.
ex) echo -n 'admin:1234' | base64
🔄 사용자 조회/수정도 물론 가능
사용자 목록 조회
curl -X GET "http://localhost:8080/api/v1/users" \
-H "Authorization: Basic <encoded_credentials>"
역할(Role) 목록 조회
curl -X GET "http://localhost:8080/api/v1/roles" \
-H "Authorization: Basic <encoded_credentials>"
사용자 수정 (예: 역할 변경)
curl -X PATCH "http://localhost:8080/api/v1/users/{user_id}" \
-H "Content-Type: application/json" \
-H "Authorization: Basic <encoded_credentials>" \
-d '{
"first_name": "Updated",
"last_name": "User",
"role": "Admin"
}'
☕ Java로 호출하고 싶을 때는?
회사 시스템이 Java 기반이라면 아래처럼 HttpURLConnection을 써서 호출할 수도 있다.
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
public class AirflowUserCreator {
public static void main(String[] args) {
String airflowUrl = "http://localhost:8080/api/v1/users";
String adminUsername = "admin";
String adminPassword = "admin_password";
// New user details
String newUsername = "sbjung@xxxx.com";
String newPassword = "1234";
String firstName = "Soobin";
String lastName = "Jung";
String email = "sbjung@xxxx.com";
String roleName = "User"; // Role name
try {
// Airflow API URL 설정
URL url = new URL(airflowUrl);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
// POST 요청 설정
conn.setRequestMethod("POST");
conn.setRequestProperty("Content-Type", "application/json");
// Basic Authentication을 위한 Header 설정
String encodedCredentials = Base64.getEncoder().encodeToString(
(adminUsername + ":" + adminPassword).getBytes(StandardCharsets.UTF_8)
);
conn.setRequestProperty("Authorization", "Basic " + encodedCredentials);
// 요청 본문에 포함할 JSON 데이터 생성
String jsonInputString = String.format(
"{" +
"\"username\": \"%s\"," +
"\"password\": \"%s\"," +
"\"first_name\": \"%s\"," +
"\"last_name\": \"%s\"," +
"\"email\": \"%s\"," +
"\"roles\": [{\"name\": \"%s\"}]" +
"}",
newUsername, newPassword, firstName, lastName, email, roleName
);
// 요청 본문을 OutputStream에 작성
conn.setDoOutput(true);
try (OutputStream os = conn.getOutputStream()) {
byte[] input = jsonInputString.getBytes(StandardCharsets.UTF_8);
os.write(input, 0, input.length);
}
// 응답 코드 확인
int responseCode = conn.getResponseCode();
if (responseCode == HttpURLConnection.HTTP_CREATED) {
System.out.println("User created successfully!");
} else {
System.out.println("Failed to create user. Response code: " + responseCode);
try (java.util.Scanner s = new java.util.Scanner(conn.getErrorStream()).useDelimiter("\\A")) {
String responseBody = s.hasNext() ? s.next() : "";
System.out.println("Response content: " + responseBody);
}
}
// 연결 종료
conn.disconnect();
} catch (Exception e) {
e.printStackTrace();
}
}
}
(→ 코드 전문은 따로 첨부해도 좋고, 깃허브 링크를 걸어도 좋음)
🐍 Python으로 자동화할 땐 이렇게!
Python은 requests와 HTTPBasicAuth만 있으면 끝.
import requests
from requests.auth import HTTPBasicAuth
import json
def create_airflow_user():
# Airflow API URL 및 관리자 계정 정보
airflow_url = "http://localhost:8080/api/v1/users"
admin_username = "sbjung@XXXX.com"
admin_password = "1234"
# 새로운 사용자 정보
new_username = "sbjung@XXXX.com"
new_password = "1234"
first_name = "Soobin"
last_name = "Jung"
email = "sbjung@XXXX.com"
roles = [{"name": "User"}] # 역할 설정 (예: "Admin", "User")
# 요청에 사용할 JSON 데이터 생성
payload = {
"username": new_username,
"password": new_password,
"first_name": first_name,
"last_name": last_name,
"email": email,
"roles": roles
}
try:
# 새로운 사용자 생성을 위한 POST 요청 전송
response = requests.post(
airflow_url,
auth=HTTPBasicAuth(admin_username, admin_password), # Basic 인증을 위한 관리자 계정 정보
headers={"Content-Type": "application/json"}, # 요청 헤더에 Content-Type 설정
data=json.dumps(payload) # JSON 데이터를 문자열로 변환하여 전송
)
# 응답 코드 확인
if response.status_code == 201:
print("User created successfully!") # 사용자 생성 성공 시 출력
else:
print(f"Failed to create user. Response code: {response.status_code}") # 실패 시 응답 코드 출력
print("Response content:", response.content) # 응답 내용 출력
except Exception as e:
print(f"An error occurred: {e}") # 예외 발생 시 오류 내용 출력
# 함수를 호출하여 사용자 생성 실행
create_airflow_user()
자동 계정 생성이 필요한 팀 환경이라면 간단하게 스크립트로 짜두면 편하다.
✍️ 정리하며
이번 작업을 하면서 느낀 건, Airflow는 RBAC 구조만 이해하면 권한 제어도 깔끔하고, 계정 관리도 API로 확장 가능하다는 점이 마음에 들었다. 개인적으로는 UI로 일일이 클릭하는 것보다, 자동화 가능한 API가 있다는 게 제일 큰 장점이었다.