-
-
Notifications
You must be signed in to change notification settings - Fork 740
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Fix the stevenage_gov_uk source * Update documentation * reformatting + ./update_docu_links.py + add findmyaddress.co.uk link in md file --------- Co-authored-by: Adam Prickett <adam.prickett@ampersa.co.uk> Co-authored-by: 5ila5 <5ila5@users.noreply.github.com>
- Loading branch information
1 parent
9739594
commit 41aa829
Showing
5 changed files
with
107 additions
and
98 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
170 changes: 86 additions & 84 deletions
170
...components/waste_collection_schedule/waste_collection_schedule/source/stevenage_gov_uk.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,108 +1,110 @@ | ||
import json | ||
import requests | ||
import urllib3 | ||
from datetime import datetime | ||
import datetime | ||
import time | ||
|
||
import requests | ||
from waste_collection_schedule import Collection # type: ignore[attr-defined] | ||
|
||
# With verify=True the POST fails due to a SSLCertVerificationError. | ||
# Using verify=False works, but is not ideal. The following links may provide a better way of dealing with this: | ||
# https://urllib3.readthedocs.io/en/1.26.x/advanced-usage.html#ssl-warnings | ||
# https://urllib3.readthedocs.io/en/1.26.x/user-guide.html#ssl | ||
# This line suppresses the InsecureRequestWarning when using verify=False | ||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) | ||
|
||
|
||
TITLE = "Stevenage Borough Council" | ||
DESCRIPTION = "Source for stevenage.gov.uk services for Stevenage, UK." | ||
URL = "https://stevenage.gov.uk" | ||
TITLE = "Stevenage" | ||
DESCRIPTION = "Source for Stevenage." | ||
URL = "https://www.stevenage.gov.uk/" | ||
TEST_CASES = { | ||
"Coopers Close schedule": {"road": "Coopers Close", "postcode": "SG2 9TL"}, | ||
"Wansbeck Close schedule": {"road": "Wansbeck Close", "postcode": "SG1 6AA"}, | ||
"Chepstow Close schedule": {"road": "Chepstow Close", "postcode": "SG1 5TT"}, | ||
"Chepstow Close": {"uprn": "100080879233"}, | ||
"Rectory Lane": {"uprn": "100081137566"}, | ||
"Neptune Gate": {"uprn": "200000585910"}, | ||
} | ||
|
||
SEARCH_URLS = { | ||
"round_search": "https://services.stevenage.gov.uk/~?a=find&v=1&p=P1&c=P1_C33_&act=P1_A43_", | ||
"collection_search": "https://services.stevenage.gov.uk/~?a=find&v=1&p=P1&c=P1_C37_&act=P1_A64_", | ||
} | ||
ICON_MAP = { | ||
"REFUSE": "mdi:trash-can", | ||
"RECYCLING": "mdi:recycle", | ||
"general waste": "mdi:trash-can", | ||
"recycling": "mdi:recycle", | ||
} | ||
COLLECTIONS = {"Rubbish", "Recycling"} | ||
|
||
SESSION_URL = "https://stevenage-self.achieveservice.com/authapi/isauthenticated?uri=https%3A%2F%2Fstevenage-self.achieveservice.com%2Fen%2Fservice%2FCheck_your_household_bin_collection_days&hostname=stevenage-self.achieveservice.com&withCredentials=true" | ||
TOKEN_URL = ( | ||
"https://stevenage-self.achieveservice.com/apibroker/runLookup?id=5e55337a540d4" | ||
) | ||
API_URL = "https://stevenage-self.achieveservice.com/apibroker/runLookup" | ||
|
||
|
||
class Source: | ||
def __init__(self, road, postcode): | ||
self._road = road | ||
self._postcode = postcode | ||
def __init__(self, uprn): | ||
self._uprn = str(uprn) | ||
|
||
def fetch(self): | ||
data = { | ||
"formValues": { | ||
"Section 1": { | ||
"token": {"value": ""}, | ||
"LLPGUPRN": { | ||
"value": self._uprn, | ||
}, | ||
"MinimumDateLookAhead": { | ||
"value": time.strftime("%Y-%m-%d"), | ||
}, | ||
"MaximumDateLookAhead": { | ||
"value": str(int(time.strftime("%Y")) + 1) | ||
+ time.strftime("-%m-%d"), | ||
}, | ||
} | ||
} | ||
} | ||
|
||
s = requests.Session() | ||
|
||
# Get Round ID and Round Code | ||
# Don't fully understand significance of all of the fields, but API borks if they are not present | ||
roundData = { | ||
"data": { | ||
"fields": ["P1_C31_", "P1_C31_", "P1_C105_", "P1_C105_"], | ||
"rows": [[self._road, self._road, self._postcode, self._postcode]], | ||
}, | ||
"sequence": 1, | ||
headers = { | ||
"Content-Type": "application/json", | ||
"Accept": "application/json", | ||
"User-Agent": "Mozilla/5.0", | ||
"X-Requested-With": "XMLHttpRequest", | ||
"Referer": "https://stevenage-self.achieveservice.com/fillform/?iframe_id=fillform-frame-1&db_id=", | ||
} | ||
s = requests.session() | ||
r = s.get(SESSION_URL) | ||
r.raise_for_status() | ||
session_data = r.json() | ||
sid = session_data["auth-session"] | ||
|
||
headers = {"Content-type": "application/json", "Accept": "text/plain"} | ||
roundRequest = s.post( | ||
SEARCH_URLS["round_search"], data=json.dumps(roundData), headers=headers, verify=False | ||
) | ||
roundJson = json.loads(roundRequest.text) | ||
t = s.get(TOKEN_URL) | ||
t.raise_for_status() | ||
token_data = t.json() | ||
data["formValues"]["Section 1"]["token"]["value"] = token_data["integration"][ | ||
"transformed" | ||
]["rows_data"]["0"]["token"] | ||
|
||
# Get collection info | ||
collectionData = { | ||
"data": { | ||
"fields": [ | ||
"P1_C37_.selectedRowData.id", | ||
"P1_C37_.selectedRowData.roundCode", | ||
], | ||
"rows": [[roundJson["rows"][0][0], roundJson["rows"][0][2]]], | ||
}, | ||
"sequence": 1, | ||
"childQueries": [ | ||
{ | ||
"data": { | ||
"fields": ["P1_C37_.selectedRowData.id"], | ||
"rows": [[roundJson["rows"][0][0]]], | ||
}, | ||
"index": 0, | ||
} | ||
], | ||
params = { | ||
"id": "64ba8cee353e6", | ||
"repeat_against": "", | ||
"noRetry": "false", | ||
"getOnlyTokens": "undefined", | ||
"log_id": "", | ||
"app_name": "AF-Renderer::Self", | ||
# unix_timestamp | ||
"_": str(int(time.time() * 1000)), | ||
"sid": sid, | ||
} | ||
|
||
collectionRequest = s.post( | ||
SEARCH_URLS["collection_search"], | ||
data=json.dumps(collectionData), | ||
headers=headers,verify=False | ||
) | ||
collectionJson = json.loads(collectionRequest.text) | ||
r = s.post(API_URL, json=data, headers=headers, params=params) | ||
r.raise_for_status() | ||
data = r.json() | ||
rows_data = data["integration"]["transformed"]["rows_data"] | ||
if not isinstance(rows_data, dict): | ||
raise ValueError("Invalid data returned from API") | ||
|
||
entries = [] | ||
for collection in collectionJson["rows"]: | ||
if collection[2] == "Recycling collection": | ||
entries.append( | ||
Collection( | ||
date=datetime.strptime(collection[1], "%d/%m/%Y").date(), | ||
t="Recycling", | ||
icon=ICON_MAP.get("RECYCLING"), | ||
) | ||
) | ||
elif collection[2] == "Refuse collection": | ||
entries.append( | ||
Collection( | ||
date=datetime.strptime(collection[1], "%d/%m/%Y").date(), | ||
t="Refuse", | ||
icon=ICON_MAP.get("REFUSE"), | ||
) | ||
for key in rows_data: | ||
value = rows_data[key] | ||
bin_type = value["bintype"].strip() | ||
|
||
try: | ||
date = datetime.datetime.strptime( | ||
value["collectiondate"], "%A %d %B %Y" | ||
).date() | ||
except ValueError: | ||
continue | ||
|
||
entries.append( | ||
Collection( | ||
date=date, | ||
t=bin_type, | ||
icon=ICON_MAP.get(bin_type.lower()), | ||
) | ||
) | ||
|
||
return entries |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.