diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/model_evaluation/__init__.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/model_evaluation/__init__.py
index 56a124ca84bd..cf79b890fc69 100644
--- a/components/google-cloud/google_cloud_pipeline_components/_implementation/model_evaluation/__init__.py
+++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/model_evaluation/__init__.py
@@ -16,6 +16,7 @@
 import os
 from google_cloud_pipeline_components._implementation.model_evaluation.chunking.component import chunking as ChunkingOp
+from google_cloud_pipeline_components._implementation.model_evaluation.data_labeling.component import evaluation_data_labeling as EvaluationDataLabelingOp
 from google_cloud_pipeline_components._implementation.model_evaluation.data_sampler.component import evaluation_data_sampler as EvaluationDataSamplerOp
 from google_cloud_pipeline_components._implementation.model_evaluation.dataset_preprocessor.component import dataset_preprocessor_error_analysis as EvaluationDatasetPreprocessorOp
 from google_cloud_pipeline_components._implementation.model_evaluation.endpoint_batch_predict.component import evaluation_llm_endpoint_batch_predict_pipeline_graph_component
@@ -49,6 +50,7 @@
     'evaluation_llm_endpoint_batch_predict_pipeline_graph_component',
     'ChunkingOp',
     'EvaluationDataSamplerOp',
+    'EvaluationDataLabelingOp',
     'EvaluationDatasetPreprocessorOp',
     'ErrorAnalysisAnnotationOp',
     'EvaluatedAnnotationOp',
diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/model_evaluation/data_labeling/__init__.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/model_evaluation/data_labeling/__init__.py
new file mode 100644
index 000000000000..30c57174c450
--- /dev/null
+++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/model_evaluation/data_labeling/__init__.py
@@ -0,0 +1,14 @@
+# Copyright 2023 The Kubeflow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Google Cloud Pipeline Evaluation Data Labeling Component."""
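Reviewer note: the new export is intended to be consumed from a pipeline definition roughly as below. This is a minimal sketch; the pipeline name and every argument value are illustrative placeholders, not part of this change.

from kfp import dsl
from google_cloud_pipeline_components._implementation.model_evaluation import EvaluationDataLabelingOp


@dsl.pipeline(name='evaluation-data-labeling-demo')  # hypothetical name
def labeling_pipeline(project: str, location: str):
  # gcp_resources is a dsl.OutputPath parameter wired by the KFP backend,
  # so it is not passed here.
  EvaluationDataLabelingOp(
      project=project,
      location=location,
      job_display_name='eval-data-labeling',  # placeholder
      dataset_name='projects/123/locations/us-central1/datasets/456',  # placeholder
      instruction_uri='gs://my-bucket/instruction.pdf',  # placeholder
      inputs_schema_uri='gs://my-bucket/inputs_schema.yaml',  # placeholder
      annotation_spec='{"annotation_specs": ["positive", "negative"]}',  # placeholder
      labeler_count=1,
      annotation_label='my-annotation-set',  # placeholder
  )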
diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/model_evaluation/data_labeling/component.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/model_evaluation/data_labeling/component.py
new file mode 100644
index 000000000000..29101c233595
--- /dev/null
+++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/model_evaluation/data_labeling/component.py
@@ -0,0 +1,83 @@
+# Copyright 2024 The Kubeflow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Data Labeling Evaluation component."""
+
+
+from google_cloud_pipeline_components import _image
+from kfp import dsl
+
+
+@dsl.container_component
+def evaluation_data_labeling(
+    project: str,
+    location: str,
+    gcp_resources: dsl.OutputPath(str),  # pytype: disable=invalid-annotation
+    job_display_name: str,
+    dataset_name: str,
+    instruction_uri: str,
+    inputs_schema_uri: str,
+    annotation_spec: str,
+    labeler_count: int,
+    annotation_label: str,
+):
+  """Builds a container spec that launches a data labeling job.
+
+  Args:
+    project: Project to run the job in.
+    location: Location to run the job in.
+    gcp_resources: GCP resources that can be used to track the job.
+    job_display_name: Display name of the data labeling job.
+    dataset_name: Name of the dataset to use for the data labeling job.
+    instruction_uri: URI of the instruction for the data labeling job.
+    inputs_schema_uri: URI of the inputs schema for the data labeling job.
+    annotation_spec: Annotation spec for the data labeling job.
+    labeler_count: Number of labelers to use for the data labeling job.
+    annotation_label: Label of the data labeling job.
+
+  Returns:
+    Container spec that launches a data labeling job with the specified
+    payload.
+  """
+  return dsl.ContainerSpec(
+      image=_image.GCPC_IMAGE_TAG,
+      command=[
+          'python3',
+          '-u',
+          '-m',
+          'google_cloud_pipeline_components.container._implementation.model_evaluation.data_labeling_job.launcher',
+      ],
+      args=[
+          '--type',
+          'DataLabelingJob',
+          '--project',
+          project,
+          '--location',
+          location,
+          '--gcp_resources',
+          gcp_resources,
+          '--job_display_name',
+          job_display_name,
+          '--dataset_name',
+          dataset_name,
+          '--instruction_uri',
+          instruction_uri,
+          '--inputs_schema_uri',
+          inputs_schema_uri,
+          '--annotation_spec',
+          annotation_spec,
+          '--labeler_count',
+          labeler_count,
+          '--annotation_label',
+          annotation_label,
+      ],
+  )
diff --git a/components/google-cloud/google_cloud_pipeline_components/container/_implementation/model_evaluation/data_labeling_job/__init__.py b/components/google-cloud/google_cloud_pipeline_components/container/_implementation/model_evaluation/data_labeling_job/__init__.py
new file mode 100644
index 000000000000..ded4826046e1
--- /dev/null
+++ b/components/google-cloud/google_cloud_pipeline_components/container/_implementation/model_evaluation/data_labeling_job/__init__.py
@@ -0,0 +1,14 @@
+# Copyright 2024 The Kubeflow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
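Reviewer note: as a quick local check, the component can be compiled standalone to inspect the generated IR, including the resolved image, command, and args shown above. A sketch; the output filename is arbitrary.

from kfp import compiler
from google_cloud_pipeline_components._implementation.model_evaluation.data_labeling.component import evaluation_data_labeling

# Compiling a single component (rather than a full pipeline) emits its
# component spec as YAML for inspection.
compiler.Compiler().compile(evaluation_data_labeling, 'evaluation_data_labeling.yaml')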
+"""Google Cloud Pipeline Components - Data Labeling Job Launcher and Remote Runner.""" diff --git a/components/google-cloud/google_cloud_pipeline_components/container/_implementation/model_evaluation/data_labeling_job/launcher.py b/components/google-cloud/google_cloud_pipeline_components/container/_implementation/model_evaluation/data_labeling_job/launcher.py new file mode 100644 index 000000000000..e9a359320618 --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/container/_implementation/model_evaluation/data_labeling_job/launcher.py @@ -0,0 +1,154 @@ +# Copyright 2024 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""GCP launcher for data labeling jobs based on the AI Platform SDK.""" + +import argparse +import logging +import os +import sys +from typing import Any, Dict + +from google_cloud_pipeline_components.container._implementation.model_evaluation.data_labeling_job import remote_runner + + +def _make_parent_dirs_and_return_path(file_path: str): + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + +def _parse_args(args) -> Dict[str, Any]: + """Parse command line arguments. + + Args: + args: A list of arguments. + + Returns: + A tuple containing an argparse.Namespace class instance holding parsed args, + and a list containing all unknonw args. + """ + parser = argparse.ArgumentParser( + prog='Dataflow python job Pipelines service launcher', description='' + ) + parser.add_argument( + '--type', + dest='type', + type=str, + required=True, + default=argparse.SUPPRESS, + ) + parser.add_argument( + '--project', + dest='project', + type=str, + required=True, + default=argparse.SUPPRESS, + ) + parser.add_argument( + '--location', + dest='location', + type=str, + required=True, + default=argparse.SUPPRESS, + ) + parser.add_argument( + '--gcp_resources', + dest='gcp_resources', + type=_make_parent_dirs_and_return_path, + required=True, + default=argparse.SUPPRESS, + ) + parser.add_argument( + '--job_display_name', + dest='job_display_name', + type=str, + required=True, + default=argparse.SUPPRESS, + ) + parser.add_argument( + '--dataset_name', + dest='dataset_name', + type=str, + required=True, + default=argparse.SUPPRESS, + ) + parser.add_argument( + '--instruction_uri', + dest='instruction_uri', + type=str, + required=True, + default=argparse.SUPPRESS, + ) + parser.add_argument( + '--inputs_schema_uri', + dest='inputs_schema_uri', + type=str, + required=True, + default=argparse.SUPPRESS, + ) + parser.add_argument( + '--annotation_spec', + dest='annotation_spec', + type=str, + required=True, + default=argparse.SUPPRESS, + ) + parser.add_argument( + '--labeler_count', + dest='labeler_count', + type=int, + required=True, + default=argparse.SUPPRESS, + ) + parser.add_argument( + '--annotation_label', + dest='annotation_label', + type=str, + required=True, + default=argparse.SUPPRESS, + ) + parsed_args, _ = parser.parse_known_args(args) + return vars(parsed_args) + + +def main(argv): + """Main entry. 
+
+
+def main(argv):
+  """Main entry.
+
+  Expected input args are as follows:
+    project - Required. The project in which the resource will be launched.
+    location - Required. The location in which the resource will be launched.
+    type - Required. The GCP launcher is a single container; this value
+      specifies which resource to launch (always 'DataLabelingJob' here).
+    gcp_resources - Placeholder output for returning the job_id.
+
+  The remaining flags carry the individual fields of the data labeling job
+  spec; see _parse_args for the full list. Note these can contain Pipeline
+  Placeholders.
+
+  Args:
+    argv: A list of system arguments.
+  """
+  parsed_args = _parse_args(argv)
+  job_type = parsed_args['type']
+
+  if job_type != 'DataLabelingJob':
+    raise ValueError('Incorrect job type: ' + job_type)
+
+  logging.info(
+      'Starting DataLabelingJob using the following arguments: %s',
+      parsed_args,
+  )
+
+  remote_runner.create_data_labeling_job(**parsed_args)
+
+
+if __name__ == '__main__':
+  main(sys.argv[1:])
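Reviewer note: main forwards the parsed dictionary to the remote runner with **parsed_args, so every argparse dest above must match a keyword parameter of remote_runner.create_data_labeling_job. A unit test along these lines could guard that contract (an illustrative sketch, not part of this change):

import inspect

from google_cloud_pipeline_components.container._implementation.model_evaluation.data_labeling_job import launcher
from google_cloud_pipeline_components.container._implementation.model_evaluation.data_labeling_job import remote_runner


def test_parsed_args_match_runner_signature():
  # Minimal placeholder values; every flag is required.
  parsed = launcher._parse_args([
      '--type', 'DataLabelingJob',
      '--project', 'p', '--location', 'l',
      '--gcp_resources', '/tmp/gcp_resources',
      '--job_display_name', 'd', '--dataset_name', 'ds',
      '--instruction_uri', 'i', '--inputs_schema_uri', 's',
      '--annotation_spec', 'a', '--labeler_count', '1',
      '--annotation_label', 'al',
  ])
  expected = set(
      inspect.signature(remote_runner.create_data_labeling_job).parameters
  )
  # If a flag is added to one module but not the other, this fails loudly.
  assert set(parsed) == expected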
diff --git a/components/google-cloud/google_cloud_pipeline_components/container/_implementation/model_evaluation/data_labeling_job/remote_runner.py b/components/google-cloud/google_cloud_pipeline_components/container/_implementation/model_evaluation/data_labeling_job/remote_runner.py
new file mode 100644
index 000000000000..90eda60f10b7
--- /dev/null
+++ b/components/google-cloud/google_cloud_pipeline_components/container/_implementation/model_evaluation/data_labeling_job/remote_runner.py
@@ -0,0 +1,108 @@
+# Copyright 2024 The Kubeflow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""GCP remote runner for data labeling jobs based on the AI Platform SDK."""
+
+import json
+
+from google.api_core import retry
+from google_cloud_pipeline_components.container.v1.gcp_launcher import job_remote_runner
+from google_cloud_pipeline_components.container.v1.gcp_launcher.utils import error_util
+
+_DATA_LABELING_JOB_RETRY_DEADLINE_SECONDS = 10.0 * 60.0
+
+
+def create_data_labeling_job_with_client(job_client, parent, job_spec):
+  """Creates a data labeling job under the given parent using the job client."""
+  create_data_labeling_job_fn = None
+  try:
+    create_data_labeling_job_fn = job_client.create_data_labeling_job(
+        parent=parent, data_labeling_job=job_spec
+    )
+  except (ConnectionError, RuntimeError) as err:
+    error_util.exit_with_internal_error(err.args[0])
+  return create_data_labeling_job_fn
+
+
+def get_data_labeling_job_with_client(job_client, job_name):
+  """Fetches a data labeling job, retrying transient errors up to the deadline."""
+  get_data_labeling_job_fn = None
+  try:
+    get_data_labeling_job_fn = job_client.get_data_labeling_job(
+        name=job_name,
+        retry=retry.Retry(deadline=_DATA_LABELING_JOB_RETRY_DEADLINE_SECONDS),
+    )
+  except (ConnectionError, RuntimeError) as err:
+    error_util.exit_with_internal_error(err.args[0])
+  return get_data_labeling_job_fn
+
+
+def create_data_labeling_job(
+    type,
+    project,
+    location,
+    gcp_resources,
+    job_display_name,
+    dataset_name,
+    instruction_uri,
+    inputs_schema_uri,
+    annotation_spec,
+    labeler_count,
+    annotation_label,
+):
+  """Creates a data labeling job.
+
+  This follows the typical launching logic:
+  1. Read whether the data labeling job already exists in gcp_resources.
+     - If it already exists, jump to step 3 and poll the job status. This
+       happens if the launcher container experienced an unexpected
+       termination, such as preemption.
+  2. Deserialize the params into the job spec and create the data labeling
+     job.
+  3. Poll the data labeling job status every
+     job_remote_runner._POLLING_INTERVAL_IN_SECONDS seconds.
+     - If the data labeling job succeeded, return success.
+     - If the data labeling job was cancelled or paused, that is an
+       unexpected scenario, so return failure.
+     - If the data labeling job is still running, continue polling.
+
+  Also retries on ConnectionError up to
+  job_remote_runner._CONNECTION_ERROR_RETRY_LIMIT times during the poll.
+  """
+  remote_runner = job_remote_runner.JobRemoteRunner(
+      type, project, location, gcp_resources
+  )
+
+  job_spec = {
+      'display_name': job_display_name,
+      'datasets': [dataset_name],
+      'instruction_uri': instruction_uri,
+      'inputs_schema_uri': inputs_schema_uri,
+      'inputs': annotation_spec,
+      'annotation_labels': {
+          'aiplatform.googleapis.com/annotation_set_name': annotation_label
+      },
+      'labeler_count': labeler_count,
+  }
+
+  try:
+    # Create the data labeling job if it does not exist yet.
+    job_name = remote_runner.check_if_job_exists()
+    if job_name is None:
+      job_name = remote_runner.create_job(
+          create_data_labeling_job_with_client,
+          json.dumps(job_spec),
+      )
+
+    # Poll the data labeling job status until 'JobState.JOB_STATE_SUCCEEDED'.
+    remote_runner.poll_job(get_data_labeling_job_with_client, job_name)
+  except (ConnectionError, RuntimeError) as err:
+    error_util.exit_with_internal_error(err.args[0])
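Reviewer note: after create_job runs, JobRemoteRunner records the created job in the gcp_resources output as a serialized GcpResources message, and check_if_job_exists reads the same file back on restart, which is what makes the launcher idempotent. Reading it manually looks roughly like the sketch below, assuming the standard GCPC gcp_resources proto location; the file path is a placeholder.

from google.protobuf import json_format
from google_cloud_pipeline_components.proto.gcp_resources_pb2 import GcpResources

# Path given via the --gcp_resources flag (placeholder).
with open('/tmp/outputs/gcp_resources') as f:
  resources = json_format.Parse(f.read(), GcpResources())

# resource_uri points at the created DataLabelingJob in the Vertex AI API.
print(resources.resources[0].resource_type, resources.resources[0].resource_uri)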