Skip to content

Commit

Permalink
V6.0.4
Browse files Browse the repository at this point in the history
  • Loading branch information
adator85 committed Nov 18, 2024
1 parent d37c152 commit f7664c9
Show file tree
Hide file tree
Showing 4 changed files with 227 additions and 178 deletions.
197 changes: 196 additions & 1 deletion core/classes/protocols/unreal6.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from re import match, findall
from re import match, findall, search
from datetime import datetime
from typing import TYPE_CHECKING, Union
from ssl import SSLEOFError, SSLError
Expand Down Expand Up @@ -694,6 +694,105 @@ def on_part(self, serverMsg: list[str]) -> None:
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")

def on_eos(self, serverMsg: list[str]) -> None:
"""Handle EOS coming from a server
Args:
serverMsg (list[str]): Original server message
"""
try:
# [':001', 'EOS']
server_msg_copy = serverMsg.copy()
hsid = str(server_msg_copy[0]).replace(':','')
if hsid == self.__Config.HSID:
if self.__Config.DEFENDER_INIT == 1:
current_version = self.__Config.CURRENT_VERSION
latest_version = self.__Config.LATEST_VERSION
if self.__Base.check_for_new_version(False):
version = f'{current_version} >>> {latest_version}'
else:
version = f'{current_version}'

print(f"################### DEFENDER ###################")
print(f"# SERVICE CONNECTE ")
print(f"# SERVEUR : {self.__Config.SERVEUR_IP} ")
print(f"# PORT : {self.__Config.SERVEUR_PORT} ")
print(f"# SSL : {self.__Config.SERVEUR_SSL} ")
print(f"# SSL VER : {self.__Config.SSL_VERSION} ")
print(f"# NICKNAME : {self.__Config.SERVICE_NICKNAME} ")
print(f"# CHANNEL : {self.__Config.SERVICE_CHANLOG} ")
print(f"# VERSION : {version} ")
print(f"################################################")

self.__Base.logs.info(f"################### DEFENDER ###################")
self.__Base.logs.info(f"# SERVICE CONNECTE ")
self.__Base.logs.info(f"# SERVEUR : {self.__Config.SERVEUR_IP} ")
self.__Base.logs.info(f"# PORT : {self.__Config.SERVEUR_PORT} ")
self.__Base.logs.info(f"# SSL : {self.__Config.SERVEUR_SSL} ")
self.__Base.logs.info(f"# SSL VER : {self.__Config.SSL_VERSION} ")
self.__Base.logs.info(f"# NICKNAME : {self.__Config.SERVICE_NICKNAME} ")
self.__Base.logs.info(f"# CHANNEL : {self.__Config.SERVICE_CHANLOG} ")
self.__Base.logs.info(f"# VERSION : {version} ")
self.__Base.logs.info(f"################################################")

if self.__Base.check_for_new_version(False):
self.send_priv_msg(
nick_from=self.__Config.SERVICE_NICKNAME,
msg=f" New Version available {version}",
channel=self.__Config.SERVICE_CHANLOG
)

# Initialisation terminé aprés le premier PING
self.send_priv_msg(
nick_from=self.__Config.SERVICE_NICKNAME,
msg=f"[{self.__Config.COLORS.green}INFORMATION{self.__Config.COLORS.nogc}] >> Defender is ready",
channel=self.__Config.SERVICE_CHANLOG
)
self.__Config.DEFENDER_INIT = 0

# Send EOF to other modules
for classe_name, classe_object in self.__Irc.loaded_classes.items():
classe_object.cmd(server_msg_copy)

return None
except IndexError as ie:
self.__Base.logs.error(f"{__name__} - Key Error: {ie}")
except KeyError as ke:
self.__Base.logs.error(f"{__name__} - Key Error: {ke}")
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")

def on_reputation(self, serverMsg: list[str]) -> None:
"""Handle REPUTATION coming from a server
Args:
serverMsg (list[str]): Original server message
"""
try:
# :001 REPUTATION 127.0.0.1 118
server_msg_copy = serverMsg.copy()
self.__Irc.first_connexion_ip = server_msg_copy[2]
self.__Irc.first_score = 0

if str(server_msg_copy[3]).find('*') != -1:
# If * available, it means that an ircop changed the repurtation score
# means also that the user exist will try to update all users with same IP
self.__Irc.first_score = int(str(server_msg_copy[3]).replace('*',''))
for user in self.__Irc.User.UID_DB:
if user.remote_ip == self.__Irc.first_connexion_ip:
user.score_connexion = self.first_score
else:
self.__Irc.first_score = int(server_msg_copy[3])

# Possibilité de déclancher les bans a ce niveau.
except IndexError as ie:
self.Logs.error(f'Index Error {__name__}: {ie}')
except ValueError as ve:
self.__Irc.first_score = 0
self.Logs.error(f'Value Error {__name__}: {ve}')
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")

def on_uid(self, serverMsg: list[str]) -> None:
"""Handle uid message coming from the server
Expand Down Expand Up @@ -757,6 +856,102 @@ def on_uid(self, serverMsg: list[str]) -> None:
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")

def on_privmsg(self, serverMsg: list[str]) -> None:
"""Handle PRIVMSG message coming from the server
Args:
serverMsg (list[str]): Original server message
"""
try:
srv_msg = serverMsg.copy()

# Supprimer la premiere valeur
if srv_msg[0].startswith('@'):
srv_msg.pop(0)

cmd = srv_msg

# Hide auth logs
if len(cmd) == 7:
if cmd[2] == 'PRIVMSG' and cmd[4] == ':auth':
data_copy = cmd.copy()
data_copy[6] = '**********'
self.__Base.logs.debug(f">> {data_copy}")
else:
self.__Base.logs.debug(f">> {cmd}")
else:
self.__Base.logs.debug(f">> {cmd}")

get_uid_or_nickname = str(cmd[0].replace(':',''))
user_trigger = self.__Irc.User.get_nickname(get_uid_or_nickname)
dnickname = self.__Config.SERVICE_NICKNAME

pattern = fr'(:\{self.__Config.SERVICE_PREFIX})(.*)$'
hcmds = search(pattern, ' '.join(cmd)) # va matcher avec tout les caractéres aprés le .

if hcmds: # Commande qui commencent par le point
liste_des_commandes = list(hcmds.groups())
convert_to_string = ' '.join(liste_des_commandes)
arg = convert_to_string.split()
arg.remove(f':{self.__Config.SERVICE_PREFIX}')
if not arg[0].lower() in self.__Irc.commands:
self.__Base.logs.debug(f"This command {arg[0]} is not available")
self.send_notice(
nick_from=self.__Config.SERVICE_NICKNAME,
nick_to=user_trigger,
msg=f"This command [{self.__Config.COLORS.bold}{arg[0]}{self.__Config.COLORS.bold}] is not available"
)
return None

cmd_to_send = convert_to_string.replace(':','')
self.__Base.log_cmd(user_trigger, cmd_to_send)

fromchannel = str(cmd[2]).lower() if self.__Irc.Channel.Is_Channel(cmd[2]) else None
self.__Irc.hcmds(user_trigger, fromchannel, arg, cmd)

if cmd[2] == self.__Config.SERVICE_ID:
pattern = fr'^:.*?:(.*)$'
hcmds = search(pattern, ' '.join(cmd))

if hcmds: # par /msg defender [commande]
liste_des_commandes = list(hcmds.groups())
convert_to_string = ' '.join(liste_des_commandes)
arg = convert_to_string.split()

# Réponse a un CTCP VERSION
if arg[0] == '\x01VERSION\x01':
self.on_version(srv_msg)
return False

# Réponse a un TIME
if arg[0] == '\x01TIME\x01':
self.on_time(srv_msg)
return False

# Réponse a un PING
if arg[0] == '\x01PING':
self.on_ping(srv_msg)
return False

if not arg[0].lower() in self.__Irc.commands:
self.__Base.logs.debug(f"This command {arg[0]} sent by {user_trigger} is not available")
return False

cmd_to_send = convert_to_string.replace(':','')
self.__Base.log_cmd(user_trigger, cmd_to_send)

fromchannel = None
if len(arg) >= 2:
fromchannel = str(arg[1]).lower() if self.__Irc.Channel.Is_Channel(arg[1]) else None

self.__Irc.hcmds(user_trigger, fromchannel, arg, cmd)
return None

except KeyError as ke:
self.__Base.logs.error(f"Key Error: {ke}")
except Exception as err:
self.__Base.logs.error(f"General Error: {err}")

def on_server_ping(self, serverMsg: list[str]) -> None:
"""Send a PONG message to the server
Expand Down
Loading

0 comments on commit f7664c9

Please sign in to comment.