Building an Orchestration-Based Saga in Python Microservices to Handle Distributed Transactions


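A business process that spans several microservices, such as provisioning cloud resources for a new customer, leads straight into the distributed transaction problem. The process may consist of three steps, each a local transaction: create an account in the user service, allocate an object storage bucket in the storage service, and set an access policy in the permission service. The three steps must succeed or fail as a unit. If any step fails, the steps that already succeeded must be undone so that the system's data eventually becomes consistent again. In real projects, the traditional two-phase commit (2PC) protocol is rarely practical in a loosely coupled microservice architecture: it blocks synchronously, and it depends on every participant's data store supporting the protocol. That pushes us toward an alternative.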

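Option A: Event-Driven Choreography-Based Saga

A choreography-based Saga is a decentralized pattern. Each service publishes an event after completing its local transaction. Other services subscribe to those events and run their own local transactions in response. If a step fails, the failing service publishes a failure event, which triggers the services that already succeeded to run their compensation operations.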

sequenceDiagram
    participant Client
    participant UserService
    participant StorageService
    participant PermissionService
    participant MessageBroker

    Client->>UserService: 1. CreateUserRequest
    UserService->>MessageBroker: 2. Publish(UserCreatedEvent)
    MessageBroker-->>StorageService: 3. Consume(UserCreatedEvent)
    StorageService->>MessageBroker: 4. Publish(StorageAllocatedEvent)
    MessageBroker-->>PermissionService: 5. Consume(StorageAllocatedEvent)
    PermissionService->>MessageBroker: 6. Publish(PermissionsGrantedEvent)

    Note over PermissionService: Failure case: the grant operation fails
    PermissionService->>MessageBroker: 7. Publish(PermissionGrantFailedEvent)
    MessageBroker-->>StorageService: 8. Consume(PermissionGrantFailedEvent)
    Note over StorageService: Compensate: DeallocateStorage
    StorageService->>MessageBroker: 9. Publish(StorageDeallocatedEvent)
    MessageBroker-->>UserService: 10. Consume(StorageDeallocatedEvent)
    Note over UserService: Compensate: DeactivateUser
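
The failure path in the diagram can be sketched in a few lines of Python. This is only an in-process illustration: the `EventBus` class, the handler names, the `fail_permissions` flag, and the final `UserDeactivatedEvent` are assumptions made for the example, standing in for a real message broker and real services.

```python
from collections import defaultdict


class EventBus:
    """In-memory stand-in for a message broker (illustration only)."""

    def __init__(self):
        self.handlers = defaultdict(list)
        self.log = []  # every published event name, in publication order

    def subscribe(self, event, handler):
        self.handlers[event].append(handler)

    def publish(self, event, data):
        self.log.append(event)
        for handler in self.handlers[event]:
            handler(data)


def wire_services(bus, fail_permissions=False):
    # Each "service" only reacts to events and publishes its own.
    def user_service_create(data):
        bus.publish("UserCreatedEvent", data)

    def storage_service_allocate(data):
        bus.publish("StorageAllocatedEvent", data)

    def permission_service_grant(data):
        if fail_permissions:
            bus.publish("PermissionGrantFailedEvent", data)
        else:
            bus.publish("PermissionsGrantedEvent", data)

    def storage_service_compensate(data):
        bus.publish("StorageDeallocatedEvent", data)

    def user_service_compensate(data):
        # Hypothetical final event; the diagram stops at the compensation note.
        bus.publish("UserDeactivatedEvent", data)

    bus.subscribe("CreateUserRequest", user_service_create)
    bus.subscribe("UserCreatedEvent", storage_service_allocate)
    bus.subscribe("StorageAllocatedEvent", permission_service_grant)
    bus.subscribe("PermissionGrantFailedEvent", storage_service_compensate)
    bus.subscribe("StorageDeallocatedEvent", user_service_compensate)


bus = EventBus()
wire_services(bus, fail_permissions=True)
bus.publish("CreateUserRequest", {"username": "alice"})
print(bus.log)
```

Note that the overall flow exists only implicitly, in the set of subscriptions. Nothing in the code states "these are the three steps of provisioning", which is exactly the visibility problem discussed next.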

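Advantages:

  • High decoupling: services never call each other directly; they only care about events, which fits the microservice design philosophy.
  • Simplicity: for any single service the logic is simple: do the work, then publish an event.

Disadvantages:

  • Poor process visibility: the definition of the business process is scattered across the services. Once a dozen services are involved, reconstructing the full call chain and the state transitions becomes extremely difficult. No single place offers a global view of the transaction.
  • Risk of cyclic dependencies: an event cycle such as A -> B -> C -> A is hard to spot at design time, and once it occurs it can bring the system down.
  • Debugging nightmare: when a transaction fails, finding the root cause means checking the logs of several services plus the state of the message queue. Answering "which service did not respond?" or "why was the compensation event never consumed?" is very expensive.
  • Complex testing: end-to-end integration tests require deploying every involved service and the message broker, and keeping such a test environment stable is a challenge in itself.

For our deterministic provisioning process with its clearly defined steps, the "flexibility" that choreography offers is in practice a source of confusion. What we need is a solution with strong control over the business process, explicit state, and easy monitoring and debugging. That makes the orchestration-based Saga the more realistic choice.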

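Option B: Centralized Orchestration-Based Saga

An orchestration-based Saga introduces a central coordinator, the orchestrator. The orchestrator drives the whole business process by calling the participant services in order. If an operation fails, the orchestrator calls the compensation endpoints of all operations that already succeeded, in reverse order.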

sequenceDiagram
    participant Client
    participant Orchestrator
    participant UserService
    participant StorageService
    participant PermissionService

    Client->>Orchestrator: 1. StartProvisionSaga(userId, ...)
    Orchestrator->>UserService: 2. Execute: CreateUser
    UserService-->>Orchestrator: 3. Success
    Orchestrator->>StorageService: 4. Execute: AllocateStorage
    StorageService-->>Orchestrator: 5. Success
    Orchestrator->>PermissionService: 6. Execute: GrantPermissions
    PermissionService-->>Orchestrator: 7. Failure
    
    Note over Orchestrator: GrantPermissions failed, start rollback
    Orchestrator->>StorageService: 8. Compensate: DeallocateStorage
    StorageService-->>Orchestrator: 9. Success
    Orchestrator->>UserService: 10. Compensate: DeactivateUser
    UserService-->>Orchestrator: 11. Success
    
    Note over Orchestrator: Saga rollback complete, status set to FAILED
    Orchestrator-->>Client: 12. SagaStatus: FAILED

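Advantages:

  • Centralized flow control: the logic of the business process (steps, retry policy, compensation order) lives in the orchestrator, where it is visible and easy to understand and change.
  • Explicit state management: the orchestrator maintains the Saga's global state machine, so at any time you can query how far a transaction has progressed and whether it succeeded, failed, or is rolling back.
  • Simpler participants: participant services become very "pure". They only expose an endpoint for the operation and one for its compensation, and need no knowledge of the overall process.
  • Observability and debugging: all flow-control logs are concentrated in the orchestrator, which greatly simplifies troubleshooting.

Disadvantages:

  • Orchestrator as a single point of failure: the orchestrator becomes a critical component of the system and must be highly available.
  • Service discovery and coupling: the orchestrator has to know the address and interface of every participant service, which creates a degree of call-level coupling. This remains manageable.

Final decision:
In a production environment where reliability and maintainability outweigh everything else, orchestration is the more pragmatic choice. It pulls the complex flow-control logic out of the individual business services and manages it in one place, which makes system behavior more predictable and controllable. We will build this orchestrator service in Python.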

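Core Implementation Overview

We will build four services: saga-orchestrator, user-service, storage-service, and permission-service, all using FastAPI and SQLAlchemy. The frontend uses Next.js to talk to the orchestrator.

1. Saga Orchestrator Service

This is the heart of the pattern. It needs a persistence layer to store the state of every Saga instance.

Database models (models.py):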

# saga_orchestrator/models.py
import uuid
from enum import Enum
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class SagaStatus(str, Enum):
    STARTED = "STARTED"
    EXECUTING_USER_CREATION = "EXECUTING_USER_CREATION"
    EXECUTING_STORAGE_ALLOCATION = "EXECUTING_STORAGE_ALLOCATION"
    EXECUTING_PERMISSION_GRANTING = "EXECUTING_PERMISSION_GRANTING"
    COMPLETED = "COMPLETED"
    
    COMPENSATING_PERMISSION_GRANTING = "COMPENSATING_PERMISSION_GRANTING"
    COMPENSATING_STORAGE_ALLOCATION = "COMPENSATING_STORAGE_ALLOCATION"
    COMPENSATING_USER_CREATION = "COMPENSATING_USER_CREATION"
    FAILED = "FAILED"

class SagaInstance(Base):
    __tablename__ = "saga_instances"

    id = sa.Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    saga_name = sa.Column(sa.String, nullable=False, index=True)
    current_status = sa.Column(sa.Enum(SagaStatus), nullable=False, default=SagaStatus.STARTED)
    payload = sa.Column(JSONB, nullable=False)
    # Store results from each step for compensation
    step_results = sa.Column(JSONB, nullable=True, default=dict)
    
    created_at = sa.Column(sa.DateTime, server_default=sa.func.now())
    updated_at = sa.Column(sa.DateTime, server_default=sa.func.now(), onupdate=sa.func.now())

    def __repr__(self):
        return f"<SagaInstance(id={self.id}, status='{self.current_status.value}')>"
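
The status enum above implicitly describes a state machine. One way to make that explicit is a transition table that the orchestrator could consult before persisting a new status. The table below is a hand-written assumption about the intended three-step flow (run forward, compensate completed steps in reverse), and `can_transition` is a hypothetical helper, not part of the original code.

```python
from enum import Enum


class SagaStatus(str, Enum):
    STARTED = "STARTED"
    EXECUTING_USER_CREATION = "EXECUTING_USER_CREATION"
    EXECUTING_STORAGE_ALLOCATION = "EXECUTING_STORAGE_ALLOCATION"
    EXECUTING_PERMISSION_GRANTING = "EXECUTING_PERMISSION_GRANTING"
    COMPLETED = "COMPLETED"
    COMPENSATING_PERMISSION_GRANTING = "COMPENSATING_PERMISSION_GRANTING"
    COMPENSATING_STORAGE_ALLOCATION = "COMPENSATING_STORAGE_ALLOCATION"
    COMPENSATING_USER_CREATION = "COMPENSATING_USER_CREATION"
    FAILED = "FAILED"


S = SagaStatus

# Assumed transitions: a failed step triggers compensation of the steps that
# completed before it; a failed compensation goes straight to FAILED.
ALLOWED_TRANSITIONS = {
    S.STARTED: {S.EXECUTING_USER_CREATION},
    S.EXECUTING_USER_CREATION: {S.EXECUTING_STORAGE_ALLOCATION, S.FAILED},
    S.EXECUTING_STORAGE_ALLOCATION: {S.EXECUTING_PERMISSION_GRANTING, S.COMPENSATING_USER_CREATION},
    S.EXECUTING_PERMISSION_GRANTING: {S.COMPLETED, S.COMPENSATING_STORAGE_ALLOCATION},
    S.COMPENSATING_PERMISSION_GRANTING: {S.COMPENSATING_STORAGE_ALLOCATION, S.FAILED},
    S.COMPENSATING_STORAGE_ALLOCATION: {S.COMPENSATING_USER_CREATION, S.FAILED},
    S.COMPENSATING_USER_CREATION: {S.FAILED},
    S.COMPLETED: set(),  # terminal
    S.FAILED: set(),     # terminal
}


def can_transition(current: SagaStatus, new: SagaStatus) -> bool:
    """Return True if moving from `current` to `new` is an allowed step."""
    return new in ALLOWED_TRANSITIONS[current]


print(can_transition(S.STARTED, S.EXECUTING_USER_CREATION))
print(can_transition(S.COMPLETED, S.FAILED))
```

A check like this is cheap insurance against bugs in the executor: an illegal transition points to a logic error rather than a business failure.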

Saga definition and executor (saga.py):
We define a Saga workflow in which every step has an action and a compensation.

# saga_orchestrator/saga.py
import httpx
import logging
from sqlalchemy.orm import Session
from .models import SagaInstance, SagaStatus

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# In a real app, these would be in a config file
USER_SERVICE_URL = "http://user-service:8001"
STORAGE_SERVICE_URL = "http://storage-service:8002"
PERMISSION_SERVICE_URL = "http://permission-service:8003"


class SagaStep:
    def __init__(self, action, compensation):
        self.action = action
        self.compensation = compensation

async def create_user_action(saga_instance: SagaInstance):
    payload = saga_instance.payload
    async with httpx.AsyncClient(timeout=10.0) as client:
        response = await client.post(f"{USER_SERVICE_URL}/users", json={"username": payload["username"]})
        response.raise_for_status() # Raise exception for non-2xx status
        # Store created user ID for potential compensation
        return {"user_id": response.json()["id"]}

async def deactivate_user_compensation(saga_instance: SagaInstance):
    # Idempotency: The user service should handle repeated deletions gracefully.
    user_id = saga_instance.step_results.get("create_user", {}).get("user_id")
    if not user_id:
        logger.warning(f"Saga {saga_instance.id}: No user_id found for compensation.")
        return
    async with httpx.AsyncClient(timeout=10.0) as client:
        response = await client.delete(f"{USER_SERVICE_URL}/users/{user_id}")
        # Surface non-2xx responses so a failed compensation is not silently ignored
        response.raise_for_status()

async def allocate_storage_action(saga_instance: SagaInstance):
    user_id = saga_instance.step_results.get("create_user", {}).get("user_id")
    async with httpx.AsyncClient(timeout=10.0) as client:
        response = await client.post(f"{STORAGE_SERVICE_URL}/buckets", json={"owner_id": user_id})
        response.raise_for_status()
        return {"bucket_name": response.json()["name"]}

async def deallocate_storage_compensation(saga_instance: SagaInstance):
    bucket_name = saga_instance.step_results.get("allocate_storage", {}).get("bucket_name")
    if not bucket_name:
        logger.warning(f"Saga {saga_instance.id}: No bucket_name found for compensation.")
        return
    async with httpx.AsyncClient(timeout=10.0) as client:
        # Note: DELETE should be idempotent by definition.
        response = await client.delete(f"{STORAGE_SERVICE_URL}/buckets/{bucket_name}")
        # Surface non-2xx responses so a failed compensation is not silently ignored
        response.raise_for_status()

async def grant_permissions_action(saga_instance: SagaInstance):
    user_id = saga_instance.step_results.get("create_user", {}).get("user_id")
    bucket_name = saga_instance.step_results.get("allocate_storage", {}).get("bucket_name")
    async with httpx.AsyncClient(timeout=10.0) as client:
        response = await client.post(
            f"{PERMISSION_SERVICE_URL}/permissions", 
            json={"user_id": user_id, "resource": f"bucket:{bucket_name}"}
        )
        response.raise_for_status()
        return {"permission_id": response.json()["id"]}

# The compensation for granting permission is often just revoking it.
async def revoke_permissions_compensation(saga_instance: SagaInstance):
    permission_id = saga_instance.step_results.get("grant_permissions", {}).get("permission_id")
    if not permission_id:
        logger.warning(f"Saga {saga_instance.id}: No permission_id found for compensation.")
        return
    async with httpx.AsyncClient(timeout=10.0) as client:
        response = await client.delete(f"{PERMISSION_SERVICE_URL}/permissions/{permission_id}")
        # Surface non-2xx responses so a failed compensation is not silently ignored
        response.raise_for_status()


# Define the Saga workflow
PROVISION_RESOURCE_SAGA = {
    "name": "provision_resource",
    "steps": [
        (SagaStatus.EXECUTING_USER_CREATION, SagaStep(action=create_user_action, compensation=deactivate_user_compensation)),
        (SagaStatus.EXECUTING_STORAGE_ALLOCATION, SagaStep(action=allocate_storage_action, compensation=deallocate_storage_compensation)),
        (SagaStatus.EXECUTING_PERMISSION_GRANTING, SagaStep(action=grant_permissions_action, compensation=revoke_permissions_compensation)),
    ]
}

class SagaExecutionError(Exception):
    pass

class SagaExecutor:
    def __init__(self, db: Session, saga_definition: dict):
        self.db = db
        self.saga_definition = saga_definition

    async def run(self, saga_instance: SagaInstance):
        completed_steps = []
        try:
            for i, (status, step) in enumerate(self.saga_definition["steps"]):
                # 1. Update status before executing step
                saga_instance.current_status = status
                self.db.commit()
                self.db.refresh(saga_instance)
                
                logger.info(f"Saga {saga_instance.id}: Executing step {status.value}")
                
                # 2. Execute action
                result = await step.action(saga_instance)
                
                # 3. Persist step result
                action_name = step.action.__name__.replace('_action', '')
                saga_instance.step_results[action_name] = result
                # A mutable JSONB field needs to be flagged as modified for SQLAlchemy to detect changes
                from sqlalchemy.orm.attributes import flag_modified
                flag_modified(saga_instance, "step_results")
                
                self.db.commit()

                completed_steps.append(step)

            saga_instance.current_status = SagaStatus.COMPLETED
            self.db.commit()
            logger.info(f"Saga {saga_instance.id}: Completed successfully.")

        except Exception as e:
            logger.error(f"Saga {saga_instance.id}: Failed at step {saga_instance.current_status.value}. Error: {e}. Starting compensation.")
            await self.compensate(saga_instance, completed_steps)
            raise SagaExecutionError("Saga failed; compensation was attempted.") from e

    async def compensate(self, saga_instance: SagaInstance, completed_steps: list):
        # Map each step back to the status it executes under, so the matching
        # COMPENSATING_* status is derived from the step being undone and not
        # from whatever status the saga currently holds.
        status_by_step = {step: status for status, step in self.saga_definition["steps"]}

        for step in reversed(completed_steps):
            try:
                executing_status = status_by_step[step]
                compensation_status_str = executing_status.value.replace("EXECUTING_", "COMPENSATING_", 1)
                saga_instance.current_status = SagaStatus[compensation_status_str]
                self.db.commit()
                
                logger.info(f"Saga {saga_instance.id}: Compensating step {saga_instance.current_status.value}")
                await step.compensation(saga_instance)
                
            except Exception as e:
                # If compensation fails, it's a critical error. Requires manual intervention.
                # A common practice is to move it to a "dead letter" queue/state.
                logger.critical(f"Saga {saga_instance.id}: FATAL! Compensation failed for step {step.action.__name__}. Error: {e}. Manual intervention required.")
                saga_instance.current_status = SagaStatus.FAILED 
                # Mark as terminally failed to stop further processing
                self.db.commit()
                return

        saga_instance.current_status = SagaStatus.FAILED
        self.db.commit()
        logger.info(f"Saga {saga_instance.id}: Compensation completed.")
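
The executor above starts compensation on the first exception. Since the retry policy is meant to live in the orchestrator, a step could first be retried on transient failures. The following is a minimal sketch under stated assumptions: `TransientError` and `call_with_retry` are hypothetical names, and deciding which real exceptions count as transient (for example timeouts) is left to the caller.

```python
import asyncio
import random


class TransientError(Exception):
    """Hypothetical marker for failures worth retrying (e.g. a timeout)."""


async def call_with_retry(action, *, attempts=3, base_delay=0.1, max_delay=2.0):
    """Await action() up to `attempts` times with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return await action()
        except TransientError:
            if attempt == attempts:
                raise  # retries exhausted: let the caller start compensation
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Random jitter spreads out retries coming from many sagas at once.
            await asyncio.sleep(random.uniform(0, delay))


# Usage: an action that fails twice before succeeding.
calls = {"n": 0}


async def flaky_action():
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientError("temporary outage")
    return {"user_id": "u-1"}


result = asyncio.run(call_with_retry(flaky_action, base_delay=0.01))
print(result, calls["n"])
```

Retrying is only safe when the participant's endpoint is idempotent. Retrying a plain POST that creates a user can create duplicates unless the request carries some form of idempotency key.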

API Endpoints (main.py):

# saga_orchestrator/main.py
from fastapi import FastAPI, Depends, BackgroundTasks, HTTPException
from sqlalchemy.orm import Session
from .database import get_db, SessionLocal  # SessionLocal: assumed sessionmaker exported by database.py
from .models import SagaInstance, SagaStatus
from .saga import SagaExecutor, PROVISION_RESOURCE_SAGA, SagaExecutionError
import uuid

app = FastAPI()

async def run_saga_background(saga_id: uuid.UUID):
    # Open a dedicated session: the request-scoped session from get_db may
    # already be closed by the time the background task runs.
    db = SessionLocal()
    try:
        saga_instance = db.query(SagaInstance).filter(SagaInstance.id == saga_id).first()
        if not saga_instance:
            return # Or log error

        executor = SagaExecutor(db, PROVISION_RESOURCE_SAGA)
        try:
            await executor.run(saga_instance)
        except SagaExecutionError:
            # The error is already logged within the executor
            pass
    finally:
        db.close()

@app.post("/provision", status_code=202)
def start_provisioning(background_tasks: BackgroundTasks, db: Session = Depends(get_db)):
    new_saga = SagaInstance(
        saga_name=PROVISION_RESOURCE_SAGA["name"],
        payload={"username": f"user_{uuid.uuid4().hex[:8]}"} # Example payload
    )
    db.add(new_saga)
    db.commit()
    db.refresh(new_saga)

    # Run the potentially long-running process in the background,
    # passing only the ID so the task loads the saga in its own session
    background_tasks.add_task(run_saga_background, new_saga.id)
    
    return {"saga_id": new_saga.id, "status": new_saga.current_status.value}

@app.get("/provision/{saga_id}")
def get_saga_status(saga_id: uuid.UUID, db: Session = Depends(get_db)):
    saga_instance = db.query(SagaInstance).filter(SagaInstance.id == saga_id).first()
    if not saga_instance:
        raise HTTPException(status_code=404, detail="Saga not found")
    return {
        "saga_id": saga_instance.id, 
        "status": saga_instance.current_status.value,
        "payload": saga_instance.payload,
        "step_results": saga_instance.step_results,
        "updated_at": saga_instance.updated_at
    }

2. Participant Services

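Participant services must make their compensation operations idempotent; getting this wrong is a common source of bugs. HTTP defines DELETE as idempotent in its effect on server state, but the endpoint still has to treat deleting an already-missing resource as success instead of returning an error, otherwise a retried compensation looks like a failure. More complex operations, such as a refund, need explicit business logic to guarantee idempotency.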
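
For an operation like a refund, one common approach is an idempotency key supplied by the caller. The sketch below is illustrative only: `RefundService`, its in-memory dictionaries, and the key format are assumptions, and a real service would store processed keys in its database within the same transaction as the balance update.

```python
class RefundService:
    """In-memory sketch of an idempotent refund (illustration only)."""

    def __init__(self):
        self.balances = {}
        self.processed = {}  # idempotency_key -> result of the first call

    def refund(self, idempotency_key, account, amount):
        # A repeated request with the same key returns the stored result
        # and does not credit the account a second time.
        if idempotency_key in self.processed:
            return self.processed[idempotency_key]

        self.balances[account] = self.balances.get(account, 0) + amount
        result = {
            "account": account,
            "refunded": amount,
            "balance": self.balances[account],
        }
        self.processed[idempotency_key] = result
        return result


service = RefundService()
first = service.refund("saga-42:refund", "acct-1", 50)
second = service.refund("saga-42:refund", "acct-1", 50)  # retried compensation
print(first == second, service.balances["acct-1"])
```

Deriving the key from the Saga ID and the step name, as in the usage above, lets the orchestrator retry a compensation safely without keeping extra state.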

user-service example:

# user_service/main.py
from fastapi import FastAPI, HTTPException
import uuid

app = FastAPI()
# In-memory storage for demonstration. Use a real database in production.
users = {}

@app.post("/users", status_code=201)
def create_user(request: dict):
    user_id = str(uuid.uuid4())
    users[user_id] = {"username": request["username"], "active": True}
    return {"id": user_id, "username": request["username"]}

@app.delete("/users/{user_id}", status_code=204)
def delete_user(user_id: str):
    # Idempotency check:
    # If the user exists, deactivate them. If not, do nothing and return success.
    # This prevents errors if a compensation request is sent twice.
    if user_id in users:
        # In a real system, this would be a soft delete (is_active = false)
        # to preserve data integrity, not a hard delete.
        del users[user_id]
        print(f"User {user_id} deleted/deactivated.")
    else:
        print(f"User {user_id} not found, compensation is considered successful (idempotent).")
    
    return

storage-service and permission-service are implemented along the same lines; the key point is the idempotent design of their compensation operations.

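3. Next.js Frontend Interaction

The frontend cannot wait synchronously for the Saga to finish. It sends the request, receives a saga_id, and then polls for the final status. This has consequences for the user experience design.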

React Component for Polling:

// components/ProvisioningStatus.js
import { useState, useEffect } from 'react';

const ORCHESTRATOR_API = 'http://localhost:8000'; // Assuming orchestrator runs on port 8000

const TERMINAL_STATUSES = ['COMPLETED', 'FAILED'];

function useSagaStatus(sagaId) {
  const [status, setStatus] = useState(null);
  const [error, setError] = useState(null);

  useEffect(() => {
    // Clear state left over from a previous saga
    setStatus(null);
    setError(null);
    if (!sagaId) return;

    // Declared before fetchStatus so the callback never reads an uninitialized binding
    let intervalId = null;

    const fetchStatus = async () => {
      try {
        const response = await fetch(`${ORCHESTRATOR_API}/provision/${sagaId}`);
        if (!response.ok) {
          throw new Error(`Failed to fetch status: ${response.statusText}`);
        }
        const data = await response.json();
        setStatus(data.status);
        
        // Stop polling if the saga has reached a terminal state
        if (TERMINAL_STATUSES.includes(data.status)) {
          clearInterval(intervalId);
        }
      } catch (err) {
        setError(err.message);
        clearInterval(intervalId);
      }
    };
    
    fetchStatus(); // Initial fetch
    intervalId = setInterval(fetchStatus, 3000); // Poll every 3 seconds

    return () => clearInterval(intervalId); // Cleanup on unmount
  }, [sagaId]);

  return { status, error };
}

export default function ProvisioningManager() {
  const [sagaId, setSagaId] = useState(null);
  const [isProvisioning, setIsProvisioning] = useState(false);
  const { status, error } = useSagaStatus(sagaId);

  const handleProvision = async () => {
    setIsProvisioning(true);
    setSagaId(null);
    try {
      const response = await fetch(`${ORCHESTRATOR_API}/provision`, { method: 'POST' });
      const data = await response.json();
      setSagaId(data.saga_id);
    } catch (err) {
      console.error("Failed to start provisioning", err);
    } finally {
        // We don't set isProvisioning to false here, the status polling will handle UI updates
    }
  };

  const getStatusMessage = () => {
    if (!sagaId) return "Ready to provision new resource.";
    if (error) return `Error: ${error}`;
    
    switch (status) {
        case 'STARTED':
            return 'Provisioning started...';
        case 'EXECUTING_USER_CREATION':
            return 'Step 1/3: Creating user account...';
        case 'EXECUTING_STORAGE_ALLOCATION':
            return 'Step 2/3: Allocating cloud storage...';
        case 'EXECUTING_PERMISSION_GRANTING':
            return 'Step 3/3: Setting access permissions...';
        case 'COMPLETED':
            return 'Success! Resource provisioned successfully.';
        case 'FAILED':
            return 'Failed. All changes have been rolled back.';
        case 'COMPENSATING_STORAGE_ALLOCATION':
            return 'Failure detected. Rolling back storage allocation...';
        case 'COMPENSATING_USER_CREATION':
            return 'Failure detected. Rolling back user creation...';
        default:
            return `Current status: ${status || 'Initializing...'}`;
    }
  };
  
  const isProcessRunning = sagaId && !TERMINAL_STATUSES.includes(status);

  return (
    <div>
      <h1>Cloud Resource Provisioner</h1>
      <button onClick={handleProvision} disabled={isProcessRunning}>
        {isProcessRunning ? 'Provisioning in Progress...' : 'Start New Provision'}
      </button>
      <div style={{ marginTop: '20px', padding: '10px', border: '1px solid #ccc' }}>
        <p><strong>Status:</strong> {getStatusMessage()}</p>
        {sagaId && <p>Saga ID: {sagaId}</p>}
      </div>
    </div>
  );
}

This React component shows how the frontend handles the asynchronous, polling-based interaction and gives the user near-real-time status feedback.
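
The same polling contract can also be exercised outside the browser, for example from a smoke test or a CLI script. The helper below is a sketch: `poll_until_terminal` and its parameters are hypothetical, and `fetch_status` is any callable that returns the current status string, such as a thin wrapper around a GET to /provision/{saga_id}.

```python
import time

TERMINAL_STATUSES = {"COMPLETED", "FAILED"}


def poll_until_terminal(fetch_status, *, interval=3.0, timeout=60.0):
    """Call fetch_status() until it returns a terminal status.

    Returns the statuses observed, with consecutive duplicates collapsed.
    Raises TimeoutError if no terminal status arrives within `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    observed = []
    while True:
        status = fetch_status()
        if not observed or observed[-1] != status:
            observed.append(status)
        if status in TERMINAL_STATUSES:
            return observed
        if time.monotonic() >= deadline:
            raise TimeoutError(f"saga still {status} after {timeout}s")
        time.sleep(interval)


# Usage with a fake status source instead of a real HTTP call.
fake_statuses = iter([
    "STARTED",
    "EXECUTING_USER_CREATION",
    "EXECUTING_USER_CREATION",
    "COMPLETED",
])
history = poll_until_terminal(lambda: next(fake_statuses), interval=0)
print(history)
```

Unlike the React hook, this version enforces an overall timeout, which is worth considering on the frontend as well so that a stuck Saga does not leave the UI polling indefinitely.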

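Extensibility and Limitations of the Architecture

Extensibility:
Adding a new step to this Saga, such as "configure the CDN service", is fairly straightforward:

  1. Create a new cdn-service that exposes configure_cdn and deconfigure_cdn (idempotent) endpoints.
  2. Add EXECUTING_CDN_CONFIG and COMPENSATING_CDN_CONFIG to the SagaStatus enum in the orchestrator.
  3. Insert the new step's action and compensation functions into the PROVISION_RESOURCE_SAGA definition.
    None of this touches the existing participant services; only the orchestrator changes, which reflects a clean separation of concerns.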
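
The three steps above can be sketched as follows. The function bodies are hypothetical stubs (a real action would call cdn-service over HTTP), the enum is abbreviated to the members needed here, and `_noop` only stands in for the existing permission step.

```python
import asyncio
from enum import Enum


class SagaStatus(str, Enum):
    # Abbreviated: the real enum keeps every existing member.
    EXECUTING_PERMISSION_GRANTING = "EXECUTING_PERMISSION_GRANTING"
    COMPENSATING_PERMISSION_GRANTING = "COMPENSATING_PERMISSION_GRANTING"
    # New members for the CDN step
    EXECUTING_CDN_CONFIG = "EXECUTING_CDN_CONFIG"
    COMPENSATING_CDN_CONFIG = "COMPENSATING_CDN_CONFIG"


class SagaStep:
    def __init__(self, action, compensation):
        self.action = action
        self.compensation = compensation


async def configure_cdn_action(saga_instance):
    # Hypothetical stub: a real action would POST to cdn-service.
    return {"distribution_id": "cdn-demo"}


async def deconfigure_cdn_compensation(saga_instance):
    # Hypothetical stub: a real compensation would issue an idempotent DELETE.
    return None


async def _noop(saga_instance):
    return {}


steps = [
    (SagaStatus.EXECUTING_PERMISSION_GRANTING, SagaStep(_noop, _noop)),
]
# Append the new step after the existing ones.
steps.append(
    (
        SagaStatus.EXECUTING_CDN_CONFIG,
        SagaStep(action=configure_cdn_action, compensation=deconfigure_cdn_compensation),
    )
)

status, step = steps[-1]
print(status.value, asyncio.run(step.action(None)))
```

Two details are easy to miss. The executor derives the compensation status from the naming convention EXECUTING_X / COMPENSATING_X, so the new members must follow it. And because the status column uses sa.Enum, which maps to a native enum type on PostgreSQL by default, adding members also requires a schema migration.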

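Limitations and pitfalls:

  • Orchestrator high availability: The orchestrator is the core of the whole process. In production it must run as multiple instances and rely on a highly available database (such as a PostgreSQL cluster) to keep its state durable and consistent.
  • Reliability of compensations: This is the Achilles' heel of the Saga pattern. If a compensation keeps failing, the Saga is stuck in an intermediate "pending rollback" state and needs manual intervention. Compensations must therefore be extremely robust, simple, and idempotent. A common mistake is to make a compensation depend on other unstable services.
  • Isolation: The Saga pattern does not provide isolation. While a Saga is running, other transactions can observe intermediate state (for example, the user exists but the bucket has not been allocated yet), which is effectively a dirty read. The business must be able to tolerate this temporary inconsistency. Where it cannot, add status checks at the API level or use another isolation mechanism.
  • Impact of long-running transactions: If a Saga runs for a long time (minutes or hours), it holds intermediate state, and any resources reserved on its behalf, for that whole time, which can affect other parts of the system. Estimate the maximum execution time of a Saga at design time.
  • Unit and integration tests: The orchestrator logic can be unit tested, but the full Saga flow has to be verified with integration tests, which requires an easy way to start all dependent services. docker-compose greatly simplifies setting up a local test environment. For each participant's interface, contract testing is recommended to ensure compatibility.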
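
The API-level status check mentioned in the isolation bullet can be sketched as a simple "pending" flag, sometimes called a semantic lock. Everything here is an illustrative assumption: `UserStore`, the `PENDING`/`ACTIVE` states, and the `ResourceNotReady` exception are not part of the services shown earlier.

```python
from dataclasses import dataclass


class ResourceNotReady(Exception):
    """Raised when a record is still owned by an in-flight saga."""


@dataclass
class UserRecord:
    user_id: str
    username: str
    state: str = "PENDING"  # flipped to ACTIVE by the saga's final step


class UserStore:
    def __init__(self):
        self._users = {}

    def create_pending(self, user_id, username):
        # Step 1 of the saga: the user exists but is not yet visible.
        self._users[user_id] = UserRecord(user_id, username)

    def activate(self, user_id):
        # Called once every saga step has succeeded.
        self._users[user_id].state = "ACTIVE"

    def get_visible(self, user_id):
        # Regular readers never see half-provisioned users.
        user = self._users[user_id]
        if user.state != "ACTIVE":
            raise ResourceNotReady(user_id)
        return user


store = UserStore()
store.create_pending("u-1", "alice")
try:
    store.get_visible("u-1")
except ResourceNotReady:
    print("u-1 is still being provisioned")

store.activate("u-1")
print(store.get_visible("u-1").state)
```

The trade-off is an extra final step in the Saga (activation) and readers that must handle the "not ready" case, in exchange for never exposing intermediate state.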
