前三篇我们已经能从 B-Tree 遍历中拿到每一行的 (页号, 页内偏移)。这一篇解决最关键的问题:怎么把那个偏移处的字节流解码成真正的列值——比如 (id=42, name="Alice", age=30)。
这需要理解两个核心编码:varint(可变长整数)和 Serial Type(列类型描述符)。
一、叶节点 Cell 的完整格式
叶节点表(LeafTable)的每个 Cell 格式如下:
┌─────────────────────────────────────────────────────────┐
│ Payload 字节数 (varint) │
├─────────────────────────────────────────────────────────┤
│ rowid (varint) │
├─────────────────────────────────────────────────────────┤
│ Payload(Record) │
│ ┌─────────────────────────────────────────────────┐ │
│ │ Header 字节数 (varint) │ │
│ │ Serial Type[0] (varint) ← 第 0 列的类型 │ │
│ │ Serial Type[1] (varint) ← 第 1 列的类型 │ │
│ │ ... │ │
│ ├─────────────────────────────────────────────────┤ │
│ │ 列值[0] (长度由 Serial Type[0] 决定) │ │
│ │ 列值[1] (长度由 Serial Type[1] 决定) │ │
│ │ ... │ │
│ └─────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
关键点:Header 先描述所有列的类型,Body 再依次存列值。解码时先读 Header 拿到类型列表,再按类型从 Body 读对应长度的字节。
二、varint 编码
varint(可变长整数)是 SQLite 用于节省空间的整数编码。规则:
- 每个字节的最高位(bit 7)是延续标志:1 表示后面还有字节,0 表示这是最后一个字节
- 每个字节的低 7 位是实际数据位
- 最多 9 个字节,第 9 个字节用全部 8 位存数据
- 小整数只需 1 字节,大整数最多 9 字节
示例:
数值 50 → 0x32 (1 字节,bit7=0 表示结束)
数值 200 → 0x81 0x48 (2 字节,0x81 的 bit7=1 继续,0x48 的 bit7=0 结束)
取低7位:0000001 0101000 → 1001001000 → 200
数值 16384 → 0x81 0x80 0x00 (3 字节)
// src/varint.rs
/// 从字节切片的指定位置解码一个 varint
/// 返回 (值, 消耗的字节数)
pub fn decode_varint(data: &[u8], pos: usize) -> (i64, usize) {
let mut result: i64 = 0;
let mut bytes_read = 0;
for i in 0..9 {
let byte = data[pos + i];
bytes_read += 1;
if i == 8 {
// 第 9 个字节:全部 8 位都是数据位
result = (result << 8) | (byte as i64);
break;
} else {
// 前 8 个字节:低 7 位是数据
result = (result << 7) | ((byte & 0x7F) as i64);
if byte & 0x80 == 0 {
// bit7 = 0,结束
break;
}
}
}
(result, bytes_read)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_single_byte() {
assert_eq!(decode_varint(&[0x32], 0), (50, 1));
assert_eq!(decode_varint(&[0x00], 0), (0, 1));
assert_eq!(decode_varint(&[0x7F], 0), (127, 1));
}
#[test]
fn test_two_bytes() {
// 200 = 0b11001000 → varint: 0x81 0x48
assert_eq!(decode_varint(&[0x81, 0x48], 0), (200, 2));
}
}
三、Serial Type:列类型描述符
Record Header 中每列对应一个 varint 类型描述符(Serial Type),它同时编码了类型和字节长度:
| Serial Type 值 | 含义 | 数据字节数 |
|---|---|---|
| 0 | NULL | 0 |
| 1 | 8 位有符号整数 | 1 |
| 2 | 16 位有符号整数(大端) | 2 |
| 3 | 24 位有符号整数(大端) | 3 |
| 4 | 32 位有符号整数(大端) | 4 |
| 5 | 48 位有符号整数(大端) | 6 |
| 6 | 64 位有符号整数(大端) | 8 |
| 7 | IEEE 754 64 位浮点数 | 8 |
| 8 | 整数 0(不占字节) | 0 |
| 9 | 整数 1(不占字节) | 0 |
| 10, 11 | 保留 | — |
| N ≥ 12,偶数 | BLOB,长度 = (N - 12) / 2 | (N-12)/2 |
| N ≥ 13,奇数 | TEXT,长度 = (N - 13) / 2 | (N-13)/2 |
TEXT 和 BLOB 的长度编码在 Serial Type 里,不需要单独存长度字段——这是非常紧凑的设计。
// src/record.rs
use crate::varint::decode_varint;
/// SQLite 列值
#[derive(Debug, Clone)]
pub enum Value {
Null,
Integer(i64),
Float(f64),
Text(String),
Blob(Vec<u8>),
}
impl std::fmt::Display for Value {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Value::Null => write!(f, "NULL"),
Value::Integer(n) => write!(f, "{}", n),
Value::Float(n) => write!(f, "{}", n),
Value::Text(s) => write!(f, "{}", s),
Value::Blob(b) => write!(f, "<blob {} bytes>", b.len()),
}
}
}
/// Serial Type 对应的字节长度
fn serial_type_size(st: i64) -> usize {
match st {
0 => 0, // NULL
1 => 1,
2 => 2,
3 => 3,
4 => 4,
5 => 6,
6 | 7 => 8,
8 | 9 => 0, // 常量 0 / 1,不占字节
n if n >= 12 && n % 2 == 0 => ((n - 12) / 2) as usize, // BLOB
n if n >= 13 && n % 2 == 1 => ((n - 13) / 2) as usize, // TEXT
_ => panic!("invalid serial type: {}", st),
}
}
/// 从字节流的指定偏移读取一个列值
fn read_value(data: &[u8], offset: usize, serial_type: i64) -> Value {
let d = &data[offset..];
match serial_type {
0 => Value::Null,
1 => Value::Integer(d[0] as i8 as i64),
2 => Value::Integer(i16::from_be_bytes([d[0], d[1]]) as i64),
3 => {
// 24 位有符号整数,手动处理符号扩展
let raw = ((d[0] as i32) << 16) | ((d[1] as i32) << 8) | (d[2] as i32);
let signed = if raw & 0x800000 != 0 { raw | !0xFFFFFF } else { raw };
Value::Integer(signed as i64)
}
4 => Value::Integer(i32::from_be_bytes([d[0], d[1], d[2], d[3]]) as i64),
5 => {
// 48 位有符号整数
let raw = ((d[0] as i64) << 40)
| ((d[1] as i64) << 32)
| ((d[2] as i64) << 24)
| ((d[3] as i64) << 16)
| ((d[4] as i64) << 8)
| (d[5] as i64);
let signed = if raw & (1 << 47) != 0 { raw | (!0i64 << 48) } else { raw };
Value::Integer(signed)
}
6 => Value::Integer(i64::from_be_bytes([d[0],d[1],d[2],d[3],d[4],d[5],d[6],d[7]])),
7 => Value::Float(f64::from_be_bytes([d[0],d[1],d[2],d[3],d[4],d[5],d[6],d[7]])),
8 => Value::Integer(0),
9 => Value::Integer(1),
n if n >= 12 && n % 2 == 0 => {
let len = ((n - 12) / 2) as usize;
Value::Blob(d[..len].to_vec())
}
n if n >= 13 && n % 2 == 1 => {
let len = ((n - 13) / 2) as usize;
Value::Text(String::from_utf8_lossy(&d[..len]).into_owned())
}
_ => panic!("invalid serial type: {}", serial_type),
}
}
解析完整 Record
// src/record.rs (续)
/// 解析叶节点 Cell,返回 (rowid, 列值列表)
pub fn parse_leaf_cell(page_data: &[u8], cell_offset: usize) -> (i64, Vec<Value>) {
let mut pos = cell_offset;
// 1. 读取 Payload 总字节数(varint)
let (payload_size, n) = decode_varint(page_data, pos);
pos += n;
// 2. 读取 rowid(varint)
let (rowid, n) = decode_varint(page_data, pos);
pos += n;
// 3. 解析 Record(Payload 从 pos 开始)
let payload_start = pos;
// 3a. 读取 Record Header 长度(varint)
let (header_size, n) = decode_varint(page_data, pos);
pos += n;
// 3b. 读取所有 Serial Type
let header_end = payload_start + header_size as usize;
let mut serial_types = Vec::new();
while pos < header_end {
let (st, n) = decode_varint(page_data, pos);
pos += n;
serial_types.push(st);
}
// 3c. 按 Serial Type 依次读取列值
let mut values = Vec::new();
for st in &serial_types {
let value = read_value(page_data, pos, *st);
pos += serial_type_size(*st);
values.push(value);
}
(rowid, values)
}
四、组装验证
更新 main.rs,遍历 B-Tree 并打印所有行:
// src/main.rs
mod header;
mod page;
mod pager;
mod btree;
mod varint;
mod record;
use header::DbHeader;
use btree::collect_leaf_cells;
use pager::Pager;
use record::parse_leaf_cell;
fn main() {
let path = std::env::args().nth(1).unwrap_or_else(|| {
eprintln!("Usage: sqlite-rs <database.db>");
std::process::exit(1);
});
let db_header = DbHeader::read_from_file(&path).unwrap();
let mut pager = Pager::open(&path, db_header.page_size).unwrap();
// 暂时硬编码 users 表的根页为 Page 2
// 第 05 篇从 sqlite_schema 动态读取
let root_page = 2u32;
let cells = collect_leaf_cells(&mut pager, root_page);
println!("{:>5} {}", "rowid", "columns");
println!("{}", "-".repeat(40));
for cell_addr in &cells {
let page_data = pager.read_page(cell_addr.page_num).unwrap();
let (rowid, values) = parse_leaf_cell(&page_data, cell_addr.offset as usize);
let cols: Vec<String> = values.iter().map(|v| v.to_string()).collect();
println!("{:>5} {}", rowid, cols.join(" | "));
}
println!();
println!("Total: {} rows", cells.len());
}
cargo run -- test.db
rowid columns
----------------------------------------
1 Alice | 30
2 Bob | 25
3 Charlie | 35
Total: 3 rows
注意:id 列是 INTEGER PRIMARY KEY,SQLite 把它作为 rowid 别名——所以 Record 里实际上不存 id 列,id 的值就是 rowid 本身。这就是为什么 columns 只显示 name | age 两列,而不是三列。
用 500 行的大库验证:
cargo run -- big.db | head -6
cargo run -- big.db | tail -3
rowid columns
----------------------------------------
1 User1 | 21
2 User2 | 22
3 User3 | 23
4 User4 | 24
498 User498 | 48
499 User499 | 49
500 User500 | 20
Total: 500 rows
顺序正确,数据完整。
五、hexdump 验证一行
用 Python 直接查看第一个 Cell 的原始字节,手动解码:
data = open('test.db', 'rb').read()
# Page 1 的 Cell 指针数组从偏移 108 开始
# Cell[0] 的偏移 = data[108:110] 大端序
cell_offset = int.from_bytes(data[108:110], 'big')
print(f"Cell offset in page: {cell_offset}")
# 读取 Cell(Page 1 偏移 0 处就是页头,Cell 从 cell_offset 起)
b = data[cell_offset:]
# 字节 1:payload 大小(varint)
payload_size = b[0]
print(f"Payload size: {payload_size}")
# 字节 2:rowid(varint)
rowid = b[1]
print(f"rowid: {rowid}")
# 字节 3:Record Header 大小(varint)
header_size = b[2]
print(f"Header size: {header_size} bytes")
# 字节 4..header_size+2:Serial Types
print(f"Serial types: {list(b[3:3+header_size-1])}")
# 数据区域
body_start = 2 + header_size
print(f"Body bytes: {list(b[body_start:body_start+20])}")
Cell offset in page: 3964
Payload size: 14
rowid: 1
Header size: 4 bytes
Serial types: [0, 23, 4] ← NULL(id已是rowid), TEXT长5字节("Alice"), INT32(30)
Body bytes: [65, 108, 105, 99, 101, 0, 0, 0, 30, ...]
A l i c e 30
完全吻合。Serial Type 23 = 奇数,TEXT,长度 = (23-13)/2 = 5,对应 "Alice"(5字节)。Serial Type 4 = 32位整数,对应 30。
六、当前代码结构
sqlite-rs/
└── src/
├── main.rs ← 入口(打印所有行)
├── header.rs ← 文件头(第 01 篇)
├── page.rs ← 页头 + Cell 指针(第 02 篇)
├── pager.rs ← 页读取器(第 02 篇)
├── btree.rs ← B-Tree 遍历(第 03 篇)
├── varint.rs ← varint 解码(本篇)
└── record.rs ← Record 解析(本篇)
七、关键点总结
- 叶节点 Cell =
payload_size(varint)+rowid(varint)+Record(payload) - Record =
header_size(varint)+serial_types[](varint)+values[] - varint:最高位为延续标志,低 7 位为数据,最多 9 字节
- Serial Type 同时编码类型和长度,TEXT/BLOB 的字节数 =
(N - 13) / 2或(N - 12) / 2 INTEGER PRIMARY KEY列不存在 Record 里,它的值就是 rowid
下一篇:解析 sqlite_schema 表(它本身也是一个普通的 B-Tree 表),从中读取用户表的名字和根页号——这样我们就不需要再硬编码 root_page = 2,能实现真正的 .tables 命令了。