ZooKeeper分布式协调系统

ZooKeeper分布式协调系统

概述

Apache ZooKeeper是一个开源的分布式协调服务,为分布式应用提供一致性服务。它提供了简单的原语集合,分布式应用可以基于这些原语实现更高级的服务,如同步、配置管理、集群管理和命名服务。

核心概念

数据模型

ZNode结构

ZooKeeper的数据模型是一个层次化的命名空间,类似于文件系统。

java
// ZNode类型
public enum ZNodeType {
    PERSISTENT,              // 持久节点
    PERSISTENT_SEQUENTIAL,   // 持久顺序节点
    EPHEMERAL,              // 临时节点
    EPHEMERAL_SEQUENTIAL    // 临时顺序节点
}

// ZNode示例
public class ZNodeExample {
    private ZooKeeper zk;
    
    public void createNodes() throws Exception {
        // 创建持久节点
        zk.create("/app", "app data".getBytes(), 
                 ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        
        // 创建临时节点
        zk.create("/app/session", "session data".getBytes(),
                 ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        
        // 创建顺序节点
        String sequentialPath = zk.create("/app/task-", "task data".getBytes(),
                                         ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        System.out.println("Created: " + sequentialPath); // /app/task-0000000001
    }
}
// ZNode类型
public enum ZNodeType {
    PERSISTENT,              // 持久节点
    PERSISTENT_SEQUENTIAL,   // 持久顺序节点
    EPHEMERAL,              // 临时节点
    EPHEMERAL_SEQUENTIAL    // 临时顺序节点
}

// ZNode示例
public class ZNodeExample {
    private ZooKeeper zk;
    
    public void createNodes() throws Exception {
        // 创建持久节点
        zk.create("/app", "app data".getBytes(), 
                 ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        
        // 创建临时节点
        zk.create("/app/session", "session data".getBytes(),
                 ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        
        // 创建顺序节点
        String sequentialPath = zk.create("/app/task-", "task data".getBytes(),
                                         ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        System.out.println("Created: " + sequentialPath); // /app/task-0000000001
    }
}

数据特性

  • 小数据:每个ZNode最多存储1MB数据
  • 版本控制:每次修改都会增加版本号
  • ACL控制:访问控制列表
  • 统计信息:创建时间、修改时间、子节点数量等

会话机制

java
public class ZooKeeperSession {
    private ZooKeeper zk;
    private CountDownLatch connectedSignal = new CountDownLatch(1);
    
    public void connect(String hosts) throws Exception {
        zk = new ZooKeeper(hosts, 3000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == KeeperState.SyncConnected) {
                    connectedSignal.countDown();
                }
            }
        });
        
        connectedSignal.await();
    }
    
    public void close() throws InterruptedException {
        zk.close();
    }
}
public class ZooKeeperSession {
    private ZooKeeper zk;
    private CountDownLatch connectedSignal = new CountDownLatch(1);
    
    public void connect(String hosts) throws Exception {
        zk = new ZooKeeper(hosts, 3000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == KeeperState.SyncConnected) {
                    connectedSignal.countDown();
                }
            }
        });
        
        connectedSignal.await();
    }
    
    public void close() throws InterruptedException {
        zk.close();
    }
}

监听机制(Watcher)

java
public class ZooKeeperWatcher implements Watcher {
    private ZooKeeper zk;
    
    @Override
    public void process(WatchedEvent event) {
        System.out.println("Event: " + event.getType() + " " + event.getPath());
        
        if (event.getType() == Event.EventType.NodeDataChanged) {
            // 节点数据变化
            handleDataChange(event.getPath());
        } else if (event.getType() == Event.EventType.NodeChildrenChanged) {
            // 子节点变化
            handleChildrenChange(event.getPath());
        }
        
        // 重新设置监听(Watcher是一次性的)
        try {
            zk.exists(event.getPath(), this);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    private void handleDataChange(String path) {
        try {
            byte[] data = zk.getData(path, false, null);
            System.out.println("Data changed: " + new String(data));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    private void handleChildrenChange(String path) {
        try {
            List<String> children = zk.getChildren(path, false);
            System.out.println("Children changed: " + children);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class ZooKeeperWatcher implements Watcher {
    private ZooKeeper zk;
    
    @Override
    public void process(WatchedEvent event) {
        System.out.println("Event: " + event.getType() + " " + event.getPath());
        
        if (event.getType() == Event.EventType.NodeDataChanged) {
            // 节点数据变化
            handleDataChange(event.getPath());
        } else if (event.getType() == Event.EventType.NodeChildrenChanged) {
            // 子节点变化
            handleChildrenChange(event.getPath());
        }
        
        // 重新设置监听(Watcher是一次性的)
        try {
            zk.exists(event.getPath(), this);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    private void handleDataChange(String path) {
        try {
            byte[] data = zk.getData(path, false, null);
            System.out.println("Data changed: " + new String(data));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    private void handleChildrenChange(String path) {
        try {
            List<String> children = zk.getChildren(path, false);
            System.out.println("Children changed: " + children);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

典型应用场景

1. 分布式锁

java
public class DistributedLock {
    private ZooKeeper zk;
    private String lockPath;
    private String currentPath;
    private CountDownLatch latch;
    
    public DistributedLock(ZooKeeper zk, String lockPath) {
        this.zk = zk;
        this.lockPath = lockPath;
    }
    
    public boolean tryLock(long timeout, TimeUnit unit) throws Exception {
        // 创建临时顺序节点
        currentPath = zk.create(lockPath + "/lock-", new byte[0],
                               ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        
        return attemptLock(timeout, unit);
    }
    
    private boolean attemptLock(long timeout, TimeUnit unit) throws Exception {
        List<String> children = zk.getChildren(lockPath, false);
        Collections.sort(children);
        
        int index = children.indexOf(currentPath.substring(lockPath.length() + 1));
        
        if (index == 0) {
            // 获得锁
            return true;
        } else {
            // 监听前一个节点
            String prevPath = lockPath + "/" + children.get(index - 1);
            latch = new CountDownLatch(1);
            
            Stat stat = zk.exists(prevPath, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getType() == Event.EventType.NodeDeleted) {
                        latch.countDown();
                    }
                }
            });
            
            if (stat == null) {
                // 前一个节点已删除,重新尝试
                return attemptLock(timeout, unit);
            } else {
                // 等待前一个节点删除
                return latch.await(timeout, unit);
            }
        }
    }
    
    public void unlock() throws Exception {
        if (currentPath != null) {
            zk.delete(currentPath, -1);
            currentPath = null;
        }
    }
}
public class DistributedLock {
    private ZooKeeper zk;
    private String lockPath;
    private String currentPath;
    private CountDownLatch latch;
    
    public DistributedLock(ZooKeeper zk, String lockPath) {
        this.zk = zk;
        this.lockPath = lockPath;
    }
    
    public boolean tryLock(long timeout, TimeUnit unit) throws Exception {
        // 创建临时顺序节点
        currentPath = zk.create(lockPath + "/lock-", new byte[0],
                               ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        
        return attemptLock(timeout, unit);
    }
    
    private boolean attemptLock(long timeout, TimeUnit unit) throws Exception {
        List<String> children = zk.getChildren(lockPath, false);
        Collections.sort(children);
        
        int index = children.indexOf(currentPath.substring(lockPath.length() + 1));
        
        if (index == 0) {
            // 获得锁
            return true;
        } else {
            // 监听前一个节点
            String prevPath = lockPath + "/" + children.get(index - 1);
            latch = new CountDownLatch(1);
            
            Stat stat = zk.exists(prevPath, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getType() == Event.EventType.NodeDeleted) {
                        latch.countDown();
                    }
                }
            });
            
            if (stat == null) {
                // 前一个节点已删除,重新尝试
                return attemptLock(timeout, unit);
            } else {
                // 等待前一个节点删除
                return latch.await(timeout, unit);
            }
        }
    }
    
    public void unlock() throws Exception {
        if (currentPath != null) {
            zk.delete(currentPath, -1);
            currentPath = null;
        }
    }
}

2. 服务发现

java
public class ServiceDiscovery {
    private ZooKeeper zk;
    private String servicePath;
    private List<String> serviceList = new ArrayList<>();
    
    public ServiceDiscovery(ZooKeeper zk, String servicePath) {
        this.zk = zk;
        this.servicePath = servicePath;
    }
    
    // 注册服务
    public void registerService(String serviceInfo) throws Exception {
        String serviceName = zk.create(servicePath + "/service-", 
                                      serviceInfo.getBytes(),
                                      ZooDefs.Ids.OPEN_ACL_UNSAFE, 
                                      CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("Service registered: " + serviceName);
    }
    
    // 发现服务
    public List<String> discoverServices() throws Exception {
        List<String> children = zk.getChildren(servicePath, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeChildrenChanged) {
                    try {
                        updateServiceList();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        
        updateServiceList();
        return new ArrayList<>(serviceList);
    }
    
    private void updateServiceList() throws Exception {
        List<String> children = zk.getChildren(servicePath, false);
        serviceList.clear();
        
        for (String child : children) {
            byte[] data = zk.getData(servicePath + "/" + child, false, null);
            serviceList.add(new String(data));
        }
        
        System.out.println("Service list updated: " + serviceList);
    }
}
public class ServiceDiscovery {
    private ZooKeeper zk;
    private String servicePath;
    private List<String> serviceList = new ArrayList<>();
    
    public ServiceDiscovery(ZooKeeper zk, String servicePath) {
        this.zk = zk;
        this.servicePath = servicePath;
    }
    
    // 注册服务
    public void registerService(String serviceInfo) throws Exception {
        String serviceName = zk.create(servicePath + "/service-", 
                                      serviceInfo.getBytes(),
                                      ZooDefs.Ids.OPEN_ACL_UNSAFE, 
                                      CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("Service registered: " + serviceName);
    }
    
    // 发现服务
    public List<String> discoverServices() throws Exception {
        List<String> children = zk.getChildren(servicePath, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeChildrenChanged) {
                    try {
                        updateServiceList();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        
        updateServiceList();
        return new ArrayList<>(serviceList);
    }
    
    private void updateServiceList() throws Exception {
        List<String> children = zk.getChildren(servicePath, false);
        serviceList.clear();
        
        for (String child : children) {
            byte[] data = zk.getData(servicePath + "/" + child, false, null);
            serviceList.add(new String(data));
        }
        
        System.out.println("Service list updated: " + serviceList);
    }
}

3. 配置管理

java
public class ConfigurationManager {
    private ZooKeeper zk;
    private String configPath;
    private Map<String, String> configCache = new ConcurrentHashMap<>();
    private List<ConfigChangeListener> listeners = new ArrayList<>();
    
    public interface ConfigChangeListener {
        void onConfigChanged(String key, String value);
    }
    
    public ConfigurationManager(ZooKeeper zk, String configPath) {
        this.zk = zk;
        this.configPath = configPath;
        loadConfigurations();
    }
    
    public void setConfig(String key, String value) throws Exception {
        String path = configPath + "/" + key;
        Stat stat = zk.exists(path, false);
        
        if (stat == null) {
            zk.create(path, value.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            zk.setData(path, value.getBytes(), stat.getVersion());
        }
    }
    
    public String getConfig(String key) {
        return configCache.get(key);
    }
    
    private void loadConfigurations() {
        try {
            List<String> children = zk.getChildren(configPath, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getType() == Event.EventType.NodeChildrenChanged) {
                        loadConfigurations();
                    }
                }
            });
            
            for (String child : children) {
                watchConfigNode(child);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    private void watchConfigNode(String key) throws Exception {
        String path = configPath + "/" + key;
        byte[] data = zk.getData(path, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeDataChanged) {
                    try {
                        watchConfigNode(key);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }, null);
        
        String value = new String(data);
        String oldValue = configCache.put(key, value);
        
        if (!value.equals(oldValue)) {
            notifyListeners(key, value);
        }
    }
    
    private void notifyListeners(String key, String value) {
        for (ConfigChangeListener listener : listeners) {
            listener.onConfigChanged(key, value);
        }
    }
    
    public void addListener(ConfigChangeListener listener) {
        listeners.add(listener);
    }
}
public class ConfigurationManager {
    private ZooKeeper zk;
    private String configPath;
    private Map<String, String> configCache = new ConcurrentHashMap<>();
    private List<ConfigChangeListener> listeners = new ArrayList<>();
    
    public interface ConfigChangeListener {
        void onConfigChanged(String key, String value);
    }
    
    public ConfigurationManager(ZooKeeper zk, String configPath) {
        this.zk = zk;
        this.configPath = configPath;
        loadConfigurations();
    }
    
    public void setConfig(String key, String value) throws Exception {
        String path = configPath + "/" + key;
        Stat stat = zk.exists(path, false);
        
        if (stat == null) {
            zk.create(path, value.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            zk.setData(path, value.getBytes(), stat.getVersion());
        }
    }
    
    public String getConfig(String key) {
        return configCache.get(key);
    }
    
    private void loadConfigurations() {
        try {
            List<String> children = zk.getChildren(configPath, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getType() == Event.EventType.NodeChildrenChanged) {
                        loadConfigurations();
                    }
                }
            });
            
            for (String child : children) {
                watchConfigNode(child);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    private void watchConfigNode(String key) throws Exception {
        String path = configPath + "/" + key;
        byte[] data = zk.getData(path, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeDataChanged) {
                    try {
                        watchConfigNode(key);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }, null);
        
        String value = new String(data);
        String oldValue = configCache.put(key, value);
        
        if (!value.equals(oldValue)) {
            notifyListeners(key, value);
        }
    }
    
    private void notifyListeners(String key, String value) {
        for (ConfigChangeListener listener : listeners) {
            listener.onConfigChanged(key, value);
        }
    }
    
    public void addListener(ConfigChangeListener listener) {
        listeners.add(listener);
    }
}

4. 集群管理

java
public class ClusterManager {
    private ZooKeeper zk;
    private String clusterPath;
    private String nodeId;
    private boolean isLeader = false;
    
    public ClusterManager(ZooKeeper zk, String clusterPath, String nodeId) {
        this.zk = zk;
        this.clusterPath = clusterPath;
        this.nodeId = nodeId;
    }
    
    public void joinCluster() throws Exception {
        // 创建集群路径
        Stat stat = zk.exists(clusterPath, false);
        if (stat == null) {
            zk.create(clusterPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        
        // 注册节点
        String nodePath = clusterPath + "/" + nodeId;
        zk.create(nodePath, nodeId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        
        // 选举Leader
        electLeader();
    }
    
    private void electLeader() throws Exception {
        List<String> children = zk.getChildren(clusterPath, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeChildrenChanged) {
                    try {
                        electLeader();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        
        Collections.sort(children);
        
        if (!children.isEmpty()) {
            String leader = children.get(0);
            boolean wasLeader = isLeader;
            isLeader = leader.equals(nodeId);
            
            if (isLeader && !wasLeader) {
                onBecomeLeader();
            } else if (!isLeader && wasLeader) {
                onLoseLeadership();
            }
        }
    }
    
    private void onBecomeLeader() {
        System.out.println("Node " + nodeId + " became leader");
        // 执行Leader职责
    }
    
    private void onLoseLeadership() {
        System.out.println("Node " + nodeId + " lost leadership");
        // 停止Leader职责
    }
    
    public boolean isLeader() {
        return isLeader;
    }
}
public class ClusterManager {
    private ZooKeeper zk;
    private String clusterPath;
    private String nodeId;
    private boolean isLeader = false;
    
    public ClusterManager(ZooKeeper zk, String clusterPath, String nodeId) {
        this.zk = zk;
        this.clusterPath = clusterPath;
        this.nodeId = nodeId;
    }
    
    public void joinCluster() throws Exception {
        // 创建集群路径
        Stat stat = zk.exists(clusterPath, false);
        if (stat == null) {
            zk.create(clusterPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        
        // 注册节点
        String nodePath = clusterPath + "/" + nodeId;
        zk.create(nodePath, nodeId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        
        // 选举Leader
        electLeader();
    }
    
    private void electLeader() throws Exception {
        List<String> children = zk.getChildren(clusterPath, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeChildrenChanged) {
                    try {
                        electLeader();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        
        Collections.sort(children);
        
        if (!children.isEmpty()) {
            String leader = children.get(0);
            boolean wasLeader = isLeader;
            isLeader = leader.equals(nodeId);
            
            if (isLeader && !wasLeader) {
                onBecomeLeader();
            } else if (!isLeader && wasLeader) {
                onLoseLeadership();
            }
        }
    }
    
    private void onBecomeLeader() {
        System.out.println("Node " + nodeId + " became leader");
        // 执行Leader职责
    }
    
    private void onLoseLeadership() {
        System.out.println("Node " + nodeId + " lost leadership");
        // 停止Leader职责
    }
    
    public boolean isLeader() {
        return isLeader;
    }
}

ZooKeeper集群

集群配置

properties
# zoo.cfg
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=5
syncLimit=2

# 集群配置
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888
# zoo.cfg
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=5
syncLimit=2

# 集群配置
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888

集群特性

  • 奇数节点:避免脑裂问题
  • 过半存活:集群可用的最小条件
  • 数据一致性:通过ZAB协议保证
  • 自动故障转移:Leader故障时自动选举新Leader

性能优化

1. 客户端优化

java
public class ZooKeeperOptimization {
    
    // 连接池
    public class ZooKeeperPool {
        private Queue<ZooKeeper> pool = new ConcurrentLinkedQueue<>();
        private int maxSize = 10;
        
        public ZooKeeper borrowConnection() throws Exception {
            ZooKeeper zk = pool.poll();
            if (zk == null || !zk.getState().isConnected()) {
                zk = createConnection();
            }
            return zk;
        }
        
        public void returnConnection(ZooKeeper zk) {
            if (pool.size() < maxSize && zk.getState().isConnected()) {
                pool.offer(zk);
            } else {
                try {
                    zk.close();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
    
    // 批量操作
    public void batchOperations() throws Exception {
        ZooKeeper zk = getZooKeeper();
        
        List<Op> ops = Arrays.asList(
            Op.create("/batch/node1", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
            Op.create("/batch/node2", "data2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
            Op.setData("/batch/node1", "newdata1".getBytes(), -1)
        );
        
        List<OpResult> results = zk.multi(ops);
        // 处理结果
    }
}
public class ZooKeeperOptimization {
    
    // 连接池
    public class ZooKeeperPool {
        private Queue<ZooKeeper> pool = new ConcurrentLinkedQueue<>();
        private int maxSize = 10;
        
        public ZooKeeper borrowConnection() throws Exception {
            ZooKeeper zk = pool.poll();
            if (zk == null || !zk.getState().isConnected()) {
                zk = createConnection();
            }
            return zk;
        }
        
        public void returnConnection(ZooKeeper zk) {
            if (pool.size() < maxSize && zk.getState().isConnected()) {
                pool.offer(zk);
            } else {
                try {
                    zk.close();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
    
    // 批量操作
    public void batchOperations() throws Exception {
        ZooKeeper zk = getZooKeeper();
        
        List<Op> ops = Arrays.asList(
            Op.create("/batch/node1", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
            Op.create("/batch/node2", "data2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
            Op.setData("/batch/node1", "newdata1".getBytes(), -1)
        );
        
        List<OpResult> results = zk.multi(ops);
        // 处理结果
    }
}

2. 服务器优化

bash
# JVM参数优化
-Xms4g -Xmx4g
-XX:+UseG1GC
-XX:MaxGCPauseMillis=50

# ZooKeeper参数优化
maxClientCnxns=200
snapCount=100000
preAllocSize=65536
# JVM参数优化
-Xms4g -Xmx4g
-XX:+UseG1GC
-XX:MaxGCPauseMillis=50

# ZooKeeper参数优化
maxClientCnxns=200
snapCount=100000
preAllocSize=65536

监控和运维

1. 四字命令

bash
# 查看状态
echo stat | nc localhost 2181

# 查看配置
echo conf | nc localhost 2181

# 查看连接
echo cons | nc localhost 2181

# 查看监听
echo wchs | nc localhost 2181
# 查看状态
echo stat | nc localhost 2181

# 查看配置
echo conf | nc localhost 2181

# 查看连接
echo cons | nc localhost 2181

# 查看监听
echo wchs | nc localhost 2181

2. JMX监控

java
// 启用JMX
-Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.port=9999
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
// 启用JMX
-Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.port=9999
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false

最佳实践

  1. 合理设计ZNode结构:避免过深的层次
  2. 控制数据大小:每个ZNode数据不超过1MB
  3. 正确使用临时节点:避免会话超时导致的问题
  4. 监听器管理:及时移除不需要的监听器
  5. 异常处理:处理连接断开、会话过期等异常
  6. 性能监控:监控延迟、吞吐量等指标
  7. 备份策略:定期备份ZooKeeper数据

总结

ZooKeeper作为分布式协调服务的经典实现,在分布式系统中发挥着重要作用。虽然在某些场景下被更现代的解决方案替代,但其简单可靠的设计理念和丰富的应用模式仍然值得学习和借鉴。