基于日志的系统性能分析

这里分享下最近用 Elasticsearch + ClickHouse 做日志分析的一些折腾。

缘起

最近看了不少 Elasticsearch 和 ClickHouse 相关的文章和帖子,手痒想自己动手试试。正好手上有一些云下业务系统有着十分可观的日志量,就边学边做把想法落地了。

日志是系统运行的第一手记录,里面藏着很多有价值的信息——时间戳、耗时、交易流水号、用户 ID 等等。如果能把这些非结构化的文本变成可查询、可统计、可告警的数据,那日志就不只是排查问题时才翻出来看的"黑匣子"了。

另外,即使在有 APM 的系统里,日志监控也有它独特的价值。APM 更多是技术视角(调用链、方法耗时),而日志往往承载着业务语义(交易码、各环节的状态等)。两者是互补的。

这篇文章就记录下我这次折腾的过程和一些心得。

下面是整体的技术架构:

┌─────────────────────────────────────────────┐
│           Kafka 集群 (多 Topic)              │
└─────────────────────────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────┐
│          高性能日志处理引擎                    │
│  ├─ 字段提取(Processor)                     │
│  ├─ 批量写入 Elasticsearch                   │
│  └─ 双写 ClickHouse(通过 Kafka 中间层)       │
└─────────────────────────────────────────────┘
                   │
          ┌───────┴───────┐
          ▼               ▼
┌──────────────────┐ ┌──────────────────┐
│  Elasticsearch   │ │   ClickHouse     │
│  日志检索/告警     │ │  性能分析/统计     │
└──────────────────┘ └──────────────────┘

日志采集

采集这块没什么特别的,用的还是 Filebeat + Kafka 这套经典组合:

  1. Filebeat 部署在各应用服务器,负责收集日志文件
  2. 日志发送到 Kafka 的不同 Topic,按应用区分
  3. 自己写的高性能消费端从 Kafka 拉取消息处理

之所以加一层 Kafka,主要是为了:

  • 削峰填谷:高峰期不会打垮后端存储
  • 解耦:采集和处理可以独立扩展
  • 可靠性:消息持久化,出问题还能重放

字段解析与提取

日志原本就是一堆非结构化文本,要变成可分析的数据,得用正则把关键字段扒出来。

这里定义了一个处理器接口:

type Processor interface {
    Process(msg *sarama.ConsumerMessage) *Result
    Name() string
}

type Result struct {
    Success    bool
    Document   *ProcessedDocument
    Error      error
    Retryable  bool   // 是否可重试
    SkipReason string // 跳过原因
}

日志头部解析

大多数 Java 应用的日志长这样:

2024-01-01 12:00:00,123 INFO [com.example.Service] 业务处理完成

用正则提取时间戳、日志级别和 Logger:

headerRegex := regexp.MustCompile(
    `^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d{3})\s+` +
    `(DEBUG|INFO|WARN|ERROR)\s+\[([^\]]+)\]\s+(.+)$`)

func (p *Processor) parseHeader(data map[string]interface{}, message string) {
    matches := p.headerRegex.FindStringSubmatch(message)
    if len(matches) == 5 {
        loc, _ := time.LoadLocation("Asia/Shanghai")
        ts, err := time.ParseInLocation("2006-01-02 15:04:05,000", matches[1], loc)
        if err == nil {
            data["@timestamp"] = ts.Format(time.RFC3339Nano)
        }
        data["log.level"] = strings.ToLower(matches[2])
        data["log.logger"] = matches[3]
        data["message"] = matches[4]
    }
}

业务字段提取

从日志内容里提取交易流水号、用户 ID、耗时这些关键信息:

traceIDRegex  := regexp.MustCompile(`(?:会话ID|交易流水)[::]?\s*([^,,\s]+)`)
userIDRegex   := regexp.MustCompile(`用户ID[::]?\s*([A-Za-z0-9._-]+)`)
fullMSRegex   := regexp.MustCompile(`整个交易消耗时间为ms[::]?(\d+)`)
serverMSRegex := regexp.MustCompile(`当前服务器执行耗时\(ms\)[::]?(\d+)`)

func (p *Processor) extractCostMetrics(data map[string]interface{}, message string) {
    var fullMS, serverMS int64
    
    if matches := p.fullMSRegex.FindStringSubmatch(message); len(matches) > 1 {
        fullMS, _ = strconv.ParseInt(matches[1], 10, 64)
    }
    if matches := p.serverMSRegex.FindStringSubmatch(message); len(matches) > 1 {
        serverMS, _ = strconv.ParseInt(matches[1], 10, 64)
    }
    
    if fullMS > 0 {
        data["cost_ms"] = fullMS
        data["metrics.cost.source"] = "full"
    } else if serverMS > 0 {
        data["cost_ms"] = serverMS
        data["metrics.cost.source"] = "server"
    }
}

添加元数据

每条日志还会带上 Kafka 来源信息,排查问题时能用得上:

func (p *Processor) addMetadata(data map[string]interface{}, msg *sarama.ConsumerMessage) {
    if _, ok := data["@timestamp"]; !ok {
        data["@timestamp"] = msg.Timestamp.Format(time.RFC3339Nano)
    }

    kafkaMeta := map[string]interface{}{
        "topic":     msg.Topic,
        "partition": msg.Partition,
        "offset":    msg.Offset,
    }
    data["kafka"] = kafkaMeta
}

双写存储架构

处理后的数据同时写 Elasticsearch 和 ClickHouse,各有各的用处:

存储干嘛用
Elasticsearch日志检索、全文搜索、实时告警
ClickHouse聚合统计、趋势分析、做报表

Elasticsearch 批量写入

用 Bulk API 批量写入,效率高不少:

type Writer struct {
    client    *elastic.Client
    processor *elastic.BulkProcessor
    logger    *zap.Logger
}

func (w *Writer) createBulkProcessor(cfg *ElasticsearchConfig) (*elastic.BulkProcessor, error) {
    return w.client.BulkProcessor().
        Name("log-bulk-processor").
        Workers(cfg.Workers).
        BulkActions(cfg.BulkActions).
        BulkSize(cfg.BulkSize << 20).
        FlushInterval(cfg.FlushInterval).
        Before(w.beforeBulk).
        After(w.afterBulk).
        Do(context.Background())
}

func (w *Writer) Add(doc Document) error {
    req := elastic.NewBulkIndexRequest().
        Index(doc.Index()).
        Doc(doc.Body())
    w.processor.Add(req)
    return nil
}

ClickHouse 通过 Kafka 中间层写入

ClickHouse 原生支持 Kafka 引擎,这点挺好用的,直接消费 Kafka 数据就行:

type Producer struct {
    producer sarama.SyncProducer
    topic    string
    enabled  bool
}

// 异步发送,不阻塞主流程
func (p *Producer) SendAsync(doc *ProcessedDocument) {
    if !p.enabled {
        return
    }
    go func() {
        data, _ := json.Marshal(doc.Body())
        msg := &sarama.ProducerMessage{
            Topic: p.topic,
            Value: sarama.ByteEncoder(data),
        }
        p.producer.SendMessage(msg)
    }()
}

ClickHouse 表结构

分布式表 + 物化视图,数据自动入库:

-- 本地存储表
CREATE TABLE logs_local ON CLUSTER 'cluster_name' (
    timestamp DateTime64(3) DEFAULT now64(3),
    index_day Date MATERIALIZED toDate(timestamp),
    
    message String,
    log_level LowCardinality(String) DEFAULT '',
    logger String DEFAULT '',
    
    service_name LowCardinality(String) DEFAULT '',
    trace_id String DEFAULT '',
    transaction_id String DEFAULT '',
    user_id String DEFAULT '',
    
    cost_ms Int64 DEFAULT 0,
    cost_source LowCardinality(String) DEFAULT '',
    
    kafka_topic LowCardinality(String) DEFAULT '',
    kafka_partition UInt32 DEFAULT 0,
    kafka_offset UInt64 DEFAULT 0,
    
    raw_json String
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (service_name, timestamp, trace_id)
TTL toDateTime(timestamp) + INTERVAL 30 DAY;

-- Distributed 表
CREATE TABLE logs ON CLUSTER 'cluster_name'
AS logs_local
ENGINE = Distributed('cluster_name', 'db', 'logs_local', rand());

-- Kafka 消费表
CREATE TABLE kafka_logs (
    raw String
) ENGINE = Kafka
SETTINGS 
    kafka_broker_list = 'kafka:9092',
    kafka_topic_list = 'processed-logs',
    kafka_group_name = 'clickhouse-consumer',
    kafka_format = 'RawBLOB',
    kafka_num_consumers = 5;

-- 物化视图自动解析
CREATE MATERIALIZED VIEW logs_mv TO logs AS
SELECT
    parseDateTime64BestEffortOrZero(
        JSONExtractString(raw, '@timestamp'), 3, 'Asia/Shanghai'
    ) AS timestamp,
    JSONExtractString(raw, 'message') AS message,
    lower(JSONExtractString(raw, 'log', 'level')) AS log_level,
    JSONExtractString(raw, 'service', 'name') AS service_name,
    JSONExtractString(raw, 'trace.id') AS trace_id,
    JSONExtractInt(raw, 'cost_ms') AS cost_ms,
    JSONExtractString(raw, 'metrics', 'cost', 'source') AS cost_source,
    raw AS raw_json
FROM kafka_logs;

性能监控与预警

有了结构化的日志数据,就可以搞性能监控和预警了。

Prometheus 指标

消费端暴露了一些关键指标给 Prometheus:

var (
    KafkaMessagesTotal = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "log_kafka_messages_total",
            Help: "Total number of messages consumed from Kafka",
        },
        []string{"topic", "consumer"},
    )

    KafkaLag = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "log_kafka_lag",
            Help: "Kafka consumer lag",
        },
        []string{"topic", "partition", "consumer"},
    )

    ProcessingDuration = promauto.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "log_processing_duration_seconds",
            Help:    "Duration of message processing",
            Buckets: []float64{.001, .005, .01, .025, .05, .1, .25, .5, 1},
        },
        []string{"processor"},
    )
)

基于 ES 的告警

利用 ES 的聚合能力检测异常,比如错误率激增:

{
  "query": {
    "bool": {
      "filter": [
        { "range": { "@timestamp": { "gte": "now-5m" } } },
        { "term": { "log.level": "error" } }
      ]
    }
  },
  "aggs": {
    "error_count_per_minute": {
      "date_histogram": {
        "field": "@timestamp",
        "calendar_interval": "1m"
      }
    }
  }
}

或者检测响应时间过长:

{
  "query": {
    "bool": {
      "filter": [
        { "range": { "@timestamp": { "gte": "now-5m" } } },
        { "range": { "cost_ms": { "gte": 5000 } } }
      ]
    }
  },
  "aggs": {
    "slow_requests_by_transaction": {
      "terms": { "field": "transaction.code", "size": 20 },
      "aggs": {
        "avg_cost": { "avg": { "field": "cost_ms" } }
      }
    }
  }
}

基于 ClickHouse 的性能分析

ClickHouse 的聚合性能确实强,跑这种统计查询很爽:

接口性能 TOP N

SELECT 
    service_name,
    transaction_id AS endpoint,
    count() AS request_count,
    avg(cost_ms) AS avg_cost_ms,
    quantile(0.99)(cost_ms) AS p99_cost_ms,
    max(cost_ms) AS max_cost_ms
FROM logs
WHERE timestamp >= now() - INTERVAL 1 HOUR
  AND cost_ms > 0
GROUP BY service_name, transaction_id
ORDER BY avg_cost_ms DESC
LIMIT 20;

时间趋势统计

SELECT 
    toStartOfMinute(timestamp) AS time_bucket,
    service_name,
    count() AS total_requests,
    countIf(log_level = 'error') AS error_count,
    avg(cost_ms) AS avg_cost_ms,
    quantile(0.95)(cost_ms) AS p95_cost_ms
FROM logs
WHERE timestamp >= now() - INTERVAL 30 MINUTE
GROUP BY time_bucket, service_name
ORDER BY time_bucket, service_name;

慢请求分布

SELECT 
    multiIf(
        cost_ms < 100, '<100ms',
        cost_ms < 500, '100-500ms',
        cost_ms < 1000, '500ms-1s',
        cost_ms < 5000, '1-5s',
        '>5s'
    ) AS cost_bucket,
    count() AS request_count,
    round(count() * 100.0 / sum(count()) OVER (), 2) AS percentage
FROM logs
WHERE timestamp >= now() - INTERVAL 1 HOUR
  AND cost_ms > 0
GROUP BY cost_bucket
ORDER BY 
    CASE cost_bucket
        WHEN '<100ms' THEN 1
        WHEN '100-500ms' THEN 2
        WHEN '500ms-1s' THEN 3
        WHEN '1-5s' THEN 4
        ELSE 5
    END;

实际效果

跑了一段时间,效果还不错:

能力说明
实时监控Kafka 延迟在秒级,基本能实时感知系统状态
性能分析基于日志里的耗时字段,能分析出接口性能分布
错误追踪通过 trace_id 串联一个请求的完整日志链路
趋势分析ClickHouse 跑大时间跨度的统计也很快
告警预警基于 ES 查询触发告警

日志消费性能

目前部署了 4 个消费节点,跑了一段时间后的实际数据:

指标数值
总吞吐量(峰值)~30K docs/s
ES 写入成功率接近 100%
消息处理延迟 P9520-40ms
搜索引擎入库吞吐10K-20K docs/s
队列积压正常情况下接近 0

从监控看,凌晨时段业务量低,吞吐量降到几千;白天高峰期能到 30K/s,系统也没什么压力。

ClickHouse 性能分析数据

通过 ClickHouse 对日志做统计分析,可以拿到这样的性能概览(某次统计窗口内的数据):

指标数值
总请求量14.6 Million
平均响应时间90.9 ms
P95 响应时间299 ms
P99 响应时间1.12 s
慢接口数1.75 K
错误率0.03%

还能按接口维度拆分,比如某个接口的请求量、平均耗时、P95/P99 延迟一目了然。日志级别的分布趋势(info/warn/error)也能直观地看到波动。

这些数据放以前只能靠人肉翻日志,现在几秒钟就出结果。

一点想法

整个过程其实就是把非结构化的日志变成结构化的数据,然后利用 ES 和 ClickHouse 各自的优势来做不同的事情。核心就几点:

  1. 日志格式要规范:越规范越好解析
  2. 双写架构:ES 负责检索告警,ClickHouse 负责统计分析,各司其职
  3. Kafka 缓冲:解耦采集与处理,也增加了弹性
  4. 指标体系:用 Prometheus 监控整个链路的健康状态

当然这套方案也有局限性,比如日志格式变了就得跟着改解析规则。不过整体来说,边学边做还是挺有收获的。

如果你也在做类似的事情,或者有更好的思路,欢迎交流探讨。

添加新评论