diff --git a/README.rst b/README.rst index 7e17aea..4661057 100644 --- a/README.rst +++ b/README.rst @@ -15,7 +15,7 @@ Requirements ============ **niflexlogger-automation** has the following requirements: -* FlexLogger 2023 Q2+ +* FlexLogger 2023 Q3+ * CPython 3.6 - 3.10. If you do not have Python installed on your computer, go to python.org/downloads to download and install it. .. _installation_section: diff --git a/examples/Basic/events.py b/examples/Basic/events.py new file mode 100644 index 0000000..af91c09 --- /dev/null +++ b/examples/Basic/events.py @@ -0,0 +1,92 @@ +from flexlogger.automation import AlarmPayload +from flexlogger.automation import Application +from flexlogger.automation import EventPayload +from flexlogger.automation import EventType +from flexlogger.automation import FilePayload +import os +import sys + + +def main(project_path): + """Launch FlexLogger, open a project, and wait for an event.""" + with Application.launch() as app: + project = app.open_project(path=project_path) + + # Get a reference to the event handler and register callback functions for different event types. + event_handler = app.event_handler + event_handler.register_event_callback(alarms_event_handler, [EventType.ALARM]) + event_handler.register_event_callback(log_file_event_handler, [EventType.LOG_FILE]) + event_handler.register_event_callback(session_event_handler, [EventType.TEST_SESSION]) + + test_session = project.test_session + test_session.start() + print("Test started. Press Enter to stop the test and close the project...") + + # Wait for the user to press enter + input() + + # Cleanup: stop the session, close the project and unregister from the events + test_session.stop() + project.close() + event_handler.unregister_from_events() + + return 0 + + +def alarms_event_handler(application: Application, event_type: EventType, payload: AlarmPayload): + """Alarm Event Handler + This method will be called when an alarm event is fired. + + Args: + application: Application Reference to the application, so that you can access the project, session, etc + event_type: EventType The event type + payload: AlarmPayload The event payload + """ + + print("Event of type: {}, received at {}".format(event_type, payload.timestamp)) + print("Event Name: {}".format(payload.event_name)) + print("Alarm occurred on channel: {}".format(payload.channel)) + print("Alarm Severity Level: {}".format(payload.severity_level)) + + # Stop the session when the alarm is received + project = application.get_active_project() + session = project.test_session + session.stop() + + +def log_file_event_handler(application: Application, event_type: EventType, payload: FilePayload): + """FlexLogger Event Handler + This method will be called when a file event is fired. + + Args: + application: Application Reference to the application, so that you can access the project, session, etc + event_type: EventType The event type + payload: FilePayload The event payload + """ + + print("Event of type: {}, received at {}".format(event_type, payload.timestamp)) + print("Event Name: {}".format(payload.event_name)) + print("TDMS file path: {}".format(payload.file_path)) + + +def session_event_handler(application: Application, event_type: EventType, payload: EventPayload): + """FlexLogger Event Handler + This method will be called when a session event is fired. + + Args: + application: Application Reference to the application, so that you can access the project, session, etc + event_type: EventType The event type + payload: EventPayload The event payload + """ + + print("Event of type: {}, received at {}".format(event_type, payload.timestamp)) + print("Event Name: {}".format(payload.event_name)) + + +if __name__ == "__main__": + argv = sys.argv + if len(argv) < 2: + print("Usage: %s " % os.path.basename(__file__)) + sys.exit() + project_path_arg = argv[1] + sys.exit(main(project_path_arg)) diff --git a/protobuf/ConfigurationBasedSoftware/FlexLogger/Automation/FlexLogger.Automation.Protocols/EventType.proto b/protobuf/ConfigurationBasedSoftware/FlexLogger/Automation/FlexLogger.Automation.Protocols/EventType.proto new file mode 100644 index 0000000..ea8def0 --- /dev/null +++ b/protobuf/ConfigurationBasedSoftware/FlexLogger/Automation/FlexLogger.Automation.Protocols/EventType.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +package national_instruments.flex_logger.automation.protocols; + +enum EventType { + EVENT_TYPE_NONE = 0; + // Alarm event + EVENT_TYPE_ALARM = 1; + // Log File event + EVENT_TYPE_LOG_FILE = 2; + // Test Session event + EVENT_TYPE_TEST_SESSION = 3; + // Custom event + EVENT_TYPE_CUSTOM = 4; +} diff --git a/protobuf/ConfigurationBasedSoftware/FlexLogger/Automation/FlexLogger.Automation.Protocols/Events.proto b/protobuf/ConfigurationBasedSoftware/FlexLogger/Automation/FlexLogger.Automation.Protocols/Events.proto new file mode 100644 index 0000000..60a12d0 --- /dev/null +++ b/protobuf/ConfigurationBasedSoftware/FlexLogger/Automation/FlexLogger.Automation.Protocols/Events.proto @@ -0,0 +1,59 @@ +syntax = "proto3"; + +package national_instruments.flex_logger.automation.protocols; + +import "ConfigurationBasedSoftware/FlexLogger/Automation/FlexLogger.Automation.Protocols/EventType.proto"; +import "google/protobuf/empty.proto"; +import "google/protobuf/timestamp.proto"; + +// Service interface for the FlexLogger events. +service FlexLoggerEvents { + // RPC call to send an event for test purposes. + rpc SendEvent(SubscribeToEventsResponse) returns (google.protobuf.Empty) {} + // RPC call to subscribe to FlexLogger events. + rpc SubscribeToEvents(SubscribeToEventsRequest) returns (stream SubscribeToEventsResponse) {} + // RPC call to unsubscribe from events. + rpc UnsubscribeFromEvents(UnsubscribeFromEventsRequest) returns (google.protobuf.Empty) {} + // RPC call to specify which events to subscribe to. + rpc RegisterEvents(SubscribeToEventsRequest) returns (google.protobuf.Empty) {} + // RPC call to get which events are currently registered. + rpc GetRegisteredEvents(GetRegisteredEventsRequest) returns (GetRegisteredEventsResponse) {} +} + +// Request object for subscribing to events +message SubscribeToEventsRequest { + // The event client id + string client_id = 1; + // The event types to register to + repeated EventType event_types = 2; +} + +// SubscribeToEvents Response +message SubscribeToEventsResponse { + // The type of event being sent + EventType event_type = 1; + // Event Name + string event_name = 2; + // Event payload + string payload = 3; + // Time the event was sent + google.protobuf.Timestamp timestamp = 4; +} + +// Request object for unsubscribing +message UnsubscribeFromEventsRequest { + // The event client id + string client_id = 1; +} + +// Request object for getting registered events +message GetRegisteredEventsRequest { + // The event client id + string client_id = 1; +} + +// GetRegisteredEvents response +message GetRegisteredEventsResponse { + // The registered events + repeated EventType event_types = 1; +} diff --git a/requirements.txt b/requirements.txt index 6f4b17b..d2ded3f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,7 @@ console-menu grpcio-tools grpcio +importlib prettytable psutil # This package only works correctly on Windows, diff --git a/setup.py b/setup.py index cee77d4..9ce3d8b 100644 --- a/setup.py +++ b/setup.py @@ -56,7 +56,7 @@ def _get_version(name: str) -> str: script_dir = os.path.dirname(os.path.realpath(__file__)) script_dir = os.path.join(script_dir, name) if not os.path.exists(os.path.join(script_dir, "VERSION")): - version = "0.1.5" + version = "0.1.6" else: with open(os.path.join(script_dir, "VERSION"), "r") as version_file: version = version_file.read().rstrip() diff --git a/src/flexlogger/automation/__init__.py b/src/flexlogger/automation/__init__.py index eb31ab9..fd997b7 100644 --- a/src/flexlogger/automation/__init__.py +++ b/src/flexlogger/automation/__init__.py @@ -11,3 +11,10 @@ from ._channel_data_point import ChannelDataPoint from ._test_property import TestProperty from ._data_rate_level import DataRateLevel +from ._event_payloads import EventPayload +from ._event_payloads import AlarmPayload +from ._event_payloads import FilePayload +from . import _event_names as EventNames +from ._event_type import EventType +from ._events import FlexLoggerEventHandler +from ._severity_level import SeverityLevel diff --git a/src/flexlogger/automation/_application.py b/src/flexlogger/automation/_application.py index d563bd0..ff4b419 100644 --- a/src/flexlogger/automation/_application.py +++ b/src/flexlogger/automation/_application.py @@ -17,6 +17,7 @@ import psutil # type: ignore from grpc import insecure_channel, RpcError +from ._events import FlexLoggerEventHandler from ._flexlogger_error import FlexLoggerError from ._project import Project from .proto import ( @@ -49,6 +50,8 @@ def __init__(self, server_port: int = None) -> None: self._server_port = server_port if server_port is not None else self._detect_server_port() self._connect() self._launched = False + self._event_handler = None + self._client_id = uuid.uuid4().hex def __enter__(self) -> "Application": return self @@ -59,6 +62,18 @@ def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: # or disconnect() explicitly. self._disconnect(exit_application=self._launched) + @property + def event_handler(self) -> FlexLoggerEventHandler: + """The application event handler.""" + if self._event_handler is not None: + return self._event_handler + + self._event_handler = FlexLoggerEventHandler(self._channel, + self._client_id, + self, + self._raise_exception_if_closed) + return self._event_handler + @property def server_port(self) -> int: """The port that the automation server is listening to.""" @@ -173,6 +188,7 @@ def _disconnect(self, exit_application: bool) -> None: finally: self._channel.close() self._channel = None + self._event_handler = None def _raise_exception_if_closed(self) -> None: if self._channel is None: diff --git a/src/flexlogger/automation/_event_names.py b/src/flexlogger/automation/_event_names.py new file mode 100644 index 0000000..eb3d4c5 --- /dev/null +++ b/src/flexlogger/automation/_event_names.py @@ -0,0 +1,10 @@ +ALARM_ADDED = "Active Alarm Added" +ALARM_REMOVED = "Active Alarm Removed" +ALARM_CHANGED = "Active Alarm Changed" +ALARM_CLEARED = "Alarm Cleared and Acknowledged" +LOG_FILE_CREATED = "Log File Created" +LOG_FILE_CLOSED = "Log File Closed" +TEST_STARTED = "Test Started" +TEST_PAUSED = "Test Paused" +TEST_RESUMED = "Test Resumed" +TEST_STOPPED = "Test Stopped" diff --git a/src/flexlogger/automation/_event_payloads.py b/src/flexlogger/automation/_event_payloads.py new file mode 100644 index 0000000..afa223d --- /dev/null +++ b/src/flexlogger/automation/_event_payloads.py @@ -0,0 +1,111 @@ +from ._event_type import EventType +from ._severity_level import SeverityLevel +from .proto import Events_pb2 +from datetime import datetime +import json + + +class EventPayload: + def __init__(self, event_response: Events_pb2.SubscribeToEventsResponse) -> None: + self._event_type = EventType.from_event_type_pb2(event_response.event_type) + self._event_name = event_response.event_name + self._payload = event_response.payload + self._timestamp = event_response.timestamp + + @property + def event_type(self) -> EventType: + """The type of event received.""" + return self._event_type + + @property + def event_name(self) -> str: + """The name of the event received.""" + return self._event_name + + @property + def timestamp(self) -> datetime: + """The time the event was received.""" + return self._timestamp + + +class AlarmPayload(EventPayload): + def __init__(self, event_response: Events_pb2.SubscribeToEventsResponse) -> None: + super().__init__(event_response) + json_payload = json.loads(self._payload) + self._alarm_id = json_payload['AlarmId'] + self._active = json_payload['Active'] + self._acknowledged = json_payload['Acknowledged'] + self._acknowledged_at = json_payload['AcknowledgedAt'] + self._occurred_at = json_payload['OccurredAt'] + self._severity_level = json_payload['SeverityLevel'] + self._updated_at = json_payload['UpdatedAt'] + self._channel = json_payload['Channel'] + self._condition = json_payload['Condition'] + self._display_name = json_payload['DisplayName'] + self._description = json_payload['Description'] + + @property + def alarm_id(self) -> str: + """The alarm ID.""" + return self._alarm_id + + @property + def active(self) -> bool: + """Whether the alarm is active.""" + return self._active + + @property + def acknowledged(self) -> bool: + """Whether the alarm has been acknowledged.""" + return self._acknowledged + + @property + def acknowledged_at(self) -> datetime: + """When the alarm was acknowledged.""" + return self._acknowledged_at + + @property + def occurred_at(self) -> datetime: + """When the alarm occurred.""" + return self._occurred_at + + @property + def severity_level(self) -> SeverityLevel: + """The alarm's severity level.""" + return self._severity_level + + @property + def updated_at(self) -> datetime: + """When the alarm was last updated.""" + return self._updated_at + + @property + def channel(self) -> str: + """Channel associated with the alarm.""" + return self._channel + + @property + def condition(self) -> str: + """The alarm condition.""" + return self._condition + + @property + def display_name(self) -> str: + """The alarm's display name.""" + return self._display_name + + @property + def description(self) -> str: + """Description of the alarm.""" + return self._description + + +class FilePayload(EventPayload): + def __init__(self, event_response: Events_pb2.SubscribeToEventsResponse) -> None: + super().__init__(event_response) + self._file_path = self._payload + + @property + def file_path(self) -> str: + """The TDMS file path.""" + return self._file_path diff --git a/src/flexlogger/automation/_event_type.py b/src/flexlogger/automation/_event_type.py new file mode 100644 index 0000000..0d54d9c --- /dev/null +++ b/src/flexlogger/automation/_event_type.py @@ -0,0 +1,33 @@ +from .proto.EventType_pb2 import EventType as EventType_pb2 +from enum import Enum + + +class EventType(Enum): + """An enumeration describing the different types of events.""" + + ALARM = 1 + LOG_FILE = 2 + TEST_SESSION = 3 + CUSTOM = 4 + + def to_event_type_pb2(self) -> EventType_pb2: + return EVENT_TYPE_MAP.get(self.value) + + @staticmethod + def from_event_type_pb2(event_type: EventType_pb2): + return EVENT_TYPE_PB2_MAP.get(event_type) + + +EVENT_TYPE_MAP = { + 1: EventType_pb2.EVENT_TYPE_ALARM, + 2: EventType_pb2.EVENT_TYPE_LOG_FILE, + 3: EventType_pb2.EVENT_TYPE_TEST_SESSION, + 4: EventType_pb2.EVENT_TYPE_CUSTOM +} + +EVENT_TYPE_PB2_MAP = { + EventType_pb2.EVENT_TYPE_ALARM: EventType.ALARM, + EventType_pb2.EVENT_TYPE_LOG_FILE: EventType.LOG_FILE, + EventType_pb2.EVENT_TYPE_TEST_SESSION: EventType.TEST_SESSION, + EventType_pb2.EVENT_TYPE_CUSTOM: EventType.CUSTOM +} diff --git a/src/flexlogger/automation/_events.py b/src/flexlogger/automation/_events.py new file mode 100644 index 0000000..cd164d4 --- /dev/null +++ b/src/flexlogger/automation/_events.py @@ -0,0 +1,157 @@ +import time + +from ._event_payloads import AlarmPayload +from ._event_payloads import EventPayload +from ._event_payloads import FilePayload +from ._event_type import EventType +from ._flexlogger_error import FlexLoggerError +from .proto import ( + Events_pb2, + Events_pb2_grpc, +) +from .proto.EventType_pb2 import EventType as EventType_pb2 +from concurrent.futures import ThreadPoolExecutor +from google.protobuf.timestamp_pb2 import Timestamp +from grpc import Channel, RpcError +from typing import Callable, Iterator + + +class FlexLoggerEventHandler: + def __init__(self, channel: Channel, + client_id: str, + application, + raise_if_application_closed: Callable[[], None]) -> None: + self._application = application + self._callbacks = [] + self._channel = channel + self._client_id = client_id + self._event_types_per_callback = [] + self._is_subscribed = False + self._raise_if_application_closed = raise_if_application_closed + self._stub = Events_pb2_grpc.FlexLoggerEventsStub(self._channel) + self._thread_executor = ThreadPoolExecutor() + + def __enter__(self): + return self + + def __exit__(self, *args): + self.unregister_from_events() + + def get_registered_events(self) -> [EventType]: + """Gets the list of registered event types. + + Returns + The list of registered event types + + Raises: + FlexLoggerError: if the request failed. + """ + try: + response = self._stub.GetRegisteredEvents(Events_pb2.GetRegisteredEventsRequest(client_id=self._client_id)) + event_types_pb2 = response.event_types + event_types = self._marshal_event_types_pb2(event_types_pb2) + return event_types + except (RpcError, ValueError, AttributeError) as rpc_error: + self._raise_if_application_closed() + raise FlexLoggerError("Failed to get the registered events") from rpc_error + + def unregister_from_events(self) -> None: + """Unregister from events.""" + try: + self._callbacks = [] + self._event_types_per_callback = [] + self._is_subscribed = False + self._stub.UnsubscribeFromEvents(Events_pb2.UnsubscribeFromEventsRequest(client_id=self._client_id)) + except (RpcError, ValueError, AttributeError) as rpc_error: + self._raise_if_application_closed() + raise FlexLoggerError("Failed to unregister from events") from rpc_error + + def register_event_callback(self, callback, event_types=None) -> None: + """Register for events and specify a callback method + + Args: + callback: callback method. The callback method must have the following parameters: + application: Application Reference to the FlexLogger application + event_type: EventType The event type + event_payload: EventPayload The event payload + event_types: List of EventType to subscribe to. + + Raises: + FlexLoggerError: if the callback registration failed. + """ + event_types_parameter = self._marshal_event_types(event_types) + if self._is_subscribed: + try: + self._stub.RegisterEvents(Events_pb2.SubscribeToEventsRequest(client_id=self._client_id, + event_types=event_types_parameter)) + self._add_callback_and_events_to_dictionary(callback, event_types) + except (RpcError, ValueError, AttributeError) as rpc_error: + self._raise_if_application_closed() + raise FlexLoggerError("Failed to register events") from rpc_error + return + + # Add function and event types to respective lists + self._is_subscribed = True + self._add_callback_and_events_to_dictionary(callback, event_types) + event_iterator = self._stub.SubscribeToEvents( + Events_pb2.SubscribeToEventsRequest(client_id=self._client_id, event_types=event_types_parameter)) + self._thread_executor.submit(self._event_handler, event_iterator) + # Wait for the server to register the client ID. + time.sleep(0.15) + + @staticmethod + def _marshal_event_types(event_types): + if event_types is None: + return None + event_types_pb2 = [] + for event_type in event_types: + event_types_pb2.append(event_type.to_event_type_pb2()) + + return event_types_pb2 + + @staticmethod + def _marshal_event_types_pb2(event_types_pb2): + if event_types_pb2 is None: + return None + event_types = [] + for event_type_pb2 in event_types_pb2: + event_types.append(EventType.from_event_type_pb2(event_type_pb2)) + + return event_types + + def _add_callback_and_events_to_dictionary(self, callback, event_types: [EventType]): + # If the event type is not provided, respond to all. + if event_types is None: + event_types = [EventType.ALARM, EventType.LOG_FILE, EventType.TEST_SESSION, EventType.CUSTOM] + + if callback in self._callbacks: + index = self._callbacks.index(callback) + self._event_types_per_callback[index] = event_types + else: + self._callbacks.append(callback) + self._event_types_per_callback.append(event_types) + + def _event_handler(self, event_iterator: Iterator[Events_pb2.SubscribeToEventsResponse]) -> None: + try: + while self._is_subscribed: + event_response = next(event_iterator) + if event_response is not None: + event_type = EventType.from_event_type_pb2(event_response.event_type) + for i in range(len(self._callbacks)): + if event_type in self._event_types_per_callback[i]: + payload = self._create_payload(event_response) + self._callbacks[i](self._application, event_type, payload) + except Exception as error: + self._raise_if_application_closed() + raise FlexLoggerError("Failed to receive event.") from error + + @staticmethod + def _create_payload(event_response: Events_pb2.SubscribeToEventsResponse): + if event_response.event_type == EventType_pb2.EVENT_TYPE_ALARM: + payload = AlarmPayload(event_response) + elif event_response.event_type == EventType_pb2.EVENT_TYPE_LOG_FILE: + payload = FilePayload(event_response) + else: + payload = EventPayload(event_response) + + return payload diff --git a/src/flexlogger/automation/_severity_level.py b/src/flexlogger/automation/_severity_level.py new file mode 100644 index 0000000..e3d339b --- /dev/null +++ b/src/flexlogger/automation/_severity_level.py @@ -0,0 +1,8 @@ +from enum import Enum + + +class SeverityLevel(Enum): + """Alarm Severity Level.""" + + WARNING = 2 + CRITICAL = 4 diff --git a/tests/assets/ProjectWithAlarms/Channel Specification.flxio b/tests/assets/ProjectWithAlarms/Channel Specification.flxio new file mode 100644 index 0000000..dc48c06 --- /dev/null +++ b/tests/assets/ProjectWithAlarms/Channel Specification.flxio @@ -0,0 +1,80 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/tests/assets/ProjectWithAlarms/Logging Specification.flxcfg b/tests/assets/ProjectWithAlarms/Logging Specification.flxcfg new file mode 100644 index 0000000..b649f98 --- /dev/null +++ b/tests/assets/ProjectWithAlarms/Logging Specification.flxcfg @@ -0,0 +1,25 @@ + + + + + + + + + + + + + + + gstoll + + + "" + + + + + + + \ No newline at end of file diff --git a/tests/assets/ProjectWithAlarms/ProjectWithAlarms.flxproj b/tests/assets/ProjectWithAlarms/ProjectWithAlarms.flxproj new file mode 100644 index 0000000..9bcfda3 --- /dev/null +++ b/tests/assets/ProjectWithAlarms/ProjectWithAlarms.flxproj @@ -0,0 +1,46 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/tests/assets/ProjectWithAlarms/Screen.flxscr b/tests/assets/ProjectWithAlarms/Screen.flxscr new file mode 100644 index 0000000..bdc6a9b --- /dev/null +++ b/tests/assets/ProjectWithAlarms/Screen.flxscr @@ -0,0 +1,12 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/tests/assets/ProjectWithAlarms/Test Specification.flxtest b/tests/assets/ProjectWithAlarms/Test Specification.flxtest new file mode 100644 index 0000000..4758fdb --- /dev/null +++ b/tests/assets/ProjectWithAlarms/Test Specification.flxtest @@ -0,0 +1,13 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/tests/test_events.py b/tests/test_events.py new file mode 100644 index 0000000..f317898 --- /dev/null +++ b/tests/test_events.py @@ -0,0 +1,153 @@ +from .utils import open_project +import datetime +from flexlogger.automation import ( + Application, + EventPayload, + EventNames, + EventType +) +import pytest # type: ignore + +received_event_type = None +received_event_payload = None + + +def generic_event_handler(application: Application, event_type: EventType, payload: EventPayload): + """Event Handler + This method will be called when a file event is fired. + + Args: + application: Application Reference to the application, so that you can access the project, session, etc + event_type: EventType The event type + payload: EventPayload The event payload + """ + global received_event_type + global received_event_payload + + received_event_type = event_type + received_event_payload = payload + print("Event received: {}".format(event_type)) + + +class TestEvents: + @staticmethod + def wait_for_registered_events(event_handler, timeout=3) -> [EventType]: + start_time = datetime.datetime.now() + while True: + registered_events = event_handler.get_registered_events() + current_time = datetime.datetime.now() + delta = current_time - start_time + if len(registered_events) > 0 or delta.total_seconds() > timeout: + break + + return registered_events + + @staticmethod + def wait_for_event(event_type=None, event_name=None, timeout=5) -> bool: + global received_event_type + global received_event_payload + + start_time = datetime.datetime.now() + while True: + correct_event_type = received_event_type == event_type if event_type is not None else True + if event_name is not None: + correct_event_name = False if received_event_payload is None else received_event_payload.event_name == event_name + else: + correct_event_name = True + + current_time = datetime.datetime.now() + delta = current_time - start_time + if (correct_event_type and correct_event_name) or delta.total_seconds() > timeout: + break + + return correct_event_type and correct_event_name + + @staticmethod + def reset_event() -> [None]: + global received_event_type + global received_event_payload + received_event_type = None + received_event_payload = None + + @pytest.mark.integration # type: ignore + def test__register_events__events_registered(self, app: Application) -> None: + self.reset_event() + event_handler = app.event_handler + event_handler.register_event_callback(generic_event_handler, [EventType.ALARM]) + registered_events = self.wait_for_registered_events(event_handler) + assert EventType.ALARM in registered_events + assert event_handler._is_subscribed + event_handler.unregister_from_events() + + @pytest.mark.integration # type: ignore + def test__unregister_events__no_events_registered(self, app: Application) -> None: + self.reset_event() + event_handler = app.event_handler + event_handler.register_event_callback(generic_event_handler, [EventType.ALARM]) + event_handler.unregister_from_events() + registered_events = self.wait_for_registered_events(event_handler) + assert len(registered_events) == 0 + assert not event_handler._is_subscribed + event_handler.unregister_from_events() + + @pytest.mark.integration # type: ignore + def test__start_session__session_event_received(self, app: Application) -> None: + self.reset_event() + with open_project(app, "ProjectWithProducedData") as project: + event_handler = app.event_handler + event_handler.register_event_callback(generic_event_handler, [EventType.TEST_SESSION]) + session = project.test_session + session.start() + event_received = self.wait_for_event(EventType.TEST_SESSION, EventNames.TEST_STARTED) + assert event_received + event_handler.unregister_from_events() + + @pytest.mark.integration # type: ignore + def test__start_session__log_file_created_event_received(self, app: Application) -> None: + self.reset_event() + with open_project(app, "ProjectWithProducedData") as project: + event_handler = app.event_handler + event_handler.register_event_callback(generic_event_handler, [EventType.LOG_FILE]) + session = project.test_session + session.start() + event_received = self.wait_for_event(EventType.LOG_FILE, EventNames.LOG_FILE_CREATED) + assert event_received + event_handler.unregister_from_events() + + @pytest.mark.integration # type: ignore + def test__stop_session__session_event_received(self, app: Application) -> None: + self.reset_event() + with open_project(app, "ProjectWithProducedData") as project: + event_handler = app.event_handler + event_handler.register_event_callback(generic_event_handler, [EventType.TEST_SESSION]) + session = project.test_session + session.start() + session.stop() + event_received = self.wait_for_event(EventType.TEST_SESSION, EventNames.TEST_STOPPED) + assert event_received + event_handler.unregister_from_events() + + @pytest.mark.integration # type: ignore + def test__stop_session__log_file_closed_event_received(self, app: Application) -> None: + self.reset_event() + with open_project(app, "ProjectWithProducedData") as project: + event_handler = app.event_handler + event_handler.register_event_callback(generic_event_handler, [EventType.LOG_FILE]) + session = project.test_session + session.start() + session.stop() + event_received = self.wait_for_event(EventType.LOG_FILE, EventNames.LOG_FILE_CLOSED) + assert event_received + event_handler.unregister_from_events() + + @pytest.mark.integration # type: ignore + def test_open_project_with_alarms__start_session__alarm_event_received(self, app: Application) -> None: + self.reset_event() + with open_project(app, "ProjectWithAlarms") as project: + event_handler = app.event_handler + event_handler.register_event_callback(generic_event_handler, [EventType.ALARM]) + session = project.test_session + session.start() + event_received = self.wait_for_event(EventType.ALARM) + assert event_received + event_handler.unregister_from_events()