Skip to content

Commit

Permalink
send message limit in message json
Browse files Browse the repository at this point in the history
  • Loading branch information
f1delius committed Apr 29, 2024
1 parent 55e1ac4 commit a06e795
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 19 deletions.
14 changes: 9 additions & 5 deletions src/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,36 @@
import markdown


async def send_message_as_tool(tool_id, tool_input, room_id, event_id, thread=None):
async def send_message_as_tool(tool_id, tool_input, room_id, event_id, thread=None, workflow_bot=None, msg_limit=0):
async with aiohttp.ClientSession() as session:
async with session.get(f"https://bots.pixx.co/agents/{tool_id}") as result:
data = await result.json()
if not data:
return None
else:
access_token = data['access_token']
msg = {
content = {
"body": tool_input,
"msgtype": "m.text",
"format": "org.matrix.custom.html",
"formatted_body" : markdown.markdown(
tool_input,
extensions=["nl2br", "tables", "fenced_code"]
)
),
"message_limit" : {
"workflow_bot" : workflow_bot,
"limit" : msg_limit,
}

}
if thread is None:
thread = {
'm.in_reply_to': {'event_id': event_id}
}
msg["m.relates_to"] = thread
content["m.relates_to"] = thread
client = ClientAPI(base_url="https://matrix.pixx.co",
token=access_token)
await client.send_message(room_id, msg)
await client.send_message(room_id, content)


async def invite_bot_to_room(tool_id, session):
Expand Down
12 changes: 7 additions & 5 deletions src/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ async def message_callback(self, room: MatrixRoom, event: RoomMessageText) -> No
reply_message="10 Messages Limit Exceeded!",
sender_id=sender_id,
user_message=raw_user_message,
reply_to_event_id=reply_to_event_id
reply_to_event_id=reply_to_event_id,
msg_limit=self.msg_limit[sender_id],
)
return
# remove newline character from event.body
Expand All @@ -194,8 +195,8 @@ async def message_callback(self, room: MatrixRoom, event: RoomMessageText) -> No
if self.workflow:
get_steps = await workflow_steps(self.superagent_url, self.workflow_id, self.api_key, self.httpx_client)
api_url = f"{self.superagent_url}/api/v1/workflows/{self.workflow_id}/invoke"
await stream_json_response_with_auth(api_url, self.api_key, content_body, get_steps, thread_event_id, reply_to_event_id, room_id, self.httpx_client)
self.msg_limit[sender_id] += len(get_steps)
await stream_json_response_with_auth(api_url, self.api_key, content_body, get_steps, thread_event_id, reply_to_event_id, room_id, self.httpx_client, self.user_id, self.msg_limit[sender_id])
return
result = await superagent_invoke(self.superagent_url, self.agent_id, content_body, self.api_key, self.httpx_client, thread_event_id)
if result[1] != []:
Expand All @@ -216,17 +217,18 @@ async def message_callback(self, room: MatrixRoom, event: RoomMessageText) -> No
'm.in_reply_to': {'event_id': reply_to_event_id}
}
self.msg_limit[sender_id] += 1
await send_message_as_tool(tool_id, tool_input, room_id, reply_to_event_id, thread=thread)
await send_message_as_tool(tool_id, tool_input, room_id, reply_to_event_id, thread, self.user_id, self.msg_limit[sender_id])
self.msg_limit[sender_id] += 1
await send_room_message(
self.client,
room_id,
reply_message=result[0],
sender_id=sender_id,
user_message=raw_user_message,
reply_to_event_id=reply_to_event_id,
thread_id=thread_id
thread_id=thread_id,
msg_limit=self.msg_limit[sender_id],
)
self.msg_limit[sender_id] += 1
except Exception as e:
logger.error(e)

Expand Down
3 changes: 3 additions & 0 deletions src/send_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ async def send_room_message(
user_message: str = "",
reply_to_event_id: str = "",
thread_id = None,
msg_limit=0,
) -> None:
if reply_to_event_id == "":
content = {
Expand All @@ -23,6 +24,7 @@ async def send_room_message(
reply_message,
extensions=["nl2br", "tables", "fenced_code"],
),
"message_limit" : msg_limit,
}
else:
body = "> <" + sender_id + "> " + user_message + "\n\n" + reply_message
Expand Down Expand Up @@ -51,6 +53,7 @@ async def send_room_message(
"format": format,
"formatted_body": formatted_body,
"m.relates_to": {"m.in_reply_to": {"event_id": reply_to_event_id}},
"message_limit" : msg_limit,
}
if thread_id is not None:
thread_event_id = thread_id
Expand Down
29 changes: 20 additions & 9 deletions src/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,18 @@ async def workflow_steps(superagent_url: str, workflow_id: str, api_key: str, se
return response.json()


async def stream_json_response_with_auth(api_url, api_key, msg_data, agent, thread_id, reply_id, room_id, session: httpx.AsyncClient):
async def stream_json_response_with_auth(
api_url,
api_key,
msg_data,
agent,
thread_id,
reply_id,
room_id,
session: httpx.AsyncClient,
workflow_bot=None,
msg_limit=0,
):
headers = {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json'
Expand All @@ -39,23 +50,23 @@ async def stream_json_response_with_auth(api_url, api_key, msg_data, agent, thre
async with session.post(api_url, headers=headers, json=json) as response:
response.raise_for_status()
async for line in response.content:
# Split the line into event and data parts
# Split the line into event and data parts
if line.startswith(b'id:'):
event = line.decode('utf-8').split(':', 1)[1].strip()
if prev_event is not None and event != prev_event:
# Print the complete message for the previous event
prev_data = prev_data.replace("````","`\n```")
prev_data = prev_data.replace("````", "`\n```")
prev_data = prev_data.replace("```", "\n```")
logging.info(
f'Event: {prev_event}, Data: {prev_data}')
await send_agent_message(agent[prev_event], thread_id, reply_id, prev_data, room_id)
await send_agent_message(agent[prev_event], thread_id, reply_id, prev_data, room_id, workflow_bot, msg_limit)
# Reset the previous data
prev_data = ''
# Get the next line which contains data
if line.startswith(b'data:'):
event_data = line[6:-1]
if event_data == b'':
prev_data += '\n'
prev_data += '\n'
elif event_data.isspace():
prev_data += '\n' + event_data.decode('utf-8')[1:]
else:
Expand All @@ -66,19 +77,19 @@ async def stream_json_response_with_auth(api_url, api_key, msg_data, agent, thre

# Print the complete message for the last event
if prev_event is not None:
prev_data = prev_data.replace("````","`\n```")
prev_data = prev_data.replace("````", "`\n```")
prev_data = prev_data.replace("```", "\n```")
logging.info(f'Event: {prev_event}, Data: {prev_data}')
await send_agent_message(agent[prev_event], thread_id, reply_id, prev_data, room_id)
await send_agent_message(agent[prev_event], thread_id, reply_id, prev_data, room_id, workflow_bot, msg_limit)
else:
logging.info('Failed to fetch streaming data')


async def send_agent_message(agent, thread_event_id, reply_id, data, room_id):
async def send_agent_message(agent, thread_event_id, reply_id, data, room_id, workflow_bot=None, msg_limit=0):
thread = {
'rel_type': 'm.thread',
'event_id': thread_event_id,
'is_falling_back': True,
'm.in_reply_to': {'event_id': reply_id}
}
await send_message_as_tool(agent, data, room_id, reply_id, thread)
await send_message_as_tool(agent, data, room_id, reply_id, thread, workflow_bot, msg_limit)

0 comments on commit a06e795

Please sign in to comment.