ASAC-SK플래닛 T아카데미 데이터 엔지니어

26.01.12 66일차 [Airflow : MSA 기반 파이프라인 구성]

Datadesigner 2026. 1. 12. 16:49

오늘은 어제에 이어서 airflow에서 도커와 연동하여 여러 서비스로 운영하는 msa 기반 파이프라인 구성 실습이 있었다.

 


MSA 기반의 데이터 파이프라인 구성

 

  • MSA
    • **Microservice Architecture(마이크로서비스 아키텍처)**의 약자로, 하나의 거대한 애플리케이션을 작고 독립적인 여러 서비스로 나누어 개발하고 운영하는 소프트웨어 설계 방식
  • 특징
    • 모놀리식(Monolithic) 아키텍처의 단점을 극복하고, 서비스별 독립적인 배포, 빠른 개발, 기술 유연성, 확장성 향상 등의 장점을 제공하여 현대 클라우드 환경에서 중요한 개발 패러다임으로 자리 잡고 있음
    • MSA <-> Monolithic
      • Monolithic : 모든 코드, 서비스가 한개의 프로젝트 상에서 운용 (모두 내부에 존재함)
 

구성

 

  • 구성
    • Airflow : 직접 일하지 않는다. 지휘자처럼 명령만 내림(오케스트레이터)
    • Microservice
      • 각자 역할만 수행하는 독립적 컨테이너 구성
        • 수집, 전처리, 추론, 학습 ...
      • Airflow는 이를 독립적 컨테이너에게 API 호출을 통해서 일을 시킴
  • 실습
    • AI 신용 점수 분석 시스템 (신용 평가 반영)
    • 컨셉
      • 실시간은 아님, 하루(일주일 등등)동안 수집된 고객 데이터를 기반으로 추후(정해진 시간)에 일괄적으로 처리해버림(스케줄링) -> 데이터 -> 전처리 -> 추론 -> 고객 데이터 반영(0~1000점 or 편의상 등급으로 표기등)
      • 배치 추론(Batch Inference)
      • 금융거래 금지 (23:30~00:30)
    • 더미 데이터 구성
      • DAG
    • 배치 작업으로 신용 점수 업그레이드 진행
      • DAG
      • API 호출 (추론모델에게 고객 데이터 전송 등등 -> 추론 -> 평가 -> 응답)
      • 결과 -> DB 업데이트
      • DB 업데이트
    • MSA
      • 기타 추론 모델등은 가상(임의구성)으로 구성
        • 현재 : 모델이 있다고 전제하고 요청 -> 추론
        • 향후 : 모델 학습까지 확장 가능함

AI 마이크로 서비스 (FastAPI)

 

  • 파일 구성 및 작성
  /
  L api_server
    L main.py
    L Dockerfile
 

  • 도커 컴포즈 서비스 등록
  • 서비스 추가
  # 기존 등록된 서비스중 가장 밑에 추가
  ai-api-server:
    build: ./api_server # 하위에 Dockerfile을 참고하여 빌드
    container_name: airflow-class-api
    ports:
      - "8000:8000" # 8000번 포트 개방
    networks:
      - default     # airflow와 같은 네트워크로 구성(통신)
  • 도커 컴포즈 구성
  docker compose uo -d --build

 

 

 

Dockerfile

FROM python:3.12.12-slim

# 작업 디렉토리 지정(루트가됨)
WORKDIR /app

# 필요 패키지 설치
RUN pip install fastapi uvicorn pydantic

# 소스코드 복사
COPY main.py .

# 서버 실행
CMD [ "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000" ]

최소한으로 필요한 내용만 있다.

도커 컴포즈시에 위 기능으로 컨테이너가 만들어진다.

 

Docker-compose.yaml

  ai-api-server:
    build: ./api_server # 하위에 Dockerfile을 참고하여 빌드
    container_name: airflow-class-api
    ports: 
      - "8000:8000" # 8000번 포트 개방
    networks:
      - default     # airflow와 같은 네트워크로 구성(통신)

위 내용을 추가하여 도커파일을 참고로 컨테이너를 만들게 명령했다.

 

 

main.py

# 0. 요구사항 정의, 모듈 설명
'''
MSA구조상 신용평가 업무만 담당하는 API

추론 요청(데이터 포함)-> 추론 -> 결과 돌려줌
요청시 데이터
    - 1명 ~ (*)여러명 요청 -> [ 개별정보(Pydantic 정의), 개별정보, ..]
응답 데이터 
    - 1명 ~ (*)여러명 응답 -> [ 평가정보(Pydantic 정의), 평가정보, ...]
신용 점수 평가 모델은 -> 더미 (간단한 공식(수식)으로 진행) 
            -> 향후 실제모델로 교체하면됨(모델 엔트리포인트(API) 호출)
'''
# 1. 모듈 가져오기
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List
import random

# 2. 앱 생성
app = FastAPI()

# 3. 요청/응답시 전달되는(하는) 데이터 형태 + 유효성 자동 검사 =>pydantic
class ReqData(BaseModel): # 요청
    # 사용자 아이디, 소득, 대출총량
    user_id: str
    income: int
    loan_amt: int
    pass
class ResData(BaseModel): # 응답
    # 사용자 아이디, 신용점수, 등급
    user_id: str
    credit_score: int
    grade: str
    pass

# 4. 라우팅
## 홈 -> 헬시체크(서비스가 살아 있는지 주기적 요청)용도로 활용 가능
@app.get('/')
def home():
    return {'status':"AI Predict Server is Running"}

## 신용예측
@app.post("/predict", response_model=List[ResData])
def predict(users: List[ReqData]):
    '''
    AI 모델(더미)을 이용한 신용평가 서비스    
    :param uses: n명의 사용자의 정보(아이디, 소득, 대출양)
    :type uses: List[ReqData]
    '''
    # AI 모델이 없으므로 가상의 계산식으로 평가
    # 식1 = (소득 // 1000) * 10
    # credit_score = 식2 = min( 난수값(300, 600) + 식1, 990 )
    # grade = 식2 > 800 크면 "A"등급, > 600 크면 "B"등급, 나머지는 "C"등급
    # 결과물 : [ {user_id:xx, credit_score:xx, grade:xx }, {}, .... ]
    # 위의 요구사항대로 구현하시오
    results = list()
    for user in users:
        # 더미 계산
        식1 = (user.income // 1000) * 10
        credit_score = min( random.randint(300, 600) + 식1, 990 )
        grade = "A" if credit_score>800 else "B" if credit_score>600 else "C"
        # 결과 담기
        results.append({
            "user_id":user.user_id, 
            "credit_score":credit_score, 
            "grade":grade
        })
    return results

 

백엔드를 담당하는 main.py에서는 더미데이터의 값을 계산해서 신용 정보를 평가하는 내용만 넣어준다, 그리고 그 결과를 results에 담아준다.

 

 

07_msa_pattern.py

 

# 1. 모듈 가져오기
from datetime import datetime, timedelta 
from airflow import DAG
from airflow.operators.python import PythonOperator
import logging
import json
import requests # MSA 서비스 호출 (HTTP 요청)
# mysql
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook

# 2. 도커 상 네트워크 통해 접근할 API 주소
#    url: 서비스명 => ai-api-server
API_URL = "http://ai-api-server:8000/predict"

# 3. PythonOperator를 위한 함수
def _extract_data_msa(**kwargs):
    # 더미 데미터 수동 구성 -> (업그레이드) sql 쿼리에서 조회
    users = [
        { "user_id":"C001", "income": 5000, "loan_amt": 2000 },
        { "user_id":"C002", "income": 3000, "loan_amt": 5000 },
        { "user_id":"C003", "income": 8000, "loan_amt": 1000 }
    ]
    # 차후에는 고객 테이블 구성-> 더미로 입력(신용평가 여부 랜덤구성)
    # 반환
    return users
def _ai_service_api_msa(**kwargs):
    # API 호출 (재료는 XCom 통해 통신)
    # 1. XCom  통해 신용평가를 수행하고자 하는 고객 데이터 획득
    ti         = kwargs['ti']
    users_data = ti.xcom_pull(task_ids='extract_data_msa')
    # 2. 외부에 존재하는 MSA(컨셉) API 호출(실제 연산은 외부에서 진행된다)
    try:
        res = requests.post( API_URL, json=users_data )
        #res.raise_for_status() # 200에 대한 점검 필요하면 진행(생략)
        results = res.json()    # 결과 획득
        logging.info(f'신용 평가 결과 획득 {results}')
        # 결과를 XCom 에서 획득 가는하게 반환
        return results
    except Exception as e:
        logging.error(f'API 호출 실패 {e}')
        raise
    pass

def _load_data_msa(**kwargs):
    # SQL 이용하여 DB 저장
    # 신용 평가 결과 획득 XCom 사용
    ti         = kwargs['ti']
    users_data = ti.xcom_pull(task_ids='ai_service_api_msa')
    if not users_data:
        logging.error("신용 평가 결과 없음")
        raise ValueError('신용 평가 결과 없음')
    # SQL 저장
    mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
    conn       = mysql_hook.get_conn() # 커넥션 획득
    try:        
        with conn.cursor() as cursor:   # 커서 획득
            # 1. 테이블이 없다면 생성(create)
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS credit_scores (                    
                    user_id VARCHAR(50),                    
                    credit_score INT,
                    grade VARCHAR(4),
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
            ''')
            # 2. 데이터 적재(insert)
            sql = '''
                insert into credit_scores 
                (user_id, credit_score, grade)
                values
                (%s, %s, %s)
            '''
            params = [ 
                ( data['user_id'], data['credit_score'], data['grade'])
                for data in users_data
            ]
            cursor.executemany( sql, params)
            # 3. 커밋
            conn.commit()
            pass
    except Exception as e:
        logging.error(f'mysql에 데이터 삽입(적제) 중 오류 발생 {e}')
    finally:
        if conn:
            conn.close()
            logging.info('mysql에 데이터 삽입(적제) 완료')
    pass

# 4. DAG 구성
with DAG(
    dag_id              = "07_msa_pattern_v1",
    description         = "msa 서비스 호출하여 스케줄링",
    default_args        = {
        'owner'          :'de_1team_manager',        
        'retries'        : 1,
        'retry_delay'    : timedelta(minutes=1)
    },    
    schedule_interval   = '@daily',
    start_date          = datetime(2025,1,1),
    catchup             = False,
    tags                = ['msa', 'fastapi']
) as dag:
    # Task
    # 1. 더미 데이터  준비
    extract_data    = PythonOperator(
        task_id     = "extract_data_msa",
        python_callable = _extract_data_msa
    )
    # 2. AI 서비스 호출-> 신용평가 결과 획득
    ai_service_api    = PythonOperator(
        task_id     = "ai_service_api_msa",
        python_callable = _ai_service_api_msa
    )
    # 3. 결과를 저장
    load_data    = PythonOperator(
        task_id     = "load_data_msa",
        python_callable = _load_data_msa
    )

    # 의존성 1->2->3
    extract_data >> ai_service_api >> load_data
    pass

위 코드는 데이터 추출과, ai 서비스 호출(예시), 데이터 적재 까지의 과정을 자동화하는것이다.

 

백엔드에 있는 ai에게 더미데이터를 보내서 신용정보를 평가받고, 그 정보를 추가해서 sql상에 적재하는것이다.

 

그리고 이 기능에서 심화 버전으로 확장시킨 다른 예시도 실습하였다.

 

07_batch_inference.py

 

# 1. 모듈 가져오기
from datetime import datetime, timedelta 
from airflow import DAG
from airflow.operators.python import PythonOperator
import logging
import requests
# mysql
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
# 추가
import random

# 2. 도커 상 네트워크 통해 접근할 API 주소
#    url: 서비스명 => ai-api-server
API_URL = "http://ai-api-server:8000/predict"

# 3. PythonOperator를 위한 함수
def _init_data_msa(**kwargs):
    # 실습 상황 연출 위해 신용평가 점수 없는 신규 고객 데이터 강제로 추가
    # 실제는 특정 기간동안 가입한 고객 (혹은 일주일(혹은 하루) 단위 가입한 고객) 조회
    mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
    conn       = mysql_hook.get_conn() # 커넥션 획득
    try:        
        with conn.cursor() as cursor:   # 커서 획득
            # 1. 고객 테이블 생성
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS customers (
                    user_id VARCHAR(50) PRIMARY KEY,
                    income INT,
                    loan_amt INT,
                    credit_score INT DEFAULT NULL,
                    grade VARCHAR(10) DEFAULT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                ''')    
            # 2. 실습상 여러번 수행 가능하므로 -> 테이블내에 데이터 삭제 (임시구성)
            cursor.execute('truncate table customers')
            # 3. 신규 고객 데이터 추가 (50명)
            params = [
                (
                    f'C{i:03d}',                # 고객 아이디
                    random.randint(3000,10000), # 소득
                    random.randint(1000, 5000), # 대출, 론
                    )
                for i in range(1, 51)
            ]
            # 4. 여러건의 데이터를 벌크단위로 삽입
            sql = "insert into customers (user_id, income, loan_amt) values ( %s, %s, %s)"
            cursor.executemany(sql, params)
            conn.commit() 
            pass
    except Exception as e:
        logging.error(f'mysql에 데이터 삽입(적제) 중 오류 발생 {e}')
    finally:
        if conn:
            conn.close()
            logging.info('신규 고객 x 명의 데이터 테이블 생성 및 입력 완료')
    pass

def _extract_data_msa(**kwargs):
    # 실습 => customers 테이블에서 신용평가 점수가 없는 고객만 추출하여
    # "ai-service_api_msa: task 로 전달 (의존성 모두 풀어서 정상 구성)"
    # sql 구성 -> 쿼리 -> 결과를 [ {}, {}, ...] => 07_msa...번에서 구성한대로 동일하게 태우면 됨
    mysql_hook = MySqlHook(mysql_conn_id='mysql_default')

    # '신용평가 점수가 없는 고객 데이터만' sql 조회 -> dataframe 획득(통으로, bulk read)
    df = mysql_hook.get_pandas_df('''
        select user_id, income, loan_amt
        from customers where credit_score is NULL
        ''')
    # 결과셋 체크
    if df.empty:
        logging.info('신규 고객이 없습니다.')
        return [] 
    # 로그
    logging.info(f'신규 평가 대상 고객 수 {len(df)}') # 실제는 데이터 수가 제각각 (규모에 따라서 처리 방법 바뀔수 있음)
    # 변환 : df -> [ {}, [}, ...]
    users = df.to_dict(orient='records')
    # 반환
    return users


def _ai_service_api_msa(**kwargs):
    # API 호출 (재료는 XCom 통해 통신)
    # 1. XCom  통해 신용평가를 수행하고자 하는 고객 데이터 획득
    ti         = kwargs['ti']
    users_data = ti.xcom_pull(task_ids='extract_data_msa')
    # 2. 외부에 존재하는 MSA(컨셉) API 호출(실제 연산은 외부에서 진행된다)
    try:
        res = requests.post( API_URL, json=users_data )
        #res.raise_for_status() # 200에 대한 점검 필요하면 진행(생략)
        results = res.json()    # 결과 획득
        logging.info(f'신용 평가 결과 획득 {results}')
        # 결과를 XCom 에서 획득 가는하게 반환
        return results
    except Exception as e:
        logging.error(f'API 호출 실패 {e}')
        raise
    pass

def _load_data_msa(**kwargs):
    # SQL 이용하여 DB 저장
    # 신용 평가 결과 획득 XCom 사용
    ti         = kwargs['ti']
    users_data = ti.xcom_pull(task_ids='ai_service_api_msa')
    if not users_data:
        logging.info("업데이트할 데이터가 없음")
        return
    # SQL 저장
    mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
    conn       = mysql_hook.get_conn() # 커넥션 획득
    try:        
        with conn.cursor() as cursor:   # 커서 획득
            # 1. 데이터 update
            sql = '''
                update customers 
                set credit_score=%s, grade=%s
                where user_id=%s
            '''
            params = [ 
                # 순서 조정 (쿼리문에 따라 위치 조정됨)
                ( data['credit_score'], data['grade'], data['user_id'])
                for data in users_data
            ]
            cursor.executemany( sql, params)
            # 3. 커밋
            conn.commit()
            pass
    except Exception as e:
        logging.error(f'mysql에 데이터 삽입(적제) 중 오류 발생 {e}')
    finally:
        if conn:
            conn.close()
            logging.info('mysql에 데이터 삽입(적제) 완료')
    pass

# 4. DAG 구성
with DAG(
    dag_id              = "07_batch_inference_v1",
    description         = "msa 서비스 호출하여 스케줄링",
    default_args        = {
        'owner'          :'de_1team_manager',        
        'retries'        : 1,
        'retry_delay'    : timedelta(minutes=1)
    },    
    schedule_interval   = '@daily',
    start_date          = datetime(2025,1,1),
    catchup             = False,
    tags                = ['msa', 'fastapi', 'batch_inference']
) as dag:
    # Task
    # 1. 더미 데이터  준비 ( 실습상 편의적으로 세팅, 실제는 실 서비스에서 저장됨)
    init_data    = PythonOperator(
        task_id = "init_data_msa",
        python_callable= _init_data_msa
    )
    # 2. 신용평가 미처리 고객 데이터 추출
    extract_data    = PythonOperator(
        task_id     = "extract_data_msa",
        python_callable = _extract_data_msa
    )
    # 3. AI 서비스 호출-> 신용평가 결과 획득
    ai_service_api    = PythonOperator(
        task_id     = "ai_service_api_msa",
        python_callable = _ai_service_api_msa
    )
    # 4. 결과를 저장
    load_data    = PythonOperator(
        task_id     = "load_data_msa",
        python_callable = _load_data_msa
    )

    # 의존성 1->2->3->4
    init_data >> extract_data >> ai_service_api >> load_data
    pass

위 코드의 내용은 워크플로우 자체는 이전과 같다, 하지만 여기서 차이점은

이전의 코드에서는 더미데이터를 구성하여 보냈지만

 

이번 코드에서는 실제 db에 저장되어있는 데이터를 읽어오고 전처리하는 과정을 추가하여, 실제로 파이프라인을 구성한 것 과 같은 흐름을 가져가는데에 의의가 있다.

 

먼저 더미데이터를 구성하고, 기존의 추출파트에서 더미데이터가 아니라, 신용평가를 받지 않은 고객들만 '추출'해네는 기능을 넣었다.

이후는 동일하다.

 

이 흐름이 실제로는 하루에 한번정도 돌아가면서 아직 신용등급이 없는 고객의 데이터를 추출해와서 데이터 전처리와 업데이트를 진행하고, 다시 적재하는 과정을 자동화하는 진짜 airflow pipeline의 순서인것이다.

 

오늘의 수업은 여기까지