Skip to content

Commit

Permalink
included writing of traj formats for vel, forces and charges
Browse files Browse the repository at this point in the history
  • Loading branch information
97gamjak committed Nov 12, 2023
1 parent 11f82f2 commit f36881e
Show file tree
Hide file tree
Showing 2 changed files with 215 additions and 24 deletions.
151 changes: 136 additions & 15 deletions PQAnalysis/io/trajectoryWriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@
from beartype.typing import List

from .base import BaseWriter
from ..traj.trajectory import Trajectory
from ..traj.trajectory import Trajectory, TrajectoryFormat
from ..traj.frame import Frame
from ..core.cell import Cell
from ..core.atom import Atom
from ..types import Numpy2DFloatArray
from ..types import Numpy2DFloatArray, Numpy1DFloatArray


def write_trajectory(traj, filename: str | None = None, format: str | None = None) -> None:
def write_trajectory(traj, filename: str | None = None, format: str | None = None, type: TrajectoryFormat | str = TrajectoryFormat.XYZ) -> None:
"""
Wrapper for TrajectoryWriter to write a trajectory to a file.
Expand All @@ -33,10 +34,15 @@ def write_trajectory(traj, filename: str | None = None, format: str | None = Non
The trajectory to write.
filename : str, optional
The name of the file to write to. If None, the output is printed to stdout.
format : str, optional
The format of the file. If None, the default PIMD-QMCF format is used.
type : TrajectoryFormat | str, optional
The type of the data to write to the file. Default is TrajectoryFormat.XYZ.
"""

writer = TrajectoryWriter(filename, format)
writer.write(traj)
writer = TrajectoryWriter(filename, format=format)
writer.write(traj, type=type)


class TrajectoryWriter(BaseWriter):
Expand Down Expand Up @@ -64,13 +70,17 @@ class TrajectoryWriter(BaseWriter):
X 0.0 0.0 0.0
coordinates of the atoms in the format 'element x y z'
_type : TrajectoryFormat
The type of the data to write to the file. Default is TrajectoryFormat.XYZ.
Attributes
----------
format : str
The format of the file.
"""

formats = [None, 'pimd-qmcf', 'qmcfc']
_type: TrajectoryFormat = TrajectoryFormat.XYZ

def __init__(self,
filename: str | None = None,
Expand Down Expand Up @@ -101,7 +111,7 @@ def __init__(self,

self.format = format

def write(self, trajectory: Trajectory) -> None:
def write(self, trajectory: Trajectory, type: TrajectoryFormat | str = TrajectoryFormat.XYZ) -> None:
"""
Writes the trajectory to the file.
Expand All @@ -110,13 +120,91 @@ def write(self, trajectory: Trajectory) -> None:
traj : Trajectory
The trajectory to write.
"""
self._type = TrajectoryFormat(type)
if self._type == TrajectoryFormat.XYZ:
self.write_positions(trajectory)
elif self._type == TrajectoryFormat.VEL:
self.write_velocities(trajectory)
elif self._type == TrajectoryFormat.FORCE:
self.write_forces(trajectory)
elif self._type == TrajectoryFormat.CHARGE:
self.write_charges(trajectory)

self.close()

def write_positions(self, trajectory: Trajectory) -> None:
"""
Writes the positions of the trajectory to the file.
Parameters
----------
traj : Trajectory
The trajectory to write.
"""
self._type = TrajectoryFormat.XYZ
self.open()
for frame in trajectory:
self.__write_header__(frame.n_atoms, frame.cell)
self.__write_coordinates__(frame.pos, frame.atoms)
self._write_header(frame.n_atoms, frame.cell)
self._write_comment(frame)
self._write_xyz(frame.pos, frame.atoms)

self.close()

def __write_header__(self, n_atoms: int, cell: Cell | None = None) -> None:
def write_velocities(self, trajectory: Trajectory) -> None:
"""
Writes the velocities of the trajectory to the file.
Parameters
----------
traj : Trajectory
The trajectory to write.
"""
self._type = TrajectoryFormat.VEL
self.open()
for frame in trajectory:
self._write_header(frame.n_atoms, frame.cell)
self._write_comment(frame)
self._write_xyz(frame.vel, frame.atoms)

self.close()

def write_forces(self, trajectory: Trajectory) -> None:
"""
Writes the forces of the trajectory to the file.
Parameters
----------
traj : Trajectory
The trajectory to write.
"""
self._type = TrajectoryFormat.FORCE
self.open()
for frame in trajectory:
self._write_header(frame.n_atoms, frame.cell)
self._write_comment(frame)
self._write_xyz(frame.forces, frame.atoms)

self.close()

def write_charges(self, trajectory: Trajectory) -> None:
"""
Writes the charges of the trajectory to the file.
Parameters
----------
traj : Trajectory
The trajectory to write.
"""
self._type = TrajectoryFormat.CHARGE
self.open()
for frame in trajectory:
self._write_header(frame.n_atoms, frame.cell)
self._write_comment(frame)
self._write_scalar(frame.charges, frame.atoms)

self.close()

def _write_header(self, n_atoms: int, cell: Cell | None = None) -> None:
"""
Writes the header line of the frame to the file.
Expand All @@ -130,27 +218,60 @@ def __write_header__(self, n_atoms: int, cell: Cell | None = None) -> None:

if cell is not None:
print(
f"{n_atoms} {cell.x} {cell.y} {cell.z} {cell.alpha} {cell.beta} {cell.gamma}\n", file=self.file)
f"{n_atoms} {cell.x} {cell.y} {cell.z} {cell.alpha} {cell.beta} {cell.gamma}", file=self.file)
else:
print(f"{n_atoms}", file=self.file)

def _write_comment(self, frame: Frame) -> None:
"""
Writes the comment line of the frame to the file.
Parameters
----------
frame : Frame
The frame to write the comment line of.
"""

if self._type == TrajectoryFormat.FORCE:
sum_forces = sum(frame.forces)
print(
f"sum of forces: {sum_forces[0]} {sum_forces[1]} {sum_forces[2]}", file=self.file)
else:
print(f"{n_atoms}\n", file=self.file)
print("", file=self.file)

def __write_coordinates__(self, xyz: Numpy2DFloatArray, atoms: List[Atom]) -> None:
def _write_xyz(self, xyz: Numpy2DFloatArray, atoms: List[Atom]) -> None:
"""
Writes the coordinates of the frame to the file.
Writes the xyz of the frame to the file.
If format is 'qmcfc', an additional X 0.0 0.0 0.0 line is written.
Parameters
----------
xyz : np.array
The xyz coordinates of the atoms.
The xyz data of the atoms (either positions, velocities or forces).
atoms : Elements
The elements of the frame.
"""

if self.format == "qmcfc":
if self.format == "qmcfc" and self._type == TrajectoryFormat.XYZ:
print("X 0.0 0.0 0.0", file=self.file)

for i in range(len(atoms)):
print(
f"{atoms[i].name} {xyz[i][0]} {xyz[i][1]} {xyz[i][2]}", file=self.file)

def _write_scalar(self, scalar: Numpy1DFloatArray, atoms: List[Atom]) -> None:
"""
Writes the charges of the frame to the file.
Parameters
----------
scalar : np.array
scalar data of the atoms (atm only charges).
atoms : Elements
The elements of the frame.
"""

for i in range(len(atoms)):
print(
f"{atoms[i].name} {scalar[i]}", file=self.file)
88 changes: 79 additions & 9 deletions tests/io/test_trajectoryWriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@

from PQAnalysis.io.trajectoryWriter import TrajectoryWriter, write_trajectory
from PQAnalysis.traj.frame import Frame
from PQAnalysis.traj.trajectory import Trajectory
from PQAnalysis.traj.trajectory import Trajectory, TrajectoryFormat
from PQAnalysis.core.cell import Cell
from PQAnalysis.core.atomicSystem import AtomicSystem
from PQAnalysis.core.atom import Atom

# TODO: here only one option is tested - think of a better way to test all options


def test_write_trajectory(capsys):
atoms = [Atom(atom) for atom in ['h', 'o']]
Expand Down Expand Up @@ -46,34 +48,60 @@ def test__init__(self):
writer = TrajectoryWriter(format="pimd-qmcf")
assert writer.format == "pimd-qmcf"

def test__write_header__(self, capsys):
def test__write_header(self, capsys):

writer = TrajectoryWriter()
writer.__write_header__(1, Cell(10, 10, 10))
writer._write_header(1, Cell(10, 10, 10))

captured = capsys.readouterr()
assert captured.out == "1 10 10 10 90 90 90\n\n"
assert captured.out == "1 10 10 10 90 90 90\n"

writer.__write_header__(1)
writer._write_header(1)
captured = capsys.readouterr()
assert captured.out == "1\n\n"
assert captured.out == "1\n"

def test__write_coordinates__(self, capsys):
def test__write_comment(self, capsys):

writer = TrajectoryWriter()
writer.__write_coordinates__(
writer._write_comment(Frame(AtomicSystem(
atoms=[Atom(atom) for atom in ["h", "o"]], cell=Cell(10, 10, 10))))

captured = capsys.readouterr()
assert captured.out == "\n"

forces = np.array([[1, 0, 3], [0, 2, 1]])
writer._type = TrajectoryFormat.FORCE
writer._write_comment(Frame(AtomicSystem(
atoms=[Atom(atom) for atom in ["h", "o"]], cell=Cell(10, 10, 10), forces=forces)))

captured = capsys.readouterr()
assert captured.out == "sum of forces: 1 2 4\n"

def test__write_xyz(self, capsys):

writer = TrajectoryWriter()
writer._write_xyz(
atoms=[Atom(atom) for atom in ["h", "o"]], xyz=np.array([[0, 0, 0], [0, 0, 1]]))

captured = capsys.readouterr()
assert captured.out == "h 0 0 0\no 0 0 1\n"

writer.format = "qmcfc"
writer.__write_coordinates__(
writer._write_xyz(
atoms=[Atom(atom) for atom in ["h", "o"]], xyz=np.array([[0, 0, 0], [0, 0, 1]]))

captured = capsys.readouterr()
assert captured.out == "X 0.0 0.0 0.0\nh 0 0 0\no 0 0 1\n"

def test__write_scalar(self, capsys):

writer = TrajectoryWriter()
writer._write_scalar(
atoms=[Atom(atom) for atom in ["h", "o"]], scalar=np.array([1, 2]))

captured = capsys.readouterr()
assert captured.out == "h 1\no 2\n"

def test_write(self, capsys):

atoms = [Atom(atom) for atom in ['h', 'o']]
Expand All @@ -92,3 +120,45 @@ def test_write(self, capsys):

captured = capsys.readouterr()
assert captured.out == "2 10 10 10 90 90 90\n\nh 0 0 0\no 0 0 1\n2 11 10 10 90 90 90\n\nh 0 0 0\no 0 0 1\n"

frame1 = Frame(AtomicSystem(
atoms=atoms, vel=coordinates1, cell=Cell(10, 10, 10)))
frame2 = Frame(AtomicSystem(
atoms=atoms, vel=coordinates2, cell=Cell(11, 10, 10)))

traj = Trajectory([frame1, frame2])
writer = TrajectoryWriter()

writer.write(traj, type="vel")

captured = capsys.readouterr()
assert captured.out == "2 10 10 10 90 90 90\n\nh 0 0 0\no 0 0 1\n2 11 10 10 90 90 90\n\nh 0 0 0\no 0 0 1\n"

frame1 = Frame(AtomicSystem(
atoms=atoms, forces=coordinates1, cell=Cell(10, 10, 10)))
frame2 = Frame(AtomicSystem(
atoms=atoms, forces=coordinates2, cell=Cell(11, 10, 10)))

traj = Trajectory([frame1, frame2])
writer = TrajectoryWriter()

writer.write(traj, type="force")

captured = capsys.readouterr()
assert captured.out == "2 10 10 10 90 90 90\nsum of forces: 0 0 1\nh 0 0 0\no 0 0 1\n2 11 10 10 90 90 90\nsum of forces: 0 0 1\nh 0 0 0\no 0 0 1\n"

charges1 = np.array([1, 2])
charges2 = np.array([3, 4])

frame1 = Frame(AtomicSystem(
atoms=atoms, charges=charges1, cell=Cell(10, 10, 10)))
frame2 = Frame(AtomicSystem(
atoms=atoms, charges=charges2, cell=Cell(11, 10, 10)))

traj = Trajectory([frame1, frame2])
writer = TrajectoryWriter()

writer.write(traj, type="charge")

captured = capsys.readouterr()
assert captured.out == "2 10 10 10 90 90 90\n\nh 1\no 2\n2 11 10 10 90 90 90\n\nh 3\no 4\n"

0 comments on commit f36881e

Please sign in to comment.