东莞市盛裕绒艺玩具有限公司

东莞市盛裕绒艺玩具有限公司

优德88登录

15380497142
联系方式
全国服务热线: 15380497142

咨询热线:18580786066
联系人:郭佳宏
地址:周市镇新镇路698号

补习系列(13)-springboot redis 与发布订阅

来源:优德88登录   发布时间:2019-11-13   点击量:229

目录

一、订阅发布常见应用二、Redis 与订阅发布三、SpringBoot 与订阅发布A. 消息模型B. 序列化C. 发布消息D. 接收消息小结

一、订阅发布

订阅发布是一种常见的设计模式,常见于消息系统的场景。如下面的图:

[图来自百科]消息发布者是消息载体的生产者,其通过某些主题来向调度中心发送消息;而消息订阅者会事先向调度中心订阅其"感兴趣"的主题,随后会获得新消息。在这里,调度中心是一个负责消息控制中转的逻辑实体,可以是消息队列如ActiveMQ,也可以是Web服务等等。

常见应用

微博,每个用户的粉丝都是该用户的订阅者,当用户发完微博,所有粉丝都将收到他的动态;新闻,资讯站点通常有多个频道,每个频道就是一个主题,用户可以通过主题来做订阅(如RSS),这样当新闻发布时,订阅者可以获得更新。

二、Redis 与订阅发布

Redis 支持 (pub/sub) 的订阅发布能力,客户端可以通过channel(频道)来实现消息的发布及接收。

    客户端通过 SUBSCRIBE 命令订阅 channel;
    客户端通过PUBLISH 命令向channel 发送消息;

而后,订阅 channel的客户端可实时收到消息。

除了简单的SUBSCRIBE/PUBLISH命令之外,Redis还支持订阅某一个模式的主题(正则表达式),如下:

PSUBSCRIBE /topic/cars/*

于是,我们可以利用这点实现相对复杂的订阅能力,比如:

在电商平台中订阅多个品类的商品促销信息;智能家居场景,APP可以订阅所有房间的设备消息。...

尽管如此,Redis pub/sub 机制存在一些缺点:

消息无法持久化,存在丢失风险;没有类似 RabbitMQ的ACK机制;由于是广播机制,无法通过添加worker 提升消费能力;

因此,Redis 的订阅发布建议用于实时且可靠性要求不高的场景。

三、SpringBoot 与订阅发布

接下来,看一下SpringBoot 怎么实现订阅发布的功能。

spring-boot-starter-data-redis 帮我们实现了Jedis的引入,pom 依赖如下:

<!-- redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>${spring-boot.version}</version> </dependency>

application.properties 中指定配置

# redis 连接配置spring.redis.database=0 spring.redis.host=127.0.0.1spring.redis.password=spring.redis.port=6379spring.redis.ssl=false# 连接池最大数spring.redis.pool.max-active=10 # 空闲连接最大数spring.redis.pool.max-idle=10# 获取连接最大等待时间(s)spring.redis.pool.max-wait=600000

A. 消息模型

消息模型描述了订阅发布的数据对象,这要求生产者与消费者都能理解以下面的POJO为例:

public static class SimpleMessage { private String publisher; private String content; private Date createTime;

在SimpleMessage类中,我们声明了几个字段:

字段名说明
publisher发布者
content文本内容
createTime创建时间

B. 序列化

如下的代码采用了JSON 作为序列化方式:

@Configurationpublic class RedisConfig { private static final Logger logger = LoggerFactory.getLogger(RedisConfig.class); /** * 序列化定制 * * @return */ @Bean public Jackson2JsonRedisSerializer<Object> jackson2JsonSerializer() { Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>( Object.class); // 初始化objectmapper ObjectMapper mapper = new ObjectMapper(); mapper.setSerializationInclusion(Include.NON_NULL); mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(mapper); return jackson2JsonRedisSerializer; } /** * 操作模板 * * @param connectionFactory * @param jackson2JsonRedisSerializer * @return */ @Bean public RedisTemplate<String, Object> redisTemplate(JedisConnectionFactory connectionFactory, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer) { RedisTemplate<String, Object> template = new RedisTemplate<String, Object>(); template.setConnectionFactory(connectionFactory); // 设置key/hashkey序列化 RedisSerializer<String> stringSerializer = new StringRedisSerializer(); template.setKeySerializer(stringSerializer); template.setHashKeySerializer(stringSerializer); // 设置值序列化 template.setValueSerializer(jackson2JsonRedisSerializer); template.setHashValueSerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; }

C. 发布消息

消息发布,需要先指定一个ChannelTopic对象,随后通过RedisTemplate方法操作。

@Servicepublic class RedisPubSub { private static final Logger logger = LoggerFactory.getLogger(RedisPubSub.class); @Autowired private RedisTemplate<String, Object> redisTemplate; private ChannelTopic topic = new ChannelTopic("/redis/pubsub"); @Scheduled(initialDelay = 5000, fixedDelay = 10000) private void schedule() { logger.info("publish message"); publish("admin", "hey you must go now!"); } /** * 推送消息 * * @param publisher * @param message */ public void publish(String publisher, String content) { logger.info("message send {} by {}", content, publisher); SimpleMessage pushMsg = new SimpleMessage(); pushMsg.setContent(content); pushMsg.setCreateTime(new Date()); pushMsg.setPublisher(publisher); redisTemplate.convertAndSend(topic.getTopic(), pushMsg); }

上述代码使用一个定时器(@Schedule)来做发布,为了保证运行需要在主类中启用定时器注解:

@EnableScheduling@SpringBootApplicationpublic class BootSampleRedis{...}

D. 接收消息

定义一个消息接收处理的Bean:

@Component public static class MessageSubscriber { public void onMessage(SimpleMessage message, String pattern) { logger.info("topic {} received {} ", pattern, JsonUtil.toJson(message)); } }

接下来,利用 MessageListenerAdapter 可将消息通知到Bean方法:

/** * 消息监听器,使用MessageAdapter可实现自动化解码及方法代理 * * @return */ @Bean public MessageListenerAdapter listener(Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer, MessageSubscriber subscriber) { MessageListenerAdapter adapter = new MessageListenerAdapter(subscriber, "onMessage"); adapter.setSerializer(jackson2JsonRedisSerializer); adapter.afterPropertiesSet(); return adapter; }

最后,关联到消息发布的Topic:

/** * 将订阅器绑定到容器 * * @param connectionFactory * @param listenerAdapter * @return */ @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listener) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.addMessageListener(listener, new PatternTopic("/redis/*")); return container; }

运行结果启动程序,从控制台可输出:

.RedisPubSub : publish message.RedisPubSub : message send hey you must go now! by admin.RedisPubSub : topic /redis/* received {"publisher":"admin","content":"hey you must go now!","createTime":1543418694007}

这样,我们便完成了订阅发布功能。

示例程序下载

小结

消息订阅发布是分布式系统中的常用手段,也经常用来实现系统解耦、性能优化等目的;当前小节结合SpringBoot 演示了 Redis订阅发布(pub/sub)的实现,在部分场景下可以参考使用。欢迎继续关注"美码师的补习系列-springboot篇" ,期待更多精彩内容^-^

, 1, 0, 9);

相关产品

COPYRIGHTS©2017 优德88登录 ALL RIGHTS RESERVED 备案号:229