Source code for GPmix.unigmm

import math
import numpy as np
import skfda
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.optimize import least_squares
from scipy.stats import norm
from math import comb, factorial, log, floor
from cmath import log as clog
from sklearn.mixture import GaussianMixture
from sklearn.cluster import SpectralClustering
from sklearn.metrics import adjusted_mutual_info_score as ami,  adjusted_rand_score as ari, accuracy_score
from itertools import permutations
from joblib import Parallel, delayed
from .misc import gmms_fit_plot_, silhouette_score, davies_bouldin_score
import warnings


"""
unigmm.py
=========

This module provides classes for estimating parameters of base univariate GMMs and also for formulating
a consensus clustering from these GMMs. The main classes are:

- GaussianMixtureParameterEstimator: Estimates parameters of a univariate GMM using the method of moments.
- UniGaussianMixtureEnsemble: Constructs a consensus clustering using an ensemble of univariate
"""

[docs] class GaussianMixtureParameterEstimator(): """ Estimate parameters of a univariate Gaussian mixture model (GMM) using the method of moments. This estimator implements a numerically stable method-of-moments approach for univariate GMMs, using the log-sum-exp trick for moment calculations. It supports flexible initialization, simple parameter constraints, and optional normalization of the input data. THIS IS STILL EXPERIMENTAL. USE WITH CAUTION. Parameters ---------- n_comp : int Number of mixture components. n_moments : int or None, optional Highest order of moment to consider. If ``None``, defaults to ``4 * n_comp - 2``. epsilon : float, optional Lower bound on mixture weights and bounds for means/variances used to improve numerical stability. If set to 0.0, no additional constraints are applied. Attributes ---------- n_comp : int Number of mixture components. n_moments : int Highest order of moment used during fitting. epsilon : float Lower bound/constraint strength used during fitting. init_guess_ : ndarray of shape (3 * n_comp,) Initial parameter guess in the order: weights, means, variances. Notes ----- The method of moments can be sensitive to both the choice of initialization and the moment order. The solver minimizes a system of moment equations via non-linear least squares. For stability, computed log-moments use the log-sum-exp trick. """ def __init__(self, n_comp: int, n_moments: int or None = None, epsilon= 0.0) -> None: self.n_comp = n_comp self.epsilon = epsilon if n_moments is None: self.n_moments= 4 * self.n_comp - 2 else: self.n_moments = n_moments
[docs] def log_sample_moments(self, data: np.ndarray, order: int) -> float: '''Compute the log of sample moments using log-sum-exp trick''' n = len(data) log_data = np.array([clog(x) for x in data]) a = np.max(log_data).real sumexp = np.sum(np.exp(order * (log_data - a))) return order * a - log(n) + clog(sumexp)
[docs] def log_theoretical_moments(self, parameters: np.ndarray or list, order: int) -> float: ''' Compute the log of theoretical moments of a univariate Gaussian using log-sum-exp trick Attributes ------ parameters : array-like of shape (2, ) mean and variance of univatiate Gaussian, strictly this order: mean, variance order : int order of moment ''' summand_ = lambda mu, sigma2, j : clog(comb(order, 2 * j) * factorial(2 * j) / (factorial(j) * 2 ** j)) + (order - (2 * j)) * clog(mu) + j * clog(sigma2) summands = np.array([summand_(*parameters, j) for j in range(floor(order / 2) + 1)]) a = np.max(summands).real sumexp = np.sum(np.exp(summands - a)) return a + clog(sumexp)
[docs] def log_theoretical_mixture_moments(self, parameters: np.ndarray or list, order: int) -> float: ''' Compute the log of theoretical moments of a univariate Gaussian mixture using log-sum-exp trick Attributes ------ parameters : array-like of shape (3 * n_comp, ) weights, means and variances of univatiate Gaussian, strictly this order: weights, means, variances order : int order of moment ''' assert len(parameters) == 3 * self.n_comp , "Ensure that the parameters are ordered this way: p_1, p_2, ..., p_k, mu_1, mu_2, ..., mu_k, sigma2_1, sigma2_2, ... sigma2_k." weight_ = parameters[: self.n_comp] mu_var_ = np.array(parameters[self.n_comp : ]).reshape(-1, 2) log_moments = [self.log_theoretical_moments(param_, order) for param_ in mu_var_] exponents = np.log(weight_) + np.array(log_moments) max_exponent = np.max(exponents).real sumexp = np.sum(np.exp(exponents - max_exponent)) return max_exponent + clog(sumexp)
[docs] def fit(self, data: np.ndarray, normalize: bool = True, full_output: bool = False): ''' Fit data to univariate GMM using method of moment estimation Attributes ------ data : array-like of shape (sample size,) Sample data. normalize : boolean, default = True. If True, the already centered data will be transformed to having unit variance. full_output : boolean, default = False If True, a comprehensive report of the (optimization) solver is returned, else only the estimates are returned. returns ------- If 'full_output = False', returns an array of shape (3*n_comp,); ordered in the form weights, means, variance. else if 'full_output = True', a comprehensive report on the (optimization) solver is returned. ''' data_std = np.std(data) if normalize: #normalize data to mean 0 and unit variance # data in the algorithm is already centered, only scale data with the std data = data / data_std if self.epsilon: l_bounds = [(self.epsilon) for _ in range(self.n_comp)] + [(-1/ self.epsilon) for _ in range(self.n_comp)] + [(1e-15) for _ in range(self.n_comp)] h_bounds = [(1 - self.epsilon) for _ in range(self.n_comp)] + [(1/self.epsilon) for _ in range(self.n_comp)] + [(1 / self.epsilon) for _ in range(self.n_comp)] #set feasible initialization weight_init = np.random.uniform(self.epsilon, 1/self.n_comp, self.n_comp) mean_init = np.random.uniform(-1/self.epsilon, 1/self.epsilon, self.n_comp) var_init = np.random.uniform(1e-15, 1 / self.epsilon, self.n_comp) else: l_bounds = [(0) for _ in range(self.n_comp)] + [(-np.inf) for _ in range(self.n_comp)] + [(1e-15) for _ in range(self.n_comp)] h_bounds = [(1) for _ in range(self.n_comp)] + [(np.inf) for _ in range(self.n_comp)] + [(np.inf) for _ in range(self.n_comp)] #set feasible initialization weight_init = np.random.uniform(0, 1/self.n_comp, self.n_comp) mean_init = np.random.rand(self.n_comp) var_init = np.abs(np.random.rand(self.n_comp)) self.init_guess_ = np.concatenate((weight_init/np.sum(weight_init), mean_init, var_init)) #the system of moments moment_system = lambda input_vars: [input_vars[: self.n_comp].sum() - 1, input_vars[: self.n_comp] @ np.array(input_vars[self.n_comp : 2 * self.n_comp])] + [2 * (self.log_theoretical_mixture_moments(input_vars, order).real - self.log_sample_moments(data, order)).real for order in range(2, self.n_moments + 1)] #set up the solver with warnings.catch_warnings(): warnings.simplefilter("ignore") res = least_squares(moment_system, self.init_guess_, bounds= (tuple(l_bounds), tuple(h_bounds)), max_nfev = 200 * len(self.init_guess_), ftol = 1e-20, gtol = 1e-6) ## check if iteration was successful if not res.success: warnings.warn("Numerical iteration failed to converge:{message}. EM is initialized with values from the values from last iteration. As this may not be optimal, it is recommended that other initialization techniques be employed".format(message = res.message)) if normalize: #adjust the mean (scale by data std) and std (scale by data std^2) res.x = np.concatenate((res.x[ :self.n_comp], data_std * res.x[self.n_comp : 2 * self.n_comp], data_std ** 2 * res.x[2 * self.n_comp : ])) # if res.x[ : self.n_comp].sum() != 1: # res.x[ :self.n_comp] = np.abs(res.x[:self.n_comp]) / res.x[ : self.n_comp].sum() if full_output: return res else: return res.x
[docs] class UniGaussianMixtureEnsemble: """ Consensus clustering using an ensemble of univariate Gaussian Mixture Models (GMMs). This class fits univariate GMMs to multiple one-dimensional projections of the data, computes base clusterings, and combines them into a consensus clustering using spectral clustering on an affinity matrix built from binary membership matrices. Base clusterings are weighted by an estimate of their total misclassification probability. Parameters ---------- n_clusters : int Number of mixture components (clusters) to fit in each GMM and in the consensus clustering. init_method : {"kmeans", "k-means++", "random", "random_from_data", "mom"}, optional Initialization method for GMM parameters. Default is ``"kmeans"``. The ``"mom"`` option uses method-of-moments initialization. n_init : int, optional Number of initializations to perform for each GMM fit. The best result is kept. Default is ``10``. mom_epsilon : float, optional Lower bound for GMM weights (and related constraints) when using ``init_method="mom"``. Ignored otherwise. Default is ``5e-2``. Attributes ---------- n_projs : int Number of projections (base clusterings). data_size : int Number of samples in the data. gmms : tuple of sklearn.mixture.GaussianMixture Fitted univariate GMMs for each projection. MoM_res : tuple If ``init_method == 'mom'``, the method-of-moments solver results for each projection. clustering_weights_ : ndarray of shape (n_projs,) Weights assigned to each base clustering. labels_ : ndarray of shape (n_samples,) Cluster labels assigned by the consensus clustering. max_cca_labels_ : tuple Permutation of predicted labels that yields the highest classification accuracy when compared to ground truth. Notes ----- The affinity matrix is constructed as a weighted sum of outer products of binary membership matrices (one per projection), where the weights are proportional to the inverse of each GMM's estimated total misclassification probability. """ def __init__(self, n_clusters: int, init_method: str = 'kmeans', n_init: int = 10, mom_epsilon: float = 5e-2) -> None: """ Initialize the UniGaussianMixtureEnsemble. Parameters ---------- n_clusters : int Number of mixture components (clusters). init_method : str, default='kmeans' Initialization method for GMMs. One of 'kmeans', 'k-means++', 'random', 'random_from_data', 'mom'. n_init : int, default=10 Number of initializations for each GMM. mom_epsilon : float, default=5e-2 Lower bound for GMM weights when using 'mom' initialization. """ self.n_clusters = n_clusters self.init_method = init_method self.n_init = n_init self.mom_epsilon = mom_epsilon assert self.init_method in ['kmeans', 'k-means++', 'random', 'random_from_data', 'mom'], \ "Unknown value for 'init_method'. Set to one of: 'kmeans', 'k-means++', 'random', 'random_from_data', 'mom'."
[docs] def gmm_with_MoM_inits(self, data: np.ndarray): '''Fit gmms with initialization from method of moment estimation''' gmm_, res_ = [], [] for _ in range(self.n_init): #get inits try: mom_estimator = GaussianMixtureParameterEstimator(n_comp= self.n_clusters, epsilon= self.mom_epsilon) res = mom_estimator.fit(data, full_output = True) res_.append(res) except ValueError as error: print("'mom' initialization is not appropriate for this dataset. Attempt to reduce the number of projection basis or use other initialization methods'.") raise(error) #process inits mom_inits = res.x.reshape(-1, self.n_clusters) #adjust to ensure weights sum to 1 mom_inits[0] = np.abs(mom_inits[0]) / np.abs(mom_inits[0]).sum() #pass inits to EM solver and fit gmm = GaussianMixture(n_components = self.n_clusters, covariance_type= 'spherical', weights_init=mom_inits[0], means_init= mom_inits[1].reshape(-1,1), precisions_init= 1 / mom_inits[2]) gmm.fit(data) gmm_.append(gmm) #select the highest lower bound ind = np.argmax([[gmm.lower_bound_ for gmm in gmm_]]) return gmm_[ind], res_[ind]
#helper function to parallelize gmm fiting with MOM inits def _fit_gmms_with_MoM_inits(self, proj_coeffs: np.ndarray): gmm, res = self.gmm_with_MoM_inits(proj_coeffs.reshape(-1,1)) return (gmm, res) #helper function to parallelize gmm fiting with other inits def _fit_gmms_with_other_inits(self, proj_coeffs: np.ndarray): gmm = GaussianMixture(n_components = self.n_clusters, covariance_type='spherical', n_init= self.n_init, init_params= self.init_method) gmm.fit(proj_coeffs.reshape(-1,1)) return gmm
[docs] def fit_gmms(self, projs_coeffs: np.ndarray, n_jobs = -1, **kwargs): """ Fit projection coefficients to univariate Gaussian mixture models Arguments --------- projs_coeffs : array-like of shape (number of projections, number of samples) array of projection coefficients to fit to univariate GMMs. kwargs : keyword arguments for joblib Parallel """ self.projs_coeffs = projs_coeffs self.n_projs = self.projs_coeffs.shape[0] self.data_size = self.projs_coeffs.shape[1] if self.init_method == 'mom': #parallelize proccess with joblib results = Parallel(n_jobs = n_jobs, **kwargs)(delayed(self._fit_gmms_with_MoM_inits)(proj_coeffs) for proj_coeffs in self.projs_coeffs) self.gmms, self.MoM_res = zip(*results) elif self.init_method in ['kmeans', 'k-means++', 'random', 'random_from_data']: #parallelize proccess with joblib self.gmms = Parallel(n_jobs = n_jobs, **kwargs)(delayed(self._fit_gmms_with_other_inits)(proj_coeffs) for proj_coeffs in self.projs_coeffs)
# def gmms_lower_bound_(self): # return np.sum([g.lower_bound_ for g in self.gmms])
[docs] def plot_gmms(self, ncols: int = 4, fontsize: int = 12, fig_kws = {}, **kwargs): '''Visualize GMM fits Parameters ---------- ncols : int, optional Number of columns in the plot grid. Default is 4. fontsize : int, optional Font size for axis labels. Default is 12. fig_kws : dict, optional Additional keyword arguments for figure creation. Default is an empty dict. kwargs : dict, optional Additional keyword arguments for seaborn's histplot function. Default is an empty dict. ''' # create the figure and axes if self.n_projs == 1: fig, axes = plt.subplots(1, self.n_projs, **fig_kws) axes = [axes] elif self.n_projs < 4: fig, axes = plt.subplots(1, self.n_projs, **fig_kws) axes = axes.ravel() # flattening the array makes indexing easier else: fig, axes = plt.subplots(int(np.ceil(self.n_projs / ncols)), ncols, **fig_kws) axes = axes.ravel() # flattening the array makes indexing easier # label_ = ['alpha_{i' + str(i) + '}' for i in range(1, self.n_projs + 1)] for i, coeffs, ax in zip(range(self.n_projs), self.projs_coeffs, axes): sns.histplot(data=coeffs, stat='density', ax=ax, **kwargs) gmms_fit_plot_(self.gmms[i].weights_, self.gmms[i].means_.ravel(), np.sqrt(self.gmms[i].covariances_), ax= ax, color = 'r') label_ = 'alpha_{i' + str(i+1) + '}' ax.set_xlabel(fr'$\{label_}$', fontsize = fontsize) ax.yaxis.get_label().set_fontsize(fontsize) # fig.suptitle('Plot of fitted GMMs') fig.tight_layout()
[docs] def fuzzy_membership_matrix(self) -> np.ndarray: """ Construct the cluster membership matrices from GMM fits. """ membership_matrix = np.array([gmm.predict_proba(proj_coeffs.reshape(-1, 1)) for proj_coeffs, gmm in zip(self.projs_coeffs, self.gmms)]) return membership_matrix
[docs] def binary_membership_matrix(self) -> np.ndarray: """ Construct a binary membership indicator matrices from the cluster membership matrice. """ membership_matrix = self.fuzzy_membership_matrix() binary_matrix = np.zeros_like(membership_matrix) max_indices = np.argmax(membership_matrix, axis=2) x_indices, y_indices = np.meshgrid(np.arange(membership_matrix.shape[0]), np.arange(membership_matrix.shape[1]), indexing='ij') binary_matrix[x_indices, y_indices, max_indices] = 1 return binary_matrix
[docs] def get_omega_prob(self, dist_a, dist_b) -> float: '''Construct misclassification probability omega_{b|a} for univariate GMMs Parameters ---------- dist_* : array-like of shape (3,) THe parameters of the mixture component *: [weight, mean, variance] ''' coeff_a = 1/dist_b[2] - 1/dist_a[2] coeff_b = 2 * (dist_a[1] / dist_a[2] - dist_b[1] / dist_b[2]) coeff_c = dist_b[1] ** 2 / dist_b[2] - dist_a[1] ** 2 / dist_a[2] - math.log((dist_b[0] / dist_a[0]) ** 2 * dist_a[2] / dist_b[2]) #for quadratic inequality if coeff_a != 0: #for real roots if coeff_b ** 2 - (4 * coeff_a * coeff_c) >= 0: #compute zeros zeros = [((-1 * coeff_b) + math.sqrt(coeff_b ** 2 - (4 * coeff_a * coeff_c))) / (2 * coeff_a), ((-1 * coeff_b) - math.sqrt(coeff_b ** 2 - (4 * coeff_a * coeff_c))) / (2 * coeff_a)] #normalize zeros with parameters of distribution a norm_zeros = (np.array(zeros) - dist_a[1]) / math.sqrt(dist_a[2]) left_ = np.min(norm_zeros) right_ = np.max(norm_zeros) #compute misclassification probability if 2 * coeff_a > 0: prob = norm.cdf(right_) - norm.cdf(left_) elif 2 * coeff_a < 0: prob = 1 + norm.cdf(left_) - norm.cdf(right_) #for complex roots else: if 2 * coeff_a > 0: prob = 0.0 else: prob = 1.0 #for linear inequality elif coeff_a == 0 and coeff_b != 0: zero = -1 * coeff_c / coeff_b norm_zero = (zero - dist_a[1]) / math.sqrt(dist_a[2]) if coeff_b > 0: prob = norm.cdf(norm_zero) elif coeff_b < 0: prob = 1 - norm.cdf(norm_zero) #for a constant elif coeff_a == 0 and coeff_b == 0: #if equal means and variances if coeff_c <= 0: prob = 1.0 elif coeff_c > 0: prob = 0.0 return prob
[docs] def get_omega_map(self, weights, means, vars) -> np.ndarray: '''Construct matrix of misclassification probabilities''' omega_array = np.zeros((self.n_clusters, self.n_clusters)) for i in range(self.n_clusters): omega_array[i] = np.array([self.get_omega_prob([weights[i], means[i], vars[i]], [weights[j], means[j], vars[j]]) for j in range(self.n_clusters)]) return omega_array
[docs] def get_total_omega(self, weights, means, vars, weighted_sum) -> float: '''Compute total misclassification probability for univariate GMM''' #sum of misclassification probabilities within GMM omega_map = self.get_omega_map(weights, means, vars) omega_sum = omega_map.sum(axis = 1) - np.diag(omega_map) #return the total probability of misclassification if weighted_sum: return np.array(weights) @ omega_sum else: return omega_sum.sum()
[docs] def get_clustering_weights(self, weighted_sum, precompute_gmms = None) -> np.ndarray: '''Compute weights for base clusterings''' if precompute_gmms: self.gmms = precompute_gmms if len(self.gmms) == 1: self.clustering_weights_ = np.array([1]) else: #get params from fitted gmms weights_ = [gmm.weights_ for gmm in self.gmms] means_ = [gmm.means_.ravel() for gmm in self.gmms] vars_ = [gmm.covariances_ for gmm in self.gmms] #compute total probability of misclassification for the fitted gmms total_omega_ = np.array([self.get_total_omega(weights_[i], means_[i], vars_[i], weighted_sum) for i in range(len(self.gmms))]) #catch zero omegas to deal with potential math errors as a result of undefine computations omega_zero = np.argwhere(total_omega_ < 1e-20) # #adjust total misclassification probability # if len(omega_zero) == 0: # total_omega_inv = 1 / np.array(total_omega_) # elif len(omega_zero) == 1: # warnings.warn('The choice of projection basis might result in unstable clusterings for the dataset. Try other kind of projection basis.') # total_omega_inv = np.insert(1 / np.delete(total_omega_, int(omega_zero)), int(omega_zero), 0) # else: # warnings.warn('The choice of projection basis might result in unstable clusterings for the dataset. Try other kind of projection basis.') # total_omega_inv = np.ones(len(self.gmms)) total_omega_[omega_zero] = 1e-30 total_omega_inv = 1 / np.array(total_omega_) #compute clustering weights self.clustering_weights_ = total_omega_inv / total_omega_inv.sum() return self.clustering_weights_
[docs] def get_affinity_matrix(self, weighted_sum: bool, precompute_gmms = None) -> np.ndarray: ''' Construct affinity matrix using binary membership matrices and clustering weights''' if precompute_gmms: self.gmms = precompute_gmms bm_matrix = self.binary_membership_matrix() clustering_weights = self.get_clustering_weights(weighted_sum, self.gmms) affinity_matrix = np.zeros((self.data_size, self.data_size)) for i in range(len(self.gmms)): affinity_matrix += clustering_weights[i] * np.matmul(bm_matrix[i], bm_matrix[i].T) return affinity_matrix
[docs] def get_clustering(self, weighted_sum: bool = True, precompute_gmms = None, **kwargs) -> np.ndarray: '''Obtain the consensus clustering via Spectral clustering of Affinity matrix''' if precompute_gmms: self.gmms = precompute_gmms clustering = SpectralClustering(n_clusters = self.n_clusters, affinity='precomputed', assign_labels='discretize', **kwargs) clustering.fit(self.get_affinity_matrix(weighted_sum, self.gmms)) self.labels_ = clustering.labels_ return self.labels_
[docs] def plot_clustering(self, fdata: skfda.FDataGrid): fdata.plot(group = self.labels_)
[docs] def adjusted_mutual_info_score(self, true_labels) -> float: return ami(self.labels_, true_labels)
[docs] def adjusted_rand_score(self, true_labels) -> float: return ari(self.labels_, true_labels)
[docs] def correct_classification_accuracy(self, true_labels) -> float: true_classes = np.unique(true_labels) assert self.n_clusters == len(true_classes), f"number of clusters {self.n_clusters} do not match number of true clusters {len(true_classes)}." pred_classes = np.arange(self.n_clusters) pred_perm = np.zeros_like(self.labels_) cca_list = [] #permute the pred labels and compute classification accuracy for each for perm in permutations(pred_classes): for i,j in enumerate(perm): pred_perm[self.labels_ == j] = true_classes[i] #compute CCA cca_list.append(accuracy_score(true_labels, pred_perm)) max_ind = np.argmax(cca_list) self.max_cca_labels_ = list(permutations(pred_classes))[max_ind] #return max classification score return cca_list[max_ind]
[docs] def silhouette_score(self, fdata: skfda.FDataGrid): return silhouette_score(fdata, self.labels_)
[docs] def davies_bouldin_score(self, fdata: skfda.FDataGrid): return davies_bouldin_score(fdata, self.labels_)