diff --git a/src/lean_spec/subspecs/sync/service.py b/src/lean_spec/subspecs/sync/service.py index d0f3ae7d..66feed21 100644 --- a/src/lean_spec/subspecs/sync/service.py +++ b/src/lean_spec/subspecs/sync/service.py @@ -113,9 +113,6 @@ class SyncService: Ignored on non-aggregator nodes, which never import gossip attestations. """ - process_block: Callable[[Store, SignedBlock], Store] | None = field(default=None) - """Block processor function. Defaults to the spec's block processing.""" - publish_aggregated_attestation: Callable[ [SignedAggregatedAttestation], Coroutine[None, None, None] ] = field(default=_noop_publish_agg) @@ -161,18 +158,14 @@ class SyncService: _pending_block_aggregates: list[SignedAggregatedAttestation] = field(default_factory=list) """Combined aggregates recovered from processed blocks. - Every processed block is deconstructed in the block wrapper, which + Every processed block is deconstructed during block processing, which queues its combined aggregates when this node is in the aggregator role. The gossip umbrella drains and publishes them after the store is updated. """ def __post_init__(self) -> None: - """Bind the default block processor and wire sub-components.""" - # Tests can pass an explicit processor and skip this path. - if self.process_block is None: - self.process_block = self._default_process_block - + """Wire sub-components and apply the genesis-start state hint.""" self._init_components() # Genesis validators already hold the full genesis state. @@ -193,15 +186,14 @@ def _init_components(self) -> None: store_view=self, ) - # The wrapper adds counter and persistence tracking around each block. self._head_sync = HeadSync( block_cache=self.block_cache, backfill=self._backfill, - process_block=self._process_block_wrapper, + process_block=self.process_block, ) - def _default_process_block(self, store: Store, block: SignedBlock) -> Store: - """Run the spec's block processor and emit forkchoice telemetry.""" + def process_block(self, store: Store, block: SignedBlock) -> Store: + """Apply a block to the store, emit telemetry, and persist when wired up.""" new_store = self.spec.on_block(store, block) # Live chain pointers, exposed as gauges so dashboards reflect the current view. @@ -247,25 +239,6 @@ def ancestors(start: Bytes32) -> set[Bytes32]: metrics.lean_fork_choice_reorgs_total.inc() metrics.lean_fork_choice_reorg_depth.observe(depth) - return new_store - - def _process_block_wrapper( - self, - store: Store, - block: SignedBlock, - ) -> Store: - """Run the processor, bump the counter, and persist when wired up. - - All block processing flows through one wrapper regardless of which - processor is configured. - """ - # Delegate to the actual block processor. - # - # The processor validates the block and updates forkchoice state. - processor = self.process_block - assert processor is not None - new_store = processor(store, block) - # Track processed blocks. # # We only count blocks that pass validation and update the store. diff --git a/tests/lean_spec/helpers/builders.py b/tests/lean_spec/helpers/builders.py index f8c168f5..cbb33e1f 100644 --- a/tests/lean_spec/helpers/builders.py +++ b/tests/lean_spec/helpers/builders.py @@ -7,7 +7,7 @@ from __future__ import annotations -from typing import Callable, NamedTuple, cast +from typing import NamedTuple, cast from consensus_testing.keys import XmssKeyManager @@ -507,15 +507,12 @@ def create_mock_sync_service( *, database: Database | None = None, genesis_start: bool = False, - process_block: Callable[[Store, SignedBlock], Store] | None = None, ) -> SyncService: """Create a SyncService with mock dependencies for integration testing.""" mock_store = MockForkchoiceStore(head_slot=0) peer_manager = PeerManager() peer_manager.add_peer(PeerInfo(peer_id=peer_id, state=ConnectionState.CONNECTED)) - processor = process_block if process_block is not None else (lambda s, b: s.on_block(b)) - return SyncService( store=cast(Store, mock_store), peer_manager=peer_manager, @@ -525,5 +522,4 @@ def create_mock_sync_service( spec=StoreInterceptingSpec(), database=database, genesis_start=genesis_start, - process_block=processor, ) diff --git a/tests/lean_spec/helpers/mocks.py b/tests/lean_spec/helpers/mocks.py index 95bbd2aa..ba3e3246 100644 --- a/tests/lean_spec/helpers/mocks.py +++ b/tests/lean_spec/helpers/mocks.py @@ -19,7 +19,7 @@ from lean_spec.subspecs.networking import PeerId from lean_spec.subspecs.networking.service.events import NetworkEvent from lean_spec.subspecs.ssz.hash import hash_tree_root -from lean_spec.types import Bytes32, Slot, Uint64 +from lean_spec.types import Bytes32, Checkpoint, Slot, Uint64 class StoreInterceptingSpec(LstarSpec): @@ -135,17 +135,10 @@ async def publish(self, topic: str, data: bytes) -> None: @dataclass class _MockBlock: - """Minimal block stub with a slot attribute for mock store lookups.""" + """Minimal block stub with slot and parent_root for mock store lookups.""" slot: Slot = field(default_factory=lambda: Slot(0)) - - -@dataclass -class _MockCheckpoint: - """Minimal checkpoint stub with a slot attribute for mock store access.""" - - root: Bytes32 = field(default_factory=Bytes32.zero) - slot: Slot = field(default_factory=lambda: Slot(0)) + parent_root: Bytes32 = field(default_factory=Bytes32.zero) class MockForkchoiceStore: @@ -157,6 +150,10 @@ class MockForkchoiceStore: Optional `reject_*` predicates return True to simulate validation failure (`KeyError`), e.g. unknown attestation target. + + The `on_block_*` flags let tests simulate forkchoice side-effects of + block processing (post-state indexing, justified or finalized advance) + without injecting a custom block processor. """ def __init__(self, head_slot: int = 0) -> None: @@ -164,17 +161,24 @@ def __init__(self, head_slot: int = 0) -> None: genesis_root = Bytes32.zero() self.blocks: dict[Bytes32, object] = {genesis_root: _MockBlock(slot=Slot(head_slot))} self.head: Bytes32 = genesis_root + self.safe_target: Bytes32 = genesis_root self.head_slot: Slot = Slot(head_slot) self._attestations_received: list[SignedAttestation] = [] self._aggregated_attestations_received: list[SignedAggregatedAttestation] = [] self.validator_id = None - self.latest_justified = _MockCheckpoint() - self.latest_finalized = _MockCheckpoint() + self.latest_justified = Checkpoint(root=Bytes32.zero(), slot=Slot(0)) + self.latest_finalized = Checkpoint(root=Bytes32.zero(), slot=Slot(0)) self.states: dict[Bytes32, object] = {} self.reject_attestation: Callable[[SignedAttestation], bool] | None = None self.reject_aggregated_attestation: Callable[[SignedAggregatedAttestation], bool] | None = ( None ) + self.on_block_post_state: object | None = None + """When set, on_block stores this object as states[block_root].""" + self.advance_justified_on_block: bool = False + """When True, on_block advances latest_justified to the new block.""" + self.advance_finalized_on_block: bool = False + """When True, on_block advances latest_finalized to the new block.""" def on_block( self, @@ -185,7 +189,15 @@ def on_block( root = hash_tree_root(block.block) self.blocks[root] = block.block self.head = root + # The mock has no real safe-target rule; head doubles as safe-target. + self.safe_target = root self.head_slot = block.block.slot + if self.on_block_post_state is not None: + self.states[root] = self.on_block_post_state + if self.advance_justified_on_block: + self.latest_justified = Checkpoint(root=root, slot=block.block.slot) + if self.advance_finalized_on_block: + self.latest_finalized = Checkpoint(root=root, slot=block.block.slot) return self def on_gossip_attestation( diff --git a/tests/lean_spec/subspecs/sync/test_head_sync_backfill_routing.py b/tests/lean_spec/subspecs/sync/test_head_sync_backfill_routing.py index 8c10cf7c..5dc2a5a0 100644 --- a/tests/lean_spec/subspecs/sync/test_head_sync_backfill_routing.py +++ b/tests/lean_spec/subspecs/sync/test_head_sync_backfill_routing.py @@ -20,7 +20,7 @@ from lean_spec.subspecs.sync.backfill_sync import BackfillSync from lean_spec.subspecs.sync.block_cache import BlockCache from lean_spec.subspecs.sync.head_sync import HeadSync, HeadSyncResult -from lean_spec.types import Bytes32, Slot, Uint64, ValidatorIndex +from lean_spec.types import Bytes32, Checkpoint, Slot, Uint64, ValidatorIndex from tests.lean_spec.helpers import MockForkchoiceStore, make_signed_block @@ -64,7 +64,7 @@ def _store_with_head( requested head slot so subsequent gap math sees the configured value. """ store = MockForkchoiceStore() - store.latest_finalized.slot = Slot(finalized_slot) + store.latest_finalized = Checkpoint(root=Bytes32.zero(), slot=Slot(finalized_slot)) head_root = Bytes32(b"\x77" * 32) head_block = make_signed_block( diff --git a/tests/lean_spec/subspecs/sync/test_service.py b/tests/lean_spec/subspecs/sync/test_service.py index 55d1e786..754e8d5c 100644 --- a/tests/lean_spec/subspecs/sync/test_service.py +++ b/tests/lean_spec/subspecs/sync/test_service.py @@ -2,7 +2,6 @@ from __future__ import annotations -from collections.abc import Callable from types import MappingProxyType from typing import cast @@ -11,9 +10,7 @@ from lean_spec.forks.lstar.containers import ( SignedAggregatedAttestation, - SignedBlock, ) -from lean_spec.forks.lstar.store import Store from lean_spec.subspecs.networking import PeerId from lean_spec.subspecs.networking.reqresp.message import Status from lean_spec.subspecs.ssz.hash import hash_tree_root @@ -45,30 +42,6 @@ def _signed_aggregated_attestation(key_manager: XmssKeyManager) -> SignedAggrega return SignedAggregatedAttestation(data=attestation_data, proof=proof) -def _persist_process_block( - *, - include_post_state: bool, - prune: bool, -) -> Callable[[Store, SignedBlock], Store]: - post_state = make_genesis_state(num_validators=1) - - def process_block(store: Store, block: SignedBlock) -> Store: - ms = cast(MockForkchoiceStore, store) - ms.on_block(block) - block_root = hash_tree_root(block.block) - if include_post_state: - ms.states[block_root] = post_state - slot = block.block.slot - ms.latest_justified = Checkpoint(root=block_root, slot=slot) - if prune: - ms.latest_finalized = Checkpoint(root=block_root, slot=slot) - else: - ms.latest_finalized = Checkpoint(root=Bytes32.zero(), slot=Slot(0)) - return cast(Store, ms) - - return process_block - - @pytest.fixture def sync_service(peer_id: PeerId) -> SyncService: """Provide a complete SyncService for integration testing.""" @@ -579,7 +552,7 @@ def test_genesis_start_begins_in_syncing(self, peer_id: PeerId) -> None: class TestBlockPersistence: - """Tests for _process_block_wrapper and database persistence.""" + """Tests for process_block and database persistence.""" def test_process_block_increments_counter_without_database( self, @@ -595,7 +568,7 @@ def test_process_block_increments_counter_without_database( parent_root=genesis_root, state_root=Bytes32.zero(), ) - service.store = service._process_block_wrapper(service.store, block) + service.store = service.process_block(service.store, block) assert service._blocks_processed == 1 def test_persist_skips_state_when_post_state_missing( @@ -607,8 +580,10 @@ def test_persist_skips_state_when_post_state_missing( service = create_mock_sync_service( peer_id, database=cast(Database, db), - process_block=_persist_process_block(include_post_state=False, prune=False), ) + mock_store = cast(MockForkchoiceStore, service.store) + # Justified advances each block; finalized stays at genesis (no prune). + mock_store.advance_justified_on_block = True service.state = SyncState.SYNCING genesis_root = service.store.head block = make_signed_block( @@ -617,7 +592,7 @@ def test_persist_skips_state_when_post_state_missing( parent_root=genesis_root, state_root=Bytes32.zero(), ) - service.store = service._process_block_wrapper(service.store, block) + service.store = service.process_block(service.store, block) block_root = hash_tree_root(block.block) assert db.calls[0].name == "batch_write_enter" @@ -649,8 +624,12 @@ def test_persist_writes_state_and_prunes_when_finalized_advanced( service = create_mock_sync_service( peer_id, database=cast(Database, db), - process_block=_persist_process_block(include_post_state=True, prune=True), ) + mock_store = cast(MockForkchoiceStore, service.store) + # Post-state indexing requires both a state to index and an advanced finalized. + mock_store.on_block_post_state = make_genesis_state(num_validators=1) + mock_store.advance_justified_on_block = True + mock_store.advance_finalized_on_block = True service.state = SyncState.SYNCING genesis_root = service.store.head block = make_signed_block( @@ -659,7 +638,7 @@ def test_persist_writes_state_and_prunes_when_finalized_advanced( parent_root=genesis_root, state_root=Bytes32.zero(), ) - service.store = service._process_block_wrapper(service.store, block) + service.store = service.process_block(service.store, block) block_root = hash_tree_root(block.block) assert db.calls[0].name == "batch_write_enter"