import json
import sys
import numpy as np
import plotly
import torch
from scipy.ndimage import gaussian_filter
from sklearn.neighbors import NearestNeighbors
from torch.nn import functional as F
from torch_scatter import scatter
from scipy.interpolate import interpn
from typing import List, Optional, Union
import collections
from copy import copy
'''
general utilities
'''
[docs]
def get_point_density_knn(xy, k=30, alpha=20):
X = xy.T.astype(np.float64)
# whiten
X -= X.mean(axis=0, keepdims=True)
X /= X.std(axis=0, keepdims=True) + 1e-12
nbrs = NearestNeighbors(n_neighbors=k, algorithm="auto").fit(X)
dists, _ = nbrs.kneighbors(X)
rk = dists[:, -1] # distance to k-th neighbor
z = 1.0 / (rk**2 + 1e-12) # 2D density proxy
z /= z.max()
z = np.log1p(alpha * z) / np.log1p(alpha)
return z
[docs]
def batch_compute_dipole(pos, batch, z, electronegativity_tensor):
"""
Compute a rough dipole moment for a flat batch of molecules, as the simple weighted sum of electronegativities.
Do this in a batch fashion assuming the input is a flat list of graphs, indexed by batch
Parameters
----------
pos : torch.tensor(n,3)
3d coordinates
batch : torch.tensor(n)
batch index
z : torch.tensor(n)
atom types
electronegativity_tensor
fixed electronegativities for each atom type
Returns
-------
dipole_moment : torch.tensor(num_graphs,3)
"""
centers_of_geometry = scatter(pos, batch, dim=0, reduce='mean')
centers_of_charge = scatter(electronegativity_tensor[z.long()][:, None] * pos, batch, dim=0, reduce='mean')
return centers_of_charge - centers_of_geometry
[docs]
def get_point_density(xy, bins=100, sigma=1.5, alpha=20):
x, y = xy
hist, x_e, y_e = np.histogram2d(x, y, bins=bins, density=True)
hist = gaussian_filter(hist, sigma=sigma, mode="nearest")
xc = 0.5 * (x_e[1:] + x_e[:-1])
yc = 0.5 * (y_e[1:] + y_e[:-1])
x_clipped = np.clip(x, xc[0], xc[-1])
y_clipped = np.clip(y, yc[0], yc[-1])
z = interpn((xc, yc), hist, np.vstack([x_clipped, y_clipped]).T,
bounds_error=False, fill_value=0.0)
# floor + log compression
z = np.clip(z, 1e-4 * z.max(), None)
z /= z.max()
z = np.log1p(alpha * z) / np.log1p(alpha)
return z
[docs]
def chunkify(lst: list, n: int):
"""
break up a list into n chunks of equal size (up to last chunk)
Parameters
----------
lst : list
n : int
Returns
-------
list_of_chunks : [n]
"""
return [lst[i::n] for i in range(n)]
[docs]
def torch_ptp(tensor: torch.tensor):
"""
torch implementation of np.ptp
Parameters
----------
tensor
Returns
-------
ptp : float
"""
return torch.max(tensor) - torch.min(tensor)
[docs]
def standardize_np(data: np.ndarray, return_standardization: bool = False, known_mean=None, known_std=None):
"""
standardize an input 1D array by subtracting mean and dividing by standard deviation
Parameters
----------
data : np.array
return_standardization : bool
return the mean and std
known_mean : float, optional
optionally use precomputed mean
known_std : float, optional
optionally use precomputed standard deviation
Returns
-------
std_data : np.array
mean and standard deviation : optional floats
"""
data = data.astype('float32')
if known_mean is not None:
mean = known_mean
else:
mean = np.mean(data)
if known_std is not None:
std = known_std
else:
std = np.std(data)
if std == 0:
std = 0.01 # if std == 0, data-mean will be all zeros. Doesn't matter what this value is.
std_data = (data - mean) / std
if return_standardization:
return std_data, mean, std
else:
return std_data
[docs]
def normalize_np(x: np.ndarray):
"""
normalize an input by its span
subtract min_x so that range is fixed on [0,1]
Parameters
----------
x : np.array
Returns
-------
normed_x : np.array
"""
normed_x = (x - np.amin(x)) / np.ptp(x)
return normed_x
[docs]
def softmax_np(x: np.ndarray, temperature: float = 1):
"""
softmax function implemented in numpy
Parameters
----------
x : np.array
temperature : float
softmax temperature
Returns
-------
softmax_x : np.array
"""
if x.ndim == 1:
x = x[None, :]
x = x.astype(float)
x -= x.max()
probabilities = np.exp(x / temperature) / (np.sum(np.exp(x / temperature), axis=1) + 1e-16)[:, None]
return probabilities
[docs]
def histogram_overlap_np(d1: np.ndarray, d2: np.ndarray):
"""
Compute the symmetric overlap of two histograms
Parameters
----------
d1 : np.array(n)
d2 : np.array(n)
Returns
-------
overlap : float
"""
return np.sum(np.minimum(d1, d2)) / np.average((d1.sum(), d2.sum()))
[docs]
def repeat_interleave(
repeats: List[int],
device: Optional[torch.device] = None,
):
"""
# todo why do we have this? There are builtin methods in torch.
# TODO replace and deprecate
Alternate implementation of torch.repeat_interleave
borrowed from torch_geometric.data.collate
Parameters
----------
repeats : list of ints
device : str or torch.device
Returns
-------
Repeated tensor which has the same shape as input, except along the given axis.
"""
outs = [torch.full((n,), i, device=device) for i, n in enumerate(repeats)]
return torch.cat(outs, dim=0)
[docs]
def namespace2dict(namespace_dict, higher_level=''):
"""
Convert a dict from an optionally nested namespace to an optionally nested dict.
Parameters
----------
namespace_dict : dict for the higher level namespace
higher_level : key for high level dict
Returns
-------
Dict matching higher level namespace
"""
copied_dict = copy(namespace_dict)
for key in copied_dict.keys():
if 'namespace' in str(type(copied_dict[key])).lower():
copied_dict[key] = namespace2dict(copied_dict[key].__dict__, higher_level=key)
else:
pass
return copied_dict
[docs]
def flatten_dict(dictionary, parent_key=False, separator='_'):
"""
From : https://stackoverflow.com/questions/6027558/flatten-nested-dictionaries-compressing-keys
Recursively convert a nested dictionary into a flattened dictionary
Parameters
----------
dictionary
parent_key
separator
Returns
-------
Dict with all nested dict flattened, with longer keys instead of nesting.
"""
items = []
for key, value in dictionary.items():
new_key = str(parent_key) + separator + key if parent_key else key
if isinstance(value, collections.abc.MutableMapping):
items.extend(flatten_dict(value, new_key, separator).items())
elif isinstance(value, list):
for k, v in enumerate(value):
items.extend(flatten_dict({str(k): v}, new_key).items())
else:
items.append((new_key, value))
return dict(items)
[docs]
def signed_log(y: Union[torch.tensor, np.ndarray]
) -> Union[torch.tensor, np.ndarray]:
if torch.is_tensor(y):
out = torch.sign(y) * torch.log(1 + torch.abs(y))
else:
out = np.sign(y) * np.log(1 + np.abs(y))
return out
[docs]
def parse_to_torch(array: Union[torch.Tensor, np.ndarray, list],
device: Union[torch.device, str],
dtype=torch.float32) -> torch.Tensor:
if torch.is_tensor(array):
return torch.tensor(array.clone().detach(), dtype=dtype, device=device)
elif isinstance(array, np.ndarray):
return torch.tensor(array, dtype=dtype, device=device)
elif isinstance(array, list):
if torch.is_tensor(array[0]):
return torch.cat(array, dim=0).to(device)
else:
return torch.tensor(array, dtype=dtype, device=device)
[docs]
def softplus_shift(x: torch.Tensor,
min_val: Optional[Union[float, torch.Tensor]] = 0.01,
beta: Optional[float] = 10) -> torch.Tensor:
return F.softplus(x - min_val, beta=beta) + min_val
[docs]
def invert_softplus_shift(y: torch.Tensor,
min_val: Optional[Union[float, torch.Tensor]] = 0.01,
beta: Optional[float] = 10) -> torch.Tensor:
return (1.0 / beta) * torch.log(torch.exp(beta * (y - min_val)) - 1.0) + min_val
[docs]
def scale_bh_energy(bh_energy, beta: Optional[float] = 100):
scaled_en = bh_energy.clone()
scaled_en[scaled_en > 0] = torch.log(1 + scaled_en[scaled_en > 0] / beta) * beta
return scaled_en
[docs]
def smooth_constraint(value: torch.Tensor, threshold: float, mode: 'str', hardness: float):
if mode == 'greater than':
return F.softplus(-(value - threshold), beta=hardness) ** 2
elif mode == 'less than':
return F.softplus(value - threshold, beta=hardness) ** 2
[docs]
def sample_triangular_right(n_samples, start, stop, device='cpu'):
"""
sample from the CDF of a uniform distribution
the right-aligned triangular distribution
"""
U = torch.rand(n_samples, device=device)
return start + (stop - start) * torch.sqrt(U)
[docs]
def siginv(x):
"""inverts the sigmoid function"""
return torch.log(x/(1-x))
[docs]
def get_plotly_fig_size_mb(fig) -> float:
# Convert Plotly figure to JSON string
fig_json = json.dumps(fig, cls=plotly.utils.PlotlyJSONEncoder)
return sys.getsizeof(fig_json) / (1024 * 1024)
[docs]
def block_repeat_interleave(
lengths: torch.Tensor,
repeats: torch.Tensor) -> torch.Tensor:
"""
Repeat ragged blocks of a flat tensor in parallel on GPU.
Args:
lengths : (num_blocks,) tensor of block lengths.
repeats : (num_blocks,) tensor of repeat counts per block.
Returns:
1D tensor with each block repeated blockwise.
"""
device = lengths.device
# start offset of each block in flat
offsets = torch.cumsum(torch.cat([torch.tensor([0], device=device), lengths[:-1]]), 0)
# output size contributed per block
block_out_sizes = lengths * repeats
total_out = block_out_sizes.sum()
# which block each output element belongs to
block_ids = torch.repeat_interleave(torch.arange(len(lengths), device=device),
block_out_sizes)
# output start offsets
out_offsets = torch.cumsum(torch.cat([torch.tensor([0], device=device),
block_out_sizes[:-1]]), 0)
# local position within block (resets with modulo!)
local_index = torch.arange(total_out, device=device) - out_offsets[block_ids]
pos_in_block = local_index % lengths[block_ids]
# map back to flat indices
src_ids = offsets[block_ids] + pos_in_block
return src_ids
[docs]
def log_rescale_positive(y: torch.Tensor,
cutoff: float = 0
):
return torch.where(
y > cutoff, # where above cutoff
cutoff + torch.log1p(y - cutoff), # return log(1+x)
y # else return x
)
[docs]
def is_cuda_oom(e: Exception) -> bool:
if isinstance(e, torch.cuda.OutOfMemoryError):
return True
s = str(e).lower()
return (
("cuda" in s and "memory" in s)
or ("cublas" in s and "alloc" in s)
or ("cusolver" in s and "alloc" in s)
or ("out of memory" in s)
or ("nonzero is not supported for tensors with more than int_max elements" in s)
)