From a967bfdf9184094997501105e52276224514b061 Mon Sep 17 00:00:00 2001 From: Ian Hoang <51065478+IanHoang@users.noreply.github.com> Date: Thu, 3 Oct 2024 10:29:08 -0500 Subject: [PATCH] Fix --number-of-docs bug in create-workload (#659) Signed-off-by: Ian Hoang --- osbenchmark/utils/opts.py | 9 ++++- osbenchmark/workload_generator/extractors.py | 31 +++++++++++++--- osbenchmark/workload_generator/helpers.py | 35 ++++++++++++------- .../workload_generator/workload_generator.py | 1 + 4 files changed, 58 insertions(+), 18 deletions(-) diff --git a/osbenchmark/utils/opts.py b/osbenchmark/utils/opts.py index e30c067f7..fcc029b60 100644 --- a/osbenchmark/utils/opts.py +++ b/osbenchmark/utils/opts.py @@ -125,7 +125,14 @@ class StoreKeyPairAsDict(argparse.Action): """ def __call__(self, parser, namespace, values, option_string=None): custom_dict = {} - for kv in values: + + if len(values) == 1: + # If values contains spaces, user provided 2+ key value pairs + kv_pairs = values[0].split(" ") + else: + kv_pairs = values + + for kv in kv_pairs: try: k,v = kv.split(":") custom_dict[k] = v diff --git a/osbenchmark/workload_generator/extractors.py b/osbenchmark/workload_generator/extractors.py index a92dc0f7d..c3e2045dd 100644 --- a/osbenchmark/workload_generator/extractors.py +++ b/osbenchmark/workload_generator/extractors.py @@ -12,8 +12,9 @@ import os from abc import ABC, abstractmethod -from opensearchpy import OpenSearchException +import opensearchpy.exceptions +from osbenchmark import exceptions from osbenchmark.utils import console from osbenchmark.workload_generator.config import CustomWorkload @@ -41,8 +42,10 @@ def extract_indices(self, workload_path): try: for index in self.custom_workload.indices: extracted_indices += self.extract(workload_path, index.name) - except OpenSearchException: - self.logger("Failed at extracting index [%s]", index) + except opensearchpy.exceptions.NotFoundError: + raise exceptions.SystemSetupError(f"Index [{index.name}] does not exist.") + except opensearchpy.OpenSearchException: + self.logger.error("Failed at extracting index [%s]", index) failed_indices += index return extracted_indices, failed_indices @@ -138,6 +141,9 @@ def extract_documents(self, index, documents_limit=None): class SequentialCorpusExtractor(CorpusExtractor): + DEFAULT_TEST_MODE_DOC_COUNT = 1000 + DEFAULT_TEST_MODE_SUFFIX = "-1k" + def __init__(self, custom_workload, client): self.custom_workload: CustomWorkload = custom_workload self.client = client @@ -173,6 +179,21 @@ def extract_documents(self, index, documents_limit=None): documents_to_extract = total_documents if not documents_limit else min(total_documents, documents_limit) + if documents_limit: + # Only time when documents-1k.json will be less than 1K documents is + # when the documents_limit is < 1k documents or source index has less than 1k documents + if documents_limit < self.DEFAULT_TEST_MODE_DOC_COUNT: + test_mode_warning_msg = "Due to --number-of-docs set by user, " + \ + f"test-mode docs will be less than the default {self.DEFAULT_TEST_MODE_DOC_COUNT} documents." + console.warn(test_mode_warning_msg) + + # Notify users when they specified more documents than available in index + if documents_limit > total_documents: + documents_to_extract_warning_msg = f"User requested extraction of {documents_limit} documents " + \ + f"but there are only {total_documents} documents in {index}. " + \ + f"Will only extract {total_documents} documents from {index}." + console.warn(documents_to_extract_warning_msg) + if documents_to_extract > 0: logger.info("[%d] total docs in index [%s]. Extracting [%s] docs.", total_documents, index, documents_to_extract) docs_path = self._get_doc_outpath(self.custom_workload.workload_path, index) @@ -180,8 +201,8 @@ def extract_documents(self, index, documents_limit=None): self.dump_documents( self.client, index, - self._get_doc_outpath(self.custom_workload.workload_path, index, "-1k"), - min(documents_to_extract, 1000), + self._get_doc_outpath(self.custom_workload.workload_path, index, self.DEFAULT_TEST_MODE_SUFFIX), + min(documents_to_extract, self.DEFAULT_TEST_MODE_DOC_COUNT), " for test mode") # Create full corpora self.dump_documents(self.client, index, docs_path, documents_to_extract) diff --git a/osbenchmark/workload_generator/helpers.py b/osbenchmark/workload_generator/helpers.py index 7a03e47d7..562f60e49 100644 --- a/osbenchmark/workload_generator/helpers.py +++ b/osbenchmark/workload_generator/helpers.py @@ -125,18 +125,29 @@ def process_queries(self): return processed_queries -def process_indices(indices, document_frequency, number_of_docs): +def process_indices(indices, document_frequency, indices_docs_mapping): processed_indices = [] for index_name in indices: - index = Index( - name=index_name, - document_frequency=document_frequency, - number_of_docs=number_of_docs - ) - processed_indices.append(index) + try: + #Setting number_of_docs_for_index to None means OSB will grab all docs available in index + number_of_docs_for_index = None + if indices_docs_mapping and index_name in indices_docs_mapping: + number_of_docs_for_index = int(indices_docs_mapping[index_name]) + if number_of_docs_for_index <= 0: + raise exceptions.SystemSetupError( + "Values specified with --number-of-docs must be greater than 0") + + index = Index( + name=index_name, + document_frequency=document_frequency, + number_of_docs=number_of_docs_for_index + ) + processed_indices.append(index) - return processed_indices + except ValueError as e: + raise exceptions.SystemSetupError("Ensure you are using integers if providing --number-of-docs.", e) + return processed_indices def validate_index_documents_map(indices, indices_docs_map): logger = logging.getLogger(__name__) @@ -147,13 +158,13 @@ def validate_index_documents_map(indices, indices_docs_map): if len(indices) < len(indices_docs_map): raise exceptions.SystemSetupError( - "Number of : pairs exceeds number of indices in --indices. " + - "Ensure number of : pairs is less than or equal to number of indices in --indices." + "Number of : pairs in --number-of-docs exceeds number of indices in --indices. " + + "Ensure number of : pairs is less than or equal to number of indices." ) for index_name in indices_docs_map: if index_name not in indices: raise exceptions.SystemSetupError( - "Index from : pair was not found in --indices. " + - "Ensure that indices from all : pairs exist in --indices." + f"Index {index_name} provided in --number-of-docs was not found in --indices. " + + "Ensure that all indices in --number-of-docs are present in --indices." ) diff --git a/osbenchmark/workload_generator/workload_generator.py b/osbenchmark/workload_generator/workload_generator.py index 41adac508..188bda837 100644 --- a/osbenchmark/workload_generator/workload_generator.py +++ b/osbenchmark/workload_generator/workload_generator.py @@ -40,6 +40,7 @@ def create_workload(cfg): console.info(f"Connected to OpenSearch cluster [{info['name']}] version [{info['version']['number']}].\n", logger=logger) processed_indices = process_indices(indices, document_frequency, number_of_docs) + logger.info("Processed Indices: %s", processed_indices) custom_workload = CustomWorkload( workload_name=workload_name,