Skip to content

Commit

Permalink
feat: transform api responses to model objects
Browse files Browse the repository at this point in the history
  • Loading branch information
linuxdaemon committed May 13, 2024
1 parent eb0aa04 commit 0efdc56
Show file tree
Hide file tree
Showing 8 changed files with 1,202 additions and 8 deletions.
9 changes: 8 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ dependencies = [
"click>=8.1",
"python-gvm>=23.12.0",
"typing_extensions",
"lxml",
"pydantic-xml[lxml]",
]

[project.urls]
Expand Down Expand Up @@ -151,6 +151,7 @@ python_version = "3.9"
warn_unused_configs = true
strict = true
strict_optional = true
plugins = ["pydantic_xml.mypy"]
check_untyped_defs = true
show_error_codes = true
warn_unused_ignores = true
Expand All @@ -175,6 +176,12 @@ enable_error_code = [
"ignore-without-code",
]
untyped_calls_exclude = ["gvm.transforms"]
follow_imports = "silent"

[tool.pydantic-mypy]
init_forbid_extra = true
init_typed = true
warn_required_dynamic_aliases = true

[tool.pytest.ini_options]
minversion = "6.0"
Expand Down
4 changes: 2 additions & 2 deletions src/gvm_sync_targets/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
#
# SPDX-License-Identifier: MIT

import sys

if __name__ == "__main__":
import sys

from gvm_sync_targets.cli import gvm_sync_targets

sys.exit(gvm_sync_targets())
34 changes: 29 additions & 5 deletions src/gvm_sync_targets/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from gvm.transforms import EtreeCheckCommandTransform

from gvm_sync_targets import __version__
from gvm_sync_targets.models import ModelTransform
from gvm_sync_targets.models.assets_response import GetAssetsResponse
from gvm_sync_targets.util import Element, target_in_use, to_str


Expand All @@ -36,12 +38,34 @@ def gvm_sync_targets(

with Gmp(
DebugConnection(UnixSocketConnection()),
transform=EtreeCheckCommandTransform(),
transform=ModelTransform(),
) as gmp:
hosts = hosts_file.read().splitlines()
gmp.authenticate(username, password)
existing_hosts: Element = gmp.get_hosts(details=True)
existing_hosts: GetAssetsResponse = gmp.get_hosts(details=True)
to_add = hosts.copy()
for host in cast(list[Element], existing_hosts.xpath("host")):
print(host.attrib)
print(list(host))
to_remove: list[str] = []

for host in existing_hosts.assets:
ips = [
identifier.value
for identifier in host.identifiers.identifiers
if identifier.name == "ip"
]

if len(ips) > 1:
raise ValueError(f"Multiple IPs?: {ips}")

for ip in ips:
if ip in to_add:
to_add.remove(ip)
elif ip not in hosts:
to_remove.append(host.uuid)

for ip in to_add:
gmp.create_host(ip)

for uuid in to_remove:
gmp.delete_host(uuid)

click.echo(f"Added {len(to_add)} hosts, removed {len(to_remove)}.")
25 changes: 25 additions & 0 deletions src/gvm_sync_targets/models/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# SPDX-FileCopyrightText: 2024-present linuxdaemon <linuxdaemon.irc@gmail.com>
#
# SPDX-License-Identifier: MIT

from collections.abc import Mapping
from typing import Type, cast

from gvm.transforms import EtreeCheckCommandTransform

from gvm_sync_targets.models.assets_response import GetAssetsResponse
from gvm_sync_targets.models.model import Model
from gvm_sync_targets.util import Element

_MODEL_MAP: Mapping[str, type[Model]] = {
"get_assets_response": GetAssetsResponse,
}


class ModelTransform(EtreeCheckCommandTransform):
def __init__(self) -> None:
super().__init__() # type: ignore[no-untyped-call]

def __call__(self, data: str) -> Model:
elem = cast(Element, super().__call__(data))
return _MODEL_MAP[elem.tag].from_xml_tree(elem) # type: ignore[arg-type] # etree.Element isn't a type but that's what this function expects
138 changes: 138 additions & 0 deletions src/gvm_sync_targets/models/assets_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# SPDX-FileCopyrightText: 2024-present linuxdaemon <linuxdaemon.irc@gmail.com>
#
# SPDX-License-Identifier: MIT

import datetime
from typing import Annotated, Optional

from pydantic import PlainSerializer
from pydantic_xml import attr, element, wrapped

from gvm_sync_targets.models.model import Model

IntBoolean = Annotated[
bool, PlainSerializer(lambda value: 1 if value else 0, return_type=int)
]


class Severity(Model, tag="severity"):
value: float = element()


class Source(Model, tag="source"):
uuid: str = attr("id")

type: str = element()
data: Optional[str] = element(default=None)
deleted: IntBoolean = element(default=False)
name: Optional[str] = element(default=None)


class Detail(Model, tag="detail"):
name: str = element()
value: str = element()
source: Source = element()


class Host(Model, tag="host"):
severity: Severity = element()
details: list[Detail] = element("detail")


class Owner(Model, tag="owner"):
name: str = element()


class Permission(Model, tag="permission"):
name: str = element()


class Permissions(Model, tag="permissions"):
permissions: list[Permission] = element()


class OS(Model, tag="os"):
uuid: str = attr("id")

title: str = element()


class Identifier(Model, tag="identifier"):
uuid: str = attr("id")

name: str = element()
value: str = element()
creation_time: datetime.datetime = element()
modification_time: datetime.datetime = element()

source: Optional[Source] = None
os: Optional[OS] = None


class Identifiers(Model, tag="identifiers"):
identifiers: list[Identifier] = element()


class Asset(Model, tag="asset"):
uuid: str = attr("id")

owner: Owner
name: str = element()
comment: Optional[str] = element(default=None)
creation_time: datetime.datetime = element()
modification_time: datetime.datetime = element()
writable: IntBoolean = element()
in_use: IntBoolean = element()

permissions: Permissions = element()
identifiers: Identifiers = element()
type: str = element()
host: Host = element("host")


class Keyword(Model, tag="keyword"):
column: str = element()
relation: str = element()
value: str = element()


class Keywords(Model, tag="keywords"):
keywords: list[Keyword] = element()


class Filters(Model, tag="filters"):
uuid: str = attr("id")

term: str = element()
keywords: Keywords = element()


class AssetCount(Model, tag="asset_count"):
total: int
filtered: int = element()
page: int = element()


class Pagination(Model):
start: int = attr()
max: int = attr()


class SortField(Model, tag="field"):
name: str
order: str = element()


class Sort(Model, tag="sort"):
field: SortField


class GetAssetsResponse(Model, tag="get_assets_response"):
status: int = attr("status")
status_text: str = attr("status_text")

assets: list[Asset] = []
filters: Filters
sort: Sort
pagination: Pagination = element("assets")
asset_count: AssetCount
9 changes: 9 additions & 0 deletions src/gvm_sync_targets/models/model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# SPDX-FileCopyrightText: 2024-present linuxdaemon <linuxdaemon.irc@gmail.com>
#
# SPDX-License-Identifier: MIT

from pydantic_xml import BaseXmlModel


class Model(BaseXmlModel, extra="forbid"):
pass
3 changes: 3 additions & 0 deletions tests/gvm_sync_targets/models/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# SPDX-FileCopyrightText: 2024-present linuxdaemon <linuxdaemon.irc@gmail.com>
#
# SPDX-License-Identifier: MIT
Loading

0 comments on commit 0efdc56

Please sign in to comment.