Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 61 additions & 7 deletions driver/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,37 @@ import (
const maxSendChunk = ipcutil.MaxMessageSize - 64

// Conn implements net.Conn over a Pilot Protocol stream.
//
// Concurrency: like *net.TCPConn, a Conn may be used by at most one reader
// and one writer goroutine at a time. Read serialises concurrent callers
// with readMu (so recvBuf is never corrupted), but interleaving two
// readers still yields each a non-deterministic slice of the stream; do
// not do that. Write is safe for one writer; concurrent writers may
// interleave chunks on the wire. SetDeadline/SetReadDeadline/
// SetWriteDeadline are safe to call from any goroutine.
type Conn struct {
id uint32
localAddr protocol.SocketAddr
remoteAddr protocol.SocketAddr
ipc *ipcClient
recvCh chan []byte
recvBuf []byte // leftover from previous read
closed bool

mu sync.Mutex
readDeadline time.Time
deadlineCh chan struct{} // closed when deadline is set/changed
// readMu serialises Read so recvBuf (leftover from a previous read)
// cannot be observed or mutated by two readers at once.
readMu sync.Mutex
recvBuf []byte // leftover from previous read; guarded by readMu

mu sync.Mutex
closed bool
readDeadline time.Time
writeDeadline time.Time
deadlineCh chan struct{} // closed when deadline is set/changed
}

func (c *Conn) Read(b []byte) (int, error) {
c.readMu.Lock()
defer c.readMu.Unlock()

// Drain leftover first
if len(c.recvBuf) > 0 {
n := copy(b, c.recvBuf)
Expand Down Expand Up @@ -78,17 +94,36 @@ func (c *Conn) Read(b []byte) (int, error) {
}
}

// Write enqueues b to the local daemon over IPC, splitting it into
// maxSendChunk-sized cmdSend frames.
//
// Send semantics: a nil error and n == len(b) mean every chunk was handed
// to the local daemon over IPC — NOT that the bytes were transmitted on the
// wire or acknowledged by the peer. The Pilot stream layer in the daemon
// handles retransmission/ordering after this point; Write does not block on
// it. Errors reported here are local IPC write failures or a passed
// write deadline.
func (c *Conn) Write(b []byte) (int, error) {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return 0, protocol.ErrConnClosed
}
wdl := c.writeDeadline
c.mu.Unlock()

if !wdl.IsZero() && !time.Now().Before(wdl) {
return 0, os.ErrDeadlineExceeded
}

total := len(b)
written := 0
for written < total {
// Honour the write deadline between chunks so a large, slow write
// to a backed-up IPC socket cannot block past the deadline.
if !wdl.IsZero() && !time.Now().Before(wdl) {
return written, os.ErrDeadlineExceeded
}
chunk := total - written
if chunk > maxSendChunk {
chunk = maxSendChunk
Expand Down Expand Up @@ -125,7 +160,15 @@ func (c *Conn) LocalAddr() net.Addr { return pilotAddr(c.localAddr) }
func (c *Conn) RemoteAddr() net.Addr { return pilotAddr(c.remoteAddr) }

func (c *Conn) SetDeadline(t time.Time) error {
c.SetReadDeadline(t)
c.mu.Lock()
c.readDeadline = t
c.writeDeadline = t
// Signal any blocked Read to re-check.
if c.deadlineCh != nil {
close(c.deadlineCh)
}
c.deadlineCh = make(chan struct{})
c.mu.Unlock()
return nil
}

Expand All @@ -141,7 +184,18 @@ func (c *Conn) SetReadDeadline(t time.Time) error {
return nil
}

func (c *Conn) SetWriteDeadline(t time.Time) error { return nil }
// SetWriteDeadline sets a deadline for Write. A passed deadline causes Write
// to return os.ErrDeadlineExceeded. Because Write never blocks waiting on a
// remote peer (it only enqueues chunks to the local daemon over IPC), the
// deadline is enforced before each chunk rather than via an interrupt — a
// zero time clears it. This satisfies the net.Conn contract instead of the
// previous silent no-op.
func (c *Conn) SetWriteDeadline(t time.Time) error {
c.mu.Lock()
c.writeDeadline = t
c.mu.Unlock()
return nil
}

// pilotAddr wraps SocketAddr to satisfy net.Addr.
type pilotAddr protocol.SocketAddr
Expand Down
48 changes: 21 additions & 27 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ func DefaultSocketPath() string {
return "/tmp/pilot.sock"
}

// defaultDialTimeout bounds DialAddr / Listen / Broadcast so a wedged or
// non-responsive daemon can't block the caller forever. The daemon resolves
// + dials within this window in the normal case (direct punch or relay
// fallback both complete well under it); callers needing a tighter bound use
// DialAddrTimeout. Operations that legitimately block in the daemon
// (WaitForTrust) deliberately keep the unbounded sendAndWait path.
const defaultDialTimeout = 30 * time.Second

// Handshake sub-commands (must match daemon SubHandshake* constants)
const (
subHandshakeSend byte = 0x01
Expand Down Expand Up @@ -83,32 +91,11 @@ func (d *Driver) Dial(addr string) (*Conn, error) {
return d.DialAddr(sa.Addr, sa.Port)
}

// DialAddr opens a stream connection to a remote Addr + port.
// DialAddr opens a stream connection to a remote Addr + port. It applies
// defaultDialTimeout so a non-responsive daemon cannot block the caller
// indefinitely; use DialAddrTimeout to supply an explicit bound.
func (d *Driver) DialAddr(dst protocol.Addr, port uint16) (*Conn, error) {
msg := make([]byte, 1+protocol.AddrSize+2)
msg[0] = cmdDial
dst.MarshalTo(msg, 1)
binary.BigEndian.PutUint16(msg[1+protocol.AddrSize:], port)

resp, err := d.ipc.sendAndWait(msg, cmdDialOK)
if err != nil {
return nil, fmt.Errorf("dial: %w", err)
}

if len(resp) < 4 {
return nil, fmt.Errorf("invalid dial response")
}

connID := binary.BigEndian.Uint32(resp[0:4])
recvCh := d.ipc.registerRecvCh(connID)

return &Conn{
id: connID,
remoteAddr: protocol.SocketAddr{Addr: dst, Port: port},
ipc: d.ipc,
recvCh: recvCh,
deadlineCh: make(chan struct{}),
}, nil
return d.DialAddrTimeout(dst, port, defaultDialTimeout)
}

// DialAddrTimeout opens a stream connection with a client-side timeout.
Expand Down Expand Up @@ -146,7 +133,7 @@ func (d *Driver) Listen(port uint16) (*Listener, error) {
msg[0] = cmdBind
binary.BigEndian.PutUint16(msg[1:3], port)

resp, err := d.ipc.sendAndWait(msg, cmdBindOK)
resp, err := d.ipc.sendAndWaitTimeout(msg, cmdBindOK, defaultDialTimeout)
if err != nil {
return nil, fmt.Errorf("bind: %w", err)
}
Expand All @@ -167,6 +154,13 @@ func (d *Driver) Listen(port uint16) (*Listener, error) {
// SendTo sends an unreliable unicast datagram to the given address:port.
// Broadcast addresses (Node=0xFFFFFFFF) are not accepted on this path; use
// Broadcast, which requires the daemon's admin token.
//
// Send semantics: this is fire-and-forget. A nil return means only that the
// frame was successfully enqueued to the local daemon over IPC — it does NOT
// indicate the datagram was transmitted on the wire, routed, or delivered to
// the peer. Datagrams are unreliable; there is no acknowledgement. The only
// errors reported are local IPC failures (empty/oversized frame, socket
// write error).
func (d *Driver) SendTo(dst protocol.Addr, port uint16, data []byte) error {
if dst.IsBroadcast() {
return fmt.Errorf("broadcast address requires admin token: use Driver.Broadcast")
Expand All @@ -192,7 +186,7 @@ func (d *Driver) Broadcast(netID uint16, port uint16, data []byte, adminToken st
binary.BigEndian.PutUint16(msg[5:7], uint16(len(tokenBytes)))
copy(msg[7:7+len(tokenBytes)], tokenBytes)
copy(msg[7+len(tokenBytes):], data)
if _, err := d.ipc.sendAndWait(msg, cmdBroadcastOK); err != nil {
if _, err := d.ipc.sendAndWaitTimeout(msg, cmdBroadcastOK, defaultDialTimeout); err != nil {
return err
}
return nil
Expand Down
Loading
Loading