# vim: set fileencoding=<utf-8> :# Copyright 2018-2023 John Lees and Nick Croucher'''DBSCAN using hdbscan'''# universalimportosimportsys# hdbscanimporthdbscanfrom.utilsimportcheck_and_set_gpu
[docs]deffitDbScan(X,min_samples,min_cluster_size,cache_out,use_gpu=False):"""Function to fit DBSCAN model as an alternative to the Gaussian Fits the DBSCAN model to the distances using hdbscan Args: X (np.array) n x 2 array of core and accessory distances for n samples min_samples (int) Parameter for DBSCAN clustering 'conservativeness' min_cluster_size (int) Minimum number of points in a cluster for HDBSCAN cache_out (str) Prefix for DBSCAN cache used for refitting use_gpu (bool) Whether GPU algorithms should be used in DBSCAN fitting Returns: hdb (hdbscan.HDBSCAN or cuml.cluster.HDBSCAN) Fitted HDBSCAN to subsampled data labels (list) Cluster assignments of each sample n_clusters (int) Number of clusters used """# set DBSCAN clustering parametersifuse_gpu:fromcumlimportclusterimportcupyascpsys.stderr.write('Fitting HDBSCAN model using a GPU\n')hdb=cluster.hdbscan.HDBSCAN(min_samples=min_samples,output_type='cupy',prediction_data=True,min_cluster_size=min_cluster_size).fit(X)# Number of clusters in labels, ignoring noise if present.labels=hdb.labels_n_clusters=len(cp.unique(labels[labels>-1]))else:sys.stderr.write('Fitting HDBSCAN model using a CPU\n')hdb=hdbscan.HDBSCAN(algorithm='boruvka_balltree',min_samples=min_samples,#core_dist_n_jobs = threads, # may cause error, see #19memory=cache_out,prediction_data=True,min_cluster_size=min_cluster_size).fit(X)# Number of clusters in labels, ignoring noise if present.labels=hdb.labels_n_clusters=len(set(labels))-(1if-1inlabelselse0)# return model parametersreturnhdb,labels,n_clusters
[docs]defevaluate_dbscan_clusters(model):"""Evaluate whether fitted dbscan model contains non-overlapping clusters Args: model (DBSCANFit) Fitted model from :func:`~PopPUNK.models.DBSCANFit.fit` Returns: indistinct (bool) Boolean indicating whether putative within- and between-strain clusters of points overlap """indistinct=True# calculate ranges of minima and maximacore_minimum_of_between=model.cluster_mins[model.between_label,0]core_maximum_of_within=model.cluster_maxs[model.within_label,0]accessory_minimum_of_between=model.cluster_mins[model.between_label,1]accessory_maximum_of_within=model.cluster_maxs[model.within_label,1]# evaluate whether maxima of cluster nearest origin do not# overlap with minima of cluster furthest from originifcore_minimum_of_between>core_maximum_of_withinor \
accessory_minimum_of_between>accessory_maximum_of_within:indistinct=False# return distinctivenessreturnindistinct
[docs]deffindBetweenLabel(assignments,within_cluster):"""Identify between-strain links from a DBSCAN model Finds the component containing the largest number of between-strain links, excluding the cluster identified as containing within-strain links. Args: assignments (numpy.array) Sample cluster assignments within_cluster (int) Cluster ID assigned to within-strain assignments, from :func:`~PopPUNK.bgmm.findWithinLabel` Returns: between_cluster (int) The cluster label for the between-strain assignments """# remove noise and within-strain distance clusterassignment_list=assignments.tolist()assignment_list=list(filter((within_cluster).__ne__,assignment_list))# remove within-clusterassignment_list=list(filter((-1).__ne__,assignment_list))# remove noise# identify non-within cluster with most membersbetween_cluster=max(set(assignment_list),key=assignment_list.count)returnbetween_cluster