diff --git a/CHANGELOG.md b/CHANGELOG.md index 5041f92..e7ce56a 100755 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,22 +16,21 @@ The rules for this file: * accompany each entry with github issue/PR number (Issue #xyz) --> -## [Unreleased] +## [0.1.0] ### Authors - +- ljwoods2 ### Added - +- implemented v0.1.0 zarrtraj spec ### Fixed - + ### Changed - + ### Deprecated - -### Removed - + + diff --git a/zarrtraj/ZARRTRAJ.py b/zarrtraj/ZARRTRAJ.py index b132012..975e7c7 100755 --- a/zarrtraj/ZARRTRAJ.py +++ b/zarrtraj/ZARRTRAJ.py @@ -123,6 +123,7 @@ class MockZarrFile: pass + zarr = types.ModuleType("zarr") zarr.File = MockZarrFile @@ -134,7 +135,7 @@ class MockZarrFile: #: Default frames per data chunk is 10 ZARRTRAJ_DEFAULT_FPC = 10 #: Currently implemented version of the file format -ZARRTRAJ_VERSION = '0.1.0' +ZARRTRAJ_VERSION = "0.1.0" class ZarrTrajBoundaryConditions(Enum): @@ -148,16 +149,15 @@ class ZarrTrajReader(base.ReaderBase): For more information on the format, see the :ref:`zarrtraj_spec`. """ - format = 'ZARRTRAJ' + format = "ZARRTRAJ" @store_init_arguments - def __init__(self, filename, - **kwargs): + def __init__(self, filename, **kwargs): """ Parameters ---------- filename : :class:`zarr.Group` - Open, readable zarrtraj file + open, readable zarrtraj file convert_units : bool (optional) convert units to MDAnalysis units **kwargs : dict @@ -166,7 +166,7 @@ def __init__(self, filename, Raises ------ RuntimeError - when `zarr` is not installed + when ``zarr`` is not installed PermissionError when the Zarr group is not readable RuntimeError @@ -181,39 +181,54 @@ def __init__(self, filename, 'force' group RuntimeError when the Zarrtraj file version is incompatibile with the reader + ValueError + when an observables dataset is not sampled at the same rate as + the position, velocity, and force datasets """ if not HAS_ZARR: - raise RuntimeError("Please install zarr") + raise RuntimeError("ZarrTrajReader: Please install zarr") super(ZarrTrajReader, self).__init__(filename, **kwargs) # ReaderBase calls self.filename = str(filename), which we want to undo self.filename = filename if not self.filename: - raise PermissionError("The Zarr group is not readable") + raise PermissionError( + "ZarrTrajReader: The Zarr group is not readable" + ) if self.filename.attrs["version"] != ZARRTRAJ_VERSION: - raise RuntimeError("Zarrtraj file version " + - f"{self.filename.attrs['version']} " + - "is not compatible with reader version " + - f"{ZARRTRAJ_VERSION}") + raise RuntimeError( + "ZarrTrajReader: Zarrtraj file version " + + f"{self.filename.attrs['version']} " + + "is not compatible with reader version " + + f"{ZARRTRAJ_VERSION}" + ) self._frame = -1 self._file = self.filename - self._particle_group = self._file['particles'] - self._step_array = self._particle_group['step'] - self._time_array = self._particle_group['time'] + self._particle_group = self._file["particles"] + self._step_array = self._particle_group["step"] + self._time_array = self._particle_group["time"] # IO CALL - self._boundary = (ZarrTrajBoundaryConditions.ZARRTRAJ_NONE if - self._particle_group["box"].attrs[ - "boundary"] == "none" - else ZarrTrajBoundaryConditions.ZARRTRAJ_PERIODIC) - - if (self._boundary == ZarrTrajBoundaryConditions.ZARRTRAJ_PERIODIC and - 'dimensions' not in self._particle_group['box']): - raise NoDataError("Triclinic box dimension array must " + - "be provided if `boundary` is set to 'periodic'") + self._boundary = ( + ZarrTrajBoundaryConditions.ZARRTRAJ_NONE + if self._particle_group["box"].attrs["boundary"] == "none" + else ZarrTrajBoundaryConditions.ZARRTRAJ_PERIODIC + ) + + if ( + self._boundary == ZarrTrajBoundaryConditions.ZARRTRAJ_PERIODIC + and "dimensions" not in self._particle_group["box"] + ): + raise NoDataError( + "ZarrTrajReader: Triclinic box dimension array must " + + "be provided if `boundary` is set to 'periodic'" + ) # IO CALL - self._has = set(name for name in self._particle_group if - name in ('positions', 'velocities', 'forces')) + self._has = set( + name + for name in self._particle_group + if name in ("positions", "velocities", "forces") + ) # Get n_atoms + numcodecs compressor & filter objects from # first available dataset @@ -225,42 +240,62 @@ def __init__(self, filename, # NOTE: add filters break else: - raise NoDataError("Provide at least a position, velocity " + - "or force array in the ZarrTraj file.") + raise NoDataError( + "ZarrTrajReader: Provide at least a position, velocity " + + "or force array in the ZarrTraj file." + ) for name in self._has: self._n_frames = self._particle_group[name].shape[0] break - self.ts = self._Timestep(self.n_atoms, - positions=self.has_positions, - velocities=self.has_velocities, - forces=self.has_forces, - **self._ts_kwargs) + self.ts = self._Timestep( + self.n_atoms, + positions=self.has_positions, + velocities=self.has_velocities, + forces=self.has_forces, + **self._ts_kwargs, + ) self._verify_correct_units() - self.units = {'time': 'ps', - 'length': 'nm', - 'velocity': 'nm/ps', - 'force': 'kJ/(mol*nm)'} + self.units = { + "time": "ps", + "length": "nm", + "velocity": "nm/ps", + "force": "kJ/(mol*nm)", + } self._read_next_timestep() def _verify_correct_units(self): - self._unit_group = self._particle_group['units'] - if ('length' not in self._unit_group.attrs or - self._unit_group.attrs['length'] != "nm"): - raise RuntimeError("Zarrtraj file with positions must contain " + - "'nm' length unit") - if ('velocity' not in self._unit_group.attrs or - self._unit_group.attrs['velocity'] != "nm/ps"): - raise RuntimeError("Zarrtraj file must contain " + - "'nm/ps' velocity unit") - if ('force' not in self._unit_group.attrs or - self._unit_group.attrs['force'] != "kJ/(mol*nm)"): - raise RuntimeError("Zarrtraj file with forces must contain " + - "'kJ/(mol*nm)' force unit") - if (self._unit_group.attrs['time'] != "ps"): - raise RuntimeError("Zarrtraj file must contain 'ps' for time unit") + self._unit_group = self._particle_group["units"] + if ( + "length" not in self._unit_group.attrs + or self._unit_group.attrs["length"] != "nm" + ): + raise RuntimeError( + "ZarrTrajReader: Zarrtraj file with positions must contain " + + "'nm' length unit" + ) + if ( + "velocity" not in self._unit_group.attrs + or self._unit_group.attrs["velocity"] != "nm/ps" + ): + raise RuntimeError( + "ZarrTrajReader: Zarrtraj file must contain " + + "'nm/ps' velocity unit" + ) + if ( + "force" not in self._unit_group.attrs + or self._unit_group.attrs["force"] != "kJ/(mol*nm)" + ): + raise RuntimeError( + "ZarrTrajReader: Zarrtraj file with forces must contain " + + "'kJ/(mol*nm)' force unit" + ) + if self._unit_group.attrs["time"] != "ps": + raise RuntimeError( + "ZarrTrajReader: Zarrtraj file must contain 'ps' for time unit" + ) def _read_next_timestep(self): """Read next frame in trajectory""" @@ -276,12 +311,21 @@ def _read_frame(self, frame): particle_group = self._particle_group ts.frame = frame - # NOTE: handle observables - # Fills data dictionary from 'observables' group - # Note: dt is not read into data as it is not decided whether - # Timestep should have a dt attribute (see Issue #2825) self.ts.time = self._time_array[self._frame] - self.ts.data['step'] = self._step_array[self._frame] + self.ts.data["step"] = self._step_array[self._frame] + + # Handle observables + if "observables" in particle_group: + try: + for key in particle_group["observables"].keys(): + self.ts.data[key] = self._particle_group["observables"][ + key + ][frame] + except IndexError: + raise ValueError( + "ZarrTrajReader: Observables data must be sampled at the same " + + "rate as the position, velocity, and force data." + ) # Sets frame box dimensions if self._boundary == ZarrTrajBoundaryConditions.ZARRTRAJ_PERIODIC: @@ -293,11 +337,11 @@ def _read_frame(self, frame): # set the timestep positions, velocities, and forces with # current frame dataset if self.has_positions: - self._read_dataset_into_ts('positions', ts.positions) + self._read_dataset_into_ts("positions", ts.positions) if self.has_velocities: - self._read_dataset_into_ts('velocities', ts.velocities) + self._read_dataset_into_ts("velocities", ts.velocities) if self.has_forces: - self._read_dataset_into_ts('forces', ts.forces) + self._read_dataset_into_ts("forces", ts.forces) self._convert_units() @@ -307,17 +351,18 @@ def _read_dataset_into_ts(self, dataset, attribute): """Reads position, velocity, or force dataset array at current frame into corresponding ts attribute""" - n_atoms_now = self._particle_group[dataset][ - self._frame].shape[0] + n_atoms_now = self._particle_group[dataset][self._frame].shape[0] if n_atoms_now != self.n_atoms: - raise ValueError(f"Frame {self._frame} of the {dataset} dataset" - f" has {n_atoms_now} atoms but the initial frame" - " of either the postion, velocity, or force" - f" dataset had {self.n_atoms} atoms." - " MDAnalysis is unable to deal" - " with variable topology!") + raise ValueError( + f"ZarrTrajReader: Frame {self._frame} of the {dataset} dataset" + f" has {n_atoms_now} atoms but the initial frame" + " of either the postion, velocity, or force" + f" dataset had {self.n_atoms} atoms." + " MDAnalysis is unable to deal" + " with variable topology!" + ) - attribute[:] = self._particle_group[dataset][self._frame, :] + attribute[:] = self._particle_group[dataset][self._frame] def _convert_units(self): """Converts position, velocity, and force values to @@ -346,15 +391,17 @@ def _format_hint(thing): @staticmethod def parse_n_atoms(filename, storage_options=None): - for group in filename['particles']: - if group in ('positions', 'velocities', 'forces'): - n_atoms = filename[f'particles/{group}'].shape[1] + for group in filename["particles"]: + if group in ("positions", "velocities", "forces"): + n_atoms = filename[f"particles/{group}"].shape[1] return n_atoms - raise NoDataError("Could not construct minimal topology from the " + - "Zarrtraj trajectory file, as it did not contain " + - "a 'position', 'velocity', or 'force' group. " + - "You must include a topology file.") + raise NoDataError( + "Could not construct minimal topology from the " + + "Zarrtraj trajectory file, as it did not contain " + + "a 'position', 'velocity', or 'force' group. " + + "You must include a topology file." + ) def close(self): """Close reader""" @@ -371,16 +418,15 @@ def n_frames(self): return self._n_frames def Writer(self, filename, n_atoms=None, **kwargs): - """Return writer for trajectory format - """ + """Return writer for trajectory format""" if n_atoms is None: n_atoms = self.n_atoms - kwargs.setdefault('format', "ZARRTRAJ") - kwargs.setdefault('compressor', self.compressor) + kwargs.setdefault("format", "ZARRTRAJ") + kwargs.setdefault("compressor", self.compressor) # NOTE: add filters - kwargs.setdefault('positions', self.has_positions) - kwargs.setdefault('velocities', self.has_velocities) - kwargs.setdefault('forces', self.has_forces) + kwargs.setdefault("positions", self.has_positions) + kwargs.setdefault("velocities", self.has_velocities) + kwargs.setdefault("forces", self.has_forces) return ZarrTrajWriter(filename, n_atoms, **kwargs) @property @@ -400,12 +446,12 @@ def has_forces(self): def __getstate__(self): state = self.__dict__.copy() - del state['_particle_group'] + del state["_particle_group"] return state def __setstate__(self, state): self.__dict__ = state - self._particle_group = self._file['particles'] + self._particle_group = self._file["particles"] self[self.ts.frame] @@ -427,16 +473,16 @@ class ZarrTrajWriter(base.WriterBase): chunks : tuple (optional) Custom chunk layout to be applied to the position, velocity, and force datasets. By default, these datasets - are chunked in ``{10, n_atoms, 3}`` blocks + are chunked in ``(10, n_atoms, 3)`` blocks max_memory : int (optional) Maximum memory buffer size in bytes for writing to a - a cloud-backed Zarr group or when `force_bufferd=True`. + a cloud-backed Zarr group or when ``force_buffered=True``. By default, this is set to 10 MB. compressor : str or int (optional) - `numcodecs` compressor object to be applied + ``numcodecs`` compressor object to be applied to position, velocity, force, and observables datasets. filters : list (optional) - list of `numcodecs` filter objects to be applied to + list of ``numcodecs`` filter objects to be applied to to position, velocity, force, and observables datasets. positions : bool (optional) Write positions into the trajectory [``True``] @@ -485,30 +531,45 @@ class ZarrTrajWriter(base.WriterBase): chunk of the trajectory data ValueError when the step or time dataset does not increase monotonically - """ + ValueError + when an observables dataset is not sampled at the same rate as + positions, velocities, and forces + """ - format = 'ZARRTRAJ' + format = "ZARRTRAJ" multiframe = True #: These variables are not written from :attr:`Timestep.data` #: dictionary to the observables group in the Zarrtraj file - data_blacklist = ['step', 'time', 'dt'] - - def __init__(self, filename, n_atoms, n_frames=None, - chunks=None, - positions=True, velocities=True, - forces=True, compressor=None, - filters=None, max_memory=None, - force_buffered=False, - author_email=None, author='N/A', - creator='MDAnalysis', creator_version=mda.__version__, - **kwargs): + data_blacklist = ["step", "time", "dt"] + + def __init__( + self, + filename, + n_atoms, + n_frames=None, + chunks=None, + positions=True, + velocities=True, + forces=True, + compressor=None, + filters=None, + max_memory=None, + force_buffered=False, + author_email=None, + author="N/A", + creator="MDAnalysis", + creator_version=mda.__version__, + **kwargs, + ): if not HAS_ZARR: raise RuntimeError("ZarrTrajWriter: Please install zarr") # Verify group is open for writing if not filename.store.is_writeable(): - raise PermissionError("The Zarr group is not writeable") + raise PermissionError( + "ZarrTrajWriter: The Zarr group is not writeable" + ) if n_atoms == 0: raise ValueError("ZarrTrajWriter: no atoms in output trajectory") self.filename = filename @@ -519,49 +580,62 @@ def __init__(self, filename, n_atoms, n_frames=None, # Fill in Zarrtraj metadata from kwargs # IO CALL - self._file.attrs['version'] = ZARRTRAJ_VERSION - self._file.require_group('metadata') - self._file['metadata'].attrs['author'] = author + self._file.attrs["version"] = ZARRTRAJ_VERSION + self._file.require_group("metadata") + self._file["metadata"].attrs["author"] = author if author_email is not None: - self._file['metadata'].attrs['author_email'] = author_email - self._file['metadata'].attrs['creator'] = creator - if creator == 'MDAnalysis': - self._file['metadata'].attrs['creator_version'] = creator_version + self._file["metadata"].attrs["author_email"] = author_email + self._file["metadata"].attrs["creator"] = creator + if creator == "MDAnalysis": + self._file["metadata"].attrs["creator_version"] = creator_version self._determine_if_buffered_storage() if self._is_buffered_store: # Ensure n_frames exists if n_frames is None: - raise RuntimeError("ZarrTrajWriter: Buffered writing requires " + - "'n_frames' kwarg") - self.max_memory = (ZARRTRAJ_DEFAULT_BUFSIZE if max_memory is None - else max_memory) + raise RuntimeError( + "ZarrTrajWriter: Buffered writing requires " + + "'n_frames' kwarg" + ) + self.max_memory = ( + ZARRTRAJ_DEFAULT_BUFSIZE if max_memory is None else max_memory + ) - self.chunks = ((ZARRTRAJ_DEFAULT_FPC, self.n_atoms, 3) - if chunks is None else chunks) + self.chunks = ( + (ZARRTRAJ_DEFAULT_FPC, self.n_atoms, 3) + if chunks is None + else chunks + ) self.filters = filters if filters is not None else [] - self.compressor = (compressor if compressor is not None else - zarr.storage.default_compressor) + self.compressor = ( + compressor + if compressor is not None + else zarr.storage.default_compressor + ) # The writer defaults to writing all data from the parent Timestep if # it exists. If these are True, the writer will check each # Timestep.has_* value and fill the self._has dictionary accordingly # in _initialize_hdf5_datasets() self._write = set() if positions: - self._write.add('positions') + self._write.add("positions") if velocities: - self._write.add('velocities') + self._write.add("velocities") if forces: - self._write.add('forces') + self._write.add("forces") - self.units = {'time': 'ps', - 'length': 'nm', - 'velocity': 'nm/ps', - 'force': 'kJ/(mol*nm)'} + self.units = { + "time": "ps", + "length": "nm", + "velocity": "nm/ps", + "force": "kJ/(mol*nm)", + } if not self._write: - raise ValueError("At least one of positions, velocities, or " - "forces must be set to ``True``.") + raise ValueError( + "ZarrTrajWriter: At least one of positions, velocities, or " + "forces must be set to ``True``." + ) self._initial_write = True @@ -569,22 +643,26 @@ def _determine_if_buffered_storage(self): # Check if we are working with a cloud storage type store = self._file.store if isinstance(store, zarr.storage.FSStore): - if 's3' in store.fs.protocol: + if "s3" in store.fs.protocol: # Verify s3fs is installed # NOTE: Verify this is necessary try: import s3fs except ImportError: - raise Exception("Writing to AWS S3 requires installing " + - + "s3fs") + raise Exception( + "ZarrTrajWriter: Writing to AWS S3 requires installing " + + +"s3fs" + ) self._is_buffered_store = True - elif 'gcs' in store.fs.protocol: + elif "gcs" in store.fs.protocol: # Verify gcsfs is installed try: import gcsfs except ImportError: - raise Exception("Writing to Google Cloud Storage " + - + "requires installing gcsfs") + raise Exception( + "ZarrTrajWriter: Writing to Google Cloud Storage " + + +"requires installing gcsfs" + ) self._is_buffered_store = True elif isinstance(store, zarr.storage.ABSStore): self._is_buffered_store = True @@ -610,19 +688,21 @@ def _write_next_frame(self, ag): # Universe? ts = ag.trajectory.ts except AttributeError: - errmsg = "Input obj is neither an AtomGroup or Universe" + errmsg = "ZarrTrajWriter: Input obj is neither an AtomGroup or Universe" raise TypeError(errmsg) from None if ts.n_atoms != self.n_atoms: - raise IOError("ZarrTrajWriter: Timestep does not have" - " the correct number of atoms") + raise IOError( + "ZarrTrajWriter: Timestep does not have" + " the correct number of atoms" + ) # This will only be called once when first timestep is read. if self._initial_write: self._determine_has(ts) self._determine_units(ag) if self._is_buffered_store: - self._check_max_memory() + self._check_max_memory(ts) self._initialize_zarr_datasets(ts) self._initialize_memory_buffers() else: @@ -645,17 +725,25 @@ def _determine_units(self, ag): from_units = ag.universe.trajectory.units.copy() if self.has_positions and not from_units["length"]: - raise ValueError("The trajectory is missing length units.") + raise ValueError( + "ZarrTrajWriter: The trajectory is missing length units." + ) if self.has_velocities and not from_units["velocity"]: - raise ValueError("The trajectory is missing velocity units.") + raise ValueError( + "ZarrTrajWriter: The trajectory is missing velocity units." + ) if self.has_forces and not from_units["force"]: - raise ValueError("The trajectory is missing force units.") + raise ValueError( + "ZarrTrajWriter: The trajectory is missing force units." + ) if not from_units["time"]: - raise ValueError("The trajectory is missing time units.") + raise ValueError( + "ZarrTrajWriter: The trajectory is missing time units." + ) def _determine_has(self, ts): - # ask the parent file if it has positions, velocities, and forces, - # and dimensions + """ask the parent file if it has positions, velocities, and forces, + dimensions, and observables""" self._has = set() if "positions" in self._write and ts.has_positions: self._has.add("positions") @@ -663,47 +751,57 @@ def _determine_has(self, ts): self._has.add("velocities") if "forces" in self._write and ts.has_forces: self._has.add("forces") - self._boundary = (ZarrTrajBoundaryConditions.ZARRTRAJ_PERIODIC - if ts.dimensions is not None - else ZarrTrajBoundaryConditions.ZARRTRAJ_NONE) + self._boundary = ( + ZarrTrajBoundaryConditions.ZARRTRAJ_PERIODIC + if ts.dimensions is not None + else ZarrTrajBoundaryConditions.ZARRTRAJ_NONE + ) + # include observable datasets from ts.data dictionary that + # are NOT in self.data_blacklist + self.data_keys = [ + key for key in ts.data.keys() if key not in self.data_blacklist + ] - def _check_max_memory(self): + def _check_max_memory(self, ts): """ Determines if at least one chunk of size ``chunks`` fits in the ``max_memory``sized buffer. If not, the writer will fail without allocating space for trajectory data on the cloud. """ - - float32_size = np.dtype(np.float32).itemsize - int32_size = np.dtype(np.int32).itemsize + # For each ts element in step, time, dimensions, positions, velocities, + # forces, and observables, add ts[0].size * ts.itemsize * self.chunks[0] + # to mem_per_chunk + has = [] mem_per_chunk = 0 - - # Write edges, step, and time in the same pattern as - # velocity, force, and position, though it is not - # strictly necessary for simplicity + try: + has.append(np.asarray(ts.data["step"])) + except KeyError: + has.append(np.asarray(ts.frame)) + has.append(np.asarray(ts.time)) if self._boundary == ZarrTrajBoundaryConditions.ZARRTRAJ_PERIODIC: - mem_per_chunk += float32_size * self.chunks[0] * 9 - # Step - mem_per_chunk += int32_size * self.chunks[0] - # Time - mem_per_chunk += float32_size * self.chunks[0] - + has.append(ts.triclinic_dimensions) if self.has_positions: - mem_per_chunk += float32_size * self.chunks[0] * self.n_atoms * 3 - - if self.has_forces: - mem_per_chunk += float32_size * self.chunks[0] * self.n_atoms * 3 - + has.append(ts.positions) if self.has_velocities: - mem_per_chunk += float32_size * self.chunks[0] * self.n_atoms * 3 + has.append(ts.velocities) + if self.has_forces: + has.append(ts.forces) + for key in self.data_keys: + data = np.asarray(ts.data[key]) + has.append(data) + for dataset in has: + mem_per_chunk += dataset.size * dataset.itemsize * self.chunks[0] if mem_per_chunk > self.max_memory: - raise ValueError("`max_memory` kwarg " + - f"must be at least {mem_per_chunk} for " + - f"chunking pattern of {self.chunks}") + raise ValueError( + "ZarrTrajWriter: `max_memory` kwarg " + + f"must be at least {mem_per_chunk} for " + + f"chunking pattern of {self.chunks}" + ) else: - self.n_buffer_frames = self.chunks[0] * (self.max_memory // - mem_per_chunk) + self.n_buffer_frames = self.chunks[0] * ( + self.max_memory // mem_per_chunk + ) def _initialize_zarr_datasets(self, ts): """initializes all datasets that will be written to by @@ -722,90 +820,96 @@ def _initialize_zarr_datasets(self, ts): # for keeping track of where to write in the dataset self._counter = 0 - self._particle_group = self._file.require_group('particles') + self._particle_group = self._file.require_group("particles") # NOTE: subselection init goes here when implemented # Initialize units group - self._particle_group.require_group('units') - self._particle_group["units"].attrs['time'] = self.units['time'] - self._particle_group["units"].attrs['length'] = self.units['length'] - self._particle_group["units"].attrs['velocity'] = self.units['velocity'] - self._particle_group["units"].attrs['force'] = self.units['force'] + self._particle_group.require_group("units") + self._particle_group["units"].attrs["time"] = self.units["time"] + self._particle_group["units"].attrs["length"] = self.units["length"] + self._particle_group["units"].attrs["velocity"] = self.units["velocity"] + self._particle_group["units"].attrs["force"] = self.units["force"] - self._particle_group.require_group('box') + self._particle_group.require_group("box") if self._boundary == ZarrTrajBoundaryConditions.ZARRTRAJ_PERIODIC: - self._particle_group['box'].attrs['boundary'] = 'periodic' - self._particle_group['box']['dimensions'] = zarr.empty( + self._particle_group["box"].attrs["boundary"] = "periodic" + self._particle_group["box"]["dimensions"] = zarr.empty( shape=(self._first_dim, 3, 3), dtype=np.float32, compressor=self.compressor, - filters=self.filters + filters=self.filters, ) - self._dimensions = self._particle_group['box']['dimensions'] + self._dimensions = self._particle_group["box"]["dimensions"] else: # boundary attr must be "none" - self._particle_group['box'].attrs['boundary'] = 'none' + self._particle_group["box"].attrs["boundary"] = "none" - self._particle_group['step'] = (zarr.empty(shape=(self._first_dim,), - dtype=np.int32)) - self._step = self._particle_group['step'] - self._particle_group['time'] = (zarr.empty(shape=(self._first_dim,), - dtype=np.int32)) - self._time = self._particle_group['time'] + self._particle_group["step"] = zarr.empty( + shape=(self._first_dim,), dtype=np.int32 + ) + self._step = self._particle_group["step"] + self._particle_group["time"] = zarr.empty( + shape=(self._first_dim,), dtype=np.int32 + ) + self._time = self._particle_group["time"] if self.has_positions: - self._create_trajectory_dataset('positions') - self._pos = self._particle_group['positions'] + self._create_trajectory_dataset("positions") + self._pos = self._particle_group["positions"] if self.has_velocities: - self._create_trajectory_dataset('velocities') - self._vel = self._particle_group['velocities'] + self._create_trajectory_dataset("velocities") + self._vel = self._particle_group["velocities"] if self.has_forces: - self._create_trajectory_dataset('forces') - self._force = self._particle_group['forces'] - - # intialize observable datasets from ts.data dictionary that - # are NOT in self.data_blacklist - self.data_keys = [ - key for key in ts.data.keys() if key not in self.data_blacklist] - if self.data_keys: - self._obsv = self._particle_group.require_group('observables') + self._create_trajectory_dataset("forces") + self._force = self._particle_group["forces"] if self.data_keys: + self._obsv = self._particle_group.require_group("observables") for key in self.data_keys: - self._create_observables_dataset(key, ts.data[key]) + data = np.asarray(ts.data[key]) + self._create_observables_dataset(key, data) def _create_observables_dataset(self, group, data): """helper function to initialize a dataset for each observable""" - - self._obsv.require_group(group) - # guarantee ints and floats have a shape () - data = np.asarray(data) - self._obsv[group] = zarr.empty(shape=(self._first_dim,) + data.shape, - dtype=data.dtype) + self._obsv[group] = zarr.empty( + shape=(self._first_dim,) + data.shape, dtype=data.dtype + ) def _create_trajectory_dataset(self, group): """helper function to initialize a dataset for position, velocity, and force""" - self._particle_group[group] = zarr.empty(shape=(self._first_dim, - self.n_atoms, 3), - dtype=np.float32, - chunks=self.chunks, - filters=self.filters, - compressor=self.compressor) + self._particle_group[group] = zarr.empty( + shape=(self._first_dim, self.n_atoms, 3), + dtype=np.float32, + chunks=self.chunks, + filters=self.filters, + compressor=self.compressor, + ) def _initialize_memory_buffers(self): - self._time_buffer = np.zeros((self.n_buffer_frames,), dtype=np.float32) - self._step_buffer = np.zeros((self.n_buffer_frames,), dtype=np.int32) - self._dimensions_buffer = np.zeros((self.n_buffer_frames, 3, 3), - dtype=np.float32) + self._time_buffer = np.empty((self.n_buffer_frames,), dtype=np.float32) + self._step_buffer = np.empty((self.n_buffer_frames,), dtype=np.int32) + self._dimensions_buffer = np.empty( + (self.n_buffer_frames, 3, 3), dtype=np.float32 + ) if self.has_positions: - self._pos_buffer = np.zeros((self.n_buffer_frames, self.n_atoms, - 3), dtype=np.float32) + self._pos_buffer = np.empty( + (self.n_buffer_frames, self.n_atoms, 3), dtype=np.float32 + ) if self.has_velocities: - self._force_buffer = np.zeros((self.n_buffer_frames, self.n_atoms, - 3), dtype=np.float32) + self._force_buffer = np.empty( + (self.n_buffer_frames, self.n_atoms, 3), dtype=np.float32 + ) if self.has_forces: - self._vel_buffer = np.zeros((self.n_buffer_frames, self.n_atoms, - 3), dtype=np.float32) + self._vel_buffer = np.empty( + (self.n_buffer_frames, self.n_atoms, 3), dtype=np.float32 + ) + if self.data_keys: + self._obsv_buffer = {} + for key in self.data_keys: + self._obsv_buffer[key] = np.empty( + (self.n_buffer_frames,) + self._obsv[key].shape[1:], + dtype=self._obsv[key].dtype, + ) # Reduce cloud I/O by storing previous step & time val for error checking self._prev_step = None self._prev_time = None @@ -817,53 +921,87 @@ def _write_next_buffered_timestep(self, ts): buffer_index = i % self.n_buffer_frames # Add the current timestep information to the buffer try: - curr_step = ts.data['step'] + curr_step = ts.data["step"] except KeyError: curr_step = ts.frame self._step_buffer[buffer_index] = curr_step if self._prev_step is not None and curr_step < self._prev_step: - raise ValueError("The Zarrtraj standard dictates that the step " - "dataset must increase monotonically in value.") + raise ValueError( + "ZarrTrajWriter: The Zarrtraj standard dictates that the step " + "dataset must increase monotonically in value." + ) self._prev_step = curr_step - curr_time = (self.convert_time_to_native(ts.time, inplace=False)) + curr_time = self.convert_time_to_native(ts.time, inplace=False) self._time_buffer[buffer_index] = curr_time if self._prev_time is not None and curr_time < self._prev_time: - raise ValueError("The Zarrtraj standard dictates that the time " - "dataset must increase monotonically in value.") + raise ValueError( + "ZarrTrajWriter: The Zarrtraj standard dictates that the time " + "dataset must increase monotonically in value." + ) self._prev_time = curr_time if self._boundary == ZarrTrajBoundaryConditions.ZARRTRAJ_PERIODIC: - self._dimensions_buffer[buffer_index, :] = ( - self.convert_pos_to_native(ts.triclinic_dimensions, - inplace=False)) + self._dimensions_buffer[buffer_index] = self.convert_pos_to_native( + ts.triclinic_dimensions, inplace=False + ) if self.has_positions: - self._pos_buffer[buffer_index, :] = self.convert_pos_to_native( - ts.positions, inplace=False) + self._pos_buffer[buffer_index] = self.convert_pos_to_native( + ts.positions, inplace=False + ) if self.has_velocities: - self._vel_buffer[buffer_index, :] = ( - self.convert_velocities_to_native(ts.velocities, inplace=False)) + self._vel_buffer[buffer_index] = self.convert_velocities_to_native( + ts.velocities, inplace=False + ) if self.has_forces: - self._force_buffer[buffer_index, :] = ( - self.convert_forces_to_native(ts.forces, inplace=False)) + self._force_buffer[buffer_index] = self.convert_forces_to_native( + ts.forces, inplace=False + ) + + for key in self.data_keys: + try: + data = np.asarray(ts.data[key]) + self._obsv_buffer[key][buffer_index] = data + except IndexError: + raise ValueError( + "ZarrTrajWriter: Observables data must be sampled at the same rate as the position, velocity, and force data." + ) # If buffer is full or last write call, write buffers to cloud - if (((i + 1) % self.n_buffer_frames == 0) or - (i == self.n_frames - 1)): - da.from_array(self._step_buffer[:buffer_index + 1]).to_zarr(self._step, region=(slice(i - buffer_index, i + 1),)) - da.from_array(self._time_buffer[:buffer_index + 1]).to_zarr(self._time, region=(slice(i - buffer_index, i + 1),)) + if ((i + 1) % self.n_buffer_frames == 0) or (i == self.n_frames - 1): + da.from_array(self._step_buffer[: buffer_index + 1]).to_zarr( + self._step, region=(slice(i - buffer_index, i + 1),) + ) + da.from_array(self._time_buffer[: buffer_index + 1]).to_zarr( + self._time, region=(slice(i - buffer_index, i + 1),) + ) if self._boundary == ZarrTrajBoundaryConditions.ZARRTRAJ_PERIODIC: - da.from_array(self._dimensions_buffer[:buffer_index + 1]).to_zarr(self._dimensions, region=(slice(i - buffer_index, i + 1),)) + da.from_array( + self._dimensions_buffer[: buffer_index + 1] + ).to_zarr( + self._dimensions, region=(slice(i - buffer_index, i + 1),) + ) if self.has_positions: - da.from_array(self._pos_buffer[:buffer_index + 1]).to_zarr(self._pos, region=(slice(i - buffer_index, i + 1),)) + da.from_array(self._pos_buffer[: buffer_index + 1]).to_zarr( + self._pos, region=(slice(i - buffer_index, i + 1),) + ) if self.has_velocities: - da.from_array(self._vel_buffer[:buffer_index + 1]).to_zarr(self._vel, region=(slice(i - buffer_index, i + 1),)) + da.from_array(self._vel_buffer[: buffer_index + 1]).to_zarr( + self._vel, region=(slice(i - buffer_index, i + 1),) + ) if self.has_forces: - da.from_array(self._force_buffer[:buffer_index + 1]).to_zarr(self._force, region=(slice(i - buffer_index, i + 1),)) - + da.from_array(self._force_buffer[: buffer_index + 1]).to_zarr( + self._force, region=(slice(i - buffer_index, i + 1),) + ) + for key in self.data_keys: + da.from_array( + self._obsv_buffer[key][: buffer_index + 1] + ).to_zarr( + self._obsv[key], region=(slice(i - buffer_index, i + 1),) + ) self._counter += 1 def _write_next_timestep(self, ts): @@ -886,48 +1024,70 @@ def _write_next_timestep(self, ts): self._step.resize((self._step.shape[0] + 1,)) self._time.resize((self._time.shape[0] + 1,)) if self._boundary == ZarrTrajBoundaryConditions.ZARRTRAJ_PERIODIC: - self._dimensions.resize((self._dimensions.shape[0] + 1,) + - self._dimensions.shape[1:]) + self._dimensions.resize( + (self._dimensions.shape[0] + 1,) + + self._dimensions.shape[1:] + ) if self.has_positions: - self._pos.resize((self._pos.shape[0] + 1,) + - self._pos.shape[1:]) + self._pos.resize( + (self._pos.shape[0] + 1,) + self._pos.shape[1:] + ) if self.has_velocities: - self._vel.resize((self._vel.shape[0] + 1,) + - self._vel.shape[1:]) + self._vel.resize( + (self._vel.shape[0] + 1,) + self._vel.shape[1:] + ) if self.has_forces: - self._force.resize((self._force.shape[0] + 1,) + - self._force.shape[1:]) + self._force.resize( + (self._force.shape[0] + 1,) + self._force.shape[1:] + ) + for key in self.data_keys: + self._obsv[key].resize( + (self._obsv[key].shape[0] + 1,) + self._obsv[key].shape[1:] + ) # Zarrtraj step refers to the integration step at which the data were # sampled, therefore ts.data['step'] is the most appropriate value # to use. However, step is also necessary in Zarrtraj to allow # temporal matching of the data, so ts.frame is used as an alternative try: - self._step[i] = ts.data['step'] + self._step[i] = ts.data["step"] except KeyError: self._step[i] = ts.frame - if len(self._step) > 1 and self._step[i] < self._step[i-1]: - raise ValueError("The Zarrtraj standard dictates that the step " - "dataset must increase monotonically in value.") + if len(self._step) > 1 and self._step[i] < self._step[i - 1]: + raise ValueError( + "ZarrTrajWriter: The Zarrtraj standard dictates that the step " + "dataset must increase monotonically in value." + ) self._time[i] = self.convert_time_to_native(ts.time, inplace=False) - if len(self._time) > 1 and self._time[i] < self._time[i-1]: - raise ValueError("The Zarrtraj standard dictates that the time " - "dataset must increase monotonically in value.") + if len(self._time) > 1 and self._time[i] < self._time[i - 1]: + raise ValueError( + "ZarrTrajWriter: The Zarrtraj standard dictates that the time " + "dataset must increase monotonically in value." + ) if self._boundary == ZarrTrajBoundaryConditions.ZARRTRAJ_PERIODIC: - self._dimensions[i, :] = self.convert_pos_to_native( - ts.triclinic_dimensions, inplace=False) + self._dimensions[i] = self.convert_pos_to_native( + ts.triclinic_dimensions, inplace=False + ) if self.has_positions: - self._pos[i, :] = self.convert_pos_to_native( - ts.positions, inplace=False) + self._pos[i] = self.convert_pos_to_native( + ts.positions, inplace=False + ) if self.has_velocities: - self._vel[i, :] = self.convert_velocities_to_native( - ts.velocities, inplace=False) + self._vel[i] = self.convert_velocities_to_native( + ts.velocities, inplace=False + ) if self.has_forces: - self._force[i, :] = self.convert_forces_to_native( - ts.forces, inplace=False) - # NOTE: Fix me. add observables - # if self.convert_units: - # self._convert_dataset_with_units(i) + self._force[i] = self.convert_forces_to_native( + ts.forces, inplace=False + ) + for key in self.data_keys: + try: + data = np.asarray(ts.data[key]) + self._obsv[key][i] = data + except IndexError: + raise ValueError( + "ZarrTrajWriter: Observables data must be sampled at the same rate as the position, velocity, and force data." + ) self._counter += 1 @@ -944,4 +1104,4 @@ def has_velocities(self): @property def has_forces(self): """``True`` if writer is writing forces from Timestep.""" - return "forces" in self._has \ No newline at end of file + return "forces" in self._has diff --git a/zarrtraj/__init__.py b/zarrtraj/__init__.py index 1954ca0..bcee3f0 100755 --- a/zarrtraj/__init__.py +++ b/zarrtraj/__init__.py @@ -16,11 +16,15 @@ print("importing zarrtraj...") + def _format_hint(thing): - return (not isinstance(thing, np.ndarray) and - not isinstance(thing, zarr.Group) and - util.iterable(thing) and - not util.isstream(thing)) + return ( + not isinstance(thing, np.ndarray) + and not isinstance(thing, zarr.Group) + and util.iterable(thing) + and not util.isstream(thing) + ) + _READER_HINTS["CHAIN"] = _format_hint diff --git a/zarrtraj/data/cobrotoxin.zarrtraj/.zattrs b/zarrtraj/data/cobrotoxin.zarrtraj/.zattrs new file mode 100644 index 0000000..dfba51c --- /dev/null +++ b/zarrtraj/data/cobrotoxin.zarrtraj/.zattrs @@ -0,0 +1,3 @@ +{ + "version": "0.1.0" +} \ No newline at end of file diff --git a/zarrtraj/data/cobrotoxin.zarrtraj/.zgroup b/zarrtraj/data/cobrotoxin.zarrtraj/.zgroup new file mode 100644 index 0000000..3b7daf2 --- /dev/null +++ b/zarrtraj/data/cobrotoxin.zarrtraj/.zgroup @@ -0,0 +1,3 @@ +{ + "zarr_format": 2 +} \ No newline at end of file diff --git a/zarrtraj/data/cobrotoxin.zarrtraj/metadata/.zattrs b/zarrtraj/data/cobrotoxin.zarrtraj/metadata/.zattrs new file mode 100644 index 0000000..7810d50 --- /dev/null +++ b/zarrtraj/data/cobrotoxin.zarrtraj/metadata/.zattrs @@ -0,0 +1,5 @@ +{ + "author": "N/A", + "creator": "MDAnalysis", + "creator_version": "2.7.0" +} \ No newline at end of file diff --git a/zarrtraj/data/cobrotoxin.zarrtraj/metadata/.zgroup b/zarrtraj/data/cobrotoxin.zarrtraj/metadata/.zgroup new file mode 100644 index 0000000..3b7daf2 --- /dev/null +++ b/zarrtraj/data/cobrotoxin.zarrtraj/metadata/.zgroup @@ -0,0 +1,3 @@ +{ + "zarr_format": 2 +} \ No newline at end of file diff --git a/zarrtraj/data/cobrotoxin.zarrtraj/particles/.zgroup b/zarrtraj/data/cobrotoxin.zarrtraj/particles/.zgroup new file mode 100644 index 0000000..3b7daf2 --- /dev/null +++ b/zarrtraj/data/cobrotoxin.zarrtraj/particles/.zgroup @@ -0,0 +1,3 @@ +{ + "zarr_format": 2 +} \ No newline at end of file diff --git a/zarrtraj/data/cobrotoxin.zarrtraj/particles/box/.zattrs b/zarrtraj/data/cobrotoxin.zarrtraj/particles/box/.zattrs new file mode 100644 index 0000000..7128417 --- /dev/null +++ b/zarrtraj/data/cobrotoxin.zarrtraj/particles/box/.zattrs @@ -0,0 +1,3 @@ +{ + "boundary": "periodic" +} \ No newline at end of file diff --git a/zarrtraj/data/cobrotoxin.zarrtraj/particles/box/.zgroup b/zarrtraj/data/cobrotoxin.zarrtraj/particles/box/.zgroup new file mode 100644 index 0000000..3b7daf2 --- /dev/null +++ b/zarrtraj/data/cobrotoxin.zarrtraj/particles/box/.zgroup @@ -0,0 +1,3 @@ +{ + "zarr_format": 2 +} \ No newline at end of file diff --git a/zarrtraj/data/cobrotoxin.zarrtraj/particles/box/dimensions/.zarray b/zarrtraj/data/cobrotoxin.zarrtraj/particles/box/dimensions/.zarray new file mode 100644 index 0000000..11ddb06 --- /dev/null +++ b/zarrtraj/data/cobrotoxin.zarrtraj/particles/box/dimensions/.zarray @@ -0,0 +1,24 @@ +{ + "chunks": [ + 3, + 3, + 3 + ], + "compressor": { + "blocksize": 0, + "clevel": 5, + "cname": "lz4", + "id": "blosc", + "shuffle": 1 + }, + "dtype": " wat_ag.n_atoms: + raise ValueError("n must be less than the number of water molecules") + + result = np.empty((prot_ag.universe.trajectory.n_frames, n), dtype=int) + + i = 0 + for ts in prot_ag.universe.trajectory: + dist = distances.distance_array( + prot_ag.positions, wat_ag.positions, box=prot_ag.dimensions + ) + + minvals = np.empty(wat_ag.n_atoms) + for j in range(wat_ag.n_atoms): + minvals[j] = np.min(dist[:, j]) + + result[i] = np.argsort(minvals)[:n] + i += 1 + + return result + + +def find_free_port(): + """Find a free port on the host machine.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("", 0)) + return s.getsockname()[1]