diff --git a/music_assistant/server/controllers/config.py b/music_assistant/server/controllers/config.py index 798a19ec5..82879c0a9 100644 --- a/music_assistant/server/controllers/config.py +++ b/music_assistant/server/controllers/config.py @@ -310,16 +310,6 @@ async def set_provider_config_value( conf_key = f"{CONF_PROVIDERS}/{config.instance_id}" self.set(conf_key, config.to_raw()) - @api_command("config/providers/reload") - async def reload_provider(self, instance_id: str) -> None: - """Reload provider.""" - try: - config = await self.get_provider_config(instance_id) - except KeyError: - # Edge case: Provider was removed before we could reload it - return - await self._load_provider_config(config) - @api_command("config/players") async def get_player_configs( self, provider: str | None = None, include_values: bool = False @@ -732,6 +722,16 @@ async def _async_save(self) -> None: await _file.write(json_dumps(self._data, indent=True)) LOGGER.debug("Saved data to persistent storage") + @api_command("config/providers/reload") + async def _reload_provider(self, instance_id: str) -> None: + """Reload provider.""" + try: + config = await self.get_provider_config(instance_id) + except KeyError: + # Edge case: Provider was removed before we could reload it + return + await self.mass.load_provider_config(config) + async def _update_provider_config( self, instance_id: str, values: dict[str, ConfigValueType] ) -> ProviderConfig: @@ -750,7 +750,7 @@ async def _update_provider_config( raw_conf = config.to_raw() self.set(conf_key, raw_conf) if config.enabled: - await self._load_provider_config(config) + await self.mass.load_provider_config(config) else: # disable provider prov_manifest = self.mass.get_provider_manifest(config.domain) @@ -826,24 +826,9 @@ async def _add_provider_config( self.set(conf_key, config.to_raw()) # try to load the provider try: - await self._load_provider_config(config) + await self.mass.load_provider_config(config) except Exception: # loading failed, remove config self.remove(conf_key) raise return config - - async def _load_provider_config(self, config: ProviderConfig) -> None: - """Load given provider config.""" - # check if there are no other providers dependent of this provider - deps = set() - for dep_prov in self.mass.providers: - if dep_prov.manifest.depends_on == config.domain: - deps.add(dep_prov.instance_id) - await self.mass.unload_provider(dep_prov.instance_id) - # (re)load the provider - await self.mass.load_provider_config(config) - # reload any dependants - for dep in deps: - conf = await self.get_provider_config(dep) - await self.mass.load_provider(conf.instance_id) diff --git a/music_assistant/server/controllers/music.py b/music_assistant/server/controllers/music.py index 28c8708f3..826ecb24f 100644 --- a/music_assistant/server/controllers/music.py +++ b/music_assistant/server/controllers/music.py @@ -816,6 +816,14 @@ def on_sync_task_done(task: asyncio.Task) -> None: async def cleanup_provider(self, provider_instance: str) -> None: """Cleanup provider records from the database.""" + if provider_instance.startswith(("filesystem", "jellyfin", "plex", "opensubsonic")): + # removal of a local provider can become messy very fast due to the relations + # such as images pointing at the files etc. so we just reset the whole db + self.logger.warning( + "Removal of local provider detected, issuing full database reset..." + ) + await self._reset_database() + return deleted_providers = self.mass.config.get_raw_core_config_value( self.domain, CONF_DELETED_PROVIDERS, [] ) @@ -828,8 +836,8 @@ async def cleanup_provider(self, provider_instance: str) -> None: ) self.mass.config.save(True) - # clean cache items from deleted provider(s) - await self.mass.cache.clear(provider_instance) + # always clear cache when a provider is removed + await self.mass.cache.clear() # cleanup media items from db matched to deleted provider self.logger.info( @@ -962,7 +970,7 @@ async def _setup_database(self) -> None: "Database migration failed - setup can not continue. " "Try restarting the server. If this issue persists, create an issue report " " on Github and/or re-install the server (or restore a backup).", - exc_info=err, + exc_info=err if self.logger.isEnabledFor(logging.DEBUG) else None, ) # restore backup file await asyncio.to_thread(shutil.copyfile, db_path_backup, db_path) @@ -987,6 +995,7 @@ async def _setup_database(self) -> None: async def __migrate_database(self, prev_version: int) -> None: """Perform a database migration.""" + # ruff: noqa: PLR0915 self.logger.info( "Migrating database from version %s to %s", prev_version, DB_SCHEMA_VERSION ) @@ -1075,7 +1084,16 @@ async def __migrate_database(self, prev_version: int) -> None: item.provider_mappings = { x for x in item.provider_mappings if x.provider_instance is not None } - await ctrl.update_item_in_library(item.item_id, item, True) + try: + await ctrl.update_item_in_library(item.item_id, item, True) + except Exception as err: + self.logger.warning( + "Error while migrating %s: %s", + item.item_id, + str(err), + exc_info=err if self.logger.isEnabledFor(logging.DEBUG) else None, + ) + await ctrl.remove_item_from_library(item.item_id) if prev_version <= 5: # mark all provider mappings as available to recover from the bug @@ -1114,7 +1132,16 @@ async def __migrate_database(self, prev_version: int) -> None: changes = True if changes: media_item.metadata.images = images - await ctrl.update_item_in_library(media_item.item_id, media_item, True) + try: + await ctrl.update_item_in_library(media_item.item_id, media_item, True) + except Exception as err: + self.logger.warning( + "Error while migrating %s: %s", + media_item.item_id, + str(err), + exc_info=err if self.logger.isEnabledFor(logging.DEBUG) else None, + ) + await ctrl.remove_item_from_library(media_item.item_id) # save changes await self.database.commit() diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index 8cff8ced4..b7f6e4a63 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -46,7 +46,7 @@ check_audio_support, crossfade_pcm_parts, get_ffmpeg_stream, - get_hls_stream, + get_hls_substream, get_icy_stream, get_player_filter_params, get_silence, @@ -751,9 +751,13 @@ async def get_media_stream( async for chunk in get_silence(2, pcm_format): yield chunk elif streamdetails.stream_type == StreamType.HLS: - audio_source = get_hls_stream( - self.mass, streamdetails.path, streamdetails, streamdetails.seek_position - ) + # we simply select the best quality substream here + # if we ever want to support adaptive stream selection based on bandwidth + # we need to move the substream selection into the loop below and make it + # bandwidth aware. For now we just assume domestic high bandwidth where + # the user wants the best quality possible at all times. + substream = await get_hls_substream(self.mass, streamdetails.path) + audio_source = substream.path elif streamdetails.stream_type == StreamType.ENCRYPTED_HTTP: audio_source = streamdetails.path extra_input_args += ["-decryption_key", streamdetails.decryption_key] diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index 6b7b5c8e6..5c47b7c68 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -7,7 +7,6 @@ import os import re import struct -import time from collections import deque from collections.abc import AsyncGenerator from contextlib import suppress @@ -50,7 +49,6 @@ fetch_playlist, parse_m3u, ) -from music_assistant.server.helpers.tags import parse_tags from .process import AsyncProcess, check_output, communicate from .util import create_tempfile @@ -371,7 +369,7 @@ async def get_stream_details( # work out how to handle radio stream if ( - streamdetails.media_type == MediaType.RADIO + streamdetails.media_type in (MediaType.RADIO, StreamType.ICY, StreamType.HLS) and streamdetails.stream_type == StreamType.HTTP ): resolved_url, is_icy, is_hls = await resolve_radio_stream(mass, streamdetails.path) @@ -467,7 +465,6 @@ async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, boo - bool if the URL represents a ICY (radio) stream. - bool uf the URL represents a HLS stream/playlist. """ - base_url = url.split("?")[0] cache_key = f"RADIO_RESOLVED_{url}" if cache := await mass.cache.get(cache_key): return cache @@ -486,8 +483,12 @@ async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, boo is_icy = headers.get("icy-metaint") is not None is_hls = headers.get("content-type") in HLS_CONTENT_TYPES if ( - base_url.endswith((".m3u", ".m3u8", ".pls")) - or headers.get("content-type") == "audio/x-mpegurl" + url.endswith((".m3u", ".m3u8", ".pls")) + or ".m3u?" in url + or ".m3u8?" in url + or ".pls?" in url + or "audio/x-mpegurl" in headers.get("content-type") + or "audio/x-scpls" in headers.get("content-type", "") ): # url is playlist, we need to unfold it substreams = await fetch_playlist(mass, url) @@ -497,7 +498,7 @@ async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, boo if not line.is_url: continue # unfold first url of playlist - resolved_url, is_icy, is_hls = await resolve_radio_stream(mass, line.path) + return await resolve_radio_stream(mass, line.path) raise InvalidDataError("No content found in playlist") except IsHLSPlaylist: is_hls = True @@ -549,105 +550,6 @@ async def get_icy_stream( streamdetails.stream_title = cleaned_stream_title -async def get_hls_stream( - mass: MusicAssistant, - url: str, - streamdetails: StreamDetails, - seek_position: int = 0, -) -> AsyncGenerator[bytes, None]: - """Get audio stream from HTTP HLS stream.""" - logger = LOGGER.getChild("hls_stream") - logger.debug("Start streaming HLS stream for url %s", url) - timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60) - prev_chunks: deque[str] = deque(maxlen=50) - has_playlist_metadata: bool | None = None - has_id3_metadata: bool | None = None - is_live_stream = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration - # we simply select the best quality substream here - # if we ever want to support adaptive stream selection based on bandwidth - # we need to move the substream selection into the loop below and make it - # bandwidth aware. For now we just assume domestic high bandwidth where - # the user wants the best quality possible at all times. - playlist_item = await get_hls_substream(mass, url) - substream_url = playlist_item.path - seconds_skipped = 0 - empty_loops = 0 - while True: - logger.log(VERBOSE_LOG_LEVEL, "start streaming chunks from substream %s", substream_url) - async with mass.http_session.get( - substream_url, allow_redirects=True, headers=HTTP_HEADERS, timeout=timeout - ) as resp: - resp.raise_for_status() - charset = resp.charset or "utf-8" - substream_m3u_data = await resp.text(charset) - # get chunk-parts from the substream - hls_chunks = parse_m3u(substream_m3u_data) - chunk_seconds = 0 - time_start = time.time() - for chunk_item in hls_chunks: - if chunk_item.path in prev_chunks: - continue - chunk_length = int(chunk_item.length) if chunk_item.length else 6 - # try to support seeking here - if seek_position and (seconds_skipped + chunk_length) < seek_position: - seconds_skipped += chunk_length - continue - chunk_item_url = chunk_item.path - if not chunk_item_url.startswith("http"): - # path is relative, stitch it together - base_path = substream_url.rsplit("/", 1)[0] - chunk_item_url = base_path + "/" + chunk_item.path - # handle (optional) in-playlist (timed) metadata - if has_playlist_metadata is None: - has_playlist_metadata = chunk_item.title not in (None, "") - logger.debug("Station support for in-playlist metadata: %s", has_playlist_metadata) - if has_playlist_metadata and chunk_item.title != "no desc": - # bbc (and maybe others?) set the title to 'no desc' - cleaned_stream_title = clean_stream_title(chunk_item.title) - if cleaned_stream_title != streamdetails.stream_title: - logger.log( - VERBOSE_LOG_LEVEL, "HLS Radio streamtitle original: %s", chunk_item.title - ) - logger.log( - VERBOSE_LOG_LEVEL, "HLS Radio streamtitle cleaned: %s", cleaned_stream_title - ) - streamdetails.stream_title = cleaned_stream_title - logger.log(VERBOSE_LOG_LEVEL, "playing chunk %s", chunk_item) - # prevent that we play this chunk again if we loop through - prev_chunks.append(chunk_item.path) - async with mass.http_session.get( - chunk_item_url, headers=HTTP_HEADERS, timeout=timeout - ) as resp: - yield await resp.content.read() - chunk_seconds += chunk_length - # handle (optional) in-band (m3u) metadata - if has_id3_metadata is not None and has_playlist_metadata: - continue - if has_id3_metadata in (None, True): - tags = await parse_tags(chunk_item_url) - has_id3_metadata = tags.title and tags.title not in chunk_item.path - logger.debug("Station support for in-band (ID3) metadata: %s", has_id3_metadata) - # end of stream reached - for non livestreams, we are ready and should return - # for livestreams we loop around to get the next playlist with chunks - if not is_live_stream: - return - # safeguard for an endless loop - # this may happen if we're simply going too fast for the live stream - # we already throttle it a bit but we may end up in a situation where something is wrong - # and we want to break out of this loop, hence this check - if chunk_seconds == 0: - empty_loops += 1 - await asyncio.sleep(1) - else: - empty_loops = 0 - if empty_loops == 50: - logger.warning("breaking out of endless loop") - break - # ensure that we're not going to fast - otherwise we get the same substream playlist - while (time.time() - time_start) < (chunk_seconds - 1): - await asyncio.sleep(0.5) - - async def get_hls_substream( mass: MusicAssistant, url: str, @@ -663,6 +565,11 @@ async def get_hls_substream( charset = resp.charset or "utf-8" master_m3u_data = await resp.text(charset) substreams = parse_m3u(master_m3u_data) + if any(x for x in substreams if x.length): + # this is already a substream! + return PlaylistItem( + path=url, + ) # sort substreams on best quality (highest bandwidth) when available if any(x for x in substreams if x.stream_info): substreams.sort(key=lambda x: int(x.stream_info.get("BANDWIDTH", "0")), reverse=True) diff --git a/music_assistant/server/helpers/playlists.py b/music_assistant/server/helpers/playlists.py index 8df3606e8..8986de7fe 100644 --- a/music_assistant/server/helpers/playlists.py +++ b/music_assistant/server/helpers/playlists.py @@ -70,6 +70,8 @@ def parse_m3u(m3u_data: str) -> list[PlaylistItem]: if len(info) != 2: continue length = info[0].strip()[0] + if length == "-1": + length = None title = info[1].strip() elif line.startswith("#EXT-X-STREAM-INF:"): # HLS stream properties @@ -129,9 +131,10 @@ def parse_pls(pls_data: str) -> list[PlaylistItem]: if file_option not in playlist_section: continue itempath = playlist_section[file_option] + length = playlist_section.get(f"Length{entry}") playlist.append( PlaylistItem( - length=playlist_section.get(f"Length{entry}"), + length=length if length and length != "-1" else None, title=playlist_section.get(f"Title{entry}"), path=itempath, ) diff --git a/music_assistant/server/providers/builtin/__init__.py b/music_assistant/server/providers/builtin/__init__.py index 38b0c2dea..f609e56c3 100644 --- a/music_assistant/server/providers/builtin/__init__.py +++ b/music_assistant/server/providers/builtin/__init__.py @@ -436,7 +436,10 @@ async def parse_item( force_radio: bool = False, ) -> Track | Radio: """Parse plain URL to MediaItem of type Radio or Track.""" - media_info = await self._get_media_info(url, force_refresh) + try: + media_info = await self._get_media_info(url, force_refresh) + except Exception as err: + raise MediaNotFoundError from err is_radio = media_info.get("icyname") or not media_info.duration provider_mappings = { ProviderMapping( diff --git a/music_assistant/server/providers/chromecast/__init__.py b/music_assistant/server/providers/chromecast/__init__.py index cafc6510a..ca0405286 100644 --- a/music_assistant/server/providers/chromecast/__init__.py +++ b/music_assistant/server/providers/chromecast/__init__.py @@ -185,7 +185,7 @@ def on_player_config_changed( """Call (by config manager) when the configuration of a player changes.""" super().on_player_config_changed(config, changed_keys) if "enabled" in changed_keys and config.player_id not in self.castplayers: - self.mass.create_task(self.mass.config.reload_provider, self.instance_id) + self.mass.create_task(self.mass.load_provider, self.instance_id) async def cmd_stop(self, player_id: str) -> None: """Send STOP command to given player.""" diff --git a/music_assistant/server/providers/hass/__init__.py b/music_assistant/server/providers/hass/__init__.py index 3f7c11002..d704fe9f0 100644 --- a/music_assistant/server/providers/hass/__init__.py +++ b/music_assistant/server/providers/hass/__init__.py @@ -204,6 +204,6 @@ async def _hass_listener(self) -> None: await self.hass.start_listening() except BaseHassClientError as err: self.logger.warning("Connection to HA lost due to error: %s", err) - self.logger.info("Connection to HA lost. Reloading provider in 5 seconds.") + self.logger.info("Connection to HA lost. Connection will be automatically retried later.") # schedule a reload of the provider - self.mass.call_later(5, self.mass.config.reload_provider(self.instance_id)) + self.mass.call_later(5, self.mass.load_provider(self.instance_id, allow_retry=True)) diff --git a/music_assistant/server/providers/snapcast/__init__.py b/music_assistant/server/providers/snapcast/__init__.py index f4880f840..131f4348c 100644 --- a/music_assistant/server/providers/snapcast/__init__.py +++ b/music_assistant/server/providers/snapcast/__init__.py @@ -694,4 +694,4 @@ def _handle_disconnect(self, exc: Exception) -> None: "Connection to SnapServer lost, reason: %s. Reloading provider in 5 seconds.", str(exc) ) # schedule a reload of the provider - self.mass.call_later(5, self.mass.config.reload_provider(self.instance_id)) + self.mass.call_later(5, self.mass.load_provider(self.instance_id, allow_retry=True)) diff --git a/music_assistant/server/providers/soundcloud/__init__.py b/music_assistant/server/providers/soundcloud/__init__.py index 4f6585afd..a4e091adf 100644 --- a/music_assistant/server/providers/soundcloud/__init__.py +++ b/music_assistant/server/providers/soundcloud/__init__.py @@ -332,6 +332,7 @@ async def _parse_artist(self, artist_obj: dict) -> Artist: if not artist_id: msg = "Artist does not have a valid ID" raise InvalidDataError(msg) + artist_id = str(artist_id) artist = Artist( item_id=artist_id, name=artist_obj["username"], @@ -361,13 +362,14 @@ async def _parse_artist(self, artist_obj: dict) -> Artist: async def _parse_playlist(self, playlist_obj: dict) -> Playlist: """Parse a Soundcloud Playlist response to a Playlist object.""" + playlist_id = str(playlist_obj["id"]) playlist = Playlist( - item_id=playlist_obj["id"], + item_id=playlist_id, provider=self.domain, name=playlist_obj["title"], provider_mappings={ ProviderMapping( - item_id=playlist_obj["id"], + item_id=playlist_id, provider_domain=self.domain, provider_instance=self.instance_id, ) @@ -394,15 +396,16 @@ async def _parse_playlist(self, playlist_obj: dict) -> Playlist: async def _parse_track(self, track_obj: dict, playlist_position: int = 0) -> Track: """Parse a Soundcloud Track response to a Track model object.""" name, version = parse_title_and_version(track_obj["title"]) + track_id = str(track_obj["id"]) track = Track( - item_id=track_obj["id"], + item_id=track_id, provider=self.domain, name=name, version=version, duration=track_obj["duration"] / 1000, provider_mappings={ ProviderMapping( - item_id=track_obj["id"], + item_id=track_id, provider_domain=self.domain, provider_instance=self.instance_id, audio_format=AudioFormat( diff --git a/music_assistant/server/providers/spotify/__init__.py b/music_assistant/server/providers/spotify/__init__.py index 794df518a..9b29aaad5 100644 --- a/music_assistant/server/providers/spotify/__init__.py +++ b/music_assistant/server/providers/spotify/__init__.py @@ -775,31 +775,27 @@ async def login(self) -> dict: if not (refresh_token := self.config.get_value(CONF_REFRESH_TOKEN)): raise LoginFailed("Authentication required") - expires_at = self.config.get_value(CONF_AUTH_EXPIRES_AT) or 0 - access_token = self.config.get_value(CONF_ACCESS_TOKEN) - - if expires_at < (time.time() - 300): - # refresh token - client_id = self.config.get_value(CONF_CLIENT_ID) or app_var(2) - params = { - "grant_type": "refresh_token", - "refresh_token": refresh_token, - "client_id": client_id, - } - async with self.mass.http_session.post( - "https://accounts.spotify.com/api/token", data=params - ) as response: - if response.status != 200: - err = await response.text() - self.mass.config.set_raw_provider_config_value( - self.instance_id, CONF_REFRESH_TOKEN, None - ) - raise LoginFailed(f"Failed to refresh access token: {err}") - data = await response.json() - access_token = data["access_token"] - refresh_token = data["refresh_token"] - expires_at = int(data["expires_in"] + time.time()) - self.logger.debug("Successfully refreshed access token") + # refresh token + client_id = self.config.get_value(CONF_CLIENT_ID) or app_var(2) + params = { + "grant_type": "refresh_token", + "refresh_token": refresh_token, + "client_id": client_id, + } + async with self.mass.http_session.post( + "https://accounts.spotify.com/api/token", data=params + ) as response: + if response.status != 200: + err = await response.text() + self.mass.config.set_raw_provider_config_value( + self.instance_id, CONF_REFRESH_TOKEN, None + ) + raise LoginFailed(f"Failed to refresh access token: {err}") + data = await response.json() + access_token = data["access_token"] + refresh_token = data["refresh_token"] + expires_at = int(data["expires_in"] + time.time()) + self.logger.debug("Successfully refreshed access token") self._auth_info = auth_info = { "access_token": access_token, diff --git a/music_assistant/server/server.py b/music_assistant/server/server.py index 51334ef89..7799337fc 100644 --- a/music_assistant/server/server.py +++ b/music_assistant/server/server.py @@ -428,18 +428,18 @@ async def load_provider_config( if existing := self._tracked_timers.pop(task_id, None): existing.cancel() - try: - await self._load_provider(prov_conf) - # pylint: disable=broad-except - except Exception as exc: - LOGGER.error( - "Error loading provider(instance) %s: %s", - prov_conf.name or prov_conf.instance_id, - str(exc) or exc.__class__.__name__, - # log full stack trace if debug logging is enabled - exc_info=exc if LOGGER.isEnabledFor(logging.DEBUG) else None, - ) - raise + await self._load_provider(prov_conf) + + # (re)load any dependants + prov_configs = await self.config.get_provider_configs(include_values=True) + for dep_prov_conf in prov_configs: + if not dep_prov_conf.enabled: + continue + manifest = self.get_provider_manifest(dep_prov_conf.domain) + if not manifest.depends_on: + continue + if manifest.depends_on == prov_conf.domain: + await self._load_provider(dep_prov_conf) async def load_provider( self, @@ -474,16 +474,30 @@ async def load_provider( # auto schedule a retry if the (re)load failed (handled exceptions only) if isinstance(exc, MusicAssistantError) and allow_retry: self.call_later( - 300, + 120, self.load_provider, instance_id, allow_retry, task_id=task_id, ) + LOGGER.warning( + "Error loading provider(instance) %s: %s (will be retried later)", + prov_conf.name or prov_conf.instance_id, + str(exc) or exc.__class__.__name__, + # log full stack trace if verbose logging is enabled + exc_info=exc if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else None, + ) return # raise in all other situations raise + # (re)load any dependents if needed + for dep_prov in self.providers: + if dep_prov.available: + continue + if dep_prov.manifest.depends_on == prov_conf.domain: + await self.unload_provider(dep_prov.instance_id) + async def unload_provider(self, instance_id: str) -> None: """Unload a provider.""" if provider := self._providers.get(instance_id): @@ -577,17 +591,12 @@ async def _load_provider(self, conf: ProviderConfig) -> None: raise SetupFailedError(msg) # handle dependency on other provider - if prov_manifest.depends_on: - for _ in range(30): - if self.get_provider(prov_manifest.depends_on): - break - await asyncio.sleep(1) - else: - msg = ( - f"Provider {domain} depends on {prov_manifest.depends_on} " - "which is not available." - ) - raise SetupFailedError(msg) + if prov_manifest.depends_on and not self.get_provider(prov_manifest.depends_on): + msg = ( + f"Provider {domain} depends on {prov_manifest.depends_on} " + "which is not (yet) available." + ) + raise SetupFailedError(msg) # try to setup the module prov_mod = await load_provider_module(domain, prov_manifest.requirements)