AWS Data PlatformSession 2 of 10

데이터 수집 (Ingestion)

Kinesis, DMS, AppFlow, MSK, IoT Core를 활용한 데이터 수집 전략

약 45분
Intermediate

목차

  1. 데이터 수집의 기본 개념
  2. Amazon Kinesis 심층 분석
  3. AWS DMS - 데이터베이스 마이그레이션 및 CDC
  4. Amazon AppFlow - SaaS 데이터 통합
  5. Amazon MSK - 관리형 Kafka
  6. AWS IoT Core - IoT 데이터 수집
  7. 정리 및 다음 세션 예고

1. 데이터 수집의 기본 개념

"데이터 파이프라인의 첫 번째 단계인 수집(Ingestion)이 실패하면, 그 이후의 모든 분석과 인사이트는 무의미해진다."

— Data Engineering Best Practices

1.1 데이터 수집이란?

데이터 수집(Data Ingestion)은 다양한 소스에서 데이터를 추출하여 데이터 플랫폼으로 가져오는 과정입니다. 이는 데이터 파이프라인의 첫 번째이자 가장 중요한 단계로, 데이터의 품질과 적시성을 결정합니다.

데이터 수집 파이프라인 개요
┌─────────────────────────────────────────────────────────────────────┐
│                         데이터 소스                                   │
├─────────────┬─────────────┬─────────────┬─────────────┬─────────────┤
│  운영 DB    │   로그      │   SaaS      │    IoT      │    API      │
│ (MySQL,     │ (서버,      │ (Salesforce │  (센서,     │ (REST,      │
│  PostgreSQL)│  앱 로그)   │  Zendesk)   │  디바이스)  │  Webhook)   │
└──────┬──────┴──────┬──────┴──────┬──────┴──────┬──────┴──────┬──────┘
       │             │             │             │             │
       ▼             ▼             ▼             ▼             ▼
┌─────────────────────────────────────────────────────────────────────┐
│                      수집 계층 (Ingestion Layer)                     │
│  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐       │
│  │   DMS   │ │ Kinesis │ │ AppFlow │ │   IoT   │ │ Lambda  │       │
│  │  (CDC)  │ │Firehose │ │         │ │  Core   │ │         │       │
│  └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘       │
└───────┼───────────┼───────────┼───────────┼───────────┼─────────────┘
        │           │           │           │           │
        └───────────┴───────────┴─────┬─────┴───────────┘
                                      │
                                      ▼
┌─────────────────────────────────────────────────────────────────────┐
│                    S3 Data Lake (Landing Zone)                       │
│  s3://data-lake/raw/                                                │
│  ├── database/     (DMS에서 수집)                                   │
│  ├── logs/         (Firehose에서 수집)                              │
│  ├── saas/         (AppFlow에서 수집)                               │
│  └── iot/          (IoT Core에서 수집)                              │
└─────────────────────────────────────────────────────────────────────┘

1.2 배치 vs 실시간 수집

배치 수집 (Batch Ingestion)

일정 주기로 데이터를 한꺼번에 수집합니다. 대용량 데이터 처리에 효율적이며, 비용이 저렴합니다.

주기: 시간/일/주 단위
지연: 분~시간 단위
사용 사례: 일일 보고서, 데이터 웨어하우스 적재
AWS 서비스: DMS, Glue, Step Functions

실시간 수집 (Real-time Ingestion)

데이터가 발생하는 즉시 수집합니다. 즉각적인 분석과 대응이 필요한 경우에 사용합니다.

주기: 연속적 (Continuous)
지연: 밀리초~초 단위
사용 사례: 실시간 대시보드, 이상 탐지
AWS 서비스: Kinesis, MSK, IoT Core

1.3 수집 패턴 선택 기준

고려 사항배치 선택실시간 선택
데이터 신선도 요구시간/일 단위 지연 허용초 단위 신선도 필요
데이터 볼륨대용량 (GB~TB/배치)소량 연속 (KB~MB/초)
비용 민감도비용 최적화 우선성능 우선, 비용 수용
소스 시스템DB 풀 덤프, 파일 전송이벤트 스트림, CDC
처리 복잡도복잡한 변환 가능단순 변환 권장

💡 실무 권장: Lambda Architecture

대부분의 실무 환경에서는 배치와 실시간을 함께 사용하는 Lambda Architecture를 채택합니다. 실시간 레이어로 즉각적인 인사이트를 제공하고, 배치 레이어로 정확성을 보완합니다.

1.4 데이터 수집의 핵심 고려사항

신뢰성 (Reliability)

  • • At-least-once 전송 보장
  • • 재시도 및 DLQ 처리
  • • 체크포인트/북마크

확장성 (Scalability)

  • • 수평적 확장 가능
  • • 트래픽 급증 대응
  • • 자동 스케일링

모니터링 (Monitoring)

  • • 수집 지연 시간 추적
  • • 에러율 모니터링
  • • 데이터 품질 검증

스키마 관리

  • • 스키마 진화 대응
  • • 하위 호환성 유지
  • • Schema Registry 활용

보안 (Security)

  • • 전송 중 암호화 (TLS)
  • • 저장 시 암호화 (KMS)
  • • IAM 기반 접근 제어

비용 최적화

  • • 압축 전송
  • • 배치 크기 최적화
  • • 적절한 서비스 선택

2. Amazon Kinesis 심층 분석

Amazon Kinesis는 AWS의 실시간 데이터 스트리밍 플랫폼입니다. Kinesis Data Streams, Kinesis Data Firehose, Kinesis Data Analytics, Kinesis Video Streams 네 가지 서비스로 구성됩니다.

2.1 Kinesis Data Streams (KDS)

Kinesis Data Streams 아키텍처

KDS는 대규모 실시간 데이터 스트리밍을 위한 서비스입니다. 샤드(Shard) 기반으로 확장되며, 여러 컨슈머가 동시에 데이터를 읽을 수 있습니다.

┌─────────────────────────────────────────────────────────────────────┐
│                    Kinesis Data Streams                              │
│  ┌─────────────────────────────────────────────────────────────────┐│
│  │                         Stream                                   ││
│  │  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐               ││
│  │  │ Shard 1 │ │ Shard 2 │ │ Shard 3 │ │ Shard N │               ││
│  │  │ 1MB/s   │ │ 1MB/s   │ │ 1MB/s   │ │ 1MB/s   │  ← 쓰기 용량  ││
│  │  │ 2MB/s   │ │ 2MB/s   │ │ 2MB/s   │ │ 2MB/s   │  ← 읽기 용량  ││
│  │  └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘               ││
│  │       │          │          │          │                        ││
│  │       └──────────┴──────────┴──────────┘                        ││
│  │                         │                                        ││
│  └─────────────────────────┼────────────────────────────────────────┘│
└────────────────────────────┼────────────────────────────────────────┘
                             │
           ┌─────────────────┼─────────────────┐
           ▼                 ▼                 ▼
    ┌─────────────┐   ┌─────────────┐   ┌─────────────┐
    │   Lambda    │   │  Firehose   │   │    KDA      │
    │ (Consumer)  │   │ (Consumer)  │   │ (Consumer)  │
    └─────────────┘   └─────────────┘   └─────────────┘

샤드 용량

  • 쓰기: 1MB/초 또는 1,000 레코드/초
  • 읽기: 2MB/초 (샤드당)
  • 레코드 크기: 최대 1MB
  • 보존 기간: 24시간 ~ 365일

용량 모드

  • Provisioned: 샤드 수 직접 지정
  • On-demand: 자동 스케일링 (권장)
  • • On-demand는 최대 200MB/초까지 자동 확장
KDS Producer 구현

// Python - Kinesis Producer 예시

import boto3
import json
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='ap-northeast-2')

def send_event(stream_name: str, event: dict):
    """단일 이벤트 전송"""
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event).encode('utf-8'),
        PartitionKey=event['user_id']  # 파티션 키로 샤드 결정
    )
    return response['SequenceNumber']

def send_batch(stream_name: str, events: list):
    """배치 이벤트 전송 (최대 500개, 5MB)"""
    records = [
        {
            'Data': json.dumps(event).encode('utf-8'),
            'PartitionKey': event['user_id']
        }
        for event in events
    ]
    
    response = kinesis.put_records(
        StreamName=stream_name,
        Records=records
    )
    
    # 실패한 레코드 재시도
    if response['FailedRecordCount'] > 0:
        failed = [
            records[i] for i, r in enumerate(response['Records'])
            if 'ErrorCode' in r
        ]
        # 재시도 로직...
    
    return response

# 사용 예시
event = {
    'user_id': 'user-123',
    'event_type': 'page_view',
    'page': '/products/123',
    'timestamp': datetime.utcnow().isoformat()
}
send_event('clickstream-events', event)

⚠️ 파티션 키 설계 주의사항

  • • 파티션 키가 균등하게 분포되어야 샤드 간 부하가 균형
  • • 특정 키에 트래픽 집중 시 "Hot Shard" 문제 발생
  • • 권장: user_id, device_id 등 카디널리티 높은 값 사용
  • • 비권장: 날짜, 지역 등 카디널리티 낮은 값

2.2 Kinesis Data Firehose (KDF)

Kinesis Data Firehose - 완전 관리형 전송

Firehose는 스트리밍 데이터를 S3, Redshift, OpenSearch, HTTP 엔드포인트로 자동 전송하는 완전 관리형 서비스입니다. 인프라 관리 없이 사용할 수 있습니다.

┌─────────────────────────────────────────────────────────────────────┐
│                    Kinesis Data Firehose                             │
│                                                                      │
│  ┌─────────┐    ┌─────────────┐    ┌─────────────┐    ┌──────────┐ │
│  │ Source  │───▶│   Buffer    │───▶│  Transform  │───▶│  Target  │ │
│  │         │    │ (크기/시간) │    │  (Lambda)   │    │          │ │
│  └─────────┘    └─────────────┘    └─────────────┘    └──────────┘ │
│                                                                      │
│  Sources:                          Targets:                          │
│  • Direct PUT                      • S3                              │
│  • Kinesis Data Streams            • Redshift (via S3)               │
│  • MSK                             • OpenSearch                      │
│  • CloudWatch Logs                 • Splunk                          │
│                                    • HTTP Endpoint                   │
└─────────────────────────────────────────────────────────────────────┘

버퍼 설정

  • 크기: 1MB ~ 128MB
  • 시간: 60초 ~ 900초
  • • 둘 중 먼저 도달하는 조건에서 전송
  • • S3 파일 크기 = 버퍼 크기

데이터 변환

  • • Lambda 함수로 변환 가능
  • • JSON → Parquet 변환 내장
  • • 압축: GZIP, Snappy, ZIP
  • • 동적 파티셔닝 지원

// Terraform - Firehose 설정 예시

resource "aws_kinesis_firehose_delivery_stream" "clickstream" {
  name        = "clickstream-to-s3"
  destination = "extended_s3"

  extended_s3_configuration {
    role_arn   = aws_iam_role.firehose.arn
    bucket_arn = aws_s3_bucket.data_lake.arn
    
    # 버퍼 설정
    buffering_size     = 64  # MB
    buffering_interval = 60  # seconds
    
    # Parquet 변환
    data_format_conversion_configuration {
      input_format_configuration {
        deserializer {
          open_x_json_ser_de {}
        }
      }
      output_format_configuration {
        serializer {
          parquet_ser_de {
            compression = "SNAPPY"
          }
        }
      }
      schema_configuration {
        database_name = aws_glue_catalog_database.analytics.name
        table_name    = aws_glue_catalog_table.clickstream.name
        role_arn      = aws_iam_role.firehose.arn
      }
    }
    
    # 동적 파티셔닝
    dynamic_partitioning_configuration {
      enabled = true
    }
    prefix = "raw/clickstream/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
    error_output_prefix = "errors/clickstream/"
  }
}

2.3 KDS vs KDF 선택 가이드

특성Kinesis Data StreamsKinesis Data Firehose
관리 수준샤드 관리 필요완전 관리형
지연 시간~200ms (실시간)60초~15분 (Near real-time)
다중 컨슈머단일 대상
데이터 재처리 (보존 기간 내)
변환별도 처리 필요Lambda, Parquet 변환 내장
비용샤드-시간 + PUT수집 데이터량
적합한 사용 사례실시간 분석, 복잡한 처리단순 S3 적재, 로그 수집

💡 실무 권장 패턴

KDS → KDF 조합: KDS로 실시간 처리(Lambda, KDA)를 수행하면서, 동시에 KDF를 컨슈머로 연결하여 S3에 장기 보관합니다. 실시간 분석과 배치 분석을 모두 지원하는 Lambda Architecture 구현에 적합합니다.

3. AWS DMS - 데이터베이스 마이그레이션 및 CDC

AWS Database Migration Service(DMS)는 데이터베이스를 AWS로 마이그레이션하거나, 운영 DB의 변경 사항을 실시간으로 데이터 레이크에 복제하는 CDC(Change Data Capture) 기능을 제공합니다.

3.1 DMS 아키텍처

DMS 구성 요소
┌─────────────────────────────────────────────────────────────────────┐
│                    AWS DMS Architecture                              │
│                                                                      │
│  ┌──────────────┐                              ┌──────────────┐     │
│  │   Source     │                              │    Target    │     │
│  │  Endpoint    │                              │   Endpoint   │     │
│  │              │                              │              │     │
│  │ • MySQL      │    ┌──────────────────┐     │ • S3         │     │
│  │ • PostgreSQL │───▶│  Replication     │────▶│ • Redshift   │     │
│  │ • Oracle     │    │    Instance      │     │ • Aurora     │     │
│  │ • SQL Server │    │                  │     │ • Kinesis    │     │
│  │ • MongoDB    │    │ ┌──────────────┐ │     │ • DynamoDB   │     │
│  └──────────────┘    │ │ Replication  │ │     └──────────────┘     │
│                      │ │    Task      │ │                          │
│                      │ └──────────────┘ │                          │
│                      └──────────────────┘                          │
│                                                                      │
│  마이그레이션 유형:                                                  │
│  1. Full Load: 전체 데이터 일괄 복사                                │
│  2. CDC Only: 변경 사항만 지속적 복제                               │
│  3. Full Load + CDC: 초기 로드 후 CDC 지속                          │
└─────────────────────────────────────────────────────────────────────┘

Source Endpoint

원본 데이터베이스 연결 정보. 읽기 권한과 CDC를 위한 로그 접근 권한 필요.

Replication Instance

데이터 복제를 수행하는 EC2 인스턴스. 크기에 따라 처리량 결정.

Target Endpoint

대상 저장소 연결 정보. S3, Redshift, Kinesis 등 다양한 대상 지원.

3.2 CDC (Change Data Capture) 이해

CDC 동작 원리

CDC는 데이터베이스의 트랜잭션 로그(binlog, WAL 등)를 읽어 변경 사항을 실시간으로 캡처합니다. 전체 테이블을 스캔하지 않아 소스 DB 부하가 최소화됩니다.

┌─────────────────────────────────────────────────────────────────────┐
│                    CDC 데이터 흐름                                   │
│                                                                      │
│  Source DB                                                           │
│  ┌─────────────────────────────────────────────────────────────────┐│
│  │  Transaction Log (binlog/WAL)                                   ││
│  │  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐               ││
│  │  │ INSERT  │ │ UPDATE  │ │ DELETE  │ │ INSERT  │  ...          ││
│  │  │ user_1  │ │ user_2  │ │ user_3  │ │ order_1 │               ││
│  │  └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘               ││
│  └───────┼──────────┼──────────┼──────────┼────────────────────────┘│
│          │          │          │          │                          │
│          └──────────┴──────────┴──────────┘                          │
│                         │                                            │
│                         ▼                                            │
│  ┌─────────────────────────────────────────────────────────────────┐│
│  │                    DMS CDC Reader                                ││
│  │  • Log Position 추적 (Checkpoint)                               ││
│  │  • 변경 이벤트 파싱                                              ││
│  │  • 배치 처리 및 전송                                             ││
│  └─────────────────────────────────────────────────────────────────┘│
│                         │                                            │
│                         ▼                                            │
│  Target (S3)                                                         │
│  s3://data-lake/cdc/orders/                                         │
│  ├── LOAD00000001.parquet  (Full Load)                              │
│  ├── 2024/01/15/cdc-00001.parquet  (CDC)                            │
│  └── 2024/01/15/cdc-00002.parquet  (CDC)                            │
└─────────────────────────────────────────────────────────────────────┘

CDC 레코드 구조

{
  "Op": "U",           // I=Insert, U=Update, D=Delete
  "before": {          // UPDATE/DELETE 시 이전 값
    "id": 123,
    "name": "Old Name",
    "updated_at": "2024-01-15T10:00:00Z"
  },
  "after": {           // INSERT/UPDATE 시 새 값
    "id": 123,
    "name": "New Name",
    "updated_at": "2024-01-15T10:30:00Z"
  },
  "source": {
    "schema": "public",
    "table": "users",
    "txId": 12345,
    "lsn": "0/1234567"
  },
  "ts_ms": 1705312200000
}

3.3 DMS to S3 설정

// Terraform - DMS to S3 CDC 설정

# Replication Instance
resource "aws_dms_replication_instance" "main" {
  replication_instance_id     = "dms-cdc-instance"
  replication_instance_class  = "dms.r5.large"
  allocated_storage          = 100
  vpc_security_group_ids     = [aws_security_group.dms.id]
  replication_subnet_group_id = aws_dms_replication_subnet_group.main.id
  
  multi_az = true  # 프로덕션 권장
}

# Source Endpoint (MySQL)
resource "aws_dms_endpoint" "source" {
  endpoint_id   = "mysql-source"
  endpoint_type = "source"
  engine_name   = "mysql"
  
  server_name   = var.mysql_host
  port          = 3306
  database_name = "production"
  username      = var.mysql_user
  password      = var.mysql_password
  
  # CDC 설정
  extra_connection_attributes = "parallelLoadThreads=1"
}

# Target Endpoint (S3)
resource "aws_dms_endpoint" "target" {
  endpoint_id   = "s3-target"
  endpoint_type = "target"
  engine_name   = "s3"
  
  s3_settings {
    bucket_name             = aws_s3_bucket.data_lake.id
    bucket_folder           = "cdc"
    service_access_role_arn = aws_iam_role.dms_s3.arn
    
    # Parquet 형식으로 저장
    data_format                = "parquet"
    parquet_version           = "parquet-2-0"
    parquet_timestamp_in_millisecond = true
    
    # 파티셔닝
    timestamp_column_name     = "updated_at"
    date_partition_enabled    = true
    date_partition_sequence   = "YYYYMMDD"
    
    # CDC 설정
    cdc_path                  = "cdc"
    cdc_max_batch_interval    = 60  # seconds
    cdc_min_file_size         = 32000  # KB
    
    # 압축
    compression_type          = "GZIP"
  }
}

# Replication Task
resource "aws_dms_replication_task" "cdc" {
  replication_task_id      = "mysql-to-s3-cdc"
  migration_type           = "full-load-and-cdc"
  replication_instance_arn = aws_dms_replication_instance.main.replication_instance_arn
  source_endpoint_arn      = aws_dms_endpoint.source.endpoint_arn
  target_endpoint_arn      = aws_dms_endpoint.target.endpoint_arn
  
  table_mappings = jsonencode({
    rules = [
      {
        rule-type = "selection"
        rule-id   = "1"
        rule-name = "include-all-tables"
        object-locator = {
          schema-name = "production"
          table-name  = "%"
        }
        rule-action = "include"
      }
    ]
  })
  
  replication_task_settings = jsonencode({
    TargetMetadata = {
      BatchApplyEnabled = true
    }
    FullLoadSettings = {
      TargetTablePrepMode = "DROP_AND_CREATE"
    }
    Logging = {
      EnableLogging = true
    }
  })
}

3.4 CDC 데이터 처리 패턴

S3에 적재된 CDC 데이터는 최신 상태를 반영하기 위해 추가 처리가 필요합니다.

┌─────────────────────────────────────────────────────────────────────┐
│                    CDC 데이터 처리 파이프라인                         │
│                                                                      │
│  S3 Raw (CDC Files)                                                  │
│  ├── LOAD00000001.parquet  ─┐                                       │
│  ├── cdc-00001.parquet      │                                       │
│  └── cdc-00002.parquet      │                                       │
│                             │                                        │
│                             ▼                                        │
│  ┌─────────────────────────────────────────────────────────────────┐│
│  │                    Glue Job (Merge)                              ││
│  │                                                                  ││
│  │  1. Full Load 데이터 읽기                                        ││
│  │  2. CDC 파일들 시간순 정렬                                       ││
│  │  3. MERGE 연산 (UPSERT/DELETE 적용)                              ││
│  │  4. 최신 상태 스냅샷 생성                                        ││
│  └─────────────────────────────────────────────────────────────────┘│
│                             │                                        │
│                             ▼                                        │
│  S3 Curated (Current State)                                         │
│  └── users/                                                          │
│      └── snapshot.parquet  ← 최신 상태                              │
└─────────────────────────────────────────────────────────────────────┘

// PySpark - CDC Merge 처리

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("CDC-Merge").getOrCreate()

# 1. Full Load 데이터 읽기
full_load_df = spark.read.parquet("s3://data-lake/cdc/users/LOAD*.parquet")

# 2. CDC 파일 읽기
cdc_df = spark.read.parquet("s3://data-lake/cdc/users/cdc-*.parquet")

# 3. CDC에서 최신 레코드만 추출 (같은 PK에 대해)
window = Window.partitionBy("id").orderBy(col("ts_ms").desc())
latest_cdc = cdc_df.withColumn("rn", row_number().over(window)) \
                   .filter(col("rn") == 1) \
                   .drop("rn")

# 4. DELETE 레코드 분리
deletes = latest_cdc.filter(col("Op") == "D").select("id")
upserts = latest_cdc.filter(col("Op") != "D")

# 5. Merge 수행
# Full Load에서 CDC에 있는 레코드 제외
merged = full_load_df.join(
    latest_cdc.select("id"), 
    on="id", 
    how="left_anti"
)

# CDC의 INSERT/UPDATE 추가
merged = merged.union(
    upserts.select(full_load_df.columns)
)

# DELETE 제외
final_df = merged.join(deletes, on="id", how="left_anti")

# 6. 결과 저장
final_df.write.mode("overwrite").parquet(
    "s3://data-lake/curated/users/"
)

3.5 DMS 모니터링 및 트러블슈팅

주요 CloudWatch 메트릭

  • CDCLatencySource: 소스 지연 시간
  • CDCLatencyTarget: 타겟 지연 시간
  • CDCIncomingChanges: 대기 중인 변경
  • FullLoadThroughputRows: 로드 처리량

일반적인 문제

  • LOB 컬럼: Limited LOB mode 사용 권장
  • binlog 만료: 보존 기간 충분히 설정
  • 네트워크: VPC 피어링/엔드포인트 확인

4. Amazon AppFlow - SaaS 데이터 통합

Amazon AppFlow는 SaaS 애플리케이션과 AWS 서비스 간의 데이터 전송을 코드 없이 설정할 수 있는 완전 관리형 통합 서비스입니다.

4.1 AppFlow 개요

AppFlow 아키텍처
┌─────────────────────────────────────────────────────────────────────┐
│                      Amazon AppFlow                                  │
│                                                                      │
│  SaaS Sources                          AWS Destinations              │
│  ┌─────────────┐                       ┌─────────────┐              │
│  │ Salesforce  │──┐                 ┌──│     S3      │              │
│  ├─────────────┤  │                 │  ├─────────────┤              │
│  │   Zendesk   │──┤                 │  │  Redshift   │              │
│  ├─────────────┤  │  ┌───────────┐  │  ├─────────────┤              │
│  │    Slack    │──┼─▶│   Flow    │──┼─▶│ EventBridge │              │
│  ├─────────────┤  │  │           │  │  ├─────────────┤              │
│  │  ServiceNow │──┤  │ Transform │  │  │  Snowflake  │              │
│  ├─────────────┤  │  │  Filter   │  │  ├─────────────┤              │
│  │   HubSpot   │──┤  │  Mapping  │  │  │   Upsolver  │              │
│  ├─────────────┤  │  └───────────┘  │  └─────────────┘              │
│  │   Google    │──┘                 │                                │
│  │  Analytics  │                    │  Bi-directional:               │
│  └─────────────┘                    │  • Salesforce                  │
│                                     │  • Zendesk                     │
│  50+ Connectors                     │  • ServiceNow                  │
└─────────────────────────────────────────────────────────────────────┘

트리거 유형

  • On-demand: 수동 실행
  • Scheduled: 분/시간/일 단위
  • Event: 소스 이벤트 기반

데이터 변환

  • • 필드 매핑 및 이름 변경
  • • 필터링 및 검증
  • • 마스킹 및 잘라내기

보안

  • • 전송 중 암호화
  • • KMS 암호화
  • • PrivateLink 지원

4.2 주요 커넥터

카테고리커넥터주요 사용 사례
CRMSalesforce, HubSpot, Zoho고객 데이터, 영업 파이프라인 분석
마케팅Google Analytics, Marketo, Mailchimp마케팅 성과 분석, 캠페인 ROI
고객 지원Zendesk, ServiceNow, Freshdesk티켓 분석, 고객 만족도
협업Slack, Asana, Jira팀 생산성, 프로젝트 추적
재무SAP, NetSuite, Stripe재무 보고, 결제 분석

4.3 AppFlow 설정 예시

Salesforce → S3 Flow 구성

// Terraform - AppFlow 설정

# Salesforce Connection
resource "aws_appflow_connector_profile" "salesforce" {
  connector_profile_name = "salesforce-production"
  connector_type         = "Salesforce"
  connection_mode        = "Public"

  connector_profile_config {
    connector_profile_credentials {
      salesforce {
        access_token  = var.sf_access_token
        refresh_token = var.sf_refresh_token
      }
    }
    connector_profile_properties {
      salesforce {
        instance_url         = "https://mycompany.salesforce.com"
        is_sandbox_environment = false
      }
    }
  }
}

# AppFlow Flow
resource "aws_appflow_flow" "salesforce_to_s3" {
  name = "salesforce-opportunities-to-s3"

  # 트리거 설정 (매일 자정)
  trigger_config {
    trigger_type = "Scheduled"
    trigger_properties {
      scheduled {
        schedule_expression = "rate(1 day)"
        data_pull_mode      = "Incremental"
        schedule_start_time = "2024-01-01T00:00:00Z"
      }
    }
  }

  # 소스 설정
  source_flow_config {
    connector_type         = "Salesforce"
    connector_profile_name = aws_appflow_connector_profile.salesforce.name
    source_connector_properties {
      salesforce {
        object                      = "Opportunity"
        enable_dynamic_field_update = true
        include_deleted_records     = false
      }
    }
    incremental_pull_config {
      datetime_type_field_name = "LastModifiedDate"
    }
  }

  # 대상 설정
  destination_flow_config {
    connector_type = "S3"
    destination_connector_properties {
      s3 {
        bucket_name = aws_s3_bucket.data_lake.id
        bucket_prefix = "raw/salesforce/opportunities"
        
        s3_output_format_config {
          file_type = "PARQUET"
          prefix_config {
            prefix_type   = "PATH_AND_FILENAME"
            prefix_format = "DAY"
          }
          aggregation_config {
            aggregation_type = "SingleFile"
          }
        }
      }
    }
  }

  # 필드 매핑
  task {
    task_type     = "Map_all"
    source_fields = []
    task_properties = {
      "EXCLUDE_SOURCE_FIELDS_LIST" = "[]"
    }
    connector_operator {
      salesforce = "NO_OP"
    }
  }

  # 필터링 (금액 > 10000)
  task {
    task_type     = "Filter"
    source_fields = ["Amount"]
    task_properties = {
      "DATA_TYPE"    = "double"
      "LOWER_BOUND"  = "10000"
      "FILTER_CONDITION" = "GREATER_THAN"
    }
    connector_operator {
      salesforce = "GREATER_THAN"
    }
  }

  # PII 마스킹
  task {
    task_type     = "Mask"
    source_fields = ["Email"]
    destination_field = "Email"
    task_properties = {
      "MASK_VALUE" = "***"
      "MASK_LENGTH" = "3"
    }
    connector_operator {
      salesforce = "MASK_ALL"
    }
  }
}

4.4 AppFlow vs 대안 비교

특성AppFlowFivetran/AirbyteCustom (Lambda)
관리완전 관리형완전 관리형직접 관리
커넥터 수50+300+무제한
비용Flow 실행당MAR 기반실행 시간
커스터마이징제한적중간완전 자유
AWS 통합 네이티브연동 필요 네이티브

💡 선택 가이드

  • AppFlow: AWS 네이티브 통합, 간단한 SaaS 연동, 비용 최적화
  • Fivetran/Airbyte: 다양한 소스, 복잡한 스키마 변경 추적
  • Custom: 특수한 API, 복잡한 비즈니스 로직

5. Amazon MSK - 관리형 Kafka

Amazon Managed Streaming for Apache Kafka(MSK)는 완전 관리형 Apache Kafka 서비스입니다. Kafka의 강력한 기능을 인프라 관리 부담 없이 사용할 수 있습니다.

5.1 MSK vs Kinesis 비교

특성Amazon MSKKinesis Data Streams
프로토콜Apache Kafka (오픈소스)AWS 독점 API
관리 수준브로커 관리 필요완전 서버리스
확장성브로커/파티션 수동 추가On-demand 자동 확장
보존 기간무제한 (스토리지 기반)최대 365일
메시지 크기기본 1MB, 최대 10MB최대 1MB
Consumer Group네이티브 지원Enhanced Fan-out
에코시스템Kafka Connect, Schema RegistryAWS 서비스 통합
비용 모델브로커 시간 + 스토리지샤드 시간 + PUT

MSK 선택

  • • 기존 Kafka 워크로드 마이그레이션
  • • Kafka Connect 에코시스템 활용
  • • 멀티 클라우드/하이브리드 환경
  • • 장기 데이터 보존 필요

Kinesis 선택

  • • AWS 네이티브 통합 우선
  • • 서버리스 운영 선호
  • • 트래픽 변동이 큰 워크로드
  • • 빠른 시작, 간단한 설정

5.2 MSK 아키텍처

MSK 클러스터 구성
┌─────────────────────────────────────────────────────────────────────┐
│                        Amazon MSK Cluster                            │
│                                                                      │
│  ┌─────────────────────────────────────────────────────────────────┐│
│  │                      VPC (Private Subnets)                       ││
│  │                                                                  ││
│  │  AZ-a              AZ-b              AZ-c                        ││
│  │  ┌─────────┐      ┌─────────┐      ┌─────────┐                  ││
│  │  │Broker 1 │      │Broker 2 │      │Broker 3 │                  ││
│  │  │         │      │         │      │         │                  ││
│  │  │ Topic A │◀────▶│ Topic A │◀────▶│ Topic A │  Replication    ││
│  │  │ Part 0  │      │ Part 1  │      │ Part 2  │                  ││
│  │  │ (Leader)│      │(Replica)│      │(Replica)│                  ││
│  │  └────┬────┘      └────┬────┘      └────┬────┘                  ││
│  │       │               │               │                          ││
│  │       └───────────────┼───────────────┘                          ││
│  │                       │                                          ││
│  │  ┌────────────────────┴────────────────────┐                    ││
│  │  │              ZooKeeper / KRaft           │                    ││
│  │  │         (Cluster Coordination)           │                    ││
│  │  └─────────────────────────────────────────┘                    ││
│  └─────────────────────────────────────────────────────────────────┘│
│                                                                      │
│  Storage Options:                                                    │
│  • EBS: 범용 SSD (gp3), 프로비저닝 IOPS (io1)                       │
│  • Tiered Storage: S3로 콜드 데이터 자동 이동                       │
└─────────────────────────────────────────────────────────────────────┘

MSK Provisioned

  • • 브로커 인스턴스 타입 선택
  • • kafka.m5.large ~ kafka.m5.24xlarge
  • • 예측 가능한 워크로드에 적합
  • • 세밀한 성능 튜닝 가능

MSK Serverless

  • • 용량 자동 관리
  • • 사용량 기반 과금
  • • 변동성 높은 워크로드에 적합
  • • 빠른 시작, 간편한 운영

5.3 MSK Connect

MSK Connect - 관리형 Kafka Connect

MSK Connect는 Kafka Connect를 완전 관리형으로 제공합니다. 다양한 소스/싱크 커넥터를 사용하여 데이터 파이프라인을 구축할 수 있습니다.

┌─────────────────────────────────────────────────────────────────────┐
│                      MSK Connect Architecture                        │
│                                                                      │
│  Source Connectors              Sink Connectors                      │
│  ┌─────────────┐               ┌─────────────┐                      │
│  │ Debezium    │──┐         ┌──│ S3 Sink     │                      │
│  │ (CDC)       │  │         │  └─────────────┘                      │
│  ├─────────────┤  │         │  ┌─────────────┐                      │
│  │ JDBC Source │  │  MSK    │  │ OpenSearch  │                      │
│  │             │──┼─────────┼──│ Sink        │                      │
│  ├─────────────┤  │ Cluster │  └─────────────┘                      │
│  │ MongoDB     │  │         │  ┌─────────────┐                      │
│  │ Source      │──┘         └──│ Snowflake   │                      │
│  └─────────────┘               │ Sink        │                      │
│                                └─────────────┘                      │
│                                                                      │
│  Popular Connectors:                                                 │
│  • Debezium: MySQL, PostgreSQL, MongoDB CDC                         │
│  • Confluent S3 Sink: Parquet, Avro, JSON to S3                     │
│  • JDBC: 범용 데이터베이스 연결                                      │
│  • OpenSearch Sink: 검색/분석 인덱싱                                │
└─────────────────────────────────────────────────────────────────────┘

// MSK Connect - S3 Sink Connector 설정

{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "tasks.max": "4",
  "topics": "clickstream-events",
  
  "s3.region": "ap-northeast-2",
  "s3.bucket.name": "data-lake-bucket",
  "s3.part.size": "5242880",
  
  "flush.size": "10000",
  "rotate.interval.ms": "60000",
  
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
  "parquet.codec": "snappy",
  
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
  "partition.duration.ms": "3600000",
  "locale": "en-US",
  "timezone": "UTC",
  
  "topics.dir": "raw/kafka",
  
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}

5.4 MSK 모니터링

브로커 메트릭

  • CPUUser: CPU 사용률
  • KafkaDataLogsDiskUsed: 디스크 사용량
  • MemoryUsed: 메모리 사용량
  • NetworkRxDropped: 네트워크 드롭

토픽/파티션 메트릭

  • MessagesInPerSec: 초당 메시지 수
  • BytesInPerSec: 초당 바이트
  • UnderReplicatedPartitions: 복제 지연
  • OfflinePartitionsCount: 오프라인 파티션

⚠️ 주요 알람 설정 권장

  • 디스크 사용률 > 80%: 스토리지 확장 필요
  • UnderReplicatedPartitions > 0: 복제 문제 확인
  • Consumer Lag 급증: 컨슈머 처리 지연
  • ActiveControllerCount ≠ 1: 컨트롤러 문제

6. AWS IoT Core - IoT 데이터 수집

AWS IoT Core는 수십억 개의 IoT 디바이스를 연결하고 데이터를 수집하는 완전 관리형 서비스입니다. MQTT, HTTPS, WebSocket 프로토콜을 지원합니다.

6.1 IoT Core 아키텍처

IoT 데이터 수집 파이프라인
┌─────────────────────────────────────────────────────────────────────┐
│                    AWS IoT Core Architecture                         │
│                                                                      │
│  IoT Devices                                                         │
│  ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐                                   │
│  │Sensor│ │Sensor│ │Sensor│ │Sensor│  ... (수백만 디바이스)          │
│  └──┬──┘ └──┬──┘ └──┬──┘ └──┬──┘                                   │
│     │       │       │       │                                        │
│     └───────┴───────┴───────┘                                        │
│                 │                                                    │
│                 ▼  MQTT / HTTPS                                      │
│  ┌─────────────────────────────────────────────────────────────────┐│
│  │                    IoT Core                                      ││
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐             ││
│  │  │   Device    │  │   Message   │  │    Rules    │             ││
│  │  │   Gateway   │─▶│   Broker    │─▶│   Engine    │             ││
│  │  │             │  │   (MQTT)    │  │             │             ││
│  │  └─────────────┘  └─────────────┘  └──────┬──────┘             ││
│  └───────────────────────────────────────────┼──────────────────────┘│
│                                              │                       │
│              ┌───────────────────────────────┼───────────────────┐   │
│              │                               │                   │   │
│              ▼                               ▼                   ▼   │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐     │
│  │    Kinesis      │  │      S3         │  │    Lambda       │     │
│  │   Data Streams  │  │   (Direct)      │  │  (Processing)   │     │
│  └────────┬────────┘  └────────┬────────┘  └────────┬────────┘     │
│           │                    │                    │               │
│           └────────────────────┴────────────────────┘               │
│                                │                                     │
│                                ▼                                     │
│                    ┌─────────────────────┐                          │
│                    │   S3 Data Lake      │                          │
│                    │   (Analytics)       │                          │
│                    └─────────────────────┘                          │
└─────────────────────────────────────────────────────────────────────┘

Device Gateway

디바이스 연결 관리. MQTT, HTTPS, WebSocket 지원. 자동 확장.

Message Broker

Pub/Sub 메시징. QoS 0/1 지원. 토픽 기반 라우팅.

Rules Engine

SQL 기반 필터링. 다양한 AWS 서비스로 라우팅.

6.2 IoT Rules Engine

Rules Engine은 SQL 문법으로 IoT 메시지를 필터링하고 변환하여 다양한 AWS 서비스로 라우팅합니다.

// IoT Rule - 온도 센서 데이터를 Kinesis로 전송

-- Rule SQL
SELECT 
  device_id,
  temperature,
  humidity,
  timestamp() as ingestion_time,
  topic(2) as location
FROM 'sensors/+/temperature'
WHERE temperature > 0 AND temperature < 100

-- 결과 예시
{
  "device_id": "sensor-001",
  "temperature": 25.5,
  "humidity": 60,
  "ingestion_time": "2024-01-15T10:30:00Z",
  "location": "factory-a"
}

// Terraform - IoT Rule 설정

resource "aws_iot_topic_rule" "sensor_to_kinesis" {
  name        = "sensor_data_to_kinesis"
  description = "Route sensor data to Kinesis"
  enabled     = true
  sql         = <<-SQL
    SELECT 
      device_id,
      temperature,
      humidity,
      timestamp() as ingestion_time
    FROM 'sensors/+/temperature'
    WHERE temperature > 0
  SQL
  sql_version = "2016-03-23"

  kinesis {
    stream_name = aws_kinesis_stream.sensor_data.name
    role_arn    = aws_iam_role.iot_kinesis.arn
    partition_key = "${device_id}"
  }

  # 에러 처리
  error_action {
    s3 {
      bucket_name = aws_s3_bucket.error_logs.id
      key         = "iot-errors/${timestamp()}"
      role_arn    = aws_iam_role.iot_s3.arn
    }
  }
}

# 이상 온도 알림 Rule
resource "aws_iot_topic_rule" "temperature_alert" {
  name    = "temperature_alert"
  enabled = true
  sql     = <<-SQL
    SELECT 
      device_id,
      temperature,
      'HIGH_TEMPERATURE' as alert_type
    FROM 'sensors/+/temperature'
    WHERE temperature > 80
  SQL
  sql_version = "2016-03-23"

  sns {
    target_arn = aws_sns_topic.alerts.arn
    role_arn   = aws_iam_role.iot_sns.arn
  }

  lambda {
    function_arn = aws_lambda_function.alert_handler.arn
  }
}

6.3 디바이스 인증 및 보안

IoT 보안 모델

X.509 인증서

  • • 디바이스별 고유 인증서
  • • AWS IoT CA 또는 자체 CA
  • • 인증서 로테이션 지원
  • • Just-in-Time Registration (JITR)

IoT Policy

  • • 토픽별 Publish/Subscribe 권한
  • • 디바이스별 세분화된 접근 제어
  • • 정책 변수 ($${iot:ClientId})

// IoT Policy 예시

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "iot:Connect",
      "Resource": "arn:aws:iot:ap-northeast-2:*:client/${iot:ClientId}"
    },
    {
      "Effect": "Allow",
      "Action": "iot:Publish",
      "Resource": [
        "arn:aws:iot:ap-northeast-2:*:topic/sensors/${iot:ClientId}/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": "iot:Subscribe",
      "Resource": [
        "arn:aws:iot:ap-northeast-2:*:topicfilter/commands/${iot:ClientId}/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": "iot:Receive",
      "Resource": [
        "arn:aws:iot:ap-northeast-2:*:topic/commands/${iot:ClientId}/*"
      ]
    }
  ]
}

6.4 IoT 데이터 수집 패턴

패턴 1: 실시간 분석 (IoT → Kinesis → Lambda)

실시간 이상 탐지, 즉각적인 알림이 필요한 경우. Kinesis Data Analytics로 실시간 집계 가능.

패턴 2: 배치 분석 (IoT → Firehose → S3)

대용량 센서 데이터 저장, 일/주/월 단위 분석. Athena, Redshift로 분석.

패턴 3: 시계열 분석 (IoT → Timestream)

시계열 데이터 전용 저장소. 자동 데이터 계층화, 내장 시계열 함수 제공.

💡 IoT 데이터 수집 Best Practices

  • 메시지 크기 최소화: 압축, 바이너리 포맷 사용
  • 배치 전송: 여러 측정값을 하나의 메시지로
  • QoS 적절히 선택: QoS 0 (최대 1회), QoS 1 (최소 1회)
  • 오프라인 버퍼링: 연결 끊김 시 로컬 저장 후 재전송

7. 정리 및 다음 세션 예고

7.1 핵심 요약

데이터 수집 패턴

배치 수집은 대용량/비용 효율, 실시간 수집은 즉각적 분석에 적합. Lambda Architecture로 두 패턴을 조합하는 것이 일반적.

Kinesis 서비스 선택

KDS는 실시간 처리/다중 컨슈머, KDF는 단순 S3 적재에 적합. KDS → KDF 조합으로 실시간 + 배치 동시 지원 가능.

DMS CDC

운영 DB 변경 사항을 실시간으로 데이터 레이크에 복제. Full Load + CDC로 초기 마이그레이션 후 지속적 동기화.

AppFlow

SaaS 데이터 통합을 코드 없이 구현. 50+ 커넥터 지원. Salesforce, Zendesk 등 비즈니스 데이터 수집에 적합.

MSK vs Kinesis

MSK는 Kafka 에코시스템/마이그레이션, Kinesis는 AWS 네이티브/서버리스. MSK Connect로 다양한 소스/싱크 연결 가능.

IoT Core

수십억 디바이스 연결 지원. Rules Engine으로 SQL 기반 라우팅. X.509 인증서 기반 보안.

7.2 서비스 선택 가이드

사용 사례권장 서비스대안
실시간 클릭스트림Kinesis Data StreamsMSK
로그 수집 → S3Kinesis Data FirehoseFluentd + S3
DB 변경 복제DMS CDCDebezium + MSK
Salesforce 데이터AppFlowFivetran, Airbyte
Kafka 마이그레이션MSKSelf-managed Kafka
IoT 센서 데이터IoT CoreMQTT Broker + Lambda

7.3 다음 세션 예고

Session 3: 데이터 저장소 - S3 Data Lake 설계

수집된 데이터를 효율적으로 저장하고 관리하는 방법을 학습합니다.

  • S3 Data Lake 아키텍처 (Raw/Curated/Analytics 계층)
  • 파일 포맷 비교 (Parquet, ORC, Avro, JSON)
  • 파티셔닝 전략 및 성능 최적화
  • S3 스토리지 클래스 및 수명 주기 정책
  • AWS Glue Data Catalog 메타데이터 관리

7.4 실습 과제

과제 1: Kinesis Firehose 설정

Kinesis Data Firehose를 생성하여 샘플 데이터를 S3에 Parquet 형식으로 저장하세요. 동적 파티셔닝을 설정하여 날짜별로 데이터를 분리하세요.

과제 2: DMS CDC 파이프라인

RDS MySQL에서 S3로 CDC 파이프라인을 구축하세요. Full Load 후 CDC가 정상 동작하는지 확인하세요.

과제 3: 비용 분석

일일 10GB 데이터 수집 시나리오에서 Kinesis Data Streams vs Firehose의 월간 비용을 계산하고 비교하세요.