diff --git a/.github/workflows/test.cli.yml b/.github/workflows/test.cli.yml index 8f543234..5b371595 100644 --- a/.github/workflows/test.cli.yml +++ b/.github/workflows/test.cli.yml @@ -9,7 +9,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: ["3.7", "3.8", "3.9", "3.10"] + python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"] steps: - uses: actions/checkout@v3 diff --git a/integration/keeper_secrets_manager_cli/README.md b/integration/keeper_secrets_manager_cli/README.md index f6f5d98a..fa9e3012 100644 --- a/integration/keeper_secrets_manager_cli/README.md +++ b/integration/keeper_secrets_manager_cli/README.md @@ -6,6 +6,15 @@ For more information see our official documentation page https://docs.keeper.io/ # Change History +## 1.1.3 + +- KSM-496: Added upload file option +- KSM-495: Added query option to ksm secret list command +- KSM-494: Added folder support to secret list command +- KSM-493: Added CLI options to update title and notes +- KSM-492: Added clone option +- KSM-485: Added sub-folder support to ksm secret add command + ## 1.1.1 * KSM-429 - Add `--profile-name` to `ksm profile import` command diff --git a/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/__main__.py b/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/__main__.py index 4de0dcb1..56cb8e50 100644 --- a/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/__main__.py +++ b/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/__main__.py @@ -67,6 +67,7 @@ class AliasedGroup(HelpColorsGroup): "add", "editor", "field", + "clone", "record", "file", "cache", @@ -249,7 +250,8 @@ def handle_parse_result(self, ctx, opts, args): for mutex_opt in self.not_required_if: if mutex_opt and mutex_opt[0] in opts and (len(mutex_opt) == 1 or opts.get(mutex_opt[0], str(mutex_opt[1])+'_') == mutex_opt[1]): if current_opt: - raise click.UsageError("Illegal usage: '" + str(self.name) + "' is mutually exclusive with " + str(mutex_opt) + ".") + opt = str(mutex_opt) if len(mutex_opt) > 1 else f"'{str(mutex_opt[0])}'" + raise click.UsageError("Illegal usage: '" + str(self.name) + "' is mutually exclusive with " + opt + ".") else: self.prompt = None for mutex_opt in self.required_if: @@ -530,10 +532,14 @@ def secret_command(ctx): cls=HelpColorsCommand, help_options_color='blue' ) -@click.option('--uid', "-u", type=str, multiple=True) +@click.option('--uid', '-u', type=str, multiple=True, help='List specific records by Record UID', cls=Mutex, not_required_if=[('folder',)]) +@click.option('--folder', '-f', type=str, help='List only records in specified folder UID') +@click.option('--recursive', '-r', is_flag=True, help='List recursively all records including subfolders of the folder UID') +@click.option('--query', '-q', type=str, help='List records matching the JSONPath query') +@click.option('--show-value', '-v', is_flag=True, help='Print matching value instead of record title') @click.option('--json', is_flag=True, help='Return secret as JSON') @click.pass_context -def secret_list_command(ctx, uid, json): +def secret_list_command(ctx, uid, folder, recursive, query, show_value, json): """List all secrets""" output = "text" @@ -542,6 +548,10 @@ def secret_list_command(ctx, uid, json): ctx.obj["secret"].secret_list( uids=uid, + folder=folder, + recursive=recursive, + query=query, + show_value=show_value, output_format=output, use_color=ctx.obj["cli"].use_color ) @@ -624,15 +634,19 @@ def secret_notation_command(ctx, text): @click.option('--field-json', type=str, multiple=True, help="Update value in field section of vault using JSON") @click.option('--custom-field-json', type=str, multiple=True, help="Update value in custom field section of vault using JSON") +@click.option('--title', '-t', type=str, help="Update record title.") +@click.option('--notes', '-n', type=str, help="Update record notes.") @click.pass_context -def secret_update_command(ctx, uid, field, custom_field, field_json, custom_field_json): +def secret_update_command(ctx, uid, field, custom_field, field_json, custom_field_json, title, notes): """Update an existing record""" ctx.obj["secret"].update( uid=uid, fields=field, custom_fields=custom_field, fields_json=field_json, - custom_fields_json=custom_field_json + custom_fields_json=custom_field_json, + title=title, + notes=notes ) @@ -827,7 +841,7 @@ def secret_add_command(): help_options_color='blue' ) @click.pass_context -@click.option('--shared-folder-uid', '--sf', required=True, type=str, help="Place record in folder with UID.") +@click.option('--storage-folder-uid', '--sf', required=True, type=str, help="Place record in folder with UID.") @click.option('--record-type', '--rt', required=True, type=str, help="Record type") @click.option('--password-generate', '-p', is_flag=True, help='Generate passwords for empty password fields.') @click.option('--title', '-t', type=str, help="Record title") @@ -836,13 +850,13 @@ def secret_add_command(): help='File format to display in editor.') @click.option('--editor', '-e', type=str, help='Application to use to edit record data.') @click.option('--version', type=click.Choice(['v3'], case_sensitive=False), default='v3', help='Record version.') -def secret_add_editor_command(ctx, shared_folder_uid, record_type, password_generate, title, notes, +def secret_add_editor_command(ctx, storage_folder_uid, record_type, password_generate, title, notes, output_format, editor, version): """Add a secret record via a text editor""" ctx.obj["secret"].add_record_interactive( version=version, - folder_uid=shared_folder_uid, + folder_uid=storage_folder_uid, record_type=record_type, output_format=output_format, password_generate_flag=password_generate, @@ -859,14 +873,14 @@ def secret_add_editor_command(ctx, shared_folder_uid, record_type, password_gene help_options_color='blue' ) @click.pass_context -@click.option('--shared-folder-uid', '--sf', required=True, type=str, help="Place record in folder with UID.") +@click.option('--storage-folder-uid', '--sf', required=True, type=str, help="Place record in folder with UID.") @click.option('--file', '-f', required=True, type=str, help='Add records from record script file.') @click.option('--password-generate', '-p', is_flag=True, help='Generate passwords for empty password fields.') -def secret_add_file_command(ctx, shared_folder_uid, file, password_generate): +def secret_add_file_command(ctx, storage_folder_uid, file, password_generate): """Add a secret record(s) from a file""" ctx.obj["secret"].add_record_from_file( - folder_uid=shared_folder_uid, + folder_uid=storage_folder_uid, file=file, password_generate_flag=password_generate, ) @@ -879,20 +893,20 @@ def secret_add_file_command(ctx, shared_folder_uid, file, password_generate): help_options_color='blue' ) @click.pass_context -@click.option('--shared-folder-uid', '--sf', required=True, type=str, help="Place record in folder with UID.") +@click.option('--storage-folder-uid', '--sf', required=True, type=str, help="Place record in folder with UID.") @click.option('--record-type', '--rt', required=True, type=str, help="Record type") @click.option('--title', '-t', required=True, type=str, help="Record title") @click.option('--password-generate', '-p', is_flag=True, help='Generate passwords for empty password fields.') @click.option('--notes', '-n', type=str, help="Record simple note") @click.option('--version', type=click.Choice(['v3'], case_sensitive=False), default='v3', help='Record version.') @click.argument('field_args', type=str, nargs=-1) -def secret_add_field_command(ctx, shared_folder_uid, record_type, title, password_generate, notes, version, +def secret_add_field_command(ctx, storage_folder_uid, record_type, title, password_generate, notes, version, field_args): """Add a secret record from a command line field arguments""" ctx.obj["secret"].add_record_from_field_args( version=version, - folder_uid=shared_folder_uid, + folder_uid=storage_folder_uid, password_generate_flag=password_generate, record_type=record_type, title=title, @@ -902,6 +916,32 @@ def secret_add_field_command(ctx, shared_folder_uid, record_type, title, passwor print("", file=sys.stderr) +def validate_non_empty(ctx, param, value): + """Validate that parameter's value is not an empty string""" + if isinstance(value, str) and value != "": + return value + raise click.BadParameter("Empty strings are not allowed") + + +@click.command( + name='clone', + cls=HelpColorsCommand, + help_options_color='blue' +) +@click.pass_context +@click.option('--uid', '-u', required=True, type=str, callback=validate_non_empty, help="Record UID to clone") +@click.option('--title', '-t', type=str, help="New record title") +def secret_add_clone_command(ctx, uid, title): + """Add new record by duplicating existing record""" + + ctx.obj["secret"].add_record_from_clone( + uid=uid, + title=title + ) + print("", file=sys.stderr) + + +secret_add_command.add_command(secret_add_clone_command) secret_add_command.add_command(secret_add_field_command) secret_add_command.add_command(secret_add_file_command) secret_add_command.add_command(secret_add_editor_command) @@ -1186,7 +1226,7 @@ def quit_command(): @click.option('--credentials', '-c', type=str, metavar="UID", help="Keeper record with credentials to access destination key/value store.", cls=Mutex, # not_required_if=[('type','json')], - required_if=[('type','azure'), ('type','aws'), ('type','gcp')] + required_if=[('type', 'azure'), ('type', 'aws'), ('type', 'gcp')] ) @click.option('--type', '-t', type=click.Choice(['aws', 'azure', 'gcp', 'json']), default='json', help="Type of the target key/value storage (aws, azure, gcp, json).", show_default=True) @click.option('--dry-run', '-n', is_flag=True, help='Perform a trial run with no changes made.') diff --git a/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/secret.py b/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/secret.py index b30cfdbb..e6525ea2 100644 --- a/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/secret.py +++ b/integration/keeper_secrets_manager_cli/keeper_secrets_manager_cli/secret.py @@ -14,18 +14,20 @@ import yaml import os import re -from jsonpath_rw_ext import parse import sys from colorama import Fore, Style from pathlib import Path +from jsonpath_rw_ext import parse from keeper_secrets_manager_cli.exception import KsmCliException from keeper_secrets_manager_cli.common import launch_editor -from keeper_secrets_manager_core.core import SecretsManager, KeeperFileUpload +from keeper_secrets_manager_core.core import SecretsManager, CreateOptions, KeeperFolder, KeeperFileUpload from keeper_secrets_manager_core.utils import get_totp_code, generate_password as sdk_generate_password from keeper_secrets_manager_helper.record import Record +from keeper_secrets_manager_helper.v3.record import Record as RecordV3 from keeper_secrets_manager_helper.field_type import FieldType from keeper_secrets_manager_helper.exception import FileSyntaxException from .table import Table, ColumnAlign +from typing import List, Tuple import uuid import tempfile @@ -151,6 +153,7 @@ def _record_to_dict(self, record, load_references=False, unmask=False, use_color "uid": record.uid, "title": record.title, "type": record.type, + "notes": record.dict.get("notes", ""), "fields": standard_fields, "custom_fields": custom_fields, "files": [{ @@ -360,9 +363,23 @@ def _query_jsonpath(self, jsonpath_query, records, force_array, raw): except Exception as err: raise KsmCliException("JSONPath failed: {}".format(err)) - def query(self, uids=None, titles=None, field=None, output_format='json', jsonpath_query=None, - force_array=False, load_references=False, unmask=False, use_color=None, inflate=True, - raw=False): + def _query_jsonpath_list(self, jsonpath_query, records): + record_list = Secret._adjust_records(records, True) + try: + results = [] + jpe = parse(jsonpath_query) + for rec in record_list: + val = jpe.find(rec) + if val and val[0].value: + rec["query_result"] = val[0].value[0] if isinstance(val[0].value, list) else val[0].value + results.append(rec) + return results + except Exception as err: + raise KsmCliException(f"JSONPath failed: {err}") from err + + def query(self, uids=None, folder=None, recursive=False, titles=None, field=None, + output_format='json', jsonpath_query=None, force_array=False, + load_references=False, unmask=False, use_color=None, inflate=True, raw=False): if use_color is None: use_color = self.cli.use_color @@ -391,16 +408,29 @@ def query(self, uids=None, titles=None, field=None, output_format='json', jsonpa if len(secrets) == 0 and fetch_uids is not None: raise KsmCliException("Cannot find requested record(s).") + folders = self.cli.client.get_folders() if folder and recursive else [] + for record in secrets: add_record = False - # If we are searching by title, the fetch_uids was None, we have all the records. We need to filter - # them by the title or uids. - if len(titles) > 0: - if record.title in titles or record.uid in uids: + if folder: + if record.inner_folder_uid == folder or (not record.inner_folder_uid and record.folder_uid == folder): add_record = True + elif recursive and (record.inner_folder_uid or record.folder_uid): + fldr = record.inner_folder_uid or record.folder_uid + fldr = str(fldr) if fldr else '' + while fldr and fldr != folder: + fldr = next((x.parent_uid for x in folders if x.folder_uid == fldr), '') + if fldr == folder: + add_record = True else: - add_record = True + # If we are searching by title, the fetch_uids was None, we have all the records. We need to filter + # them by the title or uids. + if len(titles) > 0: + if record.title in titles or record.uid in uids: + add_record = True + else: + add_record = True if add_record is True: records.append(self._record_to_dict(record, @@ -427,27 +457,50 @@ def query(self, uids=None, titles=None, field=None, output_format='json', jsonpa use_color=use_color) @staticmethod - def _format_list(record_dict, use_color=True): + def _format_list(record_dict, use_color=True, columns=None): table = Table(use_color=use_color) - table.add_column("UID", data_color=Fore.GREEN) - table.add_column("Record Type") - table.add_column("Title", data_color=Fore.YELLOW) - for record in record_dict: - table.add_row([record["uid"], record["type"], record["title"]]) + if columns: + for col in columns: + table.add_column(col[1], data_color=col[2]) + for record in record_dict: + table.add_row([record.get(x[0], "") for x in columns]) + else: + table.add_column("UID", data_color=Fore.GREEN) + table.add_column("Record Type") + table.add_column("Title", data_color=Fore.YELLOW) + for record in record_dict: + table.add_row([record["uid"], record["type"], record["title"]]) return "\n" + table.get_string() + "\n" - def secret_list(self, uids=None, output_format='json', use_color=None): + def secret_list(self, uids=None, folder=None, recursive=False, query=None, show_value=False, output_format='json', use_color=None): if use_color is None: use_color = self.cli.user_color - record_dict = self.query(uids=uids, output_format='dict', unmask=True, use_color=use_color) - if output_format == 'text': - self.cli.output(self._format_list(record_dict, use_color=use_color)) - elif output_format == 'json': - records = [{"uid": x.get("uid"), "title": x.get("title"), "record_type": x.get("type")} - for x in record_dict] - self.cli.output(json.dumps(records, indent=4)) + loadrefs = True if query else False # to load fields[] and custom[] + record_dict = self.query(uids=uids, folder=folder, recursive=recursive, output_format='dict', load_references=loadrefs, unmask=True, use_color=use_color) + if query: + items = self._query_jsonpath_list(query, record_dict) + if output_format == 'text': + columns = [("uid", "UID", Fore.GREEN), ("type", "Record Type", Style.RESET_ALL), ("query_result", "Value", Fore.YELLOW)] if show_value else [] + self.cli.output(self._format_list(items, use_color=use_color, columns=columns)) + elif output_format == 'json': + records = [{ + "uid": x.get("uid", ""), + "record_type": x.get("type", ""), + "title": x.get("title", ""), + "value": x.get("query_result", "")} + for x in items] + if not show_value: + records = [{k: v for k, v in x.items() if k != "value"} for x in records] + self.cli.output(json.dumps(records, indent=4)) + else: + if output_format == 'text': + self.cli.output(self._format_list(record_dict, use_color=use_color)) + elif output_format == 'json': + records = [{"uid": x.get("uid"), "title": x.get("title"), "record_type": x.get("type")} + for x in record_dict] + self.cli.output(json.dumps(records, indent=4)) def upload(self, uid, file, title): @@ -574,7 +627,7 @@ def _split_kv(text, is_json=False, labels=None): return key, value - def update(self, uid, fields=None, custom_fields=None, fields_json=None, custom_fields_json=None): + def update(self, uid, fields=None, custom_fields=None, fields_json=None, custom_fields_json=None, title=None, notes=None): record = self.cli.client.get_secrets(uids=[uid]) if len(record) == 0: @@ -586,6 +639,15 @@ def _get_label(x): label = x.get("type") return label + if title is not None: + record[0].title = str(title) + + if notes is not None: + record[0].dict["notes"] = str(notes) + + if title is not None or notes is not None: + record[0]._update() + # Get a list of all labels/type allowed. labels = { "field": [_get_label(x) for x in record[0].dict.get("fields", [])], @@ -725,11 +787,13 @@ def add_record_from_file(self, folder_uid, file, password_generate_flag): self._check_if_can_add_records() try: + folders = self.cli.client.get_folders() records = Record.create_from_file(file, password_generate=password_generate_flag) record_uids = [] for record in records: record_create_obj = record.get_record_create_obj() - record_uid = self.cli.client.create_secret(folder_uid, record_create_obj) + folder_options, folders = self.build_folder_options(folder_uid, folders) + record_uid = self.cli.client.create_secret_with_options(folder_options, record_create_obj, folders) record_uids.append(record_uid) except FileSyntaxException as err: raise KsmCliException(str(err)) @@ -754,13 +818,72 @@ def add_record_from_field_args(self, version, folder_uid, password_generate_flag ) record = records[0] record_create_obj = record.get_record_create_obj() - record_uid = self.cli.client.create_secret(folder_uid, record_create_obj) + + folder_options, folders = self.build_folder_options(folder_uid) + record_uid = self.cli.client.create_secret_with_options(folder_options, record_create_obj, folders) except Exception as err: raise KsmCliException(f"{err}") print("The following is the new record UID ...", file=sys.stderr) return self.cli.output(record_uid) + def add_record_from_clone(self, uid: str, title: str): + + record_uid = '' # new record UID + self._check_if_can_add_records() + + try: + recs = self.cli.client.get_secrets([uid]) or [] + if recs: + rec = recs[0] + if rec.folder_uid: + folder_options = CreateOptions(rec.folder_uid, rec.inner_folder_uid) + record_data = { + "version": "v3", + "kind": "KeeperRecord", + "data": [{ + "recordType": rec.type, + "title": title if title else rec.title, + "notes": rec.dict.get("notes", ""), + "fields": rec.dict.get("fields", []), + "customFields": rec.dict.get("custom", []) + }] + } + records = RecordV3.create_from_data(record_data) + record = records[0] + record_create_obj = record.get_record_create_obj() + record_uid = self.cli.client.create_secret_with_options(folder_options, record_create_obj) + else: + print(f"Unable to find the parent shared folder for record {uid} - individually shared records cannot be cloned.", file=sys.stderr) + else: + print(f"Record UID not found {uid}", file=sys.stderr) + except Exception as err: + raise KsmCliException(f"{err}") + + if record_uid: + print("The following is the new record UID ...", file=sys.stderr) + return self.cli.output(record_uid) + + def build_folder_options(self, folder_uid: str, folders: List[KeeperFolder] = []) -> Tuple[CreateOptions, List[KeeperFolder]]: + """ Build and return folder create options and folders list """ + + # find closest shared folder parent + if not folders: + folders = self.cli.client.get_folders() or [] + + shared_folder = next((x for x in folders if x.folder_uid == folder_uid), None) + while shared_folder and shared_folder.parent_uid: + shared_folder = next((x for x in folders if x.folder_uid == shared_folder.parent_uid), shared_folder) + + if shared_folder is None: + raise KsmCliException(f'Unable to find the shared folder for {folder_uid}') + if not shared_folder.folder_key: + raise KsmCliException(f'Unable to find folder key for folder {shared_folder.folder_uid}') + + # create folder options + create_options = CreateOptions(shared_folder.folder_uid, folder_uid) + return create_options, folders + def generate_password(self, length, lowercase, uppercase, digits, special_characters): new_password = sdk_generate_password( diff --git a/integration/keeper_secrets_manager_cli/requirements.txt b/integration/keeper_secrets_manager_cli/requirements.txt index 448d3553..b536d2fc 100644 --- a/integration/keeper_secrets_manager_cli/requirements.txt +++ b/integration/keeper_secrets_manager_cli/requirements.txt @@ -1,4 +1,4 @@ -keeper-secrets-manager-core>=16.6.2 +keeper-secrets-manager-core>=16.6.4 keeper-secrets-manager-helper keeper-secrets-manager-storage>=1.0.2 prompt-toolkit~=2.0 diff --git a/integration/keeper_secrets_manager_cli/setup.py b/integration/keeper_secrets_manager_cli/setup.py index f10c308b..e353acb0 100644 --- a/integration/keeper_secrets_manager_cli/setup.py +++ b/integration/keeper_secrets_manager_cli/setup.py @@ -8,9 +8,9 @@ long_description = f.read() install_requires = [ - 'keeper-secrets-manager-core>=16.5.4', + 'keeper-secrets-manager-core>=16.6.4', 'keeper-secrets-manager-helper', - 'keeper-secrets-manager-storage>=1.0.1', + 'keeper-secrets-manager-storage>=1.0.2', 'prompt-toolkit~=2.0', 'jsonpath-rw-ext', 'colorama', @@ -27,7 +27,7 @@ # Version set in the keeper_secrets_manager_cli.version file. setup( name="keeper-secrets-manager-cli", - version="1.1.1", + version="1.1.3", description="Command line tool for Keeper Secrets Manager", long_description=long_description, long_description_content_type="text/markdown",