Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions config/arch_config_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,18 @@ properties:
type: integer
trace_arch_internal:
type: boolean
span_attributes:
type: object
properties:
header_prefixes:
type: array
items:
type: string
static:
type: object
additionalProperties:
type: string
additionalProperties: false
additionalProperties: false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still need `additionalProperties: false` here.

mode:
type: string
Expand Down
114 changes: 76 additions & 38 deletions crates/brightstaff/src/handlers/agent_chat_completions.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Instant, SystemTime};

use bytes::Bytes;
use common::configuration::SpanAttributes;
use common::consts::TRACE_PARENT_HEADER;
use common::traces::{generate_random_span_id, parse_traceparent, SpanBuilder, SpanKind};
use hermesllm::apis::OpenAIMessage;
Expand All @@ -18,7 +20,9 @@ use super::agent_selector::{AgentSelectionError, AgentSelector};
use super::pipeline_processor::{PipelineError, PipelineProcessor};
use super::response_handler::ResponseHandler;
use crate::router::plano_orchestrator::OrchestratorService;
use crate::tracing::{http, operation_component, OperationNameBuilder};
use crate::tracing::{
extract_custom_trace_attributes, http, operation_component, OperationNameBuilder,
};

/// Main errors for agent chat completions
#[derive(Debug, thiserror::Error)]
Expand All @@ -42,13 +46,15 @@ pub async fn agent_chat(
agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>,
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
trace_collector: Arc<common::traces::TraceCollector>,
span_attributes: Arc<Option<SpanAttributes>>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
match handle_agent_chat(
request,
orchestrator_service,
agents_list,
listeners,
trace_collector,
span_attributes,
)
.await
{
Expand Down Expand Up @@ -127,6 +133,7 @@ async fn handle_agent_chat(
agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>,
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
trace_collector: Arc<common::traces::TraceCollector>,
span_attributes: Arc<Option<SpanAttributes>>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, AgentFilterChainError> {
// Initialize services
let agent_selector = AgentSelector::new(orchestrator_service);
Expand Down Expand Up @@ -176,6 +183,26 @@ async fn handle_agent_chat(

headers
};
let mut header_prefixes: Option<&[String]> = None;
let mut static_attributes: Option<&HashMap<String, String>> = None;
if let Some(attrs) = span_attributes.as_ref() {
header_prefixes = attrs.header_prefixes.as_deref();
static_attributes = attrs.static_attributes.as_ref();
}
let mut custom_attrs = HashMap::new();
if let Some(static_attributes) = static_attributes {
for (key, value) in static_attributes {
custom_attrs.insert(key.clone(), value.clone());
}
}
if let Some(prefixes) = header_prefixes {
if !prefixes.is_empty() {
custom_attrs.extend(extract_custom_trace_attributes(
&request_headers,
Some(prefixes),
));
}
}

let chat_request_bytes = request.collect().await?.to_bytes();

Expand Down Expand Up @@ -230,6 +257,13 @@ async fn handle_agent_chat(
(String::new(), None)
};

let apply_custom_attrs = |mut builder: SpanBuilder| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you want a lambda function in the file; it feels a bit misplaced. I think we should ask ourselves: what's the canonical way to append attributes after a span has been built? A simple for loop feels okay, although I might move some of that functionality into commons.

for (key, value) in &custom_attrs {
builder = builder.with_attribute(key, value);
}
builder
};

// Select appropriate agents using arch orchestrator llm model
let selection_span_id = generate_random_span_id();
let selection_start_time = SystemTime::now();
Expand All @@ -248,27 +282,29 @@ async fn handle_agent_chat(
.with_target(&listener.name)
.build();

let mut selection_span_builder = SpanBuilder::new(&selection_operation_name)
.with_span_id(selection_span_id)
.with_kind(SpanKind::Internal)
.with_start_time(selection_start_time)
.with_end_time(selection_end_time)
.with_attribute(http::METHOD, "POST")
.with_attribute(http::TARGET, "/agents/select")
.with_attribute("selection.listener", listener.name.clone())
.with_attribute("selection.agent_count", selected_agents.len().to_string())
.with_attribute(
"selection.agents",
selected_agents
.iter()
.map(|a| a.id.as_str())
.collect::<Vec<_>>()
.join(","),
)
.with_attribute(
"duration_ms",
format!("{:.2}", selection_elapsed.as_secs_f64() * 1000.0),
);
let mut selection_span_builder = apply_custom_attrs(
SpanBuilder::new(&selection_operation_name)
.with_span_id(selection_span_id)
.with_kind(SpanKind::Internal)
.with_start_time(selection_start_time)
.with_end_time(selection_end_time)
.with_attribute(http::METHOD, "POST")
.with_attribute(http::TARGET, "/agents/select")
.with_attribute("selection.listener", listener.name.clone())
.with_attribute("selection.agent_count", selected_agents.len().to_string())
.with_attribute(
"selection.agents",
selected_agents
.iter()
.map(|a| a.id.as_str())
.collect::<Vec<_>>()
.join(","),
)
.with_attribute(
"duration_ms",
format!("{:.2}", selection_elapsed.as_secs_f64() * 1000.0),
),
);

if !trace_id.is_empty() {
selection_span_builder = selection_span_builder.with_trace_id(trace_id.clone());
Expand Down Expand Up @@ -343,22 +379,24 @@ async fn handle_agent_chat(
.with_target(&agent_name)
.build();

let mut span_builder = SpanBuilder::new(&operation_name)
.with_span_id(span_id)
.with_kind(SpanKind::Internal)
.with_start_time(agent_start_time)
.with_end_time(agent_end_time)
.with_attribute(http::METHOD, "POST")
.with_attribute(http::TARGET, full_path)
.with_attribute("agent.name", agent_name.clone())
.with_attribute(
"agent.sequence",
format!("{}/{}", agent_index + 1, agent_count),
)
.with_attribute(
"duration_ms",
format!("{:.2}", agent_elapsed.as_secs_f64() * 1000.0),
);
let mut span_builder = apply_custom_attrs(
SpanBuilder::new(&operation_name)
.with_span_id(span_id)
.with_kind(SpanKind::Internal)
.with_start_time(agent_start_time)
.with_end_time(agent_end_time)
.with_attribute(http::METHOD, "POST")
.with_attribute(http::TARGET, full_path)
.with_attribute("agent.name", agent_name.clone())
.with_attribute(
"agent.sequence",
format!("{}/{}", agent_index + 1, agent_count),
)
.with_attribute(
"duration_ms",
format!("{:.2}", agent_elapsed.as_secs_f64() * 1000.0),
),
);

if !trace_id.is_empty() {
span_builder = span_builder.with_trace_id(trace_id.clone());
Expand Down
33 changes: 31 additions & 2 deletions crates/brightstaff/src/handlers/llm.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bytes::Bytes;
use common::configuration::ModelAlias;
use common::configuration::{ModelAlias, SpanAttributes};
use common::consts::{
ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER, REQUEST_ID_HEADER, TRACE_PARENT_HEADER,
};
Expand All @@ -26,25 +26,47 @@ use crate::state::response_state_processor::ResponsesStateProcessor;
use crate::state::{
extract_input_items, retrieve_and_combine_input, StateStorage, StateStorageError,
};
use crate::tracing::operation_component;
use crate::tracing::{extract_custom_trace_attributes, operation_component};

/// Converts `chunk` into `Bytes` and wraps it in a boxed `Full` body.
///
/// The returned body reports `hyper::Error` as its error type so that it can
/// be used wherever a `BoxBody<Bytes, hyper::Error>` is expected.
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
    let body = Full::new(chunk.into());
    // The empty `match` only type-checks because the body's own error type is
    // uninhabited; the closure exists purely to adapt the error type.
    body.map_err(|never| match never {}).boxed()
}

#[allow(clippy::too_many_arguments)]
pub async fn llm_chat(
request: Request<hyper::body::Incoming>,
router_service: Arc<RouterService>,
full_qualified_llm_provider_url: String,
model_aliases: Arc<Option<HashMap<String, ModelAlias>>>,
llm_providers: Arc<RwLock<LlmProviders>>,
trace_collector: Arc<TraceCollector>,
span_attributes: Arc<Option<SpanAttributes>>,
state_storage: Option<Arc<dyn StateStorage>>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let request_path = request.uri().path().to_string();
let request_headers = request.headers().clone();
let mut header_prefixes: Option<&[String]> = None;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can merge this into a single utility that both llm.rs and agent_chat_completions.rs work off of. Otherwise it's duplicate code that will have to be maintained in two places.

let mut static_attributes: Option<&HashMap<String, String>> = None;
if let Some(attrs) = span_attributes.as_ref() {
header_prefixes = attrs.header_prefixes.as_deref();
static_attributes = attrs.static_attributes.as_ref();
}
let mut custom_attrs = HashMap::new();
if let Some(static_attributes) = static_attributes {
for (key, value) in static_attributes {
custom_attrs.insert(key.clone(), value.clone());
}
}
if let Some(prefixes) = header_prefixes {
if !prefixes.is_empty() {
custom_attrs.extend(extract_custom_trace_attributes(
&request_headers,
Some(prefixes),
));
}
}
let request_id: String = match request_headers
.get(REQUEST_ID_HEADER)
.and_then(|h| h.to_str().ok())
Expand Down Expand Up @@ -253,6 +275,7 @@ pub async fn llm_chat(
&traceparent,
&request_path,
&request_id,
&custom_attrs,
)
.await
{
Expand Down Expand Up @@ -337,6 +360,7 @@ pub async fn llm_chat(
user_message_preview,
temperature,
&llm_providers,
&custom_attrs,
)
.await;

Expand Down Expand Up @@ -423,6 +447,7 @@ async fn build_llm_span(
user_message_preview: Option<String>,
temperature: Option<f32>,
llm_providers: &Arc<RwLock<LlmProviders>>,
custom_attrs: &HashMap<String, String>,
) -> common::traces::Span {
use crate::tracing::{http, llm, OperationNameBuilder};
use common::traces::{parse_traceparent, SpanBuilder, SpanKind};
Expand Down Expand Up @@ -488,6 +513,10 @@ async fn build_llm_span(
span_builder = span_builder.with_attribute(llm::USER_MESSAGE_PREVIEW, preview);
}

for (key, value) in custom_attrs {
span_builder = span_builder.with_attribute(key, value);
}

span_builder.build()
}

Expand Down
10 changes: 10 additions & 0 deletions crates/brightstaff/src/handlers/router_chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub async fn router_chat_get_upstream_model(
traceparent: &str,
request_path: &str,
request_id: &str,
custom_attrs: &HashMap<String, String>,
) -> Result<RoutingResult, RoutingError> {
// Clone metadata for routing before converting (which consumes client_request)
let routing_metadata = client_request.metadata().clone();
Expand Down Expand Up @@ -139,6 +140,9 @@ pub async fn router_chat_get_upstream_model(
// Record successful routing span
let mut attrs: HashMap<String, String> = HashMap::new();
attrs.insert("route.selected_model".to_string(), model_name.clone());
for (key, value) in custom_attrs {
attrs.entry(key.clone()).or_insert_with(|| value.clone());
}
record_routing_span(
trace_collector,
traceparent,
Expand All @@ -160,6 +164,9 @@ pub async fn router_chat_get_upstream_model(

let mut attrs = HashMap::new();
attrs.insert("route.selected_model".to_string(), "none".to_string());
for (key, value) in custom_attrs {
attrs.entry(key.clone()).or_insert_with(|| value.clone());
}
record_routing_span(
trace_collector,
traceparent,
Expand All @@ -179,6 +186,9 @@ pub async fn router_chat_get_upstream_model(
let mut attrs = HashMap::new();
attrs.insert("route.selected_model".to_string(), "unknown".to_string());
attrs.insert("error.message".to_string(), err.to_string());
for (key, value) in custom_attrs {
attrs.entry(key.clone()).or_insert_with(|| value.clone());
}
record_routing_span(
trace_collector,
traceparent,
Expand Down
10 changes: 10 additions & 0 deletions crates/brightstaff/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
));

let model_aliases = Arc::new(arch_config.model_aliases.clone());
let span_attributes = Arc::new(
arch_config
.tracing
.as_ref()
.and_then(|tracing| tracing.span_attributes.clone()),
);

// Initialize trace collector and start background flusher
// Tracing is enabled if the tracing config is present in arch_config.yaml
Expand Down Expand Up @@ -176,6 +182,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let agents_list = combined_agents_filters_list.clone();
let listeners = listeners.clone();
let trace_collector = trace_collector.clone();
let span_attributes = span_attributes.clone();
let state_storage = state_storage.clone();
let service = service_fn(move |req| {
let router_service = Arc::clone(&router_service);
Expand All @@ -187,6 +194,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let agents_list = agents_list.clone();
let listeners = listeners.clone();
let trace_collector = trace_collector.clone();
let span_attributes = span_attributes.clone();
let state_storage = state_storage.clone();

async move {
Expand All @@ -207,6 +215,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
agents_list,
listeners,
trace_collector,
span_attributes,
)
.with_context(parent_cx)
.await;
Expand All @@ -225,6 +234,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
model_aliases,
llm_providers,
trace_collector,
span_attributes,
state_storage,
)
.with_context(parent_cx)
Expand Down
Loading