#!/usr/bin/env python3
# Description
###############################################################################
'''
Set of functions to manage scoring and prediction in OCDocker in the context of
scoring functions.
Usage:
import OCDocker.OCScore.Scoring as ocscoring
'''
# Imports
###############################################################################
import os
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from typing import Any, Dict, Iterator, Optional, Union, cast
import OCDocker.Error as ocerror
import OCDocker.OCScore.Utils.Data as ocscoredata
import OCDocker.OCScore.Utils.IO as ocscoreio
# License
###############################################################################
'''
OCDocker
Authors: Rossi, A.D.; Monachesi, M.C.E.; Spelta, G.I.; Torres, P.H.M.
Federal University of Rio de Janeiro
Carlos Chagas Filho Institute of Biophysics
Laboratory for Molecular Modeling and Dynamics
This program is proprietary software owned by the Federal University of Rio de Janeiro (UFRJ),
developed by Rossi, A.D.; Monachesi, M.C.E.; Spelta, G.I.; Torres, P.H.M., and protected under Brazilian Law No. 9,609/1998.
All rights reserved. Use, reproduction, modification, and distribution are allowed under this UFRJ license,
provided this copyright notice is preserved. See the LICENSE file for details.
Contact: Artur Duque Rossi - arturossi10@gmail.com
'''
# Classes
###############################################################################
# Functions
###############################################################################
## Private ##
def _iter_query_rows(query: Any, batch_size: int = 1000) -> Iterator[Any]:
'''Iterate query rows using streaming when available.
Parameters
----------
query : Any
SQLAlchemy query-like object.
batch_size : int, optional
Preferred streaming batch size for query backends that support it.
Returns
-------
Iterator[Any]
Iterator over query rows.
'''
streamed = query
if hasattr(streamed, "yield_per"):
try:
streamed = streamed.yield_per(batch_size)
except Exception:
streamed = query
yielded_any = False
try:
for row in streamed:
yielded_any = True
yield row
return
except Exception:
if yielded_any:
raise
if streamed is not query:
try:
for row in query:
yield row
return
except Exception:
pass
all_method = getattr(query, "all", None)
if callable(all_method):
for row in all_method():
yield row
def _apply_eager_relationship_loading(query: Any, model: Any) -> Any:
'''Apply eager relationship loading to reduce N+1 queries when supported.
Parameters
----------
query : Any
SQLAlchemy query-like object.
model : Any
ORM model class that may expose ``ligand`` and ``receptor`` relationships.
Returns
-------
Any
Query with eager loading options when possible, or the original query.
'''
if not hasattr(query, "options"):
return query
try:
from sqlalchemy.orm import joinedload
except Exception:
return query
eager_options = []
for rel_name in ("ligand", "receptor"):
rel_attr = getattr(model, rel_name, None)
if rel_attr is None:
continue
try:
eager_options.append(joinedload(rel_attr))
except Exception:
continue
if not eager_options:
return query
try:
return query.options(*eager_options)
except Exception:
return query
def _build_dataframe_from_complexes_query(
    query: Any,
    descriptors: list[str],
    batch_size: int = 1000,
) -> pd.DataFrame:
    '''Build a DataFrame from a complexes query using chunked row collection.
    Parameters
    ----------
    query : Any
        Query object returning Complexes ORM rows.
    descriptors : list[str]
        Descriptor names to collect from each complex row.
    batch_size : int, optional
        Number of rows to collect before creating each DataFrame chunk.
    Returns
    -------
    pd.DataFrame
        Aggregated DataFrame created from chunked query iteration.
    '''
    frames: list[pd.DataFrame] = []
    pending: list[Dict[str, Any]] = []

    for item in _iter_query_rows(query, batch_size=batch_size):
        record: Dict[str, Any] = {}
        # Descriptor columns are looked up via lowercase attribute names;
        # absent or None values are simply omitted from the record.
        for descriptor in descriptors:
            attribute = descriptor.lower()
            if hasattr(item, attribute):
                value = getattr(item, attribute)
                if value is not None:
                    record[descriptor] = value
        # Pull ligand/receptor names from the eager-loaded relationships.
        ligand = getattr(item, 'ligand', None)
        if ligand and hasattr(ligand, 'name'):
            record['ligand'] = ligand.name
        receptor = getattr(item, 'receptor', None)
        if receptor and hasattr(receptor, 'name'):
            record['receptor'] = receptor.name
        record.setdefault('db', 'UNKNOWN')
        pending.append(record)
        # Flush into a chunk to bound memory held as plain dicts.
        if len(pending) >= batch_size:
            frames.append(pd.DataFrame(pending))
            pending = []

    if pending:
        frames.append(pd.DataFrame(pending))
    if not frames:
        return pd.DataFrame()
    return frames[0] if len(frames) == 1 else pd.concat(frames, ignore_index=True, sort=False)
## Public ##
def get_score(
    model_path: str,
    data: Optional[Union[pd.DataFrame, str]] = None,
    pca_model: Optional[Union[str, PCA]] = None,
    mask: Optional[Union[list, np.ndarray]] = None,
    score_columns_list: list[str] = ["SMINA", "VINA", "ODDT", "PLANTS"],
    scaler: Optional[str] = "standard",
    scaler_path: Optional[str] = None,
    invert_conditionally: bool = True,
    normalize: bool = True,
    no_scores: bool = False,
    only_scores: bool = False,
    columns_to_skip_pca: Optional[list[str]] = None,
    serialization_method: str = "auto",
    use_gpu: bool = True,
    enforce_reference_column_order: bool = True
) -> Union[pd.DataFrame, np.ndarray]:
    ''' Get scores by loading a model and applying the same preprocessing pipeline.
    This function loads a trained model and applies it to input data following
    the same preprocessing pipeline used during training. The data can be provided
    as a DataFrame or read from a database.
    Parameters
    ----------
    model_path : str
        Path to the saved model file.
    data : pd.DataFrame | str, optional
        Input data. Can be:
        - A pandas DataFrame with the features
        - A string path to a CSV file
        - None to read from database (requires DB setup)
        Default is None.
    pca_model : str | PCA, optional
        Path to the PCA model file or a PCA model object. If provided, PCA
        transformation will be applied. If None, no PCA is used.
        Default is None.
    mask : list | np.ndarray, optional
        Feature mask array of 0s and 1s to filter features before prediction.
        Length should match the number of features after preprocessing.
        1 means keep the feature, 0 means remove it.
        Default is None (no masking applied).
    score_columns_list : list[str], optional
        List of score column prefixes to identify score columns.
        Default is ["SMINA", "VINA", "ODDT", "PLANTS"].
        NOTE(review): the default is a shared mutable list; it is only read
        (never mutated) in this function, so this is safe but fragile.
    scaler : str, optional
        Scaler to use for normalization if scaler_path is not provided.
        Options are "standard" or "minmax". If scaler_path is provided, this is ignored.
        Default is "standard".
    scaler_path : str, optional
        Path to a saved scaler file (saved with joblib/pickle). If provided, the saved
        scaler will be loaded and used instead of creating a new one. This ensures
        the same scaling parameters from training are applied. Default is None.
    invert_conditionally : bool, optional
        Whether to invert values conditionally (for VINA, SMINA, PLANTS columns).
        Default is True.
    normalize : bool, optional
        Whether to normalize the data. Default is True.
    no_scores : bool, optional
        If True, remove score columns from the data. Default is False.
    only_scores : bool, optional
        If True, keep only score columns and metadata. Default is False.
    columns_to_skip_pca : list[str], optional
        List of columns to skip during PCA transformation. If None, defaults to
        metadata columns: ["receptor", "ligand", "name", "type", "db"].
        Default is None.
    serialization_method : str, optional
        Serialization method used to save the model. Options are "auto", "joblib"
        or "pickle". Default is "auto".
        NOTE(review): this argument is currently not forwarded to the loader;
        the model is always loaded with serialization_method="auto".
    use_gpu : bool, optional
        If True and CUDA is available, run PyTorch model inference on the GPU;
        otherwise the CPU is used. Has no effect on non-PyTorch models.
        Default is True.
    enforce_reference_column_order : bool, optional
        If True, reorder columns to match the `reference_column_order` from the
        config file before any preprocessing (scaling/PCA/masking). This is
        critical to keep feature alignment consistent with training. Default is True.
    Returns
    -------
    pd.DataFrame | np.ndarray
        Predicted scores. Returns a DataFrame if input was a DataFrame (preserving
        metadata columns), otherwise returns a numpy array.
    Raises
    ------
    FileNotFoundError
        If the model file or PCA model file is not found.
    ValueError
        If data is None and database is not available, or if invalid parameters are provided.
    '''
    # Check if model file exists
    if not os.path.isfile(model_path):
        ocerror.Error.file_not_exist(f"Model file not found: {model_path}")
        raise FileNotFoundError(f"Model file not found: {model_path}")
    # Load the model - IO module now handles format detection automatically
    # NOTE(review): serialization_method from the signature is ignored here;
    # "auto" is always passed. Confirm whether forwarding it is intended.
    loaded_obj = ocscoreio.load_object(model_path, serialization_method="auto", trusted=True)
    # Handle different model formats
    # If loaded object is a dict, it might be a state_dict or a dict containing the model
    if isinstance(loaded_obj, dict):
        # Check if it's a state_dict (PyTorch model weights)
        if 'state_dict' in loaded_obj or any(key.startswith(('layer', 'fc', 'linear', 'encoder', 'decoder')) for key in loaded_obj.keys()):
            # This is likely a state_dict, but we need the model class to load it
            # For now, raise an error asking for the model object
            ocerror.Error.value_error("Model file contains a state_dict (weights only), not a complete model. Please load the model class first, then load_state_dict().")
            raise ValueError("Model file contains a state_dict (weights only), not a complete model. Please load the model class first, then load_state_dict().")
        elif 'model' in loaded_obj:
            # Dict contains the model under 'model' key
            model = loaded_obj['model']
        elif 'network' in loaded_obj:
            # Dict contains the model under 'network' key
            model = loaded_obj['network']
        else:
            # Try to find any value that looks like a model object
            # (anything exposing predict() or forward() is treated as a model)
            for key, value in loaded_obj.items():
                if hasattr(value, 'predict') or hasattr(value, 'forward'):
                    model = value
                    break
            else:
                ocerror.Error.value_error(f"Model file contains a dict but no model object found. Keys: {list(loaded_obj.keys())}")
                raise ValueError(f"Model file contains a dict but no model object found. Keys: {list(loaded_obj.keys())}")
    else:
        # Loaded object is the model itself
        model = loaded_obj
    # Set PyTorch models to eval mode for inference
    if hasattr(model, 'eval'):
        model.eval()
    # Determine device for PyTorch models
    import torch
    if use_gpu and torch.cuda.is_available():
        device: torch.device = torch.device('cuda')
    else:
        device = torch.device('cpu')
    # Move PyTorch model to the correct device
    if hasattr(model, 'to'):
        model = model.to(device)
        # Also update model.device if it exists
        if hasattr(model, 'device'):
            model.device = device
    # Fix mask attribute if it's stored as dict/list instead of tensor
    # This can happen when models are saved/loaded
    def fix_mask_attribute(obj, device=None):
        """Recursively fix mask attributes in model and nested modules.

        Normalizes ``obj.mask`` (None/dict/list/ndarray) to a float tensor on
        the target device, or to an empty list when no usable mask is found,
        then recurses into nested modules via ``obj.modules()``.
        """
        if hasattr(obj, 'mask'):
            if obj.mask is None:
                obj.mask = []
            elif isinstance(obj.mask, dict):
                # Extract array from dict
                mask_value = None
                for key in ['mask', 'array', 'feature_mask', 'ablation_mask']:
                    if key in obj.mask:
                        mask_value = obj.mask[key]
                        break
                if mask_value is None:
                    # No known key: fall back to the first array-like value
                    for v in obj.mask.values():
                        if isinstance(v, (list, np.ndarray, torch.Tensor)):
                            mask_value = v
                            break
                if mask_value is None:
                    obj.mask = []
                elif isinstance(mask_value, torch.Tensor):
                    obj.mask = mask_value.float()
                elif isinstance(mask_value, np.ndarray):
                    obj.mask = torch.from_numpy(mask_value).float()
                elif isinstance(mask_value, list):
                    obj.mask = torch.tensor(mask_value, dtype=torch.float32) if mask_value else []
                else:
                    obj.mask = []
                # Move to device
                if isinstance(obj.mask, torch.Tensor):
                    target_device = device if device else (obj.device if hasattr(obj, 'device') else torch.device('cpu'))
                    obj.mask = obj.mask.to(target_device)
            elif isinstance(obj.mask, (list, np.ndarray)) and not isinstance(obj.mask, torch.Tensor):
                # Convert list/array to tensor
                if isinstance(obj.mask, np.ndarray):
                    obj.mask = torch.from_numpy(obj.mask).float()
                elif isinstance(obj.mask, list):
                    obj.mask = torch.tensor(obj.mask, dtype=torch.float32) if obj.mask else []
                # Move to device
                if isinstance(obj.mask, torch.Tensor):
                    target_device = device if device else (obj.device if hasattr(obj, 'device') else torch.device('cpu'))
                    obj.mask = obj.mask.to(target_device)
        # Also check nested modules
        if hasattr(obj, 'modules'):
            for module in obj.modules():
                if module is not obj:  # Avoid infinite recursion
                    fix_mask_attribute(module, device)
    # Fix mask in the model and all nested modules
    model_device = device
    if hasattr(model, 'device') and isinstance(model.device, torch.device):
        model_device = model.device
    fix_mask_attribute(model, model_device)
    # Load or prepare the data
    if data is None:
        # Try to read from database
        try:
            import OCDocker.Initialise as init
            from OCDocker.DB.Models.Complexes import Complexes
            # Check if session is available
            if not hasattr(init, 'session') or init.session is None:
                ocerror.Error.session_not_created("Database session not available. Please provide data or initialize the database.")
                raise ValueError("Database session not available. Please provide data or initialize the database.")
            # Read from database
            with init.session() as s:
                query = s.query(Complexes)
                query = _apply_eager_relationship_loading(query, Complexes)
                df_db = _build_dataframe_from_complexes_query(
                    query,
                    list(getattr(Complexes, "allDescriptors", [])),
                    batch_size=1000,
                )
            if df_db.empty:
                ocerror.Error.data_not_found("No data found in database.")
                raise ValueError("No data found in database.")
            data = df_db
        except (ImportError, AttributeError) as e:
            ocerror.Error.data_not_found(f"Failed to read from database: {e}. Please provide data as DataFrame or file path.")
            raise ValueError(f"Failed to read from database: {e}. Please provide data as DataFrame or file path.")
    # If data is a string, treat it as a file path
    if isinstance(data, str):
        if not os.path.isfile(data):
            ocerror.Error.file_not_exist(f"Data file not found: {data}")
            raise FileNotFoundError(f"Data file not found: {data}")
        loaded = ocscoreio.load_data(data)
        if not isinstance(loaded, pd.DataFrame):
            ocerror.Error.value_error("Loaded data is not a pandas DataFrame.")
            raise ValueError("Loaded data is not a pandas DataFrame.")
        data = loaded
    # Ensure data is a DataFrame
    if not isinstance(data, pd.DataFrame):
        ocerror.Error.value_error("Data must be a pandas DataFrame or a path to a CSV file.")
        raise ValueError("Data must be a pandas DataFrame or a path to a CSV file.")
    df = cast(pd.DataFrame, data)
    # Enforce column order from config before any preprocessing (scaler/PCA/mask)
    if enforce_reference_column_order:
        try:
            from OCDocker.Config import get_config
            config = get_config()
            if config.paths.reference_column_order:
                df = ocscoredata.reorder_columns_to_match_data_order(
                    df,
                    data_source=None,
                    keep_extra_columns=False,
                    fill_missing_columns=False
                )
            else:
                ocerror.Error.value_error("reference_column_order not set in config file. Cannot enforce column order.")
                raise ValueError("reference_column_order not set in config file. Cannot enforce column order.")
        except Exception as e:
            ocerror.Error.value_error(f"Failed to enforce reference column order: {e}")
            raise
    # Store only metadata needed for return format to avoid cloning all feature columns.
    original_metadata_cols = ["receptor", "ligand", "name", "type", "db", "experimental"]
    available_original_metadata_cols = [col for col in original_metadata_cols if col in df.columns]
    original_data = df[available_original_metadata_cols].copy() if available_original_metadata_cols else pd.DataFrame(index=df.index)
    # NOTE(review): is_dataframe is unconditionally True at this point (non-DataFrame
    # inputs raised above), so the np.ndarray return branch at the end is unreachable.
    is_dataframe = True
    # Identify score columns
    if score_columns_list:
        score_columns = df.filter(regex=f"^({'|'.join(score_columns_list)})").columns.to_list()
    else:
        score_columns = []
    # Apply preprocessing pipeline (similar to preprocess_df)
    # Handle score columns
    if no_scores:
        # Remove score columns
        if score_columns:
            df = df.drop(columns=score_columns, errors='ignore')
    elif only_scores:
        # Keep only score columns and metadata
        metadata_cols = ["receptor", "ligand", "name", "type", "db"]
        columns_to_keep = [col for col in metadata_cols if col in df.columns] + score_columns
        df = ocscoredata.remove_other_columns(df, columns_to_keep, inplace=False)
    # Invert values conditionally
    if invert_conditionally:
        df = ocscoredata.invert_values_conditionally(df, inplace=False)
    # Normalize data
    if normalize:
        # Try to load scaler from file if scaler_path is provided
        scaler_obj = None
        if scaler_path is not None:
            if not os.path.isfile(scaler_path):
                ocerror.Error.file_not_exist(f"Scaler file not found: {scaler_path}")
                raise FileNotFoundError(f"Scaler file not found: {scaler_path}")
            try:
                scaler_obj = ocscoreio.load_object(scaler_path, serialization_method="auto", trusted=True)
                # Verify it's a scaler object
                if not isinstance(scaler_obj, (StandardScaler, MinMaxScaler)):
                    ocerror.Error.value_error(f"File {scaler_path} does not contain a valid scaler object.")
                    raise ValueError(f"File {scaler_path} does not contain a valid scaler object. Got {type(scaler_obj)}")
            except Exception as e:
                ocerror.Error.value_error(f"Failed to load scaler from {scaler_path}: {e}")
                raise ValueError(f"Failed to load scaler from {scaler_path}: {e}")
        # Use the loaded scaler or the scaler string
        if scaler_obj is not None:
            df = ocscoredata.norm_data(df, scaler=scaler_obj, inplace=False)
        else:
            # Create a new scaler (this will fit on the prediction data - not ideal but backward compatible)
            result = ocscoredata.norm_data(df, scaler=scaler, inplace=False)
            # Handle tuple return (DataFrame, scaler) when fitting new scaler
            if isinstance(result, tuple):
                df = result[0]
            else:
                df = result
    # Apply PCA if pca_model is provided
    if pca_model is not None:
        # Set default columns to skip PCA
        if columns_to_skip_pca is None:
            columns_to_skip_pca = ["receptor", "ligand", "name", "type", "db"]
            if score_columns:
                columns_to_skip_pca.extend(score_columns)
        # Apply PCA
        df = ocscoredata.apply_pca(
            df,
            pca_model,
            columns_to_skip_pca=columns_to_skip_pca,
            inplace=False
        )
    # Prepare features for prediction (exclude metadata columns)
    metadata_cols = ["receptor", "ligand", "name", "type", "db", "experimental"]
    feature_cols = [col for col in df.columns if col not in metadata_cols]
    X = df[feature_cols].values
    # Apply mask if provided
    # NOTE: If the model has its own mask (e.g., PyTorch DynamicNN), we should NOT apply
    # the external mask here, as the model will apply its own mask in the forward pass.
    # The external mask parameter is for models that don't have built-in masking.
    model_has_mask = hasattr(model, 'mask') and model.mask is not None and len(model.mask) > 0
    if mask is not None and not model_has_mask:
        # Only apply external mask if model doesn't have its own mask
        # Convert mask to numpy array if it's a list
        mask = np.asarray(mask, dtype=bool)
        # Validate mask length
        if len(mask) != X.shape[1]:
            ocerror.Error.value_error(f"Mask length ({len(mask)}) does not match number of features ({X.shape[1]}).")
            raise ValueError(f"Mask length ({len(mask)}) does not match number of features ({X.shape[1]}).")
        # Apply mask to filter features
        X = X[:, mask]
    # Make predictions
    try:
        # Try to use predict method (for sklearn, xgboost, etc.)
        if hasattr(model, 'predict'):
            predictions = model.predict(X)
        # Try to use forward method (for PyTorch models)
        elif hasattr(model, 'forward'):
            import torch
            model.eval()
            # Ensure mask is properly formatted before forward pass
            # This is critical - the mask must be a tensor, not a dict/list
            # Also ensure it's on the same device as the model
            if hasattr(model, 'mask'):
                if model.mask is None:
                    # Set to empty list if None (DynamicNN expects list or tensor)
                    model.mask = []
                elif not isinstance(model.mask, torch.Tensor):
                    # Convert mask to tensor if it's not already
                    if isinstance(model.mask, dict):
                        # Extract from dict - try common keys first
                        mask_value = None
                        for key in ['mask', 'array', 'feature_mask', 'ablation_mask']:
                            if key in model.mask:
                                mask_value = model.mask[key]
                                break
                        # If not found, try to find first array-like value
                        if mask_value is None:
                            for v in model.mask.values():
                                if isinstance(v, (list, np.ndarray, torch.Tensor)):
                                    mask_value = v
                                    break
                        # If still None, try first value
                        if mask_value is None and model.mask:
                            first_val = list(model.mask.values())[0]
                            if isinstance(first_val, (list, np.ndarray, torch.Tensor)):
                                mask_value = first_val
                            else:
                                mask_value = []
                    elif isinstance(model.mask, (list, np.ndarray)):
                        mask_value = model.mask
                    else:
                        mask_value = []
                    # Convert to tensor
                    if isinstance(mask_value, torch.Tensor):
                        model.mask = mask_value.float()
                    elif isinstance(mask_value, np.ndarray):
                        model.mask = torch.from_numpy(mask_value).float()
                    elif isinstance(mask_value, list):
                        if len(mask_value) > 0:
                            model.mask = torch.tensor(mask_value, dtype=torch.float32)
                        else:
                            model.mask = []
                    else:
                        model.mask = []
            # Final safety check: ensure mask is not a dict before forward pass
            # This prevents the "dict * int" error in DynamicNN.forward()
            if hasattr(model, 'mask') and isinstance(model.mask, dict):
                # If still a dict after all conversion attempts, set to empty list
                # This will prevent the multiplication error
                model.mask = []
            with torch.no_grad():
                # Ensure writable backing memory to avoid PyTorch warnings with
                # read-only NumPy views originating from pandas.
                X_input = X
                if isinstance(X_input, np.ndarray) and not X_input.flags.writeable:
                    X_input = np.array(X_input, copy=True)
                X_tensor = torch.as_tensor(X_input, dtype=torch.float32, device=device)
                # Ensure mask is on the same device as the input tensor
                if hasattr(model, 'mask') and isinstance(model.mask, torch.Tensor):
                    model.mask = model.mask.to(device)
                predictions = model(X_tensor)
                if isinstance(predictions, torch.Tensor):
                    predictions = predictions.detach().cpu().numpy()
                # Flatten if needed
                if isinstance(predictions, np.ndarray) and predictions.ndim > 1 and predictions.shape[1] == 1:
                    predictions = predictions.flatten()
        else:
            ocerror.Error.value_error("Model does not have a predict or forward method.")
            raise ValueError("Model does not have a predict or forward method.")
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        ocerror.Error.value_error(f"Error during prediction: {e}\n{error_details}")
        raise ValueError(f"Error during prediction: {e}\n{error_details}")
    # Return results in appropriate format
    if is_dataframe:
        # Create result DataFrame with metadata if available
        # Only include metadata columns that actually exist in the original data
        available_metadata_cols = [col for col in metadata_cols if col in original_data.columns]
        if available_metadata_cols:
            result = original_data[available_metadata_cols].copy()
        else:
            result = pd.DataFrame()
        result['predicted_score'] = predictions
        return result
    else:
        return predictions