Source code for scprep.reduce

from . import utils
from scipy import sparse
from sklearn import decomposition
from sklearn import random_projection

import numpy as np
import pandas as pd
import sklearn.base
import warnings


[docs]class InvertibleRandomProjection(random_projection.GaussianRandomProjection): """Gaussian random projection with an inverse transform using the pseudoinverse.""" def __init__( self, n_components="auto", eps=0.3, orthogonalize=False, random_state=None ): self.orthogonalize = orthogonalize super().__init__(n_components=n_components, eps=eps, random_state=random_state) @property def pseudoinverse(self): """Pseudoinverse of the random projection. This inverts the projection operation for any vector in the span of the random projection. For small enough `eps`, this should be close to the correct inverse. """ try: return self._pseudoinverse except AttributeError: if self.orthogonalize: # orthogonal matrix: inverse is just its transpose self._pseudoinverse = self.components_ else: self._pseudoinverse = np.linalg.pinv(self.components_.T) return self._pseudoinverse
[docs] def fit(self, X): super().fit(X) if self.orthogonalize: Q, _ = np.linalg.qr(self.components_.T) self.components_ = Q.T return self
def inverse_transform(self, X): return X.dot(self.pseudoinverse)
[docs]class AutomaticDimensionSVD(decomposition.TruncatedSVD): """Truncated SVD with automatic dimensionality selected by Johnson-Lindenstrauss.""" def __init__( self, n_components="auto", eps=0.3, algorithm="randomized", n_iter=5, random_state=None, tol=0.0, ): self.eps = eps if n_components == "auto": # just pass through -1 - we will change it later n_components = -1 super().__init__( n_components=n_components, algorithm=algorithm, n_iter=n_iter, random_state=random_state, tol=tol, )
[docs] def fit(self, X): if self.n_components == -1: super().set_params( n_components=random_projection.johnson_lindenstrauss_min_dim( n_samples=X.shape[0], eps=self.eps ) ) try: return super().fit(X) except ValueError: if self.n_components >= X.shape[1]: raise RuntimeError( "eps={} and n_samples={} lead to a target " "dimension of {} which is larger than the " "original space with n_features={}".format( self.eps, X.shape[0], self.n_components, X.shape[1] ) ) else: raise
[docs]class SparseInputPCA(sklearn.base.BaseEstimator): """Calculate PCA using random projections to handle sparse matrices. Uses the Johnson-Lindenstrauss Lemma to determine the number of dimensions of random projections prior to subtracting the mean. Parameters ---------- n_components : int, optional (default: 2) Number of components to keep. eps : strictly positive float, optional (default=0.15) Parameter to control the quality of the embedding according to the Johnson-Lindenstrauss lemma when n_components is set to ‘auto’. Smaller values lead to more accurate embeddings but higher computational and memory costs method : {'svd', 'orth_rproj', 'rproj'}, optional (default: 'svd') Dimensionality reduction method applied prior to mean centering. The method choice affects accuracy (`svd` > `orth_rproj` > `rproj`) comes with increased computational cost (but not memory.) random_state : int, RandomState instance or None, optional (default=None) If int, random_state is the seed used by the random number generator; If RandomState instance, random_state is the random number generator; If None, the random number generator is the RandomState instance used by np.random. kwargs Additional keyword arguments for `sklearn.decomposition.PCA` """ def __init__( self, n_components=2, eps=0.3, random_state=None, method="svd", **kwargs ): self.pca_op = decomposition.PCA( n_components=n_components, random_state=random_state ) if method == "svd": self.proj_op = AutomaticDimensionSVD(eps=eps, random_state=random_state) elif method == "orth_rproj": self.proj_op = InvertibleRandomProjection( eps=eps, random_state=random_state, orthogonalize=True ) elif method == "rproj": self.proj_op = InvertibleRandomProjection( eps=eps, random_state=random_state, orthogonalize=False ) else: raise ValueError( "Expected `method` in ['svd', 'orth_rproj', 'rproj']. " "Got '{}'".format(method) ) @property def singular_values_(self): """Singular values of the PCA decomposition.""" return self.pca_op.singular_values_ @property def explained_variance_(self): """The amount of variance explained by each of the selected components.""" return self.pca_op.explained_variance_ @property def explained_variance_ratio_(self): """Percentage of variance explained by each of the selected components. The sum of the ratios is equal to 1.0. If n_components is `None` then the number of components grows as`eps` gets smaller. """ return self.pca_op.explained_variance_ratio_ @property def components_(self): """Principal axes in feature space, representing directions of maximum variance. The components are sorted by explained variance. """ return self.proj_op.inverse_transform(self.pca_op.components_) def _fit(self, X): self.proj_op.fit(X) X_proj = self.proj_op.transform(X) self.pca_op.fit(X_proj) return X_proj
[docs] def fit(self, X): """Fit the model with X. Parameters ---------- X : array-like, shape=(n_samples, n_features) """ self._fit(X) return self
[docs] def transform(self, X): """Apply dimensionality reduction to X. Parameters ---------- X : array-like, shape=(n_samples, n_features) Returns ------- X_new : array-like, shape=(n_samples, n_components) """ X_proj = self.proj_op.transform(X) X_pca = self.pca_op.transform(X_proj) return X_pca
[docs] def fit_transform(self, X): """Fit the model with X and apply the dimensionality reduction on X. Parameters ---------- X : array-like, shape=(n_samples, n_features) Returns ------- X_new : array-like, shape=(n_samples, n_components) """ X_proj = self._fit(X) X_pca = self.pca_op.transform(X_proj) return X_pca
[docs] def inverse_transform(self, X): """Transform data back to its original space. Parameters ---------- X : array-like, shape=(n_samples, n_components) Returns ------- X_new : array-like, shape=(n_samples, n_features) """ X_proj = self.pca_op.inverse_transform(X) X_ambient = self.proj_op.inverse_transform(X_proj) return X_ambient
[docs]def pca( data, n_components=100, eps=0.3, method="svd", seed=None, return_singular_values=False, n_pca=None, svd_offset=None, svd_multiples=None, ): """Calculate PCA using random projections to handle sparse matrices. Uses the Johnson-Lindenstrauss Lemma to determine the number of dimensions of random projections prior to subtracting the mean. Dense matrices are provided to `sklearn.decomposition.PCA` directly. Parameters ---------- data : array-like, shape=[n_samples, n_features] Input data n_components : int, optional (default: 100) Number of PCs to compute eps : strictly positive float, optional (default=0.3) Parameter to control the quality of the embedding of sparse input. Smaller values lead to more accurate embeddings but higher computational and memory costs method : {'svd', 'orth_rproj', 'rproj', 'dense'}, optional (default: 'svd') Dimensionality reduction method applied prior to mean centering of sparse input. The method choice affects accuracy (`svd` > `orth_rproj` > `rproj`) and comes with increased computational cost (but not memory.) On the other hand, `method='dense'` adds a memory cost but is faster. seed : int, RandomState or None, optional (default: None) Random state. return_singular_values : bool, optional (default: False) If True, also return the singular values n_pca : Deprecated. svd_offset : Deprecated. svd_multiples :Deprecated. Returns ------- data_pca : array-like, shape=[n_samples, n_components] PCA reduction of `data` singular_values : list-like, shape=[n_components] Singular values corresponding to principal components returned only if return_values is True """ if n_pca is not None: warnings.warn( "n_pca is deprecated. Setting n_components={}.".format(n_pca), FutureWarning ) n_components = n_pca if svd_offset is not None: warnings.warn( "svd_offset is deprecated. Please use `eps` instead.", FutureWarning ) if svd_multiples is not None: warnings.warn( "svd_multiples is deprecated. Please use `eps` instead.", FutureWarning ) if not 0 < n_components <= min(data.shape): raise ValueError( "n_components={} must be between 0 and " "min(n_samples, n_features)={}".format(n_components, min(data.shape)) ) # handle dataframes if isinstance(data, pd.DataFrame): index = data.index else: index = None if method == "dense": data = utils.toarray(data) else: data = utils.to_array_or_spmatrix(data) # handle sparsity if sparse.issparse(data): try: pca_op = SparseInputPCA( n_components=n_components, eps=eps, method=method, random_state=seed ) data = pca_op.fit_transform(data) except RuntimeError as e: if "which is larger than the original space" in str(e): # eps too small - the best we can do is make the data dense return pca( utils.toarray(data), n_components=n_components, seed=seed, return_singular_values=return_singular_values, ) else: pca_op = decomposition.PCA(n_components, random_state=seed) data = pca_op.fit_transform(data) if index is not None: data = pd.DataFrame( data, index=index, columns=["PC{}".format(i + 1) for i in range(n_components)], ) if return_singular_values: data = (data, pca_op.singular_values_) return data