Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 61 additions & 3 deletions crates/openfang-channels/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,19 @@ fn channel_type_str(channel: &crate::types::ChannelType) -> &str {
}
}

/// Extract the sender's user identity from a message.
///
/// Some adapters (e.g. Slack) set `platform_id` to the channel/conversation ID
/// (needed for the send path) and store the actual user ID in metadata.
/// This helper returns the user ID for RBAC and rate limiting.
fn sender_user_id(message: &ChannelMessage) -> &str {
message
.metadata
.get("sender_user_id")
.and_then(|v| v.as_str())
.unwrap_or(&message.sender.platform_id)
}

/// Send a response, applying output formatting and optional threading.
async fn send_response(
adapter: &dyn ChannelAdapter,
Expand Down Expand Up @@ -438,7 +451,7 @@ async fn dispatch_message(
if let Some(ref ov) = overrides {
if ov.rate_limit_per_user > 0 {
if let Err(msg) =
rate_limiter.check(ct_str, &message.sender.platform_id, ov.rate_limit_per_user)
rate_limiter.check(ct_str, sender_user_id(message), ov.rate_limit_per_user)
{
send_response(adapter, &message.sender, msg, thread_id, output_format).await;
return;
Expand Down Expand Up @@ -524,7 +537,7 @@ async fn dispatch_message(
if !targets.is_empty() {
// RBAC check applies to broadcast too
if let Err(denied) = handle
.authorize_channel_user(ct_str, &message.sender.platform_id, "chat")
.authorize_channel_user(ct_str, sender_user_id(message), "chat")
.await
{
send_response(
Expand Down Expand Up @@ -626,7 +639,7 @@ async fn dispatch_message(

// RBAC: authorize the user before forwarding to agent
if let Err(denied) = handle
.authorize_channel_user(ct_str, &message.sender.platform_id, "chat")
.authorize_channel_user(ct_str, sender_user_id(message), "chat")
.await
{
send_response(
Expand Down Expand Up @@ -1124,4 +1137,49 @@ mod tests {
"irc"
);
}

#[test]
fn test_sender_user_id_from_metadata() {
let mut metadata = std::collections::HashMap::new();
metadata.insert(
"sender_user_id".to_string(),
serde_json::Value::String("U456".to_string()),
);
let msg = ChannelMessage {
channel: ChannelType::Slack,
platform_message_id: "ts".to_string(),
sender: ChannelUser {
platform_id: "C789".to_string(),
display_name: "U456".to_string(),
openfang_user: None,
},
content: ChannelContent::Text("hi".to_string()),
target_agent: None,
timestamp: chrono::Utc::now(),
is_group: true,
thread_id: None,
metadata,
};
assert_eq!(sender_user_id(&msg), "U456");
}

#[test]
fn test_sender_user_id_fallback_to_platform_id() {
let msg = ChannelMessage {
channel: ChannelType::Telegram,
platform_message_id: "123".to_string(),
sender: ChannelUser {
platform_id: "chat123".to_string(),
display_name: "Alice".to_string(),
openfang_user: None,
},
content: ChannelContent::Text("hi".to_string()),
target_agent: None,
timestamp: chrono::Utc::now(),
is_group: true,
thread_id: None,
metadata: std::collections::HashMap::new(),
};
assert_eq!(sender_user_id(&msg), "chat123");
}
}
52 changes: 47 additions & 5 deletions crates/openfang-channels/src/slack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,11 @@ async fn parse_slack_event(

let channel = event["channel"].as_str()?;

// Filter by allowed channels
if !allowed_channels.is_empty() && !allowed_channels.contains(&channel.to_string()) {
// Filter by allowed channels (DMs are exempt — dm_policy handles those)
if !channel.starts_with('D')
&& !allowed_channels.is_empty()
&& !allowed_channels.contains(&channel.to_string())
{
return None;
}

Expand Down Expand Up @@ -413,20 +416,30 @@ async fn parse_slack_event(
ChannelContent::Text(text.to_string())
};

// Slack channel prefixes: C=public, G=private/mpim, D=1:1 DM, W=org-wide.
// Only D channels are true DMs where dm_policy should apply.
let is_group = !channel.starts_with('D');

let mut metadata = HashMap::new();
metadata.insert(
"sender_user_id".to_string(),
serde_json::Value::String(user_id.to_string()),
);

Some(ChannelMessage {
channel: ChannelType::Slack,
platform_message_id: ts.to_string(),
sender: ChannelUser {
platform_id: channel.to_string(),
display_name: user_id.to_string(), // Slack user IDs as display name
display_name: user_id.to_string(),
openfang_user: None,
},
content,
target_agent: None,
timestamp,
is_group: true,
is_group,
thread_id: None,
metadata: HashMap::new(),
metadata,
})
}

Expand All @@ -448,6 +461,11 @@ mod tests {
let msg = parse_slack_event(&event, &bot_id, &[]).await.unwrap();
assert_eq!(msg.channel, ChannelType::Slack);
assert_eq!(msg.sender.platform_id, "C789");
assert_eq!(
msg.metadata.get("sender_user_id").and_then(|v| v.as_str()),
Some("U456")
);
assert!(msg.is_group);
assert!(matches!(msg.content, ChannelContent::Text(ref t) if t == "Hello agent!"));
}

Expand Down Expand Up @@ -559,9 +577,33 @@ mod tests {
let msg = parse_slack_event(&event, &bot_id, &[]).await.unwrap();
assert_eq!(msg.channel, ChannelType::Slack);
assert_eq!(msg.sender.platform_id, "C789");
assert_eq!(
msg.metadata.get("sender_user_id").and_then(|v| v.as_str()),
Some("U456")
);
assert!(matches!(msg.content, ChannelContent::Text(ref t) if t == "Edited message text"));
}

#[tokio::test]
async fn test_parse_slack_event_dm_detected() {
let bot_id = Arc::new(RwLock::new(Some("B123".to_string())));
let event = serde_json::json!({
"type": "message",
"user": "U456",
"channel": "D789",
"text": "Hello via DM",
"ts": "1700000000.000100"
});

let msg = parse_slack_event(&event, &bot_id, &[]).await.unwrap();
assert!(!msg.is_group);
assert_eq!(msg.sender.platform_id, "D789");
assert_eq!(
msg.metadata.get("sender_user_id").and_then(|v| v.as_str()),
Some("U456")
);
}

#[test]
fn test_slack_adapter_creation() {
let adapter = SlackAdapter::new(
Expand Down