Sessions
중급Level 200-3003-4시간

AWS Data Platform Workshop

AWS CDK를 사용하여 실제 데이터 플랫폼을 구축하는 핸즈온 워크샵입니다. S3 데이터 레이크, Glue ETL, Athena, Kinesis 스트리밍, QuickSight 대시보드까지 전체 파이프라인을 구현합니다.

아키텍처 개요

┌─────────────────────────────────────────────────────────────────────────────┐
│                    AWS Data Platform Architecture                            │
│                                                                              │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐  │
│  │   Kinesis   │───▶│  Firehose   │───▶│   S3 Raw    │───▶│ Glue Crawler│  │
│  │   Stream    │    │  (Lambda)   │    │   Bucket    │    │             │  │
│  └─────────────┘    └─────────────┘    └──────┬──────┘    └──────┬──────┘  │
│                                               │                  │          │
│                                               ▼                  ▼          │
│                                        ┌─────────────┐    ┌─────────────┐  │
│                                        │  Glue ETL   │    │    Glue     │  │
│                                        │    Job      │    │   Catalog   │  │
│                                        └──────┬──────┘    └──────┬──────┘  │
│                                               │                  │          │
│                                               ▼                  │          │
│                                        ┌─────────────┐           │          │
│                                        │ S3 Processed│           │          │
│                                        │  (Parquet)  │           │          │
│                                        └──────┬──────┘           │          │
│                                               │                  │          │
│                                               ▼                  ▼          │
│                                        ┌─────────────┐    ┌─────────────┐  │
│                                        │   Athena    │◀───│  QuickSight │  │
│                                        │   Query     │    │  Dashboard  │  │
│                                        └─────────────┘    └─────────────┘  │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

데이터 레이크

S3 + Glue Catalog

ETL 파이프라인

Glue + Crawler

실시간 스트리밍

Kinesis + Firehose

분석 & 시각화

Athena + QuickSight

💰 예상 비용: 약 $2-5 (전체 워크샵 완료 시)
Kinesis, Glue Job, QuickSight가 주요 비용 요소입니다. 워크샵 완료 후 반드시 리소스를 정리하세요.

사전 요구사항

  • AWS 계정 (AdministratorAccess 권한)
  • Node.js 18.x 이상
  • AWS CLI 설치 및 구성
  • Python 3.9+ (테스트 스크립트용)
  • 기본적인 SQL 지식

워크샵 단계

0

Step 0: 사전 준비

15분

학습 목표
  • AWS CDK 개발 환경 설정
  • 필요한 도구 설치 확인
  • AWS 자격 증명 구성
핵심 개념

AWS CDK란?

AWS Cloud Development Kit(CDK)는 프로그래밍 언어를 사용하여 클라우드 인프라를 정의하고 프로비저닝하는 오픈소스 프레임워크입니다. TypeScript, Python, Java 등을 지원합니다.

왜 CDK를 사용하나요?

CloudFormation 대비 코드 재사용성, 타입 안전성, IDE 자동완성 지원이 뛰어납니다. 복잡한 데이터 플랫폼 구축에 적합합니다.

0.1 Node.js 설치 확인

Node.js 18.x 이상이 필요합니다.

node --version
# v18.x.x 이상 확인

npm --version
# 9.x.x 이상 확인
0.2 AWS CDK CLI 설치

전역으로 AWS CDK CLI를 설치합니다.

npm install -g aws-cdk

# 설치 확인
cdk --version
0.3 AWS CLI 설정

AWS 자격 증명을 구성합니다.

aws configure
# AWS Access Key ID: [입력]
# AWS Secret Access Key: [입력]
# Default region name: ap-northeast-2
# Default output format: json

# 설정 확인
aws sts get-caller-identity
0.4 프로젝트 생성

CDK 프로젝트를 초기화합니다.

mkdir aws-data-platform-workshop
cd aws-data-platform-workshop

cdk init app --language typescript

# 필요한 패키지 설치
npm install @aws-cdk/aws-s3 @aws-cdk/aws-glue @aws-cdk/aws-kinesis \
  @aws-cdk/aws-lambda @aws-cdk/aws-iam @aws-cdk/aws-athena

💡 Tips

  • ap-northeast-2 (서울) 리전을 사용합니다
  • IAM 사용자에게 AdministratorAccess 권한이 필요합니다
0.5 CDK Bootstrap

CDK가 사용할 S3 버킷과 IAM 역할을 생성합니다.

cdk bootstrap aws://ACCOUNT_ID/ap-northeast-2

# ACCOUNT_ID는 본인의 AWS 계정 ID로 대체
확인 사항
  • cdk --version 명령이 정상 실행되는지 확인
  • aws sts get-caller-identity가 본인 계정 정보를 반환하는지 확인
  • cdk bootstrap이 성공적으로 완료되었는지 확인
1

Step 1: 데이터 레이크 기반 구축

30분

학습 목표
  • S3 기반 데이터 레이크 생성
  • Raw/Processed/Curated 계층 구조 설계
  • Glue Data Catalog 설정
핵심 개념

데이터 레이크 계층 구조

Raw(원본) → Processed(정제) → Curated(분석용) 3계층 구조로 데이터 품질을 단계적으로 향상시킵니다. Bronze/Silver/Gold 패턴이라고도 합니다.

Glue Data Catalog

데이터 레이크의 메타데이터 저장소입니다. 테이블 스키마, 파티션 정보를 관리하며 Athena, Redshift Spectrum 등에서 공유합니다.

1.1 데이터 레이크 스택 생성

lib/data-lake-stack.ts 파일을 생성합니다.

import * as cdk from 'aws-cdk-lib';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as glue from 'aws-cdk-lib/aws-glue';
import * as iam from 'aws-cdk-lib/aws-iam';
import { Construct } from 'constructs';

export class DataLakeStack extends cdk.Stack {
  public readonly rawBucket: s3.Bucket;
  public readonly processedBucket: s3.Bucket;
  public readonly curatedBucket: s3.Bucket;
  public readonly database: glue.CfnDatabase;

  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // Raw 데이터 버킷 (원본 데이터)
    this.rawBucket = new s3.Bucket(this, 'RawDataBucket', {
      bucketName: `data-lake-raw-${this.account}-${this.region}`,
      versioned: true,
      encryption: s3.BucketEncryption.S3_MANAGED,
      blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
      lifecycleRules: [
        {
          id: 'MoveToIA',
          transitions: [
            {
              storageClass: s3.StorageClass.INFREQUENT_ACCESS,
              transitionAfter: cdk.Duration.days(90),
            },
            {
              storageClass: s3.StorageClass.GLACIER,
              transitionAfter: cdk.Duration.days(180),
            },
          ],
        },
      ],
      removalPolicy: cdk.RemovalPolicy.DESTROY,
      autoDeleteObjects: true,
    });

    // Processed 데이터 버킷 (정제된 데이터)
    this.processedBucket = new s3.Bucket(this, 'ProcessedDataBucket', {
      bucketName: `data-lake-processed-${this.account}-${this.region}`,
      versioned: true,
      encryption: s3.BucketEncryption.S3_MANAGED,
      blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
      autoDeleteObjects: true,
    });

    // Curated 데이터 버킷 (분석용 데이터)
    this.curatedBucket = new s3.Bucket(this, 'CuratedDataBucket', {
      bucketName: `data-lake-curated-${this.account}-${this.region}`,
      versioned: true,
      encryption: s3.BucketEncryption.S3_MANAGED,
      blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
      autoDeleteObjects: true,
    });

    // Glue Database
    this.database = new glue.CfnDatabase(this, 'GlueDatabase', {
      catalogId: this.account,
      databaseInput: {
        name: 'data_platform_db',
        description: 'Data Platform Workshop Database',
        locationUri: `s3://${this.curatedBucket.bucketName}/`,
      },
    });

    // Outputs
    new cdk.CfnOutput(this, 'RawBucketName', {
      value: this.rawBucket.bucketName,
      description: 'Raw data bucket name',
    });

    new cdk.CfnOutput(this, 'ProcessedBucketName', {
      value: this.processedBucket.bucketName,
      description: 'Processed data bucket name',
    });

    new cdk.CfnOutput(this, 'CuratedBucketName', {
      value: this.curatedBucket.bucketName,
      description: 'Curated data bucket name',
    });
  }
}

💡 Tips

  • 버킷 이름은 전역적으로 고유해야 합니다
  • Lifecycle 정책으로 스토리지 비용을 최적화합니다
1.2 메인 앱에 스택 추가

bin/aws-data-platform-workshop.ts를 수정합니다.

#!/usr/bin/env node
import 'source-map-support/register';
import * as cdk from 'aws-cdk-lib';
import { DataLakeStack } from '../lib/data-lake-stack';

const app = new cdk.App();

const env = {
  account: process.env.CDK_DEFAULT_ACCOUNT,
  region: process.env.CDK_DEFAULT_REGION || 'ap-northeast-2',
};

new DataLakeStack(app, 'DataLakeStack', { env });
1.3 배포

데이터 레이크 인프라를 배포합니다.

# 변경사항 확인
cdk diff DataLakeStack

# 배포
cdk deploy DataLakeStack --require-approval never
1.4 샘플 데이터 업로드

테스트용 샘플 데이터를 업로드합니다.

# 샘플 데이터 생성
cat > sample_orders.json << 'EOF'
{"order_id": "ORD001", "customer_id": "C001", "product": "Laptop", "amount": 1200, "order_date": "2024-01-15"}
{"order_id": "ORD002", "customer_id": "C002", "product": "Mouse", "amount": 25, "order_date": "2024-01-15"}
{"order_id": "ORD003", "customer_id": "C001", "product": "Keyboard", "amount": 75, "order_date": "2024-01-16"}
{"order_id": "ORD004", "customer_id": "C003", "product": "Monitor", "amount": 350, "order_date": "2024-01-16"}
{"order_id": "ORD005", "customer_id": "C002", "product": "Webcam", "amount": 80, "order_date": "2024-01-17"}
EOF

# S3에 업로드
aws s3 cp sample_orders.json s3://data-lake-raw-$(aws sts get-caller-identity --query Account --output text)-ap-northeast-2/orders/year=2024/month=01/
확인 사항
  • AWS 콘솔에서 S3 버킷 3개가 생성되었는지 확인
  • Glue Data Catalog에 data_platform_db 데이터베이스가 생성되었는지 확인
  • 샘플 데이터가 Raw 버킷에 업로드되었는지 확인
2

Step 2: Glue ETL 파이프라인

40분

학습 목표
  • Glue Crawler로 스키마 자동 탐지
  • Glue ETL Job으로 데이터 변환
  • Parquet 포맷으로 최적화
핵심 개념

Glue Crawler

S3, RDS 등의 데이터 소스를 스캔하여 스키마를 자동으로 추론하고 Data Catalog에 테이블을 생성합니다.

Parquet 포맷

컬럼 기반 저장 포맷으로 분석 쿼리에 최적화되어 있습니다. 압축률이 높고 필요한 컬럼만 읽어 비용을 절감합니다.

2.1 Glue ETL 스택 생성

lib/glue-etl-stack.ts 파일을 생성합니다.

import * as cdk from 'aws-cdk-lib';
import * as glue from 'aws-cdk-lib/aws-glue';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as s3 from 'aws-cdk-lib/aws-s3';
import { Construct } from 'constructs';

interface GlueEtlStackProps extends cdk.StackProps {
  rawBucket: s3.IBucket;
  processedBucket: s3.IBucket;
  curatedBucket: s3.IBucket;
  databaseName: string;
}

export class GlueEtlStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props: GlueEtlStackProps) {
    super(scope, id, props);

    // Glue IAM Role
    const glueRole = new iam.Role(this, 'GlueRole', {
      assumedBy: new iam.ServicePrincipal('glue.amazonaws.com'),
      managedPolicies: [
        iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSGlueServiceRole'),
      ],
    });

    // S3 버킷 접근 권한
    props.rawBucket.grantRead(glueRole);
    props.processedBucket.grantReadWrite(glueRole);
    props.curatedBucket.grantReadWrite(glueRole);

    // Raw 데이터 Crawler
    const rawCrawler = new glue.CfnCrawler(this, 'RawDataCrawler', {
      name: 'raw-data-crawler',
      role: glueRole.roleArn,
      databaseName: props.databaseName,
      targets: {
        s3Targets: [
          {
            path: `s3://${props.rawBucket.bucketName}/orders/`,
          },
        ],
      },
      schemaChangePolicy: {
        updateBehavior: 'UPDATE_IN_DATABASE',
        deleteBehavior: 'LOG',
      },
      configuration: JSON.stringify({
        Version: 1.0,
        Grouping: {
          TableGroupingPolicy: 'CombineCompatibleSchemas',
        },
      }),
    });

    // ETL Job Script (S3에 저장)
    const etlScript = `
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col, to_date, year, month

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'SOURCE_PATH', 'TARGET_PATH'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read raw JSON data
df = spark.read.json(args['SOURCE_PATH'])

# Transform: Add date partitions, clean data
df_transformed = df \
    .withColumn("order_date", to_date(col("order_date"))) \
    .withColumn("year", year(col("order_date"))) \
    .withColumn("month", month(col("order_date"))) \
    .filter(col("amount") > 0)

# Write as Parquet with partitioning
df_transformed.write \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet(args['TARGET_PATH'])

job.commit()
`;

    // ETL Job
    const etlJob = new glue.CfnJob(this, 'OrdersEtlJob', {
      name: 'orders-etl-job',
      role: glueRole.roleArn,
      command: {
        name: 'glueetl',
        pythonVersion: '3',
        scriptLocation: `s3://${props.processedBucket.bucketName}/scripts/orders_etl.py`,
      },
      defaultArguments: {
        '--SOURCE_PATH': `s3://${props.rawBucket.bucketName}/orders/`,
        '--TARGET_PATH': `s3://${props.processedBucket.bucketName}/orders/`,
        '--job-language': 'python',
        '--enable-metrics': 'true',
        '--enable-continuous-cloudwatch-log': 'true',
      },
      glueVersion: '4.0',
      numberOfWorkers: 2,
      workerType: 'G.1X',
      timeout: 60,
    });

    // Processed 데이터 Crawler
    const processedCrawler = new glue.CfnCrawler(this, 'ProcessedDataCrawler', {
      name: 'processed-data-crawler',
      role: glueRole.roleArn,
      databaseName: props.databaseName,
      targets: {
        s3Targets: [
          {
            path: `s3://${props.processedBucket.bucketName}/orders/`,
          },
        ],
      },
      schemaChangePolicy: {
        updateBehavior: 'UPDATE_IN_DATABASE',
        deleteBehavior: 'LOG',
      },
    });

    // Outputs
    new cdk.CfnOutput(this, 'GlueRoleArn', {
      value: glueRole.roleArn,
    });
  }
}
2.2 ETL 스크립트 업로드

Glue Job에서 사용할 스크립트를 S3에 업로드합니다.

# ETL 스크립트 생성
cat > orders_etl.py << 'EOF'
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col, to_date, year, month

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'SOURCE_PATH', 'TARGET_PATH'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read raw JSON data
df = spark.read.json(args['SOURCE_PATH'])

# Transform
df_transformed = df \
    .withColumn("order_date", to_date(col("order_date"))) \
    .withColumn("year", year(col("order_date"))) \
    .withColumn("month", month(col("order_date"))) \
    .filter(col("amount") > 0)

# Write as Parquet
df_transformed.write \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet(args['TARGET_PATH'])

job.commit()
EOF

# S3에 업로드
PROCESSED_BUCKET="data-lake-processed-$(aws sts get-caller-identity --query Account --output text)-ap-northeast-2"
aws s3 cp orders_etl.py s3://$PROCESSED_BUCKET/scripts/
2.3 스택 배포 및 실행

Glue ETL 스택을 배포하고 Crawler와 Job을 실행합니다.

# 배포
cdk deploy GlueEtlStack --require-approval never

# Raw Crawler 실행
aws glue start-crawler --name raw-data-crawler

# Crawler 완료 대기 (약 1-2분)
aws glue get-crawler --name raw-data-crawler --query 'Crawler.State'

# ETL Job 실행
aws glue start-job-run --job-name orders-etl-job

# Job 상태 확인
aws glue get-job-runs --job-name orders-etl-job --query 'JobRuns[0].JobRunState'

# Processed Crawler 실행
aws glue start-crawler --name processed-data-crawler

💡 Tips

  • Crawler는 처음 실행 시 1-2분 소요됩니다
  • ETL Job은 약 2-3분 소요됩니다
확인 사항
  • Glue Data Catalog에 orders 테이블이 생성되었는지 확인
  • Processed 버킷에 Parquet 파일이 생성되었는지 확인
  • Glue Job이 Succeeded 상태인지 확인
3

Step 3: Athena로 데이터 분석

20분

학습 목표
  • Athena로 S3 데이터 쿼리
  • 파티셔닝을 활용한 비용 최적화
  • 쿼리 결과 저장 및 활용
핵심 개념

Amazon Athena

서버리스 대화형 쿼리 서비스입니다. S3에 저장된 데이터를 표준 SQL로 분석할 수 있으며, 스캔한 데이터 양에 따라 과금됩니다.

파티셔닝의 중요성

파티션을 사용하면 쿼리 시 필요한 데이터만 스캔하여 비용을 90% 이상 절감할 수 있습니다. year/month/day 형태가 일반적입니다.

3.1 Athena Workgroup 생성

lib/athena-stack.ts 파일을 생성합니다.

import * as cdk from 'aws-cdk-lib';
import * as athena from 'aws-cdk-lib/aws-athena';
import * as s3 from 'aws-cdk-lib/aws-s3';
import { Construct } from 'constructs';

interface AthenaStackProps extends cdk.StackProps {
  curatedBucket: s3.IBucket;
}

export class AthenaStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props: AthenaStackProps) {
    super(scope, id, props);

    // Athena 쿼리 결과 버킷
    const queryResultsBucket = new s3.Bucket(this, 'AthenaQueryResults', {
      bucketName: `athena-results-${this.account}-${this.region}`,
      encryption: s3.BucketEncryption.S3_MANAGED,
      blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
      lifecycleRules: [
        {
          expiration: cdk.Duration.days(30),
        },
      ],
      removalPolicy: cdk.RemovalPolicy.DESTROY,
      autoDeleteObjects: true,
    });

    // Athena Workgroup
    const workgroup = new athena.CfnWorkGroup(this, 'DataPlatformWorkgroup', {
      name: 'data-platform-workgroup',
      description: 'Workgroup for Data Platform Workshop',
      state: 'ENABLED',
      workGroupConfiguration: {
        resultConfiguration: {
          outputLocation: `s3://${queryResultsBucket.bucketName}/results/`,
          encryptionConfiguration: {
            encryptionOption: 'SSE_S3',
          },
        },
        enforceWorkGroupConfiguration: true,
        publishCloudWatchMetricsEnabled: true,
        bytesScannedCutoffPerQuery: 10737418240, // 10GB limit
        engineVersion: {
          selectedEngineVersion: 'Athena engine version 3',
        },
      },
    });

    new cdk.CfnOutput(this, 'WorkgroupName', {
      value: workgroup.name,
    });

    new cdk.CfnOutput(this, 'QueryResultsBucket', {
      value: queryResultsBucket.bucketName,
    });
  }
}
3.2 Athena 쿼리 실행

AWS 콘솔 또는 CLI로 쿼리를 실행합니다.

# Athena 콘솔에서 실행하거나 CLI 사용

# 1. 테이블 확인
aws athena start-query-execution \
  --query-string "SHOW TABLES IN data_platform_db" \
  --work-group data-platform-workgroup \
  --query-execution-context Database=data_platform_db

# 2. 데이터 조회 (파티션 활용)
aws athena start-query-execution \
  --query-string "SELECT * FROM orders WHERE year = 2024 AND month = 1 LIMIT 10" \
  --work-group data-platform-workgroup \
  --query-execution-context Database=data_platform_db

# 3. 집계 쿼리
aws athena start-query-execution \
  --query-string "SELECT product, SUM(amount) as total_sales, COUNT(*) as order_count FROM orders GROUP BY product ORDER BY total_sales DESC" \
  --work-group data-platform-workgroup \
  --query-execution-context Database=data_platform_db

💡 Tips

  • Workgroup에서 쿼리 비용 제한을 설정할 수 있습니다
  • 파티션 컬럼(year, month)을 WHERE 절에 포함하면 비용이 절감됩니다
3.3 CTAS로 최적화된 테이블 생성

Create Table As Select로 Parquet 테이블을 생성합니다.

-- Athena 콘솔에서 실행
CREATE TABLE data_platform_db.orders_optimized
WITH (
  format = 'PARQUET',
  parquet_compression = 'SNAPPY',
  partitioned_by = ARRAY['year', 'month'],
  external_location = 's3://data-lake-curated-ACCOUNT_ID-ap-northeast-2/orders_optimized/'
) AS
SELECT 
  order_id,
  customer_id,
  product,
  amount,
  order_date,
  year,
  month
FROM data_platform_db.orders;
확인 사항
  • Athena Workgroup이 생성되었는지 확인
  • 쿼리가 정상적으로 실행되는지 확인
  • 쿼리 결과가 S3에 저장되는지 확인
4

Step 4: 실시간 스트리밍 파이프라인

45분

학습 목표
  • Kinesis Data Streams로 실시간 데이터 수집
  • Kinesis Data Firehose로 S3 전달
  • Lambda로 실시간 데이터 처리
핵심 개념

Kinesis Data Streams

실시간 데이터 스트리밍 서비스입니다. 샤드 단위로 처리량을 조절하며, 1샤드당 1MB/s 쓰기, 2MB/s 읽기를 지원합니다.

Kinesis Data Firehose

스트리밍 데이터를 S3, Redshift, OpenSearch 등으로 자동 전달합니다. 버퍼링, 압축, 포맷 변환을 자동으로 처리합니다.

4.1 스트리밍 스택 생성

lib/streaming-stack.ts 파일을 생성합니다.

import * as cdk from 'aws-cdk-lib';
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
import * as firehose from 'aws-cdk-lib/aws-kinesisfirehose';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as logs from 'aws-cdk-lib/aws-logs';
import { Construct } from 'constructs';

interface StreamingStackProps extends cdk.StackProps {
  rawBucket: s3.IBucket;
}

export class StreamingStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props: StreamingStackProps) {
    super(scope, id, props);

    // Kinesis Data Stream
    const stream = new kinesis.Stream(this, 'EventStream', {
      streamName: 'event-stream',
      shardCount: 1,
      retentionPeriod: cdk.Duration.hours(24),
    });

    // Lambda for real-time processing
    const processorLambda = new lambda.Function(this, 'StreamProcessor', {
      functionName: 'stream-processor',
      runtime: lambda.Runtime.PYTHON_3_11,
      handler: 'index.handler',
      code: lambda.Code.fromInline(`
import json
import base64

def handler(event, context):
    output = []
    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')
        data = json.loads(payload)
        
        # Add processing timestamp
        data['processed_at'] = context.aws_request_id
        
        # Enrich data
        if data.get('amount', 0) > 100:
            data['category'] = 'high_value'
        else:
            data['category'] = 'standard'
        
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(data).encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)
    
    return {'records': output}
`),
      timeout: cdk.Duration.minutes(1),
      memorySize: 256,
    });

    // Firehose IAM Role
    const firehoseRole = new iam.Role(this, 'FirehoseRole', {
      assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com'),
    });

    props.rawBucket.grantReadWrite(firehoseRole);
    processorLambda.grantInvoke(firehoseRole);
    stream.grantRead(firehoseRole);

    // CloudWatch Log Group for Firehose
    const logGroup = new logs.LogGroup(this, 'FirehoseLogGroup', {
      logGroupName: '/aws/kinesisfirehose/event-delivery',
      retention: logs.RetentionDays.ONE_WEEK,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
    });

    const logStream = new logs.LogStream(this, 'FirehoseLogStream', {
      logGroup: logGroup,
      logStreamName: 'delivery-stream',
    });

    // Kinesis Data Firehose
    const deliveryStream = new firehose.CfnDeliveryStream(this, 'EventDeliveryStream', {
      deliveryStreamName: 'event-delivery-stream',
      deliveryStreamType: 'KinesisStreamAsSource',
      kinesisStreamSourceConfiguration: {
        kinesisStreamArn: stream.streamArn,
        roleArn: firehoseRole.roleArn,
      },
      extendedS3DestinationConfiguration: {
        bucketArn: props.rawBucket.bucketArn,
        roleArn: firehoseRole.roleArn,
        prefix: 'streaming/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/',
        errorOutputPrefix: 'streaming-errors/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/',
        bufferingHints: {
          intervalInSeconds: 60,
          sizeInMBs: 1,
        },
        compressionFormat: 'GZIP',
        processingConfiguration: {
          enabled: true,
          processors: [
            {
              type: 'Lambda',
              parameters: [
                {
                  parameterName: 'LambdaArn',
                  parameterValue: processorLambda.functionArn,
                },
              ],
            },
          ],
        },
        cloudWatchLoggingOptions: {
          enabled: true,
          logGroupName: logGroup.logGroupName,
          logStreamName: logStream.logStreamName,
        },
      },
    });

    // Outputs
    new cdk.CfnOutput(this, 'StreamName', {
      value: stream.streamName,
    });

    new cdk.CfnOutput(this, 'DeliveryStreamName', {
      value: deliveryStream.deliveryStreamName!,
    });
  }
}
4.2 테스트 데이터 전송

Kinesis Stream에 테스트 데이터를 전송합니다.

# 테스트 데이터 전송 스크립트
cat > send_events.py << 'EOF'
import boto3
import json
import time
import random
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='ap-northeast-2')
stream_name = 'event-stream'

products = ['Laptop', 'Mouse', 'Keyboard', 'Monitor', 'Webcam', 'Headset']
customers = ['C001', 'C002', 'C003', 'C004', 'C005']

for i in range(100):
    event = {
        'event_id': f'EVT{i:05d}',
        'customer_id': random.choice(customers),
        'product': random.choice(products),
        'amount': random.randint(10, 500),
        'timestamp': datetime.utcnow().isoformat()
    }
    
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event),
        PartitionKey=event['customer_id']
    )
    
    print(f"Sent: {event['event_id']} - Shard: {response['ShardId']}")
    time.sleep(0.1)

print("Done!")
EOF

# 실행
pip install boto3
python send_events.py

💡 Tips

  • PartitionKey로 데이터가 샤드에 분산됩니다
  • Firehose는 1분 또는 1MB 버퍼가 차면 S3에 전달합니다
확인 사항
  • Kinesis Data Stream이 생성되었는지 확인
  • Firehose Delivery Stream이 활성화되었는지 확인
  • 1-2분 후 S3 streaming/ 폴더에 데이터가 저장되는지 확인
5

Step 5: QuickSight 대시보드

30분

학습 목표
  • QuickSight와 Athena 연결
  • 데이터셋 생성 및 SPICE 활용
  • 대시보드 시각화 구성
핵심 개념

Amazon QuickSight

서버리스 BI 서비스입니다. SPICE 인메모리 엔진으로 빠른 응답을 제공하며, ML 기반 인사이트를 자동으로 생성합니다.

SPICE

Super-fast, Parallel, In-memory Calculation Engine의 약자로, 데이터를 메모리에 캐싱하여 빠른 쿼리 응답을 제공합니다.

5.1 QuickSight 설정

QuickSight 계정을 설정하고 Athena에 연결합니다.

# QuickSight는 콘솔에서 설정합니다

# 1. AWS 콘솔 → QuickSight 이동
# 2. Sign up for QuickSight (Enterprise Edition 권장)
# 3. QuickSight가 S3, Athena에 접근할 수 있도록 권한 설정

# CLI로 QuickSight 데이터 소스 생성
aws quicksight create-data-source \
  --aws-account-id $(aws sts get-caller-identity --query Account --output text) \
  --data-source-id athena-data-source \
  --name "Athena Data Source" \
  --type ATHENA \
  --data-source-parameters '{"AthenaParameters":{"WorkGroup":"data-platform-workgroup"}}' \
  --permissions '[{"Principal":"arn:aws:quicksight:ap-northeast-2:ACCOUNT_ID:user/default/ADMIN_USER","Actions":["quicksight:DescribeDataSource","quicksight:DescribeDataSourcePermissions","quicksight:PassDataSource","quicksight:UpdateDataSource","quicksight:DeleteDataSource","quicksight:UpdateDataSourcePermissions"]}]'

💡 Tips

  • QuickSight Enterprise는 월 $18/사용자입니다
  • Standard Edition은 월 $9/사용자입니다
5.2 데이터셋 생성

Athena 테이블을 기반으로 데이터셋을 생성합니다.

# QuickSight 콘솔에서:
# 1. Datasets → New dataset
# 2. Athena 선택
# 3. data_platform_db 데이터베이스 선택
# 4. orders 테이블 선택
# 5. Import to SPICE 선택 (빠른 응답을 위해)

# 또는 Custom SQL 사용:
SELECT 
  order_id,
  customer_id,
  product,
  amount,
  order_date,
  CASE 
    WHEN amount > 200 THEN 'High'
    WHEN amount > 50 THEN 'Medium'
    ELSE 'Low'
  END as value_segment
FROM data_platform_db.orders
WHERE year = 2024
5.3 대시보드 생성

시각화를 구성하여 대시보드를 만듭니다.

# QuickSight 콘솔에서 Analysis 생성:

# 1. 매출 추이 (Line Chart)
#    - X축: order_date
#    - Y축: SUM(amount)

# 2. 제품별 매출 (Bar Chart)
#    - X축: product
#    - Y축: SUM(amount)
#    - 정렬: 내림차순

# 3. 고객별 주문 수 (Pie Chart)
#    - Group by: customer_id
#    - Value: COUNT(order_id)

# 4. KPI 카드
#    - 총 매출: SUM(amount)
#    - 총 주문 수: COUNT(order_id)
#    - 평균 주문 금액: AVG(amount)

# 5. 필터 추가
#    - order_date 범위 필터
#    - product 다중 선택 필터
확인 사항
  • QuickSight에서 Athena 데이터 소스가 연결되었는지 확인
  • SPICE 데이터셋이 생성되었는지 확인
  • 대시보드에 시각화가 표시되는지 확인
6

Step 6: 리소스 정리

10분

학습 목표
  • 생성된 모든 리소스 삭제
  • 비용 발생 방지
  • CDK destroy 실행
핵심 개념

리소스 정리의 중요성

워크샵 완료 후 리소스를 정리하지 않으면 지속적으로 비용이 발생합니다. 특히 Kinesis, QuickSight는 시간당 과금됩니다.

6.1 QuickSight 리소스 삭제

QuickSight 대시보드, 데이터셋, 데이터 소스를 삭제합니다.

# QuickSight 콘솔에서:
# 1. Dashboards → 삭제
# 2. Analyses → 삭제
# 3. Datasets → 삭제
# 4. Data sources → 삭제

# 또는 QuickSight 구독 취소 (모든 리소스 삭제됨)
# QuickSight → Manage QuickSight → Account settings → Delete account
6.2 CDK 스택 삭제

CDK로 생성한 모든 리소스를 삭제합니다.

# 모든 스택 삭제 (역순으로)
cdk destroy StreamingStack --force
cdk destroy AthenaStack --force
cdk destroy GlueEtlStack --force
cdk destroy DataLakeStack --force

# 또는 한번에 모두 삭제
cdk destroy --all --force

💡 Tips

  • S3 버킷은 autoDeleteObjects: true로 설정되어 자동 삭제됩니다
  • 삭제에 5-10분 소요될 수 있습니다
6.3 삭제 확인

모든 리소스가 삭제되었는지 확인합니다.

# CloudFormation 스택 확인
aws cloudformation list-stacks --stack-status-filter CREATE_COMPLETE UPDATE_COMPLETE

# S3 버킷 확인
aws s3 ls | grep data-lake

# Glue 리소스 확인
aws glue get-databases
aws glue get-crawlers
aws glue get-jobs

# Kinesis 확인
aws kinesis list-streams
aws firehose list-delivery-streams
확인 사항
  • CloudFormation에 워크샵 관련 스택이 없는지 확인
  • S3에 data-lake 버킷이 없는지 확인
  • Glue에 워크샵 관련 리소스가 없는지 확인
  • Kinesis 스트림이 삭제되었는지 확인

워크샵 완료!

축하합니다! AWS Data Platform 워크샵을 완료했습니다. 이 워크샵에서 다음을 학습했습니다:

  • S3 기반 데이터 레이크 3계층 구조 (Raw/Processed/Curated)
  • Glue Crawler와 ETL Job을 활용한 데이터 파이프라인
  • Athena를 활용한 서버리스 SQL 분석
  • Kinesis를 활용한 실시간 스트리밍 파이프라인
  • QuickSight를 활용한 데이터 시각화
  • AWS CDK를 활용한 인프라 자동화

⚠️ 중요: 워크샵 완료 후 반드시 Step 6의 리소스 정리를 수행하세요. 리소스를 삭제하지 않으면 지속적으로 비용이 발생합니다.

다음 단계

📚 이론 학습

AWS Data Platform 이론 세션에서 더 깊이 있는 내용을 학습하세요.

Theory 바로가기

🎯 자격증 준비

AWS Data Analytics Specialty 자격증을 준비해보세요.

Session 1 시작