From 98fb6c52df1951d0d4f22a55a9333d4d083abcca Mon Sep 17 00:00:00 2001 From: Vivek Khandelwal Date: Mon, 31 Jul 2023 15:40:50 +0000 Subject: [PATCH] Expand pipelines to fix streaming of tokens --- .../langchain/expanded_pipelines.py | 432 ++++++++++++++++++ .../langchain/gpt_langchain.py | 52 +-- 2 files changed, 434 insertions(+), 50 deletions(-) create mode 100644 apps/language_models/langchain/expanded_pipelines.py diff --git a/apps/language_models/langchain/expanded_pipelines.py b/apps/language_models/langchain/expanded_pipelines.py new file mode 100644 index 0000000000..cca7131cb9 --- /dev/null +++ b/apps/language_models/langchain/expanded_pipelines.py @@ -0,0 +1,432 @@ +"""Load question answering chains.""" +from __future__ import annotations +from typing import ( + Any, + Mapping, + Optional, + Dict, + List, + Sequence, + Tuple, + Union, + Protocol, +) +from langchain.base_language import BaseLanguageModel +from langchain.callbacks.base import BaseCallbackManager +from langchain.chains.question_answering import stuff_prompt +from langchain.prompts.base import BasePromptTemplate +from langchain.docstore.document import Document +from abc import ABC, abstractmethod +from langchain.chains.base import Chain +from langchain.callbacks.manager import ( + CallbackManager, + CallbackManagerForChainRun, + Callbacks, +) +from langchain.input import get_colored_text +from langchain.load.dump import dumpd +from langchain.prompts.prompt import PromptTemplate +from langchain.schema import LLMResult, PromptValue +from pydantic import Extra, Field, root_validator + + +def format_document(doc: Document, prompt: BasePromptTemplate) -> str: + """Format a document into a string based on a prompt template.""" + base_info = {"page_content": doc.page_content} + base_info.update(doc.metadata) + missing_metadata = set(prompt.input_variables).difference(base_info) + if len(missing_metadata) > 0: + required_metadata = [ + iv for iv in prompt.input_variables if iv != "page_content" + ] + raise ValueError( + f"Document prompt requires documents to have metadata variables: " + f"{required_metadata}. Received document with missing metadata: " + f"{list(missing_metadata)}." + ) + document_info = {k: base_info[k] for k in prompt.input_variables} + return prompt.format(**document_info) + + +class BaseCombineDocumentsChain(Chain, ABC): + """Base interface for chains combining documents.""" + + input_key: str = "input_documents" #: :meta private: + output_key: str = "output_text" #: :meta private: + + @property + def input_keys(self) -> List[str]: + """Expect input key. + + :meta private: + """ + return [self.input_key] + + @property + def output_keys(self) -> List[str]: + """Return output key. + + :meta private: + """ + return [self.output_key] + + def prompt_length( + self, docs: List[Document], **kwargs: Any + ) -> Optional[int]: + """Return the prompt length given the documents passed in. + + Returns None if the method does not depend on the prompt length. 
+ """ + return None + + @abstractmethod + def combine_docs( + self, docs: List[Document], **kwargs: Any + ) -> Tuple[str, dict]: + """Combine documents into a single string.""" + + def _call( + self, + inputs: Dict[str, List[Document]], + run_manager: Optional[CallbackManagerForChainRun] = None, + ) -> Dict[str, str]: + _run_manager = ( + run_manager or CallbackManagerForChainRun.get_noop_manager() + ) + docs = inputs[self.input_key] + # Other keys are assumed to be needed for LLM prediction + other_keys = {k: v for k, v in inputs.items() if k != self.input_key} + output, extra_return_dict = self.combine_docs( + docs, callbacks=_run_manager.get_child(), **other_keys + ) + extra_return_dict[self.output_key] = output + return extra_return_dict + + +class LLMChain(Chain): + """Chain to run queries against LLMs. + + Example: + .. code-block:: python + + from langchain import LLMChain, OpenAI, PromptTemplate + prompt_template = "Tell me a {adjective} joke" + prompt = PromptTemplate( + input_variables=["adjective"], template=prompt_template + ) + llm = LLMChain(llm=OpenAI(), prompt=prompt) + """ + + @property + def lc_serializable(self) -> bool: + return True + + prompt: BasePromptTemplate + """Prompt object to use.""" + llm: BaseLanguageModel + output_key: str = "text" #: :meta private: + + class Config: + """Configuration for this pydantic object.""" + + extra = Extra.forbid + arbitrary_types_allowed = True + + @property + def input_keys(self) -> List[str]: + """Will be whatever keys the prompt expects. + + :meta private: + """ + return self.prompt.input_variables + + @property + def output_keys(self) -> List[str]: + """Will always return text key. + + :meta private: + """ + return [self.output_key] + + def _call( + self, + inputs: Dict[str, Any], + run_manager: Optional[CallbackManagerForChainRun] = None, + ) -> Dict[str, str]: + response = self.generate([inputs], run_manager=run_manager) + return self.create_outputs(response)[0] + + def generate( + self, + input_list: List[Dict[str, Any]], + run_manager: Optional[CallbackManagerForChainRun] = None, + ) -> LLMResult: + """Generate LLM result from inputs.""" + prompts, stop = self.prep_prompts(input_list, run_manager=run_manager) + return self.llm.generate_prompt( + prompts, + stop, + callbacks=run_manager.get_child() if run_manager else None, + ) + + def prep_prompts( + self, + input_list: List[Dict[str, Any]], + run_manager: Optional[CallbackManagerForChainRun] = None, + ) -> Tuple[List[PromptValue], Optional[List[str]]]: + """Prepare prompts from inputs.""" + stop = None + if "stop" in input_list[0]: + stop = input_list[0]["stop"] + prompts = [] + for inputs in input_list: + selected_inputs = { + k: inputs[k] for k in self.prompt.input_variables + } + prompt = self.prompt.format_prompt(**selected_inputs) + _colored_text = get_colored_text(prompt.to_string(), "green") + _text = "Prompt after formatting:\n" + _colored_text + if run_manager: + run_manager.on_text(_text, end="\n", verbose=self.verbose) + if "stop" in inputs and inputs["stop"] != stop: + raise ValueError( + "If `stop` is present in any inputs, should be present in all." 
+ ) + prompts.append(prompt) + return prompts, stop + + def apply( + self, input_list: List[Dict[str, Any]], callbacks: Callbacks = None + ) -> List[Dict[str, str]]: + """Utilize the LLM generate method for speed gains.""" + callback_manager = CallbackManager.configure( + callbacks, self.callbacks, self.verbose + ) + run_manager = callback_manager.on_chain_start( + dumpd(self), + {"input_list": input_list}, + ) + try: + response = self.generate(input_list, run_manager=run_manager) + except (KeyboardInterrupt, Exception) as e: + run_manager.on_chain_error(e) + raise e + outputs = self.create_outputs(response) + run_manager.on_chain_end({"outputs": outputs}) + return outputs + + def create_outputs(self, response: LLMResult) -> List[Dict[str, str]]: + """Create outputs from response.""" + return [ + # Get the text of the top generated string. + {self.output_key: generation[0].text} + for generation in response.generations + ] + + def predict(self, callbacks: Callbacks = None, **kwargs: Any) -> str: + """Format prompt with kwargs and pass to LLM. + + Args: + callbacks: Callbacks to pass to LLMChain + **kwargs: Keys to pass to prompt template. + + Returns: + Completion from LLM. + + Example: + .. code-block:: python + + completion = llm.predict(adjective="funny") + """ + return self(kwargs, callbacks=callbacks)[self.output_key] + + def predict_and_parse( + self, callbacks: Callbacks = None, **kwargs: Any + ) -> Union[str, List[str], Dict[str, Any]]: + """Call predict and then parse the results.""" + result = self.predict(callbacks=callbacks, **kwargs) + if self.prompt.output_parser is not None: + return self.prompt.output_parser.parse(result) + else: + return result + + def apply_and_parse( + self, input_list: List[Dict[str, Any]], callbacks: Callbacks = None + ) -> Sequence[Union[str, List[str], Dict[str, str]]]: + """Call apply and then parse the results.""" + result = self.apply(input_list, callbacks=callbacks) + return self._parse_result(result) + + def _parse_result( + self, result: List[Dict[str, str]] + ) -> Sequence[Union[str, List[str], Dict[str, str]]]: + if self.prompt.output_parser is not None: + return [ + self.prompt.output_parser.parse(res[self.output_key]) + for res in result + ] + else: + return result + + @property + def _chain_type(self) -> str: + return "llm_chain" + + @classmethod + def from_string(cls, llm: BaseLanguageModel, template: str) -> LLMChain: + """Create LLMChain from LLM and template.""" + prompt_template = PromptTemplate.from_template(template) + return cls(llm=llm, prompt=prompt_template) + + +def _get_default_document_prompt() -> PromptTemplate: + return PromptTemplate( + input_variables=["page_content"], template="{page_content}" + ) + + +class StuffDocumentsChain(BaseCombineDocumentsChain): + """Chain that combines documents by stuffing into context.""" + + llm_chain: LLMChain + """LLM wrapper to use after formatting documents.""" + document_prompt: BasePromptTemplate = Field( + default_factory=_get_default_document_prompt + ) + """Prompt to use to format each document.""" + document_variable_name: str + """The variable name in the llm_chain to put the documents in. 
+ If only one variable in the llm_chain, this need not be provided.""" + document_separator: str = "\n\n" + """The string with which to join the formatted documents""" + + class Config: + """Configuration for this pydantic object.""" + + extra = Extra.forbid + arbitrary_types_allowed = True + + @root_validator(pre=True) + def get_default_document_variable_name(cls, values: Dict) -> Dict: + """Get default document variable name, if not provided.""" + llm_chain_variables = values["llm_chain"].prompt.input_variables + if "document_variable_name" not in values: + if len(llm_chain_variables) == 1: + values["document_variable_name"] = llm_chain_variables[0] + else: + raise ValueError( + "document_variable_name must be provided if there are " + "multiple llm_chain_variables" + ) + else: + if values["document_variable_name"] not in llm_chain_variables: + raise ValueError( + f"document_variable_name {values['document_variable_name']} was " + f"not found in llm_chain input_variables: {llm_chain_variables}" + ) + return values + + def _get_inputs(self, docs: List[Document], **kwargs: Any) -> dict: + # Format each document according to the prompt + doc_strings = [ + format_document(doc, self.document_prompt) for doc in docs + ] + # Join the documents together to put them in the prompt. + inputs = { + k: v + for k, v in kwargs.items() + if k in self.llm_chain.prompt.input_variables + } + inputs[self.document_variable_name] = self.document_separator.join( + doc_strings + ) + return inputs + + def prompt_length( + self, docs: List[Document], **kwargs: Any + ) -> Optional[int]: + """Get the prompt length by formatting the prompt.""" + inputs = self._get_inputs(docs, **kwargs) + prompt = self.llm_chain.prompt.format(**inputs) + return self.llm_chain.llm.get_num_tokens(prompt) + + def combine_docs( + self, docs: List[Document], callbacks: Callbacks = None, **kwargs: Any + ) -> Tuple[str, dict]: + """Stuff all documents into one prompt and pass to LLM.""" + inputs = self._get_inputs(docs, **kwargs) + # Call predict on the LLM. + return self.llm_chain.predict(callbacks=callbacks, **inputs), {} + + @property + def _chain_type(self) -> str: + return "stuff_documents_chain" + + +class LoadingCallable(Protocol): + """Interface for loading the combine documents chain.""" + + def __call__( + self, llm: BaseLanguageModel, **kwargs: Any + ) -> BaseCombineDocumentsChain: + """Callable to load the combine documents chain.""" + + +def _load_stuff_chain( + llm: BaseLanguageModel, + prompt: Optional[BasePromptTemplate] = None, + document_variable_name: str = "context", + verbose: Optional[bool] = None, + callback_manager: Optional[BaseCallbackManager] = None, + callbacks: Callbacks = None, + **kwargs: Any, +) -> StuffDocumentsChain: + _prompt = prompt or stuff_prompt.PROMPT_SELECTOR.get_prompt(llm) + llm_chain = LLMChain( + llm=llm, + prompt=_prompt, + verbose=verbose, + callback_manager=callback_manager, + callbacks=callbacks, + ) + # TODO: document prompt + return StuffDocumentsChain( + llm_chain=llm_chain, + document_variable_name=document_variable_name, + verbose=verbose, + callback_manager=callback_manager, + **kwargs, + ) + + +def load_qa_chain( + llm: BaseLanguageModel, + chain_type: str = "stuff", + verbose: Optional[bool] = None, + callback_manager: Optional[BaseCallbackManager] = None, + **kwargs: Any, +) -> BaseCombineDocumentsChain: + """Load question answering chain. + + Args: + llm: Language Model to use in the chain. + chain_type: Type of document combining chain to use. 
Should be one of "stuff", + "map_reduce", "map_rerank", and "refine". + verbose: Whether chains should be run in verbose mode or not. Note that this + applies to all chains that make up the final chain. + callback_manager: Callback manager to use for the chain. + + Returns: + A chain to use for question answering. + """ + loader_mapping: Mapping[str, LoadingCallable] = { + "stuff": _load_stuff_chain, + } + if chain_type not in loader_mapping: + raise ValueError( + f"Got unsupported chain type: {chain_type}. " + f"Should be one of {loader_mapping.keys()}" + ) + return loader_mapping[chain_type]( + llm, verbose=verbose, callback_manager=callback_manager, **kwargs + ) diff --git a/apps/language_models/langchain/gpt_langchain.py b/apps/language_models/langchain/gpt_langchain.py index 16ac5526d8..aea7507a3b 100644 --- a/apps/language_models/langchain/gpt_langchain.py +++ b/apps/language_models/langchain/gpt_langchain.py @@ -87,7 +87,7 @@ UnstructuredExcelLoader, ) from langchain.text_splitter import RecursiveCharacterTextSplitter, Language -from langchain.chains.question_answering import load_qa_chain +from expanded_pipelines import load_qa_chain from langchain.docstore.document import Document from langchain import PromptTemplate, HuggingFaceTextGenInference from langchain.vectorstores import Chroma @@ -2958,56 +2958,8 @@ def get_similarity_chain( template=template, ) chain = load_qa_chain(llm, prompt=prompt) - else: - # only if use_openai_model = True, unused normally except in testing - chain = load_qa_with_sources_chain(llm) - if not use_context: - chain_kwargs = dict(input_documents=[], question=query) - else: - chain_kwargs = dict(input_documents=docs, question=query) + chain_kwargs = dict(input_documents=docs, question=query) target = wrapped_partial(chain, chain_kwargs) - elif langchain_action in [ - LangChainAction.SUMMARIZE_MAP.value, - LangChainAction.SUMMARIZE_REFINE, - LangChainAction.SUMMARIZE_ALL.value, - ]: - from langchain.chains.summarize import load_summarize_chain - - if langchain_action == LangChainAction.SUMMARIZE_MAP.value: - prompt = PromptTemplate( - input_variables=["text"], template=template - ) - chain = load_summarize_chain( - llm, - chain_type="map_reduce", - map_prompt=prompt, - combine_prompt=prompt, - return_intermediate_steps=True, - ) - target = wrapped_partial( - chain, {"input_documents": docs} - ) # , return_only_outputs=True) - elif langchain_action == LangChainAction.SUMMARIZE_ALL.value: - assert use_template - prompt = PromptTemplate( - input_variables=["text"], template=template - ) - chain = load_summarize_chain( - llm, - chain_type="stuff", - prompt=prompt, - return_intermediate_steps=True, - ) - target = wrapped_partial(chain) - elif langchain_action == LangChainAction.SUMMARIZE_REFINE.value: - chain = load_summarize_chain( - llm, chain_type="refine", return_intermediate_steps=True - ) - target = wrapped_partial(chain) - else: - raise RuntimeError( - "No such langchain_action=%s" % langchain_action - ) else: raise RuntimeError("No such langchain_action=%s" % langchain_action)