Apache Flume 诞生于 2011 年的 Cloudera,是 Hadoop 生态里最早解决"怎么把日志收进 HDFS"问题的工具。它的设计目标很明确:大规模分布式日志采集,可靠、灵活、可扩展。
虽然在云原生时代已被更轻量的方案部分取代,但 Flume 的架构设计——特别是它的事务机制和 Agent 拓扑——至今仍是理解日志采集系统设计的最佳教材。本文从架构原理出发,完整剖析 Flume 的每一层。
一、整体架构:Agent 是一切的基础
Flume 的核心是 Agent——一个 JVM 进程,包含三个组件:
graph LR
ExtSrc["外部数据源 (文件/网络/消息队列)"]
ExtSrc -->|"读取/接收"| Source["Source 数据采集 封装成 Event"]
Source -->|"put(事务保护)"| Channel["Channel 数据缓冲 解耦 Source 与 Sink"]
Channel -->|"take(事务保护)"| Sink["Sink 数据写出 (HDFS/Kafka/HBase)"]
- Source:从外部数据源(文件、网络、消息队列)拉取或接收数据,封装成 Event
- Channel:Event 的临时存储,起缓冲作用,是 Source 和 Sink 之间的解耦层
- Sink:从 Channel 取出 Event,写入目的地(HDFS、Kafka、HBase 等)
三个组件之间的数据流动由事务(Transaction)保护,这是 Flume 可靠性的核心。
Event:Flume 的基本数据单元
// Event 结构极简:headers + body
public interface Event {
Map<String, String> getHeaders(); // 元数据键值对(可用于路由)
byte[] getBody(); // 实际数据内容(一般是一行日志)
}
// 示例:一条 Nginx 访问日志对应的 Event
headers: {
"host": "web-server-01",
"timestamp": "1715385600000",
"source": "/var/log/nginx/access.log"
}
body: b'10.0.0.1 - - [11/May/2026:10:00:00] "GET /api/v1/users HTTP/1.1" 200 1234'
二、Source 深度解析
Source 负责从外部获取数据。Flume 内置了十几种 Source,覆盖主流数据源。
Taildir Source(最常用)
监控一组目录下的文件,支持断点续传,是生产环境日志采集的首选:
# flume-agent.conf
agent.sources = r1
agent.sources.r1.type = TAILDIR
# 指定要监控的文件组(支持通配符)
agent.sources.r1.filegroups = f1 f2
agent.sources.r1.filegroups.f1 = /var/log/nginx/access.log
agent.sources.r1.filegroups.f2 = /var/log/app/*.log
# 断点续传文件:记录每个文件已读取到的位置(inode + 偏移量)
agent.sources.r1.positionFile = /var/flume/taildir_position.json
# 批量读取条数(每次最多从文件读多少行)
agent.sources.r1.batchSize = 1000
Taildir Source 的断点续传原理:用 JSON 文件记录每个被监控文件的 inode 号和读取偏移量。Agent 重启后从 positionFile 恢复,不会重复读取已发送的数据(注意:不能保证完全 Exactly-Once,极端情况下可能重发最后一批)。
Kafka Source
从 Kafka Topic 消费数据,适合构建"Kafka → Flume → HDFS"的数仓导入链路:
agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.kafka.bootstrap.servers = broker1:9092,broker2:9092
agent.sources.r1.kafka.topics = app-logs,access-logs
agent.sources.r1.kafka.consumer.group.id = flume-hdfs-consumer
agent.sources.r1.batchSize = 1000
agent.sources.r1.batchDurationMillis = 2000 # 最多等 2 秒凑满一批
其他常用 Source
| Source 类型 | 说明 | 典型场景 |
|---|---|---|
| Avro Source | 接收其他 Agent 通过 Avro RPC 发送的数据 | 多级 Agent 汇聚 |
| Syslog Source | 接收 Syslog UDP/TCP 协议数据 | 系统日志采集 |
| Exec Source | 执行 shell 命令,将输出作为 Event | 自定义命令输出 |
| Spooling Dir Source | 监控目录,将新文件整体作为数据源 | 批量文件导入 |
| HTTP Source | 接收 HTTP POST 的 JSON 数据 | 应用主动上报 |
| Thrift Source | 接收 Thrift RPC 数据 | 跨语言数据采集 |
三、Channel 深度解析:可靠性的关键
Channel 是 Source 和 Sink 之间的缓冲区。它的设计决定了 Flume 的可靠性和性能。
事务机制:at-least-once 语义
Flume 通过两个独立的事务保护数据流转:
sequenceDiagram
participant Src as Source
participant Ch as Channel
participant Sk as Sink
participant Dest as 目的地(HDFS/Kafka)
Note over Src,Ch: Put 事务
Src->>Ch: 开启 Put 事务
Src->>Ch: put 一批 Event
alt 全部成功
Ch-->>Src: commit,Event 持久化到 Channel
else 失败
Ch-->>Src: rollback,Source 下次重试
end
Note over Ch,Dest: Take 事务
Sk->>Ch: 开启 Take 事务
Sk->>Ch: take 一批 Event
Sk->>Dest: 写入目的地
alt 写入成功
Sk->>Ch: commit,Channel 删除这批 Event
else 写入失败
Sk->>Ch: rollback,Event 回到 Channel 等待重试
end
这套机制保证了 at-least-once(至少一次)语义:数据不会丢失,但在 Sink 成功写入后 Agent 崩溃的极端情况下,可能重复发送。
Memory Channel:高吞吐,低可靠
agent.channels.c1.type = memory
agent.channels.c1.capacity = 100000 # Channel 最大容纳 Event 数
agent.channels.c1.transactionCapacity = 1000 # 每个事务最多处理 Event 数
agent.channels.c1.byteCapacityBufferPercentage = 20
agent.channels.c1.byteCapacity = 800000000 # Channel 最大字节数(800MB)
数据存在 JVM 堆内存,Agent 崩溃时 Channel 中的数据全部丢失。适合允许少量丢失、追求高吞吐的场景(如监控指标采集)。
File Channel:低吞吐,高可靠
agent.channels.c1.type = file
agent.channels.c1.checkpointDir = /var/flume/checkpoint
agent.channels.c1.dataDirs = /var/flume/data1,/var/flume/data2 # 多目录提升吞吐
agent.channels.c1.capacity = 10000000 # 最大 Event 数
agent.channels.c1.transactionCapacity = 10000
agent.channels.c1.checkpointInterval = 30000 # 每 30 秒做一次 checkpoint
数据写入磁盘(WAL,Write-Ahead Log),崩溃恢复后数据完整。代价是吞吐量比 Memory Channel 低 5-10 倍。适合日志不能丢失的场景(如交易日志、审计日志)。
Kafka Channel:兼顾可靠与吞吐
agent.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
agent.channels.c1.kafka.bootstrap.servers = broker1:9092
agent.channels.c1.kafka.topic = flume-channel-topic
agent.channels.c1.kafka.consumer.group.id = flume-channel-group
用 Kafka 作为 Channel,把 Event 写入 Kafka Topic。可靠性由 Kafka 保证,同时 Kafka 本身就是高吞吐的。这种模式下 Sink 直接从 Kafka 消费,实际上 Flume 退化成了一个 Kafka Producer。
四、Sink 深度解析
HDFS Sink(最重要)
将 Event 写入 HDFS,是 Flume 最核心的使用场景:
agent.sinks.k1.type = hdfs
agent.sinks.k1.hdfs.path = /data/logs/%Y/%m/%d/%H # 按时间分目录
agent.sinks.k1.hdfs.filePrefix = access-log
agent.sinks.k1.hdfs.fileSuffix = .log
# 文件滚动策略(三个条件任意一个触发就滚动生成新文件)
agent.sinks.k1.hdfs.rollInterval = 3600 # 每 1 小时滚动一次
agent.sinks.k1.hdfs.rollSize = 134217728 # 文件达到 128MB 滚动
agent.sinks.k1.hdfs.rollCount = 1000000 # 写入 100 万条滚动
# 文件格式
agent.sinks.k1.hdfs.fileType = DataStream # 普通文本
# agent.sinks.k1.hdfs.fileType = CompressedStream # 压缩文本
# agent.sinks.k1.hdfs.codeC = snappy # Snappy 压缩
# 写入批次大小
agent.sinks.k1.hdfs.batchSize = 1000
# 防止 HDFS 小文件问题:使用 .tmp 后缀标记写入中的文件
agent.sinks.k1.hdfs.inUseSuffix = .tmp
文件滚动策略是 HDFS Sink 最重要的调优参数。太频繁滚动会产生大量小文件(HDFS 的致命问题);太不频繁则实时性差,且单文件过大影响后续 MapReduce 的并行度。
Kafka Sink
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.kafka.bootstrap.servers = broker1:9092,broker2:9092
agent.sinks.k1.kafka.topic = app-logs
agent.sinks.k1.kafka.flumeBatchSize = 2000
agent.sinks.k1.kafka.producer.acks = 1 # 性能优先
# agent.sinks.k1.kafka.producer.acks = all # 可靠性优先
其他常用 Sink
| Sink 类型 | 说明 |
|---|---|
| Avro Sink | 把 Event 通过 Avro RPC 发给另一个 Agent(多级聚合必用) |
| HBase Sink | 写入 HBase,支持按列族映射 |
| Elasticsearch Sink | 写入 ES,适合日志检索场景 |
| File Roll Sink | 写本地文件,按时间/大小滚动 |
| Logger Sink | 打印到日志,仅用于调试 |
| Null Sink | 丢弃所有数据,用于压测 Source/Channel |
五、拦截器与通道选择器:数据处理与路由
拦截器(Interceptor):在 Source 侧处理 Event
拦截器在 Event 进入 Channel 之前对其进行过滤或修改:
# 时间戳拦截器:给 Event 打上采集时间戳(HDFS Sink 路径用到 %Y/%m/%d 时必须加)
agent.sources.r1.interceptors = i1
agent.sources.r1.interceptors.i1.type = timestamp
# Host 拦截器:把采集机器的主机名加入 headers
agent.sources.r1.interceptors = i1 i2
agent.sources.r1.interceptors.i1.type = timestamp
agent.sources.r1.interceptors.i2.type = host
agent.sources.r1.interceptors.i2.hostHeader = hostname
# 正则过滤拦截器:过滤掉匹配正则的 Event(或只保留匹配的)
agent.sources.r1.interceptors.i3.type = regex_filter
agent.sources.r1.interceptors.i3.regex = .*ERROR.*
agent.sources.r1.interceptors.i3.excludeEvents = false # false=只保留匹配的
# 正则提取拦截器:从 body 提取字段放入 headers(用于后续路由)
agent.sources.r1.interceptors.i4.type = regex_extractor
agent.sources.r1.interceptors.i4.regex = (\\w+)\\s+(\\d+)
agent.sources.r1.interceptors.i4.serializers = s1 s2
agent.sources.r1.interceptors.i4.serializers.s1.name = level
agent.sources.r1.interceptors.i4.serializers.s2.name = code
通道选择器(Channel Selector):一份数据发给多个 Channel
# Replicating(复制,默认):Event 发给所有 Channel(数据备份)
agent.sources.r1.selector.type = replicating
agent.sources.r1.channels = c1 c2 # c1 写 HDFS,c2 写 Kafka,两份都有
# Multiplexing(多路复用):根据 header 值路由到不同 Channel
agent.sources.r1.selector.type = multiplexing
agent.sources.r1.selector.header = level # 按 level header 路由
agent.sources.r1.selector.mapping.ERROR = c_error # ERROR 级别 → c_error Channel
agent.sources.r1.selector.mapping.WARN = c_warn
agent.sources.r1.selector.mapping.INFO = c_info
agent.sources.r1.selector.default = c_info # 未匹配的走默认
六、Agent 拓扑设计
Flume 的真正威力在于多个 Agent 的组合,构成灵活的数据采集网络。
单 Agent(简单场景)
Web Server(日志文件)
→ [Taildir Source → File Channel → HDFS Sink]
→ HDFS
扇入(Fan-in):多对一汇聚
多台 Web 服务器各自运行一个 Collector Agent,把日志发给汇聚层的 Aggregator Agent,再统一写入 HDFS。汇聚层便于统一管理 HDFS 写入,避免数千台服务器直连 HDFS 的连接风暴:
graph TD
W1["Web-01 Taildir Source Memory Channel Avro Sink"]
W2["Web-02 Taildir Source Memory Channel Avro Sink"]
W3["Web-03 Taildir Source Memory Channel Avro Sink"]
Wn["...(数百台)"]
AGG["Aggregator Agent Avro Source File Channel HDFS Sink"]
HDFS["HDFS"]
W1 -->|"Avro RPC"| AGG
W2 -->|"Avro RPC"| AGG
W3 -->|"Avro RPC"| AGG
Wn -->|"Avro RPC"| AGG
AGG --> HDFS
# Collector Agent(部署在每台 Web 服务器)
collector.sources = r1
collector.sources.r1.type = TAILDIR
collector.sources.r1.filegroups.f1 = /var/log/nginx/access.log
collector.sources.r1.channels = c1
collector.channels = c1
collector.channels.c1.type = memory
collector.channels.c1.capacity = 10000
collector.sinks = k1
collector.sinks.k1.type = avro
collector.sinks.k1.hostname = aggregator.example.com
collector.sinks.k1.port = 4141
collector.sinks.k1.channel = c1
# Aggregator Agent(部署在汇聚层,少量机器)
aggregator.sources = r1
aggregator.sources.r1.type = avro
aggregator.sources.r1.bind = 0.0.0.0
aggregator.sources.r1.port = 4141
aggregator.sources.r1.channels = c1
aggregator.channels = c1
aggregator.channels.c1.type = file
aggregator.channels.c1.checkpointDir = /var/flume/checkpoint
aggregator.channels.c1.dataDirs = /var/flume/data
aggregator.sinks = k1
aggregator.sinks.k1.type = hdfs
aggregator.sinks.k1.hdfs.path = /data/logs/%Y/%m/%d/%H
aggregator.sinks.k1.channel = c1
扇出(Fan-out):一份数据多路输出
一份日志同时写入 HDFS(离线分析)和 Kafka(实时处理):
graph TD
LogFile["日志文件"]
LogFile --> Taildir["Taildir Source (Replicating Channel Selector)"]
Taildir --> CH["Memory Channel(HDFS)"]
Taildir --> CK["Memory Channel(Kafka)"]
CH --> HS["HDFS Sink"]
CK --> KS["Kafka Sink"]
HS --> HDFS["HDFS(离线分析)"]
KS --> Kafka["Kafka(实时处理)"]
agent.sources = r1
agent.channels = c_hdfs c_kafka
agent.sinks = k_hdfs k_kafka
agent.sources.r1.type = TAILDIR
agent.sources.r1.filegroups.f1 = /var/log/app/*.log
# Replicating 选择器把 Event 同时放入两个 Channel
agent.sources.r1.selector.type = replicating
agent.sources.r1.channels = c_hdfs c_kafka
# HDFS Channel + Sink
agent.channels.c_hdfs.type = file
agent.sinks.k_hdfs.type = hdfs
agent.sinks.k_hdfs.hdfs.path = /data/logs/%Y/%m/%d
agent.sinks.k_hdfs.channel = c_hdfs
# Kafka Channel + Sink
agent.channels.c_kafka.type = memory
agent.sinks.k_kafka.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k_kafka.kafka.bootstrap.servers = broker1:9092
agent.sinks.k_kafka.kafka.topic = app-logs
agent.sinks.k_kafka.channel = c_kafka
多路复用路由:按日志级别分流
日志文件
→ [Taildir Source + RegexExtractor 拦截器]
├── ERROR 级别 → [Channel → HDFS Sink(错误日志目录)]
└── INFO/WARN → [Channel → HDFS Sink(普通日志目录)]
七、Sink Processor:高可用与负载均衡
当一个 Channel 对应多个 Sink 时,Sink Processor 决定如何使用这些 Sink。
Load Balancing Sink Processor
agent.sinkgroups = sg1
agent.sinkgroups.sg1.sinks = k1 k2 k3
agent.sinkgroups.sg1.processor.type = load_balance
agent.sinkgroups.sg1.processor.selector = round_robin # 或 random
agent.sinkgroups.sg1.processor.backoff = true # 失败后退避,不立即重试
三个 Sink 轮询写入,提升吞吐量。适合写入多个 Avro Aggregator Agent 的场景。
Failover Sink Processor
agent.sinkgroups.sg1.processor.type = failover
agent.sinkgroups.sg1.processor.priority.k1 = 100 # 主 Sink,优先级最高
agent.sinkgroups.sg1.processor.priority.k2 = 80 # 备 Sink
agent.sinkgroups.sg1.processor.maxpenalty = 30000 # 失败后最长等待 30 秒再重试
主 Sink 挂掉后自动切换到备 Sink,主 Sink 恢复后自动切回。适合生产环境的高可用部署。
八、性能调优关键参数
批量大小:最重要的参数
Source batchSize:Source 一次从数据源读多少 Event 放入 Channel
Channel transactionCapacity:一个事务最多处理多少 Event
Sink batchSize:Sink 一次从 Channel 取多少 Event 写出
三者的关系:
Source batchSize ≤ Channel transactionCapacity
Sink batchSize ≤ Channel transactionCapacity
推荐值:
低延迟场景:batchSize = 100-500,transactionCapacity = 1000
高吞吐场景:batchSize = 1000-5000,transactionCapacity = 10000
线程模型
# Taildir Source 并发读取线程数(每个文件组一个线程)
agent.sources.r1.maxBatchCount = 100
# HDFS Sink 写入线程数(多线程写多个 HDFS 文件)
agent.sinks.k1.hdfs.threadsPoolSize = 10
agent.sinks.k1.hdfs.rollTimerPoolSize = 1
JVM 参数调优
# flume-env.sh
export JAVA_OPTS="-Xms4g -Xmx4g \ # 堆内存(Memory Channel 用堆内存,要给够)
-XX:+UseG1GC \ # G1 GC,减少停顿
-XX:MaxGCPauseMillis=100 \ # GC 最大停顿 100ms
-XX:+HeapDumpOnOutOfMemoryError \
-XX:HeapDumpPath=/var/flume/heapdump"
九、监控:生产环境必须关注的指标
Flume 内置了 JMX 和 HTTP JSON 两种监控接口:
# 启动时开启 HTTP 监控接口
flume-ng agent \
--conf conf \
--conf-file flume.conf \
--name agent \
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=34545
# 查看监控数据
curl http://localhost:34545/metrics | python -m json.tool
输出示例:
{
"SOURCE.r1": {
"EventReceivedCount": 10000000, // 已接收 Event 数
"EventAcceptedCount": 9999950, // 成功放入 Channel 的 Event 数
"AppendBatchAcceptedCount": 9999 // 成功的批次数
},
"CHANNEL.c1": {
"EventPutSuccessCount": 9999950,
"EventTakeSuccessCount": 9999900,
"ChannelSize": 50, // 当前 Channel 中积压的 Event 数(关键指标!)
"ChannelCapacity": 100000
},
"SINK.k1": {
"EventDrainSuccessCount": 9999900,
"EventDrainAttemptCount": 9999950,
"BatchCompleteCount": 9999,
"ConnectionFailedCount": 0 // 连接失败次数(>0 说明有问题)
}
}
最重要的监控指标:
- ChannelSize:Channel 积压量,持续增长说明 Sink 消费跟不上 Source 生产,需要扩容 Sink 或加大 batchSize
- EventAcceptedCount vs EventReceivedCount:差值大说明 Channel 满了,Source 在等待
- ConnectionFailedCount:Sink 连接目的地失败次数,>0 需要立即排查
十、与现代方案对比:何时还应该用 Flume
| 维度 | Flume | Filebeat | Kafka Connect | Fluentd |
|---|---|---|---|---|
| 部署重量 | 重(JVM) | 轻(Go) | 中(JVM) | 中(Ruby) |
| 配置复杂度 | 高 | 低 | 中 | 中 |
| HDFS 写入 | 原生支持,成熟 | 不支持 | 通过插件 | 通过插件 |
| Kafka 集成 | 支持但非原生 | 原生支持 | 原生,最强 | 支持 |
| 数据处理 | 拦截器(有限) | 处理器(有限) | Transform(中等) | 过滤器(强) |
| 可靠性 | at-least-once | at-least-once | exactly-once(部分) | at-least-once |
| 适合场景 | Hadoop 生态,HDFS 写入 | 日志→Elasticsearch/Kafka | 数据库→Kafka 变更捕获 | 多目的地路由 |
还应该用 Flume 的场景:
- 已有大量 Flume 部署的 Hadoop 生态,迁移成本高
- 需要直接可靠地写入 HDFS,且团队熟悉 Flume 运维
- 需要灵活的多级 Agent 拓扑(扇入/扇出/多路复用组合)
应该换掉 Flume 的场景:
- 新建系统,目的地是 Kafka 或 Elasticsearch → 用 Filebeat(轻量、运维简单)
- 需要数据库 CDC(Change Data Capture)→ 用 Kafka Connect + Debezium
- 需要强数据处理能力 → 用 Flink 或 Logstash
- 云原生环境(K8s)→ Fluentd/Fluentbit 更适合容器日志采集
十一、关键点总结
- Agent = Source + Channel + Sink,三者由事务(Transaction)连接,保证 at-least-once 语义
- Channel 的选择决定可靠性:Memory Channel 快但会丢数据,File Channel 慢但持久化,Kafka Channel 兼顾两者
- Taildir Source 是生产环境日志采集的首选:支持断点续传,positionFile 记录 inode+偏移量,Agent 重启不重复发送
- HDFS Sink 的文件滚动策略是关键调优点:rollInterval/rollSize/rollCount 控制文件大小,避免小文件问题
- 拦截器链在 Source 侧处理 Event:时间戳拦截器是 HDFS 时间分区路径的必要前提
- 通道选择器实现扇出:Replicating 复制到所有 Channel,Multiplexing 按 header 路由
- 多级 Agent 拓扑解决扩展性问题:大量采集机器 → 少量汇聚机器 → HDFS,避免直连 HDFS 的连接风暴
- ChannelSize 是最重要的监控指标:持续增长意味着消费跟不上生产,是扩容的信号
- 新系统优先考虑 Filebeat/Kafka Connect:Flume 的优势领域已收窄到 Hadoop 生态的 HDFS 直写场景