오늘은 어제에 이어서 airflow에서 도커와 연동하여 여러 서비스로 운영하는 msa 기반 파이프라인 구성 실습이 있었다.
MSA 기반의 데이터 파이프라인 구성
- MSA
- **Microservice Architecture(마이크로서비스 아키텍처)**의 약자로, 하나의 거대한 애플리케이션을 작고 독립적인 여러 서비스로 나누어 개발하고 운영하는 소프트웨어 설계 방식
- 특징
- 모놀리식(Monolithic) 아키텍처의 단점을 극복하고, 서비스별 독립적인 배포, 빠른 개발, 기술 유연성, 확장성 향상 등의 장점을 제공하여 현대 클라우드 환경에서 중요한 개발 패러다임으로 자리 잡고 있음
- MSA <-> Monolithic
- Monolithic : 모든 코드, 서비스가 한개의 프로젝트 상에서 운용 (모두 내부에 존재함)
구성
- 구성
- Airflow : 직접 일하지 않는다. 지휘자처럼 명령만 내림(오케스트레이터)
- Microservice
- 각자 역할만 수행하는 독립적 컨테이너 구성
- 수집, 전처리, 추론, 학습 ...
- Airflow는 이를 독립적 컨테이너에게 API 호출을 통해서 일을 시킴
- 각자 역할만 수행하는 독립적 컨테이너 구성
- 실습
- AI 신용 점수 분석 시스템 (신용 평가 반영)
- 컨셉
- 실시간은 아님, 하루(일주일 등등)동안 수집된 고객 데이터를 기반으로 추후(정해진 시간)에 일괄적으로 처리해버림(스케줄링) -> 데이터 -> 전처리 -> 추론 -> 고객 데이터 반영(0~1000점 or 편의상 등급으로 표기등)
- 배치 추론(Batch Inference)
- 금융거래 금지 (23:30~00:30)
- 더미 데이터 구성
- DAG
- 배치 작업으로 신용 점수 업그레이드 진행
- DAG
- API 호출 (추론모델에게 고객 데이터 전송 등등 -> 추론 -> 평가 -> 응답)
- 결과 -> DB 업데이트
- DB 업데이트
- MSA
- 기타 추론 모델등은 가상(임의구성)으로 구성
- 현재 : 모델이 있다고 전제하고 요청 -> 추론
- 향후 : 모델 학습까지 확장 가능함
- 기타 추론 모델등은 가상(임의구성)으로 구성
AI 마이크로 서비스 (FastAPI)
- 파일 구성 및 작성
/
L api_server
L main.py
L Dockerfile
- 도커 컴포즈 서비스 등록
- 서비스 추가
# 기존 등록된 서비스중 가장 밑에 추가
ai-api-server:
build: ./api_server # 하위에 Dockerfile을 참고하여 빌드
container_name: airflow-class-api
ports:
- "8000:8000" # 8000번 포트 개방
networks:
- default # airflow와 같은 네트워크로 구성(통신)
- 도커 컴포즈 구성
docker compose uo -d --build
- 구동 테스트
Dockerfile
FROM python:3.12.12-slim
# 작업 디렉토리 지정(루트가됨)
WORKDIR /app
# 필요 패키지 설치
RUN pip install fastapi uvicorn pydantic
# 소스코드 복사
COPY main.py .
# 서버 실행
CMD [ "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000" ]
최소한으로 필요한 내용만 있다.
도커 컴포즈시에 위 기능으로 컨테이너가 만들어진다.
Docker-compose.yaml
ai-api-server:
build: ./api_server # 하위에 Dockerfile을 참고하여 빌드
container_name: airflow-class-api
ports:
- "8000:8000" # 8000번 포트 개방
networks:
- default # airflow와 같은 네트워크로 구성(통신)
위 내용을 추가하여 도커파일을 참고로 컨테이너를 만들게 명령했다.
main.py
# 0. 요구사항 정의, 모듈 설명
'''
MSA구조상 신용평가 업무만 담당하는 API
추론 요청(데이터 포함)-> 추론 -> 결과 돌려줌
요청시 데이터
- 1명 ~ (*)여러명 요청 -> [ 개별정보(Pydantic 정의), 개별정보, ..]
응답 데이터
- 1명 ~ (*)여러명 응답 -> [ 평가정보(Pydantic 정의), 평가정보, ...]
신용 점수 평가 모델은 -> 더미 (간단한 공식(수식)으로 진행)
-> 향후 실제모델로 교체하면됨(모델 엔트리포인트(API) 호출)
'''
# 1. 모듈 가져오기
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List
import random
# 2. 앱 생성
app = FastAPI()
# 3. 요청/응답시 전달되는(하는) 데이터 형태 + 유효성 자동 검사 =>pydantic
class ReqData(BaseModel): # 요청
# 사용자 아이디, 소득, 대출총량
user_id: str
income: int
loan_amt: int
pass
class ResData(BaseModel): # 응답
# 사용자 아이디, 신용점수, 등급
user_id: str
credit_score: int
grade: str
pass
# 4. 라우팅
## 홈 -> 헬시체크(서비스가 살아 있는지 주기적 요청)용도로 활용 가능
@app.get('/')
def home():
return {'status':"AI Predict Server is Running"}
## 신용예측
@app.post("/predict", response_model=List[ResData])
def predict(users: List[ReqData]):
'''
AI 모델(더미)을 이용한 신용평가 서비스
:param uses: n명의 사용자의 정보(아이디, 소득, 대출양)
:type uses: List[ReqData]
'''
# AI 모델이 없으므로 가상의 계산식으로 평가
# 식1 = (소득 // 1000) * 10
# credit_score = 식2 = min( 난수값(300, 600) + 식1, 990 )
# grade = 식2 > 800 크면 "A"등급, > 600 크면 "B"등급, 나머지는 "C"등급
# 결과물 : [ {user_id:xx, credit_score:xx, grade:xx }, {}, .... ]
# 위의 요구사항대로 구현하시오
results = list()
for user in users:
# 더미 계산
식1 = (user.income // 1000) * 10
credit_score = min( random.randint(300, 600) + 식1, 990 )
grade = "A" if credit_score>800 else "B" if credit_score>600 else "C"
# 결과 담기
results.append({
"user_id":user.user_id,
"credit_score":credit_score,
"grade":grade
})
return results
백엔드를 담당하는 main.py에서는 더미데이터의 값을 계산해서 신용 정보를 평가하는 내용만 넣어준다, 그리고 그 결과를 results에 담아준다.
07_msa_pattern.py
# 1. 모듈 가져오기
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import logging
import json
import requests # MSA 서비스 호출 (HTTP 요청)
# mysql
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
# 2. 도커 상 네트워크 통해 접근할 API 주소
# url: 서비스명 => ai-api-server
API_URL = "http://ai-api-server:8000/predict"
# 3. PythonOperator를 위한 함수
def _extract_data_msa(**kwargs):
# 더미 데미터 수동 구성 -> (업그레이드) sql 쿼리에서 조회
users = [
{ "user_id":"C001", "income": 5000, "loan_amt": 2000 },
{ "user_id":"C002", "income": 3000, "loan_amt": 5000 },
{ "user_id":"C003", "income": 8000, "loan_amt": 1000 }
]
# 차후에는 고객 테이블 구성-> 더미로 입력(신용평가 여부 랜덤구성)
# 반환
return users
def _ai_service_api_msa(**kwargs):
# API 호출 (재료는 XCom 통해 통신)
# 1. XCom 통해 신용평가를 수행하고자 하는 고객 데이터 획득
ti = kwargs['ti']
users_data = ti.xcom_pull(task_ids='extract_data_msa')
# 2. 외부에 존재하는 MSA(컨셉) API 호출(실제 연산은 외부에서 진행된다)
try:
res = requests.post( API_URL, json=users_data )
#res.raise_for_status() # 200에 대한 점검 필요하면 진행(생략)
results = res.json() # 결과 획득
logging.info(f'신용 평가 결과 획득 {results}')
# 결과를 XCom 에서 획득 가는하게 반환
return results
except Exception as e:
logging.error(f'API 호출 실패 {e}')
raise
pass
def _load_data_msa(**kwargs):
# SQL 이용하여 DB 저장
# 신용 평가 결과 획득 XCom 사용
ti = kwargs['ti']
users_data = ti.xcom_pull(task_ids='ai_service_api_msa')
if not users_data:
logging.error("신용 평가 결과 없음")
raise ValueError('신용 평가 결과 없음')
# SQL 저장
mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
conn = mysql_hook.get_conn() # 커넥션 획득
try:
with conn.cursor() as cursor: # 커서 획득
# 1. 테이블이 없다면 생성(create)
cursor.execute('''
CREATE TABLE IF NOT EXISTS credit_scores (
user_id VARCHAR(50),
credit_score INT,
grade VARCHAR(4),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
''')
# 2. 데이터 적재(insert)
sql = '''
insert into credit_scores
(user_id, credit_score, grade)
values
(%s, %s, %s)
'''
params = [
( data['user_id'], data['credit_score'], data['grade'])
for data in users_data
]
cursor.executemany( sql, params)
# 3. 커밋
conn.commit()
pass
except Exception as e:
logging.error(f'mysql에 데이터 삽입(적제) 중 오류 발생 {e}')
finally:
if conn:
conn.close()
logging.info('mysql에 데이터 삽입(적제) 완료')
pass
# 4. DAG 구성
with DAG(
dag_id = "07_msa_pattern_v1",
description = "msa 서비스 호출하여 스케줄링",
default_args = {
'owner' :'de_1team_manager',
'retries' : 1,
'retry_delay' : timedelta(minutes=1)
},
schedule_interval = '@daily',
start_date = datetime(2025,1,1),
catchup = False,
tags = ['msa', 'fastapi']
) as dag:
# Task
# 1. 더미 데이터 준비
extract_data = PythonOperator(
task_id = "extract_data_msa",
python_callable = _extract_data_msa
)
# 2. AI 서비스 호출-> 신용평가 결과 획득
ai_service_api = PythonOperator(
task_id = "ai_service_api_msa",
python_callable = _ai_service_api_msa
)
# 3. 결과를 저장
load_data = PythonOperator(
task_id = "load_data_msa",
python_callable = _load_data_msa
)
# 의존성 1->2->3
extract_data >> ai_service_api >> load_data
pass
위 코드는 데이터 추출과, ai 서비스 호출(예시), 데이터 적재 까지의 과정을 자동화하는것이다.
백엔드에 있는 ai에게 더미데이터를 보내서 신용정보를 평가받고, 그 정보를 추가해서 sql상에 적재하는것이다.
그리고 이 기능에서 심화 버전으로 확장시킨 다른 예시도 실습하였다.
07_batch_inference.py
# 1. 모듈 가져오기
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import logging
import requests
# mysql
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
# 추가
import random
# 2. 도커 상 네트워크 통해 접근할 API 주소
# url: 서비스명 => ai-api-server
API_URL = "http://ai-api-server:8000/predict"
# 3. PythonOperator를 위한 함수
def _init_data_msa(**kwargs):
# 실습 상황 연출 위해 신용평가 점수 없는 신규 고객 데이터 강제로 추가
# 실제는 특정 기간동안 가입한 고객 (혹은 일주일(혹은 하루) 단위 가입한 고객) 조회
mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
conn = mysql_hook.get_conn() # 커넥션 획득
try:
with conn.cursor() as cursor: # 커서 획득
# 1. 고객 테이블 생성
cursor.execute('''
CREATE TABLE IF NOT EXISTS customers (
user_id VARCHAR(50) PRIMARY KEY,
income INT,
loan_amt INT,
credit_score INT DEFAULT NULL,
grade VARCHAR(10) DEFAULT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 2. 실습상 여러번 수행 가능하므로 -> 테이블내에 데이터 삭제 (임시구성)
cursor.execute('truncate table customers')
# 3. 신규 고객 데이터 추가 (50명)
params = [
(
f'C{i:03d}', # 고객 아이디
random.randint(3000,10000), # 소득
random.randint(1000, 5000), # 대출, 론
)
for i in range(1, 51)
]
# 4. 여러건의 데이터를 벌크단위로 삽입
sql = "insert into customers (user_id, income, loan_amt) values ( %s, %s, %s)"
cursor.executemany(sql, params)
conn.commit()
pass
except Exception as e:
logging.error(f'mysql에 데이터 삽입(적제) 중 오류 발생 {e}')
finally:
if conn:
conn.close()
logging.info('신규 고객 x 명의 데이터 테이블 생성 및 입력 완료')
pass
def _extract_data_msa(**kwargs):
# 실습 => customers 테이블에서 신용평가 점수가 없는 고객만 추출하여
# "ai-service_api_msa: task 로 전달 (의존성 모두 풀어서 정상 구성)"
# sql 구성 -> 쿼리 -> 결과를 [ {}, {}, ...] => 07_msa...번에서 구성한대로 동일하게 태우면 됨
mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
# '신용평가 점수가 없는 고객 데이터만' sql 조회 -> dataframe 획득(통으로, bulk read)
df = mysql_hook.get_pandas_df('''
select user_id, income, loan_amt
from customers where credit_score is NULL
''')
# 결과셋 체크
if df.empty:
logging.info('신규 고객이 없습니다.')
return []
# 로그
logging.info(f'신규 평가 대상 고객 수 {len(df)}') # 실제는 데이터 수가 제각각 (규모에 따라서 처리 방법 바뀔수 있음)
# 변환 : df -> [ {}, [}, ...]
users = df.to_dict(orient='records')
# 반환
return users
def _ai_service_api_msa(**kwargs):
# API 호출 (재료는 XCom 통해 통신)
# 1. XCom 통해 신용평가를 수행하고자 하는 고객 데이터 획득
ti = kwargs['ti']
users_data = ti.xcom_pull(task_ids='extract_data_msa')
# 2. 외부에 존재하는 MSA(컨셉) API 호출(실제 연산은 외부에서 진행된다)
try:
res = requests.post( API_URL, json=users_data )
#res.raise_for_status() # 200에 대한 점검 필요하면 진행(생략)
results = res.json() # 결과 획득
logging.info(f'신용 평가 결과 획득 {results}')
# 결과를 XCom 에서 획득 가는하게 반환
return results
except Exception as e:
logging.error(f'API 호출 실패 {e}')
raise
pass
def _load_data_msa(**kwargs):
# SQL 이용하여 DB 저장
# 신용 평가 결과 획득 XCom 사용
ti = kwargs['ti']
users_data = ti.xcom_pull(task_ids='ai_service_api_msa')
if not users_data:
logging.info("업데이트할 데이터가 없음")
return
# SQL 저장
mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
conn = mysql_hook.get_conn() # 커넥션 획득
try:
with conn.cursor() as cursor: # 커서 획득
# 1. 데이터 update
sql = '''
update customers
set credit_score=%s, grade=%s
where user_id=%s
'''
params = [
# 순서 조정 (쿼리문에 따라 위치 조정됨)
( data['credit_score'], data['grade'], data['user_id'])
for data in users_data
]
cursor.executemany( sql, params)
# 3. 커밋
conn.commit()
pass
except Exception as e:
logging.error(f'mysql에 데이터 삽입(적제) 중 오류 발생 {e}')
finally:
if conn:
conn.close()
logging.info('mysql에 데이터 삽입(적제) 완료')
pass
# 4. DAG 구성
with DAG(
dag_id = "07_batch_inference_v1",
description = "msa 서비스 호출하여 스케줄링",
default_args = {
'owner' :'de_1team_manager',
'retries' : 1,
'retry_delay' : timedelta(minutes=1)
},
schedule_interval = '@daily',
start_date = datetime(2025,1,1),
catchup = False,
tags = ['msa', 'fastapi', 'batch_inference']
) as dag:
# Task
# 1. 더미 데이터 준비 ( 실습상 편의적으로 세팅, 실제는 실 서비스에서 저장됨)
init_data = PythonOperator(
task_id = "init_data_msa",
python_callable= _init_data_msa
)
# 2. 신용평가 미처리 고객 데이터 추출
extract_data = PythonOperator(
task_id = "extract_data_msa",
python_callable = _extract_data_msa
)
# 3. AI 서비스 호출-> 신용평가 결과 획득
ai_service_api = PythonOperator(
task_id = "ai_service_api_msa",
python_callable = _ai_service_api_msa
)
# 4. 결과를 저장
load_data = PythonOperator(
task_id = "load_data_msa",
python_callable = _load_data_msa
)
# 의존성 1->2->3->4
init_data >> extract_data >> ai_service_api >> load_data
pass
위 코드의 내용은 워크플로우 자체는 이전과 같다, 하지만 여기서 차이점은
이전의 코드에서는 더미데이터를 구성하여 보냈지만
이번 코드에서는 실제 db에 저장되어있는 데이터를 읽어오고 전처리하는 과정을 추가하여, 실제로 파이프라인을 구성한 것 과 같은 흐름을 가져가는데에 의의가 있다.
먼저 더미데이터를 구성하고, 기존의 추출파트에서 더미데이터가 아니라, 신용평가를 받지 않은 고객들만 '추출'해네는 기능을 넣었다.
이후는 동일하다.
이 흐름이 실제로는 하루에 한번정도 돌아가면서 아직 신용등급이 없는 고객의 데이터를 추출해와서 데이터 전처리와 업데이트를 진행하고, 다시 적재하는 과정을 자동화하는 진짜 airflow pipeline의 순서인것이다.
오늘의 수업은 여기까지
'ASAC-SK플래닛 T아카데미 데이터 엔지니어' 카테고리의 다른 글
| 26.01.14 68일차 [airflow 특강 | 취업 특강] (0) | 2026.01.18 |
|---|---|
| 26.01.13 67일차 [Airflow : Dataset 감지, AWS S3 연동] (0) | 2026.01.13 |
| 26.01.09 65일차 [Airflow : Mysql 연동, multi DAG 구현] (0) | 2026.01.09 |
| 26.01.08 64일차 [Airflow : 기본 개념, dag, operator 로컬 실습] (0) | 2026.01.08 |
| 26.01.06~26.01.07 62~63일차 [ 클라우드 , aws 특강] (1) | 2026.01.08 |