Improve ratelimiting, add recursive uploads (#13)
# Bugs and Debugging
Improves rate limiting and debugging by slowing down file renames and
adding better `--dry-run` support.
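
The rate-limit handling boils down to two patterns, both visible in the
`gofile_uploader.py` diff below: guard mutating calls behind `--dry-run`, and
pause between consecutive rename requests. A minimal sketch, assuming a
`GofileIOAPI`-style client exposing `update_content()`; the helper name
`rename_remote_files` is illustrative only:

```python
import asyncio


async def rename_remote_files(api, renames, dry_run=False):
    """Rename remote contents one at a time, pausing between calls.

    renames: iterable of (content_id, new_name) pairs; api is assumed to be a
    GofileIOAPI instance exposing update_content() as in api.py below.
    """
    if dry_run:
        print("Dry run only, file renaming will be skipped")
        return
    for content_id, new_name in renames:
        await api.update_content(content_id, "name", new_name)
        # Pause between rename requests to stay under Gofile's rate limit
        # (the diff below uses a blocking time.sleep(0.5) at this point).
        await asyncio.sleep(0.5)
```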

# Recursive Uploads
Adds the ability to upload all files in a directory by finding them
recursively.
All files are uploaded into a single remote directory, so the local directory
structure is lost.
Use `--recurse-directories` to enable this, in conjunction with
`--exclude-file-types` or `--only-file-types` to limit the file types you want
to upload. As a safety measure so you don't accidentally upload a whole drive,
this feature also adds a `--recurse-max` flag with a default of 1000 files.
If you go over this limit the program simply exits; to upload more files than
that, raise this flag.
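
A minimal sketch of the traversal these flags describe (the `find_files`
helper and its signature are illustrative, not the project's actual
implementation):

```python
from pathlib import Path
from typing import Optional


def find_files(root: Path,
               only: Optional[set[str]] = None,
               exclude: Optional[set[str]] = None,
               max_files: int = 1000) -> list[Path]:
    """Recursively collect files under root, filtering by extension."""
    found: list[Path] = []
    for path in sorted(root.rglob("*")):
        if not path.is_file():
            continue
        ext = path.suffix.lstrip(".").lower()
        if only is not None and ext not in only:
            continue
        if exclude is not None and ext in exclude:
            continue
        found.append(path)
        if len(found) > max_files:
            # Mirrors --recurse-max: abort rather than upload a whole drive.
            raise SystemExit(f"Found more than {max_files} files, aborting")
    return found
```

For example, `find_files(Path("photos"), only={"jpg", "png"})` roughly
corresponds to `--recurse-directories --only-file-types jpg,png`.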

# Parallel MD5 Hashing
Also introduces some md5sum hashing improvements (supposedly) by hashing files
in parallel, with a default pool size based on your CPU core count. This can be
configured via `--hash-pool-size`. Needs more testing.
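
The `gofile_uploader.py` diff below implements this with a
`ProcessPoolExecutor`; condensed, the pattern is roughly the following (the
`md5sum` and `hash_files` helpers are illustrative, and the default pool size
mirrors `max(os.cpu_count() - 2, 1)` from `cli.py`):

```python
import hashlib
import os
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path


def md5sum(path: Path, chunk_size: int = 8192) -> str:
    """Hash one file in chunks so large files don't need to fit in memory."""
    digest = hashlib.md5()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def hash_files(paths: list[Path],
               pool_size: int = max((os.cpu_count() or 1) - 2, 1)) -> dict[str, str]:
    """Hash many files in parallel worker processes."""
    sums: dict[str, str] = {}
    with ProcessPoolExecutor(max_workers=pool_size) as executor:
        futures = {executor.submit(md5sum, path): path for path in paths}
        for future in as_completed(futures):
            sums[str(futures[future])] = future.result()
    return sums
```

When trying this on platforms that spawn worker processes (Windows, macOS),
call `hash_files` from behind an `if __name__ == "__main__":` guard.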
alexmi256 authored Aug 24, 2024
2 parents f1f41b8 + e9f019d commit 2502b68
Showing 7 changed files with 114 additions and 48 deletions.
24 changes: 23 additions & 1 deletion README.md
@@ -23,7 +23,12 @@ usage: gofile-upload [-h] [-t TOKEN] [-z {na,eu}] [-f FOLDER] [-d]
[--rename-existing | --no-rename-existing]
[-c CONNECTIONS] [--timeout TIMEOUT]
[--public | --no-public] [--save | --no-save]
[--use-config | --no-use-config] [-r RETRIES]
[--use-config | --no-use-config]
[--recurse-directories | --no-recurse-directories]
[--recurse-max RECURSE_MAX]
[--exclude-file-types EXCLUDE_FILE_TYPES]
[--only-file-types ONLY_FILE_TYPES] [-r RETRIES]
[--hash-pool-size HASH_POOL_SIZE]
[--log-level {debug,info,warning,error,critical}]
[--log-file LOG_FILE]
file
@@ -67,8 +72,25 @@ optional arguments:
Whether to create and use a config file in
$HOME/.config/gofile_upload/config.json. (default:
True)
--recurse-directories, --no-recurse-directories
Whether to recursively iterate all directories and
search for files to upload if a directory is given as
the upload file
--recurse-max RECURSE_MAX
Maximum number of files before the program errors out
when using the --recurse-directories feature. Included
as a safety measure.
--exclude-file-types EXCLUDE_FILE_TYPES
Exclude files ending with these extensions from being
uploaded. Comma separated values. Example: jpg,png
--only-file-types ONLY_FILE_TYPES
Only upload files ending with these extensions. Comma
separated values. Example: jpg,png
-r RETRIES, --retries RETRIES
How many times to retry a failed upload. (default: 3)
--hash-pool-size HASH_POOL_SIZE
How many md5 hashes to calculate in parallel.
(default: 4)
--log-level {debug,info,warning,error,critical}
Log level. (default: warning)
--log-file LOG_FILE Additional file to log information to. (default: None)
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -29,7 +29,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "GofileIOUploader"
version = "0.12.5"
version = "0.13.0"
description = "Gofile.io uploader supporting parallel uploads"
readme = "README.md"
requires-python = ">=3.9"
37 changes: 24 additions & 13 deletions src/gofile_uploader/api.py
@@ -34,7 +34,9 @@ def __init__(self, options: GofileUploaderOptions):
self.session_headers = (
{"Authorization": f"Bearer {self.options['token']}"} if self.options.get("token") else None
)
self.session = aiohttp.ClientSession("https://api.gofile.io", headers=self.session_headers)
self.session = aiohttp.ClientSession(
"https://api.gofile.io", headers=self.session_headers, raise_for_status=True
)
# These are set once the account is queried
self.root_folder_id = None
self.account_id = None
@@ -56,7 +58,9 @@ async def init(self):
if not self.session.closed:
await self.session.close()
self.session_headers = {"Authorization": f"Bearer {self.options['token']}"}
self.session = aiohttp.ClientSession("https://api.gofile.io", headers=self.session_headers)
self.session = aiohttp.ClientSession(
"https://api.gofile.io", headers=self.session_headers, raise_for_status=True
)

# Use the account provided by the token
else:
@@ -72,18 +76,21 @@ async def init(self):

@staticmethod
async def get_new_account() -> GetNewAccountResponse:
async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(raise_for_status=True) as session:
async with session.post("https://api.gofile.io/accounts") as resp:
response = await resp.json()
GofileIOAPI.raise_error_if_error_in_remote_response(response)
GofileIOAPI.raise_error_if_error_in_remote_response(response, exit_if_rate_limited=True)
return response

@staticmethod
def raise_error_if_error_in_remote_response(response):
if response and ("error" in response.get("status", "") or response.get("status", "") != "ok"):
msg = f"Failed getting response from server:\n{pformat(response)}"
logger.error(msg)
raise Exception(msg)
def raise_error_if_error_in_remote_response(response, exit_if_rate_limited=False):
if response:
if exit_if_rate_limited and ("error-rateLimit" in response.get("status", "")):
exit(1)
if "error" in response.get("status", "") or response.get("status", "") != "ok":
msg = f"Failed getting response from server:\n{pformat(response)}"
logger.error(msg)
raise Exception(msg)

def raise_error_if_not_premium_status(self):
if self.is_premium is False:
@@ -116,19 +123,20 @@ async def get_servers(self, zone: Optional[Literal["eu", "na"]]) -> GetServersRe
params = {"zone": zone} if zone else None
async with self.session.get("/servers", params=params) as resp:
response = await resp.json()
GofileIOAPI.raise_error_if_error_in_remote_response(response)
GofileIOAPI.raise_error_if_error_in_remote_response(response, exit_if_rate_limited=True)
return response

async def get_account_id(self) -> GetAccountIdResponse:
async with self.session.get("/accounts/getid") as resp:
response = await resp.json()
GofileIOAPI.raise_error_if_error_in_remote_response(response)
GofileIOAPI.raise_error_if_error_in_remote_response(response, exit_if_rate_limited=True)
logger.debug(f'Account id is "{response["data"]["id"]}"')
return response

async def get_account_details(self, account_id: str) -> GetAccountDetailsResponse:
async with self.session.get(f"/accounts/{account_id}") as resp:
response = await resp.json()
GofileIOAPI.raise_error_if_error_in_remote_response(response, exit_if_rate_limited=True)
logger.debug(f'Account details for "{account_id}" are {response["data"]}')
return response

@@ -155,6 +163,7 @@ async def get_content(self, content_id: str, cache: Optional[bool], password: Op

async with self.session.get(f"/contents/{content_id}", params=params) as resp:
response = await resp.json()
GofileIOAPI.raise_error_if_error_in_remote_response(response, exit_if_rate_limited=True)
return response

async def create_folder(self, parent_folder_id: str, folder_name: Optional[str]) -> CreateFolderResponse:
@@ -165,7 +174,7 @@ async def create_folder(self, parent_folder_id: str, folder_name: Optional[str])
logger.debug(f"Creating new folder '{folder_name}' in parent folder id '{parent_folder_id}' ")
async with self.session.post("/contents/createfolder", data=data) as resp:
response = await resp.json()
GofileIOAPI.raise_error_if_error_in_remote_response(response)
GofileIOAPI.raise_error_if_error_in_remote_response(response, exit_if_rate_limited=True)
logger.debug(
f'Folder "{response["data"]["name"]}" ({response["data"]["id"]}) created in {response["data"]["parentFolder"]}'
)
@@ -240,7 +249,9 @@ async def upload_file(self, file_path: Path, folder_id: Optional[str] = None) ->

async with session.post("/contents/uploadfile", data=data) as resp:
response = await resp.json()
GofileIOAPI.raise_error_if_error_in_remote_response(response)
GofileIOAPI.raise_error_if_error_in_remote_response(
response, exit_if_rate_limited=False
)

file_metadata.update(response["data"])
file_metadata["uploadSuccess"] = response.get("status")
6 changes: 6 additions & 0 deletions src/gofile_uploader/cli.py
@@ -53,6 +53,7 @@ def cli(argparse_arguments: list[str]) -> GofileUploaderOptions:
"log_level": "warning",
"timeout": 600,
"recurse_directories": False,
"hash_pool_size": max(os.cpu_count() - 2, 1),
}
parser = argparse.ArgumentParser(prog="gofile-upload", description="Gofile.io Uploader supporting parallel uploads")
parser.add_argument("file", type=Path, help="File or directory to look for files in to upload")
@@ -144,6 +145,11 @@ def cli(argparse_arguments: list[str]) -> GofileUploaderOptions:
type=int,
help=f"How many times to retry a failed upload. (default: {default_cli_options['retries']})",
)
parser.add_argument(
"--hash-pool-size",
type=int,
help=f"How many md5 hashes to calculate in parallel. (default: {default_cli_options['hash_pool_size']})",
)
parser.add_argument(
"--log-level",
type=str,
1 change: 1 addition & 0 deletions src/gofile_uploader/conftest.py
@@ -30,6 +30,7 @@
"log_file": None,
"folder": None,
"file": Path("src/gofile_uploader/tests/example_files/file1.txt"),
"hash_pool_size": 1,
"config_file_path": None,
"config_directory": None,
}
91 changes: 58 additions & 33 deletions src/gofile_uploader/gofile_uploader.py
@@ -6,9 +6,11 @@
import re
import sys
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from pprint import pformat, pprint

from tqdm import tqdm
from typing_extensions import List, Optional, cast

from .api import GofileIOAPI
@@ -111,8 +113,14 @@ async def get_folder_id(self, folder: Optional[str], cache: bool = True) -> str:
logger.info(
f'Could not find a folder inside the root folder with the name "{folder}" so we will create one'
)
new_folder = await self.api.create_folder(self.api.root_folder_id, folder)
return new_folder["data"]["id"]
if self.options["dry_run"]:
print(
f"Dry run only, skipping folder creation of '{folder}' and using root folder '{self.api.root_folder_id}'"
)
return self.api.root_folder_id
else:
new_folder = await self.api.create_folder(self.api.root_folder_id, folder)
return new_folder["data"]["id"]

async def cleanup_api_sessions(self):
await self.api.session.close()
@@ -131,28 +139,35 @@ def save_responses_to_csv(self, responses: List[CompletedFileUploadResult]):
csv_writer.writerow(row)

def get_md5_sums_for_files(self, paths: List[Path]) -> dict[str, str]:
paths_of_files = [x for x in paths if x.is_file() and str(x) not in self.options["history"]["md5_sums"]]

sums = {}

# TODO: Try to parallelize this
for path in paths:
if path.is_file():
if str(path) in self.options["history"]["md5_sums"]:
md5_sum_for_file = self.options["history"]["md5_sums"][str(path)]
logger.debug(
f'Found precomputed md5sum ({md5_sum_for_file}) for path "{path}" using md5_sums config history'
)
sums[str(path)] = md5_sum_for_file
# TODO: Also check the previously uploaded file responses for MD5s of the same path
else:
logger.debug(f"Computing new md5sum for file {path}")
sums[str(path)] = GofileIOUploader.checksum(path)
disable_hashing_progress = True if len(paths_of_files) < 50 else False

number_of_files = len(paths_of_files)

if number_of_files:
logger.info(f"Calculating hashes for {number_of_files}/{len(paths)} files")
else:
logger.info(f"All {len(paths)} files were previously hashed")

with tqdm(
total=number_of_files, desc="Hashes Calculated", disable=disable_hashing_progress
) as files_hashed_progress:
with ProcessPoolExecutor(max_workers=self.options["hash_pool_size"]) as executor:
futures = {executor.submit(GofileIOUploader.checksum, arg): arg for arg in paths_of_files}
for future in as_completed(futures):
arg = futures[future]
sums[str(arg)] = future.result()
files_hashed_progress.update(1)

# Save md5sums to local config cache so we don't have to recompute later
self.options["history"]["md5_sums"].update(sums)
# Update the current configs since we could have calculated md5 sums
self.save_config_file()

return sums
return {str(path): self.options["history"]["md5_sums"][str(path)] for path in paths}

@staticmethod
def checksum(filename: Path, hash_factory=hashlib.md5, chunk_num_blocks: int = 128):
@@ -213,9 +228,12 @@ async def upload_files(self, path: Path, folder: Optional[str] = None) -> None:
and not folder_id_contents["data"]["public"]
):
logger.info(f"Making folder {folder_id} public")
await self.api.update_content(folder_id, "public", "true")
if self.options["dry_run"]:
print(f"Dry run only, folder {folder_id} will not be made public")
else:
await self.api.update_content(folder_id, "public", "true")

skipped_files_msg = f'{len(paths_to_skip)}/{len(paths)} files will be skipped since they were already uploaded to the folder "{folder}"'
skipped_files_msg = f'{len(paths_to_skip)}/{len(paths)} files will be skipped since they were already uploaded to the folder "{folder}" ({folder_id})'

logger.info(skipped_files_msg)

@@ -243,16 +261,22 @@ async def upload_files(self, path: Path, folder: Optional[str] = None) -> None:
f"File {existing_file} matched against md5 {existing_file_md5} on the server but with different name. Will renamed."
)

for content_to_rename in matching_remote_files_to_rename:
logger.info(f'Renaming {content_to_rename["name"]} (server) to {existing_file_name} (local)')
try:
await self.api.update_content(content_to_rename["id"], "name", existing_file_name)
logger.info(f'Renamed {content_to_rename["name"]} to {existing_file_name}')
except Exception as e:
msg = f'Failed to rename file from {content_to_rename["name"]} (server) to {existing_file_name} (local)'
logger.exception(msg, exc_info=e, stack_info=True)

renamed_files.append(content_to_rename)
if self.options["dry_run"]:
print(f"Dry run only, file renaming will be skipped")
else:
for content_to_rename in matching_remote_files_to_rename:
logger.info(
f'Renaming {content_to_rename["name"]} (server) to {existing_file_name} (local)'
)
try:
await self.api.update_content(content_to_rename["id"], "name", existing_file_name)
logger.info(f'Renamed {content_to_rename["name"]} to {existing_file_name}')
time.sleep(0.5)
except Exception as e:
msg = f'Failed to rename file from {content_to_rename["name"]} (server) to {existing_file_name} (local)'
logger.exception(msg, exc_info=e, stack_info=True)

renamed_files.append(content_to_rename)

renamed_files_msg = f"Renamed {len(renamed_files)}/{len(paths_to_skip)} skipped files"

Expand All @@ -263,7 +287,11 @@ async def upload_files(self, path: Path, folder: Optional[str] = None) -> None:

if paths:
try:
responses = await self.api.upload_files(paths, folder_id)
if self.options["dry_run"]:
print(f"Dry run only, files will not be uploaded")
responses = []
else:
responses = await self.api.upload_files(paths, folder_id)
finally:
# FIXME: Should session management even be done here, probably not?
await self.cleanup_api_sessions()
@@ -294,10 +322,7 @@ async def async_main() -> None:

try:
await gofile_client.api.init()
if options["dry_run"]:
print("Dry run only, uploading skipped")
else:
await gofile_client.upload_files(options["file"], options.get("folder"))
await gofile_client.upload_files(options["file"], options.get("folder"))
finally:
await gofile_client.cleanup_api_sessions()
gofile_client.save_config_file()
1 change: 1 addition & 0 deletions src/gofile_uploader/types.py
@@ -220,6 +220,7 @@ class GofileUploaderLocalConfigOptions(TypedDict):
history: GofileUploaderLocalConfigHistory
recurse_directories: Optional[bool]
recurse_max: Optional[int]
hash_pool_size: Optional[int]


class GofileUploaderOptions(GofileUploaderLocalConfigOptions):
