45 changes: 40 additions & 5 deletions packages/sdk/server-ai/src/ldai/managed_agent.py
@@ -1,19 +1,22 @@
"""ManagedAgent — LaunchDarkly managed wrapper for agent invocations."""

from typing import Union
import asyncio
from typing import List, Union

from ldai import log
from ldai.models import AIAgentConfig
from ldai.providers import AgentResult, AgentRunner
from ldai.providers.runner import Runner
from ldai.providers.types import ManagedResult, RunnerResult
from ldai.providers.types import JudgeResult, ManagedResult, RunnerResult
from ldai.tracker import LDAIConfigTracker


class ManagedAgent:
"""
LaunchDarkly managed wrapper for AI agent invocations.

Holds an AgentRunner or Runner. Handles tracking automatically via
``create_tracker()``.
Holds an AgentRunner or Runner. Handles tracking automatically via
``create_tracker()`` and dispatches judge evaluations through the
config's evaluator.
Obtain an instance via ``LDAIClient.create_agent()``.
"""

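For orientation, a minimal caller-side sketch of this flow (hedged: it assumes an agent instance already obtained via ``LDAIClient.create_agent()``, whose signature is not shown in this diff; the prompt text is illustrative):

async def handle_request(agent) -> str:
    # Content and metrics are available as soon as the runner finishes,
    # before any judge evaluations resolve.
    result = await agent.run("Summarize the latest deployment logs")
    answer = result.content

    # Optionally wait for the judge evaluations; tracking is guaranteed to
    # have fired once this await returns.
    if result.evaluations is not None:
        for jr in await result.evaluations:
            if jr.success:
                print(jr.metric_key, jr.score)

    return answer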
@@ -29,8 +32,13 @@ async def run(self, input: str) -> ManagedResult:
"""
Run the agent with the given input string.

Invokes the runner, tracks metrics, and dispatches judge evaluations
as a background task. Returns as soon as the runner finishes, without
waiting for evaluations; awaiting ``result.evaluations`` guarantees that
both evaluation and tracking have completed.

:param input: The user prompt or input to the agent
:return: ManagedResult containing the agent's output and metric summary
:return: ManagedResult containing the agent's output, metric summary,
and an optional evaluations task
"""
tracker = self._ai_config.create_tracker()
result: Union[RunnerResult, AgentResult] = await tracker.track_metrics_of_async(
@@ -39,12 +47,39 @@
)
# Support both RunnerResult (content) and legacy AgentResult (output)
content = result.content if isinstance(result, RunnerResult) else result.output # type: ignore[union-attr]

evaluations_task = self._track_judge_results(tracker, input, content)

return ManagedResult(
content=content,
metrics=tracker.get_summary(),
raw=result.raw,
evaluations=evaluations_task,
)

def _track_judge_results(
self,
tracker: LDAIConfigTracker,
input_text: str,
output_text: str,
) -> asyncio.Task[List[JudgeResult]]:
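"""Dispatch judge evaluation as a background task.

Successful results are tracked on the given tracker when the returned task is awaited.
"""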
evaluator_task = self._ai_config.evaluator.evaluate(input_text, output_text)

async def _run_and_track(eval_task: asyncio.Task) -> List[JudgeResult]:
results = await eval_task
for r in results:
if r.success:
try:
tracker.track_judge_result(r)
except Exception as exc:
log.warning("Failed to track judge result: %s", exc)
else:
log.warning("Judge evaluation failed: %s", r.error_message)
return results

return asyncio.create_task(_run_and_track(evaluator_task))

def get_agent_runner(self) -> Union[Runner, AgentRunner]:
"""
Return the underlying runner for advanced use.
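For reference, a self-contained sketch (not SDK code) of the dispatch-and-await pattern that ``_track_judge_results`` relies on: a background ``asyncio.Task`` performs both the evaluation and the tracking, so awaiting it guarantees both have completed. ``FakeJudgeResult``, ``fake_evaluate``, and ``record_result`` are illustrative stand-ins, not part of the SDK.

import asyncio
from dataclasses import dataclass
from typing import List


@dataclass
class FakeJudgeResult:
    success: bool
    score: float


async def fake_evaluate(input_text: str, output_text: str) -> List[FakeJudgeResult]:
    await asyncio.sleep(0.05)  # stands in for a slow judge/LLM call
    return [FakeJudgeResult(success=True, score=0.9)]


def record_result(result: FakeJudgeResult) -> None:
    # Stands in for tracker.track_judge_result().
    print("tracked judge score:", result.score)


def dispatch_evaluations(input_text: str, output_text: str) -> "asyncio.Task[List[FakeJudgeResult]]":
    async def _run_and_track() -> List[FakeJudgeResult]:
        results = await fake_evaluate(input_text, output_text)
        for r in results:
            if r.success:
                record_result(r)
        return results

    return asyncio.create_task(_run_and_track())


async def main() -> None:
    task = dispatch_evaluations("question", "answer")  # returns immediately
    print("agent output is already usable here")
    results = await task  # evaluation and tracking are both done after this line
    print(len(results), "judge result(s) collected")


asyncio.run(main())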
208 changes: 193 additions & 15 deletions packages/sdk/server-ai/tests/test_managed_agent.py
@@ -1,13 +1,16 @@
"""Tests for ManagedAgent."""

import asyncio
import pytest
from typing import List
from unittest.mock import AsyncMock, MagicMock

from ldai import LDAIClient, ManagedAgent
from ldai.evaluator import Evaluator
from ldai.managed_agent import ManagedAgent
from ldai.models import AIAgentConfig, AIAgentConfigDefault, ModelConfig, ProviderConfig
from ldai.providers.types import LDAIMetrics, ManagedResult, RunnerResult
from ldai.tracker import LDAIMetricSummary
from ldai.providers.types import JudgeResult, LDAIMetrics, ManagedResult, RunnerResult
from ldai.tracker import LDAIConfigTracker, LDAIMetricSummary

from ldclient import Config, Context, LDClient
from ldclient.integrations.test_data import TestData
@@ -19,6 +22,23 @@ def _make_summary(success: bool = True) -> LDAIMetricSummary:
return summary


def _make_noop_evaluator_config() -> MagicMock:
"""Build a minimal mock AIAgentConfig with a noop evaluator and a mock tracker."""
mock_config = MagicMock(spec=AIAgentConfig)
mock_tracker = MagicMock(spec=LDAIConfigTracker)
mock_tracker.track_metrics_of_async = AsyncMock(
return_value=RunnerResult(
content="Test response",
raw=None,
metrics=LDAIMetrics(success=True, usage=None),
)
)
mock_tracker.get_summary = MagicMock(return_value=_make_summary(True))
mock_config.create_tracker = MagicMock(return_value=mock_tracker)
mock_config.evaluator = Evaluator.noop()
return mock_config


@pytest.fixture
def td() -> TestData:
td = TestData.data_source()
@@ -60,17 +80,7 @@ class TestManagedAgentRun:
@pytest.mark.asyncio
async def test_run_delegates_to_agent_runner(self):
"""Should delegate run() to the underlying AgentRunner and return ManagedResult."""
mock_config = MagicMock(spec=AIAgentConfig)
mock_tracker = MagicMock()
mock_tracker.track_metrics_of_async = AsyncMock(
return_value=RunnerResult(
content="Test response",
metrics=LDAIMetrics(success=True, usage=None),
raw=None,
)
)
mock_tracker.get_summary = MagicMock(return_value=_make_summary(True))
mock_config.create_tracker = MagicMock(return_value=mock_tracker)
mock_config = _make_noop_evaluator_config()
mock_runner = MagicMock()
mock_runner.run = AsyncMock(
return_value=RunnerResult(
@@ -87,13 +97,16 @@ async def test_run_delegates_to_agent_runner(self):
assert result.content == "Test response"
assert result.metrics.success is True
mock_config.create_tracker.assert_called_once()
mock_tracker.track_metrics_of_async.assert_called_once()
mock_config.create_tracker.return_value.track_metrics_of_async.assert_called_once()
# evaluations should be present (from noop evaluator)
if result.evaluations is not None:
await result.evaluations

@pytest.mark.asyncio
async def test_run_uses_create_tracker_for_fresh_tracker(self):
"""Should use create_tracker() factory for a fresh tracker per invocation."""
mock_config = MagicMock(spec=AIAgentConfig)
fresh_tracker = MagicMock()
fresh_tracker = MagicMock(spec=LDAIConfigTracker)
fresh_tracker.track_metrics_of_async = AsyncMock(
return_value=RunnerResult(
content="Fresh tracker response",
@@ -103,6 +116,7 @@ async def test_run_uses_create_tracker_for_fresh_tracker(self):
)
fresh_tracker.get_summary = MagicMock(return_value=_make_summary(True))
mock_config.create_tracker = MagicMock(return_value=fresh_tracker)
mock_config.evaluator = Evaluator.noop()

mock_runner = MagicMock()

@@ -113,6 +127,8 @@ async def test_run_uses_create_tracker_for_fresh_tracker(self):
assert result.content == "Fresh tracker response"
mock_config.create_tracker.assert_called_once()
fresh_tracker.track_metrics_of_async.assert_called_once()
if result.evaluations is not None:
await result.evaluations

def test_get_agent_runner_returns_runner(self):
"""Should return the underlying AgentRunner."""
@@ -129,6 +145,168 @@ def test_get_config_returns_config(self):
assert agent.get_config() is mock_config


class TestManagedAgentEvaluations:
"""Tests for ManagedAgent evaluations chain (PR 12)."""

@pytest.mark.asyncio
async def test_run_returns_before_evaluations_resolve(self):
"""run() should return before evaluations complete."""
barrier = asyncio.Event()

async def _slow_evaluate(input_text: str, output_text: str) -> List[JudgeResult]:
await barrier.wait()
return []

mock_evaluator = MagicMock(spec=Evaluator)
mock_evaluator.evaluate = MagicMock(
side_effect=lambda i, o: asyncio.create_task(_slow_evaluate(i, o))
)

mock_config = MagicMock(spec=AIAgentConfig)
mock_tracker = MagicMock(spec=LDAIConfigTracker)
mock_tracker.track_metrics_of_async = AsyncMock(
return_value=RunnerResult(content="resp", raw=None, metrics=LDAIMetrics(success=True))
)
mock_tracker.get_summary = MagicMock(return_value=_make_summary(True))
mock_config.create_tracker = MagicMock(return_value=mock_tracker)
mock_config.evaluator = mock_evaluator

mock_runner = MagicMock()
agent = ManagedAgent(mock_config, mock_runner)
result = await agent.run("Hello")

assert result is not None
assert result.evaluations is not None
assert not result.evaluations.done(), "evaluations task should still be pending"

barrier.set()
await result.evaluations

@pytest.mark.asyncio
async def test_await_evaluations_collects_results(self):
"""await result.evaluations should return the list of JudgeResult instances."""
judge_result = JudgeResult(
judge_config_key='judge-key',
success=True,
sampled=True,
metric_key='$ld:ai:judge:relevance',
score=0.9,
reasoning='Good agent response',
)

async def _evaluate_coro(input_text: str, output_text: str) -> List[JudgeResult]:
return [judge_result]

mock_evaluator = MagicMock(spec=Evaluator)
mock_evaluator.evaluate = MagicMock(
side_effect=lambda i, o: asyncio.create_task(_evaluate_coro(i, o))
)

mock_config = MagicMock(spec=AIAgentConfig)
mock_tracker = MagicMock(spec=LDAIConfigTracker)
mock_tracker.track_metrics_of_async = AsyncMock(
return_value=RunnerResult(content="resp", raw=None, metrics=LDAIMetrics(success=True))
)
mock_tracker.get_summary = MagicMock(return_value=_make_summary(True))
mock_tracker.track_judge_result = MagicMock()
mock_config.create_tracker = MagicMock(return_value=mock_tracker)
mock_config.evaluator = mock_evaluator

mock_runner = MagicMock()
agent = ManagedAgent(mock_config, mock_runner)
result = await agent.run("Hello")

results = await result.evaluations # type: ignore[misc]
assert results == [judge_result]

@pytest.mark.asyncio
async def test_tracking_fires_inside_awaited_chain(self):
"""tracker.track_judge_result() must be called when evaluations are awaited."""
judge_result = JudgeResult(
judge_config_key='agent-judge',
success=True,
sampled=True,
metric_key='$ld:ai:judge:relevance',
score=0.85,
)

async def _evaluate_coro(input_text: str, output_text: str) -> List[JudgeResult]:
return [judge_result]

mock_evaluator = MagicMock(spec=Evaluator)
mock_evaluator.evaluate = MagicMock(
side_effect=lambda i, o: asyncio.create_task(_evaluate_coro(i, o))
)

mock_config = MagicMock(spec=AIAgentConfig)
mock_tracker = MagicMock(spec=LDAIConfigTracker)
mock_tracker.track_metrics_of_async = AsyncMock(
return_value=RunnerResult(content="resp", raw=None, metrics=LDAIMetrics(success=True))
)
mock_tracker.get_summary = MagicMock(return_value=_make_summary(True))
mock_tracker.track_judge_result = MagicMock()
mock_config.create_tracker = MagicMock(return_value=mock_tracker)
mock_config.evaluator = mock_evaluator

mock_runner = MagicMock()
agent = ManagedAgent(mock_config, mock_runner)
result = await agent.run("Hello")

# Tracking should NOT have fired yet (before we await evaluations)
mock_tracker.track_judge_result.assert_not_called()

# Now await the evaluations task — tracking fires inside the chain
await result.evaluations # type: ignore[misc]

mock_tracker.track_judge_result.assert_called_once_with(judge_result)

@pytest.mark.asyncio
async def test_noop_evaluator_returns_empty_list(self):
"""With a noop evaluator, awaiting evaluations should return an empty list."""
mock_config = _make_noop_evaluator_config()
mock_runner = MagicMock()
agent = ManagedAgent(mock_config, mock_runner)
result = await agent.run("Hello")

results = await result.evaluations # type: ignore[misc]
assert results == []

@pytest.mark.asyncio
async def test_tracking_not_called_for_failed_judge_result(self):
"""tracker.track_judge_result() should NOT be called for unsuccessful judge results."""
failed_result = JudgeResult(
success=False,
sampled=True,
metric_key='$ld:ai:judge:relevance',
error_message='Judge evaluation failed',
)

async def _evaluate_coro(input_text: str, output_text: str) -> List[JudgeResult]:
return [failed_result]

mock_evaluator = MagicMock(spec=Evaluator)
mock_evaluator.evaluate = MagicMock(
side_effect=lambda i, o: asyncio.create_task(_evaluate_coro(i, o))
)

mock_config = MagicMock(spec=AIAgentConfig)
mock_tracker = MagicMock(spec=LDAIConfigTracker)
mock_tracker.track_metrics_of_async = AsyncMock(
return_value=RunnerResult(content="resp", raw=None, metrics=LDAIMetrics(success=True))
)
mock_tracker.get_summary = MagicMock(return_value=_make_summary(True))
mock_tracker.track_judge_result = MagicMock()
mock_config.create_tracker = MagicMock(return_value=mock_tracker)
mock_config.evaluator = mock_evaluator

mock_runner = MagicMock()
agent = ManagedAgent(mock_config, mock_runner)
result = await agent.run("Hello")
await result.evaluations # type: ignore[misc]

mock_tracker.track_judge_result.assert_not_called()


class TestLDAIClientCreateAgent:
"""Tests for LDAIClient.create_agent."""
