Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
ControlMessage,
ControlPrune,
Message,
PrunePeerInfo,
SubOpts,
)
from lean_spec.subspecs.networking.gossipsub.topic import GossipTopic, TopicKind
Expand Down Expand Up @@ -341,14 +340,8 @@ def _make_enr(self) -> dict[str, Any]:
output["ip4"] = enr.ip4
if enr.udp_port is not None:
output["udpPort"] = int(enr.udp_port)
if enr.udp6_port is not None:
output["udp6Port"] = int(enr.udp6_port)
if enr.ip6:
output["ip6"] = enr.ip6
if enr.quic_port is not None:
output["quicPort"] = int(enr.quic_port)
if enr.quic6_port is not None:
output["quic6Port"] = int(enr.quic6_port)
if (ma := enr.multiaddr()) is not None:
output["multiaddr"] = str(ma)
if (eth2 := enr.eth2_data) is not None:
Expand All @@ -359,8 +352,6 @@ def _make_enr(self) -> dict[str, Any]:
}
if (subnets := enr.attestation_subnets) is not None:
output["attestationSubnets"] = [int(s) for s in subnets.subscribed_subnets()]
if (sync := enr.sync_committee_subnets) is not None:
output["syncCommitteeSubnets"] = [int(s) for s in sync.subscribed_subnets()]
output["isAggregator"] = enr.is_aggregator
output["signatureValid"] = enr.verify_signature()
output["isValid"] = enr.is_valid()
Expand Down Expand Up @@ -444,22 +435,12 @@ def _build_iwant(d: dict[str, Any]) -> ControlIWant:

def _build_prune(d: dict[str, Any]) -> ControlPrune:
"""Build a ControlPrune from a dict."""
peers = [_build_prune_peer(p) for p in d.get("peers", [])]
return ControlPrune(
topic_id=TopicId(d.get("topicId", "")),
peers=peers,
backoff=d.get("backoff", 0),
)


def _build_prune_peer(p: dict[str, Any]) -> PrunePeerInfo:
"""Build a PrunePeerInfo from a dict."""
return PrunePeerInfo(
peer_id=_from_hex(p["peerId"]) if p.get("peerId") else b"",
signed_peer_record=(_from_hex(p["signedPeerRecord"]) if p.get("signedPeerRecord") else b""),
)


def _build_idontwant(d: dict[str, Any]) -> ControlIDontWant:
"""Build a ControlIDontWant from a dict."""
return ControlIDontWant(message_ids=[_from_hex(mid) for mid in d.get("messageIds", [])])
18 changes: 9 additions & 9 deletions src/lean_spec/subspecs/networking/client/event_source/live.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,20 @@
Gossip uses raw Snappy block format. Req-resp uses Snappy framing with CRC32C.


GOSSIPSUB v1.1 REQUIREMENTS
GOSSIPSUB v1.2 REQUIREMENTS
---------------------------
The Ethereum consensus spec requires gossipsub v1.1 (protocol "/meshsub/1.1.0").
Key v1.1 features used:
The node advertises gossipsub v1.2 (protocol "/meshsub/1.2.0").
Key v1.2 features used:

- IDONTWANT control messages for bandwidth optimization.
- Peer scoring: Misbehaving peers get lower scores.
- Extended validators: Message validation before forwarding.
- Flood publishing: High-priority messages bypass mesh constraints.


References:
- Ethereum P2P spec: https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/p2p-interface.md
- Gossipsub v1.1: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md
- Gossipsub v1.2: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md
- SSZ spec: https://github.com/ethereum/consensus-specs/blob/dev/ssz/simple-serialize.md
- Snappy format: https://github.com/google/snappy/blob/main/format_description.txt
"""
Expand Down Expand Up @@ -889,7 +890,7 @@ async def _negotiate_inbound_stream(
# preserve it for later use (to avoid losing buffered data).
try:
wrapper = QuicStreamAdapter(stream)
gs_id = self._gossipsub_behavior._instance_id % 0xFFFF
gs_id = self._gossipsub_behavior._short_id
logger.debug(
"[GS %x] Accepting inbound stream %d from %s, negotiating protocol...",
gs_id,
Expand Down Expand Up @@ -958,7 +959,7 @@ async def _handle_gossipsub_inbound_stream(
peer_id: Peer that opened the stream.
conn: Connection the stream belongs to (used to open our
outbound stream when needed).
protocol_id: Negotiated gossipsub protocol id (v1.1 or v1.2).
protocol_id: Negotiated gossipsub protocol id (v1.2).
wrapper: Adapter holding any bytes already buffered during
multistream-select. Reusing this wrapper preserves those
bytes for the gossipsub behavior.
Expand All @@ -973,9 +974,8 @@ async def _handle_gossipsub_inbound_stream(
# - Outbound: we opened this to send our RPCs
# - Inbound: they opened this to send us RPCs
#
# We support both v1.1 and v1.2 - the difference is IDONTWANT
# messages which we can handle gracefully.
gs_id = self._gossipsub_behavior._instance_id % 0xFFFF
# We advertise gossipsub v1.2 only.
gs_id = self._gossipsub_behavior._short_id
logger.debug(
"[GS %x] Received inbound gossipsub stream (%s) from %s",
gs_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,6 @@ class GossipMessageError(Exception):

Includes:

- GossipSub v1.1 and v1.2
- GossipSub v1.2
- Request/response protocols (Status, BlocksByRoot)
"""
20 changes: 1 addition & 19 deletions src/lean_spec/subspecs/networking/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,6 @@
MAX_PAYLOAD_SIZE: Final[int] = 10 * 1024 * 1024
"""Maximum uncompressed payload size in bytes (10 MiB)."""

TTFB_TIMEOUT: Final[float] = 5.0
"""Time-to-first-byte timeout.

Maximum time to wait for the first byte of a response after sending a request.
If no data arrives within this window, the request is considered failed.
"""

RESP_TIMEOUT: Final[float] = 10.0
"""Response timeout.

Expand All @@ -38,22 +31,11 @@
Per Ethereum spec, prepended to the message hash when decompression succeeds.
"""

GOSSIPSUB_PROTOCOL_ID_V11: Final = ProtocolId("/meshsub/1.1.0")
"""Gossipsub v1.1 protocol ID - peer scoring, extended validators.

This is the minimum version required by the Ethereum consensus spec.
"""

GOSSIPSUB_PROTOCOL_ID_V12: Final = ProtocolId("/meshsub/1.2.0")
"""Gossipsub v1.2 protocol ID - IDONTWANT bandwidth optimization."""

GOSSIPSUB_DEFAULT_PROTOCOL_ID: Final = GOSSIPSUB_PROTOCOL_ID_V12
"""
Default protocol ID per Ethereum consensus spec requirements.

The Ethereum consensus P2P spec states:
"Clients MUST support the gossipsub v1 libp2p Protocol including the gossipsub v1.1 extension."
"""
"""Default gossipsub protocol ID advertised during stream negotiation."""

PRUNE_BACKOFF: Final[int] = 60
"""Default PRUNE backoff duration in seconds.
Expand Down
3 changes: 1 addition & 2 deletions src/lean_spec/subspecs/networking/enr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from . import keys
from .enr import ENR
from .eth2 import FAR_FUTURE_EPOCH, AttestationSubnets, Eth2Data, SyncCommitteeSubnets
from .eth2 import FAR_FUTURE_EPOCH, AttestationSubnets, Eth2Data
from .keys import EnrKey

__all__ = [
Expand All @@ -16,6 +16,5 @@
"keys",
"Eth2Data",
"AttestationSubnets",
"SyncCommitteeSubnets",
"FAR_FUTURE_EPOCH",
]
36 changes: 3 additions & 33 deletions src/lean_spec/subspecs/networking/enr/enr.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
)

from . import keys
from .eth2 import AttestationSubnets, Eth2Data, SyncCommitteeSubnets
from .eth2 import AttestationSubnets, Eth2Data
from .keys import EnrKey

ENR_PREFIX: Final = "enr:"
Expand Down Expand Up @@ -127,48 +127,26 @@ def ip4(self) -> str | None:
ip_bytes = self.get(keys.IP)
return ".".join(str(b) for b in ip_bytes) if ip_bytes and len(ip_bytes) == 4 else None

@property
def ip6(self) -> str | None:
"""IPv6 address as colon-separated hex."""
ip_bytes = self.get(keys.IP6)
if ip_bytes and len(ip_bytes) == 16:
return ":".join(ip_bytes[i : i + 2].hex() for i in range(0, 16, 2))
return None

@property
def udp_port(self) -> Port | None:
"""UDP port for discovery."""
raw = self.get(keys.UDP)
return Port(int.from_bytes(raw, "big")) if raw else None

@property
def udp6_port(self) -> Port | None:
"""IPv6-specific UDP port."""
raw = self.get(keys.UDP6)
return Port(int.from_bytes(raw, "big")) if raw else None

@property
def quic_port(self) -> Port | None:
"""QUIC port for QUIC connections."""
raw = self.get(keys.QUIC)
return Port(int.from_bytes(raw, "big")) if raw else None

@property
def quic6_port(self) -> Port | None:
"""IPv6-specific QUIC port."""
raw = self.get(keys.QUIC6)
return Port(int.from_bytes(raw, "big")) if raw else None

def multiaddr(self) -> Multiaddr | None:
"""
Construct QUIC multiaddress from endpoint info.
"""Construct QUIC multiaddress from endpoint info.

Use QUIC port if available, otherwise UDP port.
"""
port = self.quic_port or self.udp_port
if self.ip4 and port:
return Multiaddr(f"/ip4/{self.ip4}/udp/{port}/quic-v1")
if self.ip6 and port:
return Multiaddr(f"/ip6/{self.ip6}/udp/{port}/quic-v1")
return None

@property
Expand All @@ -189,14 +167,6 @@ def attestation_subnets(self) -> AttestationSubnets | None:
attnets = self.get(keys.ATTNETS)
return AttestationSubnets.decode_bytes(attnets) if attnets and len(attnets) == 8 else None

@property
def sync_committee_subnets(self) -> SyncCommitteeSubnets | None:
"""Parse syncnets key (SSZ Bitvector[4])."""
syncnets = self.get(keys.SYNCNETS)
if syncnets and len(syncnets) == 1:
return SyncCommitteeSubnets.decode_bytes(syncnets)
return None

@property
def is_aggregator(self) -> bool:
"""Check if node advertises aggregator capability."""
Expand Down
46 changes: 0 additions & 46 deletions src/lean_spec/subspecs/networking/enr/eth2.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

Subnet subscription keys (SSZ Bitvectors):
- attnets: Bitvector[64] - attestation subnets (bit i = subscribed to subnet i)
- syncnets: Bitvector[4] - sync committee subnets

See: https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/p2p-interface.md
"""
Expand Down Expand Up @@ -87,48 +86,3 @@ def subscribed_subnets(self) -> list[SubnetId]:
def subscription_count(self) -> int:
"""Number of subscribed subnets."""
return sum(1 for b in self.data if b)


class SyncCommitteeSubnets(BaseBitvector):
"""
Sync committee subnet subscriptions (ENR `syncnets` key).

SSZ Bitvector[4] where bit i indicates subscription to sync subnet i.
"""

LENGTH: ClassVar[int] = 4
"""4 sync committee subnets."""

@classmethod
def none(cls) -> "SyncCommitteeSubnets":
"""No subscriptions."""
return cls(data=[Boolean(False)] * cls.LENGTH)

@classmethod
def all(cls) -> "SyncCommitteeSubnets":
"""Subscribe to all 4 subnets."""
return cls(data=[Boolean(True)] * cls.LENGTH)

@classmethod
def from_subnet_ids(cls, subnet_ids: list[int]) -> "SyncCommitteeSubnets":
"""Subscribe to specific sync subnets."""
bits = [Boolean(False)] * cls.LENGTH
for sid in subnet_ids:
if not 0 <= sid < cls.LENGTH:
raise ValueError(f"Sync subnet ID must be 0-3, got {sid}")
bits[sid] = Boolean(True)
return cls(data=bits)

def is_subscribed(self, subnet_id: int) -> bool:
"""Check if subscribed to a sync subnet."""
if not 0 <= subnet_id < self.LENGTH:
raise ValueError(f"Sync subnet ID must be 0-3, got {subnet_id}")
return bool(self.data[subnet_id])

def subscribed_subnets(self) -> list[SubnetId]:
"""List of subscribed sync subnet IDs."""
return [SubnetId(i) for i in range(self.LENGTH) if self.data[i]]

def subscription_count(self) -> int:
"""Number of subscribed sync subnets."""
return sum(1 for b in self.data if b)
12 changes: 0 additions & 12 deletions src/lean_spec/subspecs/networking/enr/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,16 @@ class EnrKey(str):
UDP: Final = EnrKey("udp")
"""UDP port for discovery (big-endian integer)."""

IP6: Final = EnrKey("ip6")
"""IPv6 address (16 bytes)."""

UDP6: Final = EnrKey("udp6")
"""IPv6-specific UDP port (big-endian integer)."""

# QUIC Keys
QUIC: Final = EnrKey("quic")
"""QUIC port for QUIC connections (big-endian integer)."""

QUIC6: Final = EnrKey("quic6")
"""IPv6-specific QUIC port (big-endian integer)."""

# Ethereum Consensus Extensions
ETH2: Final = EnrKey("eth2")
"""Ethereum consensus fork data (16 bytes)."""

ATTNETS: Final = EnrKey("attnets")
"""Attestation subnet subscriptions (8 bytes bitvector)."""

SYNCNETS: Final = EnrKey("syncnets")
"""Sync committee subnet subscriptions (1 byte bitvector)."""

IS_AGGREGATOR: Final = EnrKey("is_aggregator")
"""Aggregator capability flag (1 byte: 0x00 = false, 0x01 = true)."""
Loading
Loading