feat: Add task processing API
Signed-off-by: provokateurin <kate@provokateurin.de>
provokateurin committed Jul 11, 2024
1 parent 7cfca62 commit f29f439
Showing 4 changed files with 43 additions and 46 deletions.
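
At a glance, the commit swaps the old push-style /chain/{chain_name} FastAPI endpoint and its in-process TASK_LIST queue for the pull-based task processing API: the background thread now polls Nextcloud for queued tasks and reports results back. A minimal sketch of that polling flow, using only the nc_py_api calls that appear in the diff below (run_chain is a hypothetical stand-in for the chain lookup and invoke done in main.py; the 5-second retry and the "llm2:" prefix handling mirror the diff):

import time

from nc_py_api import NextcloudApp


def poll_loop(nc: NextcloudApp, provider_ids: list[str], task_type_ids: list[str], run_chain) -> None:
    while True:
        # Ask Nextcloud for the next queued task this app can handle.
        response = nc.providers.task_processing.next_task(provider_ids, task_type_ids)
        if not response:
            time.sleep(5)  # nothing queued yet, try again later
            continue
        task = response["task"]
        provider = response["provider"]
        # Provider ids look like "llm2:<model>:<task>"; dropping "llm2:" yields the chain name.
        chain_name = provider["name"][5:]
        try:
            # run_chain is supplied by the caller and is only illustrative here.
            output = run_chain(chain_name, task["input"]["input"])
            nc.providers.task_processing.report_result(task["id"], {"output": str(output)})
        except Exception as e:  # noqa
            nc.providers.task_processing.report_result(task["id"], error_message=str(e))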
1 change: 1 addition & 0 deletions appinfo/info.xml
@@ -29,6 +29,7 @@ See [the nextcloud admin docs](https://docs.nextcloud.com/server/latest/admin_ma
     </docker-install>
     <scopes>
         <value>AI_PROVIDERS</value>
+        <value>TASK_PROCESSING</value>
     </scopes>
     <system>false</system>
 </external-app>
76 changes: 36 additions & 40 deletions lib/main.py
@@ -1,20 +1,20 @@
 """Tha main module of the llm2 app
 """
 
-import queue
 import threading
-import typing
+import time
 from contextlib import asynccontextmanager
 from time import perf_counter
 
-import pydantic
-from chains import generate_chains
-from fastapi import Depends, FastAPI, responses
+from fastapi import FastAPI
 from nc_py_api import AsyncNextcloudApp, NextcloudApp
-from nc_py_api.ex_app import LogLvl, anc_app, run_app, set_handlers
+from nc_py_api.ex_app import LogLvl, run_app, set_handlers
 
+from chains import generate_chains
+
 chains = generate_chains()
 
+
 @asynccontextmanager
 async def lifespan(_app: FastAPI):
     set_handlers(
@@ -27,57 +27,54 @@ async def lifespan(_app: FastAPI):
 
 
 APP = FastAPI(lifespan=lifespan)
-TASK_LIST: queue.Queue = queue.Queue(maxsize=100)
 
 
 class BackgroundProcessTask(threading.Thread):
     def run(self, *args, **kwargs):  # pylint: disable=unused-argument
+        nc = NextcloudApp()
+
+        provider_ids = set()
+        task_type_ids = set()
+        for chain_name, _ in chains.items():
+            provider_ids.add("llm2:" + chain_name)
+            (model, task) = chain_name.split(":", 2)
+            task_type_ids.add("core:text2text:" + task)
+
         while True:
-            task = TASK_LIST.get(block=True)
+            response = nc.providers.task_processing.next_task(list(provider_ids), list(task_type_ids))
+            if not response:
+                time.sleep(5)
+                continue
+
+            task = response["task"]
+            provider = response["provider"]
+
             try:
-                chain_name = task.get("chain")
+                chain_name = provider["name"][5:]
                 print(f"chain: {chain_name}", flush=True)
                 chain_load = chains.get(chain_name)
                 if chain_load is None:
-                    NextcloudApp().providers.text_processing.report_result(
-                        task["id"], error="Requested model is not available"
+                    NextcloudApp().providers.task_processing.report_result(
+                        task["id"], error_message="Requested model is not available"
                     )
                     continue
                 chain = chain_load()
                 print("Generating reply", flush=True)
                 time_start = perf_counter()
-                print(task.get("prompt"))
-                result = chain.invoke(task.get("prompt")).get("text")
+                print(task.get("input").get("input"))
+                result = chain.invoke(task.get("input").get("input")).get("text")
                 del chain
                 print(f"reply generated: {round(float(perf_counter() - time_start), 2)}s", flush=True)
                 print(result, flush=True)
-                NextcloudApp().providers.text_processing.report_result(
+                NextcloudApp().providers.task_processing.report_result(
                     task["id"],
-                    str(result),
+                    {"output": str(result)},
                 )
             except Exception as e:  # noqa
                 print(str(e), flush=True)
                 nc = NextcloudApp()
                 nc.log(LogLvl.ERROR, str(e))
-                nc.providers.text_processing.report_result(task["id"], error=str(e))
-
-
-class Input(pydantic.BaseModel):
-    prompt: str
-    task_id: int
-
-
-@APP.post("/chain/{chain_name}")
-async def tiny_llama(
-    _nc: typing.Annotated[AsyncNextcloudApp, Depends(anc_app)],
-    req: Input,
-    chain_name=None,
-):
-    try:
-        TASK_LIST.put({"prompt": req.prompt, "id": req.task_id, "chain": chain_name}, block=False)
-    except queue.Full:
-        return responses.JSONResponse(content={"error": "task queue is full"}, status_code=429)
-    return responses.Response()
+                nc.providers.task_processing.report_result(task["id"], error_message=str(e))
 
 
 async def enabled_handler(enabled: bool, nc: AsyncNextcloudApp) -> str:
@@ -86,17 +83,16 @@ async def enabled_handler(enabled: bool, nc: AsyncNextcloudApp) -> str:
         for chain_name, _ in chains.items():
             (model, task) = chain_name.split(":", 2)
             try:
-                await nc.providers.text_processing.register(
-                    "llm2:"+chain_name, "Local Large language Model: " + model, "/chain/" + chain_name, task
+                await nc.providers.task_processing.register(
+                    "llm2:" + chain_name, "Local Large language Model: " + model, "core:text2text:" + task
                 )
-                print(f"Registering {model} - {task}", flush=True)
+                print(f"Registering {chain_name}", flush=True)
             except Exception as e:
                 print(f"Failed to register", f"{model} - {task}", f"Error:", f"{e}\n", flush=True)
     else:
         for chain_name, chain in chains.items():
-            (model, task) = chain_name.split(":", 2)
-            await nc.providers.text_processing.unregister(model)
-            print(f"Unregistering {model} - {task}", flush=True)
+            await nc.providers.task_processing.unregister("llm2:" + chain_name)
+            print(f"Unregistering {chain_name}", flush=True)
     return ""
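
One detail worth calling out in the main.py change above: the provider id doubles as the key for finding the chain again. Each chain name of the form "model:task" is registered as provider "llm2:model:task" for task type "core:text2text:task", and the worker recovers the chain name by stripping the "llm2:" prefix from the provider returned by next_task(). A small illustration of that mapping (the chain name here is made up):

chain_name = "llama3:summarize"          # illustrative only; real names come from generate_chains()
provider_id = "llm2:" + chain_name       # what enabled_handler passes to register()
(model, task) = chain_name.split(":", 2)
task_type_id = "core:text2text:" + task  # the task type the provider is registered for

# BackgroundProcessTask gets this provider back from next_task() and undoes the prefix:
assert provider_id[5:] == chain_name     # len("llm2:") == 5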


8 changes: 4 additions & 4 deletions poetry.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions pyproject.toml
@@ -11,11 +11,11 @@ package-mode = false
 python = "^3.10"
 pydantic = "^2.8.2"
 fastapi = "^0.111.0"
-nc-py-api = {extras = ["app"], version = "^0.13.0"}
+nc-py-api = {extras = ["app"], version = "^0.14.0"}
 langchain = "^0.1.0"
 llama-cpp-python = "0.2.76"
 
 
 [build-system]
 requires = ["poetry-core"]
-build-backend = "poetry.core.masonry.api"
+build-backend = "poetry.core.masonry.api"
