项目初期,我们面临一个棘手的局面。两个核心业务系统,一个是用 Ruby on Rails 写的庞大单体应用,负责订单和用户核心;另一个是新晋的 Node.js/Express.js 微服务,处理高并发的活动和营销功能。两个团队的技术栈、日志习惯完全不同。业务方提出的需求很简单:他们不想再通过数据库慢查询来获取关键指标,而是希望每隔几分钟就能看到一张更新的业务趋势图,比如“过去一小时内各渠道的订单量变化”或者“新注册用户来源分布”。
传统的监控系统,比如 Prometheus + Grafana,对于应用指标很在行,但对于这种需要深度定制、多维度聚合的“业务可视化”需求,显得有些力不从心。Grafana 的图表虽好,但业务方想要的格式往往非常特定,甚至需要叠加一些统计学模型的结果,这不是简单配置就能实现的。他们最终想要的是一张可以直接插入周报 PPT 的 PNG 图片。
最初的构想是写个定时任务,每五分钟跑一次,聚合数据库数据,然后调用一个脚本生成图片。但这方案很快被否决了:它对生产数据库的侵入性太强,频繁的聚合查询会显著影响主业务性能。而且,随着业务量增长,五分钟的批处理窗口会越来越慢。
我们决定换个思路:能不能利用系统无时无刻不在产生的日志?如果日志是结构化的、包含业务信息的,那么它本身就是最实时的数据源。我们的目标演变成了构建一个日志驱动的、异步的、能将异构服务产生的业务事件转化为定制化数据可视化的管道。
第一步:统一异构服务的结构化日志
一切的基础是高质量的、统一格式的结构化日志。如果没有这个,后续所有处理都是空中楼阁。我们为 Rails 和 Express.js 应用分别引入了日志库,并强制约定了一个最小化的JSON日志范式。
关键在于,除了常规的 timestamp, level, message 字段,我们引入了一个业务事件专用的命名空间 biz_event。当一个日志事件需要触发后续的可视化流程时,它必须包含这个字段。
对于 Ruby on Rails 应用 (config/initializers/lograge.rb):
# config/initializers/lograge.rb
Rails.application.configure do
# 使用 Lograge 来生成单行、键值对格式的日志
config.lograge.enabled = true
config.lograge.formatter = Lograge::Formatters::Json.new
# 自定义payload,这是注入业务事件的关键
config.lograge.custom_options = lambda do |event|
payload = {
params: event.payload[:params].except('controller', 'action', 'format', 'id'),
host: event.payload[:host],
ip: event.payload[:ip],
# 这是一个关键的线程安全设计,允许我们在任何地方注入业务事件
biz_event: Thread.current[:biz_event]
}
# 清理,防止在请求之间泄露
Thread.current[:biz_event] = nil
payload
end
end
# ApplicationController 中增加一个辅助方法
class ApplicationController < ActionController::Base
# 在业务逻辑中,通过这个方法注入需要被捕获的事件
def log_biz_event(name:, data:)
# 使用线程本地变量来传递业务数据到Lograge的payload中
# 这是在Rails请求生命周期内传递上下文的一种可靠方式
Thread.current[:biz_event] = { name: name, data: data }
end
end
# 在 OrdersController 中的 create 方法中实际使用
class OrdersController < ApplicationController
def create
# ... 创建订单的复杂逻辑 ...
if @order.save
# 订单创建成功后,记录一个业务事件
log_biz_event(
name: 'new_order',
data: {
channel: @order.channel,
amount: @order.total_amount,
user_id: @order.user_id,
product_categories: @order.line_items.map(&:category).uniq
}
)
render json: @order, status: :created
else
# ... 错误处理 ...
end
end
end
对于 Express.js 应用 (使用 Winston):
// logger.js
const { createLogger, format, transports } = require('winston');
const { combine, timestamp, json, errors } = format;
// 创建一个基础 Logger
const logger = createLogger({
level: 'info',
// 强制JSON格式,并包含错误堆栈信息
format: combine(
errors({ stack: true }), // <-- 包含错误堆栈
timestamp(),
json()
),
transports: [
// 在生产环境中,我们不直接写文件,而是输出到 stdout
// 日志将由容器运行时或 systemd 捕获,然后转发给 Fluentd
new transports.Console(),
],
exceptionHandlers: [
new transports.Console() // 未捕获的异常也走这个通道
],
exitOnError: false, // 发生未捕获异常后不退出进程
});
// 封装一个业务事件专用的函数,确保格式统一
function logBizEvent(name, data) {
logger.info('Business event occurred', {
biz_event: {
name,
data,
},
});
}
// 在路由中使用
// routes/activity.js
const express = require('express');
const router = express.Router();
const { logBizEvent } = require('../logger');
router.post('/register', (req, res) => {
// ... 用户注册逻辑 ...
const newUser = { id: 123, source: req.body.source || 'direct' };
// 记录注册成功事件
logBizEvent('new_registration', {
source: newUser.source,
timestamp: Date.now(),
});
res.status(201).json({ message: 'User registered', user: newUser });
});
module.exports = { router, logBizEvent };
这样,两个完全不同的服务现在都能产生类似结构的业务日志,例如:{"level":"info", "biz_event":{"name":"new_order", "data":{...}}}
第二步:配置 Fluentd 作为数据中枢
Fluentd 是整个管道的核心。它的配置文件需要处理几件事:
- 从两个服务接收日志。
- 解析 JSON 日志。
- 过滤出包含
biz_event的日志。 - 将日志兵分两路:一路归档到长期存储(例如 S3),另一路发送给我们的可视化服务。
这里的挑战在于,可视化生成可能是一个慢操作,我们绝不能让它阻塞日志管道。
# fluent.conf
# ---------------------------------
# 1. 输入源 (Sources)
# ---------------------------------
# 接收来自 Rails 和 Express.js 服务的 forward 协议日志
# 在生产中,我们会为每个服务部署一个 fluentd-agent 来转发
<source>
@type forward
port 24224
bind 0.0.0.0
</source>
# 为了健壮性,同时监听 HTTP 端口
# 这允许服务在 forward 协议不可用时回退到 HTTP POST
<source>
@type http
port 9880
bind 0.0.0.0
body_key "message"
add_http_headers true
</source>
# ---------------------------------
# 2. 预处理与路由 (Preprocessing & Routing)
# ---------------------------------
# 匹配所有来自 Rails 服务的日志 (假设 agent 发送时打了 tag)
<match app.rails.**>
@type rewrite_tag_filter
<rule>
key message
pattern /^\{/ # 检查是否是 JSON 字符串
tag parsed.rails.${tag}
</rule>
# 如果不是 JSON,则标记为未解析
<rule>
key message
pattern /.+/
tag unparsed.rails.${tag}
</rule>
</match>
# 匹配所有来自 Express 服务的日志
<match app.express.**>
@type rewrite_tag_filter
<rule>
key message
pattern /^\{/
tag parsed.express.${tag}
</rule>
<rule>
key message
pattern /.+/
tag unparsed.express.${tag}
</rule>
</match>
# ---------------------------------
# 3. 解析与过滤 (Parsing & Filtering)
# ---------------------------------
# 处理所有已标记为 "parsed" 的日志
<match parsed.**>
@type parser
key_name message
reserve_data true # 保留原始的 message 字段
<parse>
@type json
</parse>
tag processed.${tag}
</match>
# 过滤出包含 biz_event 的日志,并将其重新标记
<match processed.**>
@type grep
<regexp>
key biz_event
pattern /.+/ # 只要存在 biz_event 字段即可
</regexp>
tag biz_event.${tag}
</match>
# ---------------------------------
# 4. 输出 (Outputs)
# ---------------------------------
# 所有处理过的日志 (processed.**),都进行归档
<match processed.**>
@type copy
# 存储到 S3,用于长期归档和问题排查
<store>
@type s3
aws_key_id YOUR_AWS_KEY
aws_sec_key YOUR_AWS_SECRET
s3_bucket your-log-archive-bucket
s3_region ap-northeast-1
path logs/${tag}/%Y/%m/%d/
# 缓冲区设置是生产环境的关键,确保在网络抖动时不会丢失日志
<buffer tag,time>
@type file
path /var/log/fluentd/buffer/s3
timekey 3600 # 1 hour chunks
timekey_wait 10m
chunk_limit_size 256m
flush_interval 5m
retry_max_interval 30
retry_forever true # 关键!无限重试
</buffer>
</store>
</match>
# 这是核心:只处理 biz_event 日志
<match biz_event.**>
@type copy
# 我们需要一个更可靠的机制来触发任务,直接执行命令有风险
# 初步方案失败后,我们选择了 RabbitMQ
<store>
@type rabbitmq
host "rabbitmq.internal"
port 5672
user "guest"
password "guest"
vhost "/"
exchange "biz_events_exchange"
exchange_type "topic"
# 使用 biz_event.name 作为路由键
routing_key_key $.biz_event.name
<format>
@type json
</format>
# 同样,健壮的缓冲配置
<buffer tag>
@type file
path /var/log/fluentd/buffer/rabbitmq
flush_interval 1s # 尽快发送
chunk_limit_size 1m
retry_max_interval 30
retry_forever true
</buffer>
</store>
</match>
一个常见的错误是直接使用 out_exec 插件去调用脚本。我们最初尝试过,但很快发现这是个陷阱。如果可视化服务响应慢或失败,out_exec 会同步阻塞 Fluentd 的事件循环,最终导致整个日志管道堵塞,甚至数据丢失。正确的做法是解耦:Fluentd 的职责是把事件可靠地投递到一个消息队列(我们选择了 RabbitMQ),然后让下游服务去消费,这样即使下游服务宕机,事件也会保留在队列中,不会影响日志收集。
第三步:构建 Matplotlib 可视化服务
这是一个独立的 Python 服务,我们用 Flask 快速搭建。它的唯一职责是:监听 RabbitMQ 的 biz_events_exchange,消费事件,聚合数据,并使用 Matplotlib 生成图表。
数据聚合是一个难点。单个日志事件不足以绘图。我们需要将一段时间内的事件聚合起来。这里我们选择 Redis 作为高速缓存和聚合器。
# visualizer_service.py
import os
import pika
import json
import redis
import logging
from datetime import datetime, timedelta
import matplotlib
matplotlib.use('Agg') # 关键!设置 Matplotlib 后端为非交互式
import matplotlib.pyplot as plt
import pandas as pd
import threading
import time
# --- 配置 ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
RABBITMQ_HOST = os.getenv('RABBITMQ_HOST', 'localhost')
REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
REDIS_PORT = int(os.getenv('REDIS_PORT', 6379))
OUTPUT_DIR = '/var/www/html/reports' # 假设 Nginx 会服务这个目录
if not os.path.exists(OUTPUT_DIR):
os.makedirs(OUTPUT_DIR)
# 连接 Redis
redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)
# --- 核心绘图逻辑 ---
def generate_order_trend_report():
"""
一个复杂的绘图函数示例:绘制过去60分钟的订单量和总金额趋势
"""
try:
logging.info("Generating order trend report...")
now = datetime.utcnow()
timestamps = []
amounts = []
# 从 Redis 的 Sorted Set 中获取过去一小时的数据
# 我们用 score 来存储 timestamp,用 member 来存储 amount:order_id
# 这种设计是为了防止重复处理同一个订单
one_hour_ago = int((now - timedelta(hours=1)).timestamp())
raw_data = redis_client.zrangebyscore('events:new_order', one_hour_ago, now.timestamp(), withscores=True)
if not raw_data:
logging.warning("No new_order data in the last hour to generate report.")
return
for member, score in raw_data:
amount, _ = member.split(':')
amounts.append(float(amount))
timestamps.append(datetime.fromtimestamp(score))
df = pd.DataFrame({'amount': amounts}, index=pd.to_datetime(timestamps))
# 按分钟重采样
orders_per_minute = df.index.to_series().resample('1min').count()
amount_per_minute = df['amount'].resample('1min').sum()
# --- Matplotlib 绘图 ---
fig, ax1 = plt.subplots(figsize=(12, 7))
# 风格化
plt.style.use('seaborn-v0_8-whitegrid')
# 坐标轴1:订单量
color = 'tab:blue'
ax1.set_xlabel('Time (UTC)')
ax1.set_ylabel('Orders per Minute', color=color)
ax1.plot(orders_per_minute.index, orders_per_minute.values, color=color, marker='o', linestyle='-', markersize=4, label='Orders/min')
ax1.tick_params(axis='y', labelcolor=color)
# 创建第二个 Y 轴共享 X 轴
ax2 = ax1.twinx()
color = 'tab:red'
ax2.set_ylabel('Total Amount per Minute ($)', color=color)
ax2.plot(amount_per_minute.index, amount_per_minute.values, color=color, marker='x', linestyle='--', markersize=4, label='Amount/min')
ax2.tick_params(axis='y', labelcolor=color)
# 标题和图例
total_orders = len(df)
total_amount = df['amount'].sum()
plt.title(f'Order Trends (Last 60 Minutes)\nTotal Orders: {total_orders}, Total Amount: ${total_amount:,.2f}')
fig.tight_layout()
plt.grid(True)
# 保存文件
filename = f"order_trend_{now.strftime('%Y%m%d%H%M%S')}.png"
filepath = os.path.join(OUTPUT_DIR, filename)
latest_link = os.path.join(OUTPUT_DIR, 'order_trend_latest.png')
plt.savefig(filepath, dpi=150, bbox_inches='tight')
plt.close(fig) # 必须关闭,否则会内存泄漏
# 创建一个符号链接指向最新的报告,方便业务方访问固定 URL
if os.path.exists(latest_link):
os.remove(latest_link)
os.symlink(filepath, latest_link)
logging.info(f"Report generated: {filepath}")
except Exception as e:
logging.error(f"Failed to generate order trend report: {e}", exc_info=True)
# --- RabbitMQ 消费者 ---
def event_consumer():
connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST))
channel = connection.channel()
channel.exchange_declare(exchange='biz_events_exchange', exchange_type='topic')
# 声明一个独占队列
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# 绑定我们关心的事件
channel.queue_bind(exchange='biz_events_exchange', queue=queue_name, routing_key='new_order')
channel.queue_bind(exchange='biz_events_exchange', queue=queue_name, routing_key='new_registration')
def callback(ch, method, properties, body):
try:
message = json.loads(body)
event_name = message.get('biz_event', {}).get('name')
event_data = message.get('biz_event', {}).get('data')
if not event_name or not event_data:
logging.warning(f"Malformed biz_event received: {body}")
ch.basic_ack(delivery_tag=method.delivery_tag)
return
# 数据处理与存入 Redis
if event_name == 'new_order':
# 使用 Sorted Set 存储,score 是时间戳,member 是唯一值,防止重复
timestamp = message.get('time', int(datetime.utcnow().timestamp()))
order_id = event_data.get('order_id', time.time()) # fallback
member = f"{event_data['amount']}:{order_id}"
redis_client.zadd('events:new_order', {member: timestamp})
# 设置过期时间,自动清理老数据
redis_client.expire('events:new_order', timedelta(days=1))
# 其他事件处理...
logging.info(f"Processed event '{event_name}'")
except json.JSONDecodeError:
logging.error(f"Failed to decode JSON message: {body}")
except Exception as e:
logging.error(f"Error processing message: {e}", exc_info=True)
finally:
# 确认消息已被处理
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue=queue_name, on_message_callback=callback)
logging.info('Waiting for business events. To exit press CTRL+C')
channel.start_consuming()
# --- 定时报告生成器 ---
def report_scheduler():
while True:
generate_order_trend_report()
# ... generate other reports ...
time.sleep(300) # 每5分钟生成一次报告
if __name__ == '__main__':
# 在独立线程中运行消费者
consumer_thread = threading.Thread(target=event_consumer, daemon=True)
consumer_thread.start()
# 在主线程中运行调度器
report_scheduler()
这里的架构决策是:消费者(event_consumer)只负责接收事件并快速写入 Redis,它不进行任何慢操作。而实际的绘图(generate_order_trend_report)由一个独立的调度器(report_scheduler)周期性触发。这进一步实现了解耦和削峰填谷。即使瞬间涌入大量订单事件,消费者也只是快速写入 Redis,绘图任务的频率是固定的,系统负载更平稳。
整个流程的 Mermaid 图示如下:
graph TD
subgraph Service A [Ruby on Rails]
A1(OrdersController) -- emits log --> A2(Lograge JSON Log)
end
subgraph Service B [Express.js]
B1(Activity Route) -- emits log --> B2(Winston JSON Log)
end
A2 -- forward protocol --> C1(Fluentd Agent)
B2 -- forward protocol --> C1
subgraph Central Logging [Fluentd Aggregator]
C1 -- TCP --> C2(Fluentd Main)
C2 -- Parse & Filter --> C3{Has biz_event?}
end
C3 -- Yes --> D1(RabbitMQ Exchange)
C3 -- No --> E1(S3 Archive)
subgraph Visualization Service [Python]
D1 -- Consumes --> F1(Event Consumer)
F1 -- Writes data --> F2(Redis)
F3(Report Scheduler) -- triggers every 5 min --> F4(Plotting Logic)
F4 -- Reads data --> F2
F4 -- Uses Matplotlib --> F5(Generate PNG)
F5 -- Saves to --> F6(Shared Volume/Nginx)
end
G1(Business User) -- accesses --> F6
局限性与未来迭代方向
这套系统上线后稳定运行了一段时间,成功地将业务可视化需求与核心业务系统解耦。但它并非完美,还存在一些局限和可以改进的地方:
数据聚合的粒度问题:目前所有聚合都在 Python 服务的内存中(通过 Pandas)或在 Redis 中完成。当数据量进一步增长,或者需要更复杂的跨时间窗口聚合时,Redis 可能会成为瓶颈。一个更长远的方案是将 Fluentd 的
biz_event输出对接到一个真正的时序数据库(如 InfluxDB 或 ClickHouse),这样可视化服务就可以执行更强大的查询,而不是自己维护状态。报告触发机制:目前是固定周期触发。对于某些事件,业务方可能希望“事件驱动”地触发报告。例如,当一个小时内的订单量突增超过某个阈值时,立即生成一份异常报告。这可以通过在 Python 消费者中增加一个逻辑判断来实现,当检测到异常时,直接调用绘图函数或将其推入一个高优先级队列。
服务自身的健壮性:Python 可视化服务目前是单点。虽然消费者和调度器是多线程的,但进程本身挂了就会中断服务。需要将其容器化,并使用 Kubernetes 之类的工具进行部署,保证其高可用。此外,需要为它添加更完善的监控,比如报告生成耗时、成功率等指标。
配置的复杂性:整个管道涉及多个组件的配置,Fluentd 的配置尤为关键。任何一个环节的格式变更都可能导致数据丢失。未来可以引入配置管理工具,并为日志 schema 建立版本控制和自动化测试,确保上游服务的日志变更不会意外破坏下游的处理逻辑。