Fix persistent subscription failure handler never being invoked#546
Fix persistent subscription failure handler never being invoked#546alexeyzimarev merged 6 commits intodevfrom
Conversation
When the handler throws and `ThrowOnError = true`, `PersistentSubscriptionBase.Nack`
re-threw the exception before reaching `_handleEventProcessingFailure`. Because
`Nack` is only ever entered through that re-throw path, the failure handler was
unreachable: the subscription dropped on every failure instead of issuing a
server-side Nack and letting KurrentDB retry/park the message.
Removing the early throw lets the default failure handler send `Nack(Retry, ...)`
so the server retries up to `MaxRetryCount` and parks afterwards, while the
subscription stays connected.
Adds an integration test that produces an event, fails the handler with
`maxRetryCount: 0`, and asserts the message lands on
`$persistentsubscription-{stream}::{group}-parked` (read with `resolveLinkTos: true`).
Closes #544
Review Summary by QodoFix persistent subscription nack not invoking failure handler
WalkthroughsDescription• Removes early throw in Nack method that prevented failure handler execution • Allows server-side retry and message parking when handler fails with ThrowOnError = true • Adds integration test verifying failed messages are parked correctly • Exposes SubscriptionId and Client in fixture for test access Diagramflowchart LR
A["Handler throws<br/>ThrowOnError=true"] --> B["Nack called"]
B --> C["Remove early throw"]
C --> D["Execute failure handler"]
D --> E["Send Nack Retry<br/>to server"]
E --> F["Server retries<br/>up to MaxRetryCount"]
F --> G["Message parked<br/>Subscription stays connected"]
File Changes1. src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/PersistentSubscriptionBase.cs
|
Code Review by Qodo
1. await missing .NoContext()
|
- Updates OpenTelemetry packages from 1.13.x to 1.15.x to clear GHSA-g94r-2vxg-569j (excessive memory allocation in baggage/B3/Jaeger propagation header parsing). 1.15.3 is the first patched version per the advisory. - Bumps OpenTelemetry.Instrumentation.AspNetCore to 1.15.2 (latest stable) and OpenTelemetry.Instrumentation.GrpcNetClient to 1.15.1-beta.1 (latest beta). - Switches the preview workflow to publish to nuget.org with NUGET_API_KEY, mirroring the release pipeline. MyGet is currently unreachable, and dev-branch builds are MinVer-tagged as previews so they're safe to push to nuget.org.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 33560109d3
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| } finally { | ||
| await fixture.DisposeAsync(); |
There was a problem hiding this comment.
Stop started subscription in teardown
This test explicitly starts a persistent subscription with autoStart: false, but the finally block only calls DisposeAsync(). In this fixture, DisposeAsync() only invokes Stop() when autoStart is true, so the subscription remains active after the test. In longer test runs this can leak active consumers/connections and introduce cross-test flakiness; stop the subscription explicitly before disposing.
Useful? React with 👍 / 👎.
- Drops the MyGet push from publish.yml; MyGet is unreachable and the release pipeline now publishes only to nuget.org. - Replaces the long-lived NUGET_API_KEY secret with NuGet OIDC trusted publishing via the NuGet/login@v1 action. Adds id-token: write at the job level so the runner can mint the OIDC token for the trust profile configured on nuget.org. - NuGet user defaults to 'Eventuous' but is overridable via the NUGET_USER repository variable. Once this is green the NUGET_API_KEY and MYGET_* secrets/variables can be revoked from the repository settings.
- Quote the api-key substitution to be defensive against future key formats. - Drop the leftover NUGET_AUTH_TOKEN env. It's a NuGet config-provider variable, not consumed by `dotnet nuget push` against nuget.org, and next to a trusted-publishing step it's actively misleading.
Three real issues from reviewers, plus one transitive-vuln cleanup. Skipped: the suggestion to add `.NoContext()` everywhere in the new test — that convention is for library async paths, not test code. * Log Ack-stage failures in PersistentSubscriptionBase.Nack Handler-pipeline failures are still logged exactly once (via `context.Nack` inside `EventSubscription.Handler`); anything that reaches `Nack` without `HasFailed()` (e.g. an `Ack` throwing after the handler returned) now gets its own `MessageHandlingFailed` entry, so it no longer fails silently before being routed to the failure handler. * Unwrap SubscriptionException in DefaultEventProcessingFailureHandler When `ThrowOnError = true`, `Handler` wraps the original exception in `SubscriptionException`. The default failure handler used `exception.Message` for the Nack reason, which made parked messages carry the generic "Error processing event ..." string. Now it prefers `InnerException.Message` when available so the parked reason carries the actual handler error. * Stop the persistent subscription before disposing in the new test The fixture only auto-stops when `autoStart: true`, so the test was leaking a running subscription. Track whether `Start()` succeeded and call `Stop()` in the `finally` before `DisposeAsync()`. Also tighten the parked-event helper to throw `TimeoutException` on timeout instead of returning null, removing the awkward `parked!.Value` chain. * Pin OpenTelemetry.Api in Eventuous.KurrentDB KurrentDB.Client transitively pulls OpenTelemetry.Api 1.12.0, which is also in scope for GHSA-g94r-2vxg-569j. Adding a direct PackageReference forces the central pinned 1.15.3 across Eventuous.KurrentDB and every project that consumes it, clearing the remaining NU1902 warnings without enabling repo-wide transitive pinning (which cascades into unrelated framework-version downgrades).
Test Results 66 files + 44 66 suites +44 40m 4s ⏱️ + 25m 13s Results for commit 8f20989. ± Comparison against base commit 2896e22. This pull request removes 5 and adds 16 tests. Note that renamed tests count towards both.♻️ This comment has been updated with latest results. |
`SpyglassRegistry.Aggregates` was a plain `List<T>` mutated from `[ModuleInitializer]` methods. When multiple sample assemblies are referenced from the same test process and TUnit loads/initialises them in parallel (which happens whenever more than one test forces an assembly via `RuntimeHelpers.RunModuleConstructor`), the concurrent `List.Add` calls race and entries are lost. That's the root cause of the flaky `Aggregates_contains_payment_state_as_standalone` test seen on the .NET 8 leg of CI: the `PaymentState` registration sometimes never made it into the list. Switch the backing store to a copy-on-write `SpyglassAggregateInfo[]` under a lock for writes, and read from the snapshot on every query. Lookups remain allocation-free for the hit path, registrations are serialised, and readers always see a consistent snapshot.
Summary
Four things in this branch:
1. Persistent subscription failure handler unreachable (closes #544)
throwinPersistentSubscriptionBase.Nackthat made_handleEventProcessingFailureunreachable, so the configured (or default) failure handler now actually runs when the handler throws withThrowOnError = true.Nack(Retry, ...)→ retries up toMaxRetryCount→ message parked. Subscription stays connected.maxRetryCount: 0, and asserts the message lands on$persistentsubscription-{stream}::{group}-parked(read withresolveLinkTos: true).PersistentSubscriptionFixtureexposesSubscriptionIdandClientso the test can address the parked stream.The reachability bug, in case it's useful for review:
Nackis only entered from thecatchinHandleEvent, that catch only fires whenHandlerre-throws, andHandleronly re-throws whenThrowOnError = true. Soif (Options.ThrowOnError) throw exception;was effectively unconditional, and the failure handler below it was dead code.2. OpenTelemetry 1.13.x → 1.15.x
Clears
GHSA-g94r-2vxg-569j(excessive memory allocation parsing baggage/B3/Jaeger headers — fixed in 1.15.3). All1.13.xreferences bumped, with the two beta-only packages on the matching 1.15.x betas.3. Preview pipeline: MyGet → nuget.org
MyGet has been unreachable, so the dev-branch publish job now mirrors the release pipeline and pushes to nuget.org (Release config + symbols). MinVer keeps preview builds versioned as pre-releases, so they're safe to land on nuget.org.
4. Release pipeline: trusted publishing, drop MyGet
dotnet nuget pushto MyGet frompublish.yml.NuGet/login@v1. Addsid-token: writeat the job level so the runner can mint the OIDC token for the profile already configured on nuget.org.Eventuousbut can be overridden via theNUGET_USERrepository variable.Once this lands and the next release runs green, the
NUGET_API_KEY,MYGET_API_KEY, andMYGET_FEED_NAMEsecrets/variables can be revoked from the repo settings. (The preview workflow still usesNUGET_API_KEY; if you want to flip preview to trusted publishing too, just say the word and I'll mirror the change there.)Test plan
Esdb_ShouldParkMessageWhenHandlerFailstest passes (was red before the fix)HandlerFailureResubscribe(catch-up subscription drop & resubscribe path) still passesSubscribeAndProduceMany) still passNuGet/loginsucceeds anddotnet nuget pushaccepts the short-lived key