Skip to content

Commit

Permalink
Add pool decorator to functions (#489)
Browse files Browse the repository at this point in the history
* add pool decorator to previous functions

* Fix pool_decorator

* add pool_decorator to dotfinder.py and expected.py

* combined import command for pool_decorator with others

* add tests for pooled functions to check if outputs are the same as using regular map

* changed lambda func to a def func

* try to set start method for multiprocessing

* test pool.map insteand of pool.imap

* switch multiprocessing to multiprocess

* swith to  multiprocess

* change job back to lambda

* change transforms back to lambda function

* streamlined tests for multiprocess

* move multiprocess code from cli/coverage and cli/sample to api

* add nproc to test_sample and reduce the random sampling size

* decrease test_sample sampling value

* change frac to 0.1 in test_sample

* regulate tolerance in test_sample

* 1.move logging into decorator 2. use map_functor as the argument name 3. add explicit docstring to pool_decorator 4. all pool.terminate() 5. support third-party map functor

* remove unused imports

* remove unused imports in cli

* unified definitions of nproc and map_functor

* Update pool_decorator docstring

---------

Co-authored-by: Nezar Abdennur <nabdennur@gmail.com>
  • Loading branch information
Yaoyx and nvictus authored Feb 7, 2024
1 parent 6280617 commit 41340d4
Show file tree
Hide file tree
Showing 26 changed files with 274 additions and 247 deletions.
27 changes: 16 additions & 11 deletions cooltools/api/coverage.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import numpy as np
import cooler.parallel
from ..lib.checks import is_cooler_balanced
from ..lib.common import pool_decorator



def _apply_balancing(chunk, bias, balanced_column_name='balanced'):
Expand Down Expand Up @@ -60,16 +62,17 @@ def _get_chunk_coverage(chunk, pixel_weight_key="count"):

return covs


@pool_decorator
def coverage(
clr,
ignore_diags=None,
chunksize=int(1e7),
map=map,
use_lock=False,
clr_weight_name=None,
store=False,
store_prefix="cov",
nproc=1,
map_functor=map,
):

"""
Expand All @@ -83,25 +86,27 @@ def coverage(
----------
clr : cooler.Cooler
Cooler object
ignore_diags : int, optional
Drop elements occurring on the first ``ignore_diags`` diagonals of the
matrix (including the main diagonal).
If None, equals the number of diagonals ignored during IC balancing.
chunksize : int, optional
Split the contact matrix pixel records into equally sized chunks to
save memory and/or parallelize. Default is 10^7
map : callable, optional
Map function to dispatch the matrix chunks to workers.
Default is the builtin ``map``, but alternatives include parallel map
implementations from a multiprocessing pool.
clr_weight_name : str
Name of the weight column. Specify to calculate coverage of balanced cooler.
ignore_diags : int, optional
Drop elements occurring on the first ``ignore_diags`` diagonals of the
matrix (including the main diagonal).
If None, equals the number of diagonals ignored during IC balancing.
store : bool, optional
If True, store the results in the input cooler file when finished. If clr_weight_name=None,
also stores total cis counts in the cooler info. Default is False.
store_prefix : str, optional
Name prefix of the columns of the bin table to save cis and total coverages.
Will add suffixes _cis and _tot, as well as _raw in the default case or _clr_weight_name if specified.
nproc : int, optional
How many processes to use for calculation. Ignored if map_functor is passed.
map_functor : callable, optional
Map function to dispatch the matrix chunks to workers.
If left unspecified, pool_decorator applies the following defaults: if nproc>1 this defaults to multiprocess.Pool;
If nproc=1 this defaults the builtin map.
Returns
-------
Expand Down Expand Up @@ -136,7 +141,7 @@ def coverage(
f"balancing weight {clr_weight_name} is not available in the cooler."
)

chunks = cooler.parallel.split(clr, chunksize=chunksize, map=map, use_lock=use_lock)
chunks = cooler.parallel.split(clr, chunksize=chunksize, map=map_functor, use_lock=use_lock)

if ignore_diags:
chunks = chunks.pipe(_zero_diags, n_diags=ignore_diags)
Expand Down
2 changes: 0 additions & 2 deletions cooltools/api/directionality.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import warnings
import numpy as np
import pandas as pd
from ..lib import peaks, numutils


def _dirscore(
pixels, bins, window=10, ignore_diags=2, balanced=True, signed_chi2=False
Expand Down
43 changes: 14 additions & 29 deletions cooltools/api/dotfinder.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,12 @@
"""

from functools import partial, reduce
import multiprocess as mp
import logging
import warnings
import time

from scipy.linalg import toeplitz
from scipy.ndimage import convolve
from scipy.stats import poisson
from scipy.sparse import coo_matrix
import numpy as np
import pandas as pd
from sklearn.cluster import Birch
Expand All @@ -96,8 +93,8 @@
is_compatible_viewframe,
is_valid_expected,
)
from ..lib.common import pool_decorator

from bioframe import make_viewframe

warnings.simplefilter(action="ignore", category=pd.errors.PerformanceWarning)
logging.basicConfig(level=logging.INFO)
Expand Down Expand Up @@ -1265,7 +1262,7 @@ def cluster_filtering_hiccups(
# large helper functions wrapping smaller step-specific ones
####################################################################


@pool_decorator
def scoring_and_histogramming_step(
clr,
expected_indexed,
Expand All @@ -1277,6 +1274,7 @@ def scoring_and_histogramming_step(
max_nans_tolerated,
loci_separation_bins,
nproc,
map_functor=map,
):
"""
This implements the 1st step of the lambda-binning scoring procedure - histogramming.
Expand Down Expand Up @@ -1306,21 +1304,13 @@ def scoring_and_histogramming_step(

# standard multiprocessing implementation
if nproc > 1:
logging.info(f"creating a Pool of {nproc} workers to tackle {len(tiles)} tiles")
pool = mp.Pool(nproc)
map_ = pool.imap
map_kwargs = dict(chunksize=int(np.ceil(len(tiles) / nproc)))
else:
logging.info("fallback to serial implementation.")
map_ = map
map_kwargs = {}
try:
# consider using
# https://github.com/mirnylab/cooler/blob/9e72ee202b0ac6f9d93fd2444d6f94c524962769/cooler/tools.py#L59
histogram_chunks = map_(job, tiles, **map_kwargs)
finally:
if nproc > 1:
pool.close()

# consider using
# https://github.com/mirnylab/cooler/blob/9e72ee202b0ac6f9d93fd2444d6f94c524962769/cooler/tools.py#L59
histogram_chunks = map_functor(job, tiles, **map_kwargs)

# now we need to combine/sum all of the histograms for different kernels:
def _sum_hists(hx, hy):
Expand Down Expand Up @@ -1351,7 +1341,7 @@ def _sum_hists(hx, hy):
# returning filtered histogram
return final_hist


@pool_decorator
def scoring_and_extraction_step(
clr,
expected_indexed,
Expand All @@ -1366,6 +1356,7 @@ def scoring_and_extraction_step(
nproc,
bin1_id_name="bin1_id",
bin2_id_name="bin2_id",
map_functor=map,
):
"""
This implements the 2nd step of the lambda-binning scoring procedure,
Expand Down Expand Up @@ -1402,21 +1393,15 @@ def scoring_and_extraction_step(
# standard multiprocessing implementation
if nproc > 1:
logging.info(f"creating a Pool of {nproc} workers to tackle {len(tiles)} tiles")
pool = mp.Pool(nproc)
map_ = pool.imap
map_kwargs = dict(chunksize=int(np.ceil(len(tiles) / nproc)))
else:
logging.info("fallback to serial implementation.")
map_ = map
map_kwargs = {}
try:
# consider using
# https://github.com/mirnylab/cooler/blob/9e72ee202b0ac6f9d93fd2444d6f94c524962769/cooler/tools.py#L59
filtered_pix_chunks = map_(job, tiles, **map_kwargs)
significant_pixels = pd.concat(filtered_pix_chunks, ignore_index=True)
finally:
if nproc > 1:
pool.close()

# TODO: try using cooler.parallel.MultiplexDataPipe for pipelining the steps
filtered_pix_chunks = map_functor(job, tiles, **map_kwargs)
significant_pixels = pd.concat(filtered_pix_chunks, ignore_index=True)

# same pixels should never be scored >1 times with the current tiling of the interactions matrix
if significant_pixels.duplicated().any():
raise ValueError(
Expand Down
97 changes: 45 additions & 52 deletions cooltools/api/expected.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from functools import partial

import warnings
import multiprocess as mp

import numpy as np
import pandas as pd
Expand All @@ -18,6 +17,7 @@
from ..lib.schemas import diag_expected_dtypes, block_expected_dtypes

from ..sandbox import expected_smoothing
from ..lib.common import pool_decorator

# common expected_df column names, take from schemas
_REGION1 = list(diag_expected_dtypes)[0]
Expand Down Expand Up @@ -877,6 +877,7 @@ def blocksum_pairwise(


# user-friendly wrapper for diagsum_symm and diagsum_pairwise - part of new "public" API
@pool_decorator
def expected_cis(
clr,
view_df=None,
Expand All @@ -888,6 +889,7 @@ def expected_cis(
ignore_diags=2, # should default to cooler info
chunksize=10_000_000,
nproc=1,
map_functor=map,
):
"""
Calculate average interaction frequencies as a function of genomic
Expand Down Expand Up @@ -932,7 +934,11 @@ def expected_cis(
chunksize : int, optional
Size of pixel table chunks to process
nproc : int, optional
How many processes to use for calculation
How many processes to use for calculation. Ignored if map_functor is passed.
map_functor : callable, optional
Map function to dispatch the matrix chunks to workers.
If left unspecified, pool_decorator applies the following defaults: if nproc>1 this defaults to multiprocess.Pool;
If nproc=1 this defaults the builtin map.
Returns
-------
Expand Down Expand Up @@ -972,44 +978,34 @@ def expected_cis(
weight1 = clr_weight_name + "1"
weight2 = clr_weight_name + "2"
transforms = {"balanced": lambda p: p["count"] * p[weight1] * p[weight2]}

else:
raise ValueError(
"cooler is not balanced, or"
f"balancing weight {clr_weight_name} is not available in the cooler."
)

# execution details
if nproc > 1:
pool = mp.Pool(nproc)
map_ = pool.map
else:
map_ = map

# using try-clause to close mp.Pool properly
try:
if intra_only:
result = diagsum_symm(
clr,
view_df,
transforms=transforms,
clr_weight_name=clr_weight_name,
ignore_diags=ignore_diags,
chunksize=chunksize,
map=map_,
)
else:
result = diagsum_pairwise(
clr,
view_df,
transforms=transforms,
clr_weight_name=clr_weight_name,
ignore_diags=ignore_diags,
chunksize=chunksize,
map=map_,
)
finally:
if nproc > 1:
pool.close()
if intra_only:
result = diagsum_symm(
clr,
view_df,
transforms=transforms,
clr_weight_name=clr_weight_name,
ignore_diags=ignore_diags,
chunksize=chunksize,
map=map_functor,
)
else:
result = diagsum_pairwise(
clr,
view_df,
transforms=transforms,
clr_weight_name=clr_weight_name,
ignore_diags=ignore_diags,
chunksize=chunksize,
map=map_functor,
)

# calculate actual averages by dividing sum by n_valid:
for key in chain(["count"], transforms):
Expand Down Expand Up @@ -1046,12 +1042,14 @@ def expected_cis(


# user-friendly wrapper for diagsum_symm and diagsum_pairwise - part of new "public" API
@pool_decorator
def expected_trans(
clr,
view_df=None,
clr_weight_name="weight",
chunksize=10_000_000,
nproc=1,
map_functor=map,
):
"""
Calculate average interaction frequencies for inter-chromosomal
Expand Down Expand Up @@ -1080,7 +1078,11 @@ def expected_trans(
chunksize : int, optional
Size of pixel table chunks to process
nproc : int, optional
How many processes to use for calculation
How many processes to use for calculation. Ignored if map_functor is passed.
map_functor : callable, optional
Map function to dispatch the matrix chunks to workers.
If left unspecified, pool_decorator applies the following defaults: if nproc>1 this defaults to multiprocess.Pool;
If nproc=1 this defaults the builtin map.
Returns
-------
Expand Down Expand Up @@ -1115,32 +1117,23 @@ def expected_trans(
weight1 = clr_weight_name + "1"
weight2 = clr_weight_name + "2"
transforms = {"balanced": lambda p: p["count"] * p[weight1] * p[weight2]}

else:
raise ValueError(
"cooler is not balanced, or"
f"balancing weight {clr_weight_name} is not available in the cooler."
)

# execution details
if nproc > 1:
pool = mp.Pool(nproc)
map_ = pool.map
else:
map_ = map

# using try-clause to close mp.Pool properly
try:
result = blocksum_pairwise(
clr,
view_df,
transforms=transforms,
clr_weight_name=clr_weight_name,
chunksize=chunksize,
map=map_,
)
finally:
if nproc > 1:
pool.close()
result = blocksum_pairwise(
clr,
view_df,
transforms=transforms,
clr_weight_name=clr_weight_name,
chunksize=chunksize,
map=map_functor,
)

# keep only trans interactions for the user-friendly function:
_name_to_region = view_df.set_index("name")
Expand Down
Loading

0 comments on commit 41340d4

Please sign in to comment.