网站公告: 首页--雷竞技-雷竞技官网DOTA2,LOL,CSGO电竞及体育赛事竞猜

24小时全国服务热线

13633452353

如果您有任何疑问或是问题,请随时与我们联系

查看联系方式>>
监听器 当前位置: 首页 > 雷竞技集团 > 防盗产品 > 监听器

TDMQ RocketMQ 版事务消息原理解析

浏览次数:    时间:2025-08-02 04:14:11

  在分布式架构系统中,确保跨服务应用的数据一致性始终是系统设计的核心挑战之一。TDMQ RocketMQ 版作为一款基于 Apache RocketMQ 构建的企业级消息中间件,凭借其高可用性和高可靠性特点,通过提供完善的事务消息机制,为这一难题提供了专业的解决方案。本文将结合核心源码,深入解析 RocketMQ 事务消息的实现原理,希望能帮助开发者去构建更健壮的分布式事务系统。

  事务消息是 RocketMQ 提供的一种高级特性消息,通过将二阶段提交的动作和本地事务绑定,来保障分布式场景下消息生产和本地事务的最终一致性,相比普通消息,主要是扩展了二次确认和本地事务状态回查补偿的机制。

  在电商平台中,积分兑换商品这一常见功能就涉及到分布式事务,用户发起兑换后,可能涉及创建兑换订单、扣除用户积分、通知发货服务、扣减库存等一系列动作,此时就要保证订单服务和多个下游业务执行结果的最终一致性,如果积分扣除成功但订单创建失败,会导致用户积分被扣但未获得商品,如果订单创建成功但积分扣除失败,会导致用户获得商品但未扣除积分。

  我们可以采用 TDMQ RocketMQ 版事务消息来实现这一功能,具体分为以下三个阶段:

  用户提交订单并选择使用积分兑换后,订单服务向 RocketMQ 服务端对应的业务 Topic 发送一条事务消息,内容包含 “用户 user-001 发起订单 Order-001 并使用 1000 积分兑换商品 A”。此时,该消息对下游的积分服务和库存服务等均不可见,避免在订单服务事务完成前,积分服务提前扣减积分,确保积分不会被误扣。

  事务消息发送成功后,订单服务继续执行本地事务,创建订单并预占积分,若本地事务成功,则提交二次确认 Commit 到 RocketMQ 服务端,消息被继续投递到下游,反之,提交 Rollback,事务结束,积分状态保持不变。

  积分服务和库存服务预先订阅上面的 Topic,接收到消息后,积分服务扣减积分,库存服务更新库存。若消费过程中因网络异常、服务不可用等问题导致失败,RocketMQ 将自动触发重试机制,若多次重试仍未成功,消息将转入死信队列,后续由人工介入核对,通过补偿流程保障积分和库存数据的最终一致性。

  通过以上三个阶段,RocketMQ 事务消息机制在积分核销场景中,保障了订单本地事务和消息发送的同时成功/失败,成功实现了分布式事务的最终一致性。类似的,在金融交易、企业多系统数据同步等场景中,RocketMQ 事务消息都能凭借其可靠的机制,保障跨服务操作的最终一致性。那么,RocketMQ 事务消息究竟是如何在底层实现这些复杂操作,确保最终一致性的呢?接下来,我们深入探究其背后的原理。

  在分析 RocketMQ 事务消息的实现原理之前,有必要先了解一下这些概念和术语:

  生产者发送事务消息到 RocketMQ 服务端后,消息会被持久化并标记为“暂不可投递”的状态,直到本地事务执行完成并确认后,消息才会决定是否对消费者可见,此状态下的消息,称为半消息(Half Message)。

  实现事务最终一致性的关键机制,一阶段为发送 Half Message,二阶段为生产者执行本地事务,并根据执行结果向 RocketMQ 服务端提交 Commit(允许投递)或 Rollback(丢弃消息)的确认结果,以此来决定 Half Message 的去留。

  用于给消息状态打标记,没有对应 OP 消息的 Half Message,就说明二阶段确认状态未知,需要 RocketMQ 服务端进行本地事务状态主动回查,OP 消息的内容为对应的 Half Message 的存储的 Offset。

  了解完基本概念,结合上面的业务场景,我们来看 RocketMQ 事务消息的实现流程:

  服务端存储这条消息后返回发送成功的响应,此时消息对下游消费者不可见,处于Half Message 状态。

  生产者收到半消息成功的响应后,继续往下执行本地事务(如更新业务数据库)。

  根据本地事务的执行结果,生产者会向 RocketMQ 服务端提交最终状态,也就是二次确认。

  确认结果为 Commit 时,服务端会将事务消息继续向下投递给消费者,确认结果为 Rollback 时,服务端将会丢弃该消息,不再向下投递。

  确认结果是 Unknown 或一直没有收到确认结果时,一定时间后,将会触发事务状态主动回查。

  当生产者未提交最终状态或者二次确认的结果为 Unknown 时,RocketMQ 服务端将会主动发起事务结果查询请求到生产者服务。

  生产者收到请求后提交二次确认结果,逻辑再次回到第5步,此时如果生产者服务暂时不可用,则 RocketMQ 服务端会在指定时间间隔后,继续主动发起回查请求,直到超过最大回查次数后,回滚消息。

  如此,不管本地事务是否执行成功,都能实现事务状态的最终一致性。以上步骤,可用时序图直观体现为:

  了解了事务消息基本的实现流程后,你可能会有疑问,半消息为什么对消费者不可见?二次确认 Commit 或者 Rollback 后,服务端如何投递或者删除半消息?前面提到,Half Message 在服务端做了持久化,但在消费端却不可见,实现这一效果的方式,就是 Topic 替换:首先将事务消息的 Real Topic 和队列信息作为属性暂存起来,以便后续二阶段提交结果为 Commit 时,能正确地投递到下游消费者,然后将消息的 Topic 改为系统 Topic RMQ_SYS_TRANS_HALF_TOPIC,队列 ID 改为0,用户的消费者正常不会订阅这个系统 Topic,自然也就不能看到 Half Message。

  Half Message 被成功投递到上面的系统 Topic 后,开始执行本地事务,如果生产者提交的本地事务二次确认结果为 Commit,则在消息属性中获取消息的 Real Topic、队列等信息,设置 Topic = Real Topic后,再投递下游,最后删除 Half Message(逻辑删除),如果二次确认结果为 Rollback,则只需要逻辑删除对应的 Half Message 即可。这里逻辑删除的实现,就是前面提到的 OP Topic,OP 队列中的消息,记录了 Half Message 对应的二次确认状态,根据这个状态,RocketMQ 服务端会进行第二个核心机制:事务状态主动回查。

  Half Message 写入后,可能会因为种种原因,导致 RocketMQ 服务端一直收不到二次确认结果,比如网络异常、生产者服务暂时不可用、本地事务死锁导致执行时间超长等,此时,就需要 RocketMQ 服务端主动去询问生产者服务本地事务是否执行成功,以决定 Half Message 的最终去留。

  RocketMQ 服务端会启动事务检查定时任务,默认每60秒执行一次,最大回查15次,可通过 TransactionCheckInterval 和 TransactionCheckMax 这两项配置按业务实际情况进行定制化调整。回查时,会对比 Half 队列和 OP 队列的偏移量,若发现 Half 消息未在 OP 队列中有对应的记录且 Half Message 的留存时间超过了事务超时时间(前面分析过,Half Message 是否被二次确认过,是根据 OP 队列来判断的),则触发主动回查动作,向生产者服务发起事务状态检查请求,如此,就解决了部分事务消息状态悬而未决的问题,实现了本地事务和消息发送之间的最终一致性。

  分析完具体的实现原理,接下来我们对照 Half Message 发送、二次确认提交、事务主动回查这三个关键部分的源码实现,来具体看看以上理论在代码中的体现:

  对照以上源码,可以看到,生产者方法发送消息后,首先会对事务监听器做非空校验,因为后面本地事务的执行以及事务状态的主动回查,都需要依赖它来完成,接下来的主要逻辑有四点:

  给原始消息加一个 TRAN_MSG=true 的属性,这是后面判定一条消息为事务消息的条件。

  同步发送 Half Message,若发送失败,则不再执行本地事务,保证了“同失败”的事务一致性。

  若发送成功,则开始执行我们设置的本地事务,并根据执行结果修改本地事务状态值。

  根据事务状态值,来 endTransaction 做收尾工作,这里包含了下面我们要说的事务回查和 Half Message 的删除。

  可以看到,不管生产者提交的二次确认结果是 Commit 还是 Rollback,都会执行 deletePrepareMessage 方法,向 OP 队列写入消息,标识这条 Half Message 已经被处理过了,而并不是把这条消息物理删除掉。和 Rollback 不同的是,Commit 时,需要先获取并设置消息的 Real Topic 和 Real QueueId(这个在第一步发送 Half Message 时已经记录在了消息属性中),然后向下投递,此时,消息对下游消费者可见。

  如果 OP Topic 中有 Half Message 的相关记录,就不再回查,否则,判读消息是否需要被跳过回查(消息超过最大检查次数、超过了消息最大保留时间、刚写入的消息),并且对于超出最大检查次数的消息,丢弃操作其实是将消息转移到 TRANS_CHECK_MAX_TIME_TOPIC 这个系统 Topic。

  直到判断出 Half Message 没有对应的 OP 记录,并且消息留存时长超过了事务超时时间,开始组装发送回查请求到生产者端。

  这里以 TDMQ 版 RocketMQ 5.x 版本集群为例,演示事务消息的使用方式和效果。 1、首先登录腾讯云控制台,新建一个消息类型为事务消息的 Topic。

  4、运行代码后,在控制台的消息查询页面,可以看到已经有一条投递完成等待消费的消息。

  5、启动消费者,订阅这个 Topic,成功消费消息后,在腾讯云控制台查看消息轨迹:

  6、修改代码,假设本地事务执行失败,使处于 Half Message 状态的事务消息回滚。

  7、此时,可以发现消息发送成功了,但在控制台的消息消息查询页面是不可见的,启动消费者也不能消费到这条消息。

  本文从理论与源码双视角剖析了 TDMQ RocketMQ 版事务消息的三大核心流程——半消息的发送存储、二阶段提交及事务状态回查的实现机制。在实际生产中,建议开发者通过幂等设计规避重复消费,合理设置事务超时时间,并关注 Topic 类型限制等约束条件,以充分发挥事务消息在分布式场景中的价值。

【返回列表页】
地址:广东省广州市 电话:020-66889888   手机:13633452353
版权所有:Copyright © 2012-2025 雷竞技,官方网站 版权所有  ICP备案编号:粤ICP备4543623s号