diff --git a/music_assistant/common/models/enums.py b/music_assistant/common/models/enums.py index 8374afa9c..78cc15ae0 100644 --- a/music_assistant/common/models/enums.py +++ b/music_assistant/common/models/enums.py @@ -445,3 +445,14 @@ class CacheCategory(IntEnum): PLAYER_QUEUE_STATE = 7 MEDIA_INFO = 8 LIBRARY_ITEMS = 9 + + +class VolumeNormalizationMode(StrEnum): + """Enum with possible VolumeNormalization modes.""" + + DISABLED = "disabled" + DYNAMIC = "dynamic" + MEASUREMENT_ONLY = "measurement_only" + FALLBACK_FIXED_GAIN = "fallback_fixed_gain" + FIXED_GAIN = "fixed_gain" + FALLBACK_DYNAMIC = "fallback_dynamic" diff --git a/music_assistant/common/models/streamdetails.py b/music_assistant/common/models/streamdetails.py index 5dc0c6b69..9740e9ce6 100644 --- a/music_assistant/common/models/streamdetails.py +++ b/music_assistant/common/models/streamdetails.py @@ -7,7 +7,7 @@ from mashumaro import DataClassDictMixin -from music_assistant.common.models.enums import MediaType, StreamType +from music_assistant.common.models.enums import MediaType, StreamType, VolumeNormalizationMode from music_assistant.common.models.media_items import AudioFormat @@ -44,11 +44,10 @@ class StreamDetails(DataClassDictMixin): # the fields below will be set/controlled by the streamcontroller seek_position: int = 0 fade_in: bool = False - enable_volume_normalization: bool = False loudness: float | None = None loudness_album: float | None = None prefer_album_loudness: bool = False - force_dynamic_volume_normalization: bool = False + volume_normalization_mode: VolumeNormalizationMode | None = None queue_id: str | None = None seconds_streamed: float | None = None target_loudness: float | None = None diff --git a/music_assistant/constants.py b/music_assistant/constants.py index ea3c9feb2..820f909aa 100644 --- a/music_assistant/constants.py +++ b/music_assistant/constants.py @@ -71,6 +71,9 @@ CONF_SYNCGROUP_DEFAULT_ON: Final[str] = "syncgroup_default_on" CONF_ENABLE_ICY_METADATA: Final[str] = "enable_icy_metadata" CONF_VOLUME_NORMALIZATION_RADIO: Final[str] = "volume_normalization_radio" +CONF_VOLUME_NORMALIZATION_TRACKS: Final[str] = "volume_normalization_tracks" +CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO: Final[str] = "volume_normalization_fixed_gain_radio" +CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS: Final[str] = "volume_normalization_fixed_gain_tracks" # config default values DEFAULT_HOST: Final[str] = "0.0.0.0" diff --git a/music_assistant/server/controllers/player_queues.py b/music_assistant/server/controllers/player_queues.py index f02c416aa..411f96e4c 100644 --- a/music_assistant/server/controllers/player_queues.py +++ b/music_assistant/server/controllers/player_queues.py @@ -76,6 +76,7 @@ class CompareState(TypedDict): current_index: int | None elapsed_time: int stream_title: str | None + content_type: str | None class PlayerQueuesController(CoreController): @@ -659,7 +660,7 @@ async def next(self, queue_id: str) -> None: while True: try: if (next_index := self._get_next_index(queue_id, idx, True)) is not None: - await self.play_index(queue_id, next_index) + await self.play_index(queue_id, next_index, debounce=True) break except MediaNotFoundError: self.logger.warning( @@ -683,7 +684,7 @@ async def previous(self, queue_id: str) -> None: current_index = self._queues[queue_id].current_index if current_index is None: return - await self.play_index(queue_id, max(current_index - 1, 0)) + await self.play_index(queue_id, max(current_index - 1, 0), debounce=True) @api_command("player_queues/skip") async def 
skip(self, queue_id: str, seconds: int = 10) -> None: @@ -765,6 +766,7 @@ async def play_index( index: int | str, seek_position: int = 0, fade_in: bool = False, + debounce: bool = False, ) -> None: """Play item at index (or item_id) X in queue.""" queue = self._queues[queue_id] @@ -786,6 +788,9 @@ async def play_index( queue.stream_finished = False queue.end_of_track_reached = False + queue.current_item = queue_item + self.signal_update(queue_id) + # work out if we are playing an album and if we should prefer album loudness if ( next_index is not None @@ -821,13 +826,14 @@ async def play_index( # NOTE that we debounce this a bit to account for someone hitting the next button # like a madman. This will prevent the player from being overloaded with requests. self.mass.call_later( - 0.25, + 1 if debounce else 0.1, self.mass.players.play_media, player_id=queue_id, # transform into PlayerMedia to send to the actual player implementation media=self.player_media_from_queue_item(queue_item, queue.flow_mode), task_id=f"play_media_{queue_id}", ) + self.signal_update(queue_id) @api_command("player_queues/transfer") async def transfer_queue( @@ -981,6 +987,9 @@ def on_player_update( stream_title=queue.current_item.streamdetails.stream_title if queue.current_item and queue.current_item.streamdetails else None, + content_type=queue.current_item.streamdetails.audio_format.output_format_str + if queue.current_item and queue.current_item.streamdetails + else None, ) changed_keys = get_changed_keys(prev_state, new_state) # return early if nothing changed @@ -1148,10 +1157,6 @@ def track_loaded_in_buffer(self, queue_id: str, item_id: str) -> None: # enqueue the next track as soon as the player reports # it has started buffering the given queue item task_id = f"enqueue_next_{queue_id}" - self.mass.call_later(0.2, self._enqueue_next, queue, item_id, task_id=task_id) - # we repeat this task once more after 2 seconds to ensure the player - # received the command as it may be missed at the first attempt - # due to a race condition self.mass.call_later(2, self._enqueue_next, queue, item_id, task_id=task_id) # Main queue manipulation methods @@ -1225,7 +1230,7 @@ def signal_update(self, queue_id: str, items_changed: bool = False) -> None: if queue.index_in_buffer is not None: task_id = f"enqueue_next_{queue.queue_id}" self.mass.call_later( - 1, self._enqueue_next, queue, queue.index_in_buffer, task_id=task_id + 5, self._enqueue_next, queue, queue.index_in_buffer, task_id=task_id ) # always send the base event diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index e51d77651..88731633a 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -24,7 +24,13 @@ ConfigValueOption, ConfigValueType, ) -from music_assistant.common.models.enums import ConfigEntryType, ContentType, MediaType, StreamType +from music_assistant.common.models.enums import ( + ConfigEntryType, + ContentType, + MediaType, + StreamType, + VolumeNormalizationMode, +) from music_assistant.common.models.errors import QueueEmpty from music_assistant.common.models.media_items import AudioFormat from music_assistant.common.models.streamdetails import StreamDetails @@ -39,7 +45,10 @@ CONF_PUBLISH_IP, CONF_SAMPLE_RATES, CONF_VOLUME_NORMALIZATION, + CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO, + CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS, CONF_VOLUME_NORMALIZATION_RADIO, + CONF_VOLUME_NORMALIZATION_TRACKS, MASS_LOGO_ONLINE, SILENCE_FILE, 
VERBOSE_LOG_LEVEL, @@ -49,7 +58,6 @@ check_audio_support, crossfade_pcm_parts, get_chunksize, - get_ffmpeg_stream, get_hls_radio_stream, get_hls_substream, get_icy_radio_stream, @@ -58,6 +66,8 @@ get_silence, get_stream_details, ) +from music_assistant.server.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER +from music_assistant.server.helpers.ffmpeg import get_ffmpeg_stream from music_assistant.server.helpers.util import get_ips from music_assistant.server.helpers.webserver import Webserver from music_assistant.server.models.core_controller import CoreController @@ -70,12 +80,13 @@ DEFAULT_STREAM_HEADERS = { + "Server": "Music Assistant", "transferMode.dlna.org": "Streaming", "contentFeatures.dlna.org": "DLNA.ORG_OP=00;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000", # noqa: E501 "Cache-Control": "no-cache,must-revalidate", "Pragma": "no-cache", - "Connection": "close", "Accept-Ranges": "none", + "Connection": "close", } ICY_HEADERS = { "icy-name": "Music Assistant", @@ -146,6 +157,44 @@ async def get_config_entries( "Make sure that this server can be reached " "on the given IP and TCP port by players on the local network.", ), + ConfigEntry( + key=CONF_VOLUME_NORMALIZATION_RADIO, + type=ConfigEntryType.STRING, + default_value=VolumeNormalizationMode.FALLBACK_DYNAMIC, + label="Volume normalization method for radio streams", + options=( + ConfigValueOption(x.value.replace("_", " ").title(), x.value) + for x in VolumeNormalizationMode + ), + category="audio", + ), + ConfigEntry( + key=CONF_VOLUME_NORMALIZATION_TRACKS, + type=ConfigEntryType.STRING, + default_value=VolumeNormalizationMode.FALLBACK_DYNAMIC, + label="Volume normalization method for tracks", + options=( + ConfigValueOption(x.value.replace("_", " ").title(), x.value) + for x in VolumeNormalizationMode + ), + category="audio", + ), + ConfigEntry( + key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO, + type=ConfigEntryType.FLOAT, + range=(-20, 10), + default_value=-6, + label="Fixed/fallback gain adjustment for radio streams", + category="audio", + ), + ConfigEntry( + key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS, + type=ConfigEntryType.FLOAT, + range=(-20, 10), + default_value=-6, + label="Fixed/fallback gain adjustment for tracks", + category="audio", + ), ConfigEntry( key=CONF_PUBLISH_IP, type=ConfigEntryType.STRING, @@ -171,26 +220,6 @@ async def get_config_entries( "not be adjusted in regular setups.", category="advanced", ), - ConfigEntry( - key=CONF_VOLUME_NORMALIZATION_RADIO, - type=ConfigEntryType.STRING, - default_value="standard", - label="Volume normalization method to use for radio streams", - description="Radio streams often have varying loudness levels, especially " - "during announcements and commercials. \n" - "You can choose to enforce dynamic volume normalization to radio streams, " - "even if a (average) loudness measurement for the radio station exists. 
\n\n" - "Options: \n" - "- Disabled - do not apply volume normalization at all \n" - "- Force dynamic - Enforce dynamic volume levelling at all times \n" - "- Standard - use normalization based on previous measurement, ", - options=( - ConfigValueOption("Disabled", "disabled"), - ConfigValueOption("Force dynamic", "dynamic"), - ConfigValueOption("Standard", "standard"), - ), - category="advanced", - ), ) async def setup(self, config: CoreConfig) -> None: @@ -211,8 +240,9 @@ async def setup(self, config: CoreConfig) -> None: version, "with libsoxr support" if libsoxr_support else "", ) - # copy log level to audio module + # copy log level to audio/ffmpeg loggers AUDIO_LOGGER.setLevel(self.logger.level) + FFMPEG_LOGGER.setLevel(self.logger.level) # start the webserver self.publish_port = config.get_value(CONF_BIND_PORT) self.publish_ip = config.get_value(CONF_PUBLISH_IP) @@ -305,9 +335,6 @@ async def serve_queue_item_stream(self, request: web.Request) -> web.Response: headers = { **DEFAULT_STREAM_HEADERS, "Content-Type": f"audio/{output_format.output_format_str}", - "Accept-Ranges": "none", - "Cache-Control": "no-cache", - "Connection": "close", "icy-name": queue_item.name, } resp = web.StreamResponse( @@ -769,23 +796,41 @@ async def get_media_stream( filter_params = [] extra_input_args = [] # handle volume normalization - if streamdetails.enable_volume_normalization and streamdetails.target_loudness is not None: - if streamdetails.force_dynamic_volume_normalization or streamdetails.loudness is None: - # volume normalization with unknown loudness measurement - # use loudnorm filter in dynamic mode - # which also collects the measurement on the fly during playback - # more info: https://k.ylo.ph/2016/04/04/loudnorm.html - filter_rule = ( - f"loudnorm=I={streamdetails.target_loudness}:TP=-2.0:LRA=10.0:offset=0.0" - ) - filter_rule += ":print_format=json" - filter_params.append(filter_rule) - else: - # volume normalization with known loudness measurement - # apply fixed volume/gain correction - gain_correct = streamdetails.target_loudness - streamdetails.loudness - gain_correct = round(gain_correct, 2) - filter_params.append(f"volume={gain_correct}dB") + enable_volume_normalization = ( + streamdetails.target_loudness is not None + and streamdetails.volume_normalization_mode != VolumeNormalizationMode.DISABLED + ) + dynamic_volume_normalization = ( + streamdetails.volume_normalization_mode == VolumeNormalizationMode.DYNAMIC + and enable_volume_normalization + ) + if dynamic_volume_normalization: + # volume normalization using loudnorm filter (in dynamic mode) + # which also collects the measurement on the fly during playback + # more info: https://k.ylo.ph/2016/04/04/loudnorm.html + filter_rule = f"loudnorm=I={streamdetails.target_loudness}:TP=-2.0:LRA=10.0:offset=0.0" + filter_rule += ":print_format=json" + filter_params.append(filter_rule) + elif ( + enable_volume_normalization + and streamdetails.volume_normalization_mode == VolumeNormalizationMode.FIXED_GAIN + ): + # apply used defined fixed volume/gain correction + gain_correct: float = await self.mass.config.get_core_config_value( + CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO + if streamdetails.media_type == MediaType.RADIO + else CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS, + ) + gain_correct = round(gain_correct, 2) + filter_params.append(f"volume={gain_correct}dB") + elif enable_volume_normalization and streamdetails.loudness is not None: + # volume normalization with known loudness measurement + # apply volume/gain correction + 
gain_correct = streamdetails.target_loudness - streamdetails.loudness + gain_correct = round(gain_correct, 2) + filter_params.append(f"volume={gain_correct}dB") + + # work out audio source for these streamdetails if streamdetails.stream_type == StreamType.CUSTOM: audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream( streamdetails, @@ -819,8 +864,9 @@ async def get_media_stream( if streamdetails.media_type == MediaType.RADIO: # pad some silence before the radio stream starts to create some headroom # for radio stations that do not provide any look ahead buffer - # without this, some radio streams jitter a lot - async for chunk in get_silence(2, pcm_format): + # without this, some radio streams jitter a lot, especially with dynamic normalization + pad_seconds = 5 if dynamic_volume_normalization else 2 + async for chunk in get_silence(pad_seconds, pcm_format): yield chunk async for chunk in get_media_stream( diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index 4809b8ee7..84f08adda 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -10,23 +10,18 @@ import time from collections import deque from collections.abc import AsyncGenerator -from contextlib import suppress from io import BytesIO -from signal import SIGINT from typing import TYPE_CHECKING import aiofiles from aiohttp import ClientTimeout -from music_assistant.common.helpers.global_cache import ( - get_global_cache_value, - set_global_cache_values, -) +from music_assistant.common.helpers.global_cache import set_global_cache_values from music_assistant.common.helpers.json import JSON_DECODE_EXCEPTIONS, json_loads from music_assistant.common.helpers.util import clean_stream_title -from music_assistant.common.models.enums import MediaType, StreamType +from music_assistant.common.models.config_entries import CoreConfig, PlayerConfig +from music_assistant.common.models.enums import MediaType, StreamType, VolumeNormalizationMode from music_assistant.common.models.errors import ( - AudioError, InvalidDataError, MediaNotFoundError, MusicAssistantError, @@ -41,20 +36,16 @@ CONF_VOLUME_NORMALIZATION, CONF_VOLUME_NORMALIZATION_RADIO, CONF_VOLUME_NORMALIZATION_TARGET, + CONF_VOLUME_NORMALIZATION_TRACKS, MASS_LOGGER_NAME, VERBOSE_LOG_LEVEL, ) -from music_assistant.server.helpers.playlists import ( - HLS_CONTENT_TYPES, - IsHLSPlaylist, - PlaylistItem, - fetch_playlist, - parse_m3u, -) -from music_assistant.server.helpers.tags import parse_tags -from music_assistant.server.helpers.throttle_retry import BYPASS_THROTTLER +from .ffmpeg import FFMpeg, get_ffmpeg_stream +from .playlists import HLS_CONTENT_TYPES, IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u from .process import AsyncProcess, check_output, communicate +from .tags import parse_tags +from .throttle_retry import BYPASS_THROTTLER from .util import create_tempfile if TYPE_CHECKING: @@ -69,142 +60,6 @@ HTTP_HEADERS_ICY = {**HTTP_HEADERS, "Icy-MetaData": "1"} -class FFMpeg(AsyncProcess): - """FFMpeg wrapped as AsyncProcess.""" - - def __init__( - self, - audio_input: AsyncGenerator[bytes, None] | str | int, - input_format: AudioFormat, - output_format: AudioFormat, - filter_params: list[str] | None = None, - extra_args: list[str] | None = None, - extra_input_args: list[str] | None = None, - audio_output: str | int = "-", - collect_log_history: bool = False, - logger: logging.Logger | None = None, - ) -> None: - """Initialize AsyncProcess.""" - ffmpeg_args = 
get_ffmpeg_args( - input_format=input_format, - output_format=output_format, - filter_params=filter_params or [], - extra_args=extra_args or [], - input_path=audio_input if isinstance(audio_input, str) else "-", - output_path=audio_output if isinstance(audio_output, str) else "-", - extra_input_args=extra_input_args or [], - loglevel="info", - ) - self.audio_input = audio_input - self.input_format = input_format - self.collect_log_history = collect_log_history - self.log_history: deque[str] = deque(maxlen=100) - self._stdin_task: asyncio.Task | None = None - self._logger_task: asyncio.Task | None = None - super().__init__( - ffmpeg_args, - stdin=True if isinstance(audio_input, str | AsyncGenerator) else audio_input, - stdout=True if isinstance(audio_output, str) else audio_output, - stderr=True, - ) - self.logger = logger or LOGGER.getChild("ffmpeg") - clean_args = [] - for arg in ffmpeg_args[1:]: - if arg.startswith("http"): - clean_args.append("") - elif "/" in arg and "." in arg: - clean_args.append("") - else: - clean_args.append(arg) - args_str = " ".join(clean_args) - self.logger.log(VERBOSE_LOG_LEVEL, "starting ffmpeg with args: %s", args_str) - - async def start(self) -> None: - """Perform Async init of process.""" - await super().start() - self._logger_task = asyncio.create_task(self._log_reader_task()) - if isinstance(self.audio_input, AsyncGenerator): - self._stdin_task = asyncio.create_task(self._feed_stdin()) - - async def close(self, send_signal: bool = True) -> None: - """Close/terminate the process and wait for exit.""" - if self._stdin_task and not self._stdin_task.done(): - self._stdin_task.cancel() - if not self.collect_log_history: - await super().close(send_signal) - return - # override close logic to make sure we catch all logging - self._close_called = True - if send_signal and self.returncode is None: - self.proc.send_signal(SIGINT) - if self.proc.stdin and not self.proc.stdin.is_closing(): - self.proc.stdin.close() - await asyncio.sleep(0) # yield to loop - # abort existing readers on stdout first before we send communicate - waiter: asyncio.Future - if self.proc.stdout and (waiter := self.proc.stdout._waiter): - self.proc.stdout._waiter = None - if waiter and not waiter.done(): - waiter.set_exception(asyncio.CancelledError()) - # read remaining bytes to unblock pipe - await self.read(-1) - # wait for log task to complete that reads the remaining data from stderr - with suppress(TimeoutError): - await asyncio.wait_for(self._logger_task, 5) - await super().close(False) - - async def _log_reader_task(self) -> None: - """Read ffmpeg log from stderr.""" - decode_errors = 0 - async for line in self.iter_stderr(): - if self.collect_log_history: - self.log_history.append(line) - if "error" in line or "warning" in line: - self.logger.debug(line) - elif "critical" in line: - self.logger.warning(line) - else: - self.logger.log(VERBOSE_LOG_LEVEL, line) - - if "Invalid data found when processing input" in line: - decode_errors += 1 - if decode_errors >= 50: - self.logger.error(line) - await super().close(True) - - # if streamdetails contenttype is unknown, try parse it from the ffmpeg log - if line.startswith("Stream #") and ": Audio: " in line: - if self.input_format.content_type == ContentType.UNKNOWN: - content_type_raw = line.split(": Audio: ")[1].split(" ")[0] - content_type = ContentType.try_parse(content_type_raw) - self.logger.debug( - "Detected (input) content type: %s (%s)", content_type, content_type_raw - ) - self.input_format.content_type = content_type - del line 
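The content-type sniffing in _log_reader_task above (removed here and re-added in the new helpers/ffmpeg.py module further down) keys off ffmpeg's stream description lines on stderr. A minimal, self-contained sketch of that parsing step; the sample log line and the simplified codec extraction are illustrative only (the real code feeds the token into ContentType.try_parse):

def detect_input_codec(line: str) -> str | None:
    # Mirrors the split logic used in _log_reader_task: take the token that
    # follows ": Audio: " on a "Stream #..." line reported by ffmpeg.
    line = line.strip()
    if not (line.startswith("Stream #") and ": Audio: " in line):
        return None
    return line.split(": Audio: ")[1].split(" ")[0].rstrip(",")

# hypothetical stderr line as ffmpeg typically prints it for a FLAC input
sample_line = "Stream #0:0: Audio: flac, 44100 Hz, stereo, s32 (24 bit)"
assert detect_input_codec(sample_line) == "flac"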
- - async def _feed_stdin(self) -> None: - """Feed stdin with audio chunks from an AsyncGenerator.""" - if TYPE_CHECKING: - self.audio_input: AsyncGenerator[bytes, None] - try: - async for chunk in self.audio_input: - await self.write(chunk) - # write EOF once we've reached the end of the input stream - await self.write_eof() - except Exception as err: - if isinstance(err, asyncio.CancelledError): - return - # make sure we dont swallow any exceptions and we bail out - # once our audio source fails. - self.logger.error( - "Stream error: %s", - str(err), - exc_info=err if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else None, - ) - await self.write_eof() - - async def crossfade_pcm_parts( fade_in_part: bytes, fade_out_part: bytes, @@ -389,6 +244,7 @@ async def get_stream_details( streamdetails.fade_in = fade_in if not streamdetails.duration: streamdetails.duration = queue_item.duration + # handle volume normalization details if result := await mass.music.get_loudness( streamdetails.item_id, @@ -398,16 +254,11 @@ async def get_stream_details( streamdetails.loudness, streamdetails.loudness_album = result streamdetails.prefer_album_loudness = prefer_album_loudness player_settings = await mass.config.get_player_config(streamdetails.queue_id) - streamdetails.enable_volume_normalization = player_settings.get_value(CONF_VOLUME_NORMALIZATION) - streamdetails.target_loudness = player_settings.get_value(CONF_VOLUME_NORMALIZATION_TARGET) - - radio_norm_pref = await mass.config.get_core_config_value( - "streams", CONF_VOLUME_NORMALIZATION_RADIO + core_config = await mass.config.get_core_config("streams") + streamdetails.volume_normalization_mode = _get_normalization_mode( + core_config, player_settings, streamdetails ) - if streamdetails.media_type == MediaType.RADIO and radio_norm_pref == "disabled": - streamdetails.enable_volume_normalization = False - elif streamdetails.media_type == MediaType.RADIO and radio_norm_pref == "dynamic": - streamdetails.force_dynamic_volume_normalization = True + streamdetails.target_loudness = player_settings.get_value(CONF_VOLUME_NORMALIZATION_TARGET) process_time = int((time.time() - time_start) * 1000) LOGGER.debug("retrieved streamdetails for %s in %s milliseconds", queue_item.uri, process_time) @@ -434,90 +285,85 @@ async def get_media_stream( chunk_number = 0 buffer: bytes = b"" finished = False + + ffmpeg_proc = FFMpeg( + audio_input=audio_source, + input_format=streamdetails.audio_format, + output_format=pcm_format, + filter_params=filter_params, + extra_input_args=extra_input_args, + collect_log_history=True, + ) try: - async with FFMpeg( - audio_input=audio_source, - input_format=streamdetails.audio_format, - output_format=pcm_format, - filter_params=filter_params, - extra_input_args=extra_input_args, - collect_log_history=True, - logger=logger, - ) as ffmpeg_proc: - async for chunk in ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size): - # for radio streams we just yield all chunks directly - if streamdetails.media_type == MediaType.RADIO: - yield chunk - bytes_sent += len(chunk) - continue + await ffmpeg_proc.start() + async for chunk in ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size): + # for radio streams we just yield all chunks directly + if streamdetails.media_type == MediaType.RADIO: + yield chunk + bytes_sent += len(chunk) + continue - chunk_number += 1 - # determine buffer size dynamically - if chunk_number < 5 and strip_silence_begin: - req_buffer_size = int(pcm_format.pcm_sample_size * 4) - elif chunk_number > 30 and strip_silence_end: - 
req_buffer_size = int(pcm_format.pcm_sample_size * 8) - else: - req_buffer_size = int(pcm_format.pcm_sample_size * 2) - - # always append to buffer - buffer += chunk - del chunk - - if len(buffer) < req_buffer_size: - # buffer is not full enough, move on - continue + chunk_number += 1 + # determine buffer size dynamically + if chunk_number < 5 and strip_silence_begin: + req_buffer_size = int(pcm_format.pcm_sample_size * 4) + elif chunk_number > 30 and strip_silence_end: + req_buffer_size = int(pcm_format.pcm_sample_size * 8) + else: + req_buffer_size = int(pcm_format.pcm_sample_size * 2) - if chunk_number == 5 and strip_silence_begin: - # strip silence from begin of audio - chunk = await strip_silence( # noqa: PLW2901 - mass, buffer, pcm_format=pcm_format - ) - bytes_sent += len(chunk) - yield chunk - buffer = b"" - continue + # always append to buffer + buffer += chunk + del chunk + + if len(buffer) < req_buffer_size: + # buffer is not full enough, move on + continue - #### OTHER: enough data in buffer, feed to output - while len(buffer) > req_buffer_size: - yield buffer[: pcm_format.pcm_sample_size] - bytes_sent += pcm_format.pcm_sample_size - buffer = buffer[pcm_format.pcm_sample_size :] - - # end of audio/track reached - if strip_silence_end and buffer: - # strip silence from end of audio - buffer = await strip_silence( - mass, - buffer, - pcm_format=pcm_format, - reverse=True, + if chunk_number == 5 and strip_silence_begin: + # strip silence from begin of audio + chunk = await strip_silence( # noqa: PLW2901 + mass, buffer, pcm_format=pcm_format ) - # send remaining bytes in buffer - bytes_sent += len(buffer) - yield buffer - del buffer - - if bytes_sent == 0: - # edge case: no audio data was sent - streamdetails.stream_error = True - finished = False - logger.warning("Stream error on %s", streamdetails.uri) - # we send a bit of silence so players get at least some data - # without it, some players refuse to skip to the next track - async for chunk in get_silence(6, pcm_format): - yield chunk - bytes_sent += len(chunk) - else: - finished = True + bytes_sent += len(chunk) + yield chunk + buffer = b"" + continue + + #### OTHER: enough data in buffer, feed to output + while len(buffer) > req_buffer_size: + yield buffer[: pcm_format.pcm_sample_size] + bytes_sent += pcm_format.pcm_sample_size + buffer = buffer[pcm_format.pcm_sample_size :] + + # end of audio/track reached + if strip_silence_end and buffer: + # strip silence from end of audio + buffer = await strip_silence( + mass, + buffer, + pcm_format=pcm_format, + reverse=True, + ) + # send remaining bytes in buffer + bytes_sent += len(buffer) + yield buffer + del buffer + + if bytes_sent == 0: + # edge case: no audio data was sent + streamdetails.stream_error = True + finished = False + logger.warning("Stream error on %s", streamdetails.uri) + # we send a bit of silence so players get at least some data + # without it, some players refuse to skip to the next track + async for chunk in get_silence(6, pcm_format): + yield chunk + bytes_sent += len(chunk) + else: + finished = True finally: - if "ffmpeg_proc" not in locals(): - # edge case: ffmpeg process was not yet started - return # noqa: B012 - if finished and not ffmpeg_proc.closed: - await asyncio.wait_for(ffmpeg_proc.wait(), 60) - elif not ffmpeg_proc.closed: - await ffmpeg_proc.close() + await ffmpeg_proc.close() # try to determine how many seconds we've streamed seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0 @@ -535,10 +381,15 @@ async def 
get_media_stream( streamdetails.duration = seconds_streamed # parse loudnorm data if we have that collected - required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120 - if streamdetails.loudness is None and (finished or (seconds_streamed >= required_seconds)): - loudness_details = parse_loudnorm(" ".join(ffmpeg_proc.log_history)) - if loudness_details is not None: + if ( + streamdetails.loudness is None + and streamdetails.volume_normalization_mode != VolumeNormalizationMode.DISABLED + and (finished or (seconds_streamed >= 60)) + ): + # if dynamic volume normalization is enabled and the entire track is streamed, + # the loudnorm filter will output the measurement in the log, + # so we can use that directly instead of analyzing the audio + if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)): logger.debug( "Loudness measurement for %s: %s dB", streamdetails.uri, @@ -553,6 +404,12 @@ async def get_media_stream( media_type=streamdetails.media_type, ) ) + else: + # no data from loudnorm filter found, we need to analyze the audio + # add background task to start analyzing the audio + task_id = f"analyze_loudness_{streamdetails.uri}" + mass.create_task(analyze_loudness, mass, streamdetails, task_id=task_id) + # report playback if finished or seconds_streamed > 30: mass.create_task( @@ -950,41 +807,6 @@ async def get_file_stream( yield data -async def get_ffmpeg_stream( - audio_input: AsyncGenerator[bytes, None] | str, - input_format: AudioFormat, - output_format: AudioFormat, - filter_params: list[str] | None = None, - extra_args: list[str] | None = None, - chunk_size: int | None = None, - extra_input_args: list[str] | None = None, - logger: logging.Logger | None = None, -) -> AsyncGenerator[bytes, None]: - """ - Get the ffmpeg audio stream as async generator. - - Takes care of resampling and/or recoding if needed, - according to player preferences. 
- """ - async with FFMpeg( - audio_input=audio_input, - input_format=input_format, - output_format=output_format, - filter_params=filter_params, - extra_args=extra_args, - extra_input_args=extra_input_args, - logger=logger, - ) as ffmpeg_proc: - # read final chunks from stdout - iterator = ( - ffmpeg_proc.iter_chunked(chunk_size) - if chunk_size - else ffmpeg_proc.iter_any(get_chunksize(output_format)) - ) - async for chunk in iterator: - yield chunk - - async def check_audio_support() -> tuple[bool, bool, str]: """Check if ffmpeg is present (with/without libsoxr support).""" # check for FFmpeg presence @@ -1072,7 +894,7 @@ def get_chunksize( if fmt.content_type in (ContentType.WAV, ContentType.AIFF, ContentType.DSF): return pcm_size if fmt.content_type in (ContentType.FLAC, ContentType.WAVPACK, ContentType.ALAC): - return int(pcm_size * 0.5) + return int(pcm_size * 0.8) if fmt.content_type in (ContentType.MP3, ContentType.OGG): return int((320000 / 8) * seconds) if fmt.content_type in (ContentType.AAC, ContentType.M4A): @@ -1112,142 +934,6 @@ def get_player_filter_params( return filter_params -def get_ffmpeg_args( - input_format: AudioFormat, - output_format: AudioFormat, - filter_params: list[str], - extra_args: list[str] | None = None, - input_path: str = "-", - output_path: str = "-", - extra_input_args: list[str] | None = None, - loglevel: str = "error", -) -> list[str]: - """Collect all args to send to the ffmpeg process.""" - if extra_args is None: - extra_args = [] - ffmpeg_present, libsoxr_support, version = get_global_cache_value("ffmpeg_support") - if not ffmpeg_present: - msg = ( - "FFmpeg binary is missing from system." - "Please install ffmpeg on your OS to enable playback." - ) - raise AudioError( - msg, - ) - - major_version = int("".join(char for char in version.split(".")[0] if not char.isalpha())) - - # generic args - generic_args = [ - "ffmpeg", - "-hide_banner", - "-loglevel", - loglevel, - "-nostats", - "-ignore_unknown", - "-protocol_whitelist", - "file,hls,http,https,tcp,tls,crypto,pipe,data,fd,rtp,udp", - ] - # collect input args - input_args = [] - if extra_input_args: - input_args += extra_input_args - if input_path.startswith("http"): - # append reconnect options for direct stream from http - input_args += [ - "-reconnect", - "1", - "-reconnect_streamed", - "1", - ] - if major_version > 4: - # these options are only supported in ffmpeg > 5 - input_args += [ - "-reconnect_on_network_error", - "1", - "-reconnect_on_http_error", - "5xx,4xx", - ] - if input_format.content_type.is_pcm(): - input_args += [ - "-ac", - str(input_format.channels), - "-channel_layout", - "mono" if input_format.channels == 1 else "stereo", - "-ar", - str(input_format.sample_rate), - "-acodec", - input_format.content_type.name.lower(), - "-f", - input_format.content_type.value, - "-i", - input_path, - ] - else: - # let ffmpeg auto detect the content type from the metadata/headers - input_args += ["-i", input_path] - - # collect output args - output_args = [] - if output_path.upper() == "NULL": - # devnull stream - output_args = ["-f", "null", "-"] - elif output_format.content_type == ContentType.UNKNOWN: - raise RuntimeError("Invalid output format specified") - elif output_format.content_type == ContentType.AAC: - output_args = ["-f", "adts", "-c:a", "aac", "-b:a", "256k", output_path] - elif output_format.content_type == ContentType.MP3: - output_args = ["-f", "mp3", "-b:a", "320k", output_path] - else: - if output_format.content_type.is_pcm(): - output_args += ["-acodec", 
output_format.content_type.name.lower()] - # use explicit format identifier for all other - output_args += [ - "-f", - output_format.content_type.value, - "-ar", - str(output_format.sample_rate), - "-ac", - str(output_format.channels), - ] - if output_format.output_format_str == "flac": - output_args += ["-compression_level", "6"] - output_args += [output_path] - - # edge case: source file is not stereo - downmix to stereo - if input_format.channels > 2 and output_format.channels == 2: - filter_params = [ - "pan=stereo|FL=1.0*FL+0.707*FC+0.707*SL+0.707*LFE|FR=1.0*FR+0.707*FC+0.707*SR+0.707*LFE", - *filter_params, - ] - - # determine if we need to do resampling - if ( - input_format.sample_rate != output_format.sample_rate - or input_format.bit_depth > output_format.bit_depth - ): - # prefer resampling with libsoxr due to its high quality - if libsoxr_support: - resample_filter = "aresample=resampler=soxr:precision=30" - else: - resample_filter = "aresample=resampler=swr" - - # sample rate conversion - if input_format.sample_rate != output_format.sample_rate: - resample_filter += f":osr={output_format.sample_rate}" - - # bit depth conversion: apply dithering when going down to 16 bits - if output_format.bit_depth == 16 and input_format.bit_depth > 16: - resample_filter += ":osf=s16:dither_method=triangular_hp" - - filter_params.append(resample_filter) - - if filter_params and "-filter_complex" not in extra_args: - extra_args += ["-af", ",".join(filter_params)] - - return generic_args + input_args + extra_args + output_args - - def parse_loudnorm(raw_stderr: bytes | str) -> float | None: """Parse Loudness measurement from ffmpeg stderr output.""" stderr_data = raw_stderr.decode() if isinstance(raw_stderr, bytes) else raw_stderr @@ -1261,3 +947,108 @@ def parse_loudnorm(raw_stderr: bytes | str) -> float | None: except JSON_DECODE_EXCEPTIONS: return None return float(loudness_data["input_i"]) + + +async def analyze_loudness( + mass: MusicAssistant, + streamdetails: StreamDetails, +) -> None: + """Analyze media item's audio, to calculate EBU R128 loudness.""" + if result := await mass.music.get_loudness( + streamdetails.item_id, + streamdetails.provider, + media_type=streamdetails.media_type, + ): + # loudness already known, no need to analyze it again + streamdetails.loudness = result + return + + logger = LOGGER.getChild("analyze_loudness") + logger.debug("Start analyzing audio for %s", streamdetails.uri) + + extra_input_args = [ + # limit to 10 minutes to avoid reading too much into memory + "-t", + "600", + ] + if streamdetails.stream_type == StreamType.CUSTOM: + audio_source = mass.get_provider(streamdetails.provider).get_audio_stream( + streamdetails, + ) + elif streamdetails.stream_type == StreamType.HLS: + substream = await get_hls_substream(mass, streamdetails.path) + audio_source = substream.path + elif streamdetails.stream_type == StreamType.ENCRYPTED_HTTP: + audio_source = streamdetails.path + extra_input_args += ["-decryption_key", streamdetails.decryption_key] + else: + audio_source = streamdetails.path + + # calculate BS.1770 R128 integrated loudness with ffmpeg + async with FFMpeg( + audio_input=audio_source, + input_format=streamdetails.audio_format, + output_format=streamdetails.audio_format, + audio_output="NULL", + filter_params=["ebur128=framelog=verbose"], + extra_input_args=extra_input_args, + collect_log_history=True, + ) as ffmpeg_proc: + await ffmpeg_proc.wait() + log_lines = ffmpeg_proc.log_history + log_lines_str = "\n".join(log_lines) + try: + loudness_str = ( + 
log_lines_str.split("Integrated loudness")[1].split("I:")[1].split("LUFS")[0] + ) + loudness = float(loudness_str.strip()) + except (IndexError, ValueError, AttributeError): + LOGGER.warning( + "Could not determine integrated loudness of %s - %s", + streamdetails.uri, + log_lines_str or "received empty value", + ) + else: + streamdetails.loudness = loudness + await mass.music.set_loudness( + streamdetails.item_id, + streamdetails.provider, + loudness, + media_type=streamdetails.media_type, + ) + logger.debug( + "Integrated loudness of %s is: %s", + streamdetails.uri, + loudness, + ) + + +def _get_normalization_mode( + core_config: CoreConfig, player_config: PlayerConfig, streamdetails: StreamDetails +) -> VolumeNormalizationMode: + if not player_config.get_value(CONF_VOLUME_NORMALIZATION): + # disabled for this player + return VolumeNormalizationMode.DISABLED + # work out preference for track or radio + preference = VolumeNormalizationMode( + core_config.get_value( + CONF_VOLUME_NORMALIZATION_RADIO + if streamdetails.media_type == MediaType.RADIO + else CONF_VOLUME_NORMALIZATION_TRACKS, + ) + ) + + # handle no measurement available but fallback to dynamic mode is allowed + if streamdetails.loudness is None and preference == VolumeNormalizationMode.FALLBACK_DYNAMIC: + return VolumeNormalizationMode.DYNAMIC + + # handle no measurement available and no fallback allowed + if streamdetails.loudness is None and preference == VolumeNormalizationMode.MEASUREMENT_ONLY: + return VolumeNormalizationMode.DISABLED + + # handle no measurement available and fallback to fixed gain is allowed + if streamdetails.loudness is None and preference == VolumeNormalizationMode.FALLBACK_FIXED_GAIN: + return VolumeNormalizationMode.FIXED_GAIN + + # simply return the preference + return preference diff --git a/music_assistant/server/helpers/ffmpeg.py b/music_assistant/server/helpers/ffmpeg.py new file mode 100644 index 000000000..490beafe5 --- /dev/null +++ b/music_assistant/server/helpers/ffmpeg.py @@ -0,0 +1,305 @@ +"""FFMpeg related helpers.""" + +from __future__ import annotations + +import asyncio +import logging +from collections import deque +from collections.abc import AsyncGenerator +from typing import TYPE_CHECKING + +from music_assistant.common.helpers.global_cache import get_global_cache_value +from music_assistant.common.models.errors import AudioError +from music_assistant.common.models.media_items import AudioFormat, ContentType +from music_assistant.constants import VERBOSE_LOG_LEVEL + +from .process import AsyncProcess +from .util import close_async_generator + +LOGGER = logging.getLogger("ffmpeg") + + +class FFMpeg(AsyncProcess): + """FFMpeg wrapped as AsyncProcess.""" + + def __init__( + self, + audio_input: AsyncGenerator[bytes, None] | str | int, + input_format: AudioFormat, + output_format: AudioFormat, + filter_params: list[str] | None = None, + extra_args: list[str] | None = None, + extra_input_args: list[str] | None = None, + audio_output: str | int = "-", + collect_log_history: bool = False, + ) -> None: + """Initialize AsyncProcess.""" + ffmpeg_args = get_ffmpeg_args( + input_format=input_format, + output_format=output_format, + filter_params=filter_params or [], + extra_args=extra_args or [], + input_path=audio_input if isinstance(audio_input, str) else "-", + output_path=audio_output if isinstance(audio_output, str) else "-", + extra_input_args=extra_input_args or [], + loglevel="info", + ) + self.audio_input = audio_input + self.input_format = input_format + self.collect_log_history = 
collect_log_history + self.log_history: deque[str] = deque(maxlen=100) + self._stdin_task: asyncio.Task | None = None + self._logger_task: asyncio.Task | None = None + super().__init__( + ffmpeg_args, + stdin=True if isinstance(audio_input, str | AsyncGenerator) else audio_input, + stdout=True if isinstance(audio_output, str) else audio_output, + stderr=True, + ) + self.logger = LOGGER + + async def start(self) -> None: + """Perform Async init of process.""" + await super().start() + if self.proc: + self.logger = LOGGER.getChild(str(self.proc.pid)) + clean_args = [] + for arg in self._args[1:]: + if arg.startswith("http"): + clean_args.append("") + elif "/" in arg and "." in arg: + clean_args.append("") + else: + clean_args.append(arg) + args_str = " ".join(clean_args) + self.logger.log(VERBOSE_LOG_LEVEL, "started with args: %s", args_str) + self._logger_task = asyncio.create_task(self._log_reader_task()) + if isinstance(self.audio_input, AsyncGenerator): + self._stdin_task = asyncio.create_task(self._feed_stdin()) + + async def close(self, send_signal: bool = True) -> None: + """Close/terminate the process and wait for exit.""" + if self.closed: + return + if self._stdin_task and not self._stdin_task.done(): + self._stdin_task.cancel() + await super().close(send_signal) + + async def _log_reader_task(self) -> None: + """Read ffmpeg log from stderr.""" + decode_errors = 0 + async for line in self.iter_stderr(): + if self.collect_log_history: + self.log_history.append(line) + if "error" in line or "warning" in line: + self.logger.debug(line) + elif "critical" in line: + self.logger.warning(line) + else: + self.logger.log(VERBOSE_LOG_LEVEL, line) + + if "Invalid data found when processing input" in line: + decode_errors += 1 + if decode_errors >= 50: + self.logger.error(line) + await super().close(True) + + # if streamdetails contenttype is unknown, try parse it from the ffmpeg log + if line.startswith("Stream #") and ": Audio: " in line: + if self.input_format.content_type == ContentType.UNKNOWN: + content_type_raw = line.split(": Audio: ")[1].split(" ")[0] + content_type = ContentType.try_parse(content_type_raw) + self.logger.debug( + "Detected (input) content type: %s (%s)", content_type, content_type_raw + ) + self.input_format.content_type = content_type + del line + + async def _feed_stdin(self) -> None: + """Feed stdin with audio chunks from an AsyncGenerator.""" + if TYPE_CHECKING: + self.audio_input: AsyncGenerator[bytes, None] + generator_exhausted = False + try: + async for chunk in self.audio_input: + await self.write(chunk) + generator_exhausted = True + except Exception as err: + if isinstance(err, asyncio.CancelledError): + return + self.logger.error( + "Stream error: %s", + str(err), + exc_info=err if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else None, + ) + finally: + await self.write_eof() + # we need to ensure that we close the async generator + # if we get cancelled otherwise it keeps lingering forever + if not generator_exhausted: + await close_async_generator(self.audio_input) + + +async def get_ffmpeg_stream( + audio_input: AsyncGenerator[bytes, None] | str, + input_format: AudioFormat, + output_format: AudioFormat, + filter_params: list[str] | None = None, + extra_args: list[str] | None = None, + chunk_size: int | None = None, + extra_input_args: list[str] | None = None, +) -> AsyncGenerator[bytes, None]: + """ + Get the ffmpeg audio stream as async generator. + + Takes care of resampling and/or recoding if needed, + according to player preferences. 
+ """ + async with FFMpeg( + audio_input=audio_input, + input_format=input_format, + output_format=output_format, + filter_params=filter_params, + extra_args=extra_args, + extra_input_args=extra_input_args, + ) as ffmpeg_proc: + # read final chunks from stdout + iterator = ffmpeg_proc.iter_chunked(chunk_size) if chunk_size else ffmpeg_proc.iter_any() + async for chunk in iterator: + yield chunk + + +def get_ffmpeg_args( + input_format: AudioFormat, + output_format: AudioFormat, + filter_params: list[str], + extra_args: list[str] | None = None, + input_path: str = "-", + output_path: str = "-", + extra_input_args: list[str] | None = None, + loglevel: str = "error", +) -> list[str]: + """Collect all args to send to the ffmpeg process.""" + if extra_args is None: + extra_args = [] + ffmpeg_present, libsoxr_support, version = get_global_cache_value("ffmpeg_support") + if not ffmpeg_present: + msg = ( + "FFmpeg binary is missing from system." + "Please install ffmpeg on your OS to enable playback." + ) + raise AudioError( + msg, + ) + + major_version = int("".join(char for char in version.split(".")[0] if not char.isalpha())) + + # generic args + generic_args = [ + "ffmpeg", + "-hide_banner", + "-loglevel", + loglevel, + "-nostats", + "-ignore_unknown", + "-protocol_whitelist", + "file,hls,http,https,tcp,tls,crypto,pipe,data,fd,rtp,udp", + ] + # collect input args + input_args = [] + if extra_input_args: + input_args += extra_input_args + if input_path.startswith("http"): + # append reconnect options for direct stream from http + input_args += [ + "-reconnect", + "1", + "-reconnect_streamed", + "1", + ] + if major_version > 4: + # these options are only supported in ffmpeg > 5 + input_args += [ + "-reconnect_on_network_error", + "1", + "-reconnect_on_http_error", + "5xx,4xx", + ] + if input_format.content_type.is_pcm(): + input_args += [ + "-ac", + str(input_format.channels), + "-channel_layout", + "mono" if input_format.channels == 1 else "stereo", + "-ar", + str(input_format.sample_rate), + "-acodec", + input_format.content_type.name.lower(), + "-f", + input_format.content_type.value, + "-i", + input_path, + ] + else: + # let ffmpeg auto detect the content type from the metadata/headers + input_args += ["-i", input_path] + + # collect output args + output_args = [] + if output_path.upper() == "NULL": + # devnull stream + output_args = ["-f", "null", "-"] + elif output_format.content_type == ContentType.UNKNOWN: + raise RuntimeError("Invalid output format specified") + elif output_format.content_type == ContentType.AAC: + output_args = ["-f", "adts", "-c:a", "aac", "-b:a", "256k", output_path] + elif output_format.content_type == ContentType.MP3: + output_args = ["-f", "mp3", "-b:a", "320k", output_path] + else: + if output_format.content_type.is_pcm(): + output_args += ["-acodec", output_format.content_type.name.lower()] + # use explicit format identifier for all other + output_args += [ + "-f", + output_format.content_type.value, + "-ar", + str(output_format.sample_rate), + "-ac", + str(output_format.channels), + ] + if output_format.output_format_str == "flac": + output_args += ["-compression_level", "6"] + output_args += [output_path] + + # edge case: source file is not stereo - downmix to stereo + if input_format.channels > 2 and output_format.channels == 2: + filter_params = [ + "pan=stereo|FL=1.0*FL+0.707*FC+0.707*SL+0.707*LFE|FR=1.0*FR+0.707*FC+0.707*SR+0.707*LFE", + *filter_params, + ] + + # determine if we need to do resampling + if ( + input_format.sample_rate != 
output_format.sample_rate + or input_format.bit_depth > output_format.bit_depth + ): + # prefer resampling with libsoxr due to its high quality + if libsoxr_support: + resample_filter = "aresample=resampler=soxr:precision=30" + else: + resample_filter = "aresample=resampler=swr" + + # sample rate conversion + if input_format.sample_rate != output_format.sample_rate: + resample_filter += f":osr={output_format.sample_rate}" + + # bit depth conversion: apply dithering when going down to 16 bits + if output_format.bit_depth == 16 and input_format.bit_depth > 16: + resample_filter += ":osf=s16:dither_method=triangular_hp" + + filter_params.append(resample_filter) + + if filter_params and "-filter_complex" not in extra_args: + extra_args += ["-af", ",".join(filter_params)] + + return generic_args + input_args + extra_args + output_args diff --git a/music_assistant/server/helpers/util.py b/music_assistant/server/helpers/util.py index 41e055744..82c2f3dba 100644 --- a/music_assistant/server/helpers/util.py +++ b/music_assistant/server/helpers/util.py @@ -11,7 +11,8 @@ import urllib.error import urllib.parse import urllib.request -from collections.abc import Awaitable, Callable, Coroutine +from collections.abc import AsyncGenerator, Awaitable, Callable, Coroutine +from contextlib import suppress from functools import lru_cache from importlib.metadata import PackageNotFoundError from importlib.metadata import version as pkg_version @@ -157,6 +158,15 @@ def get_port_from_zeroconf(discovery_info: AsyncServiceInfo) -> str | None: return discovery_info.port +async def close_async_generator(agen: AsyncGenerator[Any, None]) -> None: + """Force close an async generator.""" + task = asyncio.create_task(agen.__anext__()) + task.cancel() + with suppress(asyncio.CancelledError): + await task + await agen.aclose() + + class TaskManager: """ Helper class to run many tasks at once. 
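The close_async_generator helper added above is what FFMpeg._feed_stdin now calls when it gets cancelled before the audio source is exhausted. A small, self-contained usage sketch, assuming a made-up endless_chunks() source; it shows why a suspended generator must be closed explicitly so its cleanup (finally) code runs instead of lingering forever:

import asyncio
from collections.abc import AsyncGenerator
from contextlib import suppress

async def close_async_generator(agen: AsyncGenerator) -> None:
    # same approach as the helper in the diff: abort a pending __anext__,
    # then ask the generator to close so its finally-blocks run
    task = asyncio.create_task(agen.__anext__())
    task.cancel()
    with suppress(asyncio.CancelledError):
        await task
    await agen.aclose()

async def endless_chunks() -> AsyncGenerator[bytes, None]:
    # hypothetical audio source that would otherwise never be cleaned up
    try:
        while True:
            await asyncio.sleep(0.1)
            yield b"\x00" * 4096
    finally:
        print("audio source cleaned up")

async def main() -> None:
    gen = endless_chunks()
    await anext(gen)  # consume one chunk; generator is now suspended at its yield
    await close_async_generator(gen)  # prints "audio source cleaned up"

asyncio.run(main())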
diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index 7d35ff48a..d4add27ba 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -272,7 +272,6 @@ async def start(self, start_ntp: int, wait_start: int = 1000) -> None: output_format=AIRPLAY_PCM_FORMAT, filter_params=get_player_filter_params(self.mass, player_id), audio_output=write, - logger=self.airplay_player.logger.getChild("ffmpeg"), ) await self._ffmpeg_proc.start() await asyncio.to_thread(os.close, write) diff --git a/music_assistant/server/providers/bluesound/__init__.py b/music_assistant/server/providers/bluesound/__init__.py index bbfefd0f1..06e974594 100644 --- a/music_assistant/server/providers/bluesound/__init__.py +++ b/music_assistant/server/providers/bluesound/__init__.py @@ -12,10 +12,9 @@ from music_assistant.common.models.config_entries import ( CONF_ENTRY_CROSSFADE, - CONF_ENTRY_CROSSFADE_FLOW_MODE_REQUIRED, CONF_ENTRY_ENABLE_ICY_METADATA, CONF_ENTRY_ENFORCE_MP3, - CONF_ENTRY_FLOW_MODE_DEFAULT_ENABLED, + CONF_ENTRY_FLOW_MODE_ENFORCED, CONF_ENTRY_HTTP_PROFILE_FORCED_2, ConfigEntry, ConfigValueType, @@ -208,7 +207,7 @@ def supported_features(self) -> tuple[ProviderFeature, ...]: async def handle_async_init(self) -> None: """Handle async initialization of the provider.""" - self.bluos_players: dict[str, BluosPlayer] = {} + self.bluos_players: dict[str, BluesoundPlayer] = {} async def on_mdns_service_state_change( self, name: str, state_change: ServiceStateChange, info: AsyncServiceInfo | None @@ -291,16 +290,15 @@ async def get_player_config_entries( ) -> tuple[ConfigEntry, ...]: """Return Config Entries for the given player.""" base_entries = await super().get_player_config_entries(self.player_id) - if not self.bluos_players.get(self.player_id): + if not self.bluos_players.get(player_id): # TODO fix player entries return (*base_entries, CONF_ENTRY_CROSSFADE) return ( *base_entries, CONF_ENTRY_HTTP_PROFILE_FORCED_2, CONF_ENTRY_CROSSFADE, - CONF_ENTRY_CROSSFADE_FLOW_MODE_REQUIRED, CONF_ENTRY_ENFORCE_MP3, - CONF_ENTRY_FLOW_MODE_DEFAULT_ENABLED, + CONF_ENTRY_FLOW_MODE_ENFORCED, CONF_ENTRY_ENABLE_ICY_METADATA, ) @@ -349,13 +347,11 @@ async def cmd_volume_mute(self, player_id: str, muted: bool) -> None: mass_player.volume_mute = muted await bluos_player.update_attributes() - async def play_media( - self, player_id: str, media: PlayerMedia, timeout: float | None = None - ) -> None: + async def play_media(self, player_id: str, media: PlayerMedia) -> None: """Handle PLAY MEDIA for BluOS player using the provided URL.""" mass_player = self.mass.players.get(player_id) if bluos_player := self.bluos_players[player_id]: - play_status = await bluos_player.client.play_url(media.uri, timeout=timeout) + play_status = await bluos_player.client.play_url(media.uri) if play_status == "stream": # Update media info then optimistically override playback state and source await bluos_player.update_attributes() diff --git a/music_assistant/server/providers/dlna/__init__.py b/music_assistant/server/providers/dlna/__init__.py index 7566825e1..1484c318e 100644 --- a/music_assistant/server/providers/dlna/__init__.py +++ b/music_assistant/server/providers/dlna/__init__.py @@ -351,12 +351,21 @@ async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None: media.uri = media.uri.replace(".flac", ".mp3") didl_metadata = create_didl_metadata(media) title = media.title or media.uri - await 
dlna_player.device.async_set_next_transport_uri(media.uri, title, didl_metadata) - self.logger.debug( - "Enqued next track (%s) to player %s", - title, - dlna_player.player.display_name, - ) + try: + await dlna_player.device.async_set_next_transport_uri(media.uri, title, didl_metadata) + except UpnpError: + self.logger.error( + "Enqueuing the next track failed for player %s - " + "the player probably doesn't support this. " + "Enable 'flow mode' for this player.", + dlna_player.player.display_name, + ) + else: + self.logger.debug( + "Enqued next track (%s) to player %s", + title, + dlna_player.player.display_name, + ) @catch_request_errors async def cmd_pause(self, player_id: str) -> None: diff --git a/music_assistant/server/providers/snapcast/__init__.py b/music_assistant/server/providers/snapcast/__init__.py index 2b4930c24..82814e83f 100644 --- a/music_assistant/server/providers/snapcast/__init__.py +++ b/music_assistant/server/providers/snapcast/__init__.py @@ -517,7 +517,6 @@ async def _streamer() -> None: output_format=DEFAULT_SNAPCAST_FORMAT, filter_params=get_player_filter_params(self.mass, player_id), audio_output=stream_path, - logger=self.logger.getChild("ffmpeg"), ) as ffmpeg_proc: player.state = PlayerState.PLAYING player.current_media = media diff --git a/music_assistant/server/providers/sonos/__init__.py b/music_assistant/server/providers/sonos/__init__.py index 1e8b03fe5..7fd60e953 100644 --- a/music_assistant/server/providers/sonos/__init__.py +++ b/music_assistant/server/providers/sonos/__init__.py @@ -330,7 +330,7 @@ def update_attributes(self) -> None: # noqa: PLR0915 else: # player is group child (synced to another player) group_parent = self.prov.sonos_players.get(self.client.player.group.coordinator_id) - if not group_parent: + if not group_parent or not group_parent.client: # handle race condition where the group parent is not yet discovered return active_group = group_parent.client.player.group diff --git a/music_assistant/server/providers/spotify/__init__.py b/music_assistant/server/providers/spotify/__init__.py index 0bd10b5e9..910fe80d7 100644 --- a/music_assistant/server/providers/spotify/__init__.py +++ b/music_assistant/server/providers/spotify/__init__.py @@ -585,14 +585,23 @@ async def get_audio_stream( chunk_size = get_chunksize(streamdetails.audio_format) stderr = None if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else False self.logger.log(VERBOSE_LOG_LEVEL, f"Start streaming {spotify_uri} using librespot") - async with AsyncProcess( - args, - stdout=True, - stderr=stderr, - name="librespot", - ) as librespot_proc: - async for chunk in librespot_proc.iter_any(chunk_size): - yield chunk + for attempt in range(1, 3): + async with AsyncProcess( + args, + stdout=True, + stderr=stderr, + name="librespot", + ) as librespot_proc: + chunks_received = 0 + async for chunk in librespot_proc.iter_any(chunk_size): + yield chunk + chunks_received += 1 + if chunks_received: + break + self.logger.warning( + "librespot failed to stream track, retrying... (attempt %s/3)", attempt + ) + await asyncio.sleep(0.5) def _parse_artist(self, artist_obj): """Parse spotify artist object to generic layout."""
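Taken together, the loudness handling in this change works in two passes: a measurement is obtained (either from the loudnorm filter's JSON summary or from the background ebur128 analysis job) and stored per item, and subsequent playbacks apply a plain volume filter. A minimal sketch of that arithmetic, assuming an illustrative ebur128 summary snippet and a -17 LUFS target (the configured target and exact log layout may differ):

def parse_integrated_loudness(log_text: str) -> float | None:
    # same string-splitting approach as analyze_loudness in helpers/audio.py
    try:
        value = log_text.split("Integrated loudness")[1].split("I:")[1].split("LUFS")[0]
        return float(value.strip())
    except (IndexError, ValueError):
        return None

def gain_correction_filter(target_loudness: float, measured_loudness: float) -> str:
    # the fixed correction applied when a stored measurement is available
    gain = round(target_loudness - measured_loudness, 2)
    return f"volume={gain}dB"

# made-up excerpt shaped like ffmpeg's ebur128 framelog=verbose summary
sample_log = """
Integrated loudness:
  I:         -9.3 LUFS
  Threshold: -19.7 LUFS
"""

measured = parse_integrated_loudness(sample_log)  # -9.3
assert measured is not None
print(gain_correction_filter(-17.0, measured))  # volume=-7.7dB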