Skip to content

Commit

Permalink
implement missing pipeline ml slicing functionalities
Browse files Browse the repository at this point in the history
  • Loading branch information
Calychas committed Oct 17, 2024
1 parent 7aadd6c commit 6c81974
Showing 1 changed file with 24 additions and 28 deletions.
52 changes: 24 additions & 28 deletions kedro_mlflow/pipeline/pipeline_ml.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,10 @@
from kedro.pipeline import Pipeline
from kedro.pipeline.node import Node

MSG_NOT_IMPLEMENTED = (
"This method is not implemented because it does "
"not make sense for 'PipelineML'. "
"Manipulate directly the training pipeline and "
"recreate the 'PipelineML' with 'pipeline_ml_factory' factory."
)

MSG_WARNING_KEDRO_VIZ = (
MSG_WARNING_PIPELINEML_DEMOTED = (
"BEWARE - This 'Pipeline' is no longer a 'PipelineML' object. "
"This method is only implemented for compatibility with kedro-viz "
"but should never be used directly.\nSee "
"This method is only implemented for compatibility with kedro-viz and pipeline resume hints on failure."
"It should never be used directly.\nSee "
"https://github.com/Galileo-Galilei/kedro-mlflow/issues/569 "
" for more context. "
)
Expand Down Expand Up @@ -173,16 +166,18 @@ def _turn_pipeline_to_ml(self, pipeline: Pipeline):
)

def only_nodes(self, *node_names: str) -> "Pipeline": # pragma: no cover
raise NotImplementedError(MSG_NOT_IMPLEMENTED)
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
return self.training.only_nodes(*node_names)

def only_nodes_with_namespace(
self, node_namespace: str
) -> "Pipeline": # pragma: no cover
self._logger.warning(MSG_WARNING_KEDRO_VIZ)
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
return self.training.only_nodes_with_namespace(node_namespace)

def only_nodes_with_inputs(self, *inputs: str) -> "PipelineML": # pragma: no cover
raise NotImplementedError(MSG_NOT_IMPLEMENTED)
def only_nodes_with_inputs(self, *inputs: str) -> "Pipeline": # pragma: no cover
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
return self.training.only_nodes_with_inputs(*inputs)

def from_inputs(self, *inputs: str) -> "PipelineML": # pragma: no cover
# exceptionnally, we don't call super() because it raises
Expand All @@ -194,10 +189,9 @@ def from_inputs(self, *inputs: str) -> "PipelineML": # pragma: no cover
pipeline = self.training.from_inputs(*inputs)
return self._turn_pipeline_to_ml(pipeline)

def only_nodes_with_outputs(
self, *outputs: str
) -> "PipelineML": # pragma: no cover
raise NotImplementedError(MSG_NOT_IMPLEMENTED)
def only_nodes_with_outputs(self, *outputs: str) -> "Pipeline": # pragma: no cover
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
return self.training.only_nodes_with_outputs(*outputs)

def to_outputs(self, *outputs: str) -> "PipelineML": # pragma: no cover
# see from_inputs for an explanation of why we don't call super()
Expand Down Expand Up @@ -225,13 +219,13 @@ def tag(self, tags: Union[str, Iterable[str]]) -> "PipelineML":

def filter(
self,
tags: Iterable[str] = None,
from_nodes: Iterable[str] = None,
to_nodes: Iterable[str] = None,
node_names: Iterable[str] = None,
from_inputs: Iterable[str] = None,
to_outputs: Iterable[str] = None,
node_namespace: str = None,
tags: Optional[Iterable[str]] = None,
from_nodes: Optional[Iterable[str]] = None,
to_nodes: Optional[Iterable[str]] = None,
node_names: Optional[Iterable[str]] = None,
from_inputs: Optional[Iterable[str]] = None,
to_outputs: Optional[Iterable[str]] = None,
node_namespace: Optional[str] = None,
) -> "Pipeline":
# see from_inputs for an explanation of why we don't call super()
pipeline = self.training.filter(
Expand All @@ -246,10 +240,11 @@ def filter(
return self._turn_pipeline_to_ml(pipeline)

def __add__(self, other): # pragma: no cover
raise NotImplementedError(MSG_NOT_IMPLEMENTED)
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
return self.training + other

def __sub__(self, other): # pragma: no cover
self._logger.warning(MSG_WARNING_KEDRO_VIZ)
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
return self.training - other

def __and__(self, other): # pragma: no cover
Expand All @@ -258,7 +253,8 @@ def __and__(self, other): # pragma: no cover
return self._turn_pipeline_to_ml(new_pipeline)

def __or__(self, other): # pragma: no cover
raise NotImplementedError(MSG_NOT_IMPLEMENTED)
self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED)
return self.training | other


class KedroMlflowPipelineMLError(Exception):
Expand Down

0 comments on commit 6c81974

Please sign in to comment.