# Redis Cluster

## Overview

Redis Cluster is Redis's built-in distributed solution. Through data sharding and automatic failover, it provides high availability and horizontal scalability. The cluster has no central coordinator: all nodes are peers that communicate over a Gossip protocol.
## Cluster Architecture

### Node Roles

#### Master

- Handles read and write requests
- Stores the data for its assigned hash slots
- Participates in cluster management

#### Replica (Slave)

- Replicates its master's data
- Can optionally serve read requests
- Can be promoted to master if its master fails
```bash
# Example topology: 3 masters, 3 replicas
Master1(7000) -> Slave1(7003)
Master2(7001) -> Slave2(7004)
Master3(7002) -> Slave3(7005)
```
## Data Sharding

### Hash Slots
```java
// Redis Cluster divides the keyspace into 16384 hash slots
public class RedisClusterSlot {

    private static final int CLUSTER_SLOTS = 16384;

    // Compute the slot a key maps to
    public static int calculateSlot(String key) {
        // If the key contains a hashtag, only the tag is hashed
        String hashTag = extractHashTag(key);
        String keyToHash = hashTag != null ? hashTag : key;
        // CRC16 modulo 16384 (Redis itself uses `crc16 & 16383`,
        // which is equivalent because 16384 is a power of two)
        return CRC16.crc16(keyToHash.getBytes()) % CLUSTER_SLOTS;
    }

    // Extract the hashtag: "{user:1000}:profile" -> "user:1000"
    private static String extractHashTag(String key) {
        int start = key.indexOf('{');
        if (start != -1) {
            int end = key.indexOf('}', start + 1);
            if (end != -1 && end != start + 1) { // the tag must be non-empty
                return key.substring(start + 1, end);
            }
        }
        return null;
    }
}
```
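The slot computation can be exercised without a running cluster. The sketch below inlines a minimal CRC16 (the CCITT/XModem variant, polynomial 0x1021 with zero initial value, which is what Redis Cluster uses) so the class is self-contained; `SlotDemo` and its method names are illustrative, not part of any client library.

```java
// Self-contained slot calculator with an inlined CRC16 (CCITT/XModem).
public class SlotDemo {

    private static final int CLUSTER_SLOTS = 16384;

    // Bitwise CRC-16/XMODEM: poly 0x1021, init 0x0000, no reflection
    public static int crc16(byte[] data) {
        int crc = 0;
        for (byte b : data) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = ((crc & 0x8000) != 0) ? (crc << 1) ^ 0x1021 : crc << 1;
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    // Same hashtag rule as above: hash only the first non-empty {...} tag
    public static int slot(String key) {
        int start = key.indexOf('{');
        if (start != -1) {
            int end = key.indexOf('}', start + 1);
            if (end != -1 && end != start + 1) {
                key = key.substring(start + 1, end);
            }
        }
        return crc16(key.getBytes()) % CLUSTER_SLOTS;
    }

    public static void main(String[] args) {
        // Keys sharing a hashtag land in the same slot
        System.out.println(slot("{user:1000}:profile") == slot("{user:1000}:settings")); // true
    }
}
```

This is also an easy way to sanity-check that a multi-key operation will not fail with a cross-slot error before shipping it.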
### Slot Assignment
```bash
# Show the slot assignment
redis-cli -c -p 7000 cluster slots

# Example assignment
# Node 1: 0-5460
# Node 2: 5461-10922
# Node 3: 10923-16383
```
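The even split in the comments is not arbitrary: `redis-cli --cluster create` carves the 16384 slots into contiguous, nearly equal ranges. The helper below (a hypothetical `SlotRanges` class, not a Redis API) uses a rounding scheme that reproduces the three-master ranges shown above.

```java
// Partition the 16384 slots into n contiguous, nearly equal ranges.
public class SlotRanges {

    private static final int CLUSTER_SLOTS = 16384;

    // Returns n pairs {firstSlot, lastSlot}
    public static int[][] slotRanges(int n) {
        int[][] ranges = new int[n][2];
        int first = 0;
        for (int i = 0; i < n; i++) {
            // Round the ideal boundary i+1 masters in, then step back one slot
            int last = (int) Math.round((double) CLUSTER_SLOTS / n * (i + 1)) - 1;
            ranges[i][0] = first;
            ranges[i][1] = last;
            first = last + 1;
        }
        return ranges;
    }
}
```

For `n = 3` this yields exactly 0-5460, 5461-10922, and 10923-16383.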
## Cluster Setup

### Configuration Files
```bash
# redis-7000.conf
port 7000
cluster-enabled yes
cluster-config-file nodes-7000.conf
cluster-node-timeout 15000
appendonly yes
appendfilename "appendonly-7000.aof"
dbfilename "dump-7000.rdb"
logfile "redis-7000.log"
daemonize yes

# Create a similar configuration file for each node:
# redis-7001.conf, redis-7002.conf, ...
```
### Starting the Nodes
```bash
# Start every node
redis-server redis-7000.conf
redis-server redis-7001.conf
redis-server redis-7002.conf
redis-server redis-7003.conf
redis-server redis-7004.conf
redis-server redis-7005.conf
```
### Creating the Cluster
```bash
# Redis 5.0+
redis-cli --cluster create \
  127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 \
  127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 \
  --cluster-replicas 1

# Redis 3.0-4.0 (redis-trib.rb has since been folded into redis-cli)
redis-trib.rb create --replicas 1 \
  127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 \
  127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005
```
## Client Usage

### Java Client (Jedis)
```java
@Configuration
public class RedisClusterConfig {

    @Bean
    public JedisCluster jedisCluster() {
        Set<HostAndPort> nodes = new HashSet<>();
        nodes.add(new HostAndPort("127.0.0.1", 7000));
        nodes.add(new HostAndPort("127.0.0.1", 7001));
        nodes.add(new HostAndPort("127.0.0.1", 7002));

        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(100);
        poolConfig.setMaxIdle(10);
        poolConfig.setMinIdle(5);
        poolConfig.setTestOnBorrow(true);

        // connectionTimeout, soTimeout, maxAttempts, pool config
        return new JedisCluster(nodes, 2000, 5000, 5, poolConfig);
    }
}

@Service
public class RedisClusterService {

    @Autowired
    private JedisCluster jedisCluster;

    public void set(String key, String value) {
        jedisCluster.set(key, value);
    }

    public String get(String key) {
        return jedisCluster.get(key);
    }

    // Use a hashtag to keep related keys on the same node
    public void setUserData(String userId, String profile, String settings) {
        String userKey = "{user:" + userId + "}";
        jedisCluster.hset(userKey + ":profile", "data", profile);
        jedisCluster.hset(userKey + ":settings", "data", settings);
    }

    // Batch operations must target keys on the same node
    public void batchOperation(String userId) {
        String userKey = "{user:" + userId + "}";
        // Use a pipeline to cut down on round trips
        try (Jedis jedis = jedisCluster.getConnectionFromSlot(
                JedisClusterCRC16.getSlot(userKey))) {
            Pipeline pipeline = jedis.pipelined();
            pipeline.hset(userKey + ":profile", "name", "John");
            pipeline.hset(userKey + ":profile", "age", "30");
            pipeline.hset(userKey + ":settings", "theme", "dark");
            pipeline.sync();
        }
    }
}
```
### Spring Data Redis

```java
// Named LettuceClusterConfig to avoid clashing with Spring Data's own
// RedisClusterConfiguration class, which is instantiated below.
@Configuration
@EnableRedisRepositories
public class LettuceClusterConfig {

    @Bean
    public LettuceConnectionFactory redisConnectionFactory() {
        RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration();
        clusterConfig.clusterNode("127.0.0.1", 7000);
        clusterConfig.clusterNode("127.0.0.1", 7001);
        clusterConfig.clusterNode("127.0.0.1", 7002);

        LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder()
                .commandTimeout(Duration.ofSeconds(2))
                .shutdownTimeout(Duration.ZERO)
                .build();

        return new LettuceConnectionFactory(clusterConfig, clientConfig);
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory());
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }
}

@Service
public class UserService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    public void saveUser(User user) {
        String key = "user:" + user.getId();
        redisTemplate.opsForValue().set(key, user, Duration.ofHours(1));
    }

    public User getUser(Long userId) {
        String key = "user:" + userId;
        return (User) redisTemplate.opsForValue().get(key);
    }
}
```
## Cluster Administration

### Node Management
```bash
# Show cluster info
redis-cli -c -p 7000 cluster info

# List cluster nodes
redis-cli -c -p 7000 cluster nodes

# Add a new node
redis-cli --cluster add-node 127.0.0.1:7006 127.0.0.1:7000

# Remove a node
redis-cli --cluster del-node 127.0.0.1:7000 <node-id>
```
### Slot Migration
```bash
# Rebalance slots interactively
redis-cli --cluster reshard 127.0.0.1:7000

# Migrate a slot manually: mark it importing on the target first,
# then migrating on the source
redis-cli -c -p 7001 cluster setslot 1000 importing <source-node-id>
redis-cli -c -p 7000 cluster setslot 1000 migrating <target-node-id>

# Move the keys in the slot
redis-cli -c -p 7000 migrate 127.0.0.1 7001 "" 0 5000 keys key1 key2

# Finalize the migration on both nodes
redis-cli -c -p 7000 cluster setslot 1000 node <target-node-id>
redis-cli -c -p 7001 cluster setslot 1000 node <target-node-id>
```
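While a slot is migrating, clients get redirected: the source node answers `ASK` for keys that have already moved, and once the migration completes queries are answered with `MOVED`. Cluster-aware clients parse these errors and retry against the indicated node. A minimal parser sketch (the `RedirectParser` and `Redirect` names are illustrative, not a library API):

```java
// Parse a cluster redirect error such as "MOVED 3999 127.0.0.1:6381"
// or "ASK 1000 127.0.0.1:7001" (format per the Redis Cluster spec).
public class RedirectParser {

    public record Redirect(String type, int slot, String host, int port) {}

    public static Redirect parse(String error) {
        String[] parts = error.split(" ");
        if (parts.length != 3
                || (!parts[0].equals("MOVED") && !parts[0].equals("ASK"))) {
            throw new IllegalArgumentException("not a redirect: " + error);
        }
        // Split host:port from the right, so IPv6-style hosts keep their colons
        int colon = parts[2].lastIndexOf(':');
        return new Redirect(parts[0],
                Integer.parseInt(parts[1]),
                parts[2].substring(0, colon),
                Integer.parseInt(parts[2].substring(colon + 1)));
    }
}
```

On `MOVED` a client should also refresh its slot map, since the assignment has permanently changed; on `ASK` it retries just that one command against the target (prefixed with `ASKING`).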
### Failover
```bash
# Manual failover (run on the replica to be promoted)
redis-cli -c -p 7003 cluster failover

# Forced failover (skips coordination with the master)
redis-cli -c -p 7003 cluster failover force

# Watch the failover in the logs
tail -f redis-7000.log
```
## High-Availability Features

### Automatic Failure Detection
```java
// Failure detection, sketched as pseudocode
public class ClusterFailureDetection {

    // Node states
    public enum NodeState {
        ONLINE, // healthy
        PFAIL,  // possibly failed (a single node's opinion)
        FAIL    // confirmed failed (cluster consensus)
    }

    // Detection flow
    public void detectFailure() {
        // 1. Nodes ping each other periodically
        sendPingToAllNodes();
        // 2. A node that misses the timeout is marked PFAIL
        markPossibleFailure();
        // 3. Once a majority of masters report PFAIL, the node is marked FAIL
        if (getPfailReportsFromMasters() > getMasterCount() / 2) {
            markNodeFailed();
            // 4. Trigger failover
            triggerFailover();
        }
    }
}
```
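To make the majority threshold in step 3 concrete: with N masters, a PFAIL mark escalates to FAIL once more than N/2 masters agree. A toy helper (the class and method names are hypothetical):

```java
// Majority check for escalating PFAIL to FAIL.
public class FailQuorum {

    // True once strictly more than half of the masters report the node PFAIL
    public static boolean shouldMarkFail(int pfailReports, int masterCount) {
        return pfailReports > masterCount / 2;
    }
}
```

So in a 3-master cluster two reports suffice, while a 4-master cluster needs three; this is also why an even master count buys no extra fault tolerance over the next-lower odd count.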
### Automatic Failover
```bash
# Failover preconditions
# 1. The master is marked FAIL
# 2. The replica's data is reasonably fresh
# 3. The replica wins enough votes

# Failover sequence
# 1. A replica starts an election
# 2. The other masters vote
# 3. The replica that wins a majority becomes the new master
# 4. The cluster configuration is updated
```
## Performance Optimization

### Client Optimization
```java
@Component
public class RedisClusterOptimization {

    @Autowired
    private JedisCluster jedisCluster;

    // Connection pool tuning
    @Bean
    public JedisPoolConfig jedisPoolConfig() {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(200);                        // max connections
        config.setMaxIdle(50);                          // max idle connections
        config.setMinIdle(10);                          // min idle connections
        config.setTestOnBorrow(true);                   // validate on borrow
        config.setTestOnReturn(false);                  // skip validation on return
        config.setTestWhileIdle(true);                  // validate idle connections
        config.setTimeBetweenEvictionRunsMillis(30000); // idle-check interval
        config.setNumTestsPerEvictionRun(3);            // connections checked per run
        config.setMinEvictableIdleTimeMillis(60000);    // min idle time before eviction
        return config;
    }

    // Batch writes, grouped by slot
    public void batchSet(Map<String, String> data) {
        // Group the keys by slot
        Map<Integer, Map<String, String>> slotGroups = new HashMap<>();
        for (Map.Entry<String, String> entry : data.entrySet()) {
            int slot = JedisClusterCRC16.getSlot(entry.getKey());
            slotGroups.computeIfAbsent(slot, k -> new HashMap<>())
                      .put(entry.getKey(), entry.getValue());
        }
        // Execute the groups in parallel, one pipeline per slot
        slotGroups.entrySet().parallelStream().forEach(group -> {
            try (Jedis jedis = jedisCluster.getConnectionFromSlot(group.getKey())) {
                Pipeline pipeline = jedis.pipelined();
                group.getValue().forEach(pipeline::set);
                pipeline.sync();
            }
        });
    }
}
```
### Memory Optimization
```bash
# Memory limits
maxmemory 2gb
maxmemory-policy allkeys-lru

# Compact-encoding thresholds (Redis 7 renames the ziplist
# options to their listpack equivalents)
hash-max-ziplist-entries 512
hash-max-ziplist-value 64
list-max-ziplist-size -2
set-max-intset-entries 512
zset-max-ziplist-entries 128
zset-max-ziplist-value 64
```
## Monitoring and Operations

### Cluster Monitoring
```java
@Slf4j // Lombok supplies the `log` field used below
@Component
public class RedisClusterMonitor {

    @Autowired
    private JedisCluster jedisCluster;

    @Scheduled(fixedDelay = 30000)
    public void monitorCluster() {
        try {
            // Cluster-wide state
            String clusterInfo = jedisCluster.clusterInfo();
            parseClusterInfo(clusterInfo);
            // Per-node state
            String clusterNodes = jedisCluster.clusterNodes();
            parseClusterNodes(clusterNodes);
            // Verify all 16384 slots are covered
            checkSlotCoverage();
        } catch (Exception e) {
            log.error("Cluster monitoring failed", e);
        }
    }

    private void parseClusterInfo(String info) {
        Map<String, String> infoMap = Arrays.stream(info.split("\r?\n"))
                .filter(line -> line.contains(":"))
                .collect(Collectors.toMap(
                        line -> line.split(":", 2)[0],
                        line -> line.split(":", 2)[1]
                ));

        // Key metrics
        String state = infoMap.get("cluster_state");
        int slotsAssigned = Integer.parseInt(infoMap.get("cluster_slots_assigned"));
        int slotsOk = Integer.parseInt(infoMap.get("cluster_slots_ok"));
        int knownNodes = Integer.parseInt(infoMap.get("cluster_known_nodes"));

        // Ship them to the metrics backend
        sendMetrics("cluster.state", state.equals("ok") ? 1 : 0);
        sendMetrics("cluster.slots.assigned", slotsAssigned);
        sendMetrics("cluster.slots.ok", slotsOk);
        sendMetrics("cluster.nodes.count", knownNodes);
    }
}
```
### Performance Monitoring
```bash
# Monitoring commands
redis-cli -c -p 7000 info replication
redis-cli -c -p 7000 info memory
redis-cli -c -p 7000 info stats

# Key metrics
# - Memory usage
# - Commands processed
# - Network I/O
# - Replication lag
# - Slot state
```
## Best Practices

### 1. Cluster Planning
- Node count: prefer an odd number of masters (for majority voting), each with at least one replica
- Hardware: give masters and replicas identical specs to avoid performance skew
- Network: keep inter-node latency low and provision enough bandwidth

### 2. Data Design
- Use hashtags: give related keys the same hashtag so they land on the same node
- Avoid big keys: they slow down slot migration and failover
- Shard sensibly: keep hot data from concentrating on a few nodes

### 3. Operations
- Monitoring and alerting: watch cluster state, node state, and performance metrics
- Backups: back up regularly and rehearse the restore procedure
- Capacity planning: plan scale-outs ahead of time instead of scrambling

### 4. Failure Handling
- Runbooks: write detailed procedures for handling failures
- Data recovery: know your recovery options before you need them
- Tuning: adjust parameters to match the workload
## Summary

Redis Cluster delivers high availability and horizontal scalability through data sharding and automatic failover, making it a fit for large-scale applications. To keep a cluster stable and fast, pay attention to data design (hashtags, key sizes), client-side optimization (pooling, pipelining), and day-to-day monitoring and operations.