mlatom.interfaces.xtb_interface 源代码

#!/usr/bin/env python3
'''
.. code-block::

  !---------------------------------------------------------------------------! 
  ! xtb: interface to the xtb program                                         ! 
  ! Implementations by: Pavlo O. Dral                                         !
  !---------------------------------------------------------------------------! 
'''

import os
import numpy as np
import sys
import subprocess
from .. import constants, models
from ..decorators import doc_inherit

[文档] class xtb_methods(models.OMP_model, metaclass=models.meta_method): ''' xTB interface Arguments: method (str): xTB methods read_keywords_from_file (str): keywords used in xTB .. note:: GFN2-xTB and GFN2-xTB* (remove D4 from GFN2-xTB) are available. Examples: .. code-block:: from ml.interfaces.xtb import xtb_methods() # read molecule from xyz file mol = ml.data.molecule() mol.read_from_xyz_file('sp.xyz') # initialize xtb methods model = xtb_methods(method='GFN2-xTB) # calculate energy, gradients and hessian model.predict(molecule=mol, calculate_energy_gradients=True, calculate_hessian=True) print(mol.energy) ''' def __init__(self, method='GFN2-xTB', read_keywords_from_file='', nthreads=None, **kwargs): self.method = method if self.method.lower() == 'GFN2-xTB*'.lower(): self.without_d4 = True else: self.without_d4 = False self.read_keywords_from_file = read_keywords_from_file if 'solvent' in kwargs: self.solvent = kwargs['solvent'] else: self.solvent = None try: self.xtbbin = os.environ['xtb'] except: self.xtbbin = "%s/xtb" % os.path.dirname(__file__) if nthreads is None: from multiprocessing import cpu_count self.nthreads = cpu_count() else: self.nthreads = nthreads if 'stacksize' in kwargs: os.environ["OMP_STACKSIZE"] = kwargs['stacksize'] if 'save_files_in_current_directory' in kwargs: self.save_files_in_current_directory = kwargs['save_files_in_current_directory'] else: self.save_files_in_current_directory = False
[文档] @doc_inherit def predict(self, molecular_database=None, molecule=None, calculate_energy=True, calculate_energy_gradients=False, calculate_hessian=False, calculate_dipole_derivatives=False,calculate_polarizability_derivatives=False): molDB = super().predict(molecular_database=molecular_database, molecule=molecule) if calculate_dipole_derivatives or calculate_polarizability_derivatives or self.without_d4: self.xtbbin = "%s/xtb" % os.path.dirname(__file__) self.additional_xtb_keywords = [] if self.read_keywords_from_file != '': kw_file = self.read_keywords_from_file with open(kw_file, 'r') as fxtbkw: for line in fxtbkw: self.additional_xtb_keywords = line.split() imol = -1 for mol in molDB.molecules: imol += 1 jmol = imol if len(self.additional_xtb_keywords) < imol+1: jmol = -1 mol.additional_xtb_keywords = self.additional_xtb_keywords[jmol] import tempfile with tempfile.TemporaryDirectory() as tmpdirname: if self.save_files_in_current_directory: tmpdirname = '.' self.tmpdirname = os.path.abspath(tmpdirname) for ii, mol in enumerate(molDB.molecules): xyzfilename = f'{tmpdirname}/predict{ii}.xyz' mol.write_file_with_xyz_coordinates(filename = xyzfilename) self.xtbargs = [self.xtbbin, xyzfilename] if mol.charge != 0: self.xtbargs += ['-c', '%d' % mol.charge] # there is a bug in xtb - it does not read --charg number_of_unpaired_electrons = mol.multiplicity - 1 self.xtbargs += ['-u', '%d' % number_of_unpaired_electrons] # there is a bug in xtb - it does not read --uhf # with open(f'{tmpdirname}/.CHRG', 'w') as fcharge, open(f'{tmpdirname}/.UHF', 'w') as fuhf: # fcharge.writelines(f'{mol.charge}\n') # fuhf.writelines(f'{number_of_unpaired_electrons}\n') if self.without_d4: self.xtbargs += ['--withoutd4'] if self.solvent: self.xtbargs += ['--alpb', self.solvent] if calculate_energy and not calculate_energy_gradients and not calculate_hessian: self.predict_energy(mol) if calculate_energy_gradients: self.predict_energy_gradients(mol) if calculate_hessian: self.predict_hessian(mol,calculate_polarizability_derivatives, calculate_dipole_derivatives)
def predict_energy( self, mol, ): terminated = False outputs = [] while not terminated: stdout, stderr = self.run_xtb_job(self.xtbargs+self.additional_xtb_keywords,self.tmpdirname) rerun, terminated = self.error_handle(stdout+stderr) xtb_scf_successful = not rerun mol.__dict__['xtb_scf_successful'] = xtb_scf_successful if xtb_scf_successful: for readable in stdout: outputs.append(readable) if 'TOTAL ENERGY' in readable: energy = float(readable.split()[3]) mol.energy = energy if 'dispersion' in readable: dispersion = float(readable.split()[3]) mol.__dict__['D4_dispersion_from_GFN2xTB'] = dispersion if 'Gsolv' in readable: solvation_free_energy = float(readable.split()[3]) mol.solvation_free_energy = solvation_free_energy for iline in range(len(outputs)): if 'molecular dipole:' in outputs[iline]: sum_line = outputs[iline+3].strip().split()[1:] dipole_moment = [eval(each) for each in sum_line] mol.dipole_moment = dipole_moment else: print('xTB calculation failed.') sys.stdout.flush() def predict_energy_gradients( self, mol, ): terminated = False outputs = [] while not terminated: stdout, stderr = self.run_xtb_job(self.xtbargs + ['--grad'] + self.additional_xtb_keywords, self.tmpdirname) rerun, terminated = self.error_handle(stdout+stderr) xtb_scf_successful = not rerun mol.__dict__['xtb_scf_successful'] = xtb_scf_successful if xtb_scf_successful: for readable in stdout: outputs.append(readable) if 'TOTAL ENERGY' in readable: energy = float(readable.split()[3]) mol.energy = energy if 'dispersion' in readable: dispersion = float(readable.split()[3]) mol.__dict__['D4_dispersion_from_GFN2xTB'] = dispersion if 'Gsolv' in readable: solvation_free_energy = float(readable.split()[3]) mol.solvation_free_energy = solvation_free_energy with open(f'{self.tmpdirname}/gradient', 'r') as fout: iatom = -1 for line in fout: if len(line.split()) != 3: continue iatom += 1 mol.atoms[iatom].energy_gradients = np.array([float(xx) / constants.Bohr2Angstrom for xx in line.split()]).astype(float) for iline in range(len(outputs)): if 'molecular dipole:' in outputs[iline]: sum_line = outputs[iline+3].strip().split()[1:] dipole_moment = [eval(each) for each in sum_line] mol.dipole_moment = dipole_moment else: print('xTB calculation failed.') sys.stdout.flush() def predict_hessian( self, mol, calculate_polarizability_derivatives, calculate_dipole_derivatives ): terminated = False outputs = [] natoms = len(mol.atoms) while not terminated: if not calculate_polarizability_derivatives: stdout, stderr = self.run_xtb_job(self.xtbargs + ['--hess'] + self.additional_xtb_keywords, self.tmpdirname) else: stdout, stderr = self.run_xtb_job(self.xtbargs + ['--hess','--ptb','--alpha'] + self.additional_xtb_keywords, self.tmpdirname) rerun, terminated = self.error_handle(stdout+stderr) xtb_scf_successful = not rerun mol.__dict__['xtb_scf_successful'] = xtb_scf_successful # There is bug in xtb that it fails for H2, thus, it may end with error but still prints hessian. if xtb_scf_successful or os.path.exists(f'{self.tmpdirname}/hessian'): for readable in stdout: outputs.append(readable) if 'TOTAL ENERGY' in readable: energy = float(readable.split()[3]) mol.energy = energy if 'dispersion' in readable: dispersion = float(readable.split()[3]) mol.__dict__['D4_dispersion_from_GFN2xTB'] = dispersion if 'Gsolv' in readable: solvation_free_energy = float(readable.split()[3]) mol.solvation_free_energy = solvation_free_energy with open(f'{self.tmpdirname}/hessian', 'r') as fout: nlines = 0 hess = [] for line in fout: nlines += 1 if nlines == 1: continue for xx in line.split(): hess.append(float(xx) / (constants.Bohr2Angstrom**2)) hess = np.array(hess).astype(float) mol.hessian = hess.reshape(natoms*3,natoms*3) # Read dipole derivatives if calculate_dipole_derivatives: if os.path.exists(f'{self.tmpdirname}/dipd'): with open(f'{self.tmpdirname}/dipd', 'r') as fout: dipd = [] for iline, line in enumerate(fout.readlines()): if iline == 0: continue dipd += [float(each) for each in line.split()] dipd = np.array(dipd).astype(float) mol.dipole_derivatives = dipd / constants.Debye # Read polarizability derivatives if calculate_polarizability_derivatives: if os.path.exists(f'{self.tmpdirname}/polard'): with open(f'{self.tmpdirname}/polard','r') as fout: polard = [] for iline,line in enumerate(fout.readlines()): if iline == 0: continue polard += [float(each) for each in line.split()] # for ii in range(6): # # polard += [float(each) for each in line.split()] # polard.append(float(line[5+15*ii:5+15*(ii+1)])) polard = np.array(polard).astype(float) mol.polarizability_derivatives = polard else: print('xTB calculation failed.') sys.stdout.flush() def run_xtb_job(self,xtbargs,tmpdirname): segmentation_error = False floating_point_error = False other_error = None stderr = []; stdout = [] try: proc = subprocess.run(xtbargs, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=tmpdirname, universal_newlines=True, check=True) stdout = proc.stdout.split('\n') stderr = proc.stderr.split('\n') except subprocess.CalledProcessError as e: if e.returncode == -11: # kill signals: SIGSEGV segmentation_error = True elif e.returncode == -6: floating_point_error = True elif e.returncode == 1: # not errors of system signal proc = subprocess.Popen(xtbargs, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=tmpdirname, universal_newlines=True) stdout, stderr = proc.communicate() stdout = stdout.split('\n') stderr = stderr.split('\n') else: other_error = e.returncode if segmentation_error: stderr.append('segmentation fault') if floating_point_error: stderr.append('floating-point exceptions') if other_error: stderr.append(f'killed signal {other_error}') return stdout, stderr def error_handle(self,stds): terminated = True rerun = True for readable in stds: if 'normal termination of xtb' in readable: terminated = True rerun = False return rerun, terminated if 'segmentation fault' in readable.lower(): # memory problem if "OMP_STACKSIZE" not in os.environ: print('OMP stacksize is too small for the system. Try to increase to 1G') sys.stdout.flush() os.environ["OMP_STACKSIZE"] = '1G' rerun = True terminated = False return rerun, terminated else: raise ValueError('OMP stacksize is too small for the system.') if 'solv_model_loadInternalParam' in readable: raise ValueError(f'Please specify correct solvent name for xTB calculation.') if 'floating-point exceptions' in readable.lower(): print('xTB calculation WARNING: Floating-point exceptions are signalling') sys.stdout.flush() continue if 'killed signal' in readable.lower(): raise ValueError(f'xTB calculation errors terminated by the system with {readable}') # convergence error not implemented return rerun, terminated
if __name__ == '__main__': pass