← Theory 목록으로

Session 9: DataOps & MLOps

CI/CD, SageMaker Pipelines, 모니터링, IaC

1. DataOps 개념

DataOps는 데이터 파이프라인에 DevOps 원칙을 적용하여 데이터 품질, 속도, 협업을 개선하는 방법론입니다.

DataOps 파이프라인
┌─────────────────────────────────────────────────────────────────────┐
│                    DataOps Pipeline                                  │
│                                                                      │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐      │
│  │  Code    │───▶│  Build   │───▶│  Test    │───▶│  Deploy  │      │
│  │          │    │          │    │          │    │          │      │
│  │ • ETL    │    │ • Package│    │ • Unit   │    │ • Dev    │      │
│  │ • Schema │    │ • Docker │    │ • Data   │    │ • Stage  │      │
│  │ • Config │    │ • Glue   │    │ • Quality│    │ • Prod   │      │
│  └──────────┘    └──────────┘    └──────────┘    └──────────┘      │
│       │                                               │              │
│       │              ┌──────────┐                     │              │
│       └──────────────│ Monitor  │◀────────────────────┘              │
│                      │          │                                    │
│                      │ • Metrics│                                    │
│                      │ • Alerts │                                    │
│                      │ • Logs   │                                    │
│                      └──────────┘                                    │
└─────────────────────────────────────────────────────────────────────┘

버전 관리

ETL 코드, 스키마, 설정을 Git으로 관리

자동화 테스트

데이터 품질 검증을 CI/CD에 통합

환경 분리

Dev → Stage → Prod 단계별 배포

1.2 AWS 기반 DataOps

CI/CD for Data Pipelines

// CodePipeline for Glue ETL

# buildspec.yml for Glue ETL
version: 0.2
phases:
  install:
    runtime-versions:
      python: 3.9
  pre_build:
    commands:
      - pip install pytest boto3 awsglue-local
  build:
    commands:
      # Unit tests
      - pytest tests/unit/ -v
      # Data quality tests
      - python tests/data_quality_check.py
  post_build:
    commands:
      # Deploy Glue Job
      - aws glue update-job --job-name ${GLUE_JOB_NAME} \
          --job-update "Command={ScriptLocation=s3://${BUCKET}/scripts/etl.py}"
      # Update Glue Workflow
      - aws glue update-workflow --name ${WORKFLOW_NAME}

artifacts:
  files:
    - scripts/**/*
    - tests/reports/**/*

// Terraform - CodePipeline

resource "aws_codepipeline" "data_pipeline" {
  name     = "data-etl-pipeline"
  role_arn = aws_iam_role.codepipeline.arn

  stage {
    name = "Source"
    action {
      name             = "Source"
      category         = "Source"
      owner            = "AWS"
      provider         = "CodeCommit"
      version          = "1"
      output_artifacts = ["source_output"]
      configuration = {
        RepositoryName = "data-etl-repo"
        BranchName     = "main"
      }
    }
  }

  stage {
    name = "Test"
    action {
      name            = "UnitTest"
      category        = "Build"
      owner           = "AWS"
      provider        = "CodeBuild"
      input_artifacts = ["source_output"]
      configuration = {
        ProjectName = aws_codebuild_project.test.name
      }
    }
  }

  stage {
    name = "Deploy-Dev"
    action {
      name            = "DeployDev"
      category        = "Build"
      owner           = "AWS"
      provider        = "CodeBuild"
      input_artifacts = ["source_output"]
      configuration = {
        ProjectName          = aws_codebuild_project.deploy.name
        EnvironmentVariables = jsonencode([
          { name = "ENV", value = "dev" }
        ])
      }
    }
  }
}

2. MLOps & SageMaker

2.1 MLOps 파이프라인

ML 라이프사이클
┌─────────────────────────────────────────────────────────────────────┐
│                    MLOps Pipeline                                    │
│                                                                      │
│  Data Preparation          Model Development         Deployment      │
│  ┌─────────────┐          ┌─────────────┐          ┌─────────────┐  │
│  │ Feature     │          │ Training    │          │ Endpoint    │  │
│  │ Engineering │─────────▶│ & Tuning    │─────────▶│ Deployment  │  │
│  │             │          │             │          │             │  │
│  │ • S3        │          │ • SageMaker │          │ • Real-time │  │
│  │ • Glue      │          │ • HPO       │          │ • Batch     │  │
│  │ • Feature   │          │ • Experiments│         │ • Serverless│  │
│  │   Store     │          │             │          │             │  │
│  └─────────────┘          └─────────────┘          └─────────────┘  │
│        │                        │                        │          │
│        │                        │                        │          │
│        ▼                        ▼                        ▼          │
│  ┌─────────────────────────────────────────────────────────────────┐│
│  │                    Model Registry                                ││
│  │  • Version Control  • Approval Workflow  • Lineage Tracking     ││
│  └─────────────────────────────────────────────────────────────────┘│
│                              │                                       │
│                              ▼                                       │
│  ┌─────────────────────────────────────────────────────────────────┐│
│  │                    Model Monitoring                              ││
│  │  • Data Drift  • Model Drift  • Bias Detection                  ││
│  └─────────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────────┘

2.2 SageMaker Pipelines

ML 워크플로우 자동화

// SageMaker Pipeline 정의

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CreateModelStep
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo

# 1. 데이터 전처리 단계
processing_step = ProcessingStep(
    name="PreprocessData",
    processor=sklearn_processor,
    inputs=[ProcessingInput(source=input_data, destination="/opt/ml/processing/input")],
    outputs=[ProcessingOutput(output_name="train", source="/opt/ml/processing/train")],
    code="preprocess.py"
)

# 2. 모델 학습 단계
training_step = TrainingStep(
    name="TrainModel",
    estimator=xgb_estimator,
    inputs={"train": TrainingInput(s3_data=processing_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri)}
)

# 3. 모델 평가 단계
evaluation_step = ProcessingStep(
    name="EvaluateModel",
    processor=sklearn_processor,
    inputs=[ProcessingInput(source=training_step.properties.ModelArtifacts.S3ModelArtifacts, destination="/opt/ml/processing/model")],
    code="evaluate.py"
)

# 4. 조건부 배포 (정확도 > 0.8)
condition_step = ConditionStep(
    name="CheckAccuracy",
    conditions=[ConditionGreaterThanOrEqualTo(
        left=JsonGet(step_name="EvaluateModel", property_file="evaluation.json", json_path="accuracy"),
        right=0.8
    )],
    if_steps=[register_step, deploy_step],
    else_steps=[fail_step]
)

# 파이프라인 생성
pipeline = Pipeline(
    name="ml-training-pipeline",
    steps=[processing_step, training_step, evaluation_step, condition_step],
    sagemaker_session=sagemaker_session
)
pipeline.upsert(role_arn=role)

2.3 Feature Store

SageMaker Feature Store는 ML 피처를 중앙에서 저장, 공유, 재사용할 수 있는 저장소입니다.

┌─────────────────────────────────────────────────────────────────────┐
│                    SageMaker Feature Store                           │
│                                                                      │
│  ┌─────────────────────────────────────────────────────────────────┐│
│  │  Feature Group: customer_features                                ││
│  │  ├── customer_id (identifier)                                   ││
│  │  ├── total_purchases (numeric)                                  ││
│  │  ├── avg_order_value (numeric)                                  ││
│  │  ├── days_since_last_order (numeric)                            ││
│  │  └── event_time (timestamp)                                     ││
│  └─────────────────────────────────────────────────────────────────┘│
│                              │                                       │
│              ┌───────────────┴───────────────┐                      │
│              ▼                               ▼                      │
│  ┌─────────────────────┐         ┌─────────────────────┐           │
│  │   Online Store      │         │   Offline Store     │           │
│  │   (Low Latency)     │         │   (S3 + Glue)       │           │
│  │                     │         │                     │           │
│  │   • Real-time       │         │   • Batch Training  │           │
│  │     Inference       │         │   • Historical      │           │
│  │   • < 10ms          │         │     Analysis        │           │
│  └─────────────────────┘         └─────────────────────┘           │
└─────────────────────────────────────────────────────────────────────┘

3. 모니터링 & 관측성

3.1 데이터 파이프라인 모니터링

CloudWatch 메트릭
┌─────────────────────────────────────────────────────────────────────┐
│                    Data Pipeline Observability                       │
│                                                                      │
│  ┌─────────────────────────────────────────────────────────────────┐│
│  │  Glue ETL Metrics                                                ││
│  │  ├── glue.driver.aggregate.numCompletedTasks                    ││
│  │  ├── glue.driver.aggregate.numFailedTasks                       ││
│  │  ├── glue.driver.ExecutorAllocationManager.executors.numberAll  ││
│  │  └── glue.driver.jvm.heap.usage                                 ││
│  └─────────────────────────────────────────────────────────────────┘│
│                                                                      │
│  ┌─────────────────────────────────────────────────────────────────┐│
│  │  Kinesis Metrics                                                 ││
│  │  ├── IncomingRecords / IncomingBytes                            ││
│  │  ├── GetRecords.IteratorAgeMilliseconds (Consumer Lag)          ││
│  │  ├── WriteProvisionedThroughputExceeded                         ││
│  │  └── ReadProvisionedThroughputExceeded                          ││
│  └─────────────────────────────────────────────────────────────────┘│
│                                                                      │
│  ┌─────────────────────────────────────────────────────────────────┐│
│  │  Redshift Metrics                                                ││
│  │  ├── CPUUtilization / PercentageDiskSpaceUsed                   ││
│  │  ├── DatabaseConnections                                         ││
│  │  ├── QueryDuration / QueriesCompletedPerSecond                  ││
│  │  └── WLMQueueLength                                              ││
│  └─────────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────────┘

// CloudWatch Alarm 설정

resource "aws_cloudwatch_metric_alarm" "glue_job_failure" {
  alarm_name          = "glue-etl-job-failure"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 1
  metric_name         = "glue.driver.aggregate.numFailedTasks"
  namespace           = "Glue"
  period              = 300
  statistic           = "Sum"
  threshold           = 0
  alarm_description   = "Glue ETL job has failed tasks"
  
  dimensions = {
    JobName = aws_glue_job.etl.name
    Type    = "gauge"
  }

  alarm_actions = [aws_sns_topic.alerts.arn]
}

resource "aws_cloudwatch_metric_alarm" "kinesis_lag" {
  alarm_name          = "kinesis-consumer-lag"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 3
  metric_name         = "GetRecords.IteratorAgeMilliseconds"
  namespace           = "AWS/Kinesis"
  period              = 60
  statistic           = "Maximum"
  threshold           = 60000  # 1분 이상 지연
  
  dimensions = {
    StreamName = aws_kinesis_stream.events.name
  }

  alarm_actions = [aws_sns_topic.alerts.arn]
}

3.2 ML 모델 모니터링

SageMaker Model Monitor
┌─────────────────────────────────────────────────────────────────────┐
│                    Model Monitoring Types                            │
│                                                                      │
│  ┌─────────────────────────────────────────────────────────────────┐│
│  │  1. Data Quality Monitoring                                      ││
│  │     • 입력 데이터의 통계적 특성 변화 감지                         ││
│  │     • Baseline vs Current 비교                                   ││
│  │     • Missing values, data types, distribution                  ││
│  └─────────────────────────────────────────────────────────────────┘│
│                                                                      │
│  ┌─────────────────────────────────────────────────────────────────┐│
│  │  2. Model Quality Monitoring                                     ││
│  │     • 예측 정확도 추적 (Ground Truth 필요)                       ││
│  │     • Accuracy, Precision, Recall, F1                           ││
│  │     • Regression: MSE, MAE, R²                                  ││
│  └─────────────────────────────────────────────────────────────────┘│
│                                                                      │
│  ┌─────────────────────────────────────────────────────────────────┐│
│  │  3. Bias Drift Monitoring                                        ││
│  │     • 모델 편향 변화 감지                                        ││
│  │     • Demographic parity, Equalized odds                        ││
│  └─────────────────────────────────────────────────────────────────┘│
│                                                                      │
│  ┌─────────────────────────────────────────────────────────────────┐│
│  │  4. Feature Attribution Drift                                    ││
│  │     • Feature importance 변화 감지                               ││
│  │     • SHAP values 비교                                           ││
│  └─────────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────────┘

// Model Monitor 설정

from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

# Data Quality Monitor
data_quality_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600
)

# Baseline 생성
data_quality_monitor.suggest_baseline(
    baseline_dataset=baseline_data_uri,
    dataset_format=DatasetFormat.csv(header=True)
)

# 모니터링 스케줄 생성
data_quality_monitor.create_monitoring_schedule(
    monitor_schedule_name="data-quality-schedule",
    endpoint_input=endpoint_name,
    output_s3_uri=f"s3://{bucket}/monitoring/data-quality",
    statistics=data_quality_monitor.baseline_statistics(),
    constraints=data_quality_monitor.suggested_constraints(),
    schedule_cron_expression="cron(0 * ? * * *)"  # 매시간
)

4. 핵심 요약

DataOps & MLOps 핵심 포인트

DataOps

  • • ETL 코드, 스키마, 설정을 Git으로 버전 관리
  • • CodePipeline + CodeBuild로 CI/CD 자동화
  • • 데이터 품질 테스트를 파이프라인에 통합
  • • Dev → Stage → Prod 환경 분리

MLOps (SageMaker)

  • • SageMaker Pipelines: ML 워크플로우 자동화
  • • Model Registry: 모델 버전 관리 및 승인 워크플로우
  • • Feature Store: 피처 중앙 저장소 (Online/Offline)
  • • 조건부 배포: 정확도 기준 충족 시 자동 배포

모니터링

  • • CloudWatch: Glue, Kinesis, Redshift 메트릭
  • • Kinesis IteratorAge: Consumer Lag 모니터링
  • • Model Monitor: Data/Model Quality, Bias Drift
  • • 알림: CloudWatch Alarm → SNS → Slack/PagerDuty

🎯 시험 포인트

SageMaker Pipelines

ML 워크플로우 오케스트레이션, 조건부 단계 지원

Feature Store

Online (실시간) + Offline (배치) 이중 저장소

Model Monitor

Data Quality, Model Quality, Bias, Feature Attribution

Kinesis Lag

IteratorAgeMilliseconds로 Consumer 지연 감지

AWS 서비스 매핑

기능AWS 서비스
CI/CDCodePipeline, CodeBuild, CodeCommit
ML 파이프라인SageMaker Pipelines, Step Functions
피처 저장소SageMaker Feature Store
모델 레지스트리SageMaker Model Registry
모델 모니터링SageMaker Model Monitor
인프라 모니터링CloudWatch, EventBridge, SNS

5. 인프라 자동화 (IaC)

데이터 플랫폼 인프라를 코드로 관리하면 일관성, 재현성, 버전 관리가 가능합니다. Terraform, CDK, CloudFormation을 활용한 IaC 패턴을 살펴봅니다.

데이터 플랫폼 IaC 구조
┌─────────────────────────────────────────────────────────────────────────────┐
│                    Data Platform IaC Structure                               │
│                                                                              │
│  terraform/                                                                  │
│  ├── environments/                                                           │
│  │   ├── dev/                                                               │
│  │   │   ├── main.tf                                                        │
│  │   │   ├── variables.tf                                                   │
│  │   │   └── terraform.tfvars                                               │
│  │   ├── staging/                                                           │
│  │   └── prod/                                                              │
│  │                                                                           │
│  ├── modules/                                                                │
│  │   ├── data-lake/                                                         │
│  │   │   ├── s3.tf           # S3 버킷, 라이프사이클                        │
│  │   │   ├── glue.tf         # Glue Catalog, Crawlers                       │
│  │   │   ├── lake-formation.tf                                              │
│  │   │   └── outputs.tf                                                     │
│  │   │                                                                       │
│  │   ├── streaming/                                                         │
│  │   │   ├── kinesis.tf      # Kinesis Streams, Firehose                   │
│  │   │   ├── msk.tf          # MSK Cluster                                  │
│  │   │   └── flink.tf        # Managed Flink                                │
│  │   │                                                                       │
│  │   ├── analytics/                                                         │
│  │   │   ├── redshift.tf     # Redshift Cluster/Serverless                 │
│  │   │   ├── athena.tf       # Athena Workgroups                           │
│  │   │   └── quicksight.tf   # QuickSight                                  │
│  │   │                                                                       │
│  │   └── governance/                                                        │
│  │       ├── iam.tf          # IAM Roles, Policies                         │
│  │       ├── kms.tf          # KMS Keys                                     │
│  │       └── monitoring.tf   # CloudWatch, Alarms                          │
│  │                                                                           │
│  └── shared/                                                                 │
│      ├── vpc.tf              # VPC, Subnets, Endpoints                      │
│      └── security-groups.tf                                                 │
└─────────────────────────────────────────────────────────────────────────────┘

5.1 Terraform 모듈 예시

Data Lake 모듈

// modules/data-lake/main.tf

# S3 Data Lake Buckets
resource "aws_s3_bucket" "data_lake" {
  for_each = toset(["raw", "curated", "analytics"])
  
  bucket = "${var.project_name}-${each.key}-${var.environment}"
  
  tags = merge(var.tags, {
    Layer = each.key
  })
}

resource "aws_s3_bucket_versioning" "data_lake" {
  for_each = aws_s3_bucket.data_lake
  bucket   = each.value.id
  
  versioning_configuration {
    status = "Enabled"
  }
}

resource "aws_s3_bucket_lifecycle_configuration" "data_lake" {
  for_each = aws_s3_bucket.data_lake
  bucket   = each.value.id

  rule {
    id     = "transition-to-ia"
    status = "Enabled"

    transition {
      days          = 90
      storage_class = "STANDARD_IA"
    }

    transition {
      days          = 180
      storage_class = "GLACIER"
    }

    noncurrent_version_expiration {
      noncurrent_days = 30
    }
  }
}

# Glue Database
resource "aws_glue_catalog_database" "data_lake" {
  for_each = toset(["raw", "curated", "analytics"])
  
  name = "${var.project_name}_${each.key}_${var.environment}"
  
  create_table_default_permission {
    permissions = ["ALL"]
    
    principal {
      data_lake_principal_identifier = "IAM_ALLOWED_PRINCIPALS"
    }
  }
}

# Lake Formation Registration
resource "aws_lakeformation_resource" "data_lake" {
  for_each = aws_s3_bucket.data_lake
  
  arn = each.value.arn
}

# Glue Crawler
resource "aws_glue_crawler" "data_lake" {
  for_each = {
    raw      = { schedule = "cron(0 */6 * * ? *)", path = "raw/" }
    curated  = { schedule = "cron(0 */2 * * ? *)", path = "curated/" }
  }
  
  name          = "${var.project_name}-${each.key}-crawler"
  database_name = aws_glue_catalog_database.data_lake[each.key].name
  role          = var.glue_role_arn
  schedule      = each.value.schedule

  s3_target {
    path = "s3://${aws_s3_bucket.data_lake[each.key].id}/${each.value.path}"
  }

  schema_change_policy {
    delete_behavior = "LOG"
    update_behavior = "UPDATE_IN_DATABASE"
  }

  configuration = jsonencode({
    Version = 1.0
    Grouping = {
      TableGroupingPolicy = "CombineCompatibleSchemas"
    }
  })
}

5.2 AWS CDK 예시

CDK Data Pipeline Stack

// TypeScript CDK

import * as cdk from 'aws-cdk-lib';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as glue from 'aws-cdk-lib/aws-glue';
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
import * as firehose from 'aws-cdk-lib/aws-kinesisfirehose';
import * as iam from 'aws-cdk-lib/aws-iam';

export class DataPipelineStack extends cdk.Stack {
  constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // S3 Data Lake
    const rawBucket = new s3.Bucket(this, 'RawBucket', {
      bucketName: `data-lake-raw-${this.account}`,
      versioned: true,
      encryption: s3.BucketEncryption.KMS_MANAGED,
      lifecycleRules: [
        {
          transitions: [
            { storageClass: s3.StorageClass.INFREQUENT_ACCESS, transitionAfter: cdk.Duration.days(90) },
            { storageClass: s3.StorageClass.GLACIER, transitionAfter: cdk.Duration.days(180) }
          ]
        }
      ]
    });

    // Kinesis Stream
    const eventStream = new kinesis.Stream(this, 'EventStream', {
      streamName: 'user-events',
      shardCount: 4,
      retentionPeriod: cdk.Duration.hours(24)
    });

    // Firehose Delivery Stream
    const firehoseRole = new iam.Role(this, 'FirehoseRole', {
      assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com')
    });
    rawBucket.grantWrite(firehoseRole);
    eventStream.grantRead(firehoseRole);

    const deliveryStream = new firehose.CfnDeliveryStream(this, 'DeliveryStream', {
      deliveryStreamName: 'events-to-s3',
      deliveryStreamType: 'KinesisStreamAsSource',
      kinesisStreamSourceConfiguration: {
        kinesisStreamArn: eventStream.streamArn,
        roleArn: firehoseRole.roleArn
      },
      extendedS3DestinationConfiguration: {
        bucketArn: rawBucket.bucketArn,
        roleArn: firehoseRole.roleArn,
        prefix: 'events/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/',
        errorOutputPrefix: 'errors/',
        bufferingHints: {
          intervalInSeconds: 300,
          sizeInMBs: 128
        },
        compressionFormat: 'GZIP',
        dataFormatConversionConfiguration: {
          enabled: true,
          inputFormatConfiguration: { deserializer: { openXJsonSerDe: {} } },
          outputFormatConfiguration: { serializer: { parquetSerDe: { compression: 'SNAPPY' } } },
          schemaConfiguration: {
            databaseName: 'raw_db',
            tableName: 'events',
            roleArn: firehoseRole.roleArn
          }
        }
      }
    });

    // Glue Database
    const database = new glue.CfnDatabase(this, 'GlueDatabase', {
      catalogId: this.account,
      databaseInput: {
        name: 'raw_db',
        description: 'Raw data lake database'
      }
    });

    // Outputs
    new cdk.CfnOutput(this, 'RawBucketName', { value: rawBucket.bucketName });
    new cdk.CfnOutput(this, 'StreamName', { value: eventStream.streamName });
  }
}

IaC 도구 비교

특성TerraformAWS CDKCloudFormation
언어HCLTypeScript, Python 등YAML/JSON
멀티 클라우드❌ (AWS only)❌ (AWS only)
상태 관리tfstate (S3)CloudFormation내장
권장 사용멀티 클라우드, 팀 표준복잡한 로직, 개발자 친화AWS 네이티브

6. 핵심 요약

DataOps & MLOps 핵심 포인트

DataOps

  • • ETL 코드, 스키마, 설정을 Git으로 버전 관리
  • • CodePipeline + CodeBuild로 CI/CD 자동화
  • • 데이터 품질 테스트를 파이프라인에 통합
  • • Dev → Stage → Prod 환경 분리
  • • 블루/그린 배포로 무중단 업데이트

MLOps (SageMaker)

  • • SageMaker Pipelines: ML 워크플로우 자동화
  • • Model Registry: 모델 버전 관리 및 승인 워크플로우
  • • Feature Store: 피처 중앙 저장소 (Online/Offline)
  • • 조건부 배포: 정확도 기준 충족 시 자동 배포
  • • A/B 테스트: 엔드포인트 트래픽 분할

모니터링

  • • CloudWatch: Glue, Kinesis, Redshift 메트릭
  • • Kinesis IteratorAge: Consumer Lag 모니터링
  • • Model Monitor: Data/Model Quality, Bias Drift
  • • 알림: CloudWatch Alarm → SNS → Slack/PagerDuty

인프라 자동화

  • • Terraform: 멀티 클라우드, 모듈화
  • • AWS CDK: 프로그래밍 언어로 인프라 정의
  • • CloudFormation: AWS 네이티브
  • • 환경별 변수 분리 (dev/staging/prod)

🎯 시험 포인트

SageMaker Pipelines

ML 워크플로우 오케스트레이션, 조건부 단계 지원

Feature Store

Online (실시간) + Offline (배치) 이중 저장소

Model Monitor

Data Quality, Model Quality, Bias, Feature Attribution

Kinesis Lag

IteratorAgeMilliseconds로 Consumer 지연 감지

CI/CD for Data

CodePipeline + CodeBuild + 데이터 품질 테스트

IaC 선택

Terraform = 멀티클라우드, CDK = 복잡한 로직

AWS 서비스 매핑

기능AWS 서비스대안
CI/CDCodePipeline, CodeBuildGitHub Actions, Jenkins
ML 파이프라인SageMaker PipelinesStep Functions, Airflow
피처 저장소SageMaker Feature StoreFeast, Tecton
모델 레지스트리SageMaker Model RegistryMLflow
모델 모니터링SageMaker Model MonitorEvidently, WhyLabs
인프라 모니터링CloudWatch, EventBridgeDatadog, Grafana
IaCCloudFormation, CDKTerraform, Pulumi

DataOps/MLOps 성숙도 모델

┌─────────────────────────────────────────────────────────────────────────────┐
│                    DataOps/MLOps Maturity Model                              │
│                                                                              │
│  Level 5: Optimized                                                          │
│  ┌─────────────────────────────────────────────────────────────────────────┐│
│  │  • 자동 스케일링, 자가 치유                                              ││
│  │  • AI 기반 이상 탐지 및 자동 조치                                        ││
│  │  • 비용 최적화 자동화                                                    ││
│  └─────────────────────────────────────────────────────────────────────────┘│
│                                                                              │
│  Level 4: Managed                                                            │
│  ┌─────────────────────────────────────────────────────────────────────────┐│
│  │  • 완전 자동화된 CI/CD                                                   ││
│  │  • 모델 드리프트 자동 감지 및 재학습                                     ││
│  │  • 데이터 품질 게이트 통합                                               ││
│  └─────────────────────────────────────────────────────────────────────────┘│
│                                                                              │
│  Level 3: Defined                                                            │
│  ┌─────────────────────────────────────────────────────────────────────────┐│
│  │  • 표준화된 파이프라인 템플릿                                            ││
│  │  • 중앙 집중식 모니터링                                                  ││
│  │  • 환경 분리 (Dev/Stage/Prod)                                           ││
│  └─────────────────────────────────────────────────────────────────────────┘│
│                                                                              │
│  Level 2: Repeatable                                                         │
│  ┌─────────────────────────────────────────────────────────────────────────┐│
│  │  • 버전 관리 (Git)                                                       ││
│  │  • 기본 CI/CD                                                            ││
│  │  • 수동 테스트                                                           ││
│  └─────────────────────────────────────────────────────────────────────────┘│
│                                                                              │
│  Level 1: Initial                                                            │
│  ┌─────────────────────────────────────────────────────────────────────────┐│
│  │  • 수동 배포                                                             ││
│  │  • Ad-hoc 프로세스                                                       ││
│  │  • 문서화 부족                                                           ││
│  └─────────────────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────────────────┘

7. 데이터 파이프라인 테스트

데이터 파이프라인의 품질을 보장하기 위해 다양한 수준의 테스트가 필요합니다. 단위 테스트부터 통합 테스트, 데이터 품질 테스트까지 살펴봅니다.

테스트 피라미드
┌─────────────────────────────────────────────────────────────────────────────┐
│                    Data Pipeline Test Pyramid                                │
│                                                                              │
│                           ┌─────────────┐                                   │
│                           │   E2E Test  │  • 전체 파이프라인 검증           │
│                           │             │  • 프로덕션 유사 환경             │
│                           └─────────────┘  • 느림, 비용 높음                │
│                                                                              │
│                      ┌─────────────────────┐                                │
│                      │  Integration Test   │  • 서비스 간 연동 검증         │
│                      │                     │  • 실제 AWS 서비스 사용        │
│                      └─────────────────────┘  • 중간 속도/비용              │
│                                                                              │
│                 ┌───────────────────────────────┐                           │
│                 │      Data Quality Test        │  • 데이터 품질 규칙       │
│                 │                               │  • 스키마 검증            │
│                 └───────────────────────────────┘  • 빠름                   │
│                                                                              │
│            ┌─────────────────────────────────────────┐                      │
│            │           Unit Test                     │  • ETL 로직 검증     │
│            │                                         │  • 변환 함수 테스트  │
│            └─────────────────────────────────────────┘  • 가장 빠름         │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

7.1 Glue ETL 단위 테스트

pytest를 활용한 테스트

// ETL 변환 함수

# etl/transformations.py
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, when, lit, to_date

def clean_customer_data(df: DataFrame) -> DataFrame:
    """고객 데이터 정제"""
    return df \
        .filter(col("customer_id").isNotNull()) \
        .filter(col("email").rlike("^[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+$")) \
        .withColumn("status", 
            when(col("status").isNull(), lit("active"))
            .otherwise(col("status"))) \
        .dropDuplicates(["customer_id"])

def calculate_order_metrics(df: DataFrame) -> DataFrame:
    """주문 메트릭 계산"""
    return df \
        .withColumn("order_date", to_date(col("order_timestamp"))) \
        .withColumn("total_amount", col("quantity") * col("unit_price")) \
        .withColumn("discount_amount", 
            when(col("discount_rate").isNotNull(), 
                 col("total_amount") * col("discount_rate"))
            .otherwise(lit(0))) \
        .withColumn("final_amount", 
            col("total_amount") - col("discount_amount"))

// 단위 테스트

# tests/test_transformations.py
import pytest
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from etl.transformations import clean_customer_data, calculate_order_metrics

@pytest.fixture(scope="session")
def spark():
    return SparkSession.builder \
        .master("local[*]") \
        .appName("ETL Tests") \
        .getOrCreate()

class TestCleanCustomerData:
    def test_removes_null_customer_id(self, spark):
        # Given
        data = [
            ("C001", "test@example.com", "active"),
            (None, "test2@example.com", "active"),
        ]
        df = spark.createDataFrame(data, ["customer_id", "email", "status"])
        
        # When
        result = clean_customer_data(df)
        
        # Then
        assert result.count() == 1
        assert result.first()["customer_id"] == "C001"

    def test_filters_invalid_email(self, spark):
        # Given
        data = [
            ("C001", "valid@example.com", "active"),
            ("C002", "invalid-email", "active"),
        ]
        df = spark.createDataFrame(data, ["customer_id", "email", "status"])
        
        # When
        result = clean_customer_data(df)
        
        # Then
        assert result.count() == 1

    def test_fills_null_status(self, spark):
        # Given
        data = [("C001", "test@example.com", None)]
        df = spark.createDataFrame(data, ["customer_id", "email", "status"])
        
        # When
        result = clean_customer_data(df)
        
        # Then
        assert result.first()["status"] == "active"

class TestCalculateOrderMetrics:
    def test_calculates_total_amount(self, spark):
        # Given
        data = [(10, 100.0, None)]
        schema = StructType([
            StructField("quantity", DoubleType()),
            StructField("unit_price", DoubleType()),
            StructField("discount_rate", DoubleType())
        ])
        df = spark.createDataFrame(data, schema)
        
        # When
        result = calculate_order_metrics(df)
        
        # Then
        assert result.first()["total_amount"] == 1000.0
        assert result.first()["discount_amount"] == 0
        assert result.first()["final_amount"] == 1000.0

    def test_applies_discount(self, spark):
        # Given
        data = [(10, 100.0, 0.1)]  # 10% 할인
        schema = StructType([
            StructField("quantity", DoubleType()),
            StructField("unit_price", DoubleType()),
            StructField("discount_rate", DoubleType())
        ])
        df = spark.createDataFrame(data, schema)
        
        # When
        result = calculate_order_metrics(df)
        
        # Then
        assert result.first()["discount_amount"] == 100.0
        assert result.first()["final_amount"] == 900.0

7.2 데이터 품질 테스트

Great Expectations 통합

// Great Expectations 설정

# expectations/orders_expectations.py
import great_expectations as gx

def create_orders_expectations(context):
    """주문 데이터 기대치 정의"""
    
    suite = context.add_expectation_suite("orders_suite")
    
    # 필수 컬럼 존재
    suite.add_expectation(
        gx.expectations.ExpectColumnToExist(column="order_id")
    )
    suite.add_expectation(
        gx.expectations.ExpectColumnToExist(column="customer_id")
    )
    
    # Null 검사
    suite.add_expectation(
        gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
    )
    
    # 고유성 검사
    suite.add_expectation(
        gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
    )
    
    # 값 범위 검사
    suite.add_expectation(
        gx.expectations.ExpectColumnValuesToBeBetween(
            column="amount",
            min_value=0,
            max_value=1000000
        )
    )
    
    # 값 집합 검사
    suite.add_expectation(
        gx.expectations.ExpectColumnValuesToBeInSet(
            column="status",
            value_set=["pending", "completed", "cancelled", "refunded"]
        )
    )
    
    # 날짜 형식 검사
    suite.add_expectation(
        gx.expectations.ExpectColumnValuesToMatchStrftimeFormat(
            column="order_date",
            strftime_format="%Y-%m-%d"
        )
    )
    
    # 행 수 검사
    suite.add_expectation(
        gx.expectations.ExpectTableRowCountToBeBetween(
            min_value=1000,
            max_value=10000000
        )
    )
    
    return suite

# CI/CD에서 실행
def validate_data(df, context):
    """데이터 검증 실행"""
    results = context.run_validation_operator(
        "action_list_operator",
        assets_to_validate=[df],
        expectation_suite_name="orders_suite"
    )
    
    if not results["success"]:
        failed_expectations = [
            r for r in results["results"] 
            if not r["success"]
        ]
        raise ValueError(f"Data validation failed: {failed_expectations}")
    
    return results

테스트 유형

  • • 스키마 검증
  • • Null 검사
  • • 고유성 검사
  • • 값 범위 검사
  • • 참조 무결성

CI/CD 통합

  • • PR 시 자동 테스트
  • • 배포 전 품질 게이트
  • • 테스트 리포트 생성
  • • 실패 시 알림

8. 종합 핵심 요약

DataOps & MLOps 종합

DataOps

  • • ETL 코드, 스키마, 설정을 Git으로 버전 관리
  • • CodePipeline + CodeBuild로 CI/CD 자동화
  • • 데이터 품질 테스트를 파이프라인에 통합
  • • Dev → Stage → Prod 환경 분리
  • • 블루/그린 배포로 무중단 업데이트

MLOps (SageMaker)

  • • SageMaker Pipelines: ML 워크플로우 자동화
  • • Model Registry: 모델 버전 관리 및 승인 워크플로우
  • • Feature Store: 피처 중앙 저장소 (Online/Offline)
  • • 조건부 배포: 정확도 기준 충족 시 자동 배포
  • • A/B 테스트: 엔드포인트 트래픽 분할

모니터링

  • • CloudWatch: Glue, Kinesis, Redshift 메트릭
  • • Kinesis IteratorAge: Consumer Lag 모니터링
  • • Model Monitor: Data/Model Quality, Bias Drift
  • • 알림: CloudWatch Alarm → SNS → Slack/PagerDuty

인프라 자동화

  • • Terraform: 멀티 클라우드, 모듈화
  • • AWS CDK: 프로그래밍 언어로 인프라 정의
  • • CloudFormation: AWS 네이티브
  • • 환경별 변수 분리 (dev/staging/prod)

테스트

  • • 단위 테스트: ETL 변환 로직 검증
  • • 데이터 품질 테스트: Great Expectations
  • • 통합 테스트: 서비스 간 연동 검증
  • • E2E 테스트: 전체 파이프라인 검증

🎯 시험 포인트

SageMaker Pipelines

ML 워크플로우 오케스트레이션, 조건부 단계 지원

Feature Store

Online (실시간) + Offline (배치) 이중 저장소

Model Monitor

Data Quality, Model Quality, Bias, Feature Attribution

Kinesis Lag

IteratorAgeMilliseconds로 Consumer 지연 감지

CI/CD for Data

CodePipeline + CodeBuild + 데이터 품질 테스트

IaC 선택

Terraform = 멀티클라우드, CDK = 복잡한 로직

DataOps/MLOps 성숙도 체크리스트

Level 1-2: 기초
  • ☐ Git 버전 관리
  • ☐ 기본 CI/CD 파이프라인
  • ☐ 환경 분리 (Dev/Prod)
  • ☐ 기본 모니터링
Level 3-4: 성숙
  • ☐ 자동화된 테스트
  • ☐ 데이터 품질 게이트
  • ☐ 모델 레지스트리
  • ☐ 드리프트 모니터링
Level 5: 최적화
  • ☐ 자동 재학습
  • ☐ 자가 치유 파이프라인
  • ☐ 비용 최적화 자동화
  • ☐ AI 기반 이상 탐지
공통
  • ☐ 문서화
  • ☐ 팀 교육
  • ☐ 표준화된 템플릿
  • ☐ 정기 리뷰

9. CI/CD 파이프라인 심화

데이터 파이프라인의 CI/CD는 코드 변경부터 프로덕션 배포까지 자동화된 워크플로우를 구축합니다. AWS CodePipeline과 GitHub Actions를 활용한 구현 방법을 살펴봅니다.

AWS CodePipeline 구성
┌─────────────────────────────────────────────────────────────────────────────┐
│                    Data Pipeline CI/CD Flow                                  │
│                                                                              │
│  ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐   │
│  │  Source │───▶│  Build  │───▶│  Test   │───▶│ Staging │───▶│  Prod   │   │
│  │ (Git)   │    │(CodeBuild)   │(Quality)│    │ Deploy  │    │ Deploy  │   │
│  └─────────┘    └─────────┘    └─────────┘    └─────────┘    └─────────┘   │
│       │              │              │              │              │          │
│       ▼              ▼              ▼              ▼              ▼          │
│  • CodeCommit   • Unit Test    • Data Quality • Glue Job    • Manual       │
│  • GitHub       • Lint         • Schema       • Step Func   • Approval     │
│  • S3           • Package      • Integration  • Lambda      • Canary       │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

// Terraform CodePipeline 구성

resource "aws_codepipeline" "data_pipeline" {
  name     = "data-pipeline-cicd"
  role_arn = aws_iam_role.codepipeline.arn

  artifact_store {
    location = aws_s3_bucket.artifacts.bucket
    type     = "S3"
    
    encryption_key {
      id   = aws_kms_key.artifacts.arn
      type = "KMS"
    }
  }

  # Source Stage
  stage {
    name = "Source"
    
    action {
      name             = "Source"
      category         = "Source"
      owner            = "AWS"
      provider         = "CodeStarSourceConnection"
      version          = "1"
      output_artifacts = ["source_output"]
      
      configuration = {
        ConnectionArn    = aws_codestarconnections_connection.github.arn
        FullRepositoryId = "org/data-platform"
        BranchName       = "main"
      }
    }
  }

  # Build Stage
  stage {
    name = "Build"
    
    action {
      name             = "Build"
      category         = "Build"
      owner            = "AWS"
      provider         = "CodeBuild"
      input_artifacts  = ["source_output"]
      output_artifacts = ["build_output"]
      version          = "1"
      
      configuration = {
        ProjectName = aws_codebuild_project.build.name
      }
    }
  }

  # Test Stage
  stage {
    name = "Test"
    
    action {
      name            = "UnitTest"
      category        = "Test"
      owner           = "AWS"
      provider        = "CodeBuild"
      input_artifacts = ["build_output"]
      version         = "1"
      run_order       = 1
      
      configuration = {
        ProjectName = aws_codebuild_project.unit_test.name
      }
    }
    
    action {
      name            = "DataQualityTest"
      category        = "Test"
      owner           = "AWS"
      provider        = "CodeBuild"
      input_artifacts = ["build_output"]
      version         = "1"
      run_order       = 2
      
      configuration = {
        ProjectName = aws_codebuild_project.data_quality.name
      }
    }
  }

  # Staging Deploy
  stage {
    name = "Staging"
    
    action {
      name            = "DeployToStaging"
      category        = "Deploy"
      owner           = "AWS"
      provider        = "CloudFormation"
      input_artifacts = ["build_output"]
      version         = "1"
      
      configuration = {
        ActionMode    = "CREATE_UPDATE"
        StackName     = "data-pipeline-staging"
        TemplatePath  = "build_output::template.yaml"
        Capabilities  = "CAPABILITY_IAM,CAPABILITY_AUTO_EXPAND"
        RoleArn       = aws_iam_role.cloudformation.arn
        ParameterOverrides = jsonencode({
          Environment = "staging"
        })
      }
    }
  }

  # Production Deploy with Approval
  stage {
    name = "Production"
    
    action {
      name     = "ManualApproval"
      category = "Approval"
      owner    = "AWS"
      provider = "Manual"
      version  = "1"
      run_order = 1
      
      configuration = {
        NotificationArn = aws_sns_topic.approval.arn
        CustomData      = "Please review staging deployment before approving production"
      }
    }
    
    action {
      name            = "DeployToProduction"
      category        = "Deploy"
      owner           = "AWS"
      provider        = "CloudFormation"
      input_artifacts = ["build_output"]
      version         = "1"
      run_order       = 2
      
      configuration = {
        ActionMode    = "CREATE_UPDATE"
        StackName     = "data-pipeline-production"
        TemplatePath  = "build_output::template.yaml"
        Capabilities  = "CAPABILITY_IAM,CAPABILITY_AUTO_EXPAND"
        RoleArn       = aws_iam_role.cloudformation.arn
        ParameterOverrides = jsonencode({
          Environment = "production"
        })
      }
    }
  }
}

9.1 CodeBuild 프로젝트

빌드 및 테스트 설정

// buildspec.yml

version: 0.2

env:
  variables:
    PYTHON_VERSION: "3.9"
  secrets-manager:
    DB_PASSWORD: "prod/database:password"

phases:
  install:
    runtime-versions:
      python: 3.9
    commands:
      - pip install --upgrade pip
      - pip install -r requirements.txt
      - pip install pytest pytest-cov great_expectations

  pre_build:
    commands:
      - echo "Running linting..."
      - flake8 src/ --max-line-length=120
      - black --check src/
      - echo "Running security scan..."
      - bandit -r src/

  build:
    commands:
      - echo "Running unit tests..."
      - pytest tests/unit/ -v --cov=src --cov-report=xml
      - echo "Packaging Glue jobs..."
      - zip -r glue_jobs.zip src/glue/
      - aws s3 cp glue_jobs.zip s3://${ARTIFACT_BUCKET}/glue/${CODEBUILD_RESOLVED_SOURCE_VERSION}/

  post_build:
    commands:
      - echo "Generating CloudFormation template..."
      - python scripts/generate_cfn.py
      - echo "Build completed on $(date)"

reports:
  pytest_reports:
    files:
      - "coverage.xml"
    file-format: COBERTURAXML

artifacts:
  files:
    - template.yaml
    - parameters/*.json
  discard-paths: no

cache:
  paths:
    - '/root/.cache/pip/**/*'

// 데이터 품질 테스트 buildspec

version: 0.2

phases:
  install:
    commands:
      - pip install great_expectations boto3 pandas

  build:
    commands:
      - echo "Running data quality tests..."
      - python scripts/run_data_quality.py
      
      # Great Expectations 검증
      - |
        python << EOF
        import great_expectations as gx
        import sys
        
        context = gx.get_context()
        
        # 스테이징 데이터 검증
        checkpoint_result = context.run_checkpoint(
            checkpoint_name="staging_checkpoint"
        )
        
        if not checkpoint_result.success:
            print("Data quality check failed!")
            sys.exit(1)
        
        print("Data quality check passed!")
        EOF

  post_build:
    commands:
      - echo "Generating data quality report..."
      - great_expectations docs build --no-view

9.2 GitHub Actions 대안

GitHub Actions 워크플로우

// .github/workflows/data-pipeline.yml

name: Data Pipeline CI/CD

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

env:
  AWS_REGION: ap-northeast-2

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.9'
          cache: 'pip'
      
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov flake8 black
      
      - name: Lint
        run: |
          flake8 src/ --max-line-length=120
          black --check src/
      
      - name: Unit Tests
        run: pytest tests/unit/ -v --cov=src --cov-report=xml
      
      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          files: ./coverage.xml

  deploy-staging:
    needs: test
    if: github.ref == 'refs/heads/develop'
    runs-on: ubuntu-latest
    environment: staging
    
    steps:
      - uses: actions/checkout@v4
      
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: ${{ secrets.AWS_ROLE_ARN_STAGING }}
          aws-region: ${{ env.AWS_REGION }}
      
      - name: Deploy to Staging
        run: |
          aws cloudformation deploy \
            --template-file template.yaml \
            --stack-name data-pipeline-staging \
            --parameter-overrides Environment=staging \
            --capabilities CAPABILITY_IAM

  deploy-production:
    needs: test
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    environment: production
    
    steps:
      - uses: actions/checkout@v4
      
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: ${{ secrets.AWS_ROLE_ARN_PROD }}
          aws-region: ${{ env.AWS_REGION }}
      
      - name: Deploy to Production
        run: |
          aws cloudformation deploy \
            --template-file template.yaml \
            --stack-name data-pipeline-production \
            --parameter-overrides Environment=production \
            --capabilities CAPABILITY_IAM

CodePipeline 장점

  • • AWS 네이티브 통합
  • • IAM 역할 기반 보안
  • • CloudWatch 통합
  • • 수동 승인 단계

GitHub Actions 장점

  • • 코드와 함께 버전 관리
  • • 풍부한 마켓플레이스
  • • 매트릭스 빌드
  • • 커뮤니티 지원