Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix using ThreadRunner with dataset factories #4093

Merged
merged 25 commits into main from fix/4007-thread-runner-with-dataset-factories
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
d38b731
Added lock for dataset load
ElenaKhaustova Aug 15, 2024
711f82c
Merge branch 'main' into fix/4007-thread-runner-with-dataset-factories
ElenaKhaustova Aug 15, 2024
5a7642f
Merge branch 'main' into fix/4007-thread-runner-with-dataset-factories
ElenaKhaustova Aug 19, 2024
6bc06a8
Replaced multiprocessing lock with multithreading
ElenaKhaustova Aug 19, 2024
0ca6689
Merge branch 'main' into fix/4007-thread-runner-with-dataset-factories
ElenaKhaustova Aug 19, 2024
66ded79
Added an alternative solution
ElenaKhaustova Aug 19, 2024
a44b078
Merge branch 'main' into fix/4007-thread-runner-with-dataset-factories
ElenaKhaustova Aug 19, 2024
f9df584
Merge branch 'main' into fix/4007-thread-runner-with-dataset-factories
ElenaKhaustova Aug 20, 2024
219df08
Merge branch 'main' into fix/4007-thread-runner-with-dataset-factories
ElenaKhaustova Aug 22, 2024
99f2da2
Updated solution 2
ElenaKhaustova Aug 22, 2024
ffe17d3
Made solution 1 main
ElenaKhaustova Aug 22, 2024
61bafca
Removed solution 2
ElenaKhaustova Aug 22, 2024
ac9621a
Updated release notes
ElenaKhaustova Aug 22, 2024
669e930
Merge branch 'main' into fix/4007-thread-runner-with-dataset-factories
ElenaKhaustova Aug 23, 2024
d4f6dc1
Merge branch 'main' into fix/4007-thread-runner-with-dataset-factories
ElenaKhaustova Aug 27, 2024
80404bf
Removed lock solution
ElenaKhaustova Aug 27, 2024
a022445
Returned solution with patterns resolving
ElenaKhaustova Aug 27, 2024
0ee3330
Added test for ThreadRunner
ElenaKhaustova Aug 27, 2024
0ba5eed
Merge branch 'fix/4007-thread-runner-with-dataset-factories-bu' into …
ElenaKhaustova Aug 27, 2024
2173f2a
Added _match_pattern patching
ElenaKhaustova Aug 27, 2024
e303959
Fixed tests to satisfy required coverage
ElenaKhaustova Aug 28, 2024
a772800
Merge branch 'main' into fix/4007-thread-runner-with-dataset-factories
ElenaKhaustova Aug 28, 2024
e3770d3
Merge branch 'main' into fix/4007-thread-runner-with-dataset-factories
ElenaKhaustova Aug 28, 2024
ee8377e
Reverted heading for the release notes
ElenaKhaustova Aug 29, 2024
4e0b290
Merge branch 'main' into fix/4007-thread-runner-with-dataset-factories
ElenaKhaustova Aug 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
## Major features and improvements
* Enhanced `OmegaConfigLoader` configuration validation to detect duplicate keys at all parameter levels, ensuring comprehensive nested key checking.
## Bug fixes and other changes
* Fixed bug where using dataset factories breaks with `ThreadRunner`.

## Breaking changes to the API

## Documentation changes
Expand Down
8 changes: 7 additions & 1 deletion kedro/framework/session/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
validate_settings,
)
from kedro.io.core import generate_timestamp
from kedro.runner import AbstractRunner, SequentialRunner
from kedro.runner import AbstractRunner, SequentialRunner, ThreadRunner
from kedro.utils import _find_kedro_project

if TYPE_CHECKING:
Expand Down Expand Up @@ -395,6 +395,12 @@ def run( # noqa: PLR0913
)

try:
if isinstance(runner, ThreadRunner):
for ds in filtered_pipeline.datasets():
if catalog._match_pattern(
catalog._dataset_patterns, ds
) or catalog._match_pattern(catalog._default_pattern, ds):
_ = catalog._get_dataset(ds)
run_result = runner.run(
filtered_pipeline, catalog, hook_manager, session_id
)
Expand Down
78 changes: 78 additions & 0 deletions tests/framework/session/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,16 @@ def mock_runner(mocker):
return mock_runner


@pytest.fixture
def mock_thread_runner(mocker):
    """Patch ``ThreadRunner`` with an autospec'd mock and return the mock.

    Args:
        mocker: The pytest-mock fixture used to install the patch.

    Returns:
        The mock that replaces ``kedro.runner.thread_runner.ThreadRunner``
        for the duration of the test.
    """
    mock_runner = mocker.patch(
        "kedro.runner.thread_runner.ThreadRunner",
        autospec=True,
    )
    # Tests read ``__name__`` back from this mock when building the expected
    # ``runner`` entry of the run params, so give it a clean, explicit name
    # (the previous value carried a stray trailing backtick).
    mock_runner.__name__ = "MockThreadRunner"
    return mock_runner


@pytest.fixture
def mock_context_class(mocker):
mock_cls = create_attrs_autospec(KedroContext)
Expand Down Expand Up @@ -693,6 +703,74 @@ def test_run(
catalog=mock_catalog,
)

@pytest.mark.usefixtures("mock_settings_context_class")
@pytest.mark.parametrize("fake_pipeline_name", [None, _FAKE_PIPELINE_NAME])
@pytest.mark.parametrize("match_pattern", [True, False])
def test_run_thread_runner(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this test also verify that dataset factories are resolved? Or is it not possible to test that specifically because the behaviour is not deterministic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not test that, because we mock all the calls to the runner, pipelines, catalog, etc., as in the rest of the session tests. It only tests that, when using ThreadRunner, we enter the branches that resolve the patterns.

The resolution logic is tested with the catalog tests, though it can be tested here. But for that, we'll have to properly initialise runner, pipelines, and catalog - which might be too complex given that the resolution logic is already tested separately.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Thanks for clarifying! 🙂

self,
fake_project,
fake_session_id,
fake_pipeline_name,
mock_context_class,
mock_thread_runner,
mocker,
match_pattern,
):
"""Test running the project via the session"""

mock_hook = mocker.patch(
"kedro.framework.session.session._create_hook_manager"
).return_value.hook

ds_mock = mocker.Mock(**{"datasets.return_value": ["ds_1", "ds_2"]})
filter_mock = mocker.Mock(**{"filter.return_value": ds_mock})
pipelines_ret = {
_FAKE_PIPELINE_NAME: filter_mock,
"__default__": filter_mock,
}
mocker.patch("kedro.framework.session.session.pipelines", pipelines_ret)
mocker.patch(
"kedro.io.data_catalog.DataCatalog._match_pattern",
return_value=match_pattern,
)

with KedroSession.create(fake_project) as session:
session.run(runner=mock_thread_runner, pipeline_name=fake_pipeline_name)

mock_context = mock_context_class.return_value
record_data = {
"session_id": fake_session_id,
"project_path": fake_project.as_posix(),
"env": mock_context.env,
"kedro_version": kedro_version,
"tags": None,
"from_nodes": None,
"to_nodes": None,
"node_names": None,
"from_inputs": None,
"to_outputs": None,
"load_versions": None,
"extra_params": {},
"pipeline_name": fake_pipeline_name,
"namespace": None,
"runner": mock_thread_runner.__name__,
}
mock_catalog = mock_context._get_catalog.return_value
mock_pipeline = filter_mock.filter()

mock_hook.before_pipeline_run.assert_called_once_with(
run_params=record_data, pipeline=mock_pipeline, catalog=mock_catalog
)
mock_thread_runner.run.assert_called_once_with(
mock_pipeline, mock_catalog, session._hook_manager, fake_session_id
)
mock_hook.after_pipeline_run.assert_called_once_with(
run_params=record_data,
run_result=mock_thread_runner.run.return_value,
pipeline=mock_pipeline,
catalog=mock_catalog,
)

@pytest.mark.usefixtures("mock_settings_context_class")
@pytest.mark.parametrize("fake_pipeline_name", [None, _FAKE_PIPELINE_NAME])
def test_run_multiple_times(
Expand Down