DDD 이론 11: CQRS
Command와 Query의 책임을 분리하여 읽기와 쓰기를 독립적으로 최적화하는 아키텍처 패턴을 학습합니다.
- • CQS와 CQRS의 차이를 이해한다
- • Command와 Query 핸들러를 구현할 수 있다
- • Read Model과 Projection을 설계할 수 있다
- • 최종 일관성 지연을 처리하는 방법을 익힌다
- • CQRS 적용 시나리오와 트레이드오프를 판단할 수 있다
1. CQRS의 본질
"모든 메서드는 상태를 변경하는 Command이거나, 데이터를 반환하는 Query여야 한다. 둘 다 해서는 안 된다."
— Bertrand Meyer, CQS 원칙
1.1 CQS에서 CQRS로
CQS (Command Query Separation)
메서드 수준의 분리. 하나의 객체 내에서 Command와 Query 메서드를 구분.
class Account {
// Command: 상태 변경, void 반환
deposit(amount: Money): void
// Query: 상태 조회, 값 반환
getBalance(): Money
}CQRS (Command Query Responsibility Segregation)
아키텍처 수준의 분리. 읽기와 쓰기를 완전히 다른 모델로 분리.
// Write Model (Command)
class OrderAggregate { ... }
// Read Model (Query)
class OrderSummaryView { ... }
class OrderDetailView { ... }1.2 왜 CQRS인가?
🔥 문제: 읽기와 쓰기의 요구사항 충돌
- • 쓰기: 복잡한 비즈니스 규칙, 트랜잭션 일관성, 정규화된 데이터
- • 읽기: 빠른 응답, 다양한 뷰, 비정규화된 데이터, 캐싱
- • 하나의 모델로 둘 다 만족시키기 어려움
✅ 해결: 책임 분리
- • Command Model: 도메인 로직에 최적화, Aggregate 중심
- • Query Model: 조회 성능에 최적화, 화면/API 중심
- • 각 모델을 독립적으로 최적화 가능
1.3 CQRS 아키텍처
┌─────────────────────────────────────────────────────────────────────────┐ │ CQRS Architecture │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ [Client] │ │ / \ │ │ / \ │ │ Commands Queries │ │ ↓ ↓ │ │ ┌───────────────────┐ ┌───────────────────┐ │ │ │ Command Handler │ │ Query Handler │ │ │ │ │ │ │ │ │ │ - Validation │ │ - Simple Read │ │ │ │ - Business Logic │ │ - No Logic │ │ │ │ - Domain Events │ │ - Optimized │ │ │ └─────────┬─────────┘ └─────────┬─────────┘ │ │ │ │ │ │ ↓ ↓ │ │ ┌───────────────────┐ ┌───────────────────┐ │ │ │ Write Model │ │ Read Model │ │ │ │ (Aggregates) │ │ (Views/DTOs) │ │ │ └─────────┬─────────┘ └─────────┬─────────┘ │ │ │ ↑ │ │ ↓ │ │ │ ┌───────────────────┐ │ │ │ │ Write Database │───Events───┘ │ │ │ (Normalized) │ Sync │ │ └───────────────────┘ │ │ ┌───────────────────┐ │ │ │ Read Database │ │ │ │ (Denormalized) │ │ │ └───────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────┘
1.4 CQRS 적용 수준
| 수준 | 설명 | 복잡도 |
|---|---|---|
| Level 1: 코드 분리 | Command/Query 핸들러만 분리, 같은 DB | 낮음 |
| Level 2: 모델 분리 | Write/Read 모델 분리, 같은 DB | 중간 |
| Level 3: DB 분리 | Write/Read DB 분리, 동기화 필요 | 높음 |
| Level 4: Event Sourcing | 이벤트 저장소 + Projection | 매우 높음 |
2. Command와 Query 구현
2.1 Command 구현
// Command 정의 - 의도를 명확히 표현
interface Command {
readonly commandId: string;
readonly timestamp: Date;
}
class CreateOrderCommand implements Command {
readonly commandId = crypto.randomUUID();
readonly timestamp = new Date();
constructor(
readonly customerId: string,
readonly items: Array<{
productId: string;
quantity: number;
}>,
readonly shippingAddress: {
street: string;
city: string;
zipCode: string;
}
) {}
}
class ConfirmOrderCommand implements Command {
readonly commandId = crypto.randomUUID();
readonly timestamp = new Date();
constructor(readonly orderId: string) {}
}
class CancelOrderCommand implements Command {
readonly commandId = crypto.randomUUID();
readonly timestamp = new Date();
constructor(
readonly orderId: string,
readonly reason: string
) {}
}
// Command Handler
interface CommandHandler<T extends Command> {
handle(command: T): Promise<void>;
}
class CreateOrderHandler implements CommandHandler<CreateOrderCommand> {
constructor(
private readonly orderRepository: OrderRepository,
private readonly productRepository: ProductRepository,
private readonly eventPublisher: EventPublisher
) {}
async handle(command: CreateOrderCommand): Promise<void> {
// 1. 유효성 검증
const products = await this.productRepository.findByIds(
command.items.map(i => i.productId)
);
if (products.length !== command.items.length) {
throw new ProductNotFoundException();
}
// 2. Aggregate 생성
const order = Order.create({
customerId: CustomerId.from(command.customerId),
items: command.items.map(item => {
const product = products.find(p => p.id.value === item.productId)!;
return OrderItem.create({
productId: product.id,
productName: product.name,
quantity: Quantity.of(item.quantity),
unitPrice: product.price
});
}),
shippingAddress: Address.create(command.shippingAddress)
});
// 3. 저장
await this.orderRepository.save(order);
// 4. 이벤트 발행
for (const event of order.domainEvents) {
await this.eventPublisher.publish(event);
}
order.clearDomainEvents();
}
}
// Command Bus
class CommandBus {
private handlers = new Map<string, CommandHandler<any>>();
register<T extends Command>(
commandType: new (...args: any[]) => T,
handler: CommandHandler<T>
): void {
this.handlers.set(commandType.name, handler);
}
async dispatch<T extends Command>(command: T): Promise<void> {
const handler = this.handlers.get(command.constructor.name);
if (!handler) {
throw new Error(`No handler for ${command.constructor.name}`);
}
await handler.handle(command);
}
}2.2 Query 구현
// Query 정의
interface Query<TResult> {
readonly queryId: string;
}
// 다양한 Query 타입
class GetOrderByIdQuery implements Query<OrderDetailDto> {
readonly queryId = crypto.randomUUID();
constructor(readonly orderId: string) {}
}
class GetOrdersForCustomerQuery implements Query<OrderSummaryDto[]> {
readonly queryId = crypto.randomUUID();
constructor(
readonly customerId: string,
readonly page: number = 1,
readonly pageSize: number = 20
) {}
}
class SearchOrdersQuery implements Query<PaginatedResult<OrderSummaryDto>> {
readonly queryId = crypto.randomUUID();
constructor(
readonly filters: {
status?: OrderStatus[];
dateFrom?: Date;
dateTo?: Date;
minAmount?: number;
maxAmount?: number;
},
readonly sortBy: 'createdAt' | 'totalAmount' = 'createdAt',
readonly sortOrder: 'asc' | 'desc' = 'desc',
readonly page: number = 1,
readonly pageSize: number = 20
) {}
}
// Read Model (DTO)
interface OrderSummaryDto {
orderId: string;
customerName: string;
status: string;
itemCount: number;
totalAmount: number;
createdAt: Date;
}
interface OrderDetailDto {
orderId: string;
customer: {
id: string;
name: string;
email: string;
};
items: Array<{
productId: string;
productName: string;
quantity: number;
unitPrice: number;
subtotal: number;
}>;
shippingAddress: {
street: string;
city: string;
zipCode: string;
};
status: string;
totalAmount: number;
createdAt: Date;
confirmedAt?: Date;
shippedAt?: Date;
}
// Query Handler
interface QueryHandler<TQuery extends Query<TResult>, TResult> {
handle(query: TQuery): Promise<TResult>;
}
class GetOrderByIdHandler implements QueryHandler<GetOrderByIdQuery, OrderDetailDto> {
constructor(private readonly readDb: ReadDatabase) {}
async handle(query: GetOrderByIdQuery): Promise<OrderDetailDto> {
// Read Model에서 직접 조회 (비정규화된 데이터)
const result = await this.readDb.query<OrderDetailDto>(`
SELECT
o.id as "orderId",
json_build_object(
'id', c.id,
'name', c.name,
'email', c.email
) as customer,
o.items,
o.shipping_address as "shippingAddress",
o.status,
o.total_amount as "totalAmount",
o.created_at as "createdAt",
o.confirmed_at as "confirmedAt",
o.shipped_at as "shippedAt"
FROM order_read_model o
JOIN customer_read_model c ON o.customer_id = c.id
WHERE o.id = $1
`, [query.orderId]);
if (!result) {
throw new OrderNotFoundException(query.orderId);
}
return result;
}
}
// Query Bus
class QueryBus {
private handlers = new Map<string, QueryHandler<any, any>>();
register<TQuery extends Query<TResult>, TResult>(
queryType: new (...args: any[]) => TQuery,
handler: QueryHandler<TQuery, TResult>
): void {
this.handlers.set(queryType.name, handler);
}
async dispatch<TResult>(query: Query<TResult>): Promise<TResult> {
const handler = this.handlers.get(query.constructor.name);
if (!handler) {
throw new Error(`No handler for ${query.constructor.name}`);
}
return handler.handle(query);
}
}2.3 API 계층 통합
// REST API Controller
@Controller('/orders')
class OrderController {
constructor(
private readonly commandBus: CommandBus,
private readonly queryBus: QueryBus
) {}
// Command: POST, PUT, DELETE
@Post('/')
async createOrder(@Body() body: CreateOrderRequest): Promise<{ orderId: string }> {
const command = new CreateOrderCommand(
body.customerId,
body.items,
body.shippingAddress
);
await this.commandBus.dispatch(command);
// Command는 void 반환, ID는 별도 방식으로 전달
return { orderId: command.commandId };
}
@Post('/:id/confirm')
async confirmOrder(@Param('id') orderId: string): Promise<void> {
await this.commandBus.dispatch(new ConfirmOrderCommand(orderId));
}
// Query: GET
@Get('/:id')
async getOrder(@Param('id') orderId: string): Promise<OrderDetailDto> {
return this.queryBus.dispatch(new GetOrderByIdQuery(orderId));
}
@Get('/')
async searchOrders(
@Query() filters: SearchOrdersRequest
): Promise<PaginatedResult<OrderSummaryDto>> {
return this.queryBus.dispatch(new SearchOrdersQuery(
filters,
filters.sortBy,
filters.sortOrder,
filters.page,
filters.pageSize
));
}
}
// GraphQL Resolver (대안)
@Resolver('Order')
class OrderResolver {
constructor(
private readonly commandBus: CommandBus,
private readonly queryBus: QueryBus
) {}
// Mutation = Command
@Mutation()
async createOrder(@Args('input') input: CreateOrderInput): Promise<string> {
const command = new CreateOrderCommand(
input.customerId,
input.items,
input.shippingAddress
);
await this.commandBus.dispatch(command);
return command.commandId;
}
// Query = Query
@Query()
async order(@Args('id') id: string): Promise<OrderDetailDto> {
return this.queryBus.dispatch(new GetOrderByIdQuery(id));
}
}3. Read Model 동기화
3.1 동기화 전략
동기식 (Transactional)
- • 같은 트랜잭션에서 Write/Read 모델 갱신
- • 강한 일관성 보장
- • 성능 저하 가능
- • 단일 DB에서만 가능
비동기식 (Eventual)
- • 이벤트로 Read 모델 갱신
- • 최종 일관성
- • 높은 성능
- • 분리된 DB 가능
3.2 Projection 구현
// Projector: 이벤트를 Read Model로 변환
interface Projector<T extends DomainEvent> {
project(event: T): Promise<void>;
}
// Order Read Model Projector
class OrderReadModelProjector {
constructor(private readonly readDb: ReadDatabase) {}
@OnEvent('OrderCreated')
async onOrderCreated(event: OrderCreatedEvent): Promise<void> {
await this.readDb.execute(`
INSERT INTO order_read_model (
id, customer_id, items, shipping_address,
status, total_amount, item_count, created_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`, [
event.orderId,
event.customerId,
JSON.stringify(event.items),
JSON.stringify(event.shippingAddress),
'CREATED',
event.totalAmount,
event.items.length,
event.occurredAt
]);
}
@OnEvent('OrderConfirmed')
async onOrderConfirmed(event: OrderConfirmedEvent): Promise<void> {
await this.readDb.execute(`
UPDATE order_read_model
SET status = 'CONFIRMED', confirmed_at = $2
WHERE id = $1
`, [event.orderId, event.occurredAt]);
}
@OnEvent('OrderShipped')
async onOrderShipped(event: OrderShippedEvent): Promise<void> {
await this.readDb.execute(`
UPDATE order_read_model
SET status = 'SHIPPED',
shipped_at = $2,
tracking_number = $3
WHERE id = $1
`, [event.orderId, event.occurredAt, event.trackingNumber]);
}
@OnEvent('OrderCancelled')
async onOrderCancelled(event: OrderCancelledEvent): Promise<void> {
await this.readDb.execute(`
UPDATE order_read_model
SET status = 'CANCELLED',
cancelled_at = $2,
cancellation_reason = $3
WHERE id = $1
`, [event.orderId, event.occurredAt, event.reason]);
}
}
// 여러 Read Model을 위한 Projector
class CustomerOrderStatsProjector {
constructor(private readonly readDb: ReadDatabase) {}
@OnEvent('OrderCreated')
async onOrderCreated(event: OrderCreatedEvent): Promise<void> {
await this.readDb.execute(`
INSERT INTO customer_order_stats (customer_id, total_orders, total_spent)
VALUES ($1, 1, $2)
ON CONFLICT (customer_id) DO UPDATE SET
total_orders = customer_order_stats.total_orders + 1,
total_spent = customer_order_stats.total_spent + $2
`, [event.customerId, event.totalAmount]);
}
@OnEvent('OrderCancelled')
async onOrderCancelled(event: OrderCancelledEvent): Promise<void> {
// 취소된 주문 통계 조정
const order = await this.readDb.query(
'SELECT total_amount FROM order_read_model WHERE id = $1',
[event.orderId]
);
await this.readDb.execute(`
UPDATE customer_order_stats SET
total_orders = total_orders - 1,
total_spent = total_spent - $2,
cancelled_orders = cancelled_orders + 1
WHERE customer_id = $1
`, [event.customerId, order.total_amount]);
}
}3.3 Read Model 재구축
Read Model에 버그가 있거나 새로운 뷰가 필요할 때, 이벤트를 재생하여 재구축합니다.
// Read Model 재구축 서비스
class ReadModelRebuilder {
constructor(
private readonly eventStore: EventStore,
private readonly projectors: Map<string, Projector<any>[]>
) {}
async rebuild(readModelName: string): Promise<void> {
console.log(`Rebuilding ${readModelName}...`);
// 1. 기존 Read Model 삭제
await this.clearReadModel(readModelName);
// 2. 모든 이벤트 재생
let processedCount = 0;
const batchSize = 1000;
let lastEventId: string | undefined;
while (true) {
const events = await this.eventStore.getEvents({
afterEventId: lastEventId,
limit: batchSize
});
if (events.length === 0) break;
for (const event of events) {
const projectors = this.projectors.get(event.eventType) || [];
for (const projector of projectors) {
if (this.isForReadModel(projector, readModelName)) {
await projector.project(event);
}
}
processedCount++;
}
lastEventId = events[events.length - 1].eventId;
console.log(`Processed ${processedCount} events...`);
}
console.log(`Rebuild complete. Total events: ${processedCount}`);
}
async rebuildAll(): Promise<void> {
const readModels = ['order_read_model', 'customer_order_stats', 'product_sales'];
for (const readModel of readModels) {
await this.rebuild(readModel);
}
}
}
// 점진적 재구축 (운영 중 사용)
class IncrementalRebuilder {
async rebuildWithZeroDowntime(readModelName: string): Promise<void> {
const newTableName = `${readModelName}_new`;
// 1. 새 테이블 생성
await this.createTable(newTableName);
// 2. 새 테이블에 재구축
await this.rebuildToTable(newTableName);
// 3. 테이블 스왑 (atomic)
await this.swapTables(readModelName, newTableName);
// 4. 이전 테이블 삭제
await this.dropTable(`${readModelName}_old`);
}
}3.4 일관성 지연 처리
// UI에서 일관성 지연 처리
class OrderService {
// Command 후 즉시 조회 시 문제
async createAndGet(command: CreateOrderCommand): Promise<OrderDetailDto> {
await this.commandBus.dispatch(command);
// ❌ Read Model이 아직 갱신되지 않았을 수 있음
return this.queryBus.dispatch(new GetOrderByIdQuery(command.orderId));
}
}
// 해결책 1: Optimistic UI
// 클라이언트에서 Command 성공 시 로컬 상태 즉시 갱신
const createOrder = async (orderData) => {
const tempId = generateTempId();
// 낙관적 업데이트
dispatch({ type: 'ORDER_CREATED_OPTIMISTIC', payload: { ...orderData, id: tempId } });
try {
const { orderId } = await api.createOrder(orderData);
dispatch({ type: 'ORDER_CREATED_CONFIRMED', payload: { tempId, orderId } });
} catch (error) {
dispatch({ type: 'ORDER_CREATED_FAILED', payload: { tempId, error } });
}
};
// 해결책 2: Command에서 ID 반환
class CreateOrderHandler {
async handle(command: CreateOrderCommand): Promise<string> {
const order = Order.create(command);
await this.orderRepository.save(order);
// Write Model에서 직접 ID 반환
return order.id.value;
}
}
// 해결책 3: Polling / WebSocket
class OrderStatusPoller {
async waitForReadModel(orderId: string, maxWaitMs: number = 5000): Promise<OrderDetailDto> {
const startTime = Date.now();
while (Date.now() - startTime < maxWaitMs) {
try {
return await this.queryBus.dispatch(new GetOrderByIdQuery(orderId));
} catch (error) {
if (error instanceof OrderNotFoundException) {
await this.delay(100);
continue;
}
throw error;
}
}
throw new TimeoutError('Read model not yet available');
}
}4. CQRS 적용 시나리오와 트레이드오프
4.1 CQRS가 적합한 경우
✅ 적합한 시나리오
- 읽기/쓰기 비율 불균형: 읽기가 쓰기보다 훨씬 많은 경우 (예: 10:1 이상)
- 복잡한 조회 요구사항: 다양한 뷰, 집계, 검색이 필요한 경우
- 성능 요구사항 차이: 읽기는 빠르게, 쓰기는 정확하게
- 팀 분리: 읽기/쓰기 팀이 독립적으로 개발해야 하는 경우
- 확장성 요구: 읽기/쓰기를 독립적으로 스케일링해야 하는 경우
❌ 부적합한 시나리오
- 단순한 CRUD: 복잡한 비즈니스 로직이 없는 경우
- 강한 일관성 필수: 즉시 일관성이 반드시 필요한 경우
- 작은 팀/프로젝트: 복잡도 대비 이점이 적은 경우
- 읽기/쓰기 균형: 비율이 비슷한 경우
4.2 트레이드오프
| 장점 | 단점 |
|---|---|
| 독립적 최적화 | 복잡도 증가 |
| 독립적 확장 | 최종 일관성 처리 |
| 읽기 성능 향상 | 데이터 중복 |
| 유연한 Read Model | 동기화 로직 필요 |
| 팀 독립성 | 운영 복잡도 |
4.3 점진적 도입 전략
코드 수준 분리부터 시작
같은 DB를 사용하면서 Command/Query 핸들러만 분리. 기존 코드 변경 최소화.
Read Model 추가
복잡한 조회가 필요한 부분에 비정규화된 Read Model 추가. 동기식 갱신으로 시작.
이벤트 기반 동기화
Domain Event를 발행하고 Projector로 Read Model 갱신. 최종 일관성 수용.
DB 분리 (선택)
필요시 Write/Read DB 분리. Read DB는 읽기 최적화 (예: Elasticsearch, Redis).
4.4 실무 구현 예시
// 실무 CQRS 구조 예시
src/
├── application/
│ ├── commands/
│ │ ├── CreateOrderCommand.ts
│ │ ├── CreateOrderHandler.ts
│ │ ├── ConfirmOrderCommand.ts
│ │ └── ConfirmOrderHandler.ts
│ ├── queries/
│ │ ├── GetOrderByIdQuery.ts
│ │ ├── GetOrderByIdHandler.ts
│ │ ├── SearchOrdersQuery.ts
│ │ └── SearchOrdersHandler.ts
│ └── projectors/
│ ├── OrderReadModelProjector.ts
│ └── CustomerStatsProjector.ts
├── domain/
│ ├── order/
│ │ ├── Order.ts # Aggregate
│ │ ├── OrderItem.ts # Entity
│ │ └── events/
│ │ ├── OrderCreated.ts
│ │ └── OrderConfirmed.ts
│ └── shared/
│ └── ValueObjects.ts
├── infrastructure/
│ ├── persistence/
│ │ ├── OrderRepository.ts # Write
│ │ └── OrderReadRepository.ts # Read
│ └── messaging/
│ └── EventPublisher.ts
└── interfaces/
├── rest/
│ └── OrderController.ts
└── graphql/
└── OrderResolver.ts
// Module 구성 (NestJS 예시)
@Module({
imports: [
CqrsModule,
TypeOrmModule.forFeature([OrderEntity]), // Write
ElasticsearchModule.forRoot({}), // Read
],
providers: [
// Command Handlers
CreateOrderHandler,
ConfirmOrderHandler,
// Query Handlers
GetOrderByIdHandler,
SearchOrdersHandler,
// Projectors
OrderReadModelProjector,
// Infrastructure
OrderRepository,
OrderReadRepository,
],
controllers: [OrderController],
})
export class OrderModule {}5. FAQ 및 핵심 요약
아니요. CQRS와 Event Sourcing은 독립적인 패턴입니다. CQRS만 단독으로 사용해도 됩니다. Event Sourcing은 CQRS의 Read Model 동기화에 유용하지만 필수는 아닙니다.
네, 용도별로 여러 Read Model을 만들 수 있습니다. 예: 목록용 OrderSummaryView, 상세용 OrderDetailView, 검색용 OrderSearchView. 각각 다른 저장소(RDB, Elasticsearch, Redis)를 사용할 수도 있습니다.
순수 CQRS에서는 Command는 void를 반환합니다. 하지만 실무에서는 생성된 ID나 버전 번호 정도는 반환하는 것이 일반적입니다. 복잡한 데이터 반환은 피하세요.
일반적으로 밀리초~초 단위입니다. 메시지 브로커 성능, Projector 처리 속도에 따라 다릅니다. 대부분의 사용자는 이 지연을 인지하지 못합니다.
아니요. CQRS는 복잡도를 추가하므로, 이점이 명확한 Context에만 적용하세요. 단순한 CRUD Context는 전통적인 방식이 더 적합합니다.
핵심 요약
- • 상태 변경 의도 표현
- • 도메인 로직 실행
- • Aggregate 중심
- • 트랜잭션 일관성
- • 데이터 조회 전용
- • 비정규화된 Read Model
- • 화면/API 최적화
- • 캐싱 용이
- • Projector로 Read Model 갱신
- • 이벤트 기반 비동기
- • 최종 일관성 수용
- • 재구축 가능
- • 복잡도 vs 유연성
- • 일관성 vs 성능
- • 점진적 도입 권장
- • 필요한 곳에만 적용
"CQRS는 읽기와 쓰기의 요구사항이 다르다는 것을 인정하고,
각각에 최적화된 모델을 제공하는 것이다."
— Greg Young