Source code for PopPUNK.models

# vim: set fileencoding=utf-8 :
# Copyright 2018-2023 John Lees and Nick Croucher

'''Classes used for model fits'''

# universal
import os
import sys
# additional
import numpy as np
import random
import pickle
import shutil
import re
from sklearn import utils
import scipy.optimize
from scipy import stats
import scipy.sparse
import hdbscan

# Parallel support
from tqdm import tqdm
from tqdm.contrib.concurrent import thread_map, process_map
from functools import partial
import collections
try:
    from multiprocessing import Pool, RLock, shared_memory
    from multiprocessing.managers import SharedMemoryManager
    NumpyShared = collections.namedtuple('NumpyShared', ('name', 'shape', 'dtype'))
except ImportError as e:
    sys.stderr.write("This version of PopPUNK requires python v3.8 or higher\n")
    sys.exit(0)

# Load GPU libraries
try:
    import cupyx
    import cugraph
    import cudf
    import cupy as cp
    from numba import cuda
    import rmm
    from cuml import cluster
except ImportError:
    pass

import pp_sketchlib
import poppunk_refine

from .__main__ import betweenness_sample_default

from .utils import set_env
from .utils import check_and_set_gpu

# BGMM
from .bgmm import fit2dMultiGaussian
from .bgmm import findWithinLabel
from .bgmm import findBetweenLabel_bgmm
from .bgmm import log_likelihood
from .plot import plot_results
from .plot import plot_contours

# DBSCAN
from .dbscan import fitDbScan
from .dbscan import findBetweenLabel
from .dbscan import evaluate_dbscan_clusters
from .plot import plot_dbscan_results

# refine
from .refine import refineFit, multi_refine
from .refine import readManualStart
from .plot import plot_refined_results

# lineage
from .plot import distHistogram
epsilon = 1e-10

# Format for rank fits
def rankFile(rank):
    return('_rank_' + str(rank) + '_fit.npz')

def loadClusterFit(pkl_file, npz_file, outPrefix = "", max_samples = 100000,
                   use_gpu = False):
    '''Call this to load a fitted model

    Args:
        pkl_file (str)
            Location of saved .pkl file on disk
        npz_file (str)
            Location of saved .npz file on disk
        outPrefix (str)
            Output prefix for model to save to (e.g. plots)
        max_samples (int)
            Maximum samples if subsampling X

            [default = 100000]
        use_gpu (bool)
            Whether to load npz file with GPU libraries for lineage models

    Returns:
        load_obj (model)
            Loaded model
    '''
    with open(pkl_file, 'rb') as pickle_obj:
        fit_object, fit_type = pickle.load(pickle_obj)

    if fit_type == 'lineage':
        # Can't save multiple sparse matrices to the same file, so do some
        # file name processing
        fit_file = os.path.basename(pkl_file)
        prefix = re.match(r"^(.+)_fit\.pkl$", fit_file)
        rank_file = os.path.dirname(pkl_file) + "/" + \
                    prefix.group(1) + '_sparse_dists.npz'
        fit_data = scipy.sparse.load_npz(rank_file)
    else:
        fit_data = np.load(npz_file)

    if fit_type == "bgmm":
        sys.stderr.write("Loading BGMM 2D Gaussian model\n")
        load_obj = BGMMFit(outPrefix, max_samples)
    elif fit_type == "dbscan":
        sys.stderr.write("Loading DBSCAN model\n")
        load_obj = DBSCANFit(outPrefix, max_samples = max_samples)
    elif fit_type == "refine":
        sys.stderr.write("Loading previously refined model\n")
        load_obj = RefineFit(outPrefix)
    elif fit_type == "lineage":
        sys.stderr.write("Loading lineage cluster model\n")
        load_obj = LineageFit(outPrefix, *fit_object)
    else:
        raise RuntimeError("Undefined model type: " + str(fit_type))
    load_obj.load(fit_data, fit_object)
    sys.stderr.write("Completed model loading\n")
    return load_obj

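# Example (illustrative sketch, not part of the module): reloading a fit written by
# a previous run. The "example_db" prefix and file names are hypothetical; the .pkl
# and .npz produced by a model's save() method share the same prefix.
#
#   from PopPUNK.models import loadClusterFit
#
#   model = loadClusterFit("example_db/example_db_fit.pkl",
#                          "example_db/example_db_fit.npz",
#                          outPrefix="example_db")
#   print(model.type, model.fitted)
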
def assign_samples(chunk, X, y, model, scale, chunk_size, values = False):
    """Runs a model's assignment on a chunk of input

    Args:
        chunk (int)
            Index of chunk to process
        X (NumpyShared)
            n x 2 array of core and accessory distances for n samples
        y (NumpyShared)
            An n-vector to store results, with the most likely cluster memberships
            or an n by k matrix with the component responsibilities for each sample.
        model (BGMMFit or DBSCANFit)
            The fitted model used to assign each sample
        scale (numpy.array)
            Scaling of core and accessory distances from :class:`~PopPUNK.models.BGMMFit`
        chunk_size (int)
            Size of each chunk in X
        values (bool)
            Whether to return the responsibilities, rather than the most
            likely assignment (used for entropy calculation).

            Default is False
    """
    # Make sure this is run single threaded
    with set_env(MKL_NUM_THREADS='1', NUMEXPR_NUM_THREADS='1', OMP_NUM_THREADS='1'):
        if isinstance(X, NumpyShared):
            X_shm = shared_memory.SharedMemory(name = X.name)
            X = np.ndarray(X.shape, dtype = X.dtype, buffer = X_shm.buf)
        if isinstance(y, NumpyShared):
            y_shm = shared_memory.SharedMemory(name = y.name)
            y = np.ndarray(y.shape, dtype = y.dtype, buffer = y_shm.buf)

        start = chunk * chunk_size
        end = min((chunk + 1) * chunk_size, X.shape[0])
        if start >= end:
            raise RuntimeError("start >= end in BGMM assign")

        if isinstance(model, BGMMFit):
            logprob, lpr = log_likelihood(X[start:end, :], model.weights,
                                          model.means, model.covariances, scale)
            responsibilities = np.exp(lpr - logprob[:, np.newaxis])
            # Default to return the most likely cluster
            if values == False:
                y[start:end] = responsibilities.argmax(axis=1)
            # Can return the actual responsibilities
            else:
                y[start:end, :] = responsibilities
        elif isinstance(model, DBSCANFit):
            y[start:end] = hdbscan.approximate_predict(model.hdb,
                                                       X[start:end, :]/scale)[0]

class ClusterFit:
    '''Parent class for all models used to cluster distances

    Args:
        outPrefix (str)
            The output prefix used for reading/writing
    '''

    def __init__(self, outPrefix, default_dtype = np.float32):
        self.outPrefix = outPrefix
        if outPrefix != "" and not os.path.isdir(outPrefix):
            try:
                os.makedirs(outPrefix)
            except OSError:
                sys.stderr.write("Cannot create output directory " + outPrefix + "\n")
                sys.exit(1)

        self.fitted = False
        self.indiv_fitted = False
        self.default_dtype = default_dtype
        self.threads = 1

    def set_threads(self, threads):
        self.threads = threads

    def fit(self, X = None):
        '''Initial steps for all fit functions.

        Creates output directory. If preprocess is set then subsamples passed X

        Args:
            X (numpy.array)
                The core and accessory distances to cluster. Must be set if
                preprocess is set.

                (default = None)
            default_dtype (numpy dtype)
                Type to use if no X provided
        '''
        # set output dir
        if not os.path.isdir(self.outPrefix):
            if not os.path.isfile(self.outPrefix):
                os.makedirs(self.outPrefix)
            else:
                sys.stderr.write(self.outPrefix + " already exists as a file! Use a different --output\n")
                sys.exit(1)

        if X is not None:
            self.default_dtype = X.dtype

        # preprocess subsampling
        if self.preprocess:
            if X.shape[0] > self.max_samples:
                self.subsampled_X = utils.shuffle(X, random_state=random.randint(1,10000))[0:self.max_samples,]
            else:
                self.subsampled_X = np.copy(X)

            # perform scaling
            self.scale = np.amax(self.subsampled_X, axis = 0)
            self.subsampled_X /= self.scale

    def plot(self, X=None):
        '''Initial steps for all plot functions.

        Ensures model has been fitted.

        Args:
            X (numpy.array)
                The core and accessory distances to subsample.

                (default = None)
        '''
        if not self.fitted:
            raise RuntimeError("Trying to plot unfitted model")

    def no_scale(self):
        '''Turn off scaling (useful for refine, where optimization
        is done in the scaled space).
        '''
        self.scale = np.array([1, 1], dtype = self.default_dtype)

    def copy(self, prefix):
        """Copy the model to a new directory
        """
        self.outPrefix = prefix
        self.save()

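# Illustrative sketch (not part of the module): the preprocessing performed by
# ClusterFit.fit() when `preprocess` is True, shown standalone. Distances are
# subsampled to at most `max_samples` rows and divided by the column maxima, so
# core and accessory distances both lie in [0, 1] during fitting. The array X here
# is a hypothetical stand-in for real PopPUNK distances.
#
#   import numpy as np
#   from sklearn import utils
#
#   X = np.random.rand(250000, 2)             # hypothetical core/accessory distances
#   max_samples = 100000
#   subsampled_X = utils.shuffle(X, random_state=1)[0:max_samples, ]
#   scale = np.amax(subsampled_X, axis=0)
#   subsampled_X /= scale                      # same scale is reused by assign()
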
class BGMMFit(ClusterFit):
    '''Class for fits using the Gaussian mixture model. Inherits from :class:`ClusterFit`.

    Must first run either :func:`~BGMMFit.fit` or :func:`~BGMMFit.load` before calling
    other functions

    Args:
        outPrefix (str)
            The output prefix used for reading/writing
        max_samples (int)
            The number of subsamples to fit the model to

            (default = 100000)
    '''

    def __init__(self, outPrefix, max_samples = 100000, max_batch_size = 100000,
                 assign_points = True):
        ClusterFit.__init__(self, outPrefix)
        self.type = 'bgmm'
        self.preprocess = True
        self.max_samples = max_samples
        self.max_batch_size = max_batch_size
        self.assign_points = assign_points

    def fit(self, X, max_components):
        '''Extends :func:`~ClusterFit.fit`

        Fits the BGMM and returns assignments by calling
        :func:`~PopPUNK.bgmm.fit2dMultiGaussian`. Fitted parameters are stored in
        the object.

        Args:
            X (numpy.array)
                The core and accessory distances to cluster. Must be set if
                preprocess is set.
            max_components (int)
                Maximum number of mixture components to use.

        Returns:
            y (numpy.array)
                Cluster assignments of samples in X
        '''
        ClusterFit.fit(self, X)
        self.dpgmm = fit2dMultiGaussian(self.subsampled_X, max_components)
        self.weights = self.dpgmm.weights_
        self.means = self.dpgmm.means_
        self.covariances = self.dpgmm.covariances_
        self.fitted = True

        # Allow for partial fitting that only assigns the subsample not the full set
        if self.assign_points:
            y = self.assign(X, max_batch_size = self.max_batch_size)
        else:
            y = self.assign(self.subsampled_X, max_batch_size = self.max_batch_size)
        self.within_label = findWithinLabel(self.means, y)
        self.between_label = findBetweenLabel_bgmm(self.means, y)
        return y

    def save(self):
        '''Save the model to disk, as an npz and pkl (using outPrefix).'''
        if not self.fitted:
            raise RuntimeError("Trying to save unfitted model")
        else:
            np.savez(self.outPrefix + "/" + os.path.basename(self.outPrefix) + '_fit.npz',
                     weights=self.weights,
                     means=self.means,
                     covariances=self.covariances,
                     within=self.within_label,
                     between=self.between_label,
                     scale=self.scale)
            with open(self.outPrefix + "/" + os.path.basename(self.outPrefix) + '_fit.pkl', 'wb') as pickle_file:
                pickle.dump([self.dpgmm, self.type], pickle_file)

    def load(self, fit_npz, fit_obj):
        '''Load the model from disk. Called from :func:`~loadClusterFit`

        Args:
            fit_npz (dict)
                Fit npz opened with :func:`numpy.load`
            fit_obj (sklearn.mixture.BayesianGaussianMixture)
                The saved fit object
        '''
        self.dpgmm = fit_obj
        self.weights = fit_npz['weights']
        self.means = fit_npz['means']
        self.covariances = fit_npz['covariances']
        self.scale = fit_npz['scale']
        self.within_label = fit_npz['within'].item()
        self.between_label = fit_npz['between'].item()
        self.fitted = True

    def plot(self, X, y):
        '''Extends :func:`~ClusterFit.plot`

        Write a summary of the fit, and plot the results using
        :func:`PopPUNK.plot.plot_results` and :func:`PopPUNK.plot.plot_contours`

        Args:
            X (numpy.array)
                Core and accessory distances
            y (numpy.array)
                Cluster assignments from :func:`~BGMMFit.assign`
        '''
        ClusterFit.plot(self, X)
        # Generate a subsampling if one was not used in the fit
        if not hasattr(self, 'subsampled_X'):
            self.subsampled_X = utils.shuffle(X, random_state=random.randint(1,10000))[0:self.max_samples,]

        y_subsample = self.assign(self.subsampled_X,
                                  max_batch_size = self.max_batch_size,
                                  values=True,
                                  progress=False)
        avg_entropy = np.mean(np.apply_along_axis(stats.entropy, 1, y_subsample))
        used_components = np.unique(y).size

        sys.stderr.write("Fit summary:\n" + "\n".join(["\tAvg. entropy of assignment\t" + "{:.4f}".format(avg_entropy),
                                                       "\tNumber of components used\t" + str(used_components)]) + "\n\n")
        sys.stderr.write("Scaled component means:\n")
        for centre in self.means:
            sys.stderr.write("\t" + str(centre) + "\n")
        sys.stderr.write("\n")

        title = "DPGMM – estimated number of spatial clusters: " + str(len(np.unique(y)))
        outfile = self.outPrefix + "/" + os.path.basename(self.outPrefix) + "_DPGMM_fit"

        plot_results(X, y, self.means, self.covariances, self.scale, title, outfile)
        plot_contours(self, y, title + " assignment boundary", outfile + "_contours")

    def assign(self, X, max_batch_size = 100000, values = False, progress = True):
        '''Assign the clustering of new samples using :func:`~PopPUNK.bgmm.assign_samples`

        Args:
            X (numpy.array)
                Core and accessory distances
            values (bool)
                Return the responsibilities of assignment rather than most likely cluster
            max_batch_size (int)
                Size of batches to be assigned
            progress (bool)
                Show progress bar

                [default = True]

        Returns:
            y (numpy.array)
                Cluster assignments or values by samples
        '''
        if not self.fitted:
            raise RuntimeError("Trying to assign using an unfitted model")
        else:
            if progress:
                sys.stderr.write("Assigning distances with BGMM model\n")
            if values:
                y = np.zeros((X.shape[0], len(self.weights)), dtype=X.dtype)
            else:
                y = np.zeros(X.shape[0], dtype=int)
            block_size = max_batch_size
            with SharedMemoryManager() as smm:
                shm_X = smm.SharedMemory(size = X.nbytes)
                X_shared_array = np.ndarray(X.shape, dtype = X.dtype, buffer = shm_X.buf)
                X_shared_array[:] = X[:]
                X_shared = NumpyShared(name = shm_X.name, shape = X.shape, dtype = X.dtype)

                shm_y = smm.SharedMemory(size = y.nbytes)
                y_shared_array = np.ndarray(y.shape, dtype = y.dtype, buffer = shm_y.buf)
                y_shared_array[:] = y[:]
                y_shared = NumpyShared(name = shm_y.name, shape = y.shape, dtype = y.dtype)

                thread_map(partial(assign_samples,
                                   X = X_shared,
                                   y = y_shared,
                                   model = self,
                                   scale = self.scale,
                                   chunk_size = block_size,
                                   values = values),
                           range((X.shape[0] - 1) // block_size + 1),
                           max_workers=self.threads,
                           disable=(progress == False))

                y[:] = y_shared_array[:]

        return y

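# Illustrative usage sketch (not from the original source): fitting the BGMM to a
# long-form distance matrix and writing the fit to disk. X would normally come from
# PopPUNK's distance calculation; here it is a hypothetical n x 2 array of
# (core, accessory) distances, and the output prefix and component count are
# example values only.
#
#   model = BGMMFit("bgmm_output", max_samples=100000)
#   y = model.fit(X, max_components=2)       # cluster assignments for every row of X
#   model.save()                             # writes bgmm_output/bgmm_output_fit.npz/.pkl
#   within = model.within_label              # component treated as within-strain
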
class DBSCANFit(ClusterFit):
    '''Class for fits using HDBSCAN. Inherits from :class:`ClusterFit`.

    Must first run either :func:`~DBSCANFit.fit` or :func:`~DBSCANFit.load` before calling
    other functions

    Args:
        outPrefix (str)
            The output prefix used for reading/writing
        max_samples (int)
            The number of subsamples to fit the model to

            (default = 100000)
    '''

    def __init__(self, outPrefix, use_gpu = False, max_batch_size = 5000,
                 max_samples = 100000, assign_points = True):
        ClusterFit.__init__(self, outPrefix)
        self.type = 'dbscan'
        self.preprocess = True
        self.max_batch_size = max_batch_size
        self.max_samples = max_samples
        self.assign_points = assign_points
        self.use_gpu = use_gpu # Updated below

    def fit(self, X, max_num_clusters, min_cluster_prop, use_gpu = False):
        '''Extends :func:`~ClusterFit.fit`

        Fits the distances with HDBSCAN and returns assignments by calling
        :func:`~PopPUNK.dbscan.fitDbScan`. Fitted parameters are stored in the object.

        Args:
            X (numpy.array)
                The core and accessory distances to cluster. Must be set if
                preprocess is set.
            max_num_clusters (int)
                Maximum number of clusters in DBSCAN fitting
            min_cluster_prop (float)
                Minimum proportion of points in a cluster in DBSCAN fitting
            use_gpu (bool)
                Whether GPU algorithms should be used in DBSCAN fitting

        Returns:
            y (numpy.array)
                Cluster assignments of samples in X
        '''
        ClusterFit.fit(self, X)

        # DBSCAN parameters
        cache_out = "./" + self.outPrefix + "_cache"
        min_samples = max(int(min_cluster_prop * self.subsampled_X.shape[0]), 10) # do not allow clusters of < 10 points
        min_samples = min(min_samples, 1023) # do not allow clusters to require more than 1023 points at the start
        min_cluster_size = max(int(0.01 * self.subsampled_X.shape[0]), 10)

        # Check on initialisation of GPU libraries and memory
        # Convert to cupy if using GPU to avoid implicit numpy conversion below
        if use_gpu:
            try:
                import cudf
                from cuml import cluster
                import cupy as cp
                gpu_lib = True
            except ImportError as e:
                gpu_lib = False
            # check on GPU
            use_gpu = check_and_set_gpu(use_gpu, gpu_lib, quit_on_fail = True)
        if use_gpu:
            self.use_gpu = True
            self.subsampled_X = cp.asarray(self.subsampled_X)
        else:
            self.use_gpu = False

        indistinct_clustering = True
        while indistinct_clustering and min_cluster_size >= min_samples and min_samples >= 10:
            # Fit model
            self.hdb, self.labels, self.n_clusters = fitDbScan(self.subsampled_X,
                                                               min_samples,
                                                               min_cluster_size,
                                                               cache_out,
                                                               use_gpu = use_gpu)
            self.fitted = True # needed for predict

            # Test whether model fit contains distinct clusters
            if self.n_clusters > 1 and self.n_clusters <= max_num_clusters:
                if use_gpu:
                    # get within strain cluster
                    self.max_cluster_num = int(self.labels.max())
                    self.cluster_means = cp.full((self.n_clusters,2),0.0,dtype=float)
                    self.cluster_mins = cp.full((self.n_clusters,2),0.0,dtype=float)
                    self.cluster_maxs = cp.full((self.n_clusters,2),0.0,dtype=float)

                    for i in range(self.max_cluster_num+1):
                        labelled_rows = cp.where(self.labels==i,True,False)
                        self.cluster_means[cp.array(i),] = [cp.mean(self.subsampled_X[labelled_rows,cp.array([0])]),cp.mean(self.subsampled_X[labelled_rows,cp.array([1])])]
                        self.cluster_mins[cp.array(i),] = [cp.min(self.subsampled_X[labelled_rows,cp.array([0])]),cp.min(self.subsampled_X[labelled_rows,cp.array([1])])]
                        self.cluster_maxs[cp.array(i),] = [cp.max(self.subsampled_X[labelled_rows,cp.array([0])]),cp.max(self.subsampled_X[labelled_rows,cp.array([1])])]
                else:
                    # get within strain cluster
                    self.max_cluster_num = self.labels.max()
                    self.cluster_means = np.full((self.n_clusters,2),0.0,dtype=float)
                    self.cluster_mins = np.full((self.n_clusters,2),0.0,dtype=float)
                    self.cluster_maxs = np.full((self.n_clusters,2),0.0,dtype=float)

                    for i in range(self.max_cluster_num+1):
                        self.cluster_means[i,] = [np.mean(self.subsampled_X[self.labels==i,0]),np.mean(self.subsampled_X[self.labels==i,1])]
                        self.cluster_mins[i,] = [np.min(self.subsampled_X[self.labels==i,0]),np.min(self.subsampled_X[self.labels==i,1])]
                        self.cluster_maxs[i,] = [np.max(self.subsampled_X[self.labels==i,0]),np.max(self.subsampled_X[self.labels==i,1])]

                # Run assignment
                y = self.assign(self.subsampled_X,
                                no_scale=True,
                                progress=False,
                                max_batch_size = self.subsampled_X.shape[0],
                                use_gpu = use_gpu)

                # Evaluate clustering
                self.within_label = findWithinLabel(self.cluster_means, y)
                self.between_label = findBetweenLabel(y, self.within_label)
                indistinct_clustering = evaluate_dbscan_clusters(self)

            # Alter minimum cluster size criterion
            if min_cluster_size < min_samples / 2:
                min_samples = min_samples // 10
            min_cluster_size = int(min_cluster_size / 2)

        # Report failure where it happens
        if indistinct_clustering:
            self.fitted = False
            sys.stderr.write("Failed to find distinct clusters in this dataset\n")
            sys.exit(1)
        elif not use_gpu:
            shutil.rmtree(cache_out)

        # Allow for partial fitting that only assigns the subsample not the full set
        if self.assign_points:
            y = self.assign(X, max_batch_size = self.max_batch_size, use_gpu = use_gpu)
        else:
            y = self.assign(self.subsampled_X, max_batch_size = self.max_batch_size, use_gpu = use_gpu)

        return y

    def save(self):
        '''Save the model to disk, as an npz and pkl (using outPrefix).'''
        if not self.fitted:
            raise RuntimeError("Trying to save unfitted model")
        else:
            np.savez(self.outPrefix + "/" + os.path.basename(self.outPrefix) + '_fit.npz',
                     n_clusters=self.n_clusters,
                     within=self.within_label,
                     between=self.between_label,
                     means=self.cluster_means,
                     maxs=self.cluster_maxs,
                     mins=self.cluster_mins,
                     scale=self.scale,
                     assign_points=self.assign_points,
                     use_gpu=self.use_gpu)
            with open(self.outPrefix + "/" + os.path.basename(self.outPrefix) + '_fit.pkl', 'wb') as pickle_file:
                pickle.dump([self.hdb, self.type], pickle_file)

    def load(self, fit_npz, fit_obj):
        '''Load the model from disk. Called from :func:`~loadClusterFit`

        Args:
            fit_npz (dict)
                Fit npz opened with :func:`numpy.load`
            fit_obj (hdbscan.HDBSCAN)
                The saved fit object
        '''
        self.hdb = fit_obj
        self.labels = self.hdb.labels_
        self.n_clusters = fit_npz['n_clusters']
        self.scale = fit_npz['scale']
        self.within_label = fit_npz['within'].item()
        self.between_label = fit_npz['between'].item()
        self.cluster_means = fit_npz['means']
        self.cluster_maxs = fit_npz['maxs']
        self.cluster_mins = fit_npz['mins']
        if 'use_gpu' in fit_npz.keys():
            self.use_gpu = fit_npz['use_gpu']
        else:
            # Default for backwards compatibility
            self.use_gpu = False
        if 'assign_points' in fit_npz.keys():
            self.assign_points = fit_npz['assign_points']
        else:
            # Default for backwards compatibility
            self.assign_points = True
        self.fitted = True

    def plot(self, X=None, y=None):
        '''Extends :func:`~ClusterFit.plot`

        Write a summary of the fit, and plot the results using
        :func:`PopPUNK.plot.plot_dbscan_results`

        Args:
            X (numpy.array)
                Core and accessory distances
            y (numpy.array)
                Cluster assignments from :func:`~DBSCANFit.assign`
        '''
        ClusterFit.plot(self, X)
        # Generate a subsampling if one was not used in the fit
        if not hasattr(self, 'subsampled_X'):
            self.subsampled_X = utils.shuffle(X, random_state=random.randint(1,10000))[0:self.max_samples,]

        non_noise = np.sum(self.labels != -1)
        sys.stderr.write("Fit summary:\n" + "\n".join(["\tNumber of clusters\t" + str(self.n_clusters),
                                                       "\tNumber of datapoints\t" + str(self.subsampled_X.shape[0]),
                                                       "\tNumber of assignments\t" + str(non_noise)]) + "\n\n")
        sys.stderr.write("Scaled component means\n")
        for centre in self.cluster_means:
            sys.stderr.write("\t" + str(centre) + "\n")
        sys.stderr.write("\n")

        # Harmonise scales
        if self.use_gpu:
            import cupy as cp
            self.scale = cp.asarray(self.scale)

        plot_dbscan_results(self.subsampled_X * self.scale,
                            self.assign(self.subsampled_X,
                                        max_batch_size = self.max_batch_size,
                                        no_scale=True,
                                        progress=False,
                                        use_gpu=self.use_gpu),
                            self.n_clusters,
                            self.outPrefix + "/" + os.path.basename(self.outPrefix) + "_dbscan",
                            self.use_gpu)

    def assign(self, X, no_scale = False, progress = True, max_batch_size = 5000, use_gpu = False):
        '''Assign the clustering of new samples using :func:`~PopPUNK.dbscan.assign_samples_dbscan`

        Args:
            X (numpy.array or cupy.array)
                Core and accessory distances
            no_scale (bool)
                Do not scale X

                [default = False]
            progress (bool)
                Show progress bar

                [default = True]
            max_batch_size (int)
                Batch size used for assignments

                [default = 5000]
            use_gpu (bool)
                Use GPU-enabled algorithms for clustering

                [default = False]

        Returns:
            y (numpy.array)
                Cluster assignments by samples
        '''
        if not self.fitted:
            raise RuntimeError("Trying to assign using an unfitted model")
        else:
            if no_scale:
                scale = np.array([1, 1], dtype = X.dtype)
            else:
                scale = self.scale
            if progress:
                sys.stderr.write("Assigning distances with DBSCAN model\n")

            # Set block size
            block_size = max_batch_size

            if use_gpu:
                y = np.zeros(X.shape[0], dtype=int)
                n_blocks = (X.shape[0] - 1) // block_size + 1
                for block in range(n_blocks):
                    start_index = block*block_size
                    end_index = min((block+1)*block_size-1, X.shape[0])
                    sys.stderr.write("Processing rows " + str(start_index) + " to " + str(end_index) + "\n")
                    # cuml v24.02 always returns numpy therefore make conversion explicit
                    y[start_index:end_index], y_probabilities = \
                        cluster.hdbscan.approximate_predict(self.hdb,
                                                            X[start_index:end_index,],
                                                            convert_dtype = True)
                    del y_probabilities
            else:
                y = np.zeros(X.shape[0], dtype=int)
                n_blocks = (X.shape[0] - 1) // block_size + 1
                with SharedMemoryManager() as smm:
                    shm_X = smm.SharedMemory(size = X.nbytes)
                    X_shared_array = np.ndarray(X.shape, dtype = X.dtype, buffer = shm_X.buf)
                    X_shared_array[:] = X[:]
                    X_shared = NumpyShared(name = shm_X.name, shape = X.shape, dtype = X.dtype)

                    shm_y = smm.SharedMemory(size = y.nbytes)
                    y_shared_array = np.ndarray(y.shape, dtype = y.dtype, buffer = shm_y.buf)
                    y_shared_array[:] = y[:]
                    y_shared = NumpyShared(name = shm_y.name, shape = y.shape, dtype = y.dtype)

                    tqdm.set_lock(RLock())
                    process_map(partial(assign_samples,
                                        X = X_shared,
                                        y = y_shared,
                                        model = self,
                                        scale = scale,
                                        chunk_size = block_size,
                                        values = False),
                                range(n_blocks),
                                max_workers=self.threads,
                                chunksize=min(10, max(1, n_blocks // self.threads)),
                                disable=(progress == False))

                    y[:] = y_shared_array[:]

        return y

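# Illustrative usage sketch (not from the original source): fitting the HDBSCAN
# model. The fit loop retries with progressively smaller cluster-size thresholds
# until evaluate_dbscan_clusters() reports distinct within- and between-strain
# clusters, and exits if none are found. X, the output prefix and the parameter
# values below are hypothetical examples.
#
#   model = DBSCANFit("dbscan_output", max_samples=100000)
#   y = model.fit(X, max_num_clusters=20, min_cluster_prop=0.0001, use_gpu=False)
#   model.plot(X, y)                          # writes the dbscan_output/*_dbscan plot
#   model.save()
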
class RefineFit(ClusterFit):
    '''Class for fits using a triangular boundary and network properties. Inherits
    from :class:`ClusterFit`.

    Must first run either :func:`~RefineFit.fit` or :func:`~RefineFit.load` before
    calling other functions

    Args:
        outPrefix (str)
            The output prefix used for reading/writing
    '''

    def __init__(self, outPrefix):
        ClusterFit.__init__(self, outPrefix)
        self.type = 'refine'
        self.preprocess = False
        self.within_label = -1
        self.slope = 2
        self.threshold = False
        self.unconstrained = False
        self.assign_points = True

    def fit(self, X, sample_names, model, max_move, min_move, startFile = None,
            indiv_refine = False, unconstrained = False, multi_boundary = 0,
            score_idx = 0, no_local = False,
            betweenness_sample = betweenness_sample_default, sample_size = None,
            use_gpu = False):
        '''Extends :func:`~ClusterFit.fit`

        Fits the distances by optimising network score, by calling
        :func:`~PopPUNK.refine.refineFit`. Fitted parameters are stored in the object.

        Args:
            X (numpy.array)
                The core and accessory distances to cluster. Must be set if
                preprocess is set.
            sample_names (list)
                Sample names in X (accessed by :func:`~PopPUNK.utils.iterDistRows`)
            model (ClusterFit)
                The model fit to refine
            max_move (float)
                Maximum distance to move away from start point
            min_move (float)
                Minimum distance to move away from start point
            startFile (str)
                A file defining an initial fit, rather than one from ``--fit-model``.
                See documentation for format.

                (default = None).
            indiv_refine (str)
                Run refinement for core or accessory distances separately

                (default = None).
            multi_boundary (int)
                Produce cluster output at multiple boundary positions downward
                from the optimum.

                (default = 0).
            unconstrained (bool)
                If True, search in 2D and change the slope of the boundary
            score_idx (int)
                Index of score from :func:`~PopPUNK.network.networkSummary` to use
                [default = 0]
            no_local (bool)
                Turn off the local optimisation step.
                Quicker, but may be less well refined.
            betweenness_sample (int)
                Number of sequences per component used to estimate betweenness using
                a GPU. Smaller numbers are faster but less precise [default = 100]
            sample_size (int)
                Number of nodes to subsample for graph statistic calculation
            use_gpu (bool)
                Whether to use cugraph for graph analyses

        Returns:
            y (numpy.array)
                Cluster assignments of samples in X
        '''
        ClusterFit.fit(self)
        self.scale = np.copy(model.scale)
        self.max_move = max_move
        self.min_move = min_move
        self.unconstrained = unconstrained

        # Get starting point
        model.no_scale()
        if startFile:
            self.mean0, self.mean1, scaled = readManualStart(startFile)
            if not scaled:
                self.mean0 /= self.scale
                self.mean1 /= self.scale
        elif model.type == 'dbscan':
            sys.stderr.write("Initial model-based network construction based on DBSCAN fit\n")
            self.mean0 = model.cluster_means[model.within_label, :]
            self.mean1 = model.cluster_means[model.between_label, :]
        elif model.type == 'bgmm':
            sys.stderr.write("Initial model-based network construction based on Gaussian fit\n")
            self.mean0 = model.means[model.within_label, :]
            self.mean1 = model.means[model.between_label, :]
        else:
            raise RuntimeError("Unrecognised model type")

        # Main refinement in 2D
        scaled_X = X / self.scale
        self.optimal_x, self.optimal_y, optimal_s = \
            refineFit(scaled_X,
                      sample_names,
                      self.mean0,
                      self.mean1,
                      self.scale,
                      self.max_move,
                      self.min_move,
                      slope = 2,
                      score_idx = score_idx,
                      unconstrained = unconstrained,
                      no_local = no_local,
                      num_processes = self.threads,
                      betweenness_sample = betweenness_sample,
                      sample_size = sample_size,
                      use_gpu = use_gpu)
        self.fitted = True

        # Output clusters at more positions if requested
        if multi_boundary > 1:
            sys.stderr.write("Creating multiple boundary fits\n")
            multi_refine(scaled_X,
                         sample_names,
                         self.mean0,
                         self.mean1,
                         self.scale,
                         optimal_s,
                         multi_boundary,
                         self.outPrefix,
                         num_processes = self.threads,
                         betweenness_sample = betweenness_sample,
                         sample_size = sample_size,
                         use_gpu = use_gpu)

        # Try and do a 1D refinement for both core and accessory
        self.core_boundary = self.optimal_x
        self.accessory_boundary = self.optimal_y
        if indiv_refine is not None:
            try:
                for dist_type, slope in zip(['core', 'accessory'], [0, 1]):
                    if indiv_refine == 'both' or indiv_refine == dist_type:
                        sys.stderr.write("Refining " + dist_type + " distances separately\n")
                        # optimise core distance boundary
                        core_boundary, accessory_boundary, s = \
                            refineFit(scaled_X,
                                      sample_names,
                                      self.mean0,
                                      self.mean1,
                                      self.scale,
                                      self.max_move,
                                      self.min_move,
                                      slope = slope,
                                      score_idx = score_idx,
                                      no_local = no_local,
                                      num_processes = self.threads,
                                      betweenness_sample = betweenness_sample,
                                      sample_size = sample_size,
                                      use_gpu = use_gpu)
                        if dist_type == "core":
                            self.core_boundary = core_boundary
                        if dist_type == "accessory":
                            self.accessory_boundary = accessory_boundary
                        self.indiv_fitted = True
            except RuntimeError as e:
                print(e)
                sys.stderr.write("Could not separately refine core and accessory boundaries. "
                                 "Using joint 2D refinement only.\n")

        y = self.assign(X)
        return y

    def apply_threshold(self, X, threshold):
        '''Applies a boundary threshold, given by user. Does not run
        optimisation.

        Args:
            X (numpy.array)
                The core and accessory distances to cluster. Must be set if
                preprocess is set.
            threshold (float)
                The value along the x-axis (core distance) at which to
                draw the assignment boundary

        Returns:
            y (numpy.array)
                Cluster assignments of samples in X
        '''
        self.scale = np.array([1, 1], dtype = X.dtype)

        # Blank values to pass to plot
        self.mean0 = None
        self.mean1 = None
        self.min_move = None
        self.max_move = None

        # Sets threshold
        self.core_boundary = threshold
        self.accessory_boundary = np.nan
        self.optimal_x = threshold
        self.optimal_y = np.nan
        self.slope = 0

        # Flags on refine model
        self.fitted = True
        self.threshold = True
        self.indiv_fitted = False
        self.unconstrained = False

        y = self.assign(X)
        return y

    def save(self):
        '''Save the model to disk, as an npz and pkl (using outPrefix).'''
        if not self.fitted:
            raise RuntimeError("Trying to save unfitted model")
        else:
            np.savez(self.outPrefix + "/" + os.path.basename(self.outPrefix) + '_fit.npz',
                     intercept=np.array([self.optimal_x, self.optimal_y]),
                     core_acc_intercepts=np.array([self.core_boundary, self.accessory_boundary]),
                     scale=self.scale,
                     indiv_fitted=self.indiv_fitted)
            with open(self.outPrefix + "/" + os.path.basename(self.outPrefix) + '_fit.pkl', 'wb') as pickle_file:
                pickle.dump([None, self.type], pickle_file)

    def load(self, fit_npz, fit_obj):
        '''Load the model from disk. Called from :func:`~loadClusterFit`

        Args:
            fit_npz (dict)
                Fit npz opened with :func:`numpy.load`
            fit_obj (None)
                The saved fit object (not used)
        '''
        self.optimal_x = fit_npz['intercept'].item(0)
        self.optimal_y = fit_npz['intercept'].item(1)
        self.core_boundary = fit_npz['core_acc_intercepts'].item(0)
        self.accessory_boundary = fit_npz['core_acc_intercepts'].item(1)
        self.scale = fit_npz['scale']
        self.fitted = True
        if 'indiv_fitted' in fit_npz:
            self.indiv_fitted = fit_npz['indiv_fitted']
        else:
            self.indiv_fitted = False # historical behaviour for backward compatibility

        if np.isnan(self.optimal_y) and np.isnan(self.accessory_boundary):
            self.threshold = True

        # blank values to pass to plot (used in --use-model)
        self.mean0 = None
        self.mean1 = None
        self.min_move = None
        self.max_move = None

    def plot(self, X, y=None):
        '''Extends :func:`~ClusterFit.plot`

        Write a summary of the fit, and plot the results using
        :func:`PopPUNK.plot.plot_refined_results`

        Args:
            X (numpy.array)
                Core and accessory distances
            y (numpy.array)
                Assignments (unused)
        '''
        ClusterFit.plot(self, X)

        # Subsamples huge plots to save on memory
        max_points = int(0.5*(5000)**2)
        if X.shape[0] > max_points:
            plot_X = utils.shuffle(X, random_state=random.randint(1, 10000))[0:max_points, ]
        else:
            plot_X = X

        plot_refined_results(plot_X, self.assign(plot_X), self.optimal_x, self.optimal_y,
                             self.core_boundary, self.accessory_boundary, self.mean0,
                             self.mean1, self.min_move, self.max_move, self.scale,
                             self.threshold, self.indiv_fitted, self.unconstrained,
                             "Refined fit boundary",
                             self.outPrefix + "/" + os.path.basename(self.outPrefix) + "_refined_fit")

    def assign(self, X, slope=None):
        '''Assign the clustering of new samples

        Args:
            X (numpy.array)
                Core and accessory distances
            slope (int)
                Override self.slope. Default - use self.slope

                Set to 0 for a vertical line, 1 for a horizontal line, or
                2 to use a slope

        Returns:
            y (numpy.array)
                Cluster assignments by samples
        '''
        if not self.fitted:
            raise RuntimeError("Trying to assign using an unfitted model")
        else:
            if slope is None:
                slope = self.slope
            if slope == 2:
                y = poppunk_refine.assignThreshold(X/self.scale, 2, self.optimal_x, self.optimal_y, self.threads)
            elif slope == 0:
                y = poppunk_refine.assignThreshold(X/self.scale, 0, self.core_boundary, 0, self.threads)
            elif slope == 1:
                y = poppunk_refine.assignThreshold(X/self.scale, 1, 0, self.accessory_boundary, self.threads)

        return y

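# Illustrative sketch (not from the original source): the geometric rule behind the
# slope = 2 case of RefineFit.assign(), written in plain numpy. The boundary joins
# (optimal_x, 0) and (0, optimal_y) in scaled distance space; points on the origin
# side of that line are within-strain. The label values used here are an assumption
# (within-strain shown as -1, to match self.within_label); the exact values returned
# by poppunk_refine.assignThreshold should be checked against its C++ source.
#
#   import numpy as np
#
#   def assign_threshold_2d(scaled_X, x_max, y_max):
#       # signed position relative to the line joining (x_max, 0) and (0, y_max)
#       side = scaled_X[:, 0] * y_max + scaled_X[:, 1] * x_max - x_max * y_max
#       return np.where(side < 0, -1, 1)    # -1 assumed to mean within-strain
#
#   y = assign_threshold_2d(X / model.scale, model.optimal_x, model.optimal_y)
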
# Wrapper function for LineageFit.__reduce_rank__ to be called by
# multiprocessing threads
def reduce_rank(lower_rank, fit, higher_rank_sparse_mat, n_samples, dtype):
    # Only modify the matrix if the method or rank differs - otherwise save in unmodified form
    if lower_rank == fit.max_search_depth and fit.reciprocal_only is False and fit.count_unique_distances is False:
        fit.__save_sparse__(higher_rank_sparse_mat[2],
                            higher_rank_sparse_mat[0],
                            higher_rank_sparse_mat[1],
                            lower_rank,
                            n_samples,
                            dtype)
    else:
        fit.__reduce_rank__(higher_rank_sparse_mat, lower_rank, n_samples, dtype)

class LineageFit(ClusterFit):
    '''Class for fits using the lineage assignment model. Inherits from :class:`ClusterFit`.

    Must first run either :func:`~LineageFit.fit` or :func:`~LineageFit.load` before
    calling other functions

    Args:
        outPrefix (str)
            The output prefix used for reading/writing
        ranks (list)
            The ranks used in the fit
    '''

    def __init__(self, outPrefix, ranks, max_search_depth, reciprocal_only,
                 count_unique_distances, dist_col = None, use_gpu = False):
        ClusterFit.__init__(self, outPrefix)
        self.type = 'lineage'
        self.preprocess = False
        # Set to highest rank by default in main; need to store additional distances
        # when there is redundancy (e.g. reciprocal matching, unique distance counting)
        # or other sequences may be pruned out of the database
        self.max_search_depth = max_search_depth + 5
        self.nn_dists = None # stores the unprocessed kNN at the maximum search depth
        self.ranks = []
        for rank in sorted(ranks):
            if (rank < 1):
                sys.stderr.write("Rank must be at least 1\n")
                sys.exit(1)
            else:
                self.ranks.append(int(rank))
        self.lower_rank_dists = {}
        self.reciprocal_only = reciprocal_only
        self.count_unique_distances = count_unique_distances
        self.dist_col = dist_col
        self.use_gpu = use_gpu

    def __save_sparse__(self, data, row, col, rank, n_samples, dtype, is_nn_dist = False):
        '''Save a sparse matrix in coo format
        '''
        if self.use_gpu:
            data = cp.array(data)
            data[data < epsilon] = epsilon
            if is_nn_dist:
                self.nn_dists = cupyx.scipy.sparse.coo_matrix((data, (cp.array(row), cp.array(col))),
                                                              shape=(n_samples, n_samples),
                                                              dtype = dtype)
            else:
                self.lower_rank_dists[rank] = cupyx.scipy.sparse.coo_matrix((data, (cp.array(row), cp.array(col))),
                                                                            shape=(n_samples, n_samples),
                                                                            dtype = dtype)
        else:
            data = np.array(data)
            data[data < epsilon] = epsilon
            if is_nn_dist:
                self.nn_dists = scipy.sparse.coo_matrix((data, (row, col)),
                                                        shape=(n_samples, n_samples),
                                                        dtype = dtype)
            else:
                self.lower_rank_dists[rank] = scipy.sparse.coo_matrix((data, (row, col)),
                                                                      shape=(n_samples, n_samples),
                                                                      dtype = dtype)

    def __reduce_rank__(self, higher_rank_sparse_mat, lower_rank, n_samples, dtype):
        '''Lowers the rank of a fit and saves it
        '''
        lower_rank_sparse_mat = \
            poppunk_refine.lowerRank(
                higher_rank_sparse_mat,
                n_samples,
                lower_rank,
                self.reciprocal_only,
                self.count_unique_distances,
                self.threads)
        self.__save_sparse__(lower_rank_sparse_mat[2],
                             lower_rank_sparse_mat[0],
                             lower_rank_sparse_mat[1],
                             lower_rank,
                             n_samples,
                             dtype)

    def fit(self, X, accessory):
        '''Extends :func:`~ClusterFit.fit`

        Gets assignments by using nearest neighbours.

        Args:
            X (numpy.array)
                The core and accessory distances to cluster. Must be set if
                preprocess is set.
            accessory (bool)
                Use accessory rather than core distances

        Returns:
            y (numpy.array)
                Cluster assignments of samples in X
        '''
        ClusterFit.fit(self, X)
        sample_size = int(round(0.5 * (1 + np.sqrt(1 + 8 * X.shape[0]))))
        if (max(self.ranks) >= sample_size):
            sys.stderr.write("Rank must be less than the number of samples\n")
            sys.exit(1)

        if accessory:
            self.dist_col = 1
        else:
            self.dist_col = 0

        row, col, data = \
            poppunk_refine.get_kNN_distances(
                distMat=pp_sketchlib.longToSquare(distVec=X[:, [self.dist_col]],
                                                  num_threads=self.threads),
                kNN=self.max_search_depth,
                dist_col=self.dist_col,
                num_threads=self.threads
            )

        self.__save_sparse__(data, row, col, self.max_search_depth, sample_size, X.dtype, is_nn_dist = True)

        # Apply filtering of links if requested and extract lower ranks - parallelisation within C++ code
        for rank in self.ranks:
            reduce_rank(
                rank,
                fit=self,
                higher_rank_sparse_mat=(row, col, data),
                n_samples=sample_size,
                dtype=X.dtype
            )

        self.fitted = True

        y = self.assign(min(self.ranks))
        return y

    def save(self):
        '''Save the model to disk, as an npz and pkl (using outPrefix).'''
        if not self.fitted:
            raise RuntimeError("Trying to save unfitted model")
        else:
            scipy.sparse.save_npz(
                self.outPrefix + "/" + os.path.basename(self.outPrefix) + \
                '_sparse_dists.npz',
                self.nn_dists)
            for rank in self.ranks:
                scipy.sparse.save_npz(
                    self.outPrefix + "/" + os.path.basename(self.outPrefix) + \
                    rankFile(rank),
                    self.lower_rank_dists[rank])
            with open(self.outPrefix + "/" + os.path.basename(self.outPrefix) + \
                      '_fit.pkl', 'wb') as pickle_file:
                pickle.dump([[self.ranks,
                              self.max_search_depth,
                              self.reciprocal_only,
                              self.count_unique_distances,
                              self.dist_col],
                             self.type], pickle_file)

    def load(self, fit_npz, fit_obj):
        '''Load the model from disk. Called from :func:`~loadClusterFit`

        Args:
            fit_npz (scipy.sparse.coo_matrix)
                Sparse distance matrix opened with :func:`scipy.sparse.load_npz`
            fit_obj (list)
                The saved fit parameters (ranks, max search depth, reciprocal only,
                count unique distances, distance column)
        '''
        self.ranks, self.max_search_depth, self.reciprocal_only, self.count_unique_distances, self.dist_col = fit_obj
        self.nn_dists = fit_npz
        self.fitted = True

    def plot(self, X, y = None):
        '''Extends :func:`~ClusterFit.plot`

        Write a summary of the fit, and plot the results using
        :func:`PopPUNK.plot.distHistogram`

        Args:
            X (numpy.array)
                Core and accessory distances
            y (any)
                Unused variable for compatibility with other plotting functions
        '''
        ClusterFit.plot(self, X)
        for rank in self.ranks:
            if self.use_gpu:
                hist_data = self.lower_rank_dists[rank].get().data
            else:
                hist_data = self.lower_rank_dists[rank].data
            distHistogram(hist_data,
                          rank,
                          self.outPrefix + "/" + os.path.basename(self.outPrefix))

    def assign(self, rank):
        '''Get the edges for the network. A little different from other methods,
        as it doesn't go through the long form distance vector (as coo_matrix
        is basically already in the correct gt format)

        Args:
            rank (int)
                Rank to assign at

        Returns:
            y (list of tuples)
                Edges to include in network
        '''
        if not self.fitted:
            raise RuntimeError("Trying to assign using an unfitted model")
        else:
            y = []
            for row, col in zip(self.lower_rank_dists[rank].row, self.lower_rank_dists[rank].col):
                y.append((row, col))

        return y

    def edge_weights(self, rank):
        '''Get the distances for each edge returned by assign

        Args:
            rank (int)
                Rank assigned at

        Returns:
            weights (list)
                Distance for each assignment
        '''
        if not self.fitted:
            raise RuntimeError("Trying to get weights from an unfitted model")
        else:
            return (self.lower_rank_dists[rank].data)

    def extend(self, qqDists, qrDists):
        '''Update the sparse distance matrix of nearest neighbours after querying

        Args:
            qqDists (numpy or cupy ndarray)
                Two column array of query-query distances
            qrDists (numpy or cupy ndarray)
                Two column array of reference-query distances

        Returns:
            y (list of tuples)
                Edges to include in network
        '''
        # Convert data structures if using GPU
        if self.use_gpu:
            qqDists = cp.array(qqDists)
            qrDists = cp.array(qrDists)

        # Reshape qq and qr dist matrices
        qqSquare = pp_sketchlib.longToSquare(distVec=qqDists[:, [self.dist_col]],
                                             num_threads=self.threads)
        qqSquare[qqSquare < epsilon] = epsilon

        n_ref = self.nn_dists.shape[0]
        n_query = qqSquare.shape[1]
        qrRect = qrDists[:, [self.dist_col]].reshape(n_query, n_ref).T
        qrRect[qrRect < epsilon] = epsilon

        higher_rank = \
            poppunk_refine.extend(
                (self.nn_dists.row, self.nn_dists.col, self.nn_dists.data),
                qqSquare,
                qrRect,
                self.max_search_depth,
                self.threads)

        # Update NN dist associated with model
        self.__save_sparse__(higher_rank[2],
                             higher_rank[0],
                             higher_rank[1],
                             self.max_search_depth,
                             n_ref + n_query,
                             self.nn_dists.dtype,
                             is_nn_dist = True)

        # Apply lower ranks - parallelisation within C++ code
        for rank in self.ranks:
            reduce_rank(
                rank,
                fit=self,
                higher_rank_sparse_mat=higher_rank,
                n_samples=n_ref + n_query,
                dtype=self.nn_dists.dtype
            )

        y = self.assign(min(self.ranks))
        return y

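# Illustrative sketch (not from the original source): turning the edge list from
# LineageFit.assign() into a graph. PopPUNK builds its networks through its own
# network module; graph-tool is used directly here as a minimal stand-in, and the
# rank value is hypothetical (it must be one of the fitted ranks).
#
#   import graph_tool.all as gt
#
#   edges = model.assign(rank=1)              # list of (row, col) tuples from the sparse fit
#   weights = model.edge_weights(rank=1)      # matching distances, in the same order
#   g = gt.Graph(directed=False)
#   g.add_vertex(model.nn_dists.shape[0])     # one vertex per sample
#   g.add_edge_list(edges)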