Springboot发布订阅
逆风飞翔的小叔 人气:0一、redis发布订阅简介
Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收信息。可以参考下面两张图进行理解。
二、几个核心概念解释
1.频道
频道(channel)类似于一个快递柜,快递员往里面放快递,收件人去里面取快递。管道(channel)是由中间件(redis)提供的,一个redisServer中有多个channel。
2、消息发布者
可以理解为消息的生产者,消息发布者通过中间件(redis、mq等)向某个频道(管道)发送消息。
3、消息接收者
也可以理解为消息消费者,消息接收者通过订阅某个频道(管道)来接收发布者发布的消息。
发布者无需关心是否有人接收消息,发布者只需要把消息发布到某个管道中即可;
三、适用场景
1、核心业务完成后,非核心业务需要记录日志,发邮件,发短信之类的操作,一般来说,通过这种方式,核心业务与非核心业务起到了一个解耦的作用;
2、事件订阅,比如订阅UP主,博主相关的消息等;
3、监听事件,比如在分布式微服务场景下,当应用A的某个数据发生变化时,应用B需要同步更新自己的数据做自身业务操作,对于应用A来说并不关心哪个应用,就可以通过这种方式实现;
四、与springboot的整合
1、导入基础依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
2、配置文件
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://IP:3306/school?autoReconnect=true&useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false
username: root
password: root
druid:
max-active: 100
initial-size: 10
max-wait: 60000
min-idle: 5
redis:
host: localhost
port: 6379
cache:
type: redis
3、自定义RedisSubConfig
往容器(RedisMessageListenerContainer)内添加消息监听器,注意,container的参数列表是可以传多个监听器的,但是要定义监听器的bean。在定义监听器的方法体内绑定消息处理器和管道(channel),一个监听器可以监听多个管道,可以通过数组或者添加多个channel的方式定义;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; @Configuration public class RedisSubConfig { @Bean public RedisMessageListenerContainer container(RedisConnectionFactory factory, RedisMessageListener listener) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(factory); //订阅频道redis.news 和 redis.life 这个container 可以添加多个 messageListener container.addMessageListener(listener, new ChannelTopic("redis.user")); //container.addMessageListener(listener, new ChannelTopic("redis.news")); return container; } }
4、自定义消息监听器
需要实现MessageListener 接口,重写里面的onMessage方法,方法体内需要创建一个MessageListenerAdapter(这是一种规范写法,用于绑定消息处理器和监听器)。
这种写法和很多消息中间件对应的API很相似,即通过一个监听的代码块来完成监听到消息后具体的业务操作;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; @Component public class RedisMessageListener implements MessageListener { @Autowired private RedisTemplate redisTemplate; @Override public void onMessage(Message message, byte[] pattern) { // 获取消息 byte[] messageBody = message.getBody(); // 使用值序列化器转换 Object msg = redisTemplate.getValueSerializer().deserialize(messageBody); // 获取监听的频道 byte[] channelByte = message.getChannel(); // 使用字符串序列化器转换 Object channel = redisTemplate.getStringSerializer().deserialize(channelByte); // 渠道名称转换 String patternStr = new String(pattern); System.out.println(patternStr); System.out.println("---频道---: " + channel); System.out.println("---消息内容---: " + msg); } }
5、redistemplate的序列化
@Bean public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory connectionFactory) { RedisTemplate<Object, Object> template = new RedisTemplate<>(); template.setConnectionFactory(connectionFactory); //使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值 Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper mapper = new ObjectMapper(); mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(mapper); template.setValueSerializer(jackson2JsonRedisSerializer); //使用StringRedisSerializer来序列化和反序列化redis的key值 template.setKeySerializer(new StringRedisSerializer()); template.afterPropertiesSet(); return template; }
6、功能测试
下面写一个测试的接口,模拟业务处理完毕后,向 redis.user 这个通道发送一条消息,看看监听器中是否能够正常接收到消息即可;
@RestController public class RedisPubController { @Autowired private RedisTemplate redisTemplate; @GetMapping("/getUserById") public String getUserById(String userId){ //TODO 执行主业务 redisTemplate.convertAndSend("redis.user", userId); return "hello:" + userId; } }
启动工程后调用接口:
http://localhost:8083/getByUserIdId?userId=1
通过断点可以观察发现,通道中的消息被监听器逻辑监听到了,接下来就可以愉快的处理自己的业务了;
本篇通过案例演示了如何基于springboot整合使用redis的发布订阅功能,更多的细节需要结合实际的开发场景进行完善
加载全部内容