From c56e79c8c215fdc217ebcb17fc94ef4e4080acd5 Mon Sep 17 00:00:00 2001 From: artboy <80608452+itxaiohanglover@users.noreply.github.com> Date: Sat, 20 Jun 2026 01:26:57 +0800 Subject: [PATCH] [ISSUE #10529] Fix incorrect queue permissions in gRPC QueryRouteResponse When the gRPC proxy builds QueryRouteResponse, the permission of each MessageQueue could be incorrect when readQueueNums and writeQueueNums differ. The old implementation grouped queues by permission type (READ, WRITE, READ_WRITE) and assigned increasing queue ids within each group, which preserved the count of each permission but assigned the permission to the wrong queue id. Fix this by deriving the permission per queue id: a queue id is readable if id < readQueueNums and writable if id < writeQueueNums, so READ_WRITE when both conditions hold, WRITE when only writable, and READ when only readable. Add test cases that verify the per-queue-id permission mapping for both readQueueNums < writeQueueNums and the reverse. --- .../proxy/grpc/v2/route/RouteActivity.java | 103 +++++++++--------- .../grpc/v2/route/RouteActivityTest.java | 28 ++++- 2 files changed, 79 insertions(+), 52 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java index 75f7089c5e0..685c6ad1462 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java @@ -242,58 +242,59 @@ protected List genMessageQueueFromQueueData(QueueData queueData, R TopicMessageType topicMessageType, Broker broker) { List messageQueueList = new ArrayList<>(); - int r = 0; - int w = 0; - int rw = 0; - int n = 0; - if (PermName.isWriteable(queueData.getPerm()) && PermName.isReadable(queueData.getPerm())) { - rw = Math.min(queueData.getWriteQueueNums(), queueData.getReadQueueNums()); - r = queueData.getReadQueueNums() - rw; - w = queueData.getWriteQueueNums() - rw; - } else if (PermName.isWriteable(queueData.getPerm())) { - w = queueData.getWriteQueueNums(); - } else if (PermName.isReadable(queueData.getPerm())) { - r = queueData.getReadQueueNums(); - } else if (!PermName.isAccessible(queueData.getPerm())) { - n = Math.max(1, Math.max(queueData.getWriteQueueNums(), queueData.getReadQueueNums())); - } - - // r here means readOnly queue nums, w means writeOnly queue nums, while rw means both readable and writable queue nums. - int queueIdIndex = 0; - for (int i = 0; i < r; i++) { - MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic) - .setId(queueIdIndex++) - .setPermission(Permission.READ) - .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType)) - .build(); - messageQueueList.add(messageQueue); - } - - for (int i = 0; i < w; i++) { - MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic) - .setId(queueIdIndex++) - .setPermission(Permission.WRITE) - .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType)) - .build(); - messageQueueList.add(messageQueue); - } - - for (int i = 0; i < rw; i++) { - MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic) - .setId(queueIdIndex++) - .setPermission(Permission.READ_WRITE) - .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType)) - .build(); - messageQueueList.add(messageQueue); - } + int readQueueNums = queueData.getReadQueueNums(); + int writeQueueNums = queueData.getWriteQueueNums(); + boolean isReadable = PermName.isReadable(queueData.getPerm()); + boolean isWriteable = PermName.isWriteable(queueData.getPerm()); - for (int i = 0; i < n; i++) { - MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic) - .setId(queueIdIndex++) - .setPermission(Permission.NONE) - .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType)) - .build(); - messageQueueList.add(messageQueue); + if (isReadable && isWriteable) { + // Derive the permission per queue id so that it matches the actual queue id. + // Queue id < readQueueNums means readable, id < writeQueueNums means writable. + int totalQueueNums = Math.max(readQueueNums, writeQueueNums); + for (int queueId = 0; queueId < totalQueueNums; queueId++) { + Permission permission; + if (queueId < readQueueNums && queueId < writeQueueNums) { + permission = Permission.READ_WRITE; + } else if (queueId < writeQueueNums) { + permission = Permission.WRITE; + } else { + permission = Permission.READ; + } + MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic) + .setId(queueId) + .setPermission(permission) + .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType)) + .build(); + messageQueueList.add(messageQueue); + } + } else if (isWriteable) { + for (int queueId = 0; queueId < writeQueueNums; queueId++) { + MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic) + .setId(queueId) + .setPermission(Permission.WRITE) + .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType)) + .build(); + messageQueueList.add(messageQueue); + } + } else if (isReadable) { + for (int queueId = 0; queueId < readQueueNums; queueId++) { + MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic) + .setId(queueId) + .setPermission(Permission.READ) + .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType)) + .build(); + messageQueueList.add(messageQueue); + } + } else if (!PermName.isAccessible(queueData.getPerm())) { + int noneQueueNums = Math.max(1, Math.max(writeQueueNums, readQueueNums)); + for (int queueId = 0; queueId < noneQueueNums; queueId++) { + MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic) + .setId(queueId) + .setPermission(Permission.NONE) + .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType)) + .build(); + messageQueueList.add(messageQueue); + } } return messageQueueList; diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java index abbf82452ef..5dbb4741c8f 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java @@ -264,7 +264,8 @@ public void testGenPartitionFromQueueData() throws Exception { assertEquals(0, partitionWith8R0WPermRW.stream().filter(a -> a.getPermission() == Permission.READ_WRITE).count()); assertEquals(0, partitionWith8R0WPermRW.stream().filter(a -> a.getPermission() == Permission.WRITE).count()); - // test queueData with 4 read queues, 8 write queues, and rw permission, expect 4 rw queues and 4 write only queues. + // test queueData with 4 read queues, 8 write queues, and rw permission, expect 4 rw queues and 4 write only queues. + // Queue ids 0..3 are READ_WRITE (readable and writable), ids 4..7 are WRITE (writable only). QueueData queueDataWith4R8WPermRW = createQueueData(4, 8, PermName.PERM_READ | PermName.PERM_WRITE); List partitionWith4R8WPermRW = this.routeActivity.genMessageQueueFromQueueData(queueDataWith4R8WPermRW, GRPC_TOPIC, TopicMessageType.UNSPECIFIED, GRPC_BROKER); assertEquals(8, partitionWith4R8WPermRW.size()); @@ -272,6 +273,31 @@ public void testGenPartitionFromQueueData() throws Exception { assertEquals(4, partitionWith4R8WPermRW.stream().filter(a -> a.getPermission() == Permission.WRITE).count()); assertEquals(4, partitionWith4R8WPermRW.stream().filter(a -> a.getPermission() == Permission.READ_WRITE).count()); assertEquals(0, partitionWith4R8WPermRW.stream().filter(a -> a.getPermission() == Permission.READ).count()); + for (int i = 0; i < 4; i++) { + assertEquals(Permission.READ_WRITE, partitionWith4R8WPermRW.get(i).getPermission()); + assertEquals(i, partitionWith4R8WPermRW.get(i).getId()); + } + for (int i = 4; i < 8; i++) { + assertEquals(Permission.WRITE, partitionWith4R8WPermRW.get(i).getPermission()); + assertEquals(i, partitionWith4R8WPermRW.get(i).getId()); + } + + // test queueData with 8 read queues, 4 write queues, and rw permission, expect 4 rw queues and 4 read only queues. + // Queue ids 0..3 are READ_WRITE (readable and writable), ids 4..7 are READ (readable only). + QueueData queueDataWith8R4WPermRW = createQueueData(8, 4, PermName.PERM_READ | PermName.PERM_WRITE); + List partitionWith8R4WPermRW = this.routeActivity.genMessageQueueFromQueueData(queueDataWith8R4WPermRW, GRPC_TOPIC, TopicMessageType.UNSPECIFIED, GRPC_BROKER); + assertEquals(8, partitionWith8R4WPermRW.size()); + assertEquals(4, partitionWith8R4WPermRW.stream().filter(a -> a.getPermission() == Permission.READ).count()); + assertEquals(4, partitionWith8R4WPermRW.stream().filter(a -> a.getPermission() == Permission.READ_WRITE).count()); + assertEquals(0, partitionWith8R4WPermRW.stream().filter(a -> a.getPermission() == Permission.WRITE).count()); + for (int i = 0; i < 4; i++) { + assertEquals(Permission.READ_WRITE, partitionWith8R4WPermRW.get(i).getPermission()); + assertEquals(i, partitionWith8R4WPermRW.get(i).getId()); + } + for (int i = 4; i < 8; i++) { + assertEquals(Permission.READ, partitionWith8R4WPermRW.get(i).getPermission()); + assertEquals(i, partitionWith8R4WPermRW.get(i).getId()); + } // test queueData with 2 read queues, 2 write queues, and no permission, expect 2 no permission queues. QueueData queueDataWith2R2WNoPerm = createQueueData(2, 2, 0);