Source code for OCDocker.Toolbox.Validation

#!/usr/bin/env python3

# Description
###############################################################################
'''
Sets of classes and functions that are used to validate data.

Usage:

import OCDocker.Toolbox.Validation as ocvalidation
'''

# Imports
###############################################################################
import os
import time

from Bio.PDB import MMCIFParser, PDBParser
from typing import Union

import OCDocker.Error as ocerror
import OCDocker.Toolbox.Printing as ocprint

# License
###############################################################################
'''
OCDocker
Authors: Rossi, A.D.; Monachesi, M.C.E.; Spelta, G.I.; Torres, P.H.M.
Federal University of Rio de Janeiro
Carlos Chagas Filho Institute of Biophysics
Laboratory for Molecular Modeling and Dynamics

This program is proprietary software owned by the Federal University of Rio de Janeiro (UFRJ),
developed by Rossi, A.D.; Monachesi, M.C.E.; Spelta, G.I.; Torres, P.H.M., and protected under Brazilian Law No. 9,609/1998.
All rights reserved. Use, reproduction, modification, and distribution are allowed under this UFRJ license,
provided this copyright notice is preserved. See the LICENSE file for details.

Contact: Artur Duque Rossi - arturossi10@gmail.com
'''

# Classes
###############################################################################

# Functions
###############################################################################
## Private ##
def _safe_print_warning(message: str) -> None:
    '''Emit a warning through the Printing module, degrading gracefully.

    The lookup order is: ``ocprint.print_warning``, then ``ocprint.print_error``
    (with a "WARNING: " prefix), then the built-in ``print`` (same prefix).

    Parameters
    ----------
    message : str
        The warning text to be reported.

    Returns
    -------
    None
    '''

    primary = getattr(ocprint, "print_warning", None)

    if not callable(primary):
        # The Printing module has no usable warning printer (e.g. a test stub),
        # so tag the message and route it through the error printer instead.
        tagged = f"WARNING: {message}"
        secondary = getattr(ocprint, "print_error", None)

        if callable(secondary):
            secondary(tagged)
        else:
            # Last resort to keep the message observable without crashing.
            print(tagged)
        return

    primary(message)

def _safe_print_error(message: str) -> None:
    '''Emit an error through the Printing module, degrading gracefully.

    Uses ``ocprint.print_error`` when it exists and is callable; otherwise the
    message is written with the built-in ``print`` and an "ERROR: " prefix.

    Parameters
    ----------
    message : str
        The error text to be reported.

    Returns
    -------
    None
    '''

    handler = getattr(ocprint, "print_error", None)

    if not callable(handler):
        # Stubbed Printing module without an error printer: fall back to stdout.
        print(f"ERROR: {message}")
    else:
        handler(message)


## Public ##


def is_algorithm_allowed(path: str) -> bool:
    '''Finds if the given dir is a folder from an allowed algorithm.

    Only the last component of the path is inspected, and it must match one of
    the algorithm shortcodes exactly (case-sensitive). Trailing path separators
    are ignored, so "runs/km" and "runs/km/" give the same answer.

    Parameters
    ----------
    path : str
        Path to the dir which will be tested.

        The algorithm list and their shortcodes:
            - AffinityPropagation: ap
            - AgglomerativeClustering: ac
            - Birch: bi
            - DBSCAN: db
            - KMeans: km
            - MeanShift: ms
            - MiniBatchKMeans: mb
            - NoCluster: na
            - OPTICS: op
            - SpectralClustering: sc

    Returns
    -------
    bool
        True if the dir is an allowed algorithm, False otherwise.
    '''

    # Allowed algorithms
    allowed = {"ap", "ac", "bi", "db", "km", "ms", "mb", "na", "op", "sc"}

    # Directory paths are often written with a trailing separator; without
    # stripping it the last component would be the empty string and a valid
    # algorithm folder would be rejected.
    leaf = path.rstrip(os.path.sep).rpartition(os.path.sep)[2]

    return leaf in allowed
def is_molecule_valid(molecule: str) -> bool:
    '''Check if a molecule is valid (protein or ligand).

    A file is considered valid when the reader selected by its (lower-cased)
    extension can load it:

    - ".pdb", ".cif", ".mmcif": parsed with Biopython.
    - ".pdbqt": read with OpenBabel and required to contain at least one atom.
    - ".mol2", ".sdf", ".mol", ".smi", ".smiles": parsed with RDKit
      (for ".sdf" at least one record must parse; for SMILES files only the
      first whitespace-separated token of the file is parsed).

    Parameters
    ----------
    molecule : str
        Path to the molecule file to be checked.

    Returns
    -------
    bool
        True if the molecule is valid, False otherwise. False is also returned
        when the path is not an existing file, when the extension is not
        handled, or when a required reader library cannot be imported.
    '''

    # No file, so it is False
    if not os.path.isfile(molecule):
        return False

    # Lower-cased extension (leading dot included) selects the reader below
    extension = os.path.splitext(molecule)[1].lower()

    # Structure formats are loaded with Biopython
    if molecule.lower().endswith((".cif", ".mmcif", ".pdb")):
        try:
            # Choose the parser based on extension
            parser: PDBParser | MMCIFParser
            if extension == ".pdb":
                parser = PDBParser()
            elif extension in (".cif", ".mmcif"):
                parser = MMCIFParser()
            else:
                # Reached for dot-files such as ".pdb", where splitext reports
                # no extension at all. Not suitable, so say False.
                return False

            # Parse it; only the absence of an exception matters here
            parser.get_structure("Please, be ok", molecule)
        except (OSError, ValueError, AttributeError, ImportError, KeyError):
            # KeyError is included because the mmCIF parser looks up required
            # columns by name, so a file lacking them must count as invalid
            # instead of crashing the validator.
            return False

        # If no problems occur, the molecule should be fine
        return True

    # Everything else must at least carry an extension accepted for OpenBabel
    if not isinstance(validate_obabel_extension(molecule), str):
        return False

    if extension == ".pdbqt":
        # RDKit's PDB parser can misread PDBQT atom types (e.g., "A") as elements.
        # Use OpenBabel to validate PDBQT files instead. This branch runs before
        # the RDKit import because it does not need RDKit at all.
        try:
            from openbabel import openbabel

            ob_conversion = openbabel.OBConversion()
            ob_conversion.SetInFormat("pdbqt")
            ob_mol = openbabel.OBMol()
            if not ob_conversion.ReadFile(ob_mol, molecule):
                return False
            return ob_mol.NumAtoms() > 0
        except Exception:
            # Deliberately broad: any failure while reading means "not valid"
            return False

    try:
        # Import RDKit lazily to avoid hard dependency at import time
        from rdkit import Chem

        if extension == ".mol2":
            return Chem.rdmolfiles.MolFromMol2File(molecule, sanitize = True) is not None

        if extension == ".sdf":
            supplier = Chem.rdmolfiles.SDMolSupplier(molecule, sanitize = True)
            # Records that fail to parse are yielded as None; one good record is enough
            return any(mol is not None for mol in supplier)

        if extension == ".mol":
            return Chem.rdmolfiles.MolFromMolFile(molecule, sanitize = True) is not None

        if extension in (".smi", ".smiles"):
            # Read SMILES string from file and parse
            try:
                with open(molecule, "r") as f:
                    smi = f.read().strip().split()[0]
            except (OSError, IndexError):
                # Unreadable file, or an empty one (no first token)
                return False
            return Chem.rdmolfiles.MolFromSmiles(smi, sanitize = True) is not None

        # Not suitable extension, so say False
        return False
    except (OSError, ValueError, AttributeError, ImportError):
        # Uh oh, some problem has been found
        return False
def is_molecule_valid_with_retry(molecule: str, retries: int = 5, delay: float = 1.0) -> bool:
    '''Validate a molecule file, polling while it is missing, empty or mid-write.

    Each attempt first checks that the path is an existing, non-empty file and
    only then runs ``is_molecule_valid`` on it. Between attempts (but not after
    the last one) the function sleeps for ``delay`` seconds when it is positive.

    Parameters
    ----------
    molecule : str
        The molecule to be checked.
    retries : int, optional
        Number of read attempts before giving up. Values below 1 are treated
        as a single attempt. Default is 5.
    delay : float, optional
        Delay in seconds between attempts. Default is 1.0.

    Returns
    -------
    bool
        True if the molecule is valid within the retry window, False otherwise.
    '''

    remaining = max(1, retries)

    while remaining > 0:
        remaining -= 1

        # A file that cannot be sized is treated the same as an empty one
        try:
            has_content = os.path.isfile(molecule) and os.path.getsize(molecule) > 0
        except OSError:
            has_content = False

        if has_content and is_molecule_valid(molecule):
            return True

        # Wait only when another attempt is still to come
        if remaining > 0 and delay > 0:
            time.sleep(delay)

    return False
def validate_digest_extension(digestPath: str, digestFormat: str) -> bool:
    """Tell whether a digest file can be handled, judging by format or extension.

    The declared format is checked first (case-insensitively). When it is not
    supported, a warning is printed and the text after the last "." of the
    file path is tried instead; if that is not supported either, an error is
    printed.

    Parameters
    ----------
    digestPath : str
        The digest file path.
    digestFormat : str
        The format of the digest file. The options are: [ json (default), hdf5 (not implemented) ]

    Returns
    -------
    bool
        If the extension is supported or not.
    """

    # Supported extensions for digest file
    supported = ["json"]

    # The declared format is fine: nothing else to look at
    if digestFormat.lower() in supported:
        return True

    _safe_print_warning(
        f"The format '{digestFormat}' is not supported. Trying to determine its extension from the file '{digestPath}'."
    )

    # Fall back to whatever follows the last dot of the path
    inferred = digestPath.rsplit(".", 1)[-1]

    if inferred.lower() in supported:
        return True

    _safe_print_error(
        f"The format '{inferred}' is not supported. The supported formats are: {supported}"
    )
    return False
def validate_obabel_extension(path: str) -> Union[str, int]:
    '''Validate the input file extension to ensure compatibility with the obabel lib.

    The extension is taken from the path as-is (no case folding) and compared
    against the list of accepted extensions below.

    Parameters
    ----------
    path : str
        Path to the input file.

    Returns
    -------
    str | int
        The extension (without the leading dot) when it is supported; otherwise
        the value returned by ``ocerror.Error.unsupported_extension`` (an exit
        code based on the Error.py code table).
    '''

    known_formats = (
        'acesin', 'adf', 'alc', 'ascii', 'bgf', 'box', 'bs', 'c3d1', 'c3d2', 'cac',
        'caccrt', 'cache', 'cacint', 'can', 'cdjson', 'cdxml', 'cht', 'cif', 'ck', 'cml',
        'cmlr', 'cof', 'com', 'confabreport', 'CONFIG', 'CONTCAR', 'CONTFF', 'copy', 'crk2d', 'crk3d',
        'csr', 'cssr', 'ct', 'cub', 'cube', 'dalmol', 'dmol', 'dx', 'ent', 'exyz',
        'fa', 'fasta', 'feat', 'fh', 'fhiaims', 'fix', 'fps', 'fpt', 'fract', 'fs',
        'fsa', 'gamin', 'gau', 'gjc', 'gjf', 'gpr', 'gr96', 'gro', 'gukin', 'gukout',
        'gzmat', 'hin', 'inchi', 'inchikey', 'inp', 'jin', 'k', 'lmpdat', 'lpmd', 'mcdl',
        'mcif', 'MDFF', 'mdl', 'ml2', 'mmcif', 'mmd', 'mmod', 'mna', 'mol', 'mol2',
        'mold', 'molden', 'molf', 'molreport', 'mop', 'mopcrt', 'mopin', 'mp', 'mpc', 'mpd',
        'mpqcin', 'mrv', 'msms', 'nul', 'nw', 'orcainp', 'outmol', 'paint', 'pcjson', 'pcm',
        'pdb', 'pdbqt', 'png', 'pointcloud', 'POSCAR', 'POSFF', 'pov', 'pqr', 'pqs', 'qcin',
        'report', 'rinchi', 'rsmi', 'rxn', 'sd', 'sdf', 'smi', 'smiles', 'stl', 'svg',
        'sy2', 'tdd', 'text', 'therm', 'tmol', 'txt', 'txyz', 'unixyz', 'VASP', 'vmol',
        'xed', 'xyz', 'yob', 'zin',
    )

    # splitext keeps the leading dot on the suffix; drop it before comparing
    dotted_suffix = os.path.splitext(path)[1]
    bare_suffix = dotted_suffix[1:]

    if bare_suffix not in known_formats:
        return ocerror.Error.unsupported_extension(message=f"Unsupported extension for input molecule file! Supported extensions are '{' '.join(known_formats)}' and got '{bare_suffix}'.")

    return bare_suffix