diff --git a/CHANGELOG.md b/CHANGELOG.md index a5388cc089..a16720a53d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changes - Optimization of mutex usage in cache for reaper [#3286](https://github.com/evstack/ev-node/pull/3286) +- Optimize metadata writes by making them async in cache store [#3298](https://github.com/evstack/ev-node/pull/3298) - Reduce tx cache retention to avoid OOM under (really) heavy tx load [#3299](https://github.com/evstack/ev-node/pull/3299) ## v1.1.1 diff --git a/pkg/store/cached_store.go b/pkg/store/cached_store.go index 3c7af2f611..39031ccde3 100644 --- a/pkg/store/cached_store.go +++ b/pkg/store/cached_store.go @@ -2,8 +2,11 @@ package store import ( "context" + "sync" + "time" lru "github.com/hashicorp/golang-lru/v2" + "github.com/rs/zerolog" "github.com/evstack/ev-node/types" ) @@ -14,15 +17,37 @@ const ( // DefaultBlockDataCacheSize is the default number of block data entries to cache. DefaultBlockDataCacheSize = 200_000 + + asyncWriteBufferSize = 8192 + + // batchWindow is the time the write goroutine waits after receiving the first + // op before flushing. This allows bursts of metadata writes (e.g. 3-4 per + // height in the submitter) to be coalesced into a single Badger WriteBatch. + batchWindow = 100 * time.Microsecond ) +type asyncWriteOp struct { + key string + value []byte + isDelete bool +} + // CachedStore wraps a Store with LRU caching for frequently accessed data. // The underlying LRU cache is thread-safe, so no additional synchronization is needed. +// Metadata writes (SetMetadata, DeleteMetadata) are processed asynchronously via a +// buffered channel to avoid blocking Badger's write pipeline for critical operations +// like block production (batch commits). 
type CachedStore struct { Store headerCache *lru.Cache[uint64, *types.SignedHeader] blockDataCache *lru.Cache[uint64, *blockDataEntry] + + writeCh chan asyncWriteOp + done chan struct{} + stopMu sync.RWMutex + stopped bool + logger zerolog.Logger } type blockDataEntry struct { @@ -73,6 +98,9 @@ func NewCachedStore(store Store, opts ...CachedStoreOption) (*CachedStore, error Store: store, headerCache: headerCache, blockDataCache: blockDataCache, + writeCh: make(chan asyncWriteOp, asyncWriteBufferSize), + done: make(chan struct{}), + logger: zerolog.Nop(), } for _, opt := range opts { @@ -81,9 +109,58 @@ func NewCachedStore(store Store, opts ...CachedStoreOption) (*CachedStore, error } } + cs.startWriteLoop() + return cs, nil } +func (cs *CachedStore) startWriteLoop() { + go func() { + defer close(cs.done) + for op := range cs.writeCh { + ops := []asyncWriteOp{op} + + timer := time.NewTimer(batchWindow) + collect: + for { + select { + case op, ok := <-cs.writeCh: + if !ok { + timer.Stop() + break collect + } + ops = append(ops, op) + case <-timer.C: + break collect + } + } + + last := make(map[string]asyncWriteOp, len(ops)) + for _, o := range ops { + last[o.key] = o + } + + var puts []MetadataKV + var deletes []string + for _, o := range last { + if o.isDelete { + deletes = append(deletes, o.key) + } else { + puts = append(puts, MetadataKV{Key: o.key, Value: o.value}) + } + } + + if err := cs.BatchMetadata(context.Background(), puts, deletes); err != nil { + for _, o := range ops { + cs.logger.Error().Err(err).Str("key", o.key). + Bool("delete", o.isDelete). + Msg("async metadata batch write failed") + } + } + } + }() +} + // GetHeader returns the header at the given height, using the cache if available. 
func (cs *CachedStore) GetHeader(ctx context.Context, height uint64) (*types.SignedHeader, error) { // Try cache first @@ -162,7 +239,7 @@ func (cs *CachedStore) Rollback(ctx context.Context, height uint64, aggregator b } // PruneBlocks wraps the underlying store's PruneBlocks and invalidates caches -// up to the heigh that we purne +// up to the height that we prune func (cs *CachedStore) PruneBlocks(ctx context.Context, height uint64) error { if err := cs.Store.PruneBlocks(ctx, height); err != nil { return err @@ -173,8 +250,42 @@ func (cs *CachedStore) PruneBlocks(ctx context.Context, height uint64) error { return nil } -// Close closes the underlying store. +// SetMetadata queues an asynchronous metadata write. The write is persisted +// by the background goroutine via BatchMetadata. If the store has been stopped, +// the write falls back to synchronous execution on the underlying store. +func (cs *CachedStore) SetMetadata(ctx context.Context, key string, value []byte) error { + cs.stopMu.RLock() + defer cs.stopMu.RUnlock() + + if cs.stopped { + return cs.Store.SetMetadata(ctx, key, value) + } + valueCopy := append([]byte(nil), value...) + cs.writeCh <- asyncWriteOp{key: key, value: valueCopy} + return nil +} + +// DeleteMetadata queues an asynchronous metadata delete. If the store has been +// stopped, the delete falls back to synchronous execution. +func (cs *CachedStore) DeleteMetadata(ctx context.Context, key string) error { + cs.stopMu.RLock() + defer cs.stopMu.RUnlock() + + if cs.stopped { + return cs.Store.DeleteMetadata(ctx, key) + } + cs.writeCh <- asyncWriteOp{key: key, isDelete: true} + return nil +} + +// Close drains pending async writes, then closes the underlying store. 
func (cs *CachedStore) Close() error { + cs.stopMu.Lock() + cs.stopped = true + close(cs.writeCh) + cs.stopMu.Unlock() + <-cs.done + cs.ClearCache() return cs.Store.Close() } diff --git a/pkg/store/cached_store_test.go b/pkg/store/cached_store_test.go index 2579eb48a6..432d90f035 100644 --- a/pkg/store/cached_store_test.go +++ b/pkg/store/cached_store_test.go @@ -2,8 +2,12 @@ package store import ( "context" + "fmt" "testing" + "time" + ds "github.com/ipfs/go-datastore" + "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -270,3 +274,129 @@ func TestCachedStore_Close(t *testing.T) { err = cachedStore.Close() require.NoError(t, err) } + +func TestCachedStore_AsyncSetMetadata(t *testing.T) { + t.Parallel() + ctx := context.Background() + + kv, err := NewTestInMemoryKVStore() + require.NoError(t, err) + + base := New(kv) + cs, err := NewCachedStore(base) + require.NoError(t, err) + t.Cleanup(func() { cs.Close() }) + + require.NoError(t, cs.SetMetadata(ctx, "key1", []byte("value1"))) + + require.Eventually(t, func() bool { + v, err := base.GetMetadata(ctx, "key1") + return err == nil && string(v) == "value1" + }, time.Second, 10*time.Millisecond) +} + +func TestCachedStore_AsyncDeleteMetadata(t *testing.T) { + t.Parallel() + ctx := context.Background() + + kv, err := NewTestInMemoryKVStore() + require.NoError(t, err) + + base := New(kv) + require.NoError(t, base.SetMetadata(ctx, "key1", []byte("value1"))) + + cs, err := NewCachedStore(base) + require.NoError(t, err) + t.Cleanup(func() { cs.Close() }) + + require.NoError(t, cs.DeleteMetadata(ctx, "key1")) + + require.Eventually(t, func() bool { + _, err := base.GetMetadata(ctx, "key1") + return err != nil + }, time.Second, 10*time.Millisecond) +} + +func TestCachedStore_Close_FlushesPendingWrites(t *testing.T) { + ctx := context.Background() + + dir := t.TempDir() + kv, err := NewDefaultKVStore(dir, "", "test-db") + require.NoError(t, err) + + base := New(kv) + cs, err 
:= NewCachedStore(base) + require.NoError(t, err) + + const n = 100 + for i := range n { + k := fmt.Sprintf("key-%d", i) + require.NoError(t, cs.SetMetadata(ctx, k, []byte(k))) + } + + require.NoError(t, cs.Close()) + + kv2, err := NewDefaultKVStore(dir, "", "test-db") + require.NoError(t, err) + t.Cleanup(func() { kv2.Close() }) + reopened := New(kv2) + + for i := range n { + k := fmt.Sprintf("key-%d", i) + v, err := reopened.GetMetadata(ctx, k) + require.NoError(t, err) + require.Equal(t, []byte(k), v) + } +} + +func TestCachedStore_WriteAfterClose_FallsBack(t *testing.T) { + kv, err := NewTestInMemoryKVStore() + require.NoError(t, err) + + base := New(kv) + cs, err := NewCachedStore(base) + require.NoError(t, err) + + ctx := context.Background() + require.NoError(t, cs.SetMetadata(ctx, "before", []byte("ok"))) + + require.NoError(t, cs.Close()) + + err = cs.SetMetadata(ctx, "after", []byte("sync")) + require.Error(t, err) +} + +func TestCachedStore_CoalescesSameKeyOps(t *testing.T) { + ctx := context.Background() + + kv, err := NewTestInMemoryKVStore() + require.NoError(t, err) + + require.NoError(t, kv.Put(ctx, ds.NewKey(GetMetaKey("k")), []byte("original"))) + + base := New(kv) + + writeCh := make(chan asyncWriteOp, asyncWriteBufferSize) + done := make(chan struct{}) + cs := &CachedStore{ + Store: base, + writeCh: writeCh, + done: done, + logger: zerolog.Nop(), + } + cs.startWriteLoop() + + require.NoError(t, cs.SetMetadata(ctx, "k", []byte("v1"))) + require.NoError(t, cs.DeleteMetadata(ctx, "k")) + require.NoError(t, cs.SetMetadata(ctx, "k", []byte("v2"))) + + cs.stopMu.Lock() + cs.stopped = true + close(writeCh) + cs.stopMu.Unlock() + <-done + + v, err := base.GetMetadata(ctx, "k") + require.NoError(t, err) + require.Equal(t, []byte("v2"), v, "last write (Set) should win over delete") +} diff --git a/pkg/store/store.go b/pkg/store/store.go index 975db4e163..4c045d1d89 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -190,6 +190,30 @@ func (s 
*DefaultStore) SetMetadata(ctx context.Context, key string, value []byte return nil } +func (s *DefaultStore) BatchMetadata(ctx context.Context, puts []MetadataKV, deletes []string) error { + if len(puts) == 0 && len(deletes) == 0 { + return nil + } + batch, err := s.db.Batch(ctx) + if err != nil { + return fmt.Errorf("failed to create metadata batch: %w", err) + } + for _, kv := range puts { + if err := batch.Put(ctx, ds.NewKey(GetMetaKey(kv.Key)), kv.Value); err != nil { + return fmt.Errorf("failed to batch-put metadata key '%s': %w", kv.Key, err) + } + } + for _, key := range deletes { + if err := batch.Delete(ctx, ds.NewKey(GetMetaKey(key))); err != nil { + return fmt.Errorf("failed to batch-delete metadata key '%s': %w", key, err) + } + } + if err := batch.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit metadata batch: %w", err) + } + return nil +} + // GetMetadata returns values stored for given key with SetMetadata. func (s *DefaultStore) GetMetadata(ctx context.Context, key string) ([]byte, error) { data, err := s.db.Get(ctx, ds.NewKey(GetMetaKey(key))) diff --git a/pkg/store/tracing.go b/pkg/store/tracing.go index 259c6cb600..e94de6dad1 100644 --- a/pkg/store/tracing.go +++ b/pkg/store/tracing.go @@ -211,6 +211,25 @@ func (t *tracedStore) DeleteMetadata(ctx context.Context, key string) error { return nil } +func (t *tracedStore) BatchMetadata(ctx context.Context, puts []MetadataKV, deletes []string) error { + ctx, span := t.tracer.Start(ctx, "Store.BatchMetadata", + trace.WithAttributes( + attribute.Int("puts", len(puts)), + attribute.Int("deletes", len(deletes)), + ), + ) + defer span.End() + + err := t.inner.BatchMetadata(ctx, puts, deletes) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + return nil +} + func (t *tracedStore) DeleteStateAtHeight(ctx context.Context, height uint64) error { ctx, span := t.tracer.Start(ctx, "Store.DeleteStateAtHeight", 
trace.WithAttributes(attribute.Int64("height", int64(height))), diff --git a/pkg/store/tracing_test.go b/pkg/store/tracing_test.go index 3ae8d8902e..66211a2624 100644 --- a/pkg/store/tracing_test.go +++ b/pkg/store/tracing_test.go @@ -28,6 +28,7 @@ type tracingMockStore struct { getMetadataFn func(ctx context.Context, key string) ([]byte, error) setMetadataFn func(ctx context.Context, key string, value []byte) error + batchMetadataFn func(ctx context.Context, puts []MetadataKV, deletes []string) error deleteMetadataFn func(ctx context.Context, key string) error rollbackFn func(ctx context.Context, height uint64, aggregator bool) error pruneBlocksFn func(ctx context.Context, height uint64) error @@ -105,6 +106,13 @@ func (m *tracingMockStore) SetMetadata(ctx context.Context, key string, value [] return nil } +func (m *tracingMockStore) BatchMetadata(ctx context.Context, puts []MetadataKV, deletes []string) error { + if m.batchMetadataFn != nil { + return m.batchMetadataFn(ctx, puts, deletes) + } + return nil +} + func (m *tracingMockStore) DeleteMetadata(ctx context.Context, key string) error { if m.deleteMetadataFn != nil { return m.deleteMetadataFn(ctx, key) diff --git a/pkg/store/types.go b/pkg/store/types.go index b1b1f2bd5e..d37b5debf9 100644 --- a/pkg/store/types.go +++ b/pkg/store/types.go @@ -54,6 +54,12 @@ type Store interface { NewBatch(ctx context.Context) (Batch, error) } +// MetadataKV is a key-value pair for batched metadata operations. +type MetadataKV struct { + Key string + Value []byte +} + type Metadata interface { // SetMetadata saves arbitrary value in the store. // @@ -62,6 +68,10 @@ type Metadata interface { // DeleteMetadata removes a metadata key from the store. DeleteMetadata(ctx context.Context, key string) error + + // BatchMetadata writes and deletes metadata keys in a single Badger + // WriteBatch transaction, reducing contention on the write pipeline. 
+ BatchMetadata(ctx context.Context, puts []MetadataKV, deletes []string) error } type Reader interface { diff --git a/test/mocks/store.go b/test/mocks/store.go index 911832ebab..75024718a5 100644 --- a/test/mocks/store.go +++ b/test/mocks/store.go @@ -39,6 +39,69 @@ func (_m *MockStore) EXPECT() *MockStore_Expecter { return &MockStore_Expecter{mock: &_m.Mock} } +// BatchMetadata provides a mock function for the type MockStore +func (_mock *MockStore) BatchMetadata(ctx context.Context, puts []store.MetadataKV, deletes []string) error { + ret := _mock.Called(ctx, puts, deletes) + + if len(ret) == 0 { + panic("no return value specified for BatchMetadata") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(context.Context, []store.MetadataKV, []string) error); ok { + r0 = returnFunc(ctx, puts, deletes) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockStore_BatchMetadata_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'BatchMetadata' +type MockStore_BatchMetadata_Call struct { + *mock.Call +} + +// BatchMetadata is a helper method to define mock.On call +// - ctx context.Context +// - puts []store.MetadataKV +// - deletes []string +func (_e *MockStore_Expecter) BatchMetadata(ctx interface{}, puts interface{}, deletes interface{}) *MockStore_BatchMetadata_Call { + return &MockStore_BatchMetadata_Call{Call: _e.mock.On("BatchMetadata", ctx, puts, deletes)} +} + +func (_c *MockStore_BatchMetadata_Call) Run(run func(ctx context.Context, puts []store.MetadataKV, deletes []string)) *MockStore_BatchMetadata_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 []store.MetadataKV + if args[1] != nil { + arg1 = args[1].([]store.MetadataKV) + } + var arg2 []string + if args[2] != nil { + arg2 = args[2].([]string) + } + run( + arg0, + arg1, + arg2, + ) + }) + return _c +} + +func (_c *MockStore_BatchMetadata_Call) Return(err 
error) *MockStore_BatchMetadata_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockStore_BatchMetadata_Call) RunAndReturn(run func(ctx context.Context, puts []store.MetadataKV, deletes []string) error) *MockStore_BatchMetadata_Call { + _c.Call.Return(run) + return _c +} + // Close provides a mock function for the type MockStore func (_mock *MockStore) Close() error { ret := _mock.Called()