Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/sdk/server-ai/src/ldai/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,7 @@ async def create_agent_graph(
if not runner:
return None

return ManagedAgentGraph(runner)
return ManagedAgentGraph(runner, graph=graph)

def agents(
self,
Expand Down
109 changes: 91 additions & 18 deletions packages/sdk/server-ai/src/ldai/managed_agent_graph.py
Original file line number Diff line number Diff line change
@@ -1,66 +1,139 @@
"""ManagedAgentGraph — LaunchDarkly managed wrapper for agent graph execution."""

import asyncio
from typing import Any, List
from typing import Any, Optional

from ldai.providers import AgentGraphResult, AgentGraphRunner
from ldai.providers.types import GraphMetricSummary, JudgeResult, ManagedGraphResult
from ldai.providers.types import (
AgentGraphRunnerResult,
GraphMetricSummary,
LDAIMetrics,
ManagedGraphResult,
)


class ManagedAgentGraph:
"""
LaunchDarkly managed wrapper for AI agent graph execution.

Holds an AgentGraphRunner. Wraps the runner result in a
:class:`~ldai.providers.types.ManagedGraphResult` and builds a
:class:`~ldai.providers.types.GraphMetricSummary` from the runner's metrics.
Holds an AgentGraphRunner and an optional AgentGraphDefinition. Wraps the
runner result in a :class:`~ldai.providers.types.ManagedGraphResult` and
builds a :class:`~ldai.providers.types.GraphMetricSummary` from the runner's
metrics.

When the runner returns an :class:`~ldai.providers.types.AgentGraphRunnerResult`
(new shape), the managed layer drives all graph-level tracking from
``result.metrics``. When the runner returns the legacy
:class:`~ldai.providers.AgentGraphResult`, tracking has already been performed
inside the runner; the managed layer simply wraps the result. This detection
branch exists as a deliberate bridge: once PR 11-openai and PR 11-langchain
migrate both runners to return ``AgentGraphRunnerResult``, the legacy branch
becomes dead code and will be removed in PR 11-langchain's final cleanup commit.

Obtain an instance via ``LDAIClient.create_agent_graph()``.
"""

def __init__(self, runner: AgentGraphRunner, graph: Optional[Any] = None):
    """
    Create a managed wrapper around *runner*.

    :param runner: The AgentGraphRunner that will execute the graph
    :param graph: Optional AgentGraphDefinition; when provided it is used to
        create the graph-level tracker for runners that return the new
        :class:`AgentGraphRunnerResult` shape. Legacy runners that still
        return :class:`AgentGraphResult` do not need it.
    """
    self._runner = runner
    self._graph = graph

async def run(self, input: Any) -> ManagedGraphResult:
    """
    Run the agent graph with the given input.

    Delegates to the underlying AgentGraphRunner. The returned type
    determines which tracking path is taken:

    - :class:`AgentGraphRunnerResult` (new shape): the managed layer drives
      graph-level tracking from ``result.metrics`` via the graph tracker.
      Per-node tracking from ``result.metrics.node_metrics`` will be wired
      in a follow-up commit once the runners populate ``node_metrics``.
    - :class:`AgentGraphResult` (legacy shape): tracking already occurred
      inside the runner; the managed layer wraps the result without
      additional tracking.

    :param input: The input prompt or structured input for the graph
    :return: ManagedGraphResult containing the content, metric summary,
        raw response, and an optional evaluations task (always ``None``
        for now — per-graph evaluations will be added in a future PR).
    """
    raw_result = await self._runner.run(input)

    if isinstance(raw_result, AgentGraphRunnerResult):
        # New shape: managed layer drives all tracking.
        summary = self._build_summary_from_runner_result(raw_result)
        # Without a graph reference we cannot create a tracker, so
        # tracking is skipped rather than raising.
        if self._graph is not None:
            self._flush_graph_tracking(raw_result, self._graph.create_tracker())
        return ManagedGraphResult(
            content=raw_result.content,
            metrics=summary,
            raw=raw_result.raw,
            evaluations=None,
        )

    # Legacy shape (AgentGraphResult): tracking already happened in the runner.
    # Build a GraphMetricSummary from the runner result's LDAIMetrics.
    # path and node_metrics will be populated once graph runners are migrated
    # to return AgentGraphRunnerResult with GraphMetrics (PR 11-openai/langchain).
    metrics: LDAIMetrics = raw_result.metrics
    summary = GraphMetricSummary(
        success=metrics.success,
        usage=metrics.usage,
        # Legacy LDAIMetrics may predate duration_ms — tolerate its absence.
        duration_ms=getattr(metrics, 'duration_ms', None),
    )

    return ManagedGraphResult(
        content=raw_result.output,
        metrics=summary,
        raw=raw_result.raw,
        evaluations=None,
    )

def _build_summary_from_runner_result(
    self,
    result: AgentGraphRunnerResult,
) -> GraphMetricSummary:
    """
    Translate an :class:`AgentGraphRunnerResult`'s metrics into a
    :class:`GraphMetricSummary`.

    :param result: Runner result carrying the GraphMetrics to summarize
    :return: Summary with copied ``path``/``node_metrics`` containers
    """
    metrics = result.metrics
    # list()/dict() copies keep the summary independent of the runner's
    # mutable metric containers.
    path_copy = list(metrics.path)
    node_metrics_copy = dict(metrics.node_metrics)
    return GraphMetricSummary(
        success=metrics.success,
        path=path_copy,
        duration_ms=metrics.duration_ms,
        usage=metrics.usage,
        node_metrics=node_metrics_copy,
    )

def _flush_graph_tracking(self, result: AgentGraphRunnerResult, tracker: Any) -> None:
    """
    Emit graph-level LaunchDarkly tracking events from the runner's metrics.

    Only invoked for the new ``AgentGraphRunnerResult`` shape. Node-level
    tracking (from ``result.metrics.node_metrics``) will be wired once the
    runners start populating that field.

    :param result: Runner result whose ``metrics`` drive the events
    :param tracker: Graph-level tracker to receive the events
    """
    metrics = result.metrics
    # Empty paths and missing durations are skipped, not tracked as zeroes.
    if metrics.path:
        tracker.track_path(metrics.path)
    if metrics.duration_ms is not None:
        tracker.track_duration(metrics.duration_ms)
    # Exactly one of success/failure is always recorded.
    if metrics.success:
        tracker.track_invocation_success()
    else:
        tracker.track_invocation_failure()
    if metrics.usage is not None:
        tracker.track_total_tokens(metrics.usage)

def get_agent_graph_runner(self) -> AgentGraphRunner:
"""
Return the underlying AgentGraphRunner for advanced use.
Expand Down
78 changes: 75 additions & 3 deletions packages/sdk/server-ai/tests/test_managed_agent_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
from ldclient.integrations.test_data import TestData

from ldai import LDAIClient, ManagedAgentGraph, ManagedGraphResult
from ldai.providers.types import LDAIMetrics
from ldai.providers.types import AgentGraphRunnerResult, GraphMetrics, LDAIMetrics
from ldai.providers import AgentGraphResult, AgentGraphRunner, ToolRegistry
from ldai.tracker import TokenUsage


# --- Test double ---
# --- Test doubles ---

class StubAgentGraphRunner(AgentGraphRunner):
"""Legacy runner that returns AgentGraphResult (old shape)."""
def __init__(self, output: str = "stub output"):
self._output = output

Expand All @@ -24,10 +26,30 @@ async def run(self, input) -> AgentGraphResult:
)


# --- ManagedAgentGraph unit tests ---
class StubNewShapeRunner(AgentGraphRunner):
    """New-shape runner test double that returns AgentGraphRunnerResult."""

    def __init__(self, content: str = "new shape output"):
        self._content = content

    async def run(self, input) -> AgentGraphRunnerResult:
        # Fixed, deterministic metrics so tests can assert exact values.
        metrics = GraphMetrics(
            success=True,
            path=["root", "specialist"],
            duration_ms=42,
            usage=TokenUsage(total=10, input=5, output=5),
            node_metrics={},
        )
        return AgentGraphRunnerResult(
            content=self._content,
            metrics=metrics,
            raw={"input": input},
        )


# --- ManagedAgentGraph unit tests (legacy shape) ---

@pytest.mark.asyncio
async def test_managed_agent_graph_run_delegates_to_runner():
"""Legacy AgentGraphResult shape: content comes from output field."""
runner = StubAgentGraphRunner("hello world")
managed = ManagedAgentGraph(runner)
result = await managed.run("test input")
Expand All @@ -42,6 +64,56 @@ def test_managed_agent_graph_get_runner():
assert managed.get_agent_graph_runner() is runner


# --- ManagedAgentGraph unit tests (new AgentGraphRunnerResult shape) ---

@pytest.mark.asyncio
async def test_managed_agent_graph_run_handles_new_shape():
    """New AgentGraphRunnerResult shape: content and GraphMetrics are surfaced."""
    mock_tracker = MagicMock()
    mock_graph = MagicMock()
    mock_graph.create_tracker = MagicMock(return_value=mock_tracker)
    runner = StubNewShapeRunner("final answer")

    result = await ManagedAgentGraph(runner, graph=mock_graph).run("test input")

    assert isinstance(result, ManagedGraphResult)
    assert result.content == "final answer"
    summary = result.metrics
    assert summary.success is True
    assert summary.path == ["root", "specialist"]
    assert summary.duration_ms == 42
    assert summary.usage is not None
    assert summary.usage.total == 10


@pytest.mark.asyncio
async def test_managed_agent_graph_new_shape_drives_tracking():
    """New shape: managed layer calls tracker methods from result.metrics."""
    tracker = MagicMock()
    graph = MagicMock()
    graph.create_tracker = MagicMock(return_value=tracker)

    await ManagedAgentGraph(StubNewShapeRunner(), graph=graph).run("test input")

    tracker.track_path.assert_called_once_with(["root", "specialist"])
    tracker.track_duration.assert_called_once_with(42)
    tracker.track_invocation_success.assert_called_once()
    tracker.track_total_tokens.assert_called_once()


@pytest.mark.asyncio
async def test_managed_agent_graph_new_shape_no_graph_skips_tracking():
    """New shape without graph: no tracking called (graph not available)."""
    managed = ManagedAgentGraph(StubNewShapeRunner(), graph=None)
    # Running without a graph reference must not raise; tracking is skipped.
    outcome = await managed.run("test input")
    assert outcome.content == "new shape output"
    assert outcome.metrics.success is True




# --- LDAIClient.create_agent_graph() integration tests ---
Expand Down
Loading