From bcb6d9855e141b8f6634fb21c5c2ebb323a7be40 Mon Sep 17 00:00:00 2001 From: Wouter Depypere Date: Thu, 28 Mar 2024 18:34:50 +0100 Subject: [PATCH 1/6] add simple function for netcat-like send behaviour --- lib/vsc/utils/netcat.py | 54 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 lib/vsc/utils/netcat.py diff --git a/lib/vsc/utils/netcat.py b/lib/vsc/utils/netcat.py new file mode 100644 index 0000000..91d177d --- /dev/null +++ b/lib/vsc/utils/netcat.py @@ -0,0 +1,54 @@ +# +# Copyright 2024-2024 Ghent University +# +# This file is part of vsc-utils, +# originally created by the HPC team of Ghent University (http://ugent.be/hpc/en), +# with support of Ghent University (http://ugent.be/hpc), +# the Flemish Supercomputer Centre (VSC) (https://www.vscentrum.be), +# the Flemish Research Foundation (FWO) (http://www.fwo.be/en) +# and the Department of Economy, Science and Innovation (EWI) (http://www.ewi-vlaanderen.be/en). +# +# https://github.com/hpcugent/vsc-utils +# +# vsc-utils is free software: you can redistribute it and/or modify +# it under the terms of the GNU Library General Public License as +# published by the Free Software Foundation, either version 2 of +# the License, or (at your option) any later version. +# +# vsc-utils is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Library General Public License for more details. +# +# You should have received a copy of the GNU Library General Public License +# along with vsc-utils. If not, see . +# +""" +Utilities for sending data like netcat utility. + +@author Wouter Depypere (Ghent University) +""" + +import socket + +def connect_and_send(host, port, data, timeout=10): + """ + connect to a host with given timeout and write data to it. + data can be: + - bytes + - string + - list of strings + + timeout value is counted for the entire connection so includes + sending of data. + """ + # convert list to string if needed + if isinstance(data, list): + data = "".join(data) + + # make sure data is bytes + if isinstance(data, str): + data = data.encode('utf-8') + + with socket.create_connection((host, port), timeout=timeout) as sock: + sock.sendall(data) From 7a0d8adbd345bd9f8ff9777eeacc49c8a48d9ffe Mon Sep 17 00:00:00 2001 From: Wouter Depypere Date: Thu, 28 Mar 2024 18:39:01 +0100 Subject: [PATCH 2/6] add basic logging --- lib/vsc/utils/netcat.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/vsc/utils/netcat.py b/lib/vsc/utils/netcat.py index 91d177d..c3d93f7 100644 --- a/lib/vsc/utils/netcat.py +++ b/lib/vsc/utils/netcat.py @@ -29,6 +29,7 @@ @author Wouter Depypere (Ghent University) """ +import logging import socket def connect_and_send(host, port, data, timeout=10): @@ -50,5 +51,7 @@ def connect_and_send(host, port, data, timeout=10): if isinstance(data, str): data = data.encode('utf-8') + logging.debug("connection to %s:%s with a timeout of %s", host, port, timeout) with socket.create_connection((host, port), timeout=timeout) as sock: + logging.debug("sending data: %s", data) sock.sendall(data) From 4f24492bd1aca2bb816bff26e77166034a6d08c5 Mon Sep 17 00:00:00 2001 From: Wouter Depypere Date: Thu, 28 Mar 2024 18:39:53 +0100 Subject: [PATCH 3/6] bump version --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 8fe1c4e..8b058ad 100755 --- a/setup.py +++ b/setup.py @@ -41,7 +41,7 @@ ] PACKAGE = { - 'version': '2.2.6', + 'version': '2.2.7', 'author': [ag, sdw], 'maintainer': [ag, sdw], 'excluded_pkgs_rpm': ['vsc', 'vsc.utils'], # vsc is default, vsc.utils is provided by vsc-base From 57e15638ee0d3d0da37dbb2b3798c1efcc6cdac6 Mon Sep 17 00:00:00 2001 From: Wouter Depypere Date: Thu, 28 Mar 2024 18:44:01 +0100 Subject: [PATCH 4/6] assume newlines --- lib/vsc/utils/netcat.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/vsc/utils/netcat.py b/lib/vsc/utils/netcat.py index c3d93f7..4f0e271 100644 --- a/lib/vsc/utils/netcat.py +++ b/lib/vsc/utils/netcat.py @@ -45,7 +45,7 @@ def connect_and_send(host, port, data, timeout=10): """ # convert list to string if needed if isinstance(data, list): - data = "".join(data) + data = "\n".join(data) # make sure data is bytes if isinstance(data, str): From 5acf86ed68b285d8f432c39ce20449082a828123 Mon Sep 17 00:00:00 2001 From: Wouter Depypere Date: Thu, 28 Mar 2024 18:55:24 +0100 Subject: [PATCH 5/6] remove string useage --- lib/vsc/utils/netcat.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/lib/vsc/utils/netcat.py b/lib/vsc/utils/netcat.py index 4f0e271..19df0f9 100644 --- a/lib/vsc/utils/netcat.py +++ b/lib/vsc/utils/netcat.py @@ -38,15 +38,10 @@ def connect_and_send(host, port, data, timeout=10): data can be: - bytes - string - - list of strings timeout value is counted for the entire connection so includes sending of data. """ - # convert list to string if needed - if isinstance(data, list): - data = "\n".join(data) - # make sure data is bytes if isinstance(data, str): data = data.encode('utf-8') From 1c0e21c4b4bf617d917ed708006fa5e6bea7644d Mon Sep 17 00:00:00 2001 From: Wouter Depypere Date: Wed, 10 Apr 2024 15:43:52 +0200 Subject: [PATCH 6/6] replace mini netcat with graphyte --- lib/vsc/utils/graphyte.py | 266 ++++++++++++++++++++++++++++++++++++++ lib/vsc/utils/netcat.py | 52 -------- 2 files changed, 266 insertions(+), 52 deletions(-) create mode 100644 lib/vsc/utils/graphyte.py delete mode 100644 lib/vsc/utils/netcat.py diff --git a/lib/vsc/utils/graphyte.py b/lib/vsc/utils/graphyte.py new file mode 100644 index 0000000..db15d56 --- /dev/null +++ b/lib/vsc/utils/graphyte.py @@ -0,0 +1,266 @@ +# +# Copyright 2024-2024 Ghent University +# +# This file is part of vsc-utils, +# originally created by the HPC team of Ghent University (http://ugent.be/hpc/en), +# with support of Ghent University (http://ugent.be/hpc), +# the Flemish Supercomputer Centre (VSC) (https://www.vscentrum.be), +# the Flemish Research Foundation (FWO) (http://www.fwo.be/en) +# and the Department of Economy, Science and Innovation (EWI) (http://www.ewi-vlaanderen.be/en). +# +# https://github.com/hpcugent/vsc-utils +# +# vsc-utils is free software: you can redistribute it and/or modify +# it under the terms of the GNU Library General Public License as +# published by the Free Software Foundation, either version 2 of +# the License, or (at your option) any later version. +# +# vsc-utils is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Library General Public License for more details. +# +# You should have received a copy of the GNU Library General Public License +# along with vsc-utils. If not, see . +# +""" +The graphyte project lives on GitHub here: +https://github.com/benhoyt/graphyte + +It is redistributed here for ease of use. It has been adapted to vsc standards. + +Send data to Graphite metrics server (synchronously or on a background thread). +This code is licensed under a permissive MIT license -- see LICENSE.txt in the +original github repository. + +""" + +import atexit +import logging +import queue +import socket +import threading +import time + +__all__ = ['Sender', 'init', 'send'] + +__version__ = '1.7.1' + +default_sender = None +logger = logging.getLogger(__name__) + + +def _has_whitespace(value): + return not value or value.split(None, 1)[0] != value + + +class Sender: + def __init__(self, host, port=2003, prefix=None, timeout=5, interval=None, + queue_size=None, log_sends=False, protocol='tcp', + batch_size=1000, tags=None, raise_send_errors=False): + """Initialize a Sender instance, starting the background thread to + send messages at given interval (in seconds) if "interval" is not + None. Send at most "batch_size" messages per socket send operation. + Default protocol is TCP; use protocol='udp' for UDP. + + Use "tags" to specify common or default tags for this Sender, which + are sent with each metric along with any tags passed to send(). + """ + + self.host = host + self.port = port + self.prefix = prefix + self.timeout = timeout + self.interval = interval + self.log_sends = log_sends + self.protocol = protocol + self.batch_size = batch_size + if tags is None: + self.tags = {} + else: + self.tags = tags + self.raise_send_errors = raise_send_errors + + if self.interval is not None: + if raise_send_errors: + raise ValueError('raise_send_errors must be disabled when interval is set') + if queue_size is None: + queue_size = int(round(interval)) * 100 + self._queue = queue.Queue(maxsize=queue_size) + self._thread = threading.Thread(target=self._thread_loop) + self._thread.daemon = True + self._thread.start() + atexit.register(self.stop) + + def __del__(self): + self.stop() + + def stop(self): + """Tell the sender thread to finish and wait for it to stop sending + (should be at most "timeout" seconds). + """ + if self.interval is not None: + self._queue.put_nowait(None) + self._thread.join() + self.interval = None + + def build_message(self, metric, value, timestamp, tags=None): + """Build a Graphite message to send and return it as a byte string.""" + if tags is None: + tags = {} + if _has_whitespace(metric): + raise ValueError('"metric" must not have whitespace in it') + if not isinstance(value, (int, float)): + raise TypeError('"value" must be an int or a float, not a %s', + type(value).__name__) + + all_tags = self.tags.copy() + all_tags.update(tags) + tags_strs = [f';{k}={v}' for k, v in sorted(all_tags.items())] + if any(_has_whitespace(t) for t in tags_strs): + raise ValueError('"tags" keys and values must not have whitespace in them') + tags_suffix = ''.join(tags_strs) + + prefix = self.prefix + '.' if self.prefix else '' + message = f"{prefix}{metric}{tags_suffix} {value} {int(round(timestamp))}\n" + message = message.encode('utf-8') + return message + + def send(self, metric, value, timestamp=None, tags=None): + """Send given metric and (int or float) value to Graphite host. + Performs send on background thread if "interval" was specified when + creating this Sender. + + If a "tags" dict is specified, send the tags to the Graphite host along + with the metric, in addition to any default tags passed to Sender() -- + the tags argument here overrides any default tags. + """ + if tags is None: + tags = {} + if timestamp is None: + timestamp = time.time() + message = self.build_message(metric, value, timestamp, tags=tags) + + if self.interval is None: + self.send_socket(message) + else: + try: + self._queue.put_nowait(message) + except queue.Full: + logger.error('queue full when sending %s', message) + + def send_message(self, message): + if self.protocol == 'tcp': + with socket.create_connection((self.host, self.port), self.timeout) as sock: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.sendall(message) + elif self.protocol == 'udp': + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: + sock.sendto(message, (self.host, self.port)) + else: + raise ValueError('"protocol" must be \'tcp\' or \'udp\', not %s', self.protocol) + + def send_socket(self, message): + """Low-level function to send message bytes to this Sender's socket. + You should usually call send() instead of this function (unless you're + subclassing or writing unit tests). + """ + if self.log_sends: + start_time = time.time() + try: + self.send_message(message) + except Exception as error: + if self.raise_send_errors: + raise + logger.error('error sending message %s: %s', message, error) + else: + if self.log_sends: + elapsed_time = time.time() - start_time + logger.info('sent message %s to %s:%s in %s seconds', + message, self.host, self.port, elapsed_time) + + def _thread_loop(self): + """Background thread used when Sender is in asynchronous/interval mode.""" + last_check_time = time.time() + messages = [] + while True: + # Get first message from queue, blocking until the next time we + # should be sending + time_since_last_check = time.time() - last_check_time + time_till_next_check = max(0, self.interval - time_since_last_check) + try: + message = self._queue.get(timeout=time_till_next_check) + except queue.Empty: + pass + else: + if message is None: + # None is the signal to stop this background thread + break + messages.append(message) + + # Get any other messages currently on queue without blocking, + # paying attention to None ("stop thread" signal) + should_stop = False + while True: + try: + message = self._queue.get_nowait() + except queue.Empty: + break + if message is None: + should_stop = True + break + messages.append(message) + if should_stop: + break + + # If it's time to send, send what we've collected + current_time = time.time() + if current_time - last_check_time >= self.interval: + last_check_time = current_time + for i in range(0, len(messages), self.batch_size): + batch = messages[i:i + self.batch_size] + self.send_socket(b''.join(batch)) + messages = [] + + # Send any final messages before exiting thread + for i in range(0, len(messages), self.batch_size): + batch = messages[i:i + self.batch_size] + self.send_socket(b''.join(batch)) + +def init(*args, **kwargs): + """Initialize default Sender instance with given args.""" + global default_sender + default_sender = Sender(*args, **kwargs) + + +def send(*args, **kwargs): + """Send message using default Sender instance.""" + default_sender.send(*args, **kwargs) + + +if __name__ == '__main__': + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument('metric', + help='name of metric to send') + parser.add_argument('value', type=float, + help='numeric value to send') + parser.add_argument('-s', '--server', default='localhost', + help='hostname of Graphite server to send to, default %(default)s') + parser.add_argument('-p', '--port', type=int, default=2003, + help='port to send message to, default %(default)d') + parser.add_argument('-u', '--udp', action='store_true', + help='send via UDP instead of TCP') + parser.add_argument('-t', '--timestamp', type=int, + help='Unix timestamp for message (defaults to current time)') + parser.add_argument('-q', '--quiet', action='store_true', + help="quiet mode (don't log send to stdout)") + args = parser.parse_args() + + if not args.quiet: + logging.basicConfig(level=logging.INFO, format='%(message)s') + + sender = Sender(args.server, port=args.port, log_sends=not args.quiet, + protocol='udp' if args.udp else 'tcp') + sender.send(args.metric, args.value, timestamp=args.timestamp) diff --git a/lib/vsc/utils/netcat.py b/lib/vsc/utils/netcat.py deleted file mode 100644 index 19df0f9..0000000 --- a/lib/vsc/utils/netcat.py +++ /dev/null @@ -1,52 +0,0 @@ -# -# Copyright 2024-2024 Ghent University -# -# This file is part of vsc-utils, -# originally created by the HPC team of Ghent University (http://ugent.be/hpc/en), -# with support of Ghent University (http://ugent.be/hpc), -# the Flemish Supercomputer Centre (VSC) (https://www.vscentrum.be), -# the Flemish Research Foundation (FWO) (http://www.fwo.be/en) -# and the Department of Economy, Science and Innovation (EWI) (http://www.ewi-vlaanderen.be/en). -# -# https://github.com/hpcugent/vsc-utils -# -# vsc-utils is free software: you can redistribute it and/or modify -# it under the terms of the GNU Library General Public License as -# published by the Free Software Foundation, either version 2 of -# the License, or (at your option) any later version. -# -# vsc-utils is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Library General Public License for more details. -# -# You should have received a copy of the GNU Library General Public License -# along with vsc-utils. If not, see . -# -""" -Utilities for sending data like netcat utility. - -@author Wouter Depypere (Ghent University) -""" - -import logging -import socket - -def connect_and_send(host, port, data, timeout=10): - """ - connect to a host with given timeout and write data to it. - data can be: - - bytes - - string - - timeout value is counted for the entire connection so includes - sending of data. - """ - # make sure data is bytes - if isinstance(data, str): - data = data.encode('utf-8') - - logging.debug("connection to %s:%s with a timeout of %s", host, port, timeout) - with socket.create_connection((host, port), timeout=timeout) as sock: - logging.debug("sending data: %s", data) - sock.sendall(data)