Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changes

- Optimization of mutex usage in cache for reaper [#3286](https://github.com/evstack/ev-node/pull/3286)
- Optimize metadata writes by making it async in cache store [#3298](https://github.com/evstack/ev-node/pull/3298)
- Reduce tx cache retention to avoid out-of-memory errors under very heavy tx load [#3299](https://github.com/evstack/ev-node/pull/3299)

## v1.1.1
Expand Down
115 changes: 113 additions & 2 deletions pkg/store/cached_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package store

import (
"context"
"sync"
"time"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/rs/zerolog"

"github.com/evstack/ev-node/types"
)
Expand All @@ -14,15 +17,37 @@ const (

// DefaultBlockDataCacheSize is the default number of block data entries to cache.
DefaultBlockDataCacheSize = 200_000

asyncWriteBufferSize = 8192

// batchWindow is the time the write goroutine waits after receiving the first
// op before flushing. This allows bursts of metadata writes (e.g. 3-4 per
// height in the submitter) to be coalesced into a single Badger WriteBatch.
batchWindow = 100 * time.Microsecond
)

type asyncWriteOp struct {
key string
value []byte
isDelete bool
}

// CachedStore wraps a Store with LRU caching for frequently accessed data.
// The underlying LRU cache is thread-safe, so no additional synchronization is needed.
// Metadata writes (SetMetadata, DeleteMetadata) are processed asynchronously via a
// buffered channel to avoid blocking Badger's write pipeline for critical operations
// like block production (batch commits).
type CachedStore struct {
Store

headerCache *lru.Cache[uint64, *types.SignedHeader]
blockDataCache *lru.Cache[uint64, *blockDataEntry]

writeCh chan asyncWriteOp
done chan struct{}
stopMu sync.RWMutex
stopped bool
logger zerolog.Logger
}

type blockDataEntry struct {
Expand Down Expand Up @@ -73,6 +98,9 @@ func NewCachedStore(store Store, opts ...CachedStoreOption) (*CachedStore, error
Store: store,
headerCache: headerCache,
blockDataCache: blockDataCache,
writeCh: make(chan asyncWriteOp, asyncWriteBufferSize),
done: make(chan struct{}),
logger: zerolog.Nop(),
}

for _, opt := range opts {
Expand All @@ -81,9 +109,58 @@ func NewCachedStore(store Store, opts ...CachedStoreOption) (*CachedStore, error
}
}

cs.startWriteLoop()

return cs, nil
}

func (cs *CachedStore) startWriteLoop() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the performance gained through this change? also what is the performance difference with non automatic fsync on the db, we should be able to write to the db fast, it keeps it in cache then it flushes every n blocks. Are we already doing that?

go func() {
defer close(cs.done)
for op := range cs.writeCh {
ops := []asyncWriteOp{op}

timer := time.NewTimer(batchWindow)
collect:
for {
select {
case op, ok := <-cs.writeCh:
if !ok {
timer.Stop()
break collect
Comment on lines +124 to +130
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we replace this with a separate function to avoid using a goto?

}
ops = append(ops, op)
case <-timer.C:
break collect
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

last := make(map[string]asyncWriteOp, len(ops))
for _, o := range ops {
last[o.key] = o
}

var puts []MetadataKV
var deletes []string
for _, o := range last {
if o.isDelete {
deletes = append(deletes, o.key)
} else {
puts = append(puts, MetadataKV{Key: o.key, Value: o.value})
}
}

if err := cs.BatchMetadata(context.Background(), puts, deletes); err != nil {
for _, o := range ops {
cs.logger.Error().Err(err).Str("key", o.key).
Bool("delete", o.isDelete).
Msg("async metadata batch write failed")
}
}
}
}()
}

// GetHeader returns the header at the given height, using the cache if available.
func (cs *CachedStore) GetHeader(ctx context.Context, height uint64) (*types.SignedHeader, error) {
// Try cache first
Expand Down Expand Up @@ -162,7 +239,7 @@ func (cs *CachedStore) Rollback(ctx context.Context, height uint64, aggregator b
}

// PruneBlocks wraps the underlying store's PruneBlocks and invalidates caches
// up to the heigh that we purne
// up to the height that we prune
func (cs *CachedStore) PruneBlocks(ctx context.Context, height uint64) error {
if err := cs.Store.PruneBlocks(ctx, height); err != nil {
return err
Expand All @@ -173,8 +250,42 @@ func (cs *CachedStore) PruneBlocks(ctx context.Context, height uint64) error {
return nil
}

// Close closes the underlying store.
// SetMetadata queues an asynchronous metadata write. The write is persisted
// by the background goroutine via BatchMetadata. If the store has been stopped,
// the write falls back to synchronous execution on the underlying store.
func (cs *CachedStore) SetMetadata(ctx context.Context, key string, value []byte) error {
cs.stopMu.RLock()
defer cs.stopMu.RUnlock()

if cs.stopped {
return cs.Store.SetMetadata(ctx, key, value)
}
valueCopy := append([]byte(nil), value...)
cs.writeCh <- asyncWriteOp{key: key, value: valueCopy}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we do something like this

      select {
      case cs.writeCh <- asyncWriteOp{key: key, value: valueCopy}:
          return nil
      default:
          return cs.Store.SetMetadata(ctx, key, valueCopy)
      }

If the channel is full, this will stall and it will not be possible call Close() which aquires the same lock

Same applies for DeleteMetadata

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, see #3298 (comment), this is to keep ordering.
If this is full (which should be unlikely), the submitter / pruner will take longer, while sabing the pending keys to the db.

Ordering is important to not get stale values.

return nil
}
Comment on lines +256 to +266
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Blocking send contradicts PR description's backpressure fallback.

The PR description states "When the buffer is full, operations fall back to synchronous writes (backpressure)". However, the current implementation uses a blocking send (cs.writeCh <- asyncWriteOp{...}) which will block the caller indefinitely if the 8192-slot buffer is full, rather than falling back to synchronous writes.

If the fallback behavior is intended, use a non-blocking send:

Proposed fix for fallback behavior
 func (cs *CachedStore) SetMetadata(ctx context.Context, key string, value []byte) error {
 	cs.stopMu.RLock()
 	defer cs.stopMu.RUnlock()

 	if cs.stopped {
 		return cs.Store.SetMetadata(ctx, key, value)
 	}
 	valueCopy := append([]byte(nil), value...)
-	cs.writeCh <- asyncWriteOp{key: key, value: valueCopy}
-	return nil
+	select {
+	case cs.writeCh <- asyncWriteOp{key: key, value: valueCopy}:
+		return nil
+	default:
+		return cs.Store.SetMetadata(ctx, key, value)
+	}
 }

If blocking is intentional (for guaranteed ordering), update the PR description to reflect this.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/store/cached_store.go` around lines 256 - 266, The current
CachedStore.SetMetadata uses a blocking send to cs.writeCh (asyncWriteOp) which
contradicts the PR description's required backpressure fallback; change the send
to a non-blocking select that attempts to enqueue the asyncWriteOp to writeCh
and, if the channel is full, synchronously call the underlying
Store.SetMetadata(ctx, key, valueCopy) instead (preserve stopMu read lock and
the stopped check, and keep copying value into valueCopy before either enqueuing
or falling back).

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is intentional.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@julienrbrt, understood — the blocking send is intentional to ensure ordered and guaranteed delivery of metadata write operations to the background goroutine. I'll note this for future reviews.

( ˘•ω•˘ )


✏️ Learnings added
Learnt from: julienrbrt
Repo: evstack/ev-node PR: 3298
File: pkg/store/cached_store.go:256-266
Timestamp: 2026-04-28T21:47:13.754Z
Learning: In `evstack/ev-node`, `CachedStore.SetMetadata` and `CachedStore.DeleteMetadata` in `pkg/store/cached_store.go` use a **blocking** send to `cs.writeCh` intentionally. Despite the PR description mentioning a "backpressure fallback to synchronous writes", the blocking behavior is a deliberate design choice for guaranteed ordering. Do not flag the blocking channel send as a bug or suggest replacing it with a non-blocking select fallback.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


🧠 Learnings used
Learnt from: alpe
Repo: evstack/ev-node PR: 3131
File: block/internal/syncing/syncer_backoff_test.go:241-245
Timestamp: 2026-03-06T09:40:36.029Z
Learning: In evstack/ev-node, the module declares go 1.25.6. Since Go 1.22, loop variables are per-iteration by default, so loop variable capture is not a concern. Do not flag or fix loop variable capture in this codebase for any Go files; this rule applies to all Go files in the repository, not just the specific test file.


// DeleteMetadata queues an asynchronous metadata delete. If the store has
// been stopped, the delete falls back to synchronous execution.
func (cs *CachedStore) DeleteMetadata(ctx context.Context, key string) error {
	cs.stopMu.RLock()
	defer cs.stopMu.RUnlock()

	// After Close the write loop is gone; delete directly on the inner store.
	if cs.stopped {
		return cs.Store.DeleteMetadata(ctx, key)
	}

	// Blocking send is intentional: per-key ordering of writes and deletes
	// must be preserved by the background batcher.
	cs.writeCh <- asyncWriteOp{key: key, isDelete: true}
	return nil
}

// Close drains pending async writes, then closes the underlying store.
func (cs *CachedStore) Close() error {
	// Mark stopped and close the channel under the write lock, so concurrent
	// SetMetadata/DeleteMetadata either enqueue before the close or observe
	// stopped and fall back to synchronous writes (never send on a closed
	// channel).
	cs.stopMu.Lock()
	cs.stopped = true
	close(cs.writeCh)
	cs.stopMu.Unlock()
	// Wait for the write loop to flush everything it has buffered.
	<-cs.done

	cs.ClearCache()
	return cs.Store.Close()
}
130 changes: 130 additions & 0 deletions pkg/store/cached_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ package store

import (
"context"
"fmt"
"testing"
"time"

ds "github.com/ipfs/go-datastore"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -270,3 +274,129 @@ func TestCachedStore_Close(t *testing.T) {
err = cachedStore.Close()
require.NoError(t, err)
}

// TestCachedStore_AsyncSetMetadata checks that a metadata write queued on the
// cached store is eventually persisted to the underlying store.
func TestCachedStore_AsyncSetMetadata(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	kvStore, err := NewTestInMemoryKVStore()
	require.NoError(t, err)

	inner := New(kvStore)
	cached, err := NewCachedStore(inner)
	require.NoError(t, err)
	t.Cleanup(func() { cached.Close() })

	require.NoError(t, cached.SetMetadata(ctx, "key1", []byte("value1")))

	// The write is async; poll the inner store until it shows up.
	require.Eventually(t, func() bool {
		got, getErr := inner.GetMetadata(ctx, "key1")
		return getErr == nil && string(got) == "value1"
	}, time.Second, 10*time.Millisecond)
}

// TestCachedStore_AsyncDeleteMetadata checks that a queued delete eventually
// removes the key from the underlying store.
func TestCachedStore_AsyncDeleteMetadata(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	kvStore, err := NewTestInMemoryKVStore()
	require.NoError(t, err)

	inner := New(kvStore)
	require.NoError(t, inner.SetMetadata(ctx, "key1", []byte("value1")))

	cached, err := NewCachedStore(inner)
	require.NoError(t, err)
	t.Cleanup(func() { cached.Close() })

	require.NoError(t, cached.DeleteMetadata(ctx, "key1"))

	// The delete is async; poll until the key is gone from the inner store.
	require.Eventually(t, func() bool {
		_, getErr := inner.GetMetadata(ctx, "key1")
		return getErr != nil
	}, time.Second, 10*time.Millisecond)
}

// TestCachedStore_Close_FlushesPendingWrites ensures Close drains the async
// write buffer before closing, so queued metadata survives a reopen of the
// same on-disk database.
func TestCachedStore_Close_FlushesPendingWrites(t *testing.T) {
	ctx := context.Background()

	dir := t.TempDir()
	kvStore, err := NewDefaultKVStore(dir, "", "test-db")
	require.NoError(t, err)

	cached, err := NewCachedStore(New(kvStore))
	require.NoError(t, err)

	const total = 100
	for i := 0; i < total; i++ {
		key := fmt.Sprintf("key-%d", i)
		require.NoError(t, cached.SetMetadata(ctx, key, []byte(key)))
	}

	// Close must block until every queued write has been persisted.
	require.NoError(t, cached.Close())

	// Reopen the same database and verify all keys made it to disk.
	reopenedKV, err := NewDefaultKVStore(dir, "", "test-db")
	require.NoError(t, err)
	t.Cleanup(func() { reopenedKV.Close() })
	reopened := New(reopenedKV)

	for i := 0; i < total; i++ {
		key := fmt.Sprintf("key-%d", i)
		got, err := reopened.GetMetadata(ctx, key)
		require.NoError(t, err)
		require.Equal(t, []byte(key), got)
	}
}

// TestCachedStore_WriteAfterClose_FallsBack verifies that once the cached
// store is closed, SetMetadata falls through to the (now closed) inner store
// and surfaces its error instead of sending on the async channel.
func TestCachedStore_WriteAfterClose_FallsBack(t *testing.T) {
	kvStore, err := NewTestInMemoryKVStore()
	require.NoError(t, err)

	cached, err := NewCachedStore(New(kvStore))
	require.NoError(t, err)

	ctx := context.Background()
	require.NoError(t, cached.SetMetadata(ctx, "before", []byte("ok")))

	require.NoError(t, cached.Close())

	// The inner store is closed, so the synchronous fallback must fail.
	require.Error(t, cached.SetMetadata(ctx, "after", []byte("sync")))
}

// TestCachedStore_CoalescesSameKeyOps verifies that multiple queued ops on
// the same key are coalesced so only the most recent one is applied (a later
// Set wins over an earlier Delete).
func TestCachedStore_CoalescesSameKeyOps(t *testing.T) {
	ctx := context.Background()

	kvStore, err := NewTestInMemoryKVStore()
	require.NoError(t, err)

	// Seed an initial value directly in the datastore.
	require.NoError(t, kvStore.Put(ctx, ds.NewKey(GetMetaKey("k")), []byte("original")))

	inner := New(kvStore)

	// Build the CachedStore by hand so the write loop can be stopped
	// deterministically after all three ops are queued.
	opCh := make(chan asyncWriteOp, asyncWriteBufferSize)
	loopDone := make(chan struct{})
	cached := &CachedStore{
		Store:   inner,
		writeCh: opCh,
		done:    loopDone,
		logger:  zerolog.Nop(),
	}
	cached.startWriteLoop()

	require.NoError(t, cached.SetMetadata(ctx, "k", []byte("v1")))
	require.NoError(t, cached.DeleteMetadata(ctx, "k"))
	require.NoError(t, cached.SetMetadata(ctx, "k", []byte("v2")))

	// Stop the loop and wait for its final flush.
	cached.stopMu.Lock()
	cached.stopped = true
	close(opCh)
	cached.stopMu.Unlock()
	<-loopDone

	got, err := inner.GetMetadata(ctx, "k")
	require.NoError(t, err)
	require.Equal(t, []byte("v2"), got, "last write (Set) should win over delete")
}
24 changes: 24 additions & 0 deletions pkg/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,30 @@ func (s *DefaultStore) SetMetadata(ctx context.Context, key string, value []byte
return nil
}

// BatchMetadata applies the given metadata puts and deletes in a single
// datastore write batch, committing them together. Keys are namespaced via
// GetMetaKey, matching SetMetadata/DeleteMetadata. A no-op (both slices
// empty) returns nil without creating a batch.
func (s *DefaultStore) BatchMetadata(ctx context.Context, puts []MetadataKV, deletes []string) error {
	if len(puts) == 0 && len(deletes) == 0 {
		return nil
	}
	batch, err := s.db.Batch(ctx)
	if err != nil {
		return fmt.Errorf("failed to create metadata batch: %w", err)
	}
	for _, kv := range puts {
		if err := batch.Put(ctx, ds.NewKey(GetMetaKey(kv.Key)), kv.Value); err != nil {
			return fmt.Errorf("failed to batch-put metadata key '%s': %w", kv.Key, err)
		}
	}
	for _, key := range deletes {
		if err := batch.Delete(ctx, ds.NewKey(GetMetaKey(key))); err != nil {
			return fmt.Errorf("failed to batch-delete metadata key '%s': %w", key, err)
		}
	}
	// Nothing is persisted until Commit succeeds.
	if err := batch.Commit(ctx); err != nil {
		return fmt.Errorf("failed to commit metadata batch: %w", err)
	}
	return nil
}

// GetMetadata returns values stored for given key with SetMetadata.
func (s *DefaultStore) GetMetadata(ctx context.Context, key string) ([]byte, error) {
data, err := s.db.Get(ctx, ds.NewKey(GetMetaKey(key)))
Expand Down
19 changes: 19 additions & 0 deletions pkg/store/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,25 @@ func (t *tracedStore) DeleteMetadata(ctx context.Context, key string) error {
return nil
}

// BatchMetadata wraps the inner store's BatchMetadata in a trace span,
// recording the put/delete counts and any error.
func (t *tracedStore) BatchMetadata(ctx context.Context, puts []MetadataKV, deletes []string) error {
	ctx, span := t.tracer.Start(ctx, "Store.BatchMetadata",
		trace.WithAttributes(
			attribute.Int("puts", len(puts)),
			attribute.Int("deletes", len(deletes)),
		),
	)
	defer span.End()

	if err := t.inner.BatchMetadata(ctx, puts, deletes); err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
		return err
	}
	return nil
}

func (t *tracedStore) DeleteStateAtHeight(ctx context.Context, height uint64) error {
ctx, span := t.tracer.Start(ctx, "Store.DeleteStateAtHeight",
trace.WithAttributes(attribute.Int64("height", int64(height))),
Expand Down
Loading
Loading