去重 消息幂等通用解决方案,写得真好( 二 )


但这在分布式的场景下想找一个通用的方案几乎是不可能的 。不过如果是针对基于数据库事务的消费逻辑,实际上是可行的 。
基于关系数据库事务插入消息表假设我们业务的消息消费逻辑是:更新MySQL数据库的某张订单表的状态:
update t_order set status = 'SUCCESS' where order_no= 'order123';要实现Exaclty Once即这个消息只被消费一次(并且肯定要保证能消费一次),我们可以这样做:在这个数据库中增加一个消息消费记录表,把消息插入到这个表,并且把原来的订单更新和这个插入的动作放到同一个事务中一起提交,就能保证消息只会被消费一遍了 。

  1. 开启事务
  2. 插入消息表(处理好主键冲突的问题)
  3. 更新订单表(原消费逻辑)
  4. 提交事务
说明:
  1. 这时候如果消息消费成功并且事务提交了,那么消息表就插入成功了,这时候就算RocketMQ还没有收到消费位点的更新再次投递,也会插入消息失败而视为已经消费过,后续就直接更新消费位点了 。这保证我们消费代码只会执行一次 。
  2. 如果事务提交之前服务挂了(例如重启),对于本地事务并没有执行所以订单没有更新,消息表也没插入成功;而对于RocketMQ服务端来说,消费位点也没更新,所以消息还会继续投递下来,投递下来发现这个消息插入消息表也是成功的,所以可以继续消费 。这保证了消息不丢失 。
事实上,阿里云ONS的EXACTLY-ONCE语义的实现上,就是类似这个方案基于数据库的事务特性实现的 。
基于这种方式,的确这是有能力拓展到不同的应用场景,因为他的实现方案与具体业务本身无关——而是依赖一个消息表 。
但是这里有它的局限性
  1. 消息的消费逻辑必须是依赖于关系型数据库事务 。如果消费的消费过程中还涉及其他数据的修改,例如redis这种不支持事务特性的数据源,则这些数据是不可回滚的 。
  2. 数据库的数据必须是在一个库,跨库无法解决
注:业务上,消息表的设计不应该以消息ID作为标识,而应该以业务的业务主键作为标识更为合理,以应对生产者的重发 。阿里云上的消息去重只是RocketMQ的messageId,在生产者因为某些原因手动重发(例如上游针对一个交易重复请求了)的场景下起不到去重/幂等的效果(因消息id不同) 。
更复杂的业务场景如上所述,这种方式Exactly Once语义的实现,实际上有很多局限性,这种局限性使得这个方案基本不具备广泛应用的价值 。并且由于基于事务,可能导致锁表时间过长等性能问题 。
例如我们以一个比较常见的一个订单申请的消息来举例,可能有以下几步(以下统称为步骤X):
  1. 检查库存(RPC)
  2. 锁库存(RPC)
  3. 开启事务,插入订单表(MySQL)
  4. 调用某些其他下游服务(RPC)
  5. 更新订单状态
  6. commit 事务(MySQL)
这种情况下,我们如果采取消息表+本地事务的实现方式,消息消费过程中很多子过程是不支持回滚的,也就是说就算我们加了事务,实际上这背后的操作并不是原子性的 。怎么说呢,就是说有可能第一条小在经历了第二步锁库存的时候,服务重启了,这时候实际上库存是已经在另外的服务里被锁定了,这并不能被回滚 。当然消息还会再次投递下来,要保证消息能至少消费一遍,换句话说,锁库存的这个RPC接口本身依旧要支持“幂等” 。
再者,如果在这个比较耗时的长链条场景下加入事务的包裹,将大大的降低系统的并发 。所以通常情况下,我们处理这种场景的消息去重的方法还是会使用一开始说的业务自己实现去重逻辑的方式,如前面加select for update,或者使用乐观锁 。
那我们有没有方法抽取出一个公共的解决方案,能兼顾去重、通用、高性能呢?
拆解消息执行过程其中一个思路是把上面的几步,拆解成几个不同的子消息,例如:
  1. 库存系统消费A:检查库存并做锁库存,发送消息B给订单服务
  2. 订单系统消费消息B:插入订单表(MySQL),发送消息C给自己(下游系统)消费
  3. 下游系统消费消息C:处理部分逻辑,发送消息D给订单系统

  4. 推荐阅读