From 35860983f4719e122a814be83a4ba5110fa18110 Mon Sep 17 00:00:00 2001
From: Kamil Cukrowski
Date: Mon, 15 Jul 2024 00:36:51 +0200
Subject: [PATCH] githubrunner: update

---
 src/nomad_tools/entry_githubrunner.py | 1332 ++++++++++++++++++-------
 src/nomad_tools/entry_watch.py        |   15 +-
 src/nomad_tools/nomadlib/types.py     |   42 +-
 3 files changed, 997 insertions(+), 392 deletions(-)

diff --git a/src/nomad_tools/entry_githubrunner.py b/src/nomad_tools/entry_githubrunner.py
index 6ce687a..343833e 100644
--- a/src/nomad_tools/entry_githubrunner.py
+++ b/src/nomad_tools/entry_githubrunner.py
@@ -3,31 +3,118 @@
 import collections
 import concurrent.futures
-import copy
+import datetime
 import functools
-import glob
+import itertools
 import json
 import logging
 import os
-import pkgutil
+import random
+import re
+import shlex
 import subprocess
+import threading
 import time
 import urllib.parse
 from dataclasses import asdict, dataclass, field
 from pathlib import Path
-from typing import Any, Dict, List, Optional, Set, Tuple
+from typing import (
+    Any,
+    Callable,
+    Generator,
+    Iterable,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    TypeVar,
+)
 
 import click
 import clickdc
+import jinja2
 import requests
 import yaml
 
+from nomad_tools import nomadlib
+
 from .common import mynomad
 from .nomadlib.datadict import DataDict
 
 log = logging.getLogger(__name__)
 
 ###############################################################################
+# config
+
+DEFAULT_RUNNER: str = """
+{% set image = (arg.image|default("myoung34/github-runner")) + ":" + (arg.tag|default("latest")) %}
+job "{{ run.JOB_NAME }}" {
+  type = "batch"
+  meta {
+    INFO = <= 0
+        assert self.repos
+        # check if throws
+        self.get_runner_inactivity_timeout()
+
+    def get_runner_inactivity_timeout(self) -> Optional[datetime.timedelta]:
+        return (
+            parse_time(self.runner_inactivity_timeout)
+            if self.runner_inactivity_timeout
+            else None
+        )
+
+
+CONFIG: Config
+
+DEFAULTCONFIG = """
+---
+nomad:
+  namespace: default
+repos:
+  - Kamilcuk/runnertest
+  # - Kamilcuk
+loop: 1
+"""
+
+###############################################################################
+# counters
+
+
+@dataclass
+class IncAtomicInt:
+    v: int = 0
+    lock: threading.Lock = field(default_factory=threading.Lock)
+
+    def set(self, v: int):
+        with self.lock:
+            self.v = v
+
+    def inc(self):
+        with self.lock:
+            pre = self.v
+            self.v += 1
+            return pre
+
+    def get(self):
+        with self.lock:
+            return self.v
+
+    def __str__(self):
+        return str(self.get())
+
+    def __add__(self, o: IncAtomicInt):
+        with self.lock:
+            return self.v + o.get()
+
+
+class Counters:
+    def __init__(self):
+        self.github_cached = IncAtomicInt()
+        self.github_miss = IncAtomicInt()
+        self.nomad_get = IncAtomicInt()
+        self.nomad_run = IncAtomicInt()
+        self.nomad_stop = IncAtomicInt()
+        self.nomad_purge = IncAtomicInt()
+        # not reset, global counter
+        self.cnt = IncAtomicInt(random.randint(0, 1000))
+
+    def print(self):
+        stats: str = (
+            "stats: Github("
+            + " ".join(
+                [
+                    f"requests={self.github_cached + self.github_miss}",
+                    f"cached={self.github_cached}",
+                    f"miss={self.github_miss})",
+                ]
+            )
+            + " Nomad("
+            + " ".join(
+                [
+                    f"get={self.nomad_get}",
+                    f"run={self.nomad_run}",
+                    f"stop={self.nomad_stop}",
+                    f"purge={self.nomad_purge}",
+                ]
+            )
+            + ")"
+        )
+        log.info(stats)
+        for i in dir(self):
+            if (
+                not i.startswith("_")
+                and isinstance(getattr(self, i), IncAtomicInt)
+                and i != "cnt"
+            ):
+                getattr(self, i).set(0)
+
+
+COUNTERS = Counters()
+"""Small object to accumulate what is happening in a single loop"""
+
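+# As a concrete illustration (values invented, not from a real run), a single
+# loop ends with COUNTERS.print() emitting one line like:
+#
+#   stats: Github(requests=12 cached=10 miss=2) Nomad(get=3 run=1 stop=0 purge=0)
+#
+# where requests = cached + miss, and every counter except "cnt" is then
+# reset to zero for the next loop.
+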
+###############################################################################
+# helpers
+
+T = TypeVar("T")
+R = TypeVar("R")
+
+
+def parallelmap(
+    func: Callable[[T], R], arr: Iterable[T], nproc: Optional[int] = None
+) -> Iterable[R]:
+    """Execute lambda over array in parallel and return an array of it"""
+    if ARGS.noparallel:
+        return map(func, arr)
+    else:
+        with concurrent.futures.ThreadPoolExecutor(nproc) as p:
+            futures = [p.submit(func, x) for x in arr]
+            return (x.result() for x in concurrent.futures.as_completed(futures))
+            # return list(p.map(func, arr))
+
+
+def flatten(xss: Iterable[Iterable[T]]) -> Iterable[T]:
+    return (x for xs in xss for x in xs)
+
+
+PARSE_TIME_REGEX = re.compile(
+    r"((?P<hours>\d+?)h)?((?P<minutes>\d+?)m)?((?P<seconds>\d+?)s)?"
+)
+
+
+def parse_time(time_str: str) -> Optional[datetime.timedelta]:
+    # https://stackoverflow.com/a/4628148/9072753
+    parts = PARSE_TIME_REGEX.match(time_str)
+    if not parts:
+        return
+    parts = parts.groupdict()
+    time_params = {}
+    for name, param in parts.items():
+        if param:
+            time_params[name] = int(param)
+    return datetime.timedelta(**time_params)
 
 
 @functools.lru_cache()
@@ -135,22 +393,129 @@ def nomad_job_to_json(job: str) -> dict:
     return data
 
 
-def gh_get(url: str, token: Optional[str], key: str = "") -> Any:
+###############################################################################
+# github
+
+
+@functools.lru_cache()
+def github_cachefile():
+    return Path(os.path.expanduser(CONFIG.github.cachefile))
+
+
+@dataclass
+class GithubCache:
+    """Stores and manages github cache
+    See:
+    https://docs.github.com/en/rest/using-the-rest-api/best-practices-for-using-the-rest-api?apiVersion=2022-11-28#use-conditional-requests-if-appropriate
+    """
+
+    type Url = str
+
+    @dataclass
+    class Value:
+        """The cache may be etag or last-modified, see github docs"""
+
+        is_etag: bool
+        etag_or_last_modified: str
+        response: dict
+        timestamp: float = field(default_factory=lambda: time.time())
+
+    data: dict[Url, Value] = field(default_factory=dict)
+    """The data stored by the cache is keyed with URL"""
+    version: int = 1
+
+    @classmethod
+    def load(cls):
+        """Load the GithubCache from file"""
+        try:
+            with open(github_cachefile()) as f:
+                data = json.load(f)
+        except (FileNotFoundError, json.JSONDecodeError):
+            return GithubCache()
+        if data["version"] != GithubCache.version:
+            return GithubCache()
+        gh = GithubCache({k: cls.Value(**v) for k, v in data["data"].items()})
+        log.info(
+            f"Github cache loaded {len(gh.data)} entries from {github_cachefile()}"
+        )
+        return gh
+
+    def save(self):
+        """Save the cache to file.
+        Run each loop"""
+        data = {
+            "version": self.version,
+            "data": {k: asdict(v) for k, v in self.data.items()},
+        }
+        os.makedirs(os.path.dirname(github_cachefile()), exist_ok=True)
+        with open(github_cachefile(), "w") as f:
+            json.dump(data, f)
+        log.debug(
+            f"Github cache saved {len(self.data)} entries to {github_cachefile()}"
+        )
+
+    def prepare(self, url: str, headers: dict[str, str]):
+        cached = self.data.get(url)
+        if cached:
+            if cached.is_etag:
+                headers["if-none-match"] = cached.etag_or_last_modified
+            else:
+                headers["if-modified-since"] = cached.etag_or_last_modified
+
+    def handle(self, response: requests.Response) -> Optional[dict]:
+        if response.status_code == 304:
+            COUNTERS.github_cached.inc()
+            return self.data[response.url].response
+        else:
+            COUNTERS.github_miss.inc()
+            etag = response.headers.get("etag")
+            if etag:
+                self.data[response.url] = self.Value(True, etag, response.json())
+            else:
+                last_modified = response.headers.get("last-modified")
+                if last_modified:
+                    self.data[response.url] = self.Value(
+                        False, last_modified, response.json()
+                    )
+        return None
+
+
+GITHUB_CACHE: GithubCache
+"""Stores and manages the github cache"""
+
+
+def gh_get(url: str, key: str = "") -> Any:
     """Execute query to github
     @param key if set, means the output is paginated
     """
     headers = {
         "Accept": "application/vnd.github+json",
-        **({"Authorization": "Bearer " + token} if token else {}),
+        **(
+            {"Authorization": "Bearer " + CONFIG.github.token}
+            if CONFIG.github.token
+            else {}
+        ),
     }
-    ret: list = []
+    ret: list[dict] = []
     while True:
+        # Prepare the request adding headers from Github cache.
+        GITHUB_CACHE.prepare(url, headers)
         response = requests.get(url, headers=headers)
-        data = response.json()
+        log.debug(f"{url} {headers} {response}")
+        response.raise_for_status()
+        # If the result is found in github cache, use it, otherwise extract it.
+        data = GITHUB_CACHE.handle(response)
+        if data is None:
+            data = response.json()
+        # If no key, this is not a paged URL.
         if not key:
            return data
-        ret.extend(data[key])
+        # If key, this is a paged URL - append the page and continue.
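+        # For reference, requests parses the RFC 5988 "Link" response header
+        # into response.links, e.g. (URL shortened for illustration):
+        #   {"next": {"url": "https://api.github.com/...&page=2", "rel": "next"}}
+        # so the pagination below just follows rel="next" until it disappears.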
+        try:
+            ret.extend(data[key])
+        except KeyError:
+            log.exception(f"key={key} data={data}")
+            raise
         next = response.links.get("next")
         if not next:
             break
@@ -160,50 +525,177 @@
 
 
 @functools.lru_cache()
 @functools.wraps(gh_get)
-def gh_get_cached(url: str, token: Optional[str], key: str = ""):
-    return gh_get(url, token, key)
+def gh_get_cached(url: str, key: str = ""):
+    return gh_get(url, key)
 
 
-@dataclass
-class Args:
-    dryrun: bool = clickdc.option("-n")
-    verbose: bool = clickdc.option("-v")
-    config: Path = clickdc.option(
-        "-c", type=click.Path(exists=True, dir_okay=False, path_type=Path)
+###############################################################################
+# github high level
+
+type GithubRepo = str
+
+
+@dataclass(frozen=True)
+class GithubJob:
+    repo: GithubRepo
+    run: dict
+    job: dict
+
+    def labelsstr(self):
+        return ",".join(sorted(list(set(self.job["labels"]))))
+
+    def job_url(self):
+        return self.job["html_url"]
+
+    def repo_url(self):
+        return self.run["repository"]["html_url"]
+
+
+def get_gh_repos_one_to_many(repo: str) -> List[GithubRepo]:
+    """If the github repo is an organization or user, expand it to
+    all repositories of that organization or user"""
+    assert repo.count("/") <= 1, f"{repo} has too many /"
+    if repo.count("/") == 0:
+        d = gh_get_cached(f"{CONFIG.github.url}/users/{repo}")
+        d = gh_get(d["repos_url"])
+        return [x["full_name"] for x in d]
+    else:
+        return [repo]
+
+
+def get_gh_repos() -> Set[GithubRepo]:
+    """Get the list of configured repositories, with organization
+    and user repositories expanded"""
+    return set(flatten(parallelmap(get_gh_repos_one_to_many, set(CONFIG.repos))))
+
+
+def get_gh_runs(repo: GithubRepo, run: dict) -> List[GithubJob]:
+    interesting = "queued in_progress".split()
+    ret: List[GithubJob] = []
+    if run["status"] in interesting:
+        jobs = gh_get(
+            f"{CONFIG.github.url}/repos/{repo}/actions/runs/{run['id']}/jobs",
+            "jobs",
+        )
+        for job in jobs:
+            if job["status"] in interesting:
+                ret.append(GithubJob(repo, run, job))
+    return ret
+
+
+def get_gh_jobs(repo: GithubRepo) -> Iterable[GithubJob]:
+    runs = gh_get(
+        f"{CONFIG.github.url}/repos/{repo}/actions/runs",
+        "workflow_runs",
    )
+    return flatten(parallelmap(lambda x: get_gh_runs(repo, x), runs))
+
+
+def get_gh_state(repos: Set[GithubRepo]) -> list[GithubJob]:
+    reqstate: list[GithubJob] = list(flatten(parallelmap(get_gh_jobs, repos)))
+    # desc: str = ", ".join(s.labelsstr() + " for " + s.job_url() for s in reqstate)
+    # log.info(f"Found {len(reqstate)} required runners to run: {desc}")
+    for idx, s in enumerate(reqstate):
+        log.info(f"GHJOB: {idx} {s.job_url()} {s.labelsstr()} {s.run['status']}")
+    return reqstate
 
 
 ###############################################################################
+# nomad
 
 
-@dataclass
-class GithubRepo:
-    github: GithubConfig
-    repo: str
-
-
-def get_repos() -> List[GithubRepo]:
-    repos: List[GithubRepo] = []
-    for github in CONFIG.github:
-        for repo in github.repos:
-            if repo.count("/") == 0:
-                d = gh_get_cached(f"{github.url}/users/{repo}", github.token)
-                d = gh_get(d["repos_url"], github.token)
-                repos.extend([GithubRepo(github, x["full_name"]) for x in d])
-            else:
-                repos.append(GithubRepo(github, repo))
-    return repos
+class NomadJobCommon(dict):
+    def labelsstr(self) -> str:
+        return self["Meta"][CONFIG.nomad.meta + "_LABELS"]
+
+    def repo(self) -> str:
+        return self["Meta"][CONFIG.nomad.meta + "_REPO"]
+
+    def repo_url(self) -> str:
self["Meta"][CONFIG.nomad.meta + "_REPO_URL"] + + def job_url(self) -> str: + return self["Meta"][CONFIG.nomad.meta + "_JOB_URL"] -class NomadJob(dict): - def job_url(self) -> Optional[str]: - return (self["Meta"] or {}).get(CONFIG.nomad.meta + "_job_url") +class NomadJob(nomadlib.Job, NomadJobCommon): + """/v1/job/ID return value of the runner""" - def labels(self) -> Optional[str]: - return (self["Meta"] or {}).get(CONFIG.nomad.meta + "_labels") + pass -def get_curstate() -> list[NomadJob]: +@dataclass +class GithubRunnerState: + inactive_since: Optional[datetime.datetime] = None + + +class NomadRunner(nomadlib.JobsJob, NomadJobCommon): + """/v1/jobs return value of the runner""" + + def tostr(self): + return f"{self.ID} {self.repo_url()} {self.Status}" + + def get_github_runner_state(self) -> Optional[GithubRunnerState]: + """Check if this runner is _right now_ + executing a github job. This is best efforted + by parsing stdout logs of the runner""" + if not self.is_running(): + return None + COUNTERS.nomad_get.inc() + allocs = [ + nomadlib.Alloc(x) + for x in mynomad.get( + f"job/{self.ID}/allocations", + params=dict( + namespace=CONFIG.nomad.namespace, + filter='ClientStats == "running"', + ), + ) + ] + alloc = next((alloc for alloc in allocs if alloc.is_running_started()), None) + if not alloc: + return None + COUNTERS.nomad_get.inc() + logs = mynomad.request( + "GET", + f"/client/fs/logs/{alloc.ID}", + params=dict( + namespace=CONFIG.nomad.namespace, + task=alloc.get_tasknames()[0], + type="stdout", + origin="end", + plain=True, + ), + ).text + """ + 2024-07-14 12:14:17Z: Listening for Jobs + 2024-07-14 12:14:20Z: Running job: build + 2024-07-14 12:14:29Z: Job build completed with result: Succeeded + See: https://github.com/actions/runner/blob/main/src/Runner.Listener/JobDispatcher.cs + curl -sS 'https://raw.githubusercontent.com/actions/runner/main/src/Runner.Listener/JobDispatcher.cs' | grep term.WriteLine + """ + for line in reversed(logs.strip().splitlines()): + parts = line.split(": ", 1) + if len(parts) == 2: + try: + time = datetime.datetime.fromisoformat(parts[0]).astimezone() + except ValueError as e: + log.debug(f"{line} -> {e}") + continue + msg = parts[1] + if ( + "Listening for jobs".lower() in msg.lower() + or "completed with result".lower() in msg.lower() + ): + return GithubRunnerState(time) + if "Running job".lower() in msg.lower(): + # is not inactive - return now + return GithubRunnerState() + return None + + +def get_nomad_state() -> list[NomadRunner]: + COUNTERS.nomad_get.inc() jobsjobs = mynomad.get( "jobs", params=dict( @@ -212,336 +704,398 @@ def get_curstate() -> list[NomadJob]: namespace=CONFIG.nomad.namespace, ), ) - curstate: list[NomadJob] = [] + # + curstate: list[NomadRunner] = [] for jobsjob in jobsjobs: - nj = NomadJob(jobsjob) - if nj.job_url(): - curstate.append(nj) - desc = ", ".join(f"({x['Status']} {x.job_url()})" for x in curstate) - log.info(f"Found {len(curstate)} runners: {desc}") - for s in curstate: - logging.debug(f"CUR: {s}") + nj = NomadRunner(jobsjob) + try: + # Get only valid jobsjobs + nj.repo_url() + nj.labelsstr() + except (KeyError, AttributeError): + continue + curstate.append(nj) + # + # log.info(f"Found {len(curstate)} runners:") + for i, s in enumerate(curstate): + log.info(f"RUNNER: {i} {s.tostr()}") return curstate -JobUrl = str +############################################################################### +# runners @dataclass -class GithubJob(GithubRepo): - run: dict - job: dict +class Runners: + dict: dict[re.Pattern, 
+    dict: dict[re.Pattern, jinja2.Template] = field(default_factory=dict)
+    env: jinja2.Environment = field(
+        default_factory=lambda: jinja2.Environment(loader=jinja2.BaseLoader())
+    )
 
-    def labels(self):
-        return ",".join(sorted(list(set(self.job["labels"]))))
+    @staticmethod
+    def load(specs: dict[str, str]) -> Runners:
+        ret = Runners()
+        for k, v in specs.items():
+            ret.dict[re.compile(k)] = ret.env.from_string(v)
+        return ret
 
-    def job_url(self):
-        return self.job["html_url"]
+    def find(self, labelsstr: str) -> Optional[jinja2.Template]:
+        template = next((v for k, v in self.dict.items() if k.match(labelsstr)), None)
+        if not template:
+            return None
+        return template
 
 
-def get_reqstate(repos: List[GithubRepo]):
-    reqstate: dict[JobUrl, GithubJob] = {}
-    for repo in repos:
-        runs = gh_get(
-            f"{repo.github.url}/repos/{repo.repo}/actions/runs",
-            repo.github.token,
-            "workflow_runs",
-        )
-        for run in runs:
-            if run["status"] in ["queued"]:
-                jobs = gh_get(
-                    f"{repo.github.url}/repos/{repo.repo}/actions/runs/{run['id']}/jobs",
-                    repo.github.token,
-                    "jobs",
-                )
-                for job in jobs:
-                    if job["status"] in "queued running".split():
-                        gj = GithubJob(repo.github, repo.repo, run, job)
-                        reqstate[gj.job_url()] = gj
-    desc: str = ", ".join(s.labels() + " for " + s.job_url() for s in reqstate.values())
-    log.info(f"Found {len(reqstate)} required runners to run : {desc}")
-    for s in reqstate.values():
-        logging.debug(f"REQ: {s}")
-    return reqstate
+RUNNERS: Runners
+
+
+@dataclass
+class TemplateContext:
+    JOB_NAME: str
+    ACCESS_TOKEN: str
+    REPO: str
+    REPO_URL: str
+    JOB_URL: str
+    LABELS: str
+
+    @staticmethod
+    def make_example(labelsstr: str):
+        return TemplateContext(
+            JOB_NAME="Example_job_name",
+            ACCESS_TOKEN="example_access_token",
+            REPO="user/repo",
+            REPO_URL="http://example.repo.url/user/repo",
+            JOB_URL="http://example.job.url/user/repo/run/12312/job/1231",
+            LABELS=labelsstr,
+        )
+
+    @staticmethod
+    def make_from_github_job(gj: GithubJob):
+        return TemplateContext(
+            JOB_NAME=urllib.parse.quote(
+                "-".join(
+                    str(x)
+                    for x in [
+                        CONFIG.nomad.jobprefix,
+                        COUNTERS.cnt.inc(),
+                        gj.repo.replace("/", "-").strip("-"),
+                        gj.labelsstr(),
+                    ]
+                ),
+                safe="",
+            )[:64],
+            ACCESS_TOKEN=CONFIG.github.token or "",
+            REPO=gj.repo,
+            REPO_URL=gj.repo_url(),
+            JOB_URL=gj.job_url(),
+            LABELS=gj.labelsstr(),
+        )
+
+    def to_template_args(self) -> dict:
+        arg = {}
+        for label in self.LABELS.split(","):
+            for part in shlex.split(label):
+                split = part.split("=", 1)
+                arg[split[0]] = split[1] if len(split) == 2 else ""
+        return dict(
+            run=asdict(self),
+            arg=arg,
+            add=CONFIG.add,
+            CONFIG=CONFIG,
+            RUNNERS=RUNNERS,
+            ARGS=ARGS,
+        )
+
+
+def get_runner_jobspec(gj: GithubJob) -> Optional[NomadJob]:
+    template = RUNNERS.find(gj.labelsstr())
+    if not template:
+        return None
+    tc = TemplateContext.make_from_github_job(gj)
+    jobtext = template.render(tc.to_template_args())
+    if not jobtext:
+        return None
+    #
+    try:
+        jobspec = json.loads(jobtext)
+    except json.JSONDecodeError:
+        try:
+            jobspec = json.loads(
+                subprocess.check_output(
+                    "nomad job run -output -".split(), input=jobtext, text=True
+                )
+            )
+        except subprocess.CalledProcessError:
+            log.exception(f"Could not decode template for {gj.labelsstr()}: {template}")
+            raise
+    jobspec = jobspec.get("Job", jobspec)
+    # Apply default transformations.
+    for key in ["ID", "Name"]:
+        if key in jobspec:
+            # Github runner name can be max 64 characters and no special chars.
+            # Still try to be as verbose as possible.
+            # The global counter embedded in JOB_NAME should be unique enough.
+            jobspec[key] = tc.JOB_NAME
+    jobspec["Namespace"] = CONFIG.nomad.namespace
+    jobspec["Meta"] = {
+        **(jobspec.get("Meta", {}) or {}),
+        **{
+            CONFIG.nomad.meta + "_" + k: str(v)
+            for k, v in asdict(tc).items()
+            if k.lower() != "access_token"
+        },
+    }
+    jobspec = NomadJob(jobspec)
+    return jobspec
+
+
+###############################################################################
+# scheduler
 
 
 @dataclass
 class Todo:
-    tostart: List[dict] = field(default_factory=list)
+    """Represents actions to take"""
+
+    tostart: List[NomadJob] = field(default_factory=list)
     tostop: List[str] = field(default_factory=list)
     topurge: List[str] = field(default_factory=list)
 
+    def __add__(self, o: Todo):
+        return Todo(
+            self.tostart + o.tostart,
+            self.tostop + o.tostop,
+            self.topurge + o.topurge,
+        )
 
-def loop():
-    repos = get_repos()
-    reqstate: dict[JobUrl, GithubJob] = get_reqstate(repos)
-    curstatearr: list[NomadJob] = get_curstate()
-    #
-    tostart: List[dict] = []
-    tostop: List[str] = []
-    topurge: List[str] = []
-    #
-    curstate: Dict[JobUrl, NomadJob] = {}
-    for nj in curstatearr:
-        joburl = nj.job_url()
-        assert joburl
-        if joburl not in curstate:
-            curstate[joburl] = nj
-        else:
-            log.error(
-                f"Found two nomad jobs {nj['ID']} and {curstate[joburl]['ID']} for one github job {joburl} - stopping"
-            )
-            tostop.append(nj["ID"])
-    #
-    for joburl, req in reqstate.items():
-        nomadjob = curstate.get(joburl)
-        if not nomadjob:
-            # The job is missing - run the job.
-            jobspec = find_runner_jobspec(set(req.job["labels"]))
-            if not jobspec:
-                logging.error(
-                    f"Did not found runner matching {req.labels()} for {req.job_url()}"
-                )
-                continue
-            # Change the ID and Name of the job with the prefix we need.
-            for key in ["ID", "Name"]:
-                if key in jobspec:
-                    # Github runner name can be max 64 characters and no special chars.
-                    # Still try to be as verbose as possible.
-                    # The hash generated from job_url should be unique enough.
-                    jobspec[key] = urllib.parse.quote(
-                        "-".join(
-                            [
-                                CONFIG.nomad.jobprefix,
-                                str(abs(hash(req.job_url()))),
-                                req.repo.replace("/", "-"),
-                                req.run["display_title"],
-                            ]
-                        ),
-                        safe="",
-                    )[:64]
-            jobspec["Namespace"] = CONFIG.nomad.namespace
-            jobspec["Meta"] = {
-                **(jobspec["Meta"] or {}),
-                **{
-                    CONFIG.nomad.meta + "_" + k: str(v)
-                    for k, v in dict(
-                        job_url=req.job["html_url"],
-                        job_id=req.job["id"],
-                        run_id=req.job["run_id"],
-                        labels=req.labels(),
-                        repo_url=req.run["repository"]["html_url"],
-                        token=req.github.access_token or req.github.token,
-                    ).items()
-                },
-            }
-            log.info(
-                f"Running job {jobspec['ID']} with {req.labels()} for {req.job_url()}"
-            )
-            log.debug(f"Running {jobspec}")
-            tostart.append(jobspec)
-    for joburl, nomadjob in curstate.items():
-        if joburl not in reqstate:
-            if nomadjob["Status"].lower() != "dead" and not nomadjob["Stop"]:
-                log.warning(
-                    f"Job {nomadjob['ID']} for {joburl} is not required, stopping"
-                )
-                tostop.append(nomadjob["ID"])
-            elif CONFIG.nomad.purge and nomadjob["Status"].lower() == "dead":
-                # Check that there are no running evaluations.
-                evals = mynomad.get(f"job/{nomadjob['ID']}/evaluations")
-                if len(evals) == 0 or all(
-                    eval["Status"] == "complete" for eval in evals
-                ):
-                    name = nomadjob["ID"]
-                    log.info(f"Purging {name} for {joburl} with no running deployments")
-                    topurge.append(name)
-    #
-    for nj in tostop:
-        assert nj not in [x["ID"] for x in tostart], f"{nj} is tostop and tostart"
-    for nj in tostart:
-        assert nj not in tostop
-        assert nj not in topurge
-    for nj in tostop:
-        assert nj not in topurge
-    for nj in topurge:
-        assert nj not in [x["ID"] for x in tostart]
-    #
-    if tostart:
-        log.info(f"tostart: {' '.join(x['ID'] for x in tostart)}")
-    if tostop:
-        log.info(f"tostop: {' '.join(tostop)}")
-    if topurge:
-        log.info(f"topurge: {' '.join(topurge)}")
-    if ARGS.dryrun:
-        log.error("DRYRUN")
-    else:
-        for spec in tostart:
-            jsonspec = json.dumps(spec)
-            try:
-                subprocess.run(
-                    "nomad job run -detach -json -".split(),
-                    input=jsonspec,
-                    check=True,
-                    text=True,
-                )
-            except subprocess.CalledProcessError:
-                log.exception(f"Could not start: {jsonspec}")
-                raise
-        for name in tostop:
-            subprocess.check_call("nomad job stop -detach".split() + [name])
-        if CONFIG.nomad.purge:
-            for name in topurge:
-                subprocess.check_call("nomad job stop -detach -purge".split() + [name])
+    def execute(self):
+        # Sanity checks.
+        tostartids: List[str] = [x["ID"] for x in self.tostart]
+        ID: str
+        for ID in tostartids:
+            assert ID not in self.tostop, f"{ID} is tostart and tostop"
+            assert ID not in self.topurge, f"{ID} is tostart and topurge"
+        for ID in self.tostop:
+            assert ID not in self.topurge, f"{ID} is tostop and topurge"
+        for ID, cnt in collections.Counter(tostartids).items():
+            assert cnt == 1, f"Repeated id in tostart: {ID}"
+        for ID, cnt in collections.Counter(self.tostop).items():
+            assert cnt == 1, f"Repeated id in tostop: {ID}"
+        for ID, cnt in collections.Counter(self.topurge).items():
+            assert cnt == 1, f"Repeated id in topurge: {ID}"
+        # Print them.
+        if self.tostart:
+            log.info(f"tostart: {' '.join(tostartids)}")
+        if self.tostop:
+            log.info(f"tostop: {' '.join(self.tostop)}")
+        if self.topurge:
+            log.info(f"topurge: {' '.join(self.topurge)}")
+        # Execute them.
+        if ARGS.dryrun:
+            log.error("DRYRUN")
+        else:
+            for spec in self.tostart:
+                COUNTERS.nomad_run.inc()
+                try:
+                    resp = mynomad.start_job(spec.asdict())
+                except Exception:
+                    log.exception(f"Could not start: {json.dumps(spec)}")
+                    raise
+                log.info(resp)
+            for name in self.tostop:
+                COUNTERS.nomad_stop.inc()
+                resp = mynomad.stop_job(name)
+                log.info(resp)
+            if CONFIG.nomad.purge:
+                for name in self.topurge:
+                    COUNTERS.nomad_purge.inc()
+                    resp = mynomad.stop_job(name, purge=True)
+                    log.info(resp)
+
+
+@dataclass
+class RepoState:
+    """Represents the state of one repository.
+    Currently this program supports only github-runners bound to one repo"""
+
+    repo_url: str
+    githubjobs: List[GithubJob] = field(default_factory=list)
+    nomadrunners: List[NomadRunner] = field(default_factory=list)
+
+    def validate(self):
+        # Check that all data are for one repo.
+        for gj in self.githubjobs:
+            assert gj.repo_url() == self.repo_url
+        for nr in self.nomadrunners:
+            assert nr.repo_url() == self.repo_url
+
+    def __find_free_githubjob(self, labelsstr: str, todo: Todo) -> GithubJob:
+        used_job_urls = set(
+            itertools.chain(
+                (x.job_url() for x in todo.tostart if x.labelsstr() == labelsstr),
+                (
+                    nr.job_url()
+                    for nr in self.nomadrunners
+                    if nr.labelsstr() == labelsstr
+                ),
+            )
+        )
+        gjobs = [x for x in self.githubjobs if x.labelsstr() == labelsstr]
+        picked = next((x for x in gjobs if x.job_url() not in used_job_urls), None)
+        if picked:
+            return picked
+        return random.choice(tuple(gjobs))
+
+    def gen_runners_to_stop(self) -> Generator[NomadRunner, None, None]:
+        """Generate runners to stop."""
+        # To stop a runner it can't be already stopped.
+        notdeadnotstop = [
+            nr for nr in self.nomadrunners if not nr.is_dead() and not nr.Stop
+        ]
+        # First stop pending ones.
+        for nr in notdeadnotstop:
+            if nr.is_pending():
+                yield nr
+        # Then stop ones inactive for longer than the configured timeout.
+        # Getting runner state is costly.
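+        # Note: each get_github_runner_state() call below is two Nomad API
+        # round trips (list the job's allocations, then fetch the runner's
+        # stdout log), so it is only attempted when a timeout is configured.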
+        timeout = CONFIG.get_runner_inactivity_timeout()
+        if timeout:
+            for nr in notdeadnotstop:
+                if nr.is_running():
+                    rstate = nr.get_github_runner_state()
+                    if rstate and rstate.inactive_since:
+                        inactivefor = (
+                            datetime.datetime.now().astimezone() - rstate.inactive_since
+                        )
+                        if inactivefor > timeout:
+                            log.warning(
+                                f"INACTIVE: Runner is inactive for {inactivefor} > {CONFIG.runner_inactivity_timeout}: {nr.tostr()}"
+                            )
+                            yield nr
+                        else:
+                            log.info(
+                                f"INACTIVE: Runner continues inactive for {inactivefor} < {CONFIG.runner_inactivity_timeout}: {nr.tostr()}"
+                            )
+
+    def scheduler(self) -> Todo:
+        """Decide which jobs to run or stop"""
+        self.validate()
+        todo = Todo()
+        neededlabelsstrs: dict[str, int] = collections.Counter(
+            gj.labelsstr() for gj in self.githubjobs
+        )
+        runninglabelsstrs: dict[str, int] = collections.Counter(
+            nr.labelsstr() for nr in self.nomadrunners
+        )
+        alllabelsstrs: set[str] = set(
+            itertools.chain(neededlabelsstrs.keys(), runninglabelsstrs.keys())
+        )
+        for labelsstr in alllabelsstrs:
+            needed = neededlabelsstrs.get(labelsstr, 0)
+            running = runninglabelsstrs.get(labelsstr, 0)
+            diff = needed - running
+            if diff > 0:
+                for _ in range(diff):
+                    jobspec = get_runner_jobspec(
+                        self.__find_free_githubjob(labelsstr, todo)
+                    )
+                    if jobspec is not None:
+                        log.info(
+                            f"Running job {jobspec.ID} with {jobspec.labelsstr()} for {jobspec.job_url()}"
+                        )
+                        log.debug(f"Running {jobspec}")
+                        todo.tostart.append(jobspec)
+                    else:
+                        log.error(
+                            f"Runner for {labelsstr} in {self.repo_url} not found"
+                        )
+            elif diff < 0:
+                # Only remove those that do not run anything.
+                # Generator, so that only as many as needed are evaluated.
+                # Diff is negative.
+                todo.tostop.extend(
+                    x.ID for x in itertools.islice(self.gen_runners_to_stop(), -diff)
+                )
+        #
+        if CONFIG.nomad.purge:
+            for nr in self.nomadrunners:
+                if nr.is_dead() and nr.JobSummary.get_sum_summary().only_completed():
+                    # Check that there are no running evaluations to be sure.
+                    evals = mynomad.get(f"job/{nr.ID}/evaluations")
+                    if len(evals) == 0 or all(
+                        eval["Status"] == "complete" for eval in evals
+                    ):
+                        log.info(
+                            f"Purging {nr.ID} for {nr.repo_url()} with no running deployments"
+                        )
+                        todo.topurge.append(nr.ID)
+        #
+        return todo
 
 
 ###############################################################################
-DEFAULTCONFIG = """
----
-nomad:
-  namespace: default
-github:
-  - repos:
-      - Kamilcuk/runnertest
-"""
 
 
+def loop():
+    """The main loop of this program"""
+    # Get the github state.
+    repos = get_gh_repos()
+    reqstate: list[GithubJob] = get_gh_state(repos)
+    # Get the nomad state.
+    curstate: list[NomadRunner] = get_nomad_state()
+    # Construct current state with repo as the key.
+    repostates: dict[str, RepoState] = {}
+    for gj in reqstate:
+        repostates.setdefault(
+            gj.repo_url(), RepoState(gj.repo_url())
+        ).githubjobs.append(gj)
+    for nj in curstate:
+        repostates.setdefault(
+            nj.repo_url(), RepoState(nj.repo_url())
+        ).nomadrunners.append(nj)
+    # For each repo, run the scheduler and collect results.
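+    # Each RepoState.scheduler() returns a Todo; Todo.__add__ concatenates the
+    # tostart/tostop/topurge lists, so the reduce() below merges the per-repo
+    # results (computed in parallel unless ARGS.noparallel is set) into one
+    # global Todo.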
+    todo: Todo = functools.reduce(
+        Todo.__add__, parallelmap(RepoState.scheduler, repostates.values()), Todo()
+    )
+    # Apply limits.
+    for ll in CONFIG.limits:
+        if ll.max >= 0:
+            count = 0
+            tmp = []
+            for tostart in todo.tostart:
+                if re.match(ll.repo, tostart.repo()) and re.match(
+                    ll.labels, tostart.labelsstr()
+                ):
+                    count += 1
+                    if count <= ll.max:
+                        tmp.append(tostart)
+                else:
+                    tmp.append(tostart)
+            todo.tostart = tmp
+    # Execute what we need to execute.
+    todo.execute()
+    # Ending.
+    GITHUB_CACHE.save()
+    COUNTERS.print()
 
 
-def get_default_runners() -> List[str]:
-    arr = """
-    latest
-    ubuntu-focal
-    ubuntu-noble
-    ubuntu-jammy
-    debian-buster
-    debian-bookworm
-    debian-sid
-    """
-    ret = []
-    for tag in arr.split():
-        for mode in ["", "docker", "hostdocker"]:
-            name = f"nomadtools.{mode}{'.' if mode else ''}{tag}"
-            txt = f"""
-job "{name}" {{
-  type = "batch"
-  meta {{
-    {CONFIG.nomad.meta}_INFO = <
 API. DO NOT mix with JobsJob"""
 
-    ID: str
     Version: int
-    Status: str
     Namespace: Optional[str]
     ModifyIndex: int
     JobModifyIndex: int
@@ -110,9 +122,6 @@ class Job(DataDict):
     Meta: Optional[Dict[str, str]]
     SubmitTime: int
 
-    def is_dead(self):
-        return self.Status == JobStatus.dead
-
     def description(self):
         return f"{self.ID}#{self.Version}@{self.Namespace}"
@@ -128,11 +137,13 @@ def allservices(self):
         return out
 
 
-class JobsJob(DataDict):
+class JobsJob(_BothJobAndJobsJob):
     """Returned by jobs API. DO NOT mix with Job"""
 
     Namespace: Optional[str]
-    ID: str
+    Stop: bool
+    Meta: Optional[Dict[str, str]] = None
+    JobSummary: JobSummary
 
 
 class NodeScoreMeta(DataDict):
@@ -392,6 +403,11 @@ def is_pending(self):
     def is_running(self):
         return self.ClientStatus == AllocClientStatus.running
 
+    def is_running_started(self):
+        return self.is_running() and any(
+            taskstate.was_started() for taskstate in self.get_taskstates().values()
+        )
+
     def is_finished(self):
         return not self.is_pending_or_running()
 
@@ -613,6 +629,16 @@ def __iadd__(self, o: JobSummarySummary) -> JobSummarySummary:
             self[k] = self.get(k, 0) + o.get(k, 0)
         return self
 
+    def only_completed(self):
+        return (
+            self.Queued == 0
+            and self.Complete != 0
+            and self.Failed == 0
+            and self.Running == 0
+            and self.Starting == 0
+            and self.Lost == 0
+        )
+
 
 class JobSummary(DataDict):
     JobID: str