侧边栏壁纸
  • 累计撰写 268 篇文章
  • 累计创建 140 个标签
  • 累计收到 16 条评论

目 录CONTENT

文章目录

RocketMQ 的事务消息

Sherlock
2020-04-05 / 0 评论 / 0 点赞 / 1354 阅读 / 0 字
温馨提示:
本文最后更新于2023-10-09,若内容或图片失效,请留言反馈。 部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

RocketMQ 的事务消息要解决得是本地事务执行与消息发送的原子性问题。

确保 MQ Producer 准确的发送出消息,不多发,也不漏发。

1.概念介绍

  • 半(预)消息

指暂时无法传递的消息。当消息成功发送到 MQ Server,但 MQ Server 未从Producer(生产者)接收到消息的第二次确认时,该消息将被标记为“暂时无法传递”。处于此状态的消息称为半消息。

  • 消息状态检查

网络断开连接或Producer(生产者)应用重新启动可能会导致对事务性消息的第二次确认丢失。当 MQ Server 发现一条消息长时间停留在半消息状态时,它将向消息 Producer(生产者)发送请求,检查消息的最终状态(提交或回滚)。

2.执行流程图

RocketMQ_Execute_Flow_Chart.png

  • 1.应用模块(生产者)遇到要发送事务消息的场景时,先发送 prepare 消息(半消息)到 MQ Server。
  • 2.半消息发送成功后,应用模块执行本地事务。
  • 3.根据本地事务结果,将提交或回滚消息发送到 MQ Server。
  • 4.如果在本地事务执行期间缺少提交/回滚消息,或者生产者处于长时间等待状态(超时),MQ Server 将向同一组中的每个生产者发送检查消息,回查事务状态(最多重试15次,超过了默认丢弃此消息)。
  • 5.生产者根据本地事务状态回复提交/回滚消息。
  • 6.提交的消息将传递给消费者,回滚的消息将被丢弃。

MQ 消费的成功机制由 MQ 自己保证。

3.详细设计

3.1 大纲

RocketMQtransactionalmessageoutline.png

如图所示,为了隐藏存储的实现细节,所有事务性消息操作都集中在事务服务接口上。RocketMQ 提供了自己的存储系统的默认实现,我们使用事务桥来实现事务存储逻辑,而不是直接修改 RocketMQ 的存储层。

3.2 发送事务消息

时序图:
sendingtransactionalmessage.png

该图描述了发送事务性消息的时间关系。从该图可以清楚地看到事务消息如何在两个阶段中提交。

以RocketMQ 4.5.2版本为例,事务消息有专门的一个队列RMQ_SYS_TRANS_HALF_TOPIC,所有的prepare消息都先往这里放,当消息收到 Commit 请求后,就把消息再塞到真实的 Topic 队列里,供 Consumer 消费,同时向RMQ_SYS_TRANS_OP_HALF_TOPIC塞一条消息。

简易流程图如下:
RocketMQtxmsgsendingactivity.png

图片来自:RocketMQ事务消息学习及刨坑过程

RocketMQ Client 即我们工程中导入的依赖 jar 包,RocketMQ Broker 端即部署的服务端,NameServer 图中暂未体现。

应用模块成对出现,上游为事务消息生产端,下游为事务消息消费端(事务消息对消费端是透明的,与普通消息一致)。

3.3 检查事务消息

应用模块的事务因为中断、重启,或是其他的网络原因,导致无法立即响应的,RocketMQ 会当做 UNKNOW 处理,RocketMQ 事务消息还提供了一个补救方案:定时查询事务消息的数据库事务状态。

时序图:
checkingtransactionalmessage.png

该图描述了事务消息的检查逻辑,当 MQ Server 发现一条消息长时间停留在半消息状态时,它将向生产者发送请求,以获取当前事务的状态。

简易流程图如下:
RocketMQtxmsgcheckingactivity.png

图片来自:RocketMQ事务消息学习及刨坑过程


事务消息和 XA 协议、TCC 模式的简单对比可以参考:RocketMq事务消息 - 简书

0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区