diff --git a/.example.env b/.example.env index 5a3ade0..ff9595c 100644 --- a/.example.env +++ b/.example.env @@ -2,5 +2,6 @@ # BOT_TOKEN = "" # Insert the bot token you get from discord dev portal here # SAUCENAO_KEY "" # Insert saucenao key here +# YT_KEY = ""# Insert YouTube key here # Don't forget to uncomment the variables \ No newline at end of file diff --git a/bot.py b/bot.py index 3ceffe8..e424264 100644 --- a/bot.py +++ b/bot.py @@ -1,9 +1,9 @@ """The main file of the bot, basically sets up and starts the bot """ import asyncio -import datetime import logging import os import sqlite3 +from datetime import datetime import aiohttp_client_cache import dotenv @@ -12,12 +12,11 @@ import miru from lightbulb.ext import tasks +from functions.help import BotHelpCommand from functions.utils import verbose_timedelta dotenv.load_dotenv() -from functions.help import BotHelpCommand - # Setting the prefix as , for windows (where i run the test bot) # and - for others (where it's deployed :) ) @@ -28,6 +27,16 @@ def return_prefix() -> list: return ["-"] +guild_prefix_map = {980479965726404670: [","]} + + +def make_prefix(app, message: hk.Message) -> list: + try: + return guild_prefix_map[message.guild_id] + except: + return ["-"] + + # The following snippet is borrowed from: # https://github.com/Nereg/ARKMonitorBot/blob/ # 1a6cedf34d531bddf0f5b11b3238344192998997/src/main.py#L14 @@ -61,12 +70,13 @@ def setup_logging() -> None: bot = lb.BotApp( token=os.getenv("BOT_TOKEN"), intents=hk.Intents.ALL_UNPRIVILEGED - | hk.Intents.MESSAGE_CONTENT - | hk.Intents.GUILD_MEMBERS, - prefix=lb.when_mentioned_or(return_prefix()), + # | hk.Intents.ALL_PRIVILEGED, + | hk.Intents.MESSAGE_CONTENT | hk.Intents.GUILD_MEMBERS, + prefix=lb.when_mentioned_or(make_prefix), help_class=BotHelpCommand, logs="INFO", - owner_ids=[1002964172360929343, 701090852243505212], + owner_ids=[701090852243505212], + help_slash_command=True, ) miru.install(bot) @@ -90,7 +100,7 @@ async def on_starting(event: hk.StartingEvent) -> None: ], # Keep using the cached response even if this param changes timeout=3, ) - bot.d.timeup = datetime.datetime.now().astimezone() + bot.d.timeup = datetime.now().astimezone() bot.d.chapter_info = {} bot.d.update_channels = ["1127609035374461070"] bot.d.con = sqlite3.connect("akane_db.db") @@ -108,7 +118,7 @@ async def on_stopping(event: hk.StoppingEvent) -> None: await bot.rest.create_message( 1129030476695343174, - f"Bot closed with {verbose_timedelta(datetime.datetime.now().astimezone()-bot.d.timeup)} uptime", + f"Bot closed with {verbose_timedelta(datetime.now().astimezone()-bot.d.timeup)} uptime", ) @@ -122,7 +132,6 @@ async def ping(ctx: lb.Context) -> None: ctx (lb.Context): The event context (irrelevant to the user) """ await ctx.respond(f"Pong! Latency: {bot.heartbeat_latency*1000:.2f}ms") - bot.d.ncom += 1 @bot.listen(lb.CommandErrorEvent) @@ -151,7 +160,7 @@ async def on_error(event: lb.CommandErrorEvent) -> None: elif isinstance(exception, lb.CommandIsOnCooldown): await event.context.respond( f"The command is on cooldown, you can use it after {int(exception.retry_after)}s", - delete_after=int(exception.retry_after), + delete_after=min(15, int(exception.retry_after)), ) elif isinstance(exception, lb.MissingRequiredPermission): @@ -164,58 +173,33 @@ async def on_error(event: lb.CommandErrorEvent) -> None: await event.context.respond("I don't have the permissions to do this πŸ˜”") elif isinstance(exception, lb.NotEnoughArguments): - # await event.context.respond( - # ( - # f"Missing arguments, use `-help {event.context.command.name}`" - # f"for the correct invocation" - # ) - # )\ try: ctx = event.context + command = ctx.command + + if command.hidden: + return await ctx.respond("Missing arguments, initializing command help...") await asyncio.sleep(0.3) - command = ctx.command + helper = BotHelpCommand(ctx.bot) - long_help = command.get_help(ctx) - - if len(command.aliases) > 0: - aliases = f"Aliases: {', '.join(command.aliases)}\n\n" - else: - aliases = "" - - # embed = ( - - # ) - # lines = [ - # ">>> ```adoc", - # "==== Command Help ====", - # f"{command.name} - {command.description}", - # "", - # f"Usage: {prefix}{command.signature}", - # "", - # long_help if long_help else "No additional details provided.", - # "```", - # ] - await ctx.edit_last_response( - content=None, - embed=hk.Embed( - color=0x000000, - title="Command Help", - description=( - f"**{command.name}** \n" - f"{command.description} \n\n" - f"Usage: `{ctx.prefix}{command.signature}` \n\n" - f"{aliases}" - f"{long_help or ''}" - ), - ), - ) + await helper.send_command_help(ctx=ctx, command=command) except Exception as e: await event.context.respond(f"Stop, {e}") + elif isinstance(exception, lb.OnlyInGuild): + await event.context.respond("This command can't be invoked in DMs") + + elif isinstance(exception, lb.ConverterFailure): + await event.context.respond(f"The argument `{exception.raw_value}` is invalid") + + elif isinstance(exception, lb.CommandNotFound): + pass + # To move the fuzzy matching here + if __name__ == "__main__": if os.name == "nt": diff --git a/extensions/adata.py b/extensions/adata.py index c8b27a5..9bbd665 100644 --- a/extensions/adata.py +++ b/extensions/adata.py @@ -1,15 +1,23 @@ """The animanga related plugin""" -import datetime +import collections import re +import typing as t +from datetime import datetime, timedelta import hikari as hk import lightbulb as lb +import miru +from fuzzywuzzy import process from miru.ext import nav import functions.buttons as btns import functions.views as views +from functions.components import CharacterSelect, SimpleTextSelect from functions.errors import RequestsFailedError -from functions.utils import verbose_timedelta +from functions.models import ALCharacter +from functions.models import ColorPalette as colors +from functions.search_images import lookfor +from functions.utils import get_anitrendz_latest, verbose_date, verbose_timedelta al_listener = lb.Plugin( "Lookup", @@ -26,6 +34,15 @@ def parse_description(description: str) -> str: + """Parse Anilist descriptions into Discord friendly markdown + + Args: + description (str): The description to parse + + Returns: + str: The parsed description + """ + description = ( description.replace("
", "") .replace("~!", "||") @@ -109,13 +126,13 @@ async def get_imp_info(chapters): ) @lb.implements(lb.PrefixCommand, lb.SlashCommand) async def al_search(ctx: lb.Context, type: str, media: str) -> None: - """Search an anime/manga/character on AL""" + """A wrapper slash command for AL/VNDB search""" if isinstance(ctx, lb.PrefixContext): await ctx.respond( ( "Please note that the lookup prefix command is depreciated and due to be removed. " - "See the updated commands using `-help lookup`." + f"See the updated commands using `{ctx.prefix}help Lookup`." ) ) @@ -163,6 +180,13 @@ async def al_search(ctx: lb.Context, type: str, media: str) -> None: @lb.command("anime", "Search a anime", pass_options=True, aliases=["ani", "a"]) @lb.implements(lb.PrefixCommand) async def anime_search(ctx: lb.PrefixContext, query: str): + """Search an anime on AL + + Args: + ctx (lb.PrefixContext): The context + query (str): The anime to search for + """ + await _search_anime(ctx, query) @@ -171,6 +195,13 @@ async def anime_search(ctx: lb.PrefixContext, query: str): @lb.command("manga", "Search a manga", pass_options=True, aliases=["m"]) @lb.implements(lb.PrefixCommand) async def manga_search(ctx: lb.PrefixContext, query: str): + """Search a manga on AL and fetch it's preview via MD + + Args: + ctx (lb.PrefixContext): The context + query (str): The manga to search for + """ + await _search_manga(ctx, query) @@ -182,6 +213,13 @@ async def manga_search(ctx: lb.PrefixContext, query: str): @lb.command("user", "Show a user's AL and stats", pass_options=True, aliases=["u"]) @lb.implements(lb.PrefixCommand) async def user_al(ctx: lb.PrefixContext, user: str): + """Shortcut for AL username + + Args: + ctx (lb.PrefixContext): The context + query (str): The user + """ + await ctx.respond(f"https://anilist.co/user/{user}") @@ -189,11 +227,27 @@ async def user_al(ctx: lb.PrefixContext, user: str): @lb.option("query", "The novel query", modifier=lb.commands.OptionModifier.CONSUME_REST) @lb.command("novel", "Search a novel", pass_options=True, aliases=["novels", "n", "ln"]) @lb.implements(lb.PrefixCommand) -async def user_al(ctx: lb.PrefixContext, query: str): +async def ln_search(ctx: lb.PrefixContext, query: str): + """Search a (light) novel on AL + + Args: + ctx (lb.PrefixContext): The context + query (str): The novel to search for + """ + await _search_novel(ctx, query) @al_listener.command +@lb.set_help( + ( + "A simple character search from AL but with additional features.\n" + "To filter a character by series, simply addd a comma and the series name." + "\nEg. `-c Ryou, Bocchi the Rock` will almost certainly give you Ryou Yamada from the BTR " + "series. \nIf you just enter `-c ,Bocchi the Rock` you'll get a dropdown of all characters " + "from the series" + ) +) @lb.option( "query", "The character query", modifier=lb.commands.OptionModifier.CONSUME_REST ) @@ -201,8 +255,15 @@ async def user_al(ctx: lb.PrefixContext, query: str): "character", "Search a character", pass_options=True, aliases=["chara", "c"] ) @lb.implements(lb.PrefixCommand) -async def user_al(ctx: lb.PrefixContext, query: str): - await _search_character(ctx, query) +async def chara_search(ctx: lb.PrefixContext, query: str): + """Search a character on AL + + Args: + ctx (lb.PrefixContext): The context + query (str): The character to search for + """ + + await _search_characters(ctx, query) @al_listener.command @@ -212,7 +273,7 @@ async def user_al(ctx: lb.PrefixContext, query: str): choices=["airing", "upcoming", "bypopularity", "favorite"], required=False, ) -@lb.command("top", "Find top anime on MAL", pass_options=True) +@lb.command("top", "Find top anime on MAL", pass_options=True, auto_defer=True) @lb.implements(lb.PrefixCommand, lb.SlashCommand) async def topanime(ctx: lb.PrefixContext, filter: str = None): """Find the top anime on AL @@ -226,8 +287,57 @@ async def topanime(ctx: lb.PrefixContext, filter: str = None): """ num = 5 - if filter and filter in ["airing", "upcoming", "bypopularity", "favorite"]: + if filter and filter in ["upcoming", "bypopularity", "favorite"]: params = {"limit": num, "filter": filter} + elif filter in ["airing", "weekly", "week"]: + try: + pages = [ + hk.Embed( + title="Top 10 Anime of the Week: AniTrendz", color=colors.LILAC + ).set_image((await get_anitrendz_latest(ctx.bot.d.aio_session))), + hk.Embed( + title="Top 10 Anime of the Week: AnimeCorner", + color=colors.LILAC, + ).set_image( + ( + await lookfor( + "anime corner top anime of the week", + ctx.bot.d.aio_session, + num=1, + recent="w", + ) + )[0]["original"] + ), + ] + view = views.AuthorView(user_id=ctx.author.id) + view.add_item( + btns.SwapButton( + emoji1=hk.Emoji.parse("<:next:1136984292921200650>"), + emoji2=hk.Emoji.parse("<:previous:1136984315415236648>"), + original_page=pages[0], + swap_page=pages[1], + ) + ) + view.add_item(btns.KillButton()) + + choice = await ctx.respond( + embed=pages[0], + components=view, + ) + await view.start(choice) + await view.wait() + + if hasattr(view, "answer"): + pass + else: + await ctx.edit_last_response(components=[]) + + except Exception as e: + await ctx.respond(e) + + finally: + return + else: params = {"limit": num} @@ -237,7 +347,7 @@ async def topanime(ctx: lb.PrefixContext, filter: str = None): if res.ok: res = await res.json() embed = ( - hk.Embed(color=0x2E51A2) + hk.Embed(color=colors.MAL) .set_author(name="Top Anime") .set_footer( "Fetched via MyAnimeList.net", @@ -263,6 +373,13 @@ async def topanime(ctx: lb.PrefixContext, filter: str = None): @lb.command("visualnovel", "Search a vn", pass_options=True, aliases=["vn"]) @lb.implements(lb.PrefixCommand) async def vn_search(ctx: lb.PrefixContext, query: str): + """Search for a visual novel via VNDB + + Args: + ctx (lb.PrefixContext): The context + query (str): The vn to search for + """ + await _search_vn(ctx, query) @@ -273,6 +390,13 @@ async def vn_search(ctx: lb.PrefixContext, query: str): @lb.command("vntrait", "Search a vn", pass_options=True, aliases=["trait"]) @lb.implements(lb.PrefixCommand) async def vn_search(ctx: lb.PrefixContext, query: str): + """Search for a visual novel character trait via VNDB + + Args: + ctx (lb.PrefixContext): The context + query (str): The vn trait to search for + """ + await _search_vntrait(ctx, query) @@ -283,6 +407,13 @@ async def vn_search(ctx: lb.PrefixContext, query: str): @lb.command("vntag", "Search a vntag", pass_options=True, aliases=["tag"]) @lb.implements(lb.PrefixCommand) async def vn_search(ctx: lb.PrefixContext, query: str): + """Search for a visual novel tag via VNDB + + Args: + ctx (lb.PrefixContext): The context + query (str): The vn tag to search for + """ + await _search_vntag(ctx, query) @@ -293,16 +424,23 @@ async def vn_search(ctx: lb.PrefixContext, query: str): @lb.command("vnc", "Search a vn character", pass_options=True) @lb.implements(lb.PrefixCommand) async def vn_search(ctx: lb.PrefixContext, query: str): + """Search for a visual novel character via VNDB + + Args: + ctx (lb.PrefixContext): The context + query (str): The character to search for + """ + await _search_vnchara(ctx, query) @al_listener.command +@lb.add_checks(lb.dm_only | lb.nsfw_channel_only) @lb.option("code", "You know it", int) @lb.command("nh", "Search 🌚", pass_options=True, hidden=True) @lb.implements(lb.PrefixCommand) async def nhhh(ctx: lb.PrefixContext, code: int): - if not ctx.get_channel().is_nsfw: - return + """Not gonna elaborate this one""" res = await ctx.bot.d.aio_session.get( f"https://cubari.moe/read/api/nhentai/series/{code}/", timeout=3 @@ -321,18 +459,15 @@ async def nhhh(ctx: lb.PrefixContext, code: int): for i in res["chapters"]["1"]["groups"]["1"]: pages.append( hk.Embed( + color=colors.HELL, title=res["title"], url=f"https://nhentai.net/g/{res['slug']}", description=f"Author: {res['author']} | Artist: {res['artist']}", ).set_image(i) ) - navigator = views.CustomNavi( - pages=pages, buttons=buttons, timeout=180, user_id=ctx.author.id - ) - await navigator.send( - ctx.channel_id, - ) + navigator = views.AuthorNavi(pages=pages, timeout=180, user_id=ctx.author.id) + await navigator.send(ctx.channel_id) else: await ctx.respond("Didn't work") @@ -345,9 +480,10 @@ async def nhhh(ctx: lb.PrefixContext, code: int): "map", "The vnchara to search", modifier=lb.commands.OptionModifier.CONSUME_REST ) @lb.option("person", "The vnchara to search") -@lb.command("addtrait", "Search a vn character", pass_options=True) +@lb.command("addtrait", "Add an alias for a vn trait ", pass_options=True) @lb.implements(lb.PrefixCommand) async def add_trait_map(ctx: lb.PrefixContext, person: str, map: str): + """Add an alias for a vn trait (fun command)""" try: db = ctx.bot.d.con cursor = db.cursor() @@ -364,9 +500,10 @@ async def add_trait_map(ctx: lb.PrefixContext, person: str, map: str): @al_listener.command @lb.add_checks(lb.owner_only) @lb.option("person", "The vnchara to search") -@lb.command("rmtrait", "Search a vn character", pass_options=True) +@lb.command("rmtrait", "Remove an alias for a vn trait", pass_options=True) @lb.implements(lb.PrefixCommand) async def remove_trait_map(ctx: lb.PrefixContext, person: str): + """Remove an alias for a vn trait (fun command)""" try: db = ctx.bot.d.con cursor = db.cursor() @@ -377,27 +514,17 @@ async def remove_trait_map(ctx: lb.PrefixContext, person: str): print(e) -@al_listener.command -@lb.add_checks(lb.owner_only) -@lb.option("series", "The series whose characters to search") -@lb.command( - "charas", - "Search a vn character", - pass_options=True, -) -@lb.implements(lb.PrefixCommand) -async def search_series_characters(ctx: lb.PrefixContext, series: str): - ... - - -async def _fetch_trait_map(user): +async def _fetch_trait_map(user: str) -> str: + """Search if there's a trait map for a query""" db = al_listener.bot.d.con cursor = db.cursor() cursor.execute("SELECT trait FROM traitmap WHERE user=?", (user,)) return cursor.fetchone() -async def _search_character(ctx: lb.Context, character: str): +async def _search_character( + ctx: lb.Context, *, character: t.Optional[str] = None, id_: t.Optional[int] = None +): """Search a character on AL""" query = """ query ($id: Int, $search: String) { # Define which variables will be used in the query @@ -440,41 +567,47 @@ async def _search_character(ctx: lb.Context, character: str): } """ - variables = { - "search": character - # ,"sort": FAVOURITES_DESC - } + try: + variables = {} - response = await ctx.bot.d.aio_session.post( - "https://graphql.anilist.co", - json={"query": query, "variables": variables}, - timeout=3, - ) - if not response.ok: - await ctx.respond( - f"Failed to fetch data 😡. \nTry typing the full name of the character." + if id_: + variables["id"] = id_ + + elif character: + variables["search"] = character + + else: + raise lb.NotEnoughArguments + + response = await ctx.bot.d.aio_session.post( + "https://graphql.anilist.co", + json={"query": query, "variables": variables}, + timeout=3, ) - return - response = await response.json() + if not response.ok: + await ctx.respond( + f"Failed to fetch data 😡. \nTry typing the full name of the character." + ) + return + response = await response.json() - response = response["data"]["Character"] + response = response["data"]["Character"] - title = response["name"]["full"] + title = response["name"]["full"] - if response["dateOfBirth"]["month"] and response["dateOfBirth"]["day"]: - dob = f"{response['dateOfBirth']['day']}/{response['dateOfBirth']['month']}" - if response["dateOfBirth"]["year"]: - dob += f"/{response['dateOfBirth']['year']}" - else: - dob = "NA" + if response["dateOfBirth"]["month"] and response["dateOfBirth"]["day"]: + dob = f"{response['dateOfBirth']['day']}/{response['dateOfBirth']['month']}" + if response["dateOfBirth"]["year"]: + dob += f"/{response['dateOfBirth']['year']}" + else: + dob = "NA" - if response["description"]: - response["description"] = parse_description(response["description"]) + if response["description"]: + response["description"] = parse_description(response["description"]) - else: - response["description"] = "NA" + else: + response["description"] = "NA" - try: series = "" for i, item in enumerate(response["media"]["nodes"]): @@ -485,15 +618,14 @@ async def _search_character(ctx: lb.Context, character: str): title=title, url=response["siteUrl"], description="\n\n", - color=0x2B2D42, - timestamp=datetime.datetime.now().astimezone(), + color=colors.ANILIST, + timestamp=datetime.now().astimezone(), ) - .add_field("Gender", response["gender"]) + .add_field("Gender", response["gender"] or "Unknown") .add_field("DOB", dob, inline=True) .add_field("Favourites", f"{response['favourites']}❀", inline=True) .add_field("Character Description", response["description"]) .set_thumbnail(response["image"]["large"]) - # .set_author(url=response["siteUrl"], name=title) .set_footer( text="Source: AniList", icon="https://anilist.co/img/icons/android-chrome-512x512.png", @@ -501,19 +633,17 @@ async def _search_character(ctx: lb.Context, character: str): hk.Embed( title=title, url=response["siteUrl"], - # description="### Appears in", - color=0x2B2D42, - timestamp=datetime.datetime.now().astimezone(), + color=colors.ANILIST, + timestamp=datetime.now().astimezone(), ) .set_thumbnail(response["image"]["large"]) .set_footer( text="Source: AniList", icon="https://anilist.co/img/icons/android-chrome-512x512.png", ) - .add_field("Appears in ", series) - # .set_author(url=response["siteUrl"], name=title) + .add_field("Appears in ", series), ] - view = views.CustomView(user_id=ctx.author.id) + view = views.AuthorView(user_id=ctx.author.id) view.add_item( btns.SwapButton( emoji1=hk.Emoji.parse("<:next:1136984292921200650>"), @@ -522,7 +652,7 @@ async def _search_character(ctx: lb.Context, character: str): swap_page=pages[1], ) ) - view.add_item(btns.KillButton(style=hk.ButtonStyle.SECONDARY, label="❌")) + view.add_item(btns.KillButton()) choice = await ctx.respond( embed=pages[0], @@ -536,11 +666,11 @@ async def _search_character(ctx: lb.Context, character: str): else: await ctx.edit_last_response(components=[]) except Exception as e: - await ctx.respond(e) - return + await ctx.respond(f"Errer: {e}") async def _search_novel(ctx: lb.Context, novel: str): + """Search a novel on AL""" query = """ query ($id: Int, $search: String, $type: MediaType) { # Define which variables will be used in the query (id) Media (id: $id, search: $search, type: $type, sort: POPULARITY_DESC, format_in: [NOVEL]) { # Insert our variables into the query arguments (id) (type: ANIME is hard-coded in the query) @@ -570,7 +700,7 @@ async def _search_novel(ctx: lb.Context, novel: str): } """ - print("\n\nNOVEL SEARCH\n\n") + variables = {"search": novel, "type": "MANGA"} response = await ctx.bot.d.aio_session.post( @@ -596,16 +726,15 @@ async def _search_novel(ctx: lb.Context, novel: str): else: response["description"] = "NA" - # try: - view = views.CustomView(user_id=ctx.author.id) - view.add_item(btns.KillButton(style=hk.ButtonStyle.SECONDARY, label="❌")) + view = views.AuthorView(user_id=ctx.author.id) + view.add_item(btns.KillButton()) choice = await ctx.respond( embed=hk.Embed( title=title, url=response["siteUrl"], description="\n\n", - color=0x2B2D42, - timestamp=datetime.datetime.now().astimezone(), + color=colors.ANILIST, + timestamp=datetime.now().astimezone(), ) .add_field("Rating", response["meanScore"]) .add_field("Genres", ", ".join(response["genres"][:4])) @@ -618,7 +747,6 @@ async def _search_novel(ctx: lb.Context, novel: str): .add_field("Summary", response["description"]) .set_thumbnail(response["coverImage"]["large"]) .set_image(response["bannerImage"]) - # .set_author(url=response["siteUrl"], name=title) .set_footer( text="Source: AniList", icon="https://anilist.co/img/icons/android-chrome-512x512.png", @@ -630,12 +758,13 @@ async def _search_novel(ctx: lb.Context, novel: str): await view.wait() if hasattr(view, "answer"): # Check if there is an answer - print(f"Received an answer! It is: {view.answer}") + pass else: await ctx.edit_last_response(components=[]) async def _search_anime(ctx, anime: str): + """Search an anime on AL""" query = """ query ($id: Int, $search: String, $type: MediaType) { # Define which variables will be used (id) Page (perPage: 5) { @@ -658,6 +787,12 @@ async def _search_anime(ctx, anime: str): coverImage { large } + studios (isMain: true) { + nodes { + name + siteUrl + } + } bannerImage genres status @@ -690,11 +825,11 @@ async def _search_anime(ctx, anime: str): num = 0 if not len((await response.json())["data"]["Page"]["media"]) == 1: - view = views.CustomView(user_id=ctx.author.id) + view = views.AuthorView(user_id=ctx.author.id) embed = hk.Embed( title="Choose the desired anime", color=0x43408A, - timestamp=datetime.datetime.now().astimezone(), + timestamp=datetime.now().astimezone(), ) for count, item in enumerate((await response.json())["data"]["Page"]["media"]): @@ -705,10 +840,7 @@ async def _search_anime(ctx, anime: str): btns.GenericButton(style=hk.ButtonStyle.PRIMARY, label=f"{count+1}") ) - try: - embed.set_image("https://i.imgur.com/FCxEHRN.png") - except Exception as e: - print(e) + embed.set_image("https://i.imgur.com/FCxEHRN.png") choice = await ctx.respond(embed=embed, components=view) @@ -716,14 +848,12 @@ async def _search_anime(ctx, anime: str): await view.wait() if hasattr(view, "answer"): # Check if there is an answer - # print(f"Received an answer! It is: {view.answer}") num = f"{view.answer}" else: await ctx.edit_last_response(views=[]) return - # await ctx.respond('we beyond choice ') num = int(num) - 1 response = (await response.json())["data"]["Page"]["media"][num] @@ -733,7 +863,7 @@ async def _search_anime(ctx, anime: str): no_of_items = ( response["episodes"] if response["episodes"] != 1 - else verbose_timedelta(datetime.timedelta(minutes=response["duration"])) + else verbose_timedelta(timedelta(minutes=response["duration"])) ) if response["description"]: @@ -741,10 +871,9 @@ async def _search_anime(ctx, anime: str): else: response["description"] = "NA" - # print("response parsed ig") - # await ctx.respond('we near embed building ') + try: - view = views.CustomView(user_id=ctx.author.id) + view = views.AuthorView(user_id=ctx.author.id) trailer = "Couldn't find anything." @@ -753,8 +882,8 @@ async def _search_anime(ctx, anime: str): title=title, url=response["siteUrl"], description="\n\n", - color=0x2B2D42, - timestamp=datetime.datetime.now().astimezone(), + color=colors.ANILIST, + timestamp=datetime.now().astimezone(), ) .add_field("Rating", response["meanScore"] or "NA") .add_field("Genres", ", ".join(response["genres"][:4])) @@ -764,16 +893,16 @@ async def _search_anime(ctx, anime: str): no_of_items, inline=True, ) + .add_field("Studio", response["studios"]["nodes"][0]["name"], inline=True) .add_field("Summary", response["description"]) .set_thumbnail(response["coverImage"]["large"]) .set_image(response["bannerImage"]) - # .set_author(url=response["siteUrl"], name=title) .set_footer( text="Source: AniList", icon="https://anilist.co/img/icons/android-chrome-512x512.png", ) ) - # await ctx.respond('built embed') + if response["trailer"]: if response["trailer"]["site"] == "youtube": trailer = f"https://{response['trailer']['site']}.com/watch?v={response['trailer']['id']}" @@ -789,10 +918,8 @@ async def _search_anime(ctx, anime: str): emoji2=hk.Emoji.parse("πŸ”"), ) ) - # ) - view.add_item(btns.KillButton(style=hk.ButtonStyle.SECONDARY, label="❌")) - # await ctx.respond('made view') + view.add_item(btns.KillButton()) await ctx.edit_last_response( embed=embed, components=view, @@ -816,12 +943,13 @@ async def _search_anime(ctx, anime: str): else: buttons = [btns.KillNavButton()] - navigator = views.CustomNavi( + navigator = views.AuthorNavi( pages=pages, buttons=buttons, timeout=180, user_id=ctx.author.id ) async def _search_manga(ctx, manga: str): + """Search a manga on AL and Preview on MD""" query = """ query ($id: Int, $search: String, $type: MediaType) { # Define which variables will be used in the query (id) Media (id: $id, search: $search, type: $type, sort: POPULARITY_DESC, format_in: [MANGA, ONE_SHOT]) { # Insert our variables into the query arguments (id) (type: ANIME is hard-coded in the query) @@ -867,7 +995,6 @@ async def _search_manga(ctx, manga: str): return response = (await response.json())["data"]["Media"] - print(response) title = response["title"]["english"] or response["title"]["romaji"] @@ -933,8 +1060,8 @@ async def _search_manga(ctx, manga: str): title=title, url=response["siteUrl"], description="\n\n", - color=0x2B2D42, - timestamp=datetime.datetime.now().astimezone(), + color=colors.ANILIST, + timestamp=datetime.now().astimezone(), ) .add_field("Rating", response["meanScore"] or "NA") .add_field("Genres", ", ".join(response["genres"][:4]) or "NA") @@ -951,7 +1078,6 @@ async def _search_manga(ctx, manga: str): text="Source: AniList", icon="https://anilist.co/img/icons/android-chrome-512x512.png", ) - # .set_author(url=response["siteUrl"], name=title) ] buttons = [btns.PreviewButton(), btns.KillNavButton()] @@ -973,7 +1099,6 @@ async def _search_manga(ctx, manga: str): ctx.channel_id, ) - # print("NavigatoID", navigator.message_id) ctx.bot.d.chapter_info[navigator.message_id] = [ base_url, data["first"]["id"], @@ -984,12 +1109,114 @@ async def _search_manga(ctx, manga: str): ] +async def _search_characters(ctx: lb.Context, query: str): + query = query.split(",") + if len(query) == 1 or query[1].strip() == "": + if query[0] in ["birth", "birthday", "bday"]: + try: + await ctx.respond( + embed=( + await ( + await ALCharacter.is_birthday(ctx.bot.d.aio_session) + ).make_embed() + ) + ) + except Exception as e: + await ctx.respond(e) + else: + await _search_character(ctx, character=query[0]) + return + + else: + # Make a character picker dropdown from the series + query_ = """ +query ($id: Int, $search: String) { # Define which variables will be used in the query (id) + Media (id: $id, search: $search) { + title { + english + romaji + } + characters { + nodes { + id + name { + full + + } + } + } + } + } + +""" + try: + variables = {"search": query[1]} + + response = await ctx.bot.d.aio_session.post( + "https://graphql.anilist.co", + json={"query": query_, "variables": variables}, + timeout=3, + ) + + if not response.ok: + await ctx.respond( + f"Failed to fetch data 😡, error `code: {response.status}`" + ) + return + + response = await response.json() + + view = views.AuthorView( + user_id=ctx.author.id, session=ctx.bot.d.aio_session + ) + + if query[0].strip() == "": + options = [] + title = ( + response["data"]["Media"]["title"]["english"] + or response["data"]["Media"]["title"]["romaji"] + ) + + for chara in response["data"]["Media"]["characters"]["nodes"]: + options.append( + miru.SelectOption( + label=chara["name"]["full"], value=chara["id"] + ) + ) + + view.add_item( + CharacterSelect( + options=options, placeholder=f"Select {title} character" + ) + ) + view.add_item(btns.KillButton()) + resp = await ctx.respond(content=None, components=view) + await view.start(resp) + await view.wait() + + else: + chara_choices = {} + + for chara in response["data"]["Media"]["characters"]["nodes"]: + chara_choices[chara["name"]["full"]] = chara["id"] + + closest_match, similarity_score = process.extractOne( + query[0], chara_choices.items() + ) + + await _search_character(ctx, id_=chara_choices[closest_match[0]]) + + except Exception as e: + await ctx.respond(f"Error {e}") + + async def _search_vn(ctx: lb.Context, query: str): + """Search a vn""" url = "https://api.vndb.org/kana/vn" headers = {"Content-Type": "application/json"} data = { "filters": ["search", "=", query], - "fields": "title, image.url, rating, released, length_minutes, description, tags.name", + "fields": "title, image.url, rating, released, length_minutes, description, tags.spoiler, tags.name", # "sort": "title" } try: @@ -997,52 +1224,62 @@ async def _search_vn(ctx: lb.Context, query: str): url, headers=headers, json=data, timeout=3 ) - print(req) if not req.ok: await ctx.respond("Couldn't find the VN you asked for.") return - print(await req.json()) req = await req.json() - print(list(tag["name"] for tag in req["results"][0]["tags"])[:3]) if req["results"][0]["description"]: description = parse_vndb_desciption(req["results"][0]["description"]) else: description = "NA" - view = views.CustomView(user_id=ctx.author.id) - view.add_item(btns.KillButton(style=hk.ButtonStyle.SECONDARY, label="❌")) + + if req["results"][0]["released"]: + date = req["results"][0]["released"].split("-") + try: + released = verbose_date(date[2], date[1], date[0]) + except Exception as e: + await ctx.respond(e) + else: + released = "Unreleased" + + tags = "NA" + + if req["results"][0]["tags"]: + tags = [] + for tag in req["results"][0]["tags"]: + if tag["spoiler"] == 0: + tags.append(tag["name"]) + + if len(tags) == 4: + break + + tags = ", ".join(tags) + + view = views.AuthorView(user_id=ctx.author.id) + view.add_item(btns.KillButton()) choice = await ctx.respond( hk.Embed( title=req["results"][0]["title"], url=f"https://vndb.org/{req['results'][0]['id']}", - color=0x948782, - timestamp=datetime.datetime.now().astimezone(), + color=colors.VNDB, + timestamp=datetime.now().astimezone(), ) .add_field("Rating", req["results"][0]["rating"] or "NA") - .add_field( - "Tags", - ", ".join(list(tag["name"] for tag in req["results"][0]["tags"])[:4]) - or "NA", - ) - .add_field( - "Released", req["results"][0]["released"] or "Unreleased", inline=True - ) + .add_field("Tags", tags) + .add_field("Released", released, inline=True) .add_field( "Est. Time", verbose_timedelta( - datetime.timedelta(minutes=req["results"][0]["length_minutes"]) + timedelta(minutes=req["results"][0]["length_minutes"]) ) - # f"{req['results'][0]['length_minutes']//60}h{req['results'][0]['length_minutes']%60}m" - if req["results"][0]["length_minutes"] else "NA", + if req["results"][0]["length_minutes"] + else "NA", inline=True, ) .add_field("Summary", description) .set_thumbnail(req["results"][0]["image"]["url"]) - # .set_author( - # name=req["results"][0]["title"], - # url=f"https://vndb.org/{req['results'][0]['id']}", - # ) .set_footer(text="Source: VNDB", icon="https://s.vndb.org/s/angel-bg.jpg"), components=view, ) @@ -1050,21 +1287,28 @@ async def _search_vn(ctx: lb.Context, query: str): await view.wait() if hasattr(view, "answer"): # Check if there is an answer - print(f"Received an answer! It is: {view.answer}") + pass else: await ctx.edit_last_response(components=[]) except Exception as e: - print(e, "\n\n\n") + await ctx.respond(e) -def replace_bbcode_with_markdown(match): +def replace_bbcode_with_markdown(match: re.Match) -> str: + """Make a MD string from a re Match object""" url = match.group(1) + + # Replacing VNDB ids with the corresponding url + if url[0] == "/": + url = "https://vndb.org" + url + link_text = match.group(2) markdown_link = f"[{link_text}]({url})" return markdown_link def parse_vndb_desciption(description: str) -> str: + """Parse a VNDB description into a Discord friendly Markdown""" description = ( description.replace("[spoiler]", "||") .replace("[/spoiler]", "||") @@ -1075,16 +1319,6 @@ def parse_vndb_desciption(description: str) -> str: .replace("[/i]", "") ) - print("\n\n\n", description, "\n\n\n") - - if "[url=/" in description: - print("ok") - description.replace("[url=/", "[url=https://vndb.org/") - if "[url=/" in description: - print("why ") - - print(description) - pattern = r"\[url=(.*?)\](.*?)\[/url\]" # Replace BBCode links with Markdown links in the text @@ -1102,62 +1336,91 @@ def parse_vndb_desciption(description: str) -> str: async def _search_vnchara(ctx: lb.Context, query: str): + """Search a vn character""" url = "https://api.vndb.org/kana/character" headers = {"Content-Type": "application/json"} data = { "filters": ["search", "=", query], - "fields": "name, description, age, sex, image.url, traits.name", + "fields": "name, description, age, sex, image.url, traits.name, traits.group_name", } req = await ctx.bot.d.aio_session.post(url, headers=headers, json=data, timeout=3) if not req.ok: - await ctx.respond("Couldn't find the tag you asked for.") + await ctx.respond("Couldn't find the character you asked for.") return req = await req.json() - if req["results"][0]["description"]: - description = parse_vndb_desciption(req["results"][0]["description"]) - else: - description = "NA" + try: + pages = collections.defaultdict(list) + options = [] - view = views.CustomView(user_id=ctx.author.id) - view.add_item(btns.KillButton(style=hk.ButtonStyle.SECONDARY, label="❌")) + for i, chara in enumerate(req["results"]): + if chara["description"]: + description = parse_vndb_desciption(chara["description"]) + else: + description = "NA" + + if chara["traits"]: + traits = {} + traits["body"] = [] + traits["personality"] = [] + + for trait in chara["traits"]: + if trait["group_name"] == "Personality": + traits["personality"].append(trait["name"]) + elif trait["group_name"] == "Body": + traits["body"].append(trait["name"]) + + traits = ( + f"_Body_: {', '.join(traits['body'][:5])}\n" + f"_Personality_: {', '.join(traits['personality'][:5])}" + ) + else: + traits = "NA" - choice = await ctx.respond( - hk.Embed( - title=req["results"][0]["name"], - url=f"https://vndb.org/{req['results'][0]['id']}", - color=0x948782, - timestamp=datetime.datetime.now().astimezone(), - ) - .add_field("Sex", req["results"][0]["sex"][0].upper() or "NA", inline=True) - .add_field("Age", req["results"][0]["age"] or "NA", inline=True) - .add_field( - "Traits", - ", ".join(list(trait["name"] for trait in req["results"][0]["traits"])[:4]) - or "NA", - ) - .add_field("Summary", description) - .set_thumbnail(req["results"][0]["image"]["url"]) - # .set_author( - # name=req["results"][0]["name"], - # url=f"https://vndb.org/{req['results'][0]['id']}", - # ) - .set_footer(text="Source: VNDB", icon="https://files.catbox.moe/3gg4nn.jpg"), - components=view, - ) - await view.start(choice) - await view.wait() + embed = [ + hk.Embed( + title=chara["name"], + url=f"https://vndb.org/{chara['id']}", + color=colors.VNDB, + timestamp=datetime.now().astimezone(), + ) + .add_field("Sex", chara["sex"][0].upper() or "NA", inline=True) + .add_field("Age", chara["age"] or "NA", inline=True) + .add_field("Traits", traits) + .add_field("Summary", description) + .set_thumbnail(chara["image"]["url"]) + .set_footer( + text="Source: VNDB", icon="https://files.catbox.moe/3gg4nn.jpg" + ), + ] - if hasattr(view, "answer"): # Check if there is an answer - print(f"Received an answer! It is: {view.answer}") - else: - await ctx.edit_last_response(components=[]) + if i == 0: + first_page = embed[0] + + options.append(miru.SelectOption(label=chara["name"], value=chara["name"])) + pages[chara["name"]] = embed[0] + + view = views.SelectView(user_id=ctx.author.id, pages=pages) + view.add_item(SimpleTextSelect(options=options, placeholder="Other characters")) + view.add_item(btns.KillButton()) + + resp = await ctx.respond(content=None, embed=first_page, components=view) + await view.start(resp) + await view.wait() + + except hk.BadRequestError: + view = views.AuthorView(user_id=ctx.author.id) + view.add_item(btns.KillButton()) + resp = await ctx.respond(embed=first_page, components=view) + await view.start(resp) + await view.wait() async def _search_vntag(ctx: lb.Context, query: str): + """Search a vn tag""" url = "https://api.vndb.org/kana/tag" headers = {"Content-Type": "application/json"} data = { @@ -1180,23 +1443,19 @@ async def _search_vntag(ctx: lb.Context, query: str): tags = ", ".join(req["results"][0]["aliases"]) or "NA" - view = views.CustomView(user_id=ctx.author.id) - view.add_item(btns.KillButton(style=hk.ButtonStyle.SECONDARY, label="❌")) + view = views.AuthorView(user_id=ctx.author.id) + view.add_item(btns.KillButton()) choice = await ctx.respond( hk.Embed( title=req["results"][0]["name"], url=f"https://vndb.org/{req['results'][0]['id']}", - color=0x948782, - timestamp=datetime.datetime.now().astimezone(), + color=colors.VNDB, + timestamp=datetime.now().astimezone(), ) .add_field("Aliases", tags) .add_field("Category", req["results"][0]["category"].upper(), inline=True) .add_field("No of VNs", req["results"][0]["vn_count"], inline=True) .add_field("Summary", description) - # .set_author( - # name=req["results"][0]["name"], - # url=f"https://vndb.org/{req['results'][0]['id']}", - # ) .set_footer(text="Source: VNDB", icon="https://files.catbox.moe/3gg4nn.jpg"), components=view, ) @@ -1204,12 +1463,13 @@ async def _search_vntag(ctx: lb.Context, query: str): await view.wait() if hasattr(view, "answer"): # Check if there is an answer - print(f"Received an answer! It is: {view.answer}") + pass else: await ctx.edit_last_response(components=[]) async def _search_vntrait(ctx: lb.Context, query: str): + """Search a vn character trait""" url = "https://api.vndb.org/kana/trait" headers = {"Content-Type": "application/json"} @@ -1225,8 +1485,6 @@ async def _search_vntrait(ctx: lb.Context, query: str): # "sort": "title" } - # print("Query is", query) - req = await ctx.bot.d.aio_session.post(url, headers=headers, json=data, timeout=3) if not req.ok: @@ -1242,23 +1500,19 @@ async def _search_vntrait(ctx: lb.Context, query: str): tags = ", ".join(req["results"][0]["aliases"][:5]) or "NA" - view = views.CustomView(user_id=ctx.author.id) - view.add_item(btns.KillButton(style=hk.ButtonStyle.SECONDARY, label="❌")) + view = views.AuthorView(user_id=ctx.author.id) + view.add_item(btns.KillButton()) choice = await ctx.respond( hk.Embed( title=req["results"][0]["name"], url=f"https://vndb.org/{req['results'][0]['id']}", - color=0x948782, - timestamp=datetime.datetime.now().astimezone(), + color=colors.VNDB, + timestamp=datetime.now().astimezone(), ) .add_field("Aliases", tags) .add_field("Group Name", req["results"][0]["group_name"], inline=True) .add_field("No of Characters", req["results"][0]["char_count"], inline=True) .add_field("Summary", description) - # .set_author( - # name=req["results"][0]["name"], - # url=f"https://vndb.org/{req['results'][0]['id']}", - # ) .set_footer(text="Source: VNDB", icon="https://files.catbox.moe/3gg4nn.jpg"), components=view, ) @@ -1266,7 +1520,7 @@ async def _search_vntrait(ctx: lb.Context, query: str): await view.wait() if hasattr(view, "answer"): # Check if there is an answer - print(f"Received an answer! It is: {view.answer}") + pass else: await ctx.edit_last_response(components=[]) @@ -1307,11 +1561,6 @@ async def al_link_finder(event: hk.GuildReactionAddEvent) -> None: print(e) if len(list_of_series) != 0: - # await channel.send("Beep, bop. AniList link found") - # await al_listener.bot.rest.edit_message( - # event.channel_id, event.message, flags=hk.MessageFlag.SUPPRESS_EMBEDS - # ) - for series in list_of_series: query = """ query ($id: Int, $search: String, $type: MediaType) { # Define which variables will be used in the query (id) @@ -1352,9 +1601,8 @@ async def al_link_finder(event: hk.GuildReactionAddEvent) -> None: timeout=10, ) if not response.ok: - print(response.json()) - await event.message.respond( - f"Nvm 😡, error `code: {response.status_code}`" + await event.message.send( + f"Couldn't find the AL entries details, `code: {response.status}`" ) return response = (await response.json())["data"]["Media"] @@ -1367,8 +1615,8 @@ async def al_link_finder(event: hk.GuildReactionAddEvent) -> None: content=f"Here are the details for the series requested here: {message.make_link(message.guild_id)}", embed=hk.Embed( description="\n\n", - color=0x2B2D42, - timestamp=datetime.datetime.now().astimezone(), + color=colors.ANILIST, + timestamp=datetime.now().astimezone(), ) .add_field("Rating", response["averageScore"]) .add_field("Genres", ",".join(response["genres"])) diff --git a/extensions/anime_plots.py b/extensions/anime_plots.py index 9422387..8cc7af2 100644 --- a/extensions/anime_plots.py +++ b/extensions/anime_plots.py @@ -1,15 +1,14 @@ """Make cool plot charts""" +import io + import hikari as hk import lightbulb as lb import plotly.graph_objects as go -import plotly.io as pio from plotly.subplots import make_subplots - -from functions.models import ColorPalette as colors from functions.fetch_trends import search_it - +from functions.models import ColorPalette as colors plot_plugin = lb.Plugin( "Plots", @@ -26,15 +25,15 @@ # Replace minus with context prefix 😾 ( "Plot the activity of a series at airtime or compare multiple. \n" - f"-> For example, doing `-plot bocchi the rock` will return the activity " + f"- For example, doing `-plot bocchi the rock` will return the activity " "of Bocchi the Rock series during its airtime. \n" - "-> To compare two series, you should seperate them with a 'vs' like so: \n" + "- To compare two series, you should seperate them with a 'vs' like so: \n" f"`-plot helck vs horimiya piece`\n" - "-> To compare series across seasons, add a --autoscale flag at the end for eg." + "- To compare series across seasons, add a --autoscale flag at the end for eg." f"`-plot bocchi vs kaguya --autoscale` \n\n" "Note: You should type out the full name of the series to avoid false matches" ) -) +) @lb.add_cooldown(15, 2, lb.ChannelBucket) @lb.add_cooldown(3, 1, lb.GuildBucket) @lb.option( @@ -55,17 +54,13 @@ async def compare_trends(ctx: lb.Context, query: str) -> None: try: query = query.split() - if query[-1] == "--autoscale": #Auto scale + if query[-1] == "--autoscale": # Auto scale autoscale = True query = " ".join(query[:-1]) else: autoscale = False query = " ".join(query) - - # except Exception as e: - # await ctx.respond(e) - series = query.split("vs") if not len(series) in [1, 2]: await ctx.respond("The command only works for one or two series.") @@ -78,7 +73,6 @@ async def compare_trends(ctx: lb.Context, query: str) -> None: if isinstance(data, int): await ctx.respond(f"An error occurred, `code: {data}` ") return - print(type(data)) # pio.renderers.default = "notebook" fig = make_subplots(specs=[[{"secondary_y": True}]]) @@ -122,17 +116,35 @@ async def compare_trends(ctx: lb.Context, query: str) -> None: data2 = await search_it(series[1], ctx.bot.d.aio_session) if autoscale: - gap = data["data"]["activity"]["dates"][0] - data2["data"]["activity"]["dates"][0] - if data["data"]["activity"]["dates"][0] > data2["data"]["activity"]["dates"][0]: - data["data"]["activity"]["dates"] = [item - gap for item in data["data"]["activity"]["dates"]] - data["data"]["episodes"]["dates"] = [item - gap for item in data["data"]["episodes"]["dates"]] - data["data"]["scores"]["dates"] = [item - gap for item in data["data"]["scores"]["dates"]] - + gap = ( + data["data"]["activity"]["dates"][0] + - data2["data"]["activity"]["dates"][0] + ) + if ( + data["data"]["activity"]["dates"][0] + > data2["data"]["activity"]["dates"][0] + ): + data["data"]["activity"]["dates"] = [ + item - gap for item in data["data"]["activity"]["dates"] + ] + data["data"]["episodes"]["dates"] = [ + item - gap for item in data["data"]["episodes"]["dates"] + ] + data["data"]["scores"]["dates"] = [ + item - gap for item in data["data"]["scores"]["dates"] + ] + else: - data2["data"]["activity"]["dates"] = [item + gap for item in data2["data"]["activity"]["dates"]] - data2["data"]["episodes"]["dates"] = [item + gap for item in data2["data"]["episodes"]["dates"]] - data2["data"]["scores"]["dates"] = [item + gap for item in data2["data"]["scores"]["dates"]] - + data2["data"]["activity"]["dates"] = [ + item + gap for item in data2["data"]["activity"]["dates"] + ] + data2["data"]["episodes"]["dates"] = [ + item + gap for item in data2["data"]["episodes"]["dates"] + ] + data2["data"]["scores"]["dates"] = [ + item + gap for item in data2["data"]["scores"]["dates"] + ] + # pio.renderers.default = "notebook" fig = make_subplots(specs=[[{"secondary_y": True}]]) fig.add_trace( @@ -200,7 +212,9 @@ async def compare_trends(ctx: lb.Context, query: str) -> None: yaxis_title="Trend Value", template="plotly_dark", ) - embed_title = f'Popularity Comparision: {data["name"]} vs {data2["name"]}' + embed_title = ( + f'Popularity Comparision: {data["name"]} vs {data2["name"]}' + ) except Exception as e: await ctx.respond(e) @@ -208,21 +222,12 @@ async def compare_trends(ctx: lb.Context, query: str) -> None: try: fig.update_yaxes(title_text="Score", secondary_y=True) - # await ctx.respond(hk.Embed(color=colors.ELECTRIC_BLUE).set_image("https://i.imgur.com/GinlBL8.gif")) - - fig.write_image(f"pictures/{query}.png") - msg = await ctx.respond(attachment=f"pictures/{query}.png") - await ctx.delete_last_response() - await ctx.respond( - # content=f"## {embed_title}", - # attachment=f"pictures/{query}.png" - embed=hk.Embed( - title=embed_title, color=colors.ELECTRIC_BLUE - ) - .set_image((await msg.message()).attachments[0]), - attachments=[], + embed=hk.Embed(title=embed_title, color=colors.ELECTRIC_BLUE).set_image( + hk.Bytes(io.BytesIO(fig.to_image(format="png")), f"{query}.png") + ), ) + except Exception as e: await ctx.respond(e) diff --git a/extensions/info.py b/extensions/info.py index 8592dd2..c82058d 100644 --- a/extensions/info.py +++ b/extensions/info.py @@ -1,4 +1,5 @@ """Get information about a role, server, user, bot etc.""" +import io import subprocess import typing as t from datetime import datetime @@ -7,9 +8,13 @@ import hikari as hk import lightbulb as lb import psutil +from fuzzywuzzy import process +from PIL import Image, ImageOps from functions.buttons import SwapButton -from functions.views import CustomNavi, CustomView +from functions.models import ColorPalette as colors +from functions.utils import is_image +from functions.views import AuthorNavi, AuthorView info_plugin = lb.Plugin("Utility", "Utility and info commands", include_datastore=True) info_plugin.d.help = True @@ -18,13 +23,20 @@ @info_plugin.command -@lb.set_help("Find the list of users in a role") @lb.option("role", "The role", modifier=lb.OptionModifier.CONSUME_REST) @lb.command("inrole", "List of users in role", pass_options=True) @lb.implements(lb.PrefixCommand) async def inrole_cmd( ctx: lb.Context, role: t.Union[hk.Role, hk.ScheduledEvent] ) -> None: + """List of members in a role or interested in an event + + Args: + ctx (lb.Context): Context for the command + role (t.Union[hk.Role, hk.ScheduledEvent]): The role/event + """ + + # Trying to convert it into an int (hk.SnowFlake) basically, if possible try: role = int(role) except ValueError: @@ -42,10 +54,20 @@ async def inrole_cmd( role = role_ break else: + guild_roles = {} for role_ in await ctx.bot.rest.fetch_roles(ctx.guild_id): - if role.lower() == role_.name.lower(): - role = role_ - break + guild_roles[role_.name] = role_ + + ans = process.extractOne(role, list(guild_roles.keys()), score_cutoff=91) + + # Unpacking the closest value, if it exists + if ans: + closest_role_match, _ = ans + + role = guild_roles[closest_role_match] + else: + await ctx.respond("No matching roles found") + return if isinstance(role, hk.Role): try: @@ -62,18 +84,15 @@ async def inrole_cmd( d2 += f"{member.username}\n" counter += 1 - - if counter == 0: await ctx.respond( hk.Embed( title=f"List of users in {role.name} role ({counter})", timestamp=datetime.now().astimezone(), - color=role.color or 0xFFFFFF - ) - .set_thumbnail(role.icon_url) + color=role.color or 0xFFFFFF, + ).set_thumbnail(role.icon_url) ) - + return mem_ids = [ @@ -83,18 +102,12 @@ async def inrole_cmd( d2.split("\n")[i : i + 20] for i in range(0, len(d2.split("\n")), 20) ] - mem_ids = [d1.split("\n")[i: i+20] for i in range(0, len(d1.split("\n")), 20)] - mem_names = [d2.split("\n")[i: i+20] for i in range(0, len(d2.split("\n")), 20)] - - - for i, item in enumerate(mem_ids): - pages.append( hk.Embed( - title=f"List of users in {role.name} role ({counter})", - timestamp=datetime.now().astimezone(), - color=role.color or 0xFFFFFF + title=f"List of users in {role.name} role ({counter})", + timestamp=datetime.now().astimezone(), + color=role.color or 0xFFFFFF, ) .set_thumbnail(role.icon_url) .add_field("UID", "\n".join(item), inline=True) @@ -105,11 +118,10 @@ async def inrole_cmd( await ctx.respond(pages[0]) return - for page in pages: ctx.author.send(page) - view = CustomNavi( + view = AuthorNavi( pages=pages, user_id=ctx.author.id, ) @@ -119,77 +131,215 @@ async def inrole_cmd( except Exception as e: await ctx.respond(e) return + else: + await ctx.respond("No matching roles found") + + +@info_plugin.command +@lb.add_checks(lb.has_guild_permissions(hk.Permissions.MANAGE_EVENTS) | lb.owner_only) +@lb.option("event", "The event", modifier=lb.OptionModifier.CONSUME_REST) +@lb.command( + "inevent", "Fetch the list of members interested in an event", pass_options=True +) +@lb.implements(lb.PrefixCommand) +async def inevent_cmd(ctx: lb.Context, event: t.Union[hk.ScheduledEvent, str]): + # try: + probable_event = event + try: - event_ = None - events = await ctx.bot.rest.fetch_scheduled_events(ctx.guild_id) - if isinstance(role, int): - for event in events: - if role == event.id: - event_ = event - break - else: - for event in events: - if role == event.name: - event_ = event - break + probable_event = int(probable_event) + except ValueError: + pass + + event_: t.Optional[hk.ScheduledEvent] = None + events = await ctx.bot.rest.fetch_scheduled_events(ctx.guild_id) + if isinstance(probable_event, int): + for event in events: + if probable_event == event.id: + event_ = event + break + else: - if not event_: - return - event_members = [] - await ctx.respond(f"{ctx.guild_id}, {event_.id}") - members = list( - await ctx.bot.rest.fetch_scheduled_event_users(ctx.guild_id, event_) + guild_events = {} + for event in await ctx.bot.rest.fetch_scheduled_events(ctx.guild_id): + guild_events[event.name] = event + + ans = process.extractOne( + probable_event, list(guild_events.keys()), score_cutoff=80 ) - for member in members: - if member.member: - event_members.append(member.member.username) - paginated_members = [ - event_members[i : i + 20] for i in range(0, len(event_members), 20) - ] - pages = [] + # Unpacking the closest value, if it exists + if ans: + closest_role_match, _ = ans + event_ = guild_events[closest_role_match] - if len(event_members) == 0: - await ctx.respond( - hk.Embed( + if not event_: + await ctx.respond("No matching events found") + return + + event_members = [] + members = list(await ctx.bot.rest.fetch_scheduled_event_users(ctx.guild_id, event_)) + for member in members: + if member.member: + event_members.append(member.member.username) + + paginated_members = [ + event_members[i : i + 20] for i in range(0, len(event_members), 20) + ] + pages = [] + + if len(event_members) == 0: + # await ctx.respond(event) + await ctx.respond( + hk.Embed( title=f"List of users interested in {event.name} ({len(event_members)})", timestamp=datetime.now().astimezone(), - color=0x43408A - ) - .set_image(event.image_url) + color=colors.DEFAULT, + ).set_image(event.image_url.url) + ) + + return + + for item in paginated_members: + pages.append( + hk.Embed( + title=f"List of users interested in {event.name} ({len(event_members)})", + timestamp=datetime.now().astimezone(), + color=colors.DEFAULT, + ) + .set_image(event.image_url.url) + .add_field("​", "\n".join(item)) + ) + + if len(pages) == 1: + + await ctx.respond(pages[0]) + return + + view = AuthorNavi( + pages=pages, + user_id=ctx.author.id, + ) + await view.send(ctx.channel_id) + + + +@info_plugin.command +@lb.add_checks( + lb.has_guild_permissions(hk.Permissions.MANAGE_EMOJIS_AND_STICKERS) | lb.guild_only +) +@lb.option( + "processor", + "Pre processors for the image", + required=False, + modifier=lb.OptionModifier.GREEDY, +) +@lb.option("emote", "The emote to add") +@lb.option("name", "Name of the emote to add") +@lb.command("addemote", "Add an emote to the server", aliases=["ae"], pass_options=True) +@lb.implements(lb.PrefixCommand) +async def add_emote( + ctx: lb.Context, + name: str, + emote: t.Union[hk.Emoji, str], + processor: t.Optional[t.List[str]], +) -> None: + try: + if len(name) < 2 or len(name) > 32: + await ctx.respond( + "Invalid emote name length. Must be between 2 and 32 characters" ) - return - for item in paginated_members: - pages.append( - hk.Embed( - title=f"List of users interested in {event.name} ({len(event_members)})", - timestamp=datetime.now().astimezone(), - color=0x43408A + try: + possible_emote = hk.Emoji.parse(emote) + except ValueError: + pass + + if isinstance(possible_emote, hk.CustomEmoji): + await ctx.respond("Adding...") + try: + emoji = await ctx.bot.rest.create_emoji( + ctx.guild_id, name=name, image=possible_emote ) - .set_image(event.image_url) - .add_field("​", "\n".join(item)) - ) + await ctx.respond(f"Added emote: {emoji.mention}") + except Exception as e: + await ctx.respond(f"Error: {e}") - if len(pages) == 1: - await ctx.respond(pages[0]) return - view = CustomNavi( - pages=pages, - user_id=ctx.author.id, - ) - await view.send(ctx.channel_id) + await ctx.respond(ctx.raw_options) + + image_type = await is_image( + emote, ctx.bot.d.aio_session + ) # 1 if PIL friendly image, 2 if not, 0 if not image + + if not image_type: + await ctx.respond("Invalid image url") + return + + elif image_type == 2: + try: + emoji = await ctx.bot.rest.create_emoji( + ctx.guild_id, name=name, image=emote + ) + await ctx.respond(f"Added emote: {emoji.mention}") + + except hk.RateLimitTooLongError: + await ctx.respond("Rate limit hit. Please try again shortly.") + + except hk.BadRequestError: + # Reason being server emotes full or invalid value + await ctx.respond("Can't add this emote") + + except hk.InternalServerError: + await ctx.respond("Discord went buggy oops") + + elif image_type == 1: + try: + async with ctx.bot.d.aio_session.get(emote, timeout=2) as resp: + img_bytes = await resp.read() + + ratio = len(img_bytes) / (1024 * 256) + await ctx.respond(ratio) + + if ratio > 1: + await ctx.respond( + "Image size possibly too large, attempting compression..." + ) + + im = Image.open(io.BytesIO(img_bytes)) + im = ImageOps.contain(im, (128, 128), Image.Resampling.LANCZOS) + + new_pixels = io.BytesIO() + im.save(new_pixels, format="PNG", optimize=True) + emote = new_pixels.getvalue() + # emote = io.BytesIO(im.resize((new_w, new_h)).tobytes()) + + else: + pass + + await ctx.respond(hk.Embed(title="Test").set_image(emote)) + emoji = await ctx.bot.rest.create_emoji( + ctx.guild_id, name=name, image=emote + ) + await ctx.respond(f"Added emote: {emoji.mention}") + + except hk.RateLimitTooLongError: + await ctx.respond("Rate limit hit. Please try again shortly.") + + except hk.BadRequestError: + # Reason being server emotes full or invalid value + await ctx.respond("Can't add this emote") + + except hk.InternalServerError: + await ctx.respond("Discord went buggy oops") except Exception as e: - await ctx.respond(e) - - if len(ctx.responses) == 0: - await ctx.respond(f"No role `{role}` found.") + await ctx.respond(f"Error: {e}") @info_plugin.command @@ -197,15 +347,21 @@ async def inrole_cmd( @lb.command("guilds", "See the servers the bot's in") @lb.implements(lb.PrefixCommand) async def guilds(ctx: lb.Context) -> None: + """Fetch a list of guilds the bot is in + + Args: + ctx (lb.Context): The context for the command + """ + pages = [] - buttons = [CustomPrevButton(), KillNavButton(), CustomNextButton()] + for gld in list([guild for guild in ctx.bot.cache.get_guilds_view().values()]): pages.append( hk.Embed( - color=0xF4EAE9, + color=colors.DAWN_PINK, title=f"Server: {gld.name}", description=f"Server ID: `{gld.id}`", - timestamp=datetime.datetime.now().astimezone(), + timestamp=datetime.now().astimezone(), ) .add_field("Owner", await gld.fetch_owner(), inline=True) .add_field( @@ -220,10 +376,69 @@ async def guilds(ctx: lb.Context) -> None: .set_image(gld.banner_url) ) - navigator = CustomNavi(pages=pages, buttons=buttons, user_id=ctx.author.id) + navigator = AuthorNavi(pages=pages, user_id=ctx.author.id) await navigator.send(ctx.channel_id) +@info_plugin.command +@lb.add_checks(lb.owner_only | lb.guild_only) +@lb.option("user", "The user duh", t.Optional[hk.Member], required=False) +@lb.command("userinfo", "Find information about a user", pass_options=True) +@lb.implements(lb.PrefixCommand) +async def user_info(ctx: lb.Context, user: hk.Member) -> None: + user = user or ctx.member + + try: + presence = user.get_presence() + + if presence.activities and len(presence.activities) != 0: + activity = f"{presence.activity.type} {presence.activity.name}" + else: + activity = presence.visible_status + + roles = (await user.fetch_roles())[1:] + + await ctx.respond( + hk.Embed( + title=f"User: {user.display_name}", + description=f"User ID: `{user.id}`", + colour=colors.DEFAULT, + timestamp=datetime.now().astimezone(), + ) + .set_footer( + text=f"Requested by {ctx.author.username}", + icon=ctx.author.display_avatar_url, + ) + .add_field( + "Bot?", + "Yes" if user.is_bot else "No", + inline=True, + ) + .add_field( + "Account Created", + f"", + inline=True, + ) + .add_field( + "Server Joined", + f"", + inline=True, + ) + .add_field( + "Roles", + ", ".join(r.mention for r in roles), + inline=False, + ) + .set_thumbnail(user.avatar_url) + .set_image(user.banner_url) + ) + + await ctx.respond(activity) + # await ctx.respond(f"{activity.type} {activity.name}") + except Exception as e: + await ctx.respond(e) + + @info_plugin.command @lb.add_cooldown(10, 1, lb.UserBucket) @lb.add_cooldown(15, 2, lb.ChannelBucket) @@ -251,13 +466,10 @@ async def botinfo(ctx: lb.Context) -> None: ) num_commits, _ = process.communicate() - try: - num_commits = int(num_commits) - except ValueError: - return + num_commits = int(num_commits) process = subprocess.Popen( - 'git log -5 --format=" %s"', + 'git log -5 --format=": %s"', text=True, shell=True, stdout=subprocess.PIPE, @@ -265,11 +477,17 @@ async def botinfo(ctx: lb.Context) -> None: ) changes, _ = process.communicate() + change_list: t.List[str] = [ + f"{i+1}. {item}" for i, item in enumerate(changes.split("\n")[:5]) + ] + + changes = "\n".join(change_list) + version = f"{floor(num_commits/1000)}.{floor(num_commits/100)}.{floor(num_commits/10)}" pages = [ hk.Embed( - color=0x43408A, + color=colors.DEFAULT, description="A multi-purpose discord bot \ written in hikari-py.", ) @@ -292,12 +510,12 @@ async def botinfo(ctx: lb.Context) -> None: .set_author(name=f"{user.username} Bot") .set_thumbnail(user.avatar_url) .set_footer(f"Made by: {data.owner}", icon=data.owner.avatar_url), - hk.Embed(description=changes, color=0x43408A).set_author( + hk.Embed(description=changes, color=colors.DEFAULT).set_author( name="Bot Changelog (Recent)" ), ] - view = CustomView(user_id=ctx.author.id) + view = AuthorView(user_id=ctx.author.id) view.add_item( SwapButton( label1="Changelogs", @@ -326,11 +544,16 @@ async def botinfo(ctx: lb.Context) -> None: @info_plugin.command @lb.add_checks( - lb.has_guild_permissions(hk.Permissions.MANAGE_EMOJIS_AND_STICKERS), + lb.has_guild_permissions(hk.Permissions.MANAGE_EMOJIS_AND_STICKERS) | lb.guild_only ) @lb.command("stickerinfo", "Get info about a sticker", aliases=["sticker"]) @lb.implements(lb.PrefixCommand) async def sticker_info(ctx: lb.Context) -> None: + """Fetch info about a sticker (guild or otherwise) + + Args: + ctx (lb.Context): The context (should have a sticker in the message) + """ resp_embed = [] if not ctx.event.message.stickers: await ctx.respond("No sticker in your message, I'm afraid") @@ -345,7 +568,7 @@ async def sticker_info(ctx: lb.Context) -> None: except hk.NotFoundError: resp_embed.append( hk.Embed( - color=0x000000, + color=colors.WARN, title=f"Sticker: {sticker.name}", timestamp=datetime.now().astimezone(), ) @@ -356,7 +579,7 @@ async def sticker_info(ctx: lb.Context) -> None: else: resp_embed.append( hk.Embed( - color=0x43408A, + color=colors.DEFAULT, title=f"Sticker: {sticker.name}", timestamp=datetime.now().astimezone(), ) @@ -367,7 +590,7 @@ async def sticker_info(ctx: lb.Context) -> None: inline=True, ) .add_field("Type", sticker.type, inline=True) - .add_field("Tag", sticker.tag) + .add_field("Tag", f":{sticker.tag}:") .set_thumbnail(ctx.get_guild().icon_url) .set_image(sticker.image_url) ) @@ -383,16 +606,23 @@ async def sticker_info(ctx: lb.Context) -> None: async def emote_info( ctx: lb.Context, ) -> None: + """Fetch basic info about upto 5 emotes + + Args: + ctx (lb.Context): The context in which the command is invoked + """ emotes = [] for word in ctx.event.message.content.split(" "): try: emote = hk.Emoji.parse(word) if isinstance(emote, hk.CustomEmoji): emotes.append(emote) - print(emote.url) + except: continue - print(emotes, "\n\n\n\n\n") + + emotes = list(set(emotes)) + if len(emotes) == 0: await ctx.respond("No emotes found") return @@ -412,6 +642,60 @@ async def emote_info( ) +@info_plugin.command +@lb.add_checks(lb.has_guild_permissions(hk.Permissions.ADMINISTRATOR) | lb.guild_only) +@lb.command("removesticker", "Remove a sticker", aliases=["rst"]) +@lb.implements(lb.PrefixCommand) +async def sticker_removal(ctx: lb.MessageContext): + # ctx.event.message + if not ctx.event.message.stickers: + await ctx.respond("No sticker in your message, I'm afraid") + return + + stickers = [] + sticker_partial = ctx.event.message.stickers[0] + + if isinstance(sticker_partial, hk.GuildSticker): + await ctx.respond("Dance") + + try: + sticker = await ctx.bot.rest.fetch_sticker(sticker_partial.id) + # await ctx.respond(dir(sticker)) + + # io.BytesIO(await ctx.bot.d.aio_session.get(sticker.image_url).read()) + + # await ctx.respond(sticker.image_url.url) + + sticker_image = await ctx.bot.d.aio_session.get( + sticker.image_url.url, timeout=2 + ) + + # await ctx.respond("can fetch sticker") + + await ctx.author.send( + hk.Embed( + title="STICKER REMOVAL NOTIFICATION", + color=colors.ERROR, + description=f"Sticker `{sticker.name}` removed from `{ctx.get_guild()}`", + timestamp=datetime.now().astimezone(), + ).set_image( + hk.Bytes( + io.BytesIO(await sticker_image.read()), + f"{sticker.name}_archive.png", + ) + ) + ) + + await ctx.bot.rest.delete_sticker(ctx.guild_id, sticker_partial) + await ctx.respond(f"Removed sticker: `{sticker.name}`") + + except hk.NotFoundError: + await ctx.respond("Sticker not present in the server") + + except hk.InternalServerError: + await ctx.respond("A hiccup from discord's side, please try again") + + @info_plugin.command @lb.add_checks( lb.bot_has_guild_permissions(hk.Permissions.MANAGE_EMOJIS_AND_STICKERS), @@ -427,8 +711,13 @@ async def emote_info( @lb.implements(lb.PrefixCommand) async def emote_removal( ctx: lb.Context, - # emotes: t.Sequence[hk.Emoji] ) -> None: + """Remove multiple emotes from a guild + + Args: + ctx (lb.Context): The context in which the command is invoked (should have the emotes in the message) + """ + words = ctx.event.message.content.split(" ") if len(words) == 1: await ctx.respond( @@ -441,6 +730,7 @@ async def emote_removal( ), ) ) + return emotes = [] for word in words: try: @@ -463,12 +753,24 @@ async def emote_removal( for emote in emotes[:5]: try: + # await ctx.author.send( + # content=( + # "## EMOTE REMOVAL NOTIFICATION\n" + # f"Emote `{emote.name}` removed from `{ctx.get_guild()}`" + # ), + # attachment=emote, + # ) await ctx.author.send( - content=( - "## EMOTE REMOVAL NOTIFICATION\n" - f"Emote `{emote.name}` removed from `{ctx.get_guild()}`" - ), - attachment=emote, + hk.Embed( + title="EMOTE REMOVAL NOTIFICATION", + color=colors.ERROR, + description=f"Emote `{emote.name}` removed from `{ctx.get_guild()}`", + timestamp=datetime.now().astimezone(), + ).set_image( + hk.Bytes( + io.BytesIO(await emote.read()), f"{emote.name}_archive.png" + ) + ) ) await ctx.bot.rest.delete_emoji(ctx.guild_id, emote) await ctx.respond(f"Removed emote: `{emote.name}`") diff --git a/extensions/prog_utils.py b/extensions/prog_utils.py index 8e823b5..52e9bd6 100644 --- a/extensions/prog_utils.py +++ b/extensions/prog_utils.py @@ -16,6 +16,12 @@ @compiler_plugin.command +@lb.set_help( + ( + "A function to return the output of your code. " + "\nUse print() to print whatever output you desire" + ) +) @lb.option( "code", "The code to test", str, modifier=lb.commands.OptionModifier.CONSUME_REST ) @@ -52,7 +58,7 @@ async def compiler(ctx: lb.Context, code: str) -> None: ["python3", "ntfc.py"], stdout=subprocess.PIPE, stderr=subprocess.PIPE ) as result: output, error = result.communicate(timeout=12) - print(output, error) + if error: await ctx.respond( f"Process returned with error: ```{(str(error, 'UTF-8')).split('ntfc.py')[1][3:]}```" diff --git a/extensions/reload.py b/extensions/reload.py index 5fa56c9..3e74de1 100644 --- a/extensions/reload.py +++ b/extensions/reload.py @@ -14,9 +14,10 @@ @lb.option( "extension", "The extension to reload", + choices=[i[13:-3].replace("/", ".") for i in glob.glob("./extensions/*.py")], ) @lb.command("reload", "Reload an extension", pass_options=True, aliases=["rl"]) -@lb.implements(lb.PrefixCommand) +@lb.implements(lb.PrefixCommand, lb.SlashCommand) async def reload_plugin(ctx: lb.Context, extension: str) -> None: """Reload an extension""" @@ -27,7 +28,7 @@ async def reload_plugin(ctx: lb.Context, extension: str) -> None: await ctx.respond("Reloaded all extensions") return except Exception as e: - print(e) + await ctx.respond(f"Error: `{e}`") ctx.bot.reload_extensions(f"extensions.{extension}") await ctx.respond("Extension reloaded successfully.") @@ -57,29 +58,6 @@ async def unload_plugin(ctx: lb.Context, extension: str) -> None: await ctx.respond("Extension unloaded successfully.") -@reload_plugin.set_error_handler -async def compile_error(event: lb.CommandErrorEvent) -> bool: - """Error handling""" - exception = event.exception.__cause__ or event.exception - - if isinstance(exception, lb.MissingRequiredPermission): - await event.context.respond("You're missing some perms there, bub.") - return True - - if isinstance(exception, lb.CommandIsOnCooldown): - await event.context.respond( - f"The command is on cooldown, you can use it after {int(exception.retry_after)}s", - delete_after=int(exception.retry_after), - ) - return True - - if isinstance(exception, lb.errors.NotEnoughArguments): - await event.context.respond("Please specify the extension name.") - return True - - return False - - def load(bot: lb.BotApp) -> None: """Load the plugin""" bot.add_plugin(reloader_plugin) diff --git a/extensions/sauce.py b/extensions/sauce.py index 8fd6e80..93dad83 100644 --- a/extensions/sauce.py +++ b/extensions/sauce.py @@ -7,11 +7,10 @@ import dotenv import hikari as hk import lightbulb as lb -import requests from functions.buttons import GenericButton, KillButton from functions.utils import * -from functions.views import CustomView +from functions.views import AuthorView dotenv.load_dotenv() @@ -26,7 +25,7 @@ @sauce_plugin.command -@lb.command("User pfp Sauce", "Sauce of user pfp") +@lb.command("User pfp Sauce", "Sauce of user pfp", auto_defer=True) @lb.implements(lb.UserCommand) async def pfp_sauce(ctx: lb.UserContext): """Find the sauce of a user's pfp @@ -73,7 +72,7 @@ async def pfp_sauce(ctx: lb.UserContext): @sauce_plugin.command -@lb.command("Find the Sauce", "Search the sauce of the image") +@lb.command("Find the Sauce", "Search the sauce of the image", auto_defer=True) @lb.implements(lb.MessageCommand) async def find_sauce_menu(ctx: lb.MessageContext): """Find the image from a message and then its sauce @@ -82,7 +81,6 @@ async def find_sauce_menu(ctx: lb.MessageContext): ctx (lb.MessageContext): The context the command is invoked in """ - url = await _find_the_url(ctx) if not url["errorMessage"]: @@ -157,12 +155,12 @@ async def find_sauce(ctx: lb.Context, link: str, service: str = None) -> None: service (str, optional): The service to choose. Defaults to None. """ - await ctx.respond(f"{get_random_quote()} {hk.Emoji.parse('')}") - + await ctx.respond( + f"{get_random_quote()} {hk.Emoji.parse('')}" + ) url = await _find_the_url(ctx) - if not url["errorMessage"]: link = url["url"] else: @@ -188,7 +186,6 @@ async def find_sauce(ctx: lb.Context, link: str, service: str = None) -> None: if float(data["header"]["similarity"]) < 55.0: view.add_item( GenericButton( - url=f"https://yandex.com/images/search?url={url['url']}&rpt=imageview", label="Search Yandex", ) @@ -200,7 +197,6 @@ async def find_sauce(ctx: lb.Context, link: str, service: str = None) -> None: await view.start(choice) await view.wait() except Exception as e: - # print(e, "\n\n\n") embed, view = await _simple_parsing(ctx, data) choice = view.add_item( KillButton(style=hk.ButtonStyle.SECONDARY, label="❌") @@ -221,8 +217,7 @@ async def find_sauce(ctx: lb.Context, link: str, service: str = None) -> None: sauce = f"[{res[0]['filename']}](https://anilist.co/anime/{res[0]['anilist']})" - print(res[0]["similarity"] * 100) - view = CustomView(user_id=ctx.author.id) + view = AuthorView(user_id=ctx.author.id) view.add_item(KillButton(style=hk.ButtonStyle.SECONDARY, label="❌")) choice = await ctx.edit_last_response( @@ -242,7 +237,7 @@ async def find_sauce(ctx: lb.Context, link: str, service: str = None) -> None: .set_author(name="Search results returned the follows: ") .set_footer( text="Powered by: Trace.Moe", - ) + ), ) await view.start(choice) await view.wait() @@ -250,47 +245,15 @@ async def find_sauce(ctx: lb.Context, link: str, service: str = None) -> None: await ctx.respond("Couldn't find it.") except Exception as e: await ctx.respond(f"Ran into an unknown exception: ```{e}```") - print(e) - - -@sauce_plugin.command -@lb.option( - "link", - "The link to check", -) -@lb.command("pingu", "Check if site alive", pass_options=True, hidden=True) -@lb.implements(lb.PrefixCommand) -async def pingu(ctx: lb.Context, link: str) -> None: - """A function to check if a site returns an OK status - - Args: - ctx (lb.Context): The context in which the command is invoked - link (str): The URL of the site - """ - - if not check_if_url(link): - await ctx.respond("That's... not a link <:AkanePoutColor:852847827826376736>") - return - - try: - if (await ctx.bot.d.aio_session.get(link, timeout=2)).ok: - await ctx.respond(f"The site `{link}` is up and running βœ…") - else: - await ctx.respond( - f"The site `{link}` is either down or has blocked the client ❌" - ) - except Exception as e: - await ctx.respond(e) async def _complex_parsing(ctx: lb.Context, data: dict): """A usually stable method to form an embed via sauce results""" sauce = "😡" - view = CustomView(user_id=ctx.author.id, timeout=10 * 60) + view = AuthorView(user_id=ctx.author.id, timeout=10 * 60) if "MangaDex" in data["header"]["index_name"]: - try: if "mal_id" in data["data"].keys(): view.add_item( @@ -308,8 +271,8 @@ async def _complex_parsing(ctx: lb.Context, data: dict): url=(await al_from_mal(name=data["data"]["source"]))["siteUrl"], ) ) - except Exception as e: - print(e) + except: + pass view.add_item( GenericButton( style=hk.ButtonStyle.LINK, @@ -333,9 +296,7 @@ async def _complex_parsing(ctx: lb.Context, data: dict): view, ) - elif "Anime" in data["header"]["index_name"]: - if len(data["data"]["ext_urls"]) > 1: view.add_item( GenericButton( @@ -356,7 +317,6 @@ async def _complex_parsing(ctx: lb.Context, data: dict): ) if res.ok: - res = await res.json() view.add_item( @@ -392,9 +352,7 @@ async def _complex_parsing(ctx: lb.Context, data: dict): view, ) - elif "Danbooru" in data["header"]["index_name"]: - view.add_item( GenericButton( style=hk.ButtonStyle.LINK, @@ -431,9 +389,7 @@ async def _complex_parsing(ctx: lb.Context, data: dict): view, ) - elif "Pixiv" in data["header"]["index_name"]: - view.add_item( GenericButton( style=hk.ButtonStyle.LINK, @@ -459,11 +415,8 @@ async def _complex_parsing(ctx: lb.Context, data: dict): ), view, ) - # except Exception as e: - # print(e) + elif "H-Misc (E-Hentai)" in data["header"]["index_name"]: - # view = CustomView(user_id=ctx.author.id) - # try: view.add_item( GenericButton( style=hk.ButtonStyle.LINK, @@ -554,7 +507,7 @@ async def _simple_parsing(ctx: lb.Context, data: dict): .set_footer( text="Powered by: SauceNAO", icon="https://i.imgur.com/2VRIEPR.png" ), - CustomView(user_id=ctx.author.id), + AuthorView(user_id=ctx.author.id), ) @@ -621,16 +574,6 @@ async def vndb_url(text: str) -> str: url_regex = re.compile(pattern) -# def _match_url(text): -# # Find the first occurrence of the pattern in the text -# match = url_regex.search(text) - -# if match: -# return match.group() -# else: -# return None - - async def _find_the_url(ctx) -> dict: """A function which finds the link (if it exists) across contexts @@ -656,7 +599,7 @@ async def _find_the_url(ctx) -> dict: } try: if _is_tenor_link(url): - # If the tenor scrapping fails, then it sends the original link back + # If the tenor scrapping fails, then it sends the original link back link = await tenor_link_from_gif(url, ctx.bot.d.aio_session) if link == url: return {"url": None, "errorMessage": "Unknown error"} @@ -700,9 +643,9 @@ async def _find_the_url(ctx) -> dict: "errorMessage": "There's nothing here to find the sauce of <:AkaneSip:1095068327786852453>", } - - if await is_image(ctx.options["target"].attachments[0].url, ctx.bot.d.aio_session): - + if await is_image( + ctx.options["target"].attachments[0].url, ctx.bot.d.aio_session + ): return { "url": ctx.options["target"].attachments[0].url, "errorMessage": None, @@ -711,8 +654,6 @@ async def _find_the_url(ctx) -> dict: return {"url": url, "errorMessage": errorMessage} - - def _is_tenor_link(link) -> bool: """Very barebones way to check if a confirmed link is a tenor gif""" if "//tenor.com/view" in link: diff --git a/extensions/seasonals.py b/extensions/seasonals.py index 88544d5..b6db846 100644 --- a/extensions/seasonals.py +++ b/extensions/seasonals.py @@ -1,14 +1,15 @@ -"""To make text or message logs""" +"""Seasonal anime releases feed""" import asyncio -import datetime import json import re +from datetime import datetime import hikari as hk import lightbulb as lb from dateutil import parser from functions.buttons import GenericButton, NewButton +from functions.models import ColorPalette as colors from functions.utils import rss2json from functions.views import PeristentViewTest @@ -18,7 +19,7 @@ aniupdates.d.help = False -async def get_magnet_for(query: str): +async def _get_magnet_for(query: str): magnet_feed = "https://subsplease.org/rss/?r=1080" items = rss2json(magnet_feed) items = json.loads(items) @@ -28,7 +29,7 @@ async def get_magnet_for(query: str): return i["link"] -async def get_anime_updates() -> list: +async def _get_anime_updates() -> list: link = "https://www.toptal.com/developers/feed2json/convert" # magnet_feed = "https://subsplease.org/rss/?r=1080" @@ -43,24 +44,16 @@ async def get_anime_updates() -> list: for i in items["feeds"]: item_dict = {} - # if not "1080" in i["title"]: - # continue - print(datetime.datetime.now(datetime.timezone.utc)) - print(i["title"]) - print(i["published"]) - - # try: if ( int( ( - datetime.datetime.now(datetime.timezone.utc) - - parser.parse(i["published"]) + datetime.now(datetime.timezone.utc) - parser.parse(i["published"]) ).total_seconds() ) > 720 ): - print("short") + # Series didn't release in the last 12 minutes, so continue to loop continue item_dict["timestamp"] = parser.parse(i["published"]) item_dict["link"] = i["link"] @@ -70,13 +63,11 @@ async def get_anime_updates() -> list: i["title"][13:-13].split("(")[0][:-6] ) if not item_dict["data"]: - print("Not on AL") + # Series not on AL, hence skipping it continue - print(item_dict) + updates.append(item_dict) - # except Exception as e: - # print(e) return updates @@ -85,31 +76,21 @@ async def on_starting(event: hk.StartedEvent) -> None: """Event fired on start of bot""" view = PeristentViewTest() await view.start() - # conn = sqlite3.connect("akane_db.db") - # cursor = conn.cursor() - # aniupdates.bot.d.con = conn - - # cursor.execute(''' - # CREATE TABLE IF NOT EXISTS aniupdates ( - # id INTEGER PRIMARY KEY AUTOINCREMENT, - # guild_channel INTEGER, - # ) - # ''') while True: print("Getting anime updates") - updates = await get_anime_updates() + updates = await _get_anime_updates() if not updates: print("\n\nNOTHING\n\n") - # print(updates) + for update in updates: view = PeristentViewTest() view.add_item( NewButton( style=hk.ButtonStyle.SECONDARY, - custom_id=f"{int(datetime.datetime.now().timestamp())}", + custom_id=f"{int(datetime.now().timestamp())}", emoji=hk.Emoji.parse("🧲"), - link=await get_magnet_for(update["file"]), + link=await _get_magnet_for(update["file"]), ) ) view.add_item( @@ -122,7 +103,6 @@ async def on_starting(event: hk.StartedEvent) -> None: view.add_item( GenericButton( style=hk.ButtonStyle.LINK, - # label = "Anilist", emoji=hk.Emoji.parse("<:anilist:1127683041372942376>"), url=update["data"]["siteUrl"], ) @@ -131,7 +111,7 @@ async def on_starting(event: hk.StartedEvent) -> None: check = await aniupdates.bot.rest.create_message( channel=channel, embed=hk.Embed( - color=0x7DF9FF, + color=colors.ELECTRIC_BLUE, description=update["file"][13:], timestamp=update["timestamp"], title=f"Episode {get_episode_number(update['file'])}: {update['data']['title']['romaji']} out", diff --git a/extensions/tasks_utils.py b/extensions/tasks_utils.py index 78d3571..7205eba 100644 --- a/extensions/tasks_utils.py +++ b/extensions/tasks_utils.py @@ -3,25 +3,25 @@ import glob import os import subprocess +import typing as t +from datetime import datetime import hikari as hk import lightbulb as lb +from fuzzywuzzy import process from lightbulb.ext import tasks -# from extensions.ping import ( -# CustomNavi, -# CustomNextButton, -# CustomPrevButton, -# KillNavButton, -# ) +from functions.models import ColorPalette as colors +from functions.utils import check_if_url task_plugin = lb.Plugin("Tasks", "Background processes", include_datastore=True) task_plugin.d.help = False -@tasks.task(d=1) -async def remove_lookup_data(): - task_plugin.bot.d.chapter_info = {} +@tasks.task(d=2) +async def clear_session_cache(): + """Clear the bot's request session cache""" + await task_plugin.bot.d.aio_session.delete_expired_responses() @tasks.task(d=10) @@ -34,67 +34,78 @@ async def clear_pic_files(): print("Cleared") -# @task_plugin.command -# @lb.add_checks(lb.owner_only) -# @lb.command("dir", "Upload media files from the cwd", hidden=True) -# @lb.implements(lb.PrefixCommand) -# async def directory(ctx: lb.Context) -> None: -# """Get media files from the directory, this to be used is if a command fails +@task_plugin.listener(hk.GuildMessageCreateEvent) +async def custom_commands(event: hk.GuildMessageCreateEvent) -> None: + """Listener to listen for fuzzy command matching -# Args: -# ctx (lb.Context): The event context (irrelevant to the user) -# """ - -# if not (guild := ctx.get_guild()): -# await ctx.respond("This command may only be used in servers.") -# return - -# embed = hk.Embed() -# view = miru.View() - -# if len(os.listdir("./pictures")) > 20: -# await ctx.respond("Too many items. Can't list") -# return - -# for i, item in enumerate(["./pictures"]): -# embed.add_field(f"`{i+1}.`", f"```ansi\n\u001b[0;35m{item} ```") -# view.add_item(GenericButton(style=hk.ButtonStyle.SECONDARY, label=str(item))) -# view.add_item(KillButton(style=hk.ButtonStyle.DANGER, label="❌")) + Args: + event (hk.GuildMessageCreateEvent): The event to listen for + """ -# choice = await ctx.respond(embed=embed, components=view) + if event.is_bot or not event.content: + return -# await view.start(choice) -# await view.wait() + app = task_plugin.bot + prefixes = await app.get_prefix(app, event.message) -# if not hasattr(view, "answer"): -# await ctx.edit_last_response("Process timed out", embeds=[], components=[]) -# return + ctx_prefix = None -# folder = view.answer + for prefix in prefixes: + if event.content.startswith(prefix): + ctx_prefix = prefix + break -# embed2 = hk.Embed() -# view2 = miru.View() + if not ctx_prefix: + return -# for i, item in enumerate(os.listdir(f"./{folder}")): -# embed2.add_field(f"`{i+1}.`", f"```ansi\n\u001b[0;35m{item} ```") + try: + commandish = event.content[len(ctx_prefix) :].split()[0] + except IndexError: # Executed if the message is only the prefix + + # The idea being that any prefix must be under 5 characters (this will be enforced) + prefixes_string = "\n".join(filter(lambda x: len(x) < 5, prefixes)) + + await app.rest.create_message( + event.channel_id, + embed=hk.Embed( + color=colors.ELECTRIC_BLUE, timestamp=datetime.now().astimezone() + ) + .add_field("Global Prefixes", f"```{prefixes_string}```") + .add_field("Server Prefixes", f"```ansi\n\u001b[0;30mComing Soon...```") + .add_field("Additional", "- Pinging the bot always works :)") + .set_author( + name="Akane Bot Prefix Configuration", icon=app.get_me().avatar_url + ), + ) + return -# view2.add_item(GenericButton(style=hk.ButtonStyle.SECONDARY, label=f"{i+1}")) -# view2.add_item(KillButton(style=hk.ButtonStyle.DANGER, label="❌")) -# # view. + async with app.rest.trigger_typing(event.channel_id): + prefix_commands_and_aliases = [ + command[0] for command in app.prefix_commands.items() + ] -# choice = await ctx.edit_last_response(embed=embed2, components=view2) + if commandish in prefix_commands_and_aliases: + pass + else: + close_matches: t.Optional[t.Tuple[str, int]] = process.extractBests( + commandish, prefix_commands_and_aliases, score_cutoff=60, limit=3 + ) -# await view2.start(choice) -# await view2.wait() + possible_commands: t.Sequence = [] -# if hasattr(view2, "answer"): # Check if there is an answer -# await ctx.edit_last_response(content="Here it is.", embeds=[], components=[]) -# filez = os.listdir(f"./{folder}")[int(view2.answer) - 1] -# else: -# await ctx.edit_last_response("Process timed out.", embeds=[], components=[]) -# return + if close_matches: + possible_commands = [i for i, _ in close_matches] + else: + possible_commands = [" "] -# await ctx.respond(attachment=f"{folder}/{filez}") + await app.rest.create_message( + event.channel_id, + ( + f"No command with the name `{commandish}` could be found. " + f"Did you mean: `{'` or `'.join(possible_commands)}`" + ), + ) + return @task_plugin.command @@ -106,7 +117,7 @@ async def update_code(ctx: lb.Context) -> None: ["git", "pull"], stdout=subprocess.PIPE, stderr=subprocess.PIPE ) as result: output, error = result.communicate(timeout=12) - print(output, error) + if error: await ctx.respond( f"Process returned with error: ```{(str(error, 'UTF-8'))}```" @@ -117,7 +128,57 @@ async def update_code(ctx: lb.Context) -> None: await ctx.edit_last_response("Restarting the bot...") - os.kill(os.getpid()) + await ctx.bot.close() + + # try: + # os.kill(os.getpid(), signal.SIGTERM) + # except Exception as e: + # await ctx.respond(e) + + +@task_plugin.command +@lb.option("color", "The colour to embed", hk.Color) +@lb.command("embed", "Make embed of a color", pass_options=True, hidden=True) +@lb.implements(lb.PrefixCommand) +async def embed_color(ctx: lb.Context, color: hk.Color) -> None: + await ctx.respond( + embed=hk.Embed( + color=color, + title="Test Embed", + description="Testing the appropriate colours for embed", + timestamp=datetime.now().astimezone(), + ) + ) + + +@task_plugin.command +@lb.option( + "link", + "The link to check", +) +@lb.command("pingu", "Check if site alive", pass_options=True, hidden=True) +@lb.implements(lb.PrefixCommand) +async def pingu(ctx: lb.Context, link: str) -> None: + """A function to check if a site returns an OK status + + Args: + ctx (lb.Context): The context in which the command is invoked + link (str): The URL of the site + """ + + if not check_if_url(link): + await ctx.respond("That's... not a link <:AkanePoutColor:852847827826376736>") + return + + try: + if (await ctx.bot.d.aio_session.get(link, timeout=2)).ok: + await ctx.respond(f"The site `{link}` is up and running βœ…") + else: + await ctx.respond( + f"The site `{link}` is either down or has blocked the client ❌" + ) + except Exception as e: + await ctx.respond(f"Hit an exception: `{e}`") @task_plugin.command @@ -129,7 +190,7 @@ async def update_code(ctx: lb.Context) -> None: ["git", "pull"], stdout=subprocess.PIPE, stderr=subprocess.PIPE ) as result: output, error = result.communicate(timeout=12) - print(output, error) + if error: await ctx.respond( f"Process returned with error: ```{(str(error, 'UTF-8'))}```" @@ -143,7 +204,7 @@ async def update_code(ctx: lb.Context) -> None: @task_plugin.command @lb.add_checks(lb.owner_only) -@lb.command("shutdown", "shut down") +@lb.command("shutdown", "shut down", aliases=["kms"]) @lb.implements(lb.PrefixCommand) async def guilds(ctx: lb.Context) -> None: with open("ded.txt", "w+", encoding="UTF-8") as ded: @@ -219,37 +280,27 @@ async def prefix_invocation(event: lb.CommandInvocationEvent) -> None: @lb.command("stats", "Bot usage stats") @lb.implements(lb.PrefixCommand) async def bot_stats(ctx: lb.Context) -> None: - try: - conn = task_plugin.bot.d.con - cursor = conn.cursor() - cursor.execute("SELECT command, usage FROM botstats") - result = cursor.fetchall() - - command, usage = ("```", "```") - - for item in result: - command += item[0] - command += "\n" - usage += str(item[1]) - usage += "\n" - - command += "```" - usage += "```" - - await ctx.respond( - embed=hk.Embed(title="Bot Usage Stats") - .add_field("Command", command, inline=True) - .add_field("Usage", usage, inline=True) - ) + conn = task_plugin.bot.d.con + cursor = conn.cursor() + cursor.execute("SELECT command, usage FROM botstats") + result = cursor.fetchall() - except Exception as e: - print(e) + command, usage = ("```", "```") - # if result is None: + for item in result: + command += item[0] + command += "\n" + usage += str(item[1]) + usage += "\n" - # cursor.execute("INSERT INTO botstats (command, usage) VALUES (?, 1)", (command,)) + command += "```" + usage += "```" - # conn.commit() + await ctx.respond( + embed=hk.Embed(title="Bot Usage Stats") + .add_field("Command", command, inline=True) + .add_field("Usage", usage, inline=True) + ) def load(bot: lb.BotApp) -> None: diff --git a/extensions/yt.py b/extensions/yt.py index 88f551d..347eff8 100644 --- a/extensions/yt.py +++ b/extensions/yt.py @@ -4,17 +4,14 @@ import dotenv import hikari as hk import lightbulb as lb -import requests from functions.buttons import GenericButton, KillButton from functions.models import YTVideo - -# from extensions.ping import CustomView, GenericButton, KillButton -from functions.views import CustomView +from functions.views import AuthorView dotenv.load_dotenv() -YT_KEY = os.environ["YT_KEY"] +YT_KEY = os.getenv("YT_KEY") yt_plugin = lb.Plugin("YouTube", "Search and get songs", include_datastore=True) @@ -46,7 +43,6 @@ async def youtube_search(ctx: lb.Context, query: str) -> None: await ctx.respond("This command may only be used in servers.") return try: - req = requests.Session() response_params = { "part": "snippet", "maxResults": "6", @@ -55,10 +51,12 @@ async def youtube_search(ctx: lb.Context, query: str) -> None: "key": YT_KEY, } - response = req.get( - " https://youtube.googleapis.com/youtube/v3/search", params=response_params + response = await ctx.bot.d.aio_session.get( + "https://youtube.googleapis.com/youtube/v3/search", + params=response_params, + timeout=2, ) - # print(type(response.json())) + if not response.ok: await ctx.respond(f"Error occurred 😡, code `{response.status_code}`") return @@ -66,10 +64,10 @@ async def youtube_search(ctx: lb.Context, query: str) -> None: embed = hk.Embed() lst_vids = [] embed.set_footer(f"Requested by: {ctx.author}", icon=ctx.author.avatar_url) - view = CustomView(user_id=ctx.author.id) + view = AuthorView(user_id=ctx.author.id) for i in range(5): - qvideo = YTVideo(response.json(), i) - qvideo.set_duration(req) + qvideo = YTVideo(await response.json(), i) + await qvideo.set_duration(ctx.bot.d.aio_session) embed.add_field( f"`{i+1}.`", ( @@ -82,24 +80,21 @@ async def youtube_search(ctx: lb.Context, query: str) -> None: view.add_item(GenericButton(style=hk.ButtonStyle.SECONDARY, label=f"{i+1}")) view.add_item(KillButton(style=hk.ButtonStyle.DANGER, label="❌")) - # view.add_item(NoButton(style=hk.ButtonStyle.DANGER, label="No")) choice = await ctx.respond(embed=embed, components=view) - # vid_index = choice - 1 await view.start(choice) await view.wait() - # view.from_message(message) + if hasattr(view, "answer"): # Check if there is an answer await ctx.edit_last_response( f"Video link: {lst_vids[int(view.answer)-1].get_link()}", embeds=[], - # flags=hk.MessageFlag.SUPPRESS_EMBEDS, components=[], ) else: await ctx.edit_last_response("Process timed out.", embeds=[], views=[]) return except Exception as e: - print(e) + await ctx.respond(e) def load(bot: lb.BotApp) -> None: diff --git a/functions/buttons.py b/functions/buttons.py index 7a4b8b3..330b64b 100644 --- a/functions/buttons.py +++ b/functions/buttons.py @@ -1,22 +1,21 @@ """Custom Button classes""" +import io import typing as t import hikari as hk import miru - -# import requests -# import requests_cache from miru.ext import nav -# from bs4 import BeautifulSoup - -# requests_cache.install_cache( -# "my_cache", expire_after=3600 -# ) # Cache expires after 1 hour (3600 seconds) +# from bs4 import BeautifulSoup +async def poor_mans_proxy(link: str, session): + resp = await session.get(link, timeout=2) + return io.BytesIO(await resp.read()) async def preview_maker(base_url, data_id, title, manga_id, cover, session): + """A preview maker function for the manga previews""" + req = await session.get(f"{base_url}/at-home/server/{data_id}", timeout=10) if not req.ok: @@ -33,7 +32,8 @@ async def preview_maker(base_url, data_id, title, manga_id, cover, session): except Exception as e: print("ERra\n\n\n", e) - for page in r_json["chapter"]["data"]: + for page in r_json["chapter"]["data"][:5]: + # Proxy the first five at first pages.append( hk.Embed( title=title, @@ -41,7 +41,10 @@ async def preview_maker(base_url, data_id, title, manga_id, cover, session): url=f"https://mangadex.org/title/{manga_id}", ) .set_image( - hk.URL(f"{r_json['baseUrl']}/data/{r_json['chapter']['hash']}/{page}") + await poor_mans_proxy( + f"{r_json['baseUrl']}/data/{r_json['chapter']['hash']}/{page}", + session, + ) ) .set_footer( "Fetched via: MangaDex", @@ -64,7 +67,7 @@ async def preview_maker(base_url, data_id, title, manga_id, cover, session): class GenericButton(miru.Button): - """A custom next general class""" + """A general button class""" # Let's leave our arguments dynamic this time, instead of hard-coding them def __init__(self, *args, **kwargs) -> None: @@ -76,9 +79,8 @@ async def callback(self, ctx: miru.ViewContext) -> None: class KillNavButton(nav.NavButton): - """A custom next kill class""" + """A custom navigator kill button class""" - # Let's leave our arguments dynamic this time, instead of hard-coding them def __init__( self, *, @@ -100,7 +102,7 @@ async def before_page_change(self) -> None: class CustomPrevButton(nav.NavButton): - """A custom previous button class""" + """A custom previous button class to make a rotating navigator""" def __init__( self, @@ -130,7 +132,7 @@ async def before_page_change(self) -> None: class CustomNextButton(nav.NavButton): - """A custom next button class""" + """A custom next button class to make a rotating navigator""" def __init__( self, @@ -158,7 +160,7 @@ async def before_page_change(self) -> None: ... -class NavLinkButton(nav.NavButton): +class NavButton(nav.NavButton): """A custom next button class""" def __init__( @@ -183,7 +185,7 @@ async def before_page_change(self) -> None: class PreviewButton(nav.NavButton): - """A custom next button class""" + """A custom button for the manga preview""" def __init__( self, @@ -204,7 +206,7 @@ async def callback(self, ctx: miru.ViewContext): if self.label == "πŸ”": self.label = "Preview" self.emoji = hk.Emoji.parse("") - print(self.view.children) + for item in self.view.children: if not item == self: self.view.remove_item(item) @@ -230,7 +232,7 @@ async def callback(self, ctx: miru.ViewContext): # await self.view.swap_pages( # ctx, ) # ) - except: + except Exception as e: await ctx.respond( ( f"Looks like MangaDex doesn't have this series " @@ -326,17 +328,28 @@ async def before_page_change(self) -> None: class KillButton(miru.Button): - """A custom next kill class""" + """A custom kill button class""" - # Leaving our arguments dynamic, instead of hard-coding them - def __init__(self, *args, **kwargs) -> None: - super().__init__(*args, **kwargs) + def __init__( + self, + *, + style: t.Union[hk.ButtonStyle, int] = hk.ButtonStyle.SECONDARY, + label: t.Optional[str] = "❌", + custom_id: t.Optional[str] = None, + emoji: t.Optional[t.Union[hk.Emoji, str]] = None, + row: t.Optional[int] = None, + ): + super().__init__( + style=style, label=label, custom_id=custom_id, emoji=emoji, row=row + ) async def callback(self, ctx: miru.ViewContext) -> None: await self.view.message.delete() class NewButton(miru.Button): + """A spitter button for the releases feed""" + def __init__( self, style: t.Union[hk.ButtonStyle, int] = hk.ButtonStyle.SECONDARY, @@ -347,7 +360,6 @@ def __init__( ) -> None: self.link = link super().__init__(style=style, label=label, emoji=emoji, custom_id=custom_id) - print(self.link) async def callback(self, ctx: miru.ViewContext) -> None: try: @@ -357,7 +369,7 @@ async def callback(self, ctx: miru.ViewContext) -> None: class SwapButton(miru.Button): - """A custom next button class""" + """A button to switch between a two-paged view""" def __init__( self, @@ -385,22 +397,14 @@ def __init__( ) async def callback(self, ctx: miru.ViewContext): - # if not ctx.author.id == self.view.user_id: - # await ctx.respond( - # ( - # "You can't interact with this button as " - # "you are not the invoker of the command." - # ), - # flags=hk.MessageFlag.EPHEMERAL, - # ) - # return + if self.emoji == self.emoji1: if self.label2 or self.emoji2: self.label = self.label2 self.emoji = self.emoji2 - # await ctx.respond("Page 1 -> 2") + # Page 1 -> 2 if isinstance(self.swap_page, str): await ctx.edit_response( @@ -410,13 +414,12 @@ async def callback(self, ctx: miru.ViewContext): await ctx.edit_response( content=None, embeds=[self.swap_page], components=self.view ) - # await self.view.swap_pages(ctx, self.other_page) + return - # await ctx.respond("Page 2 -> 1") - # await ctx.respond(self.emoji1) - # await ctx.respond(self.emoji) + # Page 2 -> 1 + self.label = self.label1 self.emoji = self.emoji1 diff --git a/functions/components.py b/functions/components.py index fbb5bd1..f8907c8 100644 --- a/functions/components.py +++ b/functions/components.py @@ -1,10 +1,15 @@ """Misc component classes""" import typing as t +import hikari as hk import miru +from functions.models import ALCharacter + + +class SimpleTextSelect(miru.TextSelect): + """A simple text select which switches between a pages dictionary based on user choice""" -class HelpTextSelect(miru.TextSelect): def __init__( self, *, @@ -14,7 +19,7 @@ def __init__( min_values: int = 1, max_values: int = 1, disabled: bool = False, - row: int | None = None + row: int | None = None, ) -> None: super().__init__( options=options, @@ -30,3 +35,18 @@ async def callback(self, ctx: miru.ViewContext) -> None: # self.view.answer = self.values[0] # try: await ctx.edit_response(embeds=[self.view.pages[self.values[0]]]) + + +class CharacterSelect(miru.TextSelect): + """A text select made for the character command's dropdown""" + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + + async def callback(self, ctx: miru.ViewContext) -> None: + try: + chara = await ALCharacter.from_id(self.values[0], self.view.session) + await ctx.edit_response(embeds=[await chara.make_embed()]) + + except Exception as e: + await ctx.respond(content=f"Error: {e}", flags=hk.MessageFlag.EPHEMERAL) diff --git a/functions/fetch_trends.py b/functions/fetch_trends.py index 94aea4d..6eeb777 100644 --- a/functions/fetch_trends.py +++ b/functions/fetch_trends.py @@ -1,15 +1,11 @@ """The extension which fetches the AL data for the plot function""" -import datetime +from datetime import datetime, timedelta from operator import itemgetter -import requests -import requests_cache - -requests_cache.install_cache("my_cache", expire_after=3600) - async def search_it(search_query: str, session) -> dict | int: """Search for the anime""" + # Here we define our query as a multi-line string query = """ query ($id: Int, $search: String) { @@ -52,7 +48,7 @@ async def search_it(search_query: str, session) -> dict | int: data = (await response.json())["data"]["Media"] al_id = data["id"] name = data["title"]["english"] or data["title"]["romaji"] - lower_limit = datetime.datetime( + lower_limit = datetime( data["startDate"]["year"], data["startDate"]["month"], data["startDate"]["day"], @@ -60,32 +56,26 @@ async def search_it(search_query: str, session) -> dict | int: 0, ) - if datetime.datetime.now() < lower_limit: + if datetime.now() < lower_limit: print("Unaired stuff sir") - lower_limit = lower_limit - datetime.timedelta(days=7) + lower_limit = lower_limit - timedelta(days=7) if data["endDate"]["year"]: - upper_limit = datetime.datetime( + upper_limit = datetime( data["endDate"]["year"], data["endDate"]["month"], data["endDate"]["day"], 0, 0, - ) + datetime.timedelta(days=7) + ) + timedelta(days=7) else: - upper_limit = datetime.datetime.now() + upper_limit = datetime.now() else: print((await response.json())["errors"]) return response.status - # except Exception as e: - # print("\n\n\n\n\n\n") - # print(e) - # print(name, "\n\n\n\n") - # return name - - """Fetching the trend values """ - # req = requests.Session() - # id = input("Enter id. ") + + # Fetching the trend values + trend_score = [] flag = True counter = 1 @@ -118,11 +108,12 @@ async def search_it(search_query: str, session) -> dict | int: } response = await session.post( - "https://graphql.anilist.co", json={"query": query, "variables": variables}, timeout=2 + "https://graphql.anilist.co", + json={"query": query, "variables": variables}, + timeout=2, ) if response.ok: - # print(response.json()) if not (await response.json())["data"]["Page"]["pageInfo"]["hasNextPage"]: flag = False else: @@ -131,7 +122,6 @@ async def search_it(search_query: str, session) -> dict | int: for item in (await response.json())["data"]["Page"]["mediaTrends"]: trend_score.append(item) else: - # print("ERROR") print((await response.json())["errors"]) return response.status @@ -151,10 +141,10 @@ async def search_it(search_query: str, session) -> dict | int: for value in sorted(episode_entries, key=itemgetter("date")): trends2.append(value["trending"]) - dates2.append(datetime.datetime.fromtimestamp(value["date"])) + dates2.append(datetime.fromtimestamp(value["date"])) for value in sorted(trend_score, key=itemgetter("date")): - dates.append(datetime.datetime.fromtimestamp(value["date"])) + dates.append(datetime.fromtimestamp(value["date"])) trends.append(value["trending"]) if value["averageScore"]: scores.append(value["averageScore"]) @@ -164,18 +154,9 @@ async def search_it(search_query: str, session) -> dict | int: return { "name": name, "data": { - "activity": { - "dates": dates, - "values": trends - }, - "episodes": { - "dates": dates2, - "values": trends2 - }, - "scores": { - "dates": dates[-len(scores) :], - "values": scores - } + "activity": {"dates": dates, "values": trends}, + "episodes": {"dates": dates2, "values": trends2}, + "scores": {"dates": dates[-len(scores) :], "values": scores}, }, # [dates, trends, dates2, trends2, dates[-len(scores) :], scores], } diff --git a/functions/help.py b/functions/help.py index 7ad9462..5ad04fd 100644 --- a/functions/help.py +++ b/functions/help.py @@ -11,83 +11,9 @@ from lightbulb import errors, plugins from lightbulb.help_command import BaseHelpCommand -from functions.components import HelpTextSelect +from functions.components import SimpleTextSelect from functions.views import SelectView -# except Exception as e: -# await ctx.respond(e) -# await ctx.respond("Test") - - -# async def get_text(self, select: miru.TextSelect, ctx: miru.Context) -> None: -# """Create the selection menu""" -# print(select) -# await ctx.respond("I exist") -# self.answer = select.values[0] - -# async def callback(self, ctx: miru.ViewContext): -# await ctx.respond("this that badabing badabong") - - -# class AnimalView(miru.View): -# """The view class for the animals command""" - -# def __init__(self, author: hk.User) -> None: -# self.author = author -# super().__init__(timeout=60*5) - -# @miru.text_select( -# # custom_id="animal_select", -# placeholder="Choose The Plugin", -# options=[ -# miru.SelectOption("Dog", value="dog", emoji="🐢"), -# miru.SelectOption("Bird", value="bird", emoji="🐦"), -# miru.SelectOption("Koala", value="koala", emoji="🐨"), -# miru.SelectOption("Panda", value="panda", emoji="🐼"), -# miru.SelectOption("Cat", value="cat", emoji="🐱"), -# miru.SelectOption("Racoon", value="racoon", emoji="🦝"), -# miru.SelectOption( -# "Red Panda", -# value="red_panda", -# emoji=hk.Emoji.parse("<:RedPanda:1060649685934674001>"), -# ), -# ], -# ) -# async def select_menu(self, select: miru.TextSelect, ctx: miru.Context) -> None: -# """Create the selection menu""" -# print(select) -# animal = select.values[0] -# async with ctx.bot.d.aio_session.get( -# f"https://some-random-api.ml/animal/{animal}" -# ) as res: -# if res.ok: -# res = await res.json() - -# await ctx.edit_response( -# f"Here's a {animal.replace('_', ' ')} for you!!", -# components=[], -# embed=hk.Embed( -# title="", -# description=res["fact"], -# color=0xF4EAE9, -# timestamp=datetime.now().astimezone(), -# ) -# .set_image(res["image"]) -# .set_footer( -# f"Requested by: {ctx.author}", icon=ctx.author.avatar_url -# ), -# ) -# else: -# await ctx.edit_response( -# f"API error, `code:{res.status}`", components=[] -# ) - -# async def on_timeout(self) -> None: -# await self.message.edit("Timed out", components=[]) - -# async def view_check(self, ctx: miru.Context) -> bool: -# return ctx.user.id == self.author.id - async def filter_commands( cmds: t.Sequence[commands.base.Command], ctx: context_.base.Context @@ -144,14 +70,7 @@ def _add_cmds_to_plugin_pages( async def send_bot_help(self, ctx: context_.base.Context) -> None: pages = {} - # lines = [ - # ">>> ```adoc", - # "Akane Bot Help Menu", - # "", - # f"For more information: {context.prefix}help [command|category]", - # "", - # "==== Categories ====", - # ] + try: main_embed = ( hk.Embed( @@ -168,7 +87,6 @@ async def send_bot_help(self, ctx: context_.base.Context) -> None: ) .set_image("https://i.imgur.com/LJ1t4wD.png") ) - # import os p_commands = await self._get_command_plugin_map( self.app._prefix_commands, ctx @@ -176,25 +94,21 @@ async def send_bot_help(self, ctx: context_.base.Context) -> None: s_commands = await self._get_command_plugin_map( self.app._slash_commands, ctx ) - # m_commands = await self._get_command_plugin_map(self.app._message_commands, context) - # u_commands = await self._get_command_plugin_map(self.app._user_commands, context) plugin_pages: t.MutableMapping[ t.Optional[plugins.Plugin], t.List[str] ] = collections.defaultdict(list) self._add_cmds_to_plugin_pages(plugin_pages, p_commands, "Prefix") self._add_cmds_to_plugin_pages(plugin_pages, s_commands, "Slash") - # self._add_cmds_to_plugin_pages(plugin_pages, m_commands, "Message") - # self._add_cmds_to_plugin_pages(plugin_pages, u_commands, "User") for plugin, page in plugin_pages.items(): if not plugin: continue if not plugin.d.help == True: continue - # if plugin: + main_embed.add_field(plugin.name, plugin.description) - # """Start of tragedy""" + p_cmds, s_cmds = [], [] all_commands = await filter_commands(plugin._all_commands, ctx) for cmd in all_commands: @@ -202,17 +116,12 @@ async def send_bot_help(self, ctx: context_.base.Context) -> None: p_cmds.append(cmd) elif isinstance(cmd, commands.slash.SlashCommand): s_cmds.append(cmd) - # elif isinstance(cmd, commands.message.MessageCommand): - # m_cmds.append(cmd) - # elif isinstance(cmd, commands.user.UserCommand): - # u_cmds.append(cmd) cmds: t.List[t.Tuple[t.Sequence[commands.base.Command], str]] = [ (p_cmds, "Prefix"), (s_cmds, "Slash"), - # (m_cmds, "Message"), - # (u_cmds, "User"), ] + embed = hk.Embed( color=0x000000, title=f"{plugin.name} Help", @@ -220,49 +129,17 @@ async def send_bot_help(self, ctx: context_.base.Context) -> None: timestamp=datetime.now().astimezone(), ) - # embed.add_field("") for cmd_list, header in cmds: - # field1 = "" - # field2 = "" - # field3 = "" desc = "" if cmd_list: - # embed.add_field(f"{header} Commands", "\u200B") for cmd in set(cmd_list): - desc += f"`{cmd.name}" - # print(" "*14-len(cmd.name)) - desc += " " * (14 - len(cmd.name)) - desc += f"` {cmd.description} \n" - # lines.append(f"- {cmd.name} - {cmd.description}, {cmd.aliases}") - # field1 += f"```{cmd.name}```" - # field2 += f"```{cmd.description}```" - # field3 += f"```{', '.join(cmd.aliases) or ' '}```" + desc += f"`{cmd.name: <14}` {cmd.description} \n" + embed.add_field(f"{header} Commands", desc) - # embed.add_field("\u200B", field2, inline=True) - # if isinstance(cmd_list[0], commands.prefix.PrefixCommand): - # embed.add_field("\u200B", field3, inline=True) - # else: - # embed.add_field("\u200B", "\u200B", inline=True) embed.set_image(plugin.d.help_image) pages[plugin.name.replace(" ", "_")] = embed - # ) - - # "\n".join( - # [ - # ">>> ```adoc", - # f"==== {plugin.name if plugin is not None else 'Uncategorised'} ====", - # (f"{plugin.description}\n" if plugin.description else "No description provided\n") - # if plugin is not None - # else "", - # *page, - # "```", - # ] - # ) - - # lines.append("```") - # pages.insert(0, "\n".join(lines)) - # try: + view = SelectView(user_id=ctx.author.id, pages=pages) options = [] for plugin, _ in plugin_pages.items(): @@ -275,30 +152,16 @@ async def send_bot_help(self, ctx: context_.base.Context) -> None: label=plugin.name, value=plugin.name, emoji=plugin.d.help_emoji ) ) - # selector = - view.add_item(HelpTextSelect(options=options, placeholder="Select Plugin")) - resp = await ctx.respond(embed=main_embed, components=view) - # print("\n\n", pages, "\n\n") + view.add_item( + SimpleTextSelect(options=options, placeholder="Select Plugin") + ) + resp = await ctx.respond(content=None, embed=main_embed, components=view) await view.start(resp) await view.wait() - # print("Selector") - # print(dir(selector)) - # print("\n\n\n\n\n") - # print(dir(view)) - # # await ctx.respond(dir(resp)) - # # await context.respond(pages) - # print(dir(resp), "\n\n\n\n\n\n\n") - - # if hasattr(view, "answer"): # Check if there is an answer - # print(f"Received an answer! It is: {view.answer}") - # await context.edit_last_response(embeds=[pages[view.answer]], components=view) - - except Exception as e: - await ctx.respond(e) - # navigator = nav.ButtonNavigator(pages) - # await navigator.run(context) + except Exception as exp: + await ctx.respond(f"Initializing help command failed: `{exp}`") async def send_command_help( self, ctx: context_.base.Context, command: commands.base.Command @@ -312,32 +175,40 @@ async def send_command_help( else "πŸ–±οΈ" ) - # embed = ( - - # ) - # lines = [ - # ">>> ```adoc", - # "==== Command Help ====", - # f"{command.name} - {command.description}", - # "", - # f"Usage: {prefix}{command.signature}", - # "", - # long_help if long_help else "No additional details provided.", - # "```", - # ] - await ctx.respond( - embed=hk.Embed( - color=0x000000, - title="Command Help", - description=( - f"**{command.name}** \n" - f"{command.description} \n\n" - f"Usage: `{prefix}{command.signature}` \n\n" - f"Aliases: {', '.join(command.aliases)}\n\n" - f"{long_help or ''}" + if len(command.aliases) > 0: + aliases = f"Aliases: {', '.join(command.aliases)}\n\n" + else: + aliases = "" + + if len(ctx.responses) == 0: + await ctx.respond( + embed=hk.Embed( + color=0x000000, + title="Command Help", + description=( + f"**{command.name}** \n" + f"{command.description} \n\n" + f"Usage: `{prefix}{command.signature}` \n\n" + f"{aliases}" + f"{long_help or ''}" + ), + ) + ) + else: + await ctx.edit_last_response( + content=None, + embed=hk.Embed( + color=0x000000, + title="Command Help", + description=( + f"**{command.name}** \n" + f"{command.description} \n\n" + f"Usage: `{prefix}{command.signature}` \n\n" + f"{aliases}" + f"{long_help or ''}" + ), ), ) - ) async def send_group_help( self, @@ -394,64 +265,34 @@ async def send_group_help( async def send_plugin_help( self, ctx: context_.base.Context, plugin: plugins.Plugin ) -> None: - # lines = [ - # ">>> ```adoc", - # "==== Category Help ====", - # f"{plugin.name} - {plugin.description or 'No description provided'}", - # "", - # ] - try: - p_cmds, s_cmds, m_cmds, u_cmds = [], [], [], [] - all_commands = await filter_commands(plugin._all_commands, ctx) - for cmd in all_commands: - if isinstance(cmd, commands.prefix.PrefixCommand): - p_cmds.append(cmd) - elif isinstance(cmd, commands.slash.SlashCommand): - s_cmds.append(cmd) - # elif isinstance(cmd, commands.message.MessageCommand): - # m_cmds.append(cmd) - # elif isinstance(cmd, commands.user.UserCommand): - # u_cmds.append(cmd) - - cmds: t.List[t.Tuple[t.Sequence[commands.base.Command], str]] = [ - (p_cmds, "Prefix"), - (s_cmds, "Slash"), - # (m_cmds, "Message"), - # (u_cmds, "User"), - ] - embed = hk.Embed( - color=0x000000, - title=f"{plugin.name} Help", - description=f"{plugin.description or 'No additional details provided.'}\n", - timestamp=datetime.now().astimezone(), - ) + p_cmds, s_cmds = [], [] + all_commands = await filter_commands(plugin._all_commands, ctx) + for cmd in all_commands: + if isinstance(cmd, commands.prefix.PrefixCommand): + p_cmds.append(cmd) + elif isinstance(cmd, commands.slash.SlashCommand): + s_cmds.append(cmd) + + # Message and User commands are not included in the Plugin Help + cmds: t.List[t.Tuple[t.Sequence[commands.base.Command], str]] = [ + (p_cmds, "Prefix"), + (s_cmds, "Slash"), + ] + + embed = hk.Embed( + color=0x000000, + title=f"{plugin.name} Help", + description=f"{plugin.description or 'No additional details provided.'}\n", + timestamp=datetime.now().astimezone(), + ) + + for cmd_list, header in cmds: + desc = "" + if cmd_list: + for cmd in set(cmd_list): + desc += f"`{cmd.name: <14}` {cmd.description} \n" + + embed.add_field(f"{header} Commands", desc) - # embed.add_field("") - for cmd_list, header in cmds: - # field1 = "" - # field2 = "" - # field3 = "" - desc = "" - if cmd_list: - # embed.add_field(f"{header} Commands", "\u200B") - for cmd in set(cmd_list): - desc += f"`{cmd.name}" - # print(" "*14-len(cmd.name)) - desc += " " * (14 - len(cmd.name)) - desc += f"` {cmd.description} \n" - # lines.append(f"- {cmd.name} - {cmd.description}, {cmd.aliases}") - # field1 += f"```{cmd.name}```" - # field2 += f"```{cmd.description}```" - # field3 += f"```{', '.join(cmd.aliases) or ' '}```" - embed.add_field(f"{header} Commands", desc) - # embed.add_field("\u200B", field2, inline=True) - # if isinstance(cmd_list[0], commands.prefix.PrefixCommand): - # embed.add_field("\u200B", field3, inline=True) - # else: - # embed.add_field("\u200B", "\u200B", inline=True) - - embed.set_image(plugin.d.help_image) - # lines.append("```") - await ctx.respond(embed) - except Exception as e: - await ctx.respond(e) + embed.set_image(plugin.d.help_image) + await ctx.respond(embed) diff --git a/functions/models.py b/functions/models.py index 48b2c0e..8865ece 100644 --- a/functions/models.py +++ b/functions/models.py @@ -1,3 +1,17 @@ +import os +import re +from datetime import datetime + +import aiohttp_client_cache +import dotenv +import hikari as hk +import isodate + +dotenv.load_dotenv() + +YT_KEY = os.getenv("YT_KEY") + + class YTVideo: """YouTube search video class""" @@ -12,21 +26,21 @@ def __init__(self, search_results: dict, i: int) -> None: ] # make it medium later self.vid_channel: str = search_results["items"][i]["snippet"]["channelTitle"] self.vid_duration: str = "" - # pprint(search_results["items"][i]) def get_link(self) -> str: """Getting a link to the vid""" return f"https://www.youtube.com/watch?v={self.vid_id}" - def set_duration(self, req) -> str: + async def set_duration(self, req) -> str: """Make an API call and set the duration property""" - ytapi2 = req.get( + ytapi2 = await req.get( ( f"https://youtube.googleapis.com/youtube/v3/videos?part=snippet%2Ccontent" f"Details%2Cstatistics&id={self.vid_id}®ionCode=US&key={YT_KEY}" - ) + ), + timeout=2, ) - ytapi2 = ytapi2.json() + ytapi2 = await ytapi2.json() self.vid_duration = str( isodate.parse_duration(ytapi2["items"][0]["contentDetails"]["duration"]) ) @@ -37,23 +51,375 @@ def set_duration(self, req) -> str: return self.vid_duration[2:] return self.vid_duration - def get_duration_secs(self) -> int: - """Get the duration in seconds""" - ytapi2 = requests.get( - ( - f"https://youtube.googleapis.com/youtube/v3/videos?part=snippet%2Ccontent" - f"Details%2Cstatistics&id={self.vid_id}®ionCode=US&key={YT_KEY}" - ), - timeout=10, + +class ColorPalette: + """Commonly required colours for the bot + extras""" + + ANILIST = 0x2B2D42 + VNDB = 0x07111D # 0x948782 + MAL = 0x2E51A2 + CAT = 0x484FC + HELL = 0xEC2454 + SLATE_BLUE = 0x43408A + DEFAULT = 0x43408A + WARN = 0xFCC404 + ERROR = 0xA91B0D + GREEN = 0x568203 + ELECTRIC_BLUE = 0x7DF9FF + MINT = 0xBBF9F5 + PINK = 0xEF98CD + LILAC = 0xC8A2C8 + DAWN_PINK = 0xF4EAE9 + + +class ColourPalette(ColorPalette): + """Alias for :obj:`~ColorPalette`.""" + + +class AnilistBase: + def __init__(self, name: str, id_: int) -> None: + self.name = name + self.id = id_ + + @staticmethod + def parse_description(description: str) -> str: + """Parse Anilist descriptions into Discord friendly markdown + + Args: + description (str): The description to parse + + Returns: + str: The parsed description + """ + + description = ( + description.replace("
", "") + .replace("~!", "||") + .replace("!~", "||") + .replace("#", "") + ) + description = ( + description.replace("", "") + .replace("", "") + .replace("", "") + .replace("", "") + .replace("
", "") + ) + + if len(description) > 400: + description = description[0:400] + + # If the trimmed description has a missing spoiler tag, add one + if description.count("||") % 2: + description = description + "||" + + description = description + "..." + + return description + + +class ALCharacter(AnilistBase): + def __init__( + self, name: str, id_: int, session: aiohttp_client_cache.CachedSession + ) -> None: + self.url = f"https://anilist.co/character/{id_}" + self.session = session + super().__init__(name, id_) + + async def from_search(query_: str, session: aiohttp_client_cache.CachedSession): + # async with session.get() + # async with session.get() + + # self.session = session + # try: + query = """ + query ($search: String) { # Define which variables will be used in the query + Character (search: $search, sort: FAVOURITES_DESC) { # Add var. to the query + id + name { + full + } + } + } + """ + + variables = { + "search": query_ + # ,"sort": FAVOURITES_DESC + } + + response = await session.post( + "https://graphql.anilist.co", + json={"query": query, "variables": variables}, + timeout=3, + ) + if not response.ok: + return await response.json() + response = await response.json() + + response = response["data"]["Character"] + + title = response["name"]["full"] + id_ = response["id"] + + return cls(title, id_, session) + + @classmethod + async def from_id(cls, query_: int, session: aiohttp_client_cache.CachedSession): + # async with session.get() + + # self.session = session + # try: + query = """ + query ($id: Int) { # Define which variables will be used in the query + Character (id: $id, sort: FAVOURITES_DESC) { # Add var. to the query + id + name { + full + } + } + } + """ + + variables = { + "id": query_ + # ,"sort": FAVOURITES_DESC + } + + response = await session.post( + "https://graphql.anilist.co", + json={"query": query, "variables": variables}, + timeout=3, + ) + if not response.ok: + return await response.json() + response = await response.json() + + response = response["data"]["Character"] + + title = response["name"]["full"] + id_ = response["id"] + + return cls(title, id_, session) + + # except Exception as e: + # return e + + @classmethod + async def is_birthday(cls, session: aiohttp_client_cache.CachedSession): + # async with session.get() + + # self.session = session + # try: + query = """ + query ($var: Boolean) { # Define which variables will be used in the query + Character (isBirthday: $var, sort: FAVOURITES_DESC) { # Add var. to the query + id + name { + full + } + } + } + """ + + variables = { + "var": True + # ,"sort": FAVOURITES_DESC + } + + response = await session.post( + "https://graphql.anilist.co", + json={"query": query, "variables": variables}, + timeout=3, ) - ytapi2 = ytapi2.json() - return int( - isodate.parse_duration( - (ytapi2["items"][0]["contentDetails"]["duration"]) - ).total_seconds() + if not response.ok: + return await response.json() + response = await response.json() + + response = response["data"]["Character"] + + title = response["name"]["full"] + id_ = response["id"] + + return cls(title, id_, session) + + # @staticmethod + async def make_embed(self): + query = """ +query ($id: Int, $search: String) { # Define which variables will be used in the query + Character (id: $id, search: $search, sort: FAVOURITES_DESC) { # Add var. to the query + id + name { + full + } + image { + large + } + gender + dateOfBirth { + year + month + day + } + description (asHtml: false) + media (sort: TRENDING_DESC, perPage: 3) { + nodes { + title { + romaji + } + season + seasonYear + meanScore + seasonInt + episodes + chapters + source + popularity + tags { + name + } + } + } + favourites #β™₯ + siteUrl + } +} +""" + # await ctx.respond("In") + try: + variables = {} + + # if id_: + variables["id"] = self.id + + # elif character: + # variables["search"] = character + + # else: + # raise lb.NotEnoughArguments + + response = await self.session.post( + "https://graphql.anilist.co", + json={"query": query, "variables": variables}, + timeout=3, + ) + if not response.ok: + return hk.Embed( + title="ERROR FETCHING DATA", + color=ColorPalette.ERROR, + description=( + "Failed to fetch data 😡" + "\nTry typing the full name of the character." + ), + ) + # return + response = await response.json() + + response = response["data"]["Character"] + + title = response["name"]["full"] + + if response["dateOfBirth"]["month"] and response["dateOfBirth"]["day"]: + dob = f"{response['dateOfBirth']['day']}/{response['dateOfBirth']['month']}" + if response["dateOfBirth"]["year"]: + dob += f"/{response['dateOfBirth']['year']}" + else: + dob = "NA" + + if response["description"]: + # response["description"] = parse_description(response["description"])\ + response["description"] = self.parse_description( + response["description"] + ) + + else: + response["description"] = "NA" + + return ( + hk.Embed( + title=self.name, + url=self.url, + description="\n\n", + color=ColorPalette.ANILIST, + timestamp=datetime.now().astimezone(), + ) + .add_field("Gender", response["gender"]) + .add_field("DOB", dob, inline=True) + .add_field("Favourites", f"{response['favourites']}❀", inline=True) + .add_field("Character Description", response["description"]) + .set_thumbnail(response["image"]["large"]) + # .set_author(url=response["siteUrl"], name=title) + .set_footer( + text="Source: AniList", + icon="https://anilist.co/img/icons/android-chrome-512x512.png", + ) + ) + + except Exception as e: + return hk.Embed( + title="Failure", + color=ColorPalette.ERROR, + description=f"We encountered an error, `{e}`", + ) + + # async def make_pages(self) -> t.Sequence[hk.Embed, miru.View]: + # ... + + +class VNDBBase: + @staticmethod + def parse_vndb_desciption(description: str) -> str: + """Parse a VNDB description into a Discord friendly Markdown""" + description = ( + description.replace("[spoiler]", "||") + .replace("[/spoiler]", "||") + .replace("#", "") + .replace("[i]", "") + .replace("[b]", "") + .replace("[/b]", "") + .replace("[/i]", "") ) + pattern = r"\[url=(.*?)\](.*?)\[/url\]" + + # Replace BBCode links with Markdown links in the text + description = re.sub(pattern, replace_bbcode_with_markdown, description) + + if len(description) > 300: + description = description[0:300] + + if description.count("||") % 2: + description = description + "||" + + description = description + "..." + + return description + + @staticmethod + def _replace_bbc_with_markdown(match: re.Match) -> str: + """Make a markdown-link string from a re Match object""" + url = match.group(1) + link_text = match.group(2) + markdown_link = f"[{link_text}]({url})" + return markdown_link + + +class VNDBChara(VNDBBase): + ... + + +class ALAnime: + ... + + +class ALManga: + ... + + +class ALNovel: + ... + -# class Genshin: -# def __init__(self, character: str = None): -# ... +# # class Genshin: +# # def __init__(self, character: str = None): +# # ... diff --git a/functions/search_images.py b/functions/search_images.py new file mode 100644 index 0000000..275f24c --- /dev/null +++ b/functions/search_images.py @@ -0,0 +1,134 @@ +""" +This is the module to make 3x3s or find Google Image results +""" +import json +import re +import typing as t + +from aiohttp_client_cache import CachedSession +from bs4 import BeautifulSoup + + +def original_images(soup): + """Return the original res images from the scrapped ones""" + + google_images = [] + + all_script_tags = soup.select("script") + + matched_images_data = "".join( + re.findall(r"AF_initDataCallback\(([^<]+)\);", str(all_script_tags)) + ) + + matched_images_data_fix = json.dumps(matched_images_data) + matched_images_data_json = json.loads(matched_images_data_fix) + + matched_google_image_data = re.findall( + r"\"b-GRID_STATE0\"(.*)sideChannel:\s?{}}", matched_images_data_json + ) + + matched_google_image_thumbnails = ", ".join( + re.findall( + r"\[\"(https\:\/\/encrypted-tbn0\.gstatic\.com\/images\?.*?)\",\d+,\d+\]", + str(matched_google_image_data), + ) + ).split(", ") + + thumbnails = [ + bytes(bytes(thumbnail, "ascii").decode("unicode-escape"), "ascii").decode( + "unicode escape" + ) + for thumbnail in matched_google_image_thumbnails + ] + + removed_matched_google_images_thumbnails = re.sub( + r"\[\"(https\:\/\/encrypted-tbn0\.gstatic\.com\/images\?.*?)\",\d+,\d+\]", + "", + str(matched_google_image_data), + ) + + matched_google_full_resolution_images = re.findall( + r"(?:'|,),\[\"(https:|http.*?)\",\d+,\d+\]", + removed_matched_google_images_thumbnails, + ) + + full_res_images = [ + bytes(bytes(img, "ascii").decode("unicode-escape"), "ascii").decode( + "unicode-escape" + ) + for img in matched_google_full_resolution_images + ] + # print("Parsing shit") + # print(full_res_images[0:2]) + for metadata, thumbnail, original in zip( + soup.select(".isv-r.PNCib.MSM1fd.BUooTd"), thumbnails, full_res_images + ): + # start=1, + # ): + try: + google_images.append( + { + # "title": metadata.select_one(".VFACy.kGQAp.sMi44c.lNHeqe.WGvvNb")[ + # "title" + # ], + "link": metadata.select_one(".VFACy.kGQAp.sMi44c.lNHeqe.WGvvNb")[ + "href" + ], + "source": metadata.select_one(".fxgdke").text, + "thumbnail": thumbnail, + "original": original, + } + ) + except Exception: + print("Google is shit") + google_images.append( + { + "thumbnail": thumbnail, + "source": "Unknown", + "link": "Unknown", + "original": original, + } + ) + + # print(google_images) + return google_images + + +async def lookfor( + query: str, session: CachedSession, *, num: t.Optional[int] = 9, recent: str = None +) -> list: + """Return images and corresponding data of the search query + + Args: + query (str): The query to search for + num (int, optional): The number of images to search for. Defaults to 9. + + Returns: + list: The list of images alongwith thumbnail, source and link if possible + """ + + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) \ + AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36" + } + params = { + "q": query, # Query to search for + "tbm": "isch", # to display image search results + "h1": "en", # language for the query + "gl": "us", # country to fetch results from + "ijn": "0", + } + + if recent: + params["tbs"] = f"qdr:{recent}" + # req = requests.session() + async with session.get( + "https://www.google.com/search", params=params, headers=headers, timeout=30 + ) as html: + # html = req.get("https://www.google.com/search",params=params,headers=headers,timeout=30) + # print("Fetched g search") + soup = BeautifulSoup(await html.text(), "lxml") + return original_images(soup)[:num] + + +# import time diff --git a/functions/utils.py b/functions/utils.py index 81a5a73..d4498cc 100644 --- a/functions/utils.py +++ b/functions/utils.py @@ -1,15 +1,15 @@ -import datetime import json import random +from datetime import datetime, timedelta from urllib.parse import urlparse import feedparser +import hikari as hk import requests -from bs4 import BeautifulSoup -import typing as t # if t.TYPE_CHECKING: from aiohttp_client_cache import CachedSession +from bs4 import BeautifulSoup def check_if_url(link: str) -> bool: @@ -20,22 +20,6 @@ def check_if_url(link: str) -> bool: return False -# def is_image(link: str) -> int: -# """Tells if a function is an image or not - -# Args: -# link (str): The link to check - -# Returns: -# int: 0 if not, 1 if yes, 2 if yes but not PIL compatible (gif/webp) -# """ -# r = requests.head(link) -# if r.headers["content-type"] in ["image/png", "image/jpeg", "image/jpg"]: -# return 1 -# if r.headers["content-type"] in ["image/webp", "image/gif"]: -# return 2 -# return 0 - async def is_image(link: str, session: CachedSession) -> int: """Using headers check if a link is of an image or not @@ -46,11 +30,14 @@ async def is_image(link: str, session: CachedSession) -> int: Returns: int: 0 if not image, 1/2 if yes """ - async with session.head(link, timeout=2) as r: - if r.headers["content-type"] in ["image/png", "image/jpeg", "image/jpg"]: - return 1 - if r.headers["content-type"] in ["image/webp", "image/gif"]: - return 2 + try: + async with session.head(link, timeout=2) as r: + if r.headers["content-type"] in ["image/png", "image/jpeg", "image/jpg"]: + return 1 + if r.headers["content-type"] in ["image/webp", "image/gif"]: + return 2 + return 0 + except: return 0 @@ -109,7 +96,7 @@ def rss2json(url): return json.dumps(feedsdict) -def verbose_timedelta(delta): +def verbose_timedelta(delta: timedelta): d = delta.days h, s = divmod(delta.seconds, 3600) m, s = divmod(s, 60) @@ -127,13 +114,35 @@ def verbose_timedelta(delta): return ", ".join(dhms[start : end + 1]) +def verbose_date(*args): + month_num_map = { + 1: "January", + 2: "February", + 3: "March", + 4: "April", + 5: "May", + 6: "June", + 7: "July", + 8: "August", + 9: "September", + 10: "October", + 11: "November", + 12: "December", + } + + day, month, year = args + + verbose_date = " ".join([day, month_num_map[int(month)]]) + verbose_date += f", {year}" if year else "" + + return verbose_date + + def iso_to_timestamp(iso_date): """Convert ISO datetime to timestamp""" try: return int( - datetime.datetime.fromisoformat(iso_date[:-1] + "+00:00") - .astimezone() - .timestamp() + datetime.fromisoformat(iso_date[:-1] + "+00:00").astimezone().timestamp() ) except ValueError: # Incase the datetime is not in the iso format, return it as is @@ -157,6 +166,51 @@ async def tenor_link_from_gif(link: str, session: CachedSession): return link +def get_image_dominant_colour(link: str) -> hk.Color: + """Get the dominant colour of an image from a link to it + + Args: + link (str): Link to the image + + Returns: + hk.Color: Dominant Colour + """ + + return hk.Color.of( + get_dominant_colour(Image.open(requests.get(link, stream=True, timeout=10).raw)) + ) + + +def humanized_list_join(lst) -> str: + if not isinstance(lst, list) or isinstance(lst, tuple): + return lst + + if len(lst) == 0: + return " " + + if len(lst) == 1: + return lst[0] + + return f"{','.join(lst[:-1])}" f"or {lst[-1]}" + + +async def get_anitrendz_latest(session: CachedSession): + try: + link = "https://anitrendz.tumblr.com/" + headers = { + "User-Agent": "Mozilla/5.0 (compatible; Discordbot/2.0; +https://discordapp.com)", + } + + async with session.get(link, headers=headers, timeout=3) as response: + soup = BeautifulSoup(await response.read(), "lxml") + + return soup.find("a", {"class": "post_media_photo_anchor"})["data-big-photo"] + + except Exception as e: + print(e) + return link + + def get_random_quote(): return random.choice( [ @@ -165,6 +219,6 @@ def get_random_quote(): "Whenever you need me, I'll be there.", "I Hear The Voice Of Fate, Speaking My Name In Humble Supplication…", "There's Something In The Air… Something Tells Me A New Case Is Brewing.", - "Now this is what I call 'a moment of solitude.'" + "Now this is what I call 'a moment of solitude.'", ] ) diff --git a/functions/views.py b/functions/views.py index 9ae1628..bb5d3ff 100644 --- a/functions/views.py +++ b/functions/views.py @@ -11,6 +11,8 @@ class SelectView(miru.View): + """A subclassed view designed for Text Select""" + def __init__(self, user_id: hk.Snowflake, pages: t.Collection[hk.Embed]) -> None: self.user_id = user_id self.pages = pages @@ -30,11 +32,15 @@ async def view_check(self, ctx: miru.Context) -> bool: class PeristentViewTest(miru.View): + """A subclassed view designed to make persistent views(wip)""" + def __init__(self) -> None: super().__init__(autodefer=True, timeout=None) -class CustomNavi(nav.NavigatorView): +class AuthorNavi(nav.NavigatorView): + """A subclassed navigator view with author checks for the view""" + def __init__( self, *, @@ -73,15 +79,19 @@ async def on_timeout(self) -> None: # self.get_context(self.message).bot.d.chapter_info[self.message_id] = None -class CustomView(miru.View): +class AuthorView(miru.View): + """A subclassed view with author checks for the view""" + def __init__( self, *, autodefer: bool = True, timeout: t.Optional[t.Union[float, int, timedelta]] = 180.0, + session: t.Optional[aiohttp_client_cache.CachedSession] = None, user_id: hk.Snowflake = None, ) -> None: self.user_id = user_id + self.session = session super().__init__(autodefer=autodefer, timeout=timeout) async def on_timeout(self) -> None: @@ -101,6 +111,8 @@ async def view_check(self, ctx: miru.Context) -> bool: class PreView(nav.NavigatorView): + """A view designed for the preview feature of the manga command""" + def __init__( self, *, @@ -135,7 +147,7 @@ async def view_check(self, ctx: miru.Context) -> bool: return False -# class AutoPaginator(CustomNavi): +# class AutoPaginator(AuthorNavi): # def __init__( # self, # *, @@ -175,3 +187,14 @@ async def view_check(self, ctx: miru.Context) -> bool: # async def on_timeout(self) -> None: # await self.message.edit(components=[]) + + +class TabbedSwitcher(miru.View): + """A new view which will specialize in switching embeds via buttons""" + + def __init__( + self, + *, + pages, + ): + ... diff --git a/main.py b/main.py index 5610b80..07a45fb 100644 --- a/main.py +++ b/main.py @@ -1,30 +1,36 @@ +import asyncio import os import subprocess -import time check = "ps aux | grep bot.py | grep -v 'grep' | awk '{print $2}'" -while True: - try: - if os.path.exists("ded.txt"): - os.remove("ded.txt") - break - process = subprocess.Popen( - check, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE - ) - stdout, stderr = process.communicate() - if not int.from_bytes(stdout, byteorder="big"): - print("Starting bot.") - if not os.path.exists("logs"): - os.mkdir("logs") - if os.name == "nt": - os.system("nohup python -OO bot.py >> logs/output.log 2>&1 &") +async def main_bot_loop(): + while True: + try: + if os.path.exists("ded.txt"): + os.remove("ded.txt") + break + + process = subprocess.Popen( + check, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + stdout, stderr = process.communicate() + if not int.from_bytes(stdout, byteorder="big"): + print("Starting bot.") + if not os.path.exists("logs"): + os.mkdir("logs") + if os.name == "nt": + os.system("nohup python -OO bot.py >> logs/output.log 2>&1 &") + else: + os.system("nohup python3 -OO bot.py >> logs/output.log 2>&1 &") + else: - os.system("nohup python3 -OO bot.py >> logs/output.log 2>&1 &") + # Bot is already up + await asyncio.sleep(10) + except Exception as e: + print(e) + - else: - # print("Bot is already up") - time.sleep(10) - except Exception as e: - print(e) +if __name__ == "__main__": + asyncio.run(main_bot_loop()) diff --git a/requirements.txt b/requirements.txt index 95c15f9..bda5941 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,6 @@ hikari[speedups]>=2.0.0.dev120 hikari-lightbulb>=2.3.3 hikari-miru>=3.1.2 -uvloop python-dotenv>=0.21.0 @@ -9,11 +8,14 @@ aiohttp==3.8.3 plotly isodate==0.6.1 psutil -# pillow-simd -tabulate +Pillow +# tabulate feedparser kaleido==0.2.1 -requests-cache aiohttp-client-cache==0.8.1 beautifulsoup4 -lxml \ No newline at end of file +lxml +fuzzywuzzy[speedup] + +# Putting this at the end coz W*ndows doesn't need it +uvloop \ No newline at end of file