Skip to content

Commit

Permalink
disable Dask adaptive scaling by default (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
jameslamb authored Oct 21, 2020
1 parent 1e0ee62 commit a1ab85c
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 17 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.3.0
0.4.0
43 changes: 37 additions & 6 deletions prefect_saturn/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,39 @@ def image(self) -> str:
return self._saturn_image

def register_flow_with_saturn(
self, flow: Flow, dask_cluster_kwargs=None, dask_adapt_kwargs=None
self,
flow: Flow,
dask_cluster_kwargs: Optional[Dict[str, Any]] = None,
dask_adapt_kwargs: Optional[Dict[str, Any]] = None,
) -> Flow:
"""
Given a flow, set up all the details needed to run it on
a Saturn Dask cluster.
:param flow: A Prefect ``Flow`` object
:param dask_cluster_kwargs: Dictionary of keyword arguments
to the ``prefect_saturn.SaturnCluster`` constructor. If ``None``
(the default), the cluster will be created with
one worker (``{"n_workers": 1}``).
:param dask_adapt_kwargs: Dictionary of keyword arguments
to pass to ``prefect_saturn.SaturnCluster.adapt()``. If
``None`` (the default), adaptive scaling will not be used.
Why adaptive scaling is off by default
--------------------------------------
Dask's `adaptive scaling <https://docs.dask.org/en/latest/setup/adaptive.html>`_
can improve resource utilization by allowing Dask to spin things up
and down based on your workload.
This is off by default in the ``DaskExecutor`` created by ``prefect-saturn``
because in some cases, the interaction between Dask and Prefect can lead
adaptive scaling to make choices that interfere with the way Prefect executes
flows.
Prefect components
------------------
This method modifies the following components of ``Flow`` objects
passed to it.
Expand All @@ -198,13 +225,19 @@ def register_flow_with_saturn(
is added. This environment will use the same image as the notebook
from which this code is run.
"""
if dask_cluster_kwargs is None:
dask_cluster_kwargs = {"n_workers": 1}

if dask_adapt_kwargs is None:
dask_adapt_kwargs = {}

self._set_flow_metadata(flow)

storage = self._get_storage()
flow.storage = storage

environment = self._get_environment(
cluster_kwargs=dask_cluster_kwargs, adapt_kwargs=dask_adapt_kwargs
cluster_kwargs=dask_cluster_kwargs, adapt_kwargs=dask_adapt_kwargs # type: ignore
)
flow.environment = environment

Expand Down Expand Up @@ -244,14 +277,12 @@ def _get_storage(self) -> Webhook:

def _get_environment(
self,
cluster_kwargs: Optional[Dict[str, Any]] = None,
adapt_kwargs: Optional[Dict[str, Any]] = None,
cluster_kwargs: Dict[str, Any],
adapt_kwargs: Dict[str, Any],
) -> KubernetesJobEnvironment:
"""
Get an environment that customizes the execution of a Prefect flow run.
"""
cluster_kwargs = cluster_kwargs or {"n_workers": 1}
adapt_kwargs = adapt_kwargs or {"minimum": 1, "maximum": 2}

# get job spec with Saturn details from Atlas
url = f"{self._base_url}/api/prefect_cloud/flows/{self.flow_id}/run_job_spec"
Expand Down
61 changes: 51 additions & 10 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ def test_get_environment():
flow = TEST_FLOW.copy()
integration.register_flow_with_saturn(flow=flow)

environment = integration._get_environment()
environment = integration._get_environment(cluster_kwargs={"n_workers": 3}, adapt_kwargs={})
assert isinstance(environment, KubernetesJobEnvironment)
assert environment.unique_job_name is True
env_args = environment._job_spec["spec"]["template"]["spec"]["containers"][0]["args"]
Expand All @@ -333,23 +333,64 @@ def test_get_environment_dask_kwargs():
prefect_cloud_project_name=TEST_PREFECT_PROJECT_NAME
)
flow = TEST_FLOW.copy()
integration.register_flow_with_saturn(flow=flow)
flow = integration.register_flow_with_saturn(
flow=flow,
dask_cluster_kwargs={"n_workers": 8},
dask_adapt_kwargs={"minimum": 3, "maximum": 3},
)

assert isinstance(flow.environment, KubernetesJobEnvironment)
assert isinstance(flow.environment.executor, DaskExecutor)
assert flow.environment.executor.cluster_kwargs == {"n_workers": 8}
assert flow.environment.executor.adapt_kwargs == {"minimum": 3, "maximum": 3}


@responses.activate
def test_get_environment_dask_adaptive_scaling_off_by_default():
with patch("prefect_saturn.core.Client", new=MockClient):
responses.add(**REGISTER_FLOW_RESPONSE())
responses.add(**BUILD_STORAGE_RESPONSE())
responses.add(**REGISTER_RUN_JOB_SPEC_RESPONSE(200))

environment = integration._get_environment(
cluster_kwargs={"n_workers": 8}, adapt_kwargs={"minimum": 3, "maximum": 3}
integration = prefect_saturn.PrefectCloudIntegration(
prefect_cloud_project_name=TEST_PREFECT_PROJECT_NAME
)
assert isinstance(environment, KubernetesJobEnvironment)
assert isinstance(environment.executor, DaskExecutor)
assert environment.executor.cluster_kwargs == {"n_workers": 8}
assert environment.executor.adapt_kwargs == {"minimum": 3, "maximum": 3}
flow = TEST_FLOW.copy()
flow = integration.register_flow_with_saturn(flow=flow)

assert isinstance(flow.environment, KubernetesJobEnvironment)
assert isinstance(flow.environment.executor, DaskExecutor)
assert flow.environment.executor.cluster_kwargs == {"n_workers": 1}
assert flow.environment.executor.adapt_kwargs == {}


@responses.activate
def test_get_environment_dask_kwargs_respects_empty_dict():
    """
    Empty dictionaries passed explicitly for ``dask_cluster_kwargs`` and
    ``dask_adapt_kwargs`` should show up unchanged on the flow's executor.
    """
    with patch("prefect_saturn.core.Client", new=MockClient):
        # mocked HTTP responses for the calls made while registering the flow
        responses.add(**REGISTER_FLOW_RESPONSE())
        responses.add(**BUILD_STORAGE_RESPONSE())
        responses.add(**REGISTER_RUN_JOB_SPEC_RESPONSE(200))

        saturn_integration = prefect_saturn.PrefectCloudIntegration(
            prefect_cloud_project_name=TEST_PREFECT_PROJECT_NAME,
        )
        registered_flow = saturn_integration.register_flow_with_saturn(
            flow=TEST_FLOW.copy(),
            dask_cluster_kwargs={},
            dask_adapt_kwargs={},
        )

        flow_environment = registered_flow.environment
        assert isinstance(flow_environment, KubernetesJobEnvironment)

        dask_executor = flow_environment.executor
        assert isinstance(dask_executor, DaskExecutor)

        # both kwargs dictionaries were given as {} and are expected back as {}
        assert dask_executor.cluster_kwargs == {}
        assert dask_executor.adapt_kwargs == {}


def test_get_environment_fails_if_flow_not_registered():
integration = prefect_saturn.PrefectCloudIntegration(
prefect_cloud_project_name=TEST_PREFECT_PROJECT_NAME
)
with raises(RuntimeError, match=prefect_saturn.Errors.NOT_REGISTERED):
integration._get_environment()
integration._get_environment(cluster_kwargs={}, adapt_kwargs={})


@responses.activate
Expand All @@ -365,7 +406,7 @@ def test_add_environment_fails_if_id_not_recognized():
integration._set_flow_metadata(flow=flow)

with raises(HTTPError, match="404 Client Error"):
integration._get_environment()
integration._get_environment(cluster_kwargs={}, adapt_kwargs={})


@responses.activate
Expand Down

0 comments on commit a1ab85c

Please sign in to comment.