# Source code for OCDocker.OCScore.Dimensionality.future.utils

#!/usr/bin/env python3

# Description
###############################################################################
'''Utilities for the future Autoencoder pipeline.'''

# Imports
###############################################################################

from __future__ import annotations

import torch

import numpy as np

from typing import Dict

# License
###############################################################################
'''
OCDocker
Authors: Rossi, A.D.; Monachesi, M.C.E.; Spelta, G.I.; Torres, P.H.M.
Federal University of Rio de Janeiro
Carlos Chagas Filho Institute of Biophysics
Laboratory for Molecular Modeling and Dynamics

This program is proprietary software owned by the Federal University of Rio de Janeiro (UFRJ),
developed by Rossi, A.D.; Monachesi, M.C.E.; Spelta, G.I.; Torres, P.H.M., and protected under Brazilian Law No. 9,609/1998.
All rights reserved. Use, reproduction, modification, and distribution are allowed under this UFRJ license,
provided this copyright notice is preserved. See the LICENSE file for details.

Contact: Artur Duque Rossi - arturossi10@gmail.com
'''

# Classes
###############################################################################


# Functions
###############################################################################
## Private ##

def _rankdata(values: np.ndarray) -> np.ndarray:
    '''Compute rank data for Spearman correlation.

    Parameters
    ----------
    values : np.ndarray
        Input array.

    Returns
    -------
    np.ndarray
        Rank-transformed array.
    '''

    order = np.argsort(values)
    # Ranks are zero-based; ties are not specially handled (consistent ordering).
    ranks = np.empty_like(order, dtype=float)
    ranks[order] = np.arange(len(values), dtype=float)
    return ranks


## Public ##

def apply_noise(
    inputs: torch.Tensor,
    noise_type: str = "none",
    mask_prob: float = 0.0,
    gaussian_std: float = 0.0,
    swap_prob: float = 0.0
) -> torch.Tensor:
    '''Apply input noise for denoising autoencoder training.

    Parameters
    ----------
    inputs : torch.Tensor
        Input batch tensor.
    noise_type : str, optional
        Noise strategy string (e.g., "mask", "gaussian", "swap", combos), by default "none".
    mask_prob : float, optional
        Feature masking probability, by default 0.0.
    gaussian_std : float, optional
        Gaussian noise standard deviation, by default 0.0.
    swap_prob : float, optional
        Swap noise probability, by default 0.0.

    Returns
    -------
    torch.Tensor
        Noised input tensor.
    '''

    # Fast path: nothing to do.
    if noise_type == "none":
        return inputs

    noisy = inputs

    # Additive Gaussian noise perturbs continuous inputs.
    if "gaussian" in noise_type and gaussian_std > 0.0:
        noisy = noisy + gaussian_std * torch.randn_like(noisy)

    # Feature masking zeroes random entries, simulating missing data.
    if "mask" in noise_type and mask_prob > 0.0:
        drop = torch.rand_like(noisy) < mask_prob
        noisy = noisy.masked_fill(drop, 0.0)

    # Swap noise exchanges feature values across samples in the batch;
    # it requires at least two samples to have anything to swap with.
    if "swap" in noise_type and swap_prob > 0.0:
        n_samples = noisy.shape[0]
        if n_samples > 1:
            perm = torch.randperm(n_samples, device=noisy.device)
            take = torch.rand_like(noisy) < swap_prob
            noisy = torch.where(take, noisy[perm], noisy)

    return noisy
def embedding_stats(embeddings: np.ndarray, collapse_threshold: float = 1e-6) -> Dict[str, object]:
    '''Compute basic embedding statistics.

    Parameters
    ----------
    embeddings : np.ndarray
        Embedding matrix (N, D).
    collapse_threshold : float, optional
        Variance threshold to define collapsed dimensions, by default 1e-6.

    Returns
    -------
    Dict[str, object]
        Dictionary with variance, collapse rate, and mean norm.
    '''

    # Empty input yields neutral statistics instead of NaNs.
    if embeddings.size == 0:
        return {
            "variance": [],
            "collapse_rate": 0.0,
            "mean_norm": 0.0
        }

    per_dim_variance = embeddings.var(axis=0)
    # Collapse rate is the fraction of dimensions with near-zero variance.
    collapsed = per_dim_variance < collapse_threshold
    row_norms = np.linalg.norm(embeddings, axis=1)

    return {
        "variance": per_dim_variance.tolist(),
        "collapse_rate": float(collapsed.mean()),
        "mean_norm": float(row_norms.mean())
    }
def ramp_weight(
    target: float,
    epoch: int,
    ramp_epochs: int,
    ramp_type: str = "linear"
) -> float:
    '''Compute ramped weight value for a given epoch.

    Parameters
    ----------
    target : float
        Final target weight.
    epoch : int
        Current epoch index (0-based).
    ramp_epochs : int
        Number of epochs to ramp over.
    ramp_type : str, optional
        Ramp schedule type ("linear" or "sigmoid"), by default "linear".

    Returns
    -------
    float
        Ramped weight for the given epoch.
    '''

    # No ramp configured: the target applies immediately.
    if ramp_epochs <= 0:
        return float(target)

    # Fraction of the ramp completed after this epoch, clamped to [0, 1].
    progress = float(epoch + 1) / float(ramp_epochs)
    progress = min(1.0, max(0.0, progress))

    if ramp_type == "sigmoid":
        # Sigmoid ramp grows slowly, accelerates mid-way, then saturates.
        scale = float(1.0 / (1.0 + np.exp(-10.0 * (progress - 0.5))))
        return float(target) * scale

    return float(target) * progress
def spearman_corr(x: np.ndarray, y: np.ndarray) -> float:
    '''Compute Spearman correlation (rank-based Pearson).

    Parameters
    ----------
    x : np.ndarray
        First input vector.
    y : np.ndarray
        Second input vector.

    Returns
    -------
    float
        Spearman correlation coefficient.
    '''

    # Degenerate inputs (empty or mismatched lengths) yield zero correlation.
    if x.size != y.size or x.size == 0:
        return 0.0

    # Pearson correlation on rank-transformed, mean-centered data.
    rank_x = _rankdata(x)
    rank_y = _rankdata(y)
    centered_x = rank_x - rank_x.mean()
    centered_y = rank_y - rank_y.mean()

    # Small epsilon guards against division by zero for constant inputs.
    denom = np.sqrt((centered_x ** 2).sum() * (centered_y ** 2).sum()) + 1e-8
    return float((centered_x * centered_y).sum() / denom)