Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Columnar data Python API step 2 #676

Merged
merged 12 commits into from
Jul 22, 2024
40 changes: 40 additions & 0 deletions src/power_grid_model/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
We do not officially support this functionality and may remove features in this library at any given time!
"""

from copy import deepcopy
from typing import List, Optional, Union, cast

import numpy as np

from power_grid_model.core.data_handling import OutputType, process_output_component_types
from power_grid_model.core.dataset_definitions import ComponentType
from power_grid_model.data_types import (
BatchArray,
Expand All @@ -27,6 +29,7 @@
SinglePythonDataset,
SparseBatchArray,
)
from power_grid_model.typing import ComponentAttributeMapping


def is_nan(data) -> bool:
Expand Down Expand Up @@ -284,3 +287,40 @@ def convert_single_dataset_to_python_single_dataset(data: SingleDataset) -> Sing
]
for component, objects in data.items()
}


def copy_output_to_columnar_dataset(
    output_data: Dataset,
    output_component_types: ComponentAttributeMapping,
    output_type: OutputType,
    available_components: list[ComponentType],
) -> Dataset:
    """Temporary function to copy row based dataset to a column based dataset as per output_component_types.

    The purpose of this function is to mimic columnar data without any memory footprint benefits.

    Args:
        output_data (Dataset): dataset to convert
        output_component_types (ComponentAttributeMapping): desired component and attribute mapping
        output_type (OutputType): output type sym or asym
        available_components (list[ComponentType]): available components in model

    Returns:
        Dataset: converted dataset. Components requested without an attribute selection (None) keep their
            row based data as-is (same object, not copied); components with an attribute selection are
            returned as a dictionary of independently copied attribute columns (an empty selection gives
            an empty dictionary).
    """
    processed_output_types = process_output_component_types(output_type, output_component_types, available_components)

    result_data = {}
    for comp_name, attrs in processed_output_types.items():
        # A component can be a valid request yet have no row based output (the row based dataset only
        # holds components that were actually created), so skip it instead of failing with a KeyError.
        if comp_name not in output_data:
            continue
        component_data = output_data[comp_name]
        if attrs is None:
            result_data[comp_name] = component_data
        else:
            # an empty attribute selection naturally results in an empty dictionary
            result_data[comp_name] = {attr: deepcopy(component_data[attr]) for attr in attrs}
    return result_data
84 changes: 62 additions & 22 deletions src/power_grid_model/core/data_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@


from enum import Enum
from typing import Dict, List, Mapping, Set, Tuple, Union
from typing import Mapping, Tuple, Union

import numpy as np

from power_grid_model.core.dataset_definitions import ComponentType, DatasetType
from power_grid_model.core.power_grid_dataset import CConstDataset, CMutableDataset
from power_grid_model.core.power_grid_meta import initialize_array, power_grid_meta_data
from power_grid_model.data_types import Dataset
from power_grid_model.enum import CalculationType
from power_grid_model.typing import ComponentAttributeMapping, _ComponentAttributeMappingDict


class OutputType(Enum):
Expand Down Expand Up @@ -101,15 +103,14 @@ def prepare_output_view(output_data: Mapping[ComponentType, np.ndarray], output_


def create_output_data(
output_component_types: Union[Set[ComponentType], List[ComponentType]],
output_component_types: ComponentAttributeMapping,
output_type: OutputType,
all_component_count: Dict[ComponentType, int],
all_component_count: dict[ComponentType, int],
is_batch: bool,
batch_size: int,
) -> Dict[ComponentType, np.ndarray]:
) -> Dataset:
"""
Create the output data that the user can use. always returns batch type output data.
Use reduce_output_data to flatten to single scenario output if applicable.
Create the output dataset based on component and batch size from the model; and output attributes requested by user.
Args:
output_component_types:
Expand All @@ -123,24 +124,14 @@ def create_output_data(
batch_size:
the batch size
Raises:
KeyError: if some specified components are unknown.
Returns:
dictionary of results of all components
key: component type name to be updated in batch
value:
for single calculation: 1D numpy structured array for the results of this component type
for batch calculation: 2D numpy structured array for the results of this component type
Dimension 0: each batch
Dimension 1: the result of each element for this component type
Dataset: output dataset
"""
# raise error if some specified components are unknown
unknown_components = [x for x in output_component_types if x not in power_grid_meta_data[output_type.value]]
if unknown_components:
raise KeyError(f"You have specified some unknown component types: {unknown_components}")
processed_output_types = process_output_component_types(
output_type, output_component_types, list(all_component_count.keys())
)

all_component_count = {k: v for k, v in all_component_count.items() if k in output_component_types}
all_component_count = {k: v for k, v in all_component_count.items() if k in processed_output_types}

# create result dataset
result_dict = {}
Expand All @@ -152,5 +143,54 @@ def create_output_data(
else:
shape = (count,)
result_dict[name] = initialize_array(output_type.value, name, shape=shape, empty=True)

return result_dict


def process_output_component_types(
    output_type: OutputType,
    output_component_types: ComponentAttributeMapping,
    available_components: list[ComponentType],
) -> _ComponentAttributeMappingDict:
    """Checks valid type for output_component_types. Also checks for any invalid component names and attribute names.

    Args:
        output_type (OutputType): the type of output that the user will see (as per the calculation options)
        output_component_types (ComponentAttributeMapping): output_component_types provided by user
        available_components (list[ComponentType]): all components available in model instance

    Raises:
        ValueError: when the type for output_component_types is incorrect
        KeyError: with "unknown component" for any unknown components
        KeyError: with "unknown attributes" for any unknown attributes for a known component

    Returns:
        _ComponentAttributeMappingDict: processed output_component_types in a dictionary.
            A dictionary provided by the user is validated and returned as-is (not copied).
    """
    # normalize every accepted input form to a {component: attributes or None} dictionary
    if output_component_types is None:
        # no selection given: all available components, without attribute filtering
        output_component_types = dict.fromkeys(available_components)
    elif isinstance(output_component_types, (list, set)):
        # only components given: selected components, without attribute filtering
        output_component_types = dict.fromkeys(output_component_types)
    elif not isinstance(output_component_types, dict) or not all(
        attrs is None or isinstance(attrs, (set, list)) for attrs in output_component_types.values()
    ):
        raise ValueError(f"Invalid output_component_types provided: {output_component_types}")

    # raise error if some specified components are unknown
    output_meta = power_grid_meta_data[output_type.value]
    unknown_components = [x for x in output_component_types if x not in output_meta]
    if unknown_components:
        raise KeyError(f"You have specified some unknown component types: {unknown_components}")

    # collect all unknown attributes per component, so that they are reported in a single error
    unknown_attributes = {}
    for comp_name, attrs in output_component_types.items():
        if attrs is None:
            continue
        diff = set(attrs).difference(output_meta[comp_name].dtype.names)
        if diff:
            unknown_attributes[comp_name] = diff

    if unknown_attributes:
        raise KeyError(f"You have specified some unknown attributes: {unknown_attributes}")

    return output_component_types
30 changes: 23 additions & 7 deletions src/power_grid_model/core/power_grid_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import numpy as np

from power_grid_model._utils import copy_output_to_columnar_dataset
from power_grid_model.core.data_handling import (
create_output_data,
get_output_type,
Expand All @@ -30,6 +31,8 @@
TapChangingStrategy,
_ExperimentalFeatures,
)
from power_grid_model.errors import PowerGridError
from power_grid_model.typing import ComponentAttributeMapping


class PowerGridModel:
Expand Down Expand Up @@ -187,18 +190,13 @@ def include_type(component_type: ComponentType):
# pylint: disable=too-many-arguments
def _construct_output(
self,
output_component_types: Optional[Union[Set[ComponentType], List[ComponentType]]],
output_component_types: ComponentAttributeMapping,
calculation_type: CalculationType,
symmetric: bool,
is_batch: bool,
batch_size: int,
) -> Dict[ComponentType, np.ndarray]:
all_component_count = self._get_output_component_count(calculation_type=calculation_type)

# limit all component count to user specified component types in output
if output_component_types is None:
output_component_types = set(all_component_count.keys())

return create_output_data(
output_component_types=output_component_types,
output_type=get_output_type(calculation_type=calculation_type, symmetric=symmetric),
Expand Down Expand Up @@ -236,10 +234,11 @@ def _calculate_impl(
calculation_type: CalculationType,
symmetric: bool,
update_data: Optional[Dataset],
output_component_types: Optional[Union[Set[ComponentType], List[ComponentType]]],
output_component_types: ComponentAttributeMapping,
options: Options,
continue_on_batch_error: bool,
decode_error: bool,
experimental_features: Union[_ExperimentalFeatures, str], # pylint: disable=too-many-arguments
):
"""
Core calculation routine
Expand All @@ -266,6 +265,14 @@ def _calculate_impl(
update_ptr = ConstDatasetPtr()
batch_size = 1

if experimental_features in [
_ExperimentalFeatures.disabled,
_ExperimentalFeatures.disabled.name,
] and isinstance(output_component_types, dict):
raise PowerGridError(
"Experimental features flag must be enabled when providing a dict for output_component_types"
)

output_data = self._construct_output(
output_component_types=output_component_types,
calculation_type=calculation_type,
Expand All @@ -291,6 +298,12 @@ def _calculate_impl(
continue_on_batch_error=continue_on_batch_error, batch_size=batch_size, decode_error=decode_error
)

output_data = copy_output_to_columnar_dataset(
output_data=output_data,
output_type=get_output_type(calculation_type=calculation_type, symmetric=symmetric),
available_components=list(self._get_output_component_count(calculation_type=calculation_type).keys()),
output_component_types=output_component_types,
)
return output_data

def _calculate_power_flow(
Expand Down Expand Up @@ -327,6 +340,7 @@ def _calculate_power_flow(
options=options,
continue_on_batch_error=continue_on_batch_error,
decode_error=decode_error,
experimental_features=experimental_features,
)

def _calculate_state_estimation(
Expand Down Expand Up @@ -361,6 +375,7 @@ def _calculate_state_estimation(
options=options,
continue_on_batch_error=continue_on_batch_error,
decode_error=decode_error,
experimental_features=experimental_features,
)

def _calculate_short_circuit(
Expand Down Expand Up @@ -394,6 +409,7 @@ def _calculate_short_circuit(
options=options,
continue_on_batch_error=continue_on_batch_error,
decode_error=decode_error,
experimental_features=experimental_features,
)

def calculate_power_flow(
Expand Down
12 changes: 12 additions & 0 deletions src/power_grid_model/typing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# SPDX-FileCopyrightText: Contributors to the Power Grid Model project <powergridmodel@lfenergy.org>
#
# SPDX-License-Identifier: MPL-2.0

"""
Type hints for PGM. This includes all miscellaneous type hints not under dataset or dataset_definitions categories
"""
from power_grid_model.core.dataset_definitions import ComponentType

# Explicit per-component selection: each component type maps to the requested attribute names
# (a set or a list of strings), or to None when no attribute selection is made for that component.
_ComponentAttributeMappingDict = dict[ComponentType, set[str] | list[str] | None]

# Every accepted form of a component/attribute selection: a set or a list of component types,
# None (no selection), or the explicit per-component dictionary.
ComponentAttributeMapping = set[ComponentType] | list[ComponentType] | None | _ComponentAttributeMappingDict
93 changes: 93 additions & 0 deletions tests/unit/test_data_handling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# SPDX-FileCopyrightText: Contributors to the Power Grid Model project <powergridmodel@lfenergy.org>
#
# SPDX-License-Identifier: MPL-2.0

import numpy as np
import pytest

from power_grid_model.core.data_handling import OutputType, create_output_data, process_output_component_types
from power_grid_model.core.dataset_definitions import ComponentType as CT, DatasetType as DT
from power_grid_model.core.power_grid_meta import initialize_array


@pytest.mark.parametrize(
    ("output_component_types", "is_batch", "expected"),
    [
        (
            None,
            False,
            {
                CT.node: initialize_array(DT.sym_output, CT.node, 4),
                CT.sym_load: initialize_array(DT.sym_output, CT.sym_load, 3),
                CT.source: initialize_array(DT.sym_output, CT.source, 1),
            },
        ),
        (
            [CT.node, CT.sym_load],
            False,
            {
                CT.node: initialize_array(DT.sym_output, CT.node, 4),
                CT.sym_load: initialize_array(DT.sym_output, CT.sym_load, 3),
            },
        ),
        (
            {CT.node, CT.sym_load},
            False,
            {
                CT.node: initialize_array(DT.sym_output, CT.node, 4),
                CT.sym_load: initialize_array(DT.sym_output, CT.sym_load, 3),
            },
        ),
        # the columnar (dictionary) cases are expected to fail until columnar data is implemented
        pytest.param({CT.node: [], CT.sym_load: []}, True, {CT.node: {}, CT.sym_load: {}}, marks=pytest.mark.xfail),
        pytest.param({CT.node: [], CT.sym_load: ["p"]}, True, {}, marks=pytest.mark.xfail),
        pytest.param({CT.node: ["u"], CT.sym_load: ["p"]}, True, {}, marks=pytest.mark.xfail),
        pytest.param({CT.node: None, CT.sym_load: ["p"]}, True, {}, marks=pytest.mark.xfail),
    ],
)
def test_create_output_data(output_component_types, is_batch, expected):
    """Check that create_output_data returns the expected components with the expected dtypes."""
    # TODO use is_batch and shorten parameterization after columnar data implementation
    all_component_count = {CT.node: 4, CT.sym_load: 3, CT.source: 1}
    batch_size = 15 if is_batch else 1
    actual = create_output_data(
        output_component_types=output_component_types,
        output_type=OutputType.SYM_OUTPUT,
        all_component_count=all_component_count,
        is_batch=is_batch,
        batch_size=batch_size,
    )

    assert actual.keys() == expected.keys()
    for comp in expected:
        if isinstance(expected[comp], np.ndarray):
            # Row based
            assert actual[comp].dtype == expected[comp].dtype
        elif expected[comp] == {}:
            # Empty attributes
            assert actual[comp] == expected[comp]
        else:
            # Columnar data
            assert actual[comp].keys() == expected[comp].keys()
            assert all(actual[comp][attr].dtype == expected[comp][attr].dtype for attr in expected[comp])


@pytest.mark.parametrize(
    ("output_component_types", "error", "match"),
    [
        ({"abc": 3, "def": None}, ValueError, "Invalid output_component_types"),
        ({"abc": None, "def": None}, KeyError, "unknown component"),
        ({"abc": None, CT.sym_load: None}, KeyError, "unknown component"),
        ({"abc": ["xyz"], CT.sym_load: None}, KeyError, "unknown component"),
        ({CT.node: ["xyz"], CT.sym_load: None}, KeyError, "unknown attributes"),
        ({CT.node: ["xyz1"], CT.sym_load: ["xyz2"]}, KeyError, "unknown attributes"),
    ],
)
def test_create_output_data__errors(output_component_types, error, match):
    """Invalid selections must raise the parametrized error type with a matching message."""
    model_components = [CT.node, CT.sym_load, CT.source]
    call_kwargs = {
        "output_type": OutputType.SYM_OUTPUT,
        "output_component_types": output_component_types,
        "available_components": model_components,
    }
    with pytest.raises(error, match=match):
        process_output_component_types(**call_kwargs)