Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple workload deletion from single workload delete command #62

Merged
merged 7 commits into from
Feb 6, 2024
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,27 @@ checkpointing so the job restarts near where it was interrupted.
--workload xpk-test-workload --cluster xpk-test
```

This will only delete `xpk-test-workload` workload in `xpk-test` cluster.

* Workload Delete (delete all training jobs in the cluster):

```shell
python3 xpk.py workload delete \
--cluster xpk-test
```

This will delete all the workloads in `xpk-test` cluster. Deletion will only begin if you type `y` or `yes` at the prompt.

* Workload Delete supports filtering. Delete a portion of jobs that match user criteria.
* Filter by Job: `filter-by-job`

```shell
python3 xpk.py workload delete \
--cluster xpk-test --filter-by-job=$USER
```

This will delete all the workloads in `xpk-test` cluster whose names start with `$USER`. Deletion will only begin if you type `y` or `yes` at the prompt.

## Workload List
* Workload List (see training jobs):

Expand Down
126 changes: 96 additions & 30 deletions xpk.py
Original file line number Diff line number Diff line change
Expand Up @@ -1325,6 +1325,19 @@ def get_capacity_arguments(args) -> tuple[str, int]:

return capacity_args, return_code


def get_user_input(input_msg):
"""Function to get the user input for a prompt.

Args:
input_msg: message to be displayed by the prompt.
Returns:
True if user enter y or yes at the prompt, False otherwise.
"""
user_input = input(input_msg)
return user_input in ('y', 'yes')


def run_gke_node_pool_create_command(args, system) -> int:
"""Run the Create GKE Node Pool request.

Expand Down Expand Up @@ -1392,13 +1405,9 @@ def run_gke_node_pool_create_command(args, system) -> int:

will_delete = True
if node_pools_to_delete and not args.force:
user_input = input(
will_delete = get_user_input(
f'Planning to delete {len(node_pools_to_delete)} node pools including '
f'{node_pools_to_delete}. \nDo you wish to delete: y (yes) / n (no):\n'
)
user_input_approves_delete = user_input in ('y', 'yes')
if not user_input_approves_delete:
will_delete = False
f'{node_pools_to_delete}. \nDo you wish to delete: y (yes) / n (no):\n')

if not will_delete:
xpk_print('Skipping delete commands. Continuing to next step.')
Expand Down Expand Up @@ -2481,14 +2490,42 @@ def workload_delete(args) -> int:
if set_cluster_command_code != 0:
xpk_exit(set_cluster_command_code)

yml_string = workload_delete_yaml.format(args=args)
tmp = write_temporary_file(yml_string)
command = f'kubectl delete -f {str(tmp.file.name)}'
return_code = run_command_with_updates(command, 'Delete Workload', args)
will_delete = True
if not args.workload:
args.filter_by_status = "EVERYTHING"
columns = {
'Jobset Name': '.metadata.ownerReferences[0].name',
}
xpk_print("Get the name of the workloads in the cluster.")
return_code, return_value = get_workload_list(args, columns)

if return_code != 0:
xpk_print(f'List Job request returned ERROR {return_code}')
xpk_exit(return_code)
# Skip the header
workloads = return_value.strip().split("\n")[1:]
if workloads and not args.force:
will_delete = get_user_input(
f'Planning to delete {len(workloads)} workloads in the cluster {args.cluster} '
f'including {workloads}. \nDo you wish to delete: y (yes) / n (no):\n')
else:
workloads = [args.workload]
Obliviour marked this conversation as resolved.
Show resolved Hide resolved

if return_code != 0:
xpk_print(f'Delete Workload request returned ERROR {return_code}')
xpk_exit(return_code)
if not workloads:
xpk_print("There are no workloads to delete matching the filter in the cluster.")
elif not will_delete:
xpk_print("Skipping delete command.")
else:
for workload in workloads:
args.workload = workload
yml_string = workload_delete_yaml.format(args=args)
tmp = write_temporary_file(yml_string)
command = f'kubectl delete -f {str(tmp.file.name)}'
return_code = run_command_with_updates(command, 'Delete Workload', args)

if return_code != 0:
xpk_print(f'Delete Workload request returned ERROR {return_code}')
xpk_exit(return_code)
xpk_exit(0)


Expand Down Expand Up @@ -2560,6 +2597,28 @@ def determine_workload_list_filter_by_job(args) -> str:
return workload_list_awk_command(f'{job_name_arg} ~ \"{args.filter_by_job}\"')


def get_workload_list(args, columns) -> None:
"""Function to get the list of the workloads in the cluster.
Obliviour marked this conversation as resolved.
Show resolved Hide resolved

Args:
args: user provided arguments for running the command.

Returns:
return_code: 0 if successful and 1 otherwise.
return_value: workloads in the cluster matching the criteria.
"""
s = ','.join([key + ':' + value for key, value in columns.items()])

workload_list_filter_status_cmd = determine_workload_list_filter_by_status(args)
workload_list_filter_job_cmd = determine_workload_list_filter_by_job(args)
command = (f'kubectl get workloads -o=custom-columns="{s}" '
f'{workload_list_filter_status_cmd} {workload_list_filter_job_cmd}'
)

return_code, return_value = run_command_for_value(command, 'List Jobs', args)
return return_code, return_value


def workload_list(args) -> None:
"""Function around workload list.

Expand Down Expand Up @@ -2588,20 +2647,12 @@ def workload_list(args) -> None:
'Status Message': '.status.conditions[-1].message',
'Status Time': '.status.conditions[-1].lastTransitionTime',
}

s = ','.join([key + ':' + value for key, value in columns.items()])

workload_list_filter_status_cmd = determine_workload_list_filter_by_status(args)
workload_list_filter_job_cmd = determine_workload_list_filter_by_job(args)
command = (f'kubectl get workloads -o=custom-columns="{s}" '
f'{workload_list_filter_status_cmd} {workload_list_filter_job_cmd}'
)

return_code = run_command_with_updates(command, 'List Jobs', args)
return_code, return_value = get_workload_list(args, columns)

if return_code != 0:
xpk_print(f'List Job request returned ERROR {return_code}')
xpk_exit(return_code)
xpk_print(return_value)
xpk_exit(0)


Expand Down Expand Up @@ -3186,13 +3237,6 @@ def directory_path_type(value):
add_shared_arguments(workload_delete_parser_optional_arguments)

### Required arguments
workload_delete_parser_required_arguments.add_argument(
'--workload',
type=workload_name_type,
default=None,
help='The name of the workload to delete.',
required=True,
)
workload_delete_parser_required_arguments.add_argument(
'--cluster',
type=str,
Expand All @@ -3201,6 +3245,28 @@ def directory_path_type(value):
required=True,
)

### Optional arguments
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a --force argument that skips the user prompt? This is good for automated tests for example. Or if a user really believes in themselves. You can build from https://github.com/google/xpk/blob/main/xpk.py#L2823-L2830

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, added!

workload_delete_parser_optional_arguments.add_argument(
'--workload',
type=workload_name_type,
default=None,
help='The name of the workload to delete. If the workload is not specified, '
'all workloads will be deleted from the cluster.',
)
workload_delete_parser_optional_arguments.add_argument(
'--filter-by-job',
type=str,
help='Filters the arguments based on job name. Provide a regex expression'
'to parse jobs that match the pattern or provide a job name to delete a single job.',
)
workload_delete_parser_optional_arguments.add_argument(
'--force',
action='store_true',
help=(
'Forces workload deletion command to run without additional approval.'
),
)

workload_delete_parser.set_defaults(func=workload_delete)

# "workload list" command parser.
Expand Down
Loading