# OCScore Pipeline Examples

These examples demonstrate how to train and use OCScore models for consensus scoring.

## Training Models from Database

Example of training machine learning models using data from the OCDocker database:
#!/usr/bin/env python3
"""
Example: Train Model from Database
This example demonstrates how to train machine learning models (DNN or XGBoost) using data
from the OCDocker database. The script:
1. Reads data from the OCDocker database (or CSV file)
2. Applies the full preprocessing pipeline (outlier removal, normalization, PCA, etc.)
3. Finds the best hyperparameters across multiple Optuna studies
4. Trains a model using the best hyperparameters
5. Saves the trained model and feature mask to OCScore_models directory
The script can work with:
- Database data (using --from_db flag)
- CSV files (using --df_path flag)
- Multiple Optuna studies (finds best trial across all studies)
- DNN or XGBoost models
- Custom preprocessing options (PCA, feature selection, etc.)
Usage:
# Train XGBoost model from CSV
python examples/11_python_api_train_model_from_db.py --model_type XGB --model_name my_model --df_path data.csv
# Train DNN model from database with GPU
python examples/11_python_api_train_model_from_db.py --from_db --model_type DNN --model_name my_model --use_gpu
# Train DNN model using best trial from multiple Optuna studies
python examples/11_python_api_train_model_from_db.py --from_db --model_type DNN --studies NN_Optimization_6 NN_Optimization_7 NN_Optimization_8 NN_Optimization_9 NN_Optimization_10 --use_gpu
"""
import argparse
import hashlib
import json
import os
import platform
import shlex
import subprocess
import sys
import numpy as np
import pandas as pd
from datetime import datetime, timezone
from sqlalchemy.orm import Session
from typing import Any, List, Optional
from urllib.parse import quote_plus, urlparse, urlunparse
# Add parent directory to path to allow importing OCDocker
# This allows the script to be run from any directory
_script_dir = os.path.dirname(os.path.abspath(__file__))  # directory containing this script (examples/)
_parent_dir = os.path.dirname(_script_dir)  # presumably the repository root — confirm layout
if _parent_dir not in sys.path:
    sys.path.insert(0, _parent_dir)  # prepend so the local checkout wins over any installed copy
# Import OCDocker modules
import OCDocker.Initialise as init
import OCDocker.OCScore.Optimization.XGBoost as ocxgb
import OCDocker.OCScore.Utils.Data as ocscoredata
import OCDocker.OCScore.Utils.IO as ocscoreio
from OCDocker.DB.DB import export_db_to_csv
from OCDocker.DB.Models import Complexes, Ligands, Receptors
from OCDocker.OCScore.DNN.DNNOptimizer import DNNOptimizer
def _normalize_db_backend(raw_backend: str) -> str:
    '''Map a backend alias from config/environment onto a canonical name.

    Parameters
    ----------
    raw_backend : str
        Raw backend string from config or environment variable (e.g., "postgresql", "mysql", "sqlite")

    Returns
    -------
    str
        Normalized backend string ("postgresql", "mysql", or "sqlite")
    '''
    aliases = {
        'postgresql': 'postgresql',
        'postgres': 'postgresql',
        'pgsql': 'postgresql',
        'mysql': 'mysql',
        'mariadb': 'mysql',
        'sqlite': 'sqlite',
        'sqlite3': 'sqlite',
    }
    # Unknown aliases fall back to PostgreSQL (the project default).
    return aliases.get(str(raw_backend).strip().lower(), 'postgresql')
def _sqlite_storage_for_model(model_type: str) -> str:
    '''Return a local SQLite storage URL for Optuna studies.

    Parameters
    ----------
    model_type : str
        Type of model (e.g., "DNN", "XGB") to determine which SQLite file to use

    Returns
    -------
    str
        SQLite storage URL (e.g., "sqlite:///NN_optimization.db")
    '''
    storage_by_type = {
        'DNN': "sqlite:///NN_optimization.db",
        'NN': "sqlite:///NN_optimization.db",
        'XGB': "sqlite:///XGB_optimization.db",
    }
    # Unrecognized model types share a generic local database file.
    return storage_by_type.get(str(model_type).upper(), "sqlite:///model_optimization.db")
def _build_storage_url_from_config(config: Any, model_type: str) -> str:
    '''Build Optuna storage URL from OCDocker config and backend settings.

    Parameters
    ----------
    config : Any
        OCDocker config object (after bootstrap)
    model_type : str
        Type of model (e.g., "DNN", "XGB") to determine which SQLite file to use if backend is SQLite

    Returns
    -------
    str
        Optuna storage URL (e.g., "postgresql+psycopg://user:***@host:port/db" or "sqlite:///NN_optimization.db")
    '''
    # Environment variables take precedence over the config file setting.
    backend_env = os.getenv('OCDOCKER_DB_BACKEND', '') or os.getenv('DB_BACKEND', '')
    backend_cfg = getattr(config.database, 'backend', '')
    backend = _normalize_db_backend(backend_env or backend_cfg or 'postgresql')
    if backend == 'sqlite':
        # SQLite needs no credentials; pick a local file based on model type.
        return _sqlite_storage_for_model(model_type)
    user = config.database.user
    password = config.database.password
    host = config.database.host
    port = config.database.port
    db = config.database.optimizedb  # OPTIMIZEDB stores Optuna studies
    driver = "mysql+pymysql" if backend == 'mysql' else "postgresql+psycopg"
    # Percent-encode BOTH user and password: either may contain characters
    # (':', '@', '/') that would otherwise corrupt the URL. Plain
    # alphanumeric usernames are unaffected by quote_plus.
    return f"{driver}://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{db}"
def load_data_from_database(session: Session, methodology: Optional[str] = None) -> pd.DataFrame:
    ''' Load data from the database and convert to DataFrame format expected by the pipeline.

    Parameters
    ----------
    session : sqlalchemy.orm.session.Session
        Database session
    methodology : str, optional
        Filter by methodology (e.g., 'NN', 'XGB', 'Trans', 'AE', 'PCA', 'PCA80', 'PCA85', 'PCA90', 'PCA95', 'GA', 'ScoreOnly', 'NoScores').
        If None, fetches all data. Filters by complex name pattern that matches the methodology.

    Returns
    -------
    pd.DataFrame
        DataFrame with data from database, formatted for the training pipeline.
        Always contains 'name', 'db', 'type' and 'experimental' columns on return.

    Raises
    ------
    ValueError
        If the export fails, the database is empty, or no 'name'-like column exists.
    '''
    # Query complexes with methodology filter if provided
    if methodology:
        # Filter by complex name containing the methodology pattern
        # Methodology can be: NN, XGB, Trans, AE, PCA, PCA80, PCA85, PCA90, PCA95, GA, ScoreOnly, NoScores
        merged_data = session.query(Complexes.Complexes, Ligands.Ligands, Receptors.Receptors)\
            .join(Ligands.Ligands, Ligands.Ligands.id == Complexes.Complexes.ligand_id)\
            .join(Receptors.Receptors, Receptors.Receptors.id == Complexes.Complexes.receptor_id)\
            .filter(Complexes.Complexes.name.like(f'%{methodology}%'))\
            .all()
        # Convert to DataFrame format manually
        result = []
        for complex_obj, ligand, receptor in merged_data:
            # Flatten the three ORM objects into one row, skipping SQLAlchemy
            # internals (underscore keys) and bookkeeping columns.
            merged_entry = {
                'name': complex_obj.name,
                **{key: value for key, value in complex_obj.__dict__.items()
                   if not key.startswith('_') and key not in ['created_at', 'modified_at', 'id', 'name', 'ligand_id', 'receptor_id']},
                **{key: value for key, value in ligand.__dict__.items()
                   if not key.startswith('_') and key not in ['created_at', 'modified_at', 'id', 'name']},
                **{key: value for key, value in receptor.__dict__.items()
                   if not key.startswith('_') and key not in ['created_at', 'modified_at', 'id', 'name']},
                'receptor': receptor.name,
                # Ligand label is the suffix after the last underscore in its name.
                'ligand': ligand.name.split('_')[-1] if ligand.name else None
            }
            result.append(merged_entry)
        # Get column order
        complex_columns = [c.name for c in Complexes.Complexes.__table__.columns
                           if c.name not in ['created_at', 'modified_at', 'id', 'name', 'ligand_id', 'receptor_id']]
        ligand_columns = [c.name for c in Ligands.Ligands.__table__.columns
                          if c.name not in ['created_at', 'modified_at', 'id', 'name']]
        receptor_columns = [c.name for c in Receptors.Receptors.__table__.columns
                            if c.name not in ['created_at', 'modified_at', 'id', 'name']]
        column_order = ['name'] + complex_columns + receptor_columns + ligand_columns + ['receptor', 'ligand']
        result = [{col: entry.get(col, None) for col in column_order} for entry in result]
        # Drop rows with missing values
        result = [entry for entry in result if all(value is not None for value in entry.values())]
        df = pd.DataFrame(result)
    else:
        # Export all data from database (OCSCORE can be empty - it's the target we predict)
        print("Exporting data from database...")
        result = export_db_to_csv(
            session=session,
            output_format='dataframe',
            drop_na=False  # Don't drop NA - OCSCORE will be empty and that's expected
        )
        # Ensure we have a DataFrame
        if result is None:
            raise ValueError("Failed to export data from database")
        df = result if isinstance(result, pd.DataFrame) else pd.DataFrame()
        if df.empty:
            raise ValueError("Database is empty. No complexes found in the database.")
        print(f"Exported {len(df)} rows from database")
    # Drop OCSCORE column if it exists and is all NaN (it's the target, not a feature)
    if 'OCSCORE' in df.columns:
        ocscore_na_count = df['OCSCORE'].isna().sum()
        if ocscore_na_count == len(df):
            print("Dropping OCSCORE column (all NaN - this is the target variable, not a feature)")
            df = df.drop(columns=['OCSCORE'])
        else:
            print(f"Warning: OCSCORE column has {len(df) - ocscore_na_count} non-NaN values. This should typically be empty for training data.")
    # Check if 'name' column exists
    if 'name' not in df.columns:
        print(f"Warning: 'name' column not found in DataFrame. Available columns: {list(df.columns)}")
        # Try to find an alternative column name
        name_col = None
        for col in ['name', 'complex_name', 'Name', 'complexName']:
            if col in df.columns:
                name_col = col
                break
        if name_col is None:
            raise ValueError(f"Could not find 'name' column in DataFrame. Available columns: {list(df.columns)}")
        # Rename the column to 'name' for consistency
        df = df.rename(columns={name_col: 'name'})
    # Add 'db' column based on name pattern (if not already present)
    if 'db' not in df.columns:
        # Check if name ends with 'ligand' for PDBbind, otherwise DUDEz
        df['db'] = df['name'].apply(
            lambda x: 'PDBBIND' if str(x).endswith('ligand') else 'DUDEZ'
        )
    # Add 'type' column for DUDEz data (ligand vs decoy)
    if 'type' not in df.columns:
        df['type'] = df['name'].apply(
            lambda x: 'ligand' if 'ligand' in str(x).lower() else 'decoy'
        )
        # For PDBbind, set type to 'ligand'
        df.loc[df['db'] == 'PDBBIND', 'type'] = 'ligand'
    # Add 'experimental' column
    # For PDBbind: use experimental binding affinity (if available) or a placeholder
    # For DUDEz: use type (1 for ligand, 0 for decoy)
    if 'experimental' not in df.columns:
        df['experimental'] = df.apply(
            lambda row: 1.0 if row['type'] == 'ligand' else 0.0,
            axis=1
        )
        # If there's a binding affinity column, use it for PDBbind
        # You may need to adjust this based on your actual database schema
        if 'binding_affinity' in df.columns:
            df.loc[df['db'] == 'PDBBIND', 'experimental'] = df.loc[df['db'] == 'PDBBIND', 'binding_affinity']
    # Always return DataFrame (we'll handle CSV saving separately if needed)
    return df
def get_best_nn_trial_from_study(
    study_name: str,
    storage: str,
    selection_metric: str = "combined"
) -> dict[str, Any]:
    '''Return best complete NN trial from one study.

    Parameters
    ----------
    study_name : str
        Optuna NN study name.
    storage : str
        Optuna storage URL.
    selection_metric : str, optional
        One of ``combined``, ``rmse``, ``auc``.

        - ``combined``: minimize RMSE - AUC
        - ``rmse``: minimize RMSE
        - ``auc``: maximize AUC

    Returns
    -------
    dict[str, Any]
        Keys: ``study_name``, ``trial`` (the Optuna trial object), ``rmse``,
        ``auc``, ``combined_metric`` for the selected trial.

    Raises
    ------
    ValueError
        If no complete trial carries the metrics required by ``selection_metric``.
    '''
    import optuna
    study = optuna.load_study(study_name=study_name, storage=storage)
    study_df = study.trials_dataframe()
    # Only finished trials carry trustworthy metrics.
    study_df = study_df[study_df["state"] == "COMPLETE"].copy()
    if study_df.empty:
        raise ValueError(f"No complete trials found in {study_name}")
    if "value" not in study_df.columns:
        raise ValueError(f"Study {study_name} does not contain trial value (RMSE)")
    study_df = study_df.dropna(subset=["value"]).copy()
    if study_df.empty:
        raise ValueError(f"Study {study_name} has no complete trials with RMSE")
    # AUC is stored as a user attribute; coerce to numeric, NaN when absent.
    if "user_attrs_AUC" in study_df.columns:
        study_df["user_attrs_AUC"] = pd.to_numeric(study_df["user_attrs_AUC"], errors="coerce")
    else:
        study_df["user_attrs_AUC"] = np.nan
    # Lower is better: small RMSE and large AUC both shrink this value.
    study_df["combined_metric"] = study_df["value"] - study_df["user_attrs_AUC"]
    metric = str(selection_metric).lower().strip()
    if metric == "rmse":
        # Primary: lowest RMSE; tie-break: highest AUC.
        best_row = study_df.sort_values(
            by=["value", "user_attrs_AUC"],
            ascending=[True, False]
        ).iloc[0]
    elif metric == "auc":
        auc_df = study_df.dropna(subset=["user_attrs_AUC"]).copy()
        if auc_df.empty:
            raise ValueError(f"Study {study_name} has no complete trials with AUC")
        # Primary: highest AUC; tie-break: lowest RMSE.
        best_row = auc_df.sort_values(
            by=["user_attrs_AUC", "value"],
            ascending=[False, True]
        ).iloc[0]
    else:
        # Default ("combined"): minimize RMSE - AUC, then RMSE, then prefer high AUC.
        combined_df = study_df.dropna(subset=["user_attrs_AUC"]).copy()
        if combined_df.empty:
            raise ValueError(f"Study {study_name} has no complete trials with AUC required for combined metric")
        best_row = combined_df.sort_values(
            by=["combined_metric", "value", "user_attrs_AUC"],
            ascending=[True, True, False]
        ).iloc[0]
    # NOTE(review): indexing study.trials by trial number assumes numbers are
    # contiguous from 0 in this storage — confirm for multi-process studies.
    best_trial = study.trials[int(best_row["number"])]
    return {
        "study_name": study_name,
        "trial": best_trial,
        "rmse": float(best_row["value"]),
        "auc": float(best_row["user_attrs_AUC"]),
        "combined_metric": float(best_row["combined_metric"])
    }
def get_best_ao_trial_from_study(study_name: str, storage: str) -> dict[str, Any]:
    '''Return best complete AO trial from one study using RMSE then val RMSE.'''
    import optuna
    study = optuna.load_study(study_name=study_name, storage=storage)
    trials_df = study.trials_dataframe()
    # Restrict to finished trials that actually report an RMSE value.
    trials_df = trials_df[trials_df["state"] == "COMPLETE"].copy()
    if trials_df.empty:
        raise ValueError(f"No complete trials found in {study_name}")
    trials_df = trials_df.dropna(subset=["value"]).copy()
    if trials_df.empty:
        raise ValueError(f"Study {study_name} has no complete trials with RMSE")
    # Missing validation RMSE sorts last by treating it as +inf.
    if "user_attrs_val_rmse" not in trials_df.columns:
        trials_df["user_attrs_val_rmse"] = np.inf
    else:
        trials_df["user_attrs_val_rmse"] = pd.to_numeric(
            trials_df["user_attrs_val_rmse"], errors="coerce"
        ).fillna(np.inf)
    # Lowest RMSE wins; validation RMSE breaks ties.
    winner = trials_df.sort_values(by=["value", "user_attrs_val_rmse"]).iloc[0]
    chosen_trial = study.trials[int(winner["number"])]
    return {
        "study_name": study_name,
        "trial": chosen_trial,
        "rmse": float(winner["value"]),
        "val_rmse": float(winner["user_attrs_val_rmse"])
    }
def evaluate_trained_dnn_model(model: Any, data: dict) -> dict[str, float]:
    '''Evaluate trained DNN using RMSE on X_test and AUC on X_val.'''
    import torch
    from sklearn.metrics import roc_auc_score
    device = next(model.parameters()).device
    model.eval()

    def _predict(features: Any) -> np.ndarray:
        # Forward pass without gradients; return a flat numpy vector.
        with torch.no_grad():
            tensor = torch.tensor(np.asarray(features), dtype=torch.float32, device=device)
            return model(tensor).detach().cpu().numpy().reshape(-1)

    y_test_np = np.asarray(data["y_test"]).reshape(-1)
    rmse = float(np.sqrt(np.mean((_predict(data["X_test"]) - y_test_np) ** 2)))
    auc = float("nan")
    auc_adjusted = float("nan")
    # AUC is only computed when a validation split is available.
    if data.get("X_val") is not None and data.get("y_val") is not None:
        val_pred = _predict(data["X_val"])
        y_val_np = np.asarray(data["y_val"]).reshape(-1)
        # roc_auc_score requires both classes present among non-NaN labels.
        unique_classes = np.unique(y_val_np[~np.isnan(y_val_np)])
        if unique_classes.size >= 2:
            auc = float(roc_auc_score(y_val_np, val_pred))
            # Guard against an inverted score orientation.
            auc_adjusted = float(max(auc, 1.0 - auc))
    return {"rmse": rmse, "auc": auc, "auc_adjusted": auc_adjusted}
def _json_sanitize(value: Any) -> Any:
    '''Convert nested values to JSON-serializable builtin types.

    Recurses into dicts (keys stringified), lists, tuples and ndarrays;
    converts numpy scalars to builtins; maps NaN/inf floats to None; any
    unrecognized object falls back to ``str(value)``.

    Parameters
    ----------
    value : Any
        Arbitrary (possibly nested) value.

    Returns
    -------
    Any
        A structure composed only of dict/list/str/int/float/bool/None.
    '''
    if isinstance(value, dict):
        return {str(k): _json_sanitize(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [_json_sanitize(v) for v in value]
    # np.bool_ is NOT a subclass of int/np.integer, so without this check it
    # would fall through to str() and serialize as the string "True"/"False".
    if isinstance(value, np.bool_):
        return bool(value)
    if isinstance(value, np.integer):
        return int(value)
    if isinstance(value, np.floating):
        # JSON has no NaN/inf; encode them as null.
        if np.isnan(value) or np.isinf(value):
            return None
        return float(value)
    if isinstance(value, np.ndarray):
        return [_json_sanitize(v) for v in value.tolist()]
    if isinstance(value, float):
        if np.isnan(value) or np.isinf(value):
            return None
        return value
    if isinstance(value, (str, int, bool)) or value is None:
        return value
    # Fallback: stringify anything unrecognized (datetime, Path, ...).
    return str(value)
def _sha256_file(path: str, chunk_size: int = 1024 * 1024) -> Optional[str]:
    '''Compute SHA256 checksum for a file path.

    Returns None when ``path`` is not a regular file; otherwise the hex digest.
    '''
    if not os.path.isfile(path):
        return None
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        # Stream fixed-size chunks so large files never load fully into memory.
        while chunk := fh.read(chunk_size):
            digest.update(chunk)
    return digest.hexdigest()
def _git_head_info(repo_dir: str) -> dict[str, Optional[str]]:
    '''Collect git HEAD and dirty status (best-effort).

    Returns {"head": None, "dirty": None} when git is unavailable or
    ``repo_dir`` is not inside a repository.
    '''
    info: dict[str, Optional[str]] = {"head": None, "dirty": None}
    try:
        def _git(*argv: str) -> str:
            # Run a git subcommand in repo_dir and return stripped stdout.
            return subprocess.check_output(
                ["git", *argv],
                cwd=repo_dir,
                text=True,
                stderr=subprocess.DEVNULL
            ).strip()
        info["head"] = _git("rev-parse", "HEAD")
        # Any porcelain output means uncommitted changes exist.
        info["dirty"] = "yes" if _git("status", "--porcelain") else "no"
    except Exception:
        # Best-effort only: a missing git binary or non-repo dir is fine.
        pass
    return info
def main():
parser = argparse.ArgumentParser(description='Train model from database or CSV file')
parser.add_argument('--from_db', action='store_true',
help='Read data from database instead of CSV file')
parser.add_argument('--methodology', type=str, default=None,
help='Filter by methodology (e.g., NN, XGB, Trans, AE, PCA, PCA80, PCA85, PCA90, PCA95, GA, ScoreOnly, NoScores). Only used with --from_db')
parser.add_argument('--df_path', type=str, default=None,
help='Path to CSV file (required if --from_db is not set)')
parser.add_argument('--model_type', type=str, choices=['XGB', 'DNN'], default='XGB',
help='Type of model to train (XGB or DNN)')
parser.add_argument('--model_name', type=str, default='OCScore',
help='Name for the saved model and mask (default: OCScore)')
parser.add_argument('--storage_id', type=int, default=1,
help='Storage ID for the study')
parser.add_argument('--storage', type=str, default=None,
help='Optuna storage URL (default: fetched from config or model type)')
parser.add_argument('--studies', type=str, nargs='+', default=None,
help='List of Optuna study names to use (e.g., "NN_Optimization_6" "NN_Optimization_7"). Default: AE with NN studies (6-10)')
parser.add_argument('--use_pca', action='store_true',
help='Use PCA preprocessing')
parser.add_argument('--pca_model', type=str, default="",
help='Path to PCA model file')
parser.add_argument('--pca_type', type=int, default=95,
help='PCA variance percentage')
parser.add_argument('--no_scores', action='store_true',
help='Exclude score columns')
parser.add_argument('--only_scores', action='store_true',
help='Use only score columns')
parser.add_argument('--use_pdb_train', action='store_true', default=True,
help='Use PDBbind for training')
parser.add_argument('--use_gpu', action='store_true',
help='Use GPU for training')
parser.add_argument('--random_seed', type=int, default=42,
help='Random seed')
parser.add_argument('--invert_conditionally', action='store_true', default=True,
help='Invert score values conditionally (default: True)')
parser.add_argument('--no_invert', action='store_true',
help='Disable conditional inversion of scores')
parser.add_argument('--normalize', action='store_true', default=True,
help='Normalize data (default: True)')
parser.add_argument('--no_normalize', action='store_true',
help='Disable data normalization')
parser.add_argument('--scaler', type=str, choices=['standard', 'minmax'], default='standard',
help='Scaler type for normalization (default: standard)')
parser.add_argument('--output_dir', type=str, default=None,
help='Output directory for saving models and masks. If not provided, uses default OCScore_models directory.')
parser.add_argument('--verbose', action='store_true', default=True,
help='Verbose output')
parser.add_argument('--pair_by_major', action='store_true',
help='For each major N, pair best AO_Optimization_N + NN_Optimization_N, train one model per N, and keep the best performer.')
parser.add_argument('--major_numbers', type=int, nargs='+', default=[6, 7, 8, 9, 10],
help='Major IDs used by --pair_by_major (default: 6 7 8 9 10)')
parser.add_argument('--pair_select_by', type=str, choices=['combined', 'rmse', 'auc'], default='combined',
help='Selection metric for --pair_by_major: combined=min(RMSE-AUC), rmse=min(RMSE), auc=max(AUC).')
args = parser.parse_args()
# Get default studies if not provided (AE with NN: studies 6-10)
if args.studies is None:
try:
from OCDocker.OCScore.Analysis.PerformanceEvaluation import get_all_lists
_, ao_nn_list_len, _ = get_all_lists()
# AE with NN studies are NN_Optimization_6 through NN_Optimization_10
args.studies = [f"NN_Optimization_{i}" for i in range(6, 6 + ao_nn_list_len)]
print(f"Using default AE with NN studies: {args.studies}")
except ImportError:
# Fallback if function not available
args.studies = [f"NN_Optimization_{i}" for i in range(6, 11)]
print(f"Using fallback AE with NN studies: {args.studies}")
# Get default storage if not provided - read from config after bootstrap
if args.storage is None:
# We'll set this after bootstrap when config is available
args.storage = None # Will be set after bootstrap
print(f"Training with studies: {args.studies}")
# Shared preprocessing flags (applies to both DB and CSV paths)
invert_conditionally = args.invert_conditionally and not args.no_invert
normalize = args.normalize and not args.no_normalize
# Check if we need database or CSV
if args.from_db:
# Initialize OCDocker (this sets up database connection)
# Pass a namespace to bootstrap to avoid parsing sys.argv (we handle our own args)
print("Initializing OCDocker...")
import OCDocker.Error as ocerror
bootstrap_ns = argparse.Namespace(
multiprocess=True,
update=False,
config_file=os.getenv('OCDOCKER_CONFIG', 'OCDocker.cfg'),
output_level=ocerror.ReportLevel.WARNING,
overwrite=False
)
init.bootstrap(bootstrap_ns)
# Get database session
if not hasattr(init, 'session') or init.session is None:
print("ERROR: Database session not available. Please ensure database is configured.")
sys.exit(1)
# Get storage from config if not provided
if args.storage is None:
from OCDocker.Config import get_config
config = get_config()
args.storage = _build_storage_url_from_config(config, args.model_type)
print(f"Using storage from config: {mask_password_in_url(args.storage)}")
# Prepare data from database
with init.session() as session:
data = prepare_data_from_db(
session=session,
storage_id=args.storage_id,
optimization_type=args.model_type,
pca_model=args.pca_model,
use_PCA=args.use_pca,
pca_type=args.pca_type,
no_scores=args.no_scores,
only_scores=args.only_scores,
use_pdb_train=args.use_pdb_train,
random_seed=args.random_seed,
invert_conditionally=invert_conditionally,
normalize=normalize,
scaler=args.scaler,
methodology=args.methodology
)
else:
# Read from CSV file
if args.df_path is None:
print("ERROR: Either --from_db or --df_path must be provided.")
sys.exit(1)
if not os.path.exists(args.df_path):
print(f"ERROR: CSV file not found: {args.df_path}")
sys.exit(1)
# Get storage from config if not provided (for CSV mode, we still need storage for Optuna)
if args.storage is None:
# Try to bootstrap to get config (but don't require database connection)
try:
import OCDocker.Error as ocerror
bootstrap_ns = argparse.Namespace(
multiprocess=True,
update=False,
config_file=os.getenv('OCDOCKER_CONFIG', 'OCDocker.cfg'),
output_level=ocerror.ReportLevel.WARNING,
overwrite=False
)
init.bootstrap(bootstrap_ns)
from OCDocker.Config import get_config
config = get_config()
args.storage = _build_storage_url_from_config(config, args.model_type)
print(f"Using storage from config: {mask_password_in_url(args.storage)}")
except Exception as e:
# Fallback to SQLite if config read fails
print(f"Warning: Could not read config for storage, using SQLite fallback: {e}")
args.storage = _sqlite_storage_for_model(args.model_type)
print(f"Using fallback storage: {mask_password_in_url(args.storage)}")
# Use existing load_data function for CSV
# Note: load_data requires base_models_folder, so we use default (not used for anything)
base_models_folder = ocscoreio.get_models_dir()
data = ocscoredata.load_data(
base_models_folder=base_models_folder,
storage_id=args.storage_id,
df_path=args.df_path,
optimization_type=args.model_type,
pca_model=args.pca_model,
no_scores=args.no_scores,
only_scores=args.only_scores,
use_PCA=args.use_pca,
pca_type=args.pca_type,
use_pdb_train=args.use_pdb_train,
random_seed=args.random_seed,
invert_conditionally=invert_conditionally,
normalize=normalize,
scaler=args.scaler,
enforce_reference_order=True
)
print(f"Data prepared: Train={data['X_train'].shape}, Test={data['X_test'].shape}")
if data.get('X_val') is not None:
print(f"Validation: {data['X_val'].shape}")
# CSV pipeline compatibility: load_data() may create `<models_dir>/<model_type>_<storage_id>`
# even though script 11 does not use that directory for outputs.
# Remove it only when it's empty to avoid leaving clutter like `DNN_1`.
if (not args.from_db) and isinstance(data, dict):
csv_models_folder = data.get("models_folder")
if isinstance(csv_models_folder, str):
try:
if os.path.isdir(csv_models_folder) and len(os.listdir(csv_models_folder)) == 0:
os.rmdir(csv_models_folder)
if args.verbose:
print(f"Removed unused empty folder: {csv_models_folder}")
except OSError:
# Keep silent if folder is non-empty or cannot be removed.
pass
selected_nn_trial_obj: Optional[Any] = None
selected_ao_params: Optional[dict[str, Any]] = None
selection_context: dict[str, Any] = {}
final_eval_summary: Optional[dict[str, Any]] = None
pairwise_summary_df = None
if args.pair_by_major:
if args.model_type != 'DNN':
print("ERROR: --pair_by_major is only supported with --model_type DNN.")
sys.exit(1)
major_numbers = sorted(set(args.major_numbers))
if len(major_numbers) == 0:
print("ERROR: No major numbers provided for --pair_by_major.")
sys.exit(1)
print(f"\n{'='*60}")
print("PAIRWISE MAJOR SELECTION (AO_N + NN_N)")
print(f"{'='*60}")
print(f"Majors: {major_numbers}")
print("Selecting the best major from study metrics, then training only that pair.")
print(f"{'='*60}\n")
pairwise_candidates: list[dict[str, Any]] = []
for major in major_numbers:
nn_study_name = f"NN_Optimization_{major}"
ao_study_name = f"AO_Optimization_{major}"
print(f"\n{'-'*60}")
print(f"Major {major}: selecting best AO + best NN")
print(f" AO study: {ao_study_name}")
print(f" NN study: {nn_study_name}")
print(f"{'-'*60}")
try:
nn_best = get_best_nn_trial_from_study(
nn_study_name,
args.storage,
selection_metric=args.pair_select_by
)
ao_best = get_best_ao_trial_from_study(ao_study_name, args.storage)
except Exception as e:
print(f" Warning: skipping major {major} due to study loading error: {e}")
continue
print(
f" Selected NN trial {nn_best['trial'].number}: "
f"RMSE={nn_best['rmse']:.4f}, AUC={nn_best['auc']:.4f}, Combined={nn_best['combined_metric']:.4f}"
)
print(
f" Selected AO trial {ao_best['trial'].number}: "
f"RMSE={ao_best['rmse']:.4f}, Val_RMSE={ao_best['val_rmse']:.4f}"
)
pairwise_candidates.append({
"major": major,
"nn_study": nn_study_name,
"nn_trial": int(nn_best["trial"].number),
"ao_study": ao_study_name,
"ao_trial": int(ao_best["trial"].number),
"nn_optuna_rmse": float(nn_best["rmse"]),
"nn_optuna_auc": float(nn_best["auc"]),
"nn_optuna_combined_metric": float(nn_best["combined_metric"]),
"ao_optuna_rmse": float(ao_best["rmse"]),
"ao_optuna_val_rmse": float(ao_best["val_rmse"]),
"nn_trial_obj": nn_best["trial"],
"ao_params": ao_best["trial"].params
})
if len(pairwise_candidates) == 0:
print("ERROR: No pairwise study candidates were found.")
sys.exit(1)
if args.pair_select_by == "rmse":
sort_key = lambda x: (x["nn_optuna_rmse"], x["ao_optuna_rmse"], x["ao_optuna_val_rmse"])
sort_label = "NN Optuna RMSE (ascending)"
elif args.pair_select_by == "auc":
sort_key = lambda x: (-x["nn_optuna_auc"], x["nn_optuna_rmse"], x["nn_optuna_combined_metric"])
sort_label = "NN Optuna AUC (descending)"
else:
sort_key = lambda x: (x["nn_optuna_combined_metric"], x["nn_optuna_rmse"], -x["nn_optuna_auc"])
sort_label = "NN Optuna RMSE - AUC (ascending)"
pairwise_candidates = sorted(pairwise_candidates, key=sort_key)
best_pair = pairwise_candidates[0]
selected_nn_trial_obj = best_pair["nn_trial_obj"]
selected_ao_params = best_pair["ao_params"]
selection_context = {
"mode": "pair_by_major",
"pair_select_by": args.pair_select_by,
"selected_major": int(best_pair["major"]),
"selected_nn_study": best_pair["nn_study"],
"selected_nn_trial": int(best_pair["nn_trial"]),
"selected_ao_study": best_pair["ao_study"],
"selected_ao_trial": int(best_pair["ao_trial"]),
"selected_optuna_metrics": {
"nn_optuna_rmse": float(best_pair["nn_optuna_rmse"]),
"nn_optuna_auc": float(best_pair["nn_optuna_auc"]),
"nn_optuna_combined_metric": float(best_pair["nn_optuna_combined_metric"]),
"ao_optuna_rmse": float(best_pair["ao_optuna_rmse"]),
"ao_optuna_val_rmse": float(best_pair["ao_optuna_val_rmse"])
}
}
display_cols = [
"major", "nn_trial", "ao_trial",
"nn_optuna_rmse", "nn_optuna_auc", "nn_optuna_combined_metric",
"ao_optuna_rmse", "ao_optuna_val_rmse"
]
print(f"\n{'='*60}")
print(f"PAIRWISE CANDIDATES (sorted by {sort_label})")
print(f"{'='*60}")
print(
pd.DataFrame(
[{k: v for k, v in row.items() if k in display_cols} for row in pairwise_candidates]
)[display_cols].to_string(index=False, float_format=lambda x: f"{x:.4f}")
)
print(f"{'='*60}")
print(f"Selected major: {best_pair['major']}")
print(f"Selected NN study/trial: {best_pair['nn_study']} / {best_pair['nn_trial']}")
print(f"Selected AO study/trial: {best_pair['ao_study']} / {best_pair['ao_trial']}")
print(f"Selected NN Optuna RMSE: {best_pair['nn_optuna_rmse']:.4f}")
print(f"Selected NN Optuna AUC: {best_pair['nn_optuna_auc']:.4f}")
print(f"Selected NN Optuna Combined: {best_pair['nn_optuna_combined_metric']:.4f}")
print(f"Selection criterion: {args.pair_select_by}")
print(f"{'='*60}\n")
print(f"\n{'='*60}")
print("TRAINING SELECTED BEST PAIR")
print(f"{'='*60}")
model, mask = train_dnn_model(
data=data,
storage=args.storage,
storage_id=int(best_pair["major"]),
model_name=args.model_name,
optimization_type=args.model_type,
use_PCA=args.use_pca,
pca_type=args.pca_type,
no_scores=args.no_scores,
only_scores=args.only_scores,
study_name=str(best_pair["nn_study"]),
use_gpu=args.use_gpu,
verbose=args.verbose,
best_trial=best_pair["nn_trial_obj"],
encoder_params_override=best_pair["ao_params"],
random_seed=args.random_seed
)
best_eval = evaluate_trained_dnn_model(model, data)
best_eval_auc = best_eval["auc"]
best_eval_combined = best_eval["rmse"] - best_eval_auc if not np.isnan(best_eval_auc) else float("inf")
print(
f"Selected pair evaluation: RMSE={best_eval['rmse']:.4f}, "
f"AUC={best_eval_auc:.4f}, AUC_adj={best_eval['auc_adjusted']:.4f}, "
f"Combined={best_eval_combined:.4f}"
)
final_eval_summary = {
"rmse": float(best_eval["rmse"]),
"auc": float(best_eval_auc),
"auc_adjusted": float(best_eval["auc_adjusted"]),
"combined_metric": float(best_eval_combined)
}
pairwise_summary_rows = []
for row in pairwise_candidates:
summary_row = {k: v for k, v in row.items() if k not in {"nn_trial_obj", "ao_params"}}
selected = int(row["major"] == best_pair["major"])
summary_row["selected"] = selected
summary_row["selection_criterion"] = args.pair_select_by
if selected:
summary_row["eval_rmse"] = float(best_eval["rmse"])
summary_row["eval_auc"] = float(best_eval_auc)
summary_row["eval_auc_adjusted"] = float(best_eval["auc_adjusted"])
summary_row["eval_combined_metric"] = float(best_eval_combined)
else:
summary_row["eval_rmse"] = np.nan
summary_row["eval_auc"] = np.nan
summary_row["eval_auc_adjusted"] = np.nan
summary_row["eval_combined_metric"] = np.nan
pairwise_summary_rows.append(summary_row)
pairwise_summary_df = pd.DataFrame(pairwise_summary_rows)
else:
# Find the best trial across ALL studies
print(f"\n{'='*60}")
print(f"FINDING BEST TRIAL ACROSS ALL STUDIES")
print(f"{'='*60}")
import optuna
import re
all_trials = [] # List of (study_name, trial, combined_metric)
for study_name in args.studies:
print(f"Loading study: {study_name}...")
try:
study = optuna.load_study(study_name=study_name, storage=args.storage)
study_df = study.trials_dataframe()
study_df = study_df[study_df['state'] == 'COMPLETE']
if len(study_df) == 0:
print(f" Warning: No complete trials in {study_name}, skipping...")
continue
# Compute combined metric (RMSE - AUC) for all trials
study_df['combined_metric'] = study_df['value'] - study_df['user_attrs_AUC']
# Add all trials from this study
for _, row in study_df.iterrows():
trial = study.trials[row['number']]
all_trials.append({
'study_name': study_name,
'trial': trial,
'combined_metric': row['combined_metric'],
'rmse': row['value'],
'auc': row['user_attrs_AUC']
})
print(f" Found {len(study_df)} complete trials")
except Exception as e:
print(f" Error loading {study_name}: {e}")
continue
if len(all_trials) == 0:
print("ERROR: No trials found in any study!")
sys.exit(1)
# Find the best trial (lowest combined_metric = RMSE - AUC)
best_trial_info = min(all_trials, key=lambda x: x['combined_metric'])
best_study_name = best_trial_info['study_name']
best_trial = best_trial_info['trial']
selected_nn_trial_obj = best_trial
selection_context = {
"mode": "global_best_trial",
"selected_study": best_study_name,
"selected_trial": int(best_trial.number),
"selected_optuna_metrics": {
"rmse": float(best_trial_info["rmse"]),
"auc": float(best_trial_info["auc"]),
"combined_metric": float(best_trial_info["combined_metric"])
}
}
print(f"\n{'='*60}")
print(f"BEST TRIAL FOUND")
print(f"{'='*60}")
print(f"Study: {best_study_name}")
print(f"Trial number: {best_trial.number}")
print(f"RMSE: {best_trial_info['rmse']:.4f}")
print(f"AUC: {best_trial_info['auc']:.4f}")
print(f"Combined metric (RMSE - AUC): {best_trial_info['combined_metric']:.4f}")
print(f"Total trials evaluated: {len(all_trials)}")
print(f"{'='*60}\n")
# Extract storage_id from best study name
match = re.search(r'_(\d+)$', best_study_name)
if match:
study_storage_id = int(match.group(1))
else:
study_storage_id = args.storage_id # Fallback to provided storage_id
# Train only ONE model using the best trial
print(f"\n{'='*60}")
print(f"TRAINING MODEL WITH BEST PIPELINE")
print(f"{'='*60}")
print(f"Using best trial from: {best_study_name}")
print(f"{'='*60}\n")
if args.model_type == 'XGB':
model, mask = train_xgboost_model(
data=data,
storage=args.storage,
storage_id=study_storage_id,
model_name=args.model_name, # Use base name, not study-specific
optimization_type=args.model_type,
use_PCA=args.use_pca,
pca_type=args.pca_type,
no_scores=args.no_scores,
only_scores=args.only_scores,
study_name=best_study_name, # Pass best study name for context
use_gpu=args.use_gpu,
verbose=args.verbose
)
else: # DNN
# Extract AO study numbers from NN study names (for finding best AE)
# Extract numbers from study names like "NN_Optimization_6" -> [6, 7, 8, 9, 10]
ao_study_numbers = []
for study_name in args.studies:
match = re.search(r'_(\d+)$', study_name)
if match:
study_num = int(match.group(1))
# Only include if it's in the AE with NN range (typically 6-10, but be flexible)
if study_num >= 6:
ao_study_numbers.append(study_num)
# Remove duplicates and sort
ao_study_numbers = sorted(list(set(ao_study_numbers))) if ao_study_numbers else None
model, mask = train_dnn_model(
data=data,
storage=args.storage,
storage_id=study_storage_id,
model_name=args.model_name, # Use base name, not study-specific
optimization_type=args.model_type,
use_PCA=args.use_pca,
pca_type=args.pca_type,
no_scores=args.no_scores,
only_scores=args.only_scores,
study_name=best_study_name, # Pass best study name for context
use_gpu=args.use_gpu,
verbose=args.verbose,
best_trial=best_trial, # Pass the best trial found across all studies
ao_study_numbers=ao_study_numbers, # Pass AO study numbers to search for best AE
random_seed=args.random_seed
)
print(f"\n{'='*60}")
print(f"TRAINING COMPLETED")
print(f"{'='*60}")
# Save model and mask
# Use custom output directory if provided, otherwise use default
if args.output_dir:
models_dir = os.path.abspath(args.output_dir)
# Create directory if it doesn't exist
if not os.path.isdir(models_dir):
os.makedirs(models_dir, exist_ok=True)
print(f"Created output directory: {models_dir}")
else:
models_dir = ocscoreio.get_models_dir()
print(f"Saving model and mask to: {models_dir}")
# Determine file extension based on model type
# PyTorch models should use .pt, others use .pkl
import torch
if isinstance(model, torch.nn.Module):
model_ext = ".pt"
model_path = os.path.join(models_dir, f"{args.model_name}{model_ext}")
# Use IO module to save PyTorch model
ocscoreio.save_object(model, model_path, serialization_method="torch")
print(f"Saved PyTorch model to: {model_path}")
else:
# XGBoost or other sklearn-style models
model_ext = ".pkl"
model_path = os.path.join(models_dir, f"{args.model_name}{model_ext}")
# Use IO module to save model
ocscoreio.save_object(model, model_path, serialization_method="joblib")
print(f"Saved model (joblib) to: {model_path}")
mask_path = ocscoreio.save_mask(mask, name=args.model_name, models_dir=models_dir)
# Save scaler if normalization was used
scaler_path = None
if data.get('scaler') is not None:
scaler_path = os.path.join(models_dir, f"{args.model_name}_scaler.pkl")
ocscoreio.save_object(data['scaler'], scaler_path, serialization_method="joblib")
print(f"Saved scaler to: {scaler_path}")
pairwise_summary_path = None
if pairwise_summary_df is not None:
pairwise_summary_path = os.path.join(models_dir, f"{args.model_name}_pairwise_major_results.csv")
pairwise_summary_df.to_csv(pairwise_summary_path, index=False)
print(f"Saved pairwise major summary to: {pairwise_summary_path}")
# Save reproducibility/statistics artifact for this training run.
stats_path = os.path.join(models_dir, f"{args.model_name}_run_stats.json")
input_data_info: dict[str, Any] = {"from_db": bool(args.from_db)}
if args.df_path is not None:
input_path = os.path.abspath(args.df_path)
input_data_info["df_path"] = input_path
if os.path.isfile(input_path):
try:
st = os.stat(input_path)
input_data_info["df_size_bytes"] = int(st.st_size)
input_data_info["df_mtime_epoch"] = float(st.st_mtime)
input_data_info["df_sha256"] = _sha256_file(input_path)
except OSError:
pass
selected_trial_info = None
if selected_nn_trial_obj is not None:
trial_value = selected_nn_trial_obj.value
selected_trial_info = {
"number": int(selected_nn_trial_obj.number),
"value": None if trial_value is None else float(trial_value),
"params": _json_sanitize(dict(selected_nn_trial_obj.params)),
"user_attrs": _json_sanitize(dict(selected_nn_trial_obj.user_attrs))
}
pairwise_summary_records = None
if pairwise_summary_df is not None:
pairwise_summary_records = _json_sanitize(
pairwise_summary_df.astype(object).where(pd.notna(pairwise_summary_df), None).to_dict(orient="records")
)
git_info = _git_head_info(_parent_dir)
torch_env = {}
try:
import torch as _torch
torch_env = {
"torch_version": _torch.__version__,
"cuda_available": bool(_torch.cuda.is_available()),
"cuda_device_count": int(_torch.cuda.device_count()) if _torch.cuda.is_available() else 0,
"cudnn_benchmark": bool(getattr(_torch.backends.cudnn, "benchmark", False)),
"cudnn_deterministic": bool(getattr(_torch.backends.cudnn, "deterministic", False))
}
except Exception:
torch_env = {}
run_stats = {
"timestamp_utc": datetime.now(timezone.utc).isoformat(),
"script_path": os.path.abspath(__file__),
"command": " ".join(shlex.quote(arg) for arg in sys.argv),
"args": _json_sanitize(vars(args)),
"storage_url": args.storage,
"storage_url_masked": mask_password_in_url(args.storage) if args.storage else None,
"input_data": _json_sanitize(input_data_info),
"data_shapes": {
"X_train": list(data["X_train"].shape),
"X_test": list(data["X_test"].shape),
"X_val": None if data.get("X_val") is None else list(data["X_val"].shape),
"y_train": int(len(data["y_train"])),
"y_test": int(len(data["y_test"])),
"y_val": None if data.get("y_val") is None else int(len(data["y_val"]))
},
"selection_context": _json_sanitize(selection_context),
"selected_nn_trial": _json_sanitize(selected_trial_info),
"selected_ao_params": _json_sanitize(selected_ao_params),
"final_eval": _json_sanitize(final_eval_summary),
"pairwise_summary_records": pairwise_summary_records,
"artifacts": {
"model_path": model_path,
"mask_path": mask_path,
"scaler_path": scaler_path,
"pairwise_summary_path": pairwise_summary_path
},
"mask_info": {
"shape": list(mask.shape) if hasattr(mask, "shape") else None,
"active_features": int(np.sum(mask)) if mask is not None else None
},
"git": git_info,
"environment": {
"python_version": sys.version,
"platform": platform.platform(),
"numpy_version": np.__version__,
"pandas_version": pd.__version__,
**torch_env
}
}
with open(stats_path, "w", encoding="utf-8") as f:
json.dump(_json_sanitize(run_stats), f, indent=2, sort_keys=True)
print(f"Saved run statistics to: {stats_path}")
print(f"\nModel saved to: {model_path}")
print(f"Mask saved to: {mask_path}")
if scaler_path:
print(f"Scaler saved to: {scaler_path}")
if pairwise_summary_path:
print(f"Pairwise summary saved to: {pairwise_summary_path}")
print(f"Run stats saved to: {stats_path}")
print(f"\nTo use this model:")
print(f" import OCDocker.OCScore.Scoring as ocscoring")
print(f" import OCDocker.OCScore.Utils.IO as ocscoreio")
print(f" import pandas as pd")
print(f" ")
print(f" # Load your data (DataFrame, CSV file path, or None for database)")
print(f" # Option 1: From CSV file")
print(f" your_data = pd.read_csv('your_data.csv')")
print(f" # Option 2: From DataFrame (already loaded)")
print(f" # your_data = your_dataframe")
print(f" # Option 3: From database (set data=None)")
print(f" ")
print(f" # Load mask (use models_dir if using custom directory)")
if args.output_dir:
print(f" mask = ocscoreio.load_mask('{args.model_name}', models_dir='{models_dir}')")
else:
print(f" mask = ocscoreio.load_mask('{args.model_name}') # Uses default directory")
print(f" # Or with custom directory:")
print(f" # mask = ocscoreio.load_mask('{args.model_name}', models_dir='/path/to/custom/dir')")
print(f" ")
print(f" # Get predictions")
if scaler_path:
print(f" scores = ocscoring.get_score(")
print(f" model_path='{model_path}',")
print(f" data=your_data, # DataFrame, CSV file path (str), or None for database")
print(f" mask=mask,")
print(f" scaler_path='{scaler_path}' # IMPORTANT: Use the saved scaler")
print(f" )")
else:
print(f" scores = ocscoring.get_score(")
print(f" model_path='{model_path}',")
print(f" data=your_data, # DataFrame, CSV file path (str), or None for database")
print(f" mask=mask")
print(f" )")
def mask_password_in_url(url: str) -> str:
    '''Mask password in a database URL for secure printing.

    Parameters
    ----------
    url : str
        Database URL (e.g., postgresql+psycopg://user:<db_password>@host:port/db)

    Returns
    -------
    str
        URL with password masked (e.g., postgresql+psycopg://user:***@host:port/db).
        Returned unchanged when there is no password or the URL cannot be parsed
        (e.g., a bare SQLite path).
    '''
    try:
        parts = urlparse(url)
        if not parts.password:
            # No credential to hide -- nothing to do.
            return url
        # Rebuild the netloc with the password replaced by ***.
        netloc = f"{parts.username}:***@{parts.hostname}"
        if parts.port:
            netloc = f"{netloc}:{parts.port}"
        return urlunparse(parts._replace(netloc=netloc))
    except Exception:
        # Unparseable input (might be a SQLite path) -- return as-is.
        return url
def prepare_data_from_db(
    session,
    storage_id: int,
    optimization_type: str = "XGB",
    pca_model: str = "",
    use_PCA: bool = False,
    pca_type: int = 95,
    no_scores: bool = False,
    only_scores: bool = False,
    use_pdb_train: bool = True,
    random_seed: int = 42,
    invert_conditionally: bool = True,
    normalize: bool = True,
    scaler: str = "standard",
    methodology: Optional[str] = None
) -> dict:
    '''
    Prepare data from database using the full preprocessing pipeline.

    Parameters
    ----------
    session : sqlalchemy.orm.session.Session
        Database session
    storage_id : int
        Storage ID for the study (appended to the generated study name)
    optimization_type : str
        Type of optimization (XGB, NN, etc.)
    pca_model : str
        Path to PCA model (if using PCA)
    use_PCA : bool
        Whether to use PCA
    pca_type : int
        PCA variance percentage (only used to build the study name)
    no_scores : bool
        Whether to exclude score columns
    only_scores : bool
        Whether to use only score columns
    use_pdb_train : bool
        Whether to use PDBbind for training (DUD-EZ then serves as validation)
    random_seed : int
        Random seed for the train/test split
    invert_conditionally : bool
        Passed through to preprocess_df: whether to conditionally invert score columns
    normalize : bool
        Whether to normalize the data (the fitted scaler is then returned under "scaler")
    scaler : str
        The scaler to use (e.g. "standard")
    methodology : str, optional
        The methodology to use when loading from the database

    Returns
    -------
    dict
        Dictionary with preprocessed data: study_name, X_train, X_test, y_train,
        y_test, X_val, y_val, feature_columns and scaler.
    '''
    import tempfile

    # Load data from database
    print(f"Loading data from database{f' (methodology: {methodology})' if methodology else ''}...")
    db_data = load_data_from_database(session, methodology=methodology)

    # Save to a uniquely-named temporary CSV for compatibility with preprocess_df.
    # mkstemp avoids the collision the previous fixed "/tmp/ocscore_db_data.csv"
    # path had when several runs executed concurrently on the same machine.
    tmp_fd, temp_csv_path = tempfile.mkstemp(prefix="ocscore_db_data_", suffix=".csv")
    os.close(tmp_fd)  # Only the path is needed; pandas reopens the file itself.
    db_data.to_csv(temp_csv_path, index=False)
    print(f"Data loaded: {len(db_data)} rows")

    # Use preprocess_df exactly as in existing load_data function (mimics optimize_NN/optimize_XGB pattern)
    # Request scaler if normalization is enabled (needed for consistent scaling during prediction)
    if normalize:
        dudez_data, pdbbind_data, score_columns, fitted_scaler = ocscoredata.preprocess_df(
            file_name=temp_csv_path,
            score_columns_list=["SMINA", "VINA", "ODDT", "PLANTS"],  # Default score columns
            outliers_columns_list=None,  # No outlier removal by default
            scaler=scaler,
            invert_conditionally=invert_conditionally,
            normalize=normalize,
            return_scaler=True  # Get the fitted scaler to save it
        )
    else:
        dudez_data, pdbbind_data, score_columns = ocscoredata.preprocess_df(
            file_name=temp_csv_path,
            score_columns_list=["SMINA", "VINA", "ODDT", "PLANTS"],  # Default score columns
            outliers_columns_list=None,  # No outlier removal by default
            scaler=scaler,
            invert_conditionally=invert_conditionally,
            normalize=normalize
        )
        fitted_scaler = None

    # Handle score columns
    if no_scores:
        dudez_data = dudez_data.drop(columns=score_columns, errors='ignore')
        pdbbind_data = pdbbind_data.drop(columns=score_columns, errors='ignore')
        study_name = f"NoScores_{optimization_type}_Optimization"
    elif only_scores:
        metadata_cols = ["receptor", "ligand", "name", "type", "db"]
        columns_to_keep = [col for col in metadata_cols if col in dudez_data.columns] + score_columns
        dudez_data = ocscoredata.remove_other_columns(dudez_data, columns_to_keep, inplace=False)
        columns_to_keep = [col for col in metadata_cols + ["experimental"] if col in pdbbind_data.columns] + score_columns
        pdbbind_data = ocscoredata.remove_other_columns(pdbbind_data, columns_to_keep, inplace=False)
        study_name = f"ScoreOnly_{optimization_type}_Optimization"
    else:
        study_name = f"{optimization_type}_Optimization"

    # Apply PCA if needed
    if use_PCA and pca_model:
        columns_to_skip_pca = ["receptor", "ligand", "name", "type", "db", "experimental"] + score_columns
        pdbbind_data = ocscoredata.apply_pca(
            pdbbind_data,
            pca_model,
            columns_to_skip_pca=columns_to_skip_pca,
            inplace=False
        )
        # NOTE(review): DUD-EZ gets PCA only in the use_pdb_train branch --
        # confirm this is intended when use_pdb_train is False.
        if use_pdb_train:
            columns_to_skip_pca = ["receptor", "ligand", "name", "type", "db"] + score_columns
            dudez_data = ocscoredata.apply_pca(
                dudez_data,
                pca_model,
                columns_to_skip_pca=columns_to_skip_pca,
                inplace=False
            )
        study_name = f"PCA{pca_type}_{study_name}"

    # CRITICAL: Reorder columns to match reference column order from config
    # This ensures the column order matches the training data order, which is essential
    # for proper mask application and model inference consistency.
    # Do this AFTER all transformations (score column handling, PCA, etc.) but BEFORE splitting
    print("Reordering columns to match reference column order from config...")
    dudez_data = ocscoredata.reorder_columns_to_match_data_order(
        dudez_data,
        data_source=None,  # Uses config.reference_column_order by default
        keep_extra_columns=True,  # Keep any extra columns (e.g., PCA components) that might exist
        fill_missing_columns=False  # Don't add missing columns as NaN
    )
    pdbbind_data = ocscoredata.reorder_columns_to_match_data_order(
        pdbbind_data,
        data_source=None,  # Uses config.reference_column_order by default
        keep_extra_columns=True,  # Keep any extra columns (e.g., PCA components) that might exist
        fill_missing_columns=False  # Don't add missing columns as NaN
    )

    # Split data
    if use_pdb_train:
        # Train/test from PDBbind; DUD-EZ becomes the validation set with
        # binary ligand/decoy labels.
        X_train_df = pdbbind_data.drop(
            columns=["receptor", "ligand", "name", "type", "db", "experimental"],
            errors="ignore"
        )
        X_train, X_test, y_train, y_test = ocscoredata.split_dataset(
            X_train_df,
            pdbbind_data["experimental"],
            test_size=0.25,
            random_state=random_seed
        )
        X_val_df = dudez_data.drop(
            columns=["receptor", "ligand", "name", "type", "db", "experimental"],
            errors="ignore"
        )
        X_val = X_val_df
        y_val = dudez_data["type"].map({"ligand": 1, "decoy": 0})
        # Get feature column names from DataFrame before converting to numpy
        feature_columns = X_train_df.columns.tolist()
    else:
        # Train on DUD-EZ experimental values; the "test" targets are the
        # binary ligand/decoy labels. No separate validation set.
        X_train_df = dudez_data.drop(
            columns=["receptor", "ligand", "name", "type", "db", "experimental"],
            errors="ignore"
        )
        X_train = X_train_df
        y_train = dudez_data["experimental"]
        X_test_df = dudez_data.drop(
            columns=["receptor", "ligand", "name", "type", "db", "experimental"],
            errors="ignore"
        )
        X_test = X_test_df
        y_test = dudez_data["type"].map({"ligand": 1, "decoy": 0})
        X_val = None
        y_val = None
        # Get feature column names from DataFrame before converting to numpy
        feature_columns = X_train_df.columns.tolist()

    # Convert to numpy arrays for compatibility with existing code
    X_train = X_train.values if isinstance(X_train, pd.DataFrame) else X_train
    X_test = X_test.values if isinstance(X_test, pd.DataFrame) else X_test
    X_val = X_val.values if isinstance(X_val, pd.DataFrame) else X_val

    # Add storage_id to study name to match the pattern: {prefix}_Optimization_{storage_id}
    study_name_with_id = f"{study_name}_{storage_id}"

    data = {
        "study_name": study_name_with_id,
        "X_train": X_train,
        "X_test": X_test,
        "y_train": y_train,
        "y_test": y_test,
        "X_val": X_val,
        "y_val": y_val,
        "feature_columns": feature_columns,  # Feature column names for mask application
        "scaler": fitted_scaler  # Fitted scaler (None if normalization is disabled)
    }

    # Clean up the temporary file we created. Best-effort: a leftover temp
    # file is harmless, so only OS-level removal errors are swallowed
    # (previously this was a bare `except:` which hid everything).
    if os.path.exists(temp_csv_path):
        try:
            os.remove(temp_csv_path)
        except OSError:
            pass

    return data
def train_dnn_model(
    data: dict,
    storage: str,
    storage_id: int,
    model_name: str,
    optimization_type: str = "NN",
    use_PCA: bool = False,
    pca_type: int = 95,
    no_scores: bool = False,
    only_scores: bool = False,
    study_name: Optional[str] = None,
    mask: Optional[np.ndarray] = None,
    use_gpu: bool = True,
    verbose: bool = True,
    best_trial: Optional[Any] = None,  # Optional: if provided, use this trial instead of loading from study
    ao_study_numbers: Optional[List[int]] = None,  # Optional: list of AO study numbers to search (e.g., [6, 7, 8, 9, 10])
    encoder_params_override: Optional[dict[str, Any]] = None,
    random_seed: int = 42
) -> tuple:
    '''
    Train a DNN model from the best trial of an existing Optuna study.

    Parameters
    ----------
    data : dict
        The data dictionary (X_train, y_train, X_test, y_test, optional
        X_val/y_val and feature_columns).
    storage : str
        The Optuna storage to use.
    storage_id : int
        The storage ID to use.
    model_name : str
        The name of the model.
    optimization_type : str
        The optimization type.
    use_PCA : bool
        If True, use PCA.
    pca_type : int
        The PCA type to use.
    no_scores : bool
        If True, don't use scores.
    only_scores : bool
        If True, only use scores.
    study_name : str
        The name of the study. Required: the final model is retrained from
        an existing study's trial.
    mask : np.ndarray
        The feature mask to use. If None, for AE-with-NN studies (6-10) it
        is loaded from the ablation study; otherwise it defaults to all 1s.
    use_gpu : bool
        If True, use the GPU.
    verbose : bool
        If True, print the output.
    best_trial : optuna FrozenTrial, optional
        If provided, use this trial instead of loading the best one from the study.
    ao_study_numbers : list of int, optional
        AO study numbers to search for the best autoencoder (defaults to [6, 7, 8, 9, 10]).
    encoder_params_override : dict[str, Any], optional
        If provided, use these AO parameters directly instead of searching AO studies.
    random_seed : int, optional
        Random seed used for final DNN retraining.

    Returns
    -------
    tuple
        (model, mask) - trained model and feature mask

    Raises
    ------
    ValueError
        If study_name is None.
    '''
    print("Training DNN model...")

    import optuna
    import re

    # Validate early: study_name is parsed right below, so failing here gives a
    # clear error instead of a TypeError from re.search(None). (Previously this
    # check only ran after the mask/optimizer setup, which was too late.)
    if study_name is None:
        raise ValueError("study_name must be provided when using existing studies")

    # Extract the trailing study number from the study name
    # (e.g. "NN_Optimization_7" -> 7).
    match = re.search(r'_(\d+)$', study_name)
    study_num = int(match.group(1)) if match else None

    # Autoencoder params may be supplied directly by the caller; otherwise they
    # are searched for below (AE-with-NN studies only).
    encoder_params = encoder_params_override

    # Fetch mask from ablation study and autoencoder params for AE with NN studies (6-10)
    if mask is None:
        # Check if this is an AE with NN study (studies 6-10)
        if study_num and 6 <= study_num <= 10:
            if encoder_params is None:
                print(f"Fetching best autoencoder params across ALL AO studies...")
                # Find the best AE across ALL AO studies (not just the one matching the NN study number)
                # This ensures we use the globally best autoencoder, not just the one from the matching study
                # Use provided ao_study_numbers or default to [6, 7, 8, 9, 10] for AE with NN studies
                if ao_study_numbers is None:
                    ao_study_numbers = [6, 7, 8, 9, 10]  # Default: All AE with NN study numbers
                all_ao_trials = []
                for ao_num in ao_study_numbers:
                    ao_study_name = f"AO_Optimization_{ao_num}"
                    try:
                        ao_study = optuna.load_study(study_name=ao_study_name, storage=storage)
                        ao_df = ao_study.trials_dataframe()
                        ao_df = ao_df[ao_df['state'] == 'COMPLETE']
                        if len(ao_df) > 0:
                            # Sort by value (RMSE) and validation RMSE
                            ao_df = ao_df.sort_values(by=['value', 'user_attrs_val_rmse'], ascending=[True, True])
                            best_ao_trial = ao_study.trials[ao_df.iloc[0].number]
                            all_ao_trials.append({
                                'study_name': ao_study_name,
                                'study_number': ao_num,
                                'trial': best_ao_trial,
                                'value': best_ao_trial.value,
                                'val_rmse': best_ao_trial.user_attrs.get('val_rmse', float('inf'))
                            })
                            if verbose:
                                print(f" Found best trial in {ao_study_name}: RMSE={best_ao_trial.value:.4f}, Val_RMSE={best_ao_trial.user_attrs.get('val_rmse', 'N/A')}")
                    except Exception as e:
                        if verbose:
                            print(f" Warning: Could not load autoencoder study {ao_study_name}: {e}")
                # Find the best AE trial across all studies
                if len(all_ao_trials) > 0:
                    # Pick by value (RMSE) first, then by validation RMSE
                    best_ao_info = min(all_ao_trials, key=lambda x: (x['value'], x['val_rmse']))
                    encoder_params = best_ao_info['trial'].params
                    if verbose:
                        print(f"\nSelected best autoencoder from: {best_ao_info['study_name']}")
                        print(f" Trial number: {best_ao_info['trial'].number}")
                        print(f" RMSE: {best_ao_info['value']:.4f}")
                        print(f" Val RMSE: {best_ao_info['val_rmse']:.4f}")
                        print(f" (Selected from {len(all_ao_trials)} AO studies)")
                else:
                    if verbose:
                        print(f"Warning: No autoencoder studies found. Training without autoencoder.")
            elif verbose:
                print("Using provided autoencoder params (encoder_params_override).")

            # Load ablation study to get mask
            ablation_study_name = "NN_Ablation_Optimization_1"
            try:
                ablation_study = optuna.load_study(study_name=ablation_study_name, storage=storage)
                ablation_df = ablation_study.trials_dataframe()
                # Filter to only complete trials
                ablation_df = ablation_df[ablation_df['state'] == 'COMPLETE']
                ablation_df = ablation_df.reset_index(drop=True)
                # Rename columns for clarity
                ablation_df = ablation_df.rename(columns={
                    'value': 'RMSE',
                    'user_attrs_Feature_Mask': 'Feature_Mask',
                    'user_attrs_AUC': 'AUC'
                })
                # Compute score (RMSE - AUC) and get best
                ablation_df['score'] = ablation_df['RMSE'] - ablation_df['AUC']
                best_ablation_df = ablation_df.sort_values(by=['score'], ascending=[True])
                # Get mask from best ablation trial
                mask_str = best_ablation_df.iloc[0]['Feature_Mask']
                # Convert mask string to numpy array of 0s and 1s
                mask_from_study = np.array([int(x) for x in mask_str])
                # The ablation study creates masks for SFs (scoring functions) only
                # The mask stored is a full-length mask where only SF positions are modified
                # We need to identify SF columns in the current data and extract/apply the mask correctly
                # Get feature column names (if available)
                feature_columns = data.get('feature_columns')
                # Create full mask (all 1s)
                mask = np.ones(data['X_train'].shape[1], dtype=int)
                # SF columns are recognized by their prefix (VINA*, SMINA*, ODDT*, PLANTS*)
                sf_pattern = re.compile(r'^(VINA|SMINA|ODDT|PLANTS)', re.IGNORECASE)
                if len(mask_from_study) == 16:
                    # This is an SF-only mask (16 values for 16 SFs)
                    # We need to find SF column indices in current data and apply mask there
                    if feature_columns is not None:
                        sf_indices = [i for i, col in enumerate(feature_columns) if sf_pattern.match(str(col))]
                        if len(sf_indices) == 16:
                            # Apply the 16-element mask to SF positions
                            for idx, mask_val in zip(sf_indices, mask_from_study):
                                mask[idx] = mask_val
                            print(f"Applied 16-element SF mask to {len(sf_indices)} SF features.")
                        else:
                            print(f"Warning: Found {len(sf_indices)} SF features, but mask has 16 elements.")
                            print(f" SF features found: {sf_indices[:10]}..." if len(sf_indices) > 10 else f" SF features: {sf_indices}")
                            print(f" Using default mask (all 1s).")
                    else:
                        print(f"Warning: Cannot identify SF positions - feature column names not available.")
                        print(f" Using default mask (all 1s).")
                elif len(mask_from_study) == data['X_train'].shape[1]:
                    # This is a full-length mask that matches current data shape
                    # Extract only SF positions if we have feature names
                    if feature_columns is not None:
                        sf_indices = [i for i, col in enumerate(feature_columns) if sf_pattern.match(str(col))]
                        if len(sf_indices) == 16:
                            # Create mask: all 1s, but apply SF mask values to SF positions
                            for sf_idx in sf_indices:
                                mask[sf_idx] = mask_from_study[sf_idx]
                            print(f"Extracted SF mask from full mask and applied to {len(sf_indices)} SF features.")
                        else:
                            # Use full mask as-is
                            mask = mask_from_study
                            print(f"Using full-length mask from ablation study ({len(mask_from_study)} features).")
                    else:
                        # Use full mask as-is
                        mask = mask_from_study
                        print(f"Using full-length mask from ablation study ({len(mask_from_study)} features).")
                else:
                    # Mask size doesn't match - ablation study was run on different data
                    print(f"Warning: Mask from ablation study has {len(mask_from_study)} elements,")
                    print(f" but current data has {data['X_train'].shape[1]} features.")
                    print(f" The ablation study was likely run on different data.")
                    print(f" Using default mask (all 1s).")
                if verbose:
                    print(f"Loaded mask from ablation study: {ablation_study_name}")
                    print(f"Original mask shape: {mask_from_study.shape}")
                    print(f"Applied mask shape: {mask.shape}, sum: {mask.sum()}")
                    print(f"Mask (first 50 values): {mask[:50] if len(mask) > 50 else mask}")
            except Exception as e:
                print(f"Warning: Could not load mask from ablation study: {e}")
                print("Using default mask (all 1s)")
                mask = np.ones(data['X_train'].shape[1], dtype=int)
        else:
            # For other studies, use default mask (all 1s)
            mask = np.ones(data['X_train'].shape[1], dtype=int)

    # Print mask information
    print(f"\n{'='*60}")
    print(f"Using mask:")
    print(f" Shape: {mask.shape}")
    print(f" Sum (active features): {mask.sum()}/{len(mask)}")
    print(f" Source: {'Ablation study' if study_num and 6 <= study_num <= 10 and mask.sum() < len(mask) else 'Default (all 1s)'}")
    # Identify which scoring functions are kept/removed
    feature_columns = data.get('feature_columns')
    sf_indices = []
    if feature_columns is not None:
        sf_pattern = re.compile(r'^(VINA|SMINA|ODDT|PLANTS)', re.IGNORECASE)
        sf_indices = [i for i, col in enumerate(feature_columns) if sf_pattern.match(str(col))]
        # Print mask preview for SF values only (n = number of SFs)
        if len(sf_indices) > 0:
            sf_mask_values = [mask[idx] for idx in sf_indices]
            print(f" SF mask values ({len(sf_indices)} SFs): {sf_mask_values}")
        else:
            # Fallback: print first n values where n is a reasonable default
            n = min(16, len(mask))
            print(f" Mask preview (first {n}): {mask[:n]}")
    if feature_columns is not None and len(sf_indices) > 0:
        print(f"\n Scoring Functions (SFs) status:")
        kept_sfs = []
        removed_sfs = []
        for sf_idx in sf_indices:
            sf_name = feature_columns[sf_idx]
            if mask[sf_idx] == 1:
                kept_sfs.append(sf_name)
            else:
                removed_sfs.append(sf_name)
        print(f" Kept ({len(kept_sfs)}/{len(sf_indices)}): {', '.join(kept_sfs) if kept_sfs else 'None'}")
        if removed_sfs:
            print(f" Removed ({len(removed_sfs)}/{len(sf_indices)}): {', '.join(removed_sfs)}")
        else:
            print(f" Removed (0/{len(sf_indices)}): None")
    print(f"{'='*60}\n")

    # Create optimizer
    # NOTE(review): this instance is never referenced again below (the final
    # retraining builds a NeuralNet directly); presumably kept for parity with
    # the optimization scripts -- confirm before removing.
    optimizer = DNNOptimizer(
        X_train=data['X_train'],
        y_train=data['y_train'],
        X_test=data['X_test'],
        y_test=data['y_test'],
        X_validation=data.get('X_val'),
        y_validation=data.get('y_val'),
        mask=mask,
        storage=storage,
        output_size=1,
        random_seed=random_seed,
        use_gpu=use_gpu,
        verbose=verbose
    )

    # Use provided best_trial or load from study
    if best_trial is None:
        # Load existing study and find best trial
        study = optuna.load_study(study_name=study_name, storage=storage)
        if verbose:
            print(f"Loaded study: {study_name}")
        # Get best trial using the combined metric (RMSE - AUC)
        nn_df = study.trials_dataframe()
        nn_df = nn_df[nn_df['state'] == 'COMPLETE']
        nn_df['combined_metric'] = nn_df['value'] - nn_df['user_attrs_AUC']
        best_nn_df = nn_df.sort_values(by=['combined_metric'], ascending=[True])
        best_trial = study.trials[best_nn_df.iloc[0].number]
        if verbose:
            print(f"Best trial: {best_trial.number}, RMSE: {best_trial.value:.4f}, AUC: {best_trial.user_attrs.get('AUC', 'N/A')}, Combined: {best_trial.value - best_trial.user_attrs.get('AUC', 0):.4f}")
    else:
        # Use provided best_trial
        if verbose:
            print(f"Using provided best trial: {best_trial.number}, RMSE: {best_trial.value:.4f}, AUC: {best_trial.user_attrs.get('AUC', 'N/A')}, Combined: {best_trial.value - best_trial.user_attrs.get('AUC', 0):.4f}")

    # Build and train final model
    from OCDocker.OCScore.DNN.DNNOptimizer import NeuralNet
    neural = NeuralNet(
        data['X_train'].shape[1],
        1,
        encoder_params=encoder_params,  # Use autoencoder params if available (for AE with NN)
        nn_params=best_trial.params,
        random_seed=random_seed,
        use_gpu=use_gpu,
        verbose=verbose,
        mask=mask
    )

    # Train the model using the best hyperparameters
    print(f"\n{'='*60}")
    print(f"TRAINING MODEL")
    print(f"{'='*60}")
    print(f"Training model with best hyperparameters from trial {best_trial.number}...")
    print(f" Epochs: {best_trial.params.get('epochs', 'N/A')}")
    print(f" Batch size: {best_trial.params.get('batch_size', 'N/A')}")
    print(f" Learning rate: {best_trial.params.get('lr', 'N/A')}")
    print(f" Optimizer: {best_trial.params.get('optimizer', 'N/A')}")
    print(f" Random seed: {random_seed}")
    print(f" Using GPU: {use_gpu}")
    print(f" Training samples: {data['X_train'].shape[0]}")
    print(f" Test samples: {data['X_test'].shape[0]}")
    if data.get('X_val') is not None:
        print(f" Validation samples: {data['X_val'].shape[0]}")
    print(f"{'='*60}\n")

    # Train the model - this will run the full training loop
    neural.train_model(
        X_train=data['X_train'],
        y_train=data['y_train'],
        X_test=data['X_test'],
        y_test=data['y_test'],
        X_validation=data.get('X_val'),
        y_validation=data.get('y_val')
    )

    # Get the trained model (now with trained weights!)
    model = neural.NN

    print(f"\n{'='*60}")
    print(f"TRAINING COMPLETED")
    print(f"{'='*60}")
    print(f" Model trained using best hyperparameters from Optuna trial {best_trial.number}")
    print(f" Best trial metrics (from Optuna optimization):")
    print(f" RMSE: {best_trial.value:.4f}")
    auc_value = best_trial.user_attrs.get('AUC', None)
    if auc_value is not None:
        print(f" AUC: {auc_value:.4f}")
    combined_metric = best_trial.value - (auc_value if auc_value is not None else 0)
    print(f" Combined metric (RMSE - AUC): {combined_metric:.4f}")
    print(f"{'='*60}\n")

    return model, mask
def train_xgboost_model(
    data: dict,
    storage: str,
    storage_id: int,
    model_name: str,
    optimization_type: str = "XGB",
    use_PCA: bool = False,
    pca_type: int = 95,
    no_scores: bool = False,
    only_scores: bool = False,
    study_name: Optional[str] = None,
    use_gpu: bool = False,
    verbose: bool = True
) -> tuple:
    '''
    Train an XGBoost model from the best trial of an existing Optuna study.

    Parameters
    ----------
    data : dict
        The data dictionary (X_train, y_train, X_test, y_test, optional X_val/y_val).
    storage : str
        The Optuna storage to use.
    storage_id : int
        The storage ID to use (appended to the study name when one is built).
    model_name : str
        The name of the model.
    optimization_type : str
        The optimization type.
    use_PCA : bool
        If True, use PCA.
    pca_type : int
        The PCA type to use.
    no_scores : bool
        If True, don't use scores.
    only_scores : bool
        If True, only use scores.
    study_name : str
        The name of the study; built from the methodology flags when omitted.
    use_gpu : bool
        If True, use the GPU.
    verbose : bool
        If True, print the output.

    Returns
    -------
    tuple
        (model, mask) - trained model and an all-ones feature mask (XGBoost
        performs feature selection internally).
    '''
    print("Training XGBoost model...")

    # Function-local imports, as in the DNN path
    from OCDocker.OCScore.XGBoost.XGBoostOptimizer import XGBoostOptimizer
    import OCDocker.OCScore.XGBoost.OCxgboost as OCxgboost
    import optuna

    # NOTE(review): this instance is never referenced again below (the final
    # retraining calls OCxgboost.run_xgboost directly); presumably kept for
    # parity with the optimization scripts -- confirm.
    xgb_optimizer = XGBoostOptimizer(
        X_train=data['X_train'],
        y_train=data['y_train'],
        X_test=data['X_test'],
        y_test=data['y_test'],
        X_validation=data.get('X_val'),
        y_validation=data.get('y_val'),
        storage=storage,
        use_gpu=use_gpu,
        verbose=verbose
    )

    # Build the study name from the methodology flags unless one was given.
    if study_name is None:
        if use_PCA:
            prefix = f"PCA{pca_type}_"
        elif only_scores:
            prefix = "ScoreOnly_"
        elif no_scores:
            prefix = "NoScores_"
        else:
            prefix = ""
        study_name = f"{prefix}{optimization_type}_Optimization_{storage_id}"

    # Load the existing study (it must exist -- no new optimization is run).
    study = optuna.load_study(study_name=study_name, storage=storage)
    if verbose:
        print(f"Loaded study: {study_name}")

    # Reuse the hyperparameters of the study's best trial.
    winner = study.best_trial
    if verbose:
        print(f"Best trial: {winner.number}, RMSE: {winner.value:.4f}")
    tuned_params = winner.params

    # Retrain the final model with the tuned parameters.
    import OCDocker.OCScore.XGBoost.OCxgboost as OCxgboost  # noqa: F811 (kept for clarity)
    model, _ = OCxgboost.run_xgboost(
        data['X_train'],
        data['y_train'],
        data['X_test'],
        data['y_test'],
        params=tuned_params,
        verbose=verbose
    )

    # All-ones mask: XGBoost handles feature selection internally.
    mask = np.ones(data['X_train'].shape[1], dtype=int)

    print(f"XGBoost training completed. Best RMSE: {winner.value:.4f}")

    return model, mask
# Script entry point: run the training workflow only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    main()
This script demonstrates:

- Loading data from the OCDocker database
- Finding the best hyperparameters across multiple Optuna studies
- Training DNN or XGBoost models
- Saving trained models and masks
- Using the full preprocessing pipeline
Complete OCScore Pipeline¶
Complete end-to-end pipeline to obtain OCScore results from scratch:
#!/usr/bin/env python3
"""
Example: Complete OCScore Pipeline
This example demonstrates the complete pipeline to obtain OCScore results from scratch:
1. Receptor and ligand preparation
2. Docking with multiple engines (Vina, PLANTS)
3. Pose clustering to find representative poses
4. Rescoring with multiple scoring functions (ODDT, PLANTS, Vina, SMINA)
5. Feature extraction (receptor and ligand descriptors)
6. Model inference using trained OCScore model
The rescoring results are automatically mapped to database column names for consistency.
Usage:
# Update paths in the script to match your system
python examples/12_python_api_complete_ocscore_pipeline.py
"""
###############################################################################
# USER CONFIGURATION - Update these variables to match your system
###############################################################################
# OCDocker configuration file path
# Set this to the absolute path of your OCDocker.cfg file
# If None, will use OCDOCKER_CONFIG environment variable or search for OCDocker.cfg
OCDOCKER_CONFIG_FILE = "/path/to/OCDocker.cfg"  # Update this path

# Receptor configuration
RECEPTOR_PATH = "/path/to/receptor_directory/receptor.pdb"
RECEPTOR_NAME = "Receptor"
PREPARED_RECEPTOR_PDBQT = "/path/to/receptor_directory/prepared_receptor.pdbqt"
PREPARED_RECEPTOR_MOL2 = "/path/to/receptor_directory/prepared_receptor.mol2"
BOX_CENTER = (0.0, 0.0, 0.0)  # (x, y, z) coordinates of the box center

# Ligand configuration - List of ligand paths (base directories)
# Each path should contain: {ligand_name}.smi, boxes/box0.pdb, and subdirectories for outputs
# Ligand names are automatically extracted from the last folder name in each path
LIGAND_PATHS = [
    # Add your ligand paths here (you can use glob to get all ligand folders):
    # "/path/to/ligand2",
    # "/path/to/ligand3",
]

# Model configuration
MODEL_NAME = "OCScore"  # Name of your trained model (without extension)
MODELS_DIR = "OCScore_models"  # Directory containing models
PCA_MODEL_PATH = None  # Path to PCA model if used, e.g., f"{MODELS_DIR}/{MODEL_NAME}_pca.pkl"

# Preprocessing configuration (should match training settings)
SCALER = "standard"  # "standard" or "minmax"
INVERT_CONDITIONALLY = True
NORMALIZE = True
USE_MASK = True
SCORE_COLUMNS_LIST = ["SMINA", "VINA", "ODDT", "PLANTS"]
###############################################################################
# !!! CRITICAL WARNING: SCORING FUNCTION COLUMN ORDER !!!
###############################################################################
#
# THE FOLLOWING ORDER MUST BE STRICTLY RESPECTED WHEN APPLYING MASKS:
#
# 1. SMINA_VINA
# 2. SMINA_SCORING_DKOES
# 3. SMINA_VINARDO
# 4. SMINA_OLD_SCORING_DKOES
# 5. SMINA_FAST_DKOES
# 6. SMINA_SCORING_AD4
# 7. VINA_VINA
# 8. VINA_VINARDO
# 9. PLANTS_CHEMPLP
# 10. PLANTS_PLP
# 11. PLANTS_PLP95
# 12. ODDT_RFSCORE_V1
# 13. ODDT_RFSCORE_V2
# 14. ODDT_RFSCORE_V3
# 15. ODDT_PLECRF_P5_L1_S65536
# 16. ODDT_NNSCORE
#
# !!! WARNING: If you change the order of scoring function columns in the output,
# the mask will be applied incorrectly, leading to wrong predictions!
#
# The mask is a 16-element array where each position corresponds to one of the
# scoring functions above in the exact order listed. Position 0 = SMINA_VINA,
# position 1 = SMINA_SCORING_DKOES, etc.
#
# DO NOT MODIFY THE ORDER OF SCORING FUNCTION COLUMNS WITHOUT UPDATING THE MASK!
#
###############################################################################
# GPU configuration
USE_GPU = True  # Set to False to force CPU usage (useful if CUDA is not available or to avoid GPU memory issues)

# Output configuration
OUTPUT_FILE = "ocscore_results.csv"  # CSV file to save results (None to skip saving)
SAVE_TO_FILE = True  # Set to False to only store results in memory

# Multiprocessing configuration
N_JOBS = 4  # Number of parallel jobs (cores) to use. Set to -1 for all available cores
USE_MULTIPROCESSING = True  # Set to False to process ligands sequentially

# Checkpoint/resume configuration
# When enabled, each ligand writes a per-ligand checkpoint and can resume from the
# last completed stage. This is backward-compatible with runs done before this code:
# if no checkpoint file exists, completion is inferred from existing output artifacts.
ENABLE_LIGAND_CHECKPOINT = True
LIGAND_CHECKPOINT_FILE = ".ocscore_pipeline_checkpoint.json"
LIGAND_FEATURES_CACHE_FILE = ".ocscore_pipeline_features.json"
###############################################################################
# END USER CONFIGURATION
###############################################################################
import argparse
import json
import os
import time
import warnings
import numpy as np
import pandas as pd
from glob import glob
from typing import Any, Dict, Optional, Tuple
# Configure sklearn/joblib to use threading backend for parallel execution
# This allows sklearn models to use multiple threads while main process uses multiprocessing
# The threading backend avoids the "Loky-backed parallel loops cannot be called in multiprocessing" issue
warnings.filterwarnings('ignore', message='.*Loky-backed parallel loops cannot be called in a multiprocessing.*')
# Note: We keep the default multiprocessing start method ('fork' on Linux)
# which is faster and works well with proper tmp directory isolation
try:
    import joblib
    from joblib import parallel_backend, Parallel, delayed
    # Set default backend to threading so sklearn can parallelize within multiprocessing workers
    # Threading backend works inside multiprocessing contexts (unlike Loky)
    # NOTE(review): DEFAULT_BACKEND is a private joblib attribute; the
    # AttributeError branch below catches the case where it disappears.
    joblib.parallel.DEFAULT_BACKEND = 'threading'
    JOBLIB_AVAILABLE = True
except (ImportError, AttributeError):
    # If joblib not available, try setting environment variable
    os.environ['JOBLIB_BACKEND'] = 'threading'
    JOBLIB_AVAILABLE = False
    # Without joblib there is no Parallel/delayed, so force sequential mode.
    USE_MULTIPROCESSING = False
    print("Warning: joblib not available. Multiprocessing disabled.")
# Explicitly bootstrap OCDocker with the specified config file BEFORE other imports
# This ensures the config is loaded correctly regardless of working directory
# Set OCDOCKER_NO_AUTO_BOOTSTRAP to prevent auto-bootstrap from running first
os.environ['OCDOCKER_NO_AUTO_BOOTSTRAP'] = '1'
if OCDOCKER_CONFIG_FILE and os.path.isfile(OCDOCKER_CONFIG_FILE):
    import OCDocker.Error as ocerror
    import OCDocker.Initialise as init
    print(f"Loading OCDocker configuration from: {OCDOCKER_CONFIG_FILE}")
    # Namespace mirrors the CLI arguments that init.bootstrap() expects.
    bootstrap_ns = argparse.Namespace(
        multiprocess=USE_MULTIPROCESSING,
        update=False,
        config_file=OCDOCKER_CONFIG_FILE,
        output_level=ocerror.ReportLevel.WARNING,
        overwrite=False
    )
    init.bootstrap(bootstrap_ns)
    print("OCDocker configuration loaded successfully.\n")
else:
    # Fall back to auto-bootstrap if config file not specified or not found
    if OCDOCKER_CONFIG_FILE:
        print(f"Warning: Config file not found at {OCDOCKER_CONFIG_FILE}, using auto-bootstrap...")
    # Re-enable auto-bootstrap
    os.environ.pop('OCDOCKER_NO_AUTO_BOOTSTRAP', None)
# Now import other OCDocker modules (they won't trigger auto-bootstrap since we already bootstrapped)
import OCDocker.Docking.PLANTS as ocplants
import OCDocker.Docking.Smina as ocsmina
import OCDocker.Docking.Vina as ocvina
import OCDocker.Ligand as ocl
import OCDocker.OCScore.Scoring as ocscoring
import OCDocker.OCScore.Utils.Data as ocscoredata
import OCDocker.OCScore.Utils.IO as ocscoreio
import OCDocker.Processing.Preprocessing.RmsdClustering as ocrmsdclust
import OCDocker.Receptor as ocr
import OCDocker.Rescoring.ODDT as ocoddt
import OCDocker.Toolbox.Conversion as occonversion
import OCDocker.Toolbox.MoleculeProcessing as ocmolproc
import OCDocker.Toolbox.Security as ocsec
# Allow unsafe deserialization
# NOTE(review): this permits pickle-style loading of saved models/masks;
# only run this pipeline with model files from a trusted source.
ocsec.allow_unsafe_runtime(deserialization=True, script_exec=False)
def main() -> Optional[pd.DataFrame]:
    '''Process all configured ligands and run batched OCScore inference.

    Workflow:
    1. Derive one (path, name) task per entry in LIGAND_PATHS.
    2. Run process_single_ligand for each task, either in parallel via
       joblib or sequentially.
    3. Batch every per-ligand feature dict into one DataFrame and run the
       trained model once over all rows, so normalization is applied to the
       whole batch instead of single rows.
    4. Reorder columns to match the training column order, print a summary
       and optionally write a CSV.

    Returns
    -------
    Optional[pd.DataFrame]
        One row per successfully processed ligand, or None when no ligand
        was processed or no features could be assembled.
    '''
    # OCDocker auto-bootstraps on import, so configuration is already loaded
    # If you need to verify bootstrap or use custom settings, you can:
    # 1. Set OCDOCKER_NO_AUTO_BOOTSTRAP=1 environment variable
    # 2. Import OCDocker.Initialise and call bootstrap() explicitly
    # Automatically derive ligand names from paths (last folder name)
    ligand_names = [os.path.basename(os.path.normpath(path)) for path in LIGAND_PATHS]
    # Create receptor object
    receptor = ocr.Receptor(RECEPTOR_PATH, name=RECEPTOR_NAME)
    # Prepare list of (ligand_path, ligand_name) tuples
    ligand_tasks = list(zip(LIGAND_PATHS, ligand_names))
    print(f"\n{'='*60}")
    print(f"OCSCORE PIPELINE")
    print(f"{'='*60}")
    print(f"Receptor: {RECEPTOR_NAME}")
    print(f"Number of ligands: {len(ligand_tasks)}")
    print(f"Use mask: {USE_MASK}")
    print(f"Multiprocessing: {USE_MULTIPROCESSING}")
    if USE_MULTIPROCESSING:
        print(f"Number of jobs: {N_JOBS}")
    print(f"{'='*60}\n")
    # Process ligands
    if USE_MULTIPROCESSING and JOBLIB_AVAILABLE and len(ligand_tasks) > 1:
        # Use joblib for parallel processing
        print(f"Processing {len(ligand_tasks)} ligands in parallel using {N_JOBS} cores...")
        results = Parallel(n_jobs=N_JOBS)(
            delayed(process_single_ligand)(ligand_path, ligand_name, receptor)
            for ligand_path, ligand_name in ligand_tasks
        )
    else:
        # Process sequentially
        print(f"Processing {len(ligand_tasks)} ligands sequentially...")
        results = []
        for ligand_path, ligand_name in ligand_tasks:
            print(f"Processing ligand: {ligand_name}")
            result = process_single_ligand(ligand_path, ligand_name, receptor)
            results.append(result)
    # Filter out None results (failed processing)
    results = [r for r in results if r is not None]
    if not results:
        print("No ligands were successfully processed.")
        return
    # Batch all ligands together for model inference
    # This ensures proper normalization (scaler fit on all data, not single rows)
    print(f"\n{'='*60}")
    print(f"BATCH MODEL INFERENCE")
    print(f"{'='*60}")
    # Convert all results to a single DataFrame
    if results:
        # Get all unique keys from all dictionaries
        all_keys = set()
        for result in results:
            if result is not None:
                all_keys.update(result.keys())
        # Ensure all dictionaries have all keys (fill missing with None)
        normalized_results = []
        for result in results:
            if result is not None:
                normalized_result = {key: result.get(key, None) for key in all_keys}
                normalized_results.append(normalized_result)
        # Create DataFrame from normalized dictionaries
        feature_df = pd.DataFrame(normalized_results)
    else:
        feature_df = pd.DataFrame()
    if feature_df.empty:
        print("No features to process for model inference.")
        return
    # Path to your trained model
    model_path = f"{MODELS_DIR}/{MODEL_NAME}.pt"
    mask_path = f"{MODELS_DIR}/{MODEL_NAME}_mask.pkl"
    scaler_path = f"{MODELS_DIR}/{MODEL_NAME}_scaler.pkl"  # Path to saved scaler
    # Load the mask if it exists
    mask = None
    if os.path.isfile(mask_path) and USE_MASK:
        try:
            mask = ocscoreio.load_mask(MODEL_NAME, models_dir=MODELS_DIR)
        except Exception as e:
            print(f"Warning: Could not load mask: {e}")
            mask = None
    # Check if scaler exists (required for proper normalization)
    if NORMALIZE and not os.path.isfile(scaler_path):
        print(f"WARNING: Scaler file not found at {scaler_path}")
        print(" This means normalization will use a NEW scaler fitted on prediction data,")
        print(" which is INCORRECT. The scaler should be saved during training.")
        print(" Predictions may be inaccurate!")
        scaler_path = None  # Will create new scaler (incorrect but won't crash)
    elif NORMALIZE:
        print(f"Using saved scaler from: {scaler_path}")
    # Get OCScore predictions for all ligands at once
    try:
        print(f"Running model inference on {len(feature_df)} ligands...")
        print(f"Feature DataFrame shape: {feature_df.shape}")
        ocscore_predictions = ocscoring.get_score(
            model_path=model_path,
            data=feature_df,
            pca_model=PCA_MODEL_PATH,
            mask=mask,
            score_columns_list=SCORE_COLUMNS_LIST,
            scaler=SCALER,
            scaler_path=scaler_path if NORMALIZE else None,  # Use saved scaler if normalization is enabled
            invert_conditionally=INVERT_CONDITIONALLY,
            normalize=NORMALIZE,
            serialization_method="auto",  # Auto-detect model format
            use_gpu=USE_GPU  # Use GPU if available and USE_GPU=True
        )
        # Diagnostic dump of whatever shape get_score returned.
        if isinstance(ocscore_predictions, pd.DataFrame):
            print(f"Prediction DataFrame shape: {ocscore_predictions.shape}")
            print(f"Prediction DataFrame columns: {list(ocscore_predictions.columns)}")
            if 'predicted_score' in ocscore_predictions.columns:
                print(f"All predicted_score values: {ocscore_predictions['predicted_score'].tolist()}")
                print(f"Unique predicted_score values: {ocscore_predictions['predicted_score'].nunique()}")
        elif isinstance(ocscore_predictions, (pd.Series, np.ndarray)):
            predictions_array = np.asarray(ocscore_predictions)
            print(f"Prediction array shape: {predictions_array.shape}")
            print(f"All prediction values: {predictions_array.tolist()}")
            print(f"Unique prediction values: {len(np.unique(predictions_array))}")
        # Add OCScore predictions to results
        # Mapping is positional: row i of the predictions corresponds to
        # results[i], since feature_df was built from results in order.
        if isinstance(ocscore_predictions, pd.DataFrame):
            if 'predicted_score' in ocscore_predictions.columns:
                # Map predictions back to results by index
                for idx, result in enumerate(results):
                    if result is not None and idx < len(ocscore_predictions):
                        result['OCSCORE'] = ocscore_predictions['predicted_score'].iloc[idx]
                        print(f" Mapped prediction {idx} to ligand {result.get('ligand', 'unknown')}: {result['OCSCORE']}")
            elif len(ocscore_predictions.columns) == 1:
                # Single prediction column
                for idx, result in enumerate(results):
                    if result is not None and idx < len(ocscore_predictions):
                        result['OCSCORE'] = ocscore_predictions.iloc[idx, 0]
            else:
                # Try to find a numeric column
                numeric_cols = ocscore_predictions.select_dtypes(include=[np.number]).columns
                if len(numeric_cols) > 0:
                    for idx, result in enumerate(results):
                        if result is not None and idx < len(ocscore_predictions):
                            result['OCSCORE'] = ocscore_predictions[numeric_cols[0]].iloc[idx]
        elif isinstance(ocscore_predictions, (pd.Series, np.ndarray)):
            # Array/Series of predictions
            predictions_array = np.asarray(ocscore_predictions)
            for idx, result in enumerate(results):
                if result is not None and idx < len(predictions_array):
                    result['OCSCORE'] = float(predictions_array[idx])
        print(f"Model inference completed for {len(results)} ligands.")
    except FileNotFoundError as e:
        print(f"Warning: Model file not found: {e}")
        for result in results:
            if result is not None:
                result['OCSCORE'] = None
    except Exception as e:
        print(f"Error during model inference: {e}")
        import traceback
        traceback.print_exc()
        for result in results:
            if result is not None:
                result['OCSCORE'] = None
    # Convert results to DataFrame
    # Use orient='index' and transpose to preserve order, then convert properly
    # First, ensure all dictionaries have the same keys (fill missing with None)
    if results:
        # Get all unique keys from all dictionaries
        all_keys = set()
        for result in results:
            if result is not None:
                all_keys.update(result.keys())
        # Ensure all dictionaries have all keys (fill missing with None)
        normalized_results = []
        for result in results:
            if result is not None:
                normalized_result = {key: result.get(key, None) for key in all_keys}
                normalized_results.append(normalized_result)
            else:
                # Defensive: results was already filtered for None above,
                # so this branch is not expected to run.
                normalized_results.append({key: None for key in all_keys})
        # Create DataFrame from normalized dictionaries
        results_df = pd.DataFrame(normalized_results)
    else:
        results_df = pd.DataFrame()
    # Reorder columns to match the data source order (from training data file)
    # !!! CRITICAL: This ensures all columns (especially SFs) are in the exact same order
    # as the training data, which is essential for proper mask application and model inference!
    if not results_df.empty:
        # Get the column order from config (no file path needed)
        # Uses reference_column_order from OCDocker.cfg
        # NOTE(review): source_order is currently unused;
        # reorder_columns_to_match_data_order reads the config itself.
        source_order = ocscoredata.get_column_order()  # Uses config by default
        # Use the reorder function to match the config column order
        # This handles OCSCORE insertion and extra columns automatically
        results_df = ocscoredata.reorder_columns_to_match_data_order(
            df=results_df,
            data_source=None,  # Uses config.reference_column_order by default
            keep_extra_columns=True,  # Keep OCSCORE and any other extra columns
            fill_missing_columns=False  # Don't add missing columns as NaN
        )
    # Manually insert OCSCORE right after 'ligand' if it exists
    if 'OCSCORE' in results_df.columns:
        cols = list(results_df.columns)
        if 'ligand' in cols:
            # Remove OCSCORE from current position
            cols.remove('OCSCORE')
            # Insert after 'ligand'
            ligand_idx = cols.index('ligand')
            cols.insert(ligand_idx + 1, 'OCSCORE')
            results_df = results_df[cols]
    # Print summary
    print(f"\n{'='*60}")
    print(f"PROCESSING COMPLETE")
    print(f"{'='*60}")
    print(f"Successfully processed: {len(results)}/{len(ligand_tasks)} ligands")
    if 'OCSCORE' in results_df.columns:
        print(f"OCScore predictions: {results_df['OCSCORE'].notna().sum()}/{len(results_df)}")
        if results_df['OCSCORE'].notna().any():
            print(f"OCScore range: {results_df['OCSCORE'].min():.4f} - {results_df['OCSCORE'].max():.4f}")
    print(f"{'='*60}\n")
    # Save to file if requested
    if SAVE_TO_FILE and OUTPUT_FILE:
        output_path = OUTPUT_FILE
        results_df.to_csv(output_path, index=False)
        print(f"Results saved to: {output_path}")
    return results_df
# Mapping function to convert rescoring keys to database column names
# Exact-match lookup table, hoisted to module scope so it is built once
# instead of on every call.
# !!! CRITICAL: These must match SCORING_FUNCTION_ORDER for mask application !!!
_RESCORING_KEY_MAPPING = {
    # VINA mappings
    'vina_vina_rescoring': 'VINA_VINA',
    'vina_vinardo_rescoring': 'VINA_VINARDO',
    # SMINA mappings - MUST match SCORING_FUNCTION_ORDER
    'smina_vina_rescoring': 'SMINA_VINA',
    'smina_vinardo_rescoring': 'SMINA_VINARDO',
    'smina_dkoes_scoring_rescoring': 'SMINA_SCORING_DKOES',
    'smina_scoring_dkoes_rescoring': 'SMINA_SCORING_DKOES',  # Alternative format
    'smina_old_scoring_dkoes_rescoring': 'SMINA_OLD_SCORING_DKOES',
    'smina_fast_dkoes_rescoring': 'SMINA_FAST_DKOES',
    'smina_ad4_scoring_rescoring': 'SMINA_SCORING_AD4',
    # Handle alternative SMINA key formats (from actual rescoring output)
    'smina_dkoes_fast': 'SMINA_FAST_DKOES',
    'smina_dkoes_scoring_old': 'SMINA_OLD_SCORING_DKOES',
    'smina_dkoes_fast_rescoring': 'SMINA_FAST_DKOES',
    'smina_dkoes_scoring_old_rescoring': 'SMINA_OLD_SCORING_DKOES',
    # PLANTS mappings
    'plants_chemplp': 'PLANTS_CHEMPLP',
    'plants_plp': 'PLANTS_PLP',
    'plants_plp95': 'PLANTS_PLP95',
    # ODDT mappings (these come from the dataframe columns, already prefixed with oddt_)
    'oddt_rfscore_v1': 'ODDT_RFSCORE_V1',
    'oddt_rfscore_v2': 'ODDT_RFSCORE_V2',
    'oddt_rfscore_v3': 'ODDT_RFSCORE_V3',
    'oddt_plecrf_p5_l1_s65536': 'ODDT_PLECRF_P5_L1_S65536',
    'oddt_plec_p5_l1_s65536': 'ODDT_PLECRF_P5_L1_S65536',  # Alternative naming
    'oddt_nnscore': 'ODDT_NNSCORE',
}

# Known SMINA scoring-function spellings -> canonical column suffix.
# Single shared table replacing three previously duplicated copies.
_SMINA_SF_ALIASES = {
    'dkoes_scoring': 'SCORING_DKOES',
    'scoring_dkoes': 'SCORING_DKOES',  # Alternative format
    'old_scoring_dkoes': 'OLD_SCORING_DKOES',
    'dkoes_scoring_old': 'OLD_SCORING_DKOES',  # Alternative format
    'fast_dkoes': 'FAST_DKOES',
    'dkoes_fast': 'FAST_DKOES',  # Alternative format
    'ad4_scoring': 'SCORING_AD4',
    'vina': 'VINA',
    'vinardo': 'VINARDO',
}


def _smina_column(scoring_function: str) -> str:
    '''Build a SMINA DB column name, resolving known scoring-function aliases.'''
    return f"SMINA_{_SMINA_SF_ALIASES.get(scoring_function, scoring_function.upper())}"


def map_rescoring_key_to_db_column(key: str, engine: Optional[str] = None) -> str:
    '''Map rescoring result keys to database column names.

    Parameters
    ----------
    key : str
        The key from rescoring results (e.g., 'vina_vina_rescoring',
        'smina_vinardo_rescoring', 'rescoring_vinardo_2', 'oddt_rfscore_v1').
        Matching is case-insensitive.
    engine : str
        The engine name (e.g., 'vina', 'smina').
        If None, the engine will be inferred from the key where possible.

    Returns
    -------
    str
        The database column name (e.g., 'VINA_VINA', 'SMINA_VINARDO', etc.).
        Keys that match no known format are returned upper-cased.
    '''
    key_lower = key.lower()

    # 1) Exact match against the lookup table
    if key_lower in _RESCORING_KEY_MAPPING:
        return _RESCORING_KEY_MAPPING[key_lower]

    # 2) ODDT keys that are already prefixed with oddt_
    if key_lower.startswith('oddt_'):
        inner_key = key_lower[5:]  # Remove 'oddt_'
        # Handle both 'rfscore_v1' and 'rfscorev1' formats.
        # Bug fix: the 'rfscorev' spelling was documented but never matched
        # because the old guard only tested startswith('rfscore_v').
        if inner_key.startswith(('rfscore_v', 'rfscorev')):
            version = inner_key.replace('rfscore_v', '').replace('rfscorev', '')
            return f'ODDT_RFSCORE_V{version.upper()}'
        elif 'plec' in inner_key:
            # Handle PLEC variations (inner_key is already lower-case)
            if 'p5_l1_s65536' in inner_key or 'p5l1s65536' in inner_key:
                return 'ODDT_PLECRF_P5_L1_S65536'
        elif inner_key == 'nnscore':
            return 'ODDT_NNSCORE'
        # Unrecognized ODDT keys fall through to the generic handling below.

    # 3) New format: rescoring_{scoring_function}_{pose_number} or rescoring_{pose_number}
    if key_lower.startswith('rescoring_'):
        parts = key_lower.split('_')
        if len(parts) >= 2:
            if parts[-1].isdigit():
                # Trailing pose number: the scoring function is in between
                scoring_function_parts = parts[1:-1]
            else:
                # No pose number, just scoring function after 'rescoring'
                scoring_function_parts = parts[1:]
            if scoring_function_parts:
                scoring_function = '_'.join(scoring_function_parts)
                # Use the provided engine if available, otherwise try both
                engines_to_try = [engine] if engine else ['vina', 'smina']
                for eng in engines_to_try:
                    for candidate in (f'{eng}_{scoring_function}_rescoring',
                                      f'{eng}_{scoring_function}'):
                        if candidate in _RESCORING_KEY_MAPPING:
                            return _RESCORING_KEY_MAPPING[candidate]
                # Not in the table: construct from the engine / scoring function
                if engine == 'vina':
                    return f'VINA_{scoring_function.upper()}'
                elif engine == 'smina':
                    return _smina_column(scoring_function)
                elif not engine:
                    # Try to detect the engine from the scoring function name
                    if scoring_function in ('vina', 'vinardo'):
                        return f'VINA_{scoring_function.upper()}'
                    # Assume smina for other scoring functions
                    return _smina_column(scoring_function)
                # An unrecognized engine string falls through to the generic
                # handling below (matches the original behavior).
            else:
                # Just 'rescoring_{pose_number}' - no scoring function specified
                if engine == 'smina':
                    return 'SMINA_VINARDO'  # Default SMINA scoring function
                return 'VINA_VINA'  # Default VINA scoring function

    # 4) Old format: strip a trailing '_rescoring' suffix and retry
    if key_lower.endswith('_rescoring'):
        key_without_suffix = key_lower[:-10]  # Remove '_rescoring'
        if key_without_suffix in _RESCORING_KEY_MAPPING:
            return _RESCORING_KEY_MAPPING[key_without_suffix]
        # Try to construct the mapping
        if key_without_suffix.startswith('vina_'):
            sf = key_without_suffix.replace('vina_', '')
            return f'VINA_{sf.upper()}'
        elif key_without_suffix.startswith('smina_'):
            sf = key_without_suffix.replace('smina_', '')
            return _smina_column(sf)

    # 5) No mapping found: return the upper-cased key unchanged otherwise
    return key.upper()
# Stage names recorded in per-ligand checkpoints. The features stage is the
# final one: _mark_stage_complete flags the checkpoint "completed" when it
# finishes.
CHECKPOINT_STAGE_DOCKING = "docking"
CHECKPOINT_STAGE_RESCORING = "rescoring"
CHECKPOINT_STAGE_FEATURES = "features"
def _checkpoint_path(ligand_path: str) -> str:
    '''Return the per-ligand checkpoint file path inside ligand_path.'''
    return os.path.join(ligand_path, LIGAND_CHECKPOINT_FILE)
def _features_cache_path(ligand_path: str) -> str:
    '''Return the per-ligand feature-cache file path inside ligand_path.'''
    return os.path.join(ligand_path, LIGAND_FEATURES_CACHE_FILE)
def _write_json_atomic(path: str, payload: Dict[str, Any]) -> None:
    '''Write payload as JSON to path atomically (temp file + rename).

    Writing to a sibling .tmp file and then os.replace-ing it means readers
    never observe a partially written checkpoint. Non-JSON values are
    serialized via _json_default.
    '''
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as handle:
        json.dump(payload, handle, indent=2, sort_keys=True, default=_json_default)
    os.replace(tmp_path, path)
def _json_default(value: Any) -> Any:
if isinstance(value, np.generic):
return value.item()
if isinstance(value, (set, tuple)):
return list(value)
return str(value)
def _default_checkpoint(ligand_name: str) -> Dict[str, Any]:
return {
"version": 1,
"ligand": ligand_name,
"status": "new",
"completed_stages": [],
"stage_data": {},
"failed_stage": "",
"last_error": "",
"updated_at": time.time(),
}
def _load_checkpoint(ligand_path: str, ligand_name: str) -> Dict[str, Any]:
    '''Load the per-ligand checkpoint, falling back to a fresh default.

    The default checkpoint is returned when checkpointing is disabled, the
    file is missing or unreadable, or its payload is not a JSON object. Only
    the known fields are copied from the loaded payload (with type coercion),
    so a corrupted file cannot inject arbitrary keys.
    '''
    checkpoint = _default_checkpoint(ligand_name)
    if not ENABLE_LIGAND_CHECKPOINT:
        return checkpoint
    path = _checkpoint_path(ligand_path)
    if not os.path.isfile(path):
        return checkpoint
    try:
        with open(path, "r", encoding="utf-8") as handle:
            loaded = json.load(handle)
    except Exception:
        # Unreadable/corrupt checkpoint: restart rather than crash.
        return checkpoint
    if not isinstance(loaded, dict):
        return checkpoint
    # Copy known fields one by one, coercing to the expected types.
    checkpoint["status"] = str(loaded.get("status", checkpoint["status"]))
    checkpoint["failed_stage"] = str(loaded.get("failed_stage", ""))
    checkpoint["last_error"] = str(loaded.get("last_error", ""))
    checkpoint["updated_at"] = loaded.get("updated_at", checkpoint["updated_at"])
    completed = loaded.get("completed_stages", [])
    if isinstance(completed, list):
        # De-duplicate and keep a stable (sorted) stage order.
        checkpoint["completed_stages"] = sorted({str(stage) for stage in completed})
    stage_data = loaded.get("stage_data", {})
    if isinstance(stage_data, dict):
        checkpoint["stage_data"] = stage_data
    return checkpoint
def _save_checkpoint(ligand_path: str, checkpoint: Dict[str, Any]) -> None:
    '''Persist the checkpoint atomically; no-op when checkpointing is disabled.

    Refreshes the updated_at timestamp (mutating the caller's dict) before
    writing.
    '''
    if not ENABLE_LIGAND_CHECKPOINT:
        return
    checkpoint["updated_at"] = time.time()
    _write_json_atomic(_checkpoint_path(ligand_path), checkpoint)
def _is_stage_complete(checkpoint: Dict[str, Any], stage: str) -> bool:
completed = checkpoint.get("completed_stages", [])
return stage in completed if isinstance(completed, list) else False
def _clear_stage(checkpoint: Dict[str, Any], stage: str) -> None:
completed = checkpoint.get("completed_stages", [])
if isinstance(completed, list):
checkpoint["completed_stages"] = sorted([st for st in completed if st != stage])
stage_data = checkpoint.get("stage_data", {})
if isinstance(stage_data, dict):
stage_data.pop(stage, None)
checkpoint["status"] = "in_progress"
def _mark_stage_complete(checkpoint: Dict[str, Any], stage: str, stage_data: Optional[Dict[str, Any]] = None) -> None:
    '''Record a stage as done and wipe any previous failure markers.

    The checkpoint status becomes "completed" once the final (features) stage
    is reached; otherwise it stays "in_progress". Optional stage_data is
    stored under the stage name. Mutates the checkpoint in place.
    '''
    prior = checkpoint.get("completed_stages", [])
    done = set(prior) if isinstance(prior, list) else set()
    done.add(stage)
    checkpoint["completed_stages"] = sorted(done)
    if stage == CHECKPOINT_STAGE_FEATURES:
        checkpoint["status"] = "completed"
    else:
        checkpoint["status"] = "in_progress"
    checkpoint["failed_stage"] = ""
    checkpoint["last_error"] = ""
    if stage_data:
        checkpoint.setdefault("stage_data", {})[stage] = stage_data
def _mark_stage_failed(checkpoint: Dict[str, Any], stage: str, message: str) -> None:
checkpoint["status"] = "failed"
checkpoint["failed_stage"] = stage
checkpoint["last_error"] = message
def _load_cached_features(ligand_path: str) -> Optional[Dict[str, Any]]:
    '''Load the cached per-ligand feature dict; None when absent or invalid.'''
    path = _features_cache_path(ligand_path)
    if not os.path.isfile(path):
        return None
    try:
        with open(path, "r", encoding="utf-8") as handle:
            payload = json.load(handle)
        if isinstance(payload, dict):
            return payload
    except Exception:
        # Unreadable/corrupt cache: caller will recompute features.
        return None
    # Valid JSON that is not an object -> ignore it.
    return None
def _save_cached_features(ligand_path: str, features: Dict[str, Any]) -> None:
    '''Cache the per-ligand feature dict atomically; no-op when checkpointing
    is disabled.'''
    if not ENABLE_LIGAND_CHECKPOINT:
        return
    _write_json_atomic(_features_cache_path(ligand_path), features)
def _normalize_sf_names(rescoring_result: Dict[str, Any]) -> Dict[str, Any]:
sf_name_corrections = {
"SMINA_DKOES_FAST": "SMINA_FAST_DKOES",
"SMINA_DKOES_SCORING_OLD": "SMINA_OLD_SCORING_DKOES",
"SMINA_SCORING_DKOES_OLD": "SMINA_OLD_SCORING_DKOES",
"SMINA_FAST_DKOES_RESCORING": "SMINA_FAST_DKOES",
"SMINA_OLD_SCORING_DKOES_RESCORING": "SMINA_OLD_SCORING_DKOES",
}
for old_name, correct_name in sf_name_corrections.items():
if old_name in rescoring_result and correct_name not in rescoring_result:
rescoring_result[correct_name] = rescoring_result.pop(old_name)
return rescoring_result
def _value_from_rescore_entry(value: Any) -> Any:
if isinstance(value, list) and len(value) > 0:
return value[0]
return value
def _add_plants_scores_to_rescoring_result(plants_rescoring: Dict[str, Any], rescoring_result: Dict[str, Any]) -> None:
    '''Flatten PLANTS rescoring output into rescoring_result (in place).

    Entries shaped as {scoring_function: {metric: value(s)}} contribute only
    their PLANTS_TOTAL_SCORE metric, stored under the mapped DB column name.
    Non-dict entries are stored as-is (after unwrapping single-value lists).
    '''
    for outer_key, inner_dict in plants_rescoring.items():
        db_column_name = map_rescoring_key_to_db_column(outer_key)
        if isinstance(inner_dict, dict):
            if "PLANTS_TOTAL_SCORE" in inner_dict:
                total_score = inner_dict["PLANTS_TOTAL_SCORE"]
                rescoring_result[db_column_name] = _value_from_rescore_entry(total_score)
            else:
                # Unexpected log shape: warn but keep processing other keys.
                print(
                    f"Warning: PLANTS_TOTAL_SCORE not found in inner dict for {outer_key}. "
                    f"Available keys: {list(inner_dict.keys())}"
                )
        else:
            rescoring_result[db_column_name] = _value_from_rescore_entry(inner_dict)
def _read_rescoring_outputs(
    ligand_path: str,
    ligand_name: str,
    vina_ligand: ocvina.Vina,
    plants_ligand: ocplants.PLANTS,
    smina_ligand: ocsmina.Smina,
) -> Optional[Dict[str, Any]]:
    '''Re-read previously written rescoring outputs from disk (resume path).

    Collects ODDT (CSV), PLANTS, Vina and SMINA rescoring results, maps each
    key to its database column name, and normalizes legacy SMINA names.
    Returns None as soon as any of the four sources is missing, empty, or
    unreadable, so the caller re-runs the rescoring stage instead of using
    partial data.
    '''
    rescoring_result: Dict[str, Any] = {}
    # ODDT scores from the per-ligand CSV.
    oddt_csv = os.path.join(ligand_path, "oddt", f"{ligand_name}.csv")
    if not os.path.isfile(oddt_csv):
        return None
    try:
        oddt_df = pd.read_csv(oddt_csv)
        oddt_dict = ocoddt.df_to_dict(oddt_df)
        if not oddt_dict:
            return None
        # Only the first entry of the parsed dict is used here.
        first_key = list(oddt_dict.keys())[0]
        oddt_scores = oddt_dict[first_key]
        if not isinstance(oddt_scores, dict):
            return None
        for key, value in oddt_scores.items():
            db_column_name = map_rescoring_key_to_db_column(f"oddt_{key}")
            rescoring_result[db_column_name] = value
    except Exception:
        # Unreadable/corrupt CSV -> treat the whole stage as incomplete.
        return None
    # PLANTS rescoring logs.
    try:
        plants_rescoring = plants_ligand.read_rescore_logs(f"{ligand_path}/plantsFiles")
    except Exception:
        return None
    if not plants_rescoring:
        return None
    _add_plants_scores_to_rescoring_result(plants_rescoring, rescoring_result)
    # Vina rescoring logs.
    try:
        vina_rescoring = vina_ligand.read_rescore_logs(f"{ligand_path}/vinaFiles/rescoring")
    except Exception:
        return None
    if not vina_rescoring:
        return None
    for key, value in vina_rescoring.items():
        db_column_name = map_rescoring_key_to_db_column(key, engine="vina")
        rescoring_result[db_column_name] = _value_from_rescore_entry(value)
    # SMINA rescoring logs.
    try:
        smina_rescoring = smina_ligand.read_rescore_logs(f"{ligand_path}/sminaFiles/rescoring")
    except Exception:
        return None
    if not smina_rescoring:
        return None
    for key, value in smina_rescoring.items():
        db_column_name = map_rescoring_key_to_db_column(key, engine="smina")
        rescoring_result[db_column_name] = _value_from_rescore_entry(value)
    return _normalize_sf_names(rescoring_result)
def _run_docking_stage(ligand_path: str, vina_ligand: ocvina.Vina, plants_ligand: ocplants.PLANTS) -> Tuple[list, list]:
    '''Run Vina and PLANTS docking and wait for their pose files to appear.

    Both engines prepare the receptor and ligand, dock, and then this function
    polls the output directories until the pose files exist and are stable
    (size no longer changing), to avoid reading half-written files.

    Parameters
    ----------
    ligand_path : str
        Base path to the ligand directory.
    vina_ligand : ocvina.Vina
        Configured Vina engine wrapper.
    plants_ligand : ocplants.PLANTS
        Configured PLANTS engine wrapper.

    Returns
    -------
    Tuple[list, list]
        (vina_poses, plants_poses) file path lists.

    Raises
    ------
    ValueError
        If either engine produced no pose files.
    '''
    # Vina
    vina_ligand.run_prepare_receptor(overwrite=True)
    vina_ligand.run_prepare_ligand(overwrite=True)
    vina_ligand.run_docking(overwrite=True)
    vina_ligand.split_poses()
    vina_poses_dir = os.path.dirname(vina_ligand.output_vina) if hasattr(vina_ligand, "output_vina") else f"{ligand_path}/vinaFiles"
    vina_pattern = f"{vina_poses_dir}/*_split_*.pdbqt"
    pose_files_found = False
    # Poll up to 50 times (~10s plus stability checks) for split pose files.
    for _ in range(50):
        found_files = glob(vina_pattern)
        if found_files and wait_for_files_ready(found_files, max_wait=2.0):
            pose_files_found = True
            break
        time.sleep(0.2)
    if not pose_files_found:
        print("Warning: No stable pose files found for Vina after waiting, proceeding anyway...")
        time.sleep(0.5)
    vina_poses = vina_ligand.get_docked_poses()
    if vina_poses and not wait_for_files_ready(vina_poses, max_wait=5.0):
        print("Warning: Some Vina pose files may not be fully ready, but proceeding...")
    # PLANTS
    plants_ligand.run_prepare_receptor(overwrite=True)
    plants_ligand.run_prepare_ligand(overwrite=True)
    plants_ligand.run_docking(overwrite=True)
    plants_output_dir = plants_ligand.output_plants if hasattr(plants_ligand, "output_plants") else f"{ligand_path}/plantsFiles"
    plants_run_dir = os.path.join(plants_output_dir, "run")
    plants_pattern = f"{plants_run_dir}/*.mol2"
    plants_files_found = False
    # PLANTS gets a longer poll window (100 iterations) than Vina.
    for _ in range(100):
        # Exclude the protein and fixed-protein MOL2 files; only ligand poses count.
        found_files = [
            f for f in glob(plants_pattern)
            if not f.endswith("_protein.mol2") and not f.endswith("_fixed.mol2")
        ]
        if found_files and wait_for_files_ready(found_files, max_wait=2.0):
            plants_files_found = True
            break
        time.sleep(0.2)
    if not plants_files_found:
        print("Warning: No stable PLANTS output files found after waiting, proceeding anyway...")
        time.sleep(0.5)
    # NOTE(review): extra unconditional settle sleep before collecting poses —
    # possibly a duplicate of the one above; confirm intent before removing.
    time.sleep(0.5)
    plants_poses = plants_ligand.get_docked_poses()
    if plants_poses and not wait_for_files_ready(plants_poses, max_wait=3.0):
        print("Warning: Some PLANTS pose files may not be fully ready, but proceeding...")
    if not vina_poses or not plants_poses:
        raise ValueError("Docking did not generate valid pose files for both Vina and PLANTS.")
    return vina_poses, plants_poses
def _load_existing_docking_poses(vina_ligand: ocvina.Vina, plants_ligand: ocplants.PLANTS) -> Tuple[list, list]:
    '''Fetch previously generated docked poses for both engines.

    Each non-empty pose set gets a short best-effort window to stabilize on
    disk before being returned; the readiness result is intentionally ignored.

    Parameters
    ----------
    vina_ligand : ocvina.Vina
        Vina engine wrapper to query for existing poses.
    plants_ligand : ocplants.PLANTS
        PLANTS engine wrapper to query for existing poses.

    Returns
    -------
    Tuple[list, list]
        (vina_poses, plants_poses) file path lists; either may be empty.
    '''
    collected = []
    for engine in (vina_ligand, plants_ligand):
        poses = engine.get_docked_poses()
        if poses:
            # Best-effort settle; we proceed regardless of the outcome.
            wait_for_files_ready(poses, max_wait=2.0)
        collected.append(poses)
    return collected[0], collected[1]
def _select_representative_medoid(
    ligand_path: str,
    ligand_name: str,
    vina_poses: list,
    plants_poses: list,
    vina_ligand: ocvina.Vina,
    plants_ligand: ocplants.PLANTS,
) -> str:
    '''Cluster all docked poses by RMSD and return one representative medoid.

    Validates every pose file (with retries), converts non-MOL2 poses to MOL2
    for RMSD computation, clusters the RMSD matrix, and picks the medoid of
    the biggest cluster, mapping it back to its original pose file.

    Parameters
    ----------
    ligand_path : str
        Base path to the ligand directory.
    ligand_name : str
        Name of the ligand (used in error messages).
    vina_poses : list
        Pose file paths produced by Vina.
    plants_poses : list
        Pose file paths produced by PLANTS.
    vina_ligand : ocvina.Vina
        Vina wrapper, used to read per-pose log entries.
    plants_ligand : ocplants.PLANTS
        PLANTS wrapper, used to read per-pose log entries.

    Returns
    -------
    str
        Path to the selected medoid pose file (original format, not the
        MOL2 conversion).

    Raises
    ------
    ValueError
        If fewer than 2 valid poses remain, or no medoid can be resolved.
    '''
    poses_list = vina_poses + plants_poses
    valid_poses = []
    max_retries = 5
    retry_delay = 0.3
    # Validate each pose with retries and growing backoff, since files may
    # still be settling on disk.
    for pose_file in poses_list:
        validated = False
        for attempt in range(max_retries):
            if validate_molecule_file(pose_file):
                valid_poses.append(pose_file)
                validated = True
                break
            if attempt < max_retries - 1:
                time.sleep(retry_delay * (attempt + 1))
        if not validated:
            print(f"Warning: Could not validate pose file {pose_file} after {max_retries} attempts, skipping.")
    if not valid_poses:
        raise ValueError(f"No valid pose files found for ligand {ligand_name} after validation")
    if len(valid_poses) < 2:
        raise ValueError(f"Need at least 2 valid poses for RMSD calculation, found {len(valid_poses)} for ligand {ligand_name}")
    # RMSD computation needs MOL2 input: convert what isn't already MOL2 and
    # remember the mapping back to the original file.
    mol2_poses_dir = f"{ligand_path}/poses_mol2"
    os.makedirs(mol2_poses_dir, exist_ok=True)
    mol2_poses = []
    mol2_to_original_map = {}
    for pose_file in valid_poses:
        pose_ext = os.path.splitext(pose_file)[1].lower()
        pose_basename = os.path.basename(pose_file)
        if pose_ext == ".mol2":
            mol2_poses.append(pose_file)
            mol2_to_original_map[pose_file] = pose_file
        else:
            mol2_path = os.path.join(mol2_poses_dir, f"{os.path.splitext(pose_basename)[0]}.mol2")
            occonversion.convert_mols(pose_file, mol2_path, overwrite=True)
            if wait_for_file_stable(mol2_path, max_wait=3.0):
                mol2_poses.append(mol2_path)
                mol2_to_original_map[mol2_path] = pose_file
            else:
                print(f"Warning: Could not convert {pose_file} to MOL2 format, skipping for RMSD calculation")
    if len(mol2_poses) < 2:
        raise ValueError(f"Need at least 2 valid MOL2 poses for RMSD calculation, found {len(mol2_poses)} for ligand {ligand_name}")
    if not wait_for_files_ready(mol2_poses, max_wait=5.0):
        print("Warning: Some MOL2 pose files may not be fully ready for RMSD calculation, but proceeding...")
        time.sleep(0.5)
    rmsd_matrix = ocmolproc.get_rmsd_matrix(mol2_poses)
    # Track which engine produced each MOL2 pose (used by the cluster plot).
    pose_engine_map = {}
    for mol2_pose in mol2_poses:
        original_pose = mol2_to_original_map.get(mol2_pose, mol2_pose)
        if original_pose in vina_poses:
            pose_engine_map[mol2_pose] = "vina"
        elif original_pose in plants_poses:
            pose_engine_map[mol2_pose] = "plants"
    clusters = ocrmsdclust.cluster_rmsd(
        rmsd_matrix,
        algorithm="agglomerativeClustering",
        outputPlot=f"{ligand_path}/medoids.png",
        pose_engine_map=pose_engine_map,
    )
    # Only the biggest cluster's medoid is requested; map back to originals.
    medoids_mol2 = ocrmsdclust.get_medoids(rmsd_matrix, clusters, onlyBiggest=True)
    medoids = [mol2_to_original_map.get(medoid_mol2, medoid_mol2) for medoid_mol2 in medoids_mol2]
    # Attach the engine log entry for each medoid; a medoid with no matching
    # engine is silently dropped.
    medoids_dict = {}
    for medoid in medoids:
        if medoid in vina_poses:
            medoids_dict[medoid] = vina_ligand.read_log(onlyBest=False)[ocvina.get_pose_index_from_file_path(medoid)]
        elif medoid in plants_poses:
            medoids_dict[medoid] = plants_ligand.read_log(onlyBest=False)[ocplants.get_pose_index_from_file_path(medoid)]
    if not medoids_dict:
        raise ValueError(f"Could not determine medoid for ligand {ligand_name}")
    return list(medoids_dict.keys())[0]
def _run_rescoring_stage(
    ligand_path: str,
    ligand_name: str,
    ligand: ocl.Ligand,
    medoid: str,
    vina_ligand: ocvina.Vina,
    plants_ligand: ocplants.PLANTS,
    smina_ligand: ocsmina.Smina,
) -> Dict[str, Any]:
    '''Rescore the selected medoid pose with ODDT, PLANTS, Vina, and SMINA.

    Converts the medoid into the input format each engine needs (MOL2 for
    PLANTS, PDBQT for Vina/SMINA), runs each engine's rescoring, then reads
    all outputs back into a single dictionary.

    Parameters
    ----------
    ligand_path : str
        Base path to the ligand directory.
    ligand_name : str
        Name of the ligand (used in error messages).
    ligand : ocl.Ligand
        Ligand object whose name labels the ODDT output.
    medoid : str
        Path to the representative medoid pose file.
    vina_ligand : ocvina.Vina
        Vina wrapper used for ODDT's receptor and Vina rescoring.
    plants_ligand : ocplants.PLANTS
        PLANTS wrapper used for PLANTS rescoring.
    smina_ligand : ocsmina.Smina
        SMINA wrapper used for SMINA rescoring.

    Returns
    -------
    Dict[str, Any]
        Rescoring values keyed by database column name.

    Raises
    ------
    ValueError
        If the combined rescoring outputs cannot be read back.
    '''
    oddt_outdir = os.path.join(ligand_path, "oddt")
    # Prefer joblib's threading backend for ODDT when joblib is available;
    # otherwise fall back to a plain call with identical arguments.
    try:
        from joblib import parallel_backend
        with parallel_backend("threading"):
            _ = ocoddt.run_oddt(
                vina_ligand.prepared_receptor,
                medoid,
                ligand.name,
                oddt_outdir,
                overwrite=True,
            )
    except ImportError:
        _ = ocoddt.run_oddt(
            vina_ligand.prepared_receptor,
            medoid,
            ligand.name,
            oddt_outdir,
            overwrite=True,
        )
    # PLANTS needs a MOL2 input; convert the medoid if necessary.
    medoid_extension = os.path.splitext(medoid)[1].lower()
    if medoid_extension != ".mol2":
        plants_input = medoid.replace(medoid_extension, ".mol2")
        occonversion.convert_mols(medoid, plants_input, overwrite=True)
    else:
        plants_input = medoid
    plants_pose_list = f"{ligand_path}/plantsFiles/plants_pose_list.txt"
    ocplants.write_pose_list(plants_input, plants_pose_list)
    plants_ligand.run_rescore(plants_pose_list, logFile="", overwrite=True)
    # Brief pause before converting for Vina/SMINA — presumably lets PLANTS
    # finish writing; TODO confirm whether this is still needed.
    time.sleep(0.3)
    # Vina and SMINA share a PDBQT input; convert the medoid if necessary.
    if medoid_extension != ".pdbqt":
        vina_smina_input = medoid.replace(medoid_extension, ".pdbqt")
        occonversion.convert_mols(medoid, vina_smina_input, overwrite=True)
    else:
        vina_smina_input = medoid
    vina_ligand.run_rescore(
        f"{ligand_path}/vinaFiles/rescoring",
        vina_smina_input,
        overwrite=True,
        splitLigand=False,
    )
    smina_ligand.run_rescore(
        f"{ligand_path}/sminaFiles/rescoring",
        vina_smina_input,
        overwrite=True,
        splitLigand=False,
    )
    rescoring_result = _read_rescoring_outputs(ligand_path, ligand_name, vina_ligand, plants_ligand, smina_ligand)
    if rescoring_result is None:
        raise ValueError(f"Rescoring outputs are incomplete or invalid for ligand {ligand_name}")
    return rescoring_result
def _infer_legacy_stages(
    ligand_path: str,
    ligand_name: str,
    checkpoint: Dict[str, Any],
    vina_ligand: ocvina.Vina,
    plants_ligand: ocplants.PLANTS,
    smina_ligand: ocsmina.Smina,
) -> None:
    '''Backfill checkpoint stages from outputs produced before checkpointing existed.

    Inspects on-disk artifacts (cached features, rescoring outputs, docked
    poses) and marks the corresponding stages complete in `checkpoint`,
    mutating it in place. Does not save the checkpoint; callers do that.

    Parameters
    ----------
    ligand_path : str
        Base path to the ligand directory.
    ligand_name : str
        Name of the ligand.
    checkpoint : Dict[str, Any]
        Checkpoint state to update in place.
    vina_ligand : ocvina.Vina
        Vina wrapper used to probe existing outputs.
    plants_ligand : ocplants.PLANTS
        PLANTS wrapper used to probe existing outputs.
    smina_ligand : ocsmina.Smina
        SMINA wrapper used to probe existing outputs.
    '''
    # Nothing to infer if the two latest stages are already recorded.
    if _is_stage_complete(checkpoint, CHECKPOINT_STAGE_FEATURES) and _is_stage_complete(checkpoint, CHECKPOINT_STAGE_RESCORING):
        return
    cached_features = _load_cached_features(ligand_path)
    if isinstance(cached_features, dict) and not _is_stage_complete(checkpoint, CHECKPOINT_STAGE_FEATURES):
        _mark_stage_complete(checkpoint, CHECKPOINT_STAGE_FEATURES)
    # Valid rescoring outputs imply docking finished too.
    rescoring_result = _read_rescoring_outputs(ligand_path, ligand_name, vina_ligand, plants_ligand, smina_ligand)
    if rescoring_result is not None:
        if not _is_stage_complete(checkpoint, CHECKPOINT_STAGE_RESCORING):
            _mark_stage_complete(checkpoint, CHECKPOINT_STAGE_RESCORING)
        if not _is_stage_complete(checkpoint, CHECKPOINT_STAGE_DOCKING):
            _mark_stage_complete(checkpoint, CHECKPOINT_STAGE_DOCKING)
        return
    # No rescoring outputs: docking alone may still be complete.
    vina_poses, plants_poses = _load_existing_docking_poses(vina_ligand, plants_ligand)
    if vina_poses and plants_poses and not _is_stage_complete(checkpoint, CHECKPOINT_STAGE_DOCKING):
        _mark_stage_complete(checkpoint, CHECKPOINT_STAGE_DOCKING)
def process_single_ligand(ligand_path: str, ligand_name: str, receptor: ocr.Receptor) -> Optional[dict]:
    ''' Process a single ligand through the complete OCScore pipeline with checkpoint/resume support.

    Pipeline: build engine wrappers -> (optionally) resume from checkpoint ->
    docking -> medoid selection -> rescoring -> feature assembly. Each completed
    stage is recorded in a per-ligand checkpoint so a crashed run can resume.

    Parameters
    ----------
    ligand_path : str
        Base path to the ligand directory
    ligand_name : str
        Name of the ligand (without extension)
    receptor : ocr.Receptor
        Receptor object

    Returns
    -------
    dict | None
        Dictionary containing all features and OCScore prediction. None if processing fails.
    '''
    checkpoint: Optional[Dict[str, Any]] = None
    # Tracked so the except-handler can record which stage failed.
    current_stage = "initialization"
    try:
        ligand = ocl.Ligand(f"{ligand_path}/{ligand_name}.smi", name=ligand_name)
        ligand.create_box(centroid=BOX_CENTER, save_path=f"{ligand_path}/boxes/")
        vina_ligand = ocvina.Vina(
            f"{ligand_path}/vinaFiles/conf_vina.txt",
            f"{ligand_path}/boxes/box0.pdb",
            receptor, PREPARED_RECEPTOR_PDBQT,
            ligand, f"{ligand_path}/prepared_ligand.pdbqt",
            f"{ligand_path}/vinaFiles/vina.log", f"{ligand_path}/vinaFiles/vina.pdbqt",
            name=f"Vina {receptor.name}-{ligand_name}",
        )
        plants_ligand = ocplants.PLANTS(
            f"{ligand_path}/plantsFiles/conf_plants.txt",
            f"{ligand_path}/boxes/box0.pdb",
            receptor, PREPARED_RECEPTOR_MOL2,
            ligand, f"{ligand_path}/prepared_ligand.mol2",
            f"{ligand_path}/plantsFiles/plants.log", f"{ligand_path}/plantsFiles",
            name=f"Plants {receptor.name}-{ligand_name}",
        )
        smina_ligand = ocsmina.Smina(
            f"{ligand_path}/sminaFiles/conf_smina.txt",
            f"{ligand_path}/boxes/box0.pdb",
            receptor, PREPARED_RECEPTOR_PDBQT,
            ligand, f"{ligand_path}/prepared_ligand.pdbqt",
            f"{ligand_path}/sminaFiles/smina.log", f"{ligand_path}/sminaFiles/smina.pdbqt",
            name=f"Smina {receptor.name}-{ligand_name}",
        )
        current_stage = "checkpoint"
        checkpoint = _load_checkpoint(ligand_path, ligand_name)
        resume_enabled = ENABLE_LIGAND_CHECKPOINT
        if resume_enabled:
            # Recognize outputs from pre-checkpoint runs before deciding what to skip.
            _infer_legacy_stages(ligand_path, ligand_name, checkpoint, vina_ligand, plants_ligand, smina_ligand)
            _save_checkpoint(ligand_path, checkpoint)
        # Fast path: features already computed and cached.
        if resume_enabled and _is_stage_complete(checkpoint, CHECKPOINT_STAGE_FEATURES):
            cached_features = _load_cached_features(ligand_path)
            if isinstance(cached_features, dict):
                print(f"[{ligand_name}] Resume: using cached features.")
                return cached_features
            # Cache missing/corrupt: invalidate the stage and recompute.
            _clear_stage(checkpoint, CHECKPOINT_STAGE_FEATURES)
            _save_checkpoint(ligand_path, checkpoint)
        rescoring_result: Optional[Dict[str, Any]] = None
        # Try to reuse existing rescoring outputs.
        if resume_enabled and _is_stage_complete(checkpoint, CHECKPOINT_STAGE_RESCORING):
            rescoring_result = _read_rescoring_outputs(ligand_path, ligand_name, vina_ligand, plants_ligand, smina_ligand)
            if rescoring_result is None:
                print(f"[{ligand_name}] Checkpoint says rescoring complete, but outputs are missing/corrupted. Recomputing.")
                _clear_stage(checkpoint, CHECKPOINT_STAGE_RESCORING)
                _clear_stage(checkpoint, CHECKPOINT_STAGE_FEATURES)
                _save_checkpoint(ligand_path, checkpoint)
            else:
                print(f"[{ligand_name}] Resume: using existing rescoring outputs.")
        if rescoring_result is None:
            vina_poses: list = []
            plants_poses: list = []
            # Try to reuse existing docking poses.
            if resume_enabled and _is_stage_complete(checkpoint, CHECKPOINT_STAGE_DOCKING):
                vina_poses, plants_poses = _load_existing_docking_poses(vina_ligand, plants_ligand)
                if not vina_poses or not plants_poses:
                    print(f"[{ligand_name}] Checkpoint says docking complete, but poses are missing. Re-running docking.")
                    _clear_stage(checkpoint, CHECKPOINT_STAGE_DOCKING)
                    _save_checkpoint(ligand_path, checkpoint)
                else:
                    print(f"[{ligand_name}] Resume: using existing docking outputs.")
            if not vina_poses or not plants_poses:
                current_stage = CHECKPOINT_STAGE_DOCKING
                vina_poses, plants_poses = _run_docking_stage(ligand_path, vina_ligand, plants_ligand)
                _mark_stage_complete(checkpoint, CHECKPOINT_STAGE_DOCKING)
                _save_checkpoint(ligand_path, checkpoint)
            current_stage = CHECKPOINT_STAGE_RESCORING
            medoid = _select_representative_medoid(
                ligand_path,
                ligand_name,
                vina_poses,
                plants_poses,
                vina_ligand,
                plants_ligand,
            )
            rescoring_result = _run_rescoring_stage(
                ligand_path,
                ligand_name,
                ligand,
                medoid,
                vina_ligand,
                plants_ligand,
                smina_ligand,
            )
            _mark_stage_complete(checkpoint, CHECKPOINT_STAGE_RESCORING, stage_data={"medoid": medoid})
            _save_checkpoint(ligand_path, checkpoint)
        current_stage = CHECKPOINT_STAGE_FEATURES
        # Assemble the final feature dict: scores + receptor + ligand descriptors.
        receptor_descriptors = receptor.get_descriptors()
        ligand_descriptors = ligand.get_descriptors()
        all_features: Dict[str, Any] = {}
        all_features.update(rescoring_result if rescoring_result is not None else {})
        all_features.update(receptor_descriptors)
        all_features.update(ligand_descriptors)
        all_features["name"] = f"{receptor.name}_{ligand.name}"
        all_features["receptor"] = receptor.name
        all_features["ligand"] = ligand.name
        _save_cached_features(ligand_path, all_features)
        _mark_stage_complete(checkpoint, CHECKPOINT_STAGE_FEATURES, stage_data={"feature_count": len(all_features)})
        _save_checkpoint(ligand_path, checkpoint)
        return all_features
    except Exception as e:
        # Best-effort: record the failing stage in the checkpoint, then log.
        if checkpoint is not None:
            try:
                _mark_stage_failed(checkpoint, current_stage, str(e))
                _save_checkpoint(ligand_path, checkpoint)
            except Exception:
                pass
        print(f"Error processing ligand {ligand_name}: {e}")
        import traceback
        traceback.print_exc()
        return None
def validate_molecule_file(file_path: str) -> bool:
    '''Validate that a molecule file can be loaded and is complete.

    Parameters
    ----------
    file_path : str
        Path to the molecule file

    Returns
    -------
    bool
        True if file is valid and can be loaded
    '''
    from OCDocker.Toolbox import Validation as ocvalidation

    # A file still being written to disk cannot be trusted yet.
    if not wait_for_file_stable(file_path, max_wait=2.0):
        return False
    # Any parsing error counts as invalid rather than propagating upward.
    try:
        is_valid = ocvalidation.is_molecule_valid(file_path)
    except Exception:
        is_valid = False
    return is_valid
def wait_for_file_stable(file_path: str, max_wait: float = 5.0, check_interval: float = 0.1) -> bool:
    '''Wait for a file to stabilize (size stops changing).

    The file is considered stable once its size is observed unchanged for
    three consecutive checks.

    Parameters
    ----------
    file_path : str
        Path to the file to check
    max_wait : float
        Maximum time to wait in seconds
    check_interval : float
        Time between checks in seconds

    Returns
    -------
    bool
        True if file stabilized, False if timeout
    '''
    if not os.path.isfile(file_path):
        return False
    deadline = time.time() + max_wait
    previous_size = -1
    consecutive_stable = 0
    needed_checks = 3  # same size must be seen this many times in a row
    while time.time() < deadline:
        try:
            size_now = os.path.getsize(file_path)
        except (OSError, IOError):
            # File might be locked or deleted; retry after a pause.
            time.sleep(check_interval)
            continue
        if size_now == previous_size:
            consecutive_stable += 1
            if consecutive_stable >= needed_checks:
                return True
        else:
            consecutive_stable = 0
            previous_size = size_now
        time.sleep(check_interval)
    return False
def wait_for_files_ready(file_paths: list, max_wait: float = 8.0, check_interval: float = 0.2) -> bool:
    '''Wait for all files in a list to exist, be stable, and be readable.

    Parameters
    ----------
    file_paths : list
        List of file paths to wait for
    max_wait : float
        Maximum time to wait in seconds
    check_interval : float
        Time between checks in seconds

    Returns
    -------
    bool
        True if all files are ready, False if timeout
    '''
    # An empty list is trivially ready.
    if not file_paths:
        return True
    deadline = time.time() + max_wait
    confirmed = set()
    while time.time() < deadline:
        # Each pass gives every unconfirmed file a short stability window;
        # confirmed files are never re-checked.
        for path in file_paths:
            if path not in confirmed and wait_for_file_stable(
                path, max_wait=check_interval * 2, check_interval=check_interval / 2
            ):
                confirmed.add(path)
        if len(confirmed) == len(file_paths):
            return True
        time.sleep(check_interval)
    return len(confirmed) == len(file_paths)
# Script entry point: run the full pipeline; the return value is kept so it
# can be inspected interactively (e.g. when run with `python -i`).
if __name__ == "__main__":
    results = main()
This script demonstrates:
Receptor and ligand preparation
Multi-engine docking (Vina, PLANTS)
Pose clustering to find representative poses
Rescoring with multiple scoring functions (ODDT, PLANTS, Vina, SMINA)
Feature extraction (receptor and ligand descriptors)
Model inference using trained OCScore model
Automatic mapping of rescoring results to database column names
Multiprocessing support for processing multiple ligands
Inference from CSV¶
Example of OCScore inference loading features directly from a CSV file:
#!/usr/bin/env python3
"""
Example: OCScore Inference from CSV
This example demonstrates how to run OCScore model inference using feature data
loaded from a CSV file. The script:
1. Loads OCDocker configuration to enforce `reference_column_order`
2. Loads input CSV data and keeps the original table for output
3. Loads model artifacts (model, optional mask, optional scaler)
4. Runs `ocscoring.get_score(...)` for inference
5. Exports an output CSV preserving original rows/columns plus `OCSCORE`
Usage:
# Recommended: pass config path explicitly
python examples/15_python_api_inference_from_csv.py \
--csv-path /path/to/features.csv \
--model-name OCScore \
--config-path /path/to/OCDocker.cfg \
--output-csv /path/to/scored.csv
# Alternative: use OCDOCKER_CONFIG environment variable
export OCDOCKER_CONFIG=/path/to/OCDocker.cfg
python examples/15_python_api_inference_from_csv.py \
--csv-path /path/to/features.csv \
--model-name OCScore
"""
import argparse
import os
import sys
import numpy as np
import pandas as pd
# Allow running the example directly from source checkout without installation.
_script_dir = os.path.dirname(os.path.abspath(__file__))
_parent_dir = os.path.dirname(_script_dir)
if _parent_dir not in sys.path:
sys.path.insert(0, _parent_dir)
import OCDocker.Config as occonfig
import OCDocker.OCScore.Scoring as ocscoring
import OCDocker.OCScore.Utils.IO as ocscoreio
def _resolve_model_path(model_name: str, models_dir: str, model_path: str | None) -> str:
"""Resolve the model path from an explicit path or common model-name patterns."""
if model_path:
if not os.path.isfile(model_path):
raise FileNotFoundError(f"Model file not found: {model_path}")
return model_path
candidates = [
os.path.join(models_dir, f"{model_name}.pt"),
os.path.join(models_dir, f"{model_name}.pth"),
os.path.join(models_dir, f"{model_name}.pkl"),
]
for candidate in candidates:
if os.path.isfile(candidate):
return candidate
raise FileNotFoundError(
f"Could not find model for '{model_name}' in {models_dir}. "
"Tried .pt, .pth, and .pkl."
)
def _resolve_config_path(config_path: str | None) -> str | None:
"""Resolve config path from argument/env/common local locations."""
if config_path:
candidate = os.path.abspath(config_path)
if not os.path.isfile(candidate):
raise FileNotFoundError(f"Config file not found: {candidate}")
return candidate
env_cfg = os.getenv("OCDOCKER_CONFIG", "").strip()
if env_cfg:
env_candidate = os.path.abspath(env_cfg)
if os.path.isfile(env_candidate):
return env_candidate
repo_cfg = os.path.join(_parent_dir, "OCDocker.cfg")
if os.path.isfile(repo_cfg):
return os.path.abspath(repo_cfg)
cwd_cfg = os.path.abspath("OCDocker.cfg")
if os.path.isfile(cwd_cfg):
return cwd_cfg
return None
def _ensure_reference_order_config(config_path: str | None) -> str | None:
    """Load config into OCDocker singleton so get_score can enforce column order.

    Returns the resolved config path, or None when no config could be found
    (in which case the runtime singleton is left untouched).
    """
    resolved_path = _resolve_config_path(config_path)
    if resolved_path is None:
        return None
    # Export so downstream code re-reading the environment sees the same file.
    os.environ["OCDOCKER_CONFIG"] = resolved_path
    cfg = occonfig.OCDockerConfig.from_config_file(resolved_path)
    occonfig.set_config(cfg)
    return resolved_path
def main() -> None:
    """Run OCScore inference over a CSV of features and export a scored CSV.

    Steps: parse CLI arguments, load the OCDocker config (needed to enforce
    reference_column_order), read the input CSV twice (raw copy for output,
    preprocessed copy for inference), resolve the model/mask/scaler artifacts,
    run ``ocscoring.get_score``, and write the original table back out with an
    added score column (rows dropped by preprocessing keep NaN).

    Raises
    ------
    FileNotFoundError
        If the input CSV or the model file cannot be found.
    ValueError
        If no valid rows remain after preprocessing, the config lacks
        reference_column_order while enforcement is on, or the prediction
        output is missing the 'predicted_score' column.
    """
    parser = argparse.ArgumentParser(description="Run OCScore inference from a CSV file.")
    parser.add_argument(
        "--csv-path",
        required=True,
        help="Path to input CSV file containing features for inference.",
    )
    parser.add_argument(
        "--model-name",
        default="OCScore",
        help="Model name used to locate model/mask/scaler files (default: OCScore).",
    )
    parser.add_argument(
        "--models-dir",
        default=None,
        help="Directory containing model artifacts. Defaults to OCScore_models.",
    )
    parser.add_argument(
        "--model-path",
        default=None,
        help="Explicit model path. If omitted, inferred from --model-name in --models-dir.",
    )
    parser.add_argument(
        "--config-path",
        default=None,
        help="Path to OCDocker config file (.cfg/.yml). Needed to enforce reference_column_order.",
    )
    parser.add_argument(
        "--scaler-path",
        default=None,
        help="Optional scaler path. If omitted, tries <model_name>_scaler.pkl in models dir.",
    )
    parser.add_argument(
        "--no-mask",
        action="store_true",
        help="Disable mask loading and run inference without a mask.",
    )
    parser.add_argument(
        "--use-gpu",
        action="store_true",
        help="Use GPU when available (PyTorch models).",
    )
    parser.add_argument(
        "--output-csv",
        default="predictions_from_csv.csv",
        help="Output CSV file for predictions (default: predictions_from_csv.csv).",
    )
    parser.add_argument(
        "--score-column",
        default="OCSCORE",
        help="Column name to store predictions in the output CSV (default: OCSCORE).",
    )
    parser.add_argument(
        "--disable-reference-order",
        action="store_true",
        help="Disable enforcement of reference_column_order (not recommended).",
    )
    # BUGFIX: help text previously claimed "(default: enabled)" although the
    # flag is a store_true defaulting to False; the text now matches behavior.
    parser.add_argument(
        "--invert-conditionally",
        dest="invert_conditionally",
        action="store_true",
        default=False,
        help="Invert VINA/SMINA/PLANTS-like columns before inference (default: disabled).",
    )
    args = parser.parse_args()
    if not os.path.isfile(args.csv_path):
        raise FileNotFoundError(f"Input CSV file not found: {args.csv_path}")
    enforce_reference_order = not args.disable_reference_order
    loaded_config_path = _ensure_reference_order_config(args.config_path)
    if enforce_reference_order:
        cfg = occonfig.get_config()
        if not cfg.paths.reference_column_order:
            raise ValueError(
                "reference_column_order is not set in the active config. "
                "Pass --config-path /path/to/OCDocker.cfg (or set OCDOCKER_CONFIG)."
            )
    print(f"Using OCDocker config: {loaded_config_path if loaded_config_path else 'active runtime config'}")
    # Keep a full copy for output (same input rows/columns + OCScore column).
    input_df = pd.read_csv(args.csv_path)
    # Use OCScore loader for inference input (drops rows with invalid NaNs).
    scoring_df = ocscoreio.load_data(args.csv_path)
    if scoring_df.empty:
        raise ValueError("No valid rows found for inference after CSV preprocessing.")
    models_dir = os.path.abspath(args.models_dir) if args.models_dir else ocscoreio.get_models_dir()
    model_path = _resolve_model_path(args.model_name, models_dir, args.model_path)
    # Mask is optional: a missing mask file downgrades to unmasked inference.
    mask = None
    if not args.no_mask:
        try:
            mask = ocscoreio.load_mask(args.model_name, models_dir=models_dir)
            print(f"Loaded mask for model '{args.model_name}'.")
        except FileNotFoundError:
            print(f"Mask file not found for model '{args.model_name}'. Continuing without mask.")
    # Scaler is optional too: fall back to the conventional filename.
    scaler_path = args.scaler_path
    if scaler_path is None:
        candidate_scaler = os.path.join(models_dir, f"{args.model_name}_scaler.pkl")
        if os.path.isfile(candidate_scaler):
            scaler_path = candidate_scaler
    if scaler_path:
        print(f"Using scaler: {scaler_path}")
    else:
        print("No scaler provided/found. Inference will fit a scaler on the prediction data.")
    predictions = ocscoring.get_score(
        model_path=model_path,
        data=scoring_df,
        mask=mask,
        scaler_path=scaler_path,
        use_gpu=args.use_gpu,
        enforce_reference_column_order=enforce_reference_order,
        invert_conditionally=args.invert_conditionally,
    )
    if "predicted_score" not in predictions.columns:
        raise ValueError("Prediction output does not contain 'predicted_score' column.")
    # Build output with the same rows/columns as input plus an OCScore column.
    # Rows removed during preprocessing keep NaN in the score column.
    output_df = input_df.copy()
    output_df[args.score_column] = np.nan
    output_df.loc[scoring_df.index, args.score_column] = predictions["predicted_score"].to_numpy()
    print(f"Input shape: {input_df.shape}")
    print(f"Output shape: {output_df.shape}")
    print(f"Predictions assigned to {predictions['predicted_score'].notna().sum()} rows.")
    print(output_df[[args.score_column]].head())
    if args.output_csv:
        output_df.to_csv(args.output_csv, index=False)
        print(f"Saved output CSV to: {args.output_csv}")
# Script entry point: run CSV-based inference when executed directly.
if __name__ == "__main__":
    main()
This script demonstrates:
Loading input features from a .csv file
Loading OCDocker config so reference_column_order is enforced
Resolving model artifacts from OCScore_models (or a custom directory)
Optional mask and scaler loading
Running model inference and exporting an output CSV with original rows/columns plus OCSCORE
Example command:
python examples/15_python_api_inference_from_csv.py \
--csv-path /path/to/features.csv \
--model-name OCScore \
--config-path /path/to/OCDocker.cfg \
--output-csv /path/to/scored.csv
Configuration¶
The complete pipeline script includes a configuration section at the top where you can customize:
Receptor and ligand paths
Model paths and names
Preprocessing settings
Output file paths
Multiprocessing settings
Example configuration:
# Receptor configuration
RECEPTOR_PATH = "/path/to/receptor.pdb"
RECEPTOR_NAME = "Receptor"
# Ligand configuration
LIGAND_PATHS = [
"/path/to/ligand1",
"/path/to/ligand2",
]
# Model configuration
MODEL_NAME = "OCScore"
MODELS_DIR = "OCScore_models"
# Multiprocessing
N_JOBS = 4 # Number of parallel jobs
USE_MULTIPROCESSING = True