Skip to content

Commit

Permalink
Merge pull request #221 from exislow/188-support-for-hires
Browse files Browse the repository at this point in the history
Implements PKCE Authorization to enable access to HiRess files.
  • Loading branch information
tehkillerbee authored Jan 27, 2024
2 parents 4eb3354 + 8a5f54b commit 4a08d05
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 4 deletions.
8 changes: 8 additions & 0 deletions tidalapi/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@
class Requests(object):
"""A class for handling api requests to TIDAL."""

user_agent: str

def __init__(self, session: "Session"):
# More Android User-Agents here: https://user-agents.net/browsers/android
self.user_agent = "Mozilla/5.0 (Linux; Android 12; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/91.0.4472.114 Safari/537.36"
self.session = session
self.config = session.config

Expand All @@ -76,6 +80,10 @@ def basic_request(

if not headers:
headers = {}

if "User-Agent" not in headers:
headers["User-Agent"] = self.user_agent

if self.session.token_type and self.session.access_token is not None:
headers["authorization"] = (
self.session.token_type + " " + self.session.access_token
Expand Down
148 changes: 144 additions & 4 deletions tidalapi/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import base64
import concurrent.futures
import datetime
import hashlib
import json
import locale
import logging
import os
import random
import time
import uuid
Expand All @@ -43,7 +45,7 @@
cast,
no_type_check,
)
from urllib.parse import urljoin
from urllib.parse import parse_qs, urlencode, urljoin, urlsplit

import requests

Expand Down Expand Up @@ -95,6 +97,8 @@ class Config:
Additionally, num_videos will turn into num_tracks in playlists.
"""

api_oauth2_token: str = "https://auth.tidal.com/v1/oauth2/token"
api_pkce_auth: str = "https://login.tidal.com/authorize"
api_v1_location: str = "https://api.tidal.com/v1/"
api_v2_location: str = "https://api.tidal.com/v2/"
api_token: str
Expand All @@ -105,6 +109,12 @@ class Config:
quality: str
video_quality: str
video_url: str = "https://resources.tidal.com/videos/%s/%ix%i.mp4"
# Necessary for PKCE authorization only
client_unique_key: str
code_verifier: str
code_challenge: str
pkce_uri_redirect: str = "https://tidal.com/android/login/auth"
client_id_pkce: str

@no_type_check
def __init__(
Expand Down Expand Up @@ -184,6 +194,23 @@ def __init__(
self.client_id = "".join(self.client_id)
self.client_secret = self.client_id
self.client_id = self.api_token
# PKCE Authorization. We will keep the former `client_id` as a fallback / will only be used for non PCKE
# authorizations.
self.client_unique_key = format(random.getrandbits(64), "02x")
self.code_verifier = base64.urlsafe_b64encode(os.urandom(32))[:-1].decode(
"utf-8"
)
self.code_challenge = base64.urlsafe_b64encode(
hashlib.sha256(self.code_verifier.encode("utf-8")).digest()
)[:-1].decode("utf-8")
self.client_id_pkce = base64.b64decode(
base64.b64decode(b"TmtKRVUxSmtjRXM=")
+ base64.b64decode(b"NWFIRkZRbFJuVlE9PQ==")
).decode("utf-8")
self.client_secret_pkce = base64.b64decode(
base64.b64decode(b"ZUdWMVVHMVpOMjVpY0ZvNVNVbGlURUZqVVQ=")
+ base64.b64decode(b"a3pjMmhyWVRGV1RtaGxWVUZ4VGpaSlkzTjZhbFJIT0QwPQ==")
).decode("utf-8")


class Case(Enum):
Expand Down Expand Up @@ -359,7 +386,7 @@ def load_oauth_session(
:param refresh_token: (Optional) A refresh token that lets you get a new access
token after it has expired
:param expiry_time: (Optional) The datetime the access token will expire
:return: True if we believe the log in was successful, otherwise false.
:return: True if we believe the login was successful, otherwise false.
"""
self.token_type = token_type
self.access_token = access_token
Expand Down Expand Up @@ -431,6 +458,109 @@ def login_oauth_file(self, oauth_file: Path) -> bool:
log.info("TIDAL Login KO")
return False

def login_pkce(self, fn_print: Callable[[str], None] = print) -> None:
"""Login handler for PKCE based authentication. This is the only way how to get
access to HiRes (Up to 24-bit, 192 kHz) FLAC files.
This handler will ask you to follow a URL, process with the login in the browser
and copy & paste the URL of the redirected browser page.
:param fn_print: A function which will be called to print the instructions,
defaults to `print()`.
:type fn_print: Callable, optional
:return:
"""
# Get login url
url_login: str = self.pkce_login_url()

fn_print("READ CAREFULLY!")
fn_print("---------------")
fn_print(
"You need to open this link and login with your username and password. "
"Afterwards you will be redirected to an 'Oops' page. "
"To complete the login you must copy the URL from this 'Oops' page and paste it to the input field."
)
fn_print(url_login)

# Get redirect URL from user input.
url_redirect: str = input("Paste 'Ooops' page URL here and press <ENTER>:")
# Query for auth tokens
json: dict[str, Union[str, int]] = self.pkce_get_auth_token(url_redirect)

# Parse and set tokens.
self.process_auth_token(json)

# Swap the client_id and secret
#self.client_enable_hires()

def client_enable_hires(self):
self.config.client_id = self.config.client_id_pkce
self.config.client_secret = self.config.client_secret_pkce

def pkce_login_url(self) -> str:
"""Returns the Login-URL to login via web browser.
:return: The URL the user has to use for login.
:rtype: str
"""
params: request.Params = {
"response_type": "code",
"redirect_uri": self.config.pkce_uri_redirect,
"client_id": self.config.client_id_pkce,
"lang": "EN",
"appMode": "android",
"client_unique_key": self.config.client_unique_key,
"code_challenge": self.config.code_challenge,
"code_challenge_method": "S256",
"restrict_signup": "true",
}

return self.config.api_pkce_auth + "?" + urlencode(params)

def pkce_get_auth_token(self, url_redirect: str) -> dict[str, Union[str, int]]:
"""Parses the redirect url to extract access and refresh tokens.
:param url_redirect: URL of the 'Ooops' page, where the user was redirected to
after login.
:type url_redirect: str
:return: A parsed JSON object with access and refresh tokens and other
information.
:rtype: dict[str, str | int]
"""
# w_usr=WRITE_USR, r_usr=READ_USR_DATA, w_sub=WRITE_SUBSCRIPTION
scope_default: str = "r_usr+w_usr+w_sub"

# Extract the code parameter from query string
if url_redirect and "https://" in url_redirect:
code: str = parse_qs(urlsplit(url_redirect).query)["code"][0]
else:
raise Exception("The provided redirect url looks wrong: " + url_redirect)

# Set post data and call the API
data: request.Params = {
"code": code,
"client_id": self.config.client_id_pkce,
"grant_type": "authorization_code",
"redirect_uri": self.config.pkce_uri_redirect,
"scope": scope_default,
"code_verifier": self.config.code_verifier,
"client_unique_key": self.config.client_unique_key,
}
response = self.request_session.post(self.config.api_oauth2_token, data)

# Check response
if not response.ok:
log.error("Login failed: %s", response.text)
response.raise_for_status()

# Parse the JSON response.
try:
token: dict[str, Union[str, int]] = response.json()
except:
raise Exception("Wrong one-time authorization code", response)

return token

def login_oauth_simple(self, function: Callable[[str], None] = print) -> None:
"""Login to TIDAL using a remote link. You can select what function you want to
use to display the link.
Expand Down Expand Up @@ -496,6 +626,16 @@ def _login_with_link(self) -> Tuple[LinkLogin, concurrent.futures.Future[Any]]:

def _process_link_login(self, json: JsonObj) -> None:
json = self._wait_for_link_login(json)
self.process_auth_token(json)

def process_auth_token(self, json: dict[str, Union[str, int]]) -> None:
"""Parses the authorization response and sets the token values to the specific
variables for further usage.
:param json: Parsed JSON response after login / authorization.
:type json: dict[str, str | int]
:return: None
"""
self.access_token = json["access_token"]
self.expiry_time = datetime.datetime.utcnow() + datetime.timedelta(
seconds=json["expires_in"]
Expand All @@ -512,7 +652,7 @@ def _wait_for_link_login(self, json: JsonObj) -> Any:
expiry = float(json["expiresIn"])
interval = float(json["interval"])
device_code = json["deviceCode"]
url = "https://auth.tidal.com/v1/oauth2/token"
url = self.config.api_oauth2_token
params = {
"client_id": self.config.client_id,
"client_secret": self.config.client_secret,
Expand Down Expand Up @@ -541,7 +681,7 @@ def token_refresh(self, refresh_token: str) -> bool:
:return: True if we believe the token was successfully refreshed, otherwise
False
"""
url = "https://auth.tidal.com/v1/oauth2/token"
url = self.config.api_oauth2_token
params = {
"grant_type": "refresh_token",
"refresh_token": refresh_token,
Expand Down

0 comments on commit 4a08d05

Please sign in to comment.