forked from d-mcknight/skill-plex
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathplex_api.py
141 lines (127 loc) · 5.21 KB
/
plex_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
from typing import List
from ovos_utils.log import LOG
from ovos_workshop.backwards_compat import MediaEntry, MediaType, PlaybackType
from plexapi.audio import Album, Artist, Track
from plexapi.library import MovieSection, MusicSection, ShowSection
from plexapi.myplex import MyPlexAccount
from plexapi.server import PlexServer
from plexapi.video import Episode, Movie, Show
class PlexAPI:
    """Thin wrapper around the plexapi library for OVOS Common Play results.

    On construction the wrapper connects to every online Plex server the
    token can reach and indexes their movie, TV show and music library
    sections. The ``search_*`` methods query those sections and return
    ``MediaEntry`` objects.

    NOTE(review): ``MediaEntry.length`` is filled straight from the Plex
    item's ``duration``; plexapi documents that value in milliseconds --
    confirm this is the unit OVOS Common Play expects.
    """

    def __init__(self, token: str):
        """Connect to the servers reachable with ``token`` and index them.

        Args:
            token: Token handed to ``MyPlexAccount``.
        """
        self.servers: List[PlexServer] = []
        self.movies: List[MovieSection] = []
        self.shows: List[ShowSection] = []
        self.music: List[MusicSection] = []
        self.connect_to_servers(token)
        self.init_libraries()

    def connect_to_servers(self, token: str):
        """Connect to every online server accessible with the provided token.

        Replaces ``self.servers`` with one connection per account resource
        that provides "server" and whose ``presence`` is ``True``.

        Args:
            token: Token handed to ``MyPlexAccount``.
        """
        account = MyPlexAccount(token=token)
        resources = [
            resource
            for resource in account.resources()
            if "server" in resource.provides and resource.presence is True
        ]
        LOG.info(
            "Found %s active servers: %s",
            len(resources),
            ",".join(resource.name for resource in resources),
        )
        # Connect through the resource objects we already hold rather than
        # looking each one up again by name: the second lookup is redundant
        # and would be ambiguous if two servers shared a name.
        self.servers = [resource.connect() for resource in resources]

    def init_libraries(self):
        """Index the Movie, Show and Music sections of every connected server.

        The three section lists are rebuilt from scratch on every call, so
        calling this again (for example after reconnecting) does not
        duplicate entries. Sections of any other type are ignored.
        """
        movies: List[MovieSection] = []
        shows: List[ShowSection] = []
        music: List[MusicSection] = []
        for server in self.servers:
            for section in server.library.sections():
                if isinstance(section, MovieSection):
                    movies.append(section)
                elif isinstance(section, ShowSection):
                    shows.append(section)
                elif isinstance(section, MusicSection):
                    music.append(section)
        self.movies, self.shows, self.music = movies, shows, music

    def search_music(self, query: str) -> "List[MediaEntry]":
        """Search every indexed music library for ``query``.

        Album and artist hits are expanded into their tracks.

        Args:
            query: Search phrase passed to each section's ``hubSearch``.

        Returns:
            One ``MediaEntry`` per matching track (possibly empty).
        """
        track_list = []
        for music in self.music:
            for result in music.hubSearch(query):
                track_list.extend(
                    self._construct_track_dict(track)
                    for track in self._get_tracks_from_result(result)
                )
        LOG.debug("Found %s music tracks in Plex", len(track_list))
        return track_list

    def _get_tracks_from_result(self, result) -> "List[Track]":
        """Return the tracks represented by a single music search result.

        Albums and artists yield ``result.tracks()``, a track yields itself,
        and any other result type yields an empty list.
        """
        if isinstance(result, (Album, Artist)):
            return result.tracks()
        if isinstance(result, Track):
            return [result]
        return []

    def _construct_track_dict(self, track) -> "MediaEntry":
        """Build the OVOS Common Play ``MediaEntry`` for a music track."""
        return MediaEntry(
            media_type=MediaType.MUSIC,
            uri=track.getStreamURL(),
            title=track.title,
            playback=PlaybackType.AUDIO,
            image=track.thumbUrl or "",
            artist=track.grandparentTitle,
            length=track.duration,
        )

    def search_movies(self, query: str) -> "List[MediaEntry]":
        """Search every indexed movie library for ``query``.

        Args:
            query: Search phrase passed to each section's ``hubSearch``.

        Returns:
            One ``MediaEntry`` per ``Movie`` result (possibly empty); other
            result types are skipped.
        """
        movie_list = []
        for movies in self.movies:
            movie_list.extend(
                self._construct_movie_dict(result)
                for result in movies.hubSearch(query)
                if isinstance(result, Movie)
            )
        LOG.debug("Found %s movies in Plex", len(movie_list))
        return movie_list

    def _construct_movie_dict(self, mov) -> "MediaEntry":
        """Build the OVOS Common Play ``MediaEntry`` for a movie."""
        return MediaEntry(
            media_type=MediaType.MOVIE,
            uri=mov.getStreamURL(),
            title=mov.title,
            playback=PlaybackType.VIDEO,
            image=mov.thumbUrl or "",
            artist=self._director_names(mov),
            length=mov.duration,
        )

    def search_shows(self, query: str) -> "List[MediaEntry]":
        """Search every indexed TV show library for ``query``.

        Show hits are expanded into their episodes.

        Args:
            query: Search phrase passed to each section's ``hubSearch``.

        Returns:
            One ``MediaEntry`` per matching episode (possibly empty).
        """
        show_list = []
        for shows in self.shows:
            for result in shows.hubSearch(query):
                show_list.extend(
                    self._construct_show_dict(episode)
                    for episode in self._get_episodes_from_result(result)
                )
        LOG.debug("Found %s TV shows in Plex", len(show_list))
        return show_list

    def _get_episodes_from_result(self, result) -> "List[Episode]":
        """Return the episodes represented by a single TV search result.

        A show yields ``result.episodes()``, an episode yields itself, and
        any other result type yields an empty list.
        """
        if isinstance(result, Show):
            return result.episodes()
        if isinstance(result, Episode):
            return [result]
        return []

    def _construct_show_dict(self, show) -> "MediaEntry":
        """Build the OVOS Common Play ``MediaEntry`` for a TV episode."""
        return MediaEntry(
            media_type=MediaType.TV,
            uri=show.getStreamURL(),
            title=self._episode_title(show),
            playback=PlaybackType.VIDEO,
            image=show.thumbUrl or "",
            artist=self._director_names(show),
            length=show.duration,
        )

    @staticmethod
    def _director_names(item) -> str:
        """Return the item's director tags joined by ", " ("" when none)."""
        return ", ".join(director.tag for director in (item.directors or []))

    @staticmethod
    def _episode_title(episode) -> str:
        """Return "<seasonEpisode> - <title>", or just the title without a code."""
        episode_code = episode.seasonEpisode
        if not episode_code:
            # Avoid a dangling " - " prefix when no season/episode code exists.
            return episode.title
        return f"{episode_code} - {episode.title}"