From 99f6d9f76f093bae761adfe19a30ecb79bdf0177 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B1=9F=E5=AE=B6=E7=91=8B?= <36886416+JiangJiaWei1103@users.noreply.github.com> Date: Tue, 7 Jan 2025 00:03:34 +0800 Subject: [PATCH] [test] Add integration test for accessing sd attr in dc (#2969) * test: Add integration test for attr access of sd Signed-off-by: JiaWei Jiang * Correct file path Signed-off-by: JiaWei Jiang * test: Support interaction with minio s3 bucket 1. Upload a local parquet file to minio s3 bucket 2. Access StructuredDataset attr from a dataclass 3. Open StructuredDataset from a remote path Signed-off-by: JiaWei Jiang * Delete an unmerged integration test Signed-off-by: JiaWei Jiang * Try imagespec with commit sha of corresponding fix Signed-off-by: JiaWei Jiang * Remove redundant test Signed-off-by: JiaWei Jiang * Remove default_factory and create sd dc from input uri Signed-off-by: JiaWei Jiang * refactor: Clean test logic 1. Remove redundant prints 2. Use `mock.patch.dict` to setup `os.environ` for the current test fn * Avoid contaminating other tests running in the same process Signed-off-by: JiaWei Jiang * Remove redundant minio env var setup and add test comments Signed-off-by: JiaWei Jiang * Support uploading tmp pqt file Signed-off-by: JiaWei Jiang * Update deprecated module Signed-off-by: JiaWei Jiang * Remove redundant and unused imports Signed-off-by: JiaWei Jiang --------- Signed-off-by: JiaWei Jiang --- .../integration/remote/test_remote.py | 20 +++++++- tests/flytekit/integration/remote/utils.py | 3 ++ .../remote/workflows/basic/attr_access_sd.py | 46 +++++++++++++++++++ 3 files changed, 68 insertions(+), 1 deletion(-) create mode 100644 tests/flytekit/integration/remote/workflows/basic/attr_access_sd.py diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index f20a33aea8..8736c7b2ef 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ 
b/tests/flytekit/integration/remote/test_remote.py @@ -14,7 +14,7 @@ from urllib.parse import urlparse import uuid import pytest -import mock +from unittest import mock from flytekit import LaunchPlan, kwtypes, WorkflowExecutionPhase from flytekit.configuration import Config, ImageConfig, SerializationSettings @@ -833,3 +833,21 @@ def test_open_ff(): url = urlparse(remote_file_path) bucket, key = url.netloc, url.path.lstrip("/") file_transfer.delete_file(bucket=bucket, key=key) + + +def test_attr_access_sd(): + """Test accessing StructuredDataset attribute from a dataclass.""" + # Upload a file to minio s3 bucket + file_transfer = SimpleFileTransfer() + remote_file_path = file_transfer.upload_file(file_type="parquet") + + execution_id = run("attr_access_sd.py", "wf", "--uri", remote_file_path) + remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN) + execution = remote.fetch_execution(name=execution_id) + execution = remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5)) + assert execution.closure.phase == WorkflowExecutionPhase.SUCCEEDED, f"Execution failed with phase: {execution.closure.phase}" + + # Delete the remote file to free the space + url = urlparse(remote_file_path) + bucket, key = url.netloc, url.path.lstrip("/") + file_transfer.delete_file(bucket=bucket, key=key) diff --git a/tests/flytekit/integration/remote/utils.py b/tests/flytekit/integration/remote/utils.py index dadc8c6530..c16a0d0f4d 100644 --- a/tests/flytekit/integration/remote/utils.py +++ b/tests/flytekit/integration/remote/utils.py @@ -84,6 +84,9 @@ def _dump_tmp_file(self, file_type: str, tmp_dir: str) -> str: tmp_file_path = pathlib.Path(tmp_dir) / "test.json" with open(tmp_file_path, "w") as f: json.dump(d, f) + elif file_type == "parquet": + # Because `upload_file` accepts a single file only, we specify 00000 to make it a single file + tmp_file_path = pathlib.Path(__file__).parent / "workflows/basic/data/df.parquet/00000" return tmp_file_path diff --git 
a/tests/flytekit/integration/remote/workflows/basic/attr_access_sd.py b/tests/flytekit/integration/remote/workflows/basic/attr_access_sd.py new file mode 100644 index 0000000000..9d01926081 --- /dev/null +++ b/tests/flytekit/integration/remote/workflows/basic/attr_access_sd.py @@ -0,0 +1,46 @@ +""" +Test accessing StructuredDataset attribute from a dataclass. +""" +from dataclasses import dataclass + +import pandas as pd +from flytekit import task, workflow +from flytekit.types.structured import StructuredDataset + + +@dataclass +class DC: + sd: StructuredDataset + + +@task +def create_dc(uri: str) -> DC: + """Create a dataclass with a StructuredDataset attribute. + + Args: + uri: File URI. + + Returns: + dc: A dataclass with a StructuredDataset attribute. + """ + dc = DC(sd=StructuredDataset(uri=uri, file_format="parquet")) + + return dc + + +@task +def read_sd(sd: StructuredDataset) -> StructuredDataset: + """Read input StructuredDataset.""" + print("sd:", sd.open(pd.DataFrame).all()) + + return sd + + +@workflow +def wf(uri: str) -> None: + dc = create_dc(uri=uri) + read_sd(sd=dc.sd) + + +if __name__ == "__main__": + wf(uri="tests/flytekit/integration/remote/workflows/basic/data/df.parquet")