#!/usr/bin/env python3
# Description
###############################################################################
'''
Sets of classes and functions that are used to extract and process information
of any kind of molecule.
Usage:
import OCDocker.Toolbox.MoleculeProcessing as ocmolproc
'''
# Imports
###############################################################################
import os
import tempfile
from functools import lru_cache
from spyrmsd import io, rmsd
try:
from spyrmsd.exceptions import NonIsomorphicGraphs
except Exception:
NonIsomorphicGraphs = ValueError
from threading import Lock
from typing import Dict, List, Set, Tuple, Union, cast
import OCDocker.Error as ocerror
import OCDocker.Toolbox.FilesFolders as ocff
import OCDocker.Toolbox.Printing as ocprint
import OCDocker.Toolbox.Running as ocrun
from OCDocker.Config import get_config
# License
###############################################################################
'''
OCDocker
Authors: Rossi, A.D.; Monachesi, M.C.E.; Spelta, G.I.; Torres, P.H.M.
Federal University of Rio de Janeiro
Carlos Chagas Filho Institute of Biophysics
Laboratory for Molecular Modeling and Dynamics
This program is proprietary software owned by the Federal University of Rio de Janeiro (UFRJ),
developed by Rossi, A.D.; Monachesi, M.C.E.; Spelta, G.I.; Torres, P.H.M., and protected under Brazilian Law No. 9,609/1998.
All rights reserved. Use, reproduction, modification, and distribution are allowed under this UFRJ license,
provided this copyright notice is preserved. See the LICENSE file for details.
Contact: Artur Duque Rossi - arturossi10@gmail.com
'''
# Classes
###############################################################################
# Functions
###############################################################################
## Private ##
AtomKey = Tuple[str, str] # (resname, atomname) in CHARMM scheme
AtomVal = Tuple[str, str] # (resname, atomname) in canonical scheme
# Optional: collapse PDB2PQR “protonation/patch” residue names back to common PDB ones
_CANON_COLLAPSE = {
"HSD": "HIS", "HSE": "HIS", "HSP": "HIS", "HID": "HIS", "HIE": "HIS", "HIP": "HIS",
"ASH": "ASP",
"GLH": "GLU",
"LYN": "LYS",
"CYM": "CYS", "CYX": "CYS",
"TYM": "TYR",
}
def _format_atom_name(atom: str, element: str) -> str:
'''Format atom name into PDB columns 13-16.
Parameters
----------
atom : str
The atom name.
element : str
The element symbol.
Returns
-------
str
The formatted atom name.
'''
atom = atom.strip()
element = (element or "").strip().upper()
if len(atom) >= 4:
return atom[:4]
if atom and atom[0].isdigit():
# e.g. 1HG1: left-justify
return f"{atom:<4}"
# If element is 1-letter (C, N, O, S, P, H...), PDB usually uses a leading space
if len(element) == 1:
return f" {atom:<3}"
# Otherwise right-justify
return f"{atom:>4}"
[docs]
@lru_cache(maxsize=1)
def build_charmm_to_canonical_map() -> Dict[AtomKey, AtomVal]:
'''Build a complete CHARMM -> canonical mapping using PDB2PQR definitions and the CHARMM forcefield naming rules.
Returns
-------
Dict[AtomKey, AtomVal]
The mapping from CHARMM (key) to canonical (value).
'''
try:
from pdb2pqr import io as pqr_io
from pdb2pqr import forcefield as pqr_ff
except ImportError as e:
raise ImportError("pdb2pqr is required to build CHARMM->canonical maps.") from e
definition = pqr_io.get_definitions()
ff = pqr_ff.Forcefield("charmm", definition, userff=None, usernames=None)
rev: Dict[AtomKey, AtomVal] = {}
for canon_resname, defres in definition.map.items():
# defres.map: canonical atomname -> definition atom object
for canon_atomname in defres.map.keys():
ff_res, ff_atom = ff.get_names(canon_resname, canon_atomname)
if not ff_res or not ff_atom:
continue
# Invert: CHARMM -> canonical
rev.setdefault((ff_res, ff_atom), (canon_resname, canon_atomname))
return rev
## Public ##
[docs]
def clean_for_dssp(structurePath: str) -> int:
'''Make a pdb file with HEADER (empty), then CRYST1 and finally ATOM records only.
Parameters
----------
structurePath : str
The path to the structure file.
Returns
-------
int
The exit code of the command (based on the Error.py code table).
'''
# Initialise hasCryst1 flag
hasCryst1 = False
# Inigialise hasHeader flag
hasHeader = False
# List of lines and dssp lines
lines = []
# Check if structurePath is a valid file
if os.path.isfile(structurePath):
# Open it (for cleaning)
with open(structurePath, 'r') as pdbFile:
# For each line in pdbFile
for line in pdbFile:
if not line.startswith("HEADER") and not hasHeader:
# Set the hasHeader flag to True
hasHeader = True
# Add the line to the list
lines.append("HEADER \n")
if not line.startswith("CRYST1") and hasHeader and not hasCryst1:
# Set the hasCryst1 flag to True
hasCryst1 = True
# Add the line to the list
lines.append("CRYST1 1.000 1.000 1.000 90.00 90.00 90.00 P 1 1\n")
# Check if the line starts with ATOM
if line.startswith("ATOM"):
# Check if there is a chain in the line (all the lines should have a chain)
if line[21] == " ":
# Assume that the protein has only one chain and call it A
line = f"{line[:21]}A{line[22:]}"
# Add the line to the list
lines.append(line)
# Write atomically to avoid truncated/partial files being read by
# concurrent DSSP calls. Readers will see either the old full file
# or the new full file, never an in-progress write.
temp_dir = os.path.dirname(structurePath) or "."
temp_path = ""
try:
with tempfile.NamedTemporaryFile(
mode="w",
dir=temp_dir,
prefix=f".{os.path.basename(structurePath)}.dssp_tmp.",
suffix=".pdb",
delete=False,
) as pdbFile:
temp_path = pdbFile.name
pdbFile.writelines(lines)
os.replace(temp_path, structurePath)
finally:
if temp_path and os.path.exists(temp_path):
try:
os.remove(temp_path)
except OSError:
pass
return ocerror.Error.ok()
else:
return ocerror.Error.file_not_exist(message = f"The file '{structurePath}' does not exist!", level = ocerror.ReportLevel.ERROR)
[docs]
def clean_pdb_file(
structurePath: str,
outputPath: str,
overwrite: bool = False,
keep_hetatm: bool = False,
) -> int:
'''Create a cleaned PDB file with HEADER, CRYST1, and ATOM (optionally HETATM) records.
Parameters
----------
structurePath : str
The path to the input structure file.
outputPath : str
The path to the cleaned output file.
overwrite : bool, optional
If True, overwrites the output file when it already exists.
keep_hetatm : bool, optional
If True, keep HETATM lines in the output.
Returns
-------
int
The exit code of the command (based on the Error.py code table).
'''
if not os.path.isfile(structurePath):
return ocerror.Error.file_not_exist(message=f"The file '{structurePath}' does not exist!", level=ocerror.ReportLevel.ERROR)
if os.path.isfile(outputPath) and not overwrite:
return ocerror.Error.file_exists(message=f"The file '{outputPath}' already exists, aborting cleaning.", level=ocerror.ReportLevel.WARNING)
# Initialise flags
hasCryst1 = False
hasHeader = False
lines: List[str] = []
try:
with open(structurePath, 'r') as pdbFile:
for line in pdbFile:
if not line.startswith("HEADER") and not hasHeader:
hasHeader = True
lines.append("HEADER \n")
if not line.startswith("CRYST1") and hasHeader and not hasCryst1:
hasCryst1 = True
lines.append("CRYST1 1.000 1.000 1.000 90.00 90.00 90.00 P 1 1\n")
if line.startswith("ATOM") or (keep_hetatm and line.startswith("HETATM")):
if line[21] == " ":
line = f"{line[:21]}A{line[22:]}"
lines.append(line)
# Write the cleaned output
lock = Lock()
with lock:
with open(outputPath, 'w') as pdbFile:
pdbFile.writelines(lines)
except Exception as e:
return ocerror.Error.write_file(message=f"Could not clean PDB file '{structurePath}'. Error: {e}", level=ocerror.ReportLevel.ERROR)
return ocerror.Error.ok()
[docs]
def convert_pdb_charmm_to_canonical(
pdb_in: Union[str, os.PathLike],
pdb_out: Union[str, os.PathLike],
*,
collapse_resnames: bool = True,
overwrite: bool = False,
in_place: bool = False,
) -> int:
'''Convert a CHARMM-named PDB to canonical PDB names using PDB2PQR mappings.
Parameters
----------
pdb_in : str | os.PathLike
Input PDB file.
pdb_out : str | os.PathLike
Output PDB file (ignored if in_place=True).
collapse_resnames : bool, optional
Collapse patched/protonated residue names to common PDB names.
overwrite : bool, optional
Whether to overwrite output file when in_place=False.
in_place : bool, optional
If True, overwrite the input file path atomically.
'''
input_path = os.fspath(pdb_in)
output_path = os.fspath(pdb_out)
if in_place:
output_path = input_path
if not os.path.isfile(input_path):
return ocerror.Error.file_not_exist(
message=f"The file '{input_path}' does not exist!",
level=ocerror.ReportLevel.ERROR,
)
if os.path.isfile(output_path) and not overwrite and not in_place:
return ocerror.Error.file_exists(
message=f"The file '{output_path}' already exists, aborting conversion.",
level=ocerror.ReportLevel.WARNING,
)
try:
rev_map = build_charmm_to_canonical_map()
except ImportError as e:
return ocerror.Error.value_error(
message=f"pdb2pqr is required for CHARMM->canonical PDB renaming. Error: {e}",
level=ocerror.ReportLevel.ERROR,
)
except Exception as e:
return ocerror.Error.unknown(
message=f"Failed to build CHARMM->canonical map. Error: {e}",
level=ocerror.ReportLevel.ERROR,
)
out_lines: List[str] = []
try:
with open(input_path, "r", encoding="utf-8", errors="replace") as fh:
for line in fh:
if not (line.startswith("ATOM ") or line.startswith("HETATM")):
out_lines.append(line)
continue
atom = line[12:16].strip()
res = line[17:20].strip()
element = line[76:78].strip() if len(line) >= 78 else ""
new_res, new_atom = rev_map.get((res, atom), (res, atom))
if collapse_resnames:
new_res = _CANON_COLLAPSE.get(new_res, new_res)
line_list = list(line.rstrip("\n"))
if len(line_list) < 80:
line_list.extend([" "] * (80 - len(line_list)))
atom_field = _format_atom_name(new_atom, element)
res_field = f"{new_res:>3}"[:3]
line_list[12:16] = list(atom_field)
line_list[17:20] = list(res_field)
out_lines.append("".join(line_list) + "\n")
# Ensure output directory exists
out_dir = os.path.dirname(os.path.abspath(output_path))
if out_dir:
os.makedirs(out_dir, exist_ok=True)
if in_place:
tmp_path = f"{output_path}.canonical_tmp"
with open(tmp_path, "w", encoding="utf-8") as fh:
fh.writelines(out_lines)
os.replace(tmp_path, output_path)
else:
with open(output_path, "w", encoding="utf-8") as fh:
fh.writelines(out_lines)
except Exception as e:
return ocerror.Error.write_file(
message=f"Failed to convert '{input_path}' to canonical PDB names. Error: {e}",
level=ocerror.ReportLevel.ERROR,
)
ocprint.print_success(f"Converted '{input_path}' to canonical PDB '{output_path}'.")
return ocerror.Error.ok()
[docs]
def get_rmsd(reference: str, molecule: str) -> Union[List[float], float]:
'''Get the rmsd between a reference and a molecule file (it supports more than one molecule in this second file).
Parameters
----------
reference : str
The reference file.
molecule : str
The molecule file.
Returns
-------
List[float] | float
The rmsd between the reference and the molecule file.
'''
# Load reference
ref = io.loadmol(reference)
# Remove its hydrogens
ref.strip()
# Load all molecules (if only one, a list with a single element will be generated)
mols = io.loadallmols(molecule)
# For each molecule in molecules
for mol in mols:
# Remove its hydrogens
mol.strip()
# Get the reference and molecules coordinates
refCoordinates = ref.coordinates
molCoordinates = [mol.coordinates for mol in mols]
# Get the reference and molecules atomicnums
refAtmNum = ref.atomicnums
molAtmNum = mols[0].atomicnums
# Get the reference and molecules adjacency_matrix
refAdjMat = ref.adjacency_matrix
molAdjMat = mols[0].adjacency_matrix
# Return the symmetric rmsd (account for symmetry because it is important)
return cast(Union[List[float], float],
rmsd.symmrmsd(refCoordinates, molCoordinates, refAtmNum, molAtmNum, refAdjMat, molAdjMat))
[docs]
def get_rmsd_matrix(molecules: List[str]) -> Dict[str, Dict[str, float]]:
'''Get the rmsd matrix between a list of molecules.
Parameters
----------
molecules : List[str]
The list of molecules.
Returns
-------
Dict[str, Dict[str, float]]
The rmsd matrix.
'''
# Initialise the rmsd matrix with diagonal values
rmsdMatrix: Dict[str, Dict[str, float]] = {
molecule: {other: (0.0 if molecule == other else 0.0) for other in molecules}
for molecule in molecules
}
# Assign a large finite penalty when bond-graph inference yields
# non-isomorphic poses for the same target (e.g., malformed PDBQT->MOL2).
non_isomorphic_penalty = 1_000.0
non_isomorphic_pairs = 0
failed_pairs = 0
affected_poses: Set[str] = set()
def _pose_label(path: str) -> str:
'''Return a compact pose label suitable for one-line warnings.
Parameters
----------
path : str
The path to the pose file.
Returns
-------
str
A compact label for the pose, derived from the filename.
'''
base = os.path.basename(path)
engine = base.split("_", 1)[0]
if "_split_" in base:
idx = base.rsplit("_split_", 1)[1].split(".", 1)[0]
return f"{engine}{idx}"
return base
# Compute only upper triangle and mirror to keep the matrix symmetric.
for index, molecule in enumerate(molecules):
for otherMolecule in molecules[index + 1:]:
value: float
try:
tmpMolecule = get_rmsd(molecule, otherMolecule)
value = float(tmpMolecule if isinstance(tmpMolecule, float) else tmpMolecule[0])
except NonIsomorphicGraphs:
value = non_isomorphic_penalty
non_isomorphic_pairs += 1
affected_poses.add(molecule)
affected_poses.add(otherMolecule)
ocprint.print_warning(
f"RMSD noniso: {_pose_label(molecule)}|{_pose_label(otherMolecule)} "
f"p={non_isomorphic_penalty:.0f}"
)
except Exception as e:
value = non_isomorphic_penalty
failed_pairs += 1
ocprint.print_warning(
f"RMSD err: {_pose_label(molecule)}|{_pose_label(otherMolecule)} "
f"p={non_isomorphic_penalty:.0f} t={type(e).__name__}"
)
rmsdMatrix[molecule][otherMolecule] = value
rmsdMatrix[otherMolecule][molecule] = value
total_poses = len(molecules)
total_pairs = (total_poses * (total_poses - 1)) // 2
if total_poses > 0 and non_isomorphic_pairs > 0:
affected_ratio = len(affected_poses) / total_poses
pair_ratio = (non_isomorphic_pairs / total_pairs) if total_pairs > 0 else 0.0
if affected_ratio > 0.5:
severity = "high"
elif affected_ratio > 0.2:
severity = "moderate"
else:
severity = "low"
ocprint.print_warning(
"RMSD QC: "
f"noniso={non_isomorphic_pairs}/{total_pairs} "
f"aff={len(affected_poses)}/{total_poses} "
f"sev={severity}"
)
if affected_ratio > 0.5:
ocprint.print_warning(
"RMSD QC: aff>50%; review target."
)
if failed_pairs > 0:
ocprint.print_warning(
f"RMSD QC: failed={failed_pairs} p={non_isomorphic_penalty:.0f}"
)
return rmsdMatrix
[docs]
def needs_canonical_pdb_fix(
pdb_path: Union[str, os.PathLike],
*,
collapse_resnames: bool = True,
) -> bool:
'''Check if a PDB file contains CHARMM-style names that should be canonicalized.'''
input_path = os.fspath(pdb_path)
if not os.path.isfile(input_path):
return False
try:
rev_map = build_charmm_to_canonical_map()
except ImportError as e:
ocprint.print_warning(f"pdb2pqr not available; skipping canonical name detection. Error: {e}")
return False
except Exception as e:
ocprint.print_warning(f"Failed to build CHARMM->canonical map. Error: {e}")
return False
try:
with open(input_path, "r", encoding="utf-8", errors="replace") as fh:
for line in fh:
if not (line.startswith("ATOM ") or line.startswith("HETATM")):
continue
atom = line[12:16].strip()
res = line[17:20].strip()
new_res, new_atom = rev_map.get((res, atom), (res, atom))
if collapse_resnames:
new_res = _CANON_COLLAPSE.get(new_res, new_res)
if new_res != res or new_atom != atom:
return True
except Exception as e:
ocprint.print_warning(f"Failed while scanning '{input_path}' for CHARMM names. Error: {e}")
return False
return False
[docs]
def split_poses(ligand: str, ligandName: str, outPath: str, suffix: str = "", logFile: str = "") -> Union[int, Tuple[int, str]]:
'''Split the input ligand into its poses.
Parameters
----------
ligand : str
The path to the input ligand.
ligandName : str
The name of the input ligand.
outPath : str
The path to the output folder.
suffix : str, optional
The suffix to be added to the output files, by default "".
logFile : str, optional
The path to the log file, by default "".
Returns
-------
int
The exit code of the command (based on the Error.py code table).
'''
# Split the input ligand
config = get_config()
# Ensure ligand input path is absolute and normalized (remove duplicate directory components)
ligand = ocff.normalize_path(ligand)
# Ensure outPath is normalized (removes duplicate slashes, . and .. components, and duplicate directories) and absolute
outPath = ocff.normalize_path(outPath)
os.makedirs(outPath, exist_ok=True)
# Construct the output file prefix (vina_split will append numbers like _0, _1, etc.)
# Use normalize again to ensure no duplicate path components
output_prefix = ocff.normalize_path(os.path.join(outPath, f"{ligandName}{suffix}"))
cmd = [config.vina.split_executable, "--input", ligand, "--flex", "''", "--ligand", output_prefix]
# Print verbosity
ocprint.printv(f"Spliting the ligand '{ligand}'.")
# Run the command
return ocrun.run(cmd, logFile = logFile)