← Theory 목록으로

Session 5: 실시간 데이터 처리

Kinesis, Lambda, MSK를 활용한 스트리밍 아키텍처

1. 스트림 처리 기본 개념

"실시간 데이터 처리는 비즈니스가 과거를 분석하는 것에서 현재에 대응하는 것으로 전환하게 한다."

— Stream Processing Principles

1.1 배치 vs 스트림 처리

┌─────────────────────────────────────────────────────────────────────┐
│                    배치 vs 스트림 처리                                │
│                                                                      │
│  배치 처리 (Batch Processing)                                        │
│  ┌─────────────────────────────────────────────────────────────────┐│
│  │  데이터 축적                                                     ││
│  │  ████████████████████████████████████████                       ││
│  │  ─────────────────────────────────────────▶ 시간                ││
│  │                                           │                      ││
│  │                                           ▼                      ││
│  │                                    ┌─────────────┐              ││
│  │                                    │  일괄 처리  │              ││
│  │                                    │  (1시간 후) │              ││
│  │                                    └─────────────┘              ││
│  └─────────────────────────────────────────────────────────────────┘│
│                                                                      │
│  스트림 처리 (Stream Processing)                                     │
│  ┌─────────────────────────────────────────────────────────────────┐│
│  │  연속적 데이터 흐름                                              ││
│  │  ●──●──●──●──●──●──●──●──●──●──●──●──●──●──●──▶                 ││
│  │  │  │  │  │  │  │  │  │  │  │  │  │  │  │  │                    ││
│  │  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼                    ││
│  │  즉시 처리 (밀리초~초 단위)                                      ││
│  └─────────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────────┘

배치 처리

  • • 유한한 데이터셋 (Bounded)
  • • 높은 처리량, 높은 지연
  • • 정확한 결과 보장
  • • 일/주/월 단위 분석

스트림 처리

  • • 무한한 데이터셋 (Unbounded)
  • • 낮은 지연, 연속 처리
  • • 근사치/점진적 결과
  • • 실시간 대시보드, 알림

1.2 윈도우 (Window) 개념

스트림 윈도우 유형
┌─────────────────────────────────────────────────────────────────────┐
│                    윈도우 유형                                        │
│                                                                      │
│  1. Tumbling Window (고정 윈도우)                                    │
│  ┌─────────┐┌─────────┐┌─────────┐┌─────────┐                       │
│  │ Window1 ││ Window2 ││ Window3 ││ Window4 │  겹치지 않음          │
│  │  5분    ││  5분    ││  5분    ││  5분    │                       │
│  └─────────┘└─────────┘└─────────┘└─────────┘                       │
│  ──────────────────────────────────────────────▶ 시간               │
│                                                                      │
│  2. Sliding Window (슬라이딩 윈도우)                                 │
│  ┌─────────────┐                                                    │
│  │   Window1   │                                                    │
│  └─────────────┘                                                    │
│     ┌─────────────┐                                                 │
│     │   Window2   │  윈도우 크기: 10분                              │
│     └─────────────┘  슬라이드: 5분                                  │
│        ┌─────────────┐                                              │
│        │   Window3   │  겹침 발생                                   │
│        └─────────────┘                                              │
│  ──────────────────────────────────────────────▶ 시간               │
│                                                                      │
│  3. Session Window (세션 윈도우)                                     │
│  ┌───────┐    ┌─────────────┐      ┌───┐                           │
│  │Session│    │   Session   │      │Ses│                           │
│  │   1   │    │      2      │      │ 3 │                           │
│  └───────┘    └─────────────┘      └───┘                           │
│  ●●●●    gap   ●●●●●●●●●●    gap    ●●                              │
│  ──────────────────────────────────────────────▶ 시간               │
│  활동 기반, 비활성 간격으로 구분                                     │
└─────────────────────────────────────────────────────────────────────┘

Tumbling

분당 집계, 시간당 통계

Sliding

이동 평균, 트렌드 분석

Session

사용자 세션 분석

1.3 이벤트 시간 vs 처리 시간

┌─────────────────────────────────────────────────────────────────────┐
│                    Event Time vs Processing Time                     │
│                                                                      │
│  Event Time: 이벤트가 실제로 발생한 시간                             │
│  Processing Time: 시스템이 이벤트를 처리하는 시간                    │
│                                                                      │
│  이벤트 발생        네트워크 지연        처리                        │
│       │                  │                │                          │
│       ▼                  ▼                ▼                          │
│  ┌─────────┐        ┌─────────┐      ┌─────────┐                    │
│  │ 10:00:00│───────▶│ 지연... │─────▶│ 10:00:05│                    │
│  │(Event T)│        │         │      │(Proc T) │                    │
│  └─────────┘        └─────────┘      └─────────┘                    │
│                                                                      │
│  문제: 늦게 도착한 이벤트 (Late Event)                               │
│  ┌─────────────────────────────────────────────────────────────────┐│
│  │  Window: 10:00 ~ 10:05                                          ││
│  │                                                                  ││
│  │  정상 이벤트: 10:02 발생 → 10:02 도착 ✓                         ││
│  │  지연 이벤트: 10:03 발생 → 10:07 도착 ✗ (윈도우 종료 후)        ││
│  │                                                                  ││
│  │  해결: Watermark로 허용 지연 시간 설정                           ││
│  └─────────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────────┘

💡 Watermark (워터마크)

워터마크는 "이 시간 이전의 모든 이벤트가 도착했다"고 가정하는 임계값입니다. 예: Watermark = 현재 시간 - 5분이면, 5분 이상 지연된 이벤트는 무시하거나 별도 처리합니다.

1.4 AWS 실시간 처리 서비스

서비스 비교
서비스엔진사용 사례관리 수준
Kinesis Data AnalyticsApache Flink복잡한 스트림 처리완전 관리형
MSK + FlinkKafka + FlinkKafka 에코시스템관리형
Lambda서버리스간단한 변환완전 관리형
Glue StreamingSpark Streaming마이크로배치완전 관리형

2. Amazon Managed Service for Apache Flink

Amazon Managed Service for Apache Flink(구 Kinesis Data Analytics for Apache Flink)는 완전 관리형 Apache Flink 서비스입니다. 복잡한 스트림 처리 로직을 구현할 수 있습니다.

2.1 Flink 아키텍처

Apache Flink 구조
┌─────────────────────────────────────────────────────────────────────┐
│                    Apache Flink Architecture                         │
│                                                                      │
│  ┌─────────────────────────────────────────────────────────────────┐│
│  │                      Flink Application                           ││
│  │                                                                  ││
│  │  Source ──▶ Transformation ──▶ Transformation ──▶ Sink          ││
│  │    │              │                  │              │            ││
│  │    ▼              ▼                  ▼              ▼            ││
│  │ Kinesis        Filter             Window          S3            ││
│  │  Stream         Map               Aggregate      Kinesis        ││
│  │                 Join                             Redshift        ││
│  └─────────────────────────────────────────────────────────────────┘│
│                              │                                       │
│                              ▼                                       │
│  ┌─────────────────────────────────────────────────────────────────┐│
│  │                    Flink Runtime                                 ││
│  │                                                                  ││
│  │  ┌─────────────┐    ┌─────────────────────────────────────────┐ ││
│  │  │ JobManager  │    │           TaskManagers                  │ ││
│  │  │             │    │  ┌─────────┐ ┌─────────┐ ┌─────────┐   │ ││
│  │  │ • 작업 조율 │    │  │  Task   │ │  Task   │ │  Task   │   │ ││
│  │  │ • 체크포인트│    │  │ Slot 1  │ │ Slot 2  │ │ Slot N  │   │ ││
│  │  │ • 장애 복구 │    │  └─────────┘ └─────────┘ └─────────┘   │ ││
│  │  └─────────────┘    └─────────────────────────────────────────┘ ││
│  │                                                                  ││
│  │  State Backend: RocksDB (대용량) / Heap (소용량)                ││
│  │  Checkpoint Storage: S3                                         ││
│  └─────────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────────┘

핵심 개념

  • Source: 데이터 입력 (Kinesis, Kafka)
  • Transformation: 데이터 변환
  • Sink: 데이터 출력 (S3, Redshift)
  • State: 상태 저장 (집계, 윈도우)

장점

  • • Exactly-once 처리 보장
  • • 밀리초 단위 지연
  • • 복잡한 이벤트 처리 (CEP)
  • • 자동 스케일링

2.2 Flink SQL 예시

// Flink SQL - 실시간 집계

-- 소스 테이블 정의 (Kinesis)
CREATE TABLE clickstream (
  user_id STRING,
  page_url STRING,
  event_type STRING,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kinesis',
  'stream' = 'clickstream-events',
  'aws.region' = 'ap-northeast-2',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json'
);

-- 싱크 테이블 정의 (Kinesis)
CREATE TABLE page_views_per_minute (
  window_start TIMESTAMP(3),
  window_end TIMESTAMP(3),
  page_url STRING,
  view_count BIGINT
) WITH (
  'connector' = 'kinesis',
  'stream' = 'page-views-aggregated',
  'aws.region' = 'ap-northeast-2',
  'format' = 'json'
);

-- 실시간 집계 쿼리
INSERT INTO page_views_per_minute
SELECT 
  TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
  TUMBLE_END(event_time, INTERVAL '1' MINUTE) as window_end,
  page_url,
  COUNT(*) as view_count
FROM clickstream
WHERE event_type = 'page_view'
GROUP BY 
  TUMBLE(event_time, INTERVAL '1' MINUTE),
  page_url;

2.3 Flink Java/Python API

// PyFlink - 이상 탐지 예시

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kinesis import FlinkKinesisConsumer
from pyflink.datastream.window import TumblingEventTimeWindows
from pyflink.common.time import Time
from pyflink.common.watermark_strategy import WatermarkStrategy

env = StreamExecutionEnvironment.get_execution_environment()

# Kinesis 소스
consumer = FlinkKinesisConsumer(
    "sensor-data",
    SimpleStringSchema(),
    Properties({
        "aws.region": "ap-northeast-2",
        "stream.initial.position": "LATEST"
    })
)

# 스트림 처리
stream = env.add_source(consumer) \
    .map(lambda x: json.loads(x)) \
    .assign_timestamps_and_watermarks(
        WatermarkStrategy
            .for_bounded_out_of_orderness(Duration.of_seconds(5))
            .with_timestamp_assigner(lambda e, _: e['timestamp'])
    )

# 이상 탐지: 5분 윈도우에서 평균 대비 3 표준편차 초과
alerts = stream \
    .key_by(lambda x: x['sensor_id']) \
    .window(TumblingEventTimeWindows.of(Time.minutes(5))) \
    .process(AnomalyDetector())

# 알림 전송
alerts.add_sink(KinesisSink("anomaly-alerts"))

env.execute("Anomaly Detection")

2.4 Managed Flink 설정

// Terraform - Managed Flink Application

resource "aws_kinesisanalyticsv2_application" "flink_app" {
  name                   = "realtime-analytics"
  runtime_environment    = "FLINK-1_18"
  service_execution_role = aws_iam_role.flink.arn

  application_configuration {
    application_code_configuration {
      code_content {
        s3_content_location {
          bucket_arn = aws_s3_bucket.code.arn
          file_key   = "flink-app.jar"
        }
      }
      code_content_type = "ZIPFILE"
    }

    flink_application_configuration {
      checkpoint_configuration {
        configuration_type = "CUSTOM"
        checkpointing_enabled = true
        checkpoint_interval   = 60000  # 1분
        min_pause_between_checkpoints = 5000
      }

      monitoring_configuration {
        configuration_type = "CUSTOM"
        log_level          = "INFO"
        metrics_level      = "APPLICATION"
      }

      parallelism_configuration {
        configuration_type   = "CUSTOM"
        parallelism          = 4
        parallelism_per_kpu  = 1
        auto_scaling_enabled = true
      }
    }

    environment_properties {
      property_group {
        property_group_id = "FlinkApplicationProperties"
        property_map = {
          "input.stream.name"  = "clickstream-events"
          "output.stream.name" = "aggregated-events"
          "aws.region"         = "ap-northeast-2"
        }
      }
    }
  }
}

3. 실시간 분석 패턴

3.1 실시간 대시보드 아키텍처

실시간 대시보드 파이프라인
┌─────────────────────────────────────────────────────────────────────┐
│                    실시간 대시보드 아키텍처                           │
│                                                                      │
│  ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────────────┐  │
│  │ 이벤트  │───▶│ Kinesis │───▶│  Flink  │───▶│   OpenSearch    │  │
│  │  소스   │    │ Streams │    │ (집계)  │    │   (인덱싱)      │  │
│  └─────────┘    └─────────┘    └─────────┘    └────────┬────────┘  │
│                                                        │            │
│                                                        ▼            │
│                                               ┌─────────────────┐   │
│                                               │   OpenSearch    │   │
│                                               │   Dashboards    │   │
│                                               │   (시각화)      │   │
│                                               └─────────────────┘   │
│                                                                      │
│  대안 아키텍처 (QuickSight)                                         │
│  ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────────────┐  │
│  │ Kinesis │───▶│Firehose │───▶│   S3    │───▶│   QuickSight    │  │
│  │ Streams │    │(Parquet)│    │(Athena) │    │   (SPICE)       │  │
│  └─────────┘    └─────────┘    └─────────┘    └─────────────────┘  │
│                                                                      │
│  지연: 1분~5분 (Near Real-time)                                     │
└─────────────────────────────────────────────────────────────────────┘

OpenSearch 방식

  • • 초 단위 실시간 업데이트
  • • 복잡한 검색/필터링
  • • 로그 분석에 최적
  • • 비용: 인스턴스 기반

QuickSight 방식

  • • 분 단위 Near Real-time
  • • BI 기능 풍부
  • • 비즈니스 사용자 친화적
  • • 비용: 사용자 기반

3.2 실시간 이상 탐지

이상 탐지 패턴
┌─────────────────────────────────────────────────────────────────────┐
│                    실시간 이상 탐지 아키텍처                          │
│                                                                      │
│  ┌─────────┐    ┌─────────┐    ┌─────────────────────────────────┐  │
│  │ 센서    │───▶│ Kinesis │───▶│        Flink Application        │  │
│  │ 데이터  │    │ Streams │    │                                 │  │
│  └─────────┘    └─────────┘    │  ┌─────────────────────────────┐│  │
│                                │  │     이상 탐지 로직          ││  │
│                                │  │                             ││  │
│                                │  │  1. 통계적 방법             ││  │
│                                │  │     - Z-Score               ││  │
│                                │  │     - 이동 평균 편차        ││  │
│                                │  │                             ││  │
│                                │  │  2. 규칙 기반               ││  │
│                                │  │     - 임계값 초과           ││  │
│                                │  │     - 패턴 매칭             ││  │
│                                │  │                             ││  │
│                                │  │  3. ML 기반                 ││  │
│                                │  │     - SageMaker 연동        ││  │
│                                │  └─────────────────────────────┘│  │
│                                └──────────────┬──────────────────┘  │
│                                               │                      │
│                              ┌────────────────┼────────────────┐    │
│                              ▼                ▼                ▼    │
│                        ┌─────────┐      ┌─────────┐      ┌─────────┐│
│                        │   SNS   │      │ Lambda  │      │   S3    ││
│                        │ (알림)  │      │(자동화) │      │ (로그)  ││
│                        └─────────┘      └─────────┘      └─────────┘│
└─────────────────────────────────────────────────────────────────────┘

// Flink SQL - Z-Score 기반 이상 탐지

-- 5분 윈도우 통계 계산
CREATE VIEW sensor_stats AS
SELECT 
  sensor_id,
  TUMBLE_START(event_time, INTERVAL '5' MINUTE) as window_start,
  AVG(value) as avg_value,
  STDDEV(value) as stddev_value,
  COUNT(*) as sample_count
FROM sensor_readings
GROUP BY 
  sensor_id,
  TUMBLE(event_time, INTERVAL '5' MINUTE);

-- 이상치 탐지 (Z-Score > 3)
INSERT INTO anomaly_alerts
SELECT 
  r.sensor_id,
  r.event_time,
  r.value,
  s.avg_value,
  s.stddev_value,
  ABS(r.value - s.avg_value) / NULLIF(s.stddev_value, 0) as z_score
FROM sensor_readings r
JOIN sensor_stats s 
  ON r.sensor_id = s.sensor_id
WHERE ABS(r.value - s.avg_value) / NULLIF(s.stddev_value, 0) > 3;

3.3 Lambda Architecture 구현

Lambda Architecture

Lambda Architecture는 배치 레이어와 스피드 레이어를 결합하여 정확성과 실시간성을 모두 달성합니다.

┌─────────────────────────────────────────────────────────────────────┐
│                    Lambda Architecture                               │
│                                                                      │
│                         ┌─────────────┐                             │
│                         │   Source    │                             │
│                         │  (Kinesis)  │                             │
│                         └──────┬──────┘                             │
│                                │                                     │
│              ┌─────────────────┼─────────────────┐                  │
│              ▼                                   ▼                  │
│  ┌─────────────────────────┐      ┌─────────────────────────┐      │
│  │     Batch Layer         │      │     Speed Layer         │      │
│  │                         │      │                         │      │
│  │  Firehose → S3 → Glue   │      │  Flink (실시간 집계)    │      │
│  │                         │      │                         │      │
│  │  • 전체 데이터 저장     │      │  • 최근 데이터만 처리   │      │
│  │  • 일/시간 단위 배치    │      │  • 초 단위 업데이트     │      │
│  │  • 정확한 결과          │      │  • 근사치 결과          │      │
│  └───────────┬─────────────┘      └───────────┬─────────────┘      │
│              │                                 │                    │
│              │    ┌─────────────────────┐     │                    │
│              └───▶│   Serving Layer     │◀────┘                    │
│                   │                     │                          │
│                   │  • Batch View       │                          │
│                   │  • Real-time View   │                          │
│                   │  • Merged View      │                          │
│                   └──────────┬──────────┘                          │
│                              │                                      │
│                              ▼                                      │
│                   ┌─────────────────────┐                          │
│                   │    Query Layer      │                          │
│                   │  (Athena/Redshift)  │                          │
│                   └─────────────────────┘                          │
└─────────────────────────────────────────────────────────────────────┘

쿼리 예시:
-- 배치 뷰 (어제까지의 정확한 데이터)
SELECT * FROM batch_daily_sales WHERE date < CURRENT_DATE

UNION ALL

-- 실시간 뷰 (오늘의 근사치 데이터)
SELECT * FROM realtime_sales WHERE date = CURRENT_DATE

💡 Kappa Architecture 대안

Kappa Architecture는 스트림 처리만으로 모든 것을 처리합니다. Kafka의 로그 보존과 재처리 기능을 활용하여 배치 레이어를 제거합니다. 단순하지만 대용량 재처리 시 비용이 높을 수 있습니다.

4. 정리 및 다음 세션 예고

4.1 핵심 요약

스트림 처리 기본 개념

윈도우(Tumbling, Sliding, Session)로 무한 스트림을 유한 단위로 분할. Event Time과 Watermark로 지연 이벤트 처리.

Amazon Managed Flink

완전 관리형 Apache Flink. SQL 또는 Java/Python API로 복잡한 스트림 처리. Exactly-once 보장, 자동 스케일링, 체크포인트 기반 장애 복구.

실시간 분석 패턴

실시간 대시보드(OpenSearch/QuickSight), 이상 탐지(Z-Score, 규칙 기반), Lambda Architecture로 정확성과 실시간성 동시 달성.

4.2 서비스 선택 가이드

사용 사례권장 서비스지연 시간
복잡한 스트림 처리Managed Flink밀리초~초
간단한 변환/필터Lambda + Kinesis
S3 적재 (Near RT)Kinesis Firehose60초~
마이크로배치Glue Streaming
Kafka 에코시스템MSK + Flink밀리초~초

4.3 다음 세션 예고

Session 6: Amazon Redshift 심층 분석

클라우드 데이터 웨어하우스의 핵심인 Redshift를 깊이 있게 학습합니다.

  • Redshift 아키텍처 (MPP, 컬럼형 저장)
  • 분산키(DISTKEY)와 정렬키(SORTKEY) 설계
  • Redshift Serverless vs Provisioned
  • Redshift Spectrum으로 Data Lake 쿼리
  • 성능 튜닝 및 비용 최적화

4.4 실습 과제

과제 1: Flink SQL 실시간 집계

Kinesis Data Generator로 클릭스트림 데이터를 생성하고, Managed Flink SQL로 분당 페이지뷰를 집계하세요.

과제 2: 실시간 대시보드

Flink 집계 결과를 OpenSearch로 전송하고, OpenSearch Dashboards에서 실시간 시각화를 구성하세요.

과제 3: Lambda Architecture

동일한 데이터 소스에 대해 배치 레이어(Firehose → S3 → Athena)와 스피드 레이어(Flink)를 구축하고 결과를 비교하세요.

5. Amazon Kinesis Data Analytics (SQL)

Kinesis Data Analytics for SQL은 SQL 쿼리를 사용하여 스트리밍 데이터를 실시간으로 분석합니다. Apache Flink보다 간단한 사용 사례에 적합하며, 빠른 프로토타이핑에 유용합니다.

Kinesis Analytics SQL 아키텍처
┌─────────────────────────────────────────────────────────────────────────────┐
│                    Kinesis Data Analytics for SQL                            │
│                                                                              │
│  ┌─────────────────┐                                                        │
│  │  Input Stream   │                                                        │
│  │  (Kinesis/      │                                                        │
│  │   Firehose)     │                                                        │
│  └────────┬────────┘                                                        │
│           │                                                                  │
│           ▼                                                                  │
│  ┌─────────────────────────────────────────────────────────────────────────┐│
│  │                    In-Application Streams                                ││
│  │                                                                          ││
│  │  ┌─────────────────────────────────────────────────────────────────┐    ││
│  │  │  SOURCE_SQL_STREAM_001                                          │    ││
│  │  │  ├── Column: ticker (VARCHAR)                                   │    ││
│  │  │  ├── Column: price (DECIMAL)                                    │    ││
│  │  │  ├── Column: volume (INTEGER)                                   │    ││
│  │  │  └── Column: event_time (TIMESTAMP)                             │    ││
│  │  └─────────────────────────────────────────────────────────────────┘    ││
│  │                              │                                           ││
│  │                              ▼ SQL Query                                 ││
│  │  ┌─────────────────────────────────────────────────────────────────┐    ││
│  │  │  DESTINATION_SQL_STREAM                                         │    ││
│  │  │  (Aggregated Results)                                           │    ││
│  │  └─────────────────────────────────────────────────────────────────┘    ││
│  └─────────────────────────────────────────────────────────────────────────┘│
│           │                                                                  │
│           ▼                                                                  │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐             │
│  │  Kinesis Stream │  │    Firehose     │  │     Lambda      │             │
│  │  (Output)       │  │    (S3/RS)      │  │   (Custom)      │             │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘             │
└─────────────────────────────────────────────────────────────────────────────┘

5.1 스트리밍 SQL 쿼리

윈도우 함수와 집계

// Tumbling Window - 1분 단위 집계

-- 입력 스트림 정의
CREATE OR REPLACE STREAM "SOURCE_SQL_STREAM_001" (
    "ticker"     VARCHAR(4),
    "price"      DECIMAL(10,2),
    "volume"     INTEGER,
    "event_time" TIMESTAMP
);

-- 1분 단위 VWAP (Volume Weighted Average Price) 계산
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    "ticker"       VARCHAR(4),
    "window_start" TIMESTAMP,
    "window_end"   TIMESTAMP,
    "vwap"         DECIMAL(10,4),
    "total_volume" INTEGER,
    "trade_count"  INTEGER
);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
    "ticker",
    STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '1' MINUTE) AS "window_start",
    STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '1' MINUTE) 
        + INTERVAL '1' MINUTE AS "window_end",
    SUM("price" * "volume") / SUM("volume") AS "vwap",
    SUM("volume") AS "total_volume",
    COUNT(*) AS "trade_count"
FROM "SOURCE_SQL_STREAM_001"
GROUP BY 
    "ticker",
    STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '1' MINUTE);

// Sliding Window - 5분 이동 평균

-- 5분 이동 평균 (1분마다 업데이트)
CREATE OR REPLACE STREAM "MOVING_AVG_STREAM" (
    "ticker"        VARCHAR(4),
    "event_time"    TIMESTAMP,
    "current_price" DECIMAL(10,2),
    "avg_price_5m"  DECIMAL(10,4),
    "min_price_5m"  DECIMAL(10,2),
    "max_price_5m"  DECIMAL(10,2)
);

CREATE OR REPLACE PUMP "MOVING_AVG_PUMP" AS
INSERT INTO "MOVING_AVG_STREAM"
SELECT STREAM
    "ticker",
    ROWTIME AS "event_time",
    "price" AS "current_price",
    AVG("price") OVER W AS "avg_price_5m",
    MIN("price") OVER W AS "min_price_5m",
    MAX("price") OVER W AS "max_price_5m"
FROM "SOURCE_SQL_STREAM_001"
WINDOW W AS (
    PARTITION BY "ticker"
    RANGE INTERVAL '5' MINUTE PRECEDING
);

// Stagger Window - 지연 데이터 처리

-- Stagger Window: 늦게 도착하는 데이터 처리
-- 파티션 키가 처음 나타난 시점부터 윈도우 시작
CREATE OR REPLACE STREAM "STAGGER_WINDOW_STREAM" (
    "ticker"       VARCHAR(4),
    "window_start" TIMESTAMP,
    "avg_price"    DECIMAL(10,4),
    "record_count" INTEGER
);

CREATE OR REPLACE PUMP "STAGGER_PUMP" AS
INSERT INTO "STAGGER_WINDOW_STREAM"
SELECT STREAM
    "ticker",
    FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE) AS "window_start",
    AVG("price") AS "avg_price",
    COUNT(*) AS "record_count"
FROM "SOURCE_SQL_STREAM_001"
WINDOWED BY STAGGER (
    PARTITION BY "ticker", FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE)
    RANGE INTERVAL '1' MINUTE
);

5.2 이상 탐지

실시간 이상 탐지 패턴

// RANDOM_CUT_FOREST - ML 기반 이상 탐지

-- Random Cut Forest 알고리즘을 사용한 이상 탐지
-- Kinesis Analytics에 내장된 ML 함수

CREATE OR REPLACE STREAM "ANOMALY_DETECTION_STREAM" (
    "ticker"        VARCHAR(4),
    "event_time"    TIMESTAMP,
    "price"         DECIMAL(10,2),
    "volume"        INTEGER,
    "anomaly_score" DOUBLE,
    "is_anomaly"    BOOLEAN
);

CREATE OR REPLACE PUMP "ANOMALY_PUMP" AS
INSERT INTO "ANOMALY_DETECTION_STREAM"
SELECT STREAM
    "ticker",
    ROWTIME AS "event_time",
    "price",
    "volume",
    "ANOMALY_SCORE" AS "anomaly_score",
    CASE 
        WHEN "ANOMALY_SCORE" > 2.0 THEN true 
        ELSE false 
    END AS "is_anomaly"
FROM TABLE(
    RANDOM_CUT_FOREST(
        CURSOR(SELECT STREAM "ticker", "price", "volume" 
               FROM "SOURCE_SQL_STREAM_001"),
        100,    -- numberOfTrees
        256,    -- subSampleSize
        100000, -- timeDecay
        1       -- shingleSize
    )
);

// 통계 기반 이상 탐지 (Z-Score)

-- Z-Score 기반 이상 탐지
-- 평균에서 3 표준편차 이상 벗어난 값을 이상으로 판단

CREATE OR REPLACE STREAM "ZSCORE_ANOMALY_STREAM" (
    "ticker"     VARCHAR(4),
    "event_time" TIMESTAMP,
    "price"      DECIMAL(10,2),
    "avg_price"  DECIMAL(10,4),
    "stddev"     DECIMAL(10,4),
    "z_score"    DECIMAL(10,4),
    "is_anomaly" BOOLEAN
);

CREATE OR REPLACE PUMP "ZSCORE_PUMP" AS
INSERT INTO "ZSCORE_ANOMALY_STREAM"
SELECT STREAM
    "ticker",
    ROWTIME AS "event_time",
    "price",
    AVG("price") OVER W AS "avg_price",
    STDDEV_POP("price") OVER W AS "stddev",
    CASE 
        WHEN STDDEV_POP("price") OVER W > 0 
        THEN ("price" - AVG("price") OVER W) / STDDEV_POP("price") OVER W
        ELSE 0 
    END AS "z_score",
    CASE 
        WHEN ABS(("price" - AVG("price") OVER W) / 
                 NULLIF(STDDEV_POP("price") OVER W, 0)) > 3 
        THEN true 
        ELSE false 
    END AS "is_anomaly"
FROM "SOURCE_SQL_STREAM_001"
WINDOW W AS (
    PARTITION BY "ticker"
    ROWS 100 PRECEDING
);

5.3 Reference Data 조인

스트리밍 데이터를 S3의 참조 데이터와 조인하여 데이터를 보강할 수 있습니다.

-- S3 참조 데이터 테이블 정의
CREATE OR REPLACE TABLE "TICKER_INFO" (
    "ticker"       VARCHAR(4),
    "company_name" VARCHAR(100),
    "sector"       VARCHAR(50),
    "market_cap"   BIGINT
)
WITH (
    's3.bucket' = 'my-reference-data',
    's3.path' = 'ticker_info.csv',
    's3.format' = 'CSV'
);

-- 스트림과 참조 데이터 조인
CREATE OR REPLACE STREAM "ENRICHED_STREAM" (
    "ticker"       VARCHAR(4),
    "company_name" VARCHAR(100),
    "sector"       VARCHAR(50),
    "price"        DECIMAL(10,2),
    "volume"       INTEGER,
    "event_time"   TIMESTAMP
);

CREATE OR REPLACE PUMP "ENRICH_PUMP" AS
INSERT INTO "ENRICHED_STREAM"
SELECT STREAM
    s."ticker",
    t."company_name",
    t."sector",
    s."price",
    s."volume",
    s.ROWTIME AS "event_time"
FROM "SOURCE_SQL_STREAM_001" s
LEFT JOIN "TICKER_INFO" t
ON s."ticker" = t."ticker";

Kinesis Analytics SQL vs Flink 비교

특성Kinesis Analytics SQLManaged Flink
언어SQL onlySQL, Java, Python, Scala
복잡도간단한 집계, 필터링복잡한 CEP, 상태 관리
ML 통합RANDOM_CUT_FOREST 내장외부 모델 호출
상태 크기제한적대용량 상태 지원
권장 사용프로토타이핑, 간단한 분석프로덕션, 복잡한 처리

6. Amazon MSK (Managed Streaming for Kafka)

Amazon MSK는 Apache Kafka를 완전 관리형으로 제공합니다. 대규모 실시간 데이터 스트리밍에 적합하며, Kafka 생태계의 모든 도구와 호환됩니다.

MSK 아키텍처
┌─────────────────────────────────────────────────────────────────────────────┐
│                    Amazon MSK Architecture                                   │
│                                                                              │
│  ┌─────────────────────────────────────────────────────────────────────────┐│
│  │                         VPC                                              ││
│  │                                                                          ││
│  │  ┌─────────────────────────────────────────────────────────────────┐    ││
│  │  │  MSK Cluster                                                    │    ││
│  │  │                                                                 │    ││
│  │  │  AZ-a              AZ-b              AZ-c                      │    ││
│  │  │  ┌──────────┐     ┌──────────┐     ┌──────────┐               │    ││
│  │  │  │ Broker 1 │     │ Broker 2 │     │ Broker 3 │               │    ││
│  │  │  │          │     │          │     │          │               │    ││
│  │  │  │ Topic A  │◄───▶│ Topic A  │◄───▶│ Topic A  │               │    ││
│  │  │  │ (Leader) │     │(Follower)│     │(Follower)│               │    ││
│  │  │  │          │     │          │     │          │               │    ││
│  │  │  │ Topic B  │     │ Topic B  │     │ Topic B  │               │    ││
│  │  │  │(Follower)│     │ (Leader) │     │(Follower)│               │    ││
│  │  │  └──────────┘     └──────────┘     └──────────┘               │    ││
│  │  │       │                │                │                      │    ││
│  │  │       └────────────────┼────────────────┘                      │    ││
│  │  │                        │                                       │    ││
│  │  │                        ▼                                       │    ││
│  │  │  ┌─────────────────────────────────────────────────────────┐  │    ││
│  │  │  │  Apache ZooKeeper (Managed)                             │  │    ││
│  │  │  │  • Cluster coordination                                 │  │    ││
│  │  │  │  • Leader election                                      │  │    ││
│  │  │  │  • Configuration management                             │  │    ││
│  │  │  └─────────────────────────────────────────────────────────┘  │    ││
│  │  └─────────────────────────────────────────────────────────────────┘    ││
│  │                                                                          ││
│  │  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐         ││
│  │  │   Producers     │  │   Consumers     │  │  Kafka Connect  │         ││
│  │  │   (EC2/ECS/     │  │   (Flink/       │  │  (S3/RDS/       │         ││
│  │  │    Lambda)      │  │    Lambda)      │  │   OpenSearch)   │         ││
│  │  └─────────────────┘  └─────────────────┘  └─────────────────┘         ││
│  └─────────────────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────────────────┘

MSK Provisioned

  • • 브로커 인스턴스 직접 선택
  • • 예측 가능한 성능
  • • 대규모 워크로드

MSK Serverless

  • • 자동 용량 조절
  • • 사용량 기반 과금
  • • 운영 부담 최소화

MSK Connect

  • • Kafka Connect 관리형
  • • Source/Sink 커넥터
  • • 자동 스케일링

6.1 MSK 클러스터 구성

Terraform 구성
# MSK Cluster 생성
resource "aws_msk_cluster" "main" {
  cluster_name           = "data-platform-msk"
  kafka_version          = "3.5.1"
  number_of_broker_nodes = 3

  broker_node_group_info {
    instance_type   = "kafka.m5.large"
    client_subnets  = var.private_subnet_ids
    security_groups = [aws_security_group.msk.id]
    
    storage_info {
      ebs_storage_info {
        volume_size = 1000  # GB
        provisioned_throughput {
          enabled           = true
          volume_throughput = 250  # MB/s
        }
      }
    }
  }

  # 암호화 설정
  encryption_info {
    encryption_at_rest_kms_key_arn = aws_kms_key.msk.arn
    encryption_in_transit {
      client_broker = "TLS"
      in_cluster    = true
    }
  }

  # 인증 설정
  client_authentication {
    sasl {
      iam   = true
      scram = true
    }
    tls {
      certificate_authority_arns = [aws_acmpca_certificate_authority.msk.arn]
    }
  }

  # 모니터링 설정
  open_monitoring {
    prometheus {
      jmx_exporter {
        enabled_in_broker = true
      }
      node_exporter {
        enabled_in_broker = true
      }
    }
  }

  logging_info {
    broker_logs {
      cloudwatch_logs {
        enabled   = true
        log_group = aws_cloudwatch_log_group.msk.name
      }
      s3 {
        enabled = true
        bucket  = aws_s3_bucket.msk_logs.id
        prefix  = "logs/msk/"
      }
    }
  }

  configuration_info {
    arn      = aws_msk_configuration.main.arn
    revision = aws_msk_configuration.main.latest_revision
  }

  tags = {
    Environment = "production"
  }
}

# MSK 구성
resource "aws_msk_configuration" "main" {
  name              = "data-platform-msk-config"
  kafka_versions    = ["3.5.1"]
  
  server_properties = <<PROPERTIES
auto.create.topics.enable=false
default.replication.factor=3
min.insync.replicas=2
num.partitions=6
log.retention.hours=168
log.retention.bytes=1073741824
compression.type=lz4
message.max.bytes=10485760
PROPERTIES
}

# MSK Serverless
resource "aws_msk_serverless_cluster" "serverless" {
  cluster_name = "data-platform-msk-serverless"

  vpc_config {
    subnet_ids         = var.private_subnet_ids
    security_group_ids = [aws_security_group.msk.id]
  }

  client_authentication {
    sasl {
      iam {
        enabled = true
      }
    }
  }
}

6.2 Producer/Consumer 코드

Python Kafka 클라이언트

// Producer - IAM 인증

from kafka import KafkaProducer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
import json
import socket

class MSKTokenProvider:
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token('ap-northeast-2')
        return token

def create_producer(bootstrap_servers):
    return KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        security_protocol='SASL_SSL',
        sasl_mechanism='OAUTHBEARER',
        sasl_oauth_token_provider=MSKTokenProvider(),
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        key_serializer=lambda k: k.encode('utf-8') if k else None,
        acks='all',  # 모든 replica에 쓰기 완료 확인
        retries=3,
        compression_type='lz4',
        batch_size=16384,
        linger_ms=10,
        buffer_memory=33554432
    )

def send_events(producer, topic, events):
    for event in events:
        future = producer.send(
            topic,
            key=event.get('key'),
            value=event,
            headers=[('source', b'data-platform')]
        )
        # 동기 전송 (에러 확인)
        try:
            record_metadata = future.get(timeout=10)
            print(f"Sent to {record_metadata.topic}:{record_metadata.partition}:{record_metadata.offset}")
        except Exception as e:
            print(f"Failed to send: {e}")
    
    producer.flush()

# 사용 예시
if __name__ == "__main__":
    bootstrap_servers = "b-1.msk-cluster.xxx.kafka.ap-northeast-2.amazonaws.com:9098"
    producer = create_producer(bootstrap_servers)
    
    events = [
        {"key": "user-123", "event": "page_view", "page": "/products", "timestamp": "2024-01-15T10:30:00Z"},
        {"key": "user-456", "event": "purchase", "amount": 99.99, "timestamp": "2024-01-15T10:31:00Z"}
    ]
    send_events(producer, "user-events", events)

// Consumer - Consumer Group

from kafka import KafkaConsumer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
import json

def create_consumer(bootstrap_servers, group_id, topics):
    return KafkaConsumer(
        *topics,
        bootstrap_servers=bootstrap_servers,
        security_protocol='SASL_SSL',
        sasl_mechanism='OAUTHBEARER',
        sasl_oauth_token_provider=MSKTokenProvider(),
        group_id=group_id,
        auto_offset_reset='earliest',  # 처음부터 읽기
        enable_auto_commit=False,  # 수동 커밋
        value_deserializer=lambda v: json.loads(v.decode('utf-8')),
        max_poll_records=500,
        session_timeout_ms=30000,
        heartbeat_interval_ms=10000
    )

def consume_events(consumer, process_func):
    try:
        while True:
            # 메시지 폴링 (최대 1초 대기)
            records = consumer.poll(timeout_ms=1000, max_records=100)
            
            for topic_partition, messages in records.items():
                for message in messages:
                    try:
                        # 메시지 처리
                        process_func(message.value)
                        print(f"Processed: {message.topic}:{message.partition}:{message.offset}")
                    except Exception as e:
                        print(f"Processing error: {e}")
                        # Dead Letter Queue로 전송 또는 재시도 로직
                        
            # 처리 완료 후 오프셋 커밋
            if records:
                consumer.commit()
                
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()

def process_event(event):
    """이벤트 처리 로직"""
    event_type = event.get('event')
    if event_type == 'purchase':
        # 구매 이벤트 처리
        print(f"Purchase: {event.get('amount')}")
    elif event_type == 'page_view':
        # 페이지뷰 처리
        print(f"Page view: {event.get('page')}")

# 사용 예시
if __name__ == "__main__":
    bootstrap_servers = "b-1.msk-cluster.xxx.kafka.ap-northeast-2.amazonaws.com:9098"
    consumer = create_consumer(
        bootstrap_servers, 
        group_id="analytics-consumer-group",
        topics=["user-events"]
    )
    consume_events(consumer, process_event)

6.3 MSK Connect

MSK Connect는 Kafka Connect를 관리형으로 제공하여 다양한 데이터 소스/싱크와 연동합니다.

// S3 Sink Connector 설정

{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "tasks.max": "4",
  "topics": "user-events,orders",
  
  "s3.bucket.name": "data-lake-raw",
  "s3.region": "ap-northeast-2",
  "s3.part.size": "5242880",
  
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
  "parquet.codec": "snappy",
  
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
  "partition.duration.ms": "3600000",
  "locale": "en-US",
  "timezone": "UTC",
  
  "flush.size": "10000",
  "rotate.interval.ms": "600000",
  
  "behavior.on.null.values": "ignore",
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "dlq-s3-sink",
  "errors.deadletterqueue.topic.replication.factor": "3"
}

Kinesis vs MSK 선택 기준

기준Kinesis Data StreamsAmazon MSK
운영 복잡도낮음 (완전 관리형)중간 (브로커 관리 필요)
데이터 보존최대 365일무제한 (스토리지 기반)
처리량샤드당 1MB/s브로커당 수백 MB/s
생태계AWS 서비스 통합Kafka 생태계 전체
권장 사용AWS 네이티브, 간단한 스트리밍Kafka 마이그레이션, 복잡한 처리

7. 핵심 요약

실시간 처리 핵심 포인트

스트림 처리 개념

  • • Event Time vs Processing Time: 이벤트 발생 시간 vs 처리 시간
  • • Watermark: 지연 데이터 처리를 위한 진행 표시자
  • • Window: Tumbling(고정), Sliding(이동), Session(세션 기반)
  • • Exactly-once: 체크포인트 + 트랜잭션으로 정확히 한 번 처리

Amazon Managed Flink

  • • Apache Flink 완전 관리형 서비스
  • • Flink SQL: 선언적 스트림 처리
  • • PyFlink: Python API 지원
  • • 자동 스케일링, 체크포인트, 스냅샷
  • • Kinesis, MSK, S3 등 다양한 커넥터

실시간 분석 패턴

  • • Lambda Architecture: 배치 + 스피드 레이어 조합
  • • Kappa Architecture: 스트림 처리만으로 통합
  • • CEP (Complex Event Processing): 패턴 매칭
  • • 실시간 집계 → OpenSearch/Timestream

Kinesis Analytics SQL

  • • SQL 기반 간단한 스트림 처리
  • • RANDOM_CUT_FOREST: 내장 이상 탐지
  • • Reference Data 조인 (S3)
  • • 프로토타이핑에 적합

Amazon MSK

  • • Apache Kafka 완전 관리형
  • • MSK Provisioned vs Serverless
  • • MSK Connect: Kafka Connect 관리형
  • • IAM/SASL/TLS 인증 지원

🎯 시험 포인트

Watermark

지연 데이터 허용 범위 설정, Event Time 기반 윈도우에 필수

Exactly-once

Flink 체크포인트 + Kafka 트랜잭션 = 정확히 한 번 처리

Lambda vs Kappa

Lambda = 배치+스트림 분리, Kappa = 스트림 통합

Kinesis vs MSK

Kinesis = AWS 네이티브, MSK = Kafka 생태계

Flink vs Analytics SQL

Flink = 복잡한 처리, SQL = 간단한 집계/프로토타입

이상 탐지

RANDOM_CUT_FOREST (Analytics SQL) 또는 Flink ML

서비스 선택 가이드

요구사항권장 서비스이유
간단한 실시간 집계Kinesis Analytics SQLSQL만으로 빠른 구현
복잡한 CEP/상태 관리Managed Flink대용량 상태, 복잡한 로직
Kafka 마이그레이션Amazon MSKKafka API 호환
AWS 서비스 통합Kinesis Data StreamsLambda, Firehose 연동
실시간 대시보드Flink → OpenSearch실시간 집계 + 시각화
시계열 분석Flink → Timestream시계열 최적화 스토리지

아키텍처 패턴 비교

┌─────────────────────────────────────────────────────────────────────────────┐
│                    Architecture Pattern Comparison                           │
│                                                                              │
│  Lambda Architecture                                                         │
│  ┌─────────────────────────────────────────────────────────────────────────┐│
│  │  Source → Batch Layer (Glue/EMR) → Serving Layer (Redshift)            ││
│  │       └→ Speed Layer (Flink)     → Serving Layer (OpenSearch)          ││
│  │                                                                          ││
│  │  장점: 정확성 + 실시간성                                                 ││
│  │  단점: 두 시스템 유지 관리                                               ││
│  └─────────────────────────────────────────────────────────────────────────┘│
│                                                                              │
│  Kappa Architecture                                                          │
│  ┌─────────────────────────────────────────────────────────────────────────┐│
│  │  Source → Stream Layer (Flink) → Serving Layer                         ││
│  │                                                                          ││
│  │  장점: 단일 코드베이스, 간단한 운영                                      ││
│  │  단점: 대규모 재처리 시 비용                                             ││
│  └─────────────────────────────────────────────────────────────────────────┘│
│                                                                              │
│  Hybrid (권장)                                                               │
│  ┌─────────────────────────────────────────────────────────────────────────┐│
│  │  Source → Kinesis → Flink (실시간) → OpenSearch (대시보드)              ││
│  │                  └→ Firehose → S3 → Glue (배치) → Redshift (분석)       ││
│  │                                                                          ││
│  │  장점: 실시간 + 배치 분리, 비용 최적화                                   ││
│  └─────────────────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────────────────┘

비용 최적화 팁

Kinesis
  • • On-demand 모드: 예측 불가 트래픽
  • • Provisioned: 안정적 트래픽
  • • Enhanced Fan-out: 필요시만 사용
  • • 데이터 보존 기간 최소화
Flink
  • • 자동 스케일링 활성화
  • • 적절한 병렬 처리 수준
  • • 체크포인트 간격 조정
  • • 상태 TTL 설정