Source code for a3fe.run.simulation

"""Functionality to run a single SOMD simulation."""

__all__ = ["Simulation"]

import glob as _glob
import logging as _logging
import os as _os
import pathlib as _pathlib
import subprocess as _subprocess
from typing import List as _List
from typing import Optional as _Optional
from typing import Tuple as _Tuple

import numpy as _np
from sire.units import k_boltz as _k_boltz

from ..configuration import EngineType as _EngineType
from ..configuration import JobStatus as _JobStatus
from ..configuration import SlurmConfig as _SlurmConfig
from ..configuration import _EngineConfig
from ._simulation_runner import SimulationRunner as _SimulationRunner
from ._virtual_queue import Job as _Job
from ._virtual_queue import VirtualQueue as _VirtualQueue


[docs]class Simulation(_SimulationRunner): """Class to store information about a single SOMD simulation.""" required_input_files = [ "somd.prm7", "somd.rst7", "somd.pert", ] # Files to be cleaned by self.clean() run_files = _SimulationRunner.run_files + [ "*.dcd", "*.out", "moves.dat", "simfile.dat", "simfile_equilibrated.dat", "*.s3", "*.s3.previous", "latest.pdb", "gradients.dat", "equilibration_block_gradient.txt", "equilibration_shrinking_block_gradient", ]
[docs] def __init__( self, lam: float, run_no: int, virtual_queue: _VirtualQueue, base_dir: _Optional[str] = None, input_dir: _Optional[str] = None, output_dir: _Optional[str] = None, stream_log_level: int = _logging.INFO, slurm_config: _Optional[_SlurmConfig] = None, analysis_slurm_config: _Optional[_SlurmConfig] = None, engine_config: _Optional[_EngineConfig] = None, engine_type: _EngineType = _EngineType.SOMD, update_paths: bool = True, ) -> None: """ Initialise a Simulation object. Parameters ---------- lam : float Lambda value for the simulation. run_no : int Index of repeat for the simulation. virtual_queue : VirtualQueue Virtual queue object to use for the simulation. base_dir : str, Optional, default: None Path to the base directory. If None, this is set to the current working directory. input_dir : str, Optional, default: None Path to directory containing input files for the simulation. If None, this will be set to "current_working_directory/input". output_dir : str, Optional, default: None Path to directory to store output files from the simulation. If None, this will be set to "current_working_directory/output". stream_log_level : int, Optional, default: logging.INFO Logging level to use for the steam file handlers for the simulation object and its child objects. slurm_config: SlurmConfig, default: None Configuration for the SLURM job scheduler. If None, the default partition is used. analysis_slurm_config: SlurmConfig, default: None Configuration for the SLURM job scheduler for the analysis. This is helpful e.g. if you want to submit analysis to the CPU partition, but the main simulation to the GPU partition. If None, engine_config: EngineConfig, default: None Configuration for the engine. If None, the default configuration is used. engine_type: EngineType, default: EngineType.SOMD The type of engine to use for the production simulations. update_paths: bool, Optional, default: True If True, if the simulation runner is loaded by unpickling, then update_paths() is called. Returns ------- None """ # Set the lambda value and run number first, as these are # required for __str__, and therefore the super().__init__ call self.lam = lam self.run_no = run_no super().__init__( base_dir=base_dir, input_dir=input_dir, output_dir=output_dir, stream_log_level=stream_log_level, slurm_config=slurm_config, analysis_slurm_config=analysis_slurm_config, engine_config=engine_config, engine_type=engine_type, update_paths=update_paths, dump=False, ) if self.engine_config.lambda_values is None: # type: ignore raise ValueError("No lambda values specified in engine config") if self.lam not in self.engine_config.lambda_values: raise ValueError( f"Lambda value {self.lam} not in list of lambda values: {self.engine_config.lambda_values}" # type: ignore ) if not self.loaded_from_pickle: self.virtual_queue = virtual_queue # Check that the input directory contains the required files self._validate_input() self.job: _Optional[_Job] = None self._running: bool = False # Select the correct rst7 and, if supplied, restraints self._select_input_files() # Save state and update log self._dump() self._update_log()
def __str__(self) -> str: return f"Simulation (lam={self.lam}, run_no={self.run_no})" @property def running(self) -> bool: """ Check if the simulation is still running, and update the running attribute accordingly. Also resubmit job if it has failed. Returns ------- self._running : bool True if the simulation is still running, False otherwise. """ if self.job is None: self._running = False return self._running # Get job ids of currently running jobs - but note that the queue is updated at the # Stage level if self.job in self.virtual_queue.queue: self._running = True self._logger.info("Still running") else: # Must have finished self._logger.info("Not running") self._running = False # Check that job finished successfully if self.job.status == _JobStatus.FINISHED: self._logger.info(f"{self.job} finished successfully") elif self.job.status == "FAILED": old_job = self.job self._logger.info(f"{old_job} failed - resubmitting") # Move log files and s3 files so that the job does not restart _subprocess.run(["mkdir", f"{self.output_dir}/failure"]) for s3_file in _glob.glob(f"{self.output_dir}/*.s3"): _subprocess.run( [ "mv", f"{self.output_dir}/{s3_file}", f"{self.output_dir}/failure", ] ) _subprocess.run( [ "mv", old_job.slurm_outfile, f"{self.output_dir}/failure", ] ) # Now resubmit cmd_list = old_job.command_list self.job = self.virtual_queue.submit( command_list=cmd_list, slurm_file_base=self.slurm_file_base ) self._logger.info(f"{old_job} failed and was resubmitted as {self.job}") self._running = True return self._running def _validate_input(self) -> None: """Check that the required input files are present.""" # Check that the input directory exists if not _os.path.isdir(self.input_dir): raise FileNotFoundError("Input directory does not exist.") # Check that the required input files are present for file in Simulation.required_input_files: if not _os.path.isfile(_os.path.join(self.input_dir, file)): raise FileNotFoundError("Required input file " + file + " not found.") def _select_input_files(self) -> None: """Select the correct rst7 and, if supplied, restraints, according to the run number.""" # Check if we have multiple rst7 files, or only one rst7_files = _glob.glob(_os.path.join(self.input_dir, "*.rst7")) if len(rst7_files) == 0: raise FileNotFoundError("No rst7 files found in input directory") elif len(rst7_files) > 1: # Rename the rst7 file for this run to somd.rst7 and delete any other # rst7 files self._logger.debug("Multiple rst7 files found - renaming") _subprocess.run( [ "mv", _os.path.join(self.input_dir, f"somd_{self.run_no}.rst7"), _os.path.join(self.input_dir, "somd.rst7"), ] ) unwanted_rst7_files = _glob.glob( _os.path.join(self.input_dir, "somd_?.rst7") ) for file in unwanted_rst7_files: _subprocess.run(["rm", file]) else: self._logger.info("Only one rst7 file found - not renaming") # Deal with restraints. Get the name of the restraint file for this run old_restr_file = _os.path.join(self.input_dir, f"restraint_{self.run_no}.txt") # If we already have a restraints.txt file, continue, if _os.path.isfile(_os.path.join(self.input_dir, "restraint.txt")): self._logger.info("restraint.txt file found") elif _os.path.isfile(old_restr_file): self._logger.info("restraint.txt file found - renaming") target_restr_file = _os.path.join(self.input_dir, "restraint.txt") self._logger.info(f"Renaming {old_restr_file} to {target_restr_file}") _subprocess.run(["mv", old_restr_file, target_restr_file]) unwanted_rest_files = _glob.glob( _os.path.join(self.input_dir, "restraint_?.txt") ) for file in unwanted_rest_files: _subprocess.run(["rm", file]) else: self._logger.debug("No restraint file found") @property def slurm_file_base(self) -> str: """Get the base name of the SLURM output file.""" slurm_file_base = self.slurm_config.get_slurm_output_file_base( run_dir=self.input_dir ) self._logger.debug(f"Found slurm output file basename: {slurm_file_base}") return slurm_file_base
[docs] def run(self, runtime: float = 2.5) -> None: """ Run a simulation. Parameters ---------- runtime : float, Optional, default: 2.5 Runtime of simulation, in ns. Returns ------- None """ # Write updated config to file self.engine_config.write_config( run_dir=self.output_dir, lambda_val=self.lam, runtime=runtime, top_file="somd.prm7", # TODO - make generic coord_file="somd.rst7", # TODO - make generic morph_file="somd.pert", # TODO - make generic ) # Get the commands to run the simulation cmd = self.engine_config.get_run_cmd(self.lam) cmd_list = self.slurm_config.get_submission_cmds( cmd=cmd, run_dir=self.output_dir ) self.job = self.virtual_queue.submit( command_list=cmd_list, slurm_file_base=self.slurm_file_base ) self._logger.info(f"Submitted with job {self.job}")
[docs] def get_tot_simtime(self) -> float: """ Get the total simulation time in ns Returns ------- tot_simtime : float Total simulation time in ns. """ data_simfile = f"{self.output_dir}/simfile.dat" if not _pathlib.Path(data_simfile).is_file(): # Simuation has not been run, hence total simulation time is 0 return 0 elif _os.stat(data_simfile).st_size == 0: # Simfile is empty, hence total simulation time is 0 return 0 else: # Read last line of simfile with subprocess to make as fast as possible step = int( _subprocess.check_output( [ "tail", "-1", f"{self.output_dir}/simfile.dat", ] ) .decode("utf-8") .strip() .split()[0] ) return step * (self.engine_config.timestep / 1_000_000) # ns
[docs] def get_tot_gpu_time(self) -> float: """ Get the total simulation time in GPU hours Returns ------- tot_gpu_time : float Total simulation time in GPU hours. """ # Get output files slurm_output_files = self.slurm_output_files # If we don't have any output files, we haven't run any simulations if len(slurm_output_files) == 0: return 0 # Otherwise, add up the simulation time in seconds tot_gpu_time = 0 for file in slurm_output_files: with open(file, "rt") as file: for line in file.readlines(): if line.startswith("Simulation took"): tot_gpu_time += float(line.split(" ")[2]) # And convert to GPU hours return tot_gpu_time / 3600
@property def tot_simtime(self) -> float: """Get the total simulation time in ns""" return self.get_tot_simtime() @property def tot_gpu_time(self) -> float: """Get the total simulation time in GPU hours""" # Get output files return self.get_tot_gpu_time() @property def failed(self) -> bool: """Whether the simulation has failed""" # Check if we are still running if self.running: return False # We are not running, so all slurm output files should contain the # "Simulation took" line if self.slurm_output_files: for file in self.slurm_output_files: with open(file, "rt") as file: failed = True for line in file.readlines(): if line.startswith("Simulation took"): # File shows success, so continue to next file failed = False break # We haven't found "Simulation took" in this file, indicating failure if failed: return True # Either We aren't running and have output files, all with the "Simulation took" line, # or we aren't running and have no output files - either way, we haven't failed return False @property def slurm_output_files(self) -> _List[str]: """Get a list of all slurm output files for this simulation.""" return _glob.glob(f"{self.slurm_file_base}*")
[docs] def kill(self) -> None: """Kill the job.""" if not self.job: raise ValueError("Stage has no job object. Cannot kill job.") if self.job in self.virtual_queue.queue: self._logger.info(f"Killing job {self.job}") self.virtual_queue.kill(self.job)
[docs] def lighten(self) -> None: """Lighten the simulation by deleting all restart and trajectory files.""" delete_files = [ "*.dcd", "*.s3", "*.s3.previous", "gradients.s3", "simfile_equilibrated.dat", "latest.pdb", ] for del_file in delete_files: # Delete files in base directory for file in _pathlib.Path(self.base_dir).glob(del_file): self._logger.info(f"Deleting {file}") _subprocess.run(["rm", file]) # Delete files in output directory for file in _pathlib.Path(self.output_dir).glob(del_file): self._logger.info(f"Deleting {file}") _subprocess.run(["rm", file])
[docs] def read_gradients( self, equilibrated_only: bool = False, endstate: bool = False ) -> _Tuple[_np.ndarray, _np.ndarray]: """ Read the gradients from the output file. These can be either the infiniesimal gradients at the given value of lambda, or the differences in energy between the end state Hamiltonians. Parameters ---------- equilibrated_only : bool, Optional, default: False Whether to read the gradients from the equilibrated region of the simulation (True) or the whole simulation (False). endstate : bool, Optional, default: False Whether to return the difference in energy between the end state Hamiltonians (True) or the infiniesimal gradients at the given value of lambda (False). Returns ------- times : np.ndarray Array of times, in ns. grads : np.ndarray Array of gradients, in kcal/mol. """ # Read the output file if equilibrated_only: with open( _os.path.join(self.output_dir, "simfile_equilibrated.dat"), "r" ) as ifile: lines = ifile.readlines() else: with open(_os.path.join(self.output_dir, "simfile.dat"), "r") as ifile: lines = ifile.readlines() steps = [] grads = [] temp = None # Temperature in K for line in lines: vals = line.split() # Get the temperature, checking the units if line.startswith("#Generating temperature is"): temp = vals[3] try: unit = vals[4] except IndexError: # Must be °C temp, unit = temp.split("°") if unit == "C": temp = float(temp) + 273.15 # Convert to K else: temp = float(temp) # Get the gradients if not line.startswith("#"): step = int(vals[0].strip()) if not endstate: # Return the infinitesimal gradients grad = float(vals[2].strip()) else: # Return the difference in energy between the end state Hamiltonians energy_start = float(vals[5].strip()) energy_end = float(vals[-1].strip()) grad = energy_end - energy_start steps.append(step) grads.append(grad) times = [ x * (self.engine_config.timestep / 1_000_000) for x in steps ] # Timestep already in ns times_arr = _np.array(times) grads_arr = _np.array(grads) # convert gradients to kcal/mol by dividing by beta grads_arr *= temp * _k_boltz.value() return times_arr, grads_arr
[docs] def update_paths(self, old_sub_path: str, new_sub_path: str) -> None: """ Replace the old sub-path with the new sub-path in the base, input, and output directory Parameters ---------- old_sub_path : str The old sub-path to replace. new_sub_path : str The new sub-path to replace the old sub-path with. """ super().update_paths(old_sub_path, new_sub_path)
[docs] def analyse(self) -> None: raise NotImplementedError( "Analysis cannot be performed for a single simulation" )
@property def equil_time(self) -> None: raise NotImplementedError( "Equilibration time is not determined for a single simulation, only " "for an ensemble of simulations within a lambda window." ) @property def equilibrated(self) -> None: raise NotImplementedError( "Equilibration is not detected at the level of single simulations, only " "for an ensemble of simulations within a lambda window." )
[docs] def set_equilibration_time(self, equil_time: float) -> None: raise NotImplementedError( "Equilibration time is not set for a single simulation, only " "for an ensemble of simulations within a lambda window." )
[docs] def analyse_convergence(self) -> None: raise ( NotImplementedError( "Convergence analysis is not performed for a single simulation, only " " at the level of a stage or above." ) )
def setup(self) -> None: raise NotImplementedError("Simulations are set up when they are created")