Source code for mxtaltools.common.utils

import json
import sys

import numpy as np
import plotly

import torch
from scipy.ndimage import gaussian_filter
from sklearn.neighbors import NearestNeighbors
from torch.nn import functional as F
from torch_scatter import scatter
from scipy.interpolate import interpn
from typing import List, Optional, Union
import collections
from copy import copy

'''
general utilities
'''

[docs] def get_point_density_knn(xy, k=30, alpha=20): X = xy.T.astype(np.float64) # whiten X -= X.mean(axis=0, keepdims=True) X /= X.std(axis=0, keepdims=True) + 1e-12 nbrs = NearestNeighbors(n_neighbors=k, algorithm="auto").fit(X) dists, _ = nbrs.kneighbors(X) rk = dists[:, -1] # distance to k-th neighbor z = 1.0 / (rk**2 + 1e-12) # 2D density proxy z /= z.max() z = np.log1p(alpha * z) / np.log1p(alpha) return z
[docs] def batch_compute_dipole(pos, batch, z, electronegativity_tensor): """ Compute a rough dipole moment for a flat batch of molecules, as the simple weighted sum of electronegativities. Do this in a batch fashion assuming the input is a flat list of graphs, indexed by batch Parameters ---------- pos : torch.tensor(n,3) 3d coordinates batch : torch.tensor(n) batch index z : torch.tensor(n) atom types electronegativity_tensor fixed electronegativities for each atom type Returns ------- dipole_moment : torch.tensor(num_graphs,3) """ centers_of_geometry = scatter(pos, batch, dim=0, reduce='mean') centers_of_charge = scatter(electronegativity_tensor[z.long()][:, None] * pos, batch, dim=0, reduce='mean') return centers_of_charge - centers_of_geometry
[docs] def get_point_density(xy, bins=100, sigma=1.5, alpha=20): x, y = xy hist, x_e, y_e = np.histogram2d(x, y, bins=bins, density=True) hist = gaussian_filter(hist, sigma=sigma, mode="nearest") xc = 0.5 * (x_e[1:] + x_e[:-1]) yc = 0.5 * (y_e[1:] + y_e[:-1]) x_clipped = np.clip(x, xc[0], xc[-1]) y_clipped = np.clip(y, yc[0], yc[-1]) z = interpn((xc, yc), hist, np.vstack([x_clipped, y_clipped]).T, bounds_error=False, fill_value=0.0) # floor + log compression z = np.clip(z, 1e-4 * z.max(), None) z /= z.max() z = np.log1p(alpha * z) / np.log1p(alpha) return z
[docs] def chunkify(lst: list, n: int): """ break up a list into n chunks of equal size (up to last chunk) Parameters ---------- lst : list n : int Returns ------- list_of_chunks : [n] """ return [lst[i::n] for i in range(n)]
[docs] def torch_ptp(tensor: torch.tensor): """ torch implementation of np.ptp Parameters ---------- tensor Returns ------- ptp : float """ return torch.max(tensor) - torch.min(tensor)
[docs] def standardize_np(data: np.ndarray, return_standardization: bool = False, known_mean=None, known_std=None): """ standardize an input 1D array by subtracting mean and dividing by standard deviation Parameters ---------- data : np.array return_standardization : bool return the mean and std known_mean : float, optional optionally use precomputed mean known_std : float, optional optionally use precomputed standard deviation Returns ------- std_data : np.array mean and standard deviation : optional floats """ data = data.astype('float32') if known_mean is not None: mean = known_mean else: mean = np.mean(data) if known_std is not None: std = known_std else: std = np.std(data) if std == 0: std = 0.01 # if std == 0, data-mean will be all zeros. Doesn't matter what this value is. std_data = (data - mean) / std if return_standardization: return std_data, mean, std else: return std_data
[docs] def normalize_np(x: np.ndarray): """ normalize an input by its span subtract min_x so that range is fixed on [0,1] Parameters ---------- x : np.array Returns ------- normed_x : np.array """ normed_x = (x - np.amin(x)) / np.ptp(x) return normed_x
[docs] def softmax_np(x: np.ndarray, temperature: float = 1): """ softmax function implemented in numpy Parameters ---------- x : np.array temperature : float softmax temperature Returns ------- softmax_x : np.array """ if x.ndim == 1: x = x[None, :] x = x.astype(float) x -= x.max() probabilities = np.exp(x / temperature) / (np.sum(np.exp(x / temperature), axis=1) + 1e-16)[:, None] return probabilities
[docs] def histogram_overlap_np(d1: np.ndarray, d2: np.ndarray): """ Compute the symmetric overlap of two histograms Parameters ---------- d1 : np.array(n) d2 : np.array(n) Returns ------- overlap : float """ return np.sum(np.minimum(d1, d2)) / np.average((d1.sum(), d2.sum()))
[docs] def repeat_interleave( repeats: List[int], device: Optional[torch.device] = None, ): """ # todo why do we have this? There are builtin methods in torch. # TODO replace and deprecate Alternate implementation of torch.repeat_interleave borrowed from torch_geometric.data.collate Parameters ---------- repeats : list of ints device : str or torch.device Returns ------- Repeated tensor which has the same shape as input, except along the given axis. """ outs = [torch.full((n,), i, device=device) for i, n in enumerate(repeats)] return torch.cat(outs, dim=0)
[docs] def namespace2dict(namespace_dict, higher_level=''): """ Convert a dict from an optionally nested namespace to an optionally nested dict. Parameters ---------- namespace_dict : dict for the higher level namespace higher_level : key for high level dict Returns ------- Dict matching higher level namespace """ copied_dict = copy(namespace_dict) for key in copied_dict.keys(): if 'namespace' in str(type(copied_dict[key])).lower(): copied_dict[key] = namespace2dict(copied_dict[key].__dict__, higher_level=key) else: pass return copied_dict
[docs] def flatten_dict(dictionary, parent_key=False, separator='_'): """ From : https://stackoverflow.com/questions/6027558/flatten-nested-dictionaries-compressing-keys Recursively convert a nested dictionary into a flattened dictionary Parameters ---------- dictionary parent_key separator Returns ------- Dict with all nested dict flattened, with longer keys instead of nesting. """ items = [] for key, value in dictionary.items(): new_key = str(parent_key) + separator + key if parent_key else key if isinstance(value, collections.abc.MutableMapping): items.extend(flatten_dict(value, new_key, separator).items()) elif isinstance(value, list): for k, v in enumerate(value): items.extend(flatten_dict({str(k): v}, new_key).items()) else: items.append((new_key, value)) return dict(items)
[docs] def signed_log(y: Union[torch.tensor, np.ndarray] ) -> Union[torch.tensor, np.ndarray]: if torch.is_tensor(y): out = torch.sign(y) * torch.log(1 + torch.abs(y)) else: out = np.sign(y) * np.log(1 + np.abs(y)) return out
[docs] def sample_uniform(num_samples, max_value, device): return torch.rand(size=(num_samples,), device=device) * max_value
[docs] def parse_to_torch(array: Union[torch.Tensor, np.ndarray, list], device: Union[torch.device, str], dtype=torch.float32) -> torch.Tensor: if torch.is_tensor(array): return torch.tensor(array.clone().detach(), dtype=dtype, device=device) elif isinstance(array, np.ndarray): return torch.tensor(array, dtype=dtype, device=device) elif isinstance(array, list): if torch.is_tensor(array[0]): return torch.cat(array, dim=0).to(device) else: return torch.tensor(array, dtype=dtype, device=device)
[docs] def softplus_shift(x: torch.Tensor, min_val: Optional[Union[float, torch.Tensor]] = 0.01, beta: Optional[float] = 10) -> torch.Tensor: return F.softplus(x - min_val, beta=beta) + min_val
[docs] def invert_softplus_shift(y: torch.Tensor, min_val: Optional[Union[float, torch.Tensor]] = 0.01, beta: Optional[float] = 10) -> torch.Tensor: return (1.0 / beta) * torch.log(torch.exp(beta * (y - min_val)) - 1.0) + min_val
[docs] def scale_bh_energy(bh_energy, beta: Optional[float] = 100): scaled_en = bh_energy.clone() scaled_en[scaled_en > 0] = torch.log(1 + scaled_en[scaled_en > 0] / beta) * beta return scaled_en
[docs] def smooth_constraint(value: torch.Tensor, threshold: float, mode: 'str', hardness: float): if mode == 'greater than': return F.softplus(-(value - threshold), beta=hardness) ** 2 elif mode == 'less than': return F.softplus(value - threshold, beta=hardness) ** 2
[docs] def sample_triangular_right(n_samples, start, stop, device='cpu'): """ sample from the CDF of a uniform distribution the right-aligned triangular distribution """ U = torch.rand(n_samples, device=device) return start + (stop - start) * torch.sqrt(U)
[docs] def siginv(x): """inverts the sigmoid function""" return torch.log(x/(1-x))
[docs] def std_normal_to_uniform(rands): return torch.distributions.Normal(0, 1).cdf(rands)
[docs] def uniform_to_std_normal(uniform): return torch.distributions.Normal(0, 1).icdf(uniform.clip(min=1e-5, max=1-1e-5)) # prevent exploding values
[docs] def get_plotly_fig_size_mb(fig) -> float: # Convert Plotly figure to JSON string fig_json = json.dumps(fig, cls=plotly.utils.PlotlyJSONEncoder) return sys.getsizeof(fig_json) / (1024 * 1024)
[docs] def block_repeat_interleave( lengths: torch.Tensor, repeats: torch.Tensor) -> torch.Tensor: """ Repeat ragged blocks of a flat tensor in parallel on GPU. Args: lengths : (num_blocks,) tensor of block lengths. repeats : (num_blocks,) tensor of repeat counts per block. Returns: 1D tensor with each block repeated blockwise. """ device = lengths.device # start offset of each block in flat offsets = torch.cumsum(torch.cat([torch.tensor([0], device=device), lengths[:-1]]), 0) # output size contributed per block block_out_sizes = lengths * repeats total_out = block_out_sizes.sum() # which block each output element belongs to block_ids = torch.repeat_interleave(torch.arange(len(lengths), device=device), block_out_sizes) # output start offsets out_offsets = torch.cumsum(torch.cat([torch.tensor([0], device=device), block_out_sizes[:-1]]), 0) # local position within block (resets with modulo!) local_index = torch.arange(total_out, device=device) - out_offsets[block_ids] pos_in_block = local_index % lengths[block_ids] # map back to flat indices src_ids = offsets[block_ids] + pos_in_block return src_ids
[docs] def log_rescale_positive(y: torch.Tensor, cutoff: float = 0 ): return torch.where( y > cutoff, # where above cutoff cutoff + torch.log1p(y - cutoff), # return log(1+x) y # else return x )
[docs] def is_cuda_oom(e: Exception) -> bool: if isinstance(e, torch.cuda.OutOfMemoryError): return True s = str(e).lower() return ( ("cuda" in s and "memory" in s) or ("cublas" in s and "alloc" in s) or ("cusolver" in s and "alloc" in s) or ("out of memory" in s) or ("nonzero is not supported for tensors with more than int_max elements" in s) )