Skip to content

Commit

Permalink
Refactor merging module and improve temporary directory handling
Browse files Browse the repository at this point in the history
  • Loading branch information
fbriol committed Feb 10, 2024
1 parent 2c47e9b commit 7548aff
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 9 deletions.
23 changes: 16 additions & 7 deletions zcollection/merging/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,20 @@
from __future__ import annotations

from typing import Protocol
import random
import hashlib
import shutil

import fsspec
import fsspec.implementations.local
import zarr.storage

from zcollection import fs_utils

from .. import dataset, storage, sync
from .time_series import merge_time_series

__all__ = ('MergeCallable', 'perform', 'merge_time_series')

#: Character set used to create a temporary directory.
CHARACTERS = 'abcdefghijklmnopqrstuvwxyz0123456789_'


#: pylint: disable=too-few-public-methods,duplicate-code
class MergeCallable(Protocol):
Expand Down Expand Up @@ -88,6 +87,12 @@ def _rename(
fs.mv(source, dest, recursive=True)


def _extract_root_dirname(dirname: str, sep: str) -> str:
"""Extracts the root directory name from a partition name."""
parts = filter(lambda x: '=' not in x, dirname.split(sep))
return sep.join(parts)


def _update_fs(
dirname: str,
zds: dataset.Dataset,
Expand All @@ -103,9 +108,13 @@ def _update_fs(
fs: The file system that the partition is stored on.
synchronizer: The instance handling access to critical resources.
"""
# Name of the temporary directory.
temp: str = dirname + '.' + ''.join(
random.choice(CHARACTERS) for _ in range(10))
# Building a temporary directory to store the new data. The name of the
# temporary directory is the hash of the partition name.
temp: str = fs_utils.join_path(
_extract_root_dirname(dirname, fs.sep),
hashlib.sha256(dirname.encode()).hexdigest())
if fs.exists(temp):
fs.rm(temp, recursive=True)

# Initializing Zarr group
zarr.storage.init_group(store=fs.get_mapper(temp))
Expand Down
4 changes: 2 additions & 2 deletions zcollection/merging/tests/test_merging.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def test_update_fs(
generator = data.create_test_dataset(delayed=False)
zds = next(generator)

partition_folder = local_fs.root.joinpath('partition_folder')
partition_folder = local_fs.root.joinpath('variable=1')

zattrs = str(partition_folder.joinpath('.zattrs'))
future = dask_client.submit(_update_fs, str(partition_folder),
Expand Down Expand Up @@ -82,7 +82,7 @@ def test_perform(
generator = data.create_test_dataset(delayed=delayed)
zds = next(generator)

path = str(local_fs.root.joinpath('folder'))
path = str(local_fs.root.joinpath('variable=1'))

future = dask_client.submit(_update_fs, path, dask_client.scatter(zds),
local_fs.fs)
Expand Down

0 comments on commit 7548aff

Please sign in to comment.