From c39b21cbdcfd1106e2f918982b9e983bf6502e42 Mon Sep 17 00:00:00 2001 From: Bentley Hensel Date: Tue, 11 Jul 2023 16:53:30 -0400 Subject: [PATCH 1/5] Refactor --- app/__init__.py | 1 + app/database/clickhouse/process_tests.py | 107 +++++---------------- app/database/postgres/fetch_unprocessed.py | 2 +- app/processes/__init__.py | 6 +- app/processes/axe.py | 10 +- app/processes/preprocess_tests.py | 107 +++++++++++++++++++++ requirements.txt | 1 + 7 files changed, 147 insertions(+), 87 deletions(-) create mode 100644 app/processes/preprocess_tests.py diff --git a/app/__init__.py b/app/__init__.py index 69fe2e6..ed35e8a 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -1,3 +1,4 @@ # __init__.py # Relative Path: app/__init__.py from .utils import logger +from .processes import preprocess_data diff --git a/app/database/clickhouse/process_tests.py b/app/database/clickhouse/process_tests.py index 4fc6a8f..4cafed7 100644 --- a/app/database/clickhouse/process_tests.py +++ b/app/database/clickhouse/process_tests.py @@ -20,91 +20,34 @@ def insert_axe_into_clickhouse(data): """Inserts data into ClickHouse. Args: - data (list): The list of tuples containing the data to be inserted. + data (list): The list of dictionaries containing the data to be inserted. """ - # and then for each row in your data... - for row in data: - # logger.debug(f'Inserting data into ClickHouse: /n/n{data}') - - # this gives current datetime - now = datetime.now() - # this is of String type and gives string in the 'YYYY-MM-DD hh:mm:ss' format - now_str = now.strftime('%Y-%m-%d %H:%M:%S') - created_at = f"'{now_str}'" - - # parse JSON in 'nodes' column - nodes = row['nodes'] if isinstance(row['nodes'], list) else json.loads(row['nodes']) - # for every 'node' in 'nodes' do an individual insert - if not nodes: - nodes = [{}] - for node in nodes: - - # Here we make manual adjustments to extract values from the JSON object - # like 'target', 'html', 'failureSummary' - # and ensure that other values are correctly formatted for insertion - - # Sanitize Failure Summary Data - failure_summary = node.get('failureSummary') - if failure_summary is not None: - # Replace newline characters - failure_summary = failure_summary.replace('\n', ' ') - # Replace null characters - failure_summary = failure_summary.replace('\0', '') - # Escape special characters using repr - failure_summary = repr(failure_summary) - else: - failure_summary = "NULL" - - # Get & Sanitize html - html_string = node.get('html') - clean_html = sanitize_html(html_string) - - # Use `row` object to get `created_at` - if row.get('created_at') is not None: - tested_at = f"'{row.get('created_at').strftime('%Y-%m-%d %H:%M:%S')}'" - else: - tested_at = "NULL" - - - query = f""" - INSERT INTO axe_tests - ( - domain_id, domain, url_id, url, scan_id, rule_id, test_id, tested_at, rule_type, axe_id, impact, target, html, failure_summary, created_at, active, section508, super_waggy - ) - VALUES ( - {row.get('domain_id', 0)}, '{row.get('domain', '')}', {row.get('url_id', 0)}, '{row.get('url', '')}', - {row.get('scan_id', 0)}, {row.get('rule_id', 0)}, '{uuid.uuid4()}', {tested_at}, - '{row.get('rule_type', '')}', '{row.get('axe_id', '')}', '{node.get('impact', '')}', - '{node.get('target', [None])[0] if node.get('target') is not None else ''}', - {clean_html}, {failure_summary}, {created_at}, - {row.get('active', 1)}, - {row.get('section508', 0)}, {row.get('super_waggy', 0)} - )""" - try: - client.execute(query) - except Exception as e: - # Log the relevant parts of the exception - exception_traceback = traceback.format_exc() - logger.error(f'Failed to insert data into ClickHouse. HTML being processed: {html}') - logger.error(f'Failed Query:\n{query}') - logger.error(f'Exception: {str(e)}') - logger.debug(f'Exception Traceback:\n{exception_traceback}') - + # Prepare the rows as strings + rows_str = ', '.join([ + f"({row.get('domain_id')}, '{row.get('domain')}', {row.get('url_id')}, '{row.get('url')}', {row.get('scan_id')}, {row.get('rule_id')}, '{row.get('test_id')}', '{row.get('tested_at')}', '{row.get('rule_type')}', '{row.get('axe_id')}', '{row.get('impact')}', '{row.get('target')}', {row.get('html')}, {row.get('failure_summary')}, '{row.get('created_at')}', {row.get('active')}, {row.get('section508')}, {row.get('super_waggy')})" + for row in data + ]) + + query = f""" + INSERT INTO axe_tests + ( + domain_id, domain, url_id, url, scan_id, rule_id, test_id, tested_at, rule_type, axe_id, impact, target, html, failure_summary, created_at, active, section508, super_waggy + ) + VALUES {rows_str}""" + + try: + rows_inserted = client.execute(query) + logger.info(f'{len(data)} rows inserted into ClickHouse') + except Exception as e: + # Log the relevant parts of the exception + exception_traceback = traceback.format_exc() + # Get the first few rows of data for logging + first_few_rows = rows_str.split(", ")[:5] + logger.error(f'First few rows of data:\n{first_few_rows}') + logger.error(f'Exception: {str(e)}') + logger.debug(f'Exception Traceback:\n{exception_traceback}') # close the client connection client.disconnect() logger.debug('ClickHouse Connection Closed') - -def sanitize_html(html_string): - html_string = html_string if html_string is not None else "NULL" - html_string = html_string.strip("'") - # Unescape any HTML special characters - html_string = html.unescape(html_string) - # Replace newline and tab characters - html_string = html_string.replace('\n', ' ').replace('\r', ' ').replace('\t', ' ') - # Remove any other potential special characters - html_string = html_string.translate({ord(i): None for i in '\0'}) - # Escape special characters - html_string = repr(html_string) - return html_string \ No newline at end of file diff --git a/app/database/postgres/fetch_unprocessed.py b/app/database/postgres/fetch_unprocessed.py index fe8b7e7..46b9fe3 100644 --- a/app/database/postgres/fetch_unprocessed.py +++ b/app/database/postgres/fetch_unprocessed.py @@ -10,7 +10,7 @@ session = SessionLocal() -def fetch_unprocessed_rules(limit=1000): +def fetch_unprocessed_rules(limit=2): """Fetches all rule_id that are not processed yet.""" result = session.execute(text(""" SELECT id as rule_id diff --git a/app/processes/__init__.py b/app/processes/__init__.py index 0b87ea5..1087075 100644 --- a/app/processes/__init__.py +++ b/app/processes/__init__.py @@ -1 +1,5 @@ -from .axe import get_axes, execute_axes \ No newline at end of file +# __init__.py +# Relative Path: app/processes/__init__.py + +from .axe import get_axes, execute_axes +from .preprocess_tests import preprocess_data diff --git a/app/processes/axe.py b/app/processes/axe.py index b997c3e..feec062 100644 --- a/app/processes/axe.py +++ b/app/processes/axe.py @@ -13,22 +13,26 @@ from sqlalchemy.orm import sessionmaker from ..database import axe_postgres, axe_clickhouse from ..utils import logger +from .preprocess_tests import preprocess_data # define how to get data from postgres def get_axes(new_data_id): data = axe_postgres(new_data_id) - # logger.debug(f'{data}') return data +# define how to preprocess data +def preprocess_axes(data): + preprocessed_data = preprocess_data(data) + return preprocessed_data # define how to put data into ClickHouse def throw_axes(data): axe_clickhouse(data) - # for executing both functions in sequence def execute_axes(new_data_id): data = get_axes(new_data_id) - throw_axes(data) + preprocessed_data = preprocess_axes(data) # preprocess the data + throw_axes(preprocessed_data) logger.debug('Inserting into Clickhouse') \ No newline at end of file diff --git a/app/processes/preprocess_tests.py b/app/processes/preprocess_tests.py new file mode 100644 index 0000000..1bd2e22 --- /dev/null +++ b/app/processes/preprocess_tests.py @@ -0,0 +1,107 @@ +# preprocess_tests.py +# Relative Path: app/processes/preprocess_tests.py +""" +Preprocesses test data for insertion into ClickHouse +""" + +import json +import html +import uuid +from datetime import datetime + +def preprocess_data(data): + """Preprocesses data for insertion into ClickHouse. + + Args: + data (list): The list of tuples containing the data to be preprocessed. + + Returns: + list: The list of preprocessed tuples ready for insertion. + """ + preprocessed_data = [] + + for row in data: + # Parse JSON in 'nodes' column + nodes = row['nodes'] if isinstance(row['nodes'], list) else json.loads(row['nodes']) + # For every 'node' in 'nodes' do an individual insert + if not nodes: + nodes = [{}] + for node in nodes: + # Sanitize Failure Summary Data + failure_summary = sanitize_failure_summary(node.get('failureSummary')) + + # Get & Sanitize html + clean_html = sanitize_html(node.get('html')) + + # Preprocess data and append to preprocessed_data + preprocessed_data.append({ + "domain_id": row.get('domain_id', 0), + "domain": row.get('domain', ''), + "url_id": row.get('url_id', 0), + "url": row.get('url', ''), + "scan_id": row.get('scan_id', 0), + "rule_id": row.get('rule_id', 0), + "test_id": str(uuid.uuid4()), + "tested_at": format_datetime(row.get('created_at')), + "rule_type": row.get('rule_type', ''), + "axe_id": row.get('axe_id', ''), + "impact": node.get('impact', ''), + "target": node.get('target', [None])[0] if node.get('target') is not None else '', + "html": clean_html, + "failure_summary": failure_summary, + "created_at": format_datetime(datetime.now()), + "active": row.get('active', 1), + "section508": row.get('section508', 0), + "super_waggy": row.get('super_waggy', 0) + }) + + return preprocessed_data + + +def sanitize_failure_summary(failure_summary): + """Sanitizes the failure summary data. + + Args: + failure_summary (str): The failure summary data to be sanitized. + + Returns: + str: The sanitized failure summary data. + """ + if failure_summary is not None: + failure_summary = failure_summary.replace('\n', ' ') + failure_summary = failure_summary.replace('\0', '') + else: + failure_summary = "NULL" + return repr(failure_summary) + + +def sanitize_html(html_string): + """Sanitizes the html data. + + Args: + html_string (str): The html data to be sanitized. + + Returns: + str: The sanitized html data. + """ + html_string = html_string if html_string is not None else "NULL" + html_string = html_string.strip("'") + html_string = html.unescape(html_string) + html_string = html_string.replace('\n', ' ').replace('\r', ' ').replace('\t', ' ') + html_string = html_string.translate({ord(i): None for i in '\0'}) + return repr(html_string) + + +def format_datetime(dt): + """Formats a datetime object into a string. + + Args: + dt (datetime): The datetime object to be formatted. + + Returns: + str: The formatted datetime string. + """ + if dt is not None: + return dt.strftime('%Y-%m-%d %H:%M:%S') + else: + return "NULL" diff --git a/requirements.txt b/requirements.txt index 7fd8810..643522d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,6 +4,7 @@ markdown-it-py==3.0.0 mdurl==0.1.2 psycopg2==2.9.6 Pygments==2.15.1 +PyMySQL==1.1.0 python-dotenv==1.0.0 pytz==2023.3 rich==13.4.2 From 9ffdaec3725475038027078d8eb0b5f4b6918318 Mon Sep 17 00:00:00 2001 From: Bentley Hensel Date: Tue, 11 Jul 2023 17:16:21 -0400 Subject: [PATCH 2/5] Error Handling --- app/database/clickhouse/process_tests.py | 24 ++++++------- app/database/postgres/fetch_unprocessed.py | 2 +- app/processes/axe.py | 3 +- app/processes/preprocess_tests.py | 41 ++++++++++++++-------- 4 files changed, 41 insertions(+), 29 deletions(-) diff --git a/app/database/clickhouse/process_tests.py b/app/database/clickhouse/process_tests.py index 4cafed7..ac8b11a 100644 --- a/app/database/clickhouse/process_tests.py +++ b/app/database/clickhouse/process_tests.py @@ -22,32 +22,32 @@ def insert_axe_into_clickhouse(data): Args: data (list): The list of dictionaries containing the data to be inserted. """ - # Prepare the rows as strings - rows_str = ', '.join([ - f"({row.get('domain_id')}, '{row.get('domain')}', {row.get('url_id')}, '{row.get('url')}', {row.get('scan_id')}, {row.get('rule_id')}, '{row.get('test_id')}', '{row.get('tested_at')}', '{row.get('rule_type')}', '{row.get('axe_id')}', '{row.get('impact')}', '{row.get('target')}', {row.get('html')}, {row.get('failure_summary')}, '{row.get('created_at')}', {row.get('active')}, {row.get('section508')}, {row.get('super_waggy')})" + # Prepare the rows as tuples + rows = [ + (row.get('domain_id'), row.get('domain'), row.get('url_id'), row.get('url'), row.get('scan_id'), + row.get('rule_id'), row.get('test_id'), row.get('tested_at'), row.get('rule_type'), row.get('axe_id'), + row.get('impact'), row.get('target'), row.get('html'), row.get('failure_summary'), row.get('created_at'), + row.get('active'), row.get('section508'), row.get('super_waggy')) for row in data - ]) + ] - query = f""" + query = """ INSERT INTO axe_tests ( domain_id, domain, url_id, url, scan_id, rule_id, test_id, tested_at, rule_type, axe_id, impact, target, html, failure_summary, created_at, active, section508, super_waggy - ) - VALUES {rows_str}""" + ) VALUES """ try: - rows_inserted = client.execute(query) + rows_inserted = client.execute(query, rows, types_check=True) logger.info(f'{len(data)} rows inserted into ClickHouse') except Exception as e: # Log the relevant parts of the exception exception_traceback = traceback.format_exc() - # Get the first few rows of data for logging - first_few_rows = rows_str.split(", ")[:5] - logger.error(f'First few rows of data:\n{first_few_rows}') + logger.error(f'Failed Query:\n{query}') logger.error(f'Exception: {str(e)}') logger.debug(f'Exception Traceback:\n{exception_traceback}') # close the client connection client.disconnect() - logger.debug('ClickHouse Connection Closed') + diff --git a/app/database/postgres/fetch_unprocessed.py b/app/database/postgres/fetch_unprocessed.py index 46b9fe3..624993a 100644 --- a/app/database/postgres/fetch_unprocessed.py +++ b/app/database/postgres/fetch_unprocessed.py @@ -10,7 +10,7 @@ session = SessionLocal() -def fetch_unprocessed_rules(limit=2): +def fetch_unprocessed_rules(limit=10): """Fetches all rule_id that are not processed yet.""" result = session.execute(text(""" SELECT id as rule_id diff --git a/app/processes/axe.py b/app/processes/axe.py index feec062..9fb06c2 100644 --- a/app/processes/axe.py +++ b/app/processes/axe.py @@ -34,5 +34,4 @@ def throw_axes(data): def execute_axes(new_data_id): data = get_axes(new_data_id) preprocessed_data = preprocess_axes(data) # preprocess the data - throw_axes(preprocessed_data) - logger.debug('Inserting into Clickhouse') \ No newline at end of file + throw_axes(preprocessed_data) \ No newline at end of file diff --git a/app/processes/preprocess_tests.py b/app/processes/preprocess_tests.py index 1bd2e22..5a89c75 100644 --- a/app/processes/preprocess_tests.py +++ b/app/processes/preprocess_tests.py @@ -8,6 +8,7 @@ import html import uuid from datetime import datetime +from ..utils import logger def preprocess_data(data): """Preprocesses data for insertion into ClickHouse. @@ -36,20 +37,20 @@ def preprocess_data(data): # Preprocess data and append to preprocessed_data preprocessed_data.append({ "domain_id": row.get('domain_id', 0), - "domain": row.get('domain', ''), + "domain": row.get('domain', '') or '', "url_id": row.get('url_id', 0), - "url": row.get('url', ''), + "url": row.get('url', '') or '', "scan_id": row.get('scan_id', 0), "rule_id": row.get('rule_id', 0), "test_id": str(uuid.uuid4()), - "tested_at": format_datetime(row.get('created_at')), - "rule_type": row.get('rule_type', ''), - "axe_id": row.get('axe_id', ''), - "impact": node.get('impact', ''), - "target": node.get('target', [None])[0] if node.get('target') is not None else '', - "html": clean_html, - "failure_summary": failure_summary, - "created_at": format_datetime(datetime.now()), + "tested_at": format_datetime(row.get('created_at')) or '', + "rule_type": row.get('rule_type', '') or '', + "axe_id": row.get('axe_id', '') or '', + "impact": node.get('impact', '') or '', + "target": node.get('target', [None])[0] if node.get('target') is not None else '' or '', + "html": clean_html or '', + "failure_summary": failure_summary or '', + "created_at": format_datetime(datetime.now()) or '', "active": row.get('active', 1), "section508": row.get('section508', 0), "super_waggy": row.get('super_waggy', 0) @@ -85,6 +86,8 @@ def sanitize_html(html_string): str: The sanitized html data. """ html_string = html_string if html_string is not None else "NULL" + # Check & fix UTF8 issues + html_string = properly_encode_html(html_string) html_string = html_string.strip("'") html_string = html.unescape(html_string) html_string = html_string.replace('\n', ' ').replace('\r', ' ').replace('\t', ' ') @@ -92,16 +95,26 @@ def sanitize_html(html_string): return repr(html_string) +def properly_encode_html(html_string): + try: + html_string.encode('utf-8') + except UnicodeEncodeError: + logger.debug('String is NOT UTF8, fixing...') + # Replace invalid characters with a replacement character + html_string = html_string.encode('utf-8', errors='replace').decode('utf-8') + return html_string + + def format_datetime(dt): - """Formats a datetime object into a string. + """Formats a datetime object. Args: dt (datetime): The datetime object to be formatted. Returns: - str: The formatted datetime string. + datetime: The formatted datetime. """ if dt is not None: - return dt.strftime('%Y-%m-%d %H:%M:%S') + return dt else: - return "NULL" + return None From f912529a063c768ba0876ed0f1f45e3a5e42bcc5 Mon Sep 17 00:00:00 2001 From: Bentley Hensel Date: Tue, 11 Jul 2023 17:35:23 -0400 Subject: [PATCH 3/5] Reduced logging --- app/database/clickhouse/process_tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/database/clickhouse/process_tests.py b/app/database/clickhouse/process_tests.py index ac8b11a..3f0ee0c 100644 --- a/app/database/clickhouse/process_tests.py +++ b/app/database/clickhouse/process_tests.py @@ -45,7 +45,7 @@ def insert_axe_into_clickhouse(data): exception_traceback = traceback.format_exc() logger.error(f'Failed Query:\n{query}') logger.error(f'Exception: {str(e)}') - logger.debug(f'Exception Traceback:\n{exception_traceback}') + # logger.debug(f'Exception Traceback:\n{exception_traceback}') # close the client connection client.disconnect() From 097832eea1b14df03d53948b3d3f01afe8d9a0d0 Mon Sep 17 00:00:00 2001 From: Bentley Hensel Date: Tue, 11 Jul 2023 18:03:49 -0400 Subject: [PATCH 4/5] =?UTF-8?q?Kindof=20fix=20target=20issue=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/processes/preprocess_tests.py | 111 +++++++++++++++++++++--------- 1 file changed, 77 insertions(+), 34 deletions(-) diff --git a/app/processes/preprocess_tests.py b/app/processes/preprocess_tests.py index 5a89c75..a9fd824 100644 --- a/app/processes/preprocess_tests.py +++ b/app/processes/preprocess_tests.py @@ -10,6 +10,7 @@ from datetime import datetime from ..utils import logger + def preprocess_data(data): """Preprocesses data for insertion into ClickHouse. @@ -22,43 +23,83 @@ def preprocess_data(data): preprocessed_data = [] for row in data: - # Parse JSON in 'nodes' column - nodes = row['nodes'] if isinstance(row['nodes'], list) else json.loads(row['nodes']) - # For every 'node' in 'nodes' do an individual insert - if not nodes: - nodes = [{}] - for node in nodes: - # Sanitize Failure Summary Data - failure_summary = sanitize_failure_summary(node.get('failureSummary')) - - # Get & Sanitize html - clean_html = sanitize_html(node.get('html')) - - # Preprocess data and append to preprocessed_data - preprocessed_data.append({ - "domain_id": row.get('domain_id', 0), - "domain": row.get('domain', '') or '', - "url_id": row.get('url_id', 0), - "url": row.get('url', '') or '', - "scan_id": row.get('scan_id', 0), - "rule_id": row.get('rule_id', 0), - "test_id": str(uuid.uuid4()), - "tested_at": format_datetime(row.get('created_at')) or '', - "rule_type": row.get('rule_type', '') or '', - "axe_id": row.get('axe_id', '') or '', - "impact": node.get('impact', '') or '', - "target": node.get('target', [None])[0] if node.get('target') is not None else '' or '', - "html": clean_html or '', - "failure_summary": failure_summary or '', - "created_at": format_datetime(datetime.now()) or '', - "active": row.get('active', 1), - "section508": row.get('section508', 0), - "super_waggy": row.get('super_waggy', 0) - }) + try: + # Parse JSON in 'nodes' column + nodes = row['nodes'] if isinstance(row['nodes'], list) else json.loads(row['nodes']) + # For every 'node' in 'nodes' do an individual insert + if not nodes: + nodes = [{}] + for node in nodes: + # Sanitize Failure Summary Data + failure_summary = sanitize_failure_summary(node.get('failureSummary')) + + # Get & Sanitize html + clean_html = sanitize_html(node.get('html')) + + # Select the first target... + target = process_target(node.get('target', ['']), row, node) + + # Replace None with default values + preprocessed_row = { + "domain_id": row.get('domain_id', 0), + "domain": row.get('domain', ''), + "url_id": row.get('url_id', 0), + "url": row.get('url', ''), + "scan_id": row.get('scan_id', 0), + "rule_id": row.get('rule_id', 0), + "test_id": str(uuid.uuid4()), + "tested_at": format_datetime(row.get('created_at')), + "rule_type": row.get('rule_type', ''), + "axe_id": row.get('axe_id', ''), + "impact": node.get('impact', ''), + "target": target, # Use the selected target here. + "html": clean_html, + "failure_summary": failure_summary, + "created_at": format_datetime(datetime.now()), + "active": row.get('active', 1), + "section508": row.get('section508', 0), + "super_waggy": row.get('super_waggy', 0) + } + for key, value in preprocessed_row.items(): + if value is None: + preprocessed_row[key] = '' + + preprocessed_data.append(preprocessed_row) + + except Exception as e: + logger.error(f"Error while processing row: {row}") + logger.error(f"Node: {node}") + logger.error(f"Exception: {str(e)}") + continue # skip to the next row return preprocessed_data + +def process_target(target, row, node): + """ + Process the target value. + If the target is a list with all same values, return the single value. + If the target is a list with different values, log the message and return the first value. + If the target is None, return an empty string. + If the target is a single value, return the value. + """ + if isinstance(target, list): + # Flatten the list if it contains sublists + flat_target = [item for sublist in target for item in sublist] if all(isinstance(sub, list) for sub in target) else target + if len(flat_target) == 0: + return '' + elif len(set(flat_target)) > 1: # set(flat_target) creates a set, which removes duplicates + logger.warning(f"DUPLICATE TARGET: {row['url_id']}, {row['rule_id']}, {row['scan_id']}, {format_datetime(row.get('created_at'))}, {row.get('axe_id')}") + return flat_target[0] + else: + return flat_target[0] + elif target is None: + return '' + else: + return target + + def sanitize_failure_summary(failure_summary): """Sanitizes the failure summary data. @@ -85,7 +126,9 @@ def sanitize_html(html_string): Returns: str: The sanitized html data. """ - html_string = html_string if html_string is not None else "NULL" + if html_string is None: + return repr("NULL") + # Check & fix UTF8 issues html_string = properly_encode_html(html_string) html_string = html_string.strip("'") From b532932f23af45914c9ab74182eb499c7c8d7129 Mon Sep 17 00:00:00 2001 From: Bentley Hensel Date: Tue, 11 Jul 2023 18:13:48 -0400 Subject: [PATCH 5/5] Big speed! --- app/database/postgres/fetch_unprocessed.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/database/postgres/fetch_unprocessed.py b/app/database/postgres/fetch_unprocessed.py index 624993a..96b112a 100644 --- a/app/database/postgres/fetch_unprocessed.py +++ b/app/database/postgres/fetch_unprocessed.py @@ -10,7 +10,7 @@ session = SessionLocal() -def fetch_unprocessed_rules(limit=10): +def fetch_unprocessed_rules(limit=10000): """Fetches all rule_id that are not processed yet.""" result = session.execute(text(""" SELECT id as rule_id