-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
178 lines (143 loc) · 5.48 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
from flask import Flask, request, send_from_directory, abort, Response, redirect, jsonify, send_file
from flask_cors import CORS
import werkzeug
from pathlib import Path
from json import loads
import filesystem
import time
BASE_DIR = Path(__file__).resolve().parent
STATIC_DIR = BASE_DIR / "static"
ASSETS_DIR = STATIC_DIR / "assets"
configPath: Path = None
config: dict = None
storage: filesystem.storage = None
cache: dict[str, dict] = {}
# fileid: {time: int, metadata: dict}
app = Flask(__name__)
CORS(app)
def load_config():
global storage, app, config, configPath
if not configPath:
configPath = BASE_DIR / "config.json"
if not configPath.exists():
raise FileNotFoundError("Config file not found!")
config = loads(configPath.read_text())
if config["storage"] not in filesystem.storageTypes:
raise ValueError(f"Invalid storage type: {config['storage']}")
storage = getattr(filesystem, config["storage"])(config, configPath)
if config["host"]["proxy"]:
from werkzeug.middleware.proxy_fix import ProxyFix
app.wsgi_app = ProxyFix(
app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1
)
def check_config():
if not config:
load_config()
def store_cache(metadata: filesystem.Metadata):
cache[metadata.id] = {"time": int(time.time()), "metadata": metadata}
def get_cache(fileid: str, filename: str) -> filesystem.Metadata:
if fileid in cache and filename == cache[fileid]["metadata"].name:
if cache[fileid]["time"] + config["host"]["cachetime"] > int(time.time()):
return cache[fileid]["metadata"]
else:
cache.pop(fileid)
return None
def clear_cache():
for fileid in cache:
if cache[fileid]["time"] + config["host"]["cachetime"] < int(time.time()):
cache.pop(fileid)
def load_metadata(fileid: str, filename: str):
metadata = get_cache(fileid, filename)
if metadata is None:
metadata = storage.load_metadata(fileid, filename)
metadata.folderid = None
metadata.delete = None
store_cache(metadata)
return metadata
@app.before_request
def before_request():
check_config()
@app.route('/info')
def info():
return config["general"], 200
@app.route('/assets/<path:path>')
def send_assets(path):
return send_from_directory(ASSETS_DIR, path)
@app.route('/', methods=['GET', 'POST'])
def main():
return STATIC_DIR.joinpath("index.html").read_text(), 200
@app.route('/api/v1/<path:path>', methods=['GET'])
def v1api(path):
dt = path.split("/")
if len(dt) > 2: return abort(404)
fileid = dt[0]
filename = dt[1]
try:
data = load_metadata(fileid, filename).to_dict()
return jsonify(data), 200
except FileNotFoundError:
return abort(404)
@app.route('/api/v1/clearcache')
def clearcache():
clear_cache()
return "OK", 200
@app.route('/<path:path>', methods=['GET'])
def get(path):
if path == "favicon.ico": return redirect(config["general"]["icon"])
if path == "index.html": return redirect("/")
if path == "robots.txt": return "Disallow: /", 200
dt = path.split("/")
if len(dt) > 2: return abort(404)
fileid = dt[0]
filename = dt[1]
# if fileid == "ye" and filename == "hello.gif": return STATIC_DIR.joinpath("item.html").read_text(), 200
if "curl" in request.headers.get("User-Agent"): return getf(path)
try:
if len(fileid) != config["folderidlength"]: return abort(404)
metadata = load_metadata(fileid, filename)
return STATIC_DIR.joinpath("item.html").read_text(), 200
except FileNotFoundError:
return abort(404)
@app.route('/get/<path:path>', methods=['GET'])
def getf(path):
if path.startswith("delete/"): return abort(404)
dt = path.split("/")
if len(dt) > 2: return abort(404)
fileid = dt[0]
filename = dt[1]
try:
if storage.cache and storage.is_cached(fileid, filename):
return send_file(storage.get_cached(fileid, filename))
if config["host"]["cdn"]["enabled"]:
redirect_url = config['host']['cdn']['url']
redirect_url = redirect_url[:-1] if redirect_url[-1] == "/" else redirect_url
return redirect(f"{redirect_url}/{fileid}/{filename}")
return storage.download(fileid, filename)
except FileNotFoundError:
return abort(404)
@app.route('/<path:path>', methods=['PUT'])
def putf(path):
if "/" in path: return "Invalid path", 400
if path[0] == ".": return "Invalid path", 400
metadata = storage.save(request.stream, request.content_length, path)
store_cache(metadata)
unknown_domains = ["localhost", "127.0.0.1"]
if request.host_url.split("//")[1].split("/")[0] in unknown_domains:
print("WARNING: Host URL is not set correctly! Check your proxy settings. (HOST Header)")
return f"{request.host_url}{metadata.id}/{metadata.name}", 200
@app.errorhandler(404)
def E404(e):
return error("404", "Not Found!")
@app.errorhandler(500)
def E500(e):
return error("500", "Internal Server Error!")
@app.errorhandler(400)
def E400(e):
return error("400", "Bad Request!")
def error(code: str, msg: str):
if "curl" in request.headers.get("User-Agent"): return f"{code}: {msg}", int(code)
return STATIC_DIR.joinpath("error.html").read_text().replace("StatusCode", code).replace("StatusMessage", msg), int(code)
if __name__ == '__main__':
load_config()
app.debug = config["debug"]
app.run(host=config["host"]["ip"], port=config["host"]["port"])