diff --git a/.bandit.yml b/.bandit.yml index ae781ceb..3bd46e2c 100644 --- a/.bandit.yml +++ b/.bandit.yml @@ -3,4 +3,4 @@ exclude_dirs: - "./tests/" - "./docs/" - - "netutils/oui_mappings.py" + - "netutils/data_files/" diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 95628584..8bada118 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -204,10 +204,10 @@ jobs: run: "docker image ls" - name: "Run Tests" run: "poetry run invoke pytest" - - name: "Install Napalm" - run: "pip install napalm" + - name: "Install Optional Dependencies" + run: "poetry run poetry install --extras optionals" - name: "Run Optional Tests" - run: "poetry run pytest tests/unit/test_lib_helpers_optionals.py" + run: "poetry run pytest tests/unit/test_lib_helpers_optionals.py tests/unit/test_acl.py" needs: - "pylint" publish_gh: diff --git a/docs/assets/acl-workflow.drawio b/docs/assets/acl-workflow.drawio new file mode 100644 index 00000000..ef750b63 --- /dev/null +++ b/docs/assets/acl-workflow.drawio @@ -0,0 +1,367 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/docs/dev/code_reference/acl.md b/docs/dev/code_reference/acl.md new file mode 100644 index 00000000..d13ca277 --- /dev/null +++ b/docs/dev/code_reference/acl.md @@ -0,0 +1,5 @@ +# ACLs + +::: netutils.acl + options: + show_submodules: True \ No newline at end of file diff --git a/docs/dev/arch_decision.md b/docs/dev/dev_adr.md similarity index 100% rename from docs/dev/arch_decision.md rename to docs/dev/dev_adr.md diff --git a/docs/images/acl-workflow.png b/docs/images/acl-workflow.png new file mode 100644 index 00000000..d7a990eb Binary files /dev/null and b/docs/images/acl-workflow.png differ diff --git a/docs/user/include_jinja_list.md b/docs/user/include_jinja_list.md index 4be27d36..e64e0ef2 100644 --- a/docs/user/include_jinja_list.md +++ b/docs/user/include_jinja_list.md @@ -33,6 +33,7 @@ | get_broadcast_address | netutils.ip.get_broadcast_address | | get_first_usable | netutils.ip.get_first_usable | | get_peer_ip | netutils.ip.get_peer_ip | +| get_range_ips | netutils.ip.get_range_ips | | get_usable_range | netutils.ip.get_usable_range | | ip_addition | netutils.ip.ip_addition | | ip_subtract | netutils.ip.ip_subtract | @@ -43,7 +44,10 @@ | ipaddress_network | netutils.ip.ipaddress_network | | is_classful | netutils.ip.is_classful | | is_ip | netutils.ip.is_ip | +| is_ip_range | netutils.ip.is_ip_range | +| is_ip_within | netutils.ip.is_ip_within | | is_netmask | netutils.ip.is_netmask | +| is_network | netutils.ip.is_network | | netmask_to_cidr | netutils.ip.netmask_to_cidr | | get_napalm_getters | netutils.lib_helpers.get_napalm_getters | | get_oui | netutils.mac.get_oui | diff --git a/docs/user/lib_use_cases.md b/docs/user/lib_use_cases.md index 2e01aa04..3baaac14 100644 --- a/docs/user/lib_use_cases.md +++ b/docs/user/lib_use_cases.md @@ -23,7 +23,7 @@ Functions are grouped with like functions, such as IP or MAC address based funct - Ping - Provides the ability to ping, currently only tcp ping. - Protocol Mapper - Provides a mapping for protocol names to numbers and vice versa. - Route - Provides the ability to provide a list of routes and an IP Address and return the longest prefix matched route. -- Time -Provides the ability to convert between integer time and string times. +- Time - Provides the ability to convert between integer time and string times. - VLANs - Provide the ability to convert configuration into lists or lists into configuration. ## Examples diff --git a/docs/user/lib_use_cases_acl.md b/docs/user/lib_use_cases_acl.md new file mode 100644 index 00000000..b4c93dff --- /dev/null +++ b/docs/user/lib_use_cases_acl.md @@ -0,0 +1,302 @@ +# ACL + +The ACL classes are intended to help guide the ACL conversation. It is not intended to solve every ACL challenge you may have. In essence, it provides sane defaults and welcomes you to extend the logic via supported extension mechanisms. Three patterns that heavily make up the capabilities are: + +- Expanding data to the ["Cartesian product"](#cartesian-product) (or combination) of each rule, so that each product can be easily evaluated. +- Providing a `f"{type}_*` method pattern, to dynamically find any `validate_*` or `enforce_*` method you provide. +- Providing a `f"{type}_{attr}` method pattern, to dynamically find any `process_{attr}` or `match_{attr}` method you provide for the given attrs. + +Each of these are covered in detail, below in the [core concepts](#core-concepts) section. + +Here you can see how the Python classes work together. There is a lot going on, so I encourage you to review the diagram briefly, and refer back to it often while reviewing the detailed information below. + +![ACL Classes](../images/acl-workflow.png) + +> It may be helpful to open the diagram in a new tab to view the full size, as an example, in Chrome you can right-click on the image and select "Open Image on New Tab". + +The intention of this page is not to cover every attribute and it's behavior, but a more human (although highly technical) understanding of what is going on. For more detailed information, please see the [test](https://github.com/networktocode/netutils/blob/develop/tests/unit/test_acl.py) and [code docs](../../dev/code_reference/acl/). + +> In the future the intention is to add features such as better de-duplication, partial match, and path analysis. + +## Core Concepts + +### Cartesian Product + +This ["Cartesian product"](https://en.wikipedia.org/wiki/Cartesian_product) concept is used throughout the page, so I thought it would be good to review. In this example, we have a single `rule`, and like many rules, it has multiple sources, destinations, and protocols. The `_cartesian_product` function creates the combinations, each of which is technically called a product. + + +```python +>>> from netutils.acl import _cartesian_product +>>> rule = dict( +... name="Allow to internal web", +... src_ip=["192.168.0.0/24", "10.0.0.0/16"], +... dst_ip=["172.16.0.0/16", "192.168.250.10-192.168.250.20"], +... dst_port=["tcp/80", "udp/53"], +... action="permit", +... ) +>>> for item in _cartesian_product(rule): +... print(item) +... +{'name': 'Allow to internal web', 'src_ip': '192.168.0.0/24', 'dst_ip': '172.16.0.0/16', 'dst_port': 'tcp/80', 'action': 'permit'} +{'name': 'Allow to internal web', 'src_ip': '192.168.0.0/24', 'dst_ip': '172.16.0.0/16', 'dst_port': 'udp/53', 'action': 'permit'} +{'name': 'Allow to internal web', 'src_ip': '192.168.0.0/24', 'dst_ip': '192.168.250.10-192.168.250.20', 'dst_port': 'tcp/80', 'action': 'permit'} +{'name': 'Allow to internal web', 'src_ip': '192.168.0.0/24', 'dst_ip': '192.168.250.10-192.168.250.20', 'dst_port': 'udp/53', 'action': 'permit'} +{'name': 'Allow to internal web', 'src_ip': '10.0.0.0/16', 'dst_ip': '172.16.0.0/16', 'dst_port': 'tcp/80', 'action': 'permit'} +{'name': 'Allow to internal web', 'src_ip': '10.0.0.0/16', 'dst_ip': '172.16.0.0/16', 'dst_port': 'udp/53', 'action': 'permit'} +{'name': 'Allow to internal web', 'src_ip': '10.0.0.0/16', 'dst_ip': '192.168.250.10-192.168.250.20', 'dst_port': 'tcp/80', 'action': 'permit'} +{'name': 'Allow to internal web', 'src_ip': '10.0.0.0/16', 'dst_ip': '192.168.250.10-192.168.250.20', 'dst_port': 'udp/53', 'action': 'permit'} +>>> +``` + +Now that you have the Cartesian products, you can evaluate each one individually. In this example perhaps '192.168.0.0/24' -> '172.16.0.0/16' is allowed, but '192.168.0.0/24' -> '192.168.250.10-192.168.250.20' is denied. Yet another example is the access could be allowed on both sources and destinations IPs, but not for udp/53 (DNS). + +Having the ability to look at each product individually allows you to only have to worry about the check you wish to create, versus custom logic that attempts to understand the combinations. Building a `is_pci_to_non_pci` becomes trivial when looking at each product. This idea applies to validating, enforcing, matching, etc.. + + +### Dynamic Method - Attrs + +The methods `process` and `match` both follow this pattern. As an example, the `process` method will dynamically find any method that follows `f"process_{attr}` pattern. This allows a Python class that inherits from `ACLRule` to simply add a `process_src_ip` method and that method would be called. + +### Dynamic Method - Any + +The methods `validate` and `enforce` both follow this pattern. As an example, the `validate` method will dynamically find any method that follows `f"process_*` pattern. This allows a Python class that inherits from `ACLRule` to simply add a `validate_ip_in_network` method and that method would be called. + +In both methods, ordering can be controlled with `order_validate` and `order_enforce` respectively. The default ordering will be what the Python `dir` function returns, which is in alphabetical. + +## ACLRule + +The `ACLRule` class at a high level: + +- The class `ACLRule` is used for working with Access Control List (ACL) rules. +- Is built with extensibility in mind to allow you to customize how your business operates. +- It contains a list of attributes such as "name," "src_ip," "src_zone," "dst_ip," "dst_port," "dst_zone," and "action" that are commonly used to work with ACLs. +- Provides the ability for you to expand data such as converting an address-group name into detailed addresses with your custom code. +- Provides the Cartesian product (or combinations) of the rules to make evaluation simple. +- It provides options for verifying input data and result data (data expanded) with corresponding JSON schemas. +- Provides the ability to validate data, generally for tech feasibility testing such as "are IPs on our network and in IPAM" or "is NAT IP provided vs actual IP". +- Provides the ability to enforce data, generally for security testing such as "is PCI attempting to talk with a non-PCI environment" or "is IP range not narrowly scoped enough". + - The class supports a matrix feature, which allows users to define custom rule matching based on predefined matrix definitions. +- Provides the ability to match one rule to another, to understand if rule exists already. + +### Initialization & Loading Data + +The initialization process calls on the `load_data` method. This on a high level verifies schema of initial data, allows you to process data (e.g. convert tcp/https -> 80/443), expand data, determine Cartesian product (or combinations) of the firewall rule (traditionally 5-tuple), and verifies schema of result data. + +The Cartesian product (or combination) is key to the functionality of other steps, this allows you to evaluate each rule based on the smallest view of the data, so pay close attention to those steps, as it is important to other methods as well. + +To provide some ideas on what you may validate: + +- Is one of the source or destination IPs in our network? +- Is one of the source or destination IPs in our IPAM? +- Is the source and destination IPs able to route to each other? +- Is the source and destination IPs on the same network? +- Is one of the destination IPs to the real IP vs the NAT IP? +- Is there routing from the source zone to the destination zone? +- Is the business unit name in the description ? + +Many of validations will be based on IPs, but not all. + +Here you will find a written understanding of what is happening in the code: + +- The init method takes in data and calls `load_data`. +- The method `load_data` processes the input data. + - The `input_data_check` method is called and verifies the input data based on the specified JSON schema. + - This is controlled by the `input_data_verify` attribute and schema defined in `input_data_schema`. + - For each `self.attrs`, a method name matching `f"process_{attr}"`, (e.g. `process_src_ip()`) is called. + - This allows you to inherit from and provide your own custom processes to convert, expand, or otherwise modify data before being evaluated. + - The `process_dst_port` method processes the `dst_port` attribute by converting the protocol and port information, it is enabled by default but controlled with the `dst_port_process` attribute. + - Both a dictionary `self.processed` and attributes (e.g. self.action, self.src_ip, etc.) are created. + - The `result_data_check` method verifies the processed data based on the specified JSON schema. + - This is controlled by the `result_data_verify` attribute which is disabled by default. + - The `validate` method validating the rule using a series of custom methods starting with `validate_` prefixes. + - The ordering can be controlled based on the `order_validate` attribute or defaults to all matches. + - The rules are expanded into `self.expanded_rules` by creating each combination of the tuple, using a Cartesian product function. + - An example combination would be converting source: 10.1.1.1, 10.1.1.2, destination: 10.100.100.100, port: 80 -> 10.1.1.1, destination: 10.100.100.100, port: 80 and 10.1.1.2, destination: 10.100.100.100, port: 80. + - This will be key, so that each combination can be compared individually later. + - Filter out the combinations that have the same source and destination IP, if `self.filter_same_ip` which is on by default. + +### Enforce + +Enforce is generally used for security controls. An `enforce_matrix` is provided but not used by default. You can think of the matrix as an Excel sheet, in which you have source as the rows and destination as the column. You would identify the source/destination IP and find which x & y coordinates in your Excel document, and perform whatever action it states, such as deny, review, approve, etc. + +To provide some ideas on what you may enforce: + +- Is the request for a PCI to non-PCI network? +- Is the request for a security-zone-X to security-zone-Y network? +- Is the protocol a secure protocol? +- Is the request approved? +- Is the IPs requested narrowly scoped? + +Here you will find a written understanding of what is happening in the code: + +- The `enforce` method validating the rule using a series of custom methods starting with `enforce_` prefixes such as `enforce_pci_checks`. + - The ordering can be controlled based on the `order_enforce` attribute or defaults to all matches. + - The `enforce_matrix` method enforces ACL rules based on a predefined matrix feature. + - This is controlled by the `self.matrix_enforced` attribute and is off by default. + - The `enforce_matrix` method runs the enforcement checks for each of the `self.expanded_rules` (or combinations of tuples) + - This matrix definition is very simple and not likely ready to be be used in a production environment, instead used for simple demonstrations and communicating potential ideas. + - Each method should return a dictionary or list of dictionaries as both of these are handled + - In the example there is the `obj` and `action` key. + - This could and should be extended, such as providing obj, action, detail_msg, notification_team, and any other metadata that the tooling using this system would require. + - Catastrophic errors will raise an error. + +While not accurate in all use cases it would be best practice to run any of your custom `enforce_` against `self.expanded_rules`. + +### Match & Match Details + +The `match_details` method provides a verbose way of verifying match details between two ACL rule's, the `match` method uses `match_details` and provides a boolean if there are any rules in `rules_unmatched` which would tell you if you had a full match or not. We will only review in detail the `match_details`. + +Here you will find a written understanding of what is happening in the code: + +- The `self.expanded_rules` is looped over for every combination. + - For each `self.attrs`, a method name matching `f"match_{attr}"`, (e.g. `match_src_ip()`) is called. + - This allows you to inherit from and provide your own custom equality check or verify with your business logic. + - You do not need to have a `f"match_{attr}"` method for every attr, description as example would not be a good candidate to match on. + - Equality checks are done on `src_zone`, `dst_zone`, `action`, and `port` by default. + - An `is_ip_within` check is done with for `src_ip` and `dst_ip` by default. +- In the process, details are provided for and returned: + - `rules_matched` - Root key that is a list of dictionaries of rules that matched. + - `rules_unmatched` - Root key that is a list of dictionaries of rules that did not match. + - `existing_rule_product` - The original expanded_rule that existed in this item. + - `existing_rule` - The full original rule (not expanded_rule) that existed. + - `match_rule` - The full original rule that tested against, only shown in `rules_matched` root key. + +This data could help you to understand what matched, why it matched, and other metadata. This detail data can be used to in `ACLRules` to aggregate and ask more interesting questions. + +## ACLRules + +The `ACLRules` class at a high level: + +- Loads up multiple `ACLRule` and loads the data from a list of dictionaries. + - This is generally the data that exists on the firewall already, but there are other use cases. +- Allows you to match the multiple `ACLRule` objects, and test against a single `ACLRule` object. + - This is generally to see if the access to the rule you are testing exists already or not. + +Using the `match_details` method, you could as an example, build logic if every product of the rule is matched, just not against a single rule. This is one of many different ways you could use the data. + +## Example Usage + +Here we can test if a rule is matched via the existing ruleset. We can leverage the permit or deny to understand if this exists already or not. + +**Simple Example** + +```python +>>> from netutils.acl import ACLRules +>>> +>>> existing_acls = [ +... dict( +... name="Allow to internal web", +... src_ip=["192.168.0.0/24", "10.0.0.0/16"], +... dst_ip=["172.16.0.0/16", "192.168.250.10-192.168.250.20"], +... dst_port=["tcp/80", "udp/53"], +... action="permit", +... ), +... dict( +... name="Allow to internal dns", +... src_ip=["192.168.1.0/24"], +... dst_ip=["172.16.0.0/16"], +... dst_port=["tcp/80", "udp/53"], +... action="permit", +... ) +... ] +>>> +>>> new_acl_match = dict( +... name="Check multiple sources pass", +... src_ip=["192.168.1.10", "192.168.1.11", "192.168.1.15-192.168.1.20"], +... dst_ip="172.16.0.10", +... dst_port="tcp/www-http", +... action="permit", +... ) +>>> +>>> ACLRules(existing_acls).match(new_acl_match) +'permit' +>>> +>>> +>>> new_acl_non_match = dict( +... name="Check no match", +... src_ip=["10.1.1.1"], +... dst_ip="172.16.0.10", +... dst_port="tcp/www-http", +... action="permit", +... ) +>>> +>>> ACLRules(existing_acls).match(new_acl_non_match) +'deny' +>>> +``` + +**Inherit Example** + +```python + +from netutils.acl import ACLRule + +class ExpandAddrGroups(ACLRule): + address_groups = {"red": ["white", "blue"], "blue": ["cyan"], "yellow": ["orange"]} + addresses = {"white": ["10.1.1.1", "10.2.2.2"], "cyan": ["10.3.3.3"], "orange": ["10.4.4.4"]} + + def __init__(self, data, *args, **kwargs): + self.flattened_addresses = self.flatten_addresses(self.address_groups, self.addresses) + super().__init__(data, *args, **kwargs) + + def flatten_addresses(self, address_groups, addresses): + flattened_addresses = {} + for group, subgroups in address_groups.items(): + if group in addresses: + flattened_addresses.setdefault(group, []).extend(addresses[group]) + for subgroup in subgroups: + if subgroup in addresses: + flattened_addresses.setdefault(group, []).extend(addresses[subgroup]) + if subgroup in address_groups: + subgroup_addresses = self.flatten_addresses({subgroup: address_groups[subgroup]}, addresses) + for sub_group, ips in subgroup_addresses.items(): + flattened_addresses.setdefault(sub_group, []).extend(ips) + if group != sub_group: + flattened_addresses.setdefault(group, []).extend(ips) + return flattened_addresses + + def process_ip(self, ip): + if not isinstance(ip, list): + ip = [ip] + output = [] + for ip_name in ip: + if not ip_name[0].isalpha(): + output.append(ip_name) + elif self.addresses.get(ip_name): + output.extend(self.addresses[ip_name]) + elif self.flattened_addresses.get(ip_name): + output.extend(self.flattened_addresses[ip_name]) + return sorted(list(set(output))) + + def process_src_ip(self, src_ip): + return self.process_ip(src_ip) + + def process_dst_ip(self, dst_ip): + return self.process_ip(dst_ip) +``` + +Using the above object, we can test with: + +```python +>>> rule_data = dict( +... name="Check allow", +... src_ip=["red", "blue", "10.4.4.4"], +... dst_ip=["white"], +... dst_port="6/www-http", +... action="permit", +... ) +>>> +>>> address_object_expanded = ExpandAddrGroups(rule_data) +>>> for item in address_object_expanded.expanded_rules: +... print(item) +... +{'name': 'Check allow', 'src_ip': '10.1.1.1', 'dst_ip': '10.2.2.2', 'dst_port': '6/80', 'action': 'permit'} +{'name': 'Check allow', 'src_ip': '10.2.2.2', 'dst_ip': '10.1.1.1', 'dst_port': '6/80', 'action': 'permit'} +{'name': 'Check allow', 'src_ip': '10.3.3.3', 'dst_ip': '10.1.1.1', 'dst_port': '6/80', 'action': 'permit'} +{'name': 'Check allow', 'src_ip': '10.3.3.3', 'dst_ip': '10.2.2.2', 'dst_port': '6/80', 'action': 'permit'} +{'name': 'Check allow', 'src_ip': '10.4.4.4', 'dst_ip': '10.1.1.1', 'dst_port': '6/80', 'action': 'permit'} +{'name': 'Check allow', 'src_ip': '10.4.4.4', 'dst_ip': '10.2.2.2', 'dst_port': '6/80', 'action': 'permit'} +>>> +``` + +In that example you can see how we expanded `red` -> 10.1.1.1", "10.2.2.2", "10.3.3.3" as an example. diff --git a/mkdocs.yml b/mkdocs.yml index c2c2e943..35979a05 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -100,6 +100,7 @@ nav: - Jinja Filters: "user/lib_use_cases_jinja_filters.md" - Library Mapper: "user/lib_use_cases_lib_mapper.md" - Protocol Mapper: "user/lib_use_cases_protocol_mappers.md" + - ACL: "user/lib_use_cases_acl.md" - Upgrade Paths: "user/lib_upgrade_paths.md" - Getting Started: "user/lib_getting_started.md" - Frequently Asked Questions: "user/faq.md" @@ -122,10 +123,12 @@ nav: - Contributing to the Library: "dev/contributing.md" - Development Environment: "dev/dev_environment.md" - Config Parser Development: "dev/dev_config.md" + - Arch Decision Records: "dev/dev_adr.md" - Code Attribution to the Library: "dev/attribution.md" - Code Reference: - "dev/code_reference/index.md" - ASN: "dev/code_reference/asn.md" + - ACL: "dev/code_reference/acl.md" - Bandwidth: "dev/code_reference/bandwidth.md" - Banner: "dev/code_reference/banner.md" - Configs: "dev/code_reference/configs.md" diff --git a/netutils/acl.py b/netutils/acl.py new file mode 100644 index 00000000..b9fb51af --- /dev/null +++ b/netutils/acl.py @@ -0,0 +1,524 @@ +"""Classes to help manage ACLs .""" + +import itertools +import copy +import typing as t +from netutils.protocol_mapper import PROTO_NAME_TO_NUM, TCP_NAME_TO_NUM, UDP_NAME_TO_NUM +from netutils.ip import is_ip_within + +try: + import jsonschema + + HAS_JSON_SCHEMA = True +except ImportError: + HAS_JSON_SCHEMA = False + +INPUT_SCHEMA = { + "type": "object", + "properties": { + "name": {"type": "string"}, + "src_zone": {"type": ["string", "array"]}, + "src_ip": {"$ref": "#/definitions/arrayOrIP"}, + "dst_ip": {"$ref": "#/definitions/arrayOrIP"}, + "dst_port": { + "oneOf": [ + { + "$ref": "#/definitions/port", + }, + { + "type": "array", + "items": { + "$ref": "#/definitions/port", + }, + "minItems": 1, + "uniqueItems": True, + }, + ], + }, + "dst_zone": {"type": ["string", "array"]}, + "action": {"type": "string"}, + }, + "definitions": { + "ipv4": {"type": "string", "pattern": "^(?:\\d{1,3}\\.){3}\\d{1,3}$"}, + "ipv4_cidr": {"type": "string", "pattern": "^(?:\\d{1,3}\\.){3}\\d{1,3}\\/\\d{1,2}$"}, + "ipv4_range": {"type": "string", "pattern": "^(?:\\d{1,3}\\.){3}\\d{1,3}\\-(?:\\d{1,3}\\.){3}\\d{1,3}$"}, + "ipv6": {"type": "string", "pattern": "^([0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}$"}, + "ipv6_cidr": {"type": "string", "pattern": "^([0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}/[0-9]{1,3}$"}, + "ipv6_range": { + "type": "string", + "pattern": "^([0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}-([0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}$", + }, + "oneIP": { + "oneOf": [ + { + "$ref": "#/definitions/ipv4", + }, + { + "$ref": "#/definitions/ipv4_cidr", + }, + { + "$ref": "#/definitions/ipv4_range", + }, + { + "$ref": "#/definitions/ipv6", + }, + { + "$ref": "#/definitions/ipv6_cidr", + }, + { + "$ref": "#/definitions/ipv6_range", + }, + ], + }, + "arrayOrIP": { + "oneOf": [ + { + "$ref": "#/definitions/oneIP", + }, + { + "type": "array", + "items": { + "$ref": "#/definitions/oneIP", + }, + }, + ], + }, + "port": {"type": "string", "pattern": "^\\S+\\/\\S+$"}, + }, + "required": [], +} + + +RESULT_SCHEMA = copy.deepcopy(INPUT_SCHEMA) +RESULT_SCHEMA["definitions"]["port"]["pattern"] = "^\\d+\\/\\d+$" # type: ignore + + +def _cartesian_product(data: t.Dict[str, str]) -> t.List[t.Dict[str, t.Any]]: + """Function to create the Cartesian product/combinations from a data dictionary. + + Args: + data: A dictionary with string keys and either string or list of string values + + Returns: + A list of dictionaries that extrapolates the product to individual items. + + Examples: + >>> from netutils.acl import _cartesian_product + >>> rule = dict( + ... name="Allow to internal web", + ... src_ip=["192.168.0.0/24", "10.0.0.0/16"], + ... dst_ip=["172.16.0.0/16", "192.168.250.10-192.168.250.20"], + ... dst_port=["tcp/80", "udp/53"], + ... action="permit", + ... ) + >>> for item in _cartesian_product(rule): + ... print(item) + ... + {'name': 'Allow to internal web', 'src_ip': '192.168.0.0/24', 'dst_ip': '172.16.0.0/16', 'dst_port': 'tcp/80', 'action': 'permit'} + {'name': 'Allow to internal web', 'src_ip': '192.168.0.0/24', 'dst_ip': '172.16.0.0/16', 'dst_port': 'udp/53', 'action': 'permit'} + {'name': 'Allow to internal web', 'src_ip': '192.168.0.0/24', 'dst_ip': '192.168.250.10-192.168.250.20', 'dst_port': 'tcp/80', 'action': 'permit'} + {'name': 'Allow to internal web', 'src_ip': '192.168.0.0/24', 'dst_ip': '192.168.250.10-192.168.250.20', 'dst_port': 'udp/53', 'action': 'permit'} + {'name': 'Allow to internal web', 'src_ip': '10.0.0.0/16', 'dst_ip': '172.16.0.0/16', 'dst_port': 'tcp/80', 'action': 'permit'} + {'name': 'Allow to internal web', 'src_ip': '10.0.0.0/16', 'dst_ip': '172.16.0.0/16', 'dst_port': 'udp/53', 'action': 'permit'} + {'name': 'Allow to internal web', 'src_ip': '10.0.0.0/16', 'dst_ip': '192.168.250.10-192.168.250.20', 'dst_port': 'tcp/80', 'action': 'permit'} + {'name': 'Allow to internal web', 'src_ip': '10.0.0.0/16', 'dst_ip': '192.168.250.10-192.168.250.20', 'dst_port': 'udp/53', 'action': 'permit'} + >>> + """ + keys = [] + values = [] + for key, value in data.items(): + keys.append(key) + if isinstance(value, (str, int)): + values.append([value]) + else: + values.append(value) + product = list(itertools.product(*values)) + return [dict(zip(keys, item)) for item in product] + + +def _check_schema(data: t.Any, schema: t.Any, verify: bool) -> None: + """Checks the schema provided.""" + if not verify: + return + if HAS_JSON_SCHEMA: + jsonschema.validate(data, schema) + else: + if not isinstance(data, dict): + raise ValueError() + + +class ACLRule: + """A class that helps you imagine an acl rule via methodologies.""" + + attrs: t.List[str] = ["name", "src_ip", "src_zone", "dst_ip", "dst_port", "dst_zone", "action"] + permit: str = "permit" + deny: str = "deny" + + input_data_verify: bool = False + input_data_schema: t.Any = INPUT_SCHEMA + + result_data_verify: bool = False + result_data_schema: t.Any = RESULT_SCHEMA + + matrix: t.Any = {} + matrix_enforced: bool = False + matrix_definition: t.Any = {} + + dst_port_process: bool = True + + order_validate: t.List[str] = [] + order_enforce: t.List[str] = [] + filter_same_ip: bool = True + + def __init__(self, data: t.Any, *args: t.Any, **kwargs: t.Any): # pylint: disable=unused-argument + """Initialize and load data. + + Args: + data: A dictionary with string keys and either string or list of string values + + Examples: + >>> from netutils.acl import ACLRule + >>> + >>> acl_data = dict( + ... name="Check no match", + ... src_ip=["10.1.1.1"], + ... dst_ip="172.16.0.10", + ... dst_port="tcp/www-http", + ... action="permit", + ... ) + >>> + >>> rule = ACLRule(acl_data) + >>> + >>> rule.expanded_rules + [{'name': 'Check no match', 'src_ip': '10.1.1.1', 'dst_ip': '172.16.0.10', 'dst_port': '6/80', 'action': 'permit'}] + >>> + """ + self.processed: t.Dict[str, str] = {} + self.data = data + self.load_data() + + def load_data(self) -> None: + """Load the data into the rule while verifying input data, result data, and processing data.""" + self.input_data_check() + for attr in self.attrs: + if not self.data.get(attr): + continue + if hasattr(self, f"process_{attr}"): + proccessor = getattr(self, f"process_{attr}") + _attr_data = proccessor(self.data[attr]) + else: + _attr_data = self.data[attr] + self.processed[attr] = _attr_data + setattr(self, attr, _attr_data) + self.result_data_check() + self.validate() + self.expanded_rules = _cartesian_product(self.processed) + if self.filter_same_ip: + self.expanded_rules = [item for item in self.expanded_rules if item["dst_ip"] != item["src_ip"]] + + def input_data_check(self) -> None: + """Verify the input data against the specified JSONSchema or using a simple dictionary check.""" + return _check_schema(self.data, self.input_data_schema, self.input_data_verify) + + def result_data_check(self) -> None: + """Verify the result data against the specified JSONSchema or using a simple dictionary check.""" + return _check_schema(self.processed, self.result_data_schema, self.result_data_verify) + + def validate(self) -> t.Any: + """Run through any method that startswith('validate_') and run that method.""" + if self.order_validate: + method_order = self.order_validate + else: + method_order = dir(self) + results = [] + for name in method_order: + if name.startswith("validate_"): + result = getattr(self, name)() + if not result: + continue + if result and isinstance(result, dict): + results.append(result) + elif result and isinstance(result, list): + results.extend(result) + return results + + def process_dst_port( + self, dst_port: t.Any + ) -> t.Union[t.List[str], None]: # pylint: disable=inconsistent-return-statements + """Convert port and protocol information.""" + output = [] + if not self.dst_port_process: + return None + if not isinstance(dst_port, list): + dst_port = [dst_port] + for item in dst_port: + protocol = item.split("/")[0] + port = item.split("/")[1] + if protocol.isalpha(): + if not PROTO_NAME_TO_NUM.get(protocol.upper()): + raise ValueError( + f"Protocol {protocol} was not found in netutils.protocol_mapper.PROTO_NAME_TO_NUM." + ) + protocol = PROTO_NAME_TO_NUM[protocol.upper()] + # test port[0] vs port, since dashes do not count, e.g. www-http + if int(protocol) == 6 and port[0].isalpha(): + if not TCP_NAME_TO_NUM.get(port.upper()): + raise ValueError(f"Port {port} was not found in netutils.protocol_mapper.TCP_NAME_TO_NUM.") + port = TCP_NAME_TO_NUM[port.upper()] + if int(protocol) == 17 and port[0].isalpha(): + if not UDP_NAME_TO_NUM.get(port.upper()): + raise ValueError(f"Port {port} was not found in netutils.protocol_mapper.UDP_NAME_TO_NUM.") + port = UDP_NAME_TO_NUM[port.upper()] + output.append(f"{protocol}/{port}") + return output + + def enforce(self) -> t.List[t.Dict[str, t.Any]]: + """Run through any method that startswith('enforce_') and run that method. + + Returns: + A list of dictionaries that explains the results of the enforcement. + """ + if self.order_enforce: + method_order = self.order_enforce + else: + method_order = dir(self) + results = [] + for name in method_order: + if name.startswith("enforce_"): + result = getattr(self, name)() + if not result: + continue + if result and isinstance(result, dict): + results.append(result) + elif result and isinstance(result, list): + results.extend(result) + return results + + def enforce_matrix(self) -> t.Union[t.List[t.Dict[str, t.Any]], None]: + """A simple `matrix` or grid style check of a rule. + + Returns: + A list of dictionaries that explains the results of the matrix being enforced. + """ + if not self.matrix_enforced: + return None + if not self.matrix: + raise ValueError("You must set a matrix dictionary to use the matrix feature.") + if not self.matrix_definition: + raise ValueError("You must set a matrix definition dictionary to use the matrix feature.") + actions = [] + for rule in self.expanded_rules: + source = rule["src_ip"] + destination = rule["dst_ip"] + port = rule["dst_port"] + src_zone = "" + dst_zone = "" + as_tuple = (source, destination, port) + for zone, ips in self.matrix_definition.items(): + if is_ip_within(source, ips): + src_zone = zone + if is_ip_within(destination, ips): + dst_zone = zone + if port in self.matrix.get(src_zone, {}).get(dst_zone, {}).get("allow", []): + actions.append({"obj": as_tuple, "action": "allow"}) + elif port in self.matrix.get(src_zone, {}).get(dst_zone, {}).get("notify", []): + actions.append({"obj": as_tuple, "action": "notify"}) + else: + actions.append({"obj": as_tuple, "action": "deny"}) + return actions + + def match_action(self, existing_action: str, check_action: str) -> bool: + """Match the action for equality. + + Args: + existing_action: The existing action value to be matched. + check_action: The action value to be checked against the existing action. + + Returns: + True if `existing_action` matches `check_action`, False otherwise. + """ + return existing_action == check_action + + def match_src_ip(self, existing_ip: str, check_ip: str) -> bool: + """Determined if source `check_ip` is within `existing_ip`. + + Args: + existing_ip: The existing source IP address or IP range to be matched against. + check_ip: The source IP address to be checked. + + Returns: + True if `check_ip` is within the range of `existing_ip`, False otherwise. + """ + return is_ip_within(check_ip, existing_ip) + + def match_src_zone(self, existing_src_zone: str, check_src_zone: str) -> bool: + """Match the source zone for equality. + + Args: + existing_src_zone: The existing source zone value to be matched. + check_src_zone: The source zone value to be checked against the existing zone. + + Returns: + True if `existing_src_zone` matches `check_src_zone`, False otherwise. + """ + return existing_src_zone == check_src_zone + + def match_dst_ip(self, existing_ip: str, check_ip: str) -> bool: + """Determined if destination `check_ip` is within `existing_ip. + + Args: + existing_ip: The existing destination IP address or IP range to be matched against. + check_ip: The destination IP address to be checked. + + Returns: + True if `check_ip` is within the range of `existing_ip`, False otherwise. + """ + return is_ip_within(check_ip, existing_ip) + + def match_dst_zone(self, existing_dst_zone: str, check_dst_zone: str) -> bool: + """Match the destination zone for equality. + + Args: + existing_dst_zone: The existing destination zone value to be matched. + check_dst_zone: The destination zone value to be checked against the existing zone. + + Returns: + True if `existing_dst_zone` matches `check_dst_zone`, False otherwise. + """ + return existing_dst_zone == check_dst_zone + + def match_dst_port(self, existing_port: str, check_port: str) -> bool: + """Match the destination port for equality. + + Args: + existing_port: The existing destination port value to be matched. + check_port: The destination port value to be checked against the existing port. + + Returns: + True if `existing_port` matches `check_port`, False otherwise. + """ + return existing_port == check_port + + def match_details(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: disable=too-many-locals + """Verbose way of verifying match details. + + Args: + match_rule: The rule which you are testing against. + + Returns: + A dictionary with root keys of `rules_matched` and `rules_matched`. + """ + attrs = [] + for name in dir(self): + if name.startswith("match_"): + obj_name = name[len("match_") :] # noqa: E203 + # When an attribute is not defined, can skip it + if not hasattr(match_rule, obj_name): + continue + attrs.append(obj_name) + + rules_found: t.List[bool] = [] + rules_unmatched: t.List[t.Dict[str, t.Any]] = [] + rules_matched: t.List[t.Dict[str, t.Any]] = [] + + if not match_rule.expanded_rules: + raise ValueError("There is no expanded rules to test against.") + for rule in match_rule.expanded_rules: + rules_found.append(False) + for existing_rule in self.expanded_rules: + missing = False + for attr in attrs: + # Examples of obj are match_rule.src_ip, match_rule.dst_port + rule_value = rule[attr] + existing_value = existing_rule[attr] + # Examples of getter are self.match_src_ip, self.match_dst_port + getter = getattr(self, f"match_{attr}")(existing_value, rule_value) + if not getter and getter is not None: + missing = True + break + # If the loop gets through with each existing rule not flagging + # the `missing` value, we know everything was matched, and the rule has + # found a complete match, we can break out of the loop at this point. + if not missing: + rules_found[-1] = True + break + detailed_info = { + "existing_rule_product": existing_rule, # pylint: disable=undefined-loop-variable + "match_rule": match_rule.processed, + "existing_rule": self.processed, + } + if rules_found[-1]: + detailed_info["match_rule_product"] = rule + rules_matched.append(detailed_info) + else: + rules_unmatched.append(detailed_info) + return {"rules_matched": rules_matched, "rules_unmatched": rules_unmatched} + + def match(self, match_rule: "ACLRule") -> bool: + """Simple boolean way of verifying match or not. + + Args: + match_rule: The rule which you are testing against. + + Returns: + A boolean if there was a full match or not. + """ + details = self.match_details(match_rule) + return not bool(details["rules_unmatched"]) + + def __repr__(self) -> str: + """Set repr of the object to be sane.""" + output = [] + for attr in self.attrs: + if self.processed.get(attr): + output.append(f"{attr}: {self.processed[attr]}") + return ", ".join(output) + + +class ACLRules: + """Class to help match multiple ACLRule objects.""" + + class_obj = ACLRule + + def __init__(self, data: t.Any, *args: t.Any, **kwargs: t.Any): # pylint: disable=unused-argument + """Class to help match multiple ACLRule. + + Args: + data: A list of `ACLRule` rules. + """ + self.data: t.Any = data + self.rules: t.List[t.Any] = [] + self.load_data() + + def load_data(self) -> None: + """Load the data for multiple rules.""" + for item in self.data: + self.rules.append(self.class_obj(item)) + + def match(self, rule: ACLRule) -> str: + """Check the rules loaded in `load_data` against a new `rule`. + + Args: + rule: The `ACLRule` rule to test against the list of `ACLRule`s loaded in initiation. + + Returns: + The response from the rule that matched, or `deny` by default. + """ + for item in self.rules: + if item.match(self.class_obj(rule)): + return str(item.action) + return str(item.deny) # pylint: disable=undefined-loop-variable + + def match_details(self, rule: ACLRule) -> t.Any: + """Verbosely check the rules loaded in `load_data` against a new `rule`. + + Args: + rule: The `ACLRule` rule to test against the list of `ACLRule`s loaded in initiation. + + Returns: + The details from the `ACLRule.match_details` results. + """ + output = [] + for item in self.rules: + output.append(item.match_details(self.class_obj(rule))) + return output diff --git a/netutils/dns.py b/netutils/dns.py index 965ae879..8eb2b9c1 100644 --- a/netutils/dns.py +++ b/netutils/dns.py @@ -35,10 +35,10 @@ def is_fqdn_resolvable(hostname: str) -> bool: from your machine to the DNS server, an upstream DNS issue, etc. Args: - hostname (str): A FQDN that may or may not be resolvable. + hostname: A FQDN that may or may not be resolvable. Returns: - bool: The result as to whether or not the domain was valid. + The result as to whether or not the domain was valid. Examples: >>> from netutils.dns import is_fqdn_resolvable diff --git a/netutils/interface.py b/netutils/interface.py index 0c50625c..a3bab738 100644 --- a/netutils/interface.py +++ b/netutils/interface.py @@ -438,10 +438,10 @@ def sort_interface_list(interfaces: t.List[str]) -> t.List[str]: nodes are removed. Args: - interfaces (list[str]): A list of interfaces to be sorted. The input list is not mutated by this function. + interfaces: A list of interfaces to be sorted. The input list is not mutated by this function. Returns: - list[str]: A **new** sorted, unique list elements from the input. + A **new** sorted, unique list elements from the input. Examples: >>> sort_interface_list(["Gi1/0/1", "Gi1/0/3", "Gi1/0/3.100", "Gi1/0/2", "Gi1/0/2.50", "Gi2/0/2", "Po40", "Po160", "Lo10"]) diff --git a/netutils/ip.py b/netutils/ip.py index afbe91a2..fbd93703 100644 --- a/netutils/ip.py +++ b/netutils/ip.py @@ -5,6 +5,8 @@ from netutils.constants import IPV4_MASKS, IPV6_MASKS +IPAddress = t.Union[ipaddress.IPv4Address, ipaddress.IPv6Address] + def ipaddress_address(ip: str, attr: str) -> t.Any: """Convenience function primarily built to expose ipaddress.ip_address to Jinja. @@ -236,6 +238,103 @@ def is_ip(ip: str) -> bool: return False +def is_ip_range(ip_range: str) -> bool: + """Verifies whether or not a string is a valid IP address range. + + An `ip_range` is in the format of `{ip_start}-{ip_end}`, IPs in str format, same IP version, and + ip_start is before ip_end. + + Args: + ip_range: An IP address range in string format that is properly formatted. + + Returns: + The result as to whether or not the string is a valid IP address. + + Examples: + >>> from netutils.ip import is_ip_range + >>> is_ip_range("10.100.100.255") + False + >>> is_ip_range("10.100.100.255-10.100.101.1") + True + >>> + """ + if "-" not in ip_range: + return False + start_ip, end_ip = ip_range.split("-") + if not is_ip(start_ip) or not is_ip(end_ip): + return False + start_ip_obj = ipaddress.ip_address(start_ip) + end_ip_obj = ipaddress.ip_address(end_ip) + if not type(start_ip_obj) == type(end_ip_obj): # pylint: disable=unidiomatic-typecheck + return False + # IP version being the same is enforced above, mypy disagrees, can safely ignore + if not start_ip_obj < end_ip_obj: # type: ignore + return False + return True + + +def is_ip_within(ip: str, ip_compare: t.Union[str, t.List[str]]) -> bool: + """ + Check if an IP address, IP subnet, or IP range is within the range of a list of IP addresses, IP subnets, and IP ranges. + + Args: + ip: IP address, IP subnet, or IP range to check. + ip_compare: String or list of IP addresses, IP subnets, and IP ranges to compare against. + + Returns: + True if the IP is in range, False otherwise. + + Examples: + >>> from netutils.ip import is_ip_within + >>> is_ip_within("10.100.100.10", "10.100.100.0/24") + True + >>> is_ip_within("10.100.100.0/25", ["10.100.100.0/24", "10.100.200.0/24"]) + True + >>> + >>> is_ip_within("10.100.100.10", ["10.100.100.8-10.100.100.20", "10.100.200.0/24"]) + True + >>> is_ip_within("10.100.100.8-10.100.100.20", ["10.100.100.0/24"]) + True + >>> is_ip_within("1.1.1.1", ["2.2.2.2", "3.3.3.3"]) + False + >>> + """ + + def _convert_ip(ip: str) -> str: + if is_ip(ip): + if "." in ip: + mask = "32" + if ":" in ip: + mask = "128" + return f"{ip}/{mask}" + return ip + + if "-" in ip: + ip_obj_start, ip_obj_end = get_range_ips(ip) + else: + ip_obj = ipaddress.ip_network(_convert_ip(ip)) + ip_obj_start = ip_obj[0] + ip_obj_end = ip_obj[-1] + + if isinstance(ip_compare, str): + ip_compare = [ip_compare] + + for item in ip_compare: + if "-" in item: + item_obj_start, item_obj_end = get_range_ips(item) + + else: + item_obj = ipaddress.ip_network(_convert_ip(item)) + item_obj_start = item_obj[0] + item_obj_end = item_obj[-1] + assert type(item_obj_start) == type(item_obj_end) # nosec # pylint: disable=unidiomatic-typecheck + # Use this validation method, since it is consitent with ranges + # vs the `.subnet_of` method which is not. + if item_obj_start <= ip_obj_start <= ip_obj_end <= item_obj_end: # type: ignore + return True + return False + + def is_netmask(netmask: str) -> bool: """Verifies whether or not a string is a valid subnet mask. @@ -260,6 +359,32 @@ def is_netmask(netmask: str) -> bool: return False +def is_network(ip_network: str) -> bool: + """Verifies whether or not a string is a valid IP Network with a Mask. + + Args: + ip: An IP network in string format that is able to be converted by `ipaddress` library. + + Returns: + The result as to whether or not the string is a valid IP network. + + Examples: + >>> from netutils.ip import is_network + >>> is_network("10.100.100.0") + False + >>> is_network("10.100.100.0/24") + True + >>> + """ + if "/" not in ip_network: + return False + try: + ipaddress.ip_network(ip_network) + return True + except ValueError: + return False + + def netmask_to_cidr(netmask: str) -> int: """Creates a CIDR notation of a given subnet mask in decimal format. @@ -420,6 +545,31 @@ def get_peer_ip(ip_interface: str) -> str: return val[0] +def get_range_ips(ip_range: str) -> t.Tuple[IPAddress, IPAddress]: + """Get's the two IPs as a tuple of IPAddress objects. + + Args: + ip_range: An IP address range in string format that is properly formatted. + + Returns: + The start and end IP address of the range provided. + + Examples: + >>> from netutils.ip import get_range_ips + >>> get_range_ips("10.100.100.255-10.100.101.1") + (IPv4Address('10.100.100.255'), IPv4Address('10.100.101.1')) + >>> get_range_ips("2001::1-2001::10") + (IPv6Address('2001::1'), IPv6Address('2001::10')) + >>> + """ + if not is_ip_range(ip_range): + raise ValueError(r"Not a valid IP range format of `{start_ip}-{end_ip}`") + start_ip, end_ip = ip_range.split("-") + start_ip_obj = ipaddress.ip_address(start_ip) + end_ip_obj = ipaddress.ip_address(end_ip) + return start_ip_obj, end_ip_obj + + def get_usable_range(ip_network: str) -> str: """Given a network, return the string of usable IP addresses. diff --git a/netutils/utils.py b/netutils/utils.py index 9580c278..11b1ae24 100644 --- a/netutils/utils.py +++ b/netutils/utils.py @@ -34,7 +34,10 @@ "ip_subtract": "ip.ip_subtract", "is_classful": "ip.is_classful", "is_ip": "ip.is_ip", + "is_ip_range": "ip.is_ip_range", + "is_ip_within": "ip.is_ip_within", "is_netmask": "ip.is_netmask", + "is_network": "ip.is_network", "netmask_to_cidr": "ip.netmask_to_cidr", "cidr_to_netmask": "ip.cidr_to_netmask", "cidr_to_netmaskv6": "ip.cidr_to_netmaskv6", @@ -42,6 +45,7 @@ "get_broadcast_address": "ip.get_broadcast_address", "get_first_usable": "ip.get_first_usable", "get_peer_ip": "ip.get_peer_ip", + "get_range_ips": "ip.get_range_ips", "get_usable_range": "ip.get_usable_range", "ipaddress_address": "ip.ipaddress_address", "ipaddress_interface": "ip.ipaddress_interface", diff --git a/poetry.lock b/poetry.lock index 972da29d..3b75769d 100644 --- a/poetry.lock +++ b/poetry.lock @@ -30,6 +30,24 @@ wrapt = [ {version = ">=1.14,<2", markers = "python_version >= \"3.11\""}, ] +[[package]] +name = "attrs" +version = "23.1.0" +description = "Classes Without Boilerplate" +optional = true +python-versions = ">=3.7" +files = [ + {file = "attrs-23.1.0-py3-none-any.whl", hash = "sha256:1f28b4522cdc2fb4256ac1a020c78acf9cba2c6b461ccd2c126f3aa8e8335d04"}, + {file = "attrs-23.1.0.tar.gz", hash = "sha256:6279836d581513a26f1bf235f9acd333bc9115683f14f7e8fae46c98fc50e015"}, +] + +[package.extras] +cov = ["attrs[tests]", "coverage[toml] (>=5.3)"] +dev = ["attrs[docs,tests]", "pre-commit"] +docs = ["furo", "myst-parser", "sphinx", "sphinx-notfound-page", "sphinxcontrib-towncrier", "towncrier", "zope-interface"] +tests = ["attrs[tests-no-zope]", "zope-interface"] +tests-no-zope = ["cloudpickle", "hypothesis", "mypy (>=1.1.1)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"] + [[package]] name = "babel" version = "2.12.1" @@ -625,6 +643,24 @@ docs = ["furo", "jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "rst.linker perf = ["ipython"] testing = ["flufl.flake8", "importlib-resources (>=1.3)", "packaging", "pyfakefs", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-mypy (>=0.9.1)", "pytest-perf (>=0.9.2)", "pytest-ruff"] +[[package]] +name = "importlib-resources" +version = "6.0.0" +description = "Read resources from Python packages" +optional = true +python-versions = ">=3.8" +files = [ + {file = "importlib_resources-6.0.0-py3-none-any.whl", hash = "sha256:d952faee11004c045f785bb5636e8f885bed30dc3c940d5d42798a2a4541c185"}, + {file = "importlib_resources-6.0.0.tar.gz", hash = "sha256:4cf94875a8368bd89531a756df9a9ebe1f150e0f885030b461237bc7f2d905f2"}, +] + +[package.dependencies] +zipp = {version = ">=3.1.0", markers = "python_version < \"3.10\""} + +[package.extras] +docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] +testing = ["pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-mypy (>=0.9.1)", "pytest-ruff"] + [[package]] name = "iniconfig" version = "2.0.0" @@ -681,6 +717,44 @@ MarkupSafe = ">=2.0" [package.extras] i18n = ["Babel (>=2.7)"] +[[package]] +name = "jsonschema" +version = "4.18.4" +description = "An implementation of JSON Schema validation for Python" +optional = true +python-versions = ">=3.8" +files = [ + {file = "jsonschema-4.18.4-py3-none-any.whl", hash = "sha256:971be834317c22daaa9132340a51c01b50910724082c2c1a2ac87eeec153a3fe"}, + {file = "jsonschema-4.18.4.tar.gz", hash = "sha256:fb3642735399fa958c0d2aad7057901554596c63349f4f6b283c493cf692a25d"}, +] + +[package.dependencies] +attrs = ">=22.2.0" +importlib-resources = {version = ">=1.4.0", markers = "python_version < \"3.9\""} +jsonschema-specifications = ">=2023.03.6" +pkgutil-resolve-name = {version = ">=1.3.10", markers = "python_version < \"3.9\""} +referencing = ">=0.28.4" +rpds-py = ">=0.7.1" + +[package.extras] +format = ["fqdn", "idna", "isoduration", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3987", "uri-template", "webcolors (>=1.11)"] +format-nongpl = ["fqdn", "idna", "isoduration", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3986-validator (>0.1.0)", "uri-template", "webcolors (>=1.11)"] + +[[package]] +name = "jsonschema-specifications" +version = "2023.7.1" +description = "The JSON Schema meta-schemas and vocabularies, exposed as a Registry" +optional = true +python-versions = ">=3.8" +files = [ + {file = "jsonschema_specifications-2023.7.1-py3-none-any.whl", hash = "sha256:05adf340b659828a004220a9613be00fa3f223f2b82002e273dee62fd50524b1"}, + {file = "jsonschema_specifications-2023.7.1.tar.gz", hash = "sha256:c91a50404e88a1f6ba40636778e2ee08f6e24c5613fe4c53ac24578a5a7f72bb"}, +] + +[package.dependencies] +importlib-resources = {version = ">=1.4.0", markers = "python_version < \"3.9\""} +referencing = ">=0.28.0" + [[package]] name = "junos-eznc" version = "2.6.7" @@ -1320,6 +1394,17 @@ files = [ {file = "pbr-5.11.1.tar.gz", hash = "sha256:aefc51675b0b533d56bb5fd1c8c6c0522fe31896679882e1c4c63d5e4a0fccb3"}, ] +[[package]] +name = "pkgutil-resolve-name" +version = "1.3.10" +description = "Resolve a name to an object." +optional = true +python-versions = ">=3.6" +files = [ + {file = "pkgutil_resolve_name-1.3.10-py3-none-any.whl", hash = "sha256:ca27cc078d25c5ad71a9de0a7a330146c4e014c2462d9af19c6b828280649c5e"}, + {file = "pkgutil_resolve_name-1.3.10.tar.gz", hash = "sha256:357d6c9e6a755653cfd78893817c0853af365dd51ec97f3d358a819373bbd174"}, +] + [[package]] name = "platformdirs" version = "3.9.1" @@ -1639,6 +1724,21 @@ files = [ [package.dependencies] pyyaml = "*" +[[package]] +name = "referencing" +version = "0.30.0" +description = "JSON Referencing + Python" +optional = true +python-versions = ">=3.8" +files = [ + {file = "referencing-0.30.0-py3-none-any.whl", hash = "sha256:c257b08a399b6c2f5a3510a50d28ab5dbc7bbde049bcaf954d43c446f83ab548"}, + {file = "referencing-0.30.0.tar.gz", hash = "sha256:47237742e990457f7512c7d27486394a9aadaf876cbfaa4be65b27b4f4d47c6b"}, +] + +[package.dependencies] +attrs = ">=22.2.0" +rpds-py = ">=0.7.0" + [[package]] name = "requests" version = "2.31.0" @@ -1679,6 +1779,112 @@ typing-extensions = {version = ">=4.0.0,<5.0", markers = "python_version < \"3.9 [package.extras] jupyter = ["ipywidgets (>=7.5.1,<9)"] +[[package]] +name = "rpds-py" +version = "0.9.2" +description = "Python bindings to Rust's persistent data structures (rpds)" +optional = true +python-versions = ">=3.8" +files = [ + {file = "rpds_py-0.9.2-cp310-cp310-macosx_10_7_x86_64.whl", hash = "sha256:ab6919a09c055c9b092798ce18c6c4adf49d24d4d9e43a92b257e3f2548231e7"}, + {file = "rpds_py-0.9.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:d55777a80f78dd09410bd84ff8c95ee05519f41113b2df90a69622f5540c4f8b"}, + {file = "rpds_py-0.9.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a216b26e5af0a8e265d4efd65d3bcec5fba6b26909014effe20cd302fd1138fa"}, + {file = "rpds_py-0.9.2-cp310-cp310-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:29cd8bfb2d716366a035913ced99188a79b623a3512292963d84d3e06e63b496"}, + {file = "rpds_py-0.9.2-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:44659b1f326214950a8204a248ca6199535e73a694be8d3e0e869f820767f12f"}, + {file = "rpds_py-0.9.2-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:745f5a43fdd7d6d25a53ab1a99979e7f8ea419dfefebcab0a5a1e9095490ee5e"}, + {file = "rpds_py-0.9.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a987578ac5214f18b99d1f2a3851cba5b09f4a689818a106c23dbad0dfeb760f"}, + {file = "rpds_py-0.9.2-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:bf4151acb541b6e895354f6ff9ac06995ad9e4175cbc6d30aaed08856558201f"}, + {file = "rpds_py-0.9.2-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:03421628f0dc10a4119d714a17f646e2837126a25ac7a256bdf7c3943400f67f"}, + {file = "rpds_py-0.9.2-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:13b602dc3e8dff3063734f02dcf05111e887f301fdda74151a93dbbc249930fe"}, + {file = "rpds_py-0.9.2-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:fae5cb554b604b3f9e2c608241b5d8d303e410d7dfb6d397c335f983495ce7f6"}, + {file = "rpds_py-0.9.2-cp310-none-win32.whl", hash = "sha256:47c5f58a8e0c2c920cc7783113df2fc4ff12bf3a411d985012f145e9242a2764"}, + {file = "rpds_py-0.9.2-cp310-none-win_amd64.whl", hash = "sha256:4ea6b73c22d8182dff91155af018b11aac9ff7eca085750455c5990cb1cfae6e"}, + {file = "rpds_py-0.9.2-cp311-cp311-macosx_10_7_x86_64.whl", hash = "sha256:e564d2238512c5ef5e9d79338ab77f1cbbda6c2d541ad41b2af445fb200385e3"}, + {file = "rpds_py-0.9.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:f411330a6376fb50e5b7a3e66894e4a39e60ca2e17dce258d53768fea06a37bd"}, + {file = "rpds_py-0.9.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0e7521f5af0233e89939ad626b15278c71b69dc1dfccaa7b97bd4cdf96536bb7"}, + {file = "rpds_py-0.9.2-cp311-cp311-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:8d3335c03100a073883857e91db9f2e0ef8a1cf42dc0369cbb9151c149dbbc1b"}, + {file = "rpds_py-0.9.2-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:d25b1c1096ef0447355f7293fbe9ad740f7c47ae032c2884113f8e87660d8f6e"}, + {file = "rpds_py-0.9.2-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:6a5d3fbd02efd9cf6a8ffc2f17b53a33542f6b154e88dd7b42ef4a4c0700fdad"}, + {file = "rpds_py-0.9.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c5934e2833afeaf36bd1eadb57256239785f5af0220ed8d21c2896ec4d3a765f"}, + {file = "rpds_py-0.9.2-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:095b460e117685867d45548fbd8598a8d9999227e9061ee7f012d9d264e6048d"}, + {file = "rpds_py-0.9.2-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:91378d9f4151adc223d584489591dbb79f78814c0734a7c3bfa9c9e09978121c"}, + {file = "rpds_py-0.9.2-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:24a81c177379300220e907e9b864107614b144f6c2a15ed5c3450e19cf536fae"}, + {file = "rpds_py-0.9.2-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:de0b6eceb46141984671802d412568d22c6bacc9b230174f9e55fc72ef4f57de"}, + {file = "rpds_py-0.9.2-cp311-none-win32.whl", hash = "sha256:700375326ed641f3d9d32060a91513ad668bcb7e2cffb18415c399acb25de2ab"}, + {file = "rpds_py-0.9.2-cp311-none-win_amd64.whl", hash = "sha256:0766babfcf941db8607bdaf82569ec38107dbb03c7f0b72604a0b346b6eb3298"}, + {file = "rpds_py-0.9.2-cp312-cp312-macosx_10_7_x86_64.whl", hash = "sha256:b1440c291db3f98a914e1afd9d6541e8fc60b4c3aab1a9008d03da4651e67386"}, + {file = "rpds_py-0.9.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:0f2996fbac8e0b77fd67102becb9229986396e051f33dbceada3debaacc7033f"}, + {file = "rpds_py-0.9.2-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9f30d205755566a25f2ae0382944fcae2f350500ae4df4e795efa9e850821d82"}, + {file = "rpds_py-0.9.2-cp312-cp312-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:159fba751a1e6b1c69244e23ba6c28f879a8758a3e992ed056d86d74a194a0f3"}, + {file = "rpds_py-0.9.2-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:a1f044792e1adcea82468a72310c66a7f08728d72a244730d14880cd1dabe36b"}, + {file = "rpds_py-0.9.2-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:9251eb8aa82e6cf88510530b29eef4fac825a2b709baf5b94a6094894f252387"}, + {file = "rpds_py-0.9.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:01899794b654e616c8625b194ddd1e5b51ef5b60ed61baa7a2d9c2ad7b2a4238"}, + {file = "rpds_py-0.9.2-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:b0c43f8ae8f6be1d605b0465671124aa8d6a0e40f1fb81dcea28b7e3d87ca1e1"}, + {file = "rpds_py-0.9.2-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:207f57c402d1f8712618f737356e4b6f35253b6d20a324d9a47cb9f38ee43a6b"}, + {file = "rpds_py-0.9.2-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:b52e7c5ae35b00566d244ffefba0f46bb6bec749a50412acf42b1c3f402e2c90"}, + {file = "rpds_py-0.9.2-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:978fa96dbb005d599ec4fd9ed301b1cc45f1a8f7982d4793faf20b404b56677d"}, + {file = "rpds_py-0.9.2-cp38-cp38-macosx_10_7_x86_64.whl", hash = "sha256:6aa8326a4a608e1c28da191edd7c924dff445251b94653988efb059b16577a4d"}, + {file = "rpds_py-0.9.2-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:aad51239bee6bff6823bbbdc8ad85136c6125542bbc609e035ab98ca1e32a192"}, + {file = "rpds_py-0.9.2-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4bd4dc3602370679c2dfb818d9c97b1137d4dd412230cfecd3c66a1bf388a196"}, + {file = "rpds_py-0.9.2-cp38-cp38-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:dd9da77c6ec1f258387957b754f0df60766ac23ed698b61941ba9acccd3284d1"}, + {file = "rpds_py-0.9.2-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:190ca6f55042ea4649ed19c9093a9be9d63cd8a97880106747d7147f88a49d18"}, + {file = "rpds_py-0.9.2-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:876bf9ed62323bc7dcfc261dbc5572c996ef26fe6406b0ff985cbcf460fc8a4c"}, + {file = "rpds_py-0.9.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fa2818759aba55df50592ecbc95ebcdc99917fa7b55cc6796235b04193eb3c55"}, + {file = "rpds_py-0.9.2-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:9ea4d00850ef1e917815e59b078ecb338f6a8efda23369677c54a5825dbebb55"}, + {file = "rpds_py-0.9.2-cp38-cp38-musllinux_1_2_aarch64.whl", hash = "sha256:5855c85eb8b8a968a74dc7fb014c9166a05e7e7a8377fb91d78512900aadd13d"}, + {file = "rpds_py-0.9.2-cp38-cp38-musllinux_1_2_i686.whl", hash = "sha256:14c408e9d1a80dcb45c05a5149e5961aadb912fff42ca1dd9b68c0044904eb32"}, + {file = "rpds_py-0.9.2-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:65a0583c43d9f22cb2130c7b110e695fff834fd5e832a776a107197e59a1898e"}, + {file = "rpds_py-0.9.2-cp38-none-win32.whl", hash = "sha256:71f2f7715935a61fa3e4ae91d91b67e571aeb5cb5d10331ab681256bda2ad920"}, + {file = "rpds_py-0.9.2-cp38-none-win_amd64.whl", hash = "sha256:674c704605092e3ebbbd13687b09c9f78c362a4bc710343efe37a91457123044"}, + {file = "rpds_py-0.9.2-cp39-cp39-macosx_10_7_x86_64.whl", hash = "sha256:07e2c54bef6838fa44c48dfbc8234e8e2466d851124b551fc4e07a1cfeb37260"}, + {file = "rpds_py-0.9.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:f7fdf55283ad38c33e35e2855565361f4bf0abd02470b8ab28d499c663bc5d7c"}, + {file = "rpds_py-0.9.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:890ba852c16ace6ed9f90e8670f2c1c178d96510a21b06d2fa12d8783a905193"}, + {file = "rpds_py-0.9.2-cp39-cp39-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:50025635ba8b629a86d9d5474e650da304cb46bbb4d18690532dd79341467846"}, + {file = "rpds_py-0.9.2-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:517cbf6e67ae3623c5127206489d69eb2bdb27239a3c3cc559350ef52a3bbf0b"}, + {file = "rpds_py-0.9.2-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:0836d71ca19071090d524739420a61580f3f894618d10b666cf3d9a1688355b1"}, + {file = "rpds_py-0.9.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9c439fd54b2b9053717cca3de9583be6584b384d88d045f97d409f0ca867d80f"}, + {file = "rpds_py-0.9.2-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:f68996a3b3dc9335037f82754f9cdbe3a95db42bde571d8c3be26cc6245f2324"}, + {file = "rpds_py-0.9.2-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:7d68dc8acded354c972116f59b5eb2e5864432948e098c19fe6994926d8e15c3"}, + {file = "rpds_py-0.9.2-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:f963c6b1218b96db85fc37a9f0851eaf8b9040aa46dec112611697a7023da535"}, + {file = "rpds_py-0.9.2-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:5a46859d7f947061b4010e554ccd1791467d1b1759f2dc2ec9055fa239f1bc26"}, + {file = "rpds_py-0.9.2-cp39-none-win32.whl", hash = "sha256:e07e5dbf8a83c66783a9fe2d4566968ea8c161199680e8ad38d53e075df5f0d0"}, + {file = "rpds_py-0.9.2-cp39-none-win_amd64.whl", hash = "sha256:682726178138ea45a0766907957b60f3a1bf3acdf212436be9733f28b6c5af3c"}, + {file = "rpds_py-0.9.2-pp310-pypy310_pp73-macosx_10_7_x86_64.whl", hash = "sha256:196cb208825a8b9c8fc360dc0f87993b8b260038615230242bf18ec84447c08d"}, + {file = "rpds_py-0.9.2-pp310-pypy310_pp73-macosx_11_0_arm64.whl", hash = "sha256:c7671d45530fcb6d5e22fd40c97e1e1e01965fc298cbda523bb640f3d923b387"}, + {file = "rpds_py-0.9.2-pp310-pypy310_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:83b32f0940adec65099f3b1c215ef7f1d025d13ff947975a055989cb7fd019a4"}, + {file = "rpds_py-0.9.2-pp310-pypy310_pp73-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:7f67da97f5b9eac838b6980fc6da268622e91f8960e083a34533ca710bec8611"}, + {file = "rpds_py-0.9.2-pp310-pypy310_pp73-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:03975db5f103997904c37e804e5f340c8fdabbb5883f26ee50a255d664eed58c"}, + {file = "rpds_py-0.9.2-pp310-pypy310_pp73-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:987b06d1cdb28f88a42e4fb8a87f094e43f3c435ed8e486533aea0bf2e53d931"}, + {file = "rpds_py-0.9.2-pp310-pypy310_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c861a7e4aef15ff91233751619ce3a3d2b9e5877e0fcd76f9ea4f6847183aa16"}, + {file = "rpds_py-0.9.2-pp310-pypy310_pp73-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:02938432352359805b6da099c9c95c8a0547fe4b274ce8f1a91677401bb9a45f"}, + {file = "rpds_py-0.9.2-pp310-pypy310_pp73-musllinux_1_2_aarch64.whl", hash = "sha256:ef1f08f2a924837e112cba2953e15aacfccbbfcd773b4b9b4723f8f2ddded08e"}, + {file = "rpds_py-0.9.2-pp310-pypy310_pp73-musllinux_1_2_i686.whl", hash = "sha256:35da5cc5cb37c04c4ee03128ad59b8c3941a1e5cd398d78c37f716f32a9b7f67"}, + {file = "rpds_py-0.9.2-pp310-pypy310_pp73-musllinux_1_2_x86_64.whl", hash = "sha256:141acb9d4ccc04e704e5992d35472f78c35af047fa0cfae2923835d153f091be"}, + {file = "rpds_py-0.9.2-pp38-pypy38_pp73-macosx_10_7_x86_64.whl", hash = "sha256:79f594919d2c1a0cc17d1988a6adaf9a2f000d2e1048f71f298b056b1018e872"}, + {file = "rpds_py-0.9.2-pp38-pypy38_pp73-macosx_11_0_arm64.whl", hash = "sha256:a06418fe1155e72e16dddc68bb3780ae44cebb2912fbd8bb6ff9161de56e1798"}, + {file = "rpds_py-0.9.2-pp38-pypy38_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8b2eb034c94b0b96d5eddb290b7b5198460e2d5d0c421751713953a9c4e47d10"}, + {file = "rpds_py-0.9.2-pp38-pypy38_pp73-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:8b08605d248b974eb02f40bdcd1a35d3924c83a2a5e8f5d0fa5af852c4d960af"}, + {file = "rpds_py-0.9.2-pp38-pypy38_pp73-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:a0805911caedfe2736935250be5008b261f10a729a303f676d3d5fea6900c96a"}, + {file = "rpds_py-0.9.2-pp38-pypy38_pp73-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:ab2299e3f92aa5417d5e16bb45bb4586171c1327568f638e8453c9f8d9e0f020"}, + {file = "rpds_py-0.9.2-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8c8d7594e38cf98d8a7df25b440f684b510cf4627fe038c297a87496d10a174f"}, + {file = "rpds_py-0.9.2-pp38-pypy38_pp73-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:8b9ec12ad5f0a4625db34db7e0005be2632c1013b253a4a60e8302ad4d462afd"}, + {file = "rpds_py-0.9.2-pp38-pypy38_pp73-musllinux_1_2_aarch64.whl", hash = "sha256:1fcdee18fea97238ed17ab6478c66b2095e4ae7177e35fb71fbe561a27adf620"}, + {file = "rpds_py-0.9.2-pp38-pypy38_pp73-musllinux_1_2_i686.whl", hash = "sha256:933a7d5cd4b84f959aedeb84f2030f0a01d63ae6cf256629af3081cf3e3426e8"}, + {file = "rpds_py-0.9.2-pp38-pypy38_pp73-musllinux_1_2_x86_64.whl", hash = "sha256:686ba516e02db6d6f8c279d1641f7067ebb5dc58b1d0536c4aaebb7bf01cdc5d"}, + {file = "rpds_py-0.9.2-pp39-pypy39_pp73-macosx_10_7_x86_64.whl", hash = "sha256:0173c0444bec0a3d7d848eaeca2d8bd32a1b43f3d3fde6617aac3731fa4be05f"}, + {file = "rpds_py-0.9.2-pp39-pypy39_pp73-macosx_11_0_arm64.whl", hash = "sha256:d576c3ef8c7b2d560e301eb33891d1944d965a4d7a2eacb6332eee8a71827db6"}, + {file = "rpds_py-0.9.2-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ed89861ee8c8c47d6beb742a602f912b1bb64f598b1e2f3d758948721d44d468"}, + {file = "rpds_py-0.9.2-pp39-pypy39_pp73-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:1054a08e818f8e18910f1bee731583fe8f899b0a0a5044c6e680ceea34f93876"}, + {file = "rpds_py-0.9.2-pp39-pypy39_pp73-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:99e7c4bb27ff1aab90dcc3e9d37ee5af0231ed98d99cb6f5250de28889a3d502"}, + {file = "rpds_py-0.9.2-pp39-pypy39_pp73-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:c545d9d14d47be716495076b659db179206e3fd997769bc01e2d550eeb685596"}, + {file = "rpds_py-0.9.2-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9039a11bca3c41be5a58282ed81ae422fa680409022b996032a43badef2a3752"}, + {file = "rpds_py-0.9.2-pp39-pypy39_pp73-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:fb39aca7a64ad0c9490adfa719dbeeb87d13be137ca189d2564e596f8ba32c07"}, + {file = "rpds_py-0.9.2-pp39-pypy39_pp73-musllinux_1_2_aarch64.whl", hash = "sha256:2d8b3b3a2ce0eaa00c5bbbb60b6713e94e7e0becab7b3db6c5c77f979e8ed1f1"}, + {file = "rpds_py-0.9.2-pp39-pypy39_pp73-musllinux_1_2_i686.whl", hash = "sha256:99b1c16f732b3a9971406fbfe18468592c5a3529585a45a35adbc1389a529a03"}, + {file = "rpds_py-0.9.2-pp39-pypy39_pp73-musllinux_1_2_x86_64.whl", hash = "sha256:c27ee01a6c3223025f4badd533bea5e87c988cb0ba2811b690395dfe16088cfe"}, + {file = "rpds_py-0.9.2.tar.gz", hash = "sha256:8d70e8f14900f2657c249ea4def963bed86a29b81f81f5b76b5a9215680de945"}, +] + [[package]] name = "scp" version = "0.14.5" @@ -2209,9 +2415,9 @@ docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.link testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-ignore-flaky", "pytest-mypy (>=0.9.1)", "pytest-ruff"] [extras] -optionals = ["napalm"] +optionals = ["jsonschema", "napalm"] [metadata] lock-version = "2.0" python-versions = "^3.8" -content-hash = "31acbdd2552dcd4f4f421b8ec8bc0ba4c39aa2bf2b3b03a557ade07c64478520" +content-hash = "a998eb6100de134fda00c152b9bd73ccdb3fe2a3999046b69e091de9ffd9c77e" diff --git a/pyproject.toml b/pyproject.toml index 99b1a0e9..813961cc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,9 +27,10 @@ include = [ [tool.poetry.dependencies] python = "^3.8" napalm = {version = "^4.0.0", optional = true} +jsonschema = {version = "^4.17.3", optional = true} [tool.poetry.extras] -optionals = ["napalm"] +optionals = ["jsonschema", "napalm"] [tool.poetry.dev-dependencies] bandit = "*" diff --git a/tasks.py b/tasks.py index cdbb3296..045025af 100644 --- a/tasks.py +++ b/tasks.py @@ -178,7 +178,7 @@ def pylint(context, local=INVOKE_LOCAL): context (obj): Used to run specific commands local (bool): Define as `True` to execute locally """ - exec_cmd = 'find . -name "*.py" | grep -v "tests/unit/mock" | xargs pylint' + exec_cmd = 'find . -name "*.py" | grep -vE "(tests/unit/mock|netutils/data_files)" | xargs pylint' run_cmd(context, exec_cmd, local) diff --git a/tests/unit/test_acl.py b/tests/unit/test_acl.py new file mode 100644 index 00000000..b7ce7f1f --- /dev/null +++ b/tests/unit/test_acl.py @@ -0,0 +1,375 @@ +"""Test for the BGP ASN functions.""" +# pylint: disable=use-dict-literal + +import pytest + +from netutils import acl + +verify_acl = [ + { + "sent": dict( + name="Check multiple sources pass. Check convesion of non-alpha tcp, e.g. with a dash", + src_ip=["192.168.1.10", "192.168.1.11", "192.168.1.15-192.168.1.20"], + dst_ip="172.16.0.10", + dst_port="tcp/www-http", + action="permit", + ), + "received": "permit", + }, + { + "sent": dict( + name="Check with number in port definition", + src_ip="192.168.0.10", + dst_ip="192.168.250.11", + dst_port="6/80", + action="permit", + ), + "received": "permit", + }, + { + "sent": dict( + name="Check with subnets", + src_ip="192.168.0.0/25", + dst_ip="172.16.0.0/24", + dst_port="6/80", + action="permit", + ), + "received": "permit", + }, + { + "sent": dict( + name="Test partial match on Source IP", + src_ip=["192.168.1.10", "192.168.2.10"], + dst_ip="172.16.0.11", + dst_port="tcp/80", + action="permit", + ), + "received": "deny", + }, + { + "sent": dict( + name="Test an entry that is not found", + src_ip="192.168.1.10", + dst_ip="192.168.240.1", + dst_port="tcp/80", + action="permit", + ), + "received": "deny", + }, + { + "sent": dict( + name="Test an action not permit or deny", + src_ip="10.1.1.1", + dst_ip="10.255.255.255", + dst_port="tcp/443", + action="permit", + ), + "received": "deny", + }, +] + +acls = [ + dict( + name="Allow to internal web", + src_ip=["192.168.0.0/24", "10.0.0.0/16"], + dst_ip=["172.16.0.0/16", "192.168.250.10-192.168.250.20"], + dst_port=["tcp/80", "udp/53"], + action="permit", + ), + dict( + name="Allow to internal dns", + src_ip=["192.168.1.0/24"], + dst_ip=["172.16.0.0/16"], + dst_port=["tcp/80", "udp/53"], + action="permit", + ), + dict( + name="Allow to internal https", + src_ip=["10.0.0.0/8"], + dst_ip=["172.16.0.0/16"], + dst_port=["tcp/443"], + action="deny", + ), + dict( + name="Drop (not deny) this specfic packet", + src_ip="10.1.1.1", + dst_ip="10.255.255.255", + dst_port="tcp/443", + action="drop", + ), + dict( + name="Allow External DNS", + src_ip=["0.0.0.0/0"], + dst_ip=["8.8.8.8/32", "8.8.4.4/32"], + dst_port=["udp/53"], + action="permit", + ), +] + +verify_matrix = [ + { + "sent": dict( + name="Check allow", + src_ip="10.1.100.5", + dst_ip="10.1.200.0", + dst_port="tcp/www-http", + action="permit", + ), + "received": [{"obj": ("10.1.100.5", "10.1.200.0", "6/80"), "action": "allow"}], + }, + { + "sent": dict( + name="Check Notify", + src_ip="10.1.100.5", + dst_ip="10.1.200.0", + dst_port="tcp/25", + action="permit", + ), + "received": [{"obj": ("10.1.100.5", "10.1.200.0", "6/25"), "action": "notify"}], + }, + { + "sent": dict( + name="Check not found and denied", + src_ip="10.1.100.5", + dst_ip="10.1.200.0", + dst_port="tcp/53", + action="permit", + ), + "received": [{"obj": ("10.1.100.5", "10.1.200.0", "6/53"), "action": "deny"}], + }, + { + "sent": dict( + name="Check not found and denied", + src_ip=["10.1.100.5", "10.1.100.6"], + dst_ip="10.1.200.0", + dst_port="tcp/80", + action="permit", + ), + "received": [ + {"obj": ("10.1.100.5", "10.1.200.0", "6/80"), "action": "allow"}, + {"obj": ("10.1.100.6", "10.1.200.0", "6/80"), "action": "allow"}, + ], + }, + { + "sent": dict( + name="Nothing found", + src_ip="1.1.1.1", + dst_ip="2.2.2.2", + dst_port="tcp/53", + action="permit", + ), + "received": [{"obj": ("1.1.1.1", "2.2.2.2", "6/53"), "action": "deny"}], + }, +] + +verify_schema = [ + { + "sent": dict( + name="Bad IP", + src_ip="10.1.100.A", + dst_ip="10.1.200.0", + dst_port="tcp/www-http", + action="permit", + ), + }, + { + "sent": dict( + name="Bad port", + src_ip="10.1.100.5", + dst_ip="10.1.200.0", + dst_port="tcp25", + action="permit", + ), + }, + { + "sent": dict( + name="Bad IP in list", + src_ip=["10.1.100.5", "10.1.100.A"], + dst_ip="10.1.200.0", + dst_port="tcp/25", + action="permit", + ), + }, +] + +verify_schema2 = [ + { + "sent": dict( + name="Check allow", + src_ip="10.1.100.1", + dst_ip="10.1.200.0", + dst_port="6/www-http", + action=100, + ), + }, +] + +IP_DEFINITIONS = { + "red": ["10.1.100.0/23", "10.1.102.0/24"], + "blue": ["10.1.200.0/23", "10.1.202.0/24"], + "orange": ["10.1.0.0/23", "10.1.2.0/24"], +} + +MATRIX = { + "red": {"blue": {"allow": ["6/80", "6/443"], "notify": ["6/25"]}, "orange": {"allow": ["6/80"]}}, + "blue": {"red": {"allow": ["6/80"]}}, +} + + +class TestMatrix(acl.ACLRule): + """ACLRule inherited class to test the matrix.""" + + matrix = MATRIX + matrix_enforced = True + matrix_definition = IP_DEFINITIONS + + +class TestSchema(acl.ACLRule): + """ACLRule inherited class to test the schema.""" + + input_data_verify = True + + +class TestSchema2(acl.ACLRule): + """ACLRule inherited class alternate to test the schema.""" + + result_data_verify = True + + +@pytest.mark.parametrize("data", verify_acl) +def test_verify_acl(data): + assert acl.ACLRules(acls).match(data["sent"]) == data["received"] + + +@pytest.mark.parametrize("data", verify_matrix) +def test_matrix(data): + assert TestMatrix(data["sent"]).enforce() == data["received"] + + +@pytest.mark.parametrize("data", verify_schema) +def test_schema(data): + pytest.importorskip("jsonschema") + try: + import jsonschema # pylint: disable=import-outside-toplevel + except ImportError: + pass + + with pytest.raises(jsonschema.exceptions.ValidationError): + TestSchema(data["sent"]) + + +def test_schema_not_enforced_when_option_not_set(): + try: + acl.ACLRule(dict(src_ip="10.1.1.1", dst_ip="10.2.2.2", dst_port="tcp/80", action=100)) + except Exception: # pylint: disable=broad-exception-caught + assert False, "No error should have been raised" + + +def test_schema_valid(): + try: + TestSchema(dict(src_ip="10.1.1.1", dst_ip="10.2.2.2", dst_port="tcp/80", action="permit")) + except Exception: # pylint: disable=broad-exception-caught + assert False, "No error should have been raised" + + +@pytest.mark.parametrize("data", verify_schema2) +def test_schema2(data): + pytest.importorskip("jsonschema") + try: + import jsonschema # pylint: disable=import-outside-toplevel + except ImportError: + pass + + with pytest.raises(jsonschema.exceptions.ValidationError): + TestSchema2(data["sent"]).validate() + + +def test_schema2_valid(): + try: + TestSchema2(dict(src_ip="10.1.1.1", dst_ip="10.2.2.2", dst_port="tcp/80", action="permit")).validate() + except Exception: # pylint: disable=broad-exception-caught + assert False, "No error should have been raised" + + +class TestAddrGroups(acl.ACLRule): + """ACLRule inherited class alternate to test expansions.""" + + address_groups = {"red": ["white", "blue"], "blue": ["cyan"], "yellow": ["orange"]} + + addresses = {"white": ["10.1.1.1", "10.2.2.2"], "cyan": ["10.3.3.3"], "orange": ["10.4.4.4"]} + + def __init__(self, data, *args, **kwargs): + self.flattened_addresses = self.flatten_addresses(self.address_groups, self.addresses) + super().__init__(data, *args, **kwargs) + + def flatten_addresses(self, address_groups, addresses): + """Go through and get the addresses given potential address groups.""" + + flattened_addresses = {} + + for group, subgroups in address_groups.items(): + if group in addresses: + flattened_addresses.setdefault(group, []).extend(addresses[group]) + for subgroup in subgroups: + if subgroup in addresses: + flattened_addresses.setdefault(group, []).extend(addresses[subgroup]) + if subgroup in address_groups: + subgroup_addresses = self.flatten_addresses({subgroup: address_groups[subgroup]}, addresses) + for sub_group, ips in subgroup_addresses.items(): + flattened_addresses.setdefault(sub_group, []).extend(ips) + if group != sub_group: + flattened_addresses.setdefault(group, []).extend(ips) + + return flattened_addresses + + def process_ip(self, ip): + """Test ability to expand IP for both source and destination.""" + + if not isinstance(ip, list): + ip = [ip] + output = [] + for ip_name in ip: + if not ip_name[0].isalpha(): + output.append(ip_name) + elif self.addresses.get(ip_name): + output.extend(self.addresses[ip_name]) + elif self.flattened_addresses.get(ip_name): + output.extend(self.flattened_addresses[ip_name]) + return sorted(list(set(output))) + + def process_src_ip(self, src_ip): + """Test ability to expand IP for both source.""" + + return self.process_ip(src_ip) + + def process_dst_ip(self, dst_ip): + """Test ability to expand IP for both destination.""" + + return self.process_ip(dst_ip) + + +add_group_check = [ + { + "sent": dict( + name="Check allow", + src_ip=["red", "blue", "10.4.4.4"], + dst_ip=["white"], + dst_port="6/www-http", + action="permit", + ), + "received": [ + {"action": "permit", "dst_ip": "10.2.2.2", "dst_port": "6/80", "name": "Check allow", "src_ip": "10.1.1.1"}, + {"action": "permit", "dst_ip": "10.1.1.1", "dst_port": "6/80", "name": "Check allow", "src_ip": "10.2.2.2"}, + {"action": "permit", "dst_ip": "10.1.1.1", "dst_port": "6/80", "name": "Check allow", "src_ip": "10.3.3.3"}, + {"action": "permit", "dst_ip": "10.2.2.2", "dst_port": "6/80", "name": "Check allow", "src_ip": "10.3.3.3"}, + {"action": "permit", "dst_ip": "10.1.1.1", "dst_port": "6/80", "name": "Check allow", "src_ip": "10.4.4.4"}, + {"action": "permit", "dst_ip": "10.2.2.2", "dst_port": "6/80", "name": "Check allow", "src_ip": "10.4.4.4"}, + ], + } +] + + +@pytest.mark.parametrize("data", add_group_check) +def test_custom_address_group(data): + obj = TestAddrGroups(data["sent"]) + assert obj.expanded_rules == data["received"] diff --git a/tests/unit/test_ip.py b/tests/unit/test_ip.py index 442fe22d..f3898382 100644 --- a/tests/unit/test_ip.py +++ b/tests/unit/test_ip.py @@ -1,4 +1,5 @@ """Test for the IP functions.""" +import ipaddress import pytest from netutils import ip @@ -160,6 +161,145 @@ }, ] +IS_IP_RANGE = [ + { + "sent": { + "ip_range": "10.1.1.1", + }, + "received": False, + }, + { + "sent": { + "ip_range": "10.1.100.10-10.1.100.1", + }, + "received": False, + }, + { + "sent": { + "ip_range": "2001::10-2001::1", + }, + "received": False, + }, + { + "sent": { + "ip_range": "10.500.100.10-10.1.100.1", + }, + "received": False, + }, + { + "sent": { + "ip_range": "NOT AN IP", + }, + "received": False, + }, + { + "sent": { + "ip_range": "255.255.255.256", + }, + "received": False, + }, + { + "sent": { + "ip_range": "10.1.100.10-10.1.100.100", + }, + "received": True, + }, + { + "sent": { + "ip_range": "2001::10-2001::100", + }, + "received": True, + }, +] + +GET_RANGE_IPS = [ + { + "sent": { + "ip_range": "2001::10-2001::100", + }, + "received": (ipaddress.IPv6Address("2001::10"), ipaddress.IPv6Address("2001::100")), + }, + { + "sent": { + "ip_range": "10.1.100.10-10.1.100.100", + }, + "received": (ipaddress.IPv4Address("10.1.100.10"), ipaddress.IPv4Address("10.1.100.100")), + }, +] + +IS_IP_WITHIN = [ + { + "sent": { + "ip": "192.168.1.10", + "ip_compare": "192.168.1.10", + }, + "received": True, + }, + { + "sent": { + "ip": "192.168.1.10", + "ip_compare": "192.168.1.0-192.168.1.20", + }, + "received": True, + }, + { + "sent": { + "ip": "192.168.1.0/24", + "ip_compare": ["192.168.1.0-192.168.1.20", "192.168.2.0-192.168.2.20"], + }, + "received": False, + }, + { + "sent": { + "ip": "192.168.1.10-192.168.1.15", + "ip_compare": ["192.168.1.0-192.168.1.20", "192.168.2.0-192.168.2.20"], + }, + "received": True, + }, + { + "sent": { + "ip": "10.0.0.0/8", + "ip_compare": ["192.168.1.0/24", "172.16.0.0/12"], + }, + "received": False, + }, + { + "sent": { + "ip": "192.168.1.10", + "ip_compare": "192.168.1.20", + }, + "received": False, + }, + { + "sent": { + "ip": "192.168.1.10", + "ip_compare": "192.168.2.0-192.168.2.20", + }, + "received": False, + }, + { + "sent": { + "ip": "192.168.1.0/24", + "ip_compare": ["192.168.2.0-192.168.2.20", "192.168.3.0-192.168.3.20"], + }, + "received": False, + }, + { + "sent": { + "ip": "192.168.1.50-192.168.1.60", + "ip_compare": ["192.168.2.0-192.168.2.20", "192.168.3.0-192.168.3.20"], + }, + "received": False, + }, + { + "sent": { + "ip": "192.168.1.10", + "ip_compare": "192.168.2.0/24", + }, + "received": False, + }, +] + GET_BROADCAST_ADDRESS = [ { "sent": {"ip_network": "10.1.1.0/24"}, @@ -357,6 +497,33 @@ def test_is_ip(data): assert ip.is_ip(**data["sent"]) == data["received"] +@pytest.mark.parametrize("data", IS_IP_RANGE) +def test_is_ip_range(data): + assert ip.is_ip_range(**data["sent"]) == data["received"] + + +@pytest.mark.parametrize("data", GET_RANGE_IPS) +def test_get_range_ips(data): + assert ip.get_range_ips(**data["sent"]) == data["received"] + + +def test_get_range_ips_fail(): + with pytest.raises(ValueError, match=r"Not a valid IP range format of .*"): + data = {"ip_range": "10.1.100.10-10.1.100.1"} + ip.get_range_ips(**data) + + +@pytest.mark.parametrize("data", IS_IP_WITHIN) +def test_is_ip_within(data): + assert ip.is_ip_within(**data["sent"]) == data["received"] + + +def test_is_ip_within_fail(): + with pytest.raises(ValueError): + data = {"ip": "10.1.100.100", "ip_compare": "10.1.100.10-2001::1"} + ip.is_ip_within(**data) + + @pytest.mark.parametrize("data", GET_ALL_HOST) def test_get_all_host(data): assert list(ip.get_all_host(**data["sent"])) == data["received"] diff --git a/tests/unit/test_lib_helpers_optionals.py b/tests/unit/test_lib_helpers_optionals.py index f383a16b..9d829966 100755 --- a/tests/unit/test_lib_helpers_optionals.py +++ b/tests/unit/test_lib_helpers_optionals.py @@ -9,21 +9,7 @@ def test_get_napalm_getters_napalm_installed_default(): pytest.importorskip("napalm") with mock.patch("netutils.lib_helpers.get_network_driver"): napalm_getters = get_napalm_getters() - assert napalm_getters == { - "asa": {}, - "cisco_wlc_ssh": {}, - "eos": {}, - "fortios": {}, - "huawei": {}, - "ios": {}, - "iosxr": {}, - "junos": {}, - "nxos": {}, - "nxos_ssh": {}, - "panos": {}, - "sros": {}, - "vyos": {}, - } + assert all(item in napalm_getters.keys() for item in ["asa", "eos", "fortios"]) def test_get_napalm_getters_napalm_installed_nxos_keys(): diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index 50d17983..c0e7028a 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -52,7 +52,7 @@ def get_jinja2_function_names(): def test_jinja2_mapping_contains_all_functions(get_jinja2_function_names): # pylint: disable=redefined-outer-name mapping_function_names = [path.split(".")[-1] for path in list(_JINJA2_FUNCTION_MAPPINGS.values())] - sorted_get_jinja2_function_names = sorted(get_jinja2_function_names) + sorted_get_jinja2_function_names = sorted(list(set(get_jinja2_function_names))) sorted_mapping_function_names = sorted(mapping_function_names) assert sorted_get_jinja2_function_names == sorted_mapping_function_names