将 Event Sourcing 写模型投影到 MyBatis 管理的读模型以实现 CQRS 架构


我们面临一个典型的单体应用困境。一个高度规范化的关系型数据库支撑着所有业务,Product 表有超过40个字段,关联着十几个其他表。每次对核心产品信息进行更新,都会触发复杂的业务逻辑,锁定多行数据,导致在高并发写入时性能急剧下降。同时,查询侧为了在前端展示一个完整的产品视图,需要执行一个包含七八个JOIN的SQL,这同样是性能瓶颈。更要命的是,业务方开始频繁提出需求,需要追溯某个产品价格、库存、描述等关键属性在过去任意时间点的状态,现有的模型根本无法支持。

初步的构想是引入CQRS(命令查询职责分离)和Event Sourcing(事件溯源)。这个组合拳理论上能完美解决我们的问题:

  1. 写模型 (Command Side): 使用Event Sourcing。所有状态变更都以不可变事件(Event)的形式持久化。这天然地提供了一个完整的审计日志,解决了历史追溯问题。命令(Command)处理只涉及追加事件,这是一个极快的操作,能极大提升写入性能。
  2. 读模型 (Query Side): 从事件流中,异步地构建一个或多个为特定查询场景优化的“投影”(Projection)。这些投影可以是反规范化的,专门服务于某个UI视图,从而消除复杂的JOIN查询。

技术选型决策很快就变得棘手。团队对Spring Boot和MyBatis有非常深厚的技术积累,运维团队对MySQL的管理也轻车熟路。贸然引入一套全新的技术栈,比如用Kafka做事件总线,用MongoDB或Elasticsearch做读模型,会带来巨大的学习成本和运维风险。在真实项目中,技术选型不只是看哪个更“酷”,而是要在理想架构和团队现有能力之间找到平衡。

最终的决策是:

  • 写模型/事件存储: 保持简单,初期直接用一个events表来存储事件流,利用数据库的事务性来保证原子写入。
  • 读模型/投影: 继续使用MySQL和MyBatis。构建一个或多个反规范化的product_summary_view表,由一个后台进程(Projector)消费事件并更新这些表。这样,现有的数据访问层知识可以完全复用。
  • API层: 前端需要聚合来自不同读模型的数据,GraphQL是理想选择。它允许客户端精确声明所需数据,避免多次API调用。
  • 前端测试: 复杂的前端视图和数据流交互,必须有坚实的测试保障。Jest作为React生态的事实标准,配合testing-library和GraphQL客户端的Mock工具,能确保UI在各种数据状态下的行为符合预期。

步骤化实现:从事件到视图的完整链路

1. 领域模型与事件定义 (Write Side)

我们从核心的Product聚合根开始。它不直接存储状态,而是通过应用事件来重建状态和验证业务规则。

// ProductAggregate.java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// 这是一个简化的聚合根,它只在内存中处理事件
public class ProductAggregate {

    private UUID id;
    private String name;
    private BigDecimal price;
    private int inventory;
    private boolean discontinued;
    private long version = 0; // 用于乐观锁

    private final List<DomainEvent> changes = new ArrayList<>();

    public ProductAggregate() {}

    // 工厂方法用于创建新产品
    public static ProductAggregate create(String name, BigDecimal price, int initialInventory) {
        ProductAggregate product = new ProductAggregate();
        UUID productId = UUID.randomUUID();
        // 验证业务规则
        if (name == null || name.trim().isEmpty()) {
            throw new IllegalArgumentException("Product name cannot be empty.");
        }
        if (price.compareTo(BigDecimal.ZERO) <= 0) {
            throw new IllegalArgumentException("Price must be positive.");
        }
        // 生成事件
        ProductCreatedEvent event = new ProductCreatedEvent(productId, name, price, initialInventory);
        product.apply(event);
        product.changes.add(event);
        return product;
    }

    // 调整价格的业务方法
    public void adjustPrice(BigDecimal newPrice) {
        if (this.discontinued) {
            throw new IllegalStateException("Cannot adjust price of a discontinued product.");
        }
        if (newPrice.compareTo(BigDecimal.ZERO) <= 0) {
            throw new IllegalArgumentException("New price must be positive.");
        }
        PriceAdjustedEvent event = new PriceAdjustedEvent(this.id, newPrice);
        apply(event);
        this.changes.add(event);
    }
    
    // 下架产品
    public void discontinue() {
        if (this.discontinued) {
            return; // 幂等操作
        }
        ProductDiscontinuedEvent event = new ProductDiscontinuedEvent(this.id);
        apply(event);
        this.changes.add(event);
    }

    // `apply`方法用于根据事件更新内部状态
    private void apply(DomainEvent event) {
        if (event instanceof ProductCreatedEvent e) {
            this.id = e.aggregateId();
            this.name = e.name();
            this.price = e.price();
            this.inventory = e.inventory();
            this.discontinued = false;
        } else if (event instanceof PriceAdjustedEvent e) {
            this.price = e.newPrice();
        } else if (event instanceof ProductDiscontinuedEvent e) {
            this.discontinued = true;
        }
        this.version++;
    }

    // 从历史事件中重建聚合状态
    public static ProductAggregate fromEvents(List<DomainEvent> history) {
        ProductAggregate product = new ProductAggregate();
        for (DomainEvent event : history) {
            product.apply(event);
        }
        // 清空重建过程中的变更记录
        product.getUncommittedChanges().clear();
        return product;
    }

    public UUID getId() {
        return id;
    }

    public long getVersion() {
        return version;
    }

    public List<DomainEvent> getUncommittedChanges() {
        return changes;
    }
}

// DomainEvent 接口及其实现
public interface DomainEvent {
    UUID aggregateId();
    long occurredOn();
}

public record ProductCreatedEvent(UUID aggregateId, String name, BigDecimal price, int inventory, long occurredOn) implements DomainEvent {
    public ProductCreatedEvent(UUID aggregateId, String name, BigDecimal price, int inventory) {
        this(aggregateId, name, price, inventory, System.currentTimeMillis());
    }
}

public record PriceAdjustedEvent(UUID aggregateId, BigDecimal newPrice, long occurredOn) implements DomainEvent {
     public PriceAdjustedEvent(UUID aggregateId, BigDecimal newPrice) {
        this(aggregateId, newPrice, System.currentTimeMillis());
    }
}

public record ProductDiscontinuedEvent(UUID aggregateId, long occurredOn) implements DomainEvent {
    public ProductDiscontinuedEvent(UUID aggregateId) {
        this(aggregateId, System.currentTimeMillis());
    }
}

2. 事件存储的实现

这里我们用一个简单的event_store表来持久化事件。真实项目中,event_data字段通常是JSON或二进制格式(如Protobuf),并且需要对aggregate_idsequence建立索引。

CREATE TABLE event_store (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    aggregate_id VARCHAR(36) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    event_data TEXT NOT NULL,         -- 在生产中通常是 JSON 或二进制格式
    sequence BIGINT NOT NULL,         -- 聚合的版本号/事件序号
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE KEY uq_agg_id_seq (aggregate_id, sequence)
);

对应的EventStore服务负责保存和加载事件。

// EventStore.java (简化版)
// 这是一个概念性的实现,生产环境需要更健壮的序列化和错误处理
@Service
public class SimpleEventStore {

    private final JdbcTemplate jdbcTemplate;
    private final ObjectMapper objectMapper; // Jackson ObjectMapper

    public SimpleEventStore(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper) {
        this.jdbcTemplate = jdbcTemplate;
        this.objectMapper = objectMapper;
    }

    @Transactional
    public void saveEvents(UUID aggregateId, long expectedVersion, List<DomainEvent> events) {
        // 简单的乐观锁检查
        long currentVersion = jdbcTemplate.queryForObject(
            "SELECT COALESCE(MAX(sequence), -1) FROM event_store WHERE aggregate_id = ?", 
            Long.class, 
            aggregateId.toString()
        );

        if (currentVersion != expectedVersion - 1) {
            throw new OptimisticLockingFailureException("Concurrency conflict for aggregate: " + aggregateId);
        }

        long sequence = currentVersion;
        for (DomainEvent event : events) {
            sequence++;
            try {
                String eventData = objectMapper.writeValueAsString(event);
                jdbcTemplate.update(
                    "INSERT INTO event_store (aggregate_id, event_type, event_data, sequence) VALUES (?, ?, ?, ?)",
                    aggregateId.toString(),
                    event.getClass().getSimpleName(),
                    eventData,
                    sequence
                );
            } catch (JsonProcessingException e) {
                // 在生产环境中,这里需要更复杂的错误处理和日志记录
                throw new RuntimeException("Failed to serialize event", e);
            }
        }
    }

    public List<DomainEvent> loadEvents(UUID aggregateId) {
        // ... 实现从数据库加载并反序列化事件的逻辑 ...
        // 省略具体代码,但核心是查询 event_store 表并用 ObjectMapper 反序列化
        return new ArrayList<>(); 
    }
}

3. 投影器:连接写模型与MyBatis读模型的桥梁

Projector是这个架构的核心。它监听新持久化的事件,并调用MyBatis Mapper来更新读模型。一个常见的错误是直接在保存事件的事务里同步更新读模型,这会破坏CQRS的初衷。我们采用异步方式。

sequenceDiagram
    participant CommandHandler as 命令处理器
    participant Aggregate as 聚合根
    participant EventStore as 事件存储
    participant EventBus as 事件总线(异步)
    participant Projector as 投影器
    participant ReadDB as 读数据库(MyBatis)

    CommandHandler->>Aggregate: execute(command)
    Aggregate-->>CommandHandler: return events
    CommandHandler->>EventStore: saveEvents(events)
    EventStore->>EventBus: publish(events)
    
    Note right of EventBus: 异步处理
    
    EventBus->>Projector: onEvent(event)
    Projector->>ReadDB: 更新/插入读模型

读模型表结构,完全为查询优化:

CREATE TABLE product_summary_view (
    product_id VARCHAR(36) PRIMARY KEY,
    product_name VARCHAR(255) NOT NULL,
    current_price DECIMAL(10, 2) NOT NULL,
    inventory_count INT NOT NULL,
    is_active BOOLEAN NOT NULL,
    last_updated_at TIMESTAMP
);

MyBatis Mapper接口:

// ProductSummaryMapper.java
@Mapper
public interface ProductSummaryMapper {

    void insertProductSummary(ProductSummaryView view);

    void updatePrice(@Param("productId") String productId, @Param("newPrice") BigDecimal newPrice, @Param("updatedAt") Instant updatedAt);
    
    void markAsDiscontinued(@Param("productId") String productId, @Param("updatedAt") Instant updatedAt);

    ProductSummaryView findById(@Param("productId") String productId);
}

对应的XML(省略,为标准MyBatis insert/update语句)。

投影器的实现。这里使用Spring的@EventListener来模拟一个简单的异步事件总线。

// ProductProjector.java
@Component
public class ProductProjector {

    private static final Logger log = LoggerFactory.getLogger(ProductProjector.class);
    private final ProductSummaryMapper productSummaryMapper;

    public ProductProjector(ProductSummaryMapper productSummaryMapper) {
        this.productSummaryMapper = productSummaryMapper;
    }

    // @TransactionalEventListener 会在提交事务后触发,保证事件已持久化
    // phase = TransactionPhase.AFTER_COMMIT
    @EventListener
    @Async // 确保在独立线程中异步执行,不阻塞命令处理流程
    public void handleProductCreated(ProductCreatedEvent event) {
        log.info("Projecting ProductCreatedEvent for aggregate {}", event.aggregateId());
        ProductSummaryView view = new ProductSummaryView(
            event.aggregateId().toString(),
            event.name(),
            event.price(),
            event.inventory(),
            true,
            Instant.ofEpochMilli(event.occurredOn())
        );
        // 这里的坑在于:消息可能重复投递。需要保证操作的幂等性。
        // 对于创建操作,可以先查询是否存在,或者依赖主键冲突忽略。
        // 我们假设product_id是主键,插入失败会抛异常。
        try {
            productSummaryMapper.insertProductSummary(view);
        } catch (DuplicateKeyException e) {
            log.warn("Idempotency: Received duplicate ProductCreatedEvent for ID {}", event.aggregateId());
        }
    }

    @EventListener
    @Async
    public void handlePriceAdjusted(PriceAdjustedEvent event) {
        log.info("Projecting PriceAdjustedEvent for aggregate {}", event.aggregateId());
        productSummaryMapper.updatePrice(
            event.aggregateId().toString(),
            event.newPrice(),
            Instant.ofEpochMilli(event.occurredOn())
        );
    }
    
    @EventListener
    @Async
    public void handleProductDiscontinued(ProductDiscontinuedEvent event) {
        log.info("Projecting ProductDiscontinuedEvent for aggregate {}", event.aggregateId());
        productSummaryMapper.markAsDiscontinued(
            event.aggregateId().toString(),
            Instant.ofEpochMilli(event.occurredOn())
        );
    }
}

4. GraphQL API 与前端消费

GraphQL层非常直接,它只是一个薄层,负责调用MyBatis Mapper。

GraphQL Schema:

type Query {
  productSummary(id: ID!): ProductSummary
}

type ProductSummary {
  productId: ID!
  productName: String!
  currentPrice: Float!
  inventoryCount: Int!
  isActive: Boolean!
  lastUpdatedAt: String
}

Spring for GraphQL DataFetcher:

// ProductQueryController.java
@Controller
public class ProductQueryController {

    private final ProductSummaryMapper productSummaryMapper;

    public ProductQueryController(ProductSummaryMapper productSummaryMapper) {
        this.productSummaryMapper = productSummaryMapper;
    }

    @QueryMapping
    public ProductSummaryView productSummary(@Argument String id) {
        // 直接调用MyBatis查询优化后的读模型
        ProductSummaryView view = productSummaryMapper.findById(id);
        if (view == null) {
            // 在GraphQL中,返回null表示未找到是标准做法
            return null;
        }
        return view;
    }
}

前端React组件使用Apollo Client消费数据。

// src/components/ProductDisplay.js
import React from 'react';
import { gql, useQuery } from '@apollo/client';

export const GET_PRODUCT_SUMMARY = gql`
  query GetProductSummary($id: ID!) {
    productSummary(id: $id) {
      productId
      productName
      currentPrice
      inventoryCount
      isActive
    }
  }
`;

export function ProductDisplay({ productId }) {
  const { loading, error, data } = useQuery(GET_PRODUCT_SUMMARY, {
    variables: { id: productId },
  });

  if (loading) return <p>Loading...</p>;
  if (error) return <p>Error: {error.message}</p>;
  if (!data || !data.productSummary) return <p>Product not found.</p>;

  const { productName, currentPrice, inventoryCount, isActive } = data.productSummary;

  return (
    <div>
      <h1>{productName}</h1>
      <p>Price: ${currentPrice.toFixed(2)}</p>
      <p>In Stock: {inventoryCount}</p>
      <p>Status: {isActive ? 'Active' : 'Discontinued'}</p>
    </div>
  );
}

5. 使用Jest进行前端健壮性测试

这套架构最容易出问题的地方在于前端对异步数据流的处理。用户可能看到加载状态、错误状态、空数据状态。Jest和@apollo/client/testing能完美模拟这些场景。

// src/components/ProductDisplay.test.js
import React from 'react';
import { render, screen, waitFor } from '@testing-library/react';
import { MockedProvider } from '@apollo/client/testing';
import { ProductDisplay, GET_PRODUCT_SUMMARY } from './ProductDisplay';

// 测试用例 1: 成功加载数据
test('renders product details when query is successful', async () => {
  const mockProductId = 'a1b2c3d4';
  const mocks = [
    {
      request: {
        query: GET_PRODUCT_SUMMARY,
        variables: { id: mockProductId },
      },
      result: {
        data: {
          productSummary: {
            productId: mockProductId,
            productName: 'Super Widget',
            currentPrice: 99.99,
            inventoryCount: 150,
            isActive: true,
          },
        },
      },
    },
  ];

  render(
    <MockedProvider mocks={mocks} addTypename={false}>
      <ProductDisplay productId={mockProductId} />
    </MockedProvider>
  );

  // 初始状态应显示 Loading
  expect(screen.getByText('Loading...')).toBeInTheDocument();

  // 等待异步查询完成并断言UI更新
  await waitFor(() => {
    expect(screen.getByText('Super Widget')).toBeInTheDocument();
  });
  
  expect(screen.getByText('$99.99')).toBeInTheDocument();
  expect(screen.getByText('In Stock: 150')).toBeInTheDocument();
  expect(screen.getByText('Status: Active')).toBeInTheDocument();
});

// 测试用例 2: GraphQL返回错误
test('renders error message when query fails', async () => {
  const mockProductId = 'a1b2c3d4';
  const mocks = [
    {
      request: {
        query: GET_PRODUCT_SUMMARY,
        variables: { id: mockProductId },
      },
      error: new Error('An error occurred'),
    },
  ];

  render(
    <MockedProvider mocks={mocks} addTypename={false}>
      <ProductDisplay productId={mockProductId} />
    </MockedProvider>
  );

  // 等待错误信息显示
  await waitFor(() => {
    expect(screen.getByText('Error: An error occurred')).toBeInTheDocument();
  });
});

// 测试用例 3: 未找到数据
test('renders "not found" message when data is null', async () => {
  const mockProductId = 'a1b2c3d4';
  const mocks = [
    {
      request: {
        query: GET_PRODUCT_SUMMARY,
        variables: { id: mockProductId },
      },
      result: {
        data: {
          productSummary: null, // API返回null
        },
      },
    },
  ];

  render(
    <MockedProvider mocks={mocks} addTypename={false}>
      <ProductDisplay productId={mockProductId} />
    </MockedProvider>
  );

  await waitFor(() => {
    expect(screen.getByText('Product not found.')).toBeInTheDocument();
  });
});

这里的核心是MockedProvider,它拦截了组件发出的GraphQL请求,并返回我们预设的成功、失败或空数据。这使得我们可以在完全不依赖后端服务的情况下,对UI组件的各种边界条件进行全面测试。

架构的局限性与未来迭代路径

这套架构并非银弹。最显著的挑战是最终一致性。从命令执行到读模型更新之间存在延迟,用户可能在刷新页面后短暂地看不到最新的状态。业务上需要评估这种延迟的可接受程度。对于无法接受延迟的场景,可以采用一些UI层面的补偿策略,比如命令执行后前端进行乐观更新,或者轮询查询结果直到数据更新。

另一个问题是投影的健壮性。如果投影器在处理某个事件时失败,需要有重试和死信队列机制来保证数据最终能被处理。同时,当业务逻辑变更需要修改读模型结构时,我们可能需要“重放”(Replay)所有历史事件来重建新的读模型,这对事件存储的查询性能和投影器的幂等性设计提出了更高要求。

未来的优化路径可以包括:

  1. 引入真正的消息队列: 使用Kafka或RabbitMQ替代应用内的事件总线,提供更好的削峰填谷、持久化保证和水平扩展能力。
  2. 快照(Snapshot)机制: 对于生命周期很长、事件数量极多的聚合,每次都从头加载所有事件会很慢。可以定期为聚合状态创建快照,重建时从最近的快照开始,再应用后续事件。
  3. 完善的事务性保证: 为保证“事件一定能被发布”,可以引入Transactional Outbox模式,将待发布的事件和业务操作在同一个本地事务中写入数据库,由另一个进程轮询这个“发件箱”表并将事件可靠地投递到消息总线。

  目录