Fix sidecar index out of range issues #115

Merged: 2 commits, Aug 1, 2023
Changes from 1 commit
48 changes: 31 additions & 17 deletions {{cookiecutter.profile_name}}/slurm-sidecar.py
@@ -1,9 +1,9 @@
#!/usr/bin/env python3
"""Run a Snakemake v7+ sidecar process for Slurm

This sidecar process will poll ``squeue --user [user] --format='%i,%T'``
every 60 seconds by default (use environment variable
``SNAKEMAKE_SLURM_SQUEUE_WAIT`` for adjusting this).
This sidecar process will poll ``squeue --me --format='%i,%T'`` every 60
seconds by default (use environment variable ``SNAKEMAKE_SLURM_SQUEUE_WAIT``
for adjusting this).

Note that you have to adjust the value to fit to your ``MinJobAge`` Slurm
configuration. Jobs remain at least ``MinJobAge`` seconds known to the
@@ -63,7 +63,7 @@ def __init__(
squeue_cmd,
squeue_timeout=2,
sleep_time=0.01,
max_tries=3,
max_tries=10,
*args,
**kwargs
):
@@ -103,7 +103,10 @@ def get_state(self, jobid):
"""Return the job state for the given jobid."""
jobid = str(jobid)
if jobid not in self.states:
self.states[jobid] = self._get_state_sacct(jobid)
try:
self.states[jobid] = self._get_state_sacct(jobid)
except:
return "__not_seen_yet__"
return self.states.get(jobid, "__not_seen_yet__")

def register_job(self, jobid):
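This hunk makes ``get_state()`` swallow ``sacct`` failures instead of letting them kill the request. A minimal, self-contained sketch of that fallback pattern, with ``fetch_state`` standing in for ``_get_state_sacct`` (names and sample values below are illustrative, not taken from the PR):

```python
# Sketch of the cache-with-sentinel fallback added in this hunk.
# fetch_state stands in for _get_state_sacct and may raise when sacct fails.
def get_state(states, jobid, fetch_state):
    jobid = str(jobid)
    if jobid not in states:
        try:
            states[jobid] = fetch_state(jobid)
        except Exception:
            # sacct failed; report the job as unknown instead of crashing the handler
            return "__not_seen_yet__"
    return states.get(jobid, "__not_seen_yet__")


def failing_fetch(jobid):
    raise RuntimeError("sacct unavailable")


states = {}
print(get_state(states, 12345, failing_fetch))        # -> __not_seen_yet__
print(get_state(states, 12345, lambda j: "RUNNING"))  # -> RUNNING on a later call
```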
@@ -122,17 +125,22 @@ def _get_state_sacct(self, jobid):
try:
logger.debug("Calling %s (try %d)", cmd, try_num)
output = subprocess.check_output(cmd, timeout=self.squeue_timeout, text=True)
break
except subprocess.TimeoutExpired as e:
logger.debug("Call to %s timed out (try %d of %d)", cmd, try_num, self.max_tries)
logger.warning("Call to %s timed out (try %d of %d)", cmd, try_num, self.max_tries)
continue
except subprocess.CalledProcessError as e:
logger.debug("Call to %s failed (try %d of %d)", cmd, try_num, self.max_tries)
if try_num >= self.max_tries:
raise Exception("Problem with call to %s" % cmd)
else:
parsed = {x.split("|")[0]: x.split("|")[1] for x in output.strip().split("\n")}
logger.debug("Returning state of %s as %s", jobid, parsed[jobid])
return parsed[jobid]
logger.warning("Call to %s failed (try %d of %d)", cmd, try_num, self.max_tries)
continue
try:
parsed = {x.split("|")[0]: x.split("|")[1] for x in output.strip().split("\n")}
logger.debug("Returning state of %s as %s", jobid, parsed[jobid])
return parsed[jobid]
except IndexError:
logger.warning("Could not parse %s (try %d of %d)", repr(output), try_num, self.max_tries)
secs = try_num / 2.0
logger.info("Sleeping %f seconds", secs)
time.sleep(secs)
raise Exception("Problem with call to %s" % cmd)

def stop(self):
"""Flag thread to stop execution"""
@@ -143,7 +151,7 @@ def _call_squeue(self, allow_failure=True):
"""Run the call to ``squeue``"""
cluster = CookieCutter.get_cluster_option()
try_num = 0
cmd = [SQUEUE_CMD, "--user={}".format(os.environ.get("USER")), "--format=%i,%T", "--state=all"]
cmd = [SQUEUE_CMD, "--me", "--format=%i,%T", "--state=all"]
if cluster:
cmd.append(cluster)
while try_num < self.max_tries:
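A plausible motivation for switching from ``--user=$USER`` to ``--me`` (the PR itself does not state one): the old command interpolated ``os.environ.get("USER")``, which becomes the literal string ``None`` when ``USER`` is unset in the sidecar's environment, while ``--me`` lets ``squeue`` resolve the calling user on its own. A small sketch of the difference:

```python
import os

# Old style: depends on the USER environment variable being set.
user = os.environ.get("USER")        # None in a stripped-down environment
old_flag = "--user={}".format(user)  # "--user=None" if USER is unset

# New style: squeue figures out the calling user itself.
new_cmd = ["squeue", "--me", "--format=%i,%T", "--state=all"]

print(old_flag)
print(" ".join(new_cmd))
```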
@@ -209,6 +217,12 @@ def do_GET(self):
return
# Otherwise, query job ID status
job_id = self.path[len("/job/status/") :]
try:
logger.debug("CLEMENS:")
logger.debug(job_id)
job_id=job_id.split("%20")[3]
except IndexError:
pass
logger.debug("Querying for job ID %s" % repr(job_id))
status = self.server.poll_thread.get_state(job_id)
logger.debug("Status: %s" % status)
@@ -286,8 +300,8 @@ def main():
poll_thread = PollSqueueThread(SQUEUE_WAIT, SQUEUE_CMD, name="poll-squeue")
poll_thread.start()

# Initialize HTTP server that makes available the output of ``squeue --user [user]``
# in a controlled fashion.
# Initialize HTTP server that makes available the output of ``squeue --me`` in a
# controlled fashion.
http_server = JobStateHttpServer(poll_thread)
http_thread = threading.Thread(name="http-server", target=http_server.serve_forever)
http_thread.start()
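For orientation only (this part is unchanged by the PR apart from the comment): once the poll thread and HTTP server are running, the status endpoint handled by ``do_GET()`` above can be queried by hand. The port below is a placeholder, and any authentication the sidecar expects is not shown in this diff:

```python
# Hypothetical manual check of the sidecar's status endpoint; port is a placeholder.
from urllib.request import urlopen

with urlopen("http://localhost:8000/job/status/12345") as resp:
    print(resp.read().decode())  # whatever state the poll thread reports
```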