Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,18 @@ public AppendMessageResult(AppendMessageStatus status, long wroteOffset, int wro
this.msgNum = msgNum;
}

public AppendMessageResult(AppendMessageStatus status, long wroteOffset, int wroteBytes, String msgId,
long storeTimestamp, long logicsOffset, long pagecacheRT, int msgNum) {
this.status = status;
this.wroteOffset = wroteOffset;
this.wroteBytes = wroteBytes;
this.msgId = msgId;
this.storeTimestamp = storeTimestamp;
this.logicsOffset = logicsOffset;
this.pagecacheRT = pagecacheRT;
this.msgNum = msgNum;
}

public long getPagecacheRT() {
return pagecacheRT;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class IndexService implements CommitLogDispatchStore {
private final String storePath;
private final ArrayList<IndexFile> indexFileList = new ArrayList<>();
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final StringBuilder reusableKeyBuilder = new StringBuilder(128);
Comment thread
wang-jiahua marked this conversation as resolved.

public IndexService(final DefaultMessageStore store) {
this.defaultMessageStore = store;
Expand Down Expand Up @@ -215,10 +216,12 @@ public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long
}

private String buildKey(final String topic, final String key) {
return topic + "#" + key;
reusableKeyBuilder.setLength(0);
return reusableKeyBuilder.append(topic).append('#').append(key).toString();
}
private String buildKey(final String topic, final String key, final String indexType) {
return topic + "#" + indexType + "#" + key;
reusableKeyBuilder.setLength(0);
return reusableKeyBuilder.append(topic).append('#').append(indexType).append('#').append(key).toString();
Comment thread
wang-jiahua marked this conversation as resolved.
}

public void buildIndex(DispatchRequest req) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
Expand All @@ -35,38 +36,42 @@
public class QueueOffsetOperator {
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

private ConcurrentMap<String, Long> topicQueueTable = new ConcurrentHashMap<>(1024);
private ConcurrentMap<String, Long> batchTopicQueueTable = new ConcurrentHashMap<>(1024);
private ConcurrentMap<String, AtomicLong> topicQueueTable = new ConcurrentHashMap<>(1024);
private ConcurrentMap<String, AtomicLong> batchTopicQueueTable = new ConcurrentHashMap<>(1024);

/**
* {TOPIC}-{QUEUE_ID} --> NEXT Consume Queue Offset
*/
private ConcurrentMap<String/* topic-queue-id */, Long/* offset */> lmqTopicQueueTable = new ConcurrentHashMap<>(1024);

public long getQueueOffset(String topicQueueKey) {
return ConcurrentHashMapUtils.computeIfAbsent(this.topicQueueTable, topicQueueKey, k -> 0L);
AtomicLong counter = ConcurrentHashMapUtils.computeIfAbsent(this.topicQueueTable, topicQueueKey, k -> new AtomicLong(0));
return counter.get();
}

public Long getTopicQueueNextOffset(String topicQueueKey) {
return this.topicQueueTable.get(topicQueueKey);
AtomicLong counter = this.topicQueueTable.get(topicQueueKey);
return counter == null ? null : counter.get();
}

public void increaseQueueOffset(String topicQueueKey, short messageNum) {
Long queueOffset = ConcurrentHashMapUtils.computeIfAbsent(this.topicQueueTable, topicQueueKey, k -> 0L);
topicQueueTable.put(topicQueueKey, queueOffset + messageNum);
AtomicLong counter = ConcurrentHashMapUtils.computeIfAbsent(this.topicQueueTable, topicQueueKey, k -> new AtomicLong(0));
counter.addAndGet(messageNum);
}

public void updateQueueOffset(String topicQueueKey, long offset) {
this.topicQueueTable.put(topicQueueKey, offset);
AtomicLong counter = ConcurrentHashMapUtils.computeIfAbsent(this.topicQueueTable, topicQueueKey, k -> new AtomicLong(0));
counter.set(offset);
}

public long getBatchQueueOffset(String topicQueueKey) {
return ConcurrentHashMapUtils.computeIfAbsent(this.batchTopicQueueTable, topicQueueKey, k -> 0L);
AtomicLong counter = ConcurrentHashMapUtils.computeIfAbsent(this.batchTopicQueueTable, topicQueueKey, k -> new AtomicLong(0));
return counter.get();
}

public void increaseBatchQueueOffset(String topicQueueKey, short messageNum) {
Long batchQueueOffset = ConcurrentHashMapUtils.computeIfAbsent(this.batchTopicQueueTable, topicQueueKey, k -> 0L);
this.batchTopicQueueTable.put(topicQueueKey, batchQueueOffset + messageNum);
AtomicLong counter = ConcurrentHashMapUtils.computeIfAbsent(this.batchTopicQueueTable, topicQueueKey, k -> new AtomicLong(0));
counter.addAndGet(messageNum);
}

public long getLmqOffset(String topic, int queueId, OffsetInitializer callback) throws ConsumeQueueException {
Expand Down Expand Up @@ -94,8 +99,8 @@ public void increaseLmqOffset(String topic, int queueId, short delta) throws Con
}

public long currentQueueOffset(String topicQueueKey) {
Long currentQueueOffset = this.topicQueueTable.get(topicQueueKey);
return currentQueueOffset == null ? 0L : currentQueueOffset;
AtomicLong counter = this.topicQueueTable.get(topicQueueKey);
return counter == null ? 0L : counter.get();
}

public synchronized void remove(String topic, Integer queueId) {
Expand All @@ -109,7 +114,11 @@ public synchronized void remove(String topic, Integer queueId) {
}

public void setTopicQueueTable(ConcurrentMap<String, Long> topicQueueTable) {
this.topicQueueTable = topicQueueTable;
ConcurrentMap<String, AtomicLong> table = new ConcurrentHashMap<>(topicQueueTable.size() * 2);
for (Map.Entry<String, Long> entry : topicQueueTable.entrySet()) {
table.put(entry.getKey(), new AtomicLong(entry.getValue()));
}
this.topicQueueTable = table;
}

public void setLmqTopicQueueTable(ConcurrentMap<String, Long> lmqTopicQueueTable) {
Expand All @@ -123,10 +132,18 @@ public void setLmqTopicQueueTable(ConcurrentMap<String, Long> lmqTopicQueueTable
}

public ConcurrentMap<String, Long> getTopicQueueTable() {
return topicQueueTable;
ConcurrentMap<String, Long> snapshot = new ConcurrentHashMap<>(this.topicQueueTable.size() * 2);
for (Map.Entry<String, AtomicLong> entry : this.topicQueueTable.entrySet()) {
snapshot.put(entry.getKey(), entry.getValue().get());
}
return snapshot;
}

public void setBatchTopicQueueTable(ConcurrentMap<String, Long> batchTopicQueueTable) {
this.batchTopicQueueTable = batchTopicQueueTable;
ConcurrentMap<String, AtomicLong> table = new ConcurrentHashMap<>(batchTopicQueueTable.size() * 2);
for (Map.Entry<String, Long> entry : batchTopicQueueTable.entrySet()) {
table.put(entry.getKey(), new AtomicLong(entry.getValue()));
}
this.batchTopicQueueTable = table;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ protected ByteBuffer initialValue() {
};
private final int wheelLength;

private volatile boolean dirty;
Comment thread
wang-jiahua marked this conversation as resolved.

private long snapOffset;

public TimerWheel(String fileName, int slotsTotal, int precisionMs) throws IOException {
Expand Down Expand Up @@ -128,17 +130,25 @@ public void flush() {
if (mappedByteBuffer == null) {
return;
}
if (!dirty) {
return;
}
ByteBuffer bf = localBuffer.get();
bf.position(0);
bf.limit(wheelLength);
mappedByteBuffer.position(0);
mappedByteBuffer.limit(wheelLength);
for (int i = 0; i < wheelLength; i++) {
if (bf.get(i) != mappedByteBuffer.get(i)) {
mappedByteBuffer.put(i, bf.get(i));
int longAligned = wheelLength & ~7;
Comment thread
wang-jiahua marked this conversation as resolved.
for (int i = 0; i < longAligned; i += 8) {
long local = bf.getLong(i);
if (local != mappedByteBuffer.getLong(i)) {
mappedByteBuffer.putLong(i, local);
}
}
for (int i = longAligned; i < wheelLength; i++) {
byte b = bf.get(i);
if (b != mappedByteBuffer.get(i)) {
mappedByteBuffer.put(i, b);
}
}
this.mappedByteBuffer.force();
dirty = false;
Comment thread
wang-jiahua marked this conversation as resolved.
}

/**
Expand Down Expand Up @@ -287,11 +297,10 @@ public int getSlotIndex(long timeMs) {

public void putSlot(long timeMs, long firstPos, long lastPos) {
localBuffer.get().position(getSlotIndex(timeMs) * Slot.SIZE);
// To be compatible with previous version.
// The previous version's precision is fixed at 1000ms and it store timeMs / 1000 in slot.
localBuffer.get().putLong(timeMs / precisionMs);
localBuffer.get().putLong(firstPos);
localBuffer.get().putLong(lastPos);
dirty = true;
Comment thread
wang-jiahua marked this conversation as resolved.
}

public void putSlot(long timeMs, long firstPos, long lastPos, int num, int magic) {
Expand All @@ -301,6 +310,7 @@ public void putSlot(long timeMs, long firstPos, long lastPos, int num, int magic
localBuffer.get().putLong(lastPos);
localBuffer.get().putInt(num);
localBuffer.get().putInt(magic);
dirty = true;
Comment thread
wang-jiahua marked this conversation as resolved.
}

public void reviseSlot(long timeMs, long firstPos, long lastPos, boolean force) {
Expand All @@ -313,11 +323,13 @@ public void reviseSlot(long timeMs, long firstPos, long lastPos, boolean force)
} else {
if (IGNORE != firstPos) {
localBuffer.get().putLong(firstPos);
dirty = true;
Comment thread
wang-jiahua marked this conversation as resolved.
} else {
localBuffer.get().getLong();
}
if (IGNORE != lastPos) {
localBuffer.get().putLong(lastPos);
dirty = true;
Comment thread
wang-jiahua marked this conversation as resolved.
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class BrokerStatsManagerTest {
private BrokerStatsManager brokerStatsManager;

private static final String TOPIC = "TOPIC_TEST";
private static final Integer QUEUE_ID = 0;
private static final int QUEUE_ID = 0;
private static final String GROUP_NAME = "GROUP_TEST";
private static final String CLUSTER_NAME = "DefaultCluster";

Expand Down
Loading