Source code for a3fe.run._simulation_runner

"""Abstract base class for simulation runners."""

from __future__ import annotations

import copy as _copy
import logging as _logging
import os as _os
import pathlib as _pathlib
import pickle as _pkl
import subprocess as _subprocess
from abc import ABC
from itertools import count as _count
from time import sleep as _sleep
from typing import Any as _Any
from typing import Dict as _Dict
from typing import List as _List
from typing import Optional as _Optional
from typing import Tuple as _Tuple
from typing import Union as _Union

import numpy as _np
import pandas as _pd
import scipy.stats as _stats

from .._version import __version__ as _version
from ..analyse.exceptions import AnalysisError as _AnalysisError
from ..analyse.plot import plot_convergence as _plot_convergence
from ..analyse.plot import plot_sq_sem_convergence as _plot_sq_sem_convergence
from ..configuration import EngineType as _EngineType
from ..configuration import SlurmConfig as _SlurmConfig
from ..configuration import _EngineConfig
from ._logging_formatters import _A3feFileFormatter, _A3feStreamFormatter


[docs]class SimulationRunner(ABC): """An abstract base class for simulation runners. Note that self._sub_sim_runners (a list of SimulationRunner objects controlled by the current SimulationRunner) must be set in order to use methods such as run()""" # Count the number of instances so we can name things uniquely # for each instance class_count = _count() # Create list of files to be deleted by self.clean() run_files = ["*.png", "overall_stats.dat", "results.csv"] # Create a dict of attributes which can be modified by algorithms when # running the simulation, but which should be reset if the user wants to # re-run. This takes the form {attribute_name: reset_value} runtime_attributes = {} def __init__( self, base_dir: _Optional[str] = None, input_dir: _Optional[str] = None, output_dir: _Optional[str] = None, slurm_config: _Optional[_SlurmConfig] = None, analysis_slurm_config: _Optional[_SlurmConfig] = None, engine_config: _Optional[_EngineConfig] = None, engine_type: _EngineType = _EngineType.SOMD, stream_log_level: int = _logging.INFO, dg_multiplier: int = 1, ensemble_size: int = 5, update_paths: bool = True, dump: bool = True, ) -> None: """ base_dir : str, Optional, default: None Path to the base directory for the simulation runner. If None, this is set to the current working directory. input_dir : str, Optional, default: None Path to directory containing input files for the simulation runner. If None, this is set to `base_directory/input`. output_dir : str, Optional, default: None Path to the output directory in which to store the output from the simulation. If None, this is set to `base_directory/output`. slurm_config: SlurmConfig, default: None Configuration for the SLURM job scheduler. If None, the default partition is used. analysis_slurm_config: SlurmConfig, default: None Configuration for the SLURM job scheduler for the analysis. This is helpful e.g. if you want to submit analysis to the CPU partition, but the main simulation to the GPU partition. If None, the standard slurm_config is used. engine_config: EngineConfig, default: None Configuration for the engine. If None, the default configuration is used. engine_type: EngineType, default: EngineType.SOMD The type of engine to use for the production simulations. stream_log_level : int, Optional, default: logging.INFO Logging level to use for the steam file handlers for the calculation object and its child objects. dg_multiplier : int, Optional, default: 1 +1 or -1. Records whether the free energy change should be multiplied by +1 or -1 when being added to the total free energy change for the super simulation-runner. ensemble_size : int, Optional, default: 5 Number of repeats to run. update_paths: bool, Optional, default: True If True, if the simulation runner is loaded by unpickling, then update_paths() is called. dump: bool, Optional, default: True If True, the state of the simulation runner is saved to a pickle file. """ # Set the version of the simulation runner self._logger = _logging.getLogger(self.__class__.__name__) self._version = _version self._logger.debug( f"Initializing simulation runner with A3fe version: {self._version}" ) # Set up the directories (which may be overwritten if the # simulation runner is subsequently loaded from a pickle file) # Make sure that we always use absolute paths if base_dir is None: base_dir = str(_pathlib.Path.cwd()) else: base_dir = str(_pathlib.Path(base_dir).resolve()) if not _pathlib.Path(base_dir).is_dir(): _pathlib.Path(base_dir).mkdir(parents=True) if input_dir is None: input_dir = str(_pathlib.Path(base_dir, "input")) else: input_dir = str(_pathlib.Path(input_dir).resolve()) if output_dir is None: output_dir = str(_pathlib.Path(base_dir, "output")) else: output_dir = str(_pathlib.Path(output_dir).resolve()) # Only create the input and output directories if they're called, using properties self.base_dir = base_dir self._input_dir = input_dir self._output_dir = output_dir # Check if we are starting from a previous simulation runner self.loaded_from_pickle = False if _pathlib.Path(f"{base_dir}/{self.__class__.__name__}.pkl").is_file(): self._load( update_paths=update_paths ) # May overwrite the above attributes and options else: # Initialise sub-simulation runners with an empty list self._sub_sim_runners = [] # Add the dg_multiplier if dg_multiplier not in [-1, 1]: raise ValueError( f"dg_multiplier must be either +1 or -1, not {dg_multiplier}." ) self.dg_multiplier = dg_multiplier # Initialise runtime attributes with default values for attribute, value in self.runtime_attributes.items(): setattr(self, attribute, value) # Create attributes to store the free energy and convergence data self._delta_g: _Union[None, _np.ndarray] = None self._delta_g_er: _Union[None, _np.ndarray] = None self._delta_g_convergence: _Union[None, _np.ndarray] = None self._delta_g_convergence_fracts: _Union[None, _np.ndarray] = None # Register the ensemble size self.ensemble_size = ensemble_size # Set up logging self._stream_log_level = stream_log_level self._set_up_logging() # Create a SLURM config with the default partition if none is provided if slurm_config is None: default_partition = _SlurmConfig.get_default_partition() self._logger.info( f"No SLURM config provided, using default partition {default_partition}" ) self.slurm_config = _SlurmConfig(partition=default_partition) else: self.slurm_config = slurm_config # Use the same SLURM config for analysis if none is provided self.analysis_slurm_config = ( analysis_slurm_config if analysis_slurm_config is not None else self.slurm_config.copy() ) # Create the SOMD config with default values if none is provided self.engine_config = ( engine_config if engine_config is not None else engine_type.engine_config( input_dir=self.input_dir # Use the simulation runner's input directory ) ) self.engine_type = engine_type # Save state if dump: self._dump() def _set_up_logging(self, null: bool = False) -> None: """ Set up the logging for the simulation runner. Parameters ---------- null : bool, optional, default=False Whether to silence all logging by writing to the null logger. """ # If logger exists, remove it and start again if hasattr(self, "_logger"): handlers = self._logger.handlers[:] for handler in handlers: self._logger.removeHandler(handler) handler.close() del self._logger # Name each logger individually to avoid clashes self._logger = _logging.getLogger( f"{str(self)}_{next(self.__class__.class_count)}" ) self._logger.propagate = False self._logger.setLevel(_logging.DEBUG) # For the file handler, we want to log everything file_handler = _logging.FileHandler( f"{self.base_dir}/{self.__class__.__name__}.log" ) file_handler.setFormatter(_A3feFileFormatter()) file_handler.setLevel(_logging.DEBUG) # For the stream handler, we want to log at the user-specified level stream_handler = _logging.StreamHandler() stream_handler.setFormatter(_A3feStreamFormatter()) stream_handler.setLevel(self._stream_log_level) # Add the handlers to the logger self._logger.addHandler(file_handler) self._logger.addHandler(stream_handler) @property def input_dir(self) -> str: """The input directory for the simulation runner.""" if not _pathlib.Path(self._input_dir).is_dir(): _pathlib.Path(self._input_dir).mkdir(parents=True) return self._input_dir @input_dir.setter def input_dir(self, value: str) -> None: f"""Set the input directory for the {self.__class__.__name__}.""" self._input_dir = value @property def output_dir(self) -> str: f"""The output directory for the {self.__class__.__name__}.""" if not _pathlib.Path(self._output_dir).is_dir(): _pathlib.Path(self._output_dir).mkdir(parents=True) return self._output_dir @output_dir.setter def output_dir(self, value: str) -> None: f"""Set the output directory for the {self.__class__.__name__}.""" self._output_dir = value @property def delta_g(self) -> _np.ndarray: f"""The overall free energy change for the {self.__class__.__name__} for each of the ensemble size replicates""" # We haven't yet performed analysis, so analyse if not self._delta_g: self.analyse() # Check that the analysis actually updated the delta_g attribute if self._delta_g is None: raise ValueError( "Analysis failed to update the internal _delta_g attribute" ) return self._delta_g @property def delta_g_er(self) -> _np.ndarray: f"""The overall uncertainties in the free energy changes for the {self.__class__.__name__} for each of the ensemble size replicates""" # We haven't yet performed analysis, so analyse if not self._delta_g_er: self.analyse() # Check that the analysis actually updated the delta_g_er attribute if self._delta_g_er is None: raise ValueError( "Analysis failed to update the internal _delta_g attribute" ) return self._delta_g_er def __str__(self) -> str: return self.__class__.__name__ # def __del__(self) -> None: # self._dump() # Save the state to the pickle file before deletion def run(self, run_nos: _Optional[_List[int]] = None, *args, **kwargs) -> None: f""" Run the {self.__class__.__name__} Parameters ---------- run_nos : List[int], Optional, default=None A list of the run numbers to run. If None, all runs are run. *args, **kwargs Any additional arguments to pass to the run method of the sub-simulation runners. """ run_nos = self._get_valid_run_nos(run_nos) self._logger.info( f"Running run numbers {run_nos} for {self.__class__.__name__}..." ) for sub_sim_runner in self._sub_sim_runners: sub_sim_runner.run(run_nos=run_nos, *args, **kwargs) def _get_valid_run_nos(self, run_nos: _Optional[_List[int]] = None) -> _List[int]: """ Check the requested run numbers are valid, and return a list of all run numbers if None was passed. Parameters ---------- run_nos : List[int], Optional, default=None A list of the run numbers to run. If None, all runs are returned. Returns ------- run_nos : List[int] A list of valid run numbers. """ if run_nos is not None: # Check that the run numbers correspond to valid runs if any([run_no > self.ensemble_size for run_no in run_nos]): raise ValueError( f"Invalid run numbers {run_nos}. All run numbers must be less than or equal to {self.ensemble_size}" ) # Check that no run numbers are repeated if len(run_nos) != len(set(run_nos)): raise ValueError( f"Invalid run numbers {run_nos}. All run numbers must be unique" ) # Check that the run numbers are greater than 0 if any([run_no < 1 for run_no in run_nos]): raise ValueError( f"Invalid run numbers {run_nos}. All run numbers must be greater than 0" ) else: run_nos = list(range(1, self.ensemble_size + 1)) return run_nos def kill(self) -> None: f"""Kill the {self.__class__.__name__}""" self._logger.info(f"Killing {self.__class__.__name__}...") for sub_sim_runner in self._sub_sim_runners: sub_sim_runner.kill() def setup(self) -> None: f"""Set up the {self.__class__.__name__} and all sub-simulation runners.""" self._logger.info(f"Setting up {self.__class__.__name__}...") for sub_sim_runner in self._sub_sim_runners: sub_sim_runner.setup()
[docs] def analyse( self, slurm: bool = True, run_nos: _Optional[_List[int]] = None, subsampling=False, fraction: float = 1, plot_rmsds: bool = False, ) -> _Tuple[_np.ndarray, _np.ndarray]: """ Analyse the simulation runner and any sub-simulations, and return the overall free energy change. Parameters ---------- slurm : bool, optional, default=True Whether to use slurm for the analysis. run_nos : List[int], Optional, default=None A list of the run numbers to analyse. If None, all runs are analysed. subsampling: bool, optional, default=False If True, the free energy will be calculated by subsampling using the methods contained within pymbar. fraction: float, optional, default=1 The fraction of the data to use for analysis. For example, if fraction=0.5, only the first half of the data will be used for analysis. If fraction=1, all data will be used. Note that unequilibrated data is discarded from the beginning of simulations in all cases. plot_rmsds: bool, optional, default=False Whether to plot RMSDS. This is slow and so defaults to False. Returns ------- dg_overall : np.ndarray The overall free energy change for each of the ensemble size repeats. er_overall : np.ndarray The overall error for each of the ensemble size repeats. """ run_nos = self._get_valid_run_nos(run_nos) self._logger.info(f"Analysing runs {run_nos} for {self.__class__.__name__}...") dg_overall = _np.zeros(len(run_nos)) er_overall = _np.zeros(len(run_nos)) # Check that this is not still running if self.running: raise RuntimeError( f"Cannot perform analysis as {self.__class__.__name__} is still running" ) # Check that none of the simulations have failed failed_sims_list = self.failed_simulations if failed_sims_list: self._logger.error( "Unable to perform analysis as several simulations did not complete successfully" ) self._logger.error("Please check the output in the following directories:") for failed_sim in failed_sims_list: self._logger.error(failed_sim.base_dir) raise RuntimeError( "Unable to perform analysis as several simulations did not complete successfully" ) # Analyse the sub-simulation runners for sub_sim_runner in self._sub_sim_runners: dg, er = sub_sim_runner.analyse( slurm=slurm, run_nos=run_nos, subsampling=subsampling, fraction=fraction, plot_rmsds=plot_rmsds, ) # Decide if the component should be added or subtracted # according to the dg_multiplier attribute dg_overall += dg * sub_sim_runner.dg_multiplier er_overall = _np.sqrt(er_overall**2 + er**2) # Log the overall free energy changes self._logger.info(f"Overall free energy changes: {dg_overall} kcal mol-1") self._logger.info(f"Overall errors: {er_overall} kcal mol-1") # Calculate the 95 % confidence interval assuming Gaussian errors mean_free_energy = _np.mean(dg_overall) # Gaussian 95 % C.I. conf_int = ( _stats.t.interval( 0.95, len(dg_overall) - 1, mean_free_energy, scale=_stats.sem(dg_overall), )[1] - mean_free_energy ) # 95 % C.I. # Write overall MBAR stats to file with open(f"{self.output_dir}/overall_stats.dat", "a") as ofile: ofile.write( "###################################### Free Energies ########################################\n" ) ofile.write( f"Mean free energy: {mean_free_energy: .3f} + /- {conf_int:.3f} kcal/mol\n" ) for i in range(self.ensemble_size): ofile.write( f"Free energy from run {i + 1}: {dg_overall[i]: .3f} +/- {er_overall[i]:.3f} kcal/mol\n" ) ofile.write( "Errors are 95 % C.I.s based on the assumption of a Gaussian distribution of free energies\n" ) # Update internal state with result self._delta_g = dg_overall self._delta_g_er = er_overall return dg_overall, er_overall
[docs] def get_results_df( self, save_csv: bool = True, add_sub_sim_runners: bool = True, ) -> _pd.DataFrame: """ Return the results in dataframe format Parameters ---------- save_csv : bool, optional, default=True Whether to save the results as a csv file add_sub_sim_runners : bool, optional, default=True Whether to show the results from the sub-simulation runners. Returns ------- results_df : pd.DataFrame A dataframe containing the results """ # Create a dataframe to store the results headers = [ "dg / kcal mol-1", "dg_95_ci / kcal mol-1", "tot_simtime / ns", "tot_gpu_time / GPU hours", ] results_df = _pd.DataFrame(columns=headers) if add_sub_sim_runners: # Add the results for each of the sub-simulation runners for sub_sim_runner in self._sub_sim_runners: sub_results_df = sub_sim_runner.get_results_df(save_csv=save_csv) results_df = _pd.concat([results_df, sub_results_df]) try: # To extract the overall free energy changes from a previous call of analyse() dgs = self._delta_g ers = self._delta_g_er except AttributeError: raise _AnalysisError( f"Analysis has not been performed for {self.__class__.__name__}. Please call analyse() first." ) if dgs is None or ers is None: raise _AnalysisError( f"Analysis has not been performed for {self.__class__.__name__}. Please call analyse() first." ) # Calculate the 95 % confidence interval assuming Gaussian errors mean_free_energy = _np.mean(dgs) conf_int = ( _stats.t.interval( 0.95, len(dgs) - 1, # type: ignore mean_free_energy, scale=_stats.sem(dgs), )[1] - mean_free_energy ) # 95 % C.I. new_row = { "dg / kcal mol-1": round(mean_free_energy, 2), "dg_95_ci / kcal mol-1": round(conf_int, 2), "tot_simtime / ns": round(self.tot_simtime), "tot_gpu_time / GPU hours": round(self.tot_gpu_time), } # Get the index name if hasattr(self, "stage_type"): index_prefix = f"{self.stage_type.name.lower()}_" elif hasattr(self, "leg_type"): index_prefix = f"{self.leg_type.name.lower()}_" else: index_prefix = "" results_df.loc[index_prefix + self.__class__.__name__.lower()] = new_row # Get the normalised GPU time results_df["normalised_gpu_time"] = ( results_df["tot_gpu_time / GPU hours"] / self.tot_gpu_time ) # Round to 3 s.f. results_df["normalised_gpu_time"] = results_df["normalised_gpu_time"].apply( lambda x: round(x, 3) ) if save_csv: results_df.to_csv(f"{self.output_dir}/results.csv") return results_df
[docs] def analyse_convergence( self, slurm: bool = False, run_nos: _Optional[_List[int]] = None, mode: str = "cumulative", fraction: float = 1, equilibrated: bool = True, ) -> _Tuple[_np.ndarray, _np.ndarray]: """ Get a timeseries of the total free energy change of the sub-simulation runner against total simulation time. Also plot this. Keep this separate from analyse as it is expensive to run. Parameters ---------- slurm: bool, optional, default=False Whether to use slurm for the analysis. run_nos : List[int], Optional, default=None A list of the run numbers to analyse. If None, all runs are analysed. mode : str, optional, default="cumulative" "cumulative" or "block". The type of averaging to use. In both cases, 20 MBAR evaluations are performed. fraction: float, optional, default=1 The fraction of the data to use for analysis. For example, if fraction=0.5, only the first half of the data will be used for analysis. If fraction=1, all data will be used. Note that unequilibrated data is discarded from the beginning of simulations in all cases. equilibrated: bool, optional, default=True Whether to analyse only the equilibrated data (True) or all data (False) Returns ------- fracts : np.ndarray The fraction of the total (equilibrated) simulation time for each value of dg_overall. dg_overall : np.ndarray The overall free energy change for the {self.__class__.__name__} for each value of total (equilibrated) simtime for each of the ensemble size repeats. """ run_nos = self._get_valid_run_nos(run_nos) self._logger.info( f"Analysing convergence of {self.__class__.__name__} for runs {run_nos}..." ) # Get the dg_overall in terms of fraction of the total simulation time # Use steps of 5 % of the total simulation time fracts = _np.arange(0.05, 1.05, 0.05) # Only analyse up to specified fraction of total simulation data fracts = fracts * fraction # Create an array to store the overall free energy change dg_overall = _np.zeros((len(run_nos), len(fracts))) # Now add up the data for each of the sub-simulation runners for sub_sim_runner in self._sub_sim_runners: _, dgs = sub_sim_runner.analyse_convergence( slurm=slurm, run_nos=run_nos, mode=mode, fraction=fraction, equilibrated=equilibrated, ) # Decide if the component should be added or subtracted # according to the dg_multiplier attribute dg_overall += dgs * sub_sim_runner.dg_multiplier self._logger.info(f"Overall free energy changes: {dg_overall} kcal mol-1") self._logger.info(f"Fractions of (equilibrated) simulation time: {fracts}") # Save the convergence information as an attribute self._delta_g_convergence = dg_overall self._delta_g_convergence_fracts = fracts # Plot the overall convergence and the squared SEM of the free energy change for plot in [_plot_convergence, _plot_sq_sem_convergence]: plot( fracts, dg_overall, self.get_tot_simtime(run_nos=run_nos), ( self.equil_time if equilibrated else 0 ), # Already per member of the ensemble self.output_dir, len(run_nos), ) return fracts, dg_overall
@property def running(self) -> bool: f"""Check if the {self.__class__.__name__} is running.""" return any([sub_sim_runner.running for sub_sim_runner in self._sub_sim_runners]) def wait(self) -> None: f"""Wait for the {self.__class__.__name__} to finish running.""" # Give the simulation runner a chance to start _sleep(self.slurm_config.queue_check_interval) while self.running: _sleep(self.slurm_config.queue_check_interval) # Check every 30 seconds def get_tot_simtime(self, run_nos: _Optional[_List[int]] = None) -> float: f""" Get the total simulation time in ns for the {self.__class__.__name__}. and any sub-simulation runners. Parameters ---------- run_nos : List[int], Optional, default=None A list of the run numbers to analyse. If None, all runs are analysed. Returns ------- tot_simtime : float The total simulation time in ns. """ run_nos = self._get_valid_run_nos(run_nos) return sum( [ sub_sim_runner.get_tot_simtime(run_nos=run_nos) for sub_sim_runner in self._sub_sim_runners ] ) # ns def get_tot_gpu_time(self, run_nos: _Optional[_List[int]] = None) -> float: f""" Get the total simulation time in GPU hours for the {self.__class__.__name__}. and any sub-simulation runners. Parameters ---------- run_nos : List[int], Optional, default=None A list of the run numbers to analyse. If None, all runs are analysed. Returns ------- tot_gpu_time : float The total simulation time in GPU hours. """ run_nos = self._get_valid_run_nos(run_nos) return sum( [ sub_sim_runner.get_tot_gpu_time(run_nos=run_nos) for sub_sim_runner in self._sub_sim_runners ] ) @property def tot_simtime(self) -> float: f"""The total simulation time in ns for the {self.__class__.__name__} and any sub-simulation runners.""" return sum( [ sub_sim_runner.get_tot_simtime() for sub_sim_runner in self._sub_sim_runners ] ) # ns @property def tot_gpu_time(self) -> float: f"""The total simulation time in GPU hours for the {self.__class__.__name__} and any sub-simulation runners.""" return sum( [ sub_sim_runner.get_tot_gpu_time() for sub_sim_runner in self._sub_sim_runners ] ) # GPU hours @property def failed_simulations(self) -> _List[SimulationRunner]: """The failed sub-simulation runners""" return [ failure for sub_sim_runner in self._sub_sim_runners for failure in sub_sim_runner.failed_simulations ] def is_equilibrated(self, run_nos: _Optional[_List[int]] = None) -> bool: f""" Whether the {self.__class__.__name__} is equilibrated. This updates the _equilibrated and _equil_time attributes of the lambda windows, which are accessed by the equilibrated and equil_time properties. Parameters ---------- run_nos : List[int], Optional, default=None A list of the run numbers to check for equilibration. If None, all runs are analysed. Returns ------- equilibrated : bool Whether the {self.__class__.__name__} is equilibrated. """ run_nos = self._get_valid_run_nos(run_nos) return all( [ sub_sim_runner.is_equilibrated(run_nos=run_nos) for sub_sim_runner in self._sub_sim_runners ] ) @property def equilibrated(self) -> float: f"""Whether the {self.__class__.__name__} is equilibrated.""" return all( [sub_sim_runner.equilibrated for sub_sim_runner in self._sub_sim_runners] ) @property def equil_time(self) -> float: """ The equilibration time, per member of the ensemble, in ns, for the and any sub-simulation runners. """ return sum( [sub_sim_runner.equil_time for sub_sim_runner in self._sub_sim_runners] ) # ns def _refresh_logging(self) -> None: """Refresh the logging for the simulation runner and all sub-simulation runners.""" self._set_up_logging() for sub_sim_runner in self._sub_sim_runners: sub_sim_runner._refresh_logging()
[docs] def recursively_get_attr(self, attr: str) -> _Dict[SimulationRunner, _Any]: """ Get the values of the attribute for the simulation runner and any sub-simulation runners. If the attribute is not present for a sub-simulation runner, None is returned. Parameters ---------- attr : str The name of the attribute to get the values of. Returns ------- attr_values : Dict[SimulationRunner, Any] A dictionary of the attribute values for the simulation runner and any sub-simulation runners. """ attrs_dict = {} attrs_dict[attr] = getattr(self, attr, None) if self._sub_sim_runners: attrs_dict["sub_sim_runners"] = {} for sub_sim_runner in self._sub_sim_runners: attrs_dict["sub_sim_runners"][sub_sim_runner] = ( sub_sim_runner.recursively_get_attr(attr=attr) ) return attrs_dict
[docs] def recursively_set_attr( self, attr: str, value: _Any, force: bool = False, silent: bool = False ) -> None: """ Set the attribute to the value for the simulation runner and any sub-simulation runners. Parameters ---------- attr : str The name of the attribute to set the values of. value : Any The value to set the attribute to. force : bool, default=False If True, set the attribute even if it doesn't exist. silent: bool, default=False If True, don't log the setting of the attribute or raise any warnings. """ # Don't set the attribute if it doesn't exist if not hasattr(self, attr): if not force and not silent: self._logger.warning( f"The {self.__class__.__name__} does not have the attribute {attr} and this will not be created." ) if force and not silent: self._logger.info( f"Setting the attribute {attr} to {value} even though it does not exist." ) setattr(self, attr, value) else: if not silent: self._logger.info(f"Setting the attribute {attr} to {value}.") setattr(self, attr, value) for sub_sim_runner in self._sub_sim_runners: sub_sim_runner.recursively_set_attr( attr=attr, value=value, force=force, silent=silent )
[docs] def update_engine_config_option(self, option: str, value: str) -> None: """Update an option in the engine configuration file.""" setattr(self.engine_config, option, value) for sub_sim_runner in self._sub_sim_runners: sub_sim_runner.update_engine_config_option(option, value)
[docs] def set_equilibration_time(self, equil_time: float) -> None: """ Set the equilibration time for the simulation runner and any sub-simulation runners. Parameters ---------- equil_time : float The equilibration time to set, in ns per run per lambda window. """ for sub_sim_runner in self._sub_sim_runners: sub_sim_runner.set_equilibration_time(equil_time)
[docs] def update_paths(self, old_sub_path: str, new_sub_path: str) -> None: """ Replace the old sub-path with the new sub-path in the base, input, and output directory paths. Parameters ---------- old_sub_path : str The old sub-path to replace. new_sub_path : str The new sub-path to replace the old sub-path with. """ # Use private attributes to avoid triggering the property setters # which might cause issues by creating directories for attr in ["base_dir", "_input_dir", "_output_dir"]: setattr(self, attr, getattr(self, attr).replace(old_sub_path, new_sub_path)) # Now update the loggers, which depend on the paths self._set_up_logging() # Also update the loggers of any virtual queues if hasattr(self, "virtual_queue"): # Virtual queue may have already been updated if new_sub_path not in self.virtual_queue.log_dir: # type: ignore self.virtual_queue.log_dir = self.virtual_queue.log_dir.replace( old_sub_path, new_sub_path ) # type: ignore self.virtual_queue._set_up_logging() # type: ignore # Update the paths of any sub-simulation runners for sub_sim_runner in self._sub_sim_runners: sub_sim_runner.update_paths(old_sub_path, new_sub_path)
@property def stream_log_level(self) -> int: """The log level for the stream handler.""" return self._stream_log_level @stream_log_level.setter def stream_log_level(self, value: int) -> None: """Set the log level for the stream handler.""" self._stream_log_level = value # Ensure the new setting is applied self._set_up_logging() if hasattr(self, "_sub_sim_runners"): for sub_sim_runner in self._sub_sim_runners: sub_sim_runner.stream_log_level = value sub_sim_runner._set_up_logging() if hasattr(self, "virtual_queue"): self.virtual_queue.stream_log_level = value # type: ignore
[docs] def clean(self, clean_logs=False) -> None: """ Clean the simulation runner by deleting all files with extensions matching self.__class__.run_files in the base and output dirs, and resetting the total runtime to 0. Parameters ---------- clean_logs : bool, default=False If True, also delete the log files. """ delete_files = self.__class__.run_files for del_file in delete_files: # Delete files in base directory for file in _pathlib.Path(self.base_dir).glob(del_file): self._logger.info(f"Deleting {file}") _subprocess.run(["rm", file]) # Delete files in output directory for file in _pathlib.Path(self.output_dir).glob(del_file): self._logger.info(f"Deleting {file}") _subprocess.run(["rm", file]) # Reset the runtime attributes self.reset(reset_sub_sims=False) if clean_logs: # Delete log file contents without deleting the log files _subprocess.run( [ "truncate", "-s", "0", _os.path.join(self.base_dir, self.__class__.__name__ + ".log"), ] ) # Clean any sub-simulation runners if hasattr(self, "_sub_sim_runners"): for sub_sim_runner in self._sub_sim_runners: sub_sim_runner.clean(clean_logs=clean_logs)
def lighten(self, clean_logs=False) -> None: f"""Lighten the {self.__class__.__name__} by deleting all restart and trajectory files.""" # The function which does the work is defined in Simulation - here # we just need to pass the command down if hasattr(self, "_sub_sim_runners"): for sub_sim_runner in self._sub_sim_runners: sub_sim_runner.lighten()
[docs] def reset(self, reset_sub_sims: bool = True) -> None: """ Reset all attributes changed by the runtime algorithms to their default values. Parameters ---------- reset_sub_sims : bool, default=True If True, also reset any sub-simulation runners. """ for attr, value in self.__class__.runtime_attributes.items(): self._logger.info(f"Resetting the attribute {attr} to {value}.") setattr(self, attr, value) if reset_sub_sims: if hasattr(self, "_sub_sim_runners"): for sub_sim_runner in self._sub_sim_runners: sub_sim_runner.reset()
def _close_logging_handlers(self) -> None: """Close the logging file handlers. This can be useful when loading and closing many Calculations, as deleting the Calculation objects will not close the file handlers.""" handlers = self._logger.handlers[:] for handler in handlers: self._logger.removeHandler(handler) handler.close() for sub_sim_runner in self._sub_sim_runners: sub_sim_runner._close_logging_handlers() def _update_log(self) -> None: """ Update the status log file with the current status of the simulation runner. This is detailed information and so is only visible at the debug log level. """ self._logger.debug("##############################################") for var in vars(self): self._logger.debug(f"{var}: {getattr(self, var)}") self._logger.debug("##############################################") @property def _picklable_copy(self) -> SimulationRunner: """Return a copy of the SimulationRunner which can be pickled.""" picklable_copy = _copy.copy(self) # Remove any threads which can't be pcikled if hasattr(picklable_copy, "run_thread"): picklable_copy.run_thread = None # Now run this recursively on any simulations runners stored # in lists by the current simulation runner for key, val in picklable_copy.__dict__.items(): if isinstance(val, _List): if len(val) > 0: if isinstance(val[0], SimulationRunner): picklable_copy.__dict__[key] = [ sim_runner._picklable_copy for sim_runner in val ] return picklable_copy
[docs] def save(self) -> None: """Save the current state of the simulation object to a pickle file.""" self._dump()
def _dump(self) -> None: """Dump the current state of the simulation object to a pickle file, and do the same for any sub-simulations.""" with open(f"{self.base_dir}/{self.__class__.__name__}.pkl", "wb") as ofile: _pkl.dump(self._picklable_copy.__dict__, ofile) for sub_sim_runner in self._sub_sim_runners: sub_sim_runner._dump() def _load(self, update_paths: bool = True) -> None: """Load the state of the simulation object from a pickle file, and do the same for any sub-simulations. Parameters ---------- update_paths : bool, default=True If True, update the paths of the simulation object and any sub-simulation runners so that the base directory becomes the directory passed to the SimulationRunner, or the current working directory if no directory was passed. """ # Note that we cannot recursively call _load on the sub-simulations # because this results in the creation of different virtual queues for the # stages and sub-lam-windows and simulations if not _pathlib.Path( f"{self.base_dir}/{self.__class__.__name__}.pkl" ).is_file(): raise FileNotFoundError( f"Could not find {self.__class__.__name__}.pkl in {self.base_dir}" ) # Store previous value of base dir before it is potentially overwritten below supplied_base_dir = self.base_dir # Load the SimulationRunner, possibly overwriting directories print( f"Loading previous {self.__class__.__name__}. Any arguments will be overwritten..." ) with open(f"{self.base_dir}/{self.__class__.__name__}.pkl", "rb") as file: self.__dict__ = _pkl.load(file) if update_paths: self.update_paths( old_sub_path=self.base_dir, new_sub_path=supplied_base_dir ) # Refresh logging print("Setting up logging...") self._refresh_logging() # Record that the object was loaded from a pickle file self.loaded_from_pickle = True