Skip to content

Commit

Permalink
Feat: Ability to use Audiobookshelf Sessions/ HLS Stream (#1887)
Browse files Browse the repository at this point in the history
  • Loading branch information
fmunkes authored Jan 19, 2025
1 parent 4182cb7 commit fe63c1f
Show file tree
Hide file tree
Showing 3 changed files with 334 additions and 16 deletions.
95 changes: 84 additions & 11 deletions music_assistant/providers/audiobookshelf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from __future__ import annotations

import logging
from collections.abc import AsyncGenerator, Sequence
from typing import TYPE_CHECKING

Expand Down Expand Up @@ -38,7 +37,9 @@
from music_assistant.providers.audiobookshelf.abs_client import ABSClient
from music_assistant.providers.audiobookshelf.abs_schema import (
ABSAudioBook,
ABSDeviceInfo,
ABSLibrary,
ABSPlaybackSessionExpanded,
ABSPodcast,
ABSPodcastEpisodeExpanded,
)
Expand Down Expand Up @@ -126,26 +127,40 @@ async def handle_async_init(self) -> None:
"""Pass config values to client and initialize."""
self._client = ABSClient()
base_url = str(self.config.get_value(CONF_URL))
username = str(self.config.get_value(CONF_USERNAME))
try:
await self._client.init(
session=self.mass.http_session,
base_url=base_url,
username=str(self.config.get_value(CONF_USERNAME)),
username=username,
password=str(self.config.get_value(CONF_PASSWORD)),
logger=self.logger,
check_ssl=bool(self.config.get_value(CONF_VERIFY_SSL)),
)
except RuntimeError:
# login details were not correct
raise LoginFailed(f"Login to abs instance at {base_url} failed.")
await self._client.sync()

# this will be provided when creating sessions or receive already opened sessions
self.device_info = ABSDeviceInfo(
device_id=self.instance_id,
client_name="Music Assistant",
client_version=self.mass.version,
manufacturer="",
model=self.mass.server_id,
)

self.logger.debug(f"Our playback session device_id is {self.instance_id}")

async def unload(self, is_removed: bool = False) -> None:
"""
Handle unload/close of the provider.
Called when provider is deregistered (e.g. MA exiting or config reloading).
is_removed will be set to True when the provider is removed from the configuration.
"""
await self._client.close_all_playback_sessions()
await self._client.logout()

@property
Expand Down Expand Up @@ -358,24 +373,70 @@ async def get_audiobook(self, prov_audiobook_id: str) -> Audiobook:
abs_audiobook = await self._client.get_audiobook(prov_audiobook_id)
return await self._parse_audiobook(abs_audiobook)

async def get_streamdetails_from_playback_session(
self, session: ABSPlaybackSessionExpanded
) -> StreamDetails:
"""Give Streamdetails from given session."""
tracks = session.audio_tracks
if len(tracks) == 0:
raise RuntimeError("Playback session has no tracks to play")
track = tracks[0]
track_url = track.content_url
if track_url.split("/")[1] != "hls":
raise RuntimeError("Did expect HLS stream for session playback")
item_id = ""
if session.media_type == "podcast":
media_type = MediaType.PODCAST_EPISODE
podcast_id = session.library_item_id
session_id = session.id_
episode_id = session.episode_id
item_id = f"{podcast_id} {episode_id} {session_id}"
else:
media_type = MediaType.AUDIOBOOK
audiobook_id = session.library_item_id
session_id = session.id_
item_id = f"{audiobook_id} {session_id}"
token = self._client.token
base_url = str(self.config.get_value(CONF_URL))
media_url = track.content_url
stream_url = f"{base_url}{media_url}?token={token}"
return StreamDetails(
provider=self.instance_id,
item_id=item_id,
audio_format=AudioFormat(
content_type=ContentType.UNKNOWN,
),
media_type=media_type,
stream_type=StreamType.HLS,
path=stream_url,
)

async def get_stream_details(
self, item_id: str, media_type: MediaType = MediaType.TRACK
) -> StreamDetails:
"""Get stream of item."""
# self.logger.debug(f"Streamdetails: {item_id}")
if media_type == MediaType.PODCAST_EPISODE:
return await self._get_stream_details_podcast_episode(item_id)
elif media_type == MediaType.AUDIOBOOK:
return await self._get_stream_details_audiobook(item_id)
abs_audiobook = await self._client.get_audiobook(item_id)
tracks = abs_audiobook.media.tracks
if len(tracks) == 0:
raise MediaNotFoundError("Stream not found")
if len(tracks) > 1:
session = await self._client.get_playback_session_audiobook(
device_info=self.device_info, audiobook_id=item_id
)
return await self.get_streamdetails_from_playback_session(session)
return await self._get_stream_details_audiobook(abs_audiobook)
raise MediaNotFoundError("Stream unknown")

async def _get_stream_details_audiobook(self, audiobook_id: str) -> StreamDetails:
async def _get_stream_details_audiobook(self, abs_audiobook: ABSAudioBook) -> StreamDetails:
"""Only single audio file in audiobook."""
abs_audiobook = await self._client.get_audiobook(audiobook_id)
self.logger.debug(
f"Using direct playback for audiobook {abs_audiobook.media.metadata.title}"
)
tracks = abs_audiobook.media.tracks
if len(tracks) == 0:
raise MediaNotFoundError("Stream not found")
if len(tracks) > 1:
logging.warning("Music Assistant only supports single file base audiobooks")
token = self._client.token
base_url = str(self.config.get_value(CONF_URL))
media_url = tracks[0].content_url
Expand All @@ -384,7 +445,7 @@ async def _get_stream_details_audiobook(self, audiobook_id: str) -> StreamDetail
# to lift unknown at some point.
return StreamDetails(
provider=self.lookup_key,
item_id=audiobook_id,
item_id=abs_audiobook.id_,
audio_format=AudioFormat(
content_type=ContentType.UNKNOWN,
),
Expand All @@ -404,6 +465,7 @@ async def _get_stream_details_podcast_episode(self, podcast_id: str) -> StreamDe
break
if abs_episode is None:
raise MediaNotFoundError("Stream not found")
self.logger.debug(f"Using direct playback for podcast episode {abs_episode.title}")
token = self._client.token
base_url = str(self.config.get_value(CONF_URL))
media_url = abs_episode.audio_track.content_url
Expand All @@ -422,11 +484,21 @@ async def _get_stream_details_podcast_episode(self, podcast_id: str) -> StreamDe
async def on_played(
self, media_type: MediaType, item_id: str, fully_played: bool, position: int
) -> None:
"""Update progress in Audiobookshelf."""
"""Update progress in Audiobookshelf.
In our case media_type may have 3 values:
- PODCAST
- PODCAST_EPISODE
- AUDIOBOOK
We ignore PODCAST (function is called on adding a podcast with position=None)
"""
# self.logger.debug(f"on_played: {media_type=} {item_id=}, {fully_played=} {position=}")
if media_type == MediaType.PODCAST_EPISODE:
abs_podcast_id, abs_episode_id = item_id.split(" ")
mass_podcast_episode = await self.get_podcast_episode(item_id)
duration = mass_podcast_episode.duration
self.logger.debug(f"Updating of {media_type.value} named {mass_podcast_episode.name}")
await self._client.update_podcast_progress(
podcast_id=abs_podcast_id,
episode_id=abs_episode_id,
Expand All @@ -437,6 +509,7 @@ async def on_played(
if media_type == MediaType.AUDIOBOOK:
mass_audiobook = await self.get_audiobook(item_id)
duration = mass_audiobook.duration
self.logger.debug(f"Updating {media_type.value} named {mass_audiobook.name} progress")
await self._client.update_audiobook_progress(
audiobook_id=item_id,
progress_s=position,
Expand Down
129 changes: 127 additions & 2 deletions music_assistant/providers/audiobookshelf/abs_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,29 @@
We only implement the functions necessary for mass.
"""

import logging
from collections.abc import AsyncGenerator
from enum import Enum
from typing import Any

from aiohttp import ClientSession
from music_assistant_models.media_items import UniqueList

from music_assistant.providers.audiobookshelf.abs_schema import (
ABSAudioBook,
ABSDeviceInfo,
ABSLibrariesItemsResponse,
ABSLibrariesResponse,
ABSLibrary,
ABSLibraryItem,
ABSLoginResponse,
ABSMediaProgress,
ABSPlaybackSession,
ABSPlaybackSessionExpanded,
ABSPlayRequest,
ABSPodcast,
ABSSessionsResponse,
ABSSessionUpdate,
ABSUser,
)

Expand All @@ -44,19 +52,31 @@ def __init__(self) -> None:
self.audiobook_libraries: list[ABSLibrary] = []
self.user: ABSUser
self.check_ssl: bool
# I would like to receive opened sessions via the API, however, it appears
# that this only possible for closed sessions. That's probably because
# abs expects only a single session per device
self.open_playback_session_ids: UniqueList[str] = UniqueList([])

async def init(
self,
session: ClientSession,
base_url: str,
username: str,
password: str,
logger: logging.Logger | None = None,
check_ssl: bool = True,
) -> None:
"""Initialize."""
self.session = session
self.base_url = base_url
self.check_ssl = check_ssl

if logger is None:
self.logger = logging.getLogger(name="ABSClient")
self.logger.setLevel(logging.DEBUG)
else:
self.logger = logger

self.session_headers = {}
self.user = await self.login(username=username, password=password)
self.token: str = self.user.token
Expand All @@ -80,7 +100,7 @@ async def _post(
)
status = response.status
if status != ABSStatus.STATUS_OK.value:
raise RuntimeError(f"API post call to {endpoint=} failed.")
raise RuntimeError(f"API post call to {endpoint=} failed with {status=}.")
return await response.read()

async def _get(self, endpoint: str, params: dict[str, str | int] | None = None) -> bytes:
Expand Down Expand Up @@ -231,15 +251,18 @@ async def _update_progress(
data={"isFinished": is_finished},
)
if is_finished:
self.logger.debug(f"Marked played {endpoint}")
return
percentage = progress_seconds / duration_seconds
await self._patch(
endpoint,
data={"progress": progress_seconds / duration_seconds},
data={"progress": percentage},
)
await self._patch(
endpoint,
data={"duration": duration_seconds, "currentTime": progress_seconds},
)
self.logger.debug(f"Updated to {percentage * 100:.0f}%")

async def update_podcast_progress(
self,
Expand Down Expand Up @@ -291,3 +314,105 @@ async def get_audiobook(self, id_: str) -> ABSAudioBook:
# this endpoint gives more audiobook extra data
audiobook = await self._get(f"items/{id_}?expanded=1")
return ABSAudioBook.from_json(audiobook)

async def get_playback_session_podcast(
self, device_info: ABSDeviceInfo, podcast_id: str, episode_id: str
) -> ABSPlaybackSessionExpanded:
"""Get Podcast playback session.
Returns an open session if it is already available.
"""
endpoint = f"items/{podcast_id}/play/{episode_id}"
# by adding in the media item id, we can have several
# open sessions (i.e. we are able to stream more than a single
# audiobook/ podcast from abs at the same time)
# also fixes preload in playlist
device_info.device_id += f"/{podcast_id}/{episode_id}"
return await self._get_playback_session(endpoint, device_info=device_info)

async def get_playback_session_audiobook(
self, device_info: ABSDeviceInfo, audiobook_id: str
) -> ABSPlaybackSessionExpanded:
"""Get Audiobook playback session.
Returns an open session if it is already available.
"""
endpoint = f"items/{audiobook_id}/play"
# see podcast comment above
device_info.device_id += f"/{audiobook_id}"
return await self._get_playback_session(endpoint, device_info=device_info)

async def get_open_playback_session(self, session_id: str) -> ABSPlaybackSessionExpanded | None:
"""Return open playback session."""
data = await self._get(f"session/{session_id}")
if data:
return ABSPlaybackSessionExpanded.from_json(data)
else:
return None

async def _get_playback_session(
self, endpoint: str, device_info: ABSDeviceInfo
) -> ABSPlaybackSessionExpanded:
"""Get an ABS Playback Session.
You can only have a single session per device.
"""
play_request = ABSPlayRequest(
device_info=device_info,
force_direct_play=False,
force_transcode=False,
# specifying no supported mime types makes abs send the file
# via hls but without transcoding to another format
supported_mime_types=[],
)
data = await self._post(endpoint, data=play_request.to_dict())
session = ABSPlaybackSessionExpanded.from_json(data)
self.logger.debug(
f"Got playback session {session.id_} "
f"for {session.media_type} named {session.display_title}"
)
self.open_playback_session_ids.append(session.id_)
return session

async def close_playback_session(self, playback_session_id: str) -> None:
"""Close an open playback session."""
# optional data would be ABSSessionUpdate
self.logger.debug(f"Closing playback session {playback_session_id=}")
await self._post(f"session/{playback_session_id}/close")

async def sync_playback_session(
self, playback_session_id: str, update: ABSSessionUpdate
) -> None:
"""Sync an open playback session."""
await self._post(f"session/{playback_session_id}/sync", data=update.to_dict())

async def get_all_closed_playback_sessions(self) -> AsyncGenerator[ABSPlaybackSession]:
"""Get library items with pagination.
This returns only sessions, which are already closed.
"""
page_cnt = 0
while True:
data = await self._get(
"me/listening-sessions",
params={"itemsPerPage": LIMIT_ITEMS_PER_PAGE, "page": page_cnt},
)
page_cnt += 1

sessions = ABSSessionsResponse.from_json(data).sessions
self.logger.debug([session.device_info for session in sessions])
if sessions:
for session in sessions:
yield session
else:
return

async def close_all_playback_sessions(self) -> None:
"""Cleanup all playback sessions opened by us."""
if self.open_playback_session_ids:
self.logger.debug("Closing our playback sessions.")
for session_id in self.open_playback_session_ids:
try:
await self.close_playback_session(session_id)
except RuntimeError:
self.logger.debug(f"Was unable to close session {session_id}")
Loading

0 comments on commit fe63c1f

Please sign in to comment.