From 8f8f68f2b294a2bf4d78586bcbec1514f8c6894d Mon Sep 17 00:00:00 2001
From: Weves
Date: Sat, 4 Jan 2025 14:53:22 -0800
Subject: [PATCH] Speedup orphan doc cleanup script

---
 backend/scripts/orphan_doc_cleanup_script.py | 131 +++++++++++++------
 1 file changed, 90 insertions(+), 41 deletions(-)

diff --git a/backend/scripts/orphan_doc_cleanup_script.py b/backend/scripts/orphan_doc_cleanup_script.py
index da05f5d4aaf..00a716c8bb9 100644
--- a/backend/scripts/orphan_doc_cleanup_script.py
+++ b/backend/scripts/orphan_doc_cleanup_script.py
@@ -1,3 +1,4 @@
+import concurrent.futures
 import os
 import sys
 
@@ -8,16 +9,17 @@
 parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 sys.path.append(parent_dir)
 
+from onyx.context.search.models import IndexFilters  # noqa: E402
+from onyx.document_index.interfaces import VespaChunkRequest  # noqa: E402
 from onyx.db.engine import get_session_context_manager  # noqa: E402
 from onyx.db.document import delete_documents_complete__no_commit  # noqa: E402
 from onyx.db.search_settings import get_current_search_settings  # noqa: E402
 from onyx.document_index.vespa.index import VespaIndex  # noqa: E402
-from onyx.background.celery.tasks.shared.RetryDocumentIndex import (  # noqa: E402
-    RetryDocumentIndex,
-)
 
+BATCH_SIZE = 100
+
 
-def _get_orphaned_document_ids(db_session: Session) -> list[str]:
+def _get_orphaned_document_ids(db_session: Session, limit: int) -> list[str]:
     """Get document IDs that don't have any entries in document_by_connector_credential_pair"""
     query = text(
         """
@@ -25,54 +27,101 @@
         SELECT d.id
         FROM document d
         LEFT JOIN document_by_connector_credential_pair dbcc ON d.id = dbcc.id
         WHERE dbcc.id IS NULL
+        LIMIT :limit
         """
     )
-    orphaned_ids = [doc_id[0] for doc_id in db_session.execute(query)]
-    print(f"Found {len(orphaned_ids)} orphaned documents")
+    orphaned_ids = [doc_id[0] for doc_id in db_session.execute(query, {"limit": limit})]
+    print(f"Found {len(orphaned_ids)} orphaned documents in this batch")
     return orphaned_ids
 
 def main() -> None:
     with get_session_context_manager() as db_session:
-        # Get orphaned document IDs
-        orphaned_ids = _get_orphaned_document_ids(db_session)
-        if not orphaned_ids:
-            print("No orphaned documents found")
-            return
-
-        # Setup Vespa index
-        search_settings = get_current_search_settings(db_session)
-        index_name = search_settings.index_name
-        vespa_index = VespaIndex(index_name=index_name, secondary_index_name=None)
-        retry_index = RetryDocumentIndex(vespa_index)
-
-        # Delete chunks from Vespa first
-        print("Deleting orphaned document chunks from Vespa")
-        successfully_vespa_deleted_doc_ids = []
-        for doc_id in orphaned_ids:
+        total_processed = 0
+        while True:
+            # Get orphaned document IDs in batches
+            orphaned_ids = _get_orphaned_document_ids(db_session, BATCH_SIZE)
+            if not orphaned_ids:
+                if total_processed == 0:
+                    print("No orphaned documents found")
+                else:
+                    print(
+                        f"Finished processing all batches. Total documents "
+                        f"processed: {total_processed}"
+                    )
+                return
+
+            # Setup Vespa index
+            search_settings = get_current_search_settings(db_session)
+            index_name = search_settings.index_name
+            vespa_index = VespaIndex(index_name=index_name, secondary_index_name=None)
+
+            # Delete chunks from Vespa first
+            print("Deleting orphaned document chunks from Vespa")
+            successfully_vespa_deleted_doc_ids = []
+            # Process documents in parallel using ThreadPoolExecutor
+            with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
+
+                def process_doc(doc_id: str) -> bool:
+                    # Check if document exists in Vespa first
+                    try:
+                        chunks = vespa_index.id_based_retrieval(
+                            chunk_requests=[
+                                VespaChunkRequest(document_id=doc_id, max_chunk_ind=2)
+                            ],
+                            filters=IndexFilters(access_control_list=None),
+                            batch_retrieval=True,
+                        )
+                        if not chunks:
+                            print(f"Document {doc_id} not found in Vespa")
+                            return True
+                    except Exception as e:
+                        print(
+                            f"Error checking if document {doc_id} exists in Vespa: {e}"
+                        )
+                        return False
+
+                    try:
+                        print(f"Deleting document {doc_id} in Vespa")
+                        chunks_deleted = vespa_index.delete_single(doc_id)
+                        if chunks_deleted > 0:
+                            print(
+                                f"Deleted {chunks_deleted} chunks for document {doc_id}"
+                            )
+                        return True
+                    except Exception as e:
+                        print(
+                            f"Error deleting document {doc_id} in Vespa and "
+                            f"will not delete from Postgres: {e}"
+                        )
+                        return False
+
+                # Submit all tasks and gather results
+                futures = [
+                    executor.submit(process_doc, doc_id) for doc_id in orphaned_ids
+                ]
+                for future in concurrent.futures.as_completed(futures):
+                    doc_id = future.result()
+                    if doc_id:
+                        successfully_vespa_deleted_doc_ids.append(doc_id)
+
+            # Delete documents from Postgres
+            print("Deleting orphaned documents from Postgres")
             try:
-                chunks_deleted = retry_index.delete_single(doc_id)
-                successfully_vespa_deleted_doc_ids.append(doc_id)
-                if chunks_deleted > 0:
-                    print(f"Deleted {chunks_deleted} chunks for document {doc_id}")
-            except Exception as e:
-                print(
-                    f"Error deleting document {doc_id} in Vespa and will not delete from Postgres: {e}"
+                delete_documents_complete__no_commit(
+                    db_session, successfully_vespa_deleted_doc_ids
                 )
+                db_session.commit()
+            except Exception as e:
+                print(f"Error deleting documents from Postgres: {e}")
+                break
 
-        # Delete documents from Postgres
-        print("Deleting orphaned documents from Postgres")
-        try:
-            delete_documents_complete__no_commit(
-                db_session, successfully_vespa_deleted_doc_ids
+            total_processed += len(successfully_vespa_deleted_doc_ids)
+            print(
+                f"Successfully cleaned up {len(successfully_vespa_deleted_doc_ids)}"
+                f" orphaned documents in this batch"
             )
-            db_session.commit()
-        except Exception as e:
-            print(f"Error deleting documents from Postgres: {e}")
-
-        print(
-            f"Successfully cleaned up {len(successfully_vespa_deleted_doc_ids)} orphaned documents"
-        )
+            print(f"Total documents processed so far: {total_processed}")
 
 
 if __name__ == "__main__":