ASAC-SK플래닛 T아카데미 데이터 엔지니어

26.01.08 64일차 [Airflow : 기본 개념, dag, operator 로컬 실습]

Datadesigner 2026. 1. 8. 16:46

오늘은 특강이 끝나고 Airflow 수업을 첫 시작하게 되었다.

 


개요

 

  • 정의
    • 워크플로우 오케스트레이션
      • 연속된 작업(연결된 작업)을 지휘하는 지휘자 역할
    • 지휘자(Airflow)는 직접적인 작업을 수행하지는 않는다
    • 실질적인 작업(수집, 추출, 전처리, 적재, . . .)DAG이 담당
      • 언제 시작, 언제 종료(지정, 완료되면), 순서 등등 관리 -> 지휘자 -> Airflow
    • Airflow 실질적인(DAG) 일을 명령하는 관리자 역할
 

  • 기존 작업관리자와 차이점
    • 리눅스의 Crontab or 윈도우 작업스케줄러 차이점
      • 자동화를 위해서 특정 시간에, 특정 텀으로, 특정 업무를 수행하는 내용 등록하는 명령어/프로그램 임
      • 앞선 작업의 성공/실패 여부에 따른 분기 처리등 유동적인 처리가 불가능함 => 가장 큰 차이점 => Airflow를 이용하면 상황에 맞게 조치가 가능함(프로그래밍 가능)
  • 도입 이유
    • 의존성 관리(Dependency)
      • A가 성립,성공해야지만 B를 실행할 수 있다
    • 후행 및 재시도 (retry)
      • 실패했다면 -> 5분 후에 다시 시도 -> 계속(즉시) 안되면 -> 관리자에게 장애 알람 전송
    • 백필(backfill)
      • 데이터를 수집하는 타겟 사이트(혹은 장비)의 서버 장애로 데이터가 5일치가 누락되었다 -> 대상(사이트 혹은 장비) 쪽에 누락된 데이터가 존재한다면 -> 해당 데이터를 오늘까지 순서대로 다시 수집(*돌린다) 가능
 

  • 4대 핵심 아키텍쳐(구성원)
    • 스케줄러
      • 24시간 365일 계속 가동중(깨어있음)
      • 현재 시점 -> 지금 실행해야 할 작업이 존재하는가? 체크(감시) -> 조건이 맞으면(작업이 존재하면) 작업을 executor에게 전송(내부통신 진행)
    • 웹 서버
      • 모니터링, 작업 조절 등등 관리페이지 필요
        • 대시보드 제공해줌 -> 관리 모니터링 가능
      • 기본 구성 => localhost:8080 페이지 자동 제공
        • 8080 시그니처 포트 -> 자바계얼, springboot
    • 메타데이터 db
      • 해당 스케줄등은 데이터베이스(RDB)에서 관리
        • Postgres, mysql 등등 가능
        • 기본 환경구성도 airflow 공홈에서 docker-compose 파일 제공
      • 필요시 NoSQL 사용 (Redis)
      • 저장내용 예시
        • 아침에 작업한 작업이 성공했다 -> 로그
        • 특정 DAG는 오늘 17시에 작동할 예정이다 -> 스케줄
        • 모든 상태 정보를 저장(기억)함
        • 스케줄러, 웹서버(대시보드)는 DB를 바라보면서 상호 대화(내부 통신)진행
    • executor / worker (실행기 / 일꾼)
      • 실제 작업을 수행하는 포지션
      • LocalExecutor
        • 로컬 PC -> 1개의 컴퓨터(머신/서버)에서 프로세스로 작동 -> 일 수행 (금일/내일 실습환경)
      • CeleryExecutor / KubernatesExecutor
        • N개의 서버를 가동시켜서 작업을 분산 -> 대용량 처리 (차주 클라우드 환경에서 실습)
 

  • 핵심 구성
    • DAG (Directed Acyclic Graph)
      • 방향성 비순환 그래프, 순환(loop)없는 작업 흐름도
        • DAG 내부에서 직접 지정
        • 앞 단계의 TASK가 완료되면 직접 호출
        • 앞 단계의 TASK산출물이 확인되면 이벤트에 의해 호출
      • 무한 루프가 금지된 일반 통행 작업
        • A -> B -> C (O)
        • A -> B -> A (X, 무한루프)
      • 어떤 작업을 수행할지 작성하는 프로그램 단위
        • *.py로 작성
    • Operator
      • DAG를 구성하는 블럭
      • DAG가 실제로 작업하게 될 타 플랫폼(리소스)환경
      • 어떤 일을 할지 정의된 템플릿
        • BashOperator (리눅스 기반 명령어 수행)
        • PythonOperator (파이썬 수행)
        • SQLOperator (DB쿼리)
        • . .
    • Task
      • Operator 수행시 구분할 수 있는(지칭하는) 이름
      • DAG 안에서는 노드 1개 의미
    • Task Instance
      • Task + 스케줄(시간/날짜) 결합된 단위
      • Task(코드. .) + Task Instance (2026년 1월 20일 17시에 실행되는 .Task)
      • 대시보드상 DAG 단위의 상태 정보에서 사각박스 단위로 표기 (작업 1개 수행되어 성공 => 녹색박스 하나 배치됨)
      • 주기단위에 따라 특정 간격 단위로 발생

  • 주의 사항 (내용만 기억)
    • 동일한 코드는 한번 or N번 수행되더라도 결과는 항상 동일해야 한다 => 멱등성
      • 중복된 데이터가 쌓이지 않도록 => 필요시 재시도(설정따라 다름, 실패하면 재시도(기본 컨셉))를 막아야 함

로컬 pc 에서 설치 및 운용

 

  • 도커 기반, 도커 컴포즈로 자동 구성
    • yml 파일은 오피셜 사이트에서 제공
  • 기본 구성단계
    • 1 step
      • docker 클라이언트 구동
    • 2 step
      • vscode 작동
      • ~/project/airflow-local
    • 3 step
      #
        # 특정 폴더에 직접 다운로드
        # 윈도우
        Invoke-WebRequest -Uri 'https://airflow.apache.org/docs/apache-airflow/2.10.0/docker-compose.yaml' -OutFile 'docker-compose.yaml'
      
        # 맥/리눅스
        curl -Lfo 'https://airflow.apache.org/docs/apache-airflow/2.10.0/docker-compose.yaml'
      
    • 4 step
      • 구성 및 운영에 필요한(지정된 이름) 폴더 생성
        • airflow 내부적으로 폴더명 고정되어있음
      mkdir -p ./dags ./logs ./plugins ./config
      
    • 5 step
      • 사용자 id 설정 -> .env 파일 생성
        # 맥/리눅스
        echo -e "AIRFLOW_UID=$(id -u)" > .env
      
        # 세팅값 -> 50000은 yaml 파일 내부에 존재함
        AIRFLOW_UID=50000
      

 

  • 도커 기반 설치
    • 1 step
      • 초기화 (DB 설치(마이그레이션등)) 계정 생성
      docker compose up airflow-init
      
    • 2 step
      • 컨테이너 설치 및 실행
      docker compose up -d
      
    • 3 step
      • 대시보드(웹서버)접속
      • localhost : 8080
      • ID/PW : airflow -> 로그인 -> DAGS 페이지 확인

 

 

도커를 구동한 뒤 로컬호스트 8000에 들어가면 로그인 창이 뜬다, 아이디랑 비밀번호는  airflow이다. 

들어가면 현재 파이썬상에서 구성한 dag 파이썬 코드와 연동되어서 바로 대시보드로 나오게 된다,

 


DAG 기본형 구성


  • 프로젝트 내에 DAGS 폴더 하위에 py단위로 구성
  /airflow-local
  L dags
    L 01_basics_bash.py
    L 02_basics_python.py

01_basics_bash.py 작성

 

 

  • 확인 절차
    • 작성 완료 -> 저장 -> 대시보드 새로고침 (시간 지나면 자동으로 리프레쉬 됨)
    • DAG 내부 코드에 문제가 있으면 자동 오류 출력
      • 자동 검증 
      • t1 = BashOperator(   task_id      = '시간 출력', # 에러 발생   bash_command = 'date'
      )
    • ```
  • 정상 등록 후
    • 미작동중이라 로그 등 내용 부재
    • graph(작업간 의존성 관계확인), detail등 정보 확인
 

DAG 구동


 
  • 0단계
    • 태그 검색을 통해서 특정 DAG 검색 가능
  • 1단계
    • 개별 DAG 앞에 존재하는 토글 버튼(pause/unpause) ON 상태로 클릭
  • 2단계
    • ON 상태가 된 DAG 목록에 맨뒤로 이동 재생버튼(Trigger Button) 클릭하여 실행
    • Runs, Recent Task, Last Run, Next Run등 세팅 혹은 변경되는 것이 보임 -> 구체적인 체크는 차후 진행
  • 3단계
    • DAG 클릭 -> 상세 화면 진입 -> 왼쪽 작업 단위 그래프에서 사각 박스 선택
      • 사각 박스 : Task Instance
        • ⬜ 회색/Queued: 줄 서서 기다리는 중
        • 🟢 연두색/Running: 열심히 일하는 중
        • ✅ 진한 초록색/Success: 성공!
        • 🔴 빨간색/Failed: 에러 발생 (클릭해서 원인을 봐야 함)
    • 사각 박스별 로그 클릭 -> 확인
      • t1 -> t2 -> t3 순서대로 수행되었다

 

# 기본 골격, bash 오퍼레이터 적용
# airflow 상에서 dag가 어떻게 작동하는지 기초
# timedelta : 시간의 차이를 계산하는 함수
from datetime import datetime, timedelta
# DAG
from airflow import DAG
# 오퍼레이터 2.x
from airflow.operators.bash import BashOperator
# 오퍼레이터 3.x
#from airflow.providers.standard.operators.bash import BashOperator

# 1. 기본 인자 구성 -> Task에 적용될 매개변수
#    dag 소유주 (작성자, 관리자), 과거 데이터 누락본에 대한 소급 실행 여부,
#    작업 실패 (빈번하게(I/O일 확률이 높다) 발생될 수 있음) 발생했을때 재시도 여부 -> 1회만 등등 설정
#    실패 후 다시 시도할때 텀(간격) 설정, 몇 분 후 다시 시도

default_args = {
    'owner'           : 'de_1team_manager',  # DAG 주인
    'depends_on_past' : False,               # 과거 데이터 소급 처리 금지
    'retries'         : 1,                   # 작업 실패 시 재시도는 1회 자동 진행
    'retry_delay'     : timedelta(minutes=5) # 실패시 5분 후에 재시도 
    # 시나리오 => 작업 성공 => 완료
    # 시나리오 => 작업 실패 => 5분 후 => 1회 재시도 => 성공 => 완료
    # 시나리오 => 작업 실패 => 5분 후 => 1회 재시도 => 실패 => 해당 시점에서 작업 포기 (소급 방지)
}

# 2. DAG 정의
'''
with DAG() as dag:
    # 오퍼레이터 등등 .. 기술
    # 작업 순서 (의존성 고려) 지정
'''
with DAG(
    dag_id       = "01_basics_bash_v1",  # 고유한 값, DAG를 상호구분하는 용도
    description  = "DE를 위한 ETL 작업의 핵심 패키지 airflow 기본 연습용 DAG", # DAG 설명
    default_args = default_args,       # DAG의 기본 인자값
    schedule_interval = "@daily",      # 하루에 한번, 매일 00시 00분 00초에 실행 
    start_date   = datetime(2025,1,1), # 과거 날짜(샘플)로 설정, 특정 시점 세팅해도 됨 -> 소급 수행 가능성 존재함, 즉시 수행되는 장점
                                       # 과거 설정의 장점은 당장 지금부터 시점이 되면 수행됨
    catchup      = False,              # 과거에 대한 소급 처리 실행 방지     
    # 만약 Fasle 가 아니면 1년치(365일) + 현재 (8일) => 365 + 8회 수행이 됨 (소급 처리 진행)
    # 개발, 신규 서비스 러닝 => 소급처리 방지하여 구성
    tags         = ["bash","basic"]    # DAG가 많으면 찾기 힘듬 => 검색용
) as dag:
    # Operator -> 작업자 
    # Task 정의 -> 내용 자체에는 목적이 현재는 없음 (수행되는지만 관심을 가짐)
    # Task의 작동 내용을 로그를 통해서 확인 가능
    t1 = BashOperator(
        # 영문자, 숫자, 하이푼, 마침표, 밑줄만으로 구성되어야 합니다.
        task_id      = 'date_print', # airflow의 지휘를 통해서 dag 구동시 실제 할 일을 정의한 task 구분값
        bash_command = 'date'      # 리눅스의 date 명령
    )
    t2 = BashOperator(
        task_id      = 'sleep',
        bash_command = 'sleep 5'   # 5초 대기 
    )
    t3 = BashOperator(
        task_id      = 'print_echo',
        bash_command = 'echo "반갑습니다. Airflow start 123 !@#"'
    )

    # 의존성
    # t1 실행 -> t2 실행 -> t3 실행
    # t1 실행이 성공해야만 t2 실행함, t2 실행이 성공해야만 t3 실행
    # 대시보드에서 Graph를 통해서 노드의 연결방향이 표현됨
    t1 >> t2 >> t3
    pass

 

이렇게 우리가 세팅한 task가 의존도에 따라 순서대로 작업이 진행된다.

 

02_basics_python.py

 

 

  • ./dags/02_basics_python.py 생성
    • 파이썬의 함수를 task의 주된 내용으로 사용
    • task와 task간 통신 XCom(Cross Communication) 확인
      • 함수가 뭔가 반환하면 이것을 가져와서 사용
      • XCom이라는 내부 게시판에 특정 내용을 게시하면 이것을 접근해서 가져온다

  • airflow context
{'conf': <***.configuration.AirflowConfigParser object at 0x724fdaafbec0>, 'dag': <DAG: 02_basics_python_v1>, 'dag_run': <DagRun 02_basics_python_v1 @ 2025-01-01 00:00:00+00:00: scheduled__2025-01-01T00:00:00+00:00, state:running, queued_at: 2026-01-08 05:29:49.897075+00:00. externally triggered: False>, 'data_interval_end': DateTime(2025, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC')), 'data_interval_start': DateTime(2025, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC')), 'outlet_events': <***.utils.context.OutletEventAccessors object at 0x724fbdd7d280>, 'ds': '2025-01-01', 'ds_nodash': '20250101', 'execution_date': <Proxy at 0x724fbdd8fdc0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x724fbdd83ec0>, 'execution_date', DateTime(2025, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC')))>, 'expanded_ti_count': None, 'inlets': [], 'inlet_events': InletEventsAccessors(_inlets=[], _datasets={}, _dataset_aliases={}, _session=<sqlalchemy.orm.session.Session object at 0x724fbdd41310>), 'logical_date': DateTime(2025, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC')), 'macros': <module '***.macros' from '/home/***/.local/lib/python3.12/site-packages/***/macros/__init__.py'>, 'map_index_template': None, 'next_ds': <Proxy at 0x724fbdd5cf00 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x724fbdd83ec0>, 'next_ds', None)>, 'next_ds_nodash': <Proxy at 0x724fbdd5dec0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x724fbdd83ec0>, 'next_ds_nodash', None)>, 'next_execution_date': <Proxy at 0x724fbdd5db40 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x724fbdd83ec0>, 'next_execution_date', None)>, 'outlets': [], 'params': {}, 'prev_data_interval_start_success': None, 'prev_data_interval_end_success': None, 'prev_ds': <Proxy at 0x724fbe39c340 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x724fbdd83ec0>, 'prev_ds', None)>, 'prev_ds_nodash': <Proxy at 0x724fbdd39b40 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x724fbdd83ec0>, 'prev_ds_nodash', None)>, 'prev_execution_date': <Proxy at 0x724fbdd3aa80 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x724fbdd83ec0>, 'prev_execution_date', None)>, 'prev_execution_date_success': <Proxy at 0x724fbe319cc0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x724fbdd83ec0>, 'prev_execution_date_success', None)>, 'prev_start_date_success': None, 'prev_end_date_success': None, 'run_id': 'scheduled__2025-01-01T00:00:00+00:00', 'task': <Task(PythonOperator): extract_data_task>, 'task_instance': <TaskInstance: 02_basics_python_v1.extract_data_task scheduled__2025-01-01T00:00:00+00:00 [running]>, 'task_instance_key_str': '02_basics_python_v1__extract_data_task__20250101', 'test_mode': False, 'ti': <TaskInstance: 02_basics_python_v1.extract_data_task scheduled__2025-01-01T00:00:00+00:00 [running]>, 'tomorrow_ds': <Proxy at 0x724fbdde9b40 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x724fbdd83ec0>, 'tomorrow_ds', '2025-01-02')>, 'tomorrow_ds_nodash': <Proxy at 0x724fbddea000 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x724fbdd83ec0>, 'tomorrow_ds_nodash', '20250102')>, 'triggering_dataset_events': <Proxy at 0x724fbdd98c00 with factory <function _get_template_context.<locals>.get_triggering_events at 0x724fbdd80f40>>, 'ts': '2025-01-01T00:00:00+00:00', 'ts_nodash': '20250101T000000', 'ts_nodash_with_tz': '20250101T000000+0000', 'var': {'json': None, 'value': None}, 'conn': None, 'yesterday_ds': <Proxy at 0x724fbddea040 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x724fbdd83ec0>, 'yesterday_ds', '2024-12-31')>, 'yesterday_ds_nodash': <Proxy at 0x724fbddea080 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x724fbdd83ec0>, 'yesterday_ds_nodash', '20241231')>, 'templates_dict': None} <class 'dict'> dict_keys(['conf', 'dag', 'dag_run', 'data_interval_end', 'data_interval_start', 'outlet_events', 'ds', 'ds_nodash', 'execution_date', 'expanded_ti_count', 'inlets', 'inlet_events', 'logical_date', 'macros', 'map_index_template', 'next_ds', 'next_ds_nodash', 'next_execution_date', 'outlets', 'params', 'prev_data_interval_start_success', 'prev_data_interval_end_success', 'prev_ds', 'prev_ds_nodash', 'prev_execution_date', 'prev_execution_date_success', 'prev_start_date_success', 'prev_end_date_success', 'run_id', 'task', 'task_instance', 'task_instance_key_str', 'test_mode', 'ti', 'tomorrow_ds', 'tomorrow_ds_nodash', 'triggering_dataset_events', 'ts', 'ts_nodash', 'ts_nodash_with_tz', 'var', 'conn', 'yesterday_ds', 'yesterday_ds_nodash', 'templates_dict']

  • 아래 테스크 수행 결과
  • extract_task   = PythonOperator(
        task_id         = "extract_data_task",
        python_callable = _extract_cb
    )
  • 로그
    • 수동 수행 -> 현재 날짜와 시간이 세팅이됨
    • 단, 현재 airflow의 기준시는 GMT 기준(런던 기준시)으로 세팅되어서 9시간 시차가 존재함
    [2026-01-08, 05:45:41 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
    [2026-01-08, 05:45:41 UTC] {02_basics_python.py:29} INFO - == Extract 작업 시작 ==
    [2026-01-08, 05:45:41 UTC] {02_basics_python.py:30} INFO - 작업시간 2026-01-08 , 실행 ID manual__2026-01-08T05:45:39.849134+00:00
    [2026-01-08, 05:45:41 UTC] {02_basics_python.py:31} INFO - == Extract 작업 끝 ==
    [2026-01-08, 05:45:41 UTC] {python.py:240} INFO - Done. Returned value was: Data_Extract_성공
    [2026-01-08, 05:45:41 UTC] {taskinstance.py:340} ▶ Post task execution logs
    

 

 
  • 아래 테스크 수행 결과 (XCom 통신)
  •   transform_task = PythonOperator(
          task_id         = "transform_data_task",
          python_callable = _transform_cb
      )
  • 로그
  • [2026-01-08, 06:08:20 UTC] {02_basics_python.py:49} INFO - == Transform 작업 시작 ==
    [2026-01-08, 06:08:20 UTC] {02_basics_python.py:50} INFO - Data_transform_성공 Data_Extract_성공
    -> "Data_Extract_성공" 전달된 데이터임
    [2026-01-08, 06:08:20 UTC] {02_basics_python.py:51} INFO - == Transform 작업 시작 ==
    [2026-01-08, 06:08:20 UTC] {python.py:240} INFO - Done. Returned value was: None
# 파이썬 로직을 task로 사용
# task간 통신을 통해서 상호 대화
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import logging  # 레벨별로 로그 출력 가능 (에러,경고, 디버깅, 정보 등등 레벨 지정)

def _extract_cb(**kwargs):
    '''
    ETL에서 Extract 담당
    :param kwargs: Airflow가 작업 실행하기 직전에 정보를 injection(주입) 해주는 위치
                   해당 내용은 Airflow 내부의 context(딕셔너리타입)라는 공간에 정보들이 저장되어있음 
                   Task 내부에서 Airflow의 정보들을 접근 및 사용 가능함
    '''
    # 1. airflow가 주입한 airflow context 에서 필요한 정보 추출
    # 'ti': <TaskInstance: ...> => 현재 작동중인 TaskInstance 객체를 의미함 (대시보드상 사각 박스)
    ti = kwargs['ti']
    # 'ds': '2025-01-01', 'ds_nodash': '20250101' 
    #  => 이 작업을 수행하기로 스케줄링 된 논리적인 날짜
    execute_date = kwargs['ds']
    # 'run_id': 'scheduled__2025-01-01T00:00:00+00:00'
    #  => 이번 실행의 고유한 ID => 로그 추적용
    run_id = kwargs['run_id']

    # 2. task 본연 업무 구현
    # print(kwargs,type(kwargs), kwargs.keys() )
    # 여기서는 로그만 남김
    logging.info('== Extract 작업 시작 ==')
    logging.info(f'작업시간 {execute_date}, 실행 ID {run_id} ')
    logging.info('== Extract 작업 끝 ==')

    # 3. 필요시 xcom을 통해서 특정 데이터를 다음 task로 전달함
    #    return 데이터 => 자동으로 XCom에 push 처리됨 (게시판에 글 등록됨)

    return "Data_Extract_성공"

def _transform_cb(**kwargs):
    '''
    ETL에서 transform 담당
    :param kwargs: 다양한 매개변수가 접근됨
    '''
    print(kwargs,type(kwargs), kwargs.keys() )
    print("Data_transform_성공")
    # return "Data_transform_성공"
    # 1. ti 객체 획득
    ti = kwargs['ti']
    
    # 2. Task 본연 업무 구현
    # extract_task에서 전달한 데이터 획득
    data = ti.xcom_pull(task_ids='extract_data_task')
    logging.info('== transform 작업 시작 ==')
    logging.info(f"Data_transform_성공 {data}")
    logging.info('== transform 작업 끝 ==')

with DAG(
    dag_id              = "02_basics_python_v1",   
    description         = "파이썬 task 구성 및 통신(xcom)", 
    default_args        = {
        'owner'           : 'de_1team_manager',  
        'retries'         : 1,                   
        'retry_delay'     : timedelta(minutes=1) 
},       
    schedule_interval   = "@once",       # 수동으로 딱 한번 수행, 주기성 없음
    start_date          = datetime(2025,1,1),                    
    catchup             = False,              
    tags                = ["python","xcom", "context"]    
) as dag:
    # 1. task 정의 (ETL을 고려하여 네이밍)
    extract_task   = PythonOperator(
        task_id         = "extract_data_task",  
        python_callable = _extract_cb
    )
    transform_task = PythonOperator(
        task_id         = "transform_data_task",  
        python_callable = _transform_cb
    )
    # 2. 의존성 (순서 작성)
    extract_task >> transform_task
    # [리뷰 실습] : load_task 구현하시오.
    #              의존성은 transform_task 실행 성공후 task 수행되게 구성
    #              주 업무 : transform_task에서 리턴한 값(문자열)을 받아서 로그 출력
    pass

아 맞다 load_task 실습이엇는데 안했다

03_basics_context_jinja.py

 

 

 

  • 아래 테스크 수행 결과
t1 = BashOperator(
        task_id      = 'template_used_bash',
        bash_command = 'echo "테스트 수행시간은 {{ ds }} {{ds_nodash}}"'
    )
  • 로그
[2026-01-08, 06:44:44 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2026-01-08, 06:44:45 UTC] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2026-01-08, 06:44:45 UTC] {subprocess.py:75} INFO - Running command: ['/usr/bin/bash', '-c', 'echo "테스트 수행시간은 2026-01-08 20260108"']
[2026-01-08, 06:44:45 UTC] {subprocess.py:86} INFO - Output:
[2026-01-08, 06:44:45 UTC] {subprocess.py:93} INFO - 테스트 수행시간은 2026-01-08 20260108
[2026-01-08, 06:44:45 UTC] {subprocess.py:97} INFO - Command exited with return code 0
[2026-01-08, 06:44:45 UTC] {taskinstance.py:340} ▶ Post task execution logs
 

 

  • 아래 테스크 수행 결과
t2 = BashOperator(
        task_id      = 'template_macro_used_bash',
        # ETL에서 지난주(어제 혹은 지난달 등) 과거 데이터 조회 시 필수 패턴 중 하나
        # airflow.macros 패키지를 의미 => import 없이 바로 사용 가능함
        bash_command = 'echo "일주일 전 수행시간은 {{macros.ds_add(ds, -7)}} {{macros.random()}}" '
    )
  • 로그
[2026-01-08, 06:44:46 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2026-01-08, 06:44:46 UTC] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2026-01-08, 06:44:46 UTC] {subprocess.py:75} INFO - Running command: ['/usr/bin/bash', '-c', 'echo "일주일 전 수행시간은 2026-01-01 0.8301081775529927" ']
[2026-01-08, 06:44:46 UTC] {subprocess.py:86} INFO - Output:
[2026-01-08, 06:44:46 UTC] {subprocess.py:93} INFO - 일주일 전 수행시간은 2026-01-01 0.8301081775529927
[2026-01-08, 06:44:46 UTC] {subprocess.py:97} INFO - Command exited with return code 0
[2026-01-08, 06:44:46 UTC] {taskinstance.py:340} ▶ Post task execution logs
 

 

  • 아래 테스크 수행 결과
t3 = PythonOperator(
        task_id         = "template_used_python",  
        python_callable = _print
    )
  • 로그
[2026-01-08, 06:44:48 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2026-01-08, 06:44:48 UTC] {03_basics_context_jinja.py:15} INFO - ds 출력 2026-01-08
[2026-01-08, 06:44:48 UTC] {03_basics_context_jinja.py:16} INFO - ds_nodashb 출력 20260108
[2026-01-08, 06:44:48 UTC] {python.py:240} INFO - Done. Returned value was: None
[2026-01-08, 06:44:48 UTC] {taskinstance.py:340} ▶ Post task execution logs
'''
airflow 내부에서 관리하는 context 정보를 jinja를 이용하여 접근 사용 예시
DAG 내에서 template 적용
'''
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
import logging

# 실습 DAG 기본 골격 형태 구성
def _print(**kwargs):
    # airflow context는 함수 호출 시 airflow가 injection 하여 전달 -> kwargs
    # ds, ds_nodash 테스크 수행 시간
    logging.info(f'ds 출력 {kwargs['ds']}')
    logging.info(f'ds_nodashb 출력 {kwargs['ds_nodash']}')
    pass

with DAG(
    dag_id              = "03_basics_context_jinja_v1",   
    description         = "jinja 템플릿 적용, context 접근, 매크로 사용", 
    default_args        = {
        'owner'           : 'de_1team_manager',  
        'retries'         : 1,                   
        'retry_delay'     : timedelta(minutes=5) 
},       
    # 초 단위(맨 앞), 년 단위(맨 뒤) 생략되었다면 => 아래 표기는 => 매일 오전 9시 0분에 실행
    schedule_interval   = " 0 9 * * * ", # cron 표기법
    start_date          = datetime(2025,1,1),                    
    catchup             = False,              
    tags                = ["jinja" , 'macro' , 'context']    
) as dag:
    


    # 1. operator 생성
    #    대부분 오퍼레이터 내부에는 template_field 에 jinja가 세팅(허용)되어 있음
    #    {{context의 키 값}}
    t1 = BashOperator(
        task_id      = 'template_used_bash', 
        bash_command = 'echo "테스트 수행시간은 {{ ds }} {{ds_nodash}}"'
    )
    # import airflow.macros => 하위 함수들은 private 하게 처리되어이음 => ___ 표기
    t2 = BashOperator(
        task_id      = 'template_macro_used_bash', 
        # ETL에서 지난주(어제 혹은 지난달 등) 과거 데이터 조회 시 필수 패턴 중 하나
        # airflow.macros 패키지를 의미 => import 없이 바로 사용 가능함
        bash_command = 'echo "일주일 전 수행시간은 {{macros.ds_add(ds, -7)}} {{macros.random()}}" '
    )
    t3 = PythonOperator(
        task_id         = "template_used_python",  
        python_callable = _print
    )

    # 2. 의존성 관련 수행 순서 지정
    t1 >> t2 >> t3
    pass

이렇게 테스크 순서대로 시간을 출력한다.

 

분기 04_basics_branching.py

 

  • 파이썬 오퍼레이터의 등록된 함수 내부 연산 결과 다음을 진행될 TASK를 선택할 수 있음 (브런치 전략)
 

mysql 연동


  • 목표
    • 대한민국/서울 시간대로 전체 airflow 조정
    • docker-compose 상에 mysql 서비스 추가
      • 서비스 추가에 대한 전체 수정
    • etl + mysql (적재 혹은 추출)등 활용
 

  • 시간 보정
    • 기본 yaml 파일의 시간대는 런던 기준시임
    • 서울 시간대로 조정
    • docker-compose.yml
    x-airflow-common:
    &airflow-common
    image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.10.0}
    # build: .
    environment:
      &airflow-common-env
      
      # 시간대 조정
      TZ: 'Asia/Seoul'
      AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE:  'Asia/Seoul'
      
    

  • mysql 서비스 추가
    services:
    
    ...........
    
      mysql:
        image: mysql:8.0
        container_name: airflow-class-mysql
        environment:
          MYSQL_ROOT_PASSWORD: root
          MYSQL_DATABASE: my_data_warehouse
          MYSQL_USER: student
          MYSQL_PASSWORD: student
        ports:
          - "3306:3306"
        volumes:
          - mysql-db-volume:/var/lib/mysql
        healthcheck:
          test: ["CMD", "mysqladmin" ,"ping", "-h", "localhost"]
          interval: 10s
          timeout: 5s
          retries: 5
    
  • 볼륨 추가
volumes:
  postgres-db-volume:
  # mysql용 볼륨
  mysql-db-volume:
 

 

  • airflow의 DAG에서 접근 가능하게 설정
      # 시간설정
    
      # mysql 연결용 패키지 자동 설치
      _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-} apache-airflow-providers-mysql
    
    
      .........
    
      # 기존 세팅값 주석 처리
      # _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
    
 

  • 대시보드상에 mysql 컨넥션 설정
    • airflow에서 mysql 서비스의 접속 정보를 알려준다(세팅한다)
  • 위치
    • 첫화면 > Admin > Connections > [+] 클릭
      Connection Id: mysql_default (이름 중요!)
      Connection Type: MySQL
      Host: mysql (Docker 서비스 이름)
      Schema: my_data_warehouse
      Login: student
      Password: student
      Port: 3306
    

  • 커 컴포즈상 변경 내용 반영
    • docker compose down
    • docker compose up -d
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule
import logging
import random
def _branch_check(**kwargs):
    '''
    특정 조건에 따른 분기 담당
    '''
    # 랜덤하게 선택(특정 목표가 없음)
    if random.choice([True, False]):
        logging.info('참 랜덤 선택, task_process로 이동')
        return 'process' # 이동하고싶은 task의 task_id값을 표기
    else:
        logging.info('거짓 랜덤 선택, task_skip로 이동')
        return 'skip'    # 이동하고싶은 task의 task_id값을 표기
    pass
def _process(**kwargs):
    logging.info("특정 업무 수행 성공")
    pass

with DAG(
    dag_id              = "04_basics_branching_v1",   
    description         = "분기 처리, 조건에 따른 선택적 task 구동", 
    default_args        = {
        'owner'           : 'de_1team_manager',  
        'retries'         : 1,                   
        'retry_delay'     : timedelta(minutes=1) 
},       
    schedule_interval   = "@daily",
    start_date          = datetime(2025,1,1),                    
    catchup             = False,              
    tags                = ["branch", "trigger_rule"]    
) as dag:
    # operator
    task_start   = EmptyOperator(
        task_id = "start"
)       # 시작
    task_branch  = BranchPythonOperator(
        task_id = "branch_check",
        python_callable = _branch_check
    )   # 분기
    task_process = PythonOperator(
        task_id = "process",
        python_callable = _process
    
)       # 특정 업무
    task_skip    = EmptyOperator(
        task_id = "skip"
)        # 생략
    task_end     = EmptyOperator(
        task_id = "end",
        # NONE_FAILED_MIN_ONE_SUCCESS : 실패는 없고, 최소 1개는 성공했다
        # DAG는 모든 task는 성공해야 한다라는 기조로 작동하는 단위
        # 1개라도 중간에 실패하면 중단시킴 -> 전체 task를 그대로 생략해버림
        # 분기 진행 -> skip된 task가 반드시 발생 -> 모두 성공한다는 기조에 위배됨
        #          -> 특별한 설정이 없다면 모두 성공이라는 관점에 위배가 됨
        # NONE_FAILED_MIN_ONE_SUCCESS 
        # -> 전체 공정에 실패는 없었다. 최소 1개 이상은 성공했다 -> DAG는 신경쓰지 말고 그대로 실행해라

        # 결론. 브런치(분기기능) 사용시 필수값(기본값) trigger_rule 적용해야함
        # 값은 NONE_FAILED_MIN_ONE_SUCCESS 세팅
        # ALL_SUCCESS => skip을 실패로 간주함
        trigger_rule = TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS
)        # 종료

    # 의존성 (3가지 방향성)
    # task_branch 수행 중 특정 조건값에 따라 task_process or task_skip으로 선택 이동
    task_start >> task_branch
    task_branch >> task_process >> task_end
    task_branch >> task_skip >> task_end
 

이렇게 분기를 나누어서 한개의 task가 실패하더라도 성공해서 작업을 수행할 수 있도록 할 수도 있다,

 


오늘의 수업은 여기까지. 였다 밀렸다