1 分布式同步问题概述
在分布式系统中,常见的同步问题:
互斥访问:多个进程同时访问共享资源时需要保证互斥。
任务协调(Barrier):多个进程必须同时达到某个条件才能继续执行。
顺序执行:保证任务按照严格的顺序处理。
Leader 选举:多个节点中选出唯一主节点负责协调。
ZooKeeper 提供的分布式协调能力是基于以下核心机制:
临时节点(Ephemeral Node):保证客户端断开时自动释放资源。
顺序节点(Sequential Node):保证节点创建顺序,实现公平性。
Watcher 机制:事件通知,避免轮询,提高性能。
版本号(Stat.version):CAS(Compare And Set)操作,保证数据一致性。
这些机制组合,提供了轻量级的分布式同步原语。
2 分布式锁
2.1 基本原理
分布式锁的实现思路:
客户端在 /locks 下创建一个临时顺序节点。
客户端获取 /locks 下所有节点,按顺序排序。
如果自己是最小节点 → 获得锁。
否则监听比自己小的节点删除事件,等待锁释放。
session 断开 → 节点自动删除 → 锁释放。
优点:
临时节点保证异常客户端锁自动释放
顺序节点保证公平性
前置节点监听避免惊群效应
2.2 锁实现步骤
String lockRoot = "/locks";
String lockPath = zk.create(lockRoot + "/lock-", "thread1".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
List<String> children = zk.getChildren(lockRoot, false);
Collections.sort(children);
String nodeName = lockPath.substring(lockRoot.length() + 1);
if (nodeName.equals(children.get(0))) {
System.out.println("获取锁成功");
} else {
int index = children.indexOf(nodeName);
String prevNode = children.get(index - 1);
zk.exists(lockRoot + "/" + prevNode, event -> {
if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
System.out.println("前置节点释放,尝试获取锁");
}
});
}
2.3 实现分析
临时顺序节点保证锁在客户端异常断开时释放,并保证公平锁。
Watcher 监听前置节点,减少事件通知量,避免惊群效应。
CAS 可以用在锁相关元数据更新,保证原子操作。
注意监听节点被删除后可能立即有更小节点 → 需要循环检测。
3 分布式 Barrier
3.1 Barrier 原理
Barrier 根节点 /barrier
每个参与者创建临时节点 /barrier/nodeX
根节点子节点数量达到预设数量 → Barrier 成功
使用 Watcher 通知其他节点继续执行
3.2 Barrier 实现示例
String barrierRoot = "/barrier";
int totalNodes = 3;
String nodePath = zk.create(barrierRoot + "/node-", null,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
while (true) {
List<String> children = zk.getChildren(barrierRoot, event -> {
System.out.println("Barrier 状态变化: " + event.getType());
});
if (children.size() >= totalNodes) {
System.out.println("所有节点到达,Barrier 条件满足");
break;
}
Thread.sleep(100);
}
3.3 注意事项
临时节点保证异常退出自动释放
Watcher 一次性触发,需要重新注册
节点数量过大可能影响性能,可以使用分层 Barrier
4 分布式队列
4.1 原理
队列根节点 /queue
生产者创建顺序节点 /queue/task-000000001
消费者获取子节点列表并排序
消费最小节点并删除
顺序节点保证 FIFO
4.2 生产者示例
String queueRoot = "/queue";
String task = "task1";
String taskPath = zk.create(queueRoot + "/task-", task.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println("任务加入队列: " + taskPath);
4.3 消费者示例
List<String> tasks = zk.getChildren(queueRoot, false);
Collections.sort(tasks);
if (!tasks.isEmpty()) {
String taskNode = tasks.get(0);
byte[] data = zk.getData(queueRoot + "/" + taskNode, false, null);
System.out.println("处理任务: " + new String(data));
zk.delete(queueRoot + "/" + taskNode, -1);
}
4.4 优化和注意事项
Watcher 可以注册在根节点,实现任务到达异步通知
节点数量过多可能影响 getChildren 性能 → 分段或分页处理
任务失败可重新加入队列处理
5 Leader 选举
5.1 原理
候选节点在 /leader 下创建临时顺序节点
获取最小节点 → Leader
其他节点监听前置节点删除
Leader 异常退出 → 最小节点继承成为新的 Leader
5.2 示例实现
String leaderRoot = "/leader";
String nodePath = zk.create(leaderRoot + "/node-", null,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
List<String> nodes = zk.getChildren(leaderRoot, false);
Collections.sort(nodes);
String nodeName = nodePath.substring(leaderRoot.length() + 1);
if (nodeName.equals(nodes.get(0))) {
System.out.println("成为 Leader");
} else {
int index = nodes.indexOf(nodeName);
String prevNode = nodes.get(index - 1);
zk.exists(leaderRoot + "/" + prevNode, event -> {
if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
System.out.println("前置节点释放,尝试成为 Leader");
}
});
}
5.3 注意事项
临时节点保证 Leader 异常退出自动释放
前置节点监听减少惊群效应
Watcher 异步回调可能导致延迟,可结合循环检测
6 ZooKeeper 同步原语设计总结
核心机制总结:
临时节点保证异常退出自动释放资源
顺序节点保证公平性 / FIFO / Leader 选举
Watcher 实现事件驱动,避免轮询
版本控制(Stat.version)实现 CAS 原子操作
实践经验:
节点数量不要过多,避免 getChildren 性能瓶颈
Watcher 一次性触发 → 关键操作需循环检测
临时节点 + 顺序节点组合 → 可实现锁、队列、Leader、Barrier
异常处理非常重要,包括 session 超时、节点删除异常