Batched async downloads
king-millez committed Apr 30, 2024
1 parent f4c5ccc commit a6180ac
Showing 4 changed files with 186 additions and 19 deletions.
135 changes: 134 additions & 1 deletion poetry.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions pyproject.toml
@@ -12,6 +12,8 @@ python = "^3.10"
 requests = "^2.25.1"
 alive-progress = "^3.1.5"
 loguru = "^0.7.2"
+httpx = "^0.27.0"
+aiofiles = "^23.2.1"
 
 [tool.poetry.dev-dependencies]

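The two new runtime dependencies split the async work: httpx supplies the asynchronous HTTP client, and aiofiles supplies non-blocking file writes. A minimal sketch of how the two combine, assuming a hypothetical URL and output filename:

    import asyncio

    import aiofiles
    import httpx


    async def fetch_to_file(url: str, path: str) -> None:
        # Fetch the body asynchronously, then write it to disk
        # without blocking the event loop.
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
        async with aiofiles.open(path, "wb") as f:
            await f.write(response.content)


    asyncio.run(fetch_to_file("https://example.com/file.bin", "file.bin"))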
53 changes: 39 additions & 14 deletions snapmap_archiver/SnapmapArchiver.py
@@ -1,10 +1,13 @@
+import asyncio
 import json
 import os
 import sys
 import typing as t
 from datetime import datetime
 from time import sleep
 
+import aiofiles
+import httpx
 import requests
 from alive_progress import alive_bar
 from loguru._logger import Logger
@@ -56,26 +59,48 @@ def __init__(
     def _is_cached(self, snap_id: str) -> bool:
         return snap_id in self.snap_cache
 
+    async def _batched_download(
+        self, snap_url: str, output_path: str, bar: t.Any, client: httpx.AsyncClient
+    ):
+        if os.path.isfile(output_path):
+            self.logger.debug(f" - [{output_path}] already exists.")
+            bar()
+            return
+
+        async with aiofiles.open(output_path, "wb") as f:
+            await f.write((await client.get(snap_url)).content)
+
+        self.logger.debug(f" - Downloaded [{output_path}].")
+        bar()
+
     def download_cached_snaps(self):
         with alive_bar(
             len(self.snap_cache), title=f"Downloading to [{self.output_dir}]..."
         ) as bar:
-            for snap in self.snap_cache.values():
-                fpath = os.path.join(
-                    self.output_dir, f"{snap.snap_id}.{snap.file_type}"
-                )
-
-                if os.path.isfile(fpath):
-                    self.logger.debug(f" - [{fpath}] already exists.")
-                    bar()
-                    continue
-
-                with open(fpath, "wb") as f:
-                    f.write(requests.get(snap.url).content)
-
-                self.logger.debug(f" - Downloaded [{fpath}].")
-                bar()
+            all_snaps = list(self.snap_cache.values())
+            client = httpx.AsyncClient()
+            for snap_chunk in [
+                all_snaps[i : i + 20]
+                for i in range(
+                    0, len(all_snaps), 20
+                )  # 20 connections seems to be ok with rate limits.
+            ]:
+                asyncio.get_event_loop().run_until_complete(
+                    asyncio.gather(
+                        *[
+                            self._batched_download(
+                                snap.url,
+                                os.path.join(
+                                    self.output_dir, f"{snap.snap_id}.{snap.file_type}"
+                                ),
+                                bar,
+                                client,
+                            )
+                            for snap in snap_chunk
+                        ]
+                    )
+                )
 
         if self.write_json:
             with open(
                 os.path.join(
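The rewritten download_cached_snaps above is the core of the commit: instead of one blocking requests.get per snap, the cached snaps are split into chunks of 20 and each chunk is awaited as a batch with asyncio.gather, so at most 20 requests are in flight at once (per the in-code comment, a level that stays within rate limits). Below is a self-contained sketch of the same batching pattern with illustrative URLs and filenames; it uses asyncio.run rather than the commit's asyncio.get_event_loop().run_until_complete, and closes the client via its async context manager, which the commit's code leaves open:

    import asyncio
    import os

    import aiofiles
    import httpx


    async def download_one(client: httpx.AsyncClient, url: str, path: str) -> None:
        # Mirror the commit's cache check: skip files that already exist.
        if os.path.isfile(path):
            return
        response = await client.get(url)
        async with aiofiles.open(path, "wb") as f:
            await f.write(response.content)


    async def download_all(urls: list[str], out_dir: str, batch_size: int = 20) -> None:
        async with httpx.AsyncClient() as client:
            # Walk the URL list in fixed-size chunks so no more than
            # batch_size requests are in flight at any moment.
            for i in range(0, len(urls), batch_size):
                chunk = urls[i : i + batch_size]
                await asyncio.gather(
                    *(
                        download_one(client, url, os.path.join(out_dir, f"{i + n}.bin"))
                        for n, url in enumerate(chunk)
                    )
                )


    if __name__ == "__main__":
        # Hypothetical URL purely for illustration.
        asyncio.run(download_all(["https://example.com/a.bin"], "."))

One trade-off of fixed chunks: a batch must finish completely before the next starts, so a single slow response stalls its whole chunk. A semaphore-bounded worker pool would keep 20 requests in flight continuously, but the chunked version is simpler and matches the commit.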
15 changes: 11 additions & 4 deletions snapmap_archiver/__main__.py
@@ -4,9 +4,14 @@
 from loguru import logger
 from loguru._logger import Logger
 
-from snapmap_archiver import (DEFAULT_RADIUS, DEFAULT_WRITE_JSON,
-                              DEFAULT_ZOOM_DEPTH, ISSUES_URL, SNAP_PATTERN,
-                              default_output_dir)
+from snapmap_archiver import (
+    DEFAULT_RADIUS,
+    DEFAULT_WRITE_JSON,
+    DEFAULT_ZOOM_DEPTH,
+    ISSUES_URL,
+    SNAP_PATTERN,
+    default_output_dir,
+)
 from snapmap_archiver.coordinates import Coordinates
 from snapmap_archiver.SnapmapArchiver import SnapmapArchiver
@@ -120,7 +125,9 @@ def main():
     ]
 
     if not valid_snap_ids and not coordinates:
-        logger.error('Some kind of input is required. Run [snapmap-archiver -h] for help.')
+        logger.error(
+            "Some kind of input is required. Run [snapmap-archiver -h] for help."
+        )
         sys.exit(1)
 
     sm_archiver = SnapmapArchiver(
