-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathclient.py
More file actions
52 lines (44 loc) · 1.76 KB
/
Copy pathclient.py
File metadata and controls
52 lines (44 loc) · 1.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
"""cost_budget client — observes per-currency counter draining to exhausted."""
from __future__ import annotations
import asyncio
import contextlib
import os
import sys
from arcp import ClientInfo, WebSocketTransport
from arcp.client import ARCPClient
PORT = int(os.environ.get("ARCP_DEMO_PORT", "7891"))
URL = os.environ.get("ARCP_DEMO_URL", f"ws://127.0.0.1:{PORT}/arcp")
TOKEN = os.environ.get("ARCP_DEMO_TOKEN", "demo-token")
async def main() -> int:
client = ARCPClient(
client=ClientInfo(name="cost-budget-client", version="1.0.0"),
token=TOKEN,
features=("cost.budget",),
)
async with contextlib.aclosing(client):
transport = await WebSocketTransport.connect(URL)
await client.connect(transport)
handle = await client.submit(
agent="caller",
lease_request={"cost.budget": ["USD:1.00"]},
)
observed_remaining: list[float] = []
exhausted_in_tool_result = False
async for ev in handle.events():
if ev["kind"] == "metric" and ev["body"].get("name") == "cost.budget.remaining":
observed_remaining.append(float(ev["body"]["value"]))
elif (
ev["kind"] == "tool_result"
and ev["body"].get("error", {}).get("code") == "BUDGET_EXHAUSTED"
):
exhausted_in_tool_result = True
result = await handle.done
print(
f"remaining_snapshots={observed_remaining}, exhausted_in_tool_result={exhausted_in_tool_result}"
)
assert result.final_status == "success"
assert exhausted_in_tool_result
assert observed_remaining == sorted(observed_remaining, reverse=True)
return 0
if __name__ == "__main__":
sys.exit(asyncio.run(main()))