diff --git a/backend/onyx/background/celery/tasks/shared/tasks.py b/backend/onyx/background/celery/tasks/shared/tasks.py
index a7050b57498..16434a6b5f5 100644
--- a/backend/onyx/background/celery/tasks/shared/tasks.py
+++ b/backend/onyx/background/celery/tasks/shared/tasks.py
@@ -80,7 +80,6 @@ def document_by_cc_pair_cleanup_task(
             # delete it from vespa and the db
             action = "delete"

-            # TODO: fix the large chunks enabled
             chunks_affected = retry_index.delete_single(
                 document_id,
                 tenant_id=tenant_id,
@@ -115,7 +114,6 @@ def document_by_cc_pair_cleanup_task(
             )

             # update Vespa. OK if doc doesn't exist. Raises exception otherwise.
-            # TODO: fix the large chunks enabled
             chunks_affected = retry_index.update_single(
                 document_id,
                 tenant_id=tenant_id,
diff --git a/backend/onyx/document_index/document_index_utils.py b/backend/onyx/document_index/document_index_utils.py
index 8976b556eb2..4770a8a2843 100644
--- a/backend/onyx/document_index/document_index_utils.py
+++ b/backend/onyx/document_index/document_index_utils.py
@@ -37,7 +37,10 @@ def translate_boost_count_to_multiplier(boost: int) -> float:
     return 2 / (1 + math.exp(-1 * boost / 3))


-def assemble_document_chunk_info(
+# Assembles a list of Vespa chunk IDs for a document
+# given the required context. This can be used to directly query
+# Vespa's Document API.
+def get_document_chunk_ids(
     enriched_document_info_list: list[EnrichedDocumentIndexingInfo],
     tenant_id: str | None,
     large_chunks_enabled: bool,
diff --git a/backend/onyx/document_index/vespa/index.py b/backend/onyx/document_index/vespa/index.py
index 78cfea85656..d8d4b41e4a4 100644
--- a/backend/onyx/document_index/vespa/index.py
+++ b/backend/onyx/document_index/vespa/index.py
@@ -26,7 +26,7 @@
 from onyx.context.search.models import IndexFilters
 from onyx.context.search.models import InferenceChunkUncleaned
 from onyx.db.engine import get_session_with_tenant
-from onyx.document_index.document_index_utils import assemble_document_chunk_info
+from onyx.document_index.document_index_utils import get_document_chunk_ids
 from onyx.document_index.interfaces import DocumentIndex
 from onyx.document_index.interfaces import DocumentInsertionRecord
 from onyx.document_index.interfaces import EnrichedDocumentIndexingInfo
@@ -42,11 +42,11 @@
 from onyx.document_index.vespa.chunk_retrieval import query_vespa
 from onyx.document_index.vespa.deletion import delete_vespa_chunks
 from onyx.document_index.vespa.indexing_utils import batch_index_vespa_chunks
-from onyx.document_index.vespa.indexing_utils import (
-    check_enable_large_chunks_and_multipass,
-)
 from onyx.document_index.vespa.indexing_utils import check_for_final_chunk_existence
 from onyx.document_index.vespa.indexing_utils import clean_chunk_id_copy
+from onyx.document_index.vespa.indexing_utils import (
+    get_multipass_config,
+)
 from onyx.document_index.vespa.shared_utils.utils import get_vespa_http_client
 from onyx.document_index.vespa.shared_utils.utils import (
     replace_invalid_doc_id_characters,
@@ -355,7 +355,7 @@ def index(

         # Now, for each doc, we know exactly where to start and end our deletion
         # So let's generate the chunk IDs for each chunk to delete
-        chunks_to_delete = assemble_document_chunk_info(
+        chunks_to_delete = get_document_chunk_ids(
             enriched_document_info_list=enriched_doc_infos,
             tenant_id=tenant_id,
             large_chunks_enabled=large_chunks_enabled,
@@ -458,14 +458,14 @@ def update(
         for update_request in update_requests:
             for doc_info in update_request.minimal_document_indexing_info:
                 for index_name in index_names:
-                    doc_chunk_info = self.enrich_basic_chunk_info(
+                    doc_chunk_info = VespaIndex.enrich_basic_chunk_info(
                         index_name=index_name,
                         http_client=http_client,
                         document_id=doc_info.doc_id,
                         previous_chunk_count=doc_info.chunk_start_index,
                         new_chunk_count=0,
                     )
-                    doc_chunk_ids = assemble_document_chunk_info(
+                    doc_chunk_ids = get_document_chunk_ids(
                         enriched_document_info_list=[doc_chunk_info],
                         tenant_id=tenant_id,
                         large_chunks_enabled=False,
@@ -578,9 +578,8 @@ def update_single(

         with get_vespa_http_client(http2=False) as http_client:
             for index_name in index_names:
-                large_chunks_enabled = False
                 with get_session_with_tenant(tenant_id=tenant_id) as db_session:
-                    multipass_config = check_enable_large_chunks_and_multipass(
+                    multipass_config = get_multipass_config(
                         db_session=db_session,
                         primary_index=index_name == self.index_name,
                     )
@@ -593,7 +592,7 @@ def update_single(
                     previous_chunk_count=chunk_count,
                     new_chunk_count=0,
                 )
-                doc_chunk_ids = assemble_document_chunk_info(
+                doc_chunk_ids = get_document_chunk_ids(
                     enriched_document_info_list=[enriched_doc_infos],
                     tenant_id=tenant_id,
                     large_chunks_enabled=large_chunks_enabled,
@@ -601,7 +600,6 @@ def update_single(
                 doc_chunk_count += len(doc_chunk_ids)

                 for doc_chunk_id in doc_chunk_ids:
-                    print("THIS ONE is being updated")
                     self.update_single_chunk(
                         doc_chunk_id=doc_chunk_id, index_name=index_name, fields=fields
                     )
@@ -615,9 +613,6 @@ def delete_single(
         tenant_id: str | None,
         chunk_count: int | None,
     ) -> int:
-        """Possibly faster overall than the delete method due to using a single
-        delete call with a selection query."""
-
         total_chunks_deleted = 0

         doc_id = replace_invalid_doc_id_characters(doc_id)
@@ -634,9 +629,8 @@ def delete_single(
                 max_workers=NUM_THREADS
             ) as executor:
                 for index_name in index_names:
-                    large_chunks_enabled = False
                     with get_session_with_tenant(tenant_id=tenant_id) as db_session:
-                        multipass_config = check_enable_large_chunks_and_multipass(
+                        multipass_config = get_multipass_config(
                             db_session=db_session,
                             primary_index=index_name == self.index_name,
                         )
@@ -649,7 +643,7 @@ def delete_single(
                         previous_chunk_count=chunk_count,
                         new_chunk_count=0,
                     )
-                    chunks_to_delete = assemble_document_chunk_info(
+                    chunks_to_delete = get_document_chunk_ids(
                         enriched_document_info_list=[enriched_doc_infos],
                         tenant_id=tenant_id,
                         large_chunks_enabled=large_chunks_enabled,
@@ -762,6 +756,10 @@ def admin_retrieval(

         return query_vespa(params)

+    # Retrieves chunk information for a document:
+    # - Determines the last indexed chunk
+    # - Identifies if the document uses the old or new chunk ID system
+    # This data is crucial for Vespa document updates without relying on the visit API.
     @classmethod
     def enrich_basic_chunk_info(
         cls,
diff --git a/backend/onyx/document_index/vespa/indexing_utils.py b/backend/onyx/document_index/vespa/indexing_utils.py
index a8d35e996db..857162382ef 100644
--- a/backend/onyx/document_index/vespa/indexing_utils.py
+++ b/backend/onyx/document_index/vespa/indexing_utils.py
@@ -297,7 +297,7 @@ def can_use_large_chunks(multipass: bool, search_settings: SearchSettings) -> bo
     )


-def check_enable_large_chunks_and_multipass(
+def get_multipass_config(
     db_session: Session, primary_index: bool = True
 ) -> MultipassConfig:
     """
diff --git a/backend/onyx/indexing/indexing_pipeline.py b/backend/onyx/indexing/indexing_pipeline.py
index 1502ac6b3d3..74c6c08fd8a 100644
--- a/backend/onyx/indexing/indexing_pipeline.py
+++ b/backend/onyx/indexing/indexing_pipeline.py
@@ -36,7 +36,7 @@
 from onyx.document_index.interfaces import DocumentMetadata
 from onyx.document_index.interfaces import IndexBatchParams
 from onyx.document_index.vespa.indexing_utils import (
-    check_enable_large_chunks_and_multipass,
+    get_multipass_config,
 )
 from onyx.indexing.chunker import Chunker
 from onyx.indexing.embedder import IndexingEmbedder
@@ -491,9 +491,7 @@
     callback: IndexingHeartbeatInterface | None = None,
 ) -> IndexingPipelineProtocol:
     """Builds a pipeline which takes in a list (batch) of docs and indexes them."""
-    multipass_config = check_enable_large_chunks_and_multipass(
-        db_session, primary_index=True
-    )
+    multipass_config = get_multipass_config(db_session, primary_index=True)

     chunker = chunker or Chunker(
         tokenizer=embedder.embedding_model.tokenizer,
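Note on the rename: the comment added above `get_document_chunk_ids` says the assembled IDs can be used to query Vespa's Document API directly, i.e. the IDs are derived rather than fetched. As a rough illustration of that idea, here is a minimal, self-contained sketch of deterministic chunk-ID assembly. It assumes a uuid5-style scheme keyed on document ID, chunk index, and tenant; `DocChunkRange`, `LARGE_CHUNK_RATIO`, and `sketch_get_document_chunk_ids` are hypothetical names for illustration, not the repository's actual implementation.

```python
# Standalone sketch (NOT the repository code): shows the kind of deterministic
# chunk-ID assembly a helper like get_document_chunk_ids can perform, assuming
# a uuid5-based scheme. All names and the ID layout here are hypothetical.
import uuid
from dataclasses import dataclass

LARGE_CHUNK_RATIO = 4  # hypothetical: one large chunk spans 4 regular chunks


@dataclass
class DocChunkRange:
    doc_id: str
    chunk_start_index: int
    chunk_end_index: int  # exclusive


def sketch_get_document_chunk_ids(
    ranges: list[DocChunkRange],
    tenant_id: str | None,
    large_chunks_enabled: bool,
) -> list[uuid.UUID]:
    """Derive every chunk ID for each document without visiting the index."""
    chunk_ids: list[uuid.UUID] = []
    for r in ranges:
        for idx in range(r.chunk_start_index, r.chunk_end_index):
            # Deterministic ID from (tenant, doc, chunk index): the same inputs
            # always yield the same UUID, so no lookup round-trip is needed.
            name = f"{r.doc_id}__{idx}"
            if tenant_id:
                name = f"{tenant_id}__{name}"
            chunk_ids.append(uuid.uuid5(uuid.NAMESPACE_X500, name))
        if large_chunks_enabled:
            # Large chunks sit alongside regular ones, one per
            # LARGE_CHUNK_RATIO regular chunks, so derive those IDs too.
            for idx in range(
                r.chunk_start_index, r.chunk_end_index, LARGE_CHUNK_RATIO
            ):
                name = f"{r.doc_id}__large__{idx}"
                if tenant_id:
                    name = f"{tenant_id}__{name}"
                chunk_ids.append(uuid.uuid5(uuid.NAMESPACE_X500, name))
    return chunk_ids


if __name__ == "__main__":
    ranges = [DocChunkRange(doc_id="doc-123", chunk_start_index=0, chunk_end_index=8)]
    for cid in sketch_get_document_chunk_ids(
        ranges, tenant_id=None, large_chunks_enabled=True
    ):
        print(cid)
```

Because the IDs can be computed this way, `update_single` and `delete_single` can address every chunk of a document directly, which is the motivation the new comment on `enrich_basic_chunk_info` gives for avoiding the visit API.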