ASAC-SK플래닛 T아카데미 데이터 엔지니어

26.01.22~23 | 74~75일차 [spark 마무리 | CDK | 미니프로젝트 시작]

Datadesigner 2026. 1. 23. 14:34

안녕하세요

 

22일과 23일은 강사님의 건강이슈로 미니프로젝트와 시간을 반씩 나누어서 진행했습니다

 

22일 오전 , SPARK 마무리

23일 오전 , CDK 

그 외 오후, 미니프로젝트

 

미니프로젝트는 아직 시작 전 단계이고 팀 빌딩, 개요와 업무 분장 배분 등의 내용만 진행해서 이틀 게시글을 하나로 합칩니다.

미니프로젝트는 추후 업로드 예정

 

22일 SPARK 마무리

# Spark 코드 (내일 추가)
# import logging
# logging.info('스파크를 이용한 대(용)량 데이터(TB~PB) 클리닝 처리')

# 스칼라의 컴파일러 JVM -> 자바식(카멜표기법 API 대부분임)
# JVM 컴파일러 사용하는 언어 : Java, Scalar, Kotlin(안드로이드개발)
# EMR에서 실행될 Spark 애플리케이션 코드
# 실습 준비:
# 이 파일을 S3의 스크립트 폴더에 업로드
# 경로: s3://<your-bucket>/scripts/data_cleaning.py
import sys
# 스파크를 이용한 데이터 전처리
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, current_timestamp, year, month, dayofmonth
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Airflow에서 넘겨받을 날짜 인자 (예: 2026-01-22)
# DAG에서 실행시 전달받은 인자값 ( {{ ds }} )을 획득
if len(sys.argv) > 1:
    TARGET_DATE = sys.argv[1]
else:
    raise ValueError("날짜 인자가 필요합니다. (YYYY-MM-DD)")

# S3 경로 설정 (자신의 버킷명으로 변경 필요)
BUCKET_NAME = "airflow-ai-en-3" 
INPUT_PATH  = f"s3://{BUCKET_NAME}/raw/dt={TARGET_DATE}/*.json"
OUTPUT_PATH = f"s3://{BUCKET_NAME}/processed/dt={TARGET_DATE}/"

def main():
    # 1. Spark 세션 생성
    spark = SparkSession.builder \
        .appName(f"Daily_Data_Cleaning_{TARGET_DATE}") \
        .getOrCreate() # 세션이 있으면 획득 없으면 생성

    # 2. 데이터 읽기 (Schema 강제 적용이 안전함)
    # 2-1. 스키마 설정 => 순서 유의 => []
    schema = StructType([
        StructField("event_id",   StringType(), True),
        StructField("user_id",    StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("product_id", IntegerType(), True),
        StructField("price",      IntegerType(), True), # 데이터가 정수이므로 수정
        StructField("timestamp",  StringType(), True),  # 일단 String으로 읽고 변환
        StructField("os",         StringType(), True)
    ])
    # 2-2. s3 -> 스키마에 맞춰서 -> 데이터 읽기
    raw_df = spark.read.schema(schema).json(INPUT_PATH)    
    print(f"원본 데이터 건수: {raw_df.count()}")

    # 3. 데이터 정제 과정 (Transformation)
    # col('컬럼명') -> 특정 컬럼 타겟
    # filter(col("user_id").isNotNull()) : 널이 아닌것만 포함(필터링) -> 결측치 제거
    # filter(col("price") >= 0 : 양수만 필터링
    # withColumn("event_time", to_timestamp ... : 시간 포멧 변경
    # fillna({"event_type": "unknown", "os": "other"}) : 결측치 채우기, 컬럼명로 매칭
    # dropDuplicates(["event_id"]) : 중복 데이터 제거
    cleaned_df = raw_df \
        .filter(col("user_id").isNotNull()) \
        .filter(col("price") >= 0) \
        .withColumn("event_time", to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss")) \
        .filter(col("event_time").isNotNull()) \
        .fillna({"event_type": "unknown", "os": "other"}) \
        .dropDuplicates(["event_id"])

    # 4. 파생 컬럼 추가 (ETL 시점 기록)
    # withColumn() : 파생컬럼, 전처리시간 세팅
    final_df = cleaned_df.withColumn("processed_at", current_timestamp())

    print(f"정제 후 데이터 건수: {final_df.count()}")

    # 5. S3에 저장 (Parquet 포맷 + Snappy 압축)
    # mode("overwrite"): 재실행 시 중복 방지를 위해 덮어쓰기
    # parquet 포멧으로 Snappy(기본값) 압축
    # 하루에 한번이라는 컨셉, 하루에 여러번이라면->파일명에 시간추가되어야함
    final_df.write \
        .mode("overwrite") \
        .parquet(OUTPUT_PATH)

    print("작업 완료")
    # 6. 세션 종료
    spark.stop()

if __name__ == "__main__":
    main()

 

이 파일을 S3상에 업로드하면 이 코드를 SPARK_PATH로 읽어와서 읽기, 정제, 처리 작업을 수행한다.

 

그렇게 클러스터가 작업 수행시에만 켜지고, 종료되는 기능을 수행하며 자원을 아끼고, S3에 적재되는 기능을 가진 AIRFLOW를 구성하였다. 결과물이 없다 안되가지고 하하

 


CDK

# 설치
## 구조
/
L .env                  # AWS 엑세스 키, 리전 정보
L .gitignore            # git에 포함되지 않는 파일
L docker-compose.yaml   # CDK 구동을 위한 환경, 컨테이너 구성(NODE 기반 작동)
L Dockerfile            # CDK 컨테이너 구성을 위한 이미지 설정 

## .env
- git 커밋 금지
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_DEFAULT_REGION=

## Dockerfile
- python 기반 이미지 => nodejs 설치, aws cli 구성

## docker-compose.yaml
- 구성
```
cd cdk
docker compose up -d --build
```
# aws=cdk 컨테이너 접속
```
docker container exec -it aws-cdk bash

# 중요, 작업 디렉토리가 깨끗하게 비워있어야함
# cdk 프로젝트 기본 템플릿 구성 -> 자동으로 기본 프로젝트 완성됨
cdk init app --language python

# 불필요한 .git 삭제
rm -rf .git

# 의존성 설치
pip install -r requirements.txt
```

## 인프라 구성 코드 작성
-   \Iac\app\app_stack.py

## 배포
- code -. cloudformation -> 인프라 적용
- 절차
    - 합성 : 파이썬 코드를 cloudformation 템플릿(yaml)로 변환처리, 에러 없는지 체크
    ```
    cdk synth
    ```
    - 부트스트랩 :cdk 배포를 위한 초기 세팅 진행 --> 실패나면 cloudformation 에 파일이 생김(쓰레기, CDKToolkit)
    ```
    cdk bootstrap
    ```    
    - 배포 : 실제 리소스 생성
    ```
    cdk deploy
    ```
    - 삭제 : 삭제가 성공이 되도, 직접 리소스로 진입하여 확인 (간혹 오류 가능성)
    ```
    cdk destroy
    ```

CDK란?

  • CDK
    • 정의
      • AWS CDK(Cloud Development Kit)는 익숙한 프로그래밍 언어(python등..)를 사용하여 클라우드 애플리케이션 리소스를 모델링하고 프로비저닝할 수 있는 오픈 소스 소프트웨어 개발 프레임워크
    • 인프라스트럭처 코드(IaC) 도구
      • Terraform, Ansible, CloudFoudation
        • JSON이나 YAML 같은 정적 설정 파일을 사용
        • 테라폼 : 인프라구성, 엔서블 : 인프라에 배치된 서비스 내부 구성
        • 무엇이 필요한지 나열
        • 인프라의 최종 상태를 명확하게 정의하고 싶거나, 복잡한 로직이 필요 없는 간단한 리소스 관리에 적합
      • CDK
        • Python, TypeScript, Java, C# 등 범용 프로그래밍 언어의 힘을 빌려 인프라 코딩
        • 소스코드 → CloudFormation 템플릿으로 변환 → 배포
        • CloudFormation을 더 효율적으로 생성해주는 '생성기' 역할
        • 복잡한 리소스 설정(예: VPC, 서브넷, 라우팅 테이블 등)을 미리 정의된 클래스(Construct) 하나로 간단하게 구현
        • “어떻게 구성할지"를 논리적으로 작성
        • 복잡한 아키텍처를 구성하거나, 인프라를 모듈화하여 재사용하고 싶거나, 개발자 친화적인 환경(IDE, 테스트 도구 등)을 활용하고 싶을 때 적합
    • 비교
      • 3 Level 계층 구조
        • L1 (Cfn Resources): CloudFormation 리소스와 1:1로 매핑되는 로우 레벨 클래스입니다. 모든 속성을 직접 설정해야 합니다.
        • L2 (AWS Resources): 기본값과 편의 메서드가 포함된 서비스별 클래스입니다 (예: s3.Bucket). 가장 많이 사용됩니다.
        • L3 (Patterns): 여러 리소스를 묶어 일반적인 아키텍처 패턴을 구현한 최상위 모듈 (예: ecs_patterns.ApplicationLoadBalancedFargateService)

간단하게 코딩으로 AWS 서비스 구축하는 프레임워크다.

 

이제 실습이다.

Dockerfile

# Python 3.11 슬림 버전을 베이스로 사용
FROM python:3.11-slim

# 작업 디렉토리 설정
WORKDIR /app

# 1. 기본 패키지 설치 (curl, git, unzip, ca-certificates)
# --no-install-recommends: 불필요한 추천 패키지 설치 방지
RUN apt-get update && \
    apt-get install -y --no-install-recommends curl git unzip ca-certificates && \
    rm -rf /var/lib/apt/lists/*

# 2. Node.js 설치 (NodeSource 공식 스크립트 사용 - Node v20 LTS)
# 이 방식이 기본 apt-get install nodejs 보다 훨씬 안정적이고 CDK 호환성이 좋음
RUN curl -fsSL https://deb.nodesource.com/setup_20.x | bash - && \
    apt-get install -y nodejs && \
    npm install -g aws-cdk && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# (선택사항) AWS CLI v2 설치 - 디버깅 용도로 유용함
RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" && \
    unzip awscliv2.zip && \
    ./aws/install && \
    rm -rf aws awscliv2.zip

# 컨테이너가 바로 종료되지 않고 계속 실행되도록 설정
CMD ["tail", "-f", "/dev/null"]

Dockerfile로 이미지를 생성해준다

docker-compose.yaml

version: '3.8'

services:
  cdk:
    container_name: aws-cdk
    # dockerfile의 위치 지정
    build: .
    # /app 하위에 작성되는 cdk 코드를 호스트pc에서 편집 가능하도록 공유
    volumes:
      - ./Iac:/app
    # .env 파일 내용을 환경변수로 주입
    env_file:
      - .env

 

생성한 이미지를 바탕으로 컨테이너를 올린다,

그러면 docker-compose에서 지정한 경로인 Iac폴더 안에 app 폴더가 생기고 cdk를 위한 코드들이 생성된다.

 

app_stack.py

'''
파이썬으로 인프라 구성 
- EC2, VPC 개별 배포
'''
from aws_cdk import (
    # Duration,
    # 애플리케이션 배포단위, 하나의 논리적 단위 -> 하위에 기술되는 모든 aws 리소스 모음
    Stack, 
    # aws_sqs as sqs,
    aws_ec2 as ec2, # ec2와 vpc 관련 모듈
)
from constructs import Construct

class AppStack(Stack):

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)
        # 1. VPC 생성 (클라우드 내 가상 네트워크)
        vpc=ec2.Vpc(self, "MyTestEasyVPC",
                    max_azs      = 2, # 가용영역 2개 설정(xxxa, xxxb)
                    nat_gateways = 0, # 비용 절감, 굳이 사용 x 
                    subnet_configuration = [
                        ec2.SubnetConfiguration(
                            name="PublicSubnet",
                            subnet_type=ec2.SubnetType.PUBLIC,
                            cidr_mask=24,
                        ),   
                    ])
        # 2. 보안그룹(방화벽) 설정
        security_group = ec2.SecurityGroup(self, "MyTestEasySG",
                    vpc=vpc,
                    description="ALL HTTP ACCESS", # 설명
                    allow_all_outbound=True,       # 모든 아웃바운드 공개
                    
                    )
        # 필요한 만큼 구성
        security_group.add_ingress_rule(
            ec2.Peer.any_ipv4(),         # 모든 IP(ipv4) 접근 가능
            ec2.Port.tcp(80),            # 80번 포트(http) 개방
            description="HTTP ACCESS anywhere", # 설명
        )
        # 3. EC2 인스턴스 생성
        ec2.Instance(self, "MyTestEasyEC2",
                    vpc=vpc,
                    # instance_type 비용에 직접적 연관("t2.micro)
                    instance_type=ec2.InstanceType("t2.micro"),
                    machine_image=ec2.MachineImage.latest_amazon_linux2023(), # 이미지
                    security_group=security_group,
                    vpc_subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC), 
                    # nat 없이 PUBLIC 설정으로 외부 공개 허용
                    )

 

이 코드가 실제로 aws상에 ec2, vpc등을 만들어주는 기능을 수행하는 코드이다.

 

이제 readme에 있는 커맨드를 cdk 컨테이너 bash 창에서 입력해주면 자동으로 aws상에 ec2와 vpc가 생성되는것을 볼수있다.

## 배포
- code -. cloudformation -> 인프라 적용
- 절차
    - 합성 : 파이썬 코드를 cloudformation 템플릿(yaml)로 변환처리, 에러 없는지 체크
    ```
    cdk synth
    ```
    - 부트스트랩 :cdk 배포를 위한 초기 세팅 진행 --> 실패나면 cloudformation 에 파일이 생김(쓰레기, CDKToolkit)
    ```
    cdk bootstrap
    ```    
    - 배포 : 실제 리소스 생성
    ```
    cdk deploy
    ```
    - 삭제 : 삭제가 성공이 되도, 직접 리소스로 진입하여 확인 (간혹 오류 가능성)
    ```
    cdk destroy
    ```

순서대로 수행한다.

이렇게 자동으로 인스턴스가 생성된것을 확인할 수 있다!

그리고 마지막으로 cdk destroy를 입력하면 모두 다시 종료된다 자동화의 힘

 

이틀의 수업은 여기까지, 이제 미니프로젝트 시작이다