3. RocketMQ二次封装在解释为什么要二次封装之前先来看看RocketMQ官方文档中推荐的最佳实践
- 消息发送成功或者失败要打印消息日志,用于业务排查问题 。
- 如果消息量较少,建议在消费入口方法打印消息,消费耗时等,方便后续排查问题 。
- RocketMQ 无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理 。可以借助关系数据库进行去重 。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等 。
接下来讨论的是在RocketMQ中发送消息时选择何种消息类型最为合适 。
在RocketMQ中有四种可选格式:
- 发送Json对象
- 发送转Json后的String对象
- 根据业务封装对应实体类
- 直接使用原生MessageExt接收 。
有了上面两点结论以后我们来看看为什么要对RocketMQ二次封装 。
3.1 为什么要二次封装按照上述最佳实践,一个完整的消息传递链路从生产到消费应包括 准备消息、发送消息、记录消息日志、处理发送失败、记录接收消息日志、处理业务逻辑、异常处理和异常重试 等步骤 。
虽然使用原生RocketMQ可以完成这些动作,但每个生产者和消费者都需要编写大量重复的代码来完成相同的任务,这就是需要进行二次封装的原因 。我们希望通过二次封装,生产者只需准备好消息实体并调用封装后的工具类发送,而消费者只需处理核心业务逻辑,其他公共逻辑会得到统一处理 。
在二次封装中,关键是找出框架在日常使用中所涵盖的许多操作,以及区分哪些操作是可变的,哪些是不变的 。以上述例子为例,实际上只有生产者的消息准备和消费者的业务处理是可变的操作,需要根据需求进行处理,而其他步骤可以固定下来形成一个模板 。
当然,本文提到的二次封装不是指对源代码进行封装,而是针对工具的原始使用方式进行的封装 。可以将其与MyBatis和Mybatis-plus区分开来 。这两者都能完成任务,只不过Mybatis-plus更为简单便捷 。
3.2 实现二次封装实现二次封装需要创建一个自定义的starter,这样其他项目只需要依赖此starter即可使用封装功能 。同时,在自定义starter中还需要解决文章第二部分中提到的一些问题 。
代码结构如下所示:
image-20230403160031944
3.2.1 消息实体类的封装
/** * 消息实体,所有消息都需要继承此类 * 公众号:JAVA日知录 */@Datapublic abstract class BaseMessage {/*** 业务键,用于RocketMQ控制台查看消费情况*/protected String key;/*** 发送消息来源,用于排查问题*/protected String source = "";/*** 发送时间*/protected LocalDateTime sendTime = LocalDateTime.now();/*** 重试次数,用于判断重试次数,超过重试次数发送异常警告*/protected Integer retryTimes = 0;}后面所有发送的消息实体都需要继承此实体类 。3.2.2 消息发送工具类的封装
@Slf4j@RequiredArgsConstructor(onConstructor = @__(@Autowired))public class RocketMQEnhanceTemplate {private final RocketMQTemplate template;@Resourceprivate RocketEnhanceProperties rocketEnhanceProperties;public RocketMQTemplate getTemplate() {return template;}/*** 根据系统上下文自动构建隔离后的topic* 构建目的地*/public String buildDestination(String topic, String tag) {topic = reBuildTopic(topic);return topic + ":" + tag;}/*** 根据环境重新隔离topic* @param topic 原始topic*/private String reBuildTopic(String topic) {if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){return topic +"_" + rocketEnhanceProperties.getEnvironment();}return topic;}/*** 发送同步消息*/public <T extends BaseMessage> SendResult send(String topic, String tag, T message) {// 注意分隔符return send(buildDestination(topic,tag), message);}public <T extends BaseMessage> SendResult send(String destination, T message) {// 设置业务键,此处根据公共的参数进行处理// 更多的其它基础业务处理...Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();SendResult sendResult = template.syncSend(destination, sendMessage);// 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集log.info("[{}]同步消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));return sendResult;}/*** 发送延迟消息*/public <T extends BaseMessage> SendResult send(String topic, String tag, T message, int delayLevel) {return send(buildDestination(topic,tag), message, delayLevel);}public <T extends BaseMessage> SendResult send(String destination, T message, int delayLevel) {Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel);log.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));return sendResult;}}
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 在 SpringBoot 中使用 Spring AOP 实现接口鉴权
- SpringBoot中如何实现限流,这种方式才叫优雅!
- SpringBoot中使用PostgreSQL数据库
- SpringBoot对SpringMVC的自动配置,你知道多少?
- AI开发大一统:谷歌OpenXLA开源,整合所有框架和AI芯片
- SpringBoot启动控制台的banner是怎么回事
- SpringBoot 与RabbitMQ、RocketMQ高可靠、高性能、分布式应用实践
- 158资源整合网怎么样 创业网致富网3158
- 在分布式系统中,SpringBoot 实现接口幂等性
- coso内部控制框架?COSO内部控制与企业风险管理整合框架的比较?
