#!/usr/bin/env python3
'''
.. code-block::
!---------------------------------------------------------------------------!
! models: Module with models !
! Implementations by: Pavlo O. Dral, Fuchun Ge, Yi-Fan Hou, Yuxinxin Chen, !
! Peikun Zheng !
!---------------------------------------------------------------------------!
'''
from __future__ import annotations
from typing import Any, Union, Dict, List, Iterable, Callable
import os, tempfile, uuid, sys
import numpy as np
from collections import UserDict
from . import data, stats, stopper, interfaces
from .decorators import doc_inherit
[文档]
class model():
'''
Parent (super) class for models to enable useful features such as logging during geometry optimizations.
'''
nthreads = 0
def set_num_threads(self, nthreads=0):
# implement for each subclass
if nthreads:
self.nthreads = nthreads
[文档]
def config_multiprocessing(self):
'''
for scripts that need to be executed before running model in parallel
'''
pass
def parse_args(self, args):
# for command-line arguments parsing
pass
def _predict_geomopt(self,
return_string=False,
dump_trajectory_interval=None,
filename=None,
format='json',
print_properties=None,
molecule: data.molecule = None,
calculate_energy: bool = True,
calculate_energy_gradients: bool = True,
**kwargs):
self.predict(molecule=molecule,
calculate_energy=calculate_energy,
calculate_energy_gradients=calculate_energy_gradients,
**kwargs)
if dump_trajectory_interval != None:
opttraj = data.molecular_trajectory()
opttraj.load(filename=filename, format=format)
nsteps = len(opttraj.steps)
if print_properties == 'all' or type(print_properties) == list:
printstrs = []
printstrs += [' %s ' % ('-'*78)]
printstrs += [f' Iteration {nsteps+1}']
printstrs += [' %s \n' % ('-'*78)]
printstrs += [molecule.info(properties=print_properties, return_string=True)]
printstrs = '\n'.join(printstrs) + '\n'
if not return_string:
print(printstrs)
opttraj.steps.append(data.molecular_trajectory_step(step=nsteps, molecule=molecule))
opttraj.dump(filename=filename, format=format)
moldb = data.molecular_database()
moldb.molecules = [each.molecule for each in opttraj.steps]
xyzfilename = os.path.splitext(os.path.basename(filename))[0]
moldb.write_file_with_xyz_coordinates(f'{xyzfilename}.xyz')
if return_string and (dump_trajectory_interval != None) and (print_properties == 'all' or type(print_properties) == list): return printstrs
[文档]
def predict(
self,
molecular_database: data.molecular_database = None,
molecule: data.molecule = None,
calculate_energy: bool = False,
calculate_energy_gradients: bool = False,
calculate_hessian: bool = False,
**kwargs,
):
'''
Make predictions for molecular geometries with the model.
Arguments:
molecular_database (:class:`mlatom.data.molecular_database`, optional): A database contains the molecules whose properties need to be predicted by the model.
molecule (:class:`mlatom.models.molecule`, optional): A molecule object whose property needs to be predicted by the model.
calculate_energy (bool, optional): Use the model to calculate energy.
calculate_energy_gradients (bool, optional): Use the model to calculate energy gradients.
calculate_hessian (bool, optional): Use the model to calculate energy hessian.
'''
# for universal control of predicting behavior
self.set_num_threads()
if molecular_database != None:
molecular_database = molecular_database
elif molecule != None:
molecular_database = data.molecular_database([molecule])
else:
errmsg = 'Either molecule or molecular_database should be provided in input'
raise ValueError(errmsg)
return molecular_database
def _call_impl(self, *args, **kwargs):
return self.predict(*args, **kwargs)
__call__ : Callable[..., Any] = _call_impl
class torch_model(model):
# models that utilize PyTorch should inherit this class
def set_num_threads(self, nthreads=0):
super().set_num_threads(nthreads)
if self.nthreads:
import torch
torch.set_num_threads(self.nthreads)
def config_multiprocessing(self):
super().config_multiprocessing()
import torch
torch.set_num_threads(1)
class torchani_model(torch_model):
# models that utilize TorchANI should inherit this class
def config_multiprocessing(self):
return super().config_multiprocessing()
class tensorflow_model(model):
def set_num_threads(self, nthreads=0):
super().set_num_threads(nthreads)
if self.nthreads:
os.environ["TF_INTRA_OP_PARALLELISM_THREADS"] = str(self.nthreads)
class MKL_model(model):
def set_num_threads(self, nthreads=0):
super().set_num_threads(nthreads)
if self.nthreads:
os.environ["MKL_NUM_THREADS"] = str(self.nthreads)
class OMP_model(model):
def set_num_threads(self, nthreads=0):
super().set_num_threads(nthreads)
if self.nthreads:
os.environ["OMP_NUM_THREADS"] = str(self.nthreads)
[文档]
class methods(model):
'''
Create a model object with a specified method.
Arguments:
method (str): Specify the method. Available methods are listed in the section below.
program (str, optional): Specify the program to use.
**kwargs: Other method-specific options
**Available Methods:**
``'AIQM1'``, ``'AIQM1@DFT'``, ``'AIQM1@DFT*'``, ``'AM1'``, ``'ANI-1ccx'``, ``'ANI-1x'``, ``'ANI-1x-D4'``, ``'ANI-2x'``, ``'ANI-2x-D4'``, ``'CCSD(T)*/CBS'``, ``'CNDO/2'``, ``'D4'``, ``'DFTB0'``, ``'DFTB2'``, ``'DFTB3'``, ``'GFN2-xTB'``, ``'MINDO/3'``, ``'MNDO'``, ``'MNDO/H'``, ``'MNDO/d'``, ``'MNDO/dH'``, ``'MNDOC'``, ``'ODM2'``, ``'ODM2*'``, ``'ODM3'``, ``'ODM3*'``, ``'OM1'``, ``'OM2'``, ``'OM3'``, ``'PM3'``, ``'PM6'``, ``'RM1'``, ``'SCC-DFTB'``, ``'SCC-DFTB-heats'``.
Methods listed above can be accepted without specifying a program.
The required programs still have to be installed though as described in the installation manual.
**Available Programs and Their Corresponding Methods:**
.. table::
:align: center
=============== ==========================================================================================================================================================================
Program Methods
=============== ==========================================================================================================================================================================
TorchANI ``'AIQM1'``, ``'AIQM1@DFT'``, ``'AIQM1@DFT*'``, ``'ANI-1ccx'``, ``'ANI-1x'``, ``'ANI-1x-D4'``, ``'ANI-2x'``, ``'ANI-2x-D4'``, ``'ANI-1xnr'``
dftd4 ``'AIQM1'``, ``'AIQM1@DFT'``, ``'ANI-1x-D4'``, ``'ANI-2x-D4'``, ``'D4'``
MNDO or Sparrow ``'AIQM1'``, ``'AIQM1@DFT'``, ``'AIQM1@DFT*'``, ``'MNDO'``, ``'MNDO/d'``, ``'ODM2*'``, ``'ODM3*'``, ``'OM2'``, ``'OM3'``, ``'PM3'``, ``'SCC-DFTB'``, ``'SCC-DFTB-heats'``
MNDO ``'CNDO/2'``, ``'MINDO/3'``, ``'MNDO/H'``, ``'MNDO/dH'``, ``'MNDOC'``, ``'ODM2'``, ``'ODM3'``, ``'OM1'``, semiempirical OMx, DFTB, NDDO-type methods
Sparrow ``'DFTB0'``, ``'DFTB2'``, ``'DFTB3'``, ``'PM6'``, ``'RM1'``, semiempirical OMx, DFTB, NDDO-type methods
xTB ``'GFN2-xTB'``, semiempirical GFNx-TB methods
Orca ``'CCSD(T)*/CBS'``, DFT
Gaussian ab initio methods, DFT
PySCF ab initio methods, DFT
=============== ==========================================================================================================================================================================
'''
methods_map = {
'aiqm1': ['AIQM1', 'AIQM1@DFT', 'AIQM1@DFT*'],
'aiqm2': ['AIQM2', 'AIQM2@DFT'],
'dens': [],
'torchani': ["ANI-1x", "ANI-1ccx", "ANI-2x", 'ANI-1x-D4', 'ANI-2x-D4', 'ANI-1xnr', 'ANI-1ccx-gelu', 'ANI-1ccx-gelu-D4', 'ANI-1x-gelu', 'ANI-1x-gelu-d4'],
'aimnet2': ["AIMNet2@b973c", "AIMNet2@wb97m-d3"],
'mndo': ['ODM2*', 'ODM2', 'ODM3', 'OM3', 'OM2', 'OM1', 'PM3', 'AM1', 'MNDO/d', 'MNDOC', 'MNDO', 'MINDO/3', 'CNDO/2', 'SCC-DFTB', 'SCC-DFTB-heats', 'MNDO/H', 'MNDO/dH'],
'sparrow': ['DFTB0', 'DFTB2', 'DFTB3', 'MNDO', 'MNDO/d', 'AM1', 'RM1', 'PM3', 'PM6', 'OM2', 'OM3', 'ODM2*', 'ODM3*', 'AIQM1'],
'xtb': ['GFN2-xTB', 'GFN2-xTB*'],
'dftd4': ['D4'],
'dftd3': ['d3zero', 'd3bj', 'd3bjm', 'd3zerom', 'd3op'],
'dens': [],
'ccsdtstarcbs': ['CCSD(T)*/CBS'],
# in-interface method searching for a empty list
'pyscf': [],
'gaussian': [],
'columbus': [],
'turbomole': [],
'orca': [],
}
def __init__(self, method: str = None, program: str = None, **kwargs):
# !!! IMPORTANT !!!
# It is neccesary to save all the arguments in the model, otherwise it would not be dumped correctly!
self.method = method
self.program = program
if kwargs != {}: self.kwargs = kwargs
program = self._get_program(method, program)
self.interface = interfaces.__dict__[program]()(method=method, **kwargs)
@property
def nthreads(self):
return self.interface.nthreads
@nthreads.setter
def nthreads(self, nthreads):
self.interface.nthreads = nthreads
[文档]
def predict(self, *args, **kwargs):
self.interface.predict(*args, **kwargs)
[文档]
def config_multiprocessing(self):
super().config_multiprocessing()
self.interface.config_multiprocessing()
@classmethod
def _get_program(cls, method, program):
if program:
if program.lower() not in ['turbomole', 'columbus'] and not method:
raise ValueError('A method must be specified')
if program.casefold() in cls.methods_map:
return program.casefold()
else:
raise ValueError('Unrecognized program')
else:
program_list = []
for program, methods in cls.methods_map.items():
if methods:
if method.casefold() in [m.casefold() for m in methods]:
try:
interfaces.__dict__[program]()()
program_list.append(program)
except:
pass
else:
try:
if interfaces.__dict__[program]().is_available_method(method):
program_list.append(program)
except:
pass
if len(set(program_list)) != 0:
return program_list[0]
raise ValueError("Cannot find appropriate program for the requested method")
@classmethod
def known_methods(cls):
methods = set(method for interfaced_methods in cls.methods_map.values() for method in interfaced_methods)
return methods
@classmethod
def is_known_method(cls, method=None):
methodcasefold = [mm.casefold() for mm in cls.known_methods()]
if method.casefold() in methodcasefold: return True
else: return False
def dump(self, filename=None, format='json'):
model_dict = {'type': 'method'}
for key in self.__dict__:
tt = type(self.__dict__[key])
if tt in [str, dict]:
model_dict[key] = self.__dict__[key]
model_dict['nthreads'] = self.nthreads
if format == 'json':
import json
with open(filename, 'w') as fjson:
json.dump(model_dict, fjson, indent=4)
if format == 'dict':
return model_dict
class meta_method(type):
def __new__(cls, name, bases, namespace, available_methods=[]):
new = super().__new__(cls, name, bases, namespace)
if not available_methods:
available_methods = methods.methods_map[name.split('_')[0]]
new.available_methods = available_methods
return new
def is_available_method(self, method):
return method.casefold() in [m.casefold() for m in self.available_methods]
# Parent model class
[文档]
class ml_model(model):
'''
Useful as a superclass for the ML models that need to be trained.
'''
[文档]
def train(
self,
molecular_database: data.molecular_database,
property_to_learn: Union[str, None] = 'y',
xyz_derivative_property_to_learn: str = None,
) -> None:
'''
Train the model with molecular database provided.
Arguments:
molecular_database (:class:`mlatom.data.molecular_database`): The database of molecules for training.
property_to_learn (str, optional): The label of property to be learned in model training.
xyz_derivative_property_to_learn (str, optional): The label of XYZ derivative property to be learned.
'''
self.set_num_threads()
[文档]
@doc_inherit
def predict(
self,
molecular_database: data.molecular_database = None, molecule: data.molecule = None,
calculate_energy: bool = False, property_to_predict: Union[str, None] = 'estimated_y',
calculate_energy_gradients: bool = False, xyz_derivative_property_to_predict: Union[str, None] = 'estimated_xyz_derivatives_y',
calculate_hessian: bool = False, hessian_to_predict: Union[str, None] = 'estimated_hessian_y',
) -> None:
'''
property_to_predict (str, optional): The label name where the predicted properties to be saved.
xyz_derivative_property_to_predict (str, optional): The label name where the predicted XYZ derivatives to be saved.
hessian_to_predict (str, optional): The label name where the predicted Hessians to be saved.
'''
molecular_database = super().predict(molecular_database=molecular_database, molecule=molecule)
if calculate_energy:
property_to_predict = 'energy'
if calculate_energy_gradients:
xyz_derivative_property_to_predict = 'energy_gradients'
if calculate_hessian:
hessian_to_predict = 'hessian'
return molecular_database, property_to_predict, xyz_derivative_property_to_predict, hessian_to_predict
[文档]
def generate_model_dict(self):
'''
Generates model dictionary for dumping in json format.
'''
model_dict = {
'type': 'ml_model',
'ml_model_type': str(type(self)).split("'")[1],
'kwargs': {
'model_file': os.path.abspath(self.model_file)
},
# 'hyperparameters': self.hyperparameters,
'nthreads': self.nthreads,
}
return model_dict
[文档]
def reset(self):
'''
Resets model (deletes the ML model file from the hard disk).
'''
if os.path.exists(self.model_file): os.remove(self.model_file)
[文档]
def dump(self, filename=None, format='json'):
'''
Dumps model class object information in a json file (do not confused with saving the model itself, i.e., its parameters!).
'''
if not self.model_file:
self.save()
model_dict = self.generate_model_dict()
if format == 'json':
import json
with open(filename, 'w') as f:
json.dump(model_dict, f, indent=4)
if format == 'dict':
return model_dict
def parse_args(self, args):
super().parse_args(args)
def parse_hyperparameter_optimization(self, args, arg_key):
space_map = {
'loguniform': 'log',
'uniform': 'linear',
}
if args.hyperparameter_optimization['optimization_algorithm'] == 'tpe':
value = args._hyperopt_str_dict[arg_key]
space = space_map[value.split('(')[0].split('.')[-1]]
lb = float(value.split('(')[1][:-1].split(',')[0])
hb = float(value.split('(')[1][:-1].split(',')[1])
self.hyperparameters[arg_key].optimization_space = space
if space == 'log':
self.hyperparameters[arg_key].minval = 2**lb
self.hyperparameters[arg_key].maxval = 2**hb
else:
self.hyperparameters[arg_key].minval = lb
self.hyperparameters[arg_key].maxval = hb
[文档]
def calculate_validation_loss(self,
training_kwargs=None,
prediction_kwargs=None,
cv_splits_molecular_databases=None, calculate_CV_split_errors=False,
subtraining_molecular_database=None, validation_molecular_database=None,
validation_loss_function=None, validation_loss_function_kwargs={},
debug=False):
'''
Returns the validation loss for the given hyperparameters.
By default, the validation loss is RMSE evaluated as a geometric mean of scalar and vectorial properties, e.g., energies and gradients.
Arguments:
training_kwargs (dict, optional): the kwargs to be passed to ``yourmodel.train()`` function.
prediction_kwargs (dict, optional): the kwargs to be passed to ``yourmodel.predict()`` function.
cv_splits_molecular_databases (list, optional): the list with cross-validation splits, each element is :class:`molecular_database <mlatom.data.molecular_database>`.
calculate_CV_split_errors (bool, optional): requests to return the errors for each cross-validation split as a list in addtion to the aggregate cross-validation error.
subtraining_molecular_database (:class:`molecular_database <mlatom.data.molecular_database>`, optional): molecular database for sub-training to be passed to ``yourmodel.train()`` function.
validation_molecular_database (:class:`molecular_database <mlatom.data.molecular_database>`, optional): molecular database for validation to be passed to ``yourmodel.predict()`` function.
validation_loss_function (function, optional): user-defined validation function.
validation_loss_function_kwargs (dict, optional): kwargs for above ``validation_loss_function``.
'''
property_to_learn = self.get_property_to_learn(training_kwargs)
xyz_derivative_property_to_learn = self.get_xyz_derivative_property_to_learn(training_kwargs)
if property_to_learn == None and xyz_derivative_property_to_learn == None:
property_to_learn = 'y'
if training_kwargs is None:
training_kwargs = {'property_to_learn': 'y'}
else:
training_kwargs['property_to_learn'] = 'y'
property_to_predict = self.get_property_to_predict(prediction_kwargs)
xyz_derivative_property_to_predict = self.get_xyz_derivative_property_to_predict(prediction_kwargs)
if property_to_predict == None and xyz_derivative_property_to_predict == None:
if prediction_kwargs == None: prediction_kwargs = {}
if property_to_learn != None:
property_to_predict = f'estimated_{property_to_learn}'
prediction_kwargs['property_to_predict'] = property_to_predict
if xyz_derivative_property_to_learn != None:
xyz_derivative_property_to_predict = f'estimated_{xyz_derivative_property_to_learn}'
prediction_kwargs['xyz_derivative_property_to_predict'] = xyz_derivative_property_to_predict
estimated_y=None; y=None; estimated_xyz_derivatives=None; xyz_derivatives=None
if type(cv_splits_molecular_databases) == type(None):
self.holdout_validation(subtraining_molecular_database=subtraining_molecular_database,
validation_molecular_database=validation_molecular_database,
training_kwargs=training_kwargs,
prediction_kwargs=prediction_kwargs)
if property_to_learn != None:
y = validation_molecular_database.get_properties(property_name=property_to_learn)
estimated_y = validation_molecular_database.get_properties(property_name=property_to_predict)
if xyz_derivative_property_to_learn != None:
xyz_derivatives = validation_molecular_database.get_xyz_vectorial_properties(property_name=xyz_derivative_property_to_learn)
estimated_xyz_derivatives = validation_molecular_database.get_xyz_vectorial_properties(property_name=xyz_derivative_property_to_predict)
else:
self.cross_validation(cv_splits_molecular_databases=cv_splits_molecular_databases,
training_kwargs=training_kwargs,
prediction_kwargs=prediction_kwargs)
training_molecular_database = data.molecular_database()
if calculate_CV_split_errors:
nsplits = len(cv_splits_molecular_databases)
CV_y=[None for ii in range(nsplits)]; CV_yest=[None for ii in range(nsplits)]; CV_xyz_derivatives=[None for ii in range(nsplits)]; CV_estimated_xyz_derivatives=[None for ii in range(nsplits)]
for CVsplit in cv_splits_molecular_databases:
training_molecular_database.molecules += CVsplit.molecules
if property_to_learn != None:
y = training_molecular_database.get_properties(property_name=property_to_learn)
estimated_y = training_molecular_database.get_properties(property_name=property_to_predict)
if calculate_CV_split_errors:
CV_y = [] ; CV_yest = []
for ii in range(nsplits):
CV_y.append(cv_splits_molecular_databases[ii].get_properties(property_name=property_to_learn))
CV_yest.append(cv_splits_molecular_databases[ii].get_properties(property_name=property_to_predict))
if xyz_derivative_property_to_learn != None:
xyz_derivatives = training_molecular_database.get_xyz_vectorial_properties(property_name=xyz_derivative_property_to_learn)
estimated_xyz_derivatives = training_molecular_database.get_xyz_vectorial_properties(property_name=xyz_derivative_property_to_predict)
if calculate_CV_split_errors:
CV_xyz_derivatives = [] ; CV_estimated_xyz_derivatives = []
for ii in range(nsplits):
CV_xyz_derivatives.append(cv_splits_molecular_databases[ii].get_xyz_vectorial_properties(property_name=xyz_derivative_property_to_learn))
CV_estimated_xyz_derivatives.append(cv_splits_molecular_databases[ii].get_xyz_vectorial_properties(property_name=xyz_derivative_property_to_predict))
def geomRMSEloc(estimated_y,y,estimated_xyz_derivatives,xyz_derivatives):
total_rmse = 1
if property_to_learn != None:
total_rmse *= stats.rmse(estimated_y,y)
if xyz_derivative_property_to_learn != None:
total_rmse *= stats.rmse(estimated_xyz_derivatives.reshape(estimated_xyz_derivatives.size),xyz_derivatives.reshape(xyz_derivatives.size))
if property_to_learn != None and xyz_derivative_property_to_learn != None:
total_rmse = np.sqrt(total_rmse)
return total_rmse
if validation_loss_function == None: error = geomRMSEloc(estimated_y,y,estimated_xyz_derivatives,xyz_derivatives)
else: error = validation_loss_function(**validation_loss_function_kwargs)
self.reset()
if type(cv_splits_molecular_databases) != type(None) and calculate_CV_split_errors:
CV_errors = []
for ii in range(nsplits):
if validation_loss_function == None: CVerror = geomRMSEloc(CV_yest[ii],CV_y[ii],CV_estimated_xyz_derivatives[ii],CV_xyz_derivatives[ii])
else: CVerror = validation_loss_function(**validation_loss_function_kwargs)
CV_errors.append(CVerror)
if debug:
for each in self.hyperparameters.keys():
print(f" Hyperparameter {each} = {self.hyperparameters[each].value}")
print(f" Validation loss: {error}")
if type(cv_splits_molecular_databases) != type(None) and calculate_CV_split_errors:
return error, CV_errors
else:
return error
[文档]
def optimize_hyperparameters(self,
hyperparameters=None,
training_kwargs=None,
prediction_kwargs=None,
cv_splits_molecular_databases=None,
subtraining_molecular_database=None, validation_molecular_database=None,
optimization_algorithm=None, optimization_algorithm_kwargs={},
maximum_evaluations=10000,
validation_loss_function=None, validation_loss_function_kwargs={},
debug=False):
'''
Optimizes hyperparameters by minimizing the validation loss.
By default, the validation loss is RMSE evaluated as a geometric mean of scalar and vectorial properties, e.g., energies and gradients.
Arguments:
hyperparameters (list, required): the list with strings - names of hyperparameters. Hyperparameters themselves must be in ``youmodel.hyperparameters`` defined with class instance :class:`hyperparameters <mlatom.models.hyperparameters>` consisting of :class:`hyperparameter <mlatom.models.hyperparameter>` defining the optimization space.
training_kwargs (dict, optional): the kwargs to be passed to ``yourmodel.train()`` function.
prediction_kwargs (dict, optional): the kwargs to be passed to ``yourmodel.predict()`` function.
cv_splits_molecular_databases (list, optional): the list with cross-validation splits, each element is :class:`molecular_database <mlatom.data.molecular_database>`.
calculate_CV_split_errors (bool, optional): requests to return the errors for each cross-validation split as a list in addtion to the aggregate cross-validation error.
subtraining_molecular_database (:class:`molecular_database <mlatom.data.molecular_database>`, optional): molecular database for sub-training to be passed to ``yourmodel.train()`` function.
validation_molecular_database (:class:`molecular_database <mlatom.data.molecular_database>`, optional): molecular database for validation to be passed to ``yourmodel.predict()`` function.
validation_loss_function (function, optional): user-defined validation function.
validation_loss_function_kwargs (dict, optional): kwargs for above ``validation_loss_function``.
optimization_algorithm (str, required): optimization algorithm. No default, must be specified among: 'grid' ('brute'), 'TPE', 'Nelder-Mead', 'BFGS', 'L-BFGS-B', 'Powell', 'CG', 'Newton-CG', 'TNC', 'COBYLA', 'SLSQP', 'trust-constr', 'dogleg', 'trust-krylov', 'trust-exact'.
optimization_algorithm_kwargs (dict, optional): kwargs to be passed to optimization algorithm, e.g., ``{'grid_size': 5}`` (default 9 for the grid search).
maximum_evaluations (int, optional): maximum number of optimization evaluations (default: 10000) supported by all optimizers except for grid search.
Saves the final hyperparameters in ``yourmodel.hyperparameters`` adn validation loss in ``yourmodel.validation_loss``.
'''
def validation_loss(current_hyperparameters):
for ii in range(len(current_hyperparameters)):
self.hyperparameters[hyperparameters[ii]].value = current_hyperparameters[ii]
return self.calculate_validation_loss( training_kwargs=training_kwargs,
prediction_kwargs=prediction_kwargs,
cv_splits_molecular_databases=cv_splits_molecular_databases,
subtraining_molecular_database=subtraining_molecular_database,
validation_molecular_database=validation_molecular_database,
validation_loss_function=validation_loss_function, validation_loss_function_kwargs=validation_loss_function_kwargs,
debug=debug)
import tempfile
with tempfile.TemporaryDirectory() as tmpdirname:
saved_name = self.model_file
self.model_file = f'{tmpdirname}/{saved_name}'
if optimization_algorithm.casefold() in [mm.casefold() for mm in ['Nelder-Mead', 'BFGS', 'L-BFGS-B', 'Powell', 'CG', 'Newton-CG', 'TNC', 'COBYLA', 'SLSQP', 'trust-constr', 'dogleg', 'trust-krylov', 'trust-exact']]:
import scipy.optimize
import numpy as np
initial_hyperparameters = np.array([self.hyperparameters[key].value for key in hyperparameters])
bounds = np.array([[self.hyperparameters[key].minval, self.hyperparameters[key].maxval] for key in hyperparameters])
res = scipy.optimize.minimize(validation_loss, initial_hyperparameters, method=optimization_algorithm, bounds=bounds,
options={'xatol': 1e-8, 'disp': True, 'maxiter': maximum_evaluations})
for ii in range(len(res.x)):
self.hyperparameters[hyperparameters[ii]].value = res.x[ii]
elif optimization_algorithm.casefold() in [mm.casefold() for mm in ['grid', 'brute']]:
import scipy.optimize
import numpy as np
grid_slices = []
for key in hyperparameters:
if 'grid_size' in optimization_algorithm_kwargs.keys():
grid_size = optimization_algorithm_kwargs['grid_size']
else:
grid_size=9
if self.hyperparameters[key].optimization_space == 'linear': grid_slices.append(list(np.linspace(self.hyperparameters[key].minval, self.hyperparameters[key].maxval, num=grid_size)))
if self.hyperparameters[key].optimization_space == 'log': grid_slices.append(list(np.logspace(np.log(self.hyperparameters[key].minval), np.log(self.hyperparameters[key].maxval), num=grid_size, base=np.exp(1))))
params, _ = optimize_grid(validation_loss, grid_slices)
for ii in range(len(params)):
self.hyperparameters[hyperparameters[ii]].value = params[ii]
elif optimization_algorithm.lower() == 'tpe':
import hyperopt
import numpy as np
from hyperopt.std_out_err_redirect_tqdm import DummyTqdmFile
def fileno(self):
if self.file.name == '<stdin>':
return 0
elif self.file.name == '<stdout>':
return 1
elif self.file.name == '<stderr>':
return 2
else:
return 3
DummyTqdmFile.fileno = fileno
validation_loss_wraper_for_hyperopt = lambda d: validation_loss([d[k] for k in hyperparameters])
space_mapping = {'linear': hyperopt.hp.uniform, 'log': hyperopt.hp.loguniform, 'normal': hyperopt.hp.normal, 'lognormal': hyperopt.hp.lognormal, 'discrete': hyperopt.hp.quniform, 'discretelog': hyperopt.hp.qloguniform, 'discretelognormal': hyperopt.hp.qlognormal, 'choices': hyperopt.hp.choice}
def get_space(key):
space_type = self.hyperparameters[key].optimization_space
if space_type in ['log']:
args = [np.log(self.hyperparameters[key].minval), np.log(self.hyperparameters[key].maxval)]
elif space_type in ['linear']:
args = [self.hyperparameters[key].minval, self.hyperparameters[key].maxval]
else:
raise NotImplementedError
return space_mapping[space_type](key, *args)
space = {key: get_space(key) for key in hyperparameters}
res = hyperopt.fmin(fn=validation_loss_wraper_for_hyperopt, space=space, algo=hyperopt.tpe.suggest, max_evals=maximum_evaluations, show_progressbar=True)#, points_to_evaluate=initial_hyperparameters
for k, v in res.items():
self.hyperparameters[k].value = v
self.model_file = saved_name
# Use the final hyperparameters to train the model and get the validation errors
self.validation_loss = validation_loss(np.array([self.hyperparameters[key].value for key in hyperparameters]))
def holdout_validation(self, subtraining_molecular_database=None, validation_molecular_database=None,
training_kwargs=None, prediction_kwargs=None):
if type(training_kwargs) == type(None): training_kwargs = {}
if type(prediction_kwargs) == type(None): prediction_kwargs = {}
self.train(molecular_database=subtraining_molecular_database, **training_kwargs)
self.predict(molecular_database = validation_molecular_database, **prediction_kwargs)
def cross_validation(self, cv_splits_molecular_databases=None,
training_kwargs=None, prediction_kwargs=None):
if type(training_kwargs) == type(None): training_kwargs = {}
if type(prediction_kwargs) == type(None): prediction_kwargs = {}
nsplits = len(cv_splits_molecular_databases)
for ii in range(nsplits):
subtraining_molecular_database = data.molecular_database()
for jj in range(nsplits):
if ii != jj: subtraining_molecular_database.molecules += cv_splits_molecular_databases[jj].molecules
validation_molecular_database = cv_splits_molecular_databases[ii]
self.reset()
self.train(molecular_database=subtraining_molecular_database, **training_kwargs)
self.predict(molecular_database=validation_molecular_database, **prediction_kwargs)
def get_property_to_learn(self, training_kwargs=None):
if type(training_kwargs) == type(None):
property_to_learn = None
else:
if 'property_to_learn' in training_kwargs:
property_to_learn = training_kwargs['property_to_learn']
else:
property_to_learn = None
return property_to_learn
def get_xyz_derivative_property_to_learn(self, training_kwargs=None):
if type(training_kwargs) == type(None):
xyz_derivative_property_to_learn = None
else:
if 'xyz_derivative_property_to_learn' in training_kwargs:
xyz_derivative_property_to_learn = training_kwargs['xyz_derivative_property_to_learn']
else:
xyz_derivative_property_to_learn = None
return xyz_derivative_property_to_learn
def get_property_to_predict(self, prediction_kwargs=None):
if type(prediction_kwargs) != type(None):
if 'property_to_predict' in prediction_kwargs:
property_to_predict = prediction_kwargs['property_to_predict']
else:
if 'calculate_energy' in prediction_kwargs:
property_to_predict = 'estimated_energy'
else:
property_to_predict = 'estimated_y'
else:
property_to_predict = None
return property_to_predict
def get_xyz_derivative_property_to_predict(self,prediction_kwargs=None):
if type(prediction_kwargs) != type(None):
if 'xyz_derivative_property_to_predict' in prediction_kwargs:
xyz_derivative_property_to_predict = prediction_kwargs['xyz_derivative_property_to_predict']
else:
if 'calculate_energy_gradients' in prediction_kwargs:
xyz_derivative_property_to_predict = 'estimated_energy_gradients'
else:
xyz_derivative_property_to_predict = 'estimated_xyz_derivatives_y'
else:
xyz_derivative_property_to_predict = None
return xyz_derivative_property_to_predict
def optimize_grid(func, grid):
'''
Optimizes on the given grid by finding parameters (provided by grid) leading to the minimum value of the given function.
'''
last = True
for ii in grid[:-1]:
if len(ii) != 1:
last = False
break
if last:
other_params = [jj[0] for jj in grid[:-1]]
opt_param = grid[-1][0]
min_val = func(other_params + [opt_param])
for param in grid[-1][1:]:
val = func(other_params + [param])
if val < min_val:
opt_param = param
min_val = val
return other_params + [opt_param], min_val
else:
min_val = None
for kk in range(len(grid))[:-1]:
if len(grid[kk]) != 1:
if kk == 0: other_params_left = []
else: other_params_left = [[grid[ii][0]] for ii in range(kk)]
other_params_right = grid[kk+1:]
for param in grid[kk]:
params, val = optimize_grid(func,other_params_left + [[param]] + other_params_right)
if min_val == None:
min_val = val
opt_params = params
elif val < min_val:
opt_params = params
min_val = val
break
return opt_params, min_val
class krr(ml_model):
def train(self, molecular_database=None,
property_to_learn='y',
xyz_derivative_property_to_learn = None,
save_model=True,
invert_matrix=False,
matrix_decomposition=None,
kernel_function_kwargs=None,prior=None):
xyz = np.array([mol.xyz_coordinates for mol in molecular_database.molecules]).astype(float)
yy = molecular_database.get_properties(property_name=property_to_learn)
if prior == None:
self.prior = 0.0
elif type(prior) == float or type(prior) == int:
self.prior = prior
elif prior.casefold() == 'mean'.casefold():
self.prior = np.mean(yy)
else:
stopper.stopMLatom(f'Unsupported prior: {prior}')
yy = yy - self.prior
#self.train_x = xx
self.Ntrain = len(xyz)
self.train_xyz = xyz
self.kernel_function = self.gaussian_kernel_function
self.kernel_function_kwargs = kernel_function_kwargs
self.kernel_matrix_size = 0
Natoms = len(xyz[0])
self.Natoms = Natoms
self.train_property = False
self.train_xyz_derivative_property = False
yyref = np.array([])
if property_to_learn != None:
self.train_property = True
self.kernel_matrix_size += len(yy)
yyref = np.concatenate((yyref,yy))
if xyz_derivative_property_to_learn != None:
self.train_xyz_derivative_property = True
self.kernel_matrix_size += 3*Natoms*len(yy)
yygrad = molecular_database.get_xyz_derivative_properties().reshape(3*Natoms*len(yy))
yyref = np.concatenate((yyref,yygrad))
kernel_matrix = np.zeros((self.kernel_matrix_size,self.kernel_matrix_size))
kernel_matrix = kernel_matrix + np.identity(self.kernel_matrix_size)*self.hyperparameters['lambda'].value
for ii in range(len(yy)):
value_and_derivatives = self.kernel_function(xyz[ii],xyz[ii],calculate_value=self.train_property,calculate_gradients=self.train_xyz_derivative_property,calculate_Hessian=self.train_xyz_derivative_property,**self.kernel_function_kwargs)
kernel_matrix[ii][ii] += value_and_derivatives['value']
if self.train_xyz_derivative_property:
kernel_matrix[ii,len(yy)+ii*3*Natoms:len(yy)+(ii+1)*3*Natoms] += value_and_derivatives['gradients'].reshape(3*Natoms)
kernel_matrix[len(yy)+ii*3*Natoms:len(yy)+(ii+1)*3*Natoms,len(yy)+ii*3*Natoms:len(yy)+(ii+1)*3*Natoms] += value_and_derivatives['Hessian']
for jj in range(ii+1,len(yy)):
value_and_derivatives = self.kernel_function(xyz[ii],xyz[jj],calculate_value=self.train_property,calculate_gradients=self.train_xyz_derivative_property,calculate_Hessian=self.train_xyz_derivative_property,calculate_gradients_j=self.train_xyz_derivative_property,**self.kernel_function_kwargs)
kernel_matrix[ii][jj] += value_and_derivatives['value']
kernel_matrix[jj][ii] += value_and_derivatives['value']
if self.train_xyz_derivative_property:
kernel_matrix[jj,len(yy)+ii*3*Natoms:len(yy)+(ii+1)*3*Natoms] = value_and_derivatives['gradients'].reshape(3*Natoms)
kernel_matrix[ii,len(yy)+jj*3*Natoms:len(yy)+(jj+1)*3*Natoms] = value_and_derivatives['gradients_j'].reshape(3*Natoms)
kernel_matrix[len(yy)+ii*3*Natoms:len(yy)+(ii+1)*3*Natoms,len(yy)+jj*3*Natoms:len(yy)+(jj+1)*3*Natoms] += value_and_derivatives['Hessian']
kernel_matrix[len(yy)+jj*3*Natoms:len(yy)+(jj+1)*3*Natoms,len(yy)+ii*3*Natoms:len(yy)+(ii+1)*3*Natoms] += value_and_derivatives['Hessian']
if self.train_xyz_derivative_property:
kernel_matrix[len(yy):,:len(yy)] = kernel_matrix[:len(yy),len(yy):].T
if invert_matrix:
self.alphas = np.dot(np.linalg.inv(kernel_matrix), yyref)
else:
from scipy.linalg import cho_factor, cho_solve, lu_factor, lu_solve
if matrix_decomposition==None:
try:
c, low = cho_factor(kernel_matrix, overwrite_a=True, check_finite=False)
self.alphas = cho_solve((c, low), yyref, check_finite=False)
except:
c, low = lu_factor(kernel_matrix, overwrite_a=True, check_finite=False)
self.alphas = lu_solve((c, low), yyref, check_finite=False)
elif matrix_decomposition.casefold()=='Cholesky'.casefold():
c, low = cho_factor(kernel_matrix, overwrite_a=True, check_finite=False)
self.alphas = cho_solve((c, low), yyref, check_finite=False)
elif matrix_decomposition.casefold()=='LU'.casefold():
c, low = lu_factor(kernel_matrix, overwrite_a=True, check_finite=False)
self.alphas = lu_solve((c, low), yyref, check_finite=False)
def predict(self, molecular_database=None, molecule=None,
calculate_energy=False, calculate_energy_gradients=False, calculate_hessian=False, # arguments if KREG is used as MLP ; hessian not implemented (possible with numerical differentiation)
property_to_predict = None, xyz_derivative_property_to_predict = None,
hessian_to_predict=None,):
molDB, property_to_predict, xyz_derivative_property_to_predict, hessian_to_predict = \
super().predict(molecular_database=molecular_database, molecule=molecule, calculate_energy=calculate_energy, calculate_energy_gradients=calculate_energy_gradients, calculate_hessian=calculate_hessian, property_to_predict = property_to_predict, xyz_derivative_property_to_predict = xyz_derivative_property_to_predict, hessian_to_predict = hessian_to_predict)
Natoms = len(molDB.molecules[0].atoms)
kk_size = 0
if self.train_property:
kk_size += self.Ntrain
if self.train_xyz_derivative_property:
kk_size += self.Ntrain * Natoms*3
for mol in molDB.molecules:
kk = np.zeros(kk_size)
kk_der = np.zeros((kk_size,3*Natoms))
for ii in range(self.Ntrain):
value_and_derivatives = self.kernel_function(self.train_xyz[ii],mol.xyz_coordinates,calculate_gradients=self.train_xyz_derivative_property,**self.kernel_function_kwargs)
kk[ii] = value_and_derivatives['value']
if self.train_xyz_derivative_property:
kk[self.Ntrain+ii*3*Natoms:self.Ntrain+(ii+1)*3*Natoms] = value_and_derivatives['gradients'].reshape(3*Natoms)
if xyz_derivative_property_to_predict:
value_and_derivatives = self.kernel_function(mol.xyz_coordinates,self.train_xyz[ii],calculate_gradients=bool(xyz_derivative_property_to_predict),calculate_Hessian=self.train_xyz_derivative_property,**self.kernel_function_kwargs)
kk_der[ii] = value_and_derivatives['gradients'].reshape(3*Natoms)
if self.train_xyz_derivative_property:
kk_der[self.Ntrain+ii*3*Natoms:self.Ntrain+(ii+1)*3*Natoms,:] = value_and_derivatives['Hessian'].T
gradients = np.matmul(self.alphas.reshape(1,len(self.alphas)),kk_der)[0]
mol.__dict__[property_to_predict] = np.sum(np.multiply(self.alphas, kk))+self.prior
for iatom in range(len(mol.atoms)):
mol.atoms[iatom].__dict__[xyz_derivative_property_to_predict] = gradients[3*iatom:3*iatom+3]
def gaussian_kernel_function(self,coordi,coordj,calculate_value=True,calculate_gradients=False,calculate_gradients_j=False,calculate_Hessian=False,**kwargs):
import torch
if 'Req' in kwargs:
Req = kwargs['Req']
if calculate_Hessian:
calculate_gradients = True
calculate_value = True
elif calculate_gradients:
calculate_value = True
coordi_tensor = torch.tensor(coordi,requires_grad=True)
coordj_tensor = torch.tensor(coordj,requires_grad=True)
value = None
gradients = None
gradients_j = None
Hessian = None
xi_tensor = self.RE_descriptor_tensor(coordi_tensor,Req)
xj_tensor = self.RE_descriptor_tensor(coordj_tensor,Req)
if calculate_value:
value_tensor = torch.exp(torch.sum(torch.square(xi_tensor - xj_tensor))/(-2*self.hyperparameters['sigma'].value**2))
value = value_tensor.detach().numpy()
if calculate_gradients:
gradients_tensor = torch.autograd.grad(value_tensor,coordi_tensor,create_graph=True,retain_graph=True)[0]
gradients = gradients_tensor.detach().numpy()
if calculate_gradients_j:
gradients_j_tensor = torch.autograd.grad(value_tensor,coordj_tensor,create_graph=True,retain_graph=True)[0]
gradients_j = gradients_j_tensor.detach().numpy()
if calculate_Hessian:
Hessian = []
gradients_tensor = gradients_tensor.reshape(len(gradients_tensor)*3)
for ii in range(len(gradients_tensor)):
Hessian.append(torch.autograd.grad(gradients_tensor[ii],coordj_tensor,create_graph=True,retain_graph=True)[0].detach().numpy().reshape((len(gradients_tensor))))
Hessian = np.array(Hessian)
output = {'value':value,'gradients':gradients,'gradients_j':gradients_j,'Hessian':Hessian}
return output
def RE_descriptor_tensor(self,coord_tensor,Req):
import torch
Natoms = len(coord_tensor)
icount = 0
for iatom in range(Natoms):
for jatom in range(iatom+1,Natoms):
output = Req[icount] / self.distance_tensor(coord_tensor[iatom],coord_tensor[jatom])
if icount == 0:
descriptor = output.reshape(1)
else:
descriptor = torch.cat((descriptor,output.reshape(1)))
icount += 1
return descriptor
def distance_tensor(self, atomi,atomj):
import torch
return torch.sqrt(self.distance_squared_tensor(atomi,atomj))
def distance_squared_tensor(self, atomi,atomj):
import torch
return torch.sum(torch.square(atomi-atomj))
[文档]
class hyperparameter():
'''
Class of hyperparameter object, containing data could be used in hyperparameter optimizations.
Arguments:
value (Any, optional): The value of the hyperparameter.
optimization_space (str, optional): Defines the space for hyperparameter. Currently supports ``'linear'``, and ``'log'``.
dtype (Callable, optional): A callable object that forces the data type of value. Automatically choose one if set to ``None``.
'''
def __init__(self, value: Any = None, optimization_space: str = 'linear', dtype: Union[Callable, None] = None, name: str = "", minval: Any = None, maxval: Any = None, step: Any = None, choices: Iterable[Any] = [], **kwargs):
self.name = name
self.dtype = dtype if dtype else None if value is None else type(value)
self.value = value# @Yifan
self.optimization_space = optimization_space # 'linear' or 'log'
self.minval = minval
self.maxval = maxval
self.step = step
self.choices = choices
def __setattr__(self, key, value):
if key == 'value':
value = (value if isinstance(value, self.dtype) else self._cast_dtype(value)) if self.dtype else value
if key == 'dtype':
self._set_dtype_cast_method(value)
super().__setattr__(key, value)
def __repr__(self):
return f'hyperparameter {str(self.__dict__)}'
def _set_dtype_cast_method(self, dtype):
if type(dtype) == tuple:
dtype = dtype[0]
if dtype == np.ndarray:
self._cast_dtype = np.array
else:
self._cast_dtype = dtype
[文档]
def update(self, new_hyperparameter:hyperparameter) -> None:
'''
Update hyperparameter with data in another instance.
Arguments:
new_hyperparameter (:class:`mlatom.models.hyperparamters`): Whose data are to be applied to the current instance.
'''
self.__dict__.update(new_hyperparameter.__dict__)
[文档]
def copy(self):
'''
Returns a copy of current instance.
Returns:
:class:`mlatom.models.hyperparamter`: a new instance copied from current one.
'''
return hyperparameter(**self.__dict__)
[文档]
class hyperparameters(UserDict):
'''
Class for storing hyperparameters, values are auto-converted to :class:`mlatom.models.hyperparameter` objects.
Inherit from collections.UserDict.
Initiaion:
Initiate with a dictinoary or kwargs or both.
e.g.:
.. code-block::
hyperparamters({'a': 1.0}, b=hyperparameter(value=2, minval=0, maxval=4))
'''
def __setitem__(self, key, value):
if isinstance(value, hyperparameter):
if key in self:
super().__getitem__(key).update(value)
else:
super().__setitem__(key, value)
elif key in self:
super().__getitem__(key).value = value
else:
super().__setitem__(key, hyperparameter(value=value, name=key))
def __getattr__(self, key):
if key in self:
return self[key].value
else:
return self.__dict__[key]
def __setattr__(self, key, value):
if key.startswith('__') or (key in self.__dict__) or key == 'data':
super().__setattr__(key, value)
else:
self.__setitem__(key, value)
def __getstate__(self):
return vars(self)
def __setstate__(self, state):
vars(self).update(state)
[文档]
def copy(self, keys: Union[Iterable[str], None] = None) -> hyperparameters:
'''
Returns a copy of current instance.
Arguments:
keys (Iterable[str], optional): If keys provided, only the hyperparameters selected by keys will be copied, instead of all hyperparameters.
Returns:
:class:`mlatom.models.hyperparamters`: a new instance copied from current one.
'''
if keys is None:
keys = self.keys()
return hyperparameters({key: self[key].copy() for key in keys})
[文档]
class kreg(krr, OMP_model, MKL_model):
'''
Create a KREG model object.
Arguments:
model_file (str, optional): The name of the file where the model should be dumped to or loaded from.
ml_program (str, optional): Specify which ML program to use. Avaliable options: ``'KREG_API'``, ``'MLatomF``.
equilibrium_molecule (:class:`mlatom.data.molecule` | None): Specify the equilibrium geometry to be used to generate RE descriptor. The geometry with lowest energy/value will be selected if set to ``None``.
prior (default - None): the prior can be 'mean', None (0.0), or any floating point number.
hyperparameters (Dict[str, Any] | :class:`mlatom.models.hyperparameters`, optional): Updates the hyperparameters of the model with provided.
'''
hyperparameters = hyperparameters({'lambda': hyperparameter(value=2**-35,
minval=2**-35,
maxval=1.0,
optimization_space='log',
name='lambda'),
'sigma': hyperparameter(value=1.0,
minval=2**-5,
maxval=2**9,
optimization_space='log',
name='sigma')})
def __init__(self, model_file: Union[str, None] = None, ml_program: str = 'KREG_API', equilibrium_molecule: Union[data.molecule, None] = None, prior: float = 0, nthreads: Union[int, None] = None, hyperparameters: Union[Dict[str,Any], hyperparameters]={}):
self.model_file = model_file
self.equilibrium_molecule = equilibrium_molecule
self.ml_program = ml_program
if self.ml_program.casefold() == 'KREG_API'.casefold():
from .kreg_api import KREG_API
if self.model_file != None:
if self.model_file[-4:] != '.npz':
self.model_file += '.npz'
if os.path.exists(self.model_file):
self.kreg_api = KREG_API()
self.kreg_api.load_model(self.model_file)
if self.ml_program.casefold() == 'MLatomF'.casefold():
from . import interface_MLatomF
self.interface_mlatomf = interface_MLatomF
self.hyperparameters = self.hyperparameters.copy()
self.hyperparameters.update(hyperparameters)
self.nthreads = nthreads
def parse_args(self, args):
super().parse_args(args)
for hyperparam in self.hyperparameters:
if hyperparam in args.hyperparameter_optimization['hyperparameters']:
self.parse_hyperparameter_optimization(args, hyperparam)
elif hyperparam in args.data:
if args.data[hyperparam].lower() == 'opt':
self.hyperparameters[hyperparam].dtype = str
self.hyperparameters[hyperparam].value = args.data[hyperparam]
if args.eqXYZfileIn:
self.equilibrium_molecule = data.molecule.from_xyz_file(args.eqXYZfileIn)
[文档]
def generate_model_dict(self):
model_dict = super().generate_model_dict()
model_dict['kwargs']['ml_program'] = self.ml_program
return model_dict
def get_descriptor(self, molecule=None, molecular_database=None, descriptor_name='re', equilibrium_molecule=None):
if molecular_database != None:
molDB = molecular_database
elif molecule != None:
molDB = data.molecular_database()
molDB.molecules.append(molecule)
else:
errmsg = 'Either molecule or molecular_database should be provided in input'
raise ValueError(errmsg)
if 'reference_distance_matrix' in self.__dict__:
eq_distmat = self.reference_distance_matrix
else:
if equilibrium_molecule == None:
if 'energy' in molDB.molecules[0].__dict__:
energies = molDB.get_properties(property_name='energy')
equilibrium_molecule = molDB.molecules[np.argmin(energies)]
elif 'y' in molecular_database.molecules[0].__dict__:
y = molecular_database.get_properties(property_name='y')
equilibrium_molecule = molecular_database.molecules[np.argmin(y)]
else:
errmsg = 'equilibrium molecule is not provided and no energies are found in molecular database'
raise ValueError(errmsg)
eq_distmat = equilibrium_molecule.get_internuclear_distance_matrix()
self.reference_distance_matrix = eq_distmat
natoms = len(molDB.molecules[0].atoms)
for mol in molDB.molecules:
descriptor = np.zeros(int(natoms*(natoms-1)/2))
distmat = mol.get_internuclear_distance_matrix()
ii = -1
for iatomind in range(natoms):
for jatomind in range(iatomind+1,natoms):
ii += 1
descriptor[ii] = eq_distmat[iatomind][jatomind]/distmat[iatomind][jatomind]
mol.__dict__[descriptor_name] = descriptor
mol.descriptor = descriptor
def get_equilibrium_distances(self,molecular_database=None):
if self.equilibrium_molecule == None: self.equilibrium_molecule = self.get_equilibrium_molecule(molecular_database=molecular_database)
eq_distmat = self.equilibrium_molecule.get_internuclear_distance_matrix()
Req = np.array([])
for ii in range(len(eq_distmat)):
Req = np.concatenate((Req,eq_distmat[ii][ii+1:]))
return Req
def get_equilibrium_molecule(self,molecular_database=None):
if 'energy' in molecular_database.molecules[0].__dict__:
energies = molecular_database.get_properties(property_name='energy')
equilibrium_molecule = molecular_database.molecules[np.argmin(energies)]
elif 'y' in molecular_database.molecules[0].__dict__:
y = molecular_database.get_properties(property_name='y')
equilibrium_molecule = molecular_database.molecules[np.argmin(y)]
else:
errmsg = 'equilibrium molecule is not provided and no energies are found in molecular database'
raise ValueError(errmsg)
return equilibrium_molecule
[文档]
def train(self, molecular_database=None,
property_to_learn=None,
xyz_derivative_property_to_learn = None,
save_model=True,
invert_matrix=False,
matrix_decomposition=None,
prior=None,
hyperparameters: Union[Dict[str,Any], hyperparameters] = {},):
'''
Train the KREG model with molecular database provided.
Arguments:
molecular_database (:class:`mlatom.data.molecular_database`): The database of molecules for training.
property_to_learn (str, optional): The label of property to be learned in model training.
xyz_derivative_property_to_learn (str, optional): The label of XYZ derivative property to be learned.
prior (str or float or int, optional): default zero prior. It can also be 'mean' and any user-defined number.
'''
self.hyperparameters.update(hyperparameters)
if self.ml_program.casefold() == 'MLatomF'.casefold():
mlatomfargs = ['createMLmodel'] + ['%s=%s' % (param, self.hyperparameters[param].value) for param in self.hyperparameters.keys()]
if save_model:
if self.model_file == None:
self.model_file =f'kreg_{str(uuid.uuid4())}.unf'
with tempfile.TemporaryDirectory() as tmpdirname:
molecular_database.write_file_with_xyz_coordinates(filename = f'{tmpdirname}/train.xyz')
mlatomfargs.append(f'XYZfile={tmpdirname}/train.xyz')
if property_to_learn != None:
molecular_database.write_file_with_properties(filename = f'{tmpdirname}/y.dat', property_to_write = property_to_learn)
mlatomfargs.append(f'Yfile={tmpdirname}/y.dat')
if xyz_derivative_property_to_learn != None:
molecular_database.write_file_with_xyz_derivative_properties(filename = f'{tmpdirname}/ygrad.xyz', xyz_derivative_property_to_write = xyz_derivative_property_to_learn)
mlatomfargs.append(f'YgradXYZfile={tmpdirname}/ygrad.xyz')
if prior != None:
mlatomfargs.append(f'prior={prior}')
mlatomfargs.append(f'MLmodelOut={self.model_file}')
if 'additional_mlatomf_args' in self.__dict__:
mlatomfargs += self.additional_mlatomf_args
self.interface_mlatomf.ifMLatomCls.run(mlatomfargs, shutup=True)
elif self.ml_program.casefold() == 'KREG_API'.casefold():
from .kreg_api import KREG_API
if save_model:
if self.model_file == None:
self.model_file = f'kreg_{str(uuid.uuid4())}.npz'
else:
if self.model_file[-4:] != '.npz':
self.model_file += '.npz'
self.kreg_api = KREG_API()
self.kreg_api.nthreads = self.nthreads
if self.equilibrium_molecule == None: self.equilibrium_molecule = self.get_equilibrium_molecule(molecular_database=molecular_database)
if not 'lambdaGradXYZ' in self.hyperparameters.keys(): lambdaGradXYZ = self.hyperparameters['lambda'].value
else: lambdaGradXYZ = self.hyperparameters['lambdaGradXYZ'].value
if 'prior' in self.hyperparameters.keys(): prior = self.hyperparameters['prior'].value
self.kreg_api.train(self.hyperparameters['sigma'].value,self.hyperparameters['lambda'].value,lambdaGradXYZ,molecular_database,self.equilibrium_molecule,property_to_learn=property_to_learn,xyz_derivative_property_to_learn=xyz_derivative_property_to_learn,prior=prior)
if save_model:
self.kreg_api.save_model(self.model_file)
else:
if 'reference_distance_matrix' in self.__dict__: del self.__dict__['reference_distance_matrix']
Req = self.get_equilibrium_distances(molecular_database=molecular_database)
super().train(molecular_database=molecular_database, property_to_learn=property_to_learn,
invert_matrix=invert_matrix,
matrix_decomposition=matrix_decomposition,
kernel_function_kwargs={'Req':Req},prior=prior)
[文档]
def predict(self, molecular_database=None, molecule=None,
calculate_energy=False, calculate_energy_gradients=False, calculate_hessian=False, # arguments if KREG is used as MLP ; hessian not implemented (possible with numerical differentiation)
property_to_predict = None, xyz_derivative_property_to_predict = None, hessian_to_predict = None,):
if self.ml_program.casefold() != 'MLatomF'.casefold() and self.ml_program.casefold() != 'KREG_API'.casefold():
super().predict(molecular_database=molecular_database, molecule=molecule,
calculate_energy=calculate_energy, calculate_energy_gradients=calculate_energy_gradients, property_to_predict=property_to_predict, xyz_derivative_property_to_predict = xyz_derivative_property_to_predict)
else:
molDB, property_to_predict, xyz_derivative_property_to_predict, hessian_to_predict = \
super(krr, self).predict(molecular_database=molecular_database, molecule=molecule, calculate_energy=calculate_energy, calculate_energy_gradients=calculate_energy_gradients, calculate_hessian=calculate_hessian, property_to_predict = property_to_predict, xyz_derivative_property_to_predict = xyz_derivative_property_to_predict, hessian_to_predict = hessian_to_predict)
if self.ml_program.casefold() == 'KREG_API'.casefold():
if 'kreg_api' in dir(self):
self.kreg_api.nthreads = self.nthreads
self.kreg_api.predict(molecular_database=molecular_database,molecule=molecule,property_to_predict=property_to_predict,xyz_derivative_property_to_predict=xyz_derivative_property_to_predict)
else:
stopper.stopMLatom('KREG_API model not found')
elif self.ml_program.casefold() == 'MLatomF'.casefold():
with tempfile.TemporaryDirectory() as tmpdirname:
molDB.write_file_with_xyz_coordinates(filename = f'{tmpdirname}/predict.xyz')
mlatomfargs = ['useMLmodel', 'MLmodelIn=%s' % self.model_file]
mlatomfargs.append(f'XYZfile={tmpdirname}/predict.xyz')
if property_to_predict: mlatomfargs.append(f'YestFile={tmpdirname}/yest.dat')
if xyz_derivative_property_to_predict: mlatomfargs.append(f'YgradXYZestFile={tmpdirname}/ygradest.xyz')
self.interface_mlatomf.ifMLatomCls.run(mlatomfargs, shutup=True)
if not os.path.exists(f'{tmpdirname}/yest.dat'):
import time
time.sleep(0.0000001) # Sometimes program needs more time to write file yest.dat / P.O.D., 2023-04-18
os.system(f'cp -r {tmpdirname} .')
if property_to_predict != None: molDB.add_scalar_properties_from_file(filename = f'{tmpdirname}/yest.dat', property_name = property_to_predict)
if xyz_derivative_property_to_predict != None: molDB.add_xyz_derivative_properties_from_file(filename = f'{tmpdirname}/ygradest.xyz', xyz_derivative_property = xyz_derivative_property_to_predict)
[文档]
def ani(**kwargs):
'''
Returns an ANI model object (see :class:`mlatom.interfaces.torchani_interface.ani`).
'''
from .interfaces.torchani_interface import ani
return ani(**kwargs)
def msani(**kwargs):
'''
Returns an MS-ANI model object (see :class:`mlatom.interfaces.torchani_interface.msani`).
'''
from .interfaces.torchani_interface import msani
return msani(**kwargs)
[文档]
def dpmd(**kwargs):
'''
Returns a DPMD model object (see :class:`mlatom.interfaces.dpmd_interface.dpmd`).
'''
from .interfaces.dpmd_interface import dpmd
return dpmd(**kwargs)
[文档]
def gap(**kwargs):
'''
Returns a GAP model object (see :class:`mlatom.interfaces.gap_interface.gap`).
'''
from .interfaces.gap_interface import gap
return gap(**kwargs)
[文档]
def physnet(**kwargs):
'''
Returns a PhysNet model object (see :class:`mlatom.interfaces.physnet_interface.physnet`).
'''
from .interfaces.physnet_interface import physnet
return physnet(**kwargs)
[文档]
def sgdml(**kwargs):
'''
Returns an sGDML model object (see :class:`mlatom.interfaces.sgdml_interface.sgdml`).
'''
from .interfaces.sgdml_interface import sgdml
return sgdml(**kwargs)
[文档]
def mace(**kwargs):
'''
Returns an MACE model object (see :class:`mlatom.interfaces.mace_interface.mace`).
'''
from .interfaces.mace_interface import mace
return mace(**kwargs)
[文档]
class model_tree_node(model):
'''
Create a model tree node.
Arguments:
name (str): The name assign to the object.
parent: The parent of the model node.
children: The children of this model tree node.
operator: Specify the operation to be made when making predictions.
'''
def __init__(self, name=None, parent=None, children=None, operator=None, model=None):
self.name = name
self.parent = parent
self.children = children
if self.parent != None:
if self.parent.children == None: self.parent.children = []
if not self in self.parent.children:
self.parent.children.append(self)
if self.children != None:
for child in self.children:
child.parent=self
self.operator = operator
self.model = model
def set_num_threads(self, nthreads=0):
super().set_num_threads(nthreads)
if self.nthreads:
if self.children != None:
for child in self.children:
child.set_num_threads(self.nthreads)
else:
self.model.set_num_threads(self.nthreads)
[文档]
def predict(self, **kwargs):
molDB = super().predict(**kwargs)
if len(molDB) == 0: return
if 'calculate_energy' in kwargs: calculate_energy = kwargs['calculate_energy']
else: calculate_energy = True
if 'calculate_energy_gradients' in kwargs: calculate_energy_gradients = kwargs['calculate_energy_gradients']
else: calculate_energy_gradients = False
if 'calculate_hessian' in kwargs: calculate_hessian = kwargs['calculate_hessian']
else: calculate_hessian = False
if 'nstates' in kwargs: nstates = kwargs['nstates']
else: nstates = 1
if 'current_state' in kwargs: current_state = kwargs['current_state']
else: current_state = 0
properties = [] ; atomic_properties = []
if calculate_energy: properties.append('energy')
if calculate_energy_gradients: atomic_properties.append('energy_gradients')
if calculate_hessian: properties.append('hessian')
for mol in molDB.molecules:
if nstates:
mol_copy = mol.copy()
mol_copy.electronic_states = []
if nstates >1:
for _ in range(nstates - len(mol.electronic_states)):
mol.electronic_states.append(mol_copy.copy())
for mol_el_st in mol.electronic_states:
if not self.name in mol_el_st.__dict__:
parent = None
if self.parent != None:
if self.parent.name in mol_el_st.__dict__:
parent = mol_el_st.__dict__[self.parent.name]
children = None
if self.children != None:
for child in self.children:
if child.name in mol_el_st.__dict__:
if children == None: children = []
children.append(mol_el_st.__dict__[child.name])
mol_el_st.__dict__[self.name] = data.properties_tree_node(name=self.name, parent=parent, children=children)
if not self.name in mol.__dict__:
parent = None
if self.parent != None:
if self.parent.name in mol.__dict__:
parent = mol.__dict__[self.parent.name]
children = None
if self.children != None:
for child in self.children:
if child.name in mol.__dict__:
if children == None: children = []
children.append(mol.__dict__[child.name])
mol.__dict__[self.name] = data.properties_tree_node(name=self.name, parent=parent, children=children)
if self.children == None and self.operator == 'predict':
self.model.predict(**kwargs)
for mol in molDB.molecules:
if not mol.electronic_states:
self.get_properties_from_molecule(mol, properties, atomic_properties)
for mol_el_st in mol.electronic_states:
# mol_el_st.__dict__[self.name] = data.properties_tree_node(name=self.name, parent=parent, children=children)
self.get_properties_from_molecule(mol_el_st, properties, atomic_properties)
else:
for child in self.children:
child.predict(**kwargs)
if 'weight' in child.__dict__.keys():
mol.__dict__[child.name].__dict__['weight'] = child.weight
if self.operator == 'sum':
for mol in molDB.molecules:
if not mol.electronic_states:
mol.__dict__[self.name].sum(properties+atomic_properties)
for mol_el_st in mol.electronic_states:
mol_el_st.__dict__[self.name].sum(properties+atomic_properties)
if self.operator == 'weighted_sum':
for mol in molDB.molecules:
if not mol.electronic_states:
mol.__dict__[self.name].weighted_sum(properties+atomic_properties)
for mol_el_st in mol.electronic_states:
mol_el_st.__dict__[self.name].weighted_sum(properties+atomic_properties)
if self.operator == 'average':
for mol in molDB.molecules:
if not mol.electronic_states:
mol.__dict__[self.name].average(properties+atomic_properties)
for mol_el_st in mol.electronic_states:
mol_el_st.__dict__[self.name].average(properties+atomic_properties)
if self.parent == None:
self.update_molecular_properties(molecular_database=molDB, properties=properties, atomic_properties=atomic_properties, current_state=current_state)
def get_properties_from_molecule(self, molecule, properties=[], atomic_properties=[]):
property_values = molecule.__dict__[self.name].__dict__
for property_name in properties:
if property_name in molecule.__dict__: property_values[property_name] = molecule.__dict__.pop(property_name)
for property_name in atomic_properties:
property_values[property_name] = []
for atom in molecule.atoms:
property_values[property_name].append(atom.__dict__.pop(property_name))
property_values[property_name] = np.array(property_values[property_name]).astype(float)
def update_molecular_properties(self, molecular_database=None, molecule=None, properties=[], atomic_properties=[], current_state=0):
molDB = molecular_database
if molecule != None:
molDB = data.molecular_database()
molDB.molecules.append(molecule)
for mol in molDB.molecules:
for property_name in properties:
for mol_el_st in mol.electronic_states:
mol_el_st.__dict__[property_name] = mol_el_st.__dict__[self.name].__dict__[property_name]
if not mol.electronic_states:
mol.__dict__[property_name] = mol.__dict__[self.name].__dict__[property_name]
else:
mol.__dict__[property_name] = mol.electronic_states[current_state].__dict__[property_name]
for property_name in atomic_properties:
for mol_el_st in mol.electronic_states:
for iatom in range(len(mol_el_st.atoms)):
mol_el_st.atoms[iatom].__dict__[property_name] = mol_el_st.__dict__[self.name].__dict__[property_name][iatom]
if not mol.electronic_states:
for iatom in range(len(mol.atoms)):
mol.atoms[iatom].__dict__[property_name] = mol.__dict__[self.name].__dict__[property_name][iatom]
else:
for iatom in range(len(mol.atoms)):
mol.atoms[iatom].__dict__[property_name] = mol.electronic_states[current_state].atoms[iatom].__dict__[property_name]
[文档]
def dump(self, filename=None, format='json'):
'''
Dump the object to a file.
'''
model_dict = {
'type': 'model_tree_node',
'name': self.name,
'children': [child.dump(format='dict') for child in self.children] if self.children else None,
'operator': self.operator,
'model': self.model.dump(format='dict') if self.model else None,
'nthreads': self.nthreads,
'weight': self.weight if 'weight' in self.__dict__ else None
}
if format == 'json':
import json
with open(filename, 'w') as f:
json.dump(model_dict, f, indent=4)
if format == 'dict':
return model_dict
def load(filename, format=None):
'''
Load a saved model object.
'''
if filename[-5:] == '.json' or format == 'json':
try:
return load_json(filename)
except:
pass
if filename[-5:] == '.npz' or format == 'npz':
try:
return load_npz(filename)
except:
pass
else:
return load_pickle(filename)
def load_json(filename):
import json
with open(filename) as f:
model_dict = json.load(f)
return load_dict(model_dict)
def load_npz(filename):
pass
def load_pickle(filename):
import pickle
with open(filename, 'rb') as file:
return pickle.load(file)
def load_dict(model_dict):
type = model_dict.pop('type')
nthreads = model_dict.pop('nthreads') if 'nthreads' in model_dict else 0
if type == 'method':
kwargs = {}
if 'kwargs' in model_dict:
kwargs = model_dict.pop('kwargs')
model = methods(**model_dict, **kwargs)
if type == 'ml_model':
model = globals()[model_dict['ml_model_type'].split('.')[-1]](**model_dict['kwargs'])
if type == 'model_tree_node':
children = [load_dict(child_dict) for child_dict in model_dict['children']] if model_dict['children'] else None
name = model_dict['name']
operator = model_dict['operator']
model = load_dict(model_dict['model']) if model_dict['model'] else None
weight = model_dict['weight'] if 'weight' in model_dict else None
model = model_tree_node(name=name, children=children, operator=operator, model=model)
if weight:
model.weight = weight
model.set_num_threads(nthreads)
return model
if __name__ == '__main__':
pass