-
Notifications
You must be signed in to change notification settings - Fork 1
/
bot.py
185 lines (161 loc) · 8.16 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
import asyncio
import json
import re
from datetime import datetime
from typing import Type, Dict
from mautrix.client import Client
from mautrix.types import Format, TextMessageEventContent, EventType, RelationType
from maubot import Plugin, MessageEvent
from maubot.handlers import command, event
import aiohttp
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
# Config class to manage configuration
class Config(BaseProxyConfig):
def do_update(self, helper: ConfigUpdateHelper) -> None:
helper.copy("gpt_api_key")
helper.copy("api_endpoint")
helper.copy("model")
helper.copy("max_tokens")
helper.copy("temperature")
helper.copy("discourse_api_key")
helper.copy("discourse_api_username")
helper.copy("discourse_base_url")
helper.copy("unsorted_category_id")
helper.copy("matrix_to_discourse_topic")
class MatrixToDiscourseBot(Plugin):
async def start(self) -> None:
await super().start()
self.config.load_and_update()
self.log.info("MatrixToDiscourseBot started")
@classmethod
def get_config_class(cls) -> Type[BaseProxyConfig]:
return Config
@command.new(name="fpost", require_subcommand=False)
@command.argument("title", pass_raw=True, required=False)
async def post_to_discourse(self, evt: MessageEvent, title: str = None) -> None:
self.log.info("Command !fpost triggered.")
await evt.reply("Creating a Forum post...")
try:
self.log.info(f"Received event body: {evt.content.body}")
# Check if the message is a reply to another message
if not evt.content.get_reply_to():
await evt.reply("You must reply to a message to use this command.")
return
# Extract the body of the replied-to message
replied_event = await evt.client.get_event(evt.room_id, evt.content.get_reply_to())
message_body = replied_event.content.body
self.log.info(f"Message body: {message_body}")
# Use provided title or generate one using OpenAI
if not title:
title = await self.generate_title(message_body)
if not title:
title = "Default Title" # Fallback title if generation fails
self.log.info(f"Generated Title: {title}")
# Get the topic ID based on the room ID
topic_id = self.config["matrix_to_discourse_topic"].get(evt.room_id, self.config["unsorted_category_id"])
post_url, error = await self.create_post(
self.config["discourse_base_url"],
topic_id,
title,
message_body
)
if post_url:
await evt.reply(f"Post created successfully! URL: {post_url}")
else:
await evt.reply(f"Failed to create post: {error}")
except Exception as e:
self.log.error(f"Error processing !fpost command: {e}")
await evt.reply(f"An error occurred: {e}")
async def generate_title(self, message_body: str) -> str:
prompt = f"Create a brief (3-10 word) attention grabbing title for the following post on the community forum: {message_body}"
try:
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.config['gpt_api_key']}"
}
data = {
"model": self.config["model"],
"messages": [{"role": "user", "content": prompt}],
"max_tokens": self.config["max_tokens"],
"temperature": self.config["temperature"],
}
async with aiohttp.ClientSession() as session:
async with session.post(self.config["api_endpoint"], headers=headers, json=data) as response:
response_json = await response.json()
if response.status == 200:
return response_json["choices"][0]["message"]["content"].strip()
else:
self.log.error(f"OpenAI API error: {response.status} {response_json}")
return None
except Exception as e:
self.log.error(f"Error generating title: {e}")
return None
async def create_post(self, base_url, category_id, title, message_body):
url = f"{base_url}/posts.json"
headers = {
"Content-Type": "application/json",
"Api-Key": self.config["discourse_api_key"],
"Api-Username": self.config["discourse_api_username"]
}
payload = {
"title": title,
"raw": message_body,
"category": category_id
}
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json=payload) as response:
response_text = await response.text()
if response.status == 200:
data = await response.json()
topic_id = data.get("topic_id")
topic_slug = data.get("topic_slug")
post_url = f"{base_url}/t/{topic_slug}/{topic_id}" if topic_id and topic_slug else "URL not available"
return post_url, None
else:
return None, f"Failed to create post: {response.status}\nResponse: {response_text}"
@command.new(name="fsearch", require_subcommand=False)
@command.argument("query", pass_raw=True, required=True)
async def search_discourse(self, evt: MessageEvent, query: str) -> None:
self.log.info("Command !fsearch triggered.")
await evt.reply("Searching the forum...")
try:
search_url = f"{self.config['discourse_base_url']}/search.json"
headers = {
"Content-Type": "application/json",
"Api-Key": self.config["discourse_api_key"],
"Api-Username": self.config["discourse_api_username"]
}
params = {"q": query}
async with aiohttp.ClientSession() as session:
async with session.get(search_url, headers=headers, params=params) as response:
response_json = await response.json()
if response.status == 200:
search_results = response_json.get("topics", [])
# Safely get keys with default values
for result in search_results:
result['views'] = result.get('views', 0)
result['created_at'] = result.get('created_at', '1970-01-01T00:00:00Z')
# Sort search results by created_at for most recent and by views for most seen
sorted_by_recent = sorted(search_results, key=lambda x: x['created_at'], reverse=True)
sorted_by_views = sorted(search_results, key=lambda x: x['views'], reverse=True)
# Select top 2 most recent and top 3 most seen
top_recent = sorted_by_recent[:2]
top_seen = sorted_by_views[:3]
def format_results(results):
return "\n".join([f"* [{result['title']}]({self.config['discourse_base_url']}/t/{result['slug']}/{result['id']})" for result in results])
result_msg = (
"**Top 2 Most Recent:**\n" +
format_results(top_recent) +
"\n\n**Top 3 Most Seen:**\n" +
format_results(top_seen)
)
if search_results:
await evt.reply(f"Search results:\n{result_msg}")
else:
await evt.reply("No results found.")
else:
self.log.error(f"Discourse API error: {response.status} {response_json}")
await evt.reply("Failed to perform search.")
except Exception as e:
self.log.error(f"Error processing !fsearch command: {e}")
await evt.reply(f"An error occurred: {e}")