Skip to content

Commit

Permalink
Merge pull request #142 from Pmaraveyias/retire-cert
Browse files Browse the repository at this point in the history
Retire cert
  • Loading branch information
luispresuelVenafi authored Feb 6, 2024
2 parents 51abc35 + 6a5a459 commit c5b198d
Show file tree
Hide file tree
Showing 13 changed files with 252 additions and 29 deletions.
1 change: 1 addition & 0 deletions requirements-build.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pip==23.3.2
pytest==7.4.3
pytest-cov==4.1.0
safety==2.3.5
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
requests==2.31.0
python-dateutil==2.8.2
cryptography==40.0.2
cryptography==42.0.2
six==1.16.0
ruamel.yaml==0.18.5
pynacl==1.5.0
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
url="https://github.com/Venafi/vcert-python",
packages=['vcert', 'vcert.parser', 'vcert.policy'],
install_requires=['requests==2.31.0', 'python-dateutil==2.8.2', 'certvalidator<=0.11.1', 'six==1.16.0',
'cryptography==40.0.2', 'ruamel.yaml==0.17.31', 'pynacl==1.5.0'],
'cryptography==42.0.2', 'ruamel.yaml==0.17.31', 'pynacl==1.5.0'],
description='Python client library for Venafi Trust Protection Platform and Venafi Cloud.',
long_description=long_description,
long_description_content_type="text/markdown",
Expand Down
1 change: 1 addition & 0 deletions tests/test_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
CLOUD_URL = environ.get('CLOUD_URL')
CLOUD_APIKEY = environ.get('CLOUD_APIKEY')
CLOUD_ZONE = environ.get('CLOUD_ZONE')
VAAS_ZONE_ONLY_EC = environ.get('VAAS_ZONE_ONLY_EC')
CLOUD_TEAM = environ.get('CLOUD_TEAM')

TPP_PM_ROOT = environ.get('TPP_PM_ROOT')
Expand Down
4 changes: 2 additions & 2 deletions tests/test_local_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,10 @@ def test_parse_tpp_policy1(self):
conn = TPPConnection(url="http://example.com/", user="", password="")
raw_data = json.loads(POLICY_TPP1)
p = conn._parse_zone_config_to_policy(raw_data)
self.assertEqual(len(p.key_types), 7)
self.assertEqual(len(p.key_types), 8)
raw_data['Policy']['KeyPair']['KeySize']['Locked'] = True
p = conn._parse_zone_config_to_policy(raw_data)
self.assertEqual(len(p.key_types), 4)
self.assertEqual(len(p.key_types), 5)
raw_data['Policy']['KeyPair']['KeyAlgorithm']['Locked'] = True
p = conn._parse_zone_config_to_policy(raw_data)
self.assertEqual(len(p.key_types), 1)
Expand Down
20 changes: 19 additions & 1 deletion tests/test_tpp_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from assets import TEST_KEY_ECDSA, TEST_KEY_RSA_4096, TEST_KEY_RSA_2048_ENCRYPTED
from test_env import TPP_ZONE, TPP_ZONE_ECDSA, TPP_USER, TPP_PASSWORD, TPP_TOKEN_URL
from test_utils import (random_word, enroll, renew, renew_by_thumbprint, renew_without_key_reuse,
enroll_with_zone_update, simple_enroll)
enroll_with_zone_update, simple_enroll, retire_by_id, retire_by_thumbprint)
from vcert import (CustomField, KeyType, RevocationRequest, CertificateRequest, IssuerHint, logger, TPPTokenConnection)
from vcert.errors import ClientBadData, ServerUnexptedBehavior

Expand Down Expand Up @@ -175,6 +175,7 @@ def test_token_revoke_normal(self):
with self.assertRaises(Exception):
self.tpp_conn.renew_cert(req)


def test_token_revoke_without_disable(self):
req, cert = simple_enroll(self.tpp_conn, self.tpp_zone)
rev_req = RevocationRequest(req_id=req.id, disable=False)
Expand Down Expand Up @@ -267,3 +268,20 @@ def test_revoke_access_token(self):
cn = f"{random_word(10)}.venafi.example.com"
with self.assertRaises(Exception):
enroll(self.tpp_conn, self.tpp_zone, cn)

def test_tpp_token_retire_cert_id(self):
try:
req, cert = simple_enroll(self.tpp_conn, self.tpp_zone)
ret_data = retire_by_id(self.tpp_conn, req.id)
assert ret_data['Success'] is True
except Exception as err:
self.fail(f"Error in tpp retire by id test: {err}")

def test_tpp_token_retire_cert_thumbprint(self):
try:
req, cert = simple_enroll(self.tpp_conn, self.tpp_zone)
cert = x509.load_pem_x509_certificate(cert.cert.encode(), default_backend())
ret_data = retire_by_thumbprint(self.tpp_conn, cert)
assert ret_data['Success'] is True
except Exception as err:
self.fail(f"Error in tpp retire by thumbprint test: {err}")
16 changes: 16 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

from test_env import RANDOM_DOMAIN
from vcert import CertificateRequest, FakeConnection, TPPConnection, TPPTokenConnection, CSR_ORIGIN_SERVICE
from vcert.common import RetireRequest


def random_word(length):
Expand Down Expand Up @@ -209,3 +210,18 @@ def renew_by_thumbprint(conn, prev_cert):
print(prev_cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME))
assert cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME) == prev_cert.subject.get_attributes_for_oid(
NameOID.COMMON_NAME)


def retire_by_id(conn, prev_cert_id):
print("trying to retire by id")
ret_request = RetireRequest(req_id=prev_cert_id)
retire_data = conn.retire_cert(ret_request)
return retire_data


def retire_by_thumbprint(conn, prev_cert):
print("Trying to retire by thumbprint")
thumbprint = binascii.hexlify(prev_cert.fingerprint(hashes.SHA1())).decode()
ret_request = RetireRequest(thumbprint=thumbprint)
retire_data = conn.retire_cert(ret_request)
return retire_data
39 changes: 19 additions & 20 deletions tests/test_vaas.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,21 @@
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
from cryptography.x509.oid import NameOID

from test_env import CLOUD_ZONE, CLOUD_APIKEY, CLOUD_URL, RANDOM_DOMAIN
from test_env import CLOUD_ZONE, CLOUD_APIKEY, CLOUD_URL, RANDOM_DOMAIN, VAAS_ZONE_ONLY_EC
from test_pm import get_policy_obj, get_defaults_obj
from test_utils import random_word, enroll, renew, renew_by_thumbprint, renew_without_key_reuse, simple_enroll, \
get_vaas_zone
from vcert import CloudConnection, KeyType, CertificateRequest, CustomField, logger, CSR_ORIGIN_SERVICE
from vcert.policy import KeyPair, DefaultKeyPair, PolicySpecification
from vcert.common import RetireRequest

log = logger.get_child("test-vaas")


class TestVaaSMethods(unittest.TestCase):
def __init__(self, *args, **kwargs):
self.cloud_zone = CLOUD_ZONE
self.vaas_zone_ec = VAAS_ZONE_ONLY_EC
self.cloud_conn = CloudConnection(token=CLOUD_APIKEY, url=CLOUD_URL)
super(TestVaaSMethods, self).__init__(*args, **kwargs)

Expand Down Expand Up @@ -170,29 +172,14 @@ def test_cloud_enroll_service_generated_csr(self):
log.info(f"PKCS12 created successfully for certificate with CN: {cn}")

def test_enroll_ec_key_certificate(self):
policy = get_policy_obj()
kp = KeyPair(
key_types=['EC'],
elliptic_curves=['P521', 'P384'],
reuse_allowed=False)
policy.key_pair = kp
zone = self.vaas_zone_ec

defaults = get_defaults_obj()
defaults.key_pair = DefaultKeyPair(
key_type='EC',
elliptic_curve='P521')

policy_spec = PolicySpecification()
policy_spec.policy = policy
policy_spec.defaults = defaults

zone = get_vaas_zone()

self.cloud_conn.set_policy(zone, policy_spec)
password = 'FooBarPass123'
random_name = f"{random_word(10)}.vfidev.com"

request = CertificateRequest(
common_name=f"{random_word(10)}.venafi.example",
common_name=random_name,
san_dns=[random_name],
key_type=KeyType(
key_type="ec",
option="P384"
Expand All @@ -214,3 +201,15 @@ def test_enroll_ec_key_certificate(self):
if p_key:
self.assertIsInstance(p_key, EllipticCurvePrivateKey, "returned private key is not of type Elliptic Curve")
self.assertEqual(p_key.curve.key_size, 384, f"Private Key expected curve: 384. Got: {p_key.curve.key_size}")

def test_cloud_retire_by_thumbprint(self):
try:
req, cert = simple_enroll(self.cloud_conn, self.cloud_zone)
cert = x509.load_pem_x509_certificate(cert.cert.encode(), default_backend())
fingerprint = binascii.hexlify(cert.fingerprint(hashes.SHA1())).decode()
time.sleep(1)
ret_request = RetireRequest(thumbprint=fingerprint)
ret_data = self.cloud_conn.retire_cert(ret_request)
assert ret_data is True
except Exception as e:
log.error(msg=f"Error retiring certificate by thumbprint: {e.message}")
18 changes: 16 additions & 2 deletions vcert/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def get_ip_address():

class KeyType:
ALLOWED_SIZES = [2048, 3072, 4096, 8192]
ALLOWED_CURVES = ["p256", "p384", "p521"]
ALLOWED_CURVES = ["p256", "p384", "p521", "ed25519"]
RSA = 'rsa'
ECDSA = 'ec'

Expand All @@ -125,7 +125,7 @@ def __init__(self, key_type, option):
raise BadData
elif self.key_type == KeyType.ECDSA:
option = {"secp521r1": "p521", "secp384r1": "p384", "secp256r1": "p256", "p256": "p256", "p384": "p384",
"p521": "p521"}[option.lower().strip()]
"p521": "p521", "ed25519": "ed25519"}[option.lower().strip()]
if option not in KeyType.ALLOWED_CURVES:
log.error(f"unknown curve: {option}, should be one of {KeyType.ALLOWED_CURVES}")
raise BadData
Expand Down Expand Up @@ -593,6 +593,20 @@ def __init__(self, req_id=None, thumbprint=None, reason=RevocationReasons.NoRea
self.disable = disable


class RetireRequest:
def __init__(self, req_id=None, thumbprint=None, guid=None, description=None):
"""
:param req_id:
:param thumbprint:
:param guid:
:param description:
"""
self.id = req_id
self.thumbprint = thumbprint
self.guid = guid
self.description = description


class Authentication:
def __init__(self, user=None, password=None, access_token=None, refresh_token=None, api_key=None, state=None,
token_expires=None, client_id=CLIENT_ID, scope=SCOPE_CM):
Expand Down
38 changes: 38 additions & 0 deletions vcert/connection_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def __init__(self):
CERTIFICATE_STATUS = CERTIFICATE_REQUESTS + "/{}"
CERTIFICATE_RETRIEVE = API_BASE_PATH + "certificates/{}/contents"
CERTIFICATE_SEARCH = API_BASE_PATH + "certificatesearch"
CERTIFICATE_RETIRE = API_BASE_PATH + "certificates/retirement"
APPLICATIONS = API_BASE_PATH + "applications"
APP_BY_ID = APPLICATIONS + "/{}"
CERTIFICATE_TEMPLATE_BY_ID = APP_BY_ID + "/certificateissuingtemplates/{}"
Expand Down Expand Up @@ -477,6 +478,43 @@ def revoke_cert(self, request):
# not supported in Venafi Cloud
raise NotImplementedError

def retire_cert(self, request):
cert_id = None
if not request.id and not request.thumbprint:
log.error("id or thumbprint must be specified for retiring certificate")
raise ClientBadData

if request.id:
cert_id = request.id

elif request.thumbprint:
response = self.search_by_thumbprint(request.thumbprint)
cert_ids = response.certificateIds
if len(cert_ids) > 1:
log.error(f"multiple certificates matching thumbprint found")
raise VenafiError
cert_id = cert_ids[0]

retire_data = {
'certificateIds': [
cert_id
]
}

status, data = self._post(URLS.CERTIFICATE_RETIRE, retire_data)
if status == HTTPStatus.OK:
if len(data) == 0:
log.error(f"certificate retirement was not successful for {cert_id}")
raise VenafiError
else:
return True
elif status == HTTPStatus.BAD_REQUEST or status == HTTPStatus.PRECONDITION_FAILED:
log.error("bad request for certificate retirement")
raise ClientBadData
else:
log.error("unexpected status returned")
raise ServerUnexptedBehavior

def renew_cert(self, request, reuse_key=False):
cert_request_id = None
if not request.id and not request.thumbprint:
Expand Down
27 changes: 27 additions & 0 deletions vcert/connection_tpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,17 @@ def post(self, args):

return self._post(url=url, data=data)

def put(self, args):
"""
:param dict args:
:rtype: tuple[Any, Any]
"""
url = args[self.ARG_URL] if self.ARG_URL in args else None
data = args[self.ARG_DATA] if self.ARG_DATA in args else None

return self._put(url=url, data=data)

def _get(self, url="", params=None):
if not self._token or self._token[1] < time.time() + 1:
self.auth()
Expand Down Expand Up @@ -106,6 +117,22 @@ def _post(self, url, data=None):
raise ClientBadData
return self.process_server_response(r)

def _put(self, url, data=None):
if not self._token or self._token[1] < time.time() + 1:
self.auth()
log.debug(f"Token is {self._token[0]}, timeout is {self._token[1]}")

if isinstance(data, dict):
r = requests.put(f"{self._base_url}{url}",
headers={TOKEN_HEADER_NAME: self._token[0],
'content-type': MIME_JSON,
'cache-control': "no-cache"},
json=data,
**self._http_request_kwargs) # nosec B113
else:
log.error(f"Unexpected client data type: {type(data)} for {url}")
raise ClientBadData
return self.process_server_response(r)
@staticmethod
def _normalize_and_verify_base_url(u):
if u.startswith('http://'): # nosec
Expand Down
Loading

0 comments on commit c5b198d

Please sign in to comment.