From 0189afdd72455ec76d7ea82d3dc04111eb0a8e3d Mon Sep 17 00:00:00 2001
From: acmore
Date: Sun, 10 May 2026 09:18:26 +0800
Subject: [PATCH 1/2] feat: add detached exec jobs commands

---
 docs/command-reference.md           |  36 +-
 internal/cli/command_ctor_test.go   |   9 +
 internal/cli/exec_jobs.go           | 179 +++++----
 internal/cli/exec_jobs_test.go      |  88 ++++-
 internal/cli/jobs.go                | 538 ++++++++++++++++++++++++++++
 internal/cli/jobs_logs_wait_test.go | 218 +++++++++++
 internal/cli/jobs_stop_test.go      |  90 +++++
 internal/cli/pdsh.go                |   9 +-
 internal/cli/pdsh_test.go           |   8 +-
 internal/cli/root.go                |   1 +
 internal/cli/root_test.go           |   2 +-
 internal/cli/timeouts.go            |   6 +
 12 files changed, 1093 insertions(+), 91 deletions(-)
 create mode 100644 internal/cli/jobs.go
 create mode 100644 internal/cli/jobs_logs_wait_test.go
 create mode 100644 internal/cli/jobs_stop_test.go

diff --git a/docs/command-reference.md b/docs/command-reference.md
index a28f41f..527c16b 100644
--- a/docs/command-reference.md
+++ b/docs/command-reference.md
@@ -27,6 +27,11 @@
 - `okdev target set [--pod <name> | --role <role>]`
 - `okdev agent list`
 - `okdev exec [session] [--shell /bin/bash] [--no-tty] [--pod <name> | --role <role> | --label <key=value>] [--exclude <name>] [--container <name>] [--detach] [--timeout <duration>] [--log-dir <dir>] [--no-prefix] [--fanout N] [-- command...]`
+- `okdev jobs list [session] [--job-id <id>] [--container <name>] [--fanout N]`
+- `okdev jobs logs <job-id> [session] [-f|--follow] [--container <name>] [--fanout N]`
+- `okdev jobs stop <job-id> [session] [--container <name>] [--fanout N]`
+- `okdev jobs wait <job-id> [session] [--container <name>] [--fanout N]`
+- `okdev exec-jobs [session] [--job-id <id>] [--container <name>] [--fanout N]`
 - `okdev cp [session] <src> <dst> [--all | --pod <name> | --role <role> | --label <key=value>] [--exclude <name>] [--container <name>] [--fanout N]`
 - `okdev logs [session] [--container <name> | --all] [--tail N] [--since 5m] [--follow] [--previous]`
 - `okdev ssh [session] [--setup-key] [--user root] [--cmd "..."] [--no-tmux] [--forward-agent|--no-forward-agent]`
@@ -86,12 +91,13 @@
 - `--fanout N`: maximum concurrent pod executions (default 16).
 - `--pod`, `--role`, and `--label` are mutually exclusive.
 
-### `okdev exec-jobs [session]`
+### `okdev jobs list [session]`
 
 - Lists detached `okdev exec --detach` jobs across the session's running pods.
 - Reads job metadata from `/tmp/okdev-exec/*.json` in the target container.
-- Shows pod, container, job id, pid, current state, exit code when available, and log path in table form for text output.
-- State values: `running` (wrapper alive and user command in flight), `exited` (user command finished; `EXIT` column holds the captured status), and `orphaned` (metadata still says `running` but the pid has exited or been recycled, typically because the wrapper was `SIGKILL`ed or the container was restarted before the completion metadata could be written).
+- Text output is grouped by logical job id, with one row per detached launch showing the job id, summarized state, pod count, earliest start time, and original command.
+- JSON output includes both the logical job summary and the per-pod `podStates` records.
+- State values: `running` (wrapper alive and user command in flight), `exited` (user command finished; the exit code is recorded in `podStates` / JSON), and `orphaned` (metadata still says `running` but the pid has exited or been recycled, typically because the wrapper was `SIGKILL`ed or the container was restarted before the completion metadata could be written). Grouped text summaries render forms such as `running(1/2)`, `exited(2/2)`, and `failed(1/2)`.
 - When any pod fails to list its jobs, the command still prints the jobs it was able to collect from the healthy pods and reports the failures in a `FAILED:` footer (or an `errors` array in `--json` output); exit status is non-zero so scripts can detect partial failures.
 - `--pod`: target specific pods by name (repeatable or comma-separated).
 - `--role`: target pods by `okdev.io/workload-role` label (case-insensitive).
@@ -102,6 +108,30 @@
 - `--fanout N`: maximum concurrent pod queries (default 16).
 - `--ready-only`: inspect only pods that are already running.
 
+### `okdev jobs logs <job-id> [session]`
+
+- Streams the detached job's combined stdout/stderr, aggregated across all pods in the logical job.
+- Each line is prefixed with the pod short name so multi-pod output stays attributable.
+- `-f` / `--follow`: keep following until every pod in the job reaches a terminal state.
+- If some pod logs are unavailable, okdev still streams the logs it can read and reports the missing pods in a `FAILED:` footer before returning non-zero.
+
+### `okdev jobs wait <job-id> [session]`
+
+- Polls the detached job until all pod-local records are terminal.
+- Returns success only when every pod exits cleanly.
+- Returns non-zero when any pod exits non-zero, becomes `orphaned`, cannot be queried, or the job id does not exist.
+
+### `okdev jobs stop <job-id> [session]`
+
+- Stops every still-running pod in the logical job by sending `SIGTERM`, waiting 10 seconds, then sending `SIGKILL` to survivors.
+- Prints which pods were signaled during the stop flow.
+- Returns non-zero when any pod could not be queried or signaled, or if pods are still running after the stop attempt.
+
+### `okdev exec-jobs [session]`
+
+- Compatibility alias for `okdev jobs list [session]`.
+- Uses the same grouped logical-job output and flags as `okdev jobs list`.
+
 ### `okdev cp [session] <src> <dst>`
 
 - Copies files or directories between the local machine and session pods.

diff --git a/internal/cli/command_ctor_test.go b/internal/cli/command_ctor_test.go
index 187e1a2..3a22792 100644
--- a/internal/cli/command_ctor_test.go
+++ b/internal/cli/command_ctor_test.go
@@ -40,6 +40,15 @@ func TestCommandConstructorsExposeExpectedMetadata(t *testing.T) {
 	if cmd := newExecJobsCmd(&Options{}); cmd.Use != "exec-jobs [session]" || cmd.Flags().Lookup("pod") == nil || cmd.Flags().Lookup("role") == nil || cmd.Flags().Lookup("label") == nil || cmd.Flags().Lookup("exclude") == nil || cmd.Flags().Lookup("container") == nil || cmd.Flags().Lookup("fanout") == nil {
 		t.Fatalf("unexpected exec-jobs command shape")
 	}
+	if cmd := newJobsCmd(&Options{}); cmd.Use != "jobs" || cmd.Short == "" {
+		t.Fatalf("unexpected jobs command shape")
+	} else {
+		for _, sub := range []string{"list", "logs", "stop", "wait"} {
+			if _, _, err := cmd.Find([]string{sub}); err != nil {
+				t.Fatalf("expected jobs subcommand %q: %v", sub, err)
+			}
+		}
+	}
 	if cmd := newPortForwardCmd(&Options{}); cmd.Use != "port-forward [session] ..."
|| cmd.Short == "" || cmd.Flags().Lookup("pod") == nil || cmd.Flags().Lookup("role") == nil || cmd.Flags().Lookup("address") == nil || cmd.Flags().Lookup("ready-only") == nil { t.Fatalf("unexpected port-forward command shape") } diff --git a/internal/cli/exec_jobs.go b/internal/cli/exec_jobs.go index 862203c..733bbed 100644 --- a/internal/cli/exec_jobs.go +++ b/internal/cli/exec_jobs.go @@ -13,60 +13,8 @@ import ( "github.com/acmore/okdev/internal/connect" "github.com/acmore/okdev/internal/kube" "github.com/acmore/okdev/internal/output" - "github.com/spf13/cobra" ) -func newExecJobsCmd(opts *Options) *cobra.Command { - var podNames []string - var role string - var labels []string - var exclude []string - var container string - var jobID string - var fanout int - var readyOnly bool - - cmd := &cobra.Command{ - Use: "exec-jobs [session]", - Short: "List detached exec jobs", - Args: validateExecArgs, - ValidArgsFunction: sessionCompletionFunc(opts), - RunE: func(cmd *cobra.Command, args []string) error { - applySessionArg(opts, args) - cc, err := resolveCommandContext(opts, resolveSessionName) - if err != nil { - return err - } - if err := ensureExistingSessionOwnership(cc.opts, cc.kube, cc.namespace, cc.sessionName); err != nil { - return err - } - - pods, err := selectSessionPods(cmd.Context(), cc, podNames, role, labels, exclude, readyOnly) - if err != nil { - return err - } - targetContainer := container - if targetContainer == "" { - targetContainer = resolveTargetContainer(cc.cfg) - } - effectiveFanout := fanout - if effectiveFanout <= 0 { - effectiveFanout = pdshDefaultFanout - } - return runExecJobs(cmd.Context(), cc.kube, cc.namespace, pods, targetContainer, jobID, effectiveFanout, cmd.OutOrStdout(), strings.EqualFold(opts.Output, "json")) - }, - } - cmd.Flags().StringSliceVar(&podNames, "pod", nil, "Target specific pods by name (repeatable/comma-separated)") - cmd.Flags().StringVar(&role, "role", "", "Target pods by workload role") - cmd.Flags().StringSliceVar(&labels, "label", nil, "Target pods by label key=value (repeatable)") - cmd.Flags().StringSliceVar(&exclude, "exclude", nil, "Exclude specific pods (repeatable/comma-separated)") - cmd.Flags().StringVar(&container, "container", "", "Override target container") - cmd.Flags().StringVar(&jobID, "job-id", "", "Filter to a specific detached exec job id") - cmd.Flags().IntVar(&fanout, "fanout", pdshDefaultFanout, "Maximum concurrent pod queries") - cmd.Flags().BoolVar(&readyOnly, "ready-only", false, "Query only pods that are already running (skip readiness check)") - return cmd -} - type execJobView struct { Pod string `json:"pod"` Container string `json:"container"` @@ -80,19 +28,28 @@ type execJobView struct { Command string `json:"command"` } +type logicalExecJobView struct { + JobID string `json:"jobId"` + SummaryState string `json:"state"` + Pods int `json:"pods"` + StartedAt string `json:"startedAt"` + Command string `json:"command"` + PodStates []execJobView `json:"podStates,omitempty"` +} + type podDetachJobsResult struct { pod string jobs []detachMetadata err error } -// execJobsOutput is the JSON shape for `okdev exec-jobs`. Per-pod errors -// are carried alongside the successful job rows so operators can see both -// results from one listing instead of losing everything on the first -// failure. +// execJobsOutput is the JSON shape for `okdev jobs list` / `okdev exec-jobs`. 
+// Per-pod errors are carried alongside the successful grouped job rows so +// operators can see both results from one listing instead of losing +// everything on the first failure. type execJobsOutput struct { - Jobs []execJobView `json:"jobs"` - Errors []execJobsPodError `json:"errors,omitempty"` + Jobs []logicalExecJobView `json:"jobs"` + Errors []execJobsPodError `json:"errors,omitempty"` } type execJobsPodError struct { @@ -102,32 +59,27 @@ type execJobsPodError struct { func runExecJobs(ctx context.Context, client connect.ExecClient, namespace string, pods []kube.PodSummary, container string, jobID string, fanout int, out io.Writer, jsonOutput bool) error { rows, podErrors := collectDetachJobs(ctx, client, namespace, pods, container, jobID, fanout) + jobs := groupDetachJobs(rows) if jsonOutput { - if err := outputJSON(out, execJobsOutput{Jobs: rows, Errors: podErrors}); err != nil { + if err := outputJSON(out, execJobsOutput{Jobs: jobs, Errors: podErrors}); err != nil { return err } } else { - if len(rows) == 0 && len(podErrors) == 0 { + if len(jobs) == 0 && len(podErrors) == 0 { fmt.Fprintln(out, "No detached exec jobs found") } - if len(rows) > 0 { - table := make([][]string, 0, len(rows)) - for _, row := range rows { - exit := "-" - if row.ExitCode != nil { - exit = fmt.Sprintf("%d", *row.ExitCode) - } + if len(jobs) > 0 { + table := make([][]string, 0, len(jobs)) + for _, job := range jobs { table = append(table, []string{ - row.Pod, - row.Container, - row.JobID, - fmt.Sprintf("%d", row.PID), - row.State, - exit, - row.LogPath, + job.JobID, + job.SummaryState, + fmt.Sprintf("%d", job.Pods), + job.StartedAt, + job.Command, }) } - output.PrintTable(out, []string{"POD", "CONTAINER", "JOB ID", "PID", "STATE", "EXIT", "LOG"}, table) + output.PrintTable(out, []string{"JOB ID", "STATE", "PODS", "STARTED", "COMMAND"}, table) } if len(podErrors) > 0 { fmt.Fprintf(out, "\nFAILED:\n") @@ -207,6 +159,83 @@ func collectDetachJobs(ctx context.Context, client connect.ExecClient, namespace return rows, podErrors } +func groupDetachJobs(rows []execJobView) []logicalExecJobView { + byJob := make(map[string][]execJobView) + for _, row := range rows { + byJob[row.JobID] = append(byJob[row.JobID], row) + } + + logical := make([]logicalExecJobView, 0, len(byJob)) + for jobID, members := range byJob { + sort.Slice(members, func(i, j int) bool { + if members[i].Pod != members[j].Pod { + return members[i].Pod < members[j].Pod + } + return members[i].StartedAt < members[j].StartedAt + }) + logical = append(logical, logicalExecJobView{ + JobID: jobID, + SummaryState: summarizeLogicalJobState(members), + Pods: len(members), + StartedAt: earliestStartedAt(members), + Command: members[0].Command, + PodStates: members, + }) + } + + sort.Slice(logical, func(i, j int) bool { + if logical[i].StartedAt != logical[j].StartedAt { + return logical[i].StartedAt < logical[j].StartedAt + } + return logical[i].JobID < logical[j].JobID + }) + return logical +} + +func summarizeLogicalJobState(rows []execJobView) string { + if len(rows) == 0 { + return "unknown" + } + total := len(rows) + counts := make(map[string]int, total) + for _, row := range rows { + counts[row.State]++ + } + if running := counts["running"]; running > 0 { + return fmt.Sprintf("running(%d/%d)", running, total) + } + if failed := counts["orphaned"] + failedRows(rows); failed > 0 { + return fmt.Sprintf("failed(%d/%d)", failed, total) + } + if exited := counts["exited"]; exited == total { + return fmt.Sprintf("exited(%d/%d)", exited, total) + } + return 
fmt.Sprintf("mixed(%d/%d)", total, total) +} + +func failedRows(rows []execJobView) int { + failed := 0 + for _, row := range rows { + if row.ExitCode != nil && *row.ExitCode != 0 { + failed++ + } + } + return failed +} + +func earliestStartedAt(rows []execJobView) string { + if len(rows) == 0 { + return "" + } + earliest := rows[0].StartedAt + for _, row := range rows[1:] { + if earliest == "" || (row.StartedAt != "" && row.StartedAt < earliest) { + earliest = row.StartedAt + } + } + return earliest +} + func detachJobsCommand() []string { // Each output line is "\t" where is 1 if the job's // pid is still running and its /proc//environ contains the diff --git a/internal/cli/exec_jobs_test.go b/internal/cli/exec_jobs_test.go index f61cd49..cf7afe7 100644 --- a/internal/cli/exec_jobs_test.go +++ b/internal/cli/exec_jobs_test.go @@ -85,8 +85,8 @@ func TestRunExecJobsRendersPartialFailureFooter(t *testing.T) { t.Fatalf("unexpected error: %v", err) } got := out.String() - // Healthy pod row must still render. - for _, want := range []string{"okdev-sess-worker-0", "job-a", "running"} { + // Healthy logical job row must still render. + for _, want := range []string{"JOB ID", "job-a", "running(1/1)", "python train.py"} { if !strings.Contains(got, want) { t.Fatalf("expected table to include healthy pod row, missing %q in %q", want, got) } @@ -100,8 +100,8 @@ func TestRunExecJobsRendersPartialFailureFooter(t *testing.T) { func TestRunExecJobs(t *testing.T) { client := &fakePdshExecClient{ outputs: map[string]string{ - "okdev-sess-worker-0": "{\"jobId\":\"job-a\",\"pod\":\"okdev-sess-worker-0\",\"container\":\"dev\",\"pid\":48217,\"command\":\"python train.py\",\"startedAt\":\"2026-04-23T03:00:00Z\",\"stdoutPath\":\"/tmp/okdev-exec/job-a.log\",\"stderrPath\":\"/tmp/okdev-exec/job-a.log\",\"metaPath\":\"/tmp/okdev-exec/job-a.json\",\"state\":\"running\"}\n", - "okdev-sess-worker-1": "", + "okdev-sess-worker-0": "{\"jobId\":\"job-shared\",\"pod\":\"okdev-sess-worker-0\",\"container\":\"dev\",\"pid\":48217,\"command\":\"python train.py\",\"startedAt\":\"2026-04-23T03:00:00Z\",\"stdoutPath\":\"/tmp/okdev-exec/job-shared.log\",\"stderrPath\":\"/tmp/okdev-exec/job-shared.log\",\"metaPath\":\"/tmp/okdev-exec/job-shared.json\",\"state\":\"running\"}\n", + "okdev-sess-worker-1": "{\"jobId\":\"job-shared\",\"pod\":\"okdev-sess-worker-1\",\"container\":\"dev\",\"pid\":49102,\"command\":\"python train.py\",\"startedAt\":\"2026-04-23T03:00:01Z\",\"stdoutPath\":\"/tmp/okdev-exec/job-shared.log\",\"stderrPath\":\"/tmp/okdev-exec/job-shared.log\",\"metaPath\":\"/tmp/okdev-exec/job-shared.json\",\"state\":\"exited\",\"exitCode\":0}\n", }, errs: map[string]error{}, } @@ -114,13 +114,44 @@ func TestRunExecJobs(t *testing.T) { t.Fatalf("runExecJobs: %v", err) } got := out.String() - for _, want := range []string{"POD", "JOB ID", "STATE", "okdev-sess-worker-0", "job-a", "running"} { + for _, want := range []string{"JOB ID", "STATE", "PODS", "COMMAND", "job-shared", "running(1/2)", "2", "python train.py"} { if !strings.Contains(got, want) { t.Fatalf("expected table output containing %q, got %q", want, got) } } } +func TestRunExecJobsJSONGroupsLogicalJobs(t *testing.T) { + client := &fakePdshExecClient{ + outputs: map[string]string{ + "okdev-sess-worker-0": "{\"jobId\":\"job-shared\",\"pod\":\"okdev-sess-worker-0\",\"container\":\"dev\",\"pid\":48217,\"command\":\"python 
train.py\",\"startedAt\":\"2026-04-23T03:00:00Z\",\"stdoutPath\":\"/tmp/okdev-exec/job-shared.log\",\"stderrPath\":\"/tmp/okdev-exec/job-shared.log\",\"metaPath\":\"/tmp/okdev-exec/job-shared.json\",\"state\":\"running\"}\n", + "okdev-sess-worker-1": "{\"jobId\":\"job-shared\",\"pod\":\"okdev-sess-worker-1\",\"container\":\"dev\",\"pid\":49102,\"command\":\"python train.py\",\"startedAt\":\"2026-04-23T03:00:01Z\",\"stdoutPath\":\"/tmp/okdev-exec/job-shared.log\",\"stderrPath\":\"/tmp/okdev-exec/job-shared.log\",\"metaPath\":\"/tmp/okdev-exec/job-shared.json\",\"state\":\"exited\",\"exitCode\":0}\n", + }, + errs: map[string]error{}, + } + pods := []kube.PodSummary{ + {Name: "okdev-sess-worker-0", Phase: "Running"}, + {Name: "okdev-sess-worker-1", Phase: "Running"}, + } + var out bytes.Buffer + if err := runExecJobs(context.Background(), client, "default", pods, "dev", "", pdshDefaultFanout, &out, true); err != nil { + t.Fatalf("runExecJobs: %v", err) + } + got := out.String() + for _, want := range []string{ + "\"jobId\": \"job-shared\"", + "\"state\": \"running(1/2)\"", + "\"pods\": 2", + "\"podStates\": [", + "\"pod\": \"okdev-sess-worker-0\"", + "\"pod\": \"okdev-sess-worker-1\"", + } { + if !strings.Contains(got, want) { + t.Fatalf("expected grouped json output containing %q, got %q", want, got) + } + } +} + func TestParseDetachMetadataLinesDowngradesRunningWhenPidDead(t *testing.T) { // The remote script tags each line with "\t". When alive // is "0" and the metadata still claims "running", the listing should @@ -216,3 +247,50 @@ func TestCollectDetachJobsFiltersByJobID(t *testing.T) { t.Fatalf("expected only job-b, got %+v", rows) } } + +func TestGroupDetachJobsSummarizesLogicalJob(t *testing.T) { + rows := []execJobView{ + { + Pod: "okdev-sess-worker-0", + Container: "dev", + JobID: "job-shared", + PID: 100, + State: "running", + LogPath: "/tmp/okdev-exec/job-shared-worker-0.log", + MetaPath: "/tmp/okdev-exec/job-shared-worker-0.json", + StartedAt: "2026-05-09T10:00:00Z", + Command: "python train.py", + }, + { + Pod: "okdev-sess-worker-1", + Container: "dev", + JobID: "job-shared", + PID: 101, + State: "exited", + ExitCode: intPtr(0), + LogPath: "/tmp/okdev-exec/job-shared-worker-1.log", + MetaPath: "/tmp/okdev-exec/job-shared-worker-1.json", + StartedAt: "2026-05-09T10:00:01Z", + Command: "python train.py", + }, + } + + jobs := groupDetachJobs(rows) + if len(jobs) != 1 { + t.Fatalf("expected 1 logical job, got %d", len(jobs)) + } + if jobs[0].JobID != "job-shared" { + t.Fatalf("unexpected job id %q", jobs[0].JobID) + } + if jobs[0].Pods != 2 { + t.Fatalf("expected 2 pods, got %d", jobs[0].Pods) + } + if jobs[0].SummaryState != "running(1/2)" { + t.Fatalf("unexpected summary state %q", jobs[0].SummaryState) + } + if jobs[0].Command != "python train.py" { + t.Fatalf("unexpected command %q", jobs[0].Command) + } +} + +func intPtr(v int) *int { return &v } diff --git a/internal/cli/jobs.go b/internal/cli/jobs.go new file mode 100644 index 0000000..9c8a2b8 --- /dev/null +++ b/internal/cli/jobs.go @@ -0,0 +1,538 @@ +package cli + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "strings" + "sync" + "time" + + "github.com/acmore/okdev/internal/connect" + "github.com/acmore/okdev/internal/kube" + "github.com/spf13/cobra" +) + +var errDetachJobNotFound = errors.New("detached job not found") + +var ( + detachJobPollEvery = detachJobPollInterval + detachJobStopGrace = detachJobStopGracePeriod +) + +type detachJobClient interface { + connect.ExecClient + 
+	ExecShInContainer(context.Context, string, string, string, string) ([]byte, error)
+	StreamShInContainer(context.Context, string, string, string, string, io.Writer, io.Writer) error
+}
+
+func newJobsCmd(opts *Options) *cobra.Command {
+	cmd := &cobra.Command{
+		Use:   "jobs",
+		Short: "Manage detached exec jobs",
+	}
+	cmd.AddCommand(newJobsListCmd(opts))
+	cmd.AddCommand(newJobsLogsCmd(opts))
+	cmd.AddCommand(newJobsStopCmd(opts))
+	cmd.AddCommand(newJobsWaitCmd(opts))
+	return cmd
+}
+
+func newJobsListCmd(opts *Options) *cobra.Command {
+	return newJobsListCmdWithUse(opts, "list [session]", "List detached jobs")
+}
+
+func newExecJobsCmd(opts *Options) *cobra.Command {
+	return newJobsListCmdWithUse(opts, "exec-jobs [session]", "List detached exec jobs")
+}
+
+func newJobsListCmdWithUse(opts *Options, use string, short string) *cobra.Command {
+	var podNames []string
+	var role string
+	var labels []string
+	var exclude []string
+	var container string
+	var jobID string
+	var fanout int
+	var readyOnly bool
+
+	cmd := &cobra.Command{
+		Use:               use,
+		Short:             short,
+		Args:              validateExecArgs,
+		ValidArgsFunction: sessionCompletionFunc(opts),
+		RunE: func(cmd *cobra.Command, args []string) error {
+			applySessionArg(opts, args)
+			cc, err := resolveCommandContext(opts, resolveSessionName)
+			if err != nil {
+				return err
+			}
+			if err := ensureExistingSessionOwnership(cc.opts, cc.kube, cc.namespace, cc.sessionName); err != nil {
+				return err
+			}
+
+			pods, err := selectSessionPods(cmd.Context(), cc, podNames, role, labels, exclude, readyOnly)
+			if err != nil {
+				return err
+			}
+			targetContainer := container
+			if targetContainer == "" {
+				targetContainer = resolveTargetContainer(cc.cfg)
+			}
+			effectiveFanout := fanout
+			if effectiveFanout <= 0 {
+				effectiveFanout = pdshDefaultFanout
+			}
+			return runExecJobs(cmd.Context(), cc.kube, cc.namespace, pods, targetContainer, jobID, effectiveFanout, cmd.OutOrStdout(), opts.Output == "json")
+		},
+	}
+	cmd.Flags().StringSliceVar(&podNames, "pod", nil, "Target specific pods by name (repeatable/comma-separated)")
+	cmd.Flags().StringVar(&role, "role", "", "Target pods by workload role")
+	cmd.Flags().StringSliceVar(&labels, "label", nil, "Target pods by label key=value (repeatable)")
+	cmd.Flags().StringSliceVar(&exclude, "exclude", nil, "Exclude specific pods (repeatable/comma-separated)")
+	cmd.Flags().StringVar(&container, "container", "", "Override target container")
+	cmd.Flags().StringVar(&jobID, "job-id", "", "Filter to a specific detached exec job id")
+	cmd.Flags().IntVar(&fanout, "fanout", pdshDefaultFanout, "Maximum concurrent pod queries")
+	cmd.Flags().BoolVar(&readyOnly, "ready-only", false, "Query only pods that are already running (skip readiness check)")
+	return cmd
+}
+
+func newJobsLogsCmd(opts *Options) *cobra.Command {
+	var follow bool
+	var container string
+	var fanout int
+	cmd := &cobra.Command{
+		Use:   "logs <job-id> [session]",
+		Short: "Show detached job logs",
+		Args: func(cmd *cobra.Command, args []string) error {
+			if len(args) == 0 || len(args) > 2 {
+				return errors.New("requires <job-id> and optional [session]")
+			}
+			return nil
+		},
+		RunE: func(cmd *cobra.Command, args []string) error {
+			applySessionArg(opts, args[1:])
+			cc, err := resolveCommandContext(opts, resolveSessionName)
+			if err != nil {
+				return err
+			}
+			if err := ensureExistingSessionOwnership(cc.opts, cc.kube, cc.namespace, cc.sessionName); err != nil {
+				return err
+			}
+			pods, err := selectSessionPods(cmd.Context(), cc, nil, "", nil, nil, true)
+			if err != nil {
+				return err
+			}
+			targetContainer := strings.TrimSpace(container)
+			if targetContainer == "" {
+				targetContainer = resolveTargetContainer(cc.cfg)
+			}
+			return runJobsLogs(cmd.Context(), cc.kube, cc.namespace, pods, targetContainer, strings.TrimSpace(args[0]), fanoutOrDefault(fanout), follow, cmd.OutOrStdout())
+		},
+	}
+	cmd.Flags().BoolVarP(&follow, "follow", "f", false, "Follow logs until all pods in the job finish")
+	cmd.Flags().StringVar(&container, "container", "", "Override target container")
+	cmd.Flags().IntVar(&fanout, "fanout", pdshDefaultFanout, "Maximum concurrent pod queries")
+	return cmd
+}
+
+func newJobsStopCmd(opts *Options) *cobra.Command {
+	var container string
+	var fanout int
+	cmd := &cobra.Command{
+		Use:   "stop <job-id> [session]",
+		Short: "Stop a detached job",
+		Args: func(cmd *cobra.Command, args []string) error {
+			if len(args) == 0 || len(args) > 2 {
+				return errors.New("requires <job-id> and optional [session]")
+			}
+			return nil
+		},
+		RunE: func(cmd *cobra.Command, args []string) error {
+			applySessionArg(opts, args[1:])
+			cc, err := resolveCommandContext(opts, resolveSessionName)
+			if err != nil {
+				return err
+			}
+			if err := ensureExistingSessionOwnership(cc.opts, cc.kube, cc.namespace, cc.sessionName); err != nil {
+				return err
+			}
+			pods, err := selectSessionPods(cmd.Context(), cc, nil, "", nil, nil, true)
+			if err != nil {
+				return err
+			}
+			targetContainer := strings.TrimSpace(container)
+			if targetContainer == "" {
+				targetContainer = resolveTargetContainer(cc.cfg)
+			}
+			return runJobsStop(cmd.Context(), cc.kube, cc.namespace, pods, targetContainer, strings.TrimSpace(args[0]), fanoutOrDefault(fanout), cmd.OutOrStdout())
+		},
+	}
+	cmd.Flags().StringVar(&container, "container", "", "Override target container")
+	cmd.Flags().IntVar(&fanout, "fanout", pdshDefaultFanout, "Maximum concurrent pod queries")
+	return cmd
+}
+
+func newJobsWaitCmd(opts *Options) *cobra.Command {
+	var container string
+	var fanout int
+	cmd := &cobra.Command{
+		Use:   "wait <job-id> [session]",
+		Short: "Wait for a detached job to finish",
+		Args: func(cmd *cobra.Command, args []string) error {
+			if len(args) == 0 || len(args) > 2 {
+				return errors.New("requires <job-id> and optional [session]")
+			}
+			return nil
+		},
+		RunE: func(cmd *cobra.Command, args []string) error {
+			applySessionArg(opts, args[1:])
+			cc, err := resolveCommandContext(opts, resolveSessionName)
+			if err != nil {
+				return err
+			}
+			if err := ensureExistingSessionOwnership(cc.opts, cc.kube, cc.namespace, cc.sessionName); err != nil {
+				return err
+			}
+			pods, err := selectSessionPods(cmd.Context(), cc, nil, "", nil, nil, true)
+			if err != nil {
+				return err
+			}
+			targetContainer := strings.TrimSpace(container)
+			if targetContainer == "" {
+				targetContainer = resolveTargetContainer(cc.cfg)
+			}
+			return runJobsWait(cmd.Context(), cc.kube, cc.namespace, pods, targetContainer, strings.TrimSpace(args[0]), fanoutOrDefault(fanout))
+		},
+	}
+	cmd.Flags().StringVar(&container, "container", "", "Override target container")
+	cmd.Flags().IntVar(&fanout, "fanout", pdshDefaultFanout, "Maximum concurrent pod queries")
+	return cmd
+}
+
+func fanoutOrDefault(v int) int {
+	if v <= 0 {
+		return pdshDefaultFanout
+	}
+	return v
+}
+
+func runJobsLogs(ctx context.Context, client detachJobClient, namespace string, pods []kube.PodSummary, container, jobID string, fanout int, follow bool, out io.Writer) error {
+	job, podErrors, err := resolveLogicalDetachJob(ctx, client, namespace, pods, container, jobID, fanout)
+	if err != nil {
+		return err
+	}
+	shortNames := 
shortPodNames(jobPodNames(job)) + prefixes := formatPodPrefixes(shortNames, false) + prefixByPod := make(map[string]string, len(job.PodStates)) + for i, row := range job.PodStates { + prefixByPod[row.Pod] = prefixes[i] + } + + streamCtx := ctx + var cancel context.CancelFunc + if follow { + streamCtx, cancel = context.WithCancel(ctx) + defer cancel() + } + + var ( + wg sync.WaitGroup + writeMu sync.Mutex + errMu sync.Mutex + logErrors []execJobsPodError + ) + recordErr := func(pod string, err error) { + if err == nil { + return + } + errMu.Lock() + defer errMu.Unlock() + logErrors = append(logErrors, execJobsPodError{Pod: pod, Error: err.Error()}) + } + + for _, row := range job.PodStates { + row := row + if strings.TrimSpace(row.LogPath) == "" { + recordErr(row.Pod, errors.New("missing detached job log path")) + continue + } + wg.Add(1) + go func() { + defer wg.Done() + err := streamDetachJobLog(streamCtx, client, namespace, row, prefixByPod[row.Pod], follow, out, &writeMu) + if follow && streamCtx.Err() != nil && (errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) { + return + } + recordErr(row.Pod, err) + }() + } + + if !follow { + wg.Wait() + printExecJobsErrors(out, append(podErrors, logErrors...)) + if len(podErrors)+len(logErrors) > 0 { + return fmt.Errorf("failed to stream detached job %q logs on %d pod(s)", jobID, len(podErrors)+len(logErrors)) + } + return nil + } + + followPodErrors := append([]execJobsPodError{}, podErrors...) + ticker := time.NewTicker(detachJobPollEvery) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + cancel() + wg.Wait() + return ctx.Err() + case <-ticker.C: + current, errs, resolveErr := resolveLogicalDetachJob(ctx, client, namespace, pods, container, jobID, fanout) + if resolveErr != nil { + cancel() + wg.Wait() + return resolveErr + } + if len(errs) > 0 { + followPodErrors = errs + cancel() + wg.Wait() + printExecJobsErrors(out, append(followPodErrors, logErrors...)) + return fmt.Errorf("failed to follow detached job %q logs on %d pod(s)", jobID, len(followPodErrors)+len(logErrors)) + } + if allLogicalJobTerminal(current) { + cancel() + wg.Wait() + printExecJobsErrors(out, append(followPodErrors, logErrors...)) + if len(logErrors) > 0 { + return fmt.Errorf("failed to follow detached job %q logs on %d pod(s)", jobID, len(logErrors)) + } + return nil + } + } + } +} + +func runJobsWait(ctx context.Context, client connect.ExecClient, namespace string, pods []kube.PodSummary, container, jobID string, fanout int) error { + ticker := time.NewTicker(detachJobPollEvery) + defer ticker.Stop() + for { + job, podErrors, err := resolveLogicalDetachJob(ctx, client, namespace, pods, container, jobID, fanout) + if err != nil { + return err + } + if len(podErrors) > 0 { + return fmt.Errorf("failed to query detached job %q on %d pod(s)", jobID, len(podErrors)) + } + if allLogicalJobTerminal(job) { + if logicalJobFailed(job) { + return fmt.Errorf("detached job %q finished with state %s", jobID, job.SummaryState) + } + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + } +} + +func runJobsStop(ctx context.Context, client detachJobClient, namespace string, pods []kube.PodSummary, container, jobID string, fanout int, out io.Writer) error { + job, podErrors, err := resolveLogicalDetachJob(ctx, client, namespace, pods, container, jobID, fanout) + if err != nil { + return err + } + initialPodErrors := append([]execJobsPodError{}, podErrors...) 
+ + shortNames := shortPodNames(jobPodNames(job)) + prefixes := formatPodPrefixes(shortNames, false) + prefixByPod := make(map[string]string, len(job.PodStates)) + for i, row := range job.PodStates { + prefixByPod[row.Pod] = prefixes[i] + } + + signalErrors := make([]execJobsPodError, 0) + for _, row := range job.PodStates { + if row.State != "running" { + continue + } + result, signalErr := signalDetachJob(ctx, client, namespace, row, "TERM") + if signalErr != nil { + signalErrors = append(signalErrors, execJobsPodError{Pod: row.Pod, Error: signalErr.Error()}) + fmt.Fprintf(out, "%s error: %v\n", prefixByPod[row.Pod], signalErr) + continue + } + fmt.Fprintf(out, "%s sent SIGTERM pid=%d (%s)\n", prefixByPod[row.Pod], row.PID, result) + } + + deadline := time.NewTimer(detachJobStopGrace) + defer deadline.Stop() + ticker := time.NewTicker(detachJobPollEvery) + defer ticker.Stop() + + current := job +waitLoop: + for { + if allLogicalJobTerminal(current) { + break + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-deadline.C: + break waitLoop + case <-ticker.C: + current, podErrors, err = resolveLogicalDetachJob(ctx, client, namespace, pods, container, jobID, fanout) + if err != nil { + return err + } + if len(podErrors) > 0 { + break waitLoop + } + } + } + + killErrors := make([]execJobsPodError, 0) + if !allLogicalJobTerminal(current) { + for _, row := range current.PodStates { + if row.State != "running" { + continue + } + result, signalErr := signalDetachJob(ctx, client, namespace, row, "KILL") + if signalErr != nil { + killErrors = append(killErrors, execJobsPodError{Pod: row.Pod, Error: signalErr.Error()}) + fmt.Fprintf(out, "%s error: %v\n", prefixByPod[row.Pod], signalErr) + continue + } + fmt.Fprintf(out, "%s sent SIGKILL pid=%d (%s)\n", prefixByPod[row.Pod], row.PID, result) + } + } + + current, podErrors, err = resolveLogicalDetachJob(ctx, client, namespace, pods, container, jobID, fanout) + if err != nil { + return err + } + allErrors := append(append(append([]execJobsPodError{}, initialPodErrors...), podErrors...), signalErrors...) + allErrors = append(allErrors, killErrors...) 
+ printExecJobsErrors(out, allErrors) + if len(allErrors) > 0 { + return fmt.Errorf("failed to stop detached job %q on %d pod(s)", jobID, len(allErrors)) + } + if !allLogicalJobTerminal(current) { + return fmt.Errorf("detached job %q still has running pods", jobID) + } + return nil +} + +func resolveLogicalDetachJob(ctx context.Context, client connect.ExecClient, namespace string, pods []kube.PodSummary, container, jobID string, fanout int) (logicalExecJobView, []execJobsPodError, error) { + rows, podErrors := collectDetachJobs(ctx, client, namespace, pods, container, jobID, fanout) + jobs := groupDetachJobs(rows) + if len(jobs) == 0 { + return logicalExecJobView{}, podErrors, fmt.Errorf("%w %q", errDetachJobNotFound, jobID) + } + return jobs[0], podErrors, nil +} + +func jobPodNames(job logicalExecJobView) []string { + names := make([]string, 0, len(job.PodStates)) + for _, row := range job.PodStates { + names = append(names, row.Pod) + } + return names +} + +func allLogicalJobTerminal(job logicalExecJobView) bool { + if len(job.PodStates) == 0 { + return false + } + for _, row := range job.PodStates { + if row.State == "running" { + return false + } + } + return true +} + +func logicalJobFailed(job logicalExecJobView) bool { + for _, row := range job.PodStates { + if row.State == "orphaned" { + return true + } + if row.ExitCode != nil && *row.ExitCode != 0 { + return true + } + if row.State != "exited" { + return true + } + } + return false +} + +func streamDetachJobLog(ctx context.Context, client detachJobClient, namespace string, row execJobView, prefix string, follow bool, out io.Writer, mu *sync.Mutex) error { + script := readDetachJobLogScript(row.LogPath) + if follow { + script = followDetachJobLogScript(row.LogPath) + } + var stderr bytes.Buffer + writer := newPrefixedWriter(prefix, out, mu) + defer writer.Flush() + err := client.StreamShInContainer(ctx, namespace, row.Pod, row.Container, script, writer, &stderr) + if err != nil && strings.TrimSpace(stderr.String()) != "" { + return fmt.Errorf("%w: %s", err, strings.TrimSpace(stderr.String())) + } + return err +} + +func readDetachJobLogScript(logPath string) string { + quoted := shellQuote(logPath) + return fmt.Sprintf("if [ ! -f %s ]; then echo 'missing detached job log' >&2; exit 1; fi\ncat %s\n", quoted, quoted) +} + +func followDetachJobLogScript(logPath string) string { + quoted := shellQuote(logPath) + return fmt.Sprintf("if [ ! -f %s ]; then echo 'missing detached job log' >&2; exit 1; fi\nexec tail -n +1 -f %s\n", quoted, quoted) +} + +func signalDetachJob(ctx context.Context, client detachJobClient, namespace string, row execJobView, signalName string) (string, error) { + out, err := client.ExecShInContainer(ctx, namespace, row.Pod, row.Container, signalDetachJobScript(row.JobID, row.PID, signalName)) + if err != nil { + return "", err + } + return strings.TrimSpace(string(out)), nil +} + +func signalDetachJobScript(jobID string, pid int, signalName string) string { + return fmt.Sprintf( + "pid=%d\n"+ + "env_path=\"/proc/$pid/environ\"\n"+ + "if [ ! -r \"$env_path\" ]; then\n"+ + " echo not-running\n"+ + " exit 0\n"+ + "fi\n"+ + "if ! 
tr '\\000' '\\n' <\"$env_path\" | grep -qx %s; then\n"+ + " echo job-mismatch >&2\n"+ + " exit 1\n"+ + "fi\n"+ + "if kill -%s \"$pid\" 2>/dev/null; then\n"+ + " echo signaled\n"+ + " exit 0\n"+ + "fi\n"+ + "echo not-running\n", + pid, + shellQuote("OKDEV_JOB_ID="+jobID), + signalName, + ) +} + +func printExecJobsErrors(out io.Writer, errs []execJobsPodError) { + if len(errs) == 0 { + return + } + fmt.Fprintf(out, "\nFAILED:\n") + for _, pe := range errs { + fmt.Fprintf(out, "pod=%s error=%q\n", pe.Pod, pe.Error) + } +} diff --git a/internal/cli/jobs_logs_wait_test.go b/internal/cli/jobs_logs_wait_test.go new file mode 100644 index 0000000..e1c3c40 --- /dev/null +++ b/internal/cli/jobs_logs_wait_test.go @@ -0,0 +1,218 @@ +package cli + +import ( + "bytes" + "context" + "fmt" + "io" + "strings" + "sync" + "testing" + + "github.com/acmore/okdev/internal/kube" +) + +func TestNewJobsLogsCmdAddsFollowAlias(t *testing.T) { + cmd := newJobsLogsCmd(&Options{}) + flag := cmd.Flags().Lookup("follow") + if flag == nil { + t.Fatal("expected --follow flag") + } + if flag.Shorthand != "f" { + t.Fatalf("expected -f shorthand, got %q", flag.Shorthand) + } +} + +func TestRunJobsLogsFollowStreamsPrefixedPodLogsUntilTerminal(t *testing.T) { + client := &fakeJobsClient{ + listOutputs: map[string][]string{ + "okdev-sess-worker-0": { + detachMetadataJSON("job-shared", "okdev-sess-worker-0", "dev", 100, "running", nil), + detachMetadataJSON("job-shared", "okdev-sess-worker-0", "dev", 100, "exited", intPtr(0)), + }, + "okdev-sess-worker-1": { + detachMetadataJSON("job-shared", "okdev-sess-worker-1", "dev", 101, "running", nil), + detachMetadataJSON("job-shared", "okdev-sess-worker-1", "dev", 101, "exited", intPtr(0)), + }, + }, + streamPlans: map[string]fakeJobsStreamPlan{ + "okdev-sess-worker-0|follow": {stdout: "alpha\n", waitForCancel: true}, + "okdev-sess-worker-1|follow": {stdout: "beta\n", waitForCancel: true}, + }, + } + pods := []kube.PodSummary{ + {Name: "okdev-sess-worker-0", Phase: "Running"}, + {Name: "okdev-sess-worker-1", Phase: "Running"}, + } + var out bytes.Buffer + if err := runJobsLogs(context.Background(), client, "default", pods, "dev", "job-shared", pdshDefaultFanout, true, &out); err != nil { + t.Fatalf("runJobsLogs: %v", err) + } + got := out.String() + for _, want := range []string{"[worker-0] alpha", "[worker-1] beta"} { + if !strings.Contains(got, want) { + t.Fatalf("expected %q in %q", want, got) + } + } + if strings.Contains(got, "FAILED:") { + t.Fatalf("expected no failure footer, got %q", got) + } +} + +func TestRunJobsWaitBlocksUntilAllPodsExitSuccessfully(t *testing.T) { + client := &fakeJobsClient{ + listOutputs: map[string][]string{ + "okdev-sess-worker-0": { + detachMetadataJSON("job-shared", "okdev-sess-worker-0", "dev", 100, "running", nil), + detachMetadataJSON("job-shared", "okdev-sess-worker-0", "dev", 100, "exited", intPtr(0)), + }, + "okdev-sess-worker-1": { + detachMetadataJSON("job-shared", "okdev-sess-worker-1", "dev", 101, "running", nil), + detachMetadataJSON("job-shared", "okdev-sess-worker-1", "dev", 101, "exited", intPtr(0)), + }, + }, + } + pods := []kube.PodSummary{ + {Name: "okdev-sess-worker-0", Phase: "Running"}, + {Name: "okdev-sess-worker-1", Phase: "Running"}, + } + if err := runJobsWait(context.Background(), client, "default", pods, "dev", "job-shared", pdshDefaultFanout); err != nil { + t.Fatalf("runJobsWait: %v", err) + } +} + +func TestRunJobsWaitFailsOnNonZeroExit(t *testing.T) { + client := &fakeJobsClient{ + listOutputs: map[string][]string{ + 
"okdev-sess-worker-0": { + detachMetadataJSON("job-shared", "okdev-sess-worker-0", "dev", 100, "exited", intPtr(137)), + }, + }, + } + pods := []kube.PodSummary{ + {Name: "okdev-sess-worker-0", Phase: "Running"}, + } + err := runJobsWait(context.Background(), client, "default", pods, "dev", "job-shared", pdshDefaultFanout) + if err == nil { + t.Fatal("expected failure") + } + if !strings.Contains(err.Error(), "failed(1/1)") { + t.Fatalf("unexpected error: %v", err) + } +} + +type fakeJobsClient struct { + mu sync.Mutex + listOutputs map[string][]string + listErrs map[string][]error + listCalls map[string]int + streamPlans map[string]fakeJobsStreamPlan + execShResponses map[string]fakeJobsExecResponse + execScripts []string + onExec func(pod, key string) +} + +type fakeJobsStreamPlan struct { + stdout string + stderr string + err error + waitForCancel bool +} + +type fakeJobsExecResponse struct { + stdout string + err error +} + +func (f *fakeJobsClient) ExecInteractive(ctx context.Context, namespace, pod string, tty bool, command []string, stdin io.Reader, stdout io.Writer, stderr io.Writer) error { + return f.ExecInteractiveInContainer(ctx, namespace, pod, "", tty, command, stdin, stdout, stderr) +} + +func (f *fakeJobsClient) ExecInteractiveInContainer(ctx context.Context, namespace, pod, container string, tty bool, command []string, stdin io.Reader, stdout io.Writer, stderr io.Writer) error { + f.mu.Lock() + if f.listCalls == nil { + f.listCalls = make(map[string]int) + } + idx := f.listCalls[pod] + f.listCalls[pod]++ + outputs := f.listOutputs[pod] + errs := f.listErrs[pod] + var out string + var err error + if len(outputs) > 0 { + if idx >= len(outputs) { + out = outputs[len(outputs)-1] + } else { + out = outputs[idx] + } + } + if len(errs) > 0 { + if idx >= len(errs) { + err = errs[len(errs)-1] + } else { + err = errs[idx] + } + } + f.mu.Unlock() + if out != "" { + fmt.Fprint(stdout, out) + } + return err +} + +func (f *fakeJobsClient) ExecShInContainer(ctx context.Context, namespace, pod, container, script string) ([]byte, error) { + f.mu.Lock() + f.execScripts = append(f.execScripts, pod+":"+script) + key := execKeyForScript(script) + resp := f.execShResponses[pod+"|"+key] + onExec := f.onExec + f.mu.Unlock() + if onExec != nil { + onExec(pod, key) + } + return []byte(resp.stdout), resp.err +} + +func (f *fakeJobsClient) StreamShInContainer(ctx context.Context, namespace, pod, container, script string, stdout, stderr io.Writer) error { + f.mu.Lock() + key := "read" + if strings.Contains(script, "tail -n +1 -f") { + key = "follow" + } + plan, ok := f.streamPlans[pod+"|"+key] + f.mu.Unlock() + if !ok { + return fmt.Errorf("unexpected stream script for %s: %s", pod, script) + } + if plan.stdout != "" { + fmt.Fprint(stdout, plan.stdout) + } + if plan.stderr != "" { + fmt.Fprint(stderr, plan.stderr) + } + if plan.waitForCancel { + <-ctx.Done() + return ctx.Err() + } + return plan.err +} + +func execKeyForScript(script string) string { + switch { + case strings.Contains(script, "kill -TERM"): + return "TERM" + case strings.Contains(script, "kill -KILL"): + return "KILL" + default: + return "other" + } +} + +func detachMetadataJSON(jobID, pod, container string, pid int, state string, exitCode *int) string { + exitPart := "" + if exitCode != nil { + exitPart = fmt.Sprintf(",\"exitCode\":%d", *exitCode) + } + return fmt.Sprintf("{\"jobId\":\"%s\",\"pod\":\"%s\",\"container\":\"%s\",\"pid\":%d,\"command\":\"python 
train.py\",\"startedAt\":\"2026-05-09T10:00:00Z\",\"stdoutPath\":\"/tmp/okdev-exec/%s.log\",\"stderrPath\":\"/tmp/okdev-exec/%s.log\",\"metaPath\":\"/tmp/okdev-exec/%s.json\",\"state\":\"%s\"%s}\n", + jobID, pod, container, pid, jobID, jobID, jobID, state, exitPart) +} diff --git a/internal/cli/jobs_stop_test.go b/internal/cli/jobs_stop_test.go new file mode 100644 index 0000000..16712fa --- /dev/null +++ b/internal/cli/jobs_stop_test.go @@ -0,0 +1,90 @@ +package cli + +import ( + "bytes" + "context" + "errors" + "strings" + "testing" + "time" + + "github.com/acmore/okdev/internal/kube" +) + +func TestRunJobsStopEscalatesToSIGKILLAfterGrace(t *testing.T) { + oldPollEvery := detachJobPollEvery + oldStopGrace := detachJobStopGrace + detachJobPollEvery = 5 * time.Millisecond + detachJobStopGrace = 15 * time.Millisecond + defer func() { + detachJobPollEvery = oldPollEvery + detachJobStopGrace = oldStopGrace + }() + + client := &fakeJobsClient{ + listOutputs: map[string][]string{ + "okdev-sess-worker-0": { + detachMetadataJSON("job-shared", "okdev-sess-worker-0", "dev", 100, "running", nil), + }, + }, + execShResponses: map[string]fakeJobsExecResponse{ + "okdev-sess-worker-0|TERM": {stdout: "signaled\n"}, + "okdev-sess-worker-0|KILL": {stdout: "signaled\n"}, + }, + } + client.onExec = func(pod, key string) { + if key != "KILL" { + return + } + client.mu.Lock() + defer client.mu.Unlock() + client.listOutputs[pod] = append(client.listOutputs[pod], detachMetadataJSON("job-shared", pod, "dev", 100, "exited", intPtr(137))) + } + pods := []kube.PodSummary{{Name: "okdev-sess-worker-0", Phase: "Running"}} + var out bytes.Buffer + if err := runJobsStop(context.Background(), client, "default", pods, "dev", "job-shared", pdshDefaultFanout, &out); err != nil { + t.Fatalf("runJobsStop: %v", err) + } + got := out.String() + for _, want := range []string{"sent SIGTERM", "sent SIGKILL"} { + if !strings.Contains(got, want) { + t.Fatalf("expected %q in %q", want, got) + } + } + if len(client.execScripts) < 2 || !strings.Contains(client.execScripts[0], "kill -TERM") || !strings.Contains(client.execScripts[1], "kill -KILL") { + t.Fatalf("expected TERM then KILL, got %+v", client.execScripts) + } +} + +func TestRunJobsStopReturnsPartialFailure(t *testing.T) { + oldPollEvery := detachJobPollEvery + oldStopGrace := detachJobStopGrace + detachJobPollEvery = 5 * time.Millisecond + detachJobStopGrace = 15 * time.Millisecond + defer func() { + detachJobPollEvery = oldPollEvery + detachJobStopGrace = oldStopGrace + }() + + client := &fakeJobsClient{ + listOutputs: map[string][]string{ + "okdev-sess-worker-0": { + detachMetadataJSON("job-shared", "okdev-sess-worker-0", "dev", 100, "running", nil), + }, + }, + execShResponses: map[string]fakeJobsExecResponse{ + "okdev-sess-worker-0|TERM": {err: errors.New("permission denied")}, + "okdev-sess-worker-0|KILL": {err: errors.New("permission denied")}, + }, + } + pods := []kube.PodSummary{{Name: "okdev-sess-worker-0", Phase: "Running"}} + var out bytes.Buffer + err := runJobsStop(context.Background(), client, "default", pods, "dev", "job-shared", pdshDefaultFanout, &out) + if err == nil { + t.Fatal("expected failure") + } + got := out.String() + if !strings.Contains(got, "FAILED:") || !strings.Contains(got, "permission denied") { + t.Fatalf("expected failure footer, got %q", got) + } +} diff --git a/internal/cli/pdsh.go b/internal/cli/pdsh.go index c42e161..1061be2 100644 --- a/internal/cli/pdsh.go +++ b/internal/cli/pdsh.go @@ -608,8 +608,10 @@ func 
mustDetachJSONTemplateWithExit(v detachMetadata) string { return out } -func newDetachJobSpec(pod, container, cmd string) detachJobSpec { - jobID := newDetachJobID() +func newDetachJobSpec(jobID, pod, container, cmd string) detachJobSpec { + if strings.TrimSpace(jobID) == "" { + jobID = newDetachJobID() + } startedAt := time.Now().UTC().Format(time.RFC3339) logPath := filepath.Join(detachMetadataDir, jobID+".log") metaPath := filepath.Join(detachMetadataDir, jobID+".json") @@ -664,6 +666,7 @@ func runDetachExec(ctx context.Context, client connect.ExecClient, namespace str results := make(chan podDetachResult, len(pods)) sem := make(chan struct{}, fanout) + jobID := newDetachJobID() var wg sync.WaitGroup for i, pod := range pods { @@ -672,7 +675,7 @@ func runDetachExec(ctx context.Context, client connect.ExecClient, namespace str defer wg.Done() sem <- struct{}{} defer func() { <-sem }() - spec := newDetachJobSpec(pod.Name, container, cmdStr) + spec := newDetachJobSpec(jobID, pod.Name, container, cmdStr) command := detachCommand(spec) var remoteStdout, remoteStderr bytes.Buffer err := connect.RunOnContainer(ctx, client, namespace, pod.Name, container, command, false, nil, &remoteStdout, &remoteStderr) diff --git a/internal/cli/pdsh_test.go b/internal/cli/pdsh_test.go index d8f0dcc..cb5989a 100644 --- a/internal/cli/pdsh_test.go +++ b/internal/cli/pdsh_test.go @@ -227,13 +227,13 @@ func (s *slowExecClient) ExecInteractiveInContainer(ctx context.Context, namespa func TestRunDetachExec(t *testing.T) { launch, _ := json.Marshal(detachLaunchInfo{ - JobID: "job-worker-0", + JobID: "job-shared", PID: 48217, LogPath: "/tmp/okdev-exec/job-worker-0.log", MetaPath: "/tmp/okdev-exec/job-worker-0.json", }) launch2, _ := json.Marshal(detachLaunchInfo{ - JobID: "job-worker-1", + JobID: "job-shared", PID: 49102, LogPath: "/tmp/okdev-exec/job-worker-1.log", MetaPath: "/tmp/okdev-exec/job-worker-1.json", @@ -256,8 +256,8 @@ func TestRunDetachExec(t *testing.T) { } got := stdout.String() for _, want := range []string{ - "worker-0] detached job_id=job-worker-0 pid=48217 log=/tmp/okdev-exec/job-worker-0.log meta=/tmp/okdev-exec/job-worker-0.json", - "worker-1] detached job_id=job-worker-1 pid=49102 log=/tmp/okdev-exec/job-worker-1.log meta=/tmp/okdev-exec/job-worker-1.json", + "worker-0] detached job_id=job-shared pid=48217 log=/tmp/okdev-exec/job-worker-0.log meta=/tmp/okdev-exec/job-worker-0.json", + "worker-1] detached job_id=job-shared pid=49102 log=/tmp/okdev-exec/job-worker-1.log meta=/tmp/okdev-exec/job-worker-1.json", } { if !strings.Contains(got, want) { t.Fatalf("expected detach metadata output %q, got %q", want, got) diff --git a/internal/cli/root.go b/internal/cli/root.go index 3fd9a2a..bcc093c 100644 --- a/internal/cli/root.go +++ b/internal/cli/root.go @@ -57,6 +57,7 @@ func newRootCmdWithOptions() (*cobra.Command, *Options) { cmd.AddCommand(newTargetCmd(opts)) cmd.AddCommand(newAgentCmd(opts)) cmd.AddCommand(newExecCmd(opts)) + cmd.AddCommand(newJobsCmd(opts)) cmd.AddCommand(newExecJobsCmd(opts)) cmd.AddCommand(newPortForwardCmd(opts)) cmd.AddCommand(newCpCmd(opts)) diff --git a/internal/cli/root_test.go b/internal/cli/root_test.go index 997bbdd..5876326 100644 --- a/internal/cli/root_test.go +++ b/internal/cli/root_test.go @@ -26,7 +26,7 @@ func TestNewRootCmdRegistersExpectedCommandsAndFlags(t *testing.T) { } for _, name := range []string{ "version", "init", "validate", "up", "down", "status", "list", "use", - "target", "agent", "exec", "logs", "ssh", "ssh-proxy", "ports", "port-forward", 
"sync", "prune", "migrate", "completion", + "target", "agent", "exec", "jobs", "logs", "ssh", "ssh-proxy", "ports", "port-forward", "sync", "prune", "migrate", "completion", } { if _, _, err := cmd.Find([]string{name}); err != nil { t.Fatalf("expected subcommand %q: %v", name, err) diff --git a/internal/cli/timeouts.go b/internal/cli/timeouts.go index 2a6cf61..622f17e 100644 --- a/internal/cli/timeouts.go +++ b/internal/cli/timeouts.go @@ -196,4 +196,10 @@ const ( // Maximum concurrent pod exec operations in multi-pod mode. pdshDefaultFanout = 16 + + // Poll interval for detached job status reconciliation. + detachJobPollInterval = 200 * time.Millisecond + + // Grace period between SIGTERM and SIGKILL when stopping a detached job. + detachJobStopGracePeriod = 10 * time.Second ) From b23b274360fbe74b821960c2db36fb6d57da21e7 Mon Sep 17 00:00:00 2001 From: acmore Date: Sun, 10 May 2026 09:20:15 +0800 Subject: [PATCH 2/2] style: gofmt jobs logs tests --- internal/cli/jobs_logs_wait_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/cli/jobs_logs_wait_test.go b/internal/cli/jobs_logs_wait_test.go index e1c3c40..163031d 100644 --- a/internal/cli/jobs_logs_wait_test.go +++ b/internal/cli/jobs_logs_wait_test.go @@ -109,7 +109,7 @@ type fakeJobsClient struct { streamPlans map[string]fakeJobsStreamPlan execShResponses map[string]fakeJobsExecResponse execScripts []string - onExec func(pod, key string) + onExec func(pod, key string) } type fakeJobsStreamPlan struct {