我们面临一个典型的单体应用困境。一个高度规范化的关系型数据库支撑着所有业务,Product 表有超过40个字段,关联着十几个其他表。每次对核心产品信息进行更新,都会触发复杂的业务逻辑,锁定多行数据,导致在高并发写入时性能急剧下降。同时,查询侧为了在前端展示一个完整的产品视图,需要执行一个包含七八个JOIN的SQL,这同样是性能瓶颈。更要命的是,业务方开始频繁提出需求,需要追溯某个产品价格、库存、描述等关键属性在过去任意时间点的状态,现有的模型根本无法支持。
初步的构想是引入CQRS(命令查询职责分离)和Event Sourcing(事件溯源)。这个组合拳理论上能完美解决我们的问题:
- 写模型 (Command Side): 使用Event Sourcing。所有状态变更都以不可变事件(
Event)的形式持久化。这天然地提供了一个完整的审计日志,解决了历史追溯问题。命令(Command)处理只涉及追加事件,这是一个极快的操作,能极大提升写入性能。 - 读模型 (Query Side): 从事件流中,异步地构建一个或多个为特定查询场景优化的“投影”(
Projection)。这些投影可以是反规范化的,专门服务于某个UI视图,从而消除复杂的JOIN查询。
技术选型决策很快就变得棘手。团队对Spring Boot和MyBatis有非常深厚的技术积累,运维团队对MySQL的管理也轻车熟路。贸然引入一套全新的技术栈,比如用Kafka做事件总线,用MongoDB或Elasticsearch做读模型,会带来巨大的学习成本和运维风险。在真实项目中,技术选型不只是看哪个更“酷”,而是要在理想架构和团队现有能力之间找到平衡。
最终的决策是:
- 写模型/事件存储: 保持简单,初期直接用一个
events表来存储事件流,利用数据库的事务性来保证原子写入。 - 读模型/投影: 继续使用MySQL和MyBatis。构建一个或多个反规范化的
product_summary_view表,由一个后台进程(Projector)消费事件并更新这些表。这样,现有的数据访问层知识可以完全复用。 - API层: 前端需要聚合来自不同读模型的数据,GraphQL是理想选择。它允许客户端精确声明所需数据,避免多次API调用。
- 前端测试: 复杂的前端视图和数据流交互,必须有坚实的测试保障。Jest作为React生态的事实标准,配合testing-library和GraphQL客户端的Mock工具,能确保UI在各种数据状态下的行为符合预期。
步骤化实现:从事件到视图的完整链路
1. 领域模型与事件定义 (Write Side)
我们从核心的Product聚合根开始。它不直接存储状态,而是通过应用事件来重建状态和验证业务规则。
// ProductAggregate.java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
// 这是一个简化的聚合根,它只在内存中处理事件
public class ProductAggregate {
private UUID id;
private String name;
private BigDecimal price;
private int inventory;
private boolean discontinued;
private long version = 0; // 用于乐观锁
private final List<DomainEvent> changes = new ArrayList<>();
public ProductAggregate() {}
// 工厂方法用于创建新产品
public static ProductAggregate create(String name, BigDecimal price, int initialInventory) {
ProductAggregate product = new ProductAggregate();
UUID productId = UUID.randomUUID();
// 验证业务规则
if (name == null || name.trim().isEmpty()) {
throw new IllegalArgumentException("Product name cannot be empty.");
}
if (price.compareTo(BigDecimal.ZERO) <= 0) {
throw new IllegalArgumentException("Price must be positive.");
}
// 生成事件
ProductCreatedEvent event = new ProductCreatedEvent(productId, name, price, initialInventory);
product.apply(event);
product.changes.add(event);
return product;
}
// 调整价格的业务方法
public void adjustPrice(BigDecimal newPrice) {
if (this.discontinued) {
throw new IllegalStateException("Cannot adjust price of a discontinued product.");
}
if (newPrice.compareTo(BigDecimal.ZERO) <= 0) {
throw new IllegalArgumentException("New price must be positive.");
}
PriceAdjustedEvent event = new PriceAdjustedEvent(this.id, newPrice);
apply(event);
this.changes.add(event);
}
// 下架产品
public void discontinue() {
if (this.discontinued) {
return; // 幂等操作
}
ProductDiscontinuedEvent event = new ProductDiscontinuedEvent(this.id);
apply(event);
this.changes.add(event);
}
// `apply`方法用于根据事件更新内部状态
private void apply(DomainEvent event) {
if (event instanceof ProductCreatedEvent e) {
this.id = e.aggregateId();
this.name = e.name();
this.price = e.price();
this.inventory = e.inventory();
this.discontinued = false;
} else if (event instanceof PriceAdjustedEvent e) {
this.price = e.newPrice();
} else if (event instanceof ProductDiscontinuedEvent e) {
this.discontinued = true;
}
this.version++;
}
// 从历史事件中重建聚合状态
public static ProductAggregate fromEvents(List<DomainEvent> history) {
ProductAggregate product = new ProductAggregate();
for (DomainEvent event : history) {
product.apply(event);
}
// 清空重建过程中的变更记录
product.getUncommittedChanges().clear();
return product;
}
public UUID getId() {
return id;
}
public long getVersion() {
return version;
}
public List<DomainEvent> getUncommittedChanges() {
return changes;
}
}
// DomainEvent 接口及其实现
public interface DomainEvent {
UUID aggregateId();
long occurredOn();
}
public record ProductCreatedEvent(UUID aggregateId, String name, BigDecimal price, int inventory, long occurredOn) implements DomainEvent {
public ProductCreatedEvent(UUID aggregateId, String name, BigDecimal price, int inventory) {
this(aggregateId, name, price, inventory, System.currentTimeMillis());
}
}
public record PriceAdjustedEvent(UUID aggregateId, BigDecimal newPrice, long occurredOn) implements DomainEvent {
public PriceAdjustedEvent(UUID aggregateId, BigDecimal newPrice) {
this(aggregateId, newPrice, System.currentTimeMillis());
}
}
public record ProductDiscontinuedEvent(UUID aggregateId, long occurredOn) implements DomainEvent {
public ProductDiscontinuedEvent(UUID aggregateId) {
this(aggregateId, System.currentTimeMillis());
}
}
2. 事件存储的实现
这里我们用一个简单的event_store表来持久化事件。真实项目中,event_data字段通常是JSON或二进制格式(如Protobuf),并且需要对aggregate_id和sequence建立索引。
CREATE TABLE event_store (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
aggregate_id VARCHAR(36) NOT NULL,
event_type VARCHAR(255) NOT NULL,
event_data TEXT NOT NULL, -- 在生产中通常是 JSON 或二进制格式
sequence BIGINT NOT NULL, -- 聚合的版本号/事件序号
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uq_agg_id_seq (aggregate_id, sequence)
);
对应的EventStore服务负责保存和加载事件。
// EventStore.java (简化版)
// 这是一个概念性的实现,生产环境需要更健壮的序列化和错误处理
@Service
public class SimpleEventStore {
private final JdbcTemplate jdbcTemplate;
private final ObjectMapper objectMapper; // Jackson ObjectMapper
public SimpleEventStore(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper) {
this.jdbcTemplate = jdbcTemplate;
this.objectMapper = objectMapper;
}
@Transactional
public void saveEvents(UUID aggregateId, long expectedVersion, List<DomainEvent> events) {
// 简单的乐观锁检查
long currentVersion = jdbcTemplate.queryForObject(
"SELECT COALESCE(MAX(sequence), -1) FROM event_store WHERE aggregate_id = ?",
Long.class,
aggregateId.toString()
);
if (currentVersion != expectedVersion - 1) {
throw new OptimisticLockingFailureException("Concurrency conflict for aggregate: " + aggregateId);
}
long sequence = currentVersion;
for (DomainEvent event : events) {
sequence++;
try {
String eventData = objectMapper.writeValueAsString(event);
jdbcTemplate.update(
"INSERT INTO event_store (aggregate_id, event_type, event_data, sequence) VALUES (?, ?, ?, ?)",
aggregateId.toString(),
event.getClass().getSimpleName(),
eventData,
sequence
);
} catch (JsonProcessingException e) {
// 在生产环境中,这里需要更复杂的错误处理和日志记录
throw new RuntimeException("Failed to serialize event", e);
}
}
}
public List<DomainEvent> loadEvents(UUID aggregateId) {
// ... 实现从数据库加载并反序列化事件的逻辑 ...
// 省略具体代码,但核心是查询 event_store 表并用 ObjectMapper 反序列化
return new ArrayList<>();
}
}
3. 投影器:连接写模型与MyBatis读模型的桥梁
Projector是这个架构的核心。它监听新持久化的事件,并调用MyBatis Mapper来更新读模型。一个常见的错误是直接在保存事件的事务里同步更新读模型,这会破坏CQRS的初衷。我们采用异步方式。
sequenceDiagram
participant CommandHandler as 命令处理器
participant Aggregate as 聚合根
participant EventStore as 事件存储
participant EventBus as 事件总线(异步)
participant Projector as 投影器
participant ReadDB as 读数据库(MyBatis)
CommandHandler->>Aggregate: execute(command)
Aggregate-->>CommandHandler: return events
CommandHandler->>EventStore: saveEvents(events)
EventStore->>EventBus: publish(events)
Note right of EventBus: 异步处理
EventBus->>Projector: onEvent(event)
Projector->>ReadDB: 更新/插入读模型
读模型表结构,完全为查询优化:
CREATE TABLE product_summary_view (
product_id VARCHAR(36) PRIMARY KEY,
product_name VARCHAR(255) NOT NULL,
current_price DECIMAL(10, 2) NOT NULL,
inventory_count INT NOT NULL,
is_active BOOLEAN NOT NULL,
last_updated_at TIMESTAMP
);
MyBatis Mapper接口:
// ProductSummaryMapper.java
@Mapper
public interface ProductSummaryMapper {
void insertProductSummary(ProductSummaryView view);
void updatePrice(@Param("productId") String productId, @Param("newPrice") BigDecimal newPrice, @Param("updatedAt") Instant updatedAt);
void markAsDiscontinued(@Param("productId") String productId, @Param("updatedAt") Instant updatedAt);
ProductSummaryView findById(@Param("productId") String productId);
}
对应的XML(省略,为标准MyBatis insert/update语句)。
投影器的实现。这里使用Spring的@EventListener来模拟一个简单的异步事件总线。
// ProductProjector.java
@Component
public class ProductProjector {
private static final Logger log = LoggerFactory.getLogger(ProductProjector.class);
private final ProductSummaryMapper productSummaryMapper;
public ProductProjector(ProductSummaryMapper productSummaryMapper) {
this.productSummaryMapper = productSummaryMapper;
}
// @TransactionalEventListener 会在提交事务后触发,保证事件已持久化
// phase = TransactionPhase.AFTER_COMMIT
@EventListener
@Async // 确保在独立线程中异步执行,不阻塞命令处理流程
public void handleProductCreated(ProductCreatedEvent event) {
log.info("Projecting ProductCreatedEvent for aggregate {}", event.aggregateId());
ProductSummaryView view = new ProductSummaryView(
event.aggregateId().toString(),
event.name(),
event.price(),
event.inventory(),
true,
Instant.ofEpochMilli(event.occurredOn())
);
// 这里的坑在于:消息可能重复投递。需要保证操作的幂等性。
// 对于创建操作,可以先查询是否存在,或者依赖主键冲突忽略。
// 我们假设product_id是主键,插入失败会抛异常。
try {
productSummaryMapper.insertProductSummary(view);
} catch (DuplicateKeyException e) {
log.warn("Idempotency: Received duplicate ProductCreatedEvent for ID {}", event.aggregateId());
}
}
@EventListener
@Async
public void handlePriceAdjusted(PriceAdjustedEvent event) {
log.info("Projecting PriceAdjustedEvent for aggregate {}", event.aggregateId());
productSummaryMapper.updatePrice(
event.aggregateId().toString(),
event.newPrice(),
Instant.ofEpochMilli(event.occurredOn())
);
}
@EventListener
@Async
public void handleProductDiscontinued(ProductDiscontinuedEvent event) {
log.info("Projecting ProductDiscontinuedEvent for aggregate {}", event.aggregateId());
productSummaryMapper.markAsDiscontinued(
event.aggregateId().toString(),
Instant.ofEpochMilli(event.occurredOn())
);
}
}
4. GraphQL API 与前端消费
GraphQL层非常直接,它只是一个薄层,负责调用MyBatis Mapper。
GraphQL Schema:
type Query {
productSummary(id: ID!): ProductSummary
}
type ProductSummary {
productId: ID!
productName: String!
currentPrice: Float!
inventoryCount: Int!
isActive: Boolean!
lastUpdatedAt: String
}
Spring for GraphQL DataFetcher:
// ProductQueryController.java
@Controller
public class ProductQueryController {
private final ProductSummaryMapper productSummaryMapper;
public ProductQueryController(ProductSummaryMapper productSummaryMapper) {
this.productSummaryMapper = productSummaryMapper;
}
@QueryMapping
public ProductSummaryView productSummary(@Argument String id) {
// 直接调用MyBatis查询优化后的读模型
ProductSummaryView view = productSummaryMapper.findById(id);
if (view == null) {
// 在GraphQL中,返回null表示未找到是标准做法
return null;
}
return view;
}
}
前端React组件使用Apollo Client消费数据。
// src/components/ProductDisplay.js
import React from 'react';
import { gql, useQuery } from '@apollo/client';
export const GET_PRODUCT_SUMMARY = gql`
query GetProductSummary($id: ID!) {
productSummary(id: $id) {
productId
productName
currentPrice
inventoryCount
isActive
}
}
`;
export function ProductDisplay({ productId }) {
const { loading, error, data } = useQuery(GET_PRODUCT_SUMMARY, {
variables: { id: productId },
});
if (loading) return <p>Loading...</p>;
if (error) return <p>Error: {error.message}</p>;
if (!data || !data.productSummary) return <p>Product not found.</p>;
const { productName, currentPrice, inventoryCount, isActive } = data.productSummary;
return (
<div>
<h1>{productName}</h1>
<p>Price: ${currentPrice.toFixed(2)}</p>
<p>In Stock: {inventoryCount}</p>
<p>Status: {isActive ? 'Active' : 'Discontinued'}</p>
</div>
);
}
5. 使用Jest进行前端健壮性测试
这套架构最容易出问题的地方在于前端对异步数据流的处理。用户可能看到加载状态、错误状态、空数据状态。Jest和@apollo/client/testing能完美模拟这些场景。
// src/components/ProductDisplay.test.js
import React from 'react';
import { render, screen, waitFor } from '@testing-library/react';
import { MockedProvider } from '@apollo/client/testing';
import { ProductDisplay, GET_PRODUCT_SUMMARY } from './ProductDisplay';
// 测试用例 1: 成功加载数据
test('renders product details when query is successful', async () => {
const mockProductId = 'a1b2c3d4';
const mocks = [
{
request: {
query: GET_PRODUCT_SUMMARY,
variables: { id: mockProductId },
},
result: {
data: {
productSummary: {
productId: mockProductId,
productName: 'Super Widget',
currentPrice: 99.99,
inventoryCount: 150,
isActive: true,
},
},
},
},
];
render(
<MockedProvider mocks={mocks} addTypename={false}>
<ProductDisplay productId={mockProductId} />
</MockedProvider>
);
// 初始状态应显示 Loading
expect(screen.getByText('Loading...')).toBeInTheDocument();
// 等待异步查询完成并断言UI更新
await waitFor(() => {
expect(screen.getByText('Super Widget')).toBeInTheDocument();
});
expect(screen.getByText('$99.99')).toBeInTheDocument();
expect(screen.getByText('In Stock: 150')).toBeInTheDocument();
expect(screen.getByText('Status: Active')).toBeInTheDocument();
});
// 测试用例 2: GraphQL返回错误
test('renders error message when query fails', async () => {
const mockProductId = 'a1b2c3d4';
const mocks = [
{
request: {
query: GET_PRODUCT_SUMMARY,
variables: { id: mockProductId },
},
error: new Error('An error occurred'),
},
];
render(
<MockedProvider mocks={mocks} addTypename={false}>
<ProductDisplay productId={mockProductId} />
</MockedProvider>
);
// 等待错误信息显示
await waitFor(() => {
expect(screen.getByText('Error: An error occurred')).toBeInTheDocument();
});
});
// 测试用例 3: 未找到数据
test('renders "not found" message when data is null', async () => {
const mockProductId = 'a1b2c3d4';
const mocks = [
{
request: {
query: GET_PRODUCT_SUMMARY,
variables: { id: mockProductId },
},
result: {
data: {
productSummary: null, // API返回null
},
},
},
];
render(
<MockedProvider mocks={mocks} addTypename={false}>
<ProductDisplay productId={mockProductId} />
</MockedProvider>
);
await waitFor(() => {
expect(screen.getByText('Product not found.')).toBeInTheDocument();
});
});
这里的核心是MockedProvider,它拦截了组件发出的GraphQL请求,并返回我们预设的成功、失败或空数据。这使得我们可以在完全不依赖后端服务的情况下,对UI组件的各种边界条件进行全面测试。
架构的局限性与未来迭代路径
这套架构并非银弹。最显著的挑战是最终一致性。从命令执行到读模型更新之间存在延迟,用户可能在刷新页面后短暂地看不到最新的状态。业务上需要评估这种延迟的可接受程度。对于无法接受延迟的场景,可以采用一些UI层面的补偿策略,比如命令执行后前端进行乐观更新,或者轮询查询结果直到数据更新。
另一个问题是投影的健壮性。如果投影器在处理某个事件时失败,需要有重试和死信队列机制来保证数据最终能被处理。同时,当业务逻辑变更需要修改读模型结构时,我们可能需要“重放”(Replay)所有历史事件来重建新的读模型,这对事件存储的查询性能和投影器的幂等性设计提出了更高要求。
未来的优化路径可以包括:
- 引入真正的消息队列: 使用Kafka或RabbitMQ替代应用内的事件总线,提供更好的削峰填谷、持久化保证和水平扩展能力。
- 快照(Snapshot)机制: 对于生命周期很长、事件数量极多的聚合,每次都从头加载所有事件会很慢。可以定期为聚合状态创建快照,重建时从最近的快照开始,再应用后续事件。
- 完善的事务性保证: 为保证“事件一定能被发布”,可以引入Transactional Outbox模式,将待发布的事件和业务操作在同一个本地事务中写入数据库,由另一个进程轮询这个“发件箱”表并将事件可靠地投递到消息总线。