ASAC-SK플래닛 T아카데미 데이터 엔지니어

26.01.13 67일차 [Airflow : Dataset 감지, AWS S3 연동]

Datadesigner 2026. 1. 13. 17:48

드디어 밀린 게시물 따라왔다,

 

오늘은 AIRFLOW에서 데이터셋을 감지하고 그 데이터를 트리거로 하여 다음 작업을 수행하는 파이프라인 구축과

 

AWS S3에 데이터를 적재하는 자동화 실습을 진행하였다.

 


dataset 감지

 

ETL -> MLOps -> AI Service

 

  • 3개의 DAG를 구성하여 연결
    • 앞에서 만든 모델(가상,시뮬레이션, 간단하게 수식 구성)이 뒤에서 만들어지는 과정 구현해본것
    • FLOW
      • 데이터가 계속 쌓이면 ETL을 통해서 모델 학습에 필요한 데이터를 축적
      • MLOps에서는 특정한 스케줄 혹은 데이터량이 축적되면 학습을 통해서 모델 업데이트를 진행
      • AI Service는 새로 적용된 모델을 기반으로 추론 서비스를 제공 -> API 제공
    • 구성
      • 데이터 준비(ETL) -> 모델 학습(ML/DL Train) -> 서비스 적용(Inference, API)
    • 멀티 DAG에서 다른 DAG 호출
      • TriggerDagRunOperator 사용
      • Airflow 2.4부터는 Datasets 기능 사용
        • 앞단계 DAG가 끝났는지 검사하는것이 아니라, Datasets이 생성되었는가를 감시하는 이벤트 기반으로 처리
        • 데이터 인지 스케줄링
          • data-Aware Scheduling
          • 데이터가 준비되었으면 학습해 (O)
            • 10시에 학습해 (X)
 

Data pipeline + MLops를 위한 파이프라인 아킥텍쳐

 

  • 3 DAG
    • 상호 데이터 파일을 통해 대화
  • 구성
    • DAG1
      • 08_etl.py
      • ETL을 통해서 trainning_data.csv 생성
      • Producer 역할
    • DAG2
      • 08_mlops.py
      • trainning_data.csv가 생기면 자동으로 모델 학습 진행
      • 학습의 결과로 모델 덤프 (시물레이션, model_artifact.json) 파일 생성
      • Consumer & Producer 역할
    • DAG3
      • 08_inference.py
      • model_artifact.json 생성(혹은 갱신)되면 -> AI Server(MSA) 요청을 통해 고객 결과 DB 갱신
      • Consumer 역할
    • 공통
      • 공통의 Dataset 정의
        • 파일, 경로 등등 대상
      • Airflow가 인식해야함
      • 향후 -> AWS S3상에 저장 -> 업그레이드 (엑세스키 발급 및 등록-> Airflow)

이번에는 데이터 학습, 머신러닝의 개념까지 추가한 AIRFLOW를 구현해볼 예정이다.

 

그리고 각 코드별로 이전의 TASK가 끝날 시에 진행되는 내용을 넣어서 처리또한 진행한다.

 

08_etl.py

# 1. 모듈 가져오기
from datetime import datetime, timedelta 
from airflow import DAG
from airflow.operators.python import PythonOperator
import logging
import random
import os
from airflow.datasets import Dataset # 2.4 이후 지원 데이터셋 모듈
import pandas as pd

# 2. 데이터셋 저장 경로 (도커 내 리눅스 내부경로 지정)
DATA_PATH='/opt/airflow/dags/data'
os.makedirs(DATA_PATH, exist_ok=True) # 멱등성 고려 (여러번 수행되어도 동일한 결과 나오도록)

# 3. 데이터셋 (Airflow가 감시하는(바라보는) -> 트리거 발동 시킴) 정의 => URI 형태
#    최종 task의 결과로 trainning_data.csv 만들어지면 다음 DAG 작동됨
trigger_dataset = Dataset(f'file://{DATA_PATH}/trainning_data.csv')


# 4-1-1. 오퍼레이터에 콜백함수 정의
def _etl_task_generator(**kwargs):
    # 학습용 더미데이터 생성 (Feature / label(target) 생성)
    data = list()
    for i in range(100):
        income = random.randint(2000, 10000)
        loan_amt = random.randint(100, 5000)
        data.append({
            "income"  : income,
            "loan_amt": loan_amt,
            # 1 : 대출 승인, 0 : 대출 불허(거절)
            "target"  : 1 if income > loan_amt else 0
        })
    # [ {}, {}, {}, ... ] -> Dataframe -> csv, 코드 완성, 로그 출력(학습 데이터 완료: 경로)
    df = pd.DataFrame(data)
    save_path= f'{DATA_PATH}/trainning_data.csv'
    df.to_csv(save_path, index=False)
    logging.info(f'학습 데이터 완료: {save_path}')
    # training_data_년월일().csv -> 이렇게 구성할 경우 별도 삭제 안하면 데이터가 쌓일수 잇음
    pass

# 4. DAG 정의
with DAG(
    dag_id              = "08_etl_v1",
    description         = "etl 서비스",
    default_args        = {
        'owner'          :'de_1team_manager',        
        'retries'        : 1,
        'retry_delay'    : timedelta(minutes=1)
    },    
    schedule_interval   = '@daily',
    start_date          = datetime(2025,1,1),
    catchup             = False,
    tags                = ['etl', 'part1']
) as dag:

    # 4-1. 오퍼레이터 정의
    etl_task = PythonOperator(
        task_id='etl_task_generator',
        python_callable=_etl_task_generator,
        # 핵심은 dataset 연결
        # 해당 task가 종료되면 -> trigger_dataset이 업데이트 되었다고 공지함
        # 다음 DAG의 스케줄이 시간이 아닌 trigger_dataset을 지정(바라보면) -> 해당 공지(트리거)
        # 바로 연쇄적으로 다음 DAG이 작동됨(비동기적인 느낌)
        outlets=[trigger_dataset]
    )    
    # 4-2. 의존성(injection)

 

이번 플로우는 etl.py 에서 더미데이터를 만들고, 그 데이터를 특정 경로에 저장해두면, 다음 파일이 그 데이터가 생긴걸 감지하고 다름 task를 수행하는것이다. 

이를 위해서 outlets가 추가되었다.

 

08_mlops.py

# ETL을 통해서 trainning_data.csv -> 학습(시물레이션) -> 모델 덤프(model_artifact.json)
# DAG에서 아래와 같은 과정을 진행했다 가정하고 시뮬레이션 진행

# 1. 모듈 가져오기
from datetime import datetime, timedelta 
from airflow import DAG
from airflow.operators.python import PythonOperator
import logging
import random
import os
import json
from airflow.datasets import Dataset 
import pandas as pd

# 2. 공통(모든 DAG)
DATA_PATH='/opt/airflow/dags/data'

# 3. 데이터셋 (감시해야 하는, 생성해야 하는_)
trigger_dataset = Dataset(f'file://{DATA_PATH}/trainning_data.csv') # 감시
model_dataset   = Dataset(f'file://{DATA_PATH}/model_v1.json') # 생성

# 4-1-1. 오퍼레이터에 콜백함수 정의
def _train_task_dump(**kwargs):
    # 일반 모델 학습 과정 (큰 포인트만 표시)
    # df = pd.read_csv(...)
    # 데이터 분할
    # ml학습 => 실제로는 모델이 계속 업그레이드(기존 가중치 유지 -> 신규 데이터 학습) 되어야함
    #       => 전이학습(transfer learning) : 원래 목적과(업스트림) 실제 사용 목적(다운스트림)이 다르게 구성
    #       => 여기서는 동일 데이터로 계속 가중치를 업데이트 하는 전략 활용 
    #       => 데이터는 매일, 매시간 계속 쌓인다는 전제하에서 진행
    logging.info("ML/DL/LLM 등 모델 학습중.. epoch 1,epoch 2, ... epoch n") # 연출

    # 모델 저장
    model_info = {
        'version': '1.0',
        'accuracy': 0.96,
        'train_at': datetime.now().isoformat()  
    }

    # 3. 저장
    save_path = f'{DATA_PATH}/model_v1.json'
    with open(save_path, 'w') as f:
        json.dump(model_info, f)
    logging.info(f"모델 학습 완료 -> 저장 {save_path}") 
    pass

# 4. DAG 정의
with DAG(
    dag_id              = "08_mlops_v1",
    description         = "mlops 중 모델 학습",
    default_args        = {
        'owner'          :'de_1team_manager',        
        'retries'        : 1,
        'retry_delay'    : timedelta(minutes=1)
    },    
    # 핵심 : trigger_dataset이 생성되어야 해당 DAG이 작동됨 (조건은 시간이 아니라 데이터셋임)
    schedule            = [trigger_dataset],
    start_date          = datetime(2025,1,1),
    catchup             = False,
    tags                = ['mlops', 'train', 'part2']
) as dag:

    # 4-1. 오퍼레이터 정의
    train_task = PythonOperator(
        task_id='train_task_dump',
        python_callable=_train_task_dump,
        # 핵심 : task의 결론, 새로운 모델이 나왔다고 공지
        outlets=[model_dataset]
    )    
    # 4-2. 의존성(injection)

이번 코드에서는 etl코드에서 만든 csv를 감지하고 model.json 파일을 생성해낸다,

이 과정에서 실제로는 모델 학습을 수행하고 모델 파일을 내보내는것이다.

이제 데이터를 추출하고, 머신러닝을 그 데이터를 기반으로 학습시키는 과정까지 진행하였다.

다음으로는 평가를 해야겠지?

 

 

08_inference.py

# model_artifact.json -> msa 기반 api 서비스 사용
# 향후 MLOps의 대표적인 도구 MLFlow를 적용 -> 모델 생성 후 엔드포인트 구성 가능해짐
# api는 동일한데 모델이 교체됨(v1 -> v2 -> v3 -> ... -> vn) -> 코드 수정이 없음
# 여기서는 새로운 모델이 발견되었다 -> 모델이 교체되었다 (가정 -> MLFLow 관련 DAG 구성 필요) 가정

# 새로 업데이트된 기준 (모델)에 맞춰 대상 고객의 신용 평가를 새로(혹은 업데이트(확장성 고려)) 진행
# 1. 모듈 가져오기
from datetime import datetime, timedelta 
from airflow import DAG
from airflow.operators.python import PythonOperator
import logging
import json
import requests # MSA 서비스 호출 (HTTP 요청)
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.datasets import Dataset 

# 2. 상수 (처럼 관리, 고정값)정의
#    inference 수행할 msa 기반 url
API_URL = "http://ai-api-server:8000/predict"
#    데이터셋
DATA_PATH='/opt/airflow/dags/data'


# 데이터셋 (감시 해야하는)
model_dataset   = Dataset(f'file://{DATA_PATH}/model_v1.json') 

# 3. PythonOperator를 위한 함수
def _extract_data_msa(**kwargs):
    # 더미 데미터 수동 구성 -> (업그레이드) sql 쿼리에서 조회
    users = [
        { "user_id":"C101", "income": 6000, "loan_amt": 2000 },
        { "user_id":"C102", "income": 2000, "loan_amt": 6000 },
        { "user_id":"C103", "income": 9000, "loan_amt": 3000 }
    ]
    # 차후에는 고객 테이블 구성-> 더미로 입력(신용평가 여부 랜덤구성)
    # 반환
    return users

def _ai_service_api_msa(**kwargs):
    # API 호출 (재료는 XCom 통해 통신)
    # 1. XCom  통해 신용평가를 수행하고자 하는 고객 데이터 획득
    ti         = kwargs['ti']
    users_data = ti.xcom_pull(task_ids='extract_data_msa')
    # 2. 외부에 존재하는 MSA(컨셉) API 호출(실제 연산은 외부에서 진행된다)
    try:
        res = requests.post( API_URL, json=users_data )
        #res.raise_for_status() # 200에 대한 점검 필요하면 진행(생략)
        results = res.json()    # 결과 획득
        logging.info(f'신용 평가 결과 획득 {results}')
        # 결과를 XCom 에서 획득 가능하게 반환
        return results
    except Exception as e:
        logging.error(f'API 호출 실패 {e}')
        raise
    pass

def _load_data_msa(**kwargs):
    # SQL 이용하여 DB 저장
    # 신용 평가 결과 획득 XCom 사용
    ti         = kwargs['ti']
    users_data = ti.xcom_pull(task_ids='ai_service_api_msa')
    if not users_data:
        logging.error("신용 평가 결과 없음")
        raise ValueError('신용 평가 결과 없음')
    # SQL 저장
    mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
    conn       = mysql_hook.get_conn() # 커넥션 획득
    try:        
        with conn.cursor() as cursor:   # 커서 획득
            # 1. 테이블이 없다면 생성(create)
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS credit_scores (                    
                    user_id VARCHAR(50),                    
                    credit_score INT,
                    grade VARCHAR(4),
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
            ''')
            # 2. 데이터 적재(insert)
            sql = '''
                insert into credit_scores 
                (user_id, credit_score, grade)
                values
                (%s, %s, %s)
            '''
            params = [ 
                ( data['user_id'], data['credit_score'], data['grade'])
                for data in users_data
            ]
            cursor.executemany( sql, params)
            # 3. 커밋
            conn.commit()
            pass
    except Exception as e:
        logging.error(f'mysql에 데이터 삽입(적제) 중 오류 발생 {e}')
    finally:
        if conn:
            conn.close()
            logging.info('mysql에 데이터 삽입(적제) 완료')
    pass

# 4. DAG 구성
with DAG(
    dag_id              = "08_inference_v1",
    description         = "msa 서비스 호출 모델 추론 요청",
    default_args        = {
        'owner'          :'de_1team_manager',        
        'retries'        : 1,
        'retry_delay'    : timedelta(minutes=1)
    },    
    schedule            = [model_dataset],
    start_date          = datetime(2025,1,1),
    catchup             = False,
    tags                = ['msa', 'part3']
) as dag:
    # Task
    # 1. 더미 데이터  준비
    extract_data    = PythonOperator(
        task_id     = "extract_data_msa",
        python_callable = _extract_data_msa
    )
    # 2. AI 서비스 호출-> 신용평가 결과 획득
    ai_service_api    = PythonOperator(
        task_id     = "ai_service_api_msa",
        python_callable = _ai_service_api_msa
    )
    # 3. 결과를 저장
    load_data    = PythonOperator(
        task_id     = "load_data_msa",
        python_callable = _load_data_msa
    )

    # 의존성 1->2->3
    extract_data >> ai_service_api >> load_data
    pass

 

이번 코드에서는 더미데이터(실제로는 sql db랑 연동이 되어있을거다,)의 신용평가를 이전에 보내준 모델을 사용해서 수행하는 과정을 자동화하는것이다, api를 통해서 모델을 불러와서 결과를 획득하고, 그 데이터를 xcom을 통해서 보내서 db에 저장하고, 적재의 역할을 수행한다. 

이 일련의 과정들을 msa라고 칭하는것이다. 약간 랭그래프랑 좀 비슷한거같기도 하다. 이거 좀 재밌는듯

 

오늘은 마지막으로 S3와 AIRFLOW를 연동하는 과정을 진행하였다.


AWS S3

 

  • DAG의 결과물들을 S3 저장
    • S3 (Date Lake의 역활)
      • 원시 데이터(raw data)
      • 중간에 생성되는 임시 데이터
      • airflow의 최종 데이터
  • 세팅
    • 인증
      • 로컬 PC에서 AWS 엑세스
        • IAM -> 본인계정 -> 엑세스키 발급
          • 접근 루트
            • IAM > 사용자 > ai-en-0(본인계정) > 보안 자격 증명 > 액세스 키 > 액세스 키 만들기
            • 사용사례 > Command Line Interface(CLI) 선택 > 확인체크 > 설명-태그( airflow 임시 입력) > 액세스 키 만들기
            • 키를 복사 or csv 다운로드
          • 주의 외부 노출(유출) 절대 금지 -> 사고남 -> git 업로드 절대 금지!!
        • airflow 대시보드 > Admin > Connection > 등록 > [+] 클릭
        • git or .env등에 기재 x (프로젝트 내부에는 흔적 없음)
    • 패키지설치 (docker-compose.yaml)
      • apache-airflow-providers-amazon
        # apache-airflow-providers-amazon 추가함
        _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-} apache-airflow-providers-mysql apache-airflow-providers-amazon
      
      • 아래 명령 수행 (도커 재구성)
        • docker compose down
        • docker compose up -d
 

  • S3 버킷 준비
    • 본인 리전 이동 -> S3 진입 -> 버킷만들기
    • airflow-(사용자명)
 

Airflow + s3 (DataLake)

 

  • 09_aws_s3_basics.py
    • 기본 업로드 및 업로드 확인
 

  • Producer-Consumer 패턴
    • DAG A (생산자, A팀, producer) 파일 생성하며 업로드
    • DAG B (소비자, B팀, consumer) 업로드 된 파일 감지하여 자동으로 처리(업무)
      • 처리 후 -> 파일 삭제 -> 저장소가 비워있어야함
 

  • 결론
    • airflow에서 필요시 DAG에서 자유롭게 읽고, 쓰기, 삭제 등등 임시 활용할 수 있는 스토리지로 활용
      • 추출한 결과, 변환한 결과, 적재한 결과, 임시 파일 등등 모두 저장 가능
      • raw data 저장 가능
      • 모델 저장 가능
      • s3 : 모든 데이터, 객체 등등 저장 가능한 data lake임
 

AWS 키는 여기에 등록한다.

 

09_aws_s3_basics.py

# 로컬에서 파일 생성 -> s3 업로드 => 업로드 여부 확인

# 1. 모듈 가져오기
from datetime import datetime, timedelta 
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
import logging
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# 2. 상수, 환경변수등 고정값
# 본인이 사용하는 버킷명 
BUCKET_NAME = 'airflow-ai-en-3'
# 업로드할 파일명
FILE_NAME = 'hello.txt'
# 업로드할 파일의 로컬내 위치
LOCAL_PATH = f'/opt/airflow/dags/data/{FILE_NAME}'


# 3-1-1. 오퍼레이터에 콜백함수 정의
def _check_s3_task(**kwargs):
    # S3Hook 이용 => 파일이 진짜로 존재하는지 리스트 출력
    # 1. S3Hook 이용 AWS 엑세스함
    hook = S3Hook(aws_conn_id='aws_default')
    # 2. S3Hook 이용하여 모든 키(파일명) 조회
    keys = hook.list_keys(bucket_name=BUCKET_NAME)
    # 3. 검출
    if not keys:
        raise ValueError("실패, 파일 업로드 실패")
    for key in keys:
        if FILE_NAME in keys:
            logging.info("성공, 파일 업로드 성공")
        else:
            logging.info("실패, 파일 업로드 실패1")
            raise ValueError("실패, 파일 업로드 실패2")
    logging.info("실패, 파일 업로드 실패")
    pass

# 3. DAG 정의
with DAG(
    dag_id              = "09_aws_s3_basics",
    description         = "s3 연동 기본 연습",
    default_args        = {
        'owner'          :'de_1team_manager',        
        'retries'        : 1,
        'retry_delay'    : timedelta(minutes=1)
    },    
    schedule_interval   = '@once',
    start_date          = datetime(2025,1,1),
    catchup             = False,
    tags                = ['aws', 's3']
) as dag:

    # 3-1. 오퍼레이터 정의
    # 파일 생성
    create_file_task = BashOperator(
        task_id='create_file_task',
        bash_command=f'echo "hello airflow & s3" > {LOCAL_PATH}'
    )
    # 로컬 파일 -> s3 업로드 : 설정만으로 업로드 가능함
    upload_to_s3_task = LocalFilesystemToS3Operator(
        task_id     = 'upload_to_s3_task',
        filename    = LOCAL_PATH,    # 로컬 파일의 위치
        dest_key    = FILE_NAME,     # 특정 데이터의 파일명
        dest_bucket = BUCKET_NAME,   # 파일이 업로드될 최종지(목적지) 버킷명
        aws_conn_id = 'aws_default', # aws 연결 정보 id, 대시보드상 등록한 id
        replace     = True           # 동일한 파일이 존재하면 덮어써라
    )
    # s3 업로드 여부 확인
    check_s3_task = PythonOperator(
        task_id     = 'check_s3_task',
        python_callable=_check_s3_task,
    )    
    # 의존성(injection)
    create_file_task >> upload_to_s3_task >> check_s3_task

 

이전과 다른점이 크진 않지만 다른 점은 데이터를 로컬에도 저장하며, 동시에 s3상에도 올리는것이다.

data > hello.txt
s3상에도 데이터가 올라간 모습이다.

09_s3_producer.py

 

# 로컬에서 작업(추출,변환,적재) 후 -> csv 생성 -> s3 내(버킷/income/xxx.csv)에 업로드
# A팀
# 1. 모듈 가져오기
from datetime import datetime, timedelta 
import logging
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# 2. 상수, 환경변수, 고정값
# 버킷명
BUCKET_NAME = "airflow-ai-en-3"
# 업로드할 파일명
FILE_NAME = 'trigger_data.csv'
# 최종 버킷 내 특정 폴더에 생성될 파일명 -> KEY
S3_KEY = f'income/{FILE_NAME}'
# 업로드할 파일의 로컬내 위치
LOCAL_PATH = f'/opt/airflow/dags/data/{S3_KEY}'

# 3. DAG 정의
with DAG(
    dag_id              = "09_s3_producer",
    description         = "s3의 특정 버킷에 데이터 공급",
    default_args        = {
        'owner'          :'de_1team_manager',        
        'retries'        : 1,
        'retry_delay'    : timedelta(minutes=1)
    },    
    schedule_interval   = None, # 수동 실행 (트리거)
    start_date          = datetime(2025,1,1),
    catchup             = False,
    tags                = ['aws', 's3', 'producer']
) as dag:
    # 3-1. 오퍼레이터 정의
    # csv 파일 생성
    create_csv_file_task = BashOperator(
        task_id='create_csv_file_task',
        bash_command=f'echo "id,timestamp,value\n1,$(date),100\n2,$(date),200" > {LOCAL_PATH}'
    )
    # 로컬 파일 -> s3 업로드 : 설정만으로 업로드 가능함
    upload_to_s3_task = LocalFilesystemToS3Operator(
        task_id     = 'upload_to_s3_task',
        filename    = LOCAL_PATH,    # 로컬 파일의 위치
        dest_key    = S3_KEY,     # 특정 데이터의 파일명
        dest_bucket = BUCKET_NAME,   # 파일이 업로드될 최종지(목적지) 버킷명
        aws_conn_id = 'aws_default', # aws 연결 정보 id, 대시보드상 등록한 id
        replace     = True           # 동일한 파일이 존재하면 덮어써라 => 멱등성
    )
    # 3-2. 의존성
    create_csv_file_task >> upload_to_s3_task

 

뭐 이전과 크게 다르진 않다, 여기서는 이제 s3상의 데이터를 감지해서 다음 작업을 수행하는 업무를 자동화할것이다.

 

로컬에는 이렇게 저장이 된다. 그리고 이렇게 생긴 데이터를 확인하면 

 

09_s3_consumer.py

# 잠복하여 감시하던 중 업로드 되는 것을 자동 감지(S3KeySensor) -> 파일 읽고 -> 처리 -> 삭제
# 버킷 내 income 폴더 하위는 임시로 사용하는 공간 (늘 채워져있지 않다)
# B팀
# 1. 모듈 가져오기
from datetime import datetime, timedelta 
import logging
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook # 읽기용
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor # 감시(지)용
from airflow.providers.amazon.aws.operators.s3 import S3DeleteObjectsOperator # 삭제용


# 2. 상수, 환경변수, 고정값
# 버킷명
BUCKET_NAME = "airflow-ai-en-3"
# 업로드할 파일명
FILE_NAME = 'trigger_data.csv'
# 최종 버킷 내 특정 폴더에 생성될 파일명 -> KEY
S3_KEY = f'income/{FILE_NAME}'


def _processing_data(**kwargs):
    # 감시 대상이 포착되면 해당 내용을 읽어서 로그 출력
    hook = S3Hook(aws_conn_id='aws_default') # 연결
    # 파일 내용 읽기
    data = hook.read_key(key=S3_KEY, bucket_name=BUCKET_NAME)
    # 로그 출력
    logging.info('--- 로그 출력 시작 ---')
    logging.info(data)
    logging.info('--- 로그 출력 끝 ---')
    pass


# 3. DAG 정의
with DAG(
    dag_id              = "09_s3_consumer_v1",
    description         = "s3의 특정 버킷에 데이터 변화 감지-> 읽기 -> 삭제",
    default_args        = {
        'owner'          :'de_1team_manager',        
        'retries'        : 1,
        'retry_delay'    : timedelta(minutes=1)
    },    
    schedule_interval   = '@daily', # 스위치 키면 작동 -> 센서 감지 안되면 대기(연두색뜰걸)
    start_date          = datetime(2025,1,1),
    catchup             = False,
    tags                = ['aws', 's3', 'consumer']
) as dag:
    # 3-1. 오퍼레이터 정의
    # 1. 센서 감시자(감지)
    waitting_trigger_task = S3KeySensor(
        task_id='waitting_trigger_task',
        # 감시 대상 설정
        bucket_name=BUCKET_NAME,   # 버킷 이름
        bucket_key=S3_KEY,         # 버킷 내 타겟     
        aws_conn_id='aws_default', # 접속 정보
        # 감시 설정(방법)
        mode='reschedule',         # 대기중에는 자원을 반납한다
        poke_interval=10,          # 10초 간격으로 체크(실습상 부여, 실제는 다를 수 있음)
        timeout=600                # 10분
    )
    # 2. 데이터 읽기 (처리)
    processing_data_task = PythonOperator(
        task_id='processing_data_task',
        python_callable=_processing_data
    )
    # 3. 파일 삭제(s3 특정 키 삭제)
    delete_data_task = S3DeleteObjectsOperator(
        task_id     = 'delete_data_task',
        bucket      = BUCKET_NAME,  # 버킷 이름
        keys        = [S3_KEY],     # 버킷 내 타겟들을 n개 지정
        aws_conn_id = 'aws_default' # 접속 정보
    )
    # 3-2. 의존성
    waitting_trigger_task >> processing_data_task >> delete_data_task

다음 코드이다, 큰 작업은 없고 읽었다는 확인만 하기위해 만든 코드다.

 

로그상에서 consumer DAG가 로그를 정상적으로 읽고 출력하는 모습을 볼 수 있다.

 

오늘의 수업은 여기까지