SpringBoot整合RocketMQ,老鸟们都是这么玩的!( 五 )

 public class EnvironmentIsolationConfig implements BeanPostProcessor {private RocketEnhanceProperties rocketEnhanceProperties;public EnvironmentIsolationConfig(RocketEnhanceProperties rocketEnhanceProperties) {this.rocketEnhanceProperties = rocketEnhanceProperties;}/*** 在装载Bean之前实现参数修改*/@Overridepublic Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {if(bean instanceof DefaultRocketMQListenerContainer){DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){container.setTopic(String.join("_", container.getTopic(),rocketEnhanceProperties.getEnvironment()));}return container;}return bean;}} @ConfigurationProperties(prefix = "rocketmq.enhance")@Datapublic class RocketEnhanceProperties {private boolean enabledIsolation;private String environment;}3.3 封装后的使用3.3.1 引入依赖 <dependency><groupId>com.jianzh5</groupId><artifactId>cloud-rocket-starter</artifactId></dependency>3.3.2 自定义配置 rocketmq: ... enhance:# 启动隔离,用于激活配置类EnvironmentIsolationConfig# 启动后会自动在topic上拼接激活的配置文件,达到自动隔离的效果enabledIsolation: true# 隔离环境名称,拼接到topic后,topic_dev,默认空字符串environment: dev3.3.3 发送消息 @RestController@RequestMapping("enhance")@Slf4jpublic class EnhanceProduceController {//注入增强后的模板,可以自动实现环境隔离,日志记录@Setter(onMethod_ = @Autowired)private RocketMQEnhanceTemplate rocketMQEnhanceTemplate;private static final String topic = "rocket_enhance";private static final String tag = "member";/*** 发送实体消息*/@GetMapping("/member")public SendResult member() {String key = UUID.randomUUID().toString();MemberMessage message = new MemberMessage();// 设置业务keymessage.setKey(key);// 设置消息来源,便于查询message.setSource("MEMBER");// 业务消息内容message.setUserName("Java日知录");message.setBirthday(LocalDate.now());return rocketMQEnhanceTemplate.send(topic, tag, message);}}注意这里使用的是封装后的模板工具类,一旦在配置文件中启动环境隔离,则生产者的消息也自动发送到隔离后的topic中 。
3.3.4 消费者@Slf4j@Component@RocketMQMessageListener(consumerGroup = "enhance_consumer_group",topic = "rocket_enhance",selectorExpression = "*",consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够)public class EnhanceMemberMessageListener extends EnhanceMessageHandler<MemberMessage> implements RocketMQListener<MemberMessage> {@Overrideprotected void handleMessage(MemberMessage message) throws Exception {// 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试System.out.println("业务消息处理:"+message.getUserName());}@Overrideprotected void handleMaxRetriesExceeded(MemberMessage message) {// 当超过指定重试次数消息时此处方法会被调用// 生产中可以进行回退或其他业务操作log.error("消息消费失败,请执行后续处理");}/*** 是否执行重试机制*/@Overrideprotected boolean isRetry() {return true;}@Overrideprotected boolean throwException() {// 是否抛出异常,false搭配retry自行处理异常return false;}@Overrideprotected boolean filter() {// 消息过滤return false;}/*** 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型*/@Overridepublic void onMessage(MemberMessage memberMessage) {super.dispatchMessage(memberMessage);}}为了方便消费者对RocketMQ中的消息进行处理,我们可以使用EnhanceMessageHandler来进行消息的处理和逻辑的处理 。
消费者实现了RocketMQListener的同时,可以继承EnhanceMessageHandler来进行公共逻辑的处理,而核心业务逻辑需要自己实现handleMessage方法 。如果需要对消息进行过滤或者去重的处理,则可以重写父类的filter方法进行实现 。这样可以更加方便地对消息进行处理,减轻开发者的工作量 。
以上,就是今天的主要内容,希望对你有所帮助!

【SpringBoot整合RocketMQ,老鸟们都是这么玩的!】


推荐阅读