diff --git a/suit_generator/args.py b/suit_generator/args.py index 30b6020..26be64f 100644 --- a/suit_generator/args.py +++ b/suit_generator/args.py @@ -16,6 +16,7 @@ from suit_generator.cmd_convert import add_arguments as convert_args from suit_generator.cmd_mpi import add_arguments as mpi_args from suit_generator.cmd_cache_create import add_arguments as cache_create_args +from suit_generator.cmd_payload_extract import add_arguments as payload_extract_args def _parser() -> ArgumentParser: @@ -30,6 +31,7 @@ def _parser() -> ArgumentParser: convert_args(subparsers) mpi_args(subparsers) cache_create_args(subparsers) + payload_extract_args(subparsers) return parser diff --git a/suit_generator/cli.py b/suit_generator/cli.py index 6e51582..6518e94 100644 --- a/suit_generator/cli.py +++ b/suit_generator/cli.py @@ -11,7 +11,17 @@ sys.path.insert(0, str(pathlib.Path(__file__).parents[1].absolute())) -from suit_generator import cmd_parse, cmd_keys, cmd_convert, cmd_create, cmd_image, cmd_mpi, cmd_cache_create, args +from suit_generator import ( + cmd_parse, + cmd_keys, + cmd_convert, + cmd_create, + cmd_image, + cmd_mpi, + cmd_cache_create, + cmd_payload_extract, + args, +) from suit_generator.exceptions import GeneratorError, SUITError import logging @@ -30,6 +40,7 @@ cmd_image.ImageCreator.IMAGE_CMD: cmd_image.main, cmd_mpi.MPI_CMD: cmd_mpi.main, cmd_cache_create.CACHE_CREATE_CMD: cmd_cache_create.main, + cmd_payload_extract.PAYLOAD_EXTRACT_CMD: cmd_payload_extract.main, } diff --git a/suit_generator/cmd_payload_extract.py b/suit_generator/cmd_payload_extract.py new file mode 100644 index 0000000..6b9fa9b --- /dev/null +++ b/suit_generator/cmd_payload_extract.py @@ -0,0 +1,67 @@ +# +# Copyright (c) 2024 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: LicenseRef-Nordic-5-Clause +# +"""CMD_PAYLOAD_EXTRACT CLI command entry point.""" + +import cbor2 +import logging + +log = logging.getLogger(__name__) + +PAYLOAD_EXTRACT_CMD = "payload_extract" + + +def add_arguments(parser): + """Add additional arguments to the passed parser.""" + cmd_payload_extract_arg_parser = parser.add_parser(PAYLOAD_EXTRACT_CMD, help="Create raw cache structure.") + + cmd_payload_extract_arg_parser.add_argument("--input-envelope", required=True, help="Input envelope file path.") + cmd_payload_extract_arg_parser.add_argument("--output-envelope", required=True, help="Output envelope file path.") + cmd_payload_extract_arg_parser.add_argument( + "--payload-name", required=True, help="Name of the integrated payload to extract." + ) + cmd_payload_extract_arg_parser.add_argument( + "--output-payload-file", + required=False, + help="Output payload file path to store the extracted payload." + + "If not provided, the payload will not be stored to a file.", + ) + + cmd_payload_extract_arg_parser.add_argument( + "--payload-replace-path", + help="Path to the integrated payload to replace the extracted payload with." + + "If not provided, the payload will be removed from the envelope.", + ) + + +def main( + input_envelope: str, output_envelope: str, payload_name: str, output_payload_file: str, payload_replace_path: str +) -> None: + """Extract an integrated payload from a SUIT envelope. + + :param input_envelope: input envelope file path + :param output_envelope: output envelope file path + :param payload_name: name of the integrated payload to extract + :param output_payload_file: output file path to store the extracted payload + None if the payload should not be stored to a file + :param payload_replace_path: Path to the integrated payload to replace the extracted payload with. + None if the payload should be removed from the envelope. + """ + with open(input_envelope, "rb") as fh: + envelope = cbor2.load(fh) + extracted_payload = envelope.value.pop(payload_name, None) + + if extracted_payload is None: + log.log(logging.ERROR, 'Payload "%s" not found in envelope', payload_name) + + if payload_replace_path is not None: + with open(payload_replace_path, "rb") as fh: + envelope.value[payload_name] = fh.read() + + with open(output_envelope, "wb") as fh: + cbor2.dump(envelope, fh) + if output_payload_file is not None: + with open(output_payload_file, "wb") as fh: + fh.write(extracted_payload)