Source code for a3fe.run.stage

"""Functions for running free energy calculations with SOMD with automated
equilibration detection based on an ensemble of simulations."""

__all__ = ["Stage"]

import logging as _logging
import os as _os
import pathlib as _pathlib
import threading as _threading
from copy import deepcopy as _deepcopy
from math import ceil as _ceil
from multiprocessing import get_context as _get_context
from time import sleep as _sleep
from typing import Any as _Any
from typing import Callable as _Callable
from typing import Dict as _Dict
from typing import List as _List
from typing import Optional as _Optional
from typing import Tuple as _Tuple
from typing import Union as _Union

import matplotlib.pyplot as _plt
import numpy as _np
import pandas as _pd
import scipy.stats as _stats

from ..analyse.detect_equil import (
    check_equil_multiwindow_gelman_rubin as _check_equil_multiwindow_gelman_rubin,
)
from ..analyse.detect_equil import (
    check_equil_multiwindow_paired_t as _check_equil_multiwindow_paired_t,
)
from ..analyse.detect_equil import (
    dummy_check_equil_multiwindow as _dummy_check_equil_multiwindow,
)
from ..analyse.mbar import collect_mbar_slurm as _collect_mbar_slurm
from ..analyse.mbar import run_mbar as _run_mbar
from ..analyse.mbar import submit_mbar_slurm as _submit_mbar_slurm
from ..analyse.plot import plot_convergence as _plot_convergence
from ..analyse.plot import plot_equilibration_time as _plot_equilibration_time
from ..analyse.plot import plot_gradient_hists as _plot_gradient_hists
from ..analyse.plot import plot_gradient_stats as _plot_gradient_stats
from ..analyse.plot import plot_gradient_timeseries as _plot_gradient_timeseries
from ..analyse.plot import (
    plot_mbar_gradient_convergence as _plot_mbar_gradient_convergence,
)
from ..analyse.plot import plot_mbar_pmf as _plot_mbar_pmf
from ..analyse.plot import plot_overlap_mats as _plot_overlap_mats
from ..analyse.plot import plot_rmsds as _plot_rmsds
from ..analyse.plot import plot_sq_sem_convergence as _plot_sq_sem_convergence
from ..analyse.process_grads import GradientData as _GradientData
from ..configuration import EngineType as _EngineType
from ..configuration import SlurmConfig as _SlurmConfig
from ..configuration import StageType as _StageType
from ..configuration import _EngineConfig
from ._simulation_runner import SimulationRunner as _SimulationRunner
from ._virtual_queue import VirtualQueue as _VirtualQueue
from .lambda_window import LamWindow as _LamWindow


[docs]class Stage(_SimulationRunner): """ Class to hold and manipulate an ensemble of SOMD simulations for a single stage of a calculation. """ # Files to be cleaned by self.clean() run_files = _SimulationRunner.run_files + [ "check_equil_multiwindow*.txt", "freenrg-MBAR*.dat", "*.out", ] runtime_attributes = _deepcopy(_SimulationRunner.runtime_attributes) runtime_attributes["_maximally_efficient"] = False # A thread-safe counter for e.g. naming unique files. _counter = _get_context("spawn").Value("i", 0)
[docs] def __init__( self, stage_type: _StageType, equil_detection: str = "multiwindow", runtime_constant: _Optional[float] = 0.0005, relative_simulation_cost: float = 1, ensemble_size: int = 5, base_dir: _Optional[str] = None, input_dir: _Optional[str] = None, output_dir: _Optional[str] = None, stream_log_level: int = _logging.INFO, slurm_config: _Optional[_SlurmConfig] = None, analysis_slurm_config: _Optional[_SlurmConfig] = None, engine_config: _Optional[_EngineConfig] = None, engine_type: _EngineType = _EngineType.SOMD, update_paths: bool = True, ) -> None: """ Initialise an ensemble of SOMD simulations, constituting the Stage. If Stage.pkl exists in the output directory, the Stage will be loaded from this file and any arguments supplied will be overwritten. Parameters ---------- stage_type : StageType The type of stage. equil_detection : str, Optional, default: "multiwindow" Method to use for equilibration detection. Options are: - "multiwindow": Use the multiwindow paired t-test method to detect equilibration. - "chodera": Use Chodera's method to detect equilibration. runtime_constant : float, Optional, default: 0.0005 The runtime_constant (kcal**2 mol**-2 ns*-1) only affects behaviour if running adaptively, and must be supplied if running adaptively. This is used to calculate how long to run each simulation for based on the current uncertainty of the per-window free energy estimate, as discussed in the docstring of the run() method. relative_simlation_cost : float, Optional, default: 1 The relative cost of the simulation for a given runtime. This is used to calculate the predicted optimal runtime during adaptive simulations. The recommended use is to set this to 1 for the bound leg and to (speed of bound leg / speed of free leg) for the free leg. ensemble_size : int, Optional, default: 5 Number of simulations to run in the ensemble. base_dir : str, Optional, default: None Path to the base directory. If None, this is set to the current working directory. input_dir : str, Optional, default: None Path to directory containing input files for the simulations. If None, this will be set to "current_working_directory/input". output_dir : str, Optional, default: None Path to directory to store output files from the simulations. If None, this will be set to "current_working_directory/output". stream_log_level : int, Optional, default: logging.INFO Logging level to use for the steam file handlers for the Ensemble object and its child objects. slurm_config: SlurmConfig, default: None Configuration for the SLURM job scheduler. If None, the default partition is used. analysis_slurm_config: SlurmConfig, default: None Configuration for the SLURM job scheduler for the analysis. This is helpful e.g. if you want to submit analysis to the CPU partition, but the main simulation to the GPU partition. If None, engine_config: EngineConfig, default: None Configuration for the engine. If None, the default configuration is used. engine_type: EngineType, default: EngineType.SOMD The type of engine to use for the production simulations. update_paths: bool, Optional, default: True If True, if the simulation runner is loaded by unpickling, then update_paths() is called. Returns ------- None """ # Set the stage type first, as this is required for __str__, # and therefore the super().__init__ call self.stage_type = stage_type super().__init__( base_dir=base_dir, input_dir=input_dir, output_dir=output_dir, stream_log_level=stream_log_level, slurm_config=slurm_config, analysis_slurm_config=analysis_slurm_config, engine_config=engine_config, engine_type=engine_type, ensemble_size=ensemble_size, update_paths=update_paths, dump=False, ) if not self.loaded_from_pickle: self.equil_detection = equil_detection self.runtime_constant = runtime_constant self.relative_simulation_cost = relative_simulation_cost self._maximally_efficient = False # Set to True if the stage has been run so as to reach max efficieny self._running: bool = False self.run_thread: _Optional[_threading.Thread] = None # Set boolean to allow us to kill the thread self.kill_thread: bool = False self.running_wins: _List[_LamWindow] = [] self.virtual_queue = _VirtualQueue( log_dir=self.base_dir, stream_log_level=self.stream_log_level ) # Creating lambda window objects sets up required input directories lam_val_weights = self.lam_val_weights for i, lam_val in enumerate(self.lam_vals): lam_base_dir = _os.path.join(self.output_dir, f"lambda_{lam_val:.3f}") self.lam_windows.append( _LamWindow( lam=lam_val, lam_val_weight=lam_val_weights[i], virtual_queue=self.virtual_queue, equil_detection=self.equil_detection, runtime_constant=self.runtime_constant, relative_simulation_cost=self.relative_simulation_cost, ensemble_size=self.ensemble_size, base_dir=lam_base_dir, input_dir=self.input_dir, stream_log_level=self.stream_log_level, slurm_config=self.slurm_config, analysis_slurm_config=self.analysis_slurm_config, engine_config=self.engine_config.copy(), engine_type=self.engine_type, ) ) # Point self._sub_sim_runners at the lambda windows self._sub_sim_runners = self.lam_windows # Save the state and update log self._update_log() self._dump()
def __str__(self) -> str: return f"Stage (type = {self.stage_type.name.lower()})" @property def lam_vals(self) -> _List[float]: return self.engine_config.lambda_values @lam_vals.setter def lam_vals(self, value) -> None: self._logger.info("Modifying/ creating lambda values") self.engine_config.lambda_values = value @property def lam_windows(self) -> _List[_LamWindow]: return self._sub_sim_runners @lam_windows.setter def lam_windows(self, value) -> None: self._logger.info("Modifying/ creating lambda windows") self._sub_sim_runners = value @property def lam_val_weights(self) -> _List[float]: """Return the weights for each lambda window. These are calculated according to how each windows contributes to the overall free energy estimate, as given by TI and the trapezoidal rule.""" lam_val_weights = [] for i, lam_val in enumerate(self.lam_vals): if i == 0: lam_val_weights.append(0.5 * (self.lam_vals[i + 1] - lam_val)) elif i == len(self.lam_vals) - 1: lam_val_weights.append(0.5 * (lam_val - self.lam_vals[i - 1])) else: lam_val_weights.append( 0.5 * (self.lam_vals[i + 1] - self.lam_vals[i - 1]) ) return lam_val_weights
[docs] def run( self, run_nos: _Optional[_List[int]] = None, adaptive: bool = True, runtime: _Optional[float] = None, runtime_constant: _Optional[float] = None, ) -> None: """Run the ensemble of simulations constituting the stage (optionally with adaptive equilibration detection), and, if using adaptive equilibration detection, perform analysis once finished. If running adaptively, cycles of short runs then optimal runtime estimation are performed, where the optimal runtime is estimated according to .. math:: t_{\\mathrm{Optimal, k}} = \\sqrt{\\frac{t_{\\mathrm{Current}, k}}{C}}\\sigma_{\\mathrm{Current}}(\\Delta \\widehat{F}_k) where: - :math:`t_{\\mathrm{Optimal, k}}` is the calculated optimal runtime for lambda window :math:`k` - :math:`t_{\\mathrm{Current}, k}` is the current runtime for lambda window :math:`k` - :math:`C` is the runtime constant - :math:`\\sigma_{\\mathrm{Current}}(\\Delta \\widehat{F}_k)` is the current uncertainty in the free energy change contribution for lambda window :math:`k`. This is estimated from inter-run deviations. - :math:`\\Delta \\widehat{F}_k` is the free energy change contribution for lambda window :math:`k` Parameters ---------- adaptive : bool, Optional, default: True If True, the stage will run until the simulations are equilibrated and perform analysis afterwards. If False, the stage will run for the specified runtime and analysis will not be performed. runtime : float, Optional, default: None If adaptive is False, runtime must be supplied and stage will run for this number of nanoseconds. runtime_constant: float, Optional, default: None The runtime_constant (kcal**2 mol**-2 ns*-1) only affects behaviour if running adaptively. This is used to calculate how long to run each simulation for based on the current uncertainty of the per-window free energy estimate. Returns ------- None """ run_nos = self._get_valid_run_nos(run_nos) if not adaptive and runtime is None: raise ValueError( "If adaptive equilibration detection is disabled, a runtime must be supplied." ) # Check adaptive and runtime settings if adaptive and runtime is not None: raise ValueError( "If adaptive equilibration detection is enabled, a runtime cannot be supplied." ) if not adaptive and runtime_constant: raise ValueError( "Runtime constant can only be supplied if running adaptively." ) if runtime_constant: self.recursively_set_attr("runtime_constant", runtime_constant, silent=True) # Run in the background with threading so that user can continuously check # the status of the Stage object self.run_thread = _threading.Thread( target=self._run_without_threading, args=(run_nos, adaptive, runtime), name=str(self), ) self.run_thread.start()
[docs] def kill(self) -> None: """Kill all running simulations.""" # Stop the run loop if self.running: self._logger.info("Killing all lambda windows") self.kill_thread = True for win in self.lam_windows: win.kill()
def _run_without_threading( self, run_nos: _List[int], adaptive: bool = True, runtime: _Optional[float] = None, max_runtime: float = 60, ) -> None: """Run the ensemble of simulations constituting the stage (optionally with adaptive equilibration detection), and, if using adaptive equilibration detection, perform analysis once finished. This function is called by run() with threading, so that the function can be run in the background and the user can continuously check the status of the Stage object. Parameters ---------- run_nos : List[int] The run numbers to run. adaptive : bool, Optional, default: True If True, the stage will run until the simulations are equilibrated and perform analysis afterwards. If False, the stage will run for the specified runtime and analysis will not be performed. runtime : float, Optional, default: None If adaptive is False, runtime must be supplied and stage will run for this number of nanoseconds. max_runtime : float, Optional, default: 30 The maximum runtime for a single simulation during an adaptive simulation, in ns. Only used when adaptive == True. Returns ------- None """ try: # Reset self.kill_thread so we can restart after killing self.kill_thread = False if not adaptive and runtime is None: raise ValueError( "If adaptive equilibration detection is disabled, a runtime must be supplied." ) if adaptive and runtime is not None: raise ValueError( "If adaptive equilibration detection is enabled, a runtime cannot be supplied." ) if not adaptive: self._logger.info( f"Starting {self}. Adaptive equilibration = {adaptive}..." ) elif adaptive: self._logger.info( f"Starting {self}. Adaptive equilibration = {adaptive}..." ) if runtime is None: runtime = 0.2 # ns # Run initial SOMD simulations for win in self.lam_windows: win.run(run_nos=run_nos, runtime=runtime) # type: ignore win._update_log() self._dump() # Periodically check the simulations and analyse/ resubmit as necessary # Copy to ensure that we don't modify self.lam_windows when updating self.running_wins self.running_wins = self.lam_windows.copy() self._dump() # Run the appropriate run loop if adaptive: # Allocate simulation time to achieve maximum efficiency self._run_loop_adaptive_efficiency( run_nos=run_nos, max_runtime=max_runtime ) # Check that equilibration has been achieved and resubmit if required self._run_loop_adaptive_equilibration_multiwindow( run_nos=run_nos, max_runtime=max_runtime ) else: self._run_loop_non_adaptive() # All simulations are now finished, so perform final analysis self._logger.info(f"All simulations in {self} have finished.") except Exception as e: self._logger.exception("") raise e def _run_loop_non_adaptive(self, cycle_pause: int = 60) -> None: """The run loop for non-adaptive runs. Simply wait until all lambda windows have finished running. Parameters ---------- cycle_pause : int, Optional, default: 60 The number of seconds to wait between checking the status of the simulations. """ while self.running_wins: _sleep(cycle_pause) # Check every 60 seconds # Check if we've requested to kill the thread if self.kill_thread: self._logger.info("Kill thread requested: exiting run loop") return # Update the queue before checking the simulations self.virtual_queue.update() # Check if everything has finished for win in self.running_wins: # Check if the window has now finished - calling win.running updates the win._running attribute if not win.running: self._logger.info(f"{win} has finished at {win.tot_simtime:.3f} ns") self.running_wins.remove(win) # Write status after checking for running and equilibration, as the # _running and _equilibrated attributes have now been updated win._update_log() self._dump() def _run_loop_adaptive_efficiency( self, run_nos: _List[int], cycle_pause: int = 60, max_runtime: float = 30, # ns ) -> None: """ Run loop which allocates sampling time in order to achieve maximal estimation efficiency of the free energy difference. Parameters ---------- run_nos : List[int] The run numbers to run. cycle_pause : int, Optional, default: 60 The number of seconds to wait between checking the status of the simulations. max_runtime : float, Optional, default: 30 The maximum runtime for a single simulation during an adaptive simulation, in ns. """ while not self._maximally_efficient: self._logger.info( "Maximum efficiency for given runtime constant not achieved." " Allocating simulation time to achieve maximum efficiency..." ) # Firstly, wait til all window have finished while self.running_wins: _sleep(cycle_pause) # Check every 60 seconds # Check if we've requested to kill the thread if self.kill_thread: self._logger.info("Kill thread requested: exiting run loop") return # Update the queue before checking the simulations self.virtual_queue.update() for win in self.running_wins: if not win.running: self.running_wins.remove(win) # Write status after checking for running and equilibration, as the # _running and _equilibrated attributes have now been updated win._update_log() self._dump() # Now all windows have finished, check if we have reached the maximum # efficiency and resubmit if not # Get the gradient data and extract the per-lambda SEM of the free energy change gradient_data = _GradientData( lam_winds=self.lam_windows, equilibrated=False ) smooth_dg_sems = gradient_data.get_time_normalised_sems( origin="inter_delta_g", smoothen=True ) # For each window, calculate the predicted most efficient run time. See if we have reached this # and if not, resubmit for more simulation time for i, win in enumerate(self.lam_windows): # Note that normalised_sem_dg has already been multiplied by sqrt(tot_simtime) normalised_sem_dg = smooth_dg_sems[i] predicted_run_time_max_eff = ( 1 / _np.sqrt(self.runtime_constant * self.relative_simulation_cost) # type: ignore ) * normalised_sem_dg actual_run_time = win.get_tot_simtime(run_nos=run_nos) win._logger.info( f"Predicted maximum efficiency run time for is {predicted_run_time_max_eff:.3f} ns" ) win._logger.info(f"Actual run time is {actual_run_time} ns") # Make sure that we don't exceed the maximum per-simulation runtime if predicted_run_time_max_eff > max_runtime * win.ensemble_size: win._logger.info( f"Predicted maximum efficiency run time per window is " f"{predicted_run_time_max_eff / win.ensemble_size}, which exceeds the maximum runtime of " f"{max_runtime} ns. Running to the maximum runtime instead." ) predicted_run_time_max_eff = max_runtime * win.ensemble_size if actual_run_time < predicted_run_time_max_eff: resubmit_time = ( predicted_run_time_max_eff - actual_run_time ) / win.ensemble_size # Resubmit a maximum of the current total simulation time again. This avoids issues with # overestimating the best runtime intially and then resubmitting for too long if resubmit_time > actual_run_time / win.ensemble_size: resubmit_time = actual_run_time / win.ensemble_size resubmit_time = ( _ceil(resubmit_time * 10) / 10 ) # Round up to the nearest 0.1 ns if ( resubmit_time > 0 ): # We have not reached the maximum efficiency, so resubmit for the remaining time win._logger.info( f"Window has not reached maximum efficiency. Resubmitting for {resubmit_time:.3f} ns" ) win.run(run_nos=run_nos, runtime=resubmit_time) self.running_wins.append(win) else: # We have reached or exceeded the maximum efficiency runtime win._logger.info( f"Window has reached the most efficient run time at {actual_run_time}. " "No further simulation required" ) # If there are no running lambda windows, we must have reached the maximum efficiency if not self.running_wins: self._maximally_efficient = True self._logger.info( "Maximum efficiency for given runtime constant of " f"{self.runtime_constant} kcal**2 mol**-2 ns**-1 achieved" ) def _run_loop_adaptive_equilibration_multiwindow( self, run_nos: _List[int], cycle_pause: int = 60, max_runtime: float = 30, # seconds # ns check_equil_fn: _Callable = _check_equil_multiwindow_paired_t, check_equil_kwargs: _Dict[str, _Any] = {}, ) -> None: """ Run loop which detects equilibration using the check_equil_multiwindow_modified_geweke method. This checks if equilibration has been achieved over the whole stage, and if not, halves the runtime of the simulations and using the adaptive efficiency loop before re-checking for equilibration. Parameters ---------- run_nos : List[int] The run numbers to run. cycle_pause : int, Optional, default: 60 The number of seconds to wait between checking the status of the simulations. max_runtime : float, Optional, default: 30 The maximum runtime for a single simulation during an adaptive simulation, in ns. check_equil_fn : Callable, Optional, default: dummy_check_equil_multiwindow The function to use to check for equilibration. This should be a function which takes a list of lambda windows and returns a boolen to indicate if equilibration over all lambda windows has been achieved. This should also set the _equilibrated and _equil_time attributes of each lambda window. check_equil_kwargs : Dict[str, Any], Optional, default: {} Any keyword arguments to pass to the check_equil_fn function. """ # Check that all lambda windows have the correct equilibration detection method if any( [ window.check_equil != _dummy_check_equil_multiwindow for window in self.lam_windows ] ): raise ValueError( "Not all lambda windows have the correct equilibration detection method. " "This should be set to dummy_check_equil_multiwindow" ) if not self.runtime_constant: raise ValueError( "Cannot run adaptive equilibration multiwindow without a runtime constant" ) # Assumes that we have not already equilibrated while not self.is_equilibrated(run_nos=run_nos): if self.kill_thread: # Check if we've requested to kill the thread self._logger.info("Kill thread requested: exiting run loop") return # Check if we have reached equilibration self._wait_ignoring_thread() self._logger.info( "Checking for equilibration with the check_equil_multiwindow algorithm..." ) equilibrated, fractional_equil_time = check_equil_fn( self.lam_windows, output_dir=self.output_dir, run_nos=run_nos, **check_equil_kwargs, ) if equilibrated: total_simtime = self.get_tot_simtime(run_nos=run_nos) tot_equil_time = total_simtime * fractional_equil_time # type: ignore self._logger.info("Equilibration achieved") self._logger.info( f"Fractional equilibration time is {fractional_equil_time}" ) self._logger.info(f"Total equilibration time is {tot_equil_time}") self._logger.info(f"Total simulation time is {total_simtime}") self._logger.info(f"Runtime constant is {self.runtime_constant}") break else: self._logger.info( "Equilibration not achieved. Quartering runtime constant and running adaptive efficiency loop" ) # Quarter the simulation constant to double the predicted runtime, # and run the adaptive efficiency loop again self.runtime_constant /= 4 self._maximally_efficient = False # Make all windows running, as is required for the adaptive efficiency loop self.running_wins = self.lam_windows.copy() self._run_loop_adaptive_efficiency( run_nos=run_nos, cycle_pause=cycle_pause, max_runtime=max_runtime )
[docs] def get_optimal_lam_vals( self, er_type: str = "root_var", delta_er: _Optional[float] = None, n_lam_vals: _Optional[int] = None, run_nos: _List[int] = [1], ) -> _np.ndarray: """ Get the optimal lambda values for the stage, based on the integrated SEM, and create plots. Parameters ---------- er_type: str, optional, default="root_var" Whether to integrate the standard error of the mean ("sem") or root variance of the gradients ("root_var") to calculate the optimal lambda values. delta_er : float, optional, default=None If er_type == "root_var", the desired integrated root variance of the gradients between each lambda value, in kcal mol^(-1). If er_type == "sem", the desired integrated standard error of the mean of the gradients between each lambda value, in kcal mol^(-1) ns^(1/2). A sensible default for sem is 0.1 kcal mol-1 ns1/2, and for root_var is 2 kcal mol-1. If not provided, the number of lambda windows must be provided with n_lam_vals. This is referred to as 'thermodynamic speed' in the publication. n_lam_vals : int, optional, default=None The number of lambda values to sample. If not provided, delta_er must be provided. run_nos : List[int], optional, default=[1] The run numbers to use for the calculation. Only 1 is run by default, so by default we only analyse 1. If using er_type = "SEM", more than one run must be specified. Returns ------- optimal_lam_vals : np.ndarray List of optimal lambda values for the stage. """ # Check that we have more than one run if using delta_er == "sem" if er_type == "sem" and len(run_nos) == 1: raise ValueError( "If using er_type = 'sem', more than one run must be specified, as the " "SEM is calculated using between-run errors by default." ) self._logger.info( f"Calculating optimal lambda values with er_type = {er_type} and delta_er = {delta_er}..." ) unequilibrated_gradient_data = _GradientData( lam_winds=self.lam_windows, equilibrated=False, run_nos=run_nos ) for plot_type in [ "mean", "stat_ineff", "integrated_sem", "integrated_var", "pred_best_simtime", ]: _plot_gradient_stats( gradients_data=unequilibrated_gradient_data, output_dir=self.output_dir, plot_type=plot_type, ) _plot_gradient_hists( gradients_data=unequilibrated_gradient_data, output_dir=self.output_dir ) return unequilibrated_gradient_data.calculate_optimal_lam_vals( er_type=er_type, delta_er=delta_er, n_lam_vals=n_lam_vals )
[docs] def analyse( self, slurm: bool = False, run_nos: _Optional[_List[int]] = None, get_frnrg: bool = True, subsampling: bool = False, fraction: float = 1, plot_rmsds: bool = False, ) -> _Union[_Tuple[float, float], _Tuple[None, None]]: r"""Analyse the results of the ensemble of simulations. Requires that all lambda windows have equilibrated. Parameters ---------- slurm : bool, optional, default=False Whether to use slurm for the analysis. run_nos : List[int], Optional, default: None The run numbers to analyse. If None, all runs will be analysed. get_frnrg : bool, optional, default=True If True, the free energy will be calculated with MBAR, otherwise this will be skipped. subsampling: bool, optional, default=False If True, the free energy will be calculated by subsampling using the methods contained within pymbar. fraction: float, optional, default=1 The fraction of the data to use for analysis. For example, if fraction=0.5, only the first half of the data will be used for analysis. If fraction=1, all data will be used. Note that unequilibrated data is discarded from the beginning of simulations in all cases. plot_rmsds: bool, optional, default=False Whether to plot RMSDS. This is slow and so defaults to False. Returns ------- free_energies : np.ndarray or None The free energy changes for the stage for each of the ensemble size runs, in kcal mol-1. If get_frnrg is False, this is None. errors : np.ndarray or None The MBAR error estimates for the free energy changes for the stage for each of the ensemble size runs, in kcal mol-1. If get_frnrg is False, this is None. """ run_nos = self._get_valid_run_nos(run_nos) # Check that this is not still running if self.running: raise RuntimeError("Cannot perform analysis as the Stage is still running") # Check that none of the simulations have failed failed_sims_list = self.failed_simulations if failed_sims_list: self._logger.error( "Unable to perform analysis as several simulations did not complete successfully" ) self._logger.error("Please check the output in the following directories:") for failed_sim in failed_sims_list: self._logger.error(failed_sim.base_dir) raise RuntimeError( "Unable to perform analysis as several simulations did not complete successfully" ) # Check that all simulations have equilibrated for win in self.lam_windows: if not win.equilibrated: raise RuntimeError( "Not all lambda windows have equilibrated. Analysis cannot be performed. " "If you are running non-adaptively, please use the `set_equilibration_time` method " "to set the equilibration time manually." ) if win.equil_time is None: raise RuntimeError( "Despite equilibration being detected, no equilibration time was found." ) try: # Conduct analysis if get_frnrg: self._logger.info( f"Computing free energy changes using the MBAR for runs {run_nos}" ) # Remove unequilibrated data from the equilibrated output directory for win in self.lam_windows: win._write_equilibrated_simfiles() # Run MBAR and compute mean and 95 % C.I. of free energy if not slurm: free_energies, errors, mbar_outfiles, _ = _run_mbar( run_nos=run_nos, output_dir=self.output_dir, percentage_end=fraction * 100, percentage_start=0, subsampling=subsampling, equilibrated=True, ) else: jobs, mbar_outfiles, tmp_files = _submit_mbar_slurm( output_dir=self.output_dir, virtual_queue=self.virtual_queue, slurm_config=self.analysis_slurm_config, run_nos=run_nos, percentage_end=fraction * 100, percentage_start=0, subsampling=subsampling, equilibrated=True, ) free_energies, errors, *_ = _collect_mbar_slurm( output_dir=self.output_dir, run_nos=run_nos, jobs=jobs, mbar_out_files=mbar_outfiles, virtual_queue=self.virtual_queue, tmp_files=tmp_files, ) mean_free_energy = _np.mean(free_energies) # Gaussian 95 % C.I. conf_int = ( _stats.t.interval( 0.95, len(free_energies) - 1, mean_free_energy, scale=_stats.sem(free_energies), )[1] - mean_free_energy ) # 95 % C.I. # Write overall MBAR stats to file with open(f"{self.output_dir}/overall_stats.dat", "a") as ofile: if get_frnrg: ofile.write( "###################################### Free Energies ########################################\n" ) ofile.write( f"Mean free energy: {mean_free_energy: .3f} + /- {conf_int:.3f} kcal/mol\n" ) for i in range(len(free_energies)): ofile.write( f"Free energy from run {i + 1}: {free_energies[i]: .3f} +/- {errors[i]:.3f} kcal/mol\n" ) ofile.write( "Errors are 95 % C.I.s based on the assumption of a Gaussian distribution of free energies\n" ) ofile.write(f"Runs analysed: {run_nos}\n") # Plot overlap matrices and PMFs _plot_overlap_mats( output_dir=self.output_dir, nlam=len(self.lam_windows), mbar_outfiles=mbar_outfiles, ) _plot_mbar_pmf(mbar_outfiles, self.output_dir) equilibrated_gradient_data = _GradientData( lam_winds=self.lam_windows, equilibrated=True ) _plot_overlap_mats( output_dir=self.output_dir, nlam=len(self.lam_windows), predicted=True, gradient_data=equilibrated_gradient_data, ) # Plot RMSDS if plot_rmsds: self._logger.info("Plotting RMSDs") _plot_rmsds( lam_windows=self.lam_windows, output_dir=self.output_dir, selection="resname LIG and (not name H*)", ) # Analyse the gradient data and make plots self._logger.info("Plotting gradients data") equilibrated_gradient_data = _GradientData( lam_winds=self.lam_windows, equilibrated=True, run_nos=run_nos ) for plot_type in [ "mean", "stat_ineff", "integrated_sem", "integrated_var", "pred_best_simtime", ]: _plot_gradient_stats( gradients_data=equilibrated_gradient_data, output_dir=self.output_dir, plot_type=plot_type, ) _plot_gradient_hists( gradients_data=equilibrated_gradient_data, output_dir=self.output_dir, run_nos=run_nos, ) _plot_gradient_timeseries( gradients_data=equilibrated_gradient_data, output_dir=self.output_dir, run_nos=run_nos, ) # Make plots of equilibration time self._logger.info("Plotting equilibration times") _plot_equilibration_time( lam_windows=self.lam_windows, output_dir=self.output_dir ) # Check and plot the Gelman-Rubin stat rhat_dict = _check_equil_multiwindow_gelman_rubin( lambda_windows=self.lam_windows, output_dir=self.output_dir ) rhat_equil = {lam: rhat < 1.1 for lam, rhat in rhat_dict.items()} for lam, equil in rhat_equil.items(): if not equil: self._logger.warning( f"The Gelman-Rubin statistic for lambda = {lam} is greater than 1.1. " "This suggests that the repeat simulations have not converged to the " "same distirbution and there is a sampling issue." ) # Write out stats with open(f"{self.output_dir}/overall_stats.dat", "a") as ofile: for win in self.lam_windows: ofile.write( f"Equilibration time for lambda = {win.lam}: {win.equil_time:.3f} ns per simulation\n" ) ofile.write( f"Total time simulated for lambda = {win.lam}: {win.sims[0].tot_simtime:.3f} ns per simulation\n" ) if get_frnrg: self._logger.info( f"Overall free energy changes: {free_energies} kcal mol-1" ) # type: ignore self._logger.info(f"Overall errors: {errors} kcal mol-1") # type: ignore self._logger.info(f"Analysed runs: {run_nos}") # Update the interally-stored results self._delta_g = free_energies self._delta_g_er = errors return free_energies, errors # type: ignore else: return None, None finally: # Ensure that all plotting resources are closed _plt.close("all")
[docs] def get_results_df(self, save_csv: bool = True) -> _pd.DataFrame: """ Return the results in dataframe format Parameters ---------- save_csv : bool, optional, default=True Whether to save the results as a csv file Returns ------- results_df : pd.DataFrame A dataframe containing the results """ # Call the superclass method but avoid trying to get results from # the sub_sim_runners, which do not have calculated free energy changes return super().get_results_df(save_csv=save_csv, add_sub_sim_runners=False)
[docs] def analyse_convergence( self, slurm: bool = False, run_nos: _Optional[_List[int]] = None, mode: str = "cumulative", fraction: float = 1, equilibrated: bool = True, ) -> _Tuple[_np.ndarray, _np.ndarray]: """ Get a timeseries of the total free energy change of the stage against total simulation time. Also plot this. This is kept separate from the analyse method as it is expensive to run. Parameters ---------- slurm: bool, optional, default=False Whether to use slurm for the analysis. run_nos : List[int], Optional, default: None The run numbers to analyse. If None, all runs will be analysed. mode : str, optional, default="cumulative" "cumulative" or "block". The type of averaging to use. In both cases, 20 MBAR evaluations are performed. fraction : float, optional, default=1 The fraction of the total simulation time to use for the analysis. For example, if fraction=0.5, only the first 50 % of the simulation time will be used for the analysis. equilibrated: bool, optional, default=True Whether to analyse only the equilibrated data (True) or all data (False) Returns ------- slurm: bool, optional, default=False Whether to use slurm for the analysis. fracts : np.ndarray The fraction of the total (equilibrated) simulation time for each value of dg_overall. dg_overall : np.ndarray The overall free energy change for the stage for each value of total (equilibrated) simtime for each of the ensemble size repeats. """ run_nos = self._get_valid_run_nos(run_nos) self._logger.info("Analysing convergence...") # Check the assumption that all simulation times are the same av_simtime = self.get_tot_simtime(run_nos=run_nos) / len(run_nos) if not all( [ _np.isclose( self.get_tot_simtime(run_nos=[run_no]), av_simtime, rtol=1e-2 ) for run_no in run_nos ] ): raise RuntimeError( "Not all simulation times are the same. Convergence analysis cannot be performed." ) # Get the dg_overall in terms of fraction of the total simulation time # Use steps of 5 % of the total simulation time fracts = _np.arange(0.05, 1.05, 0.05) # Only analyse up to specified fraction of total simulation data fracts = fracts * fraction end_percents = fracts * 100 dg_overall = _np.zeros(len(fracts)) # If cumulative mode, start from start from the beginning of the simulation each time if mode == "cumulative": start_percents = _np.zeros(len(fracts)) elif mode == "block": start_percents = _np.arange(0.00, 1.00, 0.05) * 100 * fraction else: raise ValueError("mode must be 'cumulative' or 'block'") # Make sure to re-write the equilibrated simfiles if equilibrated: for win in self.lam_windows: win._write_equilibrated_simfiles() if not slurm: # Now run mbar with multiprocessing to speed things up with _get_context("spawn").Pool() as pool: results = pool.starmap( _run_mbar, [ ( self.output_dir, run_nos, end_percent, start_percent, False, # Subsample True, # Delete output files equilibrated, # Equilibrated ) for start_percent, end_percent in zip( start_percents, end_percents ) ], ) else: # Use slurm frac_jobs = [] results = [] for start_percent, end_percent in zip(start_percents, end_percents): frac_jobs.append( _submit_mbar_slurm( output_dir=self.output_dir, virtual_queue=self.virtual_queue, slurm_config=self.analysis_slurm_config, run_nos=run_nos, percentage_end=end_percent, percentage_start=start_percent, subsampling=False, equilibrated=equilibrated, ) ) for frac_job in frac_jobs: jobs, mbar_outfiles, tmp_simfiles = frac_job results.append( _collect_mbar_slurm( output_dir=self.output_dir, run_nos=run_nos, jobs=jobs, mbar_out_files=mbar_outfiles, virtual_queue=self.virtual_queue, tmp_files=tmp_simfiles, ) ) dg_overall = _np.array( [result[0] for result in results] ).transpose() # result[0] is a 2D array for a given percent mbar_grads = [ result[3] for result in results ] # result[3] is a Dict of gradient data for a given percent self._logger.info(f"Overall free energy changes: {dg_overall} kcal mol-1") self._logger.info(f"Fractions of (equilibrated) simulation time: {fracts}") # Save the convergence information as an attribute self._delta_g_convergence = dg_overall self._delta_g_convergence_fracts = fracts # Plot the overall convergence and the squared SEM of the free energy change for plot in [_plot_convergence, _plot_sq_sem_convergence]: plot( fracts, dg_overall, self.get_tot_simtime(run_nos=run_nos), ( self.equil_time if equilibrated else 0 ), # Already per member of the ensemble self.output_dir, len(run_nos), ) # Plot the convergence of the MBAR gradients _plot_mbar_gradient_convergence( fracts=fracts, mbar_grads=mbar_grads, simtime_per_run=self.get_tot_simtime( run_nos=[1] ), # Assumes all simulation times the same equil_time_per_run=self.equil_time if equilibrated else 0, output_dir=self.output_dir, ) return fracts, dg_overall
def setup(self) -> None: raise NotImplementedError("Stages are set up when they are created") def _mv_output(self, save_name: str) -> None: """ Move the output directory to a new location, without changing self.output_dir. Parameters ---------- save_name : str The new name of the old output directory. """ self._logger.info(f"Moving contents of output directory to {save_name}") base_dir = _pathlib.Path(self.output_dir).parent.resolve() _os.rename(self.output_dir, _os.path.join(base_dir, save_name))
[docs] def wait(self) -> None: """Wait for the stage to finish running.""" # Override the base class method so that we can update the # virtual queue # Give the simulations a chance to start _sleep(self.slurm_config.queue_check_interval) self.virtual_queue.update() while self.running: _sleep(self.slurm_config.queue_check_interval) # Check every 30 seconds self.virtual_queue.update()
def _wait_ignoring_thread(self) -> None: """Wait for the stage to finish running, ignoring the thread.""" _sleep(self.slurm_config.queue_check_interval) self.virtual_queue.update() # Superclass implementation of running ignores the thread while super().running: _sleep(self.slurm_config.queue_check_interval) self.virtual_queue.update() @property def running(self) -> bool: """Check if the stage is running.""" # Override the base class method so that we can also check for active threads live_thread = ( self.run_thread.is_alive() if self.run_thread is not None else False ) return super().running or live_thread
[docs] def update(self, save_name: str = "output_saved") -> None: """ Delete the current set of lamda windows and simulations, and create a new set of simulations based on the current state of the stage. This is useful if you want to change the number of simulations per lambda window, or the number of lambda windows. Parameters ---------- save_name : str, default "output_saved" The name of the directory to save the old output directory to. """ if self.running: raise RuntimeError("Can't update while ensemble is running") if _os.path.isdir(self.output_dir): self._mv_output(save_name) old_lam_vals_attrs = self.lam_windows[0].__dict__ self._logger.info("Deleting old lambda windows and creating new ones...") self._sub_sim_runners = [] lam_val_weights = self.lam_val_weights for i, lam_val in enumerate(self.lam_vals): lam_base_dir = _os.path.join(self.output_dir, f"lambda_{lam_val:.3f}") new_lam_win = _LamWindow( lam=lam_val, lam_val_weight=lam_val_weights[i], virtual_queue=self.virtual_queue, runtime_constant=self.runtime_constant, relative_simulation_cost=self.relative_simulation_cost, ensemble_size=self.ensemble_size, base_dir=lam_base_dir, input_dir=self.input_dir, stream_log_level=self.stream_log_level, slurm_config=self.slurm_config, analysis_slurm_config=self.analysis_slurm_config, engine_config=self.engine_config.copy(), ) # Overwrite the default equilibration detection algorithm new_lam_win.check_equil = old_lam_vals_attrs["check_equil"] self.lam_windows.append(new_lam_win)
[docs] def clean(self, clean_logs=False) -> None: """ Clean the simulation runner by deleting all files with extensions matching self.__class__.run_files in the base and output dirs, and resetting the total runtime to 0. Also flush the virtual queue to remove any remaining simulations. Parameters ---------- clean_logs : bool, default=False If True, also delete the log files. """ self.virtual_queue._flush() super().clean(clean_logs=clean_logs)
class StageContextManager: """Stage context manager to ensure that all stages are killed when the context is exited.""" def __init__(self, stages: _List[Stage]): """ Parameters ---------- stages : List[Stage] The stages to kill when the context is exited. """ self.stages = stages def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): for stage in self.stages: stage.kill()