유하~ 오늘의 수업은
kafka connect 를 이용한 s3 적재 실습
logstash 사용한 기본 설정, 도커 컴포즈, 도커파일 세팅, logstash.conf 세팅
logstash 이용한 s3,opensearch 적재
이다.
kafka connect
kafka를 활용한 다양한 파이프라인 예시이다. 앞으로 우리가 진행해야할 workflow이기도 하다,
아래는 kafka connect를 활용한 실습 워크플로우다.
kafka connect의 기능은 주로 s3에 모든 데이터를 적재, 하는 내용정도이고 추후 세밀한 작업을 원하면 logstash 등을 사용해야한다.
이를 위해서 로그 생성 파일을 만들고 거기에 새로 배운 라이브러리 FAKER를 사용한다, FAKER 그는 신이야
web_log_producer.py
'''
웹 로그 발생 및 카프카 프로듀서 전송
'''
# 1. 모듈 가져오기
from kafka import KafkaProducer
import json
import time
import random
import logging
from datetime import datetime
from faker import Faker
import pendulum
# 2. kafka 연결
producer = KafkaProducer(
bootstrap_servers=['127.0.0.1:9092'], # docker 상에 존재하는 kafka 주소, localhost:9092
value_serializer=lambda x: json.dumps(x).encode('utf-8') # dict를 문자열로 -> 직렬화 과정 -. 인코딩
)
# 3. 전송
def send_msg():
'''
가상의 웹 접속(요청) 로그 생성
'''
fake = Faker()
while True:
# 3-1. 가상 웹 접속 로그
web_log = {
"ip" : fake.ipv4(),
"timestamp" : pendulum.now('Asia/Seoul').format('YYYY-MM-DD HH:mm:ss'),
"method" : fake.http_method(),
"url" : fake.url(),
"status_code" : random.random_int(200,500),
"user_agent" : fake.user_agent()
}
# 3-2. 토픽("bk-web_logs")을 구성하여 전송
producer.send("web_logs", value=web_log)
# logging.info(f"주문:{web_log['menu']} 수량:{web_log['count']}")
print(f"전송 :{web_log['ip']}")
# 3-3. 강제 전송 (버퍼 비움)
# producer.flush()
# 3-4. 잠시 대기 -> 주문 간격 랜덤하게 조정
time.sleep(1)
pass
if __name__ == '__main__':
send_msg()
이 코드를 실행시키면 데이터를 계속 만들어냄과 동시에 계속 데이터를 kafka로 전송시킨다,
docker-compose.yaml
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
# [중요] 리스너 프로토콜 정의 (내부용/외부용 구분)
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
# [중요] 리스너 설정
# INTERNAL: Docker 내부망에서 29092 포트로 대기
# EXTERNAL: 호스트(외부)에서 9092 포트로 대기
KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
# [중요] 클라이언트에게 알려줄 주소 (Advertised)
# kafka-ui 같은 컨테이너에는 'kafka:29092' 라고 알려줌
# 호스트(내 PC)에는 'localhost:9092' 라고 알려줌
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
# 브로커 간 통신은 내부망(INTERNAL) 사용
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
# 브로커 내부에서 바인딩할 주소 (0.0.0.0:9092)
#KAFKA_LISTENERS: PLAINTEXT://:9092
#KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "factory-sensors:1:1"
depends_on:
- zookeeper
# kafka 메세지 전송, 토픽, 브로커, 등등 UI로 관리, 모니터링등 대시보드 서비스
kafka-ui:
image: provectuslabs/kafka-ui
ports:
- "8081:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: connect
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect:8083
depends_on:
- kafka
restart: always
# kafka-connect 서비스
# [추가됨] Kafka Connect (S3 적재 담당)
kafka-connect:
image: confluentinc/cp-kafka-connect:7.5.0
container_name: kafka-connect
ports:
- "8083:8083"
environment:
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
# 도커내 내부 통신 프로토콜명으로 카프카 서버 주소 세팅
CONNECT_BOOTSTRAP_SERVERS: kafka:29092
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: "connect-group"
CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"
CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
CONNECT_STATUS_STORAGE_TOPIC: "connect-status"
# [추가된 부분] 단일 브로커 환경을 위한 복제 계수 설정 (기본값 3 -> 1)
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
# [중요] AWS 자격증명 (실행 시 환경변수나 .env 파일로 주입 필요)
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
volumes:
- ./connect-plugins:/usr/share/confluent-hub-components
command:
- bash
- -c
- |
# S3 플러그인 설치 (부팅 시 자동 설치)
confluent-hub install --no-prompt confluentinc/kafka-connect-s3:10.5.0
# 커넥트 실행
/etc/confluent/docker/run
depends_on:
- kafka
도커 컴포즈에 위 내용을 추가해준다. kafka에서 받는 부분도 수정해주고 ui 와 connect를 추가해준다.

- 패턴 등록 -> 커넥션 정보 등록
- kafka ui > kafka connect > create new connector
- GUI 방식 -> 불안정함
- 이름, 내용 등록
- 터미널/커맨드라인/파워쉘 등에서 직접 등록
```
curl.exe -X POST http://localhost:8083/connectors/ -H "Content-Type: application/json" -d "@s3_conn_config.json"
or
Invoke-RestMethod -Uri "http://localhost:8083/connectors/" -Method Post -ContentType "application/json" -InFile "s3_conn_config.json"

connector가 생긴 모습이다.
커넥터를 세팅한 후 web_log_producer.py를 돌려주면

s3에 데이터가 적재되는 모습을 볼 수 있다.

로그는 이렇게 나온다. 우리가 정한 스키마의 json 내용을 카프카 커넥터를 사용해서 s3에 적재하였다.
다음은 logstash이다.
logstash
이번에 실습할 내용이다, 이번에는 같은 로그를 s3와 opensearch에 동시에 보내는 과정을 수행할 예정이다.
## kafka 서버 -> logstash(ELK 활용) -> S3 AND 전처리(ETL) -> OpenSearch : 동시
## 구성
- logstash에 서비스 설치
- 패턴 구성 (동시 처리에 대한 패턴 정의)
- 1. s3 -> raw데이터 보관용
- 2. OpenSearch -> 검색 분석용
- 3. athena -> 분석용
- ....
- opensearch에 저장된 데이터 -> airflow를 통해서 특정 주기별로 후속 작업 진행
(배치 패턴)
## 구조
/
├─── docker-compose.yaml # [V] [유지] 컨테이너 구성 + [추가] 서비스에 logstash 추가
├─── web_log_producer.py # [V] [유지] 로그 발생기
├─── .env # [V] [유지] AWS 엑세스키 + [추가] OPENSEARCH 마스터 ID/PW
├─── pipeline
│ └─ logstash.conf # [ ][신규] 동시 전송 로직 구성
├─── logstash
│ └─ Dockerfile # [V][신규] logstash에 대한 이미지
├─── 기타 파일 유지 (*.py)
## .env 파일
```
AWS_ACCESS_KEY_ID=..
AWS_SECRET_ACCESS_KEY=..
AWS_OPENSEARCH_HOST=..
AWS_OEPNSEARCH_USER=..
AWS_OEPNSEARCH_PASS=..
```
이를 위해서 dockerfile을 만들어주고 logstash.conf도 만들어준다.
먼저 dockerfile이다.
dockerfile
FROM opensearchproject/logstash-oss-with-opensearch-output-plugin:8.6.1
# s3 플러그인 등등 설치 -> 제외
위 내용은 opensearch와 연결할 수 있는 logstash 이미지를 가져와서 사용하겠다고 도커에 명시하는 것이다.
위 코드를 읽고 docker-compose가 저 이미지를 가져와서 logstash를 build한다.
logstash.conf
# 입력
input{
# 카프카
kafka{
bootstrap_servers => "kafka:29092" # 내부용 통신 프로토콜 : 포트
topics => ["web_logs"] # 토픽, n개 될수 있음
codec => "json" # 데이터(메세지)의 형식
auto_offset_reset => "latest" # 최신 데이터부터 읽기
group_id => "logstash-aws-group-v1" # 처음부터 다시 메세지를 읽어야한다면, 필요조건
}
}
# logstash의 표준 문서에서 표기법 확인
# 필터 -> 원본(s3) -> 사본 구성 -> 전처리 -> opensearch용 표현
filter {
# 1. 들어오는 모든 데이터에 태그를 추가 ("raw_s3") -> 원본은 이대로 마무리
mutate {
add_tag => ["raw_s3"]
}
# 2. 데이터 복제 -> "processed_copy" 태그 추가
clone {
clones => ["processed_copy"]
}
# 3. 타입이 processed_copy라면
if "processed_copy" in [tags] {
mutate {
# 3-1. "raw_s3" 태그 제거
remove_tag => ["raw_s3"]
remove_tag => ["raw_s3"]
# "for_opensearch" 태그 추가
add_tag => ["for_opensearch"]
# "type" 필드 제거 "processed_copy"로 제거됨
remove_field => ["type"]
}
# 원본은 태그로 오직 "raw_s3"만 가짐
# 사본은 태그로 오직 "for_opensearch"만 가짐
# user_agent 필드값 파싱
useragent {
source => "user_agent"
target => "ua_parsed"
}
# 간단한 전처리 -> 불필요한 필드 제거 -> 용량 줄임 -> 비용 절감
mutate {
remove_field => ["user_agent", "[event][original]"]
}
}
}
# 출력, 조건을 달아서 각각의 출력 방향으로 데이터를 전송
# 데이터를 가공하여 특정 방향으로 데이터를 송출할 수 있음
output{
# 원본 데이터
# 어떤 데이터든 간에 태그값에 "raw_s3" 있으면 s3로 전송
if "raw_s3" in [tags] {
s3{
region => "eu-west-2"
bucket => "${AWS_S3_BUCKET}"
prefix => "raw_logs/%{+YYYY}/%{+MM}/%{+dd}/" # 저장될 경로
# 엑세스키, 시크릿키
access_key_id => "${AWS_ACCESS_KEY_ID}"
secret_access_key => "${AWS_SECRET_ACCESS_KEY}"
# 데이터 전송 시간, 크기, 형식
# 현재 데이터는 303bytes 크기임
size_file => 102400 # 100kb -> 버퍼 크기를 작게 구성 -> 빠르게 업로드(단, 데이터가 크면 쪼개짐 -> 더 느려짐)
time_file => 1 # 1분
codec => "json_lines"
}
}
if "for_opensearch" in [tags] {
# 가공(전처리된)데이터 (간단한 전처리)
opensearch{
# aws 엔드포인트
hosts => ["https://${AWS_OPENSEARCH_HOST}:443"]
# 인증정보
user => "${AWS_OPENSEARCH_USER}"
password => "${AWS_OPENSEARCH_PASS}"
# 인덱스 이름 부여 -> 인덱스 명 단위가 일 단위 적재
index => "web-logs-realtime-%{+YYYY.MM.dd}"
# 보안 정보
ssl => true
ssl_certificate_verification => true
# 기타 설정
manage_template => false
}
}
}
logstash.conf의 내용이다. 요약하자면
원본 데이터를 일단 한번 복사해서 두개로 만들고
원본에는 raw_s3 태그를, 복사본에는 processed_copy 태그를 넣어준다.
그리고 원본은 s3로 보내고 raw_s3 태그를 삭제한다, 복사본도 마찬가지로 수행한다.
는 과정을 담은 파일이다.
docker-compose.yaml
마지막으로 도커 컴포즈다,
위 내용은 conf를 컨테이너로 공유하여 logstash의 설정값을 부여하는 내용이다.
이렇게 모든 세팅을 마치면 원본은 s3에, 복사본은 opensearch에 저장이 되겠지?
이제 web_log_producer.py 파일을 실행시키면

일단 이런식으로 s3에 적재는 성공했다.
내용을 확인하면
상위 2개만
{"url":"http://williams.com/","tags":["raw_s3"],"status_code":239,"@version":"1","event":{"original":"{\"ip\": \"39.79.10.146\", \"timestamp\": \"2026-01-20 15:38:47\", \"method\": \"OPTIONS\", \"url\": \"http://williams.com/\", \"status_code\": 239, \"user_agent\": \"Mozilla/5.0 (compatible; MSIE 8.0; Windows NT 5.0; Trident/3.0)\"}"},"ip":"39.79.10.146","user_agent":"Mozilla/5.0 (compatible; MSIE 8.0; Windows NT 5.0; Trident/3.0)","timestamp":"2026-01-20 15:38:47","method":"OPTIONS","@timestamp":"2026-01-20T06:38:47.230610380Z"}
{"url":"http://www.roberson.com/","tags":["raw_s3"],"status_code":327,"@version":"1","event":{"original":"{\"ip\": \"95.237.34.169\", \"timestamp\": \"2026-01-20 15:38:48\", \"method\": \"PUT\", \"url\": \"http://www.roberson.com/\", \"status_code\": 327, \"user_agent\": \"Mozilla/5.0 (iPod; U; CPU iPhone OS 4_0 like Mac OS X; hne-IN) AppleWebKit/531.38.4 (KHTML, like Gecko) Version/3.0.5 Mobile/8B115 Safari/6531.38.4\"}"},"ip":"95.237.34.169","user_agent":"Mozilla/5.0 (iPod; U; CPU iPhone OS 4_0 like Mac OS X; hne-IN) AppleWebKit/531.38.4 (KHTML, like Gecko) Version/3.0.5 Mobile/8B115 Safari/6531.38.4","timestamp":"2026-01-20 15:38:48","method":"PUT","@timestamp":"2026-01-20T06:38:48.180197968Z"}
이렇게 이전 정보들이 s3에 적재된것을 볼 수 있다.
그리고 opensearch에서는 일단 작업시 개수가 새로고침때마다 늘어나는것으로 확인할 수 있고
추후 athena, redshift를 이용ㅇ해서 데이터를 확인할 수도 있다.
그리고 이제 다음 파트는 airflow로 이 과정을 자동화시키는 작업을 수행하려 한다.
오늘의 수업은 여기까지
티스토리는 왜 마크다운형식이 지원이 안되는거야