Skip to content

Commit

Permalink
single bot stream update
Browse files Browse the repository at this point in the history
  • Loading branch information
f1delius committed Jul 18, 2024
1 parent 9eff8b1 commit 94d941d
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 19 deletions.
25 changes: 8 additions & 17 deletions src/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,28 +254,19 @@ async def message_callback(self, room: MatrixRoom, event: RoomMessageText) -> No
await self.client.room_typing(room_id, typing_state=True)
userEmail = allow_message[1]
if self.workflow:
api_url = f"{self.superagent_url}/api/v1/workflows/{self.workflow_id}/invoke"

if self.streaming == True:
get_steps = await workflow_steps(self.superagent_url, self.workflow_id, self.api_key, self.httpx_client)
self.msg_limit[sender_id] += len(get_steps)
await stream_workflow(api_url, self.api_key, content_body, get_steps, thread_event_id, reply_to_event_id, room_id, self.httpx_client, self.user_id, userEmail, self.msg_limit[sender_id])
await stream_workflow(self.superagent_url, self.api_key, self.workflow_id,
content_body, get_steps, thread_event_id, reply_to_event_id, room_id, self.httpx_client, self.user_id, userEmail, self.msg_limit[sender_id])
return
else:
exec_workflow = await workflow_invoke(
api_url, content_body, self.api_key, self.httpx_client, thread_event_id, userEmail)
self.msg_limit[sender_id] += 1
await send_room_message(
self.client,
room_id,
reply_message=exec_workflow,
sender_id=sender_id,
user_message=raw_user_message,
reply_to_event_id=reply_to_event_id,
thread_id=thread_id,
msg_limit=self.msg_limit[sender_id],
personal_api=userEmail
)
get_steps = await workflow_steps(self.superagent_url, self.workflow_id, self.api_key, self.httpx_client)
self.msg_limit[sender_id] += len(get_steps)
await stream_workflow(self.superagent_url, self.api_key, self.workflow_id,
content_body, get_steps, thread_event_id, reply_to_event_id,
room_id, self.httpx_client, self.user_id, userEmail,
self.msg_limit[sender_id], single_bot=True)
return
result = await superagent_invoke(self.superagent_url, self.agent_id, content_body, self.api_key, self.httpx_client, thread_event_id)
self.msg_limit[sender_id] += 1
Expand Down
12 changes: 10 additions & 2 deletions src/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ async def workflow_steps(

async def workflow_invoke(
superagent_url: str,
workflow_id: str,
prompt: str,
api_key: str,
session: httpx.AsyncClient,
Expand Down Expand Up @@ -62,6 +63,7 @@ async def workflow_invoke(
"sessionId": sessionId,
"enableStreaming": False,
}
api_url = f"{superagent_url}/api/v1/workflows/{workflow_id}/invoke"
if userEmail:
json_body["userEmail"] = userEmail
logger.info(json_body)
Expand All @@ -81,6 +83,7 @@ async def workflow_invoke(
async def stream_workflow(
api_url,
api_key,
workflow_id,
msg_data,
agent,
thread_id,
Expand All @@ -90,13 +93,15 @@ async def stream_workflow(
workflow_bot=None,
user_email=None,
msg_limit=0,
single_bot=False
):
headers = {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json'
}
json = {"input": msg_data, "sessionId": thread_id,
"enableStreaming": True, "stream_token": True}
api_path = f"{api_url}/api/v1/workflows/{workflow_id}/invoke"
if user_email:
json["userEmail"] = user_email
logger.info(f"stream json : {json}")
Expand All @@ -105,7 +110,7 @@ async def stream_workflow(
lines = 0
prev_event = list(agent.keys())[0]
async with aiohttp.ClientSession() as session:
async with session.post(api_url, headers=headers, json=json) as response:
async with session.post(api_path, headers=headers, json=json) as response:
response.raise_for_status()
async for line in response.content:
data = line.decode('utf-8')
Expand All @@ -122,7 +127,10 @@ async def stream_workflow(
prev_data += data
lines += 1
if access_token is None:
data = await send_agent_message(agent[prev_event], thread_id, reply_id, prev_data, room_id, workflow_bot, msg_limit)
if single_bot:
data = await send_agent_message(workflow_id, thread_id, reply_id, prev_data, room_id, workflow_bot, msg_limit)
else:
data = await send_agent_message(agent[prev_event], thread_id, reply_id, prev_data, room_id, workflow_bot, msg_limit)
event_id, access_token = data
elif lines % 5 == 0:
await edit_message(event_id, access_token, prev_data, room_id, workflow_bot, msg_limit, thread_id)
Expand Down

0 comments on commit 94d941d

Please sign in to comment.