- 主页 > 生活百科 > >
RocketMQ与SpringBoot整合进行生产级二次封装( 四 )
- 这个类是最基础的原始封装类,相当于餐馆提供的点餐服务 。上面提供无业务特性的发送,比如想要发送日志消息或者动态发送消息目的场景
3.2.3 增强RocketMQTemplate
- 以订单处理中心来说,变化点仅仅只是单号等业务数据不一样,发往订单处理中心的消息不管是topic还是tag等等其实完全都一样,那么此时可以根据业务来增加封装
- 增强原始功能需要注意下面两个点 所有父类能出现的地方,子类都能出现:也就是子类拥有功能 >= 父类 ,比如Java的List,只要入参是List的地方,传ArrayList和LinkedList完全可以 增强功能不能改变原始功能的行为:比如父类有一个方法say是说话,结果子类覆写了say改成了行为是吃饭,然后当调用者调用say的时候得到了一个完全预期外的结果
- 就以订单中心消息发送为例,封装OrderMessageTemplate继承自RocketMqTemplate,此时前者就拥有了封装父类的所有基础方法,拥有了所有父类的功能 。然后可以在前者增加自身业务特性的发送方法,比如发送订单处理消息
package com.codecoord.rocketmq.template;import com.codecoord.rocketmq.constant.RocketMqBizConstant;import com.codecoord.rocketmq.domain.RocketMqEntityMessage;import org.apache.rocketmq.client.producer.SendResult;import org.springframework.stereotype.Component;import javax.annotation.Resource;import javax.validation.constraints.NotNull;import java.time.LocalDate;import java.time.LocalDateTime;/** * 订单类发送消息模板工具类 * * @author tianxincode@163.com * @since 2022/6/16 */@Componentpublic class OrderMessageTemplate extends RocketMqTemplate {/// 如果不采用继承也可以直接注入使用/* @Resourceprivate RocketMqTemplate rocketMqTemplate; *//*** 入参只需要传入是哪个订单号和业务体消息即可,其他操作根据需要处理* 这样对于调用者而言,可以更加简化调用*/public SendResult sendOrderPaid(@NotNull String orderId, String body) {RocketMqEntityMessage message = new RocketMqEntityMessage();message.setKey(orderId);message.setSource("订单支付");message.setMessage(body);// 这两个字段只是为了测试message.setBirthday(LocalDate.now());message.setTradeTime(LocalDateTime.now());return send(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.ORDER_PAID_TAG, message);}} - 此时对于调用者只需要 orderMessageTemplate.sendOrderPaid("O001", "xxx");就可以把消息发送到订单处理中心
- 封装后的好处,假如现在有10个订单来源,现在需要调整消息发送格式,如果不进行封装那么10个来源发送的地方都需要改;如果进行了二次封装,只需要改sendOrderPaid方法即可,而且还不会出错,此时优势就体现出来了
2.3 RocketMQListener封装
- RocketMQListener是消费消息的核心,同时也涉及到更多的操作,比如:基础日志记录、异常处理、消息重试、警告通知等等等
- 按照抽离变化点,RocketMQListener只应该处理与自身业务相关的,除此之外的其它应该交给父类,子类只需要告诉父类要不要异常处理、要不要重试等等,点餐式服务
- 封装消息消费的抽象类 注意泛型限定为标准基础消息类,这样能到消费者的一定有统一的标准类BaseMqMessage 下面简单封装示例
package com.codecoord.rocketmq.listener;import com.codecoord.rocketmq.constant.RocketMqSysConstant;import com.codecoord.rocketmq.domain.BaseMqMessage;import com.codecoord.rocketmq.template.RocketMqTemplate;import com.codecoord.rocketmq.util.JsonUtil;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.client.producer.SendStatus;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.slf4j.MDC;import javax.annotation.Resource;import java.time.Instant;import java.util.Objects;/** * 抽象消息监听器,封装了所有公共处理业务,如 * 1、基础日志记录 * 2、异常处理 * 3、消息重试 * 4、警告通知 * 5、.... * * @author tianxincoord@163.com * @since 2022/4/17 */public abstract class BaseMqMessageListener<T extends BaseMqMessage> {/*** 这里的日志记录器是哪个子类的就会被哪个子类的类进行初始化*/protected final Logger logger = LoggerFactory.getLogger(this.getClass());@Resourceprivate RocketMqTemplate rocketMqTemplate;/*** 消息者名称** @return 消费者名称*/protected abstract String consumerName();/*** 消息处理** @param message 待处理消息* @throws Exception 消费异常*/protected abstract void handleMessage(T message) throws Exception;/*** 超过重试次数消息,需要启用isRetry** @param message 待处理消息*/protected abstract void overMaxRetryTimesMessage(T message);/*** 是否过滤消息,例如某些** @param message 待处理消息* @return true: 本次消息被过滤,false:不过滤*/protected boolean isFilter(T message) {return false;}/*** 是否异常时重复发送** @return true: 消息重试,false:不重试*/protected abstract boolean isRetry();/*** 消费异常时是否抛出异常** @return true: 抛出异常,false:消费异常(如果没有开启重试则消息会被自动ack)*/protected abstract boolean isThrowException();/*** 最大重试次数** @return 最大重试次数,默认10次*/protected int maxRetryTimes() {return 10;}/*** isRetry开启时,重新入队延迟时间** @return -1:立即入队重试*/protected int retryDelayLevel() {return -1;}/*** 由父类来完成基础的日志和调配,下面的只是提供一个思路*/public void dispatchMessage(T message) {MDC.put(RocketMqSysConstant.TRACE_ID, message.getTraceId());// 基础日志记录被父类处理了logger.info("[{}]消费者收到消息[{}]", consumerName(), JsonUtil.toJson(message));if (isFilter(message)) {logger.info("消息不满足消费条件,已过滤");return;}// 超过最大重试次数时调用子类方法处理if (message.getRetryTimes() > maxRetryTimes()) {overMaxRetryTimesMessage(message);return;}try {long start = Instant.now().toEpochMilli();handleMessage(message);long end = Instant.now().toEpochMilli();logger.info("消息消费成功,耗时[{}ms]", (end - start));} catch (Exception e) {logger.error("消息消费异常", e);// 是捕获异常还是抛出,由子类决定if (isThrowException()) {throw new RuntimeException(e);}if (isRetry()) {// 获取子类RocketMQMessageListener注解拿到topic和tagRocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);if (Objects.nonNull(annotation)) {message.setSource(message.getSource() + "消息重试");message.setRetryTimes(message.getRetryTimes() + 1);SendResult sendResult;try {// 如果消息发送不成功,则再次重新发送,如果发送异常则抛出由MQ再次处理(异常时不走延迟消息)// 此处捕获之后,相当于此条消息被消息完成然后重新发送新的消息sendResult = rocketMqTemplate.send(annotation.topic(), annotation.selectorExpression(), message, retryDelayLevel());} catch (Exception ex) {throw new RuntimeException(ex);}// 发送失败的处理就是不进行ACK,由RocketMQ重试if (sendResult.getSendStatus() != SendStatus.SEND_OK) {throw new RuntimeException("重试消息发送失败");}}}}}}
推荐阅读
-
本田雅阁|?买中级车还买雅阁?这车起步2.0T配8AT,标配主动刹车,起步22.68万
-
-
-
【厉害了中国制造|俄军舰火速抵达海外基地,战争要打响了?美以联手空袭叙利亚】
-
能咋地把图片导入cad,如何将正射影像按原坐标导入cad
-
#大视角#金贡很暴躁,lwx略显土味,fpx皮肤凌晨官宣!doinb特效最欢乐
-
山猫侃球|走了黑曼巴!还有千千万万个黑曼巴!美国新生儿取名科比人数暴增
-
发型|秋季发型别乱剪,新流行发型20款来了,无论长短都很美
-
《开国将帅》:李晨演毛岸英就算了,但宋庆龄扮演者让我心情复杂
-
-
秦奋|秦奋现身东京街头,粉丝巧遇,网友调侃:有一米五吗?
-
Windows|不满足Win11系统需求照样升级:教你绕过TPM、内存等限制
-
2020年双十一是满多少减多少-2020双十一能便宜多少
-
孔孝真|孔孝真晒对手李敏镐剧照 与小十岁男友大婚在即 秋日写真展现美貌
-
-
背包又大又重 盘点最伤害女性的十件事 脸色不好怎么办?让你看起来像个迷人宝宝的10个小窍门
-
娱乐大扒扒|穿焦糖色鱼尾裙出镜,皮肤都在反光,古力娜扎8年肤色变化太明显
-
「吕丽萍」60岁吕丽萍太不幸,一婚被抛弃收场,二婚丈夫去世,三婚又遇难题
-
“视频监控智能运维系统” 实现电力施工企业安全可视化管控
-