Skip to content

Commit

Permalink
Implement name and description discovery over nats and fastapi
Browse files Browse the repository at this point in the history
  • Loading branch information
kumaranvpl committed Oct 9, 2024
1 parent d60bd81 commit eebfa17
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 7 deletions.
22 changes: 18 additions & 4 deletions fastagency/adapters/fastapi/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ async def initiate_chat(

return init_msg

@router.get("/discover")
async def discover() -> list[WorkflowInfo]:
@router.get("/discovery")
def discovery() -> list[WorkflowInfo]:

Check warning on line 90 in fastagency/adapters/fastapi/base.py

View check run for this annotation

Codecov / codecov/patch

fastagency/adapters/fastapi/base.py#L89-L90

Added lines #L89 - L90 were not covered by tests
names = self.wf.names
descriptions = [self.wf.get_description(name) for name in names]
return [
Expand Down Expand Up @@ -274,9 +274,23 @@ async def run_lifespan() -> None:

return "FastAPIWorkflows.run() completed"

def _get_workflow_info(self) -> list[dict[str, str]]:
resp = requests.get(f"{self.fastapi_url}/discovery", timeout=5)
return resp.json() # type: ignore [no-any-return]

Check warning on line 279 in fastagency/adapters/fastapi/base.py

View check run for this annotation

Codecov / codecov/patch

fastagency/adapters/fastapi/base.py#L278-L279

Added lines #L278 - L279 were not covered by tests

def _get_names(self) -> list[str]:
return [workflow["name"] for workflow in self._get_workflow_info()]

def _get_description(self, name: str) -> str:
return next(
workflow["description"]
for workflow in self._get_workflow_info()
if workflow["name"] == name
)

@property
def names(self) -> list[str]:
return ["simple_learning"]
return self._get_names()

Check warning on line 293 in fastagency/adapters/fastapi/base.py

View check run for this annotation

Codecov / codecov/patch

fastagency/adapters/fastapi/base.py#L293

Added line #L293 was not covered by tests

def get_description(self, name: str) -> str:
return "Student and teacher learning chat"
return self._get_description(name)

Check warning on line 296 in fastagency/adapters/fastapi/base.py

View check run for this annotation

Codecov / codecov/patch

fastagency/adapters/fastapi/base.py#L296

Added line #L296 was not covered by tests
53 changes: 50 additions & 3 deletions fastagency/adapters/nats/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
from asyncer import asyncify, syncify
from faststream import FastStream, Logger
from faststream.nats import JStream, NatsBroker, NatsMessage
from nats.js import api
from nats.aio.client import Client as NatsClient
from nats.js import JetStreamContext, api
from nats.js.kv import KeyValue
from pydantic import BaseModel

from ...base import UI, ProviderProtocol, run_workflow
Expand Down Expand Up @@ -55,6 +57,8 @@ class InitiateModel(BaseModel):
# server prints message to client; chat.server.messages.<user_uuid>.<chat_uuid>
# we create this topic dynamically and subscribe to it => worker is fixed
"chat.server.messages.*.*",
# discovery subject
"discovery",
],
)

Expand Down Expand Up @@ -198,11 +202,21 @@ def callback(t: asyncio.Task[Any]) -> None:
except Exception as e:
await self._send_error_msg(e, logger)

async def _publish_discovery(self) -> None:
"""Publish the discovery message."""
jetstream_key_value = await self.broker.key_value(bucket="discovery")

Check warning on line 207 in fastagency/adapters/nats/base.py

View check run for this annotation

Codecov / codecov/patch

fastagency/adapters/nats/base.py#L207

Added line #L207 was not covered by tests

names = self.provider.names

Check warning on line 209 in fastagency/adapters/nats/base.py

View check run for this annotation

Codecov / codecov/patch

fastagency/adapters/nats/base.py#L209

Added line #L209 was not covered by tests
for name in names:
description = self.provider.get_description(name)
await jetstream_key_value.put(name, description.encode())

Check warning on line 212 in fastagency/adapters/nats/base.py

View check run for this annotation

Codecov / codecov/patch

fastagency/adapters/nats/base.py#L211-L212

Added lines #L211 - L212 were not covered by tests

# todo: make it a router
@asynccontextmanager
async def lifespan(self, app: Any) -> AsyncIterator[None]:
async with self.broker:
await self.broker.start()
await self._publish_discovery()

Check warning on line 219 in fastagency/adapters/nats/base.py

View check run for this annotation

Codecov / codecov/patch

fastagency/adapters/nats/base.py#L219

Added line #L219 was not covered by tests
try:
yield
finally:
Expand Down Expand Up @@ -430,9 +444,42 @@ async def run_lifespan() -> None:

return "NatsWorkflows.run() completed"

@asynccontextmanager
async def _get_jetstream_context(self) -> AsyncIterator[JetStreamContext]:
nc = NatsClient()

Check warning on line 449 in fastagency/adapters/nats/base.py

View check run for this annotation

Codecov / codecov/patch

fastagency/adapters/nats/base.py#L449

Added line #L449 was not covered by tests
await nc.connect(self.nats_url, user=self.user, password=self.password)
js = nc.jetstream()
try:
yield js

Check warning on line 453 in fastagency/adapters/nats/base.py

View check run for this annotation

Codecov / codecov/patch

fastagency/adapters/nats/base.py#L451-L453

Added lines #L451 - L453 were not covered by tests
finally:
await nc.close()

Check warning on line 455 in fastagency/adapters/nats/base.py

View check run for this annotation

Codecov / codecov/patch

fastagency/adapters/nats/base.py#L455

Added line #L455 was not covered by tests

@asynccontextmanager
async def _get_jetstream_key_value(
self, bucket: str = "discovery"
) -> AsyncIterator[KeyValue]:
async with self._get_jetstream_context() as js:
kv = await js.create_key_value(bucket=bucket)
yield kv

Check warning on line 463 in fastagency/adapters/nats/base.py

View check run for this annotation

Codecov / codecov/patch

fastagency/adapters/nats/base.py#L461-L463

Added lines #L461 - L463 were not covered by tests

async def _get_names(self) -> list[str]:
async with self._get_jetstream_key_value() as kv:
names = await kv.keys()
return names

Check warning on line 468 in fastagency/adapters/nats/base.py

View check run for this annotation

Codecov / codecov/patch

fastagency/adapters/nats/base.py#L466-L468

Added lines #L466 - L468 were not covered by tests

async def _get_description(self, name: str) -> str:
async with self._get_jetstream_key_value() as kv:
description = await kv.get(name)
return description.value.decode() if description.value else ""

Check warning on line 473 in fastagency/adapters/nats/base.py

View check run for this annotation

Codecov / codecov/patch

fastagency/adapters/nats/base.py#L471-L473

Added lines #L471 - L473 were not covered by tests

@property
def names(self) -> list[str]:
return ["simple_learning"]
names = syncify(self._get_names)()

Check warning on line 477 in fastagency/adapters/nats/base.py

View check run for this annotation

Codecov / codecov/patch

fastagency/adapters/nats/base.py#L477

Added line #L477 was not covered by tests
logger.debug(f"Names: {names}")
return names

Check warning on line 479 in fastagency/adapters/nats/base.py

View check run for this annotation

Codecov / codecov/patch

fastagency/adapters/nats/base.py#L479

Added line #L479 was not covered by tests

def get_description(self, name: str) -> str:
return "Student and teacher learning chat"
description = syncify(self._get_description)(name)

Check warning on line 482 in fastagency/adapters/nats/base.py

View check run for this annotation

Codecov / codecov/patch

fastagency/adapters/nats/base.py#L482

Added line #L482 was not covered by tests
logger.debug(f"Description: {description}")
# return "Student and teacher learning chat"
return description

Check warning on line 485 in fastagency/adapters/nats/base.py

View check run for this annotation

Codecov / codecov/patch

fastagency/adapters/nats/base.py#L485

Added line #L485 was not covered by tests

0 comments on commit eebfa17

Please sign in to comment.