Merge pull request #2 from Kinuseka/experimental
Using Mirror servers as an alternative solution for the API
Kinuseka authored May 2, 2022
2 parents f9a35ed + 9d8bc0a commit a3928dc
Showing 9 changed files with 303 additions and 57 deletions.
6 changes: 3 additions & 3 deletions Lib/NHentai.py
@@ -4,7 +4,7 @@
 import urllib.request
 #I recommend reading into the source code of the nhentai website to get a better understanding of what my code really does
 
-
+site_domain = "net"
 headers={"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.103 Safari/537.36"}
 #Optional
 def CheckLink(data, digit=False):
@@ -13,8 +13,8 @@ def CheckLink(data, digit=False):
     Most of the time there won't be any major edits other than the website you want to check.
     '''
     if digit:
-        return("https://nhentai.net/g/%s" % data)
-    if re.search("https?://nhentai.net/g/(\d+|/)", data.lower()):
+        return(f"https://nhentai.{site_domain}/g/%s" % data)
+    if re.search(f"https?://nhentai.{site_domain}/g/(\d+|/)", data.lower()):
         return(0, data)
     else:
         return(2, "Link is not nHentai")
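For context, a quick sketch of how the parameterized checker behaves after this change (hypothetical gallery code, assuming the modules are imported directly):

```
from Lib import NHentai, NHentai_mirror

# digit=True builds a gallery URL from a bare numeric code
print(NHentai.CheckLink("123456", digit=True))          # https://nhentai.net/g/123456
print(NHentai_mirror.CheckLink("123456", digit=True))   # https://nhentai.to/g/123456

# Full links are validated against the module's own domain
print(NHentai.CheckLink("https://nhentai.net/g/123456"))   # (0, '<the link>')
print(NHentai.CheckLink("https://example.com/g/123456"))   # (2, 'Link is not nHentai')
```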
141 changes: 141 additions & 0 deletions Lib/NHentai_mirror.py
@@ -0,0 +1,141 @@
from bs4 import BeautifulSoup
import re
import json
import yaml
import urllib.request
#I recommend reading into the source code of the nhentai website to get a better understanding of what my code really does

site_domain = "to"
headers={"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.103 Safari/537.36"}
#Optional
def CheckLink(data, digit=False):
    '''For MODDERS:
    This part is where you modify your OWN link checker for your own target site to scrape.
    Most of the time there won't be any major edits other than the website you want to check.
    '''
    if digit:
        return(f"https://nhentai.{site_domain}/g/%s" % data)
    if re.search(f"https?://nhentai.{site_domain}/g/(\d+|/)", data.lower()):
        return(0, data)
    else:
        return(2, "Link is not nHentai")

#Main API
class Api:
    # Use INIT to initialize the needed data, for increased and faster loading times in the other functions
    def __init__(self,data):
        '''
        argument 'data' should be a valid link to the target booklet
        '''
        self.name = "NHentai_mirror" #Directory label
        #NHENTAI SITE FORTUNATELY HAS A DEDICATED JSON EMBEDDED INTO A SCRIPT FILE THAT YOU CAN USE TO GAIN INFORMATION FROM THE SITE.
        #DIFFERENT SITES MIGHT NOT HAVE A JSON FILE SO YOU WILL HAVE TO DO THE PROCESS MANUALLY
        req = urllib.request.Request(data, headers=headers)
        page = urllib.request.urlopen(req)
        self.soup = BeautifulSoup(page, "html.parser")
        script_p1 = (self.soup.find_all("script"))
        for num, script_line in enumerate(script_p1):
            try:
                script_line = script_line.contents[0].strip()
                script_p2 = re.search(r'N.gallery\((.*?)gallery.init\(', script_line, re.DOTALL)
                if script_p2:
                    break

            except IndexError as e:
                pass
            except AttributeError as e:
                pass

        script_p2 = re.sub(r'(^N.gallery\()|(gallery.init\($)', '', script_p2.group()).replace(");","")
        tscript = str(script_p2)
        #IF THERE IS NO ERROR THEN PROCEED
        #the extracted blob is a JSON-like object literal; yaml.safe_load parses it without requiring strict JSON quoting
        script = yaml.safe_load(tscript)

        self.json = script

        self.__preloader_pages()

    def Pages(self):
        "Total available pages count"
        Page = len(self.json["images"]["pages"])
        return Page

    def Tags(self):
        """For MODDERS:
        For better readability for humans or other programs, I recommend you use Json to serialize your data.
        """
        Tag = self.json["tags"]
        return Tag

    def Title(self):
        title = self.json["title"]["english"]
        return title

    def Direct_link(self,value):
        """For MODDERS:
        This function is only used to RETURN a valid direct link to the targeted image.
        The variable 'value' is the episode/page of the certain image to return.
        """
        data = self.preloaded_data[value-1]
        file = data["t"]
        if file == "j":
            extension = "jpg"
        elif file == "p":
            extension = "png"
        elif file == "g":
            extension = "gif"
        else:
            print("WARNING AT PAGE: %s\nUNIDENTIFIED FORMAT DETECTED REPORT THIS BUG\nautoset: jpg" % value)
            extension = "jpg"
        media_id = self.json["media_id"]
        url = "https://i.nhentai.net/galleries/%s/%s.%s" % (media_id, value, extension)
        #url = "https://t.dogehls.xyz/galleries/%s/%s.%s" % (media_id, value, extension)
        return url

    def __preloader_pages(self):
        dict_data = self.json["images"]["pages"]
        data = []
        try:
            for v in range(self.Pages()):
                data.append(dict_data[f"{v+2}"])
        except TypeError as e:
            data = dict_data
        self.preloaded_data = data

class Iterdata:
    """File Iterator used to automatically detect links inside a text file
    """
    def __init__(self,data):
        self.available = True #Used to indicate that the feature is available. False if none
        self.data = data
        self._index = -1
        self.temptxt = []
    def __iter__(self):
        return self
    def __enter__(self):
        self.txt_line = open(self.data,"r")
        for rawline in self.txt_line:
            for tline in rawline.replace(","," ").split():
                if not tline.isdigit():
                    continue
                if len(tline) > 6:
                    long_line = re.findall('.{1,6}', tline)
                    for fixline in long_line:
                        self.temptxt.append(fixline)
                self.temptxt.append(tline)
        return self
    def __next__(self):
        self._index += 1
        if self._index >= len(self.temptxt):
            raise StopIteration
        return self.temptxt[self._index]
    def __reversed__(self):
        return self.temptxt[::-1]
    def __exit__(self,tp,v,tb):
        self.txt_line.close()
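A short usage sketch for the new module (the gallery code is a placeholder; note that Api fetches and parses the page in its constructor, so constructing it needs network access):

```
from Lib.NHentai_mirror import Api, CheckLink

url = CheckLink("123456", digit=True)   # -> "https://nhentai.to/g/123456"
api = Api(url)                          # downloads the page, parses the embedded gallery object

print(api.Title())        # English title from the gallery metadata
print(api.Pages())        # total page count
print(api.Tags())         # raw tag list from the parsed JSON
print(api.Direct_link(1)) # direct image URL for page 1
```

Iterdata plays the same role as in Lib/NHentai.py: used as a context manager over a text file, it collects the numeric codes it finds and iterates over them.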

24 changes: 17 additions & 7 deletions Lib/__init__.py
@@ -1,15 +1,25 @@
 import importlib
+Api = None
+Iterdata = None
+CheckLink = None
 
-_module_name = "NHentai"
+def init_import(import_name):
+    global Api
+    global Iterdata
+    global CheckLink
 
-_classes = ("Api","Iterdata","CheckLink")
-_package_name = "Lib"
-_full_module = "%s.%s" % (_package_name,_module_name)
+    #Default to NHentai
+
+    _module_name = import_name
+
+    _package_name = "Lib"
+    _full_module = "%s.%s" % (_package_name,_module_name)
+    _classes = ("Api","Iterdata","CheckLink")
 
 
-Api = getattr(importlib.import_module(_full_module),_classes[0])
-Iterdata = getattr(importlib.import_module(_full_module),_classes[1])
-CheckLink = getattr(importlib.import_module(_full_module),_classes[2])
+    Iterdata = getattr(importlib.import_module(_full_module),_classes[1])
+    Api = getattr(importlib.import_module(_full_module),_classes[0])
+    CheckLink = getattr(importlib.import_module(_full_module),_classes[2])
 
 
 #FOR MODDERS:
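The effect of the rewrite is that the package's exports are now bound lazily; a quick sketch of the intended call pattern (names as defined above):

```
import Lib

Lib.init_import("NHentai")          # binds Lib.Api, Lib.Iterdata, Lib.CheckLink to Lib.NHentai.*
official_api = Lib.Api

Lib.init_import("NHentai_mirror")   # rebinds the same names to Lib.NHentai_mirror.*
mirror_api = Lib.Api
```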
4 changes: 4 additions & 0 deletions Process.py
@@ -8,6 +8,9 @@
 import re
 
 
+def initialize(API_DATA_CONFIG):
+    Lib.init_import(API_DATA_CONFIG["module_name"])
+
 def Data_parse(data):
     """For MODDERS:
     1. If you want a link verifier to prevent invalid-link errors, you have nothing to edit here; however, you can modify it to always return true if you don't want this feature
@@ -78,6 +81,7 @@ class CommunicateApi:
     Usually does not require modifying this unless you want to add missing features
     """
     def __init__(self, data):
+
         self._Handler = Lib.Api(data)
         self.name = self._Handler.name
     def Pages(self):
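With the new initialize hook, the backend is chosen before any CommunicateApi instance is created; a minimal sketch (the dict shape is an assumption inferred from what sconfig(2) returns in Start_download.py):

```
import Process

API_DATA_CONFIG = {"module_name": "NHentai", "mirror_available": True}  # assumed config shape
Process.initialize(API_DATA_CONFIG)   # binds Lib.Api before the handler below is constructed

api = Process.CommunicateApi("https://nhentai.net/g/123456")  # placeholder link
print(api.Pages())
```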
9 changes: 7 additions & 2 deletions README.md
@@ -15,9 +15,14 @@ A python script that collects data from NHentai.net.
 >Httpx
 >Anyio (Trio Backend)
 >Trio
+>pyyaml
 ```
 
 **Supported Sites at the moment**
 ```
-• NHentai
+• NHentai [Mirror sites: .to]
 ```
+
+**Note:**
+Mirror download is enabled by default in case the official site is not available. If you prefer
+to disable this, set "mirror_available" to `false` in config.json.
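For reference, a sketch of the config.json layout this feature reads; the exact shape is an assumption inferred from sconfig() and the retry loop in Start_download.py:

```
import json

# Assumed layout; the real config.json may carry additional keys.
example_config = {
    "main": {
        "semaphore": 4,                  # sconfig(1): max simultaneous downloads
        "Api": {                         # sconfig(2): returned as API_DATA_CONFIG
            "module_name": "NHentai",    # backend module loaded via Lib.init_import
            "mirror_available": True,    # set to false to disable the mirror fallback
        },
    },
}

with open("config.json", "w") as f:
    json.dump(example_config, f, indent=4)
```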
120 changes: 77 additions & 43 deletions Start_download.py
@@ -23,6 +23,7 @@
 
 #Custom library
 import Process
+from essentials import updater as Updater
 
 #EDIT THE MAXIMUM AMOUNT OF DOWNLOAD PROCESS HAPPENING AT THE SAME TIME
 #LOWER VALUE: SLOWER, MORE STABLE (BEST IN SLOW NETWORK CONDITIONS)
@@ -46,6 +47,7 @@ def main(args):
         sys.exit(1)
     logger.info("Getting Data from API")
     #CREATE AN INSTANCE AND LOAD THE NEEDED DATA
+
     Api = Process.CommunicateApi(returnedData)
     AcquiredPage = Api.Pages()
     AcquiredTags = Api.Tags()
@@ -187,6 +189,8 @@ def sconfig(_type):
         config = json.load(f)
     if _type == 1:
         return config["main"]["semaphore"]
+    elif _type == 2:
+        return config["main"]["Api"]
 def FileName():
     #THIS FUNCTION DELETES OLD LOGFILES, AND ASSIGNS A NAME TO THE NEW ONE
     if not os.path.isdir("Logs"): os.mkdir("Logs")
@@ -244,6 +248,8 @@ def getSystemInfo(logtype):
 #-------
 
 max_process_open = sconfig(1)
+API_DATA_CONFIG = sconfig(2)
+API_MIRROR_ACCOMPLISHED = False
 EMERGENCY = 255
 verbose = False
 info = '''
@@ -253,57 +259,85 @@
 group = parser.add_mutually_exclusive_group(required=True)
 group.add_argument('-n', '--nukecode',metavar=" ", help="-n/--nukecode [argument]")
 group.add_argument('-f', '--filecode',type=is_path, metavar=" ", help="-f/--filecode [file.txt location]")
+group.add_argument('-up', '--update', action="store_true", help="Checks for update and applies it")
 parser.add_argument('-v', '--verbose', action="store_true", help="Enable a verbose downloader")
 args = parser.parse_args()
 if args.verbose:
     verbose = True
+elif args.update:
+    print("Initiating update are you sure? (Y/n)")
+    __choice_user = input().lower().strip()
+    if __choice_user == "y":
+        Updater.github_sync()
+    sys.exit()
 request_status = []
 #CALL FUNCTIONS---
 
 
 #Catch error and main function calls
 try:
-    loggon.info(f"=============== System INFO ===============")
-    getSystemInfo(loggon)
-    loggon.info(f"===========================================")
-    if args.filecode:
-        if Process.CommunicateApi.File_iter.available:
-
-            logger.warning("This method is still UNDER TESTING and MIGHT NOT WORK PROPERLY")
-            time.sleep(3)
-            with Process.CommunicateApi.File_iter(args.filecode) as iof:
-                for file_link in iof:
-                    logger.info("Downloading link: %s" % file_link)
-                    main(file_link)
-                    print("-"*10)
-        else:
-            logger.error("This method is not available for the current module")
-    else:
-        main(args.nukecode)
-except urllib.error.HTTPError as e:
-    #ONLY OCCURS WHEN THERE IS NO RESULTS
-    if e.code == 404:
-        logger.error("The content you are looking for is not found")
-    else:
-        logger.error("HTTP Error Code: %s" % e.code)
-
-    sys.exit(1)
-except urllib.error.URLError as error:
-    logger.error("A connection error has occured")
-    loggon.exception("Exception catched: %s" % sys.exc_info()[0])
-    sys.exit(1)
-except SystemExit as error:
-    if error.code == EMERGENCY:
-        os._exit(1)
-    else:
-        raise
-except KeyboardInterrupt:
-    print("")
-    logger.info("Attempting to close thread..")
-    run_event.clear()
-    Thread1.join()
-    logger.info("Thread closed successfully")
-except:
-    logger.error("An unknown error was found while getting data from API, traceback is saved on the recent log file")
-    loggon.exception("Exception catched: %s" % sys.exc_info()[0])
-    sys.exit()
+    def callers():
+        try:
+            Process.initialize(API_DATA_CONFIG)
+            loggon.info(f"=============== System INFO ===============")
+            getSystemInfo(loggon)
+            loggon.info(f"===========================================")
+            if args.filecode:
+                if Process.CommunicateApi.File_iter.available:
+
+                    logger.warning("This method is still UNDER TESTING and MIGHT NOT WORK PROPERLY")
+                    time.sleep(3)
+                    with Process.CommunicateApi.File_iter(args.filecode) as iof:
+                        for file_link in iof:
+                            logger.info("Downloading link: %s" % file_link)
+                            main(file_link)
+                            print("-"*10)
+                else:
+                    logger.error("This method is not available for the current module")
+            else:
+                main(args.nukecode)
+        except urllib.error.HTTPError as e:
+            #ONLY OCCURS WHEN THERE IS NO RESULTS
+            if e.code == 404:
+                logger.error("The content you are looking for is not found")
+            else:
+                logger.error("HTTP Error Code: %s" % e.code)
+            if API_DATA_CONFIG["mirror_available"] and not API_MIRROR_ACCOMPLISHED:
+                return 101
+            sys.exit(1)
+        except urllib.error.URLError as error:
+            logger.error("A connection error has occured")
+            loggon.exception("Exception catched: %s" % sys.exc_info()[0])
+            sys.exit(1)
+        except SystemExit as error:
+            if error.code == EMERGENCY:
+                os._exit(1)
+            else:
+                raise
+        except KeyboardInterrupt:
+            print("")
+            logger.info("Attempting to close thread..")
+            run_event.clear()
+            Thread1.join()
+            logger.info("Thread closed successfully")
+        except ModuleNotFoundError as error:
+            mod_dir = f'Lib.{API_DATA_CONFIG["module_name"]}'
+            if error.name == mod_dir:
+                if API_MIRROR_ACCOMPLISHED:
+                    logger.error("Mirror server is not available, traceback is saved on the recent log file")
+                    loggon.exception("Exception catched: %s" % sys.exc_info()[0])
+                else:
+                    logger.error(f"Importing error, {error.name} is not a valid module, traceback is saved on the recent log file")
+                    loggon.exception("Exception catched: %s" % sys.exc_info()[0])
+        except:
+            logger.error("An unknown error was found while getting data from API, traceback is saved on the recent log file")
+            loggon.exception("Exception catched: %s" % sys.exc_info()[0])
+            sys.exit()
+    while True:
+        exit_code = callers()
+        if exit_code == 101:
+            logger.info("Mirror server enabled, trying mirror server.")
+            API_DATA_CONFIG["module_name"] = f'{API_DATA_CONFIG["module_name"]}_mirror'
+            API_MIRROR_ACCOMPLISHED = True
+        else:
+            break
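Stripped of logging and CLI handling, the fallback above reduces to a small retry loop. A self-contained sketch (every name here is a simplified stand-in, not one of the script's real functions):

```
MIRROR_AVAILABLE = True  # mirrors config.json's "mirror_available"

def fetch(module_name):
    # Hypothetical stand-in for the real download pipeline
    if module_name == "NHentai":
        raise ConnectionError("official site unreachable")
    print(f"downloaded via {module_name}")

def run_once(module_name, tried_mirror):
    """Mimics callers(): returns 101 to request one retry on the mirror."""
    try:
        fetch(module_name)
    except ConnectionError:
        if MIRROR_AVAILABLE and not tried_mirror:
            return 101  # the same sentinel value Start_download.py uses
        raise
    return 0

module_name, tried_mirror = "NHentai", False
while True:
    if run_once(module_name, tried_mirror) == 101:
        module_name += "_mirror"   # swaps in the Lib.NHentai_mirror backend
        tried_mirror = True
    else:
        break
```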
