From 2c5671d8d77e788d5153d454a91dc34653ee3ff2 Mon Sep 17 00:00:00 2001 From: jsonbailey Date: Tue, 28 Apr 2026 18:45:41 -0500 Subject: [PATCH] feat: Migrate LangGraph runner to AgentGraphRunnerResult; remove legacy shape detection Updates LangGraphAgentGraphRunner to return AgentGraphRunnerResult with GraphMetrics (success, path, duration_ms, usage, node_metrics) instead of the legacy AgentGraphResult. Adds collect_node_metrics() to LDMetricsCallbackHandler for pure data extraction. Removes the transitional AgentGraphResult detection branch from ManagedAgentGraph now that both the OpenAI and LangGraph runners return AgentGraphRunnerResult. All graph-level and per-node tracking events are driven exclusively by the managed layer. Co-Authored-By: Claude Sonnet 4.6 --- .../langgraph_agent_graph_runner.py | 99 ++++++++----------- .../langgraph_callback_handler.py | 39 +++++++- .../test_langgraph_agent_graph_runner.py | 26 ++--- .../tests/test_tracking_langgraph.py | 40 ++++---- .../server-ai/src/ldai/managed_agent_graph.py | 65 +++--------- .../src/ldai/providers/agent_graph_runner.py | 8 +- .../tests/test_managed_agent_graph.py | 43 ++++---- 7 files changed, 154 insertions(+), 166 deletions(-) diff --git a/packages/ai-providers/server-ai-langchain/src/ldai_langchain/langgraph_agent_graph_runner.py b/packages/ai-providers/server-ai-langchain/src/ldai_langchain/langgraph_agent_graph_runner.py index 15eee41f..75843376 100644 --- a/packages/ai-providers/server-ai-langchain/src/ldai_langchain/langgraph_agent_graph_runner.py +++ b/packages/ai-providers/server-ai-langchain/src/ldai_langchain/langgraph_agent_graph_runner.py @@ -1,14 +1,12 @@ """LangGraph agent graph runner for LaunchDarkly AI SDK.""" -import asyncio import time -from contextvars import ContextVar from typing import Annotated, Any, Dict, List, Set, Tuple from ldai import log from ldai.agent_graph import AgentGraphDefinition, AgentGraphNode -from ldai.providers import AgentGraphResult, AgentGraphRunner, ToolRegistry -from ldai.providers.types import LDAIMetrics +from ldai.providers import AgentGraphRunner, ToolRegistry +from ldai.providers.types import AgentGraphRunnerResult, GraphMetrics, LDAIMetrics from ldai_langchain.langchain_helper import ( build_structured_tools, @@ -18,9 +16,6 @@ ) from ldai_langchain.langgraph_callback_handler import LDMetricsCallbackHandler -# Per-run eval task accumulator, isolated per concurrent run() call via ContextVar. -_run_eval_tasks: ContextVar[Dict[str, List[asyncio.Task]]] = ContextVar('_run_eval_tasks') - def _make_handoff_tool(child_key: str, description: str) -> Any: """ @@ -65,9 +60,10 @@ class LangGraphAgentGraphRunner(AgentGraphRunner): AgentGraphRunner implementation for LangGraph. - Compiles and runs the agent graph with LangGraph and automatically records - graph- and node-level AI metric data to the LaunchDarkly trackers on the - graph definition and each node. + Compiles and runs the agent graph with LangGraph and collects graph- and + node-level metrics via a LangChain callback handler. Tracking events are + emitted by the managed layer (:class:`~ldai.ManagedAgentGraph`) from the + returned :class:`~ldai.providers.types.AgentGraphRunnerResult`. Requires ``langgraph`` to be installed. 
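+
+    A minimal usage sketch (the ``graph`` value and the empty dict for the
+    second constructor argument mirror this PR's tests; both are placeholders
+    for real wiring via ``LDAIClient``)::
+
+        runner = LangGraphAgentGraphRunner(graph, {})
+        result = await runner.run('What is the weather in Oslo?')
+        # result.metrics is a GraphMetrics: success, path, duration_ms,
+        # usage, and per-node LDAIMetrics under node_metrics.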
""" @@ -181,26 +177,6 @@ async def invoke(state: WorkflowState) -> dict: if node_instructions: msgs = [SystemMessage(content=node_instructions)] + msgs response = await bound_model.ainvoke(msgs) - - node_obj = self._graph.get_node(nk) - if node_obj is not None: - input_text = '\r\n'.join( - m.content if isinstance(m.content, str) else str(m.content) - for m in msgs - ) if msgs else '' - output_text = ( - response.content if hasattr(response, 'content') else str(response) - ) - task = node_obj.get_config().evaluator.evaluate(input_text, output_text) - run_tasks = _run_eval_tasks.get(None) - if run_tasks is not None: - run_tasks.setdefault(nk, []).append(task) - else: - log.warning( - f"LangGraphAgentGraphRunner: eval task for node '{nk}' " - "has no run context; judge results will not be tracked" - ) - return {'messages': [response]} invoke.__name__ = nk @@ -298,20 +274,18 @@ def route(state: WorkflowState) -> str: compiled = agent_builder.compile() return compiled, fn_name_to_config_key, node_keys - async def run(self, input: Any) -> AgentGraphResult: + async def run(self, input: Any) -> AgentGraphRunnerResult: """ Run the agent graph with the given input. Builds a LangGraph StateGraph from the AgentGraphDefinition, compiles it, and invokes it. Uses a LangChain callback handler to collect - per-node metrics, then flushes them to LaunchDarkly trackers. + per-node metrics. Graph-level tracking events are emitted by the + managed layer from the returned GraphMetrics. :param input: The string prompt to send to the agent graph - :return: AgentGraphResult with the final output and metrics + :return: AgentGraphRunnerResult with the final content and GraphMetrics """ - pending_eval_tasks: Dict[str, List[asyncio.Task]] = {} - token = _run_eval_tasks.set(pending_eval_tasks) - tracker = self._graph.create_tracker() start_ns = time.perf_counter_ns() try: @@ -325,24 +299,34 @@ async def run(self, input: Any) -> AgentGraphResult: config={'callbacks': [handler], 'recursion_limit': 25}, ) - duration = (time.perf_counter_ns() - start_ns) // 1_000_000 + duration_ms = (time.perf_counter_ns() - start_ns) // 1_000_000 messages = result.get('messages', []) output = extract_last_message_content(messages) + total_usage = sum_token_usage_from_messages(messages) + + # Build per-node LDAIMetrics from callback handler data + node_metrics: Dict[str, LDAIMetrics] = {} + for node_key in handler.path: + usage = handler.node_tokens.get(node_key) + duration = handler.node_durations_ms.get(node_key) + tool_calls = handler.node_tool_calls.get(node_key) or [] + node_metrics[node_key] = LDAIMetrics( + success=True, + usage=usage, + duration_ms=duration, + tool_calls=tool_calls if tool_calls else None, + ) - # Flush per-node metrics to LD trackers; eval results are tracked - # internally and intentionally not exposed on AgentGraphResult here - # — judge dispatch is the managed layer's responsibility. 
- await handler.flush(self._graph, pending_eval_tasks) - - tracker.track_path(handler.path) - tracker.track_duration(duration) - tracker.track_invocation_success() - tracker.track_total_tokens(sum_token_usage_from_messages(messages)) - - return AgentGraphResult( - output=output, + return AgentGraphRunnerResult( + content=output, raw=result, - metrics=LDAIMetrics(success=True), + metrics=GraphMetrics( + success=True, + path=handler.path, + duration_ms=duration_ms, + usage=total_usage if (total_usage is not None and total_usage.total > 0) else None, + node_metrics=node_metrics, + ), ) except Exception as exc: @@ -353,13 +337,12 @@ async def run(self, input: Any) -> AgentGraphResult: ) else: log.warning(f'LangGraphAgentGraphRunner run failed: {exc}') - duration = (time.perf_counter_ns() - start_ns) // 1_000_000 - tracker.track_duration(duration) - tracker.track_invocation_failure() - return AgentGraphResult( - output='', + duration_ms = (time.perf_counter_ns() - start_ns) // 1_000_000 + return AgentGraphRunnerResult( + content='', raw=None, - metrics=LDAIMetrics(success=False), + metrics=GraphMetrics( + success=False, + duration_ms=duration_ms, + ), ) - finally: - _run_eval_tasks.reset(token) diff --git a/packages/ai-providers/server-ai-langchain/src/ldai_langchain/langgraph_callback_handler.py b/packages/ai-providers/server-ai-langchain/src/ldai_langchain/langgraph_callback_handler.py index 183a3eb7..61aaab4d 100644 --- a/packages/ai-providers/server-ai-langchain/src/ldai_langchain/langgraph_callback_handler.py +++ b/packages/ai-providers/server-ai-langchain/src/ldai_langchain/langgraph_callback_handler.py @@ -5,7 +5,7 @@ from langchain_core.callbacks import BaseCallbackHandler from langchain_core.outputs import ChatGeneration, LLMResult from ldai.agent_graph import AgentGraphDefinition -from ldai.providers.types import JudgeResult +from ldai.providers.types import JudgeResult, LDAIMetrics from ldai.tracker import TokenUsage from ldai_langchain.langchain_helper import get_ai_usage_from_response @@ -193,14 +193,19 @@ async def flush( self, graph: AgentGraphDefinition, eval_tasks=None ) -> List[JudgeResult]: """ - Emit all collected per-node metrics to the LaunchDarkly trackers. + Emit collected per-node metrics to LaunchDarkly trackers. - Call this once after the graph run completes. + .. deprecated:: + Per-node tracking is now driven by the managed layer + (:class:`ManagedAgentGraph`) from + :attr:`AgentGraphRunnerResult.metrics.node_metrics`. This method + is retained for tests and any external callers that still rely on + the original handler-driven tracking path; production code should + not call it. :param graph: The AgentGraphDefinition whose nodes hold the LD config trackers. :param eval_tasks: Optional dict mapping node key to a list of awaitables that - return judge evaluation results. Multiple tasks arise when a node is visited - more than once (e.g. in a graph with cycles). + return judge evaluation results. :return: All judge results collected across all nodes. """ node_trackers: Dict[str, Any] = {} @@ -240,3 +245,27 @@ async def flush( config_tracker.track_judge_result(r) return all_eval_results + + def collect_node_metrics(self) -> Dict[str, LDAIMetrics]: + """ + Build a per-node ``LDAIMetrics`` map from data collected during the run. + + Pure data extraction — no LaunchDarkly tracker events are emitted. + :class:`LangGraphAgentGraphRunner` uses this to populate + ``GraphMetrics.node_metrics`` so the managed layer can drive per-node + events. 
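+
+        A sketch of the returned shape (the ``'planner'`` key is illustrative)::
+
+            {'planner': LDAIMetrics(success=True, usage=<TokenUsage>,
+                                    duration_ms=12, tool_calls=None)}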
+
+        :return: Mapping of node key to its accumulated ``LDAIMetrics``.
+        """
+        node_metrics: Dict[str, LDAIMetrics] = {}
+        for node_key in self._path:
+            if node_key in node_metrics:
+                continue
+            tool_calls = self._node_tool_calls.get(node_key, [])
+            node_metrics[node_key] = LDAIMetrics(
+                success=True,
+                usage=self._node_tokens.get(node_key),
+                tool_calls=list(tool_calls) if tool_calls else None,
+                duration_ms=self._node_duration_ms.get(node_key),
+            )
+        return node_metrics
diff --git a/packages/ai-providers/server-ai-langchain/tests/test_langgraph_agent_graph_runner.py b/packages/ai-providers/server-ai-langchain/tests/test_langgraph_agent_graph_runner.py
index 0a3ff6ca..02b40fba 100644
--- a/packages/ai-providers/server-ai-langchain/tests/test_langgraph_agent_graph_runner.py
+++ b/packages/ai-providers/server-ai-langchain/tests/test_langgraph_agent_graph_runner.py
@@ -6,7 +6,8 @@
 from ldai.agent_graph import AgentGraphDefinition
 from ldai.evaluator import Evaluator
 from ldai.models import AIAgentGraphConfig, AIAgentConfig, ModelConfig, ProviderConfig
-from ldai.providers import AgentGraphResult, ToolRegistry
+from ldai.providers import ToolRegistry
+from ldai.providers.types import AgentGraphRunnerResult
 from ldai_langchain.langgraph_agent_graph_runner import LangGraphAgentGraphRunner
 from ldai_langchain.langchain_runner_factory import LangChainRunnerFactory
 
 
@@ -75,22 +76,22 @@ async def test_langgraph_runner_run_raises_when_langgraph_not_installed():
     with patch.dict('sys.modules', {'langgraph': None, 'langgraph.graph': None}):
         result = await runner.run("test")
 
-    assert isinstance(result, AgentGraphResult)
+    assert isinstance(result, AgentGraphRunnerResult)
     assert result.metrics.success is False
 
 
 @pytest.mark.asyncio
-async def test_langgraph_runner_run_tracks_failure_on_exception():
+async def test_langgraph_runner_run_returns_failure_on_exception():
+    """Runner now returns AgentGraphRunnerResult; managed layer drives tracker events."""
     graph = _make_graph()
-    tracker = graph.create_tracker()
     runner = LangGraphAgentGraphRunner(graph, {})
 
     with patch.dict('sys.modules', {'langgraph': None, 'langgraph.graph': None}):
         result = await runner.run("fail")
 
+    assert isinstance(result, AgentGraphRunnerResult)
     assert result.metrics.success is False
-    tracker.track_invocation_failure.assert_called_once()
-    tracker.track_duration.assert_called_once()
+    assert result.metrics.duration_ms is not None
 
 
 @pytest.mark.asyncio
@@ -147,9 +148,10 @@ async def test_langgraph_runner_run_success():
     runner = LangGraphAgentGraphRunner(graph, {})
    result = await runner.run("find restaurants")
 
-    assert isinstance(result, AgentGraphResult)
-    assert result.output == "langgraph answer"
-    assert result.metrics.success is True
-    tracker.track_path.assert_called_once_with([])
-    tracker.track_invocation_success.assert_called_once()
-    tracker.track_duration.assert_called_once()
+    assert isinstance(result, AgentGraphRunnerResult)
+    assert result.content == "langgraph answer"
+    assert result.metrics.success is True
+    assert result.metrics.duration_ms is not None
+    # Tracker events now fire from the managed layer (ManagedAgentGraph) using
+    # result.metrics; the runner no longer touches the graph tracker directly.
+ tracker.track_path.assert_not_called() + tracker.track_invocation_success.assert_not_called() + tracker.track_duration.assert_not_called() diff --git a/packages/ai-providers/server-ai-langchain/tests/test_tracking_langgraph.py b/packages/ai-providers/server-ai-langchain/tests/test_tracking_langgraph.py index 3b45783d..6fc8eee7 100644 --- a/packages/ai-providers/server-ai-langchain/tests/test_tracking_langgraph.py +++ b/packages/ai-providers/server-ai-langchain/tests/test_tracking_langgraph.py @@ -11,11 +11,18 @@ from unittest.mock import AsyncMock, MagicMock, patch from ldai.agent_graph import AgentGraphDefinition +from ldai.managed_agent_graph import ManagedAgentGraph from ldai.models import AIAgentGraphConfig, AIAgentConfig, Edge, ModelConfig, ProviderConfig from ldai.tracker import AIGraphTracker, LDAIConfigTracker from ldai.evaluator import Evaluator from ldai_langchain.langgraph_agent_graph_runner import LangGraphAgentGraphRunner + +async def _run_through_managed(runner: LangGraphAgentGraphRunner, graph: AgentGraphDefinition, input: str): + """Run the runner through the managed layer so graph-level tracking events fire.""" + managed = ManagedAgentGraph(runner, graph=graph) + return await managed.run(input) + pytestmark = pytest.mark.skipif( pytest.importorskip('langgraph', reason='langgraph not installed') is None, reason='langgraph not installed', @@ -229,7 +236,7 @@ async def test_tracks_node_and_graph_tokens_on_success(): result = await runner.run("What's the weather?") assert result.metrics.success is True - assert result.output == 'Sunny.' + assert result.content == 'Sunny.' # Manually simulate what the callback handler would collect and flush # (mock models don't fire LangChain callbacks, so we test flush directly) @@ -259,12 +266,9 @@ async def test_tracks_node_and_graph_tokens_on_success(): assert ev2['$ld:ai:generation:success'][0][1] == 1 assert '$ld:ai:duration:total' in ev2 - # Graph-level events from the real run - ev = _events(mock_ld_client) - assert ev['$ld:ai:graph:total_tokens'][0][1] == 15 - assert ev['$ld:ai:graph:invocation_success'][0][1] == 1 - assert '$ld:ai:graph:duration:total' in ev - assert '$ld:ai:graph:path' in ev + # Graph-level events are now driven by ManagedAgentGraph from + # AgentGraphRunnerResult.metrics — see test_managed_agent_graph.py for the + # managed-layer flow. The runner itself no longer fires graph-level events. @pytest.mark.asyncio @@ -277,11 +281,11 @@ async def test_tracks_execution_path(): with patch('ldai_langchain.langgraph_agent_graph_runner.create_langchain_model', return_value=_mock_model(fake_response)): runner = LangGraphAgentGraphRunner(graph, {}) - await runner.run('hello') + result = await runner.run('hello') - ev = _events(mock_ld_client) - path_data = ev['$ld:ai:graph:path'][0][0] - assert 'my-agent' in path_data['path'] + # Path now lives on AgentGraphRunnerResult.metrics.path; the runner no + # longer emits the $ld:ai:graph:path event directly (the managed layer does). 
+ assert 'my-agent' in result.metrics.path @pytest.mark.asyncio @@ -432,11 +436,9 @@ async def test_tracks_failure_and_latency_on_model_error(): result = await runner.run('fail') assert result.metrics.success is False - - ev = _events(mock_ld_client) - assert '$ld:ai:graph:invocation_failure' in ev - assert '$ld:ai:graph:duration:total' in ev - assert '$ld:ai:graph:invocation_success' not in ev + assert result.metrics.duration_ms is not None + # Graph-level events (invocation_failure, duration) are now driven by + # ManagedAgentGraph from result.metrics, not by the runner directly. @pytest.mark.asyncio @@ -461,7 +463,7 @@ def model_factory(node_config, **kwargs): with patch('ldai_langchain.langgraph_agent_graph_runner.create_langchain_model', side_effect=model_factory): runner = LangGraphAgentGraphRunner(graph, {}) - result = await runner.run('hello') + result = await _run_through_managed(runner, graph, 'hello') assert result.metrics.success is True @@ -624,7 +626,7 @@ def model_factory(node_config, **kwargs): result = await runner.run('hello') assert result.metrics.success is True - assert 'Agent A' in result.output + assert 'Agent A' in result.content # Agent B's model must never have been invoked — no fan-out agent_b_model.ainvoke.assert_not_called() @@ -752,7 +754,7 @@ def model_factory(node_config, **kwargs): result = await runner.run('Find info and route to the right agent.') assert result.metrics.success is True - assert 'Agent A' in result.output + assert 'Agent A' in result.content # Orchestrator must have been called twice: once before tool result, once after assert orchestrator_model.ainvoke.call_count == 2 # Agent B must never have been invoked diff --git a/packages/sdk/server-ai/src/ldai/managed_agent_graph.py b/packages/sdk/server-ai/src/ldai/managed_agent_graph.py index 4b06b409..1e4a7d42 100644 --- a/packages/sdk/server-ai/src/ldai/managed_agent_graph.py +++ b/packages/sdk/server-ai/src/ldai/managed_agent_graph.py @@ -2,7 +2,7 @@ from typing import Any, Optional -from ldai.providers import AgentGraphResult, AgentGraphRunner +from ldai.providers import AgentGraphRunner from ldai.providers.types import ( AgentGraphRunnerResult, GraphMetricSummary, @@ -20,14 +20,9 @@ class ManagedAgentGraph: builds a :class:`~ldai.providers.types.GraphMetricSummary` from the runner's metrics. - When the runner returns an :class:`~ldai.providers.types.AgentGraphRunnerResult` - (new shape), the managed layer drives all graph-level tracking from - ``result.metrics``. When the runner returns the legacy - :class:`~ldai.providers.AgentGraphResult`, tracking has already been performed - inside the runner; the managed layer simply wraps the result. This detection - branch exists as a deliberate bridge: once PR 11-openai and PR 11-langchain - migrate both runners to return ``AgentGraphRunnerResult``, the legacy branch - becomes dead code and will be removed in PR 11-langchain's final cleanup commit. + The runner must return :class:`~ldai.providers.types.AgentGraphRunnerResult`. + Graph-level and per-node tracking events are emitted by this managed layer from + the returned :class:`~ldai.providers.types.GraphMetrics`. Obtain an instance via ``LDAIClient.create_agent_graph()``. """ @@ -42,9 +37,7 @@ def __init__( :param runner: The AgentGraphRunner to delegate execution to :param graph: Optional AgentGraphDefinition used to create the - graph-level tracker when the runner returns an - :class:`AgentGraphRunnerResult` (new shape). Not needed for - legacy runners that still return :class:`AgentGraphResult`. 
+ graph-level tracker and per-node trackers when flushing tracking events. """ self._runner = runner self._graph = graph @@ -53,48 +46,24 @@ async def run(self, input: Any) -> ManagedGraphResult: """ Run the agent graph with the given input. - Delegates to the underlying AgentGraphRunner. The returned type - determines which tracking path is taken: - - - :class:`AgentGraphRunnerResult` (new shape): the managed layer drives - graph-level tracking from ``result.metrics`` via the graph tracker. - Per-node tracking from ``result.metrics.node_metrics`` will be wired - in a follow-up commit once the runners populate ``node_metrics``. - - :class:`AgentGraphResult` (legacy shape): tracking already occurred - inside the runner; the managed layer wraps the result without - additional tracking. + Delegates to the underlying AgentGraphRunner, which must return an + :class:`AgentGraphRunnerResult` with populated :class:`GraphMetrics`. + The managed layer drives all graph-level and per-node tracking from + ``result.metrics`` and wraps everything in a :class:`ManagedGraphResult`. :param input: The input prompt or structured input for the graph :return: ManagedGraphResult containing the content, metric summary, raw response, and an optional evaluations task (always ``None`` for now — per-graph evaluations will be added in a future PR). """ - raw_result = await self._runner.run(input) - - if isinstance(raw_result, AgentGraphRunnerResult): - # New shape: managed layer drives all tracking. - summary = self._build_summary_from_runner_result(raw_result) - if self._graph is not None: - self._flush_graph_tracking(raw_result, self._graph.create_tracker()) - return ManagedGraphResult( - content=raw_result.content, - metrics=summary, - raw=raw_result.raw, - evaluations=None, - ) - - # Legacy shape (AgentGraphResult): tracking already happened in the runner. - # Build a GraphMetricSummary from the runner result's LDAIMetrics. - # path and node_metrics will be populated once graph runners are migrated - # to return AgentGraphRunnerResult with GraphMetrics (PR 11-openai/langchain). - metrics: LDAIMetrics = raw_result.metrics - summary = GraphMetricSummary( - success=metrics.success, - usage=metrics.usage, - duration_ms=getattr(metrics, 'duration_ms', None), - ) + raw_result: AgentGraphRunnerResult = await self._runner.run(input) + + summary = self._build_summary_from_runner_result(raw_result) + if self._graph is not None: + self._flush_graph_tracking(raw_result, self._graph.create_tracker()) + return ManagedGraphResult( - content=raw_result.output, + content=raw_result.content, metrics=summary, raw=raw_result.raw, evaluations=None, @@ -118,8 +87,6 @@ def _flush_graph_tracking(self, result: AgentGraphRunnerResult, tracker: Any) -> """ Drive graph-level and per-node LaunchDarkly tracking events from runner result metrics. - Called only when the runner returns the new ``AgentGraphRunnerResult`` shape. - Graph-level events (path, duration, success/failure, total tokens) are always emitted. 
         Per-node events are emitted for each entry in ``result.metrics.node_metrics``
         when ``self._graph`` is available — the node
diff --git a/packages/sdk/server-ai/src/ldai/providers/agent_graph_runner.py b/packages/sdk/server-ai/src/ldai/providers/agent_graph_runner.py
index 6cc45670..e5af2ca2 100644
--- a/packages/sdk/server-ai/src/ldai/providers/agent_graph_runner.py
+++ b/packages/sdk/server-ai/src/ldai/providers/agent_graph_runner.py
@@ -1,6 +1,6 @@
 from typing import Any, Protocol, runtime_checkable
 
-from ldai.providers.types import AgentGraphResult
+from ldai.providers.types import AgentGraphRunnerResult
 
 
 @runtime_checkable
@@ -18,11 +18,13 @@ class AgentGraphRunner(Protocol):
     the caller just passes input.
     """
 
-    async def run(self, input: Any) -> AgentGraphResult:
+    async def run(self, input: Any) -> AgentGraphRunnerResult:
         """
         Run the agent graph with the given input.
 
         :param input: The input to the agent graph (string prompt or structured input)
-        :return: AgentGraphResult containing the output, raw response, and metrics
+        :return: :class:`AgentGraphRunnerResult` containing content, raw response,
+            and :class:`GraphMetrics`. The managed layer drives all tracking
+            events from the returned metrics.
         """
         ...
diff --git a/packages/sdk/server-ai/tests/test_managed_agent_graph.py b/packages/sdk/server-ai/tests/test_managed_agent_graph.py
index 05b0ed27..8b2e5b06 100644
--- a/packages/sdk/server-ai/tests/test_managed_agent_graph.py
+++ b/packages/sdk/server-ai/tests/test_managed_agent_graph.py
@@ -7,27 +7,32 @@
 from ldai import LDAIClient, ManagedAgentGraph, ManagedGraphResult
 from ldai.providers.types import AgentGraphRunnerResult, GraphMetrics, LDAIMetrics
-from ldai.providers import AgentGraphResult, AgentGraphRunner, ToolRegistry
+from ldai.providers import AgentGraphRunner, ToolRegistry
 from ldai.tracker import TokenUsage
 
 
 # --- Test doubles ---
 
 class StubAgentGraphRunner(AgentGraphRunner):
-    """Legacy runner that returns AgentGraphResult (old shape)."""
-    def __init__(self, output: str = "stub output"):
-        self._output = output
+    """Minimal runner that returns an AgentGraphRunnerResult with empty node_metrics."""
+    def __init__(self, content: str = "stub output"):
+        self._content = content
 
-    async def run(self, input) -> AgentGraphResult:
-        return AgentGraphResult(
-            output=self._output,
+    async def run(self, input) -> AgentGraphRunnerResult:
+        return AgentGraphRunnerResult(
+            content=self._content,
+            metrics=GraphMetrics(
+                success=True,
+                path=["root"],
+                duration_ms=10,
+                node_metrics={},
+            ),
             raw={"input": input},
-            metrics=LDAIMetrics(success=True),
         )
 
 
 class StubNewShapeRunner(AgentGraphRunner):
-    """New-shape runner that returns AgentGraphRunnerResult."""
+    """Runner that returns an AgentGraphRunnerResult with a fuller GraphMetrics (path and metrics)."""
     def __init__(self, content: str = "new shape output"):
         self._content = content
 
@@ -45,11 +50,11 @@ async def run(self, input) -> AgentGraphRunnerResult:
     )
 
 
-# --- ManagedAgentGraph unit tests (legacy shape) ---
+# --- ManagedAgentGraph unit tests ---
 
 @pytest.mark.asyncio
 async def test_managed_agent_graph_run_delegates_to_runner():
-    """Legacy AgentGraphResult shape: content comes from output field."""
+    """Content is surfaced from the runner result's ``content`` field."""
     runner = StubAgentGraphRunner("hello world")
     managed = ManagedAgentGraph(runner)
     result = await managed.run("test input")
@@ -64,11 +69,9 @@ def test_managed_agent_graph_get_runner():
     assert managed.get_agent_graph_runner() is runner
 
 
-# --- ManagedAgentGraph unit tests (new
AgentGraphRunnerResult shape) --- - @pytest.mark.asyncio -async def test_managed_agent_graph_run_handles_new_shape(): - """New AgentGraphRunnerResult shape: content and GraphMetrics are surfaced.""" +async def test_managed_agent_graph_run_surfaces_graph_metrics(): + """AgentGraphRunnerResult: content and GraphMetrics are surfaced.""" runner = StubNewShapeRunner("final answer") mock_graph = MagicMock() mock_tracker = MagicMock() @@ -87,8 +90,8 @@ async def test_managed_agent_graph_run_handles_new_shape(): @pytest.mark.asyncio -async def test_managed_agent_graph_new_shape_drives_tracking(): - """New shape: managed layer calls tracker methods from result.metrics.""" +async def test_managed_agent_graph_drives_tracking(): + """Managed layer calls tracker methods from result.metrics.""" runner = StubNewShapeRunner() mock_graph = MagicMock() mock_tracker = MagicMock() @@ -104,8 +107,8 @@ async def test_managed_agent_graph_new_shape_drives_tracking(): @pytest.mark.asyncio -async def test_managed_agent_graph_new_shape_no_graph_skips_tracking(): - """New shape without graph: no tracking called (graph not available).""" +async def test_managed_agent_graph_no_graph_skips_tracking(): + """Without graph: tracking is skipped (no graph tracker available).""" runner = StubNewShapeRunner() managed = ManagedAgentGraph(runner, graph=None) # Should not raise even without a graph reference
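
Reviewer sketch (illustrative, not part of the diff above): after this change a
runner only returns data, and ManagedAgentGraph is the single place tracker
events fire. A minimal end-to-end example using the types touched in this PR;
the EchoRunner name and its trivial body are hypothetical stand-ins for a real
runner such as LangGraphAgentGraphRunner:

    import asyncio

    from ldai import ManagedAgentGraph
    from ldai.providers.types import AgentGraphRunnerResult, GraphMetrics


    class EchoRunner:
        """Satisfies the AgentGraphRunner protocol; emits no tracker events."""

        async def run(self, input):
            return AgentGraphRunnerResult(
                content=str(input),
                raw=None,
                metrics=GraphMetrics(
                    success=True,
                    path=['root'],
                    duration_ms=1,
                    node_metrics={},
                ),
            )


    async def main():
        # graph=None skips tracking entirely; pass an AgentGraphDefinition
        # (e.g. via LDAIClient.create_agent_graph()) to emit graph-level and
        # per-node events from the returned metrics.
        managed = ManagedAgentGraph(EchoRunner(), graph=None)
        result = await managed.run('hello')  # ManagedGraphResult
        print(result.content, result.metrics.success)


    asyncio.run(main())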