Source code for espaloma.data.qcarchive_utils

# =============================================================================
# IMPORTS
# =============================================================================
from collections import namedtuple
from typing import Tuple

import numpy as np
import qcportal
import torch
from openmm import unit
from openmm.unit import Quantity

import espaloma as esp


# =============================================================================
# CONSTANTS
# =============================================================================


# =============================================================================
# UTILITY FUNCTIONS
# =============================================================================
[docs] def get_client(url: str = "api.qcarchive.molssi.org") -> qcportal.client.PortalClient: """ Returns a instance of the qcportal client. Parameters ---------- url: str, default="api.qcarchive.molssi.org" qcportal instance to connect Returns ------- qcportal.client.PortalClient qcportal client instance. """ # Note, this may need to be modified to include username/password for non-public servers return qcportal.PortalClient(url)
[docs] def get_collection( client, collection_type="optimization", name="OpenFF Full Optimization Benchmark 1", ): """ Connects to a specific dataset on qcportal Parameters ---------- client: qcportal.client, required The qcportal client instance collection_type: str, default="optimization" The type of qcarchive collection, options are "torsiondrive", "optimization", "gridoptimization", "reaction", "singlepoint" "manybody" name: str, default="OpenFF Full Optimization Benchmark 1" Name of the dataset Returns ------- (qcportal dataset, list(str)) Tuple with an instance of qcportal dataset and list of record names """ collection = client.get_dataset( dataset_type=collection_type, dataset_name=name, ) record_names = collection.entry_names return collection, record_names
[docs] def process_record(record, entry): """ Processes a given record/entry pair from a dataset and returns the graph Parameters ---------- record: qcportal.optimization.record_models.OptimizationRecord qcportal record entry: cportal.optimization.dataset_models.OptimizationDatasetEntry qcportal entry Returns ------- esp.Graph """ from openff.toolkit.topology import Molecule if record.record_type == "optimization": trajectory = record.trajectory if trajectory is None: return None else: raise Exception( f"{record.record_type} is not supported: only optimization datasets can be processed." ) mol = Molecule.from_qcschema(entry.dict()) g = esp.Graph(mol) # energy is already hartree g.nodes["g"].data["u_ref"] = torch.tensor( [ Quantity( snapshot.properties["scf_total_energy"], esp.units.HARTREE_PER_PARTICLE, ).value_in_unit(esp.units.ENERGY_UNIT) for snapshot in trajectory ], dtype=torch.get_default_dtype(), )[None, :] g.nodes["n1"].data["xyz"] = torch.tensor( np.stack( [ Quantity( snapshot.molecule.geometry, unit.bohr, ).value_in_unit(esp.units.DISTANCE_UNIT) for snapshot in trajectory ], axis=1, ), requires_grad=True, dtype=torch.get_default_dtype(), ) g.nodes["n1"].data["u_ref_prime"] = torch.stack( [ torch.tensor( Quantity( np.array(snapshot.properties["return_result"]).reshape((-1, 3)), esp.units.HARTREE_PER_PARTICLE / unit.bohr, ).value_in_unit(esp.units.FORCE_UNIT), dtype=torch.get_default_dtype(), ) for snapshot in trajectory ], dim=1, ) return g
[docs] def get_graph(collection, record_name, spec_name="default"): """ Processes the qcportal data for a given record name. This supports optimization and singlepoint datasets. Parameters ---------- collection, qcportal dataset, required The instance of the qcportal dataset record_name, str, required The name of a give record spec_name, str, default="default" Retrieve data for a given qcportal specification. Returns ------- Graph """ # get record and trajectory record = collection.get_record(record_name, specification_name=spec_name) entry = collection.get_entry(record_name) g = process_record(record, entry) return g
[docs] def get_graphs(collection, record_names, spec_name="default"): """ Processes the qcportal data for a given set of record names. This uses the qcportal iteration functions which are faster than processing records one at a time. This supports optimization and singlepoint datasets. Parameters ---------- collection, qcportal dataset, required The instance of the qcportal dataset record_name, List[str], required A list of the record_names of a give record spec_name, str, default="default" Retrieve data for a given qcportal specification. Returns ------- list(graph) Returns a list of the corresponding graph for each record name """ g_list = [] for record, entry in zip( collection.iterate_records(record_names, specification_names=[spec_name]), collection.iterate_entries(record_names), ): # note iterate records returns a tuple of length 3 (name, spec_name, actual record information) g = process_record(record[2], entry) g_list.append(g) return g_list
[docs] def fetch_td_record(record: qcportal.torsiondrive.record_models.TorsiondriveRecord): """ Fetches configuration, energy, and gradients for a given torsiondrive record as a function of different angles. Parameters ---------- record: qcportal.torsiondrive.record_models.TorsiondriveRecord, required Torsiondrive record of interest Returns ------- tuple, ( numpy.array, numpy.array, numpy.array,numpy.array) Returned data is a tuple of numpy arrays. The first index contains angles and subsequent arrays represent molecule coordinate, energy and gradients associated with each angle. """ molecule_optimization = record.optimizations angle_keys = list(molecule_optimization.keys()) xyzs = [] energies = [] gradients = [] for angle in angle_keys: # NOTE: this is calling the first index of the optimization array # this gives the same value as the prior implementation. # however it seems to be that this contains multiple different initial configurations # that have been optimized. Should all conformers and energies/gradients be considered? mol = molecule_optimization[angle][0].final_molecule result = molecule_optimization[angle][0].trajectory[-1].properties """Note: force = - gradient""" # TODO: attach units here? or later? e = result["current energy"] g = np.array(result["current gradient"]).reshape(-1, 3) xyzs.append(mol.geometry) energies.append(e) gradients.append(g) # to arrays xyz = np.array(xyzs) energies = np.array(energies) gradients = np.array(gradients) # assume each angle key is a tuple -- sort by first angle in tuple # NOTE: (for now making the assumption that these torsion drives are 1D) for k in angle_keys: assert len(k) == 1 to_ordered = np.argsort([k[0] for k in angle_keys]) angles_in_order = [angle_keys[i_] for i_ in to_ordered] flat_angles = np.array(angles_in_order).flatten() # put the xyz's, energies, and gradients in the same order as the angles xyz_in_order = xyz[to_ordered] energies_in_order = energies[to_ordered] gradients_in_order = gradients[to_ordered] # TODO: put this return blob into a better struct return flat_angles, xyz_in_order, energies_in_order, gradients_in_order
MolWithTargets = namedtuple( "MolWithTargets", ["offmol", "xyz", "energies", "gradients"] )
[docs] def h5_to_dataset(df): def get_smiles(x): try: return x["offmol"].to_smiles() except: return np.nan df["smiles"] = df.apply(get_smiles, axis=1) df = df.dropna() groups = df.groupby("smiles") gs = [] for name, group in groups: mol_ref = group["offmol"][0] assert all(mol_ref == entry for entry in group["offmol"]) g = esp.Graph(mol_ref) u_ref = np.concatenate(group["energies"].values) u_ref_prime = np.concatenate(group["gradients"].values, axis=0).transpose( 1, 0, 2 ) xyz = np.concatenate(group["xyz"].values, axis=0).transpose(1, 0, 2) assert u_ref_prime.shape[0] == xyz.shape[0] == mol_ref.n_atoms assert u_ref.shape[0] == u_ref_prime.shape[1] == xyz.shape[1] # energy is already hartree g.nodes["g"].data["u_ref"] = torch.tensor( Quantity(u_ref, esp.units.HARTREE_PER_PARTICLE).value_in_unit( esp.units.ENERGY_UNIT ), dtype=torch.get_default_dtype(), )[None, :] g.nodes["n1"].data["xyz"] = torch.tensor( Quantity( xyz, unit.bohr, ).value_in_unit(esp.units.DISTANCE_UNIT), requires_grad=True, dtype=torch.get_default_dtype(), ) g.nodes["n1"].data["u_ref_prime"] = torch.tensor( Quantity( u_ref_prime, esp.units.HARTREE_PER_PARTICLE / unit.bohr, ).value_in_unit(esp.units.FORCE_UNIT), dtype=torch.get_default_dtype(), ) gs.append(g) return esp.data.dataset.GraphDataset(gs)
[docs] def breakdown_along_time_axis(g, batch_size=32): n_snapshots = g.nodes["g"].data["u_ref"].flatten().shape[0] idxs = list(range(n_snapshots)) from random import shuffle shuffle(idxs) chunks = [ idxs[_idx * batch_size: (_idx + 1) * batch_size] for _idx in range(n_snapshots // batch_size) ] _gs = [] for chunk in chunks: _g = esp.Graph(g.mol) _g.nodes["g"].data["u_ref"] = ( g.nodes["g"].data["u_ref"][:, chunk].detach().clone() ) _g.nodes["n1"].data["xyz"] = ( g.nodes["n1"].data["xyz"][:, chunk, :].detach().clone() ) _g.nodes["n1"].data["u_ref_prime"] = ( g.nodes["n1"].data["u_ref_prime"][:, chunk, :].detach().clone() ) _g.nodes["n1"].data["xyz"].requires_grad = True _gs.append(_g) return _gs
[docs] def make_batch_size_consistent(ds, batch_size=32): import itertools return esp.data.dataset.GraphDataset( list( itertools.chain.from_iterable( [breakdown_along_time_axis(g, batch_size=batch_size) for g in ds] ) ) )
[docs] def weight_by_snapshots(g, key="weight"): n_snapshots = g.nodes["n1"].data["xyz"].shape[1] g.nodes["g"].data[key] = torch.tensor(float(1.0 / n_snapshots))[None, :]