Merge pull request #376 from lincc-frameworks/clientless_tests
Switch to mostly clientless tests
dougbrn authored Feb 8, 2024
2 parents d7b5d17 + 2fd3b46 commit db3ac53
Showing 4 changed files with 62 additions and 69 deletions.
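
The diff applies one pattern throughout: fixtures that previously required the session-scoped dask_client fixture now build their Ensemble with dask_client=False / client=False, and the few fixtures that deliberately keep a live client are renamed from *_without_client to *_with_client. Below is a minimal sketch of the two fixture shapes, assuming only the Ensemble(client=...) usage visible in this diff — the fixture names are illustrative, and the claim that client=False falls back to Dask's default scheduler is an assumption about TAPE, not something the diff states:

import pytest
from tape import Ensemble  # assumes tape exposes Ensemble at top level, as this conftest's usage suggests

@pytest.fixture
def clientless_ensemble():
    # client=False: no Dask Client is created, so there is no cluster
    # setup/teardown per test; work presumably runs on Dask's default scheduler.
    return Ensemble(client=False)

@pytest.fixture
def client_backed_ensemble(dask_client):  # dask_client is the session fixture kept in conftest.py
    # Retained only for tests that exercise behavior against a real client.
    return Ensemble(client=dask_client)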
100 changes: 50 additions & 50 deletions tests/tape_tests/conftest.py
@@ -56,9 +56,9 @@ def create_test_object_table(npartitions=1):
     "create_test_source_table, create_test_column_mapper",
     [("create_test_source_table", "create_test_column_mapper")],
 )
-def read_dask_dataframe_ensemble(dask_client, create_test_source_table, create_test_column_mapper):
+def read_dask_dataframe_ensemble(create_test_source_table, create_test_column_mapper):
     return tape.read_dask_dataframe(
-        dask_client=dask_client,
+        dask_client=False,
         source_frame=create_test_source_table,
         column_mapper=create_test_column_mapper,
     )
@@ -71,12 +71,12 @@ def read_dask_dataframe_ensemble(dask_client, create_test_source_table, create_test_column_mapper):
     [("create_test_source_table", "create_test_object_table", "create_test_column_mapper")],
 )
 def read_dask_dataframe_with_object_ensemble(
-    dask_client, create_test_source_table, create_test_object_table, create_test_column_mapper
+    create_test_source_table, create_test_object_table, create_test_column_mapper
 ):
     return tape.read_dask_dataframe(
         source_frame=create_test_source_table,
         object_frame=create_test_object_table,
-        dask_client=dask_client,
+        dask_client=False,
         column_mapper=create_test_column_mapper,
     )

@@ -86,11 +86,11 @@ def read_dask_dataframe_with_object_ensemble(
 @pytest.mark.parametrize(
     "create_test_rows, create_test_column_mapper", [("create_test_rows", "create_test_column_mapper")]
 )
-def read_pandas_ensemble(dask_client, create_test_rows, create_test_column_mapper):
+def read_pandas_ensemble(create_test_rows, create_test_column_mapper):
     return tape.read_pandas_dataframe(
         source_frame=pd.DataFrame(create_test_rows),
         column_mapper=create_test_column_mapper,
-        dask_client=dask_client,
+        dask_client=False,
         npartitions=1,
     )

@@ -100,15 +100,15 @@ def read_pandas_ensemble(dask_client, create_test_rows, create_test_column_mapper):
 @pytest.mark.parametrize(
     "create_test_rows, create_test_column_mapper", [("create_test_rows", "create_test_column_mapper")]
 )
-def read_pandas_with_object_ensemble(dask_client, create_test_rows, create_test_column_mapper):
+def read_pandas_with_object_ensemble(create_test_rows, create_test_column_mapper):
     n_obj = 5
     id = 8000 + np.arange(n_obj)
     name = id.astype(str)
     object_table = pd.DataFrame(dict(id=id, name=name))

     """Create an Ensemble from pandas dataframes."""
     return tape.read_pandas_dataframe(
-        dask_client=dask_client,
+        dask_client=False,
         source_frame=pd.DataFrame(create_test_rows),
         object_frame=object_table,
         column_mapper=create_test_column_mapper,
@@ -118,12 +118,12 @@ def read_pandas_with_object_ensemble(dask_client, create_test_rows, create_test_column_mapper):

 # pylint: disable=redefined-outer-name
 @pytest.fixture
-def read_parquet_ensemble_without_client():
-    """Create an Ensemble from parquet data without a dask client."""
+def read_parquet_ensemble_with_client(dask_client):
+    """Create an Ensemble from parquet data with a dask client."""
     return tape.read_parquet(
         source_file="tests/tape_tests/data/source/test_source.parquet",
         object_file="tests/tape_tests/data/object/test_object.parquet",
-        dask_client=False,
+        dask_client=dask_client,
         id_col="ps1_objid",
         time_col="midPointTai",
         band_col="filterName",
@@ -134,12 +134,12 @@ def read_parquet_ensemble_without_client():

 # pylint: disable=redefined-outer-name
 @pytest.fixture
-def read_parquet_ensemble(dask_client):
+def read_parquet_ensemble():
     """Create an Ensemble from parquet data."""
     return tape.read_parquet(
         source_file="tests/tape_tests/data/source/test_source.parquet",
         object_file="tests/tape_tests/data/object/test_object.parquet",
-        dask_client=dask_client,
+        dask_client=False,
         id_col="ps1_objid",
         time_col="midPointTai",
         band_col="filterName",
@@ -150,11 +150,11 @@ def read_parquet_ensemble(dask_client):

 # pylint: disable=redefined-outer-name
 @pytest.fixture
-def read_parquet_ensemble_from_source(dask_client):
+def read_parquet_ensemble_from_source():
     """Create an Ensemble from parquet data, with object file withheld."""
     return tape.read_parquet(
         source_file="tests/tape_tests/data/source/test_source.parquet",
-        dask_client=dask_client,
+        dask_client=False,
         id_col="ps1_objid",
         time_col="midPointTai",
         band_col="filterName",
@@ -165,7 +165,7 @@ def read_parquet_ensemble_from_source(dask_client):

 # pylint: disable=redefined-outer-name
 @pytest.fixture
-def read_parquet_ensemble_with_column_mapper(dask_client):
+def read_parquet_ensemble_with_column_mapper():
     """Create an Ensemble from parquet data, with object file withheld."""
     colmap = ColumnMapper().assign(
         id_col="ps1_objid",
@@ -178,26 +178,26 @@ def read_parquet_ensemble_with_column_mapper(dask_client):
     return tape.read_parquet(
         source_file="tests/tape_tests/data/source/test_source.parquet",
         column_mapper=colmap,
-        dask_client=dask_client,
+        dask_client=False,
     )


 # pylint: disable=redefined-outer-name
 @pytest.fixture
-def read_parquet_ensemble_with_known_column_mapper(dask_client):
+def read_parquet_ensemble_with_known_column_mapper():
     """Create an Ensemble from parquet data, with object file withheld."""
     colmap = ColumnMapper().use_known_map("ZTF")

     return tape.read_parquet(
         source_file="tests/tape_tests/data/source/test_source.parquet",
         column_mapper=colmap,
-        dask_client=dask_client,
+        dask_client=False,
     )


 # pylint: disable=redefined-outer-name
 @pytest.fixture
-def read_parquet_ensemble_from_hipscat(dask_client):
+def read_parquet_ensemble_from_hipscat():
     """Create an Ensemble from a hipscat/hive-style directory."""

     colmap = ColumnMapper(
@@ -214,7 +214,7 @@ def read_parquet_ensemble_from_hipscat(dask_client):
         column_mapper=colmap,
         object_index="id",
         source_index="object_id",
-        dask_client=dask_client,
+        dask_client=False,
     )

@@ -228,9 +228,9 @@ def dask_client():

 # pylint: disable=redefined-outer-name
 @pytest.fixture
-def parquet_ensemble_without_client():
+def parquet_ensemble_with_client(dask_client):
     """Create an Ensemble from parquet data without a dask client."""
-    ens = Ensemble(client=False)
+    ens = Ensemble(client=dask_client)
     ens.from_parquet(
         "tests/tape_tests/data/source/test_source.parquet",
         "tests/tape_tests/data/object/test_object.parquet",
@@ -245,9 +245,9 @@ def parquet_ensemble_without_client():


 @pytest.fixture
-def parquet_files_and_ensemble_without_client():
+def parquet_files_and_ensemble_with_client(dask_client):
     """Create an Ensemble from parquet data without a dask client."""
-    ens = Ensemble(client=False)
+    ens = Ensemble(client=dask_client)
     source_file = "tests/tape_tests/data/source/test_source.parquet"
     object_file = "tests/tape_tests/data/object/test_object.parquet"
     colmap = ColumnMapper().assign(
@@ -263,9 +263,9 @@ def parquet_files_and_ensemble_without_client():

 # pylint: disable=redefined-outer-name
 @pytest.fixture
-def parquet_ensemble(dask_client):
+def parquet_ensemble():
     """Create an Ensemble from parquet data."""
-    ens = Ensemble(client=dask_client)
+    ens = Ensemble(client=False)
     ens.from_parquet(
         "tests/tape_tests/data/source/test_source.parquet",
         "tests/tape_tests/data/object/test_object.parquet",
@@ -281,9 +281,9 @@ def parquet_ensemble(dask_client):

 # pylint: disable=redefined-outer-name
 @pytest.fixture
-def parquet_ensemble_partition_size(dask_client):
+def parquet_ensemble_partition_size():
     """Create an Ensemble from parquet data."""
-    ens = Ensemble(client=dask_client)
+    ens = Ensemble(client=False)
     ens.from_parquet(
         "tests/tape_tests/data/source/test_source.parquet",
         "tests/tape_tests/data/object/test_object.parquet",
@@ -300,9 +300,9 @@ def parquet_ensemble_partition_size(dask_client):

 # pylint: disable=redefined-outer-name
 @pytest.fixture
-def parquet_ensemble_with_divisions(dask_client):
+def parquet_ensemble_with_divisions():
     """Create an Ensemble from parquet data."""
-    ens = Ensemble(client=dask_client)
+    ens = Ensemble(client=False)
     ens.from_parquet(
         "tests/tape_tests/data/source/test_source.parquet",
         "tests/tape_tests/data/object/test_object.parquet",
@@ -319,9 +319,9 @@ def parquet_ensemble_with_divisions(dask_client):

 # pylint: disable=redefined-outer-name
 @pytest.fixture
-def parquet_ensemble_from_source(dask_client):
+def parquet_ensemble_from_source():
     """Create an Ensemble from parquet data, with object file withheld."""
-    ens = Ensemble(client=dask_client)
+    ens = Ensemble(client=False)
     ens.from_parquet(
         "tests/tape_tests/data/source/test_source.parquet",
         id_col="ps1_objid",
@@ -336,9 +336,9 @@ def parquet_ensemble_from_source(dask_client):

 # pylint: disable=redefined-outer-name
 @pytest.fixture
-def parquet_ensemble_with_column_mapper(dask_client):
+def parquet_ensemble_with_column_mapper():
     """Create an Ensemble from parquet data, with object file withheld."""
-    ens = Ensemble(client=dask_client)
+    ens = Ensemble(client=False)

     colmap = ColumnMapper().assign(
         id_col="ps1_objid",
@@ -357,9 +357,9 @@ def parquet_ensemble_with_column_mapper(dask_client):

 # pylint: disable=redefined-outer-name
 @pytest.fixture
-def parquet_ensemble_with_known_column_mapper(dask_client):
+def parquet_ensemble_with_known_column_mapper():
     """Create an Ensemble from parquet data, with object file withheld."""
-    ens = Ensemble(client=dask_client)
+    ens = Ensemble(client=False)

     colmap = ColumnMapper().use_known_map("ZTF")
     ens.from_parquet(
@@ -416,7 +416,7 @@ def ensemble_from_lsdb():
         band_col="band",
     )

-    ens = Ensemble(False)
+    ens = Ensemble(client=False)

     # We just avoid needing to invoke the ._ddf property from the catalogs
     ens.from_lsdb(joined_source_cat, object_cat, column_mapper=colmap)
@@ -453,9 +453,9 @@ def read_ensemble_from_lsdb():

 # pylint: disable=redefined-outer-name
 @pytest.fixture
-def dask_dataframe_ensemble(dask_client):
+def dask_dataframe_ensemble():
     """Create an Ensemble from parquet data."""
-    ens = Ensemble(client=dask_client)
+    ens = Ensemble(client=False)

     num_points = 1000
     all_bands = np.array(["r", "g", "b", "i"])
@@ -480,9 +480,9 @@ def dask_dataframe_ensemble(dask_client):

 # pylint: disable=redefined-outer-name
 @pytest.fixture
-def dask_dataframe_ensemble_partition_size(dask_client):
+def dask_dataframe_ensemble_partition_size():
     """Create an Ensemble from parquet data."""
-    ens = Ensemble(client=dask_client)
+    ens = Ensemble(client=False)

     num_points = 1000
     all_bands = np.array(["r", "g", "b", "i"])
@@ -508,9 +508,9 @@ def dask_dataframe_ensemble_partition_size(dask_client):

 # pylint: disable=redefined-outer-name
 @pytest.fixture
-def dask_dataframe_with_object_ensemble(dask_client):
+def dask_dataframe_with_object_ensemble():
     """Create an Ensemble from parquet data."""
-    ens = Ensemble(client=dask_client)
+    ens = Ensemble(client=False)

     n_obj = 5
     id = 8000 + np.arange(n_obj)
@@ -547,9 +547,9 @@ def dask_dataframe_with_object_ensemble(dask_client):

 # pylint: disable=redefined-outer-name
 @pytest.fixture
-def pandas_ensemble(dask_client):
+def pandas_ensemble():
     """Create an Ensemble from parquet data."""
-    ens = Ensemble(client=dask_client)
+    ens = Ensemble(client=False)

     num_points = 1000
     all_bands = np.array(["r", "g", "b", "i"])
@@ -575,9 +575,9 @@ def pandas_ensemble(dask_client):

 # pylint: disable=redefined-outer-name
 @pytest.fixture
-def pandas_with_object_ensemble(dask_client):
+def pandas_with_object_ensemble():
     """Create an Ensemble from parquet data."""
-    ens = Ensemble(client=dask_client)
+    ens = Ensemble(client=False)

     n_obj = 5
     id = 8000 + np.arange(n_obj)
@@ -613,9 +613,9 @@ def pandas_with_object_ensemble(dask_client):

 # pylint: disable=redefined-outer-name
 @pytest.fixture
-def ensemble_from_source_dict(dask_client):
+def ensemble_from_source_dict():
     """Create an Ensemble from a source dict, returning the ensemble and the source dict."""
-    ens = Ensemble(client=dask_client)
+    ens = Ensemble(client=False)

     # Create some fake data with two IDs (8001, 8002), two bands ["g", "b"]
     # a few time steps, flux, and data for zero point calculations.
20 changes: 6 additions & 14 deletions tests/tape_tests/test_ensemble.py
@@ -47,18 +47,13 @@ def test_with_client():
     [
         "parquet_ensemble",
         "parquet_ensemble_with_divisions",
-        "parquet_ensemble_without_client",
+        "parquet_ensemble_with_client",
         "parquet_ensemble_from_source",
         "parquet_ensemble_with_column_mapper",
         "parquet_ensemble_with_known_column_mapper",
         "parquet_ensemble_partition_size",
-        "read_parquet_ensemble",
-        "read_parquet_ensemble_without_client",
-        "read_parquet_ensemble_from_source",
-        "read_parquet_ensemble_with_column_mapper",
-        "read_parquet_ensemble_with_known_column_mapper",
         "read_parquet_ensemble",
-        "read_parquet_ensemble_without_client",
+        "read_parquet_ensemble_with_client",
         "read_parquet_ensemble_from_source",
         "read_parquet_ensemble_with_column_mapper",
         "read_parquet_ensemble_with_known_column_mapper",
@@ -192,7 +187,7 @@ def test_hipscat_constructors(data_fixture, request):
     "data_fixture",
     [
         "parquet_ensemble",
-        "parquet_ensemble_without_client",
+        "parquet_ensemble_with_client",
     ],
 )
 def test_update_ensemble(data_fixture, request):
@@ -261,7 +256,7 @@ def test_available_datasets(dask_client):
 @pytest.mark.parametrize(
     "data_fixture",
     [
-        "parquet_files_and_ensemble_without_client",
+        "parquet_files_and_ensemble_with_client",
     ],
 )
 def test_frame_tracking(data_fixture, request):
@@ -1025,7 +1020,7 @@ def test_persist(dask_client):
     "data_fixture",
     [
         "parquet_ensemble_with_divisions",
-        "parquet_ensemble_without_client",
+        "parquet_ensemble_with_client",
     ],
 )
 def test_sample(data_fixture, request):
@@ -1049,9 +1044,6 @@ def test_sample(data_fixture, request):
     assert len(ens.object) == prior_obj_len
     assert len(ens.source) == prior_src_len

-    if data_fixture == "parquet_ensemble_with_divisions":
-        ens.client.close()  # sample_objects disables client cleanup, must do manually
-

 def test_update_column_map(dask_client):
     """
@@ -1933,7 +1925,7 @@ def test_bin_sources_two_days(dask_client):
     [
         "parquet_ensemble",
         "parquet_ensemble_with_divisions",
-        "parquet_ensemble_without_client",
+        "parquet_ensemble_with_client",
     ],
 )
 @pytest.mark.parametrize("use_map", [True, False])
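
The parametrize lists above pass fixture names as strings, and the collapsed test bodies resolve them at runtime. Below is a sketch of the standard pytest idiom for this, request.getfixturevalue — the literal TAPE test bodies are not shown in this view, so this is illustrative rather than the project's exact code:

import pytest

@pytest.mark.parametrize(
    "data_fixture",
    ["parquet_ensemble", "parquet_ensemble_with_client"],
)
def test_example(data_fixture, request):
    # Resolve the fixture named by the string parameter; this is why each
    # rename from *_without_client to *_with_client also has to be mirrored
    # in every parametrize list in this file.
    ens = request.getfixturevalue(data_fixture)
    assert len(ens.object) > 0  # the .object and .source frames are used the same way above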