Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ public void decodeMsgListTest_messageNotNull() throws Exception {


getMessageResult.addMessage(result);
getMessageResult.getMessageQueueOffset().add(0L);
getMessageResult.addQueueOffset(0L);
List<MessageExt> list = escapeBridge.decodeMsgList(getMessageResult, false); // skip deCompressBody test
Assert.assertEquals(1, list.size());
Assert.assertTrue(Arrays.equals(msg.getBody(), list.get(0).getBody()));
Expand Down Expand Up @@ -416,7 +416,7 @@ private GetMessageResult mockGetMessageResult(int count, String topic, byte[] bo
SelectMappedBufferResult bufferResult = new SelectMappedBufferResult(0, byteBuffer, body.length, new DefaultMappedFile());

result.addMessage(bufferResult);
result.getMessageQueueOffset().add(i + 0L);
result.addQueueOffset(i + 0L);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public void addGetMessageResultTest() {
clientHost, System.currentTimeMillis(), 20000, groupId, false, ConsumeInitMode.MIN, attemptId);
GetMessageResult result = new GetMessageResult();
result.setStatus(GetMessageStatus.FOUND);
result.getMessageQueueOffset().add(100L);
result.addQueueOffset(100L);
consumerService.handleGetMessageResult(
context, result, topicId, queueId, PopConsumerRecord.RetryType.NORMAL_TOPIC, 100);
Assert.assertEquals(1, context.getGetMessageResultList().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,8 @@ public void testHandleGetMessageResult_nullResult() {
public void testHandleGetMessageResult_found() {
int msgCount = 2;
GetMessageResult getResult = mockGetMessageResult(GetMessageStatus.FOUND, msgCount, 100L);
getResult.getMessageQueueOffset().add(0L);
getResult.getMessageQueueOffset().add(1L);
getResult.addQueueOffset(0L);
getResult.addQueueOffset(1L);

doNothing().when(popLiteMessageProcessor).recordPopLiteMetrics(any(), anyString(), anyString());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class ConsumeQueue implements ConsumeQueueInterface {
private final int queueId;
private final ByteBuffer byteBufferIndex;

private final String topicQueueKey;
private final String storePath;
private final int mappedFileSize;
private long maxPhysicOffset = -1;
Expand All @@ -92,6 +93,7 @@ public ConsumeQueue(final String topic, final int queueId, final String storePat

this.topic = topic;
this.queueId = queueId;
this.topicQueueKey = topic + "-" + queueId;

String queueDir = this.storePath
+ File.separator + topic
Expand Down Expand Up @@ -784,15 +786,13 @@ private void doDispatchLmqQueue(DispatchRequest request, int maxRetries, String

@Override
public void assignQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg) {
String topicQueueKey = getTopic() + "-" + getQueueId();
long queueOffset = queueOffsetOperator.getQueueOffset(topicQueueKey);
msg.setQueueOffset(queueOffset);
}

@Override
public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg,
short messageNum) {
String topicQueueKey = getTopic() + "-" + getQueueId();
queueOffsetOperator.increaseQueueOffset(topicQueueKey, messageNum);
}

Expand Down Expand Up @@ -852,7 +852,8 @@ private boolean putMessagePositionInfo(final long offset, final int size, final
this.setMaxPhysicOffset(offset + size);
boolean appendResult;
if (messageStore.getMessageStoreConfig().isPutConsumeQueueDataByFileChannel()) {
appendResult = mappedFile.appendMessageUsingFileChannel(this.byteBufferIndex.array());
this.byteBufferIndex.flip();
appendResult = mappedFile.appendMessageUsingFileChannel(this.byteBufferIndex);
Comment thread
wang-jiahua marked this conversation as resolved.
} else {
appendResult = mappedFile.appendMessage(this.byteBufferIndex.array());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -669,20 +669,17 @@ public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner

long beginTime = this.getSystemClock().now();
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);

putResultFuture.thenAccept(result -> {
putResultFuture.whenComplete((result, ex) -> {
long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500) {
LOGGER.warn("DefaultMessageStore#putMessage: CommitLog#putMessage cost {}ms, topic={}, bodyLength={}",
elapsedTime, msg.getTopic(), msg.getBody().length);
}
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().add(1);
}
});

return putResultFuture;
}

Expand All @@ -698,19 +695,16 @@ public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch mess

long beginTime = this.getSystemClock().now();
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessages(messageExtBatch);

putResultFuture.thenAccept(result -> {
long eclipseTime = this.getSystemClock().now() - beginTime;
if (eclipseTime > 500) {
LOGGER.warn("not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, messageExtBatch.getBody().length);
putResultFuture.whenComplete((result, ex) -> {
long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500) {
LOGGER.warn("not in lock eclipse time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length);
}
this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);

this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().add(1);
}
});

return putResultFuture;
}

Expand Down Expand Up @@ -890,7 +884,7 @@ public GetMessageResult getMessage(final String group, final String topic, final
long minOffset = 0;
long maxOffset = 0;

GetMessageResult getResult = new GetMessageResult();
GetMessageResult getResult = new GetMessageResult(maxMsgNums);
int filterMessageCount = 0;

final long maxOffsetPy = this.commitLog.getMaxOffset();
Expand Down
43 changes: 29 additions & 14 deletions store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,20 @@
import org.apache.rocketmq.common.message.MessageConst;

public class DispatchRequest {
private final String topic;
private final int queueId;
private final long commitLogOffset;
private String topic;
private int queueId;
private long commitLogOffset;
private int msgSize;
private final long tagsCode;
private final long storeTimestamp;
private final long consumeQueueOffset;
private final String keys;
private final boolean success;
private final String uniqKey;

private final int sysFlag;
private final long preparedTransactionOffset;
private final Map<String, String> propertiesMap;
private long tagsCode;
private long storeTimestamp;
private long consumeQueueOffset;
private String keys;
private boolean success;
private String uniqKey;

private int sysFlag;
private long preparedTransactionOffset;
private Map<String, String> propertiesMap;
private byte[] bitMap;

private int bufferSize = -1;//the buffer size maybe larger than the msg size if the message is wrapped by something
Expand All @@ -48,6 +48,9 @@ public class DispatchRequest {

private String offsetId;

DispatchRequest() {
}

public DispatchRequest(
final String topic,
final int queueId,
Expand All @@ -62,6 +65,14 @@ public DispatchRequest(
final long preparedTransactionOffset,
final Map<String, String> propertiesMap
) {
this.reset(topic, queueId, commitLogOffset, msgSize, tagsCode, storeTimestamp,
consumeQueueOffset, keys, uniqKey, sysFlag, preparedTransactionOffset, propertiesMap);
}

void reset(String topic, int queueId, long commitLogOffset, int msgSize,
long tagsCode, long storeTimestamp, long consumeQueueOffset,
String keys, String uniqKey, int sysFlag, long preparedTransactionOffset,
Map<String, String> propertiesMap) {
this.topic = topic;
this.queueId = queueId;
this.commitLogOffset = commitLogOffset;
Expand All @@ -72,11 +83,15 @@ public DispatchRequest(
this.msgBaseOffset = consumeQueueOffset;
this.keys = keys;
this.uniqKey = uniqKey;

this.sysFlag = sysFlag;
this.preparedTransactionOffset = preparedTransactionOffset;
this.success = true;
this.propertiesMap = propertiesMap;
this.bitMap = null;
this.bufferSize = -1;
this.batchSize = 1;
this.nextReputFromOffset = -1;
this.offsetId = null;
}

public DispatchRequest(String topic, int queueId, long consumeQueueOffset, long commitLogOffset, int size, long tagsCode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.rocketmq.store;

import java.nio.ByteBuffer;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -25,7 +26,8 @@ public class GetMessageResult {

private final List<SelectMappedBufferResult> messageMapedList;
private final List<ByteBuffer> messageBufferList;
private final List<Long> messageQueueOffset;
private long[] messageQueueOffset;
private int queueOffsetSize;

private GetMessageStatus status;
private long nextBeginOffset;
Expand All @@ -47,29 +49,33 @@ public class GetMessageResult {

public static final GetMessageResult NO_MATCH_LOGIC_QUEUE =
new GetMessageResult(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE, 0, 0, 0, Collections.emptyList(),
Collections.emptyList(), Collections.emptyList());
Collections.emptyList(), new long[0], 0);

public GetMessageResult() {
messageMapedList = new ArrayList<>(100);
messageBufferList = new ArrayList<>(100);
messageQueueOffset = new ArrayList<>(100);
messageQueueOffset = new long[100];
queueOffsetSize = 0;
}

public GetMessageResult(int resultSize) {
messageMapedList = new ArrayList<>(resultSize);
messageBufferList = new ArrayList<>(resultSize);
messageQueueOffset = new ArrayList<>(resultSize);
int initialCapacity = Math.min(Math.max(resultSize, 1), 256);
messageMapedList = new ArrayList<>(initialCapacity);
messageBufferList = new ArrayList<>(initialCapacity);
messageQueueOffset = new long[initialCapacity];
queueOffsetSize = 0;
}

private GetMessageResult(GetMessageStatus status, long nextBeginOffset, long minOffset, long maxOffset,
List<SelectMappedBufferResult> messageMapedList, List<ByteBuffer> messageBufferList, List<Long> messageQueueOffset) {
List<SelectMappedBufferResult> messageMapedList, List<ByteBuffer> messageBufferList, long[] messageQueueOffset, int queueOffsetSize) {
this.status = status;
this.nextBeginOffset = nextBeginOffset;
this.minOffset = minOffset;
this.maxOffset = maxOffset;
this.messageMapedList = messageMapedList;
this.messageBufferList = messageBufferList;
this.messageQueueOffset = messageQueueOffset;
this.queueOffsetSize = queueOffsetSize;
}

public GetMessageStatus getStatus() {
Expand Down Expand Up @@ -128,7 +134,12 @@ public void addMessage(final SelectMappedBufferResult mapedBuffer, final long qu
this.msgCount4Commercial += (int) Math.ceil(
mapedBuffer.getSize() / (double)commercialSizePerMsg);
this.messageCount++;
this.messageQueueOffset.add(queueOffset);
if (queueOffsetSize == messageQueueOffset.length) {
long[] newArr = new long[queueOffsetSize + (queueOffsetSize >> 1)];
System.arraycopy(messageQueueOffset, 0, newArr, 0, queueOffsetSize);
messageQueueOffset = newArr;
}
messageQueueOffset[queueOffsetSize++] = queueOffset;
Comment thread
wang-jiahua marked this conversation as resolved.
}


Expand Down Expand Up @@ -167,8 +178,25 @@ public void setMsgCount4Commercial(int msgCount4Commercial) {
this.msgCount4Commercial = msgCount4Commercial;
}

public void addQueueOffset(long offset) {
if (queueOffsetSize == messageQueueOffset.length) {
long[] newArr = new long[queueOffsetSize + (queueOffsetSize >> 1) + 1];
System.arraycopy(messageQueueOffset, 0, newArr, 0, queueOffsetSize);
messageQueueOffset = newArr;
}
messageQueueOffset[queueOffsetSize++] = offset;
}

public List<Long> getMessageQueueOffset() {
return messageQueueOffset;
final long[] arr = this.messageQueueOffset;
final int size = this.queueOffsetSize;
return new AbstractList<Long>() {
@Override public Long get(int index) {
if (index < 0 || index >= size) throw new IndexOutOfBoundsException();
return arr[index];
}
@Override public int size() { return size; }
};
}
Comment thread
wang-jiahua marked this conversation as resolved.

public long getColdDataSum() {
Expand Down
Loading
Loading