diff --git a/src/agents/tracing/processors.py b/src/agents/tracing/processors.py index 0b0bffa5b..4ade1a9b9 100644 --- a/src/agents/tracing/processors.py +++ b/src/agents/tracing/processors.py @@ -1,5 +1,6 @@ from __future__ import annotations +import json import os import queue import random @@ -29,6 +30,8 @@ def export(self, items: list[Trace | Span[Any]]) -> None: class BackendSpanExporter(TracingExporter): _OPENAI_TRACING_INGEST_ENDPOINT = "https://api.openai.com/v1/traces/ingest" + _OPENAI_TRACING_MAX_FIELD_BYTES = 100_000 + _OPENAI_TRACING_STRING_TRUNCATION_SUFFIX = "... [truncated]" _OPENAI_TRACING_ALLOWED_USAGE_KEYS = frozenset( { "input_tokens", @@ -182,32 +185,91 @@ def _should_sanitize_for_openai_tracing_api(self) -> bool: return self.endpoint.rstrip("/") == self._OPENAI_TRACING_INGEST_ENDPOINT.rstrip("/") def _sanitize_for_openai_tracing_api(self, payload_item: dict[str, Any]) -> dict[str, Any]: - """Drop fields known to be rejected by OpenAI tracing ingestion.""" + """Drop or truncate fields known to be rejected by OpenAI tracing ingestion.""" span_data = payload_item.get("span_data") if not isinstance(span_data, dict): return payload_item - if span_data.get("type") != "generation": - return payload_item - - usage = span_data.get("usage") - if not isinstance(usage, dict): - return payload_item + sanitized_span_data = span_data + did_mutate = False - filtered_usage = { - key: value - for key, value in usage.items() - if key in self._OPENAI_TRACING_ALLOWED_USAGE_KEYS - } - if filtered_usage == usage: + for field_name in ("input", "output"): + if field_name not in span_data: + continue + truncated_field = self._truncate_span_field_value(span_data[field_name]) + if truncated_field != span_data[field_name]: + if not did_mutate: + sanitized_span_data = dict(span_data) + did_mutate = True + sanitized_span_data[field_name] = truncated_field + + if span_data.get("type") == "generation": + usage = span_data.get("usage") + if isinstance(usage, dict): + 
filtered_usage = { + key: value + for key, value in usage.items() + if key in self._OPENAI_TRACING_ALLOWED_USAGE_KEYS + } + if filtered_usage != usage: + if not did_mutate: + sanitized_span_data = dict(span_data) + did_mutate = True + sanitized_span_data["usage"] = filtered_usage + + if not did_mutate: return payload_item - sanitized_span_data = dict(span_data) - sanitized_span_data["usage"] = filtered_usage sanitized_payload_item = dict(payload_item) sanitized_payload_item["span_data"] = sanitized_span_data return sanitized_payload_item + def _value_json_size_bytes(self, value: Any) -> int: + return len(json.dumps(value, ensure_ascii=False, separators=(",", ":")).encode("utf-8")) + + def _truncate_string_for_json_limit(self, value: str, max_bytes: int) -> str: + suffix = self._OPENAI_TRACING_STRING_TRUNCATION_SUFFIX + value_size = self._value_json_size_bytes(value) + if value_size <= max_bytes: + return value + suffix_size = self._value_json_size_bytes(suffix) + if suffix_size > max_bytes: + return "" + if max_bytes == suffix_size: + return suffix + + # Use a proportional estimate to avoid repeatedly serializing dozens of candidates. 
+ budget_without_suffix = max_bytes - suffix_size + estimated_chars = int(len(value) * budget_without_suffix / max(value_size, 1)) + estimated_chars = max(0, min(len(value), estimated_chars)) + + best = value[:estimated_chars] + suffix + best_size = self._value_json_size_bytes(best) + while best_size > max_bytes and estimated_chars > 0: + overflow_ratio = (best_size - max_bytes) / max(best_size, 1) + trim_chars = max(1, int(estimated_chars * overflow_ratio) + 1) + estimated_chars = max(0, estimated_chars - trim_chars) + best = value[:estimated_chars] + suffix + best_size = self._value_json_size_bytes(best) + return best + + def _truncate_span_field_value(self, value: Any) -> Any: + max_bytes = self._OPENAI_TRACING_MAX_FIELD_BYTES + if self._value_json_size_bytes(value) <= max_bytes: + return value + + if isinstance(value, str): + return self._truncate_string_for_json_limit(value, max_bytes) + + preview = str(value) + if len(preview) > 512: + preview = preview[:512] + self._OPENAI_TRACING_STRING_TRUNCATION_SUFFIX + return { + "truncated": True, + "original_type": type(value).__name__, + "preview": preview, + } + def close(self): """Close the underlying HTTP client.""" self._client.close() diff --git a/tests/test_trace_processor.py b/tests/test_trace_processor.py index 5ebbd1e12..7023b3163 100644 --- a/tests/test_trace_processor.py +++ b/tests/test_trace_processor.py @@ -391,6 +391,108 @@ def export(self): exporter.close() +@patch("httpx.Client") +def test_backend_span_exporter_truncates_large_input_for_openai_tracing(mock_client): + class DummyItem: + tracing_api_key = None + + def __init__(self): + self.exported_payload: dict[str, Any] = { + "object": "trace.span", + "span_data": { + "type": "generation", + "input": "x" * (BackendSpanExporter._OPENAI_TRACING_MAX_FIELD_BYTES + 5000), + }, + } + + def export(self): + return self.exported_payload + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_client.return_value.post.return_value = 
mock_response + + exporter = BackendSpanExporter(api_key="test_key") + item = DummyItem() + exporter.export([cast(Any, item)]) + + sent_payload = mock_client.return_value.post.call_args.kwargs["json"]["data"][0] + sent_input = sent_payload["span_data"]["input"] + assert isinstance(sent_input, str) + assert sent_input.endswith(exporter._OPENAI_TRACING_STRING_TRUNCATION_SUFFIX) + assert exporter._value_json_size_bytes(sent_input) <= exporter._OPENAI_TRACING_MAX_FIELD_BYTES + assert item.exported_payload["span_data"]["input"] != sent_input + exporter.close() + + +@patch("httpx.Client") +def test_backend_span_exporter_truncates_large_non_string_input_for_openai_tracing(mock_client): + class DummyItem: + tracing_api_key = None + + def __init__(self): + self.exported_payload: dict[str, Any] = { + "object": "trace.span", + "span_data": { + "type": "generation", + "input": { + "blob": "x" * (BackendSpanExporter._OPENAI_TRACING_MAX_FIELD_BYTES + 5000) + }, + }, + } + + def export(self): + return self.exported_payload + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_client.return_value.post.return_value = mock_response + + exporter = BackendSpanExporter(api_key="test_key") + exporter.export([cast(Any, DummyItem())]) + + sent_payload = mock_client.return_value.post.call_args.kwargs["json"]["data"][0] + sent_input = sent_payload["span_data"]["input"] + assert isinstance(sent_input, dict) + assert sent_input["truncated"] is True + assert sent_input["original_type"] == "dict" + assert exporter._value_json_size_bytes(sent_input) <= exporter._OPENAI_TRACING_MAX_FIELD_BYTES + exporter.close() + + +@patch("httpx.Client") +def test_backend_span_exporter_keeps_large_input_for_custom_endpoint(mock_client): + class DummyItem: + tracing_api_key = None + + def __init__(self): + self.exported_payload: dict[str, Any] = { + "object": "trace.span", + "span_data": { + "type": "generation", + "input": "x" * (BackendSpanExporter._OPENAI_TRACING_MAX_FIELD_BYTES + 5000), + 
}, + } + + def export(self): + return self.exported_payload + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_client.return_value.post.return_value = mock_response + + exporter = BackendSpanExporter( + api_key="test_key", + endpoint="https://example.com/v1/traces/ingest", + ) + item = DummyItem() + exporter.export([cast(Any, item)]) + + sent_payload = mock_client.return_value.post.call_args.kwargs["json"]["data"][0] + assert sent_payload["span_data"]["input"] == item.exported_payload["span_data"]["input"] + exporter.close() + + def test_sanitize_for_openai_tracing_api_keeps_allowed_generation_usage(): exporter = BackendSpanExporter(api_key="test_key") payload = { @@ -421,3 +523,56 @@ def test_sanitize_for_openai_tracing_api_skips_non_dict_generation_usage(): } assert exporter._sanitize_for_openai_tracing_api(payload) is payload exporter.close() + + +def test_sanitize_for_openai_tracing_api_keeps_small_input_without_mutation(): + exporter = BackendSpanExporter(api_key="test_key") + payload = { + "object": "trace.span", + "span_data": { + "type": "generation", + "input": "short input", + "usage": {"input_tokens": 1}, + }, + } + assert exporter._sanitize_for_openai_tracing_api(payload) is payload + exporter.close() + + +def test_truncate_string_for_json_limit_returns_original_when_within_limit(): + exporter = BackendSpanExporter(api_key="test_key") + value = "hello" + max_bytes = exporter._value_json_size_bytes(value) + assert exporter._truncate_string_for_json_limit(value, max_bytes) == value + exporter.close() + + +def test_truncate_string_for_json_limit_returns_empty_when_suffix_too_large(): + exporter = BackendSpanExporter(api_key="test_key") + max_bytes = ( + exporter._value_json_size_bytes(exporter._OPENAI_TRACING_STRING_TRUNCATION_SUFFIX) - 1 + ) + assert exporter._truncate_string_for_json_limit("x" * 100, max_bytes) == "" + exporter.close() + + +def test_truncate_string_for_json_limit_returns_suffix_when_limit_equals_suffix(): + exporter 
= BackendSpanExporter(api_key="test_key") + max_bytes = exporter._value_json_size_bytes(exporter._OPENAI_TRACING_STRING_TRUNCATION_SUFFIX) + assert ( + exporter._truncate_string_for_json_limit("x" * 100, max_bytes) + == exporter._OPENAI_TRACING_STRING_TRUNCATION_SUFFIX + ) + exporter.close() + + +def test_truncate_string_for_json_limit_handles_escape_heavy_input(): + exporter = BackendSpanExporter(api_key="test_key") + value = ('\\"' * 40000) + "tail" + max_bytes = exporter._OPENAI_TRACING_MAX_FIELD_BYTES + + truncated = exporter._truncate_string_for_json_limit(value, max_bytes) + + assert truncated.endswith(exporter._OPENAI_TRACING_STRING_TRUNCATION_SUFFIX) + assert exporter._value_json_size_bytes(truncated) <= max_bytes + exporter.close()