92 changes: 77 additions & 15 deletions src/agents/tracing/processors.py
@@ -1,5 +1,6 @@
from __future__ import annotations

import json
import os
import queue
import random
@@ -29,6 +30,8 @@ def export(self, items: list[Trace | Span[Any]]) -> None:

class BackendSpanExporter(TracingExporter):
_OPENAI_TRACING_INGEST_ENDPOINT = "https://api.openai.com/v1/traces/ingest"
_OPENAI_TRACING_MAX_FIELD_BYTES = 100_000
_OPENAI_TRACING_STRING_TRUNCATION_SUFFIX = "... [truncated]"
_OPENAI_TRACING_ALLOWED_USAGE_KEYS = frozenset(
{
"input_tokens",
@@ -182,32 +185,91 @@ def _should_sanitize_for_openai_tracing_api(self) -> bool:
return self.endpoint.rstrip("/") == self._OPENAI_TRACING_INGEST_ENDPOINT.rstrip("/")

def _sanitize_for_openai_tracing_api(self, payload_item: dict[str, Any]) -> dict[str, Any]:
"""Drop fields known to be rejected by OpenAI tracing ingestion."""
"""Drop or truncate fields known to be rejected by OpenAI tracing ingestion."""
span_data = payload_item.get("span_data")
if not isinstance(span_data, dict):
return payload_item

if span_data.get("type") != "generation":
return payload_item

usage = span_data.get("usage")
if not isinstance(usage, dict):
return payload_item
sanitized_span_data = span_data
did_mutate = False

filtered_usage = {
key: value
for key, value in usage.items()
if key in self._OPENAI_TRACING_ALLOWED_USAGE_KEYS
}
if filtered_usage == usage:
for field_name in ("input", "output"):
if field_name not in span_data:
continue
truncated_field = self._truncate_span_field_value(span_data[field_name])
if truncated_field != span_data[field_name]:
if not did_mutate:
sanitized_span_data = dict(span_data)
did_mutate = True
sanitized_span_data[field_name] = truncated_field

if span_data.get("type") == "generation":
usage = span_data.get("usage")
if isinstance(usage, dict):
filtered_usage = {
key: value
for key, value in usage.items()
if key in self._OPENAI_TRACING_ALLOWED_USAGE_KEYS
}
if filtered_usage != usage:
if not did_mutate:
sanitized_span_data = dict(span_data)
did_mutate = True
sanitized_span_data["usage"] = filtered_usage

if not did_mutate:
return payload_item

sanitized_span_data = dict(span_data)
sanitized_span_data["usage"] = filtered_usage
sanitized_payload_item = dict(payload_item)
sanitized_payload_item["span_data"] = sanitized_span_data
return sanitized_payload_item

def _value_json_size_bytes(self, value: Any) -> int:
return len(json.dumps(value, ensure_ascii=False, separators=(",", ":")).encode("utf-8"))

def _truncate_string_for_json_limit(self, value: str, max_bytes: int) -> str:
suffix = self._OPENAI_TRACING_STRING_TRUNCATION_SUFFIX
value_size = self._value_json_size_bytes(value)
if value_size <= max_bytes:
return value
suffix_size = self._value_json_size_bytes(suffix)
if suffix_size > max_bytes:
return ""
if max_bytes <= suffix_size:
return suffix if suffix_size == max_bytes else ""

# Use a proportional estimate to avoid repeatedly serializing dozens of candidates.
budget_without_suffix = max_bytes - suffix_size
estimated_chars = int(len(value) * budget_without_suffix / max(value_size, 1))
estimated_chars = max(0, min(len(value), estimated_chars))

best = value[:estimated_chars] + suffix
best_size = self._value_json_size_bytes(best)
while best_size > max_bytes and estimated_chars > 0:
overflow_ratio = (best_size - max_bytes) / max(best_size, 1)
trim_chars = max(1, int(estimated_chars * overflow_ratio) + 1)
estimated_chars = max(0, estimated_chars - trim_chars)
best = value[:estimated_chars] + suffix
best_size = self._value_json_size_bytes(best)
return best

def _truncate_span_field_value(self, value: Any) -> Any:
max_bytes = self._OPENAI_TRACING_MAX_FIELD_BYTES
if self._value_json_size_bytes(value) <= max_bytes:
return value

if isinstance(value, str):
return self._truncate_string_for_json_limit(value, max_bytes)

preview = str(value)
if len(preview) > 512:
preview = preview[:512] + self._OPENAI_TRACING_STRING_TRUNCATION_SUFFIX
Comment on lines +264 to +266

P2: Avoid materializing full preview for oversized non-string fields

When span_data.input/output is an oversized non-string value, this path computes preview = str(value) before truncating it, which forces Python to stringify the entire object first. For very large dict/list payloads (the exact case this sanitization is meant to handle), that can allocate a huge temporary string and add significant CPU/memory pressure, potentially stalling or crashing the export before the truncated payload is sent.

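A minimal sketch (not part of this diff) of one way to build a bounded preview for dict/list/str payloads without rendering the whole object, using the standard-library reprlib module; the helper name and size limits below are illustrative assumptions:

import reprlib

# Illustrative sketch only -- not this PR's implementation. reprlib caps how many
# container entries it walks and slices each string before repr'ing it, so the
# preview is built with bounded work for built-in containers.
_preview_repr = reprlib.Repr()
_preview_repr.maxstring = 256  # each string is sliced to ~256 chars before repr
_preview_repr.maxdict = 16     # render at most 16 dict items
_preview_repr.maxlist = 16     # render at most 16 list items
_preview_repr.maxlevel = 3     # cap nesting depth

def _bounded_preview(value: object, limit: int = 512) -> str:
    preview = _preview_repr.repr(value)
    if len(preview) > limit:
        preview = preview[:limit] + "... [truncated]"
    return preview

For dicts, lists, and strings, reprlib does bounded per-element work rather than repr'ing the whole structure, so a payload containing a multi-hundred-kilobyte string is never stringified in full; objects without a dedicated handler still fall back to a full repr, so this only mitigates the common case.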

return {
"truncated": True,
"original_type": type(value).__name__,
"preview": preview,
}

def close(self):
"""Close the underlying HTTP client."""
self._client.close()
155 changes: 155 additions & 0 deletions tests/test_trace_processor.py
@@ -391,6 +391,108 @@ def export(self):
exporter.close()


@patch("httpx.Client")
def test_backend_span_exporter_truncates_large_input_for_openai_tracing(mock_client):
class DummyItem:
tracing_api_key = None

def __init__(self):
self.exported_payload: dict[str, Any] = {
"object": "trace.span",
"span_data": {
"type": "generation",
"input": "x" * (BackendSpanExporter._OPENAI_TRACING_MAX_FIELD_BYTES + 5000),
},
}

def export(self):
return self.exported_payload

mock_response = MagicMock()
mock_response.status_code = 200
mock_client.return_value.post.return_value = mock_response

exporter = BackendSpanExporter(api_key="test_key")
item = DummyItem()
exporter.export([cast(Any, item)])

sent_payload = mock_client.return_value.post.call_args.kwargs["json"]["data"][0]
sent_input = sent_payload["span_data"]["input"]
assert isinstance(sent_input, str)
assert sent_input.endswith(exporter._OPENAI_TRACING_STRING_TRUNCATION_SUFFIX)
assert exporter._value_json_size_bytes(sent_input) <= exporter._OPENAI_TRACING_MAX_FIELD_BYTES
assert item.exported_payload["span_data"]["input"] != sent_input
exporter.close()


@patch("httpx.Client")
def test_backend_span_exporter_truncates_large_non_string_input_for_openai_tracing(mock_client):
class DummyItem:
tracing_api_key = None

def __init__(self):
self.exported_payload: dict[str, Any] = {
"object": "trace.span",
"span_data": {
"type": "generation",
"input": {
"blob": "x" * (BackendSpanExporter._OPENAI_TRACING_MAX_FIELD_BYTES + 5000)
},
},
}

def export(self):
return self.exported_payload

mock_response = MagicMock()
mock_response.status_code = 200
mock_client.return_value.post.return_value = mock_response

exporter = BackendSpanExporter(api_key="test_key")
exporter.export([cast(Any, DummyItem())])

sent_payload = mock_client.return_value.post.call_args.kwargs["json"]["data"][0]
sent_input = sent_payload["span_data"]["input"]
assert isinstance(sent_input, dict)
assert sent_input["truncated"] is True
assert sent_input["original_type"] == "dict"
assert exporter._value_json_size_bytes(sent_input) <= exporter._OPENAI_TRACING_MAX_FIELD_BYTES
exporter.close()


@patch("httpx.Client")
def test_backend_span_exporter_keeps_large_input_for_custom_endpoint(mock_client):
class DummyItem:
tracing_api_key = None

def __init__(self):
self.exported_payload: dict[str, Any] = {
"object": "trace.span",
"span_data": {
"type": "generation",
"input": "x" * (BackendSpanExporter._OPENAI_TRACING_MAX_FIELD_BYTES + 5000),
},
}

def export(self):
return self.exported_payload

mock_response = MagicMock()
mock_response.status_code = 200
mock_client.return_value.post.return_value = mock_response

exporter = BackendSpanExporter(
api_key="test_key",
endpoint="https://example.com/v1/traces/ingest",
)
item = DummyItem()
exporter.export([cast(Any, item)])

sent_payload = mock_client.return_value.post.call_args.kwargs["json"]["data"][0]
assert sent_payload["span_data"]["input"] == item.exported_payload["span_data"]["input"]
exporter.close()


def test_sanitize_for_openai_tracing_api_keeps_allowed_generation_usage():
exporter = BackendSpanExporter(api_key="test_key")
payload = {
@@ -421,3 +523,56 @@ def test_sanitize_for_openai_tracing_api_skips_non_dict_generation_usage():
}
assert exporter._sanitize_for_openai_tracing_api(payload) is payload
exporter.close()


def test_sanitize_for_openai_tracing_api_keeps_small_input_without_mutation():
exporter = BackendSpanExporter(api_key="test_key")
payload = {
"object": "trace.span",
"span_data": {
"type": "generation",
"input": "short input",
"usage": {"input_tokens": 1},
},
}
assert exporter._sanitize_for_openai_tracing_api(payload) is payload
exporter.close()


def test_truncate_string_for_json_limit_returns_original_when_within_limit():
exporter = BackendSpanExporter(api_key="test_key")
value = "hello"
max_bytes = exporter._value_json_size_bytes(value)
assert exporter._truncate_string_for_json_limit(value, max_bytes) == value
exporter.close()


def test_truncate_string_for_json_limit_returns_empty_when_suffix_too_large():
exporter = BackendSpanExporter(api_key="test_key")
max_bytes = (
exporter._value_json_size_bytes(exporter._OPENAI_TRACING_STRING_TRUNCATION_SUFFIX) - 1
)
assert exporter._truncate_string_for_json_limit("x" * 100, max_bytes) == ""
exporter.close()


def test_truncate_string_for_json_limit_returns_suffix_when_limit_equals_suffix():
exporter = BackendSpanExporter(api_key="test_key")
max_bytes = exporter._value_json_size_bytes(exporter._OPENAI_TRACING_STRING_TRUNCATION_SUFFIX)
assert (
exporter._truncate_string_for_json_limit("x" * 100, max_bytes)
== exporter._OPENAI_TRACING_STRING_TRUNCATION_SUFFIX
)
exporter.close()


def test_truncate_string_for_json_limit_handles_escape_heavy_input():
exporter = BackendSpanExporter(api_key="test_key")
value = ('\\"' * 40000) + "tail"
max_bytes = exporter._OPENAI_TRACING_MAX_FIELD_BYTES

truncated = exporter._truncate_string_for_json_limit(value, max_bytes)

assert truncated.endswith(exporter._OPENAI_TRACING_STRING_TRUNCATION_SUFFIX)
assert exporter._value_json_size_bytes(truncated) <= max_bytes
exporter.close()