# vim: set fileencoding=<utf-8> :
# Copyright 2018-2023 John Lees and Nick Croucher
'''Refine mixture model using network properties'''
# universal
import os
import sys
# additional
from itertools import chain
from functools import partial
import numpy as np
import scipy.optimize
import collections
from math import sqrt
from tqdm import tqdm
try:
from multiprocessing import Pool, shared_memory
from multiprocessing.managers import SharedMemoryManager
NumpyShared = collections.namedtuple('NumpyShared', ('name', 'shape', 'dtype'))
except ImportError as e:
sys.stderr.write("This version of PopPUNK requires python v3.8 or higher\n")
sys.exit(0)
import graph_tool.all as gt
import pandas as pd
# Load GPU libraries
try:
import cupyx
import cugraph
import cudf
import cupy as cp
from numba import cuda
import rmm
except ImportError:
pass
import poppunk_refine
from .__main__ import betweenness_sample_default
from .network import construct_network_from_df, printClusters
from .network import construct_network_from_edge_list
from .network import networkSummary
from .network import generate_cugraph
from .utils import transformLine
from .utils import decisionBoundary
from .utils import check_and_set_gpu
def refineFit(distMat, sample_names, mean0, mean1, scale,
              max_move, min_move, slope = 2, score_idx = 0,
              unconstrained = False, no_local = False, num_processes = 1,
              betweenness_sample = betweenness_sample_default, sample_size = None,
              use_gpu = False):
    """Try to refine a fit by maximising a network score based on transitivity and density.

    Iteratively move the decision boundary to do this, using starting point from existing model.
    A coarse grid search is run first (2D over both intercepts if ``unconstrained``,
    otherwise 1D along the line through ``mean0`` and ``mean1``), optionally followed
    by a bounded scalar optimisation around the best grid point.

    Args:
        distMat (numpy.array)
            n x 2 array of core and accessory distances for n samples
        sample_names (list)
            List of query sequence labels
        mean0 (numpy.array)
            Start point to define search line
        mean1 (numpy.array)
            End point to define search line
        scale (numpy.array)
            Scaling factor of distMat (only used when reporting the search range)
        max_move (float)
            Maximum distance to move away from start point
        min_move (float)
            Minimum distance to move away from start point
        slope (int)
            Set to 0 for a vertical line, 1 for a horizontal line, or
            2 to use a slope
        score_idx (int)
            Index of score from :func:`~PopPUNK.network.networkSummary` to use
            [default = 0]
        unconstrained (bool)
            If True, search in 2D and change the slope of the boundary
        no_local (bool)
            Turn off the local optimisation step.
            Quicker, but may be less well refined.
        num_processes (int)
            Number of threads to use in the global optimisation step.
            (default = 1)
        betweenness_sample (int)
            Number of sequences per component used to estimate betweenness using
            a GPU. Smaller numbers are faster but less precise
            [default = betweenness_sample_default]
        sample_size (int)
            Number of nodes to subsample for graph statistic calculation
        use_gpu (bool)
            Whether to use cugraph for graph analyses

    Returns:
        optimal_x (float)
            x-coordinate of refined fit
        optimal_y (float)
            y-coordinate of refined fit
        optimised_s (float)
            Position along search range of refined fit. NOTE: in the
            unconstrained case without a local step this is the best grid
            score (``global_s[min_idx]``), not a position.

    Raises:
        RuntimeError
            If ``unconstrained`` is combined with ``slope != 2``, if the search
            range gives a negative intercept, if the 1D range includes all
            points, or if the optimised boundary has a negative intercept.
    """
    # Optimize boundary - grid search for global minimum
    sys.stderr.write("Trying to optimise score globally\n")

    # Boundary is left of line normal to this point and first line
    gradient = (mean1[1] - mean0[1]) / (mean1[0] - mean0[0])

    if unconstrained:
        if slope != 2:
            raise RuntimeError("Unconstrained optimization and indiv-refine incompatible")

        global_grid_resolution = 20
        x_max_start, y_max_start = decisionBoundary(mean0, gradient, adj = -1*min_move)
        x_max_end, y_max_end = decisionBoundary(mean1, gradient, adj = max_move)
        if x_max_start < 0 or y_max_start < 0:
            raise RuntimeError("Boundary range below zero")

        x_max = np.linspace(x_max_start, x_max_end, global_grid_resolution, dtype=np.float32)
        y_max = np.linspace(y_max_start, y_max_end, global_grid_resolution, dtype=np.float32)
        sys.stderr.write("Searching core intercept from " +
                         "{:.3f}".format(x_max_start * scale[0]) +
                         " to " + "{:.3f}".format(x_max_end * scale[0]) + "\n")
        sys.stderr.write("Searching accessory intercept from " +
                         "{:.3f}".format(y_max_start * scale[1]) +
                         " to " + "{:.3f}".format(y_max_end * scale[1]) + "\n")

        # Arguments shared by the GPU and CPU grid searches; each call scores
        # one row of the grid (a single y intercept across all x intercepts)
        grid_row = partial(newNetwork2D,
                           sample_names = sample_names,
                           x_range = x_max,
                           y_range = y_max,
                           score_idx = score_idx,
                           betweenness_sample = betweenness_sample,
                           sample_size = sample_size)

        if use_gpu:
            global_s = map(partial(grid_row,
                                   distMat = distMat,
                                   use_gpu = True),
                           range(global_grid_resolution))
        else:
            # Parallelism comes from the process pool, so keep graph-tool
            # single threaded while the pool is running
            if gt.openmp_enabled():
                gt.openmp_set_num_threads(1)

            with SharedMemoryManager() as smm:
                # Copy the distances into shared memory so that workers
                # receive only a small descriptor rather than the matrix
                shm_distMat = smm.SharedMemory(size = distMat.nbytes)
                distances_shared_array = np.ndarray(distMat.shape,
                                                    dtype = distMat.dtype,
                                                    buffer = shm_distMat.buf)
                distances_shared_array[:] = distMat[:]
                distances_shared = NumpyShared(name = shm_distMat.name,
                                               shape = distMat.shape,
                                               dtype = distMat.dtype)

                with Pool(processes = num_processes) as pool:
                    global_s = pool.map(partial(grid_row,
                                                distMat = distances_shared,
                                                use_gpu = False),
                                        range(global_grid_resolution))

            if gt.openmp_enabled():
                gt.openmp_set_num_threads(num_processes)

        # Flatten the rows: x varies fastest, y slowest
        global_s = np.array(list(chain.from_iterable(global_s)))
        global_s[np.isnan(global_s)] = 1
        min_idx = np.argmin(global_s)
        optimal_x = x_max[min_idx % global_grid_resolution]
        optimal_y = y_max[min_idx // global_grid_resolution]
        optimised_s = global_s[min_idx]

        # An optimum on the edge of the grid cannot be bracketed for the
        # local search, so the local step is skipped
        if not (optimal_x > x_max_start and optimal_x < x_max_end and
                optimal_y > y_max_start and optimal_y < y_max_end):
            no_local = True
        elif not no_local:
            # We have a fixed gradient and want to optimise the intercept
            # This parameterisation is a little awkward to match the 1D case:
            # Make two points along the right slope
            gradient = optimal_x / optimal_y # of 1D search
            delta = x_max[1] - x_max[0]
            bounds = [-delta, delta]
            # Both points must be reset: the search line has to start at the
            # grid optimum's x intercept and follow the new gradient. Leaving
            # mean0 at its input value would search along a different line.
            mean0 = (optimal_x, 0)
            mean1 = (optimal_x + delta, delta * gradient)
    else:
        # Set the range of points to search
        search_length = max_move + ((mean1[0] - mean0[0])**2 + (mean1[1] - mean0[1])**2)**0.5
        global_grid_resolution = 40 # Seems to work
        s_range = np.linspace(-min_move, search_length, num = global_grid_resolution)
        (min_x, max_x), (min_y, max_y) = \
            check_search_range(scale, mean0, mean1, s_range[0], s_range[-1])
        if min_x < 0 or min_y < 0:
            raise RuntimeError("Boundary range below zero")

        i_vec, j_vec, idx_vec = \
            poppunk_refine.thresholdIterate1D(distMat, s_range, slope,
                                              mean0[0], mean0[1],
                                              mean1[0], mean1[1], num_processes)
        if len(idx_vec) == distMat.shape[0]:
            raise RuntimeError("Boundary range includes all points")

        global_s = np.array(growNetwork(sample_names,
                                        i_vec,
                                        j_vec,
                                        idx_vec,
                                        s_range,
                                        score_idx,
                                        betweenness_sample = betweenness_sample,
                                        sample_size = sample_size,
                                        use_gpu = use_gpu))
        global_s[np.isnan(global_s)] = 1
        min_idx = np.argmin(np.array(global_s))
        # The local search is bracketed by the neighbouring grid points, so
        # it is only possible when the optimum is not at either end
        if min_idx > 0 and min_idx < len(s_range) - 1:
            bounds = [s_range[min_idx-1], s_range[min_idx+1]]
        else:
            no_local = True
        if no_local:
            optimised_s = s_range[min_idx]

    # Local optimisation around global optimum
    if not no_local:
        sys.stderr.write("Trying to optimise score locally\n")
        local_s = scipy.optimize.minimize_scalar(
            newNetwork,
            bounds = bounds,
            method = 'Bounded', options={'disp': True},
            args = (sample_names, distMat, mean0, mean1, gradient,
                    slope, score_idx, num_processes,
                    betweenness_sample, sample_size, use_gpu)
        )
        optimised_s = local_s.x

    # Convert to x_max, y_max if needed
    # (the unconstrained grid search without a local step already has them)
    if not unconstrained or not no_local:
        optimised_coor = transformLine(optimised_s, mean0, mean1)
        if slope == 2:
            optimal_x, optimal_y = decisionBoundary(optimised_coor, gradient)
            if optimal_x < 0 or optimal_y < 0:
                raise RuntimeError("Optimisation failed: produced a boundary outside of allowed range\n")
        else:
            optimal_x = optimised_coor[0]
            optimal_y = optimised_coor[1]
            if (slope == 0 and optimal_x < 0) or (slope == 1 and optimal_y < 0):
                raise RuntimeError("Optimisation failed: produced a boundary outside of allowed range\n")

    return optimal_x, optimal_y, optimised_s
def multi_refine(distMat, sample_names, mean0, mean1, scale, s_max,
                 n_boundary_points, output_prefix, num_processes = 1,
                 betweenness_sample = betweenness_sample_default, sample_size = None,
                 use_gpu = False):
    """Move the refinement boundary between the optimum and where it meets an
    axis. Discrete steps, output the clusters at each step

    Args:
        distMat (numpy.array)
            n x 2 array of core and accessory distances for n samples
        sample_names (list)
            List of query sequence labels
        mean0 (numpy.array)
            Start point to define search line
        mean1 (numpy.array)
            End point to define search line
        scale (numpy.array)
            Scaling factor of distMat
        s_max (float)
            The optimal s position from refinement (:func:`~PopPUNK.refine.refineFit`)
        n_boundary_points (int)
            Number of positions to try drawing the boundary at
        output_prefix (str)
            Passed to :func:`growNetwork` as ``write_clusters``: the prefix
            used for the cluster files written at each boundary position
        num_processes (int)
            Number of threads to use in the global optimisation step.
            (default = 1)
        betweenness_sample (int)
            Number of sequences per component used to estimate betweenness using
            a GPU. Smaller numbers are faster but less precise
            [default = betweenness_sample_default]
        sample_size (int)
            Number of nodes to subsample for graph statistic calculation
        use_gpu (bool)
            Whether to use cugraph for graph analyses
    """
    # Set the range
    # Between optimised s and where line meets an axis
    gradient = (mean1[1] - mean0[1]) / (mean1[0] - mean0[0])
    if mean0[1] >= gradient * mean0[0]:
        # Moving back from mean0, the line reaches x = 0 first
        s_min = -mean0[0] * sqrt(1 + gradient * gradient)
    else:
        # Otherwise it reaches y = 0 first
        s_min = -mean0[1] * sqrt(1 + 1 / (gradient * gradient))
    s_range = np.linspace(s_min, s_max, num = n_boundary_points)

    (min_x, max_x), (min_y, max_y) = \
        check_search_range(scale, mean0, mean1, s_range[0], s_range[-1])
    if min_x < 0 or min_y < 0:
        # Warn only (no exception is raised here); terminate the line so the
        # message is not merged with the output that follows
        sys.stderr.write("Boundary range below zero\n")

    i_vec, j_vec, idx_vec = \
        poppunk_refine.thresholdIterate1D(distMat, s_range, 2,
                                          mean0[0], mean0[1],
                                          mean1[0], mean1[1],
                                          num_processes)

    # The scores returned are not needed; the call is made for the cluster
    # files written at each boundary position
    growNetwork(sample_names,
                i_vec,
                j_vec,
                idx_vec,
                s_range,
                0,
                write_clusters = output_prefix,
                betweenness_sample = betweenness_sample,
                sample_size = sample_size,
                use_gpu = use_gpu)
def check_search_range(scale, mean0, mean1, lower_s, upper_s):
    """Report the extent of a search range and return its boundary intercepts

    The two ends of the range are located along the line through ``mean0``
    and ``mean1``, converted to boundary intercepts, and the range is written
    to stderr after multiplying by ``scale``.

    Args:
        scale (np.array)
            Rescaling factor to [0, 1] for each axis
        mean0 (np.array)
            (x, y) of starting point defining line
        mean1 (np.array)
            (x, y) of end point defining line
        lower_s (float)
            distance along line to start search
        upper_s (float)
            distance along line to end search

    Returns:
        min_x, max_x
            minimum and maximum x-intercepts of the search range
        min_y, max_y
            minimum and maximum y-intercepts of the search range
    """
    def three_dp(value):
        # Fixed three decimal place rendering used in every message
        return f"{value:.3f}"

    line_gradient = (mean1[1] - mean0[1]) / (mean1[0] - mean0[0])

    # Points on the search line at either end of the range
    range_start = transformLine(lower_s, mean0, mean1)
    range_stop = transformLine(upper_s, mean0, mean1)

    # Axis intercepts of the boundary drawn through each end point
    min_x, min_y = decisionBoundary(range_start, line_gradient)
    max_x, max_y = decisionBoundary(range_stop, line_gradient)

    start_text = ",".join([three_dp(coord) for coord in range_start * scale])
    stop_text = ",".join([three_dp(coord) for coord in range_stop * scale])
    sys.stderr.write("Search range (" + start_text + ") to (" + stop_text + ")\n")

    core_low = three_dp(min_x * scale[0])
    core_high = three_dp(max_x * scale[0])
    sys.stderr.write("Searching core intercept from " + core_low +
                     " to " + core_high + "\n")

    accessory_low = three_dp(min_y * scale[1])
    accessory_high = three_dp(max_y * scale[1])
    sys.stderr.write("Searching accessory intercept from " + accessory_low +
                     " to " + accessory_high + "\n")

    x_limits = (min_x, max_x)
    y_limits = (min_y, max_y)
    return (x_limits, y_limits)
def expand_cugraph_network(G, G_extra_df):
    """Reconstruct a cugraph network with additional edges.

    The existing edge list is extracted, the new edges are appended, and a
    fresh graph is generated from the combined list.

    Args:
        G (cugraph network)
            Original cugraph network
        G_extra_df (cudf dataframe)
            Data frame of edges to add

    Returns:
        G (cugraph network)
            Expanded cugraph network
    """
    # One less than the current vertex count is what gets handed to
    # generate_cugraph as its second argument
    vertex_count_less_one = G.number_of_vertices() - 1

    current_edges = G.view_edge_list()
    # Rename the columns when the edge list uses 'src' so that it lines up
    # with the 'source'/'destination' naming of the extra edges
    if 'src' in current_edges.columns:
        current_edges.columns = ['source', 'destination']

    all_edges = cudf.concat([current_edges, G_extra_df])
    return generate_cugraph(all_edges,
                            vertex_count_less_one,
                            weights = False,
                            renumber = False)
def growNetwork(sample_names, i_vec, j_vec, idx_vec, s_range, score_idx = 0,
                thread_idx = 0, betweenness_sample = betweenness_sample_default,
                write_clusters = None, sample_size = None, use_gpu = False):
    """Construct a network, then add edges to it iteratively.

    In this module the input comes from ``poppunk_refine.thresholdIterate1D``
    or ``poppunk_refine.thresholdIterate2D``

    Args:
        sample_names (list)
            Sample names corresponding to distMat (accessed by iterator)
        i_vec (list)
            Ordered ref vertex index to add
        j_vec (list)
            Ordered query (==ref) vertex index to add
        idx_vec (list)
            For each i, j tuple, the index of the intercept at which these enter
            the network. These are sorted and increasing
        s_range (list)
            Offsets which correspond to idx_vec entries
            (not read by this function)
        score_idx (int)
            Index of score from :func:`~PopPUNK.network.networkSummary` to use
            [default = 0]
        thread_idx (int)
            Optional thread idx (if multithreaded) to offset progress bar by
        betweenness_sample (int)
            Number of sequences per component used to estimate betweenness using
            a GPU. Smaller numbers are faster but less precise
            [default = betweenness_sample_default]
        write_clusters (str)
            Set to a prefix to write the clusters from each position to files
            [default = None]
        sample_size (int)
            Number of nodes to subsample for graph statistic calculation
        use_gpu (bool)
            Whether to use cugraph for graph analyses

    Returns:
        scores (list)
            -1 * network score, one entry for every boundary index between
            the previously processed index and the current one.
            Where network score is from :func:`~PopPUNK.network.networkSummary`
    """
    # Table of every edge alongside the boundary index at which it is added
    frame_type = cudf.DataFrame if use_gpu else pd.DataFrame
    all_edges = frame_type()
    all_edges['source'] = i_vec
    all_edges['destination'] = j_vec
    all_edges['idx_list'] = idx_vec

    # Distinct boundary indices, in order of first appearance
    index_column = all_edges.to_pandas().idx_list if use_gpu else all_edges.idx_list
    boundary_indices = index_column.unique()

    scores = []
    last_done = -1
    edge_columns = ['source', 'destination']

    # Grow a network
    with tqdm(total = max(boundary_indices) + 1,
              bar_format = "{bar}| {n_fmt}/{total_fmt}",
              ncols = 40,
              position = thread_idx) as progress:
        for boundary in boundary_indices:
            # Edges entering the network at this boundary
            entering = all_edges.loc[(all_edges['idx_list'] == boundary), edge_columns]

            if last_done == -1:
                # First boundary: build the network from scratch
                G = construct_network_from_df(sample_names, sample_names,
                                              entering,
                                              summarise = False,
                                              use_gpu = use_gpu)
            elif use_gpu:
                # cugraph networks are rebuilt with the extra edges
                G = expand_cugraph_network(G, entering)
            else:
                # graph-tool networks are extended in place
                G.add_edge_list(
                    list(entering[edge_columns].itertuples(index = False, name = None)))

            summary = networkSummary(G,
                                     score_idx > 0,
                                     betweenness_sample = betweenness_sample,
                                     subsample = sample_size,
                                     use_gpu = use_gpu)
            negated_score = -summary[1][score_idx]

            # Only write clusters when there is at least one non-trivial cluster
            should_write = write_clusters and summary[0][0] < len(sample_names)

            # Add score into vector for any offsets passed (should usually just be one)
            for s in range(last_done, boundary):
                scores.append(negated_score)
                progress.update(1)
                if should_write:
                    cluster_prefix = \
                        f"{write_clusters}/{os.path.basename(write_clusters)}_boundary{s + 1}"
                    printClusters(G,
                                  sample_names,
                                  outPrefix=cluster_prefix,
                                  write_unwords=False,
                                  use_gpu=use_gpu)

            last_done = boundary

    return scores
def newNetwork(s, sample_names, distMat, mean0, mean1, gradient,
               slope=2, score_idx=0, cpus=1, betweenness_sample = betweenness_sample_default,
               sample_size = None, use_gpu = False):
    """Wrapper function for :func:`~PopPUNK.network.construct_network_from_edge_list` which is called
    by optimisation functions moving a triangular decision boundary.

    Given the boundary parameterisation, constructs the network and returns
    its score, to be minimised.

    Args:
        s (float)
            Distance along line between start_point and mean1 from start_point
        sample_names (list)
            Sample names corresponding to distMat (accessed by iterator)
        distMat (numpy.array or NumpyShared)
            Core and accessory distances or NumpyShared describing these in sharedmem
        mean0 (numpy.array)
            Start point
        mean1 (numpy.array)
            End point
        gradient (float)
            Gradient of line to move along
        slope (int)
            Set to 0 for a vertical line, 1 for a horizontal line, or
            2 to use a slope
            [default = 2]
        score_idx (int)
            Index of score from :func:`~PopPUNK.network.networkSummary` to use
            [default = 0]
        cpus (int)
            Number of CPUs to use for calculating assignment
            (accepted for the optimiser's argument tuple; not read here)
        betweenness_sample (int)
            Number of sequences per component used to estimate betweenness using
            a GPU. Smaller numbers are faster but less precise
            [default = betweenness_sample_default]
        sample_size (int)
            Number of nodes to subsample for graph statistic calculation
        use_gpu (bool)
            Whether to use cugraph for graph analysis

    Returns:
        score (float)
            -1 * network score. Where network score is from :func:`~PopPUNK.network.networkSummary`

    Raises:
        RuntimeError
            If ``slope`` is not 0, 1 or 2
    """
    # Attach to the shared memory block when given a descriptor rather than
    # the array itself
    if isinstance(distMat, NumpyShared):
        distMat_shm = shared_memory.SharedMemory(name = distMat.name)
        distMat = np.ndarray(distMat.shape, dtype = distMat.dtype, buffer = distMat_shm.buf)

    # Set up boundary
    new_intercept = transformLine(s, mean0, mean1)
    if slope == 2:
        x_max, y_max = decisionBoundary(new_intercept, gradient)
    elif slope == 0:
        x_max = new_intercept[0]
        y_max = 0
    elif slope == 1:
        x_max = 0
        y_max = new_intercept[1]
    else:
        # Previously fell through and failed later with an UnboundLocalError
        raise RuntimeError("slope must be 0, 1 or 2 (got " + str(slope) + ")")

    # Make network
    connections = poppunk_refine.edgeThreshold(distMat, slope, x_max, y_max)
    G = construct_network_from_edge_list(sample_names,
                                         sample_names,
                                         connections,
                                         summarise = False,
                                         use_gpu = use_gpu)

    # Return score (negated, as the callers minimise)
    score = networkSummary(G,
                           score_idx > 0,
                           subsample = sample_size,
                           betweenness_sample = betweenness_sample,
                           use_gpu = use_gpu)[1][score_idx]
    return -score
def newNetwork2D(y_idx, sample_names, distMat, x_range, y_range, score_idx=0,
                 betweenness_sample = betweenness_sample_default, sample_size = None,
                 use_gpu = False):
    """Wrapper function for thresholdIterate2D and :func:`growNetwork`.

    For a given y_max, constructs networks across x_range and returns a list
    of scores

    Args:
        y_idx (int)
            Index into y_range of the maximum y-intercept of the boundary
        sample_names (list)
            Sample names corresponding to distMat (accessed by iterator)
        distMat (numpy.array or NumpyShared)
            Core and accessory distances or NumpyShared describing these in sharedmem
        x_range (list)
            Sorted list of x-intercepts to search
        y_range (list)
            Sorted list of y-intercepts to search
        score_idx (int)
            Index of score from :func:`~PopPUNK.network.networkSummary` to use
            [default = 0]
        betweenness_sample (int)
            Number of sequences per component used to estimate betweenness using
            a GPU. Smaller numbers are faster but less precise
            [default = betweenness_sample_default]
        sample_size (int)
            Number of nodes to subsample for graph statistic calculation
        use_gpu (bool)
            Whether to use cugraph for graph analysis

    Returns:
        scores (list)
            -1 * network score for each of x_range.
            Where network score is from :func:`~PopPUNK.network.networkSummary`
    """
    # Keep graph-tool single threaded inside this call
    if gt.openmp_enabled():
        gt.openmp_set_num_threads(1)

    # Rebuild the array view when handed a shared memory descriptor
    if isinstance(distMat, NumpyShared):
        distMat_shm = shared_memory.SharedMemory(name = distMat.name)
        distMat = np.ndarray(distMat.shape, dtype = distMat.dtype, buffer = distMat_shm.buf)

    row_y_max = y_range[y_idx]
    i_vec, j_vec, idx_vec = \
        poppunk_refine.thresholdIterate2D(distMat, x_range, row_y_max)

    # If everything is in the network, skip this boundary
    everything_included = len(idx_vec) == distMat.shape[0]
    if everything_included:
        return [0] * len(x_range)

    # y_idx doubles as the progress bar offset for this row
    return growNetwork(sample_names,
                       i_vec,
                       j_vec,
                       idx_vec,
                       x_range,
                       score_idx = score_idx,
                       thread_idx = y_idx,
                       betweenness_sample = betweenness_sample,
                       sample_size = sample_size,
                       use_gpu = use_gpu)
def readManualStart(startFile):
    """Reads a file to define a manual start point, rather than using ``--fit-model``

    Each non-blank line holds a parameter name and a value separated by
    whitespace. ``start`` and ``end`` take a comma separated pair of numbers;
    ``scaled`` turns scaling off when its value is ``False`` or ``false``.
    Blank lines are ignored.

    Throws and exits if incorrectly formatted.

    Args:
        startFile (str)
            Name of file with values to read

    Returns:
        mean0 (numpy.array)
            Centre of within-strain distribution
        mean1 (numpy.array)
            Centre of between-strain distribution
        scaled (bool)
            True if means are scaled between [0,1]

    Raises:
        RuntimeError
            If a line names an unknown parameter
        ValueError
            If a line does not have exactly two fields, or a value is not
            numeric
        SystemExit
            With code 1 if start or end is missing, has the wrong number of
            values, or has a value outside [0, 1]
    """
    mean0 = None
    mean1 = None
    scaled = True

    with open(startFile, 'r') as start:
        for line in start:
            fields = line.split()
            # Tolerate empty lines (e.g. a trailing newline at the end of the
            # file) instead of failing while unpacking them
            if not fields:
                continue
            (param, value) = fields
            if param == 'start':
                mean0 = np.array([float(mean_val) for mean_val in value.split(',')])
            elif param == 'end':
                mean1 = np.array([float(mean_val) for mean_val in value.split(',')])
            elif param == 'scaled':
                if value in ("False", "false"):
                    scaled = False
            else:
                raise RuntimeError("Incorrectly formatted manual start file")

    try:
        if not isinstance(mean0, np.ndarray) or not isinstance(mean1, np.ndarray):
            raise RuntimeError('Must set both start and end')
        if mean0.shape != (2,) or mean1.shape != (2,):
            raise RuntimeError('Wrong size for values')
        check_vals = np.hstack([mean0, mean1])
        if np.any((check_vals > 1) | (check_vals < 0)):
            raise RuntimeError('Value out of range (between 0 and 1)')
    except RuntimeError as e:
        sys.stderr.write("Could not read manual start file " + startFile + "\n")
        sys.stderr.write(str(e) + "\n")
        sys.exit(1)

    return mean0, mean1, scaled
def likelihoodBoundary(s, model, start, end, within, between):
    """Difference in component responsibilities at a point along a line

    Evaluates ``model.assign`` at the point a distance ``s`` along the line
    from ``start`` towards ``end``, so that it can go into a root-finding
    function for probabilities between components

    Args:
        s (float)
            Distance along line from mean0
        model (BGMMFit)
            Fitted mixture model
        start (numpy.array)
            The co-ordinates of the centre of the within-strain distribution
        end (numpy.array)
            The co-ordinates of the centre of the between-strain distribution
        within (int)
            Label of the within-strain distribution
        between (int)
            Label of the between-strain distribution

    Returns:
        responsibility (float)
            The difference between responsibilities of assignment to the within component
            and the between assignment
    """
    # The model is queried with a single-row 2D array
    point_on_line = transformLine(s, start, end)
    query = point_on_line.reshape(1, -1)

    assigned = model.assign(query, progress = False, values = True)
    within_value = assigned[0, within]
    between_value = assigned[0, between]
    return within_value - between_value