diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTSessionHandler.java b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTSessionHandler.java index d118bd863..e3d6ce3d4 100644 --- a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTSessionHandler.java +++ b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTSessionHandler.java @@ -960,9 +960,6 @@ private void confirm(ConfirmingMessage confirmingMsg, boolean delivered) { } // confirm up to the current seq onConfirm(confirmingMsg.seq); - if (resendTask != null && !resendTask.isDone()) { - resendTask.cancel(true); - } } protected abstract void onConfirm(long seq); diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTT3TransientSessionHandlerTest.java b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTT3TransientSessionHandlerTest.java index 6efa2b125..756450015 100644 --- a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTT3TransientSessionHandlerTest.java +++ b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTT3TransientSessionHandlerTest.java @@ -1636,4 +1636,68 @@ public void qoS2PubWithSameSourcePacketId() { assertFalse(types.contains(QOS2_DROPPED), "no qos2 message should be dropped"); assertFalse(types.contains(QOS2_DIST_ERROR), "no qos2 distribution error expected"); } + + @Test + public void qoS1ResendKeepsRunningAfterPartialAck() { + when(settingProvider.provide(eq(ResendTimeoutSeconds), anyString())).thenReturn(1); + when(settingProvider.provide(eq(MaxResendTimes), anyString())).thenReturn(1); + when(settingProvider.provide(eq(ReceivingMaximum), anyString())).thenReturn(2); + + channel.pipeline().removeLast(); + channel.pipeline().addLast(new ChannelDuplexHandler() { + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + super.handlerAdded(ctx); + ctx.pipeline().addLast( + MQTT3TransientSessionHandler.builder().settings(new TenantSettings(tenantId, settingProvider)) + .tenantMeter(tenantMeter).oomCondition(oomCondition).userSessionId(userSessionId(clientInfo)) + .keepAliveTimeSeconds(120).clientInfo(clientInfo).willMessage(null).ctx(ctx).build()); + ctx.pipeline().remove(this); + } + }); + transientSessionHandler = (MQTT3TransientSessionHandler) channel.pipeline().last(); + + mockCheckPermission(true); + mockDistMatch(true); + transientSessionHandler.subscribe(System.nanoTime(), topicFilter, QoS.AT_LEAST_ONCE); + channel.runPendingTasks(); + ArgumentCaptor longCaptor = ArgumentCaptor.forClass(Long.class); + verify(localDistService).match(anyLong(), eq(topicFilter), longCaptor.capture(), any()); + + transientSessionHandler.publish(s2cMessageList(topic, 2, QoS.AT_LEAST_ONCE), + Collections.singleton(new IMQTTTransientSession.MatchedTopicFilter(topicFilter, longCaptor.getValue()))); + channel.runPendingTasks(); + + MqttPublishMessage first = channel.readOutbound(); + MqttPublishMessage second = channel.readOutbound(); + assertNotNull(first); + assertNotNull(second); + + int firstId = first.variableHeader().packetId(); + int secondId = second.variableHeader().packetId(); + + channel.writeInbound(MQTTMessageUtils.pubAckMessage(firstId)); + channel.runPendingTasks(); + + testTicker.advanceTimeBy(2, TimeUnit.SECONDS); + channel.advanceTimeBy(2, TimeUnit.SECONDS); + channel.runScheduledPendingTasks(); + channel.runPendingTasks(); + channel.flushOutbound(); + + MqttPublishMessage resent = channel.readOutbound(); + assertNotNull(resent); + assertEquals(resent.variableHeader().packetId(), secondId); + assertTrue(resent.fixedHeader().isDup()); + + testTicker.advanceTimeBy(2, TimeUnit.SECONDS); + channel.advanceTimeBy(2, TimeUnit.SECONDS); + channel.runScheduledPendingTasks(); + channel.runPendingTasks(); + + verify(eventCollector, atLeast(1)).report(argThat(e -> + e instanceof QoS1Dropped d && d.reason() == DropReason.MaxRetried)); + verify(eventCollector, atLeast(1)).report(argThat(e -> + e instanceof QoS1Confirmed c && !c.delivered())); + } }