diff --git a/am_bot/cogs/quarantine.py b/am_bot/cogs/quarantine.py index 716684d..a746e29 100644 --- a/am_bot/cogs/quarantine.py +++ b/am_bot/cogs/quarantine.py @@ -7,15 +7,17 @@ from difflib import SequenceMatcher import discord +from discord import app_commands from discord.ext import commands +from am_bot.constants import ( + QUARANTINE_HONEYPOT_CHANNEL_ID, + QUARANTINE_ROLE_ID, + STAFF_ROLE_ID, +) -logger = logging.getLogger(__name__) -QUARANTINE_HONEYPOT_CHANNEL_ID = int( - os.getenv("QUARANTINE_HONEYPOT_CHANNEL_ID", 0) -) -QUARANTINE_ROLE_ID = int(os.getenv("QUARANTINE_ROLE_ID", 0)) +logger = logging.getLogger(__name__) # Spam detection configuration # Minimum similarity ratio (0.0 to 1.0) to consider messages as duplicates @@ -307,3 +309,87 @@ async def on_message(self, message: discord.Message): # Record the message for future spam detection self._record_message(message) + + def _is_staff(self, member: discord.Member) -> bool: + """Check if a member has the staff role.""" + if STAFF_ROLE_ID == 0: + return False + return any(role.id == STAFF_ROLE_ID for role in member.roles) + + @app_commands.command( + name="quarantine", + description="Quarantine a user and purge their recent messages", + ) + @app_commands.describe( + member="The member to quarantine", reason="Reason for the quarantine" + ) + @app_commands.default_permissions(manage_roles=True) + async def quarantine_command( + self, + interaction: discord.Interaction, + member: discord.Member, + reason: str | None = None, + ) -> None: + """Slash command for staff to manually quarantine a user.""" + # Check if user has staff role + if not self._is_staff(interaction.user): + await interaction.response.send_message( + "You must be staff to use this command.", ephemeral=True + ) + return + + reason = reason or "Manual quarantine by staff" + full_reason = f"{reason} (by {interaction.user})" + + # Don't allow quarantining bots + if member.bot: + await interaction.response.send_message( + "Cannot quarantine bots.", ephemeral=True + ) + return + + # Don't allow self-quarantine + if member.id == interaction.user.id: + await interaction.response.send_message( + "You cannot quarantine yourself.", ephemeral=True + ) + return + + # Defer the response since purging may take time + await interaction.response.defer(ephemeral=True) + + logger.info( + f"Manual quarantine initiated for {member} ({member.id}) " + f"by {interaction.user} ({interaction.user.id}): {reason}" + ) + + success = await self._assign_quarantine_role( + member, interaction.guild, full_reason + ) + + if not success: + await interaction.followup.send( + "Failed to assign quarantine role. Check bot permissions.", + ephemeral=True, + ) + return + + deleted_count = await self._purge_member_messages( + member, interaction.guild + ) + + # Clear their message history from memory + if member.id in self.message_history: + del self.message_history[member.id] + + logger.info( + f"Manual quarantine complete for {member} ({member.id}). " + f"Deleted {deleted_count} messages from the last hour." + ) + + await interaction.followup.send( + f"✅ Quarantined {member.mention}.\n" + f"Deleted {deleted_count} messages from the last hour.\n" + f"Reason: {reason}", + ephemeral=True, + ) diff --git a/am_bot/constants.py b/am_bot/constants.py index 1cdfe6b..f644822 100644 --- a/am_bot/constants.py +++ b/am_bot/constants.py @@ -4,6 +4,8 @@ MODDER_ROLE_ID = 190385081523765248 MAPPER_ROLE_ID = 190385107297632257 WORKSHOP_ROLE_ID = 770207045357797378 +STAFF_ROLE_ID = 322496447687819264 +QUARANTINE_ROLE_ID = 886883973463166996 # Channel IDs MODDER_STATS_CHANNEL_ID = 877564476315029525 @@ -14,3 +16,4 @@ WORKSHOP_TEXT_CHANNEL_ID = 770198077943971880 STARBOARD_TEXT_CHANNEL_ID = 863887933089906718 INVITE_HELP_TEXT_CHANNEL_ID = 372253844953890817 +QUARANTINE_HONEYPOT_CHANNEL_ID = 1453143051332616223 diff --git a/helm/am-bot/templates/deployment.yaml b/helm/am-bot/templates/deployment.yaml index a7e28a8..cea0967 100644 --- a/helm/am-bot/templates/deployment.yaml +++ b/helm/am-bot/templates/deployment.yaml @@ -56,15 +56,6 @@ spec: name: {{ include "am-bot.awsSecretName" . }} key: AWS_DEFAULT_REGION {{- end }} - {{- /* Quarantine honeypot configuration */}} - {{- if .Values.quarantine.honeypotChannelId }} - - name: QUARANTINE_HONEYPOT_CHANNEL_ID - value: {{ .Values.quarantine.honeypotChannelId | quote }} - {{- end }} - {{- if .Values.quarantine.roleId }} - - name: QUARANTINE_ROLE_ID - value: {{ .Values.quarantine.roleId | quote }} - {{- end }} {{- /* Spam detection configuration */}} {{- if .Values.spamDetection.similarityThreshold }} - name: SPAM_SIMILARITY_THRESHOLD diff --git a/helm/am-bot/values.yaml b/helm/am-bot/values.yaml index 5db8cdc..b9561ca 100644 --- a/helm/am-bot/values.yaml +++ b/helm/am-bot/values.yaml @@ -31,14 +31,6 @@ aws: # AWS_SECRET_ACCESS_KEY: "" # AWS_DEFAULT_REGION: "" -# Quarantine honeypot configuration -# Set these to enable the quarantine feature -quarantine: - # Channel ID that acts as the honeypot trap - honeypotChannelId: "" - # Role ID to assign to users who trigger the honeypot - roleId: "" - # Spam detection configuration spamDetection: # Similarity threshold (0.0-1.0) for detecting duplicate messages diff --git a/tests/test_quarantine.py b/tests/test_quarantine.py index 3fd1c57..a7fba71 100644 --- a/tests/test_quarantine.py +++ b/tests/test_quarantine.py @@ -12,6 +12,7 @@ make_mock_guild, make_mock_member, make_mock_message, + make_mock_role, ) @@ -22,8 +23,6 @@ def setup_env(): with patch.dict( os.environ, { - "QUARANTINE_HONEYPOT_CHANNEL_ID": "123456789", - "QUARANTINE_ROLE_ID": "987654321", "SPAM_SIMILARITY_THRESHOLD": "0.85", "SPAM_CHANNEL_THRESHOLD": "3", "MESSAGE_HISTORY_SECONDS": "3600", @@ -563,3 +562,250 @@ async def test_on_message_spam_detection_triggers_quarantine(self, cog): # Message should be deleted (quarantine triggered) message.delete.assert_called_once() + + +class TestQuarantineSlashCommand: + """Tests for the /quarantine slash command.""" + + @pytest.fixture + def cog(self): + """Create a QuarantineCog instance with mocked bot.""" + from am_bot.cogs.quarantine import QuarantineCog + + bot = make_mock_bot() + return QuarantineCog(bot) + + @pytest.fixture + def staff_role(self): + """Create a mock staff role.""" + return make_mock_role(role_id=322496447687819264, name="Staff") + + @pytest.fixture + def mock_interaction(self, staff_role): + """Create a mock Discord interaction with a staff user.""" + from unittest.mock import AsyncMock + + interaction = MagicMock() + interaction.user = make_mock_member( + user_id=11111, name="StaffUser", roles=[staff_role] + ) + interaction.guild = make_mock_guild() + interaction.response = MagicMock() + interaction.response.send_message = AsyncMock() + interaction.response.defer = AsyncMock() + interaction.followup = MagicMock() + interaction.followup.send = AsyncMock() + return interaction + + def test_is_staff_returns_true_for_staff_member(self, cog, staff_role): + """Test _is_staff returns True when member has staff role.""" + with patch("am_bot.cogs.quarantine.STAFF_ROLE_ID", 322496447687819264): + member = make_mock_member(roles=[staff_role]) + assert cog._is_staff(member) is True + + def test_is_staff_returns_false_for_non_staff(self, cog): + """Test _is_staff returns False when member lacks staff role.""" + with patch("am_bot.cogs.quarantine.STAFF_ROLE_ID", 322496447687819264): + other_role = make_mock_role(role_id=999999, name="Other") + member = make_mock_member(roles=[other_role]) + assert cog._is_staff(member) is False + + def test_is_staff_returns_false_when_not_configured(self, cog, staff_role): + """Test _is_staff returns False when STAFF_ROLE_ID is 0.""" + with patch("am_bot.cogs.quarantine.STAFF_ROLE_ID", 0): + member = make_mock_member(roles=[staff_role]) + assert cog._is_staff(member) is False + + @pytest.mark.asyncio + async def test_quarantine_command_rejects_non_staff(self, cog): + """Test command rejects users without staff role.""" + from unittest.mock import AsyncMock + + with patch("am_bot.cogs.quarantine.STAFF_ROLE_ID", 322496447687819264): + interaction = MagicMock() + interaction.user = make_mock_member(user_id=11111, roles=[]) + interaction.response = MagicMock() + interaction.response.send_message = AsyncMock() + + target = make_mock_member(user_id=22222) + + await cog.quarantine_command.callback( + cog, interaction, target, None + ) + + interaction.response.send_message.assert_called_once() + call_args = interaction.response.send_message.call_args + assert "must be staff" in call_args[0][0] + assert call_args[1]["ephemeral"] is True + + @pytest.mark.asyncio + async def test_quarantine_command_rejects_bots( + self, cog, mock_interaction + ): + """Test that the command rejects attempts to quarantine bots.""" + with patch("am_bot.cogs.quarantine.STAFF_ROLE_ID", 322496447687819264): + target = make_mock_member(user_id=22222, bot=True) + + await cog.quarantine_command.callback( + cog, mock_interaction, target, None + ) + + mock_interaction.response.send_message.assert_called_once() + call_args = mock_interaction.response.send_message.call_args + assert "Cannot quarantine bots" in call_args[0][0] + assert call_args[1]["ephemeral"] is True + + @pytest.mark.asyncio + async def test_quarantine_command_rejects_self( + self, cog, mock_interaction + ): + """Test that the command rejects self-quarantine.""" + with patch("am_bot.cogs.quarantine.STAFF_ROLE_ID", 322496447687819264): + # Target is the same as the invoker + target = make_mock_member(user_id=11111, name="StaffUser") + mock_interaction.user.id = 11111 + + await cog.quarantine_command.callback( + cog, mock_interaction, target, None + ) + + mock_interaction.response.send_message.assert_called_once() + call_args = mock_interaction.response.send_message.call_args + assert "cannot quarantine yourself" in call_args[0][0] + assert call_args[1]["ephemeral"] is True + + @pytest.mark.asyncio + async def test_quarantine_command_fails_without_role( + self, cog, mock_interaction + ): + """Test command handles missing quarantine role.""" + with ( + patch("am_bot.cogs.quarantine.STAFF_ROLE_ID", 322496447687819264), + patch("am_bot.cogs.quarantine.QUARANTINE_ROLE_ID", 0), + ): + target = make_mock_member(user_id=22222) + + await cog.quarantine_command.callback( + cog, mock_interaction, target, None + ) + + mock_interaction.response.defer.assert_called_once() + mock_interaction.followup.send.assert_called_once() + call_args = mock_interaction.followup.send.call_args + assert "Failed to assign quarantine role" in call_args[0][0] + + @pytest.mark.asyncio + async def test_quarantine_command_success(self, cog, mock_interaction): + """Test successful quarantine via slash command.""" + with ( + patch("am_bot.cogs.quarantine.STAFF_ROLE_ID", 322496447687819264), + patch("am_bot.cogs.quarantine.QUARANTINE_ROLE_ID", 98765), + ): + target = make_mock_member(user_id=22222, name="BadUser") + mock_role = MagicMock() + mock_interaction.guild.get_role.return_value = mock_role + mock_interaction.guild.text_channels = [] + + await cog.quarantine_command.callback( + cog, mock_interaction, target, "Spamming in chat" + ) + + # Should defer and then follow up + mock_interaction.response.defer.assert_called_once_with( + ephemeral=True + ) + mock_interaction.followup.send.assert_called_once() + + # Check the followup message + call_args = mock_interaction.followup.send.call_args + assert "Quarantined" in call_args[0][0] + assert "Spamming in chat" in call_args[0][0] + assert call_args[1]["ephemeral"] is True + + # Role should be assigned + target.add_roles.assert_called_once() + + @pytest.mark.asyncio + async def test_quarantine_command_default_reason( + self, cog, mock_interaction + ): + """Test quarantine command with default reason.""" + with ( + patch("am_bot.cogs.quarantine.STAFF_ROLE_ID", 322496447687819264), + patch("am_bot.cogs.quarantine.QUARANTINE_ROLE_ID", 98765), + ): + target = make_mock_member(user_id=22222) + mock_role = MagicMock() + mock_interaction.guild.get_role.return_value = mock_role + mock_interaction.guild.text_channels = [] + + await cog.quarantine_command.callback( + cog, mock_interaction, target, None + ) + + # Check the role was assigned with default reason + target.add_roles.assert_called_once() + call_kwargs = target.add_roles.call_args[1] + assert "Manual quarantine by staff" in call_kwargs["reason"] + + @pytest.mark.asyncio + async def test_quarantine_command_clears_message_history( + self, cog, mock_interaction + ): + """Test that quarantine clears user's message history.""" + from am_bot.cogs.quarantine import MessageRecord + + with ( + patch("am_bot.cogs.quarantine.STAFF_ROLE_ID", 322496447687819264), + patch("am_bot.cogs.quarantine.QUARANTINE_ROLE_ID", 98765), + ): + target_id = 22222 + target = make_mock_member(user_id=target_id) + mock_role = MagicMock() + mock_interaction.guild.get_role.return_value = mock_role + mock_interaction.guild.text_channels = [] + + # Pre-populate some message history + now = datetime.now(timezone.utc) + cog.message_history[target_id] = [ + MessageRecord("test message", 100, now) + ] + + await cog.quarantine_command.callback( + cog, mock_interaction, target, None + ) + + # History should be cleared + assert target_id not in cog.message_history + + @pytest.mark.asyncio + async def test_quarantine_command_purges_messages( + self, cog, mock_interaction + ): + """Test that quarantine purges messages from channels.""" + with ( + patch("am_bot.cogs.quarantine.STAFF_ROLE_ID", 322496447687819264), + patch("am_bot.cogs.quarantine.QUARANTINE_ROLE_ID", 98765), + ): + target = make_mock_member(user_id=22222) + mock_role = MagicMock() + mock_interaction.guild.get_role.return_value = mock_role + + # Set up channels with messages to purge + channel1 = make_mock_channel(channel_id=111) + channel1.guild = mock_interaction.guild + channel1.purge.return_value = [MagicMock() for _ in range(3)] + + channel2 = make_mock_channel(channel_id=222) + channel2.guild = mock_interaction.guild + channel2.purge.return_value = [MagicMock() for _ in range(2)] + + mock_interaction.guild.text_channels = [channel1, channel2] + + await cog.quarantine_command.callback( + cog, mock_interaction, target, None + ) + + # Check followup reports correct message count + call_args = mock_interaction.followup.send.call_args + assert "5 messages" in call_args[0][0]