# Source code for OCDocker.OCScore.Scoring

#!/usr/bin/env python3

# Description
###############################################################################
'''
Set of functions to manage scoring and prediction in OCDocker in the context of
scoring functions.

Usage:

import OCDocker.OCScore.Scoring as ocscoring
'''

# Imports
###############################################################################

import os

import numpy as np
import pandas as pd

from sklearn.decomposition import PCA
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from typing import Any, Dict, Iterator, Optional, Union, cast

import OCDocker.Error as ocerror
import OCDocker.OCScore.Utils.Data as ocscoredata
import OCDocker.OCScore.Utils.IO as ocscoreio

# License
###############################################################################
'''
OCDocker
Authors: Rossi, A.D.; Monachesi, M.C.E.; Spelta, G.I.; Torres, P.H.M.
Federal University of Rio de Janeiro
Carlos Chagas Filho Institute of Biophysics
Laboratory for Molecular Modeling and Dynamics

This program is proprietary software owned by the Federal University of Rio de Janeiro (UFRJ),
developed by Rossi, A.D.; Monachesi, M.C.E.; Spelta, G.I.; Torres, P.H.M., and protected under Brazilian Law No. 9,609/1998.
All rights reserved. Use, reproduction, modification, and distribution are allowed under this UFRJ license,
provided this copyright notice is preserved. See the LICENSE file for details.

Contact: Artur Duque Rossi - arturossi10@gmail.com
'''

# Classes
###############################################################################

# Functions
###############################################################################
## Private ##

def _iter_query_rows(query: Any, batch_size: int = 1000) -> Iterator[Any]:
    '''Iterate query rows using streaming when available.

    Parameters
    ----------
    query : Any
        SQLAlchemy query-like object.
    batch_size : int, optional
        Preferred streaming batch size for query backends that support it.

    Returns
    -------
    Iterator[Any]
        Iterator over query rows.
    '''

    streamed = query
    if hasattr(streamed, "yield_per"):
        try:
            streamed = streamed.yield_per(batch_size)
        except Exception:
            streamed = query

    yielded_any = False
    try:
        for row in streamed:
            yielded_any = True
            yield row
        return
    except Exception:
        if yielded_any:
            raise

    if streamed is not query:
        try:
            for row in query:
                yield row
            return
        except Exception:
            pass

    all_method = getattr(query, "all", None)
    if callable(all_method):
        for row in all_method():
            yield row


def _apply_eager_relationship_loading(query: Any, model: Any) -> Any:
    '''Apply eager relationship loading to reduce N+1 queries when supported.

    Parameters
    ----------
    query : Any
        SQLAlchemy query-like object.
    model : Any
        ORM model class that may expose ``ligand`` and ``receptor`` relationships.

    Returns
    -------
    Any
        Query with eager loading options when possible, or the original query.
    '''

    if not hasattr(query, "options"):
        return query

    try:
        from sqlalchemy.orm import joinedload
    except Exception:
        return query

    eager_options = []
    for rel_name in ("ligand", "receptor"):
        rel_attr = getattr(model, rel_name, None)
        if rel_attr is None:
            continue
        try:
            eager_options.append(joinedload(rel_attr))
        except Exception:
            continue

    if not eager_options:
        return query

    try:
        return query.options(*eager_options)
    except Exception:
        return query


def _build_dataframe_from_complexes_query(
    query: Any,
    descriptors: list[str],
    batch_size: int = 1000,
) -> pd.DataFrame:
    '''Collect Complexes rows from a query into one DataFrame, chunk by chunk.

    Parameters
    ----------
    query : Any
        Query object returning Complexes ORM rows.
    descriptors : list[str]
        Descriptor names to collect from each complex row.
    batch_size : int, optional
        Number of rows to collect before creating each DataFrame chunk.

    Returns
    -------
    pd.DataFrame
        Aggregated DataFrame created from chunked query iteration.
    '''

    frames: list[pd.DataFrame] = []
    pending: list[Dict[str, Any]] = []

    for record in _iter_query_rows(query, batch_size=batch_size):
        entry: Dict[str, Any] = {}

        # Copy every available, non-null descriptor. ORM attributes are
        # lowercase, while the output column keeps the descriptor's casing.
        for name in descriptors:
            attribute = name.lower()
            if hasattr(record, attribute):
                value = getattr(record, attribute)
                if value is not None:
                    entry[name] = value

        # Attach ligand/receptor names when those relationships are populated.
        ligand = getattr(record, 'ligand', None)
        if ligand and hasattr(ligand, 'name'):
            entry['ligand'] = ligand.name
        receptor = getattr(record, 'receptor', None)
        if receptor and hasattr(receptor, 'name'):
            entry['receptor'] = receptor.name

        # Guarantee a database label on every row.
        entry.setdefault('db', 'UNKNOWN')

        pending.append(entry)
        # Flush accumulated rows into a chunk to bound memory usage.
        if len(pending) >= batch_size:
            frames.append(pd.DataFrame(pending))
            pending = []

    if pending:
        frames.append(pd.DataFrame(pending))

    if not frames:
        return pd.DataFrame()
    return frames[0] if len(frames) == 1 else pd.concat(
        frames, ignore_index=True, sort=False
    )

## Public ##

def get_score(
    model_path: str,
    data: Optional[Union[pd.DataFrame, str]] = None,
    pca_model: Optional[Union[str, PCA]] = None,
    mask: Optional[Union[list, np.ndarray]] = None,
    score_columns_list: Optional[list[str]] = None,
    scaler: Optional[str] = "standard",
    scaler_path: Optional[str] = None,
    invert_conditionally: bool = True,
    normalize: bool = True,
    no_scores: bool = False,
    only_scores: bool = False,
    columns_to_skip_pca: Optional[list[str]] = None,
    serialization_method: str = "auto",
    use_gpu: bool = True,
    enforce_reference_column_order: bool = True
) -> Union[pd.DataFrame, np.ndarray]:
    '''Get scores by loading a model and applying the same preprocessing pipeline.

    This function loads a trained model and applies it to input data following
    the same preprocessing pipeline used during training. The data can be
    provided as a DataFrame or read from a database.

    Parameters
    ----------
    model_path : str
        Path to the saved model file.
    data : pd.DataFrame | str, optional
        Input data. Can be:
        - A pandas DataFrame with the features
        - A string path to a CSV file
        - None to read from database (requires DB setup)
        Default is None.
    pca_model : str | PCA, optional
        Path to the PCA model file or a PCA model object. If provided, PCA
        transformation will be applied. If None, no PCA is used. Default is None.
    mask : list | np.ndarray, optional
        Feature mask array of 0s and 1s to filter features before prediction.
        Length should match the number of features after preprocessing.
        1 means keep the feature, 0 means remove it. Default is None (no masking).
    score_columns_list : list[str], optional
        List of score column prefixes to identify score columns. If None,
        defaults to ["SMINA", "VINA", "ODDT", "PLANTS"].
    scaler : str, optional
        Scaler to use for normalization if scaler_path is not provided.
        Options are "standard" or "minmax". Ignored when scaler_path is given.
        Default is "standard".
    scaler_path : str, optional
        Path to a saved scaler file (saved with joblib/pickle). If provided,
        the saved scaler is loaded and used instead of fitting a new one,
        ensuring the same scaling parameters from training. Default is None.
    invert_conditionally : bool, optional
        Whether to invert values conditionally (for VINA, SMINA, PLANTS
        columns). Default is True.
    normalize : bool, optional
        Whether to normalize the data. Default is True.
    no_scores : bool, optional
        If True, remove score columns from the data. Default is False.
    only_scores : bool, optional
        If True, keep only score columns and metadata. Default is False.
    columns_to_skip_pca : list[str], optional
        List of columns to skip during PCA transformation. If None, defaults
        to metadata columns ["receptor", "ligand", "name", "type", "db"] plus
        the detected score columns. Default is None.
    serialization_method : str, optional
        Retained for backward compatibility; model loading always uses
        automatic format detection. Default is "auto".
    use_gpu : bool, optional
        If True and CUDA is available, run PyTorch models on the GPU.
        Default is True.
    enforce_reference_column_order : bool, optional
        If True, reorder columns to match the `reference_column_order` from
        the config file before any preprocessing (scaling/PCA/masking). This
        is critical to keep feature alignment consistent with training.
        Default is True.

    Returns
    -------
    pd.DataFrame | np.ndarray
        Predicted scores. Returns a DataFrame if input was a DataFrame
        (preserving metadata columns), otherwise returns a numpy array.

    Raises
    ------
    FileNotFoundError
        If the model file, scaler file or data file is not found.
    ValueError
        If data is None and the database is not available, or if invalid
        parameters are provided, or if prediction fails.
    '''

    # Avoid the mutable-default-argument pitfall: fall back to the standard
    # score prefixes only when the caller did not supply a list.
    if score_columns_list is None:
        score_columns_list = ["SMINA", "VINA", "ODDT", "PLANTS"]

    # Check if model file exists
    if not os.path.isfile(model_path):
        ocerror.Error.file_not_exist(f"Model file not found: {model_path}")
        raise FileNotFoundError(f"Model file not found: {model_path}")

    # Load the model - IO module now handles format detection automatically
    loaded_obj = ocscoreio.load_object(model_path, serialization_method="auto", trusted=True)

    # Handle different model formats.
    # If the loaded object is a dict, it might be a state_dict or a dict
    # containing the model.
    if isinstance(loaded_obj, dict):
        # Check if it's a state_dict (PyTorch model weights)
        if 'state_dict' in loaded_obj or any(key.startswith(('layer', 'fc', 'linear', 'encoder', 'decoder')) for key in loaded_obj.keys()):
            # This is likely a state_dict, but we need the model class to load it
            ocerror.Error.value_error("Model file contains a state_dict (weights only), not a complete model. Please load the model class first, then load_state_dict().")
            raise ValueError("Model file contains a state_dict (weights only), not a complete model. Please load the model class first, then load_state_dict().")
        elif 'model' in loaded_obj:
            # Dict contains the model under 'model' key
            model = loaded_obj['model']
        elif 'network' in loaded_obj:
            # Dict contains the model under 'network' key
            model = loaded_obj['network']
        else:
            # Try to find any value that looks like a model object
            for key, value in loaded_obj.items():
                if hasattr(value, 'predict') or hasattr(value, 'forward'):
                    model = value
                    break
            else:
                ocerror.Error.value_error(f"Model file contains a dict but no model object found. Keys: {list(loaded_obj.keys())}")
                raise ValueError(f"Model file contains a dict but no model object found. Keys: {list(loaded_obj.keys())}")
    else:
        # Loaded object is the model itself
        model = loaded_obj

    # Set PyTorch models to eval mode for inference
    if hasattr(model, 'eval'):
        model.eval()

    # Determine device for PyTorch models
    import torch
    if use_gpu and torch.cuda.is_available():
        device: torch.device = torch.device('cuda')
    else:
        device = torch.device('cpu')

    # Move PyTorch model to the correct device
    if hasattr(model, 'to'):
        model = model.to(device)
    # Also update model.device if it exists
    if hasattr(model, 'device'):
        model.device = device

    # Fix mask attribute if it's stored as dict/list instead of tensor.
    # This can happen when models are saved/loaded.
    def fix_mask_attribute(obj, device=None):
        """Recursively fix mask attributes in model and nested modules."""
        if hasattr(obj, 'mask'):
            if obj.mask is None:
                obj.mask = []
            elif isinstance(obj.mask, dict):
                # Extract array from dict
                mask_value = None
                for key in ['mask', 'array', 'feature_mask', 'ablation_mask']:
                    if key in obj.mask:
                        mask_value = obj.mask[key]
                        break
                if mask_value is None:
                    for v in obj.mask.values():
                        if isinstance(v, (list, np.ndarray, torch.Tensor)):
                            mask_value = v
                            break
                if mask_value is None:
                    obj.mask = []
                elif isinstance(mask_value, torch.Tensor):
                    obj.mask = mask_value.float()
                elif isinstance(mask_value, np.ndarray):
                    obj.mask = torch.from_numpy(mask_value).float()
                elif isinstance(mask_value, list):
                    obj.mask = torch.tensor(mask_value, dtype=torch.float32) if mask_value else []
                else:
                    obj.mask = []
                # Move to device
                if isinstance(obj.mask, torch.Tensor):
                    target_device = device if device else (obj.device if hasattr(obj, 'device') else torch.device('cpu'))
                    obj.mask = obj.mask.to(target_device)
            elif isinstance(obj.mask, (list, np.ndarray)) and not isinstance(obj.mask, torch.Tensor):
                # Convert list/array to tensor
                if isinstance(obj.mask, np.ndarray):
                    obj.mask = torch.from_numpy(obj.mask).float()
                elif isinstance(obj.mask, list):
                    obj.mask = torch.tensor(obj.mask, dtype=torch.float32) if obj.mask else []
                # Move to device
                if isinstance(obj.mask, torch.Tensor):
                    target_device = device if device else (obj.device if hasattr(obj, 'device') else torch.device('cpu'))
                    obj.mask = obj.mask.to(target_device)
        # Also check nested modules
        if hasattr(obj, 'modules'):
            for module in obj.modules():
                if module is not obj:  # Avoid infinite recursion
                    fix_mask_attribute(module, device)

    # Fix mask in the model and all nested modules
    model_device = device
    if hasattr(model, 'device') and isinstance(model.device, torch.device):
        model_device = model.device
    fix_mask_attribute(model, model_device)

    # Load or prepare the data
    if data is None:
        # Try to read from database
        try:
            import OCDocker.Initialise as init
            from OCDocker.DB.Models.Complexes import Complexes

            # Check if session is available
            if not hasattr(init, 'session') or init.session is None:
                ocerror.Error.session_not_created("Database session not available. Please provide data or initialize the database.")
                raise ValueError("Database session not available. Please provide data or initialize the database.")

            # Read from database using streaming/chunked collection
            with init.session() as s:
                query = s.query(Complexes)
                query = _apply_eager_relationship_loading(query, Complexes)
                df_db = _build_dataframe_from_complexes_query(
                    query,
                    list(getattr(Complexes, "allDescriptors", [])),
                    batch_size=1000,
                )
                if df_db.empty:
                    ocerror.Error.data_not_found("No data found in database.")
                    raise ValueError("No data found in database.")
                data = df_db
        except (ImportError, AttributeError) as e:
            ocerror.Error.data_not_found(f"Failed to read from database: {e}. Please provide data as DataFrame or file path.")
            raise ValueError(f"Failed to read from database: {e}. Please provide data as DataFrame or file path.")

    # If data is a string, treat it as a file path
    if isinstance(data, str):
        if not os.path.isfile(data):
            ocerror.Error.file_not_exist(f"Data file not found: {data}")
            raise FileNotFoundError(f"Data file not found: {data}")
        loaded = ocscoreio.load_data(data)
        if not isinstance(loaded, pd.DataFrame):
            ocerror.Error.value_error("Loaded data is not a pandas DataFrame.")
            raise ValueError("Loaded data is not a pandas DataFrame.")
        data = loaded

    # Ensure data is a DataFrame
    if not isinstance(data, pd.DataFrame):
        ocerror.Error.value_error("Data must be a pandas DataFrame or a path to a CSV file.")
        raise ValueError("Data must be a pandas DataFrame or a path to a CSV file.")

    df = cast(pd.DataFrame, data)

    # Enforce column order from config before any preprocessing
    # (scaler/PCA/mask) to keep feature alignment consistent with training.
    if enforce_reference_column_order:
        try:
            from OCDocker.Config import get_config
            config = get_config()
            if config.paths.reference_column_order:
                df = ocscoredata.reorder_columns_to_match_data_order(
                    df,
                    data_source=None,
                    keep_extra_columns=False,
                    fill_missing_columns=False
                )
            else:
                ocerror.Error.value_error("reference_column_order not set in config file. Cannot enforce column order.")
                raise ValueError("reference_column_order not set in config file. Cannot enforce column order.")
        except Exception as e:
            ocerror.Error.value_error(f"Failed to enforce reference column order: {e}")
            raise

    # Store only metadata needed for return format to avoid cloning all
    # feature columns.
    original_metadata_cols = ["receptor", "ligand", "name", "type", "db", "experimental"]
    available_original_metadata_cols = [col for col in original_metadata_cols if col in df.columns]
    original_data = df[available_original_metadata_cols].copy() if available_original_metadata_cols else pd.DataFrame(index=df.index)
    is_dataframe = True

    # Identify score columns by prefix
    if score_columns_list:
        score_columns = df.filter(regex=f"^({'|'.join(score_columns_list)})").columns.to_list()
    else:
        score_columns = []

    # Apply preprocessing pipeline (similar to preprocess_df)
    # Handle score columns
    if no_scores:
        # Remove score columns
        if score_columns:
            df = df.drop(columns=score_columns, errors='ignore')
    elif only_scores:
        # Keep only score columns and metadata
        metadata_cols = ["receptor", "ligand", "name", "type", "db"]
        columns_to_keep = [col for col in metadata_cols if col in df.columns] + score_columns
        df = ocscoredata.remove_other_columns(df, columns_to_keep, inplace=False)

    # Invert values conditionally
    if invert_conditionally:
        df = ocscoredata.invert_values_conditionally(df, inplace=False)

    # Normalize data
    if normalize:
        # Try to load scaler from file if scaler_path is provided
        scaler_obj = None
        if scaler_path is not None:
            if not os.path.isfile(scaler_path):
                ocerror.Error.file_not_exist(f"Scaler file not found: {scaler_path}")
                raise FileNotFoundError(f"Scaler file not found: {scaler_path}")
            try:
                scaler_obj = ocscoreio.load_object(scaler_path, serialization_method="auto", trusted=True)
                # Verify it's a scaler object
                if not isinstance(scaler_obj, (StandardScaler, MinMaxScaler)):
                    ocerror.Error.value_error(f"File {scaler_path} does not contain a valid scaler object.")
                    raise ValueError(f"File {scaler_path} does not contain a valid scaler object. Got {type(scaler_obj)}")
            except Exception as e:
                ocerror.Error.value_error(f"Failed to load scaler from {scaler_path}: {e}")
                raise ValueError(f"Failed to load scaler from {scaler_path}: {e}")

        # Use the loaded scaler or the scaler string
        if scaler_obj is not None:
            df = ocscoredata.norm_data(df, scaler=scaler_obj, inplace=False)
        else:
            # Create a new scaler (this fits on the prediction data - not
            # ideal but backward compatible)
            result = ocscoredata.norm_data(df, scaler=scaler, inplace=False)
            # Handle tuple return (DataFrame, scaler) when fitting new scaler
            if isinstance(result, tuple):
                df = result[0]
            else:
                df = result

    # Apply PCA if pca_model is provided
    if pca_model is not None:
        # Set default columns to skip PCA
        if columns_to_skip_pca is None:
            columns_to_skip_pca = ["receptor", "ligand", "name", "type", "db"]
            if score_columns:
                columns_to_skip_pca.extend(score_columns)
        df = ocscoredata.apply_pca(
            df,
            pca_model,
            columns_to_skip_pca=columns_to_skip_pca,
            inplace=False
        )

    # Prepare features for prediction (exclude metadata columns)
    metadata_cols = ["receptor", "ligand", "name", "type", "db", "experimental"]
    feature_cols = [col for col in df.columns if col not in metadata_cols]
    X = df[feature_cols].values

    # Apply mask if provided.
    # NOTE: If the model has its own mask (e.g., PyTorch DynamicNN), we should
    # NOT apply the external mask here, as the model will apply its own mask in
    # the forward pass. The external mask parameter is for models that don't
    # have built-in masking.
    model_has_mask = hasattr(model, 'mask') and model.mask is not None and len(model.mask) > 0
    if mask is not None and not model_has_mask:
        # Only apply external mask if model doesn't have its own mask
        mask = np.asarray(mask, dtype=bool)
        # Validate mask length
        if len(mask) != X.shape[1]:
            ocerror.Error.value_error(f"Mask length ({len(mask)}) does not match number of features ({X.shape[1]}).")
            raise ValueError(f"Mask length ({len(mask)}) does not match number of features ({X.shape[1]}).")
        # Apply mask to filter features
        X = X[:, mask]

    # Make predictions
    try:
        if hasattr(model, 'predict'):
            # sklearn / xgboost style interface
            predictions = model.predict(X)
        elif hasattr(model, 'forward'):
            # PyTorch style interface
            model.eval()
            # Ensure mask is properly formatted before the forward pass.
            # This is critical - the mask must be a tensor, not a dict/list,
            # and it must live on the same device as the model.
            if hasattr(model, 'mask'):
                if model.mask is None:
                    # Set to empty list if None (DynamicNN expects list or tensor)
                    model.mask = []
                elif not isinstance(model.mask, torch.Tensor):
                    if isinstance(model.mask, dict):
                        # Extract from dict - try common keys first
                        mask_value = None
                        for key in ['mask', 'array', 'feature_mask', 'ablation_mask']:
                            if key in model.mask:
                                mask_value = model.mask[key]
                                break
                        # If not found, try to find first array-like value
                        if mask_value is None:
                            for v in model.mask.values():
                                if isinstance(v, (list, np.ndarray, torch.Tensor)):
                                    mask_value = v
                                    break
                        # If still None, try first value
                        if mask_value is None and model.mask:
                            first_val = list(model.mask.values())[0]
                            if isinstance(first_val, (list, np.ndarray, torch.Tensor)):
                                mask_value = first_val
                            else:
                                mask_value = []
                    elif isinstance(model.mask, (list, np.ndarray)):
                        mask_value = model.mask
                    else:
                        mask_value = []
                    # Convert to tensor
                    if isinstance(mask_value, torch.Tensor):
                        model.mask = mask_value.float()
                    elif isinstance(mask_value, np.ndarray):
                        model.mask = torch.from_numpy(mask_value).float()
                    elif isinstance(mask_value, list):
                        if len(mask_value) > 0:
                            model.mask = torch.tensor(mask_value, dtype=torch.float32)
                        else:
                            model.mask = []
                    else:
                        model.mask = []
            # Final safety check: ensure mask is not a dict before forward pass.
            # This prevents the "dict * int" error in DynamicNN.forward().
            if hasattr(model, 'mask') and isinstance(model.mask, dict):
                model.mask = []
            with torch.no_grad():
                # Ensure writable backing memory to avoid PyTorch warnings with
                # read-only NumPy views originating from pandas.
                X_input = X
                if isinstance(X_input, np.ndarray) and not X_input.flags.writeable:
                    X_input = np.array(X_input, copy=True)
                X_tensor = torch.as_tensor(X_input, dtype=torch.float32, device=device)
                # Ensure mask is on the same device as the input tensor
                if hasattr(model, 'mask') and isinstance(model.mask, torch.Tensor):
                    model.mask = model.mask.to(device)
                predictions = model(X_tensor)
                if isinstance(predictions, torch.Tensor):
                    predictions = predictions.detach().cpu().numpy()
            # Flatten single-column outputs to a 1-D vector
            if isinstance(predictions, np.ndarray) and predictions.ndim > 1 and predictions.shape[1] == 1:
                predictions = predictions.flatten()
        else:
            ocerror.Error.value_error("Model does not have a predict or forward method.")
            raise ValueError("Model does not have a predict or forward method.")
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        ocerror.Error.value_error(f"Error during prediction: {e}\n{error_details}")
        raise ValueError(f"Error during prediction: {e}\n{error_details}")

    # Return results in appropriate format
    if is_dataframe:
        # Create result DataFrame with metadata if available.
        # Only include metadata columns that actually exist in the original data.
        available_metadata_cols = [col for col in metadata_cols if col in original_data.columns]
        if available_metadata_cols:
            result = original_data[available_metadata_cols].copy()
        else:
            result = pd.DataFrame()
        result['predicted_score'] = predictions
        return result
    else:
        return predictions