From 7e70a1fc26301ecaf1df9823a26ac3034e4df6c5 Mon Sep 17 00:00:00 2001 From: Kevin Carter Date: Sat, 22 May 2021 23:12:17 -0500 Subject: [PATCH] Rev060 (#61) * fix chat typo Signed-off-by: Kevin Carter * fix issue #57 Kevin is old, in py3 we don't need to inherit from Object... Signed-off-by: Kevin Carter * fix issue #56 Signed-off-by: Kevin Carter * fix issue #55 Signed-off-by: Kevin Carter --- README.md | 2 +- components/container_config_data.py | 8 ++---- directord/__init__.py | 20 ++++++------- directord/client.py | 27 +++++++++--------- directord/components/__init__.py | 10 +++---- directord/components/builtin_pod.py | 28 +++++++++--------- directord/components/builtin_query.py | 1 + directord/datastore/__init__.py | 10 +++---- directord/datastore/redis.py | 11 ++++--- directord/logger.py | 6 ++-- directord/main.py | 2 +- directord/mixin.py | 13 +++++---- directord/server.py | 41 +++++++++------------------ directord/tests/__init__.py | 12 ++++---- directord/tests/test_user.py | 4 ++- directord/ui.py | 2 +- directord/user.py | 4 +-- directord/utils.py | 4 +-- docs/index.md | 2 +- 19 files changed, 96 insertions(+), 111 deletions(-) diff --git a/README.md b/README.md index 39ba67d2..0f0bf6aa 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ diagrams, installation, usage, and more can all be ### Have Questions? -Join us on [`libra chat`](https://libera.chat/guides/connect) at +Join us on [`libera.chat`](https://libera.chat/guides/connect) at **#directord**. The community is just getting started: folks are here to help, answer questions, and support one another. diff --git a/components/container_config_data.py b/components/container_config_data.py index a0d752e6..843fb109 100644 --- a/components/container_config_data.py +++ b/components/container_config_data.py @@ -118,7 +118,7 @@ def client(self, conn, cache, job): for mc in matched_configs: name = os.path.splitext(os.path.basename(mc))[0] config = json.loads(self._slurp(mc)) - self.log.debug("Config found for {}: {}".format(name, config)) + self.log.debug("Config found for %s: %s", name, config) config_dict.update({name: config}) # Merge the config dict with given overrides @@ -131,9 +131,7 @@ def client(self, conn, cache, job): tag="component", ) else: - self.log.debug( - "{} does not exists, skipping step".format(config_path) - ) + self.log.debug("%s does not exists, skipping step", config_path) configs = dict() return configs, None, True @@ -148,5 +146,5 @@ def _slurp(self, path): f = open(path, "r") return f.read() else: - self.log.warn("{} was not found.".format(path)) + self.log.warn("%s was not found.", path) return "" diff --git a/directord/__init__.py b/directord/__init__.py index 9b5ac072..23e90507 100644 --- a/directord/__init__.py +++ b/directord/__init__.py @@ -47,8 +47,8 @@ def send_data(socket_path, data): chunk = s.recv(1024) if not chunk: break - else: - fragments.append(chunk) + + fragments.append(chunk) return b"".join(fragments) @@ -118,7 +118,7 @@ def component_import(component, job_id=None): return True, transfer, component_obj.Component() -class Processor(object): +class Processor: """Processing class, provides queing and threading utilities. This is a base class. @@ -170,9 +170,7 @@ def read_in_chunks(self, file_object, chunk_size=10240): data = file_object.read(chunk_size) if not data: break - self.log.debug( - "Transimitting a {size} Chunk ".format(size=len(data)) - ) + self.log.debug("Transimitting a %s Chunk ", len(data)) yield data @contextlib.contextmanager @@ -194,9 +192,9 @@ def timeout(self, time, job_id, reraise=False): yield except TimeoutError: self.log.warning( - "Timeout encountered after {} seconds running {}.".format( - time, job_id - ) + "Timeout encountered after %s seconds running %s.", + time, + job_id, ) if reraise: raise TimeoutError @@ -210,7 +208,7 @@ def raise_timeout(self, *args, **kwargs): raise TimeoutError -class UNIXSocketConnect(object): +class UNIXSocketConnect: """Context manager for connecting to a UNIX socket.""" def __init__(self, sock_path): @@ -238,7 +236,7 @@ def __exit__(self, *args, **kwargs): self.sock.close() -class DirectordConnect(object): +class DirectordConnect: """Library context manager providing easy access into Directord.""" def __init__(self, debug=False, socket_path="/var/run/directord.sock"): diff --git a/directord/client.py b/directord/client.py index 9dec9efb..4af76268 100644 --- a/directord/client.py +++ b/directord/client.py @@ -169,9 +169,8 @@ def run_heartbeat(self, sentinel=False): if time.time() > heartbeat_at and heartbeat_misses > 5: self.log.error("Heartbeat failure, can't reach server") self.log.warning( - "Reconnecting in {}s...".format( - self.heartbeat_failure_interval - ) + "Reconnecting in [ %s ]...", + self.heartbeat_failure_interval, ) time.sleep(self.heartbeat_failure_interval) @@ -247,12 +246,12 @@ def _job_executor( if cached and component.cacheable: # TODO(cloudnull): Figure out how to skip cache when file # transfering. - self.log.info("Cache hit on {}, task skipped.".format(job_sha1)) + self.log.info("Cache hit on %s, task skipped.", job_sha1) conn.info = b"job skipped" conn.job_state = self.job_end return None, None, None - else: - return component.client(**component_kwargs) + + return component.client(**component_kwargs) def run_job(self, sentinel=False): """Job entry point. @@ -330,7 +329,7 @@ def run_job(self, sentinel=False): job = json.loads(data.decode()) job_id = job.get("task", utils.get_uuid()) job_sha1 = job.get("task_sha1sum", utils.object_sha1(job)) - self.log.info("Job received {}".format(job_id)) + self.log.info("Job received %s", job_id) self.socket_multipart_send( zsocket=self.bind_job, msg_id=job_id.encode(), @@ -356,9 +355,9 @@ def run_job(self, sentinel=False): ) as c: if cache.get(job_parent_id) is False: self.log.error( - "Parent failure {} skipping {}".format( - job_parent_id, job_id - ) + "Parent failure %s skipping %s", + job_parent_id, + job_id, ) status = ( "Job [ {} ] was not allowed to run because" @@ -372,8 +371,8 @@ def run_job(self, sentinel=False): if sentinel: break - else: - continue + + continue with self.timeout( time=job.get("timeout", 600), job_id=job_id @@ -410,7 +409,7 @@ def run_job(self, sentinel=False): if outcome is False: state = c.job_state = self.job_failed - self.log.error("Job failed {}".format(job_id)) + self.log.error("Job failed %s", job_id) if job_parent_id: base_component.set_cache( cache=cache, @@ -420,7 +419,7 @@ def run_job(self, sentinel=False): ) elif outcome is True: state = c.job_state = self.job_end - self.log.info("Job complete {}".format(job_id)) + self.log.info("Job complete %s", job_id) if job_parent_id: base_component.set_cache( cache=cache, diff --git a/directord/components/__init__.py b/directord/components/__init__.py index ecc7e8eb..bea4fe49 100644 --- a/directord/components/__init__.py +++ b/directord/components/__init__.py @@ -23,7 +23,7 @@ from directord import utils -class ComponentBase(object): +class ComponentBase: """Component base class.""" def __init__(self, desc=None): @@ -103,8 +103,8 @@ def run_command( output, error = process.communicate() if process.returncode not in return_codes: return output, error, False - else: - return output, error, True + + return output, error, True def options_converter(self, documentation): """Convert an options YAML to Arguments. @@ -298,8 +298,8 @@ def blueprinter(self, content, values): return else: return rendered_content - else: - return content + + return content def parser_error(self): """Return parser help information.""" diff --git a/directord/components/builtin_pod.py b/directord/components/builtin_pod.py index 49d783d2..86940d02 100644 --- a/directord/components/builtin_pod.py +++ b/directord/components/builtin_pod.py @@ -184,23 +184,23 @@ def client(self, cache, conn, job): data = json.dumps(data) if status: return data, None, status - else: - return None, data, status - else: - return ( - None, - ( - "The action [ {action} ] failed to return" - " a function".format(action=job["pod_action"]) - ), - False, - ) + + return None, data, status + + return ( + None, + ( + "The action [ {action} ] failed to return" + " a function".format(action=job["pod_action"]) + ), + False, + ) except Exception as e: self.log.critical(str(e)) return None, traceback.format_exc(), False -class PodmanConnect(object): +class PodmanConnect: """Connect to the podman unix socket.""" def __init__(self, socket="/var/run/podman/podman.sock"): @@ -351,8 +351,8 @@ def play(self, pod_file, tls_verify=True): params={"tlsVerify": tls_verify, "start": True}, ) return resp.ok, self._decode(resp.content) - else: - return False, "Pod YAML did not exist" + + return False, "Pod YAML did not exist" def exec_run(self, name, command, env, privileged=False): """Create an exec container and run it. diff --git a/directord/components/builtin_query.py b/directord/components/builtin_query.py index 09cd8d0b..d495f8eb 100644 --- a/directord/components/builtin_query.py +++ b/directord/components/builtin_query.py @@ -75,4 +75,5 @@ def client(self, conn, cache, job): query = json.dumps(args.get(job["query"])) else: query = None + return query, None, True diff --git a/directord/datastore/__init__.py b/directord/datastore/__init__.py index 8145dfeb..6adaa0e7 100644 --- a/directord/datastore/__init__.py +++ b/directord/datastore/__init__.py @@ -41,8 +41,8 @@ def prune(self): self.pop(key) except (KeyError, TypeError): pass - else: - return len(self) + + return len(self) def set(self, key, value): """Set key and value if key doesn't already exist. @@ -56,9 +56,9 @@ def set(self, key, value): if key in self: return self[key] - else: - super().__setitem__(key, value) - return self[key] + + super().__setitem__(key, value) + return self[key] def __repr__(self): return f"{type(self).__name__}({super().__repr__()})" diff --git a/directord/datastore/redis.py b/directord/datastore/redis.py index ae47d0dc..735b8023 100644 --- a/directord/datastore/redis.py +++ b/directord/datastore/redis.py @@ -18,7 +18,7 @@ import redis -class BaseDocument(object): +class BaseDocument: """Create a document store object.""" def __init__(self, url, database=0): @@ -120,9 +120,8 @@ def prune(self): self.datastore.delete(item) except (KeyError, TypeError): pass - else: - return len(self.datastore.keys("*")) + return len(self.datastore.keys("*")) def get(self, key): """Return the value of a given key. @@ -147,6 +146,6 @@ def set(self, key, value): item = self.__getitem__(key) if item: return item - else: - self.__setitem__(key, value) - return value + + self.__setitem__(key, value) + return value diff --git a/directord/logger.py b/directord/logger.py index f22a83f0..89443f35 100644 --- a/directord/logger.py +++ b/directord/logger.py @@ -40,7 +40,7 @@ def getLogger(name, debug_logging=False): ) -class LogSetup(object): +class LogSetup: """Logging Class.""" def __init__(self, max_size=500, max_backup=5, debug_logging=False): @@ -159,5 +159,5 @@ def return_logfile(filename, log_dir="/var/log"): return os.path.join(log_dir, filename) elif log_dir_stat.st_gid == user: return os.path.join(log_dir, filename) - else: - return os.path.join(home, filename) + + return os.path.join(home, filename) diff --git a/directord/main.py b/directord/main.py index 8a189176..4ab41e74 100644 --- a/directord/main.py +++ b/directord/main.py @@ -337,7 +337,7 @@ def _args(exec_args=None): return args, parser -class SystemdInstall(object): +class SystemdInstall: """Simple system service unit creation class.""" def __init__(self): diff --git a/directord/mixin.py b/directord/mixin.py index 95c6b0c1..8f442b11 100644 --- a/directord/mixin.py +++ b/directord/mixin.py @@ -26,7 +26,7 @@ from directord import utils -class Mixin(object): +class Mixin: """Mixin class.""" def __init__(self, args): @@ -368,8 +368,8 @@ def _computed_totals(item, value_heading, value): seen_computed_key.append(key) tabulated_data.append(arranged_data) - else: - return tabulated_data, found_headings, computed_values + + return tabulated_data, found_headings, computed_values @staticmethod def bootstrap_catalog_entry(entry): @@ -413,8 +413,8 @@ def bootstrap_localfile_padding(localfile): else: base_path = os.path.join(sys.prefix, "share/directord/tools") return os.path.join(base_path, localfile) - else: - return localfile + + return localfile def bootstrap_flatten_jobs(self, jobs, return_jobs=None): """Return a flattened list of jobs. @@ -439,6 +439,7 @@ def bootstrap_flatten_jobs(self, jobs, return_jobs=None): ) else: return_jobs.append(job) + return return_jobs def bootstrap_run(self, job_def, catalog): @@ -463,7 +464,7 @@ def bootstrap_run(self, job_def, catalog): self.log.info("Running bootstrap for %s", job_def["host"]) for job in self.bootstrap_flatten_jobs(jobs=job_def["jobs"]): key, value = next(iter(job.items())) - self.log.debug("Executing: {} {}".format(key, value)) + self.log.debug("Executing: %s %s", key, value) with utils.ParamikoConnect( host=job_def["host"], username=job_def["username"], diff --git a/directord/server.py b/directord/server.py index 33c6d302..49782117 100644 --- a/directord/server.py +++ b/directord/server.py @@ -133,9 +133,8 @@ def run_heartbeat(self, sentinel=False): ) = self.socket_multipart_recv(zsocket=self.bind_heatbeat) if control in [self.heartbeat_ready, self.heartbeat_notice]: self.log.debug( - "Received Heartbeat from [ {} ], client online".format( - identity.decode() - ) + "Received Heartbeat from [ %s ], client online", + identity.decode(), ) expire = self.get_expiry worker_metadata = {"time": expire} @@ -155,14 +154,14 @@ def run_heartbeat(self, sentinel=False): info=struct.pack(" idle_time: for worker in list(self.workers.keys()): self.log.warning( - "Sending idle worker [ {} ] a heartbeat".format(worker) + "Sending idle worker [ %s ] a heartbeat", worker ) self.socket_multipart_send( zsocket=self.bind_heatbeat, @@ -172,16 +171,10 @@ def run_heartbeat(self, sentinel=False): info=struct.pack(" idle_time + 3: - self.log.warning( - "Removing dead worker {}".format(worker) - ) + self.log.warning("Removing dead worker %s", worker) self.workers.pop(worker) else: - self.log.debug( - "Items after prune {items}".format( - items=self.workers.prune() - ) - ) + self.log.debug("Items after prune %s", self.workers.prune()) if sentinel: break @@ -217,8 +210,8 @@ def _set_job_status( def return_exec_time(started): if started: return time.time() - started - else: - return 0 + + return 0 job_metadata = self.return_jobs.get(job_id) if not job_metadata: @@ -240,15 +233,13 @@ def return_exec_time(started): if job_status == self.job_ack: if not _createtime: job_metadata["_createtime"] = time.time() - self.log.debug("{} received job {}".format(identity, job_id)) + self.log.debug("%s received job %s", identity, job_id) elif job_status == self.job_processing: if not _starttime: job_metadata["_starttime"] = time.time() - self.log.debug("{} is processing {}".format(identity, job_id)) + self.log.debug("%s is processing %s", identity, job_id) elif job_status in [self.job_end, self.nullbyte]: - self.log.debug( - "{} finished processing {}".format(identity, job_id) - ) + self.log.debug("%s finished processing %s", identity, job_id) if "SUCCESS" in job_metadata: job_metadata["SUCCESS"].append(identity) else: @@ -371,9 +362,7 @@ def run_job(self): targets.append(job_target) else: self.log.critical( - "Target {} is in an unknown state.".format( - job_target - ) + "Target %s is in an unknown state.", job_target ) return 512, time.time() else: @@ -434,7 +423,7 @@ def run_job(self): data=json.dumps(job_item).encode(), ) - self.log.debug("Sent job {} to {}".format(task, identity)) + self.log.debug("Sent job %s to %s", task, identity) else: self.return_jobs[task] = job_info @@ -682,9 +671,7 @@ def run_socket_server(self, sentinel=False): str(e), ) else: - self.log.debug( - "Data sent to queue, {}".format(json_data) - ) + self.log.debug("Data sent to queue, %s", json_data) finally: self.job_queue.put(json_data) if sentinel: diff --git a/directord/tests/__init__.py b/directord/tests/__init__.py index 2e112dcb..3df41668 100644 --- a/directord/tests/__init__.py +++ b/directord/tests/__init__.py @@ -56,7 +56,7 @@ """ -class FakePopen(object): +class FakePopen: """Fake Shell Commands.""" def __init__(self, return_code=0, *args, **kwargs): @@ -67,7 +67,7 @@ def communicate(): return "stdout", "stderr" -class FakeStat(object): +class FakeStat: def __init__(self, uid, gid): self.st_uid = uid self.st_gid = gid @@ -76,7 +76,7 @@ def __init__(self, uid, gid): self.st_mode = 0 -class FakeArgs(object): +class FakeArgs: config_file = None datastore = None debug = False @@ -118,7 +118,7 @@ def __init__(self, rc=0): self.channel.recv_exit_status.return_value = rc -class FakeCache(object): +class FakeCache: def __init__(self): self.cache = {"args": {"test": 1}} @@ -129,8 +129,8 @@ def pop(self, key, **kwargs): if key not in self.cache: if "default" in kwargs: return kwargs["default"] - else: - return self.cache.pop(key) + + return self.cache.pop(key) def set(self, key, value, **kwargs): self.cache[key] = value diff --git a/directord/tests/test_user.py b/directord/tests/test_user.py index 3fb5b24c..9cc79596 100644 --- a/directord/tests/test_user.py +++ b/directord/tests/test_user.py @@ -187,7 +187,9 @@ def test_poll_job_timeout( self.manage.poll_job("test-id") mock_log_warn.assert_called_once_with( unittest.mock.ANY, - "Timeout encountered after 1 seconds running test-id.", + "Timeout encountered after %s seconds running %s.", + 1, + "test-id", ) mock_log_error.assert_called_once_with( unittest.mock.ANY, diff --git a/directord/ui.py b/directord/ui.py index 9d9062b3..1cc90bfb 100644 --- a/directord/ui.py +++ b/directord/ui.py @@ -46,7 +46,7 @@ def get(): return response -class UI(object): +class UI: """The Directord UI execution class.""" def __init__(self, args, jobs, nodes): diff --git a/directord/user.py b/directord/user.py index c324809d..fd96c337 100644 --- a/directord/user.py +++ b/directord/user.py @@ -147,8 +147,8 @@ def poll_job(self, job_id, miss=0): return True, "Job Success: {}".format(job_id) elif len(data_return.get("FAILED", list())) > 0: return None, "Job Degrated: {}".format(job_id) - else: - return None, "Job Skipped: {}".format(job_id) + + return None, "Job Skipped: {}".format(job_id) else: miss += 1 if miss > 5: diff --git a/directord/utils.py b/directord/utils.py index 89b40bbe..cdc7e59a 100644 --- a/directord/utils.py +++ b/directord/utils.py @@ -66,7 +66,7 @@ def merge_dict(base, new): return base -class ClientStatus(object): +class ClientStatus: """Context manager for transmitting client status.""" def __init__(self, socket, job_id, command, ctx): @@ -112,7 +112,7 @@ def __exit__(self, *args, **kwargs): ) -class ParamikoConnect(object): +class ParamikoConnect: """Context manager to remotly connect to servers using paramiko. The connection manager requires an SSH key to be defined, and exist, diff --git a/docs/index.md b/docs/index.md index 524ce2c0..1dbe8659 100644 --- a/docs/index.md +++ b/docs/index.md @@ -56,6 +56,6 @@ Documentation needed to be successful with Directord. ### Have Questions? -Join us on [`libra chat`](https://libera.chat/guides/connect) at +Join us on [`libera.chat`](https://libera.chat/guides/connect) at **#directord**. The community is just getting started: folks are here to help, answer questions, and support one another.