From c69a9ffa210513dce40df77e0645212da925c87b Mon Sep 17 00:00:00 2001 From: jsonbailey Date: Tue, 28 Apr 2026 18:31:06 -0500 Subject: [PATCH] =?UTF-8?q?feat:=20Graph=20tracking=20refactor=20=E2=80=94?= =?UTF-8?q?=20ManagedAgentGraph=20drives=20tracking=20for=20new=20runner?= =?UTF-8?q?=20shape?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ManagedAgentGraph.run() now detects the runner result type and dispatches accordingly: - AgentGraphRunnerResult (new shape): managed layer drives all graph-level tracking from result.metrics (path, duration, success/failure, total tokens) via the graph tracker. Node-level tracking from node_metrics will be wired once runners populate that field (PR 11-openai/langchain). - AgentGraphResult (legacy shape): tracking already occurred inside the runner; managed layer wraps result without additional tracking. ManagedAgentGraph now accepts an optional graph parameter (AgentGraphDefinition) used to create the graph tracker. LDAIClient.create_agent_graph() passes the resolved graph definition. This is a deliberate bridge pattern: the legacy detection branch will be removed once both runners are migrated. Co-Authored-By: Claude Sonnet 4.6 --- packages/sdk/server-ai/src/ldai/client.py | 2 +- .../server-ai/src/ldai/managed_agent_graph.py | 109 +++++++++++++++--- .../tests/test_managed_agent_graph.py | 78 ++++++++++++- 3 files changed, 167 insertions(+), 22 deletions(-) diff --git a/packages/sdk/server-ai/src/ldai/client.py b/packages/sdk/server-ai/src/ldai/client.py index 448d5c55..eeff1f1b 100644 --- a/packages/sdk/server-ai/src/ldai/client.py +++ b/packages/sdk/server-ai/src/ldai/client.py @@ -815,7 +815,7 @@ async def create_agent_graph( if not runner: return None - return ManagedAgentGraph(runner) + return ManagedAgentGraph(runner, graph=graph) def agents( self, diff --git a/packages/sdk/server-ai/src/ldai/managed_agent_graph.py b/packages/sdk/server-ai/src/ldai/managed_agent_graph.py index 50b3440e..94026973 100644 --- a/packages/sdk/server-ai/src/ldai/managed_agent_graph.py +++ b/packages/sdk/server-ai/src/ldai/managed_agent_graph.py @@ -1,19 +1,33 @@ """ManagedAgentGraph — LaunchDarkly managed wrapper for agent graph execution.""" -import asyncio -from typing import Any, List +from typing import Any, Optional from ldai.providers import AgentGraphResult, AgentGraphRunner -from ldai.providers.types import GraphMetricSummary, JudgeResult, ManagedGraphResult +from ldai.providers.types import ( + AgentGraphRunnerResult, + GraphMetricSummary, + LDAIMetrics, + ManagedGraphResult, +) class ManagedAgentGraph: """ LaunchDarkly managed wrapper for AI agent graph execution. - Holds an AgentGraphRunner. Wraps the runner result in a - :class:`~ldai.providers.types.ManagedGraphResult` and builds a - :class:`~ldai.providers.types.GraphMetricSummary` from the runner's metrics. + Holds an AgentGraphRunner and an optional AgentGraphDefinition. Wraps the + runner result in a :class:`~ldai.providers.types.ManagedGraphResult` and + builds a :class:`~ldai.providers.types.GraphMetricSummary` from the runner's + metrics. + + When the runner returns an :class:`~ldai.providers.types.AgentGraphRunnerResult` + (new shape), the managed layer drives all graph-level tracking from + ``result.metrics``. When the runner returns the legacy + :class:`~ldai.providers.AgentGraphResult`, tracking has already been performed + inside the runner; the managed layer simply wraps the result. This detection + branch exists as a deliberate bridge: once PR 11-openai and PR 11-langchain + migrate both runners to return ``AgentGraphRunnerResult``, the legacy branch + becomes dead code and will be removed in PR 11-langchain's final cleanup commit. Obtain an instance via ``LDAIClient.create_agent_graph()``. """ @@ -21,46 +35,105 @@ class ManagedAgentGraph: def __init__( self, runner: AgentGraphRunner, + graph: Optional[Any] = None, ): """ Initialize ManagedAgentGraph. :param runner: The AgentGraphRunner to delegate execution to + :param graph: Optional AgentGraphDefinition used to create the + graph-level tracker when the runner returns an + :class:`AgentGraphRunnerResult` (new shape). Not needed for + legacy runners that still return :class:`AgentGraphResult`. """ self._runner = runner + self._graph = graph async def run(self, input: Any) -> ManagedGraphResult: """ Run the agent graph with the given input. - Delegates to the underlying AgentGraphRunner, builds a - :class:`GraphMetricSummary` from the result, and wraps everything in a - :class:`ManagedGraphResult`. + Delegates to the underlying AgentGraphRunner. The returned type + determines which tracking path is taken: + + - :class:`AgentGraphRunnerResult` (new shape): the managed layer drives + graph-level tracking from ``result.metrics`` via the graph tracker. + Per-node tracking from ``result.metrics.node_metrics`` will be wired + in a follow-up commit once the runners populate ``node_metrics``. + - :class:`AgentGraphResult` (legacy shape): tracking already occurred + inside the runner; the managed layer wraps the result without + additional tracking. :param input: The input prompt or structured input for the graph - :return: ManagedGraphResult containing the content, metric summary, raw response, - and an optional evaluations task (currently always ``None`` for graphs — - per-graph evaluations will be added in a future PR). + :return: ManagedGraphResult containing the content, metric summary, + raw response, and an optional evaluations task (always ``None`` + for now — per-graph evaluations will be added in a future PR). """ - result: AgentGraphResult = await self._runner.run(input) + raw_result = await self._runner.run(input) + + if isinstance(raw_result, AgentGraphRunnerResult): + # New shape: managed layer drives all tracking. + summary = self._build_summary_from_runner_result(raw_result) + if self._graph is not None: + self._flush_graph_tracking(raw_result, self._graph.create_tracker()) + return ManagedGraphResult( + content=raw_result.content, + metrics=summary, + raw=raw_result.raw, + evaluations=None, + ) + # Legacy shape (AgentGraphResult): tracking already happened in the runner. # Build a GraphMetricSummary from the runner result's LDAIMetrics. # path and node_metrics will be populated once graph runners are migrated - # to return AgentGraphRunnerResult with GraphMetrics (PR 11). - metrics = result.metrics + # to return AgentGraphRunnerResult with GraphMetrics (PR 11-openai/langchain). + metrics: LDAIMetrics = raw_result.metrics summary = GraphMetricSummary( success=metrics.success, usage=metrics.usage, duration_ms=getattr(metrics, 'duration_ms', None), ) - return ManagedGraphResult( - content=result.output, + content=raw_result.output, metrics=summary, - raw=result.raw, + raw=raw_result.raw, evaluations=None, ) + def _build_summary_from_runner_result( + self, + result: AgentGraphRunnerResult, + ) -> GraphMetricSummary: + """Build a GraphMetricSummary from an AgentGraphRunnerResult.""" + m = result.metrics + return GraphMetricSummary( + success=m.success, + path=list(m.path), + duration_ms=m.duration_ms, + usage=m.usage, + node_metrics=dict(m.node_metrics), + ) + + def _flush_graph_tracking(self, result: AgentGraphRunnerResult, tracker: Any) -> None: + """ + Drive graph-level LaunchDarkly tracking events from runner result metrics. + + Called only when the runner returns the new ``AgentGraphRunnerResult`` + shape. Node-level tracking (from ``result.metrics.node_metrics``) will + be wired once the runners start populating that field. + """ + m = result.metrics + if m.path: + tracker.track_path(m.path) + if m.duration_ms is not None: + tracker.track_duration(m.duration_ms) + if m.success: + tracker.track_invocation_success() + else: + tracker.track_invocation_failure() + if m.usage is not None: + tracker.track_total_tokens(m.usage) + def get_agent_graph_runner(self) -> AgentGraphRunner: """ Return the underlying AgentGraphRunner for advanced use. diff --git a/packages/sdk/server-ai/tests/test_managed_agent_graph.py b/packages/sdk/server-ai/tests/test_managed_agent_graph.py index 9cdceaed..05b0ed27 100644 --- a/packages/sdk/server-ai/tests/test_managed_agent_graph.py +++ b/packages/sdk/server-ai/tests/test_managed_agent_graph.py @@ -6,13 +6,15 @@ from ldclient.integrations.test_data import TestData from ldai import LDAIClient, ManagedAgentGraph, ManagedGraphResult -from ldai.providers.types import LDAIMetrics +from ldai.providers.types import AgentGraphRunnerResult, GraphMetrics, LDAIMetrics from ldai.providers import AgentGraphResult, AgentGraphRunner, ToolRegistry +from ldai.tracker import TokenUsage -# --- Test double --- +# --- Test doubles --- class StubAgentGraphRunner(AgentGraphRunner): + """Legacy runner that returns AgentGraphResult (old shape).""" def __init__(self, output: str = "stub output"): self._output = output @@ -24,10 +26,30 @@ async def run(self, input) -> AgentGraphResult: ) -# --- ManagedAgentGraph unit tests --- +class StubNewShapeRunner(AgentGraphRunner): + """New-shape runner that returns AgentGraphRunnerResult.""" + def __init__(self, content: str = "new shape output"): + self._content = content + + async def run(self, input) -> AgentGraphRunnerResult: + return AgentGraphRunnerResult( + content=self._content, + metrics=GraphMetrics( + success=True, + path=["root", "specialist"], + duration_ms=42, + usage=TokenUsage(total=10, input=5, output=5), + node_metrics={}, + ), + raw={"input": input}, + ) + + +# --- ManagedAgentGraph unit tests (legacy shape) --- @pytest.mark.asyncio async def test_managed_agent_graph_run_delegates_to_runner(): + """Legacy AgentGraphResult shape: content comes from output field.""" runner = StubAgentGraphRunner("hello world") managed = ManagedAgentGraph(runner) result = await managed.run("test input") @@ -42,6 +64,56 @@ def test_managed_agent_graph_get_runner(): assert managed.get_agent_graph_runner() is runner +# --- ManagedAgentGraph unit tests (new AgentGraphRunnerResult shape) --- + +@pytest.mark.asyncio +async def test_managed_agent_graph_run_handles_new_shape(): + """New AgentGraphRunnerResult shape: content and GraphMetrics are surfaced.""" + runner = StubNewShapeRunner("final answer") + mock_graph = MagicMock() + mock_tracker = MagicMock() + mock_graph.create_tracker = MagicMock(return_value=mock_tracker) + + managed = ManagedAgentGraph(runner, graph=mock_graph) + result = await managed.run("test input") + + assert isinstance(result, ManagedGraphResult) + assert result.content == "final answer" + assert result.metrics.success is True + assert result.metrics.path == ["root", "specialist"] + assert result.metrics.duration_ms == 42 + assert result.metrics.usage is not None + assert result.metrics.usage.total == 10 + + +@pytest.mark.asyncio +async def test_managed_agent_graph_new_shape_drives_tracking(): + """New shape: managed layer calls tracker methods from result.metrics.""" + runner = StubNewShapeRunner() + mock_graph = MagicMock() + mock_tracker = MagicMock() + mock_graph.create_tracker = MagicMock(return_value=mock_tracker) + + managed = ManagedAgentGraph(runner, graph=mock_graph) + await managed.run("test input") + + mock_tracker.track_path.assert_called_once_with(["root", "specialist"]) + mock_tracker.track_duration.assert_called_once_with(42) + mock_tracker.track_invocation_success.assert_called_once() + mock_tracker.track_total_tokens.assert_called_once() + + +@pytest.mark.asyncio +async def test_managed_agent_graph_new_shape_no_graph_skips_tracking(): + """New shape without graph: no tracking called (graph not available).""" + runner = StubNewShapeRunner() + managed = ManagedAgentGraph(runner, graph=None) + # Should not raise even without a graph reference + result = await managed.run("test input") + assert result.content == "new shape output" + assert result.metrics.success is True + + # --- LDAIClient.create_agent_graph() integration tests ---