diff --git a/build.py b/build.py
index f2c17c9..eae117d 100644
--- a/build.py
+++ b/build.py
@@ -39,6 +39,7 @@
"seaplayer/objects/Notification.py": "seaplayer/objects/",
"seaplayer/objects/ProgressBar.py": "seaplayer/objects/",
"seaplayer/objects/Radio.py": "seaplayer/objects/",
+ "seaplayer/objects/PopUp.py": "seaplayer/objects/",
# * 1.2) Types
"seaplayer/types": "seaplayer/types/",
"seaplayer/types/__init__.py": "seaplayer/types/",
diff --git a/changelog.md b/changelog.md
index ee3d317..e7a22cb 100644
--- a/changelog.md
+++ b/changelog.md
@@ -3,7 +3,8 @@
| Version | Date | Tag | Changelog |
| ------- | ---- | --- | --------- |
-| [v0.8.4](https://github.com/romanin-rf/SeaPlayer/releases/tag/v0.8.4) | 11.12.2023 | **STABLE** | - Added widgets: `Rheostat`, `ClickableLabel`
- Added widget: Rheostat
- Added a new widget: `PopUp`
- Fixed all language
- More attempts to make `Confiturate` screen clearer
- Moved all `CSS` from `objects.tcss` to `DEFAULT_CSS` separately for each widget |
+| [v0.8.5](https://github.com/romanin-rf/SeaPlayer/releases/tag/v0.8.5) | 16.12.2023 | **STABLE** | - Added support for **python3.8**
- Added `on_ready` abstract methods for plugins
- Added new widgets: `PopUpWindow`, `WaitButton`
- Fixed method `add_sounds_to_list`
- Fixed `seaplayer.plug.cli`
- Fixed `build.py`
- Updated all child custom modules |
+| [v0.8.4](https://github.com/romanin-rf/SeaPlayer/releases/tag/v0.8.4) | 11.12.2023 | **DEPRECATED** | - Added widgets: `Rheostat`, `ClickableLabel`
- Added widget: Rheostat
- Added a new widget: `PopUp`
- Fixed all language
- More attempts to make `Confiturate` screen clearer
- Moved all `CSS` from `objects.tcss` to `DEFAULT_CSS` separately for each widget |
| [v0.8.3](https://github.com/romanin-rf/SeaPlayer/releases/tag/v0.8.3) | 10.12.2023 | **STABLE** | - Added plugin `VKMusic`
- Added priority system for codecs
- Added system handhers of value
- Fixed `build.py`
- Moved method `load_plugin_info` in `seaplayer.plug.pluginloader` |
| [v0.8.2](https://github.com/romanin-rf/SeaPlayer/releases/tag/v0.8.2) | 08.12.2023 | **STABLE** | - Added language merge (for translating plugins)
- Fixed in translation files (`Log Menu Enable` -> `Logging` )
- Changed `object.css` (classes will no longer be used to specify standard properties)
- Improved widget `FillLabel` |
| [v0.8.1](https://github.com/romanin-rf/SeaPlayer/releases/tag/v0.8.1) | 07.12.2023 | **STABLE** | - Revisioned of the LanguageLoader
- Added new language: `Українська` |
diff --git a/plugins/VKMusic/__init__.py b/plugins/VKMusic/__init__.py
index 79e85af..9b3eca3 100644
--- a/plugins/VKMusic/__init__.py
+++ b/plugins/VKMusic/__init__.py
@@ -1,13 +1,15 @@
import logging
from rich.console import Console
-from rich.prompt import Prompt
+from textual.widgets import Input, Label
from seaplayer.plug import PluginBase
-from vkpymusic import Service, TokenReceiver
+from seaplayer.objects import PopUpWindow, WaitButton
+from .vkpymusic import Service, TokenReceiverAsync
+# > Typing
+from typing import Tuple
# > Local Imports
from .vkmcodec import VKMCodec
from .units import (
pcheck,
- VKM_MAIN_PATTERN,
VKM_RANGE_PATTERN
)
@@ -37,24 +39,76 @@ def exist_token(self) -> bool:
def on_init(self) -> None:
self.configurated = self.exist_token()
- def on_run(self) -> None:
+ async def __req_login_password(self) -> Tuple[str, str]:
+ lppw = PopUpWindow(
+ ilogin:=Input(placeholder="Login"),
+ ipassword:=Input(placeholder="Password", password=True),
+ elpb:=WaitButton("Log In"),
+ title="VKMusic Authentication"
+ )
+ await self.app.mount(lppw)
+ await elpb.wait_click()
+ login, password = ilogin.value, ipassword.value
+ await lppw.remove()
+ return login, password
+
+ async def __req_2fa(self) -> str:
+ cpw = PopUpWindow(
+ icode:=Input(placeholder="Code"),
+ ecb:=WaitButton("Enter"),
+ title="VKMusic Authentication"
+ )
+ await self.app.mount(cpw)
+ await ecb.wait_click()
+ code = icode.value
+ await cpw.remove()
+ return code
+
+ async def __req_capcha(self, url: str) -> str:
+ cpw = PopUpWindow(
+ Label(f"[link={url}]{url}[/link]"),
+ icapcha:=Input(placeholder="Capcha"),
+ ecb:=WaitButton("Enter"),
+ title="VKMusic Authentication"
+ )
+ await self.app.mount(cpw)
+ await ecb.wait_click()
+ code = icapcha
+ await cpw.remove()
+ return code
+
+ async def __req_invalid_client(self) -> None:
+ pass
+
+ async def __req_critical_error(self) -> None:
+ pass
+
+ async def __init_service__(self) -> None:
if self.configurated:
self.service = Service.parse_config()
else:
- login, password = Prompt.ask("Login"), Prompt.ask("Password", password=True)
while not self.configurated:
- tr = TokenReceiver(login, password)
- if tr.auth(on_2fa=lambda: Prompt.ask("Code 2FA")):
+ login, password = await self.__req_login_password()
+ tr = TokenReceiverAsync(login, password)
+ if await tr.auth(
+ self.__req_capcha,
+ self.__req_2fa,
+ self.__req_invalid_client,
+ self.__req_critical_error
+ ):
tr.save_to_config()
self.configurated = True
else:
- console.print("[red]Failed to get a token, repeat...[/red]")
+ self.app.error("Failed to get a token, repeat...")
self.service = Service.parse_config()
self.app.info(f"Service is worked: {repr(self.service)}")
- # ! Registration
- self.app.CODECS_KWARGS["vkm_service"] = self.service
+ # ? Registration
+ self.app.CODECS_KWARGS.update({"vkm_service": self.service})
self.add_value_handlers(vkm_value_handler)
self.add_codecs(VKMCodec)
+
+ async def on_ready(self):
+ self.app.run_worker(self.__init_service__, group=self.info.name_id)
# ! Registeration
-__plugin__ = VKMusic
\ No newline at end of file
+__plugin__ = VKMusic
diff --git a/plugins/VKMusic/info.json b/plugins/VKMusic/info.json
index a0d4751..a855108 100644
--- a/plugins/VKMusic/info.json
+++ b/plugins/VKMusic/info.json
@@ -1,7 +1,7 @@
{
"name": "VK Music",
"name_id": "seaplayer.plugins.vk.music",
- "version": "0.2.0",
+ "version": "0.3.0.dev1",
"author": "Romanin",
"description": "Music downloader from VK.",
"url": "https://github.com/romanin-rf/SeaPlayer"
diff --git a/plugins/VKMusic/requirements.txt b/plugins/VKMusic/requirements.txt
index 17375c6..0124341 100644
--- a/plugins/VKMusic/requirements.txt
+++ b/plugins/VKMusic/requirements.txt
@@ -1,2 +1,2 @@
vkpymusic>=2.2.4
-vbml>=1.1
\ No newline at end of file
+vbml>=1.1
diff --git a/plugins/VKMusic/vkmcodec.py b/plugins/VKMusic/vkmcodec.py
index aee3ee6..ffbe833 100644
--- a/plugins/VKMusic/vkmcodec.py
+++ b/plugins/VKMusic/vkmcodec.py
@@ -1,5 +1,5 @@
import asyncio
-from vkpymusic import Service
+from .vkpymusic import Service
from seaplayer.codecs.URLS import URLSoundCodec
from seaplayer.codecs.AnySound import AnySound
# > Typing
diff --git a/plugins/VKMusic/vkpymusic/Client.py b/plugins/VKMusic/vkpymusic/Client.py
new file mode 100644
index 0000000..72dcb22
--- /dev/null
+++ b/plugins/VKMusic/vkpymusic/Client.py
@@ -0,0 +1,14 @@
+class Client:
+ def __init__(self, user_agent, client_id, client_secret):
+ self.user_agent = user_agent
+ self.client_id = client_id
+ self.client_secret = client_secret
+
+
+KateMobile = Client(
+ user_agent='KateMobileAndroid/56 lite-460 (Android 4.4.2; SDK 19; x86; unknown Android SDK built for x86; en)',
+ client_id='2685278',
+ client_secret='lxhD8OD7dMsqtXIm5IUY'
+)
+
+clients = {'Kate': KateMobile}
diff --git a/plugins/VKMusic/vkpymusic/Logger.py b/plugins/VKMusic/vkpymusic/Logger.py
new file mode 100644
index 0000000..aec8f77
--- /dev/null
+++ b/plugins/VKMusic/vkpymusic/Logger.py
@@ -0,0 +1,39 @@
+import os, datetime, logging
+
+
+class bcolors:
+ CRITICAL = "\033[95m"
+ OKBLUE = "\033[94m"
+ OKCYAN = "\033[96m"
+ OKGREEN = "\033[92m"
+ WARNING = "\033[93m"
+ ERROR = "\033[91m"
+ ENDC = "\033[0m"
+
+
+_log_format = "%(asctime)s | [%(name)s | (%(filename)s) .%(funcName)s(%(lineno)d)] [%(levelname)s] %(message)s"
+
+
+def get_file_handler():
+ file_path = f"logs/vkpymusic_{datetime.date.today()}.log"
+ os.makedirs(os.path.dirname(file_path), exist_ok=True)
+
+ file_handler = logging.FileHandler(file_path)
+ file_handler.setLevel(logging.WARNING)
+ file_handler.setFormatter(logging.Formatter(_log_format))
+ return file_handler
+
+
+def get_stream_handler():
+ stream_handler = logging.StreamHandler()
+ stream_handler.setLevel(logging.INFO)
+ stream_handler.setFormatter(logging.Formatter(_log_format))
+ return stream_handler
+
+
+def get_logger(name):
+ logger = logging.getLogger(name)
+ logger.setLevel(logging.INFO)
+ logger.addHandler(get_file_handler())
+ logger.addHandler(get_stream_handler())
+ return logger
diff --git a/plugins/VKMusic/vkpymusic/Playlist.py b/plugins/VKMusic/vkpymusic/Playlist.py
new file mode 100644
index 0000000..34c004a
--- /dev/null
+++ b/plugins/VKMusic/vkpymusic/Playlist.py
@@ -0,0 +1,32 @@
+class Playlist:
+ def __init__(
+ self, title, description, photo, count, owner_id, playlist_id, access_key
+ ):
+ self.title = title
+ self.description = description
+ self.photo = photo
+ self.count = count
+ self.owner_id = owner_id
+ self.playlist_id = playlist_id
+ self.access_key = access_key
+
+ def __str__(self):
+ return f"{self.title} ({self.count} tracks)"
+
+ def to_dict(self) -> dict:
+ return self.__dict__
+
+ @classmethod
+ def from_json(cls, item):
+ title = str(item["title"])
+ description = str(item["description"])
+ photo = str(item["photo"]["photo_1200"])
+ count = int(item["count"])
+ owner_id = int(item["owner_id"])
+ playlist_id = int(item["id"])
+ access_key = str(item["access_key"])
+
+ playlist = cls(
+ title, description, photo, count, owner_id, playlist_id, access_key
+ )
+ return playlist
diff --git a/plugins/VKMusic/vkpymusic/Service.py b/plugins/VKMusic/vkpymusic/Service.py
new file mode 100644
index 0000000..dd9b81d
--- /dev/null
+++ b/plugins/VKMusic/vkpymusic/Service.py
@@ -0,0 +1,498 @@
+import os, configparser, json, logging
+
+from requests import Response, Session
+import requests
+
+from .Logger import get_logger
+from .Playlist import Playlist
+from .Song import Song
+
+
+logger: logging = get_logger(__name__)
+
+
+class Service:
+ def __init__(self, user_agent: str, token: str):
+ self.user_agent = user_agent
+ self.__token = token
+
+ @classmethod
+ def parse_config(cls, filename: str = "config_vk.ini"):
+ """
+ Create an instance of Service from config.
+
+ Args:
+ filename (str): Filename of config (default value = "config_vk.ini").
+ """
+ dirname = os.path.dirname(__file__)
+ configfile_path = os.path.join(dirname, filename)
+
+ try:
+ config = configparser.ConfigParser()
+ config.read(configfile_path, encoding="utf-8")
+
+ user_agent = config["VK"]["user_agent"]
+ token = config["VK"]["token_for_audio"]
+
+ return Service(user_agent, token)
+ except Exception as e:
+ logger.warning(e)
+
+ @staticmethod
+ def del_config(filename: str = "config_vk.ini"):
+ """
+ Delete config created by 'TokenReceiver'.
+
+ Args:
+ filename (str): Filename of config (default value = "config_vk.ini").
+ """
+ dirname = os.path.dirname(__file__)
+ configfile_path = os.path.join(dirname, filename)
+
+ try:
+ os.remove(configfile_path)
+ logger.info("Config successful deleted!")
+ except Exception as e:
+ logger.warning(e)
+
+ ##########################
+ # COMMON REQUEST FOR AUDIO
+
+ def __get_response(
+ self, method: str, params: list[tuple[str, str or int]]
+ ) -> Response:
+ api_headers = {"User-Agent": self.user_agent}
+ api_url = f"https://api.vk.com/method/audio.{method}"
+ api_parameters = [
+ ("access_token", self.__token),
+ ("https", 1),
+ ("lang", "ru"),
+ ("extended", 1),
+ ("v", "5.131"),
+ ]
+
+ for pair in params:
+ api_parameters.append(pair)
+
+ session = Session()
+ session.headers.update(api_headers)
+ response = session.post(
+ url=api_url,
+ data=api_parameters,
+ )
+ session.close()
+
+ return response
+
+ ##############
+ # ANY REQUESTS
+
+ def __getCount(self, user_id: int) -> Response:
+ params = [
+ ("owner_id", user_id),
+ ]
+
+ return self.__get_response("getCount", params)
+
+ def __get(
+ self,
+ user_id: int,
+ count: int = 100,
+ offset: int = 0,
+ playlist_id: int or None = None,
+ access_key: str or None = None,
+ ) -> Response:
+ params = [
+ ("owner_id", user_id),
+ ("count", count),
+ ("offset", offset),
+ ]
+
+ if playlist_id:
+ params.append(("album_id", playlist_id))
+ params.append(("access_key", access_key))
+
+ return self.__get_response("get", params)
+
+ def __search(self, text: str, count: int = 100, offset: int = 0) -> Response:
+ params = [
+ ("q", text),
+ ("count", count),
+ ("offset", offset),
+ ("sort", 0),
+ ("autocomplete", 1),
+ ]
+
+ return self.__get_response("search", params)
+
+ def __getPlaylists(
+ self, user_id: int, count: int = 50, offset: int = 0
+ ) -> Response:
+ params = [
+ ("owner_id", user_id),
+ ("count", count),
+ ("offset", offset),
+ ]
+
+ return self.__get_response("getPlaylists", params)
+
+ def __searchPlaylists(
+ self, text: str, count: int = 50, offset: int = 0
+ ) -> Response:
+ params = [
+ ("q", text),
+ ("count", count),
+ ("offset", offset),
+ ]
+
+ return self.__get_response("searchPlaylists", params)
+
+ def __searchAlbums(self, text: str, count: int = 50, offset: int = 0) -> Response:
+ params = [
+ ("q", text),
+ ("count", count),
+ ("offset", offset),
+ ]
+
+ return self.__get_response("searchAlbums", params)
+
+ ############
+ # CONVERTERS
+
+ def __response_to_songs(self, response: Response):
+ response = json.loads(response.content.decode("utf-8"))
+ try:
+ items = response["response"]["items"]
+ except Exception as e:
+ logger.error(e)
+
+ songs: list[Song] = []
+ for item in items:
+ song = Song.from_json(item)
+ songs.append(song)
+ return songs
+
+ def __response_to_playlists(self, response: Response):
+ response = json.loads(response.content.decode("utf-8"))
+ try:
+ items = response["response"]["items"]
+ except Exception as e:
+ logger.error(e)
+ playlists: list[Playlist] = []
+ for item in items:
+ playlist = Playlist.from_json(item)
+ playlists.append(playlist)
+ return playlists
+
+ ##############
+ # MAIN METHODS
+
+ def get_count_by_user_id(self, user_id: str or int) -> int:
+ """
+ Get count of all user's songs.
+
+ Args:
+ user_id (str or int): VK user id. (NOT USERNAME! vk.com/id*******).
+
+ Returns:
+ int: count of all user's songs.
+ """
+ user_id = int(user_id)
+ logger.info(f"Request by user: {user_id}")
+
+ try:
+ response = self.__getCount(user_id)
+ data = json.loads(response.content.decode("utf-8"))
+ songs_count = int(data["response"])
+ except Exception as e:
+ logger.error(e)
+ return
+
+ logger.info(f"Count of user's songs: {songs_count}")
+ return songs_count
+
+ def get_songs_by_userid(
+ self, user_id: str or int, count: int = 100, offset: int = 0
+ ) -> list[Song]:
+ """
+ Search songs by owner/user id.
+
+ Args:
+ user_id (str or int): VK user id. (NOT USERNAME! vk.com/id*******).
+ count (int): Count of resulting songs (for VK API: default/max = 100).
+ offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200.
+
+ Returns:
+ list[Song]: List of songs.
+ """
+ user_id = int(user_id)
+ logger.info(f"Request by user: {user_id}")
+
+ try:
+ response: Response = self.__get(user_id, count, offset)
+ songs = self.__response_to_songs(response)
+ except Exception as e:
+ logger.error(e)
+ return
+
+ if len(songs) == 0:
+ logger.info("No results found ._.")
+ else:
+ logger.info("Results:")
+ for i, song in enumerate(songs, start=1):
+ logger.info(f"{i}) {song}")
+ return songs
+
+ def get_songs_by_playlist_id(
+ self,
+ user_id: str or int,
+ playlist_id: int,
+ access_key: str,
+ count: int = 100,
+ offset: int = 0,
+ ) -> list[Song]:
+ """
+ Get songs by playlist id.
+
+ Args:
+ user_id (str or int): VK user id. (NOT USERNAME! vk.com/id*******).
+ playlist_id (int): VK playlist id. (Take it from methods for playlist).
+ access_key (str): VK access key. (Take it from methods for playlist).
+ count (int): Count of resulting songs (for VK API: default/max = 100).
+ offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200.
+
+ Returns:
+ list[Song]: List of songs.
+ """
+ user_id = int(user_id)
+ logger.info(f"Request by user: {user_id}")
+
+ try:
+ response: Response = self.__get(
+ user_id, count, offset, playlist_id, access_key
+ )
+ songs = self.__response_to_songs(response)
+ except Exception as e:
+ logger.error(e)
+ return
+
+ if len(songs) == 0:
+ logger.info("No results found ._.")
+ else:
+ logger.info("Results:")
+ for i, song in enumerate(songs, start=1):
+ logger.info(f"{i}) {song}")
+ return songs
+
+ def get_songs_by_playlist(
+ self, playlist: Playlist, count: int = 10, offset: int = 0
+ ) -> list[Song]:
+ """
+ Get songs by instance of 'Playlist'.
+
+ Args:
+ playlist (Playlist): Instance of 'Playlist' (take from methods for receiving Playlist).
+ count (int): Count of resulting songs (for VK API: default/max = 100).
+ offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200.
+
+ Returns:
+ list[Song]: List of songs.
+ """
+ logger.info(f"Request by playlist: {playlist}")
+
+ try:
+ response: Response = self.__get(
+ playlist.owner_id,
+ count,
+ offset,
+ playlist.playlist_id,
+ playlist.access_key,
+ )
+ songs = self.__response_to_songs(response)
+ except Exception as e:
+ logger.error(e)
+ return
+
+ if len(songs) == 0:
+ logger.info("No results found ._.")
+ else:
+ logger.info("Results:")
+ for i, song in enumerate(songs, start=1):
+ logger.info(f"{i}) {song}")
+ return songs
+
+ def search_songs_by_text(
+ self, text: str, count: int = 3, offset: int = 0
+ ) -> list[Song]:
+ """
+ Search songs by text/query.
+
+ Args:
+ text (str): Text of query. Can be title of song, author, etc.
+ count (int): Count of resulting songs (for VK API: default/max = 100).
+ offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200.
+
+ Returns:
+ list[Song]: List of songs.
+ """
+ logger.info(f'Request by text: "{text}" в количестве {count}')
+
+ try:
+ response = self.__search(text, count, offset)
+ songs = self.__response_to_songs(response)
+ except Exception as e:
+ logger.error(e)
+ return
+
+ if len(songs) == 0:
+ logger.info("No results found ._.")
+ else:
+ logger.info("Results:")
+ for i, song in enumerate(songs, start=1):
+ logger.info(f"{i}) {song}")
+ return songs
+
+ def get_playlists_by_userid(
+ self, user_id: str or int, count: int = 5, offset: int = 0
+ ) -> list[Playlist]:
+ """
+ Get playlist by owner/user id.
+
+ Args:
+ user_id (str or int): VK user id. (NOT USERNAME! vk.com/id*******).
+ count (int): Count of resulting playlists (for VK API: default = 50, max = 100).
+ offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200.
+
+ Returns:
+ list[Playlist]: List of playlists.
+ """
+ user_id = int(user_id)
+ logger.info(f"Request by user: {user_id}")
+
+ try:
+ response = self.__getPlaylists(user_id, count, offset)
+ playlists = self.__response_to_playlists(response)
+ except Exception as e:
+ logger.error(e)
+ return
+
+ if len(playlists) == 0:
+ logger.info("No results found ._.")
+ else:
+ logger.info("Results:")
+ for i, playlist in enumerate(playlists, start=1):
+ logger.info(f"{i}) {playlist}")
+ return playlists
+
+ def search_playlists_by_text(
+ self, text: str, count: int = 5, offset: int = 0
+ ) -> list[Playlist]:
+ """
+ Search playlists by text/query.
+ Playlist - it user's collection of songs.
+
+ Args:
+ text (str): Text of query. Can be title of playlist, genre, etc.
+ count (int): Count of resulting playlists (for VK API: default = 50, max = 100).
+ offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200.
+
+ Returns:
+ list[Playlist]: List of playlists.
+ """
+ logger.info(f"Request by text: {text}")
+
+ try:
+ response = self.__searchPlaylists(text, count, offset)
+ playlists = self.__response_to_playlists(response)
+ except Exception as e:
+ logger.error(e)
+ return
+
+ if len(playlists) == 0:
+ logger.info("No results found ._.")
+ else:
+ logger.info("Results:")
+ for i, playlist in enumerate(playlists, start=1):
+ logger.info(f"{i}) {playlist}")
+ return playlists
+
+ def search_albums_by_text(
+ self, text: str, count: int = 5, offset: int = 0
+ ) -> list[Playlist]:
+ """
+ Search albums by text/query.
+ Album - artists's album/collection of songs.
+ In obj context - same as 'Playlist'.
+
+ Args:
+ text (str): Text of query. Can be title of album, name of artist, etc.
+ count (int): Count of resulting playlists (for VK API: default = 50, max = 100).
+ offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200.
+
+ Returns:
+ list[Playlist]: List of albums.
+ """
+ logger.info(f"Request by text: {text}")
+
+ try:
+ response = self.__searchAlbums(text, count, offset)
+ playlists = self.__response_to_playlists(response)
+ except Exception as e:
+ logger.error(e)
+ return
+
+ if len(playlists) == 0:
+ logger.info("No results found ._.")
+ else:
+ logger.info("Results:")
+ for i, playlist in enumerate(playlists, start=1):
+ logger.info(f"{i}) {playlist}")
+ return playlists
+
+ @staticmethod
+ def save_music(song: Song) -> str:
+ """
+ Save song to '{workDirectory}/Music/{songname}.mp3'.
+
+ Args:
+ song (Song): 'Song' instance obtained from 'Service' methods.
+
+ Returns:
+ str: relative path of downloaded music.
+ """
+ song.to_safe()
+ file_name_mp3 = f"{song}.mp3"
+ url = song.url
+
+ if url == "":
+ logger.warning("Url no found")
+ return
+
+ response = requests.get(url=url)
+ if response.status_code == 200:
+ if not os.path.exists("Music"):
+ os.makedirs("Music")
+ logger.info("Folder 'Music' was created")
+
+ file_path = os.path.join(os.getcwd(), "Music", file_name_mp3)
+
+ if not os.path.exists(file_path):
+ if "index.m3u8" in url:
+ logger.error(".m3u8 detected!")
+ return
+ else:
+ logger.warning(
+ f"File with name {file_name_mp3} exists. Overwrite it? (Y/n)"
+ )
+
+ res = input().lower()
+ if res.lower() != "y" and res.lower() != "yes":
+ return
+
+ response.close()
+ logger.info(f"Downloading {song}...")
+ with open(file_path, "wb") as output_file:
+ output_file.write(response.content)
+
+ logger.info(f"Success! Music was downloaded in '{file_path}'")
+ return file_path
diff --git a/plugins/VKMusic/vkpymusic/ServiceAsync.py b/plugins/VKMusic/vkpymusic/ServiceAsync.py
new file mode 100644
index 0000000..1580bb7
--- /dev/null
+++ b/plugins/VKMusic/vkpymusic/ServiceAsync.py
@@ -0,0 +1,513 @@
+import os, configparser, logging
+
+from httpx import AsyncClient, Response
+
+import aiofiles
+
+from .Logger import get_logger
+from .Playlist import Playlist
+from .Song import Song
+
+logger: logging = get_logger(__name__)
+
+
+class ServiceAsync:
+ def __init__(self, user_agent: str, token: str):
+ self.user_agent = user_agent
+ self.__token = token
+
+ @classmethod
+ def parse_config(cls, filename: str = "config_vk.ini"):
+ """
+ Create an instance of Service from config.
+
+ Args:
+ filename (str): Filename of config (default value = "config_vk.ini").
+ """
+ dirname = os.path.dirname(__file__)
+ configfile_path = os.path.join(dirname, filename)
+
+ try:
+ config = configparser.ConfigParser()
+ config.read(configfile_path, encoding="utf-8")
+
+ user_agent = config["VK"]["user_agent"]
+ token = config["VK"]["token_for_audio"]
+
+ return ServiceAsync(user_agent, token)
+ except Exception as e:
+ logger.warning(e)
+
+ @staticmethod
+ def del_config(filename: str = "config_vk.ini"):
+ """
+ Delete config created by 'TokenReceiver'.
+
+ Args:
+ filename (str): Filename of config (default value = "config_vk.ini").
+ """
+ dirname = os.path.dirname(__file__)
+ configfile_path = os.path.join(dirname, filename)
+
+ try:
+ os.remove(configfile_path)
+ logger.info("Config successful deleted!")
+ except Exception as e:
+ logger.warning(e)
+
+ ##########################
+ # COMMON REQUEST FOR AUDIO
+
+ async def __get_response(
+ self, method: str, params: list[tuple[str, str or int]]
+ ) -> Response:
+ api_headers = {"User-Agent": self.user_agent}
+ api_url = f"https://api.vk.com/method/audio.{method}"
+ api_parameters = [
+ ("access_token", self.__token),
+ ("https", 1),
+ ("lang", "ru"),
+ ("extended", 1),
+ ("v", "5.131"),
+ ]
+
+ for pair in params:
+ api_parameters.append(pair)
+
+ # session = ClientSession()
+ session = AsyncClient()
+ session.headers.update(api_headers)
+ response = await session.post(
+ url=api_url,
+ params=api_parameters
+ # ssl=ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
+ )
+ await session.aclose()
+ return response
+
+ ##############
+ # ANY REQUESTS
+
+ async def __getCount(self, user_id: int) -> Response:
+ params = [
+ ("owner_id", user_id),
+ ]
+
+ return await self.__get_response("getCount", params)
+
+ async def __get(
+ self,
+ user_id: int,
+ count: int = 100,
+ offset: int = 0,
+ playlist_id: int or None = None,
+ access_key: str or None = None,
+ ) -> Response:
+ params = [
+ ("owner_id", user_id),
+ ("count", count),
+ ("offset", offset),
+ ]
+
+ if playlist_id:
+ params.append(("album_id", playlist_id))
+ params.append(("access_key", access_key))
+
+ return await self.__get_response("get", params)
+
+ async def __search(self, text: str, count: int = 100,
+ offset: int = 0) -> Response:
+ params = [
+ ("q", text),
+ ("count", count),
+ ("offset", offset),
+ ("sort", 0),
+ ("autocomplete", 1),
+ ]
+
+ return await self.__get_response("search", params)
+
+ async def __getPlaylists(
+ self, user_id: int, count: int = 50, offset: int = 0
+ ) -> Response:
+ params = [
+ ("owner_id", user_id),
+ ("count", count),
+ ("offset", offset),
+ ]
+
+ return await self.__get_response("getPlaylists", params)
+
+ async def __searchPlaylists(
+ self, text: str, count: int = 50, offset: int = 0
+ ) -> Response:
+ params = [
+ ("q", text),
+ ("count", count),
+ ("offset", offset),
+ ]
+
+ return await self.__get_response("searchPlaylists", params)
+
+ async def __searchAlbums(self, text: str, count: int = 50,
+ offset: int = 0) -> Response:
+ params = [
+ ("q", text),
+ ("count", count),
+ ("offset", offset),
+ ]
+
+ return await self.__get_response("searchAlbums", params)
+
+ ############
+ # CONVERTERS
+
+ async def __response_to_songs(self, response: Response) \
+ -> list[Song]:
+ response = response.json()
+ count = 0
+ try:
+ items = response["response"]["items"]
+ except Exception as e:
+ logger.error(e)
+
+ songs: list[Song] = []
+ for item in items:
+ song = Song.from_json(item)
+ songs.append(song)
+ return songs
+
+ async def __response_to_playlists(self, response: Response):
+ # response = json.loads(response.content.decode("utf-8"))
+ response = response.json(encoding="utf-8")
+ try:
+ items = response["response"]["items"]
+ except Exception as e:
+ logger.error(e)
+ playlists: list[Playlist] = []
+ for item in items:
+ playlist = Playlist.from_json(item)
+ playlists.append(playlist)
+ return playlists
+
+ ##############
+ # MAIN METHODS
+
+ async def get_count_by_user_id(self, user_id: str or int) -> int:
+ """
+ Get count of all user's songs.
+
+ Args:
+ user_id (str or int): VK user id. (NOT USERNAME! vk.com/id*******).
+
+ Returns:
+ int: count of all user's songs.
+ """
+ user_id = int(user_id)
+ logger.info(f"Request by user: {user_id}")
+
+ try:
+ response = await self.__getCount(user_id)
+ # data = json.loads(response.content.decode("utf-8"))
+ data = response.json(encoding="utf-8")
+ songs_count = int(data["response"])
+ except Exception as e:
+ logger.error(e)
+ return
+
+ logger.info(f"Count of user's songs: {songs_count}")
+ return songs_count
+
+ async def get_songs_by_userid(
+ self, user_id: str or int, count: int = 100, offset: int = 0
+ ) -> list[Song]:
+ """
+ Search songs by owner/user id.
+
+ Args:
+ user_id (str or int): VK user id. (NOT USERNAME! vk.com/id*******).
+ count (int): Count of resulting songs (for VK API: default/max = 100).
+ offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200.
+
+ Returns:
+ list[Song]: List of songs.
+ """
+ user_id = int(user_id)
+ logger.info(f"Request by user: {user_id}")
+
+ try:
+ response: Response = await self.__get(user_id, count, offset)
+ songs = await self.__response_to_songs(response)
+ except Exception as e:
+ logger.error(e)
+ return
+
+ if len(songs) == 0:
+ logger.info("No results found ._.")
+ else:
+ logger.info("Results:")
+ for i, song in enumerate(songs, start=1):
+ logger.info(f"{i}) {song}")
+ return songs
+
+ async def get_songs_by_playlist_id(
+ self,
+ user_id: str or int,
+ playlist_id: int,
+ access_key: str,
+ count: int = 100,
+ offset: int = 0,
+ ) -> list[Song]:
+ """
+ Get songs by playlist id.
+
+ Args:
+ user_id (str or int): VK user id. (NOT USERNAME! vk.com/id*******).
+ playlist_id (int): VK playlist id. (Take it from methods for playlist).
+ access_key (str): VK access key. (Take it from methods for playlist).
+ count (int): Count of resulting songs (for VK API: default/max = 100).
+ offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200.
+
+ Returns:
+ list[Song]: List of songs.
+ """
+ user_id = int(user_id)
+ logger.info(f"Request by user: {user_id}")
+
+ try:
+ response: Response = await self.__get(
+ user_id, count, offset, playlist_id, access_key
+ )
+ songs = await self.__response_to_songs(response)
+ except Exception as e:
+ logger.error(e)
+ return
+
+ if len(songs) == 0:
+ logger.info("No results found ._.")
+ else:
+ logger.info("Results:")
+ for i, song in enumerate(songs, start=1):
+ logger.info(f"{i}) {song}")
+ return songs
+
+ async def get_songs_by_playlist(
+ self, playlist: Playlist, count: int = 10, offset: int = 0
+ ) -> list[Song]:
+ """
+ Get songs by instance of 'Playlist'.
+
+ Args:
+ playlist (Playlist): Instance of 'Playlist' (take from methods for receiving Playlist).
+ count (int): Count of resulting songs (for VK API: default/max = 100).
+ offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200.
+
+ Returns:
+ list[Song]: List of songs.
+ """
+ logger.info(f"Request by playlist: {playlist}")
+
+ try:
+ response: Response = await self.__get(
+ playlist.owner_id,
+ count,
+ offset,
+ playlist.playlist_id,
+ playlist.access_key,
+ )
+ songs = await self.__response_to_songs(response)
+ except Exception as e:
+ logger.error(e)
+ return
+
+ if len(songs) == 0:
+ logger.info("No results found ._.")
+ else:
+ logger.info("Results:")
+ for i, song in enumerate(songs, start=1):
+ logger.info(f"{i}) {song}")
+ return songs
+
+ async def search_songs_by_text(
+ self, text: str, count: int = 3, offset: int = 0
+ ) -> list[Song]:
+ """
+ Search songs by text/query.
+
+ Args:
+ text (str): Text of query. Can be title of song, author, etc.
+ count (int): Count of resulting songs (for VK API: default/max = 100).
+ offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200.
+
+ Returns:
+ list[Song]: List of songs.
+ """
+ logger.info(f'Request by text: "{text}" в количестве {count}')
+
+ try:
+ response: Response = await self.__search(text, count, offset)
+ songs = await self.__response_to_songs(response)
+ except Exception as e:
+ logger.error(e)
+ return
+
+ if len(songs) == 0:
+ logger.info("No results found ._.")
+ else:
+ logger.info("Results:")
+ for i, song in enumerate(songs, start=1):
+ logger.info(f"{i}) {song}")
+ return songs
+
+ async def get_playlists_by_userid(
+ self, user_id: str or int, count: int = 5, offset: int = 0
+ ) -> list[Playlist]:
+ """
+ Get playlist by owner/user id.
+
+ Args:
+ user_id (str or int): VK user id. (NOT USERNAME! vk.com/id*******).
+ count (int): Count of resulting playlists (for VK API: default = 50, max = 100).
+ offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200.
+
+ Returns:
+ list[Playlist]: List of playlists.
+ """
+ user_id = int(user_id)
+ logger.info(f"Request by user: {user_id}")
+
+ try:
+ response: Response = await self.__getPlaylists(user_id, count,
+ offset)
+ playlists = await self.__response_to_playlists(response)
+ except Exception as e:
+ logger.error(e)
+ return
+
+ if len(playlists) == 0:
+ logger.info("No results found ._.")
+ else:
+ logger.info("Results:")
+ for i, playlist in enumerate(playlists, start=1):
+ logger.info(f"{i}) {playlist}")
+ return playlists
+
+ async def search_playlists_by_text(
+ self, text: str, count: int = 5, offset: int = 0
+ ) -> list[Playlist]:
+ """
+ Search playlists by text/query.
+ Playlist - it user's collection of songs.
+
+ Args:
+ text (str): Text of query. Can be title of playlist, genre, etc.
+ count (int): Count of resulting playlists (for VK API: default = 50, max = 100).
+ offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200.
+
+ Returns:
+ list[Playlist]: List of playlists.
+ """
+ logger.info(f"Request by text: {text}")
+
+ try:
+ response: Response = await self.__searchPlaylists(text, count,
+ offset)
+ playlists = await self.__response_to_playlists(response)
+ except Exception as e:
+ logger.error(e)
+ return
+
+ if len(playlists) == 0:
+ logger.info("No results found ._.")
+ else:
+ logger.info("Results:")
+ for i, playlist in enumerate(playlists, start=1):
+ logger.info(f"{i}) {playlist}")
+ return playlists
+
+ async def search_albums_by_text(
+ self, text: str, count: int = 5, offset: int = 0
+ ) -> list[Playlist]:
+ """
+ Search albums by text/query.
+ Album - artists's album/collection of songs.
+ In obj context - same as 'Playlist'.
+
+ Args:
+ text (str): Text of query. Can be title of album, name of artist, etc.
+ count (int): Count of resulting playlists (for VK API: default = 50, max = 100).
+ offset (int): Set offset for result. For example, count = 100, offset = 100 -> 101-200.
+
+ Returns:
+ list[Playlist]: List of albums.
+ """
+ logger.info(f"Request by text: {text}")
+
+ try:
+ response: Response = \
+ await self.__searchAlbums(text, count, offset)
+ playlists = await self.__response_to_playlists(response)
+ except Exception as e:
+ logger.error(e)
+ return
+
+ if len(playlists) == 0:
+ logger.info("No results found ._.")
+ else:
+ logger.info("Results:")
+ for i, playlist in enumerate(playlists, start=1):
+ logger.info(f"{i}) {playlist}")
+ return playlists
+
+ @staticmethod
+ async def save_music(song: Song, overwrite: bool = False) -> str:
+ """
+ Save song to '{workDirectory}/Music/{songname}.mp3'.
+
+ Args:
+ song (Song): 'Song' instance obtained from 'Service' methods.
+ overwrite (bool): Overwrite file if it exists
+
+ Returns:
+ str: relative path of downloaded music.
+ """
+ song.to_safe()
+ file_name_mp3 = f"{song}.mp3"
+ url = song.url
+
+ if url == "":
+ logger.warning("Url no found")
+ return
+
+ session = AsyncClient()
+ response = await session.get(url=url)
+
+ if response.status_code == 200:
+ if not os.path.exists("Music"):
+ os.makedirs("Music")
+ logger.info("Folder 'Music' was created")
+
+ file_path = os.path.join(os.getcwd(), "Music", file_name_mp3)
+
+ if not os.path.exists(file_path):
+ if "index.m3u8" in url:
+ logger.error(".m3u8 detected!")
+ await session.aclose()
+ return
+ else:
+ logger.warning(
+ f"File with name {file_name_mp3} exists."
+ )
+ if not overwrite:
+ await session.aclose()
+ return file_path
+
+ logger.info(f"Downloading {song}...")
+ async with aiofiles.open(file_path, "wb") as output_file:
+ await output_file.write(response.read())
+
+ await response.aclose()
+ await session.aclose()
+
+ logger.info(f"Success! Music was downloaded in '{file_path}'")
+ return file_path
diff --git a/plugins/VKMusic/vkpymusic/Song.py b/plugins/VKMusic/vkpymusic/Song.py
new file mode 100644
index 0000000..9fbe792
--- /dev/null
+++ b/plugins/VKMusic/vkpymusic/Song.py
@@ -0,0 +1,37 @@
+import re
+
+
+class Song:
+ def __init__(self, title, artist, duration, track_id, owner_id, url=""):
+ self.title = title
+ self.artist = artist
+ self.duration = duration
+ self.track_id = track_id
+ self.owner_id = owner_id
+ self.url = url
+
+ def __str__(self):
+ return f"{self.title} - {self.artist}"
+
+ def to_dict(self) -> dict:
+ return self.__dict__
+
+ def to_safe(self):
+ def safe_format(string):
+ safe_string = re.sub(r"[^A-zА-я0-9+\s]", "", string)
+ return safe_string
+
+ self.title = safe_format(self.title)
+ self.artist = safe_format(self.artist)
+
+ @classmethod
+ def from_json(cls, item):
+ title = str(item["title"])
+ artist = str(item["artist"])
+ duration = int(item["duration"])
+ track_id = str(item["id"])
+ owner_id = str(item["owner_id"])
+ url = str(item["url"])
+
+ song = cls(title, artist, duration, track_id, owner_id, url)
+ return song
diff --git a/plugins/VKMusic/vkpymusic/TokenReceiver.py b/plugins/VKMusic/vkpymusic/TokenReceiver.py
new file mode 100644
index 0000000..223dfd7
--- /dev/null
+++ b/plugins/VKMusic/vkpymusic/TokenReceiver.py
@@ -0,0 +1,256 @@
+from typing import Callable
+
+import os
+import logging
+
+import json
+import requests
+
+from .Client import clients
+from .Logger import get_logger
+
+logger: logging = get_logger(__name__)
+
+
+def on_captcha_handler(url: str) -> str:
+ """
+ Default handler to captcha.
+
+ Args:
+ url (str): Url to captcha image.
+
+ Returns:
+ str: Key/decoded captcha.
+ """
+ logger.info("Are you a bot? You need to enter captcha...")
+ os.system(url)
+ captcha_key: str = input("Captcha: ")
+ return captcha_key
+
+
+def on_2fa_handler() -> str:
+ """
+ Default handler to 2fa.
+
+ Returns:
+ str: code from VK/SMS.
+ """
+ logger.info(
+ "SMS with a confirmation code has been sent to your phone! The code is valid for a few minutes!"
+ )
+ code = input("Code: ")
+ return code
+
+
+def on_invalid_client_handler():
+ """
+ Default handler to invalid_client.
+ """
+ logger.error("Invalid login or password")
+
+
+def on_critical_error_handler(response_auth_json):
+ """
+ Default handler to ctitical error.
+
+ Args:
+ response_auth_json (...): Message or object to research.
+ """
+ print(f"on_critical_error: {response_auth_json}")
+
+
+class TokenReceiver:
+ def __init__(self, login, password, client="Kate"):
+ self.__login: str = str(login)
+ self.__password: str = str(password)
+
+ if client in clients:
+ self.client = clients[client]
+ else:
+ self.client = clients["Kate"]
+ self.__token = None
+
+ def __request_auth(self, code=None, captcha=None):
+ session = requests.session()
+ session.headers.update({"User-Agent": self.client.user_agent})
+ query_params = [
+ ("grant_type", "password"),
+ ("client_id", self.client.client_id),
+ ("client_secret", self.client.client_secret),
+ ("username", self.__login),
+ ("password", self.__password),
+ ("scope", "audio,offline"),
+ ("2fa_supported", 1),
+ ("force_sms", 1),
+ ("v", 5.131),
+ ]
+ if captcha:
+ query_params.append(("captcha_sid", captcha[0]))
+ query_params.append(("captcha_key", captcha[1]))
+ if code:
+ query_params.append(("code", code))
+
+ request = session.post("https://oauth.vk.com/token", data=query_params)
+ session.close()
+ return request
+
+ def __request_code(self, sid):
+ session = requests.session()
+ session.headers.update({"User-Agent": self.client.user_agent})
+ query_params = [("sid", str(sid)), ("v", "5.131")]
+ response: requests.Response = session.post(
+ "https://api.vk.com/method/auth.validatePhone",
+ data=query_params,
+ allow_redirects=True,
+ )
+ session.close()
+
+ response_json = json.loads(response.content.decode("utf-8"))
+
+ # right_response_json = {
+ # "response": {
+ # "type": "general",
+ # "sid": {str(sid)},
+ # "delay": 60,
+ # "libverify_support": False,
+ # "validation_type": "sms",
+ # "validation_resend": "sms"
+ # }
+ # }
+
+ return response_json
+
+ def auth(
+ self,
+ on_captcha: Callable[[str], str] = on_captcha_handler,
+ on_2fa: Callable[[], str] = on_2fa_handler,
+ on_invalid_client: Callable[[], None] = on_invalid_client_handler,
+ on_critical_error: Callable[..., None] = on_critical_error_handler,
+ ) -> bool:
+ """
+ Performs authorization using the available login and password.
+ If necessary, interactively accepts a code from SMS or captcha.
+
+ Args:
+ on_captcha (Callable[[str], str]): Handler to captcha. Get url image. Return key.
+ on_2fa (Callable[[], str]): Handler to 2 factor auth. Return captcha.
+ on_invalid_client (Callable[[], None]): Handler to invalid client.
+ on_critical_error (Callable[[Any], None]): Handler to critical error. Get response.
+
+ Returns:
+ bool: Boolean value indicating whether authorization was successful or not.
+ """
+ response_auth: requests.Response = self.__request_auth()
+ response_auth_json = json.loads(response_auth.content.decode("utf-8"))
+
+ while "error" in response_auth_json:
+ error = response_auth_json["error"]
+ sid = 0
+
+ if error == "need_captcha":
+ captcha_sid: str = response_auth_json["captcha_sid"]
+ captcha_img: str = response_auth_json["captcha_img"]
+
+ captcha_key: str = on_captcha(captcha_img)
+
+ response_auth = self.__request_auth(captcha=(captcha_sid, captcha_key))
+ response_auth_json = json.loads(response_auth.content.decode("utf-8"))
+
+ elif error == "need_validation":
+ sid = response_auth_json["validation_sid"]
+
+ # response2: requests.Response =
+ self.__request_code(sid)
+
+ # response2_json = json.loads(response2.content.decode('utf-8'))
+ code: str = on_2fa()
+
+ response_auth = self.__request_auth(code=code)
+ response_auth_json = json.loads(response_auth.content.decode("utf-8"))
+
+ elif error == "invalid_request":
+ logger.warn("Invalid code. Try again!")
+ code: str = on_2fa()
+
+ response_auth = self.__request_auth(code=code)
+ response_auth_json = json.loads(response_auth.content.decode("utf-8"))
+
+ elif error == "invalid_client":
+ del self.__login
+ del self.__password
+ on_invalid_client()
+ return False
+ else:
+ del self.__login
+ del self.__password
+ on_critical_error(response_auth_json)
+ self.__on_error(response_auth_json)
+ return False
+ if "access_token" in response_auth_json:
+ del self.__login
+ del self.__password
+ access_token = response_auth_json["access_token"]
+ logger.info("Token was received!")
+ self.__token = access_token
+ return True
+ del self.__login
+ del self.__password
+ self.__on_error(response_auth_json)
+ on_critical_error(response_auth_json)
+ return False
+
+ def get_token(self) -> str:
+ """
+ Prints token in console (if authorisation was succesful).
+ """
+ token = self.__token
+ if not token:
+ logger.warn('Please, first call the method "auth".')
+ return
+ logger.info(token)
+ return token
+
+ def save_to_config(self, file_path: str = "config_vk.ini"):
+ """
+ Save token and user agent data in config (if authorisation was succesful).
+
+ Args:
+ file_path (str): Filename of config (default value = "config_vk.ini").
+ """
+ token: str = self.__token
+ if not token:
+ logger.warn('Please, first call the method "auth"')
+ return
+ full_fp = self.create_path(file_path)
+ if os.path.isfile(full_fp):
+ print('File already exist! Enter "OK" for rewriting it')
+ if input().lower() != "ok":
+ return
+ os.makedirs(os.path.dirname(full_fp), exist_ok=True)
+ with open(full_fp, "w") as output_file:
+ output_file.write("[VK]\n")
+ output_file.write(f"user_agent={self.client.user_agent}\n")
+ output_file.write(f"token_for_audio={token}")
+ logger.info("Token was saved!")
+
+ @staticmethod
+ def create_path(file_path: str) -> str:
+ """
+ Create path before and after this for different funcs.
+
+ Args:
+ file_path (str): Relative path to file.
+
+ Returns:
+ str: Absolute path to file.
+ """
+ dirname = os.path.dirname(__file__)
+ path = os.path.join(dirname, file_path)
+ return path
+
+ @staticmethod
+ def __on_error(response):
+ logger.critical(
+ "Unexpected error! Please, create an issue in repository for solving this problem."
+ )
+ logger.critical(response)
diff --git a/plugins/VKMusic/vkpymusic/TokenReceiverAsync.py b/plugins/VKMusic/vkpymusic/TokenReceiverAsync.py
new file mode 100644
index 0000000..6e2e42f
--- /dev/null
+++ b/plugins/VKMusic/vkpymusic/TokenReceiverAsync.py
@@ -0,0 +1,210 @@
+from typing import Callable
+
+import os
+import logging
+
+import json
+import requests
+
+from .Client import clients
+from .Logger import get_logger
+
+logger: logging = get_logger(__name__)
+
+
+class TokenReceiverAsync:
+ def __init__(self, login, password, client="Kate"):
+ self.__login: str = str(login)
+ self.__password: str = str(password)
+
+ if client in clients:
+ self.client = clients[client]
+ else:
+ self.client = clients["Kate"]
+ self.__token = None
+
+ async def __request_auth(self, code=None, captcha=None):
+ session = requests.session()
+ session.headers.update({"User-Agent": self.client.user_agent})
+ query_params = [
+ ("grant_type", "password"),
+ ("client_id", self.client.client_id),
+ ("client_secret", self.client.client_secret),
+ ("username", self.__login),
+ ("password", self.__password),
+ ("scope", "audio,offline"),
+ ("2fa_supported", 1),
+ ("force_sms", 1),
+ ("v", 5.131),
+ ]
+ if captcha:
+ query_params.append(("captcha_sid", captcha[0]))
+ query_params.append(("captcha_key", captcha[1]))
+ if code:
+ query_params.append(("code", code))
+
+ request = session.post("https://oauth.vk.com/token", data=query_params)
+ session.close()
+ return request
+
+ async def __request_code(self, sid):
+ session = requests.session()
+ session.headers.update({"User-Agent": self.client.user_agent})
+ query_params = [("sid", str(sid)), ("v", "5.131")]
+ response: requests.Response = session.post(
+ "https://api.vk.com/method/auth.validatePhone",
+ data=query_params,
+ allow_redirects=True,
+ )
+ session.close()
+
+ response_json = json.loads(response.content.decode("utf-8"))
+
+ # right_response_json = {
+ # "response": {
+ # "type": "general",
+ # "sid": {str(sid)},
+ # "delay": 60,
+ # "libverify_support": False,
+ # "validation_type": "sms",
+ # "validation_resend": "sms"
+ # }
+ # }
+
+ return response_json
+
+ async def auth(
+ self,
+ on_captcha: Callable[[str], str],
+ on_2fa: Callable[[], str],
+ on_invalid_client: Callable[[], None],
+ on_critical_error: Callable[..., None],
+ ) -> bool:
+ """
+ Performs ASYNC authorization using the available login and password.
+ If necessary, interactively accepts a code from SMS or captcha.
+
+ Args:
+ on_captcha (Callable[[str], str]): ASYNC handler to captcha. Get url image. Return key.
+ on_2fa (Callable[[], str]): ASYNC handler to 2 factor auth. Return captcha.
+ on_invalid_client (Callable[[], None]): ASYNC handler to invalid client.
+ on_critical_error (Callable[[Any], None]): ASYNC handler to crit error. Get response.
+
+ Returns:
+ bool: Boolean value indicating whether authorization was successful or not.
+ """
+ response_auth: requests.Response = await self.__request_auth()
+ response_auth_json = json.loads(response_auth.content.decode("utf-8"))
+
+ while "error" in response_auth_json:
+ error = response_auth_json["error"]
+
+ if error == "need_captcha":
+ captcha_sid: str = response_auth_json["captcha_sid"]
+ captcha_img: str = response_auth_json["captcha_img"]
+
+ captcha_key: str = await on_captcha(captcha_img)
+
+ response_auth = await self.__request_auth(
+ captcha=(captcha_sid, captcha_key)
+ )
+ response_auth_json = json.loads(response_auth.content.decode("utf-8"))
+
+ elif error == "need_validation":
+ sid = response_auth_json["validation_sid"]
+
+ # response2: requests.Response =
+ await self.__request_code(sid)
+
+ # response2_json = json.loads(response2.content.decode('utf-8'))
+ code: str = await on_2fa()
+
+ response_auth = await self.__request_auth(code=code)
+ response_auth_json = json.loads(response_auth.content.decode("utf-8"))
+
+ elif error == "invalid_request":
+ logger.warn("Invalid code. Try again!")
+ code: str = await on_2fa()
+
+ response_auth = await self.__request_auth(code=code)
+ response_auth_json = json.loads(response_auth.content.decode("utf-8"))
+
+ elif error == "invalid_client":
+ del self.__login
+ del self.__password
+ await on_invalid_client()
+ return False
+ else:
+ del self.__login
+ del self.__password
+ await on_critical_error(response_auth_json)
+ self.__on_error(response_auth_json)
+ return False
+ if "access_token" in response_auth_json:
+ del self.__login
+ del self.__password
+ access_token = response_auth_json["access_token"]
+ logger.info("Token was received!")
+ self.__token = access_token
+ return True
+ del self.__login
+ del self.__password
+ self.__on_error(response_auth_json)
+ await on_critical_error(response_auth_json)
+ return False
+
+ def get_token(self) -> str:
+ """
+ Prints token in console (if authorisation was succesful).
+ """
+ token = self.__token
+ if not token:
+ logger.warn('Please, first call the method "auth".')
+ return
+ logger.info(token)
+ return token
+
+ def save_to_config(self, file_path: str = "config_vk.ini"):
+ """
+ Save token and user agent data in config (if authorisation was succesful).
+
+ Args:
+ file_path (str): Filename of config (default value = "config_vk.ini").
+ """
+ token: str = self.__token
+ if not token:
+ logger.warn('Please, first call the method "auth"')
+ return
+ full_fp = self.create_path(file_path)
+ if os.path.isfile(full_fp):
+ print('File already exist! Enter "OK" for rewriting it')
+ if input().lower() != "ok":
+ return
+ os.makedirs(os.path.dirname(full_fp), exist_ok=True)
+ with open(full_fp, "w") as output_file:
+ output_file.write("[VK]\n")
+ output_file.write(f"user_agent={self.client.user_agent}\n")
+ output_file.write(f"token_for_audio={token}")
+ logger.info("Token was saved!")
+
+ @staticmethod
+ def create_path(file_path: str) -> str:
+ """
+ Create path before and after this for different funcs.
+
+ Args:
+ file_path (str): Relative path to file.
+
+ Returns:
+ str: Absolute path to file.
+ """
+ dirname = os.path.dirname(__file__)
+ path = os.path.join(dirname, file_path)
+ return path
+
+ @staticmethod
+ def __on_error(response):
+ logger.critical(
+ "Unexpected error! Please, create an issue in repository for solving this problem."
+ )
+ logger.critical(response)
diff --git a/plugins/VKMusic/vkpymusic/__init__.py b/plugins/VKMusic/vkpymusic/__init__.py
new file mode 100644
index 0000000..8bf4902
--- /dev/null
+++ b/plugins/VKMusic/vkpymusic/__init__.py
@@ -0,0 +1,7 @@
+from .TokenReceiver import TokenReceiver
+from .TokenReceiverAsync import TokenReceiverAsync
+from .Client import clients
+from .Service import Service
+from .ServiceAsync import ServiceAsync
+from .Playlist import Playlist
+from .Song import Song
diff --git a/pyproject.toml b/pyproject.toml
index e081674..00514c2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "SeaPlayer"
-version = "0.8.4"
+version = "0.8.5"
description = "SeaPlayer is a player that works in the terminal."
repository = "https://github.com/romanin-rf/SeaPlayer"
authors = ["Romanin "]
@@ -12,10 +12,7 @@ classifiers = [
"Operating System :: Microsoft :: Windows :: Windows 10",
"Operating System :: Microsoft :: Windows :: Windows 11",
"Operating System :: POSIX :: Linux",
- "Operating System :: MacOS",
- "Programming Language :: Python :: 3.9",
- "Programming Language :: Python :: 3.10",
- "Programming Language :: Python :: 3.11"
+ "Operating System :: MacOS"
]
packages = [{ include = "seaplayer" }]
include = [
@@ -36,14 +33,14 @@ seaplayer = "seaplayer.__main__:run"
seaplug = "seaplayer.plug.__main__:run"
[tool.poetry.dependencies]
-python = ">=3.9,<3.12"
+python = ">=3.8,<3.12"
# Custom modules
-ripix = "2.4.2"
-playsoundsimple-py = "0.8.1.post2"
-properties-py = "1.1.0"
-urlopen2 = "1.2.0.post1"
+ripix = "2.5.0"
+playsoundsimple-py = "0.8.4"
+properties-py = "1.2.1"
+urlopen2 = "1.4.0"
# Main Modules
-pillow = ">=10"
+pillow = ">=9.5"
aiofiles = ">=23.1"
rich = ">=13"
mutagen = ">=1.45"
diff --git a/requirements.txt b/requirements.txt
index 00d5c31..722a380 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,9 +1,9 @@
-ripix==2.4.2
-playsoundsimple-py==0.8.1.post2
-properties-py==1.1.0
-urlopen2==1.2.0.post1
+ripix==2.5.0
+playsoundsimple-py==0.8.4
+properties-py==1.2.1
+urlopen2==1.4.0
-pillow>=10
+pillow>=9.5
aiofiles>=23.1
rich>=13
mutagen>=1.45
diff --git a/seaplayer/codecs/AnySound.py b/seaplayer/codecs/AnySound.py
index b654d10..7995678 100644
--- a/seaplayer/codecs/AnySound.py
+++ b/seaplayer/codecs/AnySound.py
@@ -82,7 +82,7 @@ def from_url(
while len(data:=urlfile.read(download_buffer_size)) != 0:
tempfile.write(data)
- return AnySound(path, **{"is_temp": True, **kwargs})
+ return AnySound(path, is_temp=True, **kwargs)
@staticmethod
async def aio_from_url(
@@ -97,4 +97,4 @@ async def aio_from_url(
while len(data:=urlfile.read(download_buffer_size)) != 0:
await tempfile.write(data)
- return AnySound(path, **{"is_temp": True, **kwargs})
+ return AnySound(path, is_temp=True, **kwargs)
diff --git a/seaplayer/codecs/URLS.py b/seaplayer/codecs/URLS.py
index 11aae69..dbabb6d 100644
--- a/seaplayer/codecs/URLS.py
+++ b/seaplayer/codecs/URLS.py
@@ -70,10 +70,10 @@ async def __aio_sha1__(self, buffer_size: int) -> str:
def __init__(self, url: str, sound_device_id: Optional[int]=None, aio_init: bool=False, **kwargs) -> None:
self.name = url
if not aio_init:
- self._sound = AnySound.from_url(self.name, device_id=sound_device_id)
+ self._sound = AnySound.from_url(self.name, device_id=sound_device_id, **kwargs)
@staticmethod
async def __aio_init__(url: str, sound_device_id: Optional[int]=None, **kwargs):
self = URLSoundCodec(url, aio_init=True)
- self._sound = await asyncio.to_thread(AnySound.from_url, url, device_id=sound_device_id)
+ self._sound = await asyncio.to_thread(AnySound.from_url, url, device_id=sound_device_id, **kwargs)
return self
diff --git a/seaplayer/config.py b/seaplayer/config.py
index 228661d..fa7797a 100644
--- a/seaplayer/config.py
+++ b/seaplayer/config.py
@@ -7,35 +7,21 @@
# ! Vars
DEFAULT_CONFIG_DATA = {
- "main": {
- "lang": "en-eng"
- },
- "sound": {
- "sound_font_path": None, # * Optional[str]
- "output_sound_device_id": None, # * Optional[int]
- },
- "image": {
- "image_update_method": "sync", # * Literal["sync", "async"]
- "image_resample_method": "bilinear", # * Literal["nearest", "bilinear", "bicubic", "lanczos", "hamming", "box"]
- },
- "playback": {
- "volume_change_percent": 0.05,
- "rewind_count_seconds": 5,
- "max_volume_percent": 2.0
- },
- "playlist": {
- "recursive_search": False
- },
- "keys": {
- "quit": "q,й",
- "rewind_forward": "*",
- "rewind_back": "/",
- "volume_up": "+",
- "volume_down": "-"
- },
- "debag": {
- "log_menu_enable": False
- }
+ "main.lang": "en-eng",
+ "sound.sound_font_path": None,
+ "sound.output_sound_device_id": None,
+ "image.image_update_method": "sync",
+ "image.image_resample_method": "bilinear",
+ "playback.rewind_count_seconds": 5,
+ "playback.volume_change_percent": 0.05,
+ "playback.max_volume_percent": 2.0,
+ "playlist.recursive_search": False,
+ "keys.quit": "q,й",
+ "keys.rewind_forward": "*",
+ "keys.rewind_back": "/",
+ "keys.volume_up": "+",
+ "keys.volume_down": "-",
+ "debag.log_menu_enable": False
}
# ! Main Class
@@ -43,17 +29,17 @@ class SeaPlayerConfig:
@staticmethod
def dump(filepath: Path, data: Dict[str, Any]) -> None:
with open(filepath, "w", encoding="utf-8", errors="ignore") as file:
- properties.dump_tree(data, file)
+ properties.dump(data, file)
@staticmethod
def load(filepath: Path, default: Dict[str, Any]) -> Dict[str, Any]:
with open(filepath, "r", encoding="utf-8", errors="ignore") as file:
try:
- return properties.load_tree(file)
+ return properties.load(file)
except:
pass
with open(filepath, "w", encoding="utf-8", errors="ignore") as file:
- properties.dump_tree(default, file)
+ properties.dump(default, file)
return default
def refresh(self) -> None:
@@ -77,22 +63,12 @@ def __init__(
self.config = default_data.copy()
self.refresh()
- @staticmethod
- def tevey(key_path: str, *, sep: str=".") -> str:
- return "".join([f"[{repr(key)}]" for key in key_path.split(sep)])
-
def get(self, key: str, default: T=None) -> Union[Any, T]:
- try:
- return eval(f"self.config{self.tevey(key)}")
- except:
- return default
+ return self.config.get(key, default)
def set(self, key: str, value: Any) -> None:
- try:
- exec(f"self.config{self.tevey(key)} = value")
- self.refresh()
- except:
- pass
+ self.config[key] = value
+ self.refresh()
# ! Main
@property
diff --git a/seaplayer/objects/Buttons.py b/seaplayer/objects/Buttons.py
index b3c3057..a378b6d 100644
--- a/seaplayer/objects/Buttons.py
+++ b/seaplayer/objects/Buttons.py
@@ -1,6 +1,8 @@
+import asyncio
+from rich.text import TextType
from textual.widgets import Button
# > Typing
-from typing import Callable, Awaitable
+from typing import Optional, Callable, Awaitable
# ! Clikable Button Class
class ClikableButton(Button):
@@ -9,4 +11,16 @@ def __init__(self, label: str, on_click: Callable[[], Awaitable[None]], *args, *
self.__on_click_method = on_click
async def on_click(self, *args, **kwargs) -> None:
- await self.__on_click_method()
\ No newline at end of file
+ await self.__on_click_method()
+
+# ! Wait Button Click
+class WaitButton(Button):
+ def __init__(self, label: Optional[TextType]=None, queue_max_size: int=1, *args, **kwargs) -> None:
+ super().__init__(label, *args, **kwargs)
+ self.__wait_queue: asyncio.Queue[bool] = asyncio.Queue(queue_max_size)
+
+ async def on_click(self, *args, **kwargs) -> None:
+ await self.__wait_queue.put(True)
+
+ async def wait_click(self) -> bool:
+ return await self.__wait_queue.get()
\ No newline at end of file
diff --git a/seaplayer/objects/Input.py b/seaplayer/objects/Input.py
index bdb5ddd..12e31c4 100644
--- a/seaplayer/objects/Input.py
+++ b/seaplayer/objects/Input.py
@@ -1,33 +1,34 @@
from textual.widgets import Input
# > Typing
-from typing import Optional, Tuple, Any
+from typing import Optional, Tuple, Callable, Awaitable, Any
# ! InputFiles functions
-async def _conv(value: str) -> Tuple[bool, Optional[Any]]: return True, value
-async def _submit(input: Input, value: Any) -> None: ...
-def _update_placeholder() -> Optional[str]: ...
+async def conv(value: str) -> Tuple[bool, Optional[Any]]: return True, value
+async def submit(input: Input, value: Any) -> None: ...
+def update_placeholder() -> Optional[str]: ...
# ! InputField class
class InputField(Input):
def __init__(
self,
- conv=_conv,
- submit=_submit,
- update_placeholder=_update_placeholder,
+ conv: Callable[[str], Awaitable[Tuple[bool, Optional[Any]]]]=conv,
+ submit: Callable[[Input, Any], Awaitable[None]]=submit,
+ update_placeholder: Callable[[], Optional[str]]=update_placeholder,
**kwargs
) -> None:
super().__init__(**kwargs)
- self._conv = conv
- self._submit = submit
- self._update_placeholder = update_placeholder
- if (placeholder:=self._update_placeholder()) is not None:
+ self.__conv = conv
+ self.__submit = submit
+ self.__update_placeholder = update_placeholder
+ if (placeholder:=self.__update_placeholder()) is not None:
self.placeholder = placeholder
async def action_submit(self):
value = self.value
self.value = ""
if value.replace(" ", "") != "":
- ok, c_value = await self._conv(value)
- if ok: await self._submit(self, c_value)
- if (placeholder:=self._update_placeholder()) is not None:
- self.placeholder = placeholder
\ No newline at end of file
+ ok, c_value = await self.__conv(value)
+ if ok:
+ await self.__submit(self, c_value)
+ if (placeholder:=self.__update_placeholder()) is not None:
+ self.placeholder = placeholder
diff --git a/seaplayer/objects/Labels.py b/seaplayer/objects/Labels.py
index 8c5da95..5720a18 100644
--- a/seaplayer/objects/Labels.py
+++ b/seaplayer/objects/Labels.py
@@ -1,3 +1,4 @@
+import random
from textual.widgets import Label, Button
from rich.style import Style
from rich.console import RenderableType
@@ -14,23 +15,31 @@ class FillLabel(Label):
width: 1fr;
}
"""
+ def __gen_random_color(self) -> None:
+ return f"#{hex(random.randint(0,16777215))[2:]}"
- def _gen(self) -> Segments:
- return Segments([Segment(self.__chr, self.__style) for i in range((self.size[0] * self.size[1]))])
+ def __gen_segments(self) -> Segments:
+ segments = []
+ for i in range(self.size[0]*self.size[1]):
+ s = Style.parse(self.__gen_random_color()) if self.__rainbow else self.__style
+ segments.append(Segment(self.__chr, s))
+ return Segments(segments)
def __init__(
self,
char: str="-",
style: Optional[Style]=None,
+ rainbow: bool=False,
**kwargs
) -> None:
super().__init__(**kwargs)
self.__chr = char[:1]
+ self.__rainbow = rainbow
self.__style = style
- self.update(self._gen())
+ self.update(self.__gen_segments())
async def on_resize(self) -> None:
- self.update(self._gen())
+ self.update(self.__gen_segments())
# ! Clickable Label Class
class ClickableLabel(Label, Button, can_focus=True):
diff --git a/seaplayer/objects/PopUp.py b/seaplayer/objects/PopUp.py
index e25b05e..1746e49 100644
--- a/seaplayer/objects/PopUp.py
+++ b/seaplayer/objects/PopUp.py
@@ -1,4 +1,10 @@
+from textual.widget import Widget
+from textual.widgets import Label
from textual.containers import Container
+# > Typing
+from typing import Optional
+# > Local Imports
+from .Labels import FillLabel
# ! Main Class
class PopUp(Container):
@@ -6,6 +12,7 @@ class PopUp(Container):
PopUp {
layer: popup;
background: $background;
+ text-align: center;
align-vertical: middle;
align-horizontal: center;
content-align: center middle;
@@ -14,5 +21,50 @@ class PopUp(Container):
padding: 1 2 1 2;
}
"""
-
pass
+
+# ! Other Clases
+class PopUpWindow(PopUp):
+ DEFAULT_CSS = """
+ PopUpWindow {
+ layer: popup;
+ align-vertical: middle;
+ align-horizontal: center;
+ content-align: center middle;
+ text-align: center;
+ }
+ PopUpWindow FillLabel {
+ width: auto;
+ height: 1;
+ }
+ """
+ def __init__(
+ self,
+ *children: Widget,
+ title: Optional[str]=None,
+ name: Optional[str]=None,
+ id: Optional[str]=None,
+ classes: Optional[str]=None,
+ disabled: bool=False
+ ) -> None:
+ self.__title = title or self.__class__.__name__
+ self.__title_label = Label(self.__title)
+ super().__init__(
+ self.__title_label,
+ FillLabel('─'),
+ *children,
+ name=name,
+ id=id,
+ classes=classes,
+ disabled=disabled
+ )
+
+ # ! Propertys
+ @property
+ def title(self) -> str:
+ return self.__title
+
+ @title.setter
+ def title(self, value: str) -> str:
+ self.__title = str(value)
+ self.__title_label.update(self.__title)
\ No newline at end of file
diff --git a/seaplayer/objects/__init__.py b/seaplayer/objects/__init__.py
index 14ebba6..653c905 100644
--- a/seaplayer/objects/__init__.py
+++ b/seaplayer/objects/__init__.py
@@ -1,6 +1,7 @@
from .Log import LogMenu
from .Labels import FillLabel
from .Input import InputField
+from .Buttons import ClikableButton, WaitButton
from .Notification import Nofy, CallNofy
from .ProgressBar import IndeterminateProgress
from .DataOptions import DataOption, DataOptionList
@@ -8,5 +9,5 @@
from .MusicList import MusicListView, MusicListViewItem
from .Configurate import ConfigurateListItem, ConfigurateList
from .Radio import DataRadioSet, DataRadioButton
+from .PopUp import PopUp, PopUpWindow
from .Rheostat import Rheostat
-from .PopUp import PopUp
diff --git a/seaplayer/plug/cli/cli.py b/seaplayer/plug/cli/cli.py
index 4fd9c64..3e4b1ba 100644
--- a/seaplayer/plug/cli/cli.py
+++ b/seaplayer/plug/cli/cli.py
@@ -46,7 +46,7 @@ def listing():
for n, info in enumerate(get_plugins_info(), 1):
status = "[green]Enabled[/green]" if plugin_config.is_enable_plugin(info) else "[red]Disabled[/red]"
console.print(
- f"[cyan]{n}[/cyan]. [green]{info.name}[/green] ([green]{info.name_id}[/green]) [cyan]v{info.version}[/cyan] from [yellow]{info.author}[/yellow] ({status})"
+ f"[cyan]{n}[/cyan]. [green]{info.name}[/green] ([green]{info.name_id}[/green]) [#60fdff]v{info.version}[/#60fdff] from [yellow]{info.author}[/yellow] ({status})"
)
else:
console.print(f"[yellow]The list of plugins is [blue]empty[/blue].[/yellow]")
diff --git a/seaplayer/plug/cli/functions.py b/seaplayer/plug/cli/functions.py
index a509fd5..2ce379d 100644
--- a/seaplayer/plug/cli/functions.py
+++ b/seaplayer/plug/cli/functions.py
@@ -5,9 +5,12 @@
# > Local Import's
from .functions import *
from ..pluginbase import PluginInfo
-from ..pluginloader import PluginLoaderConfigManager, PluginLoader
+from ..pluginloader import PluginLoaderConfigManager, PluginLoader, load_plugin_info
from ...units import PLUGINS_CONFIG_PATH, PLUGINS_DIRPATH
+# ! Vars
+console = Console()
+
# ! Functions
def init_config() -> None:
os.makedirs(PLUGINS_DIRPATH, mode=0o755, exist_ok=True)
@@ -36,9 +39,9 @@ def get_plugins_info() -> List[PluginInfo]:
plugins_infos = []
for plugin_init_path, plugin_info_path, plugin_deps_path in PluginLoader.search_plugins_paths():
try:
- plugins_infos.append(PluginLoader.load_plugin_info(plugin_info_path))
+ plugins_infos.append(load_plugin_info(plugin_info_path))
except:
- pass
+ console.print_exception()
return plugins_infos
def is_plugin_dirpath(dirpath: str) -> bool:
diff --git a/seaplayer/plug/pluginbase.py b/seaplayer/plug/pluginbase.py
index a2db8ab..8e78f25 100644
--- a/seaplayer/plug/pluginbase.py
+++ b/seaplayer/plug/pluginbase.py
@@ -58,5 +58,8 @@ def on_run(self):
async def on_compose(self):
pass
+ async def on_ready(self):
+ pass
+
async def on_quit(self):
pass
diff --git a/seaplayer/plug/pluginbase.pyi b/seaplayer/plug/pluginbase.pyi
index 7b0c36f..85fc239 100644
--- a/seaplayer/plug/pluginbase.pyi
+++ b/seaplayer/plug/pluginbase.pyi
@@ -34,4 +34,5 @@ class PluginBase:
def on_init(self) -> None: ...
def on_run(self) -> None: ...
async def on_compose(self) -> None: ...
+ async def on_ready(self) -> None: ...
async def on_quit(self) -> None: ...
diff --git a/seaplayer/plug/pluginloader.py b/seaplayer/plug/pluginloader.py
index 214b954..9bdb554 100644
--- a/seaplayer/plug/pluginloader.py
+++ b/seaplayer/plug/pluginloader.py
@@ -275,6 +275,13 @@ async def on_compose(self) -> None:
except:
self.app.error(f"Failed to do [green]`await on_compose`[/green] in: {i}")
+ async def on_ready(self) -> None:
+ async for i in aiter(self.on_plugins):
+ try:
+ await i.on_ready()
+ except:
+ self.app.error(f"Failed to do [green]`await on_compose`[/green] in: {i}")
+
async def on_quit(self) -> None:
async for i in aiter(self.on_plugins):
try:
diff --git a/seaplayer/plug/pluginloader.pyi b/seaplayer/plug/pluginloader.pyi
index 8d82d02..d199171 100644
--- a/seaplayer/plug/pluginloader.pyi
+++ b/seaplayer/plug/pluginloader.pyi
@@ -88,4 +88,5 @@ class PluginLoader:
def on_init(self) -> None: ...
def on_run(self) -> None: ...
async def on_compose(self) -> None: ...
+ async def on_ready(self) -> None: ...
async def on_quit(self) -> None: ...
diff --git a/seaplayer/seaplayer.py b/seaplayer/seaplayer.py
index aeb88fb..f146fac 100644
--- a/seaplayer/seaplayer.py
+++ b/seaplayer/seaplayer.py
@@ -343,36 +343,33 @@ async def _spm(input: InputField, value: Any) -> None:
group="CONTROL_UPDATER-LOOP",
description="Control of playback modes and status updates."
)
- if ENABLE_PLUGIN_SYSTEM:
- self.run_worker(
- self.plugin_loader.on_compose,
- name="ON_COMPOSE",
- group="PluginLoader",
- description=""
- )
self.info("---")
# ! Currect Sound Controls
async def currect_sound_stop(self, sound: Optional[CodecBase]=None):
- if sound is None: sound = await self.aio_gcs()
+ if sound is None:
+ sound = await self.aio_gcs()
if sound is not None:
self.last_playback_status = 0
sound.stop()
async def currect_sound_play(self, sound: Optional[CodecBase]=None):
- if sound is None: sound = await self.aio_gcs()
+ if sound is None:
+ sound = await self.aio_gcs()
if sound is not None:
self.last_playback_status = 1
sound.play()
async def currect_sound_pause(self, sound: Optional[CodecBase]=None):
- if sound is None: sound = await self.aio_gcs()
+ if sound is None:
+ sound = await self.aio_gcs()
if sound is not None:
self.last_playback_status = 3
sound.pause()
async def currect_sound_unpause(self, sound: Optional[CodecBase]=None):
- if sound is None: sound = await self.aio_gcs()
+ if sound is None:
+ sound = await self.aio_gcs()
if sound is not None:
self.last_playback_status = 1
sound.unpause()
@@ -406,20 +403,26 @@ async def set_sound_for_playback(
async def add_sounds_to_list(self) -> None:
added_oks = 0
loading_nofy = await self.aio_callnofy(self.ll.get("nofys.sound.found").format(count=len(self.last_handlered_values)))
- async for path in aiter(self.last_handlered_values):
+ self.CODECS.sort(key=lambda x: x.codec_priority)
+ async for value in aiter(self.last_handlered_values):
sound = None
async for codec in aiter(self.CODECS):
+ self.info(f"Attempt to load via {repr(codec)}")
try:
- if await codec.aio_is_this_codec(path):
- if not hasattr(codec, "__aio_init__"):
+ if hasattr(codec, "aio_is_this_codec"):
+ this_codec = await codec.aio_is_this_codec(value)
+ else:
+ this_codec = codec.is_this_codec(value)
+ if this_codec:
+ if hasattr(codec, "__aio_init__"):
try:
- sound: CodecBase = codec(path, **self.CODECS_KWARGS)
+ sound: CodecBase = await codec.__aio_init__(value, **self.CODECS_KWARGS)
except Exception as e:
self.exception(e)
sound = None
else:
try:
- sound: CodecBase = await codec.__aio_init__(path, **self.CODECS_KWARGS)
+ sound: CodecBase = codec(value, **self.CODECS_KWARGS)
except Exception as e:
self.exception(e)
sound = None
@@ -430,14 +433,14 @@ async def add_sounds_to_list(self) -> None:
added_oks += 1
break
except FileNotFoundError:
- self.error(f"The file does not exist or is a directory: {repr(path)}")
+ self.error(f"The file does not exist or is a directory: {repr(value)}")
break
except OSError:
pass
except Exception as e:
self.exception(e)
if sound is None:
- self.error(f"The sound could not be loaded: {repr(path)}")
+ self.error(f"The sound could not be loaded: {repr(value)}")
await loading_nofy.remove()
self.info(f"Added [cyan]{added_oks}[/cyan] songs!")
await self.aio_nofy(self.ll.get("nofys.sound.added").format(count=added_oks))
@@ -451,7 +454,8 @@ async def on_button_pressed(self, event: Button.Pressed) -> None:
@on(Button.Pressed, "#button-pause")
async def bp_pause_unpause(self) -> None:
if (sound:=await self.aio_gcs()) is not None:
- if sound.playing: await self.currect_sound_pause(sound)
+ if sound.playing:
+ await self.currect_sound_pause(sound)
await self.aio_update_select_label(sound)
@on(Button.Pressed, "#button-play-stop")
@@ -529,10 +533,27 @@ async def action_quit(self):
sound.stop()
return await super().action_quit()
+ # ! On Functions
+ async def on_compose(self) -> None:
+ if ENABLE_PLUGIN_SYSTEM:
+ self.run_worker(
+ self.plugin_loader.on_compose,
+ name="ON_COMPOSE",
+ group="PluginLoader",
+ description=""
+ )
+
+ def on_ready(self, *args, **kwargs) -> None:
+ if ENABLE_PLUGIN_SYSTEM:
+ self.run_worker(
+ self.plugin_loader.on_ready,
+ name="ON_READY",
+ group="PluginLoader",
+ description=""
+ )
+
# ! Other
def run(self, *args, **kwargs):
if ENABLE_PLUGIN_SYSTEM:
self.plugin_loader.on_run()
- self.CODECS.sort(key=lambda x: x.codec_priority)
super().run(*args, **kwargs)
-
diff --git a/seaplayer/units.py b/seaplayer/units.py
index 7639e82..6eaf174 100644
--- a/seaplayer/units.py
+++ b/seaplayer/units.py
@@ -6,7 +6,7 @@
# ! Metadata
__title__ = "SeaPlayer"
-__version__ = "0.8.4"
+__version__ = "0.8.5"
__author__ = "Romanin"
__email__ = "semina054@gmail.com"
__url__ = "https://github.com/romanin-rf/SeaPlayer"