Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions internal/cli/broker_dial_darwin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
//go:build darwin

package cli

import "syscall"

// controlSockType is the control-channel socket type used by the broker dial
// test. darwin's AF_UNIX has no SOCK_SEQPACKET support, so the native test
// falls back to SOCK_DGRAM, which preserves datagram boundaries identically for
// the CLI dialer's Sendmsg/Recvmsg+SCM_RIGHTS path. Production runners are
// Linux-only (SOCK_SEQPACKET).
const controlSockType = syscall.SOCK_DGRAM
9 changes: 9 additions & 0 deletions internal/cli/broker_dial_linux_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
//go:build linux

package cli

import "syscall"

// controlSockType is the control-channel socket type used by the broker dial
// test. On Linux (production) the runner uses SOCK_SEQPACKET.
const controlSockType = syscall.SOCK_SEQPACKET
12 changes: 12 additions & 0 deletions internal/cli/broker_dial_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
//go:build !unix

package cli

import (
"errors"
"net/http"
)

func newBrokerHTTPClient(int) *http.Client { return nil }

var errBrokerUnsupported = errors.New("flashduty: broker mode is not supported on this platform")
86 changes: 86 additions & 0 deletions internal/cli/broker_dial_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
//go:build unix

package cli

import (
"context"
"errors"
"fmt"
"net"
"net/http"
"os"
"sync"
"syscall"
"time"
)

// errBrokerUnsupported is returned when broker mode is requested on a build that
// cannot provide it. On unix this is effectively unreachable (newBrokerHTTPClient
// never returns nil), but defaultNewClient references it on every platform.
var errBrokerUnsupported = errors.New("flashduty: broker mode is not supported on this platform")

// brokerDialer owns the inherited control fd and serializes per-dial handshakes.
// Each Dial sends a 1-byte request datagram on the control channel and receives
// one dedicated SOCK_STREAM fd back via SCM_RIGHTS.
type brokerDialer struct {
mu sync.Mutex // serialize send+recv so concurrent dials don't cross fds
credFD int
}

func (d *brokerDialer) dial(_ context.Context, _, _ string) (net.Conn, error) {
d.mu.Lock()
defer d.mu.Unlock()

if err := syscall.Sendmsg(d.credFD, []byte{0x01}, nil, nil, 0); err != nil {
return nil, fmt.Errorf("broker handshake send: %w", err)
}
body := make([]byte, 1)
oob := make([]byte, syscall.CmsgSpace(4)) // room for exactly one fd
n, oobn, _, _, err := syscall.Recvmsg(d.credFD, body, oob, 0)
if err != nil {
return nil, fmt.Errorf("broker handshake recv: %w", err)
}
if n < 1 || body[0] != 0x01 {
return nil, fmt.Errorf("broker refused connection (code %v)", body[:n])
}
scms, err := syscall.ParseSocketControlMessage(oob[:oobn])
if err != nil {
return nil, fmt.Errorf("broker parse scm: %w", err)
}
if len(scms) == 0 {
return nil, fmt.Errorf("broker sent no fd")
}
fds, err := syscall.ParseUnixRights(&scms[0])
if err != nil || len(fds) == 0 {
return nil, fmt.Errorf("broker parse rights: %w", err)
}
f := os.NewFile(uintptr(fds[0]), "broker-conn")
conn, err := net.FileConn(f) // dups + registers with the netpoller
_ = f.Close()
if err != nil {
return nil, fmt.Errorf("broker fileconn: %w", err)
}
return conn, nil
}

// newBrokerHTTPClient builds an *http.Client whose Transport.DialContext routes
// every connection over the inherited control fd. Timeout matches the SDK's
// historical default (30s) so behavior is unchanged for non-streaming calls;
// streaming export relies on request context like before.
func newBrokerHTTPClient(credFD int) *http.Client {
d := &brokerDialer{credFD: credFD}
return &http.Client{
Timeout: 30 * time.Second,
Transport: &http.Transport{
DialContext: d.dial,
DisableCompression: false,
MaxIdleConns: 0,
// All dials target the same logical host (the broker sentinel base
// URL) over the one control fd, so a single idle keep-alive conn is
// enough for pagination loops; cap it so dispatched conns don't linger.
MaxIdleConnsPerHost: 1,
IdleConnTimeout: 90 * time.Second,
ResponseHeaderTimeout: 0,
},
}
}
Loading
Loading