我们面临一个棘手的需求:一个外部系统通过HTTP API推送关键业务事件,我们的后端系统必须对这些事件进行有状态的流式处理,并且从数据接收到处理完成,整个链路必须保证“精确一次”(Exactly-Once)语义。任何事件的丢失或重复处理都可能导致业务逻辑错误和财务损失。在真实项目中,这意味着即使在API服务、消息队列或流处理作业发生崩溃重启的情况下,数据完整性也不能被破坏。
最初的架构构想是 HTTP API -> 消息队列 -> Flink。这看起来很标准,但魔鬼藏在细节里。如何确保从HTTP请求成功响应的那一刻起,到Flink完成状态更新,这条漫长的路径上,数据不多不少,只被处理一次?单纯的Flink Checkpoint只能保证Flink内部的Exactly-Once,它无法覆盖从外部API到消息队列这一段。如果Koa服务在发送消息到ActiveMQ后、响应HTTP请求前崩溃,会发生什么?如果Flink消费了消息、更新了状态,但在Checkpoint完成前崩溃,又会发生什么?
我们决定直面这个挑战,构建一个从API入口(Koa.js)到消息队列(ActiveMQ)再到流处理核心(Apache Flink)的全链路Exactly-Once管道。整个构建、测试和部署流程将由GitHub Actions自动化,确保方案的健壮性可以在持续集成中得到验证。
技术选型决策与核心挑战
- Apache Flink: 作为流处理引擎,它的两阶段提交(Two-Phase Commit)Checkpoint机制是实现Exactly-Once的核心。它能与支持事务的Source和Sink协同工作,形成一个原子性的端到端事务。
- ActiveMQ: 我们选择ActiveMQ而不是更流行的Kafka,主要是为了利用其成熟的JMS事务支持。JMS事务提供了一个清晰的
commit/rollback模型,这对于Flink的TwoPhaseCommitSinkFunction和事务性Source来说,是一个非常经典的集成范例。这里的坑在于,Flink社区对ActiveMQ的事务性连接器支持不如Kafka完善,我们需要进行更细致的配置甚至定制。 - Koa.js + Babel: 作为API网关,Node.js的异步I/O模型非常适合处理大量并发的HTTP请求。最大的挑战在于,Node.js(一个非JVM环境)如何与ActiveMQ进行事务性交互。我们需要找到一个可靠的库,并精心设计API处理逻辑,确保“消息入队”和“HTTP响应”这两个操作的原子性。
- GitHub Actions: 自动化是生产级系统的基石。我们的工作流不仅要编译和打包一个Java应用和一个Node.js应用,还必须能够编排启动整个依赖环境(ActiveMQ, Flink),并运行一个端到端的集成测试,该测试会主动模拟故障来验证我们的Exactly-Once承诺。
步骤化实现:构建事务性数据管道
第一部分:Flink与ActiveMQ的事务性集成
Flink要实现端到端的Exactly-Once,其Source必须是可重放的,Sink必须是事务性的。在我们的场景中,ActiveMQ既是Source,也要作为可能的Sink。
首先是Flink作业的Maven依赖,我们需要flink-connector-jms和activemq-client。
<!-- pom.xml -->
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.17.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.17.1</version>
<scope>provided</scope>
</dependency>
<!-- JMS Connector for Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jms</artifactId>
<version>1.17.1</version>
</dependency>
<!-- ActiveMQ Client -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.18.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.36</version>
<scope>runtime</scope>
</dependency>
</dependencies>
核心是实现一个事务性的JMSSource。flink-connector-jms提供的JMSSource通过启用JMS会话的事务模式,并结合Flink的Checkpoint机制来实现精确一次消费。
当Flink触发一个Checkpoint时:
-
JMSSource预提交当前的JMS事务。 - Flink将算子状态快照写入持久化存储。
- 当所有算子都成功完成快照后,Flink的JobManager会通知所有算子Checkpoint已完成。
-
JMSSource收到通知后,正式提交JMS事务,消息才会被确认为消费。 - 如果Checkpoint失败,
JMSSource会回滚事务,消息会回到队列中,待作业恢复后重新消费。
下面是Flink作业的核心代码,它从一个输入队列event-queue-in消费消息,进行简单的转换,然后使用一个自定义的两阶段提交Sink将结果写入event-queue-out。
// FlinkExactlyOnceJob.java
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.jms.source.JMSSource;
import org.apache.flink.connector.jms.source.config.JMSSourceConfig;
import org.apache.flink.connector.jms.source.reader.deserializer.JMSDeserializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.CheckpointingMode;
import javax.jms.ConnectionFactory;
import javax.jms.TextMessage;
public class FlinkExactlyOnceJob {
private static final String BROKER_URL = "tcp://localhost:61616";
private static final String INPUT_QUEUE = "event-queue-in";
private static final String OUTPUT_QUEUE = "event-queue-out";
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 关键:启用Checkpoint,并设置为Exactly-Once模式
// checkpoint间隔5秒,对于生产环境这个值需要仔细权衡
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000); // Checkpoint超时
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); // 两次checkpoint之间最小间隔
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 同时只能有一个checkpoint
// 配置ActiveMQ Connection Factory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
// 构建事务性的JMSSource
JMSSource<String> jmsSource = JMSSource.builder()
.setConfig(JMSSourceConfig.builder()
.setConnectionFactory(connectionFactory)
.setDestination(INPUT_QUEUE)
// 关键:设置为客户端确认模式,并启用事务
.setAcknowledgeMode(javax.jms.Session.CLIENT_ACKNOWLEDGE)
.setTransactional(true)
.build())
.setDeserializer(JMSDeserializer.of(message -> {
if (message instanceof TextMessage) {
return ((TextMessage) message).getText();
}
return null;
}))
.build();
DataStream<String> stream = env.fromSource(jmsSource, WatermarkStrategy.noWatermarks(), "ActiveMQ Source");
// 简单的业务逻辑:在消息体后追加处理时间戳
DataStream<String> processedStream = stream.map((MapFunction<String, String>) value -> {
// 在真实项目中,这里可能包含复杂的状态计算
System.out.println("Processing message: " + value);
return value + ", processed_at: " + System.currentTimeMillis();
});
// 添加一个两阶段提交的Sink,将结果写入另一个ActiveMQ队列
processedStream.addSink(new TwoPhaseCommitActiveMQSink(BROKER_URL, OUTPUT_QUEUE))
.name("Transactional ActiveMQ Sink");
env.execute("Flink End-to-End Exactly-Once Demo");
}
}
为了实现事务性输出,我们需要自己实现一个TwoPhaseCommitSinkFunction。
// TwoPhaseCommitActiveMQSink.java
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import javax.jms.*;
public class TwoPhaseCommitActiveMQSink extends TwoPhaseCommitSinkFunction<String, TwoPhaseCommitActiveMQSink.JmsTransaction, Void>
implements CheckpointedFunction {
private final String brokerUrl;
private final String queueName;
// 非瞬态,用于在恢复时重新提交或中止事务
private transient ListState<JmsTransaction> pendingTransactions;
public TwoPhaseCommitActiveMQSink(String brokerUrl, String queueName) {
// 使用一个简单的Void作为上下文
super(TypeInformation.of(JmsTransaction.class).createSerializer(), TypeInformation.of(Void.class).createSerializer());
this.brokerUrl = brokerUrl;
this.queueName = queueName;
}
// 步骤1:开始一个新事务
@Override
protected JmsTransaction beginTransaction() throws Exception {
System.out.println("Beginning transaction...");
return new JmsTransaction(brokerUrl, queueName);
}
// 步骤2:将数据写入当前事务
@Override
protected void invoke(JmsTransaction transaction, String value, Context context) throws Exception {
transaction.produce(value);
}
// 步骤3:预提交事务
@Override
protected void preCommit(JmsTransaction transaction) throws Exception {
System.out.println("Pre-committing transaction: " + transaction.id);
// 在JMS中,事务准备阶段是隐式的,实际工作在commit时完成
// 我们需要将事务信息保存到状态中,以便在故障恢复后能够commit
pendingTransactions.add(transaction);
}
// 步骤4:正式提交事务
@Override
protected void commit(JmsTransaction transaction) {
System.out.println("Committing transaction: " + transaction.id);
try {
transaction.commit();
// 提交成功后,从待处理列表中移除
pendingTransactions.clear();
} catch (Exception e) {
// 这里的异常是致命的,可能导致数据不一致
throw new RuntimeException("Failed to commit JMS transaction", e);
}
}
// 步骤5:中止事务
@Override
protected void abort(JmsTransaction transaction) {
System.out.println("Aborting transaction: " + transaction.id);
try {
transaction.rollback();
} catch (Exception e) {
// Rollback失败通常问题不大,因为连接关闭后事务会自动回滚
System.err.println("Error while rolling back transaction: " + e.getMessage());
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// 在 preCommit 之后,checkpoint 触发时,pendingTransactions 已经被更新
// Flink 会自动将这个状态快照
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<JmsTransaction> descriptor = new ListStateDescriptor<>(
"pending-jms-transactions",
TypeInformation.of(new TypeHint<JmsTransaction>() {})
);
pendingTransactions = context.getOperatorStateStore().getListState(descriptor);
// 从故障恢复时,检查是否有未完成的事务
if (context.isRestored()) {
for (JmsTransaction transaction : pendingTransactions.get()) {
// Flink的2PC协议保证,如果作业恢复,那么 preCommit 成功的事务最终都应该被 commit
// 我们在这里尝试重新提交,但更好的做法是在 open() 方法中进行
// 因为此时我们才能建立新的连接
// 简单起见,这里只打印日志。生产代码需要更复杂的恢复逻辑。
System.out.println("Restored pending transaction: " + transaction.id);
}
}
}
// 内部类,封装JMS事务
static class JmsTransaction {
// 注意:这个类需要可序列化才能保存在Flink状态中
final String id;
transient Connection connection;
transient Session session;
transient MessageProducer producer;
// 用于恢复
private String brokerUrl;
private String queueName;
JmsTransaction(String brokerUrl, String queueName) throws JMSException {
this.id = java.util.UUID.randomUUID().toString();
this.brokerUrl = brokerUrl;
this.queueName = queueName;
init();
}
private void init() throws JMSException {
ConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
connection = factory.createConnection();
connection.start();
// 关键:创建事务性Session
session = connection.createSession(true, Session.SESSION_TRANSACTED);
Destination destination = session.createQueue(queueName);
producer = session.createProducer(destination);
}
void produce(String message) throws JMSException {
producer.send(session.createTextMessage(message));
}
void commit() throws JMSException {
session.commit();
close();
}
void rollback() throws JMSException {
session.rollback();
close();
}
private void close() {
try { if (producer != null) producer.close(); } catch (JMSException e) { /* ignore */ }
try { if (session != null) session.close(); } catch (JMSException e) { /* ignore */ }
try { if (connection != null) connection.close(); } catch (JMSException e) { /* ignore */ }
}
}
}
以下是两阶段提交流程的可视化表示:
sequenceDiagram
participant JobManager
participant SourceTask
participant SinkTask
participant ActiveMQ
JobManager->>SourceTask: Trigger Checkpoint (ID: n)
SourceTask->>ActiveMQ: Pre-commit Tx (read messages)
SourceTask-->>JobManager: Acknowledge Checkpoint (ID: n, State Handle)
JobManager->>SinkTask: Trigger Checkpoint (ID: n)
SinkTask->>ActiveMQ: Begin Tx_sink
SinkTask->>ActiveMQ: Write messages within Tx_sink
Note right of SinkTask: This is preCommit phase
SinkTask-->>JobManager: Acknowledge Checkpoint (ID: n, State Handle)
JobManager->>JobManager: All tasks acknowledged for Checkpoint n
JobManager->>SourceTask: Notify Checkpoint n Complete
SourceTask->>ActiveMQ: Commit Tx (messages are consumed)
JobManager->>SinkTask: Notify Checkpoint n Complete
SinkTask->>ActiveMQ: Commit Tx_sink (messages are visible)
第二部分:Koa API的事务性消息生产
现在轮到入口了。Koa服务必须以事务方式向ActiveMQ发送消息。这意味着,我们必须在成功发送消息后才能向客户端返回200 OK,并且如果发送失败,必须返回错误且消息不能出现在队列中。
我们使用stomp-client库,因为它支持STOMP协议的事务。
// package.json
{
"name": "transactional-api",
"version": "1.0.0",
"main": "index.js",
"scripts": {
"start": "babel-node src/app.js",
"test": "echo \"Error: no test specified\" && exit 1"
},
"dependencies": {
"@stomp/stompjs": "^7.0.0",
"koa": "^2.14.2",
"koa-bodyparser": "^4.4.1",
"koa-router": "^12.0.1",
"log4js": "^6.9.1",
"ws": "^8.14.2"
},
"devDependencies": {
"@babel/cli": "^7.23.0",
"@babel/core": "^7.23.2",
"@babel/node": "^7.22.19",
"@babel/preset-env": "^7.23.2"
}
}
Babel配置用于支持ES6模块语法:
// .babelrc
{
"presets": [
"@babel/preset-env"
]
}
接下来是Koa应用的核心代码。我们创建一个中间件来管理STOMP客户端连接和事务。
// src/services/messageQueue.js
import { Client } from '@stomp/stompjs';
import { logger } from '../utils/logger.js';
const BROKER_URL = 'ws://localhost:61614'; // ActiveMQ's STOMP over WebSocket connector
// 维护一个单例的STOMP客户端
const client = new Client({
brokerURL: BROKER_URL,
reconnectDelay: 5000,
heartbeatIncoming: 4000,
heartbeatOutgoing: 4000,
beforeConnect: () => {
logger.info('Attempting to connect to ActiveMQ...');
}
});
client.onConnect = (frame) => {
logger.info('Connected to ActiveMQ: ' + frame);
};
client.onStompError = (frame) => {
logger.error('Broker reported error: ' + frame.headers['message']);
logger.error('Additional details: ' + frame.body);
};
client.activate();
/**
* 在一个事务中发送消息
* @param {string} destination - The queue name.
* @param {string} body - The message body.
* @returns {Promise<void>}
*/
export async function sendTransactionalMessage(destination, body) {
if (!client.active) {
throw new Error('Message broker is not connected.');
}
const transaction = client.begin();
try {
client.publish({
destination: destination,
body: body,
headers: { 'persistent': 'true' },
transaction: transaction.id,
});
// 这里的坑:一个常见的错误是在publish后立即提交。
// 应该由调用者(API路由)来决定何时提交。
// 这个函数返回一个包含commit和rollback方法的对象。
logger.info(`Message published to queue ${destination} within transaction ${transaction.id}`);
return {
commit: () => {
logger.info(`Committing transaction ${transaction.id}`);
transaction.commit();
},
rollback: () => {
logger.warn(`Rolling back transaction ${transaction.id}`);
transaction.rollback();
},
};
} catch (error) {
logger.error(`Failed to publish message in transaction ${transaction.id}. Rolling back.`);
transaction.rollback();
// 确保上层能捕获到异常
throw error;
}
}
然后是Koa的路由和应用入口,它使用了上述服务。
// src/app.js
import Koa from 'koa';
import Router from 'koa-router';
import bodyParser from 'koa-bodyparser';
import { sendTransactionalMessage } from './services/messageQueue.js';
import { configureLogger, logger } from './utils/logger.js';
// 初始化日志
configureLogger();
const app = new Koa();
const router = new Router();
app.use(bodyParser());
// 健康检查
router.get('/health', (ctx) => {
ctx.status = 200;
ctx.body = { status: 'UP' };
});
// 核心业务API
router.post('/events', async (ctx) => {
const event = ctx.request.body;
if (!event || !event.id || !event.payload) {
ctx.status = 400;
ctx.body = { error: 'Invalid event format. Required: id, payload.' };
return;
}
let transactionContext;
try {
// 1. 开始事务并发送消息
transactionContext = await sendTransactionalMessage(
'/queue/event-queue-in',
JSON.stringify(event)
);
// 2. 如果有其他需要同步执行的操作(如写入数据库),可以在这里执行。
// 如果失败,则调用 transactionContext.rollback() 并抛出异常。
// 3. 所有操作成功,提交事务
transactionContext.commit();
// 4. 最后向客户端返回成功
ctx.status = 202; // Accepted
ctx.body = { message: 'Event accepted for processing.', eventId: event.id };
logger.info(`Event ${event.id} accepted and transaction committed.`);
} catch (error) {
logger.error(`Failed to process event ${event.id}:`, error);
// 确保事务回滚
if (transactionContext) {
transactionContext.rollback();
}
// 向客户端返回服务端错误
ctx.status = 500;
ctx.body = { error: 'Failed to enqueue event. Please try again.' };
}
});
app.use(router.routes()).use(router.allowedMethods());
// 优雅停机
const server = app.listen(3000, () => {
logger.info('Koa server listening on port 3000');
});
process.on('SIGTERM', () => {
logger.info('SIGTERM signal received: closing HTTP server');
server.close(() => {
logger.info('HTTP server closed');
// 可以在这里关闭数据库或MQ的连接
process.exit(0);
});
});
这个实现的关键在于,commit()操作在所有业务逻辑(即使是空的)都完成后,并且准备好发送HTTP响应之前执行。如果在sendTransactionalMessage内部或之后、commit之前发生任何异常,catch块会确保事务被回滚。
第三部分:GitHub Actions实现自动化构建、部署与验证
手动部署和测试这样一个分布式系统是不可靠的。我们使用GitHub Actions来固化这个流程。
# .github/workflows/ci-cd.yml
name: End-to-End Exactly-Once Pipeline CI/CD
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
jobs:
build_and_test:
runs-on: ubuntu-latest
services:
# 在后台启动一个ActiveMQ服务,用于集成测试
activemq:
image: rmohr/activemq:5.18.3
ports:
- 61616:61616 # OpenWire
- 61614:61614 # STOMP over WS
- 8161:8161 # Web Console
steps:
- name: Checkout repository
uses: actions/checkout@v3
#--------------------
# Flink Job Build
#--------------------
- name: Set up JDK 11
uses: actions/setup-java@v3
with:
java-version: '11'
distribution: 'temurin'
cache: 'maven'
- name: Build Flink Job with Maven
run: mvn -f flink-job/pom.xml clean package
# 假设Flink项目在flink-job目录下
- name: Upload Flink Job JAR as artifact
uses: actions/upload-artifact@v3
with:
name: flink-job-jar
path: flink-job/target/*.jar
#--------------------
# Koa Service Build & Unit Test
#--------------------
- name: Set up Node.js 18
uses: actions/setup-node@v3
with:
node-version: '18'
cache: 'npm'
cache-dependency-path: 'koa-api/package-lock.json'
# 假设Koa项目在koa-api目录下
- name: Install Koa dependencies
run: npm install
working-directory: ./koa-api
- name: Run Koa service unit tests
run: npm test
working-directory: ./koa-api
# 注意:这里应该是单元测试,不依赖外部服务
- name: Build Docker image for Koa service
run: |
docker build -t transactional-api:latest ./koa-api
# Dockerfile应该在koa-api目录下
#--------------------
# End-to-End Integration Test
#--------------------
- name: Run Integration Test
run: |
# 这个脚本是关键,它会模拟整个流程
# 1. 提交Flink作业到本地的MiniCluster或直接运行main方法
# 2. 启动Koa服务
# 3. 调用Koa API发送事件
# 4. 消费输出队列,验证结果是否正确
# 5. (高级)在处理中途模拟故障,然后重启,再次验证最终结果
./scripts/run-integration-test.sh
env:
ACTIVEMQ_HOST: localhost
ACTIVEMQ_PORT: 61616
scripts/run-integration-test.sh脚本会很复杂,它的大致思路是:
- 启动Flink作业进程在后台。
- 启动Koa API进程在后台。
- 等待服务都健康。
- 使用
curl发送一个包含唯一ID的事件到Koa API。 - 使用一个简单的JMS客户端连接到ActiveMQ的
event-queue-out队列。 - 消费消息,检查消息内容是否与预期一致(包含原始事件和处理时间戳)。
- 验证只收到一条消息,没有重复。
- 为了测试容错,可以发送消息后,立即
killFlink作业进程,然后再重启它,最后再验证输出队列,结果应该仍然是正确的。
遗留问题与未来迭代
尽管这个方案实现了端到端的Exactly-Once,但它并非没有权衡。
- 性能开销:两阶段提交和事务本身会带来显著的延迟和吞吐量损耗。对于需要极低延迟的场景,这可能不是最佳选择。与Kafka的幂等生产者+事务相比,JMS的事务模型通常更重。
- Koa与MQ的事务局限性:当前Koa服务与ActiveMQ之间的事务是局部的。如果API处理逻辑还需要写入数据库,那么这个数据库操作将不包含在ActiveMQ的事务中。要实现跨资源(数据库+消息队列)的分布式事务,需要引入XA协议,这在Node.js生态中实现起来非常复杂且支持有限。
- ActiveMQ集群:单点的ActiveMQ是系统的瓶颈和单点故障。在生产环境中,必须部署高可用的ActiveMQ集群(如Master-Slave或Network of Brokers),这会增加运维复杂性。
- 恢复逻辑的健壮性:
TwoPhaseCommitSinkFunction的恢复逻辑需要经过严格测试。在我们的示例中,恢复逻辑被简化了。生产代码需要处理更复杂的场景,例如在commit阶段持续失败(比如网络分区),这时可能需要人工干预。
未来的迭代方向可能包括:将消息队列替换为Kafka,利用其高吞吐量和更现代的事务模型进行对比;在GitHub Actions中引入更复杂的混沌测试,例如使用toxiproxy模拟网络延迟和中断,以系统化地验证系统的韧性。