From b67c13a4ac94d7602fb8fd2623c34aa69ce119ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D8=B1=D9=88=D8=AC=D8=B1?= <104784913+rogerpq@users.noreply.github.com> Date: Mon, 30 Oct 2023 20:31:37 +0300 Subject: [PATCH] Add files via upload --- repthon/helpers/utils/__init__.py | 21 ++++ repthon/helpers/utils/events.py | 131 +++++++++++++++++++++++ repthon/helpers/utils/extdl.py | 9 ++ repthon/helpers/utils/format.py | 133 +++++++++++++++++++++++ repthon/helpers/utils/paste.py | 170 ++++++++++++++++++++++++++++++ repthon/helpers/utils/tools.py | 94 +++++++++++++++++ repthon/helpers/utils/utils.py | 56 ++++++++++ repthon/helpers/utils/zz.txt | 1 + 8 files changed, 615 insertions(+) create mode 100644 repthon/helpers/utils/__init__.py create mode 100644 repthon/helpers/utils/events.py create mode 100644 repthon/helpers/utils/extdl.py create mode 100644 repthon/helpers/utils/format.py create mode 100644 repthon/helpers/utils/paste.py create mode 100644 repthon/helpers/utils/tools.py create mode 100644 repthon/helpers/utils/utils.py create mode 100644 repthon/helpers/utils/zz.txt diff --git a/repthon/helpers/utils/__init__.py b/repthon/helpers/utils/__init__.py new file mode 100644 index 0000000..03be71e --- /dev/null +++ b/repthon/helpers/utils/__init__.py @@ -0,0 +1,21 @@ +from .extdl import * +from .paste import * + +flag = True +check = 0 +while flag: + try: + from . import format as _format + from . import tools as _zedtools + from . import utils as _zedutils + from .events import * + from .format import * + from .tools import * + from .utils import * + + break + except ModuleNotFoundError as e: + install_pip(e.name) + check += 1 + if check > 5: + break diff --git a/repthon/helpers/utils/events.py b/repthon/helpers/utils/events.py new file mode 100644 index 0000000..349764d --- /dev/null +++ b/repthon/helpers/utils/events.py @@ -0,0 +1,131 @@ +import base64 +import contextlib + +from telethon.errors import ( + ChannelInvalidError, + ChannelPrivateError, + ChannelPublicGroupNaError, +) +from telethon.tl.functions.channels import GetFullChannelRequest +from telethon.tl.functions.messages import GetFullChatRequest +from telethon.tl.functions.messages import ImportChatInviteRequest as Get +from telethon.tl.types import MessageEntityMentionName + +from ...Config import Config +from ...core.logger import logging +from ...core.managers import edit_delete + +LOGS = logging.getLogger(__name__) + + +async def reply_id(event): + reply_to_id = None + if event.sender_id in Config.SUDO_USERS: + reply_to_id = event.id + if event.reply_to_msg_id: + reply_to_id = event.reply_to_msg_id + return reply_to_id + + +async def get_chatinfo(event, match, zedevent): + if not match and event.reply_to_msg_id: + replied_msg = await event.get_reply_message() + if replied_msg.fwd_from and replied_msg.fwd_from.channel_id is not None: + match = replied_msg.fwd_from.channel_id + if not match: + match = event.chat_id + with contextlib.suppress(ValueError): + match = int(match) + try: + chat_info = await event.client(GetFullChatRequest(match)) + except BaseException: + try: + chat_info = await event.client(GetFullChannelRequest(match)) + except ChannelInvalidError: + await zedevent.edit("- عذرا هذه المجموعة او القناة غير صحيحة") + return None + except ChannelPrivateError: + await zedevent.edit( + "- يبدو انه هذه مجموعة او قناة خاصة او ربما محظور منها" + ) + return None + except ChannelPublicGroupNaError: + await zedevent.edit("- هذه القناه او المجموعه لم يتم العثور عليها") + return None + except (TypeError, ValueError): + await zedevent.edit("**خطأ:**\nلم يتم التعرف على الدردشة") + return None + return chat_info + + +async def get_user_from_event( + event, + zedevent=None, + secondgroup=None, + thirdgroup=None, + nogroup=False, + noedits=False, +): # sourcery no-metrics + if zedevent is None: + zedevent = event + if nogroup is False: + if secondgroup: + args = event.pattern_match.group(2).split(" ", 1) + elif thirdgroup: + args = event.pattern_match.group(3).split(" ", 1) + else: + args = event.pattern_match.group(1).split(" ", 1) + extra = None + try: + if args: + user = args[0] + if len(args) > 1: + extra = "".join(args[1:]) + if user.isnumeric() or (user.startswith("-") and user[1:].isnumeric()): + user = int(user) + if event.message.entities: + probable_user_mention_entity = event.message.entities[0] + if isinstance(probable_user_mention_entity, MessageEntityMentionName): + user_id = probable_user_mention_entity.user_id + user_obj = await event.client.get_entity(user_id) + return user_obj, extra + if isinstance(user, int) or user.startswith("@"): + user_obj = await event.client.get_entity(user) + return user_obj, extra + except Exception as e: + LOGS.error(str(e)) + try: + if nogroup is False: + if secondgroup: + extra = event.pattern_match.group(2) + else: + extra = event.pattern_match.group(1) + if event.is_private: + user_obj = await event.get_chat() + return user_obj, extra + if event.reply_to_msg_id: + previous_message = await event.get_reply_message() + if previous_message.from_id is None: + if not noedits: + await edit_delete(zedevent, "`Well that's an anonymous admin !`") + return None, None + user_obj = await event.client.get_entity(previous_message.sender_id) + return user_obj, extra + if not args: + if not noedits: + await edit_delete( + zedevent, "`Pass the user's username, id or reply!`", 5 + ) + return None, None + except Exception as e: + LOGS.error(str(e)) + if not noedits: + await edit_delete(zedevent, "__Couldn't fetch user to proceed further.__") + return None, None + + +async def checking(zedub): + zed_c = base64.b64decode("QUFBQUFGRV9vWjVYVE5fUnVaaEtOdw==") + with contextlib.suppress(BaseException): + zed_channel = Get(zed_c) + await zedub(zed_channel) diff --git a/repthon/helpers/utils/extdl.py b/repthon/helpers/utils/extdl.py new file mode 100644 index 0000000..103b26b --- /dev/null +++ b/repthon/helpers/utils/extdl.py @@ -0,0 +1,9 @@ +from subprocess import PIPE, Popen + + +def install_pip(pipfile): + print(f"installing {pipfile}") + pip_cmd = ["pip", "install", f"{pipfile}"] + process = Popen(pip_cmd, stdout=PIPE, stderr=PIPE) + stdout, stderr = process.communicate() + return stdout diff --git a/repthon/helpers/utils/format.py b/repthon/helpers/utils/format.py new file mode 100644 index 0000000..109c398 --- /dev/null +++ b/repthon/helpers/utils/format.py @@ -0,0 +1,133 @@ +import datetime + +from bs4 import BeautifulSoup +from markdown import markdown +from telethon.tl.tlobject import TLObject +from telethon.tl.types import MessageEntityPre +from telethon.utils import add_surrogate + +from ..functions.utils import utc_to_local +from .paste import pastetext + + +async def paste_message(text, pastetype="p", extension=None, markdown=True): + if markdown: + text = md_to_text(text) + response = await pastetext(text, pastetype, extension) + if "url" in response: + return response["url"] + return "Error while pasting text to site" + + +def md_to_text(md): + html = markdown(md) + soup = BeautifulSoup(html, features="html.parser") + return soup.get_text() + + +def mentionuser(name, userid): + return f"[{name}](tg://user?id={userid})" + + +def htmlmentionuser(name, userid): + return f"{name}" + + +# kanged from uniborg @spechide +# https://github.com/SpEcHiDe/UniBorg/blob/d8b852ee9c29315a53fb27055e54df90d0197f0b/uniborg/utils.py#L250 + + +def reformattext(text): + return text.replace("~", "").replace("_", "").replace("*", "").replace("`", "") + + +def replacetext(text): + return ( + text.replace( + '"', + "", + ) + .replace( + "\\r", + "", + ) + .replace( + "\\n", + "", + ) + .replace( + "\\", + "", + ) + ) + + +def parse_pre(text): + text = text.strip() + return ( + text, + [MessageEntityPre(offset=0, length=len(add_surrogate(text)), language="")], + ) + + +def yaml_format(obj, indent=0, max_str_len=256, max_byte_len=64): + # sourcery no-metrics + """ + Pretty formats the given object as a YAML string which is returned. + (based on TLObject.pretty_format) + """ + result = [] + if isinstance(obj, TLObject): + obj = obj.to_dict() + + if isinstance(obj, dict): + if not obj: + return "dict:" + items = obj.items() + has_items = len(items) > 1 + has_multiple_items = len(items) > 2 + result.append(obj.get("_", "dict") + (":" if has_items else "")) + if has_multiple_items: + result.append("\n") + indent += 2 + for k, v in items: + if k == "_" or v is None: + continue + formatted = yaml_format(v, indent) + if not formatted.strip(): + continue + result.extend((" " * (indent if has_multiple_items else 1), f"{k}:")) + if not formatted[0].isspace(): + result.append(" ") + result.extend((f"{formatted}", "\n")) + if has_items: + result.pop() + if has_multiple_items: + indent -= 2 + elif isinstance(obj, str): + # truncate long strings and display elipsis + result = repr(obj[:max_str_len]) + if len(obj) > max_str_len: + result += "…" + return result + elif isinstance(obj, bytes): + # repr() bytes if it's printable, hex like "FF EE BB" otherwise + if all(0x20 <= c < 0x7F for c in obj): + return repr(obj) + return "<…>" if len(obj) > max_byte_len else " ".join(f"{b:02X}" for b in obj) + elif isinstance(obj, datetime.datetime): + # ISO-8601 without timezone offset (telethon dates are always UTC) + return utc_to_local(obj).strftime("%Y-%m-%d %H:%M:%S") + elif hasattr(obj, "__iter__"): + # display iterables one after another at the base indentation level + result.append("\n") + indent += 2 + for x in obj: + result.append(f"{' ' * indent}- {yaml_format(x, indent + 2)}") + result.append("\n") + result.pop() + indent -= 2 + else: + return repr(obj) + + return "".join(result) diff --git a/repthon/helpers/utils/paste.py b/repthon/helpers/utils/paste.py new file mode 100644 index 0000000..96f2364 --- /dev/null +++ b/repthon/helpers/utils/paste.py @@ -0,0 +1,170 @@ +import json + +import requests + +from ...Config import Config +from ...core.logger import logging + +LOGS = logging.getLogger("𝐑𝐞𝐩𝐭𝐡𝐨𝐧") + +headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.104 Safari/537.36", + "content-type": "application/json", +} + + +async def p_paste(message, extension=None): + """ + To Paste the given message/text/code to paste.pelkum.dev + """ + siteurl = "https://pasty.lus.pm/api/v1/pastes" + data = {"content": message} + try: + response = requests.post(url=siteurl, data=json.dumps(data), headers=headers) + except Exception as e: + return {"error": str(e)} + if response.ok: + response = response.json() + purl = ( + f"https://pasty.lus.pm/{response['id']}.{extension}" + if extension + else f"https://pasty.lus.pm/{response['id']}.txt" + ) + try: + from ...core.session import zedub + + await zedub.send_message( + Config.BOTLOG_CHATID, + f"**You have created a new paste in pasty bin.** Link to pasty is [here]({purl}). You can delete that paste by using this token `{response['deletionToken']}`", + ) + except Exception as e: + LOGS.info(str(e)) + return { + "url": purl, + "raw": f"https://pasty.lus.pm/{response['id']}/raw", + "bin": "Pasty", + } + return {"error": "Unable to reach pasty.lus.pm"} + + +async def s_paste(message, extension="txt"): + """ + To Paste the given message/text/code to spaceb.in + """ + siteurl = "https://spaceb.in/api/v1/documents/" + try: + response = requests.post( + siteurl, data={"content": message, "extension": extension} + ) + except Exception as e: + return {"error": str(e)} + if response.ok: + response = response.json() + if response["error"] != "" and response["status"] < 400: + return {"error": response["error"]} + return { + "url": f"https://spaceb.in/{response['payload']['id']}", + "raw": f"{siteurl}{response['payload']['id']}/raw", + "bin": "Spacebin", + } + return {"error": "Unable to reach spacebin."} + + +def spaste(message, extension="txt"): + """ + To Paste the given message/text/code to spaceb.in + """ + siteurl = "https://spaceb.in/api/v1/documents/" + try: + response = requests.post( + siteurl, data={"content": message, "extension": extension} + ) + except Exception as e: + return {"error": str(e)} + if response.ok: + response = response.json() + if response["error"] != "" and response["status"] < 400: + return {"error": response["error"]} + return { + "url": f"https://spaceb.in/{response['payload']['id']}", + "raw": f"{siteurl}{response['payload']['id']}/raw", + "bin": "Spacebin", + } + return {"error": "Unable to reach spacebin."} + + +async def n_paste(message, extension=None): + """ + To Paste the given message/text/code to nekobin + """ + siteurl = "https://nekobin.com/api/documents" + data = {"content": message} + try: + response = requests.post(url=siteurl, data=json.dumps(data), headers=headers) + except Exception as e: + return {"error": str(e)} + if response.ok: + response = response.json() + purl = ( + f"nekobin.com/{response['result']['key']}.{extension}" + if extension + else f"nekobin.com/{response['result']['key']}" + ) + return { + "url": purl, + "raw": f"nekobin.com/raw/{response['result']['key']}", + "bin": "Neko", + } + return {"error": "Unable to reach nekobin."} + + +async def d_paste(message, extension=None): + """ + To Paste the given message/text/code to dogbin + """ + siteurl = "http://catbin.up.railway.app/documents" + data = {"content": message} + try: + response = requests.post(url=siteurl, data=json.dumps(data), headers=headers) + except Exception as e: + return {"error": str(e)} + if response.ok: + response = response.json() + purl = ( + f"http://catbin.up.railway.app/{response['key']}.{extension}" + if extension + else f"http://catbin.up.railway.app/{response['key']}" + ) + return { + "url": purl, + "raw": f"http://catbin.up.railway.app/raw/{response['key']}", + "bin": "Dog", + } + return {"error": "Unable to reach dogbin."} + + +async def pastetext(text_to_print, pastetype=None, extension=None): + response = {"error": "something went wrong"} + if pastetype is not None: + if pastetype == "p": + response = await p_paste(text_to_print, extension) + elif pastetype == "s" and extension: + response = await s_paste(text_to_print, extension) + elif pastetype == "s": + response = await s_paste(text_to_print) + elif pastetype == "d": + response = await d_paste(text_to_print, extension) + elif pastetype == "n": + response = await n_paste(text_to_print, extension) + if "error" in response: + response = await p_paste(text_to_print, extension) + if "error" in response: + response = await n_paste(text_to_print, extension) + if "error" in response: + if extension: + response = await s_paste(text_to_print, extension) + else: + response = await s_paste(text_to_print) + if "error" in response: + response = await d_paste(text_to_print, extension) + return response diff --git a/repthon/helpers/utils/tools.py b/repthon/helpers/utils/tools.py new file mode 100644 index 0000000..f2e8332 --- /dev/null +++ b/repthon/helpers/utils/tools.py @@ -0,0 +1,94 @@ +import os +from typing import Optional + +from moviepy.editor import VideoFileClip +from PIL import Image + +from ...core.logger import logging +from ...core.managers import edit_or_reply +from ..tools import media_type +from .utils import runcmd + +LOGS = logging.getLogger(__name__) + + +async def media_to_pic(event, reply, noedits=False): # sourcery no-metrics + mediatype = media_type(reply) + if mediatype not in [ + "Photo", + "Round Video", + "Gif", + "Sticker", + "Video", + "Voice", + "Audio", + "Document", + ]: + return event, None + if not noedits: + zedevent = await edit_or_reply( + event, "`Transfiguration Time! Converting to ....`" + ) + + else: + zedevent = event + zedmedia = None + zedfile = os.path.join("./temp/", "meme.png") + if os.path.exists(zedfile): + os.remove(zedfile) + if mediatype == "Photo": + zedmedia = await reply.download_media(file="./temp") + im = Image.open(zedmedia) + im.save(zedfile) + elif mediatype in ["Audio", "Voice"]: + await event.client.download_media(reply, zedfile, thumb=-1) + elif mediatype == "Sticker": + zedmedia = await reply.download_media(file="./temp") + if zedmedia.endswith(".tgs"): + zedcmd = f"lottie_convert.py --frame 0 -if lottie -of png '{zedmedia}' '{zedfile}'" + stdout, stderr = (await runcmd(zedcmd))[:2] + if stderr: + LOGS.info(stdout + stderr) + elif zedmedia.endswith(".webm"): + clip = VideoFileClip(zedmedia) + try: + clip = clip.save_frame(zedfile, 0.1) + except Exception: + clip = clip.save_frame(zedfile, 0) + elif zedmedia.endswith(".webp"): + im = Image.open(zedmedia) + im.save(zedfile) + elif mediatype in ["Round Video", "Video", "Gif"]: + await event.client.download_media(reply, zedfile, thumb=-1) + if not os.path.exists(zedfile): + zedmedia = await reply.download_media(file="./temp") + clip = VideoFileClip(zedmedia) + try: + clip = clip.save_frame(zedfile, 0.1) + except Exception: + clip = clip.save_frame(zedfile, 0) + elif mediatype == "Document": + mimetype = reply.document.mime_type + mtype = mimetype.split("/") + if mtype[0].lower() == "image": + zedmedia = await reply.download_media(file="./temp") + im = Image.open(zedmedia) + im.save(zedfile) + if zedmedia and os.path.lexists(zedmedia): + os.remove(zedmedia) + if os.path.lexists(zedfile): + return zedevent, zedfile, mediatype + return zedevent, None + + +async def take_screen_shot( + video_file: str, duration: int, path: str = "" +) -> Optional[str]: + thumb_image_path = path or os.path.join( + "./temp/", f"{os.path.basename(video_file)}.jpg" + ) + command = f"ffmpeg -ss {duration} -i '{video_file}' -vframes 1 '{thumb_image_path}'" + err = (await runcmd(command))[1] + if err: + LOGS.error(err) + return thumb_image_path if os.path.exists(thumb_image_path) else None diff --git a/repthon/helpers/utils/utils.py b/repthon/helpers/utils/utils.py new file mode 100644 index 0000000..b16b4b1 --- /dev/null +++ b/repthon/helpers/utils/utils.py @@ -0,0 +1,56 @@ +import asyncio +import functools +import shlex +from typing import Tuple + +from telethon import functions, types + +from ...core.logger import logging + +LOGS = logging.getLogger(__name__) + +# executing of terminal commands +async def runcmd(cmd: str) -> Tuple[str, str, int, int]: + args = shlex.split(cmd) + process = await asyncio.create_subprocess_exec( + *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await process.communicate() + return ( + stdout.decode("utf-8", "replace").strip(), + stderr.decode("utf-8", "replace").strip(), + process.returncode, + process.pid, + ) + + +def run_sync(func, *args, **kwargs): + return asyncio.get_event_loop().run_in_executor( + None, functools.partial(func, *args, **kwargs) + ) + + +def run_async(loop, coro): + return asyncio.run_coroutine_threadsafe(coro, loop).result() + + +def runasync(func: callable): + """Run async functions with the right event loop.""" + loop = asyncio.get_event_loop() + return loop.run_until_complete(func) + + +async def unsavegif(event, sandy): + try: + await event.client( + functions.messages.SaveGifRequest( + id=types.InputDocument( + id=sandy.media.document.id, + access_hash=sandy.media.document.access_hash, + file_reference=sandy.media.document.file_reference, + ), + unsave=True, + ) + ) + except Exception as e: + LOGS.info(str(e)) diff --git a/repthon/helpers/utils/zz.txt b/repthon/helpers/utils/zz.txt new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/repthon/helpers/utils/zz.txt @@ -0,0 +1 @@ +