#!/usr/bin/env python3
'''
.. code-block::
!---------------------------------------------------------------------------!
! model_cls: Module with generic classes for models !
! Implementations by: Pavlo O. Dral, Fuchun Ge, Yi-Fan Hou, Yuxinxin Chen, !
! Peikun Zheng !
!---------------------------------------------------------------------------!
'''
from __future__ import annotations
from typing import Any, Union, Iterable, Callable
import os, sys
import numpy as np
from collections import UserDict
from . import data, stats
from .decorators import doc_inherit
# [docs]
class model():
    '''
    Parent (super) class for models to enable useful features such as logging during geometry optimizations.
    '''
    # Number of threads requested for the model. While it is falsy (default 0),
    # the ``set_num_threads`` implementations in this module leave the backend settings untouched.
    nthreads = 0

    def set_num_threads(self, nthreads=0):
        '''
        Store the requested number of threads; subclasses extend this to configure their backend.

        Arguments:
            nthreads (int, optional): number of threads. A falsy value (default ``0``) keeps the current setting.
        '''
        if nthreads:
            self.nthreads = nthreads

    def config_multiprocessing(self):
        '''
        Hook for code that needs to be executed before running the model in parallel.
        The base implementation does nothing.
        '''
        pass

    def parse_args(self, args):
        # Hook for command-line arguments parsing; the base implementation does nothing.
        pass

    def _predict_geomopt(self,
                         return_string=False,
                         dump_trajectory_interval=None,
                         filename=None,
                         format='json',
                         print_properties=None,
                         molecule: data.molecule = None,
                         **kwargs):
        '''
        Run :meth:`predict` for ``molecule`` and, if requested, append the result to a trajectory file.

        When ``dump_trajectory_interval`` is not ``None``, the trajectory stored in ``filename`` is loaded,
        extended by one step holding ``molecule`` and dumped back. The geometries of all steps are also
        written to ``<basename of filename without extension>.xyz`` (a relative path).

        Arguments:
            return_string (bool, optional): return the iteration report instead of printing it.
            dump_trajectory_interval (optional): enables trajectory logging when not ``None``; the value itself is not used here.
            filename (str, optional): trajectory file to load and dump.
            format (str, optional): format passed to the trajectory ``load`` and ``dump`` calls.
            print_properties (optional): ``'all'`` or a list (passed to ``molecule.info`` as ``properties``); any other value disables the report.
            molecule (:class:`mlatom.data.molecule`, optional): molecule to predict for and to store as the new step.
            **kwargs: forwarded to :meth:`predict`.

        Returns:
            str or None: the report when ``return_string`` is true and both logging and reporting were requested, otherwise ``None``.
        '''
        self.predict(molecule=molecule, **kwargs)

        logging_requested = dump_trajectory_interval is not None
        report_requested = print_properties == 'all' or isinstance(print_properties, list)
        report = None

        if logging_requested:
            opttraj = data.molecular_trajectory()
            opttraj.load(filename=filename, format=format)
            nsteps = len(opttraj.steps)
            if report_requested:
                rule = '-' * 78
                report_lines = [
                    ' %s ' % rule,
                    f' Iteration {nsteps+1}',
                    ' %s \n' % rule,
                    molecule.info(properties=print_properties, return_string=True),
                ]
                report = '\n'.join(report_lines) + '\n'
                if not return_string:
                    print(report)
            opttraj.steps.append(data.molecular_trajectory_step(step=nsteps, molecule=molecule))
            opttraj.dump(filename=filename, format=format)
            moldb = data.molecular_database()
            moldb.molecules = [each.molecule for each in opttraj.steps]
            xyzfilename = os.path.splitext(os.path.basename(filename))[0]
            moldb.write_file_with_xyz_coordinates(f'{xyzfilename}.xyz')

        if return_string and logging_requested and report_requested:
            return report

    def predict(
        self,
        molecular_database: data.molecular_database = None,
        molecule: data.molecule = None,
        calculate_energy: bool = False,
        calculate_energy_gradients: bool = False,
        calculate_hessian: bool = False,
        **kwargs,
    ):
        '''
        Make predictions for molecular geometries with the model.
        Arguments:
        molecular_database (:class:`mlatom.data.molecular_database`, optional): A database contains the molecules whose properties need to be predicted by the model.
        molecule (:class:`mlatom.data.molecule`, optional): A molecule object whose property needs to be predicted by the model.
        calculate_energy (bool, optional): Use the model to calculate energy.
        calculate_energy_gradients (bool, optional): Use the model to calculate energy gradients.
        calculate_hessian (bool, optional): Use the model to calculate energy hessian.
        '''
        # for universal control of predicting behavior
        self.set_num_threads()
        # The base implementation only normalizes the input: it returns the database the
        # prediction should run on, wrapping a single molecule into a new database.
        if molecular_database is not None:
            return molecular_database
        if molecule is not None:
            return data.molecular_database([molecule])
        errmsg = 'Either molecule or molecular_database should be provided in input'
        raise ValueError(errmsg)

    def dump(self, filename=None, format='json'):
        '''
        Describe the model object as a dictionary and either write it to a json file or return it.

        The dictionary holds the class name, the defining module (name and file path), every instance
        attribute whose type is exactly ``str`` or ``dict``, and ``nthreads``.

        Arguments:
            filename (str, optional): output file, used when ``format`` is ``'json'``.
            format (str, optional): ``'json'`` writes the file, ``'dict'`` returns the dictionary.

        Returns:
            dict or None: the dictionary for ``format='dict'``, otherwise ``None``.
        '''
        modelname = self.__class__.__name__
        modulename = self.__module__
        modulepath = sys.modules[modulename].__spec__.origin
        model_dict = {
            'type': modelname,
            'module': {
                'path': modulepath,
                'name': modulename,
            },
        }
        for key, value in self.__dict__.items():
            # Exact type match on purpose: subclasses of str/dict are not copied.
            if type(value) in (str, dict):
                model_dict[key] = value
        model_dict['nthreads'] = self.nthreads

        if format == 'json':
            import json
            with open(filename, 'w') as fjson:
                json.dump(model_dict, fjson, indent=4)
        if format == 'dict':
            return model_dict

    def _call_impl(self, *args, **kwargs):
        # Calling the model object is a shortcut for predict().
        return self.predict(*args, **kwargs)

    __call__ : Callable[..., Any] = _call_impl
class torch_model(model):
    # Base class for models that rely on PyTorch.

    def set_num_threads(self, nthreads=0):
        # Record the request first, then apply it to PyTorch only when a thread count is set.
        super().set_num_threads(nthreads)
        thread_count = self.nthreads
        if not thread_count:
            return
        import torch
        torch.set_num_threads(thread_count)

    def config_multiprocessing(self):
        # Before running in parallel, PyTorch is limited to a single thread.
        super().config_multiprocessing()
        import torch
        torch.set_num_threads(1)
class torchani_model(torch_model):
    # Base class for models that rely on TorchANI.

    def config_multiprocessing(self):
        # No TorchANI-specific setup: reuse the PyTorch configuration and pass its result through.
        outcome = super().config_multiprocessing()
        return outcome
class tensorflow_model(model):
    # Base class for models that rely on TensorFlow.

    def set_num_threads(self, nthreads=0):
        # Record the request, then export it through the environment when a thread count is set.
        super().set_num_threads(nthreads)
        thread_count = self.nthreads
        if not thread_count:
            return
        os.environ["TF_INTRA_OP_PARALLELISM_THREADS"] = str(thread_count)
class MKL_model(model):
    # Base class for models whose threading is controlled through MKL_NUM_THREADS.

    def set_num_threads(self, nthreads=0):
        # Record the request, then export it through the environment when a thread count is set.
        super().set_num_threads(nthreads)
        thread_count = self.nthreads
        if not thread_count:
            return
        os.environ["MKL_NUM_THREADS"] = str(thread_count)
class OMP_model(model):
    # Base class for models whose threading is controlled through OMP_NUM_THREADS.

    def set_num_threads(self, nthreads=0):
        # Record the request, then export it through the environment when a thread count is set.
        super().set_num_threads(nthreads)
        thread_count = self.nthreads
        if not thread_count:
            return
        os.environ["OMP_NUM_THREADS"] = str(thread_count)
class method_model(model):
    # Base class for models that wrap named methods, optionally provided by an external program.

    @classmethod
    def is_method_supported(cls, method):
        '''
        Check case-insensitively whether ``method`` is listed in ``cls.supported_methods``.

        Returns ``None`` when the class itself (not a parent) does not define ``supported_methods``.
        '''
        if 'supported_methods' not in cls.__dict__:
            return None
        wanted = method.casefold()
        return any(wanted == known.casefold() for known in cls.supported_methods)

    @classmethod
    def is_program_found(cls):
        '''
        Tell whether the environment variable named by ``cls.bin_env_name`` is set.

        Returns ``None`` when the class itself (not a parent) does not define ``bin_env_name``.
        '''
        if 'bin_env_name' not in cls.__dict__:
            return None
        return cls.get_bin_env_var() is not None

    @classmethod
    def get_bin_env_var(cls):
        # Value of the environment variable named by ``cls.bin_env_name``, or None if it is not set.
        return os.environ.get(cls.bin_env_name)

    @classmethod
    def raise_unsupported_method_error(cls, method):
        # Single place for the error raised when an unsupported method is requested.
        message = f'The method "{method}" is not supported by this class. You might have misspelled method, please check the class documentation.'
        raise ValueError(message)
# Parent model class
# [docs]
class ml_model(model):
'''
Useful as a superclass for the ML models that need to be trained.
'''
# [docs]
def train(
    self,
    molecular_database: data.molecular_database,
    property_to_learn: Union[str, None] = 'y',
    xyz_derivative_property_to_learn: Union[str, None] = None,
) -> None:
    '''
    Train the model on the molecular database provided.

    This base implementation only applies the thread settings of the model;
    the arguments are not used here.

    Arguments:
        molecular_database (:class:`mlatom.data.molecular_database`): The database of molecules for training.
        property_to_learn (str, optional): The label of property to be learned in model training.
        xyz_derivative_property_to_learn (str, optional): The label of XYZ derivative property to be learned.
    '''
    self.set_num_threads()
# [docs]
@doc_inherit
def predict(
    self,
    molecular_database: data.molecular_database = None,
    molecule: data.molecule = None,
    calculate_energy: bool = False,
    property_to_predict: Union[str, None] = 'estimated_y',
    calculate_energy_gradients: bool = False,
    xyz_derivative_property_to_predict: Union[str, None] = 'estimated_xyz_derivatives_y',
    calculate_hessian: bool = False,
    hessian_to_predict: Union[str, None] = 'estimated_hessian_y',
) -> tuple:
    '''
    property_to_predict (str, optional): The label name under which the predicted properties are saved.
    xyz_derivative_property_to_predict (str, optional): The label name under which the predicted XYZ derivatives are saved.
    hessian_to_predict (str, optional): The label name under which the predicted Hessians are saved.
    '''
    # The docstring above only lists the additional arguments because the method is wrapped by doc_inherit.
    # Returns the tuple (molecular_database, property_to_predict, xyz_derivative_property_to_predict, hessian_to_predict).
    database = super().predict(molecular_database=molecular_database, molecule=molecule)
    # Each calculate_* flag overrides the corresponding label with a fixed name.
    energy_label = 'energy' if calculate_energy else property_to_predict
    gradients_label = 'energy_gradients' if calculate_energy_gradients else xyz_derivative_property_to_predict
    hessian_label = 'hessian' if calculate_hessian else hessian_to_predict
    return database, energy_label, gradients_label, hessian_label
# [docs]
def generate_model_dict(self):
    '''
    Build the dictionary describing this model, as used for dumping in json format.

    The dictionary holds the qualified class name, the absolute path of the model file
    and the number of threads.
    '''
    # str(type(self)) looks like "<class 'package.module.ClassName'>"; keep the quoted part.
    class_path = str(type(self)).split("'")[1]
    absolute_model_file = os.path.abspath(self.model_file)
    # NOTE: hyperparameters are currently not included in the dictionary.
    return {
        'type': 'ml_model',
        'ml_model_type': class_path,
        'kwargs': {'model_file': absolute_model_file},
        'nthreads': self.nthreads,
    }
# [docs]
def reset(self):
    '''
    Resets model (deletes the ML model file from the hard disk).

    Does nothing when no model file name is set or when the file does not exist.
    '''
    model_file = getattr(self, 'model_file', None)
    if not model_file:
        # No file name configured yet, so there is nothing to delete.
        return
    try:
        os.remove(model_file)
    except FileNotFoundError:
        # Already gone; removing directly avoids the race of checking existence first.
        pass
# [docs]
def dump(self, filename=None, format='json'):
    '''
    Dumps model class object information in a json file (not to be confused with saving the model itself, i.e., its parameters!).

    If no model file is set, ``self.save()`` is called first.

    Arguments:
        filename (str, optional): output file, used when ``format`` is ``'json'``.
        format (str, optional): ``'json'`` writes the file, ``'dict'`` returns the dictionary instead.

    Returns:
        dict or None: the model dictionary for ``format='dict'``, otherwise ``None``.
    '''
    if not self.model_file:
        self.save()
    description = self.generate_model_dict()

    if format == 'dict':
        return description
    if format == 'json':
        import json
        with open(filename, 'w') as stream:
            json.dump(description, stream, indent=4)
def parse_args(self, args):
    # Command-line argument parsing hook; this implementation only delegates to the parent class.
    super().parse_args(args)
def parse_hyperparameter_optimization(self, args, arg_key):
    '''
    Read the search space of one hyperparameter from the parsed arguments.

    Only acts when the optimization algorithm in ``args.hyperparameter_optimization`` is ``'tpe'``.
    The string stored in ``args._hyperopt_str_dict[arg_key]`` is split at ``'('`` and ``','``
    (e.g. ``something.loguniform(lb,ub)``), and the space type and bounds of
    ``self.hyperparameters[arg_key]`` are updated. For the ``loguniform`` space the bounds are
    stored as ``2**lb`` and ``2**ub``.

    Arguments:
        args: parsed arguments object providing ``hyperparameter_optimization`` and ``_hyperopt_str_dict``.
        arg_key (str): name of the hyperparameter.
    '''
    if args.hyperparameter_optimization['optimization_algorithm'] != 'tpe':
        return

    space_by_distribution = {
        'loguniform': 'log',
        'uniform': 'linear',
    }
    spec = args._hyperopt_str_dict[arg_key]
    pieces = spec.split('(')
    space = space_by_distribution[pieces[0].split('.')[-1]]
    # Drop the last character (the closing parenthesis) before splitting the two bounds.
    bound_strings = pieces[1][:-1].split(',')
    lower = float(bound_strings[0])
    upper = float(bound_strings[1])

    self.hyperparameters[arg_key].optimization_space = space
    if space == 'log':
        lower, upper = 2**lower, 2**upper
    self.hyperparameters[arg_key].minval = lower
    self.hyperparameters[arg_key].maxval = upper
# [docs]
def calculate_validation_loss(self,
                              training_kwargs=None,
                              prediction_kwargs=None,
                              cv_splits_molecular_databases=None, calculate_CV_split_errors=False,
                              subtraining_molecular_database=None, validation_molecular_database=None,
                              validation_loss_function=None, validation_loss_function_kwargs=None,
                              debug=False):
    '''
    Returns the validation loss for the given hyperparameters.

    By default, the validation loss is RMSE evaluated as a geometric mean of scalar and vectorial properties, e.g., energies and gradients.
    The model is reset (``self.reset()``) after the loss has been evaluated.

    Arguments:
        training_kwargs (dict, optional): the kwargs to be passed to ``yourmodel.train()`` function. The dictionary passed in is not modified.
        prediction_kwargs (dict, optional): the kwargs to be passed to ``yourmodel.predict()`` function.
        cv_splits_molecular_databases (list, optional): the list with cross-validation splits, each element is :class:`molecular_database <mlatom.data.molecular_database>`.
        calculate_CV_split_errors (bool, optional): requests to return the errors for each cross-validation split as a list in addition to the aggregate cross-validation error.
        subtraining_molecular_database (:class:`molecular_database <mlatom.data.molecular_database>`, optional): molecular database for sub-training to be passed to ``yourmodel.train()`` function.
        validation_molecular_database (:class:`molecular_database <mlatom.data.molecular_database>`, optional): molecular database for validation to be passed to ``yourmodel.predict()`` function.
        validation_loss_function (function, optional): user-defined validation function.
        validation_loss_function_kwargs (dict, optional): kwargs for above ``validation_loss_function``.
        debug (bool, optional): print the current hyperparameters and the validation loss.

    Returns:
        The validation loss, or the tuple ``(loss, CV_errors)`` when cross-validation splits are given and ``calculate_CV_split_errors`` is set.
    '''
    if validation_loss_function_kwargs is None:
        validation_loss_function_kwargs = {}

    property_to_learn = self.get_property_to_learn(training_kwargs)
    xyz_derivative_property_to_learn = self.get_xyz_derivative_property_to_learn(training_kwargs)
    if property_to_learn is None and xyz_derivative_property_to_learn is None:
        # Nothing requested explicitly: learn the default property 'y'.
        # Work on a copy so that the caller's dictionary is left untouched.
        property_to_learn = 'y'
        training_kwargs = {} if training_kwargs is None else dict(training_kwargs)
        training_kwargs['property_to_learn'] = 'y'

    property_to_predict = self.get_property_to_predict(prediction_kwargs)
    xyz_derivative_property_to_predict = self.get_xyz_derivative_property_to_predict(prediction_kwargs)
    if property_to_predict is None and xyz_derivative_property_to_predict is None:
        # No prediction labels given: derive them from the learned labels.
        prediction_kwargs = {} if prediction_kwargs is None else dict(prediction_kwargs)
        if property_to_learn is not None:
            property_to_predict = f'estimated_{property_to_learn}'
            prediction_kwargs['property_to_predict'] = property_to_predict
        if xyz_derivative_property_to_learn is not None:
            xyz_derivative_property_to_predict = f'estimated_{xyz_derivative_property_to_learn}'
            prediction_kwargs['xyz_derivative_property_to_predict'] = xyz_derivative_property_to_predict

    use_cross_validation = cv_splits_molecular_databases is not None
    want_split_errors = use_cross_validation and calculate_CV_split_errors

    estimated_y = None
    y = None
    estimated_xyz_derivatives = None
    xyz_derivatives = None

    if not use_cross_validation:
        self.holdout_validation(subtraining_molecular_database=subtraining_molecular_database,
                                validation_molecular_database=validation_molecular_database,
                                training_kwargs=training_kwargs,
                                prediction_kwargs=prediction_kwargs)
        if property_to_learn is not None:
            y = validation_molecular_database.get_properties(property_name=property_to_learn)
            estimated_y = validation_molecular_database.get_properties(property_name=property_to_predict)
        if xyz_derivative_property_to_learn is not None:
            xyz_derivatives = validation_molecular_database.get_xyz_vectorial_properties(property_name=xyz_derivative_property_to_learn)
            estimated_xyz_derivatives = validation_molecular_database.get_xyz_vectorial_properties(property_name=xyz_derivative_property_to_predict)
    else:
        self.cross_validation(cv_splits_molecular_databases=cv_splits_molecular_databases,
                              training_kwargs=training_kwargs,
                              prediction_kwargs=prediction_kwargs)
        # The aggregate error is evaluated on the union of all splits.
        training_molecular_database = data.molecular_database()
        for CVsplit in cv_splits_molecular_databases:
            training_molecular_database.molecules += CVsplit.molecules
        if want_split_errors:
            nsplits = len(cv_splits_molecular_databases)
            CV_y = [None] * nsplits
            CV_yest = [None] * nsplits
            CV_xyz_derivatives = [None] * nsplits
            CV_estimated_xyz_derivatives = [None] * nsplits
        if property_to_learn is not None:
            y = training_molecular_database.get_properties(property_name=property_to_learn)
            estimated_y = training_molecular_database.get_properties(property_name=property_to_predict)
            if want_split_errors:
                CV_y = []
                CV_yest = []
                for CVsplit in cv_splits_molecular_databases:
                    CV_y.append(CVsplit.get_properties(property_name=property_to_learn))
                    CV_yest.append(CVsplit.get_properties(property_name=property_to_predict))
        if xyz_derivative_property_to_learn is not None:
            xyz_derivatives = training_molecular_database.get_xyz_vectorial_properties(property_name=xyz_derivative_property_to_learn)
            estimated_xyz_derivatives = training_molecular_database.get_xyz_vectorial_properties(property_name=xyz_derivative_property_to_predict)
            if want_split_errors:
                CV_xyz_derivatives = []
                CV_estimated_xyz_derivatives = []
                for CVsplit in cv_splits_molecular_databases:
                    CV_xyz_derivatives.append(CVsplit.get_xyz_vectorial_properties(property_name=xyz_derivative_property_to_learn))
                    CV_estimated_xyz_derivatives.append(CVsplit.get_xyz_vectorial_properties(property_name=xyz_derivative_property_to_predict))

    def geomRMSEloc(estimated_y, y, estimated_xyz_derivatives, xyz_derivatives):
        # Product of the RMSEs that were requested; the square root is taken only when
        # both the scalar and the vectorial property contribute (geometric mean).
        total_rmse = 1
        if property_to_learn is not None:
            total_rmse *= stats.rmse(estimated_y, y)
        if xyz_derivative_property_to_learn is not None:
            total_rmse *= stats.rmse(estimated_xyz_derivatives.reshape(estimated_xyz_derivatives.size),
                                     xyz_derivatives.reshape(xyz_derivatives.size))
        if property_to_learn is not None and xyz_derivative_property_to_learn is not None:
            total_rmse = np.sqrt(total_rmse)
        return total_rmse

    if validation_loss_function is None:
        error = geomRMSEloc(estimated_y, y, estimated_xyz_derivatives, xyz_derivatives)
    else:
        error = validation_loss_function(**validation_loss_function_kwargs)
    self.reset()

    if want_split_errors:
        CV_errors = []
        for ii in range(nsplits):
            if validation_loss_function is None:
                CVerror = geomRMSEloc(CV_yest[ii], CV_y[ii], CV_estimated_xyz_derivatives[ii], CV_xyz_derivatives[ii])
            else:
                # NOTE(review): the user-defined function gets the same kwargs for every split,
                # so it cannot tell the splits apart unless it does so on its own - confirm this is intended.
                CVerror = validation_loss_function(**validation_loss_function_kwargs)
            CV_errors.append(CVerror)

    if debug:
        for each in self.hyperparameters.keys():
            print(f" Hyperparameter {each} = {self.hyperparameters[each].value}")
        print(f" Validation loss: {error}")

    if want_split_errors:
        return error, CV_errors
    return error
# [docs]
def optimize_hyperparameters(self,
                             hyperparameters=None,
                             training_kwargs=None,
                             prediction_kwargs=None,
                             cv_splits_molecular_databases=None,
                             subtraining_molecular_database=None, validation_molecular_database=None,
                             optimization_algorithm=None, optimization_algorithm_kwargs=None,
                             maximum_evaluations=10000,
                             validation_loss_function=None, validation_loss_function_kwargs=None,
                             debug=False):
    '''
    Optimizes hyperparameters by minimizing the validation loss.

    By default, the validation loss is RMSE evaluated as a geometric mean of scalar and vectorial properties, e.g., energies and gradients.
    During the search the model file is redirected into a temporary directory; the original
    ``model_file`` is restored afterwards, also when the search fails.

    Arguments:
        hyperparameters (list, required): the list with strings - names of hyperparameters. Hyperparameters themselves must be in ``yourmodel.hyperparameters`` defined with class instance :class:`hyperparameters <mlatom.models.hyperparameters>` consisting of :class:`hyperparameter <mlatom.models.hyperparameter>` defining the optimization space.
        training_kwargs (dict, optional): the kwargs to be passed to ``yourmodel.train()`` function.
        prediction_kwargs (dict, optional): the kwargs to be passed to ``yourmodel.predict()`` function.
        cv_splits_molecular_databases (list, optional): the list with cross-validation splits, each element is :class:`molecular_database <mlatom.data.molecular_database>`.
        subtraining_molecular_database (:class:`molecular_database <mlatom.data.molecular_database>`, optional): molecular database for sub-training to be passed to ``yourmodel.train()`` function.
        validation_molecular_database (:class:`molecular_database <mlatom.data.molecular_database>`, optional): molecular database for validation to be passed to ``yourmodel.predict()`` function.
        validation_loss_function (function, optional): user-defined validation function.
        validation_loss_function_kwargs (dict, optional): kwargs for above ``validation_loss_function``.
        optimization_algorithm (str, required): optimization algorithm. No default, must be specified among: 'grid' ('brute'), 'TPE', 'Nelder-Mead', 'BFGS', 'L-BFGS-B', 'Powell', 'CG', 'Newton-CG', 'TNC', 'COBYLA', 'SLSQP', 'trust-constr', 'dogleg', 'trust-krylov', 'trust-exact'.
        optimization_algorithm_kwargs (dict, optional): kwargs to be passed to optimization algorithm, e.g., ``{'grid_size': 5}`` (default 9 for the grid search).
        maximum_evaluations (int, optional): maximum number of optimization evaluations (default: 10000) supported by all optimizers except for grid search.
        debug (bool, optional): passed on to ``calculate_validation_loss``.

    Raises:
        ValueError: if ``optimization_algorithm`` is missing or not one of the supported names.
        NotImplementedError: if a hyperparameter uses a search space other than ``'linear'`` or ``'log'``.

    Saves the final hyperparameters in ``yourmodel.hyperparameters`` and validation loss in ``yourmodel.validation_loss``.
    '''
    import tempfile

    if optimization_algorithm_kwargs is None:
        optimization_algorithm_kwargs = {}
    if validation_loss_function_kwargs is None:
        validation_loss_function_kwargs = {}

    scipy_algorithms = {mm.casefold() for mm in ['Nelder-Mead', 'BFGS', 'L-BFGS-B', 'Powell', 'CG', 'Newton-CG', 'TNC', 'COBYLA', 'SLSQP', 'trust-constr', 'dogleg', 'trust-krylov', 'trust-exact']}
    grid_algorithms = {'grid', 'brute'}
    if not isinstance(optimization_algorithm, str):
        raise ValueError('optimization_algorithm must be specified')
    algorithm = optimization_algorithm.casefold()
    if algorithm not in scipy_algorithms and algorithm not in grid_algorithms and algorithm != 'tpe':
        raise ValueError(f'Unsupported optimization algorithm: {optimization_algorithm}')

    def validation_loss(current_hyperparameters):
        # Apply the trial values, then evaluate the validation loss with them.
        for key, current_value in zip(hyperparameters, current_hyperparameters):
            self.hyperparameters[key].value = current_value
        return self.calculate_validation_loss(training_kwargs=training_kwargs,
                                              prediction_kwargs=prediction_kwargs,
                                              cv_splits_molecular_databases=cv_splits_molecular_databases,
                                              subtraining_molecular_database=subtraining_molecular_database,
                                              validation_molecular_database=validation_molecular_database,
                                              validation_loss_function=validation_loss_function,
                                              validation_loss_function_kwargs=validation_loss_function_kwargs,
                                              debug=debug)

    saved_name = self.model_file
    try:
        with tempfile.TemporaryDirectory() as tmpdirname:
            # Only the file name is reused so that the temporary path is valid even if
            # model_file contains directories or is an absolute path.
            self.model_file = os.path.join(tmpdirname, os.path.basename(str(saved_name)))

            if algorithm in scipy_algorithms:
                import scipy.optimize
                initial_hyperparameters = np.array([self.hyperparameters[key].value for key in hyperparameters])
                bounds = np.array([[self.hyperparameters[key].minval, self.hyperparameters[key].maxval] for key in hyperparameters])
                options = {'disp': True, 'maxiter': maximum_evaluations}
                if algorithm == 'nelder-mead':
                    # 'xatol' is a Nelder-Mead option; other methods do not know it.
                    options['xatol'] = 1e-8
                res = scipy.optimize.minimize(validation_loss, initial_hyperparameters,
                                              method=optimization_algorithm, bounds=bounds, options=options)
                for key, optimal_value in zip(hyperparameters, res.x):
                    self.hyperparameters[key].value = optimal_value

            elif algorithm in grid_algorithms:
                grid_size = optimization_algorithm_kwargs.get('grid_size', 9)
                grid_slices = []
                for key in hyperparameters:
                    current = self.hyperparameters[key]
                    if current.optimization_space == 'linear':
                        grid_slices.append(list(np.linspace(current.minval, current.maxval, num=grid_size)))
                    elif current.optimization_space == 'log':
                        grid_slices.append(list(np.logspace(np.log(current.minval), np.log(current.maxval), num=grid_size, base=np.exp(1))))
                    else:
                        # Skipping the axis would silently shift values to the wrong hyperparameters.
                        raise NotImplementedError(f'Grid search does not support the optimization space "{current.optimization_space}"')
                params, _ = optimize_grid(validation_loss, grid_slices)
                for key, optimal_value in zip(hyperparameters, params):
                    self.hyperparameters[key].value = optimal_value

            else:  # algorithm == 'tpe'
                import hyperopt
                from hyperopt.std_out_err_redirect_tqdm import DummyTqdmFile

                def fileno(stream):
                    # File descriptor numbers for the standard streams, 3 for anything else.
                    return {'<stdin>': 0, '<stdout>': 1, '<stderr>': 2}.get(stream.file.name, 3)
                # hyperopt's DummyTqdmFile is given a fileno() method here.
                DummyTqdmFile.fileno = fileno

                def validation_loss_wraper_for_hyperopt(d):
                    return validation_loss([d[k] for k in hyperparameters])

                space_mapping = {'linear': hyperopt.hp.uniform, 'log': hyperopt.hp.loguniform, 'normal': hyperopt.hp.normal, 'lognormal': hyperopt.hp.lognormal, 'discrete': hyperopt.hp.quniform, 'discretelog': hyperopt.hp.qloguniform, 'discretelognormal': hyperopt.hp.qlognormal, 'choices': hyperopt.hp.choice}

                def get_space(key):
                    current = self.hyperparameters[key]
                    space_type = current.optimization_space
                    if space_type == 'log':
                        args = [np.log(current.minval), np.log(current.maxval)]
                    elif space_type == 'linear':
                        args = [current.minval, current.maxval]
                    else:
                        raise NotImplementedError
                    return space_mapping[space_type](key, *args)

                space = {key: get_space(key) for key in hyperparameters}
                res = hyperopt.fmin(fn=validation_loss_wraper_for_hyperopt, space=space, algo=hyperopt.tpe.suggest,
                                    max_evals=maximum_evaluations, show_progressbar=True)
                for k, v in res.items():
                    self.hyperparameters[k].value = v
    finally:
        # The temporary directory is gone at this point; always point back to the original file.
        self.model_file = saved_name

    # Use the final hyperparameters to train the model and get the validation errors
    self.validation_loss = validation_loss(np.array([self.hyperparameters[key].value for key in hyperparameters]))
def holdout_validation(self, subtraining_molecular_database=None, validation_molecular_database=None,
                       training_kwargs=None, prediction_kwargs=None):
    '''
    Train on the sub-training database, then predict for the validation database.

    Arguments:
        subtraining_molecular_database: database passed to ``self.train()``.
        validation_molecular_database: database passed to ``self.predict()``.
        training_kwargs (dict, optional): extra kwargs for ``self.train()``.
        prediction_kwargs (dict, optional): extra kwargs for ``self.predict()``.
    '''
    train_options = {} if training_kwargs is None else training_kwargs
    predict_options = {} if prediction_kwargs is None else prediction_kwargs
    self.train(molecular_database=subtraining_molecular_database, **train_options)
    self.predict(molecular_database=validation_molecular_database, **predict_options)
def cross_validation(self, cv_splits_molecular_databases=None,
                     training_kwargs=None, prediction_kwargs=None):
    '''
    For every split: reset the model, train on all other splits, and predict for the split left out.

    Arguments:
        cv_splits_molecular_databases (list): the cross-validation splits.
        training_kwargs (dict, optional): extra kwargs for ``self.train()``.
        prediction_kwargs (dict, optional): extra kwargs for ``self.predict()``.
    '''
    train_options = {} if training_kwargs is None else training_kwargs
    predict_options = {} if prediction_kwargs is None else prediction_kwargs
    split_count = len(cv_splits_molecular_databases)
    for held_out in range(split_count):
        # Merge the molecules of every split except the one held out.
        subtraining_molecular_database = data.molecular_database()
        other_indices = [index for index in range(split_count) if index != held_out]
        for index in other_indices:
            subtraining_molecular_database.molecules += cv_splits_molecular_databases[index].molecules
        validation_molecular_database = cv_splits_molecular_databases[held_out]
        self.reset()
        self.train(molecular_database=subtraining_molecular_database, **train_options)
        self.predict(molecular_database=validation_molecular_database, **predict_options)
def get_property_to_learn(self, training_kwargs=None):
    # Returns training_kwargs['property_to_learn'] if present, otherwise None.
    if training_kwargs is None:
        return None
    if 'property_to_learn' not in training_kwargs:
        return None
    return training_kwargs['property_to_learn']
def get_xyz_derivative_property_to_learn(self, training_kwargs=None):
    # Returns training_kwargs['xyz_derivative_property_to_learn'] if present, otherwise None.
    if training_kwargs is None:
        return None
    if 'xyz_derivative_property_to_learn' not in training_kwargs:
        return None
    return training_kwargs['xyz_derivative_property_to_learn']
def get_property_to_predict(self, prediction_kwargs=None):
    '''
    Determine the label under which predicted scalar properties are expected.

    Returns ``None`` without prediction kwargs; otherwise the explicit ``'property_to_predict'``
    entry, else ``'estimated_energy'`` if the key ``'calculate_energy'`` is present, else ``'estimated_y'``.
    '''
    if prediction_kwargs is None:
        return None
    if 'property_to_predict' in prediction_kwargs:
        return prediction_kwargs['property_to_predict']
    # NOTE(review): only the presence of the key is checked, not its value - confirm this is intended.
    if 'calculate_energy' in prediction_kwargs:
        return 'estimated_energy'
    return 'estimated_y'
def get_xyz_derivative_property_to_predict(self, prediction_kwargs=None):
    '''
    Determine the label under which predicted XYZ derivatives are expected.

    Returns ``None`` without prediction kwargs; otherwise the explicit
    ``'xyz_derivative_property_to_predict'`` entry, else ``'estimated_energy_gradients'`` if the key
    ``'calculate_energy_gradients'`` is present, else ``'estimated_xyz_derivatives_y'``.
    '''
    if prediction_kwargs is None:
        return None
    if 'xyz_derivative_property_to_predict' in prediction_kwargs:
        return prediction_kwargs['xyz_derivative_property_to_predict']
    # NOTE(review): only the presence of the key is checked, not its value - confirm this is intended.
    if 'calculate_energy_gradients' in prediction_kwargs:
        return 'estimated_energy_gradients'
    return 'estimated_xyz_derivatives_y'
def optimize_grid(func, grid):
    '''
    Optimizes on the given grid by finding parameters (provided by grid) leading to the minimum value of the given function.

    Every combination of grid values is evaluated exactly once, with the last axis varying
    fastest. Among equal minima the combination evaluated first is returned. A NaN value is
    replaced by any later non-NaN value, so a failed first evaluation does not hide the minimum.

    Arguments:
        func (callable): function taking a list with one value per grid axis and returning a comparable number.
        grid (list): list of axes, each axis being a non-empty list of candidate values.

    Returns:
        tuple: ``(parameters, value)`` - the list of optimal parameters and the function value for them.

    Raises:
        ValueError: if the grid has no axes or one of the axes is empty.
    '''
    import itertools

    if len(grid) == 0 or any(len(axis) == 0 for axis in grid):
        raise ValueError('The grid must contain at least one axis and every axis at least one value')

    opt_params = None
    min_val = None
    for combination in itertools.product(*grid):
        params = list(combination)
        val = func(params)
        # "x != x" is true only for NaN.
        best_is_nan = min_val is not None and min_val != min_val
        if min_val is None or val < min_val or (best_is_nan and val == val):
            opt_params = params
            min_val = val
    return opt_params, min_val
# [docs]
class hyperparameter():
    '''
    Class of hyperparameter object, containing data that can be used in hyperparameter optimizations.

    Arguments:
        value (Any, optional): The value of the hyperparameter.
        optimization_space (str, optional): Defines the space for hyperparameter. Currently supports ``'linear'``, and ``'log'``.
        dtype (Callable, optional): A callable object that forces the data type of value. Automatically choose one if set to ``None``.
        name (str, optional): The name of the hyperparameter.
        minval (Any, optional): The lower bound of the optimization space.
        maxval (Any, optional): The upper bound of the optimization space.
        step (Any, optional): Stored as given; not used inside this class.
        choices (Iterable[Any], optional): Stored as given; a new empty list is created for every instance by default.
        **kwargs: Ignored; allows re-creating an instance from the ``__dict__`` of another one (see :meth:`copy`).
    '''
    def __init__(self, value: Any = None, optimization_space: str = 'linear', dtype: Union[Callable, None] = None, name: str = "", minval: Any = None, maxval: Any = None, step: Any = None, choices: Union[Iterable[Any], None] = None, **kwargs):
        self.name = name
        # dtype has to be assigned before value: assigning ``value`` triggers the cast in __setattr__.
        if dtype:
            self.dtype = dtype
        elif value is None:
            self.dtype = None
        else:
            self.dtype = type(value)
        self.value = value
        self.optimization_space = optimization_space  # 'linear' or 'log'
        self.minval = minval
        self.maxval = maxval
        self.step = step
        # A fresh list per instance; a shared default list would leak changes between instances.
        self.choices = [] if choices is None else choices

    def __setattr__(self, key, value):
        if key == 'value' and self.dtype and not isinstance(value, self.dtype):
            # Force the declared data type on assignment.
            # NOTE(review): None is cast as well, e.g. dtype=float with value=None raises TypeError - confirm this is intended.
            value = self._cast_dtype(value)
        if key == 'dtype':
            self._set_dtype_cast_method(value)
        super().__setattr__(key, value)

    def __repr__(self):
        return f'hyperparameter {str(self.__dict__)}'

    def _set_dtype_cast_method(self, dtype):
        # Choose the callable used to cast values: the first entry of a tuple of types,
        # and np.array for numpy arrays (np.ndarray itself is not a convenient constructor).
        if isinstance(dtype, tuple):
            dtype = dtype[0]
        if dtype is np.ndarray:
            self._cast_dtype = np.array
        else:
            self._cast_dtype = dtype

    def update(self, new_hyperparameter: hyperparameter) -> None:
        '''
        Update hyperparameter with data in another instance.

        Arguments:
            new_hyperparameter (:class:`mlatom.models.hyperparameter`): Whose data are to be applied to the current instance.
        '''
        # Attributes are copied directly, without the casting done in __setattr__.
        self.__dict__.update(new_hyperparameter.__dict__)

    def copy(self):
        '''
        Returns a copy of current instance.

        Returns:
            :class:`mlatom.models.hyperparameter`: a new instance copied from current one.
        '''
        return hyperparameter(**self.__dict__)
# [docs]
class hyperparameters(UserDict):
    '''
    Class for storing hyperparameters, values are auto-converted to :class:`mlatom.models.hyperparameter` objects.
    Inherit from collections.UserDict.

    Initiation:
        Initiate with a dictionary or kwargs or both.

        e.g.:

        .. code-block::

            hyperparameters({'a': 1.0}, b=hyperparameter(value=2, minval=0, maxval=4))
    '''
    def __setitem__(self, key, value):
        if isinstance(value, hyperparameter):
            if key in self:
                # Keep the stored object and take over the data of the new one.
                super().__getitem__(key).update(value)
            else:
                super().__setitem__(key, value)
        elif key in self:
            # A plain value only replaces the value of the stored hyperparameter.
            super().__getitem__(key).value = value
        else:
            super().__setitem__(key, hyperparameter(value=value, name=key))

    def __getattr__(self, key):
        # Only called when the normal attribute lookup fails: expose stored hyperparameter
        # values as attributes. AttributeError (not KeyError) must be raised for unknown names
        # so that hasattr(), getattr() with a default and the copy protocol work.
        # The check for 'data' avoids infinite recursion while the instance is not initialized yet.
        if 'data' in self.__dict__ and key in self:
            return self[key].value
        raise AttributeError(f'{type(self).__name__!r} object has no attribute {key!r}')

    def __setattr__(self, key, value):
        if key.startswith('__') or (key in self.__dict__) or key == 'data':
            super().__setattr__(key, value)
        else:
            # Any other attribute assignment is stored as a hyperparameter.
            self.__setitem__(key, value)

    def __getstate__(self):
        return vars(self)

    def __setstate__(self, state):
        vars(self).update(state)

    def copy(self, keys: Union[Iterable[str], None] = None) -> hyperparameters:
        '''
        Returns a copy of current instance.

        Arguments:
            keys (Iterable[str], optional): If keys provided, only the hyperparameters selected by keys will be copied, instead of all hyperparameters.

        Returns:
            :class:`mlatom.models.hyperparameters`: a new instance copied from current one.
        '''
        if keys is None:
            keys = self.keys()
        return hyperparameters({key: self[key].copy() for key in keys})
# [docs]
class model_tree_node(model):
'''
Create a model tree node.
Arguments:
name (str): The name assign to the object.
parent: The parent of the model node.
children: The children of this model tree node.
operator: Specify the operation to be made when making predictions.
'''
def __init__(self, name=None, parent=None, children=None, operator=None, model=None):
    # Store the node data and keep the parent/children links consistent in both directions.
    self.name = name
    self.parent = parent
    self.children = children
    if self.parent is not None:
        # Register this node with its parent, creating the children list if needed.
        if self.parent.children is None:
            self.parent.children = []
        if self not in self.parent.children:
            self.parent.children.append(self)
    if self.children is not None:
        # Make every given child point back to this node.
        for child in self.children:
            child.parent = self
    self.operator = operator
    self.model = model
def set_num_threads(self, nthreads=0):
    # Record the request, then propagate the thread count down the tree:
    # to all children if there are any, otherwise to the model of this node.
    super().set_num_threads(nthreads)
    if not self.nthreads:
        return
    if self.children is not None:
        for child in self.children:
            child.set_num_threads(self.nthreads)
    else:
        self.model.set_num_threads(self.nthreads)
# [docs]
def predict(self, **kwargs):
molDB = super().predict(**kwargs)
if len(molDB) == 0: return
if 'calculate_energy' in kwargs: calculate_energy = kwargs['calculate_energy']
else: calculate_energy = True
if 'calculate_energy_gradients' in kwargs: calculate_energy_gradients = kwargs['calculate_energy_gradients']
else: calculate_energy_gradients = False
if 'calculate_hessian' in kwargs: calculate_hessian = kwargs['calculate_hessian']
else: calculate_hessian = False
if 'nstates' in kwargs: nstates = kwargs['nstates']
else: nstates = 1
if 'current_state' in kwargs: current_state = kwargs['current_state']
else: current_state = 0
properties = [] ; atomic_properties = []
if calculate_energy: properties.append('energy')
if calculate_energy_gradients: atomic_properties.append('energy_gradients')
if calculate_hessian: properties.append('hessian')
for mol in molDB.molecules:
if nstates:
mol_copy = mol.copy()
mol_copy.electronic_states = []
if nstates >1:
for _ in range(nstates - len(mol.electronic_states)):
mol.electronic_states.append(mol_copy.copy())
for mol_el_st in mol.electronic_states:
if not self.name in mol_el_st.__dict__:
parent = None
if self.parent != None:
if self.parent.name in mol_el_st.__dict__:
parent = mol_el_st.__dict__[self.parent.name]
children = None
if self.children != None:
for child in self.children:
if child.name in mol_el_st.__dict__:
if children == None: children = []
children.append(mol_el_st.__dict__[child.name])
mol_el_st.__dict__[self.name] = data.properties_tree_node(name=self.name, parent=parent, children=children)
if not self.name in mol.__dict__:
parent = None
if self.parent != None:
if self.parent.name in mol.__dict__:
parent = mol.__dict__[self.parent.name]
children = None
if self.children != None:
for child in self.children:
if child.name in mol.__dict__:
if children == None: children = []
children.append(mol.__dict__[child.name])
mol.__dict__[self.name] = data.properties_tree_node(name=self.name, parent=parent, children=children)
if self.children == None and self.operator == 'predict':
self.model.predict(**kwargs)
for mol in molDB.molecules:
if not mol.electronic_states:
self.get_properties_from_molecule(mol, properties, atomic_properties)
for mol_el_st in mol.electronic_states:
# mol_el_st.__dict__[self.name] = data.properties_tree_node(name=self.name, parent=parent, children=children)
self.get_properties_from_molecule(mol_el_st, properties, atomic_properties)
else:
for child in self.children:
child.predict(**kwargs)
if 'weight' in child.__dict__.keys():
mol.__dict__[child.name].__dict__['weight'] = child.weight
if self.operator == 'sum':
for mol in molDB.molecules:
if not mol.electronic_states:
mol.__dict__[self.name].sum(properties+atomic_properties)
for mol_el_st in mol.electronic_states:
mol_el_st.__dict__[self.name].sum(properties+atomic_properties)
if self.operator == 'weighted_sum':
for mol in molDB.molecules:
if not mol.electronic_states:
mol.__dict__[self.name].weighted_sum(properties+atomic_properties)
for mol_el_st in mol.electronic_states:
mol_el_st.__dict__[self.name].weighted_sum(properties+atomic_properties)
if self.operator == 'average':
for mol in molDB.molecules:
if not mol.electronic_states:
mol.__dict__[self.name].average(properties+atomic_properties)
for mol_el_st in mol.electronic_states:
mol_el_st.__dict__[self.name].average(properties+atomic_properties)
if self.parent == None:
self.update_molecular_properties(molecular_database=molDB, properties=properties, atomic_properties=atomic_properties, current_state=current_state)
def get_properties_from_molecule(self, molecule, properties=[], atomic_properties=[]):
property_values = molecule.__dict__[self.name].__dict__
for property_name in properties:
if property_name in molecule.__dict__: property_values[property_name] = molecule.__dict__.pop(property_name)
for property_name in atomic_properties:
property_values[property_name] = []
for atom in molecule.atoms:
property_values[property_name].append(atom.__dict__.pop(property_name))
property_values[property_name] = np.array(property_values[property_name]).astype(float)
def update_molecular_properties(self, molecular_database=None, molecule=None, properties=[], atomic_properties=[], current_state=0):
molDB = molecular_database
if molecule != None:
molDB = data.molecular_database()
molDB.molecules.append(molecule)
for mol in molDB.molecules:
for property_name in properties:
for mol_el_st in mol.electronic_states:
mol_el_st.__dict__[property_name] = mol_el_st.__dict__[self.name].__dict__[property_name]
if not mol.electronic_states:
mol.__dict__[property_name] = mol.__dict__[self.name].__dict__[property_name]
else:
mol.__dict__[property_name] = mol.electronic_states[current_state].__dict__[property_name]
for property_name in atomic_properties:
for mol_el_st in mol.electronic_states:
for iatom in range(len(mol_el_st.atoms)):
mol_el_st.atoms[iatom].__dict__[property_name] = mol_el_st.__dict__[self.name].__dict__[property_name][iatom]
if not mol.electronic_states:
for iatom in range(len(mol.atoms)):
mol.atoms[iatom].__dict__[property_name] = mol.__dict__[self.name].__dict__[property_name][iatom]
else:
for iatom in range(len(mol.atoms)):
mol.atoms[iatom].__dict__[property_name] = mol.electronic_states[current_state].atoms[iatom].__dict__[property_name]
[文档]
def dump(self, filename=None, format='json'):
'''
Dump the object to a file.
'''
model_dict = {
'type': 'model_tree_node',
'name': self.name,
'children': [child.dump(format='dict') for child in self.children] if self.children else None,
'operator': self.operator,
'model': self.model.dump(format='dict') if self.model else None,
'nthreads': self.nthreads,
'weight': self.weight if 'weight' in self.__dict__ else None
}
if format == 'json':
import json
with open(filename, 'w') as f:
json.dump(model_dict, f, indent=4)
if format == 'dict':
return model_dict
# Nothing is executed when this module is run as a script.
if __name__ == '__main__':
    pass