"""
This module provides functionalities to derive and obtain typed polarizabilities from QM reference data.
This module uses `Ray` to distribute the optimization process. The optimization process is performed by `scipy.optimize.minimize` with `Nelder-Mead` method.
"""
import copy
import os
import shutil
from typing import List
import logging
import numpy as np
import scipy
import pint
import ray
from openff.recharge.esp.storage import MoleculeESPRecord
from openff.toolkit import ForceField
from scipy.optimize import minimize, nnls
from collections import defaultdict
from factorpol.charge_training import ChargeTrainer
from factorpol.qm_worker import rebuild_molecule
from factorpol.utilities import (
calc_rrms,
flatten_a_list,
Polarizability,
StorageHandler,
pair_equivalent,
)
ureg = pint.UnitRegistry()
Q_ = ureg.Quantity
logger = logging.getLogger(__name__)
# elementary_charge/bohr to kcal/mol/elementary_charge
au_to_kcal = 633.0917033332278
a03_to_ang3 = Q_(1, "bohr**3").to("angstrom**3").magnitude
[docs]class AlphaWorker(ChargeTrainer):
"""
A class used to derive polarizability from QM ESPs.
Parameters
----------
record: MoleculeESPRecord
QM reference record
off_forcefield: ForceField
OpenFF force field to handle SMIRNOFF typing of polarizabilities
polarizability: Polarizability
Input polarizabilities
coulomb14scale: float
Scaling factor to scale Coulomb 1-4 interactions
"""
def __init__(
self,
record: MoleculeESPRecord,
off_forcefield: ForceField,
polarizability: Polarizability,
coulomb14scale: float = 0.5,
):
super().__init__(
record=record,
off_forcefield=off_forcefield,
polarizability=polarizability,
coulomb14scale=coulomb14scale,
)
# self.alphas = polarizability
self.coulomb14scale = coulomb14scale
self.perturb_dipole = record.esp_settings.perturb_dipole
@property
def vdiff(self):
return self._vdiff
@vdiff.setter
def vdiff(self, value: np.ndarray):
self._vdiff = value
[docs]def _update_workers(
workers: List[AlphaWorker], parameters_path: str, coulomb14scale: float = 0.5
):
"""
Method to update charge workers for next optimization iteration
Parameters
----------
workers: List[AlphaWorker]
A list of AlphaWorker
parameters_path: str
Path to newly solved polarizabilities
coulomb14scale: float
Scaling factor of Coulomb 1-4 interactions.
Normally set as constant but it could be optimized.
Returns
-------
AlphaWorker
Returns updated AlphasWorker
"""
pols = Polarizability(data_source=parameters_path)
for w in workers:
w.alphas = [
pols.parameters[p].to("bohr**3").magnitude for p in w.smirnoff_patterns
]
# w.coulomb14scale = coulomb14scale # remove comment if optimizing
return workers
[docs]class AlphasTrainer:
"""
Top level optimizer to train polarizability parameters
Parameters
----------
workers: List[AlphaWorker]
A list of AlphaWorker
prior: Polarizability
Initial polarizability
working_directory: str
The path to the working directory
"""
def __init__(
self,
workers: List[AlphaWorker],
prior: Polarizability,
working_directory: str = os.path.join(os.getcwd(), "data_alphas"),
):
self.coulomb14scale = None
self.alphas_path = None
self.working_directory = working_directory
self.iteration = 0
self.base = prior.data
self.parameter_type_to_train = prior.parameters.keys()
self.prior = [v.magnitude for v in prior.parameters.values()]
if os.path.exists(self.working_directory):
logger.warning("Path exists, deleting")
shutil.rmtree(self.working_directory)
os.makedirs(self.working_directory, exist_ok=True)
self.workers = workers
[docs] def worker(self, input_data: np.ndarray):
"""
A method to compute the loss
Parameters
----------
input_data: ndarray
Polarizability solved from the previous optimization
Returns
-------
Returns the optimizer output object.
"""
self.iteration += 1
self.alphas_path = os.path.join(
self.working_directory, f"alpha_{self.iteration:03d}.log"
)
for k, v in zip(self.parameter_type_to_train, input_data):
self.base.loc[k, "Polarizability (angstrom**3)"] = v
self.base.to_csv(self.alphas_path)
workers = _update_workers(
workers=self.workers,
parameters_path=self.alphas_path,
)
loss = [AlphasTrainer._calc_loss.remote(w) for w in workers]
ret = np.mean(ray.get(loss))
os.system(f"echo {ret} >> {os.path.join(self.working_directory, 'Loss.log')}")
return ret
[docs] def optimize(self, bounds, num_cpus=8):
"""
Distribute the optimization process with `Ray`
Parameters
----------
bounds: tuple
The bounds of each polarizability type
num_cpus: int
The number of CPUs available for optimization
Returns
-------
Returns the result object of optimizer.
"""
ray.shutdown()
ray.init(num_cpus=num_cpus, num_gpus=0)
x0 = self.prior
# a simple boundary
# y = lambda x: (x/2, x*2)
# bounds = tuple([y(x) for x in x0])
res = minimize(self.worker, x0=x0, method="Nelder-Mead", bounds=bounds)
ray.shutdown()
return res
[docs] @staticmethod
def _calc_Esps_mu(worker: AlphaWorker) -> np.ndarray:
"""
Calculate the MM ESPs on grid points using mutual polarization
Parameters
----------
worker: AlphaWorker
The AlphaWorker that contains all ESP-fitting related data
Returns
-------
ndarray
Returns the computed ESPs on grid points
"""
external_field = worker.perturb_dipole
# cast a local electric field matrix
efield = np.full_like(a=np.zeros((worker.natoms, 3)), fill_value=external_field)
esps = np.zeros(worker.npoints)
alphas = copy.deepcopy(worker.alphas)
for j in range(worker.natoms):
for i in range(worker.npoints):
esps[i] += alphas[j] * np.dot(
efield[j], worker.r_ij[j, i] * worker.r_ij3[j, i]
)
return esps
@staticmethod
@ray.remote(num_cpus=1)
def _calc_loss(worker: AlphaWorker) -> float:
"""
Calculate the lost function (RRMS) for one worker
Parameters
----------
worker: AlphasWorker
The AlphaWorker that contains all ESP-fitting related data
Returns
-------
float
Returns the Loss (RRMS) value
"""
calced = AlphasTrainer._calc_Esps_mu(worker)
ref = worker.vdiff
loss = calc_rrms(calced, ref) # rrmse
return loss
[docs]class AlphaData:
"""
A class to prepare reference QM ESPs for optimization of polarizability
Parameters
----------
database_name: str
The name of database to query
dataset: List[str]
A list of molecules to query
off_forcefield: ForceField
An OpenFF Force Field to specify SMIRNOFF patterns
polarizability: Polarizability
A polarizability library
num_cpus: int
The number of process to initialize to generate relevant data
"""
def __init__(
self,
database_name: str,
dataset: List[str],
off_forcefield: ForceField,
polarizability: Polarizability,
num_cpus: int = 8,
):
self.database_name = database_name
self.dataset = dataset
self.workers = []
ray.shutdown()
ray.init(num_cpus=num_cpus)
ret = [
create_worker.remote(
database_name,
molecule=mol,
polarizability=polarizability,
off_forcefield=off_forcefield,
coulomb14scale=0.5,
)
for mol in self.dataset
]
workers = ray.get(ret)
self.workers = flatten_a_list(workers)
ray.shutdown()
@ray.remote(num_cpus=1)
def create_worker(
database_name: str,
molecule: str,
polarizability: Polarizability,
off_forcefield: ForceField,
coulomb14scale: float = 0.5,
) -> List[AlphaWorker]:
"""
This is a function to gather all necessary information to optmize polarizability
Parameters
----------
database_name: str
The name of dataset to query
molecule: str
The SMILES to look for.
polarizability: Polarizability
A polarizability library
off_forcefield: ForceField
OpenFF ForceField to label molecules with SMIRNOFF patterns.
coulomb14scale: float
Scaling factor of Coulomb 1-4 interactions.
Normally set as constant but it could be optimized.
Returns
-------
List[AlphaWorker]
Returns a list of prepared AlphaWorkers.
"""
workers = []
store = StorageHandler()
my_session = store.session(database_name)
records_dict = rebuild_molecule(my_session=my_session, molecule=molecule)
for conf_idx, records in records_dict.items():
base = None
this_conf = []
for r in records:
worker = AlphaWorker(
record=r,
off_forcefield=off_forcefield,
polarizability=polarizability,
coulomb14scale=coulomb14scale,
)
if np.allclose(np.zeros(3), r.esp_settings.perturb_dipole):
base = copy.deepcopy(worker)
base.vdiff = np.zeros(base.npoints)
else:
this_conf.append(worker)
for w in this_conf:
w.vdiff = w.esp_values - base.esp_values
workers.extend(this_conf)
return workers
@ray.remote(num_cpus=1)
def fit_alphas(worker: AlphaWorker, global_opt=False) -> np.ndarray:
r"""
Fit the polarizability of a molecule to reference QM ESPs
This is a remote function to be called by ray
Atomic units
.. math:: \sum_{j = 1}^{n}\sum_{i = 1}^{m}\frac{\alpha_j \mathrm{r}_{ij} \mathrm{E^2}}{r_{ij}^3r_{ik}} = \sum_{i = 1}^{m}\frac{V_i \mathrm{E} \mathrm{r}_{ij}}{r_{ik} r_{ij}^3}
Parameters
-----------
worker: AlphaWorker
The AlphaWorker that contains all ESP-fitting related data
global_opt: bool
Whether to perform global optimization
Returns
-----------
ndarray
Returns the fitted polarizability alphas if global_opt is False
Returns built matrix, vector, and polarizability types if global_opt is True
"""
natoms = worker.natoms
external_field = worker.perturb_dipole
# cast a local electric field matrix
efield = np.full_like(a=np.zeros((natoms, 3)), fill_value=external_field)
vdiff = worker.vdiff
# find same polarizability types to constraint them to be the same
tmp1 = defaultdict(list)
for idx1, rank in enumerate(worker.smirnoff_patterns):
tmp1[rank].append(idx1)
tmp2 = []
tmp3 = {}
for k, v in tmp1.items():
tmp3[k] = v[0]
if len(v) > 1:
tmp2.append([[v[i], v[i + 1]] for i in range(len(v) - 1)])
alphas_pairs = np.concatenate(tmp2)
ndim = worker.natoms + len(alphas_pairs)
# Distance matrix
D_ij = np.multiply(worker.r_ij, worker.r_ij3)
matrix = np.zeros((ndim, ndim, 3))
# This ugly loop is to make sure the maths is correct
for k in range(natoms):
for j in range(natoms):
for i in range(worker.npoints):
matrix[k, j] += np.square(external_field) * D_ij[k, i] * D_ij[j, i]
# force some polarizability types to have the same values
matrix = np.linalg.norm(matrix, axis=-1)
for idx, pair in enumerate(alphas_pairs):
matrix[natoms + idx, pair[0]] = 1.0
matrix[natoms + idx, pair[1]] = -1.0
matrix[pair[0], natoms + idx] = 1.0
matrix[pair[1], natoms + idx] = -1.0
vector = np.zeros((ndim, 3))
for k in range(natoms):
for i in range(worker.npoints):
vector[k] += external_field * D_ij[k, i] * vdiff[i]
vector = np.linalg.norm(vector, axis=-1)
if global_opt:
return matrix[:natoms, :natoms], vector[:natoms], worker.smirnoff_patterns
else:
ret = np.linalg.solve(matrix, vector)
return {k: v * a03_to_ang3 for k, v in zip(tmp1.keys(), ret[:natoms])}
[docs]def optimize_alphas(
worker_list: List[AlphaWorker], solved=True, num_cpus=8
) -> np.ndarray:
r"""
A function to optimize the polarizability of a dataset to reference QM ESPs
Atomic units
Objective Function:
.. math:: {\chi}^2 = \sum_{k=1}^{N_{conf}} \sum_{l=1}^{6}\sum_{i=1}^{m} (V_{diff,ikl} -\sum_{j=1}^{n_k}\frac{{\mu}_{{ind,jl}}{r}_{ij}}{r_{ij}^3})^2
Parameters
-----------
worker_list: List[AlphaWorker]
solved: bool
Whether the polarizability is solved or not
num_cpus: int
Number of CPUs to use for ray workers
Returns
-----------
ndarray, ndarray, ndarray
Returns matrix, vector, and polarizability types
Returns the fitted polarizability alphas and objectives if solved is True
"""
ray.shutdown()
ray.init(num_cpus=num_cpus)
logger.info("Fitting alphas with %s ray workers" % num_cpus)
workers = [fit_alphas.remote(w, global_opt=True) for w in worker_list]
# Get results from ray
ret = ray.get(workers)
# Split the returns to matrix, vector, and polarizability types
a_lst = [r[0] for r in ret]
b_lst = [r[1] for r in ret]
pol_lst = [r[-1] for r in ret]
# Calculate and pair the same polarizabilities
pol_lst_flatten = flatten_a_list(pol_lst)
pairs = pair_equivalent(pol_lst_flatten)
n_same_alphas = len(pairs)
ndim0 = np.sum([m.natoms for m in worker_list])
ndim = ndim0 + n_same_alphas
# Initializing a new matrix to do the optimization
final_a = np.zeros((ndim, ndim))
design = scipy.linalg.block_diag(*a_lst)
final_a[:ndim0, :ndim0] = design
# Apply same alphas restraints
for idx, pair in enumerate(pairs):
this_idx = (ndim - n_same_alphas) + idx
final_a[this_idx, pair[0]] = 1.0
final_a[this_idx, pair[1]] = -1.0
final_a[pair[0], this_idx] = 1.0
final_a[pair[1], this_idx] = -1.0
final_b = np.append(np.concatenate(b_lst), np.zeros(n_same_alphas))
if solved:
# ret = nnls(final_a, final_b)
ret = np.linalg.solve(final_a, final_b)
dt = {
k: Q_(v, "a0**3").to("angstrom**3")
for k, v in zip(pol_lst_flatten, ret[: len(pol_lst_flatten)])
}
return dt
else:
return final_a, final_b, set(pol_lst_flatten)
@ray.remote(num_cpus=1)
def fit_alphas_fast(worker: AlphaWorker, global_opt=False) -> np.ndarray:
r"""
** This method is extremely experimental and not recommended for production use **
Fit the polarizability of a molecule to reference QM ESPs with a fast method
This is a remote function to be called by ray
Atomic units
Parameters
-----------
worker: AlphaWorker
The AlphaWorker that contains all ESP-fitting related data
global_opt: bool
Whether to perform global optimization
Returns
-----------
ndarray
Returns the fitted polarizability alphas if global_opt is False
Returns built matrix, vector, and polarizability types if global_opt is True
"""
natoms = worker.natoms
external_field = worker.perturb_dipole
vdiff = worker.vdiff.reshape(-1, 1)
param_dict = defaultdict(list)
for idx1, rank in enumerate(worker.smirnoff_patterns):
param_dict[rank].append(idx1)
ndim = len(param_dict)
A = np.zeros((ndim, ndim))
B = np.zeros((ndim))
D_ijE = worker.r_ij * worker.r_ij3 * external_field
for k, (_, alphas_k) in enumerate(param_dict.items()):
Cik = D_ijE[alphas_k].sum(axis=0)
B[k] = np.einsum("ic,ik->", Cik, vdiff)
for l, (_, alphas_l) in enumerate(param_dict.items()):
Cil = D_ijE[alphas_l].sum(axis=0)
A[k, l] = np.einsum("ic,ik->", Cil, Cik)
if global_opt:
return A, B, param_dict.keys()
else:
ret = nnls(A, B)[0]
return {k: v * a03_to_ang3 for k, v in zip(param_dict.keys(), ret)}
[docs]def optimize_alphas_fast(
worker_list: List[AlphaWorker], solved=True, num_cpus=8
) -> np.ndarray:
r"""
** This method is extremely experimental and not recommended for production use **
This implementation is not efficient enough and needs to be improved.
A fast method to optimize the polarizability of a dataset to reference QM ESPs
Atomic units
Objective Function:
Parameters
-----------
worker_list: List[AlphaWorker]
solved: bool
Whether the polarizability is solved or not
num_cpus: int
Number of CPUs to use for ray workers
Returns
-----------
ndarray, ndarray, ndarray
Returns matrix, vector, and polarizability types
Returns the fitted polarizability alphas and objectives if solved is True
"""
ray.shutdown()
ray.init(num_cpus=num_cpus)
logger.info("Fitting alphas with %s ray workers" % num_cpus)
workers = [fit_alphas_fast.remote(w, global_opt=True) for w in worker_list]
# Get results from ray
ret = ray.get(workers)
# Split the returns to matrix, vector, and polarizability types
a_lst = [r[0] for r in ret]
b_lst = [r[1] for r in ret]
pol_lst = [r[-1] for r in ret]
# Calculate and pair the same polarizabilities
pol_lst_flatten = flatten_a_list(pol_lst)
pairs = pair_equivalent(pol_lst_flatten)
n_same_alphas = len(pairs)
ndim0 = np.sum([len(m) for m in a_lst])
ndim = ndim0 + n_same_alphas
# Initializing a new matrix to do the optimization
final_a = np.zeros((ndim, ndim))
design = scipy.linalg.block_diag(*a_lst)
final_a[:ndim0, :ndim0] = design
# Apply same alphas restraints
for idx, pair in enumerate(pairs):
this_idx = (ndim - n_same_alphas) + idx
final_a[this_idx, pair[0]] = 1.0
final_a[this_idx, pair[1]] = -1.0
final_a[pair[0], this_idx] = 1.0
final_a[pair[1], this_idx] = -1.0
final_b = np.append(np.concatenate(b_lst), np.zeros(n_same_alphas))
if solved:
ret = np.linalg.solve(final_a, final_b)
# ret = nnls(final_a, final_b)
dt = {
k: Q_(v, "a0**3").to("angstrom**3")
for k, v in zip(pol_lst_flatten, ret[: len(pol_lst_flatten)])
}
return dt
else:
return final_a, final_b, set(pol_lst_flatten)