Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions docs/deployment/scheduler-kubernetes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
---
title: "Scheduler — Kubernetes"
description: "Hybrid scheduler backend: file-backed on developer laptops, Kubernetes CronJobs in production."
order: 10
---

## Scheduler — Kubernetes

Forge's scheduler picks one of two backends at startup:

| Backend | When used | Persistence | Timing |
|---------|-----------|-------------|--------|
| `file` | Outside a Kubernetes pod | `<WorkDir>/.forge/memory/SCHEDULES.md` | 30s in-process goroutine ticker |
| `kubernetes` | Inside a Kubernetes pod (with `scheduler.backend: auto` or `kubernetes` in forge.yaml) | K8s `CronJob` resources (etcd) | Cluster's CronJob controller |

The Kubernetes backend solves three operational problems the file backend has in container deploys: no PVC needed (etcd is durable), horizontally safe (CronJob controller is cluster-singleton), and visible to standard `kubectl get cronjobs` tooling.

## Backend selection

`forge.yaml`:

```yaml
scheduler:
backend: auto # auto (default) | file | kubernetes
kubernetes:
namespace: "" # defaults to the agent pod's own namespace at runtime
service_url: "" # in-cluster URL CronJob trigger pods POST to (required in k8s mode)
allow_dynamic: false # whether schedule_set (LLM-driven) can create CronJobs at runtime
trigger_image: "" # container image the trigger Job runs; default: curlimages/curl:8.10.1
auth_secret_name: "" # K8s Secret holding the internal token; default: <agent_id>-internal-token
```

Resolution at startup:

1. `scheduler.backend: file` → file backend, always
2. `scheduler.backend: kubernetes` → kubernetes backend, errors at startup when not in-cluster (unless `FORGE_IN_CLUSTER=true` is set for testing)
3. `scheduler.backend: auto` (default) → kubernetes when the projected ServiceAccount token at `/var/run/secrets/kubernetes.io/serviceaccount/token` exists, file otherwise

The escape hatch `FORGE_IN_CLUSTER=true|false` overrides the file-presence check — useful for forcing file behavior in a single-replica dev pod, or for running the K8s backend's unit tests on a developer laptop.

## CronJob manifest shape

Forge generates one CronJob per schedule. The shape is fixed (no Helm templates) so `kubectl diff` and `kubectl apply -k` work without runtime substitution:

```yaml
apiVersion: batch/v1
kind: CronJob
metadata:
name: forge-aibuilderdemo-daily-summary
namespace: default
labels:
forge.agent.id: aibuilderdemo
forge.schedule.id: daily-summary
forge.schedule.source: yaml # or "llm" for LLM-set schedules
spec:
schedule: "0 9 * * *"
concurrencyPolicy: Forbid # K8s-native equivalent of the file backend's overlap skip
successfulJobsHistoryLimit: 3
failedJobsHistoryLimit: 3
jobTemplate:
spec:
template:
spec:
restartPolicy: Never
containers:
- name: trigger
image: curlimages/curl:8.10.1
env:
- name: FORGE_AUTH_TOKEN
valueFrom:
secretKeyRef:
name: aibuilderdemo-internal-token
key: token
command: ["sh", "-c"]
args:
- |
curl -sfX POST http://aibuilderdemo.default.svc:8080/ \
-H "Authorization: Bearer $FORGE_AUTH_TOKEN" \
-H "X-Forge-Schedule-Id: daily-summary" \
-H "Content-Type: application/json" \
--data '{"jsonrpc":"2.0",...,"id":"sched-daily-summary-'$(date +%s)'",...}'
```

`concurrencyPolicy: Forbid` is the K8s-native equivalent of the file backend's overlap check — same semantic, enforced by the cluster.

The CronJob resource name is deterministic: `forge-<agent_id>-<schedule_id>`, sanitized for K8s naming rules, hash-suffixed when the natural name exceeds the 63-character limit so distinct schedules sharing a prefix don't collide after truncation.

## Token plumbing

CronJob trigger Pods authenticate to the agent's A2A endpoint using the same internal bearer token channel adapters use (`runner.go:ResolveAuth`). The Secret containing the token is **not** generated by `forge package` — see the security model below.

```sh
# Bootstrap the Secret from the local runtime.token (or mint one):
forge auth mint-token
forge auth secret-yaml | kubectl apply -f -
```

When the CronJob fires, the trigger container reads `FORGE_AUTH_TOKEN` from the mounted Secret, sends it as `Authorization: Bearer <token>`, and the agent's existing `static_token` auth provider validates it. The `auth_verify` audit event lands with `Source: "internal"` identical to a channel callback — no new auth code path, no surprise.

## Security model

- **The Secret holding the token is never checked in.** `forge package` (part 3) emits a credential-less Secret template; operators populate it out-of-band via `forge auth secret-yaml`, ExternalSecrets / Sealed Secrets / SOPS / Vault Agent Injector, or `kubectl create secret`. Applying a Deployment without first populating the Secret leaves the agent pod NotReady with a clear `secret "..." not found` event.

- **`allow_dynamic: false` is the default.** Static schedules from `forge.yaml` are materialized by `forge package` at build time. LLM-driven `schedule_set` calls do not create new CronJobs unless this flag is flipped on, which requires the agent's ServiceAccount to have `create`/`patch`/`delete` RBAC on `batch/cronjobs` in its namespace — a meaningful privilege escalation worth gating.

- **Same-namespace only.** CronJobs run in the agent pod's own namespace. Cross-namespace deploys are out of scope.

## What's in this PR (#162 part 2)

- `scheduler.Backend` interface (one surface, two implementations)
- `scheduler.FileBackend` — wraps the existing `Scheduler` ticker + `MemoryScheduleStore`. Zero behavior change vs pre-#162.
- `scheduler.InCluster()` detection helper
- `scheduler.CronJobYAML(...)` + `scheduler.CronJobName(...)` — pure-Go manifest builders, reusable by the runtime backend (part 2b) and `forge package` (part 3)

The runtime `KubernetesBackend` with client-go for live CronJob CRUD lands in part 2b. Part 3 wires the manifest builder into `forge package` so `kubectl apply -k ./k8s` produces a fully scheduled deploy.

## Local fallback

Outside a cluster — `forge run` on a laptop, CI, a non-k8s VM — the file backend resolves automatically. The 30s in-process ticker + `<WorkDir>/.forge/memory/SCHEDULES.md` continue to work byte-identically to pre-#162.

## See also

- [`forge auth`](../reference/cli-reference.md#forge-auth) — internal-token operator UX (issue #162 part 1)
- [Scheduling](../core-concepts/scheduling.md) — declarative `forge.yaml` `schedules[]` syntax
- [Audit Logging](../security/audit-logging.md) — `schedule_fire` / `schedule_complete` events
21 changes: 15 additions & 6 deletions forge-cli/runtime/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ type Runner struct {
modelConfig *coreruntime.ModelConfig // resolved model config (for banner)
derivedCLIConfig *contract.DerivedCLIConfig // auto-derived from skill requirements
skillGuardrails *agentspec.SkillGuardrailRules // runtime-parsed skill guardrails (fallback when no build artifact)
sched *scheduler.Scheduler // cron scheduler (nil until started)
schedBackend scheduler.Backend // schedule backend (nil until started); FileBackend in non-cluster deploys, KubernetesBackend (#162 part 2b) when running in-cluster with scheduler.backend=auto|kubernetes
startTime time.Time // server start time (for /health uptime)
scheduleNotifier ScheduleNotifier // optional: delivers cron results to channels
authToken string // resolved auth token (empty if --no-auth)
Expand Down Expand Up @@ -968,10 +968,19 @@ func (r *Runner) Run(ctx context.Context) error {
})
}
}
r.sched = scheduler.New(schedStore, dispatch, r.logger, auditFn)
// FileBackend wraps the existing Scheduler ticker
// + ScheduleStore behind the unified Backend
// interface introduced in #162 part 2. Behavior is
// identical to pre-#162 (same ticker, same store,
// same overlap semantics) — the wrapper exists so
// part 2b can drop in a KubernetesBackend at the
// same construction point without rewriting the
// runner's call sites.
sched := scheduler.New(schedStore, dispatch, r.logger, auditFn)
r.schedBackend = scheduler.NewFileBackend(schedStore, sched)
r.syncYAMLSchedules(ctx, schedStore)
r.sched.Start(ctx)
defer r.sched.Stop()
r.schedBackend.Start(ctx)
defer r.schedBackend.Stop()
}

r.logger.Info("using LLM executor", map[string]any{
Expand Down Expand Up @@ -3624,8 +3633,8 @@ type lazyScheduleReloader struct {
}

func (l *lazyScheduleReloader) Reload(ctx context.Context) {
if l.runner.sched != nil {
l.runner.sched.Reload(ctx)
if l.runner.schedBackend != nil {
l.runner.schedBackend.Reload(ctx)
}
}

Expand Down
182 changes: 182 additions & 0 deletions forge-core/scheduler/backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package scheduler

import "context"

// Backend abstracts the persistence + timing layer the runner uses for
// scheduled tasks. Two implementations ship today (#162):
//
// - FileBackend: wraps the existing Scheduler ticker + MemoryScheduleStore.
// Persistence is a markdown file at <WorkDir>/.forge/memory/SCHEDULES.md;
// timing is a 30s goroutine ticker; overlap is prevented by an
// in-process map of "currently running" flags.
//
// - KubernetesBackend (forge-cli/runtime/scheduler_k8s.go): persists
// schedules as K8s CronJob resources via client-go. Timing is the
// cluster's CronJob controller. Overlap is prevented by
// CronJob.Spec.ConcurrencyPolicy=Forbid (K8s's native equivalent of
// the file backend's running map).
//
// The Backend interface intentionally bundles "timing concerns" (Start,
// Stop, Reload) with "persistence concerns" (List, Get, Set, Delete,
// Sync) into one surface because the two are co-located in the file
// backend's existing implementation and entirely owned by the cluster
// in the kubernetes backend. Splitting them would force one or the
// other backend to implement no-op methods.
type Backend interface {
// Start launches any backend-specific goroutines (file backend: the
// 30s ticker). For backends that delegate timing to an external
// system (kubernetes backend: the CronJob controller), Start is a
// no-op. Must be safe to call once per backend instance.
Start(ctx context.Context)

// Stop signals the backend to terminate any goroutines launched by
// Start and waits for them to exit. Idempotent.
Stop()

// Reload re-reads any cached state (file backend: the parsed-cron
// cache). For backends with no cached state (kubernetes backend:
// each operation hits the API), Reload is a no-op.
Reload(ctx context.Context)

// Sync reconciles the backend's state with the declarative list
// of schedules pulled from forge.yaml. Called once at startup
// after Start and again on hot-reload. Existing schedules with
// matching IDs are updated in-place; new ones are added;
// previously-yaml-sourced schedules no longer in the list are
// deleted (LLM-sourced schedules are left alone — they're owned
// by the agent's chat history, not the declarative manifest).
Sync(ctx context.Context, declared []Schedule) error

// List returns every active schedule the backend knows about.
List(ctx context.Context) ([]Schedule, error)

// Get returns a single schedule by ID, or nil when absent.
Get(ctx context.Context, id string) (*Schedule, error)

// Set creates or updates a schedule. Schedule.Source distinguishes
// declarative (forge.yaml) entries from LLM-set ones; backends use
// it to enforce RBAC in the kubernetes case and labeling in both.
Set(ctx context.Context, sched Schedule) error

// Delete removes a schedule by ID. Returns nil when the schedule
// did not exist (idempotent).
Delete(ctx context.Context, id string) error

// History returns recent run records for a schedule (or all
// schedules when scheduleID is empty). File backend reads from
// the SCHEDULES.md history block; kubernetes backend returns
// empty + a logger.Warn deferring to the audit stream (which
// already carries schedule_complete events with status +
// duration).
History(ctx context.Context, scheduleID string, limit int) ([]HistoryEntry, error)
}

// FileBackend is the default Backend implementation: wraps the
// existing Scheduler tick loop and ScheduleStore behind the unified
// Backend interface. Zero behavior change vs the pre-#162 wiring —
// constructed via NewFileBackend at the runner's existing scheduler
// init site. The wrapping is structural (delegates everything to the
// underlying Scheduler / Store), not a reimplementation.
type FileBackend struct {
store ScheduleStore
sched *Scheduler
}

// NewFileBackend constructs a FileBackend wrapping the given store +
// scheduler. The caller still owns store + scheduler lifecycle outside
// of Start/Stop on the Backend.
func NewFileBackend(store ScheduleStore, sched *Scheduler) *FileBackend {
return &FileBackend{store: store, sched: sched}
}

func (b *FileBackend) Start(ctx context.Context) { b.sched.Start(ctx) }
func (b *FileBackend) Stop() { b.sched.Stop() }
func (b *FileBackend) Reload(ctx context.Context) { b.sched.Reload(ctx) }

// Sync upserts declared (yaml-sourced) schedules into the store and
// removes any pre-existing yaml-sourced schedules that are no longer
// in the declared list. LLM-sourced schedules are preserved.
func (b *FileBackend) Sync(ctx context.Context, declared []Schedule) error {
existing, err := b.store.List(ctx)
if err != nil {
return err
}
declaredIDs := make(map[string]bool, len(declared))
for _, d := range declared {
declaredIDs[d.ID] = true
// Preserve existing per-run state when updating.
if cur, _ := b.store.Get(ctx, d.ID); cur != nil {
d.LastRun = cur.LastRun
d.LastStatus = cur.LastStatus
d.RunCount = cur.RunCount
if cur.Created.IsZero() {
// Should not happen in practice but stays safe.
} else {
d.Created = cur.Created
}
}
if err := b.store.Set(ctx, d); err != nil {
return err
}
}
// Delete previously-yaml-sourced entries no longer in the manifest.
for _, e := range existing {
if e.Source != SourceYAML {
continue
}
if declaredIDs[e.ID] {
continue
}
if err := b.store.Delete(ctx, e.ID); err != nil {
return err
}
}
b.sched.Reload(ctx)
return nil
}

func (b *FileBackend) List(ctx context.Context) ([]Schedule, error) {
return b.store.List(ctx)
}

func (b *FileBackend) Get(ctx context.Context, id string) (*Schedule, error) {
return b.store.Get(ctx, id)
}

func (b *FileBackend) Set(ctx context.Context, sched Schedule) error {
if err := b.store.Set(ctx, sched); err != nil {
return err
}
b.sched.Reload(ctx)
return nil
}

func (b *FileBackend) Delete(ctx context.Context, id string) error {
if err := b.store.Delete(ctx, id); err != nil {
return err
}
b.sched.Reload(ctx)
return nil
}

func (b *FileBackend) History(ctx context.Context, scheduleID string, limit int) ([]HistoryEntry, error) {
return b.store.History(ctx, scheduleID, limit)
}

// Store exposes the underlying ScheduleStore for callers (the schedule_*
// builtin tools registered by the runner) that already speak the
// ScheduleStore vocabulary. KubernetesBackend exposes a thin adapter
// over its CronJob CRUD here so the builtin tools work in both modes
// without per-tool branching.
func (b *FileBackend) Store() ScheduleStore { return b.store }

// Source constants mark schedules by origin so Sync can reconcile
// declarative state without nuking LLM-set entries.
const (
// SourceYAML marks schedules synced in from forge.yaml's
// `schedules[]` block at startup or hot-reload.
SourceYAML = "yaml"
// SourceLLM marks schedules created at runtime by the LLM via the
// schedule_set builtin tool.
SourceLLM = "llm"
)
Loading
Loading