From 655ec63d4190cbe96030ad9268856540dcd2b883 Mon Sep 17 00:00:00 2001 From: Russel Vela Date: Mon, 22 Nov 2021 13:48:30 -0600 Subject: [PATCH 1/2] - Added support for service generated CSR on VaaS platform. - Added required tests. --- examples/get_cert.py | 2 +- examples/get_cert27.py | 2 +- .../get_cert_tpp_token.py} | 2 +- .../get_service_gen_cert_tpp.py} | 2 +- examples/{ => tpp}/set_policy_tpp_token.py | 2 +- examples/vaas/get_service_gen_cert_vaas.py | 89 ++++++ examples/{ => vaas}/set_policy_vaas.py | 2 +- requirements.txt | 2 + tests/test_vaas.py | 73 ++++- vcert/common.py | 117 ++++--- vcert/connection_cloud.py | 292 +++++++++++++++--- vcert/pem.py | 27 +- vcert/policy/pm_cloud.py | 13 +- vcert/vaas_utils.py | 133 ++++++++ 14 files changed, 636 insertions(+), 122 deletions(-) rename examples/{get_cert_token.py => tpp/get_cert_tpp_token.py} (99%) rename examples/{get_cert_service_tpp.py => tpp/get_service_gen_cert_tpp.py} (99%) rename examples/{ => tpp}/set_policy_tpp_token.py (99%) create mode 100644 examples/vaas/get_service_gen_cert_vaas.py rename examples/{ => vaas}/set_policy_vaas.py (99%) create mode 100644 vcert/vaas_utils.py diff --git a/examples/get_cert.py b/examples/get_cert.py index 02714cd..95c0f20 100644 --- a/examples/get_cert.py +++ b/examples/get_cert.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright 2019 Venafi, Inc. +# Copyright 2022 Venafi, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/examples/get_cert27.py b/examples/get_cert27.py index 043edfe..2c4fccb 100644 --- a/examples/get_cert27.py +++ b/examples/get_cert27.py @@ -1,6 +1,6 @@ #!/usr/bin/env python2 # -# Copyright 2019 Venafi, Inc. +# Copyright 2022 Venafi, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/examples/get_cert_token.py b/examples/tpp/get_cert_tpp_token.py similarity index 99% rename from examples/get_cert_token.py rename to examples/tpp/get_cert_tpp_token.py index bbf16eb..3192dcb 100644 --- a/examples/get_cert_token.py +++ b/examples/tpp/get_cert_tpp_token.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright 2020 Venafi, Inc. +# Copyright 2022 Venafi, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/examples/get_cert_service_tpp.py b/examples/tpp/get_service_gen_cert_tpp.py similarity index 99% rename from examples/get_cert_service_tpp.py rename to examples/tpp/get_service_gen_cert_tpp.py index eb48923..ad273b7 100644 --- a/examples/get_cert_service_tpp.py +++ b/examples/tpp/get_service_gen_cert_tpp.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright 2019 Venafi, Inc. +# Copyright 2022 Venafi, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/examples/set_policy_tpp_token.py b/examples/tpp/set_policy_tpp_token.py similarity index 99% rename from examples/set_policy_tpp_token.py rename to examples/tpp/set_policy_tpp_token.py index d25c9fe..391a329 100644 --- a/examples/set_policy_tpp_token.py +++ b/examples/tpp/set_policy_tpp_token.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright 2019 Venafi, Inc. +# Copyright 2022 Venafi, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/examples/vaas/get_service_gen_cert_vaas.py b/examples/vaas/get_service_gen_cert_vaas.py new file mode 100644 index 0000000..6073f7f --- /dev/null +++ b/examples/vaas/get_service_gen_cert_vaas.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python3 +# +# Copyright 2022 Venafi, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import print_function + +from vcert import (CertificateRequest, venafi_connection, CSR_ORIGIN_SERVICE, CHAIN_OPTION_FIRST) +import string +import random +import logging +from os import environ + +logging.basicConfig(level=logging.INFO) +logging.getLogger("urllib3").setLevel(logging.ERROR) + + +def main(): + # Get credentials from environment variables + url = environ.get('VAAS_URL') # Optional, only use when connecting to a specific VaaS server + api_key = environ.get('VAAS_APIKEY') + zone = environ.get("VAAS_ZONE") + + # Connection will be chosen automatically based on which arguments are passed. + # If api_key is passed, Venafi Cloud connection will be used. + # url attribute is no required when connecting to production VaaS platform + conn = venafi_connection(url=url, api_key=api_key) + + # Build a Certificate request + request = CertificateRequest(common_name=random_word(10) + ".venafi.example.com") + # Set the request to use a service generated CSR + request.csr_origin = CSR_ORIGIN_SERVICE + # A password should be defined for the private key to be generated. + request.key_password = 'Foo.Bar.Pass.123!' + # Include some Subject Alternative Names + request.san_dns = ["www.dns.venafi.example.com", "ww1.dns.venafi.example.com"] + # Additional CSR attributes can be included: + request.organization = "Venafi, Inc." + request.organizational_unit = ["Product Management"] + request.locality = "Salt Lake City" + request.province = "Utah" # This is the same as state + request.country = "US" + + # Specify ordering certificates in chain. Root can be CHAIN_OPTION_FIRST ("first") + # or CHAIN_OPTION_LAST ("last"). By default it is CHAIN_OPTION_LAST. + # request.chain_option = CHAIN_OPTION_FIRST + # + # To set Custom Fields for the certificate, specify an array of CustomField objects as name-value pairs + # request.custom_fields = [ + # CustomField(name="Cost Center", value="ABC123"), + # CustomField(name="Environment", value="Production"), + # CustomField(name="Environment", value="Staging") + # ] + # + # Request the certificate. + conn.request_cert(request, zone) + # Wait for the certificate to be retrieved. + # This operation may take some time to return, as it waits until the certificate is ISSUED or it timeout. + # Timeout is 180s by default. Can be changed using: + # request.timeout = 300 + cert = conn.retrieve_cert(request) + + # Print the certificate + print(cert.full_chain) + # Save it into a file + f = open("./cert.pem", "w") + f.write(cert.full_chain) + f.close() + + +def random_word(length): + letters = string.ascii_lowercase + return ''.join(random.choice(letters) for i in range(length)) + + +if __name__ == '__main__': + main() diff --git a/examples/set_policy_vaas.py b/examples/vaas/set_policy_vaas.py similarity index 99% rename from examples/set_policy_vaas.py rename to examples/vaas/set_policy_vaas.py index 9beb633..6384ac0 100644 --- a/examples/set_policy_vaas.py +++ b/examples/vaas/set_policy_vaas.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright 2019 Venafi, Inc. +# Copyright 2022 Venafi, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/requirements.txt b/requirements.txt index da6a067..ff2d830 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,5 @@ ipaddress;python_version<'3.3' enum34;python_version<'3.4' future ruamel.yaml<0.17 +pynacl>=1.4.0 +pycryptodome>=3.11.0 diff --git a/tests/test_vaas.py b/tests/test_vaas.py index 83bcf90..ac31697 100644 --- a/tests/test_vaas.py +++ b/tests/test_vaas.py @@ -22,13 +22,48 @@ from cryptography import x509 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes +from cryptography.x509.oid import NameOID -from test_env import CLOUD_ZONE, CLOUD_APIKEY, CLOUD_URL +from test_env import CLOUD_ZONE, CLOUD_APIKEY, CLOUD_URL, RANDOM_DOMAIN from test_utils import random_word, enroll, renew, renew_by_thumbprint, renew_without_key_reuse, simple_enroll -from vcert import CloudConnection, KeyType, CertificateRequest, CustomField, logger +from vcert import CloudConnection, KeyType, CertificateRequest, CustomField, logger, CSR_ORIGIN_SERVICE +from vcert.pem import pkcs8_to_pem_private_key log = logger.get_child("test-vaas") +p8_key = """ +-----BEGIN ENCRYPTED PRIVATE KEY----- +MIIFLTBXBgkqhkiG9w0BBQ0wSjApBgkqhkiG9w0BBQwwHAQIPLsOsD8egf4CAicQ +MAwGCCqGSIb3DQIJBQAwHQYJYIZIAWUDBAEqBBCGU0yCgxPiFpL/l+F5/wmzBIIE +0B2QHY6GoIj204ovABzvhgu6DPt3qvMtxWUhparQoOirf6IWpgPs5yIEVYzm33vb +I0yWb4DTTLQc0k+s1e1whDkhEDyeZ0GGHzHu2LHnsLLUKbUW9wsod9GlQ61IACnr +i8ehxgAAyYAB/PIcwpuF+nzyRHx9bud/916DYQ7Y/DWmCpSHB1/O9vkY1RZJjOqc +XrmzVqL+FBWjPzXk5FfWkRoVIUWsB/yWaP4ZYb5o8xgcAvvyXeofhum9vmiRlRB+ +ii6SH7lgFE7BL1qZPnNCjFeBbDv9OryR1h3FbGnNaKJGOrlA1sirg0lMyi2zsaBe +M0B8y8AVnU8q5JnToIFFo4BnimK7jXPspQ/opu9IaZDWKf3wbwUiC+IfytlelVpT +lMTLvYPPypsjqhInDRrPbdlmx1WN9bfHdkwzRm3x4UuAKTcQKX/5s8AdNDTRx4Kv +UZ2wLylEQcCWYWm3m+YL0PcsnUX301dmKHGG0ub/CwIFO1GYI9+Eb1azsS3h+fx7 +Ec4rOzZ4Q5h1HWnV3P7CVqyq4hSqJ3f7DMThCgW0up2woCMZnZqQcg4+VUYH1oFg +YvrCV0N4W9woHWS6v0HDhMAR9HadUAvDetljrp1ygiPGAe+giNF9AZ+7+MTVwT/M +YEcDzxCrKWQ57KdxnZL2cVELx0pihmqEs0jvh++YShszE39S/Pk58BqFLaS+/eAy +42fXlih2FE+Pj5dTrxY3wY759SOZy+AlHytd3PkYHvCd7qgYTCUo+y8Gd2tIVW2g +pwx59953QhCoyPFMvm97pkHi9IMLLoBobdngV2FKzj3lch1V8iujqNdA8W0Zny0S +6KQgSn6GvW/EVVVIckS41uoKxTJVnCNsI8jpBa4/bUvZzx8s6gDHSZqTFgh+jssu +8rI8nGRsFa3+ynoR3rFcaRFi733BjPHdCYlEYLxfPwhpQ5wYAU2NCMJbCkiakPSR +ywNbIhxJhdmhD8zbNifLaXUB/iFhbW4e+QcZZNo8im/ty0J3OSj9OqNIAAP8k7CV +MdQbI4yu09hDPKIw7YBS+R5pmOjiuQOL4mzeOb8MN4i4AHCUiH/K63pVDqkT1yNM +rIIFjljg1loosubHTU59vWKE/OPuY+BFviK49rw0xGyPdHECgkpS6/CPfzIEkr8U +RsNxRVW/fjTdSw3YaqlrTNEN6tLuddq2R/rMvyXlzhcGB2H81V8ZgJ4bqTgfUdH4 +iAv49PCCIClPQYD4W1HzuSFlNwT4Cy29QgSjw0bHFmvmNvfInidBH5DoJeMovMsy +OROtIuCG0QZjfIcsreU7gcbUvwPNB+nQaDA3IA7fkYmE1xvj38YMIimDRWFKN5Q6 +f67kAGgkFcBlKGh6J+iGNIMscGkRbPRlNHtefE/vaAMHNUBfNxuVk6ylf2Hj2YC9 +gXSp4S0pq5RUvt8KPzeba0mtNlmuFSK9ZfOOu/eBIGvHwA7+HWG4ogTpER1IXbnE +ZzcdVwYponiGL/dtKZIyibxxEUOHjoM9XyoopE9wFq/kQXEgVDCFLdyPAxFS7WA+ +NRqtgX8X41i/zQ72ZvM+bHrq2gk2OnDJ4jyDTBLBQezdOX4rLrWvzIcqh7hmWC1L +KrcsYl3EZcK4zmMgSTTCgEJGKJsgClqUh6TS7atxgIjr +-----END ENCRYPTED PRIVATE KEY----- +""" + class TestCloudMethods(unittest.TestCase): def __init__(self, *args, **kwargs): @@ -135,3 +170,37 @@ def test_cloud_enroll_valid_hours(self): "Expected_delta: %s seconds." % (expected_date.strftime(date_format), expiration_date.strftime(date_format), delta.total_seconds())) + + def test_cloud_enroll_service_generated_csr(self): + cn = random_word(10) + ".venafi.example.com" + password = 'FooBarPass123' + + request = CertificateRequest( + common_name=cn, + key_password=password, + country='US' + ) + + request.san_dns = ["www.client.venafi.example.com", "ww1.client.venafi.example.com"] + request.csr_origin = CSR_ORIGIN_SERVICE + + self.cloud_conn.request_cert(request, self.cloud_zone) + cert_object = self.cloud_conn.retrieve_cert(request) + + cert = x509.load_pem_x509_certificate(cert_object.cert.encode(), default_backend()) + assert isinstance(cert, x509.Certificate) + t1 = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME) + t2 = [ + x509.NameAttribute( + NameOID.COMMON_NAME, cn or RANDOM_DOMAIN + ) + ] + assert t1 == t2 + + output = cert_object.as_pkcs12('FooBarPass123') + log.info("PKCS12 created successfully:\n%s" % output) + + def test_cloud_parse_key_p8_to_p12(self): + passphrase = 'FooBarPass123' + pem_pk = pkcs8_to_pem_private_key(self.p8_key, passphrase) + log.info("PEM Private Key is: %s" % pem_pk) diff --git a/vcert/common.py b/vcert/common.py index 913179a..9d6b1f6 100644 --- a/vcert/common.py +++ b/vcert/common.py @@ -40,19 +40,20 @@ from .ssh_utils import SSHCertRequest, SSHRetrieveResponse, SSHCATemplateRequest, SSHConfig from .tpp_utils import IssuerHint -MIME_JSON = "application/json" -MIME_HTML = "text/html" -MIME_TEXT = "text/plain" -MIME_CSV = "text/csv" -MIME_ANY = "*/*" -LOCALHOST = "127.0.0.1" +MIME_JSON = 'application/json' +MIME_HTML = 'text/html' +MIME_TEXT = 'text/plain' +MIME_CSV = 'text/csv' +MIME_ANY = '*/*' +MIME_OCTET_STREAM = 'application/octet-stream' +LOCALHOST = '127.0.0.1' DEFAULT_TIMEOUT = 180 -CSR_ORIGIN_PROVIDED = "provided" -CSR_ORIGIN_LOCAL = "local" -CSR_ORIGIN_SERVICE = "service" -CHAIN_OPTION_FIRST = "first" -CHAIN_OPTION_LAST = "last" -CHAIN_OPTION_IGNORE = "ignore" +CSR_ORIGIN_PROVIDED = 'provided' +CSR_ORIGIN_LOCAL = 'local' +CSR_ORIGIN_SERVICE = 'service' +CHAIN_OPTION_FIRST = 'first' +CHAIN_OPTION_LAST = 'last' +CHAIN_OPTION_IGNORE = 'ignore' class CertField: @@ -221,33 +222,32 @@ def __str__(self): return self.name -class RecommendedSettings: - def __init__(self, subject_o_value=None, subject_ou_value=None, subject_l_value=None, subject_st_value=None, - subject_c_value=None, key_type=None, key_reuse=None): - """ - :param str subject_o_value: - :param str subject_ou_value: - :param str subject_l_value: - :param str subject_st_value: - :param str subject_c_value: - :param KeyType key_type: - :param bool key_reuse: - """ - self.subjectOValue = subject_o_value - self.subjectOUValue = subject_ou_value - self.subjectLValue = subject_l_value - self.subjectSTValue = subject_st_value - self.subjectCValue = subject_c_value - self.keyType = key_type - self.keyReuse = key_reuse - - class CertificateRequest: - def __init__(self, cert_id=None, san_dns=None, email_addresses="", ip_addresses=None, user_principal_names=None, - uniform_resource_identifiers=None, attributes=None, key_type=None, private_key=None, key_password=None, - csr=None, friendly_name=None, common_name=None, thumbprint=None, organization=None, - organizational_unit=None, country=None, province=None, locality=None, origin=None, custom_fields=None, - timeout=DEFAULT_TIMEOUT, csr_origin=CSR_ORIGIN_LOCAL, include_private_key=False, validity_hours=None, + def __init__(self, cert_id=None, + san_dns=None, + email_addresses="", + ip_addresses=None, + user_principal_names=None, + uniform_resource_identifiers=None, + attributes=None, + key_type=None, + private_key=None, + key_password=None, + csr=None, + friendly_name=None, + common_name=None, + thumbprint=None, + organization=None, + organizational_unit=None, + country=None, + province=None, + locality=None, + origin=None, + custom_fields=None, + timeout=DEFAULT_TIMEOUT, + csr_origin=CSR_ORIGIN_LOCAL, + include_private_key=False, + validity_hours=None, issuer_hint=IssuerHint.DEFAULT): """ :param str cert_id: Certificate request id. Generating by server. @@ -259,11 +259,16 @@ def __init__(self, cert_id=None, san_dns=None, email_addresses="", ip_addresses= :param attributes: :param KeyType key_type: Type of asymmetric cryptography algorithm. Default is RSA 2048. :param asymmetric.PrivateKey private_key: String with pem encoded private key or asymmetric.PrivateKey - :param str key_password: Password for encrypted private key. Not supported at this moment. + :param str key_password: Password for encrypted private key. :param str csr: Certificate Signing Request in pem format :param str friendly_name: Name for certificate in the platform. If not specified common name will be used. :param str common_name: Common name of certificate. Usually domain name. :param str thumbprint: Certificate thumbprint. Can be used for identifying certificate on the platform. + :param organization: + :param organizational_unit: + :param country: + :param province: + :param locality: :param str origin: application identifier :param list[CustomField] custom_fields: list of custom fields values to be added to the certificate. :param int timeout: Timeout for the certificate to be retrieved from server. Measured in seconds. @@ -582,32 +587,6 @@ def __init__(self, user=None, password=None, access_token=None, refresh_token=No self.state = state -class AppDetails: - def __init__(self, app_id=None, cit_map=None, company_id=None, name=None, description=None, - owner_ids_and_types=None, fq_dns=None, internal_fq_dns=None, external_ip_ranges=None, - internal_ip_ranges=None, internal_ports=None, fully_qualified_domain_names=None, ip_ranges=None, - ports=None, org_unit_id=None): - """ - :param str app_id: - :param dict cit_map: - """ - self.app_id = app_id - self.cit_alias_id_map = cit_map - self.company_id = company_id - self.name = name - self.description = description - self.owner_ids_and_types = owner_ids_and_types - self.fq_dns = fq_dns - self.internal_fq_dns = internal_fq_dns - self.external_ip_ranges = external_ip_ranges - self.internal_ip_ranges = internal_ip_ranges - self.internal_ports = internal_ports - self.fully_qualified_domain_names = fully_qualified_domain_names - self.ip_ranges = ip_ranges - self.ports = ports - self.org_unit_id = org_unit_id - - class CommonConnection: def auth(self): @@ -702,6 +681,11 @@ def retrieve_ssh_config(self, ca_request): @staticmethod def process_server_response(r): + """ + + :param requests.Response r: + :rtype: str or dict + """ if r.status_code not in (HTTPStatus.OK, HTTPStatus.ACCEPTED, HTTPStatus.CREATED, HTTPStatus.CONFLICT): try: log_errors(r.json()) @@ -727,6 +711,9 @@ def process_server_response(r): elif content_type.startswith(MIME_CSV): log.debug(r.content.decode()) return r.status_code, r.content.decode() + elif content_type.startswith(MIME_OCTET_STREAM): + log.debug(r.content) + return r.status_code, r.content else: log.error("Unexpected content type: %s for request %s" % (content_type, r.request.url)) raise ServerUnexptedBehavior diff --git a/vcert/connection_cloud.py b/vcert/connection_cloud.py index 6df2214..bd5027b 100644 --- a/vcert/connection_cloud.py +++ b/vcert/connection_cloud.py @@ -17,24 +17,45 @@ from __future__ import (absolute_import, division, generators, unicode_literals, print_function, nested_scopes, with_statement) -import logging as log +import base64 import re import time from pprint import pprint import requests import six.moves.urllib.parse as urlparse +from nacl.public import SealedBox from .common import (ZoneConfig, CertificateRequest, CommonConnection, Policy, get_ip_address, log_errors, MIME_JSON, - MIME_TEXT, MIME_ANY, CertField, KeyType, AppDetails, RecommendedSettings, DEFAULT_TIMEOUT) + MIME_TEXT, MIME_ANY, CertField, KeyType, DEFAULT_TIMEOUT, + CSR_ORIGIN_SERVICE, CHAIN_OPTION_FIRST, CHAIN_OPTION_LAST) from .errors import (VenafiConnectionError, ServerUnexptedBehavior, ClientBadData, CertificateRequestError, CertificateRenewError, VenafiError, RetrieveCertificateTimeoutError) from .http import HTTPStatus -from .pem import parse_pem -from .policy.pm_cloud import build_policy_spec, validate_policy_spec, \ - AccountDetails, build_cit_request, build_user, UserDetails, build_company, build_apikey, build_app_update_request, \ - get_ca_info, CertificateAuthorityDetails, CertificateAuthorityInfo, build_account_details, \ - build_app_create_request +from .logger import get_child +from .pem import parse_pem, pkcs8_to_pem_private_key, Certificate +from .policy import PolicySpecification +from .policy.pm_cloud import (build_policy_spec, validate_policy_spec, AccountDetails, build_cit_request, build_user, + UserDetails, build_company, build_apikey, build_app_update_request, get_ca_info, + CertificateAuthorityDetails, CertificateAuthorityInfo, build_account_details, + build_app_create_request) +from .vaas_utils import AppDetails, RecommendedSettings, EdgeEncryptionKey, zip_to_pem, value_matches_regex + +TOKEN_HEADER_NAME = "tppl-api-key" # nosec +APPLICATION_SERVER_TYPE_ID = "784938d1-ef0d-11eb-9461-7bb533ba575b" +MSG_VALUE_NOT_MATCH_POLICY = "Error while requesting certificate using service generated CSR on VaaS. " \ + "Request %s does not match CIT valid %s:\n\tRequest value: %s,\n\tCIT values: %s" + +CSR_ATTR_CN = 'commonName' +CSR_ATTR_ORG = 'organization' +CSR_ATTR_ORG_UNIT = 'organizationalUnits' +CSR_ATTR_LOCALITY = 'locality' +CSR_ATTR_PROVINCE = 'state' +CSR_ATTR_COUNTRY = 'country' +CSR_ATTR_SANS_BY_TYPE = 'subjectAlternativeNamesByType' +CSR_ATTR_SANS_DNS = 'dnsNames' + +log = get_child("connection-vaas") class CertStatuses: @@ -65,11 +86,13 @@ def __init__(self): CERTIFICATE_TEMPLATE_BY_ID = APP_BY_ID + "/certificateissuingtemplates/%s" APP_DETAILS_BY_NAME = APPLICATIONS + "/name/%s" CERTIFICATE_BY_ID = API_BASE_PATH + "certificates/%s" + CERTIFICATE_KEYSTORE_BY_ID = CERTIFICATE_BY_ID + "/keystore" CA_ACCOUNTS = API_VERSION + "certificateauthorities/%s/accounts" CA_ACCOUNT_DETAILS = CA_ACCOUNTS + "/%s" ISSUING_TEMPLATES = API_VERSION + "certificateissuingtemplates" ISSUING_TEMPLATES_UPDATE = ISSUING_TEMPLATES + "/%s" USER_ACCOUNTS = API_VERSION + "useraccounts" + DEK_PUBLIC_KEY = API_VERSION + "edgeencryptionkeys/%s" class CondorChainOptions: @@ -80,9 +103,6 @@ def __init__(self): ROOT_LAST = "EE_FIRST" -TOKEN_HEADER_NAME = "tppl-api-key" # nosec - - class CertificateStatusResponse: def __init__(self, d): self.status = d.get('status') or d.get("certificateStatus") @@ -122,33 +142,53 @@ def __str__(self): return "[Cloud] %s" % self._base_url def _get(self, url, params=None): - r = requests.get(self._base_url + url, params=params, - headers={TOKEN_HEADER_NAME: self._token, "Accept": MIME_ANY, "cache-control": "no-cache"}, - **self._http_request_kwargs - ) + """ + + :param url: + :param params: + :rtype: str or dict + """ + headers = { + TOKEN_HEADER_NAME: self._token, + 'accept': MIME_ANY, + 'cache-control': "no-cache" + } + r = requests.get(self._base_url + url, params=params, headers=headers, **self._http_request_kwargs) return self.process_server_response(r) def _post(self, url, data=None): + """ + + :param url: + :param data: + :rtype: str or dict + """ + headers = { + TOKEN_HEADER_NAME: self._token, + 'accept': MIME_JSON, + 'cache-control': "no-cache" + } if isinstance(data, dict): - r = requests.post(self._base_url + url, json=data, - headers={TOKEN_HEADER_NAME: self._token, - "cache-control": "no-cache", - "Accept": MIME_JSON}, - **self._http_request_kwargs - ) + r = requests.post(self._base_url + url, json=data, headers=headers, **self._http_request_kwargs) else: log.error("Unexpected client data type: %s for %s" % (type(data), url)) raise ClientBadData return self.process_server_response(r) def _put(self, url, data=None): + """ + + :param url: + :param data: + :rtype:str or dict + """ + headers = { + TOKEN_HEADER_NAME: self._token, + 'cache-control': "no-cache", + 'accept': MIME_JSON + } if isinstance(data, dict): - r = requests.put(self._base_url + url, json=data, - headers={TOKEN_HEADER_NAME: self._token, - "cache-control": "no-cache", - "Accept": MIME_JSON}, - **self._http_request_kwargs - ) + r = requests.put(self._base_url + url, json=data, headers=headers, **self._http_request_kwargs) else: log.error("Unexpected client data type: %s for %s" % (type(data), url)) raise ClientBadData @@ -307,12 +347,9 @@ def request_cert(self, request, zone): app_name, cit_alias = _parse_zone(zone) details = self._get_app_details_by_name(app_name) cit_id = details.cit_alias_id_map.get(cit_alias) - if not request.csr: - request.build_csr() ip_address = get_ip_address() request_data = { - 'certificateSigningRequest': request.csr, 'applicationId': details.app_id, 'certificateIssuingTemplateId': cit_id, 'apiClientInformation': { @@ -320,6 +357,15 @@ def request_cert(self, request, zone): 'identifier': ip_address } } + if request.csr_origin != CSR_ORIGIN_SERVICE: + if not request.csr: + request.build_csr() + request_data['certificateSigningRequest'] = request.csr + else: + request_data['isVaaSGenerated'] = True + request_data['applicationServerTypeId'] = APPLICATION_SERVER_TYPE_ID + request_data['csrAttributes'] = self._get_service_generated_csr_attr(request, zone) + if request.validity_hours is not None: request_data['validityPeriod'] = "PT%dH" % request.validity_hours @@ -338,13 +384,18 @@ def retrieve_cert(self, request): log.info("Certificate status is %s." % cert_status.status) return None elif cert_status.status == CertStatuses.FAILED: - log.debug("Status is %s. Returning data for debug" % cert_status.status) + log.debug("Certificate status is %s. Returning data for debug" % cert_status.status) return "Certificate FAILED" elif cert_status.status == CertStatuses.ISSUED: - url = URLS.CERTIFICATE_RETRIEVE % cert_status.certificateIds[0] - if request.chain_option == "first": + request.cert_guid = cert_status.certificateIds[0] + dek_info = self._get_dek_hash(request.cert_guid) + if dek_info and dek_info.public_key: + return self._retrieve_service_generated_cert(request, dek_info) + + url = URLS.CERTIFICATE_RETRIEVE % request.cert_guid + if request.chain_option == CHAIN_OPTION_FIRST: url += "?chainOrder=%s&format=PEM" % CondorChainOptions.ROOT_FIRST - elif request.chain_option == "last": + elif request.chain_option == CHAIN_OPTION_LAST: url += "?chainOrder=%s&format=PEM" % CondorChainOptions.ROOT_LAST else: log.error("chain option %s is not valid" % request.chain_option) @@ -505,16 +556,7 @@ def import_cert(self, request): raise NotImplementedError def get_policy(self, zone): - cit = self._get_template_by_id(zone) - if not cit: - raise VenafiError('Certificate issuing template not found for zone [%s]', zone) - - info = self._get_ca_info(cit.cert_authority, cit.cert_authority_account_id, cit.cert_authority_product_option_id) - if not info: - raise VenafiError('Certificate Authority info not found.') - - ps = build_policy_spec(cit, info) - return ps + return self._get_policy(zone=zone, subject_cn_to_str=True) def _policy_exists(self, zone): """ @@ -660,3 +702,167 @@ def _get_ca_info(self, name, account_id, product_option_id): info.vendor_name = po.product_name return info + + def _get_service_generated_csr_attr(self, request, zone): + """ + + :param CertificateRequest request: + :param str zone: + :rtype: dict[str, Any] + """ + ps = self._get_policy(zone=zone, subject_cn_to_str=False) + csr_attr_map = {} + + if request.common_name: + if ps.policy: + policy_domains = ps.policy.domains + valid = value_matches_regex(value=request.common_name, pattern_list=policy_domains) + if not valid: + log.error(MSG_VALUE_NOT_MATCH_POLICY % ("Common Name", "domains", request.common_name, + ps.policy.domains)) + raise ClientBadData() + csr_attr_map[CSR_ATTR_CN] = request.common_name + + if request.organization: + if ps.policy and ps.policy.subject: + policy_orgs = ps.policy.subject.orgs + valid = value_matches_regex(value=request.organization,pattern_list=policy_orgs) + if not valid: + org_str = "Organization" + log.error(MSG_VALUE_NOT_MATCH_POLICY % (org_str, org_str+"s", request.organization, policy_orgs)) + raise ClientBadData + csr_attr_map[CSR_ATTR_ORG] = request.organization + elif ps.defaults and ps.defaults.subject and ps.defaults.subject.org: + csr_attr_map[CSR_ATTR_ORG] = ps.defaults.subject.org + + if request.organizational_unit: + if ps.policy and ps.policy.subject: + policy_ous = ps.policy.subject.org_units + valid = value_matches_regex(value=request.organizational_unit, pattern_list=policy_ous) + if not valid: + ou_str = "Organizational Unit" + log.error(MSG_VALUE_NOT_MATCH_POLICY % (ou_str, ou_str+"s", request.organizational_unit, + policy_ous)) + raise ClientBadData + csr_attr_map[CSR_ATTR_ORG_UNIT] = request.organizational_unit + elif ps.defaults and ps.defaults.subject and ps.defaults.subject.org_units: + csr_attr_map[CSR_ATTR_ORG_UNIT] = ps.defaults.subject.org_units + + if request.locality: + if ps.policy and ps.policy.subject: + policy_localities = ps.policy.subject.localities + valid = value_matches_regex(value=request.locality, pattern_list=policy_localities) + if not valid: + locality_str = "Localit" + log.error(MSG_VALUE_NOT_MATCH_POLICY % (locality_str+"y", locality_str+"ies", request.locality, + policy_localities)) + raise ClientBadData + csr_attr_map[CSR_ATTR_LOCALITY] = request.locality + elif ps.defaults and ps.defaults.subject and ps.defaults.subject.locality: + csr_attr_map[CSR_ATTR_LOCALITY] = ps.defaults.subject.locality + + if request.province: + if ps.policy and ps.policy.subject: + policy_provinces = ps.policy.subject.localities + valid = value_matches_regex(value=request.province, pattern_list=policy_provinces) + if not valid: + province_str = "Province" + log.error(MSG_VALUE_NOT_MATCH_POLICY % (province_str, province_str+"s", request.province, + policy_provinces)) + raise ClientBadData + csr_attr_map[CSR_ATTR_PROVINCE] = request.province + elif ps.defaults and ps.defaults.subject and ps.defaults.subject.state: + csr_attr_map[CSR_ATTR_PROVINCE] = ps.defaults.subject.state + + if request.country: + if ps.policy and ps.policy.subject: + policy_countries = ps.policy.subject.countries + valid = value_matches_regex(value=request.country, pattern_list=policy_countries) + if not valid: + country_str = "Countr" + log.error(MSG_VALUE_NOT_MATCH_POLICY % (country_str+"y", country_str+"ies", request.country, + policy_countries)) + raise ClientBadData + csr_attr_map[CSR_ATTR_COUNTRY] = request.country + elif ps.defaults and ps.defaults.subject and ps.defaults.subject.country: + csr_attr_map[CSR_ATTR_COUNTRY] = ps.defaults.subject.country + + if len(request.san_dns) > 0: + sans = { + CSR_ATTR_SANS_DNS: request.san_dns + # TODO: Other sans should be added here + } + csr_attr_map[CSR_ATTR_SANS_BY_TYPE] = sans + + return csr_attr_map + + def _get_policy(self, zone, subject_cn_to_str): + """ + + :param str zone: + :param bool subject_cn_to_str: + :rtype: PolicySpecification + """ + cit = self._get_template_by_id(zone) + if not cit: + raise VenafiError('Certificate issuing template not found for zone [%s]', zone) + + info = self._get_ca_info(cit.cert_authority, cit.cert_authority_account_id, cit.cert_authority_product_option_id) + if not info: + raise VenafiError('Certificate Authority info not found.') + + ps = build_policy_spec(cit, info, subject_cn_to_str) + return ps + + def _get_dek_hash(self, cert_id): + """ + + :param str cert_id: + :rtype: EdgeEncryptionKey + """ + url = URLS.CERTIFICATE_BY_ID % cert_id + status, data = self._get(url) + if status != HTTPStatus.OK: + log.error("Error retrieving Certificate details for id: %s" % cert_id) + raise ServerUnexptedBehavior + + dek_hash = data['dekHash'] if 'dekHash' in data else None + if not dek_hash: + return None + + url = URLS.DEK_PUBLIC_KEY % dek_hash + status, data = self._get(url) + if status != HTTPStatus.OK: + log.error("Error retrieving DEK public key for hash: %s" % dek_hash) + raise ServerUnexptedBehavior + + dek = EdgeEncryptionKey(data) + return dek + + def _retrieve_service_generated_cert(self, request, dek_info): + """ + + :param CertificateRequest request: + :param EdgeEncryptionKey dek_info: + :rtype: Certificate + """ + box = SealedBox(dek_info.public_key) + encrypted_key_pass = box.encrypt(request.key_password) + body = { + 'exportFormat': 'PEM', + 'encryptedPrivateKeyPassphrase': base64.b64encode(encrypted_key_pass).decode("utf-8"), + 'encryptedKeystorePassphrase': '', + 'certificateLabel': '' + } + headers = { + 'accept': 'application/octet-stream' + } + url = URLS.CERTIFICATE_KEYSTORE_BY_ID % request.cert_guid + status, data = self._post(url, data=body) + if status not in (HTTPStatus.OK, HTTPStatus.CREATED, HTTPStatus.ACCEPTED): + log.error("Some error") + raise VenafiError + + cert, chain, private_key = zip_to_pem(data, request.chain_option) + pem_private_key = pkcs8_to_pem_private_key(private_key=private_key, passphrase=request.key_password) + return Certificate(cert=cert, chain=chain, key=pem_private_key) diff --git a/vcert/pem.py b/vcert/pem.py index 3b33104..c831ff4 100644 --- a/vcert/pem.py +++ b/vcert/pem.py @@ -21,10 +21,11 @@ import string import random +from Crypto.IO import PKCS8, PEM from cryptography import x509 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization -from cryptography.hazmat.primitives.serialization import pkcs12, load_pem_private_key, BestAvailableEncryption +from cryptography.hazmat.primitives.serialization import pkcs12 from .errors import VenafiError from .logger import get_logger @@ -110,7 +111,8 @@ def as_pkcs12(self, passphrase=None): encryption = serialization.NoEncryption() b_pass = None try: - p_key = load_pem_private_key(data=self.key.encode(), password=b_pass, backend=default_backend()) + p_key = serialization.load_pem_private_key(data=self.key.encode(), password=b_pass, + backend=default_backend()) except Exception as e: get_logger().error(msg="Error parsing Private Key: %s" % e.message) return @@ -120,6 +122,27 @@ def as_pkcs12(self, passphrase=None): return output +def pkcs8_to_pem_private_key(private_key, passphrase): + """ + + :param str private_key: + :param str passphrase: + :rtype: str + """ + b_passphrase = passphrase.encode() + + b_pem, marker, decrypted = PEM.decode(private_key.encode(), b_passphrase) + oid, private_key_der, _ = PKCS8.unwrap(b_pem, b_passphrase) + key = serialization.load_der_private_key(data=private_key_der, password=None, backend=default_backend()) + encryption = serialization.BestAvailableEncryption(b_passphrase) + private_key_pem = key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=encryption + ) + return private_key_pem.decode() + + def random_word(length): letters = string.ascii_lowercase return ''.join(random.choice(letters) for _ in range(length)) # nosec diff --git a/vcert/policy/pm_cloud.py b/vcert/policy/pm_cloud.py index 34cd6ba..78c064f 100644 --- a/vcert/policy/pm_cloud.py +++ b/vcert/policy/pm_cloud.py @@ -13,11 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. # -from vcert.common import Policy as Cit, AppDetails, KeyType +from vcert.common import Policy as Cit, KeyType from vcert.errors import VenafiError from vcert.policy import RPA, DEFAULT_CA from vcert.policy.policy_spec import Policy, Subject, KeyPair, DefaultSubject, DefaultKeyPair, PolicySpecification, \ Defaults, SubjectAltNames +from vcert.vaas_utils import AppDetails supported_rsa_key_sizes = [1024, 2048, 4096] CA_TYPE_DIGICERT = 'DIGICERT' @@ -30,10 +31,11 @@ DEFAULT_HASH_ALGORITHM = 'SHA256' -def build_policy_spec(cit, ca_info): +def build_policy_spec(cit, ca_info, subject_cn_to_str=True): """ :param Cit cit: :param CertificateAuthorityInfo ca_info: + :param bool subject_cn_to_str: Indicates whether or not to remove the regex pattern from the Common Name values :rtype: PolicySpecification """ if not cit: @@ -43,8 +45,11 @@ def build_policy_spec(cit, ca_info): p = Policy() p.wildcard_allowed = is_wildcard_allowed(cit.SubjectCNRegexes) if len(cit.SubjectCNRegexes) > 0: - domains = convert_to_string(cit.SubjectCNRegexes, p.wildcard_allowed) - p.domains = domains + if subject_cn_to_str: + domains = convert_to_string(cit.SubjectCNRegexes, p.wildcard_allowed) + p.domains = domains + else: + p.domains = cit.SubjectCNRegexes else: p.domains = None diff --git a/vcert/vaas_utils.py b/vcert/vaas_utils.py new file mode 100644 index 0000000..30cf619 --- /dev/null +++ b/vcert/vaas_utils.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python3 +# +# Copyright 2022 Venafi, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from __future__ import absolute_import, division, generators, unicode_literals, print_function, nested_scopes, \ + with_statement + +import io +import re +import zipfile + +from nacl.public import PublicKey +from nacl.encoding import Base64Encoder + +from .common import CHAIN_OPTION_FIRST, CHAIN_OPTION_LAST + + +class AppDetails: + def __init__(self, app_id=None, cit_map=None, company_id=None, name=None, description=None, + owner_ids_and_types=None, fq_dns=None, internal_fq_dns=None, external_ip_ranges=None, + internal_ip_ranges=None, internal_ports=None, fully_qualified_domain_names=None, ip_ranges=None, + ports=None, org_unit_id=None): + """ + :param str app_id: + :param dict cit_map: + """ + self.app_id = app_id + self.cit_alias_id_map = cit_map + self.company_id = company_id + self.name = name + self.description = description + self.owner_ids_and_types = owner_ids_and_types + self.fq_dns = fq_dns + self.internal_fq_dns = internal_fq_dns + self.external_ip_ranges = external_ip_ranges + self.internal_ip_ranges = internal_ip_ranges + self.internal_ports = internal_ports + self.fully_qualified_domain_names = fully_qualified_domain_names + self.ip_ranges = ip_ranges + self.ports = ports + self.org_unit_id = org_unit_id + + +class RecommendedSettings: + def __init__(self, subject_o_value=None, subject_ou_value=None, subject_l_value=None, subject_st_value=None, + subject_c_value=None, key_type=None, key_reuse=None): + """ + :param str subject_o_value: + :param str subject_ou_value: + :param str subject_l_value: + :param str subject_st_value: + :param str subject_c_value: + :param KeyType key_type: + :param bool key_reuse: + """ + self.subjectOValue = subject_o_value + self.subjectOUValue = subject_ou_value + self.subjectLValue = subject_l_value + self.subjectSTValue = subject_st_value + self.subjectCValue = subject_c_value + self.keyType = key_type + self.keyReuse = key_reuse + + +class EdgeEncryptionKey: + def __init__(self, data): + """ + + :param dict data: + """ + self.public_key = PublicKey(data['key'], encoder=Base64Encoder) if 'key' in data else None # type:PublicKey + + +def zip_to_pem(data, chain_option): + """ + + :param data: + :param str chain_option: + :rtype: tuple[str, list, str] + """ + zip_data = zipfile.ZipFile(io.BytesIO(data)) + private_key = None + certs = [] + chain = [] + certificate = None + for info in zip_data.infolist(): + if info.filename.endswith('.key'): + f = zip_data.open(info) + private_key = f.read().decode("utf-8").strip() + f.close() + elif info.filename.endswith('_root-first.pem'): + f = zip_data.open(info) + certs = f.read().decode("utf-8").strip().split('\n\n') + f.close() + for i in range(len(certs)): + if i < len(certs) - 1: + if chain_option == CHAIN_OPTION_FIRST: + chain.append(certs[i]) + elif chain_option == CHAIN_OPTION_LAST: + chain.insert(0, certs[i]) + else: + continue + else: + certificate = certs[i] + return certificate, chain, private_key + + +def value_matches_regex(value, pattern_list): + """ + + :param str value: + :param list[str] pattern_list: + :rtype: bool + """ + return any((re.match(pattern, value) is not None) for pattern in pattern_list) + # for regex in regex_list: + # result = re.match(regex, value) + # if not result: + # return False + # + # return True From 169ed0f29701a9cf98db062bd69a50be7ed55d66 Mon Sep 17 00:00:00 2001 From: Russel Vela Date: Mon, 22 Nov 2021 18:04:21 -0600 Subject: [PATCH 2/2] - Updated code to remove dependency on Cryptodome, which has not been active for a lot of time. Using Cryptography instead. --- requirements.txt | 1 - tests/test_vaas.py | 41 +-------------------------------------- vcert/connection_cloud.py | 5 ++--- vcert/pem.py | 22 --------------------- 4 files changed, 3 insertions(+), 66 deletions(-) diff --git a/requirements.txt b/requirements.txt index ff2d830..b52dc7c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,4 +7,3 @@ enum34;python_version<'3.4' future ruamel.yaml<0.17 pynacl>=1.4.0 -pycryptodome>=3.11.0 diff --git a/tests/test_vaas.py b/tests/test_vaas.py index ac31697..4f4b50f 100644 --- a/tests/test_vaas.py +++ b/tests/test_vaas.py @@ -27,43 +27,9 @@ from test_env import CLOUD_ZONE, CLOUD_APIKEY, CLOUD_URL, RANDOM_DOMAIN from test_utils import random_word, enroll, renew, renew_by_thumbprint, renew_without_key_reuse, simple_enroll from vcert import CloudConnection, KeyType, CertificateRequest, CustomField, logger, CSR_ORIGIN_SERVICE -from vcert.pem import pkcs8_to_pem_private_key log = logger.get_child("test-vaas") -p8_key = """ ------BEGIN ENCRYPTED PRIVATE KEY----- -MIIFLTBXBgkqhkiG9w0BBQ0wSjApBgkqhkiG9w0BBQwwHAQIPLsOsD8egf4CAicQ -MAwGCCqGSIb3DQIJBQAwHQYJYIZIAWUDBAEqBBCGU0yCgxPiFpL/l+F5/wmzBIIE -0B2QHY6GoIj204ovABzvhgu6DPt3qvMtxWUhparQoOirf6IWpgPs5yIEVYzm33vb -I0yWb4DTTLQc0k+s1e1whDkhEDyeZ0GGHzHu2LHnsLLUKbUW9wsod9GlQ61IACnr -i8ehxgAAyYAB/PIcwpuF+nzyRHx9bud/916DYQ7Y/DWmCpSHB1/O9vkY1RZJjOqc -XrmzVqL+FBWjPzXk5FfWkRoVIUWsB/yWaP4ZYb5o8xgcAvvyXeofhum9vmiRlRB+ -ii6SH7lgFE7BL1qZPnNCjFeBbDv9OryR1h3FbGnNaKJGOrlA1sirg0lMyi2zsaBe -M0B8y8AVnU8q5JnToIFFo4BnimK7jXPspQ/opu9IaZDWKf3wbwUiC+IfytlelVpT -lMTLvYPPypsjqhInDRrPbdlmx1WN9bfHdkwzRm3x4UuAKTcQKX/5s8AdNDTRx4Kv -UZ2wLylEQcCWYWm3m+YL0PcsnUX301dmKHGG0ub/CwIFO1GYI9+Eb1azsS3h+fx7 -Ec4rOzZ4Q5h1HWnV3P7CVqyq4hSqJ3f7DMThCgW0up2woCMZnZqQcg4+VUYH1oFg -YvrCV0N4W9woHWS6v0HDhMAR9HadUAvDetljrp1ygiPGAe+giNF9AZ+7+MTVwT/M -YEcDzxCrKWQ57KdxnZL2cVELx0pihmqEs0jvh++YShszE39S/Pk58BqFLaS+/eAy -42fXlih2FE+Pj5dTrxY3wY759SOZy+AlHytd3PkYHvCd7qgYTCUo+y8Gd2tIVW2g -pwx59953QhCoyPFMvm97pkHi9IMLLoBobdngV2FKzj3lch1V8iujqNdA8W0Zny0S -6KQgSn6GvW/EVVVIckS41uoKxTJVnCNsI8jpBa4/bUvZzx8s6gDHSZqTFgh+jssu -8rI8nGRsFa3+ynoR3rFcaRFi733BjPHdCYlEYLxfPwhpQ5wYAU2NCMJbCkiakPSR -ywNbIhxJhdmhD8zbNifLaXUB/iFhbW4e+QcZZNo8im/ty0J3OSj9OqNIAAP8k7CV -MdQbI4yu09hDPKIw7YBS+R5pmOjiuQOL4mzeOb8MN4i4AHCUiH/K63pVDqkT1yNM -rIIFjljg1loosubHTU59vWKE/OPuY+BFviK49rw0xGyPdHECgkpS6/CPfzIEkr8U -RsNxRVW/fjTdSw3YaqlrTNEN6tLuddq2R/rMvyXlzhcGB2H81V8ZgJ4bqTgfUdH4 -iAv49PCCIClPQYD4W1HzuSFlNwT4Cy29QgSjw0bHFmvmNvfInidBH5DoJeMovMsy -OROtIuCG0QZjfIcsreU7gcbUvwPNB+nQaDA3IA7fkYmE1xvj38YMIimDRWFKN5Q6 -f67kAGgkFcBlKGh6J+iGNIMscGkRbPRlNHtefE/vaAMHNUBfNxuVk6ylf2Hj2YC9 -gXSp4S0pq5RUvt8KPzeba0mtNlmuFSK9ZfOOu/eBIGvHwA7+HWG4ogTpER1IXbnE -ZzcdVwYponiGL/dtKZIyibxxEUOHjoM9XyoopE9wFq/kQXEgVDCFLdyPAxFS7WA+ -NRqtgX8X41i/zQ72ZvM+bHrq2gk2OnDJ4jyDTBLBQezdOX4rLrWvzIcqh7hmWC1L -KrcsYl3EZcK4zmMgSTTCgEJGKJsgClqUh6TS7atxgIjr ------END ENCRYPTED PRIVATE KEY----- -""" - class TestCloudMethods(unittest.TestCase): def __init__(self, *args, **kwargs): @@ -198,9 +164,4 @@ def test_cloud_enroll_service_generated_csr(self): assert t1 == t2 output = cert_object.as_pkcs12('FooBarPass123') - log.info("PKCS12 created successfully:\n%s" % output) - - def test_cloud_parse_key_p8_to_p12(self): - passphrase = 'FooBarPass123' - pem_pk = pkcs8_to_pem_private_key(self.p8_key, passphrase) - log.info("PEM Private Key is: %s" % pem_pk) + log.info("PKCS12 created successfully for certificate with CN: %s" % cn) diff --git a/vcert/connection_cloud.py b/vcert/connection_cloud.py index bd5027b..b6ad859 100644 --- a/vcert/connection_cloud.py +++ b/vcert/connection_cloud.py @@ -33,7 +33,7 @@ CertificateRenewError, VenafiError, RetrieveCertificateTimeoutError) from .http import HTTPStatus from .logger import get_child -from .pem import parse_pem, pkcs8_to_pem_private_key, Certificate +from .pem import parse_pem, Certificate from .policy import PolicySpecification from .policy.pm_cloud import (build_policy_spec, validate_policy_spec, AccountDetails, build_cit_request, build_user, UserDetails, build_company, build_apikey, build_app_update_request, get_ca_info, @@ -864,5 +864,4 @@ def _retrieve_service_generated_cert(self, request, dek_info): raise VenafiError cert, chain, private_key = zip_to_pem(data, request.chain_option) - pem_private_key = pkcs8_to_pem_private_key(private_key=private_key, passphrase=request.key_password) - return Certificate(cert=cert, chain=chain, key=pem_private_key) + return Certificate(cert=cert, chain=chain, key=private_key) diff --git a/vcert/pem.py b/vcert/pem.py index c831ff4..004b094 100644 --- a/vcert/pem.py +++ b/vcert/pem.py @@ -21,7 +21,6 @@ import string import random -from Crypto.IO import PKCS8, PEM from cryptography import x509 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization @@ -122,27 +121,6 @@ def as_pkcs12(self, passphrase=None): return output -def pkcs8_to_pem_private_key(private_key, passphrase): - """ - - :param str private_key: - :param str passphrase: - :rtype: str - """ - b_passphrase = passphrase.encode() - - b_pem, marker, decrypted = PEM.decode(private_key.encode(), b_passphrase) - oid, private_key_der, _ = PKCS8.unwrap(b_pem, b_passphrase) - key = serialization.load_der_private_key(data=private_key_der, password=None, backend=default_backend()) - encryption = serialization.BestAvailableEncryption(b_passphrase) - private_key_pem = key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.TraditionalOpenSSL, - encryption_algorithm=encryption - ) - return private_key_pem.decode() - - def random_word(length): letters = string.ascii_lowercase return ''.join(random.choice(letters) for _ in range(length)) # nosec