Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,8 @@ public void registerProcessor() {
*/
remotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, this.changeInvisibleTimeProcessor, this.ackMessageExecutor);
fastRemotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, this.changeInvisibleTimeProcessor, this.ackMessageExecutor);
remotingServer.registerProcessor(RequestCode.BATCH_CHANGE_MESSAGE_INVISIBLETIME, this.changeInvisibleTimeProcessor, this.ackMessageExecutor);
fastRemotingServer.registerProcessor(RequestCode.BATCH_CHANGE_MESSAGE_INVISIBLETIME, this.changeInvisibleTimeProcessor, this.ackMessageExecutor);
/**
* notificationProcessor
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,28 @@ public List<PopConsumerRecord> deleteRecords(List<PopConsumerRecord> consumerRec
return remain;
}

public void writeAndDeleteRecords(List<PopConsumerRecord> writeRecordList,
List<PopConsumerRecord> deleteRecordList) {
if (deleteRecordList.isEmpty()) {
consumerRecordStore.writeRecords(writeRecordList);
return;
}

List<PopConsumerRecord> storeDeleteRecords = new ArrayList<>(deleteRecordList.size());
List<PopConsumerRecord> bufferDeleteRecords = new ArrayList<>(deleteRecordList.size());
deleteRecordList.forEach(consumerRecord -> {
ConsumerRecords consumerRecords = consumerRecordTable.get(this.getKey(consumerRecord));
if (consumerRecords != null && consumerRecords.contains(consumerRecord)) {
bufferDeleteRecords.add(consumerRecord);
} else {
storeDeleteRecords.add(consumerRecord);
}
});

consumerRecordStore.writeAndDeleteRecords(writeRecordList, storeDeleteRecords);
deleteRecords(bufferDeleteRecords);
}

public int cleanupRecords(Consumer<PopConsumerRecord> consumer) {
int remain = 0;
Iterator<Map.Entry<String, ConsumerRecords>> iterator = consumerRecordTable.entrySet().iterator();
Expand Down Expand Up @@ -231,6 +253,10 @@ public boolean delete(PopConsumerRecord record) {
return recordTreeMap.remove(record.getOffset()) != null;
}

public boolean contains(PopConsumerRecord record) {
return recordTreeMap.containsKey(record.getOffset());
}

public long getMinOffsetInBuffer() {
Map.Entry<Long, PopConsumerRecord> entry = removeTreeMap.firstEntry();
if (entry != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ public interface PopConsumerKVStore {
*/
void deleteRecords(List<PopConsumerRecord> consumerRecordList);

/**
* Writes and deletes consumer records from the storage in a single batch.
* @param writeRecordList The list of consumer records to be written.
* @param deleteRecordList The list of consumer records to be deleted.
*/
void writeAndDeleteRecords(List<PopConsumerRecord> writeRecordList, List<PopConsumerRecord> deleteRecordList);

/**
* Scans and returns a list of expired consumer records within the specified time range.
* @param lowerTime The start time (inclusive) of the time range to search, in milliseconds.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,24 @@ public void deleteRecords(List<PopConsumerRecord> consumerRecordList) {
}
}

@Override
public void writeAndDeleteRecords(List<PopConsumerRecord> writeRecordList,
List<PopConsumerRecord> deleteRecordList) {
if (!writeRecordList.isEmpty() || !deleteRecordList.isEmpty()) {
try (WriteBatch writeBatch = new WriteBatch()) {
for (PopConsumerRecord record : deleteRecordList) {
writeBatch.delete(columnFamilyHandle, record.getKeyBytes());
}
for (PopConsumerRecord record : writeRecordList) {
writeBatch.put(columnFamilyHandle, record.getKeyBytes(), record.getValueBytes());
}
this.db.write(writeOptions, writeBatch);
} catch (RocksDBException e) {
throw new RuntimeException("Write and delete record error", e);
}
}
}

@Override
// https://github.com/facebook/rocksdb/issues/10300
public List<PopConsumerRecord> scanExpiredRecords(long lower, long upper, int maxCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
import org.apache.rocketmq.remoting.protocol.body.ChangeInvisibleTimeRequestEntry;
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.store.AppendMessageStatus;
Expand Down Expand Up @@ -496,7 +497,6 @@ public CompletableFuture<Boolean> ackAsync(
public void changeInvisibilityDuration(long popTime, long invisibleTime, long changedPopTime,
long changedInvisibleTime, String groupId, String topicId,
int queueId, long offset, boolean suspend) {

if (brokerConfig.isPopConsumerKVServiceLog()) {
log.info("PopConsumerService change, time={}, invisible={}, " +
"groupId={}, topic={}, queueId={}, offset={}, new time={}, new invisible={}",
Expand All @@ -517,21 +517,67 @@ public void changeInvisibilityDuration(long popTime, long invisibleTime, long ch
if (skipWrite) {
log.info("PopConsumerService change invisibility skip, time={}, " +
"groupId={}, topicId={}, queueId={}, offset={}", popTime, groupId, topicId, queueId, offset);
} else {
this.popConsumerStore.writeRecords(Collections.singletonList(ckRecord));
}

List<PopConsumerRecord> ckRecords = skipWrite ? Collections.emptyList() : Collections.singletonList(ckRecord);
List<PopConsumerRecord> ackRecords = Collections.singletonList(ackRecord);
if (brokerConfig.isEnablePopBufferMerge() && popConsumerCache != null) {
if (popConsumerCache.deleteRecords(Collections.singletonList(ackRecord)).isEmpty()) {
return;
popConsumerCache.writeAndDeleteRecords(ckRecords, ackRecords);
} else {
this.popConsumerStore.writeAndDeleteRecords(ckRecords, ackRecords);
}
}

public void batchChangeInvisibilityDuration(List<ChangeInvisibleTimeRequestEntry> changeRecords) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changeInvisibilityDuration can call batchChangeInvisibilityDuration

if (changeRecords == null || changeRecords.isEmpty()) {
return;
}

List<PopConsumerRecord> ckRecords = new ArrayList<>(changeRecords.size());
List<PopConsumerRecord> ackRecords = new ArrayList<>(changeRecords.size());
List<PopConsumerRecord> storeAckRecords = new ArrayList<>(changeRecords.size());

for (ChangeInvisibleTimeRequestEntry changeRecord : changeRecords) {
if (brokerConfig.isPopConsumerKVServiceLog()) {
log.info("PopConsumerService batch change, time={}, invisible={}, " +
"groupId={}, topic={}, queueId={}, offset={}, new time={}, new invisible={}",
changeRecord.getPopTime(), changeRecord.getOldInvisibleTime(), changeRecord.getConsumerGroup(),
changeRecord.getTopic(), changeRecord.getQueueId(), changeRecord.getOffset(),
changeRecord.getChangedPopTime(), changeRecord.getChangedInvisibleTime());
}

PopConsumerRecord ckRecord = new PopConsumerRecord(
changeRecord.getChangedPopTime(), changeRecord.getConsumerGroup(), changeRecord.getTopic(),
changeRecord.getQueueId(), 0, changeRecord.getChangedInvisibleTime(), changeRecord.getOffset(),
null, changeRecord.isSuspend());

PopConsumerRecord ackRecord = new PopConsumerRecord(
changeRecord.getPopTime(), changeRecord.getConsumerGroup(), changeRecord.getTopic(),
changeRecord.getQueueId(), 0, changeRecord.getOldInvisibleTime(), changeRecord.getOffset(),
null, changeRecord.isSuspend());

boolean skipWrite = brokerConfig.isPopReviveSkipIfGroupAbsent() &&
!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(changeRecord.getConsumerGroup());

if (skipWrite) {
log.info("PopConsumerService batch change invisibility skip, time={}, " +
"groupId={}, topicId={}, queueId={}, offset={}", changeRecord.getPopTime(),
changeRecord.getConsumerGroup(), changeRecord.getTopic(), changeRecord.getQueueId(),
changeRecord.getOffset());
} else {
ckRecords.add(ckRecord);
}

ackRecords.add(ackRecord);
if (skipWrite || ckRecord.getVisibilityTimeout() != ackRecord.getVisibilityTimeout()) {
storeAckRecords.add(ackRecord);
}
}

// If the new CK has the same key as the old CK (same visibilityTimeout),
// the write already overwrites the old record in RocksDB, skip delete
// to avoid removing the newly written record.
if (skipWrite || ckRecord.getVisibilityTimeout() != ackRecord.getVisibilityTimeout()) {
this.popConsumerStore.deleteRecords(Collections.singletonList(ackRecord));
if (brokerConfig.isEnablePopBufferMerge() && popConsumerCache != null) {
popConsumerCache.writeAndDeleteRecords(ckRecords, ackRecords);
} else {
this.popConsumerStore.writeAndDeleteRecords(ckRecords, storeAckRecords);
}
}

Expand Down
Loading
Loading