-
Notifications
You must be signed in to change notification settings - Fork 1
/
test.py
115 lines (90 loc) · 3.24 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
from config import *
from telethon import *
from asyncio import CancelledError
import logging
import threading
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy import Column, PickleType, UnicodeText, distinct, func
# Declarative base class that the ORM model(s) in this module inherit from.
BASE = declarative_base()
# NOTE(review): `start` is not defined in this file; presumably it is provided
# by one of the star imports above and returns the session object used by the
# helpers below -- confirm.
SESSION = start()
# create_engine() requires a full database URL; a bare file name such as
# "eigth.db" cannot be parsed and raises at import time. The sqlite:/// form
# refers to the file eigth.db relative to the current working directory.
engine = create_engine("sqlite:///eigth.db")
BASE.metadata.bind = engine
# Only tables already registered on BASE.metadata are created here; models
# declared later in the module are not yet known at this point.
BASE.metadata.create_all(engine)
class Jmthon_GlobalCollection(BASE):
    """ORM model pairing a keyword with a pickled tuple of contents.

    Both columns are declared as part of the primary key.
    """

    __tablename__ = "jmthon_globalcollection"
    keywoard = Column(UnicodeText, primary_key=True)
    contents = Column(PickleType, primary_key=True, nullable=False)

    def __init__(self, keywoard, contents):
        """Store *keywoard* as given and *contents* converted to a tuple."""
        self.keywoard = keywoard
        self.contents = tuple(contents)

    def __repr__(self):
        """Return a debug string showing the contents and the keyword."""
        template = "<Jmthon Global Collection lists '%s' for %s>"
        return template % (self.contents, self.keywoard)

    def __eq__(self, other):
        """Return True only for another instance with equal keyword and contents."""
        if not isinstance(other, Jmthon_GlobalCollection):
            return False
        same_keyword = self.keywoard == other.keywoard
        return bool(same_keyword and self.contents == other.contents)
# Create the table if it does not exist yet. The engine is passed explicitly
# instead of relying on the implicit MetaData.bind lookup, which newer
# SQLAlchemy releases no longer perform.
Jmthon_GlobalCollection.__table__.create(bind=engine, checkfirst=True)
# Re-entrant lock held by the helpers below while they write to the session
# and to the in-memory cache.
JMTHON_GLOBALCOLLECTION = threading.RLock()
class COLLECTION_SQL:
    """In-memory holder for collection contents, keyed by keyword."""

    def __init__(self):
        """Start with an empty keyword-to-contents mapping."""
        self.CONTENTS_LIST = dict()
# Module-level instance; its CONTENTS_LIST dict is the cache that
# add_to_collectionlist and del_keyword_collectionlist update.
COLLECTION_SQL_ = COLLECTION_SQL()
def add_to_collectionlist(keywoard, contents):
    """Persist *contents* under *keywoard* and mirror it in the in-memory cache.

    Args:
        keywoard: Key the contents are stored under.
        contents: Iterable of values; it is stored as a tuple.

    Raises:
        Exception: Any error raised by ``SESSION.merge`` or ``SESSION.commit``
            is re-raised after the session has been rolled back.
    """
    # Materialize once so a one-shot iterator is not exhausted before the
    # cache update, and so the row and the cache hold the same tuple.
    items = tuple(contents)
    with JMTHON_GLOBALCOLLECTION:
        try:
            SESSION.merge(Jmthon_GlobalCollection(keywoard, items))
            SESSION.commit()
        except Exception:
            # Roll back so the shared session stays usable for later calls.
            SESSION.rollback()
            raise
        # Update the cache only after the database write has succeeded.
        COLLECTION_SQL_.CONTENTS_LIST.setdefault(keywoard, set()).add(items)
def del_keyword_collectionlist(keywoard):
    """Delete every stored row for *keywoard* and drop it from the cache.

    Args:
        keywoard: Key whose rows are removed.

    Raises:
        Exception: Any error raised by the delete or the commit is re-raised
            after the session has been rolled back.
    """
    with JMTHON_GLOBALCOLLECTION:
        try:
            (
                SESSION.query(Jmthon_GlobalCollection.keywoard)
                .filter(Jmthon_GlobalCollection.keywoard == keywoard)
                .delete()
            )
            # Commit before touching the cache so a cache miss can no longer
            # prevent the delete from being committed.
            SESSION.commit()
        except Exception:
            # Roll back so the shared session stays usable for later calls.
            SESSION.rollback()
            raise
        # The cache is only filled by add_to_collectionlist, so a keyword that
        # exists in the database may be absent here; the default avoids KeyError.
        COLLECTION_SQL_.CONTENTS_LIST.pop(keywoard, None)
def get_collectionlist_items():
    """Return the distinct keywords currently stored in the collection table.

    The session is closed afterwards, whether or not the query succeeded.
    """
    try:
        keyword_query = SESSION.query(Jmthon_GlobalCollection.keywoard)
        rows = keyword_query.distinct().all()
        keywords = []
        for row in rows:
            # Each row holds a single column: the keyword.
            keywords.append(row[0])
        return keywords
    finally:
        SESSION.close()
# Root logging setup: INFO and above, with an hour:minute:second timestamp.
logging.basicConfig(
    level=logging.INFO,
    datefmt="%H:%M:%S",
    format="[%(levelname)s- %(asctime)s]- %(name)s- %(message)s",
)
# Logger named after this module, used by the handler below.
LOGS = logging.getLogger(__name__)
@eighthon.on(events.NewMessage(outgoing=True, pattern=r"\.تحديث"))
async def _(event):
    """Handle the outgoing restart command.

    Edits the triggering message into a restart notice, stores that notice's
    chat id and message id under the "restart_update" keyword, then
    disconnects the client. Storage and disconnect errors are logged rather
    than raised.

    Args:
        event: The NewMessage event for the matched outgoing message.
    """
    # Message.edit() takes the new text as its first argument; the message
    # itself is implicit and must not be passed again.
    sandy = await event.edit(
        "**❃ جارِ اعادة تشغيل السورس\nارسل** `.فحص` **او** `.الاوامر` **للتحقق مما إذ كان البوت شغال ، يستغرق الأمر في الواقع 1-2 دقيقة لإعادة التشغيل**",
    )
    try:
        # Remove any marker already stored so only the newest one remains.
        if "restart_update" in get_collectionlist_items():
            del_keyword_collectionlist("restart_update")
    except Exception as e:
        LOGS.error(e)
    try:
        # Record which message to refer back to later.
        # NOTE(review): presumably read by startup code elsewhere -- confirm.
        add_to_collectionlist("restart_update", [sandy.chat_id, sandy.id])
    except Exception as e:
        LOGS.error(e)
    try:
        await eighthon.disconnect()
    except CancelledError:
        # Cancellation while disconnecting is ignored on purpose.
        pass
    except Exception as e:
        LOGS.error(e)