diff --git a/crates/openfang-api/src/channel_bridge.rs b/crates/openfang-api/src/channel_bridge.rs index b72180670..240a8b07b 100644 --- a/crates/openfang-api/src/channel_bridge.rs +++ b/crates/openfang-api/src/channel_bridge.rs @@ -1026,10 +1026,11 @@ pub async fn start_channel_bridge_with_config( if let Some(ref tg_config) = config.telegram { if let Some(token) = read_token(&tg_config.bot_token_env, "Telegram") { let poll_interval = Duration::from_secs(tg_config.poll_interval_secs); - let adapter = Arc::new(TelegramAdapter::new( + let adapter = Arc::new(TelegramAdapter::with_reactions( token, tg_config.allowed_users.clone(), poll_interval, + tg_config.status_reactions, )); adapters.push((adapter, tg_config.default_agent.clone())); } diff --git a/crates/openfang-channels/src/bridge.rs b/crates/openfang-channels/src/bridge.rs index 1daedb947..dbe91cffa 100644 --- a/crates/openfang-channels/src/bridge.rs +++ b/crates/openfang-channels/src/bridge.rs @@ -5,7 +5,10 @@ use crate::formatter; use crate::router::AgentRouter; -use crate::types::{ChannelAdapter, ChannelContent, ChannelMessage, ChannelUser}; +use crate::types::{ + AgentPhase, ChannelAdapter, ChannelContent, ChannelMessage, ChannelUser, LifecycleReaction, + default_phase_emoji, +}; use async_trait::async_trait; use dashmap::DashMap; use futures::StreamExt; @@ -653,15 +656,76 @@ async fn dispatch_message( // Send typing indicator (best-effort) let _ = adapter.send_typing(&message.sender).await; - // Send to agent and relay response - match handle.send_message(agent_id, &text).await { + // Inject channel context so agent can use message_react tool + let msg_id = &message.platform_message_id; + let text_with_ctx = format!( + "[channel_context: channel={} recipient={} message_id={}]\n{}", + ct_str, &message.sender.platform_id, msg_id, &text + ); + + // Ack reaction: 👀 immediately so user knows we received it + let _ = adapter + .send_reaction( + &message.sender, + msg_id, + &LifecycleReaction { + phase: AgentPhase::Queued, + emoji: default_phase_emoji(&AgentPhase::Queued).to_string(), + remove_previous: false, + }, + ) + .await; + + // Send to agent; switch to "thinking" after a short delay while waiting + let agent_future = handle.send_message(agent_id, &text_with_ctx); + let thinking_delay = tokio::time::sleep(std::time::Duration::from_secs(2)); + + tokio::pin!(agent_future); + tokio::pin!(thinking_delay); + + // Race: if the agent responds within 2s, skip the thinking reaction + let result = tokio::select! { + biased; + res = &mut agent_future => res, + _ = &mut thinking_delay => { + // Agent is still working — switch to "thinking" 🤔 + let _ = adapter + .send_reaction( + &message.sender, + msg_id, + &LifecycleReaction { + phase: AgentPhase::Thinking, + emoji: default_phase_emoji(&AgentPhase::Thinking).to_string(), + remove_previous: true, + }, + ) + .await; + // Now await the actual response + agent_future.await + } + }; + + match result { Ok(response) => { + // No hardcoded "done" reaction — agent can choose via message_react tool. + // Only clear thinking reaction if agent didn't react. send_response(adapter, &message.sender, response, thread_id, output_format).await; handle .record_delivery(agent_id, ct_str, &message.sender.platform_id, true, None) .await; } Err(e) => { + let _ = adapter + .send_reaction( + &message.sender, + msg_id, + &LifecycleReaction { + phase: AgentPhase::Error, + emoji: default_phase_emoji(&AgentPhase::Error).to_string(), + remove_previous: true, + }, + ) + .await; warn!("Agent error for {agent_id}: {e}"); let err_msg = format!("Agent error: {e}"); send_response( diff --git a/crates/openfang-channels/src/telegram.rs b/crates/openfang-channels/src/telegram.rs index 1469d1af1..dad58d697 100644 --- a/crates/openfang-channels/src/telegram.rs +++ b/crates/openfang-channels/src/telegram.rs @@ -4,7 +4,8 @@ //! No external Telegram crate — just `reqwest` for full control over error handling. use crate::types::{ - split_message, ChannelAdapter, ChannelContent, ChannelMessage, ChannelType, ChannelUser, + split_message, AgentPhase, ChannelAdapter, ChannelContent, ChannelMessage, ChannelType, + ChannelUser, LifecycleReaction, }; use async_trait::async_trait; use futures::Stream; @@ -30,6 +31,8 @@ pub struct TelegramAdapter { client: reqwest::Client, allowed_users: Vec, poll_interval: Duration, + /// Whether to show emoji reactions on messages for agent lifecycle status. + status_reactions: bool, shutdown_tx: Arc>, shutdown_rx: watch::Receiver, } @@ -40,12 +43,23 @@ impl TelegramAdapter { /// `token` is the raw bot token (read from env by the caller). /// `allowed_users` is the list of Telegram user IDs allowed to interact (empty = allow all). pub fn new(token: String, allowed_users: Vec, poll_interval: Duration) -> Self { + Self::with_reactions(token, allowed_users, poll_interval, true) + } + + /// Create a new Telegram adapter with explicit status_reactions flag. + pub fn with_reactions( + token: String, + allowed_users: Vec, + poll_interval: Duration, + status_reactions: bool, + ) -> Self { let (shutdown_tx, shutdown_rx) = watch::channel(false); Self { token: Zeroizing::new(token), client: reqwest::Client::new(), allowed_users, poll_interval, + status_reactions, shutdown_tx: Arc::new(shutdown_tx), shutdown_rx, } @@ -213,6 +227,41 @@ impl TelegramAdapter { let _ = self.client.post(&url).json(&body).send().await?; Ok(()) } + + /// Call `setMessageReaction` to add an emoji reaction to a message. + /// Pass an empty `emoji` to remove all reactions. + async fn api_set_reaction( + &self, + chat_id: i64, + message_id: i64, + emoji: &str, + ) -> Result<(), Box> { + if message_id <= 0 { + return Ok(()); + } + let url = format!( + "https://api.telegram.org/bot{}/setMessageReaction", + self.token.as_str() + ); + let reaction = if emoji.is_empty() { + serde_json::json!([]) + } else { + serde_json::json!([{"type": "emoji", "emoji": emoji}]) + }; + let body = serde_json::json!({ + "chat_id": chat_id, + "message_id": message_id, + "reaction": reaction, + }); + let resp = self.client.post(&url).json(&body).send().await?; + if !resp.status().is_success() { + let body_text = resp.text().await.unwrap_or_default(); + // Truncate to avoid leaking large error bodies into logs + let truncated = if body_text.len() > 200 { &body_text[..200] } else { &body_text }; + warn!("Telegram setMessageReaction failed: {truncated}"); + } + Ok(()) + } } #[async_trait] @@ -441,6 +490,28 @@ impl ChannelAdapter for TelegramAdapter { self.api_send_typing(chat_id).await } + async fn send_reaction( + &self, + user: &ChannelUser, + message_id: &str, + reaction: &LifecycleReaction, + ) -> Result<(), Box> { + if !self.status_reactions { + return Ok(()); + } + let chat_id: i64 = user + .platform_id + .parse() + .map_err(|_| format!("Invalid Telegram chat_id: {}", user.platform_id))?; + let msg_id: i64 = message_id + .parse() + .map_err(|_| format!("Invalid Telegram message_id: {message_id}"))?; + // Map lifecycle emojis to Telegram-supported reaction emojis + let emoji = telegram_reaction_emoji(&reaction.phase); + self.api_set_reaction(chat_id, msg_id, emoji) + .await + } + async fn stop(&self) -> Result<(), Box> { let _ = self.shutdown_tx.send(true); Ok(()) @@ -599,6 +670,19 @@ pub fn calculate_backoff(current: Duration) -> Duration { (current * 2).min(MAX_BACKOFF) } +/// Map agent lifecycle phases to Telegram-supported reaction emojis. +/// Telegram only allows a specific set of emojis for reactions. +fn telegram_reaction_emoji(phase: &AgentPhase) -> &'static str { + match phase { + AgentPhase::Queued => "\u{1F440}", // 👀 + AgentPhase::Thinking => "\u{1F914}", // 🤔 + AgentPhase::ToolUse { .. } => "\u{1F525}", // 🔥 + AgentPhase::Streaming => "\u{270D}", // ✍ + AgentPhase::Done => "\u{1F389}", // 🎉 + AgentPhase::Error => "\u{1F631}", // 😱 + } +} + /// Sanitize text for Telegram HTML parse mode. /// /// Escapes angle brackets that are NOT part of Telegram-allowed HTML tags. @@ -848,4 +932,68 @@ mod tests { let msg = parse_telegram_update(&update, &[], "fake:token", &client).await.unwrap(); assert!(matches!(msg.content, ChannelContent::Location { .. })); } + + #[test] + fn test_telegram_reaction_emoji_mapping() { + assert_eq!(telegram_reaction_emoji(&AgentPhase::Queued), "\u{1F440}"); // 👀 + assert_eq!(telegram_reaction_emoji(&AgentPhase::Thinking), "\u{1F914}"); // 🤔 + assert_eq!( + telegram_reaction_emoji(&AgentPhase::ToolUse { + tool_name: "shell_exec".to_string() + }), + "\u{1F525}" // 🔥 + ); + assert_eq!(telegram_reaction_emoji(&AgentPhase::Streaming), "\u{270D}"); // ✍ + assert_eq!(telegram_reaction_emoji(&AgentPhase::Done), "\u{1F389}"); // 🎉 + assert_eq!(telegram_reaction_emoji(&AgentPhase::Error), "\u{1F631}"); // 😱 + } + + #[tokio::test] + async fn test_send_reaction_disabled() { + let adapter = TelegramAdapter::with_reactions( + "fake:token".to_string(), + vec![], + Duration::from_secs(1), + false, + ); + let user = ChannelUser { + platform_id: "123".to_string(), + display_name: "Test".to_string(), + openfang_user: None, + }; + let reaction = LifecycleReaction { + phase: AgentPhase::Queued, + emoji: "\u{1F440}".to_string(), + remove_previous: false, + }; + // Should return Ok without making any API call + let result = adapter.send_reaction(&user, "42", &reaction).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_send_reaction_invalid_message_id() { + let adapter = TelegramAdapter::with_reactions( + "fake:token".to_string(), + vec![], + Duration::from_secs(1), + true, + ); + let user = ChannelUser { + platform_id: "123".to_string(), + display_name: "Test".to_string(), + openfang_user: None, + }; + let reaction = LifecycleReaction { + phase: AgentPhase::Done, + emoji: "\u{1F44D}".to_string(), + remove_previous: true, + }; + // Non-numeric message_id should error + let result = adapter.send_reaction(&user, "not-a-number", &reaction).await; + assert!(result.is_err()); + // Empty message_id should error + let result = adapter.send_reaction(&user, "", &reaction).await; + assert!(result.is_err()); + } } diff --git a/crates/openfang-channels/tests/bridge_integration_test.rs b/crates/openfang-channels/tests/bridge_integration_test.rs index e4c647766..1952f62b2 100644 --- a/crates/openfang-channels/tests/bridge_integration_test.rs +++ b/crates/openfang-channels/tests/bridge_integration_test.rs @@ -225,18 +225,31 @@ async fn test_bridge_dispatch_text_message() { // Give the async dispatch loop time to process tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - // Verify: adapter received the echo response + // Verify: adapter received the echo response (now includes channel_context prefix) let sent = adapter_ref.get_sent(); assert_eq!(sent.len(), 1, "Expected 1 response, got {}", sent.len()); assert_eq!(sent[0].0, "user1"); - assert_eq!(sent[0].1, "Echo: Hello agent!"); + assert!( + sent[0].1.contains("Hello agent!"), + "Response should contain original message, got: {}", + sent[0].1 + ); - // Verify: handle received the message + // Verify: handle received the message with channel_context header { let received = handle.received.lock().unwrap(); assert_eq!(received.len(), 1); assert_eq!(received[0].0, agent_id); - assert_eq!(received[0].1, "Hello agent!"); + assert!( + received[0].1.contains("Hello agent!"), + "Handle should receive original message, got: {}", + received[0].1 + ); + assert!( + received[0].1.contains("[channel_context:"), + "Handle should receive channel_context header, got: {}", + received[0].1 + ); } manager.stop().await; @@ -486,7 +499,10 @@ async fn test_bridge_manager_lifecycle() { assert_eq!(sent.len(), 5, "Expected 5 responses, got {}", sent.len()); for (i, (_, text)) in sent.iter().enumerate() { - assert_eq!(*text, format!("Echo: message {i}")); + assert!( + text.contains(&format!("message {i}")), + "Response {i} should contain 'message {i}', got: {text}" + ); } // Stop — should complete without hanging @@ -535,11 +551,19 @@ async fn test_bridge_multiple_adapters() { let tg_sent = tg_ref.get_sent(); assert_eq!(tg_sent.len(), 1); - assert_eq!(tg_sent[0].1, "Echo: from telegram"); + assert!( + tg_sent[0].1.contains("from telegram"), + "Telegram response should contain original message, got: {}", + tg_sent[0].1 + ); let dc_sent = dc_ref.get_sent(); assert_eq!(dc_sent.len(), 1); - assert_eq!(dc_sent[0].1, "Echo: from discord"); + assert!( + dc_sent[0].1.contains("from discord"), + "Discord response should contain original message, got: {}", + dc_sent[0].1 + ); manager.stop().await; } diff --git a/crates/openfang-kernel/src/kernel.rs b/crates/openfang-kernel/src/kernel.rs index d46dbf9f4..de0e92419 100644 --- a/crates/openfang-kernel/src/kernel.rs +++ b/crates/openfang-kernel/src/kernel.rs @@ -5397,6 +5397,41 @@ impl KernelHandle for OpenFangKernel { Ok(format!("{} sent to {} via {}", media_type, recipient, channel)) } + async fn send_channel_reaction( + &self, + channel: &str, + recipient: &str, + message_id: &str, + emoji: &str, + ) -> Result { + let adapter = self + .channel_adapters + .get(channel) + .ok_or_else(|| { + format!("Channel '{}' not found for reaction", channel) + })? + .clone(); + + let user = openfang_channels::types::ChannelUser { + platform_id: recipient.to_string(), + display_name: recipient.to_string(), + openfang_user: None, + }; + + let reaction = openfang_channels::types::LifecycleReaction { + phase: openfang_channels::types::AgentPhase::Done, + emoji: emoji.to_string(), + remove_previous: true, + }; + + adapter + .send_reaction(&user, message_id, &reaction) + .await + .map_err(|e| format!("Reaction failed: {e}"))?; + + Ok(format!("Reacted with {} on {} via {}", emoji, message_id, channel)) + } + async fn spawn_agent_checked( &self, manifest_toml: &str, diff --git a/crates/openfang-runtime/src/kernel_handle.rs b/crates/openfang-runtime/src/kernel_handle.rs index 1bb2e76eb..ede72e338 100644 --- a/crates/openfang-runtime/src/kernel_handle.rs +++ b/crates/openfang-runtime/src/kernel_handle.rs @@ -209,6 +209,20 @@ pub trait KernelHandle: Send + Sync { Err("Channel media send not available".to_string()) } + /// Send an emoji reaction to a message on a channel. + /// `channel` is the adapter name (e.g. "telegram"), `recipient` is the user/chat ID, + /// `message_id` is the platform message ID, `emoji` is the emoji character. + async fn send_channel_reaction( + &self, + channel: &str, + recipient: &str, + message_id: &str, + emoji: &str, + ) -> Result { + let _ = (channel, recipient, message_id, emoji); + Err("Channel reactions not available".to_string()) + } + /// Spawn an agent with capability inheritance enforcement. /// `parent_caps` are the parent's granted capabilities. The kernel MUST verify /// that every capability in the child manifest is covered by `parent_caps`. diff --git a/crates/openfang-runtime/src/tool_runner.rs b/crates/openfang-runtime/src/tool_runner.rs index f37db523e..ecb35bd0d 100644 --- a/crates/openfang-runtime/src/tool_runner.rs +++ b/crates/openfang-runtime/src/tool_runner.rs @@ -306,6 +306,8 @@ pub async fn execute_tool( // Channel send tool (proactive outbound messaging) "channel_send" => tool_channel_send(input, kernel).await, + // Channel reaction tool (emoji reaction on incoming message) + "message_react" => tool_message_react(input, kernel).await, // Persistent process tools "process_start" => tool_process_start(input, process_manager, caller_agent_id).await, @@ -1014,6 +1016,21 @@ pub fn builtin_tool_definitions() -> Vec { "required": ["channel", "recipient"] }), }, + // --- Channel reaction tool --- + ToolDefinition { + name: "message_react".to_string(), + description: "Add an emoji reaction to the incoming message on the current channel (Telegram, Discord, etc). Use this to react to user messages with a contextually appropriate emoji. The channel, recipient, and message_id are provided in the [channel_context] header of the incoming message.".to_string(), + input_schema: serde_json::json!({ + "type": "object", + "properties": { + "channel": { "type": "string", "description": "Channel name (e.g. 'telegram')" }, + "recipient": { "type": "string", "description": "User/chat ID on the platform" }, + "message_id": { "type": "string", "description": "Platform message ID to react to" }, + "emoji": { "type": "string", "description": "Single emoji to react with (e.g. '❤️', '🎉', '👀'). For Telegram, must be from the supported set." } + }, + "required": ["channel", "recipient", "message_id", "emoji"] + }), + }, // --- Hand tools (curated autonomous capability packages) --- ToolDefinition { name: "hand_list".to_string(), @@ -2190,6 +2207,42 @@ async fn tool_channel_send( .await } +// --------------------------------------------------------------------------- +// Channel reaction tool +// --------------------------------------------------------------------------- + +async fn tool_message_react( + input: &serde_json::Value, + kernel: Option<&Arc>, +) -> Result { + let kh = require_kernel(kernel)?; + + let channel = input["channel"] + .as_str() + .ok_or("Missing 'channel' parameter")? + .trim() + .to_lowercase(); + let recipient = input["recipient"] + .as_str() + .ok_or("Missing 'recipient' parameter")? + .trim(); + let message_id = input["message_id"] + .as_str() + .ok_or("Missing 'message_id' parameter")? + .trim(); + let emoji = input["emoji"] + .as_str() + .ok_or("Missing 'emoji' parameter")? + .trim(); + + if emoji.is_empty() { + return Err("Emoji cannot be empty".to_string()); + } + + kh.send_channel_reaction(&channel, recipient, message_id, emoji) + .await +} + // --------------------------------------------------------------------------- // Hand tools (delegated to kernel via KernelHandle trait) // --------------------------------------------------------------------------- @@ -3121,8 +3174,8 @@ mod tests { fn test_builtin_tool_definitions() { let tools = builtin_tool_definitions(); assert!( - tools.len() >= 39, - "Expected at least 39 tools, got {}", + tools.len() >= 40, + "Expected at least 40 tools, got {}", tools.len() ); let names: Vec<&str> = tools.iter().map(|t| t.name.as_str()).collect(); @@ -3168,8 +3221,9 @@ mod tests { assert!(names.contains(&"cron_create")); assert!(names.contains(&"cron_list")); assert!(names.contains(&"cron_cancel")); - // 1 channel send tool + // channel tools assert!(names.contains(&"channel_send")); + assert!(names.contains(&"message_react")); // 4 hand tools assert!(names.contains(&"hand_list")); assert!(names.contains(&"hand_activate")); diff --git a/crates/openfang-types/src/config.rs b/crates/openfang-types/src/config.rs index 5e54a6743..da4a06f2e 100644 --- a/crates/openfang-types/src/config.rs +++ b/crates/openfang-types/src/config.rs @@ -4,6 +4,11 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::path::PathBuf; +/// Serde helper: returns `true` for `#[serde(default = "default_true")]`. +fn default_true() -> bool { + true +} + /// Deserialize a `Vec` that tolerates both string and integer elements. /// /// When channel configs are saved from the web dashboard, numeric IDs (e.g. Discord @@ -1557,6 +1562,9 @@ pub struct TelegramConfig { pub default_agent: Option, /// Polling interval in seconds. pub poll_interval_secs: u64, + /// Show emoji reactions on messages to indicate agent lifecycle status. + #[serde(default = "default_true")] + pub status_reactions: bool, /// Per-channel behavior overrides. #[serde(default)] pub overrides: ChannelOverrides, @@ -1569,6 +1577,7 @@ impl Default for TelegramConfig { allowed_users: vec![], default_agent: None, poll_interval_secs: 1, + status_reactions: true, overrides: ChannelOverrides::default(), } }