Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions desktop/src/apps/ObservatoryApp.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,58 @@ describe("ObservatoryApp", () => {
});
});

it("surfaces an error when a steer post is rejected by the server", async () => {
const fetchMock = mockFetch({
"GET /api/observatory/fleet": { ok: true, body: fleetBody },
"GET /api/observatory/throttle": { ok: true, body: { global: null, lanes: {} } },
"POST /api/observatory/pause": { ok: false, status: 403, body: { detail: "forbidden" } },
});
vi.stubGlobal("fetch", fetchMock);
render(<ObservatoryApp windowId="w1" />);
await flush();

fireEvent.click(screen.getByRole("button", { name: /pause queue/i }));
await flush();

await waitFor(() =>
expect(screen.getByText(/could not update the pause state/i)).toBeTruthy(),
);
});

it("clears the steer error after a subsequent successful post", async () => {
let pauseOk = false;
const fetchMock = vi.fn().mockImplementation((input: string, init?: RequestInit) => {
const method = (init?.method ?? "GET").toUpperCase();
if (input === "/api/observatory/fleet") {
return Promise.resolve({ ok: true, status: 200, json: () => Promise.resolve(fleetBody) });
}
if (input === "/api/observatory/throttle") {
return Promise.resolve({ ok: true, status: 200, json: () => Promise.resolve({ global: null, lanes: {} }) });
}
if (method === "POST" && input === "/api/observatory/pause") {
const ok = pauseOk;
pauseOk = true; // first attempt fails, the next succeeds
return Promise.resolve({ ok, status: ok ? 200 : 403, json: () => Promise.resolve({}) });
}
throw new Error(`Unmocked fetch: ${method} ${input}`);
});
vi.stubGlobal("fetch", fetchMock);
render(<ObservatoryApp windowId="w1" />);
await flush();

fireEvent.click(screen.getByRole("button", { name: /pause queue/i }));
await flush();
await waitFor(() =>
expect(screen.getByText(/could not update the pause state/i)).toBeTruthy(),
);

fireEvent.click(screen.getByRole("button", { name: /pause queue/i }));
await flush();
await waitFor(() =>
expect(screen.queryByText(/could not update the pause state/i)).toBeNull(),
);
});

it("shows the idle empty state when no agents are working", async () => {
vi.stubGlobal(
"fetch",
Expand Down
108 changes: 67 additions & 41 deletions desktop/src/apps/ObservatoryApp.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { useState, useEffect, useCallback } from "react";
import { Radar, Pause, Play, Loader2, CircleDot, Minus, Plus } from "lucide-react";
import { useState, useEffect, useCallback, useRef } from "react";
import { Radar, Pause, Play, Loader2, CircleDot, Minus, Plus, AlertCircle } from "lucide-react";
import { Switch } from "@/components/ui";

interface HeldCard {
Expand Down Expand Up @@ -94,6 +94,7 @@ export function ObservatoryApp({ windowId: _windowId }: { windowId: string }) {
const [laneCaps, setLaneCaps] = useState<Record<string, number>>({});
const [loading, setLoading] = useState(true);
const [busy, setBusy] = useState<string | null>(null);
const [steerError, setSteerError] = useState<string | null>(null);

const load = useCallback(async (opts?: { silent?: boolean }) => {
if (!opts?.silent) setLoading(true);
Expand Down Expand Up @@ -128,6 +129,32 @@ export function ObservatoryApp({ windowId: _windowId }: { windowId: string }) {
return () => clearInterval(id);
}, [load]);

// Shared write path for every steer control. Posts the change, surfaces a
// visible error if the server rejects it (so an optimistic value is not left
// standing silently), and always reconciles against the server. A sequence
// guard ensures only the latest write controls the banner: steer controls are
// not fully serialized (pause and a cap change use different busy scopes), so
// an earlier slow response must not overwrite a newer one's result.
const steerSeq = useRef(0);
const postSteer = useCallback(
async (url: string, body: object, failMsg: string) => {
const seq = ++steerSeq.current;
try {
const res = await fetch(url, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(body),
});
if (seq === steerSeq.current) setSteerError(res.ok ? null : failMsg);
} catch {
if (seq === steerSeq.current) setSteerError(failMsg);
} finally {
await load({ silent: true });
}
},
Comment thread
coderabbitai[bot] marked this conversation as resolved.
[load],
);

const setScope = useCallback(
async (scope: string, paused: boolean) => {
setBusy(scope);
Expand All @@ -140,40 +167,28 @@ export function ObservatoryApp({ windowId: _windowId }: { windowId: string }) {
lanes: { ...prev.lanes, [scope]: paused },
},
);
try {
await fetch("/api/observatory/pause", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ scope, paused }),
});
await load({ silent: true });
} catch {
await load({ silent: true });
} finally {
setBusy(null);
}
await postSteer(
"/api/observatory/pause",
{ scope, paused },
"Could not update the pause state.",
);
setBusy(null);
},
[load],
[postSteer],
);

const setGlobalCap = useCallback(
async (next: number | null) => {
setBusy("cap");
setCap(next); // optimistic; reconciled on the next poll
try {
await fetch("/api/observatory/throttle", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ scope: "global", max_concurrent: next }),
});
await load({ silent: true });
} catch {
await load({ silent: true });
} finally {
setBusy(null);
}
await postSteer(
"/api/observatory/throttle",
{ scope: "global", max_concurrent: next },
"Could not update the concurrency cap.",
);
setBusy(null);
},
[load],
[postSteer],
);

const setLaneCap = useCallback(
Expand All @@ -185,20 +200,14 @@ export function ObservatoryApp({ windowId: _windowId }: { windowId: string }) {
else copy[handle] = next;
return copy;
});
try {
await fetch("/api/observatory/throttle", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ scope: handle, max_concurrent: next }),
});
await load({ silent: true });
} catch {
await load({ silent: true });
} finally {
setBusy(null);
}
await postSteer(
"/api/observatory/throttle",
{ scope: handle, max_concurrent: next },
`Could not update the cap for ${handle}.`,
);
setBusy(null);
},
[load],
[postSteer],
);

return (
Expand Down Expand Up @@ -234,6 +243,23 @@ export function ObservatoryApp({ windowId: _windowId }: { windowId: string }) {
</div>
)}

{steerError && (
<div
className="flex items-center gap-2 border-b border-red-500/20 bg-red-500/10 px-5 py-2 text-sm text-red-400"
role="alert"
>
<AlertCircle size={14} className="shrink-0" />
{steerError}
<button
type="button"
onClick={() => setSteerError(null)}
className="ml-auto text-xs text-red-400/80 transition-colors hover:text-red-400"
>
Dismiss
</button>
</div>
)}

{/* Steer: global concurrency cap (volume knob alongside the pause switch) */}
<div className="flex items-center gap-3 border-b border-shell-border px-5 py-2.5">
<span className="text-xs font-medium uppercase tracking-wide text-shell-text-tertiary">
Expand Down
Loading