Skip to content

Commit

Permalink
feat: adding headless control
Browse files Browse the repository at this point in the history
  • Loading branch information
robbrad committed Jan 6, 2024
1 parent 1bfa6fe commit c810759
Show file tree
Hide file tree
Showing 47 changed files with 210 additions and 143 deletions.
13 changes: 8 additions & 5 deletions custom_components/uk_bin_collection/config_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ async def get_council_schema(self, council=str) -> vol.Schema:
if self.councils_data is None:
self.councils_data = await self.get_councils_json()
council_schema = vol.Schema({})
if ("skip_get_url" not in self.councils_data[council] or
"custom_component_show_url_field" in self.councils_data[council]):
if (
"skip_get_url" not in self.councils_data[council]
or "custom_component_show_url_field" in self.councils_data[council]
):
council_schema = council_schema.extend(
{vol.Required("url", default=""): cv.string}
)
Expand All @@ -54,6 +56,9 @@ async def get_council_schema(self, council=str) -> vol.Schema:
council_schema = council_schema.extend(
{vol.Required("web_driver", default=""): cv.string}
)
council_schema = council_schema.extend(
{vol.Required("headless", default=True): cv.boolean}
)
return council_schema

async def async_step_user(self, user_input=None):
Expand Down Expand Up @@ -116,9 +121,7 @@ async def async_step_council(self, user_input=None):

# Create the config entry
_LOGGER.info(LOG_PREFIX + "Creating config entry with data: %s", user_input)
return self.async_create_entry(
title=user_input["name"], data=user_input
)
return self.async_create_entry(title=user_input["name"], data=user_input)

# Show the configuration form to the user with the specific councils necessary fields
council_schema = await self.get_council_schema(self.data["council"])
Expand Down
2 changes: 1 addition & 1 deletion custom_components/uk_bin_collection/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
STATE_ATTR_NEXT_COLLECTION = "next_collection"
STATE_ATTR_DAYS = "days"

DEVICE_CLASS = "bin_collection_schedule"
DEVICE_CLASS = "bin_collection_schedule"
33 changes: 23 additions & 10 deletions custom_components/uk_bin_collection/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ async def async_setup_entry(
for bin_type in coordinator.data.keys()
)


def get_latest_collection_info(data) -> dict:
# Get the current date
current_date = datetime.now()
Expand All @@ -92,14 +93,17 @@ def get_latest_collection_info(data) -> dict:
if collection_date.date() >= current_date.date():
# If the bin type is in the dict, update its collection date if needed; otherwise, add it
if bin_type in next_collection_dates:
if collection_date < datetime.strptime(next_collection_dates[bin_type], "%d/%m/%Y"):
if collection_date < datetime.strptime(
next_collection_dates[bin_type], "%d/%m/%Y"
):
next_collection_dates[bin_type] = collection_date_str
else:
next_collection_dates[bin_type] = collection_date_str

_LOGGER.info(f"{LOG_PREFIX} Next Collection Dates: {next_collection_dates}")
return next_collection_dates


class HouseholdBinCoordinator(DataUpdateCoordinator):
"""Household Bin Coordinator"""

Expand All @@ -124,8 +128,10 @@ async def _async_update_data(self):
_LOGGER.info(f"{LOG_PREFIX} UKBinCollectionApp: {data}")

if cm.expired:
_LOGGER.warning(f"{LOG_PREFIX} UKBinCollectionApp timeout expired during run")

_LOGGER.warning(
f"{LOG_PREFIX} UKBinCollectionApp timeout expired during run"
)

return get_latest_collection_info(json.loads(data))


Expand Down Expand Up @@ -162,7 +168,9 @@ def apply_values(self):
name = "{} {}".format(self.coordinator.name, self._bin_type)
self._id = name
self._name = name
self._next_collection = parser.parse(self.coordinator.data[self._bin_type], dayfirst=True).date()
self._next_collection = parser.parse(
self.coordinator.data[self._bin_type], dayfirst=True
).date()
self._hidden = False
self._icon = "mdi:trash-can"
self._colour = "red"
Expand All @@ -179,16 +187,22 @@ def apply_values(self):
next_week_start = this_week_end + timedelta(days=1)
next_week_end = next_week_start + timedelta(days=6)

self._days = (self._next_collection- now.date()).days
self._days = (self._next_collection - now.date()).days
_LOGGER.info(f"{LOG_PREFIX} _days: {self._days}")

if self._next_collection == now.date():
self._state = "Today"
elif self._next_collection == (now + timedelta(days=1)).date():
self._state = "Tomorrow"
elif self._next_collection >= this_week_start and self._next_collection <= this_week_end:
elif (
self._next_collection >= this_week_start
and self._next_collection <= this_week_end
):
self._state = f"This Week: {self._next_collection.strftime('%A')}"
elif self._next_collection >= next_week_start and self._next_collection <= next_week_end:
elif (
self._next_collection >= next_week_start
and self._next_collection <= next_week_end
):
self._state = f"Next Week: {self._next_collection.strftime('%A')}"
elif self._next_collection > next_week_end:
self._state = f"Future: {self._next_collection}"
Expand All @@ -199,7 +213,6 @@ def apply_values(self):

_LOGGER.info(f"{LOG_PREFIX} State of the sensor: {self._state}")


@property
def name(self):
"""Return the name of the bin."""
Expand Down Expand Up @@ -239,7 +252,7 @@ def colour(self):
def unique_id(self):
"""Return a unique ID to use for this sensor."""
return self._id

@property
def bin_type(self):
"""Return the bin type."""
Expand Down
1 change: 0 additions & 1 deletion uk_bin_collection/tests/input.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
"web_driver": "http://selenium:4444",
"postcode": "DA5 3AH",
"uprn": "100020196143",
"headless": true,
"house_number": "1 Dorchester Avenue, Bexley",
"wiki_name": "Bexley Council",
"wiki_note": "In order to use this parser, you will need to sign up to [Bexley's @Home app](https://www.bexley.gov.uk/services/rubbish-and-recycling/bexley-home-recycling-app/about-app) (available for [iOS](https://apps.apple.com/gb/app/home-collection-reminder/id1050703690) and [Android](https://play.google.com/store/apps/details?id=com.contender.athome.android)).\nComplete the setup by entering your email and setting your address with postcode and address line.\nOnce you can see the calendar, you _should_ be good to run the parser.\nJust pass the email you used in quotes in the UPRN parameter.\n"
Expand Down
72 changes: 52 additions & 20 deletions uk_bin_collection/tests/test_common_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def test_get_date_with_ordinal_exception():
result = get_date_with_ordinal(date_number)
assert exc_info.type == TypeError
assert (
exc_info.value.args[0] == "not all arguments converted during string formatting"
exc_info.value.args[0] == "not all arguments converted during string formatting"
)


Expand All @@ -91,11 +91,13 @@ def test_is_holiday():
result = is_holiday("2022, 12, 25")
assert result is True


def test_is_holiday():
date = "20"
result = is_holiday("2022, 12, 25")
assert result is True


def test_is_holiday_region():
date = "20"
result = is_holiday("2022, 12, 25", Region.WLS)
Expand Down Expand Up @@ -153,52 +155,82 @@ def test_get_weekday_dates_in_period_bad():
assert len(result) != 8
assert result[6] != "08/04/20232"


def test_get_next_occurrence_from_day_month_false():
result = get_next_occurrence_from_day_month(datetime(2023,12,1))
result = get_next_occurrence_from_day_month(datetime(2023, 12, 1))
assert result == datetime(2023, 12, 1, 0, 0)


def test_get_next_occurrence_from_day_month_true():
result = get_next_occurrence_from_day_month(datetime(2023,9,1))
assert result == pd.Timestamp('2024-09-01 00:00:00')
result = get_next_occurrence_from_day_month(datetime(2023, 9, 1))
assert result == pd.Timestamp("2024-09-01 00:00:00")


def test_update_input_json():
council = "test_council"
url = "TEST_URL"
postcode="TEST_POSTCODE"
uprn="TEST_UPRN"
web_driver="TEST_WEBDRIVER"
postcode = "TEST_POSTCODE"
uprn = "TEST_UPRN"
web_driver = "TEST_WEBDRIVER"
skip_get_url = True
update_input_json(council, url, postcode=postcode, uprn=uprn, web_driver=web_driver, skip_get_url=skip_get_url)
update_input_json(
council,
url,
postcode=postcode,
uprn=uprn,
web_driver=web_driver,
skip_get_url=skip_get_url,
)
cwd = os.getcwd()
input_file_path = os.path.join(cwd, "uk_bin_collection", "tests", "input.json")
result1 = os.path.exists(input_file_path)
with open(input_file_path, 'r') as f:
data = json.load(f)
result1 = os.path.exists(input_file_path)
with open(input_file_path, "r") as f:
data = json.load(f)
assert result1 == True
assert data[council] == {"postcode": postcode, "skip_get_url": skip_get_url, "uprn": uprn, "url": url, "web_driver": web_driver, "wiki_name": council}
assert data[council] == {
"postcode": postcode,
"skip_get_url": skip_get_url,
"uprn": uprn,
"url": url,
"web_driver": web_driver,
"wiki_name": council,
}


def test_update_input_json_fail(capsys, monkeypatch):
def mock_os_path_exists(path):
return False # Simulate the path not existing

monkeypatch.setattr(os.path, 'exists', mock_os_path_exists)
monkeypatch.setattr(os.path, "exists", mock_os_path_exists)

council = "test_council"
url = "TEST_URL"
postcode="TEST_POSTCODE"
uprn="TEST_UPRN"
web_driver="TEST_WEBDRIVER"
postcode = "TEST_POSTCODE"
uprn = "TEST_UPRN"
web_driver = "TEST_WEBDRIVER"
skip_get_url = True
update_input_json(council, url, postcode=postcode, uprn=uprn, web_driver=web_driver, skip_get_url=skip_get_url)
update_input_json(
council,
url,
postcode=postcode,
uprn=uprn,
web_driver=web_driver,
skip_get_url=skip_get_url,
)

captured = capsys.readouterr()
assert "Exception encountered: Unable to update input.json file for the council." in captured.out
assert (
"Exception encountered: Unable to update input.json file for the council."
in captured.out
)
assert "Please check you're running developer mode" in captured.out


def test_create_webdriver_local():
result = create_webdriver(None)
assert result.name == 'chrome'
assert result.name == "chrome"


def test_create_webdriver_remote():
result = create_webdriver("http://selenium:4444")
assert result.name == 'chrome'
assert result.name == "chrome"
10 changes: 7 additions & 3 deletions uk_bin_collection/uk_bin_collection/collect_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

_LOGGER = logging.getLogger(__name__)


# Dynamically importing the council processor
def import_council_module(module_name, src_path="councils"):
module_path = os.path.realpath(os.path.join(os.path.dirname(__file__), src_path))
Expand All @@ -24,7 +25,9 @@ def __init__(self):
self.parsed_args = None

def setup_arg_parser(self):
self.parser = argparse.ArgumentParser(description="UK Bin Collection Data Parser")
self.parser = argparse.ArgumentParser(
description="UK Bin Collection Data Parser"
)
self.parser.add_argument(
"module", type=str, help="Name of council module to use"
)
Expand Down Expand Up @@ -60,7 +63,7 @@ def setup_arg_parser(self):
)
self.parser.add_argument(
"--headless",
action="store_true",
action="store_false",
help="Should Selenium be headless. Defaults to true. Can be set to false to debug council",
required=False,
)
Expand Down Expand Up @@ -110,8 +113,9 @@ def client_code(self, get_bin_data_class, address_url, **kwargs) -> None:
"""
return get_bin_data_class.template_method(address_url, **kwargs)


if __name__ == "__main__":
_LOGGER = setup_logging(LOGGING_CONFIG, None)
app = UKBinCollectionApp()
app.set_args(sys.argv[1:])
print(app.run())
print(app.run())
7 changes: 4 additions & 3 deletions uk_bin_collection/uk_bin_collection/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,14 +246,15 @@ def validate_dates(bin_dates: dict) -> dict:
raise NotImplementedError()
# If a date is in December and the next is in January, increase the year


def contains_date(string, fuzzy=False) -> bool:
"""
Return whether the string can be interpreted as a date.
:param string: str, string to check for date
:param fuzzy: bool, ignore unknown tokens in string if True
"""
try:
try:
parse(string, fuzzy=fuzzy)
return True

Expand All @@ -268,8 +269,8 @@ def create_webdriver(web_driver: str, headless: bool) -> webdriver.Chrome:
"""
# Set up Selenium to run 'headless'
options = webdriver.ChromeOptions()
if headless is None:
options.add_argument("--headless")
if headless is True:
options.add_argument("--headless=new")
options.add_argument("--no-sandbox")
options.add_argument("--disable-gpu")
options.add_argument("--start-maximized")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def parse_data(self, page: str, **kwargs) -> dict:
user_paon = kwargs.get("paon")
user_postcode = kwargs.get("postcode")
web_driver = kwargs.get("web_driver")
headless= kwargs.get("headless")
headless = kwargs.get("headless")

# Create Selenium webdriver
driver = create_webdriver(web_driver, headless)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ def parse_data(self, page: str, **kwargs) -> dict:
data = {"bins": []}
uprn = kwargs.get("uprn")
web_driver = kwargs.get("web_driver")
headless= kwargs.get("headless")
headless = kwargs.get("headless")
current_month = datetime.today().strftime("%m")
current_year = datetime.today().strftime("%Y")
url = (
f"https://mybins.blackburn.gov.uk/api/mybins/getbincollectiondays?uprn={uprn}&month={current_month}"
f"&year={current_year}"
)
driver = create_webdriver(web_driver,headless)
driver = create_webdriver(web_driver, headless)
driver.get(url)

soup = BeautifulSoup(driver.page_source, "html.parser")
Expand Down
4 changes: 2 additions & 2 deletions uk_bin_collection/uk_bin_collection/councils/BoltonCouncil.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ def parse_data(self, page: str, **kwargs) -> dict:
user_postcode = kwargs.get("postcode")
check_postcode(user_postcode)
web_driver = kwargs.get("web_driver")
headless= kwargs.get("headless")
headless = kwargs.get("headless")

data = {"bins": []}

# Get our initial session running
page = "https://carehomes.bolton.gov.uk/bins.aspx"

driver = create_webdriver(web_driver,headless)
driver = create_webdriver(web_driver, headless)
driver.get(page)

# If you bang in the house number (or property name) and postcode in the box it should find your property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ def parse_data(self, page: str, **kwargs) -> dict:
user_paon = kwargs.get("paon")
postcode = kwargs.get("postcode")
web_driver = kwargs.get("web_driver")
headless= kwargs.get("headless")
driver = create_webdriver(web_driver,headless)
headless = kwargs.get("headless")
driver = create_webdriver(web_driver, headless)
driver.get(kwargs.get("url"))

wait = WebDriverWait(driver, 60)
Expand Down
Loading

0 comments on commit c810759

Please sign in to comment.