오늘은 어제에 이어서 05번 파일부터 리뷰하겠다.
05_mysql_etl.py
- DAG
- 1개의 DAG에서 ETL 수행 <-> N개의 DAG에서 ETL 수행
- task1
- 스마트 팩토리 상에 온도 센서에서 온도 측정
- 해당 측정된 데이터가 어딘가에 쌓이고 있다 (원시데이터)
- 이 데이터를 추출 => 더미 구성
- 스마트 팩토리 상에 온도 센서에서 온도 측정
- task2
- 추출된 데이터를 가지고 와서 변환과정 (전처리) 수행
- 온도값을 보정, 향후 모델이나 등등에서 사용되는 단위로 인코딩
- 여기서는 단순하게 환산, 수치연산만 수행
- 추출된 데이터를 가지고 와서 변환과정 (전처리) 수행
- task3
- 변환된 데이터는 mysql에 특정 테이블에 적재
- 소량의 데이터는 RDB 가능, -> mysql에 적재
- 대량의 데이터는 S3 같은 클라우드 기반 서비스에 적재
- 변환된 데이터는 mysql에 특정 테이블에 적재
- 작업 완료 후 확인
mysql> select * from sensor_readings;
+----+-----------+---------------------+---------------+---------------+---------------------+
| id | sensor_id | timestamp | temperature_c | temperature_f | created_at |
+----+-----------+---------------------+---------------+---------------+---------------------+
| 1 | SENSOR_1 | 2026-01-09 14:31:02 | 61.35 | 142.43 | 2026-01-09 05:31:05 |
| 2 | SENSOR_2 | 2026-01-09 14:31:02 | 66.12 | 151.016 | 2026-01-09 05:31:05 |
| 3 | SENSOR_3 | 2026-01-09 14:31:02 | 77.17 | 170.906 | 2026-01-09 05:31:05 |
| 4 | SENSOR_6 | 2026-01-09 14:31:02 | 43.44 | 110.192 | 2026-01-09 05:31:05 |
| 5 | SENSOR_7 | 2026-01-09 14:31:02 | 85.44 | 185.792 | 2026-01-09 05:31:05 |
| 6 | SENSOR_9 | 2026-01-09 14:31:02 | 96.56 | 205.808 | 2026-01-09 05:31:05 |
| 7 | SENSOR_10 | 2026-01-09 14:31:02 | 52.77 | 126.986 | 2026-01-09 05:31:05 |
+----+-----------+---------------------+---------------+---------------+---------------------+
7 rows in set (0.00 sec)
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import logging
# mysql
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
# 데이터
import json
import random
import pandas as pd
import os
# 프로젝트 폴더 내부에 /dags/data 에 내부에서 task 과정에 생성되는 파일을 동기화하여 확인할 수 있게
# 위치를 선정
DATA_PATH = '/opt/airflow/dags/data' # 도커 내부에 있는 서비스(리눅스 기반)의 특정 경로
os.makedirs(DATA_PATH, exist_ok=True)
def _extract_data_sensor(**kwargs):
# kwargs <- airflow context 정보가 전달됨
# 스마트 팩토리에 설치된 온도 센서 데이터가 어딘가(데이터 레이크 : s3)에 쌓이고있다 -> 추출해서 가져온다
# 여기서는 더미로 구성
data = [ {
"sensor_id" : f"SENSOR_{i+1}", # 장비 id
"timestamp" : datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # YYYY-MM-DD hh:mm:ss
"temperature" : round( random.uniform( 20.0, 150.0 ), 2 ),
"status" : "on" # on/off
} for i in range(10) ]
# 더미로 만든 데이터를 파일로 저장 -> /opt/airflow/dags/data/sensor_data_수행날짜.json
file_path = f'{DATA_PATH}/sensor_data_{kwargs['ds_nodash']}.json'
with open(file_path, 'w') as f:
json.dump(data , f)
# XCom을 통해서 다음 task에 접근 가능함
# 다음 테스크에게 무엇을 전달할 것인가? 1) 데이터(저용량일때 가볍게,그러나 사용 자제) 2) [v] 파일 경로(로컬, S3)
logging.info(f'extract 완료 데이터 = {file_path}')
return file_path
pass
def _transform_Data_std_change(**kwargs):
# 1. extract_data_sensor task에서 전달한 file_path 획득 (XCom 이용)
ti = kwargs['ti']
json_file_path = ti.xcom_pull(task_ids='extract_data_sensor')
# 2. 해당 데이터를 DataFrame으로 로드
df = pd.read_json(json_file_path) # 경로에 맞춰서 알아서 데이터를 로드
# 3. 전처리 수행 -> 섭씨를 화씨로 변환처리 (화씨(F) = (섭씨(C) x 9/5) + 32 ) -> 파생 변수 'temperature_f'
# 컨셉 => 우리 공장에서는 측정 온도를 100도 이하만 정상 데이터로 간주한다
# (100도 이상이면 이상치로 간주) => 이상치 제거 or 100도 이하만 추출
# => 불리언 인덱싱 : df [ 조건식 ]
target_df = df[ df['temperature'] < 100 ].copy() #(deep=True) # 데이터 크기 따라서 선택
target_df['temperature_f'] = (target_df['temperature'] * 9/5) + 32
# 4. 전처리된 데이터를 저장 => 동일 공간에 파일명만 preprocessing_data_{ds_dodash}.csv
file_path = f'{DATA_PATH}/preprocessing_data_{kwargs['ds_nodash']}.csv'
target_df.to_csv(file_path, index = False)
logging.info(f"transform 데이터 전처리 후 csv 저장 완료={file_path}")
# 5. 저장된 csv경로를 다음 task에서 사용할 수 있게 반환 처리 (df -> csv, 인덱스 제거)
return file_path
pass
def _load_data_mysql(**kwargs):
# csv => mysql, 이를 위해서 MysqlHook 사용
# 1. csv 경로 획득
ti = kwargs['ti']
csv_file_path = ti.xcom_pull(task_ids='transform_Data_std_change')
# 1.5 csv -> df로 read
df = pd.read_csv(csv_file_path)
# 2. 연결 => I/O (예외처리, with문)
mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
conn = mysql_hook.get_conn() # 커넥션 객체 획득
try:
with conn.cursor() as cursor: # 커서 획득
# 1. 쿼리문 준비
sql = '''
insert into sensor_readings
(sensor_id, timestamp, temperature_c, temperature_f)
values (%s, %s, %s, %s)
'''
# 2. 데이터별, 컬럼별 추출하여 쿼리 수행 (executemany() )
# params = list() # [ (값, 값, 값, 값) (), () ] # status는 고정이라 제외
params = [
( data['sensor_id'] ,data['timestamp'] ,data['temperature'] ,data['temperature_f'] )
for _, data in df.iterrows()
]
cursor.executemany( sql, params)
# 3. 커밋
conn.commit()
logging.info('mysql에 데이터 삽입(적재) 성공(success)')
except Exception as e:
logging.error(f'mysql에 데이터 삽입(적재)중 오류 발생 {e}')
pass
finally:
if conn:
conn.close()
logging.info('mysql에 데이터 삽입(적재) 완료')
pass
# DAG
with DAG(
dag_id = "05_mysql_etl_v1",
description = "etl을 수행하여 mysql에 적재",
default_args = {
'owner' : 'de_1team_manager',
'retries' : 1,
'retry_delay' : timedelta(minutes=1)
},
schedule_interval = "@daily",
start_date = datetime(2025,1,1),
catchup = False,
tags = ["mysql", "etl"]
) as dag:
# operator(task)
create_table = MySqlOperator(
task_id = 'create_table',
mysql_conn_id = "mysql_default", # UI > Admin > connections 등록된 내용과 동일하게 구성
# 여러번 수행할 수 있으므로 if not exists => 존재하지 않으면 생성
# 기재하지 않으면 2번째 주기에서 task 구동시 실패 발생할 수 있음
# sql을 등록하시면 task instance 수행시 반드시 쿼리가 작동함
sql = '''
CREATE TABLE IF NOT EXISTS sensor_readings (
id INT AUTO_INCREMENT PRIMARY KEY,
sensor_id VARCHAR(50),
timestamp DATETIME,
temperature_c FLOAT,
temperature_f FLOAT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
'''
)
extract_data = PythonOperator(
task_id = 'extract_data_sensor',
python_callable = _extract_data_sensor
)
transform_data = PythonOperator(
task_id = 'transform_Data_std_change',
python_callable = _transform_Data_std_change
)
load_data = PythonOperator(
task_id = 'load_data_mysql',
python_callable = _load_data_mysql
)
# 의존성
# 테이블 생성 >> Extract >> Transform >> Load
create_table >> extract_data >> transform_data >> load_data

어렵다...정말로 어렵다,,,, 과정마다 디테일이 너무너무 다르다
이번에는 mysql과 airflow를 연동시켜 데이터를 전처리하고 전처리된 데이터를 저장하며 적재하는 과정까지를 실습하였다.
이 과정에서 더미데이터를 형성한 이후 mysql을 사용한다가정하고 전처리와 예전에 배운 sql 쿼리를 사용하여 데이터에 파생변수를 추가하였다.
마지막은 결과값이다,
06_멀티 DAG로 ETL
- DAG
- 1개의 DAG에서 ETL 수행 <-> N개의 DAG에서 ETL 수행
- 개요
- 하나의 DAG에서 구성된 긴 파이프라인을 여러개의 DAG로 쪼개고 싶다
- DAG의 분리
- 1개의 DAG이 너무 커진다 (코드가 너무 길다)
- 가독성이 떨어짐등 문제 발생
- 팀 간 협업, 업무 분장 때문에 분할
- A팀이 추출
- B팀이 가공
- C팀이 적재
- 관리 난이도가 높아질 수 있음 (여러개를 바라보고 추적해야함)
- 선택적으로 구성
- 1개의 DAG이 너무 커진다 (코드가 너무 길다)
- 유형
- 싱글 DAG
- 장점
- 한 눈에 모든 파이프라인 구성, 파악
- 중간 실패시 즉시 오류 지점 확인 가능함
- 단점
- 코드가 길어질 수 있음 (난이도 상승)
- 장점
- 멀티 DAG
- 장점
- 각 단계별로 독립적 개발/배포 가능
- 수집은 10분텀, 적재는 1시간 텀, 등등 주기성(스케줄링)을 다르게 구성하여 운용 가능함
- 각자 스케줄이 달라야 함
- 코드가 짧아질 수 있음
- 단점
- DAG간 연결고리 구성 신중
- 트리거, 데이터셋이 완성되었는지 감시하여 자동 진행되게 구성
- 데이터 전달 -> XCom 사용 불가
- conf등 구성공간을 이용하여 파일 경로등 전달
- 에러 모니터링 난이도 상승 (여러개의 dag 체크)
- DAG간 연결고리 구성 신중
- 장점
- 싱글 DAG
- 결론
- 서로 다른 주기성을 가지고 관리 주체가 다르면 => 나눈다 => 멀티 DAG로 구성
06_multi_dag_1step_extract.py
# TriggerDagRunOperator = 트리거를 직접 발동시켜서 1번이 완료되면 2번 DAG를 실행시킴
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator # 핵심
import logging
import json
import random
import os
DATA_PATH = '/opt/airflow/dags/data'
os.makedirs(DATA_PATH, exist_ok=True)
def _extract_data_sensor(**kwargs):
# kwargs <- airflow context 정보가 전달됨
# 스마트 팩토리에 설치된 온도 센서 데이터가 어딘가(데이터 레이크 : s3)에 쌓이고있다 -> 추출해서 가져온다
# 여기서는 더미로 구성
data = [ {
"sensor_id" : f"SENSOR_{i+1}", # 장비 id
"timestamp" : datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # YYYY-MM-DD hh:mm:ss
"temperature" : round( random.uniform( 20.0, 150.0 ), 2 ),
"status" : "on" # on/off
} for i in range(10) ]
# 더미로 만든 데이터를 파일로 저장 -> /opt/airflow/dags/data/sensor_data_수행날짜.json
file_path = f'{DATA_PATH}/sensor_raw_data_{kwargs['ds_nodash']}.json'
with open(file_path, 'w') as f:
json.dump(data , f)
# XCom을 통해서 다음 task에 접근 가능함
# 다음 테스크에게 무엇을 전달할 것인가? 1) 데이터(저용량일때 가볍게,그러나 사용 자제) 2) [v] 파일 경로(로컬, S3)
logging.info(f'extract 완료 데이터 = {file_path}')
return file_path
pass
with DAG(
dag_id = "06_multi_dag_1step_extract",
description = "extract 전용 DAG",
default_args = {
'owner' : 'de_1team_manager'
},
schedule_interval = "@daily",
start_date = datetime(2025,1,1),
catchup = False,
tags = ["extract", "etl"]
) as dag:
# 오퍼레이터
extract_data = PythonOperator(
task_id = 'extract_data_sensor',
python_callable = _extract_data_sensor
)
# 다음 DAG를 실행시키기 위한 트리거 발동
trigger_transform = TriggerDagRunOperator(
task_id = "trigger_transform",
# 트리거의 대상 -> 누구를 어떤 DAG를 구동시킬것인가
trigger_dag_id = "06_multi_dag_2step_transform",
# 전달할 데이터 (JSON의 경로)
# jinja 템플릿 엔진을 통해 XCom의 전달된 데이터를 동적 세팅
conf = {"json_path":"{{task_instance.xcom_pull(task_ids='extract_data_sensor')}}"},
# DAG의 실행시간을 리셋 => 동일하게 맞춘다
# TriggerDagRunOperator의 수행시간을 PythonOperator(바로 위에 있는, 혹은 직전)의 수행시간에 맞춤
reset_dag_run = True,
wait_for_completion = False # 수행되면 대기 없이 그대로 종료(비동기)
)
# 의존성
extract_data >> trigger_transform
추출의 기능을 하는 파이썬 코드이다. 이전과는 다르게 데이터를 주고 받을때 XCom을 사용하지 못하여 트리거 오퍼레이터를 이용해서 데이터를 옮겨주는것이 포인트다.
이 떄 데이터를 직접 주는것이 아닌, 경로를 전달해주어 데이터가 오고ㄱ갈때 오류가 생기지 않는것이 포인트다.
지금은 더미데이터지만 실제로 데이터를 몇만개, 몇십만개가 오고간다고 가정하면 무조건 문제가 생기기 때문이다.
06_multi_dag_2step_transform.py
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator # 핵심
import logging
import os
import pandas as pd
DATA_PATH = '/opt/airflow/dags/data'
def _transform_Data_std_change(**kwargs):
# 1. extract_data_sensor task에서 전달한 file_path 획득 (다른덱에서 전달한 conf 이용 /xcom 사용 불가)
dag_run = kwargs['dag_run']
# 멀티 DAG에서 데이터 통신하기 (수신)
json_file_path = dag_run.conf.get('json_path')
if not json_file_path:
# 예외 발생 => 실패 처리
logging.error('1step에서 conf를 통해 데이터 전달(획득) 실패')
raise ValueError('1step에서 conf를 통해 데이터 전달(획득) 실패')
# 2. 해당 데이터를 DataFrame으로 로드
df = pd.read_json(json_file_path) # 경로에 맞춰서 알아서 데이터를 로드
# 3. 전처리 수행 -> 섭씨를 화씨로 변환처리 (화씨(F) = (섭씨(C) x 9/5) + 32 ) -> 파생 변수 'temperature_f'
# 컨셉 => 우리 공장에서는 측정 온도를 100도 이하만 정상 데이터로 간주한다
# (100도 이상이면 이상치로 간주) => 이상치 제거 or 100도 이하만 추출
# => 불리언 인덱싱 : df [ 조건식 ]
target_df = df[ df['temperature'] < 100 ].copy() #(deep=True) # 데이터 크기 따라서 선택
target_df['temperature_f'] = (target_df['temperature'] * 9/5) + 32
# 4. 전처리된 데이터를 저장 => 동일 공간에 파일명만 preprocessing_data_{ds_dodash}.csv
file_path = f'{DATA_PATH}/preprocessing_data_{kwargs['ds_nodash']}.csv'
target_df.to_csv(file_path, index = False)
logging.info(f"transform 데이터 전처리 후 csv 저장 완료={file_path}")
# 5. 저장된 csv경로를 다음 task에서 사용할 수 있게 반환 처리 (df -> csv, 인덱스 제거)
return file_path
pass
with DAG(
dag_id = "06_multi_dag_2step_transform",
description = "transform 전용 DAG",
default_args = {
'owner' : 'de_1team_manager',
},
schedule_interval = "@daily",
start_date = datetime(2025,1,1),
catchup = False,
tags = ["transform", "etl"]
) as dag:
# 오퍼레이터
transform_data = PythonOperator(
task_id = 'transform_Data_std_change',
python_callable = _transform_Data_std_change
)
# 다음 DAG를 실행시키기 위한 트리거 발동
trigger_load = TriggerDagRunOperator(
task_id = "trigger_load",
# 트리거의 대상 -> 누구를 어떤 DAG를 구동시킬것인가
trigger_dag_id = "06_multi_dag_3step_load",
# 전달할 데이터 (JSON의 경로)
# jinja 템플릿 엔진을 통해 XCom의 전달된 데이터를 동적 세팅
conf = {"csv_path":"{{task_instance.xcom_pull(task_ids='transform_Data_std_change')}}"},
# DAG의 실행시간을 리셋 => 동일하게 맞춘다
# TriggerDagRunOperator의 수행시간을 PythonOperator(바로 위에 있는, 혹은 직전)의 수행시간에 맞춤
reset_dag_run = True,
wait_for_completion = False # 수행되면 대기 없이 그대로 종료(비동기)
)
# 의존성
transform_data >> trigger_load
그 이후로는 크게 다르지 않다, 틀은 기본적으로 모두 같으며 그 안의 디테일이 조금씩 달라지는 느낌
06_multi_dag_3step_load.py
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator # 핵심
import logging
import os
import pandas as pd
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
def _load_data_mysql(**kwargs):
# csv => mysql, 이를 위해서 MysqlHook 사용
# 1. csv 경로 획득
# 멀티 DAG에서 데이터 통신하기 (수신)
csv_file_path = kwargs['dag_run'].conf.get('csv_path')
if not csv_file_path:
# 예외 발생 => 실패 처리
logging.error('2step에서 conf를 통해 데이터 전달(획득) 실패')
raise ValueError('2step에서 conf를 통해 데이터 전달(획득) 실패')
# 1.5 csv -> df로 read
df = pd.read_csv(csv_file_path)
# 2. 연결 => I/O (예외처리, with문)
mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
conn = mysql_hook.get_conn() # 커넥션 객체 획득
try:
with conn.cursor() as cursor: # 커서 획득
# 0. 테이블 생성 -> MySqlOperator에서 안하고 직접 task에서 합병함
cursor.execute('''
CREATE TABLE IF NOT EXISTS sensor_readings (
id INT AUTO_INCREMENT PRIMARY KEY,
sensor_id VARCHAR(50),
timestamp DATETIME,
temperature_c FLOAT,
temperature_f FLOAT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
''')
# 1. 쿼리문 준비
sql = '''
insert into sensor_readings
(sensor_id, timestamp, temperature_c, temperature_f)
values (%s, %s, %s, %s)
'''
# 2. 데이터별, 컬럼별 추출하여 쿼리 수행 (executemany() )
# params = list() # [ (값, 값, 값, 값) (), () ] # status는 고정이라 제외
params = [
( data['sensor_id'] ,data['timestamp'] ,data['temperature'] ,data['temperature_f'] )
for _, data in df.iterrows()
]
cursor.executemany( sql, params)
# 3. 커밋
conn.commit()
logging.info('mysql에 데이터 삽입(적재) 성공(success)')
except Exception as e:
logging.error(f'mysql에 데이터 삽입(적재)중 오류 발생 {e}')
raise ValueError('mysql에 데이터 삽입(적재)중 오류 발생')
finally:
if conn:
conn.close()
logging.info('mysql에 데이터 삽입(적재) 완료')
pass
with DAG(
dag_id = "06_multi_dag_3step_load",
description = "load 전용 DAG",
default_args = {
'owner' : 'de_1team_manager',
},
schedule_interval = "@daily",
start_date = datetime(2025,1,1),
catchup = False,
tags = ["load", "etl"]
) as dag:
load_data = PythonOperator(
task_id = 'load_data_mysql',
python_callable = _load_data_mysql
)
최종적으로 적재되는 모습을 확인하자.

'ASAC-SK플래닛 T아카데미 데이터 엔지니어' 카테고리의 다른 글
| 26.01.13 67일차 [Airflow : Dataset 감지, AWS S3 연동] (0) | 2026.01.13 |
|---|---|
| 26.01.12 66일차 [Airflow : MSA 기반 파이프라인 구성] (0) | 2026.01.12 |
| 26.01.08 64일차 [Airflow : 기본 개념, dag, operator 로컬 실습] (0) | 2026.01.08 |
| 26.01.06~26.01.07 62~63일차 [ 클라우드 , aws 특강] (1) | 2026.01.08 |
| 25.12.26 ~ 26.01.05 56~61일차 [ 미니 프로젝트 : Ai Trouble Shooter ] (0) | 2026.01.08 |