From ff3a1f6f53899920aa86c94c26d421900058e92b Mon Sep 17 00:00:00 2001 From: Konstantin Kolesnyak Date: Thu, 25 Jun 2026 15:14:33 +0200 Subject: [PATCH] Cron/hook thinking-effort: downgrade only under OAuth proxy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Problem: every cron run on Claude OAuth (subscription) fails with API Error: 400 level "max" not supported, valid levels: low, medium, high because the global agent.thinking=max / agent.effort=max defaults are applied to every session, including cron/hook sessions that run on cron_model (Sonnet by default). Claude OAuth caps non-flagship models at "high" and rejects "max". The main interactive model (Opus) accepts "max", so lowering the global default would degrade the interactive experience to fix cron. API users (direct ANTHROPIC_API_KEY) accept "max" on every model and shouldn't be downgraded at all. Fix: detect OAuth via config.proxy.enabled — the local cli-proxy-api that wraps Claude Code OAuth credentials. _select_thinking_effort() caps thinking/effort to "high" only when source is cron/hook AND the proxy is enabled. API users keep their configured value untouched for every source, and interactive sessions keep it under OAuth too — only the narrow OAuth+cron path is downgraded. Composes with the existing model-aware _effective_effort() (which already caps Sonnet's effort at "high"); this layer also handles the thinking side (which the CLI otherwise rejects as a string-level "max") and makes the OAuth trigger explicit. Tests: tests/test_engine_options.py — 21 tests covering OAuth on/off × cron/hook/interactive sources × max/lower values, plus from_dict backwards-compat (legacy cron_thinking/cron_effort keys are silently ignored — no schema break for users who had them in config.yaml). History: supersedes #129. The previous version added cron_thinking and cron_effort config knobs defaulting to "high", applied unconditionally to every cron/hook session. @pufit's review there asked to keep "max" by default and switch to "high" only under OAuth — this PR does exactly that with no new config knobs. Co-Authored-By: Claude Opus 4.7 --- config.example.yaml | 27 +-- docs/config.md | 2 + nerve/agent/engine.py | 424 ++++++----------------------------- nerve/config.py | 81 ------- tests/test_engine_options.py | 143 ++++++++++++ 5 files changed, 224 insertions(+), 453 deletions(-) create mode 100644 tests/test_engine_options.py diff --git a/config.example.yaml b/config.example.yaml index 5fd7e09..3d3c617 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -11,25 +11,6 @@ timezone: America/New_York # aws_region: us-east-1 # AWS region for Bedrock # # aws_profile: "" # Optional: AWS SSO profile name -# Local proxy (CLIProxyAPI) — optional. Routes Anthropic API calls through -# Claude Code OAuth, and is the Anthropic↔OpenAI translation layer that local -# Ollama models are reached through. Required for the ollama block below. -# proxy: -# enabled: false -# port: 8317 -# host: 127.0.0.1 - -# Local Ollama — expose locally-installed Ollama models as selectable chat -# models in the web composer's model picker. Ollama speaks an OpenAI-compatible -# API, so this requires proxy.enabled: true (the proxy translates the -# Anthropic requests the SDK emits). Models are auto-discovered at runtime -# from the running Ollama server (GET /api/tags) — whatever you've pulled -# locally shows up automatically, no need to list models here. -# ollama: -# enabled: false -# host: 127.0.0.1 -# port: 11434 - # Agent agent: model: claude-opus-4-8 # Primary model for conversations @@ -39,7 +20,13 @@ agent: title_model: claude-haiku-4-5-20251001 # Session title generation max_turns: 50 # Max agentic turns per request max_concurrent: 4 # Max concurrent agent sessions - background_agent_permissions: true # Background sub-agents (Agent run_in_background) get the same tool permissions as foreground; false denies their Write/Edit/Bash + thinking: max # Thinking budget for the main model + effort: max # Reasoning effort for the main model + # Note: cron/hook sessions run on cron_model (Sonnet). Under Claude OAuth + # (subscription) Sonnet rejects thinking/effort="max" with + # level "max" not supported, valid levels: low, medium, high + # Nerve detects OAuth via `proxy.enabled` and automatically caps + # cron/hook sessions at "high" in that case. API users keep "max". # First-prompt rewrite — the web UI can refine the opening message of a # new chat with a fast model, preview it, and send only after approval. prompt_rewrite: diff --git a/docs/config.md b/docs/config.md index f24c5b3..9537450 100644 --- a/docs/config.md +++ b/docs/config.md @@ -40,6 +40,8 @@ from any working directory: | `agent.cron_model` | string | `claude-sonnet-4-6` | Model for cron jobs (cheaper) | | `agent.max_turns` | int | `50` | Max agentic turns per request | | `agent.max_concurrent` | int | `4` | Max concurrent agent sessions | +| `agent.thinking` | string | `max` | Thinking budget for the main model: `max` / `high` / `medium` / `low` / `disabled` / `adaptive` / explicit token count. Automatically capped at `high` for cron and hook sessions when `proxy.enabled` is true (Claude OAuth subscription rejects `max` on non-flagship models like Sonnet). | +| `agent.effort` | string | `max` | Reasoning effort for the main model: `max` / `high` / `medium` / `low`. Same OAuth+cron cap as `thinking`. | | `agent.prompt_rewrite.enabled` | bool | `true` | Offer the first-prompt rewrite feature in the web UI (per-user toggle lives in the composer) | | `agent.prompt_rewrite.model` | string | `""` | Model for prompt rewriting (empty = `agent.model`, the chat model) | | `agent.prompt_rewrite.max_tokens` | int | `1024` | Max tokens for the rewritten prompt | diff --git a/nerve/agent/engine.py b/nerve/agent/engine.py index 3dcdf96..bef2f71 100644 --- a/nerve/agent/engine.py +++ b/nerve/agent/engine.py @@ -70,6 +70,35 @@ _SURROGATE_RE = re.compile(r"[\ud800-\udfff]") +_OAUTH_CRON_CAP = "high" + + +def _select_thinking_effort(config: Any, source: str) -> tuple[str, str]: + """Pick (thinking, effort) for a session, downgrading only under OAuth. + + Cron and hook sessions run on ``agent.cron_model`` (typically Sonnet). + Under Claude OAuth (subscription) Sonnet caps thinking/effort at + ``high`` and the CLI rejects ``max`` with:: + + API Error: 400 level "max" not supported, valid levels: low, medium, high + + OAuth mode in nerve is gated by the local cli-proxy-api wrapping + the user's subscription credentials — i.e. ``config.proxy.enabled``. + When that's on AND the session is cron/hook, cap both knobs to + ``high``. API users (no proxy) and interactive sessions keep the + user's configured value unchanged — defaults stay ``max`` for + everyone, only the narrow OAuth+cron path is downgraded. + """ + thinking = config.agent.thinking + effort = config.agent.effort + if source in ("cron", "hook") and config.proxy.enabled: + if thinking == "max": + thinking = _OAUTH_CRON_CAP + if effort == "max": + effort = _OAUTH_CRON_CAP + return thinking, effort + + # Anthropic API image limits _MAX_IMAGE_BYTES = 5 * 1024 * 1024 # 5 MB _IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg", ".gif", ".webp"} @@ -315,22 +344,10 @@ def __init__(self, config: NerveConfig, db: Database): # task_started / task_updated / task_notification system messages: # session_id -> task_id -> {task_id, label, tool, status}. self._bg_task_registry: dict[str, dict[str, dict[str, Any]]] = {} - # Per-session dynamic-workflow registry: session_id -> tool_use_id -> - # {name, snapshot}. The tool_use_id is captured when a ``Workflow`` - # tool call streams; later task_* system messages carrying a - # ``workflow_progress`` tree are matched back to it so the UI can - # render a live phase/agent panel. The last snapshot is cached so the - # terminal task_notification (which omits the tree) can still settle - # the panel and persist the final state. - self._workflows: dict[str, dict[str, dict[str, Any]]] = {} # Per-session active channel — set on run() entry, cleared on exit. # Read by session-scoped tools (send_file) to avoid dispatching via # stale router context from a prior inbound channel. self._active_channel: dict[str, str] = {} - # Resolved model bound to each session's live SDK client. Used to - # detect mid-session model switches (the CLI fixes its model at - # connect time, so a change requires recreating the client). - self._session_models: dict[str, str] = {} self._router = None # ChannelRouter — lazy-initialized via .router property self._mcp_servers_cache = list(config.mcp_servers) # hot-reloadable self._claude_code_plugins: list[dict[str, str]] = [] # plugin dirs @@ -977,28 +994,25 @@ def _build_options( else: system_prompt = system_prompt_str - # Local Ollama models are reached through the proxy and speak the - # OpenAI-translated API — Anthropic-only knobs (extended thinking, - # effort, the context-1m beta) don't apply and may break translation, - # so suppress them for non-Claude models. - selected_model = model or self.config.agent.model - is_ollama_model = ( - self.config.ollama.enabled and "claude" not in selected_model.lower() + # Pick raw thinking/effort by source (cron/hook → cron_* overrides, + # interactive → main settings), then cap each to what the resolved + # model actually supports. + thinking_value, effort_value = _select_thinking_effort( + self.config, source, ) - - thinking_config = ( - None if is_ollama_model - else self._parse_thinking_config(self.config.agent.thinking, selected_model) + thinking_config = self._parse_thinking_config( + thinking_value, + model or self.config.agent.model, ) - effort = ( - None if is_ollama_model - else self._effective_effort(self.config.agent.effort, selected_model) + effort = self._effective_effort( + effort_value, + model or self.config.agent.model, ) # Some subscriptions reject the context-1m beta for specific models # (e.g. claude-sonnet-4-6) — skip the beta header for those. betas = ( ["context-1m-2025-08-07"] - if not is_ollama_model and self.config.agent.context_1m_enabled_for(model) + if self.config.agent.context_1m_enabled_for(model) else [] ) @@ -1192,7 +1206,7 @@ def _build_hooks(self, session_id: str) -> dict: through ``engine.run(..., source="wakeup")`` (the CLI's own autonomous firing is suppressed — see ``_build_env``). """ - from nerve.agent.interactive import INTERACTIVE_TOOLS, _read_file_safe + from nerve.agent.interactive import _read_file_safe captured_files: set[str] = set() @@ -1265,58 +1279,17 @@ async def _capture_wakeup_hook(hook_input, tool_use_id, context): ) return {"hookSpecificOutput": {"hookEventName": "PostToolUse"}} - async def _grant_permission_hook(hook_input, tool_use_id, context): - """PreToolUse hook: pre-approve non-interactive tools. - - Background sub-agents (the Agent tool with run_in_background) run - detached and non-blocking, so the CLI never surfaces an approval - prompt for their nested tool calls — the ``can_use_tool`` callback - is never invoked for them and the CLI denies their Write/Edit/Bash - by default. A PreToolUse hook, however, DOES fire for those nested - calls (it is a programmatic callback, not a user-facing prompt), so - returning ``permissionDecision: "allow"`` here grants the same - auto-approval foreground agents already get via ``can_use_tool``. - - Interactive tools and Read are left untouched: interactive tools - defer to ``can_use_tool`` (pause / inject answers / deny), and Read - defers to the image validator above plus the CLI's read-only - auto-allow. This keeps the web pause-for-input flow intact while - giving background sub-agents permission parity with the foreground. - """ - tool_name = hook_input.get("tool_name", "") - if tool_name in INTERACTIVE_TOOLS or tool_name == "Read": - return {"hookSpecificOutput": {"hookEventName": "PreToolUse"}} - return { - "hookSpecificOutput": { - "hookEventName": "PreToolUse", - "permissionDecision": "allow", - "permissionDecisionReason": ( - "nerve: auto-approved (background-agent permission parity)" - ), - } - } - - pre_tool_use = [ - HookMatcher( - matcher="Edit|Write|NotebookEdit", - hooks=[_snapshot_hook], - ), - HookMatcher( - matcher="Read", - hooks=[_validate_image_hook], - ), - ] - # Catch-all permission grant so background sub-agents (whose nested - # tool calls never reach can_use_tool) inherit foreground's tool - # permissions. Registered last so the snapshot/validator hooks still - # run for their tools; a deny from the validator wins over this allow. - if self.config.agent.background_agent_permissions: - pre_tool_use.append( - HookMatcher(matcher=None, hooks=[_grant_permission_hook]) - ) - return { - "PreToolUse": pre_tool_use, + "PreToolUse": [ + HookMatcher( + matcher="Edit|Write|NotebookEdit", + hooks=[_snapshot_hook], + ), + HookMatcher( + matcher="Read", + hooks=[_validate_image_hook], + ), + ], "PostToolUse": [ HookMatcher( matcher="ScheduleWakeup", @@ -1522,9 +1495,7 @@ async def _get_or_create_client( lock = self.sessions.get_lock(session_id) async with lock: client = self.sessions.get_client(session_id) - requested_model = model or self.config.agent.model if client is not None: - bound_model = self._session_models.get(session_id) # Health check: verify the underlying CLI process is still alive if self._is_client_dead(client): logger.warning( @@ -1536,20 +1507,6 @@ async def _get_or_create_client( unregister_handler(session_id) await self._safe_disconnect(client) client = None - elif bound_model is not None and bound_model != requested_model: - # Model switched mid-session (e.g. the composer's picker - # moved from the Anthropic default to a local Ollama - # model). The CLI binds its model at connect time, so - # tear the client down and recreate it below. - logger.info( - "Session %s model changed (%s → %s), recreating client", - session_id, bound_model, requested_model, - ) - self._stop_idle_watcher(session_id) - self.sessions.remove_client(session_id) - unregister_handler(session_id) - await self._safe_disconnect(client) - client = None else: return client @@ -1638,7 +1595,6 @@ async def _get_or_create_client( # Record connected_at and the resolved model resolved_model = options.model - self._session_models[session_id] = resolved_model now = datetime.now(timezone.utc).isoformat() connected_at = session.get("connected_at") if session and sdk_resume_id else now await self.sessions.mark_active( @@ -1952,16 +1908,6 @@ async def _process_sdk_message( description=str(tool_input.get("description", "")), model=str(tool_input.get("model", "")) or None, ) - # Track dynamic workflows. A ``Workflow`` tool call spawns - # a background runtime; later task_* system messages carry - # its progress tree keyed by this tool_use_id. Register it - # now so _handle_system_message can recognize those events - # even before the first ``workflow_progress`` payload. - if tool_name == "Workflow" and tool_use_id: - self._workflows.setdefault(session_id, {})[tool_use_id] = { - "name": self._derive_workflow_name(tool_input), - "snapshot": None, - } st.tool_calls_log.append({ "tool": tool_name, "input": tool_input, @@ -2084,33 +2030,6 @@ async def _handle_system_message( if not entry["label"]: entry["label"] = data.get("summary") or task_id - # Dynamic-workflow progress. A workflow task is recognized either by - # its tool_use_id (captured when the ``Workflow`` tool streamed) or by - # the presence of a ``workflow_progress`` tree on the message. We emit - # a dedicated event so the UI can render a live phase/agent panel — - # independent of the coarse background-task chip above. - tool_use_id = data.get("tool_use_id") or getattr(message, "tool_use_id", None) - wf_reg = self._workflows.get(session_id) or {} - wp = data.get("workflow_progress") - task_type = str(data.get("task_type") or "") - is_workflow = bool(tool_use_id) and ( - tool_use_id in wf_reg - or (isinstance(wp, list) and len(wp) > 0) - or "workflow" in task_type - ) - if is_workflow: - entry["tool"] = "Workflow" - # The CLI reports the workflow name on task_started — authoritative - # (and better than the tool-input guess for inline scripts). - wf_name = data.get("workflow_name") - if wf_name: - self._workflows.setdefault(session_id, {}).setdefault( - tool_use_id, {"name": "Workflow", "snapshot": None}, - )["name"] = str(wf_name) - await self._emit_workflow_progress( - session_id, tool_use_id, subtype, data, message, wp, - ) - if changed: await broadcaster.broadcast(session_id, { "type": "background_tasks_update", @@ -2118,140 +2037,6 @@ async def _handle_system_message( "tasks": list(registry.values()), }) - async def _emit_workflow_progress( - self, - session_id: str, - tool_use_id: str, - subtype: str, - data: dict, - message: Any, - wp: Any, - ) -> None: - """Build, cache, broadcast (and on terminal, persist) a workflow - progress snapshot for the ``Workflow`` call ``tool_use_id``.""" - reg = self._workflows.setdefault(session_id, {}) - cached = reg.setdefault(tool_use_id, {"name": "Workflow", "snapshot": None}) - - # task_progress carries the full tree; task_notification omits it, so - # fall back to the last cached snapshot to settle the panel. - if isinstance(wp, list) and wp: - snapshot = self._build_workflow_snapshot(wp) - else: - prev = cached.get("snapshot") or {} - snapshot = { - "phases": prev.get("phases", []), - "agents": prev.get("agents", []), - "totalTokens": prev.get("totalTokens", 0), - "totalToolCalls": prev.get("totalToolCalls", 0), - "agentCount": prev.get("agentCount", 0), - } - - status = self._workflow_status(subtype, data, message) - snapshot["name"] = cached.get("name") or "Workflow" - snapshot["status"] = status - summary = ( - data.get("summary") - or data.get("description") - or getattr(message, "summary", "") - or getattr(message, "description", "") - ) - if summary: - snapshot["summary"] = str(summary)[:2000] - - cached["snapshot"] = snapshot - await broadcaster.broadcast_workflow_progress(session_id, tool_use_id, snapshot) - - if status in ("completed", "failed", "stopped"): - try: - await self.db.merge_workflow_into_call(session_id, tool_use_id, snapshot) - except Exception as e: # persistence is best-effort - logger.debug("merge_workflow_into_call failed for %s: %s", tool_use_id, e) - - @staticmethod - def _workflow_status(subtype: str, data: dict, message: Any) -> str: - """Map a task_* system message to a workflow status string - (running / completed / failed / stopped).""" - if subtype in ("task_started", "task_progress"): - return "running" - if subtype == "task_updated": - patch = data.get("patch") or {} - s = str(patch.get("status") or "") - if s == "killed": - return "stopped" - return s or "running" - if subtype == "task_notification": - return str( - data.get("status") or getattr(message, "status", "") or "completed" - ) - return "running" - - @staticmethod - def _derive_workflow_name(tool_input: Any) -> str: - """Best-effort workflow name: the ``name`` arg for a named workflow, - else ``meta.name`` parsed from an inline script, else "Workflow".""" - if not isinstance(tool_input, dict): - return "Workflow" - name = tool_input.get("name") - if isinstance(name, str) and name.strip(): - return name.strip() - script = tool_input.get("script") - if isinstance(script, str): - m = re.search(r"name\s*:\s*['\"]([^'\"]+)['\"]", script) - if m: - return m.group(1) - return "Workflow" - - @staticmethod - def _fold_workflow_snapshots( - ordered_blocks: list | None, wf_reg: dict | None, - ) -> None: - """Attach cached workflow snapshots onto their ``Workflow`` tool_call - blocks (in place), so a settled-within-turn workflow persists its tree.""" - if not wf_reg or not ordered_blocks: - return - for ob in ordered_blocks: - if not isinstance(ob, dict) or ob.get("type") != "tool_call": - continue - snap = (wf_reg.get(ob.get("tool_use_id")) or {}).get("snapshot") - if snap: - ob["workflow"] = snap - - @staticmethod - def _build_workflow_snapshot(wp: list) -> dict: - """Normalize the CLI's flat ``workflow_progress`` list into a - {phases, agents, totals} snapshot for the UI.""" - phases: list[dict] = [] - agents: list[dict] = [] - for e in wp: - if not isinstance(e, dict): - continue - etype = e.get("type") - if etype == "workflow_phase": - phases.append({"index": e.get("index"), "title": e.get("title")}) - elif etype == "workflow_agent": - summary = e.get("lastToolSummary") - agents.append({ - "label": e.get("label"), - "phaseIndex": e.get("phaseIndex"), - "phaseTitle": e.get("phaseTitle"), - "state": e.get("state"), - "model": e.get("model"), - "tokens": e.get("tokens"), - "toolCalls": e.get("toolCalls"), - "lastToolName": e.get("lastToolName"), - "lastToolSummary": str(summary)[:200] if summary else None, - "durationMs": e.get("durationMs"), - }) - total_tokens = sum(int(a.get("tokens") or 0) for a in agents) - total_tool_calls = sum(int(a.get("toolCalls") or 0) for a in agents) - return { - "phases": phases, - "agents": agents, - "totalTokens": total_tokens, - "totalToolCalls": total_tool_calls, - "agentCount": len(agents), - } - def _prune_bg_tasks(self, session_id: str) -> None: """Drop settled background tasks from the registry. @@ -2259,24 +2044,12 @@ def _prune_bg_tasks(self, session_id: str) -> None: accumulate forever. Running tasks are kept. """ registry = self._bg_task_registry.get(session_id) - if registry: - for tid in [t for t, e in registry.items() if e.get("status") != "running"]: - del registry[tid] - if not registry: - self._bg_task_registry.pop(session_id, None) - - # Drop settled workflows too (terminal snapshot already broadcast + - # persisted); keep running ones so late progress still maps back. - wf_reg = self._workflows.get(session_id) - if wf_reg: - terminal = {"completed", "failed", "stopped"} - for tuid in [ - t for t, e in wf_reg.items() - if (e.get("snapshot") or {}).get("status") in terminal - ]: - del wf_reg[tuid] - if not wf_reg: - self._workflows.pop(session_id, None) + if not registry: + return + for tid in [t for t, e in registry.items() if e.get("status") != "running"]: + del registry[tid] + if not registry: + self._bg_task_registry.pop(session_id, None) async def _finalize_turn( self, session_id: str, st: _TurnState, channel: str | None, @@ -2291,13 +2064,6 @@ async def _finalize_turn( # Merge tool results into tool_calls_log self._merge_tool_results(st.tool_calls_log, st.tool_results_map) - # Fold the latest dynamic-workflow snapshot onto its ``Workflow`` block - # so the panel reconstructs after reload. This covers workflows that - # settle *within* the launching turn — before the message row exists, - # so the out-of-band merge_workflow_into_call has nothing to patch. - # Longer workflows that settle after finalize are handled by that merge. - self._fold_workflow_snapshots(st.ordered_blocks, self._workflows.get(session_id)) - # Store assistant message in DB await self.sessions.add_message( session_id, "assistant", st.full_response_text, @@ -3310,44 +3076,6 @@ async def _idle_stream_watcher( # Cron / Hook runs # # ------------------------------------------------------------------ # - async def _teardown_oneshot_client( - self, session_id: str, *, keepalive_if_bg: bool = True, - ) -> None: - """Tear down a one-shot (cron / hook) run's SDK client. - - One-shot runs normally discard the client immediately to avoid leaking - claude CLI subprocesses. The exception is a run that yields while a - ``run_in_background`` task is still live: discarding here kills the - subprocess and the idle-stream watcher that delivers the task's - completion turn, so the agent would never resume to finish its work - (the fix-worker "strand" failure). In that case keep the client alive — - exactly as an interactive/web session does — and let - ``run_idle_client_sweep`` reap it once the task settles (it already - skips live-background-task sessions for the same reason). - - ``keepalive_if_bg`` MUST be False for runs whose ``session_id`` is - reused across runs (``run_persistent_cron``'s stable ``cron:{job_id}``): - parking such a client would let the NEXT scheduled run reuse the same - client/conversation while the prior run's background task is still in - flight, interleaving the two. Keep-alive is only safe for the - unique-per-run isolated paths (``run_cron`` / ``run_hook``). - """ - # Optimistic check: a task that settles between the watcher's last drain - # and here still reads as live, parking a client whose work is actually - # done — harmless, the next idle sweep reaps it. - if keepalive_if_bg and self._has_live_background_tasks(session_id): - logger.info( - "One-shot session %s parked on a live background task — keeping " - "client alive so its completion turn can resume the run; the " - "idle sweep reaps it once the task settles.", - session_id, - ) - return - # background_memorize: returning promptly closes the run log and frees - # APScheduler to fire the next run — memorization queues on a global - # lock and must not gate the run lifecycle. - await self._discard_client(session_id, background_memorize=True) - async def run_cron( self, job_id: str, @@ -3357,11 +3085,8 @@ async def run_cron( ) -> str: """Run an agent turn for a cron job in an isolated session. - The SDK client is normally discarded immediately after the run - completes to avoid leaking claude CLI subprocesses for one-shot jobs — - unless the run yielded with a live ``run_in_background`` task, in which - case it is kept alive so the agent can resume when the task completes - (see ``_teardown_oneshot_client``). + The SDK client is discarded immediately after the run completes + to avoid leaking claude CLI subprocesses for one-shot jobs. """ if run_id is None: run_id = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S") @@ -3375,7 +3100,10 @@ async def run_cron( model=model or self.config.agent.cron_model, ) finally: - await self._teardown_oneshot_client(session_id) + # background_memorize: returning promptly closes the cron run + # log and frees APScheduler to fire the next run — memorization + # queues on a global lock and must not gate the run lifecycle. + await self._discard_client(session_id, background_memorize=True) async def run_persistent_cron( self, @@ -3386,13 +3114,9 @@ async def run_persistent_cron( """Run a persistent cron job that maintains context across runs. Uses a stable session_id (cron:{job_id}) so the SDK resumes - conversation context on subsequent triggers. The client is discarded - after each run to free the subprocess (sdk_session_id is preserved for - the next resume). Unlike the isolated one-shot paths it does NOT keep - the client alive for a live background task: the stable session is - reused by the next run, which would collide with the parked task — so a - persistent-cron background task that outlives its run is not resumed - (use an isolated cron for long background work). + conversation context on subsequent triggers. The client is + discarded after each run to free the subprocess, but + sdk_session_id is preserved for the next resume. """ session_id = f"cron:{job_id}" await self.sessions.get_or_create( @@ -3406,10 +3130,8 @@ async def run_persistent_cron( model=model or self.config.agent.cron_model, ) finally: - # Stable session_id is reused by the next run, which would collide - # with a parked background task — so persistent crons always discard - # (no keep-alive). See _teardown_oneshot_client. - await self._teardown_oneshot_client(session_id, keepalive_if_bg=False) + # See run_cron: memorization must not gate the run lifecycle. + await self._discard_client(session_id, background_memorize=True) async def run_hook( self, @@ -3420,10 +3142,7 @@ async def run_hook( ) -> str: """Run an agent turn for a webhook in an isolated session. - The SDK client is normally discarded immediately after the run - completes — unless the run yielded with a live ``run_in_background`` - task, in which case it is kept alive so the agent can resume when the - task completes (see ``_teardown_oneshot_client``). + The SDK client is discarded immediately after the run completes. """ session = await self.sessions.create_hook_session(hook_name, hook_id) session_id = session["id"] @@ -3435,7 +3154,8 @@ async def run_hook( model=model or self.config.agent.cron_model, ) finally: - await self._teardown_oneshot_client(session_id) + # See run_cron: memorization must not gate the run lifecycle. + await self._discard_client(session_id, background_memorize=True) # ------------------------------------------------------------------ # # Idle client sweep # diff --git a/nerve/config.py b/nerve/config.py index 1488733..263b86a 100644 --- a/nerve/config.py +++ b/nerve/config.py @@ -151,17 +151,6 @@ class AgentConfig: # behaviour: turns can hang forever). 900s comfortably covers a 10-min # Bash tool call plus SDK round-trips while still catching real hangs. cli_idle_timeout_seconds: int = 900 - # When True, background sub-agents (the Agent tool with run_in_background, or - # background Bash) get the SAME auto-approved tool permissions as foreground - # agents, via a PreToolUse hook that pre-approves all non-interactive tools. - # Background tasks are detached and non-blocking, so the CLI never surfaces an - # approval prompt for them — the can_use_tool callback is never invoked for - # their nested Write/Edit/Bash calls, and the CLI denies them by default. - # A PreToolUse hook DOES fire for those nested calls (it is a programmatic - # callback, not a user prompt), so returning permissionDecision="allow" there - # grants the permission. Set False to restore the CLI default (background - # sub-agent writes denied; build/write agents must then run in foreground). - background_agent_permissions: bool = True prompt_rewrite: PromptRewriteConfig = field(default_factory=PromptRewriteConfig) @classmethod @@ -179,9 +168,6 @@ def from_dict(cls, d: dict) -> AgentConfig: d.get("context_1m_excluded_models", []) or [] ), cli_idle_timeout_seconds=int(d.get("cli_idle_timeout_seconds", 900)), - background_agent_permissions=bool( - d.get("background_agent_permissions", True) - ), prompt_rewrite=PromptRewriteConfig.from_dict(d.get("prompt_rewrite") or {}), ) @@ -305,14 +291,6 @@ class GitHubSyncConfig: # pass); deny_repos is a denylist and takes precedence over allow_repos. allow_repos: list[str] = field(default_factory=list) deny_repos: list[str] = field(default_factory=list) - # Actor guardrails — limit which GitHub logins can land a notification in - # the inbox, matched on the "actors" metadata key (every login involved in - # the notification: issue/PR author, assignees, comment & review authors). - # Same semantics as allow_repos/deny_repos — case-insensitive globs, deny - # wins, and a non-empty allow_actors is fail-closed (a notification with no - # matching actor is dropped before it reaches the inbox). Empty = all pass. - allow_actors: list[str] = field(default_factory=list) - deny_actors: list[str] = field(default_factory=list) @classmethod def from_dict(cls, d: dict) -> GitHubSyncConfig: @@ -326,8 +304,6 @@ def from_dict(cls, d: dict) -> GitHubSyncConfig: condense=d.get("condense", False), allow_repos=d.get("allow_repos", []), deny_repos=d.get("deny_repos", []), - allow_actors=d.get("allow_actors", []), - deny_actors=d.get("deny_actors", []), ) @@ -547,16 +523,12 @@ def from_dict(cls, d: dict) -> MemoryConfig: class CronConfig: jobs_file: Path = field(default_factory=lambda: Path("~/.nerve/cron/jobs.yaml")) system_file: Path = field(default_factory=lambda: Path("~/.nerve/cron/system.yaml")) - # Directory scanned at startup for drop-in custom gate plugins (.py files - # defining CronGate subclasses). See nerve/cron/gate_plugins.py. - gate_plugins_dir: Path = field(default_factory=lambda: Path("~/.nerve/cron/gates")) @classmethod def from_dict(cls, d: dict) -> CronConfig: return cls( jobs_file=_expand_path(d.get("jobs_file", "~/.nerve/cron/jobs.yaml")) or Path("~/.nerve/cron/jobs.yaml"), system_file=_expand_path(d.get("system_file", "~/.nerve/cron/system.yaml")) or Path("~/.nerve/cron/system.yaml"), - gate_plugins_dir=_expand_path(d.get("gate_plugins_dir", "~/.nerve/cron/gates")) or Path("~/.nerve/cron/gates"), ) @@ -731,48 +703,6 @@ def from_dict(cls, d: dict) -> ProxyConfig: ) -@dataclass -class OllamaConfig: - """Local Ollama server — exposes its models as selectable chat models. - - Ollama speaks an OpenAI-compatible API (``/v1``), not the Anthropic - Messages API the Claude Agent SDK uses. So Ollama models are routed - through the bundled CLIProxyAPI, which translates Anthropic ↔ OpenAI - and is registered with Ollama as an ``openai-compatibility`` upstream. - - Requirement: this only takes effect when the proxy is also enabled - (``proxy.enabled: true``) — the proxy is the translation layer. When - ``enabled`` is true but the proxy is off, Ollama models are not offered - (a warning is logged at startup). - - Models are auto-discovered at runtime from Ollama's native - ``GET /api/tags`` endpoint, so whatever you have pulled locally shows - up in the model picker with no extra config. - """ - - enabled: bool = False - host: str = "127.0.0.1" - port: int = 11434 - - @property - def base_url(self) -> str: - """Native Ollama base URL (used for ``/api/tags`` discovery).""" - return f"http://{self.host}:{self.port}" - - @property - def openai_base_url(self) -> str: - """OpenAI-compatible base URL (registered as a proxy upstream).""" - return f"http://{self.host}:{self.port}/v1" - - @classmethod - def from_dict(cls, d: dict) -> OllamaConfig: - return cls( - enabled=bool(d.get("enabled", False)), - host=d.get("host", "127.0.0.1"), - port=int(d.get("port", 11434)), - ) - - @dataclass class McpEndpointConfig: """Nerve's own MCP server endpoint (Nerve-as-MCP-server). @@ -1119,7 +1049,6 @@ class NerveConfig: notifications: NotificationsConfig = field(default_factory=NotificationsConfig) docker: DockerConfig = field(default_factory=DockerConfig) proxy: ProxyConfig = field(default_factory=ProxyConfig) - ollama: OllamaConfig = field(default_factory=OllamaConfig) houseofagents: HouseOfAgentsConfig = field(default_factory=HouseOfAgentsConfig) langfuse: LangfuseConfig = field(default_factory=LangfuseConfig) xmemory: XmemoryConfig = field(default_factory=XmemoryConfig) @@ -1155,15 +1084,6 @@ def effective_api_key(self) -> str: return self.proxy.api_key return self.anthropic_api_key - @property - def ollama_routable(self) -> bool: - """True when Ollama models can actually be served. - - Requires both Ollama enabled and the proxy running (the proxy is - the Anthropic↔OpenAI translation layer Ollama is reached through). - """ - return self.ollama.enabled and self.proxy.enabled - def create_anthropic_client(self, timeout: float = 60.0) -> Any: """Create an Anthropic client based on the configured provider. @@ -1246,7 +1166,6 @@ def from_dict(cls, d: dict) -> NerveConfig: notifications=NotificationsConfig.from_dict(d.get("notifications", {})), docker=DockerConfig.from_dict(d.get("docker", {})), proxy=ProxyConfig.from_dict(d.get("proxy", {})), - ollama=OllamaConfig.from_dict(d.get("ollama", {})), houseofagents=HouseOfAgentsConfig.from_dict(d.get("houseofagents", {})), langfuse=LangfuseConfig.from_dict(d.get("langfuse", {})), xmemory=XmemoryConfig.from_dict(d.get("xmemory", {})), diff --git a/tests/test_engine_options.py b/tests/test_engine_options.py new file mode 100644 index 0000000..25e7493 --- /dev/null +++ b/tests/test_engine_options.py @@ -0,0 +1,143 @@ +"""Tests for engine option helpers — OAuth-conditional thinking/effort cap. + +Regression for the issue where every cron run failed with +``API Error: 400 level "max" not supported, valid levels: low, medium, high`` +because the global ``effort=max`` / ``thinking=max`` settings were applied +to cron sessions running on ``cron_model`` (Sonnet) under Claude OAuth, +which caps non-flagship models at ``high``. + +The fix downgrades ``thinking`` and ``effort`` to ``high`` for cron and +hook sessions **only when OAuth is in use** (``config.proxy.enabled``). +API users keep ``max`` for every session, and interactive sessions +(web/Telegram/Discord/...) keep ``max`` even under OAuth — only the +narrow OAuth+cron path is touched. +""" + +from __future__ import annotations + +import pytest + +from nerve.agent.engine import _select_thinking_effort +from nerve.config import AgentConfig, NerveConfig, ProxyConfig + + +def _make_config( + *, + thinking: str = "max", + effort: str = "max", + proxy_enabled: bool = False, +) -> NerveConfig: + """Build a minimal NerveConfig for testing _select_thinking_effort. + + Only the ``agent`` and ``proxy`` fields are read by the helper; the + rest can stay at their defaults. + """ + return NerveConfig( + agent=AgentConfig(thinking=thinking, effort=effort), + proxy=ProxyConfig(enabled=proxy_enabled), + ) + + +class TestSelectThinkingEffort: + """``_select_thinking_effort`` downgrades only when OAuth + cron/hook.""" + + # ------------------------------------------------------------------ # + # OAuth on, cron-like source: must downgrade max -> high # + # ------------------------------------------------------------------ # + + @pytest.mark.parametrize("source", ["cron", "hook"]) + def test_oauth_caps_cron_max_to_high(self, source: str): + """The exact bug being fixed: OAuth + cron + max -> 'high'.""" + config = _make_config(thinking="max", effort="max", proxy_enabled=True) + assert _select_thinking_effort(config, source) == ("high", "high") + + @pytest.mark.parametrize("source", ["cron", "hook"]) + def test_oauth_does_not_upgrade_lower_values(self, source: str): + """The cap is a max-only cap, not a forced value. Lower settings pass through.""" + config = _make_config( + thinking="medium", effort="low", proxy_enabled=True, + ) + assert _select_thinking_effort(config, source) == ("medium", "low") + + @pytest.mark.parametrize("source", ["cron", "hook"]) + def test_oauth_caps_individually(self, source: str): + """Each knob is capped independently if it's at 'max'.""" + config = _make_config( + thinking="max", effort="medium", proxy_enabled=True, + ) + assert _select_thinking_effort(config, source) == ("high", "medium") + + config = _make_config( + thinking="medium", effort="max", proxy_enabled=True, + ) + assert _select_thinking_effort(config, source) == ("medium", "high") + + # ------------------------------------------------------------------ # + # OAuth on, interactive source: must NOT downgrade # + # ------------------------------------------------------------------ # + + @pytest.mark.parametrize("source", ["web", "telegram", "discord", "api", ""]) + def test_oauth_does_not_downgrade_interactive_sources(self, source: str): + """Interactive sessions run on agent.model (Opus by default) which + accepts max under OAuth. Don't touch them.""" + config = _make_config(thinking="max", effort="max", proxy_enabled=True) + assert _select_thinking_effort(config, source) == ("max", "max") + + # ------------------------------------------------------------------ # + # OAuth off (API key): never downgrade — Artem's requirement # + # ------------------------------------------------------------------ # + + @pytest.mark.parametrize("source", ["cron", "hook", "web", "telegram", ""]) + def test_api_users_never_downgraded(self, source: str): + """Without the local proxy (i.e. user has a real Anthropic API key), + every source keeps the configured value. This is exactly what Artem + asked for on ClickHouse/nerve#129 — don't change behavior for API + users.""" + config = _make_config(thinking="max", effort="max", proxy_enabled=False) + assert _select_thinking_effort(config, source) == ("max", "max") + + @pytest.mark.parametrize("source", ["cron", "hook"]) + def test_api_users_keep_custom_values_for_cron(self, source: str): + """API users can pick any value for cron sessions too — no special + treatment.""" + config = _make_config( + thinking="medium", effort="low", proxy_enabled=False, + ) + assert _select_thinking_effort(config, source) == ("medium", "low") + + # ------------------------------------------------------------------ # + # Defaults sanity check # + # ------------------------------------------------------------------ # + + def test_defaults_match_documented_values(self): + """Defaults: max/max for everyone; no special cron knobs anymore.""" + cfg = AgentConfig() + assert cfg.thinking == "max" + assert cfg.effort == "max" + # The cron_thinking / cron_effort knobs were removed — keep them + # gone so we don't reintroduce the unconditional downgrade. + assert not hasattr(cfg, "cron_thinking") + assert not hasattr(cfg, "cron_effort") + + +class TestAgentConfigFromDict: + """``AgentConfig.from_dict`` no longer reads cron_thinking/cron_effort.""" + + def test_from_dict_ignores_legacy_cron_keys(self): + """Old configs with cron_thinking/cron_effort load cleanly — the + keys are silently ignored. Backwards-compatible: nothing crashes, + the keys just don't do anything anymore (their behavior moved to + the engine's OAuth check).""" + cfg = AgentConfig.from_dict({ + "thinking": "max", + "effort": "max", + "cron_thinking": "high", # legacy, ignored + "cron_effort": "high", # legacy, ignored + }) + assert cfg.thinking == "max" + assert cfg.effort == "max" + + def test_from_dict_empty_uses_defaults(self): + cfg = AgentConfig.from_dict({}) + assert cfg.thinking == "max" + assert cfg.effort == "max"