diff --git a/appinfo/info.xml b/appinfo/info.xml
index e5bbc9a..11036a2 100644
--- a/appinfo/info.xml
+++ b/appinfo/info.xml
@@ -29,6 +29,7 @@ See [the nextcloud admin docs](https://docs.nextcloud.com/server/latest/admin_ma
 		</docker-install>
 		<scopes>
 			<value>AI_PROVIDERS</value>
+			<value>TASK_PROCESSING</value>
 		</scopes>
 		<system>false</system>
 	</external-app>
diff --git a/lib/main.py b/lib/main.py
index eb55933..a96969f 100644
--- a/lib/main.py
+++ b/lib/main.py
@@ -1,20 +1,20 @@
 """Tha main module of the llm2 app
 """
 
-import queue
 import threading
-import typing
+import time
 from contextlib import asynccontextmanager
 from time import perf_counter
 
-import pydantic
-from chains import generate_chains
-from fastapi import Depends, FastAPI, responses
+from fastapi import FastAPI
 from nc_py_api import AsyncNextcloudApp, NextcloudApp
-from nc_py_api.ex_app import LogLvl, anc_app, run_app, set_handlers
+from nc_py_api.ex_app import LogLvl, run_app, set_handlers
+
+from chains import generate_chains
 
 chains = generate_chains()
 
+
 @asynccontextmanager
 async def lifespan(_app: FastAPI):
     set_handlers(
@@ -27,57 +27,54 @@ async def lifespan(_app: FastAPI):
 
 
 APP = FastAPI(lifespan=lifespan)
-TASK_LIST: queue.Queue = queue.Queue(maxsize=100)
 
 
 class BackgroundProcessTask(threading.Thread):
     def run(self, *args, **kwargs):  # pylint: disable=unused-argument
+        nc = NextcloudApp()
+
+        provider_ids = set()
+        task_type_ids = set()
+        for chain_name, _ in chains.items():
+            provider_ids.add("llm2:" + chain_name)
+            (model, task) = chain_name.split(":", 2)
+            task_type_ids.add("core:text2text:" + task)
+
         while True:
-            task = TASK_LIST.get(block=True)
+            response = nc.providers.task_processing.next_task(list(provider_ids), list(task_type_ids))
+            if not response:
+                time.sleep(5)
+                continue
+
+            task = response["task"]
+            provider = response["provider"]
+
             try:
-                chain_name = task.get("chain")
+                chain_name = provider["name"][5:]
                 print(f"chain: {chain_name}", flush=True)
                 chain_load = chains.get(chain_name)
                 if chain_load is None:
-                    NextcloudApp().providers.text_processing.report_result(
-                        task["id"], error="Requested model is not available"
+                    NextcloudApp().providers.task_processing.report_result(
+                        task["id"], error_message="Requested model is not available"
                     )
                     continue
                 chain = chain_load()
                 print("Generating reply", flush=True)
                 time_start = perf_counter()
-                print(task.get("prompt"))
-                result = chain.invoke(task.get("prompt")).get("text")
+                print(task.get("input").get("input"))
+                result = chain.invoke(task.get("input").get("input")).get("text")
                 del chain
                 print(f"reply generated: {round(float(perf_counter() - time_start), 2)}s", flush=True)
                 print(result, flush=True)
-                NextcloudApp().providers.text_processing.report_result(
+                NextcloudApp().providers.task_processing.report_result(
                     task["id"],
-                    str(result),
+                    {"output": str(result)},
                 )
             except Exception as e:  # noqa
                 print(str(e), flush=True)
                 nc = NextcloudApp()
                 nc.log(LogLvl.ERROR, str(e))
-                nc.providers.text_processing.report_result(task["id"], error=str(e))
-
-
-class Input(pydantic.BaseModel):
-    prompt: str
-    task_id: int
-
-
-@APP.post("/chain/{chain_name}")
-async def tiny_llama(
-    _nc: typing.Annotated[AsyncNextcloudApp, Depends(anc_app)],
-    req: Input,
-    chain_name=None,
-):
-    try:
-        TASK_LIST.put({"prompt": req.prompt, "id": req.task_id, "chain": chain_name}, block=False)
-    except queue.Full:
-        return responses.JSONResponse(content={"error": "task queue is full"}, status_code=429)
-    return responses.Response()
+                nc.providers.task_processing.report_result(task["id"], error_message=str(e))
 
 
 async def enabled_handler(enabled: bool, nc: AsyncNextcloudApp) -> str:
@@ -86,17 +83,16 @@ async def enabled_handler(enabled: bool, nc: AsyncNextcloudApp) -> str:
         for chain_name, _ in chains.items():
             (model, task) = chain_name.split(":", 2)
             try:
-                await nc.providers.text_processing.register(
-                    "llm2:"+chain_name, "Local Large language Model: " + model, "/chain/" + chain_name, task
+                await nc.providers.task_processing.register(
+                    "llm2:" + chain_name, "Local Large language Model: " + model, "core:text2text:" + task
                 )
-                print(f"Registering {model} - {task}", flush=True)
+                print(f"Registering {chain_name}", flush=True)
             except Exception as e:
                 print(f"Failed to register", f"{model} - {task}", f"Error:", f"{e}\n", flush=True)
     else:
         for chain_name, chain in chains.items():
-            (model, task) = chain_name.split(":", 2)
-            await nc.providers.text_processing.unregister(model)
-            print(f"Unregistering {model} - {task}", flush=True)
+            await nc.providers.task_processing.unregister("llm2:" + chain_name)
+            print(f"Unregistering {chain_name}", flush=True)
 
     return ""
 
diff --git a/poetry.lock b/poetry.lock
index 2a54128..18c5284 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1122,13 +1122,13 @@ files = [
 
 [[package]]
 name = "nc-py-api"
-version = "0.13.0"
+version = "0.14.0"
 description = "Nextcloud Python Framework"
 optional = false
 python-versions = ">=3.10"
 files = [
-    {file = "nc_py_api-0.13.0-py3-none-any.whl", hash = "sha256:628c5616fd562addf4badac99c48d00c98b81192311c6bcbfede4149213e3dbb"},
-    {file = "nc_py_api-0.13.0.tar.gz", hash = "sha256:2b0eb78852fd321e6ecbf81c0a83b22397799259fdd3998725f9bcc61d550bf5"},
+    {file = "nc_py_api-0.14.0-py3-none-any.whl", hash = "sha256:c30cbead6fe1c476e994474b1bd8ae567b97953074d69e0517d26b893f75e8bb"},
+    {file = "nc_py_api-0.14.0.tar.gz", hash = "sha256:8d71f78a00a2eeff71feff5580c445b03eeefc52055a07d601526386d95de6ee"},
 ]
 
 [package.dependencies]
@@ -2170,4 +2170,4 @@ multidict = ">=4.0"
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.10"
-content-hash = "f82004b7f6a2e9e64f2c982248edac5697a9a94d5b6ea29696dfe0028b75fcb3"
+content-hash = "33e46dcf07134b1468acd67517787ae562ae61c302e3c7120339b3d9da7e4a2c"
diff --git a/pyproject.toml b/pyproject.toml
index 5717c6f..0d2b06d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -11,11 +11,11 @@ package-mode = false
 
 python = "^3.10"
 pydantic = "^2.8.2"
 fastapi = "^0.111.0"
-nc-py-api = {extras = ["app"], version = "^0.13.0"}
+nc-py-api = {extras = ["app"], version = "^0.14.0"}
 langchain = "^0.1.0"
 llama-cpp-python = "0.2.76"
 
 [build-system]
 requires = ["poetry-core"]
-build-backend = "poetry.core.masonry.api"
\ No newline at end of file
+build-backend = "poetry.core.masonry.api"
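For context on the runtime behaviour this patch switches to, below is a minimal sketch of the pull-based task_processing loop. It uses only the nc_py_api calls visible in the diff (`next_task`, `report_result`); the provider/task-type IDs and the echo reply are placeholders standing in for the real chains from `chains.py`, and it assumes nc-py-api 0.14 with the usual ExApp environment variables set.

```python
# Minimal sketch of the pull-based task_processing loop (not the app's actual code).
# Assumes nc-py-api >= 0.14 and the standard ExApp environment variables;
# the provider/task-type IDs and the echo reply are placeholders.
import time

from nc_py_api import NextcloudApp

PROVIDER_IDS = ["llm2:llama3:summary"]      # hypothetical chain name, format "llm2:<model>:<task>"
TASK_TYPE_IDS = ["core:text2text:summary"]  # matching core text2text task type


def poll_once(nc: NextcloudApp) -> bool:
    """Fetch one queued task, process it, and report the result; return True if a task was handled."""
    response = nc.providers.task_processing.next_task(PROVIDER_IDS, TASK_TYPE_IDS)
    if not response:
        return False  # nothing queued right now
    task = response["task"]
    try:
        prompt = task["input"]["input"]  # text2text tasks carry their text under input.input
        result = f"echo: {prompt}"       # the real app calls chain.invoke(prompt).get("text") here
        nc.providers.task_processing.report_result(task["id"], {"output": result})
    except Exception as e:  # noqa
        nc.providers.task_processing.report_result(task["id"], error_message=str(e))
    return True


if __name__ == "__main__":
    app = NextcloudApp()
    while True:
        if not poll_once(app):
            time.sleep(5)  # back off when the queue is empty, like the patched worker does
```

Compared to the removed `/chain/{chain_name}` endpoint and in-memory queue, the worker now pulls tasks from Nextcloud and sleeps when none are queued, so no HTTP route or queue-full handling is needed.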