From 947b5059a03a5634b17662176dc41918edb768a9 Mon Sep 17 00:00:00 2001 From: Toby Godfrey Date: Sat, 23 Dec 2023 12:50:11 +0000 Subject: [PATCH] comments and new fns --- examples/dynamic_list_ex.py | 4 +- examples/fixed_list_ex.py | 4 +- src/swarmnet/swarmnet.py | 118 ++++++++++++++++++++++++++++-------- 3 files changed, 100 insertions(+), 26 deletions(-) diff --git a/examples/dynamic_list_ex.py b/examples/dynamic_list_ex.py index 4d265c4..f9c6a3f 100644 --- a/examples/dynamic_list_ex.py +++ b/examples/dynamic_list_ex.py @@ -33,7 +33,9 @@ def text_recv(msg: Optional[str]) -> None: while(not get_cont()): ctrl.send("READY") sleep(1) + + ctrl.clear_rx_queue() - ctrl.send("TEXT hello world")# + ctrl.send("TEXT hello world") print("Finished") ctrl.kill() \ No newline at end of file diff --git a/examples/fixed_list_ex.py b/examples/fixed_list_ex.py index af233d3..e7b4d92 100644 --- a/examples/fixed_list_ex.py +++ b/examples/fixed_list_ex.py @@ -32,7 +32,9 @@ def text_recv(msg: Optional[str]) -> None: while(not get_cont()): ctrl.send("READY") - sleep(1) + #sleep(0.5) + + ctrl.clear_rx_queue() ctrl.send("TEXT hello world") print("Finished") diff --git a/src/swarmnet/swarmnet.py b/src/swarmnet/swarmnet.py index f3131d8..9c55986 100644 --- a/src/swarmnet/swarmnet.py +++ b/src/swarmnet/swarmnet.py @@ -13,15 +13,23 @@ log = logger.Logger("controller") class SwarmNet: + + """! + @brief SwarmNet controller constructor + + @param mapping A mapping of command token to parsing function + @param port The communication port (defaults to 51000) + @param device_list A static device list of (IP, Port) (If not provided device discovery will be used) + """ def __init__(self, mapping: Dict[str, Callable[[Optional[str]], None]], - device_retries: int = 3, - device_refresh_interval: int = 60, + # device_retries: int = 3, + # device_refresh_interval: int = 60, port: int = 51000, device_list: List[Tuple[str, int]] = []): self.fn_map = mapping - self.discovery_retries = device_retries - self.discovery_interval = device_refresh_interval + # self.discovery_retries = device_retries + # self.discovery_interval = device_refresh_interval self.port = port if(device_list != []): @@ -40,7 +48,10 @@ def __init__(self, log.info(f"This address is {self.addr}:{self.port}") log.success("SwarmNet controller started") - + + """! + @brief Starts the required threads + """ def start(self) -> None: self.swarm_list_lock = threading.Lock() self.received_ids = [] @@ -49,21 +60,21 @@ def start(self) -> None: self.tx_queue = queue.Queue(32) self.fn_map["JOIN"] = self._register_new_member self.parser = msg_parser.MessageParser(self.fn_map, self.rx_queue) - self.receiver = receiver.Receiver(self.addr, self.port, self.add_device, self.has_seen_message, self.append_seen_messages, rx_queue=self.rx_queue, tx_queue=self.tx_queue) + self.receiver = receiver.Receiver(self.addr, self.port, self.add_device, self._has_seen_message, self._append_seen_messages, rx_queue=self.rx_queue, tx_queue=self.tx_queue) self.sender = sender.Sender(self.addr, self.tx_queue, self.remove_device) self.broadcaster = broadcaster.Broadcaster(self.addr, self.port, self.rx_queue, self.add_device, self.fixed_list, self.swarm_list) - self.parse_thread = threading.Thread(target=parse_thread_target, args=[self]) + self.parse_thread = threading.Thread(target=_parse_thread_target, args=[self]) self.parse_thread_exit_request = False self.parse_thread.start() log.info("Parser thread started") - self.receiver_thread = threading.Thread(target=receiver_thread_target, args=[self]) + self.receiver_thread = threading.Thread(target=_receiver_thread_target, args=[self]) self.receiver_thread_exit_request = False self.receiver_thread.start() log.info("Receiver thread started") - self.sender_thread = threading.Thread(target=sender_thread_target, args=[self]) + self.sender_thread = threading.Thread(target=_sender_thread_target, args=[self]) self.sender_thread_exit_request = False self.sender_thread.start() log.info("Sender thread started") @@ -71,6 +82,9 @@ def start(self) -> None: log.info_header("Beginning broadcast of JOIN command") self.broadcast(f"JOIN {self.addr} {self.port}") + """! + @brief Kills all threads in a safe way + """ def kill(self) -> None: self.broadcaster_thread_exit_request = True self.parse_thread_exit_request = True @@ -82,18 +96,44 @@ def kill(self) -> None: self.sender_thread.join() log.warn("All threads have been killed") + + """! + @brief Clears all messages waiting to be parsed + """ + def clear_rx_queue(self): + with self.rx_queue.mutex: + self.rx_queue.queue.clear() + + """! + @brief Clears all messages waiting to be sent + """ + def clear_tx_queue(self): + with self.tx_queue.mutex: + self.tx_queue.queue.clear() def _register_new_member(self, msg: Optional[str]) -> None: addr = msg.split(" ", 1)[0] port = msg.split(" ", 1)[1] self.add_device((addr, int(port))) + """! + @brief Get the list of connected devices + + @returns A list of devices + """ def get_devices(self) -> List[Tuple[str, int]]: self.swarm_list_lock.acquire() ds = self.swarm_list self.swarm_list_lock.release() return ds + """! + @brief Set the list of connected devices + + This does not work if dynamic device discovery is disabled + + @param ds The list of devices in the form of (IP, Port) + """ def set_devices(self, ds: List[Tuple[str, int]]) -> None: if(self.fixed_list): return @@ -101,7 +141,14 @@ def set_devices(self, ds: List[Tuple[str, int]]) -> None: self.swarm_list_lock.acquire() self.swarm_list = ds self.swarm_list_lock.release() - + + """! + @brief Append a device to the list of connected devices + + This does not work if dynamic device discovery is disabled + + @param d The device (IP, Port) + """ def add_device(self, d: Tuple[str, int]) -> None: if(self.fixed_list): return @@ -116,7 +163,14 @@ def add_device(self, d: Tuple[str, int]) -> None: self.swarm_list.append(d) self.swarm_list_lock.release() log.info(f"New device added at {d[0]}:{d[1]}") - + + """! + @brief Remove a device from the list of connected devices + + This does not work if dynamic device discovery is disabled + + @param d The device to remove in the form (IP, Port) + """ def remove_device(self, d: Tuple[str, int]) -> None: if(self.fixed_list): return @@ -125,46 +179,63 @@ def remove_device(self, d: Tuple[str, int]) -> None: self.swarm_list.remove(d) self.swarm_list_lock.release() log.warn(f"Device removed at {d[0]}:{d[1]}") - - def get_seen_messages(self) -> List[str]: + + def _get_seen_messages(self) -> List[str]: self.received_ids_lock.acquire() ms = self.received_ids self.received_ids_lock.release() return ms - def set_seen_messages(self, ms: List[str]) -> None: + def _set_seen_messages(self, ms: List[str]) -> None: self.received_ids_lock.acquire() self.received_ids = ms self.received_ids_lock.release() - def append_seen_messages(self, m: str) -> None: + def _append_seen_messages(self, m: str) -> None: self.received_ids_lock.acquire() self.received_ids.append(m) self.received_ids_lock.release() - def has_seen_message(self, m: str) -> bool: + def _has_seen_message(self, m: str) -> bool: self.received_ids_lock.acquire() b = m in self.received_ids self.received_ids_lock.release() return b + """! + @brief Set the logging level + + @param lv The minimum log level to output + """ def set_log_level(lv: logger.Logger.Log_Level) -> None: log.set_log_level(lv) def _calc_header(self) -> str: return f"{time.time()}/{self.addr}/{self.port}" - + + """! + @brief Send a message + + @param msg The message to send + """ def send(self, msg: str): header = self._calc_header() - self.append_seen_messages(header) + self._append_seen_messages(header) self.tx_queue.put(f"{header}:{msg}", block=True) - + + """! + @brief Broadcast a message to all IPs in the subnet + + If a static device list is provided, this is no different to the `send` method + + @param The message to send + """ def broadcast(self, msg: str): header = self._calc_header() - self.append_seen_messages(header) + self._append_seen_messages(header) self.broadcaster.broadcast(f"{header}:{msg}") -def parse_thread_target(ctrl: SwarmNet): +def _parse_thread_target(ctrl: SwarmNet): while(not ctrl.parse_thread_exit_request): if not ctrl.rx_queue.empty(): ctrl.parser.parse_msg() @@ -172,13 +243,12 @@ def parse_thread_target(ctrl: SwarmNet): time.sleep(0.01) log.warn("Parse thread killed") -def receiver_thread_target(ctrl: SwarmNet): +def _receiver_thread_target(ctrl: SwarmNet): while(not ctrl.receiver_thread_exit_request): ctrl.receiver.accept_connection() log.warn("Receiver thread killed") - -def sender_thread_target(ctrl: SwarmNet): +def _sender_thread_target(ctrl: SwarmNet): while(not ctrl.sender_thread_exit_request): if not ctrl.tx_queue.empty(): ctrl.sender.flush_queue(ctrl.get_devices())