mlatom.models source code

#!/usr/bin/env python3
'''
.. code-block::

  !---------------------------------------------------------------------------! 
  ! models: Module with models                                                ! 
  ! Implementations by: Pavlo O. Dral, Fuchun Ge, Yi-Fan Hou, Yuxinxin Chen,  !
  !                     Peikun Zheng                                          ! 
  !---------------------------------------------------------------------------! 
'''
from __future__ import annotations
from typing import Any, Union, Dict, List, Iterable, Callable
import os, tempfile, uuid, sys
import numpy as np
from collections import UserDict

from . import data, stats, stopper, interfaces
from .decorators import doc_inherit

class model():
    '''
    Parent (super) class for models to enable useful features such as logging during geometry optimizations.
    '''
    # Number of threads to use; 0 means "leave the backend default untouched".
    nthreads = 0

    def set_num_threads(self, nthreads=0):
        # Extended in each subclass to propagate the setting to the respective backend.
        if nthreads:
            self.nthreads = nthreads

    def config_multiprocessing(self):
        '''
        for scripts that need to be executed before running model in parallel
        '''
        pass

    def parse_args(self, args):
        # For command-line arguments parsing; overridden in subclasses.
        pass

    def _predict_geomopt(self, return_string=False, dump_trajectory_interval=None, filename=None, format='json',
                         print_properties=None, molecule: data.molecule = None, **kwargs):
        # Wrapper around self.predict() used during geometry optimizations:
        # optionally appends the current geometry to the trajectory file and
        # prints (or collects) the requested properties for this iteration.
        self.predict(molecule=molecule, **kwargs)
        if dump_trajectory_interval is not None:
            opttraj = data.molecular_trajectory()
            opttraj.load(filename=filename, format=format)
            nsteps = len(opttraj.steps)
            if print_properties == 'all' or isinstance(print_properties, list):
                printstrs = []
                printstrs += [' %s ' % ('-'*78)]
                printstrs += [f' Iteration {nsteps+1}']
                printstrs += [' %s \n' % ('-'*78)]
                printstrs += [molecule.info(properties=print_properties, return_string=True)]
                printstrs = '\n'.join(printstrs) + '\n'
                if not return_string:
                    print(printstrs)
            opttraj.steps.append(data.molecular_trajectory_step(step=nsteps, molecule=molecule))
            opttraj.dump(filename=filename, format=format)
            # Also keep a plain-XYZ copy of the whole trajectory next to the dump file.
            moldb = data.molecular_database()
            moldb.molecules = [each.molecule for each in opttraj.steps]
            xyzfilename = os.path.splitext(os.path.basename(filename))[0]
            moldb.write_file_with_xyz_coordinates(f'{xyzfilename}.xyz')
        if return_string and (dump_trajectory_interval is not None) and (print_properties == 'all' or isinstance(print_properties, list)):
            return printstrs

    def predict(
        self,
        molecular_database: data.molecular_database = None,
        molecule: data.molecule = None,
        calculate_energy: bool = False,
        calculate_energy_gradients: bool = False,
        calculate_hessian: bool = False,
        **kwargs,
    ):
        '''
        Make predictions for molecular geometries with the model.

        Arguments:
            molecular_database (:class:`mlatom.data.molecular_database`, optional): A database contains the molecules whose properties need to be predicted by the model.
            molecule (:class:`mlatom.models.molecule`, optional): A molecule object whose property needs to be predicted by the model.
            calculate_energy (bool, optional): Use the model to calculate energy.
            calculate_energy_gradients (bool, optional): Use the model to calculate energy gradients.
            calculate_hessian (bool, optional): Use the model to calculate energy hessian.
        '''
        # For universal control of predicting behavior: normalizes the input so that
        # subclasses always receive a molecular_database.
        self.set_num_threads()
        if molecular_database is None:
            if molecule is not None:
                molecular_database = data.molecular_database([molecule])
            else:
                errmsg = 'Either molecule or molecular_database should be provided in input'
                raise ValueError(errmsg)
        return molecular_database

    def dump(self, filename=None, format='json'):
        # Serializes basic model information: class name, defining module and
        # the plain str/dict instance attributes (JSON-safe subset only).
        modelname = self.__class__.__name__
        modulename = self.__module__
        modulepath = sys.modules[modulename].__spec__.origin
        model_dict = {
            'type': modelname,
            'module': {
                'path': modulepath,
                'name': modulename
            }}
        for key in self.__dict__:
            # Deliberately exact type check: only plain str/dict attributes are dumped.
            if type(self.__dict__[key]) in (str, dict):
                model_dict[key] = self.__dict__[key]
        model_dict['nthreads'] = self.nthreads
        if format == 'json':
            import json
            with open(filename, 'w') as fjson:
                json.dump(model_dict, fjson, indent=4)
        if format == 'dict':
            return model_dict

    def _call_impl(self, *args, **kwargs):
        return self.predict(*args, **kwargs)

    # Calling the model object directly is equivalent to calling .predict().
    __call__ : Callable[..., Any] = _call_impl
class torch_model(model):
    # Models that utilize PyTorch should inherit this class.
    def set_num_threads(self, nthreads=0):
        super().set_num_threads(nthreads)
        if self.nthreads:
            import torch
            torch.set_num_threads(self.nthreads)

    def config_multiprocessing(self):
        super().config_multiprocessing()
        # Restrict each parallel worker to a single torch thread to avoid oversubscription.
        import torch
        torch.set_num_threads(1)


class torchani_model(torch_model):
    # Models that utilize TorchANI should inherit this class.
    def config_multiprocessing(self):
        return super().config_multiprocessing()


class tensorflow_model(model):
    # Models that utilize TensorFlow should inherit this class.
    def set_num_threads(self, nthreads=0):
        super().set_num_threads(nthreads)
        if self.nthreads:
            os.environ["TF_INTRA_OP_PARALLELISM_THREADS"] = str(self.nthreads)


class MKL_model(model):
    # Models whose thread count is controlled via the MKL environment variable.
    def set_num_threads(self, nthreads=0):
        super().set_num_threads(nthreads)
        if self.nthreads:
            os.environ["MKL_NUM_THREADS"] = str(self.nthreads)


class OMP_model(model):
    # Models whose thread count is controlled via the OpenMP environment variable.
    def set_num_threads(self, nthreads=0):
        super().set_num_threads(nthreads)
        if self.nthreads:
            os.environ["OMP_NUM_THREADS"] = str(self.nthreads)


class method_model(model):
    # Base class for models wrapping methods provided by external programs.
    # Subclasses may declare `supported_methods` (list of method names) and
    # `bin_env_name` (environment variable pointing to the program binary).

    @classmethod
    def is_method_supported(cls, method):
        # True/False when the subclass itself declares supported_methods
        # (case-insensitive match); None when it declares nothing.
        if 'supported_methods' in cls.__dict__:
            return method.casefold() in [mm.casefold() for mm in cls.supported_methods]
        return None

    @classmethod
    def is_program_found(cls):
        # True/False when the subclass itself declares bin_env_name;
        # None when it declares nothing.
        if 'bin_env_name' in cls.__dict__:
            return cls.get_bin_env_var() is not None
        return None

    @classmethod
    def get_bin_env_var(cls):
        # Value of the binary-location environment variable, or None if unset.
        return os.environ.get(cls.bin_env_name)

# Parent model class
class ml_model(model):
    '''
    Useful as a superclass for the ML models that need to be trained.
    '''

    def train(
        self,
        molecular_database: data.molecular_database,
        property_to_learn: Union[str, None] = 'y',
        xyz_derivative_property_to_learn: str = None,
    ) -> None:
        '''
        Train the model with molecular database provided.

        Arguments:
            molecular_database (:class:`mlatom.data.molecular_database`): The database of molecules for training.
            property_to_learn (str, optional): The label of property to be learned in model training.
            xyz_derivative_property_to_learn (str, optional): The label of XYZ derivative property to be learned.
        '''
        self.set_num_threads()

    @doc_inherit
    def predict(
        self,
        molecular_database: data.molecular_database = None,
        molecule: data.molecule = None,
        calculate_energy: bool = False,
        property_to_predict: Union[str, None] = 'estimated_y',
        calculate_energy_gradients: bool = False,
        xyz_derivative_property_to_predict: Union[str, None] = 'estimated_xyz_derivatives_y',
        calculate_hessian: bool = False,
        hessian_to_predict: Union[str, None] = 'estimated_hessian_y',
    ) -> None:
        '''
        property_to_predict (str, optional): The label name where the predicted properties to be saved.
        xyz_derivative_property_to_predict (str, optional): The label name where the predicted XYZ derivatives to be saved.
        hessian_to_predict (str, optional): The label name where the predicted Hessians to be saved.
        '''
        molecular_database = super().predict(molecular_database=molecular_database, molecule=molecule)
        # The calculate_* flags override the custom labels with the standard property names.
        if calculate_energy:
            property_to_predict = 'energy'
        if calculate_energy_gradients:
            xyz_derivative_property_to_predict = 'energy_gradients'
        if calculate_hessian:
            hessian_to_predict = 'hessian'
        return molecular_database, property_to_predict, xyz_derivative_property_to_predict, hessian_to_predict

    def generate_model_dict(self):
        '''
        Generates model dictionary for dumping in json format.
        '''
        return {
            'type': 'ml_model',
            'ml_model_type': str(type(self)).split("'")[1],
            'kwargs': {
                'model_file': os.path.abspath(self.model_file)
            },
            # 'hyperparameters': self.hyperparameters,
            'nthreads': self.nthreads,
        }

    def reset(self):
        '''
        Resets model (deletes the ML model file from the hard disk).
        '''
        if os.path.exists(self.model_file):
            os.remove(self.model_file)

    def dump(self, filename=None, format='json'):
        '''
        Dumps model class object information in a json file (do not confused with saving the model itself, i.e., its parameters!).
        '''
        if not self.model_file:
            self.save()
        model_dict = self.generate_model_dict()
        if format == 'json':
            import json
            with open(filename, 'w') as fout:
                json.dump(model_dict, fout, indent=4)
        if format == 'dict':
            return model_dict

    def parse_args(self, args):
        super().parse_args(args)

    def parse_hyperparameter_optimization(self, args, arg_key):
        # Translates an hyperopt-style search-space string, e.g. 'hp.loguniform(lb, hb)',
        # into the optimization space and bounds of the corresponding hyperparameter.
        space_map = {
            'loguniform': 'log',
            'uniform': 'linear',
        }
        if args.hyperparameter_optimization['optimization_algorithm'] == 'tpe':
            value = args._hyperopt_str_dict[arg_key]
            space = space_map[value.split('(')[0].split('.')[-1]]
            lb = float(value.split('(')[1][:-1].split(',')[0])
            hb = float(value.split('(')[1][:-1].split(',')[1])
            self.hyperparameters[arg_key].optimization_space = space
            if space == 'log':
                # Log-space bounds are given as exponents of two.
                self.hyperparameters[arg_key].minval = 2**lb
                self.hyperparameters[arg_key].maxval = 2**hb
            else:
                self.hyperparameters[arg_key].minval = lb
                self.hyperparameters[arg_key].maxval = hb

    def calculate_validation_loss(self, training_kwargs=None, prediction_kwargs=None,
                                  cv_splits_molecular_databases=None, calculate_CV_split_errors=False,
                                  subtraining_molecular_database=None, validation_molecular_database=None,
                                  validation_loss_function=None, validation_loss_function_kwargs={}, debug=False):
        '''
        Returns the validation loss for the given hyperparameters.

        By default, the validation loss is RMSE evaluated as a geometric mean of scalar and vectorial properties, e.g., energies and gradients.

        Arguments:
            training_kwargs (dict, optional): the kwargs to be passed to ``yourmodel.train()`` function.
            prediction_kwargs (dict, optional): the kwargs to be passed to ``yourmodel.predict()`` function.
            cv_splits_molecular_databases (list, optional): the list with cross-validation splits, each element is :class:`molecular_database <mlatom.data.molecular_database>`.
            calculate_CV_split_errors (bool, optional): requests to return the errors for each cross-validation split as a list in addition to the aggregate cross-validation error.
            subtraining_molecular_database (:class:`molecular_database <mlatom.data.molecular_database>`, optional): molecular database for sub-training to be passed to ``yourmodel.train()`` function.
            validation_molecular_database (:class:`molecular_database <mlatom.data.molecular_database>`, optional): molecular database for validation to be passed to ``yourmodel.predict()`` function.
            validation_loss_function (function, optional): user-defined validation function.
            validation_loss_function_kwargs (dict, optional): kwargs for above ``validation_loss_function``.
        '''
        # Figure out which labels are trained on and, if none are given, default to 'y'.
        property_to_learn = self.get_property_to_learn(training_kwargs)
        xyz_derivative_property_to_learn = self.get_xyz_derivative_property_to_learn(training_kwargs)
        if property_to_learn is None and xyz_derivative_property_to_learn is None:
            property_to_learn = 'y'
            if training_kwargs is None:
                training_kwargs = {'property_to_learn': 'y'}
            else:
                training_kwargs['property_to_learn'] = 'y'

        # Derive the prediction labels ('estimated_<label>') when none are requested explicitly.
        property_to_predict = self.get_property_to_predict(prediction_kwargs)
        xyz_derivative_property_to_predict = self.get_xyz_derivative_property_to_predict(prediction_kwargs)
        if property_to_predict is None and xyz_derivative_property_to_predict is None:
            if prediction_kwargs is None:
                prediction_kwargs = {}
            if property_to_learn is not None:
                property_to_predict = f'estimated_{property_to_learn}'
                prediction_kwargs['property_to_predict'] = property_to_predict
            if xyz_derivative_property_to_learn is not None:
                xyz_derivative_property_to_predict = f'estimated_{xyz_derivative_property_to_learn}'
                prediction_kwargs['xyz_derivative_property_to_predict'] = xyz_derivative_property_to_predict

        estimated_y = None; y = None; estimated_xyz_derivatives = None; xyz_derivatives = None
        if cv_splits_molecular_databases is None:
            # Hold-out validation: train on the sub-training set, predict on the validation set.
            self.holdout_validation(subtraining_molecular_database=subtraining_molecular_database,
                                    validation_molecular_database=validation_molecular_database,
                                    training_kwargs=training_kwargs, prediction_kwargs=prediction_kwargs)
            if property_to_learn is not None:
                y = validation_molecular_database.get_properties(property_name=property_to_learn)
                estimated_y = validation_molecular_database.get_properties(property_name=property_to_predict)
            if xyz_derivative_property_to_learn is not None:
                xyz_derivatives = validation_molecular_database.get_xyz_vectorial_properties(property_name=xyz_derivative_property_to_learn)
                estimated_xyz_derivatives = validation_molecular_database.get_xyz_vectorial_properties(property_name=xyz_derivative_property_to_predict)
        else:
            # Cross-validation: every molecule ends up with an out-of-fold prediction.
            self.cross_validation(cv_splits_molecular_databases=cv_splits_molecular_databases,
                                  training_kwargs=training_kwargs, prediction_kwargs=prediction_kwargs)
            training_molecular_database = data.molecular_database()
            if calculate_CV_split_errors:
                nsplits = len(cv_splits_molecular_databases)
                CV_y = [None for ii in range(nsplits)]; CV_yest = [None for ii in range(nsplits)]
                CV_xyz_derivatives = [None for ii in range(nsplits)]; CV_estimated_xyz_derivatives = [None for ii in range(nsplits)]
            for CVsplit in cv_splits_molecular_databases:
                training_molecular_database.molecules += CVsplit.molecules
            if property_to_learn is not None:
                y = training_molecular_database.get_properties(property_name=property_to_learn)
                estimated_y = training_molecular_database.get_properties(property_name=property_to_predict)
                if calculate_CV_split_errors:
                    CV_y = []; CV_yest = []
                    for ii in range(nsplits):
                        CV_y.append(cv_splits_molecular_databases[ii].get_properties(property_name=property_to_learn))
                        CV_yest.append(cv_splits_molecular_databases[ii].get_properties(property_name=property_to_predict))
            if xyz_derivative_property_to_learn is not None:
                xyz_derivatives = training_molecular_database.get_xyz_vectorial_properties(property_name=xyz_derivative_property_to_learn)
                estimated_xyz_derivatives = training_molecular_database.get_xyz_vectorial_properties(property_name=xyz_derivative_property_to_predict)
                if calculate_CV_split_errors:
                    CV_xyz_derivatives = []; CV_estimated_xyz_derivatives = []
                    for ii in range(nsplits):
                        CV_xyz_derivatives.append(cv_splits_molecular_databases[ii].get_xyz_vectorial_properties(property_name=xyz_derivative_property_to_learn))
                        CV_estimated_xyz_derivatives.append(cv_splits_molecular_databases[ii].get_xyz_vectorial_properties(property_name=xyz_derivative_property_to_predict))

        def geomRMSEloc(estimated_y, y, estimated_xyz_derivatives, xyz_derivatives):
            # Geometric mean of the scalar and vectorial RMSEs (plain RMSE if only one is trained).
            total_rmse = 1
            if property_to_learn is not None:
                total_rmse *= stats.rmse(estimated_y, y)
            if xyz_derivative_property_to_learn is not None:
                total_rmse *= stats.rmse(estimated_xyz_derivatives.reshape(estimated_xyz_derivatives.size),
                                         xyz_derivatives.reshape(xyz_derivatives.size))
            if property_to_learn is not None and xyz_derivative_property_to_learn is not None:
                total_rmse = np.sqrt(total_rmse)
            return total_rmse

        if validation_loss_function is None:
            error = geomRMSEloc(estimated_y, y, estimated_xyz_derivatives, xyz_derivatives)
        else:
            error = validation_loss_function(**validation_loss_function_kwargs)
        self.reset()

        if cv_splits_molecular_databases is not None and calculate_CV_split_errors:
            CV_errors = []
            for ii in range(nsplits):
                if validation_loss_function is None:
                    CVerror = geomRMSEloc(CV_yest[ii], CV_y[ii], CV_estimated_xyz_derivatives[ii], CV_xyz_derivatives[ii])
                else:
                    CVerror = validation_loss_function(**validation_loss_function_kwargs)
                CV_errors.append(CVerror)

        if debug:
            for each in self.hyperparameters.keys():
                print(f" Hyperparameter {each} = {self.hyperparameters[each].value}")
            print(f" Validation loss: {error}")

        if cv_splits_molecular_databases is not None and calculate_CV_split_errors:
            return error, CV_errors
        else:
            return error

    def optimize_hyperparameters(self, hyperparameters=None, training_kwargs=None, prediction_kwargs=None,
                                 cv_splits_molecular_databases=None,
                                 subtraining_molecular_database=None, validation_molecular_database=None,
                                 optimization_algorithm=None, optimization_algorithm_kwargs={},
                                 maximum_evaluations=10000,
                                 validation_loss_function=None, validation_loss_function_kwargs={}, debug=False):
        '''
        Optimizes hyperparameters by minimizing the validation loss.

        By default, the validation loss is RMSE evaluated as a geometric mean of scalar and vectorial properties, e.g., energies and gradients.

        Arguments:
            hyperparameters (list, required): the list with strings - names of hyperparameters. Hyperparameters themselves must be in ``yourmodel.hyperparameters`` defined with class instance :class:`hyperparameters <mlatom.models.hyperparameters>` consisting of :class:`hyperparameter <mlatom.models.hyperparameter>` defining the optimization space.
            training_kwargs (dict, optional): the kwargs to be passed to ``yourmodel.train()`` function.
            prediction_kwargs (dict, optional): the kwargs to be passed to ``yourmodel.predict()`` function.
            cv_splits_molecular_databases (list, optional): the list with cross-validation splits, each element is :class:`molecular_database <mlatom.data.molecular_database>`.
            subtraining_molecular_database (:class:`molecular_database <mlatom.data.molecular_database>`, optional): molecular database for sub-training to be passed to ``yourmodel.train()`` function.
            validation_molecular_database (:class:`molecular_database <mlatom.data.molecular_database>`, optional): molecular database for validation to be passed to ``yourmodel.predict()`` function.
            validation_loss_function (function, optional): user-defined validation function.
            validation_loss_function_kwargs (dict, optional): kwargs for above ``validation_loss_function``.
            optimization_algorithm (str, required): optimization algorithm. No default, must be specified among: 'grid' ('brute'), 'TPE', 'Nelder-Mead', 'BFGS', 'L-BFGS-B', 'Powell', 'CG', 'Newton-CG', 'TNC', 'COBYLA', 'SLSQP', 'trust-constr', 'dogleg', 'trust-krylov', 'trust-exact'.
            optimization_algorithm_kwargs (dict, optional): kwargs to be passed to optimization algorithm, e.g., ``{'grid_size': 5}`` (default 9 for the grid search).
            maximum_evaluations (int, optional): maximum number of optimization evaluations (default: 10000) supported by all optimizers except for grid search.

        Saves the final hyperparameters in ``yourmodel.hyperparameters`` and validation loss in ``yourmodel.validation_loss``.
        '''
        def validation_loss(current_hyperparameters):
            # Objective for the optimizers: set the trial values, then evaluate the loss.
            for ii in range(len(current_hyperparameters)):
                self.hyperparameters[hyperparameters[ii]].value = current_hyperparameters[ii]
            return self.calculate_validation_loss(training_kwargs=training_kwargs,
                                                  prediction_kwargs=prediction_kwargs,
                                                  cv_splits_molecular_databases=cv_splits_molecular_databases,
                                                  subtraining_molecular_database=subtraining_molecular_database,
                                                  validation_molecular_database=validation_molecular_database,
                                                  validation_loss_function=validation_loss_function,
                                                  validation_loss_function_kwargs=validation_loss_function_kwargs,
                                                  debug=debug)

        import tempfile
        with tempfile.TemporaryDirectory() as tmpdirname:
            # Train the trial models in a scratch directory so they never clobber the real model file.
            saved_name = self.model_file
            self.model_file = f'{tmpdirname}/{saved_name}'
            if optimization_algorithm.casefold() in [mm.casefold() for mm in
                                                     ['Nelder-Mead', 'BFGS', 'L-BFGS-B', 'Powell', 'CG', 'Newton-CG',
                                                      'TNC', 'COBYLA', 'SLSQP', 'trust-constr', 'dogleg',
                                                      'trust-krylov', 'trust-exact']]:
                import scipy.optimize
                import numpy as np
                initial_hyperparameters = np.array([self.hyperparameters[key].value for key in hyperparameters])
                bounds = np.array([[self.hyperparameters[key].minval, self.hyperparameters[key].maxval] for key in hyperparameters])
                res = scipy.optimize.minimize(validation_loss, initial_hyperparameters,
                                              method=optimization_algorithm, bounds=bounds,
                                              options={'xatol': 1e-8, 'disp': True, 'maxiter': maximum_evaluations})
                for ii in range(len(res.x)):
                    self.hyperparameters[hyperparameters[ii]].value = res.x[ii]
            elif optimization_algorithm.casefold() in [mm.casefold() for mm in ['grid', 'brute']]:
                import scipy.optimize
                import numpy as np
                grid_slices = []
                for key in hyperparameters:
                    if 'grid_size' in optimization_algorithm_kwargs.keys():
                        grid_size = optimization_algorithm_kwargs['grid_size']
                    else:
                        grid_size = 9
                    if self.hyperparameters[key].optimization_space == 'linear':
                        grid_slices.append(list(np.linspace(self.hyperparameters[key].minval,
                                                            self.hyperparameters[key].maxval, num=grid_size)))
                    if self.hyperparameters[key].optimization_space == 'log':
                        grid_slices.append(list(np.logspace(np.log(self.hyperparameters[key].minval),
                                                            np.log(self.hyperparameters[key].maxval),
                                                            num=grid_size, base=np.exp(1))))
                params, _ = optimize_grid(validation_loss, grid_slices)
                for ii in range(len(params)):
                    self.hyperparameters[hyperparameters[ii]].value = params[ii]
            elif optimization_algorithm.lower() == 'tpe':
                import hyperopt
                import numpy as np
                from hyperopt.std_out_err_redirect_tqdm import DummyTqdmFile

                def fileno(self):
                    # Work around DummyTqdmFile lacking fileno(), needed by hyperopt's progress bar.
                    if self.file.name == '<stdin>':
                        return 0
                    elif self.file.name == '<stdout>':
                        return 1
                    elif self.file.name == '<stderr>':
                        return 2
                    else:
                        return 3
                DummyTqdmFile.fileno = fileno

                validation_loss_wraper_for_hyperopt = lambda d: validation_loss([d[k] for k in hyperparameters])
                space_mapping = {'linear': hyperopt.hp.uniform,
                                 'log': hyperopt.hp.loguniform,
                                 'normal': hyperopt.hp.normal,
                                 'lognormal': hyperopt.hp.lognormal,
                                 'discrete': hyperopt.hp.quniform,
                                 'discretelog': hyperopt.hp.qloguniform,
                                 'discretelognormal': hyperopt.hp.qlognormal,
                                 'choices': hyperopt.hp.choice}

                def get_space(key):
                    # Builds the hyperopt search space for one hyperparameter.
                    space_type = self.hyperparameters[key].optimization_space
                    if space_type in ['log']:
                        args = [np.log(self.hyperparameters[key].minval), np.log(self.hyperparameters[key].maxval)]
                    elif space_type in ['linear']:
                        args = [self.hyperparameters[key].minval, self.hyperparameters[key].maxval]
                    else:
                        raise NotImplementedError
                    return space_mapping[space_type](key, *args)

                space = {key: get_space(key) for key in hyperparameters}
                res = hyperopt.fmin(fn=validation_loss_wraper_for_hyperopt, space=space,
                                    algo=hyperopt.tpe.suggest, max_evals=maximum_evaluations,
                                    show_progressbar=True)  # , points_to_evaluate=initial_hyperparameters
                for k, v in res.items():
                    self.hyperparameters[k].value = v
            # Restore the real model file location before the final training.
            self.model_file = saved_name
        # Use the final hyperparameters to train the model and get the validation errors.
        self.validation_loss = validation_loss(np.array([self.hyperparameters[key].value for key in hyperparameters]))

    def holdout_validation(self, subtraining_molecular_database=None, validation_molecular_database=None,
                           training_kwargs=None, prediction_kwargs=None):
        # Single train/predict pass on the sub-training/validation partition.
        if training_kwargs is None:
            training_kwargs = {}
        if prediction_kwargs is None:
            prediction_kwargs = {}
        self.train(molecular_database=subtraining_molecular_database, **training_kwargs)
        self.predict(molecular_database=validation_molecular_database, **prediction_kwargs)

    def cross_validation(self, cv_splits_molecular_databases=None, training_kwargs=None, prediction_kwargs=None):
        # For each split: train on the union of all other splits, predict on the held-out split.
        if training_kwargs is None:
            training_kwargs = {}
        if prediction_kwargs is None:
            prediction_kwargs = {}
        nsplits = len(cv_splits_molecular_databases)
        for ii in range(nsplits):
            subtraining_molecular_database = data.molecular_database()
            for jj in range(nsplits):
                if ii != jj:
                    subtraining_molecular_database.molecules += cv_splits_molecular_databases[jj].molecules
            validation_molecular_database = cv_splits_molecular_databases[ii]
            self.reset()
            self.train(molecular_database=subtraining_molecular_database, **training_kwargs)
            self.predict(molecular_database=validation_molecular_database, **prediction_kwargs)

    def get_property_to_learn(self, training_kwargs=None):
        # 'property_to_learn' from the training kwargs, or None when absent.
        if training_kwargs is None:
            return None
        if 'property_to_learn' in training_kwargs:
            return training_kwargs['property_to_learn']
        return None

    def get_xyz_derivative_property_to_learn(self, training_kwargs=None):
        # 'xyz_derivative_property_to_learn' from the training kwargs, or None when absent.
        if training_kwargs is None:
            return None
        if 'xyz_derivative_property_to_learn' in training_kwargs:
            return training_kwargs['xyz_derivative_property_to_learn']
        return None

    def get_property_to_predict(self, prediction_kwargs=None):
        # Label under which scalar predictions are stored; None when no kwargs given.
        if prediction_kwargs is None:
            return None
        if 'property_to_predict' in prediction_kwargs:
            return prediction_kwargs['property_to_predict']
        if 'calculate_energy' in prediction_kwargs:
            return 'estimated_energy'
        return 'estimated_y'

    def get_xyz_derivative_property_to_predict(self, prediction_kwargs=None):
        # Label under which vectorial predictions are stored; None when no kwargs given.
        if prediction_kwargs is None:
            return None
        if 'xyz_derivative_property_to_predict' in prediction_kwargs:
            return prediction_kwargs['xyz_derivative_property_to_predict']
        if 'calculate_energy_gradients' in prediction_kwargs:
            return 'estimated_energy_gradients'
        return 'estimated_xyz_derivatives_y'
def optimize_grid(func, grid):
    '''
    Optimizes on the given grid by finding parameters (provided by grid) leading to the minimum value of the given function.

    Arguments:
        func (callable): objective; called with one list of parameter values, one entry per grid dimension.
        grid (list of lists): candidate values per dimension; a dimension with a single entry is treated as fixed.

    Returns:
        tuple: (list of optimal parameter values, minimum objective value).
    '''
    # Base case: all dimensions except the last are fixed to a single candidate.
    last = True
    for ii in grid[:-1]:
        if len(ii) != 1:
            last = False
            break
    if last:
        other_params = [jj[0] for jj in grid[:-1]]
        opt_param = grid[-1][0]
        min_val = func(other_params + [opt_param])
        # 1D scan over the last dimension.
        for param in grid[-1][1:]:
            val = func(other_params + [param])
            if val < min_val:
                opt_param = param
                min_val = val
        return other_params + [opt_param], min_val
    else:
        # Recursive case: expand the first still-free dimension; recursion
        # handles the remaining free dimensions.
        min_val = None
        for kk in range(len(grid))[:-1]:
            if len(grid[kk]) != 1:
                if kk == 0:
                    other_params_left = []
                else:
                    other_params_left = [[grid[ii][0]] for ii in range(kk)]
                other_params_right = grid[kk+1:]
                for param in grid[kk]:
                    params, val = optimize_grid(func, other_params_left + [[param]] + other_params_right)
                    if min_val == None:
                        min_val = val
                        opt_params = params
                    elif val < min_val:
                        opt_params = params
                        min_val = val
                # Only the first free dimension is expanded at this level.
                break
        return opt_params, min_val

class krr(ml_model):
    '''
    Kernel ridge regression with a Gaussian kernel over the RE descriptor;
    kernel gradients/Hessians for derivative learning come from PyTorch autograd.
    '''
    def train(self, molecular_database=None, property_to_learn='y', xyz_derivative_property_to_learn = None,
              save_model=True, invert_matrix=False, matrix_decomposition=None, kernel_function_kwargs=None, prior=None):
        '''
        Build and solve the (regularized) kernel system for values and,
        optionally, XYZ derivatives; stores regression coefficients in self.alphas.
        '''
        xyz = np.array([mol.xyz_coordinates for mol in molecular_database.molecules]).astype(float)
        yy = molecular_database.get_properties(property_name=property_to_learn)
        # Prior shifted off the labels: None -> 0.0, a number, or 'mean' of labels.
        if prior == None:
            self.prior = 0.0
        elif type(prior) == float or type(prior) == int:
            self.prior = prior
        elif prior.casefold() == 'mean'.casefold():
            self.prior = np.mean(yy)
        else:
            stopper.stopMLatom(f'Unsupported prior: {prior}')
        yy = yy - self.prior
        #self.train_x = xx
        self.Ntrain = len(xyz)
        self.train_xyz = xyz
        self.kernel_function = self.gaussian_kernel_function
        self.kernel_function_kwargs = kernel_function_kwargs
        self.kernel_matrix_size = 0
        Natoms = len(xyz[0])
        self.Natoms = Natoms
        self.train_property = False
        self.train_xyz_derivative_property = False
        # Reference vector layout: [values block | per-molecule 3*Natoms gradient blocks].
        yyref = np.array([])
        if property_to_learn != None:
            self.train_property = True
            self.kernel_matrix_size += len(yy)
            yyref = np.concatenate((yyref,yy))
        if xyz_derivative_property_to_learn != None:
            self.train_xyz_derivative_property = True
            self.kernel_matrix_size += 3*Natoms*len(yy)
            yygrad = molecular_database.get_xyz_derivative_properties().reshape(3*Natoms*len(yy))
            yyref = np.concatenate((yyref,yygrad))
        # Regularized kernel matrix K + lambda*I.
        kernel_matrix = np.zeros((self.kernel_matrix_size,self.kernel_matrix_size))
        kernel_matrix = kernel_matrix + np.identity(self.kernel_matrix_size)*self.hyperparameters['lambda'].value
        for ii in range(len(yy)):
            # Diagonal (self-kernel) contributions.
            value_and_derivatives = self.kernel_function(xyz[ii],xyz[ii],calculate_value=self.train_property,calculate_gradients=self.train_xyz_derivative_property,calculate_Hessian=self.train_xyz_derivative_property,**self.kernel_function_kwargs)
            kernel_matrix[ii][ii] += value_and_derivatives['value']
            if self.train_xyz_derivative_property:
                kernel_matrix[ii,len(yy)+ii*3*Natoms:len(yy)+(ii+1)*3*Natoms] += value_and_derivatives['gradients'].reshape(3*Natoms)
                kernel_matrix[len(yy)+ii*3*Natoms:len(yy)+(ii+1)*3*Natoms,len(yy)+ii*3*Natoms:len(yy)+(ii+1)*3*Natoms] += value_and_derivatives['Hessian']
            # Off-diagonal pairs (symmetric fill).
            for jj in range(ii+1,len(yy)):
                value_and_derivatives = self.kernel_function(xyz[ii],xyz[jj],calculate_value=self.train_property,calculate_gradients=self.train_xyz_derivative_property,calculate_Hessian=self.train_xyz_derivative_property,calculate_gradients_j=self.train_xyz_derivative_property,**self.kernel_function_kwargs)
                kernel_matrix[ii][jj] += value_and_derivatives['value']
                kernel_matrix[jj][ii] += value_and_derivatives['value']
                if self.train_xyz_derivative_property:
                    kernel_matrix[jj,len(yy)+ii*3*Natoms:len(yy)+(ii+1)*3*Natoms] = value_and_derivatives['gradients'].reshape(3*Natoms)
                    kernel_matrix[ii,len(yy)+jj*3*Natoms:len(yy)+(jj+1)*3*Natoms] = value_and_derivatives['gradients_j'].reshape(3*Natoms)
                    kernel_matrix[len(yy)+ii*3*Natoms:len(yy)+(ii+1)*3*Natoms,len(yy)+jj*3*Natoms:len(yy)+(jj+1)*3*Natoms] += value_and_derivatives['Hessian']
                    kernel_matrix[len(yy)+jj*3*Natoms:len(yy)+(jj+1)*3*Natoms,len(yy)+ii*3*Natoms:len(yy)+(ii+1)*3*Natoms] += value_and_derivatives['Hessian']
        if self.train_xyz_derivative_property:
            # Mirror the value-gradient cross block to keep K symmetric.
            kernel_matrix[len(yy):,:len(yy)] = kernel_matrix[:len(yy),len(yy):].T
        if invert_matrix:
            self.alphas = np.dot(np.linalg.inv(kernel_matrix), yyref)
        else:
            from scipy.linalg import cho_factor, cho_solve, lu_factor, lu_solve
            if matrix_decomposition==None:
                # Try Cholesky first (K should be SPD), fall back to LU.
                # NOTE(review): cho_factor runs with overwrite_a=True, so if it
                # raises, kernel_matrix handed to lu_factor may already be
                # partially overwritten — confirm the fallback is safe.
                try:
                    c, low = cho_factor(kernel_matrix, overwrite_a=True, check_finite=False)
                    self.alphas = cho_solve((c, low), yyref, check_finite=False)
                except:
                    c, low = lu_factor(kernel_matrix, overwrite_a=True, check_finite=False)
                    self.alphas = lu_solve((c, low), yyref, check_finite=False)
            elif matrix_decomposition.casefold()=='Cholesky'.casefold():
                c, low = cho_factor(kernel_matrix, overwrite_a=True, check_finite=False)
                self.alphas = cho_solve((c, low), yyref, check_finite=False)
            elif matrix_decomposition.casefold()=='LU'.casefold():
                c, low = lu_factor(kernel_matrix, overwrite_a=True, check_finite=False)
                self.alphas = lu_solve((c, low), yyref, check_finite=False)

    def predict(self, molecular_database=None, molecule=None, calculate_energy=False, calculate_energy_gradients=False, calculate_hessian=False, # arguments if KREG is used as MLP ; hessian not implemented (possible with numerical differentiation)
                property_to_predict = None, xyz_derivative_property_to_predict = None, hessian_to_predict=None,):
        '''
        Predict values (and optionally XYZ derivatives) for each molecule from
        the stored alphas: y = sum_i alpha_i k(x_i, x) + prior.
        '''
        molDB, property_to_predict, xyz_derivative_property_to_predict, hessian_to_predict = \
            super().predict(molecular_database=molecular_database, molecule=molecule, calculate_energy=calculate_energy, calculate_energy_gradients=calculate_energy_gradients, calculate_hessian=calculate_hessian, property_to_predict = property_to_predict, xyz_derivative_property_to_predict = xyz_derivative_property_to_predict, hessian_to_predict = hessian_to_predict)
        Natoms = len(molDB.molecules[0].atoms)
        # Kernel-vector layout mirrors the training system layout.
        kk_size = 0
        if self.train_property: kk_size += self.Ntrain
        if self.train_xyz_derivative_property: kk_size += self.Ntrain * Natoms*3
        for mol in molDB.molecules:
            kk = np.zeros(kk_size)
            kk_der = np.zeros((kk_size,3*Natoms))
            for ii in range(self.Ntrain):
                value_and_derivatives = self.kernel_function(self.train_xyz[ii],mol.xyz_coordinates,calculate_gradients=self.train_xyz_derivative_property,**self.kernel_function_kwargs)
                kk[ii] = value_and_derivatives['value']
                if self.train_xyz_derivative_property:
                    kk[self.Ntrain+ii*3*Natoms:self.Ntrain+(ii+1)*3*Natoms] = value_and_derivatives['gradients'].reshape(3*Natoms)
                if xyz_derivative_property_to_predict:
                    # Kernel derivatives w.r.t. the query geometry for gradient prediction.
                    value_and_derivatives = self.kernel_function(mol.xyz_coordinates,self.train_xyz[ii],calculate_gradients=bool(xyz_derivative_property_to_predict),calculate_Hessian=self.train_xyz_derivative_property,**self.kernel_function_kwargs)
                    kk_der[ii] = value_and_derivatives['gradients'].reshape(3*Natoms)
                    if self.train_xyz_derivative_property:
                        kk_der[self.Ntrain+ii*3*Natoms:self.Ntrain+(ii+1)*3*Natoms,:] = value_and_derivatives['Hessian'].T
            # NOTE(review): control flow reconstructed from collapsed source —
            # the three statements below appear unconditional; when
            # xyz_derivative_property_to_predict is None, gradients are all-zero
            # and atoms receive a None-keyed __dict__ entry. Confirm upstream.
            gradients = np.matmul(self.alphas.reshape(1,len(self.alphas)),kk_der)[0]
            mol.__dict__[property_to_predict] = np.sum(np.multiply(self.alphas, kk))+self.prior
            for iatom in range(len(mol.atoms)):
                mol.atoms[iatom].__dict__[xyz_derivative_property_to_predict] = gradients[3*iatom:3*iatom+3]

    def gaussian_kernel_function(self,coordi,coordj,calculate_value=True,calculate_gradients=False,calculate_gradients_j=False,calculate_Hessian=False,**kwargs):
        '''
        Gaussian kernel k = exp(-|x_i - x_j|^2 / (2 sigma^2)) evaluated on RE
        descriptors; requested derivatives are computed via torch.autograd.
        Returns a dict with keys 'value', 'gradients', 'gradients_j', 'Hessian'
        (None for anything not requested).
        '''
        import torch
        if 'Req' in kwargs: Req = kwargs['Req']
        # Higher derivatives imply the lower ones (needed for the autograd chain).
        if calculate_Hessian:
            calculate_gradients = True
            calculate_value = True
        elif calculate_gradients:
            calculate_value = True
        coordi_tensor = torch.tensor(coordi,requires_grad=True)
        coordj_tensor = torch.tensor(coordj,requires_grad=True)
        value = None
        gradients = None
        gradients_j = None
        Hessian = None
        xi_tensor = self.RE_descriptor_tensor(coordi_tensor,Req)
        xj_tensor = self.RE_descriptor_tensor(coordj_tensor,Req)
        if calculate_value:
            value_tensor = torch.exp(torch.sum(torch.square(xi_tensor - xj_tensor))/(-2*self.hyperparameters['sigma'].value**2))
            value = value_tensor.detach().numpy()
        if calculate_gradients:
            # d k / d coordi
            gradients_tensor = torch.autograd.grad(value_tensor,coordi_tensor,create_graph=True,retain_graph=True)[0]
            gradients = gradients_tensor.detach().numpy()
        if calculate_gradients_j:
            # d k / d coordj
            gradients_j_tensor = torch.autograd.grad(value_tensor,coordj_tensor,create_graph=True,retain_graph=True)[0]
            gradients_j = gradients_j_tensor.detach().numpy()
        if calculate_Hessian:
            # Mixed second derivatives d^2 k / (d coordi d coordj), row by row.
            Hessian = []
            gradients_tensor = gradients_tensor.reshape(len(gradients_tensor)*3)
            for ii in range(len(gradients_tensor)):
                Hessian.append(torch.autograd.grad(gradients_tensor[ii],coordj_tensor,create_graph=True,retain_graph=True)[0].detach().numpy().reshape((len(gradients_tensor))))
            Hessian = np.array(Hessian)
        output = {'value':value,'gradients':gradients,'gradients_j':gradients_j,'Hessian':Hessian}
        return output

    def RE_descriptor_tensor(self,coord_tensor,Req):
        '''
        Relative-to-equilibrium (RE) descriptor: Req_ij / r_ij for all atom
        pairs i<j, as a differentiable torch tensor.
        '''
        import torch
        Natoms = len(coord_tensor)
        icount = 0
        for iatom in range(Natoms):
            for jatom in range(iatom+1,Natoms):
                output = Req[icount] / self.distance_tensor(coord_tensor[iatom],coord_tensor[jatom])
                if icount == 0:
                    descriptor = output.reshape(1)
                else:
                    descriptor = torch.cat((descriptor,output.reshape(1)))
                icount += 1
        return descriptor

    def distance_tensor(self, atomi,atomj):
        # Euclidean distance between two atom coordinate tensors.
        import torch
        return torch.sqrt(self.distance_squared_tensor(atomi,atomj))

    def distance_squared_tensor(self, atomi,atomj):
        # Squared Euclidean distance (kept separate to stay differentiable).
        import torch
        return torch.sum(torch.square(atomi-atomj))
class hyperparameter():
    '''
    Class of hyperparameter object, containing data that could be used in hyperparameter optimizations.

    Arguments:
        value (Any, optional): The value of the hyperparameter.
        optimization_space (str, optional): Defines the space for hyperparameter. Currently supports ``'linear'``, and ``'log'``.
        dtype (Callable, optional): A callable object that forces the data type of value. Automatically chosen from ``value`` if set to ``None``.
        name (str, optional): Label of the hyperparameter.
        minval (Any, optional): Lower bound used in optimization.
        maxval (Any, optional): Upper bound used in optimization.
        step (Any, optional): Step size used in grid-based optimization.
        choices (Iterable[Any], optional): Explicit set of allowed values; defaults to an empty list (a fresh list per instance, so instances never share one).
    '''
    def __init__(self, value: Any = None, optimization_space: str = 'linear', dtype: Union[Callable, None] = None, name: str = "", minval: Any = None, maxval: Any = None, step: Any = None, choices: Union[Iterable[Any], None] = None, **kwargs):
        self.name = name
        # dtype must be assigned before value: __setattr__ casts value through it.
        if dtype:
            self.dtype = dtype
        elif value is None:
            self.dtype = None
        else:
            self.dtype = type(value)
        self.value = value
        self.optimization_space = optimization_space  # 'linear' or 'log'
        self.minval = minval
        self.maxval = maxval
        self.step = step
        # None means "not given": build a fresh list to avoid the shared
        # mutable-default pitfall of the previous `choices=[]` signature.
        self.choices = [] if choices is None else choices

    def __setattr__(self, key, value):
        # Coerce any assigned value to the declared dtype, and refresh the
        # casting helper whenever dtype itself changes.
        if key == 'value' and getattr(self, 'dtype', None) and not isinstance(value, self.dtype):
            value = self._cast_dtype(value)
        if key == 'dtype':
            self._set_dtype_cast_method(value)
        super().__setattr__(key, value)

    def __repr__(self):
        return f'hyperparameter {str(self.__dict__)}'

    def _set_dtype_cast_method(self, dtype):
        # A tuple dtype (isinstance-style) casts with its first entry;
        # np.ndarray needs np.array since ndarray itself cannot build from lists.
        if isinstance(dtype, tuple):
            dtype = dtype[0]
        if dtype is np.ndarray:
            self._cast_dtype = np.array
        else:
            self._cast_dtype = dtype

    def update(self, new_hyperparameter: hyperparameter) -> None:
        '''
        Update hyperparameter with data in another instance.

        Arguments:
            new_hyperparameter (:class:`mlatom.models.hyperparameter`): Whose data are to be applied to the current instance.
        '''
        # Direct __dict__ update deliberately bypasses dtype casting.
        self.__dict__.update(new_hyperparameter.__dict__)

    def copy(self) -> hyperparameter:
        '''
        Returns a copy of current instance.

        Returns:
            :class:`mlatom.models.hyperparameter`: a new instance copied from current one.
        '''
        # **kwargs in __init__ absorbs internal attributes such as _cast_dtype.
        return hyperparameter(**self.__dict__)
class hyperparameters(UserDict):
    '''
    Class for storing hyperparameters, values are auto-converted to :class:`mlatom.models.hyperparameter` objects.

    Inherits from collections.UserDict.

    Initiation:
        Initiate with a dictionary or kwargs or both.

        e.g.:

        .. code-block::

           hyperparameters({'a': 1.0}, b=hyperparameter(value=2, minval=0, maxval=4))
    '''
    def __setitem__(self, key, value):
        # Keep existing hyperparameter objects alive across assignments so that
        # their metadata (bounds, optimization space, ...) survives:
        # - a hyperparameter assigned to an existing key is merged in place;
        # - a plain value assigned to an existing key only changes .value;
        # - new keys wrap plain values into fresh hyperparameter objects.
        if isinstance(value, hyperparameter):
            if key in self:
                super().__getitem__(key).update(value)
            else:
                super().__setitem__(key, value)
        elif key in self:
            super().__getitem__(key).value = value
        else:
            super().__setitem__(key, hyperparameter(value=value, name=key))

    def __getattr__(self, key):
        # Attribute access yields the stored value: hp.sigma == hp['sigma'].value.
        # Anything not stored as an item falls back to the instance __dict__.
        if key in self:
            return self[key].value
        else:
            return self.__dict__[key]

    def __setattr__(self, key, value):
        # Dunder names, existing instance attributes, and UserDict's backing
        # 'data' dict are set normally; everything else becomes an item.
        if key.startswith('__') or (key in self.__dict__) or key == 'data':
            super().__setattr__(key, value)
        else:
            self.__setitem__(key, value)

    def __getstate__(self):
        # Explicit pickling support keeps the custom attribute protocol from
        # interfering with copy/pickle machinery.
        return vars(self)

    def __setstate__(self, state):
        vars(self).update(state)

    def copy(self, keys: Union[Iterable[str], None] = None) -> hyperparameters:
        '''
        Returns a copy of current instance.

        Arguments:
            keys (Iterable[str], optional): If keys provided, only the hyperparameters selected by keys will be copied, instead of all hyperparameters.

        Returns:
            :class:`mlatom.models.hyperparameters`: a new instance copied from current one.
        '''
        if keys is None:
            keys = self.keys()
        # Deep-copies each hyperparameter so the copy is independent.
        return hyperparameters({key: self[key].copy() for key in keys})
class kreg(krr, OMP_model, MKL_model):
    '''
    Create a KREG model object.

    Arguments:
        model_file (str, optional): The name of the file where the model should be dumped to or loaded from.
        ml_program (str, optional): Specify which ML program to use. Available options: ``'KREG_API'``, ``'MLatomF'``.
        equilibrium_molecule (:class:`mlatom.data.molecule` | None): Specify the equilibrium geometry to be used to generate RE descriptor. The geometry with lowest energy/value will be selected if set to ``None``.
        prior (default - None): the prior can be 'mean', None (0.0), or any floating point number.
        hyperparameters (Dict[str, Any] | :class:`mlatom.models.hyperparameters`, optional): Updates the hyperparameters of the model with provided.
    '''
    # Class-level defaults; copied per instance in __init__.
    hyperparameters = hyperparameters({'lambda': hyperparameter(value=2**-35,
                                                                minval=2**-35,
                                                                maxval=1.0,
                                                                optimization_space='log',
                                                                name='lambda'),
                                       'sigma':  hyperparameter(value=1.0,
                                                                minval=2**-5,
                                                                maxval=2**9,
                                                                optimization_space='log',
                                                                name='sigma')})

    # NOTE(review): mutable default `hyperparameters={}` in the signature below
    # is only read, never mutated here, but is a known pitfall — confirm.
    def __init__(self, model_file: Union[str, None] = None, ml_program: str = 'KREG_API', equilibrium_molecule: Union[data.molecule, None] = None, prior: float = 0, nthreads: Union[int, None] = None, hyperparameters: Union[Dict[str,Any], hyperparameters]={}):
        # NOTE(review): `prior` is accepted but not stored on self; only train()
        # takes a prior — confirm whether this argument should have an effect.
        self.model_file = model_file
        self.equilibrium_molecule = equilibrium_molecule
        self.ml_program = ml_program
        if self.ml_program.casefold() == 'KREG_API'.casefold():
            from .kreg_api import KREG_API
            if self.model_file != None:
                # KREG_API models are stored as .npz archives; load eagerly if present.
                if self.model_file[-4:] != '.npz':
                    self.model_file += '.npz'
                if os.path.exists(self.model_file):
                    self.kreg_api = KREG_API()
                    self.kreg_api.load_model(self.model_file)
        if self.ml_program.casefold() == 'MLatomF'.casefold():
            from . import interface_MLatomF
            self.interface_mlatomf = interface_MLatomF
        # Per-instance copy so user updates do not mutate the class defaults.
        self.hyperparameters = self.hyperparameters.copy()
        self.hyperparameters.update(hyperparameters)
        self.nthreads = nthreads

    def parse_args(self, args):
        # Pull hyperparameter values / optimization requests from command-line args.
        super().parse_args(args)
        for hyperparam in self.hyperparameters:
            if hyperparam in args.hyperparameter_optimization['hyperparameters']:
                self.parse_hyperparameter_optimization(args, hyperparam)
            elif hyperparam in args.data:
                # 'opt' is a literal marker, so the stored value must stay a string.
                if args.data[hyperparam].lower() == 'opt':
                    self.hyperparameters[hyperparam].dtype = str
                self.hyperparameters[hyperparam].value = args.data[hyperparam]
        if args.eqXYZfileIn:
            self.equilibrium_molecule = data.molecule.from_xyz_file(args.eqXYZfileIn)

    def generate_model_dict(self):
        # Extend the generic model dict with the backend used.
        model_dict = super().generate_model_dict()
        model_dict['kwargs']['ml_program'] = self.ml_program
        return model_dict

    def get_descriptor(self, molecule=None, molecular_database=None, descriptor_name='re', equilibrium_molecule=None):
        '''
        Attach the RE descriptor (Req_ij / r_ij over all atom pairs) to each
        molecule under ``descriptor_name`` (and ``descriptor``).
        '''
        if molecular_database != None:
            molDB = molecular_database
        elif molecule != None:
            molDB = data.molecular_database()
            molDB.molecules.append(molecule)
        else:
            errmsg = 'Either molecule or molecular_database should be provided in input'
            raise ValueError(errmsg)
        # Reference distances are cached on the instance after the first call.
        if 'reference_distance_matrix' in self.__dict__:
            eq_distmat = self.reference_distance_matrix
        else:
            if equilibrium_molecule == None:
                # Pick the lowest-energy (or lowest-y) geometry as equilibrium.
                if 'energy' in molDB.molecules[0].__dict__:
                    energies = molDB.get_properties(property_name='energy')
                    equilibrium_molecule = molDB.molecules[np.argmin(energies)]
                elif 'y' in molecular_database.molecules[0].__dict__:
                    y = molecular_database.get_properties(property_name='y')
                    equilibrium_molecule = molecular_database.molecules[np.argmin(y)]
                else:
                    errmsg = 'equilibrium molecule is not provided and no energies are found in molecular database'
                    raise ValueError(errmsg)
            eq_distmat = equilibrium_molecule.get_internuclear_distance_matrix()
            self.reference_distance_matrix = eq_distmat
        natoms = len(molDB.molecules[0].atoms)
        for mol in molDB.molecules:
            descriptor = np.zeros(int(natoms*(natoms-1)/2))
            distmat = mol.get_internuclear_distance_matrix()
            ii = -1
            for iatomind in range(natoms):
                for jatomind in range(iatomind+1,natoms):
                    ii += 1
                    descriptor[ii] = eq_distmat[iatomind][jatomind]/distmat[iatomind][jatomind]
            mol.__dict__[descriptor_name] = descriptor
            mol.descriptor = descriptor

    def get_equilibrium_distances(self,molecular_database=None):
        # Flatten the upper triangle of the equilibrium distance matrix into Req.
        if self.equilibrium_molecule == None:
            self.equilibrium_molecule = self.get_equilibrium_molecule(molecular_database=molecular_database)
        eq_distmat = self.equilibrium_molecule.get_internuclear_distance_matrix()
        Req = np.array([])
        for ii in range(len(eq_distmat)):
            Req = np.concatenate((Req,eq_distmat[ii][ii+1:]))
        return Req

    def get_equilibrium_molecule(self,molecular_database=None):
        # Lowest-energy (or lowest-y) molecule in the database is the equilibrium.
        if 'energy' in molecular_database.molecules[0].__dict__:
            energies = molecular_database.get_properties(property_name='energy')
            equilibrium_molecule = molecular_database.molecules[np.argmin(energies)]
        elif 'y' in molecular_database.molecules[0].__dict__:
            y = molecular_database.get_properties(property_name='y')
            equilibrium_molecule = molecular_database.molecules[np.argmin(y)]
        else:
            errmsg = 'equilibrium molecule is not provided and no energies are found in molecular database'
            raise ValueError(errmsg)
        return equilibrium_molecule

    def train(self, molecular_database=None, property_to_learn=None, xyz_derivative_property_to_learn = None,
              save_model=True, invert_matrix=False, matrix_decomposition=None, prior=None,
              hyperparameters: Union[Dict[str,Any], hyperparameters] = {},):
        '''
        Train the KREG model with molecular database provided.

        Arguments:
            molecular_database (:class:`mlatom.data.molecular_database`): The database of molecules for training.
            property_to_learn (str, optional): The label of property to be learned in model training.
            xyz_derivative_property_to_learn (str, optional): The label of XYZ derivative property to be learned.
            prior (str or float or int, optional): default zero prior. It can also be 'mean' and any user-defined number.
        '''
        self.hyperparameters.update(hyperparameters)
        if self.ml_program.casefold() == 'MLatomF'.casefold():
            # Dispatch to the MLatomF Fortran binary via temporary input files.
            mlatomfargs = ['createMLmodel'] + ['%s=%s' % (param, self.hyperparameters[param].value) for param in self.hyperparameters.keys()]
            if save_model:
                if self.model_file == None:
                    self.model_file = f'kreg_{str(uuid.uuid4())}.unf'
            with tempfile.TemporaryDirectory() as tmpdirname:
                molecular_database.write_file_with_xyz_coordinates(filename = f'{tmpdirname}/train.xyz')
                mlatomfargs.append(f'XYZfile={tmpdirname}/train.xyz')
                if property_to_learn != None:
                    molecular_database.write_file_with_properties(filename = f'{tmpdirname}/y.dat', property_to_write = property_to_learn)
                    mlatomfargs.append(f'Yfile={tmpdirname}/y.dat')
                if xyz_derivative_property_to_learn != None:
                    molecular_database.write_file_with_xyz_derivative_properties(filename = f'{tmpdirname}/ygrad.xyz', xyz_derivative_property_to_write = xyz_derivative_property_to_learn)
                    mlatomfargs.append(f'YgradXYZfile={tmpdirname}/ygrad.xyz')
                if prior != None:
                    mlatomfargs.append(f'prior={prior}')
                mlatomfargs.append(f'MLmodelOut={self.model_file}')
                if 'additional_mlatomf_args' in self.__dict__:
                    mlatomfargs += self.additional_mlatomf_args
                self.interface_mlatomf.ifMLatomCls.run(mlatomfargs, shutup=True)
        elif self.ml_program.casefold() == 'KREG_API'.casefold():
            from .kreg_api import KREG_API
            if save_model:
                if self.model_file == None:
                    self.model_file = f'kreg_{str(uuid.uuid4())}.npz'
                else:
                    if self.model_file[-4:] != '.npz':
                        self.model_file += '.npz'
            self.kreg_api = KREG_API()
            self.kreg_api.nthreads = self.nthreads
            if self.equilibrium_molecule == None:
                self.equilibrium_molecule = self.get_equilibrium_molecule(molecular_database=molecular_database)
            # Separate regularization for gradients falls back to 'lambda'.
            if not 'lambdaGradXYZ' in self.hyperparameters.keys():
                lambdaGradXYZ = self.hyperparameters['lambda'].value
            else:
                lambdaGradXYZ = self.hyperparameters['lambdaGradXYZ'].value
            # A 'prior' hyperparameter overrides the prior argument.
            if 'prior' in self.hyperparameters.keys():
                prior = self.hyperparameters['prior'].value
            self.kreg_api.train(self.hyperparameters['sigma'].value,self.hyperparameters['lambda'].value,lambdaGradXYZ,molecular_database,self.equilibrium_molecule,property_to_learn=property_to_learn,xyz_derivative_property_to_learn=xyz_derivative_property_to_learn,prior=prior)
            if save_model:
                self.kreg_api.save_model(self.model_file)
        else:
            # Pure-Python krr backend; reset any cached reference distances first.
            if 'reference_distance_matrix' in self.__dict__:
                del self.__dict__['reference_distance_matrix']
            Req = self.get_equilibrium_distances(molecular_database=molecular_database)
            # NOTE(review): xyz_derivative_property_to_learn is not forwarded to
            # krr.train here — derivative learning seems unused on this path; confirm.
            super().train(molecular_database=molecular_database, property_to_learn=property_to_learn, invert_matrix=invert_matrix, matrix_decomposition=matrix_decomposition, kernel_function_kwargs={'Req':Req},prior=prior)

    def predict(self, molecular_database=None, molecule=None, calculate_energy=False, calculate_energy_gradients=False, calculate_hessian=False, # arguments if KREG is used as MLP ; hessian not implemented (possible with numerical differentiation)
                property_to_predict = None, xyz_derivative_property_to_predict = None, hessian_to_predict = None,):
        '''
        Predict with whichever backend the model was trained with
        (KREG_API, MLatomF, or the pure-Python krr implementation).
        '''
        if self.ml_program.casefold() != 'MLatomF'.casefold() and self.ml_program.casefold() != 'KREG_API'.casefold():
            # Pure-Python backend: krr.predict does all the work.
            super().predict(molecular_database=molecular_database, molecule=molecule, calculate_energy=calculate_energy, calculate_energy_gradients=calculate_energy_gradients, property_to_predict=property_to_predict, xyz_derivative_property_to_predict = xyz_derivative_property_to_predict)
        else:
            # Skip krr.predict and use ml_model.predict only for argument
            # normalization/bookkeeping.
            molDB, property_to_predict, xyz_derivative_property_to_predict, hessian_to_predict = \
                super(krr, self).predict(molecular_database=molecular_database, molecule=molecule, calculate_energy=calculate_energy, calculate_energy_gradients=calculate_energy_gradients, calculate_hessian=calculate_hessian, property_to_predict = property_to_predict, xyz_derivative_property_to_predict = xyz_derivative_property_to_predict, hessian_to_predict = hessian_to_predict)
            if self.ml_program.casefold() == 'KREG_API'.casefold():
                if 'kreg_api' in dir(self):
                    self.kreg_api.nthreads = self.nthreads
                    self.kreg_api.predict(molecular_database=molecular_database,molecule=molecule,property_to_predict=property_to_predict,xyz_derivative_property_to_predict=xyz_derivative_property_to_predict)
                else:
                    stopper.stopMLatom('KREG_API model not found')
            elif self.ml_program.casefold() == 'MLatomF'.casefold():
                # Round-trip through temporary files for the Fortran binary.
                with tempfile.TemporaryDirectory() as tmpdirname:
                    molDB.write_file_with_xyz_coordinates(filename = f'{tmpdirname}/predict.xyz')
                    mlatomfargs = ['useMLmodel', 'MLmodelIn=%s' % self.model_file]
                    mlatomfargs.append(f'XYZfile={tmpdirname}/predict.xyz')
                    if property_to_predict: mlatomfargs.append(f'YestFile={tmpdirname}/yest.dat')
                    if xyz_derivative_property_to_predict: mlatomfargs.append(f'YgradXYZestFile={tmpdirname}/ygradest.xyz')
                    self.interface_mlatomf.ifMLatomCls.run(mlatomfargs, shutup=True)
                    if not os.path.exists(f'{tmpdirname}/yest.dat'):
                        import time
                        time.sleep(0.0000001) # Sometimes program needs more time to write file yest.dat / P.O.D., 2023-04-18
                        # Keep a copy of the scratch dir for post-mortem debugging.
                        os.system(f'cp -r {tmpdirname} .')
                    if property_to_predict != None: molDB.add_scalar_properties_from_file(filename = f'{tmpdirname}/yest.dat', property_name = property_to_predict)
                    if xyz_derivative_property_to_predict != None: molDB.add_xyz_derivative_properties_from_file(filename = f'{tmpdirname}/ygradest.xyz', xyz_derivative_property = xyz_derivative_property_to_predict)
from .interfaces.torchani_interface import ani, msani
def dpmd(**kwargs):
    '''
    Factory returning a DPMD model object (see :class:`mlatom.interfaces.dpmd_interface.dpmd`).
    '''
    # Import lazily and under an alias so the factory name is not shadowed.
    from .interfaces.dpmd_interface import dpmd as dpmd_model
    return dpmd_model(**kwargs)
def gap(**kwargs):
    '''
    Factory returning a GAP model object (see :class:`mlatom.interfaces.gap_interface.gap`).
    '''
    # Import lazily and under an alias so the factory name is not shadowed.
    from .interfaces.gap_interface import gap as gap_model
    return gap_model(**kwargs)
def physnet(**kwargs):
    '''
    Factory returning a PhysNet model object (see :class:`mlatom.interfaces.physnet_interface.physnet`).
    '''
    # Import lazily and under an alias so the factory name is not shadowed.
    from .interfaces.physnet_interface import physnet as physnet_model
    return physnet_model(**kwargs)
def sgdml(**kwargs):
    '''
    Factory returning an sGDML model object (see :class:`mlatom.interfaces.sgdml_interface.sgdml`).
    '''
    # Import lazily and under an alias so the factory name is not shadowed.
    from .interfaces.sgdml_interface import sgdml as sgdml_model
    return sgdml_model(**kwargs)
def mace(**kwargs):
    '''
    Factory returning an MACE model object (see :class:`mlatom.interfaces.mace_interface.mace`).
    '''
    # Import lazily and under an alias so the factory name is not shadowed.
    from .interfaces.mace_interface import mace as mace_model
    return mace_model(**kwargs)
class model_tree_node(model):
    '''
    Create a model tree node.

    Arguments:
        name (str): The name assign to the object.
        parent: The parent of the model node.
        children: The children of this model tree node.
        operator: Specify the operation to be made when making predictions.
    '''
    def __init__(self, name=None, parent=None, children=None, operator=None, model=None):
        self.name = name
        self.parent = parent
        self.children = children
        # Keep parent/child links bidirectional.
        if self.parent != None:
            if self.parent.children == None:
                self.parent.children = []
            if not self in self.parent.children:
                self.parent.children.append(self)
        if self.children != None:
            for child in self.children:
                child.parent=self
        self.operator = operator
        self.model = model

    def set_num_threads(self, nthreads=0):
        # Propagate the thread count down the tree (or to the leaf model).
        super().set_num_threads(nthreads)
        if self.nthreads:
            if self.children != None:
                for child in self.children:
                    child.set_num_threads(self.nthreads)
            else:
                self.model.set_num_threads(self.nthreads)

    def predict(self, **kwargs):
        '''
        Predict recursively through the model tree and combine child results
        according to ``self.operator`` ('predict', 'sum', 'weighted_sum', 'average').
        '''
        molDB = super().predict(**kwargs)
        if len(molDB) == 0:
            return
        # Extract options with the same defaults the rest of the module uses.
        if 'calculate_energy' in kwargs:
            calculate_energy = kwargs['calculate_energy']
        else:
            calculate_energy = True
        if 'calculate_energy_gradients' in kwargs:
            calculate_energy_gradients = kwargs['calculate_energy_gradients']
        else:
            calculate_energy_gradients = False
        if 'calculate_hessian' in kwargs:
            calculate_hessian = kwargs['calculate_hessian']
        else:
            calculate_hessian = False
        if 'nstates' in kwargs:
            nstates = kwargs['nstates']
        else:
            nstates = 1
        if 'current_state' in kwargs:
            current_state = kwargs['current_state']
        else:
            current_state = 0
        properties = [] ; atomic_properties = []
        if calculate_energy:
            properties.append('energy')
        if calculate_energy_gradients:
            atomic_properties.append('energy_gradients')
        if calculate_hessian:
            properties.append('hessian')
        for mol in molDB.molecules:
            # Make sure each molecule carries nstates electronic-state copies.
            if nstates:
                mol_copy = mol.copy()
                mol_copy.electronic_states = []
                if nstates >1:
                    for _ in range(nstates - len(mol.electronic_states)):
                        mol.electronic_states.append(mol_copy.copy())
            # Attach a properties_tree_node named after this model node to every
            # electronic state (linking to already-attached parent/child nodes).
            for mol_el_st in mol.electronic_states:
                if not self.name in mol_el_st.__dict__:
                    parent = None
                    if self.parent != None:
                        if self.parent.name in mol_el_st.__dict__:
                            parent = mol_el_st.__dict__[self.parent.name]
                    children = None
                    if self.children != None:
                        for child in self.children:
                            if child.name in mol_el_st.__dict__:
                                if children == None: children = []
                                children.append(mol_el_st.__dict__[child.name])
                    mol_el_st.__dict__[self.name] = data.properties_tree_node(name=self.name, parent=parent, children=children)
            # Same bookkeeping on the molecule itself.
            if not self.name in mol.__dict__:
                parent = None
                if self.parent != None:
                    if self.parent.name in mol.__dict__:
                        parent = mol.__dict__[self.parent.name]
                children = None
                if self.children != None:
                    for child in self.children:
                        if child.name in mol.__dict__:
                            if children == None: children = []
                            children.append(mol.__dict__[child.name])
                mol.__dict__[self.name] = data.properties_tree_node(name=self.name, parent=parent, children=children)
        if self.children == None and self.operator == 'predict':
            # Leaf node: run the wrapped model, then move its outputs into the tree.
            self.model.predict(**kwargs)
            for mol in molDB.molecules:
                if not mol.electronic_states:
                    self.get_properties_from_molecule(mol, properties, atomic_properties)
                for mol_el_st in mol.electronic_states:
                    # mol_el_st.__dict__[self.name] = data.properties_tree_node(name=self.name, parent=parent, children=children)
                    self.get_properties_from_molecule(mol_el_st, properties, atomic_properties)
        else:
            for child in self.children:
                child.predict(**kwargs)
                # NOTE(review): `mol` here is the leftover loop variable from the
                # loop above, i.e. the weight appears to be attached only to the
                # last molecule — confirm against upstream.
                if 'weight' in child.__dict__.keys():
                    mol.__dict__[child.name].__dict__['weight'] = child.weight
        # Combine child contributions according to the configured operator.
        if self.operator == 'sum':
            for mol in molDB.molecules:
                if not mol.electronic_states:
                    mol.__dict__[self.name].sum(properties+atomic_properties)
                for mol_el_st in mol.electronic_states:
                    mol_el_st.__dict__[self.name].sum(properties+atomic_properties)
        if self.operator == 'weighted_sum':
            for mol in molDB.molecules:
                if not mol.electronic_states:
                    mol.__dict__[self.name].weighted_sum(properties+atomic_properties)
                for mol_el_st in mol.electronic_states:
                    mol_el_st.__dict__[self.name].weighted_sum(properties+atomic_properties)
        if self.operator == 'average':
            for mol in molDB.molecules:
                if not mol.electronic_states:
                    mol.__dict__[self.name].average(properties+atomic_properties)
                for mol_el_st in mol.electronic_states:
                    mol_el_st.__dict__[self.name].average(properties+atomic_properties)
        # Root node: copy the combined values back onto the molecules.
        if self.parent == None:
            self.update_molecular_properties(molecular_database=molDB, properties=properties, atomic_properties=atomic_properties, current_state=current_state)

    def get_properties_from_molecule(self, molecule, properties=[], atomic_properties=[]):
        # Move freshly predicted properties off the molecule (and its atoms)
        # into this node's properties_tree_node.
        property_values = molecule.__dict__[self.name].__dict__
        for property_name in properties:
            if property_name in molecule.__dict__:
                property_values[property_name] = molecule.__dict__.pop(property_name)
        for property_name in atomic_properties:
            property_values[property_name] = []
            for atom in molecule.atoms:
                property_values[property_name].append(atom.__dict__.pop(property_name))
            property_values[property_name] = np.array(property_values[property_name]).astype(float)

    def update_molecular_properties(self, molecular_database=None, molecule=None, properties=[], atomic_properties=[], current_state=0):
        # Copy the tree-node results back to molecule/atom attributes; for
        # multi-state molecules, top-level values come from current_state.
        molDB = molecular_database
        if molecule != None:
            molDB = data.molecular_database()
            molDB.molecules.append(molecule)
        for mol in molDB.molecules:
            for property_name in properties:
                for mol_el_st in mol.electronic_states:
                    mol_el_st.__dict__[property_name] = mol_el_st.__dict__[self.name].__dict__[property_name]
                if not mol.electronic_states:
                    mol.__dict__[property_name] = mol.__dict__[self.name].__dict__[property_name]
                else:
                    mol.__dict__[property_name] = mol.electronic_states[current_state].__dict__[property_name]
            for property_name in atomic_properties:
                for mol_el_st in mol.electronic_states:
                    for iatom in range(len(mol_el_st.atoms)):
                        mol_el_st.atoms[iatom].__dict__[property_name] = mol_el_st.__dict__[self.name].__dict__[property_name][iatom]
                if not mol.electronic_states:
                    for iatom in range(len(mol.atoms)):
                        mol.atoms[iatom].__dict__[property_name] = mol.__dict__[self.name].__dict__[property_name][iatom]
                else:
                    for iatom in range(len(mol.atoms)):
                        mol.atoms[iatom].__dict__[property_name] = mol.electronic_states[current_state].atoms[iatom].__dict__[property_name]

    def dump(self, filename=None, format='json'):
        '''
        Dump the object to a file.
        '''
        # Children and the wrapped model are serialized recursively as dicts.
        model_dict = {
            'type': 'model_tree_node',
            'name': self.name,
            'children': [child.dump(format='dict') for child in self.children] if self.children else None,
            'operator': self.operator,
            'model': self.model.dump(format='dict') if self.model else None,
            'nthreads': self.nthreads,
            'weight': self.weight if 'weight' in self.__dict__ else None
        }
        if format == 'json':
            import json
            with open(filename, 'w') as f:
                json.dump(model_dict, f, indent=4)
        if format == 'dict':
            return model_dict
from .aiqm1 import aiqm1 from .aiqm2 import aiqm2 from .dens import dens from .aimnet2 import aimnet2_methods from .interfaces.torchani_interface import ani_methods from .interfaces.gaussian_interface import gaussian_methods from .interfaces.pyscf_interface import pyscf_methods from .interfaces.orca_interface import orca_methods from .interfaces.turbomole_interface import turbomole_methods from .interfaces.mndo_interface import mndo_methods from .interfaces.sparrow_interface import sparrow_methods from .interfaces.xtb_interface import xtb_methods from .interfaces.dftbplus_interface import dftbplus_methods from .interfaces.columbus_interface import columbus_methods from .composite_methods import ccsdtstarcbs_legacy as ccsdtstarcbs from .interfaces.dftd3_interface import dftd3_methods from .interfaces.dftd4_interface import dftd4_methods
def methods(method: Union[str, None] = None, program: Union[str, None] = None, **kwargs):
    '''
    Create a model object with a specified method.

    Arguments:
        method (str): Specify the method. Available methods are listed in the section below.
        program (str, optional): Specify the program to use.
        **kwargs: Other method and program-specific options

    **Available Methods:**

        ``'AIQM1'``, ``'AIQM1@DFT'``, ``'AIQM1@DFT*'``, ``'AM1'``, ``'ANI-1ccx'``, ``'ANI-1x'``, ``'ANI-1x-D4'``, ``'ANI-2x'``, ``'ANI-2x-D4'``, ``'CCSD(T)*/CBS'``, ``'CNDO/2'``, ``'D4'``, ``'DFTB0'``, ``'DFTB2'``, ``'DFTB3'``, ``'GFN2-xTB'``, ``'MINDO/3'``, ``'MNDO'``, ``'MNDO/H'``, ``'MNDO/d'``, ``'MNDO/dH'``, ``'MNDOC'``, ``'ODM2'``, ``'ODM2*'``, ``'ODM3'``, ``'ODM3*'``, ``'OM1'``, ``'OM2'``, ``'OM3'``, ``'PM3'``, ``'PM6'``, ``'RM1'``, ``'SCC-DFTB'``, ``'SCC-DFTB-heats'``.

        Methods listed above can be accepted without specifying a program.
        The required programs still have to be installed though as described in the installation manual.

    **Available Programs and Their Corresponding Methods:**

    .. table::
        :align: center

        =============== ==========================================================================================================================================================================
        Program          Methods
        =============== ==========================================================================================================================================================================
        TorchANI         ``'AIQM1'``, ``'AIQM1@DFT'``, ``'AIQM1@DFT*'``, ``'ANI-1ccx'``, ``'ANI-1x'``, ``'ANI-1x-D4'``, ``'ANI-2x'``, ``'ANI-2x-D4'``, ``'ANI-1xnr'``
        dftd4            ``'AIQM1'``, ``'AIQM1@DFT'``, ``'ANI-1x-D4'``, ``'ANI-2x-D4'``, ``'D4'``
        MNDO or Sparrow  ``'AIQM1'``, ``'AIQM1@DFT'``, ``'AIQM1@DFT*'``, ``'MNDO'``, ``'MNDO/d'``, ``'ODM2*'``, ``'ODM3*'``, ``'OM2'``, ``'OM3'``, ``'PM3'``, ``'SCC-DFTB'``, ``'SCC-DFTB-heats'``
        MNDO             ``'CNDO/2'``, ``'MINDO/3'``, ``'MNDO/H'``, ``'MNDO/dH'``, ``'MNDOC'``, ``'ODM2'``, ``'ODM3'``, ``'OM1'``, semiempirical OMx, DFTB, NDDO-type methods
        Sparrow          ``'DFTB0'``, ``'DFTB2'``, ``'DFTB3'``, ``'PM6'``, ``'RM1'``, semiempirical OMx, DFTB, NDDO-type methods
        xTB              ``'GFN2-xTB'``, semiempirical GFNx-TB methods
        Orca             ``'CCSD(T)*/CBS'``, DFT
        Gaussian         ab initio methods, DFT
        PySCF            ab initio methods, DFT
        =============== ==========================================================================================================================================================================
    '''
    method_instance = None
    if program is not None:
        # A program was requested explicitly: look up its interface class by
        # the '<program>_methods' naming convention in this module's globals.
        program_method = f'{program.casefold()}_methods'
        if not program_method in globals():
            raise ValueError(f'The {program} is not recognized. Check the spelling or it might be not implemented.')
        program_method = globals()[program_method]
        if method is not None:
            method_instance = program_method(method=method, **kwargs)
        else:
            method_instance = program_method(**kwargs)
    elif method is not None:
        #mlatom_methods = ['aiqm'] # This is the order in which
        # No program given: probe the interface classes in the order returned
        # by method_classes() and use the first one that supports the method.
        for method_class in method_classes():
            if not hasattr(method_class, 'is_method_supported'):
                continue
            if hasattr(method_class, 'is_program_found'):
                # to-do implement in all interfaces which are not provided with mlatom (i.e., xtb is provided but others - not)
                if method_class.is_program_found() is False:
                    continue
            if method_class.is_method_supported(method):
                method_instance = method_class(method=method, **kwargs)
                break
        else:
            # for-else: no interface claimed the method.
            raise ValueError('''This method is not detected in any of the interfaces MLatom could find.
      Possible reasons:
      1. You might have misspelled method's name.
      2. MLatom could not find the required program on your device (check installation instructions).
      3. You might need to specify program, e.g., mymethod = mlatom.methods(method=[your method], program=[required program]),
      if you are sure that this program supports this method and the program is interfaced and found by MLatom.
      MLatom does not know all the methods available in each interfaced program.
      ''')
    # NOTE(review): if both method and program are None, this returns None
    # silently — presumably callers always pass at least one; verify.
    return method_instance
def known_methods():
    '''Return the list of all method names advertised by the interfaced classes.'''
    supported_methods = []
    for method_class in method_classes():
        # Only count 'supported_methods' declared directly on the class
        # (class __dict__), not inherited ones.
        if 'supported_methods' in method_class.__dict__:
            supported_methods += method_class.supported_methods
    return supported_methods

def method_classes():
    '''Return the method-implementing classes in the order they are probed.'''
    programs = ['gaussian', 'pyscf', 'orca', 'turbomole', 'mndo', 'sparrow', 'xtb', 'dftbplus', 'columbus', 'dftd3', 'dftd4']
    all_classes = [aiqm1, aiqm2, dens, ani_methods, aimnet2_methods, ccsdtstarcbs]
    all_classes += [globals()[f'{prog.casefold()}_methods'] for prog in programs]
    return all_classes

def load(filename, format=None):
    '''
    Load a saved model object.

    Arguments:
        filename (str): path to a saved model ('.json' or '.pkl' file).
        format (str, optional): force the format ('json' or 'pkl')
            regardless of the file extension.

    Returns None when the format cannot be determined.
    '''
    # BUGFIX: the extension check used filename[-5:] == '.pkl', which compares
    # five trailing characters against a four-character suffix and could never
    # match, so '.pkl' files were silently ignored unless format='pkl' was
    # passed explicitly. str.endswith() handles both extensions correctly.
    if filename.endswith('.json') or format == 'json':
        return load_json(filename)
    elif filename.endswith('.pkl') or format == 'pkl':
        return load_pickle(filename)

def load_json(filename):
    '''Load a model from a JSON dump produced by the dump() methods.'''
    import json
    with open(filename) as f:
        model_dict = json.load(f)
    return load_dict(model_dict)

def load_pickle(filename):
    '''
    Load a pickled model.

    WARNING: unpickling can execute arbitrary code — only load trusted files.
    '''
    import pickle
    with open(filename, 'rb') as file:
        return pickle.load(file)

def load_dict(model_dict):
    '''
    Reconstruct a model object from the dictionary representation created by
    the dump() methods. Note: model_dict is consumed (mutated via pop).
    '''
    obj_type = model_dict.pop('type')  # renamed from 'type': don't shadow the builtin
    nthreads = model_dict.pop('nthreads') if 'nthreads' in model_dict else 0
    if obj_type == 'method':
        kwargs = {}
        if 'kwargs' in model_dict:
            kwargs = model_dict.pop('kwargs')
        model = methods(**model_dict, **kwargs)
    if obj_type == 'ml_model':
        model = globals()[model_dict['ml_model_type'].split('.')[-1]](**model_dict['kwargs'])
    if obj_type == 'model_tree_node':
        # Recursively rebuild children and the wrapped model first.
        children = [load_dict(child_dict) for child_dict in model_dict['children']] if model_dict['children'] else None
        name = model_dict['name']
        operator = model_dict['operator']
        model = load_dict(model_dict['model']) if model_dict['model'] else None
        weight = model_dict['weight'] if 'weight' in model_dict else None
        model = model_tree_node(name=name, children=children, operator=operator, model=model)
        if weight:
            model.weight = weight
    if obj_type not in ['method', 'ml_model', 'model_tree_node']:
        # Custom model class: import its module from the stored path, then
        # split the remaining entries into constructor kwargs vs plain
        # attributes based on the __init__ signature.
        moduleinfo = model_dict.pop('module')
        module_import = import_from_path(moduleinfo['name'], moduleinfo['path'])
        modelcls = module_import.__dict__[obj_type]
        import inspect
        init_params = inspect.signature(modelcls.__init__).parameters
        init_kwargs = {}
        other_kwargs = {}
        for key in model_dict.keys():
            if key in init_params:
                init_kwargs[key] = model_dict[key]
            else:
                other_kwargs[key] = model_dict[key]
        # An explicit 'init_kwargs' entry overrides the signature-based split.
        if 'init_kwargs' in model_dict.keys():
            init_kwargs = model_dict['init_kwargs']
        model = modelcls(**init_kwargs)
        for key in other_kwargs.keys():
            model.__dict__[key] = other_kwargs[key]
    model.set_num_threads(nthreads)
    return model

def import_from_path(module_name, file_path):
    '''Import a module directly from a source-file path and register it in sys.modules.'''
    # https://docs.python.org/3/library/importlib.html#importing-a-source-file-directly
    # BUGFIX: 'import importlib' alone does not guarantee the importlib.util
    # submodule is loaded; import it explicitly.
    import importlib.util
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module
    spec.loader.exec_module(module)
    return module

if __name__ == '__main__':
    pass