Fix --number-of-docs bug in create-workload (#659)
Signed-off-by: Ian Hoang <ianhoang16@gmail.com>
IanHoang authored Oct 3, 2024
1 parent 7c6e1b7 commit a967bfd
Showing 4 changed files with 58 additions and 18 deletions.
9 changes: 8 additions & 1 deletion osbenchmark/utils/opts.py
@@ -125,7 +125,14 @@ class StoreKeyPairAsDict(argparse.Action):
     """
     def __call__(self, parser, namespace, values, option_string=None):
         custom_dict = {}
-        for kv in values:
+
+        if len(values) == 1:
+            # If values contains spaces, user provided 2+ key value pairs
+            kv_pairs = values[0].split(" ")
+        else:
+            kv_pairs = values
+
+        for kv in kv_pairs:
             try:
                 k,v = kv.split(":")
                 custom_dict[k] = v
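With this change, StoreKeyPairAsDict handles argparse passing it a single quoted token that contains several key:value pairs (for example `--number-of-docs "idx-a:1000 idx-b:500"`) as well as the usual list of separate tokens. A stand-alone sketch of the same parsing logic, using made-up index names:

```python
# Stand-alone sketch of the parsing in StoreKeyPairAsDict.__call__;
# the sample inputs below are hypothetical.
def parse_key_pairs(values):
    custom_dict = {}

    if len(values) == 1:
        # A single token may still hold several space-separated pairs,
        # e.g. when the user quoted the whole argument.
        kv_pairs = values[0].split(" ")
    else:
        kv_pairs = values

    for kv in kv_pairs:
        k, v = kv.split(":")
        custom_dict[k] = v
    return custom_dict

print(parse_key_pairs(["idx-a:1000 idx-b:500"]))    # {'idx-a': '1000', 'idx-b': '500'}
print(parse_key_pairs(["idx-a:1000", "idx-b:500"])) # same result
```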
31 changes: 26 additions & 5 deletions osbenchmark/workload_generator/extractors.py
@@ -12,8 +12,9 @@
 import os
 from abc import ABC, abstractmethod
 
-from opensearchpy import OpenSearchException
+import opensearchpy.exceptions
 
+from osbenchmark import exceptions
 from osbenchmark.utils import console
 from osbenchmark.workload_generator.config import CustomWorkload
 
@@ -41,8 +42,10 @@ def extract_indices(self, workload_path):
         try:
             for index in self.custom_workload.indices:
                 extracted_indices += self.extract(workload_path, index.name)
-        except OpenSearchException:
-            self.logger("Failed at extracting index [%s]", index)
+        except opensearchpy.exceptions.NotFoundError:
+            raise exceptions.SystemSetupError(f"Index [{index.name}] does not exist.")
+        except opensearchpy.OpenSearchException:
+            self.logger.error("Failed at extracting index [%s]", index)
             failed_indices += index
 
         return extracted_indices, failed_indices
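Ordering matters in the new handler: in opensearch-py, NotFoundError derives (via TransportError) from OpenSearchException, so the specific clause must come first for a missing index to abort the run rather than just being logged. A small sketch of the two paths, raising the NotFoundError by hand instead of against a real cluster:

```python
import opensearchpy.exceptions

# The more specific exception is a subclass of the general one.
assert issubclass(opensearchpy.exceptions.NotFoundError,
                  opensearchpy.exceptions.OpenSearchException)

try:
    # Hypothetical stand-in for self.extract(...) hitting an index that does not exist.
    raise opensearchpy.exceptions.NotFoundError(404, "index_not_found_exception", {})
except opensearchpy.exceptions.NotFoundError:
    print("missing index -> raise SystemSetupError and stop")
except opensearchpy.exceptions.OpenSearchException:
    print("any other client error -> log it and record the index as failed")
```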
@@ -138,6 +141,9 @@ def extract_documents(self, index, documents_limit=None):
 
 
 class SequentialCorpusExtractor(CorpusExtractor):
+    DEFAULT_TEST_MODE_DOC_COUNT = 1000
+    DEFAULT_TEST_MODE_SUFFIX = "-1k"
+
     def __init__(self, custom_workload, client):
         self.custom_workload: CustomWorkload = custom_workload
         self.client = client
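The test-mode suffix and document cap become named class constants instead of literals repeated in extract_documents. Assuming _get_doc_outpath simply joins the index name, a documents marker, and the suffix (that helper is not shown in this diff, so the shape below is an assumption), the constants map to file names roughly as follows:

```python
import os

DEFAULT_TEST_MODE_DOC_COUNT = 1000
DEFAULT_TEST_MODE_SUFFIX = "-1k"

def get_doc_outpath(outdir, name, suffix=""):
    # Assumed shape of SequentialCorpusExtractor._get_doc_outpath; not part of this commit.
    return os.path.join(outdir, f"{name}-documents{suffix}.json")

# Hypothetical index "logs-2024" in a workload directory:
print(get_doc_outpath("my-workload", "logs-2024", DEFAULT_TEST_MODE_SUFFIX))  # my-workload/logs-2024-documents-1k.json
print(get_doc_outpath("my-workload", "logs-2024"))                            # my-workload/logs-2024-documents.json
```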
@@ -173,15 +179,30 @@ def extract_documents(self, index, documents_limit=None):
 
         documents_to_extract = total_documents if not documents_limit else min(total_documents, documents_limit)
 
+        if documents_limit:
+            # Only time when documents-1k.json will be less than 1K documents is
+            # when the documents_limit is < 1k documents or source index has less than 1k documents
+            if documents_limit < self.DEFAULT_TEST_MODE_DOC_COUNT:
+                test_mode_warning_msg = "Due to --number-of-docs set by user, " + \
+                    f"test-mode docs will be less than the default {self.DEFAULT_TEST_MODE_DOC_COUNT} documents."
+                console.warn(test_mode_warning_msg)
+
+            # Notify users when they specified more documents than available in index
+            if documents_limit > total_documents:
+                documents_to_extract_warning_msg = f"User requested extraction of {documents_limit} documents " + \
+                    f"but there are only {total_documents} documents in {index}. " + \
+                    f"Will only extract {total_documents} documents from {index}."
+                console.warn(documents_to_extract_warning_msg)
+
         if documents_to_extract > 0:
             logger.info("[%d] total docs in index [%s]. Extracting [%s] docs.", total_documents, index, documents_to_extract)
             docs_path = self._get_doc_outpath(self.custom_workload.workload_path, index)
             # Create test mode corpora
             self.dump_documents(
                 self.client,
                 index,
-                self._get_doc_outpath(self.custom_workload.workload_path, index, "-1k"),
-                min(documents_to_extract, 1000),
+                self._get_doc_outpath(self.custom_workload.workload_path, index, self.DEFAULT_TEST_MODE_SUFFIX),
+                min(documents_to_extract, self.DEFAULT_TEST_MODE_DOC_COUNT),
                 " for test mode")
             # Create full corpora
             self.dump_documents(self.client, index, docs_path, documents_to_extract)
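The new warnings and the clamping depend only on documents_limit and the live document count. A worked example with made-up counts, mirroring the arithmetic in extract_documents, shows which warning fires and how many documents land in the full and test-mode corpora:

```python
DEFAULT_TEST_MODE_DOC_COUNT = 1000

def plan_extraction(total_documents, documents_limit=None):
    # Mirrors the arithmetic in SequentialCorpusExtractor.extract_documents;
    # the document counts passed in below are made up.
    documents_to_extract = total_documents if not documents_limit else min(total_documents, documents_limit)

    if documents_limit:
        if documents_limit < DEFAULT_TEST_MODE_DOC_COUNT:
            print("warn: test-mode docs will be less than the default 1000 documents")
        if documents_limit > total_documents:
            print(f"warn: only {total_documents} documents are available")

    return documents_to_extract, min(documents_to_extract, DEFAULT_TEST_MODE_DOC_COUNT)

print(plan_extraction(1_000_000))       # (1000000, 1000) -- no limit given
print(plan_extraction(1_000_000, 500))  # (500, 500)      -- warns: test-mode corpus below 1000 docs
print(plan_extraction(750, 2_000))      # (750, 750)      -- warns: only 750 docs available
```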
35 changes: 23 additions & 12 deletions osbenchmark/workload_generator/helpers.py
@@ -125,18 +125,29 @@ def process_queries(self):
 
         return processed_queries
 
-def process_indices(indices, document_frequency, number_of_docs):
+def process_indices(indices, document_frequency, indices_docs_mapping):
     processed_indices = []
     for index_name in indices:
-        index = Index(
-            name=index_name,
-            document_frequency=document_frequency,
-            number_of_docs=number_of_docs
-        )
-        processed_indices.append(index)
+        try:
+            #Setting number_of_docs_for_index to None means OSB will grab all docs available in index
+            number_of_docs_for_index = None
+            if indices_docs_mapping and index_name in indices_docs_mapping:
+                number_of_docs_for_index = int(indices_docs_mapping[index_name])
+                if number_of_docs_for_index <= 0:
+                    raise exceptions.SystemSetupError(
+                        "Values specified with --number-of-docs must be greater than 0")
+
+            index = Index(
+                name=index_name,
+                document_frequency=document_frequency,
+                number_of_docs=number_of_docs_for_index
+            )
+            processed_indices.append(index)
 
-    return processed_indices
+        except ValueError as e:
+            raise exceptions.SystemSetupError("Ensure you are using integers if providing --number-of-docs.", e)
+
+    return processed_indices
 
 def validate_index_documents_map(indices, indices_docs_map):
     logger = logging.getLogger(__name__)
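process_indices now receives the <index>:<doc_count> mapping directly, so each index can carry its own cap while unmapped indices fall back to None (extract everything). A stand-alone sketch of that per-index resolution, with hypothetical index names and without the real Index dataclass:

```python
# Stand-alone sketch of the per-index resolution inside process_indices;
# index names and counts are made up, and the real Index dataclass is omitted.
def resolve_doc_counts(indices, indices_docs_mapping):
    resolved = {}
    for index_name in indices:
        # None means OSB will grab every document available in the index.
        number_of_docs_for_index = None
        if indices_docs_mapping and index_name in indices_docs_mapping:
            number_of_docs_for_index = int(indices_docs_mapping[index_name])
            if number_of_docs_for_index <= 0:
                raise ValueError("Values specified with --number-of-docs must be greater than 0")
        resolved[index_name] = number_of_docs_for_index
    return resolved

print(resolve_doc_counts(["idx-a", "idx-b"], {"idx-a": "1000"}))
# {'idx-a': 1000, 'idx-b': None}
```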
@@ -147,13 +158,13 @@ def validate_index_documents_map(indices, indices_docs_map):
 
     if len(indices) < len(indices_docs_map):
         raise exceptions.SystemSetupError(
-            "Number of <index>:<doc_count> pairs exceeds number of indices in --indices. " +
-            "Ensure number of <index>:<doc_count> pairs is less than or equal to number of indices in --indices."
+            "Number of <index>:<doc_count> pairs in --number-of-docs exceeds number of indices in --indices. " +
+            "Ensure number of <index>:<doc_count> pairs is less than or equal to number of indices."
         )
 
     for index_name in indices_docs_map:
         if index_name not in indices:
             raise exceptions.SystemSetupError(
-                "Index from <index>:<doc_count> pair was not found in --indices. " +
-                "Ensure that indices from all <index>:<doc_count> pairs exist in --indices."
+                f"Index {index_name} provided in --number-of-docs was not found in --indices. " +
+                "Ensure that all indices in --number-of-docs are present in --indices."
             )
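The reworded errors now name --number-of-docs and the offending index. A sketch of the two checks, with SystemSetupError swapped for ValueError so it runs without OSB installed, exercised on hypothetical inputs:

```python
def validate(indices, indices_docs_map):
    # Sketch of validate_index_documents_map; the real code raises exceptions.SystemSetupError.
    if len(indices) < len(indices_docs_map):
        raise ValueError("Number of <index>:<doc_count> pairs in --number-of-docs exceeds number of indices in --indices.")
    for index_name in indices_docs_map:
        if index_name not in indices:
            raise ValueError(f"Index {index_name} provided in --number-of-docs was not found in --indices.")

validate(["idx-a", "idx-b"], {"idx-a": "1000"})   # ok: mapping a subset of --indices is allowed
try:
    validate(["idx-a"], {"idx-b": "1000"})
except ValueError as e:
    print(e)  # Index idx-b provided in --number-of-docs was not found in --indices.
```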
1 change: 1 addition & 0 deletions osbenchmark/workload_generator/workload_generator.py
@@ -40,6 +40,7 @@ def create_workload(cfg):
     console.info(f"Connected to OpenSearch cluster [{info['name']}] version [{info['version']['number']}].\n", logger=logger)
 
     processed_indices = process_indices(indices, document_frequency, number_of_docs)
+    logger.info("Processed Indices: %s", processed_indices)
 
     custom_workload = CustomWorkload(
         workload_name=workload_name,
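The added log line makes the per-index limits that survive parsing and validation visible in the logs. End to end, a hypothetical invocation such as `opensearch-benchmark create-workload ... --indices "idx-a,idx-b,idx-c" --number-of-docs "idx-a:1000 idx-b:500"` (only the --indices and --number-of-docs flags appear in this diff; the rest of the command is assumed) would resolve limits roughly like this:

```python
# Self-contained sketch (index names and counts are made up): a quoted
# --number-of-docs value becomes a dict, then per-index limits.
values = ["idx-a:1000 idx-b:500"]                         # argparse sees one quoted token
kv_pairs = values[0].split(" ") if len(values) == 1 else values
number_of_docs = dict(kv.split(":") for kv in kv_pairs)   # {'idx-a': '1000', 'idx-b': '500'}

indices = ["idx-a", "idx-b", "idx-c"]
limits = {name: int(number_of_docs[name]) if name in number_of_docs else None
          for name in indices}
print(limits)  # {'idx-a': 1000, 'idx-b': 500, 'idx-c': None} -- idx-c keeps all its documents
```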
