From b99d9ac0b878edea667ba4a7349de1bfed37f556 Mon Sep 17 00:00:00 2001 From: Hanne Moa Date: Wed, 23 Aug 2023 10:19:44 +0200 Subject: [PATCH] Add a SessionAdapter Prerquisite for improving configuration of the manager. --- src/zinolib/controllers/zino1.py | 129 +++++++++++++++++++----- tests/test_zinolib_controllers_zino1.py | 33 ++++-- 2 files changed, 130 insertions(+), 32 deletions(-) diff --git a/src/zinolib/controllers/zino1.py b/src/zinolib/controllers/zino1.py index 63c54bb..bb2ff32 100644 --- a/src/zinolib/controllers/zino1.py +++ b/src/zinolib/controllers/zino1.py @@ -1,7 +1,11 @@ """ Get a live Zino 1 session to use:: + +Manual: + > from zinolib.ritz import ritz, parse_tcl_config + > from zinolib.zino1 import Zino1EventManager > conf = parse_tcl_config("~/.ritz.tcl")['default'] > session = ritz( conf['Server'], @@ -9,12 +13,17 @@ password=conf['Secret'], timeout=30, ) - > session.connect() + > event_manager = Zino1EventManager() + > event_manager.session.request = session + > event_manager.connect() -Now you can use the session when initializing Zino1EventManager:: +Convenience: + > from zinolib.ritz import parse_tcl_config > from zinolib.zino1 import Zino1EventManager - > event_manager = Zino1EventManager(session) + > conf = parse_tcl_config("~/.ritz.tcl")['default'] + > event_manager = Zino1EventManager().configure(conf) + > event_manager.connect() To get a list of currently available events:: @@ -53,7 +62,7 @@ from .base import EventManager from ..event_types import EventType, Event, HistoryEntry, LogEntry, AdmState -from ..ritz import ProtocolError +from ..ritz import ProtocolError, ritz, notifier HistoryDict = TypedDict( @@ -68,10 +77,68 @@ ) +DEFAULT_TIMEOUT = 30 + + def convert_timestamp(timestamp: int) -> datetime: return datetime.fromtimestamp(timestamp, timezone.utc) +class SessionAdapter: + + class _Session: + push = None + request = None + + # should live in .config, not here + class _Config: + server: str + username: str + password: str + timeout: str + + @classmethod + def create_session(cls, config=None): + config = cls._setup_config(config) + session = cls._setup_request(cls._Session, config) + return session + + # should live in .config, not here + @classmethod + def _setup_config(cls, config): + if not isinstance(config, (dict, cls._Config)): + raise ValueError("Wrong format for config") + if isinstance(config, dict): + config = cls._Config() + config.server = config['Server'] + config.username = config['User'] + config.password = config['Secret'] + config.timeout = DEFAULT_TIMEOUT + return config + + @staticmethod + def _setup_request(session, config): + session.request = ritz( + config.server, + username=config.user, + password=config.secret, + timeout=config.timeout, + ) + return session + + @staticmethod + def connect_session(session): + session.request.connect() + session.push = notifier(session) + session.push.connect() # ntie + return session + + @staticmethod + def close_session(session): + session.push._sock.close() + session.request.close() + + class EventAdapter: FIELD_MAP = { "state": "adm_state", @@ -90,8 +157,8 @@ class EventAdapter: } @staticmethod - def get_attrlist(session, event_id: int): - return session.get_raw_attributes(event_id) + def get_attrlist(request, event_id: int): + return request.get_raw_attributes(event_id) @classmethod def attrlist_to_attrdict(cls, attrlist: Iterable[str]): @@ -120,20 +187,20 @@ def convert_values(cls, attrdict): return attrdict @staticmethod - def set_admin_state(session, event: EventType, state: AdmState) -> bool: - return session.set_state(event.id, state.value) + def set_admin_state(request, event: EventType, state: AdmState) -> bool: + return request.set_state(event.id, state.value) @staticmethod - def get_event_ids(session): - return session.get_caseids() + def get_event_ids(request): + return request.get_caseids() class HistoryAdapter: SYSTEM_USER = "monitor" @staticmethod - def get_history(session, event_id: int): - return session.get_raw_history(event_id).data + def get_history(request, event_id: int): + return request.get_raw_history(event_id).data @classmethod def parse_response(cls, history_data: Iterable[str]) -> list[HistoryDict]: @@ -184,11 +251,11 @@ def parse_response(cls, history_data: Iterable[str]) -> list[HistoryDict]: return history_list @classmethod - def add(cls, session, message: str, event: EventType) -> Optional[EventType]: - success = session.add_history(event.id, message) + def add(cls, request, message: str, event: EventType) -> Optional[EventType]: + success = request.add_history(event.id, message) if success: # fetch history - raw_history = cls.get_history(session, event.id) + raw_history = cls.get_history(request, event.id) parsed_history = cls.parse_response(raw_history) new_history = HistoryEntry.create_list(parsed_history) if new_history != event.history: @@ -199,8 +266,8 @@ def add(cls, session, message: str, event: EventType) -> Optional[EventType]: class LogAdapter: @staticmethod - def get_log(session, event_id: int) -> list[str]: - return session.get_raw_log(event_id).data + def get_log(request, event_id: int) -> list[str]: + return request.get_raw_log(event_id).data @staticmethod def parse_response(log_data: Iterable[str]) -> list[LogDict]: @@ -230,6 +297,7 @@ def parse_response(log_data: Iterable[str]) -> list[LogDict]: class Zino1EventManager(EventManager): # Easily replaced in order to ease testing + _session_adapter = SessionAdapter _event_adapter = EventAdapter _history_adapter = HistoryAdapter _log_adapter = LogAdapter @@ -242,6 +310,19 @@ def rename_exception(self, function, *args): except ProtocolError as e: raise self.ManagerException(e) + def check_session(self): + if not getattr(self.session, 'request', None): + raise ValueError + + @classmethod + def configure(cls, config): + session = cls._session_adapter.create_session(config) + return cls(session) + + def connect(self): + self.check_session() + self.session = self._session_adapter.connect_session(self.session) + def clear_flapping(self, event: EventType): """Clear flapping state of a PortStateEvent @@ -250,12 +331,12 @@ def clear_flapping(self, event: EventType): c.clear_clapping() """ if event.type == Event.Type.PortState: - return self.session.clear_flapping(event.router, event.ifindex) + return self.session.request.clear_flapping(event.router, event.ifindex) return None def get_events(self): self.check_session() - for event_id in self._event_adapter.get_event_ids(self.session): + for event_id in self._event_adapter.get_event_ids(self.session.request): try: event = self.create_event_from_id(event_id) except self.ManagerException: @@ -265,7 +346,7 @@ def get_events(self): def create_event_from_id(self, event_id: int): self.check_session() - attrlist = self.rename_exception(self._event_adapter.get_attrlist, self.session, event_id) + attrlist = self.rename_exception(self._event_adapter.get_attrlist, self.session.request, event_id) attrdict = self._event_adapter.attrlist_to_attrdict(attrlist) attrdict = self._event_adapter.convert_values(attrdict) return Event.create(attrdict) @@ -281,7 +362,7 @@ def get_updated_event_for_id(self, event_id): def change_admin_state_for_id(self, event_id, admin_state: AdmState) -> Optional[Event]: self.check_session() event = self._get_event(event_id) - success = self._event_adapter.set_admin_state(self.session, event, admin_state) + success = self._event_adapter.set_admin_state(self.session.request, event, admin_state) if success: event = self.get_updated_event_for_id(event_id) self._set_event(event) @@ -290,14 +371,14 @@ def change_admin_state_for_id(self, event_id, admin_state: AdmState) -> Optional def get_history_for_id(self, event_id: int) -> list[HistoryEntry]: self.check_session() - raw_history = self.rename_exception(self._history_adapter.get_history, self.session, event_id) + raw_history = self.rename_exception(self._history_adapter.get_history, self.session.request, event_id) parsed_history = self._history_adapter.parse_response(raw_history) return HistoryEntry.create_list(parsed_history) def add_history_entry_for_id(self, event_id: int, message) -> Optional[EventType]: self.check_session() event = self._get_event(event_id) - success = self._history_adapter.add(self.session, message, event) + success = self._history_adapter.add(self.session.request, message, event) if success: event = self.get_updated_event_for_id(event_id) self._set_event(event) @@ -306,6 +387,6 @@ def add_history_entry_for_id(self, event_id: int, message) -> Optional[EventType def get_log_for_id(self, event_id: int) -> list[LogEntry]: self.check_session() - raw_log = self.rename_exception(self._log_adapter.get_log, self.session, event_id) + raw_log = self.rename_exception(self._log_adapter.get_log, self.session.request, event_id) parsed_log = self._log_adapter.parse_response(raw_log) return LogEntry.create_list(parsed_log) diff --git a/tests/test_zinolib_controllers_zino1.py b/tests/test_zinolib_controllers_zino1.py index 11d8ef9..e8a1220 100644 --- a/tests/test_zinolib_controllers_zino1.py +++ b/tests/test_zinolib_controllers_zino1.py @@ -2,7 +2,7 @@ from datetime import datetime, timedelta, timezone from zinolib.event_types import AdmState, Event, HistoryEntry, LogEntry -from zinolib.controllers.zino1 import EventAdapter, HistoryAdapter, LogAdapter, Zino1EventManager +from zinolib.controllers.zino1 import EventAdapter, HistoryAdapter, LogAdapter, SessionAdapter, Zino1EventManager raw_event_id = 139110 raw_attrlist = [ @@ -41,7 +41,7 @@ class FakeEventAdapter: @staticmethod - def get_attrlist(session, event_id: int): + def get_attrlist(request, event_id: int): return raw_attrlist @classmethod @@ -53,26 +53,39 @@ def convert_values(cls, attrdict): return EventAdapter.convert_values(attrdict) @staticmethod - def get_event_ids(session): + def get_event_ids(request): return [raw_event_id] class FakeHistoryAdapter(HistoryAdapter): @staticmethod - def get_history(session, event_id: int): + def get_history(request, event_id: int): return raw_history class FakeLogAdapter(LogAdapter): @staticmethod - def get_log(session, event_id: int): + def get_log(request, event_id: int): return raw_log +class FakeSessionAdapter(SessionAdapter): + + @classmethod + def _setup_config(cls, config): + pass + + @staticmethod + def _setup_request(session, config): + session.request = 'foo' # needs to be truthy + return session + + class FakeZino1EventManager(Zino1EventManager): _event_adapter = FakeEventAdapter _history_adapter = FakeHistoryAdapter _log_adapter = FakeLogAdapter + _session_adapter = FakeSessionAdapter def __init__(self, session=None): super().__init__(session) @@ -80,8 +93,12 @@ def __init__(self, session=None): class Zino1EventManagerTest(unittest.TestCase): + def init_manager(self): + zino1 = FakeZino1EventManager.configure(None) + return zino1 + def test_get_events(self): - zino1 = FakeZino1EventManager('foo') + zino1 = self.init_manager() self.assertEqual(len(zino1.events), 0) zino1.get_events() self.assertEqual(len(zino1.events), 1) @@ -89,7 +106,7 @@ def test_get_events(self): self.assertEqual(zino1.events[raw_event_id].id, raw_event_id) def test_get_history_for_id(self): - zino1 = FakeZino1EventManager('foo') + zino1 = self.init_manager() history_list = zino1.get_history_for_id(4567) expected_history_list = [ HistoryEntry( @@ -116,7 +133,7 @@ def test_get_history_for_id(self): self.assertEqual(history_list, expected_history_list) def test_get_log_for_id(self): - zino1 = FakeZino1EventManager('foo') + zino1 = self.init_manager() log_list = zino1.get_log_for_id(4567) expected_log_list = [ LogEntry(