diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java index 75f7089c5e0..685c6ad1462 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java @@ -242,58 +242,59 @@ protected List genMessageQueueFromQueueData(QueueData queueData, R TopicMessageType topicMessageType, Broker broker) { List messageQueueList = new ArrayList<>(); - int r = 0; - int w = 0; - int rw = 0; - int n = 0; - if (PermName.isWriteable(queueData.getPerm()) && PermName.isReadable(queueData.getPerm())) { - rw = Math.min(queueData.getWriteQueueNums(), queueData.getReadQueueNums()); - r = queueData.getReadQueueNums() - rw; - w = queueData.getWriteQueueNums() - rw; - } else if (PermName.isWriteable(queueData.getPerm())) { - w = queueData.getWriteQueueNums(); - } else if (PermName.isReadable(queueData.getPerm())) { - r = queueData.getReadQueueNums(); - } else if (!PermName.isAccessible(queueData.getPerm())) { - n = Math.max(1, Math.max(queueData.getWriteQueueNums(), queueData.getReadQueueNums())); - } - - // r here means readOnly queue nums, w means writeOnly queue nums, while rw means both readable and writable queue nums. - int queueIdIndex = 0; - for (int i = 0; i < r; i++) { - MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic) - .setId(queueIdIndex++) - .setPermission(Permission.READ) - .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType)) - .build(); - messageQueueList.add(messageQueue); - } - - for (int i = 0; i < w; i++) { - MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic) - .setId(queueIdIndex++) - .setPermission(Permission.WRITE) - .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType)) - .build(); - messageQueueList.add(messageQueue); - } - - for (int i = 0; i < rw; i++) { - MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic) - .setId(queueIdIndex++) - .setPermission(Permission.READ_WRITE) - .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType)) - .build(); - messageQueueList.add(messageQueue); - } + int readQueueNums = queueData.getReadQueueNums(); + int writeQueueNums = queueData.getWriteQueueNums(); + boolean isReadable = PermName.isReadable(queueData.getPerm()); + boolean isWriteable = PermName.isWriteable(queueData.getPerm()); - for (int i = 0; i < n; i++) { - MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic) - .setId(queueIdIndex++) - .setPermission(Permission.NONE) - .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType)) - .build(); - messageQueueList.add(messageQueue); + if (isReadable && isWriteable) { + // Derive the permission per queue id so that it matches the actual queue id. + // Queue id < readQueueNums means readable, id < writeQueueNums means writable. + int totalQueueNums = Math.max(readQueueNums, writeQueueNums); + for (int queueId = 0; queueId < totalQueueNums; queueId++) { + Permission permission; + if (queueId < readQueueNums && queueId < writeQueueNums) { + permission = Permission.READ_WRITE; + } else if (queueId < writeQueueNums) { + permission = Permission.WRITE; + } else { + permission = Permission.READ; + } + MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic) + .setId(queueId) + .setPermission(permission) + .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType)) + .build(); + messageQueueList.add(messageQueue); + } + } else if (isWriteable) { + for (int queueId = 0; queueId < writeQueueNums; queueId++) { + MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic) + .setId(queueId) + .setPermission(Permission.WRITE) + .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType)) + .build(); + messageQueueList.add(messageQueue); + } + } else if (isReadable) { + for (int queueId = 0; queueId < readQueueNums; queueId++) { + MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic) + .setId(queueId) + .setPermission(Permission.READ) + .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType)) + .build(); + messageQueueList.add(messageQueue); + } + } else if (!PermName.isAccessible(queueData.getPerm())) { + int noneQueueNums = Math.max(1, Math.max(writeQueueNums, readQueueNums)); + for (int queueId = 0; queueId < noneQueueNums; queueId++) { + MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic) + .setId(queueId) + .setPermission(Permission.NONE) + .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType)) + .build(); + messageQueueList.add(messageQueue); + } } return messageQueueList; diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java index abbf82452ef..5dbb4741c8f 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java @@ -264,7 +264,8 @@ public void testGenPartitionFromQueueData() throws Exception { assertEquals(0, partitionWith8R0WPermRW.stream().filter(a -> a.getPermission() == Permission.READ_WRITE).count()); assertEquals(0, partitionWith8R0WPermRW.stream().filter(a -> a.getPermission() == Permission.WRITE).count()); - // test queueData with 4 read queues, 8 write queues, and rw permission, expect 4 rw queues and 4 write only queues. + // test queueData with 4 read queues, 8 write queues, and rw permission, expect 4 rw queues and 4 write only queues. + // Queue ids 0..3 are READ_WRITE (readable and writable), ids 4..7 are WRITE (writable only). QueueData queueDataWith4R8WPermRW = createQueueData(4, 8, PermName.PERM_READ | PermName.PERM_WRITE); List partitionWith4R8WPermRW = this.routeActivity.genMessageQueueFromQueueData(queueDataWith4R8WPermRW, GRPC_TOPIC, TopicMessageType.UNSPECIFIED, GRPC_BROKER); assertEquals(8, partitionWith4R8WPermRW.size()); @@ -272,6 +273,31 @@ public void testGenPartitionFromQueueData() throws Exception { assertEquals(4, partitionWith4R8WPermRW.stream().filter(a -> a.getPermission() == Permission.WRITE).count()); assertEquals(4, partitionWith4R8WPermRW.stream().filter(a -> a.getPermission() == Permission.READ_WRITE).count()); assertEquals(0, partitionWith4R8WPermRW.stream().filter(a -> a.getPermission() == Permission.READ).count()); + for (int i = 0; i < 4; i++) { + assertEquals(Permission.READ_WRITE, partitionWith4R8WPermRW.get(i).getPermission()); + assertEquals(i, partitionWith4R8WPermRW.get(i).getId()); + } + for (int i = 4; i < 8; i++) { + assertEquals(Permission.WRITE, partitionWith4R8WPermRW.get(i).getPermission()); + assertEquals(i, partitionWith4R8WPermRW.get(i).getId()); + } + + // test queueData with 8 read queues, 4 write queues, and rw permission, expect 4 rw queues and 4 read only queues. + // Queue ids 0..3 are READ_WRITE (readable and writable), ids 4..7 are READ (readable only). + QueueData queueDataWith8R4WPermRW = createQueueData(8, 4, PermName.PERM_READ | PermName.PERM_WRITE); + List partitionWith8R4WPermRW = this.routeActivity.genMessageQueueFromQueueData(queueDataWith8R4WPermRW, GRPC_TOPIC, TopicMessageType.UNSPECIFIED, GRPC_BROKER); + assertEquals(8, partitionWith8R4WPermRW.size()); + assertEquals(4, partitionWith8R4WPermRW.stream().filter(a -> a.getPermission() == Permission.READ).count()); + assertEquals(4, partitionWith8R4WPermRW.stream().filter(a -> a.getPermission() == Permission.READ_WRITE).count()); + assertEquals(0, partitionWith8R4WPermRW.stream().filter(a -> a.getPermission() == Permission.WRITE).count()); + for (int i = 0; i < 4; i++) { + assertEquals(Permission.READ_WRITE, partitionWith8R4WPermRW.get(i).getPermission()); + assertEquals(i, partitionWith8R4WPermRW.get(i).getId()); + } + for (int i = 4; i < 8; i++) { + assertEquals(Permission.READ, partitionWith8R4WPermRW.get(i).getPermission()); + assertEquals(i, partitionWith8R4WPermRW.get(i).getId()); + } // test queueData with 2 read queues, 2 write queues, and no permission, expect 2 no permission queues. QueueData queueDataWith2R2WNoPerm = createQueueData(2, 2, 0);