From 2fd3b464a457d8f8b8505735545aa9f3b4f03e28 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Thu, 8 Feb 2024 13:31:08 -0800 Subject: [PATCH] switch to clientless tests --- tests/tape_tests/conftest.py | 100 ++++++++++---------- tests/tape_tests/test_ensemble.py | 20 ++-- tests/tape_tests/test_ensemble_frame.py | 3 +- tests/tape_tests/test_feature_extraction.py | 8 +- 4 files changed, 62 insertions(+), 69 deletions(-) diff --git a/tests/tape_tests/conftest.py b/tests/tape_tests/conftest.py index 73430784..3f1f0bb7 100644 --- a/tests/tape_tests/conftest.py +++ b/tests/tape_tests/conftest.py @@ -56,9 +56,9 @@ def create_test_object_table(npartitions=1): "create_test_source_table, create_test_column_mapper", [("create_test_source_table", "create_test_column_mapper")], ) -def read_dask_dataframe_ensemble(dask_client, create_test_source_table, create_test_column_mapper): +def read_dask_dataframe_ensemble(create_test_source_table, create_test_column_mapper): return tape.read_dask_dataframe( - dask_client=dask_client, + dask_client=False, source_frame=create_test_source_table, column_mapper=create_test_column_mapper, ) @@ -71,12 +71,12 @@ def read_dask_dataframe_ensemble(dask_client, create_test_source_table, create_t [("create_test_source_table", "create_test_object_table", "create_test_column_mapper")], ) def read_dask_dataframe_with_object_ensemble( - dask_client, create_test_source_table, create_test_object_table, create_test_column_mapper + create_test_source_table, create_test_object_table, create_test_column_mapper ): return tape.read_dask_dataframe( source_frame=create_test_source_table, object_frame=create_test_object_table, - dask_client=dask_client, + dask_client=False, column_mapper=create_test_column_mapper, ) @@ -86,11 +86,11 @@ def read_dask_dataframe_with_object_ensemble( @pytest.mark.parametrize( "create_test_rows, create_test_column_mapper", [("create_test_rows", "create_test_column_mapper")] ) -def read_pandas_ensemble(dask_client, create_test_rows, 
create_test_column_mapper): +def read_pandas_ensemble(create_test_rows, create_test_column_mapper): return tape.read_pandas_dataframe( source_frame=pd.DataFrame(create_test_rows), column_mapper=create_test_column_mapper, - dask_client=dask_client, + dask_client=False, npartitions=1, ) @@ -100,7 +100,7 @@ def read_pandas_ensemble(dask_client, create_test_rows, create_test_column_mappe @pytest.mark.parametrize( "create_test_rows, create_test_column_mapper", [("create_test_rows", "create_test_column_mapper")] ) -def read_pandas_with_object_ensemble(dask_client, create_test_rows, create_test_column_mapper): +def read_pandas_with_object_ensemble(create_test_rows, create_test_column_mapper): n_obj = 5 id = 8000 + np.arange(n_obj) name = id.astype(str) @@ -108,7 +108,7 @@ def read_pandas_with_object_ensemble(dask_client, create_test_rows, create_test_ """Create an Ensemble from pandas dataframes.""" return tape.read_pandas_dataframe( - dask_client=dask_client, + dask_client=False, source_frame=pd.DataFrame(create_test_rows), object_frame=object_table, column_mapper=create_test_column_mapper, @@ -118,12 +118,12 @@ def read_pandas_with_object_ensemble(dask_client, create_test_rows, create_test_ # pylint: disable=redefined-outer-name @pytest.fixture -def read_parquet_ensemble_without_client(): - """Create an Ensemble from parquet data without a dask client.""" +def read_parquet_ensemble_with_client(dask_client): + """Create an Ensemble from parquet data with a dask client.""" return tape.read_parquet( source_file="tests/tape_tests/data/source/test_source.parquet", object_file="tests/tape_tests/data/object/test_object.parquet", - dask_client=False, + dask_client=dask_client, id_col="ps1_objid", time_col="midPointTai", band_col="filterName", @@ -134,12 +134,12 @@ def read_parquet_ensemble_without_client(): # pylint: disable=redefined-outer-name @pytest.fixture -def read_parquet_ensemble(dask_client): +def read_parquet_ensemble(): """Create an Ensemble from parquet data.""" 
return tape.read_parquet( source_file="tests/tape_tests/data/source/test_source.parquet", object_file="tests/tape_tests/data/object/test_object.parquet", - dask_client=dask_client, + dask_client=False, id_col="ps1_objid", time_col="midPointTai", band_col="filterName", @@ -150,11 +150,11 @@ def read_parquet_ensemble(dask_client): # pylint: disable=redefined-outer-name @pytest.fixture -def read_parquet_ensemble_from_source(dask_client): +def read_parquet_ensemble_from_source(): """Create an Ensemble from parquet data, with object file withheld.""" return tape.read_parquet( source_file="tests/tape_tests/data/source/test_source.parquet", - dask_client=dask_client, + dask_client=False, id_col="ps1_objid", time_col="midPointTai", band_col="filterName", @@ -165,7 +165,7 @@ def read_parquet_ensemble_from_source(dask_client): # pylint: disable=redefined-outer-name @pytest.fixture -def read_parquet_ensemble_with_column_mapper(dask_client): +def read_parquet_ensemble_with_column_mapper(): """Create an Ensemble from parquet data, with object file withheld.""" colmap = ColumnMapper().assign( id_col="ps1_objid", @@ -178,26 +178,26 @@ def read_parquet_ensemble_with_column_mapper(dask_client): return tape.read_parquet( source_file="tests/tape_tests/data/source/test_source.parquet", column_mapper=colmap, - dask_client=dask_client, + dask_client=False, ) # pylint: disable=redefined-outer-name @pytest.fixture -def read_parquet_ensemble_with_known_column_mapper(dask_client): +def read_parquet_ensemble_with_known_column_mapper(): """Create an Ensemble from parquet data, with object file withheld.""" colmap = ColumnMapper().use_known_map("ZTF") return tape.read_parquet( source_file="tests/tape_tests/data/source/test_source.parquet", column_mapper=colmap, - dask_client=dask_client, + dask_client=False, ) # pylint: disable=redefined-outer-name @pytest.fixture -def read_parquet_ensemble_from_hipscat(dask_client): +def read_parquet_ensemble_from_hipscat(): """Create an Ensemble from a 
hipscat/hive-style directory.""" colmap = ColumnMapper( @@ -214,7 +214,7 @@ def read_parquet_ensemble_from_hipscat(dask_client): column_mapper=colmap, object_index="id", source_index="object_id", - dask_client=dask_client, + dask_client=False, ) @@ -228,9 +228,9 @@ def dask_client(): # pylint: disable=redefined-outer-name @pytest.fixture -def parquet_ensemble_without_client(): +def parquet_ensemble_with_client(dask_client): - """Create an Ensemble from parquet data without a dask client.""" + """Create an Ensemble from parquet data with a dask client.""" - ens = Ensemble(client=False) + ens = Ensemble(client=dask_client) ens.from_parquet( "tests/tape_tests/data/source/test_source.parquet", "tests/tape_tests/data/object/test_object.parquet", @@ -245,9 +245,9 @@ @pytest.fixture -def parquet_files_and_ensemble_without_client(): +def parquet_files_and_ensemble_with_client(dask_client): - """Create an Ensemble from parquet data without a dask client.""" + """Create an Ensemble from parquet data with a dask client.""" - ens = Ensemble(client=False) + ens = Ensemble(client=dask_client) source_file = "tests/tape_tests/data/source/test_source.parquet" object_file = "tests/tape_tests/data/object/test_object.parquet" colmap = ColumnMapper().assign( @@ -263,9 +263,9 @@ # pylint: disable=redefined-outer-name @pytest.fixture -def parquet_ensemble(dask_client): +def parquet_ensemble(): """Create an Ensemble from parquet data.""" - ens = Ensemble(client=dask_client) + ens = Ensemble(client=False) ens.from_parquet( "tests/tape_tests/data/source/test_source.parquet", "tests/tape_tests/data/object/test_object.parquet", @@ -281,9 +281,9 @@ # pylint: disable=redefined-outer-name @pytest.fixture -def parquet_ensemble_partition_size(dask_client): +def parquet_ensemble_partition_size(): """Create an Ensemble from parquet data.""" - ens = Ensemble(client=dask_client) + ens = Ensemble(client=False) ens.from_parquet( "tests/tape_tests/data/source/test_source.parquet",
"tests/tape_tests/data/object/test_object.parquet", @@ -300,9 +300,9 @@ def parquet_ensemble_partition_size(dask_client): # pylint: disable=redefined-outer-name @pytest.fixture -def parquet_ensemble_with_divisions(dask_client): +def parquet_ensemble_with_divisions(): """Create an Ensemble from parquet data.""" - ens = Ensemble(client=dask_client) + ens = Ensemble(client=False) ens.from_parquet( "tests/tape_tests/data/source/test_source.parquet", "tests/tape_tests/data/object/test_object.parquet", @@ -319,9 +319,9 @@ def parquet_ensemble_with_divisions(dask_client): # pylint: disable=redefined-outer-name @pytest.fixture -def parquet_ensemble_from_source(dask_client): +def parquet_ensemble_from_source(): """Create an Ensemble from parquet data, with object file withheld.""" - ens = Ensemble(client=dask_client) + ens = Ensemble(client=False) ens.from_parquet( "tests/tape_tests/data/source/test_source.parquet", id_col="ps1_objid", @@ -336,9 +336,9 @@ def parquet_ensemble_from_source(dask_client): # pylint: disable=redefined-outer-name @pytest.fixture -def parquet_ensemble_with_column_mapper(dask_client): +def parquet_ensemble_with_column_mapper(): """Create an Ensemble from parquet data, with object file withheld.""" - ens = Ensemble(client=dask_client) + ens = Ensemble(client=False) colmap = ColumnMapper().assign( id_col="ps1_objid", @@ -357,9 +357,9 @@ def parquet_ensemble_with_column_mapper(dask_client): # pylint: disable=redefined-outer-name @pytest.fixture -def parquet_ensemble_with_known_column_mapper(dask_client): +def parquet_ensemble_with_known_column_mapper(): """Create an Ensemble from parquet data, with object file withheld.""" - ens = Ensemble(client=dask_client) + ens = Ensemble(client=False) colmap = ColumnMapper().use_known_map("ZTF") ens.from_parquet( @@ -416,7 +416,7 @@ def ensemble_from_lsdb(): band_col="band", ) - ens = Ensemble(False) + ens = Ensemble(client=False) # We just avoid needing to invoke the ._ddf property from the catalogs 
ens.from_lsdb(joined_source_cat, object_cat, column_mapper=colmap) @@ -453,9 +453,9 @@ def read_ensemble_from_lsdb(): # pylint: disable=redefined-outer-name @pytest.fixture -def dask_dataframe_ensemble(dask_client): +def dask_dataframe_ensemble(): """Create an Ensemble from parquet data.""" - ens = Ensemble(client=dask_client) + ens = Ensemble(client=False) num_points = 1000 all_bands = np.array(["r", "g", "b", "i"]) @@ -480,9 +480,9 @@ def dask_dataframe_ensemble(dask_client): # pylint: disable=redefined-outer-name @pytest.fixture -def dask_dataframe_ensemble_partition_size(dask_client): +def dask_dataframe_ensemble_partition_size(): """Create an Ensemble from parquet data.""" - ens = Ensemble(client=dask_client) + ens = Ensemble(client=False) num_points = 1000 all_bands = np.array(["r", "g", "b", "i"]) @@ -508,9 +508,9 @@ def dask_dataframe_ensemble_partition_size(dask_client): # pylint: disable=redefined-outer-name @pytest.fixture -def dask_dataframe_with_object_ensemble(dask_client): +def dask_dataframe_with_object_ensemble(): """Create an Ensemble from parquet data.""" - ens = Ensemble(client=dask_client) + ens = Ensemble(client=False) n_obj = 5 id = 8000 + np.arange(n_obj) @@ -547,9 +547,9 @@ def dask_dataframe_with_object_ensemble(dask_client): # pylint: disable=redefined-outer-name @pytest.fixture -def pandas_ensemble(dask_client): +def pandas_ensemble(): """Create an Ensemble from parquet data.""" - ens = Ensemble(client=dask_client) + ens = Ensemble(client=False) num_points = 1000 all_bands = np.array(["r", "g", "b", "i"]) @@ -575,9 +575,9 @@ def pandas_ensemble(dask_client): # pylint: disable=redefined-outer-name @pytest.fixture -def pandas_with_object_ensemble(dask_client): +def pandas_with_object_ensemble(): """Create an Ensemble from parquet data.""" - ens = Ensemble(client=dask_client) + ens = Ensemble(client=False) n_obj = 5 id = 8000 + np.arange(n_obj) @@ -613,9 +613,9 @@ def pandas_with_object_ensemble(dask_client): # pylint: 
disable=redefined-outer-name @pytest.fixture -def ensemble_from_source_dict(dask_client): +def ensemble_from_source_dict(): """Create an Ensemble from a source dict, returning the ensemble and the source dict.""" - ens = Ensemble(client=dask_client) + ens = Ensemble(client=False) # Create some fake data with two IDs (8001, 8002), two bands ["g", "b"] # a few time steps, flux, and data for zero point calculations. diff --git a/tests/tape_tests/test_ensemble.py b/tests/tape_tests/test_ensemble.py index 5fcb2900..cca1a5b4 100644 --- a/tests/tape_tests/test_ensemble.py +++ b/tests/tape_tests/test_ensemble.py @@ -47,18 +47,13 @@ def test_with_client(): [ "parquet_ensemble", "parquet_ensemble_with_divisions", - "parquet_ensemble_without_client", + "parquet_ensemble_with_client", "parquet_ensemble_from_source", "parquet_ensemble_with_column_mapper", "parquet_ensemble_with_known_column_mapper", "parquet_ensemble_partition_size", "read_parquet_ensemble", - "read_parquet_ensemble_without_client", - "read_parquet_ensemble_from_source", - "read_parquet_ensemble_with_column_mapper", - "read_parquet_ensemble_with_known_column_mapper", - "read_parquet_ensemble", - "read_parquet_ensemble_without_client", + "read_parquet_ensemble_with_client", "read_parquet_ensemble_from_source", "read_parquet_ensemble_with_column_mapper", "read_parquet_ensemble_with_known_column_mapper", @@ -192,7 +187,7 @@ def test_hipscat_constructors(data_fixture, request): "data_fixture", [ "parquet_ensemble", - "parquet_ensemble_without_client", + "parquet_ensemble_with_client", ], ) def test_update_ensemble(data_fixture, request): @@ -261,7 +256,7 @@ def test_available_datasets(dask_client): @pytest.mark.parametrize( "data_fixture", [ - "parquet_files_and_ensemble_without_client", + "parquet_files_and_ensemble_with_client", ], ) def test_frame_tracking(data_fixture, request): @@ -1025,7 +1020,7 @@ def test_persist(dask_client): "data_fixture", [ "parquet_ensemble_with_divisions", - 
"parquet_ensemble_without_client", + "parquet_ensemble_with_client", ], ) def test_sample(data_fixture, request): @@ -1049,9 +1044,6 @@ def test_sample(data_fixture, request): assert len(ens.object) == prior_obj_len assert len(ens.source) == prior_src_len - if data_fixture == "parquet_ensemble_with_divisions": - ens.client.close() # sample_objects disables client cleanup, must do manually - def test_update_column_map(dask_client): """ @@ -1933,7 +1925,7 @@ def test_bin_sources_two_days(dask_client): [ "parquet_ensemble", "parquet_ensemble_with_divisions", - "parquet_ensemble_without_client", + "parquet_ensemble_with_client", ], ) @pytest.mark.parametrize("use_map", [True, False]) diff --git a/tests/tape_tests/test_ensemble_frame.py b/tests/tape_tests/test_ensemble_frame.py index 7c99ae7a..8375a34a 100644 --- a/tests/tape_tests/test_ensemble_frame.py +++ b/tests/tape_tests/test_ensemble_frame.py @@ -1,4 +1,5 @@ """ Test EnsembleFrame (inherited from Dask.DataFrame) creation and manipulations. 
""" + import numpy as np import pandas as pd from tape import ( @@ -228,7 +229,7 @@ def test_convert_flux_to_mag(data_fixture, request, err_col, zp_form, out_col_na @pytest.mark.parametrize( "data_fixture", [ - "parquet_files_and_ensemble_without_client", + "parquet_files_and_ensemble_with_client", ], ) def test_object_and_source_frame_propagation(data_fixture, request): diff --git a/tests/tape_tests/test_feature_extraction.py b/tests/tape_tests/test_feature_extraction.py index 73aaa3b5..9e8bb115 100644 --- a/tests/tape_tests/test_feature_extraction.py +++ b/tests/tape_tests/test_feature_extraction.py @@ -25,7 +25,7 @@ def test_stetsonk(): assert_array_equal(result.dtypes, np.float64) -def test_multiple_features_with_ensemble(dask_client): +def test_multiple_features_with_ensemble(): n = 5 object1 = { @@ -45,7 +45,7 @@ def test_multiple_features_with_ensemble(dask_client): rows = {column: np.concatenate([object1[column], object2[column]]) for column in object1} cmap = ColumnMapper(id_col="id", time_col="time", flux_col="flux", err_col="err", band_col="band") - ens = Ensemble(client=dask_client).from_source_dict(rows, cmap) + ens = Ensemble(client=False).from_source_dict(rows, cmap) extractor = licu.Extractor(licu.AndersonDarlingNormal(), licu.InterPercentileRange(0.25), licu.StetsonK()) result = ens.batch( @@ -58,7 +58,7 @@ def test_multiple_features_with_ensemble(dask_client): assert_allclose(result, [[0.114875, 0.625, 0.848528]] * 2, atol=1e-5) -def test_otsu_with_ensemble_all_bands(dask_client): +def test_otsu_with_ensemble_all_bands(): n = 10 assert n % 2 == 0 @@ -79,7 +79,7 @@ def test_otsu_with_ensemble_all_bands(dask_client): rows = {column: np.concatenate([object1[column], object2[column]]) for column in object1} cmap = ColumnMapper(id_col="id", time_col="time", flux_col="flux", err_col="err", band_col="band") - ens = Ensemble(client=dask_client).from_source_dict(rows, cmap) + ens = Ensemble(client=False).from_source_dict(rows, cmap) result = ens.batch( 
licu.OtsuSplit(),