ASAC-SK플래닛 T아카데미 데이터 엔지니어

26.01.15 69일차 [airflow | athena query, sensor, ctas etl]

Datadesigner 2026. 1. 18. 23:07

오늘은 airflow athena를 본격적으로 이용하여 ctas 기능을 구현하는 실숩을 진행하였다.

 

 


AWS Athena

 

개요

 

  • 정의
    • Serverless Query => 분석용, 특정 데이터 추출가능
    • 서버리스 대화형 분석 서비스
  • 특징
    • 기존
      • 데이터 확인을 위해 파일 다운로드, DB에 적제 등등 과정을 거쳐야 함
      • 원시데이터(raw data)를 s3(data lake)에 저장 -> Athena통해 바로 SQL 쿼리 수행 -> 결과 획득
    • 역활
      • s3 : 모든 데이터가 다양한 형식으로 저장되어 있는 저장소
      • Athena : 위 저장소에서 필요한 데이터만 원하는 형태로 추출(쿼리)
        • csv -> 쿼리 -> 정형 데이터 획득
        • 특정 버킷 경로에 있는 모든 데이터에서 신용 점수 90점 이상이 되는 고객 데이터를 가져와줘
    • 포인트
      • 서버리스 -> 서버 설치 필요 없음
      • s3 데이터를 db에 옮길 필요없이 바로 추출가능
      • 사용한 만큼만 비용 지출
        • 동일하게 사용량이 반복적으로 많다면 비추
      • 쿼리 : SQL 그대로 사용
 

  • 세팅
    • Athena 서비스 진입
    • 왼쪽 메뉴 > 쿼리 편집기 클릭
    • 최초 진입시(계정별)
      • 상단에 알림창 발생
      • "쿼리 결과를 저장할 S3 위치 결정..." -> 설정편집 클릭
        • 설정 관리
        • BROWSE S3 클릭
        • 본인 버킷 선택
          • 사전 작업
            • 해당 버킷 > 폴더 만들기 > athena-results > 폴더 만들기 클릭 > s3 URL 복사
          • s3://airflow-ai-en-0/athena-results/
      • 저장 클릭
      • data 폴더 생성하여 임시 데이터 세팅 
      •   airflow-ai-en-0     L data       L athena_sample.csv
 

  • 쿼리 테스트
    • 데이터베이스 생성(필요시)
        create database if not exists de_0;
      
    • 테이블 생성
        CREATE EXTERNAL TABLE IF NOT EXISTS my_s3_data (
                id INT,
                name STRING,
                score INT,
                created_at STRING,
                result STRING
            )
        ROW FORMAT DELIMITED
        FIELDS TERMINATED BY ','
        STORED AS TEXTFILE
        LOCATION 's3://airflow-ai-en-0/data/'
        TBLPROPERTIES ("skip.header.line.count"="1");
      
    • 쿼리 수행
        select * from my_s3_data;
      
 

[추가] RDB vs Athena

 

특징


  • rdb
    • RDB는 '기록과 관리(Transaction)'**에 최적화
  • Athena
    • '대용량 조회와 분석(Analytics)'**에 최적화
  • 결론
    • RDB
      • "지금 현재 주문 상태가 뭐야?" (빠르고 정확한 현재 상태 관리)
      • 행기반 데이터베이스
        • 일반 디비 제품, mysql, oracle, mssql,...
        • 대량 데이터 처리는 느림 -> 집계, 등등 위해서 대상 데이터를 모두 읽어야함
      • 열기반 데이터베이스
        • Amazon Redshift, Google Bigquery, Teradata, Snowflake, Cassandra, HBase
        • 특정 컬럼만 대상으로 쿼리 진행 => 속도처리가 빠름
    • Athena
      • "지난 1년치 로그 10TB를 분석해서 월별 추이를 뽑아줘." (저장된 파일 기반의 대규모 분석)
      • 비용 이득은 존재, 처리 속도는 편차 큼

 


Airflow상에서 사용시


  • RDB
    • Airflow에서 주로 메타데이터 관리나 소규모 중요 데이터 처리에 사용ㅣ
    • 작업 상태 및 메타데이터 저장
      • Airflow DAG가 실행될 때, "어디까지 처리했는지(Last Processed ID)"를 기록하거나 읽어올 때.
      • 작업의 결과 요약본(Summary)을 운영 팀이 보는 대시보드 DB에 적재할 때.
    • 트랜잭션 데이터의 추출 (Source)
      • 서비스 운영 DB(MySQL 등)에서 어제 자 주문 데이터를 SELECT 하여 데이터 레이크(S3)로 넘기는 작업을 수행할 때.
    • 데이터 마트 적재 (Destination)
      • 분석이 끝난 최종 결과 데이터(예: 일일 매출 리포트)를 BI 도구(Tableau, Superset)가 읽을 수 있도록 RDB에 INSERT 할 때.
    • 고려사항
      • Airflow Worker의 메모리를 고려
      • 대량의 데이터를 SELECT * 해서 Airflow 메모리로 가져오는 것(PythonOperator 내에서 처리)은 부적절
      • RDB 내부에서 연산을 끝내거나(PostgresOperator), S3로 덤프를 뜨는 방식(MySQLToS3Operator)을 사용해야 함

  • Athena
    • Airflow에서 무거운 데이터 변환(ELT) 및 로그 분석에 사용
    • Airflow 서버에 부하를 주지 않고 AWS 인프라를 활용
    • S3 데이터의 대규모 집계 (Serverless ETL)
      • 상황: 매일 100GB씩 쌓이는 로그 파일(JSON)이 S3에 있음.
      • Airflow 역할: AthenaOperator를 사용하여 SQL을 날림.
      • 작업: "S3의 원본 로그를 읽어서 → 에러만 필터링하고 → 날짜별로 그룹화해서 → 결과 S3에 Parquet로 저장(CTAS 쿼리)"
    • 이기종 데이터 조인
      • S3에 있는 "사용자 행동 로그"와 다른 경로에 있는 "상품 마스터 데이터(CSV)"를 조인하여 분석 테이블을 생성할 때.
    • 콜드 데이터(Cold Data) 조회
      • RDB에 넣기에는 너무 크고, 자주 보지 않는 1년 전 데이터를 조회해야 하는 배치 작업이 있을 때.
    • 고려사항
      • 파티셔닝(Partitioning): Athena는 스캔한 데이터 양만큼 비용이 발생
      • Airflow에서 쿼리를 날릴 때 반드시 WHERE date = '2024-01-01' 처럼 파티션을 지정하여 비용을 절감해야 함
      • CTAS 사용:
        • 결과를 단순히 조회(SELECT)만 하지 말고, CREATE TABLE AS SELECT (CTAS) 구문을 사용하여 결과를 다시 S3에 압축된 포맷(Parquet/ORC)으로 저장하는 것이 효율적임

airflow를 이용하여 아테나에 자동화 처리

 

 

  • 목표
    • ctas 기반 적용
    • etl
    • 아테나 쿼리 수행
    • 아테마 쿼리 결과 저장(압축형태 사용)
    • seneor를 이용한 자동 트리거 적용

 

  • 사전 작업
    • s3 
    •   개정별소유브런치    L data       L athena_sample.csv       L athena_sample2.csv   
    • airflow의 먹통성 때문에 s3상에 중간에 중단/작업했던 폴더나 파일이 있을 경우 -> 삭제후 진행
      • 항상 초기 상태를 유지하도록 관리
      • DAG 상에 특정 task에서 초기화(삭제등) 작업 필요
  • 구현
    • 10_aws_athena_ctas_etl.py
    • 10_aws_athena_query.py
    • 10_aws_athena_sensor.py
 

  • 10_aws_athena_query.py
    • DAG 구현 내용
      • airflow 통해 athena 기본 연동
      • 사전에 데이터베이스 존재해야 함
      • task 1
        • 테이블 생성 -> s3상에 특정 경로에 있는 csv 파일 대상(경로만 지정)
        • 경로상에 있는 데이터를 읽어서 테이블에 자동 입력
      • task 2
        • 기존 내용(테이블등)이 있다면 삭제(drop)
        • 먹통성 때문에 진행 -> 해당 결과물은 임시적으로 사용한다는 의미
      • task 3
        • 목적 -> 간단한 분석(집계) -> 결과를 특정 s3 위치 저장 or 특정 테이블을 생성 저장. 자징 내용은 압축하여 보관
      • 의존성
        • task 1 => task 2 => task 3
      • 스케줄 매일 1회 진행

 

 

그런데 지금 집에서 로컬로 하려니까 aws 상에서 받아온 키가 없어서 dag가 실행이 안된다. 어쩔수 없이 ai의 도움을 받아야 겠다.

 

안티그래비티는 신이다.

 

 

10_aws_athena_query.py

# 1. 모듈 가져오기
from datetime import datetime, timedelta 
import logging
from airflow import DAG
# 아테나 대상으로 오퍼레이터 작업, 센서
from airflow.providers.amazon.aws.operators.athena import AthenaOperator
from airflow.providers.amazon.aws.sensors.athena import AthenaSensor


# 2. 환경 변수(상수값), s3 버킷등 경로
BUCKET_NAME = "airflow-ai-en-3" # 사용자별 사용하는 버킷명
ATHENA_DB   = "de_3" # Athena에서 생성한 DB 이름
ALAYSIS_KEY = f's3://{BUCKET_NAME}/athena-results/' # 분석 결과를 담은 S3상의 경로 (키))
# 3. DAG 정의
with DAG(
    dag_id              = "10_aws_athena_query_v1",
    description         = "athena에 테이블 생성, 분석 등 요청, 결과 저장",
    default_args        = {
        'owner'          :'de_1team_manager',        
        'retries'        : 1,
        'retry_delay'    : timedelta(minutes=1)
    },    
    schedule_interval   = '@daily', # 수동실행 (트리거)
    start_date          = datetime(2025,1,1),
    catchup             = False,
    tags                = ['aws', 'athena', 'query']
) as dag:


    # 3-1. TASK 1 구현
    basic_table_create_task = AthenaOperator(
        task_id     = 'basic_table_create_task',
        # 특정 저장소에서 있는 데이터 기반 테이블 생성, 구분자 , 이고 1번 라인은 헤더이고 생략
        # 저장되는 형식은 텍스트로 지정
        query       = f"""
        CREATE EXTERNAL TABLE IF NOT EXISTS s3_exam_csv (
                id INT,
                name STRING,
                score INT,
                created_at STRING,
                result STRING
        )
        ROW FORMAT DELIMITED
        FIELDS TERMINATED BY ','
        STORED AS TEXTFILE
        LOCATION 's3://{BUCKET_NAME}/data/'
        TBLPROPERTIES ("skip.header.line.count"="1");
        """,
        database  =  ATHENA_DB,
        # 테이블 생성 후 csv를 읽어서 데이터를 테이블에 주입하고 나온 결과물을 저장할 위치 
        # 원 데이터를 가진 테이블, 분석 결과를 가진 테이블 모두 같은 위치에 저장
        output_location = ALAYSIS_KEY, 
        aws_conn_id = 'aws_default',
        )
    # 3-2. TASK 2 구현
    report_table_drop_task = AthenaOperator(
        task_id     = 'report_table_drop_task',
        # 일일 보고서를 계속해서 유지하고 싶다면 삭제 task는 생략 가능 -> rdb나 s3 보관 추천
        query           = "DROP TABLE if exists daliy_report_tbl;",
        database        = ATHENA_DB,
        output_location = ALAYSIS_KEY, 
        aws_conn_id     = 'aws_default',
    )
    # 3-3. TASK 3 구현
    report_table_create_task = AthenaOperator(
        task_id         = 'report_table_create_task',
        # s3_exam_csv 테이블을 대상 => result 컬러 기준 집계 =>
        # 결과 컬럼(result, 개수(count), 평균 점수(avg_score), 
        # 최소 점수(min_score), 최대 점수(max_score)) 테이블 daliy_report_tbl
        # PARQUET : 압축된 형태로 저장하는 형식 중 하나(파케이) 속도는 빠르고, 비용이 저렴 형식
        query       = f"""
            create table daily_report_tbl
            with (
                format = 'PARQUET',
                external_location = 's3://{BUCKET_NAME}/reports/'
            )
            AS
            select 
                result,
                count(score) as count, 
                avg(score) as avg_score, 
                min(score) as min_score, 
                max(score) as max_score
            from s3_exam_csv
            group by result;
    """,  
        database        = ATHENA_DB,
        output_location = ALAYSIS_KEY, 
        aws_conn_id     = 'aws_default',
    )

    # # 3-4. 의존성
    basic_table_create_task >> report_table_drop_task >> report_table_create_task

먼저 아테나 쿼리문을 작성하고 그 내용을 담아준다.

 

이 코드의 주된 기능은 

 

task 1. s3의 csv 파일을 읽기 위해서 먼저 csv 테이블을 만들어준다 = s3_exam_csv, 이는 더미데이터로 전달받았다.

task 2. 매일 업데이트되는 데이터를 가정하여, 기존에 테이블에 있는 내용을 삭제해준다.

task 3. s3_exam_csv를 이용해서 daily_report_tbl 테이블을 만들어준다.

이에 들어가는 내용은 합격, 인원수, 평균, 최소, 최대 점수이다.

 

이러한 쿼리를 만들어주고, 이제 센서 dag 차례이다.

 

10_aws_athena_sensor.py

 

# Athena의 쿼리가 완료되었다. 라는 시그널을 감지(시) 하고싶다면 사용 => AthenaSensor
# 앞선 쿼리 작업이 오래 걸릴때 센서를 붙여서 언젠가 끝나면 바로 다음 task 작동되게 시점 체크 가능
# 1. 모듈 가져오기
from datetime import datetime, timedelta 
import logging
from airflow import DAG
# 아테나 대상으로 오퍼레이터 작업, 센서
from airflow.providers.amazon.aws.operators.athena import AthenaOperator
from airflow.providers.amazon.aws.sensors.athena import AthenaSensor  

# 2. 환경 변수(상수값), s3 버킷등 경로
BUCKET_NAME   = "airflow-ai-en-3" 
ATHENA_DB     = "de_3" 
S3_OUTPUT_LOC = f's3://{BUCKET_NAME}/athena-outputs/' # 결과물 저장 위치

# 3. DAG 정의
with DAG(
    dag_id              = "10_aws_athena_sensor_v1",
    description         = "athena에 오퍼레이터 작업 완료에 대한 감지",
    default_args        = {
        'owner'          :'de_1team_manager',        
        'retries'        : 1,
        'retry_delay'    : timedelta(minutes=1)
    },    
    schedule_interval   = None, # 수동실행 (트리거)
    start_date          = datetime(2025,1,1),
    catchup             = False,
    tags                = ['aws', 'athena', 'sensor']
) as dag:
    # 3-1. 아테나 쿼리 실행 (80점 이상 점수를 받은 학생 명단 조회)
    run_query_task = AthenaOperator(
        task_id         = 'run_query_task',
        query           = "select * from s3_exam_csv where score >= 80",
        database        = ATHENA_DB,
        output_location = S3_OUTPUT_LOC, 
        aws_conn_id     = 'aws_default',
        # 쿼리 실행 아이디 => query_execution_id 를 반환 => XCom을 통해 다른 테스크에서 획득 가능
        do_xcom_push    = True
        )

    # 3-2. 아테나 센서(쿼리 상태를 감지 -> 신호가 오면 다음 작업 전개되게 구성)
    #      비동기적인 상황 -> 쿼리 수행 시간이 제각각임 (서버리스, athena 자체 처리 상황)
    #      TASK -> ATHENA 작업 -> 센서 배치 -> 신호가 감지되면 다음 작업 전개되게 구성
    sensor_task = AthenaSensor(
        task_id         = 'sensor_task',
        # XCom을 통해서 테스트 출력값 세팅 -> 세팅된다 -> 감지 -> 다움 단계 진행할 수 있다.
        # 앞에 코드 대비 장시간 대기할 때 유용함
        query_execution_id = "{{task_instance.xcom_pull(task_ids='run_query_task')}}",
        # 센서는 감시 활동 진행해야함
        poke_interval   = 10, # 10초에 한번씩 확인
        timeout         = 600, # 최대 대기 시간 - 10분 
        aws_conn_id     = 'aws_default',
    )
    # 의존성
    run_query_task >> sensor_task

 

이 파일은 직전 파일에서 테이블을 만드는것을 감지하고, 모니터링하는 작업을 수행한다.

 

이번 sensor 파일의 설명이다.

 

[Airflow] AthenaSensor를 활용한 비동기 쿼리 모니터링

1. 개요

이번 글에서는 Airflow의 AthenaOperator와 AthenaSensor를 조합하여, 긴 시간이 소요될 수 있는 Athena 쿼리 작업을 효율적으로 관리하는 패턴을 알아봅니다.

2. 왜 Sensor가 필요한가?

기본적으로 AthenaOperator도 쿼리 완료를 기다릴 수 있지만, Sensor를 분리하여 사용하는 패턴은 다음과 같은 이점이 있습니다.

  • 리소스 분리: Worker 슬롯을 오랫동안 점유하지 않고, Airflow의 Smart Sensor나 Deferrable Operator 모드 등을 활용하기 유리할 수 있습니다.
  • 명시적 대기: 비동기 처리가 필요한 상황에서 모니터링 로직을 명확히 분리할 수 있습니다.

3. DAG 상세 분석

10_aws_athena_sensor_v1 DAG는 크게 두 가지 태스크로 구성됩니다.

Task 1: 쿼리 실행 (run_query_task)

  • AthenaOperator를 사용하여 s3_exam_csv 테이블에서 점수가 80점 이상인 데이터를 조회합니다.
  • 핵심 설정 (do_xcom_push=True): 쿼리가 실행되면 AWS는 고유한 QueryExecutionId를 발급합니다. 이 ID를 다음 태스크인 센서에게 전달하기 위해 XCom 기능을 활성화합니다. 이렇게 하면 실행 ID가 XCom에 저장됩니다.

Task 2: 쿼리 상태 감지 (sensor_task)

  • AthenaSensor를 사용하여 앞선 태스크의 쿼리 완료 여부를 감시(Poke)합니다.
  • XCom 활용: query_execution_id 파라미터에 Jinja 템플릿 {{ task_instance.xcom_pull(task_ids='run_query_task') }}을 사용하여 앞 Task에서 전달받은 쿼리 ID를 주입받습니다.
  • 모니터링 주기:
    • poke_interval=10: 10초마다 상태를 확인합니다.
    • timeout=600: 최대 10분(600초)까지 기다리며, 초과 시 실패 처리됩니다.

4. 실행 흐름 시뮬레이션

  1. run_query_task 실행: Athena에 쿼리를 던지고, 즉시 반환된 QueryExecutionId(예: 5555aaaa...)를 XCom에 기록하고 종료됩니다. (Operator가 Wait 하지 않게 설정할 수도 있으나, 예제에서는 기본 동작 후 ID 반환)
  2. sensor_task 실행 시작: XCom에서 ID를 가져와서 모니터링을 시작합니다.
  3. Poke Loop:
    • 10초 대기 -> Athena 상태 확인 (RUNNING)
    • 10초 대기 -> Athena 상태 확인 (RUNNING)
    • ...
    • Athena 상태 확인 (SUCCEEDED) -> Task 성공

5. 마무리

이 구조를 활용하면, "쿼리 요청(Submit)"과 "완료 대기(Wait)"를 논리적으로 분리할 수 있어, 복잡한 데이터 파이프라인에서 작업 간의 의존성을 더 유연하게 설계할 수 있습니다.

 

마지막 ctas 파일과 연관지어서 설명하겠다.

 

10_aws_athena_ctas.py

 

 

'''
- 서비스 : 
    매일 온라인 (코딩테스트) 시험에 응시하는 특정 플랫폼 학생 데이터가 쌓여서 일과후에 모두 
    특정 DB에 쌓임
- ETL 처리 TASK가 해당 DB에 접속해서 18시(응시완료) 에 Extract(추출)하여서 
    s3 특정 공간에 저장 (csv 형태)
- 다른 task는 해당 csv를 대상으로 새로운 데이터를 기준으로 s3_exam_csv테이블을 삭제 
    -> 생성 -> 신규 데이터 주입
- 다음 날, 00시01분에 아래 스케줄 task1이 작동됨
'''
'''
일시적 분석 자료 -> athena(서버리스)를 통해서 진행 -> 분석결과, 테이블 내용 모두 s3애 있음
-> athena를 통해 쿼리 던지면, 가공되서 결과가 나옴

task 1 : 특정 s3 경로상에 데이터들 모두 삭제 -> 분석 데이터 삭제
         s3://{버킷}/athena/proccesed/pass_student/
         S3DeleteObjectsOperator 사용
         
task 2 : pass_student 정보를 가진 테이블 삭제 pass_student

task 3 : csv => table (s3_exam_csv) 대상으로 90점 이상 학생들 정보를 모두 추출하여 
         pass_student 테이블을 생성하고 저장 (컬럼 : id, name, score, created_at)
         
task 4 : task 3가 완료되었음을 센서를 붙여서 감시, 10초 간격, 최대 대기시간 10분

task 1 >> task 2 >> task 3 >> task 4
         
'''
'''
- 서비스 관리자는 아침 9시에 출근해서 -> 분석결과 대시보드로 조회 -> 의사결정
    - 문제 밸런스가 좋은가? 합격자 수가 너무 낮은데 등등 -> 문제 수준 조정
'''
# 1. 모듈 가져오기
from datetime import datetime, timedelta
import logging
from airflow import DAG
# 아테나 대상으로 오퍼레이터 작업, 센서
from airflow.providers.amazon.aws.operators.athena import AthenaOperator
from airflow.providers.amazon.aws.sensors.athena import AthenaSensor
from airflow.providers.amazon.aws.operators.s3 import S3DeleteObjectsOperator


# 2. 환경 변수(상수값), s3 버킷등 경로
BUCKET_NAME = "airflow-ai-en-3"
ATHENA_DB       = "de_3"
SRC_TABLE       = 's3_exam_csv'
TARGET_TABLE    = 'pass_student'

# s3상에 저장할 위치는 작업중 설정
# csv, 메타데이터가 저장됨
S3_TARGET_LOC   = f's3://{BUCKET_NAME}/athena/tbl/{TARGET_TABLE}/'
# athena의 쿼리 실행한 결과, 실행 로그 s3상의 저장위치
S3_QUERY_LOC    = f's3://{BUCKET_NAME}/athena/query_logs/'

# 3. DAG 정의
with DAG(
    dag_id="10_aws_athena_ctas_etl_v2",
    description="athena CTAS 작업",
    default_args={
        'owner': 'de_1team_manager',
        'retries': 1,
        'retry_delay': timedelta(minutes=1)
    },
    schedule_interval='@daily',  # 수동실행 (트리거)
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['aws', 'athena', 'ctas']
) as dag:
    # DAG 다시 시작하면 관련된 S3내 저장된 테이블, 내용을 모두 정리해줌 -> 클리어
    #     매번 가동시 깨끗한 환경 상태 유지 -> 멱등성 유지 -> airflow의 스타일 / 철학
    #     항상 동일하게 DAG 작업들이 무결하게 진행됨
    # 1. 기존 데이터 정리 (s3 상에 저장된)
    t1 = S3DeleteObjectsOperator(
        task_id     = 'clean_s3_target_task',
        bucket      = BUCKET_NAME,  # 버킷 이름
        # keys      = [..],     # 버킷 내 타겟들을 n개 지정 -> 풀경로 표기
        prefix      = f'athena/tbl/{TARGET_TABLE}/',
        aws_conn_id = 'aws_default' # 접속 정보
    )
    # 2. 타겟 테이블 정리 -> 메타 데이터 정리
    t2 = AthenaOperator(
        task_id     = 'drop_table_task',
        query           = f"DROP TABLE if exists {ATHENA_DB}.{TARGET_TABLE};",
        database        = ATHENA_DB,
        output_location = S3_QUERY_LOC,  # 로그 기록, 결과가 저장되는 위치
        aws_conn_id     = 'aws_default',
    )
    # 3. CTAS : csv -> table -> parguet 변환 및 저장
    #    테이블의 정보는 파케이 압축 형태를 적용
    #    PARQUET / ORC => 열,컬럼 기반 형태
    #    압축 : GZIP < LZO, ZSTD, SNAPPY, ... 
    query = f"""
        CREATE table {ATHENA_DB}.{TARGET_TABLE}
        with (
        format = 'PARQUET',
        parquet_compression = 'GZIP',
        external_location = '{S3_TARGET_LOC}') 
        as
        select id, name, score, created_at 
        from {ATHENA_DB}.{SRC_TABLE}
        where score >= 90
        order by score desc;
    """
    t3 = AthenaOperator(
        task_id         = 'create_parquet_table',
        query           = query,
        database        = ATHENA_DB,
        output_location = S3_QUERY_LOC, 
        aws_conn_id     = 'aws_default',
        do_xcom_push    = True
        )
    # 4. CTAS 작업이 완료되었는지 감지(감시) 
    # 센서는 XCom (게시판으로 비유)에 "create_parguet_table"이라는 이름으로 내용이 떴는지 체크
    t4 = AthenaSensor(
        task_id         = 'sensor_task',
        query_execution_id = "{{task_instance.xcom_pull(task_ids='create_parquet_table')}}",
        poke_interval   = 10, 
        timeout         = 600, 
        aws_conn_id     = 'aws_default',
    )
    # 의존성
    t1 >> t2 >> t3 >> t4

 

 

[Airflow] AWS Athena CTAS를 활용한 완전한 ETL 파이프라인 구축 및 종합

1. 개요

지금까지 살펴본 AthenaOperator의 기본 실행, AthenaSensor를 통한 비동기 감지 패턴을 모두 결합하여, 실무에서 사용 가능한 완전한 형태의 CTAS(Create Table As Select) ETL 파이프라인(10_aws_athena_ctas_etl_v2)을 구축합니다. 이 파이프라인은 멱등성(Idempotency)을 보장하며 대용량 데이터를 효율적으로 처리하는 모범 사례를 담고 있습니다.

2. Airflow + Athena 패턴 종합 비교

 

 

3. DAG 상세 분석 (10_aws_athena_ctas_etl_v2)

이 DAG는 "기존 데이터 삭제 -> 테이블 삭제 -> 재생성"의 순서를 통해 언제 재실행하더라도 동일한 결과를 보장하는 멱등성을 완벽하게 구현합니다.

Task 1: S3 데이터 정리 (clean_s3_target_task)

  • 역할: 이전 실행에서 생성된 데이터(Parquet 파일 등)를 S3에서 물리적으로 삭제합니다.
  • Why?: Athena에서 DROP TABLE을 해도 external_location에 있는 실제 S3 파일은 삭제되지 않습니다. 이를 방치하면 재실행 시 데이터가 중복되거나 꼬일 수 있습니다. S3DeleteObjectsOperator를 사용하여 이를 깔끔하게 청소합니다.

Task 2: 메타데이터 정리 (drop_table_task)

  • 역할: Athena 카탈로그(Glue Data Catalog)에서 테이블 정의를 삭제합니다.

Task 3: CTAS로 새 테이블 생성 (create_parquet_table)

  • 역할: 원본 s3_exam_csv에서 조건(90점 이상)을 만족하는 데이터를 추출하여 변환 후 pass_student 테이블로 저장합니다.
  • 최적화:
    • format = 'PARQUET': 컬럼 기반 저장소로 조회 성능 최적화
    • parquet_compression = 'GZIP': 압축을 통해 S3 용량 절약 및 I/O 비용 감소
  • 비동기 연동: do_xcom_push=True로 설정하여 쿼리 ID를 다음 센서에게 넘길 준비를 합니다.

Task 4: 완료 감지 (sensor_task)

  • 역할: 대용량 변환 작업이 끝날 때까지 대기합니다. 센서를 분리함으로써 Airflow 워커 슬롯을 효율적으로 사용할 수 있는 구조를 만듭니다.

4. 최종 시뮬레이션 흐름

  1. 청소: S3의 athena/tbl/pass_student/ 경로가 비워집니다.
  2. 초기화: pass_student 테이블 정의가 삭제됩니다.
  3. 변환/적재(ETL): Athena가 수 기가바이트의 CSV를 읽어 분석 후, 고도로 압축된 Parquet 파일로 변환하여 S3에 다시 씁니다. (Airflow는 이 작업을 트리거만 함)
  4. 완료: 센서가 작업 완료를 감지하면, 후속 Task(예: 대시보드 갱신 알림 등)로 이어질 준비가 끝납니다.

5. 결론


이 구조는 Serverless Data Lake 아키텍처의 전형적인 모습입니다. Airflow는 무거운 데이터 처리를 직접 하지 않고 AWS Athena에게 위임(Offloading)하며, 관리 감독(Orchestration) 역할에 집중함으로써 매우 가볍고 안정적인 데이터 파이프라인을 유지할 수 있습니다.

 

으 설명이 너무 길어서 긁어올 수 밖에 없다.

 

아무튼 요약해서 설명하자면 

 

  • 완벽한 멱등성 보장: S3DeleteObjectsOperator와 DROP TABLE을 결합해 재실행 시 데이터 중복이나 꼬임 없는 깨끗한 파이프라인을 구현합니다.
  • 성능과 비용의 최적화: 무거운 CSV 데이터를 Athena CTAS를 통해 고효율 압축 포맷인 Parquet로 변환하여 저장 용량은 줄이고 쿼리 속도는 획기적으로 높입니다.
  • 효율적인 오케스트레이션: Airflow는 무거운 연산을 직접 하지 않고 Athena에 위임(Offloading)하며, 센서(Sensor)를 활용해 리소스 낭비 없이 안정적으로 전체 공정을 관리합니다.

 

이것이 ctas의 기능이다. 매일매일 업데이트 되는 내용을 자동화하여 업무를 최소화하고, 멱등성을 보장하는 기능을 가진것이다.

 

오늘의 수업은 여기까지