"""Functionality for manipulating sets of Calculations"""
__all__ = ["CalcSet"]
import logging as _logging
import os as _os
from typing import Dict as _Dict
from typing import Iterable as _Iterable
from typing import List as _List
from typing import Optional as _Optional
import numpy as _np
from scipy import stats as _stats
from ..analyse.analyse_set import compute_stats as _compute_stats
from ..analyse.plot import plot_against_exp as _plt_against_exp
from ..configuration import SlurmConfig as _SlurmConfig
from ..configuration import _BaseSystemPreparationConfig, _EngineConfig
from ..read._read_exp_dgs import read_exp_dgs as _read_exp_dgs
from ._simulation_runner import SimulationRunner as _SimulationRunner
from ._utils import SimulationRunnerIterator as _SimulationRunnerIterator
from .calculation import Calculation as _Calculation
[docs]class CalcSet(_SimulationRunner):
"""
Class to set up, run, and analyse sets of ABFE calculations
(each represented by Calculation objects). This runs calculations
sequentially to avoid overloading the system.
"""
[docs] def __init__(
self,
calc_paths: _Optional[_List] = None,
calc_args: _Dict[str, _Dict] = {},
base_dir: _Optional[str] = None,
input_dir: _Optional[str] = None,
output_dir: _Optional[str] = None,
stream_log_level: int = _logging.INFO,
slurm_config: _Optional[_SlurmConfig] = None,
analysis_slurm_config: _Optional[_SlurmConfig] = None,
engine_config: _Optional[_EngineConfig] = None,
update_paths: bool = True,
) -> None:
"""
Instantiate a calculation based on files in the input dir. If calculation.pkl exists in the
base directory, the calculation will be loaded from this file and any arguments
supplied will be overwritten.
Parameters
----------
calc_paths: List, Optional, default: None
List of paths to the Calculation base directories. If None, then all directories
in the current directory will be assumed to be calculation base directories
calc_args: Dict[str: _Dict], Optional, default: {}
Dictionary of kwargsto pass to the Calculation objects.
base_dir : str, Optional, default: None
Path to the base directory which contains all the Calculations. If None,
this is set to the current working directory.
input_dir : str, Optional, default: None
Path to directory containing input files for example experimental free
energy changes. If None, this is set to `current_working_directory/input`.
output_dir : str, Optional, default: None
Path to directory containing output files. If None, this
is set to `current_working_directory/output`.
stream_log_level : int, Optional, default: logging.INFO
Logging level to use for the steam file handlers for the
set object and its child objects.
slurm_config: SlurmConfig, default: None
Configuration for the SLURM job scheduler. If None, the
default partition is used.
analysis_slurm_config: SlurmConfig, default: None
Configuration for the SLURM job scheduler for the analysis.
This is helpful e.g. if you want to submit analysis to the CPU
partition, but the main simulation to the GPU partition. If None,
engine_config: EngineConfig, default: None
Configuration for the engine. If None, the default configuration is used.
update_paths: bool, Optional, default: True
If True, if the simulation runner is loaded by unpickling, then
update_paths() is called.
Returns
-------
None
"""
super().__init__(
base_dir=base_dir,
input_dir=input_dir,
output_dir=output_dir,
stream_log_level=stream_log_level,
update_paths=update_paths,
slurm_config=slurm_config,
analysis_slurm_config=analysis_slurm_config,
engine_config=engine_config,
)
if not self.loaded_from_pickle:
# Load/ create the Calculations - temporarily shift to the Calculation base dir
if not calc_paths: # If not supplied, assume that every sub-directory is a calculation base dir
calc_paths = [
directory
for directory in _os.listdir()
if _os.path.isdir(directory)
]
self.calc_paths = [_os.path.abspath(directory) for directory in calc_paths]
# Ensure that all calculations share the same slurm config by adding this if it is not present
if calc_args.get("slurm_config") is None:
calc_args["slurm_config"] = self.slurm_config
if calc_args.get("analysis_slurm_config") is None:
calc_args["analysis_slurm_config"] = self.analysis_slurm_config
self._calc_args = calc_args
# Ensure that all calculations share the same somd config by adding this if it is not present
if calc_args.get("engine_config") is None:
calc_args["engine_config"] = self.engine_config
self._calc_args = calc_args
# Check that we can load all of the calculations
for calc in self.calcs:
# Having set them up according to _calc_args, save them
calc._dump()
# Save the state and update log
self._update_log()
self._dump()
@property
def _sub_sim_runners(self) -> _SimulationRunnerIterator[_Calculation]:
return _SimulationRunnerIterator(
getattr(self, "calc_paths", []),
_Calculation,
**getattr(self, "_calc_args", {}),
)
@_sub_sim_runners.setter
def _sub_sim_runners(self, value: _Iterable) -> None:
"""
Do nothing. This is required for compatibility with the parent class
, which sets _sub_sim_runners to an empty list in __init__
"""
@property
def calcs(self) -> _SimulationRunnerIterator[_Calculation]: # type: ignore
return self._sub_sim_runners
@calcs.setter
def calcs(self, value: _Calculation) -> None:
"""Take the Calculation and add its base directory to the calc_paths list"""
self._logger.info(
f"Adding base directory {value.base_dir} to list of stored calculation base dirs"
)
self.calc_paths.append(value.base_dir)
[docs] def setup(
self,
sysprep_config: _Optional[_BaseSystemPreparationConfig] = None,
) -> None:
"""
Set up all calculations sequentially.
Parameters
----------
sysprep_config: _BaseSystemPreparationConfig, opttional, default = None
The system preparation configuration to use for all calculations. If None, the default
configuration is used.
"""
for calc in self.calcs:
if calc.setup_complete:
self._logger.info(
f"Calculation in {calc.base_dir} is already set up. Skipping..."
)
continue
try:
self._logger.info(f"Setting up calculation in {calc.base_dir}")
calc.setup(sysprep_config=sysprep_config)
self._logger.info(f"Calculation in {calc.base_dir} successfully set up")
except Exception as e:
self._logger.error(
f"Error setting up calculation in {calc.base_dir}: {e}"
)
raise e
[docs] def get_optimal_lam_vals(
self,
simtime: float = 0.1,
er_type: str = "root_var",
delta_er: float = 2,
set_relative_sim_cost: bool = True,
reference_sim_cost: float = 0.21,
run_nos: _List[int] = [1],
) -> None:
"""
Determine the optimal lambda windows for each stage of each leg of each
calculation by running short simulations at each lambda value and analysing them,
using only a single run. Optionally, determine the simulation cost
and recursively set the relative simulation cost according reference_sim_cost.
Parameters
----------
simtime : float, Optional, default: 0.1
The length of the short simulations to run, in ns. If None is provided,
it is assumed that the simulations have already been run and the
optimal lambda values are extracted from the output files.
er_type: str, Optional, default="root_var"
Whether to integrate the standard error of the mean ("sem") or root
variance of the gradients ("root_var") to calculate the optimal
lambda values.
delta_er : float, default=2
If er_type == "root_var", the desired integrated root variance of the gradients
between each lambda value, in kcal mol^(-1). If er_type == "sem", the
desired integrated standard error of the mean of the gradients between each lambda
value, in kcal mol^(-1) ns^(1/2). A sensible default for root_var is 2 kcal mol-1,
and 0.1 kcal mol-1 ns^(1/2) for sem. This is referred to as 'thermodynamic speed'
in the publication.
set_relative_sim_cost: bool, optional, default=True
Whether to recursively set the relative simulation cost for the leg and all
sub simulation runners according to the mean simulation cost of the leg.
reference_sim_cost: float, optional, default=0.16
The reference simulation cost to use if set_relative_sim_cost is True, in hr / ns.
The default of 0.21 is the average bound leg simulation cost from a test set of ligands
of a range of system sizes on RTX 2080s. This is used to set the relative simulation
cost according to average_sim_cost / reference_sim_cost.
run_nos : List[int], optional, default=[1]
The run numbers to use for the calculation. Only 1 is run by default, so by default
we only analyse 1. If using delta_er = "sem", more than one run must be specified.
Returns
-------
None
"""
self._logger.info(
"Determining optimal lambda windows for each stage of every calculation..."
)
for calc in self.calcs:
self._logger.info(
f"Determining optimal lambda windows for {calc.base_dir}..."
)
calc.get_optimal_lam_vals(
simtime=simtime,
er_type=er_type,
delta_er=delta_er,
set_relative_sim_cost=set_relative_sim_cost,
reference_sim_cost=reference_sim_cost,
run_nos=run_nos,
)
# Save state
self._dump()
[docs] def run(
self,
run_nos: _Optional[_List[int]] = None,
adaptive: bool = True,
runtime: _Optional[float] = None,
runtime_constant: _Optional[float] = None,
run_stages_parallel: bool = False,
) -> None:
r"""
Run all calculations. Analysis is not performed by default. If running adaptively,
cycles of short runs then optimal runtime estimation are performed, where the optimal
runtime is estimated according to
.. math::
t_{\\mathrm{Optimal, k}} = \\sqrt{\\frac{t_{\\mathrm{Current}, k}}{C}}\\sigma_{\\mathrm{Current}}(\\Delta \\widehat{F}_k)
where:
- :math:`t_{\\mathrm{Optimal, k}}` is the calculated optimal runtime for lambda window :math:`k`
- :math:`t_{\\mathrm{Current}, k}` is the current runtime for lambda window :math:`k`
- :math:`C` is the runtime constant
- :math:`\sigma_{\\mathrm{Current}}(\\Delta \\widehat{F}_k)` is the current uncertainty in the free energy change contribution for lambda window :math:`k`. This is estimated from inter-run deviations.
- :math:`\Delta \\widehat{F}_k` is the free energy change contribution for lambda window :math:`k`
Parameters
----------
run_nos : List[int], Optional, default: None
List of run numbers to run. If None, all runs will be run.
adaptive : bool, Optional, default: True
If True, the stages will run until the simulations are equilibrated and perform analysis afterwards.
If False, the stages will run for the specified runtime and analysis will not be performed.
runtime : float, Optional, default: None
If adaptive is False, runtime must be supplied and stage will run for this number of nanoseconds.
runtime_constant: float, Optional, default: None
The runtime_constant (kcal**2 mol**-2 ns*-1) only affects behaviour if running adaptively. This is used
to calculate how long to run each simulation for based on the current uncertainty of the per-window
free energy estimate.
run_stages_parallel: bool, Optional, default: False
If True, the stages for each individual calculation will be run in parallel. Can casuse issues with
QOS limits on HPC clusters as each stage might try to submit jobs at the same time, resulting in
oversubmission of jobs. Each calculation will still be run sequentially.
Returns
-------
None
"""
# Run each calculation sequentially
for calc in self.calcs:
if calc.is_complete:
self._logger.info(
f"Calculation in {calc.base_dir} is already complete. Skipping..."
)
continue
try:
self._logger.info(f"Running calculation in {calc.base_dir}")
calc.run(
run_nos=run_nos,
adaptive=adaptive,
runtime=runtime,
runtime_constant=runtime_constant,
parallel=run_stages_parallel,
)
calc.wait()
self._logger.info(
f"Calculation in {calc.base_dir} completed successfully"
)
except Exception as e:
self._logger.error(f"Error running calculation in {calc.base_dir}: {e}")
raise e
[docs] def analyse(
self,
exp_dgs_path: _Optional[str] = None,
offset: bool = False,
compare_to_exp: bool = True,
reanalyse: bool = False,
slurm: bool = False,
run_nos: _Optional[_List[int]] = None,
subsampling=False,
fraction: float = 1,
plot_rmsds: bool = False,
) -> None:
"""
Analyse all calculations in the set and, if the experimental free
energies are provided, plot the free energy changes with respect
to experiment.
Parameters
----------
exp_dgs_path : str, Optional, default = None
The path to the file containing the experimental free energy
changes. This must be a csv file with the columns:
calc_base_dir, name, exp_dg, exp_err
offset: bool, default = False
If True, the calculated dGs will be offset to match the average
experimental free energies.
compare_to_exp : bool, optional, default=True
Whether to compare the calculated free energies to experimental
free energies. If False, only the calculated free energies will
be analysed. If True, correlation statistics and plots will be
generated.
reanalyse : bool, optional, default=False
Whether to reanalyse the data. If False, any existing data will be used
(e.g. if you have already analysed some calculations). If True, the
data will be reanalysed using the options provided. Reanalysis is useful
when you have changed the analysis options and want to apply them to
existing data.
slurm : bool, optional, default=False
Whether to use slurm for the analysis.
run_nos : List[int], Optional, default=None
A list of the run numbers to analyse. If None, all runs are analysed.
subsampling: bool, optional, default=False
If True, the free energy will be calculated by subsampling using
the methods contained within pymbar.
fraction: float, optional, default=1
The fraction of the data to use for analysis. For example, if
fraction=0.5, only the first half of the data will be used for
analysis. If fraction=1, all data will be used. Note that unequilibrated
data is discarded from the beginning of simulations in all cases.
plot_rmsds: bool, optional, default=False
Whether to plot RMSDS. This is slow and so defaults to False.
"""
if compare_to_exp:
# Make sure that the experimental dGs are provided
if not exp_dgs_path:
raise ValueError(
"Experimental free energy changes must be provided to compare to experimental values"
)
all_dgs = _read_exp_dgs(exp_dgs_path, self.base_dir)
# If experimental dgs are not provided, populate the base dir column
if exp_dgs_path is None:
all_dgs["calc_base_dir"] = self.calc_paths
# In the absence of supplied names (in the experimental dGs file),
# use the base dir name (the stem of the path)
all_dgs["name"] = [
calc_path.split("/")[-1] for calc_path in self.calc_paths
]
all_dgs.set_index("name", inplace=True)
all_dgs["calc_cor"] = 0
# Get the calculated dGs
for calc in self.calcs:
# Get the name of the ligand for the calculation and use this to add the results
# get the tail of the base dir name
name = all_dgs.index[all_dgs["calc_base_dir"] == calc.base_dir]
# Make sure that there is only one row with this name
if not len(name) == 1:
raise ValueError(
f"Found {len(name)} rows matching {calc.base_dir} in experimental dGs file"
)
# Carry out MBAR analysis if it has not been done already,
# or if reanalysis is requested
if calc._delta_g is None or reanalyse:
calc.analyse(
slurm=slurm,
run_nos=run_nos,
subsampling=subsampling,
fraction=fraction,
plot_rmsds=plot_rmsds,
)
# Get the confidence interval
mean_free_energy = _np.mean(calc._delta_g)
conf_int = (
_stats.t.interval(
0.95,
len(calc._delta_g) - 1, # type: ignore
mean_free_energy,
scale=_stats.sem(calc._delta_g),
)[1]
- mean_free_energy
) # 95 % C.I.
all_dgs.loc[name, "calc_dg"] = mean_free_energy
all_dgs.loc[name, "calc_er"] = conf_int
# Offset the calculated values with their corrections
all_dgs["calc_dg"] += all_dgs["calc_cor"]
# Save results including NaNs
all_dgs.to_csv(_os.path.join(self.output_dir, "results_summary.txt"), sep=",")
# Calculate statistics and plot if experimental dGs are provided
if compare_to_exp:
# Check that we have experimental dGs for all calculations
if all_dgs["exp_dg"].isnull().any():
raise ValueError(
"Experimental free energy changes must be provided for all "
"calculations if comparing to experimental values"
)
# Exclude rows with NaN
all_dgs.dropna(inplace=True)
# Offset the results if required
if offset:
shift = all_dgs["exp_dg"].mean() - all_dgs["calc_dg"].mean()
all_dgs["calc_dg"] += shift
# Calculate statistics and save results
stats = _compute_stats(all_dgs)
name = "overall_stats_offset.txt" if offset else "overall_stats.txt"
with open(_os.path.join(self.output_dir, name), "wt") as ofile:
for stat in stats:
ofile.write(
f"{stat}: {stats[stat][0]:.2f} ({stats[stat][1]:.2f}, {stats[stat][2]:.2f})\n"
)
# Plot
_plt_against_exp(
all_results=all_dgs,
output_dir=self.output_dir,
offset=offset,
stats=stats,
)
# Avoid base class methods which aren't valid being called
@property
def delta_g(self):
raise AttributeError("CalcSet objects do not have a delta_g attribute.")
@property
def delta_g_err(self):
raise AttributeError("CalcSet objects do not have a delta_g_err attribute.")
[docs] def get_results_df(self, save_csv: bool = True, add_sub_sim_runners: bool = True):
# TODO: Implement this method
raise NotImplementedError(
"This method is not implemented for CalcSet objects. Use the analyse method to get free energy changes."
)
[docs] def analyse_convergence(
self,
slurm: bool = False,
run_nos: _Optional[_List[int]] = None,
mode: str = "cumulative",
fraction: float = 1,
equilibrated: bool = True,
):
"""
Not implemented for CalcSet objects as convergence analysis is expensive.
Call the analyse_convergence method on individual calculations instead (or
run analyse convergence on each calculation in the set, if you're determined).
"""
raise NotImplementedError(
"This method is not implemented for CalcSet objects (due to high computational ",
"expense. Call the analyse_convergence method on individual calculations instead.",
)