diff --git a/app/__init__.py b/app/__init__.py
index 69fe2e6..ed35e8a 100644
--- a/app/__init__.py
+++ b/app/__init__.py
@@ -1,3 +1,4 @@
 # __init__.py
 # Relative Path: app/__init__.py
 from .utils import logger
+from .processes import preprocess_data
diff --git a/app/database/clickhouse/process_tests.py b/app/database/clickhouse/process_tests.py
index 4fc6a8f..3f0ee0c 100644
--- a/app/database/clickhouse/process_tests.py
+++ b/app/database/clickhouse/process_tests.py
@@ -20,91 +20,34 @@ def insert_axe_into_clickhouse(data):
     """Inserts data into ClickHouse.

     Args:
-        data (list): The list of tuples containing the data to be inserted.
+        data (list): The list of dictionaries containing the data to be inserted.
     """
-    # and then for each row in your data...
-    for row in data:
-        # logger.debug(f'Inserting data into ClickHouse: /n/n{data}')
-
-        # this gives current datetime
-        now = datetime.now()
-        # this is of String type and gives string in the 'YYYY-MM-DD hh:mm:ss' format
-        now_str = now.strftime('%Y-%m-%d %H:%M:%S')
-        created_at = f"'{now_str}'"
-
-        # parse JSON in 'nodes' column
-        nodes = row['nodes'] if isinstance(row['nodes'], list) else json.loads(row['nodes'])
-        # for every 'node' in 'nodes' do an individual insert
-        if not nodes:
-            nodes = [{}]
-        for node in nodes:
-
-            # Here we make manual adjustments to extract values from the JSON object
-            # like 'target', 'html', 'failureSummary'
-            # and ensure that other values are correctly formatted for insertion
-
-            # Sanitize Failure Summary Data
-            failure_summary = node.get('failureSummary')
-            if failure_summary is not None:
-                # Replace newline characters
-                failure_summary = failure_summary.replace('\n', ' ')
-                # Replace null characters
-                failure_summary = failure_summary.replace('\0', '')
-                # Escape special characters using repr
-                failure_summary = repr(failure_summary)
-            else:
-                failure_summary = "NULL"
-
-            # Get & Sanitize html
-            html_string = node.get('html')
-            clean_html = sanitize_html(html_string)
-
-            # Use `row` object to get `created_at`
-            if row.get('created_at') is not None:
-                tested_at = f"'{row.get('created_at').strftime('%Y-%m-%d %H:%M:%S')}'"
-            else:
-                tested_at = "NULL"
-
-
-            query = f"""
-                INSERT INTO axe_tests
-                (
-                    domain_id, domain, url_id, url, scan_id, rule_id, test_id, tested_at, rule_type, axe_id, impact, target, html, failure_summary, created_at, active, section508, super_waggy
-                )
-                VALUES (
-                    {row.get('domain_id', 0)}, '{row.get('domain', '')}', {row.get('url_id', 0)}, '{row.get('url', '')}',
-                    {row.get('scan_id', 0)}, {row.get('rule_id', 0)}, '{uuid.uuid4()}', {tested_at},
-                    '{row.get('rule_type', '')}', '{row.get('axe_id', '')}', '{node.get('impact', '')}',
-                    '{node.get('target', [None])[0] if node.get('target') is not None else ''}',
-                    {clean_html}, {failure_summary}, {created_at},
-                    {row.get('active', 1)},
-                    {row.get('section508', 0)}, {row.get('super_waggy', 0)}
-                )"""
-            try:
-                client.execute(query)
-            except Exception as e:
-                # Log the relevant parts of the exception
-                exception_traceback = traceback.format_exc()
-                logger.error(f'Failed to insert data into ClickHouse. HTML being processed: {html}')
-                logger.error(f'Failed Query:\n{query}')
-                logger.error(f'Exception: {str(e)}')
-                logger.debug(f'Exception Traceback:\n{exception_traceback}')
-
+    # Prepare the rows as tuples
+    rows = [
+        (row.get('domain_id'), row.get('domain'), row.get('url_id'), row.get('url'), row.get('scan_id'),
+         row.get('rule_id'), row.get('test_id'), row.get('tested_at'), row.get('rule_type'), row.get('axe_id'),
+         row.get('impact'), row.get('target'), row.get('html'), row.get('failure_summary'), row.get('created_at'),
+         row.get('active'), row.get('section508'), row.get('super_waggy'))
+        for row in data
+    ]
+
+    query = """
+        INSERT INTO axe_tests
+        (
+            domain_id, domain, url_id, url, scan_id, rule_id, test_id, tested_at, rule_type, axe_id, impact, target, html, failure_summary, created_at, active, section508, super_waggy
+        ) VALUES """
+
+    try:
+        rows_inserted = client.execute(query, rows, types_check=True)
+        logger.info(f'{rows_inserted} rows inserted into ClickHouse')
+    except Exception as e:
+        # Log the relevant parts of the exception
+        exception_traceback = traceback.format_exc()
+        logger.error(f'Failed Query:\n{query}')
+        logger.error(f'Exception: {str(e)}')
+        logger.debug(f'Exception Traceback:\n{exception_traceback}')

     # close the client connection
     client.disconnect()
-    logger.debug('ClickHouse Connection Closed')
-
-
-def sanitize_html(html_string):
-    html_string = html_string if html_string is not None else "NULL"
-    html_string = html_string.strip("'")
-    # Unescape any HTML special characters
-    html_string = html.unescape(html_string)
-    # Replace newline and tab characters
-    html_string = html_string.replace('\n', ' ').replace('\r', ' ').replace('\t', ' ')
-    # Remove any other potential special characters
-    html_string = html_string.translate({ord(i): None for i in '\0'})
-    # Escape special characters
-    html_string = repr(html_string)
-    return html_string
\ No newline at end of file
diff --git a/app/database/postgres/fetch_unprocessed.py b/app/database/postgres/fetch_unprocessed.py
index fe8b7e7..96b112a 100644
--- a/app/database/postgres/fetch_unprocessed.py
+++ b/app/database/postgres/fetch_unprocessed.py
@@ -10,7 +10,7 @@
 session = SessionLocal()


-def fetch_unprocessed_rules(limit=1000):
+def fetch_unprocessed_rules(limit=10000):
     """Fetches all rule_id that are not processed yet."""
     result = session.execute(text("""
         SELECT id as rule_id
diff --git a/app/processes/__init__.py b/app/processes/__init__.py
index 0b87ea5..1087075 100644
--- a/app/processes/__init__.py
+++ b/app/processes/__init__.py
@@ -1 +1,5 @@
-from .axe import get_axes, execute_axes
\ No newline at end of file
+# __init__.py
+# Relative Path: app/processes/__init__.py
+
+from .axe import get_axes, execute_axes
+from .preprocess_tests import preprocess_data
diff --git a/app/processes/axe.py b/app/processes/axe.py
index b997c3e..9fb06c2 100644
--- a/app/processes/axe.py
+++ b/app/processes/axe.py
@@ -13,22 +13,25 @@
 from sqlalchemy.orm import sessionmaker

 from ..database import axe_postgres, axe_clickhouse
 from ..utils import logger
+from .preprocess_tests import preprocess_data


 # define how to get data from postgres
 def get_axes(new_data_id):
     data = axe_postgres(new_data_id)
-    # logger.debug(f'{data}')
     return data

+# define how to preprocess data
+def preprocess_axes(data):
+    preprocessed_data = preprocess_data(data)
+    return preprocessed_data

 # define how to put data into ClickHouse
 def throw_axes(data):
     axe_clickhouse(data)

-
 # for executing both functions in sequence
 def execute_axes(new_data_id):
     data = get_axes(new_data_id)
-    throw_axes(data)
-    logger.debug('Inserting into Clickhouse')
\ No newline at end of file
+    preprocessed_data = preprocess_axes(data)  # preprocess the data
+    throw_axes(preprocessed_data)
\ No newline at end of file
diff --git a/app/processes/preprocess_tests.py b/app/processes/preprocess_tests.py
new file mode 100644
index 0000000..a9fd824
--- /dev/null
+++ b/app/processes/preprocess_tests.py
@@ -0,0 +1,163 @@
+# preprocess_tests.py
+# Relative Path: app/processes/preprocess_tests.py
+"""
+Preprocesses test data for insertion into ClickHouse
+"""
+
+import json
+import html
+import uuid
+from datetime import datetime
+from ..utils import logger
+
+
+def preprocess_data(data):
+    """Preprocesses data for insertion into ClickHouse.
+
+    Args:
+        data (list): The list of dictionaries (rows) containing the data to be preprocessed.
+
+    Returns:
+        list: The list of preprocessed dictionaries ready for insertion.
+    """
+    preprocessed_data = []
+
+    for row in data:
+        try:
+            # Parse JSON in 'nodes' column
+            nodes = row['nodes'] if isinstance(row['nodes'], list) else json.loads(row['nodes'])
+            # For every 'node' in 'nodes' build an individual row
+            if not nodes:
+                nodes = [{}]
+            for node in nodes:
+                # Sanitize Failure Summary Data
+                failure_summary = sanitize_failure_summary(node.get('failureSummary'))
+
+                # Get & Sanitize html
+                clean_html = sanitize_html(node.get('html'))
+
+                # Select the first target...
+                target = process_target(node.get('target', ['']), row, node)
+
+                # Replace None with default values
+                preprocessed_row = {
+                    "domain_id": row.get('domain_id', 0),
+                    "domain": row.get('domain', ''),
+                    "url_id": row.get('url_id', 0),
+                    "url": row.get('url', ''),
+                    "scan_id": row.get('scan_id', 0),
+                    "rule_id": row.get('rule_id', 0),
+                    "test_id": str(uuid.uuid4()),
+                    "tested_at": format_datetime(row.get('created_at')),
+                    "rule_type": row.get('rule_type', ''),
+                    "axe_id": row.get('axe_id', ''),
+                    "impact": node.get('impact', ''),
+                    "target": target,  # Use the selected target here.
+                    "html": clean_html,
+                    "failure_summary": failure_summary,
+                    "created_at": format_datetime(datetime.now()),
+                    "active": row.get('active', 1),
+                    "section508": row.get('section508', 0),
+                    "super_waggy": row.get('super_waggy', 0)
+                }
+                for key, value in preprocessed_row.items():
+                    if value is None:
+                        preprocessed_row[key] = ''
+
+                preprocessed_data.append(preprocessed_row)
+
+        except Exception as e:
+            logger.error(f"Error while processing row: {row}")
+            logger.error(f"Node: {node}")
+            logger.error(f"Exception: {str(e)}")
+            continue  # skip to the next row
+
+    return preprocessed_data
+
+
+def process_target(target, row, node):
+    """
+    Process the target value.
+    If the target is a list with all same values, return the single value.
+    If the target is a list with different values, log the message and return the first value.
+    If the target is None, return an empty string.
+    If the target is a single value, return the value.
+ """ + if isinstance(target, list): + # Flatten the list if it contains sublists + flat_target = [item for sublist in target for item in sublist] if all(isinstance(sub, list) for sub in target) else target + if len(flat_target) == 0: + return '' + elif len(set(flat_target)) > 1: # set(flat_target) creates a set, which removes duplicates + logger.warning(f"DUPLICATE TARGET: {row['url_id']}, {row['rule_id']}, {row['scan_id']}, {format_datetime(row.get('created_at'))}, {row.get('axe_id')}") + return flat_target[0] + else: + return flat_target[0] + elif target is None: + return '' + else: + return target + + +def sanitize_failure_summary(failure_summary): + """Sanitizes the failure summary data. + + Args: + failure_summary (str): The failure summary data to be sanitized. + + Returns: + str: The sanitized failure summary data. + """ + if failure_summary is not None: + failure_summary = failure_summary.replace('\n', ' ') + failure_summary = failure_summary.replace('\0', '') + else: + failure_summary = "NULL" + return repr(failure_summary) + + +def sanitize_html(html_string): + """Sanitizes the html data. + + Args: + html_string (str): The html data to be sanitized. + + Returns: + str: The sanitized html data. + """ + if html_string is None: + return repr("NULL") + + # Check & fix UTF8 issues + html_string = properly_encode_html(html_string) + html_string = html_string.strip("'") + html_string = html.unescape(html_string) + html_string = html_string.replace('\n', ' ').replace('\r', ' ').replace('\t', ' ') + html_string = html_string.translate({ord(i): None for i in '\0'}) + return repr(html_string) + + +def properly_encode_html(html_string): + try: + html_string.encode('utf-8') + except UnicodeEncodeError: + logger.debug('String is NOT UTF8, fixing...') + # Replace invalid characters with a replacement character + html_string = html_string.encode('utf-8', errors='replace').decode('utf-8') + return html_string + + +def format_datetime(dt): + """Formats a datetime object. + + Args: + dt (datetime): The datetime object to be formatted. + + Returns: + datetime: The formatted datetime. + """ + if dt is not None: + return dt + else: + return None diff --git a/requirements.txt b/requirements.txt index 7fd8810..643522d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,6 +4,7 @@ markdown-it-py==3.0.0 mdurl==0.1.2 psycopg2==2.9.6 Pygments==2.15.1 +PyMySQL==1.1.0 python-dotenv==1.0.0 pytz==2023.3 rich==13.4.2