Skip to content

Commit

Permalink
Update entrust_cagw_certificate.py
Browse files Browse the repository at this point in the history
  • Loading branch information
sapnajainEntrust authored Nov 15, 2023
1 parent 7d6dc54 commit b343748
Showing 1 changed file with 118 additions and 118 deletions.
236 changes: 118 additions & 118 deletions plugins/modules/entrust_cagw_certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,11 +478,11 @@
'''

from ansible_collections.community.crypto.plugins.module_utils.entrust_cagw.api import (
cagw_client_argument_spec,
CAGWClient,
RestOperationException,
SessionConfigurationException,
)
cagw_client_argument_spec,
CAGWClient,
RestOperationException,
SessionConfigurationException,
)

import datetime
import os
Expand All @@ -496,12 +496,12 @@
from ansible.module_utils._text import to_native, to_bytes

from ansible_collections.community.crypto.plugins.module_utils.io import (
write_file,
)
write_file,
)

from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
load_certificate,
)
load_certificate,
)

CRYPTOGRAPHY_IMP_ERR = None
try:
Expand Down Expand Up @@ -553,25 +553,25 @@ def __init__(self, module):
# Instantiate the CAGW client
try:
self.cagw_client = CAGWClient(
cagw_api_cert=module.params['cagw_api_client_cert_path'],
cagw_api_cert_key=module.params['cagw_api_client_cert_key_path'],
cagw_api_specification_path=module.params['cagw_api_specification_path']
)
cagw_api_cert=module.params['cagw_api_client_cert_path'],
cagw_api_cert_key=module.params['cagw_api_client_cert_key_path'],
cagw_api_specification_path=module.params['cagw_api_specification_path']
)
except SessionConfigurationException as e:
module.fail_json(msg='Failed to initialize Entrust Provider: {0}'.format(to_native(e)))

def write_cert_to_file(self):
fh = open(self.path, "w")
try:
fh.write(self.cert)
fh.write(self.cert)
finally:
fh.close()
fh.close()

def update_csr(self, module):
body = {}
csr = ''
with open(module.params['csr']) as csr_file:
lines = csr_file.readlines()
lines = csr_file.readlines()
# Remove first line
lines = lines[1:]
# Remove last line
Expand Down Expand Up @@ -660,17 +660,17 @@ def set_cert_details(self, module):
self.cert_days = calculate_cert_days(self.cert_details.get('validityPeriod'))

if self.request_type == 'new':
self.cert = self.cert_details.get('body')
self.cert = self.cert_details.get('body')
elif self.request_type == 'get':
self.cert = self.cert_details.get('certificateData')

def check(self, module):
if self.cert:
serial_number = "{0:X}".format(self.cert.serial_number)
result = self.cagw_client.GetCertificate(ca_id=module.params['certificate_authority_id'],
serial_no=serial_number,
validate_certs=module.params['validate_certs'],
host=module.params['host'], port=module.params['port'])
serial_no=serial_number,
validate_certs=module.params['validate_certs'],
host=module.params['host'], port=module.params['port'])
self.cert_details = result.get('certificate')
# Changing the request type to get since we are getting the certificate here on the basis of
# serial number and we need to populate the cert details on the get response only.
Expand All @@ -682,10 +682,10 @@ def check(self, module):
module_params_connector_name = module.params['connector_name']
#ECS CA getCertificate api through CAGW doesn't return status of the certificate
if module_params_connector_name == 'SM':
self.cert_status = self.cert_details.get('status')
self.cert_status = self.cert_details.get('status')
if self.cert_status == 'EXPIRED' or self.cert_status == 'expired' or self.cert_status == 'SUSPENDED' or self.cert_status == 'suspended' or self.cert_status == 'REVOKED' or self.cert_status == 'revoked' or self.cert_status == 'held':

return False
return False

if self.cert_days < module.params['remaining_days']:
return False
Expand All @@ -710,8 +710,8 @@ def request_cert(self, module):
if module_params_connector_name == 'ECS':
body.update(self.update_properties(module))
result = self.cagw_client.NewCertRequest(Body=body, ca_id=module.params['certificate_authority_id'],
validate_certs=module.params['validate_certs'],
host=module.params['host'], port=module.params['port'])
validate_certs=module.params['validate_certs'],
host=module.params['host'], port=module.params['port'])
self.cert_details = result.get('enrollment')
self.set_cert_details(module)
if module_params_format == 'X509':
Expand All @@ -723,15 +723,15 @@ def request_cert(self, module):
elif self.request_type == 'action':
body.update(self.update_action(module))
result = self.cagw_client.ActionOnCertificate(Body=body, ca_id=module.params['certificate_authority_id'],
serial_no=module.params['serial_no'],
validate_certs=module.params['validate_certs'],
host=module.params['host'], port=module.params['port'])
serial_no=module.params['serial_no'],
validate_certs=module.params['validate_certs'],
host=module.params['host'], port=module.params['port'])
self.cert_details = result.get('action')
elif self.request_type == 'get':
result = self.cagw_client.GetCertificate(ca_id=module.params['certificate_authority_id'],
serial_no=module.params['serial_no'],
validate_certs=module.params['validate_certs'],
host=module.params['host'], port=module.params['port'])
serial_no=module.params['serial_no'],
validate_certs=module.params['validate_certs'],
host=module.params['host'], port=module.params['port'])
self.cert_details = result.get('certificate')
self.set_cert_details(module)
self.cert = begin_line + self.cert + end_line
Expand All @@ -745,109 +745,109 @@ def request_cert(self, module):

def dump(self):
result = {
'changed': self.changed,
'filename': self.path,
'cert_status': self.cert_status,
'serialNumber': self.serialNumber,
'cert_days': self.cert_days,
'cert_details': self.cert_details,
'message': self.message,
}
'changed': self.changed,
'filename': self.path,
'cert_status': self.cert_status,
'serialNumber': self.serialNumber,
'cert_days': self.cert_days,
'cert_details': self.cert_details,
'message': self.message,
}
return result

def custom_fields_spec():
return dict(
text1=dict(type='str'),
text2=dict(type='str'),
text3=dict(type='str'),
text4=dict(type='str'),
text5=dict(type='str'),
text6=dict(type='str'),
text7=dict(type='str'),
text8=dict(type='str'),
text9=dict(type='str'),
text10=dict(type='str'),
text11=dict(type='str'),
text12=dict(type='str'),
text13=dict(type='str'),
text14=dict(type='str'),
text15=dict(type='str'),
number1=dict(type='float'),
number2=dict(type='float'),
number3=dict(type='float'),
number4=dict(type='float'),
number5=dict(type='float'),
date1=dict(type='str'),
date2=dict(type='str'),
date3=dict(type='str'),
date4=dict(type='str'),
date5=dict(type='str'),
email1=dict(type='str'),
email2=dict(type='str'),
email3=dict(type='str'),
email4=dict(type='str'),
email5=dict(type='str'),
dropdown1=dict(type='str'),
dropdown2=dict(type='str'),
dropdown3=dict(type='str'),
dropdown4=dict(type='str'),
dropdown5=dict(type='str'),
)
text1=dict(type='str'),
text2=dict(type='str'),
text3=dict(type='str'),
text4=dict(type='str'),
text5=dict(type='str'),
text6=dict(type='str'),
text7=dict(type='str'),
text8=dict(type='str'),
text9=dict(type='str'),
text10=dict(type='str'),
text11=dict(type='str'),
text12=dict(type='str'),
text13=dict(type='str'),
text14=dict(type='str'),
text15=dict(type='str'),
number1=dict(type='float'),
number2=dict(type='float'),
number3=dict(type='float'),
number4=dict(type='float'),
number5=dict(type='float'),
date1=dict(type='str'),
date2=dict(type='str'),
date3=dict(type='str'),
date4=dict(type='str'),
date5=dict(type='str'),
email1=dict(type='str'),
email2=dict(type='str'),
email3=dict(type='str'),
email4=dict(type='str'),
email5=dict(type='str'),
dropdown1=dict(type='str'),
dropdown2=dict(type='str'),
dropdown3=dict(type='str'),
dropdown4=dict(type='str'),
dropdown5=dict(type='str'),
)

def subject_alt_name_spec():
return dict(
dNSName=dict(type='str'),
iPAddress=dict(type='str'),
directoryName=dict(type='str'),
uniformResourceIdentifier=dict(type='str'),
rfc822Name=dict(type='str'),
)
dNSName=dict(type='str'),
iPAddress=dict(type='str'),
directoryName=dict(type='str'),
uniformResourceIdentifier=dict(type='str'),
rfc822Name=dict(type='str'),
)

def entrust_cagw_certificate_argument_spec():
return dict(
force=dict(type='bool', default=False),
path=dict(type='path'),
request_type=dict(type='str', required=True, choices=['new', 'action', 'get']),
action_type=dict(type='str', choices=['RevokeAction', 'HoldAction', 'UnholdAction']),
action_reason=dict(type='str'),
enrollment_format=dict(type='str', choices=['X509', 'PKCS12']),
host=dict(type='str', required=True),
port=dict(type='str', default=443),
certificate_authority_id=dict(type='str', required=True),
serial_no=dict(type='str'),
p12_protection_password=dict(type='str', no_log=True),
dn=dict(type='str'),
certificate_profile_id=dict(type='str'),
csr=dict(type='path'),
remaining_days=dict(type='int', default=30),
connector_name=dict(type='str', choices=['SM', 'ECS', 'PKIaaS', 'MSCA']),
tracking_info=dict(type='str'),
requester_name=dict(type='str'),
requester_email=dict(type='str'),
requester_phone=dict(type='str'),
additional_emails=dict(type='list', elements='str'),
custom_fields=dict(type='dict', default=None, options=custom_fields_spec()),
subject_alt_name=dict(type='dict', default=None, options=subject_alt_name_spec()),
validate_certs=dict(type='bool', default=True),
return dict(
force=dict(type='bool', default=False),
path=dict(type='path'),
request_type=dict(type='str', required=True, choices=['new', 'action', 'get']),
action_type=dict(type='str', choices=['RevokeAction', 'HoldAction', 'UnholdAction']),
action_reason=dict(type='str'),
enrollment_format=dict(type='str', choices=['X509', 'PKCS12']),
host=dict(type='str', required=True),
port=dict(type='str', default=443),
certificate_authority_id=dict(type='str', required=True),
serial_no=dict(type='str'),
p12_protection_password=dict(type='str', no_log=True),
dn=dict(type='str'),
certificate_profile_id=dict(type='str'),
csr=dict(type='path'),
remaining_days=dict(type='int', default=30),
connector_name=dict(type='str', choices=['SM', 'ECS', 'PKIaaS', 'MSCA']),
tracking_info=dict(type='str'),
requester_name=dict(type='str'),
requester_email=dict(type='str'),
requester_phone=dict(type='str'),
additional_emails=dict(type='list', elements='str'),
custom_fields=dict(type='dict', default=None, options=custom_fields_spec()),
subject_alt_name=dict(type='dict', default=None, options=subject_alt_name_spec()),
validate_certs=dict(type='bool', default=True),
)

def main():
cagw_argument_spec = cagw_client_argument_spec()
cagw_argument_spec = cagw_client_argument_spec()
cagw_argument_spec.update(entrust_cagw_certificate_argument_spec())
module = AnsibleModule(
argument_spec=cagw_argument_spec,
required_if=(
['request_type', 'new', ['path', 'enrollment_format', 'certificate_profile_id', 'connector_name']],
['request_type', 'action', ['action_type', 'serial_no', 'action_reason']],
['request_type', 'get', ['path', 'serial_no']],
['enrollment_format', 'X509', ['csr']],
['enrollment_format', 'PKCS12', ['p12_protection_password', 'dn']],
['connector_name', 'ECS', ['requester_name', 'requester_email', 'requester_phone']],
)
)
argument_spec=cagw_argument_spec,
required_if=(
['request_type', 'new', ['path', 'enrollment_format', 'certificate_profile_id', 'connector_name']],
['request_type', 'action', ['action_type', 'serial_no', 'action_reason']],
['request_type', 'get', ['path', 'serial_no']],
['enrollment_format', 'X509', ['csr']],
['enrollment_format', 'PKCS12', ['p12_protection_password', 'dn']],
['connector_name', 'ECS', ['requester_name', 'requester_email', 'requester_phone']],
)
)
if not CRYPTOGRAPHY_FOUND or CRYPTOGRAPHY_VERSION < LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION):
module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)),
exception=CRYPTOGRAPHY_IMP_ERR)
exception=CRYPTOGRAPHY_IMP_ERR)

# A new x509 based enrollment request must have the csr field
if module.params['request_type'] == 'new':
Expand All @@ -856,7 +856,7 @@ def main():
module_params_csr = module.params['csr']
if not os.path.exists(module_params_csr):
module.fail_json(msg='The csr field of {0} was not a valid path. csr is required when request_type={1} with enrollment_format={2}' .format(
module_params_csr, module.params['request_type'], module_params_format))
module_params_csr, module.params['request_type'], module_params_format))

certificate = CagwCertificate(module)
certificate.request_cert(module)
Expand Down

0 comments on commit b343748

Please sign in to comment.