Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
### 3.1.0
* adding `PassiveNack` consumer option - cleanup the local state, but don't send the nack to the bus

### 3.0.0
* target .NET 10

Expand Down
15 changes: 15 additions & 0 deletions src/DatatypeChannels.ASB/Api.fs
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,28 @@ type OfReceived<'msg> = OfReceived of (ServiceBusReceivedMessage -> 'msg)
/// Disassembles user message into AMQP properties
type ToSend<'msg> = ToSend of ('msg -> ServiceBusMessage)

/// Channel-level options controlling consumer/publisher behavior.
type ChannelOptions =
{ Prefetch: uint16 option // optional prefetch limit
IgnoreDuplicates: bool
PassiveNack: bool // when true, Nack only stops tracking the message without abandoning the lock on the broker
TempIdle: TimeSpan } // temporary queue idle lifetime
static member Default =
{ Prefetch = None
TempIdle = TimeSpan.FromMinutes 5.
IgnoreDuplicates = true
PassiveNack = false }

/// Channels is a factory for constructing channel consumers and publishers.
type Channels =
inherit IDisposable

/// Construct a consumer, using specified message type, the source to bind to and the assember.
abstract GetConsumer<'msg> : OfReceived<'msg> -> Source -> Task<Consumer<'msg>>

/// Construct a consumer with per-consumer option overrides.
abstract GetConsumerWith<'msg> : ChannelOptions -> OfReceived<'msg> -> Source -> Task<Consumer<'msg>>

/// Construct a publisher for the specified message type and disassembler.
abstract GetPublisher<'msg> : ToSend<'msg> -> Topic -> Publisher<'msg>

Expand Down
4 changes: 3 additions & 1 deletion src/DatatypeChannels.ASB/Consumer.fs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type MessageContext =
type internal Options() =
inherit ServiceBusReceiverOptions()
member val IgnoreDuplicates = true with get,set
member val PassiveNack = false with get,set

let mkNew (options: Options)
(startRenewal: MessageContext -> Task)
Expand Down Expand Up @@ -111,7 +112,8 @@ let mkNew (options: Options)
let! (receiver: ServiceBusReceiver, _, msgCtxs: ConcurrentDictionary<_,_>) = ctx
match msgCtxs.TryGetValue receivedId with
| true, msgCtx ->
do! receiver.AbandonMessageAsync msgCtx.Message
if not options.PassiveNack then
do! receiver.AbandonMessageAsync msgCtx.Message
msgCtxs.TryRemove receivedId |> ignore
msgCtx.Close()
| _ -> failwithf "Message is not in the current session: %s" receivedId
Expand Down
65 changes: 32 additions & 33 deletions src/DatatypeChannels.ASB/DatatypeChannels.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,6 @@ open System.Threading.Tasks
open Azure.Messaging.ServiceBus
open Azure.Messaging.ServiceBus.Administration

type ChannelOptions =
{ Prefetch: uint16 option // optional prefetch limit
IgnoreDuplicates: bool
TempIdle: TimeSpan } // temporary queue idle lifetime
static member Default =
{ Prefetch = None
TempIdle = TimeSpan.FromMinutes 5.
IgnoreDuplicates = true }

[<RequireQualifiedAccess>]
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Channels =
Expand All @@ -31,32 +22,40 @@ module Channels =
let withClient cont = cont client.Value
let withAdminClient cont = cont adminClient.Value

let getConsumer (ofRecevied: OfReceived<'msg>) source (consumerOptions: ChannelOptions) : Task<Consumer<'msg>> =
let withBindings, receiveOptions, renew =
match source with
| Subscription binding ->
let renew = Consumer.Renewable.mkNew binding.Subscription.LockDuration
binding.Subscription.LockDuration <- min binding.Subscription.LockDuration Consumer.Renewable.maxLockDuration
Subscription.withBinding log withAdminClient binding,
Consumer.Options(ReceiveMode = ServiceBusReceiveMode.PeekLock),
renew
| Persistent (queueOptions, bindings) ->
let renew = Consumer.Renewable.mkNew queueOptions.LockDuration
queueOptions.LockDuration <- min queueOptions.LockDuration Consumer.Renewable.maxLockDuration
Queue.withBindings log withAdminClient queueOptions bindings,
Consumer.Options(ReceiveMode = ServiceBusReceiveMode.PeekLock),
renew
| Temporary bindings ->
Queue.withBindings log withAdminClient (CreateQueueOptions(Guid.NewGuid().ToString(), AutoDeleteOnIdle = consumerOptions.TempIdle)) bindings,
Consumer.Options(ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete),
Consumer.Renewable.noop
| DeadLetter path ->
(fun cont -> Task.FromResult path |> cont),
Consumer.Options(ReceiveMode = ServiceBusReceiveMode.PeekLock, SubQueue = SubQueue.DeadLetter),
Consumer.Renewable.noop
consumerOptions.Prefetch |> Option.iter (fun v -> receiveOptions.PrefetchCount <- int v)
receiveOptions.IgnoreDuplicates <- consumerOptions.IgnoreDuplicates
receiveOptions.PassiveNack <- consumerOptions.PassiveNack
Consumer.mkNew receiveOptions renew ofRecevied withClient withBindings

{ new Channels with
member __.GetConsumer<'msg> ofRecevied source : Task<Consumer<'msg>> =
let withBindings, receiveOptions, renew =
match source with
| Subscription binding ->
let renew = Consumer.Renewable.mkNew binding.Subscription.LockDuration
binding.Subscription.LockDuration <- min binding.Subscription.LockDuration Consumer.Renewable.maxLockDuration
Subscription.withBinding log withAdminClient binding,
Consumer.Options(ReceiveMode = ServiceBusReceiveMode.PeekLock),
renew
| Persistent (queueOptions, bindings) ->
let renew = Consumer.Renewable.mkNew queueOptions.LockDuration
queueOptions.LockDuration <- min queueOptions.LockDuration Consumer.Renewable.maxLockDuration
Queue.withBindings log withAdminClient queueOptions bindings,
Consumer.Options(ReceiveMode = ServiceBusReceiveMode.PeekLock),
renew
| Temporary bindings ->
Queue.withBindings log withAdminClient (CreateQueueOptions(Guid.NewGuid().ToString(), AutoDeleteOnIdle = options.TempIdle)) bindings,
Consumer.Options(ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete),
Consumer.Renewable.noop
| DeadLetter path ->
(fun cont -> Task.FromResult path |> cont),
Consumer.Options(ReceiveMode = ServiceBusReceiveMode.PeekLock, SubQueue = SubQueue.DeadLetter),
Consumer.Renewable.noop
options.Prefetch |> Option.iter (fun v -> receiveOptions.PrefetchCount <- int v)
Consumer.mkNew receiveOptions renew ofRecevied withClient withBindings
getConsumer ofRecevied source options

member __.GetConsumerWith<'msg> (consumerOptions : ChannelOptions) ofRecevied source : Task<Consumer<'msg>> =
getConsumer ofRecevied source consumerOptions

member __.GetPublisher<'msg> toSend (Topic topic) : Publisher<'msg> =
let sender = lazy client.Value.CreateSender topic
Expand Down
24 changes: 24 additions & 0 deletions tests/DatatypeChannels.ASB.Tests/Tests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,30 @@ let tests =
received.Value.Msg =! "test-payload"
}

itt "PassiveNack does not dead-letter immediately" <| fun testId ->
task {
// short LockDuration so the broker releases the message quickly after PassiveNack
let src =
(CreateQueueOptions("passive-nack-queue", LockDuration = TimeSpan.FromSeconds 30., MaxDeliveryCount = 5, AutoDeleteOnIdle = TimeSpan.FromMinutes 5.),
[Binding.onTest testId "passive-nack-sub" topic]) |> Persistent
let opts = { ChannelOptions.Default with PassiveNack = true }
let! consumer = channels.GetConsumerWith opts PlainText.ofReceived src
let! dlqConsumer = channels.GetConsumer PlainText.ofReceived (DeadLetter "passive-nack-queue")
let publisher = channels.GetPublisher (PlainText.toSend testId) topic
do! Task.Delay 5_000 // backend routing warmup
do! publisher |> Publisher.publish "test-payload"
let! received = TimeSpan.FromSeconds 5. |> consumer.Get
received.Value.Msg =! "test-payload"
do! consumer.Nack received.Value.Id
// PassiveNack should NOT have abandoned; nothing on DLQ
let! dlq = TimeSpan.FromSeconds 3. |> dlqConsumer.Get
dlq =! None
// message should redeliver on the main consumer once the lock expires (~30s)
let! redelivered = TimeSpan.FromSeconds 60. |> consumer.Get
redelivered.Value.Msg =! "test-payload"
do! consumer.Ack redelivered.Value.Id
}

itt "Exceeding the lock duration renews messages" <| fun testId ->
task {
let src = // 5min is the maximum LockDuration, we'll adjust it and setup the lock renewal
Expand Down
Loading