Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions docs/command-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
- `okdev target set [--pod <name> | --role <role>]`
- `okdev agent list`
- `okdev exec [session] [--shell /bin/bash] [--no-tty] [--pod <name> | --role <role> | --label <k=v>] [--exclude <pod>] [--container <name>] [--detach] [--timeout <duration>] [--log-dir <path>] [--no-prefix] [--fanout N] [-- command...]`
- `okdev jobs list [session] [--job-id <id>] [--pod <name> | --role <role> | --label <k=v>] [--exclude <pod>] [--container <name>] [--fanout N] [--ready-only]`
- `okdev jobs logs <job-id> [session] [-f|--follow] [--container <name>] [--fanout N]`
- `okdev jobs stop <job-id> [session] [--container <name>] [--fanout N]`
- `okdev jobs wait <job-id> [session] [--container <name>] [--fanout N]`
- `okdev exec-jobs [session] [--job-id <id>] [--container <name>] [--fanout N]`
- `okdev cp [session] <src> <dst> [--all | --pod <name> | --role <role> | --label <k=v>] [--exclude <pod>] [--container <name>] [--fanout N]`
- `okdev logs [session] [--container <name> | --all] [--tail N] [--since 5m] [--follow] [--previous]`
- `okdev ssh [session] [--setup-key] [--user root] [--cmd "..."] [--no-tmux] [--forward-agent|--no-forward-agent]`
Expand Down Expand Up @@ -86,12 +91,13 @@
- `--fanout N`: maximum concurrent pod executions (default 16).
- `--pod`, `--role`, and `--label` are mutually exclusive.

### `okdev exec-jobs [session]`
### `okdev jobs list [session]`

- Lists detached `okdev exec --detach` jobs across the session's running pods.
- Reads job metadata from `/tmp/okdev-exec/*.json` in the target container.
- Shows pod, container, job id, pid, current state, exit code when available, and log path in table form for text output.
- State values: `running` (wrapper alive and user command in flight), `exited` (user command finished; `EXIT` column holds the captured status), and `orphaned` (metadata still says `running` but the pid has exited or been recycled - typically the wrapper was `SIGKILL`ed or the container was restarted before the completion metadata could be written).
- Text output is grouped by logical job id, with one row per detached launch showing job id, summarized state, pod count, earliest start time, and original command.
- JSON output includes both the logical job summary and the per-pod `podStates` records.
- State values: `running` (wrapper alive and user command in flight), `exited` (user command finished; exit code is recorded in `podStates` / JSON), and `orphaned` (metadata still says `running` but the pid has exited or been recycled - typically the wrapper was `SIGKILL`ed or the container was restarted before the completion metadata could be written). Grouped text summaries render forms such as `running(1/2)`, `exited(2/2)`, and `failed(1/2)`.
- When any pod fails to list its jobs, the command still prints the jobs it was able to collect from the healthy pods and reports the failures in a `FAILED:` footer (or an `errors` array in `--json` output); exit status is non-zero so scripts can detect partial failures.
- `--pod`: target specific pods by name (repeatable or comma-separated).
- `--role`: target pods by `okdev.io/workload-role` label (case-insensitive).
Expand All @@ -102,6 +108,30 @@
- `--fanout N`: maximum concurrent pod queries (default 16).
- `--ready-only`: inspect only pods that are already running.

### `okdev jobs logs <job-id> [session]`

- Streams the detached job's combined stdout/stderr aggregated across all pods in the logical job.
- Each line is prefixed by the pod short name so multi-pod output stays attributable.
- `-f` / `--follow`: keep following until every pod in the job reaches a terminal state.
- If some pod logs are unavailable, okdev still streams the logs it can read and reports the missing pods in a `FAILED:` footer before returning non-zero.

### `okdev jobs wait <job-id> [session]`

- Polls the detached job until all pod-local records are terminal.
- Returns success only when every pod exits cleanly.
- Returns non-zero when any pod exits non-zero, becomes `orphaned`, cannot be queried, or the job id does not exist.

### `okdev jobs stop <job-id> [session]`

- Stops the job's still-running processes in every pod of the logical job by sending `SIGTERM`, waiting 10 seconds, then sending `SIGKILL` to survivors.
- Prints which pods were signaled during the stop flow.
- Returns non-zero when any pod could not be queried or signaled, or if pods are still running after the stop attempt.

### `okdev exec-jobs [session]`

- Compatibility alias for `okdev jobs list [session]`.
- Uses the same grouped logical-job output and flags as `okdev jobs list`.

### `okdev cp [session] <src> <dst>`

- Copies files or directories between the local machine and session pods.
Expand Down
9 changes: 9 additions & 0 deletions internal/cli/command_ctor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ func TestCommandConstructorsExposeExpectedMetadata(t *testing.T) {
if cmd := newExecJobsCmd(&Options{}); cmd.Use != "exec-jobs [session]" || cmd.Flags().Lookup("pod") == nil || cmd.Flags().Lookup("role") == nil || cmd.Flags().Lookup("label") == nil || cmd.Flags().Lookup("exclude") == nil || cmd.Flags().Lookup("container") == nil || cmd.Flags().Lookup("fanout") == nil {
t.Fatalf("unexpected exec-jobs command shape")
}
if cmd := newJobsCmd(&Options{}); cmd.Use != "jobs" || cmd.Short == "" {
t.Fatalf("unexpected jobs command shape")
} else {
for _, sub := range []string{"list", "logs", "stop", "wait"} {
if _, _, err := cmd.Find([]string{sub}); err != nil {
t.Fatalf("expected jobs subcommand %q: %v", sub, err)
}
}
}
if cmd := newPortForwardCmd(&Options{}); cmd.Use != "port-forward [session] <local:remote>..." || cmd.Short == "" || cmd.Flags().Lookup("pod") == nil || cmd.Flags().Lookup("role") == nil || cmd.Flags().Lookup("address") == nil || cmd.Flags().Lookup("ready-only") == nil {
t.Fatalf("unexpected port-forward command shape")
}
Expand Down
179 changes: 104 additions & 75 deletions internal/cli/exec_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,60 +13,8 @@ import (
"github.com/acmore/okdev/internal/connect"
"github.com/acmore/okdev/internal/kube"
"github.com/acmore/okdev/internal/output"
"github.com/spf13/cobra"
)

// newExecJobsCmd builds the `okdev exec-jobs [session]` command, which
// lists detached `okdev exec --detach` jobs across the session's pods.
func newExecJobsCmd(opts *Options) *cobra.Command {
	// Flag storage; each variable is captured by the RunE closure below.
	var podNames []string
	var role string
	var labels []string
	var exclude []string
	var container string
	var jobID string
	var fanout int
	var readyOnly bool

	cmd := &cobra.Command{
		Use:               "exec-jobs [session]",
		Short:             "List detached exec jobs",
		Args:              validateExecArgs,
		ValidArgsFunction: sessionCompletionFunc(opts),
		RunE: func(cmd *cobra.Command, args []string) error {
			// Resolve the session (positional argument wins) and verify
			// ownership before querying any pods.
			applySessionArg(opts, args)
			cc, err := resolveCommandContext(opts, resolveSessionName)
			if err != nil {
				return err
			}
			if err := ensureExistingSessionOwnership(cc.opts, cc.kube, cc.namespace, cc.sessionName); err != nil {
				return err
			}

			// Narrow the pod set by the selector flags (--pod/--role/--label/
			// --exclude/--ready-only).
			pods, err := selectSessionPods(cmd.Context(), cc, podNames, role, labels, exclude, readyOnly)
			if err != nil {
				return err
			}
			// Fall back to the session's configured container when the
			// --container flag was not given.
			targetContainer := container
			if targetContainer == "" {
				targetContainer = resolveTargetContainer(cc.cfg)
			}
			// Guard against non-positive --fanout values.
			effectiveFanout := fanout
			if effectiveFanout <= 0 {
				effectiveFanout = pdshDefaultFanout
			}
			return runExecJobs(cmd.Context(), cc.kube, cc.namespace, pods, targetContainer, jobID, effectiveFanout, cmd.OutOrStdout(), strings.EqualFold(opts.Output, "json"))
		},
	}
	cmd.Flags().StringSliceVar(&podNames, "pod", nil, "Target specific pods by name (repeatable/comma-separated)")
	cmd.Flags().StringVar(&role, "role", "", "Target pods by workload role")
	cmd.Flags().StringSliceVar(&labels, "label", nil, "Target pods by label key=value (repeatable)")
	cmd.Flags().StringSliceVar(&exclude, "exclude", nil, "Exclude specific pods (repeatable/comma-separated)")
	cmd.Flags().StringVar(&container, "container", "", "Override target container")
	cmd.Flags().StringVar(&jobID, "job-id", "", "Filter to a specific detached exec job id")
	cmd.Flags().IntVar(&fanout, "fanout", pdshDefaultFanout, "Maximum concurrent pod queries")
	cmd.Flags().BoolVar(&readyOnly, "ready-only", false, "Query only pods that are already running (skip readiness check)")
	return cmd
}

type execJobView struct {
Pod string `json:"pod"`
Container string `json:"container"`
Expand All @@ -80,19 +28,28 @@ type execJobView struct {
Command string `json:"command"`
}

// logicalExecJobView is one grouped row of `okdev jobs list` output: every
// per-pod detached launch that shares a job id, summarized together.
type logicalExecJobView struct {
	// JobID is the logical detached-exec job identifier shared by all pods.
	JobID string `json:"jobId"`
	// SummaryState is the rendered aggregate state, e.g. "running(1/2)".
	SummaryState string `json:"state"`
	// Pods is the number of per-pod records grouped under this job id.
	Pods int `json:"pods"`
	// StartedAt is the earliest start time across the grouped records.
	StartedAt string `json:"startedAt"`
	// Command is the original command line, taken from one member record.
	Command string `json:"command"`
	// PodStates holds the underlying per-pod records (surfaced in JSON).
	PodStates []execJobView `json:"podStates,omitempty"`
}

// podDetachJobsResult carries one pod's detached-job listing — or the error
// that prevented collecting it — back to the aggregating caller.
type podDetachJobsResult struct {
	pod  string           // pod the listing was collected from
	jobs []detachMetadata // parsed job metadata records found in the pod
	err  error            // non-nil when the pod could not be queried
}

// execJobsOutput is the JSON shape for `okdev exec-jobs`. Per-pod errors
// are carried alongside the successful job rows so operators can see both
// results from one listing instead of losing everything on the first
// failure.
// execJobsOutput is the JSON shape for `okdev jobs list` / `okdev exec-jobs`.
// Per-pod errors are carried alongside the successful grouped job rows so
// operators can see both results from one listing instead of losing
// everything on the first failure.
type execJobsOutput struct {
	// Jobs holds the grouped logical job rows that were collected.
	Jobs []logicalExecJobView `json:"jobs"`
	// Errors lists pods whose jobs could not be listed; omitted when empty.
	Errors []execJobsPodError `json:"errors,omitempty"`
}

type execJobsPodError struct {
Expand All @@ -102,32 +59,27 @@ type execJobsPodError struct {

func runExecJobs(ctx context.Context, client connect.ExecClient, namespace string, pods []kube.PodSummary, container string, jobID string, fanout int, out io.Writer, jsonOutput bool) error {
rows, podErrors := collectDetachJobs(ctx, client, namespace, pods, container, jobID, fanout)
jobs := groupDetachJobs(rows)
if jsonOutput {
if err := outputJSON(out, execJobsOutput{Jobs: rows, Errors: podErrors}); err != nil {
if err := outputJSON(out, execJobsOutput{Jobs: jobs, Errors: podErrors}); err != nil {
return err
}
} else {
if len(rows) == 0 && len(podErrors) == 0 {
if len(jobs) == 0 && len(podErrors) == 0 {
fmt.Fprintln(out, "No detached exec jobs found")
}
if len(rows) > 0 {
table := make([][]string, 0, len(rows))
for _, row := range rows {
exit := "-"
if row.ExitCode != nil {
exit = fmt.Sprintf("%d", *row.ExitCode)
}
if len(jobs) > 0 {
table := make([][]string, 0, len(jobs))
for _, job := range jobs {
table = append(table, []string{
row.Pod,
row.Container,
row.JobID,
fmt.Sprintf("%d", row.PID),
row.State,
exit,
row.LogPath,
job.JobID,
job.SummaryState,
fmt.Sprintf("%d", job.Pods),
job.StartedAt,
job.Command,
})
}
output.PrintTable(out, []string{"POD", "CONTAINER", "JOB ID", "PID", "STATE", "EXIT", "LOG"}, table)
output.PrintTable(out, []string{"JOB ID", "STATE", "PODS", "STARTED", "COMMAND"}, table)
}
if len(podErrors) > 0 {
fmt.Fprintf(out, "\nFAILED:\n")
Expand Down Expand Up @@ -207,6 +159,83 @@ func collectDetachJobs(ctx context.Context, client connect.ExecClient, namespace
return rows, podErrors
}

// groupDetachJobs folds the flat per-pod rows into one logical job per job
// id. Members within a job are ordered by pod name (then start time), and
// the resulting jobs are ordered by earliest start time with the job id as
// the tie-breaker, so output is deterministic across runs.
func groupDetachJobs(rows []execJobView) []logicalExecJobView {
	grouped := make(map[string][]execJobView)
	for _, r := range rows {
		grouped[r.JobID] = append(grouped[r.JobID], r)
	}

	out := make([]logicalExecJobView, 0, len(grouped))
	for id, group := range grouped {
		sort.Slice(group, func(a, b int) bool {
			if group[a].Pod == group[b].Pod {
				return group[a].StartedAt < group[b].StartedAt
			}
			return group[a].Pod < group[b].Pod
		})
		out = append(out, logicalExecJobView{
			JobID:        id,
			SummaryState: summarizeLogicalJobState(group),
			Pods:         len(group),
			StartedAt:    earliestStartedAt(group),
			Command:      group[0].Command,
			PodStates:    group,
		})
	}

	sort.Slice(out, func(a, b int) bool {
		if out[a].StartedAt == out[b].StartedAt {
			return out[a].JobID < out[b].JobID
		}
		return out[a].StartedAt < out[b].StartedAt
	})
	return out
}

// summarizeLogicalJobState renders the aggregate state of one logical job,
// e.g. "running(1/2)", "failed(1/2)", "exited(2/2)".
//
// Precedence: any running pod makes the job "running"; otherwise any
// orphaned pod or non-zero exit code makes it "failed"; otherwise "exited"
// when every pod exited; anything else falls through to "mixed".
//
// Each row is counted exactly once. The previous implementation summed
// counts["orphaned"] with the number of non-zero exit codes, so an
// orphaned row that also carried a non-zero exit code was double-counted
// and the failed numerator could exceed the pod total.
func summarizeLogicalJobState(rows []execJobView) string {
	if len(rows) == 0 {
		return "unknown"
	}
	total := len(rows)
	var running, failed, exited int
	for _, row := range rows {
		switch {
		case row.State == "running":
			running++
		case row.State == "orphaned" || (row.ExitCode != nil && *row.ExitCode != 0):
			failed++
		case row.State == "exited":
			exited++
		}
	}
	if running > 0 {
		return fmt.Sprintf("running(%d/%d)", running, total)
	}
	if failed > 0 {
		return fmt.Sprintf("failed(%d/%d)", failed, total)
	}
	if exited == total {
		return fmt.Sprintf("exited(%d/%d)", exited, total)
	}
	return fmt.Sprintf("mixed(%d/%d)", total, total)
}

// failedRows reports how many rows recorded a non-zero exit code.
func failedRows(rows []execJobView) int {
	count := 0
	for i := range rows {
		if code := rows[i].ExitCode; code != nil && *code != 0 {
			count++
		}
	}
	return count
}

// earliestStartedAt returns the smallest non-empty StartedAt among the rows
// by plain string comparison (assumes timestamps are in a lexicographically
// sortable format — TODO confirm against the metadata writer), or "" when
// the slice is empty or no row carries a start time.
func earliestStartedAt(rows []execJobView) string {
	best := ""
	for i, row := range rows {
		if i == 0 {
			// Seed with the first row even when it is empty, matching the
			// "replace an empty candidate with any later value" behavior.
			best = row.StartedAt
			continue
		}
		if best == "" || (row.StartedAt != "" && row.StartedAt < best) {
			best = row.StartedAt
		}
	}
	return best
}

func detachJobsCommand() []string {
// Each output line is "<alive>\t<json>" where <alive> is 1 if the job's
// pid is still running and its /proc/<pid>/environ contains the
Expand Down
Loading
Loading