ZooKeeper Distributed Coordination System
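Overview
Apache ZooKeeper is an open-source coordination service for distributed applications. It exposes a small set of primitives on which applications can build higher-level services such as synchronization, configuration management, cluster management, and naming.
Core Concepts
Data Model
ZNode Structure
ZooKeeper's data model is a hierarchical namespace, similar to a file system. Each node in the tree is called a ZNode and is addressed by an absolute, slash-separated path.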
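java
import java.nio.charset.StandardCharsets;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

// ZNode types (illustrative; the client API uses org.apache.zookeeper.CreateMode)
public enum ZNodeType {
    PERSISTENT,            // persistent node
    PERSISTENT_SEQUENTIAL, // persistent sequential node
    EPHEMERAL,             // ephemeral node, deleted when its session ends
    EPHEMERAL_SEQUENTIAL   // ephemeral sequential node
}

// ZNode example
public class ZNodeExample {
    private final ZooKeeper zk;

    public ZNodeExample(ZooKeeper zk) {
        this.zk = zk;
    }

    public void createNodes() throws Exception {
        // Create a persistent node
        zk.create("/app", "app data".getBytes(StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        // Create an ephemeral node
        zk.create("/app/session", "session data".getBytes(StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        // Create a sequential node; the server appends a 10-digit counter to the path
        String sequentialPath = zk.create("/app/task-", "task data".getBytes(StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        System.out.println("Created: " + sequentialPath); // e.g. /app/task-0000000001
    }
}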
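The path returned for a sequential node ends in a zero-padded, 10-digit counter that the server appends to the requested prefix. The following is a minimal in-memory sketch of that naming rule, not a call into the ZooKeeper client; the class `SequentialNames` and its methods are hypothetical names used only for illustration.

```java
import java.util.Locale;

// Hypothetical helper that mimics how sequential ZNode names are formed.
final class SequentialNames {
    private SequentialNames() { }

    // Append a zero-padded, 10-digit counter to the requested path prefix.
    static String withSequence(String prefix, int counter) {
        return prefix + String.format(Locale.ROOT, "%010d", counter);
    }

    // Recover the numeric sequence from a path produced by withSequence.
    static int sequenceOf(String path) {
        return Integer.parseInt(path.substring(path.length() - 10));
    }
}
```

Because all names under one parent share a prefix and a fixed-width suffix, sorting them lexicographically gives the same order as sorting by sequence number, which is what the lock recipe later in this article relies on.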
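Data Characteristics
- Small data: each ZNode stores at most about 1 MB by default (the limit is governed by the jute.maxbuffer setting)
- Versioning: every change to a ZNode's data increments its data version
- ACLs: each ZNode carries an access control list
- Stat metadata: creation time, modification time, number of children, and so on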
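The version number is what makes conditional updates possible: a writer passes the version it last read, and the update is rejected if the data has changed since then. Below is a minimal in-memory model of that rule, assuming the 1 MB limit from the list above and the convention that -1 means "any version". `VersionedNode` is a hypothetical class for illustration and is not part of the ZooKeeper API.

```java
// Hypothetical in-memory model of a ZNode's versioned data.
final class VersionedNode {
    // Assumed size limit, taken from the 1 MB figure in the list above.
    static final int MAX_DATA_BYTES = 1024 * 1024;

    private byte[] data;
    private int version = 0;

    VersionedNode(byte[] initial) {
        checkSize(initial);
        this.data = initial.clone();
    }

    // Succeeds only if expectedVersion matches the current version,
    // or if expectedVersion is -1, which skips the check.
    synchronized boolean setData(byte[] newData, int expectedVersion) {
        checkSize(newData);
        if (expectedVersion != -1 && expectedVersion != version) {
            return false; // a real client would get a bad-version error here
        }
        data = newData.clone();
        version++;
        return true;
    }

    synchronized int getVersion() {
        return version;
    }

    synchronized byte[] getData() {
        return data.clone();
    }

    private static void checkSize(byte[] bytes) {
        if (bytes.length > MAX_DATA_BYTES) {
            throw new IllegalArgumentException("data exceeds " + MAX_DATA_BYTES + " bytes");
        }
    }
}
```

Two writers that both read version 0 cannot both succeed: the first update moves the version to 1, so the second one fails and must re-read before retrying.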
Session Mechanism
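java
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;

public class ZooKeeperSession {
    private ZooKeeper zk;
    private final CountDownLatch connectedSignal = new CountDownLatch(1);

    public void connect(String hosts) throws Exception {
        // 3000 is the requested session timeout in milliseconds
        zk = new ZooKeeper(hosts, 3000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // The constructor returns immediately; this event signals that the session is up
                if (event.getState() == KeeperState.SyncConnected) {
                    connectedSignal.countDown();
                }
            }
        });
        // Block until the client is actually connected
        connectedSignal.await();
    }

    public void close() throws InterruptedException {
        if (zk != null) {
            zk.close();
        }
    }
}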
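The timeout passed to the constructor is only a request. The server clamps it to its configured bounds, which, assuming the default minSessionTimeout and maxSessionTimeout, are 2 and 20 times tickTime. The sketch below shows that arithmetic; `SessionTimeouts` is a hypothetical helper, and real deployments may override both bounds.

```java
// Hypothetical helper showing how a requested session timeout is clamped,
// assuming the server defaults of 2 * tickTime and 20 * tickTime.
final class SessionTimeouts {
    private SessionTimeouts() { }

    static int negotiate(int requestedMs, int tickTimeMs) {
        int min = 2 * tickTimeMs;
        int max = 20 * tickTimeMs;
        return Math.max(min, Math.min(max, requestedMs));
    }
}
```

With tickTime=2000, the 3000 ms requested in the session example would be raised to 4000 ms under these assumptions. The value actually granted can be read from the client with `ZooKeeper.getSessionTimeout()` once the session is connected.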
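Watch Mechanism (Watcher)
java
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ZooKeeperWatcher implements Watcher {
    private final ZooKeeper zk;

    public ZooKeeperWatcher(ZooKeeper zk) {
        this.zk = zk;
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("Event: " + event.getType() + " " + event.getPath());
        // Connection-state events have type None and no path
        if (event.getType() == Event.EventType.None || event.getPath() == null) {
            return;
        }
        if (event.getType() == Event.EventType.NodeDataChanged) {
            // Node data changed
            handleDataChange(event.getPath());
        } else if (event.getType() == Event.EventType.NodeChildrenChanged) {
            // Child list changed
            handleChildrenChange(event.getPath());
        } else {
            // Watches are one-shot: re-register to keep observing creation and deletion
            try {
                zk.exists(event.getPath(), this);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private void handleDataChange(String path) {
        try {
            // Passing this re-registers the data watch while reading the new value
            byte[] data = zk.getData(path, this, null);
            System.out.println("Data changed: " + new String(data, StandardCharsets.UTF_8));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void handleChildrenChange(String path) {
        try {
            // Passing this re-registers the child watch while reading the new list
            List<String> children = zk.getChildren(path, this);
            System.out.println("Children changed: " + children);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}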
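The one-shot behaviour is the part of the watch mechanism that most often causes missed updates, so it may help to see it in isolation. The following is a small in-memory model, not the client's real bookkeeping: a callback fires for the next change on its path and is then discarded. `OneShotWatchRegistry` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory model of one-shot watches.
final class OneShotWatchRegistry {
    private final Map<String, List<Consumer<String>>> watches = new HashMap<>();

    // Register a callback for the next change on the given path.
    void watch(String path, Consumer<String> callback) {
        watches.computeIfAbsent(path, p -> new ArrayList<>()).add(callback);
    }

    // Fire and discard every callback registered for the path.
    // Returns the number of callbacks that fired.
    int trigger(String path) {
        List<Consumer<String>> fired = watches.remove(path);
        if (fired == null) {
            return 0;
        }
        for (Consumer<String> callback : fired) {
            callback.accept(path);
        }
        return fired.size();
    }
}
```

A second change on the same path reaches nobody unless the callback registered itself again, which is why handlers normally re-register as part of reading the new state.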
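Typical Use Cases
1. Distributed Lock
java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class DistributedLock {
    private final ZooKeeper zk;
    private final String lockPath;
    private String currentPath;

    public DistributedLock(ZooKeeper zk, String lockPath) {
        this.zk = zk;
        this.lockPath = lockPath;
    }

    public boolean tryLock(long timeout, TimeUnit unit) throws Exception {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        // Create an ephemeral sequential node
        currentPath = zk.create(lockPath + "/lock-", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        return attemptLock(deadline);
    }

    private boolean attemptLock(long deadline) throws Exception {
        String ownName = currentPath.substring(lockPath.length() + 1);
        while (true) {
            List<String> children = zk.getChildren(lockPath, false);
            Collections.sort(children);
            int index = children.indexOf(ownName);
            if (index < 0) {
                throw new IllegalStateException("Lock node is gone: " + currentPath);
            }
            if (index == 0) {
                // Lowest sequence number: lock acquired
                return true;
            }
            // Watch only the immediate predecessor
            String prevPath = lockPath + "/" + children.get(index - 1);
            final CountDownLatch latch = new CountDownLatch(1);
            Stat stat = zk.exists(prevPath, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getType() == Event.EventType.NodeDeleted) {
                        latch.countDown();
                    }
                }
            });
            if (stat == null) {
                // Predecessor already deleted: check again
                continue;
            }
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0 || !latch.await(remaining, TimeUnit.NANOSECONDS)) {
                // Timed out: remove our node so it does not block later waiters
                unlock();
                return false;
            }
            // The predecessor is gone, but it may have been a waiter rather than
            // the lock holder, so loop and check our position again
        }
    }

    public void unlock() throws Exception {
        if (currentPath != null) {
            zk.delete(currentPath, -1);
            currentPath = null;
        }
    }
}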
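The core decision in the lock recipe is "am I first, and if not, whom do I wait for?". It can be isolated as a pure function over the child names, which makes it easy to check without a running ensemble. This is a sketch under the assumption that all children share the same prefix and a fixed-width sequence suffix; `LockOrder` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical helper that picks the node a lock waiter should watch.
final class LockOrder {
    private LockOrder() { }

    // Returns the child immediately before ownName in sorted order,
    // or empty if ownName is first and therefore holds the lock.
    static Optional<String> predecessor(List<String> children, String ownName) {
        List<String> sorted = new ArrayList<>(children);
        Collections.sort(sorted);
        int index = sorted.indexOf(ownName);
        if (index < 0) {
            throw new IllegalArgumentException("own node not among children: " + ownName);
        }
        return index == 0 ? Optional.empty() : Optional.of(sorted.get(index - 1));
    }
}
```

Watching only the immediate predecessor means that a release wakes a single waiter instead of every client queued on the lock.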
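2. Service Discovery
java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ServiceDiscovery {
    private final ZooKeeper zk;
    private final String servicePath;
    // Replaced wholesale on each refresh so readers never see a half-built list
    private volatile List<String> serviceList = Collections.emptyList();

    private final Watcher childWatcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            if (event.getType() == Event.EventType.NodeChildrenChanged) {
                try {
                    updateServiceList();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    };

    public ServiceDiscovery(ZooKeeper zk, String servicePath) {
        this.zk = zk;
        this.servicePath = servicePath;
    }

    // Register a service instance; the ephemeral node disappears when the session ends
    public void registerService(String serviceInfo) throws Exception {
        String serviceName = zk.create(servicePath + "/service-",
                serviceInfo.getBytes(StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("Service registered: " + serviceName);
    }

    // Discover services and keep the cached list up to date
    public List<String> discoverServices() throws Exception {
        updateServiceList();
        return new ArrayList<>(serviceList);
    }

    private void updateServiceList() throws Exception {
        // Watches are one-shot, so every refresh re-registers the child watch
        List<String> children = zk.getChildren(servicePath, childWatcher);
        List<String> fresh = new ArrayList<>();
        for (String child : children) {
            try {
                byte[] data = zk.getData(servicePath + "/" + child, false, null);
                fresh.add(new String(data, StandardCharsets.UTF_8));
            } catch (KeeperException.NoNodeException e) {
                // The instance went away between getChildren and getData; skip it
            }
        }
        serviceList = Collections.unmodifiableList(fresh);
        System.out.println("Service list updated: " + serviceList);
    }
}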
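A child watch only reports that the list changed, not how. A client that needs to open or close connections per instance therefore has to compare the old and new child lists itself. The following sketch shows one way to compute that difference; `MembershipDiff` and its fields are hypothetical names, and the approach is an illustration, not something the client library provides.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper that compares two snapshots of a service's child nodes.
final class MembershipDiff {
    final Set<String> added;
    final Set<String> removed;

    private MembershipDiff(Set<String> added, Set<String> removed) {
        this.added = added;
        this.removed = removed;
    }

    static MembershipDiff between(Collection<String> previous, Collection<String> current) {
        // Present now but not before
        Set<String> added = new TreeSet<>(current);
        added.removeAll(previous);
        // Present before but not now
        Set<String> removed = new TreeSet<>(previous);
        removed.removeAll(current);
        return new MembershipDiff(added, removed);
    }
}
```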
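3. Configuration Management
java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ConfigurationManager {
    private final ZooKeeper zk;
    private final String configPath;
    private final Map<String, String> configCache = new ConcurrentHashMap<>();
    // Listeners are notified on the ZooKeeper event thread, so use a thread-safe list
    private final List<ConfigChangeListener> listeners = new CopyOnWriteArrayList<>();

    public interface ConfigChangeListener {
        void onConfigChanged(String key, String value);
    }

    // Shared watcher instances: registering the same Watcher object again on the
    // same path does not add a duplicate watch
    private final Watcher childWatcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            if (event.getType() == Event.EventType.NodeChildrenChanged) {
                loadConfigurations();
            }
        }
    };
    private final Watcher dataWatcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            if (event.getType() == Event.EventType.NodeDataChanged) {
                String path = event.getPath();
                try {
                    // Keys are assumed to be direct children of configPath
                    watchConfigNode(path.substring(path.lastIndexOf('/') + 1));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    };

    public ConfigurationManager(ZooKeeper zk, String configPath) {
        this.zk = zk;
        this.configPath = configPath;
        loadConfigurations();
    }

    public void setConfig(String key, String value) throws Exception {
        String path = configPath + "/" + key;
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        Stat stat = zk.exists(path, false);
        if (stat == null) {
            zk.create(path, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            // Conditional update: fails if the node changed after exists() returned
            zk.setData(path, bytes, stat.getVersion());
        }
    }

    public String getConfig(String key) {
        return configCache.get(key);
    }

    private void loadConfigurations() {
        try {
            // Watches are one-shot, so every reload re-registers the child watch
            List<String> children = zk.getChildren(configPath, childWatcher);
            for (String child : children) {
                watchConfigNode(child);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void watchConfigNode(String key) throws Exception {
        String path = configPath + "/" + key;
        // Reading with a watcher re-registers the data watch
        byte[] data = zk.getData(path, dataWatcher, null);
        String value = data == null ? "" : new String(data, StandardCharsets.UTF_8);
        String oldValue = configCache.put(key, value);
        if (!value.equals(oldValue)) {
            notifyListeners(key, value);
        }
    }

    private void notifyListeners(String key, String value) {
        for (ConfigChangeListener listener : listeners) {
            listener.onConfigChanged(key, value);
        }
    }

    public void addListener(ConfigChangeListener listener) {
        listeners.add(listener);
    }
}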
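4. Cluster Management
java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ClusterManager {
    private final ZooKeeper zk;
    private final String clusterPath;
    private final String nodeId;
    // Written on the ZooKeeper event thread, read by application threads
    private volatile boolean isLeader = false;

    public ClusterManager(ZooKeeper zk, String clusterPath, String nodeId) {
        this.zk = zk;
        this.clusterPath = clusterPath;
        this.nodeId = nodeId;
    }

    public void joinCluster() throws Exception {
        // Create the cluster path; another member may create it first
        try {
            if (zk.exists(clusterPath, false) == null) {
                zk.create(clusterPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException.NodeExistsException e) {
            // Already created by another member
        }
        // Register this member as an ephemeral node
        String nodePath = clusterPath + "/" + nodeId;
        zk.create(nodePath, nodeId.getBytes(StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        // Elect a leader
        electLeader();
    }

    private synchronized void electLeader() throws Exception {
        // The child watch is one-shot, so each election run registers a new one
        List<String> children = zk.getChildren(clusterPath, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeChildrenChanged) {
                    try {
                        electLeader();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        Collections.sort(children);
        if (!children.isEmpty()) {
            // The member with the lexicographically smallest ID is the leader
            String leader = children.get(0);
            boolean wasLeader = isLeader;
            isLeader = leader.equals(nodeId);
            if (isLeader && !wasLeader) {
                onBecomeLeader();
            } else if (!isLeader && wasLeader) {
                onLoseLeadership();
            }
        }
    }

    private void onBecomeLeader() {
        System.out.println("Node " + nodeId + " became leader");
        // Start leader duties
    }

    private void onLoseLeadership() {
        System.out.println("Node " + nodeId + " lost leadership");
        // Stop leader duties
    }

    public boolean isLeader() {
        return isLeader;
    }
}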
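ZooKeeper Cluster
Cluster Configuration
properties
# zoo.cfg
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=5
syncLimit=2
# Ensemble members: server.<id>=<host>:<quorum port>:<leader election port>
# Each server also needs a myid file in dataDir containing its own <id>
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888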
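Both initLimit and syncLimit are expressed in ticks rather than milliseconds, so their effective durations depend on tickTime. A small sketch of the conversion for the values above follows; `TickLimits` is a hypothetical helper used only to make the arithmetic explicit.

```java
// Hypothetical helper that converts tick-based zoo.cfg limits to milliseconds.
final class TickLimits {
    private TickLimits() { }

    static long toMillis(int tickTimeMs, int ticks) {
        return (long) tickTimeMs * ticks;
    }
}
```

With tickTime=2000, initLimit=5 gives followers 10 seconds to connect to and sync with the leader, and syncLimit=2 allows a follower to fall at most 4 seconds behind before it is dropped.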
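Cluster Characteristics
- Odd number of servers: the ensemble needs a strict majority to operate, so an even-sized ensemble tolerates no more failures than the next smaller odd size
- Majority alive: the ensemble is available only while more than half of its servers are up; this majority rule is also what prevents split-brain
- Data consistency: guaranteed by the ZAB (ZooKeeper Atomic Broadcast) protocol
- Automatic failover: when the leader fails, a new leader is elected automatically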
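The majority rule in the list above reduces to simple integer arithmetic, sketched here to show why odd ensemble sizes are preferred. `QuorumMath` is a hypothetical helper name.

```java
// Hypothetical helper for majority-quorum arithmetic.
final class QuorumMath {
    private QuorumMath() { }

    // Smallest number of servers that forms a strict majority.
    static int quorumSize(int ensembleSize) {
        return ensembleSize / 2 + 1;
    }

    // Number of servers that can fail while a majority remains.
    static int toleratedFailures(int ensembleSize) {
        return ensembleSize - quorumSize(ensembleSize);
    }
}
```

A 3-server and a 4-server ensemble both tolerate one failure, and 5 and 6 both tolerate two, so the extra even-numbered server adds replication cost without adding fault tolerance.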
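Performance Optimization
1. Client Optimization
java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ZooKeeperOptimization {
    private final String connectString; // e.g. "zoo1:2181,zoo2:2181,zoo3:2181"
    private final ZooKeeperPool pool = new ZooKeeperPool();

    public ZooKeeperOptimization(String connectString) {
        this.connectString = connectString;
    }

    // Open a new session; connection events are ignored here for brevity
    private ZooKeeper createConnection() throws IOException {
        return new ZooKeeper(connectString, 3000, event -> { });
    }

    private ZooKeeper getZooKeeper() throws Exception {
        return pool.borrowConnection();
    }

    // Connection pool. A ZooKeeper handle is thread-safe, so one shared instance
    // is often enough; pool only if measurements show that it helps
    public class ZooKeeperPool {
        private final Queue<ZooKeeper> pool = new ConcurrentLinkedQueue<>();
        private final int maxSize = 10;

        public ZooKeeper borrowConnection() throws Exception {
            ZooKeeper zk = pool.poll();
            if (zk == null || !zk.getState().isConnected()) {
                zk = createConnection();
            }
            return zk;
        }

        public void returnConnection(ZooKeeper zk) {
            if (pool.size() < maxSize && zk.getState().isConnected()) {
                pool.offer(zk);
            } else {
                try {
                    zk.close();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    // Batch operations: multi() sends all operations in one request and applies
    // them atomically. The parent node /batch must already exist
    public void batchOperations() throws Exception {
        ZooKeeper zk = getZooKeeper();
        try {
            List<Op> ops = Arrays.asList(
                    Op.create("/batch/node1", "data1".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
                    Op.create("/batch/node2", "data2".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
                    Op.setData("/batch/node1", "newdata1".getBytes(StandardCharsets.UTF_8), -1)
            );
            List<OpResult> results = zk.multi(ops);
            // Process results
        } finally {
            pool.returnConnection(zk);
        }
    }
}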
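2. Server Optimization
bash
# JVM options (for example via SERVER_JVMFLAGS or conf/java.env)
-Xms4g -Xmx4g
-XX:+UseG1GC
-XX:MaxGCPauseMillis=50
# ZooKeeper parameters (zoo.cfg)
# maxClientCnxns: maximum concurrent connections from a single client IP
maxClientCnxns=200
# snapCount: number of transactions logged between snapshots
snapCount=100000
# preAllocSize: transaction log preallocation size, in KB
preAllocSize=65536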
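Monitoring and Operations
1. Four-Letter Commands
bash
# Since ZooKeeper 3.5.3, four-letter commands other than srvr must be enabled
# through 4lw.commands.whitelist in zoo.cfg
# Show server status
echo stat | nc localhost 2181
# Show configuration
echo conf | nc localhost 2181
# Show client connections
echo cons | nc localhost 2181
# Show a summary of watches
echo wchs | nc localhost 2181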
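2. JMX Monitoring
java
// Enable remote JMX (JVM options)
// Authentication and SSL are disabled here; use this only on a trusted network
-Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.port=9999
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false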
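Best Practices
- Design the ZNode hierarchy sensibly: avoid overly deep trees
- Keep data small: stay within the 1 MB limit per ZNode
- Use ephemeral nodes correctly: they are deleted when the session expires, so plan for session timeouts
- Manage watches: remove watches that are no longer needed
- Handle exceptions: deal with connection loss, session expiry, and similar failures
- Monitor performance: track latency, throughput, and related metrics
- Backup strategy: back up ZooKeeper data regularly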
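For the exception-handling item above, one common approach is to retry transient failures such as connection loss with a capped exponential backoff. This is an assumption about how an application might handle it, not something the list prescribes, and `BackoffRetry` is a hypothetical helper. The sketch retries on any exception for brevity; a real client should retry only errors that are known to be transient and should treat session expiry as fatal for that session.

```java
import java.util.concurrent.Callable;

// Hypothetical retry helper with capped exponential backoff.
final class BackoffRetry {
    private BackoffRetry() { }

    // Delay before retry number `attempt` (0-based): baseMs doubled per attempt,
    // capped at maxMs. Assumes maxMs is well below Long.MAX_VALUE / 2.
    static long delayMs(int attempt, long baseMs, long maxMs) {
        long delay = baseMs;
        for (int i = 0; i < attempt && delay < maxMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMs);
    }

    // Run op, retrying up to maxRetries times with increasing delays.
    static <T> T call(Callable<T> op, int maxRetries, long baseMs, long maxMs) throws Exception {
        for (int attempt = 0; ; attempt++) {
            try {
                return op.call();
            } catch (Exception e) {
                if (attempt >= maxRetries) {
                    throw e; // retries exhausted: surface the last failure
                }
                Thread.sleep(delayMs(attempt, baseMs, maxMs));
            }
        }
    }
}
```

A read could then be wrapped as `BackoffRetry.call(() -> zk.getData(path, false, null), 3, 100, 5000)`, where `zk` and `path` stand for the application's own client handle and node path.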
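Summary
ZooKeeper is a classic implementation of a distributed coordination service and plays an important role in distributed systems. Although more modern solutions have replaced it in some scenarios, its simple, reliable design and its rich set of usage patterns are still worth studying.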