diff --git a/src/disopy/cogs/base.py b/src/disopy/cogs/base.py index 81560cb..2224a03 100644 --- a/src/disopy/cogs/base.py +++ b/src/disopy/cogs/base.py @@ -28,6 +28,16 @@ def __init__(self, bot: Bot, options: Options) -> None: self.bot = bot self.options = options + def seconds_to_str(self, seconds: int) -> str: + """Converts seconds to h:m:s + + Args: + seconds: Time in seconds + """ + min, sec = divmod(seconds, 60) + hour, min = divmod(min, 60) + return '%d:%02d:%02d' % (hour, min, sec) if hour > 0 else '%02d:%02d' % (min, sec) + async def send_answer( self, interaction: Interaction, title: str, content: list[str] | None = None, ephemeral: bool = False ) -> None: diff --git a/src/disopy/cogs/queue.py b/src/disopy/cogs/queue.py index f91d39c..18ad17d 100644 --- a/src/disopy/cogs/queue.py +++ b/src/disopy/cogs/queue.py @@ -6,7 +6,10 @@ import logging from collections import deque +from random import sample +from math import ceil from typing import Iterable, NamedTuple, cast +from itertools import islice import discord from discord import PCMVolumeTransformer, VoiceClient, app_commands @@ -19,18 +22,20 @@ from ..options import Options from .base import Base - class Song(NamedTuple): """Data representation for a Subsonic song. Attributes: id: The ID in the Subsonic server. title: The title of the song. + artist: The primary artist of the song + duration: The duration of the song in seconds """ id: str title: str - + artist: str + duration: int logger = logging.getLogger(__name__) @@ -94,7 +99,7 @@ def pop(self, interaction: Interaction) -> Song | None: if id is None: return None - return self.queue[id].pop() + return self.queue[id].popleft() def append(self, interaction: Interaction, song: Song) -> None: """Append new songs to the queue. @@ -124,9 +129,47 @@ def length(self, interaction: Interaction) -> int: if id is None: # A little ugly but gets the job done return 0 - + return len(self.queue[id]) + + def shuffle(self, interaction: Interaction) -> None: + """Shuffles the current queue + + Args: + interaction: The interaction where the guild ID can be found. + + """ + id = self._check_guild(interaction) + if id is None: + return + self.queue[id] = deque(sample(self.queue[id],len(self.queue[id]))) + + def duration(self, interaction: Interaction) -> int: + """Calculates the remaining duration of the queue, without the current song + Args: + interaction: The interaction where the guild ID can be found. + + Returns: + Seconds of remaining duration + """ + id = self._check_guild(interaction) + if id is None: + return 0 + + return sum(song.duration for song in self.queue[id]) + + def clear(self, interaction: Interaction) -> None: + """Empties the queue + + Args: + interaction: The interaction where the guild ID can be found. + """ + id = self._check_guild(interaction) + if id is None: + return + + self.queue[id] = deque() class QueueCog(Base): """Cog that holds queue handling and music playback commands.""" @@ -148,6 +191,7 @@ def __init__(self, bot: Bot, options: Options, subsonic: Subsonic, config: Confi self.queue = Queue() self.now_playing: Song | None = None + self.loop = 0 # 0: no loop; 1: loop queue; 2: loop track self.skip_next_autoplay = False @@ -168,6 +212,9 @@ async def get_voice_client(self, interaction: Interaction, connect: bool = False if guild.voice_client is None: if connect: + self.queue.clear(interaction) + self.now_playing = None + self.loop = 0 return await user.voice.channel.connect(self_deaf=True) await self.send_error(interaction, ["I'm not connected to a voice channel!"]) return None @@ -203,11 +250,14 @@ def play_queue(self, interaction: Interaction, exception: Exception | None) -> N if exception is not None: raise exception - if self.queue.length(interaction) == 0: + if self.queue.length(interaction) == 0 and self.loop < 2: logger.info("The queue is empty") return - song = self.queue.pop(interaction) + song = self.queue.pop(interaction) if self.loop < 2 else self.now_playing + if self.loop == 1: + self.queue.append(interaction, song) + if song is None: logger.error("Unable to get the song for playback") return @@ -239,7 +289,7 @@ def play_queue(self, interaction: Interaction, exception: Exception | None) -> N if interaction.guild.voice_client is None: logger.warning("There is not available voice client in this interaction!") return - + voice_client = cast(VoiceClient, interaction.guild.voice_client) voice_client.play( @@ -254,67 +304,92 @@ def play_queue(self, interaction: Interaction, exception: Exception | None) -> N self.now_playing = song voice_client.source = PCMVolumeTransformer(voice_client.source, volume=self.config.volume / 100) - @app_commands.command(description="Add a song, album, or playlist to the queue") - @app_commands.choices( - what=[ - app_commands.Choice(name="Song", value="song"), - app_commands.Choice(name="Album", value="album"), - app_commands.Choice(name="Playlist", value="playlist"), - ] - ) + async def query_autocomplete(self, interaction: Interaction, current: str) -> list[app_commands.Choice[str]]: + """Looks up song/album for autocomplete + + Args: + interaction: The interaction that started the command. + current: Current input into query. + """ + + results = [] + + if len(current) >= 3: + search = self.subsonic.searching.search(current, song_count=5, album_count=5, artist_count=0) + if search.songs is not None: + for song in search.songs: + res = f"🎵 {(f"{song.artists[0].name} - " if song.artists[0].name is not None else "")}{song.title}" + duration = f" [{self.seconds_to_str(song.duration)}]" + # Trunctuate result length if over 100 characters + if len(res) + len(duration) > 100: + results.append(app_commands.Choice(name=res[:97 - len(duration)] + "..." + duration, value=f"song:{song.id}")) + else: + results.append(app_commands.Choice(name= res + duration, value=f"song:{song.id}")) + + if search.albums is not None: + for album in search.albums: + res = f"🎶 {(f"{album.artists[0].name} - " if album.artists[0] is not None else "")}{album.name}" + num_songs = f" ({album.song_count} songs)" + # Trunctuate result length if over 100 characters + if len(res) + len(num_songs) > 100: + results.append(app_commands.Choice(name=res[:97 - len(num_songs)] + "..." + num_songs, value=f"album:{album.id}")) + else: + results.append(app_commands.Choice(name=res + num_songs, value=f"album:{album.id}")) + + if len(results) == 0: + results = [app_commands.Choice(name="No result found :(", value="")] + else: + results = [app_commands.Choice(name="Input 3 or more letters to search", value="")] + return results + + @app_commands.command(description="Add a song or album to the queue") + @app_commands.autocomplete(query=query_autocomplete) async def play( self, interaction: Interaction, query: str, - # Ignore the MyPy error because discord.py uses the type to add autocompletion - what: app_commands.Choice[str] = "song", # type: ignore ) -> None: """Add a song in the queue and start the playback if it's stop. Args: interaction: The interaction that started the command. - song_name: The name of the song. + query: The type of media to play and its id. """ - await interaction.response.defer(thinking=True) - - # Extract the type of element to be search, taking care of the default value - choice = what if isinstance(what, str) else what.value - + voice_client = await self.get_voice_client(interaction, True) if voice_client is None: return - playing_element_name = query + choice, value = query.split(":") first_play = self.queue.length(interaction) == 0 and self.now_playing is None + playing_element_name = "" + songs_added = 0 match choice: case "song": - songs = self.subsonic.searching.search(query, song_count=10, album_count=0, artist_count=0).songs - if songs is None: - await self.send_error(interaction, [f"No songs found with the name: **{query}**"]) + song = self.subsonic.browsing.get_song(value) + if song is None: + await self.send_error(interaction, [f"No song found"]) return - - song = songs[0] + if song.title is None: await self.send_error(interaction, [f"The song is missing the required metadata: {query}"]) return - + playing_element_name = song.title - self.queue.append(interaction, Song(song.id, song.title)) + self.queue.append(interaction, Song(song.id, song.title, song.artists[0].name, song.duration)) case "album": - albums = self.subsonic.searching.search(query, song_count=0, album_count=10, artist_count=0).albums - if albums is None: - await self.send_error(interaction, [f"No albums found with the name: **{query}**"]) + album = self.subsonic.browsing.get_album(value) + if album is None: + await self.send_error(interaction, [f"No album found"]) return - - album = albums[0].generate() - print(album) + if album.songs is None: await self.send_error(interaction, [f"The album is missing the required metadata: {query}"]) return - + if album.name is not None: playing_element_name = album.name @@ -322,35 +397,59 @@ async def play( if song.title is None: logger.error(f"The song with ID '{song.id}' is missing the name metadata entry") continue + songs_added += 1 + self.queue.append(interaction, Song(song.id, song.title, song.artists[0].name, song.duration)) - self.queue.append(interaction, Song(song.id, song.title)) + case _: + await self.send_error(interaction, [f"No songs found"]) + return - case "playlist": - for playlist in self.subsonic.playlists.get_playlists(): - if playlist.name is None: - continue - - if query in playlist.name: - playlist = playlist.generate() - if playlist.songs is None: - await self.send_error(interaction, ["The playlist has no songs!"]) - return + if first_play: + await self.send_answer(interaction, "🎵 Now playing!", [f"**{playing_element_name}**{f" ({songs_added} songs added)" if songs_added > 0 else ""}"]) + else: + await self.send_answer(interaction, "🎧 Added to the queue", [f"**{playing_element_name}**{f" ({songs_added} songs added)" if songs_added > 0 else ""}"]) - if playlist.name is not None: - playing_element_name = playlist.name + if not voice_client.is_playing(): + self.play_queue(interaction, None) - for song in playlist.songs: - if song.title is None: - logger.error(f"The song with ID '{song.id}' is missing the name metadata entry") - continue + @app_commands.command(description="Adds a playlist to the queue") + async def playlist(self, interaction: Interaction, query: str) -> None: + """Queues a playlist + + Args: + interaction: The interaction that started the command. + query: The name of the playlist + """ + voice_client = await self.get_voice_client(interaction, True) + if voice_client is None: + return - self.queue.append(interaction, Song(song.id, song.title)) - break + first_play = self.queue.length(interaction) == 0 and self.now_playing is None + playing_element_name = query + songs_added = 0 + + for playlist in self.subsonic.playlists.get_playlists(): + if playlist.name is None: + continue + if query.lower() in playlist.name.lower(): + playlist = playlist.generate() + if playlist.songs is None: + await self.send_error(interaction, ["The playlist has no songs!"]) + return + if playlist.name is not None: + playing_element_name = playlist.name + for song in playlist.songs: + if song.title is None: + logger.error(f"The song with ID '{song.id}' is missing the name metadata entry") + continue + songs_added += 1 + self.queue.append(interaction, Song(song.id, song.title, song.artists[0].name, song.duration)) + break if first_play: - await self.send_answer(interaction, "🎵 Now playing!", [f"**{playing_element_name}**"]) + await self.send_answer(interaction, "🎵 Now playing!", [f"**{playing_element_name}** ({songs_added} songs added)"]) else: - await self.send_answer(interaction, "🎧 Added to the queue", [f"**{playing_element_name}**"]) + await self.send_answer(interaction, "🎧 Added to the queue", [f"**{playing_element_name}** ({songs_added} songs added)"]) if not voice_client.is_playing(): self.play_queue(interaction, None) @@ -415,6 +514,20 @@ async def skip(self, interaction: Interaction) -> None: voice_client.stop() await self.send_answer(interaction, "⏭️ Song skipped") + @app_commands.command(description="Clear the queue") + async def clear(self, interaction: Interaction) -> None: + """Clears the remaining queue + + Args: + interaction: The interaction that started the command. + """ + voice_client = await self.get_voice_client(interaction) + if voice_client is None: + return + + self.queue.clear(interaction) + await self.send_answer(interaction, "🗑️ Cleared the queue") + @app_commands.command(description="Resume the playback") async def resume(self, interaction: Interaction) -> None: """Resume the playback of the song and if there is no one playing play the next one in the queue. @@ -439,26 +552,117 @@ async def resume(self, interaction: Interaction) -> None: self.play_queue(interaction, None) await self.send_answer(interaction, "▶️ Resuming the playback") + @app_commands.command(description="Kick the bot from the voice call") + async def leave(self, interaction: Interaction) -> None: + user = interaction.user + if isinstance(user, discord.User): + await self.send_error(interaction, ["You are not a member of the guild, something has gone very wrong..."]) + return None + + if user.voice is None or user.voice.channel is None: + await self.send_error(interaction, ["You are not connected to any voice channel!"]) + return None + + guild = interaction.guild + if guild is None: + await self.send_error(interaction, ["We are not chatting in a guild, something has gone very wrong..."]) + return None + + if guild.voice_client is None: + await self.send_error(interaction, ["I'm not connected to a voice channel!"]) + return None + + if user.voice.channel != guild.voice_client.channel: + await self.send_error(interaction, ["Join the same voice channel where I am"]) + return None + try: + await guild.voice_client.disconnect() + finally: + await self.send_answer(interaction, "🚪 Bot left", ["Goodbye."]) + + @app_commands.command(name="loop", description="Loops the queue") + @app_commands.choices( + what = [ + app_commands.Choice(name="Queue", value=1), + app_commands.Choice(name="Song", value=2) + ] + ) + async def loop_command(self, interaction: Interaction, what: app_commands.Choice[int] = 1) -> None: + """Loops the queue / current track + + Args: + interaction: The interaction that started the command. + what: How to loop queue + """ + voice_client = await self.get_voice_client(interaction) + if voice_client is None: + return + + if self.queue.length(interaction) == 0 and what.value != 2: + await self.send_error(interaction, ["The queue is empty"]) + return + if self.now_playing is None and what.value == 2: + await self.send_error(interaction, ["Nothing is playing"]) + return + + if what.value == self.loop: + self.loop = 0 + await self.send_answer(interaction, "🔁 Stopped looping") + return + + if what.value == 1 and self.now_playing is not None and self.now_playing != self.queue[-1]: + self.queue.append(interaction, self.now_playing) + + self.loop = what.value + await self.send_answer(interaction, "🔁 Now looping queue" if what == 1 else "🔂 Now looping current track") + + @app_commands.command(name="shuffle", description="Shuffles the current queue") + async def shuffle_command(self, interaction: Interaction) -> None: + """Mixes the songs in the queue, if one exists + + Args: + interaction: The interaction that started the command. + """ + voice_client = await self.get_voice_client(interaction) + if voice_client is None: + return + + if self.queue.length(interaction) == 0: + await self.send_error(interaction, ["The queue is empty"]) + return + + self.queue.shuffle(interaction) + await self.send_answer(interaction, "🔀 Shuffling the queue") + @app_commands.command(name="queue", description="See the current queue") # Name changed to avoid collisions with the property `queue` - async def queue_command(self, interaction: Interaction) -> None: + async def queue_command(self, interaction: Interaction, page: int = 1) -> None: """List the songs added to the queue. Args: interaction: The interaction that started the command. """ - content = [] + max_page = ceil(self.queue.length(interaction)/10) + length = self.queue.length(interaction) + + if (1 > page or page > max_page) and max_page != 0: + await self.send_error(interaction, ["Out of queue bounds"]) + if self.now_playing is not None: - content.append(f"Now playing: **{self.now_playing.title}**") + content.append(f"Now playing: {self.now_playing.artist} - **{self.now_playing.title}**") content.append("") - length = self.queue.length(interaction) - + page -= 1 + if self.loop > 0: + content.append(f"Looping {"queue" if self.loop == 1 else "track"}") if length > 0: - content.append("Next:") - for song in self.queue.get(interaction): - content.append(f"- **{song.title}**") + content.append(f"""Remaining time - {self.seconds_to_str(self.queue.duration(interaction))} + Pages - {page+1}/{max_page} + + Next:""") + for num, song in enumerate(islice(self.queue.get(interaction), 10*page, 10*(page + 1))): + content.append(f"{10*page + num + 1}. {song.artist} - **{song.title}**\t[{self.seconds_to_str(song.duration)}]") if length == 0: content.append("_Queue empty_") @@ -475,7 +679,7 @@ async def volume(self, interaction: Interaction, volume: int) -> None: """ # Defer immediately to avoid timeout - await interaction.response.defer(thinking=True) + # await interaction.response.defer(thinking=True) voice_client = await self.get_voice_client(interaction) if voice_client is None: @@ -491,4 +695,4 @@ async def volume(self, interaction: Interaction, volume: int) -> None: # Every source has a volume handler attach to it so suppressing the mypy error is safe voice_client.source.volume = volume / 100 # type: ignore[attr-defined] - await self.send_answer(interaction, f"🔊 Volume level set to {volume}%") + await self.send_answer(interaction, f"🔊 Volume level set to {volume}%") \ No newline at end of file diff --git a/src/disopy/cogs/search.py b/src/disopy/cogs/search.py index c6b5744..c6f1d6f 100644 --- a/src/disopy/cogs/search.py +++ b/src/disopy/cogs/search.py @@ -65,7 +65,7 @@ def api_search(self, query: str, choice: str) -> tuple[str, list[str]]: for song in songs: content.append( - f"- **{song.title}**" + (f" - {song.artist.name}" if song.artist is not None else "") + "- " + (f"{song.artist.name} - " if song.artist is not None else "") + f"**{song.title}** [{self.seconds_to_str(song.duration)}]" ) case "album": @@ -74,7 +74,7 @@ def api_search(self, query: str, choice: str) -> tuple[str, list[str]]: for album in albums: content.append( - f"- **{album.name}**" + (f" - {album.artist.name}" if album.artist is not None else "") + "- " + (f"{album.artist.name} - " if album.artist is not None else "") + f"**{album.name}**" ) case "artist": diff --git a/src/disopy/discord.py b/src/disopy/discord.py index 2ecfe78..91903c0 100644 --- a/src/disopy/discord.py +++ b/src/disopy/discord.py @@ -62,7 +62,7 @@ def get_bot(subsonic: Subsonic, config: Config, options: Options) -> Bot: intents = discord.Intents.default() intents.message_content = True - bot = discord.ext.commands.Bot(f"!{APP_NAME_LOWER}", intents=intents) + bot = discord.ext.commands.Bot(f"!{APP_NAME_LOWER}", intents=intents, activity=discord.Activity(type=discord.ActivityType.listening, name="music")) @bot.event async def on_ready() -> None: