From f7ea4ad43b6b9da9762e6f3a5a4746281152f5bd Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 28 Apr 2026 17:20:24 +0200 Subject: [PATCH 1/7] perf(store): save metadata async --- pkg/store/cached_store.go | 88 +++++++++++++++++++++++++++++++++- pkg/store/cached_store_test.go | 84 ++++++++++++++++++++++++++++++++ 2 files changed, 171 insertions(+), 1 deletion(-) diff --git a/pkg/store/cached_store.go b/pkg/store/cached_store.go index 3c7af2f611..22b8bcecc7 100644 --- a/pkg/store/cached_store.go +++ b/pkg/store/cached_store.go @@ -2,8 +2,10 @@ package store import ( "context" + "sync" lru "github.com/hashicorp/golang-lru/v2" + "github.com/rs/zerolog" "github.com/evstack/ev-node/types" ) @@ -14,15 +16,32 @@ const ( // DefaultBlockDataCacheSize is the default number of block data entries to cache. DefaultBlockDataCacheSize = 200_000 + + asyncWriteBufferSize = 8192 ) +type asyncWriteOp struct { + key string + value []byte + isDelete bool +} + // CachedStore wraps a Store with LRU caching for frequently accessed data. // The underlying LRU cache is thread-safe, so no additional synchronization is needed. +// Metadata writes (SetMetadata, DeleteMetadata) are processed asynchronously via a +// buffered channel to avoid blocking Badger's write pipeline for critical operations +// like block production (batch commits). type CachedStore struct { Store headerCache *lru.Cache[uint64, *types.SignedHeader] blockDataCache *lru.Cache[uint64, *blockDataEntry] + + writeCh chan asyncWriteOp + done chan struct{} + stopMu sync.RWMutex + stopped bool + logger zerolog.Logger } type blockDataEntry struct { @@ -73,6 +92,9 @@ func NewCachedStore(store Store, opts ...CachedStoreOption) (*CachedStore, error Store: store, headerCache: headerCache, blockDataCache: blockDataCache, + writeCh: make(chan asyncWriteOp, asyncWriteBufferSize), + done: make(chan struct{}), + logger: zerolog.Nop(), } for _, opt := range opts { @@ -81,9 +103,30 @@ func NewCachedStore(store Store, opts ...CachedStoreOption) (*CachedStore, error } } + cs.startWriteLoop() + return cs, nil } +func (cs *CachedStore) startWriteLoop() { + go func() { + defer close(cs.done) + for op := range cs.writeCh { + var err error + if op.isDelete { + err = cs.Store.DeleteMetadata(context.Background(), op.key) + } else { + err = cs.Store.SetMetadata(context.Background(), op.key, op.value) + } + if err != nil { + cs.logger.Error().Err(err).Str("key", op.key). + Bool("delete", op.isDelete). + Msg("async metadata write failed") + } + } + }() +} + // GetHeader returns the header at the given height, using the cache if available. func (cs *CachedStore) GetHeader(ctx context.Context, height uint64) (*types.SignedHeader, error) { // Try cache first @@ -173,8 +216,51 @@ func (cs *CachedStore) PruneBlocks(ctx context.Context, height uint64) error { return nil } -// Close closes the underlying store. +// SetMetadata queues an asynchronous metadata write. The write is persisted +// by the background goroutine. If the buffer is full or the store has been +// stopped, the write falls back to synchronous execution on the underlying store. +func (cs *CachedStore) SetMetadata(ctx context.Context, key string, value []byte) error { + cs.stopMu.RLock() + if cs.stopped { + cs.stopMu.RUnlock() + return cs.Store.SetMetadata(ctx, key, value) + } + select { + case cs.writeCh <- asyncWriteOp{key: key, value: value}: + cs.stopMu.RUnlock() + return nil + default: + cs.stopMu.RUnlock() + return cs.Store.SetMetadata(ctx, key, value) + } +} + +// DeleteMetadata queues an asynchronous metadata delete. If the buffer is full +// or the store has been stopped, the delete falls back to synchronous execution. +func (cs *CachedStore) DeleteMetadata(ctx context.Context, key string) error { + cs.stopMu.RLock() + if cs.stopped { + cs.stopMu.RUnlock() + return cs.Store.DeleteMetadata(ctx, key) + } + select { + case cs.writeCh <- asyncWriteOp{key: key, isDelete: true}: + cs.stopMu.RUnlock() + return nil + default: + cs.stopMu.RUnlock() + return cs.Store.DeleteMetadata(ctx, key) + } +} + +// Close drains pending async writes, then closes the underlying store. func (cs *CachedStore) Close() error { + cs.stopMu.Lock() + cs.stopped = true + close(cs.writeCh) + cs.stopMu.Unlock() + <-cs.done + cs.ClearCache() return cs.Store.Close() } diff --git a/pkg/store/cached_store_test.go b/pkg/store/cached_store_test.go index 2579eb48a6..5d62a2b25e 100644 --- a/pkg/store/cached_store_test.go +++ b/pkg/store/cached_store_test.go @@ -3,6 +3,7 @@ package store import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -270,3 +271,86 @@ func TestCachedStore_Close(t *testing.T) { err = cachedStore.Close() require.NoError(t, err) } + +func TestCachedStore_AsyncSetMetadata(t *testing.T) { + t.Parallel() + ctx := context.Background() + + kv, err := NewTestInMemoryKVStore() + require.NoError(t, err) + + base := New(kv) + cs, err := NewCachedStore(base) + require.NoError(t, err) + t.Cleanup(func() { cs.Close() }) + + require.NoError(t, cs.SetMetadata(ctx, "key1", []byte("value1"))) + + require.Eventually(t, func() bool { + v, err := base.GetMetadata(ctx, "key1") + return err == nil && string(v) == "value1" + }, time.Second, 10*time.Millisecond) +} + +func TestCachedStore_AsyncDeleteMetadata(t *testing.T) { + t.Parallel() + ctx := context.Background() + + kv, err := NewTestInMemoryKVStore() + require.NoError(t, err) + + base := New(kv) + require.NoError(t, base.SetMetadata(ctx, "key1", []byte("value1"))) + + cs, err := NewCachedStore(base) + require.NoError(t, err) + t.Cleanup(func() { cs.Close() }) + + require.NoError(t, cs.DeleteMetadata(ctx, "key1")) + + require.Eventually(t, func() bool { + _, err := base.GetMetadata(ctx, "key1") + return err != nil + }, time.Second, 10*time.Millisecond) +} + +func TestCachedStore_Close_FlushesPendingWrites(t *testing.T) { + kv, err := NewTestInMemoryKVStore() + require.NoError(t, err) + + base := New(kv) + cs, err := NewCachedStore(base) + require.NoError(t, err) + + ctx := context.Background() + const n = 100 + for i := 0; i < n; i++ { + k := []byte{byte(i)} + require.NoError(t, cs.SetMetadata(ctx, string(k), k)) + } + + // Wait for the last key to land via pass-through read + require.Eventually(t, func() bool { + v, err := cs.GetMetadata(ctx, string([]byte{byte(n - 1)})) + return err == nil && len(v) == 1 && v[0] == byte(n-1) + }, 2*time.Second, 10*time.Millisecond) + + require.NoError(t, cs.Close()) +} + +func TestCachedStore_WriteAfterClose_FallsBack(t *testing.T) { + kv, err := NewTestInMemoryKVStore() + require.NoError(t, err) + + base := New(kv) + cs, err := NewCachedStore(base) + require.NoError(t, err) + + ctx := context.Background() + require.NoError(t, cs.SetMetadata(ctx, "before", []byte("ok"))) + + require.NoError(t, cs.Close()) + + err = cs.SetMetadata(ctx, "after", []byte("sync")) + require.Error(t, err) +} From af7e3265df400398a56a94182cfeaad117e907d0 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 28 Apr 2026 17:35:52 +0200 Subject: [PATCH 2/7] cl --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0097fb829b..7b3ac226a5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changes - Optimization of mutex usage in cache for reaper [#3286](https://github.com/evstack/ev-node/pull/3286) +- Optimize metadata writes by making it async in cache store [#3298](https://github.com/evstack/ev-node/pull/3298) ## v1.1.1 From 6c53bc4c94462059863b11346b91c46920080900 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 28 Apr 2026 19:27:25 +0200 Subject: [PATCH 3/7] Optimize metadata writes with batching --- pkg/store/cached_store.go | 35 ++++++++++++++++------ pkg/store/store.go | 24 +++++++++++++++ pkg/store/tracing.go | 19 ++++++++++++ pkg/store/tracing_test.go | 8 +++++ pkg/store/types.go | 10 +++++++ test/mocks/store.go | 63 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 150 insertions(+), 9 deletions(-) diff --git a/pkg/store/cached_store.go b/pkg/store/cached_store.go index 22b8bcecc7..8fc56865c1 100644 --- a/pkg/store/cached_store.go +++ b/pkg/store/cached_store.go @@ -112,16 +112,33 @@ func (cs *CachedStore) startWriteLoop() { go func() { defer close(cs.done) for op := range cs.writeCh { - var err error - if op.isDelete { - err = cs.Store.DeleteMetadata(context.Background(), op.key) - } else { - err = cs.Store.SetMetadata(context.Background(), op.key, op.value) + ops := []asyncWriteOp{op} + drain: + for { + select { + case op := <-cs.writeCh: + ops = append(ops, op) + default: + break drain + } } - if err != nil { - cs.logger.Error().Err(err).Str("key", op.key). - Bool("delete", op.isDelete). - Msg("async metadata write failed") + + var puts []MetadataKV + var deletes []string + for _, o := range ops { + if o.isDelete { + deletes = append(deletes, o.key) + } else { + puts = append(puts, MetadataKV{Key: o.key, Value: o.value}) + } + } + + if err := cs.Store.BatchMetadata(context.Background(), puts, deletes); err != nil { + for _, o := range ops { + cs.logger.Error().Err(err).Str("key", o.key). + Bool("delete", o.isDelete). + Msg("async metadata batch write failed") + } } } }() diff --git a/pkg/store/store.go b/pkg/store/store.go index 975db4e163..4c045d1d89 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -190,6 +190,30 @@ func (s *DefaultStore) SetMetadata(ctx context.Context, key string, value []byte return nil } +func (s *DefaultStore) BatchMetadata(ctx context.Context, puts []MetadataKV, deletes []string) error { + if len(puts) == 0 && len(deletes) == 0 { + return nil + } + batch, err := s.db.Batch(ctx) + if err != nil { + return fmt.Errorf("failed to create metadata batch: %w", err) + } + for _, kv := range puts { + if err := batch.Put(ctx, ds.NewKey(GetMetaKey(kv.Key)), kv.Value); err != nil { + return fmt.Errorf("failed to batch-put metadata key '%s': %w", kv.Key, err) + } + } + for _, key := range deletes { + if err := batch.Delete(ctx, ds.NewKey(GetMetaKey(key))); err != nil { + return fmt.Errorf("failed to batch-delete metadata key '%s': %w", key, err) + } + } + if err := batch.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit metadata batch: %w", err) + } + return nil +} + // GetMetadata returns values stored for given key with SetMetadata. func (s *DefaultStore) GetMetadata(ctx context.Context, key string) ([]byte, error) { data, err := s.db.Get(ctx, ds.NewKey(GetMetaKey(key))) diff --git a/pkg/store/tracing.go b/pkg/store/tracing.go index 259c6cb600..e94de6dad1 100644 --- a/pkg/store/tracing.go +++ b/pkg/store/tracing.go @@ -211,6 +211,25 @@ func (t *tracedStore) DeleteMetadata(ctx context.Context, key string) error { return nil } +func (t *tracedStore) BatchMetadata(ctx context.Context, puts []MetadataKV, deletes []string) error { + ctx, span := t.tracer.Start(ctx, "Store.BatchMetadata", + trace.WithAttributes( + attribute.Int("puts", len(puts)), + attribute.Int("deletes", len(deletes)), + ), + ) + defer span.End() + + err := t.inner.BatchMetadata(ctx, puts, deletes) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + return nil +} + func (t *tracedStore) DeleteStateAtHeight(ctx context.Context, height uint64) error { ctx, span := t.tracer.Start(ctx, "Store.DeleteStateAtHeight", trace.WithAttributes(attribute.Int64("height", int64(height))), diff --git a/pkg/store/tracing_test.go b/pkg/store/tracing_test.go index 3ae8d8902e..66211a2624 100644 --- a/pkg/store/tracing_test.go +++ b/pkg/store/tracing_test.go @@ -28,6 +28,7 @@ type tracingMockStore struct { getMetadataFn func(ctx context.Context, key string) ([]byte, error) setMetadataFn func(ctx context.Context, key string, value []byte) error + batchMetadataFn func(ctx context.Context, puts []MetadataKV, deletes []string) error deleteMetadataFn func(ctx context.Context, key string) error rollbackFn func(ctx context.Context, height uint64, aggregator bool) error pruneBlocksFn func(ctx context.Context, height uint64) error @@ -105,6 +106,13 @@ func (m *tracingMockStore) SetMetadata(ctx context.Context, key string, value [] return nil } +func (m *tracingMockStore) BatchMetadata(ctx context.Context, puts []MetadataKV, deletes []string) error { + if m.batchMetadataFn != nil { + return m.batchMetadataFn(ctx, puts, deletes) + } + return nil +} + func (m *tracingMockStore) DeleteMetadata(ctx context.Context, key string) error { if m.deleteMetadataFn != nil { return m.deleteMetadataFn(ctx, key) diff --git a/pkg/store/types.go b/pkg/store/types.go index b1b1f2bd5e..d37b5debf9 100644 --- a/pkg/store/types.go +++ b/pkg/store/types.go @@ -54,6 +54,12 @@ type Store interface { NewBatch(ctx context.Context) (Batch, error) } +// MetadataKV is a key-value pair for batched metadata operations. +type MetadataKV struct { + Key string + Value []byte +} + type Metadata interface { // SetMetadata saves arbitrary value in the store. // @@ -62,6 +68,10 @@ type Metadata interface { // DeleteMetadata removes a metadata key from the store. DeleteMetadata(ctx context.Context, key string) error + + // BatchMetadata writes and deletes metadata keys in a single Badger + // WriteBatch transaction, reducing contention on the write pipeline. + BatchMetadata(ctx context.Context, puts []MetadataKV, deletes []string) error } type Reader interface { diff --git a/test/mocks/store.go b/test/mocks/store.go index 911832ebab..75024718a5 100644 --- a/test/mocks/store.go +++ b/test/mocks/store.go @@ -39,6 +39,69 @@ func (_m *MockStore) EXPECT() *MockStore_Expecter { return &MockStore_Expecter{mock: &_m.Mock} } +// BatchMetadata provides a mock function for the type MockStore +func (_mock *MockStore) BatchMetadata(ctx context.Context, puts []store.MetadataKV, deletes []string) error { + ret := _mock.Called(ctx, puts, deletes) + + if len(ret) == 0 { + panic("no return value specified for BatchMetadata") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(context.Context, []store.MetadataKV, []string) error); ok { + r0 = returnFunc(ctx, puts, deletes) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockStore_BatchMetadata_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'BatchMetadata' +type MockStore_BatchMetadata_Call struct { + *mock.Call +} + +// BatchMetadata is a helper method to define mock.On call +// - ctx context.Context +// - puts []store.MetadataKV +// - deletes []string +func (_e *MockStore_Expecter) BatchMetadata(ctx interface{}, puts interface{}, deletes interface{}) *MockStore_BatchMetadata_Call { + return &MockStore_BatchMetadata_Call{Call: _e.mock.On("BatchMetadata", ctx, puts, deletes)} +} + +func (_c *MockStore_BatchMetadata_Call) Run(run func(ctx context.Context, puts []store.MetadataKV, deletes []string)) *MockStore_BatchMetadata_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 []store.MetadataKV + if args[1] != nil { + arg1 = args[1].([]store.MetadataKV) + } + var arg2 []string + if args[2] != nil { + arg2 = args[2].([]string) + } + run( + arg0, + arg1, + arg2, + ) + }) + return _c +} + +func (_c *MockStore_BatchMetadata_Call) Return(err error) *MockStore_BatchMetadata_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockStore_BatchMetadata_Call) RunAndReturn(run func(ctx context.Context, puts []store.MetadataKV, deletes []string) error) *MockStore_BatchMetadata_Call { + _c.Call.Return(run) + return _c +} + // Close provides a mock function for the type MockStore func (_mock *MockStore) Close() error { ret := _mock.Called() From aca2ec76273f98eaa05e19ed9883e2ebccd05f6c Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 28 Apr 2026 19:57:15 +0200 Subject: [PATCH 4/7] feedback --- pkg/store/cached_store.go | 53 +++++++++++++++++++--------------- pkg/store/cached_store_test.go | 40 ++++++++++++++++++------- 2 files changed, 58 insertions(+), 35 deletions(-) diff --git a/pkg/store/cached_store.go b/pkg/store/cached_store.go index 8fc56865c1..a8265433e4 100644 --- a/pkg/store/cached_store.go +++ b/pkg/store/cached_store.go @@ -3,6 +3,7 @@ package store import ( "context" "sync" + "time" lru "github.com/hashicorp/golang-lru/v2" "github.com/rs/zerolog" @@ -18,6 +19,11 @@ const ( DefaultBlockDataCacheSize = 200_000 asyncWriteBufferSize = 8192 + + // batchWindow is the time the write goroutine waits after receiving the first + // op before flushing. This allows bursts of metadata writes (e.g. 3-4 per + // height in the submitter) to be coalesced into a single Badger WriteBatch. + batchWindow = 100 * time.Microsecond ) type asyncWriteOp struct { @@ -113,13 +119,19 @@ func (cs *CachedStore) startWriteLoop() { defer close(cs.done) for op := range cs.writeCh { ops := []asyncWriteOp{op} - drain: + + timer := time.NewTimer(batchWindow) + collect: for { select { - case op := <-cs.writeCh: + case op, ok := <-cs.writeCh: + if !ok { + timer.Stop() + break collect + } ops = append(ops, op) - default: - break drain + case <-timer.C: + break collect } } @@ -234,40 +246,33 @@ func (cs *CachedStore) PruneBlocks(ctx context.Context, height uint64) error { } // SetMetadata queues an asynchronous metadata write. The write is persisted -// by the background goroutine. If the buffer is full or the store has been -// stopped, the write falls back to synchronous execution on the underlying store. +// by the background goroutine via BatchMetadata. If the store has been stopped, +// the write falls back to synchronous execution on the underlying store. func (cs *CachedStore) SetMetadata(ctx context.Context, key string, value []byte) error { cs.stopMu.RLock() if cs.stopped { cs.stopMu.RUnlock() return cs.Store.SetMetadata(ctx, key, value) } - select { - case cs.writeCh <- asyncWriteOp{key: key, value: value}: - cs.stopMu.RUnlock() - return nil - default: - cs.stopMu.RUnlock() - return cs.Store.SetMetadata(ctx, key, value) - } + cs.stopMu.RUnlock() + + valueCopy := append([]byte(nil), value...) + cs.writeCh <- asyncWriteOp{key: key, value: valueCopy} + return nil } -// DeleteMetadata queues an asynchronous metadata delete. If the buffer is full -// or the store has been stopped, the delete falls back to synchronous execution. +// DeleteMetadata queues an asynchronous metadata delete. If the store has been +// stopped, the delete falls back to synchronous execution. func (cs *CachedStore) DeleteMetadata(ctx context.Context, key string) error { cs.stopMu.RLock() if cs.stopped { cs.stopMu.RUnlock() return cs.Store.DeleteMetadata(ctx, key) } - select { - case cs.writeCh <- asyncWriteOp{key: key, isDelete: true}: - cs.stopMu.RUnlock() - return nil - default: - cs.stopMu.RUnlock() - return cs.Store.DeleteMetadata(ctx, key) - } + cs.stopMu.RUnlock() + + cs.writeCh <- asyncWriteOp{key: key, isDelete: true} + return nil } // Close drains pending async writes, then closes the underlying store. diff --git a/pkg/store/cached_store_test.go b/pkg/store/cached_store_test.go index 5d62a2b25e..d02a0a4e61 100644 --- a/pkg/store/cached_store_test.go +++ b/pkg/store/cached_store_test.go @@ -2,9 +2,11 @@ package store import ( "context" + "fmt" "testing" "time" + "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -315,27 +317,43 @@ func TestCachedStore_AsyncDeleteMetadata(t *testing.T) { } func TestCachedStore_Close_FlushesPendingWrites(t *testing.T) { + ctx := context.Background() + kv, err := NewTestInMemoryKVStore() require.NoError(t, err) base := New(kv) - cs, err := NewCachedStore(base) - require.NoError(t, err) - ctx := context.Background() + writeCh := make(chan asyncWriteOp, asyncWriteBufferSize) + done := make(chan struct{}) + + cs := &CachedStore{ + Store: base, + writeCh: writeCh, + done: done, + logger: zerolog.Nop(), + } + + cs.startWriteLoop() + const n = 100 for i := 0; i < n; i++ { - k := []byte{byte(i)} - require.NoError(t, cs.SetMetadata(ctx, string(k), k)) + k := fmt.Sprintf("key-%d", i) + require.NoError(t, cs.SetMetadata(ctx, k, []byte(k))) } - // Wait for the last key to land via pass-through read - require.Eventually(t, func() bool { - v, err := cs.GetMetadata(ctx, string([]byte{byte(n - 1)})) - return err == nil && len(v) == 1 && v[0] == byte(n-1) - }, 2*time.Second, 10*time.Millisecond) + cs.stopMu.Lock() + cs.stopped = true + close(writeCh) + cs.stopMu.Unlock() + <-done - require.NoError(t, cs.Close()) + for i := 0; i < n; i++ { + k := fmt.Sprintf("key-%d", i) + v, err := base.GetMetadata(ctx, k) + require.NoError(t, err) + require.Equal(t, []byte(k), v) + } } func TestCachedStore_WriteAfterClose_FallsBack(t *testing.T) { From 2d3dfd7336d8d327389179da0ebb670550265f73 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 28 Apr 2026 20:00:26 +0200 Subject: [PATCH 5/7] De-duplicate batched writes by key in cached store --- pkg/store/cached_store.go | 11 +++++++--- pkg/store/cached_store_test.go | 40 ++++++++++++++++++++++++++++++++-- 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/pkg/store/cached_store.go b/pkg/store/cached_store.go index a8265433e4..142c8ee4d3 100644 --- a/pkg/store/cached_store.go +++ b/pkg/store/cached_store.go @@ -121,7 +121,7 @@ func (cs *CachedStore) startWriteLoop() { ops := []asyncWriteOp{op} timer := time.NewTimer(batchWindow) - collect: + collect: for { select { case op, ok := <-cs.writeCh: @@ -135,9 +135,14 @@ func (cs *CachedStore) startWriteLoop() { } } + last := make(map[string]asyncWriteOp, len(ops)) + for _, o := range ops { + last[o.key] = o + } + var puts []MetadataKV var deletes []string - for _, o := range ops { + for _, o := range last { if o.isDelete { deletes = append(deletes, o.key) } else { @@ -145,7 +150,7 @@ func (cs *CachedStore) startWriteLoop() { } } - if err := cs.Store.BatchMetadata(context.Background(), puts, deletes); err != nil { + if err := cs.BatchMetadata(context.Background(), puts, deletes); err != nil { for _, o := range ops { cs.logger.Error().Err(err).Str("key", o.key). Bool("delete", o.isDelete). diff --git a/pkg/store/cached_store_test.go b/pkg/store/cached_store_test.go index d02a0a4e61..6f0ab57236 100644 --- a/pkg/store/cached_store_test.go +++ b/pkg/store/cached_store_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + ds "github.com/ipfs/go-datastore" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -337,7 +338,7 @@ func TestCachedStore_Close_FlushesPendingWrites(t *testing.T) { cs.startWriteLoop() const n = 100 - for i := 0; i < n; i++ { + for i := range n { k := fmt.Sprintf("key-%d", i) require.NoError(t, cs.SetMetadata(ctx, k, []byte(k))) } @@ -348,7 +349,7 @@ func TestCachedStore_Close_FlushesPendingWrites(t *testing.T) { cs.stopMu.Unlock() <-done - for i := 0; i < n; i++ { + for i := range n { k := fmt.Sprintf("key-%d", i) v, err := base.GetMetadata(ctx, k) require.NoError(t, err) @@ -372,3 +373,38 @@ func TestCachedStore_WriteAfterClose_FallsBack(t *testing.T) { err = cs.SetMetadata(ctx, "after", []byte("sync")) require.Error(t, err) } + +func TestCachedStore_CoalescesSameKeyOps(t *testing.T) { + ctx := context.Background() + + kv, err := NewTestInMemoryKVStore() + require.NoError(t, err) + + require.NoError(t, kv.Put(ctx, ds.NewKey(GetMetaKey("k")), []byte("original"))) + + base := New(kv) + + writeCh := make(chan asyncWriteOp, asyncWriteBufferSize) + done := make(chan struct{}) + cs := &CachedStore{ + Store: base, + writeCh: writeCh, + done: done, + logger: zerolog.Nop(), + } + cs.startWriteLoop() + + require.NoError(t, cs.SetMetadata(ctx, "k", []byte("v1"))) + require.NoError(t, cs.DeleteMetadata(ctx, "k")) + require.NoError(t, cs.SetMetadata(ctx, "k", []byte("v2"))) + + cs.stopMu.Lock() + cs.stopped = true + close(writeCh) + cs.stopMu.Unlock() + <-done + + v, err := base.GetMetadata(ctx, "k") + require.NoError(t, err) + require.Equal(t, []byte("v2"), v, "last write (Set) should win over delete") +} From bd849026d03a72d1708a5985168f9d298b99ab27 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 28 Apr 2026 23:26:16 +0200 Subject: [PATCH 6/7] fix --- pkg/store/cached_store.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/pkg/store/cached_store.go b/pkg/store/cached_store.go index 142c8ee4d3..527fd9004b 100644 --- a/pkg/store/cached_store.go +++ b/pkg/store/cached_store.go @@ -255,12 +255,11 @@ func (cs *CachedStore) PruneBlocks(ctx context.Context, height uint64) error { // the write falls back to synchronous execution on the underlying store. func (cs *CachedStore) SetMetadata(ctx context.Context, key string, value []byte) error { cs.stopMu.RLock() + defer cs.stopMu.RUnlock() + if cs.stopped { - cs.stopMu.RUnlock() return cs.Store.SetMetadata(ctx, key, value) } - cs.stopMu.RUnlock() - valueCopy := append([]byte(nil), value...) cs.writeCh <- asyncWriteOp{key: key, value: valueCopy} return nil @@ -270,12 +269,11 @@ func (cs *CachedStore) SetMetadata(ctx context.Context, key string, value []byte // stopped, the delete falls back to synchronous execution. func (cs *CachedStore) DeleteMetadata(ctx context.Context, key string) error { cs.stopMu.RLock() + defer cs.stopMu.RUnlock() + if cs.stopped { - cs.stopMu.RUnlock() return cs.Store.DeleteMetadata(ctx, key) } - cs.stopMu.RUnlock() - cs.writeCh <- asyncWriteOp{key: key, isDelete: true} return nil } From c24f901c07af38b463a4e12e8571699ffb93c64d Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 29 Apr 2026 10:49:20 +0200 Subject: [PATCH 7/7] updates --- pkg/store/cached_store.go | 2 +- pkg/store/cached_store_test.go | 30 +++++++++++------------------- 2 files changed, 12 insertions(+), 20 deletions(-) diff --git a/pkg/store/cached_store.go b/pkg/store/cached_store.go index 527fd9004b..39031ccde3 100644 --- a/pkg/store/cached_store.go +++ b/pkg/store/cached_store.go @@ -239,7 +239,7 @@ func (cs *CachedStore) Rollback(ctx context.Context, height uint64, aggregator b } // PruneBlocks wraps the underlying store's PruneBlocks and invalidates caches -// up to the heigh that we purne +// up to the height that we prune func (cs *CachedStore) PruneBlocks(ctx context.Context, height uint64) error { if err := cs.Store.PruneBlocks(ctx, height); err != nil { return err diff --git a/pkg/store/cached_store_test.go b/pkg/store/cached_store_test.go index 6f0ab57236..432d90f035 100644 --- a/pkg/store/cached_store_test.go +++ b/pkg/store/cached_store_test.go @@ -320,22 +320,13 @@ func TestCachedStore_AsyncDeleteMetadata(t *testing.T) { func TestCachedStore_Close_FlushesPendingWrites(t *testing.T) { ctx := context.Background() - kv, err := NewTestInMemoryKVStore() + dir := t.TempDir() + kv, err := NewDefaultKVStore(dir, "", "test-db") require.NoError(t, err) base := New(kv) - - writeCh := make(chan asyncWriteOp, asyncWriteBufferSize) - done := make(chan struct{}) - - cs := &CachedStore{ - Store: base, - writeCh: writeCh, - done: done, - logger: zerolog.Nop(), - } - - cs.startWriteLoop() + cs, err := NewCachedStore(base) + require.NoError(t, err) const n = 100 for i := range n { @@ -343,15 +334,16 @@ func TestCachedStore_Close_FlushesPendingWrites(t *testing.T) { require.NoError(t, cs.SetMetadata(ctx, k, []byte(k))) } - cs.stopMu.Lock() - cs.stopped = true - close(writeCh) - cs.stopMu.Unlock() - <-done + require.NoError(t, cs.Close()) + + kv2, err := NewDefaultKVStore(dir, "", "test-db") + require.NoError(t, err) + t.Cleanup(func() { kv2.Close() }) + reopened := New(kv2) for i := range n { k := fmt.Sprintf("key-%d", i) - v, err := base.GetMetadata(ctx, k) + v, err := reopened.GetMetadata(ctx, k) require.NoError(t, err) require.Equal(t, []byte(k), v) }