手写 SQLite 09:索引 B-Tree 与加速查询

这是《手写 SQLite》系列的最终篇。回顾一下我们走过的路:从第 01 篇读取 100 字节的文件头,到第 02 篇解析页结构,第 03 篇遍历 B-Tree,第 04 篇解码 varint 与 Record,第 05 篇读取 sqlite_schema,第 06 篇输出带列名的 SELECT *,第 07 篇实现 SELECT count(*),第 08 篇支持 WHERE 条件过滤——现在我们为这个系列画上圆满的句号:读取索引 B-Tree,利用索引加速 WHERE 查询

经过本篇,当用户对已建立索引的列执行 WHERE 查询时,我们的 SQLite 读取器将通过索引 B-Tree 获取匹配行的 rowid,再凭 rowid 直接到表 B-Tree 中取行,而不是逐行解码全部数据。

一、索引 B-Tree 结构 vs 表 B-Tree

SQLite 有两种 B-Tree:表 B-Tree(Table B-Tree)索引 B-Tree(Index B-Tree),它们的叶子节点 Cell 格式不同。

表 B-Tree 叶子 Cell 格式

[ payload_size (varint) ][ rowid (varint) ][ payload (Record) ]
                                              ↑ 存放实际列数据

rowid 作为 Cell 元数据单独存储,Record 里不包含 rowid(INTEGER PRIMARY KEY 列在 Record 中的 Serial Type 为 0)。

索引 B-Tree 叶子 Cell 格式

[ payload_size (varint) ][ payload (Record) ]
                           ↑ Record 包含: 索引列值 + rowid(最后一列)

索引 B-Tree 没有单独的 rowid 字段——rowid 被打包进 Record 的最后一列。这是两种 B-Tree 最关键的区别。

结构对比

表 B-Tree 叶子 Cell:
  ┌──────────────┬────────┬──────────────────────────────────┐
  │ payload_size │ rowid  │ Record: [NULL, "Alice", 30]      │
  │  (varint)    │(varint)│  (id 列为 NULL,用 rowid 填充)   │
  └──────────────┴────────┴──────────────────────────────────┘

索引 B-Tree 叶子 Cell(对 name 列建索引):
  ┌──────────────┬──────────────────────────────────────────┐
  │ payload_size │ Record: ["Alice", 1]                     │
  │  (varint)    │  (索引列值 + rowid 作为最后一列)         │
  └──────────────┴──────────────────────────────────────────┘

索引 B-Tree 按索引列的值排序,因此可以对索引列做二分查找(B-Tree 搜索),找到目标值后从 Record 的最后一列取出 rowid,再回到表 B-Tree 中定位完整行。

二、SQLite 如何使用索引

当执行如下语句时:

CREATE INDEX idx_name ON users(name);

SQLite 在 sqlite_schema 中新增一条记录:

type      = 'index'
name      = 'idx_name'
tbl_name  = 'users'
rootpage  = N        ← 索引 B-Tree 的根页号
sql       = 'CREATE INDEX idx_name ON users(name)'

索引 B-Tree 的根页与表 B-Tree 的根页完全独立,各自占用不同的页号。索引 B-Tree 按 name 列的值排序存储,每条记录形如 ["Alice", 1]["Bob", 2]["Charlie", 3]

查询 SELECT * FROM users WHERE name = 'Alice' 的执行流程:

  1. sqlite_schema 中查找 tbl_name='users'type='index' 的条目,找到索引根页
  2. 遍历索引 B-Tree,找到 name='Alice' 的条目,从 Record 最后一列取出 rowid(如 1)
  3. 遍历表 B-Tree,找到 rowid=1 的 Cell,解码完整行数据并返回

步骤 2 跳过了对整张表所有行的 Record 解码,步骤 3 只解码一行——这就是索引加速的本质。

三、读取索引条目

查找索引根页

// src/index.rs(新建)

use crate::pager::Pager;
use crate::schema::read_schema;

/// 在 sqlite_schema 中查找指定表上的索引根页号
/// 如果存在多个索引,返回第一个匹配的
pub fn find_index_root_page(
    pager: &mut Pager,
    table_name: &str,
    index_col: Option<&str>,  // 若为 None 则返回该表任意索引
) -> Option<u32> {
    let entries = read_schema(pager);
    for entry in &entries {
        if entry.object_type != "index" {
            continue;
        }
        if entry.tbl_name.as_deref() != Some(table_name) {
            continue;
        }
        // 如果调用方指定了列名,检查 sql 中是否包含该列
        if let Some(col) = index_col {
            let sql_upper = entry.sql.as_deref().unwrap_or("").to_uppercase();
            let col_upper = col.to_uppercase();
            if !sql_upper.contains(&col_upper) {
                continue;
            }
        }
        return Some(entry.root_page);
    }
    None
}

解析索引叶子 Cell

// src/index.rs(续)

use crate::varint::read_varint;
use crate::record::{parse_record, Value};

/// 解析索引 B-Tree 叶子 Cell
/// 返回 (key_values, rowid)
/// key_values: 索引列的值(可能是多列索引)
/// rowid: Record 最后一列即为 rowid
pub fn parse_index_leaf_cell(data: &[u8], offset: usize) -> (Vec<Value>, i64) {
    let mut pos = offset;

    // 1. 读取 payload_size(varint)
    let (payload_size, n) = read_varint(&data[pos..]);
    pos += n;
    let _ = payload_size; // 本实现直接解析,不做溢出页处理

    // 2. 索引叶子 Cell 没有单独的 rowid 字段,直接是 payload(Record)
    let record_data = &data[pos..];
    let mut values = parse_record(record_data);

    // 3. Record 最后一列是 rowid
    let rowid = if let Some(last) = values.pop() {
        match last {
            Value::Integer(n) => n,
            _ => 0,
        }
    } else {
        0
    };

    (values, rowid)
}

注意 parse_record 是我们在第 04 篇实现的函数,直接复用即可。唯一的差异是:索引 Cell 比表 Cell 少了一个前置 rowid varint,其余解析逻辑完全相同。

四、全量索引扫描实现

我们先实现最简单的形式:遍历整棵索引 B-Tree,收集所有 (key_value, rowid) 对,然后在内存中过滤。虽然这不是真正的 B-Tree 查找(未实现二分定位),但相比全表扫描,省去了对每行完整 Record 的解码,对宽表(列多、行大)有明显加速。

// src/index.rs(续)

use crate::btree::collect_leaf_cells;

/// 扫描索引 B-Tree,返回所有 (第一个索引列值, rowid) 对
/// 多列索引时 key_values 包含多个值;此处只返回第一列用于过滤
pub fn scan_index(
    pager: &mut Pager,
    index_root_page: u32,
) -> Vec<(Value, i64)> {
    let cells = collect_leaf_cells(pager, index_root_page);
    let mut result = Vec::new();

    for cell_addr in &cells {
        let page_data = match pager.read_page(cell_addr.page_num) {
            Ok(d) => d,
            Err(_) => continue,
        };
        let (mut key_values, rowid) =
            parse_index_leaf_cell(&page_data, cell_addr.offset as usize);

        // 取第一列作为 key
        let key = if key_values.is_empty() {
            Value::Null
        } else {
            key_values.remove(0)
        };
        result.push((key, rowid));
    }
    result
}

collect_leaf_cells 是第 03 篇实现的通用 B-Tree 叶子遍历器,对表 B-Tree 和索引 B-Tree 都适用——因为它只关心页类型(0x0D = 叶子)和 Cell 偏移表,不关心 Cell 内容。

五、按 rowid 查找行

拿到 rowid 之后,需要在表 B-Tree 中定位对应的完整行。标准 SQLite 的表 B-Tree 是按 rowid 有序的 B+ 树,可以做 O(log N) 的二分查找。我们的简化实现做线性扫描——遍历所有叶子 Cell,比较 rowid,找到即返回。

// src/sql.rs(新增函数)

use crate::btree::collect_leaf_cells;
use crate::pager::Pager;
use crate::record::{parse_leaf_cell, Value};
use crate::schema::ColumnDef;

/// 在表 B-Tree 中按 rowid 查找指定行
/// 返回解码后的列值列表(已按 col_defs 填充 rowid 别名列)
pub fn lookup_by_rowid(
    pager: &mut Pager,
    table_root_page: u32,
    target_rowid: i64,
    col_defs: &[ColumnDef],
) -> Option<Vec<String>> {
    let cells = collect_leaf_cells(pager, table_root_page);

    for cell_addr in &cells {
        let page_data = pager.read_page(cell_addr.page_num).ok()?;
        let (rowid, values) = parse_leaf_cell(&page_data, cell_addr.offset as usize);

        if rowid != target_rowid {
            continue;
        }

        // rowid 匹配,按列定义组装行
        let mut row = Vec::new();
        let mut val_idx = 0usize;

        for col_def in col_defs {
            if col_def.is_rowid_alias {
                row.push(rowid.to_string());
            } else {
                let v = values.get(val_idx)
                    .map(value_to_string)
                    .unwrap_or_else(|| "NULL".to_string());
                row.push(v);
                val_idx += 1;
            }
        }
        return Some(row);
    }
    None
}

fn value_to_string(v: &Value) -> String {
    match v {
        Value::Null       => "NULL".to_string(),
        Value::Integer(n) => n.to_string(),
        Value::Float(f)   => {
            if *f == f.trunc() && f.abs() < 1e15 {
                format!("{}", *f as i64)
            } else {
                format!("{}", f)
            }
        }
        Value::Text(s)    => s.clone(),
        Value::Blob(b)    => format!("<blob {}>", b.len()),
    }
}

这里的 parse_leaf_cellvalue_to_stringColumnDef 均来自前几篇的实现,本篇无需修改它们。

六、整合:索引辅助的 WHERE 查询

现在把所有部件组装起来。策略很简单:

  1. 检查 WHERE 列是否有对应的索引(查 sqlite_schema
  2. 如果有,走索引路径:扫描索引 B-Tree → 过滤匹配 rowid → 按 rowid 取完整行
  3. 如果没有,回退到第 08 篇实现的全表扫描 + 逐行过滤
// src/sql.rs(更新 select_where 函数)

use crate::index::{find_index_root_page, scan_index};
use crate::record::Value;
use crate::schema::{read_schema, parse_column_defs};
use crate::pager::Pager;

pub struct QueryResult {
    pub columns: Vec<String>,
    pub rows:    Vec<Vec<String>>,
}

/// 执行 SELECT * FROM <table> WHERE <col> = <val>
pub fn select_where(
    pager: &mut Pager,
    table_name: &str,
    where_col: &str,
    where_val: &str,
) -> Result<QueryResult, String> {
    // 1. 读取表定义
    let entries = read_schema(pager);
    let entry = entries.iter()
        .find(|e| e.object_type == "table" && e.name == table_name)
        .ok_or_else(|| format!("table '{}' not found", table_name))?;

    let table_root = entry.root_page;
    let col_defs = entry.sql.as_deref()
        .map(parse_column_defs)
        .unwrap_or_default();
    let col_names: Vec<String> = col_defs.iter().map(|c| c.name.clone()).collect();

    // 2. 检查是否存在对应列的索引
    let index_root = find_index_root_page(pager, table_name, Some(where_col));

    let rows = if let Some(idx_root) = index_root {
        // —— 索引路径 ——
        // 2a. 扫描索引 B-Tree,收集匹配 rowid
        let index_entries = scan_index(pager, idx_root);
        let matched_rowids: Vec<i64> = index_entries
            .into_iter()
            .filter(|(key, _rowid)| value_matches(key, where_val))
            .map(|(_key, rowid)| rowid)
            .collect();

        // 2b. 按 rowid 从表 B-Tree 取完整行
        let mut rows = Vec::new();
        for rowid in matched_rowids {
            if let Some(row) = lookup_by_rowid(pager, table_root, rowid, &col_defs) {
                rows.push(row);
            }
        }
        rows
    } else {
        // —— 全表扫描回退路径(第 08 篇实现)——
        select_all_filtered(pager, table_root, &col_defs, where_col, where_val)
    };

    Ok(QueryResult { columns: col_names, rows })
}

/// 判断索引键值是否与目标字符串匹配(等值比较)
fn value_matches(v: &Value, target: &str) -> bool {
    match v {
        Value::Text(s)    => s == target,
        Value::Integer(n) => n.to_string() == target,
        Value::Float(f)   => {
            // 整数形式的浮点数也能匹配,如 1.0 匹配 "1"
            if *f == f.trunc() && f.abs() < 1e15 {
                format!("{}", *f as i64) == target
            } else {
                format!("{}", f) == target
            }
        }
        Value::Null => target.eq_ignore_ascii_case("null"),
        Value::Blob(_) => false,
    }
}

/// 全表扫描 + 过滤(回退路径,来自第 08 篇)
fn select_all_filtered(
    pager: &mut Pager,
    root_page: u32,
    col_defs: &[crate::schema::ColumnDef],
    where_col: &str,
    where_val: &str,
) -> Vec<Vec<String>> {
    use crate::btree::collect_leaf_cells;
    use crate::record::parse_leaf_cell;

    // 找出 where_col 在列定义中的索引位置(考虑 rowid 别名偏移)
    let col_pos = col_defs.iter().position(|c| c.name.eq_ignore_ascii_case(where_col));
    let cells = collect_leaf_cells(pager, root_page);
    let mut rows = Vec::new();

    for cell_addr in &cells {
        let page_data = match pager.read_page(cell_addr.page_num) {
            Ok(d) => d,
            Err(_) => continue,
        };
        let (rowid, values) = parse_leaf_cell(&page_data, cell_addr.offset as usize);

        // 组装完整行
        let mut row = Vec::new();
        let mut val_idx = 0usize;
        for col_def in col_defs {
            if col_def.is_rowid_alias {
                row.push(rowid.to_string());
            } else {
                let v = values.get(val_idx)
                    .map(value_to_string)
                    .unwrap_or_else(|| "NULL".to_string());
                row.push(v);
                val_idx += 1;
            }
        }

        // 过滤:检查 where 列是否等于目标值
        let matches = col_pos
            .and_then(|i| row.get(i))
            .map(|cell_val| cell_val == where_val)
            .unwrap_or(false);

        if matches {
            rows.push(row);
        }
    }
    rows
}

七、测试

建立索引

# 为 users 表的 name 列创建索引
sqlite3 test.db "CREATE INDEX idx_name ON users(name)"

# 确认索引已写入 sqlite_schema
sqlite3 test.db "SELECT type, name, tbl_name, rootpage FROM sqlite_schema"
table|users|users|2
index|idx_name|users|3

索引根页为 3,与表根页 2 独立。

更新 main.rs

// src/main.rs

mod header; mod page; mod pager; mod btree;
mod varint; mod record; mod schema; mod sql; mod index;

use header::DbHeader;
use pager::Pager;
use schema::read_schema;
use sql::{select_all, select_where};

fn main() {
    let args: Vec<String> = std::env::args().collect();
    if args.len() < 3 {
        eprintln!("Usage: sqlite-rs <db> <.tables | SQL>");
        std::process::exit(1);
    }

    let db_path = &args[1];
    let command = args[2..].join(" ");
    let db_header = DbHeader::read_from_file(db_path).unwrap();
    let mut pager = Pager::open(db_path, db_header.page_size).unwrap();

    if command == ".tables" {
        let entries = read_schema(&mut pager);
        let names: Vec<&str> = entries.iter()
            .filter(|e| e.object_type == "table" && !e.name.starts_with("sqlite_"))
            .map(|e| e.name.as_str())
            .collect();
        println!("{}", names.join(" "));
        return;
    }

    // SELECT * FROM <table> WHERE <col> = '<val>'
    if let Some((table, col, val)) = parse_select_where(&command) {
        match select_where(&mut pager, table, col, val) {
            Ok(result) => {
                println!("{}", result.columns.join("|"));
                for row in &result.rows {
                    println!("{}", row.join("|"));
                }
            }
            Err(e) => { eprintln!("Error: {}", e); std::process::exit(1); }
        }
        return;
    }

    // SELECT * FROM <table>
    if let Some(table_name) = parse_select_star(&command) {
        match select_all(&mut pager, table_name) {
            Ok(result) => {
                println!("{}", result.columns.join("|"));
                for row in &result.rows {
                    println!("{}", row.join("|"));
                }
            }
            Err(e) => { eprintln!("Error: {}", e); std::process::exit(1); }
        }
        return;
    }

    eprintln!("Unsupported command: {}", command);
    std::process::exit(1);
}

fn parse_select_star(sql: &str) -> Option<&str> {
    let upper = sql.to_uppercase();
    let rest = upper.strip_prefix("SELECT")?.trim_start()
        .strip_prefix('*')?.trim_start()
        .strip_prefix("FROM")?.trim_start();
    // 停在 WHERE 或字符串末尾
    let table_end = rest.find(|c: char| c.is_whitespace()).unwrap_or(rest.len());
    let offset = sql.len() - rest.len();
    Some(sql[offset..offset + table_end].trim())
}

/// 解析 "SELECT * FROM users WHERE name = 'Alice'"
/// 返回 (table_name, col_name, value)(value 不含引号)
fn parse_select_where(sql: &str) -> Option<(&str, &str, &str)> {
    let upper = sql.to_uppercase();
    // 必须包含 WHERE
    let where_pos = upper.find("WHERE")?;

    // 表名:FROM 和 WHERE 之间
    let from_pos = upper.find("FROM")?;
    let table_str = sql[from_pos + 4..where_pos].trim();
    // 取第一个 token(去掉多余内容)
    let table_name = table_str.split_whitespace().next()?;

    // WHERE 子句
    let where_clause = sql[where_pos + 5..].trim();
    // 期望格式:col = 'val' 或 col = val
    let eq_pos = where_clause.find('=')?;
    let col = where_clause[..eq_pos].trim();
    let raw_val = where_clause[eq_pos + 1..].trim();
    // 去掉引号
    let val = raw_val.trim_matches('\'').trim_matches('"');

    Some((table_name, col, val))
}

执行查询验证

cargo run -- test.db "SELECT * FROM users WHERE name = 'Alice'"
id|name|age
1|Alice|30

查询走了索引路径:先在索引 B-Tree 找到 ["Alice", 1],取出 rowid=1,然后在表 B-Tree 中找到 rowid=1 的行并解码,输出结果。

# 无索引的列,自动回退全表扫描
cargo run -- test.db "SELECT * FROM users WHERE age = '30'"
id|name|age
1|Alice|30

age 列没有索引,走全表扫描路径,结果同样正确。

# 大表测试:500 行,name 列有索引
cargo run -- big.db "SELECT * FROM users WHERE name = 'User250'"
id|name|age
250|User250|270

索引路径只解码了一行的完整 Record,而不是 500 行,速度显著优于全表扫描。

八、系列回顾:我们构建了什么

全部九篇一览

标题核心实现
01文件头解析读取 100 字节数据库头,提取页大小、页数、编码等元信息
02页结构与 Pager解析 B-Tree 页头(8/12 字节),实现 Pager 页读取缓存
03遍历 B-Tree递归遍历内部页与叶子页,收集所有叶子 Cell 地址
04Record 格式解析实现 varint 解码,解析 Record header + Serial Type,提取列值
05Schema 解析与 .tables读取 sqlite_schema 表,实现 .tables 命令
06SELECT * 带列名输出从 CREATE TABLE SQL 解析列定义,处理 INTEGER PRIMARY KEY 别名
07SELECT count(*)利用 B-Tree 叶子 Cell 计数,不解码 Record,O(N pages) 统计
08WHERE 条件过滤全表扫描 + 内存过滤,支持等值 WHERE 条件
09索引 B-Tree 与加速查询解析索引叶子 Cell,通过索引获取 rowid,再按 rowid 取行

最终代码结构

sqlite-rs/
└── src/
    ├── main.rs     ← 命令路由:.tables / SELECT * / SELECT WHERE
    ├── header.rs   ← 文件头解析(第 01 篇)
    ├── page.rs     ← 页头 + Cell 指针数组(第 02 篇)
    ├── pager.rs    ← 页读取器与缓存(第 02 篇)
    ├── btree.rs    ← B-Tree 递归遍历,收集叶子 Cell(第 03 篇)
    ├── varint.rs   ← varint 解码(第 04 篇)
    ├── record.rs   ← Record 格式解析,Serial Type 到 Value(第 04 篇)
    ├── schema.rs   ← sqlite_schema 读取 + CREATE TABLE 列定义解析(第 05、06 篇)
    ├── sql.rs      ← SQL 执行器:SELECT * / count(*) / WHERE 过滤(第 06-08 篇)
    └── index.rs    ← 索引 B-Tree 扫描 + rowid 定位(第 09 篇)

未实现的功能

我们构建了一个只读的、无事务的最简 SQLite 读取器。以下功能超出了本系列的范围:

  • WAL 模式(Write-Ahead Log):本系列只处理 journal_mode=delete 的传统格式,WAL 模式有独立的 .wal 文件和 shm 文件
  • 溢出页(Overflow Pages):当一个 Cell 的 payload 超过页内限制时,剩余数据存储在溢出链表中;本系列假设所有 Cell 都在单页内
  • 写操作(INSERT / UPDATE / DELETE):涉及页分裂、页合并、空闲页管理等复杂逻辑
  • 事务与崩溃恢复:journal 文件的读取与应用
  • 查询优化器:真正的 SQLite 有基于代价估算的查询计划,我们只做了简单的"有索引就用"规则
  • 真正的 B-Tree 查找:我们的索引扫描是全量遍历后过滤,真正的实现应该利用 B-Tree 有序性做二分查找,复杂度从 O(N) 降到 O(log N)
  • 多列索引、覆盖索引、表达式索引:本系列只处理了最简单的单列等值索引

进一步阅读

九、关键点总结

  • 索引 B-Tree 的叶子 Cell 格式:payload_size(varint) + payload(Record),没有单独的 rowid 字段——rowid 是 Record 的最后一列
  • 表 B-Tree 的叶子 Cell 格式:payload_size(varint) + rowid(varint) + payload(Record),rowid 单独存储,Record 中的 INTEGER PRIMARY KEY 列为 NULL
  • sqlite_schematype='index' 的条目记录了索引名、所属表和根页号,通过根页号找到索引 B-Tree
  • 索引辅助查询的流程:扫描索引 B-Tree 得到匹配的 rowid 集合 → 按 rowid 到表 B-Tree 中取完整行
  • 没有索引时自动回退全表扫描,保证查询结果的正确性
  • collect_leaf_cells 对表 B-Tree 和索引 B-Tree 通用,差异只在 Cell 内容的解析层
  • 至此,一个具备基本读取能力的手写 SQLite 解析器已经完成:从文件头读取到 B-Tree 遍历,从 Record 解码到索引加速查询,涵盖了 SQLite 文件格式的核心机制