请教一下,基于 MQ 的事务消息比如 RocketMQ 的事务消息和在事务中投递消息,如果投递消息失败则回滚的差异在哪里了。
新手上路,多多包涵。
1
zhgg0 2021-05-27 22:51:59 +08:00 1
在事务中投递消息,如果投递成功,接下来的事务失败咋回滚消息?
|
2
yeqizhang 2021-05-28 00:58:55 +08:00 via Android
无论如何都是要等本地事务执行成功后再尝试提交消息(事务消息在本地事务执行前只是半提交),如果提交消息的代码穿插在本地事务中间,就会出现一楼说的消息提交后本地事务又失败了怎么办。提交消息成功没有回滚这个说法,事务消息那个回滚是因为消息中间间支持了半提交的消息。
如果你用普通消息,本地事务执行完后最后发送消息失败,需要重试发送,或者对已经执行成功的本地事务进行处理。事务消息那是另外一种做法 |
3
fkname 2021-05-28 09:36:08 +08:00
事务消息是确保你发送的消息肯定在发送方是成功执行的,而在事务中发送消息消息会立刻发送出去,事务无法限制消息的发送。
|
4
SilenceLL OP |
5
SilenceLL OP 是基于失败的成本考虑吗?如果先发 half 消息失败成本低。如果先提交事务再发送 half 消息导致事务回滚成本高。
|
6
yeqizhang 2021-05-29 18:16:18 +08:00 via Android
@SilenceLL 你 4 楼说的不行,不应该将发送消息的代码放入到数据库事务中,否则在重试发送消息的过程中过多的重试会拉长数据库事务执行时间。
至于 5 楼说的和发普通消息没区别了,具体事务消息对此有何优点,我也没找到,网上很少有说回滚本地事务操作这块的内容的 |
7
yeqizhang 2021-05-29 23:27:48 +08:00
忽略我之前说的哈,是我之前学习时没搞明白并且也忘了。现在在这里补充一下我今天看的,当作在这做个笔记,如果打扰到楼主,感到抱歉~
首先我把标题改一下,楼主问的也挺好的,就是问 事务消息这种形式和在本地事务中耦合发送普通消息的差异。 在本地事务中耦合发送普通消息时,就是楼主一开始问的“在事务中投递消息,如果投递消息失败则回滚”,这个是没有问题的,最多是可能需要尝试发几次,本地事务很好回滚,这个没啥问题。 而“事务消息”,如果预发送和本地事务都执行成功了,那提交消息这一步到最后一定需要执行成功,没有回滚这一说。 而在本地事务中投递消息,会有什么问题呢,以下摘自 https://blog.csdn.net/zxjoke/article/details/105260252 // 开始事务 try { // 1.执行数据库操作 // 2.发送 mq 消息 // 3.提交事务 }catch (Exception e){ // 4.回滚事务 } 上面代码看起来确实没什么问题,消息发送失败,回滚事务。 但是实际上第二步有可能存在消息已经发送到 MQ 服务端,但是由于网络问题未及时收到 MQ 的响应消息,从而导致消息发送端认为消息消息发送失败。 这就会导致订单事务回滚了,但是手续费系统却能消费消息,两边数据库又不一致了。 熟悉 MQ 的同学,可能会想到,消息发送失败,可以重试啊。 是的,我们可以增加重试次数,重新发送消息。但是这里我们需要注意,由于消息发送耦合在事务中,过多的重试会拉长数据库事务执行时间,事务处理时间过长,导致事务中锁的持有时间变长,影响整体的数据库吞吐量。 实际业务中,不太建议将消息发送耦合在数据库事务中。 |
8
SilenceLL OP 比如这样子:
public class Test { @Transactional public void createOrder() { execCreateOrder(); sendHalfMessage(); commitCreateOrder(); } @RocketMQTransactionListener static class OrderTransactionListenerImpl implements RocketMQLocalTransactionListener { @Autowired private OrderService orderService; @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { log.info("----executeLocalTransaction----{}", msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID, String.class)); return RocketMQLocalTransactionState.COMMIT; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { log.info("----checkLocalTransaction----{}", msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID, String.class)); String keys = msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID, String.class); Order order = orderService.query(orderId); if (null == order) { return RocketMQLocalTransactionState.UNKNOWN; } return RocketMQLocalTransactionState.COMMIT; } } } |
9
SilenceLL OP |