From 6c819740aa520b2420ef79aa620a2d965262d731 Mon Sep 17 00:00:00 2001 From: Calychas Date: Thu, 17 Oct 2024 12:02:33 +0200 Subject: [PATCH] implement missing pipeline ml slicing functionalities --- kedro_mlflow/pipeline/pipeline_ml.py | 52 +++++++++++++--------------- 1 file changed, 24 insertions(+), 28 deletions(-) diff --git a/kedro_mlflow/pipeline/pipeline_ml.py b/kedro_mlflow/pipeline/pipeline_ml.py index bdd6f807..52831903 100644 --- a/kedro_mlflow/pipeline/pipeline_ml.py +++ b/kedro_mlflow/pipeline/pipeline_ml.py @@ -4,17 +4,10 @@ from kedro.pipeline import Pipeline from kedro.pipeline.node import Node -MSG_NOT_IMPLEMENTED = ( - "This method is not implemented because it does " - "not make sense for 'PipelineML'. " - "Manipulate directly the training pipeline and " - "recreate the 'PipelineML' with 'pipeline_ml_factory' factory." -) - -MSG_WARNING_KEDRO_VIZ = ( +MSG_WARNING_PIPELINEML_DEMOTED = ( "BEWARE - This 'Pipeline' is no longer a 'PipelineML' object. " - "This method is only implemented for compatibility with kedro-viz " - "but should never be used directly.\nSee " + "This method is only implemented for compatibility with kedro-viz and pipeline resume hints on failure." + "It should never be used directly.\nSee " "https://github.com/Galileo-Galilei/kedro-mlflow/issues/569 " " for more context. " ) @@ -173,16 +166,18 @@ def _turn_pipeline_to_ml(self, pipeline: Pipeline): ) def only_nodes(self, *node_names: str) -> "Pipeline": # pragma: no cover - raise NotImplementedError(MSG_NOT_IMPLEMENTED) + self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED) + return self.training.only_nodes(*node_names) def only_nodes_with_namespace( self, node_namespace: str ) -> "Pipeline": # pragma: no cover - self._logger.warning(MSG_WARNING_KEDRO_VIZ) + self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED) return self.training.only_nodes_with_namespace(node_namespace) - def only_nodes_with_inputs(self, *inputs: str) -> "PipelineML": # pragma: no cover - raise NotImplementedError(MSG_NOT_IMPLEMENTED) + def only_nodes_with_inputs(self, *inputs: str) -> "Pipeline": # pragma: no cover + self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED) + return self.training.only_nodes_with_inputs(*inputs) def from_inputs(self, *inputs: str) -> "PipelineML": # pragma: no cover # exceptionnally, we don't call super() because it raises @@ -194,10 +189,9 @@ def from_inputs(self, *inputs: str) -> "PipelineML": # pragma: no cover pipeline = self.training.from_inputs(*inputs) return self._turn_pipeline_to_ml(pipeline) - def only_nodes_with_outputs( - self, *outputs: str - ) -> "PipelineML": # pragma: no cover - raise NotImplementedError(MSG_NOT_IMPLEMENTED) + def only_nodes_with_outputs(self, *outputs: str) -> "Pipeline": # pragma: no cover + self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED) + return self.training.only_nodes_with_outputs(*outputs) def to_outputs(self, *outputs: str) -> "PipelineML": # pragma: no cover # see from_inputs for an explanation of why we don't call super() @@ -225,13 +219,13 @@ def tag(self, tags: Union[str, Iterable[str]]) -> "PipelineML": def filter( self, - tags: Iterable[str] = None, - from_nodes: Iterable[str] = None, - to_nodes: Iterable[str] = None, - node_names: Iterable[str] = None, - from_inputs: Iterable[str] = None, - to_outputs: Iterable[str] = None, - node_namespace: str = None, + tags: Optional[Iterable[str]] = None, + from_nodes: Optional[Iterable[str]] = None, + to_nodes: Optional[Iterable[str]] = None, + node_names: Optional[Iterable[str]] = None, + from_inputs: Optional[Iterable[str]] = None, + to_outputs: Optional[Iterable[str]] = None, + node_namespace: Optional[str] = None, ) -> "Pipeline": # see from_inputs for an explanation of why we don't call super() pipeline = self.training.filter( @@ -246,10 +240,11 @@ def filter( return self._turn_pipeline_to_ml(pipeline) def __add__(self, other): # pragma: no cover - raise NotImplementedError(MSG_NOT_IMPLEMENTED) + self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED) + return self.training + other def __sub__(self, other): # pragma: no cover - self._logger.warning(MSG_WARNING_KEDRO_VIZ) + self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED) return self.training - other def __and__(self, other): # pragma: no cover @@ -258,7 +253,8 @@ def __and__(self, other): # pragma: no cover return self._turn_pipeline_to_ml(new_pipeline) def __or__(self, other): # pragma: no cover - raise NotImplementedError(MSG_NOT_IMPLEMENTED) + self._logger.warning(MSG_WARNING_PIPELINEML_DEMOTED) + return self.training | other class KedroMlflowPipelineMLError(Exception):