diff --git a/environment.yml b/environment.yml index 8fc0a06..b060d7d 100644 --- a/environment.yml +++ b/environment.yml @@ -2,10 +2,12 @@ name: msibi channels: - conda-forge dependencies: -- cuda-version==11.8 -- freud>=2.13 -- gsd>=3.0 -- hoomd>=4.0 -- cmeutils +- cuda-version=11.8 +- freud >=2.13 +- gsd >=3.0 +- hoomd >=4.0 +- python >=3.9 +- pandas +- cmeutils >=1.2 - more-itertools - jupyter diff --git a/msibi/__init__.py b/msibi/__init__.py index d2c4732..655df43 100644 --- a/msibi/__init__.py +++ b/msibi/__init__.py @@ -1,9 +1,10 @@ -from msibi.optimize import MSIBI -from msibi.forces import Pair, Bond, Angle, Dihedral -from msibi.potentials import * -from msibi.state import State -from msibi.__version__ import __version__ -from msibi import utils +from .state import State +from .forces import Pair, Bond, Angle, Dihedral +from .optimize import MSIBI +#from .potentials import * +#from .potentials import quadratic_spring, mie, lennard_jones, pair_tail_correction +from .__version__ import __version__ +from . import utils __all__ = [ "__version__", diff --git a/msibi/forces.py b/msibi/forces.py index 2a5d518..0e20dbe 100644 --- a/msibi/forces.py +++ b/msibi/forces.py @@ -1,5 +1,6 @@ import math import os +from typing import Union import warnings from cmeutils.structure import ( @@ -10,8 +11,10 @@ ) import matplotlib.pyplot as plt import numpy as np +import pandas as pd -from msibi.potentials import quadratic_spring, bond_correction +import msibi +from msibi.potentials import quadratic_spring, bond_correction, lennard_jones from msibi.utils.error_calculation import calc_similarity from msibi.utils.smoothing import savitzky_golay from msibi.utils.sorting import natural_sort @@ -41,15 +44,17 @@ class Force(object): This must be a positive integer if this force is being optimized. nbins is used to setting the potenials independent varible (x) range and step size (dx). + It is also used in determining the bin size of the target and query + distributions. """ def __init__( self, - name, - optimize, - nbins=None, - head_correction_form="linear" + name: str, + optimize: bool, + nbins: int=None, + head_correction_form: str="linear" ): if optimize and nbins is None or nbins<=0: raise ValueError( @@ -88,7 +93,8 @@ def __repr__(self): ) @property - def potential(self): + def potential(self) -> np.ndarray: + """The potential energy values V(x).""" if self.format != "table": warnings.warn(f"{self} is not using a table potential.") return self._potential @@ -106,54 +112,119 @@ def potential(self, array): self._potential = array @property - def force(self): + def force(self) -> np.ndarray: + """The force values F(x).""" if self.format != "table": warnings.warn(f"{self} is not using a table potential.") return None return -1.0*np.gradient(self.potential, self.dx) @property - def smoothing_window(self): + def smoothing_window(self) -> int: + """Window size used in smoothing the distributions.""" return self._smoothing_window @smoothing_window.setter - def smoothing_window(self, value): + def smoothing_window(self, value: int): + if not isinstance(value, int): + raise ValueError("The smoothing window must be an integer.") self._smoothing_window = value for state in self._states: self._add_state(state) @property - def smoothing_order(self): + def smoothing_order(self) -> int: + """The order used in smoothing the distributions.""" return self._smoothing_order @smoothing_order.setter - def smoothing_order(self, value): + def smoothing_order(self, value: int): + if not isinstance(value, int): + raise ValueError("The smoothing order must be an integer.") self._smoothing_order = value for state in self._states: self._add_state(state) @property - def nbins(self): + def nbins(self) -> int: + """The number of bins used in calculating distributions.""" return self._nbins @nbins.setter - def nbins(self, value): + def nbins(self, value: int): + if not isinstance(value, int): + raise ValueError("nbins must be an integer.") self._nbins = value for state in self._states: self._add_state(state) - def target_distribution(self, state): + def smooth_potential(self) -> None: + """Smooth and overwrite the current potential. + + Note + ---- + This uses a Savitzky Golay smoothing algorithm where the + window size and order parameters are set by + msibi.forces.Force.smoothing_window and + msibi.forces.Force.smoothing_order. + Both of these can be changed using their respective setters. + + """ + if self.format != "table": + raise RuntimeError( + "This force is not a table potential and is not mutable." + ) + potential = np.copy(self.potential) + self.potential = savitzky_golay( + y=potential, + window_size=self.smoothing_window, + order=self.smoothing_order, + ) + + def save_potential(self, file_path: str) -> None: + """Save the x-range, potential and force to a csv file. + + Parameters + ---------- + file_path : str, required + File path and name to save table potential to. + + Notes + ----- + This method uses pandas.DataFrame.to_csv() and saves the data + in with column labels of 'x', 'potential', 'force'. + + If you want to smooth the final potential, use + msibi.forces.Force.smooth_potential() before + calling this method. + + """ + if self.format != "table": + raise RuntimeError( + "This force is not a table potential and " + "cannot be saved to a .txt file." + ) + df = pd.DataFrame({ + "x": self.x_range, + "potential": self.potential, + "force": self.force + }) + df.to_csv(file_path, index=False) + + def target_distribution(self, state: msibi.state.State) -> np.ndarray: """The target structural distribution corresponding to this foce. Parameters ---------- state : msibi.state.State, required The state to use in finding the target distribution. + """ return self._states[state]["target_distribution"] - def plot_target_distribution(self, state): - """Quick plotting function that shows the target structural + def plot_target_distribution(self, state: msibi.state.State) -> None: + """ + Quick plotting function that shows the target structural distribution corresponding to this forces. Parameters @@ -166,6 +237,7 @@ def plot_target_distribution(self, state): Use this to see how the shape of the target distribution is affected by your choices for nbins, smoothing window, and smoothing order. + """ #TODO: Make custom error if not self.optimize: @@ -188,14 +260,15 @@ def plot_target_distribution(self, state): plt.plot(target[:,0], y_smoothed, label="Smoothed") plt.legend() - def plot_fit_scores(self, state): - """Returns a plot showing the evolution of the distribution + def plot_fit_scores(self, state: msibi.state.State) -> None: + """Plots the evolution of the distribution matching evolution. Parameters ---------- state : msibi.state.State, required The state to use in finding the target distribution. + """ if not self.optimize: raise RuntimeError("This force object is not set to be optimized.") @@ -204,29 +277,64 @@ def plot_fit_scores(self, state): plt.xlabel("Iteration") plt.ylabel("Fit Score") - def distribution_history(self, state): - """Returns the complete query distribution history for a given state. + def distribution_history(self, state: msibi.state.State): + """Get the complete query distribution history for a given state. Parameters ---------- state : msibi.state.State, required The state to use for calculating the distribution. + """ return self._states[state]["distribution_history"] - def set_target_distribution(self, state, array): - """""" + def set_target_distribution(self, state: msibi.state.State, array) -> None: + """Store the target distribution for a given state. + + Parameters + ---------- + state: msibi.state.State + The state used in finding the distribution. + + """ self._states[state]["target_distribution"] = array - def current_distribution(self, state, query=True): - """""" + def current_distribution( + self, + state: msibi.state.State, + query: bool=True + ) -> np.ndarray: + """Returns the corresponding distrubution from the most recent + query simulation. + + Parameters + ---------- + state : msibi.state.State, required + The state to use for calculating the distribution. + + """ return self._get_state_distribution(state, query) - def distribution_fit(self, state): - """""" + def distribution_fit(self, state: msibi.state.State) -> float: + """Get the fit score from the most recent query simulation. + + Parameters + ---------- + state : msibi.state.State, required + The state to use for calculating the distribution. + + """ return self._calc_fit(state) - def set_quadratic(self, k4, k3, k2, x0, x_min, x_max): + def set_quadratic( + self, + k4: Union[float, int], + k3: Union[float, int], + k2: Union[float, int], + x0: Union[float, int], + x_min: Union[float, int], + x_max: Union[float, int] + ) -> None: """Set a potential based on the following function: V(x) = k4(x-x0)^4 + k3(x-x0)^3 + k2(x-x0)^2 @@ -245,6 +353,7 @@ def set_quadratic(self, k4, k3, k2, x0, x_min, x_max): The lower bound of the potential range x_max : float, required The upper bound of the potential range + """ self.format = "table" self.x_min = x_min @@ -255,42 +364,43 @@ def set_quadratic(self, k4, k3, k2, x0, x_min, x_max): self.force_init = "Table" self.force_entry = self._table_entry() - def set_from_file(self, file_path): - """Creates a potential from a text file. - The columns of the text file must be in the order of r, V. - where r is the independent value (i.e. distance) and V - is the potential enregy at r. The force will be calculated - from r and V using np.gradient(). + def set_from_file(self, file_path: str) -> None: + """Set a potential from a csv file. Parameters: ----------- file_path : str, required - The full path to the table potential text file. + The full path to the table potential csv file. + + Notes: + ------ + This uses pandas.DataFrame.read_csv and expects + column names of 'x', 'potential', and 'force'. - Notes - ----- Use this potential setter to set a potential from a previous MSIBI run. For example, use the final potential files from a bond-optimization IBI run to set a static coarse-grained bond potential while you perform IBI runs on angle and/or pair potentials. + + Also see: msibi.forces.Force.save_potential() + """ - f = np.loadtxt(file_path) - self.x_range = f[:,0] + self.format = "table" + df = pd.read_csv(file_path) + self.x_range = df["x"].values + self.potential = df["potential"].values self.dx = np.round(self.x_range[1] - self.x_range[0], 3) self.x_min = self.x_range[0] self.x_max = self.x_range[-1] + self.dx - self._potential = f[:,1] - self.format = "table" self.force_init = "Table" - self.force_entry = self.table_entry() - def _add_state(self, state): - """Add a state to be used in optimizing this Fond. + def _add_state(self, state: msibi.state.State) -> None: + """Add a state to be used in optimizing this force. Parameters ---------- state : msibi.state.State - Instance of a State object already created. + Instance of a State object previously created. """ if self.optimize: @@ -317,8 +427,15 @@ def _add_state(self, state): "path": state.dir } - def _compute_current_distribution(self, state): - """Find the current distribution of the query trajectory""" + def _compute_current_distribution(self, state: msibi.state.State) -> None: + """Find the current distribution of the query trajectory + + Parameters + ---------- + state : msibi.state.State + Instance of a State object previously created. + + """ distribution = self._get_state_distribution(state, query=True) if self.smoothing_window and self.smoothing_order: distribution[:,1] = savitzky_golay( @@ -337,24 +454,42 @@ def _compute_current_distribution(self, state): self._states[state]["target_distribution"][:,1] ) self._states[state]["f_fit"].append(f_fit) - #TODO: Get rid of this func? Pass in correct traj file in other funcs? - def _get_state_distribution(self, state, query): - """Find the bond length distribution of a Bond at a State.""" + + def _get_state_distribution( + self, + state: msibi.state.State, + query: bool + ) -> np.ndarray: + """Get the corresponding distrubiton for a given state. + + Parameters + ---------- + state: msibi.state.State + State used in calculating the distribution. + query: bool + If True, uses the most recent query trajectory. + If False, uses the state's target trajectory. + + """ if query: traj = state.query_traj else: traj = state.traj_file return self._get_distribution(state=state, gsd_file=traj) - def _save_current_distribution(self, state, iteration): - """Save the current bond length distribution + def _save_current_distribution( + self, + state: msibi.state.State, + iteration: int + ) -> None: + """Save the corresponding distrubiton for a given state to a file. Parameters ---------- state : State - A state object + The state used in finding the distribution. iteration : int - Current iteration step, used in the filename + Current iteration step, used in the filename. """ distribution = self._states[state]["current_distribution"] @@ -363,9 +498,9 @@ def _save_current_distribution(self, state, iteration): fpath = os.path.join(state.dir, fname) np.savetxt(fpath, distribution) - def _update_potential(self): + def _update_potential(self) -> None: """Compare distributions of current iteration against target, - and update the Bond potential via Boltzmann inversion. + and update the potential via Boltzmann inversion. """ self.potential_history.append(np.copy(self.potential)) @@ -390,13 +525,45 @@ def _update_potential(self): class Bond(Force): + """ + Creates a bond force used in query simulations. + + Parameters + ---------- + type1 : str, required + Name of the first particle type in the bond. + This must match the types found in the state's target GSD file. + type2 : str, required + Name of the second particle type in the bond. + This must match the types found in the state's target GSD file. + optimize : bool, required + Set to True if this force is to be mutable and optimized. + Set to False if this force is to be held constant while + other forces are optimized. + nbins : int, optional + This must be a positive integer if this force is being optimized. + nbins is used to setting the potenials independent varible (x) range + and step size (dx). + It is also used in determining the bin size of the target and query + distributions. + + Notes + ----- + The bond type is sorted so that type1 and type2 + are listed in alphabetical order, and must match the bond + types found in the state's target GSD file bond types. + + For example: `Bond(type1="B", type2="A")` will have `Bond.name = "A-B"` + + """ + def __init__( self, - type1, - type2, - optimize, - nbins=None, - head_correction_form="linear" + type1: str, + type2: str, + optimize: bool, + nbins: int=None, + head_correction_form: str="linear" ): self.type1, self.type2 = sorted( [type1, type2], key=natural_sort @@ -410,30 +577,31 @@ def __init__( head_correction_form=head_correction_form ) - def set_harmonic(self, r0, k): - """Sets a fixed harmonic bond potential. + def set_harmonic(self, r0: Union[float, int], k: Union[float, int]) -> None: + """Set a fixed harmonic bond potential. Using this method is not compatible force msibi.forces.Force objects that are set to be optimized during MSIBI Parameters ---------- - r0 : float, required + r0 : (Union[float, int]), required Equilibrium bond length - k : float, required + k : (Union[float, int]), required Spring constant + """ if self.optimize: raise RuntimeError( f"Force {self} is set to be optimized during MSIBI." "This potential setter cannot be used " - "for a force set for optimization. Instead, use either " - "set_from_file() or set_quadratic()." + "for a force designated for optimization. " + "Instead, use set_from_file() or set_quadratic()." ) self.type = "static" self.force_init = "Harmonic" self.force_entry = dict(r0=r0, k=k) - def _table_entry(self): + def _table_entry(self) -> dict: table_entry = { "r_min": self.x_min, "r_max": self.x_max, @@ -442,7 +610,21 @@ def _table_entry(self): } return table_entry - def _get_distribution(self, state, gsd_file): + def _get_distribution( + self, + state: msibi.state.State, + gsd_file: str + ) -> np.ndarray: + """Calculate a bond length distribution. + + Parameters + ---------- + state: msibi.state.State, required + State used in calculating the distribution. + gsd_file: str, required + Path to the GSD file used. + + """ return bond_distribution( gsd_file=gsd_file, A_name=self.type1, @@ -457,14 +639,46 @@ def _get_distribution(self, state, gsd_file): class Angle(Force): + """ + Creates an angle force used in query simulations. + + Parameters + ---------- + type1 : str, required + Name of the first particle type in the angle. + This must match the types found in the state's target GSD file. + type2 : str, required + Name of the second particle type in the angle. + This must match the types found in the state's target GSD file. + type3 : str, required + Name of the third particle type in the angle. + This must match the types found in the state's target GSD file. + optimize : bool, required + Set to True if this force is to be mutable and optimized. + Set to False if this force is to be held constant while + other forces are optimized. + nbins : int, optional + This must be a positive integer if this force is being optimized. + nbins is used to setting the potenials independent varible (x) range + and step size (dx). + It is also used in determining the bin size of the target and query + distributions. + + Notes + ----- + The angle type is formed in the order of type1-type2-type3 and must match + the same order in the target GSD file angle types. + + """ + def __init__( self, - type1, - type2, - type3, - optimize, - nbins=None, - head_correction_form="linear" + type1: str, + type2: str, + type3: str, + optimize: bool, + nbins: int=None, + head_correction_form: str="linear" ): self.type1 = type1 self.type2 = type2 @@ -478,34 +692,49 @@ def __init__( head_correction_form=head_correction_form ) - def set_harmonic(self, t0, k): - """Sets a fixed harmonic angle potential. + def set_harmonic(self, t0: Union[float, int], k: Union[float, int]) -> None: + """Set a fixed harmonic angle potential. Using this method is not compatible force msibi.forces.Force objects that are set to be optimized during MSIBI Parameters ---------- - t0 : float, required + t0 : (Union[float, int]), required Equilibrium bond angle - k : float, required + k : (Union[float, int]), required Spring constant + """ if self.optimize: raise RuntimeError( f"Force {self} is set to be optimized during MSIBI." "This potential setter cannot be used " - "for a force set for optimization. Instead, use either " - "set_from_file() or set_quadratic()." + "for a force designated for optimization. " + "Instead, use set_from_file() or set_quadratic()." ) self.type = "static" self.force_init = "Harmonic" self.force_entry = dict(t0=t0, k=k) - def _table_entry(self): + def _table_entry(self) -> dict: table_entry = {"U": self.potential, "tau": self.force} return table_entry - def _get_distribution(self, state, gsd_file): + def _get_distribution( + self, + state: msibi.state.State, + gsd_file: str + ) -> np.ndarray: + """Calculate a bond angle distribution. + + Parameters + ---------- + state: msibi.state.State, required + State used in calculating the distribution. + gsd_file: str, required + Path to the GSD file used. + + """ return angle_distribution( gsd_file=gsd_file, A_name=self.type1, @@ -521,72 +750,190 @@ def _get_distribution(self, state, gsd_file): class Pair(Force): + """ + Creates a pair force used in query simulations. + + Parameters + ---------- + type1 : str, required + Name of the first particle type in the pair. + This must match the types found in the state's target GSD file. + type2 : str, required + Name of the second particle type in the pair. + This must match the types found in the state's target GSD file. + optimize : bool, required + Set to True if this force is to be mutable and optimized. + Set to False if this force is to be held constant while + other forces are optimized. + r_cut : (Union[float, int]) : required + Sets the cutoff distance used in Hoomd's neighborlist. + nbins : int, optional + This must be a positive integer if this force is being optimized. + nbins is used to setting the potenials independent varible (x) range + and step size (dx). + It is also used in determining the bin size of the target and query + distributions. + exclude_bonded : bool, default False + If True, then particles from the same molecule are not + included in the RDF calculation. + If False, all particles are included. + + Notes + ----- + The pair type is sorted so that type1 and type2 + are listed in alphabetical order, and must match the pair + types found in the state's target GSD file bond types. + + For example: `Pair(type1="B", type2="A")` will have `Pair.name = "A-B"` + + """ + def __init__( self, - type1, - type2, - optimize, - exclude_bonded=False, - head_correction_form="linear" + type1: str, + type2: str, + optimize: bool, + r_cut: Union[float, int], + nbins: int=None, + exclude_bonded: bool=False, + head_correction_form: str="linear" ): self.type1, self.type2 = sorted( [type1, type2], key=natural_sort) + self.r_cut = r_cut name = f"{self.type1}-{self.type2}" - self.r_cut = None + # Pair types in hoomd have different naming structure. + self._pair_name = (type1, type2) super(Pair, self).__init__( name=name, optimize=optimize, + nbins=nbins, head_correction_form=head_correction_form ) - def set_lj(self, epsilon, sigma): - """Creates a hoomd 12-6 LJ pair potential used during - the query simulations. This method is not compatible when - optimizing pair potentials. Rather, this method should - only be used to create static pair potentials while optimizing - other potentials. + def set_lj( + self, + r_min: Union[float, int], + r_cut: Union[float, int], + epsilon: Union[float, int], + sigma: Union[float, int], + ) -> None: + """Set a hoomd 12-6 LJ pair potential used during + the query simulations. Parameters ---------- - epsilon : float, required + epsilon : (Union[float, int]), required Sets the dept hof the potential energy well. - sigma : float, required + sigma : (Union[float, int]), required Sets the particle size. - r_cut : float, required + r_cut : (Union[float, int]), required Maximum distance used to calculate neighbor pair potentials. """ - self.type = "static" - self.force_init = "LJ" - self.force_entry = dict(sigma=sigma, epsilon=epsilon) + self.format = "table" + self.dx = (r_cut - r_min) / self.nbins + self.x_range = np.arange(r_min, r_cut + self.dx, self.dx) + self.x_min = self.x_range[0] + self.r_cut = self.x_range[-1] + self.potential = lennard_jones( + r=self.x_range, + epsilon=epsilon, + sigma=sigma + ) + self.force_init = "Table" - def _get_distribution(self, state, gsd_file): - return gsd_rdf( - gsdfile=gsd_file, - A_name=self.type1, - B_name=self.type2, - start=-state.n_frames, - stop=-1, - bins=self.nbins + 1 + def _table_entry(self) -> dict: + table_entry = { + "r_min": self.x_min, + "U": self.potential, + "F": self.force, + } + return table_entry + + def _get_distribution( + self, + state: msibi.state.State, + gsd_file: str + ) -> np.ndarray: + """Calculate a pair distribution. + + Parameters + ---------- + state: msibi.state.State, required + State used in calculating the distribution. + gsd_file: str, required + Path to the GSD file used. + + """ + rdf, N = gsd_rdf( + gsdfile=gsd_file, + A_name=self.type1, + B_name=self.type2, + r_min=self.x_min, + r_max=self.r_cut, + exclude_bonded=state.exclude_bonded, + start=-state.n_frames, + stop=-1, + bins=self.nbins + 1 ) + x = rdf.bin_centers + y = rdf.rdf * N + dist = np.vstack([x, y]) + return dist.T + class Dihedral(Force): + """ + Creates a dihedral force used in query simulations. + + Parameters + ---------- + type1 : str, required + Name of the first particle type in the dihedral. + This must match the types found in the state's target GSD file. + type2 : str, required + Name of the second particle type in the dihedral. + This must match the types found in the state's target GSD file. + type3 : str, required + Name of the third particle type in the dihedral. + This must match the types found in the state's target GSD file. + type4 : str, required + Name of the fourth particle type in the dihedral. + This must match the types found in the state's target GSD file. + optimize : bool, required + Set to True if this force is to be mutable and optimized. + Set to False if this force is to be held constant while + other forces are optimized. + nbins : int, optional + This must be a positive integer if this force is being optimized. + nbins is used to setting the potenials independent varible (x) range + and step size (dx). + It is also used in determining the bin size of the target and query + distributions. + + Notes + ----- + The dihedral type is formed in the order of type1-type2-type3-type4 and + must match the same order in the target GSD file angle types. + + """ + def __init__( self, - type1, - type2, - type3, - type4, - optimize, - nbins=None, - head_correction_form="linear" + type1: str, + type2: str, + type3: str, + type4: str, + optimize: bool, + nbins: int=None, + head_correction_form: str="linear" ): self.type1 = type1 self.type2 = type2 self.type3 = type3 self.type4 = type4 name = f"{self.type1}-{self.type2}-{self.type3}-{self.type4}" - self.table_entry = dict(U=None, tau=None) super(Dihedral, self).__init__( name=name, optimize=optimize, @@ -594,10 +941,14 @@ def __init__( head_correction_form=head_correction_form ) - def set_harmonic(self, phi0, k): - """Sets a fixed harmonic dihedral potential. - Using this method is not compatible force msibi.forces.Force - objects that are set to be optimized during MSIBI + def set_harmonic( + self, + phi0: Union[float, int], + k: Union[float, int], + d: int, + n: int, + ) -> None: + """Set a fixed harmonic dihedral potential. Parameters ---------- @@ -609,23 +960,38 @@ def set_harmonic(self, phi0, k): Sign factor n : int, required Angle scaling factor + """ if self.optimize: raise RuntimeError( f"Force {self} is set to be optimized during MSIBI." "This potential setter cannot be used " - "for a force set for optimization. Instead, use either " - "set_from_file() or set_quadratic()." + "for a force designated for optimization. " + "Instead, use set_from_file() or set_quadratic()." ) self.type = "static" self.force_init = "Periodic" self.force_entry = dict(phi0=phi0, k=k, d=d, n=n) - def _table_entry(self): + def _table_entry(self) -> dict: table_entry = {"U": self.potential, "tau": self.force} return table_entry - def _get_distribution(self, state, gsd_file): + def _get_distribution( + self, + state: msibi.state.State, + gsd_file: str + ) -> np.ndarray: + """Calculate a dihedral distribution. + + Parameters + ---------- + state: msibi.state.State, required + State used in calculating the distribution. + gsd_file: str, required + Path to the GSD file used. + + """ return dihedral_distribution( gsd_file=gsd, A_name=self.type1, diff --git a/msibi/optimize.py b/msibi/optimize.py index 589dd57..ead8e14 100644 --- a/msibi/optimize.py +++ b/msibi/optimize.py @@ -1,47 +1,41 @@ import os +import pickle import shutil +import hoomd +from hoomd.md.methods import ConstantVolume, ConstantPressure import numpy as np import msibi -from msibi.potentials import pair_tail_correction, save_table_potential -from msibi.utils.smoothing import savitzky_golay -from msibi.utils.exceptions import UnsupportedEngine class MSIBI(object): - """Management class for orchestrating an MSIBI optimization. + """ + Management class for orchestrating an MSIBI optimization. Parameters ---------- - nlist : str, required - The type of hoomd neighbor list to use. - When optimizing bonded potentials, using hoomd.md.nlist.tree - may work best for single chain, low density simulations - When optimizing pair potentials hoomd.md.nlist.cell - may work best - integrator_method : str, required - The integrator_method to use in the query simulation. + nlist : hoomd.md.list.NeighborList, required + The type of Hoomd neighbor list to use. + integrator_method : hoomd.md.methods.Method, required + The integrator method to use in the query simulation. + The only supported options are ConstantVolume or ConstantPressure. integrator_kwargs : dict, required - The args and their values required by the integrator chosen + The arguments and their values required by the integrator chosen + thermostat : hoomd.md.methods.thermostat.Thermostat, required + The thermostat to be paired with the integrator method. + thermostat_kwargs : dict, required + The arguments and their values required by the thermostat chosen. dt : float, required The time step delta gsd_period : int, required The number of frames between snapshots written to query.gsd n_steps : int, required How many steps to run the query simulations - r_cut : float, optional, default 0 - Set the r_cut value to use in pair interactions. - Leave as zero if pair interactions aren't being used. nlist_exclusions : list of str, optional, default ["1-2", "1-3"] Sets the pair exclusions used during the optimization simulations seed : int, optional, default 42 Random seed to use during the simulation - backup_trajectories : bool, optional, default False - If False, the query simulation trajectories are - overwritten during each iteraiton. - If True, the query simulations are saved for - each iteration. Attributes ---------- @@ -59,28 +53,39 @@ class MSIBI(object): Methods ------- add_state(msibi.state.state) + Add a state point to be included in optimizing forces. add_force(msibi.forces.Force) Add the required interaction objects. See forces.py - run_optimization(n_iterations, backup_trajectories) + run_optimization(n_iterations, n_steps, backup_trajectories) Performs iterations of query simulations and potential updates resulting in a final optimized potential. + pickle_forces() + Saves a pickle file containing a list of Hoomd force objects + as they existed in the most recent optimization run. """ + def __init__( self, - nlist, - integrator_method, - thermostat, - method_kwargs, - thermostat_kwargs, - dt, - gsd_period, - r_cut, - nlist_exclusions=["bond", "angle"], - seed=42, + nlist: hoomd.md.nlist, + integrator_method: hoomd.md.methods.Method, + thermostat: hoomd.md.methods.thermostats.Thermostat, + method_kwargs: dict, + thermostat_kwargs: dict, + dt: float, + gsd_period: int, + nlist_exclusions: list[str]=["bond", "angle"], + seed: int=42, ): - if nlist not in ["Cell", "Tree", "Stencil"]: - raise ValueError(f"{nlist} is not a valid neighbor list in Hoomd") + if integrator_method not in [ + hoomd.md.methods.ConstantVolume, + hoomd.md.methods.ConstantPressure + ]: + raise ValueError( + "MSIBI is only compatible with NVT " + "(hoomd.md.methods.ConstantVolume), or NPT " + "(hoomd.md.methods.ConstantPressure)" + ) self.nlist = nlist self.integrator_method = integrator_method self.thermostat = thermostat @@ -88,7 +93,6 @@ def __init__( self.thermostat_kwargs = thermostat_kwargs self.dt = dt self.gsd_period = gsd_period - self.r_cut = r_cut self.seed = seed self.nlist_exclusions = nlist_exclusions self.n_iterations = 0 @@ -96,7 +100,7 @@ def __init__( self.forces = [] self._optimize_forces = [] - def add_state(self, state): + def add_state(self, state: msibi.state.State) -> None: """Add a state point to MSIBI.states. Parameters @@ -104,10 +108,11 @@ def add_state(self, state): state : msibi.state.State, required Instance of msibi.state.State """ + #TODO: Do we need this? state._opt = self self.states.append(state) - def add_force(self, force): + def add_force(self, force: msibi.forces.Force) -> None: """Add a force to be included in the query simulations. Parameters @@ -119,6 +124,7 @@ def add_force(self, force): ----- Only one type of force can be optimized at a time. Forces not set to be optimized are held fixed during query simulations. + """ self.forces.append(force) if force.optimize: @@ -158,11 +164,11 @@ def _add_optimize_force(self, force): def run_optimization( self, - n_steps, - n_iterations, - backup_trajectories=False, + n_steps: int, + n_iterations: int, + backup_trajectories: bool=False, _dir=None - ): + ) -> None: """Runs query simulations and performs MSIBI on the potentials set to be optimized. @@ -177,39 +183,126 @@ def run_optimization( are saved in their respective msibi.state.State directory. """ - for n in range(n_iterations): print(f"---Optimization: {n+1} of {n_iterations}---") + forces = self._build_force_objects() for state in self.states: state._run_simulation( n_steps=n_steps, - nlist=self.nlist, - nlist_exclusions=self.nlist_exclusions, + forces=forces, integrator_method=self.integrator_method, method_kwargs=self.method_kwargs, thermostat=self.thermostat, thermostat_kwargs=self.thermostat_kwargs, dt=self.dt, - r_cut=self.r_cut, seed=self.seed, iteration=self.n_iterations, gsd_period=self.gsd_period, - pairs=self.pairs, - bonds=self.bonds, - angles=self.angles, - dihedrals=self.dihedrals, backup_trajectories=backup_trajectories ) self._update_potentials() self.n_iterations += 1 - def _update_potentials(self): + def pickle_forces(self, file_path: str) -> None: + """Save the Hoomd objects for all forces to a single pickle file. + + Parameters + ---------- + file_path : str, required + The path and file name for the pickle file. + + Notes + ----- + Use this method as a convienent way to save and use the final + set of forces in your own Hoomd-Blue script, or with + flowerMD (https://github.com/cmelab/flowerMD). + + """ + forces = self._build_force_objects() + if len(forces) == 0: + raise RuntimeError( + "No forces have been created yet. See MSIBI.add_force()" + ) + f = open(file_path, "wb") + pickle.dump(forces, f) + + def _build_force_objects(self) -> list: + """Creates force objects for query simulations.""" + # Create pair objects + pair_force = None + for pair in self.pairs: + if not pair_force: # Only create hoomd.md.pair obj once + hoomd_pair_force = getattr(hoomd.md.pair, pair.force_init) + if pair.force_init == "Table": + pair_force = hoomd_pair_force( + nlist=self.nlist( + buffer=20, + exclusions=self.nlist_exclusions + ), + ) + else: + pair_force = hoomd_pair_force( + nlist=self.nlist( + buffer=20, + exclusions=self.nlist_exclusions + ), + ) + if pair.format == "table": + pair_force.params[pair._pair_name] = pair._table_entry() + pair_force.r_cut[pair._pair_name] = pair.r_cut + else: + pair_force.params[pair._pair_name] = pair.force_entry + # Create bond objects + bond_force = None + for bond in self.bonds: + if not bond_force: + hoomd_bond_force = getattr(hoomd.md.bond, bond.force_init) + if bond.force_init == "Table": + bond_force = hoomd_bond_force(width=bond.nbins + 1) + else: + bond_force = hoomd_bond_force() + if bond.format == "table": + bond_force.params[bond.name] = bond._table_entry() + else: + bond_force.params[bond.name] = bond.force_entry + # Create angle objects + angle_force = None + for angle in self.angles: + if not angle_force: + hoomd_angle_force = getattr(hoomd.md.angle, angle.force_init) + if angle.force_init == "Table": + angle_force = hoomd_angle_force(width=angle.nbins + 1) + else: + angle_force = hoomd_angle_force() + if angle.format == "table": + angle_force.params[angle.name] = angle._table_entry() + else: + angle_force.params[angle.name] = angle.force_entry + # Create dihedral objects + dihedral_force = None + for dih in self.dihedrals: + if not dihedral_force: + hoomd_dihedral_force = getattr( + hoomd.md.dihedral, dih.force_init + ) + if dih.force_init == "Table": + dihedral_force = hoomd_dihedral_force(width=dih.nbins + 1) + else: + dihedral_force = hoomd_dihedral_force() + if dih.format == "table": + dihedral_force.params[dih.name] = dih._table_entry() + else: + dihedral_force.params[dih.name] = dih.force_entry + forces = [pair_force, bond_force, angle_force, dihedral_force] + return [f for f in forces if f] # Filter out any None values + + def _update_potentials(self) -> None: """Update the potentials for the potentials to be optimized.""" for force in self._optimize_forces: self._recompute_distribution(force) force._update_potential() - def _recompute_distribution(self, force): + def _recompute_distribution(self, force: msibi.forces.Force) -> None: """Recompute the current distribution of bond lengths or angles""" for state in self.states: force._compute_current_distribution(state) diff --git a/msibi/potentials.py b/msibi/potentials.py index ddb9bb7..16ae67e 100644 --- a/msibi/potentials.py +++ b/msibi/potentials.py @@ -5,22 +5,6 @@ from msibi.utils.general import find_nearest -#TODO: Move this to the Force class -def save_table_potential(potential, r, dr, iteration, potential_file): - """Save the length, potential energy,force values to a text file.""" - F = -1.0 * np.gradient(potential, dr) - data = np.vstack([r, potential, F]) - # This file overwritten for each iteration, used during query sim. - np.savetxt(potential_file, data.T) - - if iteration != None: - basename = os.path.basename(potential_file) - basename = "step{0:d}.{1}".format(iteration, basename) - dirname = os.path.dirname(potential_file) - iteration_filename = os.path.join(dirname, basename) - # This file written for viewing evolution of potential. - np.savetxt(iteration_filename, data.T) - def quadratic_spring(x, x0, k4, k3, k2): """Creates a quadratic spring-like potential with the following form @@ -38,7 +22,11 @@ def mie(r, epsilon, sigma, m, n): """The Mie potential functional form""" prefactor = (m / (m - n)) * (m / n) ** (n / (m - n)) V_r = prefactor * epsilon * ((sigma / r) ** m - (sigma / r) ** n) - return V_r + return V_r + + +def lennard_jones(r, epsilon, sigma): + return mie(r=r, epsilon=epsilon, sigma=sigma, m=12, n=6) def pair_tail_correction(r, V, r_switch): diff --git a/msibi/state.py b/msibi/state.py index a674fc7..7d3341a 100644 --- a/msibi/state.py +++ b/msibi/state.py @@ -1,29 +1,28 @@ import os import shutil +from typing import Union import warnings - -import gsd import gsd.hoomd import hoomd -from msibi import MSIBI, utils class State(object): - """A single state used as part of a multistate optimization. + """ + A single state used as part of a multistate optimization. Parameters ---------- name : str State name used in creating state directory space and output files. - kT : float + kT : (Union[float, int]) Unitless heat energy (product of Boltzmann's constant and temperature). - traj_file : path to a gsd.hoomd.HOOMDTrajectory file + traj_file : path to a gsd.hoomd file The gsd trajectory associated with this state. - This trajectory is used to calcualte the target distributions used + This trajectory calcualtes the target distributions used during optimization. - alpha : float, default 1.0 - The alpha value used to scaale the weight of this state. + alpha : (Union[float, int]), default 1.0 + The alpha value used to scale the weight of this state. Attributes ---------- @@ -44,13 +43,12 @@ class State(object): def __init__( self, - name, - kT, - traj_file, - n_frames, - alpha=1.0, - exclude_bonded=True, - target_frames=None, + name: str, + kT: float, + traj_file: str, + n_frames: int, + alpha: float=1.0, + exclude_bonded: bool=True, _dir=None ): self.name = name @@ -72,47 +70,41 @@ def __repr__(self): ) @property - def n_frames(self): + def n_frames(self) -> int: + """The number of frames used in calculating distributions.""" return self._n_frames @n_frames.setter - def n_frames(self, value): + def n_frames(self, value: int): self._n_frames = value @property - def alpha(self): + def alpha(self) -> Union[int, float]: + """State point weighting value.""" return self._alpha @alpha.setter - def alpha(self, value): + def alpha(self, value: float): self._alpha = value def _run_simulation( self, - n_steps, - nlist, - nlist_exclusions, - integrator_method, - method_kwargs, - thermostat, - thermostat_kwargs, - dt, - seed, - r_cut, - iteration, - gsd_period, - pairs=None, - bonds=None, - angles=None, - dihedrals=None, - backup_trajectories=False - ): - """ - Contains the hoomd 4 script used to run each query simulation. + n_steps: int, + forces: list, + integrator_method: str, + method_kwargs: dict, + thermostat: str, + thermostat_kwargs: dict, + dt: float, + seed: int, + iteration: int, + gsd_period: int, + backup_trajectories: bool=False + ) -> None: + """Run the hoomd 4 script used to run each query simulation. This method is called in msibi.optimize. """ - device = hoomd.device.auto_select() sim = hoomd.simulation.Simulation(device=device) print(f"Starting simulation {iteration} for state {self}") @@ -121,74 +113,11 @@ def _run_simulation( with gsd.hoomd.open(self.traj_file, "r") as traj: last_snap = traj[-1] sim.create_state_from_snapshot(last_snap) - nlist = getattr(hoomd.md.nlist, nlist) - # Create pair objects - pair_force = None - for pair in pairs: - if not pair_force: # Only create hoomd.md.pair obj once - hoomd_pair_force = getattr(hoomd.md.pair, pair.force_init) - if pair.force_init == "Table": - pair_force = hoomd_pair_force(width=pair.nbins) - else: - pair_force = hoomd_pair_force( - nlist=nlist(buffer=20, exclusions=nlist_exclusions), - default_r_cut=r_cut - ) - param_name = (pair.name[0], pair.name[-1]) # Can't use pair.name - if pair.format == "table": - pair_force.params[param_name] = pair._table_entry() - else: - pair_force.params[param_name] = pair.force_entry - # Create bond objects - bond_force = None - for bond in bonds: - if not bond_force: - hoomd_bond_force = getattr(hoomd.md.bond, bond.force_init) - if bond.force_init == "Table": - bond_force = hoomd_bond_force(width=bond.nbins + 1) - else: - bond_force = hoomd_bond_force() - if bond.format == "table": - bond_force.params[bond.name] = bond._table_entry() - else: - bond_force.params[bond.name] = bond.force_entry - # Create angle objects - angle_force = None - for angle in angles: - if not angle_force: - hoomd_angle_force = getattr(hoomd.md.angle, angle.force_init) - if angle.force_init == "Table": - angle_force = hoomd_angle_force(width=angle.nbins + 1) - else: - angle_force = hoomd_angle_force() - if angle.format == "table": - angle_force.params[angle.name] = angle._table_entry() - else: - angle_force.params[angle.name] = angle.force_entry - # Create dihedral objects - dihedral_force = None - for dih in dihedrals: - if not dihedral_force: - hoomd_dihedral_force = getattr( - hoomd.md.dihedral, dih.force_init - ) - if dih.force_init == "Table": - dihedral_force = hoomd_dihedral_force(width=dih.nbins + 1) - else: - dihedral_force = hoomd_dihedral_force() - if dih.format == "table": - dihedral_force.params[dih.name] = dih._table_entry() - else: - dihedral_force.params[dih.name] = dih.force_entry - # Create integrator and integration method - forces = [pair_force, bond_force, angle_force, dihedral_force] integrator = hoomd.md.Integrator(dt=dt) - integrator.forces = [f for f in forces if f] # Filter out None - _thermostat = getattr(hoomd.md.methods.thermostats, thermostat) - thermostat = _thermostat(kT=self.kT, **thermostat_kwargs) - method = getattr(hoomd.md.methods, integrator_method) + integrator.forces = forces + thermostat = thermostat(kT=self.kT, **thermostat_kwargs) integrator.methods.append( - method( + integrator_method( filter=hoomd.filter.All(), thermostat=thermostat, **method_kwargs @@ -213,7 +142,7 @@ def _run_simulation( print(f"Finished simulation {iteration} for state {self}") print() - def _setup_dir(self, name, kT, dir_name=None): + def _setup_dir(self, name, kT, dir_name=None) -> str: """Create a state directory each time a new State is created.""" if dir_name is None: if not os.path.isdir("states"): diff --git a/msibi/utils/general.py b/msibi/utils/general.py index 6969eb9..dc4014c 100644 --- a/msibi/utils/general.py +++ b/msibi/utils/general.py @@ -5,7 +5,6 @@ import numpy as np from pkg_resources import resource_filename import gc -import msibi def find_nearest(array, target):