Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .env_SAMPLE
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
BOT_TOKEN=your_token
QUARANTINE_ROLE_ID=1234567890
QUARANTINE_HONEYPOT_CHANNEL_ID=1234567890
# Spam detection configuration (optional - defaults shown)
SPAM_SIMILARITY_THRESHOLD=0.85
SPAM_CHANNEL_THRESHOLD=3
MESSAGE_HISTORY_SECONDS=3600
SPAM_MIN_MESSAGE_LENGTH=20
SPAM_MAX_MESSAGES_PER_USER=50
SPAM_MAX_CONTENT_LENGTH=200
CLEANUP_INTERVAL_SECONDS=300
239 changes: 205 additions & 34 deletions am_bot/cogs/quarantine.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import asyncio
import logging
import os
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from difflib import SequenceMatcher

import discord
from discord.ext import commands
Expand All @@ -13,27 +17,175 @@
)
QUARANTINE_ROLE_ID = int(os.getenv("QUARANTINE_ROLE_ID", 0))

# Spam detection configuration
# Every value below can be overridden through the environment; the defaults
# match the values documented in .env_SAMPLE.
# Minimum similarity ratio (0.0 to 1.0) to consider messages as duplicates
SPAM_SIMILARITY_THRESHOLD = float(os.getenv("SPAM_SIMILARITY_THRESHOLD", 0.85))
# Number of similar messages across different channels to trigger quarantine
SPAM_CHANNEL_THRESHOLD = int(os.getenv("SPAM_CHANNEL_THRESHOLD", 3))
# Message history retention in seconds (default 1 hour)
MESSAGE_HISTORY_SECONDS = int(os.getenv("MESSAGE_HISTORY_SECONDS", 3600))
# Minimum message length to consider for spam detection (ignore short messages)
SPAM_MIN_MESSAGE_LENGTH = int(os.getenv("SPAM_MIN_MESSAGE_LENGTH", 20))

# Memory-protection limits. The names stay private to this module, but the
# values are read from the environment because .env_SAMPLE and the Helm chart
# expose them as settings; previously they were hard-coded and the configured
# values were silently ignored.
# Maximum number of messages retained per user
_MAX_MESSAGES_PER_USER = int(os.getenv("SPAM_MAX_MESSAGES_PER_USER", 50))
# Maximum number of characters stored per message (longer content is truncated)
_MAX_CONTENT_LENGTH = int(os.getenv("SPAM_MAX_CONTENT_LENGTH", 200))
# Seconds between periodic cleanup passes over all users' histories
_CLEANUP_INTERVAL_SECONDS = int(os.getenv("CLEANUP_INTERVAL_SECONDS", 300))


@dataclass
class MessageRecord:
"""Record of a user's message for spam detection."""

content: str # Stored lowercase for efficient comparison
channel_id: int
timestamp: datetime


class QuarantineCog(commands.Cog):
def __init__(self, bot: discord.ext.commands.Bot):
self.bot = bot
# user_id -> list of MessageRecord
self.message_history: dict[int, list[MessageRecord]] = defaultdict(
list
)
self._cleanup_task: asyncio.Task | None = None

async def _delete_honeypot_message(self, message: discord.Message) -> None:
"""Delete the message that triggered the honeypot."""
try:
await message.delete()
logger.debug(f"Deleted honeypot message from {message.author}")
except discord.errors.Forbidden:
logger.warning(
f"Could not delete honeypot message from {message.author}"
def cog_load(self) -> None:
"""Start the periodic cleanup task when cog is loaded."""
self._cleanup_task = self.bot.loop.create_task(
self._periodic_cleanup()
)

def cog_unload(self) -> None:
"""Cancel the cleanup task when cog is unloaded."""
if self._cleanup_task:
self._cleanup_task.cancel()

async def _periodic_cleanup(self) -> None:
    """Run the history cleanup forever, pausing between passes.

    Waits 60 seconds before the first pass, then sleeps
    ``_CLEANUP_INTERVAL_SECONDS`` after every pass. An error in one pass is
    logged and does not stop the loop.
    """
    initial_delay = 60
    await asyncio.sleep(initial_delay)
    while True:
        try:
            self._cleanup_old_messages()
        except Exception as e:
            # Keep the loop alive; a single failed pass must not end cleanup.
            logger.error(f"Error in periodic cleanup: {e}")
        await asyncio.sleep(_CLEANUP_INTERVAL_SECONDS)

def _cleanup_old_messages(self) -> None:
    """Drop expired records for every user and forget users left with none.

    A record is kept only if its timestamp is newer than
    ``MESSAGE_HISTORY_SECONDS`` ago.
    """
    retention = timedelta(seconds=MESSAGE_HISTORY_SECONDS)
    cutoff = datetime.now(timezone.utc) - retention

    history = self.message_history
    emptied_users = []
    for uid, records in history.items():
        # Re-assigning an existing key does not resize the dict, so this is
        # safe while iterating over it.
        fresh = [rec for rec in records if rec.timestamp > cutoff]
        history[uid] = fresh
        if not fresh:
            emptied_users.append(uid)

    # Remove empty entries only after iteration has finished.
    for uid in emptied_users:
        del history[uid]

    if emptied_users:
        logger.debug(
            f"Cleaned up history for {len(emptied_users)} users"
        )

def _record_message(self, message: discord.Message) -> None:
    """Append the message to its author's history, capping the history size."""
    # Content is truncated and lowercased before storing to limit memory use.
    entry = MessageRecord(
        content=message.content[:_MAX_CONTENT_LENGTH].lower(),
        channel_id=message.channel.id,
        timestamp=datetime.now(timezone.utc),
    )

    author_id = message.author.id
    records = self.message_history[author_id]
    records.append(entry)

    # Keep only the newest _MAX_MESSAGES_PER_USER records for this user.
    if len(records) > _MAX_MESSAGES_PER_USER:
        newest = records[-_MAX_MESSAGES_PER_USER:]
        self.message_history[author_id] = newest

def _get_similarity(self, text1: str, text2: str) -> float:
"""Calculate similarity ratio between two lowercase strings."""
return SequenceMatcher(None, text1, text2).ratio()

def _detect_cross_channel_spam(
    self, user_id: int, new_content: str, current_channel_id: int
) -> bool:
    """Detect if a user is spamming similar messages across channels.

    The incoming text is compared against the user's recorded messages from
    other channels. Each channel holding a sufficiently similar message
    counts once, and the current channel is counted as well.

    Args:
        user_id: ID of the message author.
        new_content: Raw content of the incoming message.
        current_channel_id: ID of the channel the message was posted in.

    Returns:
        True if similar messages span at least ``SPAM_CHANNEL_THRESHOLD``
        channels, False otherwise.
    """
    content = new_content.strip()
    if not content:
        return False

    # Skip short messages to avoid false positives (e.g., "lol", "ok")
    if len(content) < SPAM_MIN_MESSAGE_LENGTH:
        return False

    # .get() avoids creating an empty defaultdict entry for unknown users.
    history = self.message_history.get(user_id, [])
    if not history:
        return False

    # Stored records are truncated to _MAX_CONTENT_LENGTH and lowercased, so
    # normalise the incoming text the same way. Comparing the untruncated
    # text would make long messages fail the length and similarity checks
    # even when they are exact repeats.
    content_lower = content[:_MAX_CONTENT_LENGTH].lower()
    content_len = len(content_lower)

    # Channels (other than the current one) with a similar message
    spam_channels: set[int] = set()

    for record in history:
        # Skip messages from the same channel
        if record.channel_id == current_channel_id:
            continue

        # A channel only counts once; skip the costly comparison if it
        # has already been matched.
        if record.channel_id in spam_channels:
            continue

        # Empty records can never be similar to a non-empty message
        if not record.content:
            continue

        # Quick length check - very different lengths can't be similar
        len_ratio = content_len / len(record.content)
        if len_ratio < 0.5 or len_ratio > 2.0:
            continue

        similarity = self._get_similarity(content_lower, record.content)
        if similarity >= SPAM_SIMILARITY_THRESHOLD:
            spam_channels.add(record.channel_id)
            logger.debug(
                f"Similar message found in channel {record.channel_id} "
                f"(similarity: {similarity:.2%})"
            )

    # Include current channel in the count
    total_channels = len(spam_channels) + 1

    if total_channels >= SPAM_CHANNEL_THRESHOLD:
        logger.info(
            f"Cross-channel spam detected for user {user_id}: "
            f"similar messages in {total_channels} channels"
        )
        return True

    return False

async def _assign_quarantine_role(
self, member: discord.Member, guild: discord.Guild
self, member: discord.Member, guild: discord.Guild, reason: str
) -> bool:
"""Assign quarantine role to member. Returns True on success."""
if QUARANTINE_ROLE_ID == 0:
logger.warning("Quarantine role ID not configured.")
return False

quarantine_role = guild.get_role(QUARANTINE_ROLE_ID)
if quarantine_role is None:
logger.error(
Expand All @@ -42,9 +194,7 @@ async def _assign_quarantine_role(
return False

try:
await member.add_roles(
quarantine_role, reason="Triggered quarantine honeypot"
)
await member.add_roles(quarantine_role, reason=reason)
logger.info(f"Assigned quarantine role to {member} ({member.id})")
return True
except discord.errors.Forbidden:
Expand Down Expand Up @@ -101,38 +251,59 @@ async def _purge_channel(
logger.warning(f"HTTP error purging in {channel.name}: {e}")
return 0

@commands.Cog.listener()
async def on_message(self, message: discord.Message):
if message.author.bot:
return

if message.channel.id != QUARANTINE_HONEYPOT_CHANNEL_ID:
return

if QUARANTINE_HONEYPOT_CHANNEL_ID == 0 or QUARANTINE_ROLE_ID == 0:
logger.warning("Quarantine IDs not configured. Skipping.")
return

async def _handle_quarantine(
    self, message: discord.Message, reason: str
) -> None:
    """Handle quarantining a user: assign role and purge messages.

    Deletes the triggering message (best effort), assigns the quarantine
    role, purges the member's messages and clears their in-memory history.
    Stops early if the message has no guild or the role cannot be assigned.

    Args:
        message: The message that triggered the quarantine.
        reason: Human-readable reason, used in logs and passed on when
            assigning the quarantine role.
    """
    member = message.author
    guild = message.guild

    if guild is None:
        logger.warning("Message not in a guild. Skipping.")
        return

    logger.info(
        f"Quarantine triggered for {member} ({member.id}): {reason}"
    )

    # Delete the triggering message. This is best effort: a failure must not
    # stop the quarantine, but it is logged instead of silently swallowed.
    try:
        await message.delete()
    except discord.errors.Forbidden:
        logger.warning(
            f"Could not delete triggering message from {member}"
        )
    except discord.errors.NotFound:
        logger.debug("Triggering message already deleted")

    if not await self._assign_quarantine_role(member, guild, reason):
        return

    deleted_count = await self._purge_member_messages(member, guild)

    # Clear their message history from memory
    self.message_history.pop(member.id, None)

    logger.info(
        f"Quarantine complete for {member} ({member.id}). "
        f"Deleted {deleted_count} messages from the last hour."
    )

@commands.Cog.listener()
async def on_message(self, message: discord.Message):
    """Check each guild message for the honeypot and cross-channel spam.

    Bot messages and DMs are ignored. A message in the honeypot channel
    quarantines its author; otherwise the message is checked against the
    author's history and recorded only when it is not flagged as spam.
    """
    author = message.author
    if author.bot:
        return
    if message.guild is None:
        return

    channel_id = message.channel.id

    # Check 1: honeypot channel trigger. Messages in that channel are never
    # recorded; an ID of 0 means the honeypot is not configured.
    if channel_id == QUARANTINE_HONEYPOT_CHANNEL_ID:
        if QUARANTINE_HONEYPOT_CHANNEL_ID:
            await self._handle_quarantine(
                message, "Triggered quarantine honeypot"
            )
        return

    # Check 2: cross-channel spam detection
    is_spam = self._detect_cross_channel_spam(
        author.id, message.content, channel_id
    )
    if not is_spam:
        # Record the message for future spam detection
        self._record_message(message)
        return

    await self._handle_quarantine(message, "Cross-channel spam detected")
29 changes: 29 additions & 0 deletions helm/am-bot/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,35 @@ spec:
- name: QUARANTINE_ROLE_ID
value: {{ .Values.quarantine.roleId | quote }}
{{- end }}
{{- /* Spam detection configuration */}}
{{- if .Values.spamDetection.similarityThreshold }}
- name: SPAM_SIMILARITY_THRESHOLD
value: {{ .Values.spamDetection.similarityThreshold | quote }}
{{- end }}
{{- if .Values.spamDetection.channelThreshold }}
- name: SPAM_CHANNEL_THRESHOLD
value: {{ .Values.spamDetection.channelThreshold | quote }}
{{- end }}
{{- if .Values.spamDetection.historySeconds }}
- name: MESSAGE_HISTORY_SECONDS
value: {{ .Values.spamDetection.historySeconds | quote }}
{{- end }}
{{- if .Values.spamDetection.minMessageLength }}
- name: SPAM_MIN_MESSAGE_LENGTH
value: {{ .Values.spamDetection.minMessageLength | quote }}
{{- end }}
{{- if .Values.spamDetection.maxMessagesPerUser }}
- name: SPAM_MAX_MESSAGES_PER_USER
value: {{ .Values.spamDetection.maxMessagesPerUser | quote }}
{{- end }}
{{- if .Values.spamDetection.maxContentLength }}
- name: SPAM_MAX_CONTENT_LENGTH
value: {{ .Values.spamDetection.maxContentLength | quote }}
{{- end }}
{{- if .Values.spamDetection.cleanupInterval }}
- name: CLEANUP_INTERVAL_SECONDS
value: {{ .Values.spamDetection.cleanupInterval | quote }}
{{- end }}
{{- /* Additional environment variables */}}
{{- range $key, $value := .Values.env }}
- name: {{ $key }}
Expand Down
17 changes: 17 additions & 0 deletions helm/am-bot/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,23 @@ quarantine:
# Role ID to assign to users who trigger the honeypot
roleId: ""

# Spam detection configuration
spamDetection:
# Similarity threshold (0.0-1.0) for detecting duplicate messages
similarityThreshold: "0.85"
# Number of channels with similar messages to trigger quarantine
channelThreshold: "3"
# Message history retention in seconds (default 1 hour)
historySeconds: "3600"
# Minimum message length to check for spam (shorter messages are ignored)
minMessageLength: "20"
# Maximum messages stored per user (memory protection)
maxMessagesPerUser: "50"
# Maximum content length stored per message (truncates longer messages)
maxContentLength: "200"
# Interval for periodic cleanup of all users (seconds)
cleanupInterval: "300"

# Additional environment variables
env: {}

Expand Down
Loading