From 777a59e0e1abad63bf9006c61eca680f9ff264db Mon Sep 17 00:00:00 2001 From: ThioJoe <12518330+ThioJoe@users.noreply.github.com> Date: Fri, 31 Dec 2021 13:08:12 -0700 Subject: [PATCH] Update Script - Add support for spam domain lists Script can now ingest a list of known spam domains. Also can retrieve up-to-date version of list automatically. In auto smart mode, searches the confusable list against combined string. Added functions for ingesting files more universally, to support any future spam lists --- YouTubeSpammerPurge.py | 179 +++++++++++++++++++++++++++++++++-------- 1 file changed, 144 insertions(+), 35 deletions(-) diff --git a/YouTubeSpammerPurge.py b/YouTubeSpammerPurge.py index 224ffb65..5eb8bfa2 100644 --- a/YouTubeSpammerPurge.py +++ b/YouTubeSpammerPurge.py @@ -59,6 +59,7 @@ from pkg_resources import parse_version import unicodedata import hashlib +from itertools import islice # Non Standard Modules import rtfunicode @@ -555,10 +556,11 @@ def add_spam(commentID, videoID): redAdEmojiSet = smartFilter['redAdEmojiSet'] yellowAdEmojiSet = smartFilter['yellowAdEmojiSet'] hrtSet = smartFilter['hrtSet'] - domainRegex = smartFilter['domainRegex'] - compiledRegexDict = smartFilter['compiledRegexDict'] languages = smartFilter['languages'] sensitive = smartFilter['sensitive'] + rootDomainRegex = smartFilter['rootDomainRegex'] + spamDomainRegex = smartFilter['spamDomainRegex'] + compiledRegexDict = smartFilter['compiledRegexDict'] if debugSingleComment == True: if input("Sensitive True/False: ").lower() == 'true': sensitive = True @@ -566,7 +568,7 @@ def add_spam(commentID, videoID): # Check for sensitive smart mode if sensitive == True: - domainRegex = smartFilter['sensitiveDomainRegex'] + rootDomainRegex = smartFilter['sensitiveRootDomainRegex'] # Processed Variables combinedString = authorChannelName + commentText @@ -615,6 +617,8 @@ def remove_unicode_categories(string): add_spam(commentID, videoID) elif any(findOnlyObfuscated(expression[1], expression[0], authorChannelName) for expression in compiledRegexDict['usernameObfuBlackWords']): add_spam(commentID, videoID) + elif any(re.search(expression, combinedString) for expression in spamDomainRegex): + add_spam(commentID, videoID) elif sensitive == True and re.search(smartFilter['usernameConfuseRegex'], authorChannelName): add_spam(commentID, videoID) elif sensitive == False and findOnlyObfuscated(smartFilter['usernameConfuseRegex'], miscData['channelOwnerName'], authorChannelName): @@ -658,7 +662,7 @@ def remove_unicode_categories(string): if languageCount >= 2: yellowCount += 1 - if re.search(domainRegex, combinedString.lower()): + if re.search(rootDomainRegex, combinedString.lower()): yellowCount += 1 # Red Tests @@ -1367,7 +1371,8 @@ def safety_check_username_against_filter(currentUserName, scanMode, filterCharsS proceed = True return proceed - + + ############################# Check For Update ############################## def check_for_update(currentVersion, silentCheck=False): isUpdateAvailable = False @@ -1530,22 +1535,105 @@ def check_for_update(currentVersion, silentCheck=False): elif silentCheck == True: return isUpdateAvailable +######################### Try To Get Remote File ########################## +def getRemoteFile(url, stream, silent=False, headers=None): + try: + if stream == False: + response = requests.get(url, headers=headers) + elif stream == True: + response = requests.get(url, headers=headers, stream=True) + if response.status_code != 200: + if silent == False: + print("Error fetching remote file or resource: " + url) + print("Response Code: " + str(response.status_code)) + else: + return response + + except Exception as e: + if silent == False: + print(e + "\n") + print(f"{B.RED}{F.WHITE} Error {S.R} While Fetching Remote File or Resource: " + url) + print("See above messages for details.\n") + print("If this keeps happening, you may want to report the issue here: https://github.com/ThioJoe/YT-Spammer-Purge/issues") + return None + +########################### Check Lists Updates ########################### +def check_lists_update(currentListVersion, silentCheck = False): + isListUpdateAvailable = False + spamDomainListLatestCommit = 'https://api.github.com/repos/ThioJoe/YT-Spam-Domains-List/commits?path=SpamDomainsList.txt&page=1&per_page=1' + spamDomainListRawLink = 'https://raw.githubusercontent.com/ThioJoe/YT-Spam-Domains-List/main/SpamDomainsList.txt' + #otherlink = 'https://api.github.com/repos/ThioJoe/YT-Spam-Domains-List/contents/SpamDomainsList.txt' + #spamDomainHostedLocation = "https://cdn.jsdelivr.net/gh/thiojoe/YT-Spam-Domains-List/SpamDomainsList.txt" + + response = getRemoteFile(spamDomainListLatestCommit, silentCheck) + if response != None: + pass + elif silentCheck == True: + return isListUpdateAvailable + else: + input("Press enter to Exit...") + sys.exit() + + listUpdateDateTime = response.json()[0]['commit']['committer']['date'] + date = datetime.strptime(listUpdateDateTime, '%Y-%m-%dT%H:%M:%SZ') + latestVersion = date.strftime('%Y.%m.%d') + + if currentListVersion == None or (parse_version(latestVersion) > parse_version(currentListVersion)): + downloadFilePath = "spam_lists/SpamDomainsList.txt" + filedownload = getRemoteFile(spamDomainListRawLink, headers={'Accept-Encoding': 'identity'}, stream=True) # These headers required to get correct file size + total_size_in_bytes= int(filedownload.headers.get('content-length', 0)) + block_size = 1048576 #1 MiB in bytes + + with open(downloadFilePath, 'wb') as file: + for data in filedownload.iter_content(block_size): + file.write(data) + + if os.stat(downloadFilePath).st_size == total_size_in_bytes: + pass + elif total_size_in_bytes != 0 and os.stat(downloadFilePath).st_size != total_size_in_bytes: + os.remove(downloadFilePath) + print(f"\n> {F.RED} File did not fully download. Please try again later.") # Add back! + ############################# Ingest Other Files ############################## -def ingest_domain_file(): +def ingest_asset_file(fileName): def assetFilesPath(relative_path): if hasattr(sys, '_MEIPASS'): # If running as a pyinstaller bundle return os.path.join(sys._MEIPASS, relative_path) return os.path.join(os.path.abspath("assets"), relative_path) # If running as script, specifies resource folder as /assets # Open list of root zone domain extensions - with open(assetFilesPath("rootZoneDomainList.txt"), 'r', encoding="utf-8") as domainFile: - rootZoneData = domainFile.readlines() - rootZoneList = [] - for line in rootZoneData: - line = line.strip() - rootZoneList.append(line.lower()) - - return rootZoneList + with open(assetFilesPath(fileName), 'r', encoding="utf-8") as file: + data = file.readlines() + dataList = [] + for line in data: + if not line.startswith('#'): + line = line.strip() + dataList.append(line.lower()) + return dataList + +def ingest_list_file(relativeFilePath): + if os.path.exists(relativeFilePath): + with open(relativeFilePath, 'r', encoding="utf-8") as listFile: + listData = listFile.readlines() + processedList = [] + for line in listData: + if not line.startswith('#'): + line = line.strip() + processedList.append(line.lower()) + return processedList + +def get_list_file_version(relativeFilePath): + if os.path.exists(relativeFilePath): + matchBetweenBrackets = '(?<=\[)(.*?)(?=\])' # Matches text between first set of two square brackets + with open(relativeFilePath, 'r', encoding="utf-8") as file: + for line in islice(file, 0, 2): + try: + listVersion = str(re.search(matchBetweenBrackets, line).group(0)) + except AttributeError: + pass + return listVersion + else: + return None ############################# CONFIG FILE FUNCTIONS ############################## def create_config_file(): @@ -2014,7 +2102,8 @@ def prepare_filter_mode_non_ascii(currentUser, scanMode, config): # Auto smart mode def prepare_filter_mode_smart(currentUser, scanMode, config, miscData, sensitive=False): currentUserName = currentUser[1] - domainList = miscData['domainList'] + rootDomainList = miscData['rootDomainList'] + spamDomainList = miscData['spamDomainList'] utf_16 = "utf-8" if config and config['filter_mode'] == "autosmart": pass @@ -2094,7 +2183,8 @@ def prepare_filter_mode_smart(currentUser, scanMode, config, miscData, sensitive 'yellowAdWords': [], 'exactRedAdWords': [], 'usernameRedWords': [], - 'textObfuBlackWords': [] + 'textObfuBlackWords': [], + 'usernameObfuBlackWords': [], } # Compile regex with upper case, otherwise many false positive character matches bufferMatch, addBuffers = "*_~|`", "*_~|`\[\]\(\)'" # Add 'buffer' chars to compensate for obfuscation @@ -2114,7 +2204,7 @@ def prepare_filter_mode_smart(currentUser, scanMode, config, miscData, sensitive value = re.compile(confusable_regex(word.upper(), include_character_padding=True).replace(m, a)) compiledRegexDict['yellowAdWords'].append([word, value]) for word in exactRedAdWords: - value = re.compile(confusable_regex(word.upper(), include_character_padding=False).replace(m, a)) + value = re.compile(confusable_regex(word.upper(), include_character_padding=False)) compiledRegexDict['exactRedAdWords'].append([word, value]) for word in usernameRedWords: value = re.compile(confusable_regex(word.upper(), include_character_padding=True).replace(m, a)) @@ -2129,7 +2219,7 @@ def prepare_filter_mode_smart(currentUser, scanMode, config, miscData, sensitive # Prepare All-domain Regex Expression prepString = "\.(" first = True - for extension in domainList: + for extension in rootDomainList: if first == True: prepString += extension first = False @@ -2137,8 +2227,14 @@ def prepare_filter_mode_smart(currentUser, scanMode, config, miscData, sensitive prepString = prepString + "|" + extension sensitivePrepString = prepString + ")" prepString = prepString + ")\/" - domainRegex = re.compile(prepString) - sensitiveDomainRegex = re.compile(sensitivePrepString) + rootDomainRegex = re.compile(prepString) + sensitiveRootDomainRegex = re.compile(sensitivePrepString) + + # Prepare spam domain regex + spamDomainRegex = [] + for domain in spamDomainList: + expression = re.compile(confusable_regex(domain.upper(), include_character_padding=False)) + spamDomainRegex.append(expression) # Prepare Multi Language Detection turkish = 'Ç窺Ğğİ' @@ -2162,13 +2258,14 @@ def prepare_filter_mode_smart(currentUser, scanMode, config, miscData, sensitive 'redAdEmojiSet': redAdEmojiSet, 'yellowAdEmojiSet': yellowAdEmojiSet, 'hrtSet': hrtSet, - 'domainRegex': domainRegex, + 'rootDomainRegex': rootDomainRegex, 'compiledRegexDict': compiledRegexDict, 'usernameConfuseRegex': usernameConfuseRegex, 'languages': languages, 'sensitive': sensitive, - 'sensitiveDomainRegex': sensitiveDomainRegex, + 'sensitiveRootDomainRegex': sensitiveRootDomainRegex, 'unicodeCategoriesStrip': unicodeCategoriesStrip, + 'spamDomainRegex': spamDomainRegex, } return filterSettings, None @@ -2274,27 +2371,39 @@ def main(): input("Press Enter to exit...") sys.exit() - # Load any other data - print("Loading other assets..\n") - miscData = {} - domainList = ingest_domain_file() - miscData['domainList'] = domainList - if config: - moderator_mode = config['moderator_mode'] - else: - moderator_mode = False - - # Check for program updates + # Check for program and list updates + print("Checking for updates to program and spam lists...") + spamListFolder = "spam_lists" + spamDomainListFileName = "SpamDomainsList.txt" + spamDomainListPath = os.path.join(spamListFolder, spamDomainListFileName) # Path to version included in packaged assets folder if not config or config['auto_check_update'] == True: try: updateAvailable = check_for_update(version, silentCheck=True) - os.system(clear_command) + spamDomainListVersion = get_list_file_version(spamDomainListPath) + check_lists_update(spamDomainListVersion) + spamDomainList = ingest_list_file(spamDomainListPath) except: print(f"{F.LIGHTRED_EX}Error Code U-3 occurred while checking for updates. (Checking can be disabled using the config file setting) Continuing...{S.R}\n") updateAvailable = False + spamDomainList = ingest_asset_file(spamDomainListFileName) else: updateAvailable = False - os.system(clear_command) + spamDomainList = ingest_asset_file(spamDomainListFileName) + os.system(clear_command) + + # Load any other data + print("Loading other assets..\n") + rootDomainListAssetFile = "rootZoneDomainList.txt" + miscData = {} + rootDomainList = ingest_asset_file(rootDomainListAssetFile) + miscData['rootDomainList'] = rootDomainList + miscData['spamDomainList'] = spamDomainList + os.system(clear_command) + + if config: + moderator_mode = config['moderator_mode'] + else: + moderator_mode = False #----------------------------------- Begin Showing Program --------------------------------- print(f"{F.LIGHTYELLOW_EX}\n===================== YOUTUBE SPAMMER PURGE v" + version + f" ====================={S.R}")