From b9cb48c260f072c71fa9ba6eae67ef8d4a034716 Mon Sep 17 00:00:00 2001 From: Keith Ralphs Date: Mon, 14 Oct 2024 13:51:22 +0000 Subject: [PATCH] Add tracing adjusting startup to prevent multiple FastAPI apps --- .devcontainer/devcontainer.json | 4 +- .vscode/launch.json | 20 ++- Dockerfile | 5 +- dev-requirements.txt | 30 +++-- docs/reference/openapi.yaml | 4 + helm/blueapi/values.yaml | 22 ++-- pyproject.toml | 8 +- src/blueapi/cli/cli.py | 35 ++++-- src/blueapi/client/rest.py | 3 +- src/blueapi/config.py | 18 --- src/blueapi/core/__init__.py | 4 + src/blueapi/service/main.py | 9 +- src/blueapi/service/runner.py | 2 +- src/blueapi/worker/task_worker.py | 29 ++--- tests/system_tests/plans.json | 132 ++++---------------- tests/unit_tests/core/fake_device_module.py | 2 +- tests/unit_tests/service/test_interface.py | 59 +++++---- tests/unit_tests/service/test_rest_api.py | 33 ++++- tests/unit_tests/service/test_runner.py | 18 +-- tests/unit_tests/worker/test_task_worker.py | 38 ++++-- 20 files changed, 230 insertions(+), 245 deletions(-) diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 31547f83e..4be3c2073 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -40,9 +40,7 @@ "--security-opt=label=disable" ], // Remove this before pushing, only to allow local testing with editable library - "mounts": [ - "source=/scratch/athena/observability-utils,target=/scratch/athena/observability-utils,type=bind,consistency=cached" - ], + "mounts": [], // Mount the parent as /workspaces so we can pip install peers as editable "workspaceMount": "source=${localWorkspaceFolder}/..,target=/workspaces,type=bind", // After the container is created, install the python project in editable form diff --git a/.vscode/launch.json b/.vscode/launch.json index 09d5b6de6..98fd60d85 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -25,9 +25,10 @@ "request": "launch", "justMyCode": false, "module": "blueapi", - "args": [ - "serve" - ] + "env": { + "OTLP_EXPORT_ENABLED": "false" + }, + "args": "-c ${input:config_path} serve" }, { "name": "Blueapi Controller", @@ -35,10 +36,19 @@ "request": "launch", "justMyCode": false, "module": "blueapi", - "args": "controller ${input:args}" + "env": { + "OTLP_EXPORT_ENABLED": "false" + }, + "args": "-c ${input:config_path} controller ${input:args}" }, ], "inputs": [ + { + "id": "config_path", + "type": "promptString", + "description": "Path to configuration YAML file", + "default": "" + }, { "id": "args", "type": "promptString", @@ -46,4 +56,4 @@ "default": "" } ] -} +} \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 7ec1b29f5..601323921 100644 --- a/Dockerfile +++ b/Dockerfile @@ -12,12 +12,11 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ RUN python -m venv /venv ENV PATH=/venv/bin:$PATH +ENV OTLP_EXPORT_ENABLED=false # enable opentelemetry support ENV OTEL_EXPORTER_OTLP_TRACES_PROTOCOL=http/protobuf -# Chnage this to point to Jaeger server before merging +# Change this to point to Jaeger server before merging e.g. 
https://daq-services-jaeger ENV OTEL_EXPORTER_OTLP_ENDPOINT=http://127.0.0.1:4318 -# Change this to enable proper secrity before merging -ENV OTEL_EXPORTER_OTLP_INSECURE=true # Ensure that all Http headers are captured ENV OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST=".*" ENV OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE=".*" diff --git a/dev-requirements.txt b/dev-requirements.txt index 68b3ecac7..c37b65e09 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -9,8 +9,8 @@ annotated-types==0.7.0 anyio==4.4.0 appdirs==1.4.4 asciitree==0.3.3 +asgiref==3.8.1 asttokens==2.4.1 -async-timeout==4.0.3 attrs==24.2.0 babel==2.16.0 beautifulsoup4==4.12.3 @@ -20,7 +20,6 @@ bluesky-kafka==0.10.0 bluesky-live==0.0.8 bluesky-stomp==0.1.2 boltons==24.0.0 -bump-pydantic==0.8.0 cachetools==5.5.0 caproto==1.1.1 certifi==2024.8.30 @@ -42,6 +41,7 @@ databroker==1.2.5 dataclasses-json==0.6.7 decorator==5.1.1 deepmerge==2.0 +Deprecated==1.2.14 distlib==0.3.8 dls-bluesky-core==0.0.4 dls-dodal==1.31.1 @@ -54,7 +54,6 @@ email_validator==2.2.0 entrypoints==0.4 epicscorelibs==7.0.7.99.0.2 event-model==1.21.0 -exceptiongroup==1.2.2 executing==2.1.0 fastapi==0.114.2 fastapi-cli==0.0.5 @@ -68,7 +67,9 @@ fsspec==2024.9.0 funcy==2.0 gitdb==4.0.11 GitPython==3.1.43 +googleapis-common-protos==1.65.0 graypy==2.1.0 +grpcio==1.66.2 h11==0.14.0 h5py==3.11.0 HeapDict==1.0.1 @@ -81,7 +82,7 @@ identify==2.6.1 idna==3.10 imageio==2.35.1 imagesize==1.4.1 -importlib_metadata==8.5.0 +importlib_metadata==8.4.0 importlib_resources==6.4.5 iniconfig==2.0.0 intake==0.6.4 @@ -96,8 +97,6 @@ jsonschema-specifications==2023.12.1 jupyterlab_widgets==3.0.13 kiwisolver==1.4.7 ldap3==2.9.1 -libcst==1.4.0 -livereload==2.7.0 locket==1.0.0 lz4==4.3.3 markdown-it-py==3.0.0 @@ -122,7 +121,21 @@ nose2==0.15.1 nslsii==0.10.3 numcodecs==0.13.0 numpy==1.26.4 +observability-utils==0.1.2 opencv-python-headless==4.10.0.84 +opentelemetry-api==1.27.0 +opentelemetry-distro==0.48b0 +opentelemetry-exporter-otlp==1.27.0 +opentelemetry-exporter-otlp-proto-common==1.27.0 +opentelemetry-exporter-otlp-proto-grpc==1.27.0 +opentelemetry-exporter-otlp-proto-http==1.27.0 +opentelemetry-instrumentation==0.48b0 +opentelemetry-instrumentation-asgi==0.48b0 +opentelemetry-instrumentation-fastapi==0.48b0 +opentelemetry-proto==1.27.0 +opentelemetry-sdk==1.27.0 +opentelemetry-semantic-conventions==0.48b0 +opentelemetry-util-http==0.48b0 ophyd==1.9.0 ophyd-async==0.5.2 orjson==3.10.7 @@ -147,6 +160,7 @@ ply==3.11 pre-commit==3.8.0 prettytable==3.11.0 prompt-toolkit==3.0.36 +protobuf==4.25.5 psutil==6.0.0 ptyprocess==0.7.0 pure_eval==0.2.3 @@ -173,7 +187,6 @@ python-dotenv==1.0.1 python-multipart==0.0.9 pytz==2024.2 PyYAML==6.0.2 -pyyaml-include==2.1 questionary==2.0.1 redis==5.0.8 redis-json-dict==0.2.0 @@ -217,9 +230,7 @@ suitcase-msgpack==0.3.0 suitcase-utils==0.5.4 super-state-machine==2.0.2 tifffile==2024.8.30 -tomli==2.0.1 toolz==0.12.1 -tornado==6.4.1 tox==3.28.0 tox-direct==0.4 tqdm==4.66.5 @@ -244,6 +255,7 @@ websocket-client==1.8.0 websockets==13.0.1 widgetsnbextension==4.0.13 workflows==2.27 +wrapt==1.16.0 xarray==2024.9.0 yarl==1.11.1 zarr==2.18.3 diff --git a/docs/reference/openapi.yaml b/docs/reference/openapi.yaml index a9fdf63cc..57a4d9352 100644 --- a/docs/reference/openapi.yaml +++ b/docs/reference/openapi.yaml @@ -179,6 +179,10 @@ components: default: true title: Is Pending type: boolean + request_id: + default: '' + title: Request Id + type: string task: title: Task task_id: diff --git a/helm/blueapi/values.yaml 
b/helm/blueapi/values.yaml
index feb7adccf..ee7d5b2b0 100644
--- a/helm/blueapi/values.yaml
+++ b/helm/blueapi/values.yaml
@@ -77,29 +77,29 @@ listener:
 
 # Additional envVars to mount to the pod as a String
 extraEnvVars: |
+  - name: OTLP_EXPORT_ENABLED
+    value: {{ .Values.tracing.otlp.export_enabled }}
   - name: OTEL_EXPORTER_OTLP_TRACES_PROTOCOL
-    value: {{ .Values.jaeger.otlp.protocol }}
+    value: {{ .Values.tracing.otlp.protocol }}
   - name: OTEL_EXPORTER_OTLP_ENDPOINT
-    value: "{{ .Values.jaeger.otlp.host }}:{{ .Values.jaeger.otlp.port }}"
-  - name: OTEL_EXPORTER_OTLP_INSECURE
-    value: "{{ .Values.jaeger.otlp.insecure }}"
+    value: "{{ .Values.tracing.otlp.host }}:{{ .Values.tracing.otlp.port }}"
   - name: OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST
-    value: {{ .Values.jaeger.otlp.request.headers }}
+    value: {{ .Values.tracing.http.request.headers }}
   - name: OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE
-    value: {{ .Values.jaeger.otlp.response.headers }}
+    value: {{ .Values.tracing.http.response.headers }}
 # - name: RABBITMQ_PASSWORD
 #   valueFrom:
 #     secretKeyRef:
 #       name: rabbitmq-password
 #       key: rabbitmq-password
 
-jaeger:
+tracing:
   otlp:
+    export_enabled: false
     protocol: http/protobuf
-    insecure: true
-    #
-    host: http://localhost
+    host: https://daq-services-jaeger # replace with central instance
     port: 4318
+  http:
     request:
       headers: ".*"
     response:
@@ -126,6 +126,8 @@ worker:
       password: guest
     host: rabbitmq
     port: 61613
+  tracing_exporter:
+    host: ""
 
 initContainer:
   scratch:
diff --git a/pyproject.toml b/pyproject.toml
index 60a220945..605d582e2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -27,14 +27,14 @@ dependencies = [
     "fastapi>=0.112.0",
     "uvicorn",
     "requests",
-    "dls-bluesky-core", #requires ophyd-async
+    "dls-bluesky-core", #requires ophyd-async
     "dls-dodal>=1.31.0",
-    "super-state-machine", # See GH issue 553
+    "super-state-machine", # See GH issue 553
     "GitPython",
     "bluesky-stomp>=0.1.2",
     "opentelemetry-distro>=0.48b0",
     "opentelemetry-instrumentation-fastapi>=0.48b0",
-    "observability-utils @ file:///scratch/athena/observability-utils/", # Uncomment this before merging
+    "observability-utils>=0.1.2",
 ]
 dynamic = ["version"]
 license.file = "LICENSE"
@@ -82,7 +82,7 @@ write_to = "src/blueapi/_version.py"
 
 [tool.mypy]
 ignore_missing_imports = true # Ignore missing stubs in imported modules
-namespace_packages = false # rely only on __init__ files to determine fully qualified module names.
+namespace_packages = true # necessary for tracing sdk to work with mypy [tool.pytest.ini_options] # Run pytest with all our checkers, and don't spam us with massive tracebacks on error diff --git a/src/blueapi/cli/cli.py b/src/blueapi/cli/cli.py index 14b6d84dc..0a2e612be 100644 --- a/src/blueapi/cli/cli.py +++ b/src/blueapi/cli/cli.py @@ -10,6 +10,8 @@ from bluesky_stomp.messaging import MessageContext, StompClient from bluesky_stomp.models import Broker from observability_utils.tracing import setup_tracing +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor +from opentelemetry.trace import get_tracer_provider from pydantic import ValidationError from requests.exceptions import ConnectionError @@ -19,14 +21,7 @@ from blueapi.client.event_bus import AnyEvent, BlueskyStreamingError, EventBusClient from blueapi.client.rest import BlueskyRemoteControlError from blueapi.config import ApplicationConfig, ConfigLoader -from blueapi.core import DataEvent -from blueapi.service.main import start -from blueapi.service.openapi import ( - DOCS_SCHEMA_LOCATION, - generate_schema, - print_schema_as_yaml, - write_schema_as_yaml, -) +from blueapi.core import OTLP_EXPORT_ENABLED, DataEvent from blueapi.worker import ProgressEvent, Task, WorkerEvent from .scratch import setup_scratch @@ -42,7 +37,6 @@ def main(ctx: click.Context, config: Path | None | tuple[Path, ...]) -> None: # if no command is supplied, run with the options passed - setup_tracing("BlueAPI") # initialise TracerProvider for server app config_loader = ConfigLoader(ApplicationConfig) if config is not None: configs = (config,) if isinstance(config, Path) else config @@ -72,6 +66,16 @@ def main(ctx: click.Context, config: Path | None | tuple[Path, ...]) -> None: help="[Development only] update the schema in the documentation", ) def schema(output: Path | None = None, update: bool = False) -> None: + """Only import the service functions when starting the service or generating + the schema, not the controller as a new FastAPI app will be started each time. + """ + from blueapi.service.openapi import ( + DOCS_SCHEMA_LOCATION, + generate_schema, + print_schema_as_yaml, + write_schema_as_yaml, + ) + """Generate the schema for the REST API""" schema = generate_schema() @@ -89,6 +93,17 @@ def start_application(obj: dict): """Run a worker that accepts plans to run""" config: ApplicationConfig = obj["config"] + """Only import the service functions when starting the service or generating + the schema, not the controller as a new FastAPI app will be started each time. + """ + from blueapi.service.main import app, start + + """ + Set up basic automated instrumentation for the FastAPI app, creating the + observability context. 
+ """ + setup_tracing("BlueAPI", OTLP_EXPORT_ENABLED) + FastAPIInstrumentor().instrument_app(app, tracer_provider=get_tracer_provider()) start(config) @@ -103,7 +118,7 @@ def start_application(obj: dict): def controller(ctx: click.Context, output: str) -> None: """Client utility for controlling and introspecting the worker""" - setup_tracing("BlueAPICLI") # initialise TracerProvider for controller app + setup_tracing("BlueAPICLI", OTLP_EXPORT_ENABLED) if ctx.invoked_subcommand is None: print("Please invoke subcommand!") return diff --git a/src/blueapi/client/rest.py b/src/blueapi/client/rest.py index 274bc72cb..e6837fec6 100644 --- a/src/blueapi/client/rest.py +++ b/src/blueapi/client/rest.py @@ -135,11 +135,12 @@ def _request_and_deserialize( get_exception: Callable[[requests.Response], Exception | None] = _exception, ) -> T: url = self._url(suffix) + # Get the trace context to propagate to the REST API carr = get_context_propagator() if data: response = requests.request(method, url, json=data, headers=carr) else: - response = requests.request(method, url) + response = requests.request(method, url, headers=carr) exception = get_exception(response) if exception is not None: raise exception diff --git a/src/blueapi/config.py b/src/blueapi/config.py index 21e6e9f3b..3502590ba 100644 --- a/src/blueapi/config.py +++ b/src/blueapi/config.py @@ -23,24 +23,6 @@ class Source(BaseModel): module: Path | str -# class BasicAuthentication(BaseModel): -# """ -# Log in details for when a server uses authentication. -# If username or passcode match exactly the regex ^\\${(.*)}$ -# they attempt to replace with an environment variable of the same. -# i.e. ${foo} or ${FOO} are replaced with the value of FOO -# """ - -# username: str = "test" # "guest" -# passcode: str = "test" # "guest" - -# @validator("username", "passcode") -# def get_from_env(cls, v: str): -# if v.startswith("${") and v.endswith("}"): -# return os.environ[v.removeprefix("${").removesuffix("}").upper()] -# return v - - class StompConfig(BaseModel): """ Config for connecting to stomp broker diff --git a/src/blueapi/core/__init__.py b/src/blueapi/core/__init__.py index 7b2306b0f..15e3b2602 100644 --- a/src/blueapi/core/__init__.py +++ b/src/blueapi/core/__init__.py @@ -1,3 +1,5 @@ +from os import environ + from .bluesky_event_loop import configure_bluesky_event_loop from .bluesky_types import ( BLUESKY_PROTOCOLS, @@ -14,6 +16,8 @@ from .context import BlueskyContext from .event import EventPublisher, EventStream +OTLP_EXPORT_ENABLED = environ.get("OTLP_EXPORT_ENABLED") == "true" + __all__ = [ "Plan", "PlanGenerator", diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 36d0a352f..d7b4480ca 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -16,7 +16,6 @@ start_as_current_span, ) from opentelemetry.context import attach -from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor from opentelemetry.propagate import get_global_textmap from pydantic import ValidationError from starlette.responses import JSONResponse @@ -85,12 +84,7 @@ async def lifespan(app: FastAPI): version=REST_API_VERSION, ) -FastAPIInstrumentor().instrument_app(app) -TRACER = get_tracer("API") -""" -Set up basic automated instrumentation for the FastAPI app, creating the -observability context. 
-""" +TRACER = get_tracer("interface") @app.exception_handler(KeyError) @@ -111,7 +105,6 @@ def get_environment( @app.delete("/environment", response_model=EnvironmentResponse) -@start_as_current_span(TRACER, "background_tasks", "runner") async def delete_environment( background_tasks: BackgroundTasks, runner: WorkerDispatcher = Depends(_runner), diff --git a/src/blueapi/service/runner.py b/src/blueapi/service/runner.py index a01bb4ce3..293cae2ee 100644 --- a/src/blueapi/service/runner.py +++ b/src/blueapi/service/runner.py @@ -23,7 +23,7 @@ set_start_method("spawn", force=True) LOGGER = logging.getLogger(__name__) -TRACER = get_tracer("interface") +TRACER = get_tracer("runner") P = ParamSpec("P") T = TypeVar("T") diff --git a/src/blueapi/worker/task_worker.py b/src/blueapi/worker/task_worker.py index 9ae907901..97e63e034 100644 --- a/src/blueapi/worker/task_worker.py +++ b/src/blueapi/worker/task_worker.py @@ -23,6 +23,7 @@ from super_state_machine.errors import TransitionError from blueapi.core import ( + OTLP_EXPORT_ENABLED, BlueskyContext, DataEvent, EventPublisher, @@ -46,7 +47,7 @@ from .worker_errors import WorkerAlreadyStartedError, WorkerBusyError LOGGER = logging.getLogger(__name__) -TRACER = get_tracer("reworker") +TRACER = get_tracer("task_worker") """ Initialise a Tracer for this module provided by the app's global TracerProvider. """ DEFAULT_START_STOP_TIMEOUT: float = 30.0 @@ -126,10 +127,10 @@ def __init__( self._stopped.set() self._broadcast_statuses = broadcast_statuses self._context_register = {} - setup_tracing("BlueAPIWorker") + setup_tracing("BlueAPIWorker", OTLP_EXPORT_ENABLED) @start_as_current_span(TRACER, "task_id") - def clear_task(self, task_id: str, carr: dict[str, Any] = None) -> str: + def clear_task(self, task_id: str, carr: dict[str, Any] | None = None) -> str: task = self._tasks.pop(task_id) return task.task_id @@ -138,7 +139,7 @@ def cancel_active_task( self, failure: bool = False, reason: str | None = None, - carr: dict[str, Any] = None, + carr: dict[str, Any] | None = None, ) -> str: if self._current is None: # Persuades mypy that self._current is not None @@ -153,18 +154,18 @@ def cancel_active_task( return self._current.task_id @start_as_current_span(TRACER) - def get_tasks(self, carr: dict[str, Any] = None) -> list[TrackableTask]: + def get_tasks(self, carr: dict[str, Any] | None = None) -> list[TrackableTask]: return list(self._tasks.values()) @start_as_current_span(TRACER, "task_id") def get_task_by_id( - self, task_id: str, carr: dict[str, Any] = None + self, task_id: str, carr: dict[str, Any] | None = None ) -> TrackableTask | None: return self._tasks.get(task_id) @start_as_current_span(TRACER, "status") def get_tasks_by_status( - self, status: TaskStatusEnum, carr: dict[str, Any] = None + self, status: TaskStatusEnum, carr: dict[str, Any] | None = None ) -> list[TrackableTask]: if status == TaskStatusEnum.RUNNING: return [ @@ -180,7 +181,7 @@ def get_tasks_by_status( @start_as_current_span(TRACER) def get_active_task( - self, carr: dict[str, Any] = None + self, carr: dict[str, Any] | None = None ) -> TrackableTask[Task] | None: current = self._current if current is not None: @@ -188,7 +189,7 @@ def get_active_task( return current @start_as_current_span(TRACER, "task_id") - def begin_task(self, task_id: str, carr: dict[str, Any] = None) -> None: + def begin_task(self, task_id: str, carr: dict[str, Any] | None = None) -> None: task = self._tasks.get(task_id) if task is not None: self._submit_trackable_task(task) @@ -196,7 +197,7 @@ def 
begin_task(self, task_id: str, carr: dict[str, Any] = None) -> None: raise KeyError(f"No pending task with ID {task_id}") @start_as_current_span(TRACER, "task.name", "task.params") - def submit_task(self, task: Task, carr: dict[str, Any] = None) -> str: + def submit_task(self, task: Task, carr: dict[str, Any] | None = None) -> str: task.prepare_params(self._ctx) # Will raise if parameters are invalid task_id: str = str(uuid.uuid4()) add_span_attributes({"TaskId": task_id}) @@ -298,12 +299,12 @@ def run(self) -> None: self._stopped.set() @start_as_current_span(TRACER, "defer") - def pause(self, defer=False, carr: dict[str, Any] = None): + def pause(self, defer=False, carr: dict[str, Any] | None = None): LOGGER.info("Requesting to pause the worker") self._ctx.run_engine.request_pause(defer) @start_as_current_span(TRACER) - def resume(self, carr: dict[str, Any] = None): + def resume(self, carr: dict[str, Any] | None = None): LOGGER.info("Requesting to resume the worker") self._ctx.run_engine.resume() @@ -411,10 +412,6 @@ def _report_status( def _on_document(self, name: str, document: Mapping[str, Any]) -> None: if self._current is not None: - correlation_id = self._current.task_id - self._data_events.publish( - DataEvent(name=name, doc=document), correlation_id - ) with TRACER.start_as_current_span( "_on_document", context=self._context_register[self._current.task_id], diff --git a/tests/system_tests/plans.json b/tests/system_tests/plans.json index b5d0f76e2..d0bba9319 100644 --- a/tests/system_tests/plans.json +++ b/tests/system_tests/plans.json @@ -141,7 +141,7 @@ }, "radius": { "description": "Radius of the circle", - "exclusiveMinimum": 0, + "exclusiveMinimum": 0.0, "title": "Radius", "type": "number" }, @@ -170,19 +170,11 @@ "description": "Abstract baseclass for a combination of two regions, left and right.", "properties": { "left": { - "allOf": [ - { - "$ref": "#/$defs/Region" - } - ], + "$ref": "#/$defs/Region", "description": "The left-hand Region to combine" }, "right": { - "allOf": [ - { - "$ref": "#/$defs/Region" - } - ], + "$ref": "#/$defs/Region", "description": "The right-hand Region to combine" }, "type": { @@ -207,19 +199,11 @@ "description": "Concatenate two Specs together, running one after the other.\n\nEach Dimension of left and right must contain the same axes. Typically\nformed using `Spec.concat`.\n\n.. 
example_spec::\n\n from scanspec.specs import Line\n\n spec = Line(\"x\", 1, 3, 3).concat(Line(\"x\", 4, 5, 5))", "properties": { "left": { - "allOf": [ - { - "$ref": "#/$defs/Spec" - } - ], + "$ref": "#/$defs/Spec", "description": "The left-hand Spec to Concat, midpoints will appear earlier" }, "right": { - "allOf": [ - { - "$ref": "#/$defs/Spec" - } - ], + "$ref": "#/$defs/Spec", "description": "The right-hand Spec to Concat, midpoints will appear later" }, "gap": { @@ -256,19 +240,11 @@ "description": "A point is in DifferenceOf(a, b) if in a and not in b.\n\nTypically created with the ``-`` operator.\n\n>>> r = Range(\"x\", 0.5, 2.5) - Range(\"x\", 1.5, 3.5)\n>>> r.mask({\"x\": np.array([0, 1, 2, 3, 4])})\narray([False, True, False, False, False])", "properties": { "left": { - "allOf": [ - { - "$ref": "#/$defs/Region" - } - ], + "$ref": "#/$defs/Region", "description": "The left-hand Region to combine" }, "right": { - "allOf": [ - { - "$ref": "#/$defs/Region" - } - ], + "$ref": "#/$defs/Region", "description": "The right-hand Region to combine" }, "type": { @@ -312,18 +288,18 @@ }, "x_radius": { "description": "The radius along the x axis of the ellipse", - "exclusiveMinimum": 0, + "exclusiveMinimum": 0.0, "title": "X Radius", "type": "number" }, "y_radius": { "description": "The radius along the y axis of the ellipse", - "exclusiveMinimum": 0, + "exclusiveMinimum": 0.0, "title": "Y Radius", "type": "number" }, "angle": { - "default": 0, + "default": 0.0, "description": "The angle of the ellipse (degrees)", "title": "Angle", "type": "number" @@ -354,19 +330,11 @@ "description": "A point is in IntersectionOf(a, b) if in both a and b.\n\nTypically created with the ``&`` operator.\n\n>>> r = Range(\"x\", 0.5, 2.5) & Range(\"x\", 1.5, 3.5)\n>>> r.mask({\"x\": np.array([0, 1, 2, 3, 4])})\narray([False, False, True, False, False])", "properties": { "left": { - "allOf": [ - { - "$ref": "#/$defs/Region" - } - ], + "$ref": "#/$defs/Region", "description": "The left-hand Region to combine" }, "right": { - "allOf": [ - { - "$ref": "#/$defs/Region" - } - ], + "$ref": "#/$defs/Region", "description": "The right-hand Region to combine" }, "type": { @@ -434,19 +402,11 @@ "description": "Restrict Spec to only midpoints that fall inside the given Region.\n\nTypically created with the ``&`` operator. It also pushes down the\n``& | ^ -`` operators to its `Region` to avoid the need for brackets on\ncombinations of Regions.\n\nIf a Region spans multiple Frames objects, they will be squashed together.\n\n.. example_spec::\n\n from scanspec.regions import Circle\n from scanspec.specs import Line\n\n spec = Line(\"y\", 1, 3, 3) * Line(\"x\", 3, 5, 5) & Circle(\"x\", \"y\", 4, 2, 1.2)\n\nSee Also: `why-squash-can-change-path`", "properties": { "spec": { - "allOf": [ - { - "$ref": "#/$defs/Spec" - } - ], + "$ref": "#/$defs/Spec", "description": "The Spec containing the source midpoints" }, "region": { - "allOf": [ - { - "$ref": "#/$defs/Region" - } - ], + "$ref": "#/$defs/Region", "description": "The Region that midpoints will be inside" }, "check_path_changes": { @@ -526,19 +486,11 @@ "description": "Outer product of two Specs, nesting inner within outer.\n\nThis means that inner will run in its entirety at each point in outer.\n\n.. 
example_spec::\n\n from scanspec.specs import Line\n\n spec = Line(\"y\", 1, 2, 3) * Line(\"x\", 3, 4, 12)", "properties": { "outer": { - "allOf": [ - { - "$ref": "#/$defs/Spec" - } - ], + "$ref": "#/$defs/Spec", "description": "Will be executed once" }, "inner": { - "allOf": [ - { - "$ref": "#/$defs/Spec" - } - ], + "$ref": "#/$defs/Spec", "description": "Will be executed len(outer) times" }, "type": { @@ -627,7 +579,7 @@ "type": "number" }, "angle": { - "default": 0, + "default": 0.0, "description": "Clockwise rotation angle of the rectangle", "title": "Angle", "type": "number" @@ -739,11 +691,7 @@ "description": "Run the Spec in reverse on every other iteration when nested.\n\nTypically created with the ``~`` operator.\n\n.. example_spec::\n\n from scanspec.specs import Line\n\n spec = Line(\"y\", 1, 3, 3) * ~Line(\"x\", 3, 5, 5)", "properties": { "spec": { - "allOf": [ - { - "$ref": "#/$defs/Spec" - } - ], + "$ref": "#/$defs/Spec", "description": "The Spec to run in reverse every other iteration" }, "type": { @@ -850,7 +798,7 @@ "type": "integer" }, "rotate": { - "default": 0, + "default": 0.0, "description": "How much to rotate the angle of the spiral", "title": "Rotate", "type": "number" @@ -882,11 +830,7 @@ "description": "Squash a stack of Frames together into a single expanded Frames object.\n\nSee Also:\n `why-squash-can-change-path`\n\n.. example_spec::\n\n from scanspec.specs import Line, Squash\n\n spec = Squash(Line(\"y\", 1, 2, 3) * Line(\"x\", 0, 1, 4))", "properties": { "spec": { - "allOf": [ - { - "$ref": "#/$defs/Spec" - } - ], + "$ref": "#/$defs/Spec", "description": "The Spec to squash the dimensions of" }, "check_path_changes": { @@ -953,19 +897,11 @@ "description": "A point is in SymmetricDifferenceOf(a, b) if in either a or b, but not both.\n\nTypically created with the ``^`` operator.\n\n>>> r = Range(\"x\", 0.5, 2.5) ^ Range(\"x\", 1.5, 3.5)\n>>> r.mask({\"x\": np.array([0, 1, 2, 3, 4])})\narray([False, True, False, True, False])", "properties": { "left": { - "allOf": [ - { - "$ref": "#/$defs/Region" - } - ], + "$ref": "#/$defs/Region", "description": "The left-hand Region to combine" }, "right": { - "allOf": [ - { - "$ref": "#/$defs/Region" - } - ], + "$ref": "#/$defs/Region", "description": "The right-hand Region to combine" }, "type": { @@ -990,19 +926,11 @@ "description": "A point is in UnionOf(a, b) if in either a or b.\n\nTypically created with the ``|`` operator\n\n>>> r = Range(\"x\", 0.5, 2.5) | Range(\"x\", 1.5, 3.5)\n>>> r.mask({\"x\": np.array([0, 1, 2, 3, 4])})\narray([False, True, True, True, False])", "properties": { "left": { - "allOf": [ - { - "$ref": "#/$defs/Region" - } - ], + "$ref": "#/$defs/Region", "description": "The left-hand Region to combine" }, "right": { - "allOf": [ - { - "$ref": "#/$defs/Region" - } - ], + "$ref": "#/$defs/Region", "description": "The right-hand Region to combine" }, "type": { @@ -1027,19 +955,11 @@ "description": "Run two Specs in parallel, merging their midpoints together.\n\nTypically formed using `Spec.zip`.\n\nStacks of Frames are merged by:\n\n- If right creates a stack of a single Frames object of size 1, expand it to\n the size of the fastest Frames object created by left\n- Merge individual Frames objects together from fastest to slowest\n\nThis means that Zipping a Spec producing stack [l2, l1] with a Spec\nproducing stack [r1] will assert len(l1)==len(r1), and produce\nstack [l2, l1.zip(r1)].\n\n.. 
example_spec::\n\n from scanspec.specs import Line\n\n spec = Line(\"z\", 1, 2, 3) * Line(\"y\", 3, 4, 5).zip(Line(\"x\", 4, 5, 5))", "properties": { "left": { - "allOf": [ - { - "$ref": "#/$defs/Spec" - } - ], + "$ref": "#/$defs/Spec", "description": "The left-hand Spec to Zip, will appear earlier in axes" }, "right": { - "allOf": [ - { - "$ref": "#/$defs/Spec" - } - ], + "$ref": "#/$defs/Spec", "description": "The right-hand Spec to Zip, will appear later in axes" }, "type": { diff --git a/tests/unit_tests/core/fake_device_module.py b/tests/unit_tests/core/fake_device_module.py index d27af8480..0e2ea690c 100644 --- a/tests/unit_tests/core/fake_device_module.py +++ b/tests/unit_tests/core/fake_device_module.py @@ -32,7 +32,7 @@ def _mock_with_name(name: str) -> MagicMock: return mock -def wrong_return_type() -> int: +def wrong_return_type(*args, **kwargs) -> int: return "str" # type: ignore diff --git a/tests/unit_tests/service/test_interface.py b/tests/unit_tests/service/test_interface.py index 86dd2e5c1..0def741c6 100644 --- a/tests/unit_tests/service/test_interface.py +++ b/tests/unit_tests/service/test_interface.py @@ -1,6 +1,6 @@ import uuid from dataclasses import dataclass -from unittest.mock import MagicMock, Mock, patch +from unittest.mock import ANY, MagicMock, Mock, patch import pytest from bluesky_stomp.messaging import StompClient @@ -16,6 +16,10 @@ from blueapi.worker.task import Task from blueapi.worker.task_worker import TrackableTask +TEST_CARRIER = { + "traceparent": "00-6fc19ca6e39c1fc845bb7c6fe28c7fdc-00babae1b7c9591d-01" +} + @pytest.fixture def mock_connection() -> Mock: @@ -36,7 +40,7 @@ def ensure_worker_stopped(): of an assertion error. The start_worker method is not managed by a fixture as some of the tests require it to be customised.""" yield - interface.teardown() + interface.teardown(TEST_CARRIER) def my_plan() -> MsgGenerator: @@ -56,7 +60,7 @@ def test_get_plans(context_mock: MagicMock): context.plan(my_second_plan) context_mock.return_value = context - assert interface.get_plans() == [ + assert interface.get_plans(TEST_CARRIER) == [ PlanModel( name="my_plan", description="My plan does cool stuff.", @@ -88,7 +92,7 @@ def test_get_plan(context_mock: MagicMock): context.plan(my_second_plan) context_mock.return_value = context - assert interface.get_plan("my_plan") == PlanModel( + assert interface.get_plan(TEST_CARRIER, "my_plan") == PlanModel( name="my_plan", description="My plan does cool stuff.", schema={ @@ -100,7 +104,7 @@ def test_get_plan(context_mock: MagicMock): ) with pytest.raises(KeyError): - interface.get_plan("non_existing_plan") + interface.get_plan(TEST_CARRIER, "non_existing_plan") @dataclass @@ -115,7 +119,7 @@ def test_get_devices(context_mock: MagicMock): context.device(SynAxis(name="my_axis")) context_mock.return_value = context - assert interface.get_devices() == [ + assert interface.get_devices(TEST_CARRIER) == [ DeviceModel(name="my_device", protocols=["HasName"]), DeviceModel( name="my_axis", @@ -143,12 +147,12 @@ def test_get_device(context_mock: MagicMock): context.device(MyDevice(name="my_device")) context_mock.return_value = context - assert interface.get_device("my_device") == DeviceModel( + assert interface.get_device(TEST_CARRIER, "my_device") == DeviceModel( name="my_device", protocols=["HasName"] ) with pytest.raises(KeyError): - assert interface.get_device("non_existing_device") + assert interface.get_device(TEST_CARRIER, "non_existing_device") @patch("blueapi.service.interface.context") @@ -160,7 +164,7 @@ def 
test_submit_task(context_mock: MagicMock): mock_uuid_value = "8dfbb9c2-7a15-47b6-bea8-b6b77c31d3d9" with patch.object(uuid, "uuid4") as uuid_mock: uuid_mock.return_value = uuid.UUID(mock_uuid_value) - task_uuid = interface.submit_task(task) + task_uuid = interface.submit_task(TEST_CARRIER, task) assert task_uuid == mock_uuid_value @@ -173,9 +177,9 @@ def test_clear_task(context_mock: MagicMock): mock_uuid_value = "3d858a62-b40a-400f-82af-8d2603a4e59a" with patch.object(uuid, "uuid4") as uuid_mock: uuid_mock.return_value = uuid.UUID(mock_uuid_value) - interface.submit_task(task) + interface.submit_task(TEST_CARRIER, task) - clear_task_return = interface.clear_task(mock_uuid_value) + clear_task_return = interface.clear_task(TEST_CARRIER, mock_uuid_value) assert clear_task_return == mock_uuid_value @@ -183,7 +187,7 @@ def test_clear_task(context_mock: MagicMock): def test_begin_task(worker_mock: MagicMock): uuid_value = "350043fd-597e-41a7-9a92-5d5478232cf7" task = WorkerTask(task_id=uuid_value) - returned_task = interface.begin_task(task) + returned_task = interface.begin_task(TEST_CARRIER, task) assert task == returned_task worker_mock.assert_called_once_with(uuid_value) @@ -191,7 +195,7 @@ def test_begin_task(worker_mock: MagicMock): @patch("blueapi.service.interface.TaskWorker.begin_task") def test_begin_task_no_task_id(worker_mock: MagicMock): task = WorkerTask(task_id=None) - returned_task = interface.begin_task(task) + returned_task = interface.begin_task(TEST_CARRIER, task) assert task == returned_task worker_mock.assert_not_called() @@ -212,35 +216,37 @@ def mock_tasks_by_status(status: TaskStatusEnum) -> list[TrackableTask]: get_tasks_by_status_mock.side_effect = mock_tasks_by_status - assert interface.get_tasks_by_status(TaskStatusEnum.PENDING) == [ + assert interface.get_tasks_by_status(TEST_CARRIER, TaskStatusEnum.PENDING) == [ pending_task1, pending_task2, ] - assert interface.get_tasks_by_status(TaskStatusEnum.RUNNING) == [running_task] - assert interface.get_tasks_by_status(TaskStatusEnum.COMPLETE) == [] + assert interface.get_tasks_by_status(TEST_CARRIER, TaskStatusEnum.RUNNING) == [ + running_task + ] + assert interface.get_tasks_by_status(TEST_CARRIER, TaskStatusEnum.COMPLETE) == [] def test_get_active_task(): - assert interface.get_active_task() is None + assert interface.get_active_task(TEST_CARRIER) is None def test_get_worker_state(): - assert interface.get_worker_state() == WorkerState.IDLE + assert interface.get_worker_state(TEST_CARRIER) == WorkerState.IDLE @patch("blueapi.service.interface.TaskWorker.pause") def test_pause_worker(pause_worker_mock: MagicMock): - interface.pause_worker(False) + interface.pause_worker(TEST_CARRIER, False) pause_worker_mock.assert_called_once_with(False) pause_worker_mock.reset_mock() - interface.pause_worker(True) + interface.pause_worker(TEST_CARRIER, True) pause_worker_mock.assert_called_once_with(True) @patch("blueapi.service.interface.TaskWorker.resume") def test_resume_worker(resume_worker_mock: MagicMock): - interface.resume_worker() + interface.resume_worker(TEST_CARRIER) resume_worker_mock.assert_called_once() @@ -250,7 +256,7 @@ def test_cancel_active_task(cancel_active_task_mock: MagicMock): reason = "End of session" task_id = "789" cancel_active_task_mock.return_value = task_id - assert interface.cancel_active_task(fail, reason) == task_id + assert interface.cancel_active_task(TEST_CARRIER, fail, reason) == task_id cancel_active_task_mock.assert_called_once_with(fail, reason) @@ -263,7 +269,7 @@ def 
test_get_tasks(get_tasks_mock: MagicMock): ] get_tasks_mock.return_value = tasks - assert interface.get_tasks() == tasks + assert interface.get_tasks(TEST_CARRIER) == tasks @patch("blueapi.service.interface.context") @@ -272,10 +278,13 @@ def test_get_task_by_id(context_mock: MagicMock): context.plan(my_plan) context_mock.return_value = context - task_id = interface.submit_task(Task(name="my_plan")) + task_id = interface.submit_task(TEST_CARRIER, Task(name="my_plan")) - assert interface.get_task_by_id(task_id) == TrackableTask( + assert interface.get_task_by_id( + TEST_CARRIER, task_id + ) == TrackableTask.model_construct( task_id=task_id, + request_id=ANY, task=Task(name="my_plan", params={}), is_complete=False, is_pending=True, diff --git a/tests/unit_tests/service/test_rest_api.py b/tests/unit_tests/service/test_rest_api.py index 6bb08cb01..3eba9cdde 100644 --- a/tests/unit_tests/service/test_rest_api.py +++ b/tests/unit_tests/service/test_rest_api.py @@ -23,6 +23,22 @@ from blueapi.worker.task_worker import TrackableTask +class _ANY_DICT(dict): + "A helper object that compares equal to all dictionaries." + + def __eq__(self, other): + return isinstance(other, dict) + + def __ne__(self, other): + return False + + def __repr__(self): + return "" + + +ANY_DICT = _ANY_DICT() + + @pytest.fixture def client() -> Iterator[TestClient]: with ( @@ -70,7 +86,7 @@ class MyModel(BaseModel): response = client.get("/plans/my-plan") - get_plan_mock.assert_called_once_with("my-plan") + get_plan_mock.assert_called_once_with(ANY_DICT, "my-plan") assert response.status_code == status.HTTP_200_OK assert response.json() == { "description": None, @@ -128,7 +144,7 @@ class MyDevice: get_device_mock.return_value = DeviceModel.from_device(device) response = client.get("/devices/my-device") - get_device_mock.assert_called_once_with("my-device") + get_device_mock.assert_called_once_with(ANY_DICT, "my-device") assert response.status_code == status.HTTP_200_OK assert response.json() == { "name": "my-device", @@ -159,7 +175,7 @@ def test_create_task( response = client.post("/tasks", json=task.model_dump()) - submit_task_mock.assert_called_once_with(task) + submit_task_mock.assert_called_once_with(ANY_DICT, task) assert response.json() == {"task_id": task_id} @@ -252,6 +268,7 @@ def test_get_tasks(get_tasks_mock: MagicMock, client: TestClient) -> None: "errors": [], "is_complete": False, "is_pending": True, + "request_id": "", "task": {"name": "sleep", "params": {"time": 0.0}}, "task_id": "0", }, @@ -259,6 +276,7 @@ def test_get_tasks(get_tasks_mock: MagicMock, client: TestClient) -> None: "errors": [], "is_complete": False, "is_pending": True, + "request_id": "", "task": {"name": "first_task", "params": {}}, "task_id": "1", }, @@ -288,6 +306,7 @@ def test_get_tasks_by_status( "errors": [], "is_complete": True, "is_pending": False, + "request_id": "", "task": {"name": "third_task", "params": {}}, "task_id": "3", } @@ -379,6 +398,7 @@ def test_get_task(get_task_by_id: MagicMock, client: TestClient): "errors": [], "is_complete": False, "is_pending": True, + "request_id": "", "task": {"name": "third_task", "params": {}}, "task_id": f"{task_id}", } @@ -404,6 +424,7 @@ def test_get_all_tasks(get_all_tasks: MagicMock, client: TestClient): "task": {"name": "third_task", "params": {}}, "is_complete": False, "is_pending": True, + "request_id": "", "errors": [], } ] @@ -464,7 +485,7 @@ def test_set_state_running_to_paused( "/worker/state", json=StateChangeRequest(new_state=final_state).model_dump() ) - 
pause_worker_mock.assert_called_once_with(False) + pause_worker_mock.assert_called_once_with(ANY_DICT, False) assert response.status_code == status.HTTP_202_ACCEPTED assert response.json() == final_state @@ -502,7 +523,7 @@ def test_set_state_running_to_aborting( "/worker/state", json=StateChangeRequest(new_state=final_state).model_dump() ) - cancel_active_task_mock.assert_called_once_with(True, None) + cancel_active_task_mock.assert_called_once_with(ANY_DICT, True, None) assert response.status_code == status.HTTP_202_ACCEPTED assert response.json() == final_state @@ -524,7 +545,7 @@ def test_set_state_running_to_stopping_including_reason( json=StateChangeRequest(new_state=final_state, reason=reason).model_dump(), ) - cancel_active_task_mock.assert_called_once_with(False, reason) + cancel_active_task_mock.assert_called_once_with(ANY_DICT, False, reason) assert response.status_code == status.HTTP_202_ACCEPTED assert response.json() == final_state diff --git a/tests/unit_tests/service/test_runner.py b/tests/unit_tests/service/test_runner.py index af65e9b0a..bcce9874d 100644 --- a/tests/unit_tests/service/test_runner.py +++ b/tests/unit_tests/service/test_runner.py @@ -186,42 +186,42 @@ class GenericModel(BaseModel, Generic[T]): b: str -def return_int() -> int: +def return_int(*args, **kwargs) -> int: return 1 -def return_str() -> str: +def return_str(*args, **kwargs) -> str: return "hello" -def return_list() -> list[int]: +def return_list(*args, **kwargs) -> list[int]: return [1, 2, 3] -def return_dict() -> dict[str, int]: +def return_dict(*args, **kwargs) -> dict[str, int]: return { "test": 1, "other_test": 2, } -def return_simple_model() -> SimpleModel: +def return_simple_model(*args, **kwargs) -> SimpleModel: return SimpleModel(a=1, b="hi") -def return_nested_model() -> NestedModel: +def return_nested_model(*args, **kwargs) -> NestedModel: return NestedModel(nested=return_simple_model(), c=False) -def return_unbound_generic_model() -> GenericModel: +def return_unbound_generic_model(*args, **kwargs) -> GenericModel: return GenericModel(a="foo", b="bar") -def return_bound_generic_model() -> GenericModel[int]: +def return_bound_generic_model(*args, **kwargs) -> GenericModel[int]: return GenericModel(a=1, b="hi") -def return_explicitly_bound_generic_model() -> GenericModel[int]: +def return_explicitly_bound_generic_model(*args, **kwargs) -> GenericModel[int]: return GenericModel[int](a=1, b="hi") diff --git a/tests/unit_tests/worker/test_task_worker.py b/tests/unit_tests/worker/test_task_worker.py index cfb606424..601a38d27 100644 --- a/tests/unit_tests/worker/test_task_worker.py +++ b/tests/unit_tests/worker/test_task_worker.py @@ -4,7 +4,7 @@ from concurrent.futures import Future from queue import Full from typing import Any, TypeVar -from unittest.mock import MagicMock, patch +from unittest.mock import ANY, MagicMock, patch import pytest @@ -115,7 +115,9 @@ def test_submit_task(worker: TaskWorker) -> None: assert worker.get_tasks() == [] task_id = worker.submit_task(_SIMPLE_TASK) assert worker.get_tasks() == [ - TrackableTask(task_id=task_id, request_id="None", task=_SIMPLE_TASK) + TrackableTask.model_construct( + task_id=task_id, request_id=ANY, task=_SIMPLE_TASK + ) ] @@ -123,12 +125,18 @@ def test_submit_multiple_tasks(worker: TaskWorker) -> None: assert worker.get_tasks() == [] task_id_1 = worker.submit_task(_SIMPLE_TASK) assert worker.get_tasks() == [ - TrackableTask(task_id=task_id_1, request_id="None", task=_SIMPLE_TASK) + TrackableTask.model_construct( + task_id=task_id_1, 
request_id=ANY, task=_SIMPLE_TASK + ) ] task_id_2 = worker.submit_task(_LONG_TASK) assert worker.get_tasks() == [ - TrackableTask(task_id=task_id_1, request_id="None", task=_SIMPLE_TASK), - TrackableTask(task_id=task_id_2, request_id="None", task=_LONG_TASK), + TrackableTask.model_construct( + task_id=task_id_1, request_id=ANY, task=_SIMPLE_TASK + ), + TrackableTask.model_construct( + task_id=task_id_2, request_id=ANY, task=_LONG_TASK + ), ] @@ -141,12 +149,16 @@ def test_stop_with_task_pending(inert_worker: TaskWorker) -> None: def test_restart_leaves_task_pending(worker: TaskWorker) -> None: task_id = worker.submit_task(_SIMPLE_TASK) assert worker.get_tasks() == [ - TrackableTask(task_id=task_id, request_id="None", task=_SIMPLE_TASK) + TrackableTask.model_construct( + task_id=task_id, request_id=ANY, task=_SIMPLE_TASK + ) ] worker.stop() worker.start() assert worker.get_tasks() == [ - TrackableTask(task_id=task_id, request_id="None", task=_SIMPLE_TASK) + TrackableTask.model_construct( + task_id=task_id, request_id=ANY, task=_SIMPLE_TASK + ) ] @@ -154,18 +166,24 @@ def test_submit_before_start_pending(inert_worker: TaskWorker) -> None: task_id = inert_worker.submit_task(_SIMPLE_TASK) inert_worker.start() assert inert_worker.get_tasks() == [ - TrackableTask(task_id=task_id, request_id="None", task=_SIMPLE_TASK) + TrackableTask.model_construct( + task_id=task_id, request_id=ANY, task=_SIMPLE_TASK + ) ] inert_worker.stop() assert inert_worker.get_tasks() == [ - TrackableTask(task_id=task_id, request_id="None", task=_SIMPLE_TASK) + TrackableTask.model_construct( + task_id=task_id, request_id=ANY, task=_SIMPLE_TASK + ) ] def test_clear_task(worker: TaskWorker) -> None: task_id = worker.submit_task(_SIMPLE_TASK) assert worker.get_tasks() == [ - TrackableTask(task_id=task_id, request_id="None", task=_SIMPLE_TASK) + TrackableTask.model_construct( + task_id=task_id, request_id=ANY, task=_SIMPLE_TASK + ) ] assert worker.clear_task(task_id) assert worker.get_tasks() == []
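Note on the trace-propagation pattern used in this patch: the REST client injects the current span context into the request headers (the carrier returned by get_context_propagator() in rest.py), and the worker-side methods accept an optional carr carrier dict so that spans started in the worker join the caller's trace rather than beginning a new one. The sketch below illustrates the same round trip using only the plain OpenTelemetry SDK; the function names (send_request, handle_task) are placeholders for illustration and are not blueapi APIs.

# Illustrative sketch only: W3C trace-context propagation across a process
# boundary with the OpenTelemetry SDK directly (no observability-utils).
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.trace.propagation.tracecontext import (
    TraceContextTextMapPropagator,
)

trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer("example")
propagator = TraceContextTextMapPropagator()


def send_request() -> dict[str, str]:
    # Client side: serialise the active span context into a carrier dict,
    # e.g. the headers passed to requests.request(...).
    with tracer.start_as_current_span("client_call"):
        carrier: dict[str, str] = {}
        propagator.inject(carrier)  # adds a "traceparent" entry
        return carrier


def handle_task(carr: dict[str, str] | None = None) -> None:
    # Worker side: restore the caller's context so new spans are children
    # of the client span instead of the roots of a fresh trace.
    ctx = propagator.extract(carr or {})
    with tracer.start_as_current_span("worker_task", context=ctx):
        pass  # do the actual work here


if __name__ == "__main__":
    handle_task(send_request())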