ASAC-SK플래닛 T아카데미 데이터 엔지니어

26.01.16 70일차 [Airflow | ELK_log_generator, reader]

Datadesigner 2026. 1. 18. 23:07

오늘은 airflow elk에 대한 실습을 진행했다.

 


AWS EMR

  • EMR, Ariflow, spark 를 이용한 추천 시스템이 필요한 데이터 파이프라인
 

AWS ELK stack - 로그 분석

  • 엘라스틱 서치(검색엔진), 키바나 대시보드, airflow, kafka(실시간 스트리밍)
    • 스마트 팩토리에 설치된 센서에서 실시간 수집되는 로그에 대한 데이터 파이프라인
 

  • ELK (스마트 팩토리 관점)
    • Elasticsearch : 검색엔진, 데이터를 검색엔진에 사용 되는 형태로 저장
      • 예) 수억건의 로그중 어제 15시경 특정 센서의 온도가 100도가 넘은(튀는값) 기록을 찾아라 -> 1초 미만으로 찾아낸다
    • Logstash
      • 수집된 데이터(로그)등에서 잡음 제거 -> 단위변환(전처리등) 수행하여 Elasticsearch에게 전달
    • Kibana : 시각화 -> 대시보드 -> 실시간 그래프, 지도, 계기판, 알람, ...
  • 흐름
    • 로그 수집(Beats) -> Logstash -> Elasticsearch -> Kibana

  • AWS에서 서비스 형태로 1:1 매핑
    • Beats <-> AWT IOT Core or Fluent Bit
    • Logstash <-> AWS Data Firehose or Fluent Bit
    • Elasticsearch <-> AWS OpenSearch Service
      • Elasticsearch 의 포크 버전 (특정 버전)
      • 저작권을 해결한 버전
    • Kibana <-> OpenSearch Dashboard

  • 로그 발생 (데이터 주기적으로 발생)
    • 서비스 : 스마트 팩토리, 이커머스, 금융, ...
      • 로그를 통해서 이상탐지, 장애를 미리 예측, 시그널을 체크 => 의사결정, 진단, 사전 조치 => 서비스 안정적 운영
    • 스마트 팩토리 시뮬레이션
      • 라즈베리파이(로그발생기) -> s3(데이터 lake) or Fluent Bit 곳을 대상으로 전송
      • 파이썬 프로그램(장비로 가정) -> s3(데이터 lake) or Fluent Bit 곳을 대상으로 전송
        • 파이썬 -> 텀을 구성 -> 반복문 or airflow 표현(스케줄링)
      • 데이터 발생 -> 저장
        • 실시간 -> Fluent Bit -> OpenSearch
          • 실시간:데이터발생 -> Kafka(실시간 스트리밍) -> Fluent Bit -> OpenSearch
          • 지연시간은 최소로 => 실시간이란 표현 가능함
        • 모아두었다가 -> 패치(특정시간 간격) : 한번에 쌓여있는것을 긁어서 벌크 형태로 한번에 처리
          • airflow
            • 새벽 2시에 패치
            • 새벽 4시에 분석 솔류션 전송(athena, redsfith)
            • 아침 7시 분석 레포트 구성하여 대시보드 제공 시스템 구성

 


 
  •  ETL/ELK 파이프라인 구축
    • Logstash : 자바
  • [v] ETL/EFK 파이프라인 구축
    • Fluent Bit : C
  • 실시간(kafka) 데이터처리 및 파이프라인 구축
  • 대용량 데이터(하둡/스파크) 파이프라인 구축
  • 데이터
    • RAG에 사용되는 자연어 => 임베딩 => 백터디비
    • RAG에 필요한 "자연어의 비중이 높아짐 데이터"에 대한 파이프라인
  • airflow(스케줄링, DAG을 이용한 파이프라인)
    • 서로 다른 작업들을( 다른 상황, 다른 시간대, 다른 인터벌을 가진)을 하나의 flow로 연결하는 서비스
  • s3 :
    • 원본 데이터(가공되기전)를 보관
    • 중간 과정에서 생성되는 데이터 보관/삭제

 

 

여기서는 먼저  aws opensearch를 들어가서 aws 변수를 설정해주는 밑작업이 필요하다.

 

opensearch에 들어가서 도메인을 생성해주고 대시보드로 들어가서 security > all_access를 해주면 된다.

 

도메인 생성
opensearch > 시큐리티 진입
airflow > connection [+]

 

이렇게 하면 밑작업은 끝난다.

 

그 이후 내용이다.

 

11_ELK_log_generator.py

 

'''
스마트 팩토리에 설치된 센서에서 발생된 로그를 aws opensearch 서비스에 전송하는 DAG
주기는 1 or 5분 간격으로 스케줄링 
- 파이썬 -> 1분 주기 -> OpenSearch
- 파이썬 -> 1분 주기(컨셉부여 or 발생즉시:실시간) -> s3(원본저장) / Fluent Bit(검색엔진으로 보내는 내용) 
        -> opensearch -> 검색 -> s3 저장 -> athena 분석 -> 대시보드 레포팅

- 설치 패키지
- opensearch-py  : 파이썬 레벨에서 직접 접속하여 검색엔진 접속 및 업무
- apache-airflow-providers-opensearch : airflow 기반에서 검색엔진 접속 및 업무
'''
# 1. 모듈 가져오기
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import logging 
import random
import time
# Generic 타입의 커넥션 정보 획득
from airflow.hooks.base import BaseHook
# 환경변수에서 획득
from airflow.models import Variable

# 2. 설정 정보 -> OpenSearch
conn = BaseHook.get_connection('opensearch_default')
host = Variable.get('OS_HOST') # deserialize_json=True) 뒤의 옵션은 JSON으로 형태로 출력됨



# 도메인 엔드포인트 (IPv4)
HOST = conn.host
AUTH = (conn.login, conn.password) # master 계정 정보 (숨길필요있지), .env나 airflow에 설정값 등
# A 공장 45구역의 모든 센서 데이터 인덱스 정보 -> 커스텀 설정
# 많은 로그들 중 아래 인덱스 정보로 검색할 수 있음
INDEX_NAME = 'a-factory-45x-sensor-v1' # 해당 데이터를 검색할 수 있는, 분류할 수 있는 인덱스 정보

# 파이썬 레벨 코드 (airflow 없이 가능)
def _send_log_task(**kwargs):

    logging.info('-'*20)
    logging.info(host)
    logging.info('-'*20)
    logging.info(conn.host)
    logging.info('-'*20)

    # 1. 오픈서치 모듈 가져오기   -> 에러 발생시 확실한 위치 확인 차원
    from opensearchpy import OpenSearch
    # 2. 오픈서치 클라이언트 연결 -> aws상에 오픈서치 관련 도메인 구성해야함
    client = OpenSearch(
        hosts                 = [{'host': HOST, 'port': 443}],
        http_compress         = True,
        http_auth             = AUTH,
        use_ssl               = True,
        verify_certs          = False,
        ssl_assert_hostname   = False,
        ssl_show_warn         = False
    )
    # 3. 인덱스 확인 (절차)
    # 해당 로그를 구분할 수 있는 표식 -> 인덱싱 수행 가능함
    # opensearch 상에 등록된 인덱스 값이 존재하는 체크 -> 없으면 생성 -> 1회성
    if not client.indices.exists(index=INDEX_NAME):
        # 없으면 생성
        client.indices.create(index=INDEX_NAME)
        logging.info(f"== 인덱스 생성 : {INDEX_NAME} ==")
    logging.info('가상의 센서 데이터 전송 (Batch 작업 : 특정 반복 주기로 진행, 실시간 x, 지연 존재)')
    # 4. 장비 고유값 정의 (n개의 센서의 고유값 미리 정의(문자열))
    oven_ids =['OVEN-001','OVEN-002','OVEN-003']

    # 5. 로그 발생 -> 장비별로 30회 로그를 임의발생 -> 전송 -> (회차별 장비 3개의 로그값 전송)
    MAX_LOOP = 30
    for i in range(MAX_LOOP):
        for oven in oven_ids:
            # 데이터 랜덤 생성
            temp = random.uniform(100,200) # 오븐 온도 생성
            # 임의 변조
            if random.random() > 0.95: # 5% 확률로 변조
                temp += random.uniform(30,50)
            # 데이터 구성
            doc = {
                'timestamp'   : datetime.now(), # 로그 발생 시간
                'oven_id'     : oven,
                'temperature' : round(temp,2), # 온도 (소수점 2자리까지)
                'vibration'   : round(random.uniform(0,1.5), 2), # 진동 레벨 임의 구성
                'status'      : 'DANGER' if temp > 230 else ' NORMAL' # 센서 감지상 위험 / 평시
            }
            # 시나리오
            # 특정 기간동안 특정 센서에서 DANGER 가 지속적으로 검색되면 -> 이상신호로 볼 수 있음
            
            # 6. 로그 전송
            client.index(
                index=INDEX_NAME,
                body=doc,
                refresh=True
            )
        # 전송률  로깅 -> 특정 팀 단로 진행 -> 5번에 한번씩 로깅
        if i % 2 == 0: # 2번중에 한번
            logging.info(f"== {i+1}번차 로그 발생 완료 ==")
        # 시간 임의 지연 -> 2초
        time.sleep(2)
    # 7. 배치 작업 완료
    logging.info("== n차 로그 발생 완료 ==")
    pass

with DAG(
    dag_id              = "11_ELK_log_generator",   
    description         = "파이썬 기반으로 가상으로 로그 발생", 
    default_args        = {
        'owner'           : 'de_1team_manager',  
        'retries'         : 1,                   
        'retry_delay'     : timedelta(minutes=1) 
},       
    schedule_interval   = '*/2 * * * *', # 2분 간격
    start_date          = datetime(2026,1,1),                    
    catchup             = False,              
    tags                = ["ELK", "opensearch", "sensor", "smartfactory"]
) as dag:

    send_log_task = PythonOperator(
        task_id         = "send_log_task",  
        python_callable = _send_log_task
    )

 

먼저 ELK 작업을 위해선 로그가 필요하다, 이상치도 탐지하는 내용도 있으니까, 하지만 우린 로그가 없기 때문에 로그 생성 또한 AIRFLOW 를 사용하여 진행해주는 코드이다. 코드 설명은

 

Airflow] AWS OpenSearch(ELK)를 활용한 센서 데이터 수집 파이프라인

1. 개요

이번 글에서는 스마트 팩토리 시나리오를 가정하여, Airflow를 통해 가상의 IoT 센서 로그를 생성하고 이를 AWS OpenSearch Service (구 Elasticsearch)에 적재하는 파이프라인을 구축해 봅니다.

2. 주요 구성 요소

  • PythonOperator: 복잡한 Airflow 전용 Operator 대신, opensearch-py 라이브러리를 직접 사용하여 Python 코드로 유연하게 데이터를 생성하고 전송합니다.
  • OpenSearch Service: AWS에서 제공하는 완전 관리형 검색 엔진 서비스로, 대량의 로그 데이터를 실시간으로 인덱싱하고 검색/시각화(Kibana)하는 데 사용됩니다.
  • Airflow Connection & Variable: DB 접속 정보와 호스트 정보를 Airflow의 보안 저장소에서 안전하게 관리합니다.

3. DAG 상세 분석 (11_ELK_log_generator)

이 DAG는 2분 간격(*/2 * * * *)으로 실행되며, 스마트 팩토리의 'A 공장 45구역' 센서 데이터를 시뮬레이션합니다.

Task 1: 로그 생성 및 전송 (

 

send_log_task)

Python 함수 

 

_send_log_task 내부에서 다음과 같은 작업이 수행됩니다.

  1. OpenSearch 클라이언트 연결:
    • Airflow Connection (opensearch_default)에서 계정 정보를 가져오고, Variable (OS_HOST)에서 호스트 주소를 가져와 클라이언트를 초기화합니다.
    • verify_certs=False: 실습 환경의 편리함을 위해 SSL 인증서 검증을 건너뜁니다. (운영 환경에서는 보안 주의 필요)
  2. 인덱스 관리:
    • client.indices.exists: 지정된 인덱스 (a-factory-45x-sensor-v1)가 없으면 create 메소드로 자동 생성합니다. 이는 Logstash나 Fluent Bit 없이 애플리케이션 레벨에서 인덱스를 관리하는 방식입니다.
  3. 데이터 시뮬레이션 (ETL 중 Extract & Transform):
    • Loop: 총 30회 반복하며 데이터를 생성합니다.
    • Scenario: OVEN-001003 장비에서 온도가측정됩니다. 정상 범위(100200)와 이상 징후(DANGER, 230도 이상)를 5% 확률로 섞어서 생성합니다.
    • Delay: time.sleep(2)를 주어 실제 센서 데이터가 수집되는 시간 차이를 흉내 냅니다.
  4. 데이터 적재 (Load):
    • client.index: 생성된 JSON 문서(doc)를 OpenSearch에 전송합니다.
    • refresh=True: 실습 확인을 위해 데이터를 즉시 검색 가능하게 만듭니다. (대량 데이터 처리 시에는 성능 저하 원인이 될 수 있으므로 주의)

4. 로그 & 모니터링

  • Airflow 로그 상에서는 2회 반복마다 "== n번차 로그 발생 완료 ==" 메시지를 출력하여 작업 진행 상황을 보여줍니다.
  • 실제 OpenSearch 대시보드에서는 timestamp, oven_id, temperature, status 필드를 통해 실시간 차트를 그릴 수 있습니다.

5. 결론 및 응용

이 예제는 Logstash나 Fluentd 같은 별도의 수집 데몬 없이, Airflow의 PythonOperator만으로도 간단한 Serverless Log Ingestion 파이프라인을 구축할 수 있음을 보여줍니다. 배치 주기(1분, 5분)를 조절하여 준실시간(Near Real-time) 모니터링 시스템을 저비용으로 구축할 때 유용한 패턴입니다.

 

 

현재 코드는 그저 로그 발생용이다. 내용은 위와 같다.

 

*** Log file: detailed_log_elk_generator_2026-01-19.log
*** DAG: 11_ELK_log_generator
*** Tasks: send_log_task
[2026-01-19 00:09:00,100] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: 11_ELK_log_generator.send_log_task manual__2026-01-19T00:09:00+00:00 [queued]>
[2026-01-19 00:09:00,120] {taskinstance.py:1159} INFO - Starting attempt 1 of 1
[2026-01-19 00:09:00,121] {taskinstance.py:1160} INFO - Executing <Task(PythonOperator): send_log_task> on 2026-01-19T00:09:00+00:00
[2026-01-19 00:09:00,130] {logging_mixin.py:115} INFO - --------------------
[2026-01-19 00:09:00,131] {logging_mixin.py:115} INFO - opensearch-domain.ap-northeast-2.es.amazonaws.com
[2026-01-19 00:09:00,131] {logging_mixin.py:115} INFO - --------------------
[2026-01-19 00:09:00,131] {logging_mixin.py:115} INFO - opensearch-domain.ap-northeast-2.es.amazonaws.com
[2026-01-19 00:09:00,132] {logging_mixin.py:115} INFO - --------------------
[2026-01-19 00:09:00,500] {logging_mixin.py:115} INFO - == 인덱스 생성 : a-factory-45x-sensor-v1 ==
[2026-01-19 00:09:00,501] {logging_mixin.py:115} INFO - 가상의 센서 데이터 전송 (Batch 작업 : 특정 반복 주기로 진행, 실시간 x, 지연 존재)
[2026-01-19 00:09:04,550] {logging_mixin.py:115} INFO - == 1번차 로그 발생 완료 ==
[2026-01-19 00:09:08,600] {logging_mixin.py:115} INFO - == 3번차 로그 발생 완료 ==
[2026-01-19 00:09:12,650] {logging_mixin.py:115} INFO - == 5번차 로그 발생 완료 ==
[2026-01-19 00:09:16,700] {logging_mixin.py:115} INFO - == 7번차 로그 발생 완료 ==
[2026-01-19 00:09:20,750] {logging_mixin.py:115} INFO - == 9번차 로그 발생 완료 ==
[2026-01-19 00:09:24,800] {logging_mixin.py:115} INFO - == 11번차 로그 발생 완료 ==
[2026-01-19 00:09:28,850] {logging_mixin.py:115} INFO - == 13번차 로그 발생 완료 ==
[2026-01-19 00:09:32,900] {logging_mixin.py:115} INFO - == 15번차 로그 발생 완료 ==
[2026-01-19 00:09:36,950] {logging_mixin.py:115} INFO - == 17번차 로그 발생 완료 ==
[2026-01-19 00:09:41,000] {logging_mixin.py:115} INFO - == 19번차 로그 발생 완료 ==
[2026-01-19 00:09:45,050] {logging_mixin.py:115} INFO - == 21번차 로그 발생 완료 ==
[2026-01-19 00:09:49,100] {logging_mixin.py:115} INFO - == 23번차 로그 발생 완료 ==
[2026-01-19 00:09:53,150] {logging_mixin.py:115} INFO - == 25번차 로그 발생 완료 ==
[2026-01-19 00:09:57,200] {logging_mixin.py:115} INFO - == 27번차 로그 발생 완료 ==
[2026-01-19 00:10:01,250] {logging_mixin.py:115} INFO - == 29번차 로그 발생 완료 ==
[2026-01-19 00:10:01,300] {logging_mixin.py:115} INFO - == n차 로그 발생 완료 ==
[2026-01-19 00:10:01,310] {taskinstance.py:1278} INFO - Marking task as SUCCESS. duration=61.189000

이러한 내용의 로그가 나오고 데이터가 쌓인다.

 

 

이후 코드가 진짜다.

 

 

11_elk_log_reader.py

 

'''
ELK 중 E(OpenSearch)만 현재 사용중, K는 대시보드, L은 미사용
- OpenSearch 접속 => 검색 => 결과 획득  
    => 집계 분석(Athena or redshift or [v]pandas or spark(EMR))
'''
# 1. 모듈 가져오기
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import logging 
import pandas as pd
from opensearchpy import OpenSearch
from airflow.hooks.base import BaseHook

# 2. 환경변수

conn        = BaseHook.get_connection('opensearch_default')
HOST        = conn.host
AUTH        = (conn.login, conn.password) 
INDEX_NAME  = 'a-factory-45x-sensor-v1' 

# 3-2. 실제 검색 -> 분석
def _analysis_task(**kwargs): # (**context), () 등 다양하게 매개변수를 표시함
    # 1. 검색엔진 접속
    logging.info('== 검색엔진 접속 ==')
    client = OpenSearch(
        hosts                 = [{'host': HOST, 'port': 443}],
        http_auth             = AUTH,
        use_ssl               = True,
        verify_certs          = True
    )
    # 2. 쿼리(질의) -> 최근 10분 내 데이터 가져오기
    logging.info('== 쿼리(질의) ==')
    # 질의 방식 -> 별도로 opensearch에 적합한 형태는 본적 없음 -> 제시함
    query ={
        "size" : 1000, # 1000개 문서만 가져와라
        "query": {
            "range":{
                "timestamp":{
                    "gte":"now-10m" # 최근 10분 이내 데이터 가져오기, 1000개까지만 (?)
                }
            }
        }
    }
    # 3. 결과 획득 및 체크(검색결과 x)
    logging.info('== 결과 획득 ==')
    res = client.search(index=INDEX_NAME, body=query)
    # 히트수 획득 -> 건수
    hits = res['hits']['hits']
    if not hits:
        print("= 조회된 건수가 없다 =")
        return
    else:
        print(f"= 조회된 건수가 있다 {len(hits)} =")
    # 조회결과를 -> s3로 보냄 (중간 결과물)
    # --------------------------------------------
    # 4. 전처리 : res(or s3) -> df로
    logging.info('== 전처리 ==')
    searching_data = [hit['_source'] for hit in hits] # [ {}, {}, {}, ...]
    df = pd.DataFrame(searching_data)
    # 5. 분석
    logging.info('== 분석 ==')
    # 오븐별(센서별) -> 집계 (groupby)
    analysis = df.groupby('oven_id').agg({
        'temperature': 'mean',
        'vibration': 'max',
        "status":"count"
    })
    print (analysis) # s3로 업로드 해야함 (중간결과물)
    # 6. 분석 결과 혹은 이상치 탐지(고온 230이상 위험 시그널 체킹)
    logging.info('== 이상치 탐지 ==')
    # status 컬럼이 DANGER였던 건 수 출력
    outlier = df[df['status'] == 'DANGER']
    if not outlier.empty:
        print(outlier)
        logging.info("== 이상 탐지됨 : 오븐 온도가 너무 높음! -> 오버쿠킹됨 -> \
맛의 일관성 훼손 -> 이 시간대 생산품 불량?==")
    pass
# 3. DAG 정의
with DAG(
    dag_id              = "11_ELK_log_reader",   
    description         = "opensearch에게 검색(질의)-> 결과 획득-> 분석", 
    default_args        = {
        'owner'           : 'de_1team_manager',  
        'retries'         : 1,                   
        'retry_delay'     : timedelta(minutes=1) 
},       
    schedule_interval   = '*/5 * * * *', # 5분 간격(실시간 아님, 2~ 2+5분 지연)
    start_date          = datetime(2026,1,1),                    
    catchup             = False,              
    tags                = ["ELK", "opensearch", "analysis", "reader"]
) as dag:

    
    # 3-1. 오퍼레이터 (검색)
    analysis_task = PythonOperator(
        task_id         = "analysis_task",  
        python_callable = _analysis_task
    )

 

[Airflow] AWS OpenSearch(ELK) 로그 분석 및 이상 감지 파이프라인

1. 개요

앞선 글에서 Log Generator를 통해 가상의 센서 데이터를 OpenSearch에 적재했습니다. 이번에는 그 데이터를 주기적으로 조회하여 분석하고, 이상 징후(DANGER)를 감지하는 Reader 파이프라인(11_ELK_log_reader)을 구축하고, 전체 그림을 완성해 봅니다.

2. Producer-Consumer 구조

이 시스템은 전형적인 생산자-소비자(Producer-Consumer) 패턴을 따릅니다.

  1. Producer (
     
    11_ELK_log_generator.py):
    • 2분마다 실행
    • OVEN-xx 장비의 온도, 진동 데이터를 생성
    • OpenSearch 인덱스(a-factory-45x-sensor-v1)에 적재 (Load)
  2. Consumer (
     
    11_ELK_log_reader.py):
    • 5분마다 실행
    • OpenSearch 인덱스에서 최근 10분 데이터를 조회 (Search)
    • Pandas로 집계 분석 및 이상치 탐지

3. DAG 상세 분석 (11_ELK_log_reader)

Task 1: 검색 및 분석 (

 

analysis_task)

  • 검색 (Extraction):
    • opensearch-py 클라이언트로 접속하여 쿼리를 날립니다.
    • Range Query: timestamp: { "gte": "now-10m" } 조건을 통해 최근 10분간의 데이터만 가져옵니다. 배치 주기가 5분이므로, 중복을 허용하면서 안전하게 데이터를 커버하는 전략입니다.
  • 전처리 (Transformation):
    • hits['_source'] 리스트 컴프리헨션으로 순수 데이터만 추출합니다.
    • Pandas DataFrame으로 변환하여 분석 편의성을 높입니다.
  • 분석 (Aggregations):
    • groupby('oven_id'): 오븐별로 그룹화합니다.
    • agg(...): 온도 평균(mean), 진동 최대값(max) 등을 계산하여 장비 상태를 요약합니다.
  • 이상 감지 (Anomaly Detection):
    • df['status'] == 'DANGER' 필터링을 통해 위험 신호를 포착합니다.
    • 위험 신호 발견 시 로그에 경고 메시지를 남깁니다. (실무에서는 Slack/Email 알림으로 확장 가능)

5. 결론

Airflow를 단순한 ETL 도구를 넘어 경량 분석 플랫폼으로 활용한 사례입니다.

  • Generator: 데이터 소스가 없는 개발/테스트 환경에서 유용한 시뮬레이션 머신
  • Reader: 별도의 분석 서버 없이 Airflow 워커에서 Pandas를 이용해 소규모 데이터(센서 로그)를 즉시 분석하고 모니터링
  • 확장성: 데이터 규모가 커지면 Reader 부분은 Spark(EMR)나 Athena 쿼리로 대체하여 Airflow의 부하를 줄일 수 있습니다.

 

위와 같다. 

 

- 예상 로그

*** Log file: detailed_log_elk_reader_2026-01-19.log
*** DAG: 11_ELK_log_reader
*** Tasks: analysis_task
[2026-01-19 00:15:00,100] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: 11_ELK_log_reader.analysis_task manual__2026-01-19T00:15:00+00:00 [queued]>
[2026-01-19 00:15:00,120] {taskinstance.py:1159} INFO - Starting attempt 1 of 1
[2026-01-19 00:15:00,121] {taskinstance.py:1160} INFO - Executing <Task(PythonOperator): analysis_task> on 2026-01-19T00:15:00+00:00
[2026-01-19 00:15:00,130] {logging_mixin.py:115} INFO - == 검색엔진 접속 ==
[2026-01-19 00:15:00,200] {logging_mixin.py:115} INFO - == 쿼리(질의) ==
[2026-01-19 00:15:00,210] {logging_mixin.py:115} INFO - == 결과 획득 ==
[2026-01-19 00:15:00,450] {logging_mixin.py:115} INFO - = 조회된 건수가 있다 30 =
[2026-01-19 00:15:00,451] {logging_mixin.py:115} INFO - == 전처리 ==
[2026-01-19 00:15:00,460] {logging_mixin.py:115} INFO - == 분석 ==
[2026-01-19 00:15:00,465] {logging_mixin.py:115} INFO - 
          temperature  vibration  status
oven_id                                 
OVEN-001       150.23       1.20      10
OVEN-002       165.45       0.85      10
OVEN-003       205.12       1.45      10
[2026-01-19 00:15:00,470] {logging_mixin.py:115} INFO - == 이상치 탐지 ==
[2026-01-19 00:15:00,475] {logging_mixin.py:115} INFO - 
      timestamp   oven_id  temperature  vibration  status
14  2026-01-19... OVEN-003       235.50       1.35  DANGER
28  2026-01-19... OVEN-003       241.20       1.40  DANGER
[2026-01-19 00:15:00,476] {logging_mixin.py:115} INFO - == 이상 탐지됨 : 오븐 온도가 너무 높음! -> 오버쿠킹됨 -> 맛의 일관성 훼손 -> 이 시간대 생산품 불량?==
[2026-01-19 00:15:00,480] {taskinstance.py:1278} INFO - Marking task as SUCCESS. duration=0.359000

[Airflow] AWS OpenSearch 기반 로그 분석 파이프라인 3줄 요약

  1. 생산자-소비자 패턴의 완성: Log Generator(생산자)가 OpenSearch에 적재한 센서 데이터를 Log Reader(소비자)가 주기적으로 읽어 분석하는 효율적인 순환 구조를 구축합니다.
  2. Pandas 기반 이상 감지: OpenSearch의 최근 10분 데이터를 조회한 뒤, Pandas로 오븐별 온도와 진동 데이터를 집계하여 'DANGER' 신호를 실시간으로 포착합니다.
  3. 경량 분석 플랫폼으로서의 확장성: Airflow를 단순 스케줄러를 넘어 데이터 시뮬레이션 및 모니터링 도구로 활용하며, 데이터 규모에 따라 Spark나 Athena로 유연하게 확장이 가능합니다.

airflow를 데이터 모니터링 도구로 사용하는 과정을 수행하였다. 그리고 opensearch에서는 대시보드도 제공하여서 

 

데이터를 대시보드로 확인할 수도 있다.

 

 

 

16 ~ 17일동안 airflow의 다양한 사용법에 대해서 알아봤다.

현재 11번까지 왔는데 약 18번까지 있다고 한다. 얼마나 더 많은 사용법이 있을까.

 

오늘의 수업은 여기까지