From 7522845644f6475e1d2d573624dd52caab599758 Mon Sep 17 00:00:00 2001 From: "Kamil Cukrowski from root@leonidas" Date: Fri, 18 Oct 2024 12:16:45 +0200 Subject: [PATCH] githubrunner: update --- Makefile | 4 + deploy/.gitignore | 1 + deploy/githubrunner.yml | 17 +- .../entry_githubrunner/__init__.py | 1 + .../entry_githubrunner/default.nomad.hcl | 69 +- .../entry_githubrunner.py | 866 +++++++----------- .../entry_githubrunner/startscript.sh | 93 +- .../trydispatcheddoesntwork.nomad.hcl | 89 ++ src/nomad_tools/nomadlib/connection.py | 9 +- src/nomad_tools/nomadlib/types.py | 12 +- 10 files changed, 573 insertions(+), 588 deletions(-) create mode 100644 src/nomad_tools/entry_githubrunner/__init__.py rename src/nomad_tools/{ => entry_githubrunner}/entry_githubrunner.py (55%) create mode 100644 src/nomad_tools/entry_githubrunner/trydispatcheddoesntwork.nomad.hcl diff --git a/Makefile b/Makefile index 239ee0a..ff65fe3 100644 --- a/Makefile +++ b/Makefile @@ -55,3 +55,7 @@ run_githubrunner: -var NOMAD_NAMESPACE=$$NOMAD_NAMESPACE \ -var VERBOSE=$(VERBOSE) \ ./deploy/nomadtools-githubrunner.nomad.hcl + +weles_run_githubrunner: + ,rsync --delete $(CURDIR)/ kamil@weles:./myprojects/nomad-tools/ + ssh kamil@weles make -C ./myprojects/nomad-tools/ run_githubrunner diff --git a/deploy/.gitignore b/deploy/.gitignore index 8ce636e..052c47d 100644 --- a/deploy/.gitignore +++ b/deploy/.gitignore @@ -1,2 +1,3 @@ githubcache.json ./githubcache.json +cachedir diff --git a/deploy/githubrunner.yml b/deploy/githubrunner.yml index d25a8cb..23a7159 100644 --- a/deploy/githubrunner.yml +++ b/deploy/githubrunner.yml @@ -1,9 +1,14 @@ --- -opts: - docker: host repos: - - Kamilcuk - # - Kamilcuk/nomad-tools - # - Kamilcuk/runnertest + # - Kamilcuk + - Kamilcuk/nomad-tools + - Kamilcuk/runnertest +nomad: + namespace: github runner_inactivity_timeout: 1w -loop: 60 +loop: 10 +# loop: 60 +template_context: + docker: host + cachedir: /home/kamil/myprojects/nomad-tools/deploy/cachedir + diff --git a/src/nomad_tools/entry_githubrunner/__init__.py b/src/nomad_tools/entry_githubrunner/__init__.py new file mode 100644 index 0000000..cfbfc15 --- /dev/null +++ b/src/nomad_tools/entry_githubrunner/__init__.py @@ -0,0 +1 @@ +from .entry_githubrunner import * diff --git a/src/nomad_tools/entry_githubrunner/default.nomad.hcl b/src/nomad_tools/entry_githubrunner/default.nomad.hcl index 458e8d6..fa74e3e 100644 --- a/src/nomad_tools/entry_githubrunner/default.nomad.hcl +++ b/src/nomad_tools/entry_githubrunner/default.nomad.hcl @@ -1,5 +1,3 @@ -#{# param is all arguments, options and run configuration in one dictionary #} -#{% set param = (param | default({})) or {**arg, **opts, **run} %} job "{{ param.JOB_NAME }}" { type = "batch" meta { @@ -10,7 +8,14 @@ It also starts a docker daemon and is running as privileged {% elif param.docker == "host" %} It also mounts a docker daemon from the host it is running on {% endif %} + EOF + +{% if param.debug %} + PARAM = <= 0 @@ -220,7 +137,7 @@ def get_runner_inactivity_timeout(self) -> Optional[datetime.timedelta]: ############################################################################### -# counters +# {{{1 counters @dataclass @@ -318,7 +235,7 @@ def print(self): """Small object to accumulate what is happening in a single loop""" ############################################################################### -# helpers +# {{{1 helpers T = TypeVar("T") R = TypeVar("R") @@ -340,6 +257,22 @@ def flatten(xss: Iterable[Iterable[T]]) -> Iterable[T]: return (x for xs in xss for x in xs) +def list_split( + condition: Callable[[T], bool], data: List[T] +) -> Tuple[List[T], List[T]]: + ret: Tuple[List[T], List[T]] = ([], []) + for v in data: + ret[condition(v)].append(v) + return ret + + +def list_group_by(data: List[T], key: Callable[[T], R]) -> Dict[R, List[T]]: + ret: Dict[R, List[T]] = {} + for v in data: + ret.setdefault(key(v), []).append(v) + return ret + + PARSE_TIME_REGEX = re.compile( r"((?P\d+?)h)?((?P\d+?)m)?((?P\d+?)s)?" ) @@ -410,8 +343,71 @@ def nomad_job_to_json(job: str) -> dict: return data +def github_repo_from_url(repo: str): + return "/".join(repo.split("/")[3:4]) + + +def labelstr_to_kv(labelstr: str) -> Dict[str, str]: + arg = {} + for part in shlex.split(labelstr): + split = part.split("=", 1) + arg[split[0]] = split[1] if len(split) == 2 else "" + return arg + + +def nomad_job_text_to_json(jobtext: str): + try: + jobspec = json.loads(jobtext) + except json.JSONDecodeError: + try: + jobspec = json.loads( + subprocess.check_output( + "nomad job run -output -".split(), input=jobtext, text=True + ) + ) + except subprocess.CalledProcessError: + log.exception(f"Could not decode Nomad job to json:\n{jobtext}") + raise + jobspec = jobspec.get("Job", jobspec) + return jobspec + + ############################################################################### -# github +# {{{1 abstract description + + +@dataclass +class Desc: + labels: str + repo_url: str + + def __post_init__(self): + assert self.labels + assert self.repo_url + assert "://" in self.repo_url + assert re.match(CONFIG.label_match, self.labels) + + @property + def repo(self): + return github_repo_from_url(self.repo_url) + + def tostr(self): + return f"labels={self.labels} url={self.repo_url}" + + +class DescProtocol(ABC): + @abstractmethod + def get_desc(self) -> Desc: ... + + def try_get_desc(self) -> Optional[Desc]: + try: + return self.get_desc() + except Exception: + return None + + +############################################################################### +# {{{1 github @functools.lru_cache() @@ -529,7 +525,8 @@ def gh_get(url: str, key: str = "") -> Any: # Prepare the request adding headers from Github cache. GITHUB_CACHE.prepare(url, headers) response = requests.get(url, headers=headers) - log.debug(f"{url} {headers} {response}") + headerslog = {**headers, "Authorization": "***"} + log.debug(f"{url} {headerslog} {response}") try: response.raise_for_status() except Exception: @@ -564,19 +561,19 @@ def gh_get_cached(url: str, key: str = ""): ############################################################################### -# github high level +# {{{1 github high level GithubRepo = str @dataclass(frozen=True) -class GithubJob: +class GithubJob(DescProtocol): repo: GithubRepo run: dict job: dict def labelsstr(self): - return ",".join(sorted(list(set(self.job["labels"])))) + return "\n".join(sorted(list(set(self.job["labels"])))) def job_url(self): return self.job["html_url"] @@ -584,6 +581,9 @@ def job_url(self): def repo_url(self): return self.run["repository"]["html_url"] + def get_desc(self): + return Desc(self.labelsstr(), self.repo_url()) + def get_gh_repos_one_to_many(repo: str) -> List[GithubRepo]: """If github repo is a organization or user, expand to @@ -627,7 +627,9 @@ def get_gh_run_jobs(repo: GithubRepo, run: dict) -> List[GithubJob]: ) for job in jobs: if job["status"] in interesting: - ret.append(GithubJob(repo, run, job)) + gj = GithubJob(repo, run, job) + if gj.try_get_desc(): + ret.append(gj) return ret @@ -649,85 +651,126 @@ def get_gh_state(repos: Set[GithubRepo]) -> list[GithubJob]: # desc: str = ", ".join(s.labelsstr() + " for " + s.job_url() for s in reqstate) # log.info(f"Found {len(reqstate)} required runners to run: {desc}") for idx, s in enumerate(reqstate): - logging.info(f"GHJOB: {idx} {s.job_url()} {s.labelsstr()} {s.run['status']}") + logging.info(f"GHJOB={idx} {s.job_url()} {s.labelsstr()} {s.run['status']}") return reqstate ############################################################################### +# {{{1 nomad + + +def get_desc_from_nomadjob(job): + return Desc(**json.loads((job["Meta"] or {})[CONFIG.nomad.meta])) @dataclass -class StateSpec: - labels: str - repo: str - repo_url: str - job_url: str +class NomadJobToRun: + nj: nomadlib.Job + hcl: str + def get_desc(self): + return get_desc_from_nomadjob(self.nj) -class RunnerProtocol(Protocol): - @property - def ID(self) -> str: ... - def get_spec(self) -> StateSpec: ... - def stop(self): ... - def purge(self): ... - def get_logs(self) -> Optional[str]: ... + def tostr(self): + return f"id={self.nj.ID} {self.get_desc().tostr()}" -############################################################################### -# nomad +@dataclass +class GithubRunnerState: + inactive_since: Optional[datetime.datetime] = None -class NomadJobCommon(DataDict): - def labelsstr(self) -> str: - return self["Meta"][CONFIG.nomad.meta + "_LABELS"] +class RunnerState(MyStrEnum): + pending = enum.auto() + listening = enum.auto() + running = enum.auto() + unknown = enum.auto() + dead_success = enum.auto() + dead_failure = enum.auto() - def repo(self) -> str: - return self["Meta"][CONFIG.nomad.meta + "_REPO"] - def repo_url(self) -> str: - return self["Meta"][CONFIG.nomad.meta + "_REPO_URL"] +@dataclass(frozen=True) +class RunnerStateSince: + state: RunnerState + since: datetime.datetime = datetime.datetime.min - def job_url(self) -> str: - return self["Meta"][CONFIG.nomad.meta + "_JOB_URL"] + def is_dead(self): + return self.state in [RunnerState.dead_failure, RunnerState.dead_success] + def is_pending(self): + return self.state == RunnerState.pending -class NomadJob(nomadlib.Job, NomadJobCommon): - """/v1/job/ID return value of the runner""" + def is_listening(self): + return self.state == RunnerState.listening - pass + def is_unknown(self): + return self.state == RunnerState.unknown + def passed(self): + return datetime.datetime.now() - self.since -@dataclass -class GithubRunnerState: - inactive_since: Optional[datetime.datetime] = None + def __eq__(self, o: Union[RunnerStateSince, RunnerState]) -> bool: + if isinstance(o, RunnerState): + return self.state == o + else: + return self.state == o.state and self.since == o.since + + def tostr(self): + return f"state={self.state} since={self.since.isoformat()}" -class NomadRunner(nomadlib.JobsJob, NomadJobCommon): - """/v1/jobs return value of the runner""" +@dataclass(frozen=True) +class NomadRunner(DescProtocol): + nj: nomadlib.JobsJob + + @property + def ID(self): + return self.nj.ID + + def is_dead(self): + return self.nj.is_dead() + + def get_desc(self): + return get_desc_from_nomadjob(self.nj) + + @cached_property + def state(self) -> RunnerStateSince: + if self.nj.is_pending(): + return RunnerStateSince(RunnerState.pending) + if self.nj.is_dead(): + if self.nj.JobSummary.get_sum_summary().only_completed(): + return RunnerStateSince(RunnerState.dead_success) + return RunnerStateSince(RunnerState.dead_failure) + if self.nj.is_running(): + logs = self._get_logs() + if logs is None: + return RunnerStateSince(RunnerState.pending) + return self._parse_logs(logs) + return RunnerStateSince(RunnerState.unknown) def tostr(self): - return f"{self.ID} {self.repo_url()} {self.Status}" + return f"id={self.nj.ID} {self.state.tostr()} {self.get_desc().tostr()}" - def get_github_runner_state(self) -> Optional[GithubRunnerState]: - """Check if this runner is _right now_ - executing a github job. This is best efforted - by parsing stdout logs of the runner""" - if not self.is_running(): - return None + @cached_property + def _allocs(self): COUNTERS.nomad_get.inc() - allocs = [ + return [ nomadlib.Alloc(x) for x in mynomad.get( - f"job/{urlquote(self.ID)}/allocations", + f"job/{urlquote(self.nj.ID)}/allocations", params=dict( namespace=CONFIG.nomad.namespace, filter='ClientStats == "running"', ), ) ] - alloc = next((alloc for alloc in allocs if alloc.is_running_started()), None) - if not alloc: + + def _get_logs(self) -> Optional[str]: + allocs = self._allocs + allocs = [alloc for alloc in allocs if alloc.is_running_started()] + if not allocs: return None + alloc = allocs[0] COUNTERS.nomad_get.inc() logs = mynomad.request( "GET", @@ -740,6 +783,12 @@ def get_github_runner_state(self) -> Optional[GithubRunnerState]: plain=True, ), ).text + return logs + + def _parse_logs(self, logs: str) -> RunnerStateSince: + """Check if this runner is _right now_ + executing a github job. This is best efforted + by parsing stdout logs of the runner""" """ 2024-07-14 12:14:17Z: Listening for Jobs 2024-07-14 12:14:20Z: Running job: build @@ -751,23 +800,22 @@ def get_github_runner_state(self) -> Optional[GithubRunnerState]: parts = line.split(": ", 1) if len(parts) == 2: try: - time = datetime.datetime.fromisoformat(parts[0]).astimezone() - except ValueError as e: - log.debug(f"{line} -> {e}") + time = dateutil.parser.parse(parts[0]) + except Exception as e: + print(f"ERR {e}") continue msg = parts[1] - if ( - "Listening for jobs".lower() in msg.lower() - or "completed with result".lower() in msg.lower() - ): - return GithubRunnerState(time) + if "Listening for jobs".lower() in msg.lower(): + return RunnerStateSince(RunnerState.listening, time) + if "completed with result".lower() in msg.lower(): + return RunnerStateSince(RunnerState.listening, time) if "Running job".lower() in msg.lower(): # is not inactive - return now - return GithubRunnerState() - return None + return RunnerStateSince(RunnerState.running, time) + return RunnerStateSince(RunnerState.unknown) -def get_nomad_state() -> list[NomadRunner]: +def get_nomad_state() -> List[NomadRunner]: COUNTERS.nomad_get.inc() jobsjobs = mynomad.get( "jobs", @@ -780,338 +828,149 @@ def get_nomad_state() -> list[NomadRunner]: # curstate: list[NomadRunner] = [] for jobsjob in jobsjobs: - nj = NomadRunner(jobsjob) + nr = NomadRunner(jobsjob) try: # Get only valid jobsjobs - nj.repo_url() - nj.labelsstr() + nr.get_desc() except (KeyError, AttributeError): continue - curstate.append(nj) + curstate.append(nr) # - # log.info(f"Found {len(curstate)} runners:") + log.debug(f"Found {len(curstate)} runners:") for i, s in enumerate(curstate): - log.info(f"RUNNER: {i} {s.tostr()}") + log.info(f"RUNNER={i} {s.tostr()}") return curstate ############################################################################### -# runners - - -@dataclass -class Runners: - data: Dict[re.Pattern, jinja2.Template] = field(default_factory=dict) - env: jinja2.Environment = field( - default_factory=lambda: jinja2.Environment(loader=jinja2.BaseLoader()) - ) - - @staticmethod - def load(specs: Dict[str, str]) -> Runners: - ret = Runners() - for k, v in specs.items(): - ret.data[re.compile(k)] = ret.env.from_string(v) - return ret - - def find(self, labelsstr: str) -> Optional[jinja2.Template]: - template = next((v for k, v in self.data.items() if k.findall(labelsstr)), None) - if not template: - return None - return template +# {{{1 runners @dataclass -class TemplateContext: - JOB_NAME: str - ACCESS_TOKEN: str - REPO: str - REPO_URL: str - JOB_URL: str - LABELS: str +class RunnerGenerator: + """Stores parsed runners ready to template""" + + desc: Desc + info: str + + def _generate_job_name(self): + # Github runner name can be max 64 characters and no special chars. + # Still try to be as verbose as possible. + parts = [ + CONFIG.nomad.jobprefix, + COUNTERS.cnt.inc(), + self.desc.repo, + self.desc.labels.replace("nomadtools ", ""), + ] + return (re.sub(r"[^a-zA-Z0-9_.-]", "", "-".join(str(x) for x in parts))[:64],) @staticmethod def make_example(labelsstr: str): - return TemplateContext( - JOB_NAME="Example_job_name", - ACCESS_TOKEN="example_access_token", - REPO="user/repo", - REPO_URL="http://example.repo.url/user/repo", - JOB_URL="http://example.job.url/user/repo/run/12312/job/1231", - LABELS=labelsstr, - ) - - @staticmethod - def make_from_github_job(gj: GithubJob): - return TemplateContext( - JOB_NAME=re.sub( - r"[^a-zA-Z0-9_.-]", - "", - "-".join( - str(x) - for x in [ - CONFIG.nomad.jobprefix, - COUNTERS.cnt.inc(), - gj.repo, - gj.labelsstr(), - ] - ), - )[:64], - ACCESS_TOKEN=CONFIG.github.token or "", - REPO=gj.repo, - REPO_URL=gj.repo_url(), - JOB_URL=gj.job_url(), - LABELS=gj.labelsstr(), + return RunnerGenerator( + Desc(labels=labelsstr, repo_url="http://example.repo.url/user/repo"), + info="This is an information", ) - def to_template_args(self) -> dict: - arg = {} - for label in self.LABELS.split(","): - for part in shlex.split(label): - split = part.split("=", 1) - arg[split[0]] = split[1] if len(split) == 2 else "" + def _to_template_args(self) -> dict: + arg = labelstr_to_kv(self.desc.labels) + name = self._generate_job_name() return dict( + param={ + **arg, + **CONFIG.template_context, + **CONFIG.default_template_context, + **dict( + REPO_URL=self.desc.repo_url, + RUNNER_NAME=name, + JOB_NAME=name, + LABELS=self.desc.labels, + ), + }, run=asdict(self), arg=arg, - opts=CONFIG.opts, CONFIG=CONFIG, - RUNNERS=RUNNERS, + TEMPLATE=TEMPLATE, ARGS=ARGS, + escape=nomadlib.escape, + META=json.dumps(asdict(self.desc)), ) - -def get_runner_jobspec(gj: GithubJob) -> Optional[NomadJob]: - template = RUNNERS.find(gj.labelsstr()) - if not template: - return None - tc = TemplateContext.make_from_github_job(gj) - jobtext = template.render(tc.to_template_args()) - if not jobtext: - return None - # - try: - jobspec = json.loads(jobtext) - except json.JSONDecodeError: - try: - jobspec = json.loads( - subprocess.check_output( - "nomad job run -output -".split(), input=jobtext, text=True - ) - ) - except subprocess.CalledProcessError: - log.exception(f"Could not decode template for {gj.labelsstr()}: {template}") - raise - jobspec = jobspec.get("Job", jobspec) - # Apply default transformations. - for key in ["ID", "Name"]: - if key in jobspec: - # Github runner name can be max 64 characters and no special chars. - # Still try to be as verbose as possible. - # The hash generated from job_url should be unique enough. - jobspec[key] = tc.JOB_NAME - jobspec["Namespace"] = CONFIG.nomad.namespace - jobspec["Meta"] = { - **(jobspec.get("Meta", {}) or {}), - **{ - CONFIG.nomad.meta + "_" + k: str(v) - for k, v in asdict(tc).items() - if k.lower() != "token" - }, - } - jobspec = NomadJob(jobspec) - return jobspec + def get_runnertorun(self) -> NomadJobToRun: + tc = self._to_template_args() + jobtext = TEMPLATE.render(tc) + jobspec = nomad_job_text_to_json(jobtext) + # Apply default transformations. + jobspec["Namespace"] = CONFIG.nomad.namespace + for key in ["ID", "Name"]: + if key in jobspec: + jobspec[key] = tc["param"]["JOB_NAME"] + jobspec["Meta"][CONFIG.nomad.meta] = tc["META"] + # + nomadjobtorun = NomadJobToRun(jobspec, jobtext) + assert nomadjobtorun.get_desc() == self.desc + return nomadjobtorun ############################################################################### -# scheduler +# {{{1 scheduler @dataclass -class Todo: - """Represents actions to take""" - - tostart: List[NomadJob] = field(default_factory=list) - tostop: List[str] = field(default_factory=list) - topurge: List[str] = field(default_factory=list) - - def __add__(self, o: Todo): - return Todo( - self.tostart + o.tostart, - self.tostop + o.tostop, - self.topurge + o.topurge, - ) - - def execute(self): - # Sanity checks. - tostartids: List[str] = [x["ID"] for x in self.tostart] - ID: str - for ID in tostartids: - assert ID not in self.tostop, f"{ID} is tostart and tostop" - assert ID not in self.topurge, f"{ID} is tostart and topurge" - for ID in self.tostop: - assert ID not in self.topurge, f"{ID} is tostrop and topurge" - for ID, cnt in collections.Counter(tostartids).items(): - assert cnt == 1, f"Repeated id in tostart: {ID}" - for ID, cnt in collections.Counter(self.tostop).items(): - assert cnt == 1, f"Repeated id in tostop: {ID}" - for ID, cnt in collections.Counter(self.topurge).items(): - assert cnt == 1, f"Repeated id in tostop: {ID}" - # Print them. - if self.tostart: - log.info(f"tostart: {' '.join(tostartids)}") - if self.tostop: - log.info(f"tostop: {' '.join(self.tostop)}") - if self.topurge: - log.info(f"topurge: {' '.join(self.topurge)}") - # Execute them. - if ARGS.dryrun: - log.error("DRYRUN") - else: - for spec in self.tostart: - COUNTERS.nomad_run.inc() - try: - resp = mynomad.start_job(spec.asdict()) - except Exception: - log.exception(f"Could not start: {json.dumps(spec.asdict())}") - raise - log.info(resp) - for name in self.tostop: - COUNTERS.nomad_stop.inc() - resp = mynomad.stop_job(name) - log.info(resp) - if CONFIG.nomad.purge: - for name in self.topurge: - COUNTERS.nomad_purge.inc() - resp = mynomad.stop_job(name, purge=True) - log.info(resp) - - -@dataclass(frozen=True) -class RepoState: - """Represents state of one repository. - Currently this program supports only github-runners bound to one repo""" - - repo_url: str - githubjobs: List[GithubJob] = field(default_factory=list) - nomadrunners: List[NomadRunner] = field(default_factory=list) - - def validate(self): - # Check that all data are for one repo. - for gj in self.githubjobs: - assert gj.repo_url() == self.repo_url - for nr in self.nomadrunners: - assert nr.repo_url() == self.repo_url - for k, v in collections.Counter(nr.ID for nr in self.nomadrunners).items(): - assert v == 1, f"Repeated runner: {k}" - - def __find_free_githubjob(self, labelsstr: str, todo: Todo) -> GithubJob: - used_job_urls = set( - itertools.chain( - ( - (x.job_url() for x in todo.tostart if x.labelsstr() == labelsstr), - ( - nr.job_url() - for nr in self.nomadrunners - if nr.labelsstr() == labelsstr - ), - ) - ) - ) - gjobs = [x for x in self.githubjobs if x.labelsstr() == labelsstr] - picked = next((x for x in gjobs if x.job_url() not in used_job_urls), None) - if picked: - return picked - return random.choice(tuple(gjobs)) +class Scheduler: + desc: Desc + gjs: List[GithubJob] + nrs: List[NomadRunner] - def gen_runners_to_stop(self, labelsstr: str) -> Generator[NomadRunner]: + def gen_runners_to_trim(self) -> List[NomadRunner]: """Generate runners to stop.""" - # To stop runner it can't be alredy stopped - notdeadnotstop = [ - nr - for nr in self.nomadrunners - if nr.labelsstr() == labelsstr and not nr.is_dead() and not nr.Stop + order = [ + RunnerState.unknown, + RunnerState.pending, + RunnerState.listening, ] - # First stop pending ones. - for nr in notdeadnotstop: - if nr.is_pending(): - yield nr - # Then stop ones inactive for longer thatn the configured timeout. - # Getting runner state is costly. - timeout = CONFIG.get_runner_inactivity_timeout() - if timeout: - for nr in notdeadnotstop: - if nr.is_running(): - rstate = nr.get_github_runner_state() - if rstate and rstate.inactive_since: - inactivefor = ( - datetime.datetime.now().astimezone() - rstate.inactive_since - ) - if inactivefor > timeout: - log.warning( - f"INACTIVE: Runner is inactive for {inactivefor} > {CONFIG.runner_inactivity_timeout}: {nr.tostr()}" - ) - yield nr - else: - log.info( - f"INACTIVE: Runner continues inactive for {inactivefor} < {CONFIG.runner_inactivity_timeout}: {nr.tostr()}" - ) - - def scheduler(self) -> Todo: - """Decide which jobs to run or stop""" - self.validate() - todo = Todo() - neededlabelsstrs: Dict[str, int] = collections.Counter( - gj.labelsstr() for gj in self.githubjobs - ) - runninglabelsstrs: Dict[str, int] = collections.Counter( - nr.labelsstr() for nr in self.nomadrunners if not nr.is_dead() - ) - alllabelsstrs: set[str] = set( - itertools.chain(neededlabelsstrs.keys(), runninglabelsstrs.keys()) + nrs = [nr for nr in self.nrs if nr.state.state in order] + nrs.sort( + key=lambda nr: ( + order.index(nr.state.state), + nr.state.since, + -nr.nj.ModifyIndex, + ) ) - for labelsstr in alllabelsstrs: - needed = neededlabelsstrs.get(labelsstr, 0) - running = runninglabelsstrs.get(labelsstr, 0) - diff = needed - running - if diff > 0: - for _ in range(diff): - jobspec = get_runner_jobspec( - self.__find_free_githubjob(labelsstr, todo) - ) - if jobspec is not None: - log.info( - f"Running job {jobspec.ID} with {jobspec.labelsstr()} for {jobspec.job_url()}" - ) - log.debug(f"Running {jobspec}") - todo.tostart.append(jobspec) - else: - log.warning( - f"Runner for {labelsstr} in {self.repo_url} not found" - ) - elif diff < 0: - # Only remove those ones that do not run anything. - # Generator to evaluate only those that needed. - # Diff id negative. - todo.tostop.extend( - x.ID - for x in itertools.islice( - self.gen_runners_to_stop(labelsstr), -diff - ) + return nrs + + def schedule(self): + nrs_not_dead, nrs_dead = list_split(lambda x: x.state.is_dead(), self.nrs) + torun = len(self.gjs) - len(nrs_not_dead) + if torun > 0: + for _ in range(torun): + nomadjobtorun = RunnerGenerator(self.desc, "").get_runnertorun() + log.info(f"Starting runner {nomadjobtorun.tostr()}") + COUNTERS.nomad_run.inc() + mynomad.start_job( + nomadjobtorun.nj.asdict(), + nomadlib.JobSubmission.mk_hcl(nomadjobtorun.hcl), ) - # + elif torun < 0: + tostop = -torun + nomadjobstostop = self.gen_runners_to_trim()[:tostop] + for nr in nomadjobstostop: + log.info(f"Stopping runner {nr.tostr()}") + COUNTERS.nomad_purge.inc() + mynomad.stop_job(nr.nj.ID, purge=True) if CONFIG.nomad.purge: - for nr in self.nomadrunners: - if nr.is_dead() and nr.JobSummary.get_sum_summary().only_completed(): - # Check that there are no running evaluations to be sure. - evals = mynomad.get(f"job/{nr.ID}/evaluations") - if len(evals) == 0 or all( - eval["Status"] == "complete" for eval in evals - ): - log.info( - f"Purging {nr.ID} for {nr.repo_url()} with no running deployments" - ) - todo.topurge.append(nr.ID) - # - return todo + for nr in nrs_dead: + log.info(f"Purging runner {nr.tostr()}") + COUNTERS.nomad_purge.inc() + mynomad.stop_job(nr.nj.ID, purge=True) + + +def merge_states(gjs: List[GithubJob], nrs: List[NomadRunner]): + state: Dict[Desc, Tuple[List[GithubJob], List[NomadRunner]]] = {} + for i in gjs: + state.setdefault(i.get_desc(), ([], []))[0].append(i) + for i in nrs: + state.setdefault(i.get_desc(), ([], []))[1].append(i) + return state def loop(): @@ -1121,43 +980,16 @@ def loop(): reqstate: list[GithubJob] = get_gh_state(repos) # Get the nomad state. curstate: list[NomadRunner] = get_nomad_state() - # Construct current state with repo as the key. - repostates: Dict[str, RepoState] = {} - for gj in reqstate: - repostates.setdefault( - gj.repo_url(), RepoState(gj.repo_url()) - ).githubjobs.append(gj) - for nj in curstate: - repostates.setdefault( - nj.repo_url(), RepoState(nj.repo_url()) - ).nomadrunners.append(nj) - # For each repo, run the scheduler and collect results. - todo: Todo = functools.reduce( - Todo.__add__, parallelmap(RepoState.scheduler, repostates.values()), Todo() - ) - # Apply limits - for ll in CONFIG.limits: - if ll.max >= 0: - count = 0 - tmp = [] - for tostart in todo.tostart: - if re.match(ll.repo, tostart.repo()) and re.match( - ll.labels, tostart.labelsstr() - ): - count += 1 - if count <= ll.max: - tmp.append(tostart) - else: - tmp.append(tostart) - todo.tostart = tmp - # Execute what we need to execute. - todo.execute() + # Merge states on label and repo. + state = merge_states(reqstate, curstate) + for k, v in state.items(): + Scheduler(k, v[0], v[1]).schedule() # Ending. COUNTERS.print() ############################################################################### -# command line +# {{{1 command line @dataclass @@ -1197,15 +1029,14 @@ def cli(args: Args): else: with open(args.config) as f: configstr = f.read() - global CONFIG tmp = yaml.safe_load(configstr) + global CONFIG CONFIG = Config(tmp) - # Load runners - global RUNNERS - RUNNERS = Runners.load(CONFIG.runners) - # + global TEMPLATE + TEMPLATE = jinja2.Environment(loader=jinja2.BaseLoader()).from_string( + CONFIG.template + ) mynomad.namespace = os.environ["NOMAD_NAMESPACE"] = CONFIG.nomad.namespace - # global GITHUB_CACHE GITHUB_CACHE = GithubCache.load() @@ -1249,14 +1080,6 @@ def dumpconfig(): print(yaml.safe_dump(CONFIG.asdict())) -@cli.command(help="List configured runners") -def listrunners(): - for rgx, tmpl in RUNNERS.data.items(): - print(f"{rgx}") - print(f"{tmpl}") - print() - - @cli.command( help="Prints out github cache. If given argument URL, print the cached response" ) @@ -1281,10 +1104,7 @@ def once(): @click.argument("label", required=True, nargs=-1) def rendertemplate(label: Tuple[str, ...]): labelsstr: str = ",".join(sorted(list(label))) - template = RUNNERS.find(labelsstr) - if not template: - raise Exception(f"Template not found for {labelsstr}") - res = template.render(TemplateContext.make_example(labelsstr).to_template_args()) + res = RunnerGenerator.make_example(labelsstr).get_runnertorun() print(res) diff --git a/src/nomad_tools/entry_githubrunner/startscript.sh b/src/nomad_tools/entry_githubrunner/startscript.sh index a0665d3..fb0cf19 100644 --- a/src/nomad_tools/entry_githubrunner/startscript.sh +++ b/src/nomad_tools/entry_githubrunner/startscript.sh @@ -1,31 +1,43 @@ #!/bin/bash -set -xeuo pipefail +set -euo pipefail + +if [[ "${DEBUG:-}" ]]; then + set -x +fi fatal() { - echo "startscript: FATAL: $*" >&2 + echo "startscript: FATAL: $*" exit 123 } warn() { - echo "startscript: WARNING: $*" >&2 + echo "startscript: WARNING: $*" } info() { - echo "startscript: INFO: $*" >&2 + echo "startscript: INFO: $*" +} + +r() { + echo "+ $*" + "$@" } -choosedir() { - local dir i ret=0 +choosedirin() { + local dir dir="$1" shift if ((!$#)); then fatal "missing arguments"; fi + # + local i ret=0 for ((i = 0; ; i++)); do - mkdir -vp "$dir/$i" + r mkdir -vp "$dir/$i" { if flock -n 249; then info "Using $dir/$i directory" - "$@" "$dir/$i" || ret=$? - return $ret + info "+ $* $dir/$i" + r "$@" "$dir/$i" || ret=$? + return "$ret" fi } 249>"$dir/$i/lockfile" done @@ -38,42 +50,73 @@ dir_is_empty() { configure_dockerd() { local dir dir="$1" - dir=$(readlink -f "$dir/docker") + shift if (($#)); then fatal "too many args"; fi - if [[ ! -d /var/lib/docker ]]; then - info "Dockerd using $dir" - ln -nvfs "$dir" /var/lib/docker/ + # + dir=$(readlink -f "$dir/docker") + local dockersock + dockersock=/var/run/docker.sock + if [[ -e $dockersock ]]; then + warn "Skipping dockerd configuration: $dockersock already exists" + if ! r docker info; then + warn "docker info failed" + fi else - warn "Skipping dockerd configuratin: /var/lib/docker already exists" + info "Symlinking docker to $dir" + r mkdir -vp "$dir" + r ln -nvfs "$dir" /var/lib/docker fi } configure_runner() { - local dir tmp + local dir dir="$1" + shift + if (($#)); then fatal "too many args"; fi # - tmp=$(readlink -f "$dir/run") - mkdir -vp "$tmp" - export RUNNER_WORKDIR="$tmp" + local tmp + tmp=$(readlink -f "$dir/configure") + r mkdir -vp "$tmp" + r export RUNNER_WORKDIR="$tmp" info "RUNNER_WORKDIR=$RUNNER_WORKDIR" # tmp=$(readlink -f "$dir/hostedtoolcache") if [[ -e /opt/hostedtoolcache/ ]]; then - rmdir -v /opt/hostedtoolcache/ + r rmdir -v /opt/hostedtoolcache/ fi - mkdir -vp "$tmp" - ln -nvfs "$tmp" /opt/hostedtoolcache/ - export AGENT_TOOLSDIRECTORY="$tmp" + r mkdir -vp "$tmp" + r ln -nvfs "$tmp" /opt/hostedtoolcache + r export AGENT_TOOLSDIRECTORY="$tmp" info "AGENT_TOOLSDIRECTORY=$tmp | /opt/hostedtoolcache -> $tmp" } -run() { +configure() { local dir dir="$1" + shift + if (($#)); then fatal "too many args"; fi + # configure_dockerd "$dir" configure_runner "$dir" - exec /entrypoint.sh ./bin/Runner.Listener run --startuptype service } +save_stdout_in_fd3_and_redirect_stdout_stderr() { + exec 3>&1 1>&2 +} + +restore_stdout_from_fd3() { + exec 1>&3 3>&- +} + +save_stdout_in_fd3_and_redirect_stdout_stderr +if (($#)); then fatal "too many args"; fi if [[ ! -e /.dockerenv ]]; then fatal "Not in docker"; fi -run /_work +if mountpoint /_work; then + choosedirin /_work configure +else + info "/_work is not a mountpoint" +fi +info 'start github runner' +restore_stdout_from_fd3 +# synchronize with https://github.com/myoung34/docker-github-actions-runner/blob/master/Dockerfile#L25 +r exec /entrypoint.sh ./bin/Runner.Listener run --startuptype service diff --git a/src/nomad_tools/entry_githubrunner/trydispatcheddoesntwork.nomad.hcl b/src/nomad_tools/entry_githubrunner/trydispatcheddoesntwork.nomad.hcl new file mode 100644 index 0000000..954a0b6 --- /dev/null +++ b/src/nomad_tools/entry_githubrunner/trydispatcheddoesntwork.nomad.hcl @@ -0,0 +1,89 @@ +job "nomadtools-githubrunner-executor" { + type = "batch" + parameterized { + meta_required = [ + "ACCESS_TOKEN", + "REPO_URL", + "JOB_NAME", + "LABELS", + "cpu", + "mem", + "memmax", + "image", + ] + } + meta { + INFO = < API. DO NOT mix with JobsJob""" Version: int - Namespace: Optional[str] ModifyIndex: int JobModifyIndex: int TaskGroups: List[JobTaskGroup] - Stop: bool - Meta: Optional[Dict[str, str]] SubmitTime: int def description(self): @@ -140,9 +143,6 @@ def allservices(self): class JobsJob(_BothJobAndJobsJob): """Returned aby jobs API. DO NOT mix with Job""" - Namespace: Optional[str] - Stop: bool - Meta: Optional[Dict[str, str]] = None JobSummary: JobSummary