Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 5 additions & 32 deletions src/lean_spec/subspecs/sync/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,6 @@ class SyncService:
Ignored on non-aggregator nodes, which never import gossip attestations.
"""

process_block: Callable[[Store, SignedBlock], Store] | None = field(default=None)
"""Block processor function. Defaults to the spec's block processing."""

publish_aggregated_attestation: Callable[
[SignedAggregatedAttestation], Coroutine[None, None, None]
] = field(default=_noop_publish_agg)
Expand Down Expand Up @@ -161,18 +158,14 @@ class SyncService:
_pending_block_aggregates: list[SignedAggregatedAttestation] = field(default_factory=list)
"""Combined aggregates recovered from processed blocks.

Every processed block is deconstructed in the block wrapper, which
Every processed block is deconstructed during block processing, which
queues its combined aggregates when this node is in the aggregator
role. The gossip umbrella drains and publishes them after the store
is updated.
"""

def __post_init__(self) -> None:
"""Bind the default block processor and wire sub-components."""
# Tests can pass an explicit processor and skip this path.
if self.process_block is None:
self.process_block = self._default_process_block

"""Wire sub-components and apply the genesis-start state hint."""
self._init_components()

# Genesis validators already hold the full genesis state.
Expand All @@ -193,15 +186,14 @@ def _init_components(self) -> None:
store_view=self,
)

# The wrapper adds counter and persistence tracking around each block.
self._head_sync = HeadSync(
block_cache=self.block_cache,
backfill=self._backfill,
process_block=self._process_block_wrapper,
process_block=self.process_block,
)

def _default_process_block(self, store: Store, block: SignedBlock) -> Store:
"""Run the spec's block processor and emit forkchoice telemetry."""
def process_block(self, store: Store, block: SignedBlock) -> Store:
"""Apply a block to the store, emit telemetry, and persist when wired up."""
new_store = self.spec.on_block(store, block)

# Live chain pointers, exposed as gauges so dashboards reflect the current view.
Expand Down Expand Up @@ -247,25 +239,6 @@ def ancestors(start: Bytes32) -> set[Bytes32]:
metrics.lean_fork_choice_reorgs_total.inc()
metrics.lean_fork_choice_reorg_depth.observe(depth)

return new_store

def _process_block_wrapper(
self,
store: Store,
block: SignedBlock,
) -> Store:
"""Run the processor, bump the counter, and persist when wired up.

All block processing flows through one wrapper regardless of which
processor is configured.
"""
# Delegate to the actual block processor.
#
# The processor validates the block and updates forkchoice state.
processor = self.process_block
assert processor is not None
new_store = processor(store, block)

# Track processed blocks.
#
# We only count blocks that pass validation and update the store.
Expand Down
6 changes: 1 addition & 5 deletions tests/lean_spec/helpers/builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from __future__ import annotations

from typing import Callable, NamedTuple, cast
from typing import NamedTuple, cast

from consensus_testing.keys import XmssKeyManager

Expand Down Expand Up @@ -507,15 +507,12 @@ def create_mock_sync_service(
*,
database: Database | None = None,
genesis_start: bool = False,
process_block: Callable[[Store, SignedBlock], Store] | None = None,
) -> SyncService:
"""Create a SyncService with mock dependencies for integration testing."""
mock_store = MockForkchoiceStore(head_slot=0)
peer_manager = PeerManager()
peer_manager.add_peer(PeerInfo(peer_id=peer_id, state=ConnectionState.CONNECTED))

processor = process_block if process_block is not None else (lambda s, b: s.on_block(b))

return SyncService(
store=cast(Store, mock_store),
peer_manager=peer_manager,
Expand All @@ -525,5 +522,4 @@ def create_mock_sync_service(
spec=StoreInterceptingSpec(),
database=database,
genesis_start=genesis_start,
process_block=processor,
)
36 changes: 24 additions & 12 deletions tests/lean_spec/helpers/mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from lean_spec.subspecs.networking import PeerId
from lean_spec.subspecs.networking.service.events import NetworkEvent
from lean_spec.subspecs.ssz.hash import hash_tree_root
from lean_spec.types import Bytes32, Slot, Uint64
from lean_spec.types import Bytes32, Checkpoint, Slot, Uint64


class StoreInterceptingSpec(LstarSpec):
Expand Down Expand Up @@ -135,17 +135,10 @@ async def publish(self, topic: str, data: bytes) -> None:

@dataclass
class _MockBlock:
"""Minimal block stub with a slot attribute for mock store lookups."""
"""Minimal block stub with slot and parent_root for mock store lookups."""

slot: Slot = field(default_factory=lambda: Slot(0))


@dataclass
class _MockCheckpoint:
"""Minimal checkpoint stub with a slot attribute for mock store access."""

root: Bytes32 = field(default_factory=Bytes32.zero)
slot: Slot = field(default_factory=lambda: Slot(0))
parent_root: Bytes32 = field(default_factory=Bytes32.zero)


class MockForkchoiceStore:
Expand All @@ -157,24 +150,35 @@ class MockForkchoiceStore:

Optional `reject_*` predicates return True to simulate validation
failure (`KeyError`), e.g. unknown attestation target.

The `on_block_*` flags let tests simulate forkchoice side-effects of
block processing (post-state indexing, justified or finalized advance)
without injecting a custom block processor.
"""

def __init__(self, head_slot: int = 0) -> None:
"""Initialize with a genesis block stub at Bytes32.zero()."""
genesis_root = Bytes32.zero()
self.blocks: dict[Bytes32, object] = {genesis_root: _MockBlock(slot=Slot(head_slot))}
self.head: Bytes32 = genesis_root
self.safe_target: Bytes32 = genesis_root
self.head_slot: Slot = Slot(head_slot)
self._attestations_received: list[SignedAttestation] = []
self._aggregated_attestations_received: list[SignedAggregatedAttestation] = []
self.validator_id = None
self.latest_justified = _MockCheckpoint()
self.latest_finalized = _MockCheckpoint()
self.latest_justified = Checkpoint(root=Bytes32.zero(), slot=Slot(0))
self.latest_finalized = Checkpoint(root=Bytes32.zero(), slot=Slot(0))
self.states: dict[Bytes32, object] = {}
self.reject_attestation: Callable[[SignedAttestation], bool] | None = None
self.reject_aggregated_attestation: Callable[[SignedAggregatedAttestation], bool] | None = (
None
)
self.on_block_post_state: object | None = None
"""When set, on_block stores this object as states[block_root]."""
self.advance_justified_on_block: bool = False
"""When True, on_block advances latest_justified to the new block."""
self.advance_finalized_on_block: bool = False
"""When True, on_block advances latest_finalized to the new block."""

def on_block(
self,
Expand All @@ -185,7 +189,15 @@ def on_block(
root = hash_tree_root(block.block)
self.blocks[root] = block.block
self.head = root
# The mock has no real safe-target rule; head doubles as safe-target.
self.safe_target = root
self.head_slot = block.block.slot
if self.on_block_post_state is not None:
self.states[root] = self.on_block_post_state
if self.advance_justified_on_block:
self.latest_justified = Checkpoint(root=root, slot=block.block.slot)
if self.advance_finalized_on_block:
self.latest_finalized = Checkpoint(root=root, slot=block.block.slot)
return self

def on_gossip_attestation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from lean_spec.subspecs.sync.backfill_sync import BackfillSync
from lean_spec.subspecs.sync.block_cache import BlockCache
from lean_spec.subspecs.sync.head_sync import HeadSync, HeadSyncResult
from lean_spec.types import Bytes32, Slot, Uint64, ValidatorIndex
from lean_spec.types import Bytes32, Checkpoint, Slot, Uint64, ValidatorIndex
from tests.lean_spec.helpers import MockForkchoiceStore, make_signed_block


Expand Down Expand Up @@ -64,7 +64,7 @@ def _store_with_head(
requested head slot so subsequent gap math sees the configured value.
"""
store = MockForkchoiceStore()
store.latest_finalized.slot = Slot(finalized_slot)
store.latest_finalized = Checkpoint(root=Bytes32.zero(), slot=Slot(finalized_slot))

head_root = Bytes32(b"\x77" * 32)
head_block = make_signed_block(
Expand Down
45 changes: 12 additions & 33 deletions tests/lean_spec/subspecs/sync/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from __future__ import annotations

from collections.abc import Callable
from types import MappingProxyType
from typing import cast

Expand All @@ -11,9 +10,7 @@

from lean_spec.forks.lstar.containers import (
SignedAggregatedAttestation,
SignedBlock,
)
from lean_spec.forks.lstar.store import Store
from lean_spec.subspecs.networking import PeerId
from lean_spec.subspecs.networking.reqresp.message import Status
from lean_spec.subspecs.ssz.hash import hash_tree_root
Expand Down Expand Up @@ -45,30 +42,6 @@ def _signed_aggregated_attestation(key_manager: XmssKeyManager) -> SignedAggrega
return SignedAggregatedAttestation(data=attestation_data, proof=proof)


def _persist_process_block(
*,
include_post_state: bool,
prune: bool,
) -> Callable[[Store, SignedBlock], Store]:
post_state = make_genesis_state(num_validators=1)

def process_block(store: Store, block: SignedBlock) -> Store:
ms = cast(MockForkchoiceStore, store)
ms.on_block(block)
block_root = hash_tree_root(block.block)
if include_post_state:
ms.states[block_root] = post_state
slot = block.block.slot
ms.latest_justified = Checkpoint(root=block_root, slot=slot)
if prune:
ms.latest_finalized = Checkpoint(root=block_root, slot=slot)
else:
ms.latest_finalized = Checkpoint(root=Bytes32.zero(), slot=Slot(0))
return cast(Store, ms)

return process_block


@pytest.fixture
def sync_service(peer_id: PeerId) -> SyncService:
"""Provide a complete SyncService for integration testing."""
Expand Down Expand Up @@ -579,7 +552,7 @@ def test_genesis_start_begins_in_syncing(self, peer_id: PeerId) -> None:


class TestBlockPersistence:
"""Tests for _process_block_wrapper and database persistence."""
"""Tests for process_block and database persistence."""

def test_process_block_increments_counter_without_database(
self,
Expand All @@ -595,7 +568,7 @@ def test_process_block_increments_counter_without_database(
parent_root=genesis_root,
state_root=Bytes32.zero(),
)
service.store = service._process_block_wrapper(service.store, block)
service.store = service.process_block(service.store, block)
assert service._blocks_processed == 1

def test_persist_skips_state_when_post_state_missing(
Expand All @@ -607,8 +580,10 @@ def test_persist_skips_state_when_post_state_missing(
service = create_mock_sync_service(
peer_id,
database=cast(Database, db),
process_block=_persist_process_block(include_post_state=False, prune=False),
)
mock_store = cast(MockForkchoiceStore, service.store)
# Justified advances each block; finalized stays at genesis (no prune).
mock_store.advance_justified_on_block = True
service.state = SyncState.SYNCING
genesis_root = service.store.head
block = make_signed_block(
Expand All @@ -617,7 +592,7 @@ def test_persist_skips_state_when_post_state_missing(
parent_root=genesis_root,
state_root=Bytes32.zero(),
)
service.store = service._process_block_wrapper(service.store, block)
service.store = service.process_block(service.store, block)

block_root = hash_tree_root(block.block)
assert db.calls[0].name == "batch_write_enter"
Expand Down Expand Up @@ -649,8 +624,12 @@ def test_persist_writes_state_and_prunes_when_finalized_advanced(
service = create_mock_sync_service(
peer_id,
database=cast(Database, db),
process_block=_persist_process_block(include_post_state=True, prune=True),
)
mock_store = cast(MockForkchoiceStore, service.store)
# Post-state indexing requires both a state to index and an advanced finalized.
mock_store.on_block_post_state = make_genesis_state(num_validators=1)
mock_store.advance_justified_on_block = True
mock_store.advance_finalized_on_block = True
service.state = SyncState.SYNCING
genesis_root = service.store.head
block = make_signed_block(
Expand All @@ -659,7 +638,7 @@ def test_persist_writes_state_and_prunes_when_finalized_advanced(
parent_root=genesis_root,
state_root=Bytes32.zero(),
)
service.store = service._process_block_wrapper(service.store, block)
service.store = service.process_block(service.store, block)

block_root = hash_tree_root(block.block)
assert db.calls[0].name == "batch_write_enter"
Expand Down
Loading