Skip to content

Commit

Permalink
New datatype and loading for tracking data
Browse files Browse the repository at this point in the history
* test

* added new class for full simulation data

* deleted random file

* docstring, new constant to be used for tests

* adapt compare_tbt to be able to check all fields if tracking simulation data is specified

* add test to read trackone file fully (all fields) as SimulationData in the TbtData matrices

* up minor version

* update type hint of matrices in TbtData

* field instead of plane

* make fieldnames classmethod and return a list of the fields

* with docstrings

* change full_sim_data for is_tracking_data

* group numpy_to_tbt and numpy_to_sim_tbt into one function, with argument to be provided

* rename to is_tracking_data here too

* remove SIMDATA_FIELDS and use the fieldnames classmethod instead

* generate_average_tbtdata now goes through fields automatically

* differentiate between index of bunch and index of field

* declare DataType in structures, import and use it for type hints

* TrackingData instead of SimulationData

---------

Co-authored-by: Felix Soubelet <felix.soubelet@protonmail.com>
  • Loading branch information
fscarlier and fsoubelet authored Jun 5, 2023
1 parent cedbe25 commit 0cdccf9
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 40 deletions.
13 changes: 7 additions & 6 deletions tests/test_lhc_and_general.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import numpy as np
import pytest

from turn_by_turn.constants import PLANES, PRINT_PRECISION
from turn_by_turn.constants import PRINT_PRECISION
from turn_by_turn.errors import DataTypeError
from turn_by_turn.io import read_tbt, write_lhc_ascii, write_tbt
from turn_by_turn.structures import TbtData
Expand Down Expand Up @@ -56,15 +56,16 @@ def test_tbt_write_read_ascii(_sdds_file, _test_file):
# ----- Helpers ----- #


def compare_tbt(origin: TbtData, new: TbtData, no_binary: bool, max_deviation=ASCII_PRECISION) -> None:
def compare_tbt(origin: TbtData, new: TbtData, no_binary: bool, max_deviation = ASCII_PRECISION, is_tracking_data: bool = False) -> None:
assert new.nturns == origin.nturns
assert new.nbunches == origin.nbunches
assert new.bunch_ids == origin.bunch_ids
for index in range(origin.nbunches):
for plane in PLANES:
assert np.all(new.matrices[index][plane].index == origin.matrices[index][plane].index)
origin_mat = origin.matrices[index][plane].to_numpy()
new_mat = new.matrices[index][plane].to_numpy()
# The matrices are either TransverseData or TrackingData, and we can get all their fields from the `fieldnames` classmethod
for field in origin.matrices[0].fieldnames():
assert np.all(new.matrices[index][field].index == origin.matrices[index][field].index)
origin_mat = origin.matrices[index][field].to_numpy()
new_mat = new.matrices[index][field].to_numpy()
if no_binary:
assert np.nanmax(np.abs(origin_mat - new_mat)) < max_deviation
else:
Expand Down
48 changes: 44 additions & 4 deletions tests/test_ptc_trackone.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
import pandas as pd
import pytest

from tests.test_lhc_and_general import compare_tbt, INPUTS_DIR
from tests.test_lhc_and_general import ASCII_PRECISION, INPUTS_DIR, compare_tbt
from turn_by_turn import ptc, trackone
from turn_by_turn.errors import PTCFormatError
from turn_by_turn.structures import TbtData, TransverseData
from turn_by_turn.structures import TbtData, TrackingData, TransverseData


def test_read_ptc(_ptc_file):
Expand Down Expand Up @@ -63,14 +63,23 @@ def test_read_trackone_looseparticles(_ptc_file_losses):
assert not new.matrices[0].X.isna().any().any()


def test_read_trackone_simdata(_ptc_file):
    """Reading with is_tracking_data=True loads every field (PX, PY, T, PT, S, E included)."""
    expected = _original_simulation_data()
    loaded = trackone.read_tbt(_ptc_file, is_tracking_data=True)
    compare_tbt(expected, loaded, True, is_tracking_data=True)


# ----- Helpers ----- #


def _original_trackone(track: bool = False) -> TbtData:
names = np.array(["C1.BPM1"])
matrix = [
TransverseData(
TransverseData( # first "bunch"
X=pd.DataFrame(index=names, data=[[0.001, -0.0003606, -0.00165823, -0.00266631]]),
Y=pd.DataFrame(index=names, data=[[0.001, 0.00070558, -0.00020681, -0.00093807]]),
),
TransverseData(
TransverseData( # second "bunch"
X=pd.DataFrame(index=names, data=[[0.0011, -0.00039666, -0.00182406, -0.00293294]]),
Y=pd.DataFrame(index=names, data=[[0.0011, 0.00077614, -0.00022749, -0.00103188]]),
),
Expand All @@ -79,6 +88,37 @@ def _original_trackone(track: bool = False) -> TbtData:
return origin


def _original_simulation_data() -> TbtData:
    """Expected contents of the trackone test file when read as full tracking simulation data."""
    bpm_names = np.array(["C1.BPM1"])
    per_bunch_values = [
        {  # first "bunch"
            "X": [0.001, -0.000361, -0.001658, -0.002666],
            "PX": [0.0, -0.000202, -0.000368, -0.00047],
            "Y": [0.001, 0.000706, -0.000207, -0.000938],
            "PY": [0.0, -0.000349, -0.000392, -0.000092],
            "T": [0.0, -0.000008, -0.000015, -0.000023],
            "PT": [0, 0, 0, 0],
            "S": [0, 0, 0, 0],
            "E": [500.00088, 500.00088, 500.00088, 500.00088],
        },
        {  # second "bunch"
            "X": [0.0011, -0.000397, -0.001824, -0.002933],
            "PX": [0.0, -0.000222, -0.000405, -0.000517],
            "Y": [0.0011, 0.000776, -0.000227, -0.001032],
            "PY": [0.0, -0.000384, -0.000431, -0.000101],
            "T": [-0.0, -0.000009, -0.000018, -0.000028],
            "PT": [0, 0, 0, 0],
            "S": [0, 0, 0, 0],
            "E": [500.00088, 500.00088, 500.00088, 500.00088],
        },
    ]
    matrices = [
        TrackingData(
            **{field: pd.DataFrame(index=bpm_names, data=[turns]) for field, turns in bunch.items()}
        )
        for bunch in per_bunch_values
    ]
    # bunch_ids are [0, 1] because this data comes from tracking
    return TbtData(matrices, date=None, bunch_ids=[0, 1], nturns=4)


# ----- Fixtures ----- #


@pytest.fixture()
def _ptc_file_no_date() -> Path:
    """Path to the trackone test file whose header carries no date."""
    return INPUTS_DIR.joinpath("test_trackone_no_date")
Expand Down
2 changes: 1 addition & 1 deletion turn_by_turn/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
__title__ = "turn_by_turn"
__description__ = "Read and write turn-by-turn measurement files from different particle accelerator formats."
__url__ = "https://github.com/pylhc/turn_by_turn"
__version__ = "0.4.2"
__version__ = "0.5.0"
__author__ = "pylhc"
__author_email__ = "pylhc@github.com"
__license__ = "MIT"
Expand Down
39 changes: 35 additions & 4 deletions turn_by_turn/structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"""
from dataclasses import dataclass, field, fields
from datetime import datetime
from typing import Dict, List, Sequence
from typing import List, Sequence, Union

import pandas as pd
from dateutil import tz
Expand All @@ -21,23 +21,54 @@ class TransverseData:
X: pd.DataFrame # horizontal data
Y: pd.DataFrame # vertical data

def fieldnames(self):
return (f.name for f in fields(self))
@classmethod
def fieldnames(cls) -> List[str]:
    """Return the names of this dataclass' fields, in declaration order."""
    # `cls` (not `self`) is the conventional first parameter of a classmethod;
    # a list comprehension is preferred over list() around a generator expression.
    return [f.name for f in fields(cls)]

def __getitem__(self, item):  # dictionary-style access to the X and Y fields
    if item in self.fieldnames():
        return getattr(self, item)
    raise KeyError(f"'{item}' is not in the fields of a {self.__class__.__name__} object.")


@dataclass
class TrackingData:
    """
    Object holding multidimensional turn-by-turn simulation data in the form of pandas DataFrames.
    """

    X: pd.DataFrame  # horizontal data
    PX: pd.DataFrame  # horizontal momentum data
    Y: pd.DataFrame  # vertical data
    PY: pd.DataFrame  # vertical momentum data
    T: pd.DataFrame  # longitudinal data
    PT: pd.DataFrame  # longitudinal momentum data
    S: pd.DataFrame  # longitudinal position data
    E: pd.DataFrame  # energy data

    @classmethod
    def fieldnames(cls) -> List[str]:
        """Return a list of the fields of this dataclass."""
        # `cls` (not `self`) is the conventional classmethod first parameter;
        # a list comprehension avoids the redundant list() around a generator.
        return [f.name for f in fields(cls)]

    def __getitem__(self, item):  # to access fields like one would with a dictionary
        if item not in self.fieldnames():
            raise KeyError(f"'{item}' is not in the fields of a {self.__class__.__name__} object.")
        return getattr(self, item)


# Union of the per-bunch matrix container classes, used for type hints throughout the package
DataType = Union[TransverseData, TrackingData]


@dataclass
class TbtData:
"""
Object holding a representation of a Turn-by-Turn data measurement. The date of the measurement,
the transverse data, number of turns and bunches as well as the bunch IDs are encapsulated in this object.
"""

matrices: Sequence[TransverseData] # each entry corresponds to a bunch
matrices: Sequence[DataType] # each entry corresponds to a bunch
date: datetime = None # will default in post_init
bunch_ids: List[int] = None # will default in post_init
nturns: int = None
Expand Down
16 changes: 12 additions & 4 deletions turn_by_turn/trackone.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,34 @@

import numpy as np

from turn_by_turn.structures import TbtData
from turn_by_turn.structures import TbtData, TrackingData, TransverseData
from turn_by_turn.utils import numpy_to_tbt

LOGGER = logging.getLogger()


def read_tbt(file_path: Union[str, Path]) -> TbtData:
def read_tbt(file_path: Union[str, Path], is_tracking_data: bool = False) -> TbtData:
    """
    Load turn-by-turn data from a ``MAD-X`` **trackone** format file.

    Args:
        file_path (Union[str, Path]): path to the turn-by-turn measurement file.
        is_tracking_data (bool): when ``True`` the file is treated as a full
            tracking simulation output and every field (``X``, ``PX``, ``Y``,
            ``PY``, ``T``, ``PT``, ``S``, ``E``) is read into ``TrackingData``
            objects. Defaults to ``False``, which loads only the transverse data.

    Returns:
        A ``TbtData`` object with the loaded data.
    """
    n_turns, n_particles = get_trackone_stats(file_path)
    bpm_names, data_matrix = get_structure_from_trackone(n_turns, n_particles, file_path)
    if is_tracking_data:  # full simulation output: keep every field
        return numpy_to_tbt(bpm_names, data_matrix, datatype=TrackingData)
    # only rows 0 and 2 of the matrix hold the (x, y) samples
    return numpy_to_tbt(bpm_names, data_matrix[[0, 2]], datatype=TransverseData)


def get_trackone_stats(file_path: Union[str, Path], write_out: bool = False) -> Tuple[int, int]:
Expand Down
46 changes: 25 additions & 21 deletions turn_by_turn/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@
Utility functions for convenience operations on turn-by-turn data objects in this package.
"""
import logging

from typing import Dict, Sequence
from typing import Sequence, Union

import numpy as np
import pandas as pd

from turn_by_turn.constants import PLANES, PLANE_TO_NUM
from turn_by_turn.constants import PLANE_TO_NUM, PLANES
from turn_by_turn.errors import ExclusiveArgumentsError
from turn_by_turn.structures import TbtData, TransverseData
from turn_by_turn.structures import DataType, TbtData, TransverseData

LOGGER = logging.getLogger(__name__)

Expand All @@ -31,19 +30,18 @@ def generate_average_tbtdata(tbtdata: TbtData) -> TbtData:
"""
data = tbtdata.matrices
bpm_names = data[0].X.index
datatype = tbtdata.matrices[0].__class__

new_matrices = [
TransverseData(
X=pd.DataFrame(
index=bpm_names,
data=get_averaged_data(bpm_names, data, "X", tbtdata.nturns),
dtype=float,
),
Y=pd.DataFrame(
index=bpm_names,
data=get_averaged_data(bpm_names, data, "Y", tbtdata.nturns),
dtype=float,
),
datatype( # datatype is directly the class to load data into
**{ # for each field in the datatype, load the corresponding matrix
field: pd.DataFrame(
index=bpm_names,
data=get_averaged_data(bpm_names, data, field, tbtdata.nturns),
dtype=float,
)
for field in datatype.fieldnames()
}
)
]
return TbtData(new_matrices, tbtdata.date, [1], tbtdata.nturns)
Expand Down Expand Up @@ -151,14 +149,18 @@ def add_noise_to_tbt(data: TbtData, noise: float = None, sigma: float = None, se
)


def numpy_to_tbt(names: np.ndarray, matrix: np.ndarray) -> TbtData:
def numpy_to_tbt(names: np.ndarray, matrix: np.ndarray, datatype: DataType = TransverseData) -> TbtData:
"""
Converts turn by turn matrices and names into a ``TbTData`` object.
Args:
names (np.ndarray): Numpy array of BPM names.
matrix (np.ndarray): 4D Numpy array [quantity, BPM, particle/bunch No., turn No.]
quantities in order [x, y].
datatype (DataType): The type of data to be converted to in the matrices. Either
``TransverseData`` (which implies reading ``X`` and ``Y`` fields) or
``TrackingData`` (which implies reading all 8 fields). Defaults to
``TransverseData``.
Returns:
A ``TbtData`` object loaded with the matrices in the provided numpy arrays.
Expand All @@ -167,12 +169,14 @@ def numpy_to_tbt(names: np.ndarray, matrix: np.ndarray) -> TbtData:
_, _, nbunches, nturns = matrix.shape
matrices = []
indices = []
for index in range(nbunches):
for idx_bunch in range(nbunches):
matrices.append(
TransverseData(
X=pd.DataFrame(index=names, data=matrix[0, :, index, :]),
Y=pd.DataFrame(index=names, data=matrix[1, :, index, :]),
datatype( # datatype is directly the class to load data into (TransverseData or TrackingData)
**{ # for each field in the datatype, load the corresponding matrix
field: pd.DataFrame(index=names, data=matrix[idx_field, :, idx_bunch, :])
for idx_field, field in enumerate(datatype.fieldnames())
}
)
)
indices.append(index)
indices.append(idx_bunch)
return TbtData(matrices=matrices, bunch_ids=indices, nturns=nturns)

0 comments on commit 0cdccf9

Please sign in to comment.