-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
8 changed files
with
615 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
from .extdl import * | ||
from .paste import * | ||
|
||
flag = True | ||
check = 0 | ||
while flag: | ||
try: | ||
from . import format as _format | ||
from . import tools as _zedtools | ||
from . import utils as _zedutils | ||
from .events import * | ||
from .format import * | ||
from .tools import * | ||
from .utils import * | ||
|
||
break | ||
except ModuleNotFoundError as e: | ||
install_pip(e.name) | ||
check += 1 | ||
if check > 5: | ||
break |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
import base64 | ||
import contextlib | ||
|
||
from telethon.errors import ( | ||
ChannelInvalidError, | ||
ChannelPrivateError, | ||
ChannelPublicGroupNaError, | ||
) | ||
from telethon.tl.functions.channels import GetFullChannelRequest | ||
from telethon.tl.functions.messages import GetFullChatRequest | ||
from telethon.tl.functions.messages import ImportChatInviteRequest as Get | ||
from telethon.tl.types import MessageEntityMentionName | ||
|
||
from ...Config import Config | ||
from ...core.logger import logging | ||
from ...core.managers import edit_delete | ||
|
||
LOGS = logging.getLogger(__name__) | ||
|
||
|
||
async def reply_id(event): | ||
reply_to_id = None | ||
if event.sender_id in Config.SUDO_USERS: | ||
reply_to_id = event.id | ||
if event.reply_to_msg_id: | ||
reply_to_id = event.reply_to_msg_id | ||
return reply_to_id | ||
|
||
|
||
async def get_chatinfo(event, match, zedevent): | ||
if not match and event.reply_to_msg_id: | ||
replied_msg = await event.get_reply_message() | ||
if replied_msg.fwd_from and replied_msg.fwd_from.channel_id is not None: | ||
match = replied_msg.fwd_from.channel_id | ||
if not match: | ||
match = event.chat_id | ||
with contextlib.suppress(ValueError): | ||
match = int(match) | ||
try: | ||
chat_info = await event.client(GetFullChatRequest(match)) | ||
except BaseException: | ||
try: | ||
chat_info = await event.client(GetFullChannelRequest(match)) | ||
except ChannelInvalidError: | ||
await zedevent.edit("- عذرا هذه المجموعة او القناة غير صحيحة") | ||
return None | ||
except ChannelPrivateError: | ||
await zedevent.edit( | ||
"- يبدو انه هذه مجموعة او قناة خاصة او ربما محظور منها" | ||
) | ||
return None | ||
except ChannelPublicGroupNaError: | ||
await zedevent.edit("- هذه القناه او المجموعه لم يتم العثور عليها") | ||
return None | ||
except (TypeError, ValueError): | ||
await zedevent.edit("**خطأ:**\nلم يتم التعرف على الدردشة") | ||
return None | ||
return chat_info | ||
|
||
|
||
async def get_user_from_event( | ||
event, | ||
zedevent=None, | ||
secondgroup=None, | ||
thirdgroup=None, | ||
nogroup=False, | ||
noedits=False, | ||
): # sourcery no-metrics | ||
if zedevent is None: | ||
zedevent = event | ||
if nogroup is False: | ||
if secondgroup: | ||
args = event.pattern_match.group(2).split(" ", 1) | ||
elif thirdgroup: | ||
args = event.pattern_match.group(3).split(" ", 1) | ||
else: | ||
args = event.pattern_match.group(1).split(" ", 1) | ||
extra = None | ||
try: | ||
if args: | ||
user = args[0] | ||
if len(args) > 1: | ||
extra = "".join(args[1:]) | ||
if user.isnumeric() or (user.startswith("-") and user[1:].isnumeric()): | ||
user = int(user) | ||
if event.message.entities: | ||
probable_user_mention_entity = event.message.entities[0] | ||
if isinstance(probable_user_mention_entity, MessageEntityMentionName): | ||
user_id = probable_user_mention_entity.user_id | ||
user_obj = await event.client.get_entity(user_id) | ||
return user_obj, extra | ||
if isinstance(user, int) or user.startswith("@"): | ||
user_obj = await event.client.get_entity(user) | ||
return user_obj, extra | ||
except Exception as e: | ||
LOGS.error(str(e)) | ||
try: | ||
if nogroup is False: | ||
if secondgroup: | ||
extra = event.pattern_match.group(2) | ||
else: | ||
extra = event.pattern_match.group(1) | ||
if event.is_private: | ||
user_obj = await event.get_chat() | ||
return user_obj, extra | ||
if event.reply_to_msg_id: | ||
previous_message = await event.get_reply_message() | ||
if previous_message.from_id is None: | ||
if not noedits: | ||
await edit_delete(zedevent, "`Well that's an anonymous admin !`") | ||
return None, None | ||
user_obj = await event.client.get_entity(previous_message.sender_id) | ||
return user_obj, extra | ||
if not args: | ||
if not noedits: | ||
await edit_delete( | ||
zedevent, "`Pass the user's username, id or reply!`", 5 | ||
) | ||
return None, None | ||
except Exception as e: | ||
LOGS.error(str(e)) | ||
if not noedits: | ||
await edit_delete(zedevent, "__Couldn't fetch user to proceed further.__") | ||
return None, None | ||
|
||
|
||
async def checking(zedub): | ||
zed_c = base64.b64decode("QUFBQUFGRV9vWjVYVE5fUnVaaEtOdw==") | ||
with contextlib.suppress(BaseException): | ||
zed_channel = Get(zed_c) | ||
await zedub(zed_channel) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
from subprocess import PIPE, Popen | ||
|
||
|
||
def install_pip(pipfile): | ||
print(f"installing {pipfile}") | ||
pip_cmd = ["pip", "install", f"{pipfile}"] | ||
process = Popen(pip_cmd, stdout=PIPE, stderr=PIPE) | ||
stdout, stderr = process.communicate() | ||
return stdout |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
import datetime | ||
|
||
from bs4 import BeautifulSoup | ||
from markdown import markdown | ||
from telethon.tl.tlobject import TLObject | ||
from telethon.tl.types import MessageEntityPre | ||
from telethon.utils import add_surrogate | ||
|
||
from ..functions.utils import utc_to_local | ||
from .paste import pastetext | ||
|
||
|
||
async def paste_message(text, pastetype="p", extension=None, markdown=True): | ||
if markdown: | ||
text = md_to_text(text) | ||
response = await pastetext(text, pastetype, extension) | ||
if "url" in response: | ||
return response["url"] | ||
return "Error while pasting text to site" | ||
|
||
|
||
def md_to_text(md): | ||
html = markdown(md) | ||
soup = BeautifulSoup(html, features="html.parser") | ||
return soup.get_text() | ||
|
||
|
||
def mentionuser(name, userid): | ||
return f"[{name}](tg://user?id={userid})" | ||
|
||
|
||
def htmlmentionuser(name, userid): | ||
return f"<a href='tg://user?id={userid}'>{name}</a>" | ||
|
||
|
||
# kanged from uniborg @spechide | ||
# https://github.com/SpEcHiDe/UniBorg/blob/d8b852ee9c29315a53fb27055e54df90d0197f0b/uniborg/utils.py#L250 | ||
|
||
|
||
def reformattext(text): | ||
return text.replace("~", "").replace("_", "").replace("*", "").replace("`", "") | ||
|
||
|
||
def replacetext(text): | ||
return ( | ||
text.replace( | ||
'"', | ||
"", | ||
) | ||
.replace( | ||
"\\r", | ||
"", | ||
) | ||
.replace( | ||
"\\n", | ||
"", | ||
) | ||
.replace( | ||
"\\", | ||
"", | ||
) | ||
) | ||
|
||
|
||
def parse_pre(text): | ||
text = text.strip() | ||
return ( | ||
text, | ||
[MessageEntityPre(offset=0, length=len(add_surrogate(text)), language="")], | ||
) | ||
|
||
|
||
def yaml_format(obj, indent=0, max_str_len=256, max_byte_len=64): | ||
# sourcery no-metrics | ||
""" | ||
Pretty formats the given object as a YAML string which is returned. | ||
(based on TLObject.pretty_format) | ||
""" | ||
result = [] | ||
if isinstance(obj, TLObject): | ||
obj = obj.to_dict() | ||
|
||
if isinstance(obj, dict): | ||
if not obj: | ||
return "dict:" | ||
items = obj.items() | ||
has_items = len(items) > 1 | ||
has_multiple_items = len(items) > 2 | ||
result.append(obj.get("_", "dict") + (":" if has_items else "")) | ||
if has_multiple_items: | ||
result.append("\n") | ||
indent += 2 | ||
for k, v in items: | ||
if k == "_" or v is None: | ||
continue | ||
formatted = yaml_format(v, indent) | ||
if not formatted.strip(): | ||
continue | ||
result.extend((" " * (indent if has_multiple_items else 1), f"{k}:")) | ||
if not formatted[0].isspace(): | ||
result.append(" ") | ||
result.extend((f"{formatted}", "\n")) | ||
if has_items: | ||
result.pop() | ||
if has_multiple_items: | ||
indent -= 2 | ||
elif isinstance(obj, str): | ||
# truncate long strings and display elipsis | ||
result = repr(obj[:max_str_len]) | ||
if len(obj) > max_str_len: | ||
result += "…" | ||
return result | ||
elif isinstance(obj, bytes): | ||
# repr() bytes if it's printable, hex like "FF EE BB" otherwise | ||
if all(0x20 <= c < 0x7F for c in obj): | ||
return repr(obj) | ||
return "<…>" if len(obj) > max_byte_len else " ".join(f"{b:02X}" for b in obj) | ||
elif isinstance(obj, datetime.datetime): | ||
# ISO-8601 without timezone offset (telethon dates are always UTC) | ||
return utc_to_local(obj).strftime("%Y-%m-%d %H:%M:%S") | ||
elif hasattr(obj, "__iter__"): | ||
# display iterables one after another at the base indentation level | ||
result.append("\n") | ||
indent += 2 | ||
for x in obj: | ||
result.append(f"{' ' * indent}- {yaml_format(x, indent + 2)}") | ||
result.append("\n") | ||
result.pop() | ||
indent -= 2 | ||
else: | ||
return repr(obj) | ||
|
||
return "".join(result) |
Oops, something went wrong.