Source code for a3fe.run.lambda_window

"""Functionality for running simulations at a single lambda value."""

__all__ = ["LamWindow"]

import glob as _glob
import logging as _logging
import os as _os
import subprocess as _subprocess
from copy import deepcopy as _deepcopy
from typing import List as _List
from typing import Optional as _Optional
from typing import Union as _Union

import numpy as _np

from ..analyse.detect_equil import check_equil_chodera as _check_equil_chodera
from ..analyse.detect_equil import (
    dummy_check_equil_multiwindow as _dummy_check_equil_multiwindow,
)
from ..configuration import EngineType as _EngineType
from ..configuration import SlurmConfig as _SlurmConfig
from ..configuration import _EngineConfig
from ._simulation_runner import SimulationRunner as _SimulationRunner
from ._virtual_queue import VirtualQueue as _VirtualQueue
from .simulation import Simulation as _Simulation


[docs]class LamWindow(_SimulationRunner): """A class to hold and manipulate a set of SOMD simulations at a given lambda value.""" equil_detection_methods = { "multiwindow": _dummy_check_equil_multiwindow, "chodera": _check_equil_chodera, } runtime_attributes = _deepcopy(_SimulationRunner.runtime_attributes) runtime_attributes["_equilibrated"] = False runtime_attributes["_equil_time"] = None
[docs] def __init__( self, lam: float, virtual_queue: _VirtualQueue, lam_val_weight: _Optional[float] = None, block_size: float = 1, equil_detection: str = "multiwindow", slurm_equil_detection: bool = True, gradient_threshold: _Optional[float] = None, runtime_constant: _Optional[float] = 0.0005, relative_simulation_cost: float = 1, ensemble_size: int = 5, base_dir: _Optional[str] = None, input_dir: _Optional[str] = None, output_dir: _Optional[str] = None, stream_log_level: int = _logging.INFO, slurm_config: _Optional[_SlurmConfig] = None, analysis_slurm_config: _Optional[_SlurmConfig] = None, engine_config: _Optional[_EngineConfig] = None, engine_type: _EngineType = _EngineType.SOMD, update_paths: bool = True, ) -> None: """ Initialise a LamWindow object. Parameters ---------- lam : float Lambda value for the simulation. virtual_queue : VirtualQueue VirtualQueue object to use for submitting jobs. lam_val_weight : float, Optional, default: None Weight to use for this lambda value in the free energy calculation (e.g. from Trapezoidal rule). block_size : float, Optional, default: 1 Size of the blocks to use for equilibration detection, in ns. Only used for the block gradient equilibration detection method. equil_detection : str, Optional, default: "multiwindow" Method to use for equilibration detection. Options are: - "multiwindow": Use the multiwindow paired t-test method to detect equilibration. - "block_gradient": Use the gradient of the block averages to detect equilibration. - "chodera": Use Chodera's method to detect equilibration. slurm_equil_detection : bool, Optional, default: True Whether to use SLURM to run the equilibration detection MBAR calculations. gradient_threshold : float, Optional, default: None The threshold for the absolute value of the gradient, in kcal mol-1 ns-1, below which the simulation is considered equilibrated. If None, no theshold is set and the simulation is equilibrated when the gradient passes through 0. A sensible value appears to be 0.5 kcal mol-1 ns-1. Only required when the equilibration detection method is "block_gradient". runtime_constant: float, Optional, default: 0.0005 The runtime_constant (kcal**2 mol**-2 ns*-1) only affects behaviour if running adaptively, and must be supplied if running adaptively. This is used to calculate how long to run each simulation for based on the current uncertainty of the per-window free energy estimate, as discussed in the docstring of the run() method. relative_simlation_cost : float, Optional, default: 1 The relative cost of the simulation for a given runtime. This is used to calculate the predicted optimal runtime during adaptive simulations. The recommended use is to set this to 1 for the bound leg and to (speed of bound leg / speed of free leg) for the free leg. ensemble_size : int, Optional, default: 5 Number of simulations to run at this lambda value. base_dir : str, Optional, default: None Path to the base directory. If None, this is set to the current working directory. input_dir : str, Optional, default: None Path to directory containing input files for the simulations. If None, this will be set to "current_working_directory/input". output_dir : str, Optional, default: None Path to directory to store output files from the simulations. If None, this will be set to "current_working_directory/output". stream_log_level : int, Optional, default: logging.INFO Logging level to use for the steam file handlers for the Ensemble object and its child objects. slurm_config: SlurmConfig, default: None Configuration for the SLURM job scheduler. If None, the default partition is used. analysis_slurm_config: SlurmConfig, default: None Configuration for the SLURM job scheduler for the analysis. This is helpful e.g. if you want to submit analysis to the CPU partition, but the main simulation to the GPU partition. If None, engine_config: EngineConfig, default: None Configuration for the engine. If None, the default configuration is used. engine_type: EngineType, default: EngineType.SOMD The type of engine to use for the production simulations. update_paths: bool, Optional, default: True If true, if the simulation runner is loaded by unpickling, then update_paths() is called. Returns ------- None """ # Set the lamdbda value first, as this is required for __str__, # and therefore the super().__init__ call self.lam = lam super().__init__( base_dir=base_dir, input_dir=input_dir, output_dir=output_dir, stream_log_level=stream_log_level, ensemble_size=ensemble_size, update_paths=update_paths, slurm_config=slurm_config, analysis_slurm_config=analysis_slurm_config, engine_config=engine_config, engine_type=engine_type, dump=False, ) if not self.loaded_from_pickle: self.lam_val_weight = lam_val_weight self.virtual_queue = virtual_queue self.block_size = block_size if equil_detection not in self.equil_detection_methods: raise ValueError( f"Equilibration detection method {equil_detection} not recognised." ) # Need to pass self object to equilibration detection function self.check_equil = self.equil_detection_methods[equil_detection] self.slurm_equil_detection = slurm_equil_detection # Ensure that we do not try to use a gradient threshold with a method that does not use it if equil_detection != "block_gradient" and gradient_threshold is not None: raise ValueError( "Gradient threshold can only be set for block gradient method." ) self.gradient_threshold = gradient_threshold self.runtime_constant = runtime_constant self.relative_simulation_cost = relative_simulation_cost self._running: bool = False # Create the required simulations for this lambda value for run_no in range(1, ensemble_size + 1): # Copy the input files over to the simulation base directory, # which is also the simulation input directory and output directory run_name = "run_" + str(run_no).zfill(2) sim_base_dir = _os.path.join(self.base_dir, run_name) _subprocess.call(["mkdir", "-p", sim_base_dir]) for file in _glob.glob(_os.path.join(self.input_dir, "*")): # If the file is a coordinates, topology, or restraint file, # make a symbolic link to it, otherwise copy it if file.endswith((".rst7", ".prm7", ".txt")): _subprocess.call( [ "ln", "-s", _os.path.relpath(file, sim_base_dir), sim_base_dir, ] ) else: _subprocess.call(["cp", file, sim_base_dir]) self.sims.append( _Simulation( lam=lam, run_no=run_no, virtual_queue=virtual_queue, base_dir=sim_base_dir, input_dir=sim_base_dir, output_dir=sim_base_dir, stream_log_level=stream_log_level, slurm_config=self.slurm_config, analysis_slurm_config=self.analysis_slurm_config, engine_config=self.engine_config.copy(), engine_type=self.engine_type, ) ) # Save the state and update log self._update_log() self._dump()
def __str__(self) -> str: return f"LamWindow (lam={self.lam:.3f})" @property def sims(self) -> _List[_Simulation]: return self._sub_sim_runners @sims.setter def legs(self, value) -> None: self._logger.info("Modifying/ creating simulations") self._sub_sim_runners = value
[docs] def run(self, run_nos: _Optional[_List[int]] = None, runtime: float = 2.5) -> None: """ Run all simulations at the lambda value. Parameters ---------- run_nos : List[int], Optional, default: None List of run numbers to run. If None, all simulations are run. runtime : float, Optional, default: 2.5 Runtime of simulation, in ns. Returns ------- None """ run_nos = self._get_valid_run_nos(run_nos) # Run the simulations self._logger.info(f"Running simulations {run_nos} for {runtime:.3f} ns") for run_no in run_nos: # type: ignore self.sims[run_no - 1].run(runtime) self._running = True
[docs] def kill(self) -> None: """Kill all simulations at the lambda value.""" if self.running: self._logger.info("Killing all simulations") for sim in self.sims: sim.kill() self._running = False
[docs] def get_tot_simtime(self, run_nos: _Optional[_List[int]] = None) -> float: """ Get the total simulation time for all specified runs, in ns. Parameters ---------- run_nos : List[int], Optional, default: None The run numbers to use for MBAR. If None, all runs will be used. Returns ------- tot_simtime : float Total simulation time, in ns. """ run_nos = self._get_valid_run_nos(run_nos) return sum([self.sims[run_no - 1].get_tot_simtime() for run_no in run_nos])
[docs] def get_tot_gpu_time(self, run_nos: _Optional[_List[int]] = None) -> float: """ Get the total GPU time for all specified runs, in GPU hours. Parameters ---------- run_nos : List[int], Optional, default: None The run numbers to use for MBAR. If None, all runs will be used. Returns ------- tot_gpu_time : float Total GPU time, in GPU hours. """ run_nos = self._get_valid_run_nos(run_nos) return sum([self.sims[run_no - 1].get_tot_gpu_time() for run_no in run_nos])
[docs] def is_equilibrated(self, run_nos: _Optional[_List[int]] = None) -> bool: """ Check if the ensemble of simulations at the lambda window is equilibrated, based on the run numbers specified and the equilibration detection method. Store the equilibration status and time in private variables if so. Parameters ---------- run_nos : List[int], Optional, default: None The run numbers to equilibration detection. If None, all runs will be used. Returns ------- equilibrated : bool True if the simulation is equilibrated, False otherwise. """ self._equilibrated, self._equil_time = self.check_equil(self, run_nos=run_nos) return self._equilibrated
@property def equilibrated(self) -> bool: """Whether equilibration has been achieved.""" return self._equilibrated @property def equil_time(self) -> _Union[float, None]: """The equilibration time in ns, per run.""" return self._equil_time # ns @property def failed_simulations(self) -> _List[_SimulationRunner]: """The failed simulations""" return [sim for sim in self.sims if sim.failed]
[docs] def set_equilibration_time(self, equil_time: float) -> None: """ Set the equilibration time for the simulation runner and any sub-simulation runners. Parameters ---------- equil_time : float The equilibration time to set, in ns per run per lambda window. """ self._logger.info(f"Setting equilibration time to {equil_time:.3f} ns per run") self._equilibrated = True self._equil_time = equil_time
def _write_equilibrated_simfiles(self) -> None: """ Remove unequilibrated data from simulation files for all simulations at the lambda window, and write a new simulation file containing only the equilibrated data. This is the natural place for this function because the equilibration time is stored at the lambda window level. """ # Check that we have the required equilibration data if self.equil_time is None: raise ValueError( "Equilibration time not set. " "Please run is_equilibrated() before calling this function." ) # Get the index of the first equilibrated data point # Minus 1 because first energy is only written after the first nrg_freq steps equil_index = ( int( self._equil_time / ( self.sims[0].engine_config.timestep * self.sims[0].engine_config.energy_frequency ) ) - 1 # type: ignore ) # Write the equilibrated data for each simulation for sim in self.sims: in_simfile = sim.output_dir + "/simfile.dat" out_simfile = sim.output_dir + "/simfile_equilibrated.dat" with open(in_simfile, "r") as ifile: lines = ifile.readlines() # Figure out how many lines come before the data non_data_lines = 0 for line in lines: if line.startswith("#"): non_data_lines += 1 else: break # Overwrite the original file with one containing only the equilibrated data with open(out_simfile, "w") as ofile: # First, write the header for line in lines[:non_data_lines]: ofile.write(line) # Now write the data, skipping the non-equilibrated portion for line in lines[equil_index + non_data_lines :]: ofile.write(line) def _get_rolling_average( self, data: _np.ndarray, idx_block_size: int ) -> _np.ndarray: """ Calculate the rolling average of a 1D array. Parameters ---------- data : np.ndarray 1D array of data to be block averaged. idx_block_size : int Index size of the blocks to be used in the block average. Returns ------- block_av : np.ndarray 1D array of block averages of the same length as data. Initial values (before there is sufficient data to calculate a block average) are set to nan. """ if idx_block_size > len(data): raise ValueError( "Block size cannot be larger than the length of the data array." ) rolling_av = _np.full(len(data), _np.nan) for i in range(len(data)): if i < idx_block_size: continue else: block_av = _np.mean(data[i - idx_block_size : i]) rolling_av[i] = block_av return rolling_av
[docs] def analyse(self) -> None: raise NotImplementedError( "Analysis cannot be performed for a single lambda window." )
[docs] def analyse_convergence(self) -> None: raise ( NotImplementedError( "Convergence analysis is not performed for a single lambda window, only " " at the level of a stage or above." ) )
def setup(self) -> None: raise NotImplementedError("LamWindows are set up when they are created")