-
Notifications
You must be signed in to change notification settings - Fork 284
add Custom Trace Attributes to extend observability #708
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
f61d720
40c959e
4b70658
0e95212
d015b40
21cf046
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,9 @@ | ||
| use std::collections::HashMap; | ||
| use std::sync::Arc; | ||
| use std::time::{Instant, SystemTime}; | ||
|
|
||
| use bytes::Bytes; | ||
| use common::configuration::SpanAttributes; | ||
| use common::consts::TRACE_PARENT_HEADER; | ||
| use common::traces::{generate_random_span_id, parse_traceparent, SpanBuilder, SpanKind}; | ||
| use hermesllm::apis::OpenAIMessage; | ||
|
|
@@ -18,7 +20,9 @@ use super::agent_selector::{AgentSelectionError, AgentSelector}; | |
| use super::pipeline_processor::{PipelineError, PipelineProcessor}; | ||
| use super::response_handler::ResponseHandler; | ||
| use crate::router::plano_orchestrator::OrchestratorService; | ||
| use crate::tracing::{http, operation_component, OperationNameBuilder}; | ||
| use crate::tracing::{ | ||
| extract_custom_trace_attributes, http, operation_component, OperationNameBuilder, | ||
| }; | ||
|
|
||
| /// Main errors for agent chat completions | ||
| #[derive(Debug, thiserror::Error)] | ||
|
|
@@ -42,13 +46,15 @@ pub async fn agent_chat( | |
| agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>, | ||
| listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>, | ||
| trace_collector: Arc<common::traces::TraceCollector>, | ||
| span_attributes: Arc<Option<SpanAttributes>>, | ||
| ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> { | ||
| match handle_agent_chat( | ||
| request, | ||
| orchestrator_service, | ||
| agents_list, | ||
| listeners, | ||
| trace_collector, | ||
| span_attributes, | ||
| ) | ||
| .await | ||
| { | ||
|
|
@@ -127,6 +133,7 @@ async fn handle_agent_chat( | |
| agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>, | ||
| listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>, | ||
| trace_collector: Arc<common::traces::TraceCollector>, | ||
| span_attributes: Arc<Option<SpanAttributes>>, | ||
| ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, AgentFilterChainError> { | ||
| // Initialize services | ||
| let agent_selector = AgentSelector::new(orchestrator_service); | ||
|
|
@@ -176,6 +183,26 @@ async fn handle_agent_chat( | |
|
|
||
| headers | ||
| }; | ||
| let mut header_prefixes: Option<&[String]> = None; | ||
| let mut static_attributes: Option<&HashMap<String, String>> = None; | ||
| if let Some(attrs) = span_attributes.as_ref() { | ||
| header_prefixes = attrs.header_prefixes.as_deref(); | ||
| static_attributes = attrs.static_attributes.as_ref(); | ||
| } | ||
| let mut custom_attrs = HashMap::new(); | ||
| if let Some(static_attributes) = static_attributes { | ||
| for (key, value) in static_attributes { | ||
| custom_attrs.insert(key.clone(), value.clone()); | ||
| } | ||
| } | ||
| if let Some(prefixes) = header_prefixes { | ||
| if !prefixes.is_empty() { | ||
| custom_attrs.extend(extract_custom_trace_attributes( | ||
| &request_headers, | ||
| Some(prefixes), | ||
| )); | ||
| } | ||
| } | ||
|
|
||
| let chat_request_bytes = request.collect().await?.to_bytes(); | ||
|
|
||
|
|
@@ -230,6 +257,13 @@ async fn handle_agent_chat( | |
| (String::new(), None) | ||
| }; | ||
|
|
||
| let apply_custom_attrs = |mut builder: SpanBuilder| { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think you want a lambda function in the file. Feels a bit misplaced. I think we should ask ourselves, whats the canonical way to append attributes after a span has been built. A simple for loop feels okay. Although I might move some of that functionality in commons |
||
| for (key, value) in &custom_attrs { | ||
| builder = builder.with_attribute(key, value); | ||
| } | ||
| builder | ||
| }; | ||
|
|
||
| // Select appropriate agents using arch orchestrator llm model | ||
| let selection_span_id = generate_random_span_id(); | ||
| let selection_start_time = SystemTime::now(); | ||
|
|
@@ -248,27 +282,29 @@ async fn handle_agent_chat( | |
| .with_target(&listener.name) | ||
| .build(); | ||
|
|
||
| let mut selection_span_builder = SpanBuilder::new(&selection_operation_name) | ||
| .with_span_id(selection_span_id) | ||
| .with_kind(SpanKind::Internal) | ||
| .with_start_time(selection_start_time) | ||
| .with_end_time(selection_end_time) | ||
| .with_attribute(http::METHOD, "POST") | ||
| .with_attribute(http::TARGET, "/agents/select") | ||
| .with_attribute("selection.listener", listener.name.clone()) | ||
| .with_attribute("selection.agent_count", selected_agents.len().to_string()) | ||
| .with_attribute( | ||
| "selection.agents", | ||
| selected_agents | ||
| .iter() | ||
| .map(|a| a.id.as_str()) | ||
| .collect::<Vec<_>>() | ||
| .join(","), | ||
| ) | ||
| .with_attribute( | ||
| "duration_ms", | ||
| format!("{:.2}", selection_elapsed.as_secs_f64() * 1000.0), | ||
| ); | ||
| let mut selection_span_builder = apply_custom_attrs( | ||
| SpanBuilder::new(&selection_operation_name) | ||
| .with_span_id(selection_span_id) | ||
| .with_kind(SpanKind::Internal) | ||
| .with_start_time(selection_start_time) | ||
| .with_end_time(selection_end_time) | ||
| .with_attribute(http::METHOD, "POST") | ||
| .with_attribute(http::TARGET, "/agents/select") | ||
| .with_attribute("selection.listener", listener.name.clone()) | ||
| .with_attribute("selection.agent_count", selected_agents.len().to_string()) | ||
| .with_attribute( | ||
| "selection.agents", | ||
| selected_agents | ||
| .iter() | ||
| .map(|a| a.id.as_str()) | ||
| .collect::<Vec<_>>() | ||
| .join(","), | ||
| ) | ||
| .with_attribute( | ||
| "duration_ms", | ||
| format!("{:.2}", selection_elapsed.as_secs_f64() * 1000.0), | ||
| ), | ||
| ); | ||
|
|
||
| if !trace_id.is_empty() { | ||
| selection_span_builder = selection_span_builder.with_trace_id(trace_id.clone()); | ||
|
|
@@ -343,22 +379,24 @@ async fn handle_agent_chat( | |
| .with_target(&agent_name) | ||
| .build(); | ||
|
|
||
| let mut span_builder = SpanBuilder::new(&operation_name) | ||
| .with_span_id(span_id) | ||
| .with_kind(SpanKind::Internal) | ||
| .with_start_time(agent_start_time) | ||
| .with_end_time(agent_end_time) | ||
| .with_attribute(http::METHOD, "POST") | ||
| .with_attribute(http::TARGET, full_path) | ||
| .with_attribute("agent.name", agent_name.clone()) | ||
| .with_attribute( | ||
| "agent.sequence", | ||
| format!("{}/{}", agent_index + 1, agent_count), | ||
| ) | ||
| .with_attribute( | ||
| "duration_ms", | ||
| format!("{:.2}", agent_elapsed.as_secs_f64() * 1000.0), | ||
| ); | ||
| let mut span_builder = apply_custom_attrs( | ||
| SpanBuilder::new(&operation_name) | ||
| .with_span_id(span_id) | ||
| .with_kind(SpanKind::Internal) | ||
| .with_start_time(agent_start_time) | ||
| .with_end_time(agent_end_time) | ||
| .with_attribute(http::METHOD, "POST") | ||
| .with_attribute(http::TARGET, full_path) | ||
| .with_attribute("agent.name", agent_name.clone()) | ||
| .with_attribute( | ||
| "agent.sequence", | ||
| format!("{}/{}", agent_index + 1, agent_count), | ||
| ) | ||
| .with_attribute( | ||
| "duration_ms", | ||
| format!("{:.2}", agent_elapsed.as_secs_f64() * 1000.0), | ||
| ), | ||
| ); | ||
|
|
||
| if !trace_id.is_empty() { | ||
| span_builder = span_builder.with_trace_id(trace_id.clone()); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,5 @@ | ||
| use bytes::Bytes; | ||
| use common::configuration::ModelAlias; | ||
| use common::configuration::{ModelAlias, SpanAttributes}; | ||
| use common::consts::{ | ||
| ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER, REQUEST_ID_HEADER, TRACE_PARENT_HEADER, | ||
| }; | ||
|
|
@@ -26,25 +26,47 @@ use crate::state::response_state_processor::ResponsesStateProcessor; | |
| use crate::state::{ | ||
| extract_input_items, retrieve_and_combine_input, StateStorage, StateStorageError, | ||
| }; | ||
| use crate::tracing::operation_component; | ||
| use crate::tracing::{extract_custom_trace_attributes, operation_component}; | ||
|
|
||
| fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> { | ||
| Full::new(chunk.into()) | ||
| .map_err(|never| match never {}) | ||
| .boxed() | ||
| } | ||
|
|
||
| #[allow(clippy::too_many_arguments)] | ||
| pub async fn llm_chat( | ||
| request: Request<hyper::body::Incoming>, | ||
| router_service: Arc<RouterService>, | ||
| full_qualified_llm_provider_url: String, | ||
| model_aliases: Arc<Option<HashMap<String, ModelAlias>>>, | ||
| llm_providers: Arc<RwLock<LlmProviders>>, | ||
| trace_collector: Arc<TraceCollector>, | ||
| span_attributes: Arc<Option<SpanAttributes>>, | ||
| state_storage: Option<Arc<dyn StateStorage>>, | ||
| ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> { | ||
| let request_path = request.uri().path().to_string(); | ||
| let request_headers = request.headers().clone(); | ||
| let mut header_prefixes: Option<&[String]> = None; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can merge this into a single utility that both llm.rs and agent_chat_completions.rs work off of. Else its duplicate code and will be maintained in two places. |
||
| let mut static_attributes: Option<&HashMap<String, String>> = None; | ||
| if let Some(attrs) = span_attributes.as_ref() { | ||
| header_prefixes = attrs.header_prefixes.as_deref(); | ||
| static_attributes = attrs.static_attributes.as_ref(); | ||
| } | ||
| let mut custom_attrs = HashMap::new(); | ||
| if let Some(static_attributes) = static_attributes { | ||
| for (key, value) in static_attributes { | ||
| custom_attrs.insert(key.clone(), value.clone()); | ||
| } | ||
| } | ||
| if let Some(prefixes) = header_prefixes { | ||
| if !prefixes.is_empty() { | ||
| custom_attrs.extend(extract_custom_trace_attributes( | ||
| &request_headers, | ||
| Some(prefixes), | ||
| )); | ||
| } | ||
| } | ||
| let request_id: String = match request_headers | ||
| .get(REQUEST_ID_HEADER) | ||
| .and_then(|h| h.to_str().ok()) | ||
|
|
@@ -253,6 +275,7 @@ pub async fn llm_chat( | |
| &traceparent, | ||
| &request_path, | ||
| &request_id, | ||
| &custom_attrs, | ||
| ) | ||
| .await | ||
| { | ||
|
|
@@ -337,6 +360,7 @@ pub async fn llm_chat( | |
| user_message_preview, | ||
| temperature, | ||
| &llm_providers, | ||
| &custom_attrs, | ||
| ) | ||
| .await; | ||
|
|
||
|
|
@@ -423,6 +447,7 @@ async fn build_llm_span( | |
| user_message_preview: Option<String>, | ||
| temperature: Option<f32>, | ||
| llm_providers: &Arc<RwLock<LlmProviders>>, | ||
| custom_attrs: &HashMap<String, String>, | ||
| ) -> common::traces::Span { | ||
| use crate::tracing::{http, llm, OperationNameBuilder}; | ||
| use common::traces::{parse_traceparent, SpanBuilder, SpanKind}; | ||
|
|
@@ -488,6 +513,10 @@ async fn build_llm_span( | |
| span_builder = span_builder.with_attribute(llm::USER_MESSAGE_PREVIEW, preview); | ||
| } | ||
|
|
||
| for (key, value) in custom_attrs { | ||
| span_builder = span_builder.with_attribute(key, value); | ||
| } | ||
|
|
||
| span_builder.build() | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we still need
additionalProperties: falsehere