基于 gRPC 与数据仓库的 CircleCI 异构 Monorepo 增量构建与效能分析


我们团队的 Monorepo 越来越臃肿。前端应用(TypeScript + React)、后端 gRPC 服务(Go)、共享的 Protobuf 定义,全部挤在一个仓库里。最初的 CircleCI 配置很简单:git push 就触发所有服务的测试、构建和部署。当只有两三个服务时,这还能接受。但现在,服务数量超过了二十个,每次提交哪怕只改动一个文档的错别字,整个流水线也要跑上二十分钟,CI/CD 的账单也变得相当难看。问题已经摆在面前:必须实现基于变更的增量构建,并且,我们需要一套机制来度量和持续优化流水线本身的效能。

单纯修复 CI 效率问题只是治标,我们需要一套数据驱动的改进体系。最终的构想是,流水线本身也应该像一个生产服务一样,具备完善的可观测性。于是,一个结合了增量构建逻辑和效能数据回传的架构方案浮出水面。

sequenceDiagram
    participant Dev
    participant GitHub
    participant CircleCI
    participant TelemetrySvc
    participant DataWarehouse

    Dev->>GitHub: git push
    GitHub->>CircleCI: Webhook Trigger
    CircleCI->>CircleCI: 启动 Setup Workflow
    Note right of CircleCI: 1. 检出代码
2. 运行变更检测脚本 CircleCI->>CircleCI: 动态生成 Continuation Config CircleCI->>CircleCI: 根据变更执行特定 Job (e.g., build-frontend) participant Job Job->>Job: 开始执行 Job->>TelemetrySvc: 发送 gRPC 请求 (event: job_started) Job->>Job: 拉取缓存, 安装依赖, 构建 (esbuild/go build) Job->>Job: 运行测试 Job->>TelemetrySvc: 发送 gRPC 请求 (event: job_finished) TelemetrySvc->>DataWarehouse: 异步写入构建指标 Note right of DataWarehouse: 存储 Job 耗时, 缓存命中率, 状态等

这个方案的核心是两部分:

  1. CircleCI 动态配置: 利用 CircleCI 的 setup 工作流,在流水线启动时运行一个脚本来检测文件变更,并动态生成只包含受影响服务的构建任务。
  2. 构建遥测服务 (Telemetry Service): 一个独立的内部 gRPC 服务,专门接收来自 CI job 的遥测数据,如任务耗时、缓存命中情况、成功或失败状态。这些数据最终被清洗并导入数据仓库(我们用的是 BigQuery)进行长期分析。

第一步:定义遥测数据的契约

在动手写任何脚本或配置之前,首先要定义数据模型。这是整个系统的基石。我们选择 gRPC 和 Protobuf,因为它提供了强类型的、向后兼容的 schema 定义,比用 JSON over HTTP传来传去要严谨得多。

这是我们的 telemetry.proto 文件,它定义了整个效能度量系统的核心数据结构。

// proto/telemetry/v1/telemetry.proto
syntax = "proto3";

package telemetry.v1;

import "google/protobuf/timestamp.proto";
import "google/protobuf/struct.proto";

option go_package = "github.com/your-org/telemetry-svc/gen/go/telemetry/v1;telemetryv1";

// TelemetryService 是用于接收 CI/CD 流水线指标的服务
service TelemetryService {
  // ReportBuildEvent 用于上报一个构建事件
  rpc ReportBuildEvent(ReportBuildEventRequest) returns (ReportBuildEventResponse);
}

// BuildEvent 代表一个具体的构建事件
message BuildEvent {
  // 事件的唯一ID, 建议使用 CI 平台的 Job ID
  string event_id = 1;
  // CI 平台的 Job 名称
  string job_name = 2;
  // 触发事件的 Git Commit SHA
  string commit_sha = 3;
  // 事件类型
  EventType event_type = 4;
  // 事件发生的时间戳
  google.protobuf.Timestamp timestamp = 5;
  // 项目或子模块名称, 用于 Monorepo
  string project_name = 6;
  // CI/CD 平台名称, e.g., "circleci", "github_actions"
  string platform = 7;
  // 附加的元数据, KV 结构, 用于存储特定于 Job 的信息
  // 例如: {"cache_hit": true, "node_version": "18.16.0"}
  google.protobuf.Struct metadata = 8;
}

enum EventType {
  EVENT_TYPE_UNSPECIFIED = 0;
  EVENT_TYPE_JOB_STARTED = 1;
  EVENT_TYPE_JOB_FINISHED_SUCCESS = 2;
  EVENT_TYPE_JOB_FINISHED_FAILURE = 3;
  EVENT_TYPE_JOB_FINISHED_CANCELLED = 4;
}

message ReportBuildEventRequest {
  BuildEvent event = 1;
}

message ReportBuildEventResponse {
  // 服务器接收成功确认
  bool acknowledged = 1;
}

这份 proto 定义清晰地描述了我们关心的数据:哪个 Job (job_name)、为哪个项目 (project_name)、在哪次提交 (commit_sha) 上发生了什么事 (event_type),以及一些可扩展的元数据 (metadata)。

第二步:实现遥测数据接收服务

有了 proto 定义,我们用 Go 快速实现这个 gRPC 服务。它的职责很简单:接收数据,做最基本的数据校验,然后把它扔给一个数据仓库的客户端。在真实项目中,这里应该是一个缓冲队列(如 Kafka 或 Pub/Sub),以削峰填谷,并与数据仓库解耦。为了简化,这里直接展示写入逻辑的骨架。

// internal/server/grpc.go
package server

import (
	"context"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"github.com/your-org/telemetry-svc/internal/warehouse"
	telemetryv1 "github.com/your-org/telemetry-svc/gen/go/telemetry/v1"
)

type TelemetryServer struct {
	telemetryv1.UnimplementedTelemetryServiceServer
	writer warehouse.EventWriter
}

func NewTelemetryServer(writer warehouse.EventWriter) *TelemetryServer {
	return &TelemetryServer{writer: writer}
}

func (s *TelemetryServer) Register(srv *grpc.Server) {
	telemetryv1.RegisterTelemetryServiceServer(srv, s)
}

func (s *TelemetryServer) ReportBuildEvent(ctx context.Context, req *telemetryv1.ReportBuildEventRequest) (*telemetryv1.ReportBuildEventResponse, error) {
	event := req.GetEvent()
	if event == nil {
		return nil, status.Error(codes.InvalidArgument, "event is required")
	}

	// 基本的校验
	if event.GetEventId() == "" || event.GetJobName() == "" || event.GetCommitSha() == "" {
		return nil, status.Error(codes.InvalidArgument, "event_id, job_name, and commit_sha are required")
	}

	log.Printf("Received event: job=%s, project=%s, type=%s", event.GetJobName(), event.GetProjectName(), event.GetEventType())

	// 在真实项目中,这里应该是异步处理
	// 避免阻塞 gRPC 响应,导致 CI Job 挂起
	go func() {
		// 使用一个新的 context,因为它将在原始 RPC 调用返回后继续运行
		bgCtx := context.Background()
		if err := s.writer.Write(bgCtx, event); err != nil {
			// 这里的错误处理需要非常健壮,例如推送到死信队列
			log.Printf("ERROR: Failed to write event to data warehouse: %v", err)
		}
	}()
	
	return &telemetryv1.ReportBuildEventResponse{Acknowledged: true}, nil
}

// warehouse/writer.go (伪代码)
package warehouse

import (
	"context"
	"fmt"
	"time"
	"cloud.google.com/go/bigquery"
	telemetryv1 "github.com/your-org/telemetry-svc/gen/go/telemetry/v1"
)

// EventWriter 是一个写入数据仓库的接口
type EventWriter interface {
	Write(ctx context.Context, event *telemetryv1.BuildEvent) error
}

// BigQueryWriter 实现了 EventWriter
type BigQueryWriter struct {
	client *bigquery.Client
	table  *bigquery.Table
}

// BigQueryRow 是映射到 BigQuery 表结构的对象
type BigQueryRow struct {
	EventID     string    `bigquery:"event_id"`
	JobName     string    `bigquery:"job_name"`
	CommitSHA   string    `bigquery:"commit_sha"`
	EventType   string    `bigquery:"event_type"`
	Timestamp   time.Time `bigquery:"timestamp"`
	ProjectName string    `bigquery:"project_name"`
	Platform    string    `bigquery:"platform"`
	Metadata    string    `bigquery:"metadata"` // 在实际中,会把 proto Struct 转为 JSON string
}

func (w *BigQueryWriter) Write(ctx context.Context, event *telemetryv1.BuildEvent) error {
	// ... (将 proto message 转换为 BigQueryRow 的逻辑) ...
	// ... (使用 BigQuery Streaming Inserter 写入数据) ...
	log.Printf("Event %s successfully streamed to BigQuery.", event.EventId)
	return nil
}

这个 Go 服务非常轻量,核心逻辑就是接收、校验、转发。注意,实际的写入操作被放到了一个 goroutine 中,这样可以立刻返回响应给 CI job,避免因为数据仓库的延迟而拖慢整个流水线。

第三步:改造 CircleCI 配置实现动态化

这是整个方案的枢纽。CircleCI 的动态配置能力允许一个初始的 setup job 运行代码,并基于其输出决定接下来要执行哪些 job。

首先,是 .circleci/config.yml 的入口配置:

# .circleci/config.yml
version: 2.1

# 声明一个 setup workflow,它将用于生成动态配置
setup: true

orbs:
  # continuation orb 用于传递生成的配置
  continuation: circleci/continuation@0.3.1

# 定义一个用于动态配置生成的工作
jobs:
  generate-config:
    docker:
      - image: cimg/base:2023.08
    steps:
      - checkout
      - run:
          name: "Generate dynamic configuration"
          command: |
            # 这个脚本是核心,它会检测变更并生成一个配置文件
            ./scripts/generate-config.sh > /tmp/generated_config.yml
      - continuation/continue:
          configuration_path: /tmp/generated_config.yml

# 整个 CI 的入口点
workflows:
  setup-workflow:
    jobs:
      - generate-config

这个配置非常简洁,它只做一件事:运行 generate-config job。这个 job 的核心是执行 ./scripts/generate-config.sh 脚本,并将输出重定向到 /tmp/generated_config.yml,然后 continuation orb 会把这个文件作为下一阶段的配置来执行。

现在,来看一下 ./scripts/generate-config.sh 这个魔法脚本:

#!/bin/bash
set -e

# 获取与主分支的差异
# CIRCLE_COMPARE_URL 环境变量包含两个 commit SHA,可以从中提取基准 commit
BASE_COMMIT=$(echo $CIRCLE_COMPARE_URL | cut -d'/' -f7 | cut -d'.' -f1)
if [ -z "$BASE_COMMIT" ]; then
    # 如果是新分支,则与 main 分支比较
    BASE_COMMIT="origin/main"
fi

echo "Comparing against base commit: $BASE_COMMIT"
CHANGED_FILES=$(git diff --name-only "$BASE_COMMIT" HEAD)
echo "Changed files:"
echo "$CHANGED_FILES"

# 初始化标志位
run_frontend_ci=false
run_backend_serviceA_ci=false
run_proto_ci=false

# 根据文件路径判断需要运行哪些 job
while IFS= read -r file; do
    if [[ "$file" == web/app/* ]]; then
        run_frontend_ci=true
    elif [[ "$file" == services/serviceA/* ]]; then
        run_backend_serviceA_ci=true
    elif [[ "$file" == protos/* ]]; then
        run_proto_ci=true
    fi
done <<< "$CHANGED_FILES"

# 如果 proto 变更了,所有依赖它的服务都需要重新构建
if [ "$run_proto_ci" = true ]; then
    echo "Proto files changed, triggering all dependent services."
    run_backend_serviceA_ci=true
    # ... 其他后端服务的标志位也设置为 true
fi

# 开始生成 YAML 配置文件
# 使用 yq 工具会更稳健,但为了减少依赖,这里用 heredoc 手动拼接
cat << EOF
version: 2.1
orbs:
  go: circleci/go@1.8.0
  node: circleci/node@5.1.0

# 定义可复用的 command,用于上报遥测数据
commands:
  report_event:
    parameters:
      event_type:
        type: enum
        enum: ["JOB_STARTED", "JOB_FINISHED_SUCCESS", "JOB_FINISHED_FAILURE"]
      project_name:
        type: string
    steps:
      - run:
          name: "Report << parameters.event_type >> event for << parameters.project_name >>"
          # 在真实环境中,这里应该是一个编译好的二进制客户端或封装好的脚本
          # 为简化,使用 grpcurl
          # TELEMETRY_SVC_ADDR 是在 CircleCI context 中配置的环境变量
          command: |
            grpcurl -plaintext -d '{
              "event": {
                "event_id": "'"$CIRCLE_WORKFLOW_ID-$CIRCLE_JOB"'",
                "job_name": "'"$CIRCLE_JOB"'",
                "commit_sha": "'"$CIRCLE_SHA1"'",
                "event_type": "EVENT_TYPE_<< parameters.event_type >>",
                "timestamp": "'"$(date -u +%Y-%m-%dT%H:%M:%SZ)"'",
                "project_name": "<< parameters.project_name >>",
                "platform": "circleci",
                "metadata": {
                  "fields": {
                    "branch": {"string_value": "'"$CIRCLE_BRANCH"'"},
                    "build_num": {"number_value": '"$CIRCLE_BUILD_NUM"'}
                  }
                }
              }
            }' $TELEMETRY_SVC_ADDR telemetry.v1.TelemetryService/ReportBuildEvent

jobs:
  build-and-test-frontend:
    executor: node/default
    steps:
      - report_event:
          event_type: JOB_STARTED
          project_name: "frontend-app"
      - checkout
      - node/install-packages:
          cache-key: "yarn-{{ .Branch }}-{{ checksum \"yarn.lock\" }}"
      - run:
          name: "Lint and Format with Rome"
          command: yarn rome check web/app/
      - run:
          name: "Build with esbuild"
          command: yarn esbuild web/app/src/index.tsx --bundle --outfile=dist/bundle.js --minify
      - run:
          when: on_success
          command: report_event:
              event_type: JOB_FINISHED_SUCCESS
              project_name: "frontend-app"
      - run:
          when: on_fail
          command: report_event:
              event_type: JOB_FINISHED_FAILURE
              project_name: "frontend-app"

  build-and-test-backend-serviceA:
    executor: go/default
    steps:
      - report_event:
          event_type: JOB_STARTED
          project_name: "backend-serviceA"
      - checkout
      # ... Go 服务的构建和测试步骤
      - run:
          when: on_success
          command: report_event:
              event_type: JOB_FINISHED_SUCCESS
              project_name: "backend-serviceA"
      - run:
          when: on_fail
          command: report_event:
              event_type: JOB_FINISHED_FAILURE
              project_name: "backend-serviceA"

workflows:
  dynamic-workflow:
    jobs:
EOF

# 根据标志位动态添加 job 到 workflow
if [ "$run_frontend_ci" = true ]; then
    echo "      - build-and-test-frontend" >> /tmp/workflow.yml
fi

if [ "$run_backend_serviceA_ci" = true ]; then
    echo "      - build-and-test-backend-serviceA" >> /tmp/workflow.yml
fi

# 如果有任何 job 被触发,才把 workflow 部分追加到主配置文件
if [ -s /tmp/workflow.yml ]; then
    cat /tmp/workflow.yml
fi

这个脚本做了几件关键事情:

  1. 变更检测: 通过 git diff 找出从上次构建以来的变更文件。
  2. 逻辑判断: 根据文件路径判断哪个项目(frontend, backend-serviceA)受到了影响。特别地,它处理了 protos 变更需要触发所有后端服务重新构建的依赖关系。
  3. 动态生成: 使用 catheredoc 生成一个完整的、可执行的 CircleCI config.yml
  4. 遥测植入: 在每个 job 的开始和结束(无论是成功还是失败)都插入了一个 report_event command。这个 command 使用 grpcurl(一个 gRPC 的命令行工具)来调用我们的遥测服务,上报构建事件。

这里的前端构建任务特意选用了 Romeesbuild,因为它们以性能著称。将这些工具的执行时间作为度量指标的一部分,可以帮助我们量化升级构建工具带来的收益。

第四步:分析与展望

当这套系统运行起来后,数据仓库里的 build_events 表开始积累数据。我们可以用 SQL 查询来回答以前只能靠感觉回答的问题:

  • 哪个 job 是我们 CI 流水线里最慢的瓶颈?
  • 前端构建时间在引入 esbuild 后缩短了多少?
  • 哪个后端服务的测试套件最不稳定(失败率最高)?
  • CI 缓存的命中率如何?缓存未命中时对构建时长的影响有多大?

例如,一个计算 P95 构建时长的查询可能长这样(以 BigQuery SQL 为例):

WITH job_durations AS (
  SELECT
    start_event.job_name,
    start_event.project_name,
    TIMESTAMP_DIFF(end_event.timestamp, start_event.timestamp, SECOND) AS duration_seconds
  FROM
    `your_project.your_dataset.build_events` AS start_event
  JOIN
    `your_project.your_dataset.build_events` AS end_event
  ON
    start_event.event_id = end_event.event_id
  WHERE
    start_event.event_type = 'EVENT_TYPE_JOB_STARTED'
    AND end_event.event_type IN ('EVENT_TYPE_JOB_FINISHED_SUCCESS', 'EVENT_TYPE_JOB_FINISHED_FAILURE')
)
SELECT
  job_name,
  project_name,
  APPROX_QUANTILES(duration_seconds, 100)[OFFSET(95)] AS p95_duration_seconds
FROM
  job_durations
GROUP BY
  job_name,
  project_name
ORDER BY
  p95_duration_seconds DESC;

这套系统的建立,将我们的 DevOps 实践从“手工作坊”带入了“数据驱动”的阶段。

当然,当前方案也存在局限性。首先,基于文件路径的变更检测逻辑比较脆弱,它无法理解代码间的真实依赖关系。比如,修改了一个共享的 utils 包,但脚本无法自动检测到所有依赖它的服务并触发它们的构建。一个更健壮的方案是引入像 Bazel 或 Nx 这样的构建系统,它们能精确地分析项目的依赖图(DAG),从而实现更精准的增量构建。其次,遥测数据的上报目前是“尽力而为”模式,如果 grpcurl 调用失败或遥测服务宕机,这部分数据就会丢失。引入一个更可靠的本地代理或消息队列可以缓解这个问题。未来的迭代方向将聚焦于引入更智能的依赖图分析工具,并增强遥测管道的韧性。


  目录