A business process that spans multiple microservices, such as provisioning cloud resources for a new customer, leads us straight into the problem of distributed transactions. The flow might involve three steps: create the account in the user service, allocate an object storage bucket in the storage service, and finally set the access policy in the permission service. These three steps must succeed or fail as a unit; if any step fails, the steps that already succeeded must be rolled back so that the system's data ends up consistent. In real projects, the traditional two-phase commit (2PC) protocol is close to unusable in a loosely coupled microservice architecture because of its synchronous, blocking nature and its heavy dependence on the database, which forces us to look for alternatives.
Option A: Choreography-based Saga
A choreography-based Saga is a decentralized pattern. Each service publishes an event after completing its local transaction; other services subscribe to these events and trigger their own local transactions. If a step fails, the failing service publishes a failure event, which triggers the previously successful services to run their compensating actions.
sequenceDiagram
participant Client
participant UserService
participant StorageService
participant PermissionService
participant MessageBroker
Client->>UserService: 1. CreateUserRequest
UserService->>MessageBroker: 2. Publish(UserCreatedEvent)
MessageBroker-->>StorageService: 3. Consume(UserCreatedEvent)
StorageService->>MessageBroker: 4. Publish(StorageAllocatedEvent)
MessageBroker-->>PermissionService: 5. Consume(StorageAllocatedEvent)
Note over PermissionService: Operation failed!
PermissionService->>MessageBroker: 6. Publish(PermissionGrantFailedEvent)
MessageBroker-->>StorageService: 7. Consume(PermissionGrantFailedEvent)
Note over StorageService: Compensate: DeallocateStorage
StorageService->>MessageBroker: 8. Publish(StorageDeallocatedEvent)
MessageBroker-->>UserService: 9. Consume(StorageDeallocatedEvent)
Note over UserService: Compensate: DeactivateUser
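To make the choreography concrete, here is a minimal sketch of what one participant's event handler might look like. It is not part of the original design: the publish callable stands in for a real message-broker client, allocate_bucket is a stub for the service's own local transaction, and the event names simply mirror the diagram above.

# storage_service/event_handlers.py -- illustrative sketch only
import logging

logger = logging.getLogger(__name__)

async def allocate_bucket(owner_id: str) -> str:
    # Placeholder for this service's real local transaction (object store call + DB insert).
    return f"bucket-{owner_id[:8]}"

async def handle_user_created(event: dict, publish) -> None:
    """Consume UserCreatedEvent, run the local transaction, then publish the outcome."""
    user_id = event["user_id"]
    try:
        bucket_name = await allocate_bucket(owner_id=user_id)
        await publish("StorageAllocatedEvent", {"user_id": user_id, "bucket_name": bucket_name})
    except Exception as exc:
        logger.exception("Bucket allocation failed for user %s", user_id)
        # The failure event is what triggers upstream compensation (deactivating the user).
        await publish("StorageAllocationFailedEvent", {"user_id": user_id, "reason": str(exc)})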
Advantages:
- Loose coupling: services never call each other directly; they only react to events, which fits the microservice design philosophy.
- Simplicity per service: from a single service's point of view the logic is trivial: do the work, then publish an event.
Disadvantages:
- Poor process visibility: the definition of the overall business flow is scattered across services. Once a flow involves a dozen services, reconstructing the full call chain and state transitions becomes extremely difficult; no single place shows the global view of the transaction.
- Risk of cyclic dependencies: an event loop such as A -> B -> C -> A is hard to spot at design time, and once it occurs it can be disastrous for the system.
- Debugging nightmare: when a transaction fails, tracing the root cause means digging through the logs of multiple services and the state of the message broker. Answering questions like "which service did not respond?" or "why was the compensation event never consumed?" is very expensive.
- Testing complexity: end-to-end integration tests require deploying every involved service plus the message middleware, and keeping such a test environment stable is a challenge in itself.
For our deterministic, clearly staged provisioning flow, the "flexibility" of choreography is in practice a source of chaos. What we need is a solution with strong control over the business flow, explicit state, and good support for monitoring and debugging. That makes an orchestration-based Saga the more realistic choice.
Option B: Orchestration-based Saga
An orchestration-based Saga introduces a central coordinator (the orchestrator). The orchestrator drives the whole business flow and calls the participant services in order. If an operation fails, the orchestrator is responsible for invoking the compensation endpoints of every operation that has already succeeded.
sequenceDiagram
participant Client
participant Orchestrator
participant UserService
participant StorageService
participant PermissionService
Client->>Orchestrator: 1. StartProvisionSaga(userId, ...)
Orchestrator->>UserService: 2. Execute: CreateUser
UserService-->>Orchestrator: 3. Success
Orchestrator->>StorageService: 4. Execute: AllocateStorage
StorageService-->>Orchestrator: 5. Success
Orchestrator->>PermissionService: 6. Execute: GrantPermissions
PermissionService-->>Orchestrator: 7. Failure
Note over Orchestrator: GrantPermissions failed, start rollback
Orchestrator->>StorageService: 8. Compensate: DeallocateStorage
StorageService-->>Orchestrator: 9. Success
Orchestrator->>UserService: 10. Compensate: DeactivateUser
UserService-->>Orchestrator: 11. Success
Note over Orchestrator: Saga rollback complete, status set to FAILED
Orchestrator-->>Client: 12. SagaStatus: FAILED
Advantages:
- Centralized flow control: the business-flow logic (execution order, retry policy, compensation order) lives entirely in the orchestrator, where it is visible, easy to understand, and easy to change.
- Explicit state management: the orchestrator maintains the Saga's global state machine, so at any point you can query which step a transaction has reached and whether it succeeded, failed, or is rolling back.
- Simple participants: participant services stay "pure"; they only have to expose an action endpoint and a compensation endpoint and need no knowledge of the overall flow.
- Observability and debugging: all flow-control logs are concentrated in the orchestrator, which greatly simplifies troubleshooting.
Disadvantages:
- The orchestrator as a single point of risk: it becomes a critical component and must be made highly available.
- Service discovery and coupling: the orchestrator has to know the addresses and interfaces of every participant, which introduces some call-level coupling, though it remains manageable.
Final decision:
In a production environment where reliability and maintainability outweigh everything else, orchestration is the more pragmatic choice. It pulls the complex flow-control logic out of the individual business services and manages it in one place, making the system's behavior more predictable and controllable. We will build this orchestrator service in Python.
Core implementation overview
We will build four services: saga-orchestrator, user-service, storage-service, and permission-service, all using FastAPI and SQLAlchemy. The frontend talks to the orchestrator from Next.js.
1. Saga Orchestrator service
This service is the heart of the pattern. It needs a persistence layer to store the state of every Saga instance.
Database models (models.py):
# saga_orchestrator/models.py
import uuid
from enum import Enum
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class SagaStatus(str, Enum):
STARTED = "STARTED"
EXECUTING_USER_CREATION = "EXECUTING_USER_CREATION"
EXECUTING_STORAGE_ALLOCATION = "EXECUTING_STORAGE_ALLOCATION"
EXECUTING_PERMISSION_GRANTING = "EXECUTING_PERMISSION_GRANTING"
COMPLETED = "COMPLETED"
COMPENSATING_PERMISSION_GRANTING = "COMPENSATING_PERMISSION_GRANTING"
COMPENSATING_STORAGE_ALLOCATION = "COMPENSATING_STORAGE_ALLOCATION"
COMPENSATING_USER_CREATION = "COMPENSATING_USER_CREATION"
FAILED = "FAILED"
class SagaInstance(Base):
__tablename__ = "saga_instances"
id = sa.Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
saga_name = sa.Column(sa.String, nullable=False, index=True)
current_status = sa.Column(sa.Enum(SagaStatus), nullable=False, default=SagaStatus.STARTED)
payload = sa.Column(JSONB, nullable=False)
# Store results from each step for compensation
step_results = sa.Column(JSONB, nullable=True, default=dict)
created_at = sa.Column(sa.DateTime, server_default=sa.func.now())
updated_at = sa.Column(sa.DateTime, server_default=sa.func.now(), onupdate=sa.func.now())
def __repr__(self):
return f"<SagaInstance(id={self.id}, status='{self.current_status.value}')>"
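The orchestrator's main.py below imports get_db from a database module that is not shown in this article. A minimal sketch of what that module might contain follows; the connection URL is a placeholder, and it also exposes the SessionLocal factory that the background task uses later.

# saga_orchestrator/database.py -- minimal sketch; adjust URL and pooling to your environment
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from .models import Base

DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://saga:saga@localhost:5432/saga")

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(bind=engine, autocommit=False, autoflush=False)

# Create tables at import time for the demo; use Alembic migrations in production.
Base.metadata.create_all(bind=engine)

def get_db():
    """FastAPI dependency that yields a session and always closes it."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()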
Saga definition and executor (saga.py):
We define a Saga workflow that lists, for every step, its action and its compensating action.
# saga_orchestrator/saga.py
import httpx
import logging
from sqlalchemy.orm import Session
from sqlalchemy.orm.attributes import flag_modified
from .models import SagaInstance, SagaStatus
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# In a real app, these would be in a config file
USER_SERVICE_URL = "http://user-service:8001"
STORAGE_SERVICE_URL = "http://storage-service:8002"
PERMISSION_SERVICE_URL = "http://permission-service:8003"
class SagaStep:
def __init__(self, action, compensation):
self.action = action
self.compensation = compensation
async def create_user_action(saga_instance: SagaInstance):
payload = saga_instance.payload
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(f"{USER_SERVICE_URL}/users", json={"username": payload["username"]})
response.raise_for_status() # Raise exception for non-2xx status
# Store created user ID for potential compensation
return {"user_id": response.json()["id"]}
async def deactivate_user_compensation(saga_instance: SagaInstance):
# Idempotency: The user service should handle repeated deletions gracefully.
user_id = saga_instance.step_results.get("create_user", {}).get("user_id")
if not user_id:
logger.warning(f"Saga {saga_instance.id}: No user_id found for compensation.")
return
async with httpx.AsyncClient(timeout=10.0) as client:
await client.delete(f"{USER_SERVICE_URL}/users/{user_id}")
async def allocate_storage_action(saga_instance: SagaInstance):
user_id = saga_instance.step_results.get("create_user", {}).get("user_id")
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(f"{STORAGE_SERVICE_URL}/buckets", json={"owner_id": user_id})
response.raise_for_status()
return {"bucket_name": response.json()["name"]}
async def deallocate_storage_compensation(saga_instance: SagaInstance):
bucket_name = saga_instance.step_results.get("allocate_storage", {}).get("bucket_name")
if not bucket_name:
logger.warning(f"Saga {saga_instance.id}: No bucket_name found for compensation.")
return
async with httpx.AsyncClient(timeout=10.0) as client:
# Note: DELETE should be idempotent by definition.
await client.delete(f"{STORAGE_SERVICE_URL}/buckets/{bucket_name}")
async def grant_permissions_action(saga_instance: SagaInstance):
user_id = saga_instance.step_results.get("create_user", {}).get("user_id")
bucket_name = saga_instance.step_results.get("allocate_storage", {}).get("bucket_name")
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(
f"{PERMISSION_SERVICE_URL}/permissions",
json={"user_id": user_id, "resource": f"bucket:{bucket_name}"}
)
response.raise_for_status()
return {"permission_id": response.json()["id"]}
# The compensation for granting permission is often just revoking it.
async def revoke_permissions_compensation(saga_instance: SagaInstance):
permission_id = saga_instance.step_results.get("grant_permissions", {}).get("permission_id")
if not permission_id:
logger.warning(f"Saga {saga_instance.id}: No permission_id found for compensation.")
return
async with httpx.AsyncClient(timeout=10.0) as client:
await client.delete(f"{PERMISSION_SERVICE_URL}/permissions/{permission_id}")
# Define the Saga workflow
PROVISION_RESOURCE_SAGA = {
"name": "provision_resource",
"steps": [
(SagaStatus.EXECUTING_USER_CREATION, SagaStep(action=create_user_action, compensation=deactivate_user_compensation)),
(SagaStatus.EXECUTING_STORAGE_ALLOCATION, SagaStep(action=allocate_storage_action, compensation=deallocate_storage_compensation)),
(SagaStatus.EXECUTING_PERMISSION_GRANTING, SagaStep(action=grant_permissions_action, compensation=revoke_permissions_compensation)),
]
}
class SagaExecutionError(Exception):
pass
class SagaExecutor:
def __init__(self, db: Session, saga_definition: dict):
self.db = db
self.saga_definition = saga_definition
    async def run(self, saga_instance: SagaInstance):
        completed_steps = []
        try:
            for status, step in self.saga_definition["steps"]:
                # 1. Update status before executing the step
                saga_instance.current_status = status
                self.db.commit()
                self.db.refresh(saga_instance)
                logger.info(f"Saga {saga_instance.id}: Executing step {status.value}")
                # 2. Execute the action
                result = await step.action(saga_instance)
                # 3. Persist the step result for later compensation
                action_name = step.action.__name__.replace('_action', '')
                saga_instance.step_results[action_name] = result
                # A mutable JSONB field must be flagged as modified, otherwise
                # SQLAlchemy will not detect the in-place change.
                flag_modified(saga_instance, "step_results")
                self.db.commit()
                # Keep the step together with its execution status so the
                # compensation phase can derive the matching COMPENSATING_* status.
                completed_steps.append((status, step))
            saga_instance.current_status = SagaStatus.COMPLETED
            self.db.commit()
            logger.info(f"Saga {saga_instance.id}: Completed successfully.")
        except Exception as e:
            logger.error(f"Saga {saga_instance.id}: Failed at step {saga_instance.current_status.value}. Error: {e}. Starting compensation.")
            await self.compensate(saga_instance, completed_steps)
            raise SagaExecutionError("Saga failed and was rolled back.") from e

    async def compensate(self, saga_instance: SagaInstance, completed_steps: list):
        # Walk back through the steps that actually completed, newest first.
        for status, step in reversed(completed_steps):
            try:
                # Derive the compensation status from the step's own execution status
                # (EXECUTING_X -> COMPENSATING_X), not from current_status, which at this
                # point refers to the failed step or a previously compensated one.
                compensation_status = SagaStatus[status.value.replace("EXECUTING_", "COMPENSATING_")]
                saga_instance.current_status = compensation_status
                self.db.commit()
                logger.info(f"Saga {saga_instance.id}: Compensating step {compensation_status.value}")
                await step.compensation(saga_instance)
            except Exception as e:
                # If compensation fails, it's a critical error that requires manual intervention.
                # A common practice is to move the saga to a "dead letter" queue/state.
                logger.critical(f"Saga {saga_instance.id}: FATAL! Compensation failed for step {step.action.__name__}. Error: {e}. Manual intervention required.")
                saga_instance.current_status = SagaStatus.FAILED
                # Mark as terminally failed to stop further processing
                self.db.commit()
                return
        saga_instance.current_status = SagaStatus.FAILED
        self.db.commit()
        logger.info(f"Saga {saga_instance.id}: Compensation completed.")
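One practical payoff of centralizing the flow logic is that the compensation path can be unit-tested without any participant services running. Below is a rough pytest sketch (assuming pytest and pytest-asyncio are installed); the steps and the session are stubs, not real components.

# tests/test_saga_executor.py -- sketch
from unittest.mock import MagicMock
import pytest

from saga_orchestrator.models import SagaStatus
from saga_orchestrator.saga import SagaExecutor, SagaStep

@pytest.mark.asyncio
async def test_compensations_run_in_reverse_order():
    calls = []

    def make_step(name):
        async def action(saga):      # not exercised in this test
            return {}
        async def compensation(saga):
            calls.append(name)
        return SagaStep(action=action, compensation=compensation)

    fake_saga = MagicMock()
    fake_saga.id = "saga-1"
    db = MagicMock()  # commit()/refresh() become no-ops

    executor = SagaExecutor(db, {"name": "test", "steps": []})
    completed = [
        (SagaStatus.EXECUTING_USER_CREATION, make_step("user")),
        (SagaStatus.EXECUTING_STORAGE_ALLOCATION, make_step("storage")),
    ]
    await executor.compensate(fake_saga, completed)

    assert calls == ["storage", "user"]            # newest step compensated first
    assert fake_saga.current_status == SagaStatus.FAILED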
API Endpoints (main.py):
# saga_orchestrator/main.py
from fastapi import FastAPI, Depends, BackgroundTasks, HTTPException
from sqlalchemy.orm import Session
# SessionLocal is assumed to be the sessionmaker factory defined alongside get_db in database.py
from .database import get_db, SessionLocal
from .models import SagaInstance, SagaStatus
from .saga import SagaExecutor, PROVISION_RESOURCE_SAGA, SagaExecutionError
import uuid
app = FastAPI()
async def run_saga_background(saga_id: uuid.UUID):
    # The request-scoped session from Depends(get_db) is closed once the request returns,
    # so the background task opens and closes its own session instead of reusing it.
    db = SessionLocal()
    try:
        saga_instance = db.query(SagaInstance).filter(SagaInstance.id == saga_id).first()
        if not saga_instance:
            return  # Nothing to do; the saga row is committed before the task is scheduled
        executor = SagaExecutor(db, PROVISION_RESOURCE_SAGA)
        try:
            await executor.run(saga_instance)
        except SagaExecutionError:
            # The failure and rollback are already logged inside the executor
            pass
    finally:
        db.close()
@app.post("/provision", status_code=202)
def start_provisioning(background_tasks: BackgroundTasks, db: Session = Depends(get_db)):
new_saga = SagaInstance(
saga_name=PROVISION_RESOURCE_SAGA["name"],
payload={"username": f"user_{uuid.uuid4().hex[:8]}"} # Example payload
)
db.add(new_saga)
db.commit()
db.refresh(new_saga)
# Run the potentially long-running process in the background
    background_tasks.add_task(run_saga_background, new_saga.id)
return {"saga_id": new_saga.id, "status": new_saga.current_status.value}
@app.get("/provision/{saga_id}")
def get_saga_status(saga_id: uuid.UUID, db: Session = Depends(get_db)):
saga_instance = db.query(SagaInstance).filter(SagaInstance.id == saga_id).first()
if not saga_instance:
raise HTTPException(status_code=404, detail="Saga not found")
return {
"saga_id": saga_instance.id,
"status": saga_instance.current_status.value,
"payload": saga_instance.payload,
"step_results": saga_instance.step_results,
"updated_at": saga_instance.updated_at
}
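For reference, the interaction with these two endpoints from a plain Python client could look like the sketch below; the orchestrator is assumed to listen on localhost:8000, matching the frontend code later in this article.

# provision_client.py -- illustrative usage sketch
import time
import httpx

BASE_URL = "http://localhost:8000"
TERMINAL = {"COMPLETED", "FAILED"}

with httpx.Client(base_url=BASE_URL, timeout=10.0) as client:
    saga_id = client.post("/provision").json()["saga_id"]
    print(f"Started saga {saga_id}")
    while True:
        status = client.get(f"/provision/{saga_id}").json()["status"]
        print(f"Current status: {status}")
        if status in TERMINAL:
            break
        time.sleep(2)  # mirror the frontend's polling interval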
2. Participant Services
Participant services must guarantee that their compensation operations are idempotent; this is a common source of bugs. A plain DELETE is idempotent by nature (deleting a resource that no longer exists is not an error), but more complex operations, such as a refund, need explicit business logic to achieve the same guarantee.
user-service example:
# user_service/main.py
from fastapi import FastAPI, HTTPException
import uuid
app = FastAPI()
# In-memory storage for demonstration. Use a real database in production.
users = {}
@app.post("/users", status_code=201)
def create_user(request: dict):
user_id = str(uuid.uuid4())
users[user_id] = {"username": request["username"], "active": True}
return {"id": user_id, "username": request["username"]}
@app.delete("/users/{user_id}", status_code=204)
def delete_user(user_id: str):
# Idempotency check:
# If the user exists, deactivate them. If not, do nothing and return success.
# This prevents errors if a compensation request is sent twice.
if user_id in users:
# In a real system, this would be a soft delete (is_active = false)
# to preserve data integrity, not a hard delete.
del users[user_id]
print(f"User {user_id} deleted/deactivated.")
else:
print(f"User {user_id} not found, compensation is considered successful (idempotent).")
return
The implementations of storage-service and permission-service follow the same shape; the crucial part is designing their compensations to be idempotent. For operations that are not naturally idempotent, see the sketch below.
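For compensations that are not naturally idempotent (the "refund" case mentioned above), a common technique is to record which saga has already been compensated and turn repeated requests into no-ops. The following is a rough sketch only: the billing service, its endpoint shape, and the in-memory stores are illustrative assumptions, not part of the system built in this article.

# billing_service/main.py -- sketch of an idempotent, non-trivial compensation
from fastapi import FastAPI

app = FastAPI()

# In production these would be database tables with a unique constraint on saga_id.
charges = {}               # saga_id -> amount charged
processed_refunds = set()  # saga_ids whose refund compensation already ran

@app.post("/refunds/{saga_id}", status_code=200)
def refund(saga_id: str):
    # Idempotency guard: a second (or third) compensation call changes nothing.
    if saga_id in processed_refunds:
        return {"status": "already_refunded"}
    amount = charges.pop(saga_id, 0)
    # ... call the payment provider here, keyed by saga_id so it can deduplicate too ...
    processed_refunds.add(saga_id)
    return {"status": "refunded", "amount": amount}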
3. Next.js frontend interaction
The frontend cannot block waiting for the Saga to finish. It starts the request, receives a saga_id, and then has to poll for the final status, which puts real demands on the UX design.
React component for polling:
// components/ProvisioningStatus.js
import { useState, useEffect } from 'react';
const ORCHESTRATOR_API = 'http://localhost:8000'; // Assuming orchestrator runs on port 8000
const TERMINAL_STATUSES = ['COMPLETED', 'FAILED'];
function useSagaStatus(sagaId) {
const [status, setStatus] = useState(null);
const [error, setError] = useState(null);
useEffect(() => {
if (!sagaId) return;
const fetchStatus = async () => {
try {
const response = await fetch(`${ORCHESTRATOR_API}/provision/${sagaId}`);
if (!response.ok) {
throw new Error(`Failed to fetch status: ${response.statusText}`);
}
const data = await response.json();
setStatus(data.status);
// Stop polling if the saga has reached a terminal state
if (TERMINAL_STATUSES.includes(data.status)) {
clearInterval(intervalId);
}
} catch (err) {
setError(err.message);
clearInterval(intervalId);
}
};
fetchStatus(); // Initial fetch
const intervalId = setInterval(fetchStatus, 3000); // Poll every 3 seconds
return () => clearInterval(intervalId); // Cleanup on unmount
}, [sagaId]);
return { status, error };
}
export default function ProvisioningManager() {
const [sagaId, setSagaId] = useState(null);
const [isProvisioning, setIsProvisioning] = useState(false);
const { status, error } = useSagaStatus(sagaId);
const handleProvision = async () => {
setIsProvisioning(true);
setSagaId(null);
try {
const response = await fetch(`${ORCHESTRATOR_API}/provision`, { method: 'POST' });
const data = await response.json();
setSagaId(data.saga_id);
} catch (err) {
console.error("Failed to start provisioning", err);
    } finally {
      // Re-enable the button once the POST has settled; from here on the polling
      // hook (via isProcessRunning) drives the UI state.
      setIsProvisioning(false);
    }
};
const getStatusMessage = () => {
if (!sagaId) return "Ready to provision new resource.";
if (error) return `Error: ${error}`;
switch (status) {
case 'STARTED':
return 'Provisioning started...';
case 'EXECUTING_USER_CREATION':
return 'Step 1/3: Creating user account...';
case 'EXECUTING_STORAGE_ALLOCATION':
return 'Step 2/3: Allocating cloud storage...';
case 'EXECUTING_PERMISSION_GRANTING':
return 'Step 3/3: Setting access permissions...';
case 'COMPLETED':
return 'Success! Resource provisioned successfully.';
case 'FAILED':
return 'Failed. All changes have been rolled back.';
case 'COMPENSATING_STORAGE_ALLOCATION':
return 'Failure detected. Rolling back storage allocation...';
case 'COMPENSATING_USER_CREATION':
return 'Failure detected. Rolling back user creation...';
default:
return `Current status: ${status || 'Initializing...'}`;
}
};
const isProcessRunning = sagaId && !TERMINAL_STATUSES.includes(status);
return (
<div>
<h1>Cloud Resource Provisioner</h1>
      <button onClick={handleProvision} disabled={isProvisioning || isProcessRunning}>
{isProcessRunning ? 'Provisioning in Progress...' : 'Start New Provision'}
</button>
<div style={{ marginTop: '20px', padding: '10px', border: '1px solid #ccc' }}>
<p><strong>Status:</strong> {getStatusMessage()}</p>
{sagaId && <p>Saga ID: {sagaId}</p>}
</div>
</div>
);
}
This React component shows how the frontend handles this asynchronous, polling-driven interaction pattern while giving the user real-time status feedback.
Extensibility and limitations of the architecture
Extensibility:
Adding a new step to this Saga, say "configure a CDN", is fairly mechanical (see the sketch after this list):
- Create a new cdn-service that exposes configure_cdn and (idempotent) deconfigure_cdn endpoints.
- Add EXECUTING_CDN_CONFIG and COMPENSATING_CDN_CONFIG to the orchestrator's SagaStatus enum.
- Insert the new step's action and compensation functions into the PROVISION_RESOURCE_SAGA definition.
None of this touches any existing participant service; only the orchestrator changes, which is a good illustration of separation of concerns.
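A sketch of what that orchestrator change could look like is shown below. The cdn-service URL, its /distributions endpoints, and the new enum members are hypothetical, following the same conventions as the existing steps.

# saga_orchestrator/saga.py -- additions for the hypothetical CDN step
CDN_SERVICE_URL = "http://cdn-service:8004"

async def configure_cdn_action(saga_instance: SagaInstance):
    bucket_name = saga_instance.step_results.get("allocate_storage", {}).get("bucket_name")
    async with httpx.AsyncClient(timeout=10.0) as client:
        response = await client.post(f"{CDN_SERVICE_URL}/distributions", json={"origin_bucket": bucket_name})
        response.raise_for_status()
        return {"distribution_id": response.json()["id"]}

async def deconfigure_cdn_compensation(saga_instance: SagaInstance):
    distribution_id = saga_instance.step_results.get("configure_cdn", {}).get("distribution_id")
    if not distribution_id:
        return
    async with httpx.AsyncClient(timeout=10.0) as client:
        # DELETE is expected to be idempotent on the CDN service side.
        await client.delete(f"{CDN_SERVICE_URL}/distributions/{distribution_id}")

# After adding EXECUTING_CDN_CONFIG / COMPENSATING_CDN_CONFIG to SagaStatus,
# the new step is appended to the workflow definition:
PROVISION_RESOURCE_SAGA["steps"].append(
    (SagaStatus.EXECUTING_CDN_CONFIG, SagaStep(action=configure_cdn_action, compensation=deconfigure_cdn_compensation))
)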
Limitations and pitfalls:
- Orchestrator availability: the orchestrator is the core of the whole flow. In production it must run as multiple instances and rely on a highly available database (for example a PostgreSQL cluster) to keep its state durable and consistent.
- Reliability of compensations: this is the Achilles' heel of the Saga pattern. If a compensation keeps failing, the Saga is stuck in an intermediate "pending rollback" state and needs manual intervention, so compensations must be designed to be extremely robust, simple, and idempotent. A common mistake is letting a compensation depend on other unreliable services.
- Isolation: the Saga pattern does not provide isolation. While a Saga is in flight, other transactions can observe intermediate states (for example, the user exists but no bucket has been allocated yet), a form of dirty read. The business must be able to tolerate this temporary inconsistency; where it cannot, you may need status checks at the API layer or other isolation countermeasures.
- Long-running Sagas: if a Saga takes a long time to execute (minutes or hours), it holds intermediate state, and possibly resources, for the whole duration, which can affect other parts of the system. Estimate the maximum Saga duration as part of the design.
- Unit and integration testing: the orchestrator logic can be unit-tested, but validating the full Saga flow requires integration tests, which in turn requires being able to start all dependent services easily; docker-compose greatly simplifies building a local test environment. For each participant's interface, contract testing is recommended to keep the interfaces compatible.
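Under those assumptions (all services started via docker-compose, orchestrator reachable on localhost:8000, participants healthy), an end-to-end check of the happy path can be as small as the sketch below.

# tests/test_provision_e2e.py -- integration-test sketch; requires the compose stack to be running
import time
import httpx

BASE_URL = "http://localhost:8000"

def test_provision_happy_path():
    with httpx.Client(base_url=BASE_URL, timeout=10.0) as client:
        saga_id = client.post("/provision").json()["saga_id"]
        # Poll until the saga reaches a terminal state, bounded so the suite cannot hang.
        for _ in range(30):
            status = client.get(f"/provision/{saga_id}").json()["status"]
            if status in ("COMPLETED", "FAILED"):
                break
            time.sleep(1)
        assert status == "COMPLETED"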