Commit

Merge branch 'development'

parashardhapola committed Nov 22, 2023
2 parents c1ffe23 + e8f4482 commit 480f8af

Showing 7 changed files with 70 additions and 33 deletions.
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
0.28.6
0.28.7
5 changes: 3 additions & 2 deletions scarf/assay.py
@@ -974,10 +974,11 @@ def col_renamer(x):
min_mean = 2**min_mean
if min_var != -np.Inf:
min_var = 2**min_var

if blacklist != "":
bl = self.feats.index_to_bool(
self.feats.get_index_by(self.feats.grep(blacklist), "names"), invert=True
self.feats.get_index_by(self.feats.grep(blacklist), "names"),
invert=True,
)
else:
bl = np.ones(self.feats.N).astype(bool)
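
Note: for reference, a standalone sketch of what this blacklist mask computes, with made-up feature names and a made-up pattern (the real code delegates to `feats.grep` and `feats.index_to_bool`):

```python
import numpy as np

# Illustrative stand-ins for feats.grep() and feats.index_to_bool(invert=True):
# match features by name, then build a boolean mask that EXCLUDES the matches.
names = np.array(["MT-CO1", "ACTB", "MT-ND1", "GAPDH"])
matched_idx = np.array([i for i, n in enumerate(names) if n.startswith("MT-")])

keep = np.ones(names.size, dtype=bool)  # start with all features kept
keep[matched_idx] = False               # invert=True: drop blacklisted features
print(keep)  # [False  True False  True]
```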
45 changes: 31 additions & 14 deletions scarf/datastore/datastore.py
@@ -330,6 +330,7 @@ def run_marker_search(
from_assay: Optional[str] = None,
group_key: Optional[str] = None,
cell_key: Optional[str] = None,
feat_key: Optional[str] = None,
gene_batch_size: int = 50,
use_prenormed: bool = False,
prenormed_store: Optional[str] = None,
@@ -347,6 +348,7 @@ def run_marker_search(
how the cells will be grouped. Usually this would be a column denoting cell clusters.
cell_key: To run the test on specific subset of cells, provide the name of a boolean column in
the cell metadata table. (Default value: 'I')
feat_key: To run the test on a specific subset of features, provide the name of a boolean column in
the feature metadata table. (Default value: 'I')
gene_batch_size: Number of genes to be loaded in memory at a time. All cells (from cell_key) are loaded for
this number of genes at a time.
use_prenormed: If True then prenormalized cache generated using Assay.save_normed_for_query is used.
@@ -367,17 +369,20 @@ def run_marker_search(
)
if cell_key is None:
cell_key = "I"
if feat_key is None:
feat_key = "I"
if n_threads is None:
n_threads = self.nthreads
assay = self._get_assay(from_assay)
markers = find_markers_by_rank(
assay,
group_key,
cell_key,
gene_batch_size,
use_prenormed,
prenormed_store,
n_threads,
assay=assay,
group_key=group_key,
cell_key=cell_key,
feat_key=feat_key,
batch_size=gene_batch_size,
use_prenormed=use_prenormed,
prenormed_store=prenormed_store,
n_threads=n_threads,
**norm_params,
)
z = self.zw[assay.name]
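
Note: for context, a hypothetical call showing where the new `feat_key` parameter fits; the zarr path and the `"RNA_cluster"` column name are placeholders, not from this diff:

```python
import scarf

ds = scarf.DataStore("data.zarr")  # placeholder path
ds.run_marker_search(
    group_key="RNA_cluster",  # column with cluster labels (placeholder name)
    cell_key="I",             # default: all cells marked valid
    feat_key="I",             # new in this commit; default: all valid features
)
```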
@@ -397,6 +402,7 @@ def run_pseudotime_marker_search(
self,
from_assay: Optional[str] = None,
cell_key: Optional[str] = None,
feat_key: Optional[str] = None,
pseudotime_key: Optional[str] = None,
min_cells: int = 10,
gene_batch_size: int = 50,
@@ -412,6 +418,7 @@ def run_pseudotime_marker_search(
from_assay: Name of the assay to be used. If no value is provided then the default assay will be used.
cell_key: To run the test on specific subset of cells, provide the name of a boolean column in
the cell metadata table. (Default value: 'I')
feat_key: To run the test on a specific subset of features, provide the name of a boolean column in
the feature metadata table. (Default value: 'I')
pseudotime_key: Required parameter. This has to be a column name from cell metadata table. This column
contains values for pseudotime ordering of the cells.
min_cells: Minimum number of cells where a gene should have non-zero value to be considered for test.
@@ -431,10 +438,18 @@ def run_pseudotime_marker_search(
)
if cell_key is None:
cell_key = "I"
if feat_key is None:
feat_key = "I"
assay = self._get_assay(from_assay)
ptime = assay.cells.fetch(pseudotime_key, key=cell_key)
markers = find_markers_by_regression(
assay, cell_key, ptime, min_cells, gene_batch_size, **norm_params
assay=assay,
cell_key=cell_key,
feat_key=feat_key,
regressor=ptime,
min_cells=min_cells,
batch_size=gene_batch_size,
**norm_params,
)
assay.feats.insert(
f"{cell_key}__{pseudotime_key}__r",
@@ -463,7 +478,7 @@ def run_pseudotime_aggregation(
n_clusters: int = 10,
batch_size: int = 100,
ann_params: Optional[dict] = None,
nan_cluster_value: Union[int, str] = -1
nan_cluster_value: Union[int, str] = -1,
) -> None:
"""This method performs clustering of features based on pseudotime
ordered cells. The values from the pseudotime ordered cells are
@@ -675,9 +690,7 @@ def export_markers_to_csv(
raise ValueError(
"ERROR: Please provide a value for parameter `csv_filename`"
)
from_assay, cell_key, _ = self._get_latest_keys(
from_assay, cell_key, None
)
from_assay, cell_key, _ = self._get_latest_keys(from_assay, cell_key, None)
clusters = self.cells.fetch(group_key, key=cell_key)
markers_table = {}
for group_id in sorted(set(clusters)):
@@ -768,7 +781,9 @@ def run_cell_cycle_scoring(
g2m_score_label = self._col_renamer(from_assay, cell_key, g2m_score_label)
self.cells.insert(g2m_score_label, g2m_score, key=cell_key, overwrite=True)

phase = pd.Series(["S" for _ in range(self.cells.fetch(cell_key, key=cell_key).sum())])
phase = pd.Series(
["S" for _ in range(self.cells.fetch(cell_key, key=cell_key).sum())]
)
phase[g2m_score > s_score] = "G2M"
phase[(g2m_score < 0) & (s_score < 0)] = "G1"
phase_label = self._col_renamer(from_assay, cell_key, phase_label)
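
Note: the reformatted block above encodes a simple phase-call rule: every cell starts as "S", becomes "G2M" when its G2M score dominates, and "G1" when both scores are negative. A standalone sketch with made-up scores:

```python
import pandas as pd

s_score = pd.Series([0.8, -0.2, 0.1, -0.5])    # made-up S-phase scores
g2m_score = pd.Series([0.1, -0.3, 0.9, 0.4])   # made-up G2M-phase scores

phase = pd.Series(["S"] * len(s_score))         # default: S phase
phase[g2m_score > s_score] = "G2M"              # G2M score dominates
phase[(g2m_score < 0) & (s_score < 0)] = "G1"   # both negative: not cycling
print(phase.tolist())  # ['S', 'G1', 'G2M', 'G2M']
```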
@@ -991,7 +1006,9 @@ def make_reps(v, n_reps: int, seed: int) -> List[np.ndarray]:
sec_groups_set = [None]
else:
sec_groups = self.cells.fetch_all(secondary_group_key)
sec_groups_set = sorted(set(self.cells.fetch(secondary_group_key, key=cell_key)))
sec_groups_set = sorted(
set(self.cells.fetch(secondary_group_key, key=cell_key))
)

assay = self._get_assay(from_assay)

4 changes: 3 additions & 1 deletion scarf/datastore/graph_datastore.py
@@ -1914,7 +1914,9 @@ def pseudo_inverse(lap, k, rseed, r):
)
ss_vec = np.ones(graph.shape[0])
else:
clusts = pd.Series(self.cells.fetch(source_sink_key, key=cell_key)[cell_idx])
clusts = pd.Series(
self.cells.fetch(source_sink_key, key=cell_key)[cell_idx]
)
if sources is None:
sources = []
else:
20 changes: 14 additions & 6 deletions scarf/markers.py
@@ -25,6 +25,7 @@ def find_markers_by_rank(
assay: Assay,
group_key: str,
cell_key: str,
feat_key: str,
batch_size: int,
use_prenormed: bool,
prenormed_store: Optional[str],
@@ -37,6 +38,7 @@ def find_markers_by_rank(
assay:
group_key:
cell_key:
feat_key:
batch_size:
use_prenormed:
prenormed_store:
@@ -141,11 +143,15 @@ def prenormed_mean_rank_wrapper(gene_idx):
return results
else:
batch_iterator = assay.iter_normed_feature_wise(
cell_key, "I", batch_size, "Finding markers", **norm_params
cell_key=cell_key,
feat_key=feat_key,
batch_size=batch_size,
msg="Finding markers",
**norm_params,
)
temp = np.vstack([calc(x) for x in batch_iterator])
results = {}
feat_index = assay.feats.active_index("I")
feat_index = assay.feats.active_index(feat_key)
for n, i in enumerate(group_set):
results[i] = (
pd.DataFrame(temp[:, n, :], columns=out_cols[1:], index=feat_index)
@@ -160,6 +166,7 @@ def find_markers_by_regression(
def find_markers_by_regression(
assay: Assay,
cell_key: str,
feat_key: str,
regressor: np.ndarray,
min_cells: int,
batch_size: int = 50,
@@ -170,6 +177,7 @@ def find_markers_by_regression(
Args:
assay:
cell_key:
feat_key:
regressor:
min_cells:
batch_size:
@@ -180,10 +188,10 @@

res = {}
for df in assay.iter_normed_feature_wise(
cell_key,
"I",
batch_size,
"Finding correlated features",
cell_key=cell_key,
feat_key=feat_key,
batch_size=batch_size,
msg="Finding correlated features",
**norm_params,
):
for i in df:
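
Note: a side remark on why this commit converts these calls from positional to keyword arguments: inserting `feat_key` into the middle of a signature silently re-binds stale positional calls. A minimal illustration (not scarf code):

```python
def old_api(cell_key, batch_size):
    return {"cell_key": cell_key, "batch_size": batch_size}

def new_api(cell_key, feat_key, batch_size):
    return {"cell_key": cell_key, "feat_key": feat_key, "batch_size": batch_size}

# A stale positional call keeps running but binds 50 to the wrong parameter:
print(new_api("I", 50, 100))  # feat_key silently becomes 50
# Keyword calls stay correct when the signature grows:
print(new_api(cell_key="I", feat_key="I", batch_size=50))
```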
24 changes: 17 additions & 7 deletions scarf/readers.py
@@ -249,7 +249,9 @@ def _get_valid_barcodes(
data = self.grp["data"][idx[0] : idx[-1]]
indices = self.grp["indices"][idx[0] : idx[-1]]
cell_idx = np.repeat(range(len(idx) - 1), np.diff(idx))
mat = coo_matrix((data, (cell_idx, indices)), shape=(len(idx) - 1, self.nFeatures))
mat = coo_matrix(
(data, (cell_idx, indices)), shape=(len(idx) - 1, self.nFeatures)
)
valid_idx.append(np.array(mat.sum(axis=1)).T[0] > filtering_cutoff)
test_counter += data.shape[0]
assert test_counter == self.grp["data"].shape[0]
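
Note: the `np.repeat(range(len(idx) - 1), np.diff(idx))` idiom above expands an indptr-style offset array into one row index per stored value, exactly the layout `coo_matrix` expects. A worked toy example (values invented for illustration):

```python
import numpy as np
from scipy.sparse import coo_matrix

idx = np.array([0, 2, 5, 6])             # offsets: cells hold 2, 3, and 1 entries
data = np.array([7, 1, 3, 2, 5, 4])      # non-zero counts
indices = np.array([0, 3, 1, 2, 3, 0])   # feature (column) index of each count

cell_idx = np.repeat(range(len(idx) - 1), np.diff(idx))
print(cell_idx)  # [0 0 1 1 1 2] -> row index for every stored value

mat = coo_matrix((data, (cell_idx, indices)), shape=(len(idx) - 1, 4))
print(mat.toarray())
# [[7 0 0 1]
#  [0 3 2 5]
#  [4 0 0 0]]
```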
@@ -280,7 +282,9 @@ def consume(
data = data[idx - idx[0]]
indices = self.grp["indices"][idx[0] : idx[-1] + 1]
indices = indices[idx - idx[0]]
yield coo_matrix((data, (cell_idx, indices)), shape=(len(v_pos), self.nFeatures))
yield coo_matrix(
(data, (cell_idx, indices)), shape=(len(v_pos), self.nFeatures)
)

def close(self) -> None:
"""Closes file connection."""
@@ -392,22 +396,25 @@ def to_sparse(self, a: np.ndarray, dtype) -> coo_matrix:
(a[:, 0] + self.indexOffset).astype(int),
),
),
shape=(c[-1]+1, self.nFeatures),
shape=(c[-1] + 1, self.nFeatures),
dtype=dtype,
)

# noinspection DuplicatedCode
def consume(
self, batch_size: int, lines_in_mem: int = int(1e5), dtype=np.uint32,
self,
batch_size: int,
lines_in_mem: int = int(1e5),
dtype=np.uint32,
) -> Generator[coo_matrix, None, None]:
stream = pd.read_csv(
self.matFn, skiprows=3, sep=self.sep, header=None, chunksize=lines_in_mem
)
start = 1
dfs = []
for df in stream:
if df.iloc[-1, 1] - start > batch_size:
idx = df[1] < batch_size + start
if (df.iloc[-1, 1] - start) >= batch_size:
idx = df[1] < (batch_size + start)
dfs.append(df[idx])
yield self.to_sparse(np.vstack(dfs), dtype=dtype)
dfs = [df[~idx]]
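
Note: the `>` to `>=` change above fixes an off-by-one in the flush condition. Assuming column 1 holds sorted 1-based cell indices (as the `start = 1` initialization suggests), a batch of `batch_size` cells is already complete the moment a row for the next cell appears:

```python
start, batch_size = 1, 100
last_cell_in_chunk = 101  # first cell of the NEXT batch has appeared

# Old test: misses the flush even though cells 1..100 are all complete.
print(last_cell_in_chunk - start > batch_size)   # False
# Fixed test: flushes the completed batch on time.
print(last_cell_in_chunk - start >= batch_size)  # True
```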
@@ -717,7 +724,10 @@ def consume_group(self, batch_size: int) -> Generator[coo_matrix, None, None]:
idx = np.array(i)
n = idx.shape[0] - 1
nidx = np.repeat(range(n), np.diff(idx).astype("int32"))
yield coo_matrix((grp["data"][s:e], (nidx, grp["indices"][s:e])), shape=(n, self.nFeatures))
yield coo_matrix(
(grp["data"][s:e], (nidx, grp["indices"][s:e])),
shape=(n, self.nFeatures),
)
s = e

def consume(self, batch_size: int):
3 changes: 1 addition & 2 deletions scarf/writers.py
@@ -1339,11 +1339,10 @@ def _ini_cell_data(self, overwrite) -> None:
logger.info(f"cellData already exists so skipping _ini_cell_data")

def _dask_to_coo(self, d_arr, order: np.ndarray, n_threads: int) -> coo_matrix:
mat = np.zeros((d_arr.shape[0], self.nFeats))
mat = np.zeros((d_arr.shape[0], self.nFeats))
mat[:, order] = controlled_compute(d_arr, n_threads)
return coo_matrix(mat)
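
Note: the duplicated `np.zeros` allocation is dropped above; the remaining scatter writes each computed column back to its original position before sparsifying. A standalone sketch with invented values:

```python
import numpy as np
from scipy.sparse import coo_matrix

computed = np.array([[10, 20, 30],
                     [40, 50, 60]])
order = np.array([2, 0, 1])   # column i of `computed` belongs at position order[i]

mat = np.zeros((2, 3))
mat[:, order] = computed      # fancy-index scatter restores original column order
print(mat)
# [[20. 30. 10.]
#  [50. 60. 40.]]
print(coo_matrix(mat).nnz)    # 6 stored values after conversion
```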


def dump(self, nthreads=2):
"""Copy the values from individual assays to the merged assay.