手写 SQLite 04:Record 格式解析——varint、Serial Type 与列值还原

前三篇我们已经能从 B-Tree 遍历中拿到每一行的 (页号, 页内偏移)。这一篇解决最关键的问题:怎么把那个偏移处的字节流解码成真正的列值——比如 (id=42, name="Alice", age=30)

这需要理解两个核心编码:varint(可变长整数)和 Serial Type(列类型描述符)。

一、叶节点 Cell 的完整格式

叶节点表(LeafTable)的每个 Cell 格式如下:

┌─────────────────────────────────────────────────────────┐
│  Payload 字节数   (varint)                               │
├─────────────────────────────────────────────────────────┤
│  rowid            (varint)                               │
├─────────────────────────────────────────────────────────┤
│  Payload(Record)                                      │
│  ┌─────────────────────────────────────────────────┐   │
│  │ Header 字节数  (varint)                          │   │
│  │ Serial Type[0] (varint) ← 第 0 列的类型          │   │
│  │ Serial Type[1] (varint) ← 第 1 列的类型          │   │
│  │ ...                                              │   │
│  ├─────────────────────────────────────────────────┤   │
│  │ 列值[0]  (长度由 Serial Type[0] 决定)             │   │
│  │ 列值[1]  (长度由 Serial Type[1] 决定)             │   │
│  │ ...                                              │   │
│  └─────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────┘

关键点:Header 先描述所有列的类型,Body 再依次存列值。解码时先读 Header 拿到类型列表,再按类型从 Body 读对应长度的字节。

二、varint 编码

varint(可变长整数)是 SQLite 用于节省空间的整数编码。规则:

  • 每个字节的最高位(bit 7)是延续标志:1 表示后面还有字节,0 表示这是最后一个字节
  • 每个字节的低 7 位是实际数据位
  • 最多 9 个字节,第 9 个字节用全部 8 位存数据
  • 小整数只需 1 字节,大整数最多 9 字节
示例:
  数值 50    → 0x32           (1 字节,bit7=0 表示结束)
  数值 200   → 0x81 0x48      (2 字节,0x81 的 bit7=1 继续,0x48 的 bit7=0 结束)
              取低7位:0000001 0101000 → 1001001000 → 200
  数值 16384 → 0x81 0x80 0x00 (3 字节)
// src/varint.rs

/// 从字节切片的指定位置解码一个 varint
/// 返回 (值, 消耗的字节数)
pub fn decode_varint(data: &[u8], pos: usize) -> (i64, usize) {
    let mut result: i64 = 0;
    let mut bytes_read = 0;

    for i in 0..9 {
        let byte = data[pos + i];
        bytes_read += 1;

        if i == 8 {
            // 第 9 个字节:全部 8 位都是数据位
            result = (result << 8) | (byte as i64);
            break;
        } else {
            // 前 8 个字节:低 7 位是数据
            result = (result << 7) | ((byte & 0x7F) as i64);
            if byte & 0x80 == 0 {
                // bit7 = 0,结束
                break;
            }
        }
    }

    (result, bytes_read)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_single_byte() {
        assert_eq!(decode_varint(&[0x32], 0), (50, 1));
        assert_eq!(decode_varint(&[0x00], 0), (0, 1));
        assert_eq!(decode_varint(&[0x7F], 0), (127, 1));
    }

    #[test]
    fn test_two_bytes() {
        // 200 = 0b11001000 → varint: 0x81 0x48
        assert_eq!(decode_varint(&[0x81, 0x48], 0), (200, 2));
    }
}

三、Serial Type:列类型描述符

Record Header 中每列对应一个 varint 类型描述符(Serial Type),它同时编码了类型字节长度

Serial Type 值含义数据字节数
0NULL0
18 位有符号整数1
216 位有符号整数(大端)2
324 位有符号整数(大端)3
432 位有符号整数(大端)4
548 位有符号整数(大端)6
664 位有符号整数(大端)8
7IEEE 754 64 位浮点数8
8整数 0(不占字节)0
9整数 1(不占字节)0
10, 11保留
N ≥ 12,偶数BLOB,长度 = (N - 12) / 2(N-12)/2
N ≥ 13,奇数TEXT,长度 = (N - 13) / 2(N-13)/2

TEXT 和 BLOB 的长度编码在 Serial Type 里,不需要单独存长度字段——这是非常紧凑的设计。

// src/record.rs

use crate::varint::decode_varint;

/// SQLite 列值
#[derive(Debug, Clone)]
pub enum Value {
    Null,
    Integer(i64),
    Float(f64),
    Text(String),
    Blob(Vec<u8>),
}

impl std::fmt::Display for Value {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Value::Null        => write!(f, "NULL"),
            Value::Integer(n)  => write!(f, "{}", n),
            Value::Float(n)    => write!(f, "{}", n),
            Value::Text(s)     => write!(f, "{}", s),
            Value::Blob(b)     => write!(f, "<blob {} bytes>", b.len()),
        }
    }
}

/// Serial Type 对应的字节长度
fn serial_type_size(st: i64) -> usize {
    match st {
        0       => 0,  // NULL
        1       => 1,
        2       => 2,
        3       => 3,
        4       => 4,
        5       => 6,
        6 | 7   => 8,
        8 | 9   => 0,  // 常量 0 / 1,不占字节
        n if n >= 12 && n % 2 == 0 => ((n - 12) / 2) as usize,  // BLOB
        n if n >= 13 && n % 2 == 1 => ((n - 13) / 2) as usize,  // TEXT
        _ => panic!("invalid serial type: {}", st),
    }
}

/// 从字节流的指定偏移读取一个列值
fn read_value(data: &[u8], offset: usize, serial_type: i64) -> Value {
    let d = &data[offset..];
    match serial_type {
        0 => Value::Null,
        1 => Value::Integer(d[0] as i8 as i64),
        2 => Value::Integer(i16::from_be_bytes([d[0], d[1]]) as i64),
        3 => {
            // 24 位有符号整数,手动处理符号扩展
            let raw = ((d[0] as i32) << 16) | ((d[1] as i32) << 8) | (d[2] as i32);
            let signed = if raw & 0x800000 != 0 { raw | !0xFFFFFF } else { raw };
            Value::Integer(signed as i64)
        }
        4 => Value::Integer(i32::from_be_bytes([d[0], d[1], d[2], d[3]]) as i64),
        5 => {
            // 48 位有符号整数
            let raw = ((d[0] as i64) << 40)
                    | ((d[1] as i64) << 32)
                    | ((d[2] as i64) << 24)
                    | ((d[3] as i64) << 16)
                    | ((d[4] as i64) << 8)
                    |  (d[5] as i64);
            let signed = if raw & (1 << 47) != 0 { raw | (!0i64 << 48) } else { raw };
            Value::Integer(signed)
        }
        6 => Value::Integer(i64::from_be_bytes([d[0],d[1],d[2],d[3],d[4],d[5],d[6],d[7]])),
        7 => Value::Float(f64::from_be_bytes([d[0],d[1],d[2],d[3],d[4],d[5],d[6],d[7]])),
        8 => Value::Integer(0),
        9 => Value::Integer(1),
        n if n >= 12 && n % 2 == 0 => {
            let len = ((n - 12) / 2) as usize;
            Value::Blob(d[..len].to_vec())
        }
        n if n >= 13 && n % 2 == 1 => {
            let len = ((n - 13) / 2) as usize;
            Value::Text(String::from_utf8_lossy(&d[..len]).into_owned())
        }
        _ => panic!("invalid serial type: {}", serial_type),
    }
}

解析完整 Record

// src/record.rs (续)

/// 解析叶节点 Cell,返回 (rowid, 列值列表)
pub fn parse_leaf_cell(page_data: &[u8], cell_offset: usize) -> (i64, Vec<Value>) {
    let mut pos = cell_offset;

    // 1. 读取 Payload 总字节数(varint)
    let (payload_size, n) = decode_varint(page_data, pos);
    pos += n;

    // 2. 读取 rowid(varint)
    let (rowid, n) = decode_varint(page_data, pos);
    pos += n;

    // 3. 解析 Record(Payload 从 pos 开始)
    let payload_start = pos;

    // 3a. 读取 Record Header 长度(varint)
    let (header_size, n) = decode_varint(page_data, pos);
    pos += n;

    // 3b. 读取所有 Serial Type
    let header_end = payload_start + header_size as usize;
    let mut serial_types = Vec::new();
    while pos < header_end {
        let (st, n) = decode_varint(page_data, pos);
        pos += n;
        serial_types.push(st);
    }

    // 3c. 按 Serial Type 依次读取列值
    let mut values = Vec::new();
    for st in &serial_types {
        let value = read_value(page_data, pos, *st);
        pos += serial_type_size(*st);
        values.push(value);
    }

    (rowid, values)
}

四、组装验证

更新 main.rs,遍历 B-Tree 并打印所有行:

// src/main.rs

mod header;
mod page;
mod pager;
mod btree;
mod varint;
mod record;

use header::DbHeader;
use btree::collect_leaf_cells;
use pager::Pager;
use record::parse_leaf_cell;

fn main() {
    let path = std::env::args().nth(1).unwrap_or_else(|| {
        eprintln!("Usage: sqlite-rs <database.db>");
        std::process::exit(1);
    });

    let db_header = DbHeader::read_from_file(&path).unwrap();
    let mut pager = Pager::open(&path, db_header.page_size).unwrap();

    // 暂时硬编码 users 表的根页为 Page 2
    // 第 05 篇从 sqlite_schema 动态读取
    let root_page = 2u32;
    let cells = collect_leaf_cells(&mut pager, root_page);

    println!("{:>5}  {}", "rowid", "columns");
    println!("{}", "-".repeat(40));

    for cell_addr in &cells {
        let page_data = pager.read_page(cell_addr.page_num).unwrap();
        let (rowid, values) = parse_leaf_cell(&page_data, cell_addr.offset as usize);

        let cols: Vec<String> = values.iter().map(|v| v.to_string()).collect();
        println!("{:>5}  {}", rowid, cols.join(" | "));
    }

    println!();
    println!("Total: {} rows", cells.len());
}
cargo run -- test.db
rowid  columns
----------------------------------------
    1  Alice | 30
    2  Bob | 25
    3  Charlie | 35

Total: 3 rows

注意:id 列是 INTEGER PRIMARY KEY,SQLite 把它作为 rowid 别名——所以 Record 里实际上不存 id 列,id 的值就是 rowid 本身。这就是为什么 columns 只显示 name | age 两列,而不是三列。

用 500 行的大库验证:

cargo run -- big.db | head -6
cargo run -- big.db | tail -3
rowid  columns
----------------------------------------
    1  User1 | 21
    2  User2 | 22
    3  User3 | 23
    4  User4 | 24

  498  User498 | 48
  499  User499 | 49
  500  User500 | 20

Total: 500 rows

顺序正确,数据完整。

五、hexdump 验证一行

用 Python 直接查看第一个 Cell 的原始字节,手动解码:

data = open('test.db', 'rb').read()

# Page 1 的 Cell 指针数组从偏移 108 开始
# Cell[0] 的偏移 = data[108:110] 大端序
cell_offset = int.from_bytes(data[108:110], 'big')
print(f"Cell offset in page: {cell_offset}")

# 读取 Cell(Page 1 偏移 0 处就是页头,Cell 从 cell_offset 起)
b = data[cell_offset:]

# 字节 1:payload 大小(varint)
payload_size = b[0]
print(f"Payload size: {payload_size}")

# 字节 2:rowid(varint)
rowid = b[1]
print(f"rowid: {rowid}")

# 字节 3:Record Header 大小(varint)
header_size = b[2]
print(f"Header size: {header_size} bytes")

# 字节 4..header_size+2:Serial Types
print(f"Serial types: {list(b[3:3+header_size-1])}")

# 数据区域
body_start = 2 + header_size
print(f"Body bytes: {list(b[body_start:body_start+20])}")
Cell offset in page: 3964
Payload size: 14
rowid: 1
Header size: 4 bytes
Serial types: [0, 23, 4]   ← NULL(id已是rowid), TEXT长5字节("Alice"), INT32(30)
Body bytes: [65, 108, 105, 99, 101, 0, 0, 0, 30, ...]
            A   l   i   c   e              30

完全吻合。Serial Type 23 = 奇数,TEXT,长度 = (23-13)/2 = 5,对应 "Alice"(5字节)。Serial Type 4 = 32位整数,对应 30。

六、当前代码结构

sqlite-rs/
└── src/
    ├── main.rs      ← 入口(打印所有行)
    ├── header.rs    ← 文件头(第 01 篇)
    ├── page.rs      ← 页头 + Cell 指针(第 02 篇)
    ├── pager.rs     ← 页读取器(第 02 篇)
    ├── btree.rs     ← B-Tree 遍历(第 03 篇)
    ├── varint.rs    ← varint 解码(本篇)
    └── record.rs    ← Record 解析(本篇)

七、关键点总结

  • 叶节点 Cell = payload_size(varint) + rowid(varint) + Record(payload)
  • Record = header_size(varint) + serial_types[](varint) + values[]
  • varint:最高位为延续标志,低 7 位为数据,最多 9 字节
  • Serial Type 同时编码类型和长度,TEXT/BLOB 的字节数 = (N - 13) / 2(N - 12) / 2
  • INTEGER PRIMARY KEY 列不存在 Record 里,它的值就是 rowid

下一篇:解析 sqlite_schema 表(它本身也是一个普通的 B-Tree 表),从中读取用户表的名字和根页号——这样我们就不需要再硬编码 root_page = 2,能实现真正的 .tables 命令了。