diff --git a/README.MD b/README.MD index d787ace..5239b8a 100644 --- a/README.MD +++ b/README.MD @@ -14,6 +14,7 @@ A high-performance, in-memory, lock-free event bus for Go — built on a cache-l - [Installation](#installation) - [Quick Start](#quick-start) - [Fan-out](#fan-out) +- [Batch Handlers](#batch-handlers) - [Middleware](#middleware) - [Lifecycle Hooks](#lifecycle-hooks) - [Back-pressure Policies](#back-pressure-policies) @@ -32,6 +33,7 @@ A high-performance, in-memory, lock-free event bus for Go — built on a cache-l - **Lock-free ring buffer** — atomic operations with cache-line padding keep dispatch sub-microsecond - **Fan-out dispatch** — register multiple independent handlers per projection; each runs its own middleware chain +- **Batch handlers** — collect up to N events per projection and deliver them as a slice; ideal for bulk DB writes and HTTP batching - **Sync or async** — flip `store.Async = true` to hand work off to a fixed worker pool - **Middleware** — wrap handlers with logging, tracing, retries, or any cross-cutting behaviour - **Lifecycle hooks** — `OnBefore`, `OnAfter`, `OnError` for observability without touching handler logic @@ -143,6 +145,103 @@ In async mode each handler invocation becomes its own work item, so the four han --- +## Batch Handlers + +`RegisterBatch` collects all events for a projection that accumulate between `Publish` calls and delivers them to the handler as a slice — one call per chunk of up to `size` events. This eliminates per-event overhead for bulk operations like database inserts or HTTP batch APIs. + +```go +store := GoEventBus.NewEventStore(&disp, 1<<16, GoEventBus.DropOldest) + +store.RegisterBatch("metrics.recorded", 50, func(ctx context.Context, evs []GoEventBus.Event) ([]GoEventBus.Result, error) { + rows := make([]MetricRow, len(evs)) + for i, ev := range evs { + rows[i] = ev.Data.(MetricRow) + } + return nil, db.BulkInsert(ctx, rows) +}) +``` + +### Chunking + +If more events arrive than the configured `size`, the handler is called multiple times — once per full chunk, then once for the remainder. + +```go +// 7 events, size=3 → called with [3 events], [3 events], [1 event] +store.RegisterBatch("tick", 3, handler) +``` + +### Per-event results + +Return a `[]Result` aligned with the input slice to pass per-event results to `OnAfter` hooks. A nil or shorter slice is fine — missing positions are treated as zero `Result` values. + +```go +store.RegisterBatch("order.placed", 100, func(ctx context.Context, evs []GoEventBus.Event) ([]GoEventBus.Result, error) { + results := make([]GoEventBus.Result, len(evs)) + for i, ev := range evs { + results[i] = GoEventBus.Result{Message: "processed"} + } + return results, nil +}) +``` + +### Fan-out with batch handlers + +Multiple batch handlers can be registered for the same projection. Each receives the full chunk independently. + +```go +store.RegisterBatch("order.placed", 50, writeToDatabase) +store.RegisterBatch("order.placed", 50, pushToAnalytics) +``` + +Regular per-event handlers and batch handlers can coexist on the same projection. Both fire on each `Publish` call. + +```go +disp.Register("order.placed", auditLogger) // called once per event +store.RegisterBatch("order.placed", 50, bulkWriter) // called once per chunk +``` + +### Error handling and the DLQ + +A batch handler returns a single error for the whole chunk. On error, every event in the failing chunk is sent to the DLQ and `OnError` fires once per event. Chunks that succeeded in the same `Publish` cycle are unaffected. + +```go +store.DLQ = GoEventBus.NewDeadLetterQueue() + +store.RegisterBatch("write", 25, func(ctx context.Context, evs []GoEventBus.Event) ([]GoEventBus.Result, error) { + if err := db.BulkInsert(ctx, evs); err != nil { + return nil, err // all 25 events land in the DLQ + } + return nil, nil +}) +``` + +### Panics + +Panics in a batch handler are recovered and treated identically to a returned error — the chunk lands in the DLQ, `OnError` fires, and the worker pool continues running. + +### Lifecycle hooks + +`OnBefore`, `OnAfter`, and `OnError` fire **once per event** even for batch handlers, keeping observability consistent regardless of handler type. Middleware is not applied to batch handlers (the signatures are incompatible); use hooks for cross-cutting concerns instead. + +### Async mode + +Batch handlers participate in the async worker pool when `store.Async = true`. Each chunk is dispatched as one work item. + +```go +store.Async = true + +store.RegisterBatch("events", 100, bulkWriter) + +for i := 0; i < 300; i++ { + _ = store.Subscribe(ctx, GoEventBus.Event{Projection: "events", Data: rows[i]}) +} + +store.Publish() +_ = store.Drain(context.Background()) // wait for all chunks to complete +``` + +--- + ## Middleware Middleware wraps the handler chain and is applied in the order it is registered. Use it for logging, tracing, timeout injection, or retries. @@ -174,7 +273,7 @@ Each handler in a fan-out gets its own independent copy of the middleware chain. ## Lifecycle Hooks -Hooks fire outside the middleware chain and are useful for metrics, structured logging, and alerting without polluting handler code. +Hooks fire outside the middleware chain and are useful for metrics, structured logging, and alerting without polluting handler code. For batch handlers, hooks fire once per event in the chunk rather than once per chunk. ```go store.OnBefore(func(ctx context.Context, ev GoEventBus.Event) { @@ -294,12 +393,12 @@ store.Publish() ### Fan-out and the DLQ -Each handler in a fan-out is independent. If handler A fails and handler B succeeds, only A's invocation produces a dead letter — B is unaffected. +Each handler in a fan-out is independent. If handler A fails and handler B succeeds, only A's invocation produces a dead letter — B is unaffected. The same rule applies to batch handlers: only the failing chunk's events land in the DLQ. ```go disp.Register("order.placed", - auditLogger, // fails -> one dead letter - invoiceHandler, // succeeds -> no dead letter + auditLogger, // fails -> one dead letter per event + invoiceHandler, // succeeds -> no dead letters ) ``` @@ -392,9 +491,9 @@ func (d Dispatcher) Register(projection interface{}, handlers ...HandlerFunc) ```go type Event struct { ID string - Projection interface{} // key used to look up handlers - Data any // type-safe payload (preferred) - Args map[string]any // legacy payload (deprecated) + Projection interface{} // key used to look up handlers + Data any // type-safe payload (preferred) + Args map[string]any // legacy payload (deprecated) } ``` @@ -405,6 +504,14 @@ type HandlerFunc func(context.Context, Event) (Result, error) type Middleware func(HandlerFunc) HandlerFunc ``` +### `BatchHandlerFunc` + +```go +type BatchHandlerFunc func(context.Context, []Event) ([]Result, error) +``` + +Receives a slice of events collected during a `Publish` cycle. Returns one `Result` per input event (may be nil or shorter — missing entries default to zero `Result`) and a single error covering the whole batch. Middleware is not applied; use lifecycle hooks for cross-cutting concerns. + ### `Result` ```go @@ -448,19 +555,17 @@ Panics if `dispatcher` is nil or `bufferSize` is not a non-zero power of two. |---|---| | `Subscribe(ctx, Event) error` | Enqueue an event; applies back-pressure per `OverrunPolicy` | | `Publish()` | Dispatch all pending events to their handlers | -| `Use(Middleware)` | Append middleware to the chain | -| `OnBefore(BeforeHook)` | Register a hook that runs before each handler | -| `OnAfter(AfterHook)` | Register a hook that runs after each handler | +| `RegisterBatch(projection, size, BatchHandlerFunc)` | Register a batch handler; events are delivered in chunks of up to `size` | +| `Use(Middleware)` | Append middleware to the chain (applied to per-event handlers only) | +| `OnBefore(BeforeHook)` | Register a hook that runs before each handler invocation | +| `OnAfter(AfterHook)` | Register a hook that runs after each handler invocation | | `OnError(ErrorHook)` | Register a hook that runs when a handler errors | | `Drain(ctx) error` | Wait for all async handlers; shuts down the worker pool | | `Close(ctx) error` | Alias for `Drain` | -| `Metrics() (published, processed, errors uint64)` | Snapshot event counters | +| `Metrics() (published, processed, errors uint64)` | Snapshot event counters; `processed` counts individual events, not batch calls | | `Schedule(ctx, time.Time, Event) *time.Timer` | Fire an event at a time | | `ScheduleAfter(ctx, time.Duration, Event) *time.Timer` | Fire an event after a duration | - -#### `BeginTransaction() *Transaction` - -Returns a `*Transaction` scoped to this store. +| `BeginTransaction() *Transaction` | Returns a `*Transaction` scoped to this store | ### `DeadLetter` diff --git a/batch.go b/batch.go new file mode 100644 index 0000000..f195e3b --- /dev/null +++ b/batch.go @@ -0,0 +1,93 @@ +package GoEventBus + +import ( + "context" + "fmt" + "sync/atomic" + "time" +) + +// BatchHandlerFunc receives a slice of events at once. It returns one Result per +// input event (the slice may be nil or shorter than events — missing entries are +// treated as zero Results) and a single error covering the whole batch. +// Middleware is not applied to batch handlers; use lifecycle hooks for +// cross-cutting concerns. +type BatchHandlerFunc func(context.Context, []Event) ([]Result, error) + +type batchEntry struct { + handler BatchHandlerFunc + size int +} + +// RegisterBatch registers h as a batch handler for projection. On each Publish +// call, all pending events for projection are collected and delivered to h in +// chunks of up to size events. Multiple calls for the same projection accumulate +// handlers (fan-out); each handler receives the full chunk independently. +// size must be positive; values ≤ 0 are clamped to 1. +func (es *EventStore) RegisterBatch(projection interface{}, size int, h BatchHandlerFunc) { + if size <= 0 { + size = 1 + } + es.batchHandlers[projection] = append(es.batchHandlers[projection], batchEntry{handler: h, size: size}) +} + +// executeBatch runs a batch handler, fires lifecycle hooks, and routes failures +// to the DLQ. It never panics — panics in the handler are recovered and treated +// as errors, mirroring the behaviour of execute for single-event handlers. +func (es *EventStore) executeBatch(h BatchHandlerFunc, events []Event) { + ctx := context.Background() + for _, ev := range events { + if ev.Ctx != nil { + ctx = ev.Ctx + break + } + } + + // Recover panics so the worker pool is never killed by a misbehaving handler. + var ( + results []Result + callErr error + ) + func() { + defer func() { + if r := recover(); r != nil { + if e, ok := r.(error); ok { + callErr = fmt.Errorf("handler panic: %w", e) + } else { + callErr = fmt.Errorf("handler panic: %v", r) + } + } + }() + for _, ev := range events { + for _, hook := range es.beforeHooks { + hook(ctx, ev) + } + } + results, callErr = h(ctx, events) + }() + + atomic.AddUint64(&es.processedCount, uint64(len(events))) + + for i, ev := range events { + var res Result + if i < len(results) { + res = results[i] + } + for _, hook := range es.afterHooks { + hook(ctx, ev, res, callErr) + } + } + + if callErr != nil { + atomic.AddUint64(&es.errorCount, 1) + now := time.Now() + for _, ev := range events { + if es.DLQ != nil { + es.DLQ.add(DeadLetter{Event: ev, Err: callErr, FailedAt: now, Attempts: 1}) + } + for _, hook := range es.errorHooks { + hook(ctx, ev, callErr) + } + } + } +} diff --git a/batch_test.go b/batch_test.go new file mode 100644 index 0000000..f35f232 --- /dev/null +++ b/batch_test.go @@ -0,0 +1,312 @@ +package GoEventBus + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "testing" +) + +// TestBatch_Basic verifies that a batch handler receives all events published for its projection. +func TestBatch_Basic(t *testing.T) { + disp := Dispatcher{} + es := NewEventStore(&disp, 1<<16, DropOldest) + + var got []Event + var mu sync.Mutex + es.RegisterBatch("order.placed", 100, func(_ context.Context, evs []Event) ([]Result, error) { + mu.Lock() + got = append(got, evs...) + mu.Unlock() + return nil, nil + }) + + for i := 0; i < 5; i++ { + _ = es.Subscribe(bg, Event{Projection: "order.placed"}) + } + es.Publish() + + mu.Lock() + defer mu.Unlock() + if len(got) != 5 { + t.Fatalf("batch handler received %d events; want 5", len(got)) + } +} + +// TestBatch_Chunking verifies that events are chunked by the configured size. +func TestBatch_Chunking(t *testing.T) { + disp := Dispatcher{} + es := NewEventStore(&disp, 1<<16, DropOldest) + + var calls []int // lengths of each call + var mu sync.Mutex + es.RegisterBatch("tick", 3, func(_ context.Context, evs []Event) ([]Result, error) { + mu.Lock() + calls = append(calls, len(evs)) + mu.Unlock() + return nil, nil + }) + + for i := 0; i < 7; i++ { + _ = es.Subscribe(bg, Event{Projection: "tick"}) + } + es.Publish() + + mu.Lock() + defer mu.Unlock() + // 7 events with size=3 → chunks of [3, 3, 1] + if len(calls) != 3 { + t.Fatalf("handler called %d times; want 3", len(calls)) + } + if calls[0] != 3 || calls[1] != 3 || calls[2] != 1 { + t.Fatalf("chunk sizes %v; want [3 3 1]", calls) + } +} + +// TestBatch_FanOut verifies that multiple batch handlers on the same projection +// each receive all events independently. +func TestBatch_FanOut(t *testing.T) { + disp := Dispatcher{} + es := NewEventStore(&disp, 1<<16, DropOldest) + + var countA, countB int32 + es.RegisterBatch("evt", 100, func(_ context.Context, evs []Event) ([]Result, error) { + atomic.AddInt32(&countA, int32(len(evs))) + return nil, nil + }) + es.RegisterBatch("evt", 100, func(_ context.Context, evs []Event) ([]Result, error) { + atomic.AddInt32(&countB, int32(len(evs))) + return nil, nil + }) + + for i := 0; i < 4; i++ { + _ = es.Subscribe(bg, Event{Projection: "evt"}) + } + es.Publish() + + if countA != 4 { + t.Errorf("handler A received %d events; want 4", countA) + } + if countB != 4 { + t.Errorf("handler B received %d events; want 4", countB) + } +} + +// TestBatch_MixedWithRegular verifies that batch and regular handlers for the +// same projection both fire independently. +func TestBatch_MixedWithRegular(t *testing.T) { + disp := Dispatcher{} + var regularCalls int32 + disp.Register("mixed", func(_ context.Context, ev Event) (Result, error) { + atomic.AddInt32(®ularCalls, 1) + return Result{}, nil + }) + + es := NewEventStore(&disp, 1<<16, DropOldest) + var batchTotal int32 + es.RegisterBatch("mixed", 100, func(_ context.Context, evs []Event) ([]Result, error) { + atomic.AddInt32(&batchTotal, int32(len(evs))) + return nil, nil + }) + + for i := 0; i < 3; i++ { + _ = es.Subscribe(bg, Event{Projection: "mixed"}) + } + es.Publish() + + if regularCalls != 3 { + t.Errorf("regular handler called %d times; want 3", regularCalls) + } + if batchTotal != 3 { + t.Errorf("batch handler received %d events; want 3", batchTotal) + } +} + +// TestBatch_DLQ verifies that a batch handler error sends all events in the +// failing chunk to the dead-letter queue. +func TestBatch_DLQ(t *testing.T) { + disp := Dispatcher{} + es := NewEventStore(&disp, 1<<16, DropOldest) + es.DLQ = NewDeadLetterQueue() + + batchErr := errors.New("bulk insert failed") + es.RegisterBatch("write", 100, func(_ context.Context, _ []Event) ([]Result, error) { + return nil, batchErr + }) + + for i := 0; i < 4; i++ { + _ = es.Subscribe(bg, Event{Projection: "write"}) + } + es.Publish() + + entries := es.DLQ.Entries() + if len(entries) != 4 { + t.Fatalf("DLQ has %d entries; want 4", len(entries)) + } + for _, dl := range entries { + if !errors.Is(dl.Err, batchErr) { + t.Errorf("DLQ entry error = %v; want %v", dl.Err, batchErr) + } + if dl.Attempts != 1 { + t.Errorf("DLQ entry Attempts = %d; want 1", dl.Attempts) + } + } +} + +// TestBatch_DLQ_PerChunk verifies that only the failing chunk lands in DLQ +// when events span multiple chunks and one chunk succeeds. +func TestBatch_DLQ_PerChunk(t *testing.T) { + disp := Dispatcher{} + es := NewEventStore(&disp, 1<<16, DropOldest) + es.DLQ = NewDeadLetterQueue() + + var calls int32 + batchErr := errors.New("second chunk failed") + es.RegisterBatch("write", 2, func(_ context.Context, evs []Event) ([]Result, error) { + n := atomic.AddInt32(&calls, 1) + if n == 2 { + return nil, batchErr + } + return nil, nil + }) + + for i := 0; i < 4; i++ { + _ = es.Subscribe(bg, Event{Projection: "write"}) + } + es.Publish() + + // 4 events, size=2 → 2 chunks. First succeeds, second fails → 2 in DLQ. + entries := es.DLQ.Entries() + if len(entries) != 2 { + t.Fatalf("DLQ has %d entries; want 2", len(entries)) + } +} + +// TestBatch_PanicRecovery verifies that a panicking batch handler is recovered +// and all events in the chunk land in the DLQ. +func TestBatch_PanicRecovery(t *testing.T) { + disp := Dispatcher{} + es := NewEventStore(&disp, 1<<16, DropOldest) + es.DLQ = NewDeadLetterQueue() + + es.RegisterBatch("panic", 100, func(_ context.Context, _ []Event) ([]Result, error) { + panic("batch exploded") + }) + + for i := 0; i < 3; i++ { + _ = es.Subscribe(bg, Event{Projection: "panic"}) + } + es.Publish() // must not panic + + if es.DLQ.Len() != 3 { + t.Errorf("DLQ has %d entries; want 3", es.DLQ.Len()) + } +} + +// TestBatch_OnErrorHook verifies that OnError fires once per event in a +// failing batch chunk. +func TestBatch_OnErrorHook(t *testing.T) { + disp := Dispatcher{} + es := NewEventStore(&disp, 1<<16, DropOldest) + + var errorFires int32 + es.OnError(func(_ context.Context, _ Event, _ error) { + atomic.AddInt32(&errorFires, 1) + }) + + es.RegisterBatch("failing", 100, func(_ context.Context, _ []Event) ([]Result, error) { + return nil, errors.New("fail") + }) + + for i := 0; i < 5; i++ { + _ = es.Subscribe(bg, Event{Projection: "failing"}) + } + es.Publish() + + if errorFires != 5 { + t.Errorf("OnError fired %d times; want 5", errorFires) + } +} + +// TestBatch_Metrics verifies that processedCount is incremented per event, not +// per batch call. +func TestBatch_Metrics(t *testing.T) { + disp := Dispatcher{} + es := NewEventStore(&disp, 1<<16, DropOldest) + es.RegisterBatch("m", 100, func(_ context.Context, _ []Event) ([]Result, error) { + return nil, nil + }) + + for i := 0; i < 6; i++ { + _ = es.Subscribe(bg, Event{Projection: "m"}) + } + es.Publish() + + _, processed, _ := es.Metrics() + if processed != 6 { + t.Errorf("processed = %d; want 6", processed) + } +} + +// TestBatch_Async verifies batch handlers work correctly in async mode. +func TestBatch_Async(t *testing.T) { + disp := Dispatcher{} + es := NewEventStore(&disp, 1<<16, DropOldest) + es.Async = true + + var received int32 + es.RegisterBatch("async", 3, func(_ context.Context, evs []Event) ([]Result, error) { + atomic.AddInt32(&received, int32(len(evs))) + return nil, nil + }) + + for i := 0; i < 9; i++ { + _ = es.Subscribe(bg, Event{Projection: "async"}) + } + es.Publish() + _ = es.Drain(context.Background()) + + if received != 9 { + t.Errorf("async batch received %d events; want 9", received) + } +} + +// TestBatch_Results verifies that per-event Results returned by the handler are +// forwarded to OnAfter hooks. +func TestBatch_Results(t *testing.T) { + disp := Dispatcher{} + es := NewEventStore(&disp, 1<<16, DropOldest) + + es.RegisterBatch("res", 100, func(_ context.Context, evs []Event) ([]Result, error) { + out := make([]Result, len(evs)) + for i := range out { + out[i] = Result{Message: "ok"} + } + return out, nil + }) + + var afterMessages []string + var mu sync.Mutex + es.OnAfter(func(_ context.Context, _ Event, r Result, _ error) { + mu.Lock() + afterMessages = append(afterMessages, r.Message) + mu.Unlock() + }) + + for i := 0; i < 3; i++ { + _ = es.Subscribe(bg, Event{Projection: "res"}) + } + es.Publish() + + mu.Lock() + defer mu.Unlock() + if len(afterMessages) != 3 { + t.Fatalf("OnAfter fired %d times; want 3", len(afterMessages)) + } + for _, msg := range afterMessages { + if msg != "ok" { + t.Errorf("OnAfter result.Message = %q; want \"ok\"", msg) + } + } +} diff --git a/eventstore.go b/eventstore.go index fec586e..57c5863 100644 --- a/eventstore.go +++ b/eventstore.go @@ -66,9 +66,19 @@ type Event struct { } // internal work unit for async dispatch -type eventWork struct { - handler HandlerFunc - ev Event +type workKind uint8 + +const ( + workSingle workKind = iota + workBatch +) + +type work struct { + kind workKind + handler HandlerFunc // workSingle + ev Event // workSingle + batchFn BatchHandlerFunc // workBatch + events []Event // workBatch } // EventStore is a high-performance, lock-free ring buffer with middleware and hooks support. @@ -92,9 +102,12 @@ type EventStore struct { afterHooks []AfterHook errorHooks []ErrorHook + // Batch handlers (projection → []batchEntry); populated via RegisterBatch. + batchHandlers map[interface{}][]batchEntry + // Async worker pool asyncWorkers int - workCh chan eventWork + workCh chan work wg sync.WaitGroup shutdownOnce sync.Once shutdownSignal chan struct{} @@ -125,8 +138,9 @@ func NewEventStore(dispatcher *Dispatcher, bufferSize uint64, policy OverrunPoli buf: make([]atomic.Pointer[Event], bufferSize), events: make([]Event, bufferSize), OverrunPolicy: policy, + batchHandlers: make(map[interface{}][]batchEntry), asyncWorkers: runtime.NumCPU(), - workCh: make(chan eventWork, bufferSize), + workCh: make(chan work, bufferSize), shutdownSignal: make(chan struct{}), } // start worker pool @@ -136,12 +150,17 @@ func NewEventStore(dispatcher *Dispatcher, bufferSize uint64, policy OverrunPoli return es } -// worker processes eventWork from the channel until shutdown. +// worker processes work items from the channel until shutdown. func (es *EventStore) worker() { for w := range es.workCh { - func(work eventWork) { + func(w work) { defer es.wg.Done() - es.execute(work.handler, work.ev) + switch w.kind { + case workSingle: + es.execute(w.handler, w.ev) + case workBatch: + es.executeBatch(w.batchFn, w.events) + } }(w) } } @@ -201,6 +220,9 @@ func (es *EventStore) Subscribe(ctx context.Context, e Event) error { } // Publish processes all pending events, applying middleware and hooks. +// Regular handlers receive one event at a time. Batch handlers registered via +// RegisterBatch receive events grouped by projection in chunks of up to their +// configured size. func (es *EventStore) Publish() { head := atomic.LoadUint64(&es.head) tail := atomic.LoadUint64(&es.tail) @@ -210,6 +232,13 @@ func (es *EventStore) Publish() { disp := *es.dispatcher mask := es.size - 1 + hasBatch := len(es.batchHandlers) > 0 + + // batchGroups collects events per projection for batch dispatch. + var batchGroups map[interface{}][]Event + if hasBatch { + batchGroups = make(map[interface{}][]Event) + } for i := tail; i < head; i++ { p := es.buf[i&mask].Load() @@ -222,7 +251,7 @@ func (es *EventStore) Publish() { if es.Async { es.wg.Add(1) select { - case es.workCh <- eventWork{handler, ev}: + case es.workCh <- work{kind: workSingle, handler: handler, ev: ev}: case <-es.shutdownSignal: es.wg.Done() } @@ -231,7 +260,40 @@ func (es *EventStore) Publish() { } } } + if hasBatch { + if _, ok := es.batchHandlers[ev.Projection]; ok { + batchGroups[ev.Projection] = append(batchGroups[ev.Projection], ev) + } + } + } + + // Dispatch batch handlers in chunks. + for proj, entries := range es.batchHandlers { + events := batchGroups[proj] + if len(events) == 0 { + continue + } + for _, entry := range entries { + for start := 0; start < len(events); start += entry.size { + end := start + entry.size + if end > len(events) { + end = len(events) + } + chunk := events[start:end] + if es.Async { + es.wg.Add(1) + select { + case es.workCh <- work{kind: workBatch, batchFn: entry.handler, events: chunk}: + case <-es.shutdownSignal: + es.wg.Done() + } + } else { + es.executeBatch(entry.handler, chunk) + } + } + } } + atomic.StoreUint64(&es.tail, head) }