From 41340d4d34e02bbc33a930b4b0927c73795bdfac Mon Sep 17 00:00:00 2001 From: Yao Xiao <37345096+Yaoyx@users.noreply.github.com> Date: Wed, 7 Feb 2024 09:44:23 -0800 Subject: [PATCH] Add pool decorator to functions (#489) * add pool decorator to previous functions * Fix pool_decorator * add pool_decorator to dotfinder.py and expected.py * combined import command for pool_decorator with others * add tests for pooled functions to check if outputs are the same as using regular map * changed lambda func to a def func * try to set start method for multiprocessing * test pool.map insteand of pool.imap * switch multiprocessing to multiprocess * swith to multiprocess * change job back to lambda * change transforms back to lambda function * streamlined tests for multiprocess * move multiprocess code from cli/coverage and cli/sample to api * add nproc to test_sample and reduce the random sampling size * decrease test_sample sampling value * change frac to 0.1 in test_sample * regulate tolerance in test_sample * 1.move logging into decorator 2. use map_functor as the argument name 3. add explicit docstring to pool_decorator 4. all pool.terminate() 5. support third-party map functor * remove unused imports * remove unused imports in cli * unified definitions of nproc and map_functor * Update pool_decorator docstring --------- Co-authored-by: Nezar Abdennur --- cooltools/api/coverage.py | 27 +++++---- cooltools/api/directionality.py | 2 - cooltools/api/dotfinder.py | 43 +++++--------- cooltools/api/expected.py | 97 +++++++++++++++----------------- cooltools/api/insulation.py | 52 ++++++++--------- cooltools/api/rearrange.py | 1 - cooltools/api/sample.py | 20 +++++-- cooltools/api/snipping.py | 27 ++++----- cooltools/api/virtual4c.py | 20 +++---- cooltools/cli/coverage.py | 22 ++------ cooltools/cli/dots.py | 6 +- cooltools/cli/eigs_trans.py | 1 - cooltools/cli/expected_cis.py | 4 -- cooltools/cli/expected_trans.py | 4 -- cooltools/cli/logbin_expected.py | 2 - cooltools/cli/pileup.py | 1 - cooltools/cli/saddle.py | 2 - cooltools/cli/sample.py | 35 ++++-------- cooltools/lib/common.py | 45 ++++++++------- tests/test_call-dots.py | 22 ++++++++ tests/test_coverage.py | 13 +++++ tests/test_expected.py | 25 +++++++- tests/test_insulation.py | 15 ++++- tests/test_sample.py | 20 ++++--- tests/test_snipping.py | 11 ++++ tests/test_virtual4c.py | 4 ++ 26 files changed, 274 insertions(+), 247 deletions(-) diff --git a/cooltools/api/coverage.py b/cooltools/api/coverage.py index 2737876f..5430925d 100644 --- a/cooltools/api/coverage.py +++ b/cooltools/api/coverage.py @@ -1,6 +1,8 @@ import numpy as np import cooler.parallel from ..lib.checks import is_cooler_balanced +from ..lib.common import pool_decorator + def _apply_balancing(chunk, bias, balanced_column_name='balanced'): @@ -60,16 +62,17 @@ def _get_chunk_coverage(chunk, pixel_weight_key="count"): return covs - +@pool_decorator def coverage( clr, ignore_diags=None, chunksize=int(1e7), - map=map, use_lock=False, clr_weight_name=None, store=False, store_prefix="cov", + nproc=1, + map_functor=map, ): """ @@ -83,25 +86,27 @@ def coverage( ---------- clr : cooler.Cooler Cooler object + ignore_diags : int, optional + Drop elements occurring on the first ``ignore_diags`` diagonals of the + matrix (including the main diagonal). + If None, equals the number of diagonals ignored during IC balancing. chunksize : int, optional Split the contact matrix pixel records into equally sized chunks to save memory and/or parallelize. 
Default is 10^7
-    map : callable, optional
-        Map function to dispatch the matrix chunks to workers.
-        Default is the builtin ``map``, but alternatives include parallel map
-        implementations from a multiprocessing pool.
     clr_weight_name : str
         Name of the weight column. Specify to calculate coverage of balanced cooler.
-    ignore_diags : int, optional
-        Drop elements occurring on the first ``ignore_diags`` diagonals of the
-        matrix (including the main diagonal).
-        If None, equals the number of diagonals ignored during IC balancing.
     store : bool, optional
         If True, store the results in the input cooler file when finished. If
         clr_weight_name=None, also stores total cis counts in the cooler info.
         Default is False.
     store_prefix : str, optional
         Name prefix of the columns of the bin table to save cis and total coverages.
         Will add suffixes _cis and _tot, as well as _raw in the default case or
         _clr_weight_name if specified.
+    nproc : int, optional
+        How many processes to use for calculation. Ignored if map_functor is passed.
+    map_functor : callable, optional
+        Map function to dispatch the matrix chunks to workers.
+        If left unspecified, pool_decorator applies the following defaults: if nproc>1, this defaults to multiprocess.Pool;
+        if nproc=1, this defaults to the builtin map.

     Returns
     -------
@@ -136,7 +141,7 @@ def coverage(
             f"balancing weight {clr_weight_name} is not available in the cooler."
         )

-    chunks = cooler.parallel.split(clr, chunksize=chunksize, map=map, use_lock=use_lock)
+    chunks = cooler.parallel.split(clr, chunksize=chunksize, map=map_functor, use_lock=use_lock)

     if ignore_diags:
         chunks = chunks.pipe(_zero_diags, n_diags=ignore_diags)
diff --git a/cooltools/api/directionality.py b/cooltools/api/directionality.py
index 41b83613..86d79bc6 100644
--- a/cooltools/api/directionality.py
+++ b/cooltools/api/directionality.py
@@ -1,8 +1,6 @@
 import warnings
 import numpy as np
 import pandas as pd
-from ..lib import peaks, numutils
-

 def _dirscore(
     pixels, bins, window=10, ignore_diags=2, balanced=True, signed_chi2=False
diff --git a/cooltools/api/dotfinder.py b/cooltools/api/dotfinder.py
index 4ed37cd2..2709aa0b 100644
--- a/cooltools/api/dotfinder.py
+++ b/cooltools/api/dotfinder.py
@@ -75,15 +75,12 @@
 """
 from functools import partial, reduce
-import multiprocess as mp
 import logging
 import warnings
 import time

-from scipy.linalg import toeplitz
 from scipy.ndimage import convolve
 from scipy.stats import poisson
-from scipy.sparse import coo_matrix
 import numpy as np
 import pandas as pd
 from sklearn.cluster import Birch
@@ -96,8 +93,8 @@
     is_compatible_viewframe,
     is_valid_expected,
 )
+from ..lib.common import pool_decorator

-from bioframe import make_viewframe
 warnings.simplefilter(action="ignore", category=pd.errors.PerformanceWarning)
 logging.basicConfig(level=logging.INFO)
@@ -1265,7 +1262,7 @@ def cluster_filtering_hiccups(
 # large helper functions wrapping smaller step-specific ones
 ####################################################################

-
+@pool_decorator
 def scoring_and_histogramming_step(
     clr,
     expected_indexed,
@@ -1277,6 +1274,7 @@ def scoring_and_histogramming_step(
     max_nans_tolerated,
     loci_separation_bins,
     nproc,
+    map_functor=map,
 ):
     """
     This implements the 1st step of the lambda-binning scoring procedure - histogramming.
@@ -1306,21 +1304,13 @@ def scoring_and_histogramming_step( # standard multiprocessing implementation if nproc > 1: - logging.info(f"creating a Pool of {nproc} workers to tackle {len(tiles)} tiles") - pool = mp.Pool(nproc) - map_ = pool.imap map_kwargs = dict(chunksize=int(np.ceil(len(tiles) / nproc))) else: - logging.info("fallback to serial implementation.") - map_ = map map_kwargs = {} - try: - # consider using - # https://github.com/mirnylab/cooler/blob/9e72ee202b0ac6f9d93fd2444d6f94c524962769/cooler/tools.py#L59 - histogram_chunks = map_(job, tiles, **map_kwargs) - finally: - if nproc > 1: - pool.close() + + # consider using + # https://github.com/mirnylab/cooler/blob/9e72ee202b0ac6f9d93fd2444d6f94c524962769/cooler/tools.py#L59 + histogram_chunks = map_functor(job, tiles, **map_kwargs) # now we need to combine/sum all of the histograms for different kernels: def _sum_hists(hx, hy): @@ -1351,7 +1341,7 @@ def _sum_hists(hx, hy): # returning filtered histogram return final_hist - +@pool_decorator def scoring_and_extraction_step( clr, expected_indexed, @@ -1366,6 +1356,7 @@ def scoring_and_extraction_step( nproc, bin1_id_name="bin1_id", bin2_id_name="bin2_id", + map_functor=map, ): """ This implements the 2nd step of the lambda-binning scoring procedure, @@ -1402,21 +1393,15 @@ def scoring_and_extraction_step( # standard multiprocessing implementation if nproc > 1: logging.info(f"creating a Pool of {nproc} workers to tackle {len(tiles)} tiles") - pool = mp.Pool(nproc) - map_ = pool.imap map_kwargs = dict(chunksize=int(np.ceil(len(tiles) / nproc))) else: logging.info("fallback to serial implementation.") - map_ = map map_kwargs = {} - try: - # consider using - # https://github.com/mirnylab/cooler/blob/9e72ee202b0ac6f9d93fd2444d6f94c524962769/cooler/tools.py#L59 - filtered_pix_chunks = map_(job, tiles, **map_kwargs) - significant_pixels = pd.concat(filtered_pix_chunks, ignore_index=True) - finally: - if nproc > 1: - pool.close() + + # TODO: try using cooler.parallel.MultiplexDataPipe for pipelining the steps + filtered_pix_chunks = map_functor(job, tiles, **map_kwargs) + significant_pixels = pd.concat(filtered_pix_chunks, ignore_index=True) + # same pixels should never be scored >1 times with the current tiling of the interactions matrix if significant_pixels.duplicated().any(): raise ValueError( diff --git a/cooltools/api/expected.py b/cooltools/api/expected.py index 40273f22..8d2fbeac 100644 --- a/cooltools/api/expected.py +++ b/cooltools/api/expected.py @@ -2,7 +2,6 @@ from functools import partial import warnings -import multiprocess as mp import numpy as np import pandas as pd @@ -18,6 +17,7 @@ from ..lib.schemas import diag_expected_dtypes, block_expected_dtypes from ..sandbox import expected_smoothing +from ..lib.common import pool_decorator # common expected_df column names, take from schemas _REGION1 = list(diag_expected_dtypes)[0] @@ -877,6 +877,7 @@ def blocksum_pairwise( # user-friendly wrapper for diagsum_symm and diagsum_pairwise - part of new "public" API +@pool_decorator def expected_cis( clr, view_df=None, @@ -888,6 +889,7 @@ def expected_cis( ignore_diags=2, # should default to cooler info chunksize=10_000_000, nproc=1, + map_functor=map, ): """ Calculate average interaction frequencies as a function of genomic @@ -932,7 +934,11 @@ def expected_cis( chunksize : int, optional Size of pixel table chunks to process nproc : int, optional - How many processes to use for calculation + How many processes to use for calculation. Ignored if map_functor is passed. 
+    map_functor : callable, optional
+        Map function to dispatch the matrix chunks to workers.
+        If left unspecified, pool_decorator applies the following defaults: if nproc>1, this defaults to multiprocess.Pool;
+        if nproc=1, this defaults to the builtin map.

     Returns
     -------
@@ -972,44 +978,34 @@ def expected_cis(
         weight1 = clr_weight_name + "1"
         weight2 = clr_weight_name + "2"
         transforms = {"balanced": lambda p: p["count"] * p[weight1] * p[weight2]}
+
     else:
         raise ValueError(
             "cooler is not balanced, or"
             f"balancing weight {clr_weight_name} is not available in the cooler."
         )

-    # execution details
-    if nproc > 1:
-        pool = mp.Pool(nproc)
-        map_ = pool.map
-    else:
-        map_ = map
-
-    # using try-clause to close mp.Pool properly
-    try:
-        if intra_only:
-            result = diagsum_symm(
-                clr,
-                view_df,
-                transforms=transforms,
-                clr_weight_name=clr_weight_name,
-                ignore_diags=ignore_diags,
-                chunksize=chunksize,
-                map=map_,
-            )
-        else:
-            result = diagsum_pairwise(
-                clr,
-                view_df,
-                transforms=transforms,
-                clr_weight_name=clr_weight_name,
-                ignore_diags=ignore_diags,
-                chunksize=chunksize,
-                map=map_,
-            )
-    finally:
-        if nproc > 1:
-            pool.close()
+    if intra_only:
+        result = diagsum_symm(
+            clr,
+            view_df,
+            transforms=transforms,
+            clr_weight_name=clr_weight_name,
+            ignore_diags=ignore_diags,
+            chunksize=chunksize,
+            map=map_functor,
+        )
+    else:
+        result = diagsum_pairwise(
+            clr,
+            view_df,
+            transforms=transforms,
+            clr_weight_name=clr_weight_name,
+            ignore_diags=ignore_diags,
+            chunksize=chunksize,
+            map=map_functor,
+        )

     # calculate actual averages by dividing sum by n_valid:
     for key in chain(["count"], transforms):
@@ -1046,12 +1042,14 @@

 # user-friendly wrapper for diagsum_symm and diagsum_pairwise - part of new "public" API
+@pool_decorator
 def expected_trans(
     clr,
     view_df=None,
     clr_weight_name="weight",
     chunksize=10_000_000,
     nproc=1,
+    map_functor=map,
 ):
     """
     Calculate average interaction frequencies for inter-chromosomal
@@ -1080,7 +1078,11 @@
     chunksize : int, optional
         Size of pixel table chunks to process
     nproc : int, optional
-        How many processes to use for calculation
+        How many processes to use for calculation. Ignored if map_functor is passed.
+    map_functor : callable, optional
+        Map function to dispatch the matrix chunks to workers.
+        If left unspecified, pool_decorator applies the following defaults: if nproc>1, this defaults to multiprocess.Pool;
+        if nproc=1, this defaults to the builtin map.
     Returns
     -------
@@ -1115,6 +1117,7 @@
         weight1 = clr_weight_name + "1"
         weight2 = clr_weight_name + "2"
         transforms = {"balanced": lambda p: p["count"] * p[weight1] * p[weight2]}
+
     else:
         raise ValueError(
             "cooler is not balanced, or"
             f"balancing weight {clr_weight_name} is not available in the cooler."
         )

     # execution details
-    if nproc > 1:
-        pool = mp.Pool(nproc)
-        map_ = pool.map
-    else:
-        map_ = map
-
-    # using try-clause to close mp.Pool properly
-    try:
-        result = blocksum_pairwise(
-            clr,
-            view_df,
-            transforms=transforms,
-            clr_weight_name=clr_weight_name,
-            chunksize=chunksize,
-            map=map_,
-        )
-    finally:
-        if nproc > 1:
-            pool.close()
+    result = blocksum_pairwise(
+        clr,
+        view_df,
+        transforms=transforms,
+        clr_weight_name=clr_weight_name,
+        chunksize=chunksize,
+        map=map_functor,
+    )

     # keep only trans interactions for the user-friendly function:
     _name_to_region = view_df.set_index("name")
diff --git a/cooltools/api/insulation.py b/cooltools/api/insulation.py
index 2c2a87e8..f0fda853 100644
--- a/cooltools/api/insulation.py
+++ b/cooltools/api/insulation.py
@@ -4,7 +4,6 @@
 logging.basicConfig(level=logging.INFO)

 import warnings
-import multiprocess as mp
 from functools import partial

 import numpy as np
@@ -16,7 +15,7 @@

 from ..lib import peaks, numutils
 from ..lib.checks import is_compatible_viewframe, is_cooler_balanced
-from ..lib.common import make_cooler_view
+from ..lib.common import make_cooler_view, pool_decorator


 def get_n_pixels(bad_bin_mask, window=10, ignore_diags=2):
@@ -151,7 +150,7 @@ def insul_diamond(

     return score, n_pixels, sum_balanced, sum_counts

-
+@pool_decorator
 def calculate_insulation_score(
     clr,
     window_bp,
@@ -164,6 +163,7 @@
     clr_weight_name="weight",
     verbose=False,
     nproc=1,
+    map_functor=map,
 ):
     """Calculate the diamond insulation scores for all bins in a cooler.

@@ -194,7 +194,11 @@
     verbose : bool
         If True, report real-time progress.
     nproc : int, optional
-        How many processes to use for calculation
+        How many processes to use for calculation. Ignored if map_functor is passed.
+    map_functor : callable, optional
+        Map function to dispatch the matrix chunks to workers.
+        If left unspecified, pool_decorator applies the following defaults: if nproc>1, this defaults to multiprocess.Pool;
+        if nproc=1, this defaults to the builtin map.

     Returns
     -------
@@ -252,33 +256,21 @@
     )

     # Calculate insulation score for each region separately.
-    # Define mapper depending on requested number of threads:
-    if nproc > 1:
-        pool = mp.Pool(nproc)
-        map_ = pool.map
-    else:
-        map_ = map
-
-    # Using try-clause to close mp.Pool properly
-    try:
-        # Apply get_region_insulation:
-        job = partial(
-            _get_region_insulation,
-            clr,
-            is_bad_bin_key,
-            clr_weight_name,
-            chunksize,
-            window_bp,
-            min_dist_bad_bin,
-            ignore_diags,
-            append_raw_scores,
-            verbose,
-        )
-        ins_region_tables = map_(job, view_df[["chrom", "start", "end", "name"]].values)
-
-    finally:
-        if nproc > 1:
-            pool.close()
+    # Apply get_region_insulation:
+    job = partial(
+        _get_region_insulation,
+        clr,
+        is_bad_bin_key,
+        clr_weight_name,
+        chunksize,
+        window_bp,
+        min_dist_bad_bin,
+        ignore_diags,
+        append_raw_scores,
+        verbose,
+    )
+    ins_region_tables = map_functor(job, view_df[["chrom", "start", "end", "name"]].values)

     ins_table = pd.concat(ins_region_tables)
     return ins_table
diff --git a/cooltools/api/rearrange.py b/cooltools/api/rearrange.py
index 3cf7d88e..1de7403c 100644
--- a/cooltools/api/rearrange.py
+++ b/cooltools/api/rearrange.py
@@ -1,7 +1,6 @@
 import bioframe as bf
 import cooler
 import numpy as np
-import pandas as pd

 from ..lib.checks import is_compatible_viewframe
 import logging
diff --git a/cooltools/api/sample.py b/cooltools/api/sample.py
index a91473e2..ee24b872 100644
--- a/cooltools/api/sample.py
+++ b/cooltools/api/sample.py
@@ -4,6 +4,8 @@
 import cooler
 import cooler.parallel
 from .coverage import coverage
+from ..lib.common import pool_decorator
+


 def sample_pixels_approx(pixels, frac):
@@ -43,7 +45,7 @@ def sample_pixels_exact(pixels, count):
 def _extract_pixel_chunk(chunk):
     return chunk["pixels"]

-
+@pool_decorator
 def sample(
     clr,
     out_clr_path,
@@ -51,8 +53,9 @@ def sample(
     cis_count=None,
     frac=None,
     exact=False,
-    map_func=map,
     chunksize=int(1e7),
+    nproc=1,
+    map_functor=map,
 ):
     """
     Pick a random subset of contacts from a Hi-C map.
@@ -83,12 +86,17 @@
         If False, binomial sampling will be used instead and
         the sample size will be randomly distributed around
         the target value.
-    map_func : function
-        A map implementation.
-
     chunksize : int
         The number of pixels loaded and processed per step of computation.

+    nproc : int, optional
+        How many processes to use for calculation. Ignored if map_functor is passed.
+
+    map_functor : callable, optional
+        Map function to dispatch the matrix chunks to workers.
+        If left unspecified, pool_decorator applies the following defaults: if nproc>1, this defaults to multiprocess.Pool;
+        if nproc=1, this defaults to the builtin map.
+
     """
     if issubclass(type(clr), str):
         clr = cooler.Cooler(clr)
@@ -121,7 +129,7 @@ def sample(
     else:
         pipeline = (
             cooler.parallel.split(
-                clr, include_bins=False, map=map_func, chunksize=chunksize
+                clr, include_bins=False, map=map_functor, chunksize=chunksize
             )
             .pipe(_extract_pixel_chunk)
             .pipe(sample_pixels_approx, frac=frac)
diff --git a/cooltools/api/snipping.py b/cooltools/api/snipping.py
index a03a4133..3d02df60 100644
--- a/cooltools/api/snipping.py
+++ b/cooltools/api/snipping.py
@@ -36,19 +36,16 @@

 import numpy as np
 import pandas as pd
-import bioframe

 from ..lib.checks import (
     is_compatible_viewframe,
     is_cooler_balanced,
     is_valid_expected,
 )
-from ..lib.common import assign_view_auto, make_cooler_view
-
+from ..lib.common import assign_view_auto, make_cooler_view, pool_decorator
 from ..lib.numutils import LazyToeplitz

 import warnings
-import multiprocessing


 def expand_align_features(features_df, flank, resolution, format="bed"):
@@ -818,7 +815,7 @@ def snip(self, exp, region1, region2, tup):
             snippet[D] = np.nan
         return snippet

-
+@pool_decorator
 def pileup(
     clr,
     features_df,
@@ -829,6 +826,7 @@ def pileup(
     min_diag="auto",
     clr_weight_name="weight",
     nproc=1,
+    map_functor=map,
 ):
     """
     Pileup features over the cooler.
@@ -858,8 +856,12 @@
         Value of the column that contains the balancing weights
     force : bool
         Allows start>end in the features (not implemented)
-    nproc : str
-        How many cores to use
+    nproc : int, optional
+        How many processes to use for calculation. Ignored if map_functor is passed.
+    map_functor : callable, optional
+        Map function to dispatch the matrix chunks to workers.
+        If left unspecified, pool_decorator applies the following defaults: if nproc>1, this defaults to multiprocess.Pool;
+        if nproc=1, this defaults to the builtin map.

     Returns
     -------
@@ -978,15 +980,8 @@
         expected_value_col=expected_value_col,
     )

-    if nproc > 1:
-        pool = multiprocessing.Pool(nproc)
-        mymap = pool.map
-    else:
-        mymap = map
-    stack = _pileup(features_df, snipper.select, snipper.snip, map=mymap)
+    stack = _pileup(features_df, snipper.select, snipper.snip, map=map_functor)
     if feature_type == "bed":
         stack = np.fmax(stack, np.transpose(stack, axes=(0, 2, 1)))
-
-    if nproc > 1:
-        pool.close()
+
     return stack
diff --git a/cooltools/api/virtual4c.py b/cooltools/api/virtual4c.py
index 067469d3..197279a1 100644
--- a/cooltools/api/virtual4c.py
+++ b/cooltools/api/virtual4c.py
@@ -1,18 +1,17 @@
-import re
 import logging

 logging.basicConfig(level=logging.INFO)

-import multiprocess as mp
 from functools import partial

 import numpy as np
 import pandas as pd

-import cooler
 import bioframe

 from ..lib.checks import is_cooler_balanced
+from ..lib.common import pool_decorator
+


 def _extract_profile(chrom, clr, clr_weight_name, viewpoint):
@@ -56,12 +55,13 @@ def _extract_profile(chrom, clr, clr_weight_name, viewpoint):
     else:
         return pd.concat(to_return, ignore_index=True)

-
+@pool_decorator
 def virtual4c(
     clr,
     viewpoint,
     clr_weight_name="weight",
     nproc=1,
+    map_functor=map,
 ):
     """Generate genome-wide contact profile for a given viewpoint.

@@ -76,7 +76,11 @@
     clr_weight_name : str
         Name of the column in the bin table with weight
     nproc : int, optional
-        How many processes to use for calculation
+        How many processes to use for calculation. Ignored if map_functor is passed.
+    map_functor : callable, optional
+        Map function to dispatch the matrix chunks to workers.
+        If left unspecified, pool_decorator applies the following defaults: if nproc>1, this defaults to multiprocess.Pool;
+        if nproc=1, this defaults to the builtin map.

     Returns
     -------
@@ -107,11 +111,7 @@ def virtual4c(
         _extract_profile, clr=clr, clr_weight_name=clr_weight_name, viewpoint=viewpoint
     )

-    if nproc > 1:
-        with mp.Pool(nproc) as p:
-            counts = list(p.map(f, clr.chromnames))
-    else:
-        counts = list(map(f, clr.chromnames))
+    counts = list(map_functor(f, clr.chromnames))

     # Concatenate all chromosome dfs into one
     v4c = pd.concat(counts, ignore_index=True)
diff --git a/cooltools/cli/coverage.py b/cooltools/cli/coverage.py
index 98d82fdb..2e714a5d 100644
--- a/cooltools/cli/coverage.py
+++ b/cooltools/cli/coverage.py
@@ -1,12 +1,11 @@
 import click
 import cooler

-from .. import coverage
-
 from . import cli
 from .. import api
+
 import bioframe
-import multiprocessing as mp
+


 @cli.command()
@@ -82,19 +81,10 @@ def coverage(

     clr = cooler.Cooler(cool_path)

-    if nproc > 1:
-        pool = mp.Pool(nproc)
-        _map = pool.imap
-    else:
-        _map = map
-
-    try:
-        cis_cov, tot_cov = api.coverage.coverage(
-            clr, ignore_diags=ignore_diags, chunksize=chunksize, map=_map, store=store, clr_weight_name=clr_weight_name
-        )
-    finally:
-        if nproc > 1:
-            pool.close()
+    cis_cov, tot_cov = api.coverage.coverage(
+        clr, ignore_diags=ignore_diags, chunksize=chunksize, nproc=nproc, store=store, clr_weight_name=clr_weight_name
+    )
+

     coverage_table = clr.bins()[:][["chrom", "start", "end"]]
     if clr_weight_name is None:
diff --git a/cooltools/cli/dots.py b/cooltools/cli/dots.py
index ea7e9021..b4c29275 100644
--- a/cooltools/cli/dots.py
+++ b/cooltools/cli/dots.py
@@ -1,16 +1,12 @@
-import os.path as op
 from functools import partial

-import pandas as pd
-import numpy as np
 import cooler
-import bioframe

 import logging

 import click

 from . import cli
 from .. import api
-from ..lib.common import make_cooler_view, assign_regions
+from ..lib.common import make_cooler_view
 from ..lib.io import read_viewframe_from_file, read_expected_from_file
 from .util import validate_csv
diff --git a/cooltools/cli/eigs_trans.py b/cooltools/cli/eigs_trans.py
index bd3c9bbb..4466fe99 100644
--- a/cooltools/cli/eigs_trans.py
+++ b/cooltools/cli/eigs_trans.py
@@ -4,7 +4,6 @@
 import bioframe
 from ..api import eigdecomp
 from ..lib.common import make_cooler_view
-from ..lib.io import read_viewframe_from_file

 import click
 from .util import TabularFilePath, sniff_for_header
diff --git a/cooltools/cli/expected_cis.py b/cooltools/cli/expected_cis.py
index a103cb33..60dd2141 100644
--- a/cooltools/cli/expected_cis.py
+++ b/cooltools/cli/expected_cis.py
@@ -1,8 +1,4 @@
-import multiprocess as mp
-import pandas as pd
-from itertools import combinations
 import cooler
-import bioframe
 from .. import api
 from ..lib.common import make_cooler_view
 from ..lib.io import read_viewframe_from_file
diff --git a/cooltools/cli/expected_trans.py b/cooltools/cli/expected_trans.py
index b8b4d6db..592227c4 100644
--- a/cooltools/cli/expected_trans.py
+++ b/cooltools/cli/expected_trans.py
@@ -1,8 +1,4 @@
-import multiprocess as mp
-import pandas as pd
-from itertools import combinations
 import cooler
-import bioframe
 from ..
import api from ..lib.common import make_cooler_view from ..lib.io import read_viewframe_from_file diff --git a/cooltools/cli/logbin_expected.py b/cooltools/cli/logbin_expected.py index 29ead332..37e6a2aa 100755 --- a/cooltools/cli/logbin_expected.py +++ b/cooltools/cli/logbin_expected.py @@ -1,5 +1,3 @@ -import pandas as pd -import numpy as np from functools import partial from ..api import expected from ..lib.io import read_expected_from_file diff --git a/cooltools/cli/pileup.py b/cooltools/cli/pileup.py index 5a32ba3c..fb8a095b 100644 --- a/cooltools/cli/pileup.py +++ b/cooltools/cli/pileup.py @@ -1,7 +1,6 @@ import pandas as pd import numpy as np import cooler -import bioframe from .. import api diff --git a/cooltools/cli/saddle.py b/cooltools/cli/saddle.py index 64eca7f9..855005d5 100644 --- a/cooltools/cli/saddle.py +++ b/cooltools/cli/saddle.py @@ -15,9 +15,7 @@ from ..lib.common import make_cooler_view, mask_cooler_bad_bins, align_track_with_cooler from ..lib.io import read_viewframe_from_file, read_expected_from_file -from ..lib.checks import is_track -from . import util from . import cli diff --git a/cooltools/cli/sample.py b/cooltools/cli/sample.py index 0b8d8ea0..a861b9dc 100644 --- a/cooltools/cli/sample.py +++ b/cooltools/cli/sample.py @@ -1,8 +1,5 @@ -import multiprocess as mp import click -import cooler - from . import cli from .. import api @@ -70,24 +67,14 @@ def random_sample(in_path, out_path, count, cis_count, frac, exact, nproc, chunk Specify the target sample size with either --count or --frac. """ - - if nproc > 1: - pool = mp.Pool(nproc) - map_ = pool.map - else: - map_ = map - - try: - api.sample.sample( - in_path, - out_path, - count=count, - cis_count=cis_count, - frac=frac, - exact=exact, - chunksize=chunksize, - map_func=map_, - ) - finally: - if nproc > 1: - pool.close() + + api.sample.sample( + in_path, + out_path, + count=count, + cis_count=cis_count, + frac=frac, + exact=exact, + chunksize=chunksize, + nproc=nproc + ) diff --git a/cooltools/lib/common.py b/cooltools/lib/common.py index 7946ef4f..5a841ae1 100644 --- a/cooltools/lib/common.py +++ b/cooltools/lib/common.py @@ -1,15 +1,10 @@ import warnings -from itertools import tee, starmap -from operator import gt -from copy import copy - import numpy as np import pandas as pd - import bioframe - -from multiprocessing import Pool - +from multiprocess import Pool +from functools import wraps +import logging def assign_view_paired( features, @@ -512,7 +507,10 @@ def align_track_with_cooler( def pool_decorator(func): """ - A decorator function that enables multiprocessing for a given function. + A decorator function that enables multiprocessing for a given function. + The function must have a ``map_functor`` keyword argument. + It works by hijacking map_functor argument and substituting it with the + parallel one: multiprocess.Pool(nproc).imap, when nproc > 1 Parameters ---------- @@ -523,24 +521,31 @@ def pool_decorator(func): ------- A wrapper function that enables multiprocessing for the given function. 
""" - + @wraps(func) def wrapper(*args, **kwargs): + # If alternative or third party map functors are provided + if "map_functor" in kwargs.keys(): + logging.info(f"using an alternative map functor: {kwargs['map_functor']}") + return func(*args, **kwargs, map_functor=kwargs["map_functor"]) + pool = None if "nproc" in kwargs.keys(): if kwargs["nproc"] > 1: + logging.info(f"creating a Pool of {kwargs['nproc']} workers") pool = Pool(kwargs["nproc"]) mymap = pool.imap else: + logging.info("fallback to serial implementation.") mymap = map - - func(*args, **kwargs, map=mymap) - + try: + result = func(*args, **kwargs, map_functor=mymap) + finally: + if pool is not None: + pool.close() + pool.terminate() + return result else: - raise TypeError( - "nproc must be specified as a keyword argument (e.g. nproc=1, nproc=5...)" - ) - - if pool != None: - pool.close() - + logging.warning("nproc is not specified, single thread is used") + return func(*args, **kwargs, map_functor=map) + return wrapper diff --git a/tests/test_call-dots.py b/tests/test_call-dots.py index e1dec94f..2b48fbf9 100644 --- a/tests/test_call-dots.py +++ b/tests/test_call-dots.py @@ -49,6 +49,27 @@ def test_dots(request): # just checking if it runs without errors assert not dot_calls_df.empty + dot_calls_df_pooled = api.dotfinder.dots( + clr, + expected_df, + view_df=view_df, + kernels={ + "d": np.array([[1, 0, 1], [0, 0, 0], [1, 0, 1]]), + "v": np.array([[0, 1, 0], [0, 0, 0], [0, 1, 0]]), + "h": np.array([[0, 0, 0], [1, 0, 1], [0, 0, 0]]), + }, + max_loci_separation=100_000_000, + max_nans_tolerated=1, + n_lambda_bins=50, + lambda_bin_fdr=0.1, + clustering_radius=False, + cluster_filtering=None, + tile_size=50_000_000, + nproc=3, + ) + + assert dot_calls_df.equals(dot_calls_df_pooled) + def test_call_dots_cli(request, tmpdir): in_cool = op.join(request.fspath.dirname, "data/CN.mm9.1000kb.cool") @@ -105,3 +126,4 @@ def test_call_dots_cli(request, tmpdir): # assert result.exit_code == 0 # # make sure output is generated: # assert op.isfile(out_dots) + diff --git a/tests/test_coverage.py b/tests/test_coverage.py index f79fb990..87c6ae91 100644 --- a/tests/test_coverage.py +++ b/tests/test_coverage.py @@ -17,6 +17,13 @@ def test_coverage_symmetric_upper(request): # Test that minimal coverage is larger than 0.5 assert tot_cov[tot_cov > 0].min() >= 1 + # Check multiprocessed result + cis_cov_pooled, tot_cov_pooled = cooltools.api.coverage.coverage( + clr, ignore_diags=2, chunksize=int(1e7), nproc=3 + ) + assert np.array_equal(cis_cov, cis_cov_pooled, equal_nan=True) + assert np.array_equal(tot_cov, tot_cov_pooled, equal_nan=True) + # Test that dense matrix marginal is the same: mtx = clr.matrix(balance=False, as_pixels=False)[:] @@ -63,6 +70,12 @@ def test_balanced_coverage(request): # Test that mean total balanced coverage is 1.0 assert np.nanmean(tot_cov_weight) == 1.0 + + cis_cov_weight_pooled, tot_cov_weight_pooled = cooltools.api.coverage.coverage( + clr, ignore_diags=2, chunksize=int(1e7), clr_weight_name="weight", nproc=3 + ) + assert np.array_equal(cis_cov_weight, cis_cov_weight_pooled, equal_nan=True) + assert np.array_equal(tot_cov_weight, tot_cov_weight_pooled, equal_nan=True) # Generate test matrix with weights bins=pd.DataFrame( diff --git a/tests/test_expected.py b/tests/test_expected.py index 82964585..31ee1fe7 100644 --- a/tests/test_expected.py +++ b/tests/test_expected.py @@ -272,6 +272,7 @@ def test_expected_cis(request): desired=desired_expected, equal_nan=True, ) + # asymm and symm result together - engaging 
diagsum_pairwise res_all = cooltools.api.expected.expected_cis( clr, @@ -299,6 +300,18 @@ def test_expected_cis(request): desired=desired_expected, equal_nan=True, ) + + # check multiprocessed result + res_all_pooled = cooltools.api.expected.expected_cis( + clr, + view_df=view_df, + intra_only=False, + clr_weight_name=clr_weight_name, + chunksize=chunksize, + ignore_diags=ignore_diags, + nproc=3 + ) + assert res_all.equals(res_all_pooled) def test_blocksum_pairwise(request): @@ -345,6 +358,16 @@ def test_expected_trans(request): equal_nan=True, ) + # Check multiprocessed result + res_pooled = cooltools.api.expected.expected_trans( + clr, + view_df=view_df, + clr_weight_name=clr_weight_name, + chunksize=chunksize, + nproc=3 + ) + assert res.equals(res_pooled) + ### Test CLI: def test_expected_cli(request, tmpdir): @@ -669,4 +692,4 @@ def test_diagsum_from_array(): exp = _diagsum_symm_dense(ar, bad_bins=list(range(3, 5))) exp1 = diagsum_from_array(ar, ignore_diags=0) exp1["balanced.avg"] = exp1["balanced.sum"] / exp1["n_valid"] - assert np.allclose(exp, exp1["balanced.avg"].values, equal_nan=True) + assert np.allclose(exp, exp1["balanced.avg"].values, equal_nan=True) \ No newline at end of file diff --git a/tests/test_insulation.py b/tests/test_insulation.py index dc32354c..17492e8d 100644 --- a/tests/test_insulation.py +++ b/tests/test_insulation.py @@ -59,6 +59,9 @@ def test_calculate_insulation_score(request): assert {f"n_valid_pixels_{window}" for window in windows}.issubset( insulation.columns ) + # check multiprocessed result + insulation_pooled = calculate_insulation_score(clr, windows, nproc=3) + assert insulation.equals(insulation_pooled) # II. Insulation with masking bad bins insulation = calculate_insulation_score(clr, 10_000_000, min_dist_bad_bin=1) @@ -70,6 +73,9 @@ def test_calculate_insulation_score(request): assert np.any( ~np.isnan(insulation.query("dist_bad_bin==1")["log2_insulation_score_10000000"]) ) + # check multiprocessed result + insulation_pooled = calculate_insulation_score(clr, 10_000_000, min_dist_bad_bin=1, nproc=3) + assert insulation.equals(insulation_pooled) # III. Insulation for separate view: region = pd.DataFrame( @@ -79,10 +85,15 @@ def test_calculate_insulation_score(request): clr, 10_000_000, min_dist_bad_bin=0, view_df=region ) assert len(insulation) == 10 + # check multiprocessed result + insulation_pooled = calculate_insulation_score( + clr, 10_000_000, min_dist_bad_bin=0, view_df=region, nproc=3 + ) + assert insulation.equals(insulation_pooled) # IV. Insulation with string or float inputs for window sizes should work. 
calculate_insulation_score(clr, '10_000_000') - + calculate_insulation_score(clr, '10_000_000', nproc=3) def test_find_boundaries(request): @@ -175,4 +186,4 @@ def test_insulation_sparse_vs_dense(request): insul_dense["log2_insulation_score_10000000"], boundaries_sparse["log2_insulation_score_10000000"], equal_nan=True, - ) + ) \ No newline at end of file diff --git a/tests/test_sample.py b/tests/test_sample.py index b5d723a2..8892f521 100644 --- a/tests/test_sample.py +++ b/tests/test_sample.py @@ -13,24 +13,26 @@ def test_sample(request): cooltools.api.sample.sample( clr, op.join(request.fspath.dirname, "data/CN.mm9.1000kb.test_sampled.cool"), - frac=0.5, + frac=0.2, + nproc=3 ) clr_result = cooler.Cooler( op.join(request.fspath.dirname, "data/CN.mm9.1000kb.test_sampled.cool") ) # Test that deviation from expected total is very small - testing.assert_allclose(clr_result.info["sum"], clr.info["sum"] / 2, rtol=1e-3) + testing.assert_allclose(clr_result.info["sum"], clr.info["sum"] / 5, rtol=1e-3) cooltools.api.sample.sample( clr, op.join(request.fspath.dirname, "data/CN.mm9.1000kb.test_sampled.cool"), - count=200000000, + count=20000000, + nproc=3 ) clr_result = cooler.Cooler( op.join(request.fspath.dirname, "data/CN.mm9.1000kb.test_sampled.cool") ) # Test that deviation from expected total is very small - testing.assert_allclose(clr_result.info["sum"], 200000000, rtol=1e-3) + testing.assert_allclose(clr_result.info["sum"], 20000000, rtol=1e-2) def test_sample_exact(request): @@ -40,23 +42,25 @@ def test_sample_exact(request): cooltools.api.sample.sample( clr, op.join(request.fspath.dirname, "data/CN.mm9.10000kb.test_sampled.cool"), - frac=0.5, + frac=0.2, exact=True, + nproc=3 ) clr_result = cooler.Cooler( op.join(request.fspath.dirname, "data/CN.mm9.10000kb.test_sampled.cool") ) # Test that result matches expectation exactly - testing.assert_equal(clr_result.info["sum"], round(clr.info["sum"] * 0.5)) + testing.assert_equal(clr_result.info["sum"], round(clr.info["sum"] * 0.2)) cooltools.api.sample.sample( clr, op.join(request.fspath.dirname, "data/CN.mm9.10000kb.test_sampled.cool"), - count=200000000, + count=2000000, exact=True, + nproc=3 ) clr_result = cooler.Cooler( op.join(request.fspath.dirname, "data/CN.mm9.10000kb.test_sampled.cool") ) # Test that result matches expectation exactly - testing.assert_equal(clr_result.info["sum"], 200000000) + testing.assert_equal(clr_result.info["sum"], 2000000) diff --git a/tests/test_snipping.py b/tests/test_snipping.py index 4406d8ff..603da4f3 100644 --- a/tests/test_snipping.py +++ b/tests/test_snipping.py @@ -110,6 +110,9 @@ def test_pileup(request): # Check that NaNs were propagated assert np.all(np.isnan(stack[0, 2, :])) assert not np.all(np.isnan(stack)) + # check multiprocessed result + stack_pooled = cooltools.api.snipping.pileup(clr, windows, view_df=None, flank=None, nproc=3) + assert np.array_equal(stack, stack_pooled, equal_nan=True) stack = cooltools.api.snipping.pileup( clr, windows, view_df=view_df, expected_df=exp, flank=None @@ -117,6 +120,7 @@ def test_pileup(request): # Check that the size of snips is OK and there are two of them. # Now with view and expected: assert stack.shape == (2, 5, 5) + # II. 
# Example off-diagonal features, two features from annotated genomic regions: @@ -135,6 +139,11 @@ def test_pileup(request): ) # Check that the size of snips is OK and there are two of them: assert stack.shape == (2, 5, 5) + # check multiprocessed result + stack_pooled = cooltools.api.snipping.pileup( + clr, windows, view_df=view_df, expected_df=exp, flank=None, nproc=3 + ) + assert np.array_equal(stack, stack_pooled, equal_nan=True) # III. # Example off-diagonal features, one region outside the view: @@ -167,6 +176,8 @@ def test_pileup(request): ) with pytest.raises(ValueError): stack = cooltools.api.snipping.pileup(clr, windows, view_df, exp, flank=None) + with pytest.raises(ValueError): + stack_pooled = cooltools.api.snipping.pileup(clr, windows, view_df, exp, flank=None, nproc=3) # DRAFT # Should work with force=True: # stack = cooltools.api.snipping.pileup(clr, windows, view_df, exp, flank=None, force=True) diff --git a/tests/test_virtual4c.py b/tests/test_virtual4c.py index df596fdd..1d3d2b11 100644 --- a/tests/test_virtual4c.py +++ b/tests/test_virtual4c.py @@ -16,6 +16,10 @@ def test_virtual4c(request): assert v4c.shape[0] == clr.bins()[:].shape[0] + # check multiprocessed result + pooled_v4c = virtual4c.virtual4c(clr, viewpoint, nproc=3) + assert v4c.equals(pooled_v4c) + def test_virtual4c_cli(request, tmpdir):
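
A minimal usage sketch of the pool_decorator contract defined in cooltools/lib/common.py above: a decorated function must expose nproc and map_functor keyword arguments, and the decorator substitutes map_functor with multiprocess.Pool(nproc).imap when nproc > 1 and closes the pool afterwards. The _double and doubled_sum helpers below are hypothetical illustrations, not part of this patch:

    from multiprocess import Pool
    from cooltools.lib.common import pool_decorator

    def _double(x):
        return 2 * x

    @pool_decorator
    def doubled_sum(values, nproc=1, map_functor=map):
        # map_functor arrives from the decorator: the builtin map when
        # nproc=1, or multiprocess.Pool(nproc).imap when nproc>1
        return sum(map_functor(_double, values))

    doubled_sum(range(10), nproc=1)   # serial: builtin map
    doubled_sum(range(10), nproc=3)   # decorator manages Pool(3) and closes it
    with Pool(2) as pool:
        # a caller-supplied map_functor bypasses the decorator's pool
        # management entirely; nproc is ignored in this case
        doubled_sum(range(10), map_functor=pool.imap)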