Skip to content

Commit

Permalink
Merge pull request #41 from alliander-opensource/feature/downloader
Browse files Browse the repository at this point in the history
Feature/downloader
  • Loading branch information
bramstoeller authored Mar 7, 2023
2 parents 4710bf3 + c9a25c8 commit d4bb730
Show file tree
Hide file tree
Showing 10 changed files with 941 additions and 0 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ dependencies = [
"power_grid_model>=1.4",
"pyyaml",
"structlog",
"tqdm",
]
dynamic = ["version"]

Expand Down
246 changes: 246 additions & 0 deletions src/power_grid_model_io/utils/download.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
# SPDX-FileCopyrightText: 2022 Contributors to the Power Grid Model project <dynamic.grid.calculation@alliander.com>
#
# SPDX-License-Identifier: MPL-2.0
"""
Helper functions to download (and store) files from the internet
The most simple (and intended) usage is:
url = "http://141.51.193.167/simbench/gui/usecase/download/?simbench_code=1-complete_data-mixed-all-0-sw&format=csv"
zip_file_path = download(url)
It will download the zip file 1-complete_data-mixed-all-0-sw.zip to a folder in you systems temp dir; for example
"/tmp/1-complete_data-mixed-all-0-sw.zip".
Another convenience function is download_and_extract():
csv_dir_path = download_and_extract(url)
This downloads the zip file as described above, and then it extracts the files there as well, in a folder which
corresponds to the zip file name ("/tmp/1-complete_data-mixed-all-0-sw/" in our example), and it returns the path to
that directory. By default, it will not re-download or re-extract the zip file as long as the files exist in your
temp dir. Your temp dir is typically emptied when you reboot your computer.
"""

import base64
import hashlib
import re
import tempfile
from dataclasses import dataclass
from pathlib import Path
from shutil import rmtree as remove_dir
from typing import Optional, Union
from urllib import request

import structlog
from tqdm import tqdm

from power_grid_model_io.utils.zip import extract

_log = structlog.get_logger(__name__)


@dataclass
class ResponseInfo:
"""
Data class to store response information extracted from the response header
"""

status: int
file_name: Optional[str] = None
file_size: Optional[int] = None


class DownloadProgressHook: # pylint: disable=too-few-public-methods
"""
Report hook for request.urlretrieve() to update a progress bar based on the amount of downloaded blocks
"""

def __init__(self, progress_bar: tqdm):
"""
Report hook for request.urlretrieve() to update a progress bar based on the amount of downloaded blocks
Args:
progress_bar: A tqdm progress bar
"""
self._progress_bar = progress_bar
self._last_block = 0

def __call__(self, block_num: int, block_size: int, file_size: int) -> None:
"""
Args:
block_num: The last downloaded block number
block_size: The block size in bytes
file_size: The file size in bytes (may be 0 in the first call)
"""
if file_size > 0:
self._progress_bar.total = file_size
self._progress_bar.update((block_num - self._last_block) * block_size)
self._last_block = block_num


def download_and_extract(
url: str, dir_path: Optional[Path] = None, file_name: Optional[Union[str, Path]] = None, overwrite: bool = False
) -> Path:
"""
Download a file from a URL and store it locally, extract the contents and return the path to the contents.
Args:
url: The url to the .zip file
dir_path: An optional dir path to store the downloaded file. If no dir_path is given the current working dir
will be used.
file_name: An optional file name (or path relative to dir_path). If no file_name is given, a file name is
generated based on the url
overwrite: Should we download the file, even if we have downloaded already (and the file size still matches)?
Be careful with this option, as it will remove files from your drive irreversibly!
Returns:
The path to the downloaded file
"""

# Download the file and use the file name as the base name for the extraction directory
src_file_path = download(url=url, file_name=file_name, dir_path=dir_path, overwrite=overwrite)
dst_dir_path = src_file_path.with_suffix("")

# If we explicitly want to overwrite the extracted files, remove the destination dir.
if overwrite and dst_dir_path.is_dir():
remove_dir(dst_dir_path)

# Extract the files and return the path of the extraction directory
return extract(src_file_path=src_file_path, dst_dir_path=dst_dir_path, skip_if_exists=not overwrite)


def download(
url: str, file_name: Optional[Union[str, Path]] = None, dir_path: Optional[Path] = None, overwrite: bool = False
) -> Path:
"""
Download a file from a URL and store it locally
Args:
url: The url to the file
file_name: An optional file name (or path relative to dir_path). If no file_name is given, a file name is
generated based on the url
dir_path: An optional dir path to store the downloaded file. If no dir_path is given the current working dir
will be used.
overwrite: Should we download the file, even if we have downloaded already (and the file size still matches)?
Returns:
The path to the downloaded file
"""

# get the response info, if the status is not 200
info = get_response_info(url=url)
if info.status != 200:
raise IOError(f"Could not download from URL, status={info.status}")

if file_name is None and info.file_name:
file_name = info.file_name

file_path = get_download_path(dir_path=dir_path, file_name=file_name, unique_key=url)
log = _log.bind(url=url, file_path=file_path)

if file_path.is_file():
if overwrite:
log.debug("Forced re-downloading existing file")
# Don't remove the existing file just yet... Let's first see if we can download a new version.
else:
local_size = file_path.stat().st_size
if local_size == info.file_size:
log.debug("Skip downloading existing file")
return file_path
log.debug(
"Re-downloading existing file, because the size has changed",
local_size=local_size,
remote_size=info.file_size,
)
else:
log.debug("Downloading file")

# Download to a temp file first, so the results are not stored if the transfer fails
with tqdm(desc="Downloading", unit="B", unit_scale=True, leave=True) as progress_bar:
report_hook = DownloadProgressHook(progress_bar)
temp_file, _headers = request.urlretrieve(url, reporthook=report_hook)

# Check if the file contains any content
temp_path = Path(temp_file)
if temp_path.stat().st_size == 0:
log.warning("Downloaded an empty file")

# Remove the file, if it already exists
file_path.unlink(missing_ok=True)

# Move the file to it's final destination
file_path.parent.mkdir(parents=True, exist_ok=True)
temp_path.rename(file_path)
log.debug("Downloaded file", file_size=file_path.stat().st_size)

return file_path


def get_response_info(url: str) -> ResponseInfo:
"""
Retrieve the file size of a given URL (based on it's header)
Args:
url: The url to the file
Return:
The file size in bytes
"""
with request.urlopen(url) as context:
status = context.status
headers = context.headers
file_size = int(headers["Content-Length"]) if "Content-Length" in headers else None
matches = re.findall(r"filename=\"(.+)\"", headers.get("Content-Disposition", ""))
file_name = matches[0] if matches else None

return ResponseInfo(status=status, file_size=file_size, file_name=file_name)


def get_download_path(
dir_path: Optional[Path] = None,
file_name: Optional[Union[str, Path]] = None,
unique_key: Optional[str] = None,
) -> Path:
"""
Determine the file path based on dir_path, file_name and/or data
Args:
dir_path: An optional dir path to store the downloaded file. If no dir_path is given the system's temp dir
will be used. If omitted, the tempfolder is used.
file_name: An optional file name (or path relative to dir_path). If no file_name is given, a file name is
generated based on the unique key (e.g. an url)
unique_key: A unique string that can be used to generate a filename (e.g. a url).
"""

# If no specific download path was given, we need to generate a unique key (based on the given unique key)
if file_name is None or unique_key is not None:
if unique_key is None:
raise ValueError("Supply a unique key in order to auto generate a download path.")

sha256 = hashlib.sha256()
sha256.update(unique_key.encode())
unique_key = base64.b64encode(sha256.digest()).decode("ascii")
unique_key = unique_key.replace("/", "_").replace("+", "-").rstrip("=")

# If no file name was given, use the unique key as a file name
if file_name is None:
file_name = Path(f"{unique_key}.download")
# Otherwise, use the unique key as a sub directory
elif dir_path is None:
dir_path = Path(tempfile.gettempdir()) / unique_key

# If no dir_path is given, use the system's designated folder for temporary files.
if dir_path is None:
dir_path = Path(tempfile.gettempdir())

# Combine the two paths
assert file_name is not None
file_path = (dir_path / file_name) if dir_path else Path(file_name)

# If the file_path exists, it should be a file (not a dir)
if file_path.exists() and not file_path.is_file():
raise ValueError(f"Invalid file path: {file_path}")

return file_path.resolve()
99 changes: 99 additions & 0 deletions src/power_grid_model_io/utils/zip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# SPDX-FileCopyrightText: 2022 Contributors to the Power Grid Model project <dynamic.grid.calculation@alliander.com>
#
# SPDX-License-Identifier: MPL-2.0
"""
Helper function to extract zip files
csv_dir_path = extract("/tmp/1-complete_data-mixed-all-0-sw.zip")
This extracts the files, in a folder which corresponds to the zip file name ("/tmp/1-complete_data-mixed-all-0-sw/" in
our example), and it returns the path to that directory. By default, it will not re-download or re-extract the zip
file as long as the files exist.
"""

import zipfile
from pathlib import Path
from typing import Optional

import structlog
from tqdm import tqdm

_log = structlog.get_logger(__name__)


def extract(src_file_path: Path, dst_dir_path: Optional[Path] = None, skip_if_exists=False) -> Path:
"""
Extract a .zip file and return the destination dir
Args:
src_file_path: The .zip file to extract.
dst_dir_path: An optional destination path. If none is given, the src_file_path without .zip extension is used.
skip_if_exists: Skip existing files, otherwise raise an exception when a file exists.
Returns: The path where the files are extracted
"""
if src_file_path.suffix.lower() != ".zip":
raise ValueError(f"Only files with .zip extension are supported, got {src_file_path.name}")

if dst_dir_path is None:
dst_dir_path = src_file_path.with_suffix("")

log = _log.bind(src_file_path=src_file_path, dst_dir_path=dst_dir_path)

if dst_dir_path.exists():
if not dst_dir_path.is_dir():
raise NotADirectoryError(f"Destination dir {dst_dir_path} exists and is not a directory")

# Create the destination directory
dst_dir_path.mkdir(parents=True, exist_ok=True)

# Extract per file, so we can show a progress bar
with zipfile.ZipFile(src_file_path, "r") as zip_file:
file_list = zip_file.namelist()
for file_path in tqdm(desc="Extracting", iterable=file_list, total=len(file_list), unit="file", leave=True):
dst_file_path = dst_dir_path / file_path
if dst_file_path.exists() and dst_file_path.stat().st_size > 0:
if skip_if_exists:
log.debug("Skip file extraction, destination file exists", dst_file_path=dst_file_path)
continue
raise FileExistsError(f"Destination file {dst_dir_path / file_path} exists and is not empty")
zip_file.extract(member=file_path, path=dst_dir_path)

# Zip files often contain a single directory with the same name as the zip file.
# In that case, return the dir to that directory instead of the root dir
only_item = _get_only_item_in_dir(dst_dir_path)
if only_item and only_item.is_dir() and only_item.name == src_file_path.stem:
dst_dir_path = only_item

return dst_dir_path.resolve()


def _get_only_item_in_dir(dir_path: Path) -> Optional[Path]:
"""
If dir path contains only a single item, return that item.
Return None otherwise (if there are no items at all, or more than one item).
Args:
dir_path: The path tho the directory
Returns:
A path to the only item (dir or file) in the directory
"""

only_item: Optional[Path] = None
for item in dir_path.iterdir():

# If only_item is not None at this point, it must have been set in the first iteration, i.e. there are more
# than one items in the directory, so return None.
if only_item is not None:
return None

# Else, if only_item is None, we are in the first iteration, i.e. the first item in the dir. This item may be
# the only item in the dir, so let's remember it.
only_item = item

# If we have come to this point, there were zero or one items in the directory. Return the path to that item (or
# None, the initial value).
return only_item
Binary file added tests/data/zip/foo-bar.zip
Binary file not shown.
3 changes: 3 additions & 0 deletions tests/data/zip/foo-bar.zip.license
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
SPDX-FileCopyrightText: 2022 Contributors to the Power Grid Model project <dynamic.grid.calculation@alliander.com>

SPDX-License-Identifier: MPL-2.0
Binary file added tests/data/zip/foo.zip
Binary file not shown.
3 changes: 3 additions & 0 deletions tests/data/zip/foo.zip.license
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
SPDX-FileCopyrightText: 2022 Contributors to the Power Grid Model project <dynamic.grid.calculation@alliander.com>

SPDX-License-Identifier: MPL-2.0
Loading

0 comments on commit d4bb730

Please sign in to comment.