Skip to content

Commit

Permalink
Various bugfixes and optimizations (#1572)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelveldt authored Aug 17, 2024
1 parent bf4c92f commit c0a9baf
Show file tree
Hide file tree
Showing 12 changed files with 138 additions and 201 deletions.
39 changes: 12 additions & 27 deletions music_assistant/server/controllers/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,16 +310,6 @@ async def set_provider_config_value(
conf_key = f"{CONF_PROVIDERS}/{config.instance_id}"
self.set(conf_key, config.to_raw())

@api_command("config/providers/reload")
async def reload_provider(self, instance_id: str) -> None:
"""Reload provider."""
try:
config = await self.get_provider_config(instance_id)
except KeyError:
# Edge case: Provider was removed before we could reload it
return
await self._load_provider_config(config)

@api_command("config/players")
async def get_player_configs(
self, provider: str | None = None, include_values: bool = False
Expand Down Expand Up @@ -732,6 +722,16 @@ async def _async_save(self) -> None:
await _file.write(json_dumps(self._data, indent=True))
LOGGER.debug("Saved data to persistent storage")

@api_command("config/providers/reload")
async def _reload_provider(self, instance_id: str) -> None:
"""Reload provider."""
try:
config = await self.get_provider_config(instance_id)
except KeyError:
# Edge case: Provider was removed before we could reload it
return
await self.mass.load_provider_config(config)

async def _update_provider_config(
self, instance_id: str, values: dict[str, ConfigValueType]
) -> ProviderConfig:
Expand All @@ -750,7 +750,7 @@ async def _update_provider_config(
raw_conf = config.to_raw()
self.set(conf_key, raw_conf)
if config.enabled:
await self._load_provider_config(config)
await self.mass.load_provider_config(config)
else:
# disable provider
prov_manifest = self.mass.get_provider_manifest(config.domain)
Expand Down Expand Up @@ -826,24 +826,9 @@ async def _add_provider_config(
self.set(conf_key, config.to_raw())
# try to load the provider
try:
await self._load_provider_config(config)
await self.mass.load_provider_config(config)
except Exception:
# loading failed, remove config
self.remove(conf_key)
raise
return config

async def _load_provider_config(self, config: ProviderConfig) -> None:
"""Load given provider config."""
# check if there are no other providers dependent of this provider
deps = set()
for dep_prov in self.mass.providers:
if dep_prov.manifest.depends_on == config.domain:
deps.add(dep_prov.instance_id)
await self.mass.unload_provider(dep_prov.instance_id)
# (re)load the provider
await self.mass.load_provider_config(config)
# reload any dependants
for dep in deps:
conf = await self.get_provider_config(dep)
await self.mass.load_provider(conf.instance_id)
37 changes: 32 additions & 5 deletions music_assistant/server/controllers/music.py
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,14 @@ def on_sync_task_done(task: asyncio.Task) -> None:

async def cleanup_provider(self, provider_instance: str) -> None:
"""Cleanup provider records from the database."""
if provider_instance.startswith(("filesystem", "jellyfin", "plex", "opensubsonic")):
# removal of a local provider can become messy very fast due to the relations
# such as images pointing at the files etc. so we just reset the whole db
self.logger.warning(
"Removal of local provider detected, issuing full database reset..."
)
await self._reset_database()
return
deleted_providers = self.mass.config.get_raw_core_config_value(
self.domain, CONF_DELETED_PROVIDERS, []
)
Expand All @@ -828,8 +836,8 @@ async def cleanup_provider(self, provider_instance: str) -> None:
)
self.mass.config.save(True)

# clean cache items from deleted provider(s)
await self.mass.cache.clear(provider_instance)
# always clear cache when a provider is removed
await self.mass.cache.clear()

# cleanup media items from db matched to deleted provider
self.logger.info(
Expand Down Expand Up @@ -962,7 +970,7 @@ async def _setup_database(self) -> None:
"Database migration failed - setup can not continue. "
"Try restarting the server. If this issue persists, create an issue report "
" on Github and/or re-install the server (or restore a backup).",
exc_info=err,
exc_info=err if self.logger.isEnabledFor(logging.DEBUG) else None,
)
# restore backup file
await asyncio.to_thread(shutil.copyfile, db_path_backup, db_path)
Expand All @@ -987,6 +995,7 @@ async def _setup_database(self) -> None:

async def __migrate_database(self, prev_version: int) -> None:
"""Perform a database migration."""
# ruff: noqa: PLR0915
self.logger.info(
"Migrating database from version %s to %s", prev_version, DB_SCHEMA_VERSION
)
Expand Down Expand Up @@ -1075,7 +1084,16 @@ async def __migrate_database(self, prev_version: int) -> None:
item.provider_mappings = {
x for x in item.provider_mappings if x.provider_instance is not None
}
await ctrl.update_item_in_library(item.item_id, item, True)
try:
await ctrl.update_item_in_library(item.item_id, item, True)
except Exception as err:
self.logger.warning(
"Error while migrating %s: %s",
item.item_id,
str(err),
exc_info=err if self.logger.isEnabledFor(logging.DEBUG) else None,
)
await ctrl.remove_item_from_library(item.item_id)

if prev_version <= 5:
# mark all provider mappings as available to recover from the bug
Expand Down Expand Up @@ -1114,7 +1132,16 @@ async def __migrate_database(self, prev_version: int) -> None:
changes = True
if changes:
media_item.metadata.images = images
await ctrl.update_item_in_library(media_item.item_id, media_item, True)
try:
await ctrl.update_item_in_library(media_item.item_id, media_item, True)
except Exception as err:
self.logger.warning(
"Error while migrating %s: %s",
media_item.item_id,
str(err),
exc_info=err if self.logger.isEnabledFor(logging.DEBUG) else None,
)
await ctrl.remove_item_from_library(media_item.item_id)

# save changes
await self.database.commit()
Expand Down
12 changes: 8 additions & 4 deletions music_assistant/server/controllers/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
check_audio_support,
crossfade_pcm_parts,
get_ffmpeg_stream,
get_hls_stream,
get_hls_substream,
get_icy_stream,
get_player_filter_params,
get_silence,
Expand Down Expand Up @@ -751,9 +751,13 @@ async def get_media_stream(
async for chunk in get_silence(2, pcm_format):
yield chunk
elif streamdetails.stream_type == StreamType.HLS:
audio_source = get_hls_stream(
self.mass, streamdetails.path, streamdetails, streamdetails.seek_position
)
# we simply select the best quality substream here
# if we ever want to support adaptive stream selection based on bandwidth
# we need to move the substream selection into the loop below and make it
# bandwidth aware. For now we just assume domestic high bandwidth where
# the user wants the best quality possible at all times.
substream = await get_hls_substream(self.mass, streamdetails.path)
audio_source = substream.path
elif streamdetails.stream_type == StreamType.ENCRYPTED_HTTP:
audio_source = streamdetails.path
extra_input_args += ["-decryption_key", streamdetails.decryption_key]
Expand Down
119 changes: 13 additions & 106 deletions music_assistant/server/helpers/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import os
import re
import struct
import time
from collections import deque
from collections.abc import AsyncGenerator
from contextlib import suppress
Expand Down Expand Up @@ -50,7 +49,6 @@
fetch_playlist,
parse_m3u,
)
from music_assistant.server.helpers.tags import parse_tags

from .process import AsyncProcess, check_output, communicate
from .util import create_tempfile
Expand Down Expand Up @@ -371,7 +369,7 @@ async def get_stream_details(

# work out how to handle radio stream
if (
streamdetails.media_type == MediaType.RADIO
streamdetails.media_type in (MediaType.RADIO, StreamType.ICY, StreamType.HLS)
and streamdetails.stream_type == StreamType.HTTP
):
resolved_url, is_icy, is_hls = await resolve_radio_stream(mass, streamdetails.path)
Expand Down Expand Up @@ -467,7 +465,6 @@ async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, boo
- bool if the URL represents a ICY (radio) stream.
- bool uf the URL represents a HLS stream/playlist.
"""
base_url = url.split("?")[0]
cache_key = f"RADIO_RESOLVED_{url}"
if cache := await mass.cache.get(cache_key):
return cache
Expand All @@ -486,8 +483,12 @@ async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, boo
is_icy = headers.get("icy-metaint") is not None
is_hls = headers.get("content-type") in HLS_CONTENT_TYPES
if (
base_url.endswith((".m3u", ".m3u8", ".pls"))
or headers.get("content-type") == "audio/x-mpegurl"
url.endswith((".m3u", ".m3u8", ".pls"))
or ".m3u?" in url
or ".m3u8?" in url
or ".pls?" in url
or "audio/x-mpegurl" in headers.get("content-type")
or "audio/x-scpls" in headers.get("content-type", "")
):
# url is playlist, we need to unfold it
substreams = await fetch_playlist(mass, url)
Expand All @@ -497,7 +498,7 @@ async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, boo
if not line.is_url:
continue
# unfold first url of playlist
resolved_url, is_icy, is_hls = await resolve_radio_stream(mass, line.path)
return await resolve_radio_stream(mass, line.path)
raise InvalidDataError("No content found in playlist")
except IsHLSPlaylist:
is_hls = True
Expand Down Expand Up @@ -549,105 +550,6 @@ async def get_icy_stream(
streamdetails.stream_title = cleaned_stream_title


async def get_hls_stream(
mass: MusicAssistant,
url: str,
streamdetails: StreamDetails,
seek_position: int = 0,
) -> AsyncGenerator[bytes, None]:
"""Get audio stream from HTTP HLS stream."""
logger = LOGGER.getChild("hls_stream")
logger.debug("Start streaming HLS stream for url %s", url)
timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
prev_chunks: deque[str] = deque(maxlen=50)
has_playlist_metadata: bool | None = None
has_id3_metadata: bool | None = None
is_live_stream = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
# we simply select the best quality substream here
# if we ever want to support adaptive stream selection based on bandwidth
# we need to move the substream selection into the loop below and make it
# bandwidth aware. For now we just assume domestic high bandwidth where
# the user wants the best quality possible at all times.
playlist_item = await get_hls_substream(mass, url)
substream_url = playlist_item.path
seconds_skipped = 0
empty_loops = 0
while True:
logger.log(VERBOSE_LOG_LEVEL, "start streaming chunks from substream %s", substream_url)
async with mass.http_session.get(
substream_url, allow_redirects=True, headers=HTTP_HEADERS, timeout=timeout
) as resp:
resp.raise_for_status()
charset = resp.charset or "utf-8"
substream_m3u_data = await resp.text(charset)
# get chunk-parts from the substream
hls_chunks = parse_m3u(substream_m3u_data)
chunk_seconds = 0
time_start = time.time()
for chunk_item in hls_chunks:
if chunk_item.path in prev_chunks:
continue
chunk_length = int(chunk_item.length) if chunk_item.length else 6
# try to support seeking here
if seek_position and (seconds_skipped + chunk_length) < seek_position:
seconds_skipped += chunk_length
continue
chunk_item_url = chunk_item.path
if not chunk_item_url.startswith("http"):
# path is relative, stitch it together
base_path = substream_url.rsplit("/", 1)[0]
chunk_item_url = base_path + "/" + chunk_item.path
# handle (optional) in-playlist (timed) metadata
if has_playlist_metadata is None:
has_playlist_metadata = chunk_item.title not in (None, "")
logger.debug("Station support for in-playlist metadata: %s", has_playlist_metadata)
if has_playlist_metadata and chunk_item.title != "no desc":
# bbc (and maybe others?) set the title to 'no desc'
cleaned_stream_title = clean_stream_title(chunk_item.title)
if cleaned_stream_title != streamdetails.stream_title:
logger.log(
VERBOSE_LOG_LEVEL, "HLS Radio streamtitle original: %s", chunk_item.title
)
logger.log(
VERBOSE_LOG_LEVEL, "HLS Radio streamtitle cleaned: %s", cleaned_stream_title
)
streamdetails.stream_title = cleaned_stream_title
logger.log(VERBOSE_LOG_LEVEL, "playing chunk %s", chunk_item)
# prevent that we play this chunk again if we loop through
prev_chunks.append(chunk_item.path)
async with mass.http_session.get(
chunk_item_url, headers=HTTP_HEADERS, timeout=timeout
) as resp:
yield await resp.content.read()
chunk_seconds += chunk_length
# handle (optional) in-band (m3u) metadata
if has_id3_metadata is not None and has_playlist_metadata:
continue
if has_id3_metadata in (None, True):
tags = await parse_tags(chunk_item_url)
has_id3_metadata = tags.title and tags.title not in chunk_item.path
logger.debug("Station support for in-band (ID3) metadata: %s", has_id3_metadata)
# end of stream reached - for non livestreams, we are ready and should return
# for livestreams we loop around to get the next playlist with chunks
if not is_live_stream:
return
# safeguard for an endless loop
# this may happen if we're simply going too fast for the live stream
# we already throttle it a bit but we may end up in a situation where something is wrong
# and we want to break out of this loop, hence this check
if chunk_seconds == 0:
empty_loops += 1
await asyncio.sleep(1)
else:
empty_loops = 0
if empty_loops == 50:
logger.warning("breaking out of endless loop")
break
# ensure that we're not going to fast - otherwise we get the same substream playlist
while (time.time() - time_start) < (chunk_seconds - 1):
await asyncio.sleep(0.5)


async def get_hls_substream(
mass: MusicAssistant,
url: str,
Expand All @@ -663,6 +565,11 @@ async def get_hls_substream(
charset = resp.charset or "utf-8"
master_m3u_data = await resp.text(charset)
substreams = parse_m3u(master_m3u_data)
if any(x for x in substreams if x.length):
# this is already a substream!
return PlaylistItem(
path=url,
)
# sort substreams on best quality (highest bandwidth) when available
if any(x for x in substreams if x.stream_info):
substreams.sort(key=lambda x: int(x.stream_info.get("BANDWIDTH", "0")), reverse=True)
Expand Down
5 changes: 4 additions & 1 deletion music_assistant/server/helpers/playlists.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ def parse_m3u(m3u_data: str) -> list[PlaylistItem]:
if len(info) != 2:
continue
length = info[0].strip()[0]
if length == "-1":
length = None
title = info[1].strip()
elif line.startswith("#EXT-X-STREAM-INF:"):
# HLS stream properties
Expand Down Expand Up @@ -129,9 +131,10 @@ def parse_pls(pls_data: str) -> list[PlaylistItem]:
if file_option not in playlist_section:
continue
itempath = playlist_section[file_option]
length = playlist_section.get(f"Length{entry}")
playlist.append(
PlaylistItem(
length=playlist_section.get(f"Length{entry}"),
length=length if length and length != "-1" else None,
title=playlist_section.get(f"Title{entry}"),
path=itempath,
)
Expand Down
5 changes: 4 additions & 1 deletion music_assistant/server/providers/builtin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,10 @@ async def parse_item(
force_radio: bool = False,
) -> Track | Radio:
"""Parse plain URL to MediaItem of type Radio or Track."""
media_info = await self._get_media_info(url, force_refresh)
try:
media_info = await self._get_media_info(url, force_refresh)
except Exception as err:
raise MediaNotFoundError from err
is_radio = media_info.get("icyname") or not media_info.duration
provider_mappings = {
ProviderMapping(
Expand Down
2 changes: 1 addition & 1 deletion music_assistant/server/providers/chromecast/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def on_player_config_changed(
"""Call (by config manager) when the configuration of a player changes."""
super().on_player_config_changed(config, changed_keys)
if "enabled" in changed_keys and config.player_id not in self.castplayers:
self.mass.create_task(self.mass.config.reload_provider, self.instance_id)
self.mass.create_task(self.mass.load_provider, self.instance_id)

async def cmd_stop(self, player_id: str) -> None:
"""Send STOP command to given player."""
Expand Down
4 changes: 2 additions & 2 deletions music_assistant/server/providers/hass/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,6 @@ async def _hass_listener(self) -> None:
await self.hass.start_listening()
except BaseHassClientError as err:
self.logger.warning("Connection to HA lost due to error: %s", err)
self.logger.info("Connection to HA lost. Reloading provider in 5 seconds.")
self.logger.info("Connection to HA lost. Connection will be automatically retried later.")
# schedule a reload of the provider
self.mass.call_later(5, self.mass.config.reload_provider(self.instance_id))
self.mass.call_later(5, self.mass.load_provider(self.instance_id, allow_retry=True))
Loading

0 comments on commit c0a9baf

Please sign in to comment.