Skip to content

Commit

Permalink
Support multiple workload deletion from single workload delete command (
Browse files Browse the repository at this point in the history
#62)

* Support multiple workload deletion from single workload delete command

* Add docstring for get_workload_list()

* Refactor workload deletion flow

* Add docstring to get_user_input()
  • Loading branch information
SurbhiJainUSC authored Feb 6, 2024
1 parent 650e47e commit 4c21926
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 30 deletions.
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,27 @@ checkpointing so the job restarts near where it was interrupted.
--workload xpk-test-workload --cluster xpk-test
```

This will delete only the `xpk-test-workload` workload in the `xpk-test` cluster.

* Workload Delete (delete all training jobs in the cluster):

```shell
python3 xpk.py workload delete \
--cluster xpk-test
```

This will delete all the workloads in the `xpk-test` cluster. Deletion will begin only if you type `y` or `yes` at the prompt.

* Workload Delete supports filtering, so you can delete only the workloads that match your criteria.
* Filter by Job: `filter-by-job`

```shell
python3 xpk.py workload delete \
--cluster xpk-test --filter-by-job=$USER
```

This will delete all the workloads in the `xpk-test` cluster whose names match `$USER` (the value is treated as a regular expression, so it matches anywhere in the name). Deletion will begin only if you type `y` or `yes` at the prompt.

## Workload List
* Workload List (see training jobs):

Expand Down
126 changes: 96 additions & 30 deletions xpk.py
Original file line number Diff line number Diff line change
Expand Up @@ -1325,6 +1325,19 @@ def get_capacity_arguments(args) -> tuple[str, int]:

return capacity_args, return_code


def get_user_input(input_msg):
  """Function to get the user input for a yes/no prompt.

  The answer is compared case-insensitively and with surrounding whitespace
  removed, so 'Y', 'Yes' and ' yes ' are all accepted as approval.

  Args:
    input_msg: message to be displayed by the prompt.

  Returns:
    True if the user enters y or yes at the prompt, False otherwise.
  """
  # Normalise before comparing so a stray space or capital letter does not
  # silently turn an intended approval into a refusal.
  user_input = input(input_msg).strip().lower()
  return user_input in ('y', 'yes')


def run_gke_node_pool_create_command(args, system) -> int:
"""Run the Create GKE Node Pool request.
Expand Down Expand Up @@ -1392,13 +1405,9 @@ def run_gke_node_pool_create_command(args, system) -> int:

will_delete = True
if node_pools_to_delete and not args.force:
user_input = input(
will_delete = get_user_input(
f'Planning to delete {len(node_pools_to_delete)} node pools including '
f'{node_pools_to_delete}. \nDo you wish to delete: y (yes) / n (no):\n'
)
user_input_approves_delete = user_input in ('y', 'yes')
if not user_input_approves_delete:
will_delete = False
f'{node_pools_to_delete}. \nDo you wish to delete: y (yes) / n (no):\n')

if not will_delete:
xpk_print('Skipping delete commands. Continuing to next step.')
Expand Down Expand Up @@ -2481,14 +2490,42 @@ def workload_delete(args) -> int:
if set_cluster_command_code != 0:
xpk_exit(set_cluster_command_code)

yml_string = workload_delete_yaml.format(args=args)
tmp = write_temporary_file(yml_string)
command = f'kubectl delete -f {str(tmp.file.name)}'
return_code = run_command_with_updates(command, 'Delete Workload', args)
will_delete = True
if not args.workload:
args.filter_by_status = "EVERYTHING"
columns = {
'Jobset Name': '.metadata.ownerReferences[0].name',
}
xpk_print("Get the name of the workloads in the cluster.")
return_code, return_value = get_workload_list(args, columns)

if return_code != 0:
xpk_print(f'List Job request returned ERROR {return_code}')
xpk_exit(return_code)
# Skip the header
workloads = return_value.strip().split("\n")[1:]
if workloads and not args.force:
will_delete = get_user_input(
f'Planning to delete {len(workloads)} workloads in the cluster {args.cluster} '
f'including {workloads}. \nDo you wish to delete: y (yes) / n (no):\n')
else:
workloads = [args.workload]

if return_code != 0:
xpk_print(f'Delete Workload request returned ERROR {return_code}')
xpk_exit(return_code)
if not workloads:
xpk_print("There are no workloads to delete matching the filter in the cluster.")
elif not will_delete:
xpk_print("Skipping delete command.")
else:
for workload in workloads:
args.workload = workload
yml_string = workload_delete_yaml.format(args=args)
tmp = write_temporary_file(yml_string)
command = f'kubectl delete -f {str(tmp.file.name)}'
return_code = run_command_with_updates(command, 'Delete Workload', args)

if return_code != 0:
xpk_print(f'Delete Workload request returned ERROR {return_code}')
xpk_exit(return_code)
xpk_exit(0)


Expand Down Expand Up @@ -2560,6 +2597,28 @@ def determine_workload_list_filter_by_job(args) -> str:
return workload_list_awk_command(f'{job_name_arg} ~ \"{args.filter_by_job}\"')


def get_workload_list(args, columns) -> tuple[int, str]:
  """Function to get the list of the workloads in the cluster.

  Args:
    args: user provided arguments for running the command.
    columns: mapping of column header to the kubectl custom-columns
      expression that fills that column.

  Returns:
    return_code: return code of the list command; callers treat any
      non-zero value as an error.
    return_value: output of the list command, i.e. the workloads in the
      cluster matching the criteria.
  """
  custom_columns = ','.join(
      f'{header}:{expression}' for header, expression in columns.items()
  )

  workload_list_filter_status_cmd = determine_workload_list_filter_by_status(args)
  workload_list_filter_job_cmd = determine_workload_list_filter_by_job(args)
  command = (
      f'kubectl get workloads -o=custom-columns="{custom_columns}" '
      f'{workload_list_filter_status_cmd} {workload_list_filter_job_cmd}'
  )

  return_code, return_value = run_command_for_value(command, 'List Jobs', args)
  return return_code, return_value


def workload_list(args) -> None:
"""Function around workload list.
Expand Down Expand Up @@ -2588,20 +2647,12 @@ def workload_list(args) -> None:
'Status Message': '.status.conditions[-1].message',
'Status Time': '.status.conditions[-1].lastTransitionTime',
}

s = ','.join([key + ':' + value for key, value in columns.items()])

workload_list_filter_status_cmd = determine_workload_list_filter_by_status(args)
workload_list_filter_job_cmd = determine_workload_list_filter_by_job(args)
command = (f'kubectl get workloads -o=custom-columns="{s}" '
f'{workload_list_filter_status_cmd} {workload_list_filter_job_cmd}'
)

return_code = run_command_with_updates(command, 'List Jobs', args)
return_code, return_value = get_workload_list(args, columns)

if return_code != 0:
xpk_print(f'List Job request returned ERROR {return_code}')
xpk_exit(return_code)
xpk_print(return_value)
xpk_exit(0)


Expand Down Expand Up @@ -3186,13 +3237,6 @@ def directory_path_type(value):
add_shared_arguments(workload_delete_parser_optional_arguments)

### Required arguments
workload_delete_parser_required_arguments.add_argument(
'--workload',
type=workload_name_type,
default=None,
help='The name of the workload to delete.',
required=True,
)
workload_delete_parser_required_arguments.add_argument(
'--cluster',
type=str,
Expand All @@ -3201,6 +3245,28 @@ def directory_path_type(value):
required=True,
)

### Optional arguments
# --workload is optional: when it is omitted, the delete command targets every
# workload in the cluster (optionally narrowed by --filter-by-job).
workload_delete_parser_optional_arguments.add_argument(
    '--workload',
    type=workload_name_type,
    default=None,
    help=(
        'The name of the workload to delete. If the workload is not specified, '
        'all workloads will be deleted from the cluster.'
    ),
)
workload_delete_parser_optional_arguments.add_argument(
    '--filter-by-job',
    type=str,
    # The trailing space after "expression" matters: the two literals are
    # concatenated and would otherwise render as "expressionto".
    help=(
        'Filters the arguments based on job name. Provide a regex expression '
        'to parse jobs that match the pattern or provide a job name to delete a single job.'
    ),
)
workload_delete_parser_optional_arguments.add_argument(
    '--force',
    action='store_true',
    help=(
        'Forces workload deletion command to run without additional approval.'
    ),
)

workload_delete_parser.set_defaults(func=workload_delete)

# "workload list" command parser.
Expand Down

0 comments on commit 4c21926

Please sign in to comment.