
1. Operating System

(1) Definition

An operating system (OS) is a body of software responsible for making sure the system operates correctly and efficiently in an easy-to-use manner. The OS achieves this goal primarily through a general technique called virtualization.

(2) Responsibility of the OS

Make it easy to run programs, allow programs to share memory, enable programs to interact with devices, etc.

(3) Four Themes of This Book

  1. Virtualization
  2. Concurrency
  3. Persistence
  4. Security

2. Virtualization

(1) Definition

Transform a physical resource into a more general, powerful, and easy-to-use virtual form of itself.

(2) Virtualizing The CPU

Turn a single CPU into a seemingly infinite number of virtual CPUs, thus allowing many programs to seemingly run at the same time.

(3) Virtualizing The Memory

Each process accesses its own private virtual address space, which the OS maps onto the physical memory of the machine.

Note: Physical Memory Model

Memory is just an array of bytes. To read memory, one must specify the address of the data to be accessed; to write memory, one must specify both the address and the data to be written there.
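
The model in this note can be sketched in a few lines of Java. This is only an illustration: the class name PhysicalMemory and its read and write methods are hypothetical, and a real machine addresses memory in hardware rather than through an object.

```java
// Hypothetical model of physical memory as a flat array of bytes.
final class PhysicalMemory {
    private final byte[] cells;

    PhysicalMemory(int sizeInBytes) {
        this.cells = new byte[sizeInBytes];
    }

    // Reading requires only an address.
    byte read(int address) {
        return cells[address];
    }

    // Writing requires an address and the data to store there.
    void write(int address, byte value) {
        cells[address] = value;
    }
}
```

For example, after `write(3, (byte) 42)` a call to `read(3)` returns 42, while an address that was never written still reads as 0.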

3. Concurrency

(1) Definition

A host of problems that arise when working on many things at once (concurrently) in the same program.

(2) Scenario

Use two threads to increment a shared variable in a loop. With the number of iterations set to 1000, the final value is 2000, as expected. With the number of iterations set to 100000, the final value is no longer guaranteed to be the expected 200000.

(3) Cause

The increment takes three instructions: load the value of the variable from memory into a register, increment it, and store it back to memory. Because these three instructions do not execute atomically, the two threads can interleave and overwrite each other's updates.
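
The scenario and its cause can be reproduced with a short Java sketch. The class and method names are hypothetical. The plain int counter is incremented with a non-atomic load, add, and store, so its total may fall short; the AtomicInteger variant is one possible fix, shown only for contrast.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names are illustrative only.
final class SharedCounterDemo {
    static int unsafeCounter = 0;                                  // counter++ is load, add, store
    static final AtomicInteger safeCounter = new AtomicInteger();  // increments atomically

    // Starts two threads that each run `step` `loops` times, then waits for both.
    static void runTwoThreads(int loops, Runnable step) {
        Runnable worker = () -> {
            for (int i = 0; i < loops; i++) {
                step.run();
            }
        };
        Thread t1 = new Thread(worker);
        Thread t2 = new Thread(worker);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // May return less than 2 * loops because increments can be lost.
    static int unsafeTotal(int loops) {
        unsafeCounter = 0;
        runTwoThreads(loops, () -> unsafeCounter++);
        return unsafeCounter;
    }

    // Always returns 2 * loops.
    static int safeTotal(int loops) {
        safeCounter.set(0);
        runTwoThreads(loops, safeCounter::incrementAndGet);
        return safeCounter.get();
    }
}
```

Calling `SharedCounterDemo.unsafeTotal(100000)` several times will typically print different totals at or below 200000, while `safeTotal(100000)` returns 200000 on every run.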

4. Persistence

(1) File System

The file system is the part of the OS responsible for storing any files the user creates reliably and efficiently on the disks of the system.

(2) Standard Library

The OS provides a standard library that makes it easy to access devices through system calls.

(3) Performance Optimization

Most file systems delay writes for a while, hoping to batch them into larger groups.
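
A minimal sketch of the batching idea, under loud assumptions: BatchingWriter is a hypothetical class, an in-memory list stands in for the disk, and a fixed batch size replaces the timing heuristics a real file system would use.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical write buffer: records are held back and written out in groups.
final class BatchingWriter {
    private final int batchSize;
    private final List<String> pending = new ArrayList<>();
    // Stands in for the disk: each element is one batched write.
    private final List<List<String>> flushedBatches = new ArrayList<>();

    BatchingWriter(int batchSize) {
        this.batchSize = batchSize;
    }

    // Buffers the record; only a full batch triggers a write to the "disk".
    void write(String record) {
        pending.add(record);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    // Writes out whatever is pending as a single batch.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushedBatches.add(new ArrayList<>(pending));
        pending.clear();
    }

    int diskWrites() {
        return flushedBatches.size();
    }

    int pendingCount() {
        return pending.size();
    }
}
```

With a batch size of 4, five calls to `write` cause a single disk write and leave one record pending. The trade-off is that pending records are lost if the system crashes before the next flush.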

(4) Reliability Optimization

To handle system crashes during writes, most file systems incorporate some kind of intricate write protocol, such as copy-on-write, to ensure that if a failure occurs during a write sequence, the system can recover to a reasonable state afterwards.

5. Design Goals of an Operating System

  • Abstraction
  • Minimize the overhead
  • Protection between applications (isolation)
  • Reliability
  • Security
  • Mobility
  • Energy-efficiency

Source Code Analysis of the Transactional Message Mechanism

Code walkthrough:

  1. Entry point for sending a transactional message
package org.apache.rocketmq.client.producer;
public TransactionSendResult sendMessageInTransaction(final Message msg,
    final Object arg) throws MQClientException {
    if (null == this.transactionListener) {
        throw new MQClientException("TransactionListener is null", null);
    }

    msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
    return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}

If the TransactionListener is null, an exception is thrown; otherwise the sendMessageInTransaction method of defaultMQProducerImpl is called to send the transactional message.

  2. Implementation of sending a transactional message

    package org.apache.rocketmq.client.impl.producer;
    public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
    throw new MQClientException("tranExecutor is null", null);
    }

    // ignore DelayTimeLevel parameter
    if (msg.getDelayTimeLevel() != 0) {
    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }

    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
    sendResult = this.send(msg);
    } catch (Exception e) {
    throw new MQClientException("send message Exception", e);
    }

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
    case SEND_OK: {
    try {
    if (sendResult.getTransactionId() != null) {
    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
    }
    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
    if (null != transactionId && !"".equals(transactionId)) {
    msg.setTransactionId(transactionId);
    }
    if (null != localTransactionExecuter) {
    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
    } else if (transactionListener != null) {
    log.debug("Used new transaction API");
    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
    }
    if (null == localTransactionState) {
    localTransactionState = LocalTransactionState.UNKNOW;
    }

    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
    log.info("executeLocalTransactionBranch return {}", localTransactionState);
    log.info(msg.toString());
    }
    } catch (Throwable e) {
    log.info("executeLocalTransactionBranch exception", e);
    log.info(msg.toString());
    localException = e;
    }
    }
    break;
    case FLUSH_DISK_TIMEOUT:
    case FLUSH_SLAVE_TIMEOUT:
    case SLAVE_NOT_AVAILABLE:
    localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
    break;
    default:
    break;
    }

    try {
    this.endTransaction(msg, sendResult, localTransactionState, localException);
    } catch (Exception e) {
    log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }

    TransactionSendResult transactionSendResult = new TransactionSendResult();
    transactionSendResult.setSendStatus(sendResult.getSendStatus());
    transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
    transactionSendResult.setMsgId(sendResult.getMsgId());
    transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
    transactionSendResult.setTransactionId(sendResult.getTransactionId());
    transactionSendResult.setLocalTransactionState(localTransactionState);
    return transactionSendResult;
    }

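Before sending, the message's PROPERTY_TRANSACTION_PREPARED property is set to true, which marks it as a half message. The PROPERTY_PRODUCER_GROUP property is also set to the producer group that the message belongs to. The message is then sent to the Broker.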

  3. The Broker receives the producer's send request
package org.apache.rocketmq.broker.processor;
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {

final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

response.setOpaque(request.getOpaque());

response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

log.debug("receive SendMessage request command, {}", request);

final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
if (this.brokerController.getMessageStore().now() < startTimstamp) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
return response;
}

response.setCode(-1);
super.msgCheck(ctx, requestHeader, response);
if (response.getCode() != -1) {
return response;
}

final byte[] body = request.getBody();

int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

if (queueIdInt < 0) {
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
}

MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);

if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
return response;
}

msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
PutMessageResult putMessageResult = null;
Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)
&& !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}

return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);

}

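After receiving the request, the Broker checks whether the message is a half message. If it is, the Broker calls the prepareMessage method of TransactionalMessageService; otherwise it follows the normal message path and calls the putMessage method of MessageStore.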

public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
    return transactionalMessageBridge.putHalfMessage(messageInner);
}

TransactionalMessageBridge is a bridge class. Its main job is to wrap the transactional message and then persist the half message through the putMessage method of MessageStore.

package org.apache.rocketmq.broker.transaction.queue;
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
    return store.putMessage(parseHalfMessageInner(messageInner));
}

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

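Before the message is persisted, its original topic and queue ID are backed up in the message properties. The topic is then set to RMQ_SYS_TRANS_HALF_TOPIC and the queue ID is fixed at 0. Note that at this point the half message is invisible to consumers. Once the half message has been persisted, the send result is returned to the producer.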
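
The backup-and-redirect step can be modeled without any RocketMQ dependency. In the sketch below, SketchMessage and HalfMessageSketch are hypothetical stand-ins, and the two property keys are placeholders for MessageConst.PROPERTY_REAL_TOPIC and MessageConst.PROPERTY_REAL_QUEUE_ID, whose actual string values are not shown in the excerpt.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for a broker-side message; not the real MessageExtBrokerInner.
final class SketchMessage {
    String topic;
    int queueId;
    final Map<String, String> properties = new HashMap<>();

    SketchMessage(String topic, int queueId) {
        this.topic = topic;
        this.queueId = queueId;
    }
}

final class HalfMessageSketch {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    // Placeholder keys (assumptions) standing in for the real property constants.
    static final String KEY_REAL_TOPIC = "REAL_TOPIC";
    static final String KEY_REAL_QUEUE_ID = "REAL_QUEUE_ID";

    // Backs up the real destination, then redirects the message to the half topic, queue 0.
    static SketchMessage toHalf(SketchMessage msg) {
        msg.properties.put(KEY_REAL_TOPIC, msg.topic);
        msg.properties.put(KEY_REAL_QUEUE_ID, String.valueOf(msg.queueId));
        msg.topic = HALF_TOPIC;
        msg.queueId = 0;
        return msg;
    }
}
```

Because consumers subscribe to the original topic, a message stored under the half topic never reaches them until it is restored and written again.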

  4. On receiving a successful send response, the producer executes the local transaction
package org.apache.rocketmq.client.impl.producer;
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
// ignore DelayTimeLevel parameter
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}

Validators.checkMessage(msg, this.defaultMQProducer);

SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}

LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}

if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}

try {
this.endTransaction(msg, sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}

TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}

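If the message is sent successfully, the producer calls the executeLocalTransaction method of TransactionListener to run the local transaction. localTransactionState is initialized to UNKNOW (the constant is spelled this way in the code), and once the transaction has run it is one of COMMIT_MESSAGE, ROLLBACK_MESSAGE, or UNKNOW. If the send fails (FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, or SLAVE_NOT_AVAILABLE), the half message was not persisted successfully. In that case localTransactionState is set to ROLLBACK_MESSAGE, but the subsequent rollbackMessage call will report that the half message cannot be found.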
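
The state handling described above can be condensed into a small, self-contained sketch. The enums only mirror the constant names that appear in the excerpt; the LocalTransaction interface and the resolve method are hypothetical stand-ins, not part of the RocketMQ API.

```java
// Hypothetical condensation of the switch in sendMessageInTransaction.
final class LocalStateSketch {
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }
    enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Stand-in for TransactionListener.executeLocalTransaction.
    interface LocalTransaction {
        LocalTransactionState execute();
    }

    static LocalTransactionState resolve(SendStatus status, LocalTransaction tx) {
        LocalTransactionState state = LocalTransactionState.UNKNOW;
        switch (status) {
            case SEND_OK:
                try {
                    state = tx.execute();
                    if (state == null) {
                        state = LocalTransactionState.UNKNOW;
                    }
                } catch (Throwable e) {
                    // The state stays UNKNOW; the broker has to check back later.
                }
                break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                // The half message was not stored reliably, so the transaction is rolled back.
                state = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
        }
        return state;
    }
}
```

A listener that returns null or throws leaves the state at UNKNOW, which is exactly the case the transaction check mechanism exists to resolve.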

  5. Ending the transaction
package org.apache.rocketmq.client.impl.producer;
public void endTransaction(
final Message msg,
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
String transactionId = sendResult.getTransactionId();
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}

doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}

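The parameters transactionId, commitLogOffset, commitOrRollback, producerGroup, and tranStateTableOffset (the queueOffset) are written into the request header, which is then sent to the Broker.

On the Broker side, EndTransactionProcessor handles the request.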

  6. EndTransactionProcessor
package org.apache.rocketmq.broker.processor;
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final EndTransactionRequestHeader requestHeader =
(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
LOGGER.debug("Transaction request:{}", requestHeader);
if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
return response;
}

if (requestHeader.getFromTransactionCheck()) {
switch (requestHeader.getCommitOrRollback()) {
case MessageSysFlag.TRANSACTION_NOT_TYPE: {
LOGGER.warn("Check producer[{}] transaction state, but it's pending status."
+ "RequestHeader: {} Remark: {}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
request.getRemark());
return null;
}

case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."
+ "RequestHeader: {} Remark: {}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
request.getRemark());

break;
}

case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."
+ "RequestHeader: {} Remark: {}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
request.getRemark());
break;
}
default:
return null;
}
} else {
switch (requestHeader.getCommitOrRollback()) {
case MessageSysFlag.TRANSACTION_NOT_TYPE: {
LOGGER.warn("The producer[{}] end transaction in sending message, and it's pending status."
+ "RequestHeader: {} Remark: {}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
request.getRemark());
return null;
}

case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
break;
}

case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."
+ "RequestHeader: {} Remark: {}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
request.getRemark());
break;
}
default:
return null;
}
}
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;
}

private RemotingCommand checkPrepareMessage(MessageExt msgExt, EndTransactionRequestHeader requestHeader) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
if (msgExt != null) {
final String pgroupRead = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
if (!pgroupRead.equals(requestHeader.getProducerGroup())) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("The producer group wrong");
return response;
}

if (msgExt.getQueueOffset() != requestHeader.getTranStateTableOffset()) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("The transaction state table offset wrong");
return response;
}

if (msgExt.getCommitLogOffset() != requestHeader.getCommitLogOffset()) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("The commit log offset wrong");
return response;
}
} else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("Find prepared transaction message failed");
return response;
}
response.setCode(ResponseCode.SUCCESS);
return response;
}

private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
msgInner.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(msgExt.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
msgInner.setWaitStoreMsgOK(false);
msgInner.setTransactionId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
msgInner.setSysFlag(msgExt.getSysFlag());
TopicFilterType topicFilterType =
(msgInner.getSysFlag() & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG ? TopicFilterType.MULTI_TAG
: TopicFilterType.SINGLE_TAG;
long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
msgInner.setTagsCode(tagsCodeValue);
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC);
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID);
return msgInner;
}

When the Broker handles an endTransaction request, it first checks whether the MessageSysFlag value is TRANSACTION_COMMIT_TYPE or TRANSACTION_ROLLBACK_TYPE:

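  • If the MessageSysFlag value is TRANSACTION_COMMIT_TYPE, the commitMessage method of TransactionalMessageService is called. Semantically commitMessage means committing the message, but what it actually does is look up the half message in the commitLog by its commitLogOffset and return it in the result. If the result is successful, checkPrepareMessage is called to validate the half message: its producerGroup, tranStateTableOffset, and commitLogOffset must match the values in the request header, and the half message must not be null. If the half message is valid, endMessageTransaction is called, which restores the topic and queueId of the half message to the real values backed up in its properties. Back in processRequest, because MessageSysFlag was set to TRANSACTION_NOT_TYPE before the half message was persisted, committing the message requires changing it to TRANSACTION_COMMIT_TYPE. sendFinalMessage is then called to persist the restored message, and CommitLogDispatcherBuildConsumeQueue.dispatch updates the ConsumeQueue of the topic; after these steps the message is visible to consumers. After sendFinalMessage completes, the deletePrepareMessage method of TransactionalMessageService is called to delete the half message. This is not a real deletion. Instead, an op message is written to a queue of the RMQ_SYS_TRANS_OP_HALF_TOPIC topic, whose queue ID is 0, the same as that of the half message. The body of the op message is the queueOffset of the half message, and its TAGS property is set to d (TransactionalMessageUtil.REMOVETAG), which marks the half message as already committed or rolled back. In addition, a hash map, opQueueMap, maps each half-message queue (messageQueue) to its opQueue.
  • If the MessageSysFlag value is TRANSACTION_ROLLBACK_TYPE, rollbackMessage is called first and the half message is obtained from the returned result. After the half message is validated, it is deleted, using the same logic as above.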
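
The "deletion" described in the commit branch can be sketched as follows. OpMessageSketch is a hypothetical class, not a RocketMQ type, and encoding the offset as a UTF-8 decimal string is an assumption about the body format.

```java
import java.nio.charset.StandardCharsets;

// Simplified stand-in for an op message; not a real RocketMQ class.
final class OpMessageSketch {
    static final String OP_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC";
    static final String REMOVE_TAG = "d";

    final String topic;
    final int queueId;
    final String tags;
    final byte[] body;

    private OpMessageSketch(String topic, int queueId, String tags, byte[] body) {
        this.topic = topic;
        this.queueId = queueId;
        this.tags = tags;
        this.body = body;
    }

    // "Deleting" a half message means appending an op message that records its queue offset.
    static OpMessageSketch markResolved(long halfQueueOffset) {
        byte[] body = String.valueOf(halfQueueOffset).getBytes(StandardCharsets.UTF_8);
        return new OpMessageSketch(OP_TOPIC, 0, REMOVE_TAG, body);
    }

    // Reads back the half-message queue offset that this op message refers to.
    long halfQueueOffset() {
        return Long.parseLong(new String(body, StandardCharsets.UTF_8));
    }
}
```

Appending a marker instead of removing the half message keeps the log append-only; the check service later uses these markers to tell which half messages are already resolved.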
  7. Transaction check mechanism
package org.apache.rocketmq.broker.transaction;
public void run() {
    log.info("Start transaction check service thread!");
    long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
    while (!this.isStopped()) {
        this.waitForRunning(checkInterval);
    }
    log.info("End transaction check service thread!");
}

protected void onWaitEnd() {
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

package org.apache.rocketmq.broker.transaction.queue;
public void check(long transactionTimeout, int transactionCheckMax,
AbstractTransactionalMessageCheckListener listener) {
try {
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
log.warn("The queue of topic is empty :" + topic);
return;
}
log.debug("Check topic={}, queues={}", topic, msgQueues);
for (MessageQueue messageQueue : msgQueues) {
long startTime = System.currentTimeMillis();
MessageQueue opQueue = getOpQueue(messageQueue);
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
if (halfOffset < 0 || opOffset < 0) {
log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
halfOffset, opOffset);
continue;
}

List<Long> doneOpOffset = new ArrayList<>();
HashMap<Long, Long> removeMap = new HashMap<>();
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
if (null == pullResult) {
log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
messageQueue, halfOffset, opOffset);
continue;
}
// single thread
int getMessageNullCount = 1;
long newOffset = halfOffset;
long i = halfOffset;
while (true) {
if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
break;
}
if (removeMap.containsKey(i)) {
log.debug("Half offset {} has been committed/rolled back", i);
Long removedOpOffset = removeMap.remove(i);
doneOpOffset.add(removedOpOffset);
} else {
GetResult getResult = getHalfMsg(messageQueue, i);
MessageExt msgExt = getResult.getMsg();
if (msgExt == null) {
if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
break;
}
if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
messageQueue, getMessageNullCount, getResult.getPullResult());
break;
} else {
log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
i, messageQueue, getMessageNullCount, getResult.getPullResult());
i = getResult.getPullResult().getNextBeginOffset();
newOffset = i;
continue;
}
}

if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
listener.resolveDiscardMsg(msgExt);
newOffset = i + 1;
i++;
continue;
}
if (msgExt.getStoreTimestamp() >= startTime) {
log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
new Date(msgExt.getStoreTimestamp()));
break;
}

long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
long checkImmunityTime = transactionTimeout;
String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
if (null != checkImmunityTimeStr) {
checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
if (valueOfCurrentMinusBorn < checkImmunityTime) {
if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
newOffset = i + 1;
i++;
continue;
}
}
} else {
if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
checkImmunityTime, new Date(msgExt.getBornTimestamp()));
break;
}
}
List<MessageExt> opMsg = pullResult.getMsgFoundList();
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
|| (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
|| (valueOfCurrentMinusBorn <= -1);

if (isNeedCheck) {
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}
listener.resolveHalfMsg(msgExt);
} else {
pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
messageQueue, pullResult);
continue;
}
}
newOffset = i + 1;
i++;
}
if (newOffset != halfOffset) {
transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
}
long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
if (newOpOffset != opOffset) {
transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
}
}
} catch (Throwable e) {
log.error("Check error", e);
}

}

private PullResult fillOpRemoveMap(HashMap<Long, Long> removeMap,
MessageQueue opQueue, long pullOffsetOfOp, long miniOffset, List<Long> doneOpOffset) {
PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32);
if (null == pullResult) {
return null;
}
if (pullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL
|| pullResult.getPullStatus() == PullStatus.NO_MATCHED_MSG) {
log.warn("The miss op offset={} in queue={} is illegal, pullResult={}", pullOffsetOfOp, opQueue,
pullResult);
transactionalMessageBridge.updateConsumeOffset(opQueue, pullResult.getNextBeginOffset());
return pullResult;
} else if (pullResult.getPullStatus() == PullStatus.NO_NEW_MSG) {
log.warn("The miss op offset={} in queue={} is NO_NEW_MSG, pullResult={}", pullOffsetOfOp, opQueue,
pullResult);
return pullResult;
}
List<MessageExt> opMsg = pullResult.getMsgFoundList();
if (opMsg == null) {
log.warn("The miss op offset={} in queue={} is empty, pullResult={}", pullOffsetOfOp, opQueue, pullResult);
return pullResult;
}
for (MessageExt opMessageExt : opMsg) {
Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
log.debug("Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}", opMessageExt.getTopic(),
opMessageExt.getTags(), opMessageExt.getQueueOffset(), queueOffset);
if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {
if (queueOffset < miniOffset) {
doneOpOffset.add(opMessageExt.getQueueOffset());
} else {
removeMap.put(queueOffset, opMessageExt.getQueueOffset());
}
} else {
log.error("Found a illegal tag in opMessageExt= {} ", opMessageExt);
}
}
log.debug("Remove map: {}", removeMap);
log.debug("Done op list: {}", doneOpOffset);
return pullResult;
}

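The core of the transaction check is implemented by the check method of TransactionalMessageServiceImpl. The run method of TransactionalMessageCheckService runs in a service thread that, every checkInterval, executes onWaitEnd, which in turn calls check. In check, the message queues (msgQueues) of the half-message topic are fetched first, and the opQueue of each message queue is obtained from it. As noted earlier, a half-message queue and its opQueue correspond one to one. If opQueueMap has no entry for the message queue, a new opQueue is created and the mapping between the two queues is inserted into opQueueMap. After halfOffset and opOffset record the consume progress of the two queues, fillOpRemoveMap is called to fill removeMap and doneOpOffset. Before filling, pullOpMsg pulls up to 32 op messages from the opQueue through transactionalMessageBridge, starting at pullOffsetOfOp. The fill logic is as follows: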

[Figure image-20210831214240963: offsets in the half-message queue and the op queue]

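For the half-message queue, min offset is the current consume progress and max offset is the offset of the last message. For the op queue, queue offset is the body of the current op message, that is, the offset of the corresponding message in the half-message queue. At most 32 op messages are handled per pass, because at most 32 are pulled at a time. The method iterates over the pulled op messages, reads the queue offset from each message body, and compares it with min offset. If queue offset < min offset, the half message has already been processed, so the op message's own offset is simply added to doneOpOffset. Otherwise an entry is inserted into removeMap with the half-message queue offset as the key and the op queue offset as the value, which means the half message for that key does not need to be checked.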
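
The classification performed by fillOpRemoveMap can be isolated in a few lines. RemoveMapSketch and OpEntry are hypothetical names; each OpEntry pairs an op message's own offset in the op queue with the half-queue offset stored in its body.

```java
import java.util.List;
import java.util.Map;

// Hypothetical reduction of fillOpRemoveMap to its core classification step.
final class RemoveMapSketch {
    static final class OpEntry {
        final long opOffset;    // offset of the op message in the op queue
        final long halfOffset;  // half-queue offset carried in the op message body

        OpEntry(long opOffset, long halfOffset) {
            this.opOffset = opOffset;
            this.halfOffset = halfOffset;
        }
    }

    static void fill(List<OpEntry> pulledOps, long minHalfOffset,
                     Map<Long, Long> removeMap, List<Long> doneOpOffset) {
        for (OpEntry op : pulledOps) {
            if (op.halfOffset < minHalfOffset) {
                // The half message lies before the consume progress: already handled.
                doneOpOffset.add(op.opOffset);
            } else {
                // The half message is resolved and must be skipped by the check loop.
                removeMap.put(op.halfOffset, op.opOffset);
            }
        }
    }
}
```

With a min offset of 5 and op entries pointing at half offsets 3, 7, and 5, the first entry goes to doneOpOffset and the other two become keys of removeMap.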

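After removeMap is filled, the method enters a loop that walks the half messages starting at halfOffset. If removeMap contains the offset as a key, the corresponding half message has already been committed or rolled back and needs no check, so the loop moves on to the half message at offset + 1. If the offset is not in removeMap, needDiscard determines whether the message has exceeded the maximum number of checks (15); if so, it is not checked again. needSkip also determines whether more than 72 hours have passed since the half message was stored, because a half message older than that is discarded. Next comes checkImmunityTimeStr, the check immunity time: no check is performed within this period after the message is stored. If the message is still within this period and this is its first check, it is not checked. If the producer did not set an immunity time for the first check and the message is still within the default immunity time (the transaction timeout), the check is skipped for the current half message and for all half messages after it, because messages in the same consume queue are ordered. If all the preceding conditions are satisfied, putBackHalfMsgQueue stores the half message again: it is not known whether this check will determine the transaction state, so the half message is written back for the next check. resolveHalfMsg is then called to perform the check itself. After each pass, the consume offsets of the half-message queue and the op queue are updated. The while loop is time-limited and exits once the limit (MAX_PROCESS_TIME_LIMIT) is exceeded.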
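
The per-message decision in this loop can be summarized by a simplified sketch. It is not the actual control flow of check: CheckDecisionSketch, its Decision enum, and the decide method are hypothetical, the 72-hour retention is taken from the text, and treating "reached checkMax" as the discard boundary is an assumption.

```java
// Hypothetical summary of what the check loop decides for one half message.
final class CheckDecisionSketch {
    enum Decision { ALREADY_RESOLVED, DISCARD, WAIT, CHECK }

    // 72 hours, as stated in the text.
    static final long RETENTION_MILLIS = 72L * 60 * 60 * 1000;

    static Decision decide(boolean inRemoveMap, int checkTimes, int checkMax,
                           long ageMillis, long immunityMillis) {
        if (inRemoveMap) {
            // An op message exists: the half message was committed or rolled back.
            return Decision.ALREADY_RESOLVED;
        }
        if (checkTimes >= checkMax || ageMillis > RETENTION_MILLIS) {
            // Checked too often or too old: give up on this half message.
            return Decision.DISCARD;
        }
        if (ageMillis < immunityMillis) {
            // Still inside the immunity time: try again in a later pass.
            return Decision.WAIT;
        }
        // Old enough and unresolved: ask the producer for the transaction state.
        return Decision.CHECK;
    }
}
```

For instance, a message that is 10 seconds old with a 6-second immunity time and one earlier check yields CHECK, while the same message after 15 checks (with checkMax = 15) yields DISCARD.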

Preface

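Back when I took university physics, the course project required everyone to give a report on a frontier topic in physics, and my group chose quantum computation. In that report I was mainly responsible for the basic quantum mechanics that quantum computation relies on. Most of the content is summarized from Dirac's The Principles of Quantum Mechanics and from the bible of quantum computing, Quantum Computation and Quantum Information. That is how this report came about.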
