diff --git a/CHANGES b/CHANGES index f09d0fc8..4a2ebdb2 100644 --- a/CHANGES +++ b/CHANGES @@ -13,14 +13,19 @@ The rules for this file: * release numbers follow "Semantic Versioning" https://semver.org ------------------------------------------------------------------------------ -??/??/2022 orbeckst +??/??/2022 orbeckst, xiki-tempula * 1.0.0 Changes - Drop support for py3.7 (Issue #179, PR #214) + - forward_backward_convergence will use AutoMBAR as backend when `MBAR` is + selected as the estimator (PR #114). + - AutoMBAR accepts the `method` argument (PR #114). Enhancements + - Add a base class for workflows (PR #188). + - Add the ABFE workflow (PR #114). Fixes @@ -39,7 +44,6 @@ Changes - remove broken .zip support from util.anyopen() (PR #197) Enhancements - - Add a base class for workflows (PR #188). - Add filter function to gmx.extract to make it more robust (PR #183): can filter incomplete/corrupted lines (#126, #171) with filter=True. - Add support to util.anyopen() for taking filelike objects (PR #197) diff --git a/docs/api_principles.rst b/docs/api_principles.rst index 7abbe7f7..8fc6b613 100644 --- a/docs/api_principles.rst +++ b/docs/api_principles.rst @@ -65,7 +65,8 @@ The library is structured as follows, following a similar style to │   ├── ti_dhdl.py │   └── ... └── workflows ### WORK IN PROGRESS - └── ... + │   ├── base.py + │   ├── abfe.py * The :mod:`~alchemlyb.parsing` submodule contains parsers for individual MD engines, since the output files needed to perform alchemical free energy calculations vary widely and are not standardized. Each module at the very least provides an `extract_u_nk` function for extracting reduced potentials (needed for MBAR), as well as an `extract_dHdl` function for extracting derivatives required for thermodynamic integration. Other helper functions may be exposed for additional processing, such as generating an XVG file from an EDR file in the case of GROMACS. All `extract\_*` functions take similar arguments (a file path, parameters such as temperature), and produce standard outputs (:class:`pandas.DataFrame` for reduced potentials, :class:`pandas.Series` for derivatives). @@ -74,8 +75,8 @@ The library is structured as follows, following a similar style to * The :mod:`~alchemlyb.convergence` submodule features convenience functions/classes for doing convergence analysis using a given dataset and a chosen estimator. * The :mod:`~alchemlyb.postprocessors` submodule contains functions to calculate new quantities or express data in different units. * The :mod:`~alchemlyb.visualisation` submodule contains convenience plotting functions as known from, for example, `alchemical-analysis.py`_. -* The :mod:`~alchemlyb.workflows` submodule will contain complete analysis workflows that will serve as larger building blocks for complex analysis pipelines or a command line interface. - +* The :mod:`~alchemlyb.workflows` submodule contains complete analysis workflows ... + For example, :mod:`alchemlyb.workflows.abfe` implements a complete absolute binding free energy calculation.". All of these components lend themselves well to writing clear and flexible pipelines for processing data needed for alchemical free energy calculations, and furthermore allow for scaling up via libraries like `dask`_ or `joblib`_. diff --git a/docs/workflows.rst b/docs/workflows.rst index 3296edae..be747509 100644 --- a/docs/workflows.rst +++ b/docs/workflows.rst @@ -7,10 +7,16 @@ of the results and step-by-step version that allows more flexibility. For developers, the skeleton of the workflow should follow the example in :class:`alchemlyb.workflows.base.WorkflowBase`. +For users, **alchemlyb** offers a workflow :class:`alchemlyb.workflows.ABFE` +similar to +`Alchemical Analysis `_ +for doing automatic absolute binding free energy (ABFE) analysis. + .. currentmodule:: alchemlyb.workflows .. autosummary:: :toctree: workflows base + ABFE diff --git a/docs/workflows/alchemlyb.workflows.ABFE.rst b/docs/workflows/alchemlyb.workflows.ABFE.rst new file mode 100644 index 00000000..2130f145 --- /dev/null +++ b/docs/workflows/alchemlyb.workflows.ABFE.rst @@ -0,0 +1,161 @@ +The ABFE workflow +================== +The *Absolute binding free energy* (ABFE) workflow provides a complete workflow +that uses the energy files generated by MD engine as input and generates the +binding free energy as well as the analysis plots. + +Fully Automatic analysis +------------------------ +*Absolute binding free energy* (ABFE) calculations can be analyzed with +two lines of code in a fully automated manner (similar to +`Alchemical Analysis `_). +In this case, any parameters are set when invoking :class:`~alchemlyb.workflows.abfe.ABFE` +and reasonable defaults are chosen for any parameters not set explicitly. The two steps +are to + +1. initialize an instance of the :class:`~alchemlyb.workflows.abfe.ABFE` class +2. invoke the :meth:`~alchemlyb.workflows.ABFE.run` method to execute + complete workflow. + +For a GROMACS ABFE simulation, executing the workflow would look similar +to the following code (The log is configured by logger). :: + + >>> from alchemtest.gmx import load_ABFE + >>> from alchemlyb.workflows import ABFE + >>> # Enable the logger + >>> import logging + >>> logging.basicConfig(filename='ABFE.log', level=logging.INFO) + >>> # Obtain the path of the data + >>> import os + >>> dir = os.path.dirname(load_ABFE()['data']['complex'][0]) + >>> print(dir) + 'alchemtest/gmx/ABFE/complex' + >>> workflow = ABFE(units='kcal/mol', software='Gromacs', dir=dir, + >>> prefix='dhdl', suffix='xvg', T=298, outdirectory='./') + >>> workflow.run(skiptime=10, uncorr='dhdl', threshold=50, + >>> methods=('MBAR', 'BAR', 'TI'), overlap='O_MBAR.pdf', + >>> breakdown=True, forwrev=10) + + +The workflow uses the :class:`~alchemlyb.parsing` to parse the data from the +energy files, remove the initial unequilibrated frames and decorrelate the data +with :class:`~alchemlyb.preprocessing.subsampling`. The decorrelated dataset +:ref:`dHdl ` and :ref:`u_nk ` are then passed to +:class:`~alchemlyb.estimators` for free energy estimation. The workflow will +also perform a set of analysis that allows the user to examine the quality of +the estimation. + +File Input +^^^^^^^^^^ + +This command expects the energy files to be structured in two common ways. It +could either be :: + + simulation + ├── lambda_0 + │   ├── prod.xvg + │   └── ... + ├── lambda_1 + │   ├── prod.xvg + │   └── ... + └── ... + +Where :code:`dir='simulation/lambda_*', prefix='prod', suffix='xvg'`. Or :: + + dhdl_files + ├── dhdl_0.xvg + ├── dhdl_1.xvg + └── ... + +Where :code:`dir='dhdl_files', prefix='dhdl_', suffix='xvg'`. + +Output +^^^^^^ + +The workflow returns the free energy estimate using all of +:class:`~alchemlyb.estimators.TI`, :class:`~alchemlyb.estimators.BAR`, +:class:`~alchemlyb.estimators.MBAR`. For ABFE calculations, the alchemical +transformation is usually done is three stages, the *bonded*, *coul* and *vdw* +which corresponds to the free energy contribution from applying the +restraint to restrain the ligand to the protein, decouple/annihilate the +coulombic interaction between the ligand and the protein and +decouple/annihilate the protein-ligand lennard jones interactions. The result +will be stored in :attr:`~alchemlyb.workflows.ABFE.summary` as +:class:`pandas.Dataframe`. :: + + + MBAR MBAR_Error BAR BAR_Error TI TI_Error + States 0 -- 1 0.065967 0.001293 0.066544 0.001661 0.066663 0.001675 + 1 -- 2 0.089774 0.001398 0.089303 0.002101 0.089566 0.002144 + 2 -- 3 0.132036 0.001638 0.132687 0.002990 0.133292 0.003055 + ... + 26 -- 27 1.243745 0.011239 1.245873 0.015711 1.248959 0.015762 + 27 -- 28 1.128429 0.012859 1.124554 0.016999 1.121892 0.016962 + 28 -- 29 1.010313 0.016442 1.005444 0.017692 1.019747 0.017257 + Stages coul 10.215658 0.033903 10.017838 0.041839 10.017854 0.048744 + vdw 22.547489 0.098699 22.501150 0.060092 22.542936 0.106723 + bonded 2.374144 0.014995 2.341631 0.005507 2.363828 0.021078 + TOTAL 35.137291 0.103580 34.860619 0.087022 34.924618 0.119206 + +Output Files +^^^^^^^^^^^^ + +For quality assessment, a couple of plots were generated and written to +the folder specified by `outdirectory`. + +The :ref:`overlay matrix for the MBAR estimator ` will be +plotted and saved to :file:`O_MBAR.pdf`, which examines the overlap between +different lambda windows. + +The :ref:`dHdl for TI ` will be plotted to +:file:`dhdl_TI.pdf`, allows one to examine if the lambda scheduling has +covered the change of the gradient in the lambda space. + +The :ref:`dF states ` will be plotted to :file:`dF_state.pdf` in +portrait model and :file:`dF_state_long.pdf` in landscape model, which +allows the user to example the contributions from each lambda window. + +The forward and backward convergence will be plotted to :file:`dF_t.pdf` using +:class:`~alchemlyb.estimators.MBAR` and saved in +:attr:`~alchemlyb.workflows.ABFE.convergence`, which allows the user to +examine if the simulation time is enough to achieve a converged result. + +Semi-automatic analysis +----------------------- +The same analysis could also performed in steps allowing access and modification +to the data generated at each stage of the analysis. :: + + >>> from alchemtest.gmx import load_ABFE + >>> from alchemlyb.workflows import ABFE + >>> # Obtain the path of the data + >>> import os + >>> dir = os.path.dirname(load_ABFE()['data']['complex'][0]) + >>> print(dir) + 'alchemtest/gmx/ABFE/complex' + >>> # Load the data + >>> workflow = ABFE(software='Gromacs', dir=dir, + >>> prefix='dhdl', suffix='xvg', T=298, outdirectory='./') + >>> # Set the unit. + >>> workflow.update_units('kcal/mol') + >>> # Read the data + >>> workflow.read() + >>> # Decorrelate the data. + >>> workflow.preprocess(skiptime=10, uncorr='dhdl', threshold=50) + >>> # Run the estimator + >>> workflow.estimate(methods=('mbar', 'bar', 'ti')) + >>> # Retrieve the result + >>> summary = workflow.generate_result() + >>> # Plot the overlap matrix + >>> workflow.plot_overlap_matrix(overlap='O_MBAR.pdf') + >>> # Plot the dHdl for TI + >>> workflow.plot_ti_dhdl(dhdl_TI='dhdl_TI.pdf') + >>> # Plot the dF states + >>> workflow.plot_dF_state(dF_state='dF_state.pdf') + >>> # Convergence analysis + >>> workflow.check_convergence(10, dF_t='dF_t.pdf') + +API Reference +------------- +.. autoclass:: alchemlyb.workflows.ABFE + :members: + :inherited-members: \ No newline at end of file diff --git a/src/alchemlyb/convergence/convergence.py b/src/alchemlyb/convergence/convergence.py index 17ce716e..bc6bbdf3 100644 --- a/src/alchemlyb/convergence/convergence.py +++ b/src/alchemlyb/convergence/convergence.py @@ -1,12 +1,15 @@ -import pandas as pd import logging +from warnings import warn + +import pandas as pd import numpy as np -from ..estimators import MBAR, BAR, TI, AutoMBAR +from ..estimators import BAR, TI, FEP_ESTIMATORS, TI_ESTIMATORS +from ..estimators import AutoMBAR as MBAR from .. import concat -def forward_backward_convergence(df_list, estimator='mbar', num=10): +def forward_backward_convergence(df_list, estimator='MBAR', num=10, **kwargs): '''Forward and backward convergence of the free energy estimate. Generate the free energy estimate as a function of time in both directions, @@ -20,10 +23,16 @@ def forward_backward_convergence(df_list, estimator='mbar', num=10): ---------- df_list : list List of DataFrame of either dHdl or u_nk. - estimator : {'mbar', 'bar', 'ti', 'autombar'} + estimator : {'MBAR', 'BAR', 'TI'} Name of the estimators. + See the important note below on the use of "MBAR". + + .. deprecated:: 1.0.0 + Lower case input is also accepted until release 2.0.0. num : int The number of time points. + kwargs : dict + Keyword arguments to be passed to the estimator. Returns ------- @@ -43,29 +52,40 @@ def forward_backward_convergence(df_list, estimator='mbar', num=10): 9 3.044149 0.016405 3.044385 0.016402 1.0 + Note + ----- + :class:`~alchemlyb.estimators.AutoMBAR` is used when ``estimator='MBAR'``, + supply ``method`` keyword to restore the behavior of + :class:`~alchemlyb.estimators.MBAR`. + (:code:`forward_backward_convergence(u_nk, 'MBAR', num=2, method='adaptive')`) + + .. versionadded:: 0.6.0 + .. versionchanged:: 1.0.0 + The ``estimator`` accepts uppercase input. + The default for using ``estimator='MBAR'`` was changed from + :class:`~alchemlyb.estimators.MBAR` to :class:`~alchemlyb.estimators.AutoMBAR`. ''' logger = logging.getLogger('alchemlyb.convergence.' 'forward_backward_convergence') logger.info('Start convergence analysis.') logger.info('Check data availability.') + if estimator.upper() != estimator: + warn("Using lower-case strings for the 'estimator' kwarg in " + "convergence.forward_backward_convergence() is deprecated in " + "1.0.0 and only upper case will be accepted in 2.0.0", + DeprecationWarning) + estimator = estimator.upper() - if estimator.lower() == 'mbar': - logger.info('Use MBAR estimator for convergence analysis.') - estimator_fit = MBAR().fit - elif estimator.lower() == 'autombar': - logger.info('Use AutoMBAR estimator for convergence analysis.') - estimator_fit = AutoMBAR().fit - elif estimator.lower() == 'bar': - logger.info('Use BAR estimator for convergence analysis.') - estimator_fit = BAR().fit - elif estimator.lower() == 'ti': - logger.info('Use TI estimator for convergence analysis.') - estimator_fit = TI().fit + if estimator not in (FEP_ESTIMATORS + TI_ESTIMATORS): + msg = f"Estimator {estimator} is not available in {FEP_ESTIMATORS + TI_ESTIMATORS}." + logger.error(msg) + raise ValueError(msg) else: - raise ValueError( - '{} is not a valid estimator.'.format(estimator)) + # select estimator class by name + estimator_fit = globals()[estimator](**kwargs).fit + logger.info(f'Use {estimator} estimator for convergence analysis.') logger.info('Begin forward analysis') forward_list = [] diff --git a/src/alchemlyb/estimators/__init__.py b/src/alchemlyb/estimators/__init__.py index c05fb93c..ca48015b 100644 --- a/src/alchemlyb/estimators/__init__.py +++ b/src/alchemlyb/estimators/__init__.py @@ -1,3 +1,6 @@ from .mbar_ import MBAR, AutoMBAR from .bar_ import BAR from .ti_ import TI + +FEP_ESTIMATORS = [MBAR.__name__, AutoMBAR.__name__, BAR.__name__] +TI_ESTIMATORS = [TI.__name__] diff --git a/src/alchemlyb/estimators/mbar_.py b/src/alchemlyb/estimators/mbar_.py index e40925b7..832fde09 100644 --- a/src/alchemlyb/estimators/mbar_.py +++ b/src/alchemlyb/estimators/mbar_.py @@ -152,6 +152,17 @@ class AutoMBAR(MBAR): :class:`AutoMBAR` may be useful in high-throughput calculations where it can avoid failures due non-converged MBAR estimates. + Parameters + ---------- + + method : str, optional, default=None + The optimization routine to use. This parameter defaults to ``None``. + When a specific method is set, AutoMBAR will behave in the same way + as MBAR. + + .. versionadded:: 1.0.0 + + Note ---- All arguments are described under :class:`MBAR` except that the solver method @@ -163,30 +174,35 @@ class AutoMBAR(MBAR): .. versionadded:: 0.6.0 + .. versionchanged:: 1.0.0 + AutoMBAR accepts the `method` argument. """ def __init__(self, maximum_iterations=10000, relative_tolerance=1.0e-7, - initial_f_k=None, verbose=False): + initial_f_k=None, verbose=False, method=None): super().__init__(maximum_iterations=maximum_iterations, relative_tolerance=relative_tolerance, initial_f_k=initial_f_k, - verbose=verbose, method=None) + verbose=verbose, method=method) self.logger = logging.getLogger('alchemlyb.estimators.AutoMBAR') def _do_MBAR(self, u_nk, N_k, solver_protocol): - self.logger.info('Initialise the automatic routine of the MBAR ' - 'estimator.') - # Try the fastest method first - try: - self.logger.info('Trying the hybr method.') - solver_protocol["method"] = 'hybr' - mbar, out = super()._do_MBAR(u_nk, N_k, solver_protocol) - except pymbar.utils.ParameterError: + if solver_protocol["method"] is None: + self.logger.info('Initialise the automatic routine of the MBAR ' + 'estimator.') + # Try the fastest method first try: - self.logger.info('Trying the adaptive method.') - solver_protocol["method"] = 'adaptive' + self.logger.info('Trying the hybr method.') + solver_protocol["method"] = 'hybr' mbar, out = super()._do_MBAR(u_nk, N_k, solver_protocol) except pymbar.utils.ParameterError: - self.logger.info('Trying the BFGS method.') - solver_protocol["method"] = 'BFGS' - mbar, out = super()._do_MBAR(u_nk, N_k, solver_protocol) - return mbar, out + try: + self.logger.info('Trying the adaptive method.') + solver_protocol["method"] = 'adaptive' + mbar, out = super()._do_MBAR(u_nk, N_k, solver_protocol) + except pymbar.utils.ParameterError: + self.logger.info('Trying the BFGS method.') + solver_protocol["method"] = 'BFGS' + mbar, out = super()._do_MBAR(u_nk, N_k, solver_protocol) + return mbar, out + else: + return super()._do_MBAR(u_nk, N_k, solver_protocol) diff --git a/src/alchemlyb/tests/test_convergence.py b/src/alchemlyb/tests/test_convergence.py index 7013d119..6520713c 100644 --- a/src/alchemlyb/tests/test_convergence.py +++ b/src/alchemlyb/tests/test_convergence.py @@ -30,7 +30,7 @@ def test_convergence_mbar(gmx_benzene): def test_convergence_autombar(gmx_benzene): dHdl, u_nk = gmx_benzene - convergence = forward_backward_convergence(u_nk, 'AutoMBAR') + convergence = forward_backward_convergence(u_nk, 'MBAR') assert convergence.shape == (10, 5) assert convergence.iloc[0, 0] == pytest.approx(3.02, 0.01) assert convergence.iloc[0, 2] == pytest.approx(3.06, 0.01) @@ -48,5 +48,15 @@ def test_convergence_bar(gmx_benzene): def test_convergence_wrong_estimator(gmx_benzene): dHdl, u_nk = gmx_benzene - with pytest.raises(ValueError, match="{} is not a valid estimator".format("www")): - convergence = forward_backward_convergence(u_nk, 'www') + with pytest.raises(ValueError, match="is not available in"): + forward_backward_convergence(u_nk, 'WWW') + +def test_convergence_wrong_cases(gmx_benzene): + dHdl, u_nk = gmx_benzene + with pytest.warns(DeprecationWarning, match="Using lower-case strings for"): + forward_backward_convergence(u_nk, 'mbar') + +def test_convergence_method(gmx_benzene): + dHdl, u_nk = gmx_benzene + convergence = forward_backward_convergence(u_nk, 'MBAR', num=2, method='adaptive') + assert len(convergence) == 2 diff --git a/src/alchemlyb/tests/test_visualisation.py b/src/alchemlyb/tests/test_visualisation.py index bcaf4a32..d0e34fb2 100644 --- a/src/alchemlyb/tests/test_visualisation.py +++ b/src/alchemlyb/tests/test_visualisation.py @@ -130,7 +130,7 @@ def test_plot_dF_state(): def test_plot_convergence_dataframe(): bz = load_benzene().data data_list = [extract_u_nk(xvg, T=300) for xvg in bz['Coulomb']] - df = forward_backward_convergence(data_list, 'mbar') + df = forward_backward_convergence(data_list, 'MBAR') ax = plot_convergence(df) assert isinstance(ax, matplotlib.axes.Axes) plt.close(ax.figure) diff --git a/src/alchemlyb/tests/test_workflow_ABFE.py b/src/alchemlyb/tests/test_workflow_ABFE.py new file mode 100644 index 00000000..7c4807c1 --- /dev/null +++ b/src/alchemlyb/tests/test_workflow_ABFE.py @@ -0,0 +1,378 @@ +import numpy as np +import pytest +import os + +from alchemlyb.workflows.abfe import ABFE +from alchemtest.gmx import load_ABFE, load_benzene +from alchemtest.amber import load_bace_example + +class Test_automatic_ABFE(): + '''Test the full automatic workflow for load_ABFE from alchemtest.gmx for + three stage transformation.''' + + @staticmethod + @pytest.fixture(scope='session') + def workflow(tmp_path_factory): + outdir = tmp_path_factory.mktemp("out") + dir = os.path.dirname(load_ABFE()['data']['complex'][0]) + workflow = ABFE(units='kcal/mol', software='GROMACS', dir=dir, + prefix='dhdl', suffix='xvg', T=310, outdirectory=str(outdir)) + workflow.run(skiptime=10, uncorr='dhdl', threshold=50, + estimators=('MBAR', 'BAR', 'TI'), overlap='O_MBAR.pdf', + breakdown=True, forwrev=10) + return workflow + + def test_read(self, workflow): + '''test if the files has been loaded correctly.''' + assert len(workflow.u_nk_list) == 30 + assert len(workflow.dHdl_list) == 30 + assert all([len(u_nk) == 1001 for u_nk in workflow.u_nk_list]) + assert all([len(dHdl) == 1001 for dHdl in workflow.dHdl_list]) + + def test_subsample(self, workflow): + '''Test if the data has been shrinked by subsampling.''' + assert len(workflow.u_nk_sample_list) == 30 + assert len(workflow.dHdl_sample_list) == 30 + assert all([len(u_nk) < 1001 for u_nk in workflow.u_nk_sample_list]) + assert all([len(dHdl) < 1001 for dHdl in workflow.dHdl_sample_list]) + + def test_estimator(self, workflow): + '''Test if all three estimators have been used.''' + assert len(workflow.estimator) == 3 + assert 'MBAR' in workflow.estimator + assert 'TI' in workflow.estimator + assert 'BAR' in workflow.estimator + + def test_summary(self, workflow): + '''Test if if the summary is right.''' + summary = workflow.generate_result() + assert np.isclose(summary['MBAR']['Stages']['TOTAL'], 21.8, 0.1) + assert np.isclose(summary['TI']['Stages']['TOTAL'], 21.8, 0.1) + assert np.isclose(summary['BAR']['Stages']['TOTAL'], 21.8, 0.1) + + def test_plot_O_MBAR(self, workflow): + '''test if the O_MBAR.pdf has been plotted.''' + assert os.path.isfile(os.path.join(workflow.out, 'O_MBAR.pdf')) + + def test_plot_dhdl_TI(self, workflow): + '''test if the dhdl_TI.pdf has been plotted.''' + assert os.path.isfile(os.path.join(workflow.out, 'dhdl_TI.pdf')) + + def test_plot_dF_state(self, workflow): + '''test if the dF_state.pdf has been plotted.''' + assert os.path.isfile(os.path.join(workflow.out, 'dF_state.pdf')) + assert os.path.isfile(os.path.join(workflow.out, 'dF_state_long.pdf')) + + def test_check_convergence(self, workflow): + '''test if the dF_state.pdf has been plotted.''' + assert os.path.isfile(os.path.join(workflow.out, 'dF_t.pdf')) + assert len(workflow.convergence) == 10 + + def test_estimator_method(self, workflow, monkeypatch): + '''Test if the method keyword could be passed to the AutoMBAR estimator.''' + monkeypatch.setattr(workflow, 'estimator', + dict()) + workflow.estimate(estimators='MBAR', method='adaptive') + assert 'MBAR' in workflow.estimator + + def test_convergence_method(self, workflow, monkeypatch): + '''Test if the method keyword could be passed to the AutoMBAR estimator from convergence.''' + monkeypatch.setattr(workflow, 'convergence', None) + workflow.check_convergence(2, estimator='MBAR', method='adaptive') + assert len(workflow.convergence) == 2 + +class Test_manual_ABFE(Test_automatic_ABFE): + '''Test the manual workflow for load_ABFE from alchemtest.gmx for three + stage transformation.''' + + @staticmethod + @pytest.fixture(scope='session') + def workflow(tmp_path_factory): + outdir = tmp_path_factory.mktemp("out") + dir = os.path.dirname(load_ABFE()['data']['complex'][0]) + workflow = ABFE(software='GROMACS', dir=dir, prefix='dhdl', + suffix='xvg', T=310, outdirectory=str(outdir)) + workflow.update_units('kcal/mol') + workflow.read() + workflow.preprocess(skiptime=10, uncorr='dhdl', threshold=50) + workflow.estimate(estimators=('MBAR', 'BAR', 'TI')) + workflow.plot_overlap_matrix(overlap='O_MBAR.pdf') + workflow.plot_ti_dhdl(dhdl_TI='dhdl_TI.pdf') + workflow.plot_dF_state(dF_state='dF_state.pdf') + workflow.check_convergence(10, dF_t='dF_t.pdf') + return workflow + + def test_plot_dF_state(self, workflow): + '''test if the dF_state.pdf has been plotted.''' + assert os.path.isfile(os.path.join(workflow.out, 'dF_state.pdf')) + + def test_convergence_nosample_u_nk(self, workflow, monkeypatch): + '''test if the convergence routine would use the unsampled data + when the data has not been subsampled.''' + monkeypatch.setattr(workflow, 'u_nk_sample_list', + None) + workflow.check_convergence(10) + assert len(workflow.convergence) == 10 + + def test_dhdl_TI_noTI(self, workflow, monkeypatch): + '''Test to plot the dhdl_TI when ti estimator is not there''' + no_TI = workflow.estimator + no_TI.pop('TI') + monkeypatch.setattr(workflow, 'estimator', + no_TI) + with pytest.raises(ValueError): + workflow.plot_ti_dhdl(dhdl_TI='dhdl_TI.pdf') + + def test_noMBAR_for_plot_overlap_matrix(self, workflow, monkeypatch): + monkeypatch.setattr(workflow, 'estimator', {}) + assert workflow.plot_overlap_matrix() is None + + def test_no_u_nk_for_check_convergence(self, workflow, monkeypatch): + monkeypatch.setattr(workflow, 'u_nk_list', None) + monkeypatch.setattr(workflow, 'u_nk_sample_list', None) + with pytest.raises(ValueError): + workflow.check_convergence(10, estimator='MBAR') + + def test_no_dHdl_for_check_convergence(self, workflow, monkeypatch): + monkeypatch.setattr(workflow, 'dHdl_list', None) + monkeypatch.setattr(workflow, 'dHdl_sample_list', None) + with pytest.raises(ValueError): + workflow.check_convergence(10, estimator='TI') + + def test_no_update_units(self, workflow): + assert workflow.update_units() is None + + def test_no_name_estimate(self, workflow): + with pytest.raises(ValueError): + workflow.estimate('aaa') + + +class Test_automatic_benzene(): + '''Test the full automatic workflow for load_benzene from alchemtest.gmx for + single stage transformation.''' + + @staticmethod + @pytest.fixture(scope='session') + def workflow(tmp_path_factory): + outdir = tmp_path_factory.mktemp("out") + dir = os.path.dirname(os.path.dirname( + load_benzene()['data']['Coulomb'][0])) + dir = os.path.join(dir, '*') + workflow = ABFE(units='kcal/mol', software='GROMACS', dir=dir, + prefix='dhdl', suffix='bz2', T=310, + outdirectory=outdir) + workflow.run(skiptime=0, uncorr='dhdl', threshold=50, + estimators=('MBAR', 'BAR', 'TI'), overlap='O_MBAR.pdf', + breakdown=True, forwrev=10) + return workflow + + def test_read(self, workflow): + '''test if the files has been loaded correctly.''' + assert len(workflow.u_nk_list) == 5 + assert len(workflow.dHdl_list) == 5 + assert all([len(u_nk) == 4001 for u_nk in workflow.u_nk_list]) + assert all([len(dHdl) == 4001 for dHdl in workflow.dHdl_list]) + + def test_estimator(self, workflow): + '''Test if all three estimators have been used.''' + assert len(workflow.estimator) == 3 + assert 'MBAR' in workflow.estimator + assert 'TI' in workflow.estimator + assert 'BAR' in workflow.estimator + + def test_O_MBAR(self, workflow): + '''test if the O_MBAR.pdf has been plotted.''' + assert os.path.isfile(os.path.join(workflow.out, 'O_MBAR.pdf')) + + def test_dhdl_TI(self, workflow): + '''test if the dhdl_TI.pdf has been plotted.''' + assert os.path.isfile(os.path.join(workflow.out, 'dhdl_TI.pdf')) + + def test_dF_state(self, workflow): + '''test if the dF_state.pdf has been plotted.''' + assert os.path.isfile(os.path.join(workflow.out, 'dF_state.pdf')) + + def test_convergence(self, workflow): + '''test if the dF_state.pdf has been plotted.''' + assert os.path.isfile(os.path.join(workflow.out, 'dF_t.pdf')) + assert len(workflow.convergence) == 10 + +class Test_unpertubed_lambda(): + '''Test the if two lamdas present and one of them is not pertubed. + + fep bound + time fep-lambda bound-lambda + 0.0 0.5 0 12.958159 0 + 10.0 0.5 0 -1.062968 0 + 20.0 0.5 0 1.019020 0 + 30.0 0.5 0 5.029051 0 + 40.0 0.5 0 7.768072 0 + + Where only fep-lambda changes but the bonded-lambda is always 0. + ''' + + @staticmethod + @pytest.fixture(scope='session') + def workflow(tmp_path_factory): + outdir = tmp_path_factory.mktemp("out") + dir = os.path.dirname(os.path.dirname( + load_benzene()['data']['Coulomb'][0])) + dir = os.path.join(dir, '*') + workflow = ABFE(software='GROMACS', dir=dir, prefix='dhdl', + suffix='bz2', T=310, outdirectory=outdir) + workflow.read() + # Block the n_uk + workflow.u_nk_list = [] + # Add another lambda column + for dHdl in workflow.dHdl_list: + dHdl.insert(1, 'bound-lambda', [1.0, ] * len(dHdl)) + dHdl.insert(1, 'bound', [1.0, ] * len(dHdl)) + dHdl.set_index('bound-lambda', append=True, inplace=True) + + workflow.estimate(estimators=('TI', )) + workflow.plot_ti_dhdl(dhdl_TI='dhdl_TI.pdf') + workflow.plot_dF_state(dF_state='dF_state.pdf') + workflow.check_convergence(10, dF_t='dF_t.pdf', estimator='TI') + return workflow + + def test_dhdl_TI(self, workflow): + '''test if the dhdl_TI.pdf has been plotted.''' + assert os.path.isfile(os.path.join(workflow.out, 'dhdl_TI.pdf')) + + def test_dF_state(self, workflow): + '''test if the dF_state.pdf has been plotted.''' + assert os.path.isfile(os.path.join(workflow.out, 'dF_state.pdf')) + + def test_convergence(self, workflow): + '''test if the dF_state.pdf has been plotted.''' + assert os.path.isfile(os.path.join(workflow.out, 'dF_t.pdf')) + assert len(workflow.convergence) == 10 + + def test_single_estimator_ti(self, workflow): + workflow.estimate(estimators='TI') + summary = workflow.generate_result() + assert np.isclose(summary['TI']['Stages']['TOTAL'], 2.946, 0.1) + +class Test_methods(): + '''Test various methods.''' + + @staticmethod + @pytest.fixture(scope='class') + def workflow(tmp_path_factory): + outdir = tmp_path_factory.mktemp("out") + dir = os.path.dirname(os.path.dirname( + load_benzene()['data']['Coulomb'][0])) + dir = os.path.join(dir, '*') + workflow = ABFE(software='GROMACS', dir=dir, prefix='dhdl', + suffix='bz2', T=310, outdirectory=outdir) + workflow.read() + return workflow + + def test_run_none(self, workflow): + '''Don't run anything''' + workflow.run(uncorr=None, estimators=None, overlap=None, breakdown=None, + forwrev=None) + + def test_read_invalid_u_nk(self, workflow, monkeypatch): + def extract_u_nk(self, T): + raise IOError('Error read u_nk.') + monkeypatch.setattr(workflow, '_extract_u_nk', + extract_u_nk) + with pytest.raises(OSError, + match=r'Error reading u_nk .*dhdl\.xvg\.bz2'): + workflow.read() + + def test_read_invalid_dHdl(self, workflow, monkeypatch): + def extract_dHdl(self, T): + raise IOError('Error read dHdl.') + monkeypatch.setattr(workflow, '_extract_dHdl', + extract_dHdl) + with pytest.raises(OSError, + match=r'Error reading dHdl .*dhdl\.xvg\.bz2'): + workflow.read() + + def test_uncorr_threshold(self, workflow, monkeypatch): + '''Test if the full data will be used when the number of data points + are less than the threshold.''' + monkeypatch.setattr(workflow, 'u_nk_list', + [u_nk[:40] for u_nk in workflow.u_nk_list]) + monkeypatch.setattr(workflow, 'dHdl_list', + [dHdl[:40] for dHdl in workflow.dHdl_list]) + workflow.preprocess(threshold=50) + assert all([len(u_nk) == 40 for u_nk in workflow.u_nk_sample_list]) + assert all([len(dHdl) == 40 for dHdl in workflow.dHdl_sample_list]) + + def test_no_u_nk_preprocess(self, workflow, monkeypatch): + monkeypatch.setattr(workflow, 'u_nk_list', []) + workflow.preprocess(threshold=50) + assert len(workflow.u_nk_list) == 0 + + def test_no_dHdl_preprocess(self, workflow, monkeypatch): + monkeypatch.setattr(workflow, 'dHdl_list', []) + workflow.preprocess(threshold=50) + assert len(workflow.dHdl_list) == 0 + + def test_single_estimator_mbar(self, workflow): + workflow.estimate(estimators='MBAR') + assert len(workflow.estimator) == 1 + assert 'MBAR' in workflow.estimator + summary = workflow.generate_result() + assert np.isclose(summary['MBAR']['Stages']['TOTAL'], 2.946, 0.1) + + def test_single_estimator_ti(self, workflow): + workflow.estimate(estimators='TI') + summary = workflow.generate_result() + assert np.isclose(summary['TI']['Stages']['TOTAL'], 2.946, 0.1) + + def test_bar_convergence(self, workflow): + workflow.check_convergence(10, estimator='BAR') + assert len(workflow.convergence) == 10 + + def test_convergence_invalid_estimator(self, workflow): + with pytest.raises(ValueError): + workflow.check_convergence(10, estimator='aaa') + + def test_ti_convergence(self, workflow): + workflow.check_convergence(10, estimator='TI') + assert len(workflow.convergence) == 10 + + def test_unprocessed_n_uk(self, workflow, monkeypatch): + monkeypatch.setattr(workflow, 'u_nk_sample_list', + None) + workflow.estimate() + assert len(workflow.estimator) == 3 + assert 'MBAR' in workflow.estimator + + def test_unprocessed_dhdl(self, workflow, monkeypatch): + monkeypatch.setattr(workflow, 'dHdl_sample_list', + None) + workflow.check_convergence(10, estimator='TI') + assert len(workflow.convergence) == 10 + +class Test_automatic_amber(): + '''Test the full automatic workflow for load_ABFE from alchemtest.amber for + three stage transformation.''' + + @staticmethod + @pytest.fixture(scope='session') + def workflow(tmp_path_factory): + outdir = tmp_path_factory.mktemp("out") + dir, _ = os.path.split( + os.path.dirname(load_bace_example()['data']['complex']['vdw'][0])) + + workflow = ABFE(units='kcal/mol', software='AMBER', dir=dir, + prefix='ti', suffix='bz2', T=310, outdirectory=str( + outdir)) + workflow.read() + workflow.estimate(estimators='TI') + return workflow + + def test_summary(self, workflow): + '''Test if if the summary is right.''' + summary = workflow.generate_result() + assert np.isclose(summary['TI']['Stages']['TOTAL'], 1.40405980473, 0.1) + +def test_no_parser(): + with pytest.raises(NotImplementedError): + workflow = ABFE(units='kcal/mol', software='aaa', + prefix='ti', suffix='bz2', T=310) diff --git a/src/alchemlyb/workflows/__init__.py b/src/alchemlyb/workflows/__init__.py index 5823a467..6b35d460 100644 --- a/src/alchemlyb/workflows/__init__.py +++ b/src/alchemlyb/workflows/__init__.py @@ -1,3 +1,4 @@ __all__ = [ 'base', ] +from .abfe import ABFE diff --git a/src/alchemlyb/workflows/abfe.py b/src/alchemlyb/workflows/abfe.py new file mode 100644 index 00000000..00e3f76a --- /dev/null +++ b/src/alchemlyb/workflows/abfe.py @@ -0,0 +1,723 @@ +import os +from os.path import join +from glob import glob +import pandas as pd +import numpy as np +import logging +import matplotlib.pyplot as plt + +from .base import WorkflowBase +from ..parsing import gmx, amber +from ..preprocessing.subsampling import decorrelate_dhdl, decorrelate_u_nk +from ..estimators import BAR, TI, FEP_ESTIMATORS, TI_ESTIMATORS +from ..estimators import AutoMBAR as MBAR +from ..visualisation import (plot_mbar_overlap_matrix, plot_ti_dhdl, + plot_dF_state, plot_convergence) +from ..postprocessors.units import get_unit_converter +from ..convergence import forward_backward_convergence +from .. import concat +from .. import __version__ + + +class ABFE(WorkflowBase): + '''Workflow for absolute and relative binding free energy calculations. + + This workflow provides functionality similar to the ``alchemical-analysis.py`` script. + It loads multiple input files from alchemical free energy calculations and computes the + free energies between different alchemical windows using different estimators. It + produces plots to aid in the assessment of convergence. + + Parameters + ---------- + T : float + Temperature in K. + units : str + The unit used for printing and plotting results. {'kcal/mol', 'kJ/mol', + 'kT'}. Default: 'kT'. + software : str + The software used for generating input (case-insensitive). {'GROMACS', 'AMBER'}. + This option chooses the appropriate parser for the input file. + dir : str + Directory in which data files are stored. Default: os.path.curdir. + The input files are searched using the pattern of + ``dir + '/**/' + prefix + '*' + suffix``. + prefix : str + Prefix for datafile sets. Default: 'dhdl'. + suffix : str + Suffix for datafile sets. Default: 'xvg'. + outdirectory : str + Directory in which the output files produced by this script will be + stored. Default: os.path.curdir. + ignore_warnings : bool + Turn all errors into warnings. + + Attributes + ---------- + logger : Logger + The logging object. + file_list : list + The list of filenames sorted by the lambda state. + ''' + def __init__(self, T, units='kT', software='GROMACS', dir=os.path.curdir, + prefix='dhdl', suffix='xvg', + outdirectory=os.path.curdir, + ignore_warnings=False): + + super().__init__(units, software, T, outdirectory) + self.ignore_warnings = ignore_warnings + self.logger = logging.getLogger('alchemlyb.workflows.ABFE') + self.logger.info('Initialise Alchemlyb ABFE Workflow') + self.logger.info(f'Alchemlyb Version: f{__version__}') + self.logger.info(f'Set Temperature to {T} K.') + self.logger.info(f'Set Software to {software}.') + + self.update_units(units) + + self.logger.info(f'Finding files with prefix: {prefix}, suffix: ' + f'{suffix} under directory {dir} produced by ' + f'{software}') + self.file_list = glob(dir + '/**/' + prefix + '*' + suffix, + recursive=True) + + self.logger.info(f'Found {len(self.file_list)} xvg files.') + self.logger.info("Unsorted file list: \n %s", '\n'.join( + self.file_list)) + + if software == 'GROMACS': + self.logger.info(f'Using {software} parser to read the data.') + self._extract_u_nk = gmx.extract_u_nk + self._extract_dHdl = gmx.extract_dHdl + elif software == 'AMBER': + self._extract_u_nk = amber.extract_u_nk + self._extract_dHdl = amber.extract_dHdl + else: + raise NotImplementedError(f'{software} parser not found.') + + def read(self): + '''Read the u_nk and dHdL data from the + :attr:`~alchemlyb.workflows.ABFE.file_list` + + Attributes + ---------- + u_nk_list : list + A list of :class:`pandas.DataFrame` of u_nk. + dHdl_list : list + A list of :class:`pandas.DataFrame` of dHdl. + ''' + u_nk_list = [] + dHdl_list = [] + for file in self.file_list: + try: + u_nk = self._extract_u_nk(file, T=self.T) + self.logger.info( + f'Reading {len(u_nk)} lines of u_nk from {file}') + u_nk_list.append(u_nk) + except Exception as exc: + msg = f'Error reading u_nk from {file}.' + if self.ignore_warnings: + self.logger.exception(msg + f'\n{exc}\n' + + 'This exception is being ignored because ignore_warnings=True.') + else: + self.logger.error(msg) + raise OSError(msg) from exc + + try: + dhdl = self._extract_dHdl(file, T=self.T) + self.logger.info( + f'Reading {len(dhdl)} lines of dhdl from {file}') + dHdl_list.append(dhdl) + except Exception as exc: + msg = f'Error reading dHdl from {file}.' + if self.ignore_warnings: + self.logger.exception(msg + f'\n{exc}\n' + + 'This exception is being ignored because ignore_warnings=True.') + else: + self.logger.error(msg) + raise OSError(msg) from exc + + # Sort the files according to the state + self.logger.info('Sort files according to the u_nk.') + column_names = u_nk_list[0].columns.values.tolist() + index_list = sorted(range(len(self.file_list)), + key=lambda x:column_names.index( + u_nk_list[x].reset_index('time').index.values[0])) + + self.file_list = [self.file_list[i] for i in index_list] + self.logger.info("Sorted file list: \n %s", '\n'.join(self.file_list)) + self.u_nk_list = [u_nk_list[i] for i in index_list] + self.dHdl_list = [dHdl_list[i] for i in index_list] + self.u_nk_sample_list = None + self.dHdl_sample_list = None + + def run(self, skiptime=0, uncorr='dhdl', threshold=50, + estimators=('MBAR', 'BAR', 'TI'), overlap='O_MBAR.pdf', + breakdown=True, forwrev=None, *args, **kwargs): + ''' The method for running the automatic analysis. + + Parameters + ---------- + skiptime : float + Discard data prior to this specified time as 'equilibration' data. + Units are specified by the corresponding MD Engine. Default: 0. + uncorr : str + The observable to be used for the autocorrelation analysis; 'dhdl' + (obtained as a sum over those energy components that are changing). + Specify as `None` will not uncorrelate the data. Default: 'dhdl'. + threshold : int + Proceed with correlated samples if the number of uncorrelated samples is + found to be less than this number. If 0 is given, the time series + analysis will not be performed at all. Default: 50. + estimators : str or list of str + A list of the estimators to estimate the free energy with. Default: + `('MBAR', 'BAR', 'TI')`. + overlap : str + The filename for the plot of overlap matrix. Default: 'O_MBAR.pdf'. + breakdown : bool + Plot the free energy differences evaluated for each pair of adjacent + states for all methods, including the dH/dlambda curve for TI. Default: + ``True``. + forwrev : int + Plot the free energy change as a function of time in both directions, + with the specified number of points in the time plot. The number of time + points (an integer) must be provided. Specify as ``None`` will not do + the convergence analysis. Default: None. By default, 'MBAR' + estimator will be used for convergence analysis, as it is + usually the fastest converging method. If the dataset does not + contain u_nk, please run + meth:`~alchemlyb.workflows.ABFE.check_convergence` manually + with estimator='TI'. + + Attributes + ---------- + summary : Dataframe + The summary of the free energy estimate. + convergence : DataFrame + The summary of the convergence results. See + :func:`~alchemlyb.convergence.forward_backward_convergence` for + further explanation. + ''' + self.read() + + if uncorr is not None: + self.preprocess(skiptime=skiptime, uncorr=uncorr, + threshold=threshold) + if estimators is not None: + self.estimate(estimators) + self.generate_result() + + if overlap is not None: + ax = self.plot_overlap_matrix(overlap) + plt.close(ax.figure) + + if breakdown: + ax = self.plot_ti_dhdl() + plt.close(ax.figure) + fig = self.plot_dF_state() + plt.close(fig) + fig = self.plot_dF_state(dF_state='dF_state_long.pdf', + orientation='landscape') + plt.close(fig) + + if forwrev is not None: + ax = self.check_convergence(forwrev, estimator='MBAR', + dF_t='dF_t.pdf') + plt.close(ax.figure) + + + def update_units(self, units=None): + '''Update the unit. + + Parameters + ---------- + units : {'kcal/mol', 'kJ/mol', 'kT'} + The unit used for printing and plotting results. + + ''' + if units is not None: + self.logger.info(f'Set unit to {units}.') + self.units = units or None + + def preprocess(self, skiptime=0, uncorr='dhdl', threshold=50): + '''Preprocess the data by removing the equilibration time and + decorrelate the date. + + Parameters + ---------- + skiptime : float + Discard data prior to this specified time as 'equilibration' data. + Units are specified by the corresponding MD Engine. Default: 0. + uncorr : str + The observable to be used for the autocorrelation analysis; 'dhdl' + (obtained as a sum over those energy components that are changing). + Default: 'dhdl' + threshold : int + Proceed with correlated samples if the number of uncorrelated + samples is found to be less than this number. If 0 is given, the + time series analysis will not be performed at all. Default: 50. + + Attributes + ---------- + u_nk_sample_list : list + The list of u_nk after decorrelation. + dHdl_sample_list : list + The list of dHdl after decorrelation. + ''' + self.logger.info(f'Start preprocessing with skiptime of {skiptime} ' + f'uncorrelation method of {uncorr} and threshold of ' + f'{threshold}') + if len(self.u_nk_list) > 0: + self.logger.info( + f'Processing the u_nk data set with skiptime of {skiptime}.') + + self.u_nk_sample_list = [] + for index, u_nk in enumerate(self.u_nk_list): + # Find the starting frame + + u_nk = u_nk[u_nk.index.get_level_values('time') >= skiptime] + subsample = decorrelate_u_nk(u_nk, uncorr) + + if len(subsample) < threshold: + self.logger.warning(f'Number of u_nk {len(subsample)} ' + f'for state {index} is less than the ' + f'threshold {threshold}.') + self.logger.info(f'Take all the u_nk for state {index}.') + self.u_nk_sample_list.append(u_nk) + else: + self.logger.info(f'Take {len(subsample)} uncorrelated ' + f'u_nk for state {index}.') + self.u_nk_sample_list.append(subsample) + else: + self.logger.info('No u_nk data being subsampled') + + if len(self.dHdl_list) > 0: + self.dHdl_sample_list = [] + for index, dHdl in enumerate(self.dHdl_list): + dHdl = dHdl[dHdl.index.get_level_values('time') >= skiptime] + subsample = decorrelate_dhdl(dHdl) + if len(subsample) < threshold: + self.logger.warning(f'Number of dHdl {len(subsample)} for ' + f'state {index} is less than the ' + f'threshold {threshold}.') + self.logger.info(f'Take all the dHdl for state {index}.') + self.dHdl_sample_list.append(dHdl) + else: + self.logger.info(f'Take {len(subsample)} uncorrelated ' + f'dHdl for state {index}.') + self.dHdl_sample_list.append(subsample) + else: + self.logger.info('No dHdl data being subsampled') + + def estimate(self, estimators=('MBAR', 'BAR', 'TI'), **kwargs): + '''Estimate the free energy using the selected estimator. + + Parameters + ---------- + estimators : str or list of str + A list of the estimators to estimate the free energy with. Default: + ['TI', 'BAR', 'MBAR']. + + kwargs : dict + Keyword arguments to be passed to the estimator. + + Attributes + ---------- + estimator : dict + The dictionary of estimators. The keys are in ['TI', 'BAR', + 'MBAR']. + + Note + ----- + :class:`~alchemlyb.estimators.AutoMBAR` is used when + ``estimators='MBAR'``, supply ``method`` keyword to restore the + behavior of :class:`~alchemlyb.estimators.MBAR`. + (:code:`estimate(estimators='MBAR', method='adaptive')`) + + ''' + # Make estimators into a tuple + if isinstance(estimators, str): + estimators = (estimators, ) + + for estimator in estimators: + if estimator not in (FEP_ESTIMATORS + TI_ESTIMATORS): + msg = f"Estimator {estimator} is not available in {FEP_ESTIMATORS + TI_ESTIMATORS}." + self.logger.error(msg) + raise ValueError(msg) + + self.logger.info( + f"Start running estimator: {','.join(estimators)}.") + self.estimator = {} + # Use unprocessed data if preprocess is not performed. + if 'TI' in estimators: + if self.dHdl_sample_list is not None: + dHdl = concat(self.dHdl_sample_list) + else: + dHdl = concat(self.dHdl_list) + self.logger.warning('dHdl has not been preprocessed.') + self.logger.info( + f'A total {len(dHdl)} lines of dHdl is used.') + + if 'BAR' in estimators or 'MBAR' in estimators: + if self.u_nk_sample_list is not None: + u_nk = concat(self.u_nk_sample_list) + else: + u_nk = concat(self.u_nk_list) + self.logger.warning('u_nk has not been preprocessed.') + self.logger.info( + f'A total {len(u_nk)} lines of u_nk is used.') + + for estimator in estimators: + if estimator == 'MBAR': + self.logger.info('Run MBAR estimator.') + self.estimator[estimator] = MBAR(**kwargs).fit(u_nk) + elif estimator == 'BAR': + self.logger.info('Run BAR estimator.') + self.estimator[estimator] = BAR(**kwargs).fit(u_nk) + elif estimator == 'TI': + self.logger.info('Run TI estimator.') + self.estimator[estimator] = TI(**kwargs).fit(dHdl) + + + def generate_result(self): + '''Summarise the result into a dataframe. + + Returns + ------- + DataFrame + The DataFrame with convergence data. :: + + MBAR MBAR_Error BAR BAR_Error TI TI_Error + States 0 -- 1 0.065967 0.001293 0.066544 0.001661 0.066663 0.001675 + 1 -- 2 0.089774 0.001398 0.089303 0.002101 0.089566 0.002144 + 2 -- 3 0.132036 0.001638 0.132687 0.002990 0.133292 0.003055 + 3 -- 4 0.116494 0.001213 0.116348 0.002691 0.116845 0.002750 + 4 -- 5 0.105251 0.000980 0.106344 0.002337 0.106603 0.002362 + 5 -- 6 0.349320 0.002781 0.343399 0.006839 0.350568 0.007393 + 6 -- 7 0.402346 0.002767 0.391368 0.006641 0.395754 0.006961 + 7 -- 8 0.322284 0.002058 0.319395 0.005333 0.321542 0.005434 + 8 -- 9 0.434999 0.002683 0.425680 0.006823 0.430251 0.007155 + 9 -- 10 0.355672 0.002219 0.350564 0.005472 0.352745 0.005591 + 10 -- 11 3.574227 0.008744 3.513595 0.018711 3.514790 0.018078 + 11 -- 12 2.896685 0.009905 2.821760 0.017844 2.823210 0.018088 + 12 -- 13 2.223769 0.011229 2.188885 0.018438 2.189784 0.018478 + 13 -- 14 1.520978 0.012526 1.493598 0.019155 1.490070 0.019288 + 14 -- 15 0.911279 0.009527 0.894878 0.015023 0.896010 0.015140 + 15 -- 16 0.892365 0.010558 0.886706 0.015260 0.884698 0.015392 + 16 -- 17 1.737971 0.025315 1.720643 0.031416 1.741028 0.030624 + 17 -- 18 1.790706 0.025560 1.788112 0.029435 1.801695 0.029244 + 18 -- 19 1.998635 0.023340 2.007404 0.027447 2.019213 0.027096 + 19 -- 20 2.263475 0.020286 2.265322 0.025023 2.282040 0.024566 + 20 -- 21 2.565680 0.016695 2.561324 0.023611 2.552977 0.023753 + 21 -- 22 1.384094 0.007553 1.385837 0.011672 1.381999 0.011991 + 22 -- 23 1.428567 0.007504 1.422689 0.012524 1.416010 0.013012 + 23 -- 24 1.440581 0.008059 1.412517 0.013125 1.408267 0.013539 + 24 -- 25 1.411329 0.009022 1.419167 0.013356 1.411446 0.013795 + 25 -- 26 1.340320 0.010167 1.360679 0.015213 1.356953 0.015260 + 26 -- 27 1.243745 0.011239 1.245873 0.015711 1.248959 0.015762 + 27 -- 28 1.128429 0.012859 1.124554 0.016999 1.121892 0.016962 + 28 -- 29 1.010313 0.016442 1.005444 0.017692 1.019747 0.017257 + Stages coul 10.215658 0.033903 10.017838 0.041839 10.017854 0.048744 + vdw 22.547489 0.098699 22.501150 0.060092 22.542936 0.106723 + bonded 2.374144 0.014995 2.341631 0.005507 2.363828 0.021078 + TOTAL 35.137291 0.103580 34.860619 0.087022 34.924618 0.119206 + + Attributes + ---------- + summary : Dataframe + The summary of the free energy estimate. + ''' + + # Write estimate + self.logger.info('Summarise the estimate into a dataframe.') + # Make the header name + self.logger.info('Generate the row names.') + estimator_names = list(self.estimator.keys()) + num_states = len(self.estimator[estimator_names[0]].states_) + data_dict = {'name': [], + 'state': []} + for i in range(num_states - 1): + data_dict['name'].append(str(i) + ' -- ' + str(i+1)) + data_dict['state'].append('States') + + try: + u_nk = self.u_nk_list[0] + stages = u_nk.reset_index('time').index.names + self.logger.info('use the stage name from u_nk') + except: + dHdl = self.dHdl_list[0] + stages = dHdl.reset_index('time').index.names + self.logger.info('use the stage name from dHdl') + + for stage in stages: + data_dict['name'].append(stage.split('-')[0]) + data_dict['state'].append('Stages') + data_dict['name'].append('TOTAL') + data_dict['state'].append('Stages') + + col_names = [] + for estimator_name, estimator in self.estimator.items(): + self.logger.info(f'Read the results from estimator {estimator_name}') + + # Do the unit conversion + delta_f_ = estimator.delta_f_ + d_delta_f_ = estimator.d_delta_f_ + # Write the estimator header + + col_names.append(estimator_name) + col_names.append(estimator_name + '_Error') + data_dict[estimator_name] = [] + data_dict[estimator_name + '_Error'] = [] + for index in range(1, num_states): + data_dict[estimator_name].append( + delta_f_.iloc[index-1, index]) + data_dict[estimator_name + '_Error'].append( + d_delta_f_.iloc[index - 1, index]) + + self.logger.info(f'Generate the staged result from estimator {estimator_name}') + for index, stage in enumerate(stages): + if len(stages) == 1: + start = 0 + end = len(estimator.states_) - 1 + else: + # Get the start and the end of the state + lambda_min = min( + [state[index] for state in estimator.states_]) + lambda_max = max( + [state[index] for state in estimator.states_]) + if lambda_min == lambda_max: + # Deal with the case where a certain lambda is used but + # not perturbed + start = 0 + end = 0 + else: + states = [state[index] for state in estimator.states_] + start = list(reversed(states)).index(lambda_min) + start = num_states - start - 1 + end = states.index(lambda_max) + self.logger.info( + f'Stage {stage} is from state {start} to state {end}.') + # This assumes that the indexes are sorted as the + # preprocessing should sort the index of the df. + result = delta_f_.iloc[start, end] + if estimator_name != 'BAR': + error = d_delta_f_.iloc[start, end] + else: + error = np.sqrt(sum( + [d_delta_f_.iloc[start, start+1]**2 + for i in range(start, end + 1)])) + data_dict[estimator_name].append(result) + data_dict[estimator_name + '_Error'].append(error) + + # Total result + # This assumes that the indexes are sorted as the + # preprocessing should sort the index of the df. + result = delta_f_.iloc[0, -1] + if estimator_name != 'BAR': + error = d_delta_f_.iloc[0, -1] + else: + error = np.sqrt(sum( + [d_delta_f_.iloc[i, i + 1] ** 2 + for i in range(num_states - 1)])) + data_dict[estimator_name].append(result) + data_dict[estimator_name + '_Error'].append(error) + summary = pd.DataFrame.from_dict(data_dict) + + summary = summary.set_index(['state', 'name']) + # Make sure that the columns are in the right order + summary = summary[col_names] + # Remove the name of the index column to make it prettier + summary.index.names = [None, None] + + summary.attrs = estimator.delta_f_.attrs + converter = get_unit_converter(self.units) + summary = converter(summary) + self.summary = summary + self.logger.info(f'Write results:\n{summary.to_string()}') + return summary + + def plot_overlap_matrix(self, overlap='O_MBAR.pdf', ax=None): + '''Plot the overlap matrix for MBAR estimator using + :func:`~alchemlyb.visualisation.plot_mbar_overlap_matrix`. + + Parameters + ---------- + overlap : str + The filename for the plot of overlap matrix. Default: 'O_MBAR.pdf' + ax : matplotlib.axes.Axes + Matplotlib axes object where the plot will be drawn on. If + ``ax=None``, a new axes will be generated. + + Returns + ------- + matplotlib.axes.Axes + An axes with the overlap matrix drawn. + ''' + self.logger.info('Plot overlap matrix.') + if 'MBAR' in self.estimator: + ax = plot_mbar_overlap_matrix(self.estimator['MBAR'].overlap_matrix, + ax=ax) + ax.figure.savefig(join(self.out, overlap)) + self.logger.info(f'Plot overlap matrix to {self.out} under {overlap}.') + return ax + else: + self.logger.warning('MBAR estimator not found. ' + 'Overlap matrix not plotted.') + + def plot_ti_dhdl(self, dhdl_TI='dhdl_TI.pdf', labels=None, colors=None, + ax=None): + '''Plot the dHdl for TI estimator using + :func:`~alchemlyb.visualisation.plot_ti_dhdl`. + + Parameters + ---------- + dhdl_TI : str + The filename for the plot of TI dHdl. Default: 'dhdl_TI.pdf' + labels : List + list of labels for labelling all the alchemical transformations. + colors : List + list of colors for plotting all the alchemical transformations. + Default: ['r', 'g', '#7F38EC', '#9F000F', 'b', 'y'] + ax : matplotlib.axes.Axes + Matplotlib axes object where the plot will be drawn on. If ``ax=None``, + a new axes will be generated. + + Returns + ------- + matplotlib.axes.Axes + An axes with the TI dhdl drawn. + ''' + self.logger.info('Plot TI dHdl.') + if 'TI' in self.estimator: + ax = plot_ti_dhdl(self.estimator['TI'], units=self.units, + labels=labels, colors=colors, ax=ax) + ax.figure.savefig(join(self.out, dhdl_TI)) + self.logger.info(f'Plot TI dHdl to {dhdl_TI} under {self.out}.') + return ax + else: + raise ValueError('No TI data available in estimators.') + + def plot_dF_state(self, dF_state='dF_state.pdf', labels=None, colors=None, + orientation='portrait', nb=10): + '''Plot the dF states using + :func:`~alchemlyb.visualisation.plot_dF_state`. + + Parameters + ---------- + dF_state : str + The filename for the plot of dF states. Default: 'dF_state.pdf' + labels : List + list of labels for labelling different estimators. + colors : List + list of colors for plotting different estimators. + orientation : string + The orientation of the figure. Can be `portrait` or `landscape` + nb : int + Maximum number of dF states in one row in the `portrait` mode + + Returns + ------- + matplotlib.figure.Figure + An Figure with the dF states drawn. + ''' + self.logger.info('Plot dF states.') + fig = plot_dF_state(self.estimator.values(), labels=labels, colors=colors, + units=self.units, + orientation=orientation, nb=nb) + fig.savefig(join(self.out, dF_state)) + self.logger.info(f'Plot dF state to {dF_state} under {self.out}.') + return fig + + def check_convergence(self, forwrev, estimator='MBAR', dF_t='dF_t.pdf', + ax=None, **kwargs): + '''Compute the forward and backward convergence using + :func:`~alchemlyb.convergence.forward_backward_convergence`and + plot with + :func:`~alchemlyb.visualisation.plot_convergence`. + + Parameters + ---------- + forwrev : int + Plot the free energy change as a function of time in both + directions, with the specified number of points in the time plot. + The number of time points (an integer) must be provided. + estimator : {'TI', 'BAR', 'MBAR'} + The estimator used for convergence analysis. Default: 'MBAR' + dF_t : str + The filename for the plot of convergence. Default: 'dF_t.pdf' + ax : matplotlib.axes.Axes + Matplotlib axes object where the plot will be drawn on. If ``ax=None``, + a new axes will be generated. + kwargs : dict + Keyword arguments to be passed to the estimator. + + Attributes + ---------- + convergence : DataFrame + + Returns + ------- + matplotlib.axes.Axes + An axes with the convergence drawn. + + Note + ----- + :class:`~alchemlyb.estimators.AutoMBAR` is used when + ``estimator='MBAR'``, supply ``method`` keyword to restore the behavior + of :class:`~alchemlyb.estimators.MBAR`. + (:code:`check_convergence(10, estimator='MBAR', method='adaptive')`) + + ''' + self.logger.info('Start convergence analysis.') + self.logger.info('Checking data availability.') + + if estimator in FEP_ESTIMATORS: + if self.u_nk_sample_list is not None: + u_nk_list = self.u_nk_sample_list + self.logger.info('Subsampled u_nk is available.') + else: + if self.u_nk_list is not None: + u_nk_list = self.u_nk_list + self.logger.info('Subsampled u_nk not available, ' + 'use original data instead.') + else: + msg = f"u_nk is needed for the f{estimator} estimator. " \ + f"If the dataset only has dHdl, " \ + f"run ABFE.check_convergence(estimator='TI') to " \ + f"use a TI estimator." + self.logger.error(msg) + raise ValueError(msg) + convergence = forward_backward_convergence(u_nk_list, + estimator=estimator, + num=forwrev, **kwargs) + elif estimator in TI_ESTIMATORS: + self.logger.warning('No valid FEP estimator or dataset found. ' + 'Fallback to TI.') + if self.dHdl_sample_list is not None: + dHdl_list = self.dHdl_sample_list + self.logger.info('Subsampled dHdl is available.') + else: + if self.dHdl_list is not None: + dHdl_list = self.dHdl_list + self.logger.info('Subsampled dHdl not available, ' + 'use original data instead.') + else: + self.logger.error( + f'dHdl is needed for the f{estimator} estimator.') + raise ValueError( + f'dHdl is needed for the f{estimator} estimator.') + convergence = forward_backward_convergence(dHdl_list, + estimator=estimator, + num=forwrev, **kwargs) + else: + msg = f"Estimator {estimator} is not supported. Choose one from " \ + f"{FEP_ESTIMATORS+TI_ESTIMATORS}." + self.logger.error(msg) + raise ValueError(msg) + + self.convergence = get_unit_converter(self.units)(convergence) + + self.logger.info(f'Plot convergence analysis to {dF_t} under {self.out}.') + + ax = plot_convergence(self.convergence, + units=self.units, ax=ax) + ax.figure.savefig(join(self.out, dF_t)) + return ax