博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
解决Redis集群条件下键空间通知服务器接收不到消息的问题
阅读量:6258 次
发布时间:2019-06-22

本文共 7901 字,大约阅读时间需要 26 分钟。

解决Redis集群条件下键空间通知服务器接收不到消息的问题

键空间通知介绍

键空间通知使得客户端可以通过订阅频道或模式, 来接收那些以某种方式改动了 Redis 数据集的事件。

可以通过对redis的redis.conf文件中配置notify-keyspace-events参数可以指定服务器发送哪种类型的通知。下面对于一些参数的描述。默认情况下此功能是关闭的。

字符 通知
K 键空间通知,所有通知以 __keyspace@<db>__ 为前缀
E 键事件通知,所有通知以 __keyevent@<db>__ 为前缀
g DELEXPIRERENAME 等类型无关的通用命令的通知
$ 字符串命令的通知
l 列表命令的通知
s 集合命令的通知
h 哈希命令的通知
z 有序集合命令的通知
x 过期事件:每当有过期键被删除时发送
e 驱逐(evict)事件:每当有键因为 maxmemory 政策而被删除时发送
A 参数 g$lshzxe 的别名

所以当你配置文件中配置为AKE时就表示发送所有类型的通知。

在程序中接入

使用SpringData可以轻松的实现对于redis键空间通知的接收操作。只需要作如下配置即可

所使用的jar包

'org.springframework.boot:spring-boot-starter-data-redis'复制代码

配置监听器

@Configuration@ConditionalOnExpression("!'${spring.redis.host:}'.isEmpty()")public static class RedisStandAloneAutoConfiguration {    @Bean    public RedisMessageListenerContainer customizeRedisListenerContainer(            RedisConnectionFactory redisConnectionFactory,MessageListener messageListener) {        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);        redisMessageListenerContainer.addMessageListener(messageListener,new PatternTopic("__keyspace@0__:*"));        return redisMessageListenerContainer;    }}复制代码

其中PatternTopic构造器里面填写的是你所要监听哪一个通道。

例如在redis中执行set blog buxuewushu。我配置文件中配置的AKE所以所有消息都会发送,他就会发送两条信息。

PUBLISH __keyspace@0__:blog setPUBLISH __keyevent@0__:set blog复制代码

所以我在上面配置的监听规则__keyspace@0__:*就是监听0号库发送的所有space信息都会接收到。

配置处理器

上面我们配置了监听Redis的哪条通道,现在我们需要配置接收到了信息以后如何处理的事情。所以此时我们需要在程序中写处理器

@Slf4j@Componentpublic class KeyExpiredEventMessageListener implements MessageListener {    @Override    public void onMessage(Message message, byte[] pattern) {        log.info("监听失效的redisKey:{},值是:{}", new String(message.getChannel()), new String(message.getBody()));    }}复制代码

只需要实现MessageListener即可。我们只是将监听到的键和发送的信息打印出来。

效果展示

此时我们启动本地的redis,然后执行set blog buxuewushu命令,可以在程序中看到。下面的输出。即我们已经监听到了redis发送的消息了。

c.e.s.r.KeyExpiredEventMessageListener   : 监听到的信息:__keyspace@0__:blog,值是:set复制代码

此时如果我们将规则变成__key*__:*那么会收到什么呢?还是执行set blog buxuewushu命令

c.e.s.r.KeyExpiredEventMessageListener   : 监听到的信息:__keyspace@0__:blog,值是:setc.e.s.r.KeyExpiredEventMessageListener   : 监听到的信息:__keyevent@0__:set,值是:blog复制代码

我们看到执行一个set命令可以收到两个消息,一个是space消息一个是event消息。

集群条件下

我们刚才的测试都是在单机Redis下测试的,当将Redis转为集群模式时,会发现接收不到了消息了。此时我们启动本机的redis的集群。关于如何在本机利用docker一键部署集群可以参考我的一篇文章。启动完redis集群以后我们还是启动程序进行测试。

redis集群配置如下,监听规则改为如下

spring:  redis:    cluster:      nodes:      - 127.0.0.1:7000      - 127.0.0.1:7001      - 127.0.0.1:7002      - 127.0.0.1:7003      - 127.0.0.1:7004      - 127.0.0.1:7005复制代码

我们redis中如下的命令

127.0.0.1:7002> set blog buxuwshu-> Redirected to slot [7653] located at 127.0.0.1:7001OK127.0.0.1:7001> set blog1 buxuwshu-> Redirected to slot [2090] located at 127.0.0.1:7000OK127.0.0.1:7000> set blog2 buxuwshu-> Redirected to slot [14409] located at 127.0.0.1:7002OK127.0.0.1:7002> set blog3 buxuwshu-> Redirected to slot [10344] located at 127.0.0.1:7001OK127.0.0.1:7001> set blog4 buxuwshuOK127.0.0.1:7001> set blog5 buxuwshu-> Redirected to slot [2222] located at 127.0.0.1:7000OK复制代码

在程序中打印如下

c.e.s.r.KeyExpiredEventMessageListener   : 监听到的信息:__keyspace@0__:blog,值是:setc.e.s.r.KeyExpiredEventMessageListener   : 监听到的信息:__keyspace@0__:blog3,值是:setc.e.s.r.KeyExpiredEventMessageListener   : 监听到的信息:__keyspace@0__:blog4,值是:set复制代码

我们看到只打印了blogblog1blog4的键,而我们通过上面观察,打印的键都是分布在7001端口上的。因此我们预测程序只是监听了7001端口发送的消息。而通过N次测试,程序不是每次都在监听7001端口,而是随机的。但是每次只会监听一个端口。

问题所在

接下来让我们通过找寻源码,看看到底是哪出的问题。

JedisSlotBasedConnectionHandlergetConnection方法中

public Jedis getConnection() {    // In antirez's redis-rb-cluster implementation,    // getRandomConnection always return valid connection (able to    // ping-pong)    // or exception if all connections are invalid    List
pools = cache.getShuffledNodesPool(); for (JedisPool pool : pools) { Jedis jedis = null; try { jedis = pool.getResource(); if (jedis == null) { continue; } String result = jedis.ping(); if (result.equalsIgnoreCase("pong")) return jedis; jedis.close(); } catch (JedisException ex) { if (jedis != null) { jedis.close(); } } } throw new JedisNoReachableClusterNodeException("No reachable node in cluster"); }复制代码

可以看到注释中写着会获得一个随机的有效连接。也可以通过代码看到,获得连接池的信息以后遍历,直到有一个信息能够ping-pong通就直接返回此连接进行监听。而Redis的消息发送是在本地发送的。因此默认只能监听到集群中一台机器发送的消息。

本地发送解释:例如有三个主机01,02,03。此时如果有个set键buxuewushu落到了主机01上,那么此消息就会通过01这台主机发送,因此如果此时服务监听的02机器,那么这个消息就会监听不到。

解决办法

既然我们知道了在集群条件下,每次监听只会随机取一个端口进行监听。那么我们就自己写监听机制,监听集群条件下的所有主机的端口就行了。

我们可以看到在SpringData中提供了RedisMessageListenerContainer类来与Redis服务器进行通信。 此类中有个start方法,可以看到是建立了与Redis的异步通信操作。所以我们的改造点就放在这就行。思路如下。

  • 程序启动时,获得集群的配置信息
  • 根据集群配置的Master数配置相同的RedisMessageListenerContainer进行监听

主要代码如下

public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {        RedisClusterConnection redisClusterConnection = redisConnectionFactory.getClusterConnection();        if (redisClusterConnection != null) {            Iterable
nodes = redisClusterConnection.clusterGetNodes(); for (RedisClusterNode node : nodes) { if (node.isMaster()) { String containerBeanName = "messageContainer" + node.hashCode(); if (beanFactory.containsBean(containerBeanName)) { return; } JedisConnectionFactory factory = new JedisConnectionFactory( new JedisShardInfo(node.getHost(), node.getPort())); BeanDefinitionBuilder containerBeanDefinitionBuilder = BeanDefinitionBuilder .genericBeanDefinition(RedisMessageListenerContainer.class); containerBeanDefinitionBuilder.addPropertyValue("connectionFactory", factory); containerBeanDefinitionBuilder.setScope(BeanDefinition.SCOPE_SINGLETON); containerBeanDefinitionBuilder.setLazyInit(false); beanFactory.registerBeanDefinition(containerBeanName, containerBeanDefinitionBuilder.getRawBeanDefinition()); RedisMessageListenerContainer container = beanFactory .getBean(containerBeanName, RedisMessageListenerContainer.class); String listenerBeanName = "messageListener" + node.hashCode(); if (beanFactory.containsBean(listenerBeanName)) { return; } container.addMessageListener(messageListener, new PatternTopic("__key*__:*")); container.start(); } } } }复制代码

此时我们再启动程序,还是在Redis中如下的输入

127.0.0.1:7002> set blog0 buxuewushu-> Redirected to slot [6155] located at 127.0.0.1:7001OK127.0.0.1:7001> set blog1 buxuewushu-> Redirected to slot [2090] located at 127.0.0.1:7000OK127.0.0.1:7000> set blog2 buxuewushu-> Redirected to slot [14409] located at 127.0.0.1:7002OK127.0.0.1:7002> set blog3 buxuewushu-> Redirected to slot [10344] located at 127.0.0.1:7001OK127.0.0.1:7001> set blog4 buxuewushuOK127.0.0.1:7001> set blog5 buxuewushu-> Redirected to slot [2222] located at 127.0.0.1:7000OK复制代码

这时我们可以看到在程序中我们接收到了所有端口的信息了。

c.e.s.r.KeyExpiredEventMessageListener   : 监听到的信息:__keyspace@0__:blog0,值是:setc.e.s.r.KeyExpiredEventMessageListener   : 监听到的信息:__keyspace@0__:blog1,值是:setc.e.s.r.KeyExpiredEventMessageListener   : 监听到的信息:__keyspace@0__:blog2,值是:setc.e.s.r.KeyExpiredEventMessageListener   : 监听到的信息:__keyspace@0__:blog3,值是:setc.e.s.r.KeyExpiredEventMessageListener   : 监听到的信息:__keyspace@0__:blog4,值是:setc.e.s.r.KeyExpiredEventMessageListener   : 监听到的信息:__keyspace@0__:blog5,值是:set复制代码

此时相当于我们建立了三个连接来监听三个redis服务器发送的消息。

小贴士:模式能匹配通配符,例如__keyspace@0__:blog*表示只接收blog开头的key值的信息,其他key值信息不接收

转载于:https://juejin.im/post/5cc55493f265da039c055bb7

你可能感兴趣的文章
ASCII流程图
查看>>
Linux知识积累(5) 关机shutdown和重启reboot
查看>>
HTML5为输入框添加语音输入功能
查看>>
[LeetCode] Find Permutation 找全排列
查看>>
os.environ() 说明
查看>>
Python学习札记(二十) 函数式编程1 介绍 高阶函数介绍
查看>>
tomcat安装不成功.提示:failed to install tomcat6 service ,check your setting and permissions
查看>>
[转]当当网高可用架构之道--转
查看>>
ROS学习网址【原创】
查看>>
mysql数据库对时间进行默认的设置
查看>>
喵哈哈村的魔法考试 Round #3 (Div.2) 题解
查看>>
音频 API 一览
查看>>
hive的select重命名字段显示成中文
查看>>
JVM类加载机制与对象的生命周期
查看>>
zabbix主动被动模式说明/区别
查看>>
神奇的AC
查看>>
数据库防火墙——实现数据库的访问行为控制、危险操作阻断、可疑行为审计...
查看>>
PCIE_DMA实例一:xapp1052详细使用说明
查看>>
MySQL也有潜规则 – Select 语句不加 Order By 如何排序?
查看>>
Struts(二十八):自定义拦截器
查看>>