From 2c27de7adce8de11777d36a1baf2b8d83ca3f118 Mon Sep 17 00:00:00 2001 From: Raezil Date: Thu, 21 May 2026 19:26:18 +0200 Subject: [PATCH] feat: add dead letter queue with panic recovery Failed and panicking handlers are routed to an opt-in DeadLetterQueue (store.DLQ = GoEventBus.NewDeadLetterQueue()) instead of being silently counted in errorCount. Panics are wrapped as errors with errors.Is/As support and do not kill worker goroutines or the calling goroutine. New API: DeadLetter struct, DeadLetterQueue with Len/Entries/Drain/Replay. Replay re-subscribes all entries, increments Attempts, and keeps entries that fail to re-enqueue. Recovery moved from worker() into execute() so sync-mode panics are also caught. README updated with full DLQ section. Co-Authored-By: Claude Sonnet 4.6 --- README.MD | 117 +++++++++++++++++++++++ dlq.go | 96 +++++++++++++++++++ dlq_test.go | 250 ++++++++++++++++++++++++++++++++++++++++++++++++++ eventstore.go | 34 +++++-- 4 files changed, 491 insertions(+), 6 deletions(-) create mode 100644 dlq.go create mode 100644 dlq_test.go diff --git a/README.MD b/README.MD index 2d665ab..d787ace 100644 --- a/README.MD +++ b/README.MD @@ -18,6 +18,7 @@ A high-performance, in-memory, lock-free event bus for Go — built on a cache-l - [Lifecycle Hooks](#lifecycle-hooks) - [Back-pressure Policies](#back-pressure-policies) - [Async Mode](#async-mode) +- [Dead Letter Queue](#dead-letter-queue) - [Transactions](#transactions) - [Scheduling](#scheduling) - [API Reference](#api-reference) @@ -35,6 +36,7 @@ A high-performance, in-memory, lock-free event bus for Go — built on a cache-l - **Middleware** — wrap handlers with logging, tracing, retries, or any cross-cutting behaviour - **Lifecycle hooks** — `OnBefore`, `OnAfter`, `OnError` for observability without touching handler logic - **Back-pressure** — choose `DropOldest`, `Block`, or `ReturnError` per store +- **Dead letter queue** — failed and panicking events are routed to an inspectable, replayable secondary store instead of being silently counted - **Transactions** — batch events and commit or roll back as a unit - **Scheduling** — fire events at a future `time.Time` or after a `time.Duration` - **Metrics** — published, processed, and error counters via a single call @@ -237,6 +239,88 @@ if err := store.Drain(ctx); err != nil { --- +## Dead Letter Queue + +Attach a `DeadLetterQueue` to an `EventStore` and every event whose handler returns an error **or panics** is captured there instead of being silently swallowed into the error counter. Panics are wrapped as errors with full `errors.Is` / `errors.As` support when the original panic value was itself an `error`. + +```go +store := GoEventBus.NewEventStore(&disp, 1<<16, GoEventBus.DropOldest) +store.DLQ = GoEventBus.NewDeadLetterQueue() +``` + +### Inspecting failures + +```go +store.Publish() + +for _, dl := range store.DLQ.Entries() { + fmt.Printf("event=%s err=%v attempts=%d failed=%s\n", + dl.Event.ID, dl.Err, dl.Attempts, dl.FailedAt.Format(time.RFC3339)) +} +``` + +`Entries()` returns a snapshot copy — mutations to the slice do not affect the queue. + +### Draining + +```go +failed := store.DLQ.Drain() // empties the queue and returns all entries +``` + +### Replaying + +`Replay` re-subscribes all entries and calls `Publish` once after. Each entry has its `Attempts` counter incremented. Entries that fail to re-enqueue (e.g. buffer full) are kept in the queue and the first `Subscribe` error is returned. + +```go +if err := store.DLQ.Replay(ctx, store); err != nil { + log.Printf("replay partially failed: %v", err) +} +``` + +A common pattern is to gate replays on `Attempts` to avoid infinite retry loops: + +```go +const maxAttempts = 3 + +for _, dl := range store.DLQ.Drain() { + if dl.Attempts >= maxAttempts { + log.Printf("dropping %s after %d attempts: %v", dl.Event.ID, dl.Attempts, dl.Err) + continue + } + _ = store.Subscribe(ctx, dl.Event) +} +store.Publish() +``` + +### Fan-out and the DLQ + +Each handler in a fan-out is independent. If handler A fails and handler B succeeds, only A's invocation produces a dead letter — B is unaffected. + +```go +disp.Register("order.placed", + auditLogger, // fails -> one dead letter + invoiceHandler, // succeeds -> no dead letter +) +``` + +### Panic recovery + +The DLQ also catches handler panics. In sync and async modes alike, the panic is converted to an error, routed to the DLQ and error hooks, and execution continues. The worker pool is never killed by a misbehaving handler. + +```go +disp.Register("risky", func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { + panic("something went wrong") // caught — not fatal +}) + +store.DLQ = GoEventBus.NewDeadLetterQueue() +store.Publish() + +dl := store.DLQ.Entries()[0] +fmt.Println(dl.Err) // "handler panic: something went wrong" +``` + +--- + ## Transactions Group multiple events into a single commit. If any handler returns an error, `Commit` stops and returns that error. Call `Rollback` to discard buffered events without publishing. @@ -351,6 +435,13 @@ func NewEventStore(dispatcher *Dispatcher, bufferSize uint64, policy OverrunPoli Panics if `dispatcher` is nil or `bufferSize` is not a non-zero power of two. +**Fields** + +| Field | Type | Description | +|---|---|---| +| `Async` | `bool` | Enable async worker pool dispatch | +| `DLQ` | `*DeadLetterQueue` | Optional dead letter queue; `nil` by default | + #### Methods | Method | Description | @@ -371,6 +462,32 @@ Panics if `dispatcher` is nil or `bufferSize` is not a non-zero power of two. Returns a `*Transaction` scoped to this store. +### `DeadLetter` + +```go +type DeadLetter struct { + Event Event + Err error // handler error, or a wrapped panic value + FailedAt time.Time + Attempts int // 1 on first failure; incremented on each Replay call +} +``` + +### `DeadLetterQueue` + +```go +func NewDeadLetterQueue() *DeadLetterQueue +``` + +| Method | Description | +|---|---| +| `Len() int` | Number of entries currently in the queue | +| `Entries() []DeadLetter` | Snapshot copy — safe to iterate without holding a lock | +| `Drain() []DeadLetter` | Remove and return all entries, leaving the queue empty | +| `Replay(ctx, store) error` | Re-subscribe all entries, call `Publish`, keep failures in queue | + +Attach to a store via `store.DLQ = GoEventBus.NewDeadLetterQueue()`. When `DLQ` is `nil` (default) behaviour is unchanged. + ### `Transaction` | Method | Description | diff --git a/dlq.go b/dlq.go new file mode 100644 index 0000000..ec31632 --- /dev/null +++ b/dlq.go @@ -0,0 +1,96 @@ +package GoEventBus + +import ( + "context" + "sync" + "time" +) + +// DeadLetter holds a failed event and the reason it could not be processed. +type DeadLetter struct { + Event Event + Err error // handler error, or a wrapped panic value + FailedAt time.Time + Attempts int // 1 on first failure; incremented on each Replay call +} + +// DeadLetterQueue is a thread-safe store for events that failed during dispatch. +// Attach one to an EventStore via store.DLQ to enable dead-letter routing. +// +// store := GoEventBus.NewEventStore(&disp, 1<<16, GoEventBus.DropOldest) +// store.DLQ = GoEventBus.NewDeadLetterQueue() +type DeadLetterQueue struct { + mu sync.Mutex + entries []DeadLetter +} + +// NewDeadLetterQueue returns an empty DeadLetterQueue ready for use. +func NewDeadLetterQueue() *DeadLetterQueue { + return &DeadLetterQueue{} +} + +func (q *DeadLetterQueue) add(dl DeadLetter) { + q.mu.Lock() + q.entries = append(q.entries, dl) + q.mu.Unlock() +} + +// Len returns the number of dead letters currently in the queue. +func (q *DeadLetterQueue) Len() int { + q.mu.Lock() + defer q.mu.Unlock() + return len(q.entries) +} + +// Entries returns a snapshot copy of all dead letters without removing them. +func (q *DeadLetterQueue) Entries() []DeadLetter { + q.mu.Lock() + defer q.mu.Unlock() + out := make([]DeadLetter, len(q.entries)) + copy(out, q.entries) + return out +} + +// Drain removes and returns all dead letters, leaving the queue empty. +func (q *DeadLetterQueue) Drain() []DeadLetter { + q.mu.Lock() + defer q.mu.Unlock() + out := q.entries + q.entries = nil + return out +} + +// Replay re-enqueues all dead letters into store for reprocessing and calls +// store.Publish() once after. Each entry has its Attempts incremented before +// re-subscribing. Entries that fail to re-enqueue (e.g. buffer full) are kept +// in the queue. Returns the first Subscribe error encountered, or nil. +func (q *DeadLetterQueue) Replay(ctx context.Context, store *EventStore) error { + q.mu.Lock() + entries := q.entries + q.entries = nil + q.mu.Unlock() + + var failed []DeadLetter + var firstErr error + for _, dl := range entries { + dl.Attempts++ + if err := store.Subscribe(ctx, dl.Event); err != nil { + if firstErr == nil { + firstErr = err + } + dl.Err = err + failed = append(failed, dl) + } + } + + if len(failed) > 0 { + q.mu.Lock() + q.entries = append(failed, q.entries...) + q.mu.Unlock() + } + + if firstErr == nil { + store.Publish() + } + return firstErr +} diff --git a/dlq_test.go b/dlq_test.go new file mode 100644 index 0000000..bb176be --- /dev/null +++ b/dlq_test.go @@ -0,0 +1,250 @@ +package GoEventBus + +import ( + "context" + "errors" + "strings" + "sync/atomic" + "testing" + "time" +) + +// TestDLQ_HandlerError verifies that a handler returning an error routes the +// event to the DLQ with Attempts==1 and the correct error. +func TestDLQ_HandlerError(t *testing.T) { + sentinel := errors.New("boom") + disp := Dispatcher{} + disp.Register("evt", func(_ context.Context, ev Event) (Result, error) { + return Result{}, sentinel + }) + + es := NewEventStore(&disp, 8, DropOldest) + es.DLQ = NewDeadLetterQueue() + + _ = es.Subscribe(context.Background(), Event{ID: "e1", Projection: "evt"}) + es.Publish() + + if es.DLQ.Len() != 1 { + t.Fatalf("expected 1 dead letter; got %d", es.DLQ.Len()) + } + dl := es.DLQ.Entries()[0] + if dl.Event.ID != "e1" { + t.Errorf("wrong event ID: %q", dl.Event.ID) + } + if !errors.Is(dl.Err, sentinel) { + t.Errorf("expected sentinel error; got %v", dl.Err) + } + if dl.Attempts != 1 { + t.Errorf("expected Attempts==1; got %d", dl.Attempts) + } + if dl.FailedAt.IsZero() { + t.Error("FailedAt should not be zero") + } +} + +// TestDLQ_HandlerPanic verifies that a panicking handler routes the event to +// the DLQ with a wrapped panic error and does not crash the process. +func TestDLQ_HandlerPanic(t *testing.T) { + disp := Dispatcher{} + disp.Register("evt", func(_ context.Context, ev Event) (Result, error) { + panic("something exploded") + }) + + es := NewEventStore(&disp, 8, DropOldest) + es.DLQ = NewDeadLetterQueue() + + _ = es.Subscribe(context.Background(), Event{ID: "p1", Projection: "evt"}) + es.Publish() // must not panic + + if es.DLQ.Len() != 1 { + t.Fatalf("expected 1 dead letter; got %d", es.DLQ.Len()) + } + dl := es.DLQ.Entries()[0] + if !strings.Contains(dl.Err.Error(), "handler panic") { + t.Errorf("expected panic wrapper in error; got %v", dl.Err) + } + if !strings.Contains(dl.Err.Error(), "something exploded") { + t.Errorf("expected original panic message in error; got %v", dl.Err) + } +} + +// TestDLQ_PanicWrapsError verifies that a panic(err) (not a string) is wrapped +// with errors.Is semantics. +func TestDLQ_PanicWrapsError(t *testing.T) { + sentinel := errors.New("inner error") + disp := Dispatcher{} + disp.Register("evt", func(_ context.Context, ev Event) (Result, error) { + panic(sentinel) + }) + + es := NewEventStore(&disp, 8, DropOldest) + es.DLQ = NewDeadLetterQueue() + + _ = es.Subscribe(context.Background(), Event{Projection: "evt"}) + es.Publish() + + dl := es.DLQ.Entries()[0] + if !errors.Is(dl.Err, sentinel) { + t.Errorf("expected errors.Is to find sentinel; got %v", dl.Err) + } +} + +// TestDLQ_NilDLQ verifies that behaviour is unchanged when no DLQ is attached. +func TestDLQ_NilDLQ(t *testing.T) { + sentinel := errors.New("fail") + disp := Dispatcher{} + disp.Register("evt", func(_ context.Context, ev Event) (Result, error) { + return Result{}, sentinel + }) + + es := NewEventStore(&disp, 8, DropOldest) + // DLQ intentionally not set + + _ = es.Subscribe(context.Background(), Event{Projection: "evt"}) + es.Publish() // must not panic + + _, _, errs := es.Metrics() + if errs != 1 { + t.Errorf("expected errorCount==1; got %d", errs) + } +} + +// TestDLQ_Entries_IsCopy verifies that Entries returns a copy so mutations +// to the returned slice do not affect the queue. +func TestDLQ_Entries_IsCopy(t *testing.T) { + q := NewDeadLetterQueue() + q.add(DeadLetter{Event: Event{ID: "x"}, Attempts: 1}) + + snap := q.Entries() + snap[0].Attempts = 99 + + if q.Entries()[0].Attempts == 99 { + t.Error("Entries() returned a reference, not a copy") + } +} + +// TestDLQ_Drain empties the queue and returns all entries. +func TestDLQ_Drain(t *testing.T) { + q := NewDeadLetterQueue() + q.add(DeadLetter{Event: Event{ID: "a"}, Attempts: 1}) + q.add(DeadLetter{Event: Event{ID: "b"}, Attempts: 1}) + + out := q.Drain() + if len(out) != 2 { + t.Fatalf("expected 2 entries; got %d", len(out)) + } + if q.Len() != 0 { + t.Errorf("queue should be empty after Drain; len=%d", q.Len()) + } +} + +// TestDLQ_Replay re-enqueues dead letters and verifies they are reprocessed. +func TestDLQ_Replay(t *testing.T) { + var calls atomic.Int32 + disp := Dispatcher{} + disp.Register("evt", func(_ context.Context, ev Event) (Result, error) { + calls.Add(1) + return Result{}, nil + }) + + es := NewEventStore(&disp, 16, DropOldest) + es.DLQ = NewDeadLetterQueue() + + // Manually seed the DLQ with two entries. + es.DLQ.add(DeadLetter{Event: Event{ID: "d1", Projection: "evt"}, Attempts: 1}) + es.DLQ.add(DeadLetter{Event: Event{ID: "d2", Projection: "evt"}, Attempts: 1}) + + if err := es.DLQ.Replay(context.Background(), es); err != nil { + t.Fatalf("Replay returned error: %v", err) + } + + // Replay calls Publish internally. + if es.DLQ.Len() != 0 { + t.Errorf("expected empty DLQ after successful replay; len=%d", es.DLQ.Len()) + } + if calls.Load() != 2 { + t.Errorf("expected 2 handler calls; got %d", calls.Load()) + } +} + +// TestDLQ_Replay_IncrementsAttempts verifies that Replay increments Attempts +// on entries that fail to re-subscribe and keeps them in the queue. +func TestDLQ_Replay_IncrementsAttempts(t *testing.T) { + disp := Dispatcher{} + // Buffer of 1 — pre-fill it so Subscribe always returns ErrBufferFull. + es := NewEventStore(&disp, 1, ReturnError) + es.DLQ = NewDeadLetterQueue() + // Pre-fill the single slot. + _ = es.Subscribe(context.Background(), Event{Projection: "x"}) + + es.DLQ.add(DeadLetter{Event: Event{ID: "stuck", Projection: "x"}, Attempts: 1}) + + err := es.DLQ.Replay(context.Background(), es) + if !errors.Is(err, ErrBufferFull) { + t.Fatalf("expected ErrBufferFull; got %v", err) + } + if es.DLQ.Len() != 1 { + t.Fatalf("failed entry should remain in DLQ; len=%d", es.DLQ.Len()) + } + if es.DLQ.Entries()[0].Attempts != 2 { + t.Errorf("expected Attempts==2 after failed replay; got %d", es.DLQ.Entries()[0].Attempts) + } +} + +// TestDLQ_Async verifies DLQ routing works in async mode. +func TestDLQ_Async(t *testing.T) { + sentinel := errors.New("async fail") + disp := Dispatcher{} + disp.Register("evt", func(_ context.Context, ev Event) (Result, error) { + return Result{}, sentinel + }) + + es := NewEventStore(&disp, 16, DropOldest) + es.DLQ = NewDeadLetterQueue() + es.Async = true + + for i := 0; i < 5; i++ { + _ = es.Subscribe(context.Background(), Event{Projection: "evt"}) + } + es.Publish() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + if err := es.Drain(ctx); err != nil { + t.Fatalf("Drain: %v", err) + } + + if es.DLQ.Len() != 5 { + t.Errorf("expected 5 dead letters; got %d", es.DLQ.Len()) + } + for _, dl := range es.DLQ.Entries() { + if !errors.Is(dl.Err, sentinel) { + t.Errorf("unexpected error: %v", dl.Err) + } + } +} + +// TestDLQ_FanOut_PartialFailure verifies that in a fan-out, only the failing +// handler's event is dead-lettered; the successful handler is not. +func TestDLQ_FanOut_PartialFailure(t *testing.T) { + sentinel := errors.New("partial fail") + var goodCalled atomic.Int32 + disp := Dispatcher{} + disp.Register("evt", + func(_ context.Context, ev Event) (Result, error) { return Result{}, sentinel }, + func(_ context.Context, ev Event) (Result, error) { goodCalled.Add(1); return Result{}, nil }, + ) + + es := NewEventStore(&disp, 8, DropOldest) + es.DLQ = NewDeadLetterQueue() + + _ = es.Subscribe(context.Background(), Event{ID: "fanout", Projection: "evt"}) + es.Publish() + + if es.DLQ.Len() != 1 { + t.Errorf("expected 1 dead letter (only the failing handler); got %d", es.DLQ.Len()) + } + if goodCalled.Load() != 1 { + t.Errorf("expected successful handler to run once; got %d", goodCalled.Load()) + } +} diff --git a/eventstore.go b/eventstore.go index c7cffe2..fec586e 100644 --- a/eventstore.go +++ b/eventstore.go @@ -3,6 +3,7 @@ package GoEventBus import ( "context" "errors" + "fmt" "runtime" "sync" "sync/atomic" @@ -105,6 +106,9 @@ type EventStore struct { // txMu serialises Rollback against concurrent Subscribe calls. txMu sync.Mutex + + // DLQ, when non-nil, receives every event that fails or panics during dispatch. + DLQ *DeadLetterQueue } // NewEventStore initializes a new EventStore. It spins up a default worker pool. @@ -136,12 +140,7 @@ func NewEventStore(dispatcher *Dispatcher, bufferSize uint64, policy OverrunPoli func (es *EventStore) worker() { for w := range es.workCh { func(work eventWork) { - defer func() { - if r := recover(); r != nil { - atomic.AddUint64(&es.errorCount, 1) - } - es.wg.Done() - }() + defer es.wg.Done() es.execute(work.handler, work.ev) }(w) } @@ -237,12 +236,32 @@ func (es *EventStore) Publish() { } // execute runs the handler with middleware and hooks. +// It recovers from panics, treating them as errors so the DLQ and error hooks +// still fire and the caller (sync or worker goroutine) is never killed. func (es *EventStore) execute(h HandlerFunc, ev Event) { ctx := ev.Ctx if ctx == nil { ctx = context.Background() } + defer func() { + if r := recover(); r != nil { + var panicErr error + if e, ok := r.(error); ok { + panicErr = fmt.Errorf("handler panic: %w", e) + } else { + panicErr = fmt.Errorf("handler panic: %v", r) + } + atomic.AddUint64(&es.errorCount, 1) + if es.DLQ != nil { + es.DLQ.add(DeadLetter{Event: ev, Err: panicErr, FailedAt: time.Now(), Attempts: 1}) + } + for _, hook := range es.errorHooks { + hook(ctx, ev, panicErr) + } + } + }() + for _, hook := range es.beforeHooks { hook(ctx, ev) } @@ -258,6 +277,9 @@ func (es *EventStore) execute(h HandlerFunc, ev Event) { } if err != nil { atomic.AddUint64(&es.errorCount, 1) + if es.DLQ != nil { + es.DLQ.add(DeadLetter{Event: ev, Err: err, FailedAt: time.Now(), Attempts: 1}) + } for _, hook := range es.errorHooks { hook(ctx, ev, err) }