Skip to content

Commit

Permalink
feat(sdk)!: Pin kfp-pipeline-spec==0.4.0, kfp-server-api>=2.1.0,<2.4.0 (
Browse files Browse the repository at this point in the history
#11192)

Signed-off-by: Chen Sun <chensun@users.noreply.github.com>
  • Loading branch information
chensun authored Sep 9, 2024
1 parent d3aa837 commit dfd4cc1
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 54 deletions.
74 changes: 40 additions & 34 deletions sdk/python/kfp/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ def get_kfp_healthz(
)

try:
return self._healthz_api.get_healthz()
return self._healthz_api.healthz_service_get_healthz()
# ApiException, including network errors, is the only type that may
# recover after retry.
except kfp_server_api.ApiException:
Expand Down Expand Up @@ -474,7 +474,8 @@ def create_experiment(
description=description,
namespace=namespace,
)
experiment = self._experiment_api.create_experiment(body=experiment)
experiment = self._experiment_api.experiment_service_create_experiment(
body=experiment)

link = f'{self._get_url_prefix()}/#/experiments/details/{experiment.experiment_id}'
if auth.is_ipython():
Expand Down Expand Up @@ -502,7 +503,8 @@ def get_pipeline_id(self, name: str) -> Optional[str]:
'stringValue': name,
}]
})
result = self._pipelines_api.list_pipelines(filter=pipeline_filter)
result = self._pipelines_api.pipeline_service_list_pipelines(
filter=pipeline_filter)
if result.pipelines is None:
return None
if len(result.pipelines) == 1:
Expand Down Expand Up @@ -545,7 +547,7 @@ def list_experiments(
``V2beta1ListExperimentsResponse`` object.
"""
namespace = namespace or self.get_user_namespace()
return self._experiment_api.list_experiments(
return self._experiment_api.experiment_service_list_experiments(
page_token=page_token,
page_size=page_size,
sort_by=sort_by,
Expand Down Expand Up @@ -577,7 +579,7 @@ def get_experiment(
raise ValueError(
'Either experiment_id or experiment_name is required.')
if experiment_id is not None:
return self._experiment_api.get_experiment(
return self._experiment_api.experiment_service_get_experiment(
experiment_id=experiment_id)
experiment_filter = json.dumps({
'predicates': [{
Expand All @@ -587,10 +589,10 @@ def get_experiment(
}]
})
if namespace is not None:
result = self._experiment_api.list_experiments(
result = self._experiment_api.experiment_service_list_experiments(
filter=experiment_filter, namespace=namespace)
else:
result = self._experiment_api.list_experiments(
result = self._experiment_api.experiment_service_list_experiments(
filter=experiment_filter)
if not result.experiments:
raise ValueError(
Expand All @@ -609,7 +611,7 @@ def archive_experiment(self, experiment_id: str) -> dict:
Returns:
Empty dictionary.
"""
return self._experiment_api.archive_experiment(
return self._experiment_api.experiment_service_archive_experiment(
experiment_id=experiment_id)

def unarchive_experiment(self, experiment_id: str) -> dict:
Expand All @@ -621,7 +623,7 @@ def unarchive_experiment(self, experiment_id: str) -> dict:
Returns:
Empty dictionary.
"""
return self._experiment_api.unarchive_experiment(
return self._experiment_api.experiment_service_unarchive_experiment(
experiment_id=experiment_id)

def delete_experiment(self, experiment_id: str) -> dict:
Expand All @@ -633,7 +635,7 @@ def delete_experiment(self, experiment_id: str) -> dict:
Returns:
Empty dictionary.
"""
return self._experiment_api.delete_experiment(
return self._experiment_api.experiment_service_delete_experiment(
experiment_id=experiment_id)

def list_pipelines(
Expand Down Expand Up @@ -666,7 +668,7 @@ def list_pipelines(
Returns:
``V2beta1ListPipelinesResponse`` object.
"""
return self._pipelines_api.list_pipelines(
return self._pipelines_api.pipeline_service_list_pipelines(
namespace=namespace,
page_token=page_token,
page_size=page_size,
Expand Down Expand Up @@ -730,7 +732,7 @@ def run_pipeline(
runtime_config=job_config.runtime_config,
service_account=service_account)

response = self._run_api.create_run(body=run_body)
response = self._run_api.run_service_create_run(body=run_body)

link = f'{self._get_url_prefix()}/#/runs/details/{response.run_id}'
if auth.is_ipython():
Expand All @@ -751,7 +753,7 @@ def archive_run(self, run_id: str) -> dict:
Returns:
Empty dictionary.
"""
return self._run_api.archive_run(run_id=run_id)
return self._run_api.run_service_archive_run(run_id=run_id)

def unarchive_run(self, run_id: str) -> dict:
"""Restores an archived run.
Expand All @@ -762,7 +764,7 @@ def unarchive_run(self, run_id: str) -> dict:
Returns:
Empty dictionary.
"""
return self._run_api.unarchive_run(run_id=run_id)
return self._run_api.run_service_unarchive_run(run_id=run_id)

def delete_run(self, run_id: str) -> dict:
"""Deletes a run.
Expand All @@ -773,7 +775,7 @@ def delete_run(self, run_id: str) -> dict:
Returns:
Empty dictionary.
"""
return self._run_api.delete_run(run_id=run_id)
return self._run_api.run_service_delete_run(run_id=run_id)

def terminate_run(self, run_id: str) -> dict:
"""Terminates a run.
Expand All @@ -784,7 +786,7 @@ def terminate_run(self, run_id: str) -> dict:
Returns:
Empty dictionary.
"""
return self._run_api.terminate_run(run_id=run_id)
return self._run_api.run_service_terminate_run(run_id=run_id)

def create_recurring_run(
self,
Expand Down Expand Up @@ -896,7 +898,8 @@ def create_recurring_run(
trigger=trigger,
max_concurrency=max_concurrency,
service_account=service_account)
return self._recurring_run_api.create_recurring_run(body=job_body)
return self._recurring_run_api.recurring_run_service_create_recurring_run(
body=job_body)

def _create_job_config(
self,
Expand Down Expand Up @@ -1131,7 +1134,7 @@ def delete_recurring_run(self, recurring_run_id: str) -> dict:
Returns:
Empty dictionary.
"""
return self._recurring_run_api.delete_recurring_run(
return self._recurring_run_api.recurring_run_service_delete_recurring_run(
recurring_run_id=recurring_run_id)

def disable_job(self, job_id: str) -> dict:
Expand Down Expand Up @@ -1159,7 +1162,7 @@ def disable_recurring_run(self, recurring_run_id: str) -> dict:
Returns:
Empty dictionary.
"""
return self._recurring_run_api.disable_recurring_run(
return self._recurring_run_api.recurring_run_service_disable_recurring_run(
recurring_run_id=recurring_run_id)

def enable_job(self, job_id: str) -> dict:
Expand Down Expand Up @@ -1187,7 +1190,7 @@ def enable_recurring_run(self, recurring_run_id: str) -> dict:
Returns:
Empty dictionary.
"""
return self._recurring_run_api.enable_recurring_run(
return self._recurring_run_api.recurring_run_service_enable_recurring_run(
recurring_run_id=recurring_run_id)

def list_runs(
Expand Down Expand Up @@ -1225,23 +1228,23 @@ def list_runs(
"""
namespace = namespace or self.get_user_namespace()
if experiment_id is not None:
return self._run_api.list_runs(
return self._run_api.run_service_list_runs(
page_token=page_token,
page_size=page_size,
sort_by=sort_by,
experiment_id=experiment_id,
filter=filter)

elif namespace is not None:
return self._run_api.list_runs(
return self._run_api.run_service_list_runs(
page_token=page_token,
page_size=page_size,
sort_by=sort_by,
namespace=namespace,
filter=filter)

else:
return self._run_api.list_runs(
return self._run_api.run_service_list_runs(
page_token=page_token,
page_size=page_size,
sort_by=sort_by,
Expand Down Expand Up @@ -1281,23 +1284,23 @@ def list_recurring_runs(
``V2beta1ListRecurringRunsResponse`` object.
"""
if experiment_id is not None:
return self._recurring_run_api.list_recurring_runs(
return self._recurring_run_api.recurring_run_service_list_recurring_runs(
page_token=page_token,
page_size=page_size,
sort_by=sort_by,
experiment_id=experiment_id,
filter=filter)

elif namespace is not None:
return self._recurring_run_api.list_recurring_runs(
return self._recurring_run_api.recurring_run_service_list_recurring_runs(
page_token=page_token,
page_size=page_size,
sort_by=sort_by,
namespace=namespace,
filter=filter)

else:
return self._recurring_run_api.list_recurring_runs(
return self._recurring_run_api.recurring_run_service_list_recurring_runs(
page_token=page_token,
page_size=page_size,
sort_by=sort_by,
Expand All @@ -1324,7 +1327,7 @@ def get_recurring_run(
stacklevel=2)
recurring_run_id = recurring_run_id or job_id

return self._recurring_run_api.get_recurring_run(
return self._recurring_run_api.recurring_run_service_get_recurring_run(
recurring_run_id=recurring_run_id)

def get_run(self, run_id: str) -> kfp_server_api.V2beta1Run:
Expand All @@ -1336,7 +1339,7 @@ def get_run(self, run_id: str) -> kfp_server_api.V2beta1Run:
Returns:
``V2beta1Run`` object.
"""
return self._run_api.get_run(run_id=run_id)
return self._run_api.run_service_get_run(run_id=run_id)

def wait_for_run_completion(
self,
Expand All @@ -1362,7 +1365,8 @@ def wait_for_run_completion(
finish_states = ['succeeded', 'failed', 'skipped', 'error']
while True:
try:
get_run_response = self._run_api.get_run(run_id=run_id)
get_run_response = self._run_api.run_service_get_run(
run_id=run_id)
is_valid_token = True
except kfp_server_api.ApiException as api_ex:
# if the token is valid but receiving 401 Unauthorized error
Expand Down Expand Up @@ -1480,7 +1484,8 @@ def get_pipeline(self, pipeline_id: str) -> kfp_server_api.V2beta1Pipeline:
Returns:
``V2beta1Pipeline`` object.
"""
return self._pipelines_api.get_pipeline(pipeline_id=pipeline_id)
return self._pipelines_api.pipeline_service_get_pipeline(
pipeline_id=pipeline_id)

def delete_pipeline(self, pipeline_id: str) -> dict:
"""Deletes a pipeline.
Expand All @@ -1491,7 +1496,8 @@ def delete_pipeline(self, pipeline_id: str) -> dict:
Returns:
Empty dictionary.
"""
return self._pipelines_api.delete_pipeline(pipeline_id=pipeline_id)
return self._pipelines_api.pipeline_service_delete_pipeline(
pipeline_id=pipeline_id)

def list_pipeline_versions(
self,
Expand Down Expand Up @@ -1525,7 +1531,7 @@ def list_pipeline_versions(
``V2beta1ListPipelineVersionsResponse`` object.
"""

return self._pipelines_api.list_pipeline_versions(
return self._pipelines_api.pipeline_service_list_pipeline_versions(
page_token=page_token,
page_size=page_size,
sort_by=sort_by,
Expand All @@ -1546,7 +1552,7 @@ def get_pipeline_version(
Returns:
``V2beta1PipelineVersion`` object.
"""
return self._pipelines_api.get_pipeline_version(
return self._pipelines_api.pipeline_service_get_pipeline_version(
pipeline_id=pipeline_id,
pipeline_version_id=pipeline_version_id,
)
Expand All @@ -1565,7 +1571,7 @@ def delete_pipeline_version(
Returns:
Empty dictionary.
"""
return self._pipelines_api.delete_pipeline_version(
return self._pipelines_api.pipeline_service_delete_pipeline_version(
pipeline_id=pipeline_id,
pipeline_version_id=pipeline_version_id,
)
Expand Down
Loading

0 comments on commit dfd4cc1

Please sign in to comment.