SpringBoot整合RocketMQ,老鸟们都是这么玩的!( 四 )

这里封装了一个消息发送类,实现了日志记录以及自动重建topic的功能(即生产者实现环境隔离),后面项目中只需要注入RocketMQEnhanceTemplate来实现消息的发送 。
3.2.3 消费者的封装@Slf4jpublic abstract class EnhanceMessageHandler<T extends BaseMessage> {/*** 默认重试次数*/private static final int MAX_RETRY_TIMES = 3;/*** 延时等级*/private static final int DELAY_LEVEL = EnhanceMessageConstant.FIVE_SECOND;@Resourceprivate RocketMQEnhanceTemplate rocketMQEnhanceTemplate;/*** 消息处理** @param message 待处理消息* @throws Exception 消费异常*/protected abstract void handleMessage(T message) throws Exception;/*** 超过重试次数消息,需要启用isRetry** @param message 待处理消息*/protected abstract void handleMaxRetriesExceeded(T message);/*** 是否需要根据业务规则过滤消息,去重逻辑可以在此处处理* @param message 待处理消息* @return true: 本次消息被过滤,false:不过滤*/protected boolean filter(T message) {return false;}/*** 是否异常时重复发送** @return true: 消息重试,false:不重试*/protected abstract boolean isRetry();/*** 消费异常时是否抛出异常* 返回true,则由rocketmq机制自动重试* false:消费异常(如果没有开启重试则消息会被自动ack)*/protected abstract boolean throwException();/*** 最大重试次数** @return 最大重试次数,默认5次*/protected int getMaxRetryTimes() {return MAX_RETRY_TIMES;}/*** isRetry开启时,重新入队延迟时间* @return -1:立即入队重试*/protected int getDelayLevel() {return DELAY_LEVEL;}/*** 使用模板模式构建消息消费框架,可自由扩展或删减*/public void dispatchMessage(T message) {// 基础日志记录被父类处理了log.info("消费者收到消息[{}]", JSONObject.toJSON(message));if (filter(message)) {log.info("消息id{}不满足消费条件,已过滤 。",message.getKey());return;}// 超过最大重试次数时调用子类方法处理if (message.getRetryTimes() > getMaxRetryTimes()) {handleMaxRetriesExceeded(message);return;}try {long now = System.currentTimeMillis();handleMessage(message);long costTime = System.currentTimeMillis() - now;log.info("消息{}消费成功,耗时[{}ms]", message.getKey(),costTime);} catch (Exception e) {log.error("消息{}消费异常", message.getKey(),e);// 是捕获异常还是抛出,由子类决定if (throwException()) {//抛出异常,由DefaultMessageListenerConcurrently类处理throw new RuntimeException(e);}//此时如果不开启重试机制,则默认ACK了if (isRetry()) {handleRetry(message);}}}protected void handleRetry(T message) {// 获取子类RocketMQMessageListener注解拿到topic和tagRocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);if (annotation == null) {return;}//重新构建消息体String messageSource = message.getSource();if(!messageSource.startsWith(EnhanceMessageConstant.RETRY_PREFIX)){message.setSource(EnhanceMessageConstant.RETRY_PREFIX + messageSource);}message.setRetryTimes(message.getRetryTimes() + 1);SendResult sendResult;try {// 如果消息发送不成功,则再次重新发送,如果发送异常则抛出由MQ再次处理(异常时不走延迟消息)sendResult = rocketMQEnhanceTemplate.send(annotation.topic(), annotation.selectorExpression(), message, getDelayLevel());} catch (Exception ex) {// 此处捕获之后,相当于此条消息被消息完成然后重新发送新的消息//由生产者直接发送throw new RuntimeException(ex);}// 发送失败的处理就是不进行ACK,由RocketMQ重试if (sendResult.getSendStatus() != SendStatus.SEND_OK) {throw new RuntimeException("重试消息发送失败");}}}使用模版设计模式定义了消息消费的骨架,实现了日志打印,异常处理,异常重试等公共逻辑,消息过滤(查重)、业务处理则交由子类实现 。
3.2.4 基础配置类@Configuration@EnableConfigurationProperties(RocketEnhanceProperties.class)public class RocketMQEnhanceAutoConfiguration {/*** 注入增强的RocketMQEnhanceTemplate*/@Beanpublic RocketMQEnhanceTemplate rocketMQEnhanceTemplate(RocketMQTemplate rocketMQTemplate){return new RocketMQEnhanceTemplate(rocketMQTemplate);}/*** 解决RocketMQ Jackson不支持Java时间类型配置* 源码参考:{@link org.apache.rocketmq.spring.autoconfigure.MessageConverterConfiguration}*/@Bean@Primarypublic RocketMQMessageConverter enhanceRocketMQMessageConverter(){RocketMQMessageConverter converter = new RocketMQMessageConverter();CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters();for (MessageConverter messageConverter : messageConverterList) {if(messageConverter instanceof MappingJackson2MessageConverter){MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();objectMapper.registerModules(new JavaTimeModule());}}return converter;}/*** 环境隔离配置*/@Bean@ConditionalOnProperty(name="rocketmq.enhance.enabledIsolation", havingValue=https://www.isolves.com/it/cxkf/kj/2023-04-12/"true")public EnvironmentIsolationConfig environmentSetup(RocketEnhanceProperties rocketEnhanceProperties){return new EnvironmentIsolationConfig(rocketEnhanceProperties);}}


推荐阅读