Skip to content

Commit

Permalink
Merge pull request #290 from RoseauTechnologies/factorize
Browse files Browse the repository at this point in the history
ENH: factorize some code
  • Loading branch information
benoit9126 authored Nov 29, 2024
2 parents 919be96 + 3b3819b commit 2c1c2f6
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 145 deletions.
207 changes: 98 additions & 109 deletions roseau/load_flow/models/lines/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,20 @@
from collections.abc import Sequence
from importlib import resources
from pathlib import Path
from typing import Literal, NoReturn, TypeAlias
from typing import Literal, NoReturn, TypeAlias, TypeVar

import numpy as np
import numpy.linalg as nplin
import pandas as pd
from numpy._typing import NDArray
from typing_extensions import Self

from roseau.load_flow._compat import StrEnum
from roseau.load_flow.exceptions import RoseauLoadFlowException, RoseauLoadFlowExceptionCode
from roseau.load_flow.typing import (
ComplexArray,
ComplexArrayLike2D,
Float,
FloatArray,
FloatScalarOrArrayLike1D,
Id,
Expand Down Expand Up @@ -56,6 +58,7 @@

MaterialArray: TypeAlias = NDArray[Material]
InsulatorArray: TypeAlias = NDArray[Insulator]
_StrEnumType: TypeAlias = TypeVar("_StrEnumType", bound=StrEnum)


class LineParameters(Identifiable, JsonMixin, CatalogueMixin[pd.DataFrame]):
Expand Down Expand Up @@ -141,7 +144,7 @@ def __init__(
self._size = self._z_line.shape[0]
self._ampacities = None
self.ampacities = ampacities
self._line_type = line_type
self._line_type = None if pd.isna(line_type) else LineType(line_type)
self._materials = None
self.materials = materials
self._insulators = None
Expand Down Expand Up @@ -262,102 +265,20 @@ def sections(self) -> Q_[FloatArray] | None:
@ampacities.setter
@ureg_wraps(None, (None, "A"))
def ampacities(self, value: FloatScalarOrArrayLike1D | None) -> None:
value_isna = pd.isna(value)
if not np.isscalar(value_isna):
value_isna = np.array(value_isna).all()

if value_isna:
values = None
else:
if np.isscalar(value):
if value <= 0:
msg = f"Ampacities must be positive: {value} A was provided."
logger.error(msg)
raise RoseauLoadFlowException(msg=msg, code=RoseauLoadFlowExceptionCode.BAD_AMPACITIES_VALUE)
value = [value for _ in range(self._size)]

values = np.array(value, dtype=np.float64)
if len(values) != self._size:
msg = f"Incorrect number of ampacities: {len(values)} instead of {self._size}"
logger.error(msg)
raise RoseauLoadFlowException(msg=msg, code=RoseauLoadFlowExceptionCode.BAD_AMPACITIES_SIZE)
if (values <= 0).any():
msg = f"Ampacities must be positive: {values.tolist()} A was provided."
logger.error(msg)
raise RoseauLoadFlowException(msg=msg, code=RoseauLoadFlowExceptionCode.BAD_AMPACITIES_VALUE)
self._ampacities = values
self._ampacities = self._check_positive_float_array(value=value, name="ampacities", unit="A", size=self._size)

@materials.setter
def materials(self, value: Material | Sequence[Material] | None) -> None:
value_isna = pd.isna(value)
if np.isscalar(value_isna):
values = None if value_isna else np.array([Material(value) for _ in range(self._size)], dtype=np.object_)
else:
if np.any(value_isna):
msg = f"Materials cannot contain null values: [{', '.join(f'{x}' for x in value)}] was provided."
logger.error(msg)
raise RoseauLoadFlowException(msg=msg, code=RoseauLoadFlowExceptionCode.BAD_MATERIALS_VALUE)

# Build the numpy array fails with pd.NA inside
values = np.array([Material(x) for x in value], dtype=np.object_)
if len(value) != self._size:
msg = f"Incorrect number of materials: {len(value)} instead of {self._size}"
logger.error(msg)
raise RoseauLoadFlowException(msg=msg, code=RoseauLoadFlowExceptionCode.BAD_MATERIALS_SIZE)
self._materials = values
self._materials = self._check_enum_array(value=value, enum_class=Material, name="materials", size=self._size)

@insulators.setter
def insulators(self, value: Insulator | Sequence[Insulator] | None) -> None:
value_isna = pd.isna(value)
if np.isscalar(value_isna):
values = None if value_isna else np.array([Insulator(value) for _ in range(self._size)], dtype=np.object_)
elif np.all(value_isna):
values = None
else:
if np.any(value_isna):
msg = f"Insulators cannot contain null values: [{', '.join(f'{x}' for x in value)}] was provided."
logger.error(msg)
raise RoseauLoadFlowException(msg=msg, code=RoseauLoadFlowExceptionCode.BAD_INSULATORS_VALUE)

# Build the numpy array fails with pd.NA inside
values = np.array([Insulator(v) for v in value], dtype=np.object_)
if len(value) != self._size:
msg = f"Incorrect number of insulators: {len(value)} instead of {self._size}"
logger.error(msg)
raise RoseauLoadFlowException(msg=msg, code=RoseauLoadFlowExceptionCode.BAD_INSULATORS_SIZE)
self._insulators = values
self._insulators = self._check_enum_array(value=value, enum_class=Insulator, name="insulators", size=self._size)

@sections.setter
@ureg_wraps(None, (None, "mm**2"))
def sections(self, value: FloatScalarOrArrayLike1D | None) -> None:
value_isna = pd.isna(value)
if np.isscalar(value_isna):
if value_isna:
values = None
elif value <= 0:
msg = f"Sections must be positive: {value} mm² was provided."
logger.error(msg)
raise RoseauLoadFlowException(msg=msg, code=RoseauLoadFlowExceptionCode.BAD_SECTIONS_VALUE)
else:
values = np.array([value for _ in range(self._size)], dtype=np.float64)
else:
if np.any(value_isna):
msg = f"Sections cannot contain null values: [{', '.join(f'{x}' for x in value)}] mm² was provided."
logger.error(msg)
raise RoseauLoadFlowException(msg=msg, code=RoseauLoadFlowExceptionCode.BAD_SECTIONS_VALUE)

# Build the numpy array fails with pd.NA inside
values = np.array(value, dtype=np.float64)
if len(value) != self._size:
msg = f"Incorrect number of sections: {len(value)} instead of {self._size}"
logger.error(msg)
raise RoseauLoadFlowException(msg=msg, code=RoseauLoadFlowExceptionCode.BAD_SECTIONS_SIZE)
if (values <= 0).any():
msg = f"Sections must be positive: {values.tolist()} mm² was provided."
logger.error(msg)
raise RoseauLoadFlowException(msg=msg, code=RoseauLoadFlowExceptionCode.BAD_SECTIONS_VALUE)

self._sections = values
self._sections = self._check_positive_float_array(value=value, name="sections", unit="mm²", size=self._size)

@staticmethod
def _sym_to_zy_simple(n, z0: complex, z1: complex, y0: complex, y1: complex) -> tuple[ComplexArray, ComplexArray]:
Expand Down Expand Up @@ -451,7 +372,8 @@ def from_sym(
<models-line_parameters-alternative_constructors-symmetric>`, the model may be "degraded" if the computed
impedance matrix is not invertible.
"""
z_line, y_shunt = cls._sym_to_zy(id=id, z0=z0, z1=z1, y0=y0, y1=y1, zn=zn, zpn=1j * xpn, bn=bn, bpn=bpn)
zpn = None if pd.isna(xpn) else 1j * xpn
z_line, y_shunt = cls._sym_to_zy(id=id, z0=z0, z1=z1, y0=y0, y1=y1, zn=zn, zpn=zpn, bn=bn, bpn=bpn)
return cls(id=id, z_line=z_line, y_shunt=y_shunt, ampacities=ampacities)

@classmethod
Expand Down Expand Up @@ -519,10 +441,12 @@ def _sym_to_zy(
yn = bn * 1j # Neutral shunt admittance (Siemens/km)
ypn = bpn * 1j # Phase-to-neutral shunt admittance (Siemens/km)

if zpn == 0 and zn == 0:
logger.warning(
if np.isclose(zpn, 0) and np.isclose(zn, 0):
warnings.warn(
f"The line model {id!r} does not have neutral elements. It will be modelled as a 3 wires line "
f"instead."
f"instead.",
category=UserWarning,
stacklevel=find_stack_level(),
)
else:
z_line = np.pad(z_line, (0, 1), mode="constant", constant_values=zpn)
Expand All @@ -532,7 +456,7 @@ def _sym_to_zy(

# Check the validity of the resulting matrices
det_z = nplin.det(z_line)
if abs(det_z) == 0:
if np.isclose(abs(det_z), 0):
if choice == 0:
# Warn the user that the PwF data are bad...
logger.warning(
Expand Down Expand Up @@ -782,40 +706,36 @@ def _from_geometry(
distance_prim = np.sqrt(np.einsum("ijk,ijk->ij", diff, diff))

# Useful matrices
mask_diagonal = np.eye(4, dtype=np.bool_)
mask_off_diagonal = ~mask_diagonal
minus = -np.ones((4, 4), dtype=np.float64)
np.fill_diagonal(minus, 1)

# Electrical parameters
materials = np.array([material, material, material, material_neutral], dtype=np.object_)
rho = np.array([RHO[x].m for x in materials], dtype=np.float64)
r = rho / sections_m2 * np.eye(4, dtype=np.float64) * 1e3 # resistance (ohm/km)
distance[mask_diagonal] = gmr
np.fill_diagonal(distance, gmr)
inductance = MU_0.m / (2 * PI) * np.log(1 / distance) * 1e3 # H/m->H/km
distance[mask_diagonal] = radius
np.fill_diagonal(distance, radius)
epsilons = np.array([epsilon, epsilon, epsilon, epsilon_neutral], dtype=np.float64)
lambdas = 1 / (2 * PI * epsilons) * np.log(distance_prim / distance) # m/F

# Extract the conductivity and the capacities from the lambda (potential coefficients)
lambda_inv = nplin.inv(lambdas) * 1e3 # capacities (F/km)
c = np.zeros((4, 4), dtype=np.float64) # capacities (F/km)
c[mask_diagonal] = np.einsum("ij,ij->i", lambda_inv, minus)
c[mask_off_diagonal] = -lambda_inv[mask_off_diagonal]
c = -lambda_inv.copy() # capacities (F/km)
np.fill_diagonal(c, np.einsum("ij,ij->i", lambda_inv, minus))
g = np.zeros((4, 4), dtype=np.float64) # conductance (S/km)
omega = OMEGA.m
insulators = np.array([insulator, insulator, insulator, insulator_neutral], dtype=np.object_)
tan_d = np.array([TAN_D[x].m for x in insulators], dtype=np.float64)
g[mask_diagonal] = tan_d * np.einsum("ii->i", c) * omega
np.fill_diagonal(g, tan_d * np.einsum("ii->i", c) * omega)

# Build the impedance and admittance matrices
z_line = r + inductance * omega * 1j
y = g + c * omega * 1j

# Compute the shunt admittance matrix from the admittance matrix
y_shunt = np.zeros((4, 4), dtype=np.complex128)
y_shunt[mask_diagonal] = np.einsum("ij->i", y)
y_shunt[mask_off_diagonal] = -y[mask_off_diagonal]
y_shunt = -(y.copy())
np.fill_diagonal(y_shunt, np.einsum("ij->i", y))

return z_line, y_shunt, line_type, materials, insulators, sections_mm2

Expand Down Expand Up @@ -1689,13 +1609,12 @@ def from_dict(cls, data: JsonDict, *, include_results: bool = True) -> Self:
"""
z_line = np.array(data["z_line"][0]) + 1j * np.array(data["z_line"][1])
y_shunt = np.array(data["y_shunt"][0]) + 1j * np.array(data["y_shunt"][1]) if "y_shunt" in data else None
line_type = LineType(data["line_type"]) if "line_type" in data else None
return cls(
id=data["id"],
z_line=z_line,
y_shunt=y_shunt,
ampacities=data.get("ampacities"),
line_type=line_type,
line_type=data.get("line_type"),
materials=data.get("materials"),
insulators=data.get("insulators"),
sections=data.get("sections"),
Expand Down Expand Up @@ -1745,16 +1664,86 @@ def _check_matrix(self) -> None:
]:
if matrix_name == "y_shunt" and not self.with_shunt:
continue

# Check that the off-diagonal element have a zero real part
off_diagonal_elements = matrix[~np.eye(*matrix.shape, dtype=np.bool_)]
if not np.allclose(off_diagonal_elements.real, 0):
msg = (
warnings.warn(
f"The {matrix_name} matrix of line type {self.id!r} has off-diagonal elements "
f"with a non-zero real part."
f"with a non-zero real part.",
category=UserWarning,
stacklevel=find_stack_level(),
)
warnings.warn(msg, category=UserWarning, stacklevel=find_stack_level())

# Check that the real coefficients are non-negative
if (matrix.real < 0.0).any():
msg = f"The {matrix_name} matrix of line type {self.id!r} has coefficients with negative real part."
logger.error(msg)
raise RoseauLoadFlowException(msg=msg, code=code)

@staticmethod
def _check_enum_array(
value: _StrEnumType | Sequence[_StrEnumType] | None,
enum_class: type[_StrEnumType],
name: Literal["insulators", "materials"],
size: int,
) -> NDArray[_StrEnumType] | None:
value_isna = pd.isna(value)
if np.isscalar(value_isna):
return None if value_isna else np.array([enum_class(value) for _ in range(size)], dtype=np.object_)
elif np.all(value_isna):
return None
else:
if np.any(value_isna):
msg = f"{name.title()} cannot contain null values: [{', '.join(f'{x}' for x in value)}] was provided."
logger.error(msg)
raise RoseauLoadFlowException(msg=msg, code=RoseauLoadFlowExceptionCode[f"BAD_{name.upper()}_VALUE"])

# Build the numpy array fails with pd.NA inside
values = np.array([enum_class(v) for v in value], dtype=np.object_)
if len(value) != size:
msg = f"Incorrect number of {name}: {len(value)} instead of {size}."
logger.error(msg)
raise RoseauLoadFlowException(msg=msg, code=RoseauLoadFlowExceptionCode[f"BAD_{name.upper()}_SIZE"])
return values

@staticmethod
def _check_positive_float_array(
value: FloatArray | Sequence[Float] | Float | None,
name: Literal["sections", "ampacities"],
unit: str,
size: int,
) -> FloatArray | None:
value_isna = pd.isna(value)
if np.isscalar(value_isna):
if value_isna:
return None
elif value <= 0:
msg = f"{name.title()} must be positive: {value} {unit} was provided."
logger.error(msg)
raise RoseauLoadFlowException(msg=msg, code=RoseauLoadFlowExceptionCode[f"BAD_{name.upper()}_VALUE"])
else:
return np.array([value for _ in range(size)], dtype=np.float64)
else:
if np.all(value_isna):
return None
if np.any(value_isna):
msg = (
f"{name.title()} cannot contain null values: [{', '.join(f'{x}' for x in value)}] {unit} "
f"was provided."
)
logger.error(msg)
raise RoseauLoadFlowException(msg=msg, code=RoseauLoadFlowExceptionCode[f"BAD_{name.upper()}_VALUE"])

# Build the numpy array fails with pd.NA inside
values = np.array(value, dtype=np.float64)
if len(value) != size:
msg = f"Incorrect number of {name}: {len(value)} instead of {size}."
logger.error(msg)
raise RoseauLoadFlowException(msg=msg, code=RoseauLoadFlowExceptionCode[f"BAD_{name.upper()}_SIZE"])
if (values <= 0).any():
msg = f"{name.title()} must be positive: {values.tolist()} {unit} was provided."
logger.error(msg)
raise RoseauLoadFlowException(msg=msg, code=RoseauLoadFlowExceptionCode[f"BAD_{name.upper()}_VALUE"])

return values
Loading

0 comments on commit 2c1c2f6

Please sign in to comment.