Skip to content

Commit

Permalink
Switch from Docker storage to Webhook storage (#14)
Browse files Browse the repository at this point in the history
* more fixes to tests and docs

* remove _store_flow()

* change URL scheme
  • Loading branch information
jameslamb authored Aug 13, 2020
1 parent 4008118 commit ab04d98
Show file tree
Hide file tree
Showing 5 changed files with 320 additions and 307 deletions.
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,34 @@

## Getting Started

```python
import prefect
from prefect import Flow, task
from prefect_saturn import PrefectCloudIntegration


@task
def hello_task():
logger = prefect.context.get("logger")
logger.info("hello prefect-saturn")


flow = Flow("sample-flow", tasks=[hello_task])

project_name = "sample-project"
integration = PrefectCloudIntegration(
prefect_cloud_project_name=project_name
)
flow = integration.register_flow_with_saturn(flow)

flow.register(
project_name=project_name,
labels=["saturn-cloud"]
)
```

## Installation

`prefect-saturn` is available on PyPi.

```shell
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.0.2
0.1.0
226 changes: 130 additions & 96 deletions prefect_saturn/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,17 @@

import hashlib
import os
import uuid

from typing import Any, Dict, Optional
from requests import Session
from requests.adapters import HTTPAdapter
from requests.models import Response
from requests.packages.urllib3.util.retry import Retry

import cloudpickle
from prefect import Flow
from prefect.client import Client
from prefect.engine.executors import DaskExecutor
from prefect.environments.storage import Docker
from prefect.environments.storage import Webhook
from prefect.environments import KubernetesJobEnvironment
import yaml

Expand All @@ -29,18 +27,28 @@ class PrefectCloudIntegration:
.. code-block:: python
integration = PrefectCloudIntegration(PROJECT_NAME)
integration.register_flow_with_saturn(flow)
flow = integration.add_environment(flow)
flow = integration.add_storage(flow)
flow.storage.add_flow(flow)
integration.build_storage(flow)
import prefect
from prefect import Flow, task
from prefect_saturn import PrefectCloudIntegration
@task
def hello_task():
logger = prefect.context.get("logger")
logger.info("hello prefect-saturn")
flow = Flow("sample-flow", tasks=[hello_task])
project_name = "sample-project"
integration = PrefectCloudIntegration(
prefect_cloud_project_name=project_name
)
flow = integration.register_flow_with_saturn(flow)
flow.register(
project_name=PROJECT_NAME,
build=False,
labels=[
"saturn-cloud"
]
project_name=project_name,
labels=["saturn-cloud"]
)
"""

Expand All @@ -59,7 +67,9 @@ def __init__(self, prefect_cloud_project_name: str):
raise RuntimeError(Errors.missing_env_var("BASE_URL"))

self.prefect_cloud_project_name: str = prefect_cloud_project_name
self._saturn_flow_id: Optional[int] = None
self._saturn_flow_id: Optional[str] = None
self._saturn_flow_version_id: Optional[str] = None
self._saturn_image: Optional[str] = None

# set up logic for authenticating with Saturn back-end service
retry_logic = HTTPAdapter(max_retries=Retry(total=3))
Expand All @@ -68,15 +78,6 @@ def __init__(self, prefect_cloud_project_name: str):
self._session.mount("https://", retry_logic)
self._session.headers.update({"Authorization": f"token {SATURN_TOKEN}"})

# figure out the image this notebook is running in
res = self._session.get(
url=f"{self._base_url}api/current_image", headers={"Content-Type": "application/json"}
)
res.raise_for_status()
self._saturn_base_image = res.json()["image"]

self._storage_details: Optional[Dict[str, Any]] = None

def _hash_flow(self, flow: Flow) -> str:
"""
In Prefect Cloud, all versions of a flow in a project are tied together
Expand Down Expand Up @@ -105,25 +106,34 @@ def _hash_flow(self, flow: Flow) -> str:
hasher.update(cloudpickle.dumps(identifying_content))
return hasher.hexdigest()

def register_flow_with_saturn(self, flow: Flow) -> bool:
def _set_flow_metadata(self, flow: Flow) -> None:
"""
Given a Flow, register it with Saturn. This method has
the following side effects:
the following side effects.
The first time you run it:
The first time you run it for a flow:
* create a Deployment in Saturn's database for this flow
* create a record of the flow in Saturn's database
* store a hash of the flow in the database
* store a commit hash for the current state of /home/jovyan/project
* creates a Deployment in Saturn's database for this flow
* creates a record of the flow in Saturn's database
* stores a hash of the flow in the database
* stores a commit hash for the current state of the git repo in
/home/jovyan/project
* sets ``self._saturn_flow_id`` to an identifier for the flow in Saturn
* sets ``self._saturn_flow_version_id`` to an identifier for this flow
version
* sets ``self._saturn_image`` to the name of a Docker image the flow
will run in
Any time you run it
If you run this method for a flow that already exists in Saturn:
* update the flow's hash in the database
* update the /home/jovyan/project commit hash stored in the DB
* update the deployment's image to the current image for the userproject
* updates the /home/jovyan/project commit hash stored in the DB
* updates the deployment's image to the current image for the project
it belongs to
* sets ``self._saturn_flow_id`` on this instance
* updates ``self._saturn_flow_version_id`` to a new value. Just like
``flow.register()`` from ``prefect``, each call of this method
generates a new flow version.
* updates ``self._saturn_image`` in case the image for the flow
has changed
"""
res = self._session.put(
url=f"{self._base_url}api/prefect_cloud/flows",
Expand All @@ -136,91 +146,115 @@ def register_flow_with_saturn(self, flow: Flow) -> bool:
)
res.raise_for_status()
response_json = res.json()
self._saturn_flow_id = response_json["id"]
return True
self._saturn_flow_id = str(response_json["id"])
self._saturn_flow_version_id = response_json["flow_version_id"]
self._saturn_image = response_json["image"]

@property
def storage_details(self) -> Dict[str, Any]:
def flow_id(self) -> str:
"""
Information from Atlas used to build flow storage.
This method can only be run for flows which have already been registered
with ``register_flow_with_saturn()``.
updates ``self._storage_details`` with the following Saturn-specific details for this flow:
- registry_url: the container registry to push flow storage too, since
Saturn currently uses ``Docker`` storage from Prefect
- image_name: name for the Docker image that will store the flow
Saturn identifier for a flow. This uniquely identifies a flow
within a Saturn instance.
"""
if self._saturn_flow_id is None:
raise RuntimeError(Errors.NOT_REGISTERED)
return self._saturn_flow_id

res = self._session.get(
url=f"{self._base_url}api/prefect_cloud/flows/{self._saturn_flow_id}/storage_details",
headers={"Content-Type": "application/json"},
)
res.raise_for_status()
response_json = res.json()
self._storage_details = response_json
return response_json
@property
def flow_version_id(self) -> str:
"""
Unique identifier for one version of a flow. This is randomly
generated by Saturn and is not an identifier known to
Prefect Cloud. This is used to ensure that each flow run
pulls the correct flow version from storage.
"""
if self._saturn_flow_version_id is None:
raise RuntimeError(Errors.NOT_REGISTERED)
return self._saturn_flow_version_id

def add_storage(self, flow: Flow) -> Flow:
@property
def image(self) -> str:
"""
Docker image that flows will run in. This is the same as
the image used in the Saturn project where the flow was
created.
"""
Create a Docker Storage object with Saturn-y details and set
it on `flow.storage`.
if self._saturn_image is None:
raise RuntimeError(Errors.NOT_REGISTERED)
return self._saturn_image

def register_flow_with_saturn(
self, flow: Flow, dask_cluster_kwargs=None, dask_adapt_kwargs=None
) -> Flow:
"""
Given a flow, set up all the details needed to run it on
a Saturn Dask cluster.
This method modifies the following components of ``Flow`` objects
passed to it.
This method sets the `image_tag` to a random string to avoid conflicts.
The image name is generated in Saturn.
* ``.storage``: a ``Webhook`` storage instance is added
* ``.environment``: a ``KubernetesJobEnvironment`` with a ``DaskExecutor``
is added. This environment will use the same image as the notebook
from which this code is run.
"""
storage_details = self.storage_details
image_tag = str(uuid.uuid4())
# NOTE: SATURN_TOKEN and BASE_URL have to be set to be able
# to load the flow. Those variables will be overridden by
# Kubernetes in all the places where it matters, and values
# set in the image are overridden by those from kubernetes
# https://kubernetes.io/docs/tasks/inject-data-application/define-environment-variable-container
flow.storage = Docker(
base_image=self._saturn_base_image,
image_name=storage_details["image_name"],
image_tag=image_tag,
registry_url=storage_details["registry_url"],
prefect_directory="/tmp",
env_vars={"SATURN_TOKEN": "placeholder-token", "BASE_URL": "placeholder-url"},
python_dependencies=["kubernetes"],
ignore_healthchecks=True,
self._set_flow_metadata(flow)

storage = self._get_storage()
flow.storage = storage

environment = self._get_environment(
cluster_kwargs=dask_cluster_kwargs, adapt_kwargs=dask_adapt_kwargs
)
flow.environment = environment

return flow

def build_storage(self, flow: Flow) -> Response:
def _get_storage(self) -> Webhook:
"""
Actually build and push the storage
Create a `Webhook` storage object with Saturn-y details.
"""
if self._saturn_flow_id is None:
raise RuntimeError(Errors.NOT_REGISTERED)
res = self._session.post(
url=f"{self._base_url}api/prefect_cloud/flows/{self._saturn_flow_id}/store",
headers={"Content-Type": "application/octet-stream"},
data=cloudpickle.dumps(flow),
url = (
"${BASE_URL}api/prefect_cloud/flows/"
+ self.flow_id
+ "/"
+ self.flow_version_id
+ "/content"
)
res.raise_for_status()
return res
storage = Webhook(
build_request_kwargs={
"url": url,
"headers": {
"Content-Type": "application/octet-stream",
"Authorization": "token ${SATURN_TOKEN}",
},
},
build_request_http_method="POST",
get_flow_request_kwargs={
"url": url,
"headers": {
"Accept": "application/octet-stream",
"Authorization": "token ${SATURN_TOKEN}",
},
},
get_flow_request_http_method="GET",
)

return storage

def add_environment(
def _get_environment(
self,
flow: Flow,
cluster_kwargs: Optional[Dict[str, Any]] = None,
adapt_kwargs: Optional[Dict[str, Any]] = None,
) -> Flow:
) -> KubernetesJobEnvironment:
"""
Get an environment that customizes the execution of a Prefect flow run.
"""
cluster_kwargs = cluster_kwargs or {"n_workers": 1}
adapt_kwargs = adapt_kwargs or {"minimum": 1, "maximum": 2}

if self._saturn_flow_id is None:
raise RuntimeError(Errors.NOT_REGISTERED)

# get job spec with Saturn details from Atlas
url = f"{self._base_url}api/prefect_cloud/flows/{self._saturn_flow_id}/run_job_spec"
url = f"{self._base_url}api/prefect_cloud/flows/{self.flow_id}/run_job_spec"
response = self._session.get(url=url)
response.raise_for_status()
job_dict = response.json()
Expand All @@ -229,8 +263,9 @@ def add_environment(
with open(local_tmp_file, "w") as f:
f.write(yaml.dump(job_dict))

# saturn_flow_id is used by Saturn's custom Prefect agent
k8s_environment = KubernetesJobEnvironment(
metadata={"saturn_flow_id": self._saturn_flow_id},
metadata={"saturn_flow_id": self.flow_id, "image": self.image},
executor=DaskExecutor(
cluster_class="dask_saturn.SaturnCluster",
cluster_kwargs=cluster_kwargs,
Expand All @@ -253,5 +288,4 @@ def add_environment(
new_args = f"source /home/jovyan/.saturn/start.sh; {args_from_prefect}"
k8s_environment._job_spec["spec"]["template"]["spec"]["containers"][0]["args"] = [new_args]

flow.environment = k8s_environment
return flow
return k8s_environment
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
with open("VERSION", "r") as f:
version = f.read().strip()

install_requires = ["cloudpickle", "dask-saturn>=0.0.4", "prefect", "requests"]
# prefect has to be 0.13.0 or newer to get Webhook storage
install_requires = ["cloudpickle", "dask-saturn>=0.0.4", "prefect>0.13.0", "requests"]
testing_deps = ["pytest", "pytest-cov", "responses"]

setup(
Expand Down
Loading

0 comments on commit ab04d98

Please sign in to comment.