Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 35 additions & 6 deletions docs/deployment/scheduler-kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,43 @@ When the CronJob fires, the trigger container reads `FORGE_AUTH_TOKEN` from the

- **Same-namespace only.** CronJobs run in the agent pod's own namespace. Cross-namespace deploys are out of scope.

## What's in this PR (#162 part 2)
## RBAC

- `scheduler.Backend` interface (one surface, two implementations)
- `scheduler.FileBackend` — wraps the existing `Scheduler` ticker + `MemoryScheduleStore`. Zero behavior change vs pre-#162.
- `scheduler.InCluster()` detection helper
- `scheduler.CronJobYAML(...)` + `scheduler.CronJobName(...)` — pure-Go manifest builders, reusable by the runtime backend (part 2b) and `forge package` (part 3)
The agent pod's ServiceAccount needs read access to its own CronJobs in any K8s-backend deploy. CRUD verbs are only required when `allow_dynamic: true`:

The runtime `KubernetesBackend` with client-go for live CronJob CRUD lands in part 2b. Part 3 wires the manifest builder into `forge package` so `kubectl apply -k ./k8s` produces a fully scheduled deploy.
```yaml
- apiGroups: ["batch"]
resources: ["cronjobs"]
verbs:
- get # always
- list # always — powers schedule_list
- create # only when allow_dynamic: true
- update # only when allow_dynamic: true
- delete # only when allow_dynamic: true OR a yaml schedule was removed
```

`forge package` (part 3) emits a Role + RoleBinding scoped to the agent's namespace with the minimum verbs based on `allow_dynamic`.

## Annotations on Forge-owned CronJobs

Beyond the labels above, the runtime KubernetesBackend stamps Forge-specific fields as annotations so they round-trip through `kubectl get cronjob -o yaml` and back into the `schedule_list` tool:

| Annotation | Source |
|------------|--------|
| `forge.schedule.task` | natural-language task description |
| `forge.schedule.skill` | optional skill name |
| `forge.schedule.channel` | optional channel adapter |
| `forge.schedule.channel_target` | optional channel destination ID |
| `forge.schedule.run_count` | execution counter (LLM-set schedules only) |
| `forge.schedule.last_status` | last execution outcome |

`LastRun` is read from `CronJob.Status.LastScheduleTime` — operators don't need to write it.

## What's NOT in the K8s backend

- **`schedule_history`**: returns empty + logs once. The audit stream's `schedule_fire` / `schedule_complete` events are the canonical source of truth.
- **Cross-namespace deploys**: first cut assumes CronJob and agent live in the same namespace.
- **Token auto-rotation**: the internal token is long-lived. Operators rotate by re-deploying with a fresh token in the Secret + pod restart picking it up.

## Local fallback

Expand Down
15 changes: 9 additions & 6 deletions forge-cli/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/initializ/forge/forge-cli

go 1.25.0
go 1.26.0

require (
github.com/charmbracelet/bubbles v1.0.0
Expand All @@ -11,7 +11,7 @@ require (
github.com/initializ/forge/forge-skills v0.0.0
github.com/initializ/forge/forge-ui v0.0.0
github.com/spf13/cobra v1.10.2
golang.org/x/term v0.40.0
golang.org/x/term v0.43.0
gopkg.in/yaml.v3 v3.0.1
)

Expand Down Expand Up @@ -49,11 +49,14 @@ require (
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
go.mongodb.org/mongo-driver v1.17.3 // indirect
golang.org/x/crypto v0.48.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.41.0 // indirect
golang.org/x/text v0.34.0 // indirect
golang.org/x/crypto v0.51.0 // indirect
golang.org/x/sync v0.20.0 // indirect
golang.org/x/sys v0.45.0 // indirect
golang.org/x/text v0.37.0 // indirect
golang.org/x/time v0.15.0 // indirect
k8s.io/api v0.36.2 // indirect
k8s.io/apimachinery v0.36.2 // indirect
k8s.io/client-go v0.36.2 // indirect
)

replace (
Expand Down
14 changes: 14 additions & 0 deletions forge-cli/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ github.com/muesli/termenv v0.16.0 h1:S5AlUN9dENB57rsbnkPyfdGuWIlkmzJjbFf0Tf5FWUc
github.com/muesli/termenv v0.16.0/go.mod h1:ZRfOIKPFDYQoDFF4Olj7/QJbW60Ol/kL1pU3VfY/Cnk=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
Expand All @@ -63,6 +64,7 @@ github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
Expand All @@ -87,6 +89,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts=
golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos=
golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8=
golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI=
golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
Expand All @@ -97,6 +100,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand All @@ -106,16 +110,19 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k=
golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/sys v0.45.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.40.0 h1:36e4zGLqU4yhjlmxEaagx2KuYbJq3EwY8K943ZsHcvg=
golang.org/x/term v0.40.0/go.mod h1:w2P8uVp06p2iyKKuvXIm7N/y0UCRt3UfJTfZ7oOpglM=
golang.org/x/term v0.43.0/go.mod h1:lrhlHNdQJHO+1qVYiHfFKVuVioJIheAc3fBSMFYEIsk=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk=
golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA=
golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38=
golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U=
golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand All @@ -124,5 +131,12 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
k8s.io/api v0.36.2 h1:TF6YDLIzKfccK7cq9YpTcGX8TJmEkHVRv78DM51fRYY=
k8s.io/api v0.36.2/go.mod h1:F4LbMO4brjZYh7yFkXWhynSvtB7YauxV4c+HHkNRGNg=
k8s.io/apimachinery v0.36.2 h1:0PE/W/WNy1UX61NLbXY5TMbJ6UwLL6E6lAPkYrKFxbQ=
k8s.io/apimachinery v0.36.2/go.mod h1:fvf/HOLXq9RId0rnDIbN1OEBvHXdQbLMM8nu0LcBUf4=
k8s.io/client-go v0.36.2 h1:bfgxmFKc9CgqsgX4xKLAAdmTQlWee7Ob/HlDOrJ5TBI=
k8s.io/client-go v0.36.2/go.mod h1:1vgO4OAlfPnoLcb+Rze2GF5rAr14w8qjrYMoyXJzQj0=
140 changes: 86 additions & 54 deletions forge-cli/runtime/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -968,17 +968,24 @@ func (r *Runner) Run(ctx context.Context) error {
})
}
}
// Pick the schedule backend per scheduler.backend:
// "kubernetes" — always K8s (errors at startup when not in-cluster);
// "file" — always the file backend;
// "auto" / "" — kubernetes when in-cluster, file otherwise.
// FileBackend wraps the existing Scheduler ticker
// + ScheduleStore behind the unified Backend
// interface introduced in #162 part 2. Behavior is
// identical to pre-#162 (same ticker, same store,
// same overlap semantics) — the wrapper exists so
// part 2b can drop in a KubernetesBackend at the
// same construction point without rewriting the
// runner's call sites.
sched := scheduler.New(schedStore, dispatch, r.logger, auditFn)
r.schedBackend = scheduler.NewFileBackend(schedStore, sched)
r.syncYAMLSchedules(ctx, schedStore)
// interface introduced in #162 part 2; the
// KubernetesBackend (#162 part 2b) delegates timing
// to the cluster's CronJob controller and persists
// state as CronJob resources in etcd.
backend, berr := r.selectScheduleBackend(schedStore, dispatch, auditFn)
if berr != nil {
return berr
}
r.schedBackend = backend
if syncErr := r.schedBackend.Sync(ctx, r.declaredSchedules()); syncErr != nil {
r.logger.Warn("schedule backend sync failed", map[string]any{"error": syncErr.Error()})
}
r.schedBackend.Start(ctx)
defer r.schedBackend.Stop()
}
Expand Down Expand Up @@ -3710,62 +3717,87 @@ func (r *Runner) makeScheduleDispatcher(executor coreruntime.AgentExecutor, egre
}
}

// syncYAMLSchedules upserts schedules from forge.yaml into the store and
// removes stale yaml-sourced schedules no longer in config.
func (r *Runner) syncYAMLSchedules(ctx context.Context, store scheduler.ScheduleStore) {
yamlConfigs := r.cfg.Config.Schedules
if len(yamlConfigs) == 0 {
return
// selectScheduleBackend picks the scheduler.Backend implementation
// based on forge.yaml's `scheduler.backend` field. Resolution:
//
// - "kubernetes" → always KubernetesBackend; returns a hard error
// when not in-cluster and FORGE_IN_CLUSTER is not set true.
// - "file" → always FileBackend.
// - "auto" / "" → KubernetesBackend when in-cluster, otherwise
// FileBackend.
//
// The FileBackend is constructed with the supplied store/dispatch/audit
// from the existing wiring. The KubernetesBackend ignores those (the
// cluster handles timing and audit linkage lands via the
// X-Forge-Schedule-Id header at the A2A boundary in part 3).
func (r *Runner) selectScheduleBackend(
store scheduler.ScheduleStore,
dispatch scheduler.TaskDispatcher,
auditFn scheduler.AuditFunc,
) (scheduler.Backend, error) {
mode := r.cfg.Config.Scheduler.Backend
useK8s := false
switch mode {
case "", "auto":
useK8s = scheduler.InCluster()
case "kubernetes":
useK8s = true
if !scheduler.InCluster() {
return nil, fmt.Errorf("scheduler.backend=kubernetes requires running in a Kubernetes pod (set FORGE_IN_CLUSTER=true to override for tests)")
}
case "file":
useK8s = false
default:
return nil, fmt.Errorf("scheduler.backend = %q: must be one of auto / file / kubernetes", mode)
}
if !useK8s {
sched := scheduler.New(store, dispatch, r.logger, auditFn)
return scheduler.NewFileBackend(store, sched), nil
}
k8sCfg := r.cfg.Config.Scheduler.Kubernetes
backend, err := NewKubernetesBackend(
r.cfg.Config.AgentID,
k8sCfg.Namespace,
K8sBackendConfig{
ServiceURL: k8sCfg.ServiceURL,
AuthSecretName: k8sCfg.AuthSecretName,
TriggerImage: k8sCfg.TriggerImage,
AllowDynamic: k8sCfg.AllowDynamic,
},
r.logger,
)
if err != nil {
return nil, err
}
r.logger.Info("scheduler: using kubernetes backend", map[string]any{
"namespace": k8sCfg.Namespace,
"service_url": k8sCfg.ServiceURL,
"allow_dynamic": k8sCfg.AllowDynamic,
})
return backend, nil
}

// Build set of yaml schedule IDs from config.
configIDs := make(map[string]bool, len(yamlConfigs))
for _, sc := range yamlConfigs {
configIDs[sc.ID] = true

now := time.Now().UTC()
existing, _ := store.Get(ctx, sc.ID)

sched := scheduler.Schedule{
// declaredSchedules translates the forge.yaml schedules[] block into
// the scheduler.Schedule shape Backend.Sync consumes. Marks each as
// SourceYAML so the backend's reconciliation distinguishes them from
// LLM-set entries.
func (r *Runner) declaredSchedules() []scheduler.Schedule {
out := make([]scheduler.Schedule, 0, len(r.cfg.Config.Schedules))
now := time.Now().UTC()
for _, sc := range r.cfg.Config.Schedules {
out = append(out, scheduler.Schedule{
ID: sc.ID,
Cron: sc.Cron,
Task: sc.Task,
Skill: sc.Skill,
Channel: sc.Channel,
ChannelTarget: sc.ChannelTarget,
Source: "yaml",
Source: scheduler.SourceYAML,
Enabled: true,
Created: now,
}

// Preserve runtime state from existing schedule.
if existing != nil {
sched.Created = existing.Created
sched.LastRun = existing.LastRun
sched.LastStatus = existing.LastStatus
sched.RunCount = existing.RunCount
}

if err := store.Set(ctx, sched); err != nil {
r.logger.Warn("failed to sync yaml schedule", map[string]any{
"id": sc.ID, "error": err.Error(),
})
}
}

// Remove stale yaml-sourced schedules.
existing, _ := store.List(ctx)
for _, s := range existing {
if s.Source == "yaml" && !configIDs[s.ID] {
if err := store.Delete(ctx, s.ID); err != nil {
r.logger.Warn("failed to remove stale yaml schedule", map[string]any{
"id": s.ID, "error": err.Error(),
})
}
}
})
}

r.logger.Info("synced yaml schedules", map[string]any{"count": len(yamlConfigs)})
return out
}

// buildSchedulerPrompt generates the scheduler awareness section for the system prompt.
Expand Down
Loading