diff --git a/jobs/test-json.nomad.hcl b/jobs/test-json.nomad.hcl new file mode 100644 index 0000000..273399a --- /dev/null +++ b/jobs/test-json.nomad.hcl @@ -0,0 +1,21 @@ +job "test-jsons" { + group "example" { + task "server" { + identity { + file = true + ttl = "1h" + } + driver = "raw_exec" + config { + command = "echo" + } + template { + source = "local/redis.conf.tpl" + destination = "local/redis.conf" + change_mode = "signal" + change_signal = "SIGINT" + error_on_missing_key = false + } + } + } +} diff --git a/src/nomad_tools/entry_go.py b/src/nomad_tools/entry_go.py index e017292..bbd7989 100644 --- a/src/nomad_tools/entry_go.py +++ b/src/nomad_tools/entry_go.py @@ -1,9 +1,10 @@ import contextlib -import csv +import datetime import io import json import logging import os +import re import shlex import signal import socket @@ -13,7 +14,7 @@ import threading import uuid from dataclasses import dataclass -from typing import IO, Any, Dict, List, Optional, Tuple, TypeVar, cast +from typing import IO, Any, Callable, Dict, List, Optional, Tuple, TypeVar, cast import click import clickdc @@ -22,7 +23,7 @@ from . import nomad_watch, taskexec from .common_base import NOMAD_NAMESPACE, quotearr -from .common_click import common_options +from .common_click import EPILOG, common_options from .common_nomad import namespace_option clickforward.init() @@ -67,61 +68,58 @@ def __exit__(self, exctype, excinst, exctb): sys.stdin = self.old -class JsonType(click.ParamType): - name = "JSON" +def snake_to_camel_case(txt: str) -> str: + return "".join(x.capitalize() for x in txt.split("_")) if txt.islower() else txt - def convert( - self, value: Any, param: Optional[click.Parameter], ctx: Optional[click.Context] - ) -> dict: - """Entrypoint for click option conversion""" - return json.loads(value) if isinstance(value, str) else value +def timestr_to_nanos(time_str: str) -> int: + """Convert like 1h into 3600000000""" + RGX = re.compile( + r"^((?P[\.\d]+?)d)?((?P[\.\d]+?)h)?((?P[\.\d]+?)m)?((?P[\.\d]+?)s)?$" + ) + parts = RGX.match(time_str) + assert ( + parts is not None + ), "Could not parse any time information from '{}'. Examples of valid strings: '8h', '2d8h5m20s', '2m4s'".format( + time_str + ) + time_params = { + name: float(param) for name, param in parts.groupdict().items() if param + } + return int(datetime.timedelta(**time_params).total_seconds()) * 10**9 -class JsonOrKvCsvType(click.ParamType): - name = "JSON_OR_KVCSV" - def convert( - self, value: Any, param: Optional[click.Parameter], ctx: Optional[click.Context] - ) -> dict: - """Entrypoint for click option conversion""" - if not isinstance(value, str): - return value - try: - return json.loads(value) - except json.JSONDecodeError: - return { - key: val - for line in csv.reader(io.StringIO(value)) - for elem in line - for key, val in [elem.split("=", 1)] - } +############################################################################### -class JsonOrShellKvType(click.ParamType): - name = "json_or_shellKV" +class JsonType(click.ParamType): + name = "JSON" def convert( self, value: Any, param: Optional[click.Parameter], ctx: Optional[click.Context] ) -> dict: """Entrypoint for click option conversion""" - if not isinstance(value, str): - return value - try: - return json.loads(value) - except json.JSONDecodeError: - return { - k: v for elem in shlex.split(value) for k, v in [elem.split("=", 1)] - } - + return json.loads(value) if isinstance(value, str) else value -class TemplateType(click.ParamType): - name = "json_or_shellKV" - MAP: Dict[str, str] = dict( - destination="DestPath", - data="EmbeddedTmpl", - source="SourcePath", - ) +class JsonOrShellKvType(click.ParamType): + """ + The click argument is a json array if it can be parsed as + or it is a list of shell quoted values in the form var=val + """ + + name = "JSON_OR_SHELLKV" + + def __init__( + self, + map: Dict[str, str] = {}, + snake_to_camel: bool = False, + transform: Optional[Callable[[dict], Any]] = None, + ): + super().__init__() + self.map = map + self.snake_to_camel = snake_to_camel + self.transform = transform def convert( self, value: Any, param: Optional[click.Parameter], ctx: Optional[click.Context] @@ -133,7 +131,12 @@ def convert( ret = json.loads(value) except json.JSONDecodeError: ret = {k: v for elem in shlex.split(value) for k, v in [elem.split("=", 1)]} - return {self.MAP.get(k, k): v for k, v in ret.items()} + ret = {self.map.get(k, k): v for k, v in ret.items()} + if self.snake_to_camel: + ret = {snake_to_camel_case(k): v for k, v in ret.items()} + if self.transform: + self.transform(ret) + return ret class MountNfsType(click.ParamType): @@ -176,7 +179,11 @@ class Parsed: @dataclass class Args: name: Optional[str] = clickdc.option(help="Set the name of the job, group and task") - type: str = clickdc.option(default="batch") + type: str = clickdc.option( + default="batch", + type=click.Choice("service system batch sysbatch".split()), + help="Nomad job type. Specifies Nomad scheduler to use.", + ) cap_add: Tuple[str, ...] = clickdc.option( multiple=True, help="Add Linux capabilities" ) @@ -204,17 +211,60 @@ class Args: name: Optional[str] = clickdc.option( help=f"Job, group and task name. Default: {NAME_PREFIX}" ) - mount: Tuple[Any, ...] = clickdc.option( - multiple=True, type=JsonOrKvCsvType(), help="Add a mount block" + mount: Tuple[dict, ...] = clickdc.option( + multiple=True, type=JsonOrShellKvType(), help="Add a mount block" + ) + identity: Tuple[dict, ...] = clickdc.option( + type=JsonOrShellKvType( + snake_to_camel=True, + map=dict(aud="Audience", ttl="TTL"), + transform=lambda x: x.update({"TTL": timestr_to_nanos(x["TTL"])}) + if isinstance(x.get("TTL"), str) + else None, + ), + multiple=True, + help=""" + Add a identity block + Example: --identity "env=true file=true change_mode=restart" + --identity '{"name": "example", "aud": ["oidc.example.com"], "file": true, "ttl": 1h", "change_mode": "signal", "change_signal": "SIGHUP"}' + """, + ) + vault: Optional[dict] = clickdc.option( + type=JsonOrShellKvType(snake_to_camel=True), + help="Add a vault block", + ) + consul: Optional[dict] = clickdc.option( + type=JsonOrShellKvType(snake_to_camel=True), + help="Add a consul block", + ) + numa: Optional[dict] = clickdc.option( + type=JsonOrShellKvType(snake_to_camel=True), + help=""" + Add a numa block. + Example: --numa affinity=require + """, + ) + device: Tuple[dict, ...] = clickdc.option( + type=JsonType(), + help=""" + Add a device block to resources of the task. + Example --device '{"Name":"nvidia/gpu","Count":2,"Constraints":[{"LTarget":"${device.attr.memory}","RTarget":"2 GiB","Operand":">="}],"Affinities":[{"LTarget":"${device.attr.memory}","RTarget":"4 GiB","Operand":">=","Weight":75}]}' + """, ) + cores: Optional[int] = clickdc.option(help="Sepcify cores") mountnfs: Tuple[dict, ...] = clickdc.option( multiple=True, type=MountNfsType(), - help="Generates a mount block that mounts NFS share from SRV:SRC to TGT using OPTS options.", + help=""" + Generates a mount block that mounts NFS share from SRV:SRC to TGT using OPTS options. + """, ) memorymb: Optional[int] = clickdc.option( "-m", type=int, help="Memory limit in MegaBytes" ) + memorymaxmb: Optional[int] = clickdc.option( + type=int, help="Memory max limit in MegaBytes" + ) privileged: bool = clickdc.option( is_flag=True, help="Add privileged=true to config" ) @@ -233,7 +283,10 @@ class Args: default="no", help="With restart always, will add always restarting restart policy", ) - driver: str = clickdc.option(default="docker", help="Nomad task driver") + driver: str = clickdc.option( + default="docker", + help="Nomad task driver. One of docker, raw_exec, exec, java, etc.", + ) datacenters: Tuple[str, ...] = clickdc.option( "--dc", multiple=True, @@ -241,16 +294,37 @@ class Args: ) constraint: Tuple[str, ...] = clickdc.option( multiple=True, - help="List of constrains of 3 elements parsed with shlex.split(). Can be specified multiple times.", + help=""" + List of constrains of 3 elements parsed with shlex.split(). Can be specified multiple times. + Example: --constarint '${attr.os.name} regexp "Ubuntu"' + """, ) constraint_here: bool = clickdc.alias_option( aliased=dict( constraint="${attr.unique.hostname} == " + shlex.quote(socket.gethostname()) ) ) - template: Optional[Any] = clickdc.option(multiple=True, type=TemplateType()) + template: Optional[Any] = clickdc.option( + multiple=True, + type=JsonOrShellKvType( + snake_to_camel=True, + map=dict( + destination="DestPath", + data="EmbeddedTmpl", + source="SourcePath", + error_on_missing_key="ErrMissingKey", + ), + ), + help=""" + Add template block. + """, + ) auth: Optional[Any] = clickdc.option( - type=JsonOrKvCsvType(), help="Pass the JSON data as auth field in config" + type=JsonOrShellKvType(), + help=""" + Add config.auth block. + Example: --auth "username=dockerhub_user password=dockerhub_password" + """, ) image_pull_timeout: Optional[str] = clickdc.option( help="Add image_pull_timeout to task config" @@ -267,13 +341,18 @@ class Args: interactive: bool = clickdc.option( "-i", is_flag=True, - help=""" Run the job, wait for the job to be started, - then get the allocation ID of the running task and execute inside it """, + help=""" + Run the job, wait for the job to be started, + then get the allocation ID of the running task and execute inside it. + """, ) interactive_foreground: str = clickdc.option( default="sleep infinity", - help=""" If using foreground, this is the command that will run in the job, -e and interactive terminal will connect using websockets """, + help=""" + If using foreground, this is the command that will run in the job, + and interactive terminal will connect using websockets. + Default: sleep infinity + """, ) network: Optional[str] = clickdc.option(help="Connect a container to a network") group_network_mode: Optional[str] = clickdc.option( @@ -295,7 +374,7 @@ class Args: """ ) kill_signal: Optional[str] = clickdc.option( - """ + help=""" Specifies a configurable kill signal for a task, where the default is SIGINT (or SIGTERM for docker, or CTRL_BREAK_EVENT for raw_exec on Windows). Note that this is only supported for drivers sending signals @@ -304,16 +383,16 @@ class Args: ) # extra_config: Any = clickdc.option( - type=JsonType(), default={}, help="Add extra JSON to config" + type=JsonOrShellKvType(), default={}, help="Add extra JSON to config" ) extra_task: Any = clickdc.option( - type=JsonType(), default={}, help="Add extra JSON to task" + type=JsonOrShellKvType(), default={}, help="Add extra JSON to task" ) extra_group: Any = clickdc.option( - type=JsonType(), default={}, help="Add extra JSON to group" + type=JsonOrShellKvType(), default={}, help="Add extra JSON to group" ) extra_job: Any = clickdc.option( - type=JsonType(), default={}, help="Add extra JSON to job" + type=JsonOrShellKvType(), default={}, help="Add extra JSON to job" ) command: Tuple[str, ...] = clickdc.argument( required=True, nargs=-1, type=clickforward.FORWARD @@ -451,15 +530,28 @@ def parse(self) -> Parsed: ), "Resources": ( { - "MemoryMB": self.memorymb - if self.memorymb is not None - else None, + "MemoryMB": self.memorymb, + "MemoryMaxMB": self.memorymaxmb, "CPU": self.cpu, + "NUMA": self.numa, + "Device": self.device, + "Cores": self.cores, } - if self.memorymb or self.cpu + if self.memorymb is not None + or self.cpu is not None + or self.numa is not None + or self.memorymaxmb is not None + or self.cores is not None + or self.device else None ), "RestartPolicy": {"Attempts": 0, "Mode": "fail"}, + "Vault": self.vault, + "Consul": self.consul, + "Identity": self.identity[0] if self.identity else None, + "Identities": self.identity[1:] + if len(self.identity) > 1 + else None, **self.extra_task, } ], @@ -523,7 +615,23 @@ def __thread(self): Given the arguments on the command this command constructs a JSON Nomad job specification. Then this specification is executed using `nomadt watch run` command. + +The JSON_OR_SHELLKV options argument is parsed as a JSON. If it is not a valid JSON, +then the value is split using shlex.split() and parsed as a list of values in the form +val=var. See examples below. """, + epilog=""" +\b +Examples: + go --rm hello-world + Runs hello-world docker image. + go --rm -m 3000 -e NAME=you alpine sh -c 'echo hello $NAME' + Runs alpine docker imge with 3G of memory and NAME environment variable + go --constraint '${attr.os.name} regexp Ubuntu' --driver raw_exec sh + go --cores=3 --identity '{"name": "example", "aud": ["oidc.example.com"], "file": true, "change_mode": "signal", "change_signal": "SIGHUP", "TTL": "1h"}' --driver raw_exec echo hello + go --rm --template "destination=local/script.sh data=$(printf "%q" "$(cat ./script.sh)")" alpine sh -c 'sh ${NOMAD_TASK_DIR}/script.sh' +""" + f"{EPILOG}", ) @common_options() @namespace_option() diff --git a/tests/integration/test_go.py b/tests/integration/test_go.py index 5565e49..6c757c9 100644 --- a/tests/integration/test_go.py +++ b/tests/integration/test_go.py @@ -21,3 +21,25 @@ def test_go_exit_24(): def test_go_exit_2(): run_nomadt("go --rm busybox:stable sh -xc 'exit 2'", check=2) + + +def test_go_identy(): + run_nomadt( + """ + go --rm + --identity '{"name": "example", "aud": ["oidc.example.com"], "file": true, "change_mode": "signal", "change_signal": "SIGHUP", "TTL": "1h"}' + --driver raw_exec echo hello + """ + ) + + +def test_go_identities(): + run_nomadt( + """ + go --rm + --identity '{"name": "example", "aud": ["oidc.example.com"], "file": true, "change_mode": "signal", "change_signal": "SIGHUP", "TTL": "1h"}' + --identity '{"name": "example", "aud": ["oidc.example.com"], "file": true, "change_mode": "signal", "change_signal": "SIGHUP", "TTL": "1h"}' + --identity '{"name": "example", "aud": ["oidc.example.com"], "file": true, "change_mode": "signal", "change_signal": "SIGHUP", "TTL": "1h"}' + --driver raw_exec echo hello + """ + ) diff --git a/tests/unit/test_go.py b/tests/unit/test_go.py new file mode 100644 index 0000000..2d184cd --- /dev/null +++ b/tests/unit/test_go.py @@ -0,0 +1,84 @@ +import json +import shlex + +from tests.testlib import run_nomadt + + +def test_go_json(): + json.loads(run_nomadt("go -O --driver raw_exec command", stdout=True).stdout) + + +def test_go_auth(): + ret = json.loads( + run_nomadt( + "go -O --auth 'username=a password=b' --driver raw_exec command", + stdout=True, + ).stdout + ) + assert ret["TaskGroups"][0]["Tasks"][0]["Config"]["auth"] == { + "username": "a", + "password": "b", + } + ret = json.loads( + run_nomadt( + r""" + go -O --auth 'username="!@#$%^&*" password="\"'\''\""' --driver raw_exec command + """, + stdout=True, + ).stdout + ) + assert ret["TaskGroups"][0]["Tasks"][0]["Config"]["auth"] == { + "username": "!@#$%^&*", + "password": '"\'"', + } + ret = json.loads( + run_nomadt( + """go -O --auth '{"username": "a", "password": "b"}' --driver raw_exec command""", + stdout=True, + ).stdout + ) + assert ret["TaskGroups"][0]["Tasks"][0]["Config"]["auth"] == { + "username": "a", + "password": "b", + } + + +def test_go_raw_exec_cmd(): + ret = json.loads( + run_nomadt("""go -O --driver raw_exec cmd arg1 arg2""", stdout=True).stdout + ) + config = ret["TaskGroups"][0]["Tasks"][0]["Config"] + assert config["command"] == "cmd", config + assert config["args"] == ["arg1", "arg2"] + + +def test_go_docker_cmd(): + ret = json.loads(run_nomadt("""go -O image cmd arg1 arg2""", stdout=True).stdout) + config = ret["TaskGroups"][0]["Tasks"][0]["Config"] + assert config["image"] == "image" + assert config["command"] == "cmd" + assert config["args"] == ["arg1", "arg2"] + ret = json.loads( + run_nomadt( + """go -O --driver docker --image image cmd arg1 arg2""", stdout=True + ).stdout + ) + config = ret["TaskGroups"][0]["Tasks"][0]["Config"] + assert config["image"] == "image" + assert config["command"] == "cmd" + assert config["args"] == ["arg1", "arg2"] + + +def test_go_template(): + txt = "".join(chr(i) for i in range(1, 127)) + arg = f"destination=local/script.sh data={shlex.quote(txt)}" + ret = json.loads( + run_nomadt(f"""go -O --template {shlex.quote(arg)} cmd""", stdout=True).stdout + ) + config1 = ret["TaskGroups"][0]["Tasks"][0]["Config"] + arg = json.dumps({"DestPath": "local/script.sh", "EmbeddedTmpl": txt}) + ret = json.loads( + run_nomadt(f"""go -O --template {shlex.quote(arg)} cmd""", stdout=True).stdout + ) + config2 = ret["TaskGroups"][0]["Tasks"][0]["Config"] + assert config1 == config2