Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filtering option in mapping #256

Merged
merged 15 commits into from
Jun 25, 2024
118 changes: 90 additions & 28 deletions src/power_grid_model_io/converters/tabular_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,12 @@ def _convert_table_to_component(
if table not in data:
return None

n_records = len(data[table])
table_mask = np.ones(len(data[table]), dtype=bool)
if "filter" in attributes:
table_mask = self._parse_filters(data, table)

try:
pgm_data = initialize_array(data_type=data_type, component_type=component, shape=n_records)
pgm_data = initialize_array(data_type=data_type, component_type=component, shape=np.sum(table_mask))
except KeyError as ex:
raise KeyError(f"Invalid component type '{component}' or data type '{data_type}'") from ex

Expand All @@ -196,6 +198,7 @@ def _convert_table_to_component(
data=data,
pgm_data=pgm_data,
table=table,
table_mask=table_mask,
component=component,
attr=attr,
col_def=col_def,
Expand All @@ -204,12 +207,20 @@ def _convert_table_to_component(

return pgm_data

def _parse_filters(self, data: TabularData, table: str) -> np.ndarray:
    """Build a boolean row mask for ``table``, selecting which records take part in the conversion.

    Args:
        data: tabular input data
        table: name of the table to build the mask for

    Returns:
        A 1-D boolean numpy array with one entry per row of ``data[table]``.
        Currently every entry is True (no rows are filtered out).

    TODO(review): the per-column filter functions configured under
    ``self._mapping["filter"]`` are not applied yet. The intended logic is to
    AND the mask with ``data[table][filter_col].apply(fn, **kwargs)`` for each
    configured (fn, kwargs) pair. The previous commented-out draft used
    ``**dict(zip(kwargs))``, which is incorrect — ``zip`` with a single
    argument yields 1-tuples, so ``dict(...)`` raises; plain ``**kwargs``
    (or ``**dict(kwargs)``) is what was meant. Implement and test before
    relying on filtering.
    """
    return np.ones(len(data[table]), dtype=bool)

# pylint: disable = too-many-arguments
def _convert_col_def_to_attribute(
self,
data: TabularData,
pgm_data: np.ndarray,
table: str,
table_mask: np.ndarray,
nitbharambe marked this conversation as resolved.
Show resolved Hide resolved
component: str,
attr: str,
col_def: Any,
Expand Down Expand Up @@ -250,12 +261,19 @@ def _convert_col_def_to_attribute(
# Extra info must be linked to the object IDs, therefore the uuids should be known before extra info can
# be parsed. Before this for loop, it is checked that "id" exists and it is placed at the front.
self._handle_extra_info(
data=data, table=table, col_def=col_def, uuids=pgm_data["id"], extra_info=extra_info
data=data,
table=table,
table_mask=table_mask,
col_def=col_def,
uuids=pgm_data["id"],
extra_info=extra_info,
)
# Extra info should not be added to the numpy arrays, so let's continue to the next attribute
return

attr_data = self._parse_col_def(data=data, table=table, col_def=col_def, extra_info=extra_info)
attr_data = self._parse_col_def(
data=data, table=table, table_mask=table_mask, col_def=col_def, extra_info=extra_info
)

if len(attr_data.columns) != 1:
raise ValueError(f"DataFrame for {component}.{attr} should contain a single column ({attr_data.columns})")
Expand All @@ -266,6 +284,7 @@ def _handle_extra_info(
self,
data: TabularData,
table: str,
table_mask: np.ndarray,
nitbharambe marked this conversation as resolved.
Show resolved Hide resolved
col_def: Any,
uuids: np.ndarray,
extra_info: Optional[ExtraInfo],
Expand All @@ -292,7 +311,9 @@ def _handle_extra_info(
if extra_info is None:
return

extra = self._parse_col_def(data=data, table=table, col_def=col_def, extra_info=None).to_dict(orient="records")
extra = self._parse_col_def(
data=data, table=table, table_mask=table_mask, col_def=col_def, extra_info=None
).to_dict(orient="records")
for i, xtr in zip(uuids, extra):
xtr = {
k[0] if isinstance(k, tuple) else k: v
Expand Down Expand Up @@ -339,7 +360,7 @@ def _serialize_data(self, data: Dataset, extra_info: Optional[ExtraInfo]) -> Tab
return TabularData(logger=self._log, **data)

def _parse_col_def(
self, data: TabularData, table: str, col_def: Any, extra_info: Optional[ExtraInfo]
self, data: TabularData, table: str, table_mask: np.ndarray, col_def: Any, extra_info: Optional[ExtraInfo]
) -> pd.DataFrame:
"""Interpret the column definition and extract/convert/create the data as a pandas DataFrame.

Expand All @@ -353,17 +374,21 @@ def _parse_col_def(

"""
if isinstance(col_def, (int, float)):
return self._parse_col_def_const(data=data, table=table, col_def=col_def)
return self._parse_col_def_const(data=data, table=table, col_def=col_def, table_mask=table_mask)
if isinstance(col_def, str):
return self._parse_col_def_column_name(data=data, table=table, col_def=col_def)
return self._parse_col_def_column_name(data=data, table=table, col_def=col_def, table_mask=table_mask)
if isinstance(col_def, dict):
return self._parse_col_def_filter(data=data, table=table, col_def=col_def, extra_info=extra_info)
return self._parse_col_def_filter(
data=data, table=table, table_mask=table_mask, col_def=col_def, extra_info=extra_info
)
if isinstance(col_def, list):
return self._parse_col_def_composite(data=data, table=table, col_def=col_def)
return self._parse_col_def_composite(data=data, table=table, table_mask=table_mask, col_def=col_def)
raise TypeError(f"Invalid column definition: {col_def}")

@staticmethod
def _parse_col_def_const(data: TabularData, table: str, col_def: Union[int, float]) -> pd.DataFrame:
def _parse_col_def_const(
data: TabularData, table: str, col_def: Union[int, float], table_mask: Optional[np.ndarray] = None
nitbharambe marked this conversation as resolved.
Show resolved Hide resolved
) -> pd.DataFrame:
"""Create a single column pandas DataFrame containing the const value.

Args:
Expand All @@ -376,9 +401,13 @@ def _parse_col_def_const(data: TabularData, table: str, col_def: Union[int, floa

"""
assert isinstance(col_def, (int, float))
if table_mask is not None:
nitbharambe marked this conversation as resolved.
Show resolved Hide resolved
return pd.DataFrame([col_def] * len(data[table][table_mask]))
return pd.DataFrame([col_def] * len(data[table]))

def _parse_col_def_column_name(self, data: TabularData, table: str, col_def: str) -> pd.DataFrame:
def _parse_col_def_column_name(
self, data: TabularData, table: str, col_def: str, table_mask: Optional[np.ndarray] = None
) -> pd.DataFrame:
"""Extract a column from the data. If the column doesn't exist, check if the col_def is a special float value,
like 'inf'. If that's the case, create a single column pandas DataFrame containing the const value.

Expand All @@ -391,7 +420,10 @@ def _parse_col_def_column_name(self, data: TabularData, table: str, col_def: str

"""
assert isinstance(col_def, str)
table_data = data[table]
if table_mask is None:
table_data = data[table]
else:
table_data = data[table][table_mask]

# If multiple columns are given in col_def, return the first column that exists in the dataset
columns = [col_name.strip() for col_name in col_def.split("|")]
Expand All @@ -408,7 +440,7 @@ def _parse_col_def_column_name(self, data: TabularData, table: str, col_def: str
columns_str = " and ".join(f"'{col_name}'" for col_name in columns)
raise KeyError(f"Could not find column {columns_str} on table '{table}'")

return self._parse_col_def_const(data=data, table=table, col_def=const_value)
return self._parse_col_def_const(data=data, table=table, col_def=const_value, table_mask=table_mask)

def _apply_multiplier(self, table: str, column: str, data: pd.Series) -> pd.Series:
if self._multipliers is None:
Expand All @@ -421,7 +453,14 @@ def _apply_multiplier(self, table: str, column: str, data: pd.Series) -> pd.Seri
return data

def _parse_reference(
self, data: TabularData, table: str, other_table: str, query_column: str, key_column: str, value_column: str
self,
data: TabularData,
table: str,
table_mask: np.ndarray,
other_table: str,
query_column: str,
key_column: str,
value_column: str,
) -> pd.DataFrame:
"""
Find and extract a column from a different table.
Expand All @@ -437,15 +476,20 @@ def _parse_reference(
Returns:

"""
queries = self._parse_col_def_column_name(data=data, table=table, col_def=query_column)
keys = self._parse_col_def_column_name(data=data, table=other_table, col_def=key_column)
values = self._parse_col_def_column_name(data=data, table=other_table, col_def=value_column)
queries = self._parse_col_def_column_name(data=data, table=table, col_def=query_column, table_mask=table_mask)
keys = self._parse_col_def_column_name(data=data, table=other_table, col_def=key_column, table_mask=None)
values = self._parse_col_def_column_name(data=data, table=other_table, col_def=value_column, table_mask=None)
other = pd.concat([keys, values], axis=1)
result = queries.merge(other, how="left", left_on=query_column, right_on=key_column)
return result[[value_column]]

def _parse_col_def_filter(
self, data: TabularData, table: str, col_def: Dict[str, Any], extra_info: Optional[ExtraInfo]
self,
data: TabularData,
table: str,
table_mask: np.ndarray,
col_def: Dict[str, Any],
extra_info: Optional[ExtraInfo],
) -> pd.DataFrame:
"""
Parse column filters like 'auto_id', 'reference', 'function', etc
Expand All @@ -464,6 +508,7 @@ def _parse_col_def_filter(
col_data = self._parse_auto_id(
data=data,
table=table,
table_mask=table_mask,
ref_table=sub_def.get("table"),
ref_name=sub_def.get("name"),
key_col_def=sub_def["key"],
Expand All @@ -481,15 +526,20 @@ def _parse_col_def_filter(
return self._parse_reference(
data=data,
table=table,
table_mask=table_mask,
other_table=sub_def["other_table"],
query_column=sub_def["query_column"],
key_column=sub_def["key_column"],
value_column=sub_def["value_column"],
)
elif isinstance(sub_def, list):
col_data = self._parse_pandas_function(data=data, table=table, fn_name=name, col_def=sub_def)
col_data = self._parse_pandas_function(
data=data, table=table, table_mask=table_mask, fn_name=name, col_def=sub_def
)
elif isinstance(sub_def, dict):
col_data = self._parse_function(data=data, table=table, function=name, col_def=sub_def)
col_data = self._parse_function(
data=data, table=table, table_mask=table_mask, function=name, col_def=sub_def
)
else:
raise TypeError(f"Invalid {name} definition: {sub_def}")
data_frames.append(col_data)
Expand All @@ -499,6 +549,7 @@ def _parse_auto_id(
self,
data: TabularData,
table: str,
table_mask: np.ndarray,
ref_table: Optional[str],
ref_name: Optional[str],
key_col_def: Union[str, List[str], Dict[str, str]],
Expand Down Expand Up @@ -535,7 +586,9 @@ def _parse_auto_id(
else:
raise TypeError(f"Invalid key definition type '{type(key_col_def).__name__}': {key_col_def}")

col_data = self._parse_col_def(data=data, table=table, col_def=key_col_def, extra_info=None)
col_data = self._parse_col_def(
data=data, table=table, table_mask=table_mask, col_def=key_col_def, extra_info=None
)

def auto_id(row: np.ndarray):
key = dict(zip(key_names, row))
Expand All @@ -558,7 +611,9 @@ def auto_id(row: np.ndarray):

return col_data.apply(auto_id, axis=1, raw=True)

def _parse_pandas_function(self, data: TabularData, table: str, fn_name: str, col_def: List[Any]) -> pd.DataFrame:
def _parse_pandas_function(
self, data: TabularData, table: str, table_mask: np.ndarray, fn_name: str, col_def: List[Any]
) -> pd.DataFrame:
"""Special vectorized functions.

Args:
Expand All @@ -576,7 +631,7 @@ def _parse_pandas_function(self, data: TabularData, table: str, fn_name: str, co
if fn_name == "multiply":
fn_name = "prod"

col_data = self._parse_col_def(data=data, table=table, col_def=col_def, extra_info=None)
col_data = self._parse_col_def(data=data, table=table, table_mask=table_mask, col_def=col_def, extra_info=None)

try:
fn_ptr = getattr(col_data, fn_name)
Expand All @@ -599,7 +654,9 @@ def _parse_pandas_function(self, data: TabularData, table: str, fn_name: str, co

return pd.DataFrame(fn_ptr(axis=1))

def _parse_function(self, data: TabularData, table: str, function: str, col_def: Dict[str, Any]) -> pd.DataFrame:
def _parse_function(
self, data: TabularData, table: str, table_mask: np.ndarray, function: str, col_def: Dict[str, Any]
) -> pd.DataFrame:
"""Import the function by name and apply it to each row.

Args:
Expand All @@ -616,15 +673,17 @@ def _parse_function(self, data: TabularData, table: str, function: str, col_def:
fn_ptr = get_function(function)
key_words = list(col_def.keys())
sub_def = list(col_def.values())
col_data = self._parse_col_def(data=data, table=table, col_def=sub_def, extra_info=None)
col_data = self._parse_col_def(data=data, table=table, table_mask=table_mask, col_def=sub_def, extra_info=None)

if col_data.empty:
raise ValueError(f"Cannot apply function {function} to an empty DataFrame")

col_data = col_data.apply(lambda row, fn=fn_ptr: fn(**dict(zip(key_words, row))), axis=1, raw=True)
return pd.DataFrame(col_data)

def _parse_col_def_composite(self, data: TabularData, table: str, col_def: list) -> pd.DataFrame:
def _parse_col_def_composite(
self, data: TabularData, table: str, table_mask: np.ndarray, col_def: list
) -> pd.DataFrame:
"""Select multiple columns (each is created from a column definition) and return them as a new DataFrame.

Args:
Expand All @@ -636,7 +695,10 @@ def _parse_col_def_composite(self, data: TabularData, table: str, col_def: list)

"""
assert isinstance(col_def, list)
columns = [self._parse_col_def(data=data, table=table, col_def=sub_def, extra_info=None) for sub_def in col_def]
columns = [
self._parse_col_def(data=data, table=table, table_mask=table_mask, col_def=sub_def, extra_info=None)
for sub_def in col_def
]
return pd.concat(columns, axis=1)

def _get_id(self, table: str, key: Mapping[str, int], name: Optional[str]) -> int:
Expand Down
Binary file modified tests/data/vision/vision_en.xlsx
Binary file not shown.
Binary file modified tests/data/vision/vision_nl.xlsx
Binary file not shown.
Loading
Loading