ASAC-SK플래닛 T아카데미 데이터 엔지니어

26.01.09 65일차 [Airflow : Mysql 연동, multi DAG 구현]

Datadesigner 2026. 1. 9. 17:31

오늘은 어제에 이어서 05번 파일부터 리뷰하겠다.


05_mysql_etl.py

 

  • DAG
    • 1개의 DAG에서 ETL 수행 <-> N개의 DAG에서 ETL 수행
    • task1
      • 스마트 팩토리 상에 온도 센서에서 온도 측정
        • 해당 측정된 데이터가 어딘가에 쌓이고 있다 (원시데이터)
        • 이 데이터를 추출 => 더미 구성
    • task2
      • 추출된 데이터를 가지고 와서 변환과정 (전처리) 수행
        • 온도값을 보정, 향후 모델이나 등등에서 사용되는 단위로 인코딩
        • 여기서는 단순하게 환산, 수치연산만 수행
    • task3
      • 변환된 데이터는 mysql에 특정 테이블에 적재
        • 소량의 데이터는 RDB 가능, -> mysql에 적재
        • 대량의 데이터는 S3 같은 클라우드 기반 서비스에 적재
 

 

  • 작업 완료 후 확인
mysql> select * from sensor_readings;
+----+-----------+---------------------+---------------+---------------+---------------------+
| id | sensor_id | timestamp           | temperature_c | temperature_f | created_at          |
+----+-----------+---------------------+---------------+---------------+---------------------+
|  1 | SENSOR_1  | 2026-01-09 14:31:02 |         61.35 |        142.43 | 2026-01-09 05:31:05 |
|  2 | SENSOR_2  | 2026-01-09 14:31:02 |         66.12 |       151.016 | 2026-01-09 05:31:05 |
|  3 | SENSOR_3  | 2026-01-09 14:31:02 |         77.17 |       170.906 | 2026-01-09 05:31:05 |
|  4 | SENSOR_6  | 2026-01-09 14:31:02 |         43.44 |       110.192 | 2026-01-09 05:31:05 |
|  5 | SENSOR_7  | 2026-01-09 14:31:02 |         85.44 |       185.792 | 2026-01-09 05:31:05 |
|  6 | SENSOR_9  | 2026-01-09 14:31:02 |         96.56 |       205.808 | 2026-01-09 05:31:05 |
|  7 | SENSOR_10 | 2026-01-09 14:31:02 |         52.77 |       126.986 | 2026-01-09 05:31:05 |
+----+-----------+---------------------+---------------+---------------+---------------------+
7 rows in set (0.00 sec)
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import logging 

# mysql
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook

# 데이터 
import json 
import random
import pandas as pd
import os 

# 프로젝트 폴더 내부에 /dags/data 에 내부에서 task 과정에 생성되는 파일을 동기화하여 확인할 수 있게
# 위치를 선정
DATA_PATH = '/opt/airflow/dags/data' # 도커 내부에 있는 서비스(리눅스 기반)의 특정 경로
os.makedirs(DATA_PATH, exist_ok=True)

def _extract_data_sensor(**kwargs):
    # kwargs <- airflow context 정보가 전달됨
    # 스마트 팩토리에 설치된 온도 센서 데이터가 어딘가(데이터 레이크 : s3)에 쌓이고있다 -> 추출해서 가져온다
    # 여기서는 더미로 구성
    data = [ {
        "sensor_id"   : f"SENSOR_{i+1}",  # 장비 id
        "timestamp"   : datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # YYYY-MM-DD hh:mm:ss  
        "temperature" : round( random.uniform( 20.0, 150.0 ), 2 ),  
        "status"      : "on" # on/off
    } for i in range(10)  ]

    # 더미로 만든 데이터를 파일로 저장 -> /opt/airflow/dags/data/sensor_data_수행날짜.json
    file_path = f'{DATA_PATH}/sensor_data_{kwargs['ds_nodash']}.json'
    with open(file_path, 'w') as f:
        json.dump(data , f)

    # XCom을 통해서 다음 task에 접근 가능함
    # 다음 테스크에게 무엇을 전달할 것인가? 1) 데이터(저용량일때 가볍게,그러나 사용 자제) 2) [v] 파일 경로(로컬, S3)
    logging.info(f'extract 완료 데이터 = {file_path}')
    return file_path
    pass

def _transform_Data_std_change(**kwargs):
    # 1. extract_data_sensor task에서 전달한 file_path 획득 (XCom 이용)
    ti = kwargs['ti']
    json_file_path = ti.xcom_pull(task_ids='extract_data_sensor')

    # 2. 해당 데이터를 DataFrame으로 로드
    df = pd.read_json(json_file_path) # 경로에 맞춰서 알아서 데이터를 로드

    # 3. 전처리 수행 ->  섭씨를 화씨로 변환처리 (화씨(F) = (섭씨(C) x 9/5) + 32 ) -> 파생 변수 'temperature_f'
    #    컨셉 => 우리 공장에서는 측정 온도를 100도 이하만 정상 데이터로 간주한다
    #           (100도 이상이면 이상치로 간주) => 이상치 제거 or 100도 이하만 추출
    #           => 불리언 인덱싱 : df [ 조건식 ]
    target_df = df[ df['temperature'] < 100 ].copy() #(deep=True) # 데이터 크기 따라서 선택
    target_df['temperature_f'] = (target_df['temperature'] * 9/5) + 32

    # 4. 전처리된 데이터를 저장 => 동일 공간에 파일명만 preprocessing_data_{ds_dodash}.csv
    file_path = f'{DATA_PATH}/preprocessing_data_{kwargs['ds_nodash']}.csv'
    target_df.to_csv(file_path, index = False)
    logging.info(f"transform 데이터 전처리 후 csv 저장 완료={file_path}")
    # 5. 저장된 csv경로를 다음 task에서 사용할 수 있게 반환 처리 (df -> csv, 인덱스 제거)
    return file_path
    pass

def _load_data_mysql(**kwargs):
    # csv => mysql, 이를 위해서 MysqlHook 사용
    # 1. csv 경로 획득
    ti = kwargs['ti']
    csv_file_path = ti.xcom_pull(task_ids='transform_Data_std_change')
    
    # 1.5 csv -> df로 read
    df = pd.read_csv(csv_file_path)

    # 2. 연결 => I/O (예외처리, with문)
    mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
    conn = mysql_hook.get_conn() # 커넥션 객체 획득
    try:
        with conn.cursor() as cursor:   # 커서 획득
            # 1. 쿼리문 준비 
            sql = '''
                insert into sensor_readings
                (sensor_id, timestamp, temperature_c, temperature_f)
                values (%s, %s, %s, %s)
            '''
            # 2. 데이터별, 컬럼별 추출하여 쿼리 수행 (executemany() )
            # params = list() # [ (값, 값, 값, 값) (), () ] # status는 고정이라 제외
            params = [
                ( data['sensor_id'] ,data['timestamp'] ,data['temperature'] ,data['temperature_f'] )
                for _, data in df.iterrows()
            ]
            cursor.executemany( sql, params)
            # 3. 커밋
            conn.commit()
            logging.info('mysql에 데이터 삽입(적재) 성공(success)')
    except Exception as e:
        logging.error(f'mysql에 데이터 삽입(적재)중 오류 발생 {e}')
        pass
    finally:
        if conn:
            conn.close()
            logging.info('mysql에 데이터 삽입(적재) 완료')
    pass

# DAG
with DAG(
    dag_id              = "05_mysql_etl_v1",   
    description         = "etl을 수행하여 mysql에 적재", 
    default_args        = {
        'owner'           : 'de_1team_manager',  
        'retries'         : 1,                   
        'retry_delay'     : timedelta(minutes=1) 
},       
    schedule_interval   = "@daily",
    start_date          = datetime(2025,1,1),                    
    catchup             = False,              
    tags                = ["mysql", "etl"]    
) as dag:
    # operator(task)
    create_table   = MySqlOperator(
        task_id       = 'create_table',
        mysql_conn_id = "mysql_default", # UI > Admin > connections 등록된 내용과 동일하게 구성
        # 여러번 수행할 수 있으므로 if not exists => 존재하지 않으면 생성
        # 기재하지 않으면 2번째 주기에서 task 구동시 실패 발생할 수 있음
        # sql을 등록하시면 task instance 수행시 반드시 쿼리가 작동함
        sql           = '''
            CREATE TABLE IF NOT EXISTS sensor_readings (
                id INT AUTO_INCREMENT PRIMARY KEY,
                sensor_id VARCHAR(50),
                timestamp DATETIME,
                temperature_c FLOAT,
                temperature_f FLOAT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
                        
        '''
    )
    extract_data   = PythonOperator(
        task_id = 'extract_data_sensor',
        python_callable = _extract_data_sensor
    )
    transform_data = PythonOperator(
        task_id = 'transform_Data_std_change',
         python_callable = _transform_Data_std_change
    )
    load_data      = PythonOperator(
        task_id = 'load_data_mysql',
         python_callable = _load_data_mysql
    )
    # 의존성 
    # 테이블 생성 >> Extract >> Transform >> Load
    create_table >> extract_data >> transform_data >> load_data

어렵다...정말로 어렵다,,,, 과정마다 디테일이 너무너무 다르다

 

이번에는 mysql과 airflow를 연동시켜 데이터를 전처리하고 전처리된 데이터를 저장하며 적재하는 과정까지를 실습하였다.

 

이 과정에서 더미데이터를 형성한 이후 mysql을 사용한다가정하고 전처리와 예전에 배운 sql 쿼리를 사용하여 데이터에 파생변수를 추가하였다.

 

마지막은 결과값이다,

 


06_멀티 DAG로 ETL

 

  • DAG
    • 1개의 DAG에서 ETL 수행 <-> N개의 DAG에서 ETL 수행
  • 개요
    • 하나의 DAG에서 구성된 긴 파이프라인을 여러개의 DAG로 쪼개고 싶다
    • DAG의 분리
      • 1개의 DAG이 너무 커진다 (코드가 너무 길다)
        • 가독성이 떨어짐등 문제 발생
      • 팀 간 협업, 업무 분장 때문에 분할
        • A팀이 추출
        • B팀이 가공
        • C팀이 적재
      • 관리 난이도가 높아질 수 있음 (여러개를 바라보고 추적해야함)
      • 선택적으로 구성

  • 유형
    • 싱글 DAG
      • 장점
        • 한 눈에 모든 파이프라인 구성, 파악
        • 중간 실패시 즉시 오류 지점 확인 가능함
      • 단점
        • 코드가 길어질 수 있음 (난이도 상승)
    • 멀티 DAG
      • 장점
        • 각 단계별로 독립적 개발/배포 가능
        • 수집은 10분텀, 적재는 1시간 텀, 등등 주기성(스케줄링)을 다르게 구성하여 운용 가능함
          • 각자 스케줄이 달라야 함
        • 코드가 짧아질 수 있음
      • 단점
        • DAG간 연결고리 구성 신중
          • 트리거, 데이터셋이 완성되었는지 감시하여 자동 진행되게 구성
        • 데이터 전달 -> XCom 사용 불가
          • conf등 구성공간을 이용하여 파일 경로등 전달
        • 에러 모니터링 난이도 상승 (여러개의 dag 체크)
  • 결론
    • 서로 다른 주기성을 가지고 관리 주체가 다르면 => 나눈다 => 멀티 DAG로 구성

 

06_multi_dag_1step_extract.py

# TriggerDagRunOperator = 트리거를 직접 발동시켜서 1번이 완료되면 2번 DAG를 실행시킴
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator # 핵심
import logging 
import json
import random
import os

DATA_PATH = '/opt/airflow/dags/data' 
os.makedirs(DATA_PATH, exist_ok=True)

def _extract_data_sensor(**kwargs):
    # kwargs <- airflow context 정보가 전달됨
    # 스마트 팩토리에 설치된 온도 센서 데이터가 어딘가(데이터 레이크 : s3)에 쌓이고있다 -> 추출해서 가져온다
    # 여기서는 더미로 구성
    data = [ {
        "sensor_id"   : f"SENSOR_{i+1}",  # 장비 id
        "timestamp"   : datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # YYYY-MM-DD hh:mm:ss  
        "temperature" : round( random.uniform( 20.0, 150.0 ), 2 ),  
        "status"      : "on" # on/off
    } for i in range(10)  ]

    # 더미로 만든 데이터를 파일로 저장 -> /opt/airflow/dags/data/sensor_data_수행날짜.json
    file_path = f'{DATA_PATH}/sensor_raw_data_{kwargs['ds_nodash']}.json'
    with open(file_path, 'w') as f:
        json.dump(data , f)

    # XCom을 통해서 다음 task에 접근 가능함
    # 다음 테스크에게 무엇을 전달할 것인가? 1) 데이터(저용량일때 가볍게,그러나 사용 자제) 2) [v] 파일 경로(로컬, S3)
    logging.info(f'extract 완료 데이터 = {file_path}')
    return file_path
    pass



with DAG(
    dag_id              = "06_multi_dag_1step_extract",   
    description         = "extract 전용 DAG", 
    default_args        = {
        'owner'           : 'de_1team_manager'
},       
    schedule_interval   = "@daily",
    start_date          = datetime(2025,1,1),                    
    catchup             = False,              
    tags                = ["extract", "etl"]    

) as dag:
        # 오퍼레이터
        extract_data   = PythonOperator(
            task_id = 'extract_data_sensor',
            python_callable = _extract_data_sensor
    )
        # 다음 DAG를 실행시키기 위한 트리거 발동
        trigger_transform = TriggerDagRunOperator(
            task_id        = "trigger_transform",
            # 트리거의 대상 -> 누구를 어떤 DAG를 구동시킬것인가
            trigger_dag_id = "06_multi_dag_2step_transform",
            # 전달할 데이터 (JSON의 경로)
            # jinja 템플릿 엔진을 통해 XCom의 전달된 데이터를 동적 세팅
            conf           =  {"json_path":"{{task_instance.xcom_pull(task_ids='extract_data_sensor')}}"},
            # DAG의 실행시간을 리셋 => 동일하게 맞춘다
            # TriggerDagRunOperator의 수행시간을 PythonOperator(바로 위에 있는, 혹은 직전)의 수행시간에 맞춤
            reset_dag_run = True, 
            wait_for_completion = False # 수행되면 대기 없이 그대로 종료(비동기)
        )
        # 의존성
        extract_data >> trigger_transform

추출의 기능을 하는 파이썬 코드이다. 이전과는 다르게 데이터를 주고 받을때 XCom을 사용하지 못하여 트리거 오퍼레이터를 이용해서 데이터를 옮겨주는것이 포인트다.

 

이 떄 데이터를 직접 주는것이 아닌, 경로를 전달해주어 데이터가 오고ㄱ갈때 오류가 생기지 않는것이 포인트다.

 

지금은 더미데이터지만 실제로 데이터를 몇만개, 몇십만개가 오고간다고 가정하면 무조건 문제가 생기기 때문이다.

 

06_multi_dag_2step_transform.py

 

 

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator # 핵심
import logging 
import os
import pandas as pd

DATA_PATH = '/opt/airflow/dags/data' 

def _transform_Data_std_change(**kwargs):
    # 1. extract_data_sensor task에서 전달한 file_path 획득 (다른덱에서 전달한 conf 이용 /xcom 사용 불가)
    dag_run = kwargs['dag_run']
    # 멀티 DAG에서 데이터 통신하기 (수신)
    json_file_path = dag_run.conf.get('json_path')
    if not json_file_path:
        # 예외 발생 => 실패 처리
        logging.error('1step에서 conf를 통해 데이터 전달(획득) 실패')
        raise ValueError('1step에서 conf를 통해 데이터 전달(획득) 실패')

    # 2. 해당 데이터를 DataFrame으로 로드
    df = pd.read_json(json_file_path) # 경로에 맞춰서 알아서 데이터를 로드

    # 3. 전처리 수행 ->  섭씨를 화씨로 변환처리 (화씨(F) = (섭씨(C) x 9/5) + 32 ) -> 파생 변수 'temperature_f'
    #    컨셉 => 우리 공장에서는 측정 온도를 100도 이하만 정상 데이터로 간주한다
    #           (100도 이상이면 이상치로 간주) => 이상치 제거 or 100도 이하만 추출
    #           => 불리언 인덱싱 : df [ 조건식 ]
    target_df = df[ df['temperature'] < 100 ].copy() #(deep=True) # 데이터 크기 따라서 선택
    target_df['temperature_f'] = (target_df['temperature'] * 9/5) + 32

    # 4. 전처리된 데이터를 저장 => 동일 공간에 파일명만 preprocessing_data_{ds_dodash}.csv
    file_path = f'{DATA_PATH}/preprocessing_data_{kwargs['ds_nodash']}.csv'
    target_df.to_csv(file_path, index = False)
    logging.info(f"transform 데이터 전처리 후 csv 저장 완료={file_path}")
    # 5. 저장된 csv경로를 다음 task에서 사용할 수 있게 반환 처리 (df -> csv, 인덱스 제거)
    return file_path
    pass



with DAG(
    dag_id              = "06_multi_dag_2step_transform",   
    description         = "transform 전용 DAG", 
    default_args        = {
        'owner'           : 'de_1team_manager',
},       
    schedule_interval   = "@daily",
    start_date          = datetime(2025,1,1),                    
    catchup             = False,              
    tags                = ["transform", "etl"]    

) as dag:
        # 오퍼레이터
    transform_data = PythonOperator(
        task_id = 'transform_Data_std_change',
         python_callable = _transform_Data_std_change
    )
        # 다음 DAG를 실행시키기 위한 트리거 발동
    trigger_load = TriggerDagRunOperator(
        task_id        = "trigger_load",
        # 트리거의 대상 -> 누구를 어떤 DAG를 구동시킬것인가
        trigger_dag_id = "06_multi_dag_3step_load",
        # 전달할 데이터 (JSON의 경로)
        # jinja 템플릿 엔진을 통해 XCom의 전달된 데이터를 동적 세팅
        conf           =  {"csv_path":"{{task_instance.xcom_pull(task_ids='transform_Data_std_change')}}"},
        # DAG의 실행시간을 리셋 => 동일하게 맞춘다
        # TriggerDagRunOperator의 수행시간을 PythonOperator(바로 위에 있는, 혹은 직전)의 수행시간에 맞춤
        reset_dag_run = True, 
        wait_for_completion = False # 수행되면 대기 없이 그대로 종료(비동기)
        )
        # 의존성
    transform_data >> trigger_load

 그 이후로는 크게 다르지 않다, 틀은 기본적으로 모두 같으며 그 안의 디테일이 조금씩 달라지는 느낌

06_multi_dag_3step_load.py

 

 

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator # 핵심
import logging 
import os
import pandas as pd
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook



def _load_data_mysql(**kwargs):
    # csv => mysql, 이를 위해서 MysqlHook 사용
    # 1. csv 경로 획득
    # 멀티 DAG에서 데이터 통신하기 (수신)
    csv_file_path = kwargs['dag_run'].conf.get('csv_path')
    if not csv_file_path:
        # 예외 발생 => 실패 처리
        logging.error('2step에서 conf를 통해 데이터 전달(획득) 실패')
        raise ValueError('2step에서 conf를 통해 데이터 전달(획득) 실패')

    # 1.5 csv -> df로 read
    df = pd.read_csv(csv_file_path)

    # 2. 연결 => I/O (예외처리, with문)
    mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
    conn = mysql_hook.get_conn() # 커넥션 객체 획득
    try:
        with conn.cursor() as cursor:   # 커서 획득

            # 0. 테이블 생성 -> MySqlOperator에서 안하고 직접 task에서 합병함
            cursor.execute('''
            CREATE TABLE IF NOT EXISTS sensor_readings (
                id INT AUTO_INCREMENT PRIMARY KEY,
                sensor_id VARCHAR(50),
                timestamp DATETIME,
                temperature_c FLOAT,
                temperature_f FLOAT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
                        
        ''')
            # 1. 쿼리문 준비 
            sql = '''
                insert into sensor_readings
                (sensor_id, timestamp, temperature_c, temperature_f)
                values (%s, %s, %s, %s)
            '''
            # 2. 데이터별, 컬럼별 추출하여 쿼리 수행 (executemany() )
            # params = list() # [ (값, 값, 값, 값) (), () ] # status는 고정이라 제외
            params = [
                ( data['sensor_id'] ,data['timestamp'] ,data['temperature'] ,data['temperature_f'] )
                for _, data in df.iterrows()
            ]
            cursor.executemany( sql, params)
            # 3. 커밋
            conn.commit()
            logging.info('mysql에 데이터 삽입(적재) 성공(success)')
    except Exception as e:
        logging.error(f'mysql에 데이터 삽입(적재)중 오류 발생 {e}')
        raise ValueError('mysql에 데이터 삽입(적재)중 오류 발생')
    finally:
        if conn:
            conn.close()
            logging.info('mysql에 데이터 삽입(적재) 완료')
    pass


with DAG(
    dag_id              = "06_multi_dag_3step_load",   
    description         = "load 전용 DAG", 
    default_args        = {
        'owner'           : 'de_1team_manager',
},       
    schedule_interval   = "@daily",
    start_date          = datetime(2025,1,1),                    
    catchup             = False,              
    tags                = ["load", "etl"]    

) as dag:
    load_data      = PythonOperator(
        task_id = 'load_data_mysql',
         python_callable = _load_data_mysql
    )

최종적으로 적재되는 모습을 확인하자.