Apache Zookeeper 是一个开源的分布式协调服务,为分布式应用提供一致性服务。它最初由 Yahoo 开发,后成为 Apache 顶级项目。本文将深入剖析 Zookeeper 的核心概念、数据模型、Watch 机制以及集群架构。
Zookeeper 简介
Zookeeper 是一个分布式的、开放源码的分布式应用程序协调服务,是 Google Chubby 的开源实现。它提供了配置维护、命名服务、分布式同步、组服务等功能。
Zookeeper 的核心特性:
- 一致性:所有节点在同一时刻看到相同的数据
- 原子性:操作要么成功要么失败,不存在中间状态
- 可靠性:数据持久化,不因单点故障丢失
- 实时性:客户端能在一定时间内获取最新数据
- 顺序性:来自同一客户端的更新按发送顺序执行
数据模型
ZNode 结构
Zookeeper 的数据模型类似于文件系统,由 ZNode(节点)组成树形结构。每个 ZNode 都可以存储数据,也可以有子节点。
/
├── config
│ ├── database.properties
│ └── cache.properties
├── services
│ ├── service1
│ └── service2
└── locks
├── lock1
└── lock2
ZNode 类型
| 类型 | 说明 | 特点 |
|---|---|---|
| 持久节点(PERSISTENT) | 创建后一直存在 | 除非显式删除 |
| 临时节点(EPHEMERAL) | 与会话绑定 | 会话结束自动删除 |
| 持久顺序节点(PERSISTENT_SEQUENTIAL) | 持久节点 + 顺序编号 | 名称自动追加数字 |
| 临时顺序节点(EPHEMERAL_SEQUENTIAL) | 临时节点 + 顺序编号 | 会话结束自动删除 |
// 创建持久节点
zk.create("/persistent", "data".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// 创建临时节点
zk.create("/ephemeral", "data".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
// 创建持久顺序节点
zk.create("/persistent-seq", "data".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
// 创建临时顺序节点
zk.create("/ephemeral-seq", "data".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
ZNode 属性
每个 ZNode 包含以下元数据(Stat 结构):
- czxid:节点创建时的事务 ID
- mzxid:节点最后修改时的事务 ID
- ctime:节点创建时间
- mtime:节点最后修改时间
- version:数据版本号
- cversion:子节点版本号
- aversion:ACL 版本号
- dataLength:数据长度(最大 1MB)
- numChildren:直接子节点数量
// 获取 ZNode 数据和元数据
Stat stat = new Stat();
byte[] data = zk.getData("/path", false, stat);
System.out.println("Version: " + stat.getVersion());
System.out.println("Ctime: " + stat.getCtime());
System.out.println("Mtime: " + stat.getMtime());
System.out.println("DataLength: " + stat.getDataLength());
Watcher 机制
Watcher 是 Zookeeper 的核心机制,用于监听 ZNode 的变化并异步通知客户端。
Watcher 类型
| 事件类型 | 触发条件 |
|---|---|
| NodeCreated | 节点被创建 |
| NodeDeleted | 节点被删除 |
| NodeDataChanged | 节点数据发生变化 |
| NodeChildrenChanged | 子节点列表发生变化 |
| None | 连接状态发生变化 |
Watcher 使用
// 创建 Watcher
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("Event type: " + event.getType());
System.out.println("Event path: " + event.getPath());
System.out.println("Keeper state: " + event.getState());
// 重新注册 Watcher(Watcher 是一次性的)
try {
zk.getData(event.getPath(), this, null);
} catch (Exception e) {
e.printStackTrace();
}
}
};
// 监听节点数据变化
zk.getData("/path", watcher, stat);
// 监听子节点变化
zk.getChildren("/path", watcher);
// 监听节点是否存在
zk.exists("/path", watcher);
Watcher 特点
- 一次性:Watcher 触发后自动失效,需要重新注册
- 异步通知:服务端异步通知客户端,不阻塞
- 轻量级:只通知发生了变化,不携带变化的具体内容
- 顺序性:同一客户端的 Watcher 按顺序触发
会话管理
会话生命周期
客户端与 Zookeeper 服务器之间的连接即为一个会话(Session)。会话状态转换如下:
CONNECTING → CONNECTED → CLOSED
↓
CONNECTING(重连)
会话状态
| 状态 | 说明 |
|---|---|
| CONNECTING | 正在连接服务器 |
| CONNECTED | 已成功连接 |
| RECONNECTING | 连接断开,正在重连 |
| RECONNECTED | 重连成功 |
| CLOSED | 会话已关闭 |
| AUTH_FAILED | 认证失败 |
// 创建会话,设置超时时间(毫秒)
int sessionTimeout = 30000; // 30 秒
ZooKeeper zk = new ZooKeeper("localhost:2181", sessionTimeout, watcher);
// 获取会话 ID 和超时时间
long sessionId = zk.getSessionId();
int timeout = zk.getSessionTimeout();
System.out.println("Session ID: " + sessionId);
System.out.println("Session timeout: " + timeout);
ACL 权限控制
ACL 组成
ACL(Access Control List)由三部分组成:
- scheme:权限模式,定义认证方式
- id:权限对象,具体的认证信息
- permission:权限类型,允许的操作
权限模式
| 模式 | 说明 |
|---|---|
| world | 开放模式,所有人可访问 |
| auth | 认证模式,已认证用户可访问 |
| digest | 用户名密码认证模式 |
| ip | IP 地址认证模式 |
| x509 | 证书认证模式 |
权限类型
| 权限 | 缩写 | 说明 |
|---|---|---|
| CREATE | c | 创建子节点 |
| READ | r | 读取节点数据和子节点列表 |
| WRITE | w | 修改节点数据 |
| DELETE | d | 删除子节点 |
| ADMIN | a | 设置 ACL |
// 设置 world 权限(所有人可读写)
zk.create("/open", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// 设置 digest 权限
List<ACL> acls = new ArrayList<>();
ACL acl = new ACL(ZooDefs.Perms.ALL,
new Id("digest", DigestAuthenticationProvider.generateDigest("user:password")));
acls.add(acl);
zk.create("/secure", data, acls, CreateMode.PERSISTENT);
// 添加认证信息
zk.addAuthInfo("digest", "user:password".getBytes());
集群架构
集群角色
Zookeeper 集群采用 Leader-Follower 架构,通常部署奇数个节点以保证选举成功。
| 角色 | 职责 |
|---|---|
| Leader | 处理所有写请求,协调集群,发起投票 |
| Follower | 处理读请求,转发写请求给 Leader,参与选举投票 |
| Observer | 处理读请求,不参与选举投票,用于扩展读性能 |
Leader 选举
Zookeeper 使用 ZAB(ZooKeeper Atomic Broadcast)协议进行 Leader 选举。选举过程:
- 集群启动或 Leader 崩溃,所有节点进入 LOOKING 状态
- 每个节点发起投票,初始投票给自己(myid, zxid)
- 节点之间交换投票信息
- 根据规则更新投票:优先选 zxid 大的,zxid 相同则选 myid 大的
- 超过半数节点投票给同一节点,该节点成为 Leader
- 其他节点成为 Follower 或 Observer
集群配置
# zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
dataLogDir=/var/lib/zookeeper/log
clientPort=2181
maxClientCnxns=1000
# 集群节点配置(格式:server.id=host:quorumPort:leaderElectionPort)
server.1=192.168.1.1:2888:3888
server.2=192.168.1.2:2888:3888
server.3=192.168.1.3:2888:3888
# 在各节点 dataDir 目录下创建 myid 文件
echo "1" > /var/lib/zookeeper/myid # 节点1
echo "2" > /var/lib/zookeeper/myid # 节点2
echo "3" > /var/lib/zookeeper/myid # 节点3
ZAB 协议
ZAB 概述
ZAB(ZooKeeper Atomic Broadcast)是 Zookeeper 专门设计的原子广播协议,用于保证分布式数据的一致性。ZAB 包含两种模式:
- 崩溃恢复模式:集群启动或 Leader 崩溃时,选举新 Leader
- 消息广播模式:正常工作时,Leader 将写请求广播给所有 Follower
消息广播流程
- 客户端发送写请求给 Leader(或由 Follower 转发给 Leader)
- Leader 为请求分配全局唯一递增的 ZXID
- Leader 向所有 Follower 发送 Proposal(提案)
- Follower 将 Proposal 写入本地事务日志,返回 ACK
- Leader 收到超过半数 ACK 后,发送 Commit 消息
- Leader 和 Follower 提交事务,更新内存数据
ZXID 结构
ZXID 是 64 位整数,分为两部分:
- 高 32 位:epoch(纪元),每次 Leader 选举后递增
- 低 32 位:counter(计数器),每次事务递增,新 epoch 时重置为 0
典型应用场景
配置中心
利用 Zookeeper 的 Watcher 机制实现配置的动态推送:
public class ConfigCenter {
private ZooKeeper zk;
private static final String CONFIG_PATH = "/config";
public void watchConfig(String key, ConfigChangeListener listener) throws Exception {
String path = CONFIG_PATH + "/" + key;
byte[] data = zk.getData(path, event -> {
if (event.getType() == EventType.NodeDataChanged) {
try {
byte[] newData = zk.getData(path, null, null);
listener.onChange(new String(newData));
// 重新注册 Watcher
watchConfig(key, listener);
} catch (Exception e) {
e.printStackTrace();
}
}
}, null);
listener.onChange(new String(data));
}
}
分布式锁
利用临时顺序节点实现公平的分布式锁:
public class DistributedLock {
private ZooKeeper zk;
private static final String LOCK_PATH = "/locks";
private String currentNode;
public void lock() throws Exception {
// 创建临时顺序节点
currentNode = zk.create(LOCK_PATH + "/lock-", null,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
while (true) {
// 获取所有子节点并排序
List<String> children = zk.getChildren(LOCK_PATH, false);
Collections.sort(children);
String nodeName = currentNode.substring(LOCK_PATH.length() + 1);
int index = children.indexOf(nodeName);
if (index == 0) {
// 当前节点是最小节点,获得锁
return;
}
// 监听前一个节点
String prevNode = LOCK_PATH + "/" + children.get(index - 1);
CountDownLatch latch = new CountDownLatch(1);
Stat stat = zk.exists(prevNode, event -> {
if (event.getType() == EventType.NodeDeleted) {
latch.countDown();
}
});
if (stat != null) {
latch.await(); // 等待前一个节点释放
}
}
}
public void unlock() throws Exception {
zk.delete(currentNode, -1);
}
}
服务注册与发现
public class ServiceRegistry {
private ZooKeeper zk;
private static final String REGISTRY_PATH = "/services";
// 服务注册(创建临时节点,服务下线自动删除)
public void register(String serviceName, String address) throws Exception {
String servicePath = REGISTRY_PATH + "/" + serviceName;
if (zk.exists(servicePath, false) == null) {
zk.create(servicePath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
zk.create(servicePath + "/" + address, address.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
// 服务发现(获取所有可用实例)
public List<String> discover(String serviceName) throws Exception {
String servicePath = REGISTRY_PATH + "/" + serviceName;
return zk.getChildren(servicePath, event -> {
// 监听服务列表变化,自动刷新
try {
discover(serviceName);
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
Leader 选举实现
public class LeaderElection {
private ZooKeeper zk;
private static final String ELECTION_PATH = "/election";
private String currentNode;
public boolean isLeader() throws Exception {
List<String> children = zk.getChildren(ELECTION_PATH, false);
Collections.sort(children);
String nodeName = currentNode.substring(ELECTION_PATH.length() + 1);
return children.get(0).equals(nodeName);
}
public void participate() throws Exception {
currentNode = zk.create(ELECTION_PATH + "/node-", null,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
if (isLeader()) {
System.out.println("I am the leader: " + currentNode);
} else {
watchPreviousNode();
}
}
private void watchPreviousNode() throws Exception {
List<String> children = zk.getChildren(ELECTION_PATH, false);
Collections.sort(children);
String nodeName = currentNode.substring(ELECTION_PATH.length() + 1);
int index = children.indexOf(nodeName);
String prevNode = ELECTION_PATH + "/" + children.get(index - 1);
zk.exists(prevNode, event -> {
if (event.getType() == EventType.NodeDeleted) {
try {
if (isLeader()) {
System.out.println("I am the new leader: " + currentNode);
} else {
watchPreviousNode();
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
性能优化
JVM 参数优化
# 推荐 JVM 参数
-Xms4g
-Xmx4g
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/var/log/zookeeper/heap.hprof
系统参数优化
# 增加文件描述符限制
ulimit -n 65535
# 调整 TCP 参数
echo "net.ipv4.tcp_tw_reuse = 1" >> /etc/sysctl.conf
echo "net.ipv4.tcp_fin_timeout = 30" >> /etc/sysctl.conf
echo "net.core.somaxconn = 65535" >> /etc/sysctl.conf
sysctl -p
配置参数优化
# zoo.cfg 性能相关配置
tickTime=2000
# 数据快照间隔(事务数)
snapCount=100000
# 预分配日志文件大小(KB)
preAllocSize=65536
# 最大客户端连接数
maxClientCnxns=1000
# 跳过 ACL 检查(提升性能,牺牲安全)
skipACL=yes
# 强制同步写磁盘
forceSync=no
# 自动清理快照
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
监控指标
| 指标 | 说明 | 告警阈值 |
|---|---|---|
| avg_latency | 平均请求延迟 | > 100ms |
| max_latency | 最大请求延迟 | > 1000ms |
| outstanding_requests | 待处理请求数 | > 100 |
| num_alive_connections | 活跃连接数 | 接近 maxClientCnxns |
| watch_count | 注册的 Watcher 数量 | > 100000 |
| znode_count | ZNode 总数 | > 1000000 |
# 通过 4 字命令查看监控信息
echo "mntr" | nc localhost 2181 # 监控信息
echo "stat" | nc localhost 2181 # 状态信息
echo "ruok" | nc localhost 2181 # 健康检查
echo "dump" | nc localhost 2181 # 会话和临时节点
最佳实践
节点设计
- 节点路径不宜过深:层级过深影响性能,建议不超过 5 层
- 控制数据大小:单个 ZNode 数据不超过 1MB,建议 1KB 以内
- 合理使用临时节点:利用临时节点实现自动清理
- 批量操作:使用 multi 接口批量执行操作,保证原子性
Watcher 最佳实践
- 及时重新注册:Watcher 触发后立即重新注册,避免遗漏事件
- 异步处理:Watcher 回调中避免耗时操作,防止阻塞
- 幂等处理:Watcher 回调应设计为幂等,防止重复执行
- 控制数量:避免注册过多 Watcher,影响服务端性能
集群部署
- 奇数个节点:3、5、7 个节点,避免脑裂,保证选举成功
- 跨机架部署:节点分布在不同机架,提高容灾能力
- 独立磁盘:事务日志和数据快照使用独立磁盘,避免 IO 竞争
- 定期备份:定期备份数据快照,防止数据丢失
安全配置
- 启用认证:生产环境使用 digest 或 Kerberos 认证
- 设置 ACL:对敏感路径设置严格的访问控制
- 启用 TLS:加密客户端与服务端通信
- 网络隔离:限制 Zookeeper 端口的访问来源
总结
Zookeeper 是构建分布式系统的重要基础设施,提供了简单而强大的协调原语。本文介绍了 Zookeeper 的核心知识:
- 数据模型:树形 ZNode 结构,支持持久和临时两类节点
- Watcher 机制:一次性、异步的事件通知机制
- 会话管理:客户端与服务端的连接生命周期
- ACL 权限控制:灵活的访问控制机制
- 集群架构:Leader-Follower 架构,ZAB 协议保证一致性
- 典型应用:配置中心、分布式锁、服务注册与发现、Leader 选举
- 性能优化:JVM、系统参数、配置调优
掌握 Zookeeper 可以帮助我们构建可靠的分布式系统,解决分布式环境下的协调难题。在实际应用中,建议根据具体场景选择合适的使用模式,并遵循最佳实践以达到最佳效果。