Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 28 additions & 9 deletions crates/openfang-api/src/channel_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::{error, info, warn};

/// Safely truncate a string to at most n characters.
/// Returns the original string if it's shorter than n characters.
fn truncate_str(s: &str, n: usize) -> &str {
if s.len() > n {
&s[..n]
} else {
s
}
}

/// Wraps `OpenFangKernel` to implement `ChannelBridgeHandle`.
pub struct KernelBridgeAdapter {
kernel: Arc<OpenFangKernel>,
Expand Down Expand Up @@ -351,7 +361,8 @@ impl ChannelBridgeHandle for KernelBridgeAdapter {
.map(|e| e.name.clone())
.unwrap_or_else(|| t.agent_id.to_string());
let status = if t.enabled { "on" } else { "off" };
let id_short = &t.id.0.to_string()[..8];
let id_str = t.id.0.to_string();
let id_short = truncate_str(&id_str, 8);
msg.push_str(&format!(
" [{}] {} -> {} ({:?}) fires:{} [{}]\n",
id_short,
Expand Down Expand Up @@ -390,7 +401,8 @@ impl ChannelBridgeHandle for KernelBridgeAdapter {
.kernel
.triggers
.register(agent.id, pattern, prompt.to_string(), 0);
let id_short = &trigger_id.0.to_string()[..8];
let id_str = trigger_id.0.to_string();
let id_short = truncate_str(&id_str, 8);
format!("Trigger created [{id_short}] for agent '{agent_name}'.")
}

Expand All @@ -405,7 +417,8 @@ impl ChannelBridgeHandle for KernelBridgeAdapter {
1 => {
let t = matched[0];
if self.kernel.triggers.remove(t.id) {
format!("Trigger [{}] removed.", &t.id.0.to_string()[..8])
let id_str = t.id.0.to_string();
format!("Trigger [{}] removed.", truncate_str(&id_str, 8))
} else {
"Failed to remove trigger.".to_string()
}
Expand All @@ -428,7 +441,8 @@ impl ChannelBridgeHandle for KernelBridgeAdapter {
.map(|e| e.name.clone())
.unwrap_or_else(|| job.agent_id.to_string());
let status = if job.enabled { "on" } else { "off" };
let id_short = &job.id.0.to_string()[..8];
let id_str = job.id.0.to_string();
let id_short = truncate_str(&id_str, 8);
let sched = match &job.schedule {
openfang_types::scheduler::CronSchedule::Cron { expr, .. } => expr.clone(),
openfang_types::scheduler::CronSchedule::Every { every_secs } => {
Expand Down Expand Up @@ -488,7 +502,8 @@ impl ChannelBridgeHandle for KernelBridgeAdapter {

match self.kernel.cron_scheduler.add_job(job, false) {
Ok(id) => {
let id_short = &id.0.to_string()[..8];
let id_str = id.0.to_string();
let id_short = truncate_str(&id_str, 8);
format!("Job [{id_short}] created: '{cron_expr}' -> {agent_name}: \"{message}\"")
}
Err(e) => format!("Failed to create job: {e}"),
Expand All @@ -510,7 +525,8 @@ impl ChannelBridgeHandle for KernelBridgeAdapter {
let j = matched[0];
match self.kernel.cron_scheduler.remove_job(j.id) {
Ok(_) => {
format!("Job [{}] '{}' removed.", &j.id.0.to_string()[..8], j.name)
let id_str = j.id.0.to_string();
format!("Job [{}] '{}' removed.", truncate_str(&id_str, 8), j.name)
}
Err(e) => format!("Failed to remove job: {e}"),
}
Expand Down Expand Up @@ -542,7 +558,8 @@ impl ChannelBridgeHandle for KernelBridgeAdapter {
};
match self.kernel.send_message(j.agent_id, &message).await {
Ok(result) => {
let id_short = &j.id.0.to_string()[..8];
let id_str = j.id.0.to_string();
let id_short = truncate_str(&id_str, 8);
format!("Job [{id_short}] ran:\n{}", result.response)
}
Err(e) => format!("Failed to run job: {e}"),
Expand All @@ -562,7 +579,8 @@ impl ChannelBridgeHandle for KernelBridgeAdapter {
}
let mut msg = format!("Pending approvals ({}):\n", pending.len());
for req in &pending {
let id_short = &req.id.to_string()[..8];
let id_str = req.id.to_string();
let id_short = truncate_str(&id_str, 8);
let age_secs = (chrono::Utc::now() - req.requested_at).num_seconds();
let age = if age_secs >= 60 {
format!("{}m", age_secs / 60)
Expand Down Expand Up @@ -603,10 +621,11 @@ impl ChannelBridgeHandle for KernelBridgeAdapter {
) {
Ok(_) => {
let verb = if approve { "Approved" } else { "Rejected" };
let id_str = req.id.to_string();
format!(
"{} [{}] {} — {}",
verb,
&req.id.to_string()[..8],
truncate_str(&id_str, 8),
req.tool_name,
req.agent_id
)
Expand Down
33 changes: 26 additions & 7 deletions crates/openfang-desktop/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use openfang_api::server::build_router;
use openfang_kernel::OpenFangKernel;
use std::net::{SocketAddr, TcpListener};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::watch;
use tracing::{error, info};
Expand All @@ -20,24 +21,40 @@ pub struct ServerHandle {
shutdown_tx: watch::Sender<bool>,
/// Join handle for the background server thread.
server_thread: Option<std::thread::JoinHandle<()>>,
/// Track whether shutdown has already been initiated to prevent race conditions.
shutdown_initiated: Arc<AtomicBool>,
}

impl ServerHandle {
/// Signal the server to shut down and wait for the background thread.
pub fn shutdown(mut self) {
let _ = self.shutdown_tx.send(true);
if let Some(handle) = self.server_thread.take() {
let _ = handle.join();
// Only proceed if shutdown hasn't been initiated yet
if self
.shutdown_initiated
.compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
.is_ok()
{
let _ = self.shutdown_tx.send(true);
if let Some(handle) = self.server_thread.take() {
let _ = handle.join();
}
self.kernel.shutdown();
info!("OpenFang embedded server stopped");
}
self.kernel.shutdown();
info!("OpenFang embedded server stopped");
}
}

impl Drop for ServerHandle {
fn drop(&mut self) {
let _ = self.shutdown_tx.send(true);
// Best-effort: don't block in drop, the thread will exit on its own.
// Only send shutdown signal if it hasn't been initiated yet
if self
.shutdown_initiated
.compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
.is_ok()
{
let _ = self.shutdown_tx.send(true);
// Best-effort: don't block in drop, the thread will exit on its own.
}
}
}

Expand All @@ -61,6 +78,7 @@ pub fn start_server() -> Result<ServerHandle, Box<dyn std::error::Error>> {

let (shutdown_tx, shutdown_rx) = watch::channel(false);
let kernel_clone = kernel.clone();
let shutdown_initiated = Arc::new(AtomicBool::new(false));

let server_thread = std::thread::Builder::new()
.name("openfang-server".into())
Expand All @@ -83,6 +101,7 @@ pub fn start_server() -> Result<ServerHandle, Box<dyn std::error::Error>> {
kernel,
shutdown_tx,
server_thread: Some(server_thread),
shutdown_initiated,
})
}

Expand Down
3 changes: 2 additions & 1 deletion crates/openfang-runtime/src/tool_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3092,7 +3092,8 @@ async fn tool_canvas_present(
let _ = tokio::fs::create_dir_all(&output_dir).await;

let timestamp = chrono::Utc::now().format("%Y%m%d_%H%M%S");
let filename = format!("canvas_{timestamp}_{}.html", &canvas_id[..8]);
let id_short = if canvas_id.len() > 8 { &canvas_id[..8] } else { &canvas_id };
let filename = format!("canvas_{timestamp}_{}.html", id_short);
let filepath = output_dir.join(&filename);

// Write the full HTML document
Expand Down