diff --git a/src/uproot/source/coalesce.py b/src/uproot/source/coalesce.py new file mode 100644 index 000000000..52cad9289 --- /dev/null +++ b/src/uproot/source/coalesce.py @@ -0,0 +1,132 @@ +"""Read coalescing algorithms + +Inspired in part by https://github.com/cms-sw/cmssw/blob/master/IOPool/TFileAdaptor/src/ReadRepacker.h +""" + +from __future__ import annotations + +import queue +from concurrent.futures import Future +from dataclasses import dataclass +from typing import Callable + +import uproot.source.chunk + + +@dataclass +class CoalesceConfig: + max_range_gap: int = 32 * 1024 + max_request_ranges: int = 1024 + max_request_bytes: int = 10 * 1024 * 1024 + min_first_request_bytes: int = 32 * 1024 + + +DEFAULT_CONFIG = CoalesceConfig() + + +class SliceFuture: + def __init__(self, parent: Future, s: slice | int): + self._parent = parent + self._s = s + + def add_done_callback(self, callback, *, context=None): + self._parent.add_done_callback(callback) + + def result(self, timeout=None): + return self._parent.result(timeout=timeout)[self._s] + + +@dataclass +class RangeRequest: + start: int + stop: int + future: Future | None + + +@dataclass +class Cluster: + ranges: list[RangeRequest] + + @property + def start(self): + # since these are built from sorted ranges, this is the min start + return self.ranges[0].start + + @property + def stop(self): + return max(range.stop for range in self.ranges) + + def __len__(self): + return self.stop - self.start + + def set_future(self, future: Future): + for range in self.ranges: + local_start = range.start - self.start + local_stop = range.stop - self.start + range.future = SliceFuture(future, slice(local_start, local_stop)) + + +@dataclass +class CoalescedRequest: + clusters: list[Cluster] + + def ranges(self): + return [(cluster.start, cluster.stop) for cluster in self.clusters] + + def set_future(self, future: Future): + for i, cluster in enumerate(self.clusters): + cluster.set_future(SliceFuture(future, i)) + + +def _merge_adjacent(ranges: list[RangeRequest], config: CoalesceConfig): + sorted_ranges = sorted(ranges, key=lambda r: r.start) + cluster = Cluster([]) + for current_range in sorted_ranges: + if cluster.ranges and current_range.start - cluster.stop > config.max_range_gap: + yield cluster + cluster = Cluster([]) + cluster.ranges.append(current_range) + if cluster.ranges: + yield cluster + + +def _coalesce(ranges: list[RangeRequest], config: CoalesceConfig): + clusters: list[Cluster] = [] + request_bytes: int = 0 + first_request = True + for cluster in _merge_adjacent(ranges, config): + if clusters and ( + len(clusters) + 1 >= config.max_request_ranges + or request_bytes + len(cluster) >= config.max_request_bytes + or (first_request and request_bytes >= config.min_first_request_bytes) + ): + yield CoalescedRequest(clusters) + clusters = [] + request_bytes = 0 + first_request = False + clusters.append(cluster) + request_bytes += len(cluster) + if clusters: + yield CoalescedRequest(clusters) + + +def coalesce_requests( + ranges: list[tuple[int, int]], + submit_fn: Callable[[list[tuple[int, int]]], Future], + source: uproot.source.chunk.Source, + notifications: queue.Queue, + config: CoalesceConfig | None = None, +): + if config is None: + config = DEFAULT_CONFIG + all_requests = [RangeRequest(start, stop, None) for start, stop in ranges] + for merged_request in _coalesce(all_requests, config): + future = submit_fn(merged_request.ranges()) + merged_request.set_future(future) + + def chunkify(req: RangeRequest): + chunk = uproot.source.chunk.Chunk(source, req.start, req.stop, req.future) + req.future.add_done_callback(uproot.source.chunk.notifier(chunk, notifications)) + return chunk + + return list(map(chunkify, all_requests)) diff --git a/src/uproot/source/fsspec.py b/src/uproot/source/fsspec.py index 29718de32..588b6b31d 100644 --- a/src/uproot/source/fsspec.py +++ b/src/uproot/source/fsspec.py @@ -12,26 +12,14 @@ import uproot import uproot.source.chunk import uproot.source.futures - - -class PartFuture: - """For splitting the result of fs._cat_ranges into its components""" - - def __init__(self, parent_future: concurrent.futures.Future, part_index: int): - self._parent = parent_future - self._part_index = part_index - - def add_done_callback(self, callback, *, context=None): - self._parent.add_done_callback(callback) - - def result(self, timeout=None): - return self._parent.result(timeout=timeout)[self._part_index] +from uproot.source.coalesce import CoalesceConfig, coalesce_requests class FSSpecSource(uproot.source.chunk.Source): """ Args: file_path (str): A URL for the file to open. + coalesce_config (struct, optional): Configuration options for read coalescing **kwargs (dict): any extra arguments to be forwarded to the particular FileSystem instance constructor. This might include S3 access keys, or HTTP headers, etc. @@ -40,8 +28,11 @@ class FSSpecSource(uproot.source.chunk.Source): to get many chunks in one request. """ - def __init__(self, file_path: str, **options): + def __init__( + self, file_path: str, coalesce_config: CoalesceConfig | None = None, **options + ): super().__init__() + self._coalesce_config = coalesce_config self._fs, self._file_path = fsspec.core.url_to_fs( file_path, **self.extract_fsspec_options(options) ) @@ -50,7 +41,6 @@ def __init__(self, file_path: str, **options): self._async_impl = self._fs.async_impl self._file = None - self._fh = None self._open() @@ -75,25 +65,20 @@ def __repr__(self): return f"<{type(self).__name__} {path} at 0x{id(self):012x}>" def __getstate__(self): - self._fh = None state = dict(self.__dict__) state.pop("_executor") state.pop("_file") - state.pop("_fh") return state def __setstate__(self, state): self.__dict__ = state self._file = None - self._fh = None self._open() def __enter__(self): - self._fh = self._file.__enter__() return self def __exit__(self, exception_type, exception_value, traceback): - self._fh = None self._file.__exit__(exception_type, exception_value, traceback) self._executor.shutdown() @@ -110,20 +95,16 @@ def chunk(self, start: int, stop: int) -> uproot.source.chunk.Chunk: self._num_requests += 1 self._num_requested_chunks += 1 self._num_requested_bytes += stop - start - if self._fh: - self._fh.seek(start) - data = self._fh.read(stop - start) - else: - data = self._fs.cat_file(self._file_path, start, stop) + data = self._fs.cat_file(self._file_path, start=start, end=stop) future = uproot.source.futures.TrivialFuture(data) return uproot.source.chunk.Chunk(self, start, stop, future) def chunks( - self, ranges: list[(int, int)], notifications: queue.Queue + self, ranges: list[tuple[int, int]], notifications: queue.Queue ) -> list[uproot.source.chunk.Chunk]: """ Args: - ranges (list of (int, int) 2-tuples): Intervals to fetch + ranges (list of tuple[int, int] 2-tuples): Intervals to fetch as (start, stop) pairs in a single request, if possible. notifications (``queue.Queue``): Indicator of completed chunks. After each gets filled, it is ``put`` on the @@ -171,29 +152,23 @@ async def async_wrapper_thread(blocking_func, *args, **kwargs): # TODO: when python 3.8 is dropped, use `asyncio.to_thread` instead (also remove the try/except block above) return await to_thread(blocking_func, *args, **kwargs) - paths = [self._file_path] * len(ranges) - starts = [start for start, _ in ranges] - ends = [stop for _, stop in ranges] - # _cat_ranges is async while cat_ranges is not. - coroutine = ( - self._fs._cat_ranges(paths=paths, starts=starts, ends=ends) - if self._async_impl - else async_wrapper_thread( - self._fs.cat_ranges, paths=paths, starts=starts, ends=ends + def submit(request_ranges: list[tuple[int, int]]): + paths = [self._file_path] * len(request_ranges) + starts = [start for start, _ in request_ranges] + ends = [stop for _, stop in request_ranges] + # _cat_ranges is async while cat_ranges is not. + coroutine = ( + self._fs._cat_ranges(paths=paths, starts=starts, ends=ends) + if self._async_impl + else async_wrapper_thread( + self._fs.cat_ranges, paths=paths, starts=starts, ends=ends + ) ) - ) - - future = self._executor.submit(coroutine) + return self._executor.submit(coroutine) - chunks = [] - for index, (start, stop) in enumerate(ranges): - chunk_future = PartFuture(future, index) - chunk = uproot.source.chunk.Chunk(self, start, stop, chunk_future) - chunk_future.add_done_callback( - uproot.source.chunk.notifier(chunk, notifications) - ) - chunks.append(chunk) - return chunks + return coalesce_requests( + ranges, submit, self, notifications, config=self._coalesce_config + ) @property def async_impl(self) -> bool: diff --git a/tests/test_0692_fsspec_reading.py b/tests/test_0692_fsspec_reading.py index 73cc09406..eb6e3991c 100644 --- a/tests/test_0692_fsspec_reading.py +++ b/tests/test_0692_fsspec_reading.py @@ -97,10 +97,6 @@ def test_open_fsspec_local(): ) def test_open_fsspec_s3(handler): pytest.importorskip("s3fs") - if sys.version_info < (3, 11): - pytest.skip( - "https://github.com/scikit-hep/uproot5/pull/1012", - ) with uproot.open( "s3://pivarski-princeton/pythia_ppZee_run17emb.picoDst.root:PicoDst", @@ -461,10 +457,6 @@ def test_fsspec_globbing_xrootd_no_files(handler): ) def test_fsspec_globbing_s3(handler): pytest.importorskip("s3fs") - if sys.version_info < (3, 11): - pytest.skip( - "https://github.com/scikit-hep/uproot5/pull/1012", - ) iterator = uproot.iterate( {"s3://pivarski-princeton/pythia_ppZee_run17emb.*.root": "PicoDst"}, diff --git a/tests/test_1198_coalesce.py b/tests/test_1198_coalesce.py new file mode 100644 index 000000000..709c47e10 --- /dev/null +++ b/tests/test_1198_coalesce.py @@ -0,0 +1,39 @@ +import pytest +from uproot.source.coalesce import CoalesceConfig, RangeRequest, _coalesce, Future + + +@pytest.mark.parametrize( + "config", + [ + CoalesceConfig(), + CoalesceConfig(max_range_gap=2, max_request_ranges=1), + ], + ids=["default", "tiny"], +) +@pytest.mark.parametrize( + "ranges", + [ + [(1, 3), (4, 6), (10, 20)], + [(1, 3), (10, 20), (4, 6), (9, 10)], + [(1, 3), (10, 20), (6, 15)], + [(1, 3), (10, 20), (6, 25)], + ], + ids=["sorted", "jumbled", "overlapped", "nested"], +) +def test_coalesce(ranges, config): + data = b"abcdefghijklmnopqurstuvwxyz" + + all_requests = [RangeRequest(start, stop, None) for start, stop in ranges] + nreq = 0 + for merged_request in _coalesce(all_requests, config): + future = Future() + future.set_result([data[start:stop] for start, stop in merged_request.ranges()]) + merged_request.set_future(future) + nreq += 1 + + if config.max_range_gap == 2: + assert nreq > 1 + + for req in all_requests: + assert req.future + assert req.future.result() == data[req.start : req.stop]