ASAC-SK플래닛 T아카데미 데이터 엔지니어

26.01.20 72일차 [kafka connect | logstash 사용 s3,opensearch 적재]

Datadesigner 2026. 1. 20. 17:08

유하~ 오늘의 수업은 

 

kafka connect 를 이용한 s3 적재 실습

 

logstash 사용한 기본 설정, 도커 컴포즈, 도커파일 세팅, logstash.conf 세팅

 

logstash 이용한 s3,opensearch 적재

 

이다.

 


kafka connect 

kafka를 활용한 다양한 파이프라인 예시이다. 앞으로 우리가 진행해야할 workflow이기도 하다,

# 향후 파이프라인
    - 센서|웹|IOT|로그,데이터 발생 -> kafka producer 전송 -> kafka 서버 적재
    - kafka 서버 -> kafka consumer 수신 ->
    - kafka 서버 -> kafka consumer 수신 -> s3 : 수신 후 업로드 -> 약간의 지연 발생
    - kafka 서버 -> kafka connect -> s3 : 즉시 업로드 -> 패턴 설계 (10개가 쌓이면 업로드)
    - kafka 서버 -> logstash(ELK 활용) -> s3 : 즉시 업로드 -> 패턴 설계 (10개가 쌓이면 업로드)
                            -> 전처리(ETL) -> OpenSearch
                            -> XXXX
                            -> XXXX
    - kafka 서버 -> Fluent Bit(EFK 활용) -> s3
                               -> ...
    - kafka 서버 -> Spark Structured Streaming (대규모 복잡한 가공처리, 전처리) -> s3
        - 단순하게 적재하는것이면 오버스펙임
        - kafka 서버 -> Spark Structured Streaming (AWS EMR) -> s3
 

 

아래는 kafka connect를 활용한 실습 워크플로우다.

 
# kafka > kafka connect > s3 : XX 데이터 실시간 업로드
## 특징
- kafka producer가 전송하는 데이터를 별도의 코드 없이 설정으로 s3에 업로드
- 설정에서 패턴 정의
- 데이터는 페이크데이터를 활용 (패키지 faker를 사용)
    - pip install Faker
- 로그 발생, 전송 등 포지션상 airflow 내부에 포함 x (별도 프로젝트로 진행)
- 데이터 - 웹 로그 - json 포멧
```
    ip, timestamp, method, url, status_code, user_agent
```

## 데이터 파이프라인
- 로그 발생 -> kafka producer 메세지 전송 -> kafka 서버 도달 -> kafka connect -> s3
    - docker-compose.yaml 서비스 추가 필요
        - kafka connect 추가 필요
    - 패턴 정의 -> 설정 파일 -> s3_conn_config.json (커스텀)
    - s3_conn_config.json 파일 내용
                    s3.region      : 본인 리전 수정
                    s3.bucket.name : 본인 버킷명 수정          
                ```
        - 대략적인 컨셉 = 데이터가 10개 모이면 s3에 업로드(버퍼링 10개로 잡음, 컨셉)
        ```
            "flush.size": "10",
        ```
        - 로그 10개가 아주 늦게 모이거나, 도달하지 않으면 전송이 안되는 상황을 고려하여 일정시간 지나면 자동 업로드 (인터벌)
            ```
            # 현재 s3_conn_config.json 파일에는 생략되어있음
            rotate.schedule.interval.ms : 시간당 (ms단위)
            ```
        - 버킷 (본인계정)
        - 버킷 패스 패턴
            ```
                "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
                버킷/토픽/토픽명/2026/01/20/10/....
            ```
    - 패턴 등록 -> 커넥션 정보 등록
        - kafka ui > kafka connect > create new connector
            - GUI 방식 -> 불안정함
            - 이름, 내용 등록
        - 터미널/커맨드라인/파워쉘 등에서 직접 등록
            ```
            curl.exe -X POST http://localhost:8083/connectors/ -H "Content-Type: application/json" -d "@s3_conn_config.json"

            or

            Invoke-RestMethod -Uri "http://localhost:8083/connectors/" -Method Post -ContentType "application/json" -InFile "s3_conn_config.json"
            ```
- kafka connect
    - 데이터(로그(도메인별), 뉴스, ...)가 발생하면 kafka producer를 통해서 여러 곳에서 동시 다발적으로 kafka에 전송 -> kafka connector를 통해서 -> s3에 적재 (Data Lake)
    - 만약, s3 적재하면서 다른 서비스에 추가로 저장 등 작업을 원하면 -> logstash, fluent bit 등 사용하여 동시에 멀티작업에 대한 구성이 필요함

 

kafka connect의 기능은 주로 s3에 모든 데이터를 적재, 하는 내용정도이고 추후 세밀한 작업을 원하면 logstash 등을 사용해야한다.

 

이를 위해서 로그 생성 파일을 만들고 거기에 새로 배운 라이브러리 FAKER를 사용한다, FAKER 그는 신이야

 

web_log_producer.py

'''
웹 로그 발생 및 카프카 프로듀서 전송
'''
# 1. 모듈 가져오기
from kafka import KafkaProducer
import json
import time
import random
import logging
from datetime import datetime
from faker import Faker
import pendulum
# 2. kafka 연결
producer = KafkaProducer(
    bootstrap_servers=['127.0.0.1:9092'], # docker 상에 존재하는 kafka 주소, localhost:9092 
    value_serializer=lambda x: json.dumps(x).encode('utf-8') # dict를 문자열로 -> 직렬화 과정 -. 인코딩
)

# 3. 전송
def send_msg():
    '''
    가상의 웹 접속(요청) 로그 생성
    '''
    fake = Faker()
    while True:
        # 3-1. 가상 웹 접속 로그 
        web_log = {
            "ip"            : fake.ipv4(),
            "timestamp"     : pendulum.now('Asia/Seoul').format('YYYY-MM-DD HH:mm:ss'),
            "method"        : fake.http_method(),
            "url"           : fake.url(),
            "status_code"   : random.random_int(200,500),
            "user_agent"    : fake.user_agent()
        }
        
        # 3-2. 토픽("bk-web_logs")을 구성하여 전송
        producer.send("web_logs", value=web_log)
        # logging.info(f"주문:{web_log['menu']} 수량:{web_log['count']}")
        print(f"전송 :{web_log['ip']}")
        # 3-3. 강제 전송 (버퍼 비움)
        # producer.flush()

        # 3-4. 잠시 대기 -> 주문 간격 랜덤하게 조정
        time.sleep(1)
        pass

if __name__ == '__main__':
    send_msg()

이 코드를 실행시키면 데이터를 계속 만들어냄과 동시에 계속 데이터를 kafka로 전송시킨다, 

 

docker-compose.yaml

 kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      # [중요] 리스너 프로토콜 정의 (내부용/외부용 구분)
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      
      # [중요] 리스너 설정
      # INTERNAL: Docker 내부망에서 29092 포트로 대기
      # EXTERNAL: 호스트(외부)에서 9092 포트로 대기
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
      
      # [중요] 클라이언트에게 알려줄 주소 (Advertised)
      # kafka-ui 같은 컨테이너에는 'kafka:29092' 라고 알려줌
      # 호스트(내 PC)에는 'localhost:9092' 라고 알려줌
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
      
      # 브로커 간 통신은 내부망(INTERNAL) 사용
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      # 브로커 내부에서 바인딩할 주소 (0.0.0.0:9092) 
      #KAFKA_LISTENERS: PLAINTEXT://:9092
      #KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "factory-sensors:1:1"    
    depends_on:
      - zookeeper

  # kafka 메세지 전송, 토픽, 브로커, 등등 UI로 관리, 모니터링등 대시보드 서비스
  kafka-ui:
    image: provectuslabs/kafka-ui
    ports:
      - "8081:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: connect
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect:8083
    depends_on:
      - kafka
    restart: always
  
 # kafka-connect 서비스
  # [추가됨] Kafka Connect (S3 적재 담당)
  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.5.0
    container_name: kafka-connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      # 도커내 내부 통신 프로토콜명으로 카프카 서버 주소 세팅
      CONNECT_BOOTSTRAP_SERVERS: kafka:29092
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "connect-group"
      CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "connect-status"

      # [추가된 부분] 단일 브로커 환경을 위한 복제 계수 설정 (기본값 3 -> 1)
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"


      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      # [중요] AWS 자격증명 (실행 시 환경변수나 .env 파일로 주입 필요)
      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
    volumes:
      - ./connect-plugins:/usr/share/confluent-hub-components
    command: 
      - bash 
      - -c 
      - |
        # S3 플러그인 설치 (부팅 시 자동 설치)
        confluent-hub install --no-prompt confluentinc/kafka-connect-s3:10.5.0
        # 커넥트 실행
        /etc/confluent/docker/run
    depends_on:
      - kafka

도커 컴포즈에 위 내용을 추가해준다. kafka에서 받는 부분도 수정해주고 ui 와 connect를 추가해준다.

kafka ui

아래 코드를 cli에 넣고 입력해야 kafka connect의 설정값이 잡힌다.
 
ui로 넣는 방법은 불안정해서 사용하지 않는다.

 

    - 패턴 등록 -> 커넥션 정보 등록
        - kafka ui > kafka connect > create new connector
            - GUI 방식 -> 불안정함
            - 이름, 내용 등록
        - 터미널/커맨드라인/파워쉘 등에서 직접 등록
            ```
            curl.exe -X POST http://localhost:8083/connectors/ -H "Content-Type: application/json" -d "@s3_conn_config.json"

            or

            Invoke-RestMethod -Uri "http://localhost:8083/connectors/" -Method Post -ContentType "application/json" -InFile "s3_conn_config.json"

 

 

connector가 생긴 모습이다.

 

커넥터를 세팅한 후 web_log_producer.py를 돌려주면

 

s3에 데이터가 적재되는 모습을 볼 수 있다.

로그는 이렇게 나온다. 우리가 정한 스키마의 json 내용을 카프카 커넥터를 사용해서 s3에 적재하였다.

 

다음은 logstash이다.

 


logstash

이번에 실습할 내용이다, 이번에는 같은 로그를 s3와 opensearch에 동시에 보내는 과정을 수행할 예정이다.

## kafka 서버 -> logstash(ELK 활용) -> S3 AND 전처리(ETL) -> OpenSearch : 동시
## 구성
    - logstash에 서비스 설치
    - 패턴 구성 (동시 처리에 대한 패턴 정의)
        - 1. s3         -> raw데이터 보관용
        - 2. OpenSearch -> 검색 분석용
        - 3. athena     -> 분석용 
        - ....
    - opensearch에 저장된 데이터 -> airflow를 통해서 특정 주기별로 후속 작업 진행
    (배치 패턴)

## 구조
/
├─── docker-compose.yaml # [V] [유지] 컨테이너 구성 + [추가] 서비스에 logstash 추가
├─── web_log_producer.py # [V] [유지] 로그 발생기
├─── .env                # [V] [유지] AWS 엑세스키 + [추가] OPENSEARCH 마스터 ID/PW
├─── pipeline
│    └─ logstash.conf    # [ ][신규] 동시 전송 로직 구성
├─── logstash
│    └─ Dockerfile       # [V][신규] logstash에 대한 이미지
├─── 기타 파일 유지 (*.py)

## .env 파일
```
AWS_ACCESS_KEY_ID=..
AWS_SECRET_ACCESS_KEY=..
AWS_OPENSEARCH_HOST=..
AWS_OEPNSEARCH_USER=..
AWS_OEPNSEARCH_PASS=..
```

 

이를 위해서 dockerfile을 만들어주고 logstash.conf도 만들어준다.

 

먼저 dockerfile이다.

dockerfile

 

FROM opensearchproject/logstash-oss-with-opensearch-output-plugin:8.6.1

# s3 플러그인 등등 설치 -> 제외

 

위 내용은 opensearch와 연결할 수 있는 logstash 이미지를 가져와서 사용하겠다고 도커에 명시하는 것이다.

 

위 코드를 읽고 docker-compose가 저 이미지를 가져와서 logstash를 build한다.

 

logstash.conf

# 입력
input{
    # 카프카
    kafka{
        bootstrap_servers => "kafka:29092"  # 내부용 통신 프로토콜 : 포트
        topics => ["web_logs"]              # 토픽, n개 될수 있음
        codec => "json"                     # 데이터(메세지)의 형식
        auto_offset_reset => "latest"       # 최신 데이터부터 읽기
        group_id => "logstash-aws-group-v1" # 처음부터 다시 메세지를 읽어야한다면, 필요조건
    }
}
# logstash의 표준 문서에서 표기법 확인
# 필터 -> 원본(s3) -> 사본 구성 -> 전처리 -> opensearch용 표현
filter {
    # 1. 들어오는 모든 데이터에 태그를 추가 ("raw_s3") -> 원본은 이대로 마무리
  mutate {
    add_tag => ["raw_s3"]
  }
  # 2. 데이터 복제 -> "processed_copy" 태그 추가
  clone {
    clones => ["processed_copy"]
  }
  # 3. 타입이 processed_copy라면 
  if "processed_copy" in [tags] {
    mutate {
    # 3-1. "raw_s3" 태그 제거
      remove_tag => ["raw_s3"]
      remove_tag => ["raw_s3"]
      # "for_opensearch" 태그 추가
      add_tag => ["for_opensearch"]
      # "type" 필드 제거 "processed_copy"로 제거됨
      remove_field => ["type"]
    }
    # 원본은 태그로 오직 "raw_s3"만 가짐
    # 사본은 태그로 오직 "for_opensearch"만 가짐
    # user_agent 필드값 파싱
    useragent {
      source => "user_agent"
      target => "ua_parsed"
    }
    # 간단한 전처리 -> 불필요한 필드 제거 -> 용량 줄임 -> 비용 절감
    mutate {
      remove_field => ["user_agent", "[event][original]"]
    }
  }
}

# 출력, 조건을 달아서 각각의 출력 방향으로 데이터를 전송
# 데이터를 가공하여 특정 방향으로 데이터를 송출할 수 있음
output{
    # 원본 데이터
    # 어떤 데이터든 간에 태그값에 "raw_s3" 있으면 s3로 전송
    if "raw_s3" in [tags] {
        s3{ 
            region => "eu-west-2"
            bucket => "${AWS_S3_BUCKET}"
            prefix => "raw_logs/%{+YYYY}/%{+MM}/%{+dd}/" # 저장될 경로
            # 엑세스키, 시크릿키
            access_key_id => "${AWS_ACCESS_KEY_ID}"
            secret_access_key => "${AWS_SECRET_ACCESS_KEY}"
            # 데이터 전송 시간, 크기, 형식
            # 현재 데이터는 303bytes 크기임
            size_file => 102400 # 100kb -> 버퍼 크기를 작게 구성 -> 빠르게 업로드(단, 데이터가 크면 쪼개짐 -> 더 느려짐)
            time_file => 1 # 1분
            codec => "json_lines"
        }
    }
    if "for_opensearch" in [tags] {
        # 가공(전처리된)데이터 (간단한 전처리)
        opensearch{
            # aws 엔드포인트
            hosts => ["https://${AWS_OPENSEARCH_HOST}:443"]
            # 인증정보
            user => "${AWS_OPENSEARCH_USER}"
            password => "${AWS_OPENSEARCH_PASS}"
            # 인덱스 이름 부여 -> 인덱스 명 단위가 일 단위 적재
            index => "web-logs-realtime-%{+YYYY.MM.dd}"
            # 보안 정보
            ssl => true
            ssl_certificate_verification => true
            # 기타 설정        
            manage_template => false
        }
    }
}

 

logstash.conf의 내용이다. 요약하자면

원본 데이터를 일단 한번 복사해서 두개로 만들고

원본에는 raw_s3 태그를, 복사본에는 processed_copy 태그를 넣어준다.

 

그리고 원본은 s3로 보내고 raw_s3 태그를 삭제한다, 복사본도 마찬가지로 수행한다.

 

는 과정을 담은 파일이다.

 

docker-compose.yaml

logstash:
  # 도커 파일로부터 빌드 후 이미지를 대상으로 컨테이너 구성 -> --build (1회)
    build:
      context: ./logstash
    container_name: logstash
    # 설정 파일 (conf)를 외부에서 작성된것이 컨테이너 내부로 공유됨 -> logstash의 설정이 됨
    # 여기에 s3, opensearch에 동시에 업로드하는 구성이 세팅됨
    volumes:
      - ./pipeline/:/usr/share/logstash/pipeline/
    environment:
    # 자바로 구성된 프로그램 ->logstash -> 자바 힙 메모리 설정(운영)
      LS_JAVA_OPTS: "-Xmx512m -Xms512m"
      # AWS 접속 정보 -> .env를 알아서 읽어서 세팅함
      # S3 접속용
      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
      # [신규] AWS OpenSearch 접속용 환경변수 전달 (마스터 계정)
      AWS_OPENSEARCH_HOST: ${AWS_OPENSEARCH_HOST}
      AWS_OPENSEARCH_USER: ${AWS_OPENSEARCH_USER}
      AWS_OPENSEARCH_PASS: ${AWS_OPENSEARCH_PASS}
      # [신규] AWS S3 접속용 환경변수 전달
      AWS_S3_BUCKET: ${AWS_S3_BUCKET}
    depends_on:
      - kafka

 

마지막으로 도커 컴포즈다,

 

위 내용은 conf를 컨테이너로 공유하여 logstash의 설정값을 부여하는 내용이다.

 

이렇게 모든 세팅을 마치면 원본은 s3에, 복사본은 opensearch에 저장이 되겠지?

 

 

이제 web_log_producer.py 파일을 실행시키면

 

일단 이런식으로 s3에 적재는 성공했다.

내용을 확인하면

 

상위 2개만

{"url":"http://williams.com/","tags":["raw_s3"],"status_code":239,"@version":"1","event":{"original":"{\"ip\": \"39.79.10.146\", \"timestamp\": \"2026-01-20 15:38:47\", \"method\": \"OPTIONS\", \"url\": \"http://williams.com/\", \"status_code\": 239, \"user_agent\": \"Mozilla/5.0 (compatible; MSIE 8.0; Windows NT 5.0; Trident/3.0)\"}"},"ip":"39.79.10.146","user_agent":"Mozilla/5.0 (compatible; MSIE 8.0; Windows NT 5.0; Trident/3.0)","timestamp":"2026-01-20 15:38:47","method":"OPTIONS","@timestamp":"2026-01-20T06:38:47.230610380Z"}
{"url":"http://www.roberson.com/","tags":["raw_s3"],"status_code":327,"@version":"1","event":{"original":"{\"ip\": \"95.237.34.169\", \"timestamp\": \"2026-01-20 15:38:48\", \"method\": \"PUT\", \"url\": \"http://www.roberson.com/\", \"status_code\": 327, \"user_agent\": \"Mozilla/5.0 (iPod; U; CPU iPhone OS 4_0 like Mac OS X; hne-IN) AppleWebKit/531.38.4 (KHTML, like Gecko) Version/3.0.5 Mobile/8B115 Safari/6531.38.4\"}"},"ip":"95.237.34.169","user_agent":"Mozilla/5.0 (iPod; U; CPU iPhone OS 4_0 like Mac OS X; hne-IN) AppleWebKit/531.38.4 (KHTML, like Gecko) Version/3.0.5 Mobile/8B115 Safari/6531.38.4","timestamp":"2026-01-20 15:38:48","method":"PUT","@timestamp":"2026-01-20T06:38:48.180197968Z"}

이렇게 이전 정보들이 s3에 적재된것을 볼 수 있다.

 

그리고 opensearch에서는 일단 작업시 개수가 새로고침때마다 늘어나는것으로 확인할 수  있고

 

추후 athena, redshift를 이용ㅇ해서 데이터를 확인할 수도 있다.

 

 

그리고 이제 다음 파트는 airflow로 이 과정을 자동화시키는 작업을 수행하려 한다.


오늘의 수업은 여기까지

 

티스토리는 왜 마크다운형식이 지원이 안되는거야