DataOps is a methodology that applies DevOps principles to data pipelines to improve data quality, delivery speed, and collaboration.
┌─────────────────────────────────────────────────────────────────────┐
│ DataOps Pipeline │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Code │───▶│ Build │───▶│ Test │───▶│ Deploy │ │
│ │ │ │ │ │ │ │ │ │
│ │ • ETL │ │ • Package│ │ • Unit │ │ • Dev │ │
│ │ • Schema │ │ • Docker │ │ • Data │ │ • Stage │ │
│ │ • Config │ │ • Glue │ │ • Quality│ │ • Prod │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
│ │ │ │
│ │ ┌──────────┐ │ │
│ └──────────────│ Monitor │◀────────────────────┘ │
│ │ │ │
│ │ • Metrics│ │
│ │ • Alerts │ │
│ │ • Logs │ │
│ └──────────┘ │
└─────────────────────────────────────────────────────────────────────┘
Manage ETL code, schemas, and configuration in Git
Integrate data quality validation into CI/CD
Deploy in stages: Dev → Stage → Prod
// CodePipeline for Glue ETL
# buildspec.yml for Glue ETL
version: 0.2
phases:
install:
runtime-versions:
python: 3.9
pre_build:
commands:
- pip install pytest boto3 awsglue-local
build:
commands:
# Unit tests
- pytest tests/unit/ -v
# Data quality tests
- python tests/data_quality_check.py
post_build:
commands:
      # Deploy Glue job (JobUpdate overwrites the whole job definition, so Role and Command.Name are included;
      # GLUE_ROLE_ARN is assumed to be provided as a CodeBuild environment variable)
      - aws glue update-job --job-name ${GLUE_JOB_NAME} \
          --job-update "Role=${GLUE_ROLE_ARN},Command={Name=glueetl,ScriptLocation=s3://${BUCKET}/scripts/etl.py}"
# Update Glue Workflow
- aws glue update-workflow --name ${WORKFLOW_NAME}
artifacts:
files:
- scripts/**/*
    - tests/reports/**/*
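The buildspec above calls `tests/data_quality_check.py`, which is not shown. A minimal sketch of such a script, assuming a CSV sample of the curated output is available in S3 (the bucket, key, and rules below are placeholders):

// Sketch: tests/data_quality_check.py
import io
import sys

import boto3
import pandas as pd

BUCKET = "my-data-lake-curated"        # placeholder
KEY = "orders/sample/orders.csv"       # placeholder

def main() -> int:
    # Pull a sample of the curated data and apply simple quality rules
    s3 = boto3.client("s3")
    body = s3.get_object(Bucket=BUCKET, Key=KEY)["Body"].read()
    df = pd.read_csv(io.BytesIO(body))

    errors = []
    if df["order_id"].isnull().any():
        errors.append("order_id contains nulls")
    if df["order_id"].duplicated().any():
        errors.append("order_id contains duplicates")
    if (df["amount"] < 0).any():
        errors.append("amount contains negative values")

    if errors:
        print("Data quality check failed:", errors)
        return 1
    print("Data quality check passed")
    return 0

if __name__ == "__main__":
    sys.exit(main())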
resource "aws_codepipeline" "data_pipeline" {
name = "data-etl-pipeline"
role_arn = aws_iam_role.codepipeline.arn
stage {
name = "Source"
action {
name = "Source"
category = "Source"
owner = "AWS"
provider = "CodeCommit"
version = "1"
output_artifacts = ["source_output"]
configuration = {
RepositoryName = "data-etl-repo"
BranchName = "main"
}
}
}
stage {
name = "Test"
action {
name = "UnitTest"
category = "Build"
owner = "AWS"
provider = "CodeBuild"
input_artifacts = ["source_output"]
configuration = {
ProjectName = aws_codebuild_project.test.name
}
}
}
stage {
name = "Deploy-Dev"
action {
name = "DeployDev"
category = "Build"
owner = "AWS"
provider = "CodeBuild"
input_artifacts = ["source_output"]
configuration = {
ProjectName = aws_codebuild_project.deploy.name
EnvironmentVariables = jsonencode([
{ name = "ENV", value = "dev" }
])
}
}
}
}
┌─────────────────────────────────────────────────────────────────────┐
│ MLOps Pipeline │
│ │
│ Data Preparation Model Development Deployment │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Feature │ │ Training │ │ Endpoint │ │
│ │ Engineering │─────────▶│ & Tuning │─────────▶│ Deployment │ │
│ │ │ │ │ │ │ │
│ │ • S3 │ │ • SageMaker │ │ • Real-time │ │
│ │ • Glue │ │ • HPO │ │ • Batch │ │
│ │ • Feature │ │ • Experiments│ │ • Serverless│ │
│ │ Store │ │ │ │ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐│
│ │ Model Registry ││
│ │ • Version Control • Approval Workflow • Lineage Tracking ││
│ └─────────────────────────────────────────────────────────────────┘│
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐│
│ │ Model Monitoring ││
│ │ • Data Drift • Model Drift • Bias Detection ││
│ └─────────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────────┘
// SageMaker Pipeline definition
from sagemaker.inputs import TrainingInput
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.properties import PropertyFile
# sklearn_processor, xgb_estimator, deploy_step, and fail_step are assumed to be defined elsewhere
# 1. Data preprocessing step
processing_step = ProcessingStep(
name="PreprocessData",
processor=sklearn_processor,
inputs=[ProcessingInput(source=input_data, destination="/opt/ml/processing/input")],
outputs=[ProcessingOutput(output_name="train", source="/opt/ml/processing/train")],
code="preprocess.py"
)
# 2. Model training step
training_step = TrainingStep(
name="TrainModel",
estimator=xgb_estimator,
inputs={"train": TrainingInput(s3_data=processing_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri)}
)
# 3. Model evaluation step: evaluate.py writes evaluation.json, exposed to the pipeline via a PropertyFile
evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)
evaluation_step = ProcessingStep(
    name="EvaluateModel",
    processor=sklearn_processor,
    inputs=[ProcessingInput(source=training_step.properties.ModelArtifacts.S3ModelArtifacts, destination="/opt/ml/processing/model")],
    outputs=[ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation")],
    code="evaluate.py",
    property_files=[evaluation_report]
)
# 4. Conditional deployment (accuracy >= 0.8)
condition_step = ConditionStep(
    name="CheckAccuracy",
    conditions=[ConditionGreaterThanOrEqualTo(
        left=JsonGet(step_name=evaluation_step.name, property_file=evaluation_report, json_path="accuracy"),
        right=0.8
    )],
    if_steps=[register_step, deploy_step],
    else_steps=[fail_step]
)
# Create the pipeline
pipeline = Pipeline(
name="ml-training-pipeline",
steps=[processing_step, training_step, evaluation_step, condition_step],
sagemaker_session=sagemaker_session
)
pipeline.upsert(role_arn=role)
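The `register_step` referenced in the condition step above is not defined in this snippet. As a hedged sketch, it could register the trained model into SageMaker Model Registry roughly as follows; the package group name and instance types are assumptions, and `pipeline_session` is assumed to be a `PipelineSession`.

// Sketch: register_step for SageMaker Model Registry
from sagemaker.model import Model
from sagemaker.workflow.model_step import ModelStep

model = Model(
    image_uri=xgb_estimator.training_image_uri(),   # reuse the training container image
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,
    sagemaker_session=pipeline_session,
)
register_step = ModelStep(
    name="RegisterModel",
    step_args=model.register(
        content_types=["text/csv"],
        response_types=["text/csv"],
        inference_instances=["ml.m5.large"],             # assumed instance types
        transform_instances=["ml.m5.large"],
        model_package_group_name="ml-training-models",   # assumed package group name
        approval_status="PendingManualApproval",
    ),
)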
SageMaker Feature Store is a central repository for storing, sharing, and reusing ML features.
┌─────────────────────────────────────────────────────────────────────┐
│ SageMaker Feature Store │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐│
│ │ Feature Group: customer_features ││
│ │ ├── customer_id (identifier) ││
│ │ ├── total_purchases (numeric) ││
│ │ ├── avg_order_value (numeric) ││
│ │ ├── days_since_last_order (numeric) ││
│ │ └── event_time (timestamp) ││
│ └─────────────────────────────────────────────────────────────────┘│
│ │ │
│ ┌───────────────┴───────────────┐ │
│ ▼ ▼ │
│ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ Online Store │ │ Offline Store │ │
│ │ (Low Latency) │ │ (S3 + Glue) │ │
│ │ │ │ │ │
│ │ • Real-time │ │ • Batch Training │ │
│ │ Inference │ │ • Historical │ │
│ │ • < 10ms │ │ Analysis │ │
│ └─────────────────────┘ └─────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
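A minimal sketch of creating, ingesting into, and reading the `customer_features` group with the SageMaker Python SDK; `df` is assumed to be a pandas DataFrame with the columns shown above, and `bucket` and `role` are assumed to be defined.

// Sketch: customer_features feature group with online + offline stores
import boto3
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup

session = sagemaker.Session()
feature_group = FeatureGroup(name="customer_features", sagemaker_session=session)

feature_group.load_feature_definitions(data_frame=df)   # infer feature types from the DataFrame
feature_group.create(
    s3_uri=f"s3://{bucket}/feature-store/offline",       # offline store (S3 + Glue catalog)
    record_identifier_name="customer_id",
    event_time_feature_name="event_time",
    role_arn=role,
    enable_online_store=True,                            # also provision the low-latency online store
)
feature_group.ingest(data_frame=df, max_workers=4, wait=True)

# Low-latency lookup from the online store (e.g., at inference time)
featurestore_runtime = boto3.client("sagemaker-featurestore-runtime")
record = featurestore_runtime.get_record(
    FeatureGroupName="customer_features",
    RecordIdentifierValueAsString="C001",
)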
┌─────────────────────────────────────────────────────────────────────┐
│                     Data Pipeline Observability                     │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐│
│ │ Glue ETL Metrics ││
│ │ ├── glue.driver.aggregate.numCompletedTasks ││
│ │ ├── glue.driver.aggregate.numFailedTasks ││
│ │ ├── glue.driver.ExecutorAllocationManager.executors.numberAll ││
│ │ └── glue.driver.jvm.heap.usage ││
│ └─────────────────────────────────────────────────────────────────┘│
│ │
│ ┌─────────────────────────────────────────────────────────────────┐│
│ │ Kinesis Metrics ││
│ │ ├── IncomingRecords / IncomingBytes ││
│ │ ├── GetRecords.IteratorAgeMilliseconds (Consumer Lag) ││
│ │ ├── WriteProvisionedThroughputExceeded ││
│ │ └── ReadProvisionedThroughputExceeded ││
│ └─────────────────────────────────────────────────────────────────┘│
│ │
│ ┌─────────────────────────────────────────────────────────────────┐│
│ │ Redshift Metrics ││
│ │ ├── CPUUtilization / PercentageDiskSpaceUsed ││
│ │ ├── DatabaseConnections ││
│ │ ├── QueryDuration / QueriesCompletedPerSecond ││
│ │ └── WLMQueueLength ││
│ └─────────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────────┘
// CloudWatch alarm configuration
resource "aws_cloudwatch_metric_alarm" "glue_job_failure" {
alarm_name = "glue-etl-job-failure"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = 1
metric_name = "glue.driver.aggregate.numFailedTasks"
namespace = "Glue"
period = 300
statistic = "Sum"
threshold = 0
alarm_description = "Glue ETL job has failed tasks"
  dimensions = {
    JobName  = aws_glue_job.etl.name
    JobRunId = "ALL"      # aggregate across job runs
    Type     = "count"    # numFailedTasks is a count metric, not a gauge
  }
alarm_actions = [aws_sns_topic.alerts.arn]
}
resource "aws_cloudwatch_metric_alarm" "kinesis_lag" {
alarm_name = "kinesis-consumer-lag"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = 3
metric_name = "GetRecords.IteratorAgeMilliseconds"
namespace = "AWS/Kinesis"
period = 60
statistic = "Maximum"
  threshold           = 60000 # more than 1 minute of consumer lag
dimensions = {
StreamName = aws_kinesis_stream.events.name
}
alarm_actions = [aws_sns_topic.alerts.arn]
}
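Metric alarms can be complemented with EventBridge rules on Glue job state-change events. A hedged boto3 sketch follows; the rule name and SNS topic ARN are placeholders, and the Terraform equivalent would use aws_cloudwatch_event_rule and aws_cloudwatch_event_target.

// Sketch: route failed Glue runs to SNS via EventBridge
import json
import boto3

events = boto3.client("events")
SNS_TOPIC_ARN = "arn:aws:sns:ap-northeast-2:123456789012:data-alerts"  # placeholder

# Match Glue job runs that end in FAILED or TIMEOUT
events.put_rule(
    Name="glue-job-failed",
    EventPattern=json.dumps({
        "source": ["aws.glue"],
        "detail-type": ["Glue Job State Change"],
        "detail": {"state": ["FAILED", "TIMEOUT"]},
    }),
    State="ENABLED",
)
events.put_targets(
    Rule="glue-job-failed",
    Targets=[{"Id": "sns-alerts", "Arn": SNS_TOPIC_ARN}],
)
# The SNS topic's access policy must also allow events.amazonaws.com to publish.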
┌─────────────────────────────────────────────────────────────────────┐
│                       Model Monitoring Types                        │
│                                                                     │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ 1. Data Quality Monitoring                                      │ │
│ │    • Detects shifts in the statistical properties of inputs    │ │
│ │    • Baseline vs. current comparison                           │ │
│ │    • Missing values, data types, distribution                  │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│                                                                     │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ 2. Model Quality Monitoring                                     │ │
│ │    • Tracks prediction accuracy (requires ground truth)        │ │
│ │    • Accuracy, Precision, Recall, F1                           │ │
│ │    • Regression: MSE, MAE, R²                                  │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│                                                                     │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ 3. Bias Drift Monitoring                                        │ │
│ │    • Detects changes in model bias                             │ │
│ │    • Demographic parity, equalized odds                        │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│                                                                     │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ 4. Feature Attribution Drift                                    │ │
│ │    • Detects changes in feature importance                     │ │
│ │    • SHAP value comparison                                     │ │
│ └─────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
// Model Monitor configuration
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat
# Data Quality Monitor
data_quality_monitor = DefaultModelMonitor(
role=role,
instance_count=1,
instance_type="ml.m5.xlarge",
volume_size_in_gb=20,
max_runtime_in_seconds=3600
)
# Create the baseline
data_quality_monitor.suggest_baseline(
baseline_dataset=baseline_data_uri,
dataset_format=DatasetFormat.csv(header=True)
)
# Create the monitoring schedule
data_quality_monitor.create_monitoring_schedule(
monitor_schedule_name="data-quality-schedule",
endpoint_input=endpoint_name,
output_s3_uri=f"s3://{bucket}/monitoring/data-quality",
statistics=data_quality_monitor.baseline_statistics(),
constraints=data_quality_monitor.suggested_constraints(),
    schedule_cron_expression="cron(0 * ? * * *)"  # hourly
)
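Model quality monitoring, which needs ground-truth labels as noted in the diagram above, follows the same pattern. A hedged sketch, assuming a binary-classification endpoint, a baseline CSV that contains both label and prediction columns, and a separate process that uploads ground truth to S3; parameter names should be checked against the SDK version in use.

// Sketch: model quality monitoring with ground truth
from sagemaker.model_monitor import ModelQualityMonitor, EndpointInput
from sagemaker.model_monitor.dataset_format import DatasetFormat

model_quality_monitor = ModelQualityMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600
)
model_quality_monitor.suggest_baseline(
    baseline_dataset=baseline_with_predictions_uri,   # assumed: CSV with "label" and "prediction" columns
    dataset_format=DatasetFormat.csv(header=True),
    problem_type="BinaryClassification",
    inference_attribute="prediction",
    ground_truth_attribute="label"
)
model_quality_monitor.create_monitoring_schedule(
    monitor_schedule_name="model-quality-schedule",
    endpoint_input=EndpointInput(
        endpoint_name=endpoint_name,
        destination="/opt/ml/processing/input_data",
        inference_attribute="prediction"
    ),
    ground_truth_input=f"s3://{bucket}/monitoring/ground-truth",   # labels uploaded by a separate job
    problem_type="BinaryClassification",
    output_s3_uri=f"s3://{bucket}/monitoring/model-quality",
    constraints=model_quality_monitor.suggested_constraints(),
    schedule_cron_expression="cron(0 * ? * * *)"
)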
- SageMaker Pipelines: ML workflow orchestration with support for conditional steps
- Feature Store: dual online (real-time) and offline (batch) stores
- Model Monitor: data quality, model quality, bias, and feature attribution monitoring
- Kinesis lag: detect consumer lag with IteratorAgeMilliseconds

| Capability | AWS Service |
|---|---|
| CI/CD | CodePipeline, CodeBuild, CodeCommit |
| ML pipelines | SageMaker Pipelines, Step Functions |
| Feature store | SageMaker Feature Store |
| Model registry | SageMaker Model Registry |
| Model monitoring | SageMaker Model Monitor |
| Infrastructure monitoring | CloudWatch, EventBridge, SNS |
Managing data platform infrastructure as code gives you consistency, reproducibility, and version control. This section looks at IaC patterns using Terraform, CDK, and CloudFormation.
┌─────────────────────────────────────────────────────────────────────────────┐
│ Data Platform IaC Structure │
│ │
│ terraform/ │
│ ├── environments/ │
│ │ ├── dev/ │
│ │ │ ├── main.tf │
│ │ │ ├── variables.tf │
│ │ │ └── terraform.tfvars │
│ │ ├── staging/ │
│ │ └── prod/ │
│ │ │
│ ├── modules/ │
│ │ ├── data-lake/ │
│ │   ├── data-lake/                                                            │
│ │   │   ├── s3.tf              # S3 buckets, lifecycle rules                  │
│ │ │ ├── glue.tf # Glue Catalog, Crawlers │
│ │ │ ├── lake-formation.tf │
│ │ │ └── outputs.tf │
│ │ │ │
│ │ ├── streaming/ │
│ │ │ ├── kinesis.tf # Kinesis Streams, Firehose │
│ │ │ ├── msk.tf # MSK Cluster │
│ │ │ └── flink.tf # Managed Flink │
│ │ │ │
│ │ ├── analytics/ │
│ │ │ ├── redshift.tf # Redshift Cluster/Serverless │
│ │ │ ├── athena.tf # Athena Workgroups │
│ │ │ └── quicksight.tf # QuickSight │
│ │ │ │
│ │ └── governance/ │
│ │ ├── iam.tf # IAM Roles, Policies │
│ │ ├── kms.tf # KMS Keys │
│ │ └── monitoring.tf # CloudWatch, Alarms │
│ │ │
│ └── shared/ │
│ ├── vpc.tf # VPC, Subnets, Endpoints │
│ └── security-groups.tf │
└─────────────────────────────────────────────────────────────────────────────┘
// modules/data-lake/main.tf
# S3 Data Lake Buckets
resource "aws_s3_bucket" "data_lake" {
for_each = toset(["raw", "curated", "analytics"])
bucket = "${var.project_name}-${each.key}-${var.environment}"
tags = merge(var.tags, {
Layer = each.key
})
}
resource "aws_s3_bucket_versioning" "data_lake" {
for_each = aws_s3_bucket.data_lake
bucket = each.value.id
versioning_configuration {
status = "Enabled"
}
}
resource "aws_s3_bucket_lifecycle_configuration" "data_lake" {
for_each = aws_s3_bucket.data_lake
bucket = each.value.id
rule {
id = "transition-to-ia"
status = "Enabled"
transition {
days = 90
storage_class = "STANDARD_IA"
}
transition {
days = 180
storage_class = "GLACIER"
}
noncurrent_version_expiration {
noncurrent_days = 30
}
}
}
# Glue Database
resource "aws_glue_catalog_database" "data_lake" {
for_each = toset(["raw", "curated", "analytics"])
name = "${var.project_name}_${each.key}_${var.environment}"
create_table_default_permission {
permissions = ["ALL"]
principal {
data_lake_principal_identifier = "IAM_ALLOWED_PRINCIPALS"
}
}
}
# Lake Formation Registration
resource "aws_lakeformation_resource" "data_lake" {
for_each = aws_s3_bucket.data_lake
arn = each.value.arn
}
# Glue Crawler
resource "aws_glue_crawler" "data_lake" {
for_each = {
raw = { schedule = "cron(0 */6 * * ? *)", path = "raw/" }
curated = { schedule = "cron(0 */2 * * ? *)", path = "curated/" }
}
name = "${var.project_name}-${each.key}-crawler"
database_name = aws_glue_catalog_database.data_lake[each.key].name
role = var.glue_role_arn
schedule = each.value.schedule
s3_target {
path = "s3://${aws_s3_bucket.data_lake[each.key].id}/${each.value.path}"
}
schema_change_policy {
delete_behavior = "LOG"
update_behavior = "UPDATE_IN_DATABASE"
}
configuration = jsonencode({
Version = 1.0
Grouping = {
TableGroupingPolicy = "CombineCompatibleSchemas"
}
})
}
// TypeScript CDK
import * as cdk from 'aws-cdk-lib';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as glue from 'aws-cdk-lib/aws-glue';
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
import * as firehose from 'aws-cdk-lib/aws-kinesisfirehose';
import * as iam from 'aws-cdk-lib/aws-iam';
export class DataPipelineStack extends cdk.Stack {
constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
super(scope, id, props);
// S3 Data Lake
const rawBucket = new s3.Bucket(this, 'RawBucket', {
bucketName: `data-lake-raw-${this.account}`,
versioned: true,
encryption: s3.BucketEncryption.KMS_MANAGED,
lifecycleRules: [
{
transitions: [
{ storageClass: s3.StorageClass.INFREQUENT_ACCESS, transitionAfter: cdk.Duration.days(90) },
{ storageClass: s3.StorageClass.GLACIER, transitionAfter: cdk.Duration.days(180) }
]
}
]
});
// Kinesis Stream
const eventStream = new kinesis.Stream(this, 'EventStream', {
streamName: 'user-events',
shardCount: 4,
retentionPeriod: cdk.Duration.hours(24)
});
// Firehose Delivery Stream
const firehoseRole = new iam.Role(this, 'FirehoseRole', {
assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com')
});
rawBucket.grantWrite(firehoseRole);
eventStream.grantRead(firehoseRole);
const deliveryStream = new firehose.CfnDeliveryStream(this, 'DeliveryStream', {
deliveryStreamName: 'events-to-s3',
deliveryStreamType: 'KinesisStreamAsSource',
kinesisStreamSourceConfiguration: {
kinesisStreamArn: eventStream.streamArn,
roleArn: firehoseRole.roleArn
},
extendedS3DestinationConfiguration: {
bucketArn: rawBucket.bucketArn,
roleArn: firehoseRole.roleArn,
prefix: 'events/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/',
errorOutputPrefix: 'errors/',
bufferingHints: {
intervalInSeconds: 300,
sizeInMBs: 128
},
        compressionFormat: 'UNCOMPRESSED', // must be UNCOMPRESSED when format conversion is enabled; Parquet output is SNAPPY-compressed via the serializer
dataFormatConversionConfiguration: {
enabled: true,
inputFormatConfiguration: { deserializer: { openXJsonSerDe: {} } },
outputFormatConfiguration: { serializer: { parquetSerDe: { compression: 'SNAPPY' } } },
schemaConfiguration: {
databaseName: 'raw_db',
tableName: 'events',
roleArn: firehoseRole.roleArn
}
}
}
});
// Glue Database
const database = new glue.CfnDatabase(this, 'GlueDatabase', {
catalogId: this.account,
databaseInput: {
name: 'raw_db',
description: 'Raw data lake database'
}
});
// Outputs
new cdk.CfnOutput(this, 'RawBucketName', { value: rawBucket.bucketName });
new cdk.CfnOutput(this, 'StreamName', { value: eventStream.streamName });
}
}

| Feature | Terraform | AWS CDK | CloudFormation |
|---|---|---|---|
| Language | HCL | TypeScript, Python, etc. | YAML/JSON |
| Multi-cloud | ✅ | ❌ (AWS only) | ❌ (AWS only) |
| State management | tfstate (S3) | CloudFormation | Built in |
| Recommended for | Multi-cloud, team standards | Complex logic, developer-friendly | AWS-native |
- SageMaker Pipelines: ML workflow orchestration with support for conditional steps
- Feature Store: dual online (real-time) and offline (batch) stores
- Model Monitor: data quality, model quality, bias, and feature attribution monitoring
- Kinesis lag: detect consumer lag with IteratorAgeMilliseconds
- CI/CD for data: CodePipeline + CodeBuild + data quality tests
- Choosing IaC: Terraform for multi-cloud, CDK for complex logic

| Capability | AWS Service | Alternatives |
|---|---|---|
| CI/CD | CodePipeline, CodeBuild | GitHub Actions, Jenkins |
| ML pipelines | SageMaker Pipelines | Step Functions, Airflow |
| Feature store | SageMaker Feature Store | Feast, Tecton |
| Model registry | SageMaker Model Registry | MLflow |
| Model monitoring | SageMaker Model Monitor | Evidently, WhyLabs |
| Infrastructure monitoring | CloudWatch, EventBridge | Datadog, Grafana |
| IaC | CloudFormation, CDK | Terraform, Pulumi |
┌─────────────────────────────────────────────────────────────────────────────┐
│                       DataOps/MLOps Maturity Model                          │
│                                                                             │
│ Level 5: Optimized                                                          │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ • Auto scaling, self-healing                                            │ │
│ │ • AI-driven anomaly detection and automated remediation                 │ │
│ │ • Automated cost optimization                                           │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│                                                                             │
│ Level 4: Managed                                                            │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ • Fully automated CI/CD                                                 │ │
│ │ • Automatic model drift detection and retraining                        │ │
│ │ • Data quality gates integrated into the pipeline                       │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│                                                                             │
│ Level 3: Defined                                                            │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ • Standardized pipeline templates                                       │ │
│ │ • Centralized monitoring                                                │ │
│ │ • Environment separation (Dev/Stage/Prod)                               │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│                                                                             │
│ Level 2: Repeatable                                                         │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ • Version control (Git)                                                 │ │
│ │ • Basic CI/CD                                                           │ │
│ │ • Manual testing                                                        │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│                                                                             │
│ Level 1: Initial                                                            │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ • Manual deployments                                                    │ │
│ │ • Ad-hoc processes                                                      │ │
│ │ • Little documentation                                                  │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
Guaranteeing data pipeline quality requires tests at multiple levels. This section covers everything from unit tests through integration tests to data quality tests.
┌─────────────────────────────────────────────────────────────────────────────┐
│                        Data Pipeline Test Pyramid                           │
│                                                                             │
│                    ┌─────────────┐                                          │
│                    │  E2E Test   │   • Validates the whole pipeline         │
│                    │             │   • Production-like environment          │
│                    └─────────────┘   • Slow, expensive                      │
│                                                                             │
│                ┌─────────────────────┐                                      │
│                │  Integration Test   │   • Validates service integration    │
│                │                     │   • Uses real AWS services           │
│                └─────────────────────┘   • Medium speed/cost                │
│                                                                             │
│            ┌───────────────────────────────┐                                │
│            │       Data Quality Test       │   • Data quality rules         │
│            │                               │   • Schema validation          │
│            └───────────────────────────────┘   • Fast                       │
│                                                                             │
│        ┌─────────────────────────────────────────┐                          │
│        │                Unit Test                │   • Validates ETL logic  │
│        │                                         │   • Tests transform fns  │
│        └─────────────────────────────────────────┘   • Fastest              │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
// ETL transformation functions
# etl/transformations.py
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, when, lit, to_date
def clean_customer_data(df: DataFrame) -> DataFrame:
"""고객 데이터 정제"""
return df \
.filter(col("customer_id").isNotNull()) \
.filter(col("email").rlike("^[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+$")) \
.withColumn("status",
when(col("status").isNull(), lit("active"))
.otherwise(col("status"))) \
.dropDuplicates(["customer_id"])
def calculate_order_metrics(df: DataFrame) -> DataFrame:
"""주문 메트릭 계산"""
return df \
.withColumn("order_date", to_date(col("order_timestamp"))) \
.withColumn("total_amount", col("quantity") * col("unit_price")) \
.withColumn("discount_amount",
when(col("discount_rate").isNotNull(),
col("total_amount") * col("discount_rate"))
.otherwise(lit(0))) \
.withColumn("final_amount",
col("total_amount") - col("discount_amount"))// 단위 테스트
# tests/test_transformations.py
import pytest
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from etl.transformations import clean_customer_data, calculate_order_metrics
@pytest.fixture(scope="session")
def spark():
return SparkSession.builder \
.master("local[*]") \
.appName("ETL Tests") \
.getOrCreate()
class TestCleanCustomerData:
def test_removes_null_customer_id(self, spark):
# Given
data = [
("C001", "test@example.com", "active"),
(None, "test2@example.com", "active"),
]
df = spark.createDataFrame(data, ["customer_id", "email", "status"])
# When
result = clean_customer_data(df)
# Then
assert result.count() == 1
assert result.first()["customer_id"] == "C001"
def test_filters_invalid_email(self, spark):
# Given
data = [
("C001", "valid@example.com", "active"),
("C002", "invalid-email", "active"),
]
df = spark.createDataFrame(data, ["customer_id", "email", "status"])
# When
result = clean_customer_data(df)
# Then
assert result.count() == 1
def test_fills_null_status(self, spark):
# Given
data = [("C001", "test@example.com", None)]
df = spark.createDataFrame(data, ["customer_id", "email", "status"])
# When
result = clean_customer_data(df)
# Then
assert result.first()["status"] == "active"
class TestCalculateOrderMetrics:
def test_calculates_total_amount(self, spark):
# Given
        data = [(10.0, 100.0, None)]  # floats, to match the DoubleType schema below
schema = StructType([
StructField("quantity", DoubleType()),
StructField("unit_price", DoubleType()),
StructField("discount_rate", DoubleType())
])
df = spark.createDataFrame(data, schema)
# When
result = calculate_order_metrics(df)
# Then
assert result.first()["total_amount"] == 1000.0
assert result.first()["discount_amount"] == 0
assert result.first()["final_amount"] == 1000.0
def test_applies_discount(self, spark):
# Given
        data = [(10.0, 100.0, 0.1)]  # 10% discount; floats to match the DoubleType schema
schema = StructType([
StructField("quantity", DoubleType()),
StructField("unit_price", DoubleType()),
StructField("discount_rate", DoubleType())
])
df = spark.createDataFrame(data, schema)
# When
result = calculate_order_metrics(df)
# Then
assert result.first()["discount_amount"] == 100.0
assert result.first()["final_amount"] == 900.0// Great Expectations 설정
// Great Expectations configuration
# expectations/orders_expectations.py
# Note: the Great Expectations API differs across versions; the class-based expectations below
# follow the newer interface, while validate_data() at the bottom uses the older validation-operator API.
import great_expectations as gx
def create_orders_expectations(context):
"""주문 데이터 기대치 정의"""
suite = context.add_expectation_suite("orders_suite")
    # Required columns exist
suite.add_expectation(
gx.expectations.ExpectColumnToExist(column="order_id")
)
suite.add_expectation(
gx.expectations.ExpectColumnToExist(column="customer_id")
)
    # Null check
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
    # Uniqueness check
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)
    # Value range check
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column="amount",
min_value=0,
max_value=1000000
)
)
    # Value set check
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeInSet(
column="status",
value_set=["pending", "completed", "cancelled", "refunded"]
)
)
    # Date format check
suite.add_expectation(
gx.expectations.ExpectColumnValuesToMatchStrftimeFormat(
column="order_date",
strftime_format="%Y-%m-%d"
)
)
    # Row count check
suite.add_expectation(
gx.expectations.ExpectTableRowCountToBeBetween(
min_value=1000,
max_value=10000000
)
)
return suite
# Run from CI/CD
def validate_data(df, context):
    """Run data validation (older validation-operator API; adapt to your GX version)."""
results = context.run_validation_operator(
"action_list_operator",
assets_to_validate=[df],
expectation_suite_name="orders_suite"
)
if not results["success"]:
failed_expectations = [
r for r in results["results"]
if not r["success"]
]
raise ValueError(f"Data validation failed: {failed_expectations}")
    return results
CI/CD for data pipelines automates the workflow from code change through to production deployment. This section walks through implementations using AWS CodePipeline and GitHub Actions.
┌─────────────────────────────────────────────────────────────────────────────┐
│ Data Pipeline CI/CD Flow │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Source │───▶│ Build │───▶│ Test │───▶│ Staging │───▶│ Prod │ │
│ │ (Git) │ │(CodeBuild) │(Quality)│ │ Deploy │ │ Deploy │ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
│ │ │ │ │ │ │
│ ▼ ▼ ▼ ▼ ▼ │
│ • CodeCommit • Unit Test • Data Quality • Glue Job • Manual │
│ • GitHub • Lint • Schema • Step Func • Approval │
│ • S3 • Package • Integration • Lambda • Canary │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
// Terraform CodePipeline configuration
resource "aws_codepipeline" "data_pipeline" {
name = "data-pipeline-cicd"
role_arn = aws_iam_role.codepipeline.arn
artifact_store {
location = aws_s3_bucket.artifacts.bucket
type = "S3"
encryption_key {
id = aws_kms_key.artifacts.arn
type = "KMS"
}
}
# Source Stage
stage {
name = "Source"
action {
name = "Source"
category = "Source"
owner = "AWS"
provider = "CodeStarSourceConnection"
version = "1"
output_artifacts = ["source_output"]
configuration = {
ConnectionArn = aws_codestarconnections_connection.github.arn
FullRepositoryId = "org/data-platform"
BranchName = "main"
}
}
}
# Build Stage
stage {
name = "Build"
action {
name = "Build"
category = "Build"
owner = "AWS"
provider = "CodeBuild"
input_artifacts = ["source_output"]
output_artifacts = ["build_output"]
version = "1"
configuration = {
ProjectName = aws_codebuild_project.build.name
}
}
}
# Test Stage
stage {
name = "Test"
action {
name = "UnitTest"
category = "Test"
owner = "AWS"
provider = "CodeBuild"
input_artifacts = ["build_output"]
version = "1"
run_order = 1
configuration = {
ProjectName = aws_codebuild_project.unit_test.name
}
}
action {
name = "DataQualityTest"
category = "Test"
owner = "AWS"
provider = "CodeBuild"
input_artifacts = ["build_output"]
version = "1"
run_order = 2
configuration = {
ProjectName = aws_codebuild_project.data_quality.name
}
}
}
# Staging Deploy
stage {
name = "Staging"
action {
name = "DeployToStaging"
category = "Deploy"
owner = "AWS"
provider = "CloudFormation"
input_artifacts = ["build_output"]
version = "1"
configuration = {
ActionMode = "CREATE_UPDATE"
StackName = "data-pipeline-staging"
TemplatePath = "build_output::template.yaml"
Capabilities = "CAPABILITY_IAM,CAPABILITY_AUTO_EXPAND"
RoleArn = aws_iam_role.cloudformation.arn
ParameterOverrides = jsonencode({
Environment = "staging"
})
}
}
}
# Production Deploy with Approval
stage {
name = "Production"
action {
name = "ManualApproval"
category = "Approval"
owner = "AWS"
provider = "Manual"
version = "1"
run_order = 1
configuration = {
NotificationArn = aws_sns_topic.approval.arn
CustomData = "Please review staging deployment before approving production"
}
}
action {
name = "DeployToProduction"
category = "Deploy"
owner = "AWS"
provider = "CloudFormation"
input_artifacts = ["build_output"]
version = "1"
run_order = 2
configuration = {
ActionMode = "CREATE_UPDATE"
StackName = "data-pipeline-production"
TemplatePath = "build_output::template.yaml"
Capabilities = "CAPABILITY_IAM,CAPABILITY_AUTO_EXPAND"
RoleArn = aws_iam_role.cloudformation.arn
ParameterOverrides = jsonencode({
Environment = "production"
})
}
}
}
}
// buildspec.yml
version: 0.2
env:
variables:
PYTHON_VERSION: "3.9"
secrets-manager:
DB_PASSWORD: "prod/database:password"
phases:
install:
runtime-versions:
python: 3.9
commands:
- pip install --upgrade pip
- pip install -r requirements.txt
- pip install pytest pytest-cov great_expectations
pre_build:
commands:
- echo "Running linting..."
- flake8 src/ --max-line-length=120
- black --check src/
- echo "Running security scan..."
- bandit -r src/
build:
commands:
- echo "Running unit tests..."
- pytest tests/unit/ -v --cov=src --cov-report=xml
- echo "Packaging Glue jobs..."
- zip -r glue_jobs.zip src/glue/
- aws s3 cp glue_jobs.zip s3://${ARTIFACT_BUCKET}/glue/${CODEBUILD_RESOLVED_SOURCE_VERSION}/
post_build:
commands:
- echo "Generating CloudFormation template..."
- python scripts/generate_cfn.py
- echo "Build completed on $(date)"
reports:
pytest_reports:
files:
- "coverage.xml"
file-format: COBERTURAXML
artifacts:
files:
- template.yaml
- parameters/*.json
discard-paths: no
cache:
paths:
    - '/root/.cache/pip/**/*'
// Data quality test buildspec
version: 0.2
phases:
install:
commands:
- pip install great_expectations boto3 pandas
build:
commands:
- echo "Running data quality tests..."
- python scripts/run_data_quality.py
      # Great Expectations validation
- |
python << EOF
import great_expectations as gx
import sys
context = gx.get_context()
        # Validate staging data
checkpoint_result = context.run_checkpoint(
checkpoint_name="staging_checkpoint"
)
if not checkpoint_result.success:
print("Data quality check failed!")
sys.exit(1)
print("Data quality check passed!")
EOF
post_build:
commands:
- echo "Generating data quality report..."
      - great_expectations docs build --no-view
// .github/workflows/data-pipeline.yml
name: Data Pipeline CI/CD
on:
push:
branches: [main, develop]
pull_request:
branches: [main]
env:
  AWS_REGION: ap-northeast-2

# role-to-assume via OIDC (used below) requires the id-token permission
permissions:
  id-token: write
  contents: read
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.9'
cache: 'pip'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install pytest pytest-cov flake8 black
- name: Lint
run: |
flake8 src/ --max-line-length=120
black --check src/
- name: Unit Tests
run: pytest tests/unit/ -v --cov=src --cov-report=xml
- name: Upload coverage
uses: codecov/codecov-action@v3
with:
files: ./coverage.xml
deploy-staging:
needs: test
if: github.ref == 'refs/heads/develop'
runs-on: ubuntu-latest
environment: staging
steps:
- uses: actions/checkout@v4
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v4
with:
role-to-assume: ${{ secrets.AWS_ROLE_ARN_STAGING }}
aws-region: ${{ env.AWS_REGION }}
- name: Deploy to Staging
run: |
aws cloudformation deploy \
--template-file template.yaml \
--stack-name data-pipeline-staging \
--parameter-overrides Environment=staging \
--capabilities CAPABILITY_IAM
deploy-production:
needs: test
if: github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
environment: production
steps:
- uses: actions/checkout@v4
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v4
with:
role-to-assume: ${{ secrets.AWS_ROLE_ARN_PROD }}
aws-region: ${{ env.AWS_REGION }}
- name: Deploy to Production
run: |
aws cloudformation deploy \
--template-file template.yaml \
--stack-name data-pipeline-production \
--parameter-overrides Environment=production \
--capabilities CAPABILITY_IAM