diff --git a/.gitignore b/.gitignore index cd448815..2aa97b8e 100644 --- a/.gitignore +++ b/.gitignore @@ -16,4 +16,6 @@ codegen.log Brewfile.lock.json .ipynb_checkpoints -.DS_Store \ No newline at end of file +.DS_Store + +.worktrees/ \ No newline at end of file diff --git a/src/openlayer/lib/integrations/_openai_embedding_common.py b/src/openlayer/lib/integrations/_openai_embedding_common.py new file mode 100644 index 00000000..429b6275 --- /dev/null +++ b/src/openlayer/lib/integrations/_openai_embedding_common.py @@ -0,0 +1,84 @@ +"""Shared parsing helpers for OpenAI-shaped embedding tracers (OpenAI, AsyncOpenAI, LiteLLM).""" + +from typing import Any, Dict, List, Optional, Tuple, Union + + +def parse_embedding_response( + response: Any, +) -> Tuple[Union[List[float], List[List[float]]], int, int]: + """Extract (embeddings, dimensions, count) from an OpenAI-shaped EmbeddingResponse. + + For a single input, returns the vector directly. + For a batch, returns a list of vectors. + """ + try: + data = getattr(response, "data", None) + if data is None and isinstance(response, dict): + data = response.get("data", []) + if not data: + return [], 0, 0 + embeddings = [ + item["embedding"] if isinstance(item, dict) else item.embedding + for item in data + ] + if not embeddings: + return [], 0, 0 + if len(embeddings) == 1: + return embeddings[0], len(embeddings[0]), 1 + return embeddings, len(embeddings[0]), len(embeddings) + except Exception: + return [], 0, 0 + + +def get_embedding_model_parameters(kwargs: Dict[str, Any]) -> Dict[str, Any]: + """Extract embedding-relevant model parameters from create() kwargs.""" + return { + "dimensions": kwargs.get("dimensions"), + "encoding_format": kwargs.get("encoding_format"), + "user": kwargs.get("user"), + } + + +def build_embedding_step_kwargs( + response: Any, + call_kwargs: Dict[str, Any], + start_time: float, + end_time: float, + *, + name: str, + provider: str, + inference_id: Optional[str] = None, +) -> Dict[str, Any]: + """Build the kwargs to pass to ``tracer.add_embedding_step_to_trace``. + + Common boilerplate for OpenAI-shaped responses (OpenAI sync/async, LiteLLM). + Callers may layer extra fields (cost, extra_metadata, model_parameters) on + top of the returned dict before invoking the tracer helper. + """ + model_name = getattr(response, "model", call_kwargs.get("model", "unknown")) + embeddings, dim, count = parse_embedding_response(response) + usage = getattr(response, "usage", None) + prompt_tokens = getattr(usage, "prompt_tokens", 0) if usage else 0 + total_tokens = getattr(usage, "total_tokens", prompt_tokens) if usage else prompt_tokens + + return { + "name": name, + "end_time": end_time, + "inputs": {"input": call_kwargs.get("input")}, + "output": embeddings, + "latency": (end_time - start_time) * 1000, + "tokens": total_tokens, + "prompt_tokens": prompt_tokens, + "model": model_name, + "model_parameters": get_embedding_model_parameters(call_kwargs), + "embedding_dimensions": dim, + "embedding_count": count, + "raw_output": ( + response.model_dump() + if hasattr(response, "model_dump") + else str(response) + ), + "provider": provider, + "id": inference_id, + "metadata": {"provider": provider}, + } diff --git a/src/openlayer/lib/integrations/async_openai_tracer.py b/src/openlayer/lib/integrations/async_openai_tracer.py index 01a73ba8..59369220 100644 --- a/src/openlayer/lib/integrations/async_openai_tracer.py +++ b/src/openlayer/lib/integrations/async_openai_tracer.py @@ -16,6 +16,8 @@ if TYPE_CHECKING: import openai +from ..tracing import tracer +from ._openai_embedding_common import build_embedding_step_kwargs from .openai_tracer import ( get_model_parameters, create_trace_args, @@ -155,6 +157,22 @@ async def traced_responses_create_func(*args, **kwargs): else: logger.debug("Responses API not available in this AsyncOpenAI client version") + # Patch Embeddings API + if hasattr(client, "embeddings"): + embeddings_create_func = client.embeddings.create + + @wraps(embeddings_create_func) + async def traced_embeddings_create_func(*args, **kwargs): + inference_id = kwargs.pop("inference_id", None) + return await handle_embedding_async( + *args, + **kwargs, + original_func=embeddings_create_func, + inference_id=inference_id, + ) + + client.embeddings.create = traced_embeddings_create_func + return client @@ -698,3 +716,35 @@ async def handle_async_non_streaming_parse( ) return response + + +# ----------------------------- Async Embeddings ----------------------------- # +async def handle_embedding_async( + original_func: callable, + *args, + inference_id: Optional[str] = None, + **kwargs, +) -> Any: + """Trace an async AsyncOpenAI client.embeddings.create() call.""" + start_time = time.time() + response = await original_func(*args, **kwargs) + end_time = time.time() + + try: + tracer.add_embedding_step_to_trace( + **build_embedding_step_kwargs( + response, + kwargs, + start_time, + end_time, + name="OpenAI Embedding", + provider="OpenAI", + inference_id=inference_id, + ) + ) + except Exception as e: + logger.error( + "Failed to trace the OpenAI embedding request with Openlayer. %s", e + ) + + return response diff --git a/src/openlayer/lib/integrations/bedrock_tracer.py b/src/openlayer/lib/integrations/bedrock_tracer.py index a497474e..a7834d65 100644 --- a/src/openlayer/lib/integrations/bedrock_tracer.py +++ b/src/openlayer/lib/integrations/bedrock_tracer.py @@ -5,7 +5,7 @@ import logging import time from functools import wraps -from typing import TYPE_CHECKING, Any, Dict, Iterator, Optional, Union +from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Tuple, Union from botocore.response import StreamingBody @@ -25,6 +25,11 @@ logger = logging.getLogger(__name__) +def _is_embedding_model(model_id: str) -> bool: + """Return True when modelId refers to a Bedrock embedding model.""" + return "embed" in (model_id or "").lower() + + def trace_bedrock(client: "boto3.client") -> "boto3.client": """Patch the Bedrock client to trace model invocations. @@ -63,6 +68,14 @@ def trace_bedrock(client: "boto3.client") -> "boto3.client": @wraps(invoke_model_func) def traced_invoke_model(*args, **kwargs): inference_id = kwargs.pop("inference_id", None) + model_id = kwargs.get("modelId", "") + if _is_embedding_model(model_id): + return handle_embedding_invoke( + *args, + **kwargs, + invoke_func=invoke_model_func, + inference_id=inference_id, + ) return handle_non_streaming_invoke( *args, **kwargs, @@ -153,6 +166,112 @@ def handle_non_streaming_invoke( return response +def handle_embedding_invoke( + invoke_func: callable, + *args, + inference_id: Optional[str] = None, + **kwargs, +) -> Dict[str, Any]: + """Handle invoke_model for embedding models (Titan, Cohere).""" + start_time = time.time() + response = invoke_func(*args, **kwargs) + end_time = time.time() + + try: + # Parse the request body + body_str = kwargs.get("body", "{}") + if isinstance(body_str, bytes): + body_str = body_str.decode("utf-8") + body_data = json.loads(body_str) if isinstance(body_str, str) else body_str + + # Read and replace the response body so callers can still consume it + original_body = response["body"] + response_body_bytes = original_body.read() + response_data = json.loads( + response_body_bytes.decode("utf-8") + if isinstance(response_body_bytes, bytes) + else response_body_bytes + ) + new_stream = io.BytesIO(response_body_bytes) + response["body"] = StreamingBody(new_stream, len(response_body_bytes)) + + model_id = kwargs.get("modelId", "") + inputs = _parse_embedding_input(body_data, model_id) + embeddings, dim, count = _parse_embedding_output(response_data, model_id) + prompt_tokens = _parse_embedding_tokens(response_data, model_id) + model_parameters = _get_embedding_model_parameters(body_data, model_id) + + tracer.add_embedding_step_to_trace( + name="AWS Bedrock Embedding", + end_time=end_time, + inputs=inputs, + output=embeddings, + latency=(end_time - start_time) * 1000, + tokens=prompt_tokens, + prompt_tokens=prompt_tokens, + model=model_id, + model_parameters=model_parameters, + embedding_dimensions=dim, + embedding_count=count, + raw_output=response_data, + provider="Bedrock", + id=inference_id, + metadata={}, + ) + + except Exception as e: + logger.error( + "Failed to trace the Bedrock embedding invocation with Openlayer. %s", e + ) + + return response + + +def _parse_embedding_input(body_data: Dict[str, Any], model_id: str) -> Dict[str, Any]: + if model_id.startswith("amazon.titan-embed"): + return {"input": body_data.get("inputText", "")} + if model_id.startswith("cohere.embed"): + return {"input": body_data.get("texts", [])} + return {"input": body_data} + + +def _parse_embedding_output( + response_data: Dict[str, Any], model_id: str +) -> Tuple[Union[List[float], List[List[float]]], int, int]: + """Returns (embeddings, dimensions, count).""" + if model_id.startswith("amazon.titan-embed"): + emb = response_data.get("embedding", []) + return emb, len(emb), 1 + if model_id.startswith("cohere.embed"): + embs = response_data.get("embeddings", []) + dim = len(embs[0]) if embs else 0 + return embs, dim, len(embs) + return [], 0, 0 + + +def _parse_embedding_tokens(response_data: Dict[str, Any], model_id: str) -> int: + if model_id.startswith("amazon.titan-embed"): + return response_data.get("inputTextTokenCount", 0) + return 0 + + +def _get_embedding_model_parameters( + body_data: Dict[str, Any], model_id: str +) -> Dict[str, Any]: + if model_id.startswith("amazon.titan-embed"): + return { + "dimensions": body_data.get("dimensions"), + "normalize": body_data.get("normalize"), + } + if model_id.startswith("cohere.embed"): + return { + "input_type": body_data.get("input_type"), + "truncate": body_data.get("truncate"), + "embedding_types": body_data.get("embedding_types"), + } + return {} + + def handle_streaming_invoke( invoke_func: callable, *args, diff --git a/src/openlayer/lib/integrations/litellm_tracer.py b/src/openlayer/lib/integrations/litellm_tracer.py index d4545e90..00fa411f 100644 --- a/src/openlayer/lib/integrations/litellm_tracer.py +++ b/src/openlayer/lib/integrations/litellm_tracer.py @@ -18,6 +18,7 @@ from ..tracing import tracer from ..tracing import enums as tracer_enums +from ._openai_embedding_common import build_embedding_step_kwargs logger = logging.getLogger(__name__) @@ -96,8 +97,24 @@ def traced_completion(*args, **kwargs): ) litellm.completion = traced_completion + + # Patch litellm.embedding to trace embedding calls. + original_embedding = litellm.embedding + + @wraps(original_embedding) + def traced_embedding(*args, **kwargs): + inference_id = kwargs.pop("inference_id", None) + return handle_embedding( + *args, + **kwargs, + embedding_func=original_embedding, + inference_id=inference_id, + ) + + litellm.embedding = traced_embedding + _litellm_traced = True - logger.debug("litellm.completion has been patched for Openlayer tracing") + logger.debug("litellm.completion and litellm.embedding have been patched for Openlayer tracing") def handle_streaming_completion( @@ -337,6 +354,60 @@ def handle_non_streaming_completion( return response +def handle_embedding( + embedding_func: callable, + *args, + inference_id: Optional[str] = None, + **kwargs, +) -> Any: + """Handle a single litellm.embedding() invocation.""" + start_time = time.time() + response = embedding_func(*args, **kwargs) + end_time = time.time() + + try: + model_name = kwargs.get("model", getattr(response, "model", "unknown")) + provider = detect_provider_from_response(response, model_name) + extra_metadata = extract_litellm_metadata(response, model_name) + usage_data = extract_usage_from_response(response) + + step_kwargs = build_embedding_step_kwargs( + response, + kwargs, + start_time, + end_time, + name="LiteLLM Embedding", + provider=provider, + inference_id=inference_id, + ) + + # LiteLLM-specific overlays: usage uses LiteLLM's normalized dict, extra + # connection params, response cost, and provider metadata. + prompt_tokens = usage_data.get("prompt_tokens") or 0 + step_kwargs["prompt_tokens"] = prompt_tokens + step_kwargs["tokens"] = usage_data.get("total_tokens") or prompt_tokens + step_kwargs["model_parameters"] = { + **step_kwargs["model_parameters"], + "timeout": kwargs.get("timeout"), + "api_base": kwargs.get("api_base"), + "api_version": kwargs.get("api_version"), + } + step_kwargs["cost"] = extra_metadata.get("cost", None) + step_kwargs["metadata"] = { + **step_kwargs["metadata"], + "litellm_model": model_name, + **extra_metadata, + } + + tracer.add_embedding_step_to_trace(**step_kwargs) + except Exception as e: + logger.error( + "Failed to trace the LiteLLM embedding request with Openlayer. %s", e + ) + + return response + + def get_model_parameters(kwargs: Dict[str, Any]) -> Dict[str, Any]: """Gets the model parameters from the kwargs.""" return { diff --git a/src/openlayer/lib/integrations/openai_tracer.py b/src/openlayer/lib/integrations/openai_tracer.py index 37e6fe58..c64c70ae 100644 --- a/src/openlayer/lib/integrations/openai_tracer.py +++ b/src/openlayer/lib/integrations/openai_tracer.py @@ -27,6 +27,7 @@ ImageContent, TextContent, ) +from ._openai_embedding_common import build_embedding_step_kwargs logger = logging.getLogger(__name__) @@ -153,6 +154,22 @@ def traced_responses_create_func(*args, **kwargs): else: logger.debug("Responses API not available in this OpenAI client version") + # Patch Embeddings API + if hasattr(client, "embeddings"): + embeddings_create_func = client.embeddings.create + + @wraps(embeddings_create_func) + def traced_embeddings_create_func(*args, **kwargs): + inference_id = kwargs.pop("inference_id", None) + return handle_embedding( + *args, + **kwargs, + original_func=embeddings_create_func, + inference_id=inference_id, + ) + + client.embeddings.create = traced_embeddings_create_func + return client @@ -1611,6 +1628,38 @@ def parse_structured_output_data(response: Any) -> Union[str, Dict[str, Any], No return parse_non_streaming_output_data(response) +# ----------------------------- OpenAI Embeddings ---------------------------- # +def handle_embedding( + original_func: callable, + *args, + inference_id: Optional[str] = None, + **kwargs, +) -> Any: + """Trace a sync OpenAI client.embeddings.create() call.""" + start_time = time.time() + response = original_func(*args, **kwargs) + end_time = time.time() + + try: + tracer.add_embedding_step_to_trace( + **build_embedding_step_kwargs( + response, + kwargs, + start_time, + end_time, + name="OpenAI Embedding", + provider="OpenAI", + inference_id=inference_id, + ) + ) + except Exception as e: + logger.error( + "Failed to trace the OpenAI embedding request with Openlayer. %s", e + ) + + return response + + # --------------------------- OpenAI Assistants API -------------------------- # def trace_openai_assistant_thread_run( client: "openai.OpenAI", run: "openai.types.beta.threads.run.Run" diff --git a/src/openlayer/lib/tracing/enums.py b/src/openlayer/lib/tracing/enums.py index ee483b7a..e99d4b72 100644 --- a/src/openlayer/lib/tracing/enums.py +++ b/src/openlayer/lib/tracing/enums.py @@ -8,6 +8,7 @@ class StepType(enum.Enum): AGENT = "agent" CHAT_COMPLETION = "chat_completion" + EMBEDDING = "embedding" GUARDRAIL = "guardrail" HANDOFF = "handoff" RETRIEVER = "retriever" diff --git a/src/openlayer/lib/tracing/tracer.py b/src/openlayer/lib/tracing/tracer.py index baea887f..ad18c80c 100644 --- a/src/openlayer/lib/tracing/tracer.py +++ b/src/openlayer/lib/tracing/tracer.py @@ -518,6 +518,15 @@ def add_chat_completion_step_to_trace(**kwargs) -> None: step.log(**kwargs) +def add_embedding_step_to_trace(**kwargs) -> None: + """Adds an embedding step to the trace.""" + with create_step( + step_type=enums.StepType.EMBEDDING, + name=kwargs.get("name", "Embedding"), + ) as step: + step.log(**kwargs) + + def trace( *step_args, inference_pipeline_id: Optional[str] = None, diff --git a/tests/test_async_openai_embedding_integration.py b/tests/test_async_openai_embedding_integration.py new file mode 100644 index 00000000..5af59f18 --- /dev/null +++ b/tests/test_async_openai_embedding_integration.py @@ -0,0 +1,104 @@ +"""Test OpenAI embedding integration (async).""" + +# openlayer.lib.integrations is in pyright's ignore list, so imports get +# unknown/partially unknown types; disable these diagnostics for this test file only. +# pyright: reportUnknownMemberType=false, reportUnknownVariableType=false, reportUnknownArgumentType=false, reportUnknownParameterType=false, reportMissingParameterType=false + +import asyncio +from unittest.mock import Mock, AsyncMock, MagicMock, patch + + +class TestOpenAIAsyncEmbedding: + def _fake_response(self, embeddings, prompt_tokens=4, model="text-embedding-3-small"): + response = Mock() + response.model = model + response.data = [Mock(embedding=v) for v in embeddings] + response.usage = Mock(prompt_tokens=prompt_tokens, total_tokens=prompt_tokens) + response.model_dump = Mock(return_value={"model": model}) + return response + + def test_handle_embedding_async_single_input(self) -> None: + from openlayer.lib.integrations.async_openai_tracer import ( + handle_embedding_async, + ) + + fake = self._fake_response([[0.1, 0.2, 0.3]]) + original = AsyncMock(return_value=fake) + + with patch( + "openlayer.lib.tracing.tracer.add_embedding_step_to_trace" + ) as mock_add: + result = asyncio.run( + handle_embedding_async( + original_func=original, + model="text-embedding-3-small", + input="hello", + ) + ) + + assert result is fake + kwargs = mock_add.call_args.kwargs + assert kwargs["name"] == "OpenAI Embedding" + assert kwargs["provider"] == "OpenAI" + assert kwargs["output"] == [0.1, 0.2, 0.3] + assert kwargs["embedding_dimensions"] == 3 + assert kwargs["embedding_count"] == 1 + + def test_handle_embedding_async_batch_input(self) -> None: + from openlayer.lib.integrations.async_openai_tracer import ( + handle_embedding_async, + ) + + fake = self._fake_response([[0.1, 0.2], [0.3, 0.4]], prompt_tokens=6) + original = AsyncMock(return_value=fake) + + with patch( + "openlayer.lib.tracing.tracer.add_embedding_step_to_trace" + ) as mock_add: + asyncio.run( + handle_embedding_async( + original_func=original, + model="text-embedding-3-small", + input=["a", "b"], + ) + ) + + kwargs = mock_add.call_args.kwargs + assert kwargs["inputs"] == {"input": ["a", "b"]} + assert kwargs["output"] == [[0.1, 0.2], [0.3, 0.4]] + assert kwargs["embedding_count"] == 2 + assert kwargs["prompt_tokens"] == 6 + + def test_handle_embedding_async_failure_does_not_break_client(self) -> None: + from openlayer.lib.integrations.async_openai_tracer import ( + handle_embedding_async, + ) + + fake = self._fake_response([[0.0]]) + original = AsyncMock(return_value=fake) + + with patch( + "openlayer.lib.tracing.tracer.add_embedding_step_to_trace", + side_effect=RuntimeError("backend down"), + ): + result = asyncio.run( + handle_embedding_async( + original_func=original, + model="text-embedding-3-small", + input="x", + ) + ) + + assert result is fake + + def test_trace_async_openai_patches_embeddings_create(self) -> None: + import openai # pyright: ignore[reportMissingImports] + + from openlayer.lib.integrations.async_openai_tracer import trace_async_openai + + client = MagicMock(spec=openai.AsyncOpenAI) + original_create = client.embeddings.create + + traced_client = trace_async_openai(client) + + assert traced_client.embeddings.create is not original_create diff --git a/tests/test_bedrock_integration.py b/tests/test_bedrock_integration.py new file mode 100644 index 00000000..1b53bef5 --- /dev/null +++ b/tests/test_bedrock_integration.py @@ -0,0 +1,302 @@ +"""Test AWS Bedrock integration.""" + +# openlayer.lib.integrations is in pyright's ignore list, so imports from bedrock_tracer +# get unknown/partially unknown types; disable these diagnostics for this test file only. +# pyright: reportUnknownMemberType=false, reportUnknownVariableType=false, reportUnknownArgumentType=false, reportUnknownParameterType=false, reportMissingParameterType=false + +import io +import json +from unittest.mock import MagicMock, patch + + +class TestBedrockChatRegression: + """Lock in existing chat-completion behaviour before refactoring.""" + + def _make_anthropic_response(self, body_dict): + """Build a response dict mimicking what bedrock-runtime returns.""" + from botocore.response import StreamingBody # pyright: ignore[reportMissingImports] + + body_bytes = json.dumps(body_dict).encode("utf-8") + return {"body": StreamingBody(io.BytesIO(body_bytes), len(body_bytes))} + + def test_anthropic_chat_invoke_routes_through_existing_handler(self) -> None: + """invoke_model with a Claude model must hit the existing chat handler.""" + from openlayer.lib.integrations.bedrock_tracer import trace_bedrock + + mock_client = MagicMock() + mock_client.invoke_model.return_value = self._make_anthropic_response( + { + "id": "msg_01", + "type": "message", + "role": "assistant", + "content": [{"type": "text", "text": "hello back"}], + "stop_reason": "end_turn", + "usage": {"input_tokens": 10, "output_tokens": 5}, + } + ) + + traced = trace_bedrock(mock_client) + + with patch("openlayer.lib.integrations.bedrock_tracer.add_to_trace") as mock_add: + response = traced.invoke_model( + modelId="anthropic.claude-3-haiku-20240307-v1:0", + body=json.dumps( + { + "anthropic_version": "bedrock-2023-05-31", + "max_tokens": 100, + "messages": [{"role": "user", "content": "hi"}], + } + ), + contentType="application/json", + accept="application/json", + ) + + # The chat path called add_to_trace (which wraps add_chat_completion_step_to_trace). + mock_add.assert_called_once() + kwargs = mock_add.call_args.kwargs + assert kwargs["model"] == "anthropic.claude-3-haiku-20240307-v1:0" + assert kwargs["output"] == "hello back" + assert kwargs["prompt_tokens"] == 10 + assert kwargs["completion_tokens"] == 5 + + # Caller can still consume the response body — critical regression guard. + replayed = response["body"].read() + assert b"hello back" in replayed + + +class TestBedrockEmbeddingDetection: + """Detection routes embedding models away from the chat handler.""" + + def test_is_embedding_model_titan(self) -> None: + from openlayer.lib.integrations.bedrock_tracer import _is_embedding_model + + assert _is_embedding_model("amazon.titan-embed-text-v1") is True + assert _is_embedding_model("amazon.titan-embed-text-v2:0") is True + + def test_is_embedding_model_cohere(self) -> None: + from openlayer.lib.integrations.bedrock_tracer import _is_embedding_model + + assert _is_embedding_model("cohere.embed-english-v3") is True + assert _is_embedding_model("cohere.embed-multilingual-v3") is True + + def test_is_embedding_model_chat_returns_false(self) -> None: + from openlayer.lib.integrations.bedrock_tracer import _is_embedding_model + + assert _is_embedding_model("anthropic.claude-3-haiku-20240307-v1:0") is False + assert _is_embedding_model("meta.llama3-70b-instruct-v1:0") is False + + def test_is_embedding_model_handles_empty_string(self) -> None: + from openlayer.lib.integrations.bedrock_tracer import _is_embedding_model + + assert _is_embedding_model("") is False + + def test_traced_invoke_routes_embedding_to_new_handler(self) -> None: + """An embedding modelId must call handle_embedding_invoke, not the chat handler.""" + from botocore.response import StreamingBody # pyright: ignore[reportMissingImports] + + from openlayer.lib.integrations.bedrock_tracer import trace_bedrock + + body = json.dumps( + {"embedding": [0.1, 0.2], "inputTextTokenCount": 4} + ).encode("utf-8") + mock_client = MagicMock() + mock_client.invoke_model.return_value = { + "body": StreamingBody(io.BytesIO(body), len(body)) + } + traced = trace_bedrock(mock_client) + + with patch( + "openlayer.lib.integrations.bedrock_tracer.handle_embedding_invoke" + ) as mock_embed, patch( + "openlayer.lib.integrations.bedrock_tracer.handle_non_streaming_invoke" + ) as mock_chat: + mock_embed.return_value = {"body": "ok"} + traced.invoke_model( + modelId="amazon.titan-embed-text-v2:0", + body=json.dumps({"inputText": "hi"}), + ) + + mock_embed.assert_called_once() + mock_chat.assert_not_called() + + +class TestBedrockTitanEmbedding: + """Titan v1 and v2 embedding requests produce well-formed embedding steps.""" + + def _titan_response(self, embedding, token_count): + from botocore.response import StreamingBody # pyright: ignore[reportMissingImports] + + body = json.dumps( + {"embedding": embedding, "inputTextTokenCount": token_count} + ).encode("utf-8") + return {"body": StreamingBody(io.BytesIO(body), len(body))} + + def test_titan_v2_single_embedding(self) -> None: + from openlayer.lib.integrations.bedrock_tracer import trace_bedrock + + vec = [0.1, 0.2, 0.3, 0.4] + mock_client = MagicMock() + mock_client.invoke_model.return_value = self._titan_response(vec, 7) + traced = trace_bedrock(mock_client) + + with patch( + "openlayer.lib.tracing.tracer.add_embedding_step_to_trace" + ) as mock_add: + response = traced.invoke_model( + modelId="amazon.titan-embed-text-v2:0", + body=json.dumps( + {"inputText": "hello world", "dimensions": 4, "normalize": True} + ), + contentType="application/json", + accept="application/json", + ) + + mock_add.assert_called_once() + kwargs = mock_add.call_args.kwargs + assert kwargs["name"] == "AWS Bedrock Embedding" + assert kwargs["model"] == "amazon.titan-embed-text-v2:0" + assert kwargs["provider"] == "Bedrock" + assert kwargs["inputs"] == {"input": "hello world"} + assert kwargs["output"] == vec + assert kwargs["embedding_dimensions"] == 4 + assert kwargs["embedding_count"] == 1 + assert kwargs["prompt_tokens"] == 7 + assert kwargs["tokens"] == 7 + assert kwargs["model_parameters"] == { + "dimensions": 4, + "normalize": True, + } + assert response["body"].read() == json.dumps( + {"embedding": vec, "inputTextTokenCount": 7} + ).encode("utf-8") + + def test_titan_v1_single_embedding(self) -> None: + from openlayer.lib.integrations.bedrock_tracer import trace_bedrock + + vec = [0.5] * 1536 + mock_client = MagicMock() + mock_client.invoke_model.return_value = self._titan_response(vec, 12) + traced = trace_bedrock(mock_client) + + with patch( + "openlayer.lib.tracing.tracer.add_embedding_step_to_trace" + ) as mock_add: + traced.invoke_model( + modelId="amazon.titan-embed-text-v1", + body=json.dumps({"inputText": "another"}), + ) + + kwargs = mock_add.call_args.kwargs + assert kwargs["model"] == "amazon.titan-embed-text-v1" + assert kwargs["embedding_dimensions"] == 1536 + assert kwargs["embedding_count"] == 1 + assert kwargs["prompt_tokens"] == 12 + # v1 has no `dimensions`/`normalize` params in its request body + assert kwargs["model_parameters"] == { + "dimensions": None, + "normalize": None, + } + + +class TestBedrockCohereEmbedding: + """Cohere v3 embedding produces a multi-vector batch step.""" + + def _cohere_response(self, embeddings): + from botocore.response import StreamingBody # pyright: ignore[reportMissingImports] + + body = json.dumps( + { + "embeddings": embeddings, + "id": "abc-123", + "response_type": "embeddings_floats", + } + ).encode("utf-8") + return {"body": StreamingBody(io.BytesIO(body), len(body))} + + def test_cohere_embed_batch(self) -> None: + from openlayer.lib.integrations.bedrock_tracer import trace_bedrock + + embeddings = [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6], [0.7, 0.8, 0.9]] + mock_client = MagicMock() + mock_client.invoke_model.return_value = self._cohere_response(embeddings) + traced = trace_bedrock(mock_client) + + with patch( + "openlayer.lib.tracing.tracer.add_embedding_step_to_trace" + ) as mock_add: + traced.invoke_model( + modelId="cohere.embed-english-v3", + body=json.dumps( + { + "texts": ["one", "two", "three"], + "input_type": "search_document", + } + ), + ) + + kwargs = mock_add.call_args.kwargs + assert kwargs["model"] == "cohere.embed-english-v3" + assert kwargs["inputs"] == {"input": ["one", "two", "three"]} + assert kwargs["output"] == embeddings + assert kwargs["embedding_dimensions"] == 3 + assert kwargs["embedding_count"] == 3 + # Cohere v3 does not return tokens in its response body. + assert kwargs["prompt_tokens"] == 0 + assert kwargs["model_parameters"] == { + "input_type": "search_document", + "truncate": None, + "embedding_types": None, + } + + +class TestBedrockEmbeddingResilience: + """Tracing failures must never break the caller; response body must remain usable.""" + + def test_embedding_failure_does_not_break_client(self) -> None: + from botocore.response import StreamingBody # pyright: ignore[reportMissingImports] + + from openlayer.lib.integrations.bedrock_tracer import trace_bedrock + + body_bytes = json.dumps( + {"embedding": [0.1, 0.2], "inputTextTokenCount": 4} + ).encode("utf-8") + mock_client = MagicMock() + mock_client.invoke_model.return_value = { + "body": StreamingBody(io.BytesIO(body_bytes), len(body_bytes)) + } + traced = trace_bedrock(mock_client) + + with patch( + "openlayer.lib.tracing.tracer.add_embedding_step_to_trace", + side_effect=RuntimeError("backend down"), + ): + response = traced.invoke_model( + modelId="amazon.titan-embed-text-v2:0", + body=json.dumps({"inputText": "hi"}), + ) + + # Caller still gets the real response. + assert response["body"].read() == body_bytes + + def test_embedding_response_body_is_replayable(self) -> None: + from botocore.response import StreamingBody # pyright: ignore[reportMissingImports] + + from openlayer.lib.integrations.bedrock_tracer import trace_bedrock + + body_bytes = json.dumps( + {"embedding": [0.9, 0.8, 0.7], "inputTextTokenCount": 3} + ).encode("utf-8") + mock_client = MagicMock() + mock_client.invoke_model.return_value = { + "body": StreamingBody(io.BytesIO(body_bytes), len(body_bytes)) + } + traced = trace_bedrock(mock_client) + + with patch("openlayer.lib.tracing.tracer.add_embedding_step_to_trace"): + response = traced.invoke_model( + modelId="amazon.titan-embed-text-v2:0", + body=json.dumps({"inputText": "x"}), + ) + + # The body must be readable by the caller after tracing has consumed it. + assert response["body"].read() == body_bytes diff --git a/tests/test_litellm_integration.py b/tests/test_litellm_integration.py index ecebbff8..c01e2372 100644 --- a/tests/test_litellm_integration.py +++ b/tests/test_litellm_integration.py @@ -272,6 +272,178 @@ def test_detect_provider_with_litellm_method(self, mock_litellm: Mock) -> None: mock_response = Mock(spec=[]) # No special attributes provider = detect_provider_from_response(mock_response, 'gpt-4') - + assert provider == 'openai' mock_litellm.get_llm_provider.assert_called_once_with('gpt-4') + + +class TestLiteLLMEmbedding: + """Embedding calls must be traced via add_embedding_step_to_trace.""" + + @patch("openlayer.lib.integrations.litellm_tracer.HAVE_LITELLM", True) + @patch("openlayer.lib.integrations.litellm_tracer.litellm") + def test_trace_litellm_patches_embedding(self, mock_litellm: Mock) -> None: + from openlayer.lib.integrations import litellm_tracer + + litellm_tracer._litellm_traced = False + + original_embedding = Mock() + mock_litellm.embedding = original_embedding + mock_litellm.completion = Mock() + + from openlayer.lib.integrations.litellm_tracer import trace_litellm + + trace_litellm() + + assert mock_litellm.embedding != original_embedding + assert callable(mock_litellm.embedding) + + @patch("openlayer.lib.integrations.litellm_tracer.HAVE_LITELLM", True) + def test_handle_embedding_single_input(self) -> None: + from openlayer.lib.integrations.litellm_tracer import ( + handle_embedding, # pyright: ignore[reportUnknownVariableType] + ) + + fake_response = Mock() + fake_response.model = "text-embedding-3-small" + fake_response.data = [Mock(embedding=[0.1, 0.2, 0.3])] + fake_response.usage = Mock(prompt_tokens=4, total_tokens=4) + fake_response.model_dump = Mock( + return_value={"model": "text-embedding-3-small"} + ) + fake_response._hidden_params = {} + + embedding_func = Mock(return_value=fake_response) + + with patch( + "openlayer.lib.tracing.tracer.add_embedding_step_to_trace" + ) as mock_add: + result = handle_embedding( + embedding_func=embedding_func, + model="text-embedding-3-small", + input="hello", + inference_id="custom-id", + ) + + assert result is fake_response + mock_add.assert_called_once() + kwargs = mock_add.call_args.kwargs + assert kwargs["name"] == "LiteLLM Embedding" + assert kwargs["model"] == "text-embedding-3-small" + assert kwargs["inputs"] == {"input": "hello"} + assert kwargs["output"] == [0.1, 0.2, 0.3] + assert kwargs["embedding_dimensions"] == 3 + assert kwargs["embedding_count"] == 1 + assert kwargs["prompt_tokens"] == 4 + assert kwargs["tokens"] == 4 + assert kwargs["id"] == "custom-id" + + @patch("openlayer.lib.integrations.litellm_tracer.HAVE_LITELLM", True) + def test_handle_embedding_batch_input(self) -> None: + from openlayer.lib.integrations.litellm_tracer import ( + handle_embedding, # pyright: ignore[reportUnknownVariableType] + ) + + fake_response = Mock() + fake_response.model = "text-embedding-3-small" + fake_response.data = [ + Mock(embedding=[0.1, 0.2, 0.3]), + Mock(embedding=[0.4, 0.5, 0.6]), + Mock(embedding=[0.7, 0.8, 0.9]), + ] + fake_response.usage = Mock(prompt_tokens=12, total_tokens=12) + fake_response.model_dump = Mock(return_value={}) + fake_response._hidden_params = {} + + embedding_func = Mock(return_value=fake_response) + + with patch( + "openlayer.lib.tracing.tracer.add_embedding_step_to_trace" + ) as mock_add: + handle_embedding( + embedding_func=embedding_func, + model="text-embedding-3-small", + input=["a", "b", "c"], + ) + + kwargs = mock_add.call_args.kwargs + assert kwargs["inputs"] == {"input": ["a", "b", "c"]} + assert kwargs["output"] == [ + [0.1, 0.2, 0.3], + [0.4, 0.5, 0.6], + [0.7, 0.8, 0.9], + ] + assert kwargs["embedding_dimensions"] == 3 + assert kwargs["embedding_count"] == 3 + assert kwargs["prompt_tokens"] == 12 + assert kwargs["tokens"] == 12 + + @patch("openlayer.lib.integrations.litellm_tracer.HAVE_LITELLM", True) + def test_handle_embedding_cost_from_hidden_params(self) -> None: + from openlayer.lib.integrations.litellm_tracer import ( + handle_embedding, # pyright: ignore[reportUnknownVariableType] + ) + + fake_response = Mock() + fake_response.model = "text-embedding-3-small" + fake_response.data = [Mock(embedding=[0.1, 0.2])] + fake_response.usage = Mock(prompt_tokens=2, total_tokens=2) + fake_response.model_dump = Mock(return_value={}) + fake_response._hidden_params = {"response_cost": 0.0000123} + + with patch( + "openlayer.lib.tracing.tracer.add_embedding_step_to_trace" + ) as mock_add: + handle_embedding( + embedding_func=Mock(return_value=fake_response), + model="text-embedding-3-small", + input="x", + ) + + assert mock_add.call_args.kwargs["cost"] == 0.0000123 + + @patch("openlayer.lib.integrations.litellm_tracer.HAVE_LITELLM", True) + def test_handle_embedding_failure_does_not_break_client(self) -> None: + from openlayer.lib.integrations.litellm_tracer import ( + handle_embedding, # pyright: ignore[reportUnknownVariableType] + ) + + fake_response = Mock() + fake_response.model = "text-embedding-3-small" + fake_response.data = [Mock(embedding=[0.1])] + fake_response.usage = Mock(prompt_tokens=1, total_tokens=1) + fake_response.model_dump = Mock(return_value={}) + fake_response._hidden_params = {} + + with patch( + "openlayer.lib.tracing.tracer.add_embedding_step_to_trace", + side_effect=RuntimeError("backend down"), + ): + result = handle_embedding( + embedding_func=Mock(return_value=fake_response), + model="text-embedding-3-small", + input="x", + ) + + assert result is fake_response + + @patch("openlayer.lib.integrations.litellm_tracer.HAVE_LITELLM", True) + @patch("openlayer.lib.integrations.litellm_tracer.litellm") + def test_completion_path_unchanged_after_embedding_patch( + self, mock_litellm: Mock + ) -> None: + """Patching embedding must not affect completion behaviour.""" + from openlayer.lib.integrations import litellm_tracer + + litellm_tracer._litellm_traced = False + + original_completion = Mock() + mock_litellm.completion = original_completion + mock_litellm.embedding = Mock() + + from openlayer.lib.integrations.litellm_tracer import trace_litellm + + trace_litellm() + + assert mock_litellm.completion != original_completion + assert callable(mock_litellm.completion) diff --git a/tests/test_openai_embedding_helpers.py b/tests/test_openai_embedding_helpers.py new file mode 100644 index 00000000..75a9c7c2 --- /dev/null +++ b/tests/test_openai_embedding_helpers.py @@ -0,0 +1,98 @@ +"""Test shared OpenAI embedding parsers.""" + +# openlayer.lib.integrations is in pyright's ignore list, so imports get +# unknown/partially unknown types; disable these diagnostics for this test file only. +# pyright: reportUnknownMemberType=false, reportUnknownVariableType=false, reportUnknownArgumentType=false, reportUnknownParameterType=false, reportMissingParameterType=false + +from unittest.mock import Mock + + +class TestParseEmbeddingResponse: + def test_single_vector(self) -> None: + from openlayer.lib.integrations._openai_embedding_common import ( + parse_embedding_response, + ) + + response = Mock() + response.data = [Mock(embedding=[0.1, 0.2, 0.3, 0.4])] + + embeddings, dim, count = parse_embedding_response(response) + assert embeddings == [0.1, 0.2, 0.3, 0.4] + assert dim == 4 + assert count == 1 + + def test_batch_vectors(self) -> None: + from openlayer.lib.integrations._openai_embedding_common import ( + parse_embedding_response, + ) + + response = Mock() + response.data = [ + Mock(embedding=[0.1, 0.2]), + Mock(embedding=[0.3, 0.4]), + ] + + embeddings, dim, count = parse_embedding_response(response) + assert embeddings == [[0.1, 0.2], [0.3, 0.4]] + assert dim == 2 + assert count == 2 + + def test_empty_data(self) -> None: + from openlayer.lib.integrations._openai_embedding_common import ( + parse_embedding_response, + ) + + response = Mock() + response.data = [] + + embeddings, dim, count = parse_embedding_response(response) + assert embeddings == [] + assert dim == 0 + assert count == 0 + + def test_dict_data_items(self) -> None: + """Some response shapes carry dict items instead of model objects.""" + from openlayer.lib.integrations._openai_embedding_common import ( + parse_embedding_response, + ) + + response = Mock() + response.data = [{"embedding": [0.5, 0.6]}] + + embeddings, dim, count = parse_embedding_response(response) + assert embeddings == [0.5, 0.6] + assert dim == 2 + assert count == 1 + + +class TestGetEmbeddingModelParameters: + def test_extracts_relevant_params(self) -> None: + from openlayer.lib.integrations._openai_embedding_common import ( + get_embedding_model_parameters, + ) + + params = get_embedding_model_parameters( + { + "dimensions": 1536, + "encoding_format": "float", + "user": "u1", + "irrelevant": "ignored", + } + ) + assert params == { + "dimensions": 1536, + "encoding_format": "float", + "user": "u1", + } + + def test_missing_params_default_to_none(self) -> None: + from openlayer.lib.integrations._openai_embedding_common import ( + get_embedding_model_parameters, + ) + + params = get_embedding_model_parameters({}) + assert params == { + "dimensions": None, + "encoding_format": None, + "user": None, + } diff --git a/tests/test_openai_embedding_integration.py b/tests/test_openai_embedding_integration.py new file mode 100644 index 00000000..7a8577a5 --- /dev/null +++ b/tests/test_openai_embedding_integration.py @@ -0,0 +1,104 @@ +"""Test OpenAI embedding integration (sync).""" + +# openlayer.lib.integrations is in pyright's ignore list, so imports get +# unknown/partially unknown types; disable these diagnostics for this test file only. +# pyright: reportUnknownMemberType=false, reportUnknownVariableType=false, reportUnknownArgumentType=false, reportUnknownParameterType=false, reportMissingParameterType=false + +from unittest.mock import Mock, MagicMock, patch + + +class TestOpenAISyncEmbedding: + """Sync OpenAI client.embeddings.create must be traced.""" + + def _fake_response(self, embeddings, prompt_tokens=4, model="text-embedding-3-small"): + response = Mock() + response.model = model + response.data = [Mock(embedding=v) for v in embeddings] + response.usage = Mock(prompt_tokens=prompt_tokens, total_tokens=prompt_tokens) + response.model_dump = Mock(return_value={"model": model}) + return response + + def test_handle_embedding_single_input(self) -> None: + from openlayer.lib.integrations.openai_tracer import handle_embedding + + fake = self._fake_response([[0.1, 0.2, 0.3]]) + original = Mock(return_value=fake) + + with patch( + "openlayer.lib.tracing.tracer.add_embedding_step_to_trace" + ) as mock_add: + result = handle_embedding( + original_func=original, + model="text-embedding-3-small", + input="hello", + inference_id="abc", + ) + + assert result is fake + kwargs = mock_add.call_args.kwargs + assert kwargs["name"] == "OpenAI Embedding" + assert kwargs["provider"] == "OpenAI" + assert kwargs["model"] == "text-embedding-3-small" + assert kwargs["inputs"] == {"input": "hello"} + assert kwargs["output"] == [0.1, 0.2, 0.3] + assert kwargs["embedding_dimensions"] == 3 + assert kwargs["embedding_count"] == 1 + assert kwargs["prompt_tokens"] == 4 + assert kwargs["tokens"] == 4 + assert kwargs["id"] == "abc" + + def test_handle_embedding_batch_input(self) -> None: + from openlayer.lib.integrations.openai_tracer import handle_embedding + + fake = self._fake_response( + [[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]], prompt_tokens=9 + ) + original = Mock(return_value=fake) + + with patch( + "openlayer.lib.tracing.tracer.add_embedding_step_to_trace" + ) as mock_add: + handle_embedding( + original_func=original, + model="text-embedding-3-small", + input=["one", "two", "three"], + ) + + kwargs = mock_add.call_args.kwargs + assert kwargs["inputs"] == {"input": ["one", "two", "three"]} + assert kwargs["output"] == [[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]] + assert kwargs["embedding_dimensions"] == 2 + assert kwargs["embedding_count"] == 3 + assert kwargs["prompt_tokens"] == 9 + + def test_handle_embedding_failure_does_not_break_client(self) -> None: + from openlayer.lib.integrations.openai_tracer import handle_embedding + + fake = self._fake_response([[0.0]]) + original = Mock(return_value=fake) + + with patch( + "openlayer.lib.tracing.tracer.add_embedding_step_to_trace", + side_effect=RuntimeError("backend down"), + ): + result = handle_embedding( + original_func=original, + model="text-embedding-3-small", + input="x", + ) + + assert result is fake + + def test_trace_openai_patches_embeddings_create(self) -> None: + """After trace_openai, client.embeddings.create is replaced.""" + import openai # pyright: ignore[reportMissingImports] + + from openlayer.lib.integrations.openai_tracer import trace_openai + + # Make client appear like a real OpenAI (not Azure) client. + client = MagicMock(spec=openai.OpenAI) + original_create = client.embeddings.create + + traced_client = trace_openai(client) + + assert traced_client.embeddings.create is not original_create diff --git a/tests/test_tracing_core.py b/tests/test_tracing_core.py index 99e2b18a..a8984fb7 100644 --- a/tests/test_tracing_core.py +++ b/tests/test_tracing_core.py @@ -812,3 +812,66 @@ def my_func(tool_count: int) -> None: # "tool_count" exists in inputs, so it should be promoted from there my_func(tool_count=5) + + +class TestEmbeddingStep: + """Tests for StepType.EMBEDDING and the add_embedding_step_to_trace helper.""" + + def test_step_type_embedding_exists(self) -> None: + """StepType.EMBEDDING must exist with the expected string value.""" + from openlayer.lib.tracing.enums import StepType + + assert StepType.EMBEDDING.value == "embedding" + + def test_add_embedding_step_to_trace_creates_step_with_correct_type(self) -> None: + """The helper must create a step with StepType.EMBEDDING and forward kwargs to step.log.""" + from unittest.mock import MagicMock + + from openlayer.lib.tracing import enums + + mock_step = MagicMock() + mock_ctx = MagicMock() + mock_ctx.__enter__.return_value = mock_step + mock_ctx.__exit__.return_value = None + + with patch.object(tracer, "create_step", return_value=mock_ctx) as mock_create_step: + tracer.add_embedding_step_to_trace( + name="OpenAI Embedding", + inputs={"input": "hello"}, + output=[0.1, 0.2, 0.3], + model="text-embedding-3-small", + embedding_dimensions=3, + embedding_count=1, + ) + + mock_create_step.assert_called_once_with( + step_type=enums.StepType.EMBEDDING, + name="OpenAI Embedding", + ) + mock_step.log.assert_called_once_with( + name="OpenAI Embedding", + inputs={"input": "hello"}, + output=[0.1, 0.2, 0.3], + model="text-embedding-3-small", + embedding_dimensions=3, + embedding_count=1, + ) + + def test_add_embedding_step_to_trace_uses_default_name(self) -> None: + """When no name is given, the helper must default to 'Embedding'.""" + from unittest.mock import MagicMock + + from openlayer.lib.tracing import enums + + mock_step = MagicMock() + mock_ctx = MagicMock() + mock_ctx.__enter__.return_value = mock_step + mock_ctx.__exit__.return_value = None + + with patch.object(tracer, "create_step", return_value=mock_ctx) as mock_create_step: + tracer.add_embedding_step_to_trace(model="x", output=[0.0]) + + mock_create_step.assert_called_once_with( + step_type=enums.StepType.EMBEDDING, + name="Embedding", + )