-
-
Notifications
You must be signed in to change notification settings - Fork 314
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Add Subtitulamos provider integration
- Loading branch information
Showing
7 changed files
with
339 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
"""Language converter for Subtitulamos.""" | ||
|
||
from __future__ import annotations | ||
|
||
from typing import TYPE_CHECKING | ||
|
||
from babelfish import LanguageReverseConverter, language_converters | ||
|
||
if TYPE_CHECKING: | ||
from . import LanguageTuple | ||
|
||
|
||
class SubtitulamosConverter(LanguageReverseConverter): | ||
"""Language converter for Subtitulamos.""" | ||
|
||
def __init__(self) -> None: | ||
self.name_converter = language_converters['name'] | ||
self.from_subtitulamos: dict[str, LanguageTuple] = { | ||
'Español': ('spa',), | ||
'Español (España)': ('spa',), | ||
'Español (Latinoamérica)': ('spa', 'MX'), | ||
'Català': ('cat',), | ||
'English': ('eng',), | ||
'Galego': ('glg',), | ||
'Portuguese': ('por',), | ||
'English (US)': ('eng', 'US'), | ||
'English (UK)': ('eng', 'GB'), | ||
'Brazilian': ('por', 'BR'), | ||
} | ||
self.to_subtitulamos: dict[LanguageTuple, str] = { | ||
('cat',): 'Català', | ||
('glg',): 'Galego', | ||
('por', 'BR'): 'Brazilian', | ||
} | ||
self.codes = set(self.from_subtitulamos.keys()) | ||
|
||
def convert(self, alpha3: str, country: str | None = None, script: str | None = None) -> str: | ||
"""Convert an alpha3 language code with an alpha2 country code and a script code into a custom code.""" | ||
if (alpha3, country) in self.to_subtitulamos: | ||
return self.to_subtitulamos[(alpha3, country)] | ||
if (alpha3,) in self.to_subtitulamos: | ||
return self.to_subtitulamos[(alpha3,)] | ||
|
||
return self.name_converter.convert(alpha3, country, script) | ||
|
||
def reverse(self, code: str) -> LanguageTuple: | ||
"""Reverse a custom code into alpha3, country and script code.""" | ||
if code in self.from_subtitulamos: | ||
return self.from_subtitulamos[code] | ||
|
||
return self.name_converter.reverse(code) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,262 @@ | ||
"""Provider for Subtitulamos.""" | ||
|
||
from __future__ import annotations | ||
|
||
import contextlib | ||
import json | ||
import logging | ||
from typing import TYPE_CHECKING, Any, ClassVar | ||
|
||
from babelfish import Language, language_converters # type: ignore[import-untyped] | ||
from guessit import guessit | ||
from requests import Session | ||
|
||
from subliminal import __short_version__ | ||
from subliminal.cache import SHOW_EXPIRATION_TIME, region | ||
from subliminal.exceptions import ProviderError | ||
from subliminal.matches import guess_matches | ||
from subliminal.score import get_equivalent_release_groups | ||
from subliminal.subtitle import Subtitle, fix_line_ending | ||
from subliminal.utils import sanitize, sanitize_release_group | ||
from subliminal.video import Episode | ||
|
||
from . import ParserBeautifulSoup, Provider | ||
|
||
if TYPE_CHECKING: | ||
from collections.abc import Set | ||
|
||
from requests import Response | ||
|
||
from subliminal.video import Video | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
with contextlib.suppress(ValueError): | ||
language_converters.register('subtitulamos = subliminal.converters.subtitulamos:SubtitulamosConverter') | ||
|
||
|
||
class SubtitulamosSubtitle(Subtitle): | ||
"""Subtitulamos Subtitle.""" | ||
|
||
provider_name: ClassVar[str] = 'subtitulamos' | ||
|
||
def __init__( | ||
self, | ||
language: Language, | ||
hearing_impaired: bool | None = None, | ||
page_link: str | None = None, | ||
series: str | None = None, | ||
season: int | None = None, | ||
episode: int | None = None, | ||
title: str | None = None, | ||
year: int | None = None, | ||
release_group: str | None = None, | ||
download_link: str | None = None, | ||
) -> None: | ||
super().__init__(language=language, hearing_impaired=hearing_impaired, page_link=page_link) | ||
self.page_link = page_link | ||
self.series = series | ||
self.season = season | ||
self.episode = episode | ||
self.title = title | ||
self.year = year | ||
self.release_group = release_group | ||
self.download_link = download_link | ||
|
||
@property | ||
def id(self) -> str: | ||
"""Unique identifier of the subtitle.""" | ||
return self.download_link | ||
|
||
def get_matches(self, video: Video) -> set[str]: | ||
"""Get the matches against the `video`.""" | ||
matches = set() | ||
|
||
# series | ||
if video.series and sanitize(self.series) == sanitize(video.series): | ||
matches.add('series') | ||
# season | ||
if video.season and self.season == video.season: | ||
matches.add('season') | ||
# episode | ||
if video.episode and self.episode == video.episode: | ||
matches.add('episode') | ||
# title | ||
if video.title and sanitize(self.title) == sanitize(video.title): | ||
matches.add('title') | ||
# year | ||
if video.original_series and self.year is None or video.year and video.year == self.year: | ||
matches.add('year') | ||
# release_group | ||
if ( | ||
video.release_group | ||
and self.release_group | ||
and any( | ||
r in sanitize_release_group(self.release_group) | ||
for r in get_equivalent_release_groups(sanitize_release_group(video.release_group)) | ||
) | ||
): | ||
matches.add('release_group') | ||
# resolution | ||
if video.resolution and self.release_group and video.resolution in self.release_group.lower(): | ||
matches.add('resolution') | ||
|
||
# other properties | ||
matches |= guess_matches(video, guessit(self.release_group), partial=True) | ||
|
||
return matches | ||
|
||
|
||
class SubtitulamosProvider(Provider): | ||
"""Subtitulamos Provider.""" | ||
|
||
languages: ClassVar[Set[Language]] = {Language('por', 'BR')} | { | ||
Language(lang) for lang in ['cat', 'eng', 'glg', 'por', 'spa'] | ||
} | ||
|
||
video_types = (Episode,) | ||
server_url = 'https://www.subtitulamos.tv' | ||
search_url = server_url + '/search/query' | ||
|
||
def __init__(self) -> None: | ||
self.session = None | ||
|
||
def initialize(self) -> None: | ||
"""Initialize the provider.""" | ||
self.session = Session() | ||
self.session.headers['User-Agent'] = f'Subliminal/{__short_version__}' | ||
|
||
def terminate(self) -> None: | ||
"""Terminate the provider.""" | ||
self.session.close() | ||
|
||
def _session_request(self, *args: Any, **kwargs: Any) -> Response: | ||
"""Perform a GET request to the provider.""" | ||
r = self.session.get(*args, **kwargs) | ||
r.raise_for_status() | ||
|
||
if r.status_code != 200: | ||
msg = 'Error requesting data' | ||
raise ProviderError(msg) | ||
|
||
return r | ||
|
||
def _query_search(self, search_param: str) -> list[dict[str, str]]: | ||
"""Search Series/Series + Season using query search method.""" | ||
r = self._session_request( | ||
self.search_url, headers={'Referer': self.server_url}, params={'q': search_param}, timeout=10 | ||
) | ||
return json.loads(r.text) | ||
|
||
def _read_series(self, series_url: str) -> ParserBeautifulSoup: | ||
"""Read series information from provider.""" | ||
r = self._session_request(self.server_url + series_url, headers={'Referer': self.server_url}, timeout=10) | ||
return ParserBeautifulSoup(r.content, ['lxml', 'html.parser']) | ||
|
||
def _get_episode_url(self, series_id: str, season: int, episode: int) -> str | None: | ||
"""Provides the URL for a specific episode of the series.""" | ||
series_content = self._read_series(f'/shows/{series_id}') | ||
|
||
for season_element in series_content.select('#season-choices a.choice'): | ||
if season == int(season_element.get_text()): | ||
if 'selected' not in season_element.get('class'): | ||
series_content = self._read_series(season_element['href']) | ||
break | ||
return None | ||
|
||
for episode_element in series_content.select('#episode-choices a.choice'): | ||
if episode == int(episode_element.get_text()): | ||
return episode_element['href'] | ||
return None | ||
|
||
@region.cache_on_arguments(expiration_time=SHOW_EXPIRATION_TIME) | ||
def _search_url_titles( | ||
self, series: str | None = None, season: int | None = None, episode: int | None = None, year: int | None = None | ||
) -> str: | ||
"""Search the URL titles by kind for the given `title`, `season` and `episode`. | ||
:param str series: series to search for. | ||
:param int season: season to search for. | ||
:param int episode: episode to search for. | ||
:param int year: year to search for. | ||
:return: the episode URL. | ||
:rtype: str | ||
""" | ||
logger.info('Searching episode url for %s, season %d, episode %d', series, season, episode) | ||
|
||
# attempt first with year | ||
series_response = self._query_search(f'{series} ({year})') | ||
if len(series_response) == 0: | ||
series_response = self._query_search(series) | ||
|
||
episode_url = self._get_episode_url(series_response[0]['show_id'], season, episode) | ||
|
||
return self.server_url + episode_url | ||
|
||
def query( | ||
self, series: str | None = None, season: int | None = None, episode: int | None = None, year: int | None = None | ||
) -> list[SubtitulamosSubtitle]: | ||
"""Query the provider for subtitles.""" | ||
# get the episode url | ||
episode_url = self._search_url_titles(series, season, episode, year) | ||
if episode_url is None: | ||
logger.error('No episode url found for %s, season %d, episode %d', series, season, episode) | ||
return [] | ||
|
||
r = self.session.get(episode_url, headers={'Referer': self.server_url}, timeout=10) | ||
r.raise_for_status() | ||
soup = ParserBeautifulSoup(r.content, ['lxml', 'html.parser']) | ||
|
||
# get episode title | ||
title = soup.select('#episode-name h3')[0].get_text().strip().lower() | ||
|
||
subtitles = [] | ||
for sub in soup.select('.download-button:not(unavailable)'): | ||
# read the language | ||
language = Language.fromsubtitulamos(sub.find_previous('div', class_='language-name').get_text().strip()) | ||
|
||
version_container = sub.find_previous('div', class_='version-container') | ||
|
||
hearing_impaired = False | ||
|
||
# modify spanish latino subtitle language to only spanish and set hearing_impaired = True | ||
# because if exists spanish and spanish latino subtitle for the same episode, the score will be | ||
# higher with spanish subtitle. Spanish subtitle takes priority. | ||
if language == Language('spa', 'MX'): | ||
language = Language('spa') | ||
hearing_impaired = True | ||
|
||
# read the release subtitle | ||
release_group = version_container.select('.version-container .text.spaced')[0].getText() | ||
|
||
# read the subtitle url | ||
subtitle_url = self.server_url + sub.parent['href'] | ||
subtitle = SubtitulamosSubtitle( | ||
language, | ||
hearing_impaired, | ||
episode_url, | ||
series, | ||
season, | ||
episode, | ||
title, | ||
year, | ||
release_group, | ||
subtitle_url, | ||
) | ||
logger.debug('Found subtitle %r', subtitle) | ||
subtitles.append(subtitle) | ||
|
||
return subtitles | ||
|
||
def list_subtitles(self, video: Video, languages: Set[Language]) -> list[SubtitulamosSubtitle]: | ||
"""List all the subtitles for the video.""" | ||
return [s for s in self.query(video.series, video.season, video.episode, video.year) if s.language in languages] | ||
|
||
def download_subtitle(self, subtitle: SubtitulamosSubtitle) -> None: | ||
"""Download the content of the subtitle.""" | ||
logger.info('Downloading subtitle %s', subtitle.download_link) | ||
r = self.session.get(subtitle.download_link, headers={'Referer': subtitle.page_link}, timeout=10) | ||
r.raise_for_status() | ||
|
||
subtitle.content = fix_line_ending(r.content) |
Oops, something went wrong.