-
Notifications
You must be signed in to change notification settings - Fork 901
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor Runners, introduce `Task` class
#4206
base: main
Are you sure you want to change the base?
Changes from 6 commits
5eee2d0
4a35750
f094fe7
666cad3
ffc1113
7d0f3b6
a40c6c6
a40f9fb
5eb7cab
0fa8cea
54389ee
94e4894
69699e5
299d4d2
7346f9a
7a0e4d9
30a0d9a
14b10d8
bf6fa5c
9cf2ed0
e94d510
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,7 +9,7 @@ | |
from itertools import chain | ||
from typing import TYPE_CHECKING, Any | ||
|
||
from kedro.runner.runner import AbstractRunner, run_node | ||
from kedro.runner.runner import AbstractRunner | ||
|
||
if TYPE_CHECKING: | ||
from pluggy import PluginManager | ||
|
@@ -75,21 +75,22 @@ def _run( | |
|
||
for exec_index, node in enumerate(nodes): | ||
try: | ||
run_node(node, catalog, hook_manager, self._is_async, session_id) | ||
from kedro.runner.task import Task | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is needed because I refactored |
||
|
||
Task( | ||
node=node, | ||
catalog=catalog, | ||
hook_manager=hook_manager, | ||
is_async=self._is_async, | ||
session_id=session_id, | ||
).execute() | ||
done_nodes.add(node) | ||
except Exception: | ||
self._suggest_resume_scenario(pipeline, done_nodes, catalog) | ||
raise | ||
|
||
# decrement load counts and release any data sets we've finished with | ||
for dataset in node.inputs: | ||
load_counts[dataset] -= 1 | ||
if load_counts[dataset] < 1 and dataset not in pipeline.inputs(): | ||
catalog.release(dataset) | ||
for dataset in node.outputs: | ||
if load_counts[dataset] < 1 and dataset not in pipeline.outputs(): | ||
catalog.release(dataset) | ||
self._release_datasets(node, catalog, load_counts, pipeline) | ||
|
||
self._logger.info( | ||
"Completed %d out of %d tasks", exec_index + 1, len(nodes) | ||
"Completed %d out of %d tasks", len(done_nodes), len(nodes) | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ElenaKhaustova can you explain the suggestion of removing the check that `load_counts[dataset]` is smaller than 1?