Apache Doris(原百度 Palo)是一个基于 MPP 架构的实时分析型数据库,专为高并发、低延迟的 OLAP 查询场景设计。它兼容 MySQL 协议,无外部依赖,支持秒级数据导入可见,是目前国内大厂数仓 DWS/ADS 层的主流选型之一。
本文从架构原理出发,深度剖析 Doris 的每一个核心模块:FE/BE 架构、数据模型、存储引擎、查询执行、数据导入,最后给出实战使用要点。
一、整体架构:FE + BE 分离
Doris 只有两类进程,无任何外部依赖(不需要 ZooKeeper、HDFS):
graph TD
subgraph Clients[客户端]
C1["MySQL 客户端"]
C2["JDBC / ODBC"]
C3["BI 工具"]
end
subgraph FELayer[FE 层(元数据 + 查询规划)]
FEL["FE Leader 处理 DDL / 元数据写入"]
FEF1["FE Follower 参与 Leader 选举"]
FEF2["FE Follower 参与 Leader 选举"]
FEL <-->|"Paxos 同步"| FEF1
FEL <-->|"Paxos 同步"| FEF2
end
subgraph BELayer[BE 层(存储 + 计算)]
BE1["BE 节点 存储 + 向量化计算"]
BE2["BE 节点 存储 + 向量化计算"]
BE3["BE 节点 存储 + 向量化计算"]
end
C1 & C2 & C3 --> FEL
FEL -->|"查询计划下发(Fragment)"| BE1 & BE2 & BE3
BE1 <-->|"数据 Shuffle(gRPC)"| BE2
BE2 <-->|"数据 Shuffle(gRPC)"| BE3
FE(Frontend):大脑
FE 负责集群的元数据管理和 SQL 处理,分三种角色:
- Leader FE:唯一写入节点,处理所有 DDL、元数据变更,通过 BDB-JE(嵌入式 KV)存储元数据
- Follower FE:同步 Leader 的元数据,可以接收读请求,参与 Leader 选举(基于 Paxos)
- Observer FE:只同步元数据,不参与选举,用于扩展读能力
FE 的核心工作流:
SQL 输入
→ 词法/语法解析(MySQL 方言)
→ 语义分析(类型检查、权限验证)
→ 查询优化(RBO 规则优化 + CBO 代价优化)
→ 生成分布式执行计划(Fragment)
→ 下发到各 BE 执行
BE(Backend):存储与计算
BE 同时负责数据存储和查询计算,每个 BE 节点上:
- 管理若干 Tablet(数据分片),每个 Tablet 多副本分布在不同 BE
- 接收 FE 下发的查询 Fragment,在本地数据上执行向量化计算
- 通过 Exchange 算子和其他 BE 交换中间结果(Shuffle)
BE 间通信走 gRPC,BE 与 FE 通信走 Thrift。
数据分布:Partition + Tablet
graph TD
T["一张表 Table"]
T --> P1["Partition(按时间/值范围分区)"]
P1 --> B1["Bucket(Hash 分桶)"]
B1 --> Tab["Tablet(最小存储单元,默认 3 副本)"]
Tab --> RS["Rowset(一次导入产生一个 Rowset)"]
RS --> Seg["Segment 文件(列存格式)"]
建表时指定分区键和分桶数:
CREATE TABLE orders (
order_id BIGINT,
user_id INT,
order_date DATE,
amount DECIMAL(10, 2)
)
PARTITION BY RANGE(order_date) (
PARTITION p202501 VALUES LESS THAN ('2025-02-01'),
PARTITION p202502 VALUES LESS THAN ('2025-03-01')
)
DISTRIBUTED BY HASH(user_id) BUCKETS 32
PROPERTIES ("replication_num" = "3");
查询时 FE 先做分区裁剪(Partition Pruning),只扫描满足条件的分区;再做分桶裁剪(Bucket Pruning),对等值条件只扫描对应的 Bucket,大幅减少扫描量。
二、四种数据模型
Doris 的数据模型决定了数据如何存储和聚合,是建表时最重要的选择。
1. Duplicate Key(明细模型)
CREATE TABLE logs (
ts DATETIME,
user_id INT,
action VARCHAR(50),
...
) DUPLICATE KEY(ts, user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 16;
所有数据原样保存,允许完全重复的行。适合日志、事件流等不需要去重的场景。Key 列只用于数据排序(Sort Key),不做唯一性约束。
2. Aggregate Key(聚合模型)
CREATE TABLE user_stats (
user_id INT,
date DATE,
pv BIGINT SUM, -- 页面浏览次数,SUM 聚合
uv BIGINT REPLACE, -- 独立访客,取最新值
max_amount DECIMAL MAX -- 最大金额
) AGGREGATE KEY(user_id, date)
DISTRIBUTED BY HASH(user_id) BUCKETS 16;
相同 Key 的行在导入时自动聚合(SUM/MAX/MIN/REPLACE 等)。适合预聚合报表,查询时数据量极小,但无法还原明细。
3. Unique Key(主键模型)
CREATE TABLE users (
user_id INT,
name VARCHAR(100),
email VARCHAR(100),
updated DATETIME
) UNIQUE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 16
PROPERTIES ("enable_unique_key_merge_on_write" = "true");
相同主键的行保留最新版本(REPLACE 语义),实现 Upsert。有两种实现方式:
- Merge-on-Read(MoR):写入时不合并,查询时实时合并多个版本,写快读慢
- Merge-on-Write(MoW,推荐):写入时立即合并,查询直接读最新数据,读快写稍慢,2.0+ 默认推荐
MoW 还支持行存(Row Store),为点查性能再提一个数量级:
PROPERTIES (
"enable_unique_key_merge_on_write" = "true",
"store_row_column" = "true" -- 开启行存,加速点查
);
4. Primary Key(主键模型,2.0+)
Primary Key 是对 Unique Key MoW 的进一步增强,支持部分列更新(Partial Update),不需要提供所有列的值,只更新指定列:
-- 只更新 email 列,不影响其他列
INSERT INTO users (user_id, email) VALUES (1001, 'new@example.com');
适合宽表实时更新场景,如用户画像、商品属性实时同步。
三、存储引擎:Segment 列存
Segment 文件格式
Segment 文件结构:
┌─────────────────────────────────────────────┐
│ Column A Data (列数据块,按 1024 行一组) │
│ Column B Data │
│ Column C Data │
│ ... │
├─────────────────────────────────────────────┤
│ Ordinal Index (行号 → 数据块偏移) │
│ Short Key Index (前缀索引,每隔 1024 行) │
│ ZoneMap Index (每列的 min/max 值) │
│ Bloom Filter (高基数列的存在性过滤) │
│ Bitmap Index (低基数列的精确过滤) │
├─────────────────────────────────────────────┤
│ Footer(列元信息、索引偏移) │
└─────────────────────────────────────────────┘
三层索引过滤
一个查询 WHERE date = '2025-01-15' AND status = 'paid' 的过滤流程:
flowchart TD
Q["查询:WHERE date = '2025-01-15' AND status = 'paid'"]
Q --> PP["① 分区裁剪 只读 date 所在分区 跳过其他月份所有 Segment"]
PP --> ZM["② ZoneMap 过滤 每个 Segment 记录 date 列 min/max 跳过不含目标日期的 Segment"]
ZM --> SK["③ Short Key 索引 Segment 内二分查找 定位目标数据块(1024 行粒度)"]
SK --> BF["④ Bitmap / Bloom Filter 数据块内精确过滤 读取真正满足条件的行"]
BF --> Result["最终结果 实际读取行数可能仅为全表的 1/1000"]
- 分区裁剪:只读 date 所在分区,跳过其他月份的所有 Segment
- ZoneMap 过滤:每个 Segment 记录了 date 列的 min/max,跳过不含目标日期的 Segment
- Short Key 索引:在 Segment 内做二分查找,定位到包含目标行的数据块(1024 行粒度)
- Bitmap/Bloom Filter:在数据块内进一步过滤,读取真正满足条件的行
每一层都能跳过大量数据,最终读取的实际行数可能只有全表的千分之一。
Compaction:后台合并
每次导入都会产生一个新的 Rowset,随着导入次数增加,Segment 文件越来越多,查询需要合并多个版本。Doris 后台持续做 Compaction:
- Cumulative Compaction:将多个小 Rowset 合并成较大的 Rowset(增量合并)
- Base Compaction:将所有 Rowset 合并成一个 Base Rowset(全量合并,触发频率低)
Compaction 完成后,查询无需再做版本合并,性能恢复到最优状态。
四、向量化执行引擎
Doris 1.2 版本引入全面向量化执行引擎,是查询性能提升的核心。
标量执行 vs 向量化执行
标量执行(逐行处理):
for row in rows:
result = row.amount * row.quantity ← 每次处理 1 行
向量化执行(批量处理):
for batch in batches: ← 每批 1024 行
result = amount_vec * quantity_vec ← SIMD 一次处理 4/8/16 个值
向量化执行的两个核心收益:
- SIMD(Single Instruction Multiple Data):CPU 的 AVX2/AVX512 指令一次可以处理 8 个 32 位整数,相当于 8 倍吞吐量
- Cache 友好:列数据在内存中连续,批量读取的 Cache Miss 极低
Pipeline 执行模型(2.0+)
Doris 2.0 引入 Pipeline 执行模型,彻底解决了大查询阻塞小查询的问题:
传统阻塞模型:
线程1: [Scan]────────[HashJoin]──[Agg]──[结果] ← 占用线程直到完成
Pipeline 模型:
Driver1: [Scan chunk] → 放入队列
Driver2: [HashJoin chunk] ← 从队列取 → 放入队列
Driver3: [Agg chunk] ← 从队列取 → 输出
(各阶段独立调度,不阻塞,可以和其他查询的 Driver 穿插执行)
Pipeline 模型让 Doris 在高并发场景下资源利用率更高,小查询不再被大查询饿死。
五、数据导入
Doris 提供多种导入方式,适合不同的数据源和延迟要求:
Stream Load(推荐,实时导入)
# 通过 HTTP PUT 直接推送数据,原子性强,延迟秒级
curl -X PUT \
-H "label: load_20250108_001" \
-H "column_separator: ," \
-T orders.csv \
http://fe_host:8030/api/mydb/orders/_stream_load \
-u admin:password
Stream Load 的特点:同步返回结果、事务性保证(要么全部成功要么全部失败)、适合小批量高频导入(每次几 MB 到几十 MB)。
Routine Load(Kafka 消费)
-- 创建 Routine Load 任务,持续消费 Kafka
CREATE ROUTINE LOAD mydb.orders_load ON orders
COLUMNS TERMINATED BY ","
PROPERTIES (
"max_batch_interval" = "10", -- 每 10 秒提交一批
"max_batch_rows" = "200000",
"format" = "json"
)
FROM KAFKA (
"kafka_broker_list" = "broker1:9092,broker2:9092",
"kafka_topic" = "orders_topic",
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "OFFSET_BEGINNING"
);
Routine Load 在 BE 内部维护 Kafka Consumer,每隔指定时间提交一批数据,数据延迟通常在 10-30 秒内,无需外部调度。
Broker Load(批量离线导入)
-- 从 HDFS 批量导入
LOAD LABEL mydb.load_20250108 (
DATA INFILE("hdfs://namenode/data/orders/2025-01-08/*.parquet")
INTO TABLE orders
FORMAT AS "parquet"
(order_id, user_id, order_date, amount)
)
WITH BROKER hdfs_broker
("username" = "hdfs", "password" = "");
-- 查看导入进度
SHOW LOAD WHERE LABEL = "load_20250108";
INSERT INTO SELECT(跨表导入/ETL)
-- 从 ODS 层聚合写入 DWS 层
INSERT INTO dws_user_daily_stats
SELECT
user_id,
DATE(order_date) AS date,
COUNT(*) AS order_cnt,
SUM(amount) AS total_amount
FROM ods_orders
WHERE order_date >= '2025-01-08'
GROUP BY user_id, DATE(order_date);
导入方式对比
| 方式 | 数据源 | 延迟 | 吞吐量 | 适用场景 |
|---|---|---|---|---|
| Stream Load | HTTP 推送 | 秒级 | 中 | 微服务直写、Flink sink |
| Routine Load | Kafka | 10-30 秒 | 中高 | 实时流数据消费 |
| Broker Load | HDFS/S3 | 分钟级 | 高 | T+1 离线批量导入 |
| INSERT INTO | Doris 内部 | 秒-分钟 | 中 | 内部 ETL、数仓分层 |
六、查询优化关键机制
Colocate Join:零 Shuffle 的 JOIN
普通分布式 JOIN 需要将数据 Shuffle 到同一节点才能 JOIN,网络开销大。Colocate Join 让两张表按相同的分桶键和分桶数分布,JOIN 时直接在本地数据上计算,完全避免网络传输:
-- 两张表使用相同的 Colocate Group
CREATE TABLE orders (
user_id INT,
...
) DISTRIBUTED BY HASH(user_id) BUCKETS 32
PROPERTIES ("colocate_with" = "user_group");
CREATE TABLE users (
user_id INT,
...
) DISTRIBUTED BY HASH(user_id) BUCKETS 32
PROPERTIES ("colocate_with" = "user_group");
-- 查询时自动走 Colocate Join,无 Shuffle
SELECT u.name, COUNT(*) as order_cnt
FROM orders o JOIN users u ON o.user_id = u.user_id
GROUP BY u.name;
graph LR
subgraph NormalJoin[普通分布式 JOIN(需要 Shuffle)]
O1["orders BE1: user_id=1,2"]
O2["orders BE2: user_id=3,4"]
U1["users BE1: user_id=3,4"]
U2["users BE2: user_id=1,2"]
O1 -->|"网络 Shuffle"| JOIN1["JOIN 节点"]
O2 -->|"网络 Shuffle"| JOIN1
U1 -->|"网络 Shuffle"| JOIN1
U2 -->|"网络 Shuffle"| JOIN1
end
subgraph ColocateJoin[Colocate JOIN(零 Shuffle)]
CO1["orders+users BE1: user_id=1,2 本地直接 JOIN"]
CO2["orders+users BE2: user_id=3,4 本地直接 JOIN"]
end
Colocate Join 的前提:两表分桶键相同、分桶数相同、副本数相同,且在同一个 Colocate Group 里。
物化视图(Materialized View)
Doris 支持同步物化视图和异步物化视图:
-- 同步物化视图:自动维护,查询自动路由
CREATE MATERIALIZED VIEW mv_user_daily
AS SELECT user_id, DATE(order_date), SUM(amount), COUNT(*)
FROM orders
GROUP BY user_id, DATE(order_date);
-- 查询 orders 时,如果命中物化视图,自动改写为查询 mv_user_daily
-- 无需修改查询 SQL,透明加速
SQL Cache 与 Partition Cache
- SQL Cache:完全相同的 SQL + 参数,直接返回缓存结果,适合固定报表
- Partition Cache:缓存已有分区的结果,只对新增分区重新计算,适合时间范围查询
-- 开启 SQL Cache
SET enable_sql_cache = true;
-- 查询时间范围报表,历史分区走缓存,只计算最新分区
SELECT date, SUM(amount)
FROM orders
WHERE order_date BETWEEN '2025-01-01' AND '2025-01-08'
GROUP BY date;
Runtime Filter
大表 JOIN 小表时,Doris 会先扫描小表,构建一个 Bloom Filter,然后下推到大表的扫描层,在读取数据时就过滤掉不可能匹配的行:
SELECT * FROM big_table b JOIN small_table s ON b.id = s.id
WHERE s.type = 'VIP'
执行流程:
1. 扫描 small_table,过滤 type='VIP',得到 id 集合 {1, 5, 9, ...}
2. 构建 Bloom Filter,下推到 big_table 的 scan 阶段
3. big_table 在读取时,每行先过 Bloom Filter,不在集合里直接跳过
4. 大幅减少 big_table 实际读取的行数
七、资源隔离:Workload Group
高并发场景下,不同业务的查询需要隔离,防止一个大查询拖垮所有人:
-- 创建 Workload Group,限制 CPU 和内存
CREATE WORKLOAD GROUP report_group
PROPERTIES (
"cpu_share" = "10", -- CPU 权重
"memory_limit" = "30%", -- 最多使用 30% 内存
"max_concurrency" = "20", -- 最大并发查询数
"max_queue_size" = "100", -- 排队上限
"queue_timeout" = "300" -- 排队超时秒数
);
-- 将用户绑定到 Workload Group
ALTER USER report_user DEFAULT WORKLOAD GROUP "report_group";
Workload Group 让不同优先级的业务(实时大屏 vs 离线报表 vs 临时分析)共享集群资源的同时互不影响。
八、Multi-Catalog:联邦查询
Doris 2.0+ 引入 Multi-Catalog,可以直接查询外部数据源,无需导入:
-- 创建 Hive Catalog
CREATE CATALOG hive_catalog PROPERTIES (
"type" = "hms",
"hive.metastore.uris" = "thrift://hms-host:9083"
);
-- 直接查询 Hive 表
SELECT * FROM hive_catalog.dwd.orders LIMIT 10;
-- 跨 Catalog JOIN(Hive 大表 + Doris 维度表)
SELECT h.order_id, d.user_name
FROM hive_catalog.dwd.orders h
JOIN doris_internal.dim.users d ON h.user_id = d.user_id
WHERE h.order_date = '2025-01-08';
支持的外部 Catalog:Hive、Iceberg、Hudi、Delta Lake、MySQL、PostgreSQL、Elasticsearch、S3/HDFS 上的 Parquet/ORC 文件。
九、实战建表要点
数据模型选择
| 场景 | 推荐模型 | 原因 |
|---|---|---|
| 日志、埋点、事件流 | Duplicate Key | 保留所有明细,不去重 |
| 预聚合报表(UV/PV/金额) | Aggregate Key | 导入时预聚合,查询极快 |
| 业务实体(用户、订单状态) | Unique Key MoW | 支持 Upsert,保证唯一性 |
| 宽表实时更新(画像标签) | Primary Key | 支持部分列更新 |
分桶数选择
推荐:每个 Tablet 的数据量控制在 1-10 GB
公式:分桶数 = 总数据量 / (3GB * BE节点数) 取整,再向上取 2 的幂次
示例:100GB 数据,3 个 BE 节点
分桶数 ≈ 100 / (3 * 3) ≈ 11,向上取 16
常见坑
- 分桶键选错:分桶键应选择查询里最常用于 JOIN 或 WHERE 等值过滤的列,而不是时间列(时间列适合做分区键)
- 分桶数过少:导致单个 Tablet 过大,查询时并行度不足,性能差
- Aggregate 模型误用:Aggregate 模型的非 Key 列必须指定聚合函数,不能存原始明细,如果既要聚合又要明细,应用 Duplicate Key + 物化视图
- Unique Key 忘开 MoW:默认的 MoR 模式在读多写少场景性能差,生产建议加
"enable_unique_key_merge_on_write" = "true" - 导入频率过高:每次导入产生一个 Rowset,频繁小批量导入(<1 秒一次)会导致 Compaction 跟不上,查询性能下降,建议 Stream Load 批间隔 ≥ 10 秒
十、关键点总结
- FE/BE 分离架构:FE 管元数据和查询计划,BE 管存储和计算,无外部依赖,运维简单
- 四种数据模型:Duplicate(明细)/ Aggregate(预聚合)/ Unique Key(Upsert)/ Primary Key(部分列更新),根据业务特性选择
- Segment 列存 + 三层索引:ZoneMap → Short Key → Bitmap/Bloom Filter,层层过滤,极致减少 I/O
- 向量化 + Pipeline:批量 SIMD 计算 + 非阻塞调度,高并发下吞吐量和延迟兼顾
- Routine Load:BE 内置 Kafka Consumer,秒级到分钟级实时导入,无需外部调度
- Colocate Join:同 Colocate Group 的表 JOIN 零 Shuffle,是 Doris 多表查询性能超越 ClickHouse 的核心武器
- Workload Group:CPU/内存隔离,高优先级业务不被大查询拖垮
- Multi-Catalog:直接查 Hive/Iceberg/MySQL,打通数仓上下游,无需数据迁移