Redis Cluster

Overview

Redis Cluster is Redis's distributed solution. By sharding data and failing over automatically, it provides high availability and horizontal scalability. The cluster has no central coordinator: all nodes are peers and communicate over a gossip protocol.

Cluster Architecture

Node Roles

Master

  • Handles read and write requests
  • Stores its shard of the data
  • Participates in cluster management

Replica (Slave)

  • Replicates its master's data
  • Can optionally serve read requests
  • Can be promoted to master if its master fails
bash
# Example cluster topology
# 3 masters, 3 replicas
Master1(7000) -> Slave1(7003)
Master2(7001) -> Slave2(7004)
Master3(7002) -> Slave3(7005)

Data Sharding

Hash Slots

java
// Redis Cluster divides the key space into 16384 hash slots.
// Illustrative sketch; CRC16 stands for the XMODEM/CCITT-variant CRC16
// that Redis uses (hypothetical helper class here).
public class RedisClusterSlot {
    
    private static final int CLUSTER_SLOTS = 16384;
    
    // Compute the slot a key maps to
    public static int calculateSlot(String key) {
        // If the key contains a hash tag, only the tag is hashed
        String hashTag = extractHashTag(key);
        String keyToHash = hashTag != null ? hashTag : key;
        
        // slot = CRC16(key) mod 16384 (Redis uses the equivalent & 16383)
        return CRC16.crc16(keyToHash.getBytes()) % CLUSTER_SLOTS;
    }
    
    // Extract the hash tag: {user:1000}:profile -> user:1000
    private static String extractHashTag(String key) {
        int start = key.indexOf('{');
        if (start != -1) {
            int end = key.indexOf('}', start + 1);
            if (end != -1 && end != start + 1) {   // tag must be non-empty
                return key.substring(start + 1, end);
            }
        }
        return null;
    }
}

Slot Assignment

bash
# Show the slot assignment
redis-cli -c -p 7000 cluster slots

# Example assignment
# Node 1: 0-5460
# Node 2: 5461-10922
# Node 3: 10923-16383
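
A roughly even split like the one above can be derived with plain integer arithmetic. A minimal sketch (`SlotRanges` is a hypothetical helper, not a Redis API; the exact boundaries redis-cli picks can differ by a slot or two depending on rounding):

```java
public class SlotRanges {
    static final int CLUSTER_SLOTS = 16384;

    // Split the 16384 slots as evenly as possible across n masters.
    public static int[][] ranges(int n) {
        int[][] out = new int[n][2];
        for (int i = 0; i < n; i++) {
            out[i][0] = i * CLUSTER_SLOTS / n;           // first slot of master i
            out[i][1] = (i + 1) * CLUSTER_SLOTS / n - 1; // last slot of master i
        }
        return out;
    }

    public static void main(String[] args) {
        // → 0-5460, 5461-10921, 10922-16383
        for (int[] r : ranges(3)) {
            System.out.println(r[0] + "-" + r[1]);
        }
    }
}
```

Whatever the rounding, the ranges must be contiguous, disjoint, and cover all 16384 slots; a cluster with uncovered slots reports cluster_state:fail by default.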

Cluster Setup

Configuration Files

bash
# redis-7000.conf
port 7000
cluster-enabled yes
cluster-config-file nodes-7000.conf
cluster-node-timeout 15000
appendonly yes
appendfilename "appendonly-7000.aof"
dbfilename "dump-7000.rdb"
logfile "redis-7000.log"
daemonize yes

# Create a similar config file for each node:
# redis-7001.conf, redis-7002.conf, ...

Starting the Nodes

bash
# Start all nodes
redis-server redis-7000.conf
redis-server redis-7001.conf
redis-server redis-7002.conf
redis-server redis-7003.conf
redis-server redis-7004.conf
redis-server redis-7005.conf

Creating the Cluster

bash
# Redis 5.0+
redis-cli --cluster create \
  127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 \
  127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 \
  --cluster-replicas 1

# Redis 3.0-4.0
redis-trib.rb create --replicas 1 \
  127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 \
  127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005

Client Usage

Java Client (Jedis)

java
@Configuration
public class RedisClusterConfig {
    
    @Bean
    public JedisCluster jedisCluster() {
        Set<HostAndPort> nodes = new HashSet<>();
        nodes.add(new HostAndPort("127.0.0.1", 7000));
        nodes.add(new HostAndPort("127.0.0.1", 7001));
        nodes.add(new HostAndPort("127.0.0.1", 7002));
        
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(100);
        poolConfig.setMaxIdle(10);
        poolConfig.setMinIdle(5);
        poolConfig.setTestOnBorrow(true);
        
        // connectionTimeout, soTimeout, maxAttempts, pool config
        return new JedisCluster(nodes, 2000, 5000, 5, poolConfig);
    }
}

@Service
public class RedisClusterService {
    
    @Autowired
    private JedisCluster jedisCluster;
    
    public void set(String key, String value) {
        jedisCluster.set(key, value);
    }
    
    public String get(String key) {
        return jedisCluster.get(key);
    }
    
    // Use a hash tag so related keys land on the same node
    public void setUserData(String userId, String profile, String settings) {
        String userKey = "{user:" + userId + "}";
        jedisCluster.hset(userKey + ":profile", "data", profile);
        jedisCluster.hset(userKey + ":settings", "data", settings);
    }
    
    // Multi-key operations must all hit one node
    public void batchOperation(String userId) {
        String userKey = "{user:" + userId + "}";
        
        // Pipeline the commands to cut round trips
        // (getConnectionFromSlot is the Jedis 3.x API; Jedis 4 removed it)
        try (Jedis jedis = jedisCluster.getConnectionFromSlot(
                JedisClusterCRC16.getSlot(userKey))) {
            Pipeline pipeline = jedis.pipelined();
            pipeline.hset(userKey + ":profile", "name", "John");
            pipeline.hset(userKey + ":profile", "age", "30");
            pipeline.hset(userKey + ":settings", "theme", "dark");
            pipeline.sync();
        }
    }
}

Spring Data Redis

java
@Configuration
@EnableRedisRepositories
// Renamed so it doesn't clash with Spring's own RedisClusterConfiguration class
public class LettuceClusterConfig {
    
    @Bean
    public LettuceConnectionFactory redisConnectionFactory() {
        RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration();
        clusterConfig.clusterNode("127.0.0.1", 7000);
        clusterConfig.clusterNode("127.0.0.1", 7001);
        clusterConfig.clusterNode("127.0.0.1", 7002);
        
        LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder()
            .commandTimeout(Duration.ofSeconds(2))
            .shutdownTimeout(Duration.ZERO)
            .build();
            
        return new LettuceConnectionFactory(clusterConfig, clientConfig);
    }
    
    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory());
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }
}

@Service
public class UserService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    public void saveUser(User user) {
        String key = "user:" + user.getId();
        redisTemplate.opsForValue().set(key, user, Duration.ofHours(1));
    }
    
    public User getUser(Long userId) {
        String key = "user:" + userId;
        return (User) redisTemplate.opsForValue().get(key);
    }
}

Cluster Management

Node Management

bash
# Show cluster info
redis-cli -c -p 7000 cluster info

# Show node info
redis-cli -c -p 7000 cluster nodes

# Add a new node
redis-cli --cluster add-node 127.0.0.1:7006 127.0.0.1:7000

# Remove a node
redis-cli --cluster del-node 127.0.0.1:7000 <node-id>

Slot Migration

bash
# Reshard slots interactively
redis-cli --cluster reshard 127.0.0.1:7000

# Migrate a slot manually: mark the target as importing, the source as migrating
redis-cli -c -p 7001 cluster setslot 1000 importing <source-node-id>
redis-cli -c -p 7000 cluster setslot 1000 migrating <target-node-id>

# Move the keys (list them first with: cluster getkeysinslot 1000 <count>)
redis-cli -c -p 7000 migrate 127.0.0.1 7001 "" 0 5000 keys key1 key2

# Finalize the migration on both nodes
redis-cli -c -p 7000 cluster setslot 1000 node <target-node-id>
redis-cli -c -p 7001 cluster setslot 1000 node <target-node-id>

Failover

bash
# Manual failover (run on the replica that should take over)
redis-cli -c -p 7003 cluster failover

# Forced failover (skips coordination with the unreachable master)
redis-cli -c -p 7003 cluster failover force

# Follow the failover in the logs
tail -f redis-7000.log

High-Availability Features

Automatic Failure Detection

java
// Conceptual sketch of the failure-detection flow (not runnable as-is)
public class ClusterFailureDetection {
    
    // Node states
    public enum NodeState {
        ONLINE,     // healthy
        PFAIL,      // possibly failed (one node's local view)
        FAIL        // confirmed failed (cluster-wide agreement)
    }
    
    // Failure-detection flow
    public void detectFailure() {
        // 1. Nodes ping each other periodically over the gossip channel
        sendPingToAllNodes();
        
        // 2. A node that misses cluster-node-timeout is marked PFAIL locally
        markPossibleFailure();
        
        // 3. When a majority of masters report PFAIL, the node is marked FAIL
        if (getPfailCount() > getMasterCount() / 2) {
            markNodeFailed();
            
            // 4. FAIL triggers a failover among the node's replicas
            triggerFailover();
        }
    }
}

Automatic Failover

bash
# Preconditions for failover
# 1. The master is marked FAIL
# 2. The replica's data is reasonably fresh (replication offset not too stale)
# 3. The replica collects enough votes

# Failover procedure
# 1. A replica starts an election
# 2. The other masters vote
# 3. The replica with a majority of votes becomes the new master
# 4. The cluster configuration is updated and broadcast
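
Replicas do not all start the election at once. Per the Redis Cluster specification, each replica waits a rank-based delay (rank 0 is the replica with the freshest replication offset), so the most up-to-date replica usually asks for votes first. A sketch of that delay:

```java
import java.util.concurrent.ThreadLocalRandom;

public class ElectionDelay {
    // Delay before a replica starts its election, per the Redis Cluster spec:
    // 500 ms fixed + up to 500 ms random jitter + 1000 ms per rank.
    public static long delayMillis(int replicaRank) {
        long jitter = ThreadLocalRandom.current().nextLong(500);
        return 500 + jitter + replicaRank * 1000L;
    }

    public static void main(String[] args) {
        System.out.println("rank 0 delay: " + delayMillis(0) + " ms");
        System.out.println("rank 2 delay: " + delayMillis(2) + " ms");
    }
}
```

The fixed delay gives the FAIL state time to propagate, and the rank term spaces the replicas out so they rarely split the vote.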

Performance Tuning

Client-Side Tuning

java
@Component
public class RedisClusterOptimization {
    
    @Autowired
    private JedisCluster jedisCluster;   // used by batchSet below
    
    // Connection-pool tuning
    @Bean
    public JedisPoolConfig jedisPoolConfig() {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(200);           // max connections
        config.setMaxIdle(50);             // max idle connections
        config.setMinIdle(10);             // min idle connections
        config.setTestOnBorrow(true);      // validate on borrow
        config.setTestOnReturn(false);     // don't validate on return
        config.setTestWhileIdle(true);     // validate idle connections
        config.setTimeBetweenEvictionRunsMillis(30000);  // idle-check interval
        config.setNumTestsPerEvictionRun(3);             // connections checked per run
        config.setMinEvictableIdleTimeMillis(60000);     // min idle time before eviction
        return config;
    }
    
    // Batching: group keys by slot so each pipeline targets one node
    public void batchSet(Map<String, String> data) {
        // Group keys by slot
        Map<Integer, Map<String, String>> slotGroups = new HashMap<>();
        
        for (Map.Entry<String, String> entry : data.entrySet()) {
            int slot = JedisClusterCRC16.getSlot(entry.getKey());
            slotGroups.computeIfAbsent(slot, k -> new HashMap<>())
                     .put(entry.getKey(), entry.getValue());
        }
        
        // Execute the per-slot pipelines in parallel
        slotGroups.entrySet().parallelStream().forEach(group -> {
            try (Jedis jedis = jedisCluster.getConnectionFromSlot(group.getKey())) {
                Pipeline pipeline = jedis.pipelined();
                group.getValue().forEach(pipeline::set);
                pipeline.sync();
            }
        });
    }
}

Memory Tuning

bash
# Memory limits
maxmemory 2gb
maxmemory-policy allkeys-lru

# Compact-encoding thresholds (renamed *-listpack-* in Redis 7)
hash-max-ziplist-entries 512
hash-max-ziplist-value 64
list-max-ziplist-size -2
set-max-intset-entries 512
zset-max-ziplist-entries 128
zset-max-ziplist-value 64

Monitoring and Operations

Cluster Monitoring

java
@Slf4j   // provides the 'log' field used below (Lombok)
@Component
public class RedisClusterMonitor {
    
    @Autowired
    private JedisCluster jedisCluster;
    
    @Scheduled(fixedDelay = 30000)
    public void monitorCluster() {
        try {
            // Fetch cluster-level info
            String clusterInfo = jedisCluster.clusterInfo();
            parseClusterInfo(clusterInfo);
            
            // Fetch per-node info
            String clusterNodes = jedisCluster.clusterNodes();
            parseClusterNodes(clusterNodes);
            
            // Verify all 16384 slots are covered
            checkSlotCoverage();
            
        } catch (Exception e) {
            log.error("Cluster monitoring failed", e);
        }
    }
    
    private void parseClusterInfo(String info) {
        Map<String, String> infoMap = Arrays.stream(info.split("\r?\n"))
            .filter(line -> line.contains(":"))
            .collect(Collectors.toMap(
                line -> line.split(":", 2)[0],
                line -> line.split(":", 2)[1]
            ));
            
        // Key metrics
        String state = infoMap.get("cluster_state");
        int slotsAssigned = Integer.parseInt(infoMap.get("cluster_slots_assigned"));
        int slotsOk = Integer.parseInt(infoMap.get("cluster_slots_ok"));
        int knownNodes = Integer.parseInt(infoMap.get("cluster_known_nodes"));
        
        // Publish metrics
        sendMetrics("cluster.state", "ok".equals(state) ? 1 : 0);
        sendMetrics("cluster.slots.assigned", slotsAssigned);
        sendMetrics("cluster.slots.ok", slotsOk);
        sendMetrics("cluster.nodes.count", knownNodes);
    }
}

Performance Monitoring

bash
# Monitoring commands
redis-cli -c -p 7000 info replication
redis-cli -c -p 7000 info memory
redis-cli -c -p 7000 info stats

# Key metrics
# - Memory usage
# - Commands processed
# - Network I/O
# - Replication lag
# - Slot state

Best Practices

1. Cluster Planning

  • Node count: use an odd number of masters so failure-detection majorities are clear-cut, and give every master at least one replica
  • Hardware: use identical specs for masters and replicas to avoid performance skew after a failover
  • Network: keep inter-node latency low and provision sufficient bandwidth

2. Data Design

  • Use hash tags: give related keys the same hash tag so they map to one node
  • Avoid big keys: they slow down slot migration and failover
  • Shard sensibly: keep hot data from piling up on a few nodes
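
The hash-tag rule can be verified offline: Redis derives the slot from CRC16 (the XMODEM/CCITT variant) of the key, or of the tag when one is present, masked down to 14 bits. A self-contained sketch:

```java
public class HashTagSlots {
    // CRC16-CCITT (XMODEM) — the variant Redis Cluster uses for key slots
    static int crc16(byte[] data) {
        int crc = 0;
        for (byte b : data) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = ((crc & 0x8000) != 0) ? (crc << 1) ^ 0x1021 : crc << 1;
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    // Slot of a key, honoring {hash tags}
    static int slot(String key) {
        int open = key.indexOf('{');
        if (open != -1) {
            int close = key.indexOf('}', open + 1);
            if (close != -1 && close != open + 1) {   // tag must be non-empty
                key = key.substring(open + 1, close);
            }
        }
        return crc16(key.getBytes()) & 16383;
    }

    public static void main(String[] args) {
        // Both keys hash only the tag "user:1000", so they share a slot
        System.out.println(slot("{user:1000}:profile") == slot("{user:1000}:settings"));  // → true
    }
}
```

The same check is available on a live cluster via `CLUSTER KEYSLOT <key>`.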

3. Operations

  • Monitoring and alerting: watch cluster state, node state, and performance metrics
  • Backups: back up regularly and rehearse the restore procedure
  • Capacity planning: plan expansion ahead of time instead of scaling in an emergency

4. Failure Handling

  • Runbooks: write out detailed failure-handling procedures in advance
  • Data recovery: know the recovery methods before you need them
  • Tuning: adjust parameters to the workload's characteristics

Summary

Redis Cluster provides high availability and horizontal scalability through data sharding and automatic failover, which makes it suitable for large-scale applications. To keep a cluster stable and fast, pay attention to data design, client-side tuning, and monitoring and operations.