Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(draft): add report= argument to uproot.dask; triggers report collection for failed reads #1050

Conversation

douglasdavis
Copy link
Contributor

@douglasdavis douglasdavis commented Nov 29, 2023

This PR pairs with dask-contrib/dask-awkward#415. It adds a new report argument to uproot.dask. It causes the call to return two collections, the actual array of interest represented by the first collection, and a report collection as the second return.

As it stands if report is not None we create a report collection; it's designed to be computed simultaneously with whatever collection of the interest is. An example where we have two field access calls and then a dak.max call:

dataset, report = uproot.dask({"/path/to/files/*.root": "tree"}, report=True)
measurement = dak.max(dataset.thing1 + datasetset.thing2, axis=1)
result, report = dask.compute(measurement, report)

If any of the files at that path end up as failure-to-read (raise an exception), the the measurement collection will have those partitions get computed to an empty array, so the result array object will be pieced together from a concatenation where some of the ingredients are empty, and the report array object will be an awkward array of length dataset.npartitions, (one element in the array per partition)


We have some flexibility here with the report= argument. The from_map API in dask-awkward via dask-contrib/dask-awkward#415 has 4 arguments that can steer the behavior here

  1. which exceptions should be absorbed to send back an empty array at that failed node
  2. which backend for the empty array (for now I think is is always "cpu")
  3. a callback function to run on nodes where the read was successful (this is an entry point for customizing the report for successful reads). (The default is None)
  4. a callback function to run on nodes where the read was a failure (this is an entry point for customizing the report for failed reads). (The default is construct a record array which hold information about the error and the function arguments passed to the read function that failed)

cc @lgray

@douglasdavis douglasdavis changed the title draft: feat: add report= argument to uproot.dask; triggers report collection for failed reads feat(draft): add report= argument to uproot.dask; triggers report collection for failed reads Nov 29, 2023
@lgray
Copy link
Contributor

lgray commented Nov 29, 2023

We should allow the user to configure the kinds of exceptions that cause empty outputs for a partition rather than hardcoding Exception. Probably defaulted to (OSError, FileNotFoundError)?

@douglasdavis
Copy link
Contributor Author

How should we pass that in, as the report= argument or an additional argument?

@lgray
Copy link
Contributor

lgray commented Nov 29, 2023

uproot.iterate has report=True/False to take care of toggling having a report or not, I'd have that here for consistency.
I would add a separate argument to uproot.dask to specify the exceptions.

I suppose there should be an argument for custom report fillers and such too?

@jpivarski?

@lgray
Copy link
Contributor

lgray commented Nov 29, 2023

That or we have report:bool | ReportSpec = False where ReportSpec is something that specifies the four steering arguments of from_map. If report is a bool and True we use a well-informed default ReportSpec?

@jpivarski
Copy link
Member

report: bool or report: bool | ReportSpec is a good idea (the latter can be a later extension).

It's also worthwhile to get the report on the same side—left or right—as it is in uproot.iterate:

report (bool) – If True, this generator yields (arrays, uproot.behaviors.TBranch.Report) pairs; if False, it only yields arrays. The report has data about the TFile, TTree, and global and local entry ranges.

So, report goes on the right if it's a 2-tuple.

lgray added a commit to CoffeaTeam/coffea that referenced this pull request Nov 29, 2023
@lgray
Copy link
Contributor

lgray commented Nov 30, 2023

We should have uproot.dask error if you ask for the report when not in dask-awkward mode.

@lgray
Copy link
Contributor

lgray commented Dec 1, 2023

Giving this a try and right now I get (this is using dak on the failure-report branch rebased to main, this branch uproot):

>>> events, report = uproot.dask("tests/samples/nano_dy.root:Events", report=True)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/lgray/coffea-dev/uproot5/src/uproot/_dask.py", line 249, in dask
    return _get_dak_array(
  File "/Users/lgray/coffea-dev/uproot5/src/uproot/_dask.py", line 1323, in _get_dak_array
    return dask_awkward.from_map(
  File "/Users/lgray/coffea-dev/dask-awkward/src/dask_awkward/lib/io/io.py", line 775, in from_map
    raise ValueError("io_func must implement mock_empty method.")
ValueError: io_func must implement mock_empty method.

@douglasdavis
Copy link
Contributor Author

Ah yes the function classes need a mock_empty along with mock. I'll tinker with it tomorrow

@lgray
Copy link
Contributor

lgray commented Dec 1, 2023

Tried out the latest branch with the following code:

from coffea.nanoevents import NanoEventsFactory, NanoAODSchema
#from distributed import Client                                                                                                                                                                                                                      
import dask
#import dask_awkward as dak                                                                                                                                                                                                                          


if __name__ == "__main__":
    #client = Client()                                                                                                                                                                                                                               


    #dask.config.set({"awkward.optimization.enabled": True, "awkward.raise-failed-meta": True, "awkward.optimization.on-fail": "raise"})                                                                                                             
    events, report = NanoEventsFactory.from_root(
        ["tests/samples/nano_dy.root:Events", "/not/actually/a/root/file.root:Events"],
        metadata={"dataset": "nano_dy"},
        schemaclass=NanoAODSchema,
        uproot_options={"report": True}
    ).events()

    pt, creport = dask.compute(events.Muon.pt, report)

the result was:

(coffea-dev) lgray@Lindseys-MacBook-Pro coffea % python -i report_play.py
/Users/lgray/coffea-dev/coffea/src/coffea/nanoevents/schemas/nanoaod.py:243: RuntimeWarning: Missing cross-reference index for FatJet_genJetAK8Idx => GenJetAK8
  warnings.warn(
Traceback (most recent call last):
  File "/Users/lgray/coffea-dev/uproot5/src/uproot/source/file.py", line 114, in _open
    self._file = numpy.memmap(self._file_path, dtype=self._dtype, mode="r")
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/numpy/core/memmap.py", line 228, in __new__
    f_ctx = open(os_fspath(filename), ('r' if mode == 'c' else mode)+'b')
FileNotFoundError: [Errno 2] No such file or directory: '/not/actually/a/root/file.root'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/lgray/coffea-dev/uproot5/src/uproot/source/file.py", line 38, in __init__
    self._file = open(self._file_path, "rb")
FileNotFoundError: [Errno 2] No such file or directory: '/not/actually/a/root/file.root'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/lgray/coffea-dev/dask-awkward/src/dask_awkward/lib/io/io.py", line 586, in __call__
    result = self.fn(*args, **kwargs)
  File "/Users/lgray/coffea-dev/uproot5/src/uproot/_dask.py", line 1086, in __call__
    ttree = uproot._util.regularize_object_path(
  File "/Users/lgray/coffea-dev/uproot5/src/uproot/_util.py", line 1153, in regularize_object_path
    file = ReadOnlyFile(
  File "/Users/lgray/coffea-dev/uproot5/src/uproot/reading.py", line 588, in __init__
    self._source = Source(file_path, **self._options)
  File "/Users/lgray/coffea-dev/uproot5/src/uproot/source/file.py", line 110, in __init__
    self._open()
  File "/Users/lgray/coffea-dev/uproot5/src/uproot/source/file.py", line 120, in _open
    self._fallback = uproot.source.file.MultithreadedFileSource(
  File "/Users/lgray/coffea-dev/uproot5/src/uproot/source/file.py", line 253, in __init__
    self._open()
  File "/Users/lgray/coffea-dev/uproot5/src/uproot/source/file.py", line 258, in _open
    [FileResource(self._file_path) for x in range(self._num_workers)]
  File "/Users/lgray/coffea-dev/uproot5/src/uproot/source/file.py", line 258, in <listcomp>
    [FileResource(self._file_path) for x in range(self._num_workers)]
  File "/Users/lgray/coffea-dev/uproot5/src/uproot/source/file.py", line 40, in __init__
    raise uproot._util._file_not_found(file_path) from err
FileNotFoundError: file not found

    '/not/actually/a/root/file.root'

Files may be specified as:
   * str/bytes: relative or absolute filesystem path or URL, without any colons
         other than Windows drive letter or URL schema.
         Examples: "rel/file.root", "C:\abs\file.root", "http://where/what.root"
   * str/bytes: same with an object-within-ROOT path, separated by a colon.
         Example: "rel/file.root:tdirectory/ttree"
   * pathlib.Path: always interpreted as a filesystem path or URL only (no
         object-within-ROOT path), regardless of whether there are any colons.
         Examples: Path("rel:/file.root"), Path("/abs/path:stuff.root")

Functions that accept many files (uproot.iterate, etc.) also allow:
   * glob syntax in str/bytes and pathlib.Path.
         Examples: Path("rel/*.root"), "/abs/*.root:tdirectory/ttree"
   * dict: keys are filesystem paths, values are objects-within-ROOT paths.
         Example: {"/data_v1/*.root": "ttree_v1", "/data_v2/*.root": "ttree_v2"}
   * already-open TTree objects.
   * iterables of the above.


During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "report_play.py", line 19, in <module>
    pt, creport = dask.compute(events.Muon.pt, report)
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/base.py", line 599, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/threaded.py", line 89, in get
    results = get_async(
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/local.py", line 511, in get_async
    raise_exception(exc, tb)
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/local.py", line 319, in reraise
    raise exc
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/local.py", line 224, in execute_task
    result = _execute_task(task, data)
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/optimization.py", line 990, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/core.py", line 149, in get
    result = _execute_task(task, cache)
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/Users/lgray/coffea-dev/dask-awkward/src/dask_awkward/lib/io/io.py", line 589, in __call__
    result = self.fn.mock_empty(self.backend)
TypeError: mock_empty() takes 1 positional argument but 2 were given

@lgray
Copy link
Contributor

lgray commented Dec 1, 2023

Using uproot alone:

import uproot
import dask

if __name__ == "__main__":
    events, report = uproot.dask(
        ["tests/samples/nano_dy.root:Events", "/not/actually/a/root/file.root:Events"],
        report=True,
        open_files=False,
    )

    pt, creport = dask.compute(events.Muon_pt, report)

yields the exact same error.

@lgray
Copy link
Contributor

lgray commented Dec 1, 2023

OK mock_empty just needs to accept backend to fix that.

After doing that I'm able to get:

>>> creport.to_list()
[{'args': [], 'kwargs': [], 'exception': '', 'message': ''}, {'args': ["('/not/actually/a/root/file.root', 'Events', 0, 1, False)"], 'kwargs': [], 'exception': 'FileNotFoundError', 'message': 'file not found\n\n    \'/not/actually/a/root/file.root\'\n\nFiles may be specified as:\n   * str/bytes: relative or absolute filesystem path or URL, without any colons\n         other than Windows drive letter or URL schema.\n         Examples: "rel/file.root", "C:\\abs\\file.root", "http://where/what.root"\n   * str/bytes: same with an object-within-ROOT path, separated by a colon.\n         Example: "rel/file.root:tdirectory/ttree"\n   * pathlib.Path: always interpreted as a filesystem path or URL only (no\n         object-within-ROOT path), regardless of whether there are any colons.\n         Examples: Path("rel:/file.root"), Path("/abs/path:stuff.root")\n\nFunctions that accept many files (uproot.iterate, etc.) also allow:\n   * glob syntax in str/bytes and pathlib.Path.\n         Examples: Path("rel/*.root"), "/abs/*.root:tdirectory/ttree"\n   * dict: keys are filesystem paths, values are objects-within-ROOT paths.\n         Example: {"/data_v1/*.root": "ttree_v1", "/data_v2/*.root": "ttree_v2"}\n   * already-open TTree objects.\n   * iterables of the above.\n'}]

tadaa!

@lgray
Copy link
Contributor

lgray commented Dec 1, 2023

So I can see the utility already of ReportSpec.

I think it makes sense that the default report for uproot doesn't fill any information for successful file opening/reading, and in fact we should fill with None rather than an empty report so it is clear to the user there was literally no problem in readiing.

However, for debugging system-level performance it would be useful to fill every report, regardless of success or failure, with step info (args I guess), bytes read, time taken to read. Adding the exception information when it happens.

So I think we should come out of the box with ReportSpec so that folks can dive into debugging/optimizing their clusters if they find performance issues. Possibly even providing a report spec that does what I describe above so it is at hand?

@douglasdavis @jpivarski

@douglasdavis
Copy link
Contributor Author

whoops on missing the backend argument! Fixed in the PR now.

On the topic of a report spec: I still need to think about how to make it possible to get a fully customize-able report record that isn't limited in the information it can use to build itself out. We'll likely need users to implement this stuff in their on __call__ implementation instead of us providing a more blackbox-ish wrapper. (which is exists as things stand -- in this PR and the dask-awkward PR the only information the report can possible have access to are the *args and **kwargs passed to the read function, and the exception that was raised). I've been tinkering with moving the code that accomplishes this into a more OOP approach but I'm still ironing it out.

src/uproot/_dask.py Outdated Show resolved Hide resolved
Co-authored-by: Lindsey Gray <lindsey.gray@gmail.com>
@lgray
Copy link
Contributor

lgray commented Dec 2, 2023

did a test using the full nanoevents machinery and it looks like we're good! report works as expected, no strange side effects.

@lgray
Copy link
Contributor

lgray commented Dec 3, 2023

@douglasdavis I think the direction you're going is fine in this case, the "user" I believe can be uproot since we have to implement our own thing anyway. We can then present a more black-boxy interface to folks using the library. I think that's OK especially since it's a not-typically user-facing part of dask-awkward.

@jpivarski jpivarski added next-release Required for the next release and removed next-release Required for the next release labels Dec 8, 2023
@jpivarski
Copy link
Member

Closed in favor of #1058.

@jpivarski jpivarski closed this Dec 8, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants