diff --git a/config/arch_config_schema.yaml b/config/arch_config_schema.yaml index 003bb9b47..553108918 100644 --- a/config/arch_config_schema.yaml +++ b/config/arch_config_schema.yaml @@ -382,6 +382,18 @@ properties: type: integer trace_arch_internal: type: boolean + span_attributes: + type: object + properties: + header_prefixes: + type: array + items: + type: string + static: + type: object + additionalProperties: + type: string + additionalProperties: false additionalProperties: false mode: type: string diff --git a/crates/brightstaff/src/handlers/agent_chat_completions.rs b/crates/brightstaff/src/handlers/agent_chat_completions.rs index 5ced34c04..6784d0db9 100644 --- a/crates/brightstaff/src/handlers/agent_chat_completions.rs +++ b/crates/brightstaff/src/handlers/agent_chat_completions.rs @@ -1,7 +1,9 @@ +use std::collections::HashMap; use std::sync::Arc; use std::time::{Instant, SystemTime}; use bytes::Bytes; +use common::configuration::SpanAttributes; use common::consts::TRACE_PARENT_HEADER; use common::traces::{generate_random_span_id, parse_traceparent, SpanBuilder, SpanKind}; use hermesllm::apis::OpenAIMessage; @@ -18,7 +20,9 @@ use super::agent_selector::{AgentSelectionError, AgentSelector}; use super::pipeline_processor::{PipelineError, PipelineProcessor}; use super::response_handler::ResponseHandler; use crate::router::plano_orchestrator::OrchestratorService; -use crate::tracing::{http, operation_component, OperationNameBuilder}; +use crate::tracing::{ + extract_custom_trace_attributes, http, operation_component, OperationNameBuilder, +}; /// Main errors for agent chat completions #[derive(Debug, thiserror::Error)] @@ -42,6 +46,7 @@ pub async fn agent_chat( agents_list: Arc>>>, listeners: Arc>>, trace_collector: Arc, + span_attributes: Arc>, ) -> Result>, hyper::Error> { match handle_agent_chat( request, @@ -49,6 +54,7 @@ pub async fn agent_chat( agents_list, listeners, trace_collector, + span_attributes, ) .await { @@ -127,6 +133,7 @@ async fn handle_agent_chat( agents_list: Arc>>>, listeners: Arc>>, trace_collector: Arc, + span_attributes: Arc>, ) -> Result>, AgentFilterChainError> { // Initialize services let agent_selector = AgentSelector::new(orchestrator_service); @@ -176,6 +183,26 @@ async fn handle_agent_chat( headers }; + let mut header_prefixes: Option<&[String]> = None; + let mut static_attributes: Option<&HashMap> = None; + if let Some(attrs) = span_attributes.as_ref() { + header_prefixes = attrs.header_prefixes.as_deref(); + static_attributes = attrs.static_attributes.as_ref(); + } + let mut custom_attrs = HashMap::new(); + if let Some(static_attributes) = static_attributes { + for (key, value) in static_attributes { + custom_attrs.insert(key.clone(), value.clone()); + } + } + if let Some(prefixes) = header_prefixes { + if !prefixes.is_empty() { + custom_attrs.extend(extract_custom_trace_attributes( + &request_headers, + Some(prefixes), + )); + } + } let chat_request_bytes = request.collect().await?.to_bytes(); @@ -230,6 +257,13 @@ async fn handle_agent_chat( (String::new(), None) }; + let apply_custom_attrs = |mut builder: SpanBuilder| { + for (key, value) in &custom_attrs { + builder = builder.with_attribute(key, value); + } + builder + }; + // Select appropriate agents using arch orchestrator llm model let selection_span_id = generate_random_span_id(); let selection_start_time = SystemTime::now(); @@ -248,27 +282,29 @@ async fn handle_agent_chat( .with_target(&listener.name) .build(); - let mut selection_span_builder = SpanBuilder::new(&selection_operation_name) - .with_span_id(selection_span_id) - .with_kind(SpanKind::Internal) - .with_start_time(selection_start_time) - .with_end_time(selection_end_time) - .with_attribute(http::METHOD, "POST") - .with_attribute(http::TARGET, "/agents/select") - .with_attribute("selection.listener", listener.name.clone()) - .with_attribute("selection.agent_count", selected_agents.len().to_string()) - .with_attribute( - "selection.agents", - selected_agents - .iter() - .map(|a| a.id.as_str()) - .collect::>() - .join(","), - ) - .with_attribute( - "duration_ms", - format!("{:.2}", selection_elapsed.as_secs_f64() * 1000.0), - ); + let mut selection_span_builder = apply_custom_attrs( + SpanBuilder::new(&selection_operation_name) + .with_span_id(selection_span_id) + .with_kind(SpanKind::Internal) + .with_start_time(selection_start_time) + .with_end_time(selection_end_time) + .with_attribute(http::METHOD, "POST") + .with_attribute(http::TARGET, "/agents/select") + .with_attribute("selection.listener", listener.name.clone()) + .with_attribute("selection.agent_count", selected_agents.len().to_string()) + .with_attribute( + "selection.agents", + selected_agents + .iter() + .map(|a| a.id.as_str()) + .collect::>() + .join(","), + ) + .with_attribute( + "duration_ms", + format!("{:.2}", selection_elapsed.as_secs_f64() * 1000.0), + ), + ); if !trace_id.is_empty() { selection_span_builder = selection_span_builder.with_trace_id(trace_id.clone()); @@ -343,22 +379,24 @@ async fn handle_agent_chat( .with_target(&agent_name) .build(); - let mut span_builder = SpanBuilder::new(&operation_name) - .with_span_id(span_id) - .with_kind(SpanKind::Internal) - .with_start_time(agent_start_time) - .with_end_time(agent_end_time) - .with_attribute(http::METHOD, "POST") - .with_attribute(http::TARGET, full_path) - .with_attribute("agent.name", agent_name.clone()) - .with_attribute( - "agent.sequence", - format!("{}/{}", agent_index + 1, agent_count), - ) - .with_attribute( - "duration_ms", - format!("{:.2}", agent_elapsed.as_secs_f64() * 1000.0), - ); + let mut span_builder = apply_custom_attrs( + SpanBuilder::new(&operation_name) + .with_span_id(span_id) + .with_kind(SpanKind::Internal) + .with_start_time(agent_start_time) + .with_end_time(agent_end_time) + .with_attribute(http::METHOD, "POST") + .with_attribute(http::TARGET, full_path) + .with_attribute("agent.name", agent_name.clone()) + .with_attribute( + "agent.sequence", + format!("{}/{}", agent_index + 1, agent_count), + ) + .with_attribute( + "duration_ms", + format!("{:.2}", agent_elapsed.as_secs_f64() * 1000.0), + ), + ); if !trace_id.is_empty() { span_builder = span_builder.with_trace_id(trace_id.clone()); diff --git a/crates/brightstaff/src/handlers/llm.rs b/crates/brightstaff/src/handlers/llm.rs index e1fe5a93e..c86d6ded1 100644 --- a/crates/brightstaff/src/handlers/llm.rs +++ b/crates/brightstaff/src/handlers/llm.rs @@ -1,5 +1,5 @@ use bytes::Bytes; -use common::configuration::ModelAlias; +use common::configuration::{ModelAlias, SpanAttributes}; use common::consts::{ ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER, REQUEST_ID_HEADER, TRACE_PARENT_HEADER, }; @@ -26,7 +26,7 @@ use crate::state::response_state_processor::ResponsesStateProcessor; use crate::state::{ extract_input_items, retrieve_and_combine_input, StateStorage, StateStorageError, }; -use crate::tracing::operation_component; +use crate::tracing::{extract_custom_trace_attributes, operation_component}; fn full>(chunk: T) -> BoxBody { Full::new(chunk.into()) @@ -34,6 +34,7 @@ fn full>(chunk: T) -> BoxBody { .boxed() } +#[allow(clippy::too_many_arguments)] pub async fn llm_chat( request: Request, router_service: Arc, @@ -41,10 +42,31 @@ pub async fn llm_chat( model_aliases: Arc>>, llm_providers: Arc>, trace_collector: Arc, + span_attributes: Arc>, state_storage: Option>, ) -> Result>, hyper::Error> { let request_path = request.uri().path().to_string(); let request_headers = request.headers().clone(); + let mut header_prefixes: Option<&[String]> = None; + let mut static_attributes: Option<&HashMap> = None; + if let Some(attrs) = span_attributes.as_ref() { + header_prefixes = attrs.header_prefixes.as_deref(); + static_attributes = attrs.static_attributes.as_ref(); + } + let mut custom_attrs = HashMap::new(); + if let Some(static_attributes) = static_attributes { + for (key, value) in static_attributes { + custom_attrs.insert(key.clone(), value.clone()); + } + } + if let Some(prefixes) = header_prefixes { + if !prefixes.is_empty() { + custom_attrs.extend(extract_custom_trace_attributes( + &request_headers, + Some(prefixes), + )); + } + } let request_id: String = match request_headers .get(REQUEST_ID_HEADER) .and_then(|h| h.to_str().ok()) @@ -253,6 +275,7 @@ pub async fn llm_chat( &traceparent, &request_path, &request_id, + &custom_attrs, ) .await { @@ -337,6 +360,7 @@ pub async fn llm_chat( user_message_preview, temperature, &llm_providers, + &custom_attrs, ) .await; @@ -423,6 +447,7 @@ async fn build_llm_span( user_message_preview: Option, temperature: Option, llm_providers: &Arc>, + custom_attrs: &HashMap, ) -> common::traces::Span { use crate::tracing::{http, llm, OperationNameBuilder}; use common::traces::{parse_traceparent, SpanBuilder, SpanKind}; @@ -488,6 +513,10 @@ async fn build_llm_span( span_builder = span_builder.with_attribute(llm::USER_MESSAGE_PREVIEW, preview); } + for (key, value) in custom_attrs { + span_builder = span_builder.with_attribute(key, value); + } + span_builder.build() } diff --git a/crates/brightstaff/src/handlers/router_chat.rs b/crates/brightstaff/src/handlers/router_chat.rs index c3a517e0f..29cd0f87b 100644 --- a/crates/brightstaff/src/handlers/router_chat.rs +++ b/crates/brightstaff/src/handlers/router_chat.rs @@ -40,6 +40,7 @@ pub async fn router_chat_get_upstream_model( traceparent: &str, request_path: &str, request_id: &str, + custom_attrs: &HashMap, ) -> Result { // Clone metadata for routing before converting (which consumes client_request) let routing_metadata = client_request.metadata().clone(); @@ -139,6 +140,9 @@ pub async fn router_chat_get_upstream_model( // Record successful routing span let mut attrs: HashMap = HashMap::new(); attrs.insert("route.selected_model".to_string(), model_name.clone()); + for (key, value) in custom_attrs { + attrs.entry(key.clone()).or_insert_with(|| value.clone()); + } record_routing_span( trace_collector, traceparent, @@ -160,6 +164,9 @@ pub async fn router_chat_get_upstream_model( let mut attrs = HashMap::new(); attrs.insert("route.selected_model".to_string(), "none".to_string()); + for (key, value) in custom_attrs { + attrs.entry(key.clone()).or_insert_with(|| value.clone()); + } record_routing_span( trace_collector, traceparent, @@ -179,6 +186,9 @@ pub async fn router_chat_get_upstream_model( let mut attrs = HashMap::new(); attrs.insert("route.selected_model".to_string(), "unknown".to_string()); attrs.insert("error.message".to_string(), err.to_string()); + for (key, value) in custom_attrs { + attrs.entry(key.clone()).or_insert_with(|| value.clone()); + } record_routing_span( trace_collector, traceparent, diff --git a/crates/brightstaff/src/main.rs b/crates/brightstaff/src/main.rs index b8fa8832a..3da383177 100644 --- a/crates/brightstaff/src/main.rs +++ b/crates/brightstaff/src/main.rs @@ -112,6 +112,12 @@ async fn main() -> Result<(), Box> { )); let model_aliases = Arc::new(arch_config.model_aliases.clone()); + let span_attributes = Arc::new( + arch_config + .tracing + .as_ref() + .and_then(|tracing| tracing.span_attributes.clone()), + ); // Initialize trace collector and start background flusher // Tracing is enabled if the tracing config is present in arch_config.yaml @@ -176,6 +182,7 @@ async fn main() -> Result<(), Box> { let agents_list = combined_agents_filters_list.clone(); let listeners = listeners.clone(); let trace_collector = trace_collector.clone(); + let span_attributes = span_attributes.clone(); let state_storage = state_storage.clone(); let service = service_fn(move |req| { let router_service = Arc::clone(&router_service); @@ -187,6 +194,7 @@ async fn main() -> Result<(), Box> { let agents_list = agents_list.clone(); let listeners = listeners.clone(); let trace_collector = trace_collector.clone(); + let span_attributes = span_attributes.clone(); let state_storage = state_storage.clone(); async move { @@ -207,6 +215,7 @@ async fn main() -> Result<(), Box> { agents_list, listeners, trace_collector, + span_attributes, ) .with_context(parent_cx) .await; @@ -225,6 +234,7 @@ async fn main() -> Result<(), Box> { model_aliases, llm_providers, trace_collector, + span_attributes, state_storage, ) .with_context(parent_cx) diff --git a/crates/brightstaff/src/tracing/custom_attributes.rs b/crates/brightstaff/src/tracing/custom_attributes.rs new file mode 100644 index 000000000..3aa964307 --- /dev/null +++ b/crates/brightstaff/src/tracing/custom_attributes.rs @@ -0,0 +1,94 @@ +use std::collections::HashMap; + +use hyper::header::HeaderMap; + +pub fn extract_custom_trace_attributes( + headers: &HeaderMap, + span_attribute_header_prefixes: Option<&[String]>, +) -> HashMap { + let mut attributes = HashMap::new(); + let Some(span_attribute_header_prefixes) = span_attribute_header_prefixes else { + return attributes; + }; + if span_attribute_header_prefixes.is_empty() { + return attributes; + } + + for (name, value) in headers.iter() { + let header_name = name.as_str(); + let mut matched_prefix: Option<&str> = None; + for prefix in span_attribute_header_prefixes { + if header_name.starts_with(prefix) { + matched_prefix = Some(prefix.as_str()); + break; + } + } + let Some(prefix) = matched_prefix else { + continue; + }; + + let raw_value = match value.to_str().ok() { + Some(value) => value.trim(), + None => continue, + }; + + let suffix = header_name.strip_prefix(prefix).unwrap_or(""); + let suffix_key = suffix.trim_start_matches('-').replace('-', "."); + if suffix_key.is_empty() { + continue; + } + + attributes.insert(suffix_key, raw_value.to_string()); + } + + attributes +} + +#[cfg(test)] +mod tests { + use super::extract_custom_trace_attributes; + use hyper::header::{HeaderMap, HeaderValue}; + + #[test] + fn extracts_headers_by_prefix() { + let mut headers = HeaderMap::new(); + headers.insert("x-katanemo-tenant-id", HeaderValue::from_static("ten_456")); + headers.insert("x-katanemo-user-id", HeaderValue::from_static("usr_789")); + headers.insert("x-katanemo-admin-level", HeaderValue::from_static("3")); + headers.insert("x-other-id", HeaderValue::from_static("ignored")); + + let prefixes = vec!["x-katanemo-".to_string()]; + let attrs = extract_custom_trace_attributes(&headers, Some(&prefixes)); + + assert_eq!(attrs.get("tenant.id"), Some(&"ten_456".to_string())); + assert_eq!(attrs.get("user.id"), Some(&"usr_789".to_string())); + assert_eq!(attrs.get("admin.level"), Some(&"3".to_string())); + assert!(!attrs.contains_key("other.id")); + } + + #[test] + fn returns_empty_when_prefixes_missing_or_empty() { + let mut headers = HeaderMap::new(); + headers.insert("x-katanemo-tenant-id", HeaderValue::from_static("ten_456")); + + let attrs_none = extract_custom_trace_attributes(&headers, None); + assert!(attrs_none.is_empty()); + + let empty_prefixes: Vec = Vec::new(); + let attrs_empty = extract_custom_trace_attributes(&headers, Some(&empty_prefixes)); + assert!(attrs_empty.is_empty()); + } + + #[test] + fn supports_multiple_prefixes() { + let mut headers = HeaderMap::new(); + headers.insert("x-katanemo-tenant-id", HeaderValue::from_static("ten_456")); + headers.insert("x-tenant-user-id", HeaderValue::from_static("usr_789")); + + let prefixes = vec!["x-katanemo-".to_string(), "x-tenant-".to_string()]; + let attrs = extract_custom_trace_attributes(&headers, Some(&prefixes)); + + assert_eq!(attrs.get("tenant.id"), Some(&"ten_456".to_string())); + assert_eq!(attrs.get("user.id"), Some(&"usr_789".to_string())); + } +} diff --git a/crates/brightstaff/src/tracing/mod.rs b/crates/brightstaff/src/tracing/mod.rs index 09ec6f2a1..e3834e2bb 100644 --- a/crates/brightstaff/src/tracing/mod.rs +++ b/crates/brightstaff/src/tracing/mod.rs @@ -1,5 +1,7 @@ mod constants; +mod custom_attributes; pub use constants::{ error, http, llm, operation_component, routing, signals, OperationNameBuilder, }; +pub use custom_attributes::extract_custom_trace_attributes; diff --git a/crates/common/src/configuration.rs b/crates/common/src/configuration.rs index c600ed5dc..9ac013b0d 100644 --- a/crates/common/src/configuration.rs +++ b/crates/common/src/configuration.rs @@ -90,6 +90,14 @@ pub struct Overrides { pub struct Tracing { pub sampling_rate: Option, pub trace_arch_internal: Option, + pub span_attributes: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct SpanAttributes { + pub header_prefixes: Option>, + #[serde(rename = "static")] + pub static_attributes: Option>, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)] diff --git a/demos/use_cases/travel_agents/config.yaml b/demos/use_cases/travel_agents/config.yaml index 2cb24d712..ce9b8f449 100644 --- a/demos/use_cases/travel_agents/config.yaml +++ b/demos/use_cases/travel_agents/config.yaml @@ -55,3 +55,6 @@ listeners: tracing: random_sampling: 100 + span_attributes: + header_prefixes: + - x-katanemo- diff --git a/demos/use_cases/travel_agents/test.rest b/demos/use_cases/travel_agents/test.rest index f3ecaf66c..7d7c5759e 100644 --- a/demos/use_cases/travel_agents/test.rest +++ b/demos/use_cases/travel_agents/test.rest @@ -3,9 +3,15 @@ ### Travel Agent Chat Completion Request POST {{llm_endpoint}}/v1/chat/completions HTTP/1.1 Content-Type: application/json +X-Katanemo-Workspace-Id: ws_7e2c5d91b4224f59b0e6a4e0125c21b3 +X-Katanemo-Tenant-Id: ten_4102a8c7fa6542b084b395d2df184a9a +X-Katanemo-User-Id: usr_19df7e6751b846f9ba026776e3c12abe +X-Katanemo-Admin-Level: 3 +X-Katanemo-Is-Internal: true +X-Katanemo-Budget: 42.5 { - "model": "gpt-4o", + "model": "gpt-5.2", "messages": [ { "role": "user", @@ -20,7 +26,28 @@ Content-Type: application/json "content": "What is one Alaska flight that goes direct to Atlanta from Seattle?" } ], - "max_tokens": 1000, + "max_completion_tokens": 1000, + "stream": false, + "temperature": 1.0 +} + + +### Travel Agent Request (prefix mismatch - ignored) +POST {{llm_endpoint}}/v1/chat/completions HTTP/1.1 +Content-Type: application/json +X-Other-Workspace-Id: ws_7e2c5d91b4224f59b0e6a4e0125c21b3 +X-Other-Tenant-Id: ten_4102a8c7fa6542b084b395d2df184a9a +X-Other-User-Id: usr_19df7e6751b846f9ba026776e3c12abe + +{ + "model": "gpt-5.2", + "messages": [ + { + "role": "user", + "content": "What's the weather in Seattle?" + } + ], + "max_completion_tokens": 1000, "stream": false, "temperature": 1.0 } diff --git a/docs/source/guides/observability/tracing.rst b/docs/source/guides/observability/tracing.rst index aab3b0694..b0b615703 100644 --- a/docs/source/guides/observability/tracing.rst +++ b/docs/source/guides/observability/tracing.rst @@ -142,6 +142,109 @@ In your observability platform (Jaeger, Grafana Tempo, Datadog, etc.), filter tr For complete details on all available signals, detection methods, and best practices, see the :doc:`../../concepts/signals` guide. +Custom Span Attributes +------------------------------------------- + +Plano can automatically attach **custom span attributes** derived from request headers and **static** attributes +defined in configuration. This lets you stamp +traces with identifiers like workspace, tenant, or user IDs without changing application code or adding +custom instrumentation. + +**Why This Is Useful** + +- **Tenant-aware debugging**: Filter traces by ``workspace.id`` or ``tenant.id``. +- **Customer-specific visibility**: Attribute performance or errors to a specific customer. +- **Low overhead**: No code changes in agents or clients—just headers. + +How It Works +~~~~~~~~~~~~ + +You configure one or more header prefixes. Any incoming HTTP header whose name starts with one of these +prefixes is captured as a span attribute. You can also provide static attributes that are always injected. + +- The **prefix is only for matching**, not the resulting attribute key. +- The attribute key is the header name **with the prefix removed**, then hyphens converted to dots. + +.. note:: + + Custom span attributes are attached to LLM spans when handling ``/v1/...`` requests via ``llm_chat``. For orchestrator requests to ``/agents/...``, + these attributes are added to both the orchestrator selection span and to each agent span created by ``agent_chat``. + +**Example** + +Configured prefix:: + + tracing: + span_attributes: + header_prefixes: + - x-katanemo- + +Incoming headers:: + + X-Katanemo-Workspace-Id: ws_123 + X-Katanemo-Tenant-Id: ten_456 + +Resulting span attributes:: + + workspace.id = "ws_123" + tenant.id = "ten_456" + +Configuration +~~~~~~~~~~~~~ + +Add the prefix list under ``tracing`` in your config: + +.. code-block:: yaml + + tracing: + random_sampling: 100 + span_attributes: + header_prefixes: + - x-katanemo- + static: + environment: production + service.version: "1.0.0" + +Static attributes are always injected alongside any header-derived attributes. If a header-derived +attribute key matches a static key, the header value overrides the static value. + +You can provide multiple prefixes: + +.. code-block:: yaml + + tracing: + span_attributes: + header_prefixes: + - x-katanemo- + - x-tenant- + static: + environment: production + service.version: "1.0.0" + +Notes and Examples +~~~~~~~~~~~~~~~~~~ + +- **Prefix must match exactly**: ``katanemo-`` does not match ``x-katanemo-`` headers. +- **Trailing dash is recommended**: Without it, ``x-katanemo`` would also match ``x-katanemo-foo`` and + ``x-katanemofoo``. +- **Keys are always strings**: Values are captured as string attributes. + +**Prefix mismatch example** + +Config:: + + tracing: + span_attributes: + header_prefixes: + - x-katanemo- + +Request headers:: + + X-Other-User-Id: usr_999 + +Result: no attributes are captured from ``X-Other-User-Id``. + + Benefits of Using ``Traceparent`` Headers -----------------------------------------