diff --git a/openfecli/clicktypes/__init__.py b/openfecli/clicktypes/__init__.py new file mode 100644 index 000000000..29b5b1291 --- /dev/null +++ b/openfecli/clicktypes/__init__.py @@ -0,0 +1 @@ +from .hyphenchoice import HyphenAwareChoice diff --git a/openfecli/clicktypes/hyphenchoice.py b/openfecli/clicktypes/hyphenchoice.py new file mode 100644 index 000000000..70ac77501 --- /dev/null +++ b/openfecli/clicktypes/hyphenchoice.py @@ -0,0 +1,13 @@ +import click + +def _normalize_to_hyphen(string): + return string.replace("_", "-") + +class HyphenAwareChoice(click.Choice): + def __init__(self, choices, case_sensitive=True): + choices = [_normalize_to_hyphen(choice) for choice in choices] + super().__init__(choices, case_sensitive) + + def convert(self, value, param, ctx): + value = _normalize_to_hyphen(value) + return super().convert(value, param, ctx) diff --git a/openfecli/commands/gather.py b/openfecli/commands/gather.py index 225226eba..7b9e466e0 100644 --- a/openfecli/commands/gather.py +++ b/openfecli/commands/gather.py @@ -3,9 +3,43 @@ import click from openfecli import OFECommandPlugin +from openfecli.clicktypes import HyphenAwareChoice import pathlib +def _get_column(val): + import numpy as np + if val == 0: + return 0 + + log10 = np.log10(val) + + if log10 >= 0.0: + col = np.floor(log10 + 1) + else: + col = np.floor(log10) + return int(col) + + +def format_estimate_uncertainty( + est: float, + unc: float, + unc_prec: int = 1, +) -> tuple[str, str]: + import numpy as np + # get the last column needed for uncertainty + unc_col = _get_column(unc) - (unc_prec - 1) + + if unc_col < 0: + est_str = f"{est:.{-unc_col}f}" + unc_str = f"{unc:.{-unc_col}f}" + else: + est_str = f"{np.round(est, -unc_col + 1)}" + unc_str = f"{np.round(unc, -unc_col + 1)}" + + return est_str, unc_str + + def is_results_json(f): # sanity check on files before we try and deserialize return 'estimate' in open(f, 'r').read(20) @@ -51,85 +85,8 @@ def legacy_get_type(res_fn): return 'complex' -@click.command( - 'gather', - short_help="Gather result jsons for network of RFE results into a TSV file" -) -@click.argument('rootdir', - type=click.Path(dir_okay=True, file_okay=False, - path_type=pathlib.Path), - required=True) -@click.option('output', '-o', - type=click.File(mode='w'), - default='-') -def gather(rootdir, output): - """Gather simulation result jsons of relative calculations to a tsv file - - Will walk ROOTDIR recursively and find all results files ending in .json - (i.e those produced by the quickrun command). Each of these contains the - results of a separate leg from a relative free energy thermodynamic cycle. - - Paired legs of simulations will be combined to give the DDG values between - two ligands in the corresponding phase, producing either binding ('DDGbind') - or hydration ('DDGhyd') relative free energies. These will be reported as - 'DDGbind(B,A)' meaning DGbind(B) - DGbind(A), the difference in free energy - of binding for ligand B relative to ligand A. - - Individual leg results will be also be written. These are reported as - either DGvacuum(A,B) DGsolvent(A,B) or DGcomplex(A,B) for the vacuum, - solvent or complex free energy of transmuting ligand A to ligand B. - - \b - Will produce a **tab** separated file with 6 columns: - 1) a description of the measurement, for example DDGhyd(A, B) - 2) the type of this measurement, either RBFE or RHFE - 3) the identifier of the first ligand - 4) the identifier of the second ligand - 5) the estimated value (in kcal/mol) - 6) the uncertainty on the value (also kcal/mol) - - By default, outputs to stdout, use -o option to choose file. - """ - from collections import defaultdict - import glob - import networkx as nx +def _get_ddgs(legs): import numpy as np - from cinnabar.stats import mle - - def dp2(v: float) -> str: - # turns 0.0012345 -> '0.0012', round() would get this wrong - return np.format_float_positional(v, precision=2, trim='0', - fractional=False) - - # 1) find all possible jsons - json_fns = glob.glob(str(rootdir) + '/**/*json', recursive=True) - - # 2) filter only result jsons - result_fns = filter(is_results_json, json_fns) - - # 3) pair legs of simulations together into dict of dicts - legs = defaultdict(dict) - - for result_fn in result_fns: - result = load_results(result_fn) - if result is None: - continue - elif result['estimate'] is None or result['uncertainty'] is None: - click.echo(f"WARNING: Calculations for {result_fn} did not finish succesfully!", - err=True) - - try: - names = get_names(result) - except KeyError: - raise ValueError("Failed to guess names") - try: - simtype = get_type(result) - except KeyError: - simtype = legacy_get_type(result_fn) - - legs[names][simtype] = result['estimate'], result['uncertainty'] - - # 4a for each ligand pair, resolve legs DDGs = [] for ligpair, vals in sorted(legs.items()): DDGbind = None @@ -144,15 +101,54 @@ def dp2(v: float) -> str: # DDG(2,1)bind = DG(1->2)complex - DG(1->2)solvent DDGbind = (DG1_mag - DG2_mag).m bind_unc = np.sqrt(np.sum(np.square([DG1_unc.m, DG2_unc.m]))) - if 'solvent' in vals and 'vacuum' in vals: + elif 'solvent' in vals and 'vacuum' in vals: DG1_mag, DG1_unc = vals['solvent'] DG2_mag, DG2_unc = vals['vacuum'] if not ((DG1_mag is None) or (DG2_mag is None)): DDGhyd = (DG1_mag - DG2_mag).m hyd_unc = np.sqrt(np.sum(np.square([DG1_unc.m, DG2_unc.m]))) + else: # -no-cov- + raise RuntimeError(f"Unknown DDG type for {vals}") DDGs.append((*ligpair, DDGbind, bind_unc, DDGhyd, hyd_unc)) + return DDGs + + +def _write_ddg(legs, writer): + DDGs = _get_ddgs(legs) + writer.writerow(["ligand_i", "ligand_j", "DDG(i->j) (kcal/mol)", + "uncertainty (kcal/mol)"]) + for ligA, ligB, DDGbind, bind_unc, DDGhyd, hyd_unc in DDGs: + name = f"{ligB}, {ligA}" + if DDGbind is not None: + DDGbind, bind_unc = format_estimate_uncertainty(DDGbind, + bind_unc) + writer.writerow([ligA, ligB, DDGbind, bind_unc]) + if DDGhyd is not None: + DDGhyd, hyd_unc = format_estimate_uncertainty(DDGbind, + bind_unc) + writer.writerow([ligA, ligB, DDGhyd, hyd_unc]) + + +def _write_dg_raw(legs, writer): + writer.writerow(["leg", "ligand_i", "ligand_j", "DG(i->j) (kcal/mol)", + "uncertainty (kcal/mol)"]) + for ligpair, vals in sorted(legs.items()): + name = ', '.join(ligpair) + for simtype, (m, u) in sorted(vals.items()): + if m is None: + m, u = 'NaN', 'NaN' + else: + m, u = format_estimate_uncertainty(m.m, u.m) + writer.writerow([simtype, *ligpair, m, u]) + + +def _write_dg_mle(legs, writer): + import networkx as nx + import numpy as np + from cinnabar.stats import mle + DDGs = _get_ddgs(legs) MLEs = [] # 4b) perform MLE g = nx.DiGraph() @@ -193,35 +189,103 @@ def dp2(v: float) -> str: ligname = idx_to_nm[node] MLEs.append((ligname, f, df)) - output.write('measurement\ttype\tligand_i\tligand_j\testimate (kcal/mol)' - '\tuncertainty (kcal/mol)\n') - # 5a) write out MLE values + writer.writerow(["ligand", "DG(MLE) (kcal/mol)", + "uncertainty (kcal/mol)"]) for ligA, DG, unc_DG in MLEs: - DG, unc_DG = dp2(DG), dp2(unc_DG) - output.write(f'DGbind({ligA})\tDG(MLE)\tZero\t{ligA}\t{DG}\t{unc_DG}\n') + DG, unc_DG = format_estimate_uncertainty(DG, unc_DG) + writer.writerow([ligA, DG, unc_DG]) - # 5b) write out DDG values - for ligA, ligB, DDGbind, bind_unc, DDGhyd, hyd_unc in DDGs: - name = f"{ligB}, {ligA}" - if DDGbind is not None: - DDGbind, bind_unc = dp2(DDGbind), dp2(bind_unc) - output.write(f'DDGbind({name})\tRBFE\t{ligA}\t{ligB}' - f'\t{DDGbind}\t{bind_unc}\n') - if DDGhyd is not None: - DDGhyd, hyd_unc = dp2(DDGhyd), dp2(hyd_unc) - output.write(f'DDGhyd({name})\tRHFE\t{ligA}\t{ligB}\t' - f'{DDGhyd}\t{hyd_unc}\n') +@click.command( + 'gather', + short_help="Gather result jsons for network of RFE results into a TSV file" +) +@click.argument('rootdir', + type=click.Path(dir_okay=True, file_okay=False, + path_type=pathlib.Path), + required=True) +@click.option( + '--report', + type=HyphenAwareChoice(['dg', 'ddg', 'dg-raw'], case_sensitive=False), + default="dg", show_default=True, + help=( + "What data to report. 'dg' gives maximum-likelihood estimate of " + "absolute deltaG, 'ddg' gives delta-delta-G, and 'dg-raw' gives " + "the raw result of the deltaG for a leg." + ) +) +@click.option('output', '-o', + type=click.File(mode='w'), + default='-') +def gather(rootdir, output, report): + """Gather simulation result jsons of relative calculations to a tsv file + + This walks ROOTDIR recursively and finds all result JSON files from the + quickrun command (these files must end in .json). Each of these contains + the results of a separate leg from a relative free energy thermodynamic + cycle. + + The results reported depend on ``--report`` flag: + + \b + * 'dg' (default) reports the ligand and the results are the maximum + likelihood estimate of its absolute free, and the uncertainty in + that. + * 'ddg' reports pairs of ligand_i and ligand_j, the calculated + relative free energy DDG(i->j) = DG(j) - DG(i) and its uncertainty + * 'dg-raw' reports the raw results, giving the leg (vacuum, solvent, or + complex), ligand_i, ligand_j, the raw DG(i->j) associated with it. + + The output is a table of **tab** separated values. By default, this + outputs to stdout, use the -o option to choose an output file. + """ + from collections import defaultdict + import glob + import csv + + # 1) find all possible jsons + json_fns = glob.glob(str(rootdir) + '/**/*json', recursive=True) + + # 2) filter only result jsons + result_fns = filter(is_results_json, json_fns) + + # 3) pair legs of simulations together into dict of dicts + legs = defaultdict(dict) + + for result_fn in result_fns: + result = load_results(result_fn) + if result is None: + continue + elif result['estimate'] is None or result['uncertainty'] is None: + click.echo(f"WARNING: Calculations for {result_fn} did not finish succesfully!", + err=True) + + try: + names = get_names(result) + except KeyError: + raise ValueError("Failed to guess names") + try: + simtype = get_type(result) + except KeyError: + simtype = legacy_get_type(result_fn) + + legs[names][simtype] = result['estimate'], result['uncertainty'] + + writer = csv.writer( + output, + delimiter="\t", + lineterminator="\n", # to exactly reproduce previous, prefer "\r\n" + ) + + # 5a) write out MLE values + # 5b) write out DDG values # 5c) write out each leg - for ligpair, vals in sorted(legs.items()): - name = ', '.join(ligpair) - for simtype, (m, u) in sorted(vals.items()): - if m is None: - m, u = 'NaN', 'NaN' - else: - m, u = dp2(m.m), dp2(u.m) - output.write(f'DG{simtype}({name})\t{simtype}\t{ligpair[0]}\t' - f'{ligpair[1]}\t{m}\t{u}\n') + writing_func = { + 'dg': _write_dg_mle, + 'ddg': _write_ddg, + 'dg-raw': _write_dg_raw, + }[report.lower()] + writing_func(legs, writer) PLUGIN = OFECommandPlugin( @@ -229,3 +293,6 @@ def dp2(v: float) -> str: section='Quickrun Executor', requires_ofe=(0, 6), ) + +if __name__ == "__main__": + gather() diff --git a/openfecli/tests/clicktypes/test_hyphenchoice.py b/openfecli/tests/clicktypes/test_hyphenchoice.py new file mode 100644 index 000000000..c7c0bb5a2 --- /dev/null +++ b/openfecli/tests/clicktypes/test_hyphenchoice.py @@ -0,0 +1,19 @@ +import pytest +import click +from openfecli.clicktypes.hyphenchoice import HyphenAwareChoice + +class TestHyphenAwareChoice: + @pytest.mark.parametrize('value', [ + "foo_bar_baz", "foo_bar-baz", "foo-bar_baz", "foo-bar-baz" + ]) + def test_init(self, value): + ch = HyphenAwareChoice([value]) + assert ch.choices == ["foo-bar-baz"] + + @pytest.mark.parametrize('value', [ + "foo_bar_baz", "foo_bar-baz", "foo-bar_baz", "foo-bar-baz" + ]) + def test_convert(self, value): + ch = HyphenAwareChoice(['foo-bar-baz']) + # counting on __call__ to get to convert() + assert ch(value) == "foo-bar-baz" diff --git a/openfecli/tests/commands/test_gather.py b/openfecli/tests/commands/test_gather.py index afafc4e29..68f7b6475 100644 --- a/openfecli/tests/commands/test_gather.py +++ b/openfecli/tests/commands/test_gather.py @@ -4,7 +4,24 @@ import tarfile import pytest -from openfecli.commands.gather import gather +from openfecli.commands.gather import ( + gather, format_estimate_uncertainty, _get_column +) + +@pytest.mark.parametrize('est,unc,unc_prec,est_str,unc_str', [ + (12.432, 0.111, 2, "12.43", "0.11"), + (0.9999, 0.01, 2, "1.000", "0.010"), + (1234, 100, 2, "1230", "100"), +]) +def test_format_estimate_uncertainty(est, unc, unc_prec, est_str, unc_str): + assert format_estimate_uncertainty(est, unc, unc_prec) == (est_str, unc_str) + +@pytest.mark.parametrize('val, col', [ + (1.0, 1), (0.1, -1), (-0.0, 0), (0.0, 0), (0.2, -1), (0.9, -1), + (0.011, -2), (9, 1), (10, 2), (15, 2), +]) +def test_get_column(val, col): + assert _get_column(val) == col @pytest.fixture