diff --git a/chatbot/bot/memory/embedder.py b/chatbot/bot/memory/embedder.py
index e0b4e50..9aba205 100644
--- a/chatbot/bot/memory/embedder.py
+++ b/chatbot/bot/memory/embedder.py
@@ -1,80 +1,51 @@
-from abc import ABC, abstractmethod
 from typing import Any
 
+import sentence_transformers
 
-class Embedder(ABC):
-    @abstractmethod
-    def embed_documents(self, texts: list[str]) -> list[list[float]]:
-        """Embed search docs."""
-
-    @abstractmethod
-    def embed_query(self, text: str) -> list[float]:
-        """Embed query text."""
-
-
-class HuggingFaceEmbedder(Embedder):
-    """HuggingFace sentence_transformers embedding models.
-
-    To use, you should have the ``sentence_transformers`` python package installed.
-    """
-
-    client: Any  #: :meta private:
-    model_name: str = "all-MiniLM-L6-v2"
-    """Model name to use."""
-    cache_folder: str | None = None
-    """Path to store models.
-    Can be also set by SENTENCE_TRANSFORMERS_HOME environment variable."""
-    model_kwargs: dict[str, Any] = {}
-    """Keyword arguments to pass to the model."""
-    encode_kwargs: dict[str, Any] = {}
-    """Keyword arguments to pass when calling the `encode` method of the model."""
-    multi_process: bool = False
-    """Run encode() on multiple GPUs."""
-
-    def __init__(self, **kwargs: Any):
-        """Initialize the sentence_transformer."""
-        super().__init__(**kwargs)
-        try:
-            import sentence_transformers
-
-        except ImportError as exc:
-            raise ImportError(
-                "Could not import sentence_transformers python package. "
-                "Please install it with `pip install sentence-transformers`."
-            ) from exc
+class Embedder:
+    def __init__(self, model_name: str = "all-MiniLM-L6-v2", cache_folder: str | None = None, **kwargs: Any):
+        """
+        Initialize the Embedder class with the specified parameters.
 
-        self.client = sentence_transformers.SentenceTransformer(
-            self.model_name, cache_folder=self.cache_folder, **self.model_kwargs
-        )
+        Args:
+            model_name (str): Model name to use. Defaults to "all-MiniLM-L6-v2".
+            cache_folder (str | None): Path to store models. Can also be set by the SENTENCE_TRANSFORMERS_HOME environment variable.
+            **kwargs (Any): Additional keyword arguments to pass to the SentenceTransformer model.
+        """
+        self.client = sentence_transformers.SentenceTransformer(model_name, cache_folder=cache_folder, **kwargs)
 
-    def embed_documents(self, texts: list[str]) -> list[list[float]]:
-        """Compute doc embeddings using a HuggingFace transformer model.
+    def embed_documents(self, texts: list[str], multi_process: bool = False, **encode_kwargs: Any) -> list[list[float]]:
+        """
+        Compute document embeddings using a transformer model.
 
         Args:
-            texts: The list of texts to embed.
+            texts (list[str]): The list of texts to embed.
+            multi_process (bool): If True, use multiple processes to compute embeddings.
+            **encode_kwargs (Any): Additional keyword arguments to pass when calling the `encode` method of the model.
 
         Returns:
-            List of embeddings, one for each text.
+            list[list[float]]: A list of embeddings, one for each text.
         """
-        import sentence_transformers
 
         texts = list(map(lambda x: x.replace("\n", " "), texts))
-        if self.multi_process:
+        if multi_process:
             pool = self.client.start_multi_process_pool()
             embeddings = self.client.encode_multi_process(texts, pool)
             sentence_transformers.SentenceTransformer.stop_multi_process_pool(pool)
         else:
-            embeddings = self.client.encode(texts, **self.encode_kwargs)
+            embeddings = self.client.encode(texts, show_progress_bar=True, **encode_kwargs)
 
         return embeddings.tolist()
 
     def embed_query(self, text: str) -> list[float]:
-        """Compute query embeddings using a HuggingFace transformer model.
+        """
+        Compute query embeddings using a transformer model.
 
         Args:
-            text: The text to embed.
+            text (str): The text to embed.
 
         Returns:
-            Embeddings for the text.
+            list[float]: Embeddings for the text.
         """
        return self.embed_documents([text])[0]
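For orientation, a minimal usage sketch of the refactored `Embedder` (not part of the diff; the example texts and the `batch_size` value are illustrative, and `batch_size` is simply forwarded through `**encode_kwargs` to `SentenceTransformer.encode`):

```python
from bot.memory.embedder import Embedder

# Model and cache location are now plain constructor arguments.
embedder = Embedder(model_name="all-MiniLM-L6-v2")

# Per-call options travel with the call instead of living on the instance.
doc_vectors = embedder.embed_documents(
    ["first document", "second document"],
    multi_process=False,
    batch_size=32,  # forwarded to SentenceTransformer.encode
)
query_vector = embedder.embed_query("a question about the documents")

# 2 documents; all-MiniLM-L6-v2 produces 384-dimensional vectors.
print(len(doc_vectors), len(query_vector))
```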
diff --git a/chatbot/cli/rag_chatbot.py b/chatbot/cli/rag_chatbot.py
index a6b4583..51a803f 100644
--- a/chatbot/cli/rag_chatbot.py
+++ b/chatbot/cli/rag_chatbot.py
@@ -6,7 +6,7 @@
 from bot.client.lama_cpp_client import LamaCppClient
 from bot.conversation.conversation_retrieval import ConversationRetrieval
 from bot.conversation.ctx_strategy import get_ctx_synthesis_strategies, get_ctx_synthesis_strategy
-from bot.memory.embedder import HuggingFaceEmbedder
+from bot.memory.embedder import Embedder
 from bot.memory.vector_memory import VectorMemory
 from bot.model.model_settings import get_model_setting, get_models
 from helpers.log import get_logger
@@ -135,7 +135,7 @@ def main(parameters):
     conversation = ConversationRetrieval(llm)
 
-    embedding = HuggingFaceEmbedder()
+    embedding = Embedder()
     index = VectorMemory(vector_store_path=str(vector_store_path), embedding=embedding)
 
     loop(conversation, synthesis_strategy, index, parameters)
diff --git a/chatbot/memory_builder.py b/chatbot/memory_builder.py
index 45e1a06..3ec6363 100644
--- a/chatbot/memory_builder.py
+++ b/chatbot/memory_builder.py
@@ -3,10 +3,11 @@
 from pathlib import Path
 from typing import List
 
-from bot.memory.embedder import HuggingFaceEmbedder
+from bot.memory.embedder import Embedder
 from bot.memory.vector_memory import VectorMemory
+from document_loader.format import Format
 from document_loader.loader import DirectoryLoader
-from document_loader.text_splitter import Format, create_recursive_text_splitter
+from document_loader.text_splitter import create_recursive_text_splitter
 from entities.document import Document
 from helpers.log import get_logger
@@ -62,7 +63,7 @@ def build_memory_index(docs_path: Path, vector_store_path: str, chunk_size: int,
     logger.info(f"Number of generated chunks: {len(chunks)}")
 
     logger.info("Creating memory index...")
-    embedding = HuggingFaceEmbedder()
+    embedding = Embedder()
     VectorMemory.create_memory_index(embedding, chunks, vector_store_path)
 
     logger.info("Memory Index has been created successfully!")
diff --git a/chatbot/rag_chatbot_app.py b/chatbot/rag_chatbot_app.py
index de9a61e..6983c80 100644
--- a/chatbot/rag_chatbot_app.py
+++ b/chatbot/rag_chatbot_app.py
@@ -11,7 +11,7 @@
     get_ctx_synthesis_strategies,
     get_ctx_synthesis_strategy,
 )
-from bot.memory.embedder import HuggingFaceEmbedder
+from bot.memory.embedder import Embedder
 from bot.memory.vector_memory import VectorMemory
 from bot.model.model_settings import get_model_setting, get_models
 from helpers.log import get_logger
@@ -51,7 +51,7 @@ def load_index(vector_store_path: Path) -> VectorMemory:
     Returns:
         VectorMemory: An instance of the VectorMemory class with the loaded index.
     """
-    embedding = HuggingFaceEmbedder()
+    embedding = Embedder()
     index = VectorMemory(vector_store_path=str(vector_store_path), embedding=embedding)
     return index
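All three call sites above now share the same wiring; a minimal sketch of the common pattern (the vector store path is illustrative):

```python
from bot.memory.embedder import Embedder
from bot.memory.vector_memory import VectorMemory

embedding = Embedder()  # replaces HuggingFaceEmbedder(); same default model
index = VectorMemory(vector_store_path="vector_store/docs_index", embedding=embedding)
```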
""" - embedding = HuggingFaceEmbedder() + embedding = Embedder() index = VectorMemory(vector_store_path=str(vector_store_path), embedding=embedding) return index diff --git a/chatbot/vector_database/chroma.py b/chatbot/vector_database/chroma.py index 690212c..e159e36 100644 --- a/chatbot/vector_database/chroma.py +++ b/chatbot/vector_database/chroma.py @@ -5,8 +5,9 @@ import chromadb import chromadb.config -from bot.memory.embedder import HuggingFaceEmbedder +from bot.memory.embedder import Embedder from chromadb.api.types import ID, OneOrMany, Where, WhereDocument +from chromadb.utils.batch_utils import create_batches from entities.document import Document logger = logging.getLogger(__name__) @@ -32,13 +33,12 @@ def _results_to_docs_and_scores(results: Any) -> list[tuple[Document, float]]: class Chroma: def __init__( self, - embedding_function: HuggingFaceEmbedder | None = None, + embedding_function: Embedder | None = None, persist_directory: str | None = None, client_settings: chromadb.config.Settings | None = None, collection_name: str = "default", collection_metadata: dict | None = None, client: chromadb.Client = None, - relevance_score_fn: Callable[[float], float] | None = None, ) -> None: """Initialize with a Chroma client.""" @@ -68,10 +68,9 @@ def __init__( embedding_function=None, metadata=collection_metadata, ) - self.override_relevance_score_fn = relevance_score_fn @property - def embeddings(self) -> HuggingFaceEmbedder | None: + def embeddings(self) -> Embedder | None: return self._embedding_function def __query_collection( @@ -94,6 +93,7 @@ def __query_collection( **kwargs, ) + @classmethod def add_texts( self, texts: Iterable[str], @@ -112,7 +112,7 @@ def add_texts( """ # TODO: Handle the case where the user doesn't provide ids on the Collection if ids is None: - ids = [str(uuid.uuid1()) for _ in texts] + ids = [str(uuid.uuid4()) for _ in texts] embeddings = None texts = list(texts) if self._embedding_function is not None: @@ -165,63 +165,147 @@ def add_texts( ) return ids - def similarity_search( - self, - query: str, - k: int = 4, - filter: dict[str, str] | None = None, - **kwargs: Any, - ) -> list[Document]: - """Run similarity search with Chroma. + @classmethod + def from_texts( + cls, + texts: list[str], + embedding: Embedder | None = None, + metadatas: list[dict] | None = None, + ids: list[str] | None = None, + collection_name: str = "default", + persist_directory: str | None = None, + client_settings: chromadb.config.Settings | None = None, + client=None, + collection_metadata: dict | None = None, + ): + """Create a Chroma vectorstore from a raw documents. + + If a persist_directory is specified, the collection will be persisted there. + Otherwise, the data will be ephemeral in-memory. Args: - query (str): Query text to search for. - k (int): Number of results to return. Defaults to 4. - filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None. + texts (List[str]): List of texts to add to the collection. + collection_name (str): Name of the collection to create. + persist_directory (Optional[str]): Directory to persist the collection. + embedding (Optional[Embeddings]): Embedding function. Defaults to None. + metadatas (Optional[List[dict]]): List of metadatas. Defaults to None. + ids (Optional[List[str]]): List of document IDs. Defaults to None. + client_settings (Optional[chromadb.config.Settings]): Chroma client settings + collection_metadata (Optional[Dict]): Collection configurations. + Defaults to None. 
 
         Returns:
-            List[Document]: List of documents most similar to the query text.
+            Chroma: Chroma vectorstore.
         """
-        docs_and_scores = self.similarity_search_with_score(query, k, filter=filter)
-        return [doc for doc, _ in docs_and_scores]
+        chroma_collection = cls(
+            collection_name=collection_name,
+            embedding_function=embedding,
+            persist_directory=persist_directory,
+            client_settings=client_settings,
+            client=client,
+            collection_metadata=collection_metadata,
+        )
+        if ids is None:
+            ids = [str(uuid.uuid4()) for _ in texts]
+
+        for batch in create_batches(
+            api=chroma_collection._client,
+            ids=ids,
+            metadatas=metadatas,
+            documents=texts,
+        ):
+            chroma_collection.add_texts(
+                texts=batch[3] if batch[3] else [],
+                metadatas=batch[2] if batch[2] else None,
+                ids=batch[0],
+            )
+        return chroma_collection
 
-    def similarity_search_with_score(
-        self,
-        query: str,
-        k: int = 4,
-        filter: dict[str, str] | None = None,
-        where_document: dict[str, str] | None = None,
-        **kwargs: Any,
-    ) -> list[tuple[Document, float]]:
-        """Run similarity search with Chroma with distance.
+    def update_document(self, document_id: str, document: Document) -> None:
+        """Update a document in the collection.
 
         Args:
-            query (str): Query text to search for.
-            k (int): Number of results to return. Defaults to 4.
-            filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
+            document_id (str): ID of the document to update.
+            document (Document): Document to update.
+        """
+        return self.update_documents([document_id], [document])
 
-        Returns:
-            List[Tuple[Document, float]]: List of documents most similar to
-            the query text and cosine distance in float for each.
-            Lower score represents more similarity.
+    def update_documents(self, ids: list[str], documents: list[Document]) -> None:
+        """Update documents in the collection.
+
+        Args:
+            ids (List[str]): List of ids of the documents to update.
+            documents (List[Document]): List of documents to update.
         """
+        text = [document.page_content for document in documents]
+        metadata = [document.metadata for document in documents]
         if self._embedding_function is None:
-            results = self.__query_collection(
-                query_texts=[query],
-                n_results=k,
-                where=filter,
-                where_document=where_document,
-            )
-        else:
-            query_embedding = self._embedding_function.embed_query(query)
-            results = self.__query_collection(
-                query_embeddings=[query_embedding],
-                n_results=k,
-                where=filter,
-                where_document=where_document,
+            raise ValueError("For update, you must specify an embedding function on creation.")
+        embeddings = self._embedding_function.embed_documents(text)
+
+        for batch in create_batches(
+            api=self._collection._client,
+            ids=ids,
+            metadatas=metadata,
+            documents=text,
+            embeddings=embeddings,
+        ):
+            self._collection.update(
+                ids=batch[0],
+                embeddings=batch[1],
+                documents=batch[3],
+                metadatas=batch[2],
             )
-        return _results_to_docs_and_scores(results)
 
+    def get(
+        self,
+        ids: OneOrMany[ID] | None = None,
+        where: Where | None = None,
+        limit: int | None = None,
+        offset: int | None = None,
+        where_document: WhereDocument | None = None,
+        include: list[str] | None = None,
+    ) -> dict[str, Any]:
+        """Gets the collection.
+
+        Args:
+            ids: The ids of the embeddings to get. Optional.
+            where: A Where type dict used to filter results by.
+                E.g. `{"color" : "red", "price": 4.20}`. Optional.
+            limit: The number of documents to return. Optional.
+            offset: The offset to start returning results from.
+                Useful for paging results with limit. Optional.
+            where_document: A WhereDocument type dict used to filter by the documents.
+                E.g. `{$contains: "hello"}`. Optional.
+            include: A list of what to include in the results.
+                Can contain `"embeddings"`, `"metadatas"`, `"documents"`.
+                Ids are always included.
+                Defaults to `["metadatas", "documents"]`. Optional.
+        """
+        kwargs = {
+            "ids": ids,
+            "where": where,
+            "limit": limit,
+            "offset": offset,
+            "where_document": where_document,
+        }
+
+        if include is not None:
+            kwargs["include"] = include
+
+        return self._collection.get(**kwargs)
+
+    def delete(self, ids: list[str] | None = None, **kwargs: Any) -> None:
+        """Delete by vector IDs.
+
+        Args:
+            ids: List of ids to delete.
+        """
+        self._collection.delete(ids=ids, **kwargs)
+
+    def delete_collection(self) -> None:
+        """Delete the collection."""
+        self._client.delete_collection(self._collection.name)
 
     @staticmethod
     def _cosine_relevance_score_fn(distance: float) -> float:
@@ -261,8 +344,6 @@
         - embedding dimensionality
         - etc.
         """
-        if self.override_relevance_score_fn:
-            return self.override_relevance_score_fn
 
         distance = "l2"
         distance_key = "hnsw:space"
@@ -278,11 +359,65 @@
         elif distance == "ip":
             return self._max_inner_product_relevance_score_fn
         else:
-            raise ValueError(
-                "No supported normalization function" f" for distance metric of type: {distance}."
-                "Consider providing relevance_score_fn to Chroma constructor."
+            raise ValueError(f"No supported normalization function for distance metric of type: {distance}.")
+
+    def similarity_search(
+        self,
+        query: str,
+        k: int = 4,
+        filter: dict[str, str] | None = None,
+        **kwargs: Any,
+    ) -> list[Document]:
+        """Run similarity search with Chroma.
+
+        Args:
+            query (str): Query text to search for.
+            k (int): Number of results to return. Defaults to 4.
+            filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
+
+        Returns:
+            List[Document]: List of documents most similar to the query text.
+        """
+        docs_and_scores = self.similarity_search_with_score(query, k, filter=filter)
+        return [doc for doc, _ in docs_and_scores]
+
+    def similarity_search_with_score(
+        self,
+        query: str,
+        k: int = 4,
+        filter: dict[str, str] | None = None,
+        where_document: dict[str, str] | None = None,
+        **kwargs: Any,
+    ) -> list[tuple[Document, float]]:
+        """Run similarity search with Chroma with distance.
+
+        Args:
+            query (str): Query text to search for.
+            k (int): Number of results to return. Defaults to 4.
+            filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
+
+        Returns:
+            List[Tuple[Document, float]]: List of documents most similar to
+            the query text and cosine distance in float for each.
+            Lower score represents more similarity.
+ """ + if self._embedding_function is None: + results = self.__query_collection( + query_texts=[query], + n_results=k, + where=filter, + where_document=where_document, ) + else: + query_embedding = self._embedding_function.embed_query(query) + results = self.__query_collection( + query_embeddings=[query_embedding], + n_results=k, + where=filter, + where_document=where_document, + ) + + return _results_to_docs_and_scores(results) def _similarity_search_with_relevance_scores( self, @@ -346,161 +482,3 @@ def similarity_search_with_relevance_scores( "No relevant docs were retrieved using the relevance score" f" threshold {score_threshold}" ) return docs_and_similarities - - def delete_collection(self) -> None: - """Delete the collection.""" - self._client.delete_collection(self._collection.name) - - def get( - self, - ids: OneOrMany[ID] | None = None, - where: Where | None = None, - limit: int | None = None, - offset: int | None = None, - where_document: WhereDocument | None = None, - include: list[str] | None = None, - ) -> dict[str, Any]: - """Gets the collection. - - Args: - ids: The ids of the embeddings to get. Optional. - where: A Where type dict used to filter results by. - E.g. `{"color" : "red", "price": 4.20}`. Optional. - limit: The number of documents to return. Optional. - offset: The offset to start returning results from. - Useful for paging results with limit. Optional. - where_document: A WhereDocument type dict used to filter by the documents. - E.g. `{$contains: "hello"}`. Optional. - include: A list of what to include in the results. - Can contain `"embeddings"`, `"metadatas"`, `"documents"`. - Ids are always included. - Defaults to `["metadatas", "documents"]`. Optional. - """ - kwargs = { - "ids": ids, - "where": where, - "limit": limit, - "offset": offset, - "where_document": where_document, - } - - if include is not None: - kwargs["include"] = include - - return self._collection.get(**kwargs) - - def update_document(self, document_id: str, document: Document) -> None: - """Update a document in the collection. - - Args: - document_id (str): ID of the document to update. - document (Document): Document to update. - """ - return self.update_documents([document_id], [document]) - - def update_documents(self, ids: list[str], documents: list[Document]) -> None: - """Update a document in the collection. - - Args: - ids (List[str]): List of ids of the document to update. - documents (List[Document]): List of documents to update. 
- """ - text = [document.page_content for document in documents] - metadata = [document.metadata for document in documents] - if self._embedding_function is None: - raise ValueError("For update, you must specify an embedding function on creation.") - embeddings = self._embedding_function.embed_documents(text) - - if hasattr(self._collection._client, "max_batch_size"): # for Chroma 0.4.10 and above - from chromadb.utils.batch_utils import create_batches - - for batch in create_batches( - api=self._collection._client, - ids=ids, - metadatas=metadata, - documents=text, - embeddings=embeddings, - ): - self._collection.update( - ids=batch[0], - embeddings=batch[1], - documents=batch[3], - metadatas=batch[2], - ) - else: - self._collection.update( - ids=ids, - embeddings=embeddings, - documents=text, - metadatas=metadata, - ) - - @classmethod - def from_texts( - cls, - texts: list[str], - embedding: HuggingFaceEmbedder | None = None, - metadatas: list[dict] | None = None, - ids: list[str] | None = None, - collection_name: str = "default", - persist_directory: str | None = None, - client_settings: chromadb.config.Settings | None = None, - client=None, - collection_metadata: dict | None = None, - **kwargs: Any, - ): - """Create a Chroma vectorstore from a raw documents. - - If a persist_directory is specified, the collection will be persisted there. - Otherwise, the data will be ephemeral in-memory. - - Args: - texts (List[str]): List of texts to add to the collection. - collection_name (str): Name of the collection to create. - persist_directory (Optional[str]): Directory to persist the collection. - embedding (Optional[Embeddings]): Embedding function. Defaults to None. - metadatas (Optional[List[dict]]): List of metadatas. Defaults to None. - ids (Optional[List[str]]): List of document IDs. Defaults to None. - client_settings (Optional[chromadb.config.Settings]): Chroma client settings - collection_metadata (Optional[Dict]): Collection configurations. - Defaults to None. - - Returns: - Chroma: Chroma vectorstore. - """ - chroma_collection = cls( - collection_name=collection_name, - embedding_function=embedding, - persist_directory=persist_directory, - client_settings=client_settings, - client=client, - collection_metadata=collection_metadata, - **kwargs, - ) - if ids is None: - ids = [str(uuid.uuid1()) for _ in texts] - if hasattr(chroma_collection._client, "max_batch_size"): # for Chroma 0.4.10 and above - from chromadb.utils.batch_utils import create_batches - - for batch in create_batches( - api=chroma_collection._client, - ids=ids, - metadatas=metadatas, - documents=texts, - ): - chroma_collection.add_texts( - texts=batch[3] if batch[3] else [], - metadatas=batch[2] if batch[2] else None, - ids=batch[0], - ) - else: - chroma_collection.add_texts(texts=texts, metadatas=metadatas, ids=ids) - return chroma_collection - - def delete(self, ids: list[str] | None = None, **kwargs: Any) -> None: - """Delete by vector IDs. - - Args: - ids: List of ids to delete. 
- """ - self._collection.delete(ids=ids) diff --git a/experiments/explore_memory.py b/experiments/explore_memory.py index ca37962..56c34e4 100644 --- a/experiments/explore_memory.py +++ b/experiments/explore_memory.py @@ -1,7 +1,7 @@ from pathlib import Path import chromadb -from bot.memory.embedder import HuggingFaceEmbedder +from bot.memory.embedder import Embedder from bot.memory.vector_memory import VectorMemory from helpers.prettier import prettify_source from vector_database.chroma import Chroma @@ -13,7 +13,7 @@ # Contains an extract of things the user said in the past; episodic_vector_store_path = root_folder / "vector_store" / "episodic_index" - embedding = HuggingFaceEmbedder() + embedding = Embedder() index = VectorMemory(vector_store_path=str(declarative_vector_store_path), embedding=embedding) # query = "" @@ -27,14 +27,14 @@ persistent_client = chromadb.PersistentClient(path=str(episodic_vector_store_path)) collection = persistent_client.get_or_create_collection("episodic_memory") collection.add(ids=["1", "2", "3"], documents=["a", "b", "c"]) - langchain_chroma = Chroma( + chroma = Chroma( client=persistent_client, collection_name="episodic_memory", embedding_function=embedding, ) - docs = langchain_chroma.similarity_search("a") - docs_with_score = langchain_chroma.similarity_search_with_score("a") - docs_with_relevance_score = langchain_chroma.similarity_search_with_relevance_scores("a") + docs = chroma.similarity_search("a") + docs_with_score = chroma.similarity_search_with_score("a") + docs_with_relevance_score = chroma.similarity_search_with_relevance_scores("a") matched_doc = max(docs_with_relevance_score, key=lambda x: x[1]) # The returned distance score is cosine distance. Therefore, a lower score is better.