事务消息机制源码解析
代码解析:
- 发送事务消息入口
1 | package org.apache.rocketmq.client.producer; |
如果TransactionListenner
为空则抛出异常,否则调用defaultMQProducerImpl
类下的sendMessageInTransaction
方法发送事务消息
发送事务消息具体实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82package org.apache.rocketmq.client.impl.producer;
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
// ignore DelayTimeLevel parameter
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
this.endTransaction(msg, sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}
在发送前先设置消息的PROPERTY_TRANSACTION_PREPARED属性为true,指定这是个半消息。同时设置消息的PROPERTY_PRODUCER_GROUP属性值为当前消息的生产者组是哪个。然后向Broker发消息。
- Broker接收生产者发送消息请求
1 | package org.apache.rocketmq.broker.processor; |
Broker
收到请求后,判断消息是不是半消息,如果是则调用TransactionMessageService
类的prepareMessage
方法,否则走普通消息的逻辑,即调用MessageStore
的putMessage
方法。
1 | public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) { |
TransactionalMessageBridge
是桥接类,作用主要是封装事务消息,然后用MessageStore
的putMessage
方法对半消息持久化。
1 | package org.apache.rocketmq.broker.transaction.queue; |
在消息持久化之前,先将消息原本的Topic
跟QueueId
备份,然后将消息的主题设置成RMQ_SYS_TRANS_HALF_TOPIC
,将QueueID
固定为0。注意此时的半消息对消费者是不可见的。半消息持久化完成后,会把消息发送的结果返回给生产者。
- 收到消息发送成功响应,生产者执行本地事务
1 | package org.apache.rocketmq.client.impl.producer; |
如果消息发送成功,生产者会调用TransactionListener
的executeLocalTransaction
方法执行本地事务。localTransactionState
初始化状态为UNKNOWN
,事务执行完成后会返回COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOWN其一。如果发送失败,说明半消息没有持久化成功。此时设置
LocalTransactionState为
ROLLBACK_MESSAGE,但后续执行
RollBackMessage`的时候会提示找不到半消息。
- 结束事务
1 | package org.apache.rocketmq.client.impl.producer; |
把transactionID
、CommitLogOffset
、CommitOrRollBack
、ProducerGroup
、TranStateTableOffset
(queueOffset
)这一系列参数写到请求的头部,然后发到Broker。
在Boker端会用EndTransactionProcessor
去处理请求。
- EndTransactionProcessor
1 | package org.apache.rocketmq.broker.processor; |
Broker处理endTransaction
请求时,先根据MessageSysFlag
是TRANSACTION_COMMIT_TYPE
还是TRANSACTION_ROLLBACK_TYPE
。
- 如果
MessageSysFlag
是TRANSACTION_COMMIT_TYPE
, 则根据调用TransactionMessageService
类的commitMessage
方法。从语义上来说commitMessage
是提交消息,其真实逻辑是根据返回的result
的commtLogOfset
从commitLog
获取半消息。如果结果返回成功了,那么先调用checkPreparedMessage
去检查半消息的合法性,半消息的producrGroup
、TranStateTableOffset
、commitLogOffset
应该与请求头的参数相同,同时半消息不应该为空。如果半消息没问题,那么就调用endMessageTransaction
方法。将半消息的Topic
和queueId
还原成在属性中备份的真实值。回到processRequest
方法,因为在半消息持久化前,MessageSysFlag
被设置成TRANSACTION_NOT_TYPE
, 现在要commit消息,需要把MessageSysFlag
改成TRANSACTION_COMMIT_TYPE
,然后调用sendFinalMessage
将还原后的消息持久化,调用commitLogDispatcherBuildComsumer.dispatch
更新topic
的consumerQueue
,经过这些过程消息对消费者可见。sendFinalMessage
执行完以后调用TransactionService
的deletePrepareMessage
将半消息删除。这个删除不是真正意义上的删除,而是构建一个RMQ_SYS_TRANS_OP_HALF_TOPIC
队列,queueID
跟半消息的一样都是0,然后把消息插到队列里面去,消息内容是半消息的queueOffset
,并将TAGS
属性设为d(d也就是TransactionalMessageUtil.REMOVETAG
),标记这个消息的状态是已经commit
或者rollback
了。除此之外,还会创建一个哈希表opQueueMap
, 让半消息的队列messageQueue
映射到opQueue
。 - 如果
MessageSysFlag
是TRANSACTION_ROLLBACK_TYPE
。先调用rollbackMessage
,根据返回的result
获取半消息。检查半消息合法性后,删除半消息,逻辑跟前面的一样。
- 事务回查机制
1 | package org.apache.rocketmq.broker.transaction; |
1 | package org.apache.rocketmq.broker.transaction.queue; |
事务回查的核心功能是通过TransactionMessageServieImpl
的check
方法实现的。TransactionMessageCheckServie
的run
方法会启动一个线程,每隔checkInterval
时间执行一次onWaitEnd
方法调用check
方法。在check方法中,首先根据半消息的topic
拿到msgQueue
,根据msgQueue
拿到opQueue
。在前面我们说过,半消息的msgQueue
跟opQueue
是一一对应的关系。如果opQueueMap
为空,则创建一个空的opQueue
,把两个队列的对应关系插入到opQueueMap
里面。在用halfOffset
和opOffset
记录各自消费进度后,调用 fillOpRemoveMap
去填充removeMap
和doneOpOffset
。在填充之前先用pullOpMessage
从transactionalMessageBridge
把从pullOffsetOfOp
开始的32个message复制到现在的opQueue
里。填充逻辑如下:
对于半消息队列来说来说 min offset
就是当前消费进度,max offset
是最后一个消息的offset。对于op消息队列来说,queue offset
当前op消息的内容,也就是在半消息中的offset。max offset
最大是32,因为我们一次最多pull32个op消息到op消息队列。这个方法会遍历整个op消息队列,每次获取当前消息的内容queue offset
然后跟min offset
去比对,如果queue offset < min offset
说明这个消息已经是已经被处理过了,不用再管。其余的按照key为半消息队列offset, value为op消息队列offset插入removeMap,说明key对应的半消息不需要回查。
在填充完removeMap
后会进入一个循环 ,从halfOffset
开始去遍历半消息,removeMap
的key包含这个offset,说明这个offset对应的半消息已经被commit或者rollback了,不需要回查,执行下一个offset+1对应 半消息的检查。如果是不在removeMap
里面,就用needDiscard
去判断这个消息有没有超过最大回查次数15次,如果超过15次了那就不查了。同时要判断这个半消息存盘后有没有超过72个小时,因为超过72小时这个半消息会被丢弃。接下来有一个checkImmunityTimeStr
,这个是回查免疫时间,就是消息存盘后这个回查免疫时间内是不进行回查的。如果在这个时间内,而且而且是首次回查,就不进行回查。如果是生产者没有设置首次回查免疫时间,如果在免疫时间内就跳过当前半消息以及后续所有的的半消息的回查,因为同一个consumer中的消息是有顺序的。如果前面条件都满足,就调用putBackHalfMsgQueue
再次存盘。因为这次回查是否能够确定状态是不知道的,为了下一次回查,将这个半消息再次存盘。然后调用resolveHalfMsg
进行回查逻辑。每次回查结束都更新半消息队列和op消息队列。这个while循环是有时间限制的,如果超过时间限制则会跳出循环。