Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
671 changes: 323 additions & 348 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ tokio-stream = "0.1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
toml = "0.8"
toml_edit = "0.20"
prost = "0.13"
rmp-serde = "1"

# Error handling
Expand Down Expand Up @@ -141,8 +143,11 @@ mailparse = "0.15"
tokio-test = "0.4"
tempfile = "3"

openssl = { version = "0.10", features = ["vendored"] }

[profile.release]
lto = true
codegen-units = 1
strip = true
opt-level = 3

6 changes: 6 additions & 0 deletions Cross.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,9 @@ pre-build = [
"dpkg --add-architecture $CROSS_DEB_ARCH",
"apt-get update && apt-get install --assume-yes libssl-dev:$CROSS_DEB_ARCH"
]

[build.env]
passthrough = [
"OPENSSL_STATIC",
"OPENSSL_NO_VENDOR",
]
6 changes: 6 additions & 0 deletions crates/openfang-api/src/channel_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,12 @@ impl ChannelBridgeHandle for KernelBridgeAdapter {
}
}

fn approval_pending_rx(
&self,
) -> Option<tokio::sync::broadcast::Receiver<openfang_types::approval::ApprovalRequest>> {
Some(self.kernel.approval_manager.subscribe())
}

async fn list_approvals_text(&self) -> String {
let pending = self.kernel.approval_manager.list_pending();
if pending.is_empty() {
Expand Down
1 change: 1 addition & 0 deletions crates/openfang-channels/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ chrono = { workspace = true }
dashmap = { workspace = true }
async-trait = { workspace = true }
futures = { workspace = true }
prost = { workspace = true }
reqwest = { workspace = true }
tokio-stream = { workspace = true }
tracing = { workspace = true }
Expand Down
110 changes: 92 additions & 18 deletions crates/openfang-channels/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,18 @@ pub trait ChannelBridgeHandle: Send + Sync {
"Hand listing not available.".to_string()
}

/// Subscribe to approval pending notifications from the kernel.
///
/// Returns a broadcast receiver that fires each time an agent submits an
/// approval request. The channel bridge uses this to send a real-time
/// notification to the user so they can respond with `/approve <id>`.
/// Default implementation returns `None` (no kernel integration).
fn approval_pending_rx(
&self,
) -> Option<tokio::sync::broadcast::Receiver<openfang_types::approval::ApprovalRequest>> {
None
}

/// Authorize a channel user for an action.
///
/// Returns Ok(()) if the user is allowed, Err(reason) if denied.
Expand Down Expand Up @@ -275,20 +287,78 @@ impl BridgeManager {
let adapter_clone = adapter.clone();
let mut shutdown = self.shutdown_rx.clone();

// Track the most recent sender so the approval notifier knows who to notify.
let (last_sender_tx, last_sender_rx) =
tokio::sync::watch::channel::<Option<crate::types::ChannelUser>>(None);

// Approval notification task: listens for new pending approval requests and
// immediately sends a message to the last active user so they can /approve <id>.
if let Some(mut approval_rx) = handle.approval_pending_rx() {
let notify_adapter = adapter.clone();
let mut notify_shutdown = self.shutdown_rx.clone();
tokio::spawn(async move {
loop {
tokio::select! {
result = approval_rx.recv() => {
match result {
Ok(req) => {
let user_opt = last_sender_rx.borrow().clone();
if let Some(user) = user_opt {
// Send interactive card with approve/reject buttons
let _ = notify_adapter
.send(&user, crate::types::ChannelContent::ApprovalRequest {
request_id: req.id.to_string(),
agent_id: req.agent_id,
tool_name: req.tool_name,
action_summary: req.action_summary,
})
.await;
} else {
warn!("Approval pending but no active user to notify on {}", notify_adapter.name());
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
warn!("Approval notification receiver lagged by {n} messages");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
break;
}
}
}
_ = notify_shutdown.changed() => {
if *notify_shutdown.borrow() { break; }
}
}
}
});
}

let task = tokio::spawn(async move {
let mut stream = std::pin::pin!(stream);
loop {
tokio::select! {
msg = stream.next() => {
match msg {
Some(message) => {
dispatch_message(
&message,
&handle,
&router,
adapter_clone.as_ref(),
&rate_limiter,
).await;
// Update the last active sender for approval notifications.
let _ = last_sender_tx.send(Some(message.sender.clone()));

// Spawn each message in its own task so that a long-running
// agent call (e.g. blocked on approval) does not prevent the
// user from sending `/approve <id>` or other commands.
let h = handle.clone();
let r = router.clone();
let ac = adapter_clone.clone();
let rl = rate_limiter.clone();
tokio::spawn(async move {
dispatch_message(
&message,
&h,
&r,
ac,
&rl,
).await;
});
}
None => {
info!("Channel adapter {} stream ended", adapter_clone.name());
Expand Down Expand Up @@ -366,7 +436,7 @@ async fn dispatch_message(
message: &ChannelMessage,
handle: &Arc<dyn ChannelBridgeHandle>,
router: &Arc<AgentRouter>,
adapter: &dyn ChannelAdapter,
adapter: Arc<dyn ChannelAdapter>,
rate_limiter: &ChannelRateLimiter,
) {
let ct_str = channel_type_str(&message.channel);
Expand Down Expand Up @@ -440,7 +510,7 @@ async fn dispatch_message(
if let Err(msg) =
rate_limiter.check(ct_str, &message.sender.platform_id, ov.rate_limit_per_user)
{
send_response(adapter, &message.sender, msg, thread_id, output_format).await;
send_response(adapter.as_ref(), &message.sender, msg, thread_id, output_format).await;
return;
}
}
Expand All @@ -450,7 +520,7 @@ async fn dispatch_message(
ChannelContent::Text(t) => t.clone(),
ChannelContent::Command { name, args } => {
let result = handle_command(name, args, handle, router, &message.sender).await;
send_response(adapter, &message.sender, result, thread_id, output_format).await;
send_response(adapter.as_ref(), &message.sender, result, thread_id, output_format).await;
return;
}
ChannelContent::Image { ref url, ref caption } => {
Expand All @@ -469,6 +539,10 @@ async fn dispatch_message(
ChannelContent::Location { lat, lon } => {
format!("[User shared location: {lat}, {lon}]")
}
ChannelContent::ApprovalRequest { .. } => {
// Approval requests are sent outbound only, never inbound from users
return;
}
};

// Check if it's a slash command embedded in text (e.g. "/agents")
Expand Down Expand Up @@ -512,7 +586,7 @@ async fn dispatch_message(
| "a2a"
) {
let result = handle_command(cmd, &args, handle, router, &message.sender).await;
send_response(adapter, &message.sender, result, thread_id, output_format).await;
send_response(adapter.as_ref(), &message.sender, result, thread_id, output_format).await;
return;
}
// Other slash commands pass through to the agent
Expand All @@ -528,7 +602,7 @@ async fn dispatch_message(
.await
{
send_response(
adapter,
adapter.as_ref(),
&message.sender,
format!("Access denied: {denied}"),
thread_id,
Expand Down Expand Up @@ -579,7 +653,7 @@ async fn dispatch_message(
}

let combined = responses.join("\n\n");
send_response(adapter, &message.sender, combined, thread_id, output_format).await;
send_response(adapter.as_ref(), &message.sender, combined, thread_id, output_format).await;
return;
}
}
Expand Down Expand Up @@ -612,7 +686,7 @@ async fn dispatch_message(
}
None => {
send_response(
adapter,
adapter.as_ref(),
&message.sender,
"No agents available. Start the dashboard at http://127.0.0.1:4200 to create one.".to_string(),
thread_id,
Expand All @@ -630,7 +704,7 @@ async fn dispatch_message(
.await
{
send_response(
adapter,
adapter.as_ref(),
&message.sender,
format!("Access denied: {denied}"),
thread_id,
Expand All @@ -643,7 +717,7 @@ async fn dispatch_message(
// Auto-reply check — if enabled, the engine decides whether to process this message.
// If auto-reply is enabled but suppressed for this message, skip agent call entirely.
if let Some(reply) = handle.check_auto_reply(agent_id, &text).await {
send_response(adapter, &message.sender, reply, thread_id, output_format).await;
send_response(adapter.as_ref(), &message.sender, reply, thread_id, output_format).await;
handle
.record_delivery(agent_id, ct_str, &message.sender.platform_id, true, None)
.await;
Expand All @@ -656,7 +730,7 @@ async fn dispatch_message(
// Send to agent and relay response
match handle.send_message(agent_id, &text).await {
Ok(response) => {
send_response(adapter, &message.sender, response, thread_id, output_format).await;
send_response(adapter.as_ref(), &message.sender, response, thread_id, output_format).await;
handle
.record_delivery(agent_id, ct_str, &message.sender.platform_id, true, None)
.await;
Expand All @@ -665,7 +739,7 @@ async fn dispatch_message(
warn!("Agent error for {agent_id}: {e}");
let err_msg = format!("Agent error: {e}");
send_response(
adapter,
adapter.as_ref(),
&message.sender,
err_msg.clone(),
thread_id,
Expand Down
Loading