diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index 740b5f9..93f938d 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -9,7 +9,7 @@ jobs: strategy: max-parallel: 4 matrix: - python-version: ["3.11"] + python-version: ["3.12"] steps: - uses: actions/checkout@v4 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 7dcb7e3..461f3d0 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,13 +1,13 @@ repos: -- repo: https://github.com/charliermarsh/ruff-pre-commit - rev: 'v0.0.261' + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.6.9 hooks: - id: ruff args: [--fix, --show-fixes, --exit-non-zero-on-fix] exclude: ^.*\b(assets)\b.*$ -- repo: https://github.com/pre-commit/pre-commit-hooks + - repo: https://github.com/pre-commit/pre-commit-hooks rev: v4.3.0 hooks: - id: end-of-file-fixer diff --git a/custom_components/hikvision_next/__init__.py b/custom_components/hikvision_next/__init__.py index 04cd1bf..f28256a 100644 --- a/custom_components/hikvision_next/__init__.py +++ b/custom_components/hikvision_next/__init__.py @@ -14,24 +14,14 @@ ) from homeassistant.components.switch import ENTITY_ID_FORMAT as SWITCH_ENTITY_ID_FORMAT from homeassistant.config_entries import ConfigEntry -from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME, Platform +from homeassistant.const import Platform from homeassistant.core import HomeAssistant from homeassistant.exceptions import ConfigEntryNotReady from homeassistant.helpers import device_registry as dr, entity_registry as er -from homeassistant.helpers.httpx_client import get_async_client from homeassistant.helpers.typing import ConfigType -from .const import ( - ALARM_SERVER_PATH, - DATA_ALARM_SERVER_HOST, - DATA_ISAPI, - DATA_SET_ALARM_SERVER, - DOMAIN, - EVENTS_COORDINATOR, - SECONDARY_COORDINATOR, -) -from .coordinator import EventsCoordinator, SecondaryCoordinator -from .isapi import ISAPI +from .const import DOMAIN +from .hikvision_device import HikvisionDevice from .notifications import EventNotificationsView from .services import setup_services @@ -45,6 +35,8 @@ Platform.IMAGE, ] +type HikvisionConfigEntry = ConfigEntry[HikvisionDevice] + async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: """Set up the Hikvision component.""" @@ -54,58 +46,35 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: return True -async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: +async def async_setup_entry(hass: HomeAssistant, entry: HikvisionConfigEntry) -> bool: """Set up integration from a config entry.""" - hass.data.setdefault(DOMAIN, {}) - - host = entry.data[CONF_HOST] - username = entry.data[CONF_USERNAME] - password = entry.data[CONF_PASSWORD] - session = get_async_client(hass) - isapi = ISAPI(host, username, password, session) - isapi.pending_initialization = True + device = HikvisionDevice(hass, entry) + device.pending_initialization = True try: - await isapi.get_hardware_info() - await isapi.get_cameras() - device_info = isapi.hass_device_info() + await device.get_hardware_info() + device_info = device.hass_device_info() device_registry = dr.async_get(hass) device_registry.async_get_or_create(config_entry_id=entry.entry_id, **device_info) except (TimeoutError, TimeoutException) as ex: - raise ConfigEntryNotReady(f"Timeout while connecting to {host}. Cannot initialize {DOMAIN}") from ex + raise ConfigEntryNotReady(f"Timeout while connecting to {device.host}. Cannot initialize {DOMAIN}") from ex except Exception as ex: # pylint: disable=broad-except - msg = f"Cannot initialize {DOMAIN} {host}. Error: {ex}\n" + msg = f"Cannot initialize {DOMAIN} {device.host}. Error: {ex}\n" _LOGGER.error(msg + traceback.format_exc()) raise ConfigEntryNotReady(msg) from ex - coordinators = {} - - coordinators[EVENTS_COORDINATOR] = EventsCoordinator(hass, isapi) - - if isapi.device_info.support_holiday_mode or isapi.device_info.support_alarm_server: - coordinators[SECONDARY_COORDINATOR] = SecondaryCoordinator(hass, isapi) + entry.runtime_data = device - hass.data[DOMAIN][entry.entry_id] = { - DATA_SET_ALARM_SERVER: entry.data[DATA_SET_ALARM_SERVER], - DATA_ALARM_SERVER_HOST: entry.data[DATA_ALARM_SERVER_HOST], - DATA_ISAPI: isapi, - **coordinators, - } - - if entry.data[DATA_SET_ALARM_SERVER] and isapi.device_info.support_alarm_server: - await isapi.set_alarm_server(entry.data[DATA_ALARM_SERVER_HOST], ALARM_SERVER_PATH) - - for coordinator in coordinators.values(): - await coordinator.async_config_entry_first_refresh() + await device.init_coordinators() await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) - isapi.pending_initialization = False + device.pending_initialization = False # Only initialise view once if multiple instances of integration if get_first_instance_unique_id(hass) == entry.unique_id: hass.http.register_view(EventNotificationsView(hass)) - refresh_disabled_entities_in_registry(hass, isapi) + refresh_disabled_entities_in_registry(hass, device) return True @@ -120,11 +89,9 @@ async def async_remove_config_entry_device(hass: HomeAssistant, config_entry, de return True -async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: +async def async_unload_entry(hass: HomeAssistant, entry: HikvisionConfigEntry) -> bool: """Unload a config entry.""" - config = hass.data[DOMAIN][entry.entry_id] - # Unload a config entry unload_ok = all( await asyncio.gather( @@ -133,13 +100,10 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: ) # Reset alarm server after it has been set - if config[DATA_SET_ALARM_SERVER]: - isapi = config[DATA_ISAPI] + device = entry.runtime_data + if device.control_alarm_server_host: with suppress(Exception): - await isapi.set_alarm_server("http://0.0.0.0:80", "/") - - if unload_ok: - hass.data[DOMAIN].pop(entry.entry_id) + await device.set_alarm_server("http://0.0.0.0:80", "/") return unload_ok @@ -176,7 +140,7 @@ async def async_migrate_entry(hass: HomeAssistant, config_entry: ConfigEntry): return True -def refresh_disabled_entities_in_registry(hass: HomeAssistant, isapi: ISAPI): +def refresh_disabled_entities_in_registry(hass: HomeAssistant, device: HikvisionDevice): """Set disable state according to Notify Surveillance Center flag.""" def update_entity(event, ENTITY_ID_FORMAT): @@ -189,11 +153,11 @@ def update_entity(event, ENTITY_ID_FORMAT): entity_registry.async_update_entity(entity_id, disabled_by=disabled_by) entity_registry = er.async_get(hass) - for camera in isapi.cameras: + for camera in device.cameras: for event in camera.events_info: update_entity(event, SWITCH_ENTITY_ID_FORMAT) update_entity(event, BINARY_SENSOR_ENTITY_ID_FORMAT) - for event in isapi.device_info.events_info: + for event in device.events_info: update_entity(event, SWITCH_ENTITY_ID_FORMAT) update_entity(event, BINARY_SENSOR_ENTITY_ID_FORMAT) diff --git a/custom_components/hikvision_next/binary_sensor.py b/custom_components/hikvision_next/binary_sensor.py index f0d24f4..89ae15d 100644 --- a/custom_components/hikvision_next/binary_sensor.py +++ b/custom_components/hikvision_next/binary_sensor.py @@ -3,31 +3,36 @@ from __future__ import annotations from homeassistant.components.binary_sensor import ENTITY_ID_FORMAT, BinarySensorEntity -from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant from homeassistant.helpers.entity_platform import AddEntitiesCallback -from .const import DATA_ISAPI, DOMAIN, EVENTS, EVENT_IO +from . import HikvisionConfigEntry +from .isapi.const import EVENT_IO +from .const import EVENTS +from .hikvision_device import HikvisionDevice from .isapi import EventInfo -async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback) -> None: +async def async_setup_entry( + hass: HomeAssistant, + entry: HikvisionConfigEntry, + async_add_entities: AddEntitiesCallback, +) -> None: """Add binary sensors for hikvision events states.""" - config = hass.data[DOMAIN][entry.entry_id] - isapi = config[DATA_ISAPI] + device = entry.runtime_data entities = [] # Camera Events - for camera in isapi.cameras: + for camera in device.cameras: for event in camera.events_info: - entities.append(EventBinarySensor(isapi, camera.id, event)) + entities.append(EventBinarySensor(device, camera.id, event)) # NVR Events - if isapi.device_info.is_nvr: - for event in isapi.device_info.events_info: - entities.append(EventBinarySensor(isapi, 0, event)) + if device.device_info.is_nvr: + for event in device.events_info: + entities.append(EventBinarySensor(device, 0, event)) async_add_entities(entities) @@ -38,7 +43,7 @@ class EventBinarySensor(BinarySensorEntity): _attr_has_entity_name = True _attr_is_on = False - def __init__(self, isapi, device_id: int, event: EventInfo) -> None: + def __init__(self, device: HikvisionDevice, device_id: int, event: EventInfo) -> None: """Initialize.""" self.entity_id = ENTITY_ID_FORMAT.format(event.unique_id) self._attr_unique_id = self.entity_id @@ -46,5 +51,5 @@ def __init__(self, isapi, device_id: int, event: EventInfo) -> None: if event.id == EVENT_IO: self._attr_translation_placeholders = {"io_port_id": event.io_port_id} self._attr_device_class = EVENTS[event.id]["device_class"] - self._attr_device_info = isapi.hass_device_info(device_id) + self._attr_device_info = device.hass_device_info(device_id) self._attr_entity_registry_enabled_default = not event.disabled diff --git a/custom_components/hikvision_next/camera.py b/custom_components/hikvision_next/camera.py index da5cac9..66cc8a7 100644 --- a/custom_components/hikvision_next/camera.py +++ b/custom_components/hikvision_next/camera.py @@ -3,25 +3,26 @@ from __future__ import annotations from homeassistant.components.camera import Camera, CameraEntityFeature -from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.util import slugify -from .const import DATA_ISAPI, DOMAIN -from .isapi import ISAPI, AnalogCamera, CameraStreamInfo, IPCamera +from . import HikvisionConfigEntry +from .hikvision_device import HikvisionDevice +from .isapi import AnalogCamera, CameraStreamInfo, IPCamera -async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback) -> None: +async def async_setup_entry( + hass: HomeAssistant, entry: HikvisionConfigEntry, async_add_entities: AddEntitiesCallback +) -> None: """Set up a Hikvision IP Camera.""" - config = hass.data[DOMAIN][entry.entry_id] - isapi: ISAPI = config[DATA_ISAPI] + device = entry.runtime_data entities = [] - for camera in isapi.cameras: + for camera in device.cameras: for stream in camera.streams: - entities.append(HikvisionCamera(isapi, camera, stream)) + entities.append(HikvisionCamera(device, camera, stream)) async_add_entities(entities) @@ -33,17 +34,15 @@ class HikvisionCamera(Camera): def __init__( self, - isapi: ISAPI, + device: HikvisionDevice, camera: AnalogCamera | IPCamera, stream_info: CameraStreamInfo, ) -> None: """Initialize Hikvision camera stream.""" Camera.__init__(self) - self._attr_device_info = isapi.hass_device_info(camera.id) - self._attr_unique_id = slugify( - f"{isapi.device_info.serial_no.lower()}_{stream_info.id}" - ) + self._attr_device_info = device.hass_device_info(camera.id) + self._attr_unique_id = slugify(f"{device.device_info.serial_no.lower()}_{stream_info.id}") if stream_info.type_id > 1: self._attr_has_entity_name = True self._attr_translation_key = f"stream{stream_info.type_id}" @@ -52,13 +51,13 @@ def __init__( # for the main stream use just its name self._attr_name = camera.name self.entity_id = f"camera.{self.unique_id}" - self.isapi = isapi + self.device = device self.stream_info = stream_info async def stream_source(self) -> str | None: """Return the source of the stream.""" - return self.isapi.get_stream_source(self.stream_info) + return self.device.get_stream_source(self.stream_info) async def async_camera_image(self, width: int | None = None, height: int | None = None) -> bytes | None: """Return a still image response from the camera.""" - return await self.isapi.get_camera_image(self.stream_info, width, height) + return await self.device.get_camera_image(self.stream_info, width, height) diff --git a/custom_components/hikvision_next/config_flow.py b/custom_components/hikvision_next/config_flow.py index 33d9330..f89d33c 100755 --- a/custom_components/hikvision_next/config_flow.py +++ b/custom_components/hikvision_next/config_flow.py @@ -16,8 +16,8 @@ from homeassistant.data_entry_flow import FlowResult from homeassistant.helpers.httpx_client import get_async_client -from .const import DATA_ALARM_SERVER_HOST, DATA_SET_ALARM_SERVER, DOMAIN -from .isapi import ISAPI +from .const import CONF_ALARM_SERVER_HOST, CONF_SET_ALARM_SERVER, DOMAIN +from .isapi import ISAPIClient _LOGGER = logging.getLogger(__name__) @@ -38,12 +38,12 @@ async def get_schema(self, user_input: dict[str, Any]): vol.Required(CONF_USERNAME, default=user_input.get(CONF_USERNAME, "")): str, vol.Required(CONF_PASSWORD, default=user_input.get(CONF_PASSWORD, "")): str, vol.Required( - DATA_SET_ALARM_SERVER, - default=user_input.get(DATA_SET_ALARM_SERVER, True), + CONF_SET_ALARM_SERVER, + default=user_input.get(CONF_SET_ALARM_SERVER, True), ): bool, vol.Required( - DATA_ALARM_SERVER_HOST, - default=user_input.get(DATA_ALARM_SERVER_HOST, f"http://{local_ip}:8123"), + CONF_ALARM_SERVER_HOST, + default=user_input.get(CONF_ALARM_SERVER_HOST, f"http://{local_ip}:8123"), ): str, } ) @@ -64,7 +64,7 @@ async def async_step_user(self, user_input: dict[str, Any] | None = None) -> Flo } session = get_async_client(self.hass) - isapi = ISAPI(host, username, password, session) + isapi = ISAPIClient(host, username, password, session) await isapi.get_device_info() if self._reauth_entry: diff --git a/custom_components/hikvision_next/const.py b/custom_components/hikvision_next/const.py index 3227e10..dfbe6a8 100644 --- a/custom_components/hikvision_next/const.py +++ b/custom_components/hikvision_next/const.py @@ -4,111 +4,64 @@ from homeassistant.components.binary_sensor import BinarySensorDeviceClass +from .isapi.const import EVENTS as ISAPI_EVENTS + DOMAIN: Final = "hikvision_next" -DATA_SET_ALARM_SERVER: Final = "set_alarm_server" -DATA_ALARM_SERVER_HOST: Final = "alarm_server" -DATA_ISAPI = "isapi" +CONF_SET_ALARM_SERVER: Final = "set_alarm_server" +CONF_ALARM_SERVER_HOST: Final = "alarm_server" ALARM_SERVER_PATH = "/api/hikvision" EVENTS_COORDINATOR: Final = "events" SECONDARY_COORDINATOR: Final = "secondary" HOLIDAY_MODE = "holiday_mode" -CONNECTION_TYPE_DIRECT = "Direct" -CONNECTION_TYPE_PROXIED = "Proxied" - ATTR_CONFIG_ENTRY_ID = "config_entry_id" ACTION_REBOOT = "reboot" ACTION_ISAPI_REQUEST = "isapi_request" ACTION_UPDATE_SNAPSHOT = "update_snapshot" HIKVISION_EVENT = f"{DOMAIN}_event" -EVENT_BASIC: Final = "basic" -EVENT_IO: Final = "io" -EVENT_SMART: Final = "smart" -EVENT_PIR: Final = "pir" + EVENTS = { "motiondetection": { - "type": EVENT_BASIC, - "label": "Motion", - "slug": "motionDetection", - "mutex": True, + **ISAPI_EVENTS["motiondetection"], "device_class": BinarySensorDeviceClass.MOTION, }, "tamperdetection": { - "type": EVENT_BASIC, - "label": "Video Tampering", - "slug": "tamperDetection", + **ISAPI_EVENTS["tamperdetection"], "device_class": BinarySensorDeviceClass.TAMPER, }, "videoloss": { - "type": EVENT_BASIC, - "label": "Video Loss", - "slug": "videoLoss", + **ISAPI_EVENTS["videoloss"], "device_class": BinarySensorDeviceClass.PROBLEM, }, "scenechangedetection": { - "type": EVENT_SMART, - "label": "Scene Change", - "slug": "SceneChangeDetection", - "mutex": True, + **ISAPI_EVENTS["scenechangedetection"], "device_class": BinarySensorDeviceClass.TAMPER, }, "fielddetection": { - "type": EVENT_SMART, - "label": "Intrusion", - "slug": "FieldDetection", - "mutex": True, + **ISAPI_EVENTS["fielddetection"], "device_class": BinarySensorDeviceClass.MOTION, }, "linedetection": { - "type": EVENT_SMART, - "label": "Line Crossing", - "slug": "LineDetection", - "mutex": True, + **ISAPI_EVENTS["linedetection"], "device_class": BinarySensorDeviceClass.MOTION, }, "regionentrance": { - "type": EVENT_SMART, - "label": "Region Entrance", - "slug": "regionEntrance", + **ISAPI_EVENTS["regionentrance"], "device_class": BinarySensorDeviceClass.MOTION, }, "regionexiting": { - "type": EVENT_SMART, - "label": "Region Exiting", - "slug": "regionExiting", + **ISAPI_EVENTS["regionexiting"], "device_class": BinarySensorDeviceClass.MOTION, }, "io": { - "type": EVENT_IO, - "label": "Alarm Input", - "slug": "inputs", - "direct_node": "IOInputPort", - "proxied_node": "IOProxyInputPort", + **ISAPI_EVENTS["io"], "device_class": BinarySensorDeviceClass.MOTION, }, "pir": { - "type": EVENT_PIR, - "label": "PIR", - "slug": "WLAlarm/PIR", - "direct_node": "PIRAlarm", + **ISAPI_EVENTS["pir"], "device_class": BinarySensorDeviceClass.MOTION, - } -} - -EVENTS_ALTERNATE_ID = { - "vmd": "motiondetection", - "shelteralarm": "tamperdetection", - "VMDHumanVehicle": "motiondetection", -} - -MUTEX_ALTERNATE_IDS = {"motiondetection": "VMDHumanVehicle"} - -STREAM_TYPE = { - 1: "Main Stream", - 2: "Sub-stream", - 3: "Third Stream", - 4: "Transcoded Stream", + }, } diff --git a/custom_components/hikvision_next/coordinator.py b/custom_components/hikvision_next/coordinator.py index 72dc854..9355735 100644 --- a/custom_components/hikvision_next/coordinator.py +++ b/custom_components/hikvision_next/coordinator.py @@ -1,4 +1,4 @@ -"""Coordinators""" +"""Coordinators.""" from __future__ import annotations @@ -11,8 +11,7 @@ from homeassistant.helpers.update_coordinator import DataUpdateCoordinator from homeassistant.util import slugify -from .const import DATA_ALARM_SERVER_HOST, DOMAIN, HOLIDAY_MODE -from .isapi import ISAPI +from .const import CONF_ALARM_SERVER_HOST, DOMAIN, HOLIDAY_MODE SCAN_INTERVAL_EVENTS = timedelta(seconds=120) SCAN_INTERVAL_HOLIDAYS = timedelta(minutes=60) @@ -23,9 +22,9 @@ class EventsCoordinator(DataUpdateCoordinator): """Manage fetching events state from NVR or camera.""" - def __init__(self, hass: HomeAssistant, isapi: ISAPI) -> None: + def __init__(self, hass: HomeAssistant, device) -> None: """Initialize.""" - self.isapi = isapi + self.device = device super().__init__( hass, @@ -40,41 +39,41 @@ async def _async_update_data(self): data = {} # Get camera event status - for camera in self.isapi.cameras: + for camera in self.device.cameras: for event in camera.events_info: if event.disabled: continue try: _id = ENTITY_ID_FORMAT.format(event.unique_id) - data[_id] = await self.isapi.get_event_enabled_state(event) + data[_id] = await self.device.get_event_enabled_state(event) except Exception as ex: # pylint: disable=broad-except - self.isapi.handle_exception(ex, f"Cannot fetch state for {event.id}") + self.device.handle_exception(ex, f"Cannot fetch state for {event.id}") # Get NVR event status - for event in self.isapi.device_info.events_info: + for event in self.device.events_info: if event.disabled: continue try: _id = ENTITY_ID_FORMAT.format(event.unique_id) - data[_id] = await self.isapi.get_event_enabled_state(event) + data[_id] = await self.device.get_event_enabled_state(event) except Exception as ex: # pylint: disable=broad-except - self.isapi.handle_exception(ex, f"Cannot fetch state for {event.id}") + self.device.handle_exception(ex, f"Cannot fetch state for {event.id}") # Get output port(s) status - for i in range(1, self.isapi.device_info.output_ports + 1): + for i in range(1, self.device.capabilities.output_ports + 1): try: _id = ENTITY_ID_FORMAT.format( - f"{slugify(self.isapi.device_info.serial_no.lower())}_{i}_alarm_output" + f"{slugify(self.device.device_info.serial_no.lower())}_{i}_alarm_output" ) - data[_id] = await self.isapi.get_port_status("output", i) + data[_id] = await self.device.get_io_port_status("output", i) except Exception as ex: # pylint: disable=broad-except - self.isapi.handle_exception(ex, f"Cannot fetch state for alarm output {i}") + self.device.handle_exception(ex, f"Cannot fetch state for alarm output {i}") # Refresh HDD data try: - self.isapi.device_info.storage = await self.isapi.get_storage_devices() + self.device.storage = await self.device.get_storage_devices() except Exception as ex: # pylint: disable=broad-except - self.isapi.handle_exception(ex, "Cannot fetch state for HDD") + self.device.handle_exception(ex, "Cannot fetch storage state") return data @@ -82,9 +81,9 @@ async def _async_update_data(self): class SecondaryCoordinator(DataUpdateCoordinator): """Manage fetching events state from NVR.""" - def __init__(self, hass: HomeAssistant, isapi: ISAPI) -> None: + def __init__(self, hass: HomeAssistant, device) -> None: """Initialize.""" - self.isapi = isapi + self.device = device super().__init__( hass, @@ -98,14 +97,14 @@ async def _async_update_data(self): async with asyncio.timeout(20): data = {} try: - if self.isapi.device_info.support_holiday_mode: - data[HOLIDAY_MODE] = await self.isapi.get_holiday_enabled_state() + if self.device.capabilities.support_holiday_mode: + data[HOLIDAY_MODE] = await self.device.get_holiday_enabled_state() except Exception as ex: # pylint: disable=broad-except - self.isapi.handle_exception(ex, f"Cannot fetch state for {HOLIDAY_MODE}") + self.device.handle_exception(ex, f"Cannot fetch state for {HOLIDAY_MODE}") try: - if self.isapi.device_info.support_alarm_server: - alarm_server = await self.isapi.get_alarm_server() - data[DATA_ALARM_SERVER_HOST] = alarm_server + if self.device.capabilities.support_alarm_server: + alarm_server = await self.device.get_alarm_server() + data[CONF_ALARM_SERVER_HOST] = alarm_server except Exception as ex: # pylint: disable=broad-except - self.isapi.handle_exception(ex, f"Cannot fetch state for {DATA_ALARM_SERVER_HOST}") + self.device.handle_exception(ex, f"Cannot fetch state for {CONF_ALARM_SERVER_HOST}") return data diff --git a/custom_components/hikvision_next/diagnostics.py b/custom_components/hikvision_next/diagnostics.py index 5831e78..95162eb 100644 --- a/custom_components/hikvision_next/diagnostics.py +++ b/custom_components/hikvision_next/diagnostics.py @@ -9,13 +9,11 @@ from httpx import HTTPStatusError -from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant, callback from homeassistant.helpers.device_registry import DeviceEntry -from .const import DATA_ISAPI, DOMAIN, STREAM_TYPE - -GET = "get" +from . import HikvisionConfigEntry +from .isapi.const import GET, STREAM_TYPE def anonymise_mac(orignal: str): @@ -56,7 +54,7 @@ def anonymise_serial(orignal: str): anon_map = {} -async def async_get_config_entry_diagnostics(hass: HomeAssistant, entry: ConfigEntry) -> dict[str, Any]: +async def async_get_config_entry_diagnostics(hass: HomeAssistant, entry: HikvisionConfigEntry) -> dict[str, Any]: """Return diagnostics for a config entry.""" return await _async_get_diagnostics(hass, entry) @@ -64,10 +62,10 @@ async def async_get_config_entry_diagnostics(hass: HomeAssistant, entry: ConfigE @callback async def _async_get_diagnostics( hass: HomeAssistant, - entry: ConfigEntry, + entry: HikvisionConfigEntry, device: DeviceEntry | None = None, ) -> dict[str, Any]: - isapi = hass.data[DOMAIN][entry.entry_id][DATA_ISAPI] + device = entry.runtime_data # Get info set info = {} @@ -93,18 +91,18 @@ async def _async_get_diagnostics( ] for endpoint in endpoints: - responses[endpoint] = await get_isapi_data(isapi, endpoint) + responses[endpoint] = await get_isapi_data(device, endpoint) # channels - for camera in isapi.cameras: + for camera in device.cameras: for stream_type_id in STREAM_TYPE: endpoint = f"Streaming/channels/{camera.id}0{stream_type_id}" - responses[endpoint] = await get_isapi_data(isapi, endpoint) + responses[endpoint] = await get_isapi_data(device, endpoint) # event states - for camera in isapi.cameras: + for camera in device.cameras: for event in camera.events_info: - responses[event.url] = await get_isapi_data(isapi, event.url) + responses[event.url] = await get_isapi_data(device, event.url) info["ISAPI"] = responses return info diff --git a/custom_components/hikvision_next/hikvision_device.py b/custom_components/hikvision_next/hikvision_device.py new file mode 100644 index 0000000..fe96d49 --- /dev/null +++ b/custom_components/hikvision_next/hikvision_device.py @@ -0,0 +1,152 @@ +"ISAPI client for Home Assistant integration." + +import asyncio +from http import HTTPStatus +import logging + +import httpx + +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME +from homeassistant.core import HomeAssistant +from homeassistant.exceptions import HomeAssistantError +from homeassistant.helpers import device_registry as dr +from homeassistant.helpers.entity import DeviceInfo +from homeassistant.helpers.httpx_client import get_async_client +from homeassistant.util import slugify + +from .isapi.const import CONNECTION_TYPE_DIRECT, EVENT_IO +from .const import ( + ALARM_SERVER_PATH, + CONF_ALARM_SERVER_HOST, + CONF_SET_ALARM_SERVER, + DOMAIN, + EVENTS, + EVENTS_COORDINATOR, + SECONDARY_COORDINATOR, +) +from .coordinator import EventsCoordinator, SecondaryCoordinator +from .isapi import EventInfo, ISAPIClient, IPCamera + +_LOGGER = logging.getLogger(__name__) + + +class HikvisionDevice(ISAPIClient): + """Hikvision device for Home Assistant integration.""" + + def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None: + """Initialize device.""" + + self.hass = hass + self.control_alarm_server_host = entry.data[CONF_SET_ALARM_SERVER] + self.alarm_server_host = entry.data[CONF_ALARM_SERVER_HOST] + + # init ISAPI client + host = entry.data[CONF_HOST] + username = entry.data[CONF_USERNAME] + password = entry.data[CONF_PASSWORD] + session = get_async_client(hass) + super().__init__(host, username, password, session) + + self.events_info: list[EventInfo] = [] + + async def init_coordinators(self): + """Initialize coordinators.""" + + # init events supported by integration + self.events_info = self.get_device_event_capabilities() + for camera in self.cameras: + camera.events_info = self.get_device_event_capabilities(camera.id, camera.connection_type) + + # create coordinators + self.coordinators = {} + self.coordinators[EVENTS_COORDINATOR] = EventsCoordinator(self.hass, self) + if ( + self.capabilities.support_holiday_mode + or self.capabilities.support_alarm_server + or self.capabilities.storage + ): + self.coordinators[SECONDARY_COORDINATOR] = SecondaryCoordinator(self.hass, self) + + if self.control_alarm_server_host and self.capabilities.support_alarm_server: + await self.set_alarm_server(self.alarm_server_host, ALARM_SERVER_PATH) + + # first data fetch + for coordinator in self.coordinators.values(): + await coordinator.async_config_entry_first_refresh() + + def hass_device_info(self, camera_id: int = 0) -> DeviceInfo: + """Return Home Assistant entity device information.""" + if camera_id == 0: + return DeviceInfo( + manufacturer=self.device_info.manufacturer, + identifiers={(DOMAIN, self.device_info.serial_no)}, + connections={(dr.CONNECTION_NETWORK_MAC, self.device_info.mac_address)}, + model=self.device_info.model, + name=self.device_info.name, + sw_version=self.device_info.firmware, + ) + else: + camera_info = self.get_camera_by_id(camera_id) + is_ip_camera = isinstance(camera_info, IPCamera) + + return DeviceInfo( + manufacturer=self.device_info.manufacturer, + identifiers={(DOMAIN, camera_info.serial_no)}, + model=camera_info.model, + name=camera_info.name, + sw_version=camera_info.firmware if is_ip_camera else "Unknown", + via_device=(DOMAIN, self.device_info.serial_no) if self.device_info.is_nvr else None, + ) + + def get_device_event_capabilities( + self, + camera_id: int | None = None, + connection_type: str = CONNECTION_TYPE_DIRECT, + ) -> list[EventInfo]: + """Get events info handled by integration (camera id: NVR = None, camera > 0).""" + events = [] + + if camera_id is None: # NVR + integration_supported_events = [ + s for s in self.supported_events if (s.id in EVENTS and EVENTS[s.id].get("type") == EVENT_IO) + ] + else: # Camera + integration_supported_events = [ + s for s in self.supported_events if (s.channel_id == int(camera_id) and s.id in EVENTS) + ] + + for event in integration_supported_events: + # Build unique_id + device_id_param = f"_{camera_id}" if camera_id else "" + io_port_id_param = f"_{event.io_port_id}" if event.io_port_id != 0 else "" + unique_id = f"{slugify(self.device_info.serial_no.lower())}{device_id_param}{io_port_id_param}_{event.id}" + + if EVENTS.get(event.id): + event.unique_id = unique_id + event.disabled = "center" not in event.notifications # Disable if not set Notify Surveillance Center + events.append(event) + return events + + def handle_exception(self, ex: Exception, details: str = "") -> bool: + """Handle common exception, returns False if exception remains unhandled.""" + + def is_reauth_needed(): + if isinstance(ex, httpx.HTTPStatusError): + status_code = ex.response.status_code + if status_code in (HTTPStatus.UNAUTHORIZED,): + return True + return False + + host = self.host + if is_reauth_needed(): + # Re-establish session + self._session = None + self._auth_method = None + return True + + elif isinstance(ex, (asyncio.TimeoutError, httpx.TimeoutException)): + raise HomeAssistantError(f"Timeout while connecting to {host} {details}") from ex + + _LOGGER.warning("Unexpected exception | %s | %s", details, ex) + return False diff --git a/custom_components/hikvision_next/image.py b/custom_components/hikvision_next/image.py index 0272d30..d193312 100644 --- a/custom_components/hikvision_next/image.py +++ b/custom_components/hikvision_next/image.py @@ -7,7 +7,6 @@ from homeassistant.components.camera import Camera from homeassistant.components.image import ImageEntity -from homeassistant.config_entries import ConfigEntry from homeassistant.const import ATTR_ENTITY_ID, CONF_FILENAME from homeassistant.core import HomeAssistant from homeassistant.helpers import config_validation as cv, entity_platform @@ -15,23 +14,26 @@ from homeassistant.helpers.template import Template from homeassistant.util import slugify -from .const import ACTION_UPDATE_SNAPSHOT, DATA_ISAPI, DOMAIN -from .isapi import ISAPI, CameraStreamInfo +from . import HikvisionConfigEntry +from .const import ACTION_UPDATE_SNAPSHOT +from .hikvision_device import HikvisionDevice +from .isapi import CameraStreamInfo _LOGGER = logging.getLogger(__name__) -async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback) -> None: +async def async_setup_entry( + hass: HomeAssistant, entry: HikvisionConfigEntry, async_add_entities: AddEntitiesCallback +) -> None: """Add images with snapshots.""" - config = hass.data[DOMAIN][entry.entry_id] - isapi: ISAPI = config[DATA_ISAPI] + device = entry.runtime_data entities = [] - for camera in isapi.cameras: + for camera in device.cameras: for stream in camera.streams: if stream.type_id == 1: - entities.append(SnapshotFile(hass, isapi, camera, stream)) + entities.append(SnapshotFile(hass, device, camera, stream)) async_add_entities(entities) @@ -52,7 +54,7 @@ class SnapshotFile(ImageEntity): def __init__( self, hass: HomeAssistant, - isapi: ISAPI, + device: HikvisionDevice, camera: Camera, stream_info: CameraStreamInfo, ) -> None: @@ -60,7 +62,7 @@ def __init__( ImageEntity.__init__(self, hass) - self._attr_unique_id = slugify(f"{isapi.device_info.serial_no.lower()}_{stream_info.id}_snapshot") + self._attr_unique_id = slugify(f"{device.device_info.serial_no.lower()}_{stream_info.id}_snapshot") self.entity_id = f"camera.{self.unique_id}" self._attr_translation_key = "snapshot" self._attr_translation_placeholders = {"camera": camera.name} diff --git a/custom_components/hikvision_next/isapi/__init__.py b/custom_components/hikvision_next/isapi/__init__.py new file mode 100644 index 0000000..9786a52 --- /dev/null +++ b/custom_components/hikvision_next/isapi/__init__.py @@ -0,0 +1,2 @@ +from .isapi import ISAPIClient, SetEventStateMutexError # noqa: F401 +from .models import EventInfo, IPCamera, AlertInfo, StorageInfo, AnalogCamera, CameraStreamInfo # noqa: F401 diff --git a/custom_components/hikvision_next/isapi/const.py b/custom_components/hikvision_next/isapi/const.py new file mode 100644 index 0000000..944da06 --- /dev/null +++ b/custom_components/hikvision_next/isapi/const.py @@ -0,0 +1,88 @@ +from typing import Final + +GET = "GET" +PUT = "PUT" +POST = "POST" + +CONNECTION_TYPE_DIRECT = "Direct" +CONNECTION_TYPE_PROXIED = "Proxied" + +EVENT_BASIC: Final = "basic" +EVENT_IO: Final = "io" +EVENT_SMART: Final = "smart" +EVENT_PIR: Final = "pir" +EVENTS = { + "motiondetection": { + "type": EVENT_BASIC, + "label": "Motion", + "slug": "motionDetection", + "mutex": True, + }, + "tamperdetection": { + "type": EVENT_BASIC, + "label": "Video Tampering", + "slug": "tamperDetection", + }, + "videoloss": { + "type": EVENT_BASIC, + "label": "Video Loss", + "slug": "videoLoss", + }, + "scenechangedetection": { + "type": EVENT_SMART, + "label": "Scene Change", + "slug": "SceneChangeDetection", + "mutex": True, + }, + "fielddetection": { + "type": EVENT_SMART, + "label": "Intrusion", + "slug": "FieldDetection", + "mutex": True, + }, + "linedetection": { + "type": EVENT_SMART, + "label": "Line Crossing", + "slug": "LineDetection", + "mutex": True, + }, + "regionentrance": { + "type": EVENT_SMART, + "label": "Region Entrance", + "slug": "regionEntrance", + }, + "regionexiting": { + "type": EVENT_SMART, + "label": "Region Exiting", + "slug": "regionExiting", + }, + "io": { + "type": EVENT_IO, + "label": "Alarm Input", + "slug": "inputs", + "direct_node": "IOInputPort", + "proxied_node": "IOProxyInputPort", + }, + "pir": { + "type": EVENT_PIR, + "label": "PIR", + "slug": "WLAlarm/PIR", + "direct_node": "PIRAlarm", + }, +} + +STREAM_TYPE = { + 1: "Main Stream", + 2: "Sub-stream", + 3: "Third Stream", + 4: "Transcoded Stream", +} + + +EVENTS_ALTERNATE_ID = { + "vmd": "motiondetection", + "shelteralarm": "tamperdetection", + "VMDHumanVehicle": "motiondetection", +} + +MUTEX_ALTERNATE_ID = {"motiondetection": "VMDHumanVehicle"} diff --git a/custom_components/hikvision_next/isapi.py b/custom_components/hikvision_next/isapi/isapi.py similarity index 61% rename from custom_components/hikvision_next/isapi.py rename to custom_components/hikvision_next/isapi/isapi.py index 6fa1d63..55a0da8 100644 --- a/custom_components/hikvision_next/isapi.py +++ b/custom_components/hikvision_next/isapi/isapi.py @@ -2,204 +2,83 @@ from __future__ import annotations -import asyncio from contextlib import suppress -from dataclasses import dataclass, field import datetime -from functools import reduce -from http import HTTPStatus import json import logging -from typing import Any, Optional -from urllib.parse import quote, urlparse +from typing import Any, AsyncIterator +from urllib.parse import quote, urljoin, urlparse import httpx -from httpx import HTTPStatusError, TimeoutException +from httpx import HTTPStatusError import xmltodict -from homeassistant.exceptions import HomeAssistantError -from homeassistant.helpers import device_registry as dr -from homeassistant.helpers.entity import DeviceInfo -from homeassistant.util import slugify - from .const import ( CONNECTION_TYPE_DIRECT, CONNECTION_TYPE_PROXIED, - DOMAIN, EVENT_BASIC, EVENT_IO, EVENT_PIR, EVENTS, EVENTS_ALTERNATE_ID, - MUTEX_ALTERNATE_IDS, + GET, + MUTEX_ALTERNATE_ID, + POST, + PUT, STREAM_TYPE, ) -from .isapi_client import ISAPI_Client +from .models import ( + AlarmServer, + AlertInfo, + AnalogCamera, + CameraStreamInfo, + CapabilitiesInfo, + EventInfo, + IPCamera, + ISAPIDeviceInfo, + MutexIssue, + ProtocolsInfo, + StorageInfo, +) +from .utils import bool_to_str, deep_get, parse_isapi_response, str_to_bool Node = dict[str, Any] _LOGGER = logging.getLogger(__name__) -GET = "GET" -PUT = "PUT" -POST = "POST" - - -@dataclass -class AlarmServer: - """Holds alarm server info.""" - - # Uses pylint invalid names to not break previous versions - ipAddress: str # pylint: disable=invalid-name - portNo: int # pylint: disable=invalid-name - url: str # pylint: disable=invalid-name - protocolType: str # pylint: disable=invalid-name - - -@dataclass -class AlertInfo: - """Holds NVR/Camera event notification info.""" - - channel_id: int - io_port_id: int - event_id: str - device_serial_no: Optional[str] - mac: str = "" - region_id: int = 0 - detection_target: Optional[str] = None - - -@dataclass -class MutexIssue: - """Holds mutually exclusive event checking info.""" - - event_id: str - channels: list = field(default_factory=list) - - -@dataclass -class EventInfo: - """Holds event info of particular device.""" - - id: str - channel_id: int - io_port_id: int - unique_id: str - url: str - disabled: bool = False - - -@dataclass -class SupportedEventsInfo: - """Holds supported event info for NVR/IP Camera.""" - - channel_id: int - io_port_id: int - event_id: str - notifications: list[str] = field(default_factory=list) - - -@dataclass -class CameraStreamInfo: - """Holds info of a camera stream.""" - - id: int - name: str - type_id: int - type: str - enabled: bool - codec: str - width: int - height: int - audio: bool - use_alternate_picture_url: bool = False - - -@dataclass -class StorageInfo: - """Holds info for internal and NAS storage devices.""" - - id: int - name: str - type: str - status: str - capacity: int - freespace: int - property: str - ip: str = "" - - -@dataclass -class HikDeviceInfo: - """Holds info of an NVR/DVR or single IP Camera.""" - - name: str = "" - manufacturer: str = "" - model: str = "" - serial_no: str = "" - firmware: str = "" - mac_address: str = "" - ip_address: str = "" - device_type: str = "" - is_nvr: bool = False - support_analog_cameras: int = 0 - support_digital_cameras: int = 0 - support_holiday_mode: bool = False - support_alarm_server: bool = False - support_channel_zero: bool = False - support_event_mutex_checking: bool = False - input_ports: int = 0 - output_ports: int = 0 - rtsp_port: int = 554 - storage: list[StorageInfo] = field(default_factory=list) - events_info: list[EventInfo] = field(default_factory=list) - - -@dataclass -class AnalogCamera: - """Analog cameras info.""" - - id: int - name: str - model: str - serial_no: str - input_port: int - connection_type: str - streams: list[CameraStreamInfo] = field(default_factory=list) - events_info: list[EventInfo] = field(default_factory=list) - - -@dataclass -class IPCamera(AnalogCamera): - """IP/Digital camera info.""" - - firmware: str = "" - ip_addr: str = "" - ip_port: int = 0 - - -class ISAPI: - """hikvisionapi async client wrapper.""" + +class ISAPIClient: + """Hikvision ISAPI client.""" def __init__( self, host: str, username: str, password: str, - session: Optional[httpx.AsyncClient] = None, + session: httpx.AsyncClient = None, ) -> None: """Initialize.""" - self.isapi = ISAPI_Client(host, username, password, session, timeout=20) + self.host = host - self.device_info = HikDeviceInfo() + self.username = username + self.password = password + self.timeout = 20 + self.isapi_prefix = "ISAPI" + self._session = session + self._auth_method: httpx._auth.Auth = None + + self.device_info = ISAPIDeviceInfo() + self.capabilities = CapabilitiesInfo() self.cameras: list[IPCamera | AnalogCamera] = [] - self.supported_events: list[SupportedEventsInfo] = [] + self.supported_events: list[EventInfo] = [] + self.storage: list[StorageInfo] = [] + self.protocols = ProtocolsInfo() self.pending_initialization = False async def get_device_info(self): """Get device info.""" hw_info = (await self.request(GET, "System/deviceInfo")).get("DeviceInfo", {}) - self.device_info = HikDeviceInfo( + self.device_info = ISAPIDeviceInfo( name=hw_info.get("deviceName"), manufacturer=str(hw_info.get("manufacturer", "Hikvision")).title(), model=hw_info.get("model"), @@ -211,43 +90,37 @@ async def get_device_info(self): ) async def get_hardware_info(self): - """Get base device data.""" - # Get base hw info + """Get device all data.""" await self.get_device_info() - - # Get device capabilities capabilities = (await self.request(GET, "System/capabilities")).get("DeviceCap", {}) - # Get all supported events to reduce isapi queries - self.supported_events = await self.get_supported_events(capabilities) - - # Set DeviceInfo - self.device_info.support_analog_cameras = int(deep_get(capabilities, "SysCap.VideoCap.videoInputPortNums", 0)) - self.device_info.support_digital_cameras = int(deep_get(capabilities, "RacmCap.inputProxyNums", 0)) - self.device_info.support_holiday_mode = str_to_bool(deep_get(capabilities, "SysCap.isSupportHolidy", "false")) - self.device_info.support_channel_zero = str_to_bool( + self.capabilities.support_analog_cameras = int(deep_get(capabilities, "SysCap.VideoCap.videoInputPortNums", 0)) + self.capabilities.support_digital_cameras = int(deep_get(capabilities, "RacmCap.inputProxyNums", 0)) + self.capabilities.support_holiday_mode = str_to_bool(deep_get(capabilities, "SysCap.isSupportHolidy", "false")) + self.capabilities.support_channel_zero = str_to_bool( deep_get(capabilities, "RacmCap.isSupportZeroChan", "false") ) - self.device_info.support_event_mutex_checking = str_to_bool( + self.capabilities.support_event_mutex_checking = str_to_bool( capabilities.get("isSupportGetmutexFuncErrMsg", "false") ) - self.device_info.input_ports = int(deep_get(capabilities, "SysCap.IOCap.IOInputPortNums", 0)) - self.device_info.output_ports = int(deep_get(capabilities, "SysCap.IOCap.IOOutputPortNums", 0)) - - with suppress(Exception): - self.device_info.storage = await self.get_storage_devices() - self.device_info.events_info = await self.get_device_event_capabilities( - self.supported_events, self.device_info.serial_no, 0 - ) - self.device_info.support_alarm_server = bool(await self.get_alarm_server()) - - await self.get_protocols() + self.capabilities.input_ports = int(deep_get(capabilities, "SysCap.IOCap.IOInputPortNums", 0)) + self.capabilities.output_ports = int(deep_get(capabilities, "SysCap.IOCap.IOOutputPortNums", 0)) + self.capabilities.support_alarm_server = bool(await self.get_alarm_server()) # Set if NVR based on whether more than 1 supported IP or analog cameras # Single IP camera will show 0 supported devices in total - if self.device_info.support_analog_cameras + self.device_info.support_digital_cameras > 1: + if self.capabilities.support_analog_cameras + self.capabilities.support_digital_cameras > 1: self.device_info.is_nvr = True + await self.get_cameras() + + self.supported_events = await self.get_supported_events(capabilities) + + await self.get_protocols() + + with suppress(Exception): + self.storage = await self.get_storage_devices() + async def get_cameras(self): """Get camera objects for all connected cameras.""" @@ -264,26 +137,17 @@ async def get_cameras(self): connection_type=CONNECTION_TYPE_DIRECT, ip_addr=self.device_info.ip_address, streams=await self.get_camera_streams(1), - events_info=await self.get_device_event_capabilities( - self.supported_events, - self.device_info.serial_no, - 1, - CONNECTION_TYPE_DIRECT, - ), ) ) else: # Get analog and digital cameras attached to NVR - if self.device_info.support_digital_cameras > 0: + if self.capabilities.support_digital_cameras > 0: digital_cameras = deep_get( (await self.request(GET, "ContentMgmt/InputProxy/channels")), "InputProxyChannelList.InputProxyChannel", [], ) - if not isinstance(digital_cameras, list): - digital_cameras = [digital_cameras] - for digital_camera in digital_cameras: camera_id = digital_camera.get("id") source = digital_camera.get("sourceInputPortDescriptor") @@ -309,26 +173,17 @@ async def get_cameras(self): ip_addr=source.get("ipAddress"), ip_port=source.get("managePortNo"), streams=await self.get_camera_streams(camera_id), - events_info=await self.get_device_event_capabilities( - self.supported_events, - self.device_info.serial_no, - camera_id, - CONNECTION_TYPE_PROXIED, - ), ) ) # Get analog cameras - if self.device_info.support_analog_cameras > 0: + if self.capabilities.support_analog_cameras > 0: analog_cameras = deep_get( (await self.request(GET, "System/Video/inputs/channels")), "VideoInputChannelList.VideoInputChannel", [], ) - if not isinstance(analog_cameras, list): - analog_cameras = [analog_cameras] - for analog_camera in analog_cameras: camera_id = analog_camera.get("id") device_serial_no = f"{self.device_info.serial_no}-VI{camera_id}" @@ -342,12 +197,6 @@ async def get_cameras(self): input_port=int(analog_camera.get("inputPort")), connection_type=CONNECTION_TYPE_DIRECT, streams=await self.get_camera_streams(camera_id), - events_info=await self.get_device_event_capabilities( - self.supported_events, - self.device_info.serial_no, - camera_id, - CONNECTION_TYPE_DIRECT, - ), ) ) @@ -361,47 +210,10 @@ async def get_protocols(self): for item in protocols: if item.get("protocol") == "RTSP" and item.get("portNo"): - self.device_info.rtsp_port = item.get("portNo") + self.protocols.rtsp_port = item.get("portNo") break - async def get_device_event_capabilities( - self, - supported_events: list[SupportedEventsInfo], - serial_no: str, - device_id: int, - connection_type: str = CONNECTION_TYPE_DIRECT, - ) -> list[EventInfo]: - """Get events info handled by integration (device id: NVR = 0, camera > 0).""" - events = [] - - if device_id == 0: # NVR - device_supported_events = [ - s for s in supported_events if (s.event_id in EVENTS and EVENTS[s.event_id].get("type") == EVENT_IO) - ] - else: # Camera - device_supported_events = [ - s for s in supported_events if (s.channel_id == int(device_id) and s.event_id in EVENTS) - ] - - for event in device_supported_events: - # Build unique_id - device_id_param = f"_{device_id}" if device_id != 0 else "" - io_port_id_param = f"_{event.io_port_id}" if event.io_port_id != 0 else "" - unique_id = f"{slugify(serial_no.lower())}{device_id_param}{io_port_id_param}_{event.event_id}" - - if EVENTS.get(event.event_id): - event_info = EventInfo( - id=event.event_id, - channel_id=event.channel_id, - io_port_id=event.io_port_id, - unique_id=unique_id, - url=self.get_event_url(event, connection_type), - disabled=("center" not in event.notifications), # Disable if not set Notify Surveillance Center - ) - events.append(event_info) - return events - - async def get_supported_events(self, system_capabilities: dict) -> list[SupportedEventsInfo]: + async def get_supported_events(self, system_capabilities: dict) -> list[EventInfo]: """Get list of all supported events available.""" def get_event(event_trigger: dict): @@ -415,21 +227,27 @@ def get_event(event_trigger: dict): if not is_supported: return None - channel = event_trigger.get("videoInputChannelID", event_trigger.get("dynVideoInputChannelID", 0)) - io_port = event_trigger.get("inputIOPortID", event_trigger.get("dynInputIOPortID", 0)) - notifications = notification_list.get("EventTriggerNotification", []) - - if not isinstance(notifications, list): - notifications = [notifications] + channel_id = int( + event_trigger.get( + "videoInputChannelID", + event_trigger.get("dynVideoInputChannelID", 0), + ) + ) + io_port = int(event_trigger.get("inputIOPortID", event_trigger.get("dynInputIOPortID", 0))) + notifications = deep_get(notification_list, "EventTriggerNotification", []) + event_id = event_type.lower() # Translate to alternate IDs - if event_type.lower() in EVENTS_ALTERNATE_ID: - event_type = EVENTS_ALTERNATE_ID[event_type.lower()] + if event_id in EVENTS_ALTERNATE_ID: + event_id = EVENTS_ALTERNATE_ID[event_id] + + url = self.get_event_url(event_id, channel_id, io_port) - return SupportedEventsInfo( - channel_id=int(channel), - io_port_id=int(io_port), - event_id=event_type.lower(), + return EventInfo( + channel_id=channel_id, + io_port_id=io_port, + id=event_id, + url=url, notifications=[notify.get("notificationMethod") for notify in notifications] if notifications else [], ) @@ -440,15 +258,13 @@ def get_event(event_trigger: dict): supported_events = deep_get(event_notification, "EventTriggerList.EventTrigger", []) else: supported_events = deep_get(event_triggers, "EventTriggerList.EventTrigger", []) - if not isinstance(supported_events, list): - supported_events = [supported_events] for event_trigger in supported_events: if event := get_event(event_trigger): events.append(event) # some devices do not have scenechangedetection in Event/triggers - if not [e for e in events if e.event_id == "scenechangedetection"]: + if not [e for e in events if e.id == "scenechangedetection"]: is_supported = str_to_bool(deep_get(system_capabilities, "SmartCap.isSupportSceneChangeDetection", False)) if is_supported: event_trigger = await self.request(GET, "Event/triggers/scenechangedetection-1") @@ -458,33 +274,38 @@ def get_event(event_trigger: dict): return events - def get_event_url(self, event: SupportedEventsInfo, connection_type: str) -> str: + def get_event_url(self, event_id: str, channel_id: int, io_port_id: int) -> str | None: """Get event ISAPI URL.""" - event_type = EVENTS[event.event_id]["type"] - slug = EVENTS[event.event_id]["slug"] + if not EVENTS.get(event_id): + return None + + event_type = EVENTS[event_id]["type"] + slug = EVENTS[event_id]["slug"] + camera = self.get_camera_by_id(channel_id) + connection_type = camera.connection_type if camera else CONNECTION_TYPE_DIRECT if event_type == EVENT_BASIC: if connection_type == CONNECTION_TYPE_PROXIED: # ISAPI/ContentMgmt/InputProxy/channels/{channel_id}/video/{event} - url = f"ContentMgmt/InputProxy/channels/{event.channel_id}/video/{slug}" + url = f"ContentMgmt/InputProxy/channels/{channel_id}/video/{slug}" else: # ISAPI/System/Video/inputs/channels/{channel_id}/{event} - url = f"System/Video/inputs/channels/{event.channel_id}/{slug}" + url = f"System/Video/inputs/channels/{channel_id}/{slug}" elif event_type == EVENT_IO: if connection_type == CONNECTION_TYPE_PROXIED: # ISAPI/ContentMgmt/IOProxy/{slug}/{channel_id} - url = f"ContentMgmt/IOProxy/{slug}/{event.io_port_id}" + url = f"ContentMgmt/IOProxy/{slug}/{io_port_id}" else: # ISAPI/System/IO/{slug}}/{channel_id} - url = f"System/IO/{slug}/{event.io_port_id}" + url = f"System/IO/{slug}/{io_port_id}" elif event_type == EVENT_PIR: # ISAPI/WLAlarm/PIR url = slug else: # ISAPI/Smart/{event}/{channel_id} - url = f"Smart/{slug}/{event.channel_id}" + url = f"Smart/{slug}/{channel_id}" return url async def get_camera_streams(self, channel_id: int) -> list[CameraStreamInfo]: @@ -513,13 +334,15 @@ async def get_camera_streams(self, channel_id: int) -> list[CameraStreamInfo]: def get_camera_by_id(self, camera_id: int) -> IPCamera | AnalogCamera | None: """Get camera object by id.""" try: + if camera_id == 0: + return None return [camera for camera in self.cameras if camera.id == camera_id][0] except IndexError: # Camera id does not exist return None async def get_storage_devices(self): - """Get HDD storage devices.""" + """Get HDD and NAS storage devices.""" storage_list = [] storage_info = (await self.request(GET, "ContentMgmt/Storage")).get("storage", {}) @@ -573,36 +396,12 @@ async def get_storage_devices(self): def get_storage_device_by_id(self, device_id: int) -> StorageInfo | None: """Get storage object by id.""" try: - return [storage_device for storage_device in self.device_info.storage if storage_device.id == device_id][0] + return [storage_device for storage_device in self.storage if storage_device.id == device_id][0] except IndexError: # Storage id does not exist return None - def hass_device_info(self, device_id: int = 0) -> DeviceInfo: - """Return Home Assistant entity device information.""" - if device_id == 0: - return DeviceInfo( - manufacturer=self.device_info.manufacturer, - identifiers={(DOMAIN, self.device_info.serial_no)}, - connections={(dr.CONNECTION_NETWORK_MAC, self.device_info.mac_address)}, - model=self.device_info.model, - name=self.device_info.name, - sw_version=self.device_info.firmware, - ) - else: - camera_info = self.get_camera_by_id(device_id) - is_ip_camera = isinstance(camera_info, IPCamera) - - return DeviceInfo( - manufacturer=self.device_info.manufacturer, - identifiers={(DOMAIN, camera_info.serial_no)}, - model=camera_info.model, - name=camera_info.name, - sw_version=camera_info.firmware if is_ip_camera else "Unknown", - via_device=(DOMAIN, self.device_info.serial_no) if self.device_info.is_nvr else None, - ) - - def get_event_state_node(self, event: EventInfo) -> str: + def _get_event_state_node(self, event: EventInfo) -> str: """Get xml key for event state.""" slug = EVENTS[event.id]["slug"] @@ -622,8 +421,11 @@ def get_event_state_node(self, event: EventInfo) -> str: async def get_event_enabled_state(self, event: EventInfo) -> bool: """Get event detection state.""" + if not event.url: + _LOGGER.warning("Cannot fetch event enabled state. Unknown event URL %s", event.id) + return False state = await self.request(GET, event.url) - node = self.get_event_state_node(event) + node = self._get_event_state_node(event) return str_to_bool(state[node].get("enabled", "false")) if state.get(node) else False async def get_event_switch_mutex(self, event: EventInfo, channel_id: int) -> list[MutexIssue]: @@ -635,8 +437,8 @@ async def get_event_switch_mutex(self, event: EventInfo, channel_id: int) -> lis # Use alt event ID for mutex due to crap API! event_id = event.id - if MUTEX_ALTERNATE_IDS.get(event.id): - event_id = MUTEX_ALTERNATE_IDS[event.id] + if MUTEX_ALTERNATE_ID.get(event.id): + event_id = MUTEX_ALTERNATE_ID[event.id] data = {"function": event_id, "channelID": int(channel_id)} url = "System/mutexFunction?format=json" @@ -661,15 +463,17 @@ async def get_event_switch_mutex(self, event: EventInfo, channel_id: int) -> lis async def set_event_enabled_state(self, channel_id: int, event: EventInfo, is_enabled: bool) -> None: """Set event detection state.""" - + if not event.url: + _LOGGER.warning("Cannot set event enabled state. Unknown event URL %s", event.id) + return False # Validate that this event switch is not mutually exclusive with another enabled one mutex_issues = [] - if channel_id != 0 and is_enabled and self.device_info.support_event_mutex_checking: + if channel_id != 0 and is_enabled and self.capabilities.support_event_mutex_checking: mutex_issues = await self.get_event_switch_mutex(event, channel_id) if not mutex_issues: data = await self.request(GET, event.url) - node = self.get_event_state_node(event) + node = self._get_event_state_node(event) new_state = bool_to_str(is_enabled) if new_state == data[node]["enabled"]: return @@ -677,13 +481,9 @@ async def set_event_enabled_state(self, channel_id: int, event: EventInfo, is_en xml = xmltodict.unparse(data) await self.request(PUT, event.url, present="xml", data=xml) else: - raise HomeAssistantError( - f"""You cannot enable {EVENTS[event.id]['label']} events. - Please disable {EVENTS[mutex_issues[0].event_id]['label']} - on channels {mutex_issues[0].channels} first""" - ) + raise SetEventStateMutexError(event, mutex_issues) - async def get_port_status(self, port_type: str, port_no: int) -> str: + async def get_io_port_status(self, port_type: str, port_no: int) -> str: """Get status of physical ports.""" if port_type == "input": status = await self.request(GET, f"System/IO/inputs/{port_no}/status") @@ -691,7 +491,7 @@ async def get_port_status(self, port_type: str, port_no: int) -> str: status = await self.request(GET, f"System/IO/outputs/{port_no}/status") return deep_get(status, "IOPortStatus.ioState") - async def set_port_state(self, port_no: int, turn_on: bool): + async def set_output_port_state(self, port_no: int, turn_on: bool): """Set status of output port.""" data = {} if turn_on: @@ -731,12 +531,9 @@ async def set_holiday_enabled_state(self, is_enabled: bool, holiday_index=0) -> await self.request(PUT, "System/Holidays", present="xml", data=xml) def _get_event_notification_host(self, data: Node) -> Node: - hosts = deep_get(data, "HttpHostNotificationList.HttpHostNotification", {}) - if isinstance(hosts, list): - # + hosts = deep_get(data, "HttpHostNotificationList.HttpHostNotification", []) + if hosts: return hosts[0] - # - return hosts async def get_alarm_server(self) -> AlarmServer | None: """Get event notifications listener server URL.""" @@ -783,59 +580,6 @@ async def reboot(self): """Reboot device.""" await self.request(PUT, "System/reboot", present="xml") - async def request( - self, - method: str, - url: str, - present: str = "dict", - **data, - ) -> Any: - """Send request and log response, returns {} if request fails.""" - - full_url = self.isapi.get_url(url) - try: - response = await self.isapi.request(method, full_url, present, **data) - _LOGGER.debug("--- [%s] %s", method, full_url) - if data: - _LOGGER.debug(">>> payload:\n%s", data) - _LOGGER.debug("\n%s", response) - except HTTPStatusError as ex: - _LOGGER.info("--- [%s] %s\n%s", method, full_url, ex) - if self.pending_initialization: - # supress http errors during initialization - return {} - raise - else: - return response - - def handle_exception(self, ex: Exception, details: str = "") -> bool: - """Handle common exception, returns False if exception remains unhandled.""" - - def is_reauth_needed(): - if isinstance(ex, HTTPStatusError): - status_code = ex.response.status_code - if status_code in (HTTPStatus.UNAUTHORIZED,): - return True - return False - - host = self.isapi.host - if is_reauth_needed(): - # Re-establish session - self.isapi = ISAPI_Client( - host, - self.isapi.username, - self.isapi.password, - self.isapi.session, - timeout=20, - ) - return True - - elif isinstance(ex, (asyncio.TimeoutError, TimeoutException)): - raise HomeAssistantError(f"Timeout while connecting to {host} {details}") from ex - - _LOGGER.warning("Unexpected exception | %s | %s", details, ex) - return False - @staticmethod def parse_event_notification(xml: str) -> AlertInfo: """Parse incoming EventNotificationAlert XML message.""" @@ -896,12 +640,12 @@ async def get_camera_image( if stream.use_alternate_picture_url: url = f"ContentMgmt/StreamingProxy/channels/{stream.id}/picture" - full_url = self.isapi.get_url(url) - chunks = self.isapi.request_bytes(GET, full_url, params=params) + full_url = self.get_isapi_url(url) + chunks = self.request_bytes(GET, full_url, params=params) else: url = f"Streaming/channels/{stream.id}/picture" - full_url = self.isapi.get_url(url) - chunks = self.isapi.request_bytes(GET, full_url, params=params) + full_url = self.get_isapi_url(url) + chunks = self.request_bytes(GET, full_url, params=params) data = b"".join([chunk async for chunk in chunks]) if data.startswith(b" str: """Get stream source.""" - u = quote(self.isapi.username, safe="") - p = quote(self.isapi.password, safe="") - url = f"{self.device_info.ip_address}:{self.device_info.rtsp_port}/Streaming/channels/{stream.id}" + u = quote(self.username, safe="") + p = quote(self.password, safe="") + url = f"{self.device_info.ip_address}:{self.protocols.rtsp_port}/Streaming/channels/{stream.id}" return f"rtsp://{u}:{p}@{url}" + async def _detect_auth_method(self): + """Establish the connection with device.""" + if not self._session: + self._session = httpx.AsyncClient(timeout=self.timeout) + + url = urljoin(self.host, self.isapi_prefix + "/System/deviceInfo") + _LOGGER.debug("--- [WWW-Authenticate detection] %s", self.host) + response = await self._session.get(url) + if response.status_code == 401: + www_authenticate = response.headers.get("WWW-Authenticate", "") + _LOGGER.debug("WWW-Authenticate header: %s", www_authenticate) + if "Basic" in www_authenticate: + self._auth_method = httpx.BasicAuth(self.username, self.password) + elif "Digest" in www_authenticate: + self._auth_method = httpx.DigestAuth(self.username, self.password) + + if not self._auth_method: + _LOGGER.error("Authentication method not detected, %s", response.status_code) + if response.headers: + _LOGGER.error("response.headers %s", response.headers) + response.raise_for_status() + + def get_isapi_url(self, relative_url: str) -> str: + return f"{self.host}/{self.isapi_prefix}/{relative_url}" -def str_to_bool(value: str) -> bool: - """Convert text to boolean.""" - if value: - return value.lower() == "true" - return False + async def request( + self, + method: str, + url: str, + present: str = "dict", + data: str = None, + ) -> Any: + """Send request and log response, returns {} if request fails.""" + if not self._auth_method: + await self._detect_auth_method() + full_url = self.get_isapi_url(url) + try: + response = await self._session.request( + method, + full_url, + auth=self._auth_method, + data=data, + timeout=self.timeout, + ) + response.raise_for_status() + result = parse_isapi_response(response, present) + _LOGGER.debug("--- [%s] %s", method, full_url) + if data: + _LOGGER.debug(">>> payload:\n%s", data) + _LOGGER.debug("\n%s", result) + except HTTPStatusError as ex: + _LOGGER.info("--- [%s] %s\n%s", method, full_url, ex) + if self.pending_initialization: + # supress http errors during initialization + return {} + raise + else: + return result -def bool_to_str(value: bool) -> str: - """Convert boolean to 'true' or 'false'.""" - return "true" if value else "false" + async def request_bytes( + self, + method: str, + full_url: str, + **data, + ) -> AsyncIterator[bytes]: + if not self._auth_method: + await self._detect_auth_method() + async with self._session.stream(method, full_url, auth=self._auth_method, **data) as response: + async for chunk in response.aiter_bytes(): + yield chunk -def get_stream_id(channel_id: str, stream_type: int = 1) -> int: - """Get stream id.""" - return int(channel_id) * 100 + stream_type +class SetEventStateMutexError(Exception): + """Error setting event mutex.""" -def deep_get(dictionary: dict, path: str, default: Any = None) -> Any: - """Get safely nested dictionary attribute.""" - return reduce( - lambda d, key: d.get(key, default) if isinstance(d, dict) else default, - path.split("."), - dictionary, - ) + def __init__(self, event: EventInfo, mutex_issues: []) -> None: + """Initialize exception.""" + self.event = event + self.mutex_issues = mutex_issues diff --git a/custom_components/hikvision_next/isapi/models.py b/custom_components/hikvision_next/isapi/models.py new file mode 100644 index 0000000..defefd8 --- /dev/null +++ b/custom_components/hikvision_next/isapi/models.py @@ -0,0 +1,135 @@ +from dataclasses import dataclass, field + + +@dataclass +class AlarmServer: + """Holds alarm server info.""" + + # Uses pylint invalid names to not break previous versions + ipAddress: str # pylint: disable=invalid-name + portNo: int # pylint: disable=invalid-name + url: str # pylint: disable=invalid-name + protocolType: str # pylint: disable=invalid-name + + +@dataclass +class AlertInfo: + """Holds NVR/Camera event notification info.""" + + channel_id: int + io_port_id: int + event_id: str + device_serial_no: str = field(default=None) + mac: str = "" + region_id: int = 0 + detection_target: str = field(default=None) + + +@dataclass +class MutexIssue: + """Holds mutually exclusive event checking info.""" + + event_id: str + channels: list = field(default_factory=list) + + +@dataclass +class EventInfo: + """Holds event info of Hikvision device.""" + + id: str + channel_id: int + io_port_id: int + unique_id: str = None + url: str = None + disabled: bool = False + notifications: list[str] = field(default_factory=list) + + +@dataclass +class CameraStreamInfo: + """Holds info of a camera stream.""" + + id: int + name: str + type_id: int + type: str + enabled: bool + codec: str + width: int + height: int + audio: bool + use_alternate_picture_url: bool = False + + +@dataclass +class StorageInfo: + """Holds info for internal and NAS storage devices.""" + + id: int + name: str + type: str + status: str + capacity: int + freespace: int + property: str + ip: str = "" + + +@dataclass +class ISAPIDeviceInfo: + """Holds info of an NVR/DVR or single IP Camera.""" + + name: str = "" + manufacturer: str = "" + model: str = "" + serial_no: str = "" + firmware: str = "" + mac_address: str = "" + ip_address: str = "" + device_type: str = "" + is_nvr: bool = False + + +@dataclass +class CapabilitiesInfo: + """Holds info of an NVR/DVR or single IP Camera.""" + + support_analog_cameras: int = 0 + support_digital_cameras: int = 0 + support_holiday_mode: bool = False + support_alarm_server: bool = False + support_channel_zero: bool = False + support_event_mutex_checking: bool = False + input_ports: int = 0 + output_ports: int = 0 + + +@dataclass +class AnalogCamera: + """Analog cameras info.""" + + id: int + name: str + model: str + serial_no: str + input_port: int + connection_type: str + streams: list[CameraStreamInfo] = field(default_factory=list) + events_info: list[EventInfo] = field(default_factory=list) + + +@dataclass +class IPCamera(AnalogCamera): + """IP/Digital camera info.""" + + firmware: str = "" + ip_addr: str = "" + ip_port: int = 0 + + +@dataclass +class ProtocolsInfo: + """Holds info of supported protocols.""" + + rtsp_port: int = 554 diff --git a/custom_components/hikvision_next/isapi/utils.py b/custom_components/hikvision_next/isapi/utils.py new file mode 100644 index 0000000..1cb1e06 --- /dev/null +++ b/custom_components/hikvision_next/isapi/utils.py @@ -0,0 +1,56 @@ +from functools import reduce +import json +from typing import Any + +import xmltodict + + +def parse_isapi_response(response, present="dict"): + """Parse Hikvision results.""" + if isinstance(response, (list,)): + result = "".join(response) + elif isinstance(response, str): + result = response + else: + result = response.text + + if present is None or present == "dict": + if isinstance(response, (list,)): + events = [] + for event in response: + e = json.loads(json.dumps(xmltodict.parse(event))) + events.append(e) + return events + return json.loads(json.dumps(xmltodict.parse(result))) + else: + return result + + +def str_to_bool(value: str) -> bool: + """Convert text to boolean.""" + if value: + return value.lower() == "true" + return False + + +def bool_to_str(value: bool) -> str: + """Convert boolean to 'true' or 'false'.""" + return "true" if value else "false" + + +def get_stream_id(channel_id: str, stream_type: int = 1) -> int: + """Get stream id.""" + return int(channel_id) * 100 + stream_type + + +def deep_get(dictionary: dict, path: str, default: Any = None) -> Any: + """Get safely nested dictionary attribute.""" + result = reduce( + lambda d, key: d.get(key, default) if isinstance(d, dict) else default, + path.split("."), + dictionary, + ) + if default == [] and not isinstance(result, list): + return [result] + + return result diff --git a/custom_components/hikvision_next/isapi_client.py b/custom_components/hikvision_next/isapi_client.py deleted file mode 100644 index 53e3dc3..0000000 --- a/custom_components/hikvision_next/isapi_client.py +++ /dev/null @@ -1,94 +0,0 @@ -import httpx -import logging -from typing import Any, AsyncIterator, List, Union -from urllib.parse import urljoin -import json -import xmltodict -from dataclasses import dataclass - -_LOGGER = logging.getLogger(__name__) - - -def response_parser(response, present="dict"): - """Parse Hikvision results.""" - if isinstance(response, (list,)): - result = "".join(response) - elif isinstance(response, str): - result = response - else: - result = response.text - - if present is None or present == "dict": - if isinstance(response, (list,)): - events = [] - for event in response: - e = json.loads(json.dumps(xmltodict.parse(event))) - events.append(e) - return events - return json.loads(json.dumps(xmltodict.parse(result))) - else: - return result - - -@dataclass -class ISAPI_Client: - host: str - username: str - password: str - session: httpx.AsyncClient | None = None - timeout: float = 3 - isapi_prefix: str = "ISAPI" - _auth_method: httpx._auth.Auth = None - - async def _detect_auth_method(self): - """Establish the connection with device.""" - if not self.session: - self.session = httpx.AsyncClient(timeout=self.timeout) - - url = urljoin(self.host, self.isapi_prefix + "/System/deviceInfo") - _LOGGER.debug("--- [WWW-Authenticate detection] %s", self.host) - response = await self.session.get(url) - if response.status_code == 401: - www_authenticate = response.headers.get("WWW-Authenticate", "") - _LOGGER.debug("WWW-Authenticate header: %s", www_authenticate) - if "Basic" in www_authenticate: - self._auth_method = httpx.BasicAuth(self.username, self.password) - elif "Digest" in www_authenticate: - self._auth_method = httpx.DigestAuth(self.username, self.password) - - if not self._auth_method: - _LOGGER.error("Authentication method not detected, %s", response.status_code) - if response.headers: - _LOGGER.error("response.headers %s", response.headers) - response.raise_for_status() - - def get_url(self, relative_url: str) -> str: - return f"{self.host}/{self.isapi_prefix}/{relative_url}" - - async def request( - self, - method: str, - full_url: str, - present: str = "dict", - data: dict[str, Any] | None = None, - ) -> Union[List[str], str]: - """Send request to the device.""" - if not self._auth_method: - await self._detect_auth_method() - - response = await self.session.request(method, full_url, auth=self._auth_method, data=data, timeout=self.timeout) - response.raise_for_status() - return response_parser(response, present) - - async def request_bytes( - self, - method: str, - full_url: str, - **data, - ) -> AsyncIterator[bytes]: - if not self._auth_method: - await self._detect_auth_method() - - async with self.session.stream(method, full_url, auth=self._auth_method, **data) as response: - async for chunk in response.aiter_bytes(): - yield chunk diff --git a/custom_components/hikvision_next/notifications.py b/custom_components/hikvision_next/notifications.py index 5bc4a89..f11bdd8 100755 --- a/custom_components/hikvision_next/notifications.py +++ b/custom_components/hikvision_next/notifications.py @@ -17,8 +17,9 @@ from homeassistant.helpers.entity_registry import async_get from homeassistant.util import slugify -from .const import ALARM_SERVER_PATH, DATA_ISAPI, DOMAIN, HIKVISION_EVENT -from .isapi import ISAPI, AlertInfo, IPCamera +from .const import ALARM_SERVER_PATH, DOMAIN, HIKVISION_EVENT +from .hikvision_device import HikvisionDevice +from .isapi import ISAPIClient, AlertInfo, IPCamera _LOGGER = logging.getLogger(__name__) @@ -40,7 +41,7 @@ def __init__(self, hass: HomeAssistant): self.requires_auth = False self.url = ALARM_SERVER_PATH self.name = DOMAIN - self.isapi: ISAPI + self.device: HikvisionDevice self.hass = hass async def post(self, request: web.Request): @@ -49,7 +50,7 @@ async def post(self, request: web.Request): try: _LOGGER.debug("--- Incoming event notification ---") _LOGGER.debug("Source: %s", request.remote) - self.isapi = self.get_isapi_instance(request.remote) + self.device = self.get_isapi_device(request.remote) xml = await self.parse_event_request(request) _LOGGER.debug("alert info: %s", xml) self.trigger_sensor(xml) @@ -59,7 +60,7 @@ async def post(self, request: web.Request): response = web.Response(status=HTTPStatus.OK, content_type=CONTENT_TYPE_TEXT_PLAIN) return response - def get_isapi_instance(self, device_ip) -> ISAPI: + def get_isapi_device(self, device_ip) -> HikvisionDevice: """Get isapi instance for device sending alert.""" integration_entries = self.hass.config_entries.async_entries(DOMAIN) instances_hosts = [] @@ -79,8 +80,7 @@ def get_isapi_instance(self, device_ip) -> ISAPI: if not entry: raise ValueError(f"Cannot find ISAPI instance for device {device_ip} in {instances_hosts}") - config = self.hass.data[DOMAIN][entry.entry_id] - return config.get(DATA_ISAPI) + return entry.runtime_data def get_ip(self, ip_string: str) -> str: """Return an IP if either hostname or IP is provided.""" @@ -134,7 +134,7 @@ async def parse_event_request(self, request: web.Request) -> str: def get_alert_info(self, xml: str) -> AlertInfo: """Parse incoming EventNotificationAlert XML message.""" - alert = ISAPI.parse_event_notification(xml) + alert = ISAPIClient.parse_event_notification(xml) if alert.channel_id > 32: # channel id above 32 is an IP camera @@ -143,7 +143,7 @@ def get_alert_info(self, xml: str) -> AlertInfo: try: alert.channel_id = [ camera.id - for camera in self.isapi.cameras + for camera in self.device.cameras if isinstance(camera, IPCamera) and camera.input_port == alert.channel_id - 32 ][0] except IndexError: @@ -157,7 +157,7 @@ def trigger_sensor(self, xml: str) -> None: alert = self.get_alert_info(xml) _LOGGER.debug("Alert: %s", alert) - serial_no = self.isapi.device_info.serial_no.lower() + serial_no = self.device.device_info.serial_no.lower() device_id_param = f"_{alert.channel_id}" if alert.channel_id != 0 else "" io_port_id_param = f"_{alert.io_port_id}" if alert.io_port_id != 0 else "" @@ -178,7 +178,7 @@ def trigger_sensor(self, xml: str) -> None: def fire_hass_event(self, alert: AlertInfo): """Fire HASS event.""" camera_name = "" - if camera := self.isapi.get_camera_by_id(alert.channel_id): + if camera := self.device.get_camera_by_id(alert.channel_id): camera_name = camera.name message = { diff --git a/custom_components/hikvision_next/sensor.py b/custom_components/hikvision_next/sensor.py index 4842264..099e390 100644 --- a/custom_components/hikvision_next/sensor.py +++ b/custom_components/hikvision_next/sensor.py @@ -3,18 +3,13 @@ from __future__ import annotations from homeassistant.components.sensor import ENTITY_ID_FORMAT, SensorEntity -from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant from homeassistant.helpers.entity import EntityCategory from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.update_coordinator import CoordinatorEntity -from .const import ( - DATA_ALARM_SERVER_HOST, - DOMAIN, - EVENTS_COORDINATOR, - SECONDARY_COORDINATOR, -) +from . import HikvisionConfigEntry +from .const import CONF_ALARM_SERVER_HOST, SECONDARY_COORDINATOR from .isapi import StorageInfo NOTIFICATION_HOST_KEYS = { @@ -27,25 +22,23 @@ async def async_setup_entry( hass: HomeAssistant, - entry: ConfigEntry, + entry: HikvisionConfigEntry, async_add_entities: AddEntitiesCallback, ) -> None: - """Add diagnostic sensors for hikvision alarm server settings.""" + """Add diagnostic sensors for hikvision alarm server settings and storage items.""" - config = hass.data[DOMAIN][entry.entry_id] - coordinator = config.get(SECONDARY_COORDINATOR) + device = entry.runtime_data + coordinator = device.coordinators.get(SECONDARY_COORDINATOR) entities = [] if coordinator: for key in NOTIFICATION_HOST_KEYS: entities.append(AlarmServerSensor(coordinator, key)) - events_coordinator = config.get(EVENTS_COORDINATOR) - if events_coordinator: - for item in list(events_coordinator.isapi.device_info.storage): + for item in list(device.storage): entities.append(StorageSensor(coordinator, item)) - async_add_entities(entities, True) + async_add_entities(entities, True) class AlarmServerSensor(CoordinatorEntity, SensorEntity): @@ -58,19 +51,17 @@ class AlarmServerSensor(CoordinatorEntity, SensorEntity): def __init__(self, coordinator, key: str) -> None: """Initialize.""" super().__init__(coordinator) - isapi = coordinator.isapi - self._attr_unique_id = ( - f"{isapi.device_info.serial_no}_{DATA_ALARM_SERVER_HOST}_{key}" - ) + device = coordinator.device + self._attr_unique_id = f"{device.device_info.serial_no}_{CONF_ALARM_SERVER_HOST}_{key}" self.entity_id = ENTITY_ID_FORMAT.format(self.unique_id) - self._attr_device_info = isapi.hass_device_info() + self._attr_device_info = device.hass_device_info() self._attr_translation_key = f"notifications_host_{NOTIFICATION_HOST_KEYS[key]}" self.key = key @property def native_value(self) -> str | None: """Return the state of the sensor.""" - host = self.coordinator.data.get(DATA_ALARM_SERVER_HOST) + host = self.coordinator.data.get(CONF_ALARM_SERVER_HOST) return getattr(host, self.key) if host else None @@ -84,17 +75,17 @@ class StorageSensor(CoordinatorEntity, SensorEntity): def __init__(self, coordinator, hdd: StorageInfo) -> None: """Initialize.""" super().__init__(coordinator) - isapi = coordinator.isapi - self._attr_unique_id = f"{isapi.device_info.serial_no}_{hdd.id}_{hdd.name}" + device = coordinator.device + self._attr_unique_id = f"{device.device_info.serial_no}_{hdd.id}_{hdd.name}" self.entity_id = ENTITY_ID_FORMAT.format(self.unique_id) - self._attr_device_info = isapi.hass_device_info() + self._attr_device_info = device.hass_device_info() self._attr_name = f"{hdd.type} {hdd.name}" self.hdd = hdd @property def native_value(self) -> str | None: """Return the state of the sensor.""" - hdd = self.coordinator.isapi.get_storage_device_by_id(self.hdd.id) + hdd = self.coordinator.device.get_storage_device_by_id(self.hdd.id) return str(hdd.status).upper() if hdd else None @property diff --git a/custom_components/hikvision_next/services.py b/custom_components/hikvision_next/services.py index 8b5aa09..62c158a 100644 --- a/custom_components/hikvision_next/services.py +++ b/custom_components/hikvision_next/services.py @@ -11,13 +11,7 @@ ) from homeassistant.exceptions import HomeAssistantError -from .const import ( - ACTION_ISAPI_REQUEST, - ACTION_REBOOT, - ATTR_CONFIG_ENTRY_ID, - DATA_ISAPI, - DOMAIN, -) +from .const import ACTION_ISAPI_REQUEST, ACTION_REBOOT, ATTR_CONFIG_ENTRY_ID, DOMAIN ACTION_ISAPI_REQUEST_SCHEMA = vol.Schema( { @@ -34,24 +28,24 @@ def setup_services(hass: HomeAssistant) -> None: async def handle_reboot(call: ServiceCall): """Handle the reboot action call.""" - entries = hass.data[DOMAIN] entry_id = call.data.get(ATTR_CONFIG_ENTRY_ID) - isapi = entries[entry_id][DATA_ISAPI] + entry = hass.config_entries.async_get_entry(entry_id) + device = entry.runtime_data try: - await isapi.reboot() + await device.reboot() except HTTPStatusError as ex: raise HomeAssistantError(ex.response.content) from ex async def handle_isapi_request(call: ServiceCall) -> ServiceResponse: """Handle the custom ISAPI request action call.""" - entries = hass.data[DOMAIN] entry_id = call.data.get(ATTR_CONFIG_ENTRY_ID) - isapi = entries[entry_id][DATA_ISAPI] + entry = hass.config_entries.async_get_entry(entry_id) + device = entry.runtime_data method = call.data.get("method", "POST") path = call.data["path"].strip("/") payload = call.data.get("payload") try: - response = await isapi.request(method, path, present="xml", data=payload) + response = await device.request(method, path, present="xml", data=payload) except HTTPStatusError as ex: if isinstance(ex.response.content, bytes): response = ex.response.content.decode("utf-8") diff --git a/custom_components/hikvision_next/switch.py b/custom_components/hikvision_next/switch.py index 8ce2baa..4636e49 100644 --- a/custom_components/hikvision_next/switch.py +++ b/custom_components/hikvision_next/switch.py @@ -5,47 +5,47 @@ from typing import Any from homeassistant.components.switch import ENTITY_ID_FORMAT, SwitchEntity -from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant +from homeassistant.exceptions import HomeAssistantError from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.update_coordinator import CoordinatorEntity from homeassistant.util import slugify -from .const import ( - DOMAIN, - EVENT_IO, - EVENTS_COORDINATOR, - HOLIDAY_MODE, - SECONDARY_COORDINATOR, -) -from .isapi import EventInfo +from . import HikvisionConfigEntry +from .isapi.const import EVENT_IO +from .const import EVENTS, EVENTS_COORDINATOR, HOLIDAY_MODE, SECONDARY_COORDINATOR +from .isapi import EventInfo, SetEventStateMutexError -async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback) -> None: +async def async_setup_entry( + hass: HomeAssistant, + entry: HikvisionConfigEntry, + async_add_entities: AddEntitiesCallback, +) -> None: """Add hikvision_next entities from a config_entry.""" - config = hass.data[DOMAIN][entry.entry_id] - events_coordinator = config[EVENTS_COORDINATOR] - secondary_coordinator = config.get(SECONDARY_COORDINATOR) + device = entry.runtime_data + events_coordinator = device.coordinators.get(EVENTS_COORDINATOR) + secondary_coordinator = device.coordinators.get(SECONDARY_COORDINATOR) entities = [] # Camera supported events - for camera in events_coordinator.isapi.cameras: + for camera in device.cameras: for event in camera.events_info: entities.append(EventSwitch(camera.id, event, events_coordinator)) # NVR supported events - if events_coordinator.isapi.device_info.is_nvr: - for event in events_coordinator.isapi.device_info.events_info: + if device.device_info.is_nvr: + for event in device.events_info: entities.append(EventSwitch(0, event, events_coordinator)) # Output port switch - for i in range(1, events_coordinator.isapi.device_info.output_ports + 1): + for i in range(1, device.capabilities.output_ports + 1): entities.append(NVROutputSwitch(events_coordinator, i)) # Holiday mode switch - if secondary_coordinator.isapi.device_info.support_holiday_mode: + if device.capabilities.support_holiday_mode: entities.append(HolidaySwitch(secondary_coordinator)) async_add_entities(entities) @@ -62,7 +62,7 @@ def __init__(self, device_id: int, event: EventInfo, coordinator) -> None: super().__init__(coordinator) self.entity_id = ENTITY_ID_FORMAT.format(event.unique_id) self._attr_unique_id = self.entity_id - self._attr_device_info = coordinator.isapi.hass_device_info(device_id) + self._attr_device_info = coordinator.device.hass_device_info(device_id) self._attr_translation_key = event.id if event.id == EVENT_IO: self._attr_translation_placeholders = {"io_port_id": event.io_port_id} @@ -78,7 +78,13 @@ def is_on(self) -> bool | None: async def async_turn_on(self, **kwargs: Any) -> None: """Turn on.""" try: - await self.coordinator.isapi.set_event_enabled_state(self.device_id, self.event, True) + await self.coordinator.device.set_event_enabled_state(self.device_id, self.event, True) + except SetEventStateMutexError as ex: + raise HomeAssistantError( + f"""You cannot enable {EVENTS[ex.event.id]['label']} events. + Please disable {EVENTS[ex.mutex_issues[0].event_id]['label']} + on channels {ex.mutex_issues[0].channels} first""" + ) except Exception as ex: raise ex finally: @@ -87,7 +93,7 @@ async def async_turn_on(self, **kwargs: Any) -> None: async def async_turn_off(self, **kwargs: Any) -> None: """Turn off.""" try: - await self.coordinator.isapi.set_event_enabled_state(self.device_id, self.event, False) + await self.coordinator.device.set_event_enabled_state(self.device_id, self.event, False) except Exception: raise finally: @@ -105,10 +111,10 @@ def __init__(self, coordinator, port_no: int) -> None: """Initialize.""" super().__init__(coordinator) self.entity_id = ENTITY_ID_FORMAT.format( - f"{slugify(coordinator.isapi.device_info.serial_no.lower())}_{port_no}_alarm_output" + f"{slugify(coordinator.device.device_info.serial_no.lower())}_{port_no}_alarm_output" ) self._attr_unique_id = self.entity_id - self._attr_device_info = coordinator.isapi.hass_device_info(0) + self._attr_device_info = coordinator.device.hass_device_info(0) self._attr_translation_placeholders = {"port_no": port_no} self._port_no = port_no @@ -120,7 +126,7 @@ def is_on(self) -> bool | None: async def async_turn_on(self, **kwargs: Any) -> None: """Turn on.""" try: - await self.coordinator.isapi.set_port_state(self._port_no, True) + await self.coordinator.device.set_port_state(self._port_no, True) except Exception as ex: raise ex finally: @@ -128,7 +134,7 @@ async def async_turn_on(self, **kwargs: Any) -> None: async def async_turn_off(self, **kwargs: Any) -> None: try: - await self.coordinator.isapi.set_port_state(self._port_no, False) + await self.coordinator.device.set_port_state(self._port_no, False) except Exception as ex: raise ex finally: @@ -145,9 +151,9 @@ class HolidaySwitch(CoordinatorEntity, SwitchEntity): def __init__(self, coordinator) -> None: """Initialize.""" super().__init__(coordinator) - self._attr_unique_id = f"{slugify(coordinator.isapi.device_info.serial_no.lower())}_{HOLIDAY_MODE}" + self._attr_unique_id = f"{slugify(coordinator.device.device_info.serial_no.lower())}_{HOLIDAY_MODE}" self.entity_id = ENTITY_ID_FORMAT.format(self.unique_id) - self._attr_device_info = coordinator.isapi.hass_device_info() + self._attr_device_info = coordinator.device.hass_device_info() @property def is_on(self) -> bool | None: @@ -156,10 +162,10 @@ def is_on(self) -> bool | None: async def async_turn_on(self, **kwargs: Any) -> None: """Turn on.""" - await self.coordinator.isapi.set_holiday_enabled_state(True) + await self.coordinator.device.set_holiday_enabled_state(True) await self.coordinator.async_request_refresh() async def async_turn_off(self, **kwargs: Any) -> None: """Turn off.""" - await self.coordinator.isapi.set_holiday_enabled_state(False) + await self.coordinator.device.set_holiday_enabled_state(False) await self.coordinator.async_request_refresh() diff --git a/requirements.test.txt b/requirements.test.txt index 3f2db67..08c67bd 100644 --- a/requirements.test.txt +++ b/requirements.test.txt @@ -6,7 +6,7 @@ requests-toolbelt==1.0.0 pytest pytest-asyncio pytest-cov>=4.1.0 -pytest-homeassistant-custom-component +pytest-homeassistant-custom-component>=0.13.179 # async-timeout diff --git a/tests/conftest.py b/tests/conftest.py index 67c51cb..bad5d01 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,10 +4,10 @@ import pytest import respx import xmltodict -from custom_components.hikvision_next.const import DOMAIN, DATA_SET_ALARM_SERVER, DATA_ALARM_SERVER_HOST +from custom_components.hikvision_next.const import DOMAIN, CONF_SET_ALARM_SERVER, CONF_ALARM_SERVER_HOST from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME from pytest_homeassistant_custom_component.common import MockConfigEntry -from custom_components.hikvision_next.isapi import ISAPI +from custom_components.hikvision_next.isapi import ISAPIClient from homeassistant.core import HomeAssistant TEST_HOST_IP = "1.0.0.255" @@ -17,11 +17,11 @@ CONF_USERNAME: "u1", CONF_PASSWORD: "***", } -TEST_CONFIG = {**TEST_CLIENT, DATA_SET_ALARM_SERVER: False, DATA_ALARM_SERVER_HOST: ""} +TEST_CONFIG = {**TEST_CLIENT, CONF_SET_ALARM_SERVER: False, CONF_ALARM_SERVER_HOST: ""} TEST_CONFIG_WITH_ALARM_SERVER = { **TEST_CLIENT, - DATA_SET_ALARM_SERVER: True, - DATA_ALARM_SERVER_HOST: "http://1.0.0.11:8123", + CONF_SET_ALARM_SERVER: True, + CONF_ALARM_SERVER_HOST: "http://1.0.0.11:8123", } @@ -39,6 +39,7 @@ def mock_config_entry(request) -> MockConfigEntry: domain=DOMAIN, data=config, version=2, + title=request.node.callspec.id, ) @@ -73,11 +74,14 @@ def mock_device_endpoints(model): @pytest.fixture -def mock_isapi(): +def mock_isapi(respx_mock): """Mock ISAPI instance.""" - respx.get(f"{TEST_HOST}/ISAPI/System/deviceInfo").respond(status_code=200) - isapi = ISAPI(**TEST_CLIENT) + digest_header = 'Digest realm="testrealm", qop="auth", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="799d5"' + respx.get(f"{TEST_HOST}/ISAPI/System/deviceInfo").respond( + status_code=401, headers={"WWW-Authenticate": digest_header} + ) + isapi = ISAPIClient(**TEST_CLIENT) return isapi @@ -114,5 +118,4 @@ async def init_integration(respx_mock, request, mock_isapi, hass: HomeAssistant, await hass.config_entries.async_setup(mock_config_entry.entry_id) await hass.async_block_till_done() - mock_config_entry.title = model return mock_config_entry diff --git a/tests/test_camera.py b/tests/test_camera.py index 835d42b..e58d63a 100644 --- a/tests/test_camera.py +++ b/tests/test_camera.py @@ -5,7 +5,7 @@ import httpx from homeassistant.core import HomeAssistant from homeassistant.const import STATE_IDLE -from homeassistant.components import camera as camera_component +from homeassistant.components.camera.helper import get_camera_from_entity_id from homeassistant.components.camera import DOMAIN as CAMERA_DOMAIN from pytest_homeassistant_custom_component.common import MockConfigEntry from tests.conftest import load_fixture @@ -22,7 +22,7 @@ async def test_camera(hass: HomeAssistant, init_integration: MockConfigEntry) -> entity_id = "camera.ds_7608nxi_i0_0p_s0000000000ccrrj00000000wcvu_101" assert hass.states.get(entity_id) - camera_entity = camera_component._get_camera_from_entity_id(hass, entity_id) + camera_entity = get_camera_from_entity_id(hass, entity_id) assert camera_entity.state == STATE_IDLE assert camera_entity.name == "garden" @@ -47,7 +47,7 @@ async def test_camera_snapshot(hass: HomeAssistant, init_integration: MockConfig """Test camera snapshot.""" entity_id = "camera.ds_7608nxi_i0_0p_s0000000000ccrrj00000000wcvu_101" - camera_entity = camera_component._get_camera_from_entity_id(hass, entity_id) + camera_entity = get_camera_from_entity_id(hass, entity_id) image_url = f"{TEST_HOST}/ISAPI/Streaming/channels/101/picture" respx.get(image_url).respond(content=b"binary image data") @@ -61,7 +61,7 @@ async def test_camera_snapshot_device_error(hass: HomeAssistant, init_integratio """Test camera snapshot with 2 attempts.""" entity_id = "camera.ds_7608nxi_i0_0p_s0000000000ccrrj00000000wcvu_101" - camera_entity = camera_component._get_camera_from_entity_id(hass, entity_id) + camera_entity = get_camera_from_entity_id(hass, entity_id) image_url = f"{TEST_HOST}/ISAPI/Streaming/channels/101/picture" route = respx.get(image_url) @@ -81,7 +81,7 @@ async def test_camera_snapshot_alternate_url(hass: HomeAssistant, init_integrati """Test camera snapshot with alternate url.""" entity_id = "camera.ds_7616ni_q2_00p0000000000ccrre00000000wcvu_101" - camera_entity = camera_component._get_camera_from_entity_id(hass, entity_id) + camera_entity = get_camera_from_entity_id(hass, entity_id) error_response = load_fixture("ISAPI/Streaming.channels.x0y.picture", "badXmlContent") image_url = f"{TEST_HOST}/ISAPI/Streaming/channels/101/picture" @@ -116,7 +116,7 @@ async def test_camera_stream_info(hass: HomeAssistant, init_integration: MockCon data = device_data[init_integration.title] entity_id = data["entity_id"] - camera_entity = camera_component._get_camera_from_entity_id(hass, entity_id) + camera_entity = get_camera_from_entity_id(hass, entity_id) assert camera_entity.stream_info.codec == data["codec"] assert camera_entity.stream_info.width == data["width"] diff --git a/tests/test_config_flow.py b/tests/test_config_flow.py index 089bcd6..a1bce2f 100644 --- a/tests/test_config_flow.py +++ b/tests/test_config_flow.py @@ -61,7 +61,7 @@ async def test_wrong_credentials_config_flow(hass, mock_isapi): assert result.get("errors") == {"base": "invalid_auth"} -@patch("custom_components.hikvision_next.isapi.ISAPI.get_device_info") +@patch("custom_components.hikvision_next.isapi.ISAPIClient.get_device_info") async def test_unexpeced_exception_config_flowget_device_info_mock(get_device_info_mock, hass, mock_isapi): """Test a config flow with unexpeced exception.""" diff --git a/tests/test_init.py b/tests/test_init.py index c032df6..47e3b4a 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -4,6 +4,7 @@ from unittest.mock import patch from homeassistant.core import HomeAssistant from custom_components.hikvision_next.const import DOMAIN +from custom_components.hikvision_next.hikvision_device import HikvisionDevice from pytest_homeassistant_custom_component.common import MockConfigEntry from homeassistant.config_entries import ConfigEntryState from tests.conftest import TEST_CONFIG, TEST_CONFIG_WITH_ALARM_SERVER @@ -27,9 +28,9 @@ async def test_basic_init(hass: HomeAssistant, init_integration: MockConfigEntry entry = init_integration assert entry.state == ConfigEntryState.LOADED - isapi = hass.data[DOMAIN][entry.entry_id]["isapi"] - assert isapi.host == TEST_CONFIG["host"] - assert init_integration.title in isapi.device_info.model + device: HikvisionDevice = entry.runtime_data + assert device.host == TEST_CONFIG["host"] + assert init_integration.title in device.device_info.model @pytest.mark.parametrize("init_integration", ["DS-7608NXI-I2"], indirect=True) @@ -39,31 +40,32 @@ async def test_async_setup_entry_nvr(hass: HomeAssistant, init_integration: Mock entry = init_integration assert entry.state == ConfigEntryState.LOADED - isapi = hass.data[DOMAIN][entry.entry_id]["isapi"] - assert isapi.host == TEST_CONFIG["host"] - assert len(isapi.cameras) == 4 - assert len(isapi.supported_events) == 63 + device: HikvisionDevice = entry.runtime_data + assert device.host == TEST_CONFIG["host"] + assert len(device.cameras) == 4 + assert len(device.supported_events) == 63 - device_info = isapi.device_info + device_info = device.device_info + capabilities = device.capabilities assert device_info.device_type == "NVR" assert device_info.firmware == "V4.62.210" - assert device_info.input_ports == 4 + assert capabilities.input_ports == 4 assert TEST_CONFIG["host"].endswith(device_info.ip_address) assert device_info.is_nvr is True assert len(device_info.mac_address) == 17 assert device_info.manufacturer == "Hikvision" assert device_info.model == "DS-7608NXI-I2/8P/S" assert device_info.name == "nvr" - assert device_info.output_ports == 1 - assert device_info.rtsp_port == "10554" + assert capabilities.output_ports == 1 + assert device.protocols.rtsp_port == "10554" assert device_info.serial_no == "DS-7608NXI-I0/0P/S0000000000CCRRJ00000000WCVU" - assert len(device_info.storage) == 1 - assert device_info.support_alarm_server is True - assert device_info.support_analog_cameras == 0 - assert device_info.support_channel_zero is True - assert device_info.support_digital_cameras == 8 - assert device_info.support_event_mutex_checking is False - assert device_info.support_holiday_mode is True + assert len(device.storage) == 1 + assert capabilities.support_alarm_server is True + assert capabilities.support_analog_cameras == 0 + assert capabilities.support_channel_zero is True + assert capabilities.support_digital_cameras == 8 + assert capabilities.support_event_mutex_checking is False + assert capabilities.support_holiday_mode is True # test successful unload await hass.config_entries.async_unload(entry.entry_id) @@ -79,32 +81,32 @@ async def test_async_setup_entry_ipc(hass: HomeAssistant, init_integration: Mock entry = init_integration assert entry.state == ConfigEntryState.LOADED - isapi = hass.data[DOMAIN][entry.entry_id]["isapi"] - assert isapi.host == TEST_CONFIG["host"] - assert len(isapi.cameras) == 1 - assert len(isapi.supported_events) == 14 + device: HikvisionDevice = entry.runtime_data + assert device.host == TEST_CONFIG["host"] + assert len(device.cameras) == 1 + assert len(device.supported_events) == 14 - device_info = isapi.device_info + device_info = device.device_info + capabilities = device.capabilities assert device_info.device_type == "IPCamera" assert device_info.firmware == "V5.7.15" - assert device_info.input_ports == 0 + assert capabilities.input_ports == 0 assert TEST_CONFIG["host"].endswith(device_info.ip_address) assert device_info.is_nvr is False assert len(device_info.mac_address) == 17 assert device_info.manufacturer == "Hikvision" assert device_info.model == "DS-2CD2386G2-IU" assert device_info.name == "yard" - assert device_info.output_ports == 0 - assert device_info.output_ports == 0 - assert device_info.rtsp_port == "10554" + assert capabilities.output_ports == 0 + assert device.protocols.rtsp_port == "10554" assert device_info.serial_no == "DS-2CD2386G2-IU00000000AAWRJ00000000" - assert len(device_info.storage) == 2 - assert device_info.support_alarm_server is True - assert device_info.support_analog_cameras == 0 - assert device_info.support_channel_zero is False - assert device_info.support_digital_cameras == 0 - assert device_info.support_event_mutex_checking is False - assert device_info.support_holiday_mode is False + assert len(device.storage) == 2 + assert capabilities.support_alarm_server is True + assert capabilities.support_analog_cameras == 0 + assert capabilities.support_channel_zero is False + assert capabilities.support_digital_cameras == 0 + assert capabilities.support_event_mutex_checking is False + assert capabilities.support_holiday_mode is False # test successful unload await hass.config_entries.async_unload(entry.entry_id) @@ -120,7 +122,7 @@ async def test_async_setup_entry_nvr_with_alarm_server(hass: HomeAssistant, init entry = init_integration - with patch("custom_components.hikvision_next.isapi.ISAPI.set_alarm_server") as set_alarm_server_mock: + with patch("custom_components.hikvision_next.isapi.ISAPIClient.set_alarm_server") as set_alarm_server_mock: await hass.config_entries.async_setup(entry.entry_id) await hass.async_block_till_done() diff --git a/tests/test_pir.py b/tests/test_pir.py index a4100d6..edfdb2d 100644 --- a/tests/test_pir.py +++ b/tests/test_pir.py @@ -4,10 +4,11 @@ import pytest from http import HTTPStatus from homeassistant.core import HomeAssistant -from custom_components.hikvision_next.const import DOMAIN, EVENT_PIR +from custom_components.hikvision_next.isapi.const import EVENT_PIR from homeassistant.components.switch import DOMAIN as SWITCH_DOMAIN from pytest_homeassistant_custom_component.common import MockConfigEntry import homeassistant.helpers.entity_registry as er +from custom_components.hikvision_next.hikvision_device import HikvisionDevice from custom_components.hikvision_next.notifications import EventNotificationsView from tests.test_notifications import mock_event_notification from tests.conftest import TEST_HOST @@ -96,9 +97,9 @@ async def test_pir_support_detection( } entry = init_integration - isapi = hass.data[DOMAIN][entry.entry_id]["isapi"] + device: HikvisionDevice = entry.runtime_data data = device_data[init_integration.title] pir_events = [ - s for s in isapi.supported_events if (s.event_id == EVENT_PIR) + s for s in device.supported_events if (s.id == EVENT_PIR) ] assert (len(pir_events) == 1) == data["isSupportPIR"]