Apache Doris 深度解析

Apache Doris(原百度 Palo)是一个基于 MPP 架构的实时分析型数据库,专为高并发、低延迟的 OLAP 查询场景设计。它兼容 MySQL 协议,无外部依赖,支持秒级数据导入可见,是目前国内大厂数仓 DWS/ADS 层的主流选型之一。

本文从架构原理出发,深度剖析 Doris 的每一个核心模块:FE/BE 架构、数据模型、存储引擎、查询执行、数据导入,最后给出实战使用要点。

一、整体架构:FE + BE 分离

Doris 只有两类进程,无任何外部依赖(不需要 ZooKeeper、HDFS):

graph TD
    subgraph Clients[客户端]
        C1["MySQL 客户端"]
        C2["JDBC / ODBC"]
        C3["BI 工具"]
    end
    subgraph FELayer[FE 层(元数据 + 查询规划)]
        FEL["FE Leader 处理 DDL / 元数据写入"]
        FEF1["FE Follower 参与 Leader 选举"]
        FEF2["FE Follower 参与 Leader 选举"]
        FEL <-->|"Paxos 同步"| FEF1
        FEL <-->|"Paxos 同步"| FEF2
    end
    subgraph BELayer[BE 层(存储 + 计算)]
        BE1["BE 节点 存储 + 向量化计算"]
        BE2["BE 节点 存储 + 向量化计算"]
        BE3["BE 节点 存储 + 向量化计算"]
    end
    C1 & C2 & C3 --> FEL
    FEL -->|"查询计划下发(Fragment)"| BE1 & BE2 & BE3
    BE1 <-->|"数据 Shuffle(gRPC)"| BE2
    BE2 <-->|"数据 Shuffle(gRPC)"| BE3

FE(Frontend):大脑

FE 负责集群的元数据管理和 SQL 处理,分三种角色:

  • Leader FE:唯一写入节点,处理所有 DDL、元数据变更,通过 BDB-JE(嵌入式 KV)存储元数据
  • Follower FE:同步 Leader 的元数据,可以接收读请求,参与 Leader 选举(基于 Paxos)
  • Observer FE:只同步元数据,不参与选举,用于扩展读能力

FE 的核心工作流:

SQL 输入
  → 词法/语法解析(MySQL 方言)
  → 语义分析(类型检查、权限验证)
  → 查询优化(RBO 规则优化 + CBO 代价优化)
  → 生成分布式执行计划(Fragment)
  → 下发到各 BE 执行

BE(Backend):存储与计算

BE 同时负责数据存储和查询计算,每个 BE 节点上:

  • 管理若干 Tablet(数据分片),每个 Tablet 多副本分布在不同 BE
  • 接收 FE 下发的查询 Fragment,在本地数据上执行向量化计算
  • 通过 Exchange 算子和其他 BE 交换中间结果(Shuffle)

BE 间通信走 gRPC,BE 与 FE 通信走 Thrift。

数据分布:Partition + Tablet

graph TD
    T["一张表 Table"]
    T --> P1["Partition(按时间/值范围分区)"]
    P1 --> B1["Bucket(Hash 分桶)"]
    B1 --> Tab["Tablet(最小存储单元,默认 3 副本)"]
    Tab --> RS["Rowset(一次导入产生一个 Rowset)"]
    RS --> Seg["Segment 文件(列存格式)"]

建表时指定分区键和分桶数:

CREATE TABLE orders (
    order_id    BIGINT,
    user_id     INT,
    order_date  DATE,
    amount      DECIMAL(10, 2)
)
PARTITION BY RANGE(order_date) (
    PARTITION p202501 VALUES LESS THAN ('2025-02-01'),
    PARTITION p202502 VALUES LESS THAN ('2025-03-01')
)
DISTRIBUTED BY HASH(user_id) BUCKETS 32
PROPERTIES ("replication_num" = "3");

查询时 FE 先做分区裁剪(Partition Pruning),只扫描满足条件的分区;再做分桶裁剪(Bucket Pruning),对等值条件只扫描对应的 Bucket,大幅减少扫描量。

二、四种数据模型

Doris 的数据模型决定了数据如何存储和聚合,是建表时最重要的选择。

1. Duplicate Key(明细模型)

CREATE TABLE logs (
    ts      DATETIME,
    user_id INT,
    action  VARCHAR(50),
    ...
) DUPLICATE KEY(ts, user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 16;

所有数据原样保存,允许完全重复的行。适合日志、事件流等不需要去重的场景。Key 列只用于数据排序(Sort Key),不做唯一性约束。

2. Aggregate Key(聚合模型)

CREATE TABLE user_stats (
    user_id     INT,
    date        DATE,
    pv          BIGINT SUM,      -- 页面浏览次数,SUM 聚合
    uv          BIGINT REPLACE,  -- 独立访客,取最新值
    max_amount  DECIMAL MAX      -- 最大金额
) AGGREGATE KEY(user_id, date)
DISTRIBUTED BY HASH(user_id) BUCKETS 16;

相同 Key 的行在导入时自动聚合(SUM/MAX/MIN/REPLACE 等)。适合预聚合报表,查询时数据量极小,但无法还原明细。

3. Unique Key(主键模型)

CREATE TABLE users (
    user_id   INT,
    name      VARCHAR(100),
    email     VARCHAR(100),
    updated   DATETIME
) UNIQUE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 16
PROPERTIES ("enable_unique_key_merge_on_write" = "true");

相同主键的行保留最新版本(REPLACE 语义),实现 Upsert。有两种实现方式:

  • Merge-on-Read(MoR):写入时不合并,查询时实时合并多个版本,写快读慢
  • Merge-on-Write(MoW,推荐):写入时立即合并,查询直接读最新数据,读快写稍慢,2.0+ 默认推荐

MoW 还支持行存(Row Store),为点查性能再提一个数量级:

PROPERTIES (
    "enable_unique_key_merge_on_write" = "true",
    "store_row_column" = "true"  -- 开启行存,加速点查
);

4. Primary Key(主键模型,2.0+)

Primary Key 是对 Unique Key MoW 的进一步增强,支持部分列更新(Partial Update),不需要提供所有列的值,只更新指定列:

-- 只更新 email 列,不影响其他列
INSERT INTO users (user_id, email) VALUES (1001, 'new@example.com');

适合宽表实时更新场景,如用户画像、商品属性实时同步。

三、存储引擎:Segment 列存

Segment 文件格式

Segment 文件结构:
┌─────────────────────────────────────────────┐
│  Column A Data (列数据块,按 1024 行一组)   │
│  Column B Data                               │
│  Column C Data                               │
│  ...                                         │
├─────────────────────────────────────────────┤
│  Ordinal Index   (行号 → 数据块偏移)        │
│  Short Key Index (前缀索引,每隔 1024 行)   │
│  ZoneMap Index   (每列的 min/max 值)        │
│  Bloom Filter    (高基数列的存在性过滤)     │
│  Bitmap Index    (低基数列的精确过滤)       │
├─────────────────────────────────────────────┤
│  Footer(列元信息、索引偏移)                 │
└─────────────────────────────────────────────┘

三层索引过滤

一个查询 WHERE date = '2025-01-15' AND status = 'paid' 的过滤流程:

flowchart TD
    Q["查询:WHERE date = '2025-01-15' AND status = 'paid'"]
    Q --> PP["① 分区裁剪 只读 date 所在分区 跳过其他月份所有 Segment"]
    PP --> ZM["② ZoneMap 过滤 每个 Segment 记录 date 列 min/max 跳过不含目标日期的 Segment"]
    ZM --> SK["③ Short Key 索引 Segment 内二分查找 定位目标数据块(1024 行粒度)"]
    SK --> BF["④ Bitmap / Bloom Filter 数据块内精确过滤 读取真正满足条件的行"]
    BF --> Result["最终结果 实际读取行数可能仅为全表的 1/1000"]
  1. 分区裁剪:只读 date 所在分区,跳过其他月份的所有 Segment
  2. ZoneMap 过滤:每个 Segment 记录了 date 列的 min/max,跳过不含目标日期的 Segment
  3. Short Key 索引:在 Segment 内做二分查找,定位到包含目标行的数据块(1024 行粒度)
  4. Bitmap/Bloom Filter:在数据块内进一步过滤,读取真正满足条件的行

每一层都能跳过大量数据,最终读取的实际行数可能只有全表的千分之一。

Compaction:后台合并

每次导入都会产生一个新的 Rowset,随着导入次数增加,Segment 文件越来越多,查询需要合并多个版本。Doris 后台持续做 Compaction:

  • Cumulative Compaction:将多个小 Rowset 合并成较大的 Rowset(增量合并)
  • Base Compaction:将所有 Rowset 合并成一个 Base Rowset(全量合并,触发频率低)

Compaction 完成后,查询无需再做版本合并,性能恢复到最优状态。

四、向量化执行引擎

Doris 1.2 版本引入全面向量化执行引擎,是查询性能提升的核心。

标量执行 vs 向量化执行

标量执行(逐行处理):
  for row in rows:
      result = row.amount * row.quantity   ← 每次处理 1 行

向量化执行(批量处理):
  for batch in batches:   ← 每批 1024 行
      result = amount_vec * quantity_vec   ← SIMD 一次处理 4/8/16 个值

向量化执行的两个核心收益:

  1. SIMD(Single Instruction Multiple Data):CPU 的 AVX2/AVX512 指令一次可以处理 8 个 32 位整数,相当于 8 倍吞吐量
  2. Cache 友好:列数据在内存中连续,批量读取的 Cache Miss 极低

Pipeline 执行模型(2.0+)

Doris 2.0 引入 Pipeline 执行模型,彻底解决了大查询阻塞小查询的问题:

传统阻塞模型:
  线程1: [Scan]────────[HashJoin]──[Agg]──[结果]   ← 占用线程直到完成

Pipeline 模型:
  Driver1: [Scan chunk] → 放入队列
  Driver2: [HashJoin chunk] ← 从队列取 → 放入队列
  Driver3: [Agg chunk] ← 从队列取 → 输出
  (各阶段独立调度,不阻塞,可以和其他查询的 Driver 穿插执行)

Pipeline 模型让 Doris 在高并发场景下资源利用率更高,小查询不再被大查询饿死。

五、数据导入

Doris 提供多种导入方式,适合不同的数据源和延迟要求:

Stream Load(推荐,实时导入)

# 通过 HTTP PUT 直接推送数据,原子性强,延迟秒级
curl -X PUT \
  -H "label: load_20250108_001" \
  -H "column_separator: ," \
  -T orders.csv \
  http://fe_host:8030/api/mydb/orders/_stream_load \
  -u admin:password

Stream Load 的特点:同步返回结果、事务性保证(要么全部成功要么全部失败)、适合小批量高频导入(每次几 MB 到几十 MB)。

Routine Load(Kafka 消费)

-- 创建 Routine Load 任务,持续消费 Kafka
CREATE ROUTINE LOAD mydb.orders_load ON orders
COLUMNS TERMINATED BY ","
PROPERTIES (
    "max_batch_interval" = "10",    -- 每 10 秒提交一批
    "max_batch_rows" = "200000",
    "format" = "json"
)
FROM KAFKA (
    "kafka_broker_list" = "broker1:9092,broker2:9092",
    "kafka_topic" = "orders_topic",
    "kafka_partitions" = "0,1,2,3",
    "kafka_offsets" = "OFFSET_BEGINNING"
);

Routine Load 在 BE 内部维护 Kafka Consumer,每隔指定时间提交一批数据,数据延迟通常在 10-30 秒内,无需外部调度。

Broker Load(批量离线导入)

-- 从 HDFS 批量导入
LOAD LABEL mydb.load_20250108 (
    DATA INFILE("hdfs://namenode/data/orders/2025-01-08/*.parquet")
    INTO TABLE orders
    FORMAT AS "parquet"
    (order_id, user_id, order_date, amount)
)
WITH BROKER hdfs_broker
("username" = "hdfs", "password" = "");

-- 查看导入进度
SHOW LOAD WHERE LABEL = "load_20250108";

INSERT INTO SELECT(跨表导入/ETL)

-- 从 ODS 层聚合写入 DWS 层
INSERT INTO dws_user_daily_stats
SELECT
    user_id,
    DATE(order_date) AS date,
    COUNT(*)          AS order_cnt,
    SUM(amount)       AS total_amount
FROM ods_orders
WHERE order_date >= '2025-01-08'
GROUP BY user_id, DATE(order_date);

导入方式对比

方式数据源延迟吞吐量适用场景
Stream LoadHTTP 推送秒级微服务直写、Flink sink
Routine LoadKafka10-30 秒中高实时流数据消费
Broker LoadHDFS/S3分钟级T+1 离线批量导入
INSERT INTODoris 内部秒-分钟内部 ETL、数仓分层

六、查询优化关键机制

Colocate Join:零 Shuffle 的 JOIN

普通分布式 JOIN 需要将数据 Shuffle 到同一节点才能 JOIN,网络开销大。Colocate Join 让两张表按相同的分桶键和分桶数分布,JOIN 时直接在本地数据上计算,完全避免网络传输:

-- 两张表使用相同的 Colocate Group
CREATE TABLE orders (
    user_id INT,
    ...
) DISTRIBUTED BY HASH(user_id) BUCKETS 32
PROPERTIES ("colocate_with" = "user_group");

CREATE TABLE users (
    user_id INT,
    ...
) DISTRIBUTED BY HASH(user_id) BUCKETS 32
PROPERTIES ("colocate_with" = "user_group");

-- 查询时自动走 Colocate Join,无 Shuffle
SELECT u.name, COUNT(*) as order_cnt
FROM orders o JOIN users u ON o.user_id = u.user_id
GROUP BY u.name;
graph LR
    subgraph NormalJoin[普通分布式 JOIN(需要 Shuffle)]
        O1["orders BE1: user_id=1,2"]
        O2["orders BE2: user_id=3,4"]
        U1["users BE1: user_id=3,4"]
        U2["users BE2: user_id=1,2"]
        O1 -->|"网络 Shuffle"| JOIN1["JOIN 节点"]
        O2 -->|"网络 Shuffle"| JOIN1
        U1 -->|"网络 Shuffle"| JOIN1
        U2 -->|"网络 Shuffle"| JOIN1
    end
    subgraph ColocateJoin[Colocate JOIN(零 Shuffle)]
        CO1["orders+users BE1: user_id=1,2 本地直接 JOIN"]
        CO2["orders+users BE2: user_id=3,4 本地直接 JOIN"]
    end

Colocate Join 的前提:两表分桶键相同、分桶数相同、副本数相同,且在同一个 Colocate Group 里。

物化视图(Materialized View)

Doris 支持同步物化视图和异步物化视图:

-- 同步物化视图:自动维护,查询自动路由
CREATE MATERIALIZED VIEW mv_user_daily
AS SELECT user_id, DATE(order_date), SUM(amount), COUNT(*)
FROM orders
GROUP BY user_id, DATE(order_date);

-- 查询 orders 时,如果命中物化视图,自动改写为查询 mv_user_daily
-- 无需修改查询 SQL,透明加速

SQL Cache 与 Partition Cache

  • SQL Cache:完全相同的 SQL + 参数,直接返回缓存结果,适合固定报表
  • Partition Cache:缓存已有分区的结果,只对新增分区重新计算,适合时间范围查询
-- 开启 SQL Cache
SET enable_sql_cache = true;

-- 查询时间范围报表,历史分区走缓存,只计算最新分区
SELECT date, SUM(amount)
FROM orders
WHERE order_date BETWEEN '2025-01-01' AND '2025-01-08'
GROUP BY date;

Runtime Filter

大表 JOIN 小表时,Doris 会先扫描小表,构建一个 Bloom Filter,然后下推到大表的扫描层,在读取数据时就过滤掉不可能匹配的行:

SELECT * FROM big_table b JOIN small_table s ON b.id = s.id
WHERE s.type = 'VIP'

执行流程:
1. 扫描 small_table,过滤 type='VIP',得到 id 集合 {1, 5, 9, ...}
2. 构建 Bloom Filter,下推到 big_table 的 scan 阶段
3. big_table 在读取时,每行先过 Bloom Filter,不在集合里直接跳过
4. 大幅减少 big_table 实际读取的行数

七、资源隔离:Workload Group

高并发场景下,不同业务的查询需要隔离,防止一个大查询拖垮所有人:

-- 创建 Workload Group,限制 CPU 和内存
CREATE WORKLOAD GROUP report_group
PROPERTIES (
    "cpu_share"          = "10",    -- CPU 权重
    "memory_limit"       = "30%",   -- 最多使用 30% 内存
    "max_concurrency"    = "20",    -- 最大并发查询数
    "max_queue_size"     = "100",   -- 排队上限
    "queue_timeout"      = "300"    -- 排队超时秒数
);

-- 将用户绑定到 Workload Group
ALTER USER report_user DEFAULT WORKLOAD GROUP "report_group";

Workload Group 让不同优先级的业务(实时大屏 vs 离线报表 vs 临时分析)共享集群资源的同时互不影响。

八、Multi-Catalog:联邦查询

Doris 2.0+ 引入 Multi-Catalog,可以直接查询外部数据源,无需导入:

-- 创建 Hive Catalog
CREATE CATALOG hive_catalog PROPERTIES (
    "type" = "hms",
    "hive.metastore.uris" = "thrift://hms-host:9083"
);

-- 直接查询 Hive 表
SELECT * FROM hive_catalog.dwd.orders LIMIT 10;

-- 跨 Catalog JOIN(Hive 大表 + Doris 维度表)
SELECT h.order_id, d.user_name
FROM hive_catalog.dwd.orders h
JOIN doris_internal.dim.users d ON h.user_id = d.user_id
WHERE h.order_date = '2025-01-08';

支持的外部 Catalog:Hive、Iceberg、Hudi、Delta Lake、MySQL、PostgreSQL、Elasticsearch、S3/HDFS 上的 Parquet/ORC 文件。

九、实战建表要点

数据模型选择

场景推荐模型原因
日志、埋点、事件流Duplicate Key保留所有明细,不去重
预聚合报表(UV/PV/金额)Aggregate Key导入时预聚合,查询极快
业务实体(用户、订单状态)Unique Key MoW支持 Upsert,保证唯一性
宽表实时更新(画像标签)Primary Key支持部分列更新

分桶数选择

推荐:每个 Tablet 的数据量控制在 1-10 GB
公式:分桶数 = 总数据量 / (3GB * BE节点数) 取整,再向上取 2 的幂次

示例:100GB 数据,3 个 BE 节点
  分桶数 ≈ 100 / (3 * 3) ≈ 11,向上取 16

常见坑

  • 分桶键选错:分桶键应选择查询里最常用于 JOIN 或 WHERE 等值过滤的列,而不是时间列(时间列适合做分区键)
  • 分桶数过少:导致单个 Tablet 过大,查询时并行度不足,性能差
  • Aggregate 模型误用:Aggregate 模型的非 Key 列必须指定聚合函数,不能存原始明细,如果既要聚合又要明细,应用 Duplicate Key + 物化视图
  • Unique Key 忘开 MoW:默认的 MoR 模式在读多写少场景性能差,生产建议加 "enable_unique_key_merge_on_write" = "true"
  • 导入频率过高:每次导入产生一个 Rowset,频繁小批量导入(<1 秒一次)会导致 Compaction 跟不上,查询性能下降,建议 Stream Load 批间隔 ≥ 10 秒

十、关键点总结

  • FE/BE 分离架构:FE 管元数据和查询计划,BE 管存储和计算,无外部依赖,运维简单
  • 四种数据模型:Duplicate(明细)/ Aggregate(预聚合)/ Unique Key(Upsert)/ Primary Key(部分列更新),根据业务特性选择
  • Segment 列存 + 三层索引:ZoneMap → Short Key → Bitmap/Bloom Filter,层层过滤,极致减少 I/O
  • 向量化 + Pipeline:批量 SIMD 计算 + 非阻塞调度,高并发下吞吐量和延迟兼顾
  • Routine Load:BE 内置 Kafka Consumer,秒级到分钟级实时导入,无需外部调度
  • Colocate Join:同 Colocate Group 的表 JOIN 零 Shuffle,是 Doris 多表查询性能超越 ClickHouse 的核心武器
  • Workload Group:CPU/内存隔离,高优先级业务不被大查询拖垮
  • Multi-Catalog:直接查 Hive/Iceberg/MySQL,打通数仓上下游,无需数据迁移