Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: custom parsing, serialization and matching in INI file nodes #2310

Draft
wants to merge 2 commits into
base: fix/lower-case-issues
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion antarest/study/business/areas/renewable_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ def _make_create_cluster_cmd(
) -> CreateRenewablesCluster:
command = CreateRenewablesCluster(
area_id=area_id,
cluster_name=cluster.id,
cluster_name=cluster.name,
parameters=cluster.model_dump(mode="json", by_alias=True, exclude={"id"}),
command_context=self.storage_service.variant_study_service.command_factory.command_context,
study_version=study_version,
Expand Down
2 changes: 1 addition & 1 deletion antarest/study/business/areas/st_storage_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ def get_storage(
try:
config = file_study.tree.get(path.split("/"), depth=1)
except KeyError:
raise STStorageNotFound(path, storage_id) from None
raise STStorageNotFound(path, storage_id)
return create_storage_output(StudyVersion.parse(study.version), storage_id, config)

def update_storage(
Expand Down
2 changes: 1 addition & 1 deletion antarest/study/business/areas/thermal_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ def _make_create_cluster_cmd(
# between the cluster name and the cluster ID (which is a section name).
args = {
"area_id": area_id,
"cluster_name": cluster.id,
"cluster_name": cluster.name,
"parameters": cluster.model_dump(mode="json", by_alias=True, exclude={"id"}),
"command_context": self.storage_service.variant_study_service.command_factory.command_context,
"study_version": study_version,
Expand Down
171 changes: 108 additions & 63 deletions antarest/study/storage/rawstudy/ini_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,25 @@
import typing as t
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Callable, Optional

from typing_extensions import override

from antarest.core.model import JSON

PrimitiveType = t.Union[str, int, float, bool]
ValueParser = Callable[[str], PrimitiveType]
SelectionPredicate = Callable[[str], bool]

def convert_value(value: str) -> t.Union[str, int, float, bool]:

def _lower_case(input: str) -> str:
return input.lower()


LOWER_CASE_PARSER: ValueParser = _lower_case


def _convert_value(value: str) -> PrimitiveType:
"""Convert value to the appropriate type for JSON."""

try:
Expand All @@ -38,8 +50,19 @@ def convert_value(value: str) -> t.Union[str, int, float, bool]:
return value


@dataclasses.dataclass
class IniFilter:
@dataclasses.dataclass(frozen=True)
class OptionKey:
"""
Defines a location in INI file data.
A None section means all sections.
"""

section: Optional[str]
key: Optional[str]


@dataclasses.dataclass(frozen=True)
class ReaderOptions:
"""
Filter sections and options in an INI file based on regular expressions.

Expand All @@ -48,43 +71,8 @@ class IniFilter:
option_regex: A compiled regex for matching option names.
"""

section_regex: t.Optional[t.Pattern[str]] = None
option_regex: t.Optional[t.Pattern[str]] = None

@classmethod
def from_kwargs(
cls,
section: str = "",
option: str = "",
section_regex: t.Optional[t.Union[str, t.Pattern[str]]] = None,
option_regex: t.Optional[t.Union[str, t.Pattern[str]]] = None,
**_unused: t.Any, # ignore unknown options
) -> "IniFilter":
"""
Create an instance from given filtering parameters.

When using `section` or `option` parameters, an exact match is done.
Alternatively, one can use `section_regex` or `option_regex` to perform a full match using a regex.

Args:
section: The section name to match (by default, all sections are matched)
option: The option name to match (by default, all options are matched)
section_regex: The regex for matching section names.
option_regex: The regex for matching option names.
_unused: Placeholder for any unknown options.

Returns:
The newly created instance
"""
if section:
section_regex = re.compile(re.escape(section), re.IGNORECASE)
if option:
option_regex = re.compile(re.escape(option), re.IGNORECASE)
if isinstance(section_regex, str):
section_regex = re.compile(section_regex, re.IGNORECASE) if section_regex else None
if isinstance(option_regex, str):
option_regex = re.compile(option_regex, re.IGNORECASE) if option_regex else None
return cls(section_regex=section_regex, option_regex=option_regex)
section_predicate: Optional[SelectionPredicate] = None
option_predicate: Optional[SelectionPredicate] = None

def select_section_option(self, section: str, option: str = "") -> bool:
"""
Expand All @@ -97,20 +85,58 @@ def select_section_option(self, section: str, option: str = "") -> bool:
Returns:
Whether the section and option match their respective regular expressions.
"""
if self.section_regex and not self.section_regex.fullmatch(section):
if self.section_predicate and not self.section_predicate(section):
return False
if self.option_regex and option and not self.option_regex.fullmatch(option):
if option and self.option_predicate and not self.option_predicate(option):
return False
return True


def ini_reader_options(
section: str = "",
option: str = "",
section_regex: t.Optional[t.Union[str, t.Pattern[str]]] = None,
option_regex: t.Optional[t.Union[str, t.Pattern[str]]] = None,
) -> ReaderOptions:
"""
Create an instance from given filtering parameters.

When using `section` or `option` parameters, an exact match is done.
Alternatively, one can use `section_regex` or `option_regex` to perform a full match using a regex.

Args:
section: The section name to match (by default, all sections are matched)
option: The option name to match (by default, all options are matched)
section_regex: The regex for matching section names.
option_regex: The regex for matching option names.

Returns:
The newly created instance
"""
return ReaderOptions(
section_predicate=make_predicate(section, section_regex), option_predicate=make_predicate(option, option_regex)
)


def make_predicate(
value: str = "", regex: t.Optional[t.Union[str, t.Pattern[str]]] = None
) -> Optional[SelectionPredicate]:
if value:
option_regex = re.compile(re.escape(value), re.IGNORECASE)
elif isinstance(regex, str):
option_regex = re.compile(regex, re.IGNORECASE) if regex else None
else:
return None
return option_regex.fullmatch


class IReader(ABC):
"""
File reader interface.
"""

@abstractmethod
def read(self, path: t.Any, **kwargs: t.Any) -> JSON:
def read(self, path: t.Any, options: Optional[ReaderOptions] = None) -> JSON:
"""
Parse `.ini` file to json object.

Expand Down Expand Up @@ -152,11 +178,17 @@ class IniReader(IReader):
This class is not compatible with standard `.ini` readers.
"""

def __init__(self, special_keys: t.Sequence[str] = (), section_name: str = "settings") -> None:
def __init__(
self,
special_keys: t.Sequence[str] = (),
section_name: str = "settings",
value_parsers: t.Dict[OptionKey, ValueParser] = None,
) -> None:
super().__init__()

# Default section name to use if `.ini` file has no section.
self._special_keys = set(special_keys)
self._value_parsers = value_parsers or {}

# List of keys which should be parsed as list.
self._section_name = section_name
Expand All @@ -180,30 +212,31 @@ def __repr__(self) -> str: # pragma: no cover
return f"{cls}(special_keys={special_keys!r}, section_name={section_name!r})"

@override
def read(self, path: t.Any, **kwargs: t.Any) -> JSON:
def read(self, path: t.Any, options: Optional[ReaderOptions] = None) -> JSON:
options = options or ReaderOptions()
if isinstance(path, (Path, str)):
try:
with open(path, mode="r", encoding="utf-8") as f:
sections = self._parse_ini_file(f, **kwargs)
sections = self._parse_ini_file(f, options)
except UnicodeDecodeError:
# On windows, `.ini` files may use "cp1252" encoding
with open(path, mode="r", encoding="cp1252") as f:
sections = self._parse_ini_file(f, **kwargs)
sections = self._parse_ini_file(f, options)
except FileNotFoundError:
# If the file is missing, an empty dictionary is returned.
# This is required to mimic the behavior of `configparser.ConfigParser`.
return {}

elif hasattr(path, "read"):
with path:
sections = self._parse_ini_file(path, **kwargs)
sections = self._parse_ini_file(path, options)

else: # pragma: no cover
raise TypeError(repr(type(path)))

return t.cast(JSON, sections)

def _parse_ini_file(self, ini_file: t.TextIO, **kwargs: t.Any) -> JSON:
def _parse_ini_file(self, ini_file: t.TextIO, options: ReaderOptions) -> JSON:
"""
Parse `.ini` file to JSON object.

Expand Down Expand Up @@ -242,8 +275,6 @@ def _parse_ini_file(self, ini_file: t.TextIO, **kwargs: t.Any) -> JSON:
Returns:
Dictionary of parsed `.ini` file which can be converted to JSON.
"""
ini_filter = IniFilter.from_kwargs(**kwargs)

# NOTE: This algorithm is 1.93x faster than configparser.ConfigParser
section_name = self._section_name

Expand All @@ -258,10 +289,10 @@ def _parse_ini_file(self, ini_file: t.TextIO, **kwargs: t.Any) -> JSON:
continue
elif line.startswith("["):
section_name = line[1:-1]
stop = self._handle_section(ini_filter, section_name)
stop = self._handle_section(options, section_name)
elif "=" in line:
key, value = map(str.strip, line.split("=", 1))
stop = self._handle_option(ini_filter, section_name, key, value)
stop = self._handle_option(options, section_name, key, value)
else:
raise ValueError(f"☠☠☠ Invalid line: {line!r}")

Expand All @@ -271,7 +302,7 @@ def _parse_ini_file(self, ini_file: t.TextIO, **kwargs: t.Any) -> JSON:

return self._curr_sections

def _handle_section(self, ini_filter: IniFilter, section: str) -> bool:
def _handle_section(self, ini_filter: ReaderOptions, section: str) -> bool:
# state: a new section is found
match = ini_filter.select_section_option(section)

Expand All @@ -294,29 +325,43 @@ def _append_section(self, section: str) -> None:
self._curr_section = section
self._curr_option = ""

def _handle_option(self, ini_filter: IniFilter, section: str, key: str, value: str) -> bool:
def _handle_option(self, options: ReaderOptions, section: str, key: str, value: str) -> bool:
# state: a new option is found (which may be a duplicate)
match = ini_filter.select_section_option(section, key)
match = options.select_section_option(section, key)

if self._curr_option:
if match:
self._append_option(section, key, value)
self._append_option(section, key, value, options)
return False
# prematurely stop parsing if the filter don't match
return not ini_filter.select_section_option(section)
return not options.select_section_option(section)

if match:
self._append_option(section, key, value)
self._append_option(section, key, value, options)
# continue parsing to the next option
return False

def _append_option(self, section: str, key: str, value: str) -> None:
def _get_parser(self, section: str, key: str) -> ValueParser:
if not self._value_parsers:
return _convert_value
possible_keys = [
OptionKey(section=section, key=key),
OptionKey(section=None, key=key),
]
for k in possible_keys:
if parser := self._value_parsers.get(k, None):
return parser
return _convert_value

def _append_option(self, section: str, key: str, value: str, options: ReaderOptions) -> None:
self._curr_sections.setdefault(section, {})
values = self._curr_sections[section]
parser = self._get_parser(section, key)
parsed = parser(value)
if key in self._special_keys:
values.setdefault(key, []).append(convert_value(value))
values.setdefault(key, []).append(parsed)
else:
values[key] = convert_value(value)
values[key] = parsed
self._curr_option = key


Expand All @@ -326,7 +371,7 @@ class SimpleKeyValueReader(IniReader):
"""

@override
def read(self, path: t.Any, **kwargs: t.Any) -> JSON:
def read(self, path: t.Any, options: Optional[ReaderOptions] = None) -> JSON:
"""
Parse `.ini` file which has no section to JSON object.

Expand Down
Loading
Loading