集成ActiveMQ与Koa实现Flink端到端Exactly-Once语义的架构复盘


我们面临一个棘手的需求:一个外部系统通过HTTP API推送关键业务事件,我们的后端系统必须对这些事件进行有状态的流式处理,并且从数据接收到处理完成,整个链路必须保证“精确一次”(Exactly-Once)语义。任何事件的丢失或重复处理都可能导致业务逻辑错误和财务损失。在真实项目中,这意味着即使在API服务、消息队列或流处理作业发生崩溃重启的情况下,数据完整性也不能被破坏。

最初的架构构想是 HTTP API -> 消息队列 -> Flink。这看起来很标准,但魔鬼藏在细节里。如何确保从HTTP请求成功响应的那一刻起,到Flink完成状态更新,这条漫长的路径上,数据不多不少,只被处理一次?单纯的Flink Checkpoint只能保证Flink内部的Exactly-Once,它无法覆盖从外部API到消息队列这一段。如果Koa服务在发送消息到ActiveMQ后、响应HTTP请求前崩溃,会发生什么?如果Flink消费了消息、更新了状态,但在Checkpoint完成前崩溃,又会发生什么?

我们决定直面这个挑战,构建一个从API入口(Koa.js)到消息队列(ActiveMQ)再到流处理核心(Apache Flink)的全链路Exactly-Once管道。整个构建、测试和部署流程将由GitHub Actions自动化,确保方案的健壮性可以在持续集成中得到验证。

技术选型决策与核心挑战

  1. Apache Flink: 作为流处理引擎,它的两阶段提交(Two-Phase Commit)Checkpoint机制是实现Exactly-Once的核心。它能与支持事务的Source和Sink协同工作,形成一个原子性的端到端事务。
  2. ActiveMQ: 我们选择ActiveMQ而不是更流行的Kafka,主要是为了利用其成熟的JMS事务支持。JMS事务提供了一个清晰的commit/rollback模型,这对于Flink的TwoPhaseCommitSinkFunction和事务性Source来说,是一个非常经典的集成范例。这里的坑在于,Flink社区对ActiveMQ的事务性连接器支持不如Kafka完善,我们需要进行更细致的配置甚至定制。
  3. Koa.js + Babel: 作为API网关,Node.js的异步I/O模型非常适合处理大量并发的HTTP请求。最大的挑战在于,Node.js(一个非JVM环境)如何与ActiveMQ进行事务性交互。我们需要找到一个可靠的库,并精心设计API处理逻辑,确保“消息入队”和“HTTP响应”这两个操作的原子性。
  4. GitHub Actions: 自动化是生产级系统的基石。我们的工作流不仅要编译和打包一个Java应用和一个Node.js应用,还必须能够编排启动整个依赖环境(ActiveMQ, Flink),并运行一个端到端的集成测试,该测试会主动模拟故障来验证我们的Exactly-Once承诺。

步骤化实现:构建事务性数据管道

第一部分:Flink与ActiveMQ的事务性集成

Flink要实现端到端的Exactly-Once,其Source必须是可重放的,Sink必须是事务性的。在我们的场景中,ActiveMQ既是Source,也要作为可能的Sink。

首先是Flink作业的Maven依赖,我们需要flink-connector-jmsactivemq-client

<!-- pom.xml -->
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.17.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.17.1</version>
        <scope>provided</scope>
    </dependency>
    <!-- JMS Connector for Flink -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jms</artifactId>
        <version>1.17.1</version>
    </dependency>
    <!-- ActiveMQ Client -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-client</artifactId>
        <version>5.18.3</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.36</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>

核心是实现一个事务性的JMSSourceflink-connector-jms提供的JMSSource通过启用JMS会话的事务模式,并结合Flink的Checkpoint机制来实现精确一次消费。

当Flink触发一个Checkpoint时:

  1. JMSSource预提交当前的JMS事务。
  2. Flink将算子状态快照写入持久化存储。
  3. 当所有算子都成功完成快照后,Flink的JobManager会通知所有算子Checkpoint已完成。
  4. JMSSource收到通知后,正式提交JMS事务,消息才会被确认为消费。
  5. 如果Checkpoint失败,JMSSource会回滚事务,消息会回到队列中,待作业恢复后重新消费。

下面是Flink作业的核心代码,它从一个输入队列event-queue-in消费消息,进行简单的转换,然后使用一个自定义的两阶段提交Sink将结果写入event-queue-out

// FlinkExactlyOnceJob.java
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.jms.source.JMSSource;
import org.apache.flink.connector.jms.source.config.JMSSourceConfig;
import org.apache.flink.connector.jms.source.reader.deserializer.JMSDeserializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.CheckpointingMode;

import javax.jms.ConnectionFactory;
import javax.jms.TextMessage;

public class FlinkExactlyOnceJob {

    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String INPUT_QUEUE = "event-queue-in";
    private static final String OUTPUT_QUEUE = "event-queue-out";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 关键:启用Checkpoint,并设置为Exactly-Once模式
        // checkpoint间隔5秒,对于生产环境这个值需要仔细权衡
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000); // Checkpoint超时
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); // 两次checkpoint之间最小间隔
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 同时只能有一个checkpoint

        // 配置ActiveMQ Connection Factory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);

        // 构建事务性的JMSSource
        JMSSource<String> jmsSource = JMSSource.builder()
            .setConfig(JMSSourceConfig.builder()
                .setConnectionFactory(connectionFactory)
                .setDestination(INPUT_QUEUE)
                // 关键:设置为客户端确认模式,并启用事务
                .setAcknowledgeMode(javax.jms.Session.CLIENT_ACKNOWLEDGE)
                .setTransactional(true) 
                .build())
            .setDeserializer(JMSDeserializer.of(message -> {
                if (message instanceof TextMessage) {
                    return ((TextMessage) message).getText();
                }
                return null;
            }))
            .build();

        DataStream<String> stream = env.fromSource(jmsSource, WatermarkStrategy.noWatermarks(), "ActiveMQ Source");

        // 简单的业务逻辑:在消息体后追加处理时间戳
        DataStream<String> processedStream = stream.map((MapFunction<String, String>) value -> {
            // 在真实项目中,这里可能包含复杂的状态计算
            System.out.println("Processing message: " + value);
            return value + ", processed_at: " + System.currentTimeMillis();
        });

        // 添加一个两阶段提交的Sink,将结果写入另一个ActiveMQ队列
        processedStream.addSink(new TwoPhaseCommitActiveMQSink(BROKER_URL, OUTPUT_QUEUE))
            .name("Transactional ActiveMQ Sink");
        
        env.execute("Flink End-to-End Exactly-Once Demo");
    }
}

为了实现事务性输出,我们需要自己实现一个TwoPhaseCommitSinkFunction

// TwoPhaseCommitActiveMQSink.java
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import javax.jms.*;

public class TwoPhaseCommitActiveMQSink extends TwoPhaseCommitSinkFunction<String, TwoPhaseCommitActiveMQSink.JmsTransaction, Void>
    implements CheckpointedFunction {

    private final String brokerUrl;
    private final String queueName;

    // 非瞬态,用于在恢复时重新提交或中止事务
    private transient ListState<JmsTransaction> pendingTransactions;
    
    public TwoPhaseCommitActiveMQSink(String brokerUrl, String queueName) {
        // 使用一个简单的Void作为上下文
        super(TypeInformation.of(JmsTransaction.class).createSerializer(), TypeInformation.of(Void.class).createSerializer());
        this.brokerUrl = brokerUrl;
        this.queueName = queueName;
    }

    // 步骤1:开始一个新事务
    @Override
    protected JmsTransaction beginTransaction() throws Exception {
        System.out.println("Beginning transaction...");
        return new JmsTransaction(brokerUrl, queueName);
    }

    // 步骤2:将数据写入当前事务
    @Override
    protected void invoke(JmsTransaction transaction, String value, Context context) throws Exception {
        transaction.produce(value);
    }

    // 步骤3:预提交事务
    @Override
    protected void preCommit(JmsTransaction transaction) throws Exception {
        System.out.println("Pre-committing transaction: " + transaction.id);
        // 在JMS中,事务准备阶段是隐式的,实际工作在commit时完成
        // 我们需要将事务信息保存到状态中,以便在故障恢复后能够commit
        pendingTransactions.add(transaction);
    }

    // 步骤4:正式提交事务
    @Override
    protected void commit(JmsTransaction transaction) {
        System.out.println("Committing transaction: " + transaction.id);
        try {
            transaction.commit();
            // 提交成功后,从待处理列表中移除
            pendingTransactions.clear();
        } catch (Exception e) {
            // 这里的异常是致命的,可能导致数据不一致
            throw new RuntimeException("Failed to commit JMS transaction", e);
        }
    }

    // 步骤5:中止事务
    @Override
    protected void abort(JmsTransaction transaction) {
        System.out.println("Aborting transaction: " + transaction.id);
        try {
            transaction.rollback();
        } catch (Exception e) {
           // Rollback失败通常问题不大,因为连接关闭后事务会自动回滚
           System.err.println("Error while rolling back transaction: " + e.getMessage());
        }
    }
    
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // 在 preCommit 之后,checkpoint 触发时,pendingTransactions 已经被更新
        // Flink 会自动将这个状态快照
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<JmsTransaction> descriptor = new ListStateDescriptor<>(
            "pending-jms-transactions",
            TypeInformation.of(new TypeHint<JmsTransaction>() {})
        );
        pendingTransactions = context.getOperatorStateStore().getListState(descriptor);

        // 从故障恢复时,检查是否有未完成的事务
        if (context.isRestored()) {
            for (JmsTransaction transaction : pendingTransactions.get()) {
                // Flink的2PC协议保证,如果作业恢复,那么 preCommit 成功的事务最终都应该被 commit
                // 我们在这里尝试重新提交,但更好的做法是在 open() 方法中进行
                // 因为此时我们才能建立新的连接
                // 简单起见,这里只打印日志。生产代码需要更复杂的恢复逻辑。
                 System.out.println("Restored pending transaction: " + transaction.id);
            }
        }
    }

    // 内部类,封装JMS事务
    static class JmsTransaction {
        // 注意:这个类需要可序列化才能保存在Flink状态中
        final String id;
        transient Connection connection;
        transient Session session;
        transient MessageProducer producer;
        
        // 用于恢复
        private String brokerUrl;
        private String queueName;

        JmsTransaction(String brokerUrl, String queueName) throws JMSException {
            this.id = java.util.UUID.randomUUID().toString();
            this.brokerUrl = brokerUrl;
            this.queueName = queueName;
            init();
        }

        private void init() throws JMSException {
            ConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
            connection = factory.createConnection();
            connection.start();
            // 关键:创建事务性Session
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = session.createQueue(queueName);
            producer = session.createProducer(destination);
        }

        void produce(String message) throws JMSException {
            producer.send(session.createTextMessage(message));
        }

        void commit() throws JMSException {
            session.commit();
            close();
        }
        
        void rollback() throws JMSException {
            session.rollback();
            close();
        }

        private void close() {
            try { if (producer != null) producer.close(); } catch (JMSException e) { /* ignore */ }
            try { if (session != null) session.close(); } catch (JMSException e) { /* ignore */ }
            try { if (connection != null) connection.close(); } catch (JMSException e) { /* ignore */ }
        }
    }
}

以下是两阶段提交流程的可视化表示:

sequenceDiagram
    participant JobManager
    participant SourceTask
    participant SinkTask
    participant ActiveMQ

    JobManager->>SourceTask: Trigger Checkpoint (ID: n)
    SourceTask->>ActiveMQ: Pre-commit Tx (read messages)
    SourceTask-->>JobManager: Acknowledge Checkpoint (ID: n, State Handle)
    
    JobManager->>SinkTask: Trigger Checkpoint (ID: n)
    SinkTask->>ActiveMQ: Begin Tx_sink
    SinkTask->>ActiveMQ: Write messages within Tx_sink
    Note right of SinkTask: This is preCommit phase
    SinkTask-->>JobManager: Acknowledge Checkpoint (ID: n, State Handle)
    
    JobManager->>JobManager: All tasks acknowledged for Checkpoint n
    JobManager->>SourceTask: Notify Checkpoint n Complete
    SourceTask->>ActiveMQ: Commit Tx (messages are consumed)
    
    JobManager->>SinkTask: Notify Checkpoint n Complete
    SinkTask->>ActiveMQ: Commit Tx_sink (messages are visible)

第二部分:Koa API的事务性消息生产

现在轮到入口了。Koa服务必须以事务方式向ActiveMQ发送消息。这意味着,我们必须在成功发送消息后才能向客户端返回200 OK,并且如果发送失败,必须返回错误且消息不能出现在队列中。

我们使用stomp-client库,因为它支持STOMP协议的事务。

// package.json
{
  "name": "transactional-api",
  "version": "1.0.0",
  "main": "index.js",
  "scripts": {
    "start": "babel-node src/app.js",
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "dependencies": {
    "@stomp/stompjs": "^7.0.0",
    "koa": "^2.14.2",
    "koa-bodyparser": "^4.4.1",
    "koa-router": "^12.0.1",
    "log4js": "^6.9.1",
    "ws": "^8.14.2"
  },
  "devDependencies": {
    "@babel/cli": "^7.23.0",
    "@babel/core": "^7.23.2",
    "@babel/node": "^7.22.19",
    "@babel/preset-env": "^7.23.2"
  }
}

Babel配置用于支持ES6模块语法:

// .babelrc
{
  "presets": [
    "@babel/preset-env"
  ]
}

接下来是Koa应用的核心代码。我们创建一个中间件来管理STOMP客户端连接和事务。

// src/services/messageQueue.js
import { Client } from '@stomp/stompjs';
import { logger } from '../utils/logger.js';

const BROKER_URL = 'ws://localhost:61614'; // ActiveMQ's STOMP over WebSocket connector

// 维护一个单例的STOMP客户端
const client = new Client({
  brokerURL: BROKER_URL,
  reconnectDelay: 5000,
  heartbeatIncoming: 4000,
  heartbeatOutgoing: 4000,
  beforeConnect: () => {
    logger.info('Attempting to connect to ActiveMQ...');
  }
});

client.onConnect = (frame) => {
  logger.info('Connected to ActiveMQ: ' + frame);
};

client.onStompError = (frame) => {
  logger.error('Broker reported error: ' + frame.headers['message']);
  logger.error('Additional details: ' + frame.body);
};

client.activate();

/**
 * 在一个事务中发送消息
 * @param {string} destination - The queue name.
 * @param {string} body - The message body.
 * @returns {Promise<void>}
 */
export async function sendTransactionalMessage(destination, body) {
  if (!client.active) {
    throw new Error('Message broker is not connected.');
  }

  const transaction = client.begin();
  try {
    client.publish({
      destination: destination,
      body: body,
      headers: { 'persistent': 'true' },
      transaction: transaction.id,
    });
    
    // 这里的坑:一个常见的错误是在publish后立即提交。
    // 应该由调用者(API路由)来决定何时提交。
    // 这个函数返回一个包含commit和rollback方法的对象。
    logger.info(`Message published to queue ${destination} within transaction ${transaction.id}`);
    return {
      commit: () => {
        logger.info(`Committing transaction ${transaction.id}`);
        transaction.commit();
      },
      rollback: () => {
        logger.warn(`Rolling back transaction ${transaction.id}`);
        transaction.rollback();
      },
    };
  } catch (error) {
    logger.error(`Failed to publish message in transaction ${transaction.id}. Rolling back.`);
    transaction.rollback();
    // 确保上层能捕获到异常
    throw error;
  }
}

然后是Koa的路由和应用入口,它使用了上述服务。

// src/app.js
import Koa from 'koa';
import Router from 'koa-router';
import bodyParser from 'koa-bodyparser';
import { sendTransactionalMessage } from './services/messageQueue.js';
import { configureLogger, logger } from './utils/logger.js';

// 初始化日志
configureLogger();

const app = new Koa();
const router = new Router();

app.use(bodyParser());

// 健康检查
router.get('/health', (ctx) => {
  ctx.status = 200;
  ctx.body = { status: 'UP' };
});

// 核心业务API
router.post('/events', async (ctx) => {
  const event = ctx.request.body;
  if (!event || !event.id || !event.payload) {
    ctx.status = 400;
    ctx.body = { error: 'Invalid event format. Required: id, payload.' };
    return;
  }
  
  let transactionContext;
  try {
    // 1. 开始事务并发送消息
    transactionContext = await sendTransactionalMessage(
      '/queue/event-queue-in',
      JSON.stringify(event)
    );
    
    // 2. 如果有其他需要同步执行的操作(如写入数据库),可以在这里执行。
    // 如果失败,则调用 transactionContext.rollback() 并抛出异常。
    
    // 3. 所有操作成功,提交事务
    transactionContext.commit();
    
    // 4. 最后向客户端返回成功
    ctx.status = 202; // Accepted
    ctx.body = { message: 'Event accepted for processing.', eventId: event.id };
    logger.info(`Event ${event.id} accepted and transaction committed.`);

  } catch (error) {
    logger.error(`Failed to process event ${event.id}:`, error);
    // 确保事务回滚
    if (transactionContext) {
      transactionContext.rollback();
    }
    
    // 向客户端返回服务端错误
    ctx.status = 500;
    ctx.body = { error: 'Failed to enqueue event. Please try again.' };
  }
});

app.use(router.routes()).use(router.allowedMethods());

// 优雅停机
const server = app.listen(3000, () => {
  logger.info('Koa server listening on port 3000');
});

process.on('SIGTERM', () => {
  logger.info('SIGTERM signal received: closing HTTP server');
  server.close(() => {
    logger.info('HTTP server closed');
    // 可以在这里关闭数据库或MQ的连接
    process.exit(0);
  });
});

这个实现的关键在于,commit()操作在所有业务逻辑(即使是空的)都完成后,并且准备好发送HTTP响应之前执行。如果在sendTransactionalMessage内部或之后、commit之前发生任何异常,catch块会确保事务被回滚。

第三部分:GitHub Actions实现自动化构建、部署与验证

手动部署和测试这样一个分布式系统是不可靠的。我们使用GitHub Actions来固化这个流程。

# .github/workflows/ci-cd.yml
name: End-to-End Exactly-Once Pipeline CI/CD

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  build_and_test:
    runs-on: ubuntu-latest
    services:
      # 在后台启动一个ActiveMQ服务,用于集成测试
      activemq:
        image: rmohr/activemq:5.18.3
        ports:
          - 61616:61616 # OpenWire
          - 61614:61614 # STOMP over WS
          - 8161:8161   # Web Console

    steps:
    - name: Checkout repository
      uses: actions/checkout@v3

    #--------------------
    # Flink Job Build
    #--------------------
    - name: Set up JDK 11
      uses: actions/setup-java@v3
      with:
        java-version: '11'
        distribution: 'temurin'
        cache: 'maven'

    - name: Build Flink Job with Maven
      run: mvn -f flink-job/pom.xml clean package
      # 假设Flink项目在flink-job目录下

    - name: Upload Flink Job JAR as artifact
      uses: actions/upload-artifact@v3
      with:
        name: flink-job-jar
        path: flink-job/target/*.jar

    #--------------------
    # Koa Service Build & Unit Test
    #--------------------
    - name: Set up Node.js 18
      uses: actions/setup-node@v3
      with:
        node-version: '18'
        cache: 'npm'
        cache-dependency-path: 'koa-api/package-lock.json'
        # 假设Koa项目在koa-api目录下

    - name: Install Koa dependencies
      run: npm install
      working-directory: ./koa-api

    - name: Run Koa service unit tests
      run: npm test
      working-directory: ./koa-api
      # 注意:这里应该是单元测试,不依赖外部服务

    - name: Build Docker image for Koa service
      run: |
        docker build -t transactional-api:latest ./koa-api
        # Dockerfile应该在koa-api目录下

    #--------------------
    # End-to-End Integration Test
    #--------------------
    - name: Run Integration Test
      run: |
        # 这个脚本是关键,它会模拟整个流程
        # 1. 提交Flink作业到本地的MiniCluster或直接运行main方法
        # 2. 启动Koa服务
        # 3. 调用Koa API发送事件
        # 4. 消费输出队列,验证结果是否正确
        # 5. (高级)在处理中途模拟故障,然后重启,再次验证最终结果
        ./scripts/run-integration-test.sh
      env:
        ACTIVEMQ_HOST: localhost
        ACTIVEMQ_PORT: 61616

scripts/run-integration-test.sh脚本会很复杂,它的大致思路是:

  1. 启动Flink作业进程在后台。
  2. 启动Koa API进程在后台。
  3. 等待服务都健康。
  4. 使用curl发送一个包含唯一ID的事件到Koa API。
  5. 使用一个简单的JMS客户端连接到ActiveMQ的event-queue-out队列。
  6. 消费消息,检查消息内容是否与预期一致(包含原始事件和处理时间戳)。
  7. 验证只收到一条消息,没有重复。
  8. 为了测试容错,可以发送消息后,立即kill Flink作业进程,然后再重启它,最后再验证输出队列,结果应该仍然是正确的。

遗留问题与未来迭代

尽管这个方案实现了端到端的Exactly-Once,但它并非没有权衡。

  1. 性能开销:两阶段提交和事务本身会带来显著的延迟和吞吐量损耗。对于需要极低延迟的场景,这可能不是最佳选择。与Kafka的幂等生产者+事务相比,JMS的事务模型通常更重。
  2. Koa与MQ的事务局限性:当前Koa服务与ActiveMQ之间的事务是局部的。如果API处理逻辑还需要写入数据库,那么这个数据库操作将不包含在ActiveMQ的事务中。要实现跨资源(数据库+消息队列)的分布式事务,需要引入XA协议,这在Node.js生态中实现起来非常复杂且支持有限。
  3. ActiveMQ集群:单点的ActiveMQ是系统的瓶颈和单点故障。在生产环境中,必须部署高可用的ActiveMQ集群(如Master-Slave或Network of Brokers),这会增加运维复杂性。
  4. 恢复逻辑的健壮性TwoPhaseCommitSinkFunction的恢复逻辑需要经过严格测试。在我们的示例中,恢复逻辑被简化了。生产代码需要处理更复杂的场景,例如在commit阶段持续失败(比如网络分区),这时可能需要人工干预。

未来的迭代方向可能包括:将消息队列替换为Kafka,利用其高吞吐量和更现代的事务模型进行对比;在GitHub Actions中引入更复杂的混沌测试,例如使用toxiproxy模拟网络延迟和中断,以系统化地验证系统的韧性。


  目录