From 1c230c211814a1f5e2cb8414597a71526265a2c6 Mon Sep 17 00:00:00 2001 From: Melissa DeLucchi <113376043+delucchi-cmu@users.noreply.github.com> Date: Wed, 24 Apr 2024 12:20:32 -0400 Subject: [PATCH] Update to latest dask to mitigate smoke test failures (#286) * Constrain client in unit tests * Update to latest dask, and mitigate scatter flakiness. --- pyproject.toml | 3 +-- src/hipscat_import/catalog/run_import.py | 4 ++-- tests/conftest.py | 2 +- tests/hipscat_import/catalog/test_run_round_trip.py | 2 +- tests/hipscat_import/conftest.py | 2 +- 5 files changed, 6 insertions(+), 7 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index c7fc02ae..b00379e3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,8 +15,7 @@ classifiers = [ ] dynamic = ["version"] dependencies = [ - "dask<=2024.2.0", - "dask[distributed]", + "dask[complete]>=2024.3.0", # Includes dask expressions. "deprecated", "healpy", "hipscat >= 0.2.9", diff --git a/src/hipscat_import/catalog/run_import.py b/src/hipscat_import/catalog/run_import.py index d1fcb8aa..bfe60bde 100644 --- a/src/hipscat_import/catalog/run_import.py +++ b/src/hipscat_import/catalog/run_import.py @@ -23,7 +23,7 @@ def _map_pixels(args, client): if args.resume_plan.is_mapping_done(): return - reader_future = client.scatter(args.file_reader) + reader_future = client.scatter(args.file_reader, hash=False) futures = [] for key, file_path in args.resume_plan.map_files: futures.append( @@ -48,7 +48,7 @@ def _split_pixels(args, alignment_future, client): if args.resume_plan.is_splitting_done(): return - reader_future = client.scatter(args.file_reader) + reader_future = client.scatter(args.file_reader, hash=False) futures = [] for key, file_path in args.resume_plan.split_keys: futures.append( diff --git a/tests/conftest.py b/tests/conftest.py index 05109d36..7666200f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -31,7 +31,7 @@ def dask_client(use_ray): disable_dask_on_ray() else: - client = Client() + client = Client(n_workers=1, threads_per_worker=1) yield client client.close() diff --git a/tests/hipscat_import/catalog/test_run_round_trip.py b/tests/hipscat_import/catalog/test_run_round_trip.py index 82621e2e..80c5ff41 100644 --- a/tests/hipscat_import/catalog/test_run_round_trip.py +++ b/tests/hipscat_import/catalog/test_run_round_trip.py @@ -58,7 +58,7 @@ def test_import_source_table( assert len(catalog.get_healpix_pixels()) == 14 -@pytest.mark.dask(timeout=10) +@pytest.mark.dask def test_import_mixed_schema_csv( dask_client, mixed_schema_csv_dir, diff --git a/tests/hipscat_import/conftest.py b/tests/hipscat_import/conftest.py index d36eaa66..303144bc 100644 --- a/tests/hipscat_import/conftest.py +++ b/tests/hipscat_import/conftest.py @@ -32,7 +32,7 @@ def test_long_running(): for item in items: timeout = None for mark in item.iter_markers(name="dask"): - timeout = 5 + timeout = 10 if "timeout" in mark.kwargs: timeout = int(mark.kwargs.get("timeout")) if "skip_ray" in mark.kwargs and use_ray: