Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SCAP report and remediation #1441

Merged
merged 3 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions airgun/entities/oscapreport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from wait_for import wait_for

from airgun.entities.base import BaseEntity
from airgun.navigation import NavigateStep, navigator
from airgun.utils import retry_navigation
from airgun.views.oscapreport import (
RemediateModal,
SCAPReportDetailsView,
SCAPReportView,
)


class OSCAPReportEntity(BaseEntity):
endpoint_path = '/compliance/arf_reports'

def search(self, search_string):
"""Search for SCAP Report

:param search_string: how to find the SCAP Report
:return: result of the SCAP Report search
"""
view = self.navigate_to(self, 'All')
return view.search(search_string)

def details(self, search_string, widget_names=None, limit=None):
"""Read the content from corresponding SCAP Report dashboard,
clicking on the link in Reported At column of
SCAP Report list

:param search_string:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this search string ? is this any type of entity?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately a link to the correct arf report is in the column Reported At and you get there by clicking the link saying e.g. 1 day ago... so search actually searches by a (not visible) id in the link.
You call it like this:

session.oscapreport.remediate(f'id={arf_id}', title)

:param limit: how many rules results to fetch at most
:return: list of dictionaries with values from SCAP Report Details View
"""
view = self.navigate_to(self, 'Details', search_string=search_string)
return view.read(widget_names=widget_names, limit=limit)

def remediate(self, search_string, resource):
"""Remediate the failed rule using automatic remediation through Ansible

:param search_string:
"""
view = self.navigate_to(self, 'Details', search_string=search_string)
view.table.row(resource=resource).actions.fill('Remediation')
view = RemediateModal(self.browser)
view.wait_displayed()
self.browser.plugin.ensure_page_safe()
wait_for(lambda: view.title.is_displayed, timeout=10, delay=1)
view.fill({'select_remediation_method.snippet': 'Ansible'})

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ansible is a default value? you could expect some more options there ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what this function does as per docstring: Remediate the failed rule using automatic remediation through Ansible.
It surely can be changed to allow for different methods. On one hand, it's not a goal of this function/test to test various methods of remote execution. On the other hand, there is a wizard specifically for remediation that lets you choose so IMO this will be a good addition, later.

view.select_capsule.run.click()


@navigator.register(OSCAPReportEntity, 'All')
class ShowAllSCAPReports(NavigateStep):
"""Navigate to Compliance Reports screen."""

VIEW = SCAPReportView

@retry_navigation
def step(self, *args, **kwargs):
self.view.menu.select('Hosts', 'Compliance', 'Reports')


@navigator.register(OSCAPReportEntity, 'Details')
class DetailsSCAPReport(NavigateStep):
"""To get data from ARF report view

Args:
search_string: what to fill to find the SCAP report
"""

VIEW = SCAPReportDetailsView

def prerequisite(self, *args, **kwargs):
return self.navigate_to(self.obj, 'All')

def step(self, *args, **kwargs):
search_string = kwargs.get('search_string')
self.parent.search(search_string)
self.parent.table.row()['Reported At'].widget.click()
6 changes: 6 additions & 0 deletions airgun/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
from airgun.entities.os import OperatingSystemEntity
from airgun.entities.oscapcontent import OSCAPContentEntity
from airgun.entities.oscappolicy import OSCAPPolicyEntity
from airgun.entities.oscapreport import OSCAPReportEntity
from airgun.entities.oscaptailoringfile import OSCAPTailoringFileEntity
from airgun.entities.package import PackageEntity
from airgun.entities.partitiontable import PartitionTableEntity
Expand Down Expand Up @@ -570,6 +571,11 @@ def oscappolicy(self):
"""Instance of OSCAP Policy entity."""
return self._open(OSCAPPolicyEntity)

@cached_property
def oscapreport(self):
"""Instance of OSCAP Report entity."""
return self._open(OSCAPReportEntity)

@cached_property
def oscaptailoringfile(self):
"""Instance of OSCAP Tailoring File entity."""
Expand Down
11 changes: 9 additions & 2 deletions airgun/views/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,28 @@ def select_logout(self):
self.account_menu.click()
self.logout.click()

def read(self, widget_names=None):
def read(self, widget_names=None, limit=None):
lhellebr marked this conversation as resolved.
Show resolved Hide resolved
lhellebr marked this conversation as resolved.
Show resolved Hide resolved
"""Reads the contents of the view and presents them as a dictionary.

:param widget_names: If specified, will read only the widgets names in the list.
:param limit: how many entries to fetch at most

:return: A :py:class:`dict` of ``widget_name: widget_read_value``
where the values are retrieved using the :py:meth:`Widget.read`.
"""
if widget_names is None:
if limit is not None:
raise NotImplementedError("You must specify widgets to be able to specify limit")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/widgets/widget_names is that right ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well... you specify widgets by their names

return super().read()
if not isinstance(widget_names, list | tuple):
widget_names = [widget_names]
values = {}
for widget_name in widget_names:
values[widget_name] = get_widget_by_name(self, widget_name).read()
widget = get_widget_by_name(self, widget_name)
if hasattr(widget, 'read_limited') and callable(widget.read_limited):
values[widget_name] = widget.read(limit=limit)
else:
values[widget_name] = widget.read()
return normalize_dict_values(values)


Expand Down
83 changes: 83 additions & 0 deletions airgun/views/oscapreport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from widgetastic.widget import Text, View
from widgetastic_patternfly4 import Button
from widgetastic_patternfly4.ouia import FormSelect

from airgun.views.common import BaseLoggedInView, SearchableViewMixin, WizardStepView
from airgun.widgets import (
ActionsDropdown,
SatTable,
)


class SCAPReportView(BaseLoggedInView, SearchableViewMixin):
title = Text("//h1[normalize-space(.)='Compliance Reports']")
table = SatTable(
'.//table',
column_widgets={
'Host': Text(".//a[contains(@href,'/new/hosts')]"),
'Reported At': Text(".//a[contains(@href,'/compliance/arf_reports')]"),
'Policy': Text(".//a[contains(@href,'/compliance/policies')]"),
'Openscap Capsule': Text(".//a[contains(@href,'/smart_proxies')]"),
'Passed': Text(".//span[contains(@class,'label-info')]"),
'Failed': Text(".//span[contains(@class,'label-danger')]"),
'Other': Text(".//span[contains(@class,'label-warning')]"),
'Actions': ActionsDropdown("./div[contains(@class, 'btn-group')]"),
},
)

@property
def is_displayed(self):
return self.browser.wait_for_element(self.title, exception=False) is not None


class SCAPReportDetailsView(BaseLoggedInView):
show_log_messages_label = Text('//span[normalize-space(.)="Show log messages:"]')
table = SatTable(
'.//table',
column_widgets={
'Result': Text('./span[1]'),
'Message': Text('./span[2]'),
'Resource': Text('./span[3]'),
'Severity': Text('./img[1]'),
'Actions': ActionsDropdown("./div[contains(@class, 'btn-group')]"),
},
)

@property
def is_displayed(self):
return (
self.browser.wait_for_element(self.show_log_messages_label, exception=False) is not None
)


class RemediateModal(View):
"""
Class representing the "Remediate" modal.
It contains multiple nested classes each representing a step of the wizard.
"""

ROOT = '//div[contains(@data-ouia-component-id, "OUIA-Generated-Modal-large-")]'

title = Text('.//h2[contains(@class, "pf-c-title")]')
close_modal = Button(locator='.//button[@aria-label="Close"]')

@View.nested
class select_remediation_method(WizardStepView):
expander = Text(
'.//button[contains(@class,"pf-c-wizard__nav-link") and contains(.,"Select snippet")]'
)
snippet = FormSelect('snippet-select')

@View.nested
class name_source(WizardStepView):
expander = Text(
'.//button[contains(@class,"pf-c-wizard__nav-link") and contains(.,"Review hosts")]'
)
host_table = SatTable(".//table")

@View.nested
class select_capsule(WizardStepView):
expander = Text(
'.//button[contains(@class,"pf-c-wizard__nav-link") and contains(.,"Review remediation")]'
)
run = Button(locator='.//button[normalize-space(.)="Run"]')
47 changes: 46 additions & 1 deletion airgun/widgets.py
Original file line number Diff line number Diff line change
Expand Up @@ -1929,11 +1929,56 @@ def has_rows(self):
return False
return True

def read(self):
def read_limited(self, limit):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lhellebr nice addition!

"""This is almost the same as inherited read but has a limit. Use it for tables that take too long to read.
Reads the table. Returns a list, every item in the list is contents read from the row."""
rows = list(self)
# Cut the unwanted rows if necessary
if self.rows_ignore_top is not None:
rows = rows[self.rows_ignore_top :]
if self.rows_ignore_bottom is not None and self.rows_ignore_bottom > 0:
rows = rows[: -self.rows_ignore_bottom]
if self.assoc_column_position is None:
ret = []
rows_read = 0
for row in rows:
if rows_read >= limit:
break
ret.append(row.read())
rows_read = rows_read + 1
return ret
else:
result = {}
rows_read = 0
for row in rows:
if rows_read >= limit:
break
row_read = row.read()
try:
key = row_read.pop(self.header_index_mapping[self.assoc_column_position])
except KeyError:
try:
key = row_read.pop(self.assoc_column_position)
except KeyError:
try:
key = row_read.pop(self.assoc_column)
except KeyError as e:
raise ValueError(
f"The assoc_column={self.assoc_column!r} could not be retrieved"
) from e
if key in result:
raise ValueError(f"Duplicate value for {key}={result[key]!r}")
result[key] = row_read
rows_read = rows_read + 1
return result

def read(self, limit=None):
"""Return empty list in case table is empty"""
if not self.has_rows:
self.logger.debug(f'Table {self.locator} is empty')
return []
if limit is not None:
return self.read_limited(limit)
if self.pagination.is_displayed:
return self._read_all()
return super().read()
Expand Down
Loading