RocketMQ与SpringBoot整合进行生产级二次封装( 三 )

  • 基于上述讨论点,封装建议基于实体类来,实体类不管是排查问题、新人熟悉系统代码、信息校验等String和JSONObject无法像实体类一样轻松胜任
  • 2.2 RocketMQTemplate封装2.2.1 封装基础实体类
    1. 基础消息实体类封装了除了业务消息外所有其他公共字段,主要看下面代码中的字段和注释
    2. 基础抽象消息实体,包含基础的消息、根据自己的业务消息设置更多的字段 其中也可以包含所有消费者可能用得到的方法等,比如做些数据的加解密
    package com.codecoord.rocketmq.domain;import lombok.Data;import java.time.LocalDateTime;import java.util.UUID;/** * 基础消息实体,包含基础的消息 * 根据自己的业务消息设置更多的字段 * * @author tianxincoord@163.com * @since 2022/6/16 */@Datapublic abstract class BaseMqMessage {/*** 业务键,用于RocketMQ控制台查看消费情况*/protected String key;/*** 发送消息来源,用于排查问题*/protected String source = "";/*** 发送时间*/protected LocalDateTime sendTime = LocalDateTime.now();/*** 跟踪id,用于slf4j等日志记录跟踪id,方便查询业务链*/protected String traceId = UUID.randomUUID().toString();/*** 重试次数,用于判断重试次数,超过重试次数发送异常警告*/protected Integer retryTimes = 0;}
    1. 有了此基础抽象实体类,那么剩下的所有业务消息实体只需要继承此基类,然后在自己业务类中包含自己需要的字段即可,因为这些公共字段不管是向上转型还是向下转型,子类和父类都可以看得到
    2.2.2 RocketMQTemplate
    1. RocketMQTemplate发送消息的代码如果不封装,我们发送消息需要这样 String destination = topic + ":" + tag; template.syncSend(destination, message);
    2. 每个人发送消息都要自己处理这个冒号,直接传入topic和tag不香吗?按照抽离变化点中的变化点,只有消息是变化的,除此之外的其他规则交给封装类
    3. RocketMQTemplate主要封装发送消息的日志、异常的处理、消息key设置、等等其他配置
    4. 封装代码类如下,下面包含了主要发送方式,更多自己添加即可 这里就是消息发送的点餐机器,同时也提供了封装方法也提供原始RocketMQTemplate供使用 此处只是提供一种方式,生产中按照项目组商量决定
    package com.codecoord.rocketmq.template;import com.alibaba.fastjson.JSONObject;import com.codecoord.rocketmq.constant.RocketMqSysConstant;import com.codecoord.rocketmq.domain.BaseMqMessage;import com.codecoord.rocketmq.util.JsonUtil;import org.Apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.apache.rocketmq.spring.support.RocketMQHeaders;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;import javax.annotation.Resource;/** * RocketMQ模板类 * * @author tianxincoord@163.com * @since 2022/4/15 */@Componentpublic class RocketMqTemplate {private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqTemplate.class);@Resource(name = "rocketMQTemplate")private RocketMQTemplate template;/*** 获取模板,如果封装的方法不够提供原生的使用方式*/public RocketMQTemplate getTemplate() {return template;}/*** 构建目的地*/public String buildDestination(String topic, String tag) {return topic + RocketMqSysConstant.DELIMITER + tag;}/*** 发送同步消息*/public <T extends BaseMqMessage> SendResult send(String topic, String tag, T message) {// 注意分隔符return send(topic + RocketMqSysConstant.DELIMITER + tag, message);}public <T extends BaseMqMessage> SendResult send(String destination, T message) {// 设置业务键,此处根据公共的参数进行处理// 更多的其它基础业务处理...Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();SendResult sendResult = template.syncSend(destination, sendMessage);// 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集LOGGER.info("[{}]同步消息[{}]发送结果[{}]", destination, JsonUtil.toJson(message), JSONObject.toJSON(sendResult));return sendResult;}/*** 发送延迟消息*/public <T extends BaseMqMessage> SendResult send(String topic, String tag, T message, int delayLevel) {return send(topic + RocketMqSysConstant.DELIMITER + tag, message, delayLevel);}public <T extends BaseMqMessage> SendResult send(String destination, T message, int delayLevel) {Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel);LOGGER.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JsonUtil.toJson(message), JsonUtil.toJson(sendResult));return sendResult;}}


    推荐阅读