
Commit

update for clarity + best practices
pablonyx committed Jan 8, 2025
1 parent 9875d3a commit e2066d2
Showing 5 changed files with 22 additions and 25 deletions.
2 changes: 0 additions & 2 deletions backend/onyx/background/celery/tasks/shared/tasks.py
@@ -80,7 +80,6 @@ def document_by_cc_pair_cleanup_task(
# delete it from vespa and the db
action = "delete"

-# TODO: fix the large chunks enabled
chunks_affected = retry_index.delete_single(
document_id,
tenant_id=tenant_id,
@@ -115,7 +114,6 @@ def document_by_cc_pair_cleanup_task(
)

# update Vespa. OK if doc doesn't exist. Raises exception otherwise.
-# TODO: fix the large chunks enabled
chunks_affected = retry_index.update_single(
document_id,
tenant_id=tenant_id,
5 changes: 4 additions & 1 deletion backend/onyx/document_index/document_index_utils.py
@@ -37,7 +37,10 @@ def translate_boost_count_to_multiplier(boost: int) -> float:
return 2 / (1 + math.exp(-1 * boost / 3))


-def assemble_document_chunk_info(
+# Assembles a list of Vespa chunk IDs for a document
+# given the required context. This can be used to directly query
+# Vespa's Document API.
+def get_document_chunk_ids(
enriched_document_info_list: list[EnrichedDocumentIndexingInfo],
tenant_id: str | None,
large_chunks_enabled: bool,
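
The new name and the added comment advertise that these IDs can be fed straight to Vespa's Document API. A minimal usage sketch, not taken from this commit, assuming a local Vespa endpoint and that the index name doubles as both namespace and document type:

import httpx

from onyx.document_index.document_index_utils import get_document_chunk_ids
from onyx.document_index.interfaces import EnrichedDocumentIndexingInfo

# Assumed base URL; a real deployment would take this from configuration.
VESPA_DOC_API = "http://localhost:8080/document/v1"


def fetch_chunks_via_document_api(
    doc_info: EnrichedDocumentIndexingInfo,
    index_name: str,
    tenant_id: str | None,
    large_chunks_enabled: bool,
) -> list[dict]:
    # Expand the document into every chunk ID it may occupy in the index.
    chunk_ids = get_document_chunk_ids(
        enriched_document_info_list=[doc_info],
        tenant_id=tenant_id,
        large_chunks_enabled=large_chunks_enabled,
    )
    chunks: list[dict] = []
    with httpx.Client(http2=False) as client:
        for chunk_id in chunk_ids:
            # Vespa Document API path: /document/v1/<namespace>/<doc-type>/docid/<id>
            resp = client.get(f"{VESPA_DOC_API}/{index_name}/{index_name}/docid/{chunk_id}")
            if resp.status_code == 200:
                chunks.append(resp.json())
    return chunks
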
32 changes: 15 additions & 17 deletions backend/onyx/document_index/vespa/index.py
@@ -26,7 +26,7 @@
from onyx.context.search.models import IndexFilters
from onyx.context.search.models import InferenceChunkUncleaned
from onyx.db.engine import get_session_with_tenant
-from onyx.document_index.document_index_utils import assemble_document_chunk_info
+from onyx.document_index.document_index_utils import get_document_chunk_ids
from onyx.document_index.interfaces import DocumentIndex
from onyx.document_index.interfaces import DocumentInsertionRecord
from onyx.document_index.interfaces import EnrichedDocumentIndexingInfo
@@ -42,11 +42,11 @@
from onyx.document_index.vespa.chunk_retrieval import query_vespa
from onyx.document_index.vespa.deletion import delete_vespa_chunks
from onyx.document_index.vespa.indexing_utils import batch_index_vespa_chunks
-from onyx.document_index.vespa.indexing_utils import (
-    check_enable_large_chunks_and_multipass,
-)
from onyx.document_index.vespa.indexing_utils import check_for_final_chunk_existence
from onyx.document_index.vespa.indexing_utils import clean_chunk_id_copy
+from onyx.document_index.vespa.indexing_utils import (
+    get_multipass_config,
+)
from onyx.document_index.vespa.shared_utils.utils import get_vespa_http_client
from onyx.document_index.vespa.shared_utils.utils import (
replace_invalid_doc_id_characters,
@@ -355,7 +355,7 @@ def index(

# Now, for each doc, we know exactly where to start and end our deletion
# So let's generate the chunk IDs for each chunk to delete
-chunks_to_delete = assemble_document_chunk_info(
+chunks_to_delete = get_document_chunk_ids(
enriched_document_info_list=enriched_doc_infos,
tenant_id=tenant_id,
large_chunks_enabled=large_chunks_enabled,
@@ -458,14 +458,14 @@ def update(
for update_request in update_requests:
for doc_info in update_request.minimal_document_indexing_info:
for index_name in index_names:
-doc_chunk_info = self.enrich_basic_chunk_info(
+doc_chunk_info = VespaIndex.enrich_basic_chunk_info(
index_name=index_name,
http_client=http_client,
document_id=doc_info.doc_id,
previous_chunk_count=doc_info.chunk_start_index,
new_chunk_count=0,
)
-doc_chunk_ids = assemble_document_chunk_info(
+doc_chunk_ids = get_document_chunk_ids(
enriched_document_info_list=[doc_chunk_info],
tenant_id=tenant_id,
large_chunks_enabled=False,
@@ -578,9 +578,8 @@ def update_single(

with get_vespa_http_client(http2=False) as http_client:
for index_name in index_names:
-large_chunks_enabled = False
with get_session_with_tenant(tenant_id=tenant_id) as db_session:
-multipass_config = check_enable_large_chunks_and_multipass(
+multipass_config = get_multipass_config(
db_session=db_session,
primary_index=index_name == self.index_name,
)
@@ -593,15 +592,14 @@
previous_chunk_count=chunk_count,
new_chunk_count=0,
)
-doc_chunk_ids = assemble_document_chunk_info(
+doc_chunk_ids = get_document_chunk_ids(
enriched_document_info_list=[enriched_doc_infos],
tenant_id=tenant_id,
large_chunks_enabled=large_chunks_enabled,
)
doc_chunk_count += len(doc_chunk_ids)

for doc_chunk_id in doc_chunk_ids:
print("THIS ONE is being updated")
self.update_single_chunk(
doc_chunk_id=doc_chunk_id, index_name=index_name, fields=fields
)
@@ -615,9 +613,6 @@ def delete_single(
tenant_id: str | None,
chunk_count: int | None,
) -> int:
"""Possibly faster overall than the delete method due to using a single
delete call with a selection query."""

total_chunks_deleted = 0

doc_id = replace_invalid_doc_id_characters(doc_id)
@@ -634,9 +629,8 @@
max_workers=NUM_THREADS
) as executor:
for index_name in index_names:
-large_chunks_enabled = False
with get_session_with_tenant(tenant_id=tenant_id) as db_session:
-multipass_config = check_enable_large_chunks_and_multipass(
+multipass_config = get_multipass_config(
db_session=db_session,
primary_index=index_name == self.index_name,
)
@@ -649,7 +643,7 @@
previous_chunk_count=chunk_count,
new_chunk_count=0,
)
-chunks_to_delete = assemble_document_chunk_info(
+chunks_to_delete = get_document_chunk_ids(
enriched_document_info_list=[enriched_doc_infos],
tenant_id=tenant_id,
large_chunks_enabled=large_chunks_enabled,
@@ -762,6 +756,10 @@ def admin_retrieval(

return query_vespa(params)

+# Retrieves chunk information for a document:
+# - Determines the last indexed chunk
+# - Identifies if the document uses the old or new chunk ID system
+# This data is crucial for Vespa document updates without relying on the visit API.
@classmethod
def enrich_basic_chunk_info(
cls,
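
Taken together, the hunks above change the single-document update path: the large-chunk decision now comes from get_multipass_config rather than a hard-coded large_chunks_enabled = False, and enrich_basic_chunk_info is invoked as a classmethod. A condensed sketch of that flow, simplified to a single index with no thread pool or error handling; the enable_large_chunks field name is an assumption:

from onyx.db.engine import get_session_with_tenant
from onyx.document_index.document_index_utils import get_document_chunk_ids
from onyx.document_index.vespa.index import VespaIndex
from onyx.document_index.vespa.indexing_utils import get_multipass_config
from onyx.document_index.vespa.shared_utils.utils import get_vespa_http_client


def update_one_document(
    index: VespaIndex,
    document_id: str,
    fields,  # whatever update_single_chunk expects for the metadata patch
    tenant_id: str | None,
    chunk_count: int | None,
) -> int:
    with get_vespa_http_client(http2=False) as http_client:
        with get_session_with_tenant(tenant_id=tenant_id) as db_session:
            # One config object now decides whether large chunks exist for this index.
            multipass_config = get_multipass_config(
                db_session=db_session, primary_index=True
            )
        # Find the last indexed chunk and whether the doc was indexed under
        # the old or the new chunk ID scheme (no visit API involved).
        enriched_doc_info = VespaIndex.enrich_basic_chunk_info(
            index_name=index.index_name,
            http_client=http_client,
            document_id=document_id,
            previous_chunk_count=chunk_count,
            new_chunk_count=0,
        )
        # Expand that into the concrete Vespa chunk IDs to touch.
        doc_chunk_ids = get_document_chunk_ids(
            enriched_document_info_list=[enriched_doc_info],
            tenant_id=tenant_id,
            large_chunks_enabled=multipass_config.enable_large_chunks,  # assumed field name
        )
        # Patch each chunk through the Document API.
        for doc_chunk_id in doc_chunk_ids:
            index.update_single_chunk(
                doc_chunk_id=doc_chunk_id, index_name=index.index_name, fields=fields
            )
    return len(doc_chunk_ids)
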
2 changes: 1 addition & 1 deletion backend/onyx/document_index/vespa/indexing_utils.py
@@ -297,7 +297,7 @@ def can_use_large_chunks(multipass: bool, search_settings: SearchSettings) -> bool:
)


-def check_enable_large_chunks_and_multipass(
+def get_multipass_config(
db_session: Session, primary_index: bool = True
) -> MultipassConfig:
"""
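
Only the signature of the renamed helper and the can_use_large_chunks predicate above it appear in this diff, not its body. A hedged sketch of what it plausibly computes; the MultipassConfig field names and the search-settings lookups are assumptions:

from dataclasses import dataclass

from sqlalchemy.orm import Session

from onyx.db.search_settings import get_current_search_settings  # assumed location
from onyx.db.search_settings import get_secondary_search_settings  # assumed location
from onyx.document_index.vespa.indexing_utils import can_use_large_chunks


@dataclass
class MultipassConfig:
    # Field names are guesses based on how the config is consumed in index.py.
    multipass_indexing: bool
    enable_large_chunks: bool


def get_multipass_config_sketch(
    db_session: Session, primary_index: bool = True
) -> MultipassConfig:
    # Look up the search settings for the requested index, then answer both
    # "is multipass indexing on?" and "can large chunks be used?" in one place,
    # so call sites no longer juggle the two flags separately.
    search_settings = (
        get_current_search_settings(db_session)
        if primary_index
        else get_secondary_search_settings(db_session)
    )
    if not search_settings:
        return MultipassConfig(multipass_indexing=False, enable_large_chunks=False)
    multipass = search_settings.multipass_indexing  # assumed attribute
    return MultipassConfig(
        multipass_indexing=multipass,
        enable_large_chunks=can_use_large_chunks(multipass, search_settings),
    )
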
6 changes: 2 additions & 4 deletions backend/onyx/indexing/indexing_pipeline.py
@@ -36,7 +36,7 @@
from onyx.document_index.interfaces import DocumentMetadata
from onyx.document_index.interfaces import IndexBatchParams
from onyx.document_index.vespa.indexing_utils import (
-    check_enable_large_chunks_and_multipass,
+    get_multipass_config,
)
from onyx.indexing.chunker import Chunker
from onyx.indexing.embedder import IndexingEmbedder
@@ -491,9 +491,7 @@ def build_indexing_pipeline(
callback: IndexingHeartbeatInterface | None = None,
) -> IndexingPipelineProtocol:
"""Builds a pipeline which takes in a list (batch) of docs and indexes them."""
-multipass_config = check_enable_large_chunks_and_multipass(
-    db_session, primary_index=True
-)
+multipass_config = get_multipass_config(db_session, primary_index=True)

chunker = chunker or Chunker(
tokenizer=embedder.embedding_model.tokenizer,
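
The same call-site pattern applies anywhere the old helper was used: fetch the config once and read the flags from it instead of seeding large_chunks_enabled = False inline. A small, hypothetical caller (the enable_large_chunks field name is assumed, as above):

from onyx.db.engine import get_session_with_tenant
from onyx.document_index.vespa.indexing_utils import get_multipass_config


def large_chunks_enabled_for_primary_index(tenant_id: str | None) -> bool:
    # One call replaces the old pattern of computing large-chunk behaviour
    # from separate settings lookups at each call site.
    with get_session_with_tenant(tenant_id=tenant_id) as db_session:
        multipass_config = get_multipass_config(db_session, primary_index=True)
    return multipass_config.enable_large_chunks  # assumed field name
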
