-
Notifications
You must be signed in to change notification settings - Fork 0
/
bot.py
742 lines (544 loc) · 21.4 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
import asyncio
import importlib
import inspect
import pkgutil
import time
import traceback
from collections import Counter
from io import BytesIO
from typing import TYPE_CHECKING, Union
import aiohttp
import discord
from discord import InteractionType
from discord.ext import bridge, commands
import cogs
import config
from data.constants import (
DONATION_LINK,
ERROR_CLR,
GITHUB_LINK,
LB_DISPLAY_AMT,
LB_LENGTH,
PERMISSONS,
PRIMARY_CLR,
PRIVACY_POLICY_LINK,
RULES_LINK,
SUPPORT_SERVER_INVITE,
TEST_EXPIRE_TIME,
TEST_ZONES,
)
from helpers.errors import OnGoingTest
from helpers.ui import BaseView, CustomEmbed, create_link_view, get_log_embed
from helpers.utils import get_hint, mention_command_from_name, message_banned_user
if TYPE_CHECKING:
from cogs.utils.logging import Logging
from cogs.utils.mongo import Mongo, User
from cogs.utils.redis import Redis
class LBCategory:
def __init__(self, parent_index, index, bot, name, unit, get_stat):
self.parent_index = parent_index
self.index = index
self.bot = bot
self.name = name
self.unit = unit
self.get_stat = get_stat
@property
def lb_key(self):
return f"lb.{self.parent_index}.{self.index}"
async def get_placing(self, user_id):
placing = await self.bot.redis.zrevrank(self.lb_key, user_id)
if placing is None or placing > LB_LENGTH:
return None
return placing
async def remove_user(self, user_id):
return await self.bot.redis.zrem(self.lb_key, user_id)
def get_initial_value(self, ctx: "Context"):
return ctx.initial_values[self.parent_index][self.index]
async def get_lb_data(self, end=LB_DISPLAY_AMT):
raw_data = await self.bot.redis.zrevrange(self.lb_key, 0, end, withscores=True)
return {int(u.decode()): v for u, v in raw_data}
async def get_lb_values_from_score(self, min, max):
raw_data = await self.bot.redis.zrevrangebyscore(
self.lb_key, min=min, max=max, withscores=True
)
_, values = zip(*raw_data)
return list(values)
@classmethod
def new(cls, *args, **kwargs):
def do_it(parent_index, index):
return cls(parent_index, index, *args, **kwargs)
return do_it
class Leaderboard:
def __init__(
self,
index: int,
*,
title: str,
emoji: str,
default: int,
stats: list[LBCategory],
check=None,
priority=0,
):
self.index = index
# Meta data
self.title = title
self.emoji = emoji
self.stats = self.initialize_stats(stats)
if len(self.stats) <= default:
raise Exception("Default out of range")
# Default stat index
self.default = default
self._check = check
self.priority = priority
def initialize_stats(self, stats):
return [s(self.index, i) for i, s in enumerate(stats)]
@property
def desc(self):
return ", ".join(s.name for s in self.stats)
async def check(self, ctx: "Context"):
if self._check is None:
return True
return await self._check(ctx)
@classmethod
def new(cls, *args, **kwargs):
return lambda index: cls(index, *args, **kwargs)
class HSLeaderboard(Leaderboard):
@property
def desc(self):
return super().desc + " Test"
def unqualify(name):
return name.rsplit(".", maxsplit=1)[-1]
# https://github.com/python-discord/bot/blob/main/bot/utils/extensions.py
def get_exts():
for module in pkgutil.walk_packages(cogs.__path__, f"{cogs.__name__}."):
# Not loading modules that start with underscore
if unqualify(module.name).startswith("_"):
continue
imported = importlib.import_module(module.name)
# Checking for setup function to determine if it is an extension
if not inspect.isfunction(getattr(imported, "setup", None)):
continue
yield module.name
class WelcomeView(BaseView):
def __init__(self, ctx: "Context", callback, response):
super().__init__(ctx, timeout=120)
self.callback = callback
self.response = response
@discord.ui.button(label="Accept", style=discord.ButtonStyle.primary)
async def accept(self, button, interaction):
user = await self.ctx.bot.mongo.fetch_user(interaction.user, create=True)
embed = self.ctx.default_embed(
title="Rules Accepted",
description="Here are some important commands to get you started!",
)
if self.ctx.is_slash:
test_cmd = mention_command_from_name(
ctx=self.ctx, group="tt", name="dictionary"
)
else:
test_cmd = f"`{self.ctx.prefix}tt`"
if self.ctx.is_slash:
race_cmd = mention_command_from_name(
ctx=self.ctx, group="race", name="dictionary"
)
else:
race_cmd = f"`{self.ctx.prefix}race`"
help_cmd = mention_command_from_name(ctx=self.ctx, name="help")
profile_cmd = mention_command_from_name(ctx=self.ctx, name="profile")
season_cmd = mention_command_from_name(ctx=self.ctx, name="season")
achievements_cmd = mention_command_from_name(ctx=self.ctx, name="achievements")
challenges_cmd = mention_command_from_name(ctx=self.ctx, name="challenges")
embed.add_field(
name="Start Typing",
value=(
f"{test_cmd} - start your first typing test.\n"
f"{race_cmd} - create a multiplayer typing test."
),
inline=False,
)
embed.add_field(
name="Core Commands",
value=(
f"{profile_cmd} - view your profile.\n"
f"{season_cmd} - learn about the monthly season.\n"
f"{achievements_cmd} - view your achievements.\n"
f"{challenges_cmd} - see your progress on the daily challenge."
),
inline=False,
)
embed.add_field(
name="Confused?",
value=f"Type {help_cmd} for a list of commands",
inline=False,
)
embed.add_field(
name="Got any questions?",
value="Join our community server below!",
inline=False,
)
embed.set_thumbnail(url="https://i.imgur.com/MF0xiLu.png")
view = create_link_view(
{
"Our Github": GITHUB_LINK,
"Community Server": SUPPORT_SERVER_INVITE,
"Donate": DONATION_LINK,
}
)
self.ctx.initial_user = user
self.ctx.add_leaderboard_values()
if self.response:
await interaction.response.edit_message(embed=embed, view=view)
else:
await self.ctx.edit(embed=embed, view=view)
await self.ctx.bot.handle_after_welcome_check(self.ctx)
if self.callback is not None:
if self.response:
await self.callback()
else:
await self.callback(interaction)
self.stop()
async def start(self):
embed = self.ctx.default_embed(
title="Welcome to wordPractice!",
description=(
"I'm the most feature dense typing test Discord Bot. I allow\n"
"you to practice your typing skills while having fun!\n\n"
"**Rules and Privacy Policy**\n"
"Please take a second to read our privacy policy and rules\n"
"below."
),
)
embed.set_thumbnail(url="https://i.imgur.com/2vUD4NF.png")
# Adding the links
item = discord.ui.Button(label="Privacy Policy", url=PRIVACY_POLICY_LINK)
self.add_item(item)
item = discord.ui.Button(label="Rules", url=RULES_LINK)
self.add_item(item)
await self.ctx.respond(embed=embed, view=self, ephemeral=True)
class CustomContextItems:
def __init__(self, bot: "WordPractice" = None):
self.bot = bot
# Initial stats
self.initial_user: "User" = None
self.initial_values = []
self.achievements_completed = [] # list of additional achievements completed
self.no_completion = False
self.other_author = None
# Hint is chosen when defining context to ensure a consistent hint throughout each response
self._hint = get_hint()
@property
def hint(self):
return self._hint.format(self.prefix)
@property
def theme(self):
return (
int(self.initial_user.theme[1].replace("#", "0x"), 16)
if self.initial_user
else None
)
@property
def error_embed(self):
return self.bot.error_embed
@property
def default_embed(self):
return self.bot.default_embed
@property
def custom_embed(self):
return self.bot.custom_embed
def embed(self, **kwargs):
color = kwargs.pop("color", self.theme or PRIMARY_CLR)
return CustomEmbed(self.bot, color=color, hint=self.hint, **kwargs)
def add_leaderboard_values(self):
self.initial_values = self.bot.get_leaderboard_values(self.initial_user)
async def add_initial_stats(self, user):
# Getting the initial user
self.initial_user = await self.bot.mongo.fetch_user(user)
if self.initial_user is None:
return
# Getting the initial placing for each category
self.add_leaderboard_values()
class CustomAppContext(bridge.BridgeApplicationContext, CustomContextItems):
def __init__(self, *args, **kwargs):
bridge.BridgeApplicationContext.__init__(self, *args, **kwargs)
CustomContextItems.__init__(self, self.bot)
self.prefix = "/"
self.is_slash = True
@property
def user(self):
return self.other_author or self.interaction.user
author = user
class CustomPrefixContext(bridge.BridgeExtContext, CustomContextItems):
def __init__(self, *args, **kwargs):
bridge.BridgeExtContext.__init__(self, *args, **kwargs)
CustomContextItems.__init__(self, self.bot)
self.is_slash = False
@property
def author(self):
return self.other_author or self.message.author
async def _respond(self, *args, **kwargs):
kwargs.pop("ephemeral", None)
kwargs = kwargs | {"mention_author": False}
return await super()._respond(*args, **kwargs)
Context = Union[CustomPrefixContext, CustomAppContext]
class WordPractice(bridge.AutoShardedBot):
def __init__(self, **kwargs):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
super().__init__(**kwargs, loop=self.loop)
self.add_check(
commands.bot_has_permissions(
read_messages=True,
send_messages=True,
embed_links=True,
attach_files=True,
read_message_history=True,
external_emojis=True,
).predicate
)
name = " your wpm \N{EYES} | %help"
self.activity = discord.Activity(type=discord.ActivityType.watching, name=name)
self.session = aiohttp.ClientSession(loop=self.loop)
self.cooldowns = {}
# Cache
self.cmds_run = {} # user_id: set{cmds}
self.avg_perc = [] # [wpm (33% 66%), raw, acc]
# Not using MaxConcurrency because it's based on context so it doesn't work with users who join race
self.active_tests = {} # user_id: timestamp
self.spam_control = commands.CooldownMapping.from_cooldown(
6, 8, commands.BucketType.user # rate, per
)
self.spam_count = Counter()
# Leaderboards
def get_hs(s):
return lambda u: u.highspeed[s].wpm
async def season_check(ctx: Context):
season_data = await ctx.bot.mongo.get_season_info()
return season_data is not None and season_data["enabled"]
self.lbs = [
Leaderboard.new(
title="All Time",
emoji="\N{EARTH GLOBE AMERICAS}",
stats=[LBCategory.new(self, "Words Typed", "words", lambda u: u.words)],
default=0,
priority=1,
),
Leaderboard.new(
title="Monthly Season",
emoji="\N{SPORTS MEDAL}",
stats=[LBCategory.new(self, "Experience", "xp", lambda u: u.xp)],
default=0,
check=season_check,
priority=2,
),
Leaderboard.new(
title="24 Hour",
emoji="\N{CLOCK FACE ONE OCLOCK}",
stats=[
LBCategory.new(self, "Experience", "xp", lambda u: sum(u.xp_24h)),
LBCategory.new(
self, "Words Typed", "words", lambda u: sum(u.words_24h)
),
],
default=0,
),
Leaderboard.new(
title="High Score",
emoji="\N{RUNNER}",
stats=[
LBCategory.new(self, s.capitalize(), "wpm", get_hs(s))
for s in TEST_ZONES.keys()
],
default=1,
),
]
self.initialize_lbs()
self.start_time = time.time()
self.load_exts()
def initialize_lbs(self):
for i, lb in enumerate(self.lbs):
self.lbs[i] = lb(i)
def get_leaderboard_values(self, user):
values = []
for lb in self.lbs:
category = []
for stat in lb.stats:
category.append(stat.get_stat(user))
values.append(category)
return values
def active_start(self, user_id: int):
timestamp = self.active_tests.get(user_id, None)
# Checking if the user is in an active test and it isn't expired
if timestamp is not None and time.time() - timestamp < TEST_EXPIRE_TIME + 1:
raise OnGoingTest()
self.active_tests[user_id] = time.time()
def active_end(self, user_id: int):
if user_id in self.active_tests:
del self.active_tests[user_id]
async def handle_after_welcome_check(self, ctx: Context):
# Checking if the user is banned
if ctx.initial_user.banned:
embed = ctx.error_embed(
title="You are banned",
description="Join the support server and create a ticket to request a ban appeal",
)
view = create_link_view({"Support Server": SUPPORT_SERVER_INVITE})
await ctx.respond(embed=embed, view=view, ephemeral=True)
return True
return False
@property
def mongo(self) -> "Mongo":
return self.get_cog("Mongo")
@property
def redis(self) -> "Redis":
return self.get_cog("Redis").pool
@property
def log(self) -> "Logging":
return self.get_cog("Logging").log
def error_embed(self, **kwargs):
color = kwargs.pop("color", ERROR_CLR)
return CustomEmbed(self, color=color, add_footer=False, **kwargs)
def default_embed(self, **kwargs):
color = kwargs.pop("color", PRIMARY_CLR)
return CustomEmbed(self, color=color, add_footer=False, **kwargs)
def custom_embed(self, **kwargs):
return CustomEmbed(self, **kwargs)
async def on_shard_ready(self, shard_id):
self.log.info(f"Shard {shard_id} ready")
async def get_application_context(self, interaction, cls=CustomAppContext):
ctx = await super().get_application_context(interaction, cls)
if ctx.initial_user is None:
await ctx.add_initial_stats(interaction.user)
return ctx
async def get_context(self, message, *, cls=CustomPrefixContext):
ctx = await super().get_context(message, cls=cls)
if ctx is None:
return
if ctx.command is None:
return ctx
if ctx.initial_user is None:
await ctx.add_initial_stats(message.author)
return ctx
def load_exts(self):
# Finding files in cogs folder that end with .py
for ext in get_exts():
# Loading the extension
try:
self.load_extension(ext)
except Exception:
self.log.warning(f"Failed to load extension: {ext}")
traceback.print_exc()
def create_invite_link(self):
return discord.utils.oauth_url(
client_id=self.user.id,
permissions=discord.Permissions(permissions=PERMISSONS),
redirect_uri=SUPPORT_SERVER_INVITE,
scopes=("bot", "applications.commands"),
)
async def handle_ongoing_test_error(self, send):
await send(
"You are currently in another test, please finish it before starting a new one!",
ephemeral=True,
)
async def log_the_error(self, embed, error):
msg = "".join(
traceback.format_exception(type(error), error, error.__traceback__)
)
buffer = BytesIO(msg.encode("utf-8"))
file = discord.File(buffer, filename="text.txt")
await self.error_wh.send(embed=embed, file=file)
self.log.warning(msg)
async def handle_new_user(self, ctx: Context, callback=None, response=True):
view = WelcomeView(ctx, callback, response)
await view.start()
async def check_dm_cmd(self, ctx: Context):
if ctx.guild is None:
view = create_link_view({"Invite Bot": self.create_invite_link()})
await ctx.respond(
"Sorry, commands can only be run in servers!", view=view, ephemeral=True
)
return True
return False
async def on_interaction(self, interaction):
if interaction.type is InteractionType.application_command:
ctx = await self.get_application_context(interaction)
if await self.check_dm_cmd(ctx):
return
# Asking the user to accept the rules before using the bot
if ctx.initial_user is None:
async def callback():
await self.process_application_commands(interaction)
return await self.handle_new_user(ctx, callback=callback)
if await self.handle_after_welcome_check(ctx):
return
# Processing command
await self.process_application_commands(interaction)
async def process_commands(self, message):
if message.author.bot:
return
ctx = await self.get_context(message)
if ctx is None:
return
if ctx.command is not None:
if await self.check_dm_cmd(ctx):
return
# Asking the user to accept the rules before using the bot
if ctx.initial_user is None:
async def callback():
await self.invoke(ctx)
return await self.handle_new_user(ctx, callback=callback)
if await self.handle_after_welcome_check(ctx):
return
# Spam control
# https://github.com/Rapptz/RoboDanny/blob/rewrite/bot.py
bucket = self.spam_control.get_bucket(message)
current = message.created_at.replace().timestamp()
retry_after = bucket.update_rate_limit(current)
author_id = message.author.id
if retry_after and author_id != self.owner_id:
self.spam_count[author_id] += 1
if (amt := self.spam_count[author_id]) >= 3:
del self.spam_count[author_id]
reason = "Spamming commands"
# Banning the user
user_data = await ctx.bot.mongo.add_inf(
ctx, ctx.author, ctx.initial_user, reason
)
# Updating the user's data
await self.mongo.replace_user_data(user_data)
await message_banned_user(ctx, ctx.author, reason)
else:
# Flagging the user for spamming commands
embed = get_log_embed(
ctx,
title="User Spamming Commands",
additional=f"**Times Flagged:** {amt}",
error=True,
)
await self.impt_wh.send(embed=embed)
else:
self.spam_count.pop(author_id, None)
await self.invoke(ctx)
@discord.utils.cached_property
def cmd_wh(self):
return discord.Webhook.from_url(config.COMMAND_LOG, session=self.session)
@discord.utils.cached_property
def test_wh(self):
return discord.Webhook.from_url(config.TEST_LOG, session=self.session)
@discord.utils.cached_property
def impt_wh(self):
return discord.Webhook.from_url(config.IMPORTANT_LOG, session=self.session)
@discord.utils.cached_property
def error_wh(self):
return discord.Webhook.from_url(config.ERROR_LOG, session=self.session)
@discord.utils.cached_property
def guild_wh(self):
return discord.Webhook.from_url(config.GUILD_LOG, session=self.session)
async def on_ready(self):
self.log.info("The bot is ready!")
async def close(self):
await super().close()
await self.session.close()
await self.redis.close()
def run(self):
super().run(config.BOT_TOKEN, reconnect=True)