Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/openfang-api/src/channel_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1026,10 +1026,11 @@ pub async fn start_channel_bridge_with_config(
if let Some(ref tg_config) = config.telegram {
if let Some(token) = read_token(&tg_config.bot_token_env, "Telegram") {
let poll_interval = Duration::from_secs(tg_config.poll_interval_secs);
let adapter = Arc::new(TelegramAdapter::new(
let adapter = Arc::new(TelegramAdapter::with_reactions(
token,
tg_config.allowed_users.clone(),
poll_interval,
tg_config.status_reactions,
));
adapters.push((adapter, tg_config.default_agent.clone()));
}
Expand Down
70 changes: 67 additions & 3 deletions crates/openfang-channels/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

use crate::formatter;
use crate::router::AgentRouter;
use crate::types::{ChannelAdapter, ChannelContent, ChannelMessage, ChannelUser};
use crate::types::{
AgentPhase, ChannelAdapter, ChannelContent, ChannelMessage, ChannelUser, LifecycleReaction,
default_phase_emoji,
};
use async_trait::async_trait;
use dashmap::DashMap;
use futures::StreamExt;
Expand Down Expand Up @@ -653,15 +656,76 @@ async fn dispatch_message(
// Send typing indicator (best-effort)
let _ = adapter.send_typing(&message.sender).await;

// Send to agent and relay response
match handle.send_message(agent_id, &text).await {
// Inject channel context so agent can use message_react tool
let msg_id = &message.platform_message_id;
let text_with_ctx = format!(
"[channel_context: channel={} recipient={} message_id={}]\n{}",
ct_str, &message.sender.platform_id, msg_id, &text
);

// Ack reaction: πŸ‘€ immediately so user knows we received it
let _ = adapter
.send_reaction(
&message.sender,
msg_id,
&LifecycleReaction {
phase: AgentPhase::Queued,
emoji: default_phase_emoji(&AgentPhase::Queued).to_string(),
remove_previous: false,
},
)
.await;

// Send to agent; switch to "thinking" after a short delay while waiting
let agent_future = handle.send_message(agent_id, &text_with_ctx);
let thinking_delay = tokio::time::sleep(std::time::Duration::from_secs(2));

tokio::pin!(agent_future);
tokio::pin!(thinking_delay);

// Race: if the agent responds within 2s, skip the thinking reaction
let result = tokio::select! {
biased;
res = &mut agent_future => res,
_ = &mut thinking_delay => {
// Agent is still working β€” switch to "thinking" πŸ€”
let _ = adapter
.send_reaction(
&message.sender,
msg_id,
&LifecycleReaction {
phase: AgentPhase::Thinking,
emoji: default_phase_emoji(&AgentPhase::Thinking).to_string(),
remove_previous: true,
},
)
.await;
// Now await the actual response
agent_future.await
}
};

match result {
Ok(response) => {
// No hardcoded "done" reaction β€” agent can choose via message_react tool.
// Only clear thinking reaction if agent didn't react.
send_response(adapter, &message.sender, response, thread_id, output_format).await;
handle
.record_delivery(agent_id, ct_str, &message.sender.platform_id, true, None)
.await;
}
Err(e) => {
let _ = adapter
.send_reaction(
&message.sender,
msg_id,
&LifecycleReaction {
phase: AgentPhase::Error,
emoji: default_phase_emoji(&AgentPhase::Error).to_string(),
remove_previous: true,
},
)
.await;
warn!("Agent error for {agent_id}: {e}");
let err_msg = format!("Agent error: {e}");
send_response(
Expand Down
150 changes: 149 additions & 1 deletion crates/openfang-channels/src/telegram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
//! No external Telegram crate β€” just `reqwest` for full control over error handling.

use crate::types::{
split_message, ChannelAdapter, ChannelContent, ChannelMessage, ChannelType, ChannelUser,
split_message, AgentPhase, ChannelAdapter, ChannelContent, ChannelMessage, ChannelType,
ChannelUser, LifecycleReaction,
};
use async_trait::async_trait;
use futures::Stream;
Expand All @@ -30,6 +31,8 @@ pub struct TelegramAdapter {
client: reqwest::Client,
allowed_users: Vec<String>,
poll_interval: Duration,
/// Whether to show emoji reactions on messages for agent lifecycle status.
status_reactions: bool,
shutdown_tx: Arc<watch::Sender<bool>>,
shutdown_rx: watch::Receiver<bool>,
}
Expand All @@ -40,12 +43,23 @@ impl TelegramAdapter {
/// `token` is the raw bot token (read from env by the caller).
/// `allowed_users` is the list of Telegram user IDs allowed to interact (empty = allow all).
pub fn new(token: String, allowed_users: Vec<String>, poll_interval: Duration) -> Self {
Self::with_reactions(token, allowed_users, poll_interval, true)
}

/// Create a new Telegram adapter with explicit status_reactions flag.
pub fn with_reactions(
token: String,
allowed_users: Vec<String>,
poll_interval: Duration,
status_reactions: bool,
) -> Self {
let (shutdown_tx, shutdown_rx) = watch::channel(false);
Self {
token: Zeroizing::new(token),
client: reqwest::Client::new(),
allowed_users,
poll_interval,
status_reactions,
shutdown_tx: Arc::new(shutdown_tx),
shutdown_rx,
}
Expand Down Expand Up @@ -213,6 +227,41 @@ impl TelegramAdapter {
let _ = self.client.post(&url).json(&body).send().await?;
Ok(())
}

/// Call `setMessageReaction` to add an emoji reaction to a message.
/// Pass an empty `emoji` to remove all reactions.
async fn api_set_reaction(
&self,
chat_id: i64,
message_id: i64,
emoji: &str,
) -> Result<(), Box<dyn std::error::Error>> {
if message_id <= 0 {
return Ok(());
}
let url = format!(
"https://api.telegram.org/bot{}/setMessageReaction",
self.token.as_str()
);
let reaction = if emoji.is_empty() {
serde_json::json!([])
} else {
serde_json::json!([{"type": "emoji", "emoji": emoji}])
};
let body = serde_json::json!({
"chat_id": chat_id,
"message_id": message_id,
"reaction": reaction,
});
let resp = self.client.post(&url).json(&body).send().await?;
if !resp.status().is_success() {
let body_text = resp.text().await.unwrap_or_default();
// Truncate to avoid leaking large error bodies into logs
let truncated = if body_text.len() > 200 { &body_text[..200] } else { &body_text };
warn!("Telegram setMessageReaction failed: {truncated}");
}
Ok(())
}
}

#[async_trait]
Expand Down Expand Up @@ -441,6 +490,28 @@ impl ChannelAdapter for TelegramAdapter {
self.api_send_typing(chat_id).await
}

async fn send_reaction(
&self,
user: &ChannelUser,
message_id: &str,
reaction: &LifecycleReaction,
) -> Result<(), Box<dyn std::error::Error>> {
if !self.status_reactions {
return Ok(());
}
let chat_id: i64 = user
.platform_id
.parse()
.map_err(|_| format!("Invalid Telegram chat_id: {}", user.platform_id))?;
let msg_id: i64 = message_id
.parse()
.map_err(|_| format!("Invalid Telegram message_id: {message_id}"))?;
// Map lifecycle emojis to Telegram-supported reaction emojis
let emoji = telegram_reaction_emoji(&reaction.phase);
self.api_set_reaction(chat_id, msg_id, emoji)
.await
}

async fn stop(&self) -> Result<(), Box<dyn std::error::Error>> {
let _ = self.shutdown_tx.send(true);
Ok(())
Expand Down Expand Up @@ -599,6 +670,19 @@ pub fn calculate_backoff(current: Duration) -> Duration {
(current * 2).min(MAX_BACKOFF)
}

/// Map agent lifecycle phases to Telegram-supported reaction emojis.
/// Telegram only allows a specific set of emojis for reactions.
fn telegram_reaction_emoji(phase: &AgentPhase) -> &'static str {
match phase {
AgentPhase::Queued => "\u{1F440}", // πŸ‘€
AgentPhase::Thinking => "\u{1F914}", // πŸ€”
AgentPhase::ToolUse { .. } => "\u{1F525}", // πŸ”₯
AgentPhase::Streaming => "\u{270D}", // ✍
AgentPhase::Done => "\u{1F389}", // πŸŽ‰
AgentPhase::Error => "\u{1F631}", // 😱
}
}

/// Sanitize text for Telegram HTML parse mode.
///
/// Escapes angle brackets that are NOT part of Telegram-allowed HTML tags.
Expand Down Expand Up @@ -848,4 +932,68 @@ mod tests {
let msg = parse_telegram_update(&update, &[], "fake:token", &client).await.unwrap();
assert!(matches!(msg.content, ChannelContent::Location { .. }));
}

#[test]
fn test_telegram_reaction_emoji_mapping() {
assert_eq!(telegram_reaction_emoji(&AgentPhase::Queued), "\u{1F440}"); // πŸ‘€
assert_eq!(telegram_reaction_emoji(&AgentPhase::Thinking), "\u{1F914}"); // πŸ€”
assert_eq!(
telegram_reaction_emoji(&AgentPhase::ToolUse {
tool_name: "shell_exec".to_string()
}),
"\u{1F525}" // πŸ”₯
);
assert_eq!(telegram_reaction_emoji(&AgentPhase::Streaming), "\u{270D}"); // ✍
assert_eq!(telegram_reaction_emoji(&AgentPhase::Done), "\u{1F389}"); // πŸŽ‰
assert_eq!(telegram_reaction_emoji(&AgentPhase::Error), "\u{1F631}"); // 😱
}

#[tokio::test]
async fn test_send_reaction_disabled() {
let adapter = TelegramAdapter::with_reactions(
"fake:token".to_string(),
vec![],
Duration::from_secs(1),
false,
);
let user = ChannelUser {
platform_id: "123".to_string(),
display_name: "Test".to_string(),
openfang_user: None,
};
let reaction = LifecycleReaction {
phase: AgentPhase::Queued,
emoji: "\u{1F440}".to_string(),
remove_previous: false,
};
// Should return Ok without making any API call
let result = adapter.send_reaction(&user, "42", &reaction).await;
assert!(result.is_ok());
}

#[tokio::test]
async fn test_send_reaction_invalid_message_id() {
let adapter = TelegramAdapter::with_reactions(
"fake:token".to_string(),
vec![],
Duration::from_secs(1),
true,
);
let user = ChannelUser {
platform_id: "123".to_string(),
display_name: "Test".to_string(),
openfang_user: None,
};
let reaction = LifecycleReaction {
phase: AgentPhase::Done,
emoji: "\u{1F44D}".to_string(),
remove_previous: true,
};
// Non-numeric message_id should error
let result = adapter.send_reaction(&user, "not-a-number", &reaction).await;
assert!(result.is_err());
// Empty message_id should error
let result = adapter.send_reaction(&user, "", &reaction).await;
assert!(result.is_err());
}
}
38 changes: 31 additions & 7 deletions crates/openfang-channels/tests/bridge_integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,18 +225,31 @@ async fn test_bridge_dispatch_text_message() {
// Give the async dispatch loop time to process
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

// Verify: adapter received the echo response
// Verify: adapter received the echo response (now includes channel_context prefix)
let sent = adapter_ref.get_sent();
assert_eq!(sent.len(), 1, "Expected 1 response, got {}", sent.len());
assert_eq!(sent[0].0, "user1");
assert_eq!(sent[0].1, "Echo: Hello agent!");
assert!(
sent[0].1.contains("Hello agent!"),
"Response should contain original message, got: {}",
sent[0].1
);

// Verify: handle received the message
// Verify: handle received the message with channel_context header
{
let received = handle.received.lock().unwrap();
assert_eq!(received.len(), 1);
assert_eq!(received[0].0, agent_id);
assert_eq!(received[0].1, "Hello agent!");
assert!(
received[0].1.contains("Hello agent!"),
"Handle should receive original message, got: {}",
received[0].1
);
assert!(
received[0].1.contains("[channel_context:"),
"Handle should receive channel_context header, got: {}",
received[0].1
);
}

manager.stop().await;
Expand Down Expand Up @@ -486,7 +499,10 @@ async fn test_bridge_manager_lifecycle() {
assert_eq!(sent.len(), 5, "Expected 5 responses, got {}", sent.len());

for (i, (_, text)) in sent.iter().enumerate() {
assert_eq!(*text, format!("Echo: message {i}"));
assert!(
text.contains(&format!("message {i}")),
"Response {i} should contain 'message {i}', got: {text}"
);
}

// Stop β€” should complete without hanging
Expand Down Expand Up @@ -535,11 +551,19 @@ async fn test_bridge_multiple_adapters() {

let tg_sent = tg_ref.get_sent();
assert_eq!(tg_sent.len(), 1);
assert_eq!(tg_sent[0].1, "Echo: from telegram");
assert!(
tg_sent[0].1.contains("from telegram"),
"Telegram response should contain original message, got: {}",
tg_sent[0].1
);

let dc_sent = dc_ref.get_sent();
assert_eq!(dc_sent.len(), 1);
assert_eq!(dc_sent[0].1, "Echo: from discord");
assert!(
dc_sent[0].1.contains("from discord"),
"Discord response should contain original message, got: {}",
dc_sent[0].1
);

manager.stop().await;
}
Loading