From a06e7951f54eceac8b4438d8b75d3a1f1cb60997 Mon Sep 17 00:00:00 2001 From: f1delius Date: Mon, 29 Apr 2024 19:55:09 +0530 Subject: [PATCH] send message limit in message json --- src/api.py | 14 +++++++++----- src/bot.py | 12 +++++++----- src/send_message.py | 3 +++ src/workflow.py | 29 ++++++++++++++++++++--------- 4 files changed, 39 insertions(+), 19 deletions(-) diff --git a/src/api.py b/src/api.py index 2333248..590c29e 100644 --- a/src/api.py +++ b/src/api.py @@ -4,7 +4,7 @@ import markdown -async def send_message_as_tool(tool_id, tool_input, room_id, event_id, thread=None): +async def send_message_as_tool(tool_id, tool_input, room_id, event_id, thread=None, workflow_bot=None, msg_limit=0): async with aiohttp.ClientSession() as session: async with session.get(f"https://bots.pixx.co/agents/{tool_id}") as result: data = await result.json() @@ -12,24 +12,28 @@ async def send_message_as_tool(tool_id, tool_input, room_id, event_id, thread=No return None else: access_token = data['access_token'] - msg = { + content = { "body": tool_input, "msgtype": "m.text", "format": "org.matrix.custom.html", "formatted_body" : markdown.markdown( tool_input, extensions=["nl2br", "tables", "fenced_code"] - ) + ), + "message_limit" : { + "workflow_bot" : workflow_bot, + "limit" : msg_limit, + } } if thread is None: thread = { 'm.in_reply_to': {'event_id': event_id} } - msg["m.relates_to"] = thread + content["m.relates_to"] = thread client = ClientAPI(base_url="https://matrix.pixx.co", token=access_token) - await client.send_message(room_id, msg) + await client.send_message(room_id, content) async def invite_bot_to_room(tool_id, session): diff --git a/src/bot.py b/src/bot.py index 6049160..224553b 100644 --- a/src/bot.py +++ b/src/bot.py @@ -183,7 +183,8 @@ async def message_callback(self, room: MatrixRoom, event: RoomMessageText) -> No reply_message="10 Messages Limit Exceeded!", sender_id=sender_id, user_message=raw_user_message, - reply_to_event_id=reply_to_event_id + reply_to_event_id=reply_to_event_id, + msg_limit=self.msg_limit[sender_id], ) return # remove newline character from event.body @@ -194,8 +195,8 @@ async def message_callback(self, room: MatrixRoom, event: RoomMessageText) -> No if self.workflow: get_steps = await workflow_steps(self.superagent_url, self.workflow_id, self.api_key, self.httpx_client) api_url = f"{self.superagent_url}/api/v1/workflows/{self.workflow_id}/invoke" - await stream_json_response_with_auth(api_url, self.api_key, content_body, get_steps, thread_event_id, reply_to_event_id, room_id, self.httpx_client) self.msg_limit[sender_id] += len(get_steps) + await stream_json_response_with_auth(api_url, self.api_key, content_body, get_steps, thread_event_id, reply_to_event_id, room_id, self.httpx_client, self.user_id, self.msg_limit[sender_id]) return result = await superagent_invoke(self.superagent_url, self.agent_id, content_body, self.api_key, self.httpx_client, thread_event_id) if result[1] != []: @@ -216,7 +217,8 @@ async def message_callback(self, room: MatrixRoom, event: RoomMessageText) -> No 'm.in_reply_to': {'event_id': reply_to_event_id} } self.msg_limit[sender_id] += 1 - await send_message_as_tool(tool_id, tool_input, room_id, reply_to_event_id, thread=thread) + await send_message_as_tool(tool_id, tool_input, room_id, reply_to_event_id, thread, self.user_id, self.msg_limit[sender_id]) + self.msg_limit[sender_id] += 1 await send_room_message( self.client, room_id, @@ -224,9 +226,9 @@ async def message_callback(self, room: MatrixRoom, event: RoomMessageText) -> No sender_id=sender_id, user_message=raw_user_message, reply_to_event_id=reply_to_event_id, - thread_id=thread_id + thread_id=thread_id, + msg_limit=self.msg_limit[sender_id], ) - self.msg_limit[sender_id] += 1 except Exception as e: logger.error(e) diff --git a/src/send_message.py b/src/send_message.py index b3aeefd..ef04c3e 100644 --- a/src/send_message.py +++ b/src/send_message.py @@ -13,6 +13,7 @@ async def send_room_message( user_message: str = "", reply_to_event_id: str = "", thread_id = None, + msg_limit=0, ) -> None: if reply_to_event_id == "": content = { @@ -23,6 +24,7 @@ async def send_room_message( reply_message, extensions=["nl2br", "tables", "fenced_code"], ), + "message_limit" : msg_limit, } else: body = "> <" + sender_id + "> " + user_message + "\n\n" + reply_message @@ -51,6 +53,7 @@ async def send_room_message( "format": format, "formatted_body": formatted_body, "m.relates_to": {"m.in_reply_to": {"event_id": reply_to_event_id}}, + "message_limit" : msg_limit, } if thread_id is not None: thread_event_id = thread_id diff --git a/src/workflow.py b/src/workflow.py index 215f98f..b4a9c6e 100644 --- a/src/workflow.py +++ b/src/workflow.py @@ -27,7 +27,18 @@ async def workflow_steps(superagent_url: str, workflow_id: str, api_key: str, se return response.json() -async def stream_json_response_with_auth(api_url, api_key, msg_data, agent, thread_id, reply_id, room_id, session: httpx.AsyncClient): +async def stream_json_response_with_auth( + api_url, + api_key, + msg_data, + agent, + thread_id, + reply_id, + room_id, + session: httpx.AsyncClient, + workflow_bot=None, + msg_limit=0, +): headers = { 'Authorization': f'Bearer {api_key}', 'Content-Type': 'application/json' @@ -39,23 +50,23 @@ async def stream_json_response_with_auth(api_url, api_key, msg_data, agent, thre async with session.post(api_url, headers=headers, json=json) as response: response.raise_for_status() async for line in response.content: - # Split the line into event and data parts + # Split the line into event and data parts if line.startswith(b'id:'): event = line.decode('utf-8').split(':', 1)[1].strip() if prev_event is not None and event != prev_event: # Print the complete message for the previous event - prev_data = prev_data.replace("````","`\n```") + prev_data = prev_data.replace("````", "`\n```") prev_data = prev_data.replace("```", "\n```") logging.info( f'Event: {prev_event}, Data: {prev_data}') - await send_agent_message(agent[prev_event], thread_id, reply_id, prev_data, room_id) + await send_agent_message(agent[prev_event], thread_id, reply_id, prev_data, room_id, workflow_bot, msg_limit) # Reset the previous data prev_data = '' # Get the next line which contains data if line.startswith(b'data:'): event_data = line[6:-1] if event_data == b'': - prev_data += '\n' + prev_data += '\n' elif event_data.isspace(): prev_data += '\n' + event_data.decode('utf-8')[1:] else: @@ -66,19 +77,19 @@ async def stream_json_response_with_auth(api_url, api_key, msg_data, agent, thre # Print the complete message for the last event if prev_event is not None: - prev_data = prev_data.replace("````","`\n```") + prev_data = prev_data.replace("````", "`\n```") prev_data = prev_data.replace("```", "\n```") logging.info(f'Event: {prev_event}, Data: {prev_data}') - await send_agent_message(agent[prev_event], thread_id, reply_id, prev_data, room_id) + await send_agent_message(agent[prev_event], thread_id, reply_id, prev_data, room_id, workflow_bot, msg_limit) else: logging.info('Failed to fetch streaming data') -async def send_agent_message(agent, thread_event_id, reply_id, data, room_id): +async def send_agent_message(agent, thread_event_id, reply_id, data, room_id, workflow_bot=None, msg_limit=0): thread = { 'rel_type': 'm.thread', 'event_id': thread_event_id, 'is_falling_back': True, 'm.in_reply_to': {'event_id': reply_id} } - await send_message_as_tool(agent, data, room_id, reply_id, thread) + await send_message_as_tool(agent, data, room_id, reply_id, thread, workflow_bot, msg_limit)