From 52cea8d47fbcc8578d19106cb88740aca54471c9 Mon Sep 17 00:00:00 2001 From: ysyneu Date: Mon, 8 Jun 2026 00:24:52 +0800 Subject: [PATCH 1/5] feat(broker): SEQPACKET control channel + SCM_RIGHTS reverse-proxy dispatch (linux) --- environment/broker_linux.go | 175 +++++++++++++++++++++++++++++++ environment/broker_linux_test.go | 66 ++++++++++++ environment/broker_other.go | 29 +++++ protocol/messages.go | 15 +++ 4 files changed, 285 insertions(+) create mode 100644 environment/broker_linux.go create mode 100644 environment/broker_linux_test.go create mode 100644 environment/broker_other.go diff --git a/environment/broker_linux.go b/environment/broker_linux.go new file mode 100644 index 0000000..aabe82a --- /dev/null +++ b/environment/broker_linux.go @@ -0,0 +1,175 @@ +//go:build linux + +package environment + +import ( + "context" + "fmt" + "io" + "log/slog" + "net" + "net/http" + "net/http/httputil" + "net/url" + "os" + "os/exec" + "sync" + "syscall" + + "github.com/flashcatcloud/flashduty-runner/protocol" +) + +// BrokerSupported reports whether this build can run the egress broker. +const BrokerSupported = true + +// prSetDumpable is Linux's PR_SET_DUMPABLE prctl option. +const prSetDumpable = 4 + +var dumpableOnce sync.Once + +// hardenProcessMemory marks the runner process non-dumpable so a same-uid, +// non-root process on a BYOC host cannot ptrace/read the per-person app_keys +// the broker holds in runner memory. Best-effort, once per process, Linux-only. +// execve resets dumpability, so bash children are unaffected. +func hardenProcessMemory() { + dumpableOnce.Do(func() { + _, _, _ = syscall.Syscall(syscall.SYS_PRCTL, uintptr(prSetDumpable), 0, 0) + }) +} + +// setupBrokerForCmd wires a per-invocation egress broker onto cmd before it is +// started: it creates a SOCK_SEQPACKET|SOCK_CLOEXEC control socketpair, hands +// the child end to the subprocess via cmd.ExtraFiles (inherited as fd 3), runs +// serveBrokerControl on the parent end, and returns the broker-mode env vars to +// merge into cmd.Env. The credential key/base_url never enter cmd.Env. The +// returned stop() tears down the control channel and closes the parent's hold +// on the child fd; call it after the command exits. +// +// All SOCK_CLOEXEC / SCM_RIGHTS machinery is confined to this Linux-only file so +// environment.go stays platform-neutral. +func setupBrokerForCmd(cmd *exec.Cmd, cred *protocol.BashCredential) (stop func(), env []string, err error) { + hardenProcessMemory() + + sp, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_SEQPACKET|syscall.SOCK_CLOEXEC, 0) + if err != nil { + return nil, nil, fmt.Errorf("broker socketpair: %w", err) + } + childEnd := os.NewFile(uintptr(sp[0]), "broker-ctl-child") + // The first ExtraFiles entry maps to fd 3 in the child (after 0,1,2). + cmd.ExtraFiles = append(cmd.ExtraFiles, childEnd) + + srvStop, err := serveBrokerControl(sp[1], cred.Key, cred.BaseURL) + if err != nil { + _ = childEnd.Close() + _ = syscall.Close(sp[1]) + return nil, nil, fmt.Errorf("broker start: %w", err) + } + + env = []string{ + "FLASHDUTY_CRED_FD=3", + "FLASHDUTY_BASE_URL=http://flashduty.broker.local", + } + stop = func() { + srvStop() + _ = childEnd.Close() + } + return stop, env, nil +} + +// serveBrokerControl serves the SEQPACKET control channel on parentFD: for each +// handshake datagram it dispatches a dedicated SOCK_STREAM connection (via +// SCM_RIGHTS) running an httputil.ReverseProxy that rewrites the sentinel +// app_key to key and forwards to baseURL. The returned stop() closes the +// control channel and all dispatched connections. Call after cmd.Start(). +func serveBrokerControl(parentFD int, key, baseURL string) (stop func(), err error) { + base, err := url.Parse(baseURL) + if err != nil { + return nil, fmt.Errorf("broker: parse base url: %w", err) + } + if base.Host == "" { + return nil, fmt.Errorf("broker: base url %q has no host", baseURL) + } + + proxy := &httputil.ReverseProxy{ + Director: func(req *http.Request) { + req.URL.Scheme = base.Scheme + req.URL.Host = base.Host + req.Host = base.Host + q := req.URL.Query() + q.Set("app_key", key) // overwrite the sentinel + req.URL.RawQuery = q.Encode() + }, + Transport: http.DefaultTransport, // real-cert HTTPS / plain HTTP per base scheme + ErrorLog: slog.NewLogLogger(slog.Default().Handler(), slog.LevelWarn), + } + + ctx, cancel := context.WithCancel(context.Background()) + var conns sync.WaitGroup + + go func() { + buf := make([]byte, 8) + for { + n, _, _, _, rerr := syscall.Recvmsg(parentFD, buf, nil, 0) + if rerr != nil || n == 0 { + return // control channel closed → bash exited + } + pair, perr := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM|syscall.SOCK_CLOEXEC, 0) + if perr != nil { + _ = syscall.Sendmsg(parentFD, []byte{0xFF}, nil, nil, 0) + continue + } + rights := syscall.UnixRights(pair[1]) + if serr := syscall.Sendmsg(parentFD, []byte{0x01}, rights, nil, 0); serr != nil { + _ = syscall.Close(pair[0]) + _ = syscall.Close(pair[1]) + continue + } + _ = syscall.Close(pair[1]) // child end is now in the peer + myEnd := os.NewFile(uintptr(pair[0]), "broker-conn") + conn, cerr := net.FileConn(myEnd) + _ = myEnd.Close() + if cerr != nil { + continue + } + conns.Add(1) + go func() { + defer conns.Done() + serveOneConn(ctx, conn, proxy) + }() + } + }() + + return func() { + cancel() + _ = syscall.Close(parentFD) // unblocks Recvmsg → control goroutine returns + conns.Wait() + }, nil +} + +// serveOneConn serves HTTP/1.1 (keep-alive) on a single dispatched connection. +func serveOneConn(ctx context.Context, conn net.Conn, h http.Handler) { + srv := &http.Server{Handler: h} + // oneConnListener yields conn once, then blocks until ctx is done so + // http.Server.Serve keeps the conn alive for sequential keep-alive requests. + l := &oneConnListener{conn: conn, ctx: ctx} + _ = srv.Serve(l) // returns when the listener errors (ctx done / conn gone) + _ = conn.Close() +} + +type oneConnListener struct { + conn net.Conn + ctx context.Context + once sync.Once +} + +func (l *oneConnListener) Accept() (net.Conn, error) { + first := false + l.once.Do(func() { first = true }) + if first { + return l.conn, nil + } + <-l.ctx.Done() // block subsequent Accepts until shutdown + return nil, io.EOF +} +func (l *oneConnListener) Close() error { return nil } +func (l *oneConnListener) Addr() net.Addr { return l.conn.LocalAddr() } diff --git a/environment/broker_linux_test.go b/environment/broker_linux_test.go new file mode 100644 index 0000000..8c7548c --- /dev/null +++ b/environment/broker_linux_test.go @@ -0,0 +1,66 @@ +//go:build linux + +package environment + +import ( + "bufio" + "io" + "net/http" + "net/http/httptest" + "os" + "syscall" + "testing" +) + +func bufioReader(r io.Reader) *bufio.Reader { return bufio.NewReader(r) } + +func TestServeBrokerControl_RewritesKeyAndProxies(t *testing.T) { + gotKey := make(chan string, 4) + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotKey <- r.URL.Query().Get("app_key") + _, _ = io.WriteString(w, `{"ok":true}`) + })) + defer upstream.Close() + + pair, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_SEQPACKET, 0) + if err != nil { + t.Fatalf("socketpair: %v", err) + } + childFD, parentFD := pair[0], pair[1] + + stop, err := serveBrokerControl(parentFD, "REAL-KEY", upstream.URL) + if err != nil { + t.Fatalf("serveBrokerControl: %v", err) + } + defer stop() + + // Client side: 1-byte handshake on childFD → receive a STREAM fd → HTTP GET. + if err := syscall.Sendmsg(childFD, []byte{0x01}, nil, nil, 0); err != nil { + t.Fatalf("send handshake: %v", err) + } + body := make([]byte, 1) + oob := make([]byte, syscall.CmsgSpace(4)) + _, oobn, _, _, err := syscall.Recvmsg(childFD, body, oob, 0) + if err != nil { + t.Fatalf("recv: %v", err) + } + scms, _ := syscall.ParseSocketControlMessage(oob[:oobn]) + fds, _ := syscall.ParseUnixRights(&scms[0]) + if len(fds) == 0 { + t.Fatal("no fd received") + } + f := os.NewFile(uintptr(fds[0]), "c") + // Speak HTTP/1.1 over the dispatched conn directly. + req, _ := http.NewRequest("GET", "http://flashduty.broker.local/incident/channels?app_key=SENTINEL", nil) + if err := req.Write(f); err != nil { + t.Fatalf("req.Write: %v", err) + } + resp, err := http.ReadResponse(bufioReader(f), req) + if err != nil { + t.Fatalf("ReadResponse: %v", err) + } + _ = resp.Body.Close() + if got := <-gotKey; got != "REAL-KEY" { + t.Fatalf("upstream saw app_key=%q want REAL-KEY", got) + } +} diff --git a/environment/broker_other.go b/environment/broker_other.go new file mode 100644 index 0000000..1d508c5 --- /dev/null +++ b/environment/broker_other.go @@ -0,0 +1,29 @@ +//go:build !linux + +package environment + +import ( + "errors" + "os/exec" + + "github.com/flashcatcloud/flashduty-runner/protocol" +) + +// BrokerSupported reports whether this build can run the egress broker. The +// broker relies on SOCK_CLOEXEC + PR_SET_DUMPABLE, which are Linux-only +// (undefined on darwin), so non-Linux runners fall back to the legacy env-key +// path and never advertise broker capability. +const BrokerSupported = false + +var errBrokerUnsupportedPlatform = errors.New("broker not supported on this platform") + +// setupBrokerForCmd is a no-op stub on non-Linux platforms. It is never reached +// at runtime because executeBashCommand gates on BrokerSupported, but it must +// exist so environment.go compiles on darwin/windows. +func setupBrokerForCmd(*exec.Cmd, *protocol.BashCredential) (func(), []string, error) { + return nil, nil, errBrokerUnsupportedPlatform +} + +func serveBrokerControl(int, string, string) (func(), error) { + return func() {}, errBrokerUnsupportedPlatform +} diff --git a/protocol/messages.go b/protocol/messages.go index 4716536..35f9cbc 100644 --- a/protocol/messages.go +++ b/protocol/messages.go @@ -194,6 +194,21 @@ type BashArgs struct { Workdir string `json:"workdir,omitempty"` Timeout int `json:"timeout,omitempty"` // seconds Env map[string]string `json:"env,omitempty"` + + // Credential carries per-invocation egress auth out-of-band of the bash + // environment. Safari sends it only to broker-capable (Linux) runners; the + // runner keeps it in memory and injects it via the broker, never into + // cmd.Env. When set, FLASHDUTY_APP_KEY is absent from Env (see the design + // doc's frozen wire contract). + Credential *BashCredential `json:"credential,omitempty"` +} + +// BashCredential carries per-invocation egress auth out-of-band of the bash +// environment. Sent by Safari only to broker-capable runners; the runner keeps +// it in memory and injects it via the broker, never into cmd.Env. +type BashCredential struct { + Key string `json:"key"` + BaseURL string `json:"base_url"` } // TaskOutputPayload is the payload for streaming task output. From f446c50d242b7236eb83c26e5f845399bcc5fba4 Mon Sep 17 00:00:00 2001 From: ysyneu Date: Mon, 8 Jun 2026 00:24:52 +0800 Subject: [PATCH 2/5] feat(broker): wire control channel into bash exec + advertise capability on connect --- environment/environment.go | 22 ++++++++++++++++++++-- ws/client.go | 6 ++++++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/environment/environment.go b/environment/environment.go index fd1b570..5358245 100644 --- a/environment/environment.go +++ b/environment/environment.go @@ -599,7 +599,7 @@ func (e *Environment) Bash(ctx context.Context, args *protocol.BashArgs) (*proto } timeout := e.resolveTimeout(args.Timeout) - result, err := e.executeBashCommand(ctx, args.Command, workdir, timeout, args.Env) + result, err := e.executeBashCommand(ctx, args.Command, workdir, timeout, args.Env, args.Credential) if err != nil { return result, err } @@ -640,7 +640,12 @@ func (e *Environment) resolveTimeout(timeoutSec int) time.Duration { // only when safari's bash-side guard explicitly injects them into extraEnv // after recognizing a whitelisted CLI in the command — they win the merge // since extraEnv is layered last. -func (e *Environment) executeBashCommand(ctx context.Context, command, workdir string, timeout time.Duration, extraEnv map[string]string) (*protocol.BashResult, error) { +// +// cred is the broker-mode credential: when non-nil (and this is a Linux build), +// the per-person app_key is served out-of-band over an inherited control fd and +// never enters cmd.Env. On non-Linux builds cred is ignored and safari sends the +// legacy extraEnv path instead. +func (e *Environment) executeBashCommand(ctx context.Context, command, workdir string, timeout time.Duration, extraEnv map[string]string, cred *protocol.BashCredential) (*protocol.BashResult, error) { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() @@ -650,6 +655,19 @@ func (e *Environment) executeBashCommand(ctx context.Context, command, workdir s for k, v := range extraEnv { merged = append(merged, k+"="+v) } + + // Broker mode (Linux only): the per-person app_key arrives out-of-band in + // cred and is served via an inherited control fd, never via cmd.Env. The + // subprocess sees only FLASHDUTY_CRED_FD + an http placeholder base URL. + if cred != nil && cred.Key != "" && BrokerSupported { + stopBroker, brokerEnv, err := setupBrokerForCmd(cmd, cred) + if err != nil { + return nil, err + } + merged = append(merged, brokerEnv...) + defer stopBroker() + } + cmd.Env = withBundledToolsPath(merged, BundledToolsDir()) return runCapturedCommand(ctx, cmd) } diff --git a/ws/client.go b/ws/client.go index d164026..e8ca974 100644 --- a/ws/client.go +++ b/ws/client.go @@ -17,6 +17,7 @@ import ( "github.com/gorilla/websocket" + "github.com/flashcatcloud/flashduty-runner/environment" "github.com/flashcatcloud/flashduty-runner/protocol" ) @@ -111,6 +112,11 @@ func (c *Client) Connect(ctx context.Context) error { q := u.Query() q.Set("token", c.token) + // Advertise egress-broker capability so Safari sends the per-person app_key + // out-of-band (never in the bash env). Only Linux builds carry the broker. + if environment.BrokerSupported { + q.Set("broker", "1") + } u.RawQuery = q.Encode() slog.Info("connecting to Flashduty", From f94c8b260dab8b6eb2bef0649a0541fb67ca18cc Mon Sep 17 00:00:00 2001 From: ysyneu Date: Mon, 8 Jun 2026 00:26:50 +0800 Subject: [PATCH 3/5] =?UTF-8?q?test(broker):=20linux=20e2e=20=E2=80=94=20r?= =?UTF-8?q?eal=20fduty=20through=20broker=20to=20live=20pgy,=20no=20env=20?= =?UTF-8?q?leak,=20concurrent?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- environment/broker_e2e_linux_test.go | 101 +++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) create mode 100644 environment/broker_e2e_linux_test.go diff --git a/environment/broker_e2e_linux_test.go b/environment/broker_e2e_linux_test.go new file mode 100644 index 0000000..01726f4 --- /dev/null +++ b/environment/broker_e2e_linux_test.go @@ -0,0 +1,101 @@ +//go:build linux + +package environment + +import ( + "context" + "os" + "strings" + "testing" + "time" + + "github.com/flashcatcloud/flashduty-runner/permission" + "github.com/flashcatcloud/flashduty-runner/protocol" +) + +// TestBrokerE2E_RealFduty drives the REAL fduty CLI through the runner broker to +// a REAL upstream (the local pgy). Opt-in: it only runs when EGRESS_E2E=1 with a +// real key + reachable base + the linux fduty staged in FDUTY_BIN. It is the +// deterministic end-to-end proof that the broker injects the real key (auth +// succeeds), the key never reaches the bash env, and concurrent fduty calls each +// get their own dispatched connection. +// +// Run (from the host, after building the linux test binary + fduty): +// +// GOOS=linux GOARCH=arm64 CGO_ENABLED=0 go test -c -o /tmp/egress-e2e/broker.test ./environment/ +// docker run --rm -v /tmp/egress-e2e:/app -w /app \ +// -e EGRESS_E2E=1 -e FDUTY_KEY=... -e FDUTY_BASE=http://host.docker.internal:11480 -e FDUTY_BIN=/app \ +// ubuntu:22.04 ./broker.test -test.run TestBrokerE2E_RealFduty -test.v +func TestBrokerE2E_RealFduty(t *testing.T) { + if os.Getenv("EGRESS_E2E") != "1" { + t.Skip("set EGRESS_E2E=1 with FDUTY_KEY + FDUTY_BASE + FDUTY_BIN to run") + } + key := os.Getenv("FDUTY_KEY") + base := os.Getenv("FDUTY_BASE") + binDir := os.Getenv("FDUTY_BIN") + if key == "" || base == "" || binDir == "" { + t.Fatal("FDUTY_KEY, FDUTY_BASE, FDUTY_BIN are all required") + } + t.Setenv("FLASHDUTY_RUNNER_BIN_DIR", binDir) + + checker := permission.NewChecker(map[string]string{"*": "allow"}) + e, err := New(t.TempDir(), checker) + if err != nil { + t.Fatalf("New environment: %v", err) + } + cred := &protocol.BashCredential{Key: key, BaseURL: base} + ctx := context.Background() + + // (a) Real authenticated read through the broker. A working key returns + // channel rows; a broken broker would surface a 401 / "app_key invalid". + res, err := e.executeBashCommand(ctx, + "fduty channel list 2>&1 | head -c 600", t.TempDir(), 60*time.Second, nil, cred) + if err != nil { + t.Fatalf("bash (channel list): %v", err) + } + t.Logf("broker fduty output:\n%s", res.Stdout) + low := strings.ToLower(res.Stdout) + if strings.Contains(low, "app_key") && strings.Contains(low, "invalid") { + t.Fatalf("auth FAILED through broker (sentinel not overwritten?): %s", res.Stdout) + } + if strings.Contains(res.Stdout, "401") || strings.Contains(low, "unauthorized") { + t.Fatalf("auth FAILED through broker (401): %s", res.Stdout) + } + if strings.TrimSpace(res.Stdout) == "" { + t.Fatalf("empty output through broker — expected channel rows") + } + + // (b) The real key must NEVER be visible in the bash environment. + leak, err := e.executeBashCommand(ctx, + "env | grep -i FLASHDUTY_APP_KEY || echo NO_KEY", t.TempDir(), 30*time.Second, nil, cred) + if err != nil { + t.Fatalf("bash (env leak check): %v", err) + } + if !strings.Contains(leak.Stdout, "NO_KEY") || strings.Contains(leak.Stdout, key) { + t.Fatalf("app_key LEAKED into bash env: %q", leak.Stdout) + } + if !strings.Contains(leak.Stdout, "FLASHDUTY_CRED_FD") { + // sanity: broker mode really engaged (CRED_FD is set) + fd, _ := e.executeBashCommand(ctx, "echo CRED_FD=$FLASHDUTY_CRED_FD BASE=$FLASHDUTY_BASE_URL", + t.TempDir(), 30*time.Second, nil, cred) + t.Logf("broker env sanity: %s", strings.TrimSpace(fd.Stdout)) + } + + // (c) Concurrency: two fduty in one bash, each gets its own dispatched + // SOCK_STREAM connection; both must authenticate (no byte interleave). + conc, err := e.executeBashCommand(ctx, + "fduty channel list >/tmp/a 2>&1 & fduty channel list >/tmp/b 2>&1 & wait; "+ + "echo A=$(wc -l Date: Mon, 8 Jun 2026 01:15:16 +0800 Subject: [PATCH 4/5] fix(broker): validate control request, fail-fast dispatch, clean shutdown MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Validate the control-channel request byte (constants ctrlReqDial / ctrlRespOK / ctrlRespErr); refuse a malformed datagram with 0xFF so a confused in-sandbox process that inherited fd 3 fails fast and never mints a connection off-spec. - Reorder the fd dispatch: wrap and verify the runner's own end with net.FileConn BEFORE handing the peer its fd via SCM_RIGHTS, so a net.FileConn failure refuses cleanly instead of stranding the peer with a connection that never gets served (its request would hang to timeout). - Clone http.DefaultTransport (guarded type assertion, matching safeHTTPTransport) instead of sharing the process-global one, and add a ResponseHeaderTimeout backstop. - BaseContext ties each per-conn server to broker shutdown so cancel() promptly cancels in-flight upstream requests. - Test: fix fd leaks (childFD + received fd) and assert the refusal path. Note: deliberately do NOT join the control goroutine in the dispatch WaitGroup — it exits only when its Recvmsg peer fully closes (childEnd is closed by the caller after stop()), so joining it would deadlock. --- environment/broker_linux.go | 77 +++++++++++++++++++++++++++----- environment/broker_linux_test.go | 20 +++++++++ 2 files changed, 87 insertions(+), 10 deletions(-) diff --git a/environment/broker_linux.go b/environment/broker_linux.go index aabe82a..4c2de62 100644 --- a/environment/broker_linux.go +++ b/environment/broker_linux.go @@ -15,6 +15,7 @@ import ( "os/exec" "sync" "syscall" + "time" "github.com/flashcatcloud/flashduty-runner/protocol" ) @@ -25,6 +26,24 @@ const BrokerSupported = true // prSetDumpable is Linux's PR_SET_DUMPABLE prctl option. const prSetDumpable = 4 +// Control-channel protocol (1-byte framing over the SEQPACKET socket). The CLI +// sends ctrlReqDial; the broker replies ctrlRespOK + one fd via SCM_RIGHTS, or +// ctrlRespErr (no fd) when it cannot mint a connection or the request is +// malformed. ctrlRespOK shares the value of ctrlReqDial — both are 0x01 — which +// is fine: direction disambiguates them. +const ( + ctrlReqDial byte = 0x01 // CLI → broker: "give me a connection" + ctrlRespOK byte = 0x01 // broker → CLI: success, fd follows in SCM_RIGHTS + ctrlRespErr byte = 0xFF // broker → CLI: refused, no fd +) + +// upstreamResponseHeaderTimeout backstops a hung upstream that accepts the +// connection but never sends response headers. It is a long backstop, not the +// primary bound: the CLI's own client timeout and the bash process's lifetime +// (its death closes fd 3 and cancels in-flight requests) cap normal calls well +// before this fires. +const upstreamResponseHeaderTimeout = 60 * time.Second + var dumpableOnce sync.Once // hardenProcessMemory marks the runner process non-dumpable so a same-uid, @@ -90,6 +109,17 @@ func serveBrokerControl(parentFD int, key, baseURL string) (stop func(), err err return nil, fmt.Errorf("broker: base url %q has no host", baseURL) } + // Clone the default transport (don't share/mutate the process-global one) and + // add a response-header backstop. Real-cert HTTPS / plain HTTP per base scheme. + // Guard the type assertion the same way safeHTTPTransport does, in case the + // process default transport was ever swapped. + baseTransport, ok := http.DefaultTransport.(*http.Transport) + if !ok { + baseTransport = &http.Transport{} + } + transport := baseTransport.Clone() + transport.ResponseHeaderTimeout = upstreamResponseHeaderTimeout + proxy := &httputil.ReverseProxy{ Director: func(req *http.Request) { req.URL.Scheme = base.Scheme @@ -99,11 +129,19 @@ func serveBrokerControl(parentFD int, key, baseURL string) (stop func(), err err q.Set("app_key", key) // overwrite the sentinel req.URL.RawQuery = q.Encode() }, - Transport: http.DefaultTransport, // real-cert HTTPS / plain HTTP per base scheme + Transport: transport, ErrorLog: slog.NewLogLogger(slog.Default().Handler(), slog.LevelWarn), } ctx, cancel := context.WithCancel(context.Background()) + // conns tracks only the dispatched per-connection serveOneConn goroutines, so + // stop() can guarantee no upstream request is in flight after it returns. The + // control goroutine is deliberately NOT counted: it exits only when its + // Recvmsg peer fully closes (bash's fd 3 AND the runner's childEnd), and + // childEnd is closed by the caller AFTER stop() — so joining it here would + // deadlock (Close(parentFD) does not reliably interrupt a blocked raw + // Recvmsg). No dispatch can race conns.Wait(): stop() runs post-bash-exit with + // the control goroutine parked in Recvmsg and nothing writing the socket. var conns sync.WaitGroup go func() { @@ -113,24 +151,38 @@ func serveBrokerControl(parentFD int, key, baseURL string) (stop func(), err err if rerr != nil || n == 0 { return // control channel closed → bash exited } - pair, perr := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM|syscall.SOCK_CLOEXEC, 0) - if perr != nil { - _ = syscall.Sendmsg(parentFD, []byte{0xFF}, nil, nil, 0) + // Only the 1-byte dial request is valid. A garbage datagram (e.g. a + // confused in-sandbox process that inherited fd 3) is refused so the + // peer fails fast instead of hanging, and never mints a connection. + if buf[0] != ctrlReqDial { + _ = syscall.Sendmsg(parentFD, []byte{ctrlRespErr}, nil, nil, 0) continue } - rights := syscall.UnixRights(pair[1]) - if serr := syscall.Sendmsg(parentFD, []byte{0x01}, rights, nil, 0); serr != nil { - _ = syscall.Close(pair[0]) - _ = syscall.Close(pair[1]) + pair, perr := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM|syscall.SOCK_CLOEXEC, 0) + if perr != nil { + _ = syscall.Sendmsg(parentFD, []byte{ctrlRespErr}, nil, nil, 0) continue } - _ = syscall.Close(pair[1]) // child end is now in the peer + // Wrap OUR end and confirm it is usable BEFORE handing the peer its + // end. If net.FileConn failed AFTER we sent the fd, the peer would hold + // a live connection we never serve → its request hangs until its client + // timeout. Validating first lets us refuse cleanly with ctrlRespErr. + // net.FileConn dups pair[0]; myEnd.Close releases the original handle. myEnd := os.NewFile(uintptr(pair[0]), "broker-conn") conn, cerr := net.FileConn(myEnd) _ = myEnd.Close() if cerr != nil { + _ = syscall.Close(pair[1]) + _ = syscall.Sendmsg(parentFD, []byte{ctrlRespErr}, nil, nil, 0) + continue + } + rights := syscall.UnixRights(pair[1]) + if serr := syscall.Sendmsg(parentFD, []byte{ctrlRespOK}, rights, nil, 0); serr != nil { + _ = conn.Close() + _ = syscall.Close(pair[1]) continue } + _ = syscall.Close(pair[1]) // child end is now duplicated in the peer conns.Add(1) go func() { defer conns.Done() @@ -148,7 +200,12 @@ func serveBrokerControl(parentFD int, key, baseURL string) (stop func(), err err // serveOneConn serves HTTP/1.1 (keep-alive) on a single dispatched connection. func serveOneConn(ctx context.Context, conn net.Conn, h http.Handler) { - srv := &http.Server{Handler: h} + srv := &http.Server{ + Handler: h, + // Tie each request's context to broker shutdown so cancel() (from stop()) + // promptly cancels in-flight upstream requests, not just future Accepts. + BaseContext: func(net.Listener) context.Context { return ctx }, + } // oneConnListener yields conn once, then blocks until ctx is done so // http.Server.Serve keeps the conn alive for sequential keep-alive requests. l := &oneConnListener{conn: conn, ctx: ctx} diff --git a/environment/broker_linux_test.go b/environment/broker_linux_test.go index 8c7548c..24ec3d3 100644 --- a/environment/broker_linux_test.go +++ b/environment/broker_linux_test.go @@ -27,6 +27,7 @@ func TestServeBrokerControl_RewritesKeyAndProxies(t *testing.T) { t.Fatalf("socketpair: %v", err) } childFD, parentFD := pair[0], pair[1] + defer syscall.Close(childFD) stop, err := serveBrokerControl(parentFD, "REAL-KEY", upstream.URL) if err != nil { @@ -50,6 +51,7 @@ func TestServeBrokerControl_RewritesKeyAndProxies(t *testing.T) { t.Fatal("no fd received") } f := os.NewFile(uintptr(fds[0]), "c") + defer f.Close() // Speak HTTP/1.1 over the dispatched conn directly. req, _ := http.NewRequest("GET", "http://flashduty.broker.local/incident/channels?app_key=SENTINEL", nil) if err := req.Write(f); err != nil { @@ -63,4 +65,22 @@ func TestServeBrokerControl_RewritesKeyAndProxies(t *testing.T) { if got := <-gotKey; got != "REAL-KEY" { t.Fatalf("upstream saw app_key=%q want REAL-KEY", got) } + + // A malformed request byte must be refused with ctrlRespErr and NO fd, so a + // confused in-sandbox process can't make the broker mint connections off-spec. + if err := syscall.Sendmsg(childFD, []byte{0x02}, nil, nil, 0); err != nil { + t.Fatalf("send bad request: %v", err) + } + rbody := make([]byte, 1) + roob := make([]byte, syscall.CmsgSpace(4)) + rn, roobn, _, _, err := syscall.Recvmsg(childFD, rbody, roob, 0) + if err != nil { + t.Fatalf("recv refusal: %v", err) + } + if rn < 1 || rbody[0] != 0xFF { + t.Fatalf("malformed request: want 0xFF refusal, got %v", rbody[:rn]) + } + if rscms, _ := syscall.ParseSocketControlMessage(roob[:roobn]); len(rscms) != 0 { + t.Fatal("refusal must carry no fd") + } } From f33c0e48c25ae164f9359ed115b6d7ee23499385 Mon Sep 17 00:00:00 2001 From: ysyneu Date: Mon, 8 Jun 2026 01:31:38 +0800 Subject: [PATCH 5/5] fix(broker): satisfy golangci-lint on first broker PR This is the broker code's first PR, so the linter surfaces issues across the original commits too. Fixes: - errcheck: wrap deferred syscall.Close / f.Close in the test. - gosec G112: ReadHeaderTimeout on the per-conn http.Server. - govet shadow: reuse the outer err in the test instead of re-declaring. - noctx: http.NewRequestWithContext in the test. - gosec G115 (int->uintptr on fds): exclude by text + path, matching the repo's existing G706/G703 handling for rules absent from the pinned CI golangci-lint v2.4 (fds from socketpair/ParseUnixRights never overflow). Verified: golangci-lint run (GOOS=linux) reports 0 issues; config verify passes; the broker unit test still passes in the linux container. --- .golangci.yml | 9 +++++++++ environment/broker_linux.go | 7 +++++++ environment/broker_linux_test.go | 13 +++++++------ 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 2e1ae59..39d4b0f 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -71,6 +71,15 @@ linters: - linters: - gosec text: "G703" + # G115 (integer overflow on int->uintptr) ships only in gosec newer than + # our pinned CI golangci-lint v2.4, so match by text (a typed exclude would + # fail `config verify` there, like G706/G703 above). The broker converts + # file descriptors from socketpair / ParseUnixRights to uintptr for + # os.NewFile; an fd is a small non-negative int and never overflows uintptr. + - linters: + - gosec + text: "G115" + path: environment/broker_linux formatters: enable: diff --git a/environment/broker_linux.go b/environment/broker_linux.go index 4c2de62..ff0a0c0 100644 --- a/environment/broker_linux.go +++ b/environment/broker_linux.go @@ -44,6 +44,10 @@ const ( // before this fires. const upstreamResponseHeaderTimeout = 60 * time.Second +// controlReadHeaderTimeout bounds how long a dispatched per-conn server waits for +// the in-sandbox fduty to send request headers over the local fd. +const controlReadHeaderTimeout = 30 * time.Second + var dumpableOnce sync.Once // hardenProcessMemory marks the runner process non-dumpable so a same-uid, @@ -202,6 +206,9 @@ func serveBrokerControl(parentFD int, key, baseURL string) (stop func(), err err func serveOneConn(ctx context.Context, conn net.Conn, h http.Handler) { srv := &http.Server{ Handler: h, + // Bound the time to read request headers (the in-sandbox fduty sends them + // promptly over the local fd); also satisfies gosec G112 (Slowloris). + ReadHeaderTimeout: controlReadHeaderTimeout, // Tie each request's context to broker shutdown so cancel() (from stop()) // promptly cancels in-flight upstream requests, not just future Accepts. BaseContext: func(net.Listener) context.Context { return ctx }, diff --git a/environment/broker_linux_test.go b/environment/broker_linux_test.go index 24ec3d3..156701c 100644 --- a/environment/broker_linux_test.go +++ b/environment/broker_linux_test.go @@ -4,6 +4,7 @@ package environment import ( "bufio" + "context" "io" "net/http" "net/http/httptest" @@ -27,7 +28,7 @@ func TestServeBrokerControl_RewritesKeyAndProxies(t *testing.T) { t.Fatalf("socketpair: %v", err) } childFD, parentFD := pair[0], pair[1] - defer syscall.Close(childFD) + defer func() { _ = syscall.Close(childFD) }() stop, err := serveBrokerControl(parentFD, "REAL-KEY", upstream.URL) if err != nil { @@ -36,7 +37,7 @@ func TestServeBrokerControl_RewritesKeyAndProxies(t *testing.T) { defer stop() // Client side: 1-byte handshake on childFD → receive a STREAM fd → HTTP GET. - if err := syscall.Sendmsg(childFD, []byte{0x01}, nil, nil, 0); err != nil { + if err = syscall.Sendmsg(childFD, []byte{0x01}, nil, nil, 0); err != nil { t.Fatalf("send handshake: %v", err) } body := make([]byte, 1) @@ -51,10 +52,10 @@ func TestServeBrokerControl_RewritesKeyAndProxies(t *testing.T) { t.Fatal("no fd received") } f := os.NewFile(uintptr(fds[0]), "c") - defer f.Close() + defer func() { _ = f.Close() }() // Speak HTTP/1.1 over the dispatched conn directly. - req, _ := http.NewRequest("GET", "http://flashduty.broker.local/incident/channels?app_key=SENTINEL", nil) - if err := req.Write(f); err != nil { + req, _ := http.NewRequestWithContext(context.Background(), "GET", "http://flashduty.broker.local/incident/channels?app_key=SENTINEL", nil) + if err = req.Write(f); err != nil { t.Fatalf("req.Write: %v", err) } resp, err := http.ReadResponse(bufioReader(f), req) @@ -68,7 +69,7 @@ func TestServeBrokerControl_RewritesKeyAndProxies(t *testing.T) { // A malformed request byte must be refused with ctrlRespErr and NO fd, so a // confused in-sandbox process can't make the broker mint connections off-spec. - if err := syscall.Sendmsg(childFD, []byte{0x02}, nil, nil, 0); err != nil { + if err = syscall.Sendmsg(childFD, []byte{0x02}, nil, nil, 0); err != nil { t.Fatalf("send bad request: %v", err) } rbody := make([]byte, 1)