Skip to content

Commit

Permalink
Merged with main
Browse files Browse the repository at this point in the history
Signed-off-by: Elena Khaustova <ymax70rus@gmail.com>
  • Loading branch information
ElenaKhaustova committed Sep 27, 2024
1 parent 0e03aa5 commit 643219d
Showing 1 changed file with 51 additions and 17 deletions.
68 changes: 51 additions & 17 deletions kedro/io/kedro_data_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import difflib
import logging
import re
from collections.abc import Iterator
from typing import Any, List

from kedro.io.catalog_config_resolver import CatalogConfigResolver, Patterns
from kedro.io.core import (
Expand Down Expand Up @@ -72,10 +72,12 @@ def __init__(

@property
def datasets(self) -> dict[str, Any]:
    """Return a shallow copy of the registered datasets mapping.

    A copy is returned so callers cannot mutate the catalog's internal
    registry through this accessor.
    """
    # TODO: remove when removing old catalog
    snapshot = copy.copy(self._datasets)
    return snapshot

@datasets.setter
def datasets(self, value: Any) -> None:
    """Reject direct assignment; datasets must be registered via ``add()``."""
    # TODO: remove when removing old catalog
    raise AttributeError(
        "Operation not allowed. Please use KedroDataCatalog.add() instead."
    )
Expand All @@ -100,6 +102,34 @@ def __eq__(self, other) -> bool: # type: ignore[no-untyped-def]
other.config_resolver.list_patterns(),
)

def keys(self, regex_search: str | None = None) -> List[str]:  # noqa: UP006
    """Return the names of registered datasets, optionally filtered.

    Args:
        regex_search: Optional regular expression; only matching names
            are returned. ``None`` returns all names.
    """
    matching = self._filter_keys(regex_search)
    return matching

def values(self, regex_search: str | None = None) -> List[AbstractDataset]:  # noqa: UP006
    """Return the dataset objects whose names match ``regex_search`` (all if ``None``)."""
    selected = self._filter_keys(regex_search)
    return [self._datasets[name] for name in selected]

def items(
    self, regex_search: str | None = None
) -> List[tuple[str, AbstractDataset]]:  # noqa: UP006
    """Return ``(name, dataset)`` pairs, optionally filtered by ``regex_search``."""
    pairs = []
    for name in self._filter_keys(regex_search):
        pairs.append((name, self._datasets[name]))
    return pairs

def __iter__(self) -> Iterator[str]:
    """Iterate over the names of the registered datasets.

    The return annotation was ``str``, which is wrong for a generator
    function: iterating the catalog yields one name per step, so the
    correct annotation is ``Iterator[str]``.
    """
    yield from self._datasets.keys()

def __getitem__(self, ds_name: str) -> AbstractDataset:
    """Return the dataset registered under ``ds_name`` by delegating to ``get_dataset``."""
    dataset = self.get_dataset(ds_name)
    return dataset

def __setitem__(self, key: str, value: Any) -> None:
    """Register ``value`` under ``key``.

    An ``AbstractDataset`` is stored as-is; any other value is wrapped
    in a ``MemoryDataset``. Overwriting an existing entry logs a warning.
    """
    if key in self._datasets:
        self._logger.warning("Replacing dataset '%s'", key)
    if isinstance(value, AbstractDataset):
        dataset = value
    else:
        dataset = MemoryDataset(data=value)  # type: ignore[abstract]
    self._datasets[key] = dataset

def _ipython_key_completions_(self) -> list[str]:
    """Return dataset names so IPython can tab-complete ``catalog[<TAB>]``."""
    return [name for name in self._datasets]

@property
def _logger(self) -> logging.Logger:
    """Logger named after this module, shared by all catalog instances."""
    module_logger = logging.getLogger(__name__)
    return module_logger
Expand Down Expand Up @@ -219,30 +249,31 @@ def _get_dataset(
def add(
    self, ds_name: str, dataset: AbstractDataset, replace: bool = False
) -> None:
    # TODO: remove when removing old catalog
    """Adds a new ``AbstractDataset`` object to the ``KedroDataCatalog``.

    Args:
        ds_name: Name under which to register the dataset.
        dataset: The dataset instance to register.
        replace: If True, overwrite an existing entry instead of raising.

    Raises:
        DatasetAlreadyExistsError: If ``ds_name`` is already registered
            and ``replace`` is False.
    """
    # NOTE(review): this span contained both the pre- and post-commit diff
    # bodies interleaved; this is the reconstructed post-commit version.
    if ds_name in self._datasets and not replace:
        raise DatasetAlreadyExistsError(
            f"Dataset '{ds_name}' has already been registered"
        )
    # Delegate to __setitem__ so replacement logging and value wrapping
    # live in a single place.
    self.__setitem__(ds_name, dataset)

def list(self, regex_search: str | None = None) -> List[str]:  # noqa: UP006
    """
    List of all dataset names registered in the catalog.
    This can be filtered by providing an optional regular expression
    which will only return matching keys.
    """
    # TODO: remove when removing old catalog
    # NOTE(review): this span contained both diff versions (old inline
    # empty-string check and new delegation); this is the post-commit
    # version — the empty-string warning now lives in _filter_keys.
    return self.keys(regex_search)

def _filter_keys(self, regex_search: str | None) -> List[str]: # noqa: UP006
if regex_search is None:
return list(self._datasets.keys())

if not regex_search.strip():
self._logger.warning("The empty string will not match any datasets")
return []

try:
pattern = re.compile(regex_search, flags=re.IGNORECASE)
except re.error as exc:
Expand Down Expand Up @@ -305,10 +336,13 @@ def confirm(self, name: str) -> None:
raise DatasetError(f"Dataset '{name}' does not have 'confirm' method")

def add_data(self, data: dict[str, Any], replace: bool = False) -> None:
    """Register raw data values, each wrapped as a dataset via ``__setitem__``.

    Args:
        data: Mapping of dataset name to the raw value to register.
        replace: If True, overwrite existing entries instead of raising.

    Raises:
        DatasetAlreadyExistsError: If a name is already registered and
            ``replace`` is False.
    """
    # This method was simplified to add memory datasets only, since
    # adding AbstractDataset can be done via add() method
    # TODO: remove when removing old catalog
    # NOTE(review): the diff span mixed the old body (self.add(...,
    # MemoryDataset(...))) with the new one; this is the post-commit
    # version, which delegates wrapping to __setitem__.
    for ds_name, ds_data in data.items():
        if ds_name in self._datasets and not replace:
            raise DatasetAlreadyExistsError(
                f"Dataset '{ds_name}' has already been registered"
            )
        self.__setitem__(ds_name, ds_data)

def add_feed_dict(self, feed_dict: dict[str, Any], replace: bool = False) -> None:
# TODO: remove when removing old catalog
Expand Down

0 comments on commit 643219d

Please sign in to comment.