드디어 밀린 게시물 따라왔다,
오늘은 AIRFLOW에서 데이터셋을 감지하고 그 데이터를 트리거로 하여 다음 작업을 수행하는 파이프라인 구축과
AWS S3에 데이터를 적재하는 자동화 실습을 진행하였다.
dataset 감지
ETL -> MLOps -> AI Service
- 3개의 DAG를 구성하여 연결
- 앞에서 만든 모델(가상,시뮬레이션, 간단하게 수식 구성)이 뒤에서 만들어지는 과정 구현해본것
- FLOW
- 데이터가 계속 쌓이면 ETL을 통해서 모델 학습에 필요한 데이터를 축적
- MLOps에서는 특정한 스케줄 혹은 데이터량이 축적되면 학습을 통해서 모델 업데이트를 진행
- AI Service는 새로 적용된 모델을 기반으로 추론 서비스를 제공 -> API 제공
- 구성
- 데이터 준비(ETL) -> 모델 학습(ML/DL Train) -> 서비스 적용(Inference, API)
- 멀티 DAG에서 다른 DAG 호출
- TriggerDagRunOperator 사용
- Airflow 2.4부터는 Datasets 기능 사용
- 앞단계 DAG가 끝났는지 검사하는것이 아니라, Datasets이 생성되었는가를 감시하는 이벤트 기반으로 처리
- 데이터 인지 스케줄링
- data-Aware Scheduling
- 데이터가 준비되었으면 학습해 (O)
- 10시에 학습해 (X)
Data pipeline + MLops를 위한 파이프라인 아킥텍쳐
- 3 DAG
- 상호 데이터 파일을 통해 대화
- 구성
- DAG1
- 08_etl.py
- ETL을 통해서 trainning_data.csv 생성
- Producer 역할
- DAG2
- 08_mlops.py
- trainning_data.csv가 생기면 자동으로 모델 학습 진행
- 학습의 결과로 모델 덤프 (시물레이션, model_artifact.json) 파일 생성
- Consumer & Producer 역할
- DAG3
- 08_inference.py
- model_artifact.json 생성(혹은 갱신)되면 -> AI Server(MSA) 요청을 통해 고객 결과 DB 갱신
- Consumer 역할
- 공통
- 공통의 Dataset 정의
- 파일, 경로 등등 대상
- Airflow가 인식해야함
- 향후 -> AWS S3상에 저장 -> 업그레이드 (엑세스키 발급 및 등록-> Airflow)
- 공통의 Dataset 정의
- DAG1
이번에는 데이터 학습, 머신러닝의 개념까지 추가한 AIRFLOW를 구현해볼 예정이다.
그리고 각 코드별로 이전의 TASK가 끝날 시에 진행되는 내용을 넣어서 처리또한 진행한다.
08_etl.py
# 1. 모듈 가져오기
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import logging
import random
import os
from airflow.datasets import Dataset # 2.4 이후 지원 데이터셋 모듈
import pandas as pd
# 2. 데이터셋 저장 경로 (도커 내 리눅스 내부경로 지정)
DATA_PATH='/opt/airflow/dags/data'
os.makedirs(DATA_PATH, exist_ok=True) # 멱등성 고려 (여러번 수행되어도 동일한 결과 나오도록)
# 3. 데이터셋 (Airflow가 감시하는(바라보는) -> 트리거 발동 시킴) 정의 => URI 형태
# 최종 task의 결과로 trainning_data.csv 만들어지면 다음 DAG 작동됨
trigger_dataset = Dataset(f'file://{DATA_PATH}/trainning_data.csv')
# 4-1-1. 오퍼레이터에 콜백함수 정의
def _etl_task_generator(**kwargs):
# 학습용 더미데이터 생성 (Feature / label(target) 생성)
data = list()
for i in range(100):
income = random.randint(2000, 10000)
loan_amt = random.randint(100, 5000)
data.append({
"income" : income,
"loan_amt": loan_amt,
# 1 : 대출 승인, 0 : 대출 불허(거절)
"target" : 1 if income > loan_amt else 0
})
# [ {}, {}, {}, ... ] -> Dataframe -> csv, 코드 완성, 로그 출력(학습 데이터 완료: 경로)
df = pd.DataFrame(data)
save_path= f'{DATA_PATH}/trainning_data.csv'
df.to_csv(save_path, index=False)
logging.info(f'학습 데이터 완료: {save_path}')
# training_data_년월일().csv -> 이렇게 구성할 경우 별도 삭제 안하면 데이터가 쌓일수 잇음
pass
# 4. DAG 정의
with DAG(
dag_id = "08_etl_v1",
description = "etl 서비스",
default_args = {
'owner' :'de_1team_manager',
'retries' : 1,
'retry_delay' : timedelta(minutes=1)
},
schedule_interval = '@daily',
start_date = datetime(2025,1,1),
catchup = False,
tags = ['etl', 'part1']
) as dag:
# 4-1. 오퍼레이터 정의
etl_task = PythonOperator(
task_id='etl_task_generator',
python_callable=_etl_task_generator,
# 핵심은 dataset 연결
# 해당 task가 종료되면 -> trigger_dataset이 업데이트 되었다고 공지함
# 다음 DAG의 스케줄이 시간이 아닌 trigger_dataset을 지정(바라보면) -> 해당 공지(트리거)
# 바로 연쇄적으로 다음 DAG이 작동됨(비동기적인 느낌)
outlets=[trigger_dataset]
)
# 4-2. 의존성(injection)
이번 플로우는 etl.py 에서 더미데이터를 만들고, 그 데이터를 특정 경로에 저장해두면, 다음 파일이 그 데이터가 생긴걸 감지하고 다름 task를 수행하는것이다.
이를 위해서 outlets가 추가되었다.
08_mlops.py
# ETL을 통해서 trainning_data.csv -> 학습(시물레이션) -> 모델 덤프(model_artifact.json)
# DAG에서 아래와 같은 과정을 진행했다 가정하고 시뮬레이션 진행
# 1. 모듈 가져오기
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import logging
import random
import os
import json
from airflow.datasets import Dataset
import pandas as pd
# 2. 공통(모든 DAG)
DATA_PATH='/opt/airflow/dags/data'
# 3. 데이터셋 (감시해야 하는, 생성해야 하는_)
trigger_dataset = Dataset(f'file://{DATA_PATH}/trainning_data.csv') # 감시
model_dataset = Dataset(f'file://{DATA_PATH}/model_v1.json') # 생성
# 4-1-1. 오퍼레이터에 콜백함수 정의
def _train_task_dump(**kwargs):
# 일반 모델 학습 과정 (큰 포인트만 표시)
# df = pd.read_csv(...)
# 데이터 분할
# ml학습 => 실제로는 모델이 계속 업그레이드(기존 가중치 유지 -> 신규 데이터 학습) 되어야함
# => 전이학습(transfer learning) : 원래 목적과(업스트림) 실제 사용 목적(다운스트림)이 다르게 구성
# => 여기서는 동일 데이터로 계속 가중치를 업데이트 하는 전략 활용
# => 데이터는 매일, 매시간 계속 쌓인다는 전제하에서 진행
logging.info("ML/DL/LLM 등 모델 학습중.. epoch 1,epoch 2, ... epoch n") # 연출
# 모델 저장
model_info = {
'version': '1.0',
'accuracy': 0.96,
'train_at': datetime.now().isoformat()
}
# 3. 저장
save_path = f'{DATA_PATH}/model_v1.json'
with open(save_path, 'w') as f:
json.dump(model_info, f)
logging.info(f"모델 학습 완료 -> 저장 {save_path}")
pass
# 4. DAG 정의
with DAG(
dag_id = "08_mlops_v1",
description = "mlops 중 모델 학습",
default_args = {
'owner' :'de_1team_manager',
'retries' : 1,
'retry_delay' : timedelta(minutes=1)
},
# 핵심 : trigger_dataset이 생성되어야 해당 DAG이 작동됨 (조건은 시간이 아니라 데이터셋임)
schedule = [trigger_dataset],
start_date = datetime(2025,1,1),
catchup = False,
tags = ['mlops', 'train', 'part2']
) as dag:
# 4-1. 오퍼레이터 정의
train_task = PythonOperator(
task_id='train_task_dump',
python_callable=_train_task_dump,
# 핵심 : task의 결론, 새로운 모델이 나왔다고 공지
outlets=[model_dataset]
)
# 4-2. 의존성(injection)
이번 코드에서는 etl코드에서 만든 csv를 감지하고 model.json 파일을 생성해낸다,
이 과정에서 실제로는 모델 학습을 수행하고 모델 파일을 내보내는것이다.
이제 데이터를 추출하고, 머신러닝을 그 데이터를 기반으로 학습시키는 과정까지 진행하였다.
다음으로는 평가를 해야겠지?
08_inference.py
# model_artifact.json -> msa 기반 api 서비스 사용
# 향후 MLOps의 대표적인 도구 MLFlow를 적용 -> 모델 생성 후 엔드포인트 구성 가능해짐
# api는 동일한데 모델이 교체됨(v1 -> v2 -> v3 -> ... -> vn) -> 코드 수정이 없음
# 여기서는 새로운 모델이 발견되었다 -> 모델이 교체되었다 (가정 -> MLFLow 관련 DAG 구성 필요) 가정
# 새로 업데이트된 기준 (모델)에 맞춰 대상 고객의 신용 평가를 새로(혹은 업데이트(확장성 고려)) 진행
# 1. 모듈 가져오기
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import logging
import json
import requests # MSA 서비스 호출 (HTTP 요청)
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.datasets import Dataset
# 2. 상수 (처럼 관리, 고정값)정의
# inference 수행할 msa 기반 url
API_URL = "http://ai-api-server:8000/predict"
# 데이터셋
DATA_PATH='/opt/airflow/dags/data'
# 데이터셋 (감시 해야하는)
model_dataset = Dataset(f'file://{DATA_PATH}/model_v1.json')
# 3. PythonOperator를 위한 함수
def _extract_data_msa(**kwargs):
# 더미 데미터 수동 구성 -> (업그레이드) sql 쿼리에서 조회
users = [
{ "user_id":"C101", "income": 6000, "loan_amt": 2000 },
{ "user_id":"C102", "income": 2000, "loan_amt": 6000 },
{ "user_id":"C103", "income": 9000, "loan_amt": 3000 }
]
# 차후에는 고객 테이블 구성-> 더미로 입력(신용평가 여부 랜덤구성)
# 반환
return users
def _ai_service_api_msa(**kwargs):
# API 호출 (재료는 XCom 통해 통신)
# 1. XCom 통해 신용평가를 수행하고자 하는 고객 데이터 획득
ti = kwargs['ti']
users_data = ti.xcom_pull(task_ids='extract_data_msa')
# 2. 외부에 존재하는 MSA(컨셉) API 호출(실제 연산은 외부에서 진행된다)
try:
res = requests.post( API_URL, json=users_data )
#res.raise_for_status() # 200에 대한 점검 필요하면 진행(생략)
results = res.json() # 결과 획득
logging.info(f'신용 평가 결과 획득 {results}')
# 결과를 XCom 에서 획득 가능하게 반환
return results
except Exception as e:
logging.error(f'API 호출 실패 {e}')
raise
pass
def _load_data_msa(**kwargs):
# SQL 이용하여 DB 저장
# 신용 평가 결과 획득 XCom 사용
ti = kwargs['ti']
users_data = ti.xcom_pull(task_ids='ai_service_api_msa')
if not users_data:
logging.error("신용 평가 결과 없음")
raise ValueError('신용 평가 결과 없음')
# SQL 저장
mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
conn = mysql_hook.get_conn() # 커넥션 획득
try:
with conn.cursor() as cursor: # 커서 획득
# 1. 테이블이 없다면 생성(create)
cursor.execute('''
CREATE TABLE IF NOT EXISTS credit_scores (
user_id VARCHAR(50),
credit_score INT,
grade VARCHAR(4),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
''')
# 2. 데이터 적재(insert)
sql = '''
insert into credit_scores
(user_id, credit_score, grade)
values
(%s, %s, %s)
'''
params = [
( data['user_id'], data['credit_score'], data['grade'])
for data in users_data
]
cursor.executemany( sql, params)
# 3. 커밋
conn.commit()
pass
except Exception as e:
logging.error(f'mysql에 데이터 삽입(적제) 중 오류 발생 {e}')
finally:
if conn:
conn.close()
logging.info('mysql에 데이터 삽입(적제) 완료')
pass
# 4. DAG 구성
with DAG(
dag_id = "08_inference_v1",
description = "msa 서비스 호출 모델 추론 요청",
default_args = {
'owner' :'de_1team_manager',
'retries' : 1,
'retry_delay' : timedelta(minutes=1)
},
schedule = [model_dataset],
start_date = datetime(2025,1,1),
catchup = False,
tags = ['msa', 'part3']
) as dag:
# Task
# 1. 더미 데이터 준비
extract_data = PythonOperator(
task_id = "extract_data_msa",
python_callable = _extract_data_msa
)
# 2. AI 서비스 호출-> 신용평가 결과 획득
ai_service_api = PythonOperator(
task_id = "ai_service_api_msa",
python_callable = _ai_service_api_msa
)
# 3. 결과를 저장
load_data = PythonOperator(
task_id = "load_data_msa",
python_callable = _load_data_msa
)
# 의존성 1->2->3
extract_data >> ai_service_api >> load_data
pass
이번 코드에서는 더미데이터(실제로는 sql db랑 연동이 되어있을거다,)의 신용평가를 이전에 보내준 모델을 사용해서 수행하는 과정을 자동화하는것이다, api를 통해서 모델을 불러와서 결과를 획득하고, 그 데이터를 xcom을 통해서 보내서 db에 저장하고, 적재의 역할을 수행한다.
이 일련의 과정들을 msa라고 칭하는것이다. 약간 랭그래프랑 좀 비슷한거같기도 하다. 이거 좀 재밌는듯
오늘은 마지막으로 S3와 AIRFLOW를 연동하는 과정을 진행하였다.
AWS S3
- DAG의 결과물들을 S3 저장
- S3 (Date Lake의 역활)
- 원시 데이터(raw data)
- 중간에 생성되는 임시 데이터
- airflow의 최종 데이터
- S3 (Date Lake의 역활)
- 세팅
- 인증
- 로컬 PC에서 AWS 엑세스
- IAM -> 본인계정 -> 엑세스키 발급
- 접근 루트
- IAM > 사용자 > ai-en-0(본인계정) > 보안 자격 증명 > 액세스 키 > 액세스 키 만들기
- 사용사례 > Command Line Interface(CLI) 선택 > 확인체크 > 설명-태그( airflow 임시 입력) > 액세스 키 만들기
- 키를 복사 or csv 다운로드
- 주의 외부 노출(유출) 절대 금지 -> 사고남 -> git 업로드 절대 금지!!
- 접근 루트
- airflow 대시보드 > Admin > Connection > 등록 > [+] 클릭
- git or .env등에 기재 x (프로젝트 내부에는 흔적 없음)
- IAM -> 본인계정 -> 엑세스키 발급
- 로컬 PC에서 AWS 엑세스
- 패키지설치 (docker-compose.yaml)
- apache-airflow-providers-amazon
# apache-airflow-providers-amazon 추가함 _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-} apache-airflow-providers-mysql apache-airflow-providers-amazon- 아래 명령 수행 (도커 재구성)
- docker compose down
- docker compose up -d
- 인증
- S3 버킷 준비
- 본인 리전 이동 -> S3 진입 -> 버킷만들기
- airflow-(사용자명)
Airflow + s3 (DataLake)
- 09_aws_s3_basics.py
- 기본 업로드 및 업로드 확인
- Producer-Consumer 패턴
- DAG A (생산자, A팀, producer) 파일 생성하며 업로드
- DAG B (소비자, B팀, consumer) 업로드 된 파일 감지하여 자동으로 처리(업무)
- 처리 후 -> 파일 삭제 -> 저장소가 비워있어야함
- 결론
- airflow에서 필요시 DAG에서 자유롭게 읽고, 쓰기, 삭제 등등 임시 활용할 수 있는 스토리지로 활용
- 추출한 결과, 변환한 결과, 적재한 결과, 임시 파일 등등 모두 저장 가능
- raw data 저장 가능
- 모델 저장 가능
- s3 : 모든 데이터, 객체 등등 저장 가능한 data lake임
- airflow에서 필요시 DAG에서 자유롭게 읽고, 쓰기, 삭제 등등 임시 활용할 수 있는 스토리지로 활용

09_aws_s3_basics.py
# 로컬에서 파일 생성 -> s3 업로드 => 업로드 여부 확인
# 1. 모듈 가져오기
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
import logging
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
# 2. 상수, 환경변수등 고정값
# 본인이 사용하는 버킷명
BUCKET_NAME = 'airflow-ai-en-3'
# 업로드할 파일명
FILE_NAME = 'hello.txt'
# 업로드할 파일의 로컬내 위치
LOCAL_PATH = f'/opt/airflow/dags/data/{FILE_NAME}'
# 3-1-1. 오퍼레이터에 콜백함수 정의
def _check_s3_task(**kwargs):
# S3Hook 이용 => 파일이 진짜로 존재하는지 리스트 출력
# 1. S3Hook 이용 AWS 엑세스함
hook = S3Hook(aws_conn_id='aws_default')
# 2. S3Hook 이용하여 모든 키(파일명) 조회
keys = hook.list_keys(bucket_name=BUCKET_NAME)
# 3. 검출
if not keys:
raise ValueError("실패, 파일 업로드 실패")
for key in keys:
if FILE_NAME in keys:
logging.info("성공, 파일 업로드 성공")
else:
logging.info("실패, 파일 업로드 실패1")
raise ValueError("실패, 파일 업로드 실패2")
logging.info("실패, 파일 업로드 실패")
pass
# 3. DAG 정의
with DAG(
dag_id = "09_aws_s3_basics",
description = "s3 연동 기본 연습",
default_args = {
'owner' :'de_1team_manager',
'retries' : 1,
'retry_delay' : timedelta(minutes=1)
},
schedule_interval = '@once',
start_date = datetime(2025,1,1),
catchup = False,
tags = ['aws', 's3']
) as dag:
# 3-1. 오퍼레이터 정의
# 파일 생성
create_file_task = BashOperator(
task_id='create_file_task',
bash_command=f'echo "hello airflow & s3" > {LOCAL_PATH}'
)
# 로컬 파일 -> s3 업로드 : 설정만으로 업로드 가능함
upload_to_s3_task = LocalFilesystemToS3Operator(
task_id = 'upload_to_s3_task',
filename = LOCAL_PATH, # 로컬 파일의 위치
dest_key = FILE_NAME, # 특정 데이터의 파일명
dest_bucket = BUCKET_NAME, # 파일이 업로드될 최종지(목적지) 버킷명
aws_conn_id = 'aws_default', # aws 연결 정보 id, 대시보드상 등록한 id
replace = True # 동일한 파일이 존재하면 덮어써라
)
# s3 업로드 여부 확인
check_s3_task = PythonOperator(
task_id = 'check_s3_task',
python_callable=_check_s3_task,
)
# 의존성(injection)
create_file_task >> upload_to_s3_task >> check_s3_task
이전과 다른점이 크진 않지만 다른 점은 데이터를 로컬에도 저장하며, 동시에 s3상에도 올리는것이다.


09_s3_producer.py
# 로컬에서 작업(추출,변환,적재) 후 -> csv 생성 -> s3 내(버킷/income/xxx.csv)에 업로드
# A팀
# 1. 모듈 가져오기
from datetime import datetime, timedelta
import logging
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
# 2. 상수, 환경변수, 고정값
# 버킷명
BUCKET_NAME = "airflow-ai-en-3"
# 업로드할 파일명
FILE_NAME = 'trigger_data.csv'
# 최종 버킷 내 특정 폴더에 생성될 파일명 -> KEY
S3_KEY = f'income/{FILE_NAME}'
# 업로드할 파일의 로컬내 위치
LOCAL_PATH = f'/opt/airflow/dags/data/{S3_KEY}'
# 3. DAG 정의
with DAG(
dag_id = "09_s3_producer",
description = "s3의 특정 버킷에 데이터 공급",
default_args = {
'owner' :'de_1team_manager',
'retries' : 1,
'retry_delay' : timedelta(minutes=1)
},
schedule_interval = None, # 수동 실행 (트리거)
start_date = datetime(2025,1,1),
catchup = False,
tags = ['aws', 's3', 'producer']
) as dag:
# 3-1. 오퍼레이터 정의
# csv 파일 생성
create_csv_file_task = BashOperator(
task_id='create_csv_file_task',
bash_command=f'echo "id,timestamp,value\n1,$(date),100\n2,$(date),200" > {LOCAL_PATH}'
)
# 로컬 파일 -> s3 업로드 : 설정만으로 업로드 가능함
upload_to_s3_task = LocalFilesystemToS3Operator(
task_id = 'upload_to_s3_task',
filename = LOCAL_PATH, # 로컬 파일의 위치
dest_key = S3_KEY, # 특정 데이터의 파일명
dest_bucket = BUCKET_NAME, # 파일이 업로드될 최종지(목적지) 버킷명
aws_conn_id = 'aws_default', # aws 연결 정보 id, 대시보드상 등록한 id
replace = True # 동일한 파일이 존재하면 덮어써라 => 멱등성
)
# 3-2. 의존성
create_csv_file_task >> upload_to_s3_task
뭐 이전과 크게 다르진 않다, 여기서는 이제 s3상의 데이터를 감지해서 다음 작업을 수행하는 업무를 자동화할것이다.

로컬에는 이렇게 저장이 된다. 그리고 이렇게 생긴 데이터를 확인하면
09_s3_consumer.py
# 잠복하여 감시하던 중 업로드 되는 것을 자동 감지(S3KeySensor) -> 파일 읽고 -> 처리 -> 삭제
# 버킷 내 income 폴더 하위는 임시로 사용하는 공간 (늘 채워져있지 않다)
# B팀
# 1. 모듈 가져오기
from datetime import datetime, timedelta
import logging
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook # 읽기용
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor # 감시(지)용
from airflow.providers.amazon.aws.operators.s3 import S3DeleteObjectsOperator # 삭제용
# 2. 상수, 환경변수, 고정값
# 버킷명
BUCKET_NAME = "airflow-ai-en-3"
# 업로드할 파일명
FILE_NAME = 'trigger_data.csv'
# 최종 버킷 내 특정 폴더에 생성될 파일명 -> KEY
S3_KEY = f'income/{FILE_NAME}'
def _processing_data(**kwargs):
# 감시 대상이 포착되면 해당 내용을 읽어서 로그 출력
hook = S3Hook(aws_conn_id='aws_default') # 연결
# 파일 내용 읽기
data = hook.read_key(key=S3_KEY, bucket_name=BUCKET_NAME)
# 로그 출력
logging.info('--- 로그 출력 시작 ---')
logging.info(data)
logging.info('--- 로그 출력 끝 ---')
pass
# 3. DAG 정의
with DAG(
dag_id = "09_s3_consumer_v1",
description = "s3의 특정 버킷에 데이터 변화 감지-> 읽기 -> 삭제",
default_args = {
'owner' :'de_1team_manager',
'retries' : 1,
'retry_delay' : timedelta(minutes=1)
},
schedule_interval = '@daily', # 스위치 키면 작동 -> 센서 감지 안되면 대기(연두색뜰걸)
start_date = datetime(2025,1,1),
catchup = False,
tags = ['aws', 's3', 'consumer']
) as dag:
# 3-1. 오퍼레이터 정의
# 1. 센서 감시자(감지)
waitting_trigger_task = S3KeySensor(
task_id='waitting_trigger_task',
# 감시 대상 설정
bucket_name=BUCKET_NAME, # 버킷 이름
bucket_key=S3_KEY, # 버킷 내 타겟
aws_conn_id='aws_default', # 접속 정보
# 감시 설정(방법)
mode='reschedule', # 대기중에는 자원을 반납한다
poke_interval=10, # 10초 간격으로 체크(실습상 부여, 실제는 다를 수 있음)
timeout=600 # 10분
)
# 2. 데이터 읽기 (처리)
processing_data_task = PythonOperator(
task_id='processing_data_task',
python_callable=_processing_data
)
# 3. 파일 삭제(s3 특정 키 삭제)
delete_data_task = S3DeleteObjectsOperator(
task_id = 'delete_data_task',
bucket = BUCKET_NAME, # 버킷 이름
keys = [S3_KEY], # 버킷 내 타겟들을 n개 지정
aws_conn_id = 'aws_default' # 접속 정보
)
# 3-2. 의존성
waitting_trigger_task >> processing_data_task >> delete_data_task
다음 코드이다, 큰 작업은 없고 읽었다는 확인만 하기위해 만든 코드다.

로그상에서 consumer DAG가 로그를 정상적으로 읽고 출력하는 모습을 볼 수 있다.
오늘의 수업은 여기까지
'ASAC-SK플래닛 T아카데미 데이터 엔지니어' 카테고리의 다른 글
| 26.01.15 69일차 [airflow | athena query, sensor, ctas etl] (0) | 2026.01.18 |
|---|---|
| 26.01.14 68일차 [airflow 특강 | 취업 특강] (0) | 2026.01.18 |
| 26.01.12 66일차 [Airflow : MSA 기반 파이프라인 구성] (0) | 2026.01.12 |
| 26.01.09 65일차 [Airflow : Mysql 연동, multi DAG 구현] (0) | 2026.01.09 |
| 26.01.08 64일차 [Airflow : 기본 개념, dag, operator 로컬 실습] (0) | 2026.01.08 |