From b34374820d9a4f7f14c50801b49bd7cf054387de Mon Sep 17 00:00:00 2001
From: sapnajainEntrust <149614151+sapnajainEntrust@users.noreply.github.com>
Date: Wed, 15 Nov 2023 14:14:18 -0500
Subject: [PATCH] Update entrust_cagw_certificate.py
---
 plugins/modules/entrust_cagw_certificate.py | 236 ++++++++++----------
 1 file changed, 118 insertions(+), 118 deletions(-)
diff --git a/plugins/modules/entrust_cagw_certificate.py b/plugins/modules/entrust_cagw_certificate.py
index 036176ea9..3c2179be7 100644
--- a/plugins/modules/entrust_cagw_certificate.py
+++ b/plugins/modules/entrust_cagw_certificate.py
@@ -478,11 +478,11 @@
 '''
 from ansible_collections.community.crypto.plugins.module_utils.entrust_cagw.api import (
- cagw_client_argument_spec,
- CAGWClient,
- RestOperationException,
- SessionConfigurationException,
- )
+ cagw_client_argument_spec,
+ CAGWClient,
+ RestOperationException,
+ SessionConfigurationException,
+)
 import datetime
 import os
@@ -496,12 +496,12 @@
 from ansible.module_utils._text import to_native, to_bytes
 from ansible_collections.community.crypto.plugins.module_utils.io import (
- write_file,
- )
+ write_file,
+)
 from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
- load_certificate,
- )
+ load_certificate,
+)
 CRYPTOGRAPHY_IMP_ERR = None
 try:
@@ -553,25 +553,25 @@ def __init__(self, module):
 # Instantiate the CAGW client
 try:
 self.cagw_client = CAGWClient(
- cagw_api_cert=module.params['cagw_api_client_cert_path'],
- cagw_api_cert_key=module.params['cagw_api_client_cert_key_path'],
- cagw_api_specification_path=module.params['cagw_api_specification_path']
- )
+ cagw_api_cert=module.params['cagw_api_client_cert_path'],
+ cagw_api_cert_key=module.params['cagw_api_client_cert_key_path'],
+ cagw_api_specification_path=module.params['cagw_api_specification_path']
+ )
 except SessionConfigurationException as e:
 module.fail_json(msg='Failed to initialize Entrust Provider: {0}'.format(to_native(e)))
 def write_cert_to_file(self):
 fh = open(self.path, "w")
 try:
- fh.write(self.cert)
+ fh.write(self.cert)
 finally:
- fh.close()
+ fh.close()
 def update_csr(self, module):
 body = {}
 csr = ''
 with open(module.params['csr']) as csr_file:
- lines = csr_file.readlines()
+ lines = csr_file.readlines()
 # Remove first line
 lines = lines[1:]
 # Remove last line
@@ -660,7 +660,7 @@ def set_cert_details(self, module):
 self.cert_days = calculate_cert_days(self.cert_details.get('validityPeriod'))
 if self.request_type == 'new':
- self.cert = self.cert_details.get('body')
+ self.cert = self.cert_details.get('body')
 elif self.request_type == 'get':
 self.cert = self.cert_details.get('certificateData')
@@ -668,9 +668,9 @@ def check(self, module):
 if self.cert:
 serial_number = "{0:X}".format(self.cert.serial_number)
 result = self.cagw_client.GetCertificate(ca_id=module.params['certificate_authority_id'],
- serial_no=serial_number,
- validate_certs=module.params['validate_certs'],
- host=module.params['host'], port=module.params['port'])
+ serial_no=serial_number,
+ validate_certs=module.params['validate_certs'],
+ host=module.params['host'], port=module.params['port'])
 self.cert_details = result.get('certificate')
 # Changing the request type to get since we are getting the certificate here on the basis of
 # serial number and we need to populate the cert details on the get response only.
@@ -682,10 +682,10 @@ def check(self, module):
 module_params_connector_name = module.params['connector_name']
 #ECS CA getCertificate api through CAGW doesn't return status of the certificate
 if module_params_connector_name == 'SM':
- self.cert_status = self.cert_details.get('status')
+ self.cert_status = self.cert_details.get('status')
 if self.cert_status == 'EXPIRED' or self.cert_status == 'expired' or self.cert_status == 'SUSPENDED' or self.cert_status == 'suspended' or self.cert_status == 'REVOKED' or self.cert_status == 'revoked' or self.cert_status == 'held':
- return False
+ return False
 if self.cert_days < module.params['remaining_days']:
 return False
@@ -710,8 +710,8 @@ def request_cert(self, module):
 if module_params_connector_name == 'ECS':
 body.update(self.update_properties(module))
 result = self.cagw_client.NewCertRequest(Body=body, ca_id=module.params['certificate_authority_id'],
- validate_certs=module.params['validate_certs'],
- host=module.params['host'], port=module.params['port'])
+ validate_certs=module.params['validate_certs'],
+ host=module.params['host'], port=module.params['port'])
 self.cert_details = result.get('enrollment')
 self.set_cert_details(module)
 if module_params_format == 'X509':
@@ -723,15 +723,15 @@ def request_cert(self, module):
 elif self.request_type == 'action':
 body.update(self.update_action(module))
 result = self.cagw_client.ActionOnCertificate(Body=body, ca_id=module.params['certificate_authority_id'],
- serial_no=module.params['serial_no'],
- validate_certs=module.params['validate_certs'],
- host=module.params['host'], port=module.params['port'])
+ serial_no=module.params['serial_no'],
+ validate_certs=module.params['validate_certs'],
+ host=module.params['host'], port=module.params['port'])
 self.cert_details = result.get('action')
 elif self.request_type == 'get':
 result = self.cagw_client.GetCertificate(ca_id=module.params['certificate_authority_id'],
- serial_no=module.params['serial_no'],
- validate_certs=module.params['validate_certs'],
- host=module.params['host'], port=module.params['port'])
+ serial_no=module.params['serial_no'],
+ validate_certs=module.params['validate_certs'],
+ host=module.params['host'], port=module.params['port'])
 self.cert_details = result.get('certificate')
 self.set_cert_details(module)
 self.cert = begin_line + self.cert + end_line
@@ -745,109 +745,109 @@ def request_cert(self, module):
 def dump(self):
 result = {
- 'changed': self.changed,
- 'filename': self.path,
- 'cert_status': self.cert_status,
- 'serialNumber': self.serialNumber,
- 'cert_days': self.cert_days,
- 'cert_details': self.cert_details,
- 'message': self.message,
- }
+ 'changed': self.changed,
+ 'filename': self.path,
+ 'cert_status': self.cert_status,
+ 'serialNumber': self.serialNumber,
+ 'cert_days': self.cert_days,
+ 'cert_details': self.cert_details,
+ 'message': self.message,
+ }
 return result
 def custom_fields_spec():
 return dict(
- text1=dict(type='str'),
- text2=dict(type='str'),
- text3=dict(type='str'),
- text4=dict(type='str'),
- text5=dict(type='str'),
- text6=dict(type='str'),
- text7=dict(type='str'),
- text8=dict(type='str'),
- text9=dict(type='str'),
- text10=dict(type='str'),
- text11=dict(type='str'),
- text12=dict(type='str'),
- text13=dict(type='str'),
- text14=dict(type='str'),
- text15=dict(type='str'),
- number1=dict(type='float'),
- number2=dict(type='float'),
- number3=dict(type='float'),
- number4=dict(type='float'),
- number5=dict(type='float'),
- date1=dict(type='str'),
- date2=dict(type='str'),
- date3=dict(type='str'),
- date4=dict(type='str'),
- date5=dict(type='str'),
- email1=dict(type='str'),
- email2=dict(type='str'),
- email3=dict(type='str'),
- email4=dict(type='str'),
- email5=dict(type='str'),
- dropdown1=dict(type='str'),
- dropdown2=dict(type='str'),
- dropdown3=dict(type='str'),
- dropdown4=dict(type='str'),
- dropdown5=dict(type='str'),
- )
+ text1=dict(type='str'),
+ text2=dict(type='str'),
+ text3=dict(type='str'),
+ text4=dict(type='str'),
+ text5=dict(type='str'),
+ text6=dict(type='str'),
+ text7=dict(type='str'),
+ text8=dict(type='str'),
+ text9=dict(type='str'),
+ text10=dict(type='str'),
+ text11=dict(type='str'),
+ text12=dict(type='str'),
+ text13=dict(type='str'),
+ text14=dict(type='str'),
+ text15=dict(type='str'),
+ number1=dict(type='float'),
+ number2=dict(type='float'),
+ number3=dict(type='float'),
+ number4=dict(type='float'),
+ number5=dict(type='float'),
+ date1=dict(type='str'),
+ date2=dict(type='str'),
+ date3=dict(type='str'),
+ date4=dict(type='str'),
+ date5=dict(type='str'),
+ email1=dict(type='str'),
+ email2=dict(type='str'),
+ email3=dict(type='str'),
+ email4=dict(type='str'),
+ email5=dict(type='str'),
+ dropdown1=dict(type='str'),
+ dropdown2=dict(type='str'),
+ dropdown3=dict(type='str'),
+ dropdown4=dict(type='str'),
+ dropdown5=dict(type='str'),
+ )
 def subject_alt_name_spec():
 return dict(
- dNSName=dict(type='str'),
- iPAddress=dict(type='str'),
- directoryName=dict(type='str'),
- uniformResourceIdentifier=dict(type='str'),
- rfc822Name=dict(type='str'),
- )
+ dNSName=dict(type='str'),
+ iPAddress=dict(type='str'),
+ directoryName=dict(type='str'),
+ uniformResourceIdentifier=dict(type='str'),
+ rfc822Name=dict(type='str'),
+ )
 def entrust_cagw_certificate_argument_spec():
- return dict(
- force=dict(type='bool', default=False),
- path=dict(type='path'),
- request_type=dict(type='str', required=True, choices=['new', 'action', 'get']),
- action_type=dict(type='str', choices=['RevokeAction', 'HoldAction', 'UnholdAction']),
- action_reason=dict(type='str'),
- enrollment_format=dict(type='str', choices=['X509', 'PKCS12']),
- host=dict(type='str', required=True),
- port=dict(type='str', default=443),
- certificate_authority_id=dict(type='str', required=True),
- serial_no=dict(type='str'),
- p12_protection_password=dict(type='str', no_log=True),
- dn=dict(type='str'),
- certificate_profile_id=dict(type='str'),
- csr=dict(type='path'),
- remaining_days=dict(type='int', default=30),
- connector_name=dict(type='str', choices=['SM', 'ECS', 'PKIaaS', 'MSCA']),
- tracking_info=dict(type='str'),
- requester_name=dict(type='str'),
- requester_email=dict(type='str'),
- requester_phone=dict(type='str'),
- additional_emails=dict(type='list', elements='str'),
- custom_fields=dict(type='dict', default=None, options=custom_fields_spec()),
- subject_alt_name=dict(type='dict', default=None, options=subject_alt_name_spec()),
- validate_certs=dict(type='bool', default=True),
+ return dict(
+ force=dict(type='bool', default=False),
+ path=dict(type='path'),
+ request_type=dict(type='str', required=True, choices=['new', 'action', 'get']),
+ action_type=dict(type='str', choices=['RevokeAction', 'HoldAction', 'UnholdAction']),
+ action_reason=dict(type='str'),
+ enrollment_format=dict(type='str', choices=['X509', 'PKCS12']),
+ host=dict(type='str', required=True),
+ port=dict(type='str', default=443),
+ certificate_authority_id=dict(type='str', required=True),
+ serial_no=dict(type='str'),
+ p12_protection_password=dict(type='str', no_log=True),
+ dn=dict(type='str'),
+ certificate_profile_id=dict(type='str'),
+ csr=dict(type='path'),
+ remaining_days=dict(type='int', default=30),
+ connector_name=dict(type='str', choices=['SM', 'ECS', 'PKIaaS', 'MSCA']),
+ tracking_info=dict(type='str'),
+ requester_name=dict(type='str'),
+ requester_email=dict(type='str'),
+ requester_phone=dict(type='str'),
+ additional_emails=dict(type='list', elements='str'),
+ custom_fields=dict(type='dict', default=None, options=custom_fields_spec()),
+ subject_alt_name=dict(type='dict', default=None, options=subject_alt_name_spec()),
+ validate_certs=dict(type='bool', default=True),
 )
 def main():
- cagw_argument_spec = cagw_client_argument_spec()
+ cagw_argument_spec = cagw_client_argument_spec()
 cagw_argument_spec.update(entrust_cagw_certificate_argument_spec())
 module = AnsibleModule(
- argument_spec=cagw_argument_spec,
- required_if=(
- ['request_type', 'new', ['path', 'enrollment_format', 'certificate_profile_id', 'connector_name']],
- ['request_type', 'action', ['action_type', 'serial_no', 'action_reason']],
- ['request_type', 'get', ['path', 'serial_no']],
- ['enrollment_format', 'X509', ['csr']],
- ['enrollment_format', 'PKCS12', ['p12_protection_password', 'dn']],
- ['connector_name', 'ECS', ['requester_name', 'requester_email', 'requester_phone']],
- )
- )
+ argument_spec=cagw_argument_spec,
+ required_if=(
+ ['request_type', 'new', ['path', 'enrollment_format', 'certificate_profile_id', 'connector_name']],
+ ['request_type', 'action', ['action_type', 'serial_no', 'action_reason']],
+ ['request_type', 'get', ['path', 'serial_no']],
+ ['enrollment_format', 'X509', ['csr']],
+ ['enrollment_format', 'PKCS12', ['p12_protection_password', 'dn']],
+ ['connector_name', 'ECS', ['requester_name', 'requester_email', 'requester_phone']],
+ )
+ )
 if not CRYPTOGRAPHY_FOUND or CRYPTOGRAPHY_VERSION < LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION):
 module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)),
- exception=CRYPTOGRAPHY_IMP_ERR)
+ exception=CRYPTOGRAPHY_IMP_ERR)
 # A new x509 based enrollment request must have the csr field
 if module.params['request_type'] == 'new':
@@ -856,7 +856,7 @@ def main():
 module_params_csr = module.params['csr']
 if not os.path.exists(module_params_csr):
 module.fail_json(msg='The csr field of {0} was not a valid path. csr is required when request_type={1} with enrollment_format={2}' .format(
- module_params_csr, module.params['request_type'], module_params_format))
+ module_params_csr, module.params['request_type'], module_params_format))
 certificate = CagwCertificate(module)
 certificate.request_cert(module)