Skip to content

Commit

Permalink
Merge branch 'remote-domain-list'
Browse files Browse the repository at this point in the history
  • Loading branch information
ThioJoe committed Dec 31, 2021
2 parents 8fb88bd + 777a59e commit 709eae8
Showing 1 changed file with 144 additions and 35 deletions.
179 changes: 144 additions & 35 deletions YouTubeSpammerPurge.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
from pkg_resources import parse_version
import unicodedata
import hashlib
from itertools import islice

# Non Standard Modules
import rtfunicode
Expand Down Expand Up @@ -555,18 +556,19 @@ def add_spam(commentID, videoID):
redAdEmojiSet = smartFilter['redAdEmojiSet']
yellowAdEmojiSet = smartFilter['yellowAdEmojiSet']
hrtSet = smartFilter['hrtSet']
domainRegex = smartFilter['domainRegex']
compiledRegexDict = smartFilter['compiledRegexDict']
languages = smartFilter['languages']
sensitive = smartFilter['sensitive']
rootDomainRegex = smartFilter['rootDomainRegex']
spamDomainRegex = smartFilter['spamDomainRegex']
compiledRegexDict = smartFilter['compiledRegexDict']

if debugSingleComment == True:
if input("Sensitive True/False: ").lower() == 'true': sensitive = True
else: sensitive = False

# Check for sensitive smart mode
if sensitive == True:
domainRegex = smartFilter['sensitiveDomainRegex']
rootDomainRegex = smartFilter['sensitiveRootDomainRegex']

# Processed Variables
combinedString = authorChannelName + commentText
Expand Down Expand Up @@ -615,6 +617,8 @@ def remove_unicode_categories(string):
add_spam(commentID, videoID)
elif any(findOnlyObfuscated(expression[1], expression[0], authorChannelName) for expression in compiledRegexDict['usernameObfuBlackWords']):
add_spam(commentID, videoID)
elif any(re.search(expression, combinedString) for expression in spamDomainRegex):
add_spam(commentID, videoID)
elif sensitive == True and re.search(smartFilter['usernameConfuseRegex'], authorChannelName):
add_spam(commentID, videoID)
elif sensitive == False and findOnlyObfuscated(smartFilter['usernameConfuseRegex'], miscData['channelOwnerName'], authorChannelName):
Expand Down Expand Up @@ -658,7 +662,7 @@ def remove_unicode_categories(string):
if languageCount >= 2:
yellowCount += 1

if re.search(domainRegex, combinedString.lower()):
if re.search(rootDomainRegex, combinedString.lower()):
yellowCount += 1

# Red Tests
Expand Down Expand Up @@ -1367,7 +1371,8 @@ def safety_check_username_against_filter(currentUserName, scanMode, filterCharsS
proceed = True

return proceed



############################# Check For Update ##############################
def check_for_update(currentVersion, silentCheck=False):
isUpdateAvailable = False
Expand Down Expand Up @@ -1530,22 +1535,105 @@ def check_for_update(currentVersion, silentCheck=False):
elif silentCheck == True:
return isUpdateAvailable

######################### Try To Get Remote File ##########################
def getRemoteFile(url, stream, silent=False, headers=None):
try:
if stream == False:
response = requests.get(url, headers=headers)
elif stream == True:
response = requests.get(url, headers=headers, stream=True)
if response.status_code != 200:
if silent == False:
print("Error fetching remote file or resource: " + url)
print("Response Code: " + str(response.status_code))
else:
return response

except Exception as e:
if silent == False:
print(e + "\n")
print(f"{B.RED}{F.WHITE} Error {S.R} While Fetching Remote File or Resource: " + url)
print("See above messages for details.\n")
print("If this keeps happening, you may want to report the issue here: https://github.com/ThioJoe/YT-Spammer-Purge/issues")
return None

########################### Check Lists Updates ###########################
def check_lists_update(currentListVersion, silentCheck = False):
isListUpdateAvailable = False
spamDomainListLatestCommit = 'https://api.github.com/repos/ThioJoe/YT-Spam-Domains-List/commits?path=SpamDomainsList.txt&page=1&per_page=1'
spamDomainListRawLink = 'https://raw.githubusercontent.com/ThioJoe/YT-Spam-Domains-List/main/SpamDomainsList.txt'
#otherlink = 'https://api.github.com/repos/ThioJoe/YT-Spam-Domains-List/contents/SpamDomainsList.txt'
#spamDomainHostedLocation = "https://cdn.jsdelivr.net/gh/thiojoe/YT-Spam-Domains-List/SpamDomainsList.txt"

response = getRemoteFile(spamDomainListLatestCommit, silentCheck)
if response != None:
pass
elif silentCheck == True:
return isListUpdateAvailable
else:
input("Press enter to Exit...")
sys.exit()

listUpdateDateTime = response.json()[0]['commit']['committer']['date']
date = datetime.strptime(listUpdateDateTime, '%Y-%m-%dT%H:%M:%SZ')
latestVersion = date.strftime('%Y.%m.%d')

if currentListVersion == None or (parse_version(latestVersion) > parse_version(currentListVersion)):
downloadFilePath = "spam_lists/SpamDomainsList.txt"
filedownload = getRemoteFile(spamDomainListRawLink, headers={'Accept-Encoding': 'identity'}, stream=True) # These headers required to get correct file size
total_size_in_bytes= int(filedownload.headers.get('content-length', 0))
block_size = 1048576 #1 MiB in bytes

with open(downloadFilePath, 'wb') as file:
for data in filedownload.iter_content(block_size):
file.write(data)

if os.stat(downloadFilePath).st_size == total_size_in_bytes:
pass
elif total_size_in_bytes != 0 and os.stat(downloadFilePath).st_size != total_size_in_bytes:
os.remove(downloadFilePath)
print(f"\n> {F.RED} File did not fully download. Please try again later.") # Add back!

############################# Ingest Other Files ##############################
def ingest_domain_file():
def ingest_asset_file(fileName):
def assetFilesPath(relative_path):
if hasattr(sys, '_MEIPASS'): # If running as a pyinstaller bundle
return os.path.join(sys._MEIPASS, relative_path)
return os.path.join(os.path.abspath("assets"), relative_path) # If running as script, specifies resource folder as /assets

# Open list of root zone domain extensions
with open(assetFilesPath("rootZoneDomainList.txt"), 'r', encoding="utf-8") as domainFile:
rootZoneData = domainFile.readlines()
rootZoneList = []
for line in rootZoneData:
line = line.strip()
rootZoneList.append(line.lower())

return rootZoneList
with open(assetFilesPath(fileName), 'r', encoding="utf-8") as file:
data = file.readlines()
dataList = []
for line in data:
if not line.startswith('#'):
line = line.strip()
dataList.append(line.lower())
return dataList

def ingest_list_file(relativeFilePath):
if os.path.exists(relativeFilePath):
with open(relativeFilePath, 'r', encoding="utf-8") as listFile:
listData = listFile.readlines()
processedList = []
for line in listData:
if not line.startswith('#'):
line = line.strip()
processedList.append(line.lower())
return processedList

def get_list_file_version(relativeFilePath):
if os.path.exists(relativeFilePath):
matchBetweenBrackets = '(?<=\[)(.*?)(?=\])' # Matches text between first set of two square brackets
with open(relativeFilePath, 'r', encoding="utf-8") as file:
for line in islice(file, 0, 2):
try:
listVersion = str(re.search(matchBetweenBrackets, line).group(0))
except AttributeError:
pass
return listVersion
else:
return None

############################# CONFIG FILE FUNCTIONS ##############################
def create_config_file():
Expand Down Expand Up @@ -2014,7 +2102,8 @@ def prepare_filter_mode_non_ascii(currentUser, scanMode, config):
# Auto smart mode
def prepare_filter_mode_smart(currentUser, scanMode, config, miscData, sensitive=False):
currentUserName = currentUser[1]
domainList = miscData['domainList']
rootDomainList = miscData['rootDomainList']
spamDomainList = miscData['spamDomainList']
utf_16 = "utf-8"
if config and config['filter_mode'] == "autosmart":
pass
Expand Down Expand Up @@ -2094,7 +2183,8 @@ def prepare_filter_mode_smart(currentUser, scanMode, config, miscData, sensitive
'yellowAdWords': [],
'exactRedAdWords': [],
'usernameRedWords': [],
'textObfuBlackWords': []
'textObfuBlackWords': [],
'usernameObfuBlackWords': [],
}
# Compile regex with upper case, otherwise many false positive character matches
bufferMatch, addBuffers = "*_~|`", "*_~|`\[\]\(\)'" # Add 'buffer' chars to compensate for obfuscation
Expand All @@ -2114,7 +2204,7 @@ def prepare_filter_mode_smart(currentUser, scanMode, config, miscData, sensitive
value = re.compile(confusable_regex(word.upper(), include_character_padding=True).replace(m, a))
compiledRegexDict['yellowAdWords'].append([word, value])
for word in exactRedAdWords:
value = re.compile(confusable_regex(word.upper(), include_character_padding=False).replace(m, a))
value = re.compile(confusable_regex(word.upper(), include_character_padding=False))
compiledRegexDict['exactRedAdWords'].append([word, value])
for word in usernameRedWords:
value = re.compile(confusable_regex(word.upper(), include_character_padding=True).replace(m, a))
Expand All @@ -2129,16 +2219,22 @@ def prepare_filter_mode_smart(currentUser, scanMode, config, miscData, sensitive
# Prepare All-domain Regex Expression
prepString = "\.("
first = True
for extension in domainList:
for extension in rootDomainList:
if first == True:
prepString += extension
first = False
else:
prepString = prepString + "|" + extension
sensitivePrepString = prepString + ")"
prepString = prepString + ")\/"
domainRegex = re.compile(prepString)
sensitiveDomainRegex = re.compile(sensitivePrepString)
rootDomainRegex = re.compile(prepString)
sensitiveRootDomainRegex = re.compile(sensitivePrepString)

# Prepare spam domain regex
spamDomainRegex = []
for domain in spamDomainList:
expression = re.compile(confusable_regex(domain.upper(), include_character_padding=False))
spamDomainRegex.append(expression)

# Prepare Multi Language Detection
turkish = 'Ç窺Ğğİ'
Expand All @@ -2162,13 +2258,14 @@ def prepare_filter_mode_smart(currentUser, scanMode, config, miscData, sensitive
'redAdEmojiSet': redAdEmojiSet,
'yellowAdEmojiSet': yellowAdEmojiSet,
'hrtSet': hrtSet,
'domainRegex': domainRegex,
'rootDomainRegex': rootDomainRegex,
'compiledRegexDict': compiledRegexDict,
'usernameConfuseRegex': usernameConfuseRegex,
'languages': languages,
'sensitive': sensitive,
'sensitiveDomainRegex': sensitiveDomainRegex,
'sensitiveRootDomainRegex': sensitiveRootDomainRegex,
'unicodeCategoriesStrip': unicodeCategoriesStrip,
'spamDomainRegex': spamDomainRegex,
}
return filterSettings, None

Expand Down Expand Up @@ -2274,27 +2371,39 @@ def main():
input("Press Enter to exit...")
sys.exit()

# Load any other data
print("Loading other assets..\n")
miscData = {}
domainList = ingest_domain_file()
miscData['domainList'] = domainList
if config:
moderator_mode = config['moderator_mode']
else:
moderator_mode = False

# Check for program updates
# Check for program and list updates
print("Checking for updates to program and spam lists...")
spamListFolder = "spam_lists"
spamDomainListFileName = "SpamDomainsList.txt"
spamDomainListPath = os.path.join(spamListFolder, spamDomainListFileName) # Path to version included in packaged assets folder
if not config or config['auto_check_update'] == True:
try:
updateAvailable = check_for_update(version, silentCheck=True)
os.system(clear_command)
spamDomainListVersion = get_list_file_version(spamDomainListPath)
check_lists_update(spamDomainListVersion)
spamDomainList = ingest_list_file(spamDomainListPath)
except:
print(f"{F.LIGHTRED_EX}Error Code U-3 occurred while checking for updates. (Checking can be disabled using the config file setting) Continuing...{S.R}\n")
updateAvailable = False
spamDomainList = ingest_asset_file(spamDomainListFileName)
else:
updateAvailable = False
os.system(clear_command)
spamDomainList = ingest_asset_file(spamDomainListFileName)
os.system(clear_command)

# Load any other data
print("Loading other assets..\n")
rootDomainListAssetFile = "rootZoneDomainList.txt"
miscData = {}
rootDomainList = ingest_asset_file(rootDomainListAssetFile)
miscData['rootDomainList'] = rootDomainList
miscData['spamDomainList'] = spamDomainList
os.system(clear_command)

if config:
moderator_mode = config['moderator_mode']
else:
moderator_mode = False

#----------------------------------- Begin Showing Program ---------------------------------
print(f"{F.LIGHTYELLOW_EX}\n===================== YOUTUBE SPAMMER PURGE v" + version + f" ====================={S.R}")
Expand Down

0 comments on commit 709eae8

Please sign in to comment.