From 663998501105b5fb6aae5b5adfc0a77a41a28200 Mon Sep 17 00:00:00 2001 From: Eugene Tolmachev Date: Wed, 22 Apr 2026 11:12:27 -0400 Subject: [PATCH] Add PassiveNack option and GetConsumerWith overload - Add PassiveNack bool to ChannelOptions: when true, Consumer.Nack only stops local tracking (removes from msgCtxs, stops activity, cancels renewal CTS) without calling AbandonMessageAsync, letting the broker lock expire naturally instead of immediately redelivering. - Wire IgnoreDuplicates and PassiveNack from ChannelOptions into the internal Consumer.Options receiver options. - Add Channels.GetConsumerWith overload taking ChannelOptions as the first parameter, enabling per-consumer option overrides. - Move ChannelOptions to Api.fs so it sits next to the Channels type. - Add integration test 'PassiveNack does not dead-letter immediately' verifying the message does not land on DLQ and is redelivered on the main consumer after the lock expires. - RELEASE_NOTES: 3.1.0 entry. --- RELEASE_NOTES.md | 3 + src/DatatypeChannels.ASB/Api.fs | 15 +++++ src/DatatypeChannels.ASB/Consumer.fs | 4 +- src/DatatypeChannels.ASB/DatatypeChannels.fs | 65 ++++++++++---------- tests/DatatypeChannels.ASB.Tests/Tests.fs | 24 ++++++++ 5 files changed, 77 insertions(+), 34 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 33dd9d1..3d1089e 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,3 +1,6 @@ +### 3.1.0 +* adding `PassiveNack` consumer option - cleanup the local state, but don't send the nack to the bus + ### 3.0.0 * target .NET 10 diff --git a/src/DatatypeChannels.ASB/Api.fs b/src/DatatypeChannels.ASB/Api.fs index f290cce..b75b24b 100644 --- a/src/DatatypeChannels.ASB/Api.fs +++ b/src/DatatypeChannels.ASB/Api.fs @@ -53,6 +53,18 @@ type OfReceived<'msg> = OfReceived of (ServiceBusReceivedMessage -> 'msg) /// Disassembles user message into AMQP properties type ToSend<'msg> = ToSend of ('msg -> ServiceBusMessage) +/// Channel-level options controlling consumer/publisher behavior. +type ChannelOptions = + { Prefetch: uint16 option // optional prefetch limit + IgnoreDuplicates: bool + PassiveNack: bool // when true, Nack only stops tracking the message without abandoning the lock on the broker + TempIdle: TimeSpan } // temporary queue idle lifetime + static member Default = + { Prefetch = None + TempIdle = TimeSpan.FromMinutes 5. + IgnoreDuplicates = true + PassiveNack = false } + /// Channels is a factory for constructing channel consumers and publishers. type Channels = inherit IDisposable @@ -60,6 +72,9 @@ type Channels = /// Construct a consumer, using specified message type, the source to bind to and the assember. abstract GetConsumer<'msg> : OfReceived<'msg> -> Source -> Task> + /// Construct a consumer with per-consumer option overrides. + abstract GetConsumerWith<'msg> : ChannelOptions -> OfReceived<'msg> -> Source -> Task> + /// Construct a publisher for the specified message type and disassembler. abstract GetPublisher<'msg> : ToSend<'msg> -> Topic -> Publisher<'msg> diff --git a/src/DatatypeChannels.ASB/Consumer.fs b/src/DatatypeChannels.ASB/Consumer.fs index 21cda93..dac8de2 100644 --- a/src/DatatypeChannels.ASB/Consumer.fs +++ b/src/DatatypeChannels.ASB/Consumer.fs @@ -33,6 +33,7 @@ type MessageContext = type internal Options() = inherit ServiceBusReceiverOptions() member val IgnoreDuplicates = true with get,set + member val PassiveNack = false with get,set let mkNew (options: Options) (startRenewal: MessageContext -> Task) @@ -111,7 +112,8 @@ let mkNew (options: Options) let! (receiver: ServiceBusReceiver, _, msgCtxs: ConcurrentDictionary<_,_>) = ctx match msgCtxs.TryGetValue receivedId with | true, msgCtx -> - do! receiver.AbandonMessageAsync msgCtx.Message + if not options.PassiveNack then + do! receiver.AbandonMessageAsync msgCtx.Message msgCtxs.TryRemove receivedId |> ignore msgCtx.Close() | _ -> failwithf "Message is not in the current session: %s" receivedId diff --git a/src/DatatypeChannels.ASB/DatatypeChannels.fs b/src/DatatypeChannels.ASB/DatatypeChannels.fs index 5891593..6b2f451 100644 --- a/src/DatatypeChannels.ASB/DatatypeChannels.fs +++ b/src/DatatypeChannels.ASB/DatatypeChannels.fs @@ -5,15 +5,6 @@ open System.Threading.Tasks open Azure.Messaging.ServiceBus open Azure.Messaging.ServiceBus.Administration -type ChannelOptions = - { Prefetch: uint16 option // optional prefetch limit - IgnoreDuplicates: bool - TempIdle: TimeSpan } // temporary queue idle lifetime - static member Default = - { Prefetch = None - TempIdle = TimeSpan.FromMinutes 5. - IgnoreDuplicates = true } - [] [] module Channels = @@ -31,32 +22,40 @@ module Channels = let withClient cont = cont client.Value let withAdminClient cont = cont adminClient.Value + let getConsumer (ofRecevied: OfReceived<'msg>) source (consumerOptions: ChannelOptions) : Task> = + let withBindings, receiveOptions, renew = + match source with + | Subscription binding -> + let renew = Consumer.Renewable.mkNew binding.Subscription.LockDuration + binding.Subscription.LockDuration <- min binding.Subscription.LockDuration Consumer.Renewable.maxLockDuration + Subscription.withBinding log withAdminClient binding, + Consumer.Options(ReceiveMode = ServiceBusReceiveMode.PeekLock), + renew + | Persistent (queueOptions, bindings) -> + let renew = Consumer.Renewable.mkNew queueOptions.LockDuration + queueOptions.LockDuration <- min queueOptions.LockDuration Consumer.Renewable.maxLockDuration + Queue.withBindings log withAdminClient queueOptions bindings, + Consumer.Options(ReceiveMode = ServiceBusReceiveMode.PeekLock), + renew + | Temporary bindings -> + Queue.withBindings log withAdminClient (CreateQueueOptions(Guid.NewGuid().ToString(), AutoDeleteOnIdle = consumerOptions.TempIdle)) bindings, + Consumer.Options(ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete), + Consumer.Renewable.noop + | DeadLetter path -> + (fun cont -> Task.FromResult path |> cont), + Consumer.Options(ReceiveMode = ServiceBusReceiveMode.PeekLock, SubQueue = SubQueue.DeadLetter), + Consumer.Renewable.noop + consumerOptions.Prefetch |> Option.iter (fun v -> receiveOptions.PrefetchCount <- int v) + receiveOptions.IgnoreDuplicates <- consumerOptions.IgnoreDuplicates + receiveOptions.PassiveNack <- consumerOptions.PassiveNack + Consumer.mkNew receiveOptions renew ofRecevied withClient withBindings + { new Channels with member __.GetConsumer<'msg> ofRecevied source : Task> = - let withBindings, receiveOptions, renew = - match source with - | Subscription binding -> - let renew = Consumer.Renewable.mkNew binding.Subscription.LockDuration - binding.Subscription.LockDuration <- min binding.Subscription.LockDuration Consumer.Renewable.maxLockDuration - Subscription.withBinding log withAdminClient binding, - Consumer.Options(ReceiveMode = ServiceBusReceiveMode.PeekLock), - renew - | Persistent (queueOptions, bindings) -> - let renew = Consumer.Renewable.mkNew queueOptions.LockDuration - queueOptions.LockDuration <- min queueOptions.LockDuration Consumer.Renewable.maxLockDuration - Queue.withBindings log withAdminClient queueOptions bindings, - Consumer.Options(ReceiveMode = ServiceBusReceiveMode.PeekLock), - renew - | Temporary bindings -> - Queue.withBindings log withAdminClient (CreateQueueOptions(Guid.NewGuid().ToString(), AutoDeleteOnIdle = options.TempIdle)) bindings, - Consumer.Options(ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete), - Consumer.Renewable.noop - | DeadLetter path -> - (fun cont -> Task.FromResult path |> cont), - Consumer.Options(ReceiveMode = ServiceBusReceiveMode.PeekLock, SubQueue = SubQueue.DeadLetter), - Consumer.Renewable.noop - options.Prefetch |> Option.iter (fun v -> receiveOptions.PrefetchCount <- int v) - Consumer.mkNew receiveOptions renew ofRecevied withClient withBindings + getConsumer ofRecevied source options + + member __.GetConsumerWith<'msg> (consumerOptions : ChannelOptions) ofRecevied source : Task> = + getConsumer ofRecevied source consumerOptions member __.GetPublisher<'msg> toSend (Topic topic) : Publisher<'msg> = let sender = lazy client.Value.CreateSender topic diff --git a/tests/DatatypeChannels.ASB.Tests/Tests.fs b/tests/DatatypeChannels.ASB.Tests/Tests.fs index d91bd7e..d9aeb96 100644 --- a/tests/DatatypeChannels.ASB.Tests/Tests.fs +++ b/tests/DatatypeChannels.ASB.Tests/Tests.fs @@ -144,6 +144,30 @@ let tests = received.Value.Msg =! "test-payload" } + itt "PassiveNack does not dead-letter immediately" <| fun testId -> + task { + // short LockDuration so the broker releases the message quickly after PassiveNack + let src = + (CreateQueueOptions("passive-nack-queue", LockDuration = TimeSpan.FromSeconds 30., MaxDeliveryCount = 5, AutoDeleteOnIdle = TimeSpan.FromMinutes 5.), + [Binding.onTest testId "passive-nack-sub" topic]) |> Persistent + let opts = { ChannelOptions.Default with PassiveNack = true } + let! consumer = channels.GetConsumerWith opts PlainText.ofReceived src + let! dlqConsumer = channels.GetConsumer PlainText.ofReceived (DeadLetter "passive-nack-queue") + let publisher = channels.GetPublisher (PlainText.toSend testId) topic + do! Task.Delay 5_000 // backend routing warmup + do! publisher |> Publisher.publish "test-payload" + let! received = TimeSpan.FromSeconds 5. |> consumer.Get + received.Value.Msg =! "test-payload" + do! consumer.Nack received.Value.Id + // PassiveNack should NOT have abandoned; nothing on DLQ + let! dlq = TimeSpan.FromSeconds 3. |> dlqConsumer.Get + dlq =! None + // message should redeliver on the main consumer once the lock expires (~30s) + let! redelivered = TimeSpan.FromSeconds 60. |> consumer.Get + redelivered.Value.Msg =! "test-payload" + do! consumer.Ack redelivered.Value.Id + } + itt "Exceeding the lock duration renews messages" <| fun testId -> task { let src = // 5min is the maximum LockDuration, we'll adjust it and setup the lock renewal