Zookeeper 深度解析

Apache Zookeeper 是一个开源的分布式协调服务,为分布式应用提供一致性服务。它最初由 Yahoo 开发,后成为 Apache 顶级项目。本文将深入剖析 Zookeeper 的核心概念、数据模型、Watch 机制以及集群架构。

Zookeeper 简介

Zookeeper 是一个分布式的、开放源码的分布式应用程序协调服务,是 Google Chubby 的开源实现。它提供了配置维护、命名服务、分布式同步、组服务等功能。

Zookeeper 的核心特性:

  • 一致性:所有节点在同一时刻看到相同的数据
  • 原子性:操作要么成功要么失败,不存在中间状态
  • 可靠性:数据持久化,不因单点故障丢失
  • 实时性:客户端能在一定时间内获取最新数据
  • 顺序性:来自同一客户端的更新按发送顺序执行

数据模型

ZNode 结构

Zookeeper 的数据模型类似于文件系统,由 ZNode(节点)组成树形结构。每个 ZNode 都可以存储数据,也可以有子节点。

/
├── config
│   ├── database.properties
│   └── cache.properties
├── services
│   ├── service1
│   └── service2
└── locks
    ├── lock1
    └── lock2

ZNode 类型

类型 说明 特点
持久节点(PERSISTENT) 创建后一直存在 除非显式删除
临时节点(EPHEMERAL) 与会话绑定 会话结束自动删除
持久顺序节点(PERSISTENT_SEQUENTIAL) 持久节点 + 顺序编号 名称自动追加数字
临时顺序节点(EPHEMERAL_SEQUENTIAL) 临时节点 + 顺序编号 会话结束自动删除
// 创建持久节点
zk.create("/persistent", "data".getBytes(),
    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

// 创建临时节点
zk.create("/ephemeral", "data".getBytes(),
    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

// 创建持久顺序节点
zk.create("/persistent-seq", "data".getBytes(),
    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);

// 创建临时顺序节点
zk.create("/ephemeral-seq", "data".getBytes(),
    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

ZNode 属性

每个 ZNode 包含以下元数据(Stat 结构):

  • czxid:节点创建时的事务 ID
  • mzxid:节点最后修改时的事务 ID
  • ctime:节点创建时间
  • mtime:节点最后修改时间
  • version:数据版本号
  • cversion:子节点版本号
  • aversion:ACL 版本号
  • dataLength:数据长度(最大 1MB)
  • numChildren:直接子节点数量
// 获取 ZNode 数据和元数据
Stat stat = new Stat();
byte[] data = zk.getData("/path", false, stat);
System.out.println("Version: " + stat.getVersion());
System.out.println("Ctime: " + stat.getCtime());
System.out.println("Mtime: " + stat.getMtime());
System.out.println("DataLength: " + stat.getDataLength());

Watcher 机制

Watcher 是 Zookeeper 的核心机制,用于监听 ZNode 的变化并异步通知客户端。

Watcher 类型

事件类型 触发条件
NodeCreated 节点被创建
NodeDeleted 节点被删除
NodeDataChanged 节点数据发生变化
NodeChildrenChanged 子节点列表发生变化
None 连接状态发生变化

Watcher 使用

// 创建 Watcher
Watcher watcher = new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        System.out.println("Event type: " + event.getType());
        System.out.println("Event path: " + event.getPath());
        System.out.println("Keeper state: " + event.getState());
        // 重新注册 Watcher(Watcher 是一次性的)
        try {
            zk.getData(event.getPath(), this, null);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
};

// 监听节点数据变化
zk.getData("/path", watcher, stat);

// 监听子节点变化
zk.getChildren("/path", watcher);

// 监听节点是否存在
zk.exists("/path", watcher);

Watcher 特点

  • 一次性:Watcher 触发后自动失效,需要重新注册
  • 异步通知:服务端异步通知客户端,不阻塞
  • 轻量级:只通知发生了变化,不携带变化的具体内容
  • 顺序性:同一客户端的 Watcher 按顺序触发

会话管理

会话生命周期

客户端与 Zookeeper 服务器之间的连接即为一个会话(Session)。会话状态转换如下:

CONNECTING → CONNECTED → CLOSED
                ↓
           CONNECTING(重连)

会话状态

状态 说明
CONNECTING 正在连接服务器
CONNECTED 已成功连接
RECONNECTING 连接断开,正在重连
RECONNECTED 重连成功
CLOSED 会话已关闭
AUTH_FAILED 认证失败
// 创建会话,设置超时时间(毫秒)
int sessionTimeout = 30000; // 30 秒
ZooKeeper zk = new ZooKeeper("localhost:2181", sessionTimeout, watcher);

// 获取会话 ID 和超时时间
long sessionId = zk.getSessionId();
int timeout = zk.getSessionTimeout();
System.out.println("Session ID: " + sessionId);
System.out.println("Session timeout: " + timeout);

ACL 权限控制

ACL 组成

ACL(Access Control List)由三部分组成:

  • scheme:权限模式,定义认证方式
  • id:权限对象,具体的认证信息
  • permission:权限类型,允许的操作

权限模式

模式 说明
world 开放模式,所有人可访问
auth 认证模式,已认证用户可访问
digest 用户名密码认证模式
ip IP 地址认证模式
x509 证书认证模式

权限类型

权限 缩写 说明
CREATE c 创建子节点
READ r 读取节点数据和子节点列表
WRITE w 修改节点数据
DELETE d 删除子节点
ADMIN a 设置 ACL
// 设置 world 权限(所有人可读写)
zk.create("/open", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

// 设置 digest 权限
List<ACL> acls = new ArrayList<>();
ACL acl = new ACL(ZooDefs.Perms.ALL,
    new Id("digest", DigestAuthenticationProvider.generateDigest("user:password")));
acls.add(acl);
zk.create("/secure", data, acls, CreateMode.PERSISTENT);

// 添加认证信息
zk.addAuthInfo("digest", "user:password".getBytes());

集群架构

集群角色

Zookeeper 集群采用 Leader-Follower 架构,通常部署奇数个节点以保证选举成功。

角色 职责
Leader 处理所有写请求,协调集群,发起投票
Follower 处理读请求,转发写请求给 Leader,参与选举投票
Observer 处理读请求,不参与选举投票,用于扩展读性能

Leader 选举

Zookeeper 使用 ZAB(ZooKeeper Atomic Broadcast)协议进行 Leader 选举。选举过程:

  1. 集群启动或 Leader 崩溃,所有节点进入 LOOKING 状态
  2. 每个节点发起投票,初始投票给自己(myid, zxid)
  3. 节点之间交换投票信息
  4. 根据规则更新投票:优先选 zxid 大的,zxid 相同则选 myid 大的
  5. 超过半数节点投票给同一节点,该节点成为 Leader
  6. 其他节点成为 Follower 或 Observer

集群配置

# zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
dataLogDir=/var/lib/zookeeper/log
clientPort=2181
maxClientCnxns=1000

# 集群节点配置(格式:server.id=host:quorumPort:leaderElectionPort)
server.1=192.168.1.1:2888:3888
server.2=192.168.1.2:2888:3888
server.3=192.168.1.3:2888:3888
# 在各节点 dataDir 目录下创建 myid 文件
echo "1" > /var/lib/zookeeper/myid  # 节点1
echo "2" > /var/lib/zookeeper/myid  # 节点2
echo "3" > /var/lib/zookeeper/myid  # 节点3

ZAB 协议

ZAB 概述

ZAB(ZooKeeper Atomic Broadcast)是 Zookeeper 专门设计的原子广播协议,用于保证分布式数据的一致性。ZAB 包含两种模式:

  • 崩溃恢复模式:集群启动或 Leader 崩溃时,选举新 Leader
  • 消息广播模式:正常工作时,Leader 将写请求广播给所有 Follower

消息广播流程

  1. 客户端发送写请求给 Leader(或由 Follower 转发给 Leader)
  2. Leader 为请求分配全局唯一递增的 ZXID
  3. Leader 向所有 Follower 发送 Proposal(提案)
  4. Follower 将 Proposal 写入本地事务日志,返回 ACK
  5. Leader 收到超过半数 ACK 后,发送 Commit 消息
  6. Leader 和 Follower 提交事务,更新内存数据

ZXID 结构

ZXID 是 64 位整数,分为两部分:

  • 高 32 位:epoch(纪元),每次 Leader 选举后递增
  • 低 32 位:counter(计数器),每次事务递增,新 epoch 时重置为 0

典型应用场景

配置中心

利用 Zookeeper 的 Watcher 机制实现配置的动态推送:

public class ConfigCenter {
    private ZooKeeper zk;
    private static final String CONFIG_PATH = "/config";

    public void watchConfig(String key, ConfigChangeListener listener) throws Exception {
        String path = CONFIG_PATH + "/" + key;
        byte[] data = zk.getData(path, event -> {
            if (event.getType() == EventType.NodeDataChanged) {
                try {
                    byte[] newData = zk.getData(path, null, null);
                    listener.onChange(new String(newData));
                    // 重新注册 Watcher
                    watchConfig(key, listener);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, null);
        listener.onChange(new String(data));
    }
}

分布式锁

利用临时顺序节点实现公平的分布式锁:

public class DistributedLock {
    private ZooKeeper zk;
    private static final String LOCK_PATH = "/locks";
    private String currentNode;

    public void lock() throws Exception {
        // 创建临时顺序节点
        currentNode = zk.create(LOCK_PATH + "/lock-", null,
            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        while (true) {
            // 获取所有子节点并排序
            List<String> children = zk.getChildren(LOCK_PATH, false);
            Collections.sort(children);

            String nodeName = currentNode.substring(LOCK_PATH.length() + 1);
            int index = children.indexOf(nodeName);

            if (index == 0) {
                // 当前节点是最小节点,获得锁
                return;
            }

            // 监听前一个节点
            String prevNode = LOCK_PATH + "/" + children.get(index - 1);
            CountDownLatch latch = new CountDownLatch(1);
            Stat stat = zk.exists(prevNode, event -> {
                if (event.getType() == EventType.NodeDeleted) {
                    latch.countDown();
                }
            });

            if (stat != null) {
                latch.await(); // 等待前一个节点释放
            }
        }
    }

    public void unlock() throws Exception {
        zk.delete(currentNode, -1);
    }
}

服务注册与发现

public class ServiceRegistry {
    private ZooKeeper zk;
    private static final String REGISTRY_PATH = "/services";

    // 服务注册(创建临时节点,服务下线自动删除)
    public void register(String serviceName, String address) throws Exception {
        String servicePath = REGISTRY_PATH + "/" + serviceName;
        if (zk.exists(servicePath, false) == null) {
            zk.create(servicePath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        zk.create(servicePath + "/" + address, address.getBytes(),
            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    }

    // 服务发现(获取所有可用实例)
    public List<String> discover(String serviceName) throws Exception {
        String servicePath = REGISTRY_PATH + "/" + serviceName;
        return zk.getChildren(servicePath, event -> {
            // 监听服务列表变化,自动刷新
            try {
                discover(serviceName);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
}

Leader 选举实现

public class LeaderElection {
    private ZooKeeper zk;
    private static final String ELECTION_PATH = "/election";
    private String currentNode;

    public boolean isLeader() throws Exception {
        List<String> children = zk.getChildren(ELECTION_PATH, false);
        Collections.sort(children);
        String nodeName = currentNode.substring(ELECTION_PATH.length() + 1);
        return children.get(0).equals(nodeName);
    }

    public void participate() throws Exception {
        currentNode = zk.create(ELECTION_PATH + "/node-", null,
            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        if (isLeader()) {
            System.out.println("I am the leader: " + currentNode);
        } else {
            watchPreviousNode();
        }
    }

    private void watchPreviousNode() throws Exception {
        List<String> children = zk.getChildren(ELECTION_PATH, false);
        Collections.sort(children);
        String nodeName = currentNode.substring(ELECTION_PATH.length() + 1);
        int index = children.indexOf(nodeName);
        String prevNode = ELECTION_PATH + "/" + children.get(index - 1);

        zk.exists(prevNode, event -> {
            if (event.getType() == EventType.NodeDeleted) {
                try {
                    if (isLeader()) {
                        System.out.println("I am the new leader: " + currentNode);
                    } else {
                        watchPreviousNode();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

性能优化

JVM 参数优化

# 推荐 JVM 参数
-Xms4g
-Xmx4g
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/var/log/zookeeper/heap.hprof

系统参数优化

# 增加文件描述符限制
ulimit -n 65535

# 调整 TCP 参数
echo "net.ipv4.tcp_tw_reuse = 1" >> /etc/sysctl.conf
echo "net.ipv4.tcp_fin_timeout = 30" >> /etc/sysctl.conf
echo "net.core.somaxconn = 65535" >> /etc/sysctl.conf
sysctl -p

配置参数优化

# zoo.cfg 性能相关配置
tickTime=2000
# 数据快照间隔(事务数)
snapCount=100000
# 预分配日志文件大小(KB)
preAllocSize=65536
# 最大客户端连接数
maxClientCnxns=1000
# 跳过 ACL 检查(提升性能,牺牲安全)
skipACL=yes
# 强制同步写磁盘
forceSync=no
# 自动清理快照
autopurge.snapRetainCount=3
autopurge.purgeInterval=1

监控指标

指标 说明 告警阈值
avg_latency 平均请求延迟 > 100ms
max_latency 最大请求延迟 > 1000ms
outstanding_requests 待处理请求数 > 100
num_alive_connections 活跃连接数 接近 maxClientCnxns
watch_count 注册的 Watcher 数量 > 100000
znode_count ZNode 总数 > 1000000
# 通过 4 字命令查看监控信息
echo "mntr" | nc localhost 2181  # 监控信息
echo "stat" | nc localhost 2181  # 状态信息
echo "ruok" | nc localhost 2181  # 健康检查
echo "dump" | nc localhost 2181  # 会话和临时节点

最佳实践

节点设计

  • 节点路径不宜过深:层级过深影响性能,建议不超过 5 层
  • 控制数据大小:单个 ZNode 数据不超过 1MB,建议 1KB 以内
  • 合理使用临时节点:利用临时节点实现自动清理
  • 批量操作:使用 multi 接口批量执行操作,保证原子性

Watcher 最佳实践

  • 及时重新注册:Watcher 触发后立即重新注册,避免遗漏事件
  • 异步处理:Watcher 回调中避免耗时操作,防止阻塞
  • 幂等处理:Watcher 回调应设计为幂等,防止重复执行
  • 控制数量:避免注册过多 Watcher,影响服务端性能

集群部署

  • 奇数个节点:3、5、7 个节点,避免脑裂,保证选举成功
  • 跨机架部署:节点分布在不同机架,提高容灾能力
  • 独立磁盘:事务日志和数据快照使用独立磁盘,避免 IO 竞争
  • 定期备份:定期备份数据快照,防止数据丢失

安全配置

  • 启用认证:生产环境使用 digest 或 Kerberos 认证
  • 设置 ACL:对敏感路径设置严格的访问控制
  • 启用 TLS:加密客户端与服务端通信
  • 网络隔离:限制 Zookeeper 端口的访问来源

总结

Zookeeper 是构建分布式系统的重要基础设施,提供了简单而强大的协调原语。本文介绍了 Zookeeper 的核心知识:

  1. 数据模型:树形 ZNode 结构,支持持久和临时两类节点
  2. Watcher 机制:一次性、异步的事件通知机制
  3. 会话管理:客户端与服务端的连接生命周期
  4. ACL 权限控制:灵活的访问控制机制
  5. 集群架构:Leader-Follower 架构,ZAB 协议保证一致性
  6. 典型应用:配置中心、分布式锁、服务注册与发现、Leader 选举
  7. 性能优化:JVM、系统参数、配置调优

掌握 Zookeeper 可以帮助我们构建可靠的分布式系统,解决分布式环境下的协调难题。在实际应用中,建议根据具体场景选择合适的使用模式,并遵循最佳实践以达到最佳效果。