Skip to content

Commit

Permalink
Clean up certificate/CRL properties and decoder exceptions (#89)
Browse files Browse the repository at this point in the history
* Clean up certificate/CRL properties and decoder exceptions

* Add test case for reporting invalid value node presence

* Fix hex string creation for substrate remainder

* Add double quotes around schema name for consistency

* Update CHANGELOG

* Simplify re-encoded substrate DER comparison logic
  • Loading branch information
CBonnell authored Jul 16, 2024
1 parent ebf135b commit 695779a
Show file tree
Hide file tree
Showing 22 changed files with 533 additions and 134 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

All notable changes to this project from version 0.9.3 onwards are documented in this file.

## 0.11.2 - 2024-07-16

### Fixes

- Gracefully handle mis-encoded extensions and fields exposed as properties (#88)

## 0.11.1 - 2024-07-02

### New features/enhancements
Expand Down
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MIT License

Copyright (c) 2020-2023 DigiCert, Inc.
Copyright (c) 2020-2024 DigiCert, Inc.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
2 changes: 1 addition & 1 deletion VERSION.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.11.1
0.11.2
181 changes: 107 additions & 74 deletions pkilint/document.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import binascii
import datetime
import logging
import re
Expand All @@ -12,12 +11,10 @@
Choice, BitString
)


logger = logging.getLogger(__name__)

PATH_REGEX = re.compile(r'^((?P<doc_name>[^:]*):)?(?P<node_path>([^.]+\.)*[^.]+)?$')


try:
# noinspection PyUnresolvedReferences
from pyasn1_fasder import decode_der
Expand Down Expand Up @@ -50,7 +47,7 @@ def __str__(self) -> str:
)


class Document(object):
class Document:
"""Represents an ASN.1-encoded document."""

def __init__(
Expand Down Expand Up @@ -92,7 +89,7 @@ def __repr__(self):
return f'{self.root.name} document "{self.substrate_source}"'


class PDUNode(object):
class PDUNode:
"""Represents a node of a document."""

def __init__(self, document: Document, name: str, pdu: Asn1Type,
Expand Down Expand Up @@ -234,7 +231,7 @@ def __repr__(self):
return f'{self.pdu.__class__.__name__} @ {path}'


class NodeVisitor(object):
class NodeVisitor:
def __init__(self, *,
path: str = None,
path_re: re.Pattern = None,
Expand Down Expand Up @@ -263,21 +260,98 @@ def match(self, node: PDUNode) -> bool:
return True


class ValueDecodingFailedError(Exception):
def __init__(self, value_node: PDUNode, type_oid: ObjectIdentifier,
pdu_type: Optional[Asn1Type], message: str
):
self.value_node = value_node
self.type_oid = type_oid
self.pdu_type = pdu_type
def get_node_name_for_pdu(pdu: Asn1Type) -> str:
name = pdu.__class__.__name__
# convert PDU class name to camelCase
return name[0].lower() + name[1:]


class SubstrateDecodingFailedError(ValueError):
def __init__(
self, source_document: Document, pdu_instance: Optional[Asn1Type], parent_node: Optional[PDUNode],
message: Optional[str]
):
self.source_document = source_document
self.pdu_instance = pdu_instance
self.parent_node = parent_node
self.message = message

def __str__(self):
message = f'Error occurred while decoding substrate in document "{self.source_document.name}"'

if self.parent_node:
message += f' @ {self.parent_node.path}'

if self.pdu_instance:
message += f' using schema "{self.pdu_instance.__class__.__name__}"'

if self.message:
message += f': {self.message}'

return message


def decode_substrate(source_document: Document, substrate: bytes,
pdu_instance: Asn1Type, parent_node: Optional[PDUNode] = None) -> PDUNode:
if parent_node is not None and any(parent_node.children):
logger.debug("%s has child node; not creating new PDU node",
parent_node.path
)
return next(iter(parent_node.children.values()))

if _USE_PYASN1_FASDER:
try:
decoded, _ = decode_der(substrate, asn1Spec=pdu_instance)
except (ValueError, PyAsn1Error) as e:
raise SubstrateDecodingFailedError(source_document, pdu_instance, parent_node, str(e)) from e

decoded_pdu_name = get_node_name_for_pdu(decoded)
else:
try:
decoded, rest = decode(substrate, asn1Spec=pdu_instance)
except (ValueError, PyAsn1Error) as e:
raise SubstrateDecodingFailedError(source_document, pdu_instance, parent_node, str(e)) from e

decoded_pdu_name = get_node_name_for_pdu(decoded)
type_name = decoded.__class__.__name__

if len(rest) > 0:
rest_hex = bytes(rest).hex()

raise SubstrateDecodingFailedError(
source_document, pdu_instance, parent_node,
f'{len(rest)} unexpected octet(s) following "{type_name}" TLV: "{rest_hex}"'
)

try:
encoded = encode(decoded)

substrate_is_der = encoded == substrate
except (ValueError, PyAsn1Error):
substrate_is_der = False

if not substrate_is_der:
raise SubstrateDecodingFailedError(
source_document, pdu_instance, parent_node,
f'Substrate of type "{type_name}" is not DER-encoded'
)

node = PDUNode(source_document, decoded_pdu_name, decoded, parent_node)

if parent_node is not None:
parent_node.children[decoded_pdu_name] = node
logger.debug("Appended %s node to %s", node.name,
parent_node.path
)

return node


class OptionalAsn1TypeWrapper(NamedTuple):
asn1_type: Asn1Type


class ValueDecoder(object):
class ValueDecoder:
_BITSTRING_SCHEMA_OBJ = BitString()

VALUE_NODE_ABSENT = object()
Expand Down Expand Up @@ -319,16 +393,19 @@ def __call__(self, node):

# value node must be absent, but it exists
elif pdu_type is self.VALUE_NODE_ABSENT and value_node is not None:
raise ValueDecodingFailedError(
value_node, type_node.pdu, pdu_type,
'Value node is present, but the ASN.1 schema specifies that it must be absent'
raise SubstrateDecodingFailedError(
node.document, None, value_node,
f'Value node is present, but type OID {type_node.pdu} specifies that it must be absent'
)

# value node must be present, but it doesn't exist
elif pdu_type is not self.VALUE_NODE_ABSENT and value_node is None:
raise ValueDecodingFailedError(
node, type_node.pdu, pdu_type,
'Value node is absent, but the ASN.1 schema specifies that it must be present'
schema_name = pdu_type.__class__.__name__

raise SubstrateDecodingFailedError(
node.document, pdu_type, value_node,
f'Value node is absent, but type OID {type_node.pdu} specifies that a '
f'"{schema_name}" value must be present'
)

if pdu_type is self.VALUE_NODE_ABSENT or pdu_type is None:
Expand All @@ -337,69 +414,25 @@ def __call__(self, node):
value_octets = self.filter_value(node, type_node, value_node, pdu_type)

try:
decode_substrate(value_node.document, value_octets,
pdu_type, value_node
)
except (PyAsn1Error, ValueError) as e:
raise ValueDecodingFailedError(
value_node, type_node.pdu, pdu_type, str(e)
)
decode_substrate(value_node.document, value_octets, pdu_type, value_node)
except SubstrateDecodingFailedError as e:
schema_name = pdu_type.__class__.__name__

message = (
f'ASN.1 decoding failure occurred at "{value_node.path}" with schema "{schema_name}" corresponding to '
f'type OID {type_node.pdu}: {e.message}'
)

def get_node_name_for_pdu(pdu: Asn1Type) -> str:
name = pdu.__class__.__name__
# convert PDU class name to camelCase
return name[0].lower() + name[1:]
raise SubstrateDecodingFailedError(
e.source_document, e.pdu_instance, e.parent_node, message
) from e


def get_document_by_name(node: PDUNode, document_name: str) -> Document:
"""Retrieves the document with the specified name"""
return node.document.parent[document_name]


def decode_substrate(source_document: Document, substrate: bytes,
pdu_instance: Asn1Type, parent_node: Optional[PDUNode] = None) -> PDUNode:
if parent_node is not None and any(parent_node.children):
logger.debug("%s has child node; not creating new PDU node",
parent_node.path
)
return next(iter(parent_node.children.values()))

if _USE_PYASN1_FASDER:
decoded, _ = decode_der(substrate, asn1Spec=pdu_instance)

decoded_pdu_name = get_node_name_for_pdu(decoded)
else:
decoded, rest = decode(substrate, asn1Spec=pdu_instance)

decoded_pdu_name = get_node_name_for_pdu(decoded)

if len(rest) > 0:
raise ValueError(
"Unexpected {} octets following {} DER in {}: {}".format(
len(rest), decoded_pdu_name, source_document.substrate_source,
binascii.hexlify(rest).decode('us-ascii')
)
)

encoded = encode(decoded)
if encoded != substrate:
type_name = decoded.__class__.__name__
raise ValueError(
f'Substrate of type "{type_name}" is not DER-encoded'
)

node = PDUNode(source_document, decoded_pdu_name, decoded, parent_node)

if parent_node is not None:
parent_node.children[decoded_pdu_name] = node
logger.debug("Appended %s node to %s", node.name,
parent_node.path
)

return node


def get_re_for_path_glob(path_glob: str) -> re.Pattern:
return re.compile(
path_glob.replace('.', r'\.').replace('?', r'\w').replace('*', r'\w*')
Expand Down
3 changes: 3 additions & 0 deletions pkilint/pkix/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import enum
from typing import Optional

Expand All @@ -8,6 +9,8 @@
from pkilint.document import ValueDecoder
from pkilint.pkix import extension, algorithm, name

MAXIMUM_TIME_DATETIME = datetime.datetime(9999, 12, 31, 23, 59, 59, tzinfo=datetime.timezone.utc)


def create_attribute_decoder(type_mappings, decode_unknown_as_directorystring=True):
default = rfc5280.DirectoryString() if decode_unknown_as_directorystring else None
Expand Down
24 changes: 17 additions & 7 deletions pkilint/pkix/certificate/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,21 @@ def __init__(self, substrate_source, substrate,

@property
def not_before(self):
return time.parse_time_node(
self.root.navigate('tbsCertificate.validity.notBefore')
)
try:
return time.parse_time_node(
self.root.navigate('tbsCertificate.validity.notBefore')
)
except ValueError:
return pkix.MAXIMUM_TIME_DATETIME

@property
def not_after(self):
return time.parse_time_node(
self.root.navigate('tbsCertificate.validity.notAfter')
)
try:
return time.parse_time_node(
self.root.navigate('tbsCertificate.validity.notAfter')
)
except ValueError:
return pkix.MAXIMUM_TIME_DATETIME

def _decode_and_append_extension(
self, ext_oid: univ.ObjectIdentifier, ext_asn1_spec: Asn1Type) -> Optional[document.PDUNode]:
Expand All @@ -56,7 +62,11 @@ def _decode_and_append_extension(
ext, _ = ext_and_idx
ext_value = ext.children['extnValue']

return document.decode_substrate(self, ext_value.pdu.asOctets(), ext_asn1_spec, ext_value)
try:
return document.decode_substrate(self, ext_value.pdu.asOctets(), ext_asn1_spec, ext_value)
except ValueError:
# suppress decoding errors, which will be reported by DecodingValidator instances
return None

@functools.cached_property
def is_ca(self) -> bool:
Expand Down
13 changes: 9 additions & 4 deletions pkilint/pkix/crl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,23 @@ def __init__(self, substrate_source, substrate,

@property
def this_update(self):
return time.parse_time_node(
self.root.navigate('tbsCertificateList.thisUpdate')
)
try:
return time.parse_time_node(
self.root.navigate('tbsCertList.thisUpdate')
)
except ValueError:
return pkix.MAXIMUM_TIME_DATETIME

@property
def next_update(self):
try:
return time.parse_time_node(
self.root.navigate('tbsCertificateList.nextUpdate')
self.root.navigate('tbsCertList.nextUpdate')
)
except document.PDUNavigationFailedError:
return None
except ValueError:
return pkix.MAXIMUM_TIME_DATETIME

def get_extension_by_oid(self, oid):
tbs_crl = self.root.children['tbsCertList']
Expand Down
Loading

0 comments on commit 695779a

Please sign in to comment.