diff --git a/examples/extensions/phase-handoff.go b/examples/extensions/phase-handoff.go index cbd619c6..020ffabf 100644 --- a/examples/extensions/phase-handoff.go +++ b/examples/extensions/phase-handoff.go @@ -54,8 +54,12 @@ func Init(api ext.API) { return } - // NewSession blocks until the TUI completes the switch; run it in - // a goroutine so the agent's turn-end pipeline isn't stalled. + // NewSession blocks while the agent finishes settling and then while + // the TUI completes the switch; run it in a goroutine so the agent's + // turn-end pipeline isn't stalled. The internal wait-for-idle (added + // in response to issue #63) makes this reliable even when post-turn + // tooling (formatters, on-save hooks, hidden tool calls) extends the + // busy window past AgentEnd. go func() { if err := ctx.NewSession(HANDOFFPrompt); err != nil { ctx.PrintError("phase-handoff: " + err.Error()) diff --git a/internal/app/app.go b/internal/app/app.go index 503cbdbc..7f6b70f2 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -2,6 +2,7 @@ package app import ( "context" + "errors" "fmt" "log" "os" @@ -24,6 +25,26 @@ type queueItem struct { Files []kit.LLMFilePart } +// ErrAgentBusy is returned when an operation cannot proceed because the agent +// is still processing a turn (including any post-turn extension hooks) and did +// not become idle before the operation's deadline. +// +// This is an alias for extensions.ErrAgentBusy so the extension API and the +// app layer share a single sentinel value — callers can detect the condition +// with errors.Is(err, app.ErrAgentBusy) without substring-matching the error +// message. +var ErrAgentBusy = extensions.ErrAgentBusy + +// DefaultNewSessionIdleWait bounds how long RequestNewSessionFromExtension +// will block waiting for the agent to settle. 
It needs to be generous enough +// to cover real-world post-turn tooling (project formatters, on-save linters, +// hidden tool calls) which routinely hold the busy flag for seconds and +// occasionally minutes — yet still short enough to surface a wedged agent. +// +// Issue #63 reported workloads where the busy window regularly exceeded +// 6 seconds; ten minutes is the same bound the workaround in that issue used. +const DefaultNewSessionIdleWait = 10 * time.Minute + // App is the application-layer orchestrator. It owns the agentic loop, // conversation history (via MessageStore), and queue management. It is // designed to be created once per session and reused across multiple prompts. @@ -55,11 +76,25 @@ type App struct { // each new step and called by CancelCurrentStep(). cancelStep context.CancelFunc - // mu protects busy, queue, and cancelStep. + // mu protects busy, queue, cancelStep, and idleCh. mu sync.Mutex busy bool queue []queueItem + // idleCh is closed when the agent transitions from busy back to idle. + // While the agent is idle the channel is already closed (recv returns + // immediately). When busy transitions to true a fresh open channel is + // allocated so new waiters block until the next idle transition. All + // transitions are funnelled through setBusyLocked to keep the channel + // pointer in sync with the busy flag. + // + // This is the underlying primitive WaitForIdle and + // RequestNewSessionFromExtension wait on to fix the AgentEnd→NewSession + // race described in issue #63: AgentEnd is emitted from inside the agent + // loop, before drainQueue clears busy, so any extension hook that calls + // ctx.NewSession synchronously would otherwise observe busy==true. + idleCh chan struct{} + // wg tracks in-flight goroutines; Close() waits on it. wg sync.WaitGroup @@ -95,6 +130,10 @@ type App struct { // initialMessages may be nil or empty for a fresh session. 
func New(opts Options, initialMessages []kit.LLMMessage) *App { rootCtx, rootCancel := context.WithCancel(context.Background()) + // idleCh starts already closed: the freshly constructed App is idle, so + // any caller blocking on it via WaitForIdle should be released immediately. + idleCh := make(chan struct{}) + close(idleCh) return &App{ opts: opts, store: NewMessageStoreWithMessages(initialMessages), @@ -102,6 +141,90 @@ func New(opts Options, initialMessages []kit.LLMMessage) *App { rootCancel: rootCancel, // cancelStep starts as a no-op so CancelCurrentStep() is always safe. cancelStep: func() {}, + idleCh: idleCh, + } +} + +// setBusyLocked is the single chokepoint for mutating a.busy. It keeps the +// idleCh signalling channel in sync with the busy flag: +// +// - false → true: allocate a fresh open channel so future WaitForIdle +// callers block until the next idle transition. +// - true → false: close the current channel so any waiters wake up. +// +// No-op when the requested state already matches. The caller must hold a.mu. +func (a *App) setBusyLocked(busy bool) { + if a.busy == busy { + return + } + a.busy = busy + if busy { + a.idleCh = make(chan struct{}) + } else { + close(a.idleCh) + } +} + +// idleSnapshot returns the current busy state and the channel that will be +// closed on the next idle transition. The snapshot is taken under a.mu so the +// pair is consistent (busy==true ⇒ ch is the open channel for *this* busy +// cycle, not a stale one). +func (a *App) idleSnapshot() (busy bool, ch chan struct{}) { + a.mu.Lock() + defer a.mu.Unlock() + return a.busy, a.idleCh +} + +// WaitForIdle blocks until the agent is idle, the given timeout elapses, or +// the app shuts down. Returns nil on idle, ErrAgentBusy on timeout, or the +// rootCtx error if the app is closing. +// +// A non-positive timeout disables the deadline and waits indefinitely (until +// idle or app shutdown). 
Safe to call from any goroutine, but never from +// inside the Bubble Tea Update() loop — it blocks. +// +// Idiomatic use from extensions: +// +// if err := app.WaitForIdle(0); err != nil { /* shutdown */ } +// +// The loop guards against the agent re-arming itself between wakeups: if +// another prompt is queued (or a steer message lands) while we're waiting, +// setBusyLocked allocates a fresh idleCh and we wait again. +func (a *App) WaitForIdle(timeout time.Duration) error { + var deadline time.Time + if timeout > 0 { + deadline = time.Now().Add(timeout) + } + for { + busy, ch := a.idleSnapshot() + if !busy { + return nil + } + var timer *time.Timer + var timerCh <-chan time.Time + if timeout > 0 { + remaining := time.Until(deadline) + if remaining <= 0 { + return ErrAgentBusy + } + timer = time.NewTimer(remaining) + timerCh = timer.C + } + select { + case <-ch: + // Idle transition observed — loop and re-check under the + // mutex in case a new busy cycle started immediately after. + case <-timerCh: + return ErrAgentBusy + case <-a.rootCtx.Done(): + if timer != nil { + timer.Stop() + } + return a.rootCtx.Err() + } + if timer != nil { + timer.Stop() + } } } @@ -155,7 +278,7 @@ func (a *App) RunWithFiles(prompt string, files []kit.LLMFilePart) int { return qLen } - a.busy = true + a.setBusyLocked(true) a.wg.Add(1) a.mu.Unlock() go a.drainQueue(item) @@ -235,7 +358,7 @@ func (a *App) SteerWithFiles(prompt string, files []kit.LLMFilePart) int { if !a.busy { // Not busy — start immediately, same as RunWithFiles(). item := queueItem{Prompt: prompt, Files: files} - a.busy = true + a.setBusyLocked(true) a.wg.Add(1) a.mu.Unlock() go a.drainQueue(item) @@ -271,7 +394,7 @@ func (a *App) InterruptAndSend(prompt string) { if !a.busy { // Not busy — start immediately, same as Run(). 
- a.busy = true + a.setBusyLocked(true) a.wg.Add(1) a.mu.Unlock() go a.drainQueue(item) @@ -470,7 +593,7 @@ func (a *App) CompactConversation(customInstructions string) error { a.mu.Unlock() return fmt.Errorf("SDK instance not available") } - a.busy = true + a.setBusyLocked(true) a.wg.Add(1) a.mu.Unlock() @@ -532,7 +655,7 @@ func (a *App) CompactAsync(customInstructions string, onComplete func(), onError a.mu.Unlock() return fmt.Errorf("SDK instance not available") } - a.busy = true + a.setBusyLocked(true) a.wg.Add(1) a.mu.Unlock() @@ -621,7 +744,7 @@ func (a *App) releaseBusyAfterCompact() { // in just before closed was set. if a.closed { a.queue = a.queue[:0] - a.busy = false + a.setBusyLocked(false) a.mu.Unlock() return } @@ -633,7 +756,7 @@ func (a *App) releaseBusyAfterCompact() { a.queue = a.queue[:0] if len(pending) == 0 { - a.busy = false + a.setBusyLocked(false) a.mu.Unlock() return } @@ -850,7 +973,7 @@ func (a *App) drainQueue(first queueItem) { // Mark as no longer busy a.mu.Lock() - a.busy = false + a.setBusyLocked(false) a.mu.Unlock() } @@ -1233,8 +1356,17 @@ func (a *App) SetEditorTextFromExtension(text string) { // RequestNewSessionFromExtension sends a NewSessionRequestEvent to the TUI // to end the current session and start a fresh one. If initialPrompt is // non-empty it is submitted as the first user turn of the new session. -// Returns an error when running headless (no TUI attached), when the agent -// is busy, or when a BeforeSessionSwitch extension hook cancels the switch. +// +// If the agent is currently busy (e.g. the caller is an OnAgentEnd hook that +// fires before drainQueue clears the busy flag, or there are queued prompts +// still being processed) the call blocks until the agent becomes idle, up to +// DefaultNewSessionIdleWait. If that deadline elapses, ErrAgentBusy is +// returned and callers can detect it with errors.Is. This wait-then-send +// behavior fixes the v0.79.0 phase-handoff race documented in issue #63. 
+// +// Returns an error when running headless (no TUI attached), when the wait +// for idle times out (ErrAgentBusy), when the app is shutting down, or when +// a BeforeSessionSwitch extension hook cancels the switch. // // This is the implementation behind ctx.NewSession(prompt) for the // interactive TUI. It blocks the caller until the TUI processes the @@ -1246,8 +1378,11 @@ func (a *App) RequestNewSessionFromExtension(initialPrompt string) error { if prog == nil { return fmt.Errorf("new session unavailable: no interactive TUI attached") } - if a.IsBusy() { - return fmt.Errorf("cannot start new session while agent is busy") + if err := a.WaitForIdle(DefaultNewSessionIdleWait); err != nil { + if errors.Is(err, ErrAgentBusy) { + return fmt.Errorf("cannot start new session: %w", err) + } + return err } ch := make(chan error, 1) prog.Send(NewSessionRequestEvent{InitialPrompt: initialPrompt, ResponseCh: ch}) diff --git a/internal/app/app_test.go b/internal/app/app_test.go index bf214a8f..02b16130 100644 --- a/internal/app/app_test.go +++ b/internal/app/app_test.go @@ -794,7 +794,7 @@ func TestReleaseBusyAfterCompact_flushesQueuedMessages(t *testing.T) { // summarising. (Run() would have appended them and returned a queue // length > 0 to the caller.) app.mu.Lock() - app.busy = true + app.setBusyLocked(true) app.queue = append(app.queue, queueItem{Prompt: "queued during compact #1"}, queueItem{Prompt: "queued during compact #2"}, @@ -834,7 +834,7 @@ func TestReleaseBusyAfterCompact_idleWhenQueueEmpty(t *testing.T) { defer app.Close() app.mu.Lock() - app.busy = true + app.setBusyLocked(true) app.mu.Unlock() app.releaseBusyAfterCompact() @@ -901,7 +901,7 @@ func TestReleaseBusyAfterCompact_splicesSteerAheadOfQueue(t *testing.T) { // Simulate the state at the end of compaction: busy is set and a couple // of regular Run() prompts have piled up after the steer messages. 
app.mu.Lock() - app.busy = true + app.setBusyLocked(true) app.queue = append(app.queue, queueItem{Prompt: "queued-1"}, queueItem{Prompt: "queued-2"}, @@ -950,7 +950,7 @@ func TestReleaseBusyAfterCompact_dropsQueueWhenClosed(t *testing.T) { app := newTestApp(stub) app.mu.Lock() - app.busy = true + app.setBusyLocked(true) app.queue = append(app.queue, queueItem{Prompt: "would have run"}) app.closed = true app.mu.Unlock() @@ -999,7 +999,7 @@ func TestPopLastUserMessage_WhileBusy(t *testing.T) { defer app.Close() app.mu.Lock() - app.busy = true + app.setBusyLocked(true) app.mu.Unlock() _, _, err := app.PopLastUserMessage() @@ -1115,3 +1115,281 @@ func TestPopLastUserMessage_NoUserOnBranch(t *testing.T) { t.Fatalf("expected error mentioning missing user message, got %q", err.Error()) } } + +// -------------------------------------------------------------------------- +// WaitForIdle / RequestNewSessionFromExtension (issue #63) +// -------------------------------------------------------------------------- + +// TestWaitForIdle_AlreadyIdle verifies the fast path: a freshly constructed +// App is idle and WaitForIdle returns immediately without consulting the +// timeout. +func TestWaitForIdle_AlreadyIdle(t *testing.T) { + app := newTestApp(newStub()) + defer app.Close() + + start := time.Now() + if err := app.WaitForIdle(2 * time.Second); err != nil { + t.Fatalf("WaitForIdle on idle app: %v", err) + } + if elapsed := time.Since(start); elapsed > 100*time.Millisecond { + t.Fatalf("WaitForIdle blocked for %s on already-idle app", elapsed) + } +} + +// TestWaitForIdle_BlocksUntilDrain reproduces the issue #63 race: while +// drainQueue holds busy==true the call should block, then return nil as soon +// as the drain completes. 
+func TestWaitForIdle_BlocksUntilDrain(t *testing.T) { + gate := make(chan struct{}) + var gateOnce sync.Once + closeGate := func() { gateOnce.Do(func() { close(gate) }) } + stub := newStubWithFuncs( + func(ctx context.Context) (*kit.TurnResult, error) { + select { + case <-gate: + case <-ctx.Done(): + return nil, ctx.Err() + } + return turnResult("done"), nil + }, + ) + app := newTestApp(stub) + t.Cleanup(func() { + closeGate() + app.Close() + }) + + app.Run("hello") + + // Confirm the agent is busy before we start waiting. + if !waitForCondition(2*time.Second, func() bool { return app.IsBusy() }) { + t.Fatal("app never became busy after Run()") + } + + errCh := make(chan error, 1) + go func() { + errCh <- app.WaitForIdle(5 * time.Second) + }() + + // Should not return while the stub is blocked. + select { + case err := <-errCh: + t.Fatalf("WaitForIdle returned early (err=%v) while agent still busy", err) + case <-time.After(150 * time.Millisecond): + } + + closeGate() + + select { + case err := <-errCh: + if err != nil { + t.Fatalf("WaitForIdle: %v", err) + } + case <-time.After(3 * time.Second): + t.Fatal("WaitForIdle did not return after drain completed") + } + + if app.IsBusy() { + t.Fatal("app still reports busy after WaitForIdle returned") + } +} + +// TestWaitForIdle_TimeoutReturnsErrAgentBusy verifies that a slow turn yields +// ErrAgentBusy (detectable via errors.Is) when the deadline elapses. +func TestWaitForIdle_TimeoutReturnsErrAgentBusy(t *testing.T) { + gate := make(chan struct{}) + stub := newStubWithFuncs( + func(ctx context.Context) (*kit.TurnResult, error) { + select { + case <-gate: + case <-ctx.Done(): + return nil, ctx.Err() + } + return turnResult("done"), nil + }, + ) + app := newTestApp(stub) + // Release the stub before Close so wg.Wait() can return. 
+ t.Cleanup(func() { + close(gate) + app.Close() + }) + + app.Run("hello") + if !waitForCondition(2*time.Second, func() bool { return app.IsBusy() }) { + t.Fatal("app never became busy after Run()") + } + + err := app.WaitForIdle(50 * time.Millisecond) + if !errors.Is(err, ErrAgentBusy) { + t.Fatalf("expected ErrAgentBusy on timeout, got %v", err) + } +} + +// TestWaitForIdle_ZeroTimeoutWaitsIndefinitely verifies that a non-positive +// timeout still blocks until idle (or shutdown) — not an instant ErrAgentBusy. +func TestWaitForIdle_ZeroTimeoutWaitsIndefinitely(t *testing.T) { + gate := make(chan struct{}) + var gateOnce sync.Once + closeGate := func() { gateOnce.Do(func() { close(gate) }) } + stub := newStubWithFuncs( + func(ctx context.Context) (*kit.TurnResult, error) { + select { + case <-gate: + case <-ctx.Done(): + return nil, ctx.Err() + } + return turnResult("done"), nil + }, + ) + app := newTestApp(stub) + t.Cleanup(func() { + closeGate() + app.Close() + }) + + app.Run("hello") + if !waitForCondition(2*time.Second, func() bool { return app.IsBusy() }) { + t.Fatal("app never became busy after Run()") + } + + errCh := make(chan error, 1) + go func() { errCh <- app.WaitForIdle(0) }() + + select { + case err := <-errCh: + t.Fatalf("WaitForIdle(0) returned early with %v while agent was busy", err) + case <-time.After(150 * time.Millisecond): + } + + closeGate() + + select { + case err := <-errCh: + if err != nil { + t.Fatalf("WaitForIdle(0) returned %v after idle", err) + } + case <-time.After(3 * time.Second): + t.Fatal("WaitForIdle(0) did not return after drain completed") + } +} + +// TestWaitForIdle_AppClose verifies that shutting down the app while a +// caller is blocked in WaitForIdle releases the wait. 
+func TestWaitForIdle_AppClose(t *testing.T) { + gate := make(chan struct{}) + stub := newStubWithFuncs( + func(ctx context.Context) (*kit.TurnResult, error) { + select { + case <-gate: + case <-ctx.Done(): + return nil, ctx.Err() + } + return turnResult("done"), nil + }, + ) + app := newTestApp(stub) + + app.Run("hello") + if !waitForCondition(2*time.Second, func() bool { return app.IsBusy() }) { + t.Fatal("app never became busy after Run()") + } + + errCh := make(chan error, 1) + go func() { errCh <- app.WaitForIdle(5 * time.Second) }() + + // Give the goroutine a moment to enter the wait. + time.Sleep(50 * time.Millisecond) + + // rootCancel is called by Close, which should release the waiter + // before drainQueue itself observes the cancellation and clears busy. + go func() { + // Unblock the stub so Close() can proceed past wg.Wait(). + close(gate) + }() + app.Close() + + select { + case err := <-errCh: + // Either rootCtx cancellation propagated first (err = context.Canceled) + // or the drain finished cleanly first (err == nil); both are + // acceptable terminations. The key invariant is that WaitForIdle + // does not hang past Close. + if err != nil && !errors.Is(err, context.Canceled) { + t.Fatalf("WaitForIdle returned unexpected error: %v", err) + } + case <-time.After(3 * time.Second): + t.Fatal("WaitForIdle did not return after Close()") + } +} + +// TestRequestNewSessionFromExtension_NoTUI verifies the headless guard: with +// no Bubble Tea program registered the call fails fast (no busy-wait). 
+func TestRequestNewSessionFromExtension_NoTUI(t *testing.T) { + app := newTestApp(newStub()) + defer app.Close() + + err := app.RequestNewSessionFromExtension("hello") + if err == nil { + t.Fatal("expected error in headless mode") + } + if !strings.Contains(err.Error(), "no interactive TUI") { + t.Fatalf("expected 'no interactive TUI' error, got %q", err.Error()) + } +} + +// TestBusyTransitionsSignalIdleCh exercises the setBusyLocked invariants +// directly: a fresh App is idle (closed channel); Run() opens a new channel +// that is then closed when drainQueue exits. +func TestBusyTransitionsSignalIdleCh(t *testing.T) { + app := newTestApp(newStub("ok")) + defer app.Close() + + // Initial state: closed channel, busy==false. + busy, ch := app.idleSnapshot() + if busy { + t.Fatal("freshly constructed App should not be busy") + } + select { + case <-ch: + default: + t.Fatal("initial idleCh should already be closed") + } + + gate := make(chan struct{}) + var gateOnce sync.Once + closeGate := func() { gateOnce.Do(func() { close(gate) }) } + stub := newStubWithFuncs(func(ctx context.Context) (*kit.TurnResult, error) { + select { + case <-gate: + case <-ctx.Done(): + return nil, ctx.Err() + } + return turnResult("ok"), nil + }) + app2 := newTestApp(stub) + t.Cleanup(func() { + closeGate() + app2.Close() + }) + + app2.Run("hello") + if !waitForCondition(2*time.Second, func() bool { return app2.IsBusy() }) { + t.Fatal("app2 never became busy") + } + + _, ch2 := app2.idleSnapshot() + select { + case <-ch2: + t.Fatal("idleCh should be open while busy") + default: + } + + closeGate() + + select { + case <-ch2: + case <-time.After(3 * time.Second): + t.Fatal("idleCh was never closed after drain completed") + } +} diff --git a/internal/extensions/api.go b/internal/extensions/api.go index 21c637da..f01636e1 100644 --- a/internal/extensions/api.go +++ b/internal/extensions/api.go @@ -1,5 +1,24 @@ package extensions +import ( + "errors" +) + +// ErrAgentBusy is returned (wrapped) 
when an extension API call that requires +// the agent to be idle cannot proceed because the agent is still processing a +// turn or post-turn hooks. Most notably, ctx.NewSession waits for idle +// internally; if its wait deadline elapses it returns an error that wraps +// this sentinel. +// +// Extensions can detect the condition with errors.Is: +// +// if err := ctx.NewSession(prompt); err != nil { +// if errors.Is(err, ext.ErrAgentBusy) { +// // agent never settled — fall back to a queued message instead +// } +// } +var ErrAgentBusy = errors.New("agent is busy") + // --------------------------------------------------------------------------- // Internal types (used by runner, NOT exposed to Yaegi) // --------------------------------------------------------------------------- @@ -130,10 +149,24 @@ type Context struct { // expanded the same way they are for normal user input. Pass an empty // string to start an empty session. // - // Returns an error if the agent is currently busy, if a registered - // BeforeSessionSwitch handler cancels the switch, or if the new - // session file cannot be created. In non-interactive (ACP / headless) - // mode this is a no-op that returns an error. + // If the agent is currently busy when NewSession is called (for example, + // from an OnAgentEnd hook that fires before the agent fully settles, or + // while post-turn formatters/linters are still running), the call blocks + // until the agent transitions to idle. This avoids the v0.79.0 + // phase-handoff race where NewSession from OnAgentEnd would fail with + // "agent is busy" because AgentEnd fires before the busy flag clears. + // The wait has a generous internal timeout; if it elapses the returned + // error wraps ErrAgentBusy (detectable with errors.Is). + // + // Returns an error if the agent does not become idle within the wait + // window, if a registered BeforeSessionSwitch handler cancels the + // switch, or if the new session file cannot be created. 
In + // non-interactive (ACP / headless) mode this is a no-op that returns + // an error. + // + // Because NewSession may block, call it from a goroutine — not + // directly from inside an event handler that the agent loop is waiting + // on. // // Typical pattern — start a fresh session at the end of a phase by // reading a handoff file: @@ -145,7 +178,9 @@ type Context struct { // } // last := msgs[len(msgs)-1].Content // if strings.Contains(last, "") { - // _ = ctx.NewSession("Read @HANDOFF.md and continue the next phase.") + // go func() { + // _ = ctx.NewSession("Read @HANDOFF.md and continue the next phase.") + // }() // } // }) NewSession func(prompt string) error diff --git a/internal/extensions/symbols.go b/internal/extensions/symbols.go index 03a812fa..c7e4c5d6 100644 --- a/internal/extensions/symbols.go +++ b/internal/extensions/symbols.go @@ -28,6 +28,11 @@ func Symbols() interp.Exports { "CommandDef": reflect.ValueOf((*CommandDef)(nil)), "PrintBlockOpts": reflect.ValueOf((*PrintBlockOpts)(nil)), + // Sentinel errors. Extensions detect them with errors.Is: + // + // if errors.Is(err, ext.ErrAgentBusy) { ... } + "ErrAgentBusy": reflect.ValueOf(&ErrAgentBusy).Elem(), + // Session types "SessionMessage": reflect.ValueOf((*SessionMessage)(nil)), "ExtensionEntry": reflect.ValueOf((*ExtensionEntry)(nil)),