RocketMQ与SpringBoot整合进行生产级二次封装( 五 )

4.封装消费最终类 注意:收到的消息是先委派给父类,父类进行调度管理
package com.codecoord.rocketmq.listener;import com.codecoord.rocketmq.constant.RocketMqBizConstant;import com.codecoord.rocketmq.domain.RocketMqEntityMessage;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;/** * 实体类消费监听器,在实现RocketMQListener中间还加了一层BaseMqMessageListener来处理基础业务消息 * * @author tianxincoord@163.com * @since 2022/5/12 */@Slf4j@Component@RocketMQMessageListener(topic = RocketMqBizConstant.SOURCE_TOPIC,consumerGroup = RocketMqBizConstant.SOURCE_GROUP,selectorExpression = RocketMqBizConstant.SOURCE_TAG,// 指定消费者线程数,默认64,生产中请注意配置,避免过大或者过小consumeThreadMax = 5)public class RocketEntityMessageListener extends BaseMqMessageListener<RocketMqEntityMessage>implements RocketMQListener<RocketMqEntityMessage> {/*** 此处只是说明封装的思想,更多还是要根据业务操作决定* 内功心法有了,无论什么招式都可以发挥最大威力*/@Overrideprotected String consumerName() {return "RocketMQ二次封装消息消费者";}@Overridepublic void onMessage(RocketMqEntityMessage message) {// 注意,此时这里没有直接处理业务,而是先委派给父类做基础操作,然后父类做完基础操作后会调用子类的实际处理类型super.dispatchMessage(message);}@Overrideprotected void handleMessage(RocketMqEntityMessage message) throws Exception {// 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试System.out.println("业务消息处理");}@Overrideprotected void overMaxRetryTimesMessage(RocketMqEntityMessage message) {// 当超过指定重试次数消息时此处方法会被调用// 生产中可以进行回退或其他业务操作}@Overrideprotected boolean isRetry() {return false;}@Overrideprotected int maxRetryTimes() {// 指定需要的重试次数,超过重试次数overMaxRetryTimesMessage会被调用return 5;}@Overrideprotected boolean isThrowException() {// 是否抛出异常,到消费异常时是被父类拦截处理还是直接抛出异常return false;}}5.封装后对于子类来说,只需要告诉父类要不要做就拥有了最开始说的所有功能,简化了使用,此时子类消费者只需要专注于自己的业务核心处理就可以了
2.4 广播消息的应用场景

  1. 应用场景:多租户或者服务有内部缓存需要刷新情况下如果需要刷新租户信息或者缓存信息
  2. 也就是需要所有服务节点都需要同事做某一件事情的时候,此时可以借助广播消息发送消息到所有节点刷新,无需一个节点一个节点的处理
  3. 特别说明:广播消息默认会在家目录下创建消费进度文件,会以www.tianxincoord.com:9876@www.tianxincoord.com:9876这种地址形式生成文件路径,但是由于带有:符号,windows下是不允许此符号作为文件夹名称的,所以如果rocketMQ的链接地址不是连接串(不带有端口)可以取消下面的messageModel注释,否则启动的时候就会提示目标卷或者路径不存在,其实是因为这个问题
package com.codecoord.rocketmq.listener;import com.codecoord.rocketmq.constant.RocketMqBizConstant;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;/** * 广播消息 * 应用场景:多租户或者服务有内部缓存需要刷新情况下如果需要刷新租户信息或者缓存信息 *也就是需要所有服务节点都需要同事做某一件事情的时候 * 此时可以借助广播消息发送消息到所有节点刷新,无需一个节点一个节点的处理 * * 特别说明:广播消息默认会在家目录下创建消费进度文件,会以www.tianxincoord.com:9876@www.tianxincoord.com:9876 *这种地址形式生成文件路径,但是由于带有:符号,windows下是不允许此符号作为文件夹名称的 *所以如果rocketMQ的链接地址不是连接串(不带有端口)可以取消下面的messageModel注释 *否则启动的时候就会提示目标卷或者路径不存在,其实是因为这个问题 * * @author tianxincoord@163.com * @since 2022/5/12 */@Slf4j@Component@RocketMQMessageListener(topic = RocketMqBizConstant.SOURCE_TOPIC,consumerGroup = RocketMqBizConstant.SOURCE_BROADCASTING_GROUP,selectorExpression = RocketMqBizConstant.SOURCE_BROADCASTING_TAG// messageModel = MessageModel.BROADCASTING)public class RocketBroadcastingListener implements RocketMQListener<MessageExt> {/*** MessageExt:内置的消息实体,生产中根据需要自己封装实体*/@Overridepublic void onMessage(MessageExt message) {log.info("收到广播消息【{}】", new String(message.getBody()));}}


推荐阅读