diff --git a/build.py b/build.py index f2c17c9..eae117d 100644 --- a/build.py +++ b/build.py @@ -39,6 +39,7 @@ "seaplayer/objects/Notification.py": "seaplayer/objects/", "seaplayer/objects/ProgressBar.py": "seaplayer/objects/", "seaplayer/objects/Radio.py": "seaplayer/objects/", + "seaplayer/objects/PopUp.py": "seaplayer/objects/", # * 1.2) Types "seaplayer/types": "seaplayer/types/", "seaplayer/types/__init__.py": "seaplayer/types/", diff --git a/changelog.md b/changelog.md index ee3d317..e7a22cb 100644 --- a/changelog.md +++ b/changelog.md @@ -3,7 +3,8 @@ | Version | Date | Tag | Changelog | | ------- | ---- | --- | --------- | -| [v0.8.4](https://github.com/romanin-rf/SeaPlayer/releases/tag/v0.8.4) | 11.12.2023 | **STABLE** | - Added widgets: `Rheostat`, `ClickableLabel`
- Added widget: Rheostat
- Added a new widget: `PopUp`
- Fixed all language
- More attempts to make `Confiturate` screen clearer
- Moved all `CSS` from `objects.tcss` to `DEFAULT_CSS` separately for each widget | +| [v0.8.5](https://github.com/romanin-rf/SeaPlayer/releases/tag/v0.8.5) | 16.12.2023 | **STABLE** | - Added support for **python3.8**
- Added `on_ready` abstract methods for plugins
- Added new widgets: `PopUpWindow`, `WaitButton`
- Fixed method `add_sounds_to_list`
- Fixed `seaplayer.plug.cli`
- Fixed `build.py`
- Updated all child custom modules | +| [v0.8.4](https://github.com/romanin-rf/SeaPlayer/releases/tag/v0.8.4) | 11.12.2023 | **DEPRECATED** | - Added widgets: `Rheostat`, `ClickableLabel`
- Added widget: Rheostat
- Added a new widget: `PopUp`
- Fixed all language
- More attempts to make `Confiturate` screen clearer
- Moved all `CSS` from `objects.tcss` to `DEFAULT_CSS` separately for each widget | | [v0.8.3](https://github.com/romanin-rf/SeaPlayer/releases/tag/v0.8.3) | 10.12.2023 | **STABLE** | - Added plugin `VKMusic`
- Added priority system for codecs
- Added system handhers of value
- Fixed `build.py`
- Moved method `load_plugin_info` in `seaplayer.plug.pluginloader` | | [v0.8.2](https://github.com/romanin-rf/SeaPlayer/releases/tag/v0.8.2) | 08.12.2023 | **STABLE** | - Added language merge (for translating plugins)
- Fixed in translation files (`Log Menu Enable` -> `Logging` )
- Changed `object.css` (classes will no longer be used to specify standard properties)
- Improved widget `FillLabel` | | [v0.8.1](https://github.com/romanin-rf/SeaPlayer/releases/tag/v0.8.1) | 07.12.2023 | **STABLE** | - Revisioned of the LanguageLoader
- Added new language: `Українська` | diff --git a/plugins/VKMusic/__init__.py b/plugins/VKMusic/__init__.py index 79e85af..9b3eca3 100644 --- a/plugins/VKMusic/__init__.py +++ b/plugins/VKMusic/__init__.py @@ -1,13 +1,15 @@ import logging from rich.console import Console -from rich.prompt import Prompt +from textual.widgets import Input, Label from seaplayer.plug import PluginBase -from vkpymusic import Service, TokenReceiver +from seaplayer.objects import PopUpWindow, WaitButton +from .vkpymusic import Service, TokenReceiverAsync +# > Typing +from typing import Tuple # > Local Imports from .vkmcodec import VKMCodec from .units import ( pcheck, - VKM_MAIN_PATTERN, VKM_RANGE_PATTERN ) @@ -37,24 +39,76 @@ def exist_token(self) -> bool: def on_init(self) -> None: self.configurated = self.exist_token() - def on_run(self) -> None: + async def __req_login_password(self) -> Tuple[str, str]: + lppw = PopUpWindow( + ilogin:=Input(placeholder="Login"), + ipassword:=Input(placeholder="Password", password=True), + elpb:=WaitButton("Log In"), + title="VKMusic Authentication" + ) + await self.app.mount(lppw) + await elpb.wait_click() + login, password = ilogin.value, ipassword.value + await lppw.remove() + return login, password + + async def __req_2fa(self) -> str: + cpw = PopUpWindow( + icode:=Input(placeholder="Code"), + ecb:=WaitButton("Enter"), + title="VKMusic Authentication" + ) + await self.app.mount(cpw) + await ecb.wait_click() + code = icode.value + await cpw.remove() + return code + + async def __req_capcha(self, url: str) -> str: + cpw = PopUpWindow( + Label(f"[link={url}]{url}[/link]"), + icapcha:=Input(placeholder="Capcha"), + ecb:=WaitButton("Enter"), + title="VKMusic Authentication" + ) + await self.app.mount(cpw) + await ecb.wait_click() + code = icapcha + await cpw.remove() + return code + + async def __req_invalid_client(self) -> None: + pass + + async def __req_critical_error(self) -> None: + pass + + async def __init_service__(self) -> None: if self.configurated: self.service = Service.parse_config() else: - login, password = Prompt.ask("Login"), Prompt.ask("Password", password=True) while not self.configurated: - tr = TokenReceiver(login, password) - if tr.auth(on_2fa=lambda: Prompt.ask("Code 2FA")): + login, password = await self.__req_login_password() + tr = TokenReceiverAsync(login, password) + if await tr.auth( + self.__req_capcha, + self.__req_2fa, + self.__req_invalid_client, + self.__req_critical_error + ): tr.save_to_config() self.configurated = True else: - console.print("[red]Failed to get a token, repeat...[/red]") + self.app.error("Failed to get a token, repeat...") self.service = Service.parse_config() self.app.info(f"Service is worked: {repr(self.service)}") - # ! Registration - self.app.CODECS_KWARGS["vkm_service"] = self.service + # ? Registration + self.app.CODECS_KWARGS.update({"vkm_service": self.service}) self.add_value_handlers(vkm_value_handler) self.add_codecs(VKMCodec) + + async def on_ready(self): + self.app.run_worker(self.__init_service__, group=self.info.name_id) # ! Registeration -__plugin__ = VKMusic \ No newline at end of file +__plugin__ = VKMusic diff --git a/plugins/VKMusic/info.json b/plugins/VKMusic/info.json index a0d4751..a855108 100644 --- a/plugins/VKMusic/info.json +++ b/plugins/VKMusic/info.json @@ -1,7 +1,7 @@ { "name": "VK Music", "name_id": "seaplayer.plugins.vk.music", - "version": "0.2.0", + "version": "0.3.0.dev1", "author": "Romanin", "description": "Music downloader from VK.", "url": "https://github.com/romanin-rf/SeaPlayer" diff --git a/plugins/VKMusic/requirements.txt b/plugins/VKMusic/requirements.txt index 17375c6..0124341 100644 --- a/plugins/VKMusic/requirements.txt +++ b/plugins/VKMusic/requirements.txt @@ -1,2 +1,2 @@ vkpymusic>=2.2.4 -vbml>=1.1 \ No newline at end of file +vbml>=1.1 diff --git a/plugins/VKMusic/vkmcodec.py b/plugins/VKMusic/vkmcodec.py index aee3ee6..ffbe833 100644 --- a/plugins/VKMusic/vkmcodec.py +++ b/plugins/VKMusic/vkmcodec.py @@ -1,5 +1,5 @@ import asyncio -from vkpymusic import Service +from .vkpymusic import Service from seaplayer.codecs.URLS import URLSoundCodec from seaplayer.codecs.AnySound import AnySound # > Typing diff --git a/plugins/VKMusic/vkpymusic/Client.py b/plugins/VKMusic/vkpymusic/Client.py new file mode 100644 index 0000000..72dcb22 --- /dev/null +++ b/plugins/VKMusic/vkpymusic/Client.py @@ -0,0 +1,14 @@ +class Client: + def __init__(self, user_agent, client_id, client_secret): + self.user_agent = user_agent + self.client_id = client_id + self.client_secret = client_secret + + +KateMobile = Client( + user_agent='KateMobileAndroid/56 lite-460 (Android 4.4.2; SDK 19; x86; unknown Android SDK built for x86; en)', + client_id='2685278', + client_secret='lxhD8OD7dMsqtXIm5IUY' +) + +clients = {'Kate': KateMobile} diff --git a/plugins/VKMusic/vkpymusic/Logger.py b/plugins/VKMusic/vkpymusic/Logger.py new file mode 100644 index 0000000..aec8f77 --- /dev/null +++ b/plugins/VKMusic/vkpymusic/Logger.py @@ -0,0 +1,39 @@ +import os, datetime, logging + + +class bcolors: + CRITICAL = "\033[95m" + OKBLUE = "\033[94m" + OKCYAN = "\033[96m" + OKGREEN = "\033[92m" + WARNING = "\033[93m" + ERROR = "\033[91m" + ENDC = "\033[0m" + + +_log_format = "%(asctime)s | [%(name)s | (%(filename)s) .%(funcName)s(%(lineno)d)] [%(levelname)s] %(message)s" + + +def get_file_handler(): + file_path = f"logs/vkpymusic_{datetime.date.today()}.log" + os.makedirs(os.path.dirname(file_path), exist_ok=True) + + file_handler = logging.FileHandler(file_path) + file_handler.setLevel(logging.WARNING) + file_handler.setFormatter(logging.Formatter(_log_format)) + return file_handler + + +def get_stream_handler(): + stream_handler = logging.StreamHandler() + stream_handler.setLevel(logging.INFO) + stream_handler.setFormatter(logging.Formatter(_log_format)) + return stream_handler + + +def get_logger(name): + logger = logging.getLogger(name) + logger.setLevel(logging.INFO) + logger.addHandler(get_file_handler()) + logger.addHandler(get_stream_handler()) + return logger diff --git a/plugins/VKMusic/vkpymusic/Playlist.py b/plugins/VKMusic/vkpymusic/Playlist.py new file mode 100644 index 0000000..34c004a --- /dev/null +++ b/plugins/VKMusic/vkpymusic/Playlist.py @@ -0,0 +1,32 @@ +class Playlist: + def __init__( + self, title, description, photo, count, owner_id, playlist_id, access_key + ): + self.title = title + self.description = description + self.photo = photo + self.count = count + self.owner_id = owner_id + self.playlist_id = playlist_id + self.access_key = access_key + + def __str__(self): + return f"{self.title} ({self.count} tracks)" + + def to_dict(self) -> dict: + return self.__dict__ + + @classmethod + def from_json(cls, item): + title = str(item["title"]) + description = str(item["description"]) + photo = str(item["photo"]["photo_1200"]) + count = int(item["count"]) + owner_id = int(item["owner_id"]) + playlist_id = int(item["id"]) + access_key = str(item["access_key"]) + + playlist = cls( + title, description, photo, count, owner_id, playlist_id, access_key + ) + return playlist diff --git a/plugins/VKMusic/vkpymusic/Service.py b/plugins/VKMusic/vkpymusic/Service.py new file mode 100644 index 0000000..dd9b81d --- /dev/null +++ b/plugins/VKMusic/vkpymusic/Service.py @@ -0,0 +1,498 @@ +import os, configparser, json, logging + +from requests import Response, Session +import requests + +from .Logger import get_logger +from .Playlist import Playlist +from .Song import Song + + +logger: logging = get_logger(__name__) + + +class Service: + def __init__(self, user_agent: str, token: str): + self.user_agent = user_agent + self.__token = token + + @classmethod + def parse_config(cls, filename: str = "config_vk.ini"): + """ + Create an instance of Service from config. + + Args: + filename (str): Filename of config (default value = "config_vk.ini"). + """ + dirname = os.path.dirname(__file__) + configfile_path = os.path.join(dirname, filename) + + try: + config = configparser.ConfigParser() + config.read(configfile_path, encoding="utf-8") + + user_agent = config["VK"]["user_agent"] + token = config["VK"]["token_for_audio"] + + return Service(user_agent, token) + except Exception as e: + logger.warning(e) + + @staticmethod + def del_config(filename: str = "config_vk.ini"): + """ + Delete config created by 'TokenReceiver'. + + Args: + filename (str): Filename of config (default value = "config_vk.ini"). + """ + dirname = os.path.dirname(__file__) + configfile_path = os.path.join(dirname, filename) + + try: + os.remove(configfile_path) + logger.info("Config successful deleted!") + except Exception as e: + logger.warning(e) + + ########################## + # COMMON REQUEST FOR AUDIO + + def __get_response( + self, method: str, params: list[tuple[str, str or int]] + ) -> Response: + api_headers = {"User-Agent": self.user_agent} + api_url = f"https://api.vk.com/method/audio.{method}" + api_parameters = [ + ("access_token", self.__token), + ("https", 1), + ("lang", "ru"), + ("extended", 1), + ("v", "5.131"), + ] + + for pair in params: + api_parameters.append(pair) + + session = Session() + session.headers.update(api_headers) + response = session.post( + url=api_url, + data=api_parameters, + ) + session.close() + + return response + + ############## + # ANY REQUESTS + + def __getCount(self, user_id: int) -> Response: + params = [ + ("owner_id", user_id), + ] + + return self.__get_response("getCount", params) + + def __get( + self, + user_id: int, + count: int = 100, + offset: int = 0, + playlist_id: int or None = None, + access_key: str or None = None, + ) -> Response: + params = [ + ("owner_id", user_id), + ("count", count), + ("offset", offset), + ] + + if playlist_id: + params.append(("album_id", playlist_id)) + params.append(("access_key", access_key)) + + return self.__get_response("get", params) + + def __search(self, text: str, count: int = 100, offset: int = 0) -> Response: + params = [ + ("q", text), + ("count", count), + ("offset", offset), + ("sort", 0), + ("autocomplete", 1), + ] + + return self.__get_response("search", params) + + def __getPlaylists( + self, user_id: int, count: int = 50, offset: int = 0 + ) -> Response: + params = [ + ("owner_id", user_id), + ("count", count), + ("offset", offset), + ] + + return self.__get_response("getPlaylists", params) + + def __searchPlaylists( + self, text: str, count: int = 50, offset: int = 0 + ) -> Response: + params = [ + ("q", text), + ("count", count), + ("offset", offset), + ] + + return self.__get_response("searchPlaylists", params) + + def __searchAlbums(self, text: str, count: int = 50, offset: int = 0) -> Response: + params = [ + ("q", text), + ("count", count), + ("offset", offset), + ] + + return self.__get_response("searchAlbums", params) + + ############ + # CONVERTERS + + def __response_to_songs(self, response: Response): + response = json.loads(response.content.decode("utf-8")) + try: + items = response["response"]["items"] + except Exception as e: + logger.error(e) + + songs: list[Song] = [] + for item in items: + song = Song.from_json(item) + songs.append(song) + return songs + + def __response_to_playlists(self, response: Response): + response = json.loads(response.content.decode("utf-8")) + try: + items = response["response"]["items"] + except Exception as e: + logger.error(e) + playlists: list[Playlist] = [] + for item in items: + playlist = Playlist.from_json(item) + playlists.append(playlist) + return playlists + + ############## + # MAIN METHODS + + def get_count_by_user_id(self, user_id: str or int) -> int: + """ + Get count of all user's songs. + + Args: + user_id (str or int): VK user id. (NOT USERNAME! vk.com/id*******). + + Returns: + int: count of all user's songs. + """ + user_id = int(user_id) + logger.info(f"Request by user: {user_id}") + + try: + response = self.__getCount(user_id) + data = json.loads(response.content.decode("utf-8")) + songs_count = int(data["response"]) + except Exception as e: + logger.error(e) + return + + logger.info(f"Count of user's songs: {songs_count}") + return songs_count + + def get_songs_by_userid( + self, user_id: str or int, count: int = 100, offset: int = 0 + ) -> list[Song]: + """ + Search songs by owner/user id. + + Args: + user_id (str or int): VK user id. (NOT USERNAME! vk.com/id*******). + count (int): Count of resulting songs (for VK API: default/max = 100). + offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200. + + Returns: + list[Song]: List of songs. + """ + user_id = int(user_id) + logger.info(f"Request by user: {user_id}") + + try: + response: Response = self.__get(user_id, count, offset) + songs = self.__response_to_songs(response) + except Exception as e: + logger.error(e) + return + + if len(songs) == 0: + logger.info("No results found ._.") + else: + logger.info("Results:") + for i, song in enumerate(songs, start=1): + logger.info(f"{i}) {song}") + return songs + + def get_songs_by_playlist_id( + self, + user_id: str or int, + playlist_id: int, + access_key: str, + count: int = 100, + offset: int = 0, + ) -> list[Song]: + """ + Get songs by playlist id. + + Args: + user_id (str or int): VK user id. (NOT USERNAME! vk.com/id*******). + playlist_id (int): VK playlist id. (Take it from methods for playlist). + access_key (str): VK access key. (Take it from methods for playlist). + count (int): Count of resulting songs (for VK API: default/max = 100). + offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200. + + Returns: + list[Song]: List of songs. + """ + user_id = int(user_id) + logger.info(f"Request by user: {user_id}") + + try: + response: Response = self.__get( + user_id, count, offset, playlist_id, access_key + ) + songs = self.__response_to_songs(response) + except Exception as e: + logger.error(e) + return + + if len(songs) == 0: + logger.info("No results found ._.") + else: + logger.info("Results:") + for i, song in enumerate(songs, start=1): + logger.info(f"{i}) {song}") + return songs + + def get_songs_by_playlist( + self, playlist: Playlist, count: int = 10, offset: int = 0 + ) -> list[Song]: + """ + Get songs by instance of 'Playlist'. + + Args: + playlist (Playlist): Instance of 'Playlist' (take from methods for receiving Playlist). + count (int): Count of resulting songs (for VK API: default/max = 100). + offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200. + + Returns: + list[Song]: List of songs. + """ + logger.info(f"Request by playlist: {playlist}") + + try: + response: Response = self.__get( + playlist.owner_id, + count, + offset, + playlist.playlist_id, + playlist.access_key, + ) + songs = self.__response_to_songs(response) + except Exception as e: + logger.error(e) + return + + if len(songs) == 0: + logger.info("No results found ._.") + else: + logger.info("Results:") + for i, song in enumerate(songs, start=1): + logger.info(f"{i}) {song}") + return songs + + def search_songs_by_text( + self, text: str, count: int = 3, offset: int = 0 + ) -> list[Song]: + """ + Search songs by text/query. + + Args: + text (str): Text of query. Can be title of song, author, etc. + count (int): Count of resulting songs (for VK API: default/max = 100). + offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200. + + Returns: + list[Song]: List of songs. + """ + logger.info(f'Request by text: "{text}" в количестве {count}') + + try: + response = self.__search(text, count, offset) + songs = self.__response_to_songs(response) + except Exception as e: + logger.error(e) + return + + if len(songs) == 0: + logger.info("No results found ._.") + else: + logger.info("Results:") + for i, song in enumerate(songs, start=1): + logger.info(f"{i}) {song}") + return songs + + def get_playlists_by_userid( + self, user_id: str or int, count: int = 5, offset: int = 0 + ) -> list[Playlist]: + """ + Get playlist by owner/user id. + + Args: + user_id (str or int): VK user id. (NOT USERNAME! vk.com/id*******). + count (int): Count of resulting playlists (for VK API: default = 50, max = 100). + offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200. + + Returns: + list[Playlist]: List of playlists. + """ + user_id = int(user_id) + logger.info(f"Request by user: {user_id}") + + try: + response = self.__getPlaylists(user_id, count, offset) + playlists = self.__response_to_playlists(response) + except Exception as e: + logger.error(e) + return + + if len(playlists) == 0: + logger.info("No results found ._.") + else: + logger.info("Results:") + for i, playlist in enumerate(playlists, start=1): + logger.info(f"{i}) {playlist}") + return playlists + + def search_playlists_by_text( + self, text: str, count: int = 5, offset: int = 0 + ) -> list[Playlist]: + """ + Search playlists by text/query. + Playlist - it user's collection of songs. + + Args: + text (str): Text of query. Can be title of playlist, genre, etc. + count (int): Count of resulting playlists (for VK API: default = 50, max = 100). + offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200. + + Returns: + list[Playlist]: List of playlists. + """ + logger.info(f"Request by text: {text}") + + try: + response = self.__searchPlaylists(text, count, offset) + playlists = self.__response_to_playlists(response) + except Exception as e: + logger.error(e) + return + + if len(playlists) == 0: + logger.info("No results found ._.") + else: + logger.info("Results:") + for i, playlist in enumerate(playlists, start=1): + logger.info(f"{i}) {playlist}") + return playlists + + def search_albums_by_text( + self, text: str, count: int = 5, offset: int = 0 + ) -> list[Playlist]: + """ + Search albums by text/query. + Album - artists's album/collection of songs. + In obj context - same as 'Playlist'. + + Args: + text (str): Text of query. Can be title of album, name of artist, etc. + count (int): Count of resulting playlists (for VK API: default = 50, max = 100). + offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200. + + Returns: + list[Playlist]: List of albums. + """ + logger.info(f"Request by text: {text}") + + try: + response = self.__searchAlbums(text, count, offset) + playlists = self.__response_to_playlists(response) + except Exception as e: + logger.error(e) + return + + if len(playlists) == 0: + logger.info("No results found ._.") + else: + logger.info("Results:") + for i, playlist in enumerate(playlists, start=1): + logger.info(f"{i}) {playlist}") + return playlists + + @staticmethod + def save_music(song: Song) -> str: + """ + Save song to '{workDirectory}/Music/{songname}.mp3'. + + Args: + song (Song): 'Song' instance obtained from 'Service' methods. + + Returns: + str: relative path of downloaded music. + """ + song.to_safe() + file_name_mp3 = f"{song}.mp3" + url = song.url + + if url == "": + logger.warning("Url no found") + return + + response = requests.get(url=url) + if response.status_code == 200: + if not os.path.exists("Music"): + os.makedirs("Music") + logger.info("Folder 'Music' was created") + + file_path = os.path.join(os.getcwd(), "Music", file_name_mp3) + + if not os.path.exists(file_path): + if "index.m3u8" in url: + logger.error(".m3u8 detected!") + return + else: + logger.warning( + f"File with name {file_name_mp3} exists. Overwrite it? (Y/n)" + ) + + res = input().lower() + if res.lower() != "y" and res.lower() != "yes": + return + + response.close() + logger.info(f"Downloading {song}...") + with open(file_path, "wb") as output_file: + output_file.write(response.content) + + logger.info(f"Success! Music was downloaded in '{file_path}'") + return file_path diff --git a/plugins/VKMusic/vkpymusic/ServiceAsync.py b/plugins/VKMusic/vkpymusic/ServiceAsync.py new file mode 100644 index 0000000..1580bb7 --- /dev/null +++ b/plugins/VKMusic/vkpymusic/ServiceAsync.py @@ -0,0 +1,513 @@ +import os, configparser, logging + +from httpx import AsyncClient, Response + +import aiofiles + +from .Logger import get_logger +from .Playlist import Playlist +from .Song import Song + +logger: logging = get_logger(__name__) + + +class ServiceAsync: + def __init__(self, user_agent: str, token: str): + self.user_agent = user_agent + self.__token = token + + @classmethod + def parse_config(cls, filename: str = "config_vk.ini"): + """ + Create an instance of Service from config. + + Args: + filename (str): Filename of config (default value = "config_vk.ini"). + """ + dirname = os.path.dirname(__file__) + configfile_path = os.path.join(dirname, filename) + + try: + config = configparser.ConfigParser() + config.read(configfile_path, encoding="utf-8") + + user_agent = config["VK"]["user_agent"] + token = config["VK"]["token_for_audio"] + + return ServiceAsync(user_agent, token) + except Exception as e: + logger.warning(e) + + @staticmethod + def del_config(filename: str = "config_vk.ini"): + """ + Delete config created by 'TokenReceiver'. + + Args: + filename (str): Filename of config (default value = "config_vk.ini"). + """ + dirname = os.path.dirname(__file__) + configfile_path = os.path.join(dirname, filename) + + try: + os.remove(configfile_path) + logger.info("Config successful deleted!") + except Exception as e: + logger.warning(e) + + ########################## + # COMMON REQUEST FOR AUDIO + + async def __get_response( + self, method: str, params: list[tuple[str, str or int]] + ) -> Response: + api_headers = {"User-Agent": self.user_agent} + api_url = f"https://api.vk.com/method/audio.{method}" + api_parameters = [ + ("access_token", self.__token), + ("https", 1), + ("lang", "ru"), + ("extended", 1), + ("v", "5.131"), + ] + + for pair in params: + api_parameters.append(pair) + + # session = ClientSession() + session = AsyncClient() + session.headers.update(api_headers) + response = await session.post( + url=api_url, + params=api_parameters + # ssl=ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH) + ) + await session.aclose() + return response + + ############## + # ANY REQUESTS + + async def __getCount(self, user_id: int) -> Response: + params = [ + ("owner_id", user_id), + ] + + return await self.__get_response("getCount", params) + + async def __get( + self, + user_id: int, + count: int = 100, + offset: int = 0, + playlist_id: int or None = None, + access_key: str or None = None, + ) -> Response: + params = [ + ("owner_id", user_id), + ("count", count), + ("offset", offset), + ] + + if playlist_id: + params.append(("album_id", playlist_id)) + params.append(("access_key", access_key)) + + return await self.__get_response("get", params) + + async def __search(self, text: str, count: int = 100, + offset: int = 0) -> Response: + params = [ + ("q", text), + ("count", count), + ("offset", offset), + ("sort", 0), + ("autocomplete", 1), + ] + + return await self.__get_response("search", params) + + async def __getPlaylists( + self, user_id: int, count: int = 50, offset: int = 0 + ) -> Response: + params = [ + ("owner_id", user_id), + ("count", count), + ("offset", offset), + ] + + return await self.__get_response("getPlaylists", params) + + async def __searchPlaylists( + self, text: str, count: int = 50, offset: int = 0 + ) -> Response: + params = [ + ("q", text), + ("count", count), + ("offset", offset), + ] + + return await self.__get_response("searchPlaylists", params) + + async def __searchAlbums(self, text: str, count: int = 50, + offset: int = 0) -> Response: + params = [ + ("q", text), + ("count", count), + ("offset", offset), + ] + + return await self.__get_response("searchAlbums", params) + + ############ + # CONVERTERS + + async def __response_to_songs(self, response: Response) \ + -> list[Song]: + response = response.json() + count = 0 + try: + items = response["response"]["items"] + except Exception as e: + logger.error(e) + + songs: list[Song] = [] + for item in items: + song = Song.from_json(item) + songs.append(song) + return songs + + async def __response_to_playlists(self, response: Response): + # response = json.loads(response.content.decode("utf-8")) + response = response.json(encoding="utf-8") + try: + items = response["response"]["items"] + except Exception as e: + logger.error(e) + playlists: list[Playlist] = [] + for item in items: + playlist = Playlist.from_json(item) + playlists.append(playlist) + return playlists + + ############## + # MAIN METHODS + + async def get_count_by_user_id(self, user_id: str or int) -> int: + """ + Get count of all user's songs. + + Args: + user_id (str or int): VK user id. (NOT USERNAME! vk.com/id*******). + + Returns: + int: count of all user's songs. + """ + user_id = int(user_id) + logger.info(f"Request by user: {user_id}") + + try: + response = await self.__getCount(user_id) + # data = json.loads(response.content.decode("utf-8")) + data = response.json(encoding="utf-8") + songs_count = int(data["response"]) + except Exception as e: + logger.error(e) + return + + logger.info(f"Count of user's songs: {songs_count}") + return songs_count + + async def get_songs_by_userid( + self, user_id: str or int, count: int = 100, offset: int = 0 + ) -> list[Song]: + """ + Search songs by owner/user id. + + Args: + user_id (str or int): VK user id. (NOT USERNAME! vk.com/id*******). + count (int): Count of resulting songs (for VK API: default/max = 100). + offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200. + + Returns: + list[Song]: List of songs. + """ + user_id = int(user_id) + logger.info(f"Request by user: {user_id}") + + try: + response: Response = await self.__get(user_id, count, offset) + songs = await self.__response_to_songs(response) + except Exception as e: + logger.error(e) + return + + if len(songs) == 0: + logger.info("No results found ._.") + else: + logger.info("Results:") + for i, song in enumerate(songs, start=1): + logger.info(f"{i}) {song}") + return songs + + async def get_songs_by_playlist_id( + self, + user_id: str or int, + playlist_id: int, + access_key: str, + count: int = 100, + offset: int = 0, + ) -> list[Song]: + """ + Get songs by playlist id. + + Args: + user_id (str or int): VK user id. (NOT USERNAME! vk.com/id*******). + playlist_id (int): VK playlist id. (Take it from methods for playlist). + access_key (str): VK access key. (Take it from methods for playlist). + count (int): Count of resulting songs (for VK API: default/max = 100). + offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200. + + Returns: + list[Song]: List of songs. + """ + user_id = int(user_id) + logger.info(f"Request by user: {user_id}") + + try: + response: Response = await self.__get( + user_id, count, offset, playlist_id, access_key + ) + songs = await self.__response_to_songs(response) + except Exception as e: + logger.error(e) + return + + if len(songs) == 0: + logger.info("No results found ._.") + else: + logger.info("Results:") + for i, song in enumerate(songs, start=1): + logger.info(f"{i}) {song}") + return songs + + async def get_songs_by_playlist( + self, playlist: Playlist, count: int = 10, offset: int = 0 + ) -> list[Song]: + """ + Get songs by instance of 'Playlist'. + + Args: + playlist (Playlist): Instance of 'Playlist' (take from methods for receiving Playlist). + count (int): Count of resulting songs (for VK API: default/max = 100). + offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200. + + Returns: + list[Song]: List of songs. + """ + logger.info(f"Request by playlist: {playlist}") + + try: + response: Response = await self.__get( + playlist.owner_id, + count, + offset, + playlist.playlist_id, + playlist.access_key, + ) + songs = await self.__response_to_songs(response) + except Exception as e: + logger.error(e) + return + + if len(songs) == 0: + logger.info("No results found ._.") + else: + logger.info("Results:") + for i, song in enumerate(songs, start=1): + logger.info(f"{i}) {song}") + return songs + + async def search_songs_by_text( + self, text: str, count: int = 3, offset: int = 0 + ) -> list[Song]: + """ + Search songs by text/query. + + Args: + text (str): Text of query. Can be title of song, author, etc. + count (int): Count of resulting songs (for VK API: default/max = 100). + offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200. + + Returns: + list[Song]: List of songs. + """ + logger.info(f'Request by text: "{text}" в количестве {count}') + + try: + response: Response = await self.__search(text, count, offset) + songs = await self.__response_to_songs(response) + except Exception as e: + logger.error(e) + return + + if len(songs) == 0: + logger.info("No results found ._.") + else: + logger.info("Results:") + for i, song in enumerate(songs, start=1): + logger.info(f"{i}) {song}") + return songs + + async def get_playlists_by_userid( + self, user_id: str or int, count: int = 5, offset: int = 0 + ) -> list[Playlist]: + """ + Get playlist by owner/user id. + + Args: + user_id (str or int): VK user id. (NOT USERNAME! vk.com/id*******). + count (int): Count of resulting playlists (for VK API: default = 50, max = 100). + offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200. + + Returns: + list[Playlist]: List of playlists. + """ + user_id = int(user_id) + logger.info(f"Request by user: {user_id}") + + try: + response: Response = await self.__getPlaylists(user_id, count, + offset) + playlists = await self.__response_to_playlists(response) + except Exception as e: + logger.error(e) + return + + if len(playlists) == 0: + logger.info("No results found ._.") + else: + logger.info("Results:") + for i, playlist in enumerate(playlists, start=1): + logger.info(f"{i}) {playlist}") + return playlists + + async def search_playlists_by_text( + self, text: str, count: int = 5, offset: int = 0 + ) -> list[Playlist]: + """ + Search playlists by text/query. + Playlist - it user's collection of songs. + + Args: + text (str): Text of query. Can be title of playlist, genre, etc. + count (int): Count of resulting playlists (for VK API: default = 50, max = 100). + offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200. + + Returns: + list[Playlist]: List of playlists. + """ + logger.info(f"Request by text: {text}") + + try: + response: Response = await self.__searchPlaylists(text, count, + offset) + playlists = await self.__response_to_playlists(response) + except Exception as e: + logger.error(e) + return + + if len(playlists) == 0: + logger.info("No results found ._.") + else: + logger.info("Results:") + for i, playlist in enumerate(playlists, start=1): + logger.info(f"{i}) {playlist}") + return playlists + + async def search_albums_by_text( + self, text: str, count: int = 5, offset: int = 0 + ) -> list[Playlist]: + """ + Search albums by text/query. + Album - artists's album/collection of songs. + In obj context - same as 'Playlist'. + + Args: + text (str): Text of query. Can be title of album, name of artist, etc. + count (int): Count of resulting playlists (for VK API: default = 50, max = 100). + offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200. + + Returns: + list[Playlist]: List of albums. + """ + logger.info(f"Request by text: {text}") + + try: + response: Response = \ + await self.__searchAlbums(text, count, offset) + playlists = await self.__response_to_playlists(response) + except Exception as e: + logger.error(e) + return + + if len(playlists) == 0: + logger.info("No results found ._.") + else: + logger.info("Results:") + for i, playlist in enumerate(playlists, start=1): + logger.info(f"{i}) {playlist}") + return playlists + + @staticmethod + async def save_music(song: Song, overwrite: bool = False) -> str: + """ + Save song to '{workDirectory}/Music/{songname}.mp3'. + + Args: + song (Song): 'Song' instance obtained from 'Service' methods. + overwrite (bool): Overwrite file if it exists + + Returns: + str: relative path of downloaded music. + """ + song.to_safe() + file_name_mp3 = f"{song}.mp3" + url = song.url + + if url == "": + logger.warning("Url no found") + return + + session = AsyncClient() + response = await session.get(url=url) + + if response.status_code == 200: + if not os.path.exists("Music"): + os.makedirs("Music") + logger.info("Folder 'Music' was created") + + file_path = os.path.join(os.getcwd(), "Music", file_name_mp3) + + if not os.path.exists(file_path): + if "index.m3u8" in url: + logger.error(".m3u8 detected!") + await session.aclose() + return + else: + logger.warning( + f"File with name {file_name_mp3} exists." + ) + if not overwrite: + await session.aclose() + return file_path + + logger.info(f"Downloading {song}...") + async with aiofiles.open(file_path, "wb") as output_file: + await output_file.write(response.read()) + + await response.aclose() + await session.aclose() + + logger.info(f"Success! Music was downloaded in '{file_path}'") + return file_path diff --git a/plugins/VKMusic/vkpymusic/Song.py b/plugins/VKMusic/vkpymusic/Song.py new file mode 100644 index 0000000..9fbe792 --- /dev/null +++ b/plugins/VKMusic/vkpymusic/Song.py @@ -0,0 +1,37 @@ +import re + + +class Song: + def __init__(self, title, artist, duration, track_id, owner_id, url=""): + self.title = title + self.artist = artist + self.duration = duration + self.track_id = track_id + self.owner_id = owner_id + self.url = url + + def __str__(self): + return f"{self.title} - {self.artist}" + + def to_dict(self) -> dict: + return self.__dict__ + + def to_safe(self): + def safe_format(string): + safe_string = re.sub(r"[^A-zА-я0-9+\s]", "", string) + return safe_string + + self.title = safe_format(self.title) + self.artist = safe_format(self.artist) + + @classmethod + def from_json(cls, item): + title = str(item["title"]) + artist = str(item["artist"]) + duration = int(item["duration"]) + track_id = str(item["id"]) + owner_id = str(item["owner_id"]) + url = str(item["url"]) + + song = cls(title, artist, duration, track_id, owner_id, url) + return song diff --git a/plugins/VKMusic/vkpymusic/TokenReceiver.py b/plugins/VKMusic/vkpymusic/TokenReceiver.py new file mode 100644 index 0000000..223dfd7 --- /dev/null +++ b/plugins/VKMusic/vkpymusic/TokenReceiver.py @@ -0,0 +1,256 @@ +from typing import Callable + +import os +import logging + +import json +import requests + +from .Client import clients +from .Logger import get_logger + +logger: logging = get_logger(__name__) + + +def on_captcha_handler(url: str) -> str: + """ + Default handler to captcha. + + Args: + url (str): Url to captcha image. + + Returns: + str: Key/decoded captcha. + """ + logger.info("Are you a bot? You need to enter captcha...") + os.system(url) + captcha_key: str = input("Captcha: ") + return captcha_key + + +def on_2fa_handler() -> str: + """ + Default handler to 2fa. + + Returns: + str: code from VK/SMS. + """ + logger.info( + "SMS with a confirmation code has been sent to your phone! The code is valid for a few minutes!" + ) + code = input("Code: ") + return code + + +def on_invalid_client_handler(): + """ + Default handler to invalid_client. + """ + logger.error("Invalid login or password") + + +def on_critical_error_handler(response_auth_json): + """ + Default handler to ctitical error. + + Args: + response_auth_json (...): Message or object to research. + """ + print(f"on_critical_error: {response_auth_json}") + + +class TokenReceiver: + def __init__(self, login, password, client="Kate"): + self.__login: str = str(login) + self.__password: str = str(password) + + if client in clients: + self.client = clients[client] + else: + self.client = clients["Kate"] + self.__token = None + + def __request_auth(self, code=None, captcha=None): + session = requests.session() + session.headers.update({"User-Agent": self.client.user_agent}) + query_params = [ + ("grant_type", "password"), + ("client_id", self.client.client_id), + ("client_secret", self.client.client_secret), + ("username", self.__login), + ("password", self.__password), + ("scope", "audio,offline"), + ("2fa_supported", 1), + ("force_sms", 1), + ("v", 5.131), + ] + if captcha: + query_params.append(("captcha_sid", captcha[0])) + query_params.append(("captcha_key", captcha[1])) + if code: + query_params.append(("code", code)) + + request = session.post("https://oauth.vk.com/token", data=query_params) + session.close() + return request + + def __request_code(self, sid): + session = requests.session() + session.headers.update({"User-Agent": self.client.user_agent}) + query_params = [("sid", str(sid)), ("v", "5.131")] + response: requests.Response = session.post( + "https://api.vk.com/method/auth.validatePhone", + data=query_params, + allow_redirects=True, + ) + session.close() + + response_json = json.loads(response.content.decode("utf-8")) + + # right_response_json = { + # "response": { + # "type": "general", + # "sid": {str(sid)}, + # "delay": 60, + # "libverify_support": False, + # "validation_type": "sms", + # "validation_resend": "sms" + # } + # } + + return response_json + + def auth( + self, + on_captcha: Callable[[str], str] = on_captcha_handler, + on_2fa: Callable[[], str] = on_2fa_handler, + on_invalid_client: Callable[[], None] = on_invalid_client_handler, + on_critical_error: Callable[..., None] = on_critical_error_handler, + ) -> bool: + """ + Performs authorization using the available login and password. + If necessary, interactively accepts a code from SMS or captcha. + + Args: + on_captcha (Callable[[str], str]): Handler to captcha. Get url image. Return key. + on_2fa (Callable[[], str]): Handler to 2 factor auth. Return captcha. + on_invalid_client (Callable[[], None]): Handler to invalid client. + on_critical_error (Callable[[Any], None]): Handler to critical error. Get response. + + Returns: + bool: Boolean value indicating whether authorization was successful or not. + """ + response_auth: requests.Response = self.__request_auth() + response_auth_json = json.loads(response_auth.content.decode("utf-8")) + + while "error" in response_auth_json: + error = response_auth_json["error"] + sid = 0 + + if error == "need_captcha": + captcha_sid: str = response_auth_json["captcha_sid"] + captcha_img: str = response_auth_json["captcha_img"] + + captcha_key: str = on_captcha(captcha_img) + + response_auth = self.__request_auth(captcha=(captcha_sid, captcha_key)) + response_auth_json = json.loads(response_auth.content.decode("utf-8")) + + elif error == "need_validation": + sid = response_auth_json["validation_sid"] + + # response2: requests.Response = + self.__request_code(sid) + + # response2_json = json.loads(response2.content.decode('utf-8')) + code: str = on_2fa() + + response_auth = self.__request_auth(code=code) + response_auth_json = json.loads(response_auth.content.decode("utf-8")) + + elif error == "invalid_request": + logger.warn("Invalid code. Try again!") + code: str = on_2fa() + + response_auth = self.__request_auth(code=code) + response_auth_json = json.loads(response_auth.content.decode("utf-8")) + + elif error == "invalid_client": + del self.__login + del self.__password + on_invalid_client() + return False + else: + del self.__login + del self.__password + on_critical_error(response_auth_json) + self.__on_error(response_auth_json) + return False + if "access_token" in response_auth_json: + del self.__login + del self.__password + access_token = response_auth_json["access_token"] + logger.info("Token was received!") + self.__token = access_token + return True + del self.__login + del self.__password + self.__on_error(response_auth_json) + on_critical_error(response_auth_json) + return False + + def get_token(self) -> str: + """ + Prints token in console (if authorisation was succesful). + """ + token = self.__token + if not token: + logger.warn('Please, first call the method "auth".') + return + logger.info(token) + return token + + def save_to_config(self, file_path: str = "config_vk.ini"): + """ + Save token and user agent data in config (if authorisation was succesful). + + Args: + file_path (str): Filename of config (default value = "config_vk.ini"). + """ + token: str = self.__token + if not token: + logger.warn('Please, first call the method "auth"') + return + full_fp = self.create_path(file_path) + if os.path.isfile(full_fp): + print('File already exist! Enter "OK" for rewriting it') + if input().lower() != "ok": + return + os.makedirs(os.path.dirname(full_fp), exist_ok=True) + with open(full_fp, "w") as output_file: + output_file.write("[VK]\n") + output_file.write(f"user_agent={self.client.user_agent}\n") + output_file.write(f"token_for_audio={token}") + logger.info("Token was saved!") + + @staticmethod + def create_path(file_path: str) -> str: + """ + Create path before and after this for different funcs. + + Args: + file_path (str): Relative path to file. + + Returns: + str: Absolute path to file. + """ + dirname = os.path.dirname(__file__) + path = os.path.join(dirname, file_path) + return path + + @staticmethod + def __on_error(response): + logger.critical( + "Unexpected error! Please, create an issue in repository for solving this problem." + ) + logger.critical(response) diff --git a/plugins/VKMusic/vkpymusic/TokenReceiverAsync.py b/plugins/VKMusic/vkpymusic/TokenReceiverAsync.py new file mode 100644 index 0000000..6e2e42f --- /dev/null +++ b/plugins/VKMusic/vkpymusic/TokenReceiverAsync.py @@ -0,0 +1,210 @@ +from typing import Callable + +import os +import logging + +import json +import requests + +from .Client import clients +from .Logger import get_logger + +logger: logging = get_logger(__name__) + + +class TokenReceiverAsync: + def __init__(self, login, password, client="Kate"): + self.__login: str = str(login) + self.__password: str = str(password) + + if client in clients: + self.client = clients[client] + else: + self.client = clients["Kate"] + self.__token = None + + async def __request_auth(self, code=None, captcha=None): + session = requests.session() + session.headers.update({"User-Agent": self.client.user_agent}) + query_params = [ + ("grant_type", "password"), + ("client_id", self.client.client_id), + ("client_secret", self.client.client_secret), + ("username", self.__login), + ("password", self.__password), + ("scope", "audio,offline"), + ("2fa_supported", 1), + ("force_sms", 1), + ("v", 5.131), + ] + if captcha: + query_params.append(("captcha_sid", captcha[0])) + query_params.append(("captcha_key", captcha[1])) + if code: + query_params.append(("code", code)) + + request = session.post("https://oauth.vk.com/token", data=query_params) + session.close() + return request + + async def __request_code(self, sid): + session = requests.session() + session.headers.update({"User-Agent": self.client.user_agent}) + query_params = [("sid", str(sid)), ("v", "5.131")] + response: requests.Response = session.post( + "https://api.vk.com/method/auth.validatePhone", + data=query_params, + allow_redirects=True, + ) + session.close() + + response_json = json.loads(response.content.decode("utf-8")) + + # right_response_json = { + # "response": { + # "type": "general", + # "sid": {str(sid)}, + # "delay": 60, + # "libverify_support": False, + # "validation_type": "sms", + # "validation_resend": "sms" + # } + # } + + return response_json + + async def auth( + self, + on_captcha: Callable[[str], str], + on_2fa: Callable[[], str], + on_invalid_client: Callable[[], None], + on_critical_error: Callable[..., None], + ) -> bool: + """ + Performs ASYNC authorization using the available login and password. + If necessary, interactively accepts a code from SMS or captcha. + + Args: + on_captcha (Callable[[str], str]): ASYNC handler to captcha. Get url image. Return key. + on_2fa (Callable[[], str]): ASYNC handler to 2 factor auth. Return captcha. + on_invalid_client (Callable[[], None]): ASYNC handler to invalid client. + on_critical_error (Callable[[Any], None]): ASYNC handler to crit error. Get response. + + Returns: + bool: Boolean value indicating whether authorization was successful or not. + """ + response_auth: requests.Response = await self.__request_auth() + response_auth_json = json.loads(response_auth.content.decode("utf-8")) + + while "error" in response_auth_json: + error = response_auth_json["error"] + + if error == "need_captcha": + captcha_sid: str = response_auth_json["captcha_sid"] + captcha_img: str = response_auth_json["captcha_img"] + + captcha_key: str = await on_captcha(captcha_img) + + response_auth = await self.__request_auth( + captcha=(captcha_sid, captcha_key) + ) + response_auth_json = json.loads(response_auth.content.decode("utf-8")) + + elif error == "need_validation": + sid = response_auth_json["validation_sid"] + + # response2: requests.Response = + await self.__request_code(sid) + + # response2_json = json.loads(response2.content.decode('utf-8')) + code: str = await on_2fa() + + response_auth = await self.__request_auth(code=code) + response_auth_json = json.loads(response_auth.content.decode("utf-8")) + + elif error == "invalid_request": + logger.warn("Invalid code. Try again!") + code: str = await on_2fa() + + response_auth = await self.__request_auth(code=code) + response_auth_json = json.loads(response_auth.content.decode("utf-8")) + + elif error == "invalid_client": + del self.__login + del self.__password + await on_invalid_client() + return False + else: + del self.__login + del self.__password + await on_critical_error(response_auth_json) + self.__on_error(response_auth_json) + return False + if "access_token" in response_auth_json: + del self.__login + del self.__password + access_token = response_auth_json["access_token"] + logger.info("Token was received!") + self.__token = access_token + return True + del self.__login + del self.__password + self.__on_error(response_auth_json) + await on_critical_error(response_auth_json) + return False + + def get_token(self) -> str: + """ + Prints token in console (if authorisation was succesful). + """ + token = self.__token + if not token: + logger.warn('Please, first call the method "auth".') + return + logger.info(token) + return token + + def save_to_config(self, file_path: str = "config_vk.ini"): + """ + Save token and user agent data in config (if authorisation was succesful). + + Args: + file_path (str): Filename of config (default value = "config_vk.ini"). + """ + token: str = self.__token + if not token: + logger.warn('Please, first call the method "auth"') + return + full_fp = self.create_path(file_path) + if os.path.isfile(full_fp): + print('File already exist! Enter "OK" for rewriting it') + if input().lower() != "ok": + return + os.makedirs(os.path.dirname(full_fp), exist_ok=True) + with open(full_fp, "w") as output_file: + output_file.write("[VK]\n") + output_file.write(f"user_agent={self.client.user_agent}\n") + output_file.write(f"token_for_audio={token}") + logger.info("Token was saved!") + + @staticmethod + def create_path(file_path: str) -> str: + """ + Create path before and after this for different funcs. + + Args: + file_path (str): Relative path to file. + + Returns: + str: Absolute path to file. + """ + dirname = os.path.dirname(__file__) + path = os.path.join(dirname, file_path) + return path + + @staticmethod + def __on_error(response): + logger.critical( + "Unexpected error! Please, create an issue in repository for solving this problem." + ) + logger.critical(response) diff --git a/plugins/VKMusic/vkpymusic/__init__.py b/plugins/VKMusic/vkpymusic/__init__.py new file mode 100644 index 0000000..8bf4902 --- /dev/null +++ b/plugins/VKMusic/vkpymusic/__init__.py @@ -0,0 +1,7 @@ +from .TokenReceiver import TokenReceiver +from .TokenReceiverAsync import TokenReceiverAsync +from .Client import clients +from .Service import Service +from .ServiceAsync import ServiceAsync +from .Playlist import Playlist +from .Song import Song diff --git a/pyproject.toml b/pyproject.toml index e081674..00514c2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "SeaPlayer" -version = "0.8.4" +version = "0.8.5" description = "SeaPlayer is a player that works in the terminal." repository = "https://github.com/romanin-rf/SeaPlayer" authors = ["Romanin "] @@ -12,10 +12,7 @@ classifiers = [ "Operating System :: Microsoft :: Windows :: Windows 10", "Operating System :: Microsoft :: Windows :: Windows 11", "Operating System :: POSIX :: Linux", - "Operating System :: MacOS", - "Programming Language :: Python :: 3.9", - "Programming Language :: Python :: 3.10", - "Programming Language :: Python :: 3.11" + "Operating System :: MacOS" ] packages = [{ include = "seaplayer" }] include = [ @@ -36,14 +33,14 @@ seaplayer = "seaplayer.__main__:run" seaplug = "seaplayer.plug.__main__:run" [tool.poetry.dependencies] -python = ">=3.9,<3.12" +python = ">=3.8,<3.12" # Custom modules -ripix = "2.4.2" -playsoundsimple-py = "0.8.1.post2" -properties-py = "1.1.0" -urlopen2 = "1.2.0.post1" +ripix = "2.5.0" +playsoundsimple-py = "0.8.4" +properties-py = "1.2.1" +urlopen2 = "1.4.0" # Main Modules -pillow = ">=10" +pillow = ">=9.5" aiofiles = ">=23.1" rich = ">=13" mutagen = ">=1.45" diff --git a/requirements.txt b/requirements.txt index 00d5c31..722a380 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,9 @@ -ripix==2.4.2 -playsoundsimple-py==0.8.1.post2 -properties-py==1.1.0 -urlopen2==1.2.0.post1 +ripix==2.5.0 +playsoundsimple-py==0.8.4 +properties-py==1.2.1 +urlopen2==1.4.0 -pillow>=10 +pillow>=9.5 aiofiles>=23.1 rich>=13 mutagen>=1.45 diff --git a/seaplayer/codecs/AnySound.py b/seaplayer/codecs/AnySound.py index b654d10..7995678 100644 --- a/seaplayer/codecs/AnySound.py +++ b/seaplayer/codecs/AnySound.py @@ -82,7 +82,7 @@ def from_url( while len(data:=urlfile.read(download_buffer_size)) != 0: tempfile.write(data) - return AnySound(path, **{"is_temp": True, **kwargs}) + return AnySound(path, is_temp=True, **kwargs) @staticmethod async def aio_from_url( @@ -97,4 +97,4 @@ async def aio_from_url( while len(data:=urlfile.read(download_buffer_size)) != 0: await tempfile.write(data) - return AnySound(path, **{"is_temp": True, **kwargs}) + return AnySound(path, is_temp=True, **kwargs) diff --git a/seaplayer/codecs/URLS.py b/seaplayer/codecs/URLS.py index 11aae69..dbabb6d 100644 --- a/seaplayer/codecs/URLS.py +++ b/seaplayer/codecs/URLS.py @@ -70,10 +70,10 @@ async def __aio_sha1__(self, buffer_size: int) -> str: def __init__(self, url: str, sound_device_id: Optional[int]=None, aio_init: bool=False, **kwargs) -> None: self.name = url if not aio_init: - self._sound = AnySound.from_url(self.name, device_id=sound_device_id) + self._sound = AnySound.from_url(self.name, device_id=sound_device_id, **kwargs) @staticmethod async def __aio_init__(url: str, sound_device_id: Optional[int]=None, **kwargs): self = URLSoundCodec(url, aio_init=True) - self._sound = await asyncio.to_thread(AnySound.from_url, url, device_id=sound_device_id) + self._sound = await asyncio.to_thread(AnySound.from_url, url, device_id=sound_device_id, **kwargs) return self diff --git a/seaplayer/config.py b/seaplayer/config.py index 228661d..fa7797a 100644 --- a/seaplayer/config.py +++ b/seaplayer/config.py @@ -7,35 +7,21 @@ # ! Vars DEFAULT_CONFIG_DATA = { - "main": { - "lang": "en-eng" - }, - "sound": { - "sound_font_path": None, # * Optional[str] - "output_sound_device_id": None, # * Optional[int] - }, - "image": { - "image_update_method": "sync", # * Literal["sync", "async"] - "image_resample_method": "bilinear", # * Literal["nearest", "bilinear", "bicubic", "lanczos", "hamming", "box"] - }, - "playback": { - "volume_change_percent": 0.05, - "rewind_count_seconds": 5, - "max_volume_percent": 2.0 - }, - "playlist": { - "recursive_search": False - }, - "keys": { - "quit": "q,й", - "rewind_forward": "*", - "rewind_back": "/", - "volume_up": "+", - "volume_down": "-" - }, - "debag": { - "log_menu_enable": False - } + "main.lang": "en-eng", + "sound.sound_font_path": None, + "sound.output_sound_device_id": None, + "image.image_update_method": "sync", + "image.image_resample_method": "bilinear", + "playback.rewind_count_seconds": 5, + "playback.volume_change_percent": 0.05, + "playback.max_volume_percent": 2.0, + "playlist.recursive_search": False, + "keys.quit": "q,й", + "keys.rewind_forward": "*", + "keys.rewind_back": "/", + "keys.volume_up": "+", + "keys.volume_down": "-", + "debag.log_menu_enable": False } # ! Main Class @@ -43,17 +29,17 @@ class SeaPlayerConfig: @staticmethod def dump(filepath: Path, data: Dict[str, Any]) -> None: with open(filepath, "w", encoding="utf-8", errors="ignore") as file: - properties.dump_tree(data, file) + properties.dump(data, file) @staticmethod def load(filepath: Path, default: Dict[str, Any]) -> Dict[str, Any]: with open(filepath, "r", encoding="utf-8", errors="ignore") as file: try: - return properties.load_tree(file) + return properties.load(file) except: pass with open(filepath, "w", encoding="utf-8", errors="ignore") as file: - properties.dump_tree(default, file) + properties.dump(default, file) return default def refresh(self) -> None: @@ -77,22 +63,12 @@ def __init__( self.config = default_data.copy() self.refresh() - @staticmethod - def tevey(key_path: str, *, sep: str=".") -> str: - return "".join([f"[{repr(key)}]" for key in key_path.split(sep)]) - def get(self, key: str, default: T=None) -> Union[Any, T]: - try: - return eval(f"self.config{self.tevey(key)}") - except: - return default + return self.config.get(key, default) def set(self, key: str, value: Any) -> None: - try: - exec(f"self.config{self.tevey(key)} = value") - self.refresh() - except: - pass + self.config[key] = value + self.refresh() # ! Main @property diff --git a/seaplayer/objects/Buttons.py b/seaplayer/objects/Buttons.py index b3c3057..a378b6d 100644 --- a/seaplayer/objects/Buttons.py +++ b/seaplayer/objects/Buttons.py @@ -1,6 +1,8 @@ +import asyncio +from rich.text import TextType from textual.widgets import Button # > Typing -from typing import Callable, Awaitable +from typing import Optional, Callable, Awaitable # ! Clikable Button Class class ClikableButton(Button): @@ -9,4 +11,16 @@ def __init__(self, label: str, on_click: Callable[[], Awaitable[None]], *args, * self.__on_click_method = on_click async def on_click(self, *args, **kwargs) -> None: - await self.__on_click_method() \ No newline at end of file + await self.__on_click_method() + +# ! Wait Button Click +class WaitButton(Button): + def __init__(self, label: Optional[TextType]=None, queue_max_size: int=1, *args, **kwargs) -> None: + super().__init__(label, *args, **kwargs) + self.__wait_queue: asyncio.Queue[bool] = asyncio.Queue(queue_max_size) + + async def on_click(self, *args, **kwargs) -> None: + await self.__wait_queue.put(True) + + async def wait_click(self) -> bool: + return await self.__wait_queue.get() \ No newline at end of file diff --git a/seaplayer/objects/Input.py b/seaplayer/objects/Input.py index bdb5ddd..12e31c4 100644 --- a/seaplayer/objects/Input.py +++ b/seaplayer/objects/Input.py @@ -1,33 +1,34 @@ from textual.widgets import Input # > Typing -from typing import Optional, Tuple, Any +from typing import Optional, Tuple, Callable, Awaitable, Any # ! InputFiles functions -async def _conv(value: str) -> Tuple[bool, Optional[Any]]: return True, value -async def _submit(input: Input, value: Any) -> None: ... -def _update_placeholder() -> Optional[str]: ... +async def conv(value: str) -> Tuple[bool, Optional[Any]]: return True, value +async def submit(input: Input, value: Any) -> None: ... +def update_placeholder() -> Optional[str]: ... # ! InputField class class InputField(Input): def __init__( self, - conv=_conv, - submit=_submit, - update_placeholder=_update_placeholder, + conv: Callable[[str], Awaitable[Tuple[bool, Optional[Any]]]]=conv, + submit: Callable[[Input, Any], Awaitable[None]]=submit, + update_placeholder: Callable[[], Optional[str]]=update_placeholder, **kwargs ) -> None: super().__init__(**kwargs) - self._conv = conv - self._submit = submit - self._update_placeholder = update_placeholder - if (placeholder:=self._update_placeholder()) is not None: + self.__conv = conv + self.__submit = submit + self.__update_placeholder = update_placeholder + if (placeholder:=self.__update_placeholder()) is not None: self.placeholder = placeholder async def action_submit(self): value = self.value self.value = "" if value.replace(" ", "") != "": - ok, c_value = await self._conv(value) - if ok: await self._submit(self, c_value) - if (placeholder:=self._update_placeholder()) is not None: - self.placeholder = placeholder \ No newline at end of file + ok, c_value = await self.__conv(value) + if ok: + await self.__submit(self, c_value) + if (placeholder:=self.__update_placeholder()) is not None: + self.placeholder = placeholder diff --git a/seaplayer/objects/Labels.py b/seaplayer/objects/Labels.py index 8c5da95..5720a18 100644 --- a/seaplayer/objects/Labels.py +++ b/seaplayer/objects/Labels.py @@ -1,3 +1,4 @@ +import random from textual.widgets import Label, Button from rich.style import Style from rich.console import RenderableType @@ -14,23 +15,31 @@ class FillLabel(Label): width: 1fr; } """ + def __gen_random_color(self) -> None: + return f"#{hex(random.randint(0,16777215))[2:]}" - def _gen(self) -> Segments: - return Segments([Segment(self.__chr, self.__style) for i in range((self.size[0] * self.size[1]))]) + def __gen_segments(self) -> Segments: + segments = [] + for i in range(self.size[0]*self.size[1]): + s = Style.parse(self.__gen_random_color()) if self.__rainbow else self.__style + segments.append(Segment(self.__chr, s)) + return Segments(segments) def __init__( self, char: str="-", style: Optional[Style]=None, + rainbow: bool=False, **kwargs ) -> None: super().__init__(**kwargs) self.__chr = char[:1] + self.__rainbow = rainbow self.__style = style - self.update(self._gen()) + self.update(self.__gen_segments()) async def on_resize(self) -> None: - self.update(self._gen()) + self.update(self.__gen_segments()) # ! Clickable Label Class class ClickableLabel(Label, Button, can_focus=True): diff --git a/seaplayer/objects/PopUp.py b/seaplayer/objects/PopUp.py index e25b05e..1746e49 100644 --- a/seaplayer/objects/PopUp.py +++ b/seaplayer/objects/PopUp.py @@ -1,4 +1,10 @@ +from textual.widget import Widget +from textual.widgets import Label from textual.containers import Container +# > Typing +from typing import Optional +# > Local Imports +from .Labels import FillLabel # ! Main Class class PopUp(Container): @@ -6,6 +12,7 @@ class PopUp(Container): PopUp { layer: popup; background: $background; + text-align: center; align-vertical: middle; align-horizontal: center; content-align: center middle; @@ -14,5 +21,50 @@ class PopUp(Container): padding: 1 2 1 2; } """ - pass + +# ! Other Clases +class PopUpWindow(PopUp): + DEFAULT_CSS = """ + PopUpWindow { + layer: popup; + align-vertical: middle; + align-horizontal: center; + content-align: center middle; + text-align: center; + } + PopUpWindow FillLabel { + width: auto; + height: 1; + } + """ + def __init__( + self, + *children: Widget, + title: Optional[str]=None, + name: Optional[str]=None, + id: Optional[str]=None, + classes: Optional[str]=None, + disabled: bool=False + ) -> None: + self.__title = title or self.__class__.__name__ + self.__title_label = Label(self.__title) + super().__init__( + self.__title_label, + FillLabel('─'), + *children, + name=name, + id=id, + classes=classes, + disabled=disabled + ) + + # ! Propertys + @property + def title(self) -> str: + return self.__title + + @title.setter + def title(self, value: str) -> str: + self.__title = str(value) + self.__title_label.update(self.__title) \ No newline at end of file diff --git a/seaplayer/objects/__init__.py b/seaplayer/objects/__init__.py index 14ebba6..653c905 100644 --- a/seaplayer/objects/__init__.py +++ b/seaplayer/objects/__init__.py @@ -1,6 +1,7 @@ from .Log import LogMenu from .Labels import FillLabel from .Input import InputField +from .Buttons import ClikableButton, WaitButton from .Notification import Nofy, CallNofy from .ProgressBar import IndeterminateProgress from .DataOptions import DataOption, DataOptionList @@ -8,5 +9,5 @@ from .MusicList import MusicListView, MusicListViewItem from .Configurate import ConfigurateListItem, ConfigurateList from .Radio import DataRadioSet, DataRadioButton +from .PopUp import PopUp, PopUpWindow from .Rheostat import Rheostat -from .PopUp import PopUp diff --git a/seaplayer/plug/cli/cli.py b/seaplayer/plug/cli/cli.py index 4fd9c64..3e4b1ba 100644 --- a/seaplayer/plug/cli/cli.py +++ b/seaplayer/plug/cli/cli.py @@ -46,7 +46,7 @@ def listing(): for n, info in enumerate(get_plugins_info(), 1): status = "[green]Enabled[/green]" if plugin_config.is_enable_plugin(info) else "[red]Disabled[/red]" console.print( - f"[cyan]{n}[/cyan]. [green]{info.name}[/green] ([green]{info.name_id}[/green]) [cyan]v{info.version}[/cyan] from [yellow]{info.author}[/yellow] ({status})" + f"[cyan]{n}[/cyan]. [green]{info.name}[/green] ([green]{info.name_id}[/green]) [#60fdff]v{info.version}[/#60fdff] from [yellow]{info.author}[/yellow] ({status})" ) else: console.print(f"[yellow]The list of plugins is [blue]empty[/blue].[/yellow]") diff --git a/seaplayer/plug/cli/functions.py b/seaplayer/plug/cli/functions.py index a509fd5..2ce379d 100644 --- a/seaplayer/plug/cli/functions.py +++ b/seaplayer/plug/cli/functions.py @@ -5,9 +5,12 @@ # > Local Import's from .functions import * from ..pluginbase import PluginInfo -from ..pluginloader import PluginLoaderConfigManager, PluginLoader +from ..pluginloader import PluginLoaderConfigManager, PluginLoader, load_plugin_info from ...units import PLUGINS_CONFIG_PATH, PLUGINS_DIRPATH +# ! Vars +console = Console() + # ! Functions def init_config() -> None: os.makedirs(PLUGINS_DIRPATH, mode=0o755, exist_ok=True) @@ -36,9 +39,9 @@ def get_plugins_info() -> List[PluginInfo]: plugins_infos = [] for plugin_init_path, plugin_info_path, plugin_deps_path in PluginLoader.search_plugins_paths(): try: - plugins_infos.append(PluginLoader.load_plugin_info(plugin_info_path)) + plugins_infos.append(load_plugin_info(plugin_info_path)) except: - pass + console.print_exception() return plugins_infos def is_plugin_dirpath(dirpath: str) -> bool: diff --git a/seaplayer/plug/pluginbase.py b/seaplayer/plug/pluginbase.py index a2db8ab..8e78f25 100644 --- a/seaplayer/plug/pluginbase.py +++ b/seaplayer/plug/pluginbase.py @@ -58,5 +58,8 @@ def on_run(self): async def on_compose(self): pass + async def on_ready(self): + pass + async def on_quit(self): pass diff --git a/seaplayer/plug/pluginbase.pyi b/seaplayer/plug/pluginbase.pyi index 7b0c36f..85fc239 100644 --- a/seaplayer/plug/pluginbase.pyi +++ b/seaplayer/plug/pluginbase.pyi @@ -34,4 +34,5 @@ class PluginBase: def on_init(self) -> None: ... def on_run(self) -> None: ... async def on_compose(self) -> None: ... + async def on_ready(self) -> None: ... async def on_quit(self) -> None: ... diff --git a/seaplayer/plug/pluginloader.py b/seaplayer/plug/pluginloader.py index 214b954..9bdb554 100644 --- a/seaplayer/plug/pluginloader.py +++ b/seaplayer/plug/pluginloader.py @@ -275,6 +275,13 @@ async def on_compose(self) -> None: except: self.app.error(f"Failed to do [green]`await on_compose`[/green] in: {i}") + async def on_ready(self) -> None: + async for i in aiter(self.on_plugins): + try: + await i.on_ready() + except: + self.app.error(f"Failed to do [green]`await on_compose`[/green] in: {i}") + async def on_quit(self) -> None: async for i in aiter(self.on_plugins): try: diff --git a/seaplayer/plug/pluginloader.pyi b/seaplayer/plug/pluginloader.pyi index 8d82d02..d199171 100644 --- a/seaplayer/plug/pluginloader.pyi +++ b/seaplayer/plug/pluginloader.pyi @@ -88,4 +88,5 @@ class PluginLoader: def on_init(self) -> None: ... def on_run(self) -> None: ... async def on_compose(self) -> None: ... + async def on_ready(self) -> None: ... async def on_quit(self) -> None: ... diff --git a/seaplayer/seaplayer.py b/seaplayer/seaplayer.py index aeb88fb..f146fac 100644 --- a/seaplayer/seaplayer.py +++ b/seaplayer/seaplayer.py @@ -343,36 +343,33 @@ async def _spm(input: InputField, value: Any) -> None: group="CONTROL_UPDATER-LOOP", description="Control of playback modes and status updates." ) - if ENABLE_PLUGIN_SYSTEM: - self.run_worker( - self.plugin_loader.on_compose, - name="ON_COMPOSE", - group="PluginLoader", - description="" - ) self.info("---") # ! Currect Sound Controls async def currect_sound_stop(self, sound: Optional[CodecBase]=None): - if sound is None: sound = await self.aio_gcs() + if sound is None: + sound = await self.aio_gcs() if sound is not None: self.last_playback_status = 0 sound.stop() async def currect_sound_play(self, sound: Optional[CodecBase]=None): - if sound is None: sound = await self.aio_gcs() + if sound is None: + sound = await self.aio_gcs() if sound is not None: self.last_playback_status = 1 sound.play() async def currect_sound_pause(self, sound: Optional[CodecBase]=None): - if sound is None: sound = await self.aio_gcs() + if sound is None: + sound = await self.aio_gcs() if sound is not None: self.last_playback_status = 3 sound.pause() async def currect_sound_unpause(self, sound: Optional[CodecBase]=None): - if sound is None: sound = await self.aio_gcs() + if sound is None: + sound = await self.aio_gcs() if sound is not None: self.last_playback_status = 1 sound.unpause() @@ -406,20 +403,26 @@ async def set_sound_for_playback( async def add_sounds_to_list(self) -> None: added_oks = 0 loading_nofy = await self.aio_callnofy(self.ll.get("nofys.sound.found").format(count=len(self.last_handlered_values))) - async for path in aiter(self.last_handlered_values): + self.CODECS.sort(key=lambda x: x.codec_priority) + async for value in aiter(self.last_handlered_values): sound = None async for codec in aiter(self.CODECS): + self.info(f"Attempt to load via {repr(codec)}") try: - if await codec.aio_is_this_codec(path): - if not hasattr(codec, "__aio_init__"): + if hasattr(codec, "aio_is_this_codec"): + this_codec = await codec.aio_is_this_codec(value) + else: + this_codec = codec.is_this_codec(value) + if this_codec: + if hasattr(codec, "__aio_init__"): try: - sound: CodecBase = codec(path, **self.CODECS_KWARGS) + sound: CodecBase = await codec.__aio_init__(value, **self.CODECS_KWARGS) except Exception as e: self.exception(e) sound = None else: try: - sound: CodecBase = await codec.__aio_init__(path, **self.CODECS_KWARGS) + sound: CodecBase = codec(value, **self.CODECS_KWARGS) except Exception as e: self.exception(e) sound = None @@ -430,14 +433,14 @@ async def add_sounds_to_list(self) -> None: added_oks += 1 break except FileNotFoundError: - self.error(f"The file does not exist or is a directory: {repr(path)}") + self.error(f"The file does not exist or is a directory: {repr(value)}") break except OSError: pass except Exception as e: self.exception(e) if sound is None: - self.error(f"The sound could not be loaded: {repr(path)}") + self.error(f"The sound could not be loaded: {repr(value)}") await loading_nofy.remove() self.info(f"Added [cyan]{added_oks}[/cyan] songs!") await self.aio_nofy(self.ll.get("nofys.sound.added").format(count=added_oks)) @@ -451,7 +454,8 @@ async def on_button_pressed(self, event: Button.Pressed) -> None: @on(Button.Pressed, "#button-pause") async def bp_pause_unpause(self) -> None: if (sound:=await self.aio_gcs()) is not None: - if sound.playing: await self.currect_sound_pause(sound) + if sound.playing: + await self.currect_sound_pause(sound) await self.aio_update_select_label(sound) @on(Button.Pressed, "#button-play-stop") @@ -529,10 +533,27 @@ async def action_quit(self): sound.stop() return await super().action_quit() + # ! On Functions + async def on_compose(self) -> None: + if ENABLE_PLUGIN_SYSTEM: + self.run_worker( + self.plugin_loader.on_compose, + name="ON_COMPOSE", + group="PluginLoader", + description="" + ) + + def on_ready(self, *args, **kwargs) -> None: + if ENABLE_PLUGIN_SYSTEM: + self.run_worker( + self.plugin_loader.on_ready, + name="ON_READY", + group="PluginLoader", + description="" + ) + # ! Other def run(self, *args, **kwargs): if ENABLE_PLUGIN_SYSTEM: self.plugin_loader.on_run() - self.CODECS.sort(key=lambda x: x.codec_priority) super().run(*args, **kwargs) - diff --git a/seaplayer/units.py b/seaplayer/units.py index 7639e82..6eaf174 100644 --- a/seaplayer/units.py +++ b/seaplayer/units.py @@ -6,7 +6,7 @@ # ! Metadata __title__ = "SeaPlayer" -__version__ = "0.8.4" +__version__ = "0.8.5" __author__ = "Romanin" __email__ = "semina054@gmail.com" __url__ = "https://github.com/romanin-rf/SeaPlayer"