안녕하세요
22일과 23일은 강사님의 건강이슈로 미니프로젝트와 시간을 반씩 나누어서 진행했습니다
22일 오전 , SPARK 마무리
23일 오전 , CDK
그 외 오후, 미니프로젝트
미니프로젝트는 아직 시작 전 단계이고 팀 빌딩, 개요와 업무 분장 배분 등의 내용만 진행해서 이틀 게시글을 하나로 합칩니다.
미니프로젝트는 추후 업로드 예정
22일 SPARK 마무리
# Spark 코드 (내일 추가)
# import logging
# logging.info('스파크를 이용한 대(용)량 데이터(TB~PB) 클리닝 처리')
# 스칼라의 컴파일러 JVM -> 자바식(카멜표기법 API 대부분임)
# JVM 컴파일러 사용하는 언어 : Java, Scalar, Kotlin(안드로이드개발)
# EMR에서 실행될 Spark 애플리케이션 코드
# 실습 준비:
# 이 파일을 S3의 스크립트 폴더에 업로드
# 경로: s3://<your-bucket>/scripts/data_cleaning.py
import sys
# 스파크를 이용한 데이터 전처리
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, current_timestamp, year, month, dayofmonth
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
# Airflow에서 넘겨받을 날짜 인자 (예: 2026-01-22)
# DAG에서 실행시 전달받은 인자값 ( {{ ds }} )을 획득
if len(sys.argv) > 1:
TARGET_DATE = sys.argv[1]
else:
raise ValueError("날짜 인자가 필요합니다. (YYYY-MM-DD)")
# S3 경로 설정 (자신의 버킷명으로 변경 필요)
BUCKET_NAME = "airflow-ai-en-3"
INPUT_PATH = f"s3://{BUCKET_NAME}/raw/dt={TARGET_DATE}/*.json"
OUTPUT_PATH = f"s3://{BUCKET_NAME}/processed/dt={TARGET_DATE}/"
def main():
# 1. Spark 세션 생성
spark = SparkSession.builder \
.appName(f"Daily_Data_Cleaning_{TARGET_DATE}") \
.getOrCreate() # 세션이 있으면 획득 없으면 생성
# 2. 데이터 읽기 (Schema 강제 적용이 안전함)
# 2-1. 스키마 설정 => 순서 유의 => []
schema = StructType([
StructField("event_id", StringType(), True),
StructField("user_id", StringType(), True),
StructField("event_type", StringType(), True),
StructField("product_id", IntegerType(), True),
StructField("price", IntegerType(), True), # 데이터가 정수이므로 수정
StructField("timestamp", StringType(), True), # 일단 String으로 읽고 변환
StructField("os", StringType(), True)
])
# 2-2. s3 -> 스키마에 맞춰서 -> 데이터 읽기
raw_df = spark.read.schema(schema).json(INPUT_PATH)
print(f"원본 데이터 건수: {raw_df.count()}")
# 3. 데이터 정제 과정 (Transformation)
# col('컬럼명') -> 특정 컬럼 타겟
# filter(col("user_id").isNotNull()) : 널이 아닌것만 포함(필터링) -> 결측치 제거
# filter(col("price") >= 0 : 양수만 필터링
# withColumn("event_time", to_timestamp ... : 시간 포멧 변경
# fillna({"event_type": "unknown", "os": "other"}) : 결측치 채우기, 컬럼명로 매칭
# dropDuplicates(["event_id"]) : 중복 데이터 제거
cleaned_df = raw_df \
.filter(col("user_id").isNotNull()) \
.filter(col("price") >= 0) \
.withColumn("event_time", to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss")) \
.filter(col("event_time").isNotNull()) \
.fillna({"event_type": "unknown", "os": "other"}) \
.dropDuplicates(["event_id"])
# 4. 파생 컬럼 추가 (ETL 시점 기록)
# withColumn() : 파생컬럼, 전처리시간 세팅
final_df = cleaned_df.withColumn("processed_at", current_timestamp())
print(f"정제 후 데이터 건수: {final_df.count()}")
# 5. S3에 저장 (Parquet 포맷 + Snappy 압축)
# mode("overwrite"): 재실행 시 중복 방지를 위해 덮어쓰기
# parquet 포멧으로 Snappy(기본값) 압축
# 하루에 한번이라는 컨셉, 하루에 여러번이라면->파일명에 시간추가되어야함
final_df.write \
.mode("overwrite") \
.parquet(OUTPUT_PATH)
print("작업 완료")
# 6. 세션 종료
spark.stop()
if __name__ == "__main__":
main()
이 파일을 S3상에 업로드하면 이 코드를 SPARK_PATH로 읽어와서 읽기, 정제, 처리 작업을 수행한다.
그렇게 클러스터가 작업 수행시에만 켜지고, 종료되는 기능을 수행하며 자원을 아끼고, S3에 적재되는 기능을 가진 AIRFLOW를 구성하였다. 결과물이 없다 안되가지고 하하
CDK
# 설치
## 구조
/
L .env # AWS 엑세스 키, 리전 정보
L .gitignore # git에 포함되지 않는 파일
L docker-compose.yaml # CDK 구동을 위한 환경, 컨테이너 구성(NODE 기반 작동)
L Dockerfile # CDK 컨테이너 구성을 위한 이미지 설정
## .env
- git 커밋 금지
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_DEFAULT_REGION=
## Dockerfile
- python 기반 이미지 => nodejs 설치, aws cli 구성
## docker-compose.yaml
- 구성
```
cd cdk
docker compose up -d --build
```
# aws=cdk 컨테이너 접속
```
docker container exec -it aws-cdk bash
# 중요, 작업 디렉토리가 깨끗하게 비워있어야함
# cdk 프로젝트 기본 템플릿 구성 -> 자동으로 기본 프로젝트 완성됨
cdk init app --language python
# 불필요한 .git 삭제
rm -rf .git
# 의존성 설치
pip install -r requirements.txt
```
## 인프라 구성 코드 작성
- \Iac\app\app_stack.py
## 배포
- code -. cloudformation -> 인프라 적용
- 절차
- 합성 : 파이썬 코드를 cloudformation 템플릿(yaml)로 변환처리, 에러 없는지 체크
```
cdk synth
```
- 부트스트랩 :cdk 배포를 위한 초기 세팅 진행 --> 실패나면 cloudformation 에 파일이 생김(쓰레기, CDKToolkit)
```
cdk bootstrap
```
- 배포 : 실제 리소스 생성
```
cdk deploy
```
- 삭제 : 삭제가 성공이 되도, 직접 리소스로 진입하여 확인 (간혹 오류 가능성)
```
cdk destroy
```
CDK란?
- CDK
- 정의
- AWS CDK(Cloud Development Kit)는 익숙한 프로그래밍 언어(python등..)를 사용하여 클라우드 애플리케이션 리소스를 모델링하고 프로비저닝할 수 있는 오픈 소스 소프트웨어 개발 프레임워크
- 인프라스트럭처 코드(IaC) 도구
- Terraform, Ansible, CloudFoudation
- JSON이나 YAML 같은 정적 설정 파일을 사용
- 테라폼 : 인프라구성, 엔서블 : 인프라에 배치된 서비스 내부 구성
- 무엇이 필요한지 나열
- 인프라의 최종 상태를 명확하게 정의하고 싶거나, 복잡한 로직이 필요 없는 간단한 리소스 관리에 적합
- CDK
- Python, TypeScript, Java, C# 등 범용 프로그래밍 언어의 힘을 빌려 인프라 코딩
- 소스코드 → CloudFormation 템플릿으로 변환 → 배포
- CloudFormation을 더 효율적으로 생성해주는 '생성기' 역할
- 복잡한 리소스 설정(예: VPC, 서브넷, 라우팅 테이블 등)을 미리 정의된 클래스(Construct) 하나로 간단하게 구현
- “어떻게 구성할지"를 논리적으로 작성
- 복잡한 아키텍처를 구성하거나, 인프라를 모듈화하여 재사용하고 싶거나, 개발자 친화적인 환경(IDE, 테스트 도구 등)을 활용하고 싶을 때 적합
- Terraform, Ansible, CloudFoudation
- 비교
- 3 Level 계층 구조
- L1 (Cfn Resources): CloudFormation 리소스와 1:1로 매핑되는 로우 레벨 클래스입니다. 모든 속성을 직접 설정해야 합니다.
- L2 (AWS Resources): 기본값과 편의 메서드가 포함된 서비스별 클래스입니다 (예: s3.Bucket). 가장 많이 사용됩니다.
- L3 (Patterns): 여러 리소스를 묶어 일반적인 아키텍처 패턴을 구현한 최상위 모듈 (예: ecs_patterns.ApplicationLoadBalancedFargateService)
- 3 Level 계층 구조
- 정의
간단하게 코딩으로 AWS 서비스 구축하는 프레임워크다.
이제 실습이다.
Dockerfile
# Python 3.11 슬림 버전을 베이스로 사용
FROM python:3.11-slim
# 작업 디렉토리 설정
WORKDIR /app
# 1. 기본 패키지 설치 (curl, git, unzip, ca-certificates)
# --no-install-recommends: 불필요한 추천 패키지 설치 방지
RUN apt-get update && \
apt-get install -y --no-install-recommends curl git unzip ca-certificates && \
rm -rf /var/lib/apt/lists/*
# 2. Node.js 설치 (NodeSource 공식 스크립트 사용 - Node v20 LTS)
# 이 방식이 기본 apt-get install nodejs 보다 훨씬 안정적이고 CDK 호환성이 좋음
RUN curl -fsSL https://deb.nodesource.com/setup_20.x | bash - && \
apt-get install -y nodejs && \
npm install -g aws-cdk && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
# (선택사항) AWS CLI v2 설치 - 디버깅 용도로 유용함
RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" && \
unzip awscliv2.zip && \
./aws/install && \
rm -rf aws awscliv2.zip
# 컨테이너가 바로 종료되지 않고 계속 실행되도록 설정
CMD ["tail", "-f", "/dev/null"]
Dockerfile로 이미지를 생성해준다
docker-compose.yaml
version: '3.8'
services:
cdk:
container_name: aws-cdk
# dockerfile의 위치 지정
build: .
# /app 하위에 작성되는 cdk 코드를 호스트pc에서 편집 가능하도록 공유
volumes:
- ./Iac:/app
# .env 파일 내용을 환경변수로 주입
env_file:
- .env
생성한 이미지를 바탕으로 컨테이너를 올린다,
그러면 docker-compose에서 지정한 경로인 Iac폴더 안에 app 폴더가 생기고 cdk를 위한 코드들이 생성된다.
app_stack.py
'''
파이썬으로 인프라 구성
- EC2, VPC 개별 배포
'''
from aws_cdk import (
# Duration,
# 애플리케이션 배포단위, 하나의 논리적 단위 -> 하위에 기술되는 모든 aws 리소스 모음
Stack,
# aws_sqs as sqs,
aws_ec2 as ec2, # ec2와 vpc 관련 모듈
)
from constructs import Construct
class AppStack(Stack):
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# 1. VPC 생성 (클라우드 내 가상 네트워크)
vpc=ec2.Vpc(self, "MyTestEasyVPC",
max_azs = 2, # 가용영역 2개 설정(xxxa, xxxb)
nat_gateways = 0, # 비용 절감, 굳이 사용 x
subnet_configuration = [
ec2.SubnetConfiguration(
name="PublicSubnet",
subnet_type=ec2.SubnetType.PUBLIC,
cidr_mask=24,
),
])
# 2. 보안그룹(방화벽) 설정
security_group = ec2.SecurityGroup(self, "MyTestEasySG",
vpc=vpc,
description="ALL HTTP ACCESS", # 설명
allow_all_outbound=True, # 모든 아웃바운드 공개
)
# 필요한 만큼 구성
security_group.add_ingress_rule(
ec2.Peer.any_ipv4(), # 모든 IP(ipv4) 접근 가능
ec2.Port.tcp(80), # 80번 포트(http) 개방
description="HTTP ACCESS anywhere", # 설명
)
# 3. EC2 인스턴스 생성
ec2.Instance(self, "MyTestEasyEC2",
vpc=vpc,
# instance_type 비용에 직접적 연관("t2.micro)
instance_type=ec2.InstanceType("t2.micro"),
machine_image=ec2.MachineImage.latest_amazon_linux2023(), # 이미지
security_group=security_group,
vpc_subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC),
# nat 없이 PUBLIC 설정으로 외부 공개 허용
)
이 코드가 실제로 aws상에 ec2, vpc등을 만들어주는 기능을 수행하는 코드이다.
이제 readme에 있는 커맨드를 cdk 컨테이너 bash 창에서 입력해주면 자동으로 aws상에 ec2와 vpc가 생성되는것을 볼수있다.
## 배포
- code -. cloudformation -> 인프라 적용
- 절차
- 합성 : 파이썬 코드를 cloudformation 템플릿(yaml)로 변환처리, 에러 없는지 체크
```
cdk synth
```
- 부트스트랩 :cdk 배포를 위한 초기 세팅 진행 --> 실패나면 cloudformation 에 파일이 생김(쓰레기, CDKToolkit)
```
cdk bootstrap
```
- 배포 : 실제 리소스 생성
```
cdk deploy
```
- 삭제 : 삭제가 성공이 되도, 직접 리소스로 진입하여 확인 (간혹 오류 가능성)
```
cdk destroy
```
순서대로 수행한다.

이렇게 자동으로 인스턴스가 생성된것을 확인할 수 있다!
그리고 마지막으로 cdk destroy를 입력하면 모두 다시 종료된다 자동화의 힘
이틀의 수업은 여기까지, 이제 미니프로젝트 시작이다