More efficient Vespa Indexing #3575

Open · wants to merge 2 commits into main
1 change: 0 additions & 1 deletion backend/onyx/connectors/web/connector.py
@@ -359,7 +359,6 @@ def load_from_state(self) -> GenerateDocumentsOutput:
continue

parsed_html = web_html_cleanup(soup, self.mintlify_cleanup)

doc_batch.append(
Document(
id=current_url,
1 change: 1 addition & 0 deletions backend/onyx/db/models.py
@@ -504,6 +504,7 @@ class Document(Base):
last_synced: Mapped[datetime.datetime | None] = mapped_column(
DateTime(timezone=True), nullable=True, index=True
)

# The following are not attached to User because the account/email may not be known
# within Onyx
# Something like the document creator
7 changes: 0 additions & 7 deletions backend/onyx/document_index/interfaces.py
@@ -148,7 +148,6 @@ class Indexable(abc.ABC):
def index(
self,
chunks: list[DocMetadataAwareIndexChunk],
fresh_index: bool = False,
) -> set[DocumentInsertionRecord]:
"""
Takes a list of document chunks and indexes them in the document index
@@ -166,15 +165,9 @@ def index(
only needs to index chunks into the PRIMARY index. Do not update the secondary index here,
it is done automatically outside of this code.

NOTE: The fresh_index parameter, when set to True, assumes no documents have been previously
indexed for the given index/tenant. This can be used to optimize the indexing process for
new or empty indices.

Parameters:
- chunks: Document chunks with all of the information needed for indexing to the document
index.
- fresh_index: Boolean indicating whether this is a fresh index with no existing documents.

Returns:
List of document ids which map to unique documents and are used for deduping chunks
when updating, as well as if the document is newly indexed or already existed and
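With fresh_index gone, callers simply pass chunks and inspect the returned records. A minimal sketch of a caller, assuming a DocumentIndex implementation and a prepared chunk list (both hypothetical here, not part of this diff):

# Minimal sketch, assuming `document_index` implements the updated
# Indexable.index() and `chunks` is a list of DocMetadataAwareIndexChunk.
insertion_records = document_index.index(chunks=chunks)

newly_indexed = {r.document_id for r in insertion_records if not r.already_existed}
updated = {r.document_id for r in insertion_records if r.already_existed}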
@@ -10,6 +10,9 @@ schema DANSWER_CHUNK_NAME {
field chunk_id type int {
indexing: summary | attribute
}
field current_index_time type int {
indexing: summary | attribute
}
# Displayed in the UI as the main identifier for the doc
field semantic_identifier type string {
indexing: summary | attribute
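Since the new field is indexed as an attribute, it can be filtered on at query and selection time. A hypothetical sketch of a YQL query built in Python that uses the field (index name and cutoff value are illustrative assumptions):

import time

# Hypothetical: find chunks indexed more than an hour ago.
cutoff = int(time.time()) - 3600
yql = (
    "select documentid from sources danswer_chunk "
    f"where current_index_time < {cutoff}"
)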
94 changes: 53 additions & 41 deletions backend/onyx/document_index/vespa/index.py
@@ -42,12 +42,15 @@
from onyx.document_index.vespa.indexing_utils import batch_index_vespa_chunks
from onyx.document_index.vespa.indexing_utils import clean_chunk_id_copy
from onyx.document_index.vespa.indexing_utils import (
get_existing_documents_from_chunks,
find_existing_docs_in_vespa_by_doc_id,
)
from onyx.document_index.vespa.shared_utils.utils import get_vespa_http_client
from onyx.document_index.vespa.shared_utils.utils import (
replace_invalid_doc_id_characters,
)
from onyx.document_index.vespa.shared_utils.vespa_request_builders import (
build_deletion_selection_query,
)
from onyx.document_index.vespa.shared_utils.vespa_request_builders import (
build_vespa_filters,
)
@@ -307,47 +310,36 @@ def register_multitenant_indices(
def index(
self,
chunks: list[DocMetadataAwareIndexChunk],
fresh_index: bool = False,
) -> set[DocumentInsertionRecord]:
"""Receive a list of chunks from a batch of documents and index the chunks into Vespa along
with updating the associated permissions. Assumes that a document will not be split into
multiple chunk batches calling this function multiple times, otherwise only the last set of
chunks will be kept"""
# IMPORTANT: This must be done one index at a time, do not use secondary index here
cleaned_chunks = [clean_chunk_id_copy(chunk) for chunk in chunks]

existing_docs: set[str] = set()
"""
Index a list of chunks into Vespa. We rely on 'current_index_time'
to keep track of when each chunk was added/updated in the index. We also raise a ValueError
if any chunk is missing a 'current_index_time' timestamp.
"""

# NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for
# indexing / updates / deletes since we have to make a large volume of requests.
with (
concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
get_vespa_http_client() as http_client,
):
if not fresh_index:
# Check for existing documents, existing documents need to have all of their chunks deleted
# prior to indexing as the document size (num chunks) may have shrunk
first_chunks = [
chunk for chunk in cleaned_chunks if chunk.chunk_id == 0
]
for chunk_batch in batch_generator(first_chunks, BATCH_SIZE):
existing_docs.update(
get_existing_documents_from_chunks(
chunks=chunk_batch,
index_name=self.index_name,
http_client=http_client,
executor=executor,
)
)
# Clean chunks if needed (remove invalid chars, etc.)
cleaned_chunks = [clean_chunk_id_copy(chunk) for chunk in chunks]

for doc_id_batch in batch_generator(existing_docs, BATCH_SIZE):
delete_vespa_docs(
document_ids=doc_id_batch,
index_name=self.index_name,
http_client=http_client,
executor=executor,
)
# Map each doc_id to the current_index_time of its new chunks,
# which later serves as the version cutoff for stale-chunk deletion
doc_ids_to_current_index_time = {
chunk.source_document.id: chunk.current_index_time
for chunk in cleaned_chunks
}
existing_doc_ids = set()

with get_vespa_http_client() as http_client, concurrent.futures.ThreadPoolExecutor(
max_workers=NUM_THREADS
) as executor:
# a) Find which docs already exist in Vespa
existing_doc_ids = find_existing_docs_in_vespa_by_doc_id(
doc_ids=list(doc_ids_to_current_index_time.keys()),
index_name=self.index_name,
http_client=http_client,
executor=executor,
)

print("LIST OF EXISTING DOCS", existing_doc_ids)
# b) Feed new/updated chunks in batches
for chunk_batch in batch_generator(cleaned_chunks, BATCH_SIZE):
batch_index_vespa_chunks(
chunks=chunk_batch,
@@ -357,14 +349,34 @@ def index(
executor=executor,
)

all_doc_ids = {chunk.source_document.id for chunk in cleaned_chunks}
# c) Remove stale chunks using the 'current_index_time' versioning scheme
for doc_id in existing_doc_ids:
version_cutoff = int(doc_ids_to_current_index_time[doc_id].timestamp())
query_str = build_deletion_selection_query(
doc_id=doc_id,
version_cutoff=version_cutoff,
doc_type=self.index_name,
)
delete_url = (
f"{DOCUMENT_ID_ENDPOINT.format(index_name=self.index_name)}/"
f"?{query_str}&cluster={DOCUMENT_INDEX_NAME}"
)
try:
resp = http_client.delete(delete_url)
resp.raise_for_status()
except httpx.HTTPStatusError:
logger.exception(
f"Selection-based delete failed for doc_id='{doc_id}'"
)
raise

# Produce insertion records specifying which documents existed prior
return {
DocumentInsertionRecord(
document_id=doc_id,
already_existed=doc_id in existing_docs,
already_existed=(doc_id in existing_doc_ids),
)
for doc_id in all_doc_ids
for doc_id in doc_ids_to_current_index_time
}

@staticmethod
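To make step (c) concrete, here is a rough sketch of the delete request it issues for a single document; the endpoint, index name, and values below are illustrative assumptions, not taken from this diff:

from urllib.parse import urlencode

# Illustrative values; the real code uses DOCUMENT_ID_ENDPOINT and
# DOCUMENT_INDEX_NAME constants rather than these literals.
doc_id = "web_https://example.com/page"
version_cutoff = 1735689600  # int(current_index_time.timestamp())

selection = (
    f"(danswer_chunk.document_id=='{doc_id}') and "
    f"(danswer_chunk.current_index_time < {version_cutoff})"
)
delete_url = (
    "http://localhost:8080/document/v1/default/danswer_chunk/docid/"
    f"?{urlencode({'selection': selection})}&cluster=danswer_index"
)
# http_client.delete(delete_url) then removes every chunk of this document
# whose current_index_time predates the current indexing run.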
88 changes: 88 additions & 0 deletions backend/onyx/document_index/vespa/indexing_utils.py
@@ -1,8 +1,11 @@
import concurrent.futures
import json
import urllib.parse
from datetime import datetime
from datetime import timezone
from http import HTTPStatus
from typing import List
from typing import Set

import httpx
from retry import retry
@@ -21,6 +24,7 @@
from onyx.document_index.vespa_constants import CHUNK_ID
from onyx.document_index.vespa_constants import CONTENT
from onyx.document_index.vespa_constants import CONTENT_SUMMARY
from onyx.document_index.vespa_constants import CURRENT_INDEX_TIME
from onyx.document_index.vespa_constants import DOC_UPDATED_AT
from onyx.document_index.vespa_constants import DOCUMENT_ID
from onyx.document_index.vespa_constants import DOCUMENT_ID_ENDPOINT
@@ -32,6 +36,7 @@
from onyx.document_index.vespa_constants import METADATA_SUFFIX
from onyx.document_index.vespa_constants import NUM_THREADS
from onyx.document_index.vespa_constants import PRIMARY_OWNERS
from onyx.document_index.vespa_constants import SEARCH_ENDPOINT
from onyx.document_index.vespa_constants import SECONDARY_OWNERS
from onyx.document_index.vespa_constants import SECTION_CONTINUATION
from onyx.document_index.vespa_constants import SEMANTIC_IDENTIFIER
@@ -168,6 +173,7 @@ def _index_vespa_chunk(
METADATA_SUFFIX: chunk.metadata_suffix_keyword,
EMBEDDINGS: embeddings_name_vector_map,
TITLE_EMBEDDING: chunk.title_embedding,
CURRENT_INDEX_TIME: _vespa_get_updated_at_attribute(chunk.current_index_time),
DOC_UPDATED_AT: _vespa_get_updated_at_attribute(document.doc_updated_at),
PRIMARY_OWNERS: get_experts_stores_representations(document.primary_owners),
SECONDARY_OWNERS: get_experts_stores_representations(document.secondary_owners),
@@ -248,3 +254,85 @@ def clean_chunk_id_copy(
}
)
return clean_chunk


def _does_doc_exist_in_vespa(
doc_id: str,
index_name: str,
http_client: httpx.Client,
) -> bool:
"""
Checks whether there's a chunk/doc matching doc_id in Vespa using YQL.
"""
encoded_doc_id = urllib.parse.quote(doc_id)

# Construct the URL with YQL query
url = (
f"{SEARCH_ENDPOINT}"
f'?yql=select+*+from+sources+{index_name}+where+document_id+contains+"{encoded_doc_id}"'
"&hits=0"
)

logger.debug(f"Checking existence for doc_id={doc_id} with URL={url}")
resp = http_client.get(url)

if resp.status_code == 200:
data = resp.json()
try:
total_count = data["root"]["fields"]["totalCount"]
return total_count > 0
except (KeyError, TypeError):
logger.exception(f"Unexpected JSON structure from {url}: {data}")
raise

elif resp.status_code == 404:
return False

else:
logger.warning(
f"Unexpected HTTP {resp.status_code} checking doc existence for doc_id={doc_id}"
)
return False


def find_existing_docs_in_vespa_by_doc_id(
doc_ids: List[str],
index_name: str,
http_client: httpx.Client,
executor: concurrent.futures.ThreadPoolExecutor | None = None,
) -> Set[str]:
"""
Returns the subset of doc_ids that already exist in Vespa.
Existence checks run concurrently for performance when doc_ids is large.
"""
if not doc_ids:
return set()

external_executor = True
if executor is None:
# Create our own if not given
external_executor = False
executor = concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS)

existing_doc_ids = set()

try:
future_map = {
executor.submit(
_does_doc_exist_in_vespa, doc_id, index_name, http_client
): doc_id
for doc_id in doc_ids
}
for future in concurrent.futures.as_completed(future_map):
doc_id = future_map[future]
try:
if future.result():
existing_doc_ids.add(doc_id)
except Exception:
logger.exception(f"Error checking doc existence for doc_id={doc_id}")
raise

finally:
if not external_executor:
executor.shutdown(wait=True)
return existing_doc_ids
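A short usage sketch for the helper above; the client construction, index name, and ids are illustrative assumptions:

import httpx

# Illustrative usage; HTTP/2 mirrors the note in index() about the large
# volume of requests made against Vespa.
with httpx.Client(http2=True) as http_client:
    existing = find_existing_docs_in_vespa_by_doc_id(
        doc_ids=["doc-a", "doc-b", "doc-c"],
        index_name="danswer_chunk",
        http_client=http_client,
    )
# `existing` is the subset of the given ids already present in Vespa.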
backend/onyx/document_index/vespa/shared_utils/vespa_request_builders.py
@@ -1,12 +1,14 @@
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from urllib.parse import urlencode

from onyx.configs.constants import INDEX_SEPARATOR
from onyx.context.search.models import IndexFilters
from onyx.document_index.interfaces import VespaChunkRequest
from onyx.document_index.vespa_constants import ACCESS_CONTROL_LIST
from onyx.document_index.vespa_constants import CHUNK_ID
from onyx.document_index.vespa_constants import CURRENT_INDEX_TIME
from onyx.document_index.vespa_constants import DOC_UPDATED_AT
from onyx.document_index.vespa_constants import DOCUMENT_ID
from onyx.document_index.vespa_constants import DOCUMENT_SETS
@@ -106,3 +108,24 @@ def build_vespa_id_based_retrieval_yql(

id_based_retrieval_yql_section += ")"
return id_based_retrieval_yql_section


def build_deletion_selection_query(
doc_id: str, version_cutoff: int, doc_type: str
) -> str:
"""
Build a Vespa selection expression that includes:
- {doc_type}.document_id == <doc_id>
- {doc_type}.current_index_time < version_cutoff

Returns the URL-encoded selection query parameter.
"""
# Escape single quotes by doubling them for Vespa selection expressions
escaped_doc_id = doc_id.replace("'", "''")

filter_str = (
f"({doc_type}.document_id=='{escaped_doc_id}') and "
f"({doc_type}.{CURRENT_INDEX_TIME} < {version_cutoff})"
)

return urlencode({"selection": filter_str})
1 change: 1 addition & 0 deletions backend/onyx/document_index/vespa_constants.py
@@ -52,6 +52,7 @@

TENANT_ID = "tenant_id"
DOCUMENT_ID = "document_id"
CURRENT_INDEX_TIME = "current_index_time"
CHUNK_ID = "chunk_id"
BLURB = "blurb"
CONTENT = "content"
4 changes: 4 additions & 0 deletions backend/onyx/indexing/indexing_pipeline.py
@@ -1,5 +1,7 @@
import traceback
from collections.abc import Callable
from datetime import datetime
from datetime import timezone
from functools import partial
from http import HTTPStatus
from typing import Protocol
@@ -400,6 +402,8 @@ def index_doc_batch(
else DEFAULT_BOOST
),
tenant_id=tenant_id,
# Use a timezone-aware datetime; here we use the current UTC time
current_index_time=datetime.now(tz=timezone.utc),
)
for chunk in chunks_with_embeddings
]
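The timestamp assigned here is the value that later becomes the integer version cutoff in the Vespa index step; a quick sketch of that round trip:

from datetime import datetime, timezone

current_index_time = datetime.now(tz=timezone.utc)
# In VespaIndex.index(), the same value is compared as an int:
version_cutoff = int(current_index_time.timestamp())
# Chunks stored with a smaller current_index_time are treated as stale
# and removed by the selection-based delete.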