-
Notifications
You must be signed in to change notification settings - Fork 13
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #18 from MolecularAI/prepare-for-release-1.5.0
Adding updates for 1.5.0 release
- Loading branch information
Showing
16 changed files
with
649 additions
and
40 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
from __future__ import annotations | ||
|
||
import argparse | ||
from collections import OrderedDict | ||
from typing import List, Optional, Sequence | ||
|
||
import pandas as pd | ||
from rdkit import Chem | ||
|
||
|
||
def _get_atom_identifier(atom: Chem.rdchem.Atom) -> str: | ||
""" | ||
Get atom identifier for neighborhood identification. | ||
The identifier is either the atom-map number if available, otherwise the symbol. | ||
:param atom: rdkit atom | ||
:return: an atom identifier string | ||
""" | ||
atom_id = atom.GetAtomMapNum() | ||
if atom_id == 0: | ||
atom_id = atom.GetSymbol() | ||
return str(atom_id) | ||
|
||
|
||
def _get_bond_environment_identifier( | ||
atoms: Sequence[Chem.rdchem.Atom], bond: Chem.rdchem.Bond | ||
) -> str: | ||
""" | ||
Get the environment of a specific bond. | ||
:param atoms: atoms in the molecule. | ||
:param bond: bond for which the environment should be specified | ||
:return: string representation of the bond environment | ||
""" | ||
atom_map1 = _get_atom_identifier(atoms[bond.GetBeginAtomIdx()]) | ||
atom_map2 = _get_atom_identifier(atoms[bond.GetEndAtomIdx()]) | ||
bond_order = bond.GetBondType() | ||
atom_map1, atom_map2 = sorted([atom_map1, atom_map2]) | ||
return f"{atom_map1}_{atom_map2}_{bond_order}" | ||
|
||
|
||
def _get_atomic_neighborhoods(smiles: str) -> OrderedDict[int, List[str]]: | ||
""" | ||
Obtains a dictionary containing each atom (atomIdx) and a list of its | ||
bonding environment. | ||
:param smiles: Atom-mapped SMILES string | ||
:return: A dictionary containing each atom (atomIdx) and a list of its | ||
bonding environment identifiers. | ||
""" | ||
|
||
mol = Chem.MolFromSmiles(smiles) | ||
atoms = mol.GetAtoms() | ||
|
||
neighbor_dict = {} | ||
for atom in atoms: | ||
bonds_list = [] | ||
if atom.GetAtomMapNum() != 0: | ||
for bond in atom.GetBonds(): | ||
|
||
bonds_list.append(_get_bond_environment_identifier(atoms, bond)) | ||
|
||
neighbor_dict[atom.GetAtomMapNum()] = sorted(bonds_list) | ||
ordered_neighbor_dict = OrderedDict(sorted(neighbor_dict.items())) | ||
|
||
return ordered_neighbor_dict | ||
|
||
|
||
def get_atom_list(reactants_smiles: str, product_smiles: str) -> List[int]: | ||
""" | ||
Given two sets of SMILES strings corresponding to a set of reactants and products, | ||
returns a list of atomIdxs for which the atomic environment has changed, | ||
as defined by a change in the bonds. | ||
:param reactants_smiles: Atom-mapped SMILES string for the reactant(s) | ||
:param product_smiles: Atom-mapped SMILES string for the product(s) | ||
:return: List of atoms (atomIdx) for which the atomic environment has changed | ||
""" | ||
|
||
ordered_reactant_neighbor_dict = _get_atomic_neighborhoods(reactants_smiles) | ||
ordered_product_neighbor_dict = _get_atomic_neighborhoods(product_smiles) | ||
|
||
all_indices = set(ordered_product_neighbor_dict.keys()) | set( | ||
ordered_reactant_neighbor_dict.keys() | ||
) | ||
|
||
# Checks to see equivlence of atomic enviroments. | ||
# If environment changed, then add atom to list | ||
atom_list = [ | ||
atom_map | ||
for atom_map in all_indices | ||
if ordered_reactant_neighbor_dict.get(atom_map, []) | ||
!= ordered_product_neighbor_dict.get(atom_map, []) | ||
] | ||
|
||
return atom_list | ||
|
||
|
||
def atom_map_tag_products(mapped_rxn: str) -> str: | ||
""" | ||
Given atom-mapped reaction, returns disconnection site-tagged product where atoms | ||
with changed atom environment are represented by [<atom>:1]. | ||
:param mapped_rxn: Atom-mapped reaction SMILES | ||
:return: SMILES of the product containing tags corresponding to atoms changed in the | ||
reaction. | ||
""" | ||
reactants_smiles, _, product_smiles = mapped_rxn.split(">") | ||
|
||
product_mol = Chem.MolFromSmiles(product_smiles) | ||
atom_list = get_atom_list(reactants_smiles, product_smiles) | ||
|
||
# Set atoms in product with a different environment in reactants to 1 | ||
for atom in product_mol.GetAtoms(): | ||
if atom.GetAtomMapNum() in atom_list: | ||
atom.SetAtomMapNum(1) | ||
else: | ||
atom.SetAtomMapNum(0) | ||
|
||
return Chem.MolToSmiles(product_mol) | ||
|
||
|
||
def main(args: Optional[Sequence[str]] = None) -> None: | ||
parser = argparse.ArgumentParser() | ||
parser.add_argument("--input") | ||
parser.add_argument("--in_column", default="RxnSmilesClean") | ||
parser.add_argument("--out_column", default="products_atom_map_tagged") | ||
parser.add_argument("--output") | ||
|
||
args = parser.parse_args(args) | ||
|
||
data = pd.read_csv(args.input, sep="\t") | ||
|
||
smiles_col = data[args.in_column].apply(atom_map_tag_products) | ||
data = data.assign(**{args.out_column: smiles_col}) | ||
data.to_csv(args.output, sep="\t", index=False) | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,185 @@ | ||
from __future__ import annotations | ||
|
||
import re | ||
from typing import List, Tuple | ||
|
||
from rdkit import Chem | ||
|
||
from rxnutils.chem.utils import remove_atom_mapping | ||
|
||
|
||
def smiles_tokens(smiles: str) -> List[str]: | ||
""" | ||
Tokenize SMILES using basic regex pattern for Chemformer. | ||
:param smiles: SMILES to tokenize | ||
:return: List of tokens identified in SMILES. | ||
""" | ||
pattern = r"(\[[^\]]+]|Br?|Cl?|N|O|S|P|F|I|b|c|n|o|s|p|\(|\)|\.|=|#|-|\+|\\\\|\/|:|~|@|\?|>|\*|\!|\$|\%[0-9]{2}|[0-9])" | ||
regex = re.compile(pattern) | ||
tokens = [token for token in regex.findall(smiles)] | ||
assert smiles == "".join(tokens) | ||
return tokens | ||
|
||
|
||
def _next_tagged_token( | ||
product_tagged_tokens: List[str], untagged_token: str, tagged_token_idx: int | ||
) -> Tuple[str, int]: | ||
""" | ||
Get the next tagged token in the sequence. Includes checks and fixes for | ||
stereochemistry changes due to removing atom mapping. | ||
:param product_tagged_tokens: tokens of product tagged with [<atom>:1] | ||
:param untagged_token: the current token from the untagged product | ||
:param tagged_token_idx: the current token index of the tagged product | ||
:return: the next (tagged-product) token and the corresponding token index | ||
""" | ||
tagged_token = product_tagged_tokens[tagged_token_idx] | ||
|
||
# Check if the stereo chemistry has changed after removing atom-mapping and | ||
# handle each specific case. | ||
if tagged_token != untagged_token and (tagged_token == "/" or tagged_token == "\\"): | ||
if untagged_token == "/" or untagged_token == "\\": | ||
return untagged_token, tagged_token_idx | ||
else: | ||
tagged_token_idx += 1 | ||
return product_tagged_tokens[tagged_token_idx], tagged_token_idx | ||
|
||
if ( | ||
tagged_token != untagged_token | ||
and not ":1" in tagged_token | ||
and "@" in tagged_token | ||
): | ||
return untagged_token, tagged_token_idx | ||
|
||
return tagged_token, tagged_token_idx | ||
|
||
|
||
def tagged_smiles_from_tokens( | ||
product_tagged_tokens: List[str], product_untagged_tokens: List[str] | ||
) -> Tuple[str, str]: | ||
""" | ||
Convert the tagged SMILES from atom-mapping to unmapped-token + '!' | ||
:param product_tagged_tokens: tokens of product tagged with [<atom>:1] | ||
:param product_untagged_tokens: tokens of the untagged product | ||
:return: Tuple of SMILES of the product containing tags corresponding to atoms changed in the | ||
reaction using "<atom>!", and SMILES of the (reconstructed) untagged product | ||
""" | ||
|
||
print(product_tagged_tokens) | ||
|
||
product_converted = "" | ||
product_untagged = "" | ||
|
||
tagged_token_idx = 0 | ||
|
||
for untagged_token in product_untagged_tokens: | ||
|
||
tagged_token, tagged_token_idx = _next_tagged_token( | ||
product_tagged_tokens, untagged_token, tagged_token_idx | ||
) | ||
|
||
if tagged_token != untagged_token and ( | ||
untagged_token == "/" or untagged_token == "\\" | ||
): | ||
continue | ||
|
||
if tagged_token == untagged_token: | ||
product_converted += untagged_token | ||
else: | ||
# Remove brackets around a single letter | ||
if ( | ||
len(untagged_token) == 3 | ||
and untagged_token.startswith("[") | ||
and untagged_token.endswith("]") | ||
): | ||
untagged_token = untagged_token[1] | ||
product_converted += untagged_token + "!" | ||
|
||
product_untagged += untagged_token | ||
|
||
tagged_token_idx += 1 | ||
|
||
return product_converted, product_untagged | ||
|
||
|
||
def _canonicalize_tagged_smiles( | ||
product_tagged: str, product_untagged: str = None | ||
) -> Tuple[str, str]: | ||
""" | ||
Reorder the tagged-product SMILES on canonical form using the canonicalized | ||
untagged product. | ||
:param product_tagged: SMILES of tagged product | ||
:param product_untagged: SMILES of untagged product | ||
:return: canonicalized untagged and tagged product SMILES | ||
""" | ||
mol = Chem.MolFromSmiles(product_tagged) | ||
mol_untagged = Chem.MolFromSmiles(product_untagged) | ||
|
||
_, canonical_atom_order = tuple( | ||
zip( | ||
*sorted( | ||
[(j, i) for i, j in enumerate(Chem.CanonicalRankAtoms(mol_untagged))] | ||
) | ||
) | ||
) | ||
|
||
mol = Chem.RenumberAtoms(mol, canonical_atom_order) | ||
mol_untagged = Chem.RenumberAtoms(mol_untagged, canonical_atom_order) | ||
return Chem.MolToSmiles(mol, canonical=False), Chem.MolToSmiles(mol_untagged) | ||
|
||
|
||
def convert_atom_map_tag(product_atom_map_tagged: str) -> str: | ||
""" | ||
Replace product tagged by atom-mapping [<atom>:1] to product tagged by "<atom>!". | ||
Returns empty string if no atom-map tagging or the failed to create untagged product. | ||
:param product_tagged: SMILES of the product containing tags corresponding to | ||
atoms changed in the reaction using [<atom>:1] | ||
:return: SMILES of the product containing tags corresponding to atoms changed in the | ||
reaction using "<atom>!" | ||
""" | ||
|
||
# Check number of tags | ||
n_tags = len(re.findall(r"\[[^\]]+:1]", product_atom_map_tagged)) | ||
|
||
if n_tags < 1: | ||
return "" | ||
|
||
product_untagged = remove_atom_mapping(product_atom_map_tagged, canonical=False) | ||
|
||
if not Chem.MolFromSmiles(product_untagged): | ||
return "" | ||
|
||
product_tagged, product_untagged = _canonicalize_tagged_smiles( | ||
product_atom_map_tagged, product_untagged | ||
) | ||
|
||
# Update the SMILES string to remove atom-mapping brackets and explicit [H]:s and | ||
# replace by <atom>! | ||
product_tagged_tokens = smiles_tokens(product_tagged) | ||
product_untagged_tokens = smiles_tokens(product_untagged) | ||
|
||
product_tagged_converted, product_untagged = tagged_smiles_from_tokens( | ||
product_tagged_tokens, product_untagged_tokens | ||
) | ||
|
||
n_new_tags = product_tagged_converted.count("!") | ||
|
||
if n_new_tags != n_tags: | ||
raise AssertionError( | ||
f"The number of tags is not the same after converting to '!' tagging. " | ||
f"product_tagged_atom_map: {product_atom_map_tagged}" | ||
f"product_tagged_converted: {product_tagged_converted}." | ||
) | ||
|
||
if product_tagged_converted.replace("!", "") != product_untagged: | ||
raise AssertionError( | ||
f"product_tagged.replace('!', '') != product_untagged." | ||
f"product_tagged: {product_tagged_converted}, product_untagged: {product_untagged}" | ||
) | ||
|
||
return product_tagged_converted |
Oops, something went wrong.