Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ linters:
- linters:
- gosec
text: "G703"
# G115 (integer overflow on int->uintptr) ships only in gosec newer than
# our pinned CI golangci-lint v2.4, so match by text (a typed exclude would
# fail `config verify` there, like G706/G703 above). The broker converts
# file descriptors from socketpair / ParseUnixRights to uintptr for
# os.NewFile; an fd is a small non-negative int and never overflows uintptr.
- linters:
- gosec
text: "G115"
path: environment/broker_linux

formatters:
enable:
Expand Down
101 changes: 101 additions & 0 deletions environment/broker_e2e_linux_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
//go:build linux

package environment

import (
"context"
"os"
"strings"
"testing"
"time"

"github.com/flashcatcloud/flashduty-runner/permission"
"github.com/flashcatcloud/flashduty-runner/protocol"
)

// TestBrokerE2E_RealFduty drives the REAL fduty CLI through the runner broker to
// a REAL upstream (the local pgy). Opt-in: it only runs when EGRESS_E2E=1 with a
// real key + reachable base + the linux fduty staged in FDUTY_BIN. It is the
// deterministic end-to-end proof that the broker injects the real key (auth
// succeeds), the key never reaches the bash env, and concurrent fduty calls each
// get their own dispatched connection.
//
// Run (from the host, after building the linux test binary + fduty):
//
// GOOS=linux GOARCH=arm64 CGO_ENABLED=0 go test -c -o /tmp/egress-e2e/broker.test ./environment/
// docker run --rm -v /tmp/egress-e2e:/app -w /app \
// -e EGRESS_E2E=1 -e FDUTY_KEY=... -e FDUTY_BASE=http://host.docker.internal:11480 -e FDUTY_BIN=/app \
// ubuntu:22.04 ./broker.test -test.run TestBrokerE2E_RealFduty -test.v
func TestBrokerE2E_RealFduty(t *testing.T) {
if os.Getenv("EGRESS_E2E") != "1" {
t.Skip("set EGRESS_E2E=1 with FDUTY_KEY + FDUTY_BASE + FDUTY_BIN to run")
}
key := os.Getenv("FDUTY_KEY")
base := os.Getenv("FDUTY_BASE")
binDir := os.Getenv("FDUTY_BIN")
if key == "" || base == "" || binDir == "" {
t.Fatal("FDUTY_KEY, FDUTY_BASE, FDUTY_BIN are all required")
}
t.Setenv("FLASHDUTY_RUNNER_BIN_DIR", binDir)

checker := permission.NewChecker(map[string]string{"*": "allow"})
e, err := New(t.TempDir(), checker)
if err != nil {
t.Fatalf("New environment: %v", err)
}
cred := &protocol.BashCredential{Key: key, BaseURL: base}
ctx := context.Background()

// (a) Real authenticated read through the broker. A working key returns
// channel rows; a broken broker would surface a 401 / "app_key invalid".
res, err := e.executeBashCommand(ctx,
"fduty channel list 2>&1 | head -c 600", t.TempDir(), 60*time.Second, nil, cred)
if err != nil {
t.Fatalf("bash (channel list): %v", err)
}
t.Logf("broker fduty output:\n%s", res.Stdout)
low := strings.ToLower(res.Stdout)
if strings.Contains(low, "app_key") && strings.Contains(low, "invalid") {
t.Fatalf("auth FAILED through broker (sentinel not overwritten?): %s", res.Stdout)
}
if strings.Contains(res.Stdout, "401") || strings.Contains(low, "unauthorized") {
t.Fatalf("auth FAILED through broker (401): %s", res.Stdout)
}
if strings.TrimSpace(res.Stdout) == "" {
t.Fatalf("empty output through broker — expected channel rows")
}

// (b) The real key must NEVER be visible in the bash environment.
leak, err := e.executeBashCommand(ctx,
"env | grep -i FLASHDUTY_APP_KEY || echo NO_KEY", t.TempDir(), 30*time.Second, nil, cred)
if err != nil {
t.Fatalf("bash (env leak check): %v", err)
}
if !strings.Contains(leak.Stdout, "NO_KEY") || strings.Contains(leak.Stdout, key) {
t.Fatalf("app_key LEAKED into bash env: %q", leak.Stdout)
}
if !strings.Contains(leak.Stdout, "FLASHDUTY_CRED_FD") {
// sanity: broker mode really engaged (CRED_FD is set)
fd, _ := e.executeBashCommand(ctx, "echo CRED_FD=$FLASHDUTY_CRED_FD BASE=$FLASHDUTY_BASE_URL",
t.TempDir(), 30*time.Second, nil, cred)
t.Logf("broker env sanity: %s", strings.TrimSpace(fd.Stdout))
}

// (c) Concurrency: two fduty in one bash, each gets its own dispatched
// SOCK_STREAM connection; both must authenticate (no byte interleave).
conc, err := e.executeBashCommand(ctx,
"fduty channel list >/tmp/a 2>&1 & fduty channel list >/tmp/b 2>&1 & wait; "+
"echo A=$(wc -l </tmp/a) B=$(wc -l </tmp/b); "+
"if grep -qi 'app_key.*invalid\\|401\\|unauthorized' /tmp/a /tmp/b; then echo AUTH_FAIL; else echo AUTH_OK; fi",
t.TempDir(), 90*time.Second, nil, cred)
if err != nil {
t.Fatalf("bash (concurrent): %v", err)
}
t.Logf("concurrent result: %s", strings.TrimSpace(conc.Stdout))
if !strings.Contains(conc.Stdout, "AUTH_OK") {
t.Fatalf("concurrent fduty auth failed: %s", conc.Stdout)
}
if strings.Contains(conc.Stdout, "A=0") || strings.Contains(conc.Stdout, "B=0") {
t.Fatalf("a concurrent fduty produced no output: %s", conc.Stdout)
}
}
239 changes: 239 additions & 0 deletions environment/broker_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
//go:build linux

package environment

import (
"context"
"fmt"
"io"
"log/slog"
"net"
"net/http"
"net/http/httputil"
"net/url"
"os"
"os/exec"
"sync"
"syscall"
"time"

"github.com/flashcatcloud/flashduty-runner/protocol"
)

// BrokerSupported reports whether this build can run the egress broker.
const BrokerSupported = true

// prSetDumpable is Linux's PR_SET_DUMPABLE prctl option.
const prSetDumpable = 4

// Control-channel protocol (1-byte framing over the SEQPACKET socket). The CLI
// sends ctrlReqDial; the broker replies ctrlRespOK + one fd via SCM_RIGHTS, or
// ctrlRespErr (no fd) when it cannot mint a connection or the request is
// malformed. ctrlRespOK shares the value of ctrlReqDial — both are 0x01 — which
// is fine: direction disambiguates them.
const (
ctrlReqDial byte = 0x01 // CLI → broker: "give me a connection"
ctrlRespOK byte = 0x01 // broker → CLI: success, fd follows in SCM_RIGHTS
ctrlRespErr byte = 0xFF // broker → CLI: refused, no fd
)

// upstreamResponseHeaderTimeout backstops a hung upstream that accepts the
// connection but never sends response headers. It is a long backstop, not the
// primary bound: the CLI's own client timeout and the bash process's lifetime
// (its death closes fd 3 and cancels in-flight requests) cap normal calls well
// before this fires.
const upstreamResponseHeaderTimeout = 60 * time.Second

// controlReadHeaderTimeout bounds how long a dispatched per-conn server waits for
// the in-sandbox fduty to send request headers over the local fd.
const controlReadHeaderTimeout = 30 * time.Second

var dumpableOnce sync.Once

// hardenProcessMemory marks the runner process non-dumpable so a same-uid,
// non-root process on a BYOC host cannot ptrace/read the per-person app_keys
// the broker holds in runner memory. Best-effort, once per process, Linux-only.
// execve resets dumpability, so bash children are unaffected.
func hardenProcessMemory() {
dumpableOnce.Do(func() {
_, _, _ = syscall.Syscall(syscall.SYS_PRCTL, uintptr(prSetDumpable), 0, 0)
})
}

// setupBrokerForCmd wires a per-invocation egress broker onto cmd before it is
// started: it creates a SOCK_SEQPACKET|SOCK_CLOEXEC control socketpair, hands
// the child end to the subprocess via cmd.ExtraFiles (inherited as fd 3), runs
// serveBrokerControl on the parent end, and returns the broker-mode env vars to
// merge into cmd.Env. The credential key/base_url never enter cmd.Env. The
// returned stop() tears down the control channel and closes the parent's hold
// on the child fd; call it after the command exits.
//
// All SOCK_CLOEXEC / SCM_RIGHTS machinery is confined to this Linux-only file so
// environment.go stays platform-neutral.
func setupBrokerForCmd(cmd *exec.Cmd, cred *protocol.BashCredential) (stop func(), env []string, err error) {
hardenProcessMemory()

sp, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_SEQPACKET|syscall.SOCK_CLOEXEC, 0)
if err != nil {
return nil, nil, fmt.Errorf("broker socketpair: %w", err)
}
childEnd := os.NewFile(uintptr(sp[0]), "broker-ctl-child")
// The first ExtraFiles entry maps to fd 3 in the child (after 0,1,2).
cmd.ExtraFiles = append(cmd.ExtraFiles, childEnd)

srvStop, err := serveBrokerControl(sp[1], cred.Key, cred.BaseURL)
if err != nil {
_ = childEnd.Close()
_ = syscall.Close(sp[1])
return nil, nil, fmt.Errorf("broker start: %w", err)
}

env = []string{
"FLASHDUTY_CRED_FD=3",
"FLASHDUTY_BASE_URL=http://flashduty.broker.local",
}
stop = func() {
srvStop()
_ = childEnd.Close()
}
return stop, env, nil
}

// serveBrokerControl serves the SEQPACKET control channel on parentFD: for each
// handshake datagram it dispatches a dedicated SOCK_STREAM connection (via
// SCM_RIGHTS) running an httputil.ReverseProxy that rewrites the sentinel
// app_key to key and forwards to baseURL. The returned stop() closes the
// control channel and all dispatched connections. Call after cmd.Start().
func serveBrokerControl(parentFD int, key, baseURL string) (stop func(), err error) {
base, err := url.Parse(baseURL)
if err != nil {
return nil, fmt.Errorf("broker: parse base url: %w", err)
}
if base.Host == "" {
return nil, fmt.Errorf("broker: base url %q has no host", baseURL)
}

// Clone the default transport (don't share/mutate the process-global one) and
// add a response-header backstop. Real-cert HTTPS / plain HTTP per base scheme.
// Guard the type assertion the same way safeHTTPTransport does, in case the
// process default transport was ever swapped.
baseTransport, ok := http.DefaultTransport.(*http.Transport)
if !ok {
baseTransport = &http.Transport{}
}
transport := baseTransport.Clone()
transport.ResponseHeaderTimeout = upstreamResponseHeaderTimeout

proxy := &httputil.ReverseProxy{
Director: func(req *http.Request) {
req.URL.Scheme = base.Scheme
req.URL.Host = base.Host
req.Host = base.Host
q := req.URL.Query()
q.Set("app_key", key) // overwrite the sentinel
req.URL.RawQuery = q.Encode()
},
Transport: transport,
ErrorLog: slog.NewLogLogger(slog.Default().Handler(), slog.LevelWarn),
}

ctx, cancel := context.WithCancel(context.Background())
// conns tracks only the dispatched per-connection serveOneConn goroutines, so
// stop() can guarantee no upstream request is in flight after it returns. The
// control goroutine is deliberately NOT counted: it exits only when its
// Recvmsg peer fully closes (bash's fd 3 AND the runner's childEnd), and
// childEnd is closed by the caller AFTER stop() — so joining it here would
// deadlock (Close(parentFD) does not reliably interrupt a blocked raw
// Recvmsg). No dispatch can race conns.Wait(): stop() runs post-bash-exit with
// the control goroutine parked in Recvmsg and nothing writing the socket.
var conns sync.WaitGroup

go func() {
buf := make([]byte, 8)
for {
n, _, _, _, rerr := syscall.Recvmsg(parentFD, buf, nil, 0)
if rerr != nil || n == 0 {
return // control channel closed → bash exited
}
// Only the 1-byte dial request is valid. A garbage datagram (e.g. a
// confused in-sandbox process that inherited fd 3) is refused so the
// peer fails fast instead of hanging, and never mints a connection.
if buf[0] != ctrlReqDial {
_ = syscall.Sendmsg(parentFD, []byte{ctrlRespErr}, nil, nil, 0)
continue
}
pair, perr := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM|syscall.SOCK_CLOEXEC, 0)
if perr != nil {
_ = syscall.Sendmsg(parentFD, []byte{ctrlRespErr}, nil, nil, 0)
continue
}
// Wrap OUR end and confirm it is usable BEFORE handing the peer its
// end. If net.FileConn failed AFTER we sent the fd, the peer would hold
// a live connection we never serve → its request hangs until its client
// timeout. Validating first lets us refuse cleanly with ctrlRespErr.
// net.FileConn dups pair[0]; myEnd.Close releases the original handle.
myEnd := os.NewFile(uintptr(pair[0]), "broker-conn")
conn, cerr := net.FileConn(myEnd)
_ = myEnd.Close()
if cerr != nil {
_ = syscall.Close(pair[1])
_ = syscall.Sendmsg(parentFD, []byte{ctrlRespErr}, nil, nil, 0)
continue
}
rights := syscall.UnixRights(pair[1])
if serr := syscall.Sendmsg(parentFD, []byte{ctrlRespOK}, rights, nil, 0); serr != nil {
_ = conn.Close()
_ = syscall.Close(pair[1])
continue
}
_ = syscall.Close(pair[1]) // child end is now duplicated in the peer
conns.Add(1)
go func() {
defer conns.Done()
serveOneConn(ctx, conn, proxy)
}()
}
}()

return func() {
cancel()
_ = syscall.Close(parentFD) // unblocks Recvmsg → control goroutine returns
conns.Wait()
}, nil
}

// serveOneConn serves HTTP/1.1 (keep-alive) on a single dispatched connection.
func serveOneConn(ctx context.Context, conn net.Conn, h http.Handler) {
srv := &http.Server{
Handler: h,
// Bound the time to read request headers (the in-sandbox fduty sends them
// promptly over the local fd); also satisfies gosec G112 (Slowloris).
ReadHeaderTimeout: controlReadHeaderTimeout,
// Tie each request's context to broker shutdown so cancel() (from stop())
// promptly cancels in-flight upstream requests, not just future Accepts.
BaseContext: func(net.Listener) context.Context { return ctx },
}
// oneConnListener yields conn once, then blocks until ctx is done so
// http.Server.Serve keeps the conn alive for sequential keep-alive requests.
l := &oneConnListener{conn: conn, ctx: ctx}
_ = srv.Serve(l) // returns when the listener errors (ctx done / conn gone)
_ = conn.Close()
}

type oneConnListener struct {
conn net.Conn
ctx context.Context
once sync.Once
}

func (l *oneConnListener) Accept() (net.Conn, error) {
first := false
l.once.Do(func() { first = true })
if first {
return l.conn, nil
}
<-l.ctx.Done() // block subsequent Accepts until shutdown
return nil, io.EOF
}
func (l *oneConnListener) Close() error { return nil }
func (l *oneConnListener) Addr() net.Addr { return l.conn.LocalAddr() }
Loading
Loading