From 20b9c6b266093dfff83f7a5f403cba8bbea7662c Mon Sep 17 00:00:00 2001 From: Michael Hall Date: Thu, 27 Jul 2023 15:11:01 +1000 Subject: [PATCH 1/2] apply patch from #96 --- .../slurm-sidecar.py | 48 ++++++++++++------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/{{cookiecutter.profile_name}}/slurm-sidecar.py b/{{cookiecutter.profile_name}}/slurm-sidecar.py index af18c12..a3daaaa 100755 --- a/{{cookiecutter.profile_name}}/slurm-sidecar.py +++ b/{{cookiecutter.profile_name}}/slurm-sidecar.py @@ -1,9 +1,9 @@ #!/usr/bin/env python3 """Run a Snakemake v7+ sidecar process for Slurm -This sidecar process will poll ``squeue --user [user] --format='%i,%T'`` -every 60 seconds by default (use environment variable -``SNAKEMAKE_SLURM_SQUEUE_WAIT`` for adjusting this). +This sidecar process will poll ``squeue --me --format='%i,%T'`` every 60 +seconds by default (use environment variable ``SNAKEMAKE_SLURM_SQUEUE_WAIT`` +for adjusting this). Note that you have to adjust the value to fit to your ``MinJobAge`` Slurm configuration. Jobs remain at least ``MinJobAge`` seconds known to the @@ -63,7 +63,7 @@ def __init__( squeue_cmd, squeue_timeout=2, sleep_time=0.01, - max_tries=3, + max_tries=10, *args, **kwargs ): @@ -103,7 +103,10 @@ def get_state(self, jobid): """Return the job state for the given jobid.""" jobid = str(jobid) if jobid not in self.states: - self.states[jobid] = self._get_state_sacct(jobid) + try: + self.states[jobid] = self._get_state_sacct(jobid) + except: + return "__not_seen_yet__" return self.states.get(jobid, "__not_seen_yet__") def register_job(self, jobid): @@ -122,17 +125,22 @@ def _get_state_sacct(self, jobid): try: logger.debug("Calling %s (try %d)", cmd, try_num) output = subprocess.check_output(cmd, timeout=self.squeue_timeout, text=True) - break except subprocess.TimeoutExpired as e: - logger.debug("Call to %s timed out (try %d of %d)", cmd, try_num, self.max_tries) + logger.warning("Call to %s timed out (try %d of %d)", cmd, try_num, self.max_tries) + continue except subprocess.CalledProcessError as e: - logger.debug("Call to %s failed (try %d of %d)", cmd, try_num, self.max_tries) - if try_num >= self.max_tries: - raise Exception("Problem with call to %s" % cmd) - else: - parsed = {x.split("|")[0]: x.split("|")[1] for x in output.strip().split("\n")} - logger.debug("Returning state of %s as %s", jobid, parsed[jobid]) - return parsed[jobid] + logger.warning("Call to %s failed (try %d of %d)", cmd, try_num, self.max_tries) + continue + try: + parsed = {x.split("|")[0]: x.split("|")[1] for x in output.strip().split("\n")} + logger.debug("Returning state of %s as %s", jobid, parsed[jobid]) + return parsed[jobid] + except IndexError: + logger.warning("Could not parse %s (try %d of %d)", repr(output), try_num, self.max_tries) + secs = try_num / 2.0 + loger.info("Sleeping %f seconds", secs) + time.sleep(secs) + raise Exception("Problem with call to %s" % cmd) def stop(self): """Flag thread to stop execution""" @@ -143,7 +151,7 @@ def _call_squeue(self, allow_failure=True): """Run the call to ``squeue``""" cluster = CookieCutter.get_cluster_option() try_num = 0 - cmd = [SQUEUE_CMD, "--user={}".format(os.environ.get("USER")), "--format=%i,%T", "--state=all"] + cmd = [SQUEUE_CMD, "--me", "--format=%i,%T", "--state=all"] if cluster: cmd.append(cluster) while try_num < self.max_tries: @@ -209,6 +217,12 @@ def do_GET(self): return # Otherwise, query job ID status job_id = self.path[len("/job/status/") :] + try: + logger.debug("CLEMENS:") + logger.debug(job_id) + job_id=job_id.split("%20")[3] + except IndexError: + pass logger.debug("Querying for job ID %s" % repr(job_id)) status = self.server.poll_thread.get_state(job_id) logger.debug("Status: %s" % status) @@ -286,8 +300,8 @@ def main(): poll_thread = PollSqueueThread(SQUEUE_WAIT, SQUEUE_CMD, name="poll-squeue") poll_thread.start() - # Initialize HTTP server that makes available the output of ``squeue --user [user]`` - # in a controlled fashion. + # Initialize HTTP server that makes available the output of ``squeue --me`` in a + # controlled fashion. http_server = JobStateHttpServer(poll_thread) http_thread = threading.Thread(name="http-server", target=http_server.serve_forever) http_thread.start() From 20e9fc88d34cb71507497b11ee20ffa927b3dc5f Mon Sep 17 00:00:00 2001 From: Michael Hall Date: Tue, 1 Aug 2023 13:42:48 +1000 Subject: [PATCH 2/2] undo some patch changes that were old --- {{cookiecutter.profile_name}}/slurm-sidecar.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/{{cookiecutter.profile_name}}/slurm-sidecar.py b/{{cookiecutter.profile_name}}/slurm-sidecar.py index a3daaaa..e79f5da 100755 --- a/{{cookiecutter.profile_name}}/slurm-sidecar.py +++ b/{{cookiecutter.profile_name}}/slurm-sidecar.py @@ -1,9 +1,9 @@ #!/usr/bin/env python3 """Run a Snakemake v7+ sidecar process for Slurm -This sidecar process will poll ``squeue --me --format='%i,%T'`` every 60 -seconds by default (use environment variable ``SNAKEMAKE_SLURM_SQUEUE_WAIT`` -for adjusting this). +This sidecar process will poll ``squeue --user [user] --format='%i,%T'`` +every 60 seconds by default (use environment variable +``SNAKEMAKE_SLURM_SQUEUE_WAIT`` for adjusting this). Note that you have to adjust the value to fit to your ``MinJobAge`` Slurm configuration. Jobs remain at least ``MinJobAge`` seconds known to the @@ -63,7 +63,7 @@ def __init__( squeue_cmd, squeue_timeout=2, sleep_time=0.01, - max_tries=10, + max_tries=3, *args, **kwargs ): @@ -151,7 +151,7 @@ def _call_squeue(self, allow_failure=True): """Run the call to ``squeue``""" cluster = CookieCutter.get_cluster_option() try_num = 0 - cmd = [SQUEUE_CMD, "--me", "--format=%i,%T", "--state=all"] + cmd = [SQUEUE_CMD, "--user={}".format(os.environ.get("USER")), "--format=%i,%T", "--state=all"] if cluster: cmd.append(cluster) while try_num < self.max_tries: @@ -218,8 +218,6 @@ def do_GET(self): # Otherwise, query job ID status job_id = self.path[len("/job/status/") :] try: - logger.debug("CLEMENS:") - logger.debug(job_id) job_id=job_id.split("%20")[3] except IndexError: pass @@ -300,8 +298,8 @@ def main(): poll_thread = PollSqueueThread(SQUEUE_WAIT, SQUEUE_CMD, name="poll-squeue") poll_thread.start() - # Initialize HTTP server that makes available the output of ``squeue --me`` in a - # controlled fashion. + # Initialize HTTP server that makes available the output of ``squeue --user [user]`` + # in a controlled fashion. http_server = JobStateHttpServer(poll_thread) http_thread = threading.Thread(name="http-server", target=http_server.serve_forever) http_thread.start()