From f33d37865d99ac92ad81431a63856bcdd5a73aa8 Mon Sep 17 00:00:00 2001 From: MK Date: Mon, 15 Jun 2026 15:17:57 -0400 Subject: [PATCH] feat(scheduler): Backend interface + FileBackend + CronJob manifest helpers (#162 part 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit First half of part 2 of the #162 stack. Introduces the ScheduleBackend abstraction the runtime + 'forge package' both consume, ships the FileBackend wrapping existing behavior with zero observable change, and lands the pure-Go CronJob manifest builders + in-cluster detection helper that parts 2b/3 build on. forge-core/scheduler/backend.go ScheduleBackend interface (Start, Stop, Reload, Sync, List, Get, Set, Delete, History) bundling timing + persistence into one surface. FileBackend wraps Scheduler + ScheduleStore behind the interface. SourceYAML / SourceLLM constants pin the reconciliation rule: Sync upserts yaml-sourced entries, prunes yaml-sourced entries dropped from the manifest, preserves LLM-sourced ones, and carries LastRun / RunCount across re-Sync. forge-core/scheduler/k8s_detect.go InCluster() helper. Default signal is the projected ServiceAccount token at /var/run/secrets/kubernetes.io/serviceaccount/token. FORGE_IN_CLUSTER env var overrides for dev/test. forge-core/scheduler/k8s_manifest.go CronJobName + CronJobYAML — pure-Go manifest builders. K8s-name sanitization (RFC 1123 subset) with hash-suffix when truncating past 63 chars so distinct (agent, schedule) pairs that share a prefix don't collide. CronJob spec uses concurrencyPolicy: Forbid (K8s-native equivalent of FileBackend's overlap skip) and embeds X-Forge-Schedule-Id header (enables schedule_fire / _complete audit-event linkage in part 3). Task text is shell-escaped for the single-quoted JSON-RPC body. forge-core/types/config.go SchedulerConfig + K8sSchedulerConfig blocks on ForgeConfig. Backend: "auto" (default) | "file" | "kubernetes". 
Kubernetes block fields: Namespace, ServiceURL, AllowDynamic (default false), TriggerImage, AuthSecretName. forge-cli/runtime/runner.go Swap r.sched *Scheduler for r.schedBackend Backend. FileBackend constructed at the existing wiring point with the same Scheduler + store the pre-#162 path used. Zero behavior change. The lazyScheduleReloader delegates Reload through the Backend. Tests: - backend_test.go (5 tests): Sync idempotency, yaml-entry pruning, LLM-entry preservation, per-run state preservation across re-Sync, Store() accessor. - k8s_manifest_test.go (5 tests): CronJob name fits 63 chars, hash-suffix uniqueness on truncation, manifest has required K8s-side fields, defaults applied for missing inputs, single-quote shell escaping, FORGE_IN_CLUSTER env override (both directions). Docs: - docs/deployment/scheduler-kubernetes.md — new operator-facing reference: backend resolution ladder, CronJob shape, token plumbing via forge auth, security model. NOT in this PR (deliberate split): - KubernetesBackend with client-go for runtime CronJob CRUD. The runtime backend interface and helper code shipped here are enough for static (forge.yaml) schedules deployed via 'forge package' in part 3 — the cluster's CronJob controller handles timing without any runtime API calls. The LLM-driven schedule_set path requires client-go and lands as part 2b. 
Refs #162 --- docs/deployment/scheduler-kubernetes.md | 125 ++++++++++++ forge-cli/runtime/runner.go | 21 +- forge-core/scheduler/backend.go | 182 +++++++++++++++++ forge-core/scheduler/backend_test.go | 230 ++++++++++++++++++++++ forge-core/scheduler/k8s_detect.go | 24 +++ forge-core/scheduler/k8s_manifest.go | 188 ++++++++++++++++++ forge-core/scheduler/k8s_manifest_test.go | 155 +++++++++++++++ forge-core/types/config.go | 52 +++++ 8 files changed, 971 insertions(+), 6 deletions(-) create mode 100644 docs/deployment/scheduler-kubernetes.md create mode 100644 forge-core/scheduler/backend.go create mode 100644 forge-core/scheduler/backend_test.go create mode 100644 forge-core/scheduler/k8s_detect.go create mode 100644 forge-core/scheduler/k8s_manifest.go create mode 100644 forge-core/scheduler/k8s_manifest_test.go diff --git a/docs/deployment/scheduler-kubernetes.md b/docs/deployment/scheduler-kubernetes.md new file mode 100644 index 0000000..87ea068 --- /dev/null +++ b/docs/deployment/scheduler-kubernetes.md @@ -0,0 +1,125 @@ +--- +title: "Scheduler — Kubernetes" +description: "Hybrid scheduler backend: file-backed on developer laptops, Kubernetes CronJobs in production." +order: 10 +--- + +## Scheduler — Kubernetes + +Forge's scheduler picks one of two backends at startup: + +| Backend | When used | Persistence | Timing | +|---------|-----------|-------------|--------| +| `file` | Outside a Kubernetes pod | `/.forge/memory/SCHEDULES.md` | 30s in-process goroutine ticker | +| `kubernetes` | Inside a Kubernetes pod (with `scheduler.backend: auto` or `kubernetes` in forge.yaml) | K8s `CronJob` resources (etcd) | Cluster's CronJob controller | + +The Kubernetes backend solves three operational problems the file backend has in container deploys: no PVC needed (etcd is durable), horizontally safe (CronJob controller is cluster-singleton), and visible to standard `kubectl get cronjobs` tooling. 
+ +## Backend selection + +`forge.yaml`: + +```yaml +scheduler: + backend: auto # auto (default) | file | kubernetes + kubernetes: + namespace: "" # defaults to the agent pod's own namespace at runtime + service_url: "" # in-cluster URL CronJob trigger pods POST to (required in k8s mode) + allow_dynamic: false # whether schedule_set (LLM-driven) can create CronJobs at runtime + trigger_image: "" # container image the trigger Job runs; default: curlimages/curl:8.10.1 + auth_secret_name: "" # K8s Secret holding the internal token; default: <agent-id>-internal-token +``` + +Resolution at startup: + +1. `scheduler.backend: file` → file backend, always +2. `scheduler.backend: kubernetes` → kubernetes backend, errors at startup when not in-cluster (unless `FORGE_IN_CLUSTER=true` is set for testing) +3. `scheduler.backend: auto` (default) → kubernetes when the projected ServiceAccount token at `/var/run/secrets/kubernetes.io/serviceaccount/token` exists, file otherwise + +The escape hatch `FORGE_IN_CLUSTER=true|false` overrides the file-presence check — useful for forcing file behavior in a single-replica dev pod, or for running the K8s backend's unit tests on a developer laptop. + +## CronJob manifest shape + +Forge generates one CronJob per schedule. 
The shape is fixed (no Helm templates) so `kubectl diff` and `kubectl apply -k` work without runtime substitution: + +```yaml +apiVersion: batch/v1 +kind: CronJob +metadata: + name: forge-aibuilderdemo-daily-summary + namespace: default + labels: + forge.agent.id: aibuilderdemo + forge.schedule.id: daily-summary + forge.schedule.source: yaml # or "llm" for LLM-set schedules +spec: + schedule: "0 9 * * *" + concurrencyPolicy: Forbid # K8s-native equivalent of the file backend's overlap skip + successfulJobsHistoryLimit: 3 + failedJobsHistoryLimit: 3 + jobTemplate: + spec: + template: + spec: + restartPolicy: Never + containers: + - name: trigger + image: curlimages/curl:8.10.1 + env: + - name: FORGE_AUTH_TOKEN + valueFrom: + secretKeyRef: + name: aibuilderdemo-internal-token + key: token + command: ["sh", "-c"] + args: + - | + curl -sfX POST http://aibuilderdemo.default.svc:8080/ \ + -H "Authorization: Bearer $FORGE_AUTH_TOKEN" \ + -H "X-Forge-Schedule-Id: daily-summary" \ + -H "Content-Type: application/json" \ + --data '{"jsonrpc":"2.0",...,"id":"sched-daily-summary-'$(date +%s)'",...}' +``` + +`concurrencyPolicy: Forbid` is the K8s-native equivalent of the file backend's overlap check — same semantic, enforced by the cluster. + +The CronJob resource name is deterministic: `forge-<agent-id>-<schedule-id>`, sanitized for K8s naming rules, hash-suffixed when the natural name exceeds the 63-character limit so distinct schedules sharing a prefix don't collide after truncation. + +## Token plumbing + +CronJob trigger Pods authenticate to the agent's A2A endpoint using the same internal bearer token channel adapters use (`runner.go:ResolveAuth`). The Secret containing the token is **not** generated by `forge package` — see the security model below. 
+ +```sh +# Bootstrap the Secret from the local runtime.token (or mint one): +forge auth mint-token +forge auth secret-yaml | kubectl apply -f - +``` + +When the CronJob fires, the trigger container reads `FORGE_AUTH_TOKEN` from the Secret (injected as an environment variable via `secretKeyRef`), sends it as `Authorization: Bearer <token>`, and the agent's existing `static_token` auth provider validates it. The `auth_verify` audit event lands with `Source: "internal"` identical to a channel callback — no new auth code path, no surprise. + +## Security model + +- **The Secret holding the token is never checked in.** `forge package` (part 3) emits a credential-less Secret template; operators populate it out-of-band via `forge auth secret-yaml`, ExternalSecrets / Sealed Secrets / SOPS / Vault Agent Injector, or `kubectl create secret`. Applying a Deployment without first populating the Secret leaves the agent pod NotReady with a clear `secret "..." not found` event. + +- **`allow_dynamic: false` is the default.** Static schedules from `forge.yaml` are materialized by `forge package` at build time. LLM-driven `schedule_set` calls do not create new CronJobs unless this flag is flipped on, which requires the agent's ServiceAccount to have `create`/`patch`/`delete` RBAC on `batch/cronjobs` in its namespace — a meaningful privilege escalation worth gating. + +- **Same-namespace only.** CronJobs run in the agent pod's own namespace. Cross-namespace deploys are out of scope. + +## What's in this PR (#162 part 2) + +- `scheduler.Backend` interface (one surface, two implementations) +- `scheduler.FileBackend` — wraps the existing `Scheduler` ticker + `MemoryScheduleStore`. Zero behavior change vs pre-#162. +- `scheduler.InCluster()` detection helper +- `scheduler.CronJobYAML(...)` + `scheduler.CronJobName(...)` — pure-Go manifest builders, reusable by the runtime backend (part 2b) and `forge package` (part 3) + +The runtime `KubernetesBackend` with client-go for live CronJob CRUD lands in part 2b. 
Part 3 wires the manifest builder into `forge package` so `kubectl apply -k ./k8s` produces a fully scheduled deploy. + +## Local fallback + +Outside a cluster — `forge run` on a laptop, CI, a non-k8s VM — the file backend resolves automatically. The 30s in-process ticker + `/.forge/memory/SCHEDULES.md` continue to work byte-identically to pre-#162. + +## See also + +- [`forge auth`](../reference/cli-reference.md#forge-auth) — internal-token operator UX (issue #162 part 1) +- [Scheduling](../core-concepts/scheduling.md) — declarative `forge.yaml` `schedules[]` syntax +- [Audit Logging](../security/audit-logging.md) — `schedule_fire` / `schedule_complete` events diff --git a/forge-cli/runtime/runner.go b/forge-cli/runtime/runner.go index 37281a1..21f70e8 100644 --- a/forge-cli/runtime/runner.go +++ b/forge-cli/runtime/runner.go @@ -137,7 +137,7 @@ type Runner struct { modelConfig *coreruntime.ModelConfig // resolved model config (for banner) derivedCLIConfig *contract.DerivedCLIConfig // auto-derived from skill requirements skillGuardrails *agentspec.SkillGuardrailRules // runtime-parsed skill guardrails (fallback when no build artifact) - sched *scheduler.Scheduler // cron scheduler (nil until started) + schedBackend scheduler.Backend // schedule backend (nil until started); FileBackend in non-cluster deploys, KubernetesBackend (#162 part 2b) when running in-cluster with scheduler.backend=auto|kubernetes startTime time.Time // server start time (for /health uptime) scheduleNotifier ScheduleNotifier // optional: delivers cron results to channels authToken string // resolved auth token (empty if --no-auth) @@ -968,10 +968,19 @@ func (r *Runner) Run(ctx context.Context) error { }) } } - r.sched = scheduler.New(schedStore, dispatch, r.logger, auditFn) + // FileBackend wraps the existing Scheduler ticker + // + ScheduleStore behind the unified Backend + // interface introduced in #162 part 2. 
Behavior is + // identical to pre-#162 (same ticker, same store, + // same overlap semantics) — the wrapper exists so + // part 2b can drop in a KubernetesBackend at the + // same construction point without rewriting the + // runner's call sites. + sched := scheduler.New(schedStore, dispatch, r.logger, auditFn) + r.schedBackend = scheduler.NewFileBackend(schedStore, sched) r.syncYAMLSchedules(ctx, schedStore) - r.sched.Start(ctx) - defer r.sched.Stop() + r.schedBackend.Start(ctx) + defer r.schedBackend.Stop() } r.logger.Info("using LLM executor", map[string]any{ @@ -3624,8 +3633,8 @@ type lazyScheduleReloader struct { } func (l *lazyScheduleReloader) Reload(ctx context.Context) { - if l.runner.sched != nil { - l.runner.sched.Reload(ctx) + if l.runner.schedBackend != nil { + l.runner.schedBackend.Reload(ctx) } } diff --git a/forge-core/scheduler/backend.go b/forge-core/scheduler/backend.go new file mode 100644 index 0000000..fd7634c --- /dev/null +++ b/forge-core/scheduler/backend.go @@ -0,0 +1,182 @@ +package scheduler + +import "context" + +// Backend abstracts the persistence + timing layer the runner uses for +// scheduled tasks. Two implementations ship today (#162): +// +// - FileBackend: wraps the existing Scheduler ticker + MemoryScheduleStore. +// Persistence is a markdown file at /.forge/memory/SCHEDULES.md; +// timing is a 30s goroutine ticker; overlap is prevented by an +// in-process map of "currently running" flags. +// +// - KubernetesBackend (forge-cli/runtime/scheduler_k8s.go): persists +// schedules as K8s CronJob resources via client-go. Timing is the +// cluster's CronJob controller. Overlap is prevented by +// CronJob.Spec.ConcurrencyPolicy=Forbid (K8s's native equivalent of +// the file backend's running map). 
+// +// The Backend interface intentionally bundles "timing concerns" (Start, +// Stop, Reload) with "persistence concerns" (List, Get, Set, Delete, +// Sync) into one surface because the two are co-located in the file +// backend's existing implementation and entirely owned by the cluster +// in the kubernetes backend. Splitting them would force one or the +// other backend to implement no-op methods. +type Backend interface { + // Start launches any backend-specific goroutines (file backend: the + // 30s ticker). For backends that delegate timing to an external + // system (kubernetes backend: the CronJob controller), Start is a + // no-op. Must be safe to call once per backend instance. + Start(ctx context.Context) + + // Stop signals the backend to terminate any goroutines launched by + // Start and waits for them to exit. Idempotent. + Stop() + + // Reload re-reads any cached state (file backend: the parsed-cron + // cache). For backends with no cached state (kubernetes backend: + // each operation hits the API), Reload is a no-op. + Reload(ctx context.Context) + + // Sync reconciles the backend's state with the declarative list + // of schedules pulled from forge.yaml. Called once at startup + // after Start and again on hot-reload. Existing schedules with + // matching IDs are updated in-place; new ones are added; + // previously-yaml-sourced schedules no longer in the list are + // deleted (LLM-sourced schedules are left alone — they're owned + // by the agent's chat history, not the declarative manifest). + Sync(ctx context.Context, declared []Schedule) error + + // List returns every active schedule the backend knows about. + List(ctx context.Context) ([]Schedule, error) + + // Get returns a single schedule by ID, or nil when absent. + Get(ctx context.Context, id string) (*Schedule, error) + + // Set creates or updates a schedule. 
Schedule.Source distinguishes + // declarative (forge.yaml) entries from LLM-set ones; backends use + // it to enforce RBAC in the kubernetes case and labeling in both. + Set(ctx context.Context, sched Schedule) error + + // Delete removes a schedule by ID. Returns nil when the schedule + // did not exist (idempotent). + Delete(ctx context.Context, id string) error + + // History returns recent run records for a schedule (or all + // schedules when scheduleID is empty). File backend reads from + // the SCHEDULES.md history block; kubernetes backend returns + // empty + a logger.Warn deferring to the audit stream (which + // already carries schedule_complete events with status + + // duration). + History(ctx context.Context, scheduleID string, limit int) ([]HistoryEntry, error) +} + +// FileBackend is the default Backend implementation: wraps the +// existing Scheduler tick loop and ScheduleStore behind the unified +// Backend interface. Zero behavior change vs the pre-#162 wiring — +// constructed via NewFileBackend at the runner's existing scheduler +// init site. The wrapping is structural (delegates everything to the +// underlying Scheduler / Store), not a reimplementation. +type FileBackend struct { + store ScheduleStore + sched *Scheduler +} + +// NewFileBackend constructs a FileBackend wrapping the given store + +// scheduler. The caller still owns store + scheduler lifecycle outside +// of Start/Stop on the Backend. +func NewFileBackend(store ScheduleStore, sched *Scheduler) *FileBackend { + return &FileBackend{store: store, sched: sched} +} + +func (b *FileBackend) Start(ctx context.Context) { b.sched.Start(ctx) } +func (b *FileBackend) Stop() { b.sched.Stop() } +func (b *FileBackend) Reload(ctx context.Context) { b.sched.Reload(ctx) } + +// Sync upserts declared (yaml-sourced) schedules into the store and +// removes any pre-existing yaml-sourced schedules that are no longer +// in the declared list. LLM-sourced schedules are preserved. 
+func (b *FileBackend) Sync(ctx context.Context, declared []Schedule) error { + existing, err := b.store.List(ctx) + if err != nil { + return err + } + declaredIDs := make(map[string]bool, len(declared)) + for _, d := range declared { + declaredIDs[d.ID] = true + // Preserve existing per-run state when updating. + if cur, _ := b.store.Get(ctx, d.ID); cur != nil { + d.LastRun = cur.LastRun + d.LastStatus = cur.LastStatus + d.RunCount = cur.RunCount + if cur.Created.IsZero() { + // Should not happen in practice but stays safe. + } else { + d.Created = cur.Created + } + } + if err := b.store.Set(ctx, d); err != nil { + return err + } + } + // Delete previously-yaml-sourced entries no longer in the manifest. + for _, e := range existing { + if e.Source != SourceYAML { + continue + } + if declaredIDs[e.ID] { + continue + } + if err := b.store.Delete(ctx, e.ID); err != nil { + return err + } + } + b.sched.Reload(ctx) + return nil +} + +func (b *FileBackend) List(ctx context.Context) ([]Schedule, error) { + return b.store.List(ctx) +} + +func (b *FileBackend) Get(ctx context.Context, id string) (*Schedule, error) { + return b.store.Get(ctx, id) +} + +func (b *FileBackend) Set(ctx context.Context, sched Schedule) error { + if err := b.store.Set(ctx, sched); err != nil { + return err + } + b.sched.Reload(ctx) + return nil +} + +func (b *FileBackend) Delete(ctx context.Context, id string) error { + if err := b.store.Delete(ctx, id); err != nil { + return err + } + b.sched.Reload(ctx) + return nil +} + +func (b *FileBackend) History(ctx context.Context, scheduleID string, limit int) ([]HistoryEntry, error) { + return b.store.History(ctx, scheduleID, limit) +} + +// Store exposes the underlying ScheduleStore for callers (the schedule_* +// builtin tools registered by the runner) that already speak the +// ScheduleStore vocabulary. KubernetesBackend exposes a thin adapter +// over its CronJob CRUD here so the builtin tools work in both modes +// without per-tool branching. 
+func (b *FileBackend) Store() ScheduleStore { return b.store } + +// Source constants mark schedules by origin so Sync can reconcile +// declarative state without nuking LLM-set entries. +const ( + // SourceYAML marks schedules synced in from forge.yaml's + // `schedules[]` block at startup or hot-reload. + SourceYAML = "yaml" + // SourceLLM marks schedules created at runtime by the LLM via the + // schedule_set builtin tool. + SourceLLM = "llm" +) diff --git a/forge-core/scheduler/backend_test.go b/forge-core/scheduler/backend_test.go new file mode 100644 index 0000000..dfc6564 --- /dev/null +++ b/forge-core/scheduler/backend_test.go @@ -0,0 +1,230 @@ +package scheduler + +import ( + "context" + "sync" + "testing" + "time" +) + +// fakeStore is an in-memory ScheduleStore used by Backend tests so +// the assertions stay focused on Backend semantics rather than the +// markdown-file persistence shipping with MemoryScheduleStore. +type fakeStore struct { + mu sync.Mutex + schedules map[string]Schedule + history []HistoryEntry +} + +func newFakeStore() *fakeStore { + return &fakeStore{schedules: map[string]Schedule{}} +} + +func (f *fakeStore) List(_ context.Context) ([]Schedule, error) { + f.mu.Lock() + defer f.mu.Unlock() + out := make([]Schedule, 0, len(f.schedules)) + for _, s := range f.schedules { + out = append(out, s) + } + return out, nil +} + +func (f *fakeStore) Get(_ context.Context, id string) (*Schedule, error) { + f.mu.Lock() + defer f.mu.Unlock() + s, ok := f.schedules[id] + if !ok { + return nil, nil + } + return &s, nil +} + +func (f *fakeStore) Set(_ context.Context, s Schedule) error { + f.mu.Lock() + defer f.mu.Unlock() + f.schedules[s.ID] = s + return nil +} + +func (f *fakeStore) Delete(_ context.Context, id string) error { + f.mu.Lock() + defer f.mu.Unlock() + delete(f.schedules, id) + return nil +} + +func (f *fakeStore) RecordRun(_ context.Context, e HistoryEntry) error { + f.mu.Lock() + defer f.mu.Unlock() + f.history = append(f.history, e) 
+ return nil +} + +func (f *fakeStore) History(_ context.Context, _ string, limit int) ([]HistoryEntry, error) { + f.mu.Lock() + defer f.mu.Unlock() + if limit > len(f.history) || limit == 0 { + limit = len(f.history) + } + return append([]HistoryEntry(nil), f.history[:limit]...), nil +} + +type fakeLogger struct{} + +func (fakeLogger) Info(string, map[string]any) {} +func (fakeLogger) Warn(string, map[string]any) {} +func (fakeLogger) Error(string, map[string]any) {} + +// newTestBackend constructs a FileBackend with the fake store; the +// underlying Scheduler is created but Start is NOT called so the +// 30s ticker stays asleep. The Sync / List / Set / Delete tests run +// against the store via the Backend surface, which is the point. +func newTestBackend(t *testing.T) (*FileBackend, *fakeStore) { + t.Helper() + store := newFakeStore() + sched := New(store, func(_ context.Context, _ Schedule) error { return nil }, fakeLogger{}, nil) + return NewFileBackend(store, sched), store +} + +// TestFileBackend_SyncUpsertsDeclared verifies the basic Sync path: +// yaml-sourced entries get inserted, repeated Sync calls are +// idempotent. +func TestFileBackend_SyncUpsertsDeclared(t *testing.T) { + b, store := newTestBackend(t) + ctx := context.Background() + + declared := []Schedule{ + {ID: "daily", Cron: "0 9 * * *", Task: "morning report", Source: SourceYAML, Enabled: true, Created: time.Now()}, + {ID: "hourly", Cron: "@hourly", Task: "heartbeat", Source: SourceYAML, Enabled: true, Created: time.Now()}, + } + if err := b.Sync(ctx, declared); err != nil { + t.Fatalf("Sync: %v", err) + } + if got := len(store.schedules); got != 2 { + t.Fatalf("after Sync: %d schedules, want 2", got) + } + + // Second Sync with same input is a no-op functionally. 
+ if err := b.Sync(ctx, declared); err != nil { + t.Fatalf("Sync (idempotent): %v", err) + } + if got := len(store.schedules); got != 2 { + t.Fatalf("after idempotent Sync: %d schedules, want 2", got) + } +} + +// TestFileBackend_SyncPrunesRemovedYAMLEntries verifies the +// reconciliation behavior: a yaml-sourced entry dropped from +// forge.yaml gets removed on Sync. +func TestFileBackend_SyncPrunesRemovedYAMLEntries(t *testing.T) { + b, store := newTestBackend(t) + ctx := context.Background() + + first := []Schedule{ + {ID: "a", Cron: "@hourly", Task: "t", Source: SourceYAML, Enabled: true, Created: time.Now()}, + {ID: "b", Cron: "@hourly", Task: "t", Source: SourceYAML, Enabled: true, Created: time.Now()}, + } + if err := b.Sync(ctx, first); err != nil { + t.Fatalf("Sync first: %v", err) + } + + // Remove "b" from the declared set — Sync should delete it. + second := []Schedule{ + {ID: "a", Cron: "@hourly", Task: "t", Source: SourceYAML, Enabled: true, Created: time.Now()}, + } + if err := b.Sync(ctx, second); err != nil { + t.Fatalf("Sync second: %v", err) + } + if _, exists := store.schedules["b"]; exists { + t.Errorf("removed yaml-sourced entry 'b' was not pruned by Sync") + } + if _, exists := store.schedules["a"]; !exists { + t.Errorf("retained entry 'a' was incorrectly pruned") + } +} + +// TestFileBackend_SyncPreservesLLMSourced confirms the bug-fix +// invariant: LLM-set schedules MUST survive a Sync that doesn't +// list them. The cluster (in K8s mode) and the LLM (in either +// mode) own those entries; the declarative reconcile path doesn't. +func TestFileBackend_SyncPreservesLLMSourced(t *testing.T) { + b, store := newTestBackend(t) + ctx := context.Background() + + // Seed an LLM-sourced entry directly into the store (skipping + // Sync — that's the case where the user's chat created a + // schedule). 
+ llmSched := Schedule{ID: "from-chat", Cron: "@daily", Task: "follow up", Source: SourceLLM, Enabled: true, Created: time.Now()} + if err := store.Set(ctx, llmSched); err != nil { + t.Fatalf("seed LLM schedule: %v", err) + } + + // Sync with one yaml entry that is NOT the LLM-sourced one. + if err := b.Sync(ctx, []Schedule{ + {ID: "yaml-1", Cron: "@hourly", Task: "t", Source: SourceYAML, Enabled: true, Created: time.Now()}, + }); err != nil { + t.Fatalf("Sync: %v", err) + } + + if _, exists := store.schedules["from-chat"]; !exists { + t.Errorf("LLM-sourced schedule was incorrectly pruned by Sync") + } + if _, exists := store.schedules["yaml-1"]; !exists { + t.Errorf("yaml-sourced entry was not added by Sync") + } +} + +// TestFileBackend_SyncPreservesPerRunState verifies that re-Sync of +// the same yaml entry doesn't reset LastRun / RunCount counters. The +// operator editing the task description in forge.yaml shouldn't lose +// the execution history. +func TestFileBackend_SyncPreservesPerRunState(t *testing.T) { + b, store := newTestBackend(t) + ctx := context.Background() + + // First Sync establishes the entry. + created := time.Now().Add(-1 * time.Hour) + if err := b.Sync(ctx, []Schedule{ + {ID: "x", Cron: "@hourly", Task: "old task text", Source: SourceYAML, Enabled: true, Created: created}, + }); err != nil { + t.Fatalf("Sync first: %v", err) + } + // Simulate runtime state being updated by a Scheduler fire. + cur := store.schedules["x"] + cur.LastRun = time.Now().Add(-5 * time.Minute) + cur.LastStatus = "completed" + cur.RunCount = 42 + store.schedules["x"] = cur + + // Re-Sync with updated task text but same ID. 
+ if err := b.Sync(ctx, []Schedule{ + {ID: "x", Cron: "@hourly", Task: "new task text", Source: SourceYAML, Enabled: true, Created: time.Now()}, + }); err != nil { + t.Fatalf("Sync second: %v", err) + } + + got := store.schedules["x"] + if got.Task != "new task text" { + t.Errorf("task text should update: got %q", got.Task) + } + if got.RunCount != 42 { + t.Errorf("RunCount should survive Sync: got %d, want 42", got.RunCount) + } + if got.LastStatus != "completed" { + t.Errorf("LastStatus should survive Sync: got %q", got.LastStatus) + } +} + +// TestFileBackend_StoreAccessor confirms the Store() escape hatch. +// The existing schedule_* builtin tools call into ScheduleStore via +// runtime reload-aware adapters; exposing the same store via +// Backend.Store() keeps that call path working in both modes (the +// kubernetes backend will return an adapter that translates +// ScheduleStore methods to CronJob API calls). +func TestFileBackend_StoreAccessor(t *testing.T) { + b, store := newTestBackend(t) + if b.Store() != store { + t.Errorf("Store() must return the same ScheduleStore instance the backend was constructed with") + } +} diff --git a/forge-core/scheduler/k8s_detect.go b/forge-core/scheduler/k8s_detect.go new file mode 100644 index 0000000..d4bbe28 --- /dev/null +++ b/forge-core/scheduler/k8s_detect.go @@ -0,0 +1,24 @@ +package scheduler + +import "os" + +// inClusterServiceAccountTokenPath is the standard mount point a +// kubelet injects into every pod's filesystem when the pod has a +// ServiceAccount. Presence is the canonical Kubernetes-supplied +// "you are running inside a pod" signal. +const inClusterServiceAccountTokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token" + +// InCluster reports whether the process appears to be running inside +// a Kubernetes pod. The signal is the presence of the projected +// ServiceAccount token at the well-known mount path. 
Override at test +// time by setting the FORGE_IN_CLUSTER env var ("true" or "1" means in-cluster; any other non-empty value means not) — +// useful for unit tests on developer laptops and for forcing +// file-backend behavior inside a cluster (e.g. single-replica dev +// deploys that don't want CronJob CRUD). +func InCluster() bool { + if v := os.Getenv("FORGE_IN_CLUSTER"); v != "" { + return v == "true" || v == "1" + } + _, err := os.Stat(inClusterServiceAccountTokenPath) + return err == nil +} diff --git a/forge-core/scheduler/k8s_manifest.go b/forge-core/scheduler/k8s_manifest.go new file mode 100644 index 0000000..a63275d --- /dev/null +++ b/forge-core/scheduler/k8s_manifest.go @@ -0,0 +1,188 @@ +package scheduler + +import ( + "crypto/sha1" //nolint:gosec // hash is for name derivation, not security + "encoding/hex" + "fmt" + "strings" +) + +// CronJobManifestInput is the data the manifest builders consume. The +// runtime KubernetesBackend (#162 part 2b) and the upcoming `forge package` +// stage (#162 part 3) both feed it; the manifest text + the in-memory +// CronJob spec must agree byte-for-byte so the runtime's Sync +// reconciliation doesn't churn against manifest-applied resources. +type CronJobManifestInput struct { + // AgentID is the operator's agent identifier, used as a label and + // as the prefix of the CronJob's name. + AgentID string + // Namespace is the target K8s namespace. CronJobYAML renders an + // empty value as "default"; callers that want the agent pod's own + // namespace must resolve it before building the manifest. + Namespace string + // ServiceURL is the in-cluster URL CronJob curl requests target. + // Typically `http://<service>.<namespace>.svc:<port>/`. + ServiceURL string + // AuthSecretName is the K8s Secret holding the internal token the + // CronJob injects as FORGE_AUTH_TOKEN and sends as Bearer auth. Defaults to + // `<agent-id>-internal-token` matching `forge auth secret-yaml`. + AuthSecretName string + // TriggerImage is the container image the CronJob runs to make + // the curl request. Defaults to `curlimages/curl:8.10.1`. 
+ TriggerImage string + // Schedule is the Forge schedule entry to materialize. + Schedule Schedule +} + +// DefaultTriggerImage is the curl image the CronJob's trigger +// container runs by default. Pinned to a specific tag so a registry +// pull is reproducible. +const DefaultTriggerImage = "curlimages/curl:8.10.1" + +// CronJobName returns the deterministic K8s resource name for a +// schedule. K8s resource names are constrained to 63 chars with a +// restricted character set; we hash-suffix when the natural name +// would exceed the limit to keep the name unique. +func CronJobName(agentID, scheduleID string) string { + base := fmt.Sprintf("forge-%s-%s", agentID, scheduleID) + base = sanitizeK8sName(base) + if len(base) <= 63 { + return base + } + // Truncate + suffix with a hash so distinct (agent_id, sched_id) + // pairs that share a prefix don't collide after truncation. + h := sha1.Sum([]byte(fmt.Sprintf("%s/%s", agentID, scheduleID))) //nolint:gosec + suffix := "-" + hex.EncodeToString(h[:4]) + return base[:63-len(suffix)] + suffix +} + +// sanitizeK8sName replaces characters that aren't valid in a K8s +// resource name (RFC 1123 subset: lowercase alphanumeric + '-') with +// '-'. The output is forced lowercase. Empty input becomes "forge". +func sanitizeK8sName(s string) string { + if s == "" { + return "forge" + } + var b strings.Builder + b.Grow(len(s)) + for _, r := range strings.ToLower(s) { + if (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') || r == '-' { + b.WriteRune(r) + continue + } + b.WriteByte('-') + } + out := b.String() + // Trim leading/trailing '-' which K8s rejects. + out = strings.Trim(out, "-") + if out == "" { + return "forge" + } + return out +} + +// CronJobYAML returns the apiVersion=batch/v1 CronJob manifest text +// for a Schedule. 
Reused by the KubernetesBackend (formatted with +// Sprintf and passed through yaml.Unmarshal → batchv1.CronJob for +// the Create/Patch API call) and by the `forge package` build stage +// in part 3 (written verbatim to the k8s/ directory for +// `kubectl apply -k`). +// +// The trigger container's args are an A2A JSON-RPC tasks/send body +// that includes the schedule task as the user message. The trigger +// container's `sh -c` performs the $(...) expansion when the Job runs; +// the $(date +%s) appends a Unix timestamp so each fire gets its own task ID. +// +// All values are inlined (no Helm-template placeholders) so the +// manifest is operator-readable and `kubectl diff`-able against the +// running state. Empty TriggerImage, AuthSecretName and Namespace fall back to DefaultTriggerImage, "<AgentID>-internal-token" and "default". NOTE(review): only the task text is escaped; AgentID, Schedule.ID, Schedule.Source, ServiceURL and the other inputs are interpolated verbatim into the YAML and the shell command, so callers must validate them. +func CronJobYAML(input CronJobManifestInput) string { + image := input.TriggerImage + if image == "" { + image = DefaultTriggerImage + } + authSecret := input.AuthSecretName + if authSecret == "" { + authSecret = input.AgentID + "-internal-token" + } + namespace := input.Namespace + if namespace == "" { + namespace = "default" + } + name := CronJobName(input.AgentID, input.Schedule.ID) + + taskText := escapeForSingleQuotedJSON(input.Schedule.Task) + + return fmt.Sprintf(`apiVersion: batch/v1 +kind: CronJob +metadata: + name: %s + namespace: %s + labels: + forge.agent.id: %s + forge.schedule.id: %s + forge.schedule.source: %s +spec: + schedule: %q + concurrencyPolicy: Forbid + successfulJobsHistoryLimit: 3 + failedJobsHistoryLimit: 3 + jobTemplate: + spec: + template: + spec: + restartPolicy: Never + containers: + - name: trigger + image: %s + env: + - name: FORGE_AUTH_TOKEN + valueFrom: + secretKeyRef: + name: %s + key: token + command: ["sh", "-c"] + args: + - | + curl -sfX POST %s \ + -H "Authorization: Bearer $FORGE_AUTH_TOKEN" \ + -H "X-Forge-Schedule-Id: %s" \ + -H "Content-Type: application/json" \ + --data '{"jsonrpc":"2.0","id":"1","method":"tasks/send","params":{"id":"sched-%s-'$(date +%%s)'","message":{"role":"user","parts":[{"type":"text","text":"%s"}]}}}' +`, + name,
namespace, + input.AgentID, input.Schedule.ID, defaultSource(input.Schedule.Source), + input.Schedule.Cron, + image, + authSecret, + input.ServiceURL, + input.Schedule.ID, + input.Schedule.ID, + taskText, + ) +} + +// defaultSource returns "yaml" when the schedule has no source set; +// matches the existing scheduler convention for declarative entries. +func defaultSource(s string) string { + if s == "" { + return SourceYAML + } + return s +} + +// escapeForSingleQuotedJSON escapes characters that would break the +// JSON body wrapped in single quotes inside the shell `args:` line: +// backslashes and double quotes (backslash-escaped so they stay inside the JSON string) and single quotes (close + reopen via '"'"'). +// Newlines and carriage returns collapse to spaces. Not a full JSON encoder — other control characters such as tabs pass through unescaped; +// the task text comes from forge.yaml or the LLM and is expected to +// be human-readable prose. Operators who need richer escaping can +// override the trigger image. +func escapeForSingleQuotedJSON(s string) string { + s = strings.ReplaceAll(s, `\`, `\\`) + s = strings.ReplaceAll(s, `"`, `\"`) + s = strings.ReplaceAll(s, "'", `'"'"'`) + s = strings.ReplaceAll(s, "\n", " ") + s = strings.ReplaceAll(s, "\r", " ") + return s +} diff --git a/forge-core/scheduler/k8s_manifest_test.go b/forge-core/scheduler/k8s_manifest_test.go new file mode 100644 index 0000000..7339134 --- /dev/null +++ b/forge-core/scheduler/k8s_manifest_test.go @@ -0,0 +1,155 @@ +package scheduler + +import ( + "strings" + "testing" +) + +func TestCronJobName_FitsK8sLimits(t *testing.T) { + tests := []struct { + name string + agent string + sched string + wantMax int + wantHave []string + }{ + { + name: "short fits unchanged", + agent: "aibuilderdemo", + sched: "daily-summary", + wantMax: 63, + wantHave: []string{"forge-aibuilderdemo-daily-summary"}, + }, + { + name: "overlong gets hash-suffixed", + agent: "really-long-agent-identifier-that-exceeds-the-limit-when-combined", + sched: "another-long-schedule-identifier-here", + wantMax: 63, + }, + { + name: "sanitizes invalid chars",
agent: "MyAgent_2", + sched: "Daily.Summary", + wantMax: 63, + wantHave: []string{"forge-myagent-2-daily-summary"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := CronJobName(tt.agent, tt.sched) + if len(got) > tt.wantMax { + t.Errorf("CronJobName length %d exceeds K8s limit %d (got %q)", len(got), tt.wantMax, got) + } + for _, sub := range tt.wantHave { + if !strings.Contains(got, sub) { + t.Errorf("CronJobName(%q, %q) = %q, want substring %q", tt.agent, tt.sched, got, sub) + } + } + }) + } +} + +// TestCronJobName_StableHashForUniqueness pins the hash-suffix +// behavior: two distinct (agent, schedule) pairs that share a common +// prefix MUST produce distinct names even after truncation. +func TestCronJobName_StableHashForUniqueness(t *testing.T) { + longAgent := strings.Repeat("a", 50) + a := CronJobName(longAgent, "schedule-one-that-is-also-long") + b := CronJobName(longAgent, "schedule-two-that-is-also-long") + if a == b { + t.Errorf("distinct schedule IDs produced colliding CronJob names: both %q", a) + } +} + +// TestCronJobYAML_HasRequiredFields verifies the manifest text +// contains the structural elements operators care about: +// concurrencyPolicy=Forbid (the K8s-native equivalent of the file +// backend's overlap skip), the schedule cron, the X-Forge-Schedule-Id +// header (drives audit-event linkage in #162 part 3), and the task +// text from the Schedule. 
+func TestCronJobYAML_HasRequiredFields(t *testing.T) { + yaml := CronJobYAML(CronJobManifestInput{ + AgentID: "aibuilderdemo", + Namespace: "default", + ServiceURL: "http://aibuilderdemo.default.svc:8080/", + Schedule: Schedule{ + ID: "daily-summary", + Cron: "0 9 * * *", + Task: "Send the daily summary to Slack #ops", + Source: SourceYAML, + }, + }) + + want := []string{ + "kind: CronJob", + "schedule: \"0 9 * * *\"", + "concurrencyPolicy: Forbid", + "X-Forge-Schedule-Id: daily-summary", + "forge.agent.id: aibuilderdemo", + "forge.schedule.id: daily-summary", + "forge.schedule.source: yaml", + "Send the daily summary to Slack #ops", + "http://aibuilderdemo.default.svc:8080/", + } + for _, sub := range want { + if !strings.Contains(yaml, sub) { + t.Errorf("CronJobYAML missing %q\nfull output:\n%s", sub, yaml) + } + } +} + +// TestCronJobYAML_DefaultsAppliedForMissingFields verifies the +// defaults the manifest builder fills in when caller leaves fields +// empty: trigger image, auth secret name, namespace, source. Lets +// `forge package` emit a CronJob from minimal forge.yaml input. +func TestCronJobYAML_DefaultsAppliedForMissingFields(t *testing.T) { + yaml := CronJobYAML(CronJobManifestInput{ + AgentID: "agent", + ServiceURL: "http://agent.default.svc:8080/", + Schedule: Schedule{ID: "x", Cron: "@hourly", Task: "do x"}, + }) + + want := []string{ + "image: " + DefaultTriggerImage, + "name: agent-internal-token", + "namespace: default", + "forge.schedule.source: yaml", + } + for _, sub := range want { + if !strings.Contains(yaml, sub) { + t.Errorf("missing default %q\n%s", sub, yaml) + } + } +} + +// TestCronJobYAML_EscapesTaskSingleQuotes pins the shell-quoting +// behavior: task text containing a single quote must not break out +// of the single-quoted JSON body. Without the escape the curl args +// line would be unparseable shell. 
+func TestCronJobYAML_EscapesTaskSingleQuotes(t *testing.T) { + yaml := CronJobYAML(CronJobManifestInput{ + AgentID: "agent", + ServiceURL: "http://agent.svc/", + Schedule: Schedule{ID: "x", Cron: "@hourly", Task: "what's up"}, + }) + // The escaped pattern '"'"' is the standard sh idiom for an + // embedded single quote inside single-quoted text. + if !strings.Contains(yaml, `what'"'"'s up`) { + t.Errorf("single quote not shell-escaped\n%s", yaml) + } +} + +// TestInCluster_FORGEEnvOverride verifies the FORGE_IN_CLUSTER env override +// works in both directions. The actual on-cluster signal +// (/var/run/secrets/...) can't be exercised in CI, but the env +// override is the path tests + ops use to force a mode. +func TestInCluster_FORGEEnvOverride(t *testing.T) { + t.Setenv("FORGE_IN_CLUSTER", "true") + if !InCluster() { + t.Error("FORGE_IN_CLUSTER=true should force InCluster true") + } + t.Setenv("FORGE_IN_CLUSTER", "false") + if InCluster() { + t.Error("FORGE_IN_CLUSTER=false should force InCluster false") + } +} diff --git a/forge-core/types/config.go b/forge-core/types/config.go index e05246e..1c4d7f8 100644 --- a/forge-core/types/config.go +++ b/forge-core/types/config.go @@ -26,6 +26,7 @@ type ForgeConfig struct { Auth AuthConfig `yaml:"auth,omitempty"` MCP MCPConfig `yaml:"mcp,omitempty"` Schedules []ScheduleConfig `yaml:"schedules,omitempty"` + Scheduler SchedulerConfig `yaml:"scheduler,omitempty"` CORSOrigins []string `yaml:"cors_origins,omitempty"` Package PackageConfig `yaml:"package,omitempty"` GuardrailsPath string `yaml:"guardrails_path,omitempty"` // path to guardrails.json (default: "guardrails.json") @@ -293,6 +294,57 @@ type ScheduleConfig struct { ChannelTarget string `yaml:"channel_target,omitempty"` // destination ID (channel ID, chat ID) } +// SchedulerConfig selects the scheduler backend and tunes its +// behavior.
Default zero value is "auto": file backend on the +// laptop / CI, Kubernetes backend when running in-cluster (the +// in-cluster signal is the projected ServiceAccount token at +// /var/run/secrets/kubernetes.io/serviceaccount/token). See issue +// #162. +type SchedulerConfig struct { + // Backend is one of "auto" (default), "file", or "kubernetes". + // - "auto": file when not in-cluster, kubernetes when in-cluster. + // - "file": always the file-backed scheduler with the 30s ticker. + // - "kubernetes": always the K8s CronJob backend. Errors at startup + // when not in-cluster and FORGE_IN_CLUSTER is not set true. + Backend string `yaml:"backend,omitempty"` + + // Kubernetes carries backend-specific tuning that's only consulted + // when Backend resolves to "kubernetes". + Kubernetes K8sSchedulerConfig `yaml:"kubernetes,omitempty"` +} + +// K8sSchedulerConfig is the kubernetes-backend tuning block. Wired +// in #162 part 2b (runtime CronJob CRUD) and #162 part 3 +// (`forge package` manifest generation). +type K8sSchedulerConfig struct { + // Namespace is the K8s namespace CronJobs land in. Empty defaults + // to the agent pod's own namespace at runtime; `forge package` + // emits "default" when this field is unset. + Namespace string `yaml:"namespace,omitempty"` + + // ServiceURL is the in-cluster URL CronJob trigger pods POST to. + // Required when Backend resolves to "kubernetes". Typical value: + // http://<service>.<namespace>.svc:<port>/ + ServiceURL string `yaml:"service_url,omitempty"` + + // AllowDynamic gates whether the LLM-driven `schedule_set` builtin + // tool can create new CronJobs at runtime. Default false — only + // declarative forge.yaml `schedules[]` entries materialize as + // CronJobs. Flipping to true requires granting the agent's + // ServiceAccount create/patch/delete RBAC on batch/cronjobs in its + // own namespace; see docs/deployment/scheduler-kubernetes.md.
+ AllowDynamic bool `yaml:"allow_dynamic,omitempty"` + + // TriggerImage is the container image the CronJob runs to make the + // curl request. Empty defaults to the scheduler package's DefaultTriggerImage (curlimages/curl:8.10.1). + TriggerImage string `yaml:"trigger_image,omitempty"` + + // AuthSecretName overrides the K8s Secret name CronJobs mount for + // the internal bearer token. Empty defaults to + // "<agent-id>-internal-token" matching `forge auth secret-yaml`. + AuthSecretName string `yaml:"auth_secret_name,omitempty"` +} + // SecretsConfig configures secret management providers. type SecretsConfig struct { Providers []string `yaml:"providers,omitempty"` // e.g. ["env"], ["encrypted-file","env"]