Speedy #1
Merged 5 commits on Jul 11, 2023
1 change: 1 addition & 0 deletions app/__init__.py
@@ -1,3 +1,4 @@
# __init__.py
# Relative Path: app/__init__.py
from .utils import logger
from .processes import preprocess_data
107 changes: 25 additions & 82 deletions app/database/clickhouse/process_tests.py
@@ -20,91 +20,34 @@ def insert_axe_into_clickhouse(data):
"""Inserts data into ClickHouse.

Args:
data (list): The list of tuples containing the data to be inserted.
data (list): The list of dictionaries containing the data to be inserted.
"""
# and then for each row in your data...
for row in data:
# logger.debug(f'Inserting data into ClickHouse: /n/n{data}')

# this gives current datetime
now = datetime.now()
# this is of String type and gives string in the 'YYYY-MM-DD hh:mm:ss' format
now_str = now.strftime('%Y-%m-%d %H:%M:%S')
created_at = f"'{now_str}'"

# parse JSON in 'nodes' column
nodes = row['nodes'] if isinstance(row['nodes'], list) else json.loads(row['nodes'])
# for every 'node' in 'nodes' do an individual insert
if not nodes:
nodes = [{}]
for node in nodes:

# Here we make manual adjustments to extract values from the JSON object
# like 'target', 'html', 'failureSummary'
# and ensure that other values are correctly formatted for insertion

# Sanitize Failure Summary Data
failure_summary = node.get('failureSummary')
if failure_summary is not None:
# Replace newline characters
failure_summary = failure_summary.replace('\n', ' ')
# Replace null characters
failure_summary = failure_summary.replace('\0', '')
# Escape special characters using repr
failure_summary = repr(failure_summary)
else:
failure_summary = "NULL"

# Get & Sanitize html
html_string = node.get('html')
clean_html = sanitize_html(html_string)

# Use `row` object to get `created_at`
if row.get('created_at') is not None:
tested_at = f"'{row.get('created_at').strftime('%Y-%m-%d %H:%M:%S')}'"
else:
tested_at = "NULL"


query = f"""
INSERT INTO axe_tests
(
domain_id, domain, url_id, url, scan_id, rule_id, test_id, tested_at, rule_type, axe_id, impact, target, html, failure_summary, created_at, active, section508, super_waggy
)
VALUES (
{row.get('domain_id', 0)}, '{row.get('domain', '')}', {row.get('url_id', 0)}, '{row.get('url', '')}',
{row.get('scan_id', 0)}, {row.get('rule_id', 0)}, '{uuid.uuid4()}', {tested_at},
'{row.get('rule_type', '')}', '{row.get('axe_id', '')}', '{node.get('impact', '')}',
'{node.get('target', [None])[0] if node.get('target') is not None else ''}',
{clean_html}, {failure_summary}, {created_at},
{row.get('active', 1)},
{row.get('section508', 0)}, {row.get('super_waggy', 0)}
)"""
try:
client.execute(query)
except Exception as e:
# Log the relevant parts of the exception
exception_traceback = traceback.format_exc()
logger.error(f'Failed to insert data into ClickHouse. HTML being processed: {html}')
logger.error(f'Failed Query:\n{query}')
logger.error(f'Exception: {str(e)}')
logger.debug(f'Exception Traceback:\n{exception_traceback}')

# Prepare the rows as tuples
rows = [
(row.get('domain_id'), row.get('domain'), row.get('url_id'), row.get('url'), row.get('scan_id'),
row.get('rule_id'), row.get('test_id'), row.get('tested_at'), row.get('rule_type'), row.get('axe_id'),
row.get('impact'), row.get('target'), row.get('html'), row.get('failure_summary'), row.get('created_at'),
row.get('active'), row.get('section508'), row.get('super_waggy'))
for row in data
]

query = """
INSERT INTO axe_tests
(
domain_id, domain, url_id, url, scan_id, rule_id, test_id, tested_at, rule_type, axe_id, impact, target, html, failure_summary, created_at, active, section508, super_waggy
) VALUES """

try:
rows_inserted = client.execute(query, rows, types_check=True)
logger.info(f'{len(data)} rows inserted into ClickHouse')
except Exception as e:
# Log the relevant parts of the exception
exception_traceback = traceback.format_exc()
logger.error(f'Failed Query:\n{query}')
logger.error(f'Exception: {str(e)}')
# logger.debug(f'Exception Traceback:\n{exception_traceback}')

# close the client connection
client.disconnect()
logger.debug('ClickHouse Connection Closed')


def sanitize_html(html_string):
html_string = html_string if html_string is not None else "NULL"
html_string = html_string.strip("'")
# Unescape any HTML special characters
html_string = html.unescape(html_string)
# Replace newline and tab characters
html_string = html_string.replace('\n', ' ').replace('\r', ' ').replace('\t', ' ')
# Remove any other potential special characters
html_string = html_string.translate({ord(i): None for i in '\0'})
# Escape special characters
html_string = repr(html_string)
return html_string
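
For context on the refactor above: the per-node f-string INSERT is replaced by clickhouse-driver's batch form, where client.execute receives the SQL once and the row values separately. A minimal sketch of that call follows; the connection settings and the sample row are illustrative assumptions, while the table and column list mirror the query in this PR.

# Sketch only: parameterized batch insert with clickhouse-driver.
# Host and the sample row are made up; the table/columns mirror the PR.
from datetime import datetime
from clickhouse_driver import Client

client = Client(host='localhost')  # assumed connection settings

rows = [
    (1, 'example.gov', 42, 'https://example.gov/', 7, 99,
     '5f0c2e4a-0000-0000-0000-000000000000', datetime(2023, 7, 11, 0, 0, 0),
     'violations', 'image-alt', 'critical', 'img.logo',
     '<img src="logo.png">', 'Images must have alternate text',
     datetime(2023, 7, 11, 0, 0, 0), 1, 0, 0),
]

query = """
INSERT INTO axe_tests
(
domain_id, domain, url_id, url, scan_id, rule_id, test_id, tested_at, rule_type, axe_id, impact, target, html, failure_summary, created_at, active, section508, super_waggy
) VALUES """

# Passing the rows separately lets the driver handle quoting and type checks,
# instead of building the SQL string by hand for every node.
client.execute(query, rows, types_check=True)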
2 changes: 1 addition & 1 deletion app/database/postgres/fetch_unprocessed.py
@@ -10,7 +10,7 @@
session = SessionLocal()


def fetch_unprocessed_rules(limit=1000):
def fetch_unprocessed_rules(limit=10000):
"""Fetches all rule_id that are not processed yet."""
result = session.execute(text("""
SELECT id as rule_id
6 changes: 5 additions & 1 deletion app/processes/__init__.py
@@ -1 +1,5 @@
from .axe import get_axes, execute_axes
# __init__.py
# Relative Path: app/processes/__init__.py

from .axe import get_axes, execute_axes
from .preprocess_tests import preprocess_data
11 changes: 7 additions & 4 deletions app/processes/axe.py
@@ -13,22 +13,25 @@
from sqlalchemy.orm import sessionmaker
from ..database import axe_postgres, axe_clickhouse
from ..utils import logger
from .preprocess_tests import preprocess_data


# define how to get data from postgres
def get_axes(new_data_id):
data = axe_postgres(new_data_id)
# logger.debug(f'{data}')
return data

# define how to preprocess data
def preprocess_axes(data):
preprocessed_data = preprocess_data(data)
return preprocessed_data

# define how to put data into ClickHouse
def throw_axes(data):
axe_clickhouse(data)


# for executing both functions in sequence
def execute_axes(new_data_id):
data = get_axes(new_data_id)
throw_axes(data)
logger.debug('Inserting into Clickhouse')
preprocessed_data = preprocess_axes(data) # preprocess the data
throw_axes(preprocessed_data)
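
With this change, execute_axes runs a three-step pipeline: fetch rows from Postgres, preprocess them, then insert into ClickHouse. A small usage sketch, with an illustrative new_data_id:

# Sketch only: driving the fetch -> preprocess -> insert pipeline.
from app.processes import execute_axes

execute_axes(12345)  # 12345 is an illustrative id, not a value from this PR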
163 changes: 163 additions & 0 deletions app/processes/preprocess_tests.py
@@ -0,0 +1,163 @@
# preprocess_tests.py
# Relative Path: app/processes/preprocess_tests.py
"""
Preprocesses test data for insertion into ClickHouse
"""

import json
import html
import uuid
from datetime import datetime
from ..utils import logger


def preprocess_data(data):
"""Preprocesses data for insertion into ClickHouse.

Args:
data (list): The list of dictionaries containing the data to be preprocessed.

Returns:
list: The list of preprocessed dictionaries ready for insertion.
"""
preprocessed_data = []

for row in data:
try:
# Parse JSON in 'nodes' column
nodes = row['nodes'] if isinstance(row['nodes'], list) else json.loads(row['nodes'])
# For every 'node' in 'nodes' do an individual insert
if not nodes:
nodes = [{}]
for node in nodes:
# Sanitize Failure Summary Data
failure_summary = sanitize_failure_summary(node.get('failureSummary'))

# Get & Sanitize html
clean_html = sanitize_html(node.get('html'))

# Select the first target...
target = process_target(node.get('target', ['']), row, node)

# Replace None with default values
preprocessed_row = {
"domain_id": row.get('domain_id', 0),
"domain": row.get('domain', ''),
"url_id": row.get('url_id', 0),
"url": row.get('url', ''),
"scan_id": row.get('scan_id', 0),
"rule_id": row.get('rule_id', 0),
"test_id": str(uuid.uuid4()),
"tested_at": format_datetime(row.get('created_at')),
"rule_type": row.get('rule_type', ''),
"axe_id": row.get('axe_id', ''),
"impact": node.get('impact', ''),
"target": target, # Use the selected target here.
"html": clean_html,
"failure_summary": failure_summary,
"created_at": format_datetime(datetime.now()),
"active": row.get('active', 1),
"section508": row.get('section508', 0),
"super_waggy": row.get('super_waggy', 0)
}
for key, value in preprocessed_row.items():
if value is None:
preprocessed_row[key] = ''

preprocessed_data.append(preprocessed_row)

except Exception as e:
logger.error(f"Error while processing row: {row}")
logger.error(f"Node: {node}")
logger.error(f"Exception: {str(e)}")
continue # skip to the next row

return preprocessed_data



def process_target(target, row, node):
"""
Process the target value.
If the target is a list (sublists are flattened) whose entries are all the same, return that single value.
If the target is a list with differing entries, log a warning and return the first value.
If the target is None, return an empty string.
If the target is a single value, return it unchanged.
"""
if isinstance(target, list):
# Flatten the list if it contains sublists
flat_target = [item for sublist in target for item in sublist] if all(isinstance(sub, list) for sub in target) else target
if len(flat_target) == 0:
return ''
elif len(set(flat_target)) > 1: # set(flat_target) creates a set, which removes duplicates
logger.warning(f"DUPLICATE TARGET: {row['url_id']}, {row['rule_id']}, {row['scan_id']}, {format_datetime(row.get('created_at'))}, {row.get('axe_id')}")
return flat_target[0]
else:
return flat_target[0]
elif target is None:
return ''
else:
return target


def sanitize_failure_summary(failure_summary):
"""Sanitizes the failure summary data.

Args:
failure_summary (str): The failure summary data to be sanitized.

Returns:
str: The sanitized failure summary data.
"""
if failure_summary is not None:
failure_summary = failure_summary.replace('\n', ' ')
failure_summary = failure_summary.replace('\0', '')
else:
failure_summary = "NULL"
return repr(failure_summary)


def sanitize_html(html_string):
"""Sanitizes the html data.

Args:
html_string (str): The html data to be sanitized.

Returns:
str: The sanitized html data.
"""
if html_string is None:
return repr("NULL")

# Check & fix UTF8 issues
html_string = properly_encode_html(html_string)
html_string = html_string.strip("'")
html_string = html.unescape(html_string)
html_string = html_string.replace('\n', ' ').replace('\r', ' ').replace('\t', ' ')
html_string = html_string.translate({ord(i): None for i in '\0'})
return repr(html_string)


def properly_encode_html(html_string):
try:
html_string.encode('utf-8')
except UnicodeEncodeError:
logger.debug('String is NOT UTF8, fixing...')
# Replace invalid characters with a replacement character
html_string = html_string.encode('utf-8', errors='replace').decode('utf-8')
return html_string


def format_datetime(dt):
"""Formats a datetime object.

Args:
dt (datetime): The datetime object to be formatted.

Returns:
datetime: The formatted datetime.
"""
if dt is not None:
return dt
else:
return None
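
To make the target-selection and sanitization rules above concrete, here is a small illustrative check of process_target and sanitize_failure_summary; the selector strings and row values are made-up examples, not data from this PR.

# Sketch only: expected behaviour of the helpers above with made-up inputs.
from app.processes.preprocess_tests import process_target, sanitize_failure_summary

row = {'url_id': 1, 'rule_id': 2, 'scan_id': 3, 'created_at': None, 'axe_id': 'image-alt'}

# Identical entries collapse to a single value.
assert process_target(['img.logo', 'img.logo'], row, node={}) == 'img.logo'

# Differing entries log a DUPLICATE TARGET warning and keep the first one.
assert process_target(['img.logo', 'a.skip-link'], row, node={}) == 'img.logo'

# None becomes an empty string; a bare string passes through unchanged.
assert process_target(None, row, node={}) == ''
assert process_target('img.logo', row, node={}) == 'img.logo'

# Newlines and NULs are stripped before repr() quoting; None becomes "NULL".
assert sanitize_failure_summary('Fix any of:\nAdd alt text') == repr('Fix any of: Add alt text')
assert sanitize_failure_summary(None) == repr('NULL')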
1 change: 1 addition & 0 deletions requirements.txt
@@ -4,6 +4,7 @@ markdown-it-py==3.0.0
mdurl==0.1.2
psycopg2==2.9.6
Pygments==2.15.1
PyMySQL==1.1.0
python-dotenv==1.0.0
pytz==2023.3
rich==13.4.2