diff --git a/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java b/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java index 27fc37dbec8..1740e290d2d 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java @@ -347,7 +347,7 @@ public void decodeMsgListTest_messageNotNull() throws Exception { getMessageResult.addMessage(result); - getMessageResult.getMessageQueueOffset().add(0L); + getMessageResult.addQueueOffset(0L); List list = escapeBridge.decodeMsgList(getMessageResult, false); // skip deCompressBody test Assert.assertEquals(1, list.size()); Assert.assertTrue(Arrays.equals(msg.getBody(), list.get(0).getBody())); @@ -416,7 +416,7 @@ private GetMessageResult mockGetMessageResult(int count, String topic, byte[] bo SelectMappedBufferResult bufferResult = new SelectMappedBufferResult(0, byteBuffer, body.length, new DefaultMappedFile()); result.addMessage(bufferResult); - result.getMessageQueueOffset().add(i + 0L); + result.addQueueOffset(i + 0L); } return result; } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java index 089d2c1f22b..2af858b5045 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java @@ -198,7 +198,7 @@ public void addGetMessageResultTest() { clientHost, System.currentTimeMillis(), 20000, groupId, false, ConsumeInitMode.MIN, attemptId); GetMessageResult result = new GetMessageResult(); result.setStatus(GetMessageStatus.FOUND); - result.getMessageQueueOffset().add(100L); + result.addQueueOffset(100L); consumerService.handleGetMessageResult( context, result, topicId, queueId, PopConsumerRecord.RetryType.NORMAL_TOPIC, 100); Assert.assertEquals(1, context.getGetMessageResultList().size()); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java index fa9f2dbfaff..514658ac8be 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java @@ -339,8 +339,8 @@ public void testHandleGetMessageResult_nullResult() { public void testHandleGetMessageResult_found() { int msgCount = 2; GetMessageResult getResult = mockGetMessageResult(GetMessageStatus.FOUND, msgCount, 100L); - getResult.getMessageQueueOffset().add(0L); - getResult.getMessageQueueOffset().add(1L); + getResult.addQueueOffset(0L); + getResult.addQueueOffset(1L); doNothing().when(popLiteMessageProcessor).recordPopLiteMetrics(any(), anyString(), anyString()); diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index d1a36c9e136..b271db57e62 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -68,6 +68,7 @@ public class ConsumeQueue implements ConsumeQueueInterface { private final int queueId; private final ByteBuffer byteBufferIndex; + private final String topicQueueKey; private final String storePath; private final int mappedFileSize; private long maxPhysicOffset = -1; @@ -92,6 +93,7 @@ public ConsumeQueue(final String topic, final int queueId, final String storePat this.topic = topic; this.queueId = queueId; + this.topicQueueKey = topic + "-" + queueId; String queueDir = this.storePath + File.separator + topic @@ -784,7 +786,6 @@ private void doDispatchLmqQueue(DispatchRequest request, int maxRetries, String @Override public void assignQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg) { - String topicQueueKey = getTopic() + "-" + getQueueId(); long queueOffset = queueOffsetOperator.getQueueOffset(topicQueueKey); msg.setQueueOffset(queueOffset); } @@ -792,7 +793,6 @@ public void assignQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageEx @Override public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg, short messageNum) { - String topicQueueKey = getTopic() + "-" + getQueueId(); queueOffsetOperator.increaseQueueOffset(topicQueueKey, messageNum); } @@ -852,7 +852,8 @@ private boolean putMessagePositionInfo(final long offset, final int size, final this.setMaxPhysicOffset(offset + size); boolean appendResult; if (messageStore.getMessageStoreConfig().isPutConsumeQueueDataByFileChannel()) { - appendResult = mappedFile.appendMessageUsingFileChannel(this.byteBufferIndex.array()); + this.byteBufferIndex.flip(); + appendResult = mappedFile.appendMessageUsingFileChannel(this.byteBufferIndex); } else { appendResult = mappedFile.appendMessage(this.byteBufferIndex.array()); } diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index aee767dae2f..1548d6d3a5d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -669,20 +669,17 @@ public CompletableFuture asyncPutMessage(MessageExtBrokerInner long beginTime = this.getSystemClock().now(); CompletableFuture putResultFuture = this.commitLog.asyncPutMessage(msg); - - putResultFuture.thenAccept(result -> { + putResultFuture.whenComplete((result, ex) -> { long elapsedTime = this.getSystemClock().now() - beginTime; if (elapsedTime > 500) { LOGGER.warn("DefaultMessageStore#putMessage: CommitLog#putMessage cost {}ms, topic={}, bodyLength={}", elapsedTime, msg.getTopic(), msg.getBody().length); } this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); - if (null == result || !result.isOk()) { this.storeStatsService.getPutMessageFailedTimes().add(1); } }); - return putResultFuture; } @@ -698,19 +695,16 @@ public CompletableFuture asyncPutMessages(MessageExtBatch mess long beginTime = this.getSystemClock().now(); CompletableFuture putResultFuture = this.commitLog.asyncPutMessages(messageExtBatch); - - putResultFuture.thenAccept(result -> { - long eclipseTime = this.getSystemClock().now() - beginTime; - if (eclipseTime > 500) { - LOGGER.warn("not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, messageExtBatch.getBody().length); + putResultFuture.whenComplete((result, ex) -> { + long elapsedTime = this.getSystemClock().now() - beginTime; + if (elapsedTime > 500) { + LOGGER.warn("not in lock eclipse time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length); } - this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime); - + this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); if (null == result || !result.isOk()) { this.storeStatsService.getPutMessageFailedTimes().add(1); } }); - return putResultFuture; } @@ -890,7 +884,7 @@ public GetMessageResult getMessage(final String group, final String topic, final long minOffset = 0; long maxOffset = 0; - GetMessageResult getResult = new GetMessageResult(); + GetMessageResult getResult = new GetMessageResult(maxMsgNums); int filterMessageCount = 0; final long maxOffsetPy = this.commitLog.getMaxOffset(); diff --git a/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java b/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java index 654760b88c8..104401540a5 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java +++ b/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java @@ -22,20 +22,20 @@ import org.apache.rocketmq.common.message.MessageConst; public class DispatchRequest { - private final String topic; - private final int queueId; - private final long commitLogOffset; + private String topic; + private int queueId; + private long commitLogOffset; private int msgSize; - private final long tagsCode; - private final long storeTimestamp; - private final long consumeQueueOffset; - private final String keys; - private final boolean success; - private final String uniqKey; - - private final int sysFlag; - private final long preparedTransactionOffset; - private final Map propertiesMap; + private long tagsCode; + private long storeTimestamp; + private long consumeQueueOffset; + private String keys; + private boolean success; + private String uniqKey; + + private int sysFlag; + private long preparedTransactionOffset; + private Map propertiesMap; private byte[] bitMap; private int bufferSize = -1;//the buffer size maybe larger than the msg size if the message is wrapped by something @@ -48,6 +48,9 @@ public class DispatchRequest { private String offsetId; + DispatchRequest() { + } + public DispatchRequest( final String topic, final int queueId, @@ -62,6 +65,14 @@ public DispatchRequest( final long preparedTransactionOffset, final Map propertiesMap ) { + this.reset(topic, queueId, commitLogOffset, msgSize, tagsCode, storeTimestamp, + consumeQueueOffset, keys, uniqKey, sysFlag, preparedTransactionOffset, propertiesMap); + } + + void reset(String topic, int queueId, long commitLogOffset, int msgSize, + long tagsCode, long storeTimestamp, long consumeQueueOffset, + String keys, String uniqKey, int sysFlag, long preparedTransactionOffset, + Map propertiesMap) { this.topic = topic; this.queueId = queueId; this.commitLogOffset = commitLogOffset; @@ -72,11 +83,15 @@ public DispatchRequest( this.msgBaseOffset = consumeQueueOffset; this.keys = keys; this.uniqKey = uniqKey; - this.sysFlag = sysFlag; this.preparedTransactionOffset = preparedTransactionOffset; this.success = true; this.propertiesMap = propertiesMap; + this.bitMap = null; + this.bufferSize = -1; + this.batchSize = 1; + this.nextReputFromOffset = -1; + this.offsetId = null; } public DispatchRequest(String topic, int queueId, long consumeQueueOffset, long commitLogOffset, int size, long tagsCode) { diff --git a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java index 6f322a19e19..60a21e82618 100644 --- a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java +++ b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.store; import java.nio.ByteBuffer; +import java.util.AbstractList; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -25,7 +26,8 @@ public class GetMessageResult { private final List messageMapedList; private final List messageBufferList; - private final List messageQueueOffset; + private long[] messageQueueOffset; + private int queueOffsetSize; private GetMessageStatus status; private long nextBeginOffset; @@ -47,22 +49,25 @@ public class GetMessageResult { public static final GetMessageResult NO_MATCH_LOGIC_QUEUE = new GetMessageResult(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE, 0, 0, 0, Collections.emptyList(), - Collections.emptyList(), Collections.emptyList()); + Collections.emptyList(), new long[0], 0); public GetMessageResult() { messageMapedList = new ArrayList<>(100); messageBufferList = new ArrayList<>(100); - messageQueueOffset = new ArrayList<>(100); + messageQueueOffset = new long[100]; + queueOffsetSize = 0; } public GetMessageResult(int resultSize) { - messageMapedList = new ArrayList<>(resultSize); - messageBufferList = new ArrayList<>(resultSize); - messageQueueOffset = new ArrayList<>(resultSize); + int initialCapacity = Math.min(Math.max(resultSize, 1), 256); + messageMapedList = new ArrayList<>(initialCapacity); + messageBufferList = new ArrayList<>(initialCapacity); + messageQueueOffset = new long[initialCapacity]; + queueOffsetSize = 0; } private GetMessageResult(GetMessageStatus status, long nextBeginOffset, long minOffset, long maxOffset, - List messageMapedList, List messageBufferList, List messageQueueOffset) { + List messageMapedList, List messageBufferList, long[] messageQueueOffset, int queueOffsetSize) { this.status = status; this.nextBeginOffset = nextBeginOffset; this.minOffset = minOffset; @@ -70,6 +75,7 @@ private GetMessageResult(GetMessageStatus status, long nextBeginOffset, long min this.messageMapedList = messageMapedList; this.messageBufferList = messageBufferList; this.messageQueueOffset = messageQueueOffset; + this.queueOffsetSize = queueOffsetSize; } public GetMessageStatus getStatus() { @@ -128,7 +134,12 @@ public void addMessage(final SelectMappedBufferResult mapedBuffer, final long qu this.msgCount4Commercial += (int) Math.ceil( mapedBuffer.getSize() / (double)commercialSizePerMsg); this.messageCount++; - this.messageQueueOffset.add(queueOffset); + if (queueOffsetSize == messageQueueOffset.length) { + long[] newArr = new long[queueOffsetSize + (queueOffsetSize >> 1)]; + System.arraycopy(messageQueueOffset, 0, newArr, 0, queueOffsetSize); + messageQueueOffset = newArr; + } + messageQueueOffset[queueOffsetSize++] = queueOffset; } @@ -167,8 +178,25 @@ public void setMsgCount4Commercial(int msgCount4Commercial) { this.msgCount4Commercial = msgCount4Commercial; } + public void addQueueOffset(long offset) { + if (queueOffsetSize == messageQueueOffset.length) { + long[] newArr = new long[queueOffsetSize + (queueOffsetSize >> 1) + 1]; + System.arraycopy(messageQueueOffset, 0, newArr, 0, queueOffsetSize); + messageQueueOffset = newArr; + } + messageQueueOffset[queueOffsetSize++] = offset; + } + public List getMessageQueueOffset() { - return messageQueueOffset; + final long[] arr = this.messageQueueOffset; + final int size = this.queueOffsetSize; + return new AbstractList() { + @Override public Long get(int index) { + if (index < 0 || index >= size) throw new IndexOutOfBoundsException(); + return arr[index]; + } + @Override public int size() { return size; } + }; } public long getColdDataSum() { diff --git a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java index 7a7b5f84368..4e5a4095013 100644 --- a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java @@ -99,6 +99,7 @@ public class DefaultMappedFile extends AbstractMappedFile { protected volatile long storeTimestamp = 0; protected boolean firstCreateInQueue = false; private long lastFlushTime = -1L; + private ByteBuffer cachedAppendSlice; protected MappedByteBuffer mappedByteBufferWaitToClean = null; protected long swapMapTime = 0L; @@ -305,8 +306,13 @@ public AppendMessageResult appendMessage(final ByteBuffer byteBufferMsg, final C byteBuffer.position(0).limit(byteBuffer.capacity()); fileFromOffset += currentPos; } else { - byteBuffer = appendMessageBuffer().slice(); + byteBuffer = this.cachedAppendSlice; + if (byteBuffer == null) { + byteBuffer = appendMessageBuffer().slice(); + this.cachedAppendSlice = byteBuffer; + } byteBuffer.position(currentPos); + byteBuffer.limit(byteBuffer.capacity()); } try { @@ -365,8 +371,13 @@ public AppendMessageResult appendMessagesInner(final MessageExt messageExt, fina byteBuffer.position(0).limit(byteBuffer.capacity()); fileFromOffset += currentPos; } else { - byteBuffer = appendMessageBuffer().slice(); + byteBuffer = this.cachedAppendSlice; + if (byteBuffer == null) { + byteBuffer = appendMessageBuffer().slice(); + this.cachedAppendSlice = byteBuffer; + } byteBuffer.position(currentPos); + byteBuffer.limit(byteBuffer.capacity()); } AppendMessageResult result; @@ -519,6 +530,26 @@ public boolean appendMessageUsingFileChannel(byte[] data) { return false; } + @Override + public boolean appendMessageUsingFileChannel(ByteBuffer data) { + int currentPos = WROTE_POSITION_UPDATER.get(this); + int len = data.remaining(); + + if ((currentPos + len) <= this.fileSize) { + try { + this.fileChannel.position(currentPos); + this.fileChannel.write(data); + WROTE_POSITION_UPDATER.addAndGet(this, len); + return true; + } catch (Throwable e) { + log.error("Error occurred when append message to mappedFile.", e); + return false; + } + } + + return false; + } + /** * @return The current flushed position */ @@ -670,10 +701,10 @@ public SelectMappedBufferResult selectMappedBuffer(int pos, int size) { if (this.hold()) { this.mappedByteBufferAccessCountSinceLastSwap++; - ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); + ByteBuffer byteBuffer = this.mappedByteBuffer.duplicate(); byteBuffer.position(pos); + byteBuffer.limit(pos + size); ByteBuffer byteBufferNew = byteBuffer.slice(); - byteBufferNew.limit(size); return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this); } else { log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: " @@ -693,11 +724,11 @@ public SelectMappedBufferResult selectMappedBuffer(int pos) { if (pos < readPosition && pos >= 0) { if (this.hold()) { this.mappedByteBufferAccessCountSinceLastSwap++; - ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); - byteBuffer.position(pos); int size = readPosition - pos; + ByteBuffer byteBuffer = this.mappedByteBuffer.duplicate(); + byteBuffer.position(pos); + byteBuffer.limit(pos + size); ByteBuffer byteBufferNew = byteBuffer.slice(); - byteBufferNew.limit(size); return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this); } } diff --git a/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java index d4153ec91f4..11b6a9ff7f6 100644 --- a/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java @@ -119,6 +119,8 @@ public interface MappedFile { */ boolean appendMessageUsingFileChannel(byte[] data); + boolean appendMessageUsingFileChannel(java.nio.ByteBuffer data); + /** * Appends a raw message data represents by a byte array to the current {@code MappedFile}. *