From acda42204acbe71425e984483cf7d8ce8fbe8178 Mon Sep 17 00:00:00 2001
From: pablodanswer
Date: Wed, 1 Jan 2025 17:04:15 -0500
Subject: [PATCH 1/2] Revert "Revert "More efficient Vespa indexing (#3552)""

This reverts commit 3eb72e5c1df90102a2b36a9d3a47d0a8e4282e6b.
---
 backend/onyx/connectors/web/connector.py      |  1 -
 backend/onyx/db/models.py                     |  1 +
 backend/onyx/document_index/interfaces.py     |  7 --
 .../vespa/app_config/schemas/danswer_chunk.sd |  3 +
 backend/onyx/document_index/vespa/index.py    | 93 +++++++++++--------
 .../document_index/vespa/indexing_utils.py    | 88 ++++++++++++++++++
 .../shared_utils/vespa_request_builders.py    | 23 +++++
 .../onyx/document_index/vespa_constants.py    |  1 +
 backend/onyx/indexing/indexing_pipeline.py    |  4 +
 backend/onyx/indexing/models.py               |  5 +
 backend/onyx/seeding/load_docs.py             |  3 +-
 .../query_time_check/seed_dummy_docs.py       |  2 +
 .../app/chat/documentSidebar/ChatFilters.tsx  |  1 -
 13 files changed, 181 insertions(+), 51 deletions(-)

diff --git a/backend/onyx/connectors/web/connector.py b/backend/onyx/connectors/web/connector.py
index f15632b1037..2c3ea064d86 100644
--- a/backend/onyx/connectors/web/connector.py
+++ b/backend/onyx/connectors/web/connector.py
@@ -359,7 +359,6 @@ def load_from_state(self) -> GenerateDocumentsOutput:
                     continue
 
                 parsed_html = web_html_cleanup(soup, self.mintlify_cleanup)
-
                 doc_batch.append(
                     Document(
                         id=current_url,
diff --git a/backend/onyx/db/models.py b/backend/onyx/db/models.py
index 47170f93b22..730a69de284 100644
--- a/backend/onyx/db/models.py
+++ b/backend/onyx/db/models.py
@@ -504,6 +504,7 @@ class Document(Base):
     last_synced: Mapped[datetime.datetime | None] = mapped_column(
         DateTime(timezone=True), nullable=True, index=True
     )
+
     # The following are not attached to User because the account/email may not be known
     # within Onyx
     # Something like the document creator
diff --git a/backend/onyx/document_index/interfaces.py b/backend/onyx/document_index/interfaces.py
index 1f6386b09ea..3d27415a069 100644
--- a/backend/onyx/document_index/interfaces.py
+++ b/backend/onyx/document_index/interfaces.py
@@ -148,7 +148,6 @@ class Indexable(abc.ABC):
     def index(
         self,
         chunks: list[DocMetadataAwareIndexChunk],
-        fresh_index: bool = False,
     ) -> set[DocumentInsertionRecord]:
         """
         Takes a list of document chunks and indexes them in the document index
@@ -166,15 +165,9 @@ def index(
         only needs to index chunks into the PRIMARY index. Do not update the secondary
         index here, it is done automatically outside of this code.
 
-        NOTE: The fresh_index parameter, when set to True, assumes no documents have been previously
-        indexed for the given index/tenant. This can be used to optimize the indexing process for
-        new or empty indices.
-
         Parameters:
         - chunks: Document chunks with all of the information needed for indexing to the document
                 index.
-        - fresh_index: Boolean indicating whether this is a fresh index with no existing documents.
-
         Returns:
             List of document ids which map to unique documents and are used for deduping chunks
             when updating, as well as if the document is newly indexed or already existed and
diff --git a/backend/onyx/document_index/vespa/app_config/schemas/danswer_chunk.sd b/backend/onyx/document_index/vespa/app_config/schemas/danswer_chunk.sd
index 2fd861b779e..4f2aaa9a7d4 100644
--- a/backend/onyx/document_index/vespa/app_config/schemas/danswer_chunk.sd
+++ b/backend/onyx/document_index/vespa/app_config/schemas/danswer_chunk.sd
@@ -10,6 +10,9 @@ schema DANSWER_CHUNK_NAME {
     field chunk_id type int {
         indexing: summary | attribute
     }
+    field current_index_time type int {
+        indexing: summary | attribute
+    }
     # Displayed in the UI as the main identifier for the doc
     field semantic_identifier type string {
         indexing: summary | attribute
diff --git a/backend/onyx/document_index/vespa/index.py b/backend/onyx/document_index/vespa/index.py
index 1b7478f8cd3..abdd37d1baf 100644
--- a/backend/onyx/document_index/vespa/index.py
+++ b/backend/onyx/document_index/vespa/index.py
@@ -42,12 +42,15 @@
 from onyx.document_index.vespa.indexing_utils import batch_index_vespa_chunks
 from onyx.document_index.vespa.indexing_utils import clean_chunk_id_copy
 from onyx.document_index.vespa.indexing_utils import (
-    get_existing_documents_from_chunks,
+    find_existing_docs_in_vespa_by_doc_id,
 )
 from onyx.document_index.vespa.shared_utils.utils import get_vespa_http_client
 from onyx.document_index.vespa.shared_utils.utils import (
     replace_invalid_doc_id_characters,
 )
+from onyx.document_index.vespa.shared_utils.vespa_request_builders import (
+    build_deletion_selection_query,
+)
 from onyx.document_index.vespa.shared_utils.vespa_request_builders import (
     build_vespa_filters,
 )
@@ -307,47 +310,35 @@ def register_multitenant_indices(
     def index(
         self,
         chunks: list[DocMetadataAwareIndexChunk],
-        fresh_index: bool = False,
     ) -> set[DocumentInsertionRecord]:
-        """Receive a list of chunks from a batch of documents and index the chunks into Vespa along
-        with updating the associated permissions. Assumes that a document will not be split into
-        multiple chunk batches calling this function multiple times, otherwise only the last set of
-        chunks will be kept"""
-        # IMPORTANT: This must be done one index at a time, do not use secondary index here
-        cleaned_chunks = [clean_chunk_id_copy(chunk) for chunk in chunks]
-
-        existing_docs: set[str] = set()
+        """
+        Index a list of chunks into Vespa. We rely on 'current_index_time'
+        to keep track of when each chunk was added/updated in the index: after the
+        new chunks are fed, any chunks of a pre-existing document whose
+        'current_index_time' is older than this run's timestamp are deleted.
+        """
 
-        # NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for
-        # indexing / updates / deletes since we have to make a large volume of requests.
-        with (
-            concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
-            get_vespa_http_client() as http_client,
-        ):
-            if not fresh_index:
-                # Check for existing documents, existing documents need to have all of their chunks deleted
-                # prior to indexing as the document size (num chunks) may have shrunk
-                first_chunks = [
-                    chunk for chunk in cleaned_chunks if chunk.chunk_id == 0
-                ]
-                for chunk_batch in batch_generator(first_chunks, BATCH_SIZE):
-                    existing_docs.update(
-                        get_existing_documents_from_chunks(
-                            chunks=chunk_batch,
-                            index_name=self.index_name,
-                            http_client=http_client,
-                            executor=executor,
-                        )
-                    )
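+        # Overall flow:
+        #   a) find which of these documents already have chunks in Vespa
+        #   b) feed every chunk, stamped with this run's 'current_index_time'
+        #   c) for documents that already existed, delete leftover chunks whose
+        #      'current_index_time' predates this run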
+        # Clean chunks if needed (remove invalid chars, etc.)
+        cleaned_chunks = [clean_chunk_id_copy(chunk) for chunk in chunks]
 
-            for doc_id_batch in batch_generator(existing_docs, BATCH_SIZE):
-                delete_vespa_docs(
-                    document_ids=doc_id_batch,
-                    index_name=self.index_name,
-                    http_client=http_client,
-                    executor=executor,
-                )
+        # Map each document id to this run's 'current_index_time' stamp
+        # (all chunks of a document share the same timestamp)
+        doc_ids_to_current_index_time = {
+            chunk.source_document.id: chunk.current_index_time
+            for chunk in cleaned_chunks
+        }
+        # Will hold the doc ids that already existed in Vespa before this run
+        existing_doc_ids = set()
+
+        with get_vespa_http_client() as http_client, concurrent.futures.ThreadPoolExecutor(
+            max_workers=NUM_THREADS
+        ) as executor:
+            # a) Find which docs already exist in Vespa
+            existing_doc_ids = find_existing_docs_in_vespa_by_doc_id(
+                doc_ids=list(doc_ids_to_current_index_time.keys()),
+                index_name=self.index_name,
+                http_client=http_client,
+                executor=executor,
+            )
+            # b) Feed new/updated chunks in batches
             for chunk_batch in batch_generator(cleaned_chunks, BATCH_SIZE):
                 batch_index_vespa_chunks(
                     chunks=chunk_batch,
@@ -357,14 +348,34 @@ def index(
                     executor=executor,
                 )
 
-        all_doc_ids = {chunk.source_document.id for chunk in cleaned_chunks}
+            # c) Delete stale chunks: any chunk of a pre-existing document whose
+            #    'current_index_time' predates this run
+            for doc_id in existing_doc_ids:
+                version_cutoff = int(doc_ids_to_current_index_time[doc_id].timestamp())
+                query_str = build_deletion_selection_query(
+                    doc_id=doc_id,
+                    version_cutoff=version_cutoff,
+                    doc_type=self.index_name,
+                )
+                delete_url = (
+                    f"{DOCUMENT_ID_ENDPOINT.format(index_name=self.index_name)}/"
+                    f"?{query_str}&cluster={DOCUMENT_INDEX_NAME}"
+                )
+                try:
+                    resp = http_client.delete(delete_url)
+                    resp.raise_for_status()
+                except httpx.HTTPStatusError:
+                    logger.exception(
+                        f"Selection-based delete failed for doc_id='{doc_id}'"
+                    )
+                    raise
 
+        # Produce insertion records specifying which documents existed prior to this run
         return {
             DocumentInsertionRecord(
                 document_id=doc_id,
-                already_existed=doc_id in existing_docs,
+                already_existed=(doc_id in existing_doc_ids),
             )
-            for doc_id in all_doc_ids
+            for doc_id in doc_ids_to_current_index_time
         }
 
     @staticmethod
diff --git a/backend/onyx/document_index/vespa/indexing_utils.py b/backend/onyx/document_index/vespa/indexing_utils.py
index bfb0bd94163..1d52c2e67e7 100644
--- a/backend/onyx/document_index/vespa/indexing_utils.py
+++ b/backend/onyx/document_index/vespa/indexing_utils.py
@@ -1,8 +1,11 @@
 import concurrent.futures
 import json
+import urllib.parse
 from datetime import datetime
 from datetime import timezone
 from http import HTTPStatus
+from typing import List
+from typing import Set
 
 import httpx
 from retry import retry
@@ -21,6 +24,7 @@
 from onyx.document_index.vespa_constants import CHUNK_ID
 from onyx.document_index.vespa_constants import CONTENT
 from onyx.document_index.vespa_constants import CONTENT_SUMMARY
+from onyx.document_index.vespa_constants import CURRENT_INDEX_TIME
 from onyx.document_index.vespa_constants import DOC_UPDATED_AT
 from onyx.document_index.vespa_constants import DOCUMENT_ID
 from onyx.document_index.vespa_constants import DOCUMENT_ID_ENDPOINT
@@ -32,6 +36,7 @@
 from onyx.document_index.vespa_constants import METADATA_SUFFIX
 from onyx.document_index.vespa_constants import NUM_THREADS
 from onyx.document_index.vespa_constants import PRIMARY_OWNERS
+from onyx.document_index.vespa_constants import SEARCH_ENDPOINT
 from onyx.document_index.vespa_constants import SECONDARY_OWNERS
 from onyx.document_index.vespa_constants import SECTION_CONTINUATION
 from onyx.document_index.vespa_constants import SEMANTIC_IDENTIFIER
@@ -168,6 +173,7 @@ def _index_vespa_chunk(
         METADATA_SUFFIX: chunk.metadata_suffix_keyword,
         EMBEDDINGS: embeddings_name_vector_map,
         TITLE_EMBEDDING: chunk.title_embedding,
+        CURRENT_INDEX_TIME: _vespa_get_updated_at_attribute(chunk.current_index_time),
         DOC_UPDATED_AT: _vespa_get_updated_at_attribute(document.doc_updated_at),
         PRIMARY_OWNERS: get_experts_stores_representations(document.primary_owners),
         SECONDARY_OWNERS: get_experts_stores_representations(document.secondary_owners),
@@ -248,3 +254,85 @@ def clean_chunk_id_copy(
         }
     )
     return clean_chunk
+
+
+def _does_doc_exist_in_vespa(
+    doc_id: str,
+    index_name: str,
+    http_client: httpx.Client,
+) -> bool:
+    """
+    Checks whether there's a chunk/doc matching doc_id in Vespa using YQL.
+    """
+    encoded_doc_id = urllib.parse.quote(doc_id)
+
+    # Construct the URL with YQL query
+    url = (
+        f"{SEARCH_ENDPOINT}"
+        f'?yql=select+*+from+sources+{index_name}+where+document_id+contains+"{encoded_doc_id}"'
+        "&hits=0"
+    )
+
+    logger.debug(f"Checking existence for doc_id={doc_id} with URL={url}")
+    resp = http_client.get(url)
+
+    if resp.status_code == 200:
+        data = resp.json()
+        try:
+            total_count = data["root"]["fields"]["totalCount"]
+            return total_count > 0
+        except (KeyError, TypeError):
+            logger.exception(f"Unexpected JSON structure from {url}: {data}")
+            raise
+
+    elif resp.status_code == 404:
+        return False
+
+    else:
+        logger.warning(
+            f"Unexpected HTTP {resp.status_code} checking doc existence for doc_id={doc_id}"
+        )
+        return False
+
+
+def find_existing_docs_in_vespa_by_doc_id(
+    doc_ids: List[str],
+    index_name: str,
+    http_client: httpx.Client,
+    executor: concurrent.futures.ThreadPoolExecutor | None = None,
+) -> Set[str]:
+    """
+    Returns the subset of doc_ids that already have chunks indexed in Vespa.
+    The existence checks run concurrently for performance when doc_ids is large.
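+
+    Example (hypothetical ids): if only "doc-1" already has chunks indexed,
+    find_existing_docs_in_vespa_by_doc_id(["doc-1", "doc-2"], index_name, http_client)
+    returns {"doc-1"}.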
+ """ + if not doc_ids: + return set() + + external_executor = True + if executor is None: + # Create our own if not given + external_executor = False + executor = concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) + + existing_doc_ids = set() + + try: + future_map = { + executor.submit( + _does_doc_exist_in_vespa, doc_id, index_name, http_client + ): doc_id + for doc_id in doc_ids + } + for future in concurrent.futures.as_completed(future_map): + doc_id = future_map[future] + try: + if future.result(): + existing_doc_ids.add(doc_id) + except Exception: + logger.exception(f"Error checking doc existence for doc_id={doc_id}") + raise + + finally: + if not external_executor: + executor.shutdown(wait=True) + return existing_doc_ids diff --git a/backend/onyx/document_index/vespa/shared_utils/vespa_request_builders.py b/backend/onyx/document_index/vespa/shared_utils/vespa_request_builders.py index dda75c85337..85ce5e48e3b 100644 --- a/backend/onyx/document_index/vespa/shared_utils/vespa_request_builders.py +++ b/backend/onyx/document_index/vespa/shared_utils/vespa_request_builders.py @@ -1,12 +1,14 @@ from datetime import datetime from datetime import timedelta from datetime import timezone +from urllib.parse import urlencode from onyx.configs.constants import INDEX_SEPARATOR from onyx.context.search.models import IndexFilters from onyx.document_index.interfaces import VespaChunkRequest from onyx.document_index.vespa_constants import ACCESS_CONTROL_LIST from onyx.document_index.vespa_constants import CHUNK_ID +from onyx.document_index.vespa_constants import CURRENT_INDEX_TIME from onyx.document_index.vespa_constants import DOC_UPDATED_AT from onyx.document_index.vespa_constants import DOCUMENT_ID from onyx.document_index.vespa_constants import DOCUMENT_SETS @@ -106,3 +108,24 @@ def build_vespa_id_based_retrieval_yql( id_based_retrieval_yql_section += ")" return id_based_retrieval_yql_section + + +def build_deletion_selection_query( + doc_id: str, version_cutoff: int, doc_type: str +) -> str: + """ + Build a Vespa selection expression that includes: + - {doc_type}.document_id == + - {doc_type}.current_index_time < version_cutoff + + Returns the URL-encoded selection query parameter. 
+ """ + # Escape single quotes by doubling them for Vespa selection expressions + escaped_doc_id = doc_id.replace("'", "''") + + filter_str = ( + f"({doc_type}.document_id=='{escaped_doc_id}') and " + f"({doc_type}.{CURRENT_INDEX_TIME} < {version_cutoff})" + ) + + return urlencode({"selection": filter_str}) diff --git a/backend/onyx/document_index/vespa_constants.py b/backend/onyx/document_index/vespa_constants.py index aff4e85566d..3bf5ddb9e16 100644 --- a/backend/onyx/document_index/vespa_constants.py +++ b/backend/onyx/document_index/vespa_constants.py @@ -52,6 +52,7 @@ TENANT_ID = "tenant_id" DOCUMENT_ID = "document_id" +CURRENT_INDEX_TIME = "current_index_time" CHUNK_ID = "chunk_id" BLURB = "blurb" CONTENT = "content" diff --git a/backend/onyx/indexing/indexing_pipeline.py b/backend/onyx/indexing/indexing_pipeline.py index f9ed3eb7b8e..bf15a71a6bf 100644 --- a/backend/onyx/indexing/indexing_pipeline.py +++ b/backend/onyx/indexing/indexing_pipeline.py @@ -1,5 +1,7 @@ import traceback from collections.abc import Callable +from datetime import datetime +from datetime import timezone from functools import partial from http import HTTPStatus from typing import Protocol @@ -400,6 +402,8 @@ def index_doc_batch( else DEFAULT_BOOST ), tenant_id=tenant_id, + # Use a timezone-aware datetime, here we set to current UTC time + current_index_time=datetime.now(tz=timezone.utc), ) for chunk in chunks_with_embeddings ] diff --git a/backend/onyx/indexing/models.py b/backend/onyx/indexing/models.py index e9a155d172b..b09d5570a45 100644 --- a/backend/onyx/indexing/models.py +++ b/backend/onyx/indexing/models.py @@ -1,3 +1,4 @@ +import datetime from typing import TYPE_CHECKING from pydantic import BaseModel @@ -73,12 +74,14 @@ class DocMetadataAwareIndexChunk(IndexChunk): of. This is used for filtering / personas. boost: influences the ranking of this chunk at query time. Positive -> ranked higher, negative -> ranked lower. + current_index_time: the timestamp of when this chunk is being indexed. 
""" tenant_id: str | None = None access: "DocumentAccess" document_sets: set[str] boost: int + current_index_time: datetime.datetime @classmethod def from_index_chunk( @@ -88,6 +91,7 @@ def from_index_chunk( document_sets: set[str], boost: int, tenant_id: str | None, + current_index_time: datetime.datetime, ) -> "DocMetadataAwareIndexChunk": index_chunk_data = index_chunk.model_dump() return cls( @@ -96,6 +100,7 @@ def from_index_chunk( document_sets=document_sets, boost=boost, tenant_id=tenant_id, + current_index_time=current_index_time, ) diff --git a/backend/onyx/seeding/load_docs.py b/backend/onyx/seeding/load_docs.py index b629b6ac3bc..ed818ce6991 100644 --- a/backend/onyx/seeding/load_docs.py +++ b/backend/onyx/seeding/load_docs.py @@ -86,6 +86,7 @@ def _create_indexable_chunks( access=default_public_access, document_sets=set(), boost=DEFAULT_BOOST, + current_index_time=datetime.datetime.now(datetime.timezone.utc), ) chunks.append(chunk) @@ -217,7 +218,7 @@ def seed_initial_documents( # as we just sent over the Vespa schema and there is a slight delay index_with_retries = retry_builder(tries=15)(document_index.index) - index_with_retries(chunks=chunks, fresh_index=cohere_enabled) + index_with_retries(chunks=chunks) # Mock a run for the UI even though it did not actually call out to anything mock_successful_index_attempt( diff --git a/backend/scripts/query_time_check/seed_dummy_docs.py b/backend/scripts/query_time_check/seed_dummy_docs.py index e7a7805690f..79d506c1d5c 100644 --- a/backend/scripts/query_time_check/seed_dummy_docs.py +++ b/backend/scripts/query_time_check/seed_dummy_docs.py @@ -10,6 +10,7 @@ """ import random from datetime import datetime +from datetime import timezone from onyx.access.models import DocumentAccess from onyx.configs.constants import DocumentSource @@ -96,6 +97,7 @@ def generate_dummy_chunk( document_sets={document_set for document_set in document_set_names}, boost=random.randint(-1, 1), tenant_id=POSTGRES_DEFAULT_SCHEMA, + current_index_time=datetime.now(tz=timezone.utc), ) diff --git a/web/src/app/chat/documentSidebar/ChatFilters.tsx b/web/src/app/chat/documentSidebar/ChatFilters.tsx index f91529f918f..98fb9b36c37 100644 --- a/web/src/app/chat/documentSidebar/ChatFilters.tsx +++ b/web/src/app/chat/documentSidebar/ChatFilters.tsx @@ -81,7 +81,6 @@ export const ChatFilters = forwardRef( const dedupedDocuments = removeDuplicateDocs(currentDocuments || []); const tokenLimitReached = selectedDocumentTokens > maxTokens - 75; - console.log("SELECTED MESSAGE is", selectedMessage); const hasSelectedDocuments = selectedDocumentIds.length > 0; From 51bed8bcfa24050975cf4a6f9650f6d41b143739 Mon Sep 17 00:00:00 2001 From: pablodanswer Date: Wed, 1 Jan 2025 17:30:29 -0500 Subject: [PATCH 2/2] logs --- backend/onyx/document_index/vespa/index.py | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/onyx/document_index/vespa/index.py b/backend/onyx/document_index/vespa/index.py index abdd37d1baf..4e3d99ba2f1 100644 --- a/backend/onyx/document_index/vespa/index.py +++ b/backend/onyx/document_index/vespa/index.py @@ -338,6 +338,7 @@ def index( executor=executor, ) + print("LIST OF EXISTING DOCS", existing_doc_ids) # b) Feed new/updated chunks in batches for chunk_batch in batch_generator(cleaned_chunks, BATCH_SIZE): batch_index_vespa_chunks(