From 72114152e503b5d54f9d2fbdd0b50d3bde95d7d2 Mon Sep 17 00:00:00 2001 From: Justin Slay Date: Tue, 23 Dec 2025 14:58:42 -0700 Subject: [PATCH 1/3] Add quarantine cog --- .env_SAMPLE | 2 + am_bot/bot.py | 2 + am_bot/cogs/quarantine.py | 115 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 119 insertions(+) create mode 100644 am_bot/cogs/quarantine.py diff --git a/.env_SAMPLE b/.env_SAMPLE index 22c3bdf..91a7f54 100644 --- a/.env_SAMPLE +++ b/.env_SAMPLE @@ -1 +1,3 @@ BOT_TOKEN=your_token +QUARANTINE_ROLE_ID=1234567890 +QUARANTINE_HONEYPOT_CHANNEL_ID=1234567890 diff --git a/am_bot/bot.py b/am_bot/bot.py index bcef481..300b228 100644 --- a/am_bot/bot.py +++ b/am_bot/bot.py @@ -5,6 +5,7 @@ from .cogs.greetings import GreetingsCog from .cogs.invite_response import InviteResponseCog +from .cogs.quarantine import QuarantineCog from .cogs.responses import ResponsesCog from .cogs.role_assignment import RoleAssignmentCog from .cogs.server_stats import ServerStatsCog @@ -37,6 +38,7 @@ async def on_message(self, message): async def add_cogs(self): await self.add_cog(GreetingsCog()) await self.add_cog(InviteResponseCog(self)) + await self.add_cog(QuarantineCog(self)) await self.add_cog(ResponsesCog(self)) await self.add_cog(RoleAssignmentCog(self)) await self.add_cog(ServerStatsCog(self)) diff --git a/am_bot/cogs/quarantine.py b/am_bot/cogs/quarantine.py new file mode 100644 index 0000000..0beb574 --- /dev/null +++ b/am_bot/cogs/quarantine.py @@ -0,0 +1,115 @@ +import logging +import os +from datetime import datetime, timedelta, timezone + +import discord +from discord.ext import commands + + +logger = logging.getLogger(__name__) + +QUARANTINE_HONEYPOT_CHANNEL_ID = int(os.getenv("QUARANTINE_HONEYPOT_CHANNEL_ID", 0)) +QUARANTINE_ROLE_ID = int(os.getenv("QUARANTINE_ROLE_ID", 0)) + + +class QuarantineCog(commands.Cog): + def __init__(self, bot: discord.ext.commands.Bot): + self.bot = bot + + @commands.Cog.listener() + async def on_message(self, message: discord.Message): + # Ignore bot messages + if message.author.bot: + return + + # Only act on messages in the honeypot channel + if message.channel.id != QUARANTINE_HONEYPOT_CHANNEL_ID: + return + + if QUARANTINE_HONEYPOT_CHANNEL_ID == 0 or QUARANTINE_ROLE_ID == 0: + logger.warning( + "Quarantine honeypot or role ID not configured. Skipping." + ) + return + + member = message.author + guild = message.guild + + if guild is None: + logger.warning("Message not in a guild. Skipping.") + return + + logger.info( + f"Honeypot triggered by {member} ({member.id}) " + f"in channel {message.channel.name}" + ) + + # Delete the honeypot message first + try: + await message.delete() + logger.debug(f"Deleted honeypot message from {member}") + except discord.errors.Forbidden: + logger.warning(f"Could not delete honeypot message from {member}") + except discord.errors.NotFound: + logger.debug("Honeypot message already deleted") + + # Assign the quarantine role + quarantine_role = guild.get_role(QUARANTINE_ROLE_ID) + if quarantine_role is None: + logger.error( + f"Quarantine role {QUARANTINE_ROLE_ID} not found in guild." + ) + return + + try: + await member.add_roles( + quarantine_role, reason="Triggered quarantine honeypot" + ) + logger.info(f"Assigned quarantine role to {member} ({member.id})") + except discord.errors.Forbidden: + logger.error( + f"Bot lacks permission to assign quarantine role to {member}" + ) + return + + # Delete messages from the last hour across all text channels + one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1) + deleted_count = 0 + + for channel in guild.text_channels: + try: + # Check if bot has permission to read and manage messages + permissions = channel.permissions_for(guild.me) + if not permissions.read_messages or not permissions.manage_messages: + logger.debug( + f"Skipping channel {channel.name} - insufficient permissions" + ) + continue + + # Use purge with a check function for efficiency + deleted = await channel.purge( + limit=None, + check=lambda m: m.author.id == member.id, + after=one_hour_ago, + reason=f"Quarantine purge for {member}", + ) + deleted_count += len(deleted) + if deleted: + logger.debug( + f"Deleted {len(deleted)} messages from {channel.name}" + ) + + except discord.errors.Forbidden: + logger.debug( + f"Cannot purge messages in {channel.name} - forbidden" + ) + except discord.errors.HTTPException as e: + logger.warning( + f"HTTP error purging messages in {channel.name}: {e}" + ) + + logger.info( + f"Quarantine complete for {member} ({member.id}). " + f"Deleted {deleted_count} messages from the last hour." + ) + From 7f51a662211f9fd3cda491e88bb94a0aae5fcb52 Mon Sep 17 00:00:00 2001 From: Justin Slay Date: Tue, 23 Dec 2025 15:06:38 -0700 Subject: [PATCH 2/3] Linting --- am_bot/cogs/quarantine.py | 151 ++++++++++++++++++++++---------------- 1 file changed, 87 insertions(+), 64 deletions(-) diff --git a/am_bot/cogs/quarantine.py b/am_bot/cogs/quarantine.py index 0beb574..fd80c85 100644 --- a/am_bot/cogs/quarantine.py +++ b/am_bot/cogs/quarantine.py @@ -8,7 +8,9 @@ logger = logging.getLogger(__name__) -QUARANTINE_HONEYPOT_CHANNEL_ID = int(os.getenv("QUARANTINE_HONEYPOT_CHANNEL_ID", 0)) +QUARANTINE_HONEYPOT_CHANNEL_ID = int( + os.getenv("QUARANTINE_HONEYPOT_CHANNEL_ID", 0) +) QUARANTINE_ROLE_ID = int(os.getenv("QUARANTINE_ROLE_ID", 0)) @@ -16,100 +18,121 @@ class QuarantineCog(commands.Cog): def __init__(self, bot: discord.ext.commands.Bot): self.bot = bot - @commands.Cog.listener() - async def on_message(self, message: discord.Message): - # Ignore bot messages - if message.author.bot: - return - - # Only act on messages in the honeypot channel - if message.channel.id != QUARANTINE_HONEYPOT_CHANNEL_ID: - return - - if QUARANTINE_HONEYPOT_CHANNEL_ID == 0 or QUARANTINE_ROLE_ID == 0: - logger.warning( - "Quarantine honeypot or role ID not configured. Skipping." - ) - return - - member = message.author - guild = message.guild - - if guild is None: - logger.warning("Message not in a guild. Skipping.") - return - - logger.info( - f"Honeypot triggered by {member} ({member.id}) " - f"in channel {message.channel.name}" - ) - - # Delete the honeypot message first + async def _delete_honeypot_message(self, message: discord.Message) -> None: + """Delete the message that triggered the honeypot.""" try: await message.delete() - logger.debug(f"Deleted honeypot message from {member}") + logger.debug(f"Deleted honeypot message from {message.author}") except discord.errors.Forbidden: - logger.warning(f"Could not delete honeypot message from {member}") + logger.warning( + f"Could not delete honeypot message from {message.author}" + ) except discord.errors.NotFound: logger.debug("Honeypot message already deleted") - # Assign the quarantine role + async def _assign_quarantine_role( + self, member: discord.Member, guild: discord.Guild + ) -> bool: + """Assign quarantine role to member. Returns True on success.""" quarantine_role = guild.get_role(QUARANTINE_ROLE_ID) if quarantine_role is None: logger.error( f"Quarantine role {QUARANTINE_ROLE_ID} not found in guild." ) - return + return False try: await member.add_roles( quarantine_role, reason="Triggered quarantine honeypot" ) logger.info(f"Assigned quarantine role to {member} ({member.id})") + return True except discord.errors.Forbidden: logger.error( f"Bot lacks permission to assign quarantine role to {member}" ) - return + return False - # Delete messages from the last hour across all text channels + async def _purge_member_messages( + self, member: discord.Member, guild: discord.Guild + ) -> int: + """Purge messages from member in last hour. Returns deleted count.""" one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1) deleted_count = 0 for channel in guild.text_channels: - try: - # Check if bot has permission to read and manage messages - permissions = channel.permissions_for(guild.me) - if not permissions.read_messages or not permissions.manage_messages: - logger.debug( - f"Skipping channel {channel.name} - insufficient permissions" - ) - continue - - # Use purge with a check function for efficiency - deleted = await channel.purge( - limit=None, - check=lambda m: m.author.id == member.id, - after=one_hour_ago, - reason=f"Quarantine purge for {member}", - ) - deleted_count += len(deleted) - if deleted: - logger.debug( - f"Deleted {len(deleted)} messages from {channel.name}" - ) + deleted_count += await self._purge_channel( + channel, member, one_hour_ago + ) - except discord.errors.Forbidden: + return deleted_count + + async def _purge_channel( + self, + channel: discord.TextChannel, + member: discord.Member, + after: datetime, + ) -> int: + """Purge messages from member in a single channel. Returns count.""" + try: + permissions = channel.permissions_for(channel.guild.me) + if ( + not permissions.read_messages + or not permissions.manage_messages + ): + logger.debug(f"Skipping {channel.name} - no permissions") + return 0 + + deleted = await channel.purge( + limit=None, + check=lambda m: m.author.id == member.id, + after=after, + reason=f"Quarantine purge for {member}", + ) + if deleted: logger.debug( - f"Cannot purge messages in {channel.name} - forbidden" - ) - except discord.errors.HTTPException as e: - logger.warning( - f"HTTP error purging messages in {channel.name}: {e}" + f"Deleted {len(deleted)} messages from {channel.name}" ) + return len(deleted) + + except discord.errors.Forbidden: + logger.debug(f"Cannot purge in {channel.name} - forbidden") + except discord.errors.HTTPException as e: + logger.warning(f"HTTP error purging in {channel.name}: {e}") + return 0 + + @commands.Cog.listener() + async def on_message(self, message: discord.Message): + if message.author.bot: + return + + if message.channel.id != QUARANTINE_HONEYPOT_CHANNEL_ID: + return + + if QUARANTINE_HONEYPOT_CHANNEL_ID == 0 or QUARANTINE_ROLE_ID == 0: + logger.warning("Quarantine IDs not configured. Skipping.") + return + + member = message.author + guild = message.guild + + if guild is None: + logger.warning("Message not in a guild. Skipping.") + return + + logger.info( + f"Honeypot triggered by {member} ({member.id}) " + f"in channel {message.channel.name}" + ) + + await self._delete_honeypot_message(message) + + if not await self._assign_quarantine_role(member, guild): + return + + deleted_count = await self._purge_member_messages(member, guild) logger.info( f"Quarantine complete for {member} ({member.id}). " f"Deleted {deleted_count} messages from the last hour." ) - From 2053e16bee04f950096045b2bd5be4afba69e02b Mon Sep 17 00:00:00 2001 From: Justin Slay Date: Tue, 23 Dec 2025 15:11:17 -0700 Subject: [PATCH 3/3] Add envvars to chart --- helm/am-bot/templates/deployment.yaml | 9 +++++++++ helm/am-bot/values.yaml | 8 ++++++++ 2 files changed, 17 insertions(+) diff --git a/helm/am-bot/templates/deployment.yaml b/helm/am-bot/templates/deployment.yaml index 19cb4ba..361aeda 100644 --- a/helm/am-bot/templates/deployment.yaml +++ b/helm/am-bot/templates/deployment.yaml @@ -56,6 +56,15 @@ spec: name: {{ include "am-bot.awsSecretName" . }} key: AWS_DEFAULT_REGION {{- end }} + {{- /* Quarantine honeypot configuration */}} + {{- if .Values.quarantine.honeypotChannelId }} + - name: QUARANTINE_HONEYPOT_CHANNEL_ID + value: {{ .Values.quarantine.honeypotChannelId | quote }} + {{- end }} + {{- if .Values.quarantine.roleId }} + - name: QUARANTINE_ROLE_ID + value: {{ .Values.quarantine.roleId | quote }} + {{- end }} {{- /* Additional environment variables */}} {{- range $key, $value := .Values.env }} - name: {{ $key }} diff --git a/helm/am-bot/values.yaml b/helm/am-bot/values.yaml index ca8b83f..d6c39cb 100644 --- a/helm/am-bot/values.yaml +++ b/helm/am-bot/values.yaml @@ -31,6 +31,14 @@ aws: # AWS_SECRET_ACCESS_KEY: "" # AWS_DEFAULT_REGION: "" +# Quarantine honeypot configuration +# Set these to enable the quarantine feature +quarantine: + # Channel ID that acts as the honeypot trap + honeypotChannelId: "" + # Role ID to assign to users who trigger the honeypot + roleId: "" + # Additional environment variables env: {}