diff --git a/architecture/README.md b/architecture/README.md
index 36b0a4978..8f7ba22ce 100644
--- a/architecture/README.md
+++ b/architecture/README.md
@@ -38,7 +38,7 @@ flowchart TB
end
CLI -- "gRPC / HTTPS" --> SERVER
- CLI -- "SSH over HTTP CONNECT" --> SERVER
+ CLI -- "SSH over gRPC ForwardTcp" --> SERVER
SERVER -- "CRUD + Watch" --> DB
SERVER -- "Create / Delete Pods" --> SBX
SUPERVISOR -- "Fetch Policy + Credentials + Inference Bundle" --> SERVER
@@ -129,17 +129,17 @@ The first command installs the CLI. The second command bootstraps the cluster on
For more detail, see [Cluster Bootstrap Architecture](cluster-single-node.md).
-### Sandbox Connect (SSH Tunneling)
+### Sandbox Connect (SSH Forwarding)
Users can open interactive terminal sessions into running sandboxes. SSH traffic is tunneled through the gateway rather than exposing sandbox pods directly on the network.
The connection flow works as follows:
1. The CLI requests a session token from the gateway.
-2. The CLI opens an HTTP CONNECT tunnel to the gateway's SSH tunnel endpoint, passing the token and sandbox identifier.
-3. The gateway validates the token, confirms the sandbox is running, resolves the pod's network address, and establishes a TCP connection to the sandbox's embedded SSH server.
-4. A cryptographic handshake (HMAC-verified) confirms the gateway's identity to the sandbox.
-5. The CLI and sandbox exchange SSH traffic bidirectionally through the tunnel.
+2. The CLI opens a bidirectional gRPC `ForwardTcp` stream with `target.ssh`, passing the token and sandbox identifier.
+3. The gateway validates the token, confirms the sandbox is ready, and asks the already-connected supervisor to open an SSH-targeted relay.
+4. The supervisor connects the relay to the sandbox's embedded SSH server over the local Unix socket.
+5. The CLI and sandbox exchange SSH traffic bidirectionally through the gRPC stream and supervisor relay.
This design provides several benefits:
diff --git a/architecture/gateway-security.md b/architecture/gateway-security.md
index a32c3fb52..9eb29896d 100644
--- a/architecture/gateway-security.md
+++ b/architecture/gateway-security.md
@@ -234,16 +234,17 @@ The sandbox calls two RPCs over this authenticated channel:
- `GetSandboxSettings` -- fetches the YAML policy that governs the sandbox's behavior.
- `GetSandboxProviderEnvironment` -- fetches provider credentials as environment variables.
-## SSH Tunnel Authentication
+## SSH Forward Authentication
-SSH connections into sandboxes pass through the gateway's HTTP CONNECT tunnel at `/connect/ssh`. This adds a second authentication layer on top of mTLS.
+SSH connections into sandboxes pass through the gateway's bidirectional gRPC `ForwardTcp` stream with `target.ssh`. This adds a second authorization layer on top of gateway mTLS.
-### Request Headers
+### Forward Initialization
-| Header | Purpose |
+| Field | Purpose |
|---|---|
-| `x-sandbox-id` | Identifies the target sandbox |
-| `x-sandbox-token` | Session token (created via `CreateSshSession` RPC) |
+| `sandbox_id` | Identifies the target sandbox |
+| `target.ssh` | Requests the built-in SSH Unix-socket target |
+| `authorization_token` | Session token created via `CreateSshSession` |
The gateway validates the token against the stored `SshSession` record and checks:
@@ -269,16 +270,16 @@ The gateway enforces two concurrent connection limits to bound the impact of cre
-| Per-token | 10 concurrent tunnels | Limits damage from a single leaked token |
-| Per-sandbox | 20 concurrent tunnels | Prevents bypass via creating many tokens for one sandbox |
+| Per-token | 10 concurrent streams | Limits damage from a single leaked token |
+| Per-sandbox | 20 concurrent streams | Prevents bypass via creating many tokens for one sandbox |
-These limits are tracked in-memory and decremented when tunnels close. Exceeding either limit returns HTTP 429 (Too Many Requests).
+These limits are tracked in-memory and decremented when streams close. Exceeding either limit returns gRPC `ResourceExhausted`.
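The bookkeeping described above can be sketched as a pair of in-memory counters, incremented when a stream opens and decremented when it closes. This is a hypothetical illustration, not the gateway's actual code: the type name `ConnectionLimits`, the method names, and the rejection-as-`bool` shape are all assumptions; only the limit semantics come from the table above.

```rust
use std::collections::HashMap;

/// Hypothetical sketch of per-token and per-sandbox concurrency tracking.
/// A `false` from `try_open` corresponds to the gRPC `ResourceExhausted`
/// rejection described above.
struct ConnectionLimits {
    per_token: HashMap<String, u32>,
    per_sandbox: HashMap<String, u32>,
    max_per_token: u32,
    max_per_sandbox: u32,
}

impl ConnectionLimits {
    fn new(max_per_token: u32, max_per_sandbox: u32) -> Self {
        Self {
            per_token: HashMap::new(),
            per_sandbox: HashMap::new(),
            max_per_token,
            max_per_sandbox,
        }
    }

    /// Admit a new stream only if both limits have headroom.
    fn try_open(&mut self, token: &str, sandbox: &str) -> bool {
        let t = self.per_token.entry(token.to_string()).or_insert(0);
        let s = self.per_sandbox.entry(sandbox.to_string()).or_insert(0);
        if *t >= self.max_per_token || *s >= self.max_per_sandbox {
            return false;
        }
        *t += 1;
        *s += 1;
        true
    }

    /// Decrement both counters when a stream closes.
    fn close(&mut self, token: &str, sandbox: &str) {
        if let Some(t) = self.per_token.get_mut(token) {
            *t = t.saturating_sub(1);
        }
        if let Some(s) = self.per_sandbox.get_mut(sandbox) {
            *s = s.saturating_sub(1);
        }
    }
}

fn main() {
    let mut limits = ConnectionLimits::new(10, 20);
    for _ in 0..10 {
        assert!(limits.try_open("tok-a", "sbx-1"));
    }
    // The eleventh stream on the same token is rejected.
    assert!(!limits.try_open("tok-a", "sbx-1"));
    // Closing one stream frees a slot again.
    limits.close("tok-a", "sbx-1");
    assert!(limits.try_open("tok-a", "sbx-1"));
}
```

Note that a second leaked token for the same sandbox still runs into the per-sandbox counter, which is the bypass the second row of the table guards against.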
### Supervisor-Initiated Relay Model
-The gateway never dials the sandbox. Instead, the sandbox supervisor opens an outbound `ConnectSupervisor` bidirectional gRPC stream to the gateway on startup and keeps it alive for the sandbox lifetime. SSH traffic for `/connect/ssh` (and exec traffic for `ExecSandbox`) rides this same TCP+TLS+HTTP/2 connection as separate multiplexed HTTP/2 streams. The gateway-side registry and `RelayStream` handler live in `crates/openshell-server/src/supervisor_session.rs`; the supervisor-side bridge lives in `crates/openshell-sandbox/src/supervisor_session.rs`.
+The gateway never dials the sandbox. Instead, the sandbox supervisor opens an outbound `ConnectSupervisor` bidirectional gRPC stream to the gateway on startup and keeps it alive for the sandbox lifetime. SSH traffic for `ForwardTcp(target.ssh)` and exec traffic for `ExecSandbox` ride this same TCP+TLS+HTTP/2 connection as separate multiplexed HTTP/2 streams. The gateway-side registry and `RelayStream` handler live in `crates/openshell-server/src/supervisor_session.rs`; the supervisor-side bridge lives in `crates/openshell-sandbox/src/supervisor_session.rs`.
Per-connection flow:
-1. CLI presents `x-sandbox-id` + `x-sandbox-token` at `/connect/ssh` and passes gateway token validation.
-2. Gateway calls `SupervisorSessionRegistry::open_relay(sandbox_id, ...)`, which allocates a `channel_id` (UUID) and sends a `RelayOpen` message to the supervisor over the already-established `ConnectSupervisor` stream. If no session is registered yet, it polls with exponential backoff up to a bounded timeout (30 s for `/connect/ssh`, 15 s for `ExecSandbox`).
+1. CLI opens `ForwardTcp` with `TcpForwardInit { sandbox_id, target.ssh, authorization_token }` and passes gateway token validation.
+2. Gateway calls `SupervisorSessionRegistry::open_relay_with_target(sandbox_id, SshRelayTarget, ...)`, which allocates a `channel_id` (UUID) and sends a `RelayOpen` message to the supervisor over the already-established `ConnectSupervisor` stream. If no session is registered yet, it polls with exponential backoff up to a bounded timeout (15 s for both `ForwardTcp` and `ExecSandbox`).
3. The supervisor opens a new `RelayStream` RPC on the same `Channel` — a new HTTP/2 stream, no new TCP connection and no new TLS handshake. The first `RelayFrame` is a `RelayInit { channel_id }` that claims the pending slot on the gateway.
4. `claim_relay` pairs the gateway-side waiter with the supervisor-side RPC via a `tokio::io::duplex(64 KiB)` pair. Subsequent `RelayFrame::data` frames carry raw SSH bytes in both directions. The supervisor is a dumb byte bridge: it has no protocol awareness of the SSH bytes flowing through.
5. Inside the sandbox pod, the supervisor connects the relay to sshd over a Unix domain socket at `/run/openshell/ssh.sock` (see `crates/openshell-driver-kubernetes/src/main.rs`).
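The open/claim handshake in steps 2-4 can be sketched synchronously. This is a simplified illustration under stated assumptions: `std::sync::mpsc` stands in for tokio's `oneshot`, a `String` payload stands in for the paired `DuplexStream` half, and the `Registry` type here is hypothetical; the real implementation lives in `supervisor_session.rs`.

```rust
use std::collections::HashMap;
use std::sync::mpsc;

/// Simplified sketch of the registry's open/claim pairing.
struct Registry {
    pending: HashMap<String, mpsc::Sender<String>>,
    next_id: u64,
}

impl Registry {
    fn new() -> Self {
        Self { pending: HashMap::new(), next_id: 0 }
    }

    /// Gateway side: allocate a channel_id and park a waiter. In the real
    /// system this is when `RelayOpen { channel_id }` goes to the supervisor.
    fn open_relay(&mut self) -> (String, mpsc::Receiver<String>) {
        self.next_id += 1;
        let channel_id = format!("chan-{}", self.next_id);
        let (tx, rx) = mpsc::channel();
        self.pending.insert(channel_id.clone(), tx);
        (channel_id, rx)
    }

    /// Supervisor dial-back: the first RelayFrame carries the channel_id;
    /// claiming consumes the pending slot and wakes the parked waiter.
    fn claim_relay(&mut self, channel_id: &str) -> Result<(), &'static str> {
        match self.pending.remove(channel_id) {
            // In the real code this hands over one half of tokio::io::duplex.
            Some(tx) => tx
                .send(format!("stream-for-{channel_id}"))
                .map_err(|_| "waiter gone"),
            None => Err("unknown channel_id"),
        }
    }
}

fn main() {
    let mut reg = Registry::new();
    let (id, rx) = reg.open_relay();
    reg.claim_relay(&id).unwrap();
    println!("{}", rx.recv().unwrap());
    // A second claim on the same channel_id finds no pending slot.
    assert!(reg.claim_relay(&id).is_err());
}
```

Consuming the slot on claim is what makes each `channel_id` single-use: a replayed or duplicated `RelayInit` cannot hijack an already-paired relay.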
diff --git a/architecture/gateway.md b/architecture/gateway.md
index e83640a43..65f9e7a90 100644
--- a/architecture/gateway.md
+++ b/architecture/gateway.md
@@ -2,9 +2,9 @@
## Overview
-`openshell-server` is the gateway -- the central control plane for a cluster. It exposes two gRPC services (OpenShell and Inference) and HTTP endpoints on a single multiplexed port, manages sandbox lifecycle through a pluggable compute driver, persists state in SQLite or Postgres, and brokers SSH access into sandboxes through supervisor-initiated relay streams. The gateway coordinates all interactions between clients, the compute backend, and the persistence layer.
+`openshell-server` is the gateway -- the central control plane for a cluster. It exposes two gRPC services (OpenShell and Inference) and HTTP endpoints on a single multiplexed port, manages sandbox lifecycle through a pluggable compute driver, persists state in SQLite or Postgres, and brokers SSH and service access into sandboxes through supervisor-initiated relay streams. The gateway coordinates all interactions between clients, the compute backend, and the persistence layer.
-Each sandbox supervisor opens a persistent inbound gRPC session (`ConnectSupervisor`); the gateway multiplexes per-invocation `RelayStream` RPCs onto the same HTTP/2 connection to move bytes between clients and the in-sandbox SSH Unix socket. The gateway does not need to know, resolve, or reach the sandbox's network address.
+Each sandbox supervisor opens a persistent inbound gRPC session (`ConnectSupervisor`); the gateway multiplexes per-invocation `RelayStream` RPCs onto the same HTTP/2 connection to move bytes between clients and explicit in-sandbox targets such as the SSH Unix socket or a loopback TCP service. The gateway does not need to know, resolve, or reach the sandbox's network address.
## Architecture Diagram
@@ -21,8 +21,8 @@ graph TD
NAV["OpenShellServer (OpenShell service)"]
INF["InferenceServer (Inference service)"]
HTTP["HTTP Router (Axum)"]
- HEALTH["Health Endpoints"]
- SSH_TUNNEL["SSH Tunnel (/connect/ssh)"]
+ AUTH["Browser Auth (/auth/connect)"]
+ WS["WebSocket Tunnel (/_ws_tunnel)"]
SUP_REG["SupervisorSessionRegistry"]
STORE["Store (SQLite / Postgres)"]
COMPUTE["ComputeRuntime"]
@@ -40,13 +40,11 @@ graph TD
MUX -->|"other"| HTTP
GRPC_ROUTER -->|"/openshell.inference.v1.Inference/*"| INF
GRPC_ROUTER -->|"all other paths"| NAV
- HTTP --> HEALTH
- HTTP --> SSH_TUNNEL
+ HTTP --> AUTH
+ HTTP --> WS
NAV --> STORE
NAV --> COMPUTE
NAV --> SUP_REG
- SSH_TUNNEL --> STORE
- SSH_TUNNEL --> SUP_REG
INF --> STORE
COMPUTE --> DRIVER
COMPUTE --> STORE
@@ -65,12 +63,11 @@ graph TD
| Gateway runtime | `crates/openshell-server/src/lib.rs` | `ServerState` struct, `run_server()` accept loop |
| Protocol mux | `crates/openshell-server/src/multiplex.rs` | `MultiplexService`, `MultiplexedService`, `GrpcRouter`, `BoxBody`, HTTP/2 adaptive-window tuning |
| gRPC: OpenShell | `crates/openshell-server/src/grpc/mod.rs` | `OpenShellService` trait impl -- dispatches to per-concern handlers |
-| gRPC: Sandbox/Exec | `crates/openshell-server/src/grpc/sandbox.rs` | Sandbox CRUD, `ExecSandbox`, SSH session handlers, relay-backed exec proxy |
+| gRPC: Sandbox/Exec/Forward | `crates/openshell-server/src/grpc/sandbox.rs` | Sandbox CRUD, `ExecSandbox`, SSH session handlers, relay-backed exec proxy, `ForwardTcp` streams for SSH (`target.ssh`) and service forwarding (`target.tcp`) |
| gRPC: Inference | `crates/openshell-server/src/inference.rs` | `InferenceService` -- cluster inference config and sandbox bundle delivery |
| Supervisor sessions | `crates/openshell-server/src/supervisor_session.rs` | `SupervisorSessionRegistry`, `handle_connect_supervisor`, `handle_relay_stream`, reaper |
-| HTTP | `crates/openshell-server/src/http.rs` | Health endpoints, merged with SSH tunnel router |
+| HTTP | `crates/openshell-server/src/http.rs` | HTTP router for browser auth and WebSocket tunnel endpoints; health and metrics routers for dedicated listeners |
| Browser auth | `crates/openshell-server/src/auth.rs` | Cloudflare browser login relay at `/auth/connect` |
-| SSH tunnel | `crates/openshell-server/src/ssh_tunnel.rs` | HTTP CONNECT handler at `/connect/ssh` backed by `open_relay` |
| WS tunnel | `crates/openshell-server/src/ws_tunnel.rs` | WebSocket tunnel handler at `/_ws_tunnel` for Cloudflare-fronted clients |
| TLS | `crates/openshell-server/src/tls.rs` | `TlsAcceptor` wrapping rustls with ALPN |
| Persistence | `crates/openshell-server/src/persistence/mod.rs` | `Store` enum (SQLite/Postgres), generic object CRUD, protobuf codec |
@@ -86,7 +83,7 @@ Proto definitions consumed by the gateway:
| Proto file | Package | Defines |
|------------|---------|---------|
-| `proto/openshell.proto` | `openshell.v1` | `OpenShell` service, public sandbox resource model, provider/SSH/watch/policy messages, supervisor session messages (`ConnectSupervisor`, `RelayStream`, `RelayFrame`) |
+| `proto/openshell.proto` | `openshell.v1` | `OpenShell` service, public sandbox resource model, provider/SSH/watch/policy messages, CLI service-forward messages (`ForwardTcp`, `TcpForwardFrame`), supervisor session messages (`ConnectSupervisor`, `RelayStream`, `RelayFrame`) |
| `proto/compute_driver.proto` | `openshell.compute.v1` | Internal `ComputeDriver` service, driver-native sandbox observations, compute watch stream envelopes |
| `proto/inference.proto` | `openshell.inference.v1` | `Inference` service: `SetClusterInference`, `GetClusterInference`, `GetInferenceBundle` |
| `proto/datamodel.proto` | `openshell.datamodel.v1` | `Provider` |
@@ -109,7 +106,7 @@ The gateway boots in `cli::run_cli` (`crates/openshell-server/src/cli.rs`) and p
3. Build `ServerState` (shared via `Arc` across all handlers), including a fresh `SupervisorSessionRegistry`.
4. **Spawn background tasks**:
- `ComputeRuntime::spawn_watchers` -- consumes the compute-driver watch stream, republishes platform events, and runs a periodic `ListSandboxes` snapshot reconcile.
- - `ssh_tunnel::spawn_session_reaper` -- sweeps expired or revoked SSH session tokens from the store hourly.
+ - `ssh_sessions::spawn_session_reaper` -- sweeps expired or revoked SSH session tokens from the store hourly.
- `supervisor_session::spawn_relay_reaper` -- sweeps orphaned pending relay channels every 30 seconds.
5. Create `MultiplexService`.
6. Bind `TcpListener` on `config.bind_address`.
@@ -145,9 +142,8 @@ All configuration is via CLI flags with environment variable fallbacks. The `--d
| `--vm-tls-key` | `OPENSHELL_VM_TLS_KEY` | None | Client private key copied into VM guests for gateway mTLS |
| `--ssh-gateway-host` | `OPENSHELL_SSH_GATEWAY_HOST` | `127.0.0.1` | Public hostname returned in SSH session responses |
| `--ssh-gateway-port` | `OPENSHELL_SSH_GATEWAY_PORT` | `8080` | Public port returned in SSH session responses |
-| `--ssh-connect-path` | `OPENSHELL_SSH_CONNECT_PATH` | `/connect/ssh` | HTTP path for SSH CONNECT/upgrade |
-The sandbox-side SSH listener is a Unix domain socket inside the sandbox. The path defaults to `/run/openshell/ssh.sock` and is configured on the compute driver (e.g. `openshell-driver-kubernetes --sandbox-ssh-socket-path`). The gateway never dials this socket itself; the supervisor bridges it onto a `RelayStream` when asked.
+The sandbox-side SSH listener is a Unix domain socket inside the sandbox. The path defaults to `/run/openshell/ssh.sock` and is configured on the compute driver (e.g. `openshell-driver-kubernetes --sandbox-ssh-socket-path`). The gateway never dials this socket itself; the supervisor bridges it onto a `RelayStream` when `ForwardTcp` requests `target.ssh`.
## Shared State
@@ -186,7 +182,7 @@ All traffic (gRPC and HTTP) shares a single TCP port. Multiplexing happens at th
1. Each accepted TCP stream (optionally TLS-wrapped) is passed to `hyper_util::server::conn::auto::Builder`, which auto-negotiates HTTP/1.1 or HTTP/2.
2. The HTTP/2 side is built with `adaptive_window(true)`. Hyper/h2 auto-sizes the per-stream flow-control window based on measured bandwidth-delay product, so bulk byte transfers on `RelayStream` (and `ExecSandbox` / `PushSandboxLogs`) are not throttled by the default 64 KiB window. Idle streams stay cheap; active streams grow as needed.
-3. The builder calls `serve_connection_with_upgrades()`, which supports HTTP upgrades (needed for the SSH tunnel's CONNECT method).
+3. The builder calls `serve_connection_with_upgrades()`, which supports HTTP upgrades used by WebSocket tunnel clients.
4. For each request, `MultiplexedService` inspects the `content-type` header:
- **Starts with `application/grpc`** -- routes to `GrpcRouter`.
- **Anything else** -- routes to the Axum HTTP router.
@@ -218,17 +214,19 @@ When TLS is enabled (`crates/openshell-server/src/tls.rs`):
The gateway brokers all byte-level access into a sandbox through a two-plane design on a single HTTP/2 connection initiated by the supervisor:
-1. **Control plane** -- `ConnectSupervisor(stream SupervisorMessage) returns (stream GatewayMessage)`. Long-lived, one per sandbox. Carries `SupervisorHello`, `SessionAccepted`/`SessionRejected`, heartbeats, and `RelayOpen`/`RelayClose` control messages.
-2. **Data plane** -- `RelayStream(stream RelayFrame) returns (stream RelayFrame)`. One short-lived call per SSH or exec invocation. The first inbound frame is a `RelayInit { channel_id }`; subsequent frames carry raw bytes in `RelayFrame.data` in either direction.
+1. **Control plane** -- `ConnectSupervisor(stream SupervisorMessage) returns (stream GatewayMessage)`. Long-lived, one per sandbox. Carries `SupervisorHello`, `SessionAccepted`/`SessionRejected`, heartbeats, `RelayOpen`, `RelayOpenResult`, and `RelayClose` control messages.
+2. **Data plane** -- `RelayStream(stream RelayFrame) returns (stream RelayFrame)`. One short-lived call per relay invocation. The first inbound frame is a `RelayInit { channel_id }`; subsequent frames carry raw bytes in `RelayFrame.data` in either direction.
-Both RPCs are defined in `proto/openshell.proto` and ride the same TCP + TLS + HTTP/2 connection from the supervisor. No new TLS handshake, no reverse HTTP CONNECT, no direct gateway-to-pod dial.
+Both RPCs are defined in `proto/openshell.proto` and ride the same TCP + TLS + HTTP/2 connection from the supervisor. No new TLS handshake, no reverse HTTP dialback, no direct gateway-to-pod dial.
+
+`RelayOpen` carries an optional explicit target, defined in `proto/openshell.proto`: `SshRelayTarget` for the built-in SSH socket or `TcpRelayTarget` for loopback TCP services. Supervisors treat an absent target as SSH to stay wire-compatible with older callers.
### `SupervisorSessionRegistry`
`crates/openshell-server/src/supervisor_session.rs` defines `SupervisorSessionRegistry`, a single instance of which lives on `ServerState.supervisor_sessions`. It holds two maps guarded by `std::sync::Mutex`:
- `sessions: HashMap` -- one entry per connected supervisor. Each `LiveSession` carries a unique `session_id`, the `mpsc::Sender` for the outbound stream, and a connection timestamp.
-- `pending_relays: HashMap` -- one entry per in-flight `open_relay` call awaiting the supervisor's `RelayStream` dial-back. Each `PendingRelay` wraps a `oneshot::Sender` and a creation timestamp.
+- `pending_relays: HashMap` -- one entry per in-flight relay request awaiting the supervisor's `RelayStream` dial-back. Each `PendingRelay` stores the sandbox ID, the full `RelayOpen` message for reconnect replay, a creation timestamp, and a `oneshot::Sender<Result<DuplexStream, Status>>` so the waiter can receive either the paired stream or a supervisor-reported target-open failure.
Core operations:
@@ -236,8 +234,10 @@ Core operations:
|--------|---------|
| `register(sandbox_id, session_id, tx)` | Insert a live session; returns the previous session's sender (if any) so the caller can close it. Used by `handle_connect_supervisor` when a supervisor reconnects. |
| `remove_if_current(sandbox_id, session_id)` | Remove the session only if its `session_id` still matches. Guards against the supersede race where an old session's cleanup task fires after a newer session already registered. |
-| `open_relay(sandbox_id, session_wait_timeout)` | Wait up to `session_wait_timeout` for a live session, allocate a fresh `channel_id` (UUID v4), insert the pending slot, send `RelayOpen { channel_id }` to the supervisor, and return `(channel_id, oneshot::Receiver)`. The receiver resolves once the supervisor's `RelayStream` arrives and `claim_relay` pairs them up. |
-| `claim_relay(channel_id)` | Consume the pending slot, construct a `tokio::io::duplex(64 KiB)` pair, hand the gateway-side half to the waiter via the oneshot, and return the supervisor-side half to `handle_relay_stream`. |
+| `open_relay(sandbox_id, session_wait_timeout)` | SSH-compatible wrapper around `open_relay_with_target`. It sends a `RelayOpen` with an explicit `SshRelayTarget` and returns `(channel_id, oneshot::Receiver<Result<DuplexStream, Status>>)`. |
+| `open_relay_with_target(sandbox_id, target, service_id, session_wait_timeout)` | Wait up to `session_wait_timeout` for a live session, allocate a fresh `channel_id` (UUID v4), insert the pending slot, send the full `RelayOpen { channel_id, target, service_id }` to the supervisor, and return a receiver that resolves when `claim_relay` pairs a `RelayStream` or when `RelayOpenResult { success: false }` reports the target-open failure. The target can be `SshRelayTarget` or loopback-only `TcpRelayTarget`, which is the basis for future service-target relays. |
+| `fail_pending_relay(channel_id, error)` | Remove a pending relay and complete its waiter with `Status::unavailable(error)`. Called when the supervisor sends a failed `RelayOpenResult`, so callers fail promptly instead of waiting for the 10 s relay timeout. |
+| `claim_relay(channel_id)` | Consume the pending slot, construct a `tokio::io::duplex(64 KiB)` pair, hand `Ok(gateway-side half)` to the waiter via the oneshot, and return the supervisor-side half to `handle_relay_stream`. |
| `reap_expired_relays()` | Drop pending relays older than 10 s. Called by `spawn_relay_reaper` on a 30 s cadence. |
Session wait uses exponential backoff from 100 ms to 2 s while polling the sessions map. Pending-relay expiry is fixed at `RELAY_PENDING_TIMEOUT = 10 s`.
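The poll schedule above can be sketched as follows. This is an illustrative assumption, not the registry's actual code: the doc states only the 100 ms floor and 2 s ceiling, so the exact doubling, the function name `backoff_schedule`, and the budget-clamping behavior are hypothetical.

```rust
use std::time::Duration;

/// Sketch of the session-wait poll delays: exponential backoff starting at
/// 100 ms, capped at 2 s per step, clamped to the total wait budget.
fn backoff_schedule(total_wait: Duration) -> Vec<Duration> {
    let mut delays = Vec::new();
    let mut delay = Duration::from_millis(100);
    let mut elapsed = Duration::ZERO;
    while elapsed < total_wait {
        // Never sleep past the overall session-wait budget.
        let step = delay.min(total_wait - elapsed);
        delays.push(step);
        elapsed += step;
        delay = (delay * 2).min(Duration::from_secs(2));
    }
    delays
}

fn main() {
    // A 15 s session-wait budget, matching the ForwardTcp / ExecSandbox path.
    let steps = backoff_schedule(Duration::from_secs(15));
    assert_eq!(steps[0], Duration::from_millis(100));
    // Delays double (100, 200, 400, 800, 1600 ms) then stay pinned at 2 s.
    assert!(steps.iter().all(|d| *d <= Duration::from_secs(2)));
    println!("{} polls over 15 s", steps.len());
}
```

The shape matters more than the constants: early polls are cheap and frequent for the common case where the supervisor is already connected, while the 2 s cap bounds wasted sleep when a cold pod is still handshaking.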
@@ -250,7 +250,7 @@ Lifecycle of a supervisor session:
2. Allocate a fresh `session_id` (UUID v4) and create an `mpsc::channel(64)` for the outbound stream.
3. Call `registry.register(...)`. If it returns a previous sender, log that the previous session was superseded (dropping the previous `tx` closes the old outbound stream).
4. Send `SessionAccepted { session_id, heartbeat_interval_secs: 15 }`. If the send fails, call `remove_if_current` (so a concurrent reconnect isn't evicted) and return `Internal`.
-5. Spawn a session loop that `select!`s between inbound messages and a 15 s heartbeat timer. Inbound heartbeats are silent; `RelayOpenResult` is logged; `RelayClose` is logged; unknown payloads are logged as warnings.
+5. Spawn a session loop that `select!`s between inbound messages and a 15 s heartbeat timer. Inbound heartbeats are silent; successful `RelayOpenResult` messages are logged; failed `RelayOpenResult` messages call `fail_pending_relay` before logging; `RelayClose` is logged; unknown payloads are logged as warnings.
6. When the loop exits (inbound EOF, inbound error, or outbound channel closed), `remove_if_current` drops the registration -- unless a newer session has already replaced it.
### `handle_relay_stream`
@@ -264,50 +264,53 @@ Lifecycle of one relay call:
- **Gateway → supervisor**: read up to `RELAY_STREAM_CHUNK_SIZE = 16 KiB` at a time from the duplex read-half and emit `RelayFrame { Data }` messages on an outbound `mpsc::channel(16)`.
4. Return the outbound receiver as the RPC response stream.
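The gateway-to-supervisor pump in step 3 amounts to slicing an outgoing byte buffer into data frames of at most `RELAY_STREAM_CHUNK_SIZE` bytes. The sketch below is a simplification under stated assumptions: frames are plain `Vec<u8>` here, whereas the real `RelayFrame` is a protobuf message read incrementally from the duplex half.

```rust
/// 16 KiB per data frame, matching RELAY_STREAM_CHUNK_SIZE above.
const RELAY_STREAM_CHUNK_SIZE: usize = 16 * 1024;

/// Slice a buffer of relay bytes into frame-sized payloads.
fn to_data_frames(bytes: &[u8]) -> Vec<Vec<u8>> {
    bytes
        .chunks(RELAY_STREAM_CHUNK_SIZE)
        .map(|chunk| chunk.to_vec())
        .collect()
}

fn main() {
    let payload = vec![0u8; 40 * 1024]; // 40 KiB of raw SSH bytes
    let frames = to_data_frames(&payload);
    // 40 KiB splits into 16 KiB + 16 KiB + 8 KiB.
    assert_eq!(frames.len(), 3);
    assert_eq!(frames[2].len(), 8 * 1024);
}
```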
-### Connect Flow (SSH Tunnel)
+### `ForwardTcp` Flow (SSH and Service Forwarding)
+
+`ForwardTcp` is the client-to-gateway byte stream for SSH and service forwarding. The first `TcpForwardFrame` must contain a `TcpForwardInit`, and every init carries the `authorization_token` issued by `CreateSshSession`. SSH connections use `target.ssh`, while service forwarding uses `target.tcp` with a loopback host and port.
```mermaid
sequenceDiagram
- participant Client as SSH client
- participant GW as Gateway (/connect/ssh)
+ participant Client as CLI
+ participant GW as Gateway (ForwardTcp)
participant Reg as SupervisorSessionRegistry
participant Sup as Sandbox Supervisor
- participant Daemon as In-sandbox sshd (Unix socket)
+ participant Target as In-sandbox target (SSH Unix socket or loopback TCP)
- Client->>GW: CONNECT /connect/ssh x-sandbox-id, x-sandbox-token
- GW->>GW: validate session + sandbox Ready
- GW->>Reg: open_relay(sandbox_id, 30s)
- Reg->>Sup: GatewayMessage::RelayOpen { channel_id }
+ Client->>GW: ForwardTcp(TcpForwardInit { target, authorization_token })
+ GW->>GW: validate sandbox Ready, validate token, validate loopback host for target.tcp
+ GW->>Reg: open_relay_with_target(sandbox_id, target, service_id, 15s)
+ Reg->>Sup: GatewayMessage::RelayOpen { channel_id, target }
Note over Reg: waits for RelayStream on channel_id
- Sup->>Daemon: connect to Unix socket
+ Sup->>Target: dial SSH Unix socket or loopback TCP target
+ Sup-->>GW: RelayOpenResult { success/failure }
+ GW->>Reg: fail_pending_relay(channel_id) on failure
Sup->>GW: RelayStream(RelayFrame::Init { channel_id })
GW->>Reg: claim_relay(channel_id)
Reg-->>Sup: supervisor-side DuplexStream
Reg-->>GW: gateway-side DuplexStream
- GW-->>Client: 200 OK + HTTP upgrade
- Client<<->>GW: copy_bidirectional(upgraded, duplex)
+ Client<<->>GW: TcpForwardFrame::Data in both directions
GW<<->>Sup: RelayFrame::Data in both directions
- Sup<<->>Daemon: raw SSH bytes
+ Sup<<->>Target: raw bytes
```
-Timeouts on the tunnel path:
+Timeouts on the `ForwardTcp` path:
-- `open_relay` session wait: **30 s**. A first `sandbox connect` immediately after `sandbox create` must cover the supervisor's initial TLS + gRPC handshake on a cold pod.
-- `relay_rx` delivery timeout: 10 s. Covers the round-trip from the `RelayOpen` message to the supervisor's `RelayStream` dial-back.
+- `open_relay_with_target` session wait: **15 s**. The gateway waits for a live supervisor session before sending `RelayOpen`.
+- `relay_rx` delivery timeout: 10 s. Covers the round-trip from the `RelayOpen` message to the supervisor's `RelayOpenResult` and `RelayStream` dial-back. A failed `RelayOpenResult` completes the waiter immediately with `Unavailable`.
-Per-token and per-sandbox concurrent-tunnel limits (3 and 20 respectively) are still enforced before the upgrade.
+For all `ForwardTcp` targets, the gateway validates the `authorization_token` against the stored `SshSession`, rejects revoked, expired, or sandbox-mismatched tokens, and enforces per-token and per-sandbox concurrent connection limits (3 and 20 respectively). For `target.tcp`, the gateway also requires a loopback target host (`localhost`, `127.0.0.0/8`, or `::1`) and a port in `1..=65535`.
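The `target.tcp` host and port checks described above can be sketched as a small predicate. This is an illustrative sketch, not the gateway's code: the function name and the boolean return shape are assumptions; only the accepted values (`localhost`, `127.0.0.0/8`, `::1`, port in `1..=65535`) come from the text.

```rust
use std::net::IpAddr;

/// Hypothetical sketch of target.tcp validation: accept only loopback hosts
/// and a port in 1..=65535.
fn is_allowed_tcp_target(host: &str, port: u32) -> bool {
    if port == 0 || port > 65535 {
        return false;
    }
    if host.eq_ignore_ascii_case("localhost") {
        return true;
    }
    match host.parse::<IpAddr>() {
        // is_loopback() covers 127.0.0.0/8 for IPv4 and ::1 for IPv6.
        Ok(ip) => ip.is_loopback(),
        Err(_) => false,
    }
}

fn main() {
    assert!(is_allowed_tcp_target("127.0.0.1", 8080));
    assert!(is_allowed_tcp_target("localhost", 3000));
    assert!(is_allowed_tcp_target("::1", 443));
    assert!(is_allowed_tcp_target("127.8.9.10", 80)); // anywhere in 127/8
    assert!(!is_allowed_tcp_target("10.0.0.5", 80)); // not loopback
    assert!(!is_allowed_tcp_target("example.com", 80)); // names other than localhost rejected
    assert!(!is_allowed_tcp_target("127.0.0.1", 0)); // port out of range
}
```

Restricting targets to loopback means `ForwardTcp` can only reach services the sandbox itself exposes, never act as an open proxy into the pod network.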
### Exec Flow
`ExecSandbox` reuses the same machinery from `grpc/sandbox.rs`:
1. Validate the request (`sandbox_id`, `command`, env-key format, other field rules), fetch the sandbox, require `Ready` phase.
-2. `state.supervisor_sessions.open_relay(&sandbox.id, 15s)` -- shorter timeout than SSH connect, because exec is typically called mid-lifetime after the supervisor session is already established.
-3. Wait up to 10 s for the relay `DuplexStream`.
+2. `state.supervisor_sessions.open_relay(&sandbox.id, 15s)` -- same session-wait timeout used by `ForwardTcp`, because exec is typically called mid-lifetime after the supervisor session is already established.
+3. Wait up to 10 s for the relay `DuplexStream`; a failed `RelayOpenResult` returns the target-open error promptly instead of timing out.
4. `stream_exec_over_relay`: bind an ephemeral localhost TCP listener, bridge that single-use TCP socket to the relay duplex, and drive a `russh` client through the local port. The `russh` session opens a channel, executes the shell-escaped command, and streams `ExecSandboxStdout`/`ExecSandboxStderr` chunks to the caller. On completion, send `ExecSandboxExit { exit_code }`.
5. On timeout (if `timeout_seconds > 0`), emit exit code 124 (matching `timeout(1)`).
-The supervisor-side SSH daemon is an SSH server bound to a Unix domain socket inside the sandbox's filesystem. Filesystem permissions on that socket are the only access-control boundary between the supervisor bridge and the daemon; all higher-level authorization is enforced at `CreateSshSession` / `ExecSandbox` in the gateway.
+The supervisor-side SSH daemon is an SSH server bound to a Unix domain socket inside the sandbox's filesystem. Filesystem permissions on that socket are the only access-control boundary between the supervisor bridge and the daemon; all higher-level authorization is enforced by gateway RPCs (`CreateSshSession` + `ForwardTcp` for SSH, `ExecSandbox` for exec).
### Regression Coverage
@@ -338,6 +341,7 @@ Defined in `proto/openshell.proto`, implemented in `crates/openshell-server/src/
| `DeleteSandbox` | Delete sandbox by name | Sets phase to `Deleting`, persists, notifies watch bus, then deletes via the compute driver. Cleans up store if the sandbox was already gone. |
| `WatchSandbox` | Stream sandbox updates | Server-streaming RPC. See [Watch Sandbox Stream](#watch-sandbox-stream) below. |
| `ExecSandbox` | Execute command in sandbox | Server-streaming RPC; data plane runs through `SupervisorSessionRegistry::open_relay`. See [Exec Flow](#exec-flow). |
+| `ForwardTcp` | Forward one CLI-side TCP connection into a sandbox | Bidirectional stream; consumes a `CreateSshSession` token, with `target.ssh` for SSH and `target.tcp` for loopback TCP services. See [`ForwardTcp` Flow (SSH and Service Forwarding)](#forwardtcp-flow-ssh-and-service-forwarding). |
#### Supervisor Session
@@ -352,7 +356,7 @@ Neither RPC is called by end users. They are the private control/data plane betw
| RPC | Description |
|-----|-------------|
-| `CreateSshSession` | Creates a session token for a `Ready` sandbox. Persists an `SshSession` record and returns gateway connection details (host, port, scheme, connect path). The resulting token is presented on the `/connect/ssh` HTTP CONNECT request. |
+| `CreateSshSession` | Creates a session token for a `Ready` sandbox. Persists an `SshSession` record and returns gateway connection details (host, port, scheme) plus optional expiry. The resulting token is presented as `authorization_token` on a `ForwardTcp` stream. |
| `RevokeSshSession` | Marks a session as revoked by setting `session.revoked = true` in the store. |
#### Provider Management
@@ -438,7 +442,7 @@ The `ClusterInferenceConfig` stored in the database contains only `provider_name
## HTTP Endpoints
-The HTTP router (`crates/openshell-server/src/http.rs`) merges two sub-routers:
+The main HTTP router (`crates/openshell-server/src/http.rs`) serves browser-auth and WebSocket tunnel endpoints on the multiplexed gateway port. Health and metrics routers are exposed on dedicated listeners when configured.
### Health Endpoints
@@ -448,14 +452,6 @@ The HTTP router (`crates/openshell-server/src/http.rs`) merges two sub-routers:
| `/healthz` | GET | `200 OK` (empty body) -- Kubernetes liveness probe |
| `/readyz` | GET | `200 OK` with JSON `{"status": "healthy", "version": ""}` -- Kubernetes readiness probe |
-### SSH Tunnel Endpoint
-
-| Path | Method | Response |
-|------|--------|----------|
-| `/connect/ssh` | CONNECT | Upgrades the connection to a bidirectional byte bridge tunneled through `SupervisorSessionRegistry::open_relay` |
-
-See [Connect Flow (SSH Tunnel)](#connect-flow-ssh-tunnel) for details.
-
### Cloudflare Endpoints
| Path | Method | Response |
@@ -680,7 +676,7 @@ Supervisor session telemetry is currently emitted as plain `tracing` events from
- `ResourceExhausted` for broadcast lag (missed messages).
- `Cancelled` for closed broadcast channels.
-- **HTTP errors**: The SSH tunnel handler returns HTTP status codes directly (`401`, `404`, `405`, `412`, `429`, `500`, `502`). `502` indicates the supervisor relay could not be opened; `429` indicates a per-token or per-sandbox concurrent-tunnel limit.
+- **Forwarding errors**: `ForwardTcp` returns gRPC status codes. `Unauthenticated` indicates a missing or invalid session token; `ResourceExhausted` indicates a per-token or per-sandbox connection limit; `Unavailable` indicates the supervisor relay could not be opened.
- **Connection errors**: Logged at `error` level but do not crash the gateway. TLS handshake failures and individual connection errors are caught and logged per-connection.
diff --git a/architecture/podman-rootless-networking.md b/architecture/podman-rootless-networking.md
index b267cfffa..de7e08b8c 100644
--- a/architecture/podman-rootless-networking.md
+++ b/architecture/podman-rootless-networking.md
@@ -250,9 +250,9 @@ A tmpfs is mounted at `/run/netns` in the container spec (`container.rs:458-463`
```
Client (CLI on user's machine)
|
- 1. gRPC: CreateSshSession -> gateway (returns token, connect_path)
- 2. HTTP CONNECT /connect/ssh to gateway
- (headers: x-sandbox-id, x-sandbox-token)
+ 1. gRPC: CreateSshSession -> gateway (returns token)
+ 2. gRPC: ForwardTcp(target.ssh) to gateway
+ (init: sandbox_id, authorization_token)
|
Gateway (host, port 8080)
|
@@ -364,9 +364,9 @@ Both drivers use the same reverse gRPC relay (`ConnectSupervisor` + `RelayStream
| `crates/openshell-sandbox/src/sandbox/linux/netns.rs` | Inner network namespace: veth pair, IP addressing, iptables rules |
| `crates/openshell-sandbox/src/proxy.rs` | HTTP CONNECT proxy: OPA policy, SSRF protection, L7 inspection |
| `crates/openshell-sandbox/src/ssh.rs` | SSH daemon on Unix socket, shell process netns entry via `setns()` |
-| `crates/openshell-sandbox/src/supervisor_session.rs` | gRPC ConnectSupervisor stream, RelayStream for SSH tunneling |
+| `crates/openshell-sandbox/src/supervisor_session.rs` | gRPC ConnectSupervisor stream, RelayStream for SSH and TCP target bridging |
| `crates/openshell-sandbox/src/grpc_client.rs` | gRPC channel to gateway (mTLS or plaintext, keep-alive, adaptive windowing) |
-| `crates/openshell-server/src/ssh_tunnel.rs` | Gateway-side SSH tunnel: HTTP CONNECT endpoint, relay bridging |
+| `crates/openshell-server/src/grpc/sandbox.rs` | Gateway-side `ForwardTcp` stream handling for SSH and TCP service forwarding |
| `crates/openshell-server/src/supervisor_session.rs` | SupervisorSessionRegistry, relay claim/open lifecycle |
| `crates/openshell-server/src/compute/mod.rs` | `ComputeRuntime::new_podman()` -- Podman compute driver initialization |
| `crates/openshell-core/src/config.rs` | Default constants: ports, network name |
diff --git a/architecture/sandbox-connect.md b/architecture/sandbox-connect.md
index 499532fb9..2ab7c51ec 100644
--- a/architecture/sandbox-connect.md
+++ b/architecture/sandbox-connect.md
@@ -8,28 +8,47 @@ Sandbox connect provides secure remote access into running sandbox environments.
2. **Command execution** (`sandbox create -- <command>`) -- runs a command over SSH with stdout/stderr piped back
3. **File sync** (`sandbox create --upload`) -- uploads local files into the sandbox before command execution
-Gateway connectivity is **supervisor-initiated**: the gateway never dials the sandbox pod. On startup, each sandbox's supervisor opens a long-lived bidirectional gRPC stream (`ConnectSupervisor`) to the gateway and holds it for the sandbox's lifetime. **`CreateSshSession` → HTTP CONNECT and `ExecSandbox` both depend on that registration**: `open_relay` blocks until a live `ConnectSupervisor` entry exists for the `sandbox_id`; if the supervisor never registers (wrong endpoint, bad env, crash loop), the client hits the supervisor-session wait timeout instead of getting a relay. When a client asks the gateway for SSH, the gateway sends a `RelayOpen` message over that stream; the supervisor responds by initiating a `RelayStream` gRPC call that rides the same TCP+TLS+HTTP/2 connection as a new multiplexed stream. The supervisor bridges the bytes of that stream into a root-owned Unix socket where the embedded SSH daemon listens. **The in-container sshd is reached only on that local Unix socket** — the supervisor `UnixStream::connect`s to it. Do not assume the relay path terminates at a container-exposed TCP listener for sshd; any optional TCP surface is separate from the gateway relay bridge.
+Gateway connectivity is **supervisor-initiated**: the gateway never dials the sandbox pod. On startup, each sandbox's supervisor opens a long-lived bidirectional gRPC stream (`ConnectSupervisor`) to the gateway and holds it for the sandbox's lifetime. **`CreateSshSession`, `ForwardTcp`, and `ExecSandbox` all depend on that registration**: `open_relay` blocks until a live `ConnectSupervisor` entry exists for the `sandbox_id`; if the supervisor never registers (wrong endpoint, bad env, crash loop), the client hits the supervisor-session wait timeout instead of getting a relay. When a client asks the gateway for SSH, the OpenSSH `ProxyCommand` runs `openshell ssh-proxy`, which opens a bidirectional gRPC `ForwardTcp` stream to the gateway. Its first frame is `TcpForwardInit { target.ssh, authorization_token }`, where the token comes from `CreateSshSession`. The gateway validates the token, then sends a `RelayOpen` message over `ConnectSupervisor` with an explicit `SshRelayTarget`; older targetless messages remain SSH-compatible. The supervisor validates and dials the requested target before reporting a successful `RelayOpenResult`, then initiates a `RelayStream` gRPC call that rides the same TCP+TLS+HTTP/2 connection as a new multiplexed stream. For SSH targets, the supervisor bridges the bytes of that stream into a root-owned Unix socket where the embedded SSH daemon listens. **The in-container sshd is reached only on that local Unix socket** — the supervisor `UnixStream::connect`s to it. Do not assume the relay path terminates at a container-exposed TCP listener for sshd; any optional TCP surface is separate from the gateway relay bridge.
There is also a gateway-side `ExecSandbox` gRPC RPC that executes commands inside sandboxes without requiring an external SSH client. It uses the same relay mechanism.
+The OS-88 forwarding path also carries arbitrary TCP services: `openshell forward service --target-port <port>` binds a local TCP listener, opens one `ForwardTcp` bidirectional gRPC stream per accepted local connection, and sends `TcpForwardInit { target.tcp, authorization_token }` with a `TcpRelayTarget` for the requested loopback port inside the sandbox. This avoids the SSH `direct-tcpip` transport and keeps gateway auth, typed routing, session-token authorization, and relay target validation in the OpenShell protocol.
+
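The loopback-only restriction on TCP relay targets can be sketched as a pure check. The function name is hypothetical, but the accepted hosts and port range are the ones this document specifies:

```rust
/// Hedged mirror of the supervisor-side `TcpRelayTarget` validation described
/// above: only loopback hosts and ports 1..=65535 are accepted. The port is a
/// u32 because proto3 has no u16 scalar.
pub fn validate_tcp_relay_target(host: &str, port: u32) -> Result<(), String> {
    // Loopback-only: the supervisor refuses to dial anything routable.
    if !matches!(host, "127.0.0.1" | "::1" | "localhost") {
        return Err(format!("non-loopback relay target host: {host}"));
    }
    // Port 0 is never a valid dial target; above u16 range is malformed.
    if port == 0 || port > 65535 {
        return Err(format!("relay target port out of range: {port}"));
    }
    Ok(())
}
```

The same check runs twice by design: once at the gateway before `RelayOpen` is sent, and again in the supervisor before it dials.
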
### Podman and relay environment
-The **Podman** compute driver (`crates/openshell-driver-podman/src/container.rs`, `build_env` / `build_container_spec`) must inject the same **relay-critical** environment variables into the container as the Kubernetes driver: `OPENSHELL_ENDPOINT` (gateway gRPC), `OPENSHELL_SANDBOX_ID`, and `OPENSHELL_SSH_SOCKET_PATH` (Unix path the embedded sshd binds and the supervisor dials). Without `OPENSHELL_SSH_SOCKET_PATH`, the in-container `openshell-sandbox` process does not know where to create the socket; without `OPENSHELL_ENDPOINT` / `OPENSHELL_SANDBOX_ID`, the supervisor cannot complete `ConnectSupervisor`, so the gateway never has a session to target with `RelayOpen`. Driver-owned keys overwrite user spec/template env so these cannot be overridden. **Podman container readiness** (libpod `HealthConfig` in `build_container_spec`) treats the sandbox as ready when a sentinel file exists, **or** `test -S` passes on the configured `sandbox_ssh_socket_path` (**supervisor / Unix-socket path**), **or** a legacy TCP listen check on the published SSH port — so the `Ready` phase used by `CreateSshSession` and the SSH tunnel can reflect Unix-socket–based startup, not only a TCP listener.
+The **Podman** compute driver (`crates/openshell-driver-podman/src/container.rs`, `build_env` / `build_container_spec`) must inject the same **relay-critical** environment variables into the container as the Kubernetes driver: `OPENSHELL_ENDPOINT` (gateway gRPC), `OPENSHELL_SANDBOX_ID`, and `OPENSHELL_SSH_SOCKET_PATH` (Unix path the embedded sshd binds and the supervisor dials). Without `OPENSHELL_SSH_SOCKET_PATH`, the in-container `openshell-sandbox` process does not know where to create the socket; without `OPENSHELL_ENDPOINT` / `OPENSHELL_SANDBOX_ID`, the supervisor cannot complete `ConnectSupervisor`, so the gateway never has a session to target with `RelayOpen`. Driver-owned keys overwrite user spec/template env so these cannot be overridden. **Podman container readiness** (libpod `HealthConfig` in `build_container_spec`) treats the sandbox as ready when a sentinel file exists, **or** `test -S` passes on the configured `sandbox_ssh_socket_path` (**supervisor / Unix-socket path**), **or** a legacy TCP listen check on the published SSH port — so the `Ready` phase used by `CreateSshSession` and `ForwardTcp` can reflect Unix-socket–based startup, not only a TCP listener.
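
A minimal sketch of the Unix-socket leg of that readiness predicate (the sentinel-file and `test -S` checks; the legacy TCP probe is omitted). The function name and call shape are illustrative, not the driver's API:

```rust
use std::os::unix::fs::FileTypeExt;
use std::path::Path;

/// Hypothetical readiness predicate: ready if the sentinel file exists, or if
/// the configured SSH socket path exists and is a Unix socket (the Rust
/// equivalent of `test -S <path>`).
pub fn sandbox_looks_ready(sentinel: &Path, ssh_socket: &Path) -> bool {
    if sentinel.exists() {
        return true;
    }
    std::fs::metadata(ssh_socket)
        .map(|m| m.file_type().is_socket())
        .unwrap_or(false)
}
```
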
## Two-Plane Architecture
The supervisor and gateway maintain two logical planes over **one TCP+TLS connection**, multiplexed by HTTP/2 streams:
-- **Control plane** -- the `ConnectSupervisor` bidirectional gRPC stream. Carries `SupervisorHello`, heartbeats, `RelayOpen`/`RelayClose` requests from the gateway, and `RelayOpenResult`/`RelayClose` replies from the supervisor. Lives for the lifetime of the sandbox supervisor process.
-- **Data plane** -- one `RelayStream` bidirectional gRPC call per SSH connect or exec invocation. Each call is a new HTTP/2 stream on the same connection. Frames are opaque bytes except for the first frame from the supervisor, which is a typed `RelayInit { channel_id }` used to pair the stream with a pending relay slot on the gateway.
+- **Control plane** -- the `ConnectSupervisor` bidirectional gRPC stream. Carries `SupervisorHello`, heartbeats, targetable `RelayOpen`/`RelayClose` requests from the gateway, and `RelayOpenResult`/`RelayClose` replies from the supervisor. Lives for the lifetime of the sandbox supervisor process.
+- **Data plane** -- one `RelayStream` bidirectional gRPC call per accepted relay. Each call is a new HTTP/2 stream on the same connection. Frames are opaque bytes except for the first frame from the supervisor, which is a typed `RelayInit { channel_id }` used to pair the stream with a pending relay slot on the gateway.
-Running both planes over one HTTP/2 connection means each relay avoids a fresh TLS handshake and benefits from a single authenticated transport boundary. Hyper/h2 `adaptive_window(true)` is enabled on both sides so bulk transfers (large file uploads, long exec stdout) aren't pinned to the default 64 KiB stream window.
+Running both planes over one HTTP/2 connection means each relay avoids a fresh TLS handshake and benefits from a single authenticated transport boundary. Hyper/h2 adaptive windows are enabled on the gateway, the sandbox supervisor channel, and CLI gRPC channels so bulk transfers (large file uploads, long exec stdout) aren't pinned to the default 64 KiB stream window.
The supervisor-initiated direction gives the model two properties:
1. The sandbox pod exposes no ingress surface. Network reachability is whatever the supervisor itself can reach outward.
2. Authentication reduces to one place: the existing gateway mTLS channel. There is no second application-layer handshake to design, rotate, or replay-protect.
+### Targetable relay base
+
+`RelayOpen` is targetable but remains SSH-compatible by default. In `proto/openshell.proto`, `RelayOpen.target` is an optional `oneof` with:
+
+- `SshRelayTarget` -- the built-in SSH target. This is the explicit target used by the server-side `open_relay()` wrapper, so existing SSH connect and `ExecSandbox` flows continue to request SSH without each caller constructing the target.
+- `TcpRelayTarget { host, port }` -- a supervisor-dialed TCP target inside the sandbox. The supervisor accepts only loopback hosts (`127.0.0.1`, `::1`, or `localhost`) and ports `1..=65535`.
+
+If `target` is absent, the supervisor treats the relay as `SshRelayTarget` for compatibility with older gateways or messages. The supervisor opens the target before sending `RelayOpenResult { success: true }`; if validation or dialing fails, it sends `success: false` with the error and does not start a `RelayStream`.
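
The defaulting rule can be sketched with illustrative stand-ins for the generated proto types:

```rust
/// Illustrative stand-in for the `RelayOpen.target` oneof; the real types are
/// generated from `proto/openshell.proto`.
#[derive(Debug, PartialEq)]
pub enum RelayTarget {
    Ssh,
    Tcp { host: String, port: u16 },
}

/// An absent target resolves to SSH, so messages from older gateways (which
/// never set `target`) keep behaving as SSH relays.
pub fn resolve_relay_target(target: Option<RelayTarget>) -> RelayTarget {
    target.unwrap_or(RelayTarget::Ssh)
}
```
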
+
+### CLI forward service over gRPC
+
+**Files**: `proto/openshell.proto`, `crates/openshell-cli/src/run.rs`, `crates/openshell-server/src/grpc/sandbox.rs`
+
+`ForwardTcp` is a bidirectional gRPC stream between the CLI and gateway. The CLI sends `TcpForwardFrame::Init { sandbox_id, service_id, target.tcp, authorization_token }` as the first frame, followed by `TcpForwardFrame::Data` chunks from the accepted local TCP connection. The gateway validates that the sandbox exists and is `Ready`, validates the session token, validates that the target is loopback-only, calls `open_relay_with_target(TcpRelayTarget)`, waits for the supervisor's `RelayStream`, and bridges opaque bytes between the CLI stream and the relay stream.
+
+The spike command does not yet create persistent `SandboxService` records. It takes the target directly from CLI flags; the gateway applies the same loopback-only restrictions that the supervisor enforces again at relay-open time.
+
## Components
### CLI SSH module
@@ -52,7 +71,7 @@ These helpers are re-exported from `crates/openshell-cli/src/run.rs` for backwar
**File**: `crates/openshell-cli/src/main.rs` (`Commands::SshProxy`)
-A top-level CLI subcommand (`ssh-proxy`) that the SSH `ProxyCommand` invokes. It receives `--gateway`, `--sandbox-id`, `--token`, and `--gateway-name` flags, then delegates to `sandbox_ssh_proxy()`. This process has no TTY of its own -- it pipes stdin/stdout directly to the gateway tunnel.
+A top-level CLI subcommand (`ssh-proxy`) that the SSH `ProxyCommand` invokes. It receives `--gateway`, `--sandbox-id`, `--token`, and `--gateway-name` flags, then delegates to `sandbox_ssh_proxy()`. This process has no TTY of its own -- it pipes stdin/stdout directly to the gateway `ForwardTcp` stream.
### gRPC session bootstrap
@@ -60,7 +79,7 @@ A top-level CLI subcommand (`ssh-proxy`) that the SSH `ProxyCommand` invokes. It
Two RPCs manage SSH session tokens:
-- `CreateSshSession(sandbox_id)` -- validates the sandbox exists and is `Ready`, generates a UUID token, persists an `SshSession` record, and returns the token plus gateway connection details (host, port, scheme, connect path, optional TTL).
+- `CreateSshSession(sandbox_id)` -- validates the sandbox exists and is `Ready`, generates a UUID token, persists an `SshSession` record, and returns the token plus gateway connection details (host, port, scheme, optional TTL).
- `RevokeSshSession(token)` -- marks the session's `revoked` flag to `true` in the persistence layer.
### Supervisor session registry
@@ -76,11 +95,13 @@ Key operations:
- `register(sandbox_id, session_id, tx)` -- inserts a new session and returns the previous sender if it superseded one. Used by `handle_connect_supervisor` to accept a new stream.
- `remove_if_current(sandbox_id, session_id)` -- removes only if the stored `session_id` matches. Guards against the supersede race where an old session's cleanup runs after a newer session has already registered.
-- `open_relay(sandbox_id, timeout)` -- called by the gateway tunnel and exec handlers. Waits up to `timeout` for a supervisor session to appear (with exponential backoff 100 ms → 2 s), registers a pending relay slot keyed by a fresh `channel_id`, sends `RelayOpen` to the supervisor, and returns a `oneshot::Receiver` that resolves when the supervisor claims the slot.
+- `open_relay(sandbox_id, timeout)` -- called by exec handlers. Wraps `open_relay_with_target()` with `SshRelayTarget`, waits up to `timeout` for a supervisor session to appear (with exponential backoff 100 ms → 2 s), registers a pending relay slot keyed by a fresh `channel_id`, sends `RelayOpen` to the supervisor, and returns a `oneshot::Receiver<Result<DuplexStream, Status>>` that resolves when the supervisor claims the slot or reports target-open failure.
+- `open_relay_with_target(sandbox_id, target, service_id, timeout)` -- lower-level relay opener for explicit `RelayOpen.target` values. It stores the full `RelayOpen` in the pending slot so replay after supervisor supersede preserves the requested target.
+- `fail_pending_relay(channel_id, error)` -- removes a pending relay and wakes the caller with `Status::unavailable` when the supervisor sends `RelayOpenResult { success: false }`.
- `claim_relay(channel_id)` -- called by `handle_relay_stream` when the supervisor's first `RelayFrame::Init` arrives. Removes the pending entry, enforces a 10-second staleness bound (`RELAY_PENDING_TIMEOUT`), creates a 64 KiB `tokio::io::duplex` pair, hands the gateway-side half to the waiter, and returns the supervisor-side half to be bridged against the inbound/outbound `RelayFrame` streams.
- `reap_expired_relays()` -- bounds leaks from pending slots the supervisor never claimed (e.g., supervisor crashed between `RelayOpen` and `RelayStream`). Scheduled every 30 s by `spawn_relay_reaper()` during server startup.
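
The 100 ms → 2 s supervisor-session wait can be modeled as a schedule generator; the constants match the description above, but the function is a sketch, not the registry's code:

```rust
use std::time::Duration;

/// Illustrative model of the supervisor-session wait loop: exponential
/// backoff starting at 100 ms, doubling, capped at 2 s per sleep, until the
/// overall timeout budget is spent.
pub fn backoff_schedule(timeout: Duration) -> Vec<Duration> {
    let mut waits = Vec::new();
    let mut next = Duration::from_millis(100);
    let mut spent = Duration::ZERO;
    while spent < timeout {
        // Never sleep past the remaining budget.
        let wait = next.min(timeout - spent);
        waits.push(wait);
        spent += wait;
        next = (next * 2).min(Duration::from_secs(2));
    }
    waits
}
```

With the 15-second `ForwardTcp` budget this yields 100 ms, 200 ms, 400 ms, 800 ms, 1.6 s, then 2 s steps until the budget is exhausted.
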
-The `ConnectSupervisor` handler (`handle_connect_supervisor`) validates `SupervisorHello`, assigns a fresh `session_id`, sends `SessionAccepted { heartbeat_interval_secs: 15 }`, spawns a loop that processes inbound messages (`Heartbeat`, `RelayOpenResult`, `RelayClose`), and emits a `GatewayHeartbeat` every 15 seconds.
+The `ConnectSupervisor` handler (`handle_connect_supervisor`) validates `SupervisorHello`, assigns a fresh `session_id`, sends `SessionAccepted { heartbeat_interval_secs: 15 }`, spawns a loop that processes inbound messages (`Heartbeat`, `RelayOpenResult`, `RelayClose`), and emits a `GatewayHeartbeat` every 15 seconds. Successful `RelayOpenResult` values are informational; failed results wake the pending relay waiter via `fail_pending_relay()` instead of only being logged.
### RelayStream handler
@@ -93,18 +114,19 @@ Accepts one inbound `RelayFrame` to extract `channel_id` from `RelayInit`, claim
The first frame that isn't `RelayInit` is rejected (`invalid_argument`). Any non-data frame after init closes the relay.
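
Those two rules form a small acceptance state machine. A sketch with illustrative frame and verdict types:

```rust
/// Stand-in for the proto `RelayFrame` oneof.
#[derive(Debug, PartialEq)]
pub enum RelayFrame {
    Init { channel_id: String },
    Data(Vec<u8>),
}

#[derive(Debug, PartialEq)]
pub enum Verdict {
    Accept,
    InvalidArgument, // first frame was not RelayInit
    Close,           // non-data frame after init
}

pub struct RelayState {
    initialized: bool,
}

impl RelayState {
    pub fn new() -> Self {
        RelayState { initialized: false }
    }

    /// Apply the two rules above: Init must come first and exactly once;
    /// afterwards only opaque data frames are legal.
    pub fn accept(&mut self, frame: &RelayFrame) -> Verdict {
        match (self.initialized, frame) {
            (false, RelayFrame::Init { .. }) => {
                self.initialized = true;
                Verdict::Accept
            }
            (false, _) => Verdict::InvalidArgument,
            (true, RelayFrame::Data(_)) => Verdict::Accept,
            (true, _) => Verdict::Close,
        }
    }
}
```
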
-### Gateway tunnel handler
+### Gateway `ForwardTcp` handler
-**File**: `crates/openshell-server/src/ssh_tunnel.rs`
+**File**: `crates/openshell-server/src/grpc/sandbox.rs` (`handle_forward_tcp`)
-An Axum route at `/connect/ssh` on the shared gateway port. Handles HTTP CONNECT requests by:
+Handles one CLI-to-gateway bidirectional `ForwardTcp` stream by:
-1. Validating the session token (present, not revoked, bound to the sandbox id in `X-Sandbox-Id`, not expired).
+1. Reading the first `TcpForwardFrame` and requiring `TcpForwardInit`.
2. Confirming the sandbox is in `Ready` phase.
-3. Enforcing per-token (max 3) and per-sandbox (max 20) concurrent connection limits.
-4. Calling `supervisor_sessions.open_relay(sandbox_id, 30s)` -- the 30-second wait covers the supervisor's initial mTLS + `ConnectSupervisor` handshake on a freshly-scheduled pod.
-5. Waiting up to 10 seconds for the supervisor to open its `RelayStream` and deliver the gateway-side `DuplexStream`.
-6. Performing the HTTP CONNECT upgrade on the client connection and calling `copy_bidirectional` between the upgraded client socket and the relay stream.
+3. Validating `authorization_token` against the `SshSession` row and enforcing per-token (max 3) and per-sandbox (max 20) concurrent connection limits.
+4. For `target.tcp`, validating that the target host is loopback-only and the port is `1..=65535`.
+5. Calling `supervisor_sessions.open_relay_with_target(...)` with the validated `SshRelayTarget` or `TcpRelayTarget`.
+6. Waiting up to 10 seconds for the supervisor to open its `RelayStream` and deliver the gateway-side `DuplexStream`, or to report target-open failure.
+7. Bridging opaque `TcpForwardFrame::Data` chunks between the CLI stream and the relay stream.
There is no gateway-to-sandbox TCP dial, handshake preface, or pod-IP resolution in this path.
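
The authorization checks in step 3 reduce to a pure predicate. A hedged sketch, with `SshSession` fields mirroring the persistence record described in this document and every failure class collapsed to a single unauthenticated error, as the gateway's error table specifies:

```rust
/// Illustrative projection of the persisted `SshSession` record.
pub struct SshSession {
    pub token: String,
    pub sandbox_id: String,
    pub revoked: bool,
    pub expires_at_ms: i64, // 0 disables expiry
}

#[derive(Debug, PartialEq)]
pub enum TokenError {
    Unauthenticated, // not found, revoked, wrong sandbox, or expired
}

/// Hypothetical token check: lookup, revocation, sandbox binding, expiry.
pub fn validate_forward_token(
    sessions: &[SshSession],
    token: &str,
    sandbox_id: &str,
    now_ms: i64,
) -> Result<(), TokenError> {
    let s = sessions
        .iter()
        .find(|s| s.token == token)
        .ok_or(TokenError::Unauthenticated)?;
    if s.revoked || s.sandbox_id != sandbox_id {
        return Err(TokenError::Unauthenticated);
    }
    if s.expires_at_ms != 0 && now_ms >= s.expires_at_ms {
        return Err(TokenError::Unauthenticated);
    }
    Ok(())
}
```

Collapsing all failures to one status avoids leaking whether a guessed token exists, is revoked, or is merely bound to another sandbox.
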
@@ -112,23 +134,23 @@ There is no gateway-to-sandbox TCP dial, handshake preface, or pod-IP resolution
**File**: `crates/openshell-server/src/multiplex.rs`
-The gateway runs a single listener that multiplexes gRPC and HTTP on the same port. `MultiplexedService` routes based on the `content-type` header: requests with `application/grpc` go to the gRPC router; all others (including HTTP CONNECT) go to the HTTP router. The HTTP router (`crates/openshell-server/src/http.rs`) merges health endpoints with the SSH tunnel router. Hyper is configured with `http2().adaptive_window(true)` so the HTTP/2 stream windows grow under load rather than throttling `RelayStream` to the default 64 KiB window.
+The gateway runs a single listener that multiplexes gRPC and HTTP on the same port. `MultiplexedService` routes based on the `content-type` header: requests with `application/grpc` go to the gRPC router; all others go to the HTTP router for health endpoints. Hyper is configured with `http2().adaptive_window(true)` so the HTTP/2 stream windows grow under load rather than throttling `ForwardTcp` or `RelayStream` to the default 64 KiB window.
### Sandbox supervisor session
**File**: `crates/openshell-sandbox/src/supervisor_session.rs`
-`spawn(endpoint, sandbox_id, ssh_socket_path)` starts a background task that:
+`spawn(endpoint, sandbox_id, ssh_socket_path, netns_fd)` starts a background task that:
1. Opens a gRPC `Channel` to the gateway (`http2_adaptive_window(true)`). The same channel multiplexes the control stream and every relay.
2. Sends `SupervisorHello { sandbox_id, instance_id }` as the first outbound message.
3. Waits for `SessionAccepted` (or fails fast on `SessionRejected`).
4. Runs a loop that reads inbound `GatewayMessage` values and emits `SupervisorHeartbeat` at the accepted interval (min 5 s, usually 15 s).
-5. On `RelayOpen`, spawns `handle_relay_open()` which opens a new `RelayStream` RPC on the existing channel, sends `RelayInit { channel_id }` as the first frame, dials the local SSH Unix socket, and bridges bytes in both directions in 16 KiB chunks.
+5. On `RelayOpen`, spawns `handle_relay_open()` which resolves the target (`SshRelayTarget`, `TcpRelayTarget`, or targetless-as-SSH), validates loopback-only TCP targets, dials SSH through the Unix socket or TCP from the sandbox network namespace, sends `RelayOpenResult`, opens a new `RelayStream` RPC on the existing channel, sends `RelayInit { channel_id }` as the first frame, and bridges bytes in both directions in 16 KiB chunks.
Reconnect policy: the session loop wraps `run_single_session()` with exponential backoff (1 s → 30 s) on any error. A `session_established` / `session_failed` OCSF event is emitted on each attempt.
-The supervisor is a dumb byte bridge with no awareness of the SSH protocol flowing through it.
+After target selection, the supervisor is a dumb byte bridge with no awareness of the SSH protocol flowing through it.
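
The 16 KiB bridging in step 5 amounts to re-framing an opaque byte stream; a minimal sketch with an assumed chunk representation:

```rust
/// Chunk size from the description above: the supervisor bridges bytes in
/// 16 KiB chunks. The Vec-of-Vec framing here is illustrative; the real code
/// wraps chunks in `RelayFrame::Data`.
const RELAY_CHUNK: usize = 16 * 1024;

/// Split an outbound byte buffer into at-most-16-KiB data chunks. The bridge
/// never inspects the bytes -- SSH or TCP payloads are equally opaque to it.
pub fn frame_bytes(buf: &[u8]) -> Vec<Vec<u8>> {
    buf.chunks(RELAY_CHUNK).map(|c| c.to_vec()).collect()
}
```
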
### Sandbox SSH daemon
@@ -151,7 +173,7 @@ The `ExecSandbox` gRPC RPC provides programmatic command execution without requi
1. Validates `sandbox_id`, `command`, env keys, and field sizes; confirms the sandbox is `Ready`.
2. Calls `supervisor_sessions.open_relay(sandbox_id, 15s)` -- a shorter wait than connect because exec runs in steady state, not on cold start.
3. Waits up to 10 seconds for the relay `DuplexStream` to arrive.
-4. Starts a single-use localhost TCP listener on `127.0.0.1:0` and spawns a task that bridges a single accept to the `DuplexStream` with `copy_bidirectional`. This adapts the `DuplexStream` to something `russh::client::connect_stream` can dial.
+4. Starts a single-use localhost TCP listener on `127.0.0.1:0` and spawns a task that bridges a single accept to the `DuplexStream` with `copy_bidirectional`. This adapts the SSH-targeted `DuplexStream` to something `russh::client::connect_stream` can dial.
5. Connects `russh` to the local proxy, authenticates `none` as user `sandbox`, opens a channel, optionally requests a PTY, and executes the shell-escaped command.
6. Streams `stdout`/`stderr`/`exit` events back to the gRPC caller.
@@ -180,27 +202,26 @@ sequenceDiagram
User->>CLI: openshell sandbox connect foo
CLI->>GW: GetSandbox(name) -> sandbox.id
CLI->>GW: CreateSshSession(sandbox_id)
- GW-->>CLI: token, gateway_host, gateway_port, scheme, connect_path
+ GW-->>CLI: token, gateway_host, gateway_port, scheme
Note over CLI: Builds ProxyCommand string: exec()s ssh
User->>CLI: ssh spawns ssh-proxy subprocess
- CLI->>GW: CONNECT /connect/ssh X-Sandbox-Id, X-Sandbox-Token
- GW->>GW: Validate token + sandbox Ready
- GW->>Reg: open_relay(sandbox_id, 30s)
+ CLI->>GW: ForwardTcp stream TcpForwardInit{target.ssh, authorization_token}
+ GW->>GW: Validate authorization_token + sandbox Ready
+ GW->>Reg: open_relay_with_target(sandbox_id, target=ssh, 15s)
Reg-->>GW: (channel_id, relay_rx)
- GW->>Sup: RelayOpen{channel_id} (over ConnectSupervisor)
+ GW->>Sup: RelayOpen{channel_id, target=ssh} (over ConnectSupervisor)
+ Sup->>Sock: UnixStream::connect(/run/openshell/ssh.sock)
+ Sock-->>SSHD: connection accepted
+ Sup->>GW: RelayOpenResult{success=true}
Sup->>GW: RelayStream RPC (new HTTP/2 stream)
Sup->>GW: RelayFrame::Init{channel_id}
GW->>Reg: claim_relay(channel_id) -> DuplexStream pair
Reg-->>GW: gateway-side DuplexStream (via relay_rx)
- Sup->>Sock: UnixStream::connect(/run/openshell/ssh.sock)
- Sock-->>SSHD: connection accepted
- GW-->>CLI: 200 OK (upgrade)
-
- Note over CLI,SSHD: SSH protocol over: CLI↔GW (HTTP CONNECT) ↔ RelayStream frames ↔ Sup ↔ Unix socket ↔ SSHD
+ Note over CLI,SSHD: SSH protocol over: CLI↔GW (ForwardTcp gRPC) ↔ RelayStream frames ↔ Sup ↔ Unix socket ↔ SSHD
CLI->>SSHD: SSH handshake + auth_none
SSHD-->>CLI: Auth accepted
@@ -225,9 +246,9 @@ sequenceDiagram
- `-o SetEnv=TERM=xterm-256color`
- `sandbox` as the SSH user
4. If stdin is a terminal (interactive), the CLI calls `exec()` (Unix) to replace itself with the `ssh` process. Otherwise it spawns and waits.
-5. `sandbox_ssh_proxy()` connects via TCP (plain) or TLS (mTLS) to the gateway, sends a raw HTTP CONNECT request with `X-Sandbox-Id` and `X-Sandbox-Token` headers, and on a 200 response spawns two tasks to copy bytes between stdin/stdout and the tunnel.
-6. Gateway-side: `ssh_connect()` in `ssh_tunnel.rs` authorizes the request, opens a relay, waits for the supervisor's `RelayStream`, and bridges the upgraded HTTP connection to the relay with `tokio::io::copy_bidirectional`.
-7. Supervisor-side: on `RelayOpen`, `handle_relay_open()` in `crates/openshell-sandbox/src/supervisor_session.rs` opens a `RelayStream` RPC, sends `RelayInit`, dials `/run/openshell/ssh.sock`, and bridges the frames to the Unix socket.
+5. `sandbox_ssh_proxy()` opens a gRPC `ForwardTcp` stream, sends `TcpForwardInit { sandbox_id, service_id: "ssh-proxy:", target.ssh, authorization_token: token }`, and spawns two tasks to copy bytes between stdin/stdout and `TcpForwardFrame::Data` messages.
+6. Gateway-side: `handle_forward_tcp()` authorizes the SSH target with `authorization_token`, opens an SSH-targeted relay through `SupervisorSessionRegistry::open_relay_with_target()`, waits for the supervisor's `RelayStream`, and bridges `TcpForwardFrame::Data` to the relay stream.
+7. Supervisor-side: on `RelayOpen`, `handle_relay_open()` in `crates/openshell-sandbox/src/supervisor_session.rs` dials `/run/openshell/ssh.sock`, reports `RelayOpenResult { success: true }`, opens a `RelayStream` RPC, sends `RelayInit`, and bridges the frames to the Unix socket.
### Command Execution (CLI)
@@ -303,13 +324,14 @@ sequenceDiagram
Client->>GW: ExecSandbox(sandbox_id, command, stdin, timeout)
GW->>GW: Validate sandbox exists + Ready
- GW->>Reg: open_relay(sandbox_id, 15s)
+ GW->>Reg: open_relay(sandbox_id, 15s) -> target=ssh
Reg-->>GW: (channel_id, relay_rx)
- GW->>Sup: RelayOpen{channel_id}
+ GW->>Sup: RelayOpen{channel_id, target=ssh}
+ Sup->>SSHD: connect /run/openshell/ssh.sock
+ Sup->>GW: RelayOpenResult{success=true}
Sup->>GW: RelayStream + RelayInit{channel_id}
GW->>Reg: claim_relay -> DuplexStream
- Sup->>SSHD: connect /run/openshell/ssh.sock
Note over GW: start_single_use_ssh_proxy_over_relay (127.0.0.1:ephemeral -> DuplexStream)
@@ -412,7 +434,7 @@ Tests in `supervisor_session.rs` pin this behavior:
All gRPC traffic (control plane + data plane + other RPCs) rides one mTLS-authenticated TCP+TLS+HTTP/2 connection from the supervisor to the gateway. Client certificates prove the supervisor's identity; the server certificate proves the gateway's. Nothing sits between the supervisor and the SSH daemon except the Unix socket's filesystem permissions.
-The CLI continues to authenticate to the gateway with its own mTLS credentials (or Cloudflare bearer token in reverse-proxy deployments) and a per-session token returned by `CreateSshSession`. The session token is enforced at the gateway: token scope (sandbox id), revocation state, and optional expiry are all checked in `ssh_connect()` before `open_relay()` is called.
+The CLI continues to authenticate to the gateway with its own mTLS credentials (or Cloudflare bearer token in reverse-proxy deployments) and a per-session token returned by `CreateSshSession`. The session token is enforced at the gateway: token scope (sandbox id), revocation state, and optional expiry are all checked in `handle_forward_tcp()` before `open_relay_with_target()` is called for `target.ssh`.
### Unix socket access control
@@ -445,7 +467,7 @@ The sandbox generates a fresh Ed25519 host key on every startup. The CLI disable
## Sandbox Target Resolution
-The gateway does not resolve a sandbox's network address or port. The only identifier that matters is `sandbox_id`, which keys into the supervisor session registry.
+The gateway does not resolve a sandbox pod network address or port. The `sandbox_id` keys into the supervisor session registry, and the optional `RelayOpen.target` tells the already-connected supervisor what local target to dial inside the sandbox. SSH callers use `SshRelayTarget`; targetless messages also resolve to SSH. TCP relay targets are valid only for loopback destinations and are rejected by the supervisor before any `RelayStream` starts.
## API and Persistence
@@ -464,7 +486,6 @@ Response:
- `gateway_host` (string) -- resolved from `Config::ssh_gateway_host` (defaults to bind address if empty)
- `gateway_port` (uint32) -- resolved from `Config::ssh_gateway_port` (defaults to bind port if 0)
- `gateway_scheme` (string) -- `"https"` if TLS is configured, otherwise `"http"`
-- `connect_path` (string) -- from `Config::ssh_connect_path` (default: `/connect/ssh`)
- `host_key_fingerprint` (string) -- currently unused (empty)
- `expires_at_ms` (int64) -- session expiry; 0 disables expiry
@@ -478,6 +499,18 @@ Response:
- `revoked` (bool) -- true if a session was found and revoked
+### ForwardTcp
+
+**Proto**: `proto/openshell.proto` -- `TcpForwardFrame` / `TcpForwardInit`
+
+`ForwardTcp(stream TcpForwardFrame) returns (stream TcpForwardFrame)` carries opaque bytes between the CLI and gateway. The first frame must be `TcpForwardInit`:
+
+- `sandbox_id` (string) -- sandbox to connect to
+- `service_id` (string) -- optional audit/correlation identifier
+- `target.ssh` -- SSH target used by `ssh-proxy`
+- `target.tcp` -- loopback TCP target used by service forwarding
+- `authorization_token` (string) -- short-lived session token from `CreateSshSession`, required for all targets
+
### SshSession persistence
**Proto**: `proto/openshell.proto` -- `SshSession` message
@@ -512,8 +545,10 @@ Key messages:
| `SessionRejected` | gw → sup | `reason` |
| `SupervisorHeartbeat` | sup → gw | (empty) |
| `GatewayHeartbeat` | gw → sup | (empty) |
-| `RelayOpen` | gw → sup | `channel_id` (UUID) |
-| `RelayOpenResult` | sup → gw | `channel_id`, `success`, `error` |
+| `RelayOpen` | gw → sup | `channel_id` (UUID), optional `target` (`SshRelayTarget` or loopback-only `TcpRelayTarget`), `service_id` |
+| `SshRelayTarget` | gw → sup | Empty built-in SSH target; absence of `target` is treated the same way |
+| `TcpRelayTarget` | gw → sup | `host`, `port`; supervisor accepts only `127.0.0.1`, `::1`, or `localhost` and ports `1..=65535` |
+| `RelayOpenResult` | sup → gw | `channel_id`, `success`, `error`; failure wakes the pending gateway waiter |
| `RelayClose` | either | `channel_id`, `reason` |
| `RelayInit` | sup → gw (first `RelayFrame`) | `channel_id` |
| `RelayFrame` | either | `oneof { RelayInit init, bytes data }` |
@@ -554,7 +589,7 @@ This function is shared between the CLI and TUI via the `openshell-core::forward
| Stage | Duration | Where |
|---|---|---|
-| Supervisor session wait (SSH connect) | 30 s | `ssh_tunnel::ssh_connect` -> `open_relay` |
+| Supervisor session wait (`ForwardTcp`) | 15 s | `handle_forward_tcp` -> `open_relay_with_target` |
| Supervisor session wait (ExecSandbox) | 15 s | `handle_exec_sandbox` -> `open_relay` |
| Wait for supervisor to claim relay | 10 s | `relay_rx` wrapped in `tokio::time::timeout` |
| Pending-relay TTL (reaper) | 10 s | `RELAY_PENDING_TIMEOUT` in registry |
@@ -569,18 +604,18 @@ This function is shared between the CLI and TUI via the `openshell-core::forward
| Scenario | Status / Behavior | Source |
|---|---|---|
-| Missing `X-Sandbox-Id` or `X-Sandbox-Token` header | `401 Unauthorized` | `ssh_tunnel.rs` -- `header_value()` |
-| Empty header value | `400 Bad Request` | `ssh_tunnel.rs` -- `header_value()` |
-| Non-CONNECT method on `/connect/ssh` | `405 Method Not Allowed` | `ssh_tunnel.rs` -- `ssh_connect()` |
-| Token not found in persistence | `401 Unauthorized` | `ssh_tunnel.rs` -- `ssh_connect()` |
-| Token revoked or sandbox ID mismatch | `401 Unauthorized` | `ssh_tunnel.rs` -- `ssh_connect()` |
-| Token expired | `401 Unauthorized` | `ssh_tunnel.rs` -- `ssh_connect()` |
-| Sandbox not found | `404 Not Found` | `ssh_tunnel.rs` -- `ssh_connect()` |
-| Sandbox not in `Ready` phase | `412 Precondition Failed` | `ssh_tunnel.rs` -- `ssh_connect()` |
-| Per-token or per-sandbox concurrency limit hit | `429 Too Many Requests` | `ssh_tunnel.rs` -- `ssh_connect()` |
-| Supervisor session not connected after 30 s | `502 Bad Gateway` | `ssh_tunnel.rs` -- `ssh_connect()` |
-| Supervisor failed to claim relay within 10 s | Tunnel closed; `"relay open timed out"` logged | `ssh_tunnel.rs` -- spawned tunnel task |
-| Relay channel oneshot dropped | Tunnel closed; `"relay channel dropped"` logged | `ssh_tunnel.rs` -- spawned tunnel task |
+| Empty `ForwardTcp` stream or first frame is not `TcpForwardInit` | `invalid_argument` | `handle_forward_tcp` |
+| Missing `authorization_token` for `ForwardTcp` | `unauthenticated` | `acquire_forward_connection_guard` |
+| Token not found in persistence | `unauthenticated` | `validate_ssh_forward_token` |
+| Token revoked or sandbox ID mismatch | `unauthenticated` | `validate_ssh_forward_token` |
+| Token expired | `unauthenticated` | `validate_ssh_forward_token` |
+| Sandbox not found | `not_found` | `handle_forward_tcp` |
+| Sandbox not in `Ready` phase | `failed_precondition` | `handle_forward_tcp` |
+| Per-token or per-sandbox concurrency limit hit | `resource_exhausted` | `acquire_ssh_connection_slots` |
+| Supervisor session not connected after 15 s | `unavailable` | `handle_forward_tcp` |
+| Supervisor rejects relay target or cannot dial it | `ForwardTcp` stream returns the supervisor error, or `ExecSandbox` returns `unavailable`; pending relay waiter is woken with the supervisor error | `handle_relay_open`, `fail_pending_relay` |
+| Supervisor failed to claim relay within 10 s | `deadline_exceeded`; `"ForwardTcp: relay open timed out"` logged | `handle_forward_tcp` spawned task |
+| Relay channel oneshot dropped | `unavailable`; `"ForwardTcp: relay channel dropped"` logged | `handle_forward_tcp` spawned task |
| First `RelayFrame` not `RelayInit` or empty `channel_id` | `invalid_argument` on `RelayStream` | `supervisor_session.rs` -- `handle_relay_stream` |
| `RelayStream` arrives after pending entry expired (>10 s) | `deadline_exceeded` | `supervisor_session.rs` -- `claim_relay` |
| Gateway restart during live relay | CLI SSH detects via keepalive within ~45 s; relays are torn down with the TCP connection | CLI `ServerAliveInterval=15`, `ServerAliveCountMax=3` |
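The three token rows above (not found, revoked/mismatched, expired) all map to `unauthenticated`. A hypothetical std-only sketch of those checks, with plain types standing in for tonic's `Status` and the persistence row:

```rust
// Hypothetical stand-ins: the real checks run against the SshSession
// row in persistence and return tonic `unauthenticated` statuses.
#[derive(Debug, PartialEq)]
enum ForwardError {
    Unauthenticated(&'static str),
}

struct SshSessionRow {
    sandbox_id: String,
    revoked: bool,
    expires_at_ms: u64, // 0 = no expiry
}

fn validate_ssh_forward_token(
    row: Option<&SshSessionRow>,
    requested_sandbox: &str,
    now_ms: u64,
) -> Result<(), ForwardError> {
    let row = row.ok_or(ForwardError::Unauthenticated("token not found"))?;
    if row.revoked || row.sandbox_id != requested_sandbox {
        return Err(ForwardError::Unauthenticated("token revoked or sandbox mismatch"));
    }
    if row.expires_at_ms != 0 && now_ms >= row.expires_at_ms {
        return Err(ForwardError::Unauthenticated("token expired"));
    }
    Ok(())
}
```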
@@ -591,9 +626,9 @@ This function is shared between the CLI and TUI via the `openshell-core::forward
## Graceful Shutdown
-### Gateway tunnel teardown
+### Gateway forward teardown
-After `copy_bidirectional` completes on either side, `ssh_connect()` calls `AsyncWriteExt::shutdown()` on the upgraded client connection so SSH sees a clean EOF and can read any remaining protocol data (e.g., exit-status) before exiting.
+When the CLI-to-gateway stream ends, `bridge_forward_tcp_stream()` shuts down the relay write half so SSH sees a clean EOF and can read any remaining protocol data (e.g., exit-status) before exiting.
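The clean-EOF behavior described here is ordinary TCP half-close. A self-contained std-only demonstration (independent of the gateway code): after the client shuts down its write half, the server sees EOF but can still send remaining data, which the client can read.

```rust
use std::io::{Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};
use std::thread;

// Illustrative only: half-close lets the peer observe EOF while the
// reverse direction stays open, analogous to the relay write-half
// shutdown described above.
fn half_close_demo() -> String {
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();

    let server = thread::spawn(move || {
        let (mut conn, _) = listener.accept().unwrap();
        let mut request = String::new();
        // Reads until EOF, i.e. until the client shuts down its write half.
        conn.read_to_string(&mut request).unwrap();
        // The connection is still writable in this direction.
        conn.write_all(b"exit-status:0").unwrap();
    });

    let mut client = TcpStream::connect(addr).unwrap();
    client.write_all(b"bye").unwrap();
    // Half-close: the server sees a clean EOF but can still respond.
    client.shutdown(Shutdown::Write).unwrap();

    let mut reply = String::new();
    client.read_to_string(&mut reply).unwrap();
    server.join().unwrap();
    reply
}
```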
### RelayStream teardown
@@ -617,7 +652,6 @@ The sandbox SSH daemon's exit thread waits for the reader thread to finish forwa
|---|---|---|
| `ssh_gateway_host` | `127.0.0.1` | Public hostname/IP advertised in `CreateSshSessionResponse` |
| `ssh_gateway_port` | `8080` | Public port for gateway connections (0 = use bind port) |
-| `ssh_connect_path` | `/connect/ssh` | HTTP path for CONNECT requests |
| `sandbox_ssh_socket_path` | `/run/openshell/ssh.sock` | Path the supervisor binds its Unix socket on; passed to the sandbox as `OPENSHELL_SSH_SOCKET_PATH` |
| `ssh_session_ttl_secs` | (default in code) | Default TTL applied to new `SshSession` rows; 0 disables expiry |
diff --git a/architecture/system-architecture.md b/architecture/system-architecture.md
index 5c7fcdcf7..b21b22331 100644
--- a/architecture/system-architecture.md
+++ b/architecture/system-architecture.md
@@ -94,7 +94,7 @@ graph TB
CLI -- "gRPC over HTTPS (mTLS) :30051 NodePort" --> Gateway
TUI -- "gRPC polling (mTLS) every 2s" --> Gateway
SDK -- "gRPC over HTTPS (mTLS)" --> Gateway
- CLI -- "HTTP CONNECT upgrade /connect/ssh (mTLS)" --> Gateway
+ CLI -- "gRPC ForwardTcp target.ssh (mTLS)" --> Gateway
CLI -. "reads mTLS certs" .-> LocalConfig
%% ============================================================
@@ -108,9 +108,9 @@ graph TB
%% ============================================================
%% CONNECTIONS: Supervisor session (inbound from sandbox)
%% ============================================================
- RelayBridge -- "ConnectSupervisor (persistent bidi stream)" --> SupRegistry
- RelayBridge -- "RelayStream (per-invocation byte bridge, same HTTP/2 connection)" --> SupRegistry
- RelayBridge -- "Unix socket SSH bytes" --> SSHServer
+ RelayBridge -- "ConnectSupervisor (persistent bidi stream, targetable RelayOpen)" --> SupRegistry
+ RelayBridge -- "RelayStream (per-accepted-relay byte bridge, same HTTP/2 connection)" --> SupRegistry
+ RelayBridge -- "Unix socket SSH target bytes" --> SSHServer
%% ============================================================
%% CONNECTIONS: CRD Controller
@@ -153,7 +153,7 @@ graph TB
%% ============================================================
%% CLIENT SSH / EXEC (bytes tunneled via supervisor relay)
%% ============================================================
- CLI -- "HTTP CONNECT /connect/ssh + tar-over-SSH file sync (bytes bridged through SupervisorSessionRegistry)" --> Gateway
+ CLI -- "gRPC ForwardTcp(target.ssh) + tar-over-SSH file sync (bytes bridged through SupervisorSessionRegistry)" --> Gateway
%% ============================================================
%% STYLES
@@ -195,9 +195,9 @@ graph TB
1. **CLI/SDK to Gateway**: All control-plane traffic uses gRPC over HTTPS with mutual TLS (mTLS). Single multiplexed port (8080 inside cluster, 30051 NodePort).
-2. **Supervisor Session (inbound from sandbox)**: Each sandbox supervisor opens a persistent `ConnectSupervisor` bidi gRPC stream to the gateway over mTLS. The gateway tracks these in `SupervisorSessionRegistry`. When SSH or exec access is needed, the gateway sends `RelayOpen { channel_id }` on that stream; the supervisor responds by initiating a `RelayStream` RPC on the same HTTP/2 connection whose first frame is a `RelayInit { channel_id }`. Subsequent frames carry raw bytes in both directions. The gateway never dials the sandbox pod.
+2. **Supervisor Session (inbound from sandbox)**: Each sandbox supervisor opens a persistent `ConnectSupervisor` bidi gRPC stream to the gateway over mTLS. The gateway tracks these in `SupervisorSessionRegistry`. When SSH or exec access is needed, the gateway sends `RelayOpen { channel_id, target = SshRelayTarget }` on that stream; requests without an explicit `target` default to SSH for backward compatibility, and the supervisor validates TCP targets as loopback-only. The supervisor dials the target before reporting a successful `RelayOpenResult`, then initiates a `RelayStream` RPC on the same HTTP/2 connection whose first frame is a `RelayInit { channel_id }`. Subsequent frames carry raw bytes in both directions. The gateway never dials the sandbox pod.
-3. **SSH / Exec Access**: CLI connects via HTTP CONNECT upgrade at `/connect/ssh` (or calls `ExecSandbox` gRPC). The gateway authenticates, calls `open_relay`, and bridges the client bytes through the supervisor's `RelayStream` to the supervisor's in-sandbox SSH daemon, which binds to a Unix socket (`/run/openshell/ssh.sock`) rather than a TCP port.
+3. **SSH / Exec Access**: CLI connects via the bidirectional gRPC `ForwardTcp` stream with `TcpForwardInit.target = SshRelayTarget` (or calls `ExecSandbox` gRPC). The gateway authenticates the SSH target with the short-lived session token, calls `open_relay_with_target(SshRelayTarget)`, and bridges the client bytes through the supervisor's `RelayStream` to the supervisor's in-sandbox SSH daemon, which binds to a Unix socket (`/run/openshell/ssh.sock`) rather than a TCP port.
4. **File Sync**: tar archives streamed over the relay-tunneled SSH session (no rsync dependency).
diff --git a/crates/openshell-cli/Cargo.toml b/crates/openshell-cli/Cargo.toml
index b3a006fdd..bf6065194 100644
--- a/crates/openshell-cli/Cargo.toml
+++ b/crates/openshell-cli/Cargo.toml
@@ -63,6 +63,7 @@ tokio-tungstenite = { workspace = true }
# Streams
futures = { workspace = true }
+tokio-stream = { workspace = true }
nix = { workspace = true }
# URL parsing
diff --git a/crates/openshell-cli/src/main.rs b/crates/openshell-cli/src/main.rs
index 3502c2b07..38e61c279 100644
--- a/crates/openshell-cli/src/main.rs
+++ b/crates/openshell-cli/src/main.rs
@@ -8,6 +8,7 @@ use clap_complete::engine::ArgValueCompleter;
use clap_complete::env::CompleteEnv;
use miette::Result;
use owo_colors::OwoColorize;
+use std::collections::HashMap;
use std::io::Write;
use openshell_bootstrap::{
@@ -234,6 +235,7 @@ const FORWARD_EXAMPLES: &str = "\x1b[1mALIAS\x1b[0m
\x1b[1mEXAMPLES\x1b[0m
$ openshell forward start 8080
$ openshell forward start 3000 my-sandbox
+ $ openshell forward service my-sandbox --target-port 8000 --local 8000
$ openshell forward stop 8080
$ openshell forward list
";
@@ -1667,6 +1669,26 @@ enum ForwardCommands {
/// List active port forwards.
#[command(help_template = LEAF_HELP_TEMPLATE, next_help_heading = "FLAGS")]
List,
+
+ /// Forward a local TCP port to a loopback service inside a sandbox over gRPC.
+ #[command(help_template = LEAF_HELP_TEMPLATE, next_help_heading = "FLAGS")]
+ Service {
+ /// Sandbox name (defaults to last-used sandbox).
+ #[arg(add = ArgValueCompleter::new(completers::complete_sandbox_names))]
+ name: Option<String>,
+
+ /// Target service port inside the sandbox.
+ #[arg(long)]
+ target_port: u16,
+
+ /// Target service host inside the sandbox. Phase 1 accepts loopback only.
+ #[arg(long, default_value = "127.0.0.1")]
+ target_host: String,
+
+ /// Local bind address and port: [bind_address:]port. Defaults to the target port. Use port 0 for dynamic assignment.
+ #[arg(long)]
+ local: Option<String>,
+ },
}
#[tokio::main]
@@ -1954,6 +1976,27 @@ async fn main() -> Result<()> {
}
}
}
+ ForwardCommands::Service {
+ name,
+ target_port,
+ target_host,
+ local,
+ } => {
+ let ctx = resolve_gateway(&cli.gateway, &cli.gateway_endpoint)?;
+ let mut tls = tls.with_gateway_name(&ctx.name);
+ apply_edge_auth(&mut tls, &ctx.name);
+ let name = resolve_sandbox_name(name, &ctx.name)?;
+ let local = local.unwrap_or_else(|| target_port.to_string());
+ run::service_forward_tcp(
+ &ctx.endpoint,
+ &name,
+ Some(&local),
+ &target_host,
+ target_port,
+ &tls,
+ )
+ .await?;
+ }
ForwardCommands::Start {
port,
name,
@@ -2350,7 +2393,7 @@ async fn main() -> Result<()> {
};
// Parse --label flags into a HashMap.
- let mut labels_map = std::collections::HashMap::new();
+ let mut labels_map = HashMap::new();
for label_str in &labels {
let parts: Vec<&str> = label_str.splitn(2, '=').collect();
if parts.len() != 2 {
diff --git a/crates/openshell-cli/src/run.rs b/crates/openshell-cli/src/run.rs
index 87489014a..442e45f1a 100644
--- a/crates/openshell-cli/src/run.rs
+++ b/crates/openshell-cli/src/run.rs
@@ -24,14 +24,16 @@ use openshell_bootstrap::{
};
use openshell_core::proto::{
ApproveAllDraftChunksRequest, ApproveDraftChunkRequest, ClearDraftChunksRequest,
- CreateProviderRequest, CreateSandboxRequest, DeleteProviderRequest, DeleteSandboxRequest,
- ExecSandboxRequest, GetClusterInferenceRequest, GetDraftHistoryRequest, GetDraftPolicyRequest,
- GetGatewayConfigRequest, GetProviderRequest, GetSandboxConfigRequest, GetSandboxLogsRequest,
- GetSandboxPolicyStatusRequest, GetSandboxRequest, HealthRequest, ListProvidersRequest,
- ListSandboxPoliciesRequest, ListSandboxesRequest, PolicySource, PolicyStatus, Provider,
- RejectDraftChunkRequest, Sandbox, SandboxPhase, SandboxPolicy, SandboxSpec, SandboxTemplate,
- SetClusterInferenceRequest, SettingScope, SettingValue, UpdateConfigRequest,
- UpdateProviderRequest, WatchSandboxRequest, exec_sandbox_event, setting_value,
+ CreateProviderRequest, CreateSandboxRequest, CreateSshSessionRequest, DeleteProviderRequest,
+ DeleteSandboxRequest, ExecSandboxRequest, GetClusterInferenceRequest, GetDraftHistoryRequest,
+ GetDraftPolicyRequest, GetGatewayConfigRequest, GetProviderRequest, GetSandboxConfigRequest,
+ GetSandboxLogsRequest, GetSandboxPolicyStatusRequest, GetSandboxRequest, HealthRequest,
+ ListProvidersRequest, ListSandboxPoliciesRequest, ListSandboxesRequest, PolicySource,
+ PolicyStatus, Provider, RejectDraftChunkRequest, RevokeSshSessionRequest, Sandbox,
+ SandboxPhase, SandboxPolicy, SandboxSpec, SandboxTemplate, SetClusterInferenceRequest,
+ SettingScope, SettingValue, TcpForwardFrame, TcpForwardInit, TcpRelayTarget,
+ UpdateConfigRequest, UpdateProviderRequest, WatchSandboxRequest, exec_sandbox_event,
+ setting_value, tcp_forward_init,
};
use openshell_core::settings::{self, SettingValueKind};
use openshell_core::{ObjectId, ObjectName};
@@ -1964,7 +1966,7 @@ pub async fn sandbox_create_with_bootstrap(
tty_override,
Some(false),
auto_providers_override,
- &std::collections::HashMap::new(),
+ &HashMap::new(),
&tls,
)
.await
@@ -2020,7 +2022,7 @@ pub async fn sandbox_create(
tty_override: Option<bool>,
bootstrap_override: Option<bool>,
auto_providers_override: Option<bool>,
- labels: &std::collections::HashMap<String, String>,
+ labels: &HashMap<String, String>,
tls: &TlsOptions,
) -> Result<()> {
if editor.is_some() && !command.is_empty() {
@@ -2134,7 +2136,7 @@ pub async fn sandbox_create(
status.message()
));
}
- Err(status) => return Err(status).into_diagnostic(),
+ Err(status) => return Err(miette::miette!(status.to_string())),
};
let sandbox = response
.into_inner()
@@ -2967,6 +2969,295 @@ pub async fn sandbox_exec_grpc(
Ok(exit_code)
}
+pub async fn service_forward_tcp(
+ server: &str,
+ name: &str,
+ local: Option<&str>,
+ target_host: &str,
+ target_port: u16,
+ tls: &TlsOptions,
+) -> Result<()> {
+ let (bind_addr, bind_port) = parse_tcp_forward_spec(local, target_port)?;
+ let mut client = grpc_client(server, tls).await?;
+
+ let sandbox = fetch_ready_sandbox_for_forward(&mut client, name).await?;
+
+ let listener = tokio::net::TcpListener::bind((bind_addr.as_str(), bind_port))
+ .await
+ .into_diagnostic()
+ .wrap_err_with(|| format!("failed to bind local forward on {bind_addr}:{bind_port}"))?;
+ let local_addr = listener
+ .local_addr()
+ .into_diagnostic()
+ .wrap_err("failed to read local forward address")?;
+ eprintln!(
+ "{} Forwarding {} -> {}:{} in sandbox {} via gRPC",
+ "✓".green().bold(),
+ local_addr,
+ target_host,
+ target_port,
+ name,
+ );
+
+ let sandbox_id = sandbox.object_id().to_string();
+ let (fatal_tx, mut fatal_rx) = tokio::sync::mpsc::channel::<String>(1);
+ let mut health_check = tokio::time::interval(Duration::from_secs(2));
+ health_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
+ loop {
+ tokio::select! {
+ Some(reason) = fatal_rx.recv() => {
+ return Err(miette::miette!("service forward stopped: {reason}"));
+ }
+
+ _ = health_check.tick() => {
+ fetch_ready_sandbox_for_forward(&mut client, name).await?;
+ }
+
+ accepted = listener.accept() => {
+ let (socket, peer) = accepted
+ .into_diagnostic()
+ .wrap_err("failed to accept local forward connection")?;
+ let mut client = client.clone();
+ let sandbox_id = sandbox_id.clone();
+ let target_host = target_host.to_string();
+ let service_id = format!("service-forward:{name}:{target_host}:{target_port}");
+ let fatal_tx = fatal_tx.clone();
+ tokio::spawn(async move {
+ let token = match create_forward_session_token(&mut client, &sandbox_id).await {
+ Ok(token) => token,
+ Err(err) => {
+ tracing::warn!(peer = %peer, error = %err, "service forward session creation failed");
+ if err.fatal {
+ let _ = fatal_tx.send(err.message).await;
+ }
+ return;
+ }
+ };
+ if let Err(err) = forward_one_tcp_connection(
+ &mut client,
+ socket,
+ sandbox_id,
+ target_host,
+ target_port,
+ service_id,
+ token.clone(),
+ )
+ .await
+ {
+ tracing::warn!(peer = %peer, error = %err, "service forward connection failed");
+ if err.fatal {
+ let _ = fatal_tx.send(err.message).await;
+ }
+ }
+ let _ = client
+ .revoke_ssh_session(RevokeSshSessionRequest { token })
+ .await;
+ });
+ }
+ }
+ }
+}
+
+async fn create_forward_session_token(
+ client: &mut crate::tls::GrpcClient,
+ sandbox_id: &str,
+) -> std::result::Result<String, ForwardTcpConnectionError> {
+ let response = client
+ .create_ssh_session(CreateSshSessionRequest {
+ sandbox_id: sandbox_id.to_string(),
+ })
+ .await
+ .map_err(ForwardTcpConnectionError::from_status)?;
+ Ok(response.into_inner().token)
+}
+
+async fn fetch_ready_sandbox_for_forward(
+ client: &mut crate::tls::GrpcClient,
+ name: &str,
+) -> Result<Sandbox> {
+ let response = match client
+ .get_sandbox(GetSandboxRequest {
+ name: name.to_string(),
+ })
+ .await
+ {
+ Ok(response) => response,
+ Err(status) if status.code() == Code::NotFound => {
+ return Err(miette::miette!(
+ "sandbox '{name}' no longer exists; stopping service forward"
+ ));
+ }
+ Err(status) => return Err(status).into_diagnostic(),
+ };
+
+ let sandbox = response
+ .into_inner()
+ .sandbox
+ .ok_or_else(|| miette::miette!("sandbox '{name}' not found"))?;
+
+ if SandboxPhase::try_from(sandbox.phase) != Ok(SandboxPhase::Ready) {
+ return Err(miette::miette!(
+ "sandbox '{}' is no longer ready (phase: {}); stopping service forward",
+ name,
+ phase_name(sandbox.phase)
+ ));
+ }
+
+ Ok(sandbox)
+}
+
+#[derive(Debug)]
+struct ForwardTcpConnectionError {
+ message: String,
+ fatal: bool,
+}
+
+impl ForwardTcpConnectionError {
+ fn transient(message: impl Into<String>) -> Self {
+ Self {
+ message: message.into(),
+ fatal: false,
+ }
+ }
+
+ fn from_status(status: Status) -> Self {
+ let fatal = matches!(status.code(), Code::NotFound | Code::FailedPrecondition);
+ Self {
+ message: status.to_string(),
+ fatal,
+ }
+ }
+}
+
+impl std::fmt::Display for ForwardTcpConnectionError {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.write_str(&self.message)
+ }
+}
+
+impl std::error::Error for ForwardTcpConnectionError {}
+
+fn parse_tcp_forward_spec(local: Option<&str>, default_port: u16) -> Result<(String, u16)> {
+ let Some(spec) = local else {
+ return Ok(("127.0.0.1".to_string(), default_port));
+ };
+
+ if let Some(pos) = spec.rfind(':') {
+ let addr = &spec[..pos];
+ let port_str = &spec[pos + 1..];
+ if let Ok(port) = port_str.parse::<u16>() {
+ if addr.is_empty() {
+ return Err(miette::miette!("bind address is required before ':'"));
+ }
+ return Ok((addr.to_string(), port));
+ }
+ }
+
+ let port: u16 = spec.parse().map_err(|_| {
+ miette::miette!("invalid local forward spec '{spec}': expected [bind_address:]port")
+ })?;
+ Ok(("127.0.0.1".to_string(), port))
+}
+
+async fn forward_one_tcp_connection(
+ client: &mut crate::tls::GrpcClient,
+ socket: tokio::net::TcpStream,
+ sandbox_id: String,
+ target_host: String,
+ target_port: u16,
+ service_id: String,
+ authorization_token: String,
+) -> std::result::Result<(), ForwardTcpConnectionError> {
+ use tokio::io::{AsyncReadExt, AsyncWriteExt};
+ use tokio_stream::wrappers::ReceiverStream;
+
+ let (tx, rx) = tokio::sync::mpsc::channel::<TcpForwardFrame>(16);
+ tx.send(TcpForwardFrame {
+ payload: Some(openshell_core::proto::tcp_forward_frame::Payload::Init(
+ TcpForwardInit {
+ sandbox_id,
+ service_id,
+ target: Some(tcp_forward_init::Target::Tcp(TcpRelayTarget {
+ host: target_host,
+ port: u32::from(target_port),
+ })),
+ authorization_token,
+ },
+ )),
+ })
+ .await
+ .map_err(|_| ForwardTcpConnectionError::transient("failed to initialize forward stream"))?;
+
+ let mut response = match client.forward_tcp(ReceiverStream::new(rx)).await {
+ Ok(response) => response.into_inner(),
+ Err(status) => {
+ let err = ForwardTcpConnectionError::from_status(status);
+ drain_and_shutdown_local_socket(socket).await;
+ return Err(err);
+ }
+ };
+
+ let (mut local_read, mut local_write) = socket.into_split();
+
+ let to_gateway = tokio::spawn(async move {
+ let mut buf = vec![0u8; 64 * 1024];
+ loop {
+ let n = local_read.read(&mut buf).await?;
+ if n == 0 {
+ break;
+ }
+ if tx
+ .send(TcpForwardFrame {
+ payload: Some(openshell_core::proto::tcp_forward_frame::Payload::Data(
+ buf[..n].to_vec(),
+ )),
+ })
+ .await
+ .is_err()
+ {
+ break;
+ }
+ }
+ Ok::<(), std::io::Error>(())
+ });
+
+ while let Some(frame) = response
+ .message()
+ .await
+ .map_err(ForwardTcpConnectionError::from_status)?
+ {
+ let Some(openshell_core::proto::tcp_forward_frame::Payload::Data(data)) = frame.payload
+ else {
+ continue;
+ };
+ if data.is_empty() {
+ continue;
+ }
+ local_write
+ .write_all(&data)
+ .await
+ .map_err(|err| ForwardTcpConnectionError::transient(err.to_string()))?;
+ }
+
+ let _ = local_write.shutdown().await;
+ to_gateway.abort();
+ Ok(())
+}
+
+async fn drain_and_shutdown_local_socket(mut socket: tokio::net::TcpStream) {
+ use tokio::io::{AsyncReadExt, AsyncWriteExt};
+
+ let mut buf = [0u8; 4096];
+ loop {
+ match tokio::time::timeout(Duration::from_millis(25), socket.read(&mut buf)).await {
+ Ok(Ok(0)) | Err(_) => break,
+ Ok(Ok(_)) => continue,
+ Ok(Err(_)) => break,
+ }
+ }
+ let _ = socket.shutdown().await;
+}
+
/// Print a single YAML line with dimmed keys and regular values.
fn print_yaml_line(line: &str) {
// Find leading whitespace
@@ -3371,7 +3662,7 @@ async fn auto_create_provider(
id: String::new(),
name: exact_name.to_string(),
created_at_ms: 0,
- labels: std::collections::HashMap::new(),
+ labels: HashMap::new(),
}),
r#type: provider_type.to_string(),
credentials: discovered.credentials.clone(),
@@ -3411,7 +3702,7 @@ async fn auto_create_provider(
id: String::new(),
name: name.clone(),
created_at_ms: 0,
- labels: std::collections::HashMap::new(),
+ labels: HashMap::new(),
}),
r#type: provider_type.to_string(),
credentials: discovered.credentials.clone(),
@@ -3569,7 +3860,7 @@ pub async fn provider_create(
id: String::new(),
name: name.to_string(),
created_at_ms: 0,
- labels: std::collections::HashMap::new(),
+ labels: HashMap::new(),
}),
r#type: provider_type,
credentials: credential_map,
@@ -3759,7 +4050,7 @@ pub async fn provider_update(
id: String::new(),
name: name.to_string(),
created_at_ms: 0,
- labels: std::collections::HashMap::new(),
+ labels: HashMap::new(),
}),
r#type: String::new(),
credentials: credential_map,
diff --git a/crates/openshell-cli/src/ssh.rs b/crates/openshell-cli/src/ssh.rs
index f883d9684..830dfc19c 100644
--- a/crates/openshell-cli/src/ssh.rs
+++ b/crates/openshell-cli/src/ssh.rs
@@ -3,30 +3,30 @@
//! SSH connection and proxy utilities.
-use crate::tls::{TlsOptions, build_rustls_config, grpc_client, require_tls_materials};
+use crate::tls::{TlsOptions, grpc_client};
use miette::{IntoDiagnostic, Result, WrapErr};
#[cfg(unix)]
use nix::sys::signal::{SaFlags, SigAction, SigHandler, SigSet, Signal, sigaction};
use openshell_core::ObjectId;
use openshell_core::forward::{
- build_proxy_command, find_ssh_forward_pid, resolve_ssh_gateway, shell_escape,
- validate_ssh_session_response, write_forward_pid,
+ build_proxy_command, find_ssh_forward_pid, format_gateway_url, resolve_ssh_gateway,
+ shell_escape, validate_ssh_session_response, write_forward_pid,
+};
+use openshell_core::proto::{
+ CreateSshSessionRequest, GetSandboxRequest, SshRelayTarget, TcpForwardFrame, TcpForwardInit,
+ tcp_forward_init,
};
-use openshell_core::proto::{CreateSshSessionRequest, GetSandboxRequest};
use owo_colors::OwoColorize;
-use rustls::pki_types::ServerName;
use std::fs;
use std::io::IsTerminal;
#[cfg(unix)]
use std::os::unix::process::CommandExt;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
-use std::sync::Arc;
use std::time::Duration;
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader};
-use tokio::net::TcpStream;
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::Command as TokioCommand;
-use tokio_rustls::TlsConnector;
+use tokio_stream::wrappers::ReceiverStream;
const FOREGROUND_FORWARD_STARTUP_GRACE_PERIOD: Duration = Duration::from_secs(2);
@@ -100,8 +100,7 @@ async fn ssh_session_config(
// external tunnel endpoint (the cluster URL), not the server's internal
// scheme/host/port which may be plaintext HTTP on 127.0.0.1.
let gateway_url = if tls.is_bearer_auth() {
- let base = server.trim_end_matches('/');
- format!("{base}{}", session.connect_path)
+ server.trim_end_matches('/').to_string()
} else {
// If the server returned a loopback gateway address, override it with the
// cluster endpoint's host. This handles the case where the server defaults
@@ -110,10 +109,7 @@ async fn ssh_session_config(
let gateway_port_u16 = session.gateway_port as u16;
let (gateway_host, gateway_port) =
resolve_ssh_gateway(&session.gateway_host, gateway_port_u16, server);
- format!(
- "{}://{}:{}{}",
- session.gateway_scheme, gateway_host, gateway_port, session.connect_path
- )
+ format_gateway_url(&session.gateway_scheme, &gateway_host, gateway_port)
};
let gateway_name = tls
.gateway_name()
@@ -793,11 +789,86 @@ pub async fn sandbox_ssh_proxy(
token: &str,
tls: &TlsOptions,
) -> Result<()> {
+ let server = grpc_server_from_ssh_gateway_url(gateway_url)?;
+ let mut client = grpc_client(&server, tls).await?;
+
+ let (tx, rx) = tokio::sync::mpsc::channel::<TcpForwardFrame>(16);
+ tx.send(TcpForwardFrame {
+ payload: Some(openshell_core::proto::tcp_forward_frame::Payload::Init(
+ TcpForwardInit {
+ sandbox_id: sandbox_id.to_string(),
+ service_id: format!("ssh-proxy:{sandbox_id}"),
+ target: Some(tcp_forward_init::Target::Ssh(SshRelayTarget {})),
+ authorization_token: token.to_string(),
+ },
+ )),
+ })
+ .await
+ .map_err(|_| miette::miette!("failed to initialize SSH forward stream"))?;
+
+ let mut response = client
+ .forward_tcp(ReceiverStream::new(rx))
+ .await
+ .into_diagnostic()?
+ .into_inner();
+
+ let stdin = tokio::io::stdin();
+ let stdout = tokio::io::stdout();
+
+ let to_remote = tokio::spawn(async move {
+ let mut stdin = stdin;
+ let mut buf = vec![0u8; 64 * 1024];
+ loop {
+ let Ok(n) = stdin.read(&mut buf).await else {
+ break;
+ };
+ if n == 0 {
+ break;
+ }
+ if tx
+ .send(TcpForwardFrame {
+ payload: Some(openshell_core::proto::tcp_forward_frame::Payload::Data(
+ buf[..n].to_vec(),
+ )),
+ })
+ .await
+ .is_err()
+ {
+ break;
+ }
+ }
+ });
+ let from_remote = tokio::spawn(async move {
+ let mut stdout = stdout;
+ loop {
+ let frame = match response.message().await {
+ Ok(Some(frame)) => frame,
+ Ok(None) | Err(_) => break,
+ };
+ let Some(openshell_core::proto::tcp_forward_frame::Payload::Data(data)) = frame.payload
+ else {
+ continue;
+ };
+ if data.is_empty() {
+ continue;
+ }
+ if stdout.write_all(&data).await.is_err() {
+ break;
+ }
+ let _ = stdout.flush().await;
+ }
+ });
+ let _ = from_remote.await;
+ to_remote.abort();
+
+ Ok(())
+}
+
+fn grpc_server_from_ssh_gateway_url(gateway_url: &str) -> Result<String> {
let url: url::Url = gateway_url
.parse()
.into_diagnostic()
.wrap_err("invalid gateway URL")?;
-
let scheme = url.scheme();
let gateway_host = url
.host_str()
@@ -805,76 +876,7 @@ pub async fn sandbox_ssh_proxy(
let gateway_port = url
.port_or_known_default()
.ok_or_else(|| miette::miette!("gateway URL missing port"))?;
- let connect_path = url.path();
-
- let request = format!(
- "CONNECT {connect_path} HTTP/1.1\r\nHost: {gateway_host}\r\nX-Sandbox-Id: {sandbox_id}\r\nX-Sandbox-Token: {token}\r\n\r\n"
- );
-
- // The gateway returns 412 (Precondition Failed) when the sandbox pod
- // exists but hasn't reached Ready phase yet. This is a transient state
- // after sandbox allocation — retry with backoff instead of failing
- // immediately.
- const MAX_CONNECT_WAIT: Duration = Duration::from_secs(60);
- const INITIAL_BACKOFF: Duration = Duration::from_secs(1);
-
- let start = std::time::Instant::now();
- let mut backoff = INITIAL_BACKOFF;
- let mut buf_stream;
-
- loop {
- let mut stream: Box<dyn ProxyStream> =
- connect_gateway(scheme, gateway_host, gateway_port, tls).await?;
- stream
- .write_all(request.as_bytes())
- .await
- .into_diagnostic()?;
-
- // Wrap in a BufReader **before** reading the HTTP response. The gateway
- // may send the 200 OK response and the first SSH protocol bytes in the
- // same TCP segment / WebSocket frame. A plain `read()` would consume
- // those SSH bytes into our buffer and discard them, causing SSH to see a
- // truncated protocol banner and exit with code 255. BufReader ensures
- // any bytes read past the `\r\n\r\n` header boundary stay buffered and
- // are returned by subsequent reads during the bidirectional copy phase.
- buf_stream = BufReader::new(stream);
- let status = read_connect_status(&mut buf_stream).await?;
- if status == 200 {
- break;
- }
- if status == 412 && start.elapsed() < MAX_CONNECT_WAIT {
- tracing::debug!(
- elapsed = ?start.elapsed(),
- "sandbox not yet ready (HTTP 412), retrying in {backoff:?}"
- );
- tokio::time::sleep(backoff).await;
- backoff = (backoff * 2).min(Duration::from_secs(8));
- continue;
- }
- return Err(miette::miette!(
- "gateway CONNECT failed with status {status}"
- ));
- }
-
- let (reader, writer) = tokio::io::split(buf_stream);
- let stdin = tokio::io::stdin();
- let stdout = tokio::io::stdout();
-
- // Spawn both copy directions as independent tasks. Using separate spawned
- // tasks (instead of try_join!/select!) ensures that when one direction
- // completes or errors, the other continues independently until it also
- // finishes. This is critical: when the remote side closes the connection,
- // we must keep the stdin→gateway copy alive so SSH can finish sending its
- // protocol-close packets, and vice-versa.
- let to_remote = tokio::spawn(copy_ignoring_errors(stdin, writer));
- let from_remote = tokio::spawn(copy_ignoring_errors(reader, stdout));
- let _ = from_remote.await;
- // Once the remote→stdout direction is done, SSH has received all the data
- // it needs. Drop the stdin→gateway task – SSH will close its pipe when
- // it's done regardless.
- to_remote.abort();
-
- Ok(())
+ Ok(format_gateway_url(scheme, gateway_host, gateway_port))
}
/// Run the SSH proxy in "name mode": create a session on the fly, then proxy.
@@ -1095,97 +1097,6 @@ pub fn print_ssh_config(gateway: &str, name: &str) {
print!("{}", render_ssh_config(gateway, name));
}
-/// Copy all bytes from `reader` to `writer`, flushing on completion.
-/// Errors are intentionally discarded – connection teardown errors are
-/// expected during normal SSH session shutdown.
-async fn copy_ignoring_errors<R, W>(mut reader: R, mut writer: W)
-where
- R: AsyncRead + Unpin,
- W: AsyncWrite + Unpin,
-{
- let _ = tokio::io::copy(&mut reader, &mut writer).await;
- let _ = AsyncWriteExt::flush(&mut writer).await;
- let _ = AsyncWriteExt::shutdown(&mut writer).await;
-}
-
-async fn connect_gateway(
- scheme: &str,
- host: &str,
- port: u16,
- tls: &TlsOptions,
-) -> Result<Box<dyn ProxyStream>> {
- // When using edge bearer auth, route through the WebSocket tunnel proxy
- // regardless of the origin scheme. The proxy handles edge auth headers
- // and TLS termination at the edge; the origin may be plaintext HTTP
- // behind the tunnel.
- if tls.is_bearer_auth() {
- let token = tls
- .edge_token
- .as_deref()
- .ok_or_else(|| miette::miette!("edge token required for tunnel"))?;
- let gateway_url = format!("https://{host}:{port}");
- let proxy = crate::edge_tunnel::start_tunnel_proxy(&gateway_url, token).await?;
- let tcp = TcpStream::connect(proxy.local_addr)
- .await
- .into_diagnostic()?;
- tcp.set_nodelay(true).into_diagnostic()?;
- return Ok(Box::new(tcp));
- }
-
- let tcp = TcpStream::connect((host, port)).await.into_diagnostic()?;
- tcp.set_nodelay(true).into_diagnostic()?;
- if scheme.eq_ignore_ascii_case("https") {
- let materials = require_tls_materials(&format!("https://{host}:{port}"), tls)?;
- let config = build_rustls_config(&materials)?;
- let connector = TlsConnector::from(Arc::new(config));
- let server_name = ServerName::try_from(host.to_string())
- .map_err(|_| miette::miette!("invalid server name: {host}"))?;
- let tls = connector
- .connect(server_name, tcp)
- .await
- .into_diagnostic()?;
- Ok(Box::new(tls))
- } else {
- Ok(Box::new(tcp))
- }
-}
-
-/// Read exactly the HTTP response status line and headers up to `\r\n\r\n`.
-///
-/// Uses byte-at-a-time reads so that the caller's `BufReader` retains any
-/// bytes that arrived after the header boundary (e.g. the SSH protocol
-/// banner that the gateway may send in the same TCP segment).
-async fn read_connect_status<R: AsyncRead + Unpin>(stream: &mut R) -> Result<u16> {
- let mut buf = Vec::new();
- let mut byte = [0u8; 1];
- loop {
- let n = stream.read(&mut byte).await.into_diagnostic()?;
- if n == 0 {
- break;
- }
- buf.push(byte[0]);
- if buf.len() >= 4 && &buf[buf.len() - 4..] == b"\r\n\r\n" {
- break;
- }
- if buf.len() > 8192 {
- break;
- }
- }
- let text = String::from_utf8_lossy(&buf);
- let line = text.lines().next().unwrap_or("");
- let status = line
- .split_whitespace()
- .nth(1)
- .unwrap_or("0")
- .parse::<u16>()
- .unwrap_or(0);
- Ok(status)
-}
-
-trait ProxyStream: AsyncRead + AsyncWrite + Unpin + Send {}
-
-impl<T> ProxyStream for T where T: AsyncRead + AsyncWrite + Unpin + Send {}
-
#[cfg(test)]
mod tests {
use super::*;
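The deleted comment above explains why the two copy directions run as separate tasks rather than `try_join!`/`select!`: when one side closes, the other must keep draining so SSH can finish its protocol-close exchange. A minimal std-only sketch of the same pattern, with threads standing in for tokio tasks (the names `bridge_demo` and `copy_then_shutdown` are illustrative, not from the codebase):

```rust
use std::io::{self, Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};
use std::thread;

// Copy all bytes, then propagate EOF by shutting down the write half.
// Errors are ignored: teardown races are expected, as in the CLI code.
fn copy_then_shutdown(mut from: TcpStream, mut to: TcpStream) {
    let _ = io::copy(&mut from, &mut to);
    let _ = to.flush();
    let _ = to.shutdown(Shutdown::Write);
}

// Bridge one client connection to an upstream with one thread per
// direction, and return what the client ultimately reads back.
fn bridge_demo() -> String {
    // "Upstream": reads everything, replies in upper case, then closes.
    let upstream = TcpListener::bind("127.0.0.1:0").unwrap();
    let upstream_addr = upstream.local_addr().unwrap();
    thread::spawn(move || {
        let (mut conn, _) = upstream.accept().unwrap();
        let mut buf = String::new();
        conn.read_to_string(&mut buf).unwrap();
        conn.write_all(buf.to_uppercase().as_bytes()).unwrap();
    });

    // "Gateway": bridges client <-> upstream with independent copies, so
    // each direction finishes on its own schedule.
    let gateway = TcpListener::bind("127.0.0.1:0").unwrap();
    let gateway_addr = gateway.local_addr().unwrap();
    thread::spawn(move || {
        let (client, _) = gateway.accept().unwrap();
        let up = TcpStream::connect(upstream_addr).unwrap();
        let (c2, u2) = (client.try_clone().unwrap(), up.try_clone().unwrap());
        thread::spawn(move || copy_then_shutdown(client, up)); // client -> upstream
        copy_then_shutdown(u2, c2); // upstream -> client
    });

    let mut client = TcpStream::connect(gateway_addr).unwrap();
    client.write_all(b"hello").unwrap();
    client.shutdown(Shutdown::Write).unwrap(); // signal EOF to the bridge
    let mut reply = String::new();
    client.read_to_string(&mut reply).unwrap();
    reply
}

fn main() {
    println!("{}", bridge_demo());
}
```

Because each direction owns its own thread, the upstream-to-client copy keeps running even after the client half-closes, mirroring the spawned-task rationale in the removed code.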
diff --git a/crates/openshell-cli/src/tls.rs b/crates/openshell-cli/src/tls.rs
index cd6483530..dcb282d83 100644
--- a/crates/openshell-cli/src/tls.rs
+++ b/crates/openshell-cli/src/tls.rs
@@ -253,6 +253,7 @@ pub async fn build_channel(server: &str, tls: &TlsOptions) -> Result<Channel> {
let endpoint = Endpoint::from_shared(server.to_string())
.into_diagnostic()?
.connect_timeout(Duration::from_secs(10))
+ .http2_adaptive_window(true)
.http2_keep_alive_interval(Duration::from_secs(10))
.keep_alive_while_idle(true);
return endpoint.connect().await.into_diagnostic();
@@ -272,6 +273,7 @@ pub async fn build_channel(server: &str, tls: &TlsOptions) -> Result<Channel> {
let endpoint = Endpoint::from_shared(local_url)
.into_diagnostic()?
.connect_timeout(Duration::from_secs(10))
+ .http2_adaptive_window(true)
.http2_keep_alive_interval(Duration::from_secs(10))
.keep_alive_while_idle(true);
return endpoint.connect().await.into_diagnostic();
@@ -280,6 +282,7 @@ pub async fn build_channel(server: &str, tls: &TlsOptions) -> Result<Channel> {
let mut endpoint = Endpoint::from_shared(server.to_string())
.into_diagnostic()?
.connect_timeout(Duration::from_secs(10))
+ .http2_adaptive_window(true)
.http2_keep_alive_interval(Duration::from_secs(10))
.keep_alive_while_idle(true);
diff --git a/crates/openshell-cli/tests/ensure_providers_integration.rs b/crates/openshell-cli/tests/ensure_providers_integration.rs
index 34485377d..2760e20bc 100644
--- a/crates/openshell-cli/tests/ensure_providers_integration.rs
+++ b/crates/openshell-cli/tests/ensure_providers_integration.rs
@@ -459,6 +459,17 @@ impl OpenShell for TestOpenShell {
) -> Result, tonic::Status> {
Err(tonic::Status::unimplemented("not implemented in test"))
}
+
+ type ForwardTcpStream = tokio_stream::wrappers::ReceiverStream<
+ Result<ForwardTcpResponse, tonic::Status>,
+ >;
+
+ async fn forward_tcp(
+ &self,
+ _request: tonic::Request<tonic::Streaming<ForwardTcpRequest>>,
+ ) -> Result<tonic::Response<Self::ForwardTcpStream>, tonic::Status> {
+ Err(tonic::Status::unimplemented("not implemented in test"))
+ }
}
// ── TLS helpers ──────────────────────────────────────────────────────
diff --git a/crates/openshell-cli/tests/mtls_integration.rs b/crates/openshell-cli/tests/mtls_integration.rs
index e78c91578..307e339ce 100644
--- a/crates/openshell-cli/tests/mtls_integration.rs
+++ b/crates/openshell-cli/tests/mtls_integration.rs
@@ -346,6 +346,17 @@ impl OpenShell for TestOpenShell {
) -> Result, tonic::Status> {
Err(tonic::Status::unimplemented("not implemented in test"))
}
+
+ type ForwardTcpStream = tokio_stream::wrappers::ReceiverStream<
+ Result<ForwardTcpResponse, tonic::Status>,
+ >;
+
+ async fn forward_tcp(
+ &self,
+ _request: tonic::Request<tonic::Streaming<ForwardTcpRequest>>,
+ ) -> Result<tonic::Response<Self::ForwardTcpStream>, tonic::Status> {
+ Err(tonic::Status::unimplemented("not implemented in test"))
+ }
}
fn build_ca() -> (Certificate, KeyPair) {
diff --git a/crates/openshell-cli/tests/provider_commands_integration.rs b/crates/openshell-cli/tests/provider_commands_integration.rs
index 9bda696c1..f9332142e 100644
--- a/crates/openshell-cli/tests/provider_commands_integration.rs
+++ b/crates/openshell-cli/tests/provider_commands_integration.rs
@@ -409,6 +409,17 @@ impl OpenShell for TestOpenShell {
) -> Result, tonic::Status> {
Err(tonic::Status::unimplemented("not implemented in test"))
}
+
+ type ForwardTcpStream = tokio_stream::wrappers::ReceiverStream<
+ Result<ForwardTcpResponse, tonic::Status>,
+ >;
+
+ async fn forward_tcp(
+ &self,
+ _request: tonic::Request<tonic::Streaming<ForwardTcpRequest>>,
+ ) -> Result<tonic::Response<Self::ForwardTcpStream>, tonic::Status> {
+ Err(tonic::Status::unimplemented("not implemented in test"))
+ }
}
fn install_rustls_provider() {
diff --git a/crates/openshell-cli/tests/sandbox_create_lifecycle_integration.rs b/crates/openshell-cli/tests/sandbox_create_lifecycle_integration.rs
index 79d482fdb..9ffd36834 100644
--- a/crates/openshell-cli/tests/sandbox_create_lifecycle_integration.rs
+++ b/crates/openshell-cli/tests/sandbox_create_lifecycle_integration.rs
@@ -198,7 +198,6 @@ impl OpenShell for TestOpenShell {
gateway_scheme: "https".to_string(),
gateway_host: "localhost".to_string(),
gateway_port: 443,
- connect_path: "/connect/ssh".to_string(),
..CreateSshSessionResponse::default()
}))
}
@@ -433,6 +432,17 @@ impl OpenShell for TestOpenShell {
) -> Result, tonic::Status> {
Err(tonic::Status::unimplemented("not implemented in test"))
}
+
+ type ForwardTcpStream = tokio_stream::wrappers::ReceiverStream<
+ Result<ForwardTcpResponse, tonic::Status>,
+ >;
+
+ async fn forward_tcp(
+ &self,
+ _request: tonic::Request<tonic::Streaming<ForwardTcpRequest>>,
+ ) -> Result<tonic::Response<Self::ForwardTcpStream>, tonic::Status> {
+ Err(tonic::Status::unimplemented("not implemented in test"))
+ }
}
fn install_rustls_provider() {
@@ -732,6 +742,9 @@ async fn sandbox_create_keeps_sandbox_with_forwarding() {
let _env = test_env(&fake_ssh_dir, &xdg_dir);
let tls = test_tls(&server);
install_fake_ssh(&fake_ssh_dir);
+ let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+ let forward_port = listener.local_addr().unwrap().port();
+ drop(listener);
run::sandbox_create(
&server.endpoint,
@@ -746,7 +759,7 @@ async fn sandbox_create_keeps_sandbox_with_forwarding() {
None,
&[],
None,
- Some(openshell_core::forward::ForwardSpec::new(8080)),
+ Some(openshell_core::forward::ForwardSpec::new(forward_port)),
&["echo".to_string(), "OK".to_string()],
Some(false),
Some(false),
diff --git a/crates/openshell-cli/tests/sandbox_name_fallback_integration.rs b/crates/openshell-cli/tests/sandbox_name_fallback_integration.rs
index 5c463de9e..9e9bb26b1 100644
--- a/crates/openshell-cli/tests/sandbox_name_fallback_integration.rs
+++ b/crates/openshell-cli/tests/sandbox_name_fallback_integration.rs
@@ -370,6 +370,17 @@ impl OpenShell for TestOpenShell {
) -> Result, tonic::Status> {
Err(tonic::Status::unimplemented("not implemented in test"))
}
+
+ type ForwardTcpStream = tokio_stream::wrappers::ReceiverStream<
+ Result<ForwardTcpResponse, tonic::Status>,
+ >;
+
+ async fn forward_tcp(
+ &self,
+ _request: tonic::Request<tonic::Streaming<ForwardTcpRequest>>,
+ ) -> Result<tonic::Response<Self::ForwardTcpStream>, tonic::Status> {
+ Err(tonic::Status::unimplemented("not implemented in test"))
+ }
}
// ── helpers ───────────────────────────────────────────────────────────
diff --git a/crates/openshell-core/src/config.rs b/crates/openshell-core/src/config.rs
index 40a87fc41..a08f9bed4 100644
--- a/crates/openshell-core/src/config.rs
+++ b/crates/openshell-core/src/config.rs
@@ -151,10 +151,6 @@ pub struct Config {
#[serde(default = "default_ssh_gateway_port")]
pub ssh_gateway_port: u16,
- /// Path for SSH CONNECT/upgrade requests.
- #[serde(default = "default_ssh_connect_path")]
- pub ssh_connect_path: String,
-
/// SSH listen port inside sandbox containers that expose a TCP endpoint.
#[serde(default = "default_sandbox_ssh_port")]
pub sandbox_ssh_port: u16,
@@ -240,7 +236,6 @@ impl Config {
grpc_endpoint: String::new(),
ssh_gateway_host: default_ssh_gateway_host(),
ssh_gateway_port: default_ssh_gateway_port(),
- ssh_connect_path: default_ssh_connect_path(),
sandbox_ssh_port: default_sandbox_ssh_port(),
sandbox_ssh_socket_path: default_sandbox_ssh_socket_path(),
ssh_handshake_secret: String::new(),
@@ -336,13 +331,6 @@ impl Config {
self
}
- /// Create a new configuration with the SSH connect path.
- #[must_use]
- pub fn with_ssh_connect_path(mut self, path: impl Into<String>) -> Self {
- self.ssh_connect_path = path.into();
- self
- }
-
/// Create a new configuration with the sandbox SSH port.
#[must_use]
pub const fn with_sandbox_ssh_port(mut self, port: u16) -> Self {
@@ -414,10 +402,6 @@ const fn default_ssh_gateway_port() -> u16 {
DEFAULT_SERVER_PORT
}
-fn default_ssh_connect_path() -> String {
- "/connect/ssh".to_string()
-}
-
fn default_sandbox_ssh_socket_path() -> String {
"/run/openshell/ssh.sock".to_string()
}
diff --git a/crates/openshell-core/src/forward.rs b/crates/openshell-core/src/forward.rs
index e6c7d2f7c..655184465 100644
--- a/crates/openshell-core/src/forward.rs
+++ b/crates/openshell-core/src/forward.rs
@@ -470,6 +470,20 @@ pub fn resolve_ssh_gateway(
(gateway_host.to_string(), gateway_port)
}
+/// Format a gateway URL, bracketing IPv6 literals when needed.
+pub fn format_gateway_url(scheme: &str, host: &str, port: u16) -> String {
+ let host = if host
+ .parse::<IpAddr>()
+ .is_ok_and(|ip| ip.is_ipv6())
+ && !host.starts_with('[')
+ {
+ format!("[{host}]")
+ } else {
+ host.to_string()
+ };
+ format!("{scheme}://{host}:{port}")
+}
+
/// Shell-escape a value for use inside a `ProxyCommand` string.
pub fn shell_escape(value: &str) -> String {
if value.is_empty() {
@@ -526,14 +540,11 @@ pub enum SshSessionResponseError {
InvalidScheme,
#[error("gateway_port must be in range 1..=65535")]
InvalidPort,
- #[error("connect_path must start with '/'")]
- ConnectPathNotAbsolute,
}
const MAX_SANDBOX_ID_LEN: usize = 128;
const MAX_TOKEN_LEN: usize = 4096;
const MAX_GATEWAY_HOST_LEN: usize = 253;
-const MAX_CONNECT_PATH_LEN: usize = 2048;
const MAX_FINGERPRINT_LEN: usize = 256;
fn is_sandbox_id_byte(b: u8) -> bool {
@@ -552,33 +563,6 @@ fn is_gateway_host_byte(b: u8) -> bool {
b.is_ascii_alphanumeric() || matches!(b, b'.' | b'-' | b':' | b'[' | b']')
}
-fn is_connect_path_byte(b: u8) -> bool {
- // RFC 3986 path charset (pchar) without `?`, `#`, space, backtick, or
- // backslash. `%` is permitted so percent-encoded segments round-trip.
- b.is_ascii_alphanumeric()
- || matches!(
- b,
- b'-' | b'.'
- | b'_'
- | b'~'
- | b'!'
- | b'$'
- | b'&'
- | b'\''
- | b'('
- | b')'
- | b'*'
- | b'+'
- | b','
- | b';'
- | b'='
- | b':'
- | b'@'
- | b'/'
- | b'%'
- )
-}
-
fn is_fingerprint_byte(b: u8) -> bool {
b.is_ascii_alphanumeric() || matches!(b, b':' | b'+' | b'/' | b'=' | b'-')
}
@@ -613,25 +597,6 @@ pub fn validate_ssh_session_response(
if resp.gateway_port == 0 || resp.gateway_port > u32::from(u16::MAX) {
return Err(SshSessionResponseError::InvalidPort);
}
- if resp.connect_path.is_empty() {
- return Err(SshSessionResponseError::Empty {
- field: "connect_path",
- });
- }
- if !resp.connect_path.starts_with('/') {
- return Err(SshSessionResponseError::ConnectPathNotAbsolute);
- }
- if resp.connect_path.len() > MAX_CONNECT_PATH_LEN {
- return Err(SshSessionResponseError::TooLong {
- field: "connect_path",
- max: MAX_CONNECT_PATH_LEN,
- });
- }
- if !resp.connect_path.bytes().all(is_connect_path_byte) {
- return Err(SshSessionResponseError::InvalidChars {
- field: "connect_path",
- });
- }
if !resp.host_key_fingerprint.is_empty() {
if resp.host_key_fingerprint.len() > MAX_FINGERPRINT_LEN {
return Err(SshSessionResponseError::TooLong {
@@ -736,6 +701,26 @@ mod tests {
assert_eq!(port, 8080);
}
+ #[test]
+ fn format_gateway_url_brackets_ipv6_literals() {
+ assert_eq!(
+ format_gateway_url("https", "::1", 8080),
+ "https://[::1]:8080"
+ );
+ }
+
+ #[test]
+ fn format_gateway_url_leaves_dns_and_bracketed_ipv6_unchanged() {
+ assert_eq!(
+ format_gateway_url("https", "gateway.example.com", 443),
+ "https://gateway.example.com:443"
+ );
+ assert_eq!(
+ format_gateway_url("https", "[::1]", 8080),
+ "https://[::1]:8080"
+ );
+ }
+
#[test]
fn shell_escape_empty() {
assert_eq!(shell_escape(""), "''");
@@ -758,7 +743,6 @@ mod tests {
gateway_scheme: "https".to_string(),
gateway_host: "gateway.example.com".to_string(),
gateway_port: 443,
- connect_path: "/connect/ssh".to_string(),
host_key_fingerprint: String::new(),
expires_at_ms: 0,
}
@@ -858,33 +842,6 @@ mod tests {
}
}
- #[test]
- fn validate_ssh_session_response_rejects_connect_path_without_leading_slash() {
- let mut r = valid_session_response();
- r.connect_path = "connect/ssh".to_string();
- assert!(matches!(
- validate_ssh_session_response(&r),
- Err(SshSessionResponseError::ConnectPathNotAbsolute)
- ));
- }
-
- #[test]
- fn validate_ssh_session_response_rejects_injected_connect_path() {
- // `$`, `(`, `)` are valid RFC 3986 sub-delims (pchar) so the validator
- // permits them; shell_escape is the second defensive layer. The
- // following characters are rejected at the validator boundary because
- // they are either unambiguously hostile in a shell context or invalid
- // per RFC 3986 in the path component.
- for bad in ["/x`id`y", "/x y", "/x\nb", "/x\\b", "/x?q=1", "/x#frag"] {
- let mut r = valid_session_response();
- r.connect_path = bad.to_string();
- assert!(
- validate_ssh_session_response(&r).is_err(),
- "expected reject for connect_path={bad:?}"
- );
- }
- }
-
#[test]
fn build_proxy_command_escapes_shell_metacharacters() {
// Attacker-controlled values in every escapable position.
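The bracketing rule in the new `format_gateway_url` helper can be exercised standalone; a std-only sketch of the same logic (same name reused here for clarity, but this is an illustration, not the crate's function):

```rust
use std::net::IpAddr;

/// Bracket bare IPv6 literals so the resulting URL is unambiguous:
/// "https://::1:8080" would be unparseable, "https://[::1]:8080" is not.
/// DNS names and already-bracketed literals pass through unchanged.
fn format_gateway_url(scheme: &str, host: &str, port: u16) -> String {
    let needs_brackets = host
        .parse::<IpAddr>()
        .is_ok_and(|ip| ip.is_ipv6())
        && !host.starts_with('[');
    if needs_brackets {
        format!("{scheme}://[{host}]:{port}")
    } else {
        format!("{scheme}://{host}:{port}")
    }
}

fn main() {
    println!("{}", format_gateway_url("https", "::1", 8080));
}
```

Note that `"[::1]"` fails to parse as an `IpAddr`, so an already-bracketed literal naturally falls into the unchanged branch.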
diff --git a/crates/openshell-ocsf/src/format/shorthand.rs b/crates/openshell-ocsf/src/format/shorthand.rs
index 7e2296de9..85424cb67 100644
--- a/crates/openshell-ocsf/src/format/shorthand.rs
+++ b/crates/openshell-ocsf/src/format/shorthand.rs
@@ -62,6 +62,7 @@ pub fn severity_tag(severity_id: u8) -> &'static str {
/// Max length for the reason text in `[reason:...]` before truncation.
const MAX_REASON_LEN: usize = 80;
+const MAX_MESSAGE_LEN: usize = 120;
/// Format a `[reason:...]` tag from `status_detail` (or `message` fallback)
/// for denied events. Returns an empty string if neither field is set.
@@ -81,6 +82,19 @@ fn reason_tag(base: &BaseEventData) -> String {
}
}
+fn message_tag(base: &BaseEventData) -> String {
+ let text = base.message.as_deref().unwrap_or("");
+ if text.is_empty() {
+ return String::new();
+ }
+ let text = text.replace(['\n', '\r'], " ");
+ if text.len() > MAX_MESSAGE_LEN {
+ format!(" [msg:{}...]", &text[..MAX_MESSAGE_LEN])
+ } else {
+ format!(" [msg:{text}]")
+ }
+}
+
impl OcsfEvent {
/// Produce the single-line shorthand for `openshell.log` and gRPC log push.
///
@@ -141,7 +155,13 @@ impl OcsfEvent {
(false, true) => format!(" {action}"),
(false, false) => format!(" {action}{arrow}"),
};
- format!("NET:{activity} {sev}{detail}{rule_ctx}{reason_ctx}")
+ let message_ctx =
+ if detail.is_empty() && rule_ctx.is_empty() && reason_ctx.is_empty() {
+ message_tag(&e.base)
+ } else {
+ String::new()
+ };
+ format!("NET:{activity} {sev}{detail}{rule_ctx}{reason_ctx}{message_ctx}")
}
Self::HttpActivity(e) => {
@@ -542,6 +562,33 @@ mod tests {
);
}
+ #[test]
+ fn test_network_activity_shorthand_shows_message_when_no_key_fields() {
+ let event = OcsfEvent::NetworkActivity(NetworkActivityEvent {
+ base: {
+ let mut b = base(4001, "Network Activity", 4, "Network Activity", 1, "Open");
+ b.set_message("relay open (channel_id=ch-42)");
+ b
+ },
+ src_endpoint: None,
+ dst_endpoint: None,
+ proxy_endpoint: None,
+ actor: None,
+ firewall_rule: None,
+ connection_info: None,
+ action: None,
+ disposition: None,
+ observation_point_id: None,
+ is_src_dst_assignment_known: None,
+ });
+
+ let shorthand = event.format_shorthand();
+ assert_eq!(
+ shorthand,
+ "NET:OPEN [INFO] [msg:relay open (channel_id=ch-42)]"
+ );
+ }
+
#[test]
fn test_http_activity_shorthand_denied_shows_reason() {
let mut b = base(4002, "HTTP Activity", 4, "Network Activity", 99, "Other");
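The `[msg:...]` tag added to the shorthand above folds newlines and truncates long messages. A self-contained sketch of that formatting, taking the message directly instead of a `BaseEventData` (note the byte-index truncation assumes ASCII messages, as the patch's test data does):

```rust
/// Max length for the message text in `[msg:...]` before truncation.
const MAX_MESSAGE_LEN: usize = 120;

/// Format a `[msg:...]` tag: empty/absent messages produce nothing,
/// newlines fold to spaces, and long text is cut at a byte index
/// (safe here because the messages are ASCII).
fn message_tag(message: Option<&str>) -> String {
    let text = message.unwrap_or("");
    if text.is_empty() {
        return String::new();
    }
    let text = text.replace(['\n', '\r'], " ");
    if text.len() > MAX_MESSAGE_LEN {
        format!(" [msg:{}...]", &text[..MAX_MESSAGE_LEN])
    } else {
        format!(" [msg:{text}]")
    }
}

fn main() {
    println!("{}", message_tag(Some("relay open (channel_id=ch-42)")));
}
```

The leading space keeps the tag concatenation-friendly, matching how the other `{detail}{rule_ctx}{reason_ctx}` fragments compose in the shorthand line.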
diff --git a/crates/openshell-sandbox/src/lib.rs b/crates/openshell-sandbox/src/lib.rs
index 34ee80bb5..e6092c537 100644
--- a/crates/openshell-sandbox/src/lib.rs
+++ b/crates/openshell-sandbox/src/lib.rs
@@ -685,7 +685,7 @@ pub async fn run_sandbox(
sandbox_id.as_ref(),
ssh_socket_path.as_ref(),
) {
- supervisor_session::spawn(endpoint.clone(), id.clone(), socket.clone());
+ supervisor_session::spawn(endpoint.clone(), id.clone(), socket.clone(), ssh_netns_fd);
info!("supervisor session task spawned");
}
diff --git a/crates/openshell-sandbox/src/sandbox/linux/netns.rs b/crates/openshell-sandbox/src/sandbox/linux/netns.rs
index bbd02255f..0ac8b88b0 100644
--- a/crates/openshell-sandbox/src/sandbox/linux/netns.rs
+++ b/crates/openshell-sandbox/src/sandbox/linux/netns.rs
@@ -11,7 +11,7 @@ use miette::{IntoDiagnostic, Result};
use std::net::IpAddr;
use std::os::unix::io::RawFd;
use std::process::Command;
-use tracing::{debug, info, warn};
+use tracing::{debug, warn};
use uuid::Uuid;
/// Default subnet for sandbox networking.
diff --git a/crates/openshell-sandbox/src/supervisor_session.rs b/crates/openshell-sandbox/src/supervisor_session.rs
index 490a0cba7..49c52f9c2 100644
--- a/crates/openshell-sandbox/src/supervisor_session.rs
+++ b/crates/openshell-sandbox/src/supervisor_session.rs
@@ -4,24 +4,28 @@
//! Persistent supervisor-to-gateway session.
//!
//! Maintains a long-lived `ConnectSupervisor` bidirectional gRPC stream to the
-//! gateway. When the gateway sends `RelayOpen`, the supervisor initiates a
-//! `RelayStream` gRPC call (a new HTTP/2 stream multiplexed over the same
-//! TCP+TLS connection as the control stream) and bridges it to the local SSH
-//! daemon. The supervisor is a dumb byte bridge — it has no protocol awareness
-//! of the SSH or NSSH1 bytes flowing through.
-
+//! gateway. When the gateway sends `RelayOpen`, the supervisor dials the
+//! requested local target, initiates a `RelayStream` gRPC call (a new HTTP/2
+//! stream multiplexed over the same TCP+TLS connection as the control stream),
+//! and bridges bytes. The supervisor is a dumb byte bridge after target
+//! selection — it has no protocol awareness of the bytes flowing through.
+
+use std::net::IpAddr;
+#[cfg(target_os = "linux")]
+use std::os::fd::RawFd;
use std::time::Duration;
use openshell_core::proto::open_shell_client::OpenShellClient;
use openshell_core::proto::{
- GatewayMessage, RelayFrame, RelayInit, SupervisorHeartbeat, SupervisorHello, SupervisorMessage,
- gateway_message, supervisor_message,
+ GatewayMessage, RelayFrame, RelayInit, RelayOpen, RelayOpenResult, SupervisorHeartbeat,
+ SupervisorHello, SupervisorMessage, TcpRelayTarget, gateway_message, relay_open,
+ supervisor_message,
};
use openshell_ocsf::{
- ActivityId, Endpoint, NetworkActivityBuilder, OcsfEvent, SandboxContext, SeverityId, StatusId,
- ocsf_emit,
+ ActivityId, ConnectionInfo, Endpoint, NetworkActivityBuilder, OcsfEvent, SandboxContext,
+ SeverityId, StatusId, ocsf_emit,
};
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc;
use tokio_stream::StreamExt;
use tonic::transport::Channel;
@@ -91,33 +95,103 @@ fn session_failed_event(
.build()
}
-fn relay_open_event(ctx: &SandboxContext, channel_id: &str) -> OcsfEvent {
- NetworkActivityBuilder::new(ctx)
+fn relay_target_endpoint(open: &RelayOpen) -> Option<Endpoint> {
+ let relay_open::Target::Tcp(target) = open.target.as_ref()? else {
+ return None;
+ };
+ let host = target.host.trim();
+ let port = u16::try_from(target.port).ok()?;
+ if let Ok(ip) = host.parse() {
+ Some(Endpoint::from_ip(ip, port))
+ } else {
+ Some(Endpoint::from_domain(host, port))
+ }
+}
+
+fn relay_target_kind(open: &RelayOpen) -> &'static str {
+ match open.target.as_ref() {
+ Some(relay_open::Target::Tcp(_)) => "tcp relay",
+ Some(relay_open::Target::Ssh(_)) | None => "ssh relay",
+ }
+}
+
+fn relay_target_message(
+ open: &RelayOpen,
+ state: &str,
+ ssh_socket_path: &std::path::Path,
+) -> String {
+ let target = match open.target.as_ref() {
+ Some(relay_open::Target::Tcp(target)) => {
+ format!("{}:{}", target.host.trim(), target.port)
+ }
+ Some(relay_open::Target::Ssh(_)) | None => {
+ format!("unix:{}", ssh_socket_path.display())
+ }
+ };
+
+ format!(
+ "{} {state} (channel_id={}, target={target})",
+ relay_target_kind(open),
+ open.channel_id
+ )
+}
+
+fn relay_open_event(
+ ctx: &SandboxContext,
+ open: &RelayOpen,
+ ssh_socket_path: &std::path::Path,
+) -> OcsfEvent {
+ let mut builder = NetworkActivityBuilder::new(ctx)
.activity(ActivityId::Open)
.severity(SeverityId::Informational)
.status(StatusId::Success)
- .message(format!("relay open (channel_id={channel_id})"))
- .build()
+ .message(relay_target_message(open, "open", ssh_socket_path));
+ if let Some(endpoint) = relay_target_endpoint(open) {
+ builder = builder
+ .dst_endpoint(endpoint)
+ .connection_info(ConnectionInfo::new("tcp"));
+ }
+ builder.build()
}
-fn relay_closed_event(ctx: &SandboxContext, channel_id: &str) -> OcsfEvent {
- NetworkActivityBuilder::new(ctx)
+fn relay_closed_event(
+ ctx: &SandboxContext,
+ open: &RelayOpen,
+ ssh_socket_path: &std::path::Path,
+) -> OcsfEvent {
+ let mut builder = NetworkActivityBuilder::new(ctx)
.activity(ActivityId::Close)
.severity(SeverityId::Informational)
.status(StatusId::Success)
- .message(format!("relay closed (channel_id={channel_id})"))
- .build()
+ .message(relay_target_message(open, "closed", ssh_socket_path));
+ if let Some(endpoint) = relay_target_endpoint(open) {
+ builder = builder
+ .dst_endpoint(endpoint)
+ .connection_info(ConnectionInfo::new("tcp"));
+ }
+ builder.build()
}
-fn relay_failed_event(ctx: &SandboxContext, channel_id: &str, error: &str) -> OcsfEvent {
- NetworkActivityBuilder::new(ctx)
+fn relay_failed_event(
+ ctx: &SandboxContext,
+ open: &RelayOpen,
+ ssh_socket_path: &std::path::Path,
+ error: &str,
+) -> OcsfEvent {
+ let mut builder = NetworkActivityBuilder::new(ctx)
.activity(ActivityId::Fail)
.severity(SeverityId::Low)
.status(StatusId::Failure)
.message(format!(
- "relay bridge failed (channel_id={channel_id}): {error}"
- ))
- .build()
+ "{}: {error}",
+ relay_target_message(open, "bridge failed", ssh_socket_path)
+ ));
+ if let Some(endpoint) = relay_target_endpoint(open) {
+ builder = builder
+ .dst_endpoint(endpoint)
+ .connection_info(ConnectionInfo::new("tcp"));
+ }
+ builder.build()
}
fn relay_close_from_gateway_event(
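The IP-versus-domain split used by `relay_target_endpoint` can be sketched in isolation. Here a hypothetical `Target` enum stands in for the OCSF endpoint the real code builds via `Endpoint::from_ip` / `Endpoint::from_domain`:

```rust
use std::net::IpAddr;

/// Illustrative stand-in for the OCSF endpoint type.
#[derive(Debug, PartialEq)]
enum Target {
    Ip(IpAddr, u16),
    Domain(String, u16),
}

/// Classify a relay target: a host that parses as an IP literal becomes
/// an IP endpoint, anything else is treated as a domain name. Ports that
/// do not fit in u16 are rejected, mirroring the `u16::try_from` guard.
fn classify(host: &str, port: u32) -> Option<Target> {
    let host = host.trim();
    let port = u16::try_from(port).ok()?;
    match host.parse::<IpAddr>() {
        Ok(ip) => Some(Target::Ip(ip, port)),
        Err(_) => Some(Target::Domain(host.to_string(), port)),
    }
}

fn main() {
    println!("{:?}", classify("10.0.0.1", 8080));
    println!("{:?}", classify("db.internal", 5432));
}
```

Trimming before parsing matters: a host like `" ::1 "` from an untrusted frame would otherwise fail IP parsing and be misclassified as a domain.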
@@ -139,6 +213,10 @@ fn relay_close_from_gateway_event(
/// HTTP/2 frame size so each `RelayFrame::data` fits in one frame.
const RELAY_CHUNK_SIZE: usize = 16 * 1024;
+trait TargetStream: AsyncRead + AsyncWrite + Send + Unpin {}
+
+impl<T> TargetStream for T where T: AsyncRead + AsyncWrite + Send + Unpin {}
+
fn map_stream_message(
message: Result