diff --git a/go.mod b/go.mod index 896d49b..eb545c9 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.25.10 require ( github.com/coder/websocket v1.8.15 - github.com/pilot-protocol/common v0.5.0 + github.com/pilot-protocol/common v0.5.4 golang.org/x/net v0.56.0 ) diff --git a/go.sum b/go.sum index 3a9a6ea..e01b756 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ github.com/coder/websocket v1.8.15 h1:6B2JPeOGlpff2Uz6vOEH1Vzpi0iUz20A+lPVhPHtNUA= github.com/coder/websocket v1.8.15/go.mod h1:NX3SzP+inril6yawo5CQXx8+fk145lPDC6pumgx0mVg= -github.com/pilot-protocol/common v0.5.0 h1:+rakrcpwbg8dfuQDWAgOI1N/XC3c6WSwar2SlW+cZ28= -github.com/pilot-protocol/common v0.5.0/go.mod h1:yrAwPXGVMbXU+SADvOCmbdXjK/wJ3uA0KshyLvRlej4= +github.com/pilot-protocol/common v0.5.4 h1:lz/cOXOjYCNIJvkWpb+pxXWpCqNKM/IY7D5fnVAq+8g= +github.com/pilot-protocol/common v0.5.4/go.mod h1:yrAwPXGVMbXU+SADvOCmbdXjK/wJ3uA0KshyLvRlej4= golang.org/x/net v0.56.0 h1:Rw8j/hFzGvJUZwNBXnAtf5sVDVt+65SK2C7IxCxZt5o= golang.org/x/net v0.56.0/go.mod h1:D3Ku6r+V6JROoZK144D2XfMHFcMq/0zSfLelVTCFKec= golang.org/x/sys v0.46.0 h1:noSf2Fq6F8DBgS+LysIkx7rIExoNHJsxOAtPp4rthXw= diff --git a/server.go b/server.go index f4af19d..0e5abe7 100644 --- a/server.go +++ b/server.go @@ -41,6 +41,7 @@ type Server struct { conns []*net.UDPConn // SO_REUSEPORT sockets — one per reader goroutine sendBatchConns []*ipv4.PacketConn // ipv4 wrappers of conns[i] for WriteBatch (sendmmsg) — one per worker fd nodes *nodeMap // sharded node_id → observed endpoint + last-seen + dstNodes sync.Map // node_id → struct{}: endpoints that requested dest-carrying delivery (0x09) readyCh chan struct{} relayCh chan relayJob // buffered channel for relay workers pool sync.Pool // reusable payload buffers (inner relay payload) @@ -531,7 +532,9 @@ func (s *Server) handlePacket(data []byte, remote *net.UDPAddr) { switch msgType { case protocol.BeaconMsgDiscover: - s.handleDiscover(data[1:], remote) + s.handleDiscover(data[1:], remote, false) + case protocol.BeaconMsgDiscoverEx: + s.handleDiscover(data[1:], remote, true) case protocol.BeaconMsgPunchRequest: s.handlePunchRequest(data[1:], remote) case protocol.BeaconMsgRelay: @@ -543,7 +546,7 @@ func (s *Server) handlePacket(data []byte, remote *net.UDPAddr) { } } -func (s *Server) handleDiscover(data []byte, remote *net.UDPAddr) { +func (s *Server) handleDiscover(data []byte, remote *net.UDPAddr, wantDest bool) { if len(data) < 4 { return } @@ -557,6 +560,13 @@ func (s *Server) handleDiscover(data []byte, remote *net.UDPAddr) { nodeID := binary.BigEndian.Uint32(data[0:4]) + // This endpoint requested destination-carrying delivery; remember the + // node id so its final-hop relay frames include the destination (0x09), + // letting the receiver resolve which local endpoint the frame is for. + if wantDest { + s.dstNodes.Store(nodeID, struct{}{}) + } + // Per-nodeID endpoint update rate limit (PILOT-334) — prevents a single // nodeID from flapping its endpoint via rapid Discover messages. s.discoverRateMu.Lock() @@ -925,10 +935,20 @@ func (s *Server) relayWorker(sendConn *ipv4.PacketConn) { var addr *net.UDPAddr if localOk { addr = localAddr - outBuf = outBuf[:1+4+len(job.payload)] - outBuf[0] = protocol.BeaconMsgRelayDeliver - binary.BigEndian.PutUint32(outBuf[1:5], job.senderID) - copy(outBuf[5:], job.payload) + if _, wantDest := s.dstNodes.Load(job.destID); wantDest { + // Destination-carrying endpoint: include the dest node id + // so the receiver can resolve which endpoint the frame is for. + outBuf = outBuf[:1+4+4+len(job.payload)] + outBuf[0] = protocol.BeaconMsgRelayDeliverDest + binary.BigEndian.PutUint32(outBuf[1:5], job.senderID) + binary.BigEndian.PutUint32(outBuf[5:9], job.destID) + copy(outBuf[9:], job.payload) + } else { + outBuf = outBuf[:1+4+len(job.payload)] + outBuf[0] = protocol.BeaconMsgRelayDeliver + binary.BigEndian.PutUint32(outBuf[1:5], job.senderID) + copy(outBuf[5:], job.payload) + } } else { // Tier 2: peer beacon lookup. peerNodes is an // atomic.Pointer; reads are lock-free.