Add AF5 DeadLetterManager and RSocket DLQ responder#152
Conversation
CodeDrivenMitch
left a comment
There was a problem hiding this comment.
Looks good overall, I've been testing with it, nice work.
Requesting changes to move the discover() method to a start handler, to reduce runtime load. I will push something I have locally to this branch, see if you like it or want to adapt it.
In addition, I see no tests at all, which would be preferable to have.
| } | ||
|
|
||
| @Suppress("UNCHECKED_CAST") | ||
| private fun discover(): List<DlqEntry> { |
There was a problem hiding this comment.
This call is done on every processor report, and every dlq call.
While fine in essence, the configuration of the application does not change runtime.
We can do this once, in an onStart handler, and keep it as a map/list in this component :)
… keep the per-poll payload small (a 1000-letter cap caused ~7s refresh cycles on local DLQs at page-size 25).
|
CodeDrivenMitch
left a comment
There was a problem hiding this comment.
Revising the PR I noticed that we completely lost all dlq customization settings that we had in AF4. In addition, there's a bug regarding sequence identifiers. We'll have to change that before we merge.
| // a stable identifier from the first letter's message id and apply it to every | ||
| // letter in the sequence. Operations look up sequences by walking deadLetters() | ||
| // and matching this synthetic id — see findSequence(...). | ||
| val syntheticSequenceId = letters.firstOrNull()?.message()?.identifier() ?: "" |
There was a problem hiding this comment.
There are several problems with this approach:
- The sequence identifier changes when retrying a message successfully. This is an UX problem.
- The sequence identifier is not recognizable to the user. Message IDs are UUIDs that don't relate to anything.
Axon 4 didn't expose the sequence neither. It recalculated the sequence identifier based on configuration;
private fun sequenceIdentifierFor(
processingGroup: String,
letter: DeadLetter<out EventMessage<*>>
): String = eventProcessingConfig
.sequencingPolicy(processingGroup)
.getSequenceIdentifierFor(letter.message())
?.let {
if (it is String) it else it.hashCode().toString()
}
?: letter.message().identifier
We should to that here as well.
| private fun discover(): List<DlqEntry> = entries ?: discoverEntries().also { entries = it } | ||
|
|
||
| private fun DeadLetter<out EventMessage>.toApiLetter(sequenceIdentifier: String): ApiDeadLetter { | ||
| val message = this.message() |
There was a problem hiding this comment.
We have completely lost of DLQ settings that were included in AF4, from AxoniqConsoleDlqMode. We should maintain that.
| processingGroup: String, | ||
| offset: Int = 0, | ||
| size: Int = 25, | ||
| // Per-sequence cap intentionally small. The list query is meant to give the platform UI |
There was a problem hiding this comment.
I think we can drop this huge comment. Probably AI output?
| class DeadLetterManagerTest { | ||
|
|
||
| // --------------------------------------------------------------------------------------- | ||
| // Discovery / processing group naming |
There was a problem hiding this comment.
Better to use @Nested from junit
| * fakes with mockk; the real DLQ implementation isn't on the classpath we want to exercise. | ||
| * | ||
| * Intentionally out of scope: | ||
| * - `findDeadLetterProcessor` reflective walk over `EventHandlingComponent` decorators — that |
There was a problem hiding this comment.
I understand the point, but if I had to choose between unit and integration tests for this module, integration wins every time.
The integtration with AF5 is very complex, and configuration internals might change which we depend on. So a test in the following fashion, but for deadletters, would actually be an amazing benefit; https://github.com/AxonIQ/platform-framework-client/blob/4fc458613fa5b573dfddf467edaf440d86cf9017/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderIntegrationTest.kt



Adds a DeadLetterManager that operates on the AF5 SequencedDeadLetterQueue API, with native paginated reads for JdbcSequencedDeadLetterQueue via PagingJdbcIterable, an in-memory fallback for the default variant, and unwrapping of the CachingSequencedDeadLetterQueue wrapper.
The new RSocketDlqResponder exposes the full DLQ surface over RSocket - sequence list, paginated SEQUENCE_LETTERS route, retry/process-many, delete/delete-many, single-letter delete - and returns counts so the platform UI can confirm how many letters actually moved.
AxoniqPlatformDeadLetterConfigurerEnhancer auto-wires DLQ support into every PooledStreamingEventProcessor, and ProcessingGroupInfoSource exposes sibling EventHandlingComponents under the same processor group.|
Relates: https://github.com/AxonIQ/axoniq-platform/pull/578