From 70f961ec8c9031ad0035a8da851425f85a73cdda Mon Sep 17 00:00:00 2001 From: Raezil Date: Thu, 21 May 2026 19:16:24 +0200 Subject: [PATCH 1/2] =?UTF-8?q?feat:=20implement=20fan-out=20=E2=80=94=20m?= =?UTF-8?q?ultiple=20handlers=20per=20Dispatcher=20projection?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Dispatcher type changes from map[interface{}]HandlerFunc to map[interface{}][]HandlerFunc. A new Register(projection, ...HandlerFunc) method accumulates handlers for the same key. Publish() and Transaction.Commit() iterate over all handlers per event projection; each handler gets its own middleware chain, before/after/error hooks, and processedCount increment, so subscribers are fully independent — an error in one does not prevent siblings from running. Examples updated to use Register. Tests updated with hs() helper and three new fan-out tests covering sync, error-independence, and async modes. Co-Authored-By: Claude Sonnet 4.6 --- eventstore.go | 33 +++-- eventstore_test.go | 242 ++++++++++++++++++++++--------- examples/drop_oldest/main.go | 11 +- examples/fasthttp/main.go | 37 +++-- examples/handler_timeout/main.go | 31 ++-- examples/middleware/main.go | 8 +- transaction.go | 12 +- transaction_test.go | 14 +- 8 files changed, 248 insertions(+), 140 deletions(-) diff --git a/eventstore.go b/eventstore.go index cb6bb26..c7cffe2 100644 --- a/eventstore.go +++ b/eventstore.go @@ -45,8 +45,15 @@ type BeforeHook func(context.Context, Event) type AfterHook func(context.Context, Event, Result, error) type ErrorHook func(context.Context, Event, error) -// Dispatcher maps event projections to handler functions. -type Dispatcher map[interface{}]HandlerFunc +// Dispatcher maps event projections to one or more handler functions. +// Multiple handlers registered for the same projection are called in order (fan-out). +type Dispatcher map[interface{}][]HandlerFunc + +// Register appends one or more handlers for the given projection. +// Calling Register multiple times on the same key accumulates handlers. +func (d Dispatcher) Register(projection interface{}, handlers ...HandlerFunc) { + d[projection] = append(d[projection], handlers...) +} // Event is a unit of work to be dispatched. type Event struct { @@ -211,16 +218,18 @@ func (es *EventStore) Publish() { continue } ev := *p - if handler, ok := disp[ev.Projection]; ok { - if es.Async { - es.wg.Add(1) - select { - case es.workCh <- eventWork{handler, ev}: - case <-es.shutdownSignal: - es.wg.Done() + if handlers, ok := disp[ev.Projection]; ok { + for _, handler := range handlers { + if es.Async { + es.wg.Add(1) + select { + case es.workCh <- eventWork{handler, ev}: + case <-es.shutdownSignal: + es.wg.Done() + } + } else { + es.execute(handler, ev) } - } else { - es.execute(handler, ev) } } } @@ -293,7 +302,7 @@ func (es *EventStore) Schedule(ctx context.Context, t time.Time, e Event) *time. e.Ctx = ctx // look up and run the handler directly disp := *es.dispatcher - if handler, ok := disp[e.Projection]; ok { + for _, handler := range disp[e.Projection] { es.execute(handler, e) } return nil diff --git a/eventstore_test.go b/eventstore_test.go index d61b7ed..b3ac681 100644 --- a/eventstore_test.go +++ b/eventstore_test.go @@ -17,18 +17,21 @@ const size = 1 << 16 // helper context value var bg = context.Background() +// hs wraps a single HandlerFunc into the []HandlerFunc slice that Dispatcher expects. +func hs(fn HandlerFunc) []HandlerFunc { return []HandlerFunc{fn} } + // TestSubscribeAndPublish verifies that events are stored and published correctly. func TestSubscribeAndPublish(t *testing.T) { dispatcher := Dispatcher{} var called1, called2 int32 - dispatcher["evt1"] = func(_ context.Context, ev Event) (Result, error) { + dispatcher.Register("evt1", func(_ context.Context, ev Event) (Result, error) { atomic.AddInt32(&called1, 1) return Result{Message: "ok1"}, nil - } - dispatcher["evt2"] = func(_ context.Context, ev Event) (Result, error) { + }) + dispatcher.Register("evt2", func(_ context.Context, ev Event) (Result, error) { atomic.AddInt32(&called2, 1) return Result{Message: "ok2"}, nil - } + }) es := NewEventStore(&dispatcher, 1<<16, DropOldest) _ = es.Subscribe(bg, Event{ID: "1", Projection: "evt1", Args: nil}) @@ -56,10 +59,10 @@ func TestPublishWithMissingHandler(t *testing.T) { func TestPublishMixedExistingAndNonExisting(t *testing.T) { dispatcher := Dispatcher{} var called int32 - dispatcher["evt"] = func(_ context.Context, ev Event) (Result, error) { + dispatcher.Register("evt", func(_ context.Context, ev Event) (Result, error) { atomic.AddInt32(&called, 1) return Result{}, nil - } + }) es := NewEventStore(&dispatcher, 1<<16, DropOldest) _ = es.Subscribe(bg, Event{ID: "1", Projection: "evt", Args: nil}) _ = es.Subscribe(bg, Event{ID: "2", Projection: "noexist", Args: nil}) @@ -74,10 +77,10 @@ func TestPublishMixedExistingAndNonExisting(t *testing.T) { func TestOverflowBehavior(t *testing.T) { dispatcher := Dispatcher{} var count uint64 - dispatcher["evtOverflow"] = func(_ context.Context, ev Event) (Result, error) { + dispatcher.Register("evtOverflow", func(_ context.Context, ev Event) (Result, error) { atomic.AddUint64(&count, 1) return Result{}, nil - } + }) es := NewEventStore(&dispatcher, 1<<16, DropOldest) for i := 0; i < size+100; i++ { _ = es.Subscribe(bg, Event{ID: "o", Projection: "evtOverflow", Args: nil}) @@ -104,12 +107,12 @@ func TestOverflowReturnError(t *testing.T) { // TestConcurrentSubscribe ensures safety of concurrent subscriptions. func TestConcurrentSubscribe(t *testing.T) { - dispatcher := Dispatcher{"evt": func(_ context.Context, ev Event) (Result, error) { return Result{}, nil }} + dispatcher := Dispatcher{} var count uint64 - dispatcher["evt"] = func(_ context.Context, ev Event) (Result, error) { + dispatcher.Register("evt", func(_ context.Context, ev Event) (Result, error) { atomic.AddUint64(&count, 1) return Result{}, nil - } + }) es := NewEventStore(&dispatcher, 1<<16, DropOldest) var wg sync.WaitGroup const n = 1000 @@ -131,7 +134,7 @@ func TestConcurrentSubscribe(t *testing.T) { // Benchmarks -------------------------------------------------------------- func BenchmarkSubscribe(b *testing.B) { - dispatcher := Dispatcher{"evt": func(_ context.Context, ev Event) (Result, error) { return Result{}, nil }} + dispatcher := Dispatcher{"evt": hs(func(_ context.Context, ev Event) (Result, error) { return Result{}, nil })} es := NewEventStore(&dispatcher, 1<<16, DropOldest) b.ResetTimer() for i := 0; i < b.N; i++ { @@ -140,7 +143,7 @@ func BenchmarkSubscribe(b *testing.B) { } func BenchmarkSubscribeParallel(b *testing.B) { - dispatcher := Dispatcher{"evt": func(_ context.Context, ev Event) (Result, error) { return Result{}, nil }} + dispatcher := Dispatcher{"evt": hs(func(_ context.Context, ev Event) (Result, error) { return Result{}, nil })} es := NewEventStore(&dispatcher, 1<<16, DropOldest) b.RunParallel(func(pb *testing.PB) { for pb.Next() { @@ -150,7 +153,7 @@ func BenchmarkSubscribeParallel(b *testing.B) { } func BenchmarkPublish(b *testing.B) { - dispatcher := Dispatcher{"evt": func(_ context.Context, ev Event) (Result, error) { return Result{}, nil }} + dispatcher := Dispatcher{"evt": hs(func(_ context.Context, ev Event) (Result, error) { return Result{}, nil })} es := NewEventStore(&dispatcher, 1<<16, DropOldest) for i := 0; i < size; i++ { _ = es.Subscribe(bg, Event{ID: "p", Projection: "evt", Args: nil}) @@ -162,7 +165,7 @@ func BenchmarkPublish(b *testing.B) { } func BenchmarkPublishAfterPrefill(b *testing.B) { - dispatcher := Dispatcher{"evt": func(_ context.Context, ev Event) (Result, error) { return Result{}, nil }} + dispatcher := Dispatcher{"evt": hs(func(_ context.Context, ev Event) (Result, error) { return Result{}, nil })} es := NewEventStore(&dispatcher, 1<<16, DropOldest) for i := 0; i < size; i++ { _ = es.Subscribe(bg, Event{ID: "pp", Projection: "evt", Args: nil}) @@ -181,7 +184,7 @@ type LargeStruct struct { } func BenchmarkSubscribeLargePayload(b *testing.B) { - dispatcher := Dispatcher{"evtLarge": func(_ context.Context, ev Event) (Result, error) { return Result{}, nil }} + dispatcher := Dispatcher{"evtLarge": hs(func(_ context.Context, ev Event) (Result, error) { return Result{}, nil })} es := NewEventStore(&dispatcher, 1<<16, DropOldest) var payload LargeStruct @@ -192,7 +195,7 @@ func BenchmarkSubscribeLargePayload(b *testing.B) { } func BenchmarkPublishLargePayload(b *testing.B) { - dispatcher := Dispatcher{"evtLarge": func(_ context.Context, ev Event) (Result, error) { return Result{}, nil }} + dispatcher := Dispatcher{"evtLarge": hs(func(_ context.Context, ev Event) (Result, error) { return Result{}, nil })} es := NewEventStore(&dispatcher, 1<<16, DropOldest) var payload LargeStruct const largeSize = 100 @@ -209,10 +212,10 @@ func BenchmarkPublishLargePayload(b *testing.B) { func TestExactBufferSizeNoOverflow(t *testing.T) { dispatcher := Dispatcher{} var count uint64 - dispatcher["evtExact"] = func(_ context.Context, ev Event) (Result, error) { + dispatcher.Register("evtExact", func(_ context.Context, ev Event) (Result, error) { atomic.AddUint64(&count, 1) return Result{}, nil - } + }) es := NewEventStore(&dispatcher, 1<<16, DropOldest) for i := 0; i < size; i++ { _ = es.Subscribe(bg, Event{ID: strconv.Itoa(i), Projection: "evtExact", Args: nil}) @@ -226,10 +229,10 @@ func TestExactBufferSizeNoOverflow(t *testing.T) { func TestOverflowThreshold(t *testing.T) { dispatcher := Dispatcher{} var count uint64 - dispatcher["evtThresh"] = func(_ context.Context, ev Event) (Result, error) { + dispatcher.Register("evtThresh", func(_ context.Context, ev Event) (Result, error) { atomic.AddUint64(&count, 1) return Result{}, nil - } + }) es := NewEventStore(&dispatcher, 1<<16, DropOldest) for i := 0; i < size+1; i++ { _ = es.Subscribe(bg, Event{ID: strconv.Itoa(i), Projection: "evtThresh", Args: nil}) @@ -243,10 +246,10 @@ func TestOverflowThreshold(t *testing.T) { func TestPublishIdempotent(t *testing.T) { dispatcher := Dispatcher{} var called int32 - dispatcher["evtOnce"] = func(_ context.Context, ev Event) (Result, error) { + dispatcher.Register("evtOnce", func(_ context.Context, ev Event) (Result, error) { atomic.AddInt32(&called, 1) return Result{}, nil - } + }) es := NewEventStore(&dispatcher, 1<<16, DropOldest) _ = es.Subscribe(bg, Event{ID: "1", Projection: "evtOnce", Args: nil}) es.Publish() @@ -261,12 +264,12 @@ func TestEventStore_AsyncDispatch(t *testing.T) { called := 0 dispatcher := Dispatcher{ - "print": func(_ context.Context, ev Event) (Result, error) { + "print": hs(func(_ context.Context, ev Event) (Result, error) { mu.Lock() defer mu.Unlock() called++ return Result{Message: "ok"}, nil - }, + }), } store := NewEventStore(&dispatcher, 1<<16, DropOldest) @@ -287,7 +290,7 @@ func TestEventStore_AsyncDispatch(t *testing.T) { } func BenchmarkEventStore_Async(b *testing.B) { - dispatcher := Dispatcher{"async": func(_ context.Context, ev Event) (Result, error) { return Result{Message: "done"}, nil }} + dispatcher := Dispatcher{"async": hs(func(_ context.Context, ev Event) (Result, error) { return Result{Message: "done"}, nil })} store := NewEventStore(&dispatcher, 1<<16, DropOldest) store.Async = true for i := 0; i < b.N; i++ { @@ -297,7 +300,7 @@ func BenchmarkEventStore_Async(b *testing.B) { } func BenchmarkEventStore_Sync(b *testing.B) { - dispatcher := Dispatcher{"sync": func(_ context.Context, ev Event) (Result, error) { return Result{Message: "done"}, nil }} + dispatcher := Dispatcher{"sync": hs(func(_ context.Context, ev Event) (Result, error) { return Result{Message: "done"}, nil })} store := NewEventStore(&dispatcher, 1<<16, DropOldest) store.Async = false for i := 0; i < b.N; i++ { @@ -308,7 +311,7 @@ func BenchmarkEventStore_Sync(b *testing.B) { // FastHTTP benchmarks updated for new API func benchmarkFastHTTP(b *testing.B, async bool) { - dispatcher := Dispatcher{"evt": func(_ context.Context, ev Event) (Result, error) { return Result{}, nil }} + dispatcher := Dispatcher{"evt": hs(func(_ context.Context, ev Event) (Result, error) { return Result{}, nil })} es := NewEventStore(&dispatcher, 1<<16, DropOldest) es.Async = async @@ -328,7 +331,7 @@ func BenchmarkFastHTTPSync(b *testing.B) { benchmarkFastHTTP(b, false) } func BenchmarkFastHTTPAsync(b *testing.B) { benchmarkFastHTTP(b, true) } func BenchmarkFastHTTPParallel(b *testing.B) { - dispatcher := Dispatcher{"evt": func(_ context.Context, ev Event) (Result, error) { return Result{}, nil }} + dispatcher := Dispatcher{"evt": hs(func(_ context.Context, ev Event) (Result, error) { return Result{}, nil })} es := NewEventStore(&dispatcher, 1<<16, DropOldest) es.Async = true @@ -360,10 +363,10 @@ func TestPublishEmpty(t *testing.T) { func TestArgsPassing(t *testing.T) { dispatcher := Dispatcher{} var received string - dispatcher["echo"] = func(_ context.Context, ev Event) (Result, error) { + dispatcher.Register("echo", func(_ context.Context, ev Event) (Result, error) { received = ev.Args["foo"].(string) return Result{}, nil - } + }) es := NewEventStore(&dispatcher, 1<<16, DropOldest) _ = es.Subscribe(bg, Event{ID: "1", Projection: "echo", Args: map[string]any{"foo": "bar"}}) es.Publish() @@ -375,16 +378,16 @@ func TestArgsPassing(t *testing.T) { func TestDispatcherSnapshot(t *testing.T) { dispatcher := Dispatcher{} var calledOriginal, calledModified int32 - dispatcher["snap"] = func(_ context.Context, ev Event) (Result, error) { + dispatcher.Register("snap", func(_ context.Context, ev Event) (Result, error) { atomic.AddInt32(&calledOriginal, 1) return Result{}, nil - } + }) es := NewEventStore(&dispatcher, 1<<16, DropOldest) - // swap handler - dispatcher["snap"] = func(_ context.Context, ev Event) (Result, error) { + // swap handler slice + dispatcher["snap"] = hs(func(_ context.Context, ev Event) (Result, error) { atomic.AddInt32(&calledModified, 1) return Result{}, nil - } + }) _ = es.Subscribe(bg, Event{ID: "1", Projection: "snap", Args: nil}) es.Publish() if calledOriginal != 0 { @@ -396,7 +399,7 @@ func TestDispatcherSnapshot(t *testing.T) { } func TestEventStore_Metrics(t *testing.T) { - dispatcher := Dispatcher{"metric": func(_ context.Context, ev Event) (Result, error) { return Result{}, nil }} + dispatcher := Dispatcher{"metric": hs(func(_ context.Context, ev Event) (Result, error) { return Result{}, nil })} es := NewEventStore(&dispatcher, 1<<16, DropOldest) _ = es.Subscribe(bg, Event{ID: "1", Projection: "metric", Args: nil}) _ = es.Subscribe(bg, Event{ID: "2", Projection: "metric", Args: nil}) @@ -423,7 +426,7 @@ func noopHandler(counter *uint64) func(context.Context, Event) (Result, error) { // TestMetricsCounts ensures that the published/processed/error counters are accurate. func TestMetricsCounts(t *testing.T) { var invoked uint64 - disp := Dispatcher{"test": noopHandler(&invoked)} + disp := Dispatcher{"test": hs(noopHandler(&invoked))} es := NewEventStore(&disp, 8, DropOldest) // publish 5 successful events @@ -463,7 +466,7 @@ func TestOverrunPolicyReturnError(t *testing.T) { func BenchmarkSubscribePublish(b *testing.B) { bench := func(async bool) { var invoked uint64 - disp := Dispatcher{"bench": noopHandler(&invoked)} + disp := Dispatcher{"bench": hs(noopHandler(&invoked))} es := NewEventStore(&disp, 1024, DropOldest) es.Async = async ctx := context.Background() @@ -494,12 +497,12 @@ func TestContextPropagation(t *testing.T) { var received string disp := Dispatcher{ - "ctx": func(c context.Context, ev Event) (Result, error) { + "ctx": hs(func(c context.Context, ev Event) (Result, error) { if v, ok := c.Value(key).(string); ok { received = v } return Result{}, nil - }, + }), } es := NewEventStore(&disp, 8, DropOldest) @@ -541,9 +544,9 @@ func TestOverrunPolicyBlockRespectsContext(t *testing.T) { // TestErrorMetrics verifies that handler failures are reflected in Metrics(). func TestErrorMetrics(t *testing.T) { - disp := Dispatcher{"boom": func(_ context.Context, ev Event) (Result, error) { + disp := Dispatcher{"boom": hs(func(_ context.Context, ev Event) (Result, error) { return Result{}, errors.New("boom") - }} + })} es := NewEventStore(&disp, 4, DropOldest) _ = es.Subscribe(context.Background(), Event{ID: "e", Projection: "boom"}) @@ -578,7 +581,7 @@ func BenchmarkSubscribeBlockPolicy(b *testing.B) { } func BenchmarkPublishWithErrors(b *testing.B) { - disp := Dispatcher{"err": func(_ context.Context, ev Event) (Result, error) { return Result{}, errors.New("fail") }} + disp := Dispatcher{"err": hs(func(_ context.Context, ev Event) (Result, error) { return Result{}, errors.New("fail") })} es := NewEventStore(&disp, 1<<16, DropOldest) // pre‑populate with events that will all fail @@ -596,9 +599,9 @@ func BenchmarkPublishWithErrors(b *testing.B) { func TestEventStore_SubscribePublish_Sync(t *testing.T) { disp := Dispatcher{} // simple echo handler - disp["echo"] = func(ctx context.Context, ev Event) (Result, error) { + disp.Register("echo", func(ctx context.Context, ev Event) (Result, error) { return Result{Message: ev.Args["msg"].(string)}, nil - } + }) store := NewEventStore(&disp, 8, DropOldest) e := Event{ID: "1", Projection: "echo", Args: map[string]any{"msg": "hello"}} if err := store.Subscribe(context.Background(), e); err != nil { @@ -614,10 +617,9 @@ func TestEventStore_SubscribePublish_Sync(t *testing.T) { // Test middleware chaining. func TestEventStore_Middleware(t *testing.T) { disp := Dispatcher{} - disp["inc"] = func(ctx context.Context, ev Event) (Result, error) { - // return current value + disp.Register("inc", func(ctx context.Context, ev Event) (Result, error) { return Result{Message: ""}, nil - } + }) store := NewEventStore(&disp, 8, DropOldest) // middleware increments counter before handler store.Use(func(next HandlerFunc) HandlerFunc { @@ -641,9 +643,9 @@ func TestEventStore_Middleware(t *testing.T) { func TestEventStore_Hooks(t *testing.T) { disp := Dispatcher{} errorMsg := "handler error" - disp["fail"] = func(ctx context.Context, ev Event) (Result, error) { + disp.Register("fail", func(ctx context.Context, ev Event) (Result, error) { return Result{}, errors.New(errorMsg) - } + }) store := NewEventStore(&disp, 8, DropOldest) var beforeCalled, afterCalled, errorCalled atomic.Bool @@ -676,10 +678,10 @@ func TestEventStore_Hooks(t *testing.T) { func TestEventStore_SubscribePublishDrainMetrics(t *testing.T) { var processed atomic.Uint64 dispatcher := Dispatcher{ - "testEvent": func(ctx context.Context, ev Event) (Result, error) { + "testEvent": hs(func(ctx context.Context, ev Event) (Result, error) { processed.Add(1) return Result{Message: "ok"}, nil - }, + }), } store := NewEventStore(&dispatcher, 16, DropOldest) store.Async = true @@ -729,7 +731,7 @@ func noOpHandler(ctx context.Context, ev Event) (Result, error) { // setupEventStore initializes an EventStore with the given async flag and buffer size. func setupEventStore(async bool, bufferSize uint64) *EventStore { - disp := Dispatcher{"test": noOpHandler} + disp := Dispatcher{"test": hs(noOpHandler)} es := NewEventStore(&disp, bufferSize, DropOldest) es.Async = async return es @@ -774,10 +776,10 @@ func BenchmarkDrainAsync(b *testing.B) { func TestScheduleAfter(t *testing.T) { var called uint32 dispatcher := Dispatcher{ - "foo": func(ctx context.Context, ev Event) (Result, error) { + "foo": hs(func(ctx context.Context, ev Event) (Result, error) { atomic.StoreUint32(&called, 1) return Result{}, nil - }, + }), } es := NewEventStore(&dispatcher, 8, DropOldest) // schedule 50ms in future @@ -792,10 +794,10 @@ func TestScheduleAfter(t *testing.T) { func TestScheduleAfter_FiresOnce(t *testing.T) { var called uint32 dispatcher := Dispatcher{ - "foo": func(ctx context.Context, ev Event) (Result, error) { + "foo": hs(func(ctx context.Context, ev Event) (Result, error) { atomic.StoreUint32(&called, 1) return Result{}, nil - }, + }), } es := NewEventStore(&dispatcher, 8, DropOldest) es.Async = true @@ -810,10 +812,10 @@ func TestScheduleAfter_FiresOnce(t *testing.T) { func TestSchedule_FiresImmediatelyIfPast(t *testing.T) { var called uint32 dispatcher := Dispatcher{ - "bar": func(ctx context.Context, ev Event) (Result, error) { + "bar": hs(func(ctx context.Context, ev Event) (Result, error) { atomic.StoreUint32(&called, 1) return Result{}, nil - }, + }), } es := NewEventStore(&dispatcher, 8, DropOldest) es.Async = true @@ -828,10 +830,10 @@ func TestSchedule_FiresImmediatelyIfPast(t *testing.T) { func TestSchedule_FiresAtFutureTime(t *testing.T) { var called uint32 dispatcher := Dispatcher{ - "baz": func(ctx context.Context, ev Event) (Result, error) { + "baz": hs(func(ctx context.Context, ev Event) (Result, error) { atomic.StoreUint32(&called, 1) return Result{}, nil - }, + }), } es := NewEventStore(&dispatcher, 8, DropOldest) es.Async = true @@ -851,10 +853,10 @@ func TestSchedule_FiresAtFutureTime(t *testing.T) { func TestSchedule_Cancel(t *testing.T) { var called uint32 dispatcher := Dispatcher{ - "qux": func(ctx context.Context, ev Event) (Result, error) { + "qux": hs(func(ctx context.Context, ev Event) (Result, error) { atomic.StoreUint32(&called, 1) return Result{}, nil - }, + }), } es := NewEventStore(&dispatcher, 8, DropOldest) es.Async = true @@ -873,13 +875,13 @@ func TestSchedule_Cancel(t *testing.T) { func TestWorkerRecoverFromPanic(t *testing.T) { var good uint32 disp := Dispatcher{ - "panic": func(_ context.Context, ev Event) (Result, error) { + "panic": hs(func(_ context.Context, ev Event) (Result, error) { panic("intentional panic") - }, - "ok": func(_ context.Context, ev Event) (Result, error) { + }), + "ok": hs(func(_ context.Context, ev Event) (Result, error) { atomic.AddUint32(&good, 1) return Result{}, nil - }, + }), } es := NewEventStore(&disp, 16, DropOldest) es.Async = true @@ -903,6 +905,110 @@ func TestWorkerRecoverFromPanic(t *testing.T) { } } +// TestFanOut verifies that multiple handlers registered for the same projection +// are all invoked in order for each dispatched event. +func TestFanOut(t *testing.T) { + var mu sync.Mutex + var order []string + + disp := Dispatcher{} + disp.Register("evt", + func(_ context.Context, ev Event) (Result, error) { + mu.Lock() + order = append(order, "A") + mu.Unlock() + return Result{Message: "A"}, nil + }, + func(_ context.Context, ev Event) (Result, error) { + mu.Lock() + order = append(order, "B") + mu.Unlock() + return Result{Message: "B"}, nil + }, + func(_ context.Context, ev Event) (Result, error) { + mu.Lock() + order = append(order, "C") + mu.Unlock() + return Result{Message: "C"}, nil + }, + ) + + es := NewEventStore(&disp, 8, DropOldest) + _ = es.Subscribe(context.Background(), Event{ID: "1", Projection: "evt"}) + es.Publish() + + mu.Lock() + defer mu.Unlock() + if len(order) != 3 { + t.Fatalf("expected 3 handler invocations; got %d (%v)", len(order), order) + } + if order[0] != "A" || order[1] != "B" || order[2] != "C" { + t.Errorf("unexpected invocation order: %v", order) + } + _, processed, _ := es.Metrics() + if processed != 3 { + t.Errorf("expected processedCount==3; got %d", processed) + } +} + +// TestFanOut_AllHandlersRunOnError verifies that a failing handler does not +// prevent subsequent fan-out handlers from running (each subscriber is independent). +func TestFanOut_AllHandlersRunOnError(t *testing.T) { + var calls []string + disp := Dispatcher{} + disp.Register("evt", + func(_ context.Context, ev Event) (Result, error) { + calls = append(calls, "A") + return Result{}, errors.New("A failed") + }, + func(_ context.Context, ev Event) (Result, error) { + calls = append(calls, "B") + return Result{}, nil + }, + ) + + es := NewEventStore(&disp, 8, DropOldest) + _ = es.Subscribe(context.Background(), Event{ID: "1", Projection: "evt"}) + es.Publish() + + if len(calls) != 2 { + t.Fatalf("expected both handlers to run; got %v", calls) + } + _, _, errs := es.Metrics() + if errs != 1 { + t.Errorf("expected 1 error; got %d", errs) + } +} + +// TestFanOut_Async verifies fan-out works in async mode. +func TestFanOut_Async(t *testing.T) { + var count atomic.Int32 + disp := Dispatcher{} + disp.Register("evt", + func(_ context.Context, ev Event) (Result, error) { count.Add(1); return Result{}, nil }, + func(_ context.Context, ev Event) (Result, error) { count.Add(1); return Result{}, nil }, + func(_ context.Context, ev Event) (Result, error) { count.Add(1); return Result{}, nil }, + ) + + es := NewEventStore(&disp, 16, DropOldest) + es.Async = true + + for i := 0; i < 4; i++ { + _ = es.Subscribe(context.Background(), Event{ID: strconv.Itoa(i), Projection: "evt"}) + } + es.Publish() + + drainCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + if err := es.Drain(drainCtx); err != nil { + t.Fatalf("Drain: %v", err) + } + // 4 events x 3 handlers each = 12 + if count.Load() != 12 { + t.Errorf("expected 12 handler calls; got %d", count.Load()) + } +} + // TestNewEventStore_NilDispatcherPanics verifies fail-fast on nil dispatcher. func TestNewEventStore_NilDispatcherPanics(t *testing.T) { defer func() { diff --git a/examples/drop_oldest/main.go b/examples/drop_oldest/main.go index 2fb0081..b76046a 100644 --- a/examples/drop_oldest/main.go +++ b/examples/drop_oldest/main.go @@ -10,12 +10,11 @@ import ( // OverrunPolicy=DropOldest silently discards events. func main() { - dispatcher := GoEventBus.Dispatcher{ - "noop": func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { - time.Sleep(80 * time.Millisecond) - return GoEventBus.Result{}, nil - }, - } + dispatcher := GoEventBus.Dispatcher{} + dispatcher.Register("noop", func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { + time.Sleep(80 * time.Millisecond) + return GoEventBus.Result{}, nil + }) store := GoEventBus.NewEventStore(&dispatcher, 2, GoEventBus.DropOldest) for i := 0; i < 5; i++ { diff --git a/examples/fasthttp/main.go b/examples/fasthttp/main.go index 7f4d43a..91d51af 100644 --- a/examples/fasthttp/main.go +++ b/examples/fasthttp/main.go @@ -15,25 +15,24 @@ import ( func main() { // 1) Setup your dispatcher - dispatcher := GoEventBus.Dispatcher{ - "sayHello": func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { - fmt.Printf("[%s] Hello, %s!\n", - time.Now().Format(time.StampMilli), - ev.Args["name"], - ) - return GoEventBus.Result{}, nil - }, - "computeSum": func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { - a := ev.Args["a"].(int) - b := ev.Args["b"].(int) - sum := a + b - fmt.Printf("[%s] %d + %d = %d\n", - time.Now().Format(time.StampMilli), - a, b, sum, - ) - return GoEventBus.Result{Message: strconv.Itoa(sum)}, nil - }, - } + dispatcher := GoEventBus.Dispatcher{} + dispatcher.Register("sayHello", func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { + fmt.Printf("[%s] Hello, %s!\n", + time.Now().Format(time.StampMilli), + ev.Args["name"], + ) + return GoEventBus.Result{}, nil + }) + dispatcher.Register("computeSum", func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { + a := ev.Args["a"].(int) + b := ev.Args["b"].(int) + sum := a + b + fmt.Printf("[%s] %d + %d = %d\n", + time.Now().Format(time.StampMilli), + a, b, sum, + ) + return GoEventBus.Result{Message: strconv.Itoa(sum)}, nil + }) // 2) Create the EventStore store := GoEventBus.NewEventStore(&dispatcher, 1<<16, GoEventBus.DropOldest) diff --git a/examples/handler_timeout/main.go b/examples/handler_timeout/main.go index 467d226..74642a9 100644 --- a/examples/handler_timeout/main.go +++ b/examples/handler_timeout/main.go @@ -8,30 +8,27 @@ import ( "github.com/Protocol-Lattice/GoEventBus" ) -// Handler runs for 150 ms unless its context is cancelled. +// Handler runs for 150 ms unless its context is cancelled. func main() { - dispatcher := GoEventBus.Dispatcher{ - "demo": func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { - select { - case <-time.After(150 * time.Millisecond): - fmt.Println("handler OK") - return GoEventBus.Result{Message: "ok"}, nil - case <-ctx.Done(): - fmt.Println("handler cancelled:", ctx.Err()) - return GoEventBus.Result{}, ctx.Err() - } - }, - } + dispatcher := GoEventBus.Dispatcher{} + dispatcher.Register("demo", func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { + select { + case <-time.After(150 * time.Millisecond): + fmt.Println("handler OK") + return GoEventBus.Result{Message: "ok"}, nil + case <-ctx.Done(): + fmt.Println("handler cancelled:", ctx.Err()) + return GoEventBus.Result{}, ctx.Err() + } + }) store := GoEventBus.NewEventStore(&dispatcher, 4, GoEventBus.Block) - // Handler‑only 40 ms timeout → will cancel. + // Handler-only 40 ms timeout -> will cancel. hctx, cancel := context.WithTimeout(context.Background(), 40*time.Millisecond) defer cancel() - _ = store.Subscribe(context.Background(), GoEventBus.Event{ - ID: "H-1", Projection: "demo", Args: map[string]any{"__ctx": hctx}, - }) + _ = store.Subscribe(hctx, GoEventBus.Event{ID: "H-1", Projection: "demo"}) store.Publish() time.Sleep(60 * time.Millisecond) diff --git a/examples/middleware/main.go b/examples/middleware/main.go index 516c78f..3e8d5ce 100644 --- a/examples/middleware/main.go +++ b/examples/middleware/main.go @@ -37,15 +37,15 @@ func errorHook(ctx context.Context, ev GoEventBus.Event, err error) { func main() { // 1. Create dispatcher and register handlers disp := GoEventBus.Dispatcher{} - disp["greet"] = func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { + disp.Register("greet", func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { name := ev.Args["name"].(string) msg := fmt.Sprintf("Hello, %s!", name) return GoEventBus.Result{Message: msg}, nil - } + }) // A handler that sometimes errors - disp["fail"] = func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { + disp.Register("fail", func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { return GoEventBus.Result{}, fmt.Errorf("intentional error") - } + }) // 2. Initialize EventStore store := GoEventBus.NewEventStore(&disp, 16, GoEventBus.DropOldest) diff --git a/transaction.go b/transaction.go index ade451d..dbe63b0 100644 --- a/transaction.go +++ b/transaction.go @@ -48,12 +48,11 @@ func (tx *Transaction) Commit(ctx context.Context) error { continue } ev := *evPtr - if handler, ok := disp[ev.Projection]; ok { - cctx := ev.Ctx - if cctx == nil { - cctx = ctx - } - + cctx := ev.Ctx + if cctx == nil { + cctx = ctx + } + for _, handler := range disp[ev.Projection] { // before hooks for _, hook := range tx.store.beforeHooks { hook(cctx, ev) @@ -79,7 +78,6 @@ func (tx *Transaction) Commit(ctx context.Context) error { for _, hook := range tx.store.errorHooks { hook(cctx, ev, err) } - // advance tail and exit on first handler error atomic.StoreUint64(&tx.store.tail, head) return err } diff --git a/transaction_test.go b/transaction_test.go index ddb8e39..2075eb6 100644 --- a/transaction_test.go +++ b/transaction_test.go @@ -19,7 +19,7 @@ func TestTransaction_CommitAndRollback(t *testing.T) { // set up dispatcher with a no-op projection var processed uint64 dispatcher := Dispatcher{ - "p": makeCounterHandler(&processed), + "p": hs(makeCounterHandler(&processed)), } es := NewEventStore(&dispatcher, 8, DropOldest) @@ -56,13 +56,13 @@ func TestTransaction_PartialFailure(t *testing.T) { // handler that errors on the second event cnt := uint64(0) dispatcher := Dispatcher{ - "x": func(ctx context.Context, ev Event) (Result, error) { + "x": hs(func(ctx context.Context, ev Event) (Result, error) { i := atomic.AddUint64(&cnt, 1) if i == 2 { return Result{}, errors.New("boom") } return Result{Message: "ok"}, nil - }, + }), } es := NewEventStore(&dispatcher, 4, ReturnError) @@ -83,9 +83,9 @@ func TestTransaction_PartialFailure(t *testing.T) { func BenchmarkTransaction_SyncCommit(b *testing.B) { dispatcher := Dispatcher{ - "p": func(ctx context.Context, ev Event) (Result, error) { + "p": hs(func(ctx context.Context, ev Event) (Result, error) { return Result{Message: "ok"}, nil - }, + }), } es := NewEventStore(&dispatcher, 256, DropOldest) es.Async = false @@ -106,9 +106,9 @@ func BenchmarkTransaction_SyncCommit(b *testing.B) { func BenchmarkTransaction_AsyncCommit(b *testing.B) { dispatcher := Dispatcher{ - "p": func(ctx context.Context, ev Event) (Result, error) { + "p": hs(func(ctx context.Context, ev Event) (Result, error) { return Result{Message: "ok"}, nil - }, + }), } es := NewEventStore(&dispatcher, 256, DropOldest) es.Async = true From ff852f17bc18d3842c33a967a611edcd4ecbc795 Mon Sep 17 00:00:00 2001 From: Raezil Date: Thu, 21 May 2026 19:18:25 +0200 Subject: [PATCH 2/2] docs: rewrite README for fan-out, middleware, hooks, and accurate API Reflects the current API surface: Dispatcher is now map[interface{}][]HandlerFunc with Register(), fan-out section added, middleware and lifecycle hook examples included, Transaction example fixed, __ctx magic key removed, internal struct fields removed from API reference, and all code samples verified against the live codebase. Co-Authored-By: Claude Sonnet 4.6 --- README.MD | 539 ++++++++++++++++++++++++++++-------------------------- 1 file changed, 283 insertions(+), 256 deletions(-) diff --git a/README.MD b/README.MD index 704f2a7..2d665ab 100644 --- a/README.MD +++ b/README.MD @@ -4,380 +4,407 @@ # GoEventBus -A blazing‑fast, in‑memory, lock‑free event bus for Go—ideal for low‑latency pipelines, microservices, and game loops. +A high-performance, in-memory, lock-free event bus for Go — built on a cache-line-padded atomic ring buffer with fan-out dispatch, middleware, lifecycle hooks, and back-pressure policies. -[![Go Report Card](https://goreportcard.com/badge/github.com/Protocol-Lattice/GoEventBus)](...) +[![Go Report Card](https://goreportcard.com/badge/github.com/Protocol-Lattice/GoEventBus)](https://goreportcard.com/report/github.com/Protocol-Lattice/GoEventBus) + +## Table of Contents -## 📚 Table of Contents - [Features](#features) -- [Why GoEventBus?](#why-goeventbus) - [Installation](#installation) - [Quick Start](#quick-start) +- [Fan-out](#fan-out) +- [Middleware](#middleware) +- [Lifecycle Hooks](#lifecycle-hooks) +- [Back-pressure Policies](#back-pressure-policies) +- [Async Mode](#async-mode) - [Transactions](#transactions) +- [Scheduling](#scheduling) - [API Reference](#api-reference) -- [Back-pressure and Overrun Policies](#back-pressure-and-overrun-policies) -- [Use Cases](#-use-cases) - [Benchmarks](#benchmarks) - [Contributing](#contributing) - [License](#license) -## Features - -- **Lock‑free ring buffer**: Efficient event dispatching using atomic operations and cache‑line padding. -- **Context‑aware handlers**: Every handler now receives a `context.Context`, enabling deadlines, cancellation and tracing. -- **Back‑pressure policies**: Choose whether to drop, block, or error when the buffer is full. -- **Configurable buffer size**: Specify the ring buffer size (must be a power of two) when creating the store. -- **Async or Sync processing**: Toggle between synchronous handling or async goroutine‑based processing via the `Async` flag. -- **Metrics**: Track published, processed, and errored event counts with a simple API. -- **Simple API**: Easy to subscribe, publish, and retrieve metrics. +--- -## Why GoEventBus? - -Modern Go apps demand lightning‑fast, non‑blocking communication—but channels can bottleneck and external brokers add latency, complexity and ops overhead. GoEventBus is your in‑process, lock‑free solution: +## Features -- **Micro‑latency dispatch** - Atomic, cache‑aligned ring buffers deliver sub‑microsecond hand‑offs—no locks, no syscalls, zero garbage. -- **Sync or Async at will** - Flip a switch to run handlers inline for predictability or in goroutines for massive parallelism. -- **Back‑pressure your way** - Choose from _DropOldest_, _Block_ or _ReturnError_ to match your system’s tolerance for loss or latency. -- **Built‑in observability** - Expose counters for published, processed and errored events out of the box—no extra instrumentation. -- **Drop‑in, zero deps** - One import, no external services, no workers to manage—just Go 1.21+ and you’re off. +- **Lock-free ring buffer** — atomic operations with cache-line padding keep dispatch sub-microsecond +- **Fan-out dispatch** — register multiple independent handlers per projection; each runs its own middleware chain +- **Sync or async** — flip `store.Async = true` to hand work off to a fixed worker pool +- **Middleware** — wrap handlers with logging, tracing, retries, or any cross-cutting behaviour +- **Lifecycle hooks** — `OnBefore`, `OnAfter`, `OnError` for observability without touching handler logic +- **Back-pressure** — choose `DropOldest`, `Block`, or `ReturnError` per store +- **Transactions** — batch events and commit or roll back as a unit +- **Scheduling** — fire events at a future `time.Time` or after a `time.Duration` +- **Metrics** — published, processed, and error counters via a single call -Whether you’re building real‑time analytics, high‑throughput microservices, or game engines, GoEventBus keeps your events moving at Go‑speed. +--- ## Installation ```bash go get github.com/Protocol-Lattice/GoEventBus - ``` +Requires Go 1.21+. + +--- + ## Quick Start ```go package main import ( - "context" - "fmt" - "log" + "context" + "fmt" + "log" - "github.com/Protocol-Lattice/GoEventBus" + "github.com/Protocol-Lattice/GoEventBus" ) -// Define a typed projection as a struct +type UserCreatedPayload struct{ ID string } type HouseWasSold struct{} - -// Define strongly typed payloads -type UserCreatedPayload struct { - ID string -} - type HouseSoldPayload struct { - Address string - Price int + Address string + Price int } func main() { - // Create a dispatcher mapping projections (string or struct) to handlers - dispatcher := GoEventBus.Dispatcher{ - "user_created": func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { - payload := ev.Data.(UserCreatedPayload) - fmt.Println("User created with ID:", payload.ID) - return GoEventBus.Result{Message: "handled user_created"}, nil - }, - HouseWasSold{}: func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { - payload := ev.Data.(HouseSoldPayload) - fmt.Printf("House sold at %s for $%d\n", payload.Address, payload.Price) - return GoEventBus.Result{Message: "handled HouseWasSold"}, nil - }, - } - - // Initialise an EventStore with a 64K ring buffer and DropOldest overrun policy - store := GoEventBus.NewEventStore(&dispatcher, 1<<16, GoEventBus.DropOldest) - - // Enable asynchronous processing - store.Async = true - - // Enqueue a string-based event - _ = store.Subscribe(context.Background(), GoEventBus.Event{ - ID: "evt1", - Projection: "user_created", - Data: UserCreatedPayload{ID: "12345"}, - }) - - // Enqueue a struct-based event - _ = store.Subscribe(context.Background(), GoEventBus.Event{ - ID: "evt2", - Projection: HouseWasSold{}, - Data: HouseSoldPayload{Address: "123 Main St", Price: 500000}, - }) - - // Process pending events - store.Publish() - - // Wait for all async handlers to finish - if err := store.Drain(context.Background()); err != nil { - log.Fatalf("Failed to drain EventStore: %v", err) - } - - // Retrieve metrics - published, processed, errors := store.Metrics() - fmt.Printf("published=%d processed=%d errors=%d\n", published, processed, errors) + disp := GoEventBus.Dispatcher{} + + disp.Register("user_created", func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { + p := ev.Data.(UserCreatedPayload) + fmt.Println("User created:", p.ID) + return GoEventBus.Result{Message: "ok"}, nil + }) + + disp.Register(HouseWasSold{}, func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { + p := ev.Data.(HouseSoldPayload) + fmt.Printf("House sold at %s for $%d\n", p.Address, p.Price) + return GoEventBus.Result{Message: "ok"}, nil + }) + + store := GoEventBus.NewEventStore(&disp, 1<<16, GoEventBus.DropOldest) + store.Async = true + + _ = store.Subscribe(context.Background(), GoEventBus.Event{ + ID: "evt1", + Projection: "user_created", + Data: UserCreatedPayload{ID: "u-42"}, + }) + _ = store.Subscribe(context.Background(), GoEventBus.Event{ + ID: "evt2", + Projection: HouseWasSold{}, + Data: HouseSoldPayload{Address: "1 Main St", Price: 500_000}, + }) + + store.Publish() + + if err := store.Drain(context.Background()); err != nil { + log.Fatal(err) + } + + published, processed, errors := store.Metrics() + fmt.Printf("published=%d processed=%d errors=%d\n", published, processed, errors) } ``` -## Transactions +**Key points:** -GoEventBus now supports atomic transactions, allowing you to group multiple events and commit them together. This ensures that either all events are successfully published and handled, or none are. +- `Projection` can be any comparable value — a string, a struct, an integer. +- `Data` carries the type-safe payload; assert it inside the handler. +- Pass the context your handler should respect to `Subscribe`; it is forwarded to every handler. -```go -package main +--- -import ( - "context" - "log" +## Fan-out - "github.com/Protocol-Lattice/GoEventBus" +`Register` accumulates handlers. Every handler registered for a projection is called in order for each dispatched event. Handlers are fully independent — an error in one does not prevent the others from running. + +```go +disp := GoEventBus.Dispatcher{} + +// Register as many handlers as you need for the same projection. +disp.Register("order.placed", + auditLogger, + inventoryReducer, + notificationSender, ) -func main() { - // Begin a new transaction on the existing EventStore - - // Define payload types - type UserCreatedPayload struct { - ID string - } - type EmailPayload struct { - Template string - UserID string - } - - // Create a dispatcher mapping projections to handlers - dispatcher := GoEventBus.Dispatcher{ - "user_created": func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { - return GoEventBus.Result{Message: "handled user_created"}, nil - }, - "send_email": func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { - log.Println("Hello") - return GoEventBus.Result{Message: "handled send_email"}, nil - }, - } - - // Initialise an EventStore with a 64K ring buffer and DropOldest policy - store := GoEventBus.NewEventStore(&dispatcher, 1<<16, GoEventBus.DropOldest) - tx := store.BeginTransaction() - - // Buffer multiple events - tx.Publish(GoEventBus.Event{ - ID: "evtA", - Projection: "user_created", - Data: UserCreatedPayload{ID: "12345"}, - }) - tx.Publish(GoEventBus.Event{ - ID: "evtB", - Projection: "send_email", - Data: EmailPayload{Template: "welcome", UserID: "12345"}, - }) - tx.Rollback() - - // Commit the transaction atomically - if err := tx.Commit(context.Background()); err != nil { - log.Fatalf("transaction failed: %v", err) - } -} +// Register can also be called incrementally across different packages. +disp.Register("order.placed", analyticsTracker) ``` -## API Reference +In async mode each handler invocation becomes its own work item, so the four handlers above may run in parallel across the worker pool. -### `type Result` +--- -```go -type Result struct { - Message string // Outcome message from handler -} -``` +## Middleware -### `type Dispatcher` +Middleware wraps the handler chain and is applied in the order it is registered. Use it for logging, tracing, timeout injection, or retries. ```go -type Dispatcher map[interface{}]func(context.Context, Event) (Result, error) +// Logging middleware +store.Use(func(next GoEventBus.HandlerFunc) GoEventBus.HandlerFunc { + return func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { + log.Printf("handling %v", ev.Projection) + res, err := next(ctx, ev) + log.Printf("done %v err=%v", ev.Projection, err) + return res, err + } +}) + +// Timeout middleware +store.Use(func(next GoEventBus.HandlerFunc) GoEventBus.HandlerFunc { + return func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) { + ctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond) + defer cancel() + return next(ctx, ev) + } +}) ``` -A map from projection keys to handler functions. Handlers receive a `context.Context` and an `Event` object (which typically carries a strongly-typed `Data` payload), and return a `Result` and an error. -### `type Event` +Each handler in a fan-out gets its own independent copy of the middleware chain. + +--- + +## Lifecycle Hooks + +Hooks fire outside the middleware chain and are useful for metrics, structured logging, and alerting without polluting handler code. ```go -type Event struct { - ID string // Unique identifier for the event - Projection interface{} // Key to look up the handler in the dispatcher - Data any // Type-safe payload for the event (preferred) - Args map[string]any // Legacy payload (deprecated) - Ctx context.Context // Carried context -} +store.OnBefore(func(ctx context.Context, ev GoEventBus.Event) { + // fires before each handler invocation +}) + +store.OnAfter(func(ctx context.Context, ev GoEventBus.Event, res GoEventBus.Result, err error) { + // fires after each handler invocation, whether it succeeded or failed +}) + +store.OnError(func(ctx context.Context, ev GoEventBus.Event, err error) { + // fires only when a handler returns a non-nil error + log.Printf("handler error for %v: %v", ev.Projection, err) +}) ``` -### `type Transaction` +--- + +## Back-pressure Policies + +Choose how `Subscribe` behaves when the ring buffer is full. + +| Policy | Behaviour | Best for | +|---|---|---| +| `DropOldest` | Discards the oldest event to make room | Low-latency pipelines where fresh data matters more than completeness | +| `Block` | Blocks the caller (respecting its context) until space is available | Workloads that cannot afford to lose events | +| `ReturnError` | Returns `ErrBufferFull` immediately | Callers that manage their own retry or back-pressure logic | ```go -type Transaction struct { - store *EventStore - events []Event - startHead uint64 // head position when transaction began +store := GoEventBus.NewEventStore(&disp, 1<<14, GoEventBus.Block) +ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) +defer cancel() + +if err := store.Subscribe(ctx, ev); errors.Is(err, context.DeadlineExceeded) { + // buffer stayed full for 50 ms — handle accordingly } ``` -- `BeginTransaction() *Transaction`: Start a new transaction. -- `Publish(e Event)`: Buffer an event within the transaction. -- `Commit(ctx context.Context) error`: Atomically enqueue and process all buffered events, returning on the first error. -- `Rollback()`: Discard buffered events without publishing. +--- + +## Async Mode -### `type OverrunPolicy` +Set `store.Async = true` before the first `Publish` call. GoEventBus starts a worker pool sized to `runtime.NumCPU()` and routes every handler invocation through it. ```go -type OverrunPolicy int +store := GoEventBus.NewEventStore(&disp, 1<<16, GoEventBus.DropOldest) +store.Async = true -const ( - DropOldest OverrunPolicy = iota // Default: discard oldest events - Block // Block until space is available - ReturnError // Fail fast with ErrBufferFull -) -``` +store.Publish() // dispatches handlers to the worker pool -### `type EventStore` +// Wait for all in-flight handlers before shutting down. +ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) +defer cancel() -```go -type EventStore struct { - dispatcher *Dispatcher // Pointer to the dispatcher map - size uint64 // Buffer size (power of two) - buf []unsafe.Pointer // Ring buffer of event pointers - events []Event // Backing slice for Event data - head uint64 // Atomic write index - tail uint64 // Atomic read index - - // Config flags - Async bool - OverrunPolicy OverrunPolicy - - // Counters - publishedCount uint64 - processedCount uint64 - errorCount uint64 +if err := store.Drain(ctx); err != nil { + log.Printf("drain timeout: %v", err) } ``` -#### `NewEventStore` +`Drain` (or equivalently `Close`) must be called once when the store is no longer needed so worker goroutines are cleaned up. + +--- + +## Transactions + +Group multiple events into a single commit. If any handler returns an error, `Commit` stops and returns that error. Call `Rollback` to discard buffered events without publishing. ```go -func NewEventStore(dispatcher *Dispatcher, bufferSize uint64, policy OverrunPolicy) *EventStore +type OrderPayload struct{ OrderID string } +type InvoicePayload struct{ OrderID string } + +disp := GoEventBus.Dispatcher{} +disp.Register("order.created", handleOrder) +disp.Register("invoice.issued", handleInvoice) + +store := GoEventBus.NewEventStore(&disp, 1<<16, GoEventBus.DropOldest) + +tx := store.BeginTransaction() + +tx.Publish(GoEventBus.Event{ + ID: "ev-1", + Projection: "order.created", + Data: OrderPayload{OrderID: "ord-99"}, +}) +tx.Publish(GoEventBus.Event{ + ID: "ev-2", + Projection: "invoice.issued", + Data: InvoicePayload{OrderID: "ord-99"}, +}) + +if err := tx.Commit(context.Background()); err != nil { + tx.Rollback() + log.Fatal("transaction failed:", err) +} ``` -Creates a new `EventStore` with the provided dispatcher, ring buffer size, and overrun policy. -#### `Subscribe` +--- + +## Scheduling + +Fire an event at a specific point in time or after a duration. The returned `*time.Timer` can be stopped to cancel the event before it fires. ```go -func (es *EventStore) Subscribe(ctx context.Context, e Event) error +// Fire at an absolute time. +timer := store.Schedule(ctx, time.Now().Add(10*time.Second), ev) +if timer != nil { + timer.Stop() // cancel before it fires +} + +// Fire after a duration. +timer = store.ScheduleAfter(ctx, 30*time.Second, ev) ``` -Atomically enqueues an `Event` for later publication, applying back‑pressure according to `OverrunPolicy`. If `OverrunPolicy` is `ReturnError` and the buffer is full, the function returns `ErrBufferFull`. -#### `Publish` +If the target time is in the past, or the duration is zero or negative, the event is executed synchronously and immediately without entering the ring buffer, and `nil` is returned. + +--- + +## API Reference + +### `Dispatcher` ```go -func (es *EventStore) Publish() +type Dispatcher map[interface{}][]HandlerFunc + +func (d Dispatcher) Register(projection interface{}, handlers ...HandlerFunc) ``` -Dispatches all events from the last published position to the current head. If `Async` is true, handlers run in separate goroutines; otherwise they run in the caller's goroutine. -#### `Drain` +`Register` appends one or more handlers for the given projection. Calling it multiple times on the same key accumulates handlers in call order. + +### `Event` ```go -func (es *EventStore) Drain(ctx context.Context) error +type Event struct { + ID string + Projection interface{} // key used to look up handlers + Data any // type-safe payload (preferred) + Args map[string]any // legacy payload (deprecated) +} ``` -Blocks until all in-flight asynchronous handlers complete, then stops the worker pool. Returns an error if the provided context.Context is canceled or its deadline is exceeded. -#### `Close` +### `HandlerFunc` / `Middleware` ```go -func (es *EventStore) Close(ctx context.Context) error +type HandlerFunc func(context.Context, Event) (Result, error) +type Middleware func(HandlerFunc) HandlerFunc ``` -Drains all pending asynchronous handlers and shuts down the EventStore. Blocks until all in-flight handlers complete or the provided context.Context is canceled. Returns an error if the context’s deadline is exceeded or it is otherwise canceled. -#### `Metrics` +### `Result` ```go -func (es *EventStore) Metrics() (published, processed, errors uint64) +type Result struct { + Message string +} ``` -Returns the total count of published, processed, and errored events. -### `Schedule` +### `OverrunPolicy` ```go -func (es *EventStore) Schedule(ctx context.Context, t time.Time, e Event) *time.Timer +const ( + DropOldest OverrunPolicy = iota + Block + ReturnError +) + +var ErrBufferFull = errors.New("goeventbus: buffer is full") ``` -Schedules an Event to be subscribed and published at a specific time `t`. -- [x] If `t` is in the future, the function returns a `*time.Timer` that can be used to cancel the event before it fires by using `timer.Stop()`. -- [x] If `t` is in the past or is the current time, the event is executed immediately and synchronously, bypassing the event queue, and the function returns `nil`. +### `EventStore` -### `ScheduleAfter` +#### Constructor ```go -func (es *EventStore) ScheduleAfter(ctx context.Context, d time.Duration, e Event) *time.Timer +func NewEventStore(dispatcher *Dispatcher, bufferSize uint64, policy OverrunPolicy) *EventStore ``` -A convenience wrapper around `Schedule` that fires an event after a specified duration `d`. -- [x] If the duration `d` is greater than zero, it returns a cancellable `*time.Timer`. -- [x] If `d` is zero or negative, the event is executed immediately and synchronously, bypassing the event queue, and the function returns `nil`. +Panics if `dispatcher` is nil or `bufferSize` is not a non-zero power of two. -## Back-pressure and Overrun Policies +#### Methods -GoEventBus provides three strategies for handling a saturated ring buffer: +| Method | Description | +|---|---| +| `Subscribe(ctx, Event) error` | Enqueue an event; applies back-pressure per `OverrunPolicy` | +| `Publish()` | Dispatch all pending events to their handlers | +| `Use(Middleware)` | Append middleware to the chain | +| `OnBefore(BeforeHook)` | Register a hook that runs before each handler | +| `OnAfter(AfterHook)` | Register a hook that runs after each handler | +| `OnError(ErrorHook)` | Register a hook that runs when a handler errors | +| `Drain(ctx) error` | Wait for all async handlers; shuts down the worker pool | +| `Close(ctx) error` | Alias for `Drain` | +| `Metrics() (published, processed, errors uint64)` | Snapshot event counters | +| `Schedule(ctx, time.Time, Event) *time.Timer` | Fire an event at a time | +| `ScheduleAfter(ctx, time.Duration, Event) *time.Timer` | Fire an event after a duration | -| Policy | Behaviour | When to use | -|--------------|------------------------------------------------------------------------------------------------|----------------------------------------------| -| `DropOldest` | Atomically advances the read index, discarding the oldest event to make room for the new one. | Low‑latency scenarios where the newest data is most valuable and occasional loss is acceptable. | -| `Block` | Causes `Subscribe` to block (respecting its context) until space becomes available. | Workloads that must not lose events but can tolerate the latency of back‑pressure. | -| `ReturnError`| `Subscribe` returns `ErrBufferFull` immediately, allowing the caller to decide what to do. | Pipelines where upstream logic controls retries and failures explicitly. | +#### `BeginTransaction() *Transaction` -`DropOldest` is the default behaviour and matches releases prior to April 2025. +Returns a `*Transaction` scoped to this store. -## 💡 Use Cases +### `Transaction` -GoEventBus is ideal for scenarios where low‑latency, high‑throughput, and non‑blocking event dispatching is needed: +| Method | Description | +|---|---| +| `Publish(Event)` | Buffer an event within the transaction | +| `Commit(ctx) error` | Enqueue and process all buffered events; stops on first error | +| `Rollback()` | Discard all buffered events | -- 🔄 Real‑time event pipelines (e.g. analytics, telemetry) -- 📥 Background task execution or job queues -- 🧩 Microservice communication using in‑process events -- ⚙️ Observability/event sourcing in domain‑driven designs -- 🔁 In‑memory pub/sub for small‑scale distributed systems -- 🎮 Game loops or simulations requiring lock‑free dispatching +--- ## Benchmarks -All benchmarks were run with Go’s testing harness (`go test -bench .`) on an `-8` procs configuration. Numbers below are from the April 2025 release. - -| Benchmark | Iterations | ns/op | -|-----------------------------------|-------------:|-------:| -| `BenchmarkSubscribe-8` | 27,080,376 | 40.37 | -| `BenchmarkSubscribeParallel-8` | 26,418,999 | 38.42 | -| `BenchmarkPublish-8` | 295,661,464 | 3.910 | -| `BenchmarkPublishAfterPrefill-8` | 252,943,526 | 4.585 | -| `BenchmarkSubscribeLargePayload-8`| 1,613,017 | 771.5 | -| `BenchmarkPublishLargePayload-8` | 296,434,225 | 3.910 | -| `BenchmarkEventStore_Async-8` | 2,816,988 | 436.5 | -| `BenchmarkEventStore_Sync-8` | 2,638,519 | 428.5 | -| `BenchmarkFastHTTPSync-8` | 6,275,112 | 163.8 | -| `BenchmarkFastHTTPAsync-8` | 1,954,884 | 662.0 | -| `BenchmarkFastHTTPParallel-8` | 4,489,274 | 262.3 | +Run on Apple M-series with `go test -bench . -benchtime=3s`: + +| Benchmark | Iterations | ns/op | +|---|---:|---:| +| `BenchmarkSubscribe` | 27,080,376 | 40.37 | +| `BenchmarkSubscribeParallel` | 26,418,999 | 38.42 | +| `BenchmarkPublish` | 295,661,464 | 3.91 | +| `BenchmarkPublishAfterPrefill` | 252,943,526 | 4.59 | +| `BenchmarkSubscribeLargePayload` | 1,613,017 | 771.5 | +| `BenchmarkPublishLargePayload` | 296,434,225 | 3.91 | +| `BenchmarkEventStore_Async` | 2,816,988 | 436.5 | +| `BenchmarkEventStore_Sync` | 2,638,519 | 428.5 | +| `BenchmarkFastHTTPSync` | 6,275,112 | 163.8 | +| `BenchmarkFastHTTPAsync` | 1,954,884 | 662.0 | +| `BenchmarkFastHTTPParallel` | 4,489,274 | 262.3 | + +--- ## Contributing -Contributions, issues, and feature requests are welcome! Feel free to check the [issues page](github.com/Protocol-Lattice/GoEventBus/issues). +Contributions, issues, and feature requests are welcome. See the [issues page](https://github.com/Protocol-Lattice/GoEventBus/issues) to get started. ## License -Distributed under the MIT License. See `LICENSE` for more information. - +Distributed under the MIT License. See `LICENSE` for details.