Skip to content

Commit

Permalink
Added initial code for new auth flow
Browse files Browse the repository at this point in the history
  • Loading branch information
itchannel committed Nov 14, 2023
1 parent 9fed392 commit 87525f8
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 36 deletions.
250 changes: 215 additions & 35 deletions custom_components/fordpass/fordpass_new.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@
defaultHeaders = {
"Accept": "*/*",
"Accept-Language": "en-us",
"User-Agent": "FordPass/23 CFNetwork/1408.0.4 Darwin/22.5.0",
"User-Agent": "FordPass/26 CFNetwork/1485 Darwin/23.1.0",
"Accept-Encoding": "gzip, deflate, br",
}

loginHeaders = {
"Accept": "*/*",
"Accept-Language": "en-us",
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 17_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Mobile/15E148 Safari/604.1",
"Accept-Encoding": "gzip, deflate, br",
}

Expand All @@ -32,13 +39,27 @@
"North America & Canada": "71A3AD0A-CF46-4CCF-B473-FC7FE5BC4592",
}

locale_lookup = {
"UK&Europe": "EN-GB",
"Australia": "EN-AU",
"North America & Canada": "EN-US",
}

locale_short_lookup = {
"UK&Europe": "GB",
"Australia": "AUS",
"North America & Canada": "USA",
}


NEW_API = True

BASE_URL = "https://usapi.cv.ford.com/api"
GUARD_URL = "https://api.mps.ford.com/api"
SSO_URL = "https://sso.ci.ford.com"
AUTONOMIC_URL = "https://api.autonomic.ai/v1"
AUTONOMIC_ACCOUNT_URL = "https://accounts.autonomic.ai/v1"
FORD_LOGIN_URL = "https://login.ford.com"

session = requests.Session()

Expand All @@ -53,6 +74,8 @@ def __init__(
self.password = password
self.save_token = save_token
self.region = region_lookup[region]
self.country_code = locale_lookup[region]
self.short_code = locale_short_lookup[region]
self.region2 = region
self.vin = vin
self.token = None
Expand Down Expand Up @@ -81,6 +104,149 @@ def generate_hash(self, code):
hashengine = hashlib.sha256()
hashengine.update(code.encode('utf-8'))
return self.base64_url_encode(hashengine.digest()).decode('utf-8')

def auth2_step1(self):
"""Auth2 step 1 obtain tokens"""
_LOGGER.debug("Running Step1 new!")
headers = {
**loginHeaders,
}
code1 = ''.join(random.choice(string.ascii_lowercase) for i in range(43))
code_verifier = self.generate_hash(code1)
step1_session = requests.session()
step1_url = f"{FORD_LOGIN_URL}/4566605f-43a7-400a-946e-89cc9fdb0bd7/B2C_1A_SignInSignUp_en-AU/oauth2/v2.0/authorize?redirect_uri=fordapp://userauthorized&response_type=code&max_age=3600&scope=%2009852200-05fd-41f6-8c21-d36d3497dc64%20openid&client_id=09852200-05fd-41f6-8c21-d36d3497dc64&code_challenge={code_verifier}&code_challenge_method=S256&ui_locales={self.country_code}&language_code={self.country_code}&country_code={self.short_code}&ford_application_id=5C80A6BB-CF0D-4A30-BDBF-FC804B5C1A98"
step1get = step1_session.get(
step1_url,
headers=headers,
)

step1get.raise_for_status()

#_LOGGER.debug(step1_session.text)
pattern = r'var SETTINGS = (\{[^;]*\});'
#_LOGGER.debug(step1get.text)
match = re.search(pattern, step1get.text)
transId = None
csrfToken = None
if match:
settings = match.group(1)
settings_json = json.loads(settings)
_LOGGER.debug(settings_json)
_LOGGER.debug(settings_json["transId"])
transId = settings_json["transId"]
csrfToken = settings_json["csrf"]
_LOGGER.debug(step1get.status_code)
_LOGGER.debug(step1_session.cookies.get_dict())
data = {
"request_type": "RESPONSE",
"signInName": self.username,
"password": self.password,
}
urlp = f"{FORD_LOGIN_URL}/4566605f-43a7-400a-946e-89cc9fdb0bd7/B2C_1A_SignInSignUp_en-AU/SelfAsserted?tx={transId}&p=B2C_1A_SignInSignUp_en-AU"
_LOGGER.debug(urlp)
headers = {
**loginHeaders,
"Origin": "https://login.ford.com",
"Referer": step1_url,
"X-Csrf-Token": csrfToken
}
step1post = step1_session.post(
urlp,
headers=headers,
data=data
)
step1post.raise_for_status()
_LOGGER.debug("checking password")
_LOGGER.debug(step1post.text)
_LOGGER.debug(step1post.status_code)
cookie_dict = step1_session.cookies.get_dict()
_LOGGER.debug(cookie_dict)




step1pt2 = step1_session.get(
f"{FORD_LOGIN_URL}/4566605f-43a7-400a-946e-89cc9fdb0bd7/B2C_1A_SignInSignUp_en-AU/api/CombinedSigninAndSignup/confirmed?rememberMe=false&csrf_token={csrfToken}",
headers=headers,
allow_redirects=False,
)
step1pt2.raise_for_status()

test = step1pt2.headers["Location"]
_LOGGER.debug(test)

code_new = test.replace("fordapp://userauthorized/?code=","")

_LOGGER.debug(code_new)

data = {
"client_id" : "09852200-05fd-41f6-8c21-d36d3497dc64",
"grant_type": "authorization_code",
"code_verifier": code1,
"code": code_new,
"redirect_uri": "fordapp://userauthorized"

}

step1pt3 = step1_session.post(
f"{FORD_LOGIN_URL}/4566605f-43a7-400a-946e-89cc9fdb0bd7/B2C_1A_SignInSignUp_en-AU/oauth2/v2.0/token",
headers=headers,
data=data
)
step1pt3.raise_for_status()

_LOGGER.debug(step1pt3.status_code)
_LOGGER.debug(step1pt3.text)

tokens = step1pt3.json()
if tokens:
if self.auth2_step2(tokens):
return tokens
else:
_LOGGER.debug("DAM IT WENT WRONG")









def auth2_step2(self, result):
_LOGGER.debug(result)

data = {"idpToken": result["access_token"]}
headers = {**apiHeaders, "Application-Id": self.region}
response = session.post(
f"{GUARD_URL}/token/v2/cat-with-b2c-access-token",
data=json.dumps(data),
headers=headers,
)
response.raise_for_status()
_LOGGER.debug(response.status_code)
_LOGGER.debug(response.text)
result = response.json()
self.token = result["access_token"]
_LOGGER.debug(self.token)
self.refresh_token = result["refresh_token"]
self.expires_at = time.time() + result["expires_in"]
_LOGGER.debug(self.expires_at)
auto_token = self.get_auto_token()
_LOGGER.debug("AUTO 2")
self.auto_token = auto_token["access_token"]
self.auto_expires_at = time.time() + result["expires_in"]
if self.save_token:
result["expiry_date"] = time.time() + result["expires_in"]
result["auto_token"] = auto_token["access_token"]
result["auto_refresh"] = "" #auto_token["refresh_token"]
result["auto_expiry"] = time.time() + auto_token["expires_in"]

self.write_token(result)
session.cookies.clear()
_LOGGER.debug("Step 5 Complete")
return True


def auth_step1(self):
"""Obtain data-ibm-login-url"""
Expand All @@ -93,13 +259,13 @@ def auth_step1(self):
# _LOGGER.debug("Before")
code1 = ''.join(random.choice(string.ascii_lowercase) for i in range(43))
code_verifier = self.generate_hash(code1)
url1 = f"{SSO_URL}/v1.0/endpoint/default/authorize?redirect_uri=fordapp://userauthorized&response_type=code&scope=openid&max_age=3600&client_id=9fb503e0-715b-47e8-adfd-ad4b7770f73b&code_challenge={code_verifier}&code_challenge_method=S256"
url1 = f"https://login.ford.com/4566605f-43a7-400a-946e-89cc9fdb0bd7/B2C_1A_SignInSignUp_en-AU/oauth2/v2.0/authorize?redirect_uri=fordapp://userauthorized&response_type=code&scope=%2009852200-05fd-41f6-8c21-d36d3497dc64%20openid&max_age=3600&client_id=09852200-05fd-41f6-8c21-d36d3497dc64&code_challenge={code_verifier}&code_challenge_method=S256&ui_locales=en-AU&language_code=en-AU&country_code=AUS&ford_application_id=5C80A6BB-CF0D-4A30-BDBF-FC804B5C1A98"
response = session.get(
url1,
headers=headers,
)
# _LOGGER.debug(response.text)
# _LOGGER.debug(response.status_code)
_LOGGER.debug(response.text)
_LOGGER.debug(response.status_code)
if response.status_code != 200:
_LOGGER.debug("Incorrect response from URL")
raise Exception("Response from URL was invalid")
Expand Down Expand Up @@ -271,61 +437,64 @@ def auth(self):
_LOGGER.debug(self.errors)

# Run Step 1 auth
ibm_urls = self.auth_step1()
access_tokens = self.auth2_step1()
# ibm_urls = self.auth_step1()

if ibm_urls is None:
self.errors += 1
if self.errors <= 10:
self.auth()
else:
raise Exception("Step 1 has reached error limit")

# Run Step 2 auth
login_url = self.auth_step2(ibm_urls["ibm_url"])
# if ibm_urls is None:
# self.errors += 1
# if self.errors <= 10:
# self.auth()
# else:
# raise Exception("Step 1 has reached error limit")

if login_url is None:
self.errors += 1
if self.errors <= 10:
self.auth()
else:
raise Exception("Step 2 has reached error limit")
# # Run Step 2 auth
# login_url = self.auth_step2(ibm_urls["ibm_url"])

# Run Step 3 auth
codes = self.auth_step3(login_url)
# if login_url is None:
# self.errors += 1
# if self.errors <= 10:
# self.auth()
# else:
# raise Exception("Step 2 has reached error limit")

if codes is None:
self.errors += 1
if self.errors <= 10:
self.auth()
else:
raise Exception("Step 3 has reached error limit")
# # Run Step 3 auth
# codes = self.auth_step3(login_url)

# if codes is None:
# self.errors += 1
# if self.errors <= 10:
# self.auth()
# else:
# raise Exception("Step 3 has reached error limit")

# Run Step 4 auth
access_tokens = self.auth_step4(codes, ibm_urls["code1"])
# # Run Step 4 auth
# access_tokens = self.auth_step4(codes, ibm_urls["code1"])

if access_tokens is None:
self.errors += 1
if self.errors <= 10:
self.auth()
else:
raise Exception("Step 4 has reached error limit")
raise Exception("Step 1 has reached error limit")

# Run Step 5 auth
success = self.auth_step5(access_tokens)

#success = self.auth_step5(access_tokens)
success = self.auth2_step2(access_tokens)
if success is False:
self.errors += 1
if self.errors <= 10:
self.auth()
else:
raise Exception("Step 5 has reached error limit")
raise Exception("Step 2 has reached error limit")
else:
self.errors = 0
return True
return False

def refresh_token_func(self, token):
"""Refresh token if still valid"""
_LOGGER.debug("Refreshing token")
data = {"refresh_token": token["refresh_token"]}
headers = {**apiHeaders, "Application-Id": self.region}

Expand Down Expand Up @@ -445,11 +614,14 @@ def get_auto_token(self):

}

_LOGGER.debug(data)

r = session.post(
f"{AUTONOMIC_ACCOUNT_URL}/auth/oidc/token",
data=data,
headers=headers
)
_LOGGER.debug(r.text)

if r.status_code == 200:
result = r.json()
Expand Down Expand Up @@ -494,8 +666,9 @@ def status(self):
_LOGGER.debug("NEW API???")

if r.status_code == 200:
# _LOGGER.debug(r.text)
#_LOGGER.debug(r.text)
result = r.json()

return result
if r.status_code == 401:
self.auth()
Expand Down Expand Up @@ -744,7 +917,14 @@ def __request_and_poll_command(self, command, vin=None):
_LOGGER.debug("Command succeeded")
return True
if status["states"][f"{command}Command"]["value"]["toState"] == "expired":
_LOGGER.debug("Command expired")
_LOGGER.warning(f"Fordpass Command: {status.get('states', {}).get(f'{command}Command', {}).get('message', 'Expired Status')}")
if "statusRefresh" in command:
raise exceptions.HomeAssistantError(f"Fordpass Command: {status.get('states', {}).get(f'{command}Command', {}).get('message', 'Expired Status')}")
return False
if status["states"][f"{command}Command"]["value"]["toState"] == "failed":
_LOGGER.warning(f"Fordpass Command: {status.get('states', {}).get(f'{command}Command', {}).get('message', 'Failed Status')}")
if "statusRefresh" in command:
raise exceptions.HomeAssistantError(f"Fordpass Command: {status.get('states', {}).get(f'{command}Command', {}).get('message', 'Failed Status')}")
return False
i += 1
_LOGGER.debug("Looping again")
Expand Down
2 changes: 1 addition & 1 deletion custom_components/fordpass/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@
"loggers": ["custom_components.fordpass"],
"requirements": [],
"ssdp": [],
"version": "0.1.61",
"version": "0.1.63",
"zeroconf": []
}
4 changes: 4 additions & 0 deletions info.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
## **Changelog**
### Version 1.63 - Beta
- Reworked authentication to use login.ford.com
### Version 1.62
- Skipped due to emergency release of auth changes
### Version 1.61
- Deepsleep status is now reported again as a sensor
- Compass Direction is now an attribute under the device_tracker entity
Expand Down

0 comments on commit 87525f8

Please sign in to comment.