"실시간 데이터 처리는 비즈니스가 과거를 분석하는 것에서 현재에 대응하는 것으로 전환하게 한다."
— Stream Processing Principles
┌─────────────────────────────────────────────────────────────────────┐
│ 배치 vs 스트림 처리 │
│ │
│ 배치 처리 (Batch Processing) │
│ ┌─────────────────────────────────────────────────────────────────┐│
│ │ 데이터 축적 ││
│ │ ████████████████████████████████████████ ││
│ │ ─────────────────────────────────────────▶ 시간 ││
│ │ │ ││
│ │ ▼ ││
│ │ ┌─────────────┐ ││
│ │ │ 일괄 처리 │ ││
│ │ │ (1시간 후) │ ││
│ │ └─────────────┘ ││
│ └─────────────────────────────────────────────────────────────────┘│
│ │
│ 스트림 처리 (Stream Processing) │
│ ┌─────────────────────────────────────────────────────────────────┐│
│ │ 연속적 데이터 흐름 ││
│ │ ●──●──●──●──●──●──●──●──●──●──●──●──●──●──●──▶ ││
│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ││
│ │ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ││
│ │ 즉시 처리 (밀리초~초 단위) ││
│ └─────────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────────┘┌─────────────────────────────────────────────────────────────────────┐
│ 윈도우 유형 │
│ │
│ 1. Tumbling Window (고정 윈도우) │
│ ┌─────────┐┌─────────┐┌─────────┐┌─────────┐ │
│ │ Window1 ││ Window2 ││ Window3 ││ Window4 │ 겹치지 않음 │
│ │ 5분 ││ 5분 ││ 5분 ││ 5분 │ │
│ └─────────┘└─────────┘└─────────┘└─────────┘ │
│ ──────────────────────────────────────────────▶ 시간 │
│ │
│ 2. Sliding Window (슬라이딩 윈도우) │
│ ┌─────────────┐ │
│ │ Window1 │ │
│ └─────────────┘ │
│ ┌─────────────┐ │
│ │ Window2 │ 윈도우 크기: 10분 │
│ └─────────────┘ 슬라이드: 5분 │
│ ┌─────────────┐ │
│ │ Window3 │ 겹침 발생 │
│ └─────────────┘ │
│ ──────────────────────────────────────────────▶ 시간 │
│ │
│ 3. Session Window (세션 윈도우) │
│ ┌───────┐ ┌─────────────┐ ┌───┐ │
│ │Session│ │ Session │ │Ses│ │
│ │ 1 │ │ 2 │ │ 3 │ │
│ └───────┘ └─────────────┘ └───┘ │
│ ●●●● gap ●●●●●●●●●● gap ●● │
│ ──────────────────────────────────────────────▶ 시간 │
│ 활동 기반, 비활성 간격으로 구분 │
└─────────────────────────────────────────────────────────────────────┘분당 집계, 시간당 통계
이동 평균, 트렌드 분석
사용자 세션 분석
┌─────────────────────────────────────────────────────────────────────┐
│ Event Time vs Processing Time │
│ │
│ Event Time: 이벤트가 실제로 발생한 시간 │
│ Processing Time: 시스템이 이벤트를 처리하는 시간 │
│ │
│ 이벤트 발생 네트워크 지연 처리 │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 10:00:00│───────▶│ 지연... │─────▶│ 10:00:05│ │
│ │(Event T)│ │ │ │(Proc T) │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ 문제: 늦게 도착한 이벤트 (Late Event) │
│ ┌─────────────────────────────────────────────────────────────────┐│
│ │ Window: 10:00 ~ 10:05 ││
│ │ ││
│ │ 정상 이벤트: 10:02 발생 → 10:02 도착 ✓ ││
│ │ 지연 이벤트: 10:03 발생 → 10:07 도착 ✗ (윈도우 종료 후) ││
│ │ ││
│ │ 해결: Watermark로 허용 지연 시간 설정 ││
│ └─────────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────────┘워터마크는 "이 시간 이전의 모든 이벤트가 도착했다"고 가정하는 임계값입니다. 예: Watermark = 현재 시간 - 5분이면, 5분 이상 지연된 이벤트는 무시하거나 별도 처리합니다.
| 서비스 | 엔진 | 사용 사례 | 관리 수준 |
|---|---|---|---|
| Kinesis Data Analytics | Apache Flink | 복잡한 스트림 처리 | 완전 관리형 |
| MSK + Flink | Kafka + Flink | Kafka 에코시스템 | 관리형 |
| Lambda | 서버리스 | 간단한 변환 | 완전 관리형 |
| Glue Streaming | Spark Streaming | 마이크로배치 | 완전 관리형 |
Amazon Managed Service for Apache Flink(구 Kinesis Data Analytics for Apache Flink)는 완전 관리형 Apache Flink 서비스입니다. 복잡한 스트림 처리 로직을 구현할 수 있습니다.
┌─────────────────────────────────────────────────────────────────────┐
│ Apache Flink Architecture │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐│
│ │ Flink Application ││
│ │ ││
│ │ Source ──▶ Transformation ──▶ Transformation ──▶ Sink ││
│ │ │ │ │ │ ││
│ │ ▼ ▼ ▼ ▼ ││
│ │ Kinesis Filter Window S3 ││
│ │ Stream Map Aggregate Kinesis ││
│ │ Join Redshift ││
│ └─────────────────────────────────────────────────────────────────┘│
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐│
│ │ Flink Runtime ││
│ │ ││
│ │ ┌─────────────┐ ┌─────────────────────────────────────────┐ ││
│ │ │ JobManager │ │ TaskManagers │ ││
│ │ │ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ ││
│ │ │ • 작업 조율 │ │ │ Task │ │ Task │ │ Task │ │ ││
│ │ │ • 체크포인트│ │ │ Slot 1 │ │ Slot 2 │ │ Slot N │ │ ││
│ │ │ • 장애 복구 │ │ └─────────┘ └─────────┘ └─────────┘ │ ││
│ │ └─────────────┘ └─────────────────────────────────────────┘ ││
│ │ ││
│ │ State Backend: RocksDB (대용량) / Heap (소용량) ││
│ │ Checkpoint Storage: S3 ││
│ └─────────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────────┘// Flink SQL - 실시간 집계
-- 소스 테이블 정의 (Kinesis)
CREATE TABLE clickstream (
user_id STRING,
page_url STRING,
event_type STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kinesis',
'stream' = 'clickstream-events',
'aws.region' = 'ap-northeast-2',
'scan.stream.initpos' = 'LATEST',
'format' = 'json'
);
-- 싱크 테이블 정의 (Kinesis)
CREATE TABLE page_views_per_minute (
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
page_url STRING,
view_count BIGINT
) WITH (
'connector' = 'kinesis',
'stream' = 'page-views-aggregated',
'aws.region' = 'ap-northeast-2',
'format' = 'json'
);
-- 실시간 집계 쿼리
INSERT INTO page_views_per_minute
SELECT
TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
TUMBLE_END(event_time, INTERVAL '1' MINUTE) as window_end,
page_url,
COUNT(*) as view_count
FROM clickstream
WHERE event_type = 'page_view'
GROUP BY
TUMBLE(event_time, INTERVAL '1' MINUTE),
page_url;// PyFlink - 이상 탐지 예시
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kinesis import FlinkKinesisConsumer
from pyflink.datastream.window import TumblingEventTimeWindows
from pyflink.common.time import Time
from pyflink.common.watermark_strategy import WatermarkStrategy
env = StreamExecutionEnvironment.get_execution_environment()
# Kinesis 소스
consumer = FlinkKinesisConsumer(
"sensor-data",
SimpleStringSchema(),
Properties({
"aws.region": "ap-northeast-2",
"stream.initial.position": "LATEST"
})
)
# 스트림 처리
stream = env.add_source(consumer) \
.map(lambda x: json.loads(x)) \
.assign_timestamps_and_watermarks(
WatermarkStrategy
.for_bounded_out_of_orderness(Duration.of_seconds(5))
.with_timestamp_assigner(lambda e, _: e['timestamp'])
)
# 이상 탐지: 5분 윈도우에서 평균 대비 3 표준편차 초과
alerts = stream \
.key_by(lambda x: x['sensor_id']) \
.window(TumblingEventTimeWindows.of(Time.minutes(5))) \
.process(AnomalyDetector())
# 알림 전송
alerts.add_sink(KinesisSink("anomaly-alerts"))
env.execute("Anomaly Detection")// Terraform - Managed Flink Application
resource "aws_kinesisanalyticsv2_application" "flink_app" {
name = "realtime-analytics"
runtime_environment = "FLINK-1_18"
service_execution_role = aws_iam_role.flink.arn
application_configuration {
application_code_configuration {
code_content {
s3_content_location {
bucket_arn = aws_s3_bucket.code.arn
file_key = "flink-app.jar"
}
}
code_content_type = "ZIPFILE"
}
flink_application_configuration {
checkpoint_configuration {
configuration_type = "CUSTOM"
checkpointing_enabled = true
checkpoint_interval = 60000 # 1분
min_pause_between_checkpoints = 5000
}
monitoring_configuration {
configuration_type = "CUSTOM"
log_level = "INFO"
metrics_level = "APPLICATION"
}
parallelism_configuration {
configuration_type = "CUSTOM"
parallelism = 4
parallelism_per_kpu = 1
auto_scaling_enabled = true
}
}
environment_properties {
property_group {
property_group_id = "FlinkApplicationProperties"
property_map = {
"input.stream.name" = "clickstream-events"
"output.stream.name" = "aggregated-events"
"aws.region" = "ap-northeast-2"
}
}
}
}
}┌─────────────────────────────────────────────────────────────────────┐
│ 실시간 대시보드 아키텍처 │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────────────┐ │
│ │ 이벤트 │───▶│ Kinesis │───▶│ Flink │───▶│ OpenSearch │ │
│ │ 소스 │ │ Streams │ │ (집계) │ │ (인덱싱) │ │
│ └─────────┘ └─────────┘ └─────────┘ └────────┬────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ OpenSearch │ │
│ │ Dashboards │ │
│ │ (시각화) │ │
│ └─────────────────┘ │
│ │
│ 대안 아키텍처 (QuickSight) │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────────────┐ │
│ │ Kinesis │───▶│Firehose │───▶│ S3 │───▶│ QuickSight │ │
│ │ Streams │ │(Parquet)│ │(Athena) │ │ (SPICE) │ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────────────┘ │
│ │
│ 지연: 1분~5분 (Near Real-time) │
└─────────────────────────────────────────────────────────────────────┘┌─────────────────────────────────────────────────────────────────────┐
│ 실시간 이상 탐지 아키텍처 │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────────────────────────────┐ │
│ │ 센서 │───▶│ Kinesis │───▶│ Flink Application │ │
│ │ 데이터 │ │ Streams │ │ │ │
│ └─────────┘ └─────────┘ │ ┌─────────────────────────────┐│ │
│ │ │ 이상 탐지 로직 ││ │
│ │ │ ││ │
│ │ │ 1. 통계적 방법 ││ │
│ │ │ - Z-Score ││ │
│ │ │ - 이동 평균 편차 ││ │
│ │ │ ││ │
│ │ │ 2. 규칙 기반 ││ │
│ │ │ - 임계값 초과 ││ │
│ │ │ - 패턴 매칭 ││ │
│ │ │ ││ │
│ │ │ 3. ML 기반 ││ │
│ │ │ - SageMaker 연동 ││ │
│ │ └─────────────────────────────┘│ │
│ └──────────────┬──────────────────┘ │
│ │ │
│ ┌────────────────┼────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐│
│ │ SNS │ │ Lambda │ │ S3 ││
│ │ (알림) │ │(자동화) │ │ (로그) ││
│ └─────────┘ └─────────┘ └─────────┘│
└─────────────────────────────────────────────────────────────────────┘// Flink SQL - Z-Score 기반 이상 탐지
-- 5분 윈도우 통계 계산
CREATE VIEW sensor_stats AS
SELECT
sensor_id,
TUMBLE_START(event_time, INTERVAL '5' MINUTE) as window_start,
AVG(value) as avg_value,
STDDEV(value) as stddev_value,
COUNT(*) as sample_count
FROM sensor_readings
GROUP BY
sensor_id,
TUMBLE(event_time, INTERVAL '5' MINUTE);
-- 이상치 탐지 (Z-Score > 3)
INSERT INTO anomaly_alerts
SELECT
r.sensor_id,
r.event_time,
r.value,
s.avg_value,
s.stddev_value,
ABS(r.value - s.avg_value) / NULLIF(s.stddev_value, 0) as z_score
FROM sensor_readings r
JOIN sensor_stats s
ON r.sensor_id = s.sensor_id
WHERE ABS(r.value - s.avg_value) / NULLIF(s.stddev_value, 0) > 3;Lambda Architecture는 배치 레이어와 스피드 레이어를 결합하여 정확성과 실시간성을 모두 달성합니다.
┌─────────────────────────────────────────────────────────────────────┐
│ Lambda Architecture │
│ │
│ ┌─────────────┐ │
│ │ Source │ │
│ │ (Kinesis) │ │
│ └──────┬──────┘ │
│ │ │
│ ┌─────────────────┼─────────────────┐ │
│ ▼ ▼ │
│ ┌─────────────────────────┐ ┌─────────────────────────┐ │
│ │ Batch Layer │ │ Speed Layer │ │
│ │ │ │ │ │
│ │ Firehose → S3 → Glue │ │ Flink (실시간 집계) │ │
│ │ │ │ │ │
│ │ • 전체 데이터 저장 │ │ • 최근 데이터만 처리 │ │
│ │ • 일/시간 단위 배치 │ │ • 초 단위 업데이트 │ │
│ │ • 정확한 결과 │ │ • 근사치 결과 │ │
│ └───────────┬─────────────┘ └───────────┬─────────────┘ │
│ │ │ │
│ │ ┌─────────────────────┐ │ │
│ └───▶│ Serving Layer │◀────┘ │
│ │ │ │
│ │ • Batch View │ │
│ │ • Real-time View │ │
│ │ • Merged View │ │
│ └──────────┬──────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ Query Layer │ │
│ │ (Athena/Redshift) │ │
│ └─────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
쿼리 예시:
-- 배치 뷰 (어제까지의 정확한 데이터)
SELECT * FROM batch_daily_sales WHERE date < CURRENT_DATE
UNION ALL
-- 실시간 뷰 (오늘의 근사치 데이터)
SELECT * FROM realtime_sales WHERE date = CURRENT_DATEKappa Architecture는 스트림 처리만으로 모든 것을 처리합니다. Kafka의 로그 보존과 재처리 기능을 활용하여 배치 레이어를 제거합니다. 단순하지만 대용량 재처리 시 비용이 높을 수 있습니다.
윈도우(Tumbling, Sliding, Session)로 무한 스트림을 유한 단위로 분할. Event Time과 Watermark로 지연 이벤트 처리.
완전 관리형 Apache Flink. SQL 또는 Java/Python API로 복잡한 스트림 처리. Exactly-once 보장, 자동 스케일링, 체크포인트 기반 장애 복구.
실시간 대시보드(OpenSearch/QuickSight), 이상 탐지(Z-Score, 규칙 기반), Lambda Architecture로 정확성과 실시간성 동시 달성.
| 사용 사례 | 권장 서비스 | 지연 시간 |
|---|---|---|
| 복잡한 스트림 처리 | Managed Flink | 밀리초~초 |
| 간단한 변환/필터 | Lambda + Kinesis | 초 |
| S3 적재 (Near RT) | Kinesis Firehose | 60초~ |
| 마이크로배치 | Glue Streaming | 분 |
| Kafka 에코시스템 | MSK + Flink | 밀리초~초 |
클라우드 데이터 웨어하우스의 핵심인 Redshift를 깊이 있게 학습합니다.
Kinesis Data Generator로 클릭스트림 데이터를 생성하고, Managed Flink SQL로 분당 페이지뷰를 집계하세요.
Flink 집계 결과를 OpenSearch로 전송하고, OpenSearch Dashboards에서 실시간 시각화를 구성하세요.
동일한 데이터 소스에 대해 배치 레이어(Firehose → S3 → Athena)와 스피드 레이어(Flink)를 구축하고 결과를 비교하세요.
Kinesis Data Analytics for SQL은 SQL 쿼리를 사용하여 스트리밍 데이터를 실시간으로 분석합니다. Apache Flink보다 간단한 사용 사례에 적합하며, 빠른 프로토타이핑에 유용합니다.
┌─────────────────────────────────────────────────────────────────────────────┐
│ Kinesis Data Analytics for SQL │
│ │
│ ┌─────────────────┐ │
│ │ Input Stream │ │
│ │ (Kinesis/ │ │
│ │ Firehose) │ │
│ └────────┬────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ In-Application Streams ││
│ │ ││
│ │ ┌─────────────────────────────────────────────────────────────────┐ ││
│ │ │ SOURCE_SQL_STREAM_001 │ ││
│ │ │ ├── Column: ticker (VARCHAR) │ ││
│ │ │ ├── Column: price (DECIMAL) │ ││
│ │ │ ├── Column: volume (INTEGER) │ ││
│ │ │ └── Column: event_time (TIMESTAMP) │ ││
│ │ └─────────────────────────────────────────────────────────────────┘ ││
│ │ │ ││
│ │ ▼ SQL Query ││
│ │ ┌─────────────────────────────────────────────────────────────────┐ ││
│ │ │ DESTINATION_SQL_STREAM │ ││
│ │ │ (Aggregated Results) │ ││
│ │ └─────────────────────────────────────────────────────────────────┘ ││
│ └─────────────────────────────────────────────────────────────────────────┘│
│ │ │
│ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Kinesis Stream │ │ Firehose │ │ Lambda │ │
│ │ (Output) │ │ (S3/RS) │ │ (Custom) │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘// Tumbling Window - 1분 단위 집계
-- 입력 스트림 정의
CREATE OR REPLACE STREAM "SOURCE_SQL_STREAM_001" (
"ticker" VARCHAR(4),
"price" DECIMAL(10,2),
"volume" INTEGER,
"event_time" TIMESTAMP
);
-- 1분 단위 VWAP (Volume Weighted Average Price) 계산
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
"ticker" VARCHAR(4),
"window_start" TIMESTAMP,
"window_end" TIMESTAMP,
"vwap" DECIMAL(10,4),
"total_volume" INTEGER,
"trade_count" INTEGER
);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
"ticker",
STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '1' MINUTE) AS "window_start",
STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '1' MINUTE)
+ INTERVAL '1' MINUTE AS "window_end",
SUM("price" * "volume") / SUM("volume") AS "vwap",
SUM("volume") AS "total_volume",
COUNT(*) AS "trade_count"
FROM "SOURCE_SQL_STREAM_001"
GROUP BY
"ticker",
STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '1' MINUTE);// Sliding Window - 5분 이동 평균
-- 5분 이동 평균 (1분마다 업데이트)
CREATE OR REPLACE STREAM "MOVING_AVG_STREAM" (
"ticker" VARCHAR(4),
"event_time" TIMESTAMP,
"current_price" DECIMAL(10,2),
"avg_price_5m" DECIMAL(10,4),
"min_price_5m" DECIMAL(10,2),
"max_price_5m" DECIMAL(10,2)
);
CREATE OR REPLACE PUMP "MOVING_AVG_PUMP" AS
INSERT INTO "MOVING_AVG_STREAM"
SELECT STREAM
"ticker",
ROWTIME AS "event_time",
"price" AS "current_price",
AVG("price") OVER W AS "avg_price_5m",
MIN("price") OVER W AS "min_price_5m",
MAX("price") OVER W AS "max_price_5m"
FROM "SOURCE_SQL_STREAM_001"
WINDOW W AS (
PARTITION BY "ticker"
RANGE INTERVAL '5' MINUTE PRECEDING
);// Stagger Window - 지연 데이터 처리
-- Stagger Window: 늦게 도착하는 데이터 처리
-- 파티션 키가 처음 나타난 시점부터 윈도우 시작
CREATE OR REPLACE STREAM "STAGGER_WINDOW_STREAM" (
"ticker" VARCHAR(4),
"window_start" TIMESTAMP,
"avg_price" DECIMAL(10,4),
"record_count" INTEGER
);
CREATE OR REPLACE PUMP "STAGGER_PUMP" AS
INSERT INTO "STAGGER_WINDOW_STREAM"
SELECT STREAM
"ticker",
FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE) AS "window_start",
AVG("price") AS "avg_price",
COUNT(*) AS "record_count"
FROM "SOURCE_SQL_STREAM_001"
WINDOWED BY STAGGER (
PARTITION BY "ticker", FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE)
RANGE INTERVAL '1' MINUTE
);// RANDOM_CUT_FOREST - ML 기반 이상 탐지
-- Random Cut Forest 알고리즘을 사용한 이상 탐지
-- Kinesis Analytics에 내장된 ML 함수
CREATE OR REPLACE STREAM "ANOMALY_DETECTION_STREAM" (
"ticker" VARCHAR(4),
"event_time" TIMESTAMP,
"price" DECIMAL(10,2),
"volume" INTEGER,
"anomaly_score" DOUBLE,
"is_anomaly" BOOLEAN
);
CREATE OR REPLACE PUMP "ANOMALY_PUMP" AS
INSERT INTO "ANOMALY_DETECTION_STREAM"
SELECT STREAM
"ticker",
ROWTIME AS "event_time",
"price",
"volume",
"ANOMALY_SCORE" AS "anomaly_score",
CASE
WHEN "ANOMALY_SCORE" > 2.0 THEN true
ELSE false
END AS "is_anomaly"
FROM TABLE(
RANDOM_CUT_FOREST(
CURSOR(SELECT STREAM "ticker", "price", "volume"
FROM "SOURCE_SQL_STREAM_001"),
100, -- numberOfTrees
256, -- subSampleSize
100000, -- timeDecay
1 -- shingleSize
)
);// 통계 기반 이상 탐지 (Z-Score)
-- Z-Score 기반 이상 탐지
-- 평균에서 3 표준편차 이상 벗어난 값을 이상으로 판단
CREATE OR REPLACE STREAM "ZSCORE_ANOMALY_STREAM" (
"ticker" VARCHAR(4),
"event_time" TIMESTAMP,
"price" DECIMAL(10,2),
"avg_price" DECIMAL(10,4),
"stddev" DECIMAL(10,4),
"z_score" DECIMAL(10,4),
"is_anomaly" BOOLEAN
);
CREATE OR REPLACE PUMP "ZSCORE_PUMP" AS
INSERT INTO "ZSCORE_ANOMALY_STREAM"
SELECT STREAM
"ticker",
ROWTIME AS "event_time",
"price",
AVG("price") OVER W AS "avg_price",
STDDEV_POP("price") OVER W AS "stddev",
CASE
WHEN STDDEV_POP("price") OVER W > 0
THEN ("price" - AVG("price") OVER W) / STDDEV_POP("price") OVER W
ELSE 0
END AS "z_score",
CASE
WHEN ABS(("price" - AVG("price") OVER W) /
NULLIF(STDDEV_POP("price") OVER W, 0)) > 3
THEN true
ELSE false
END AS "is_anomaly"
FROM "SOURCE_SQL_STREAM_001"
WINDOW W AS (
PARTITION BY "ticker"
ROWS 100 PRECEDING
);스트리밍 데이터를 S3의 참조 데이터와 조인하여 데이터를 보강할 수 있습니다.
-- S3 참조 데이터 테이블 정의
CREATE OR REPLACE TABLE "TICKER_INFO" (
"ticker" VARCHAR(4),
"company_name" VARCHAR(100),
"sector" VARCHAR(50),
"market_cap" BIGINT
)
WITH (
's3.bucket' = 'my-reference-data',
's3.path' = 'ticker_info.csv',
's3.format' = 'CSV'
);
-- 스트림과 참조 데이터 조인
CREATE OR REPLACE STREAM "ENRICHED_STREAM" (
"ticker" VARCHAR(4),
"company_name" VARCHAR(100),
"sector" VARCHAR(50),
"price" DECIMAL(10,2),
"volume" INTEGER,
"event_time" TIMESTAMP
);
CREATE OR REPLACE PUMP "ENRICH_PUMP" AS
INSERT INTO "ENRICHED_STREAM"
SELECT STREAM
s."ticker",
t."company_name",
t."sector",
s."price",
s."volume",
s.ROWTIME AS "event_time"
FROM "SOURCE_SQL_STREAM_001" s
LEFT JOIN "TICKER_INFO" t
ON s."ticker" = t."ticker";| 특성 | Kinesis Analytics SQL | Managed Flink |
|---|---|---|
| 언어 | SQL only | SQL, Java, Python, Scala |
| 복잡도 | 간단한 집계, 필터링 | 복잡한 CEP, 상태 관리 |
| ML 통합 | RANDOM_CUT_FOREST 내장 | 외부 모델 호출 |
| 상태 크기 | 제한적 | 대용량 상태 지원 |
| 권장 사용 | 프로토타이핑, 간단한 분석 | 프로덕션, 복잡한 처리 |
Amazon MSK는 Apache Kafka를 완전 관리형으로 제공합니다. 대규모 실시간 데이터 스트리밍에 적합하며, Kafka 생태계의 모든 도구와 호환됩니다.
┌─────────────────────────────────────────────────────────────────────────────┐
│ Amazon MSK Architecture │
│ │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ VPC ││
│ │ ││
│ │ ┌─────────────────────────────────────────────────────────────────┐ ││
│ │ │ MSK Cluster │ ││
│ │ │ │ ││
│ │ │ AZ-a AZ-b AZ-c │ ││
│ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ ││
│ │ │ │ Broker 1 │ │ Broker 2 │ │ Broker 3 │ │ ││
│ │ │ │ │ │ │ │ │ │ ││
│ │ │ │ Topic A │◄───▶│ Topic A │◄───▶│ Topic A │ │ ││
│ │ │ │ (Leader) │ │(Follower)│ │(Follower)│ │ ││
│ │ │ │ │ │ │ │ │ │ ││
│ │ │ │ Topic B │ │ Topic B │ │ Topic B │ │ ││
│ │ │ │(Follower)│ │ (Leader) │ │(Follower)│ │ ││
│ │ │ └──────────┘ └──────────┘ └──────────┘ │ ││
│ │ │ │ │ │ │ ││
│ │ │ └────────────────┼────────────────┘ │ ││
│ │ │ │ │ ││
│ │ │ ▼ │ ││
│ │ │ ┌─────────────────────────────────────────────────────────┐ │ ││
│ │ │ │ Apache ZooKeeper (Managed) │ │ ││
│ │ │ │ • Cluster coordination │ │ ││
│ │ │ │ • Leader election │ │ ││
│ │ │ │ • Configuration management │ │ ││
│ │ │ └─────────────────────────────────────────────────────────┘ │ ││
│ │ └─────────────────────────────────────────────────────────────────┘ ││
│ │ ││
│ │ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ ││
│ │ │ Producers │ │ Consumers │ │ Kafka Connect │ ││
│ │ │ (EC2/ECS/ │ │ (Flink/ │ │ (S3/RDS/ │ ││
│ │ │ Lambda) │ │ Lambda) │ │ OpenSearch) │ ││
│ │ └─────────────────┘ └─────────────────┘ └─────────────────┘ ││
│ └─────────────────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────────────────┘# MSK Cluster 생성
resource "aws_msk_cluster" "main" {
cluster_name = "data-platform-msk"
kafka_version = "3.5.1"
number_of_broker_nodes = 3
broker_node_group_info {
instance_type = "kafka.m5.large"
client_subnets = var.private_subnet_ids
security_groups = [aws_security_group.msk.id]
storage_info {
ebs_storage_info {
volume_size = 1000 # GB
provisioned_throughput {
enabled = true
volume_throughput = 250 # MB/s
}
}
}
}
# 암호화 설정
encryption_info {
encryption_at_rest_kms_key_arn = aws_kms_key.msk.arn
encryption_in_transit {
client_broker = "TLS"
in_cluster = true
}
}
# 인증 설정
client_authentication {
sasl {
iam = true
scram = true
}
tls {
certificate_authority_arns = [aws_acmpca_certificate_authority.msk.arn]
}
}
# 모니터링 설정
open_monitoring {
prometheus {
jmx_exporter {
enabled_in_broker = true
}
node_exporter {
enabled_in_broker = true
}
}
}
logging_info {
broker_logs {
cloudwatch_logs {
enabled = true
log_group = aws_cloudwatch_log_group.msk.name
}
s3 {
enabled = true
bucket = aws_s3_bucket.msk_logs.id
prefix = "logs/msk/"
}
}
}
configuration_info {
arn = aws_msk_configuration.main.arn
revision = aws_msk_configuration.main.latest_revision
}
tags = {
Environment = "production"
}
}
# MSK 구성
resource "aws_msk_configuration" "main" {
name = "data-platform-msk-config"
kafka_versions = ["3.5.1"]
server_properties = <<PROPERTIES
auto.create.topics.enable=false
default.replication.factor=3
min.insync.replicas=2
num.partitions=6
log.retention.hours=168
log.retention.bytes=1073741824
compression.type=lz4
message.max.bytes=10485760
PROPERTIES
}
# MSK Serverless
resource "aws_msk_serverless_cluster" "serverless" {
cluster_name = "data-platform-msk-serverless"
vpc_config {
subnet_ids = var.private_subnet_ids
security_group_ids = [aws_security_group.msk.id]
}
client_authentication {
sasl {
iam {
enabled = true
}
}
}
}// Producer - IAM 인증
from kafka import KafkaProducer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
import json
import socket
class MSKTokenProvider:
def token(self):
token, _ = MSKAuthTokenProvider.generate_auth_token('ap-northeast-2')
return token
def create_producer(bootstrap_servers):
return KafkaProducer(
bootstrap_servers=bootstrap_servers,
security_protocol='SASL_SSL',
sasl_mechanism='OAUTHBEARER',
sasl_oauth_token_provider=MSKTokenProvider(),
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None,
acks='all', # 모든 replica에 쓰기 완료 확인
retries=3,
compression_type='lz4',
batch_size=16384,
linger_ms=10,
buffer_memory=33554432
)
def send_events(producer, topic, events):
for event in events:
future = producer.send(
topic,
key=event.get('key'),
value=event,
headers=[('source', b'data-platform')]
)
# 동기 전송 (에러 확인)
try:
record_metadata = future.get(timeout=10)
print(f"Sent to {record_metadata.topic}:{record_metadata.partition}:{record_metadata.offset}")
except Exception as e:
print(f"Failed to send: {e}")
producer.flush()
# 사용 예시
if __name__ == "__main__":
bootstrap_servers = "b-1.msk-cluster.xxx.kafka.ap-northeast-2.amazonaws.com:9098"
producer = create_producer(bootstrap_servers)
events = [
{"key": "user-123", "event": "page_view", "page": "/products", "timestamp": "2024-01-15T10:30:00Z"},
{"key": "user-456", "event": "purchase", "amount": 99.99, "timestamp": "2024-01-15T10:31:00Z"}
]
send_events(producer, "user-events", events)// Consumer - Consumer Group
from kafka import KafkaConsumer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
import json
def create_consumer(bootstrap_servers, group_id, topics):
return KafkaConsumer(
*topics,
bootstrap_servers=bootstrap_servers,
security_protocol='SASL_SSL',
sasl_mechanism='OAUTHBEARER',
sasl_oauth_token_provider=MSKTokenProvider(),
group_id=group_id,
auto_offset_reset='earliest', # 처음부터 읽기
enable_auto_commit=False, # 수동 커밋
value_deserializer=lambda v: json.loads(v.decode('utf-8')),
max_poll_records=500,
session_timeout_ms=30000,
heartbeat_interval_ms=10000
)
def consume_events(consumer, process_func):
try:
while True:
# 메시지 폴링 (최대 1초 대기)
records = consumer.poll(timeout_ms=1000, max_records=100)
for topic_partition, messages in records.items():
for message in messages:
try:
# 메시지 처리
process_func(message.value)
print(f"Processed: {message.topic}:{message.partition}:{message.offset}")
except Exception as e:
print(f"Processing error: {e}")
# Dead Letter Queue로 전송 또는 재시도 로직
# 처리 완료 후 오프셋 커밋
if records:
consumer.commit()
except KeyboardInterrupt:
pass
finally:
consumer.close()
def process_event(event):
"""이벤트 처리 로직"""
event_type = event.get('event')
if event_type == 'purchase':
# 구매 이벤트 처리
print(f"Purchase: {event.get('amount')}")
elif event_type == 'page_view':
# 페이지뷰 처리
print(f"Page view: {event.get('page')}")
# 사용 예시
if __name__ == "__main__":
bootstrap_servers = "b-1.msk-cluster.xxx.kafka.ap-northeast-2.amazonaws.com:9098"
consumer = create_consumer(
bootstrap_servers,
group_id="analytics-consumer-group",
topics=["user-events"]
)
consume_events(consumer, process_event)MSK Connect는 Kafka Connect를 관리형으로 제공하여 다양한 데이터 소스/싱크와 연동합니다.
// S3 Sink Connector 설정
{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "4",
"topics": "user-events,orders",
"s3.bucket.name": "data-lake-raw",
"s3.region": "ap-northeast-2",
"s3.part.size": "5242880",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"parquet.codec": "snappy",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"partition.duration.ms": "3600000",
"locale": "en-US",
"timezone": "UTC",
"flush.size": "10000",
"rotate.interval.ms": "600000",
"behavior.on.null.values": "ignore",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-s3-sink",
"errors.deadletterqueue.topic.replication.factor": "3"
}| 기준 | Kinesis Data Streams | Amazon MSK |
|---|---|---|
| 운영 복잡도 | 낮음 (완전 관리형) | 중간 (브로커 관리 필요) |
| 데이터 보존 | 최대 365일 | 무제한 (스토리지 기반) |
| 처리량 | 샤드당 1MB/s | 브로커당 수백 MB/s |
| 생태계 | AWS 서비스 통합 | Kafka 생태계 전체 |
| 권장 사용 | AWS 네이티브, 간단한 스트리밍 | Kafka 마이그레이션, 복잡한 처리 |
Watermark
지연 데이터 허용 범위 설정, Event Time 기반 윈도우에 필수
Exactly-once
Flink 체크포인트 + Kafka 트랜잭션 = 정확히 한 번 처리
Lambda vs Kappa
Lambda = 배치+스트림 분리, Kappa = 스트림 통합
Kinesis vs MSK
Kinesis = AWS 네이티브, MSK = Kafka 생태계
Flink vs Analytics SQL
Flink = 복잡한 처리, SQL = 간단한 집계/프로토타입
이상 탐지
RANDOM_CUT_FOREST (Analytics SQL) 또는 Flink ML
| 요구사항 | 권장 서비스 | 이유 |
|---|---|---|
| 간단한 실시간 집계 | Kinesis Analytics SQL | SQL만으로 빠른 구현 |
| 복잡한 CEP/상태 관리 | Managed Flink | 대용량 상태, 복잡한 로직 |
| Kafka 마이그레이션 | Amazon MSK | Kafka API 호환 |
| AWS 서비스 통합 | Kinesis Data Streams | Lambda, Firehose 연동 |
| 실시간 대시보드 | Flink → OpenSearch | 실시간 집계 + 시각화 |
| 시계열 분석 | Flink → Timestream | 시계열 최적화 스토리지 |
┌─────────────────────────────────────────────────────────────────────────────┐
│ Architecture Pattern Comparison │
│ │
│ Lambda Architecture │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ Source → Batch Layer (Glue/EMR) → Serving Layer (Redshift) ││
│ │ └→ Speed Layer (Flink) → Serving Layer (OpenSearch) ││
│ │ ││
│ │ 장점: 정확성 + 실시간성 ││
│ │ 단점: 두 시스템 유지 관리 ││
│ └─────────────────────────────────────────────────────────────────────────┘│
│ │
│ Kappa Architecture │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ Source → Stream Layer (Flink) → Serving Layer ││
│ │ ││
│ │ 장점: 단일 코드베이스, 간단한 운영 ││
│ │ 단점: 대규모 재처리 시 비용 ││
│ └─────────────────────────────────────────────────────────────────────────┘│
│ │
│ Hybrid (권장) │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ Source → Kinesis → Flink (실시간) → OpenSearch (대시보드) ││
│ │ └→ Firehose → S3 → Glue (배치) → Redshift (분석) ││
│ │ ││
│ │ 장점: 실시간 + 배치 분리, 비용 최적화 ││
│ └─────────────────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────────────────┘