Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement missing PipelineML slicing functionalities #601

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 24 additions & 28 deletions kedro_mlflow/pipeline/pipeline_ml.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,10 @@
from kedro.pipeline import Pipeline
from kedro.pipeline.node import Node

MSG_NOT_IMPLEMENTED = (
"This method is not implemented because it does "
"not make sense for 'PipelineML'. "
"Manipulate directly the training pipeline and "
"recreate the 'PipelineML' with 'pipeline_ml_factory' factory."
)

MSG_WARNING_KEDRO_VIZ = (
MSG_WARNING_PIPELINEML_DEMOTED = (
"BEWARE - This 'Pipeline' is no longer a 'PipelineML' object. "
"This method is only implemented for compatibility with kedro-viz "
"but should never be used directly.\nSee "
"This method is only implemented for compatibility with kedro-viz and pipeline resume hints on failure."
"It should never be used directly.\nSee "
"https://github.com/Galileo-Galilei/kedro-mlflow/issues/569 "
" for more context. "
)
Expand Down Expand Up @@ -173,16 +166,18 @@ def _turn_pipeline_to_ml(self, pipeline: Pipeline):
)

def only_nodes(self, *node_names: str) -> "Pipeline": # pragma: no cover
raise NotImplementedError(MSG_NOT_IMPLEMENTED)
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
return self.training.only_nodes(*node_names)

def only_nodes_with_namespace(
self, node_namespace: str
) -> "Pipeline": # pragma: no cover
self._logger.warning(MSG_WARNING_KEDRO_VIZ)
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
return self.training.only_nodes_with_namespace(node_namespace)

def only_nodes_with_inputs(self, *inputs: str) -> "PipelineML": # pragma: no cover
raise NotImplementedError(MSG_NOT_IMPLEMENTED)
def only_nodes_with_inputs(self, *inputs: str) -> "Pipeline": # pragma: no cover
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
return self.training.only_nodes_with_inputs(*inputs)

def from_inputs(self, *inputs: str) -> "PipelineML": # pragma: no cover
# exceptionnally, we don't call super() because it raises
Expand All @@ -194,10 +189,9 @@ def from_inputs(self, *inputs: str) -> "PipelineML": # pragma: no cover
pipeline = self.training.from_inputs(*inputs)
return self._turn_pipeline_to_ml(pipeline)

def only_nodes_with_outputs(
self, *outputs: str
) -> "PipelineML": # pragma: no cover
raise NotImplementedError(MSG_NOT_IMPLEMENTED)
def only_nodes_with_outputs(self, *outputs: str) -> "Pipeline": # pragma: no cover
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
return self.training.only_nodes_with_outputs(*outputs)

def to_outputs(self, *outputs: str) -> "PipelineML": # pragma: no cover
# see from_inputs for an explanation of why we don't call super()
Expand Down Expand Up @@ -225,13 +219,13 @@ def tag(self, tags: Union[str, Iterable[str]]) -> "PipelineML":

def filter(
self,
tags: Iterable[str] = None,
from_nodes: Iterable[str] = None,
to_nodes: Iterable[str] = None,
node_names: Iterable[str] = None,
from_inputs: Iterable[str] = None,
to_outputs: Iterable[str] = None,
node_namespace: str = None,
tags: Optional[Iterable[str]] = None,
from_nodes: Optional[Iterable[str]] = None,
to_nodes: Optional[Iterable[str]] = None,
node_names: Optional[Iterable[str]] = None,
from_inputs: Optional[Iterable[str]] = None,
to_outputs: Optional[Iterable[str]] = None,
node_namespace: Optional[str] = None,
) -> "Pipeline":
# see from_inputs for an explanation of why we don't call super()
pipeline = self.training.filter(
Expand All @@ -246,10 +240,11 @@ def filter(
return self._turn_pipeline_to_ml(pipeline)

def __add__(self, other): # pragma: no cover
raise NotImplementedError(MSG_NOT_IMPLEMENTED)
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
return self.training + other

def __sub__(self, other): # pragma: no cover
self._logger.warning(MSG_WARNING_KEDRO_VIZ)
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
return self.training - other

def __and__(self, other): # pragma: no cover
Expand All @@ -258,7 +253,8 @@ def __and__(self, other): # pragma: no cover
return self._turn_pipeline_to_ml(new_pipeline)

def __or__(self, other): # pragma: no cover
raise NotImplementedError(MSG_NOT_IMPLEMENTED)
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
return self.training | other


class KedroMlflowPipelineMLError(Exception):
Expand Down
Loading