From f8fc08b121ebae976bbb503d61c872ed68c50f9e Mon Sep 17 00:00:00 2001 From: Kamil Cukrowski Date: Mon, 16 Oct 2023 04:42:35 +0200 Subject: [PATCH] WIP --- pyproject.toml | 5 + src/nomad_tools/common.py | 39 +- src/nomad_tools/nomad_port.py | 92 ++ src/nomad_tools/nomad_watch.py | 1448 ++++++++--------- src/nomad_tools/nomaddbjob.py | 253 +++ src/nomad_tools/nomadlib/connection.py | 1 + src/nomad_tools/nomadlib/datadict.py | 10 +- src/nomad_tools/nomadlib/types.py | 69 +- src/nomad_tools/nomadt.py | 117 ++ tests/integration/test_nomad_cp.py | 2 +- tests/integration/test_nomad_watch.py | 12 +- tests/integration/test_nomad_watch2.py | 16 + .../test-echo10sleep05.nomad.hcl} | 4 + tests/jobs/test-listen.nomad.hcl | 58 + tests/jobs/test-maintask.nomad.hcl | 60 + tests/testlib.py | 51 +- tests/unit/test_datadict.py | 19 +- 17 files changed, 1500 insertions(+), 756 deletions(-) create mode 100644 src/nomad_tools/nomad_port.py create mode 100644 src/nomad_tools/nomaddbjob.py create mode 100644 src/nomad_tools/nomadt.py create mode 100644 tests/integration/test_nomad_watch2.py rename tests/{echo10sleep05.nomad.hcl => jobs/test-echo10sleep05.nomad.hcl} (86%) create mode 100644 tests/jobs/test-listen.nomad.hcl create mode 100644 tests/jobs/test-maintask.nomad.hcl diff --git a/pyproject.toml b/pyproject.toml index c9a06b4..77fc53b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,6 +28,8 @@ nomad-watch = "nomad_tools:nomad_watch.cli" nomad-cp = "nomad_tools:nomad_cp.cli" nomad-vardir = "nomad_tools:nomad_vardir.cli" nomad-gitlab-runner = "nomad_tools:nomad_gitlab_runner.main" +nomad-port = "nomad_tools:nomad_port.cli" +#nomadt = "nomad_tools:nomadt.cli" [project.optional-dependencies] test = [ @@ -46,6 +48,9 @@ log_cli_date_format = "%H:%M:%S" testpaths = [ "tests/unit", ] +filterwarnings = [ + "ignore::DeprecationWarning", +] [tool.pyright] reportUnnecessaryComparison = "error" diff --git a/src/nomad_tools/common.py b/src/nomad_tools/common.py index d72b966..17562fa 
100644 --- a/src/nomad_tools/common.py +++ b/src/nomad_tools/common.py @@ -4,9 +4,9 @@ import click import pkg_resources -from .nomadlib import Nomadlib +from . import nomadlib -mynomad = Nomadlib() +mynomad = nomadlib.Nomadlib() def nomad_find_namespace(prefix: str): @@ -90,14 +90,35 @@ def _print_version(ctx, param, value): ctx.exit() +def version_option(): + return click.option( + "--version", + is_flag=True, + callback=_print_version, + expose_value=False, + is_eager=True, + help="Print program version then exit.", + ) + + def common_options(): return composed( click.help_option("-h", "--help"), - click.option( - "--version", - is_flag=True, - callback=_print_version, - expose_value=False, - is_eager=True, - ), + version_option(), + ) + + +class ArgumentAlloc(nomadlib.Alloc): + def __init__(self, allocid: str): + allocs = mynomad.get(f"allocations", params={"prefix": allocid}) + assert len(allocs) > 0, f"Allocation with id {allocid} not found" + assert len(allocs) < 2, f"Multiple allocations found starting with id {allocid}" + super().__init__(allocs[0]) + + +def argument_alloc(): + return click.argument( + "allocid", + shell_complete=completor(lambda: (x["ID"] for x in mynomad.get("allocations"))), + type=ArgumentAlloc, ) diff --git a/src/nomad_tools/nomad_port.py b/src/nomad_tools/nomad_port.py new file mode 100644 index 0000000..54054de --- /dev/null +++ b/src/nomad_tools/nomad_port.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: GPL-3.0-or-later + +import argparse +import logging +from typing import List + +import click + +from . 
import nomadlib +from .common import argument_alloc, common_options, complete_job, mynomad + +log = logging.getLogger(__name__) + + +def gen_alloc(alloc: nomadlib.Alloc) -> List[str]: + out = [] + log.debug(f"Parsing {alloc.ID}") + for i in alloc["AllocatedResources"]["Shared"]["Ports"]: + params = dict( + **{ + k: v + for k, v in alloc.items() + if isinstance(v, str) and isinstance(k, str) + }, + label=i["Label"], + host=i["HostIP"], + port=i["Value"], + ) + log.debug(f"Found") + out += [args.format % params] + return out + + +def output(out: List[str]): + if out: + print(*out, sep=args.separator) + + +@click.group() +@click.option( + "-f", "--format", default="%(Name)s %(label)s %(host)s:%(port)s", show_default=True +) +@click.option("-s", "--separator", default="\n", show_default=True) +@click.option("-v", "--verbose", count=True, help="Be more verbose.") +@common_options() +def cli(**kwargs): + global args + args = argparse.Namespace(**kwargs) + logging.basicConfig( + level=( + logging.DEBUG + if args.verbose > 0 + else logging.INFO + if args.verbose == 0 + else logging.WARN + if args.verbose == 1 + else logging.ERROR + ), + ) + + +@cli.command("alloc", help="Show ports of a single allocation.") +@argument_alloc() +def mode_alloc(alloc: nomadlib.Alloc): + out = gen_alloc(alloc) + output(out) + + +@cli.command("job", help="Show ports of all running or pending job allocations.") +@click.option( + "--all", + help="Show all allocation ports, not only running or pending.", + is_flag=True, +) +@click.argument( + "jobid", + shell_complete=complete_job(), +) +def mode_job(jobid, all): + jobid = mynomad.find_job(jobid) + out = [] + for alloc in mynomad.get(f"job/{jobid}/allocations"): + if all or nomadlib.Alloc(alloc).is_pending_or_running(): + # job/*/allocations does not have AllocatedResources information. 
+ alloc = nomadlib.Alloc(mynomad.get(f"allocation/{alloc['ID']}")) + out += gen_alloc(alloc) + output(out) + + +if __name__ == "__main__": + cli() diff --git a/src/nomad_tools/nomad_watch.py b/src/nomad_tools/nomad_watch.py index 91b3b9a..d3ee806 100755 --- a/src/nomad_tools/nomad_watch.py +++ b/src/nomad_tools/nomad_watch.py @@ -7,13 +7,14 @@ import base64 import dataclasses import datetime +import enum import inspect import itertools import json import logging import os -import queue import re +import shlex import subprocess import sys import threading @@ -47,8 +48,8 @@ namespace_option, nomad_find_namespace, ) -from .nomad_smart_start_job import nomad_smart_start_job -from .nomadlib import ns2dt +from .nomaddbjob import NomadDbJob +from .nomadlib import Event, EventTopic, EventType, MyStrEnum, ns2dt log = logging.getLogger(__name__) @@ -59,46 +60,59 @@ START_NS: int = 0 -def _init_colors() -> Dict[str, str]: - tputdict = { - "bold": "bold", - "black": "setaf 0", - "red": "setaf 1", - "green": "setaf 2", - "orange": "setaf 3", - "blue": "setaf 4", - "magenta": "setaf 5", - "cyan": "setaf 6", - "white": "setaf 7", - "reset": "sgr0", - } - empty = {k: "" for k in tputdict.keys()} - if not sys.stdout.isatty() or not sys.stderr.isatty(): - return empty - tputscript = "\n".join(tputdict.values()).replace("\n", "\nlongname\nlongname\n") - try: - longname = subprocess.run( - f"tput longname".split(), - text=True, - stdout=subprocess.PIPE, - stderr=subprocess.DEVNULL, - ).stdout - ret = subprocess.run( - "tput -S".split(), - input=tputscript, - text=True, - stdout=subprocess.PIPE, - stderr=subprocess.DEVNULL, - ).stdout - except (subprocess.CalledProcessError, FileNotFoundError): - return empty - retarr = ret.split(f"{longname}{longname}") - if len(tputdict.keys()) != len(retarr): - return empty - return {k: v for k, v in zip(tputdict.keys(), retarr)} - - -COLORS = _init_colors() +@dataclasses.dataclass +class Colors: + bold: str = "bold" + black: str = "setaf 0" + red: 
str = "setaf 1" + green: str = "setaf 2" + orange: str = "setaf 3" + blue: str = "setaf 4" + magenta: str = "setaf 5" + cyan: str = "setaf 6" + white: str = "setaf 7" + brightblack: str = "setaf 8" + brightred: str = "setaf 9" + brightgreen: str = "setaf 10" + brightorange: str = "setaf 11" + brightblue: str = "setaf 12" + brightmagenta: str = "setaf 13" + brightcyan: str = "setaf 14" + brightwhite: str = "setaf 15" + reset: str = "sgr0" + + @staticmethod + def init() -> Colors: + tputdict = dataclasses.asdict(Colors()) + empty = Colors(**{k: "" for k in tputdict.keys()}) + if not sys.stdout.isatty() or not sys.stderr.isatty(): + return empty + tputscript = "\n".join(tputdict.values()).replace( + "\n", "\nlongname\nlongname\n" + ) + try: + longname = subprocess.run( + f"tput longname".split(), + text=True, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + ).stdout + ret = subprocess.run( + "tput -S".split(), + input=tputscript, + text=True, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + ).stdout + except (subprocess.CalledProcessError, FileNotFoundError): + return empty + retarr = ret.split(f"{longname}{longname}") + if len(tputdict.keys()) != len(retarr): + return empty + return Colors(**{k: v for k, v in zip(tputdict.keys(), retarr)}) + + +COLORS = Colors.init() T = TypeVar("T") @@ -112,69 +126,114 @@ def set_not_in_add(s: Set[T], value: T) -> bool: return True +def andjoin(arr: Iterable[str]) -> str: + arr = list(arr) + if not len(arr): + return "" + if len(arr) == 1: + return arr[0] + return ", ".join(arr[:-1]) + " and " + arr[-1] + + +PARAMS = {} + + +def set_param( + opt: str, val: Any, ctx: click.Context, param: Optional[click.Parameter], value: Any +): + if value: + PARAMS[opt] = val + + ############################################################################### -@dataclasses.dataclass(frozen=True) -class LogFormat: - eval: str - alloc: str - stderr: str - stdout: str - module: str +class LogWhat(MyStrEnum): + deploy = enum.auto() + 
eval = enum.auto() + alloc = enum.auto() + stdout = enum.auto() + stderr = enum.auto() - @staticmethod - def mk( - prefix: str = "%(allocid).6s:%(group)s:%(task)s:", - log_timestamp: bool = False, - ): - now = "%(asctime)s:" if log_timestamp else "" - alloc_now = "" if log_timestamp else " %(asctime)s" - return LogFormat( - f"{now}%(evalid).6s:eval %(message)s", - f"{now}{prefix}A{alloc_now} %(message)s", - f"{now}{prefix}E %(message)s", - f"{now}{prefix}O %(message)s", - f"{now}%(module)s:%(lineno)03d: %(levelname)s %(message)s", - ).add_color() - - def add_color(self) -> LogFormat: - return LogFormat( - f"%(magenta)s{self.eval}%(reset)s", - f"%(cyan)s{self.alloc}%(reset)s", - f"%(orange)s{self.stderr}%(reset)s", - self.stdout, - f"%(blue)s{self.module}%(reset)s", - ) - def astuple(self): - return dataclasses.astuple(self) +class LOGFORMAT: + """Logging output format specification templates""" + + pre = "{color}{now.strftime(args.log_time_format) + '>' if args.log_time else ''}{mark}>" + post = " {message}{reset}" + task = "{task + '>' if task else ''}" + DEFAULT = ( + pre + + "{id:.{args.log_id_len}}{'>' if args.log_id_len else ''}" + + "{'#' + str(jobversion) + '>' if jobversion is not None else ''}" + + "{group + '>' if group else ''}" + + task + + post + ) + """Default log format. 
The log is templated with f-string using eval() below.""" + ONE = pre + task + post + """Log format with -1 option""" + ZERO = pre + post + """Log format with -0 option""" + LOGGING = ( + COLORS.blue + + "{'%(asctime)s>' if args.log_time else ''}" + + "%(module)s>%(lineno)03d> %(levelname)s %(message)s" + + COLORS.reset + ) + """Logging format, first templated with f-string, then by logging""" + colors: Dict[LogWhat, str] = { + LogWhat.deploy: COLORS.brightmagenta, + LogWhat.eval: COLORS.magenta, + LogWhat.alloc: COLORS.cyan, + LogWhat.stderr: COLORS.orange, + LogWhat.stdout: "", + } + """Logs colors""" + marks: Dict[LogWhat, str] = { + LogWhat.deploy: "deploy", + LogWhat.eval: "eval", + LogWhat.alloc: "A", + LogWhat.stderr: "E", + LogWhat.stdout: "O", + } + """Logs marks""" - def __params(self, params: Dict[str, Any] = {}) -> Dict[str, Any]: - return { - **params, - **COLORS, - "asctime": params["now"].strftime(args.log_timestamp_format), - } - def __log(self, fmt: str, **kwargs: Any): - if fmt: - print(fmt % self.__params(kwargs), flush=True) +@dataclasses.dataclass +class LogArgs: + id: str + message: str + what: str + now: datetime.datetime = dataclasses.field(default_factory=datetime.datetime.now) + nodename: Optional[str] = None + jobversion: Optional[str] = None + group: Optional[str] = None + task: Optional[str] = None + reset: str = COLORS.reset + + def fmt(self): + global args + assert self.what in [e.value for e in LogWhat] + locals().update(dataclasses.asdict(self)) + mark: str = LOGFORMAT.marks[self.what] # type: ignore + color: str = LOGFORMAT.colors[self.what] + reset: str = self.reset if color else "" # type: ignore + return eval(f"f{args.log_format!r}") - def log_eval(self, evalid: str, now: datetime.datetime, message: str): - return self.__log(self.eval, evalid=evalid, now=now, message=message) - def log_alloc(self, **kwargs: Any): - return self.__log(self.alloc, **kwargs) +@dataclasses.dataclass +class LogEnabler: + """Should logging of a 
specific stream be enabled?""" - def log_std(self, stderr: bool, **kwargs: Any): - return self.__log( - self.stderr if stderr else self.stdout, - **kwargs, - ) + deploy: bool = True + eval: bool = True + alloc: bool = True + stdout: bool = True + stderr: bool = True -logformat = LogFormat.mk() +LOGENABLED = LogEnabler() def click_log_options(): @@ -182,97 +241,73 @@ def click_log_options(): return composed( click.option( "-T", - "--log-timestamp", + "--log-time", is_flag=True, help="Additionally add timestamp to the logs. The timestamp of stdout and stderr streams is when the log was received, as Nomad does not store timestamp of task logs.", ), click.option( - "--log-timestamp-format", + "--log-time-format", default="%Y-%m-%dT%H:%M:%S%z", show_default=True, + help="Format time with specific format. Passed to python datetime.strftime.", ), click.option( "-H", - "--log-timestamp-hour", + "--log-time-hour", is_flag=True, - help="Alias for --log-timestamp --log-timestamp-format %H:%M:%S", - ), - click.option("--log-format-eval", default=logformat.eval, show_default=True), - click.option("--log-format-alloc", default=logformat.alloc, show_default=True), - click.option( - "--log-format-stderr", default=logformat.stderr, show_default=True + help="Alias for --log-time --log-time-format=%H:%M:%S", + callback=lambda *args: ( + set_param("log_time_format", "%H:%M:%S", *args), + set_param("log_time", True, *args), + ), ), click.option( - "--log-format-stdout", default=logformat.stdout, show_default=True + "--log-format", + default=LOGFORMAT.DEFAULT, + show_default=True, + help="The format to use when printing job logs", ), click.option( - "--log-long-alloc", is_flag=True, help="Log full length allocation id" + "--log-id-len", + default=6, + help="The length of id to log. 
UUIDv4 has 36 characters.", ), click.option( - "-G", - "--log-no-group", + "-l", + "--log-id-long", is_flag=True, - help="Do not log group", - ), - click.option( - "--log-no-task", - is_flag=True, - help="Do not log task", + help="Alias to --log-id-len=36", + callback=lambda *args: set_param("log_id_len", 36, *args), ), click.option( "-1", "--log-only-task", is_flag=True, - help="Prefix the lines only with task name.", + help=f"Equal to --log-format={LOGFORMAT.ONE}", + callback=lambda *args: set_param("log_format", LOGFORMAT.ONE, *args), ), click.option( "-0", "--log-none", is_flag=True, - help="Log only stream prefix", + help=f"Equal to --log-format={LOGFORMAT.ZERO}", + callback=lambda *args: set_param("log_format", LOGFORMAT.ZERO, *args), ), ) def log_format_choose(): - global logformat - logformat = LogFormat( - args.log_format_eval, - args.log_format_alloc, - args.log_format_stderr, - args.log_format_stdout, - logformat.module, - ) - alloc = "%(allocid)s" if args.log_long_alloc else "%(allocid).6s" - group = "" if args.log_no_group else "%(group)s:" - task = "" if args.log_no_task else "%(task)s:" - logformat = LogFormat.mk(f"{alloc}:{group}{task}", args.log_timestamp) - args.log_timestamp = args.log_timestamp_hour or args.log_timestamp - args.log_timestamp_format = ( - "%H:%M:%S" if args.log_timestamp_hour else args.log_timestamp_format + global LOG_ENABLED + LOG_ENABLED = LogEnabler( + deploy=any(s in "all deployment deploy d".split() for s in args.out), + eval=any(s in "all evaluation eval e".split() for s in args.out), + alloc=any(s in "all alloc a".split() for s in args.out), + stderr=any(s in "all stderr err e 2".split() for s in args.out), + stdout=any(s in "all stdout out o 1".split() for s in args.out), ) - if args.log_only_task: - logformat = LogFormat.mk("%(task)s:", args.log_timestamp) - elif args.log_none: - logformat = LogFormat.mk("", args.log_timestamp) - else: - logformat = LogFormat.mk(log_timestamp=args.log_timestamp) - # - allow_eval = any(s 
in "all evaluation eval e".split() for s in args.out) - allow_alloc = any(s in "all alloc a".split() for s in args.out) - allow_stderr = any(s in "all stderr err e 2".split() for s in args.out) - allow_stdout = any(s in "all stdout out o 1".split() for s in args.out) - logformat = LogFormat( - eval=logformat.eval if allow_eval else "", - alloc=logformat.alloc if allow_alloc else "", - stderr=logformat.stderr if allow_stderr else "", - stdout=logformat.stdout if allow_stdout else "", - module=logformat.module, - ) - # logging.basicConfig( - format=logformat.module, - datefmt=args.log_timestamp_format, + format=eval(f"f{LOGFORMAT.LOGGING!r}"), + datefmt=args.log_time_format, level=( logging.DEBUG if args.verbose > 0 @@ -283,16 +318,53 @@ def log_format_choose(): else logging.ERROR ), ) - # https://stackoverflow.com/questions/17558552/how-do-i-add-custom-field-to-python-log-format-string - old_factory = logging.getLogRecordFactory() - def record_factory(*args, **kwargs): - record = old_factory(*args, **kwargs) - for k, v in COLORS.items(): - setattr(record, k, v) - return record - logging.setLogRecordFactory(record_factory) +@dataclasses.dataclass(frozen=True) +class mylogger: + """Used to log from the various streams we want to log from""" + + @staticmethod + def __log(what: str, **kwargs): + global args + logargs = LogArgs(what=what, **kwargs) + try: + out = logargs.fmt() + except KeyError: + print(f"fmt={args.log_format!r} params={args}", file=sys.stderr, flush=True) + raise + print(out, flush=True) + + @classmethod + def log_eval(cls, db: NomadDbJob, eval: nomadlib.Eval, message: str): + return cls.__log( + LogWhat.eval, + id=eval.ID, + jobversion=db.find_jobversion_from_modifyindex(eval.ModifyIndex), + now=ns2dt(eval.ModifyTime), + message=message, + ) + + @classmethod + def log_deploy(cls, db: NomadDbJob, deploy: nomadlib.Deploy, message: str): + job = db.jobversions.get(deploy.JobVersion) + return cls.__log( + LogWhat.deploy, + id=deploy.ID, + 
jobversion=deploy.JobVersion, + now=ns2dt(job.SubmitTime) if job else datetime.datetime.now(), + message=message, + ) + + @classmethod + def log_alloc(cls, allocid: str, **kwargs): + return cls.__log(LogWhat.alloc, id=allocid, **kwargs) + + @classmethod + def log_std(cls, stderr: bool, allocid: str, **kwargs): + return cls.__log( + LogWhat.stderr if stderr else LogWhat.stdout, id=allocid, **kwargs + ) @dataclasses.dataclass(frozen=True) @@ -300,21 +372,22 @@ class TaskKey: """Represent data to unique identify a task""" allocid: str + jobversion: str nodename: str group: str task: str def __str__(self): - return f"{self.allocid:.6}:{self.group}:{self.task}" + return f"{self.allocid:.6}:v{self.jobversion}:{self.group}:{self.task}" def asdict(self): return dataclasses.asdict(self) def log_alloc(self, now: datetime.datetime, message: str): - logformat.log_alloc(now=now, message=message, **self.asdict()) + mylogger.log_alloc(now=now, message=message, **self.asdict()) def log_task(self, stderr: bool, message: str): - logformat.log_std( + mylogger.log_std( stderr=stderr, message=message, now=datetime.datetime.now().astimezone(), @@ -421,11 +494,10 @@ class TaskHandler: @staticmethod def _create_loggers(tk: TaskKey): - global logformat ths: List[TaskLogger] = [] - if logformat.stdout: + if LOGENABLED.stdout: ths.append(TaskLogger(tk, False)) - if logformat.stderr: + if LOGENABLED.stderr: ths.append(TaskLogger(tk, True)) for th in ths: th.start() @@ -434,9 +506,8 @@ def _create_loggers(tk: TaskKey): def notify(self, tk: TaskKey, taskstate: nomadlib.AllocTaskState): """Receive notification that a task state has changed""" - global logformat events = taskstate.Events - if logformat.alloc: + if LOGENABLED.alloc: for e in events: msgtime_ns = e.Time # Ignore message before ignore times. 
@@ -473,18 +544,25 @@ def stop(self): l.stop() +@dataclasses.dataclass class AllocWorker: """Represents a worker that prints out and manages state related to one allocation""" - def __init__(self): - self.taskhandlers: Dict[TaskKey, TaskHandler] = {} + db: NomadDbJob + taskhandlers: Dict[TaskKey, TaskHandler] = dataclasses.field(default_factory=dict) def notify(self, alloc: nomadlib.Alloc): """Update the state with alloc""" for taskname, task in alloc.get_taskstates().items(): if args.task and not args.task.search(taskname): continue - tk = TaskKey(alloc.ID, alloc.NodeName, alloc.TaskGroup, taskname) + tk = TaskKey( + alloc.ID, + str(self.db.get_allocation_job_version(alloc, "?")), + alloc.NodeName, + alloc.TaskGroup, + taskname, + ) self.taskhandlers.setdefault(tk, TaskHandler()).notify(tk, task) @@ -492,7 +570,9 @@ class ExitCode: success = 0 exception = 1 interrupted = 2 - """This program execution flow was interrupted""" + """This program execution flow was interrupted - user clicked ctrl+c""" + failed = 3 + """Starting a job failed - deployment was reverted""" any_failed_tasks = 124 all_failed_tasks = 125 any_unfinished_tasks = 126 @@ -500,38 +580,39 @@ class ExitCode: @dataclasses.dataclass -class AllocWorkers: +class NotifierWorker: """An containers for storing a map of allocation workers""" - db: "Db" + db: NomadDbJob """Link to database""" workers: Dict[str, AllocWorker] = dataclasses.field(default_factory=dict) - evalmessages: Set[Tuple[int, str]] = dataclasses.field(default_factory=set) + messages: Set[Tuple[int, str]] = dataclasses.field(default_factory=set) """A set of evaluation ModifyIndex to know what has been printed.""" def lineno_key_not_printed(self, key: str) -> bool: lineno = inspect.currentframe().f_back.f_lineno # type: ignore - return set_not_in_add(self.evalmessages, (lineno, key)) + return set_not_in_add(self.messages, (lineno, key)) def notify_alloc(self, alloc: nomadlib.Alloc): - if logformat.eval and 
self.lineno_key_not_printed(alloc.EvalID): - createtime = ns2dt(alloc.CreateTime) - logformat.log_eval( - alloc.EvalID, createtime, f"Allocation {alloc.ID} started" - ) + if LOGENABLED.eval: + evaluation = self.db.evaluations.get(alloc.EvalID) + if evaluation and self.lineno_key_not_printed(alloc.EvalID): + mylogger.log_eval( + self.db, + evaluation, + f"Allocation {alloc.ID} started on {alloc.NodeName}", + ) # - self.workers.setdefault(alloc.ID, AllocWorker()).notify(alloc) + self.workers.setdefault(alloc.ID, AllocWorker(self.db)).notify(alloc) # - if ( - logformat.eval - and alloc.is_finished() - and self.lineno_key_not_printed(alloc.EvalID) - ): - logformat.log_eval( - alloc.EvalID, ns2dt(alloc.ModifyTime), f"Allocation {alloc.ID} finished" - ) + if LOGENABLED.eval and alloc.is_finished(): + evaluation = self.db.evaluations.get(alloc.EvalID) + if evaluation and self.lineno_key_not_printed(alloc.EvalID): + mylogger.log_eval( + self.db, evaluation, f"Allocation {alloc.ID} finished" + ) # - if logformat.eval and alloc.FollowupEvalID: + if LOGENABLED.eval and alloc.FollowupEvalID: followupeval = self.db.evaluations.get(alloc.FollowupEvalID) if followupeval: waituntil = followupeval.getWaitUntil() @@ -539,28 +620,46 @@ def notify_alloc(self, alloc: nomadlib.Alloc): utcnow = datetime.datetime.now(datetime.timezone.utc) delay = waituntil - utcnow if delay > datetime.timedelta(0): - logformat.log_eval( - evalid=followupeval.ID, - now=ns2dt(followupeval.ModifyTime), - message=f"Nomad will attempt to reschedule in {delay} seconds", + mylogger.log_eval( + self.db, + followupeval, + f"Nomad will attempt to reschedule in {delay} seconds", ) def notify_eval(self, evaluation: nomadlib.Eval): if ( - logformat.eval + LOGENABLED.eval and evaluation.Status == nomadlib.EvalStatus.blocked and "FailedTGAllocs" in evaluation and self.lineno_key_not_printed(evaluation.ID) ): - modifytime = ns2dt(evaluation.ModifyTime) - logformat.log_eval( - evaluation.ID, - modifytime, + 
mylogger.log_eval( + self.db, + evaluation, f"{evaluation.JobID}: Placement Failures: {len(evaluation.FailedTGAllocs)} unplaced", ) for task, metric in evaluation.FailedTGAllocs.items(): for msg in metric.format(True, f"{task}: ").splitlines(): - logformat.log_eval(evaluation.ID, modifytime, msg) + mylogger.log_eval(self.db, evaluation, msg) + + def notify_deploy(self, deployment: nomadlib.Deploy): + # If the job has any service defined. + if ( + LOGENABLED.eval + and self.db.job + and deployment.Status + in [ + nomadlib.DeploymentStatus.successful, + nomadlib.DeploymentStatus.failed, + ] + ): + """https://github.com/hashicorp/nomad/blob/e02dd2a331c778399fd271e85c75bff3e3783d80/ui/app/templates/components/job-deployment/deployment-metrics.hbs#L10""" + for task, tg in deployment.TaskGroups.items(): + mylogger.log_deploy( + self.db, + deployment, + f"{task} Canaries={len(tg.PlacedCanaries or [])}/{tg.DesiredCanaries} Placed={tg.PlacedAllocs} Desired={tg.DesiredTotal} Healthy={tg.HealthyAllocs} Unhealthy={tg.UnhealthyAllocs} {deployment.StatusDescription}", + ) def stop(self): for w in self.workers.values(): @@ -616,259 +715,33 @@ def exitcode(self) -> int: ############################################################################### -Event = nomadlib.Event -EventTopic = nomadlib.EventTopic -EventType = nomadlib.EventType - - -class Db: - """Represents relevant state cache from Nomad database""" - - def __init__( - self, - topics: List[str], - select_event_cb: Callable[[Event], bool], - init_cb: Callable[[], List[Event]], - ): - """ - :param topic The topics to listen to, see Nomad event stream API documentation. - :param select_event_cb: Filter only relevant events from Nomad event stream. - :param init_cb Return a list of events to populate the database with and to poll. Has to be threadsafe! 
- """ - self.topics: List[str] = topics - self.init_cb: Callable[[], List[Event]] = init_cb - self.select_event_cb: Callable[[Event], bool] = select_event_cb - self.queue: queue.Queue[Optional[List[Event]]] = queue.Queue() - """Queue where database thread puts the received events""" - self.job: Optional[nomadlib.Job] = None - """Watched job""" - self.evaluations: Dict[str, nomadlib.Eval] = {} - """Database of evaluations""" - self.allocations: Dict[str, nomadlib.Alloc] = {} - """Database of allocations""" - self.deployments: Dict[str, nomadlib.Deploy] = {} - """Database of deployments""" - self.initialized = threading.Event() - """Was init_cb called to initiliaze the database""" - self.stopevent = threading.Event() - """If set, the database thread should exit""" - self.thread = threading.Thread( - target=self._thread_entry, name="db", daemon=True - ) - assert self.topics - assert not any(not x for x in topics) - - ############################################################################### - - def _thread_run_stream(self): - log.debug(f"Starting listen Nomad stream with {' '.join(self.topics)}") - with mynomad.stream( - "event/stream", - params={"topic": self.topics}, - ) as stream: - for line in stream.iter_lines(): - if line: - data = json.loads(line) - events: List[Event] = [ - Event( - EventTopic[event["Topic"]], - EventType[event["Type"]], - event["Payload"][event["Topic"]], - stream=True, - ) - for event in data.get("Events", []) - ] - # log.debug(f"RECV EVENTS: {events}") - self.queue.put(events) - if self.stopevent.is_set(): - break - - def _thread_poll(self): - """If listening to stream fails, we fallback to calling init_cb in a loop""" - # Delay polling start up until init_cb has been called. 
- self.initialized.wait() - while not self.stopevent.wait(1): - self.queue.put(self.init_cb()) - - def _thread_entry(self): - """Database thread entry""" - try: - try: - if args.polling: - self._thread_poll() - else: - try: - self._thread_run_stream() - except nomadlib.PermissionDenied as e: - log.warning( - f"Falling to polling method because stream API returned permission denied: {e}" - ) - self._thread_poll() - finally: - log.debug("Nomad database thread exiting") - self.queue.put(None) - except requests.HTTPError as e: - log.exception("http request failed") - exit(ExitCode.exception) - - ############################################################################### - - def start(self): - """Start the database thread""" - assert ( - mynomad.namespace - ), "Nomad namespace has to be set before starting to listen" - self.thread.start() - - def select_is_in_db(self, e: Event): - """Select events that are already in the database""" - if e.topic == EventTopic.Evaluation: - return e.data["ID"] in self.evaluations - elif e.topic == EventTopic.Allocation: - return e.data["ID"] in self.allocations - elif e.topic == EventTopic.Deployment: - return e.data["ID"] in self.deployments - return False - - def _add_event_to_db(self, e: Event): - """Update database state to reflect received event""" - if e.topic == EventTopic.Job: - if e.type == EventType.JobDeregistered: - self.job = None - else: - self.job = nomadlib.Job(e.data) - elif e.topic == EventTopic.Evaluation: - if e.type == EventType.JobDeregistered: - self.job = None - self.evaluations[e.data["ID"]] = nomadlib.Eval(e.data) - elif e.topic == EventTopic.Allocation: - self.allocations[e.data["ID"]] = nomadlib.Alloc(e.data) - elif e.topic == EventTopic.Deployment: - self.deployments[e.data["ID"]] = nomadlib.Deploy(e.data) - - def handle_event(self, e: Event) -> bool: - if self._select_new_event(e): - if self.select_is_in_db(e) or self.select_event_cb(e): - # log.debug(f"EVENT: {e}") - self._add_event_to_db(e) - return 
True - else: - # log.debug(f"USER FILTERED: {e}") - pass - else: - # log.debug(f"OLD EVENT: {e}") - pass - return False - - def handle_events(self, events: List[Event]) -> List[Event]: - """From a list of events, filter out ignored and add the rest to database""" - return [e for e in events if self.handle_event(e)] - - @staticmethod - def apply_selects( - e: Event, - job_select: Callable[[nomadlib.Job], bool], - eval_select: Callable[[nomadlib.Eval], bool], - alloc_select: Callable[[nomadlib.Alloc], bool], - deploy_select: Callable[[nomadlib.Deploy], bool], - ) -> bool: - """Apply specific selectors depending on event type""" - return e.apply(job_select, eval_select, alloc_select, deploy_select) - - def _select_new_event(self, e: Event): - """Select events which are newer than those in the database""" - job_select: Callable[[nomadlib.Job], bool] = ( - lambda job: self.job is None or job.ModifyIndex > self.job.ModifyIndex - ) - eval_select: Callable[[nomadlib.Eval], bool] = ( - lambda eval: eval.ID not in self.evaluations - or eval.ModifyIndex > self.evaluations[eval.ID].ModifyIndex - ) - alloc_select: Callable[[nomadlib.Alloc], bool] = ( - lambda alloc: alloc.ID not in self.allocations - or alloc.ModifyIndex > self.allocations[alloc.ID].ModifyIndex - ) - deploy_select: Callable[[nomadlib.Deploy], bool] = ( - lambda deploy: deploy.ID not in self.deployments - or deploy.ModifyIndex > self.deployments[deploy.ID].ModifyIndex - ) - return e.data["Namespace"] == mynomad.namespace and self.apply_selects( - e, job_select, eval_select, alloc_select, deploy_select - ) - - def stop(self): - log.debug("Stopping listen Nomad stream") - self.initialized.set() - self.stopevent.set() - - def join(self): - # Not joining - neither requests nor stream API allow for timeouts. - # self.thread.join() - pass - - def events(self) -> Iterable[List[Event]]: - """Nomad stream returns Events array. 
Iterate over batches of events returned from Nomad stream""" - assert self.thread.is_alive(), "Thread not alive" - if not self.initialized.is_set(): - events = self.init_cb() - events = self.handle_events(events) - yield events - self.initialized.set() - log.debug("Starting getting events from thread") - while not self.queue.empty() or ( - self.thread.is_alive() and not self.stopevent.is_set() - ): - events = self.queue.get() - if events is None: - break - yield self.handle_events(events) - log.debug("db exiting") - - ############################################################################### -def nomad_watch_eval(evalid: str): - assert isinstance(evalid, str), f"not a string: {evalid}" - db = Db( - topics=[ - f"Evaluation:{evalid}", - ], - select_event_cb=lambda e: e.topic == EventTopic.Evaluation - and e.data["ID"] == evalid, - init_cb=lambda: [ - Event( - EventTopic.Evaluation, - EventType.EvaluationUpdated, - mynomad.get(f"evaluation/{evalid}"), - ) - ], +def nomad_job_run(opts: Tuple[str]) -> str: + """Call nomad job run to start a Nomad job from a file or from stdin or from input""" + cmd: List[str] = "nomad job run -detach -verbose".split() + list(opts) + log.info(f"+ {' '.join(shlex.quote(x) for x in cmd)}") + try: + output = subprocess.check_output(cmd, text=True) + except subprocess.CalledProcessError as e: + # nomad will print its error, we can just exit + exit(e.returncode) + # Extract evaluation id from Nomad output. 
+ for line in output.splitlines(): + log.info(line) + founduuids = re.findall( + "[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}", + output.lower(), + re.MULTILINE, ) - db.start() - log.info(f"Waiting for evaluation {evalid}") - eval_ = None - for events in db.events(): - for event in events: - assert event.topic == EventTopic.Evaluation - eval_ = nomadlib.Eval(event.data) - if eval_.Status != "pending": - break - if eval_ and eval_.Status != "pending": - break - db.stop() - assert eval_ is not None - assert ( - eval_.Status == "complete" - ), f"Evaluation {evalid} did not complete: {eval_.get('StatusDescription')}" - FailedTGAllocs = eval_.get("FailedTGAllocs") - if FailedTGAllocs: - groups = " ".join(list(FailedTGAllocs.keys())) - log.warning(f"Evaluation {evalid} failed to place groups: {groups}") + assert len(founduuids) == 1, f"Could not find uuid in nomad job run output" + evalid = founduuids[0] + return evalid -def nomad_start_job(input: str) -> nomadlib.Eval: - assert isinstance(input, str) - evalid = nomad_smart_start_job(input, args.json) +def nomad_start_job(opts: Tuple[str]) -> nomadlib.Eval: + evalid = nomad_job_run(opts) return nomadlib.Eval(mynomad.get(f"evaluation/{evalid}")) @@ -893,15 +766,17 @@ def __init__( self.eval = eval self.endstatusstr = endstatusstr assert self.eval or self.job - if self.job: - assert not self.eval - self.jobid = self.job.ID - mynomad.namespace = self.job.Namespace - else: - assert self.eval + if self.eval: + assert not self.job self.jobid = self.eval.JobID mynomad.namespace = self.eval.Namespace - self.db = Db( + self.jobmodifyindex = self.eval.JobModifyIndex + else: + assert self.job + self.jobid = self.job.ID + mynomad.namespace = self.job.Namespace + self.jobmodifyindex = self.job.JobModifyIndex + self.db = NomadDbJob( topics=[ f"Job:{self.jobid}", f"Evaluation:{self.jobid}", @@ -910,27 +785,73 @@ def __init__( ], select_event_cb=self.db_select_event_job, init_cb=self.db_init_cb, + force_polling=True if 
args.polling else None, ) - self.allocworkers = AllocWorkers(self.db) + self.notifier = NotifierWorker(self.db) self.done = threading.Event() """I am using threading.Event because you can't handle KeyboardInterrupt while Thread.join().""" self.thread = threading.Thread( - target=self.thread_run, + target=self.__thread_run, name=f"{self.__class__.__name__}({self.jobid})", daemon=True, ) - # Start threads + self.dbseenjob: bool = False + """The job was found at least once.""" + self.purged: bool = False + """If set, we are waiting until the job is there and it means that JobNotFound is not an error - the job was removed.""" + self.purgedlock: threading.Lock = threading.Lock() + """A special lock to synchronize callback with changing finish conditions""" + self.donemsg = "" + """Message printed when done watching. It is not printed right away, because we might decide to wait for purging later.""" + self.started: bool = False + """The job finished because all allocations started, not because they failed.""" + # self.db.start() self.thread.start() + def was_purged(self): + with self.purgedlock: + return self.purged + def db_init_cb(self) -> List[Event]: """Db initialization callback""" - job: dict = mynomad.get(f"job/{self.jobid}") - evaluations: List[dict] = mynomad.get(f"job/{self.jobid}/evaluations") - allocations: List[dict] = mynomad.get(f"job/{self.jobid}/allocations") - deployments: List[dict] = mynomad.get(f"job/{self.jobid}/deployments") + try: + job: dict = mynomad.get(f"job/{self.jobid}") + jobversions: dict = mynomad.get(f"job/{self.jobid}/versions") + evaluations: List[dict] = mynomad.get(f"job/{self.jobid}/evaluations") + allocations: List[dict] = mynomad.get(f"job/{self.jobid}/allocations") + deployments: List[dict] = mynomad.get(f"job/{self.jobid}/deployments") + except nomadlib.JobNotFound: + # This is fine to fail. + if self.was_purged() and self.job: + # If the job was purged, generate one event so that listener can catch it. 
+ return [ + Event(EventTopic.Job, EventType.JobRegistered, self.job.asdict()) + ] + raise + if not self.job: + self.job = nomadlib.Job(job) + log.debug( + f"Found job {self.job.description()} from {self.eval.ID if self.eval else None}" + ) if not allocations: - log.debug(f"Job {nomadlib.Job(job).description()} has no allocations") + log.debug(f"Job {self.job.description()} has no allocations") + else: + # If there are active alocations with JobModifyIndex lower than current watched one, + # also start watching them by lowering JobModfyIndex threashold. + minactiveallocationjobmodifyindex = min( + ( + next((e for e in evaluations if e["ID"] == a["EvalID"]), {}).get( + "JobModifyIndex", self.jobmodifyindex + ) + for a in allocations + if nomadlib.Alloc(a).is_pending_or_running() + ), + default=self.jobmodifyindex, + ) + self.jobmodifyindex = min( + self.jobmodifyindex, minactiveallocationjobmodifyindex + ) return [ *[ Event(EventTopic.Deployment, EventType.DeploymentStatusUpdate, d) @@ -940,15 +861,19 @@ def db_init_cb(self) -> List[Event]: Event(EventTopic.Evaluation, EventType.EvaluationUpdated, e) for e in evaluations ], - *[Event(EventTopic.Job, EventType.JobRegistered, job)], + Event(EventTopic.Job, EventType.JobRegistered, job), *[ Event(EventTopic.Allocation, EventType.AllocationUpdated, a) for a in allocations ], + *[ + Event(EventTopic.Job, EventType.JobRegistered, job) + for job in jobversions["Versions"] + ], ] def db_select_event_jobid(self, e: Event): - return Db.apply_selects( + return self.db.apply_selects( e, lambda job: job.ID == self.jobid, lambda eval: eval.JobID == self.jobid, @@ -962,39 +887,32 @@ def db_select_event_job(self, e: Event) -> bool: job_filter: Callable[[nomadlib.Job], bool] = lambda _: True eval_filter: Callable[[nomadlib.Eval], bool] = lambda eval: ( (self.eval is not None and self.eval.ID == eval.ID) - or ( - self.job is not None - # The JobModifyIndex has to be greater. 
- and eval.get("JobModifyIndex", -1) >= self.job.JobModifyIndex - ) + # The JobModifyIndex has to be greater. + or eval.get("JobModifyIndex", -1) >= self.jobmodifyindex ) alloc_filter: Callable[[nomadlib.Alloc], bool] = lambda alloc: ( - self.job is not None - and ( - # If allocation has JobVersion, then it has to match the version in the job. - alloc.get("JobVersion", -1) >= self.job.Version + # If allocation has JobVersion, then it has to match the version in the job. + (self.job and alloc.get("JobVersion", -1) >= self.job.Version) + or ( # If the allocation has no JobVersion, find the maching evaluation. - # The JobModifyIndex from the evalution has to match. - or ( - self.db.evaluations.get(alloc.EvalID, {}).get("JobModifyIndex", -1) - >= self.job.JobModifyIndex - ) + # The JobModifyIndex from the evaluation has to match. + self.db.evaluations.get(alloc.EvalID, {}).get("JobModifyIndex", -1) + >= self.jobmodifyindex ) ) deploy_filter: Callable[[nomadlib.Deploy], bool] = lambda deploy: ( - self.job is not None - and deploy.get("JobModifyIndex", -1) >= self.job.JobModifyIndex + deploy.get("JobModifyIndex", -1) >= self.jobmodifyindex ) - return self.db_select_event_jobid(e) and Db.apply_selects( + return self.db_select_event_jobid(e) and self.db.apply_selects( e, job_filter, eval_filter, alloc_filter, deploy_filter ) - @abstractmethod def finish_cb(self) -> bool: """Overloaded callback to call to determine if we should finish watching the job""" - raise NotImplementedError() + # Default is to return False and fallback to job_is_finished. 
+ return False - def thread_run(self): + def __thread_run(self): untilstr = ( "forever" if args.all @@ -1009,13 +927,18 @@ def thread_run(self): for event in events: if event.topic == EventTopic.Allocation: alloc = nomadlib.Alloc(event.data) - self.allocworkers.notify_alloc(alloc) + self.notifier.notify_alloc(alloc) elif event.topic == EventTopic.Evaluation: evaluation = nomadlib.Eval(event.data) - self.allocworkers.notify_eval(evaluation) + self.notifier.notify_eval(evaluation) if self.eval and self.eval.ID == evaluation.ID: self.eval = evaluation + elif event.topic == EventTopic.Deployment: + deployment = nomadlib.Deploy(event.data) + self.notifier.notify_deploy(deployment) elif event.topic == EventTopic.Job: + if self.db.job: + self.dbseenjob = True if ( self.job and self.db.job @@ -1023,56 +946,57 @@ def thread_run(self): ): # self.job follows newest job definition. self.job = self.db.job - if self.job is None and self.db.job: - # If the job is missing, wait for the evaluation to finish, then get job. - assert self.eval - if self.eval.Status != nomadlib.EvalStatus.pending: - self.job = self.db.job - log.debug( - f"Found job {self.job.description()} from {self.eval.ID} with status {self.eval.Status}" - ) if ( not self.done.is_set() - and (not args.all and self.finish_cb()) - or (args.no_follow and time.time() > no_follow_timeend) + and not args.follow + # Evaluation is finished if was passed. 
+ and (self.eval is None or not self.eval.is_pending()) + and ( + (self.finish_cb() or self.job_is_finished()) + or (args.no_follow and time.time() > no_follow_timeend) + ) ): self.done.set() log.debug(f"Watching job {self.jobid}@{mynomad.namespace} exiting") @abstractmethod - def _get_exitcode(self) -> int: + def _get_exitcode_cb(self) -> int: raise NotImplementedError() def get_exitcode(self) -> int: assert self.db.stopevent.is_set(), "stop not called" assert self.done.is_set(), "stop not called" - return self._get_exitcode() + return self._get_exitcode_cb() def wait(self): self.done.wait() - def stop(self): + def stop_job(self, purge: bool): + self.db.initialized.wait() + if purge: + with self.purgedlock: + self.purged = True + self.done.clear() + mynomad.stop_job(self.jobid, purge) + + def stop_threads(self): log.debug(f"stopping {self.__class__.__name__} done={self.done.is_set()}") - self.allocworkers.stop() + self.notifier.stop() self.db.stop() - self.allocworkers.join() + self.notifier.join() self.db.join() mynomad.session.close() + if self.donemsg: + log.info(self.donemsg) def run_and_exit(self): try: self.wait() finally: - self.stop() + self.stop_threads() exit(self.get_exitcode()) - def no_active_allocations_and_evaluations_and_deployments(self): - for allocation in self.db.allocations.values(): - if allocation.is_pending_or_running(): - return False - for evaluation in self.db.evaluations.values(): - if evaluation.is_pending(): - return False + def has_active_deployments(self): for deployments in self.db.deployments.values(): if deployments.Status in [ nomadlib.DeploymentStatus.initializing, @@ -1081,13 +1005,134 @@ def no_active_allocations_and_evaluations_and_deployments(self): nomadlib.DeploymentStatus.blocked, nomadlib.DeploymentStatus.paused, ]: + return True + return False + + def has_active_evaluations(self): + for evaluation in self.db.evaluations.values(): + if evaluation.is_pending(): + return True + return False + + def 
has_no_active_allocations_and_evaluations_and_deployments(self): + for allocation in self.db.allocations.values(): + if allocation.is_pending_or_running(): return False - return True + return not self.has_active_deployments() and not self.has_active_evaluations() def _job_is_dead_message(self): assert self.job return f"Job {self.job.description()} is dead with no active allocations, evaluations nor deployments." + def job_is_finished(self) -> bool: + """Return True if the job is finished and nothing more will happen to it. Sets donemsg""" + # Protect against the situation when the job JSON is not in a database. + # init_cb first queries allocations, then the job itself. + if ( + self.dbseenjob + and self.job + and self.has_no_active_allocations_and_evaluations_and_deployments() + ): + # Depending on purge argument, we wait for the job to stop existing + # or for the job to be dead. + if self.was_purged(): + if self.db.job is None: + self.donemsg = f"Job {self.job.description()} removed with no active allocations, evaluations nor deployments. Exiting." + return True + else: + if self.job.is_dead(): + self.donemsg = f"{self._job_is_dead_message()} Exiting." + return True + return False + + def is_current_allocation(self, alloc: nomadlib.Alloc) -> bool: + """Return True if the allocation is for the current job version""" + return self.job is not None and ( + alloc.get("JobVersion", -1) >= self.job.Version + or self.db.evaluations.get(alloc.EvalID, {}).get("JobModifyIndex", -1) + >= self.job.JobModifyIndex + ) + + @staticmethod + def nomad_job_group_main_tasks(group: nomadlib.JobTaskGroup): + """Get a set of names of Nomad job group main tasks""" + # Main tasks are tasks that: + # - do not have lifecycle + # - have lifecycle prestart with sidecar = true + # - have lifecycle poststart + # All these tasks have to be started. 
+ maintasks = set( + t.Name + for t in group.Tasks + if "Lifecycle" not in t + or t.Lifecycle is None + or ( + t.Lifecycle.Hook == nomadlib.LifecycleHook.prestart + and t.Lifecycle.get_sidecar() == True + ) + or t.Lifecycle.Hook == nomadlib.LifecycleHook.poststart + ) + assert len(maintasks) > 0, f"Internal error when getting main tasks of {group}" + return maintasks + + def job_has_finished_starting(self) -> bool: + """ + Return True if the job is not starting anymore. + self.started will be set to True, if the job was successfully started. + """ + # log.debug(f"has_active_deployments={self.has_active_deployments()} has_active_evaluations={self.has_active_evaluations()} allocations={len(self.db.allocations)}") + if ( + not self.job + or self.has_active_deployments() + or self.has_active_evaluations() + ): + return False + allocations = self.db.allocations.values() + if not allocations: + return False + groupmsgs: List[str] = [] + for group in self.job.TaskGroups: + # Similar logic in db_filter_event_job() + groupallocs: List[nomadlib.Alloc] = [ + alloc + for alloc in allocations + if alloc.TaskGroup == group.Name and self.is_current_allocation(alloc) + ] + if len(groupallocs) != group.Count: + # This group has no current allocations, + # and no active evaluation and deployments (checked above). + log.info( + f"Job {self.job.description()} has failed to start group {group.Name!r}" + ) + return True + maintasks: Set[str] = self.nomad_job_group_main_tasks(group) + for alloc in groupallocs: + # List of started tasks. + startedtasks: Set[str] = set( + name + for name, taskstate in alloc.get_taskstates().items() + if taskstate.was_started() + ) + notrunningmaintasks = maintasks.difference(startedtasks) + if notrunningmaintasks: + # There are main tasks that are not running. + if alloc.is_pending_or_running(): + return False + else: + # The allocation has finished - the tasks will never start. 
+ log.info( + f"Job {self.job.description()} has failed to start group {group.Name!r} tasks {' '.join(notrunningmaintasks)}" + ) + return True + groupallocsidsstr = andjoin(alloc.ID[:6] for alloc in groupallocs) + groupmsgs.append( + f"allocations {groupallocsidsstr} running group {group.Name!r} with {len(maintasks)} main tasks" + ) + msg = f"Job {self.job.description()} started " + andjoin(groupmsgs) + "." + log.info(msg) + self.started = True + return True + class NomadJobWatcherUntilFinished(NomadJobWatcher): """Watcher a job until the job is dead or purged""" @@ -1098,73 +1143,16 @@ def __init__( eval: Optional[nomadlib.Eval] = None, ): super().__init__(job, eval, "finished") - self.dbseenjob: bool = False - """The job was found at least once.""" - self.purged: bool = False - """If set, we are waiting until the job is there and it means that JobNotFound is not an error - the job was removed.""" - self.purgedlock: threading.Lock = threading.Lock() - """A special lock to synchronize callback with changing finish conditions""" - self.donemsg = "" - """Message printed when done watching. It is not printed right away, because we might decide to wait for purging later.""" - def db_init_cb(self) -> List[Event]: - try: - return super().db_init_cb() - except nomadlib.JobNotFound: - # This is fine to fail. - with self.purgedlock: - if self.purged and self.job: - # If the job was purged, generate one event so that listener can catch it. - return [ - Event( - EventTopic.Job, EventType.JobRegistered, self.job.asdict() - ) - ] - raise - - def finish_cb(self) -> bool: - # Protect against the situation when the job JSON is not in a database. - # init_cb first queries allocations, then the job itself. 
- if self.db.job: - self.dbseenjob = True - elif not self.dbseenjob: - return False - if self.job and self.no_active_allocations_and_evaluations_and_deployments(): - with self.purgedlock: - # Depending on purge argument, we wait for the job to stop existing - # or for the job to be dead. - if self.purged: - if self.db.job is None: - self.donemsg = f"Job {self.job.description()} removed no active allocations, evaluations nor deployments. Exiting." - return True - else: - if self.job.is_dead(): - self.donemsg = f"{self._job_is_dead_message()} Exiting." - return True - return False - - def stop(self): - super().stop() - if self.donemsg: - log.info(self.donemsg) - - def _get_exitcode(self) -> int: + def _get_exitcode_cb(self) -> int: exitcode: int = ( (ExitCode.success if self.done.is_set() else ExitCode.interrupted) if args.no_preserve_status - else self.allocworkers.exitcode() + else self.notifier.exitcode() ) log.debug(f"exitcode={exitcode}") return exitcode - def stop_job(self, purge: bool): - self.db.initialized.wait() - if purge: - with self.purgedlock: - self.purged = True - self.done.clear() - mynomad.stop_job(self.jobid, purge) - def job_finished_successfully(self): assert self.job if self.job.Status != "dead": @@ -1208,85 +1196,11 @@ def __init__( eval: Optional[nomadlib.Eval] = None, ): super().__init__(job, eval, "started") - self.hadalloc: bool = False - """The job had allocations.""" - self.started: bool = False - """The job finished because all allocations started, not because they failed.""" - - def __job_is_started(self) -> bool: - """Job is started if in current allocations all main tasks from all groups have started""" - assert self.job - allocations = self.db.allocations.values() - if not self.job.TaskGroups: - # Job can't possibly be running with no taskgroups. 
- return False - groupmsgs: List[str] = [] - for group in self.job.TaskGroups: - # Similar logic in db_filter_event_job() - groupallocs: List[nomadlib.Alloc] = [ - alloc - for alloc in allocations - if alloc.TaskGroup == group.Name - and ( - alloc.get("JobVersion", -1) >= self.job.Version - or self.db.evaluations.get(alloc.EvalID, {}).get( - "JobModifyIndex", -1 - ) - >= self.job.JobModifyIndex - ) - ] - if not groupallocs: - # This group has no allocations - yet! - return False - groupallocs.sort(key=lambda alloc: alloc.ModifyIndex, reverse=True) - lastalloc: nomadlib.Alloc = groupallocs[0] - # List of started tasks. - startedtasks: Set[str] = set( - name - for name, taskstate in lastalloc.get_taskstates().items() - if taskstate.was_started() - ) - # Main tasks are tasks that: - # - do not have lifecycle - # - have lifecycle prestart with sidecar = true - # - have lifecycle poststart - # All these tasks have to be started. - allmaintasks: Set[str] = set( - t.Name - for t in group.Tasks - if "Lifecycle" not in t - or t.Lifecycle is None - or ( - t.Lifecycle.Hook == nomadlib.LifecycleHook.prestart - and t.Lifecycle.get_sidecar() == True - ) - or t.Lifecycle.Hook == nomadlib.LifecycleHook.poststart - ) - assert len(allmaintasks) > 0, f"{allmaintasks} {startedtasks}" - notrunningmaintasks = allmaintasks.difference(startedtasks) - if notrunningmaintasks: - # There are main tasks that are not running. - return False - groupmsgs.append( - f"allocation {lastalloc.ID:.6} with {len(allmaintasks)} running main tasks of group {group.Name!r}" - ) - msg = f"Job {self.job.description()} started " + " and ".join(groupmsgs) + "." 
- log.info(msg) - self.started = True - return True def finish_cb(self) -> bool: - if not self.job or self.__job_is_started(): - return True - if ( - self.job.is_dead() - and self.no_active_allocations_and_evaluations_and_deployments() - ): - log.info(f"{self._job_is_dead_message()} Bailing out.") - return True - return False + return self.job_has_finished_starting() - def _get_exitcode(self) -> int: + def _get_exitcode_cb(self) -> int: exitcode = ExitCode.success if self.started else ExitCode.interrupted log.debug(f"started={self.started} so exitcode={exitcode}") return exitcode @@ -1297,8 +1211,8 @@ class NomadAllocationWatcher: def __init__(self, alloc: nomadlib.Alloc): self.alloc = alloc - self.db = Db( - topics=[f"Allocation:{alloc.JobID}"], + self.db = NomadDbJob( + topics=[f"Allocation:{alloc.ID}"], select_event_cb=lambda e: e.topic == EventTopic.Allocation and e.data["ID"] == alloc.ID, init_cb=lambda: [ @@ -1308,8 +1222,9 @@ def __init__(self, alloc: nomadlib.Alloc): mynomad.get(f"allocation/{alloc.ID}"), ) ], + force_polling=True if args.polling else None, ) - self.allocworkers = AllocWorkers(self.db) + self.allocworkers = NotifierWorker(self.db) self.finished = False # log.info(f"Watching allocation {alloc.ID}") @@ -1398,32 +1313,25 @@ def complete(ctx: click.Context, _: str, incomplete: str) -> List[str]: @click.group( help=f""" - Run a Nomad job in Nomad. Watch over the job and print all job - allocation events and tasks stdouts and tasks stderrs logs. Depending - on mode, wait for a specific event to happen to finish watching. - The script is intended to help debugging issues with running jobs - in Nomad and for synchronizing with execution of batch jobs in Nomad. - - \b - If the option --no-preserve-exit is given, then exit with the following status: - 0 if operation was successful - the job was run or was purged on --purge - Ohterwise, when mode is alloc, run, job, stop or stopped, exit with the following status: - ? 
when the job has one task, with that task exit status, - 0 if all tasks of the job exited with 0 exit status, - {ExitCode.any_failed_tasks} if any of the job tasks have failed, - {ExitCode.all_failed_tasks} if all job tasks have failed, - {ExitCode.any_unfinished_tasks} if any tasks are still running, - {ExitCode.no_allocations} if job has no started tasks. - In any case, exit with the following status: - 1 if some error occured, like python exception. - - \b - Examples: - nomad-watch --namespace default run ./some-job.nomad.hcl - nomad-watch job some-job - nomad-watch alloc af94b2 - nomad-watch -N services --task redis -1f job redis - """, +Depending on the command, run or stop a Nomad job. Watch over the job and +print all job allocation events and tasks stdouts and tasks stderrs +logs. Depending on command, wait for a specific event to happen to finish +watching. This program is intended to help debugging issues with running +jobs in Nomad and for synchronizing with execution of batch jobs in Nomad. + +Logs are printed in the format: 'mark>id>#version>group>task> message'. +The mark in the log lines is equal to: 'deploy' for messages printed as +a result of deployment, 'eval' for messages printed from evaluations, +'A' from allocation, 'E' for stderr logs of a task and 'O' from stdout +logs of a taks. + +\b +Examples: + nomad-watch run ./some-job.nomad.hcl + nomad-watch job some-job + nomad-watch alloc af94b2 + nomad-watch -N services --task redis -1f job redis +""", epilog=""" Written by Kamil Cukrowski 2023. Licensed under GNU GPL version 3 or later. """, @@ -1433,16 +1341,13 @@ def complete(ctx: click.Context, _: str, incomplete: str) -> List[str]: "-a", "--all", is_flag=True, - help=""" - Do not exit after the current job version is finished. - Instead, watch endlessly for any existing and new allocations of a job. 
- """, + help="Print logs from all allocations, including previous versions of the job.", ) @click.option( "-o", "--out", type=click.Choice( - "all alloc A stdout out O 1 stderr err E 2 evaluation eval e none".split() + "all alloc A stdout out O 1 stderr err E 2 evaluation eval e deployment deploy d none".split() ), default=["all"], multiple=True, @@ -1451,11 +1356,6 @@ def complete(ctx: click.Context, _: str, incomplete: str) -> List[str]: ) @click.option("-v", "--verbose", count=True, help="Be more verbose.") @click.option("-q", "--quiet", count=True, help="Be less verbose.") -@click.option( - "--json", - is_flag=True, - help="Job input is in json form. Passed to nomad command line interface with -json.", -) @click.option( "-A", "--attach", @@ -1478,14 +1378,12 @@ def complete(ctx: click.Context, _: str, incomplete: str) -> List[str]: @click.option( "-n", "--lines", - default=-1, + default=10, show_default=True, type=int, help=""" Sets the tail location in best-efforted number of lines relative to the end of logs. - Default prints all the logs. - Set to 0 to try try best-efforted logs from the current log position. - See also --lines-timeout. + Negative value prints all available log lines. 
""", ) @click.option( @@ -1506,7 +1404,7 @@ def complete(ctx: click.Context, _: str, incomplete: str) -> List[str]: "-f", "--follow", is_flag=True, - help="Shorthand for --all --lines=10 to act similar to tail -f.", + help="Never exit", ) @click.option( "--no-follow", @@ -1532,18 +1430,23 @@ def complete(ctx: click.Context, _: str, incomplete: str) -> List[str]: ) @click_log_options() @common_options() -def cli(**kwargs): +@click.pass_context +def cli(ctx, **kwargs): global args - args = argparse.Namespace(**kwargs) + args = argparse.Namespace(**{**kwargs, **PARAMS}) args.verbose -= args.quiet # if args.verbose > 1: http_client.HTTPConnection.debuglevel = 1 - if args.follow: - args.lines = 10 - args.all = True + if args.all: + args.lines = -1 if args.namespace: os.environ["NOMAD_NAMESPACE"] = nomad_find_namespace(args.namespace) + assert ( + (not args.follow and not args.no_follow) + or (args.follow and not args.no_follow) + or (not args.follow and args.no_follow) + ), f"--follow and --no-follow conflict" # global START_NS START_NS = time.time_ns() @@ -1555,19 +1458,68 @@ def cli(**kwargs): "jobid", shell_complete=complete_job(), ) -cli_jobfile_help = """ -JOBFILE can be file with a HCL or JSON nomad job or -it can be a string containing a HCL or JSON nomad job. -""" -cli_jobfile = click.argument( - "jobfile", - shell_complete=click.File().shell_complete, -) + + +def cli_jobfile(name: str, help: str): + # Disabled, becuase maintainance. 
+ nomad_job_run_flags = [ + click.option("-check-index", type=int), + click.option("-detach", is_flag=True), + click.option("-eval-priority", type=int), + click.option("-json", is_flag=True), + click.option("-hcl1", is_flag=True), + click.option("-hcl2-strict", is_flag=True), + click.option("-policy-override", is_flag=True), + click.option("-preserve-counts", is_flag=True), + click.option("-consul-token"), + click.option("-vault-token"), + click.option("-vault-namespace"), + click.option("-var", multiple=True), + click.option("-var-file", type=click.File()), + ] + cli_jobfile_help = """ + JOBFILE can be file with a HCL or JSON nomad job or + it can be a string containing a HCL or JSON nomad job. + """ + return composed( + cli.command( + name, + help + cli_jobfile_help, + context_settings=dict(ignore_unknown_options=True), + ), + *nomad_job_run_flags, + click.argument( + "jobfile", + shell_complete=click.File().shell_complete, + ), + ) + + +def cli_command_run_nomad_job_run(name: str, help: str): + return composed( + cli.command( + name, + help=help.rstrip() + """\ + All following command arguments are passed to nomad job run command. + Note that nomad job run has arguments with a single dash. + """, + context_settings=dict(ignore_unknown_options=True), + ), + click.argument( + "cmd", + nargs=-1, + shell_complete=click.File().shell_complete, + ), + ) + ############################################################################### -@cli.command("alloc", help="Watch over specific allocation.") +@cli.command( + "alloc", + help="Watch over specific allocation. Like job mode, but only one allocation is filtered.", +) @click.argument( "allocid", shell_complete=completor(lambda: (x["ID"] for x in mynomad.get("allocations"))), @@ -1583,17 +1535,26 @@ def mode_alloc(allocid): @cli.command( - "run", - help=f""" -Run a Nomad job and then watch over it until the job is dead and has no pending or running job allocations. 
-Stop the job on interrupt or when finished, unless --no-stop option is given. -{cli_jobfile_help} -""", + "eval", help="Watch like job mode the job that results from a specific evaluation." +) +@click.argument( + "evalid", + shell_complete=completor(lambda: (x["ID"] for x in mynomad.get("evaluations"))), ) -@cli_jobfile @common_options() -def mode_run(jobfile): - evaluation = nomad_start_job(jobfile) +def mode_eval(evalid): + evaluation = mynomad.get(f"evaluations", params={"prefix": evalid}) + assert len(evaluation) > 0, f"Evaluation with id {evalid} not found" + assert len(evaluation) < 2, f"Multiple evaluations found starting with id {evalid}" + NomadJobWatcherUntilFinished(None, evaluation).run_and_exit() + + +@cli_command_run_nomad_job_run( + "run", + help=f"Run a Nomad job and then act like started mode.", +) +def mode_run(cmd: Tuple[str]): + evaluation = nomad_start_job(cmd) do = NomadJobWatcherUntilFinished(None, evaluation) try: do.wait() @@ -1606,46 +1567,52 @@ def mode_run(jobfile): ) do.stop_job(purge) do.wait() - do.stop() + do.stop_threads() exit(do.get_exitcode()) @cli.command( "job", - help="Watch a Nomad job. Show the job allocation events and logs.", + help="Alias to stopped command.", ) @cli_jobid @common_options() -def mode_job(jobid): +def mode_job(jobid: str): jobinit = nomad_find_job(jobid) NomadJobWatcherUntilFinished(jobinit).run_and_exit() -@cli.command( - "start", help=f"Start a Nomad Job. Then act like mode started. {cli_jobfile_help}" +@cli_command_run_nomad_job_run( + "start", help=f"Start a Nomad Job and then act like started command." ) -@cli_jobfile -@common_options() -def mode_start(jobfile): - evaluation = nomad_start_job(jobfile) +def mode_start(cmd: Tuple[str]): + evaluation = nomad_start_job(cmd) NomadJobWatcherUntilStarted(None, evaluation).run_and_exit() @cli.command( "started", - help=""" -Watch a Nomad job until the jobs main tasks are running or have been run. 
-Main tasks are all tasks without lifetime or sidecar prestart tasks or poststart tasks. + help=f""" +Watch a Nomad job until the job is started. +Job is started when it has no active deployments +and no active evaluations +and the number of allocations is equal to the number of groups multiplied by group count +and all main tasks in each allocation are running. +An active deployment is a deployment that has status equal to +initializing, running, pending, blocked or paused. +Main tasks are all tasks without lifetime property or sidecar prestart tasks or poststart tasks. \b Exit with the following status: - 0 all tasks of the job have started running, - 2 the job was stopped before any of the tasks could start. + {ExitCode.success } when all tasks of the job have started running, + {ExitCode.exception } when throwed python exception, + {ExitCode.interrupted} when process was interrupted, + {ExitCode.failed } when job was stopped or job deployment was reverted. """, ) @cli_jobid @common_options() -def mode_started(jobid): +def mode_started(jobid: str): jobinit = nomad_find_job(jobid) NomadJobWatcherUntilStarted(jobinit).run_and_exit() @@ -1662,7 +1629,7 @@ def mode_stop_in(jobid: str): @cli.command( "stop", - help="Stop a Nomad job. Then watch the job until the job is dead and has no pending or running allocations.", + help="Stop a Nomad job and then act like stopped command.", ) @cli_jobid @common_options() @@ -1672,7 +1639,7 @@ def mode_stop(jobid: str): @cli.command( "purge", - help="Same as stop with --purge.", + help="Alias to --purge stop.", ) @cli_jobid @common_options() @@ -1683,11 +1650,30 @@ def mode_purge(jobid: str): @cli.command( "stopped", - help="Watch a Nomad job until the job is stopped - has not running allocation.", + help=f""" +Watch a Nomad job until the job is stopped. 
+Job is stopped when the job is dead or, if the job was purged, does not exist anymore, +and the job has no running or pending allocations, +no active deployments and no active evaluations. + +\b +If the option --no-preserve-status is given, then exit with the following status: + {ExitCode.success } when the job was stopped +Otherwise, exit with the following status: + {'?' } when the job has one task, with that task exit status, + {ExitCode.success } when all tasks of the job exited with 0 exit status, + {ExitCode.any_failed_tasks } when any of the job tasks have failed, + {ExitCode.all_failed_tasks } when all job tasks have failed, + {ExitCode.any_unfinished_tasks} when any tasks are still running, + {ExitCode.no_allocations } when job has no started tasks. +In any case, exit with the following exit status: + {ExitCode.exception } when a python exception was thrown, + {ExitCode.interrupted } when the process was interrupted. +""", ) @cli_jobid @common_options() -def mode_stopped(jobid): +def mode_stopped(jobid: str): jobinit = nomad_find_job(jobid) NomadJobWatcherUntilFinished(jobinit).run_and_exit() diff --git a/src/nomad_tools/nomaddbjob.py b/src/nomad_tools/nomaddbjob.py new file mode 100644 index 0000000..5f367d6 --- /dev/null +++ b/src/nomad_tools/nomaddbjob.py @@ -0,0 +1,253 @@ +import json +import logging +import queue +import threading +from typing import Callable, Dict, Iterable, List, Optional, TypeVar, Union + +import requests + +from . import nomadlib +from .common import mynomad +from .nomadlib import Event, EventTopic, EventType + +log = logging.getLogger(__name__) +T = TypeVar("T") + + +class NomadDbJob: + """Represents relevant state cache from Nomad database of a single job""" + + def __init__( + self, + topics: List[str], + select_event_cb: Callable[[Event], bool], + init_cb: Callable[[], List[Event]], + force_polling: Optional[bool] = None, + ): + """ + :param topics: The topics to listen to, see Nomad event stream API documentation. 
+ :param select_event_cb: Filter only relevant events from Nomad event stream. + :param init_cb: Return a list of events to populate the database with and to poll. Has to be threadsafe! + """ + self.topics: List[str] = topics + self.init_cb: Callable[[], List[Event]] = init_cb + self.select_event_cb: Callable[[Event], bool] = select_event_cb + self.force_polling = force_polling + self.queue: queue.Queue[Optional[List[Event]]] = queue.Queue() + """Queue where database thread puts the received events""" + self.job: Optional[nomadlib.Job] = None + """Watched job""" + self.jobversions: Dict[int, nomadlib.Job] = {} + """Job version to job definition""" + self.evaluations: Dict[str, nomadlib.Eval] = {} + """Database of evaluations""" + self.allocations: Dict[str, nomadlib.Alloc] = {} + """Database of allocations""" + self.deployments: Dict[str, nomadlib.Deploy] = {} + """Database of deployments""" + self.initialized = threading.Event() + """Was init_cb called to initialize the database""" + self.stopevent = threading.Event() + """If set, the database thread should exit""" + self.thread = threading.Thread( + target=self.__thread_entry, name="db", daemon=True + ) + assert self.topics + assert not any(not x for x in topics) + + ############################################################################### + + def __thread_run_stream(self): + log.debug(f"Starting listen Nomad stream with {' '.join(self.topics)}") + with mynomad.stream( + "event/stream", + params={"topic": self.topics}, + ) as stream: + for line in stream.iter_lines(): + if line: + data = json.loads(line) + events: List[Event] = [ + Event( + EventTopic[event["Topic"]], + EventType[event["Type"]], + event["Payload"][event["Topic"]], + stream=True, + ) + for event in data.get("Events", []) + ] + # log.debug(f"RECV EVENTS: {events}") + self.queue.put(events) + if self.stopevent.is_set(): + break + + def __thread_poll(self): + """If listening to stream fails, we fallback to calling init_cb in a loop""" + # Delay 
polling start up until init_cb has been called. + self.initialized.wait() + while not self.stopevent.wait(1): + self.queue.put(self.init_cb()) + + def __thread_entry(self): + """Database thread entry""" + try: + try: + if self.force_polling is True: + self.__thread_poll() + else: + try: + self.__thread_run_stream() + except nomadlib.PermissionDenied as e: + if self.force_polling is False: + raise + else: + log.warning( + f"Falling to polling method because stream API returned permission denied: {e}" + ) + self.__thread_poll() + finally: + log.debug("Nomad database thread exiting") + self.queue.put(None) + except requests.HTTPError as e: + log.exception("http request failed") + exit(1) + + ############################################################################### + + def start(self): + """Start the database thread""" + assert ( + mynomad.namespace + ), "Nomad namespace has to be set before starting to listen" + self.thread.start() + + def select_is_in_db(self, e: Event): + """Select events that are already in the database""" + if e.topic == EventTopic.Evaluation: + return e.data["ID"] in self.evaluations + elif e.topic == EventTopic.Allocation: + return e.data["ID"] in self.allocations + elif e.topic == EventTopic.Deployment: + return e.data["ID"] in self.deployments + elif e.topic == EventTopic.Job: + return e.data["Version"] in self.jobversions + return False + + def _add_event_to_db(self, e: Event): + """Update database state to reflect received event""" + if e.topic == EventTopic.Job: + job = nomadlib.Job(e.data) + self.jobversions[job.get("Version")] = job + self.job = None if e.type == EventType.JobDeregistered else job + elif e.topic == EventTopic.Evaluation: + if e.type == EventType.JobDeregistered: + self.job = None + self.evaluations[e.data["ID"]] = nomadlib.Eval(e.data) + elif e.topic == EventTopic.Allocation: + # Events from event stream are missing JobVersion. Try to preserve it here by preserving keys. 
+ self.allocations[e.data["ID"]] = nomadlib.Alloc( + {**self.allocations.get(e.data["ID"], {}), **e.data} + ) + elif e.topic == EventTopic.Deployment: + self.deployments[e.data["ID"]] = nomadlib.Deploy(e.data) + + def handle_event(self, e: Event) -> bool: + if self._select_new_event(e): + if self.select_is_in_db(e) or self.select_event_cb(e): + # log.debug(f"EVENT: {e}") + # log.debug(f"EVENT: {e} {e.data}") + self._add_event_to_db(e) + return True + else: + # log.debug(f"USER FILTERED: {e}") + pass + else: + # log.debug(f"OLD EVENT: {e}") + pass + return False + + def handle_events(self, events: List[Event]) -> List[Event]: + """From a list of events, filter out ignored and add the rest to database""" + return [e for e in events if self.handle_event(e)] + + @staticmethod + def apply_selects( + e: Event, + job_select: Callable[[nomadlib.Job], bool], + eval_select: Callable[[nomadlib.Eval], bool], + alloc_select: Callable[[nomadlib.Alloc], bool], + deploy_select: Callable[[nomadlib.Deploy], bool], + ) -> bool: + """Apply specific selectors depending on event type""" + return e.apply(job_select, eval_select, alloc_select, deploy_select) + + def _select_new_event(self, e: Event): + """Select events which are newer than those in the database""" + job_select: Callable[[nomadlib.Job], bool] = lambda _: True + eval_select: Callable[[nomadlib.Eval], bool] = ( + lambda eval: eval.ID not in self.evaluations + or eval.ModifyIndex > self.evaluations[eval.ID].ModifyIndex + ) + alloc_select: Callable[[nomadlib.Alloc], bool] = ( + lambda alloc: alloc.ID not in self.allocations + or alloc.ModifyIndex > self.allocations[alloc.ID].ModifyIndex + ) + deploy_select: Callable[[nomadlib.Deploy], bool] = ( + lambda deploy: deploy.ID not in self.deployments + or deploy.ModifyIndex > self.deployments[deploy.ID].ModifyIndex + ) + return e.data["Namespace"] == mynomad.namespace and self.apply_selects( + e, job_select, eval_select, alloc_select, deploy_select + ) + + def stop(self): + 
log.debug("Stopping listen Nomad stream") + self.initialized.set() + self.stopevent.set() + + def join(self): + # Not joining - neither requests nor stream API allow for timeouts. + # self.thread.join() + pass + + def events(self) -> Iterable[List[Event]]: + """Nomad stream returns Events array. Iterate over batches of events returned from Nomad stream""" + assert self.thread.is_alive(), "Thread not alive" + if not self.initialized.is_set(): + events = self.init_cb() + events = self.handle_events(events) + yield events + self.initialized.set() + log.debug("Starting getting events from thread") + while not self.queue.empty() or ( + self.thread.is_alive() and not self.stopevent.is_set() + ): + events = self.queue.get() + if events is None: + break + yield self.handle_events(events) + log.debug("db exiting") + + def get_allocation_job_version( + self, alloc: nomadlib.Alloc, default: T = None + ) -> Union[T, int]: + """Given an allocation return the job version associated with that allocation""" + if "JobVersion" in alloc: + return alloc.JobVersion + evaluation = self.evaluations.get(alloc.EvalID) + if not evaluation: + return default + jobversion = self.find_jobversion_from_modifyindex(evaluation.JobModifyIndex) + if jobversion is not None: + alloc.JobVersion = jobversion + return jobversion + return default + + def find_job_from_modifyindex(self, jobmodifyindex: int) -> Optional[nomadlib.Job]: + for _, job in sorted(self.jobversions.items(), reverse=True): + if job.JobModifyIndex <= jobmodifyindex: + return job + return None + + def find_jobversion_from_modifyindex(self, jobmodifyindex: int) -> Optional[int]: + ret = self.find_job_from_modifyindex(jobmodifyindex) + return ret.Version if ret else None diff --git a/src/nomad_tools/nomadlib/connection.py b/src/nomad_tools/nomadlib/connection.py index dbbdd22..f58d227 100644 --- a/src/nomad_tools/nomadlib/connection.py +++ b/src/nomad_tools/nomadlib/connection.py @@ -157,6 +157,7 @@ def request( raise JobNotFound(str(e)) 
from e elif resp == (404, "variable not found"): raise VariableNotFound(str(e)) from e + log.exception(resp) raise return ret diff --git a/src/nomad_tools/nomadlib/datadict.py b/src/nomad_tools/nomadlib/datadict.py index 27019cd..1165ef7 100644 --- a/src/nomad_tools/nomadlib/datadict.py +++ b/src/nomad_tools/nomadlib/datadict.py @@ -27,17 +27,23 @@ def msg() -> str: try: if dstorigin == list: assert type(srcval) == dstorigin, msg() - return [dsttype.__args__[0](x) for x in srcval] + return [ + _init_value(classname, dstname, dsttype.__args__[0], x) for x in srcval + ] elif dstorigin == dict: assert type(srcval) == dstorigin, msg() return { - dsttype.__args__[0](k): dsttype.__args__[1](v) + _init_value(classname, dstname, dsttype.__args__[0], k): _init_value( + classname, dstname, dsttype.__args__[1], v + ) for k, v in srcval.items() } elif dstorigin == Union: if type(srcval) in dsttype.__args__: return srcval return _init_value(classname, dstname, dsttype.__args__[0], srcval) + elif issubclass(dsttype, Any): + return srcval elif issubclass(dsttype, DataDict): return dsttype(srcval) elif issubclass(dsttype, enum.Enum): diff --git a/src/nomad_tools/nomadlib/types.py b/src/nomad_tools/nomadlib/types.py index 1e18851..133da7b 100644 --- a/src/nomad_tools/nomadlib/types.py +++ b/src/nomad_tools/nomadlib/types.py @@ -4,7 +4,8 @@ import datetime import enum import logging -from typing import Callable, Dict, List, Optional, TypeVar +from abc import ABC, abstractmethod +from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar from .datadict import DataDict @@ -56,13 +57,16 @@ class JobTask(DataDict): Driver: str User: str Config: JobTaskConfig - Lifecycle: Optional[JobTaskLifecycle] + Lifecycle: Optional[JobTaskLifecycle] = None Env: Optional[Dict[str, str]] + Services: Optional[List[Any]] class JobTaskGroup(DataDict): Name: str + Count: int Tasks: List[JobTask] + Services: Optional[List[Any]] class JobStatus(MyStrEnum): @@ -86,12 +90,24 @@ class 
Job(DataDict): TaskGroups: List[JobTaskGroup] Stop: bool Meta: Optional[Dict[str, str]] + SubmitTime: int def is_dead(self): return self.Status == JobStatus.dead def description(self): - return f"{self.ID}@v{self.Version}@{self.Namespace}" + return f"{self.ID}#{self.Version}@{self.Namespace}" + + def allservices(self): + """Return all services from groups and tasks of a job""" + out = [] + for tg in self.TaskGroups: + if tg.Services: + out += tg.Services + for task in tg.Tasks: + if task.Services: + out += task.Services + return out class JobsJob(DataDict): @@ -340,13 +356,30 @@ class DeploymentStatusDescription(MyStrEnum): PendingForPeer = "Deployment is pending, waiting for peer region" +class DeploymentTaskGroup(DataDict): + AutoPromote: bool + AutoRevert: bool + DesiredCanaries: int + DesiredTotal: int + HealthyAllocs: int + PlacedAllocs: int + PlacedCanaries: Optional[List[str]] = None + ProgressDeadline: int + Promoted: bool + RequireProgressBy: str + UnhealthyAllocs: int + + class Deploy(DataDict): ID: str ModifyIndex: int JobCreateIndex: int + JobModifyIndex: int + JobVersion: int JobID: str Status: str StatusDescription: str + TaskGroups: Dict[str, DeploymentTaskGroup] class EventTopic(enum.Enum): @@ -468,6 +501,36 @@ def apply( return callbacks[self.topic](self.data) +class EventApplier(ABC, Generic[R]): + @abstractmethod + def apply_job(self, job: Job) -> R: + raise NotImplementedError() + + @abstractmethod + def apply_eval(self, eval: Eval) -> R: + raise NotImplementedError() + + @abstractmethod + def apply_alloc(self, alloc: Alloc) -> R: + raise NotImplementedError() + + @abstractmethod + def apply_deploy(self, deploy: Deploy) -> R: + raise NotImplementedError() + + def apply(self, e: Event) -> R: + if e.is_job(): + return self.apply_job(Job(e.data)) + elif e.is_alloc(): + return self.apply_alloc(Alloc(e.data)) + elif e.is_eval(): + return self.apply_eval(Eval(e.data)) + elif e.is_deployment(): + return self.apply_deploy(Deploy(e.data)) + else: + raise 
KeyError(f"{e.topic} not handled") + + class VariableNoItems(DataDict): Namespace: str Path: str diff --git a/src/nomad_tools/nomadt.py b/src/nomad_tools/nomadt.py new file mode 100644 index 0000000..5906c08 --- /dev/null +++ b/src/nomad_tools/nomadt.py @@ -0,0 +1,117 @@ +#!/usr/bin/python3 + +import os +import shutil +from typing import Dict, List + +import click + +from .common import common_options, namespace_option + + +def get_nomad_commands() -> Dict[str, str]: + nomad_help = """\ + run Run a new job or update an existing job + stop Stop a running job + status Display the status output for a resource + alloc Interact with allocations + job Interact with jobs + node Interact with nodes + agent Runs a Nomad agent + acl Interact with ACL policies and tokens + agent-info Display status information about the local agent + config Interact with configurations + deployment Interact with deployments + eval Interact with evaluations + exec Execute commands in task + fmt Rewrites Nomad config and job files to canonical format + license Interact with Nomad Enterprise License + login Login to Nomad using an auth method + monitor Stream logs from a Nomad agent + namespace Interact with namespaces + operator Provides cluster-level tools for Nomad operators + plugin Inspect plugins + quota Interact with quotas + recommendation Interact with the Nomad recommendation endpoint + scaling Interact with the Nomad scaling endpoint + sentinel Interact with Sentinel policies + server Interact with servers + service Interact with registered services + system Interact with the system API + tls Generate Self Signed TLS Certificates for Nomad + ui Open the Nomad Web UI + var Interact with variables + version Prints the Nomad version + volume Interact with volumes + """ + ret = {} + for x in nomad_help.splitlines(): + if x: + tmp = x.strip().split(" ", 1) + if tmp and len(tmp) == 2: + ret[tmp[0]] = tmp[1].strip() + return ret + + +def get_additional_commands() -> Dict[str, str]: + """ + 
Search all executables in PATH environment variable and returns a List[str] of executables starting with prefix. + """ + path_env = os.environ.get("PATH", os.defpath) + executables = [] + prefix = "nomad-" + for path in path_env.split(os.pathsep): + try: + for file in os.listdir(path): + if file.startswith(prefix): + executables.append(file.lstrip(prefix)) + except FileNotFoundError: + pass + return {exec: f"**Run {prefix}{exec}" for exec in executables} + + +commands: Dict[str, str] = { + **get_nomad_commands(), + **get_additional_commands(), +} + +############################################################################### + + +@click.group( + help=f""" +If 'nomad-command[0]' exists, run it. +Otherwise, run 'nomad' with the arguments. +""", +) +@namespace_option() +@common_options() +def cli(**kwargs): + pass + + +for cmd, help in commands.items(): + + @cli.command( + cmd, + help=help, + context_settings=dict( + ignore_unknown_options=True, + allow_extra_args=True, + ), + ) + @click.argument("command", nargs=-1, type=click.UNPROCESSED) + @click.pass_context + def func(ctx: click.Context, command: List[str]): + name = ctx.command.name + assert name + command = [name, *command] + cmd = f"nomad-{command[0]}" + if shutil.which(cmd): + os.execvp(cmd, command) + else: + os.execvp("nomad", command) + + +if __name__ == "__main__": + cli() diff --git a/tests/integration/test_nomad_cp.py b/tests/integration/test_nomad_cp.py index 9b844e3..ed87542 100644 --- a/tests/integration/test_nomad_cp.py +++ b/tests/integration/test_nomad_cp.py @@ -28,7 +28,7 @@ def run_temp_job(): job = jobjson["Job"] jobname = job["ID"] try: - run_nomad_watch("--json start -", input=json.dumps(jobjson)) + run_nomad_watch("start -json -", input=json.dumps(jobjson)) with NomadTempdir(jobname) as nomaddir: with tempfile.TemporaryDirectory() as hostdir: yield jobname, nomaddir, hostdir diff --git a/tests/integration/test_nomad_watch.py b/tests/integration/test_nomad_watch.py index 456723a..4e1bf50 
100644 --- a/tests/integration/test_nomad_watch.py +++ b/tests/integration/test_nomad_watch.py @@ -16,7 +16,7 @@ def test_nomad_watch_run_0(): job = gen_job(script=f"echo hello world") - run_nomad_watch("--purge --json -", input=json.dumps(job)) + run_nomad_watch("--purge run -json -", input=json.dumps(job)) def test_nomad_watch_run(): @@ -26,7 +26,7 @@ def test_nomad_watch_run(): jobid = job["Job"]["ID"] run(f"nomad stop --purge {jobid}", check=False) output = run_nomad_watch( - "--json run -", + "run -json -", input=json.dumps(job), check=exitstatus, stdout=1, @@ -41,7 +41,7 @@ def test_nomad_watch_start(): job = gen_job(script=f"sleep 1; echo {mark} ; exit {exitstatus}") jobid = job["Job"]["ID"] try: - run_nomad_watch("--json start -", input=json.dumps(job)) + run_nomad_watch("start -json -", input=json.dumps(job)) assert mark in run_nomad_watch(f"started {jobid}", stdout=1).stdout cmds = [ f"stop {jobid}", @@ -142,7 +142,7 @@ def test_nomad_watch_purge_successful_0(): job = gen_job("exit 0") jobid = job["Job"]["ID"] run_nomad_watch( - "--purge-successful --json run -", input=json.dumps(job), check=[0] + "--purge-successful run -json -", input=json.dumps(job), check=[0] ) assert not job_exists(jobid) @@ -152,7 +152,7 @@ def test_nomad_watch_purge_successful_123(): jobid = job["Job"]["ID"] try: run_nomad_watch( - "--purge-successful --json run -", input=json.dumps(job), check=[123] + "--purge-successful run -json -", input=json.dumps(job), check=[123] ) assert job_exists(jobid) finally: @@ -212,7 +212,7 @@ def test_nomad_watch_starting_with_preinit_tasks(): } } try: - run_nomad_watch("--json start -", input=json.dumps(job)) + run_nomad_watch("start -json -", input=json.dumps(job)) assert job_exists(jobid) allocs = [ nomadlib.Alloc(x) diff --git a/tests/integration/test_nomad_watch2.py b/tests/integration/test_nomad_watch2.py new file mode 100644 index 0000000..6f448d3 --- /dev/null +++ b/tests/integration/test_nomad_watch2.py @@ -0,0 +1,16 @@ +import datetime 
+import json +import time +from typing import Dict, List + +from nomad_tools import nomadlib +from tests.testlib import ( + caller, + gen_job, + job_exists, + nomad_has_docker, + run, + run_nomad_watch, +) + + diff --git a/tests/echo10sleep05.nomad.hcl b/tests/jobs/test-echo10sleep05.nomad.hcl similarity index 86% rename from tests/echo10sleep05.nomad.hcl rename to tests/jobs/test-echo10sleep05.nomad.hcl index ec15141..ba3d00a 100644 --- a/tests/echo10sleep05.nomad.hcl +++ b/tests/jobs/test-echo10sleep05.nomad.hcl @@ -1,5 +1,9 @@ job "test-echo10sleep05" { type = "batch" + namespace = "test" + meta { + uuid = uuidv4() + } group "example" { task "example" { driver = "docker" diff --git a/tests/jobs/test-listen.nomad.hcl b/tests/jobs/test-listen.nomad.hcl new file mode 100644 index 0000000..3183fc3 --- /dev/null +++ b/tests/jobs/test-listen.nomad.hcl @@ -0,0 +1,58 @@ +variable "ok" { + type = bool + default = false +} +job "test-listen" { + type = "service" + meta { + uuid = uuidv4() + } + group "listen" { + network { + port "http" { + } + } + reschedule { + attempts = 1 + interval = "15s" + delay = "5s" + unlimited = false + } + update { + canary = 1 + auto_revert = true + auto_promote = true + } + task "listen" { + driver = "docker" + restart { + attempts = 1 + interval = "15s" + delay = "1s" + } + config { + image = "busybox" + command = "sh" + args = [ + "-xc", + var.ok ? 
+ "hostname > /tmp/index.html && exec httpd -fp ${NOMAD_PORT_http} -h /tmp" : + "exec false", + ] + ports = ["http"] + init = true + } + service { + port = "http" + #provider = "nomad" + check { + type = "http" + path = "/" + interval = "1s" + timeout = "1s" + failures_before_critical = 1 + } + } + } + } +} diff --git a/tests/jobs/test-maintask.nomad.hcl b/tests/jobs/test-maintask.nomad.hcl new file mode 100644 index 0000000..1e1796d --- /dev/null +++ b/tests/jobs/test-maintask.nomad.hcl @@ -0,0 +1,60 @@ +variable "count" { + default = 2 +} +job "test-maintask" { + type = "batch" + meta { + uuid = uuidv4() + } + group "maintask" { + count = var.count + task "main" { + driver = "raw_exec" + config { + command = "sh" + args = ["-xc", "exec sleep 60"] + } + } + task "prestart" { + driver = "raw_exec" + config { + command = "sh" + args = ["-xc", "echo prestart"] + } + lifecycle { + hook = "prestart" + } + } + task "prestart_sidecar" { + driver = "raw_exec" + config { + command = "sh" + args = ["-xc", "exec sleep 60" ] + } + lifecycle { + hook = "prestart" + sidecar = true + } + } + task "poststart" { + driver = "raw_exec" + config { + command = "sh" + args = ["-xc", "exec sleep 60"] + } + lifecycle { + hook = "poststart" + } + } + task "poststop" { + driver = "raw_exec" + config { + command = "sh" + args = ["-xc", "exec sleep 1"] + } + lifecycle { + hook = "poststop" + } + } + } +} diff --git a/tests/testlib.py b/tests/testlib.py index 90d9e8c..b2c39c5 100644 --- a/tests/testlib.py +++ b/tests/testlib.py @@ -3,17 +3,19 @@ import inspect import json import os +import re import shlex import subprocess import sys +import tempfile import time +from pathlib import Path from typing import IO, List, Optional, Union -os.environ.setdefault("NOMAD_NAMESPACE", "default") -import tempfile - from nomad_tools import nomadlib +os.environ.setdefault("NOMAD_NAMESPACE", "default") + def caller(up=0): return inspect.stack()[1 + up][3] @@ -102,6 +104,10 @@ def gen_job( return {"Job": job} 
+def get_test_job_file(name: str): + return Path(f"./tests/jobs/{name}.nomad.hcl") + + def quotearr(cmd: List[str]): return " ".join(shlex.quote(x) for x in cmd) @@ -148,3 +154,42 @@ def run_nomad_watch(cmd: str, **kwargs): def run_nomad_vardir(cmd: str, **kwargs): return run(f"python3 -m nomad_tools.nomad_vardir -v {cmd}", **kwargs) + + +def runcheck( + cmd: str, + exits: List[int] = [0], + stdout: List[Union[str, re.Pattern]] = [], + **kwargs, +): + cmda = shlex.split(cmd) + if ( + cmda[0].startswith("nomad-") + and cmda[0].split("-", 1)[1] in "cp watch vardir".split() + ): + cmda = [ + "python3 -m nomad_tools.{cmda[0].replace('-', '_')} -v".split(), + *cmda[1:], + ] + print(" ", file=sys.stderr, flush=True) + print(f"+ {quotearr(cmda)}", file=sys.stderr, flush=True) + rr = subprocess.run( + cmda, + text=True, + stdout=subprocess.PIPE if stdout else sys.stderr, + **kwargs, + ) + assert ( + rr.returncode in exits + ), f"Command {rr} died with {rr.returncode} not in {exits}" + if stdout: + for line in rr.stdout.splitlines(): + print("STDOUT:", rr.stdout) + for pat in stdout: + if isinstance(pat, str): + assert pat in rr.stdout, f"String {pat} not found" + elif isinstance(pat, re.Pattern): + assert pat.findall(rr.stdout), f"Pattern {pat} not found" + else: + assert 0 + return rr diff --git a/tests/unit/test_datadict.py b/tests/unit/test_datadict.py index 0562a84..128d7a3 100644 --- a/tests/unit/test_datadict.py +++ b/tests/unit/test_datadict.py @@ -1,4 +1,4 @@ -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional from nomad_tools.nomadlib.datadict import DataDict @@ -104,3 +104,20 @@ def test_datadict_todict(): } f = ToDict(init) assert f.asdict() == init + + +def test_datadict_any(): + class A(DataDict): + a: List[Any] + b: Optional[List[List[Any]]] + + init = { + "a": ["a", 1, None], + "b": None, + } + assert A(init).asdict() == init + init2 = { + "a": ["a", 1, None], + "b": [["b", 2], ["c", 3]], + } + assert A(init2).asdict() == 
init2