构建基于 LevelDB 与 TiDB 的 GKE 模型服务分层特征存储架构


一个典型的实时推荐模型服务,部署在 GKE 上,对外承诺的 p99 响应延迟是 50ms。模型推理本身经过优化后,耗时稳定在 15ms 左右。然而,瓶颈很快出现在了特征获取环节。业务要求模型能够访问海量的用户和物品特征,这些特征存储在一个大规模的 TiDB 集群中。尽管 TiDB 性能卓越,但在高并发下,一次典型的多键特征查询(multi-get)的网络开销和数据库处理时间,p99 延迟仍在 20-30ms 之间。这几乎占满了我们全部的延迟预算,留给业务逻辑、网络抖动和其他开销的余地所剩无几。

在真实项目中,这种场景极为普遍。延迟的每一毫秒都可能影响用户体验和商业指标。

方案A: 独立的分布式缓存层

最直接的思路是在模型服务和 TiDB 之间增加一个独立的分布式缓存层,例如 Redis 或 Memcached Cluster。

graph TD
    subgraph GKE Cluster
        A[Ingress] --> B{Service};
        B --> C1[Model Pod 1];
        B --> C2[Model Pod 2];
        B --> C3[Model Pod N];
    end

    subgraph " "
      D{Redis Cluster};
    end
    
    subgraph " "
      E[TiDB Cluster];
    end

    C1 -->|Cache Miss| D;
    C2 -->|Cache Miss| D;
    C3 -->|Cache Miss| D;
    
    D -->|Cache Hit| C1;
    D -->|Cache Hit| C2;
    D -->|Cache Hit| C3;

    D -->|Cache Miss / Write-back| E;

优势分析:

  1. 成熟方案: 这是业界解决数据库访问压力的标准模式,运维工具链和客户端库都非常成熟。
  2. 解耦: 缓存集群与应用服务分离,可以独立扩缩容,权责清晰。
  3. 命中率高: 对于热点数据,可以获得极高的命中率,显著降低对后端 TiDB 的请求压力。

劣势与权衡:

  1. 延迟瓶颈仍在: 即使缓存命中,从 GKE Pod 到 Redis Cluster 依然存在一次网络往返(Round Trip)。在 GCP 的同一个 VPC 内,这个延迟通常在 1-2ms,但在高负载下网络抖动可能使其恶化。这对于我们追求极致的延迟目标来说,仍然是一个不可忽视的开销。
  2. 运维成本: 引入一个新的分布式组件意味着额外的维护、监控和故障排查成本。Redis Cluster 的高可用部署和容量规划本身就是一项复杂的任务。
  3. 一致性问题: 缓存与主数据源 TiDB 之间的一致性保障是个难题。采用 Cache-Aside 模式时,更新数据库和失效缓存的原子性无法保证。如果采用 Read-Through/Write-Through,则对缓存中间件的要求更高。数据不一致可能导致模型使用过期特征,产生错误的预测。
  4. 热点与雪崩: 缓存雪崩、穿透、击穿等经典问题都需要在架构层面进行周全考虑和防护,增加了系统复杂度。

对于我们的场景,方案A能缓解 TiDB 的压力,但无法从根本上消除那“最后几毫秒”的网络延迟,无法满足最苛刻的性能要求。

方案B: 服务 Pod 内嵌本地存储

另一个思路是将“热”数据极致地贴近计算。既然网络是瓶颈,那就消除网络。我们可以在每个模型服务 Pod 中直接使用本地存储,将最核心、最频繁访问的特征数据缓存在 Pod 本地。

这里我们面临两个选择:

  1. 内存缓存 (In-Memory Map): 在服务进程内维护一个巨大的 mapLRUCache
  2. 嵌入式KV存储 (Embedded KV Store): 在 Pod 挂载的本地磁盘上使用一个嵌入式数据库,如 LevelDB 或 RocksDB。

对比两者:

  • 内存缓存: 访问速度最快(纳秒级),但问题致命。首先是数据持久性,Pod 重启(例如滚动更新、节点故障)后,缓存全部丢失,需要漫长的冷启动过程来重新预热,期间服务性能会急剧下降。其次是内存消耗,海量特征会挤占宝贵的内存资源,影响模型加载和推理,并可能引发严重的 GC 问题。
  • 嵌入式KV存储: 访问速度略低于内存(微秒级,因为涉及系统调用和磁盘IO),但它将数据持久化到磁盘。这意味着 Pod 重启后,数据依然存在,可以瞬时恢复服务能力。它将数据压力从内存转移到了磁盘,允许我们缓存远超内存容量的数据。

综合考虑,嵌入式KV存储是更稳健的选择。我们选择 LevelDB,因为它实现简洁、性能可靠,并且有高质量的 Go 语言实现。

最终的架构决策是:采用一种分层存储策略。

graph TD
    subgraph GKE StatefulSet
        subgraph Pod 1
            A1[Model Serving App]
            B1[Embedded LevelDB]
            C1[(PV/Persistent Disk)]
            A1 <--> B1
            B1 -- Stores data on --> C1
        end
        subgraph Pod 2
            A2[Model Serving App]
            B2[Embedded LevelDB]
            C2[(PV/Persistent Disk)]
            A2 <--> B2
            B2 -- Stores data on --> C2
        end
        subgraph Pod N
            An[Model Serving App]
            Bn[Embedded LevelDB]
            Cn[(PV/Persistent Disk)]
            An <--> Bn
            Bn -- Stores data on --> Cn
        end
    end

    subgraph "Upstream Data Source"
        D[TiDB Cluster]
    end

    A1 -->|Cache Miss / Sync| D
    A2 -->|Cache Miss / Sync| D
    An -->|Cache Miss / Sync| D

    subgraph "User Traffic"
        U[Request] --> S{GKE Service}
        S --> A1
        S --> A2
        S --> An
    end
    
    style Pod 1 fill:#f9f,stroke:#333,stroke-width:2px
    style Pod 2 fill:#f9f,stroke:#333,stroke-width:2px
    style Pod N fill:#f9f,stroke:#333,stroke-width:2px

这个架构的核心是:

  • Tier 0 (Hot Tier): 每个模型服务 Pod 内嵌一个 LevelDB 实例,运行在由 GKE PersistentVolume 提供的 SSD 盘上。它存储了最核心、最频繁访问的特征子集。
  • Tier 1 (Warm/Cold Tier): 后端的 TiDB 集群,作为全量特征的最终数据源和事实标准 (Source of Truth)。

请求的生命周期变为:

  1. 模型服务接收到请求。
  2. 应用首先查询本地 LevelDB 获取所需特征。
  3. 如果命中,直接进入模型推理,延迟在微秒级别。
  4. 如果未命中,回源到 TiDB 查询。
  5. 从 TiDB 获取到特征后,异步地写回本地 LevelDB,以便下次访问。
  6. 同时,有一个后台同步机制,负责主动预热和更新 LevelDB 中的数据。

核心实现概览

为了实现这个架构,我们需要解决几个关键问题:Pod 的状态管理、存储的生命周期、特征访问逻辑的封装,以及数据同步策略。

1. Kubernetes StatefulSet 定义

Deployment 不适合这个场景,因为我们需要为每个 Pod 提供一个稳定、独立的存储卷。StatefulSet 正是为此设计的。

# feature-server-statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: feature-server
  namespace: model-serving
spec:
  serviceName: "feature-server"
  replicas: 3
  selector:
    matchLabels:
      app: feature-server
  template:
    metadata:
      labels:
        app: feature-server
    spec:
      terminationGracePeriodSeconds: 30
      containers:
        - name: server
          image: your-repo/feature-server:v1.2.0
          ports:
            - containerPort: 8080
              name: http
          env:
            - name: TIDB_DSN
              valueFrom:
                secretKeyRef:
                  name: tidb-credentials
                  key: dsn
            - name: LEVELDB_PATH
              value: /data/leveldb
          # 关键部分:将存储卷挂载到容器中
          volumeMounts:
            - name: feature-data
              mountPath: /data
          # 生产环境必备:资源限制与存活探针
          resources:
            requests:
              cpu: "2"
              memory: "4Gi"
            limits:
              cpu: "4"
              memory: "8Gi"
          livenessProbe:
            httpGet:
              path: /healthz
              port: 8080
            initialDelaySeconds: 15
            periodSeconds: 20
          readinessProbe:
            httpGet:
              path: /readyz
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 10
  # 关键部分:为每个 Pod 动态创建 PersistentVolume
  volumeClaimTemplates:
    - metadata:
        name: feature-data
      spec:
        accessModes: ["ReadWriteOnce"]
        # 使用 GCP 的高性能 SSD Persistent Disk
        storageClassName: "premium-rwo" 
        resources:
          requests:
            storage: 50Gi

这里的核心是 volumeClaimTemplates。它会为 StatefulSet 的每个 Pod (如 feature-server-0, feature-server-1) 自动创建一个对应的 PersistentVolumeClaimPersistentVolume。当 Pod 被调度到某个 GKE 节点上时,GCP 会将一个 50GB 的 SSD 盘挂载到该节点,并让 Pod 使用。即使 Pod 重启或漂移到其他节点,这个存储卷也会跟随它,确保数据的持久性。

2. Go 服务中的分层存储访问器

接下来是在 Go 应用中实现特征访问逻辑。我们需要一个抽象层来屏蔽底层的复杂性。

// pkg/featurestore/store.go
package featurestore

import (
	"context"
	"database/sql"
	"fmt"
	"sync"
	"time"

	"github.com/go-sql-driver/mysql" // TiDB client
	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"go.uber.org/zap"
)

// Feature represents a single feature value.
// In a real system, this would be a more complex struct, likely protobuf.
type Feature []byte

// TieredStore implements the two-tier feature fetching logic.
type TieredStore struct {
	ldb    *leveldb.DB
	tidb   *sql.DB
	logger *zap.Logger
	mu     sync.RWMutex
}

// NewTieredStore creates a new store instance.
func NewTieredStore(leveldbPath, tidbDSN string, logger *zap.Logger) (*TieredStore, error) {
	// --- LevelDB Initialization ---
	// Production-grade options: caching, compression, etc.
	o := &opt.Options{
		BlockCacheCapacity: 1024 * opt.MiB, // 1GB block cache
		WriteBuffer:        256 * opt.MiB,   // 256MB write buffer
		Compression:        opt.SnappyCompression,
	}
	ldb, err := leveldb.OpenFile(leveldbPath, o)
	if err != nil {
		return nil, fmt.Errorf("failed to open leveldb: %w", err)
	}
	logger.Info("LevelDB initialized", zap.String("path", leveldbPath))

	// --- TiDB Initialization ---
	cfg, err := mysql.ParseDSN(tidbDSN)
	if err != nil {
		return nil, fmt.Errorf("failed to parse TiDB DSN: %w", err)
	}
	// Production-grade connection pool settings
	cfg.ParseTime = true
	cfg.Loc = time.Local
	
	db, err := sql.Open("mysql", cfg.FormatDSN())
	if err != nil {
		return nil, fmt.Errorf("failed to connect to TiDB: %w", err)
	}
	db.SetMaxOpenConns(100)
	db.SetMaxIdleConns(20)
	db.SetConnMaxLifetime(10 * time.Minute)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := db.PingContext(ctx); err != nil {
		return nil, fmt.Errorf("failed to ping TiDB: %w", err)
	}
	logger.Info("TiDB connection pool established")

	return &TieredStore{
		ldb:    ldb,
		tidb:   db,
		logger: logger,
	}, nil
}

// GetFeatures fetches features for a given list of keys.
// This is the core logic of the tiered architecture.
func (s *TieredStore) GetFeatures(ctx context.Context, keys []string) (map[string]Feature, error) {
	results := make(map[string]Feature, len(keys))
	var missingKeys []string

	// 1. First, try to fetch from LevelDB (Tier 0)
	for _, key := range keys {
		// Note: The key in a real system might be composite, e.g., "user_feature:12345"
		data, err := s.ldb.Get([]byte(key), nil)
		if err == leveldb.ErrNotFound {
			missingKeys = append(missingKeys, key)
			continue
		}
		if err != nil {
			s.logger.Error("Error reading from LevelDB", zap.String("key", key), zap.Error(err))
			// Even if one key fails, we should continue for others
			missingKeys = append(missingKeys, key)
			continue
		}
		results[key] = Feature(data)
	}

	if len(missingKeys) == 0 {
		// All features found in local cache, best case scenario!
		return results, nil
	}

	// 2. For missing keys, fallback to TiDB (Tier 1)
	s.logger.Info("Cache miss, fetching from TiDB", zap.Int("missing_count", len(missingKeys)))
	tidbFeatures, err := s.fetchFromTiDB(ctx, missingKeys)
	if err != nil {
		// If TiDB fails, we return partial results from LevelDB.
		// This improves system resilience.
		s.logger.Error("Failed to fetch from TiDB", zap.Error(err))
		return results, fmt.Errorf("partial failure: TiDB fetch failed: %w", err)
	}

	// 3. Populate results and asynchronously write-back to LevelDB
	go func() {
		batch := new(leveldb.Batch)
		for key, feature := range tidbFeatures {
			results[key] = feature
			// The key must be byte slice
			batch.Put([]byte(key), feature)
		}
		
		// This write-back is "fire and forget" from the request path's perspective.
		// In a production system, you'd want metrics to monitor this.
		if err := s.ldb.Write(batch, nil); err != nil {
			s.logger.Error("Failed to write-back to LevelDB", zap.Error(err))
		}
	}()

	return results, nil
}

// fetchFromTiDB is a helper to query TiDB. It should use a prepared statement in production.
func (s *TieredStore) fetchFromTiDB(ctx context.Context, keys []string) (map[string]Feature, error) {
	// A common mistake is to query one-by-one in a loop.
	// Always use IN clause or multi-get for batch fetching.
	// For simplicity, this example uses IN. A real-world scenario may use more complex queries.
	
	// Building the query with placeholders
	query := "SELECT feature_key, feature_value FROM user_features WHERE feature_key IN (?" + strings.Repeat(",?", len(keys)-1) + ")"
	
	args := make([]interface{}, len(keys))
	for i, key := range keys {
		args[i] = key
	}

	rows, err := s.tidb.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	fetched := make(map[string]Feature)
	for rows.Next() {
		var key string
		var value []byte
		if err := rows.Scan(&key, &value); err != nil {
			s.logger.Error("Error scanning row from TiDB", zap.Error(err))
			continue
		}
		fetched[key] = value
	}

	return fetched, rows.Err()
}


// Close gracefully shuts down the connections.
func (s *TieredStore) Close() {
	if s.ldb != nil {
		s.ldb.Close()
	}
	if s.tidb != nil {
		s.tidb.Close()
	}
}

这段代码展示了核心的分层查询逻辑,并包含了生产环境中必须考虑的细节:

  • 错误处理: 对 LevelDB 和 TiDB 的访问都做了错误处理。当 TiDB 失败时,它仍能返回从 LevelDB 中获取的部分结果,提高了系统的容错性。
  • 并发安全: 虽然示例中 GetFeatures 是读操作为主,但 TieredStore 结构体中加入了 sync.RWMutex 以备将来需要支持并发写操作。
  • 性能考量: 对 TiDB 的查询明确指出要避免 N+1 问题,应使用 IN 子句或等效的批量查询方式。LevelDB 的初始化也配置了合理的缓存大小。
  • 异步写回: 从 TiDB 获取数据后,对 LevelDB 的写回操作是在一个单独的 goroutine 中异步执行的,避免阻塞主请求路径,保证低延迟。

3. 数据同步与预热

仅靠被动的“读时回填”是不够的。这会导致新 Pod 启动时 LevelDB 为空,所有请求都打向 TiDB,造成瞬间的性能抖动和对后端的冲击。我们需要一个主动的数据同步机制。

一个务实的策略是启动一个后台 goroutine,定期从 TiDB 拉取热点数据进行刷新。

// pkg/featurestore/sync.go

package featurestore

import (
    "context"
    "time"
    "go.uber.org/zap"
)

// StartSyncer starts a background goroutine to periodically sync hot data from TiDB.
func (s *TieredStore) StartSyncer(ctx context.Context, interval time.Duration) {
    s.logger.Info("Starting background syncer", zap.Duration("interval", interval))
    go func() {
        ticker := time.NewTicker(interval)
        defer ticker.Stop()
        
        // Initial sync on startup
        s.runSync(ctx)

        for {
            select {
            case <-ticker.C:
                s.runSync(ctx)
            case <-ctx.Done():
                s.logger.Info("Stopping background syncer")
                return
            }
        }
    }()
}

// runSync performs a single sync cycle.
func (s *TieredStore) runSync(ctx context.Context) {
    s.logger.Info("Running periodic sync from TiDB to LevelDB")

    // The logic to identify "hot" data is business-specific.
    // It could be recently updated users, most popular items, etc.
    // Here, we simulate fetching features updated in the last hour.
    query := "SELECT feature_key, feature_value FROM user_features WHERE updated_at > ?"
    
    oneHourAgo := time.Now().Add(-1 * time.Hour)

    rows, err := s.tidb.QueryContext(ctx, query, oneHourAgo)
    if err != nil {
        s.logger.Error("Syncer failed to query TiDB", zap.Error(err))
        return
    }
    defer rows.Close()

    batch := new(leveldb.Batch)
    count := 0
    for rows.Next() {
        var key string
        var value []byte
        if err := rows.Scan(&key, &value); err != nil {
            s.logger.Error("Syncer failed to scan row", zap.Error(err))
            continue
        }
        batch.Put([]byte(key), value)
        count++
        
        // Write in batches to avoid holding a giant batch in memory
        if batch.Len() > 1000 {
            if err := s.ldb.Write(batch, nil); err != nil {
                s.logger.Error("Syncer failed to write batch to LevelDB", zap.Error(err))
                // Even if a batch fails, we should continue with the next one
            }
            batch.Reset()
        }
    }
    
    // Write the final batch
    if batch.Len() > 0 {
         if err := s.ldb.Write(batch, nil); err != nil {
            s.logger.Error("Syncer failed to write final batch to LevelDB", zap.Error(err))
        }
    }

    if rows.Err() != nil {
        s.logger.Error("Syncer encountered an error during row iteration", zap.Error(rows.Err()))
    }

    s.logger.Info("Sync cycle completed", zap.Int("features_synced", count))
}

这里的同步逻辑相对简单,但点出了核心:

  • 热数据定义: 如何定义“热数据”是业务问题。示例中简单地使用了时间窗口,在真实项目中可能基于访问频率、业务规则等。
  • 批量处理: 从 TiDB 拉取数据和写入 LevelDB 都采用了批量操作,这是提高效率的关键。
  • 健壮性: 即使同步过程中出现单条或单批次错误,也不应中断整个同步循环。

一个更高级的同步方案是订阅 TiDB 的 CDC (Change Data Capture) 数据流。这可以实现近乎实时的数据同步,但会增加架构的复杂性,需要引入 Kafka 等消息队列。对于许多场景,周期性刷新已经足够。

架构的扩展性与局限性

扩展性:

  • 多数据中心: 此模式天然支持多地域部署。每个 GKE 集群都可以有自己独立的 StatefulSet,它们都回源到同一个(或区域内副本)TiDB 集群。数据的本地化使得跨地域访问延迟大大降低。
  • 特征多样性: 可以通过在 LevelDB 的 key 中增加前缀来支持不同类型的特征(用户、物品、上下文),并在同一个实例中管理。

局限性与挑战:

  1. 数据一致性: 这是此架构最大的权衡。LevelDB 中的数据永远是 TiDB 的一个有延迟的副本。延迟取决于回填和后台同步的频率。业务方必须能够容忍这种最终一致性。对于交易、计费等要求强一致性的场景,此架构完全不适用。
  2. 存储成本: GKE 的 PersistentVolume (尤其是高性能 SSD) 是一笔不小的开销。每个 Pod 都需要一个,这会直接增加基础设施成本。需要仔细规划每个 Pod 的磁盘容量。
  3. 运维复杂度: 管理 StatefulSet 和 PV/PVC 比管理无状态的 Deployment 更复杂。Pod 的缩容操作需要谨慎处理,以防数据丢失。备份和恢复策略也需要为每个 Pod 的本地 LevelDB 数据单独设计。例如,可以定期对 PV 进行快照。
  4. 本地磁盘性能: Pod 内 LevelDB 的性能直接受限于 GKE 节点所挂载的持久化磁盘的 IOPS 和吞吐量。在进行容量规划时,必须对磁盘性能进行压测,确保它能承载预期的读写负载。
  5. “热”数据识别: 整个方案的有效性高度依赖于对“热”数据子集的准确识别。如果缓存命中率过低,大量请求仍会穿透到 TiDB,本地存储的优势就无法发挥,反而增加了系统的复杂度和成本。

  目录