diff --git a/src/apps/desktop/src/api/app_state.rs b/src/apps/desktop/src/api/app_state.rs index 489f90e4..8beabd7f 100644 --- a/src/apps/desktop/src/api/app_state.rs +++ b/src/apps/desktop/src/api/app_state.rs @@ -1,6 +1,7 @@ //! Application state management use bitfun_core::agentic::{agents, tools}; +use bitfun_core::agentic::side_question::SideQuestionRuntime; use bitfun_core::infrastructure::ai::{AIClient, AIClientFactory}; use bitfun_core::miniapp::{initialize_global_miniapp_manager, JsWorkerPool, MiniAppManager}; use bitfun_core::service::{ai_rules, config, filesystem, mcp, token_usage, workspace}; @@ -30,6 +31,7 @@ pub struct AppStatistics { pub struct AppState { pub ai_client: Arc>>, pub ai_client_factory: Arc, + pub side_question_runtime: Arc, pub tool_registry: Arc>>, pub workspace_service: Arc, pub workspace_identity_watch_service: Arc, @@ -58,6 +60,7 @@ impl AppState { let ai_client_factory = AIClientFactory::get_global().await.map_err(|e| { BitFunError::service(format!("Failed to get global AIClientFactory: {}", e)) })?; + let side_question_runtime = Arc::new(SideQuestionRuntime::new()); let tool_registry = { let registry = tools::registry::get_global_tool_registry(); @@ -139,6 +142,7 @@ impl AppState { let app_state = Self { ai_client, ai_client_factory, + side_question_runtime, tool_registry, workspace_service, workspace_identity_watch_service, diff --git a/src/apps/desktop/src/api/btw_api.rs b/src/apps/desktop/src/api/btw_api.rs new file mode 100644 index 00000000..62b35c2a --- /dev/null +++ b/src/apps/desktop/src/api/btw_api.rs @@ -0,0 +1,226 @@ +//! BTW (side question) API +//! +//! Desktop adapter for the core side-question service: +//! - Reads current session context (no new dialog turn, no persistence writes) +//! - Streams answer via `btw://...` events +//! - Supports cancellation by request id + +use log::{error, info, warn}; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tauri::{AppHandle, Emitter, State}; + +use crate::api::app_state::AppState; + +use bitfun_core::agentic::coordination::ConversationCoordinator; +use bitfun_core::agentic::side_question::{ + SideQuestionService, SideQuestionStreamEvent, SideQuestionStreamRequest, +}; + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct BtwAskRequest { + pub session_id: String, + pub question: String, + /// Optional model id override. Supports "fast"/"primary" aliases. + pub model_id: Option, + /// Limit how many context messages are included (from the end). + pub max_context_messages: Option, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct BtwAskResponse { + pub answer: String, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct BtwAskStreamRequest { + pub request_id: String, + pub session_id: String, + pub question: String, + /// Optional model id override. Supports "fast"/"primary" aliases. + pub model_id: Option, + /// Limit how many context messages are included (from the end). + pub max_context_messages: Option, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct BtwAskStreamResponse { + pub ok: bool, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct BtwCancelRequest { + pub request_id: String, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct BtwTextChunkEvent { + pub request_id: String, + pub session_id: String, + pub text: String, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct BtwCompletedEvent { + pub request_id: String, + pub session_id: String, + pub full_text: String, + pub finish_reason: Option, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct BtwErrorEvent { + pub request_id: String, + pub session_id: String, + pub error: String, +} + +fn side_question_service( + state: &AppState, + coordinator: Arc, +) -> SideQuestionService { + SideQuestionService::new( + coordinator, + state.ai_client_factory.clone(), + state.side_question_runtime.clone(), + ) +} + +#[tauri::command] +pub async fn btw_cancel( + state: State<'_, AppState>, + coordinator: State<'_, Arc>, + request: BtwCancelRequest, +) -> Result<(), String> { + if request.request_id.trim().is_empty() { + return Err("requestId is required".to_string()); + } + + let svc = side_question_service(&state, coordinator.inner().clone()); + svc.cancel(&request.request_id).await; + Ok(()) +} + +#[tauri::command] +pub async fn btw_ask_stream( + app: AppHandle, + state: State<'_, AppState>, + coordinator: State<'_, Arc>, + request: BtwAskStreamRequest, +) -> Result { + if request.request_id.trim().is_empty() { + return Err("requestId is required".to_string()); + } + if request.session_id.trim().is_empty() { + return Err("sessionId is required".to_string()); + } + if request.question.trim().is_empty() { + return Err("question is required".to_string()); + } + + let svc = side_question_service(&state, coordinator.inner().clone()); + + let rx = svc + .start_stream(SideQuestionStreamRequest { + request_id: request.request_id.clone(), + session_id: request.session_id.clone(), + question: request.question.clone(), + model_id: request.model_id.clone(), + max_context_messages: request.max_context_messages, + }) + .await + .map_err(|e| e.to_string())?; + + let app_handle = app.clone(); + tokio::spawn(async move { + let mut rx = rx; + while let Some(evt) = rx.recv().await { + match evt { + SideQuestionStreamEvent::TextChunk { + request_id, + session_id, + text, + } => { + let payload = BtwTextChunkEvent { + request_id, + session_id, + text, + }; + if let Err(e) = app_handle.emit("btw://text-chunk", payload) { + warn!("Failed to emit btw text chunk: {}", e); + } + } + SideQuestionStreamEvent::Completed { + request_id, + session_id, + full_text, + finish_reason, + } => { + let payload = BtwCompletedEvent { + request_id, + session_id, + full_text, + finish_reason, + }; + if let Err(e) = app_handle.emit("btw://completed", payload) { + warn!("Failed to emit btw completed: {}", e); + } + } + SideQuestionStreamEvent::Error { + request_id, + session_id, + error: err, + } => { + let payload = BtwErrorEvent { + request_id, + session_id, + error: err, + }; + if let Err(e) = app_handle.emit("btw://error", payload) { + warn!("Failed to emit btw error: {}", e); + } + } + } + } + }); + + Ok(BtwAskStreamResponse { ok: true }) +} + +#[tauri::command] +pub async fn btw_ask( + state: State<'_, AppState>, + coordinator: State<'_, Arc>, + request: BtwAskRequest, +) -> Result { + let svc = side_question_service(&state, coordinator.inner().clone()); + + let answer = svc + .ask( + &request.session_id, + &request.question, + request.model_id.as_deref(), + request.max_context_messages, + ) + .await + .map_err(|e| { + error!("BTW ask failed: {}", e); + e.to_string() + })?; + + info!( + "BTW ask completed: session_id={}, answer_len={}", + request.session_id, + answer.len() + ); + + Ok(BtwAskResponse { answer }) +} diff --git a/src/apps/desktop/src/api/mod.rs b/src/apps/desktop/src/api/mod.rs index 2932251c..d405234b 100644 --- a/src/apps/desktop/src/api/mod.rs +++ b/src/apps/desktop/src/api/mod.rs @@ -4,6 +4,7 @@ pub mod agentic_api; pub mod ai_memory_api; pub mod ai_rules_api; pub mod app_state; +pub mod btw_api; pub mod clipboard_file_api; pub mod commands; pub mod config_api; diff --git a/src/apps/desktop/src/lib.rs b/src/apps/desktop/src/lib.rs index 54708948..1287380b 100644 --- a/src/apps/desktop/src/lib.rs +++ b/src/apps/desktop/src/lib.rs @@ -306,6 +306,9 @@ pub async fn run() { api::agentic_api::cancel_tool, api::agentic_api::generate_session_title, api::agentic_api::get_available_modes, + api::btw_api::btw_ask, + api::btw_api::btw_ask_stream, + api::btw_api::btw_cancel, api::image_analysis_api::analyze_images, api::image_analysis_api::send_enhanced_message, api::context_upload_api::upload_image_contexts, diff --git a/src/crates/core/src/agentic/mod.rs b/src/crates/core/src/agentic/mod.rs index 2827c1ca..e9513a9f 100644 --- a/src/crates/core/src/agentic/mod.rs +++ b/src/crates/core/src/agentic/mod.rs @@ -22,6 +22,9 @@ pub mod coordination; // Image analysis module pub mod image_analysis; +// Ephemeral side-question module (used by desktop /btw overlay) +pub mod side_question; + // Agents module pub mod agents; pub mod workspace; @@ -36,4 +39,5 @@ pub use execution::*; pub use image_analysis::{ImageAnalyzer, MessageEnhancer}; pub use persistence::PersistenceManager; pub use session::*; +pub use side_question::*; pub use workspace::WorkspaceBinding; diff --git a/src/crates/core/src/agentic/side_question.rs b/src/crates/core/src/agentic/side_question.rs new file mode 100644 index 00000000..449cbe15 --- /dev/null +++ b/src/crates/core/src/agentic/side_question.rs @@ -0,0 +1,379 @@ +//! Side question (ephemeral) service. +//! +//! This is the core implementation behind the desktop `/btw` feature: +//! - Uses existing session context (no new dialog turn, no persistence writes) +//! - Does not execute tools +//! - Supports streaming output and cancellation by request id + +use crate::agentic::coordination::ConversationCoordinator; +use crate::agentic::core::{Message as CoreMessage, MessageContent, MessageRole}; +use crate::infrastructure::ai::AIClientFactory; +use crate::util::errors::{BitFunError, BitFunResult}; +use crate::util::types::message::Message as AIMessage; + +use futures::StreamExt; +use log::{debug, warn}; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::{mpsc, Mutex}; +use tokio_util::sync::CancellationToken; + +#[derive(Debug, Clone)] +pub struct SideQuestionRuntime { + tokens: Arc>>, +} + +impl SideQuestionRuntime { + pub fn new() -> Self { + Self { + tokens: Arc::new(Mutex::new(HashMap::new())), + } + } + + pub async fn register(&self, request_id: String) -> CancellationToken { + let token = CancellationToken::new(); + + let old = { + let mut guard = self.tokens.lock().await; + guard.insert(request_id, token.clone()) + }; + if let Some(old) = old { + old.cancel(); + } + + token + } + + pub async fn cancel(&self, request_id: &str) { + let token = { + let guard = self.tokens.lock().await; + guard.get(request_id).cloned() + }; + if let Some(token) = token { + token.cancel(); + } + } + + pub async fn remove(&self, request_id: &str) { + let mut guard = self.tokens.lock().await; + guard.remove(request_id); + } +} + +#[derive(Clone)] +pub struct SideQuestionService { + coordinator: Arc, + ai_client_factory: Arc, + runtime: Arc, +} + +impl SideQuestionService { + pub fn new( + coordinator: Arc, + ai_client_factory: Arc, + runtime: Arc, + ) -> Self { + Self { + coordinator, + ai_client_factory, + runtime, + } + } + + pub fn runtime(&self) -> &Arc { + &self.runtime + } + + fn core_message_to_transcript_line(msg: &CoreMessage) -> Option { + let role = match msg.role { + MessageRole::User => "User", + MessageRole::Assistant => "Assistant", + MessageRole::Tool => "Tool", + MessageRole::System => "System", + }; + + let content = match &msg.content { + MessageContent::Text(text) => text.trim().to_string(), + MessageContent::Multimodal { text, images } => { + let mut out = text.trim().to_string(); + if !images.is_empty() { + if !out.is_empty() { + out.push('\n'); + } + out.push_str(&format!("[{} image(s) omitted]", images.len())); + } + out + } + MessageContent::ToolResult { + tool_name, + result_for_assistant, + result, + is_error, + .. + } => { + let mut out = String::new(); + out.push_str(&format!( + "Tool result: name={}, is_error={}", + tool_name, is_error + )); + if let Some(text) = result_for_assistant + .as_ref() + .map(|s| s.trim()) + .filter(|s| !s.is_empty()) + { + out.push('\n'); + out.push_str(text); + } else if !result.is_null() { + if let Ok(json) = serde_json::to_string_pretty(result) { + out.push('\n'); + out.push_str(&json); + } + } + out + } + MessageContent::Mixed { text, .. } => text.trim().to_string(), + }; + + let content = content.trim(); + if content.is_empty() { + return None; + } + Some(format!("{}:\n{}", role, content)) + } + + fn build_user_prompt(context: &[CoreMessage], question: &str) -> String { + let mut lines: Vec = Vec::new(); + for msg in context { + if let Some(line) = Self::core_message_to_transcript_line(msg) { + lines.push(line); + } + } + + format!( + "CONTEXT (recent messages):\n\n{}\n\n---\n\nSIDE QUESTION:\n{}\n", + lines.join("\n\n"), + question.trim() + ) + } + + async fn load_context_messages( + &self, + session_id: &str, + max_context_messages: usize, + ) -> BitFunResult> { + let session_manager = self.coordinator.get_session_manager(); + let mut context_messages = session_manager.get_context_messages(session_id).await?; + + if context_messages.len() > max_context_messages { + context_messages = + context_messages.split_off(context_messages.len().saturating_sub(max_context_messages)); + } + + Ok(context_messages) + } + + fn system_prompt() -> &'static str { + "You are answering a side question about the ongoing chat.\n\ +Rules:\n\ +- Use only the information present in the provided CONTEXT.\n\ +- Do not call tools, do not browse, do not assume access to files or runtime.\n\ +- If the context is insufficient, say what is missing.\n\ +- Reply concisely, matching the question's language.\n" + } + + pub async fn ask( + &self, + session_id: &str, + question: &str, + model_id: Option<&str>, + max_context_messages: Option, + ) -> BitFunResult { + if session_id.trim().is_empty() { + return Err(BitFunError::Validation("session_id is required".to_string())); + } + if question.trim().is_empty() { + return Err(BitFunError::Validation("question is required".to_string())); + } + + let max_context_messages = max_context_messages.unwrap_or(60).clamp(10, 200); + let model_id = model_id + .map(str::trim) + .filter(|s| !s.is_empty()) + .unwrap_or("fast"); + + let context_messages = self + .load_context_messages(session_id, max_context_messages) + .await?; + + let user_prompt = Self::build_user_prompt(&context_messages, question); + + let client = self + .ai_client_factory + .get_client_resolved(model_id) + .await + .map_err(|e| BitFunError::service(format!("Failed to create AI client: {}", e)))?; + + let messages = vec![ + AIMessage::system(Self::system_prompt().to_string()), + AIMessage::user(user_prompt), + ]; + + let response = client + .send_message(messages, None) + .await + .map_err(|e| BitFunError::service(format!("AI call failed: {}", e)))?; + + Ok(response.text.trim().to_string()) + } + + pub async fn cancel(&self, request_id: &str) { + self.runtime.cancel(request_id).await + } + + pub async fn start_stream( + &self, + request: SideQuestionStreamRequest, + ) -> BitFunResult> { + if request.request_id.trim().is_empty() { + return Err(BitFunError::Validation("request_id is required".to_string())); + } + if request.session_id.trim().is_empty() { + return Err(BitFunError::Validation("session_id is required".to_string())); + } + if request.question.trim().is_empty() { + return Err(BitFunError::Validation("question is required".to_string())); + } + + let max_context_messages = request.max_context_messages.unwrap_or(60).clamp(10, 200); + let model_id = request + .model_id + .as_deref() + .map(str::trim) + .filter(|s| !s.is_empty()) + .unwrap_or("fast") + .to_string(); + + let context_messages = self + .load_context_messages(&request.session_id, max_context_messages) + .await?; + let user_prompt = Self::build_user_prompt(&context_messages, &request.question); + + let client = self + .ai_client_factory + .get_client_resolved(&model_id) + .await + .map_err(|e| BitFunError::service(format!("Failed to create AI client: {}", e)))?; + + let messages = vec![ + AIMessage::system(Self::system_prompt().to_string()), + AIMessage::user(user_prompt), + ]; + + let cancel_token = self.runtime.register(request.request_id.clone()).await; + + let (tx, rx) = mpsc::unbounded_channel(); + let request_id = request.request_id.clone(); + let session_id = request.session_id.clone(); + let runtime = self.runtime.clone(); + + tokio::spawn(async move { + let mut full_text = String::new(); + let mut last_finish_reason: Option = None; + + let mut stream = match client.send_message_stream(messages, None).await { + Ok(resp) => resp.stream, + Err(e) => { + let _ = tx.send(SideQuestionStreamEvent::Error { + request_id, + session_id, + error: format!("AI call failed: {}", e), + }); + return; + } + }; + + while let Some(chunk_result) = stream.next().await { + if cancel_token.is_cancelled() { + debug!("Side question cancelled: request_id={}", request_id); + break; + } + + match chunk_result { + Ok(chunk) => { + if let Some(reason) = chunk.finish_reason.clone() { + last_finish_reason = Some(reason); + } + if let Some(text) = chunk.text { + if !text.is_empty() { + full_text.push_str(&text); + let _ = tx.send(SideQuestionStreamEvent::TextChunk { + request_id: request_id.clone(), + session_id: session_id.clone(), + text, + }); + } + } + } + Err(e) => { + let _ = tx.send(SideQuestionStreamEvent::Error { + request_id, + session_id, + error: format!("Stream error: {}", e), + }); + return; + } + } + } + + // Cleanup token record. + runtime.remove(&request_id).await; + + if cancel_token.is_cancelled() { + // No completion event on cancellation; caller may have already updated UI state. + return; + } + + if full_text.trim().is_empty() { + warn!("Side question stream completed with empty output: request_id={}", request_id); + } + + let _ = tx.send(SideQuestionStreamEvent::Completed { + request_id, + session_id, + full_text: full_text.trim().to_string(), + finish_reason: last_finish_reason, + }); + }); + + Ok(rx) + } +} + +#[derive(Debug, Clone)] +pub struct SideQuestionStreamRequest { + pub request_id: String, + pub session_id: String, + pub question: String, + pub model_id: Option, + pub max_context_messages: Option, +} + +#[derive(Debug, Clone)] +pub enum SideQuestionStreamEvent { + TextChunk { + request_id: String, + session_id: String, + text: String, + }, + Completed { + request_id: String, + session_id: String, + full_text: String, + finish_reason: Option, + }, + Error { + request_id: String, + session_id: String, + error: String, + }, +} diff --git a/src/web-ui/src/app/components/NavPanel/sections/sessions/SessionsSection.scss b/src/web-ui/src/app/components/NavPanel/sections/sessions/SessionsSection.scss index 80ffade8..3f87f100 100644 --- a/src/web-ui/src/app/components/NavPanel/sections/sessions/SessionsSection.scss +++ b/src/web-ui/src/app/components/NavPanel/sections/sessions/SessionsSection.scss @@ -90,6 +90,29 @@ outline: 1px solid var(--color-accent-500); outline-offset: -1px; } + + &.is-child { + height: 24px; + font-size: 12px; + padding-left: calc(#{$size-gap-1} + 14px); + position: relative; + + &::before { + content: ''; + position: absolute; + left: 10px; + top: 50%; + width: 10px; + height: 1px; + background: var(--border-subtle); + opacity: 0.9; + transform: translateY(-50%); + } + + .bitfun-nav-panel__inline-item-icon { + opacity: 0.55; + } + } } &__inline-item-icon { diff --git a/src/web-ui/src/app/components/NavPanel/sections/sessions/SessionsSection.tsx b/src/web-ui/src/app/components/NavPanel/sections/sessions/SessionsSection.tsx index 6d3c1d22..225732a1 100644 --- a/src/web-ui/src/app/components/NavPanel/sections/sessions/SessionsSection.tsx +++ b/src/web-ui/src/app/components/NavPanel/sections/sessions/SessionsSection.tsx @@ -93,27 +93,61 @@ const SessionsSection: React.FC = ({ [flowChatState.sessions, workspacePath] ); + const { topLevelSessions, childrenByParent } = useMemo(() => { + const childMap = new Map(); + const parents: Session[] = []; + + const knownIds = new Set(sessions.map(s => s.sessionId)); + + for (const s of sessions) { + const pid = s.parentSessionId; + if (pid && typeof pid === 'string' && pid.trim() && knownIds.has(pid)) { + const list = childMap.get(pid) || []; + list.push(s); + childMap.set(pid, list); + } else { + parents.push(s); + } + } + + // Stable ordering: children follow parent, sorted by activity. + for (const [pid, list] of childMap) { + childMap.set(pid, [...list].sort((a, b) => b.lastActiveAt - a.lastActiveAt)); + } + + return { + topLevelSessions: [...parents].sort((a, b) => b.lastActiveAt - a.lastActiveAt), + childrenByParent: childMap, + }; + }, [sessions]); + const sessionDisplayLimit = useMemo(() => { if (isActiveWorkspace) { - return showAll || sessions.length <= MAX_VISIBLE_SESSIONS - ? sessions.length + return showAll || topLevelSessions.length <= MAX_VISIBLE_SESSIONS + ? topLevelSessions.length : MAX_VISIBLE_SESSIONS; } return showAll - ? Math.min(sessions.length, INACTIVE_WORKSPACE_EXPANDED_SESSIONS) - : Math.min(sessions.length, INACTIVE_WORKSPACE_COLLAPSED_SESSIONS); - }, [isActiveWorkspace, sessions.length, showAll]); + ? Math.min(topLevelSessions.length, INACTIVE_WORKSPACE_EXPANDED_SESSIONS) + : Math.min(topLevelSessions.length, INACTIVE_WORKSPACE_COLLAPSED_SESSIONS); + }, [isActiveWorkspace, topLevelSessions.length, showAll]); - const visibleSessions = useMemo( - () => sessions.slice(0, sessionDisplayLimit), - [sessionDisplayLimit, sessions] - ); + const visibleItems = useMemo(() => { + const visibleParents = topLevelSessions.slice(0, sessionDisplayLimit); + const out: Array<{ session: Session; level: 0 | 1 }> = []; + for (const p of visibleParents) { + out.push({ session: p, level: 0 }); + const children = childrenByParent.get(p.sessionId) || []; + for (const c of children) out.push({ session: c, level: 1 }); + } + return out; + }, [childrenByParent, sessionDisplayLimit, topLevelSessions]); const toggleThreshold = isActiveWorkspace ? MAX_VISIBLE_SESSIONS : INACTIVE_WORKSPACE_COLLAPSED_SESSIONS; - const hiddenCount = Math.max(0, sessions.length - toggleThreshold); + const hiddenCount = Math.max(0, topLevelSessions.length - toggleThreshold); const activeSessionId = flowChatState.activeSessionId; @@ -211,10 +245,10 @@ const SessionsSection: React.FC = ({ return (
- {sessions.length === 0 ? ( + {topLevelSessions.length === 0 ? (
{t('nav.sessions.noSessions')}
) : ( - visibleSessions.map(session => { + visibleItems.map(({ session, level }) => { const isEditing = editingSessionId === session.sessionId; const sessionModeKey = resolveSessionModeType(session); const sessionTitle = resolveSessionTitle(session); @@ -228,6 +262,7 @@ const SessionsSection: React.FC = ({
= ({ }) )} - {sessions.length > toggleThreshold && ( + {topLevelSessions.length > toggleThreshold && (