Skip to content
27 changes: 20 additions & 7 deletions core/ipc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,14 +212,27 @@ func buildEndpoints(neigh *state.Neighbour) []*protocol.EndpointInfo {
if ap, err := nep.DynEP.Get(); err == nil {
resolved = stringPtr(ap.String())
}
var bindID, bindInterface, bindSource *string
if nep.LocalBind != "" {
bindID = stringPtr(string(nep.LocalBind))
}
if nep.Bind.Interface != "" {
bindInterface = stringPtr(nep.Bind.Interface)
}
if nep.Bind.Source.IsValid() {
bindSource = stringPtr(nep.Bind.Source.String())
}
eps = append(eps, &protocol.EndpointInfo{
Address: nep.DynEP.Value,
Resolved: resolved,
Active: ep.IsActive(),
RemoteInit: ep.IsRemote(),
Metric: ep.Metric(),
FilteredRttNs: int64(nep.FilteredPing()),
StabilizedRttNs: int64(nep.StabilizedPing()),
Address: nep.DynEP.Value,
Resolved: resolved,
Active: ep.IsActive(),
RemoteInit: ep.IsRemote(),
Metric: ep.Metric(),
FilteredRttNs: int64(nep.FilteredPing()),
StabilizedRttNs: int64(nep.StabilizedPing()),
LocalBindId: bindID,
LocalBindInterface: bindInterface,
LocalBindSource: bindSource,
})
}
slices.SortFunc(eps, func(a, b *protocol.EndpointInfo) int {
Expand Down
1 change: 1 addition & 0 deletions core/nylon.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Nylon struct {
router struct {
LastStarvationRequest time.Time
IO map[state.NodeId]*IOPending
RouteComputePending atomic.Bool

// ForwardTable contains the full routing table
ForwardTable atomic.Pointer[bart.Table[RouteTableEntry]]
Expand Down
48 changes: 37 additions & 11 deletions core/nylon_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
"errors"
"fmt"
"net/netip"
"reflect"
"slices"
Expand Down Expand Up @@ -70,7 +71,7 @@ func (n *Nylon) reconcileRouterState(next *state.CentralCfg) error {
continue
}
// configure existing neighbours
reconcileConfiguredEndpoints(neigh, cfg.Endpoints, &n.RouterTunables)
reconcileConfiguredEndpoints(neigh, configuredEndpoints(n.LocalCfg.EndpointBinds, neigh.Id, cfg.Endpoints, &n.RouterTunables))
neighs = append(neighs, neigh)
delete(desired, neigh.Id)
}
Expand All @@ -88,9 +89,7 @@ func (n *Nylon) reconcileRouterState(next *state.CentralCfg) error {
Routes: make(map[netip.Prefix]state.NeighRoute),
Eps: make([]state.Endpoint, 0, len(cfg.Endpoints)),
}
for _, ep := range cfg.Endpoints {
stNeigh.Eps = append(stNeigh.Eps, state.NewEndpoint(ep, false, nil, &n.RouterTunables))
}
stNeigh.Eps = append(stNeigh.Eps, configuredEndpoints(n.LocalCfg.EndpointBinds, id, cfg.Endpoints, &n.RouterTunables)...)
neighs = append(neighs, stNeigh)
}
n.RouterState.Neighbours = neighs
Expand All @@ -107,10 +106,35 @@ func (n *Nylon) reconcileRouterState(next *state.CentralCfg) error {
return nil
}

func reconcileConfiguredEndpoints(neigh *state.Neighbour, desired []*state.DynamicEndpoint, t *state.RouterTunables) {
desiredByValue := make(map[string]*state.DynamicEndpoint, len(desired))
// configuredEndpoints expands the configured dynamic endpoints of peer into
// runtime endpoints, taking the local endpoint binds into account.
//
// For every entry in endpoints, one runtime endpoint is created per bind whose
// Peer equals peer and whose Endpoint equals the entry's Value; each of those
// carries the bind's ID in LocalBind and the bind itself in Bind. The bind's
// position in binds is what gets passed to LocalBind. An entry that no bind
// refers to still yields exactly one runtime endpoint, left unbound.
//
// The result keeps the order of endpoints and, within one endpoint, the order
// of binds. t is handed to state.NewEndpoint unchanged.
func configuredEndpoints(binds []state.LocalEndpointBind, peer state.NodeId, endpoints []*state.DynamicEndpoint, t *state.RouterTunables) []state.Endpoint {
	result := make([]state.Endpoint, 0, len(endpoints))
	for _, dyn := range endpoints {
		// Remember how many endpoints we had so we can tell afterwards
		// whether any bind applied to this one.
		before := len(result)
		for i, b := range binds {
			if b.Peer == peer && b.Endpoint == dyn.Value {
				bound := state.NewEndpoint(dyn, false, nil, t)
				bound.LocalBind = b.LocalBind(i).ID
				bound.Bind = b.LocalBind(i)
				result = append(result, bound)
			}
		}
		if len(result) == before {
			// No bind references this endpoint: keep it as a plain one.
			result = append(result, state.NewEndpoint(dyn, false, nil, t))
		}
	}
	return result
}

// endpointKey returns the identity string used to compare endpoints during
// reconciliation. It combines the configured endpoint value, the bound
// interface name and the bound source, separated by NUL bytes, so that the
// same remote address reached through different local binds yields different
// keys.
func endpointKey(ep *state.NylonEndpoint) string {
	const keyFormat = "%s\x00%s\x00%s"
	bind := ep.Bind
	return fmt.Sprintf(keyFormat, ep.DynEP.Value, bind.Interface, bind.Source)
}

func reconcileConfiguredEndpoints(neigh *state.Neighbour, desired []state.Endpoint) {
desiredByKey := make(map[string]state.Endpoint, len(desired))
for _, ep := range desired {
desiredByValue[ep.Value] = ep
desiredByKey[endpointKey(ep.AsNylonEndpoint())] = ep
}

eps := make([]state.Endpoint, 0, len(neigh.Eps)+len(desired))
Expand All @@ -122,16 +146,18 @@ func reconcileConfiguredEndpoints(neigh *state.Neighbour, desired []*state.Dynam
continue
}
// only keep if desired
if desiredEp, ok := desiredByValue[nep.DynEP.Value]; ok {
key := endpointKey(nep)
if _, ok := desiredByKey[key]; ok {
eps = append(eps, ep)
seen[desiredEp.Value] = struct{}{}
seen[key] = struct{}{}
}
}
for _, ep := range desired {
if _, ok := seen[ep.Value]; ok {
key := endpointKey(ep.AsNylonEndpoint())
if _, ok := seen[key]; ok {
continue
}
eps = append(eps, state.NewEndpoint(ep, false, nil, t))
eps = append(eps, ep)
}
neigh.Eps = eps
}
Expand Down
87 changes: 58 additions & 29 deletions core/nylon_endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
"math/rand/v2"
"net"
"slices"
"sync"
"time"
Expand All @@ -15,6 +16,8 @@ import (

// EpPing is the bookkeeping record for one outstanding probe. Probe stores it
// in the ping buffer under the probe's token, and handleProbePong takes it
// back out when the matching reply arrives.
type EpPing struct {
// TimeSent is the time the probe was recorded; the reply handler measures
// latency as the time elapsed since this instant.
TimeSent time.Time
// Node is the node the probe was addressed to. A reply that claims to come
// from a different node is rejected.
Node state.NodeId
// Endpoint is the endpoint that was probed; the reply handler renews it
// and feeds the measured latency into it.
Endpoint *state.NylonEndpoint
}

func (n *Nylon) Probe(node state.NodeId, ep *state.NylonEndpoint, waitErr bool) error {
Expand Down Expand Up @@ -46,6 +49,8 @@ func (n *Nylon) Probe(node state.NodeId, ep *state.NylonEndpoint, waitErr bool)

n.PingBuf.Set(token, EpPing{
TimeSent: time.Now(),
Node: node,
Endpoint: ep,
}, ttlcache.DefaultTTL)
}()

Expand Down Expand Up @@ -83,6 +88,29 @@ func handleProbe(n *Nylon, pkt *protocol.Ny_Probe, endpoint conn.Endpoint, peer
}
}

// endpointMatchesPacket reports whether a packet seen on packetEp belongs to
// the configured endpoint ep.
//
// The checks are, in order:
//   - ep's dynamic endpoint must resolve, and the resolved address must equal
//     packetEp.DstIPPort();
//   - if ep is bound to a valid source address, packetEp.SrcIP() must equal it
//     (the interface name is not consulted in that case);
//   - otherwise, if ep is bound to an interface name, that interface must
//     exist and packetEp must expose a SrcIfidx() equal to its index.
//
// An endpoint without any bind matches on the resolved address alone.
func endpointMatchesPacket(ep *state.NylonEndpoint, packetEp conn.Endpoint) bool {
	resolved, err := ep.DynEP.Get()
	if err != nil {
		return false
	}
	if resolved != packetEp.DstIPPort() {
		return false
	}

	switch {
	case ep.Bind.Source.IsValid():
		// A source-address bind decides the match by itself.
		return ep.Bind.Source == packetEp.SrcIP()
	case ep.Bind.Interface == "":
		// Not bound at all: the address match above is sufficient.
		return true
	}

	// Interface-only bind: compare interface indices.
	iface, err := net.InterfaceByName(ep.Bind.Interface)
	if err != nil {
		return false
	}
	// Not every conn.Endpoint implementation carries an interface index, so
	// probe for the accessor instead of requiring it.
	type srcIfIndexer interface {
		SrcIfidx() int32
	}
	indexed, ok := packetEp.(srcIfIndexer)
	return ok && int32(iface.Index) == indexed.SrcIfidx()
}

func handleProbePing(n *Nylon, node state.NodeId, wgEndpoint conn.Endpoint) {
if node == n.LocalCfg.Id {
return
Expand All @@ -91,8 +119,7 @@ func handleProbePing(n *Nylon, node state.NodeId, wgEndpoint conn.Endpoint) {
for _, neigh := range n.RouterState.Neighbours {
for _, dep := range neigh.Eps {
dep := dep.AsNylonEndpoint()
ap, err := dep.DynEP.Get()
if err == nil && ap == wgEndpoint.DstIPPort() && neigh.Id == node {
if neigh.Id == node && endpointMatchesPacket(dep, wgEndpoint) {
// we have a link

// refresh wireguard ep
Expand All @@ -104,7 +131,7 @@ func handleProbePing(n *Nylon, node state.NodeId, wgEndpoint conn.Endpoint) {
dep.Renew()

if n.DBG_log_probe {
n.Log.Debug("probe from", "addr", ap.String())
n.Log.Debug("probe from", "addr", wgEndpoint.DstToString(), "src", wgEndpoint.SrcIP())
}
return
}
Expand All @@ -124,33 +151,35 @@ func handleProbePing(n *Nylon, node state.NodeId, wgEndpoint conn.Endpoint) {
}

func handleProbePong(n *Nylon, node state.NodeId, token uint64, ep conn.Endpoint) {
// check if link exists
for _, neigh := range n.RouterState.Neighbours {
for _, dep := range neigh.Eps {
dpLink := dep.AsNylonEndpoint()
ap, err := dpLink.DynEP.Get()
if err == nil && ap == ep.DstIPPort() && neigh.Id == node {
linkHealth, ok := n.PingBuf.GetAndDelete(token)
if ok {
health := linkHealth.Value()
latency := time.Since(health.TimeSent)
// we have a link
if n.DBG_log_probe {
n.Log.Debug("probe back", "peer", node, "ping", latency)
}
dpLink.Renew()
dpLink.UpdatePing(latency)

// update wireguard endpoint
dpLink.WgEndpoint = ep

ComputeRoutes(n.RouterState, n)
}
return
}
}
linkHealth, ok := n.PingBuf.GetAndDelete(token)
if !ok {
n.Log.Warn("probe came back and couldn't find token", "from", ep.DstToString(), "node", node)
return
}
health := linkHealth.Value()
dpLink := health.Endpoint
if health.Node != node || dpLink == nil {
n.Log.Warn("probe came back for unexpected node", "from", ep.DstToString(), "node", node, "expected", health.Node)
return
}
latency := time.Since(health.TimeSent)
// we have a link
if n.DBG_log_probe {
n.Log.Debug("probe back", "peer", node, "ping", latency)
}
dpLink.Renew()
dpLink.UpdatePing(latency)

if dpLink.Bind.Source.IsValid() && dpLink.Bind.Source != ep.SrcIP() {
n.Log.Warn("bound probe returned on unexpected source", "from", ep.DstToString(), "node", node, "expected", dpLink.Bind.Source, "actual", ep.SrcIP())
} else {
// update wireguard endpoint
dpLink.WgEndpoint = ep
}
n.Log.Warn("probe came back and couldn't find link", "from", ep.DstToString(), "node", node)

// Probe pongs arrive frequently on larger meshes. Coalesce route
// recomputation so RTT samples can update without saturating dispatch.
n.ScheduleRouteCompute(n.StarvationDelay)
}

func (n *Nylon) probeLinks(active bool) error {
Expand Down
9 changes: 5 additions & 4 deletions core/nylon_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import (
"time"
)

// Dispatch Dispatches the function to run on the main thread without waiting for it to complete
func (n *Nylon) Dispatch(fun func() error) {
// Dispatch dispatches the function to run on the main thread without waiting for it to complete.
// It returns false when the dispatch queue is full and the function was dropped.
func (n *Nylon) Dispatch(fun func() error) bool {
defer func() {
if r := recover(); r != nil {
n.Cancel(fmt.Errorf("dispatch panic: %v", r))
Expand All @@ -17,10 +18,10 @@ func (n *Nylon) Dispatch(fun func() error) {
for {
select {
case n.DispatchChannel <- fun:
return
return true
default:
n.Log.Error("dispatch channel is full, discarded function", "fun", runtime.FuncForPC(reflect.ValueOf(fun).Pointer()).Name(), "len", len(n.DispatchChannel))
return
return false
}
}
}
Expand Down
44 changes: 35 additions & 9 deletions core/nylon_tc.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,23 +186,49 @@ func (n *Nylon) handleNylonPacket(packet []byte, endpoint conn.Endpoint, peer *d
}
}()

controlPackets := make([]*protocol.Ny, 0, len(bundle.Packets))

for _, pkt := range bundle.Packets {
switch pkt.Type.(type) {
case *protocol.Ny_SeqnoRequestOp:
n.Dispatch(func() error {
return n.routerHandleSeqnoRequest(neigh, pkt.GetSeqnoRequestOp())
})
controlPackets = append(controlPackets, pkt)
case *protocol.Ny_RouteOp:
n.Dispatch(func() error {
return n.routerHandleRouteUpdate(neigh, pkt.GetRouteOp())
})
controlPackets = append(controlPackets, pkt)
case *protocol.Ny_AckRetractOp:
n.Dispatch(func() error {
return n.routerHandleAckRetract(neigh, pkt.GetAckRetractOp())
})
controlPackets = append(controlPackets, pkt)
case *protocol.Ny_ProbeOp:
// we don't want to wait for dispatch before responding to this packet
handleProbe(n, pkt.GetProbeOp(), endpoint, peer, neigh)
}
}

if len(controlPackets) == 0 {
return
}

n.Dispatch(func() error {
routeUpdated := false
for _, pkt := range controlPackets {
switch pkt.Type.(type) {
case *protocol.Ny_SeqnoRequestOp:
if err := n.routerHandleSeqnoRequest(neigh, pkt.GetSeqnoRequestOp()); err != nil {
return err
}
case *protocol.Ny_RouteOp:
applied, err := n.routerApplyRouteUpdate(neigh, pkt.GetRouteOp())
if err != nil {
return err
}
routeUpdated = routeUpdated || applied
case *protocol.Ny_AckRetractOp:
if err := n.routerHandleAckRetract(neigh, pkt.GetAckRetractOp()); err != nil {
return err
}
}
}
if routeUpdated {
ComputeRoutes(n.RouterState, n)
}
return nil
})
}
Loading