Filesystem matching and metadata fixes (#1625)
marcelveldt authored Aug 28, 2024
1 parent 78d065b commit cc68fa5
Showing 8 changed files with 1,151 additions and 1,308 deletions.
57 changes: 21 additions & 36 deletions music_assistant/server/controllers/metadata.py
@@ -428,21 +428,8 @@ async def _update_artist_metadata(self, artist: Artist, force_refresh: bool = Fa
         local_provs = get_global_cache_value("non_streaming_providers")
         if TYPE_CHECKING:
             local_provs = cast(set[str], local_provs)
-        for prov_mapping in artist.provider_mappings:
-            if prov_mapping.provider_instance not in local_provs:
-                continue
-            if (prov := self.mass.get_provider(prov_mapping.provider_instance)) is None:
-                continue
-            if prov.lookup_key in unique_keys:
-                continue
-            unique_keys.add(prov.lookup_key)
-            with suppress(MediaNotFoundError):
-                prov_item = await self.mass.music.artists.get_provider_item(
-                    prov_mapping.item_id, prov_mapping.provider_instance
-                )
-                artist.metadata.update(prov_item.metadata)

-        # collect metadata from all (online) music/metadata providers
+        # collect metadata from all (online) music + metadata providers
         # NOTE: we only allow this every REFRESH_INTERVAL and a max amount of calls per day
         # to not overload the music/metadata providers with api calls
         # TODO: Utilize a global (cloud) cache for metadata lookups to save on API calls
@@ -459,12 +446,14 @@ async def _update_artist_metadata(self, artist: Artist, force_refresh: bool = Fa
         await self.mass.music.artists.match_providers(artist)

         # collect metadata from all (streaming) music providers
+        # NOTE: local providers have already pushed their metadata in the sync
         for prov_mapping in artist.provider_mappings:
             if (prov := self.mass.get_provider(prov_mapping.provider_instance)) is None:
                 continue
             if prov.lookup_key in unique_keys:
                 continue
-            unique_keys.add(prov.lookup_key)
+            if prov.lookup_key not in local_provs:
+                unique_keys.add(prov.lookup_key)
             with suppress(MediaNotFoundError):
                 prov_item = await self.mass.music.artists.get_provider_item(
                     prov_mapping.item_id, prov_mapping.provider_instance
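Taken in isolation, the reworked dedupe behaves like this minimal sketch (names and data are illustrative, not the actual controller code): streaming providers that share a lookup_key, such as two instances of the same service, are fetched only once, while local providers never mark their key as seen, so their mappings are never skipped on that basis.

    # sketch of the seen-set behaviour in the loop above
    local_provs = {"filesystem"}  # assumed cache value for non-streaming providers
    unique_keys: set[str] = set()

    def should_fetch(lookup_key: str) -> bool:
        if lookup_key in unique_keys:
            return False  # this (streaming) source was already handled
        if lookup_key not in local_provs:
            unique_keys.add(lookup_key)
        return True

    assert should_fetch("spotify") is True
    assert should_fetch("spotify") is False  # duplicate streaming instance skipped
    assert should_fetch("filesystem") is True
    assert should_fetch("filesystem") is True  # local mappings never deduplicated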
@@ -495,26 +484,12 @@ async def _update_artist_metadata(self, artist: Artist, force_refresh: bool = Fa
     async def _update_album_metadata(self, album: Album, force_refresh: bool = False) -> None:
         """Get/update rich metadata for an album."""
         self.logger.debug("Updating metadata for Album %s", album.name)
-        unique_keys: set[str] = set()
         # collect (local) metadata from all local music providers
         local_provs = get_global_cache_value("non_streaming_providers")
         if TYPE_CHECKING:
             local_provs = cast(set[str], local_provs)
-        for prov_mapping in album.provider_mappings:
-            if prov_mapping.provider_instance not in local_provs:
-                continue
-            if (prov := self.mass.get_provider(prov_mapping.provider_instance)) is None:
-                continue
-            if prov.lookup_key in unique_keys:
-                continue
-            unique_keys.add(prov.lookup_key)
-            with suppress(MediaNotFoundError):
-                prov_item = await self.mass.music.albums.get_provider_item(
-                    prov_mapping.item_id, prov_mapping.provider_instance
-                )
-                album.metadata.update(prov_item.metadata)

-        # collect metadata from all (online) music/metadata providers
+        # collect metadata from all (online) music + metadata providers
         # NOTE: we only allow this every REFRESH_INTERVAL and a max amount of calls per day
         # to not overload the (free) metadata providers with api calls
         # TODO: Utilize a global (cloud) cache for metadata lookups to save on API calls
@@ -531,12 +506,15 @@ async def _update_album_metadata(self, album: Album, force_refresh: bool = False
         await self.mass.music.albums.match_providers(album)

         # collect metadata from all (streaming) music providers
+        # NOTE: local providers have already pushed their metadata in the sync
+        unique_keys: set[str] = set()
         for prov_mapping in album.provider_mappings:
             if (prov := self.mass.get_provider(prov_mapping.provider_instance)) is None:
                 continue
             if prov.lookup_key in unique_keys:
                 continue
-            unique_keys.add(prov.lookup_key)
+            if prov.lookup_key not in local_provs:
+                unique_keys.add(prov.lookup_key)
             with suppress(MediaNotFoundError):
                 prov_item = await self.mass.music.albums.get_provider_item(
                     prov_mapping.item_id, prov_mapping.provider_instance
@@ -600,7 +578,9 @@ async def _update_track_metadata(self, track: Track, force_refresh: bool = False
                 track.metadata.update(prov_item.metadata)

         # collect metadata from all metadata providers
-        if self.config.get_value(CONF_ENABLE_ONLINE_METADATA):
+        # there is only little metadata available for tracks so we only fetch metadata
+        # from other sources if the force flag is set
+        if force_refresh and self.config.get_value(CONF_ENABLE_ONLINE_METADATA):
             for provider in self.providers:
                 if ProviderFeature.TRACK_METADATA not in provider.supported_features:
                     continue
@@ -756,39 +736,44 @@ async def _metadata_scanner(self) -> None:
         self.logger.info("Starting metadata scanner")
         self._online_slots_available = MAX_ONLINE_CALLS_PER_RUN
         timestamp = int(time() - 60 * 60 * 24 * 30)
+        # ARTISTS metadata refresh
         query = (
             f"json_extract({DB_TABLE_ARTISTS}.metadata,'$.last_refresh') ISNULL "
             f"OR json_extract({DB_TABLE_ARTISTS}.metadata,'$.last_refresh') < {timestamp}"
         )
         for artist in await self.mass.music.artists.library_items(
-            limit=2500, order_by="random", extra_query=query
+            limit=50, order_by="random", extra_query=query
         ):
             await self._update_artist_metadata(artist)

+        # ALBUMS metadata refresh
         query = (
             f"json_extract({DB_TABLE_ALBUMS}.metadata,'$.last_refresh') ISNULL "
             f"OR json_extract({DB_TABLE_ALBUMS}.metadata,'$.last_refresh') < {timestamp}"
         )
         for album in await self.mass.music.albums.library_items(
-            limit=2500, order_by="random", extra_query=query
+            limit=50, order_by="random", extra_query=query
         ):
             await self._update_album_metadata(album)

+        # PLAYLISTS metadata refresh
         query = (
             f"json_extract({DB_TABLE_PLAYLISTS}.metadata,'$.last_refresh') ISNULL "
             f"OR json_extract({DB_TABLE_PLAYLISTS}.metadata,'$.last_refresh') < {timestamp}"
         )
         for playlist in await self.mass.music.playlists.library_items(
-            limit=2500, order_by="random", extra_query=query
+            limit=50, order_by="random", extra_query=query
         ):
             await self._update_playlist_metadata(playlist)

+        # TRACKS metadata refresh
+        timestamp = int(time() - 60 * 60 * 24 * 30)
         query = (
             f"json_extract({DB_TABLE_TRACKS}.metadata,'$.last_refresh') ISNULL "
             f"OR json_extract({DB_TABLE_TRACKS}.metadata,'$.last_refresh') < {timestamp}"
         )
         for track in await self.mass.music.tracks.library_items(
-            limit=2500, order_by="random", extra_query=query
+            limit=50, order_by="random", extra_query=query
         ):
             await self._update_track_metadata(track)
         self.logger.info("Metadata scanner finished.")
100 changes: 21 additions & 79 deletions music_assistant/server/helpers/tags.py
@@ -9,7 +9,7 @@
 from collections.abc import Iterable
 from dataclasses import dataclass
 from json import JSONDecodeError
-from typing import TYPE_CHECKING, Any
+from typing import Any

 import eyed3

@@ -20,9 +20,6 @@
 from music_assistant.constants import MASS_LOGGER_NAME, UNKNOWN_ARTIST
 from music_assistant.server.helpers.process import AsyncProcess

-if TYPE_CHECKING:
-    from collections.abc import AsyncGenerator
-
 LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.tags")

 # silence the eyed3 logger because it is too verbose
@@ -369,16 +366,12 @@ def get(self, key: str, default=None) -> Any:
         return self.tags.get(key, default)


-async def parse_tags(
-    input_file: str | AsyncGenerator[bytes, None], file_size: int | None = None
-) -> AudioTags:
-    """Parse tags from a media file.
-
-    input_file may be a (local) filename/url accessible by ffmpeg or
-    an AsyncGenerator which yields the file contents as bytes.
-    """
-    file_path = input_file if isinstance(input_file, str) else "-"
+async def parse_tags(input_file: str, file_size: int | None = None) -> AudioTags:
+    """
+    Parse tags from a media file (or URL).
+
+    Input_file may be a (local) filename or URL accessible by ffmpeg.
+    """
     args = (
         "ffprobe",
         "-hide_banner",
@@ -393,35 +386,11 @@ async def parse_tags(
         "-print_format",
         "json",
         "-i",
-        file_path,
+        input_file,
     )

-    writer_task: asyncio.Task | None = None
-    ffmpeg_proc = AsyncProcess(args, stdin=file_path == "-", stdout=True)
-    await ffmpeg_proc.start()
-
-    async def writer() -> None:
-        bytes_read = 0
-        async for chunk in input_file:
-            if ffmpeg_proc.closed:
-                break
-            await ffmpeg_proc.write(chunk)
-            bytes_read += len(chunk)
-            del chunk
-            if bytes_read > 25 * 1000000:
-                # this is possibly a m4a file with 'moove atom' metadata at the
-                # end of the file
-                # we'll have to read the entire file to do something with it
-                # for now we just ignore/deny these files
-                LOGGER.error("Found file with tags not present at beginning of file")
-                break
-
-    if file_path == "-":
-        # feed the file contents to the process
-        writer_task = asyncio.create_task(writer)
-
+    async with AsyncProcess(args, stdin=False, stdout=True) as ffmpeg:
+        res = await ffmpeg.read(-1)
     try:
-        res = await ffmpeg_proc.read(-1)
         data = json.loads(res)
@@ -438,74 +407,47 @@ async def writer() -> None:
             tags.duration = float(tags.raw["format"]["duration"])

         if (
-            not file_path.startswith("http")
-            and file_path.endswith(".mp3")
+            not input_file.startswith("http")
+            and input_file.endswith(".mp3")
             and "musicbrainzrecordingid" not in tags.tags
-            and await asyncio.to_thread(os.path.isfile, file_path)
+            and await asyncio.to_thread(os.path.isfile, input_file)
         ):
             # eyed3 is able to extract the musicbrainzrecordingid from the unique file id
             # this is actually a bug in ffmpeg/ffprobe which does not expose this tag
             # so we use this as alternative approach for mp3 files
-            audiofile = await asyncio.to_thread(eyed3.load, file_path)
+            audiofile = await asyncio.to_thread(eyed3.load, input_file)
             if audiofile is not None and audiofile.tag is not None:
                 for uf_id in audiofile.tag.unique_file_ids:
                     if uf_id.owner_id == b"http://musicbrainz.org" and uf_id.uniq_id:
                         tags.tags["musicbrainzrecordingid"] = uf_id.uniq_id.decode()
                         break
-
-            del audiofile
         return tags
     except (KeyError, ValueError, JSONDecodeError, InvalidDataError) as err:
-        msg = f"Unable to retrieve info for {file_path}: {err!s}"
+        msg = f"Unable to retrieve info for {input_file}: {err!s}"
         raise InvalidDataError(msg) from err
-    finally:
-        if writer_task and not writer_task.done():
-            writer_task.cancel()
-        await ffmpeg_proc.close()
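With the AsyncGenerator input path removed, callers now hand parse_tags a plain path or URL and ffprobe opens it directly. A hypothetical caller (the path is made up; only the signature and the duration/get attributes come from this diff):

    import asyncio

    from music_assistant.server.helpers.tags import parse_tags

    async def main() -> None:
        # any file path or URL that ffmpeg/ffprobe can open directly
        tags = await parse_tags("/music/Artist/Album/01 - Track.flac")
        print(tags.duration, tags.get("artist"))

    asyncio.run(main())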


-async def get_embedded_image(input_file: str | AsyncGenerator[bytes, None]) -> bytes | None:
+async def get_embedded_image(input_file: str) -> bytes | None:
     """Return embedded image data.

-    input_file may be a (local) filename/url accessible by ffmpeg or
-    an AsyncGenerator which yields the file contents as bytes.
+    Input_file may be a (local) filename or URL accessible by ffmpeg.
     """
-    file_path = input_file if isinstance(input_file, str) else "-"
     args = (
         "ffmpeg",
         "-hide_banner",
         "-loglevel",
         "error",
         "-i",
-        file_path,
+        input_file,
         "-an",
         "-vcodec",
         "mjpeg",
         "-f",
         "mjpeg",
         "-",
     )

-    writer_task: asyncio.Task | None = None
-    ffmpeg_proc = AsyncProcess(
-        args, stdin=file_path == "-", stdout=True, stderr=None, name="ffmpeg_image"
-    )
-    await ffmpeg_proc.start()
-
-    async def writer() -> None:
-        async for chunk in input_file:
-            if ffmpeg_proc.closed:
-                break
-            await ffmpeg_proc.write(chunk)
-        await ffmpeg_proc.write_eof()
-
-    # feed the file contents to the process stdin
-    if file_path == "-":
-        writer_task = asyncio.create_task(writer)
-
-    # return image bytes from stdout
-    try:
-        return await ffmpeg_proc.read(-1)
-    finally:
-        if writer_task and not writer_task.cancelled():
-            writer_task.cancel()
-        await ffmpeg_proc.close()
+    async with AsyncProcess(
+        args, stdin=False, stdout=True, stderr=None, name="ffmpeg_image"
+    ) as ffmpeg:
+        return await ffmpeg.read(-1)
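The simplified helper amounts to running ffmpeg with the flags above (drop the audio with -an, re-emit any embedded artwork as mjpeg on stdout) and returning the captured bytes. A hypothetical caller (paths are illustrative):

    import asyncio

    from music_assistant.server.helpers.tags import get_embedded_image

    async def main() -> None:
        image_data = await get_embedded_image("/music/Artist/Album/01 - Track.mp3")
        if image_data:  # falsy when no artwork is embedded
            with open("/tmp/cover.jpg", "wb") as outfile:
                outfile.write(image_data)

    asyncio.run(main())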
18 changes: 16 additions & 2 deletions music_assistant/server/helpers/util.py
@@ -163,15 +163,29 @@ class TaskManager:
     Logging of exceptions is done by the mass.create_task helper.
     """

-    def __init__(self, mass: MusicAssistant):
+    def __init__(self, mass: MusicAssistant, limit: int = 0):
         """Initialize the TaskManager."""
         self.mass = mass
         self._tasks: list[asyncio.Task] = []
+        self._semaphore = asyncio.Semaphore(limit) if limit else None

-    def create_task(self, coro: Coroutine) -> None:
+    def create_task(self, coro: Coroutine) -> asyncio.Task:
         """Create a new task and add it to the manager."""
         task = self.mass.create_task(coro)
         self._tasks.append(task)
+        return task
+
+    async def create_task_with_limit(self, coro: Coroutine) -> None:
+        """Create a new task with semaphore limit."""
+        assert self._semaphore is not None
+
+        def task_done_callback(_task: asyncio.Task) -> None:
+            self._tasks.remove(task)
+            self._semaphore.release()
+
+        await self._semaphore.acquire()
+        task: asyncio.Task = self.create_task(coro)
+        task.add_done_callback(task_done_callback)

     async def __aenter__(self) -> Self:
         """Enter context manager."""
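A hypothetical consumer of the new limit support (not part of this commit): the semaphore slot is acquired before the task is spawned and released by the done-callback, so at most limit tasks are in flight and a saturated TaskManager back-pressures the producer at the await.

    import asyncio

    from music_assistant.server.helpers.util import TaskManager

    async def refresh_all(mass, items: list[str]) -> None:
        # mass stands in for a running MusicAssistant instance
        async def refresh(item: str) -> None:
            await asyncio.sleep(0.1)  # placeholder for real work

        async with TaskManager(mass, limit=5) as tm:
            for item in items:
                # waits here whenever five tasks are already running
                await tm.create_task_with_limit(refresh(item))

Releasing the slot in the done-callback rather than inside the coroutine keeps the count correct even when a task raises or is cancelled.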