SpringBoot整合RocketMQ,老鸟们都是这么玩的!( 二 )

消费者开始消费时会出现类型转换异常错误Cannot construct instance of java.time.LocalDate,错误详情如下:

SpringBoot整合RocketMQ,老鸟们都是这么玩的!

文章插图
原因:RocketMQ内置使用的转换器是RocketMQMessageConverter,转换Json时使用的是MappingJackson2MessageConverter,但是这个转换器不支持时间类型 。
解决办法:需要自定义消息转换器,将MappingJackson2MessageConverter进行替换,并添加支持时间模块
 @Configurationpublic class RocketMQEnhanceConfig {/*** 解决RocketMQ Jackson不支持Java时间类型配置* 源码参考:{@link org.apache.rocketmq.spring.autoconfigure.MessageConverterConfiguration}*/@Bean@Primarypublic RocketMQMessageConverter enhanceRocketMQMessageConverter(){RocketMQMessageConverter converter = new RocketMQMessageConverter();CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters();for (MessageConverter messageConverter : messageConverterList) {if(messageConverter instanceof MappingJackson2MessageConverter){MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();objectMapper.registerModules(new JavaTimeModule());}}return converter;}}2.3 RockeMQ环境隔离在使用RocketMQ时,通常会在代码中直接指定消息主题(topic),而且开发环境和测试环境可能共用一个RocketMQ环境 。如果没有进行处理,在开发环境发送的消息就可能被测试环境的消费者消费,测试环境发送的消息也可能被开发环境的消费者消费,从而导致数据混乱的问题 。
为了解决这个问题,我们可以根据不同的环境实现自动隔离 。通过简单配置一个选项,如dev、test、prod等不同环境,所有的消息都会被自动隔离 。例如,当发送的消息主题为consumer_topic?时,可以自动在topic后面加上环境后缀,如consumer_topic_dev 。
那么,我们该如何实现呢?
可以编写一个配置类实现BeanPostProcessor,并重写postProcessBeforeInitialization方法,在监听器实例初始化前修改对应的topic 。
BeanPostProcessor是Spring框架中的一个接口,它的作用是在Spring容器实例化、配置完bean之后,在bean初始化前后进行一些额外的处理工作 。
具体来说,BeanPostProcessor接口定义了两个方法:
postProcessBeforeInitialization(Object bean, String beanName): 在bean初始化之前进行处理,可以对bean做一些修改等操作 。
postProcessAfterInitialization(Object bean, String beanName): 在bean初始化之后进行处理,可以进行一些清理或者其他操作 。BeanPostProcessor可以在应用程序中对Bean的创建和初始化过程进行拦截和修改,对Bean的生命周期进行干预和操作 。
它可以对所有的Bean类实例进行增强处理,使得开发人员可以在Bean初始化前后自定义一些操作,从而实现自己的业务需求 。比如,可以通过BeanPostProcessor来实现注入某些必要的属性值、加入某一个对象等等 。
实现方案如下:
  1. 在配置文件中增加相关配置
rocketmq: enhance:# 启动隔离,用于激活配置类EnvironmentIsolationConfig# 启动后会自动在topic上拼接激活的配置文件,达到自动隔离的效果enabledIsolation: true# 隔离环境名称,拼接到topic后,topic_dev,默认空字符串environment: dev
  1. 新增配置类,在实例化消息监听者之前把topic修改掉
 @Configurationpublic class EnvironmentIsolationConfig implements BeanPostProcessor {@Value("${rocketmq.enhance.enabledIsolation:true}")private boolean enabledIsolation;@Value("${rocketmq.enhance.environment:''}")private String environmentName;/*** 在装载Bean之前实现参数修改*/@Overridepublic Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {if(bean instanceof DefaultRocketMQListenerContainer){DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;//拼接Topicif(enabledIsolation && StringUtils.hasText(environmentName)){container.setTopic(String.join("_", container.getTopic(),environmentName));}return container;}return bean;}}
  1. 启动项目可以看到日志中消息监听的队列已经被修改了
 2023-03-23 17:04:59.726 [main] INFOo.a.r.s.support.DefaultRocketMQListenerContainer:290 - running container: DefaultRocketMQListenerContainer{cnotallow='springboot_consumer_group', nameServer='10.5.103.6:9876', topic='consumer_topic_dev', cnotallow=CONCURRENTLY, selectorType=TAG, selectorExpressinotallow='*', messageModel=CLUSTERING}


推荐阅读