Source code for spidet.spike_detection.nmf

from typing import Tuple, Dict

import nimfa
import numpy as np
from loguru import logger
from scipy.cluster.hierarchy import linkage, cophenet

from spidet.domain.Nmfsc import Nmfsc


[docs] class Nmf: """ This class hosts operations concerned with nonnegative matrix factorization (NMF). Parameters ---------- rank: int The rank used for the nonnegative matrix factorization sparseness: float, optional, default = 0.0 The sparseness parameter used in case NMF is run with sparseness constraints. """ def __init__(self, rank: int, sparseness: float = 0.0): self.rank = rank self.sparseness = float(sparseness) @staticmethod def __calculate_cophenetic_corr(consensus_matrix: np.ndarray) -> np.ndarray: # Extract the values from the lower triangle of A avec = np.array( [ consensus_matrix[i, j] for i in range(consensus_matrix.shape[0] - 1) for j in range(i + 1, consensus_matrix.shape[1]) ] ) # Consensus entries are similarities, conversion to distances Y = 1 - avec # Hierarchical clustering Z = linkage(Y, method="average") # Cophenetic correlation coefficient of a hierarchical clustering coph = cophenet(Z, Y)[0] return coph
[docs] def nmf_run( self, preprocessed_data: np.ndarray[np.dtype[float]], n_runs: int, ) -> Tuple[ Dict, np.ndarray[np.dtype[float]], np.ndarray[np.dtype[float]], np.ndarray[np.dtype[float]], ]: """ This method performs a number of nonegative matrix factorization runs for the rank defined at initialization of the :py:class:`Nmf` object. At the beginning of each run the :math:`W` and :math:`H` matrices are randomly initialized. The algorithm runs until convergence (i.e., the reconstruction error < 1e-5) or for a maximum of 10 iterations. Parameters ---------- preprocessed_data: numpy.ndarray[numpy.dtype[float]] The data used as input for NMF algorithm. n_runs: int The number of NMF runs. Returns ------- Tuple[Dict, np.ndarray[np.dtype[float]], np.ndarray[np.dtype[float]], np.ndarray[np.dtype[float]]] A dictionary containing the rank, the minimum reconstruction error, the cophenetic correlation and the instability index, together with the consensus, :math:`W` and :math:`H` matrices. """ data_matrix = preprocessed_data consensus = np.zeros((data_matrix.shape[0], data_matrix.shape[0])) obj = np.zeros(n_runs) lowest_obj = float("inf") h_best = None w_best = None if self.sparseness == 0.0: nmf = nimfa.Nmf( data_matrix.T, rank=self.rank, seed="random_vcol", max_iter=10 ) else: nmf = Nmfsc(data_matrix, rank=self.rank, max_iter=10, sW=self.sparseness) for n in range(n_runs): logger.debug( f"Rank {self.rank}, Run {n + 1}/{n_runs}: Perform matrix factorization" ) if self.sparseness != 0.0: fit = nmf() consensus += fit.connectivity() obj[n] = fit.final_obj if obj[n] < lowest_obj: logger.debug( f"Rank {self.rank}, Run {n + 1}/{n_runs}: Update COEFFICIENTS and BASIS FCTs" ) lowest_obj = obj[n] w_best = np.array(fit.basis()) h_best = np.array(fit.coef()) else: fit = nmf() consensus += fit.fit.connectivity() obj[n] = fit.fit.final_obj if obj[n] < lowest_obj: logger.debug( f"Rank {self.rank}, Run {n + 1}/{n_runs}: Update COEFFICIENTS and BASIS FCTs" ) lowest_obj = obj[n] w_best = np.array(fit.fit.coef().T) h_best = np.array(fit.fit.basis().T) consensus /= n_runs coph = self.__calculate_cophenetic_corr(consensus) instability = 1 - coph # Storing metrics metrics = { "Rank": self.rank, "Min Final Obj": lowest_obj, "Cophenetic Correlation": coph, "Instability index": instability, } logger.debug(f"Rank {self.rank}: Finished {n_runs} iterations of NMF") return metrics, consensus, h_best, w_best