这里分享下最近用 Elasticsearch + ClickHouse 做日志分析的一些折腾。
缘起
最近看了不少 Elasticsearch 和 ClickHouse 相关的文章和帖子,手痒想自己动手试试。正好手上有一些云下业务系统有着十分可观的日志量,就边学边做把想法落地了。
日志是系统运行的第一手记录,里面藏着很多有价值的信息——时间戳、耗时、交易流水号、用户 ID 等等。如果能把这些非结构化的文本变成可查询、可统计、可告警的数据,那日志就不只是排查问题时才翻出来看的"黑匣子"了。
另外,即使在有 APM 的系统里,日志监控也有它独特的价值。APM 更多是技术视角(调用链、方法耗时),而日志往往承载着业务语义(交易码、各环节的状态等)。两者是互补的。
这篇文章就记录下我这次折腾的过程和一些心得。
下面是整体的技术架构:
┌─────────────────────────────────────────────┐
│ Kafka 集群 (多 Topic) │
└─────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
│ 高性能日志处理引擎 │
│ ├─ 字段提取(Processor) │
│ ├─ 批量写入 Elasticsearch │
│ └─ 双写 ClickHouse(通过 Kafka 中间层) │
└─────────────────────────────────────────────┘
│
┌───────┴───────┐
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ Elasticsearch │ │ ClickHouse │
│ 日志检索/告警 │ │ 性能分析/统计 │
└──────────────────┘ └──────────────────┘日志采集
采集这块没什么特别的,用的还是 Filebeat + Kafka 这套经典组合:
- Filebeat 部署在各应用服务器,负责收集日志文件
- 日志发送到 Kafka 的不同 Topic,按应用区分
- 自己写的高性能消费端从 Kafka 拉取消息处理
之所以加一层 Kafka,主要是为了:
- 削峰填谷:高峰期不会打垮后端存储
- 解耦:采集和处理可以独立扩展
- 可靠性:消息持久化,出问题还能重放
字段解析与提取
日志原本就是一堆非结构化文本,要变成可分析的数据,得用正则把关键字段扒出来。
这里定义了一个处理器接口:
type Processor interface {
Process(msg *sarama.ConsumerMessage) *Result
Name() string
}
type Result struct {
Success bool
Document *ProcessedDocument
Error error
Retryable bool // 是否可重试
SkipReason string // 跳过原因
}日志头部解析
大多数 Java 应用的日志长这样:
2024-01-01 12:00:00,123 INFO [com.example.Service] 业务处理完成用正则提取时间戳、日志级别和 Logger:
headerRegex := regexp.MustCompile(
`^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d{3})\s+` +
`(DEBUG|INFO|WARN|ERROR)\s+\[([^\]]+)\]\s+(.+)$`)
func (p *Processor) parseHeader(data map[string]interface{}, message string) {
matches := p.headerRegex.FindStringSubmatch(message)
if len(matches) == 5 {
loc, _ := time.LoadLocation("Asia/Shanghai")
ts, err := time.ParseInLocation("2006-01-02 15:04:05,000", matches[1], loc)
if err == nil {
data["@timestamp"] = ts.Format(time.RFC3339Nano)
}
data["log.level"] = strings.ToLower(matches[2])
data["log.logger"] = matches[3]
data["message"] = matches[4]
}
}业务字段提取
从日志内容里提取交易流水号、用户 ID、耗时这些关键信息:
traceIDRegex := regexp.MustCompile(`(?:会话ID|交易流水)[::]?\s*([^,,\s]+)`)
userIDRegex := regexp.MustCompile(`用户ID[::]?\s*([A-Za-z0-9._-]+)`)
fullMSRegex := regexp.MustCompile(`整个交易消耗时间为ms[::]?(\d+)`)
serverMSRegex := regexp.MustCompile(`当前服务器执行耗时\(ms\)[::]?(\d+)`)
func (p *Processor) extractCostMetrics(data map[string]interface{}, message string) {
var fullMS, serverMS int64
if matches := p.fullMSRegex.FindStringSubmatch(message); len(matches) > 1 {
fullMS, _ = strconv.ParseInt(matches[1], 10, 64)
}
if matches := p.serverMSRegex.FindStringSubmatch(message); len(matches) > 1 {
serverMS, _ = strconv.ParseInt(matches[1], 10, 64)
}
if fullMS > 0 {
data["cost_ms"] = fullMS
data["metrics.cost.source"] = "full"
} else if serverMS > 0 {
data["cost_ms"] = serverMS
data["metrics.cost.source"] = "server"
}
}添加元数据
每条日志还会带上 Kafka 来源信息,排查问题时能用得上:
func (p *Processor) addMetadata(data map[string]interface{}, msg *sarama.ConsumerMessage) {
if _, ok := data["@timestamp"]; !ok {
data["@timestamp"] = msg.Timestamp.Format(time.RFC3339Nano)
}
kafkaMeta := map[string]interface{}{
"topic": msg.Topic,
"partition": msg.Partition,
"offset": msg.Offset,
}
data["kafka"] = kafkaMeta
}双写存储架构
处理后的数据同时写 Elasticsearch 和 ClickHouse,各有各的用处:
| 存储 | 干嘛用 |
|---|---|
| Elasticsearch | 日志检索、全文搜索、实时告警 |
| ClickHouse | 聚合统计、趋势分析、做报表 |
Elasticsearch 批量写入
用 Bulk API 批量写入,效率高不少:
type Writer struct {
client *elastic.Client
processor *elastic.BulkProcessor
logger *zap.Logger
}
func (w *Writer) createBulkProcessor(cfg *ElasticsearchConfig) (*elastic.BulkProcessor, error) {
return w.client.BulkProcessor().
Name("log-bulk-processor").
Workers(cfg.Workers).
BulkActions(cfg.BulkActions).
BulkSize(cfg.BulkSize << 20).
FlushInterval(cfg.FlushInterval).
Before(w.beforeBulk).
After(w.afterBulk).
Do(context.Background())
}
func (w *Writer) Add(doc Document) error {
req := elastic.NewBulkIndexRequest().
Index(doc.Index()).
Doc(doc.Body())
w.processor.Add(req)
return nil
}ClickHouse 通过 Kafka 中间层写入
ClickHouse 原生支持 Kafka 引擎,这点挺好用的,直接消费 Kafka 数据就行:
type Producer struct {
producer sarama.SyncProducer
topic string
enabled bool
}
// 异步发送,不阻塞主流程
func (p *Producer) SendAsync(doc *ProcessedDocument) {
if !p.enabled {
return
}
go func() {
data, _ := json.Marshal(doc.Body())
msg := &sarama.ProducerMessage{
Topic: p.topic,
Value: sarama.ByteEncoder(data),
}
p.producer.SendMessage(msg)
}()
}ClickHouse 表结构
分布式表 + 物化视图,数据自动入库:
-- 本地存储表
CREATE TABLE logs_local ON CLUSTER 'cluster_name' (
timestamp DateTime64(3) DEFAULT now64(3),
index_day Date MATERIALIZED toDate(timestamp),
message String,
log_level LowCardinality(String) DEFAULT '',
logger String DEFAULT '',
service_name LowCardinality(String) DEFAULT '',
trace_id String DEFAULT '',
transaction_id String DEFAULT '',
user_id String DEFAULT '',
cost_ms Int64 DEFAULT 0,
cost_source LowCardinality(String) DEFAULT '',
kafka_topic LowCardinality(String) DEFAULT '',
kafka_partition UInt32 DEFAULT 0,
kafka_offset UInt64 DEFAULT 0,
raw_json String
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (service_name, timestamp, trace_id)
TTL toDateTime(timestamp) + INTERVAL 30 DAY;
-- Distributed 表
CREATE TABLE logs ON CLUSTER 'cluster_name'
AS logs_local
ENGINE = Distributed('cluster_name', 'db', 'logs_local', rand());
-- Kafka 消费表
CREATE TABLE kafka_logs (
raw String
) ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka:9092',
kafka_topic_list = 'processed-logs',
kafka_group_name = 'clickhouse-consumer',
kafka_format = 'RawBLOB',
kafka_num_consumers = 5;
-- 物化视图自动解析
CREATE MATERIALIZED VIEW logs_mv TO logs AS
SELECT
parseDateTime64BestEffortOrZero(
JSONExtractString(raw, '@timestamp'), 3, 'Asia/Shanghai'
) AS timestamp,
JSONExtractString(raw, 'message') AS message,
lower(JSONExtractString(raw, 'log', 'level')) AS log_level,
JSONExtractString(raw, 'service', 'name') AS service_name,
JSONExtractString(raw, 'trace.id') AS trace_id,
JSONExtractInt(raw, 'cost_ms') AS cost_ms,
JSONExtractString(raw, 'metrics', 'cost', 'source') AS cost_source,
raw AS raw_json
FROM kafka_logs;性能监控与预警
有了结构化的日志数据,就可以搞性能监控和预警了。
Prometheus 指标
消费端暴露了一些关键指标给 Prometheus:
var (
KafkaMessagesTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "log_kafka_messages_total",
Help: "Total number of messages consumed from Kafka",
},
[]string{"topic", "consumer"},
)
KafkaLag = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "log_kafka_lag",
Help: "Kafka consumer lag",
},
[]string{"topic", "partition", "consumer"},
)
ProcessingDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "log_processing_duration_seconds",
Help: "Duration of message processing",
Buckets: []float64{.001, .005, .01, .025, .05, .1, .25, .5, 1},
},
[]string{"processor"},
)
)基于 ES 的告警
利用 ES 的聚合能力检测异常,比如错误率激增:
{
"query": {
"bool": {
"filter": [
{ "range": { "@timestamp": { "gte": "now-5m" } } },
{ "term": { "log.level": "error" } }
]
}
},
"aggs": {
"error_count_per_minute": {
"date_histogram": {
"field": "@timestamp",
"calendar_interval": "1m"
}
}
}
}或者检测响应时间过长:
{
"query": {
"bool": {
"filter": [
{ "range": { "@timestamp": { "gte": "now-5m" } } },
{ "range": { "cost_ms": { "gte": 5000 } } }
]
}
},
"aggs": {
"slow_requests_by_transaction": {
"terms": { "field": "transaction.code", "size": 20 },
"aggs": {
"avg_cost": { "avg": { "field": "cost_ms" } }
}
}
}
}基于 ClickHouse 的性能分析
ClickHouse 的聚合性能确实强,跑这种统计查询很爽:
接口性能 TOP N
SELECT
service_name,
transaction_id AS endpoint,
count() AS request_count,
avg(cost_ms) AS avg_cost_ms,
quantile(0.99)(cost_ms) AS p99_cost_ms,
max(cost_ms) AS max_cost_ms
FROM logs
WHERE timestamp >= now() - INTERVAL 1 HOUR
AND cost_ms > 0
GROUP BY service_name, transaction_id
ORDER BY avg_cost_ms DESC
LIMIT 20;时间趋势统计
SELECT
toStartOfMinute(timestamp) AS time_bucket,
service_name,
count() AS total_requests,
countIf(log_level = 'error') AS error_count,
avg(cost_ms) AS avg_cost_ms,
quantile(0.95)(cost_ms) AS p95_cost_ms
FROM logs
WHERE timestamp >= now() - INTERVAL 30 MINUTE
GROUP BY time_bucket, service_name
ORDER BY time_bucket, service_name;慢请求分布
SELECT
multiIf(
cost_ms < 100, '<100ms',
cost_ms < 500, '100-500ms',
cost_ms < 1000, '500ms-1s',
cost_ms < 5000, '1-5s',
'>5s'
) AS cost_bucket,
count() AS request_count,
round(count() * 100.0 / sum(count()) OVER (), 2) AS percentage
FROM logs
WHERE timestamp >= now() - INTERVAL 1 HOUR
AND cost_ms > 0
GROUP BY cost_bucket
ORDER BY
CASE cost_bucket
WHEN '<100ms' THEN 1
WHEN '100-500ms' THEN 2
WHEN '500ms-1s' THEN 3
WHEN '1-5s' THEN 4
ELSE 5
END;实际效果
跑了一段时间,效果还不错:
| 能力 | 说明 |
|---|---|
| 实时监控 | Kafka 延迟在秒级,基本能实时感知系统状态 |
| 性能分析 | 基于日志里的耗时字段,能分析出接口性能分布 |
| 错误追踪 | 通过 trace_id 串联一个请求的完整日志链路 |
| 趋势分析 | ClickHouse 跑大时间跨度的统计也很快 |
| 告警预警 | 基于 ES 查询触发告警 |
日志消费性能
目前部署了 4 个消费节点,跑了一段时间后的实际数据:
| 指标 | 数值 |
|---|---|
| 总吞吐量(峰值) | ~30K docs/s |
| ES 写入成功率 | 接近 100% |
| 消息处理延迟 P95 | 20-40ms |
| 搜索引擎入库吞吐 | 10K-20K docs/s |
| 队列积压 | 正常情况下接近 0 |
从监控看,凌晨时段业务量低,吞吐量降到几千;白天高峰期能到 30K/s,系统也没什么压力。
ClickHouse 性能分析数据
通过 ClickHouse 对日志做统计分析,可以拿到这样的性能概览(某次统计窗口内的数据):
| 指标 | 数值 |
|---|---|
| 总请求量 | 14.6 Million |
| 平均响应时间 | 90.9 ms |
| P95 响应时间 | 299 ms |
| P99 响应时间 | 1.12 s |
| 慢接口数 | 1.75 K |
| 错误率 | 0.03% |
还能按接口维度拆分,比如某个接口的请求量、平均耗时、P95/P99 延迟一目了然。日志级别的分布趋势(info/warn/error)也能直观地看到波动。
这些数据放以前只能靠人肉翻日志,现在几秒钟就出结果。
一点想法
整个过程其实就是把非结构化的日志变成结构化的数据,然后利用 ES 和 ClickHouse 各自的优势来做不同的事情。核心就几点:
- 日志格式要规范:越规范越好解析
- 双写架构:ES 负责检索告警,ClickHouse 负责统计分析,各司其职
- Kafka 缓冲:解耦采集与处理,也增加了弹性
- 指标体系:用 Prometheus 监控整个链路的健康状态
当然这套方案也有局限性,比如日志格式变了就得跟着改解析规则。不过整体来说,边学边做还是挺有收获的。
如果你也在做类似的事情,或者有更好的思路,欢迎交流探讨。