构建基于 Google Cloud Functions 与 mTLS 的分布式实时特征存储摄取层


定义问题:特征工程中的零信任摄取挑战

在机器学习系统中,特征存储(Feature Store)是连接数据管道与模型训练/推理的枢纽。一个关键挑战在于如何构建一个安全、低延迟、高可用的实时特征摄取(Ingestion)层。数据源可能来自内部的多个微服务,也可能来自外部合作伙伴,其网络环境本质上是不可信的。我们需要一个摄取端点,它不仅能处理高并发的写入请求,还要能以零信任(Zero Trust)的原则严格验证每一个请求来源的身份,确保只有经过授权的客户端才能写入特征数据。

方案A:API 网关 + 服务账户密钥 (IAM)

这是云原生环境中最常见的模式。通过 Google API Gateway 或 Apigee 暴露一个 RESTful 端点,然后为每个数据生产者(Producer)创建一个专用的服务账户(Service Account),并为其生成一个长期有效的 JSON 密钥。生产者在请求时,使用该密钥签名生成一个 JWT 或携带一个 OAuth 2.0 Access Token。

优势:

  1. 实现简单: 完全基于云厂商提供的托管服务,配置相对直接。
  2. 生态集成度高: 与 IAM、Cloud Logging、Cloud Monitoring 等无缝集成。
  3. 认证粒度: 可以通过 IAM Role 控制服务账户对特定资源的访问权限。

劣势:

  1. 密钥管理的风险: 长期有效的服务账户密钥是巨大的安全隐患。一旦泄露,攻击者就能在密钥有效期内冒充该服务。虽然可以实施密钥轮换策略,但这增加了运维复杂性。
  2. 单向认证: 这种模式下,通常只有服务端验证客户端的身份(通过 Token)。客户端无法有效验证服务端的身份,容易受到中间人攻击(MITM),除非额外实现复杂的证书固定(Certificate Pinning)逻辑。
  3. 不够灵活: IAM 策略通常作用于资源级别,对于更细粒度的、基于请求内容的动态授权逻辑支持不足。

方案B:分布式消息队列 + Serverless 函数 + 双向TLS (mTLS)

该方案改变了架构范式。数据生产者不再直接调用一个同步的 API 端点,而是将特征数据作为消息发送到分布式消息队列(如 Google Cloud Pub/Sub)。一个 Serverless 函数(Google Cloud Functions)订阅该主题,消费消息,然后通过一个强制执行 mTLS 的连接,将数据写入后端的特征存储服务。

graph TD
    subgraph "数据生产者 (不可信网络)"
        Producer1 --> PubSub
        Producer2 --> PubSub
        Producer3 --> PubSub
    end

    subgraph "Google Cloud Platform"
        PubSub[Cloud Pub/Sub 主题] -- 触发 --> GCF[Cloud Function]
        GCF -- 携带客户端证书 --> IngestionAPI
        IngestionAPI -- 验证客户端证书 --> GCF
        IngestionAPI -- 写入 --> FeatureStoreDB[(Feature Store 数据库)]
        
        subgraph "安全凭证管理"
            SecretManager[Secret Manager] -- 提供证书/私钥 --> GCF
        end
    end

    subgraph "特征存储摄取层 (VPC内)"
        IngestionAPI(特征摄取API)
    end

    linkStyle 2 stroke:#4CAF50,stroke-width:2px,fill:none;
    style GCF fill:#FFC107,stroke:#333,stroke-width:2px
    style IngestionAPI fill:#2196F3,stroke:#333,stroke-width:2px
    style PubSub fill:#9C27B0,stroke:#333,stroke-width:2px

优势:

  1. 零信任安全模型: mTLS 提供了严格的双向身份验证。服务端和客户端都必须出示并验证对方的证书。证书由一个私有的证书颁发机构(CA)签发,彻底消除了基于共享密钥的风险。
  2. 架构解耦与弹性: Pub/Sub 作为中间件,使生产者与消费者完全解耦。即使后端摄取服务短暂不可用,数据也会保留在队列中,提高了系统的整体韧性。同时,Pub/Sub 和 Cloud Functions 都能自动伸缩以应对流量洪峰。
  3. 证书的精细化管理: 我们可以为每个逻辑客户端(或每类客户端)颁发具有不同有效期和元数据的证书,实现比 IAM Role 更精细的访问控制和快速吊销。

劣势:

  1. 实现复杂性: 引入了证书生命周期管理的复杂性,包括 CA 的建立、证书的签发、分发和轮换。
  2. Serverless 冷启动延迟: Cloud Functions 的冷启动可能会给实时性要求极高的场景带来延迟毛刺。
  3. 网络配置: 需要确保 Cloud Function 的出站流量可以访问到 VPC 内的特征存储摄取 API,这可能需要配置 VPC Connector。

最终选择与理由

在真实项目中,特别是处理金融交易、用户隐私等高度敏感数据的场景,安全性是不可妥协的。方案 A 中的服务账户密钥一旦泄露,其潜在破坏力巨大。方案 B 虽然增加了证书管理的复杂性,但它通过 mTLS 建立的零信任通信管道,从根本上解决了身份伪造和中间人攻击的风险。对于证书管理,可以借助 Google Secret Manager 等工具进行安全分发,并通过自动化脚本实现证书的定期轮换,将运维开销控制在可接受范围内。因此,我们选择方案 B 作为最终实现。

核心实现概览

我们将分三部分展示核心代码:证书生成、强制 mTLS 的服务端(特征摄取API),以及作为 mTLS 客户端的 Google Cloud Function。

1. 证书生成与管理

首先,我们需要一个本地的证书颁发机构 (CA)。在生产环境中,这应该是一个受严格保护的离线 CA 或使用云厂商的托管 CA 服务。这里我们使用 openssl 进行演示。

#!/bin/bash

# 清理旧文件
rm -rf certs
mkdir -p certs

# 1. 创建根证书颁发机构 (CA)
echo "--- Generating CA ---"
openssl genrsa -out certs/ca.key 4096
openssl req -new -x509 -days 3650 -key certs/ca.key -out certs/ca.crt \
  -subj "/C=CN/ST=Beijing/L=Beijing/O=MyOrg/OU=CA/CN=my-feature-store-ca"

# 2. 为特征摄取API (服务端) 创建证书
echo "--- Generating Server Certificate ---"
openssl genrsa -out certs/server.key 2048
openssl req -new -key certs/server.key -out certs/server.csr \
  -subj "/C=CN/ST=Beijing/L=Beijing/O=MyOrg/OU=API/CN=ingestion-api.internal"
openssl x509 -req -in certs/server.csr -CA certs/ca.crt -CAkey certs/ca.key \
  -CAcreateserial -out certs/server.crt -days 365 -sha256

# 3. 为 Google Cloud Function (客户端) 创建证书
echo "--- Generating Client Certificate ---"
openssl genrsa -out certs/client.key 2048
openssl req -new -key certs/client.key -out certs/client.csr \
  -subj "/C=CN/ST=Beijing/L=Beijing/O=MyOrg/OU=Functions/CN=feature-ingest-function"
openssl x509 -req -in certs/client.csr -CA certs/ca.crt -CAkey certs/ca.key \
  -CAcreateserial -out certs/client.crt -days 365 -sha256

echo "--- Certificates Generated in 'certs/' directory ---"
# ca.crt: 公共CA证书,需要分发给服务端和客户端
# server.crt, server.key: 服务端证书和私钥
# client.crt, client.key: 客户端证书和私钥

生成的 ca.crt, client.crt, client.key 需要上传到 Google Secret Manager,以便 Cloud Function 在运行时安全地拉取。

2. 特征摄取API:强制mTLS的服务端

我们使用 Python 和 FastAPI 构建这个 API。核心在于配置 uvicorn 或任何 ASGI 服务器,使其加载 CA 证书、自身的服务端证书和私钥,并强制要求客户端提供有效的证书。

# file: ingestion_api/main.py

import logging
import ssl
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel
import uvicorn

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()

# CA 证书路径,用于验证客户端证书
CA_CERT_PATH = "./certs/ca.crt"
# 服务端证书和私钥路径
SERVER_CERT_PATH = "./certs/server.crt"
SERVER_KEY_PATH = "./certs/server.key"

class FeatureVector(BaseModel):
    entity_id: str
    features: dict

@app.post("/v1/ingest")
async def ingest_feature(request: Request, feature_vector: FeatureVector):
    """
    接收并处理特征向量的端点
    mTLS 验证由 web server 层完成,这里只关注业务逻辑
    """
    # 在真实项目中,我们会从客户端证书中提取 CN (Common Name) 或其他字段
    # 用于进行更细粒度的授权检查。
    client_cert = request.scope.get('client_cert')
    if not client_cert:
        # 理论上,如果 mTLS 配置正确,不会走到这一步
        logger.error("Request arrived without a client certificate.")
        raise HTTPException(status_code=403, detail="Client certificate required but not provided.")

    # 从对等证书信息中提取主题
    # 'client_cert' 的具体结构取决于 ASGI 服务器实现
    # 对于 Uvicorn,它可能是一个字典
    client_subject = client_cert.get('subject')
    logger.info(f"Received request from client: {client_subject}")
    
    # 示例: 检查客户端的 CN 是否是我们期望的
    client_cn = dict(x[0] for x in client_subject).get('commonName')
    if client_cn != "feature-ingest-function":
        logger.warning(f"Unauthorized client CN: {client_cn}")
        raise HTTPException(status_code=403, detail="Client identity not authorized.")

    # 业务逻辑:将特征写入 Feature Store
    # 这里用日志模拟写入操作
    logger.info(f"Ingesting features for entity_id: {feature_vector.entity_id}")
    # ... feature_store.write(feature_vector) ...

    return {"status": "success", "entity_id": feature_vector.entity_id}

if __name__ == "__main__":
    # 这是 uvicorn 的关键配置,用于启动 mTLS
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8443,
        reload=True,
        ssl_keyfile=SERVER_KEY_PATH,
        ssl_certfile=SERVER_CERT_PATH,
        ssl_ca_certs=CA_CERT_PATH,
        # 最关键的选项: 要求并验证客户端证书
        ssl_cert_reqs=ssl.CERT_REQUIRED
    )

这段代码的核心在于 uvicorn.runssl_* 参数。ssl_cert_reqs=ssl.CERT_REQUIRED 明确告诉服务器拒绝任何没有提供有效、可信客户端证书的连接。

3. Google Cloud Function:携带证书的mTLS客户端

这是整个架构中最具挑战性的部分。Cloud Function 是一个无状态、短暂的环境。我们不能将私钥硬编码或直接打包在代码中。正确的做法是利用 Secret Manager。

部署准备:

  1. ca.crt, client.crt, client.key 的内容分别创建为 Secret Manager 中的三个 Secret。
  2. 授予 Cloud Function 的运行时服务账户访问这些 Secret 的 Secret Manager Secret Accessor 角色。
# file: cloud_function/main.py

import base64
import json
import os
import ssl
import tempfile
import logging
from typing import Dict, Any

import functions_framework
import requests
from google.cloud import secretmanager

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 从环境变量中获取配置
PROJECT_ID = os.environ.get("GCP_PROJECT")
INGESTION_API_URL = os.environ.get("INGESTION_API_URL") # e.g., "https://ingestion-api.internal:8443/v1/ingest"

# Secret Manager 中 Secret 的名称
CLIENT_CERT_SECRET_NAME = os.environ.get("CLIENT_CERT_SECRET_NAME")
CLIENT_KEY_SECRET_NAME = os.environ.get("CLIENT_KEY_SECRET_NAME")
CA_CERT_SECRET_NAME = os.environ.get("CA_CERT_SECRET_NAME")

# 全局变量以缓存 Secret 内容,避免每次调用都重新拉取
# 注意:在 Cloud Functions 环境中,全局变量在函数实例的生命周期内保持不变
secret_cache: Dict[str, str] = {}
session = None

def get_secret(secret_name: str) -> str:
    """从 Secret Manager 获取 Secret 内容,并缓存结果。"""
    if secret_name in secret_cache:
        return secret_cache[secret_name]

    try:
        client = secretmanager.SecretManagerServiceClient()
        name = f"projects/{PROJECT_ID}/secrets/{secret_name}/versions/latest"
        response = client.access_secret_version(request={"name": name})
        payload = response.payload.data.decode("UTF-8")
        secret_cache[secret_name] = payload
        logger.info(f"Successfully fetched and cached secret: {secret_name}")
        return payload
    except Exception as e:
        logger.critical(f"Failed to access secret: {secret_name}. Error: {e}")
        raise

def get_https_session() -> requests.Session:
    """
    创建一个配置了 mTLS 证书的 requests.Session 对象。
    这个函数是本架构的核心,处理了 Serverless 环境下的证书管理。
    """
    global session
    if session:
        return session
    
    # 从 Secret Manager 获取证书内容
    client_cert_str = get_secret(CLIENT_CERT_SECRET_NAME)
    client_key_str = get_secret(CLIENT_KEY_SECRET_NAME)
    ca_cert_str = get_secret(CA_CERT_SECRET_NAME)

    # requests 库需要证书以文件形式存在,因此我们将 Secret 内容写入临时文件
    # 使用 NamedTemporaryFile 确保文件在关闭后被自动删除
    with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.crt') as client_cert_file, \
         tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.key') as client_key_file, \
         tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.crt') as ca_cert_file:
        
        client_cert_file.write(client_cert_str)
        client_key_file.write(client_key_str)
        ca_cert_file.write(ca_cert_str)

        client_cert_path = client_cert_file.name
        client_key_path = client_key_file.name
        ca_cert_path = ca_cert_file.name

    # 创建并配置 Session
    s = requests.Session()
    # cert 参数接受一个元组 (客户端证书路径, 客户端私钥路径)
    s.cert = (client_cert_path, client_key_path)
    # verify 参数指定 CA 证书路径,用于验证服务端证书
    s.verify = ca_cert_path
    
    # 可以在这里添加重试逻辑
    adapter = requests.adapters.HTTPAdapter(max_retries=3)
    s.mount("https://", adapter)
    
    session = s
    logger.info("HTTPS session with mTLS configured.")
    return session


@functions_framework.cloud_event
def process_pubsub_message(cloud_event):
    """
    Pub/Sub 触发的 Cloud Function 主函数。
    """
    try:
        # 从 Pub/Sub 消息中解码数据
        message_data = base64.b64decode(cloud_event.data["message"]["data"]).decode("utf-8")
        feature_data = json.loads(message_data)
        logger.info(f"Received feature data: {feature_data}")

        # 获取配置了 mTLS 的 session
        https_session = get_https_session()

        # 发送请求到特征摄取 API
        response = https_session.post(INGESTION_API_URL, json=feature_data, timeout=10)
        
        response.raise_for_status() # 如果状态码不是 2xx,则抛出异常

        logger.info(f"Successfully ingested data. API response: {response.json()}")

    except json.JSONDecodeError as e:
        logger.error(f"Failed to decode Pub/Sub message: {e}")
        # 不需要重试,这是一条坏消息
    except requests.exceptions.RequestException as e:
        logger.error(f"HTTP request to ingestion API failed: {e}")
        # 请求失败,Cloud Functions 会根据配置自动重试
        raise
    except Exception as e:
        logger.critical(f"An unexpected error occurred: {e}")
        # 抛出异常以触发重试
        raise

这段代码的精髓在于 get_https_session 函数。它在函数实例的第一次调用时,从 Secret Manager 拉取所有证书内容,将它们写入临时文件,然后配置一个 requests.Session 对象。这个 Session 对象被缓存为全局变量,后续的调用可以直接复用,避免了重复拉取 Secret 和创建文件的开销,这对性能至关重要。

架构的扩展性与局限性

此架构模式并非银弹。它的一个显著局限性在于证书的生命周期管理。虽然我们使用 Secret Manager 解决了安全分发问题,但证书的轮换(rotation)仍需自动化。一个可行的路径是构建一个内部的 CA 服务(或使用 Cloud Certificate Authority Service),通过一个定时的 Cloud Scheduler 任务触发一个函数,为即将过期的客户端生成新证书,更新 Secret Manager 中的版本,并平滑地重启依赖该证书的应用实例。

另一个考量是冷启动延迟。如果业务对数据摄取的 p99 延迟有毫秒级的严苛要求,那么 Cloud Functions 的冷启动可能会成为瓶颈。在这种情况下,可以考虑使用配置了最小实例数(min-instances)的 Cloud Run 服务替换 Cloud Function。Cloud Run 同样可以由 Pub/Sub 触发,但通过保持至少一个实例的“温热”状态,可以显著降低尾部延迟,代价是更高的闲置成本。

最后,该模式的可扩展性极佳。我们可以为不同类型的数据生产者颁发不同的客户端证书,摄取 API 可以根据证书的 Common NameOrganizational Unit 字段,将请求路由到不同的处理逻辑或数据表中,从而在接入层就实现多租户隔离和精细化授权。


  目录