diff --git a/src/npe2/io_utils.py b/src/npe2/io_utils.py index a6462528..2ba68594 100644 --- a/src/npe2/io_utils.py +++ b/src/npe2/io_utils.py @@ -154,9 +154,16 @@ def _read( if _pm is None: _pm = PluginManager.instance() - for rdr in _pm.iter_compatible_readers(paths): - if plugin_name and rdr.plugin_name != plugin_name: - continue + # get readers compatible with paths and chosen plugin - raise errors if + # choices are invalid or there's nothing to try + chosen_compatible_readers = _get_compatible_readers_by_choice( + plugin_name, paths, _pm + ) + assert ( + chosen_compatible_readers + ), "No readers to try. Expected an exception before this point." + + for rdr in chosen_compatible_readers: read_func = rdr.exec( kwargs={"path": paths, "stack": stack, "_registry": _pm.commands} ) @@ -167,12 +174,94 @@ def _read( if plugin_name: raise ValueError( - f"Plugin {plugin_name!r} was selected to open " + f"Reader {plugin_name!r} was selected to open " + f"{paths!r}, but returned no data." ) raise ValueError(f"No readers returned data for {paths!r}") +def _get_compatible_readers_by_choice( + plugin_name: Union[str, None], paths: Union[str, Sequence[str]], pm: PluginManager +): + """Returns compatible readers filtered by validated plugin choice. + + Checks that plugin_name is an existing plugin (and command if + a specific contribution was passed), and that it is compatible + with paths. Raises ValueError if given plugin doesn't exist, + it is not compatible with the given paths, or no compatible + readers can be found for paths (if no plugin was chosen). + + Parameters + ---------- + plugin_name: Union[str, None] + name of chosen plugin, or None + paths: Union[str, Sequence[str]] + paths to read + pm: PluginManager + plugin manager instance to check for readers + + Raises + ------ + ValueError + If the given reader doesn't exist + ValueError + If there are no compatible readers + ValueError + If the given reader is not compatible + + Returns + ------- + compat_readers : List[ReaderContribution] + Compatible readers for plugin choice + """ + passed_contrib = plugin_name and ("." in plugin_name) + compat_readers = list(pm.iter_compatible_readers(paths)) + compat_reader_names = sorted( + {(rdr.command if passed_contrib else rdr.plugin_name) for rdr in compat_readers} + ) + helper_error_message = ( + f"Available readers for {paths!r} are: {compat_reader_names!r}." + if compat_reader_names + else f"No compatible readers are available for {paths!r}." + ) + + # check whether plugin even exists. + if plugin_name: + try: + # note that get_manifest works with a full command e.g. my-plugin.my-reader + pm.get_manifest(plugin_name) + if passed_contrib: + pm.get_command(plugin_name) + except KeyError: + raise ValueError( + f"Given reader {plugin_name!r} does not exist. {helper_error_message}" + ) from None + + # no choice was made and there's no readers to try + if not plugin_name and not len(compat_reader_names): + raise ValueError(helper_error_message) + + # user didn't make a choice and we have some readers to try, return them + if not plugin_name: + return compat_readers + + # user made a choice and it exists, but it may not be a compatible reader + plugin, _, _ = plugin_name.partition(".") + chosen_compatible_readers = [ + rdr + for rdr in compat_readers + if rdr.plugin_name == plugin + and (not passed_contrib or rdr.command == plugin_name) + ] + # the user's choice is not compatible with the paths. let them know what is + if not chosen_compatible_readers: + raise ValueError( + f"Given reader {plugin_name!r} is not a compatible reader for {paths!r}. " + + helper_error_message + ) + return chosen_compatible_readers + + @overload def _write( path: str, diff --git a/tests/test__io_utils.py b/tests/test__io_utils.py index bde64d22..ee64df4d 100644 --- a/tests/test__io_utils.py +++ b/tests/test__io_utils.py @@ -1,5 +1,6 @@ # extra underscore in name to run this first from pathlib import Path +from unittest.mock import patch import pytest @@ -23,14 +24,30 @@ def test_read(uses_sample_plugin): def test_read_with_unknown_plugin(uses_sample_plugin): # no such plugin name.... skips over the sample plugin & error is specific - with pytest.raises(ValueError, match="Plugin 'nope' was selected"): - read(["some.fzzy"], plugin_name="nope", stack=False) + paths = ["some.fzzy"] + chosen_reader = "not-a-plugin" + with pytest.raises( + ValueError, match=f"Given reader {chosen_reader!r} does not exist." + ) as e: + read(paths, plugin_name=chosen_reader, stack=False) + assert f"Available readers for {paths!r} are: {[SAMPLE_PLUGIN_NAME]!r}" in str(e) + + +def test_read_with_unknown_plugin_no_readers(uses_sample_plugin): + paths = ["some.nope"] + chosen_reader = "not-a-plugin" + with pytest.raises( + ValueError, match=f"Given reader {chosen_reader!r} does not exist." + ) as e: + read(paths, plugin_name=chosen_reader, stack=False) + assert "No compatible readers are available" in str(e) def test_read_with_no_plugin(): # no plugin passed and none registered - with pytest.raises(ValueError, match="No readers returned"): - read(["some.nope"], stack=False) + paths = ["some.nope"] + with pytest.raises(ValueError, match="No compatible readers are available"): + read(paths, stack=False) def test_read_uses_correct_passed_plugin(tmp_path): @@ -40,6 +57,9 @@ def test_read_uses_correct_passed_plugin(tmp_path): long_name_plugin = DynamicPlugin(long_name, plugin_manager=pm) short_name_plugin = DynamicPlugin(short_name, plugin_manager=pm) + long_name_plugin.register() + short_name_plugin.register() + path = "something.fzzy" mock_file = tmp_path / path mock_file.touch() @@ -62,6 +82,82 @@ def read(paths): io_utils._read(["some.fzzy"], plugin_name=short_name, stack=False, _pm=pm) +def test_read_fails(): + pm = PluginManager() + plugin_name = "always-fails" + plugin = DynamicPlugin(plugin_name, plugin_manager=pm) + plugin.register() + + @plugin.contribute.reader(filename_patterns=["*.fzzy"]) + def get_read(path): + return None + + with pytest.raises(ValueError, match=f"Reader {plugin_name!r} was selected"): + io_utils._read(["some.fzzy"], plugin_name=plugin_name, stack=False, _pm=pm) + + with pytest.raises(ValueError, match="No readers returned data"): + io_utils._read(["some.fzzy"], stack=False, _pm=pm) + + +def test_read_with_incompatible_reader(uses_sample_plugin): + paths = ["some.notfzzy"] + chosen_reader = f"{SAMPLE_PLUGIN_NAME}" + with pytest.raises( + ValueError, match=f"Given reader {chosen_reader!r} is not a compatible reader" + ): + read(paths, stack=False, plugin_name=chosen_reader) + + +def test_read_with_no_compatible_reader(): + paths = ["some.notfzzy"] + with pytest.raises(ValueError, match="No compatible readers are available"): + read(paths, stack=False) + + +def test_read_with_reader_contribution_plugin(uses_sample_plugin): + paths = ["some.fzzy"] + chosen_reader = f"{SAMPLE_PLUGIN_NAME}.some_reader" + assert read(paths, stack=False, plugin_name=chosen_reader) == [(None,)] + + # if the wrong contribution is passed we get useful error message + chosen_reader = f"{SAMPLE_PLUGIN_NAME}.not_a_reader" + with pytest.raises( + ValueError, + match=f"Given reader {chosen_reader!r} does not exist.", + ) as e: + read(paths, stack=False, plugin_name=chosen_reader) + assert "Available readers for" in str(e) + + +def test_read_assertion_with_no_compatible_readers(uses_sample_plugin): + paths = ["some.noreader"] + with patch("npe2.io_utils._get_compatible_readers_by_choice", return_value=[]): + with pytest.raises(AssertionError, match="No readers to try."): + read(paths, stack=False) + + +def test_available_readers_show_commands(uses_sample_plugin): + paths = ["some.fzzy"] + chosen_reader = "not-a-plugin.not-a-reader" + with pytest.raises( + ValueError, + match=f"Given reader {chosen_reader!r} does not exist.", + ) as e: + read(paths, stack=False, plugin_name=chosen_reader) + assert "Available readers " in str(e) + assert f"{SAMPLE_PLUGIN_NAME}.some_reader" in str(e) + + chosen_reader = "not-a-plugin" + with pytest.raises( + ValueError, + match=f"Given reader {chosen_reader!r} does not exist.", + ) as e: + read(paths, stack=False, plugin_name=chosen_reader) + assert "Available readers " in str(e) + assert f"{SAMPLE_PLUGIN_NAME}.some_reader" not in str(e) + assert f"{SAMPLE_PLUGIN_NAME}" in str(e) + + def test_read_return_reader(uses_sample_plugin): data, reader = read_get_reader("some.fzzy") assert data == [(None,)]