diff --git a/README.md b/README.md index 00c9c28..9994cee 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,34 @@ ## Getting Started +```python +import prefect +from prefect import Flow, task +from prefect_saturn import PrefectCloudIntegration + + +@task +def hello_task(): + logger = prefect.context.get("logger") + logger.info("hello prefect-saturn") + + +flow = Flow("sample-flow", tasks=[hello_task]) + +project_name = "sample-project" +integration = PrefectCloudIntegration( + prefect_cloud_project_name=project_name +) +flow = integration.register_flow_with_saturn(flow) + +flow.register( + project_name=project_name, + labels=["saturn-cloud"] +) +``` + +## Installation + `prefect-saturn` is available on PyPi. ```shell diff --git a/VERSION b/VERSION index 4e379d2..6e8bf73 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.0.2 +0.1.0 diff --git a/prefect_saturn/core.py b/prefect_saturn/core.py index 72cf940..963afe2 100644 --- a/prefect_saturn/core.py +++ b/prefect_saturn/core.py @@ -4,19 +4,17 @@ import hashlib import os -import uuid from typing import Any, Dict, Optional from requests import Session from requests.adapters import HTTPAdapter -from requests.models import Response from requests.packages.urllib3.util.retry import Retry import cloudpickle from prefect import Flow from prefect.client import Client from prefect.engine.executors import DaskExecutor -from prefect.environments.storage import Docker +from prefect.environments.storage import Webhook from prefect.environments import KubernetesJobEnvironment import yaml @@ -29,18 +27,28 @@ class PrefectCloudIntegration: .. code-block:: python - integration = PrefectCloudIntegration(PROJECT_NAME) - integration.register_flow_with_saturn(flow) - flow = integration.add_environment(flow) - flow = integration.add_storage(flow) - flow.storage.add_flow(flow) - integration.build_storage(flow) + import prefect + from prefect import Flow, task + from prefect_saturn import PrefectCloudIntegration + + + @task + def hello_task(): + logger = prefect.context.get("logger") + logger.info("hello prefect-saturn") + + + flow = Flow("sample-flow", tasks=[hello_task]) + + project_name = "sample-project" + integration = PrefectCloudIntegration( + prefect_cloud_project_name=project_name + ) + flow = integration.register_flow_with_saturn(flow) + flow.register( - project_name=PROJECT_NAME, - build=False, - labels=[ - "saturn-cloud" - ] + project_name=project_name, + labels=["saturn-cloud"] ) """ @@ -59,7 +67,9 @@ def __init__(self, prefect_cloud_project_name: str): raise RuntimeError(Errors.missing_env_var("BASE_URL")) self.prefect_cloud_project_name: str = prefect_cloud_project_name - self._saturn_flow_id: Optional[int] = None + self._saturn_flow_id: Optional[str] = None + self._saturn_flow_version_id: Optional[str] = None + self._saturn_image: Optional[str] = None # set up logic for authenticating with Saturn back-end service retry_logic = HTTPAdapter(max_retries=Retry(total=3)) @@ -68,15 +78,6 @@ def __init__(self, prefect_cloud_project_name: str): self._session.mount("https://", retry_logic) self._session.headers.update({"Authorization": f"token {SATURN_TOKEN}"}) - # figure out the image this notebook is running in - res = self._session.get( - url=f"{self._base_url}api/current_image", headers={"Content-Type": "application/json"} - ) - res.raise_for_status() - self._saturn_base_image = res.json()["image"] - - self._storage_details: Optional[Dict[str, Any]] = None - def _hash_flow(self, flow: Flow) -> str: """ In Prefect Cloud, all versions of a flow in a project are tied together @@ -105,25 +106,34 @@ def _hash_flow(self, flow: Flow) -> str: hasher.update(cloudpickle.dumps(identifying_content)) return hasher.hexdigest() - def register_flow_with_saturn(self, flow: Flow) -> bool: + def _set_flow_metadata(self, flow: Flow) -> None: """ Given a Flow, register it with Saturn. This method has - the following side effects: + the following side effects. - The first time you run it: + The first time you run it for a flow: - * create a Deployment in Saturn's database for this flow - * create a record of the flow in Saturn's database - * store a hash of the flow in the database - * store a commit hash for the current state of /home/jovyan/project + * creates a Deployment in Saturn's database for this flow + * creates a record of the flow in Saturn's database + * stores a hash of the flow in the database + * stores a commit hash for the current state of the git repo in + /home/jovyan/project + * sets ``self._saturn_flow_id`` to an identifier for the flow in Saturn + * sets ``self._saturn_flow_version_id`` to an identifier for this flow + version + * sets ``self._saturn_image`` to the name of a Docker image the flow + will run in - Any time you run it + If you run this method for a flow that already exists in Saturn: - * update the flow's hash in the database - * update the /home/jovyan/project commit hash stored in the DB - * update the deployment's image to the current image for the userproject + * updates the /home/jovyan/project commit hash stored in the DB + * updates the deployment's image to the current image for the project it belongs to - * sets ``self._saturn_flow_id`` on this instance + * updates ``self._saturn_flow_version_id`` to a new value. Just like + ``flow.register()`` from ``prefect``, each call of this method + generates a new flow version. + * updates ``self._saturn_image`` in case the image for the flow + has changed """ res = self._session.put( url=f"{self._base_url}api/prefect_cloud/flows", @@ -136,91 +146,115 @@ def register_flow_with_saturn(self, flow: Flow) -> bool: ) res.raise_for_status() response_json = res.json() - self._saturn_flow_id = response_json["id"] - return True + self._saturn_flow_id = str(response_json["id"]) + self._saturn_flow_version_id = response_json["flow_version_id"] + self._saturn_image = response_json["image"] @property - def storage_details(self) -> Dict[str, Any]: + def flow_id(self) -> str: """ - Information from Atlas used to build flow storage. - This method can only be run for flows which have already been registered - with ``register_flow_with_saturn()``. - - updates ``self._storage_details`` with the following Saturn-specific details for this flow: - - registry_url: the container registry to push flow storage too, since - Saturn currently uses ``Docker`` storage from Prefect - - image_name: name for the Docker image that will store the flow + Saturn identifier for a flow. This uniquely identifies a flow + within a Saturn instance. """ if self._saturn_flow_id is None: raise RuntimeError(Errors.NOT_REGISTERED) + return self._saturn_flow_id - res = self._session.get( - url=f"{self._base_url}api/prefect_cloud/flows/{self._saturn_flow_id}/storage_details", - headers={"Content-Type": "application/json"}, - ) - res.raise_for_status() - response_json = res.json() - self._storage_details = response_json - return response_json + @property + def flow_version_id(self) -> str: + """ + Unique identifier for one version of a flow. This is randomly + generated by Saturn and is not an identifier known to + Prefect Cloud. This is used to ensure that each flow run + pulls the correct flow version from storage. + """ + if self._saturn_flow_version_id is None: + raise RuntimeError(Errors.NOT_REGISTERED) + return self._saturn_flow_version_id - def add_storage(self, flow: Flow) -> Flow: + @property + def image(self) -> str: + """ + Docker image that flows will run in. This is the same as + the image used in the Saturn project where the flow was + created. """ - Create a Docker Storage object with Saturn-y details and set - it on `flow.storage`. + if self._saturn_image is None: + raise RuntimeError(Errors.NOT_REGISTERED) + return self._saturn_image + + def register_flow_with_saturn( + self, flow: Flow, dask_cluster_kwargs=None, dask_adapt_kwargs=None + ) -> Flow: + """ + Given a flow, set up all the details needed to run it on + a Saturn Dask cluster. + + This method modifies the following components of ``Flow`` objects + passed to it. - This method sets the `image_tag` to a random string to avoid conflicts. - The image name is generated in Saturn. + * ``.storage``: a ``Webhook`` storage instance is added + * ``.environment``: a ``KubernetesJobEnvironment`` with a ``DaskExecutor`` + is added. This environment will use the same image as the notebook + from which this code is run. """ - storage_details = self.storage_details - image_tag = str(uuid.uuid4()) - # NOTE: SATURN_TOKEN and BASE_URL have to be set to be able - # to load the flow. Those variables will be overridden by - # Kubernetes in all the places where it matters, and values - # set in the image are overridden by those from kubernetes - # https://kubernetes.io/docs/tasks/inject-data-application/define-environment-variable-container - flow.storage = Docker( - base_image=self._saturn_base_image, - image_name=storage_details["image_name"], - image_tag=image_tag, - registry_url=storage_details["registry_url"], - prefect_directory="/tmp", - env_vars={"SATURN_TOKEN": "placeholder-token", "BASE_URL": "placeholder-url"}, - python_dependencies=["kubernetes"], - ignore_healthchecks=True, + self._set_flow_metadata(flow) + + storage = self._get_storage() + flow.storage = storage + + environment = self._get_environment( + cluster_kwargs=dask_cluster_kwargs, adapt_kwargs=dask_adapt_kwargs ) + flow.environment = environment + return flow - def build_storage(self, flow: Flow) -> Response: + def _get_storage(self) -> Webhook: """ - Actually build and push the storage + Create a `Webhook` storage object with Saturn-y details. """ - if self._saturn_flow_id is None: - raise RuntimeError(Errors.NOT_REGISTERED) - res = self._session.post( - url=f"{self._base_url}api/prefect_cloud/flows/{self._saturn_flow_id}/store", - headers={"Content-Type": "application/octet-stream"}, - data=cloudpickle.dumps(flow), + url = ( + "${BASE_URL}api/prefect_cloud/flows/" + + self.flow_id + + "/" + + self.flow_version_id + + "/content" ) - res.raise_for_status() - return res + storage = Webhook( + build_request_kwargs={ + "url": url, + "headers": { + "Content-Type": "application/octet-stream", + "Authorization": "token ${SATURN_TOKEN}", + }, + }, + build_request_http_method="POST", + get_flow_request_kwargs={ + "url": url, + "headers": { + "Accept": "application/octet-stream", + "Authorization": "token ${SATURN_TOKEN}", + }, + }, + get_flow_request_http_method="GET", + ) + + return storage - def add_environment( + def _get_environment( self, - flow: Flow, cluster_kwargs: Optional[Dict[str, Any]] = None, adapt_kwargs: Optional[Dict[str, Any]] = None, - ) -> Flow: + ) -> KubernetesJobEnvironment: """ Get an environment that customizes the execution of a Prefect flow run. """ cluster_kwargs = cluster_kwargs or {"n_workers": 1} adapt_kwargs = adapt_kwargs or {"minimum": 1, "maximum": 2} - if self._saturn_flow_id is None: - raise RuntimeError(Errors.NOT_REGISTERED) - # get job spec with Saturn details from Atlas - url = f"{self._base_url}api/prefect_cloud/flows/{self._saturn_flow_id}/run_job_spec" + url = f"{self._base_url}api/prefect_cloud/flows/{self.flow_id}/run_job_spec" response = self._session.get(url=url) response.raise_for_status() job_dict = response.json() @@ -229,8 +263,9 @@ def add_environment( with open(local_tmp_file, "w") as f: f.write(yaml.dump(job_dict)) + # saturn_flow_id is used by Saturn's custom Prefect agent k8s_environment = KubernetesJobEnvironment( - metadata={"saturn_flow_id": self._saturn_flow_id}, + metadata={"saturn_flow_id": self.flow_id, "image": self.image}, executor=DaskExecutor( cluster_class="dask_saturn.SaturnCluster", cluster_kwargs=cluster_kwargs, @@ -253,5 +288,4 @@ def add_environment( new_args = f"source /home/jovyan/.saturn/start.sh; {args_from_prefect}" k8s_environment._job_spec["spec"]["template"]["spec"]["containers"][0]["args"] = [new_args] - flow.environment = k8s_environment - return flow + return k8s_environment diff --git a/setup.py b/setup.py index d091985..26b6bc4 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,8 @@ with open("VERSION", "r") as f: version = f.read().strip() -install_requires = ["cloudpickle", "dask-saturn>=0.0.4", "prefect", "requests"] +# prefect has to be 0.13.0 or newer to get Webhook storage +install_requires = ["cloudpickle", "dask-saturn>=0.0.4", "prefect>0.13.0", "requests"] testing_deps = ["pytest", "pytest-cov", "responses"] setup( diff --git a/tests/test_core.py b/tests/test_core.py index d1ef9f4..5a7803b 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -6,21 +6,21 @@ import uuid import yaml -from copy import deepcopy from typing import Any, Dict, Optional from prefect import task, Flow from prefect.engine.executors import DaskExecutor -from prefect.environments import KubernetesJobEnvironment -from prefect.environments.storage import Docker +from prefect.environments import KubernetesJobEnvironment, LocalEnvironment +from prefect.environments.storage import Webhook from pytest import raises from requests.exceptions import HTTPError from unittest.mock import patch os.environ["SATURN_TOKEN"] = "placeholder-token" -os.environ["BASE_URL"] = "http://placeholder-url" +os.environ["BASE_URL"] = "http://placeholder-url/" -TEST_FLOW_ID = random.randint(1, 500) +TEST_FLOW_ID = str(random.randint(1, 500)) +TEST_FLOW_VERSION_ID = str(uuid.uuid4()) TEST_FLOW_NAME = "plz-w0rk" TEST_DEPLOYMENT_TOKEN = str(uuid.uuid4()) TEST_IMAGE = "test-image:0.1" @@ -36,92 +36,66 @@ def hello_task(): TEST_FLOW = Flow(TEST_FLOW_NAME, tasks=[hello_task]) -# ------------------ # -# /api/current_image # -# ------------------ # -CURRENT_IMAGE_RESPONSE = { - "method": responses.GET, - "url": f"{os.environ['BASE_URL']}/api/current_image", - "json": {"image": TEST_IMAGE}, - "status": 200, -} - - -def CURRENT_IMAGE_FAILURE_RESPONSE(status: int): - return { - "method": responses.GET, - "url": f"{os.environ['BASE_URL']}/api/current_image", - "json": {"image": TEST_IMAGE}, - "status": status, - } - # ------------------------ # # /api/prefect_cloud/flows # # ------------------------ # def REGISTER_FLOW_RESPONSE( - flow_id: Optional[int] = None, - name: Optional[str] = None, - prefect_cloud_project_name: Optional[str] = None, - flow_hash: Optional[str] = None, - deployment_id: Optional[int] = None, - status: Optional[int] = None, + flow_id: Optional[str] = None, status: Optional[int] = None, ) -> Dict[str, Any]: return { "method": responses.PUT, - "url": f"{os.environ['BASE_URL']}/api/prefect_cloud/flows", + "url": f"{os.environ['BASE_URL']}api/prefect_cloud/flows", "json": { "id": flow_id or TEST_FLOW_ID, - "name": name or "c00l-flow", - "prefect_cloud_project_name": prefect_cloud_project_name or TEST_PREFECT_PROJECT_NAME, - "flow_hash": flow_hash or "chicago", - "deployment": {"id": deployment_id or random.randint(1, 500)}, + "flow_version_id": TEST_FLOW_VERSION_ID, + "image": TEST_IMAGE, }, "status": status or 201, } -def REGISTER_FLOW_FAILURE_RESPONSE(status: int): +def REGISTER_FLOW_FAILURE_RESPONSE(status: int) -> Dict[str, Any]: return { "method": responses.PUT, - "url": f"{os.environ['BASE_URL']}/api/prefect_cloud/flows", + "url": f"{os.environ['BASE_URL']}api/prefect_cloud/flows", "status": status, } -# -------------------------------------------- # -# /api/prefect_cloud/flows/{id}/storage_details # -# -------------------------------------------- # -TEST_NODE_ROLE_KEY = "some-stuff/role" -TEST_NODE_ROLE = "medium" -TEST_ENV_SECRET_NAME = "prefect-some-n0ns3nse" -STORAGE_DETAILS_RESPONSE = { - "method": responses.GET, - "url": f"{os.environ['BASE_URL']}/api/prefect_cloud/flows/{TEST_FLOW_ID}/storage_details", - "json": {"registry_url": TEST_REGISTRY_URL, "image_name": TEST_IMAGE}, - "status": 200, -} - -STORAGE_DETAILS_FAILURE_RESPONSE = { - "method": responses.GET, - "url": f"{os.environ['BASE_URL']}/api/prefect_cloud/flows/{TEST_FLOW_ID}/storage_details", - "status": 404, -} - # ----------------------------------- # # /api/prefect_cloud/flows/{id}/store # # ----------------------------------- # -BUILD_STORAGE_RESPONSE = { - "method": responses.POST, - "url": f"{os.environ['BASE_URL']}/api/prefect_cloud/flows/{TEST_FLOW_ID}/store", - "status": 201, -} +def BUILD_STORAGE_RESPONSE( + flow_id: str = TEST_FLOW_ID, flow_version_id: str = TEST_FLOW_VERSION_ID +) -> Dict[str, Any]: + return { + "method": responses.POST, + "url": ( + f"{os.environ['BASE_URL']}api/prefect_cloud/flows" + f"/{flow_id}/{flow_version_id}/content" + ), + "status": 201, + } + + +def BUILD_STORAGE_FAILURE_RESPONSE( + status: int, flow_id: str = TEST_FLOW_ID, flow_version_id: str = TEST_FLOW_VERSION_ID +) -> Dict[str, Any]: + return { + "method": responses.POST, + "url": ( + f"{os.environ['BASE_URL']}api/prefect_cloud/flows" + f"/{flow_id}/{flow_version_id}/content" + ), + "status": status, + } # ------------------------------------------ # # /api/prefect_cloud/flows/{id}/run_job_spec # # ------------------------------------------ # -def REGISTER_RUN_JOB_SPEC_RESPONSE(status: int, flow_id: int = TEST_FLOW_ID) -> Dict[str, Any]: +def REGISTER_RUN_JOB_SPEC_RESPONSE(status: int, flow_id: str = TEST_FLOW_ID) -> Dict[str, Any]: run_job_spec_file = os.path.join(os.path.dirname(__file__), "run-job-spec.yaml") with open(run_job_spec_file, "r") as file: run_job_spec = yaml.load(file, Loader=yaml.FullLoader) @@ -129,7 +103,7 @@ def REGISTER_RUN_JOB_SPEC_RESPONSE(status: int, flow_id: int = TEST_FLOW_ID) -> base_url = os.environ["BASE_URL"] return { "method": responses.GET, - "url": f"{base_url}/api/prefect_cloud/flows/{flow_id}/run_job_spec", + "url": f"{base_url}api/prefect_cloud/flows/{flow_id}/run_job_spec", "json": run_job_spec, "status": status, } @@ -141,34 +115,28 @@ def __init__(self): pass -@responses.activate def test_initialize(): - responses.add(**CURRENT_IMAGE_RESPONSE) project_name = "some-pr0ject" integration = prefect_saturn.PrefectCloudIntegration(prefect_cloud_project_name=project_name) assert getattr(integration, "SATURN_TOKEN", None) is None assert integration._session.headers["Authorization"] == f"token {os.environ['SATURN_TOKEN']}" assert integration.prefect_cloud_project_name == project_name - assert integration._saturn_base_image == TEST_IMAGE + assert integration._saturn_image is None assert integration._saturn_flow_id is None + assert integration._saturn_flow_version_id is None - # __init__() should add this trailing slash if it's necessary - assert integration._base_url == os.environ["BASE_URL"] + "/" +def test_initialize_adds_slash_to_base_url(monkeypatch): + monkeypatch.setenv("BASE_URL", "http://abc") + integration = prefect_saturn.PrefectCloudIntegration("x") + assert integration._base_url == "http://abc/" -@responses.activate -def test_initialize_raises_error_on_failure(): - responses.add(**CURRENT_IMAGE_FAILURE_RESPONSE(403)) - with raises(HTTPError, match="403 Client Error"): - prefect_saturn.PrefectCloudIntegration(prefect_cloud_project_name=TEST_PREFECT_PROJECT_NAME) - - failure_response = CURRENT_IMAGE_FAILURE_RESPONSE(504) - failure_response["method_or_response"] = failure_response.pop("method") - responses.replace(**failure_response) - with raises(HTTPError, match="504 Server Error"): - prefect_saturn.PrefectCloudIntegration(prefect_cloud_project_name=TEST_PREFECT_PROJECT_NAME) +def test_initialize_does_not_add_double_slash_to_base_url(monkeypatch): + monkeypatch.setenv("BASE_URL", "http://abc/") + integration = prefect_saturn.PrefectCloudIntegration("x") + assert integration._base_url == "http://abc/" def test_initialize_raises_error_on_missing_saturn_token(monkeypatch): @@ -183,9 +151,19 @@ def test_initialize_raises_error_on_missing_base_url(monkeypatch): prefect_saturn.PrefectCloudIntegration(prefect_cloud_project_name=TEST_PREFECT_PROJECT_NAME) -@responses.activate +def test_initialize_raises_error_on_accessing_properties_if_not_registered(): + integration = prefect_saturn.PrefectCloudIntegration( + prefect_cloud_project_name=TEST_PREFECT_PROJECT_NAME + ) + with raises(RuntimeError, match=prefect_saturn.Errors.NOT_REGISTERED): + integration.flow_id + with raises(RuntimeError, match=prefect_saturn.Errors.NOT_REGISTERED): + integration.flow_version_id + with raises(RuntimeError, match=prefect_saturn.Errors.NOT_REGISTERED): + integration.image + + def test_hash_flow(): - responses.add(**CURRENT_IMAGE_RESPONSE) flow = TEST_FLOW.copy() @@ -202,7 +180,12 @@ def test_hash_flow(): assert flow_hash == flow_hash_again # should not be impacted by storage - flow.storage = Docker() + flow.storage = Webhook( + build_request_kwargs={}, + build_request_http_method="POST", + get_flow_request_kwargs={}, + get_flow_request_http_method="GET", + ) assert flow_hash == integration._hash_flow(flow) # should not be impacted by environment @@ -234,9 +217,7 @@ def goodbye_task(): assert new_flow_hash != previous_flow_hash -@responses.activate def test_hash_flow_hash_changes_if_tenant_id_changes(): - responses.add(**CURRENT_IMAGE_RESPONSE) flow = TEST_FLOW.copy() @@ -259,89 +240,74 @@ def __init__(self): @responses.activate -def test_register_flow_with_saturn(): +def test_get_storage(): with patch("prefect_saturn.core.Client", new=MockClient): - test_flow_id = random.randint(1, 500) - responses.add(**CURRENT_IMAGE_RESPONSE) - responses.add(**REGISTER_FLOW_RESPONSE(flow_id=test_flow_id)) + responses.add(**REGISTER_FLOW_RESPONSE()) + responses.add(**BUILD_STORAGE_RESPONSE()) + responses.add(**REGISTER_RUN_JOB_SPEC_RESPONSE(200)) - # Set up integration integration = prefect_saturn.PrefectCloudIntegration( prefect_cloud_project_name=TEST_PREFECT_PROJECT_NAME ) + flow = TEST_FLOW.copy() + integration.register_flow_with_saturn(flow=flow) - assert integration._saturn_flow_id is None - integration.register_flow_with_saturn(flow=TEST_FLOW.copy()) - assert integration._saturn_flow_id == test_flow_id - + storage = integration._get_storage() + assert isinstance(storage, Webhook) + assert storage.stored_as_script is False + assert storage.build_request_http_method == "POST" + assert storage.get_flow_request_http_method == "GET" + assert storage.build_request_kwargs["headers"]["Content-Type"] == "application/octet-stream" + assert storage.get_flow_request_kwargs["headers"]["Accept"] == "application/octet-stream" -@responses.activate -def test_register_flow_with_saturn_raises_error_on_failure(): - with patch("prefect_saturn.core.Client", new=MockClient): - responses.add(**CURRENT_IMAGE_RESPONSE) - integration = prefect_saturn.PrefectCloudIntegration( - prefect_cloud_project_name=TEST_PREFECT_PROJECT_NAME - ) - - responses.add(**REGISTER_FLOW_FAILURE_RESPONSE(500)) - with raises(HTTPError, match="500 Server Error"): - integration.register_flow_with_saturn(flow=TEST_FLOW.copy()) - - failure_response = REGISTER_FLOW_FAILURE_RESPONSE(401) - failure_response["method_or_response"] = failure_response.pop("method") - responses.replace(**failure_response) - with raises(HTTPError, match="401 Client Error"): - integration.register_flow_with_saturn(flow=TEST_FLOW.copy()) +def test_get_storage_fails_if_flow_not_registerd(): + integration = prefect_saturn.PrefectCloudIntegration( + prefect_cloud_project_name=TEST_PREFECT_PROJECT_NAME + ) + with raises(RuntimeError, match=prefect_saturn.Errors.NOT_REGISTERED): + integration._get_storage() @responses.activate -def test_get_storage_details(): +def test_store_flow_fails_if_flow_not_registered(): with patch("prefect_saturn.core.Client", new=MockClient): - responses.add(**CURRENT_IMAGE_RESPONSE) responses.add(**REGISTER_FLOW_RESPONSE()) + responses.add(**BUILD_STORAGE_FAILURE_RESPONSE(404)) + responses.add(**REGISTER_RUN_JOB_SPEC_RESPONSE(200)) - test_token = str(uuid.uuid4()) - test_registry = "8987.ecr.aws" - details = deepcopy(STORAGE_DETAILS_RESPONSE) - details["json"].update({"registry_url": test_registry, "deployment_token": test_token}) - responses.add(**details) - - # Set up integration integration = prefect_saturn.PrefectCloudIntegration( prefect_cloud_project_name=TEST_PREFECT_PROJECT_NAME ) - integration.register_flow_with_saturn(flow=TEST_FLOW.copy()) - - # response with details for building storage - storage_details = integration.storage_details - assert isinstance(storage_details, dict) - assert integration._storage_details["image_name"] == TEST_IMAGE - assert integration._storage_details["registry_url"] == test_registry + flow = TEST_FLOW.copy() + flow = integration.register_flow_with_saturn(flow=flow) + with raises(HTTPError, match="Not Found for url"): + flow.storage.add_flow(flow) + flow.storage.build() @responses.activate -def test_get_storage_details_raises_error_on_failure(): +def test_store_flow_fails_if_validation_fails(): with patch("prefect_saturn.core.Client", new=MockClient): - responses.add(**CURRENT_IMAGE_RESPONSE) responses.add(**REGISTER_FLOW_RESPONSE()) - responses.add(**STORAGE_DETAILS_FAILURE_RESPONSE) + responses.add(**BUILD_STORAGE_FAILURE_RESPONSE(400)) + responses.add(**REGISTER_RUN_JOB_SPEC_RESPONSE(200)) - # Set up integration integration = prefect_saturn.PrefectCloudIntegration( prefect_cloud_project_name=TEST_PREFECT_PROJECT_NAME ) - integration.register_flow_with_saturn(flow=TEST_FLOW.copy()) - with raises(HTTPError, match="404 Client Error"): - integration.storage_details + flow = TEST_FLOW.copy() + flow = integration.register_flow_with_saturn(flow=flow) + with raises(HTTPError, match="Client Error"): + flow.storage.add_flow(flow) + flow.storage.build() @responses.activate -def test_build_environment(): +def test_get_environment(): with patch("prefect_saturn.core.Client", new=MockClient): - responses.add(**CURRENT_IMAGE_RESPONSE) responses.add(**REGISTER_FLOW_RESPONSE()) - responses.add(**STORAGE_DETAILS_RESPONSE) + responses.add(**BUILD_STORAGE_RESPONSE()) responses.add(**REGISTER_RUN_JOB_SPEC_RESPONSE(200)) integration = prefect_saturn.PrefectCloudIntegration( @@ -350,23 +316,23 @@ def test_build_environment(): flow = TEST_FLOW.copy() integration.register_flow_with_saturn(flow=flow) - flow = integration.add_environment(flow=flow) - assert isinstance(flow.environment, KubernetesJobEnvironment) - assert isinstance(flow.environment.executor, DaskExecutor) - assert flow.environment.unique_job_name is True - env_args = flow.environment._job_spec["spec"]["template"]["spec"]["containers"][0]["args"] + environment = integration._get_environment() + assert isinstance(environment, KubernetesJobEnvironment) + assert environment.unique_job_name is True + env_args = environment._job_spec["spec"]["template"]["spec"]["containers"][0]["args"] assert len(env_args) == 1 assert env_args[0].startswith("source /home/jovyan/.saturn/start.sh;") - env_cmd = flow.environment._job_spec["spec"]["template"]["spec"]["containers"][0]["command"] + env_cmd = environment._job_spec["spec"]["template"]["spec"]["containers"][0]["command"] assert env_cmd == ["/bin/bash", "-ec"] + assert environment.metadata["image"] == integration._saturn_image @responses.activate -def test_add_storage(): +def test_get_environment_dask_kwargs(): with patch("prefect_saturn.core.Client", new=MockClient): - responses.add(**CURRENT_IMAGE_RESPONSE) responses.add(**REGISTER_FLOW_RESPONSE()) - responses.add(**STORAGE_DETAILS_RESPONSE) + responses.add(**BUILD_STORAGE_RESPONSE()) + responses.add(**REGISTER_RUN_JOB_SPEC_RESPONSE(200)) integration = prefect_saturn.PrefectCloudIntegration( prefect_cloud_project_name=TEST_PREFECT_PROJECT_NAME @@ -374,99 +340,83 @@ def test_add_storage(): flow = TEST_FLOW.copy() integration.register_flow_with_saturn(flow=flow) - assert flow.storage is None - flow = integration.add_storage(flow=flow) - assert isinstance(flow.storage, Docker) - assert flow.storage.base_image == TEST_IMAGE - assert flow.storage.image_name == integration.storage_details["image_name"] - assert flow.storage.registry_url == TEST_REGISTRY_URL - assert flow.storage.prefect_directory == "/tmp" - assert "kubernetes" in flow.storage.python_dependencies - assert "BASE_URL" in flow.storage.env_vars.keys() - assert "SATURN_TOKEN" in flow.storage.env_vars.keys() - - # healthchecks have to be ignored because otherwise we'd have - # to figure out how to run the Saturn start script inside - # the build process - assert flow.storage.ignore_healthchecks is True - - -@responses.activate -def test_add_storage_fails_if_flow_not_registerd(): - with patch("prefect_saturn.core.Client", new=MockClient): - responses.add(**CURRENT_IMAGE_RESPONSE) - integration = prefect_saturn.PrefectCloudIntegration( - prefect_cloud_project_name=TEST_PREFECT_PROJECT_NAME + environment = integration._get_environment( + cluster_kwargs={"n_workers": 8}, adapt_kwargs={"minimum": 3, "maximum": 3} ) - flow = TEST_FLOW.copy() - with raises(RuntimeError, match=prefect_saturn.Errors.NOT_REGISTERED): - integration.add_storage(flow=flow) + assert isinstance(environment, KubernetesJobEnvironment) + assert isinstance(environment.executor, DaskExecutor) + assert environment.executor.cluster_kwargs == {"n_workers": 8} + assert environment.executor.adapt_kwargs == {"minimum": 3, "maximum": 3} -@responses.activate -def test_build_storage(): - with patch("prefect_saturn.core.Client", new=MockClient): - responses.add(**CURRENT_IMAGE_RESPONSE) - responses.add(**REGISTER_FLOW_RESPONSE()) - responses.add(**STORAGE_DETAILS_RESPONSE) - responses.add(**BUILD_STORAGE_RESPONSE) - - integration = prefect_saturn.PrefectCloudIntegration( - prefect_cloud_project_name=TEST_PREFECT_PROJECT_NAME - ) - flow = TEST_FLOW.copy() - integration.register_flow_with_saturn(flow=flow) - flow = integration.add_storage(flow=flow) - res = integration.build_storage(flow) - assert res.status_code == 201 +def test_get_environment_fails_if_flow_not_registered(): + integration = prefect_saturn.PrefectCloudIntegration( + prefect_cloud_project_name=TEST_PREFECT_PROJECT_NAME + ) + with raises(RuntimeError, match=prefect_saturn.Errors.NOT_REGISTERED): + integration._get_environment() @responses.activate -def test_build_storage_fails_if_flow_not_registered(): +def test_add_environment_fails_if_id_not_recognized(): with patch("prefect_saturn.core.Client", new=MockClient): - responses.add(**CURRENT_IMAGE_RESPONSE) - responses.add(**REGISTER_FLOW_RESPONSE()) - responses.add(**STORAGE_DETAILS_RESPONSE) - responses.add(**BUILD_STORAGE_RESPONSE) + responses.add(**REGISTER_FLOW_RESPONSE(flow_id=45)) + responses.add(**REGISTER_RUN_JOB_SPEC_RESPONSE(404, flow_id=45)) integration = prefect_saturn.PrefectCloudIntegration( prefect_cloud_project_name=TEST_PREFECT_PROJECT_NAME ) flow = TEST_FLOW.copy() - integration.register_flow_with_saturn(flow=flow) - flow = integration.add_storage(flow=flow) + integration._set_flow_metadata(flow=flow) - integration._saturn_flow_id = None - with raises(RuntimeError, match=prefect_saturn.Errors.NOT_REGISTERED): - integration.build_storage(flow) + with raises(HTTPError, match="404 Client Error"): + integration._get_environment() @responses.activate -def test_add_environment_fails_if_flow_not_registered(): +def test_register_flow_with_saturn_raises_error_on_failure(): with patch("prefect_saturn.core.Client", new=MockClient): - responses.add(**CURRENT_IMAGE_RESPONSE) integration = prefect_saturn.PrefectCloudIntegration( prefect_cloud_project_name=TEST_PREFECT_PROJECT_NAME ) - flow = TEST_FLOW.copy() - with raises(RuntimeError, match=prefect_saturn.Errors.NOT_REGISTERED): - integration.add_environment(flow=flow) + responses.add(**REGISTER_FLOW_FAILURE_RESPONSE(500)) + with raises(HTTPError, match="500 Server Error"): + integration.register_flow_with_saturn(flow=TEST_FLOW.copy()) + + failure_response = REGISTER_FLOW_FAILURE_RESPONSE(401) + failure_response["method_or_response"] = failure_response.pop("method") + responses.replace(**failure_response) + with raises(HTTPError, match="401 Client Error"): + integration.register_flow_with_saturn(flow=TEST_FLOW.copy()) @responses.activate -def test_add_environment_fails_if_id_not_recognized(): +def test_register_flow_with_saturn_does_everything(): with patch("prefect_saturn.core.Client", new=MockClient): - responses.add(**CURRENT_IMAGE_RESPONSE) - responses.add(**REGISTER_FLOW_RESPONSE(45)) - responses.add(**REGISTER_RUN_JOB_SPEC_RESPONSE(404, flow_id=45)) + test_flow_id = str(random.randint(1, 500)) + responses.add(**REGISTER_FLOW_RESPONSE(flow_id=test_flow_id)) + responses.add(**BUILD_STORAGE_RESPONSE(flow_id=test_flow_id)) + responses.add(**REGISTER_RUN_JOB_SPEC_RESPONSE(200, flow_id=test_flow_id)) integration = prefect_saturn.PrefectCloudIntegration( prefect_cloud_project_name=TEST_PREFECT_PROJECT_NAME ) + + # no details are stored just on initialization + assert integration._saturn_flow_id is None + assert integration._saturn_flow_version_id is None + assert integration._saturn_image is None + flow = TEST_FLOW.copy() - integration.register_flow_with_saturn(flow=flow) + assert flow.storage is None + assert isinstance(flow.environment, LocalEnvironment) - with raises(HTTPError, match="404 Client Error"): - integration.add_environment(flow=flow) + flow = integration.register_flow_with_saturn(flow=flow) + assert integration._saturn_flow_id == test_flow_id + assert integration._saturn_flow_version_id == TEST_FLOW_VERSION_ID + assert integration._saturn_image == TEST_IMAGE + assert isinstance(flow.storage, Webhook) + assert isinstance(flow.environment, KubernetesJobEnvironment) + assert isinstance(flow.environment.executor, DaskExecutor)