Skip to content

Commit

Permalink
Merge pull request #39 from FRBs/pulsar
Browse files: browse the repository at this point in the history
Pulsar-1
  • Loading branch information
pravirkr authored Aug 21, 2024
2 parents 090f5bc + 5fbbe24 commit 6ac524c
Show file tree
Hide file tree
Showing 27 changed files with 3,186 additions and 1,255 deletions.
18 changes: 14 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies = [
"numpy",
"numba",
"astropy",
"matplotlib",
"h5py",
"bottleneck",
"attrs",
Expand Down Expand Up @@ -61,7 +62,14 @@ spp_extract = "sigpyproc.apps.spp_extract:main"
spp_clean = "sigpyproc.apps.spp_clean:main"

[tool.ruff]
include = ["pyproject.toml", "sigpyproc/**/*.py"]
include = [
"pyproject.toml",
"sigpyproc/**/*.py",
"tests/**/*.py",
]

exclude = ["sigpyproc/apps/spp_digifil.py"]

line-length = 88
indent-width = 4
target-version = "py39"
Expand All @@ -75,12 +83,12 @@ select = ["ALL"]
ignore = ["D1", "ANN1", "PLR2004", "G004"]

[tool.ruff.lint.pylint]
max-args = 10
max-args = 15

[tool.ruff.lint.pydocstyle]
convention = "numpy"

[tool.ruff.per-file-ignores]
[tool.ruff.lint.per-file-ignores]
"tests/**/*.py" = ["S101", "FBT", "PLR2004", "PT011", "SLF001"]

[tool.pytest.ini_options]
Expand All @@ -91,7 +99,7 @@ testpaths = "tests"
source = ["./sigpyproc/"]

[tool.coverage.run]
omit = ["tests/*", "docs/*", "*__init__.py", "sigpyproc/core/kernels.py"]
omit = ["tests/*", "docs/*", "*__init__.py"]

[tool.coverage.report]
show_missing = true
Expand All @@ -100,10 +108,12 @@ ignore_errors = true
exclude_lines = [
'raise NotImplementedError',
'if TYPE_CHECKING:',
'except ModuleNotFoundError:',
'if __name__ == "__main__":',
'if outfile_name is None:',
]

[tool.mypy]
enable_incomplete_feature = ["Unpack"]
ignore_missing_imports = true
plugins = ["numpy.typing.mypy_plugin"]
32 changes: 13 additions & 19 deletions sigpyproc/apps/spp_digifil.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,26 +74,20 @@
help="Output filename",
)
def main(
filfile,
cont,
nbits,
block_size,
rescale_constant,
tscrunch_factor,
fscrunch_factor,
rescale_seconds,
scale_fac,
apply_FITS_scale_and_offset,
):
filfile: str,
cont: bool,
nbits: int,
block_size: int,
rescale_constant: bool,
tscrunch_factor: int,
fscrunch_factor: int,
rescale_seconds: float,
scale_fac: float,
apply_FITS_scale_and_offset: bool,
) -> None:
"""Convert to sigproc output digifil style."""
raise NotImplementedError("This function is not implemented yet.")
#logger = get_logger(__name__)
#nbytes_per_sample =
#gulpsize = block_size * 1024 * 1024 //
#logger.info(f"Reading {filfile}")
#fil = FilReader(filfile)
#fil.downsample(tfactor=tfactor, ffactor=ffactor, gulp=gulp, filename=outfile)

msg = "This function is not implemented yet."
raise NotImplementedError(msg)


if __name__ == "__main__":
Expand Down
156 changes: 99 additions & 57 deletions sigpyproc/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from contextlib import ExitStack
from typing import TYPE_CHECKING

import numpy as np
Expand Down Expand Up @@ -45,7 +46,13 @@ def header(self) -> Header:
""":class:`~sigpyproc.header.Header`: Header metadata of input file."""

@abstractmethod
def read_block(self, start: int, nsamps: int) -> FilterbankBlock:
def read_block(
self,
start: int,
nsamps: int,
fch1: float | None = None,
nchans: int | None = None,
) -> FilterbankBlock:
"""Read a data block from the filterbank file stream.
Parameters
Expand All @@ -54,6 +61,10 @@ def read_block(self, start: int, nsamps: int) -> FilterbankBlock:
first time sample of the block to be read
nsamps : int
number of samples in the block (i.e. block will be nsamps*nchans in size)
fch1 : float, optional
frequency of the first channel, by default None (header value)
nchans : int, optional
number of channels in the block, by default None (header value)
Returns
-------
Expand All @@ -63,7 +74,7 @@ def read_block(self, start: int, nsamps: int) -> FilterbankBlock:
Raises
------
ValueError
if requested samples are out of range
if requested samples or channels are out of range
"""

@abstractmethod
Expand Down Expand Up @@ -178,13 +189,13 @@ def compute_stats(
Keyword arguments for :func:`read_plan`.
"""
bag = ChannelStats(self.header.nchans, self.header.nsamples)
for nsamps_r, ii, data in self.read_plan(
for _, ii, data in self.read_plan(
gulp=gulp,
start=start,
nsamps=nsamps,
**plan_kwargs,
):
bag.push_data(data, nsamps_r, ii, mode="full")
bag.push_data(data, ii, mode="full")
self._chan_stats = bag

def compute_stats_basic(
Expand All @@ -208,13 +219,13 @@ def compute_stats_basic(
Keyword arguments for :func:`read_plan`.
"""
bag = ChannelStats(self.header.nchans, self.header.nsamples)
for nsamps_r, ii, data in self.read_plan(
for _, ii, data in self.read_plan(
gulp=gulp,
start=start,
nsamps=nsamps,
**plan_kwargs,
):
bag.push_data(data, nsamps_r, ii, mode="basic")
bag.push_data(data, ii, mode="basic")
self._chan_stats = bag

def collapse(
Expand Down Expand Up @@ -289,7 +300,10 @@ def bandpass(
kernels.extract_bpass(data, bpass_ar, self.header.nchans, nsamps_r)
num_samples += nsamps_r
bpass_ar /= num_samples
return TimeSeries(bpass_ar, self.header.new_header({"nchans": 1}))
return TimeSeries(
bpass_ar,
self.header.new_header({"nchans": 1, "nsamples": len(bpass_ar)}),
)

def dedisperse(
self,
Expand Down Expand Up @@ -345,7 +359,10 @@ def dedisperse(
nsamps_r,
ii * (gulp - max_delay),
)
return TimeSeries(tim_ar, self.header.new_header({"nchans": 1, "dm": dm}))
return TimeSeries(
tim_ar,
self.header.new_header({"nchans": 1, "dm": dm, "nsamples": tim_len}),
)

def read_chan(
self,
Expand Down Expand Up @@ -626,6 +643,7 @@ def extract_chans(
self,
chans: np.ndarray | None = None,
outfile_base: str | None = None,
batch_size: int = 200,
gulp: int = 16384,
start: int = 0,
nsamps: int | None = None,
Expand All @@ -639,6 +657,8 @@ def extract_chans(
channel numbers to extract, by default all channels
outfile_base : str, optional
base name of output files, by default ``header.basename``.
batch_size : int, optional
number of channels to extract in each batch, by default 200
gulp : int, optional
number of samples in each read, by default 16384
start : int, optional
Expand Down Expand Up @@ -671,30 +691,38 @@ def extract_chans(
raise ValueError(msg)
if outfile_base is None:
outfile_base = self.header.basename

filenames = [
f"{outfile_base}_chan{chans[ichan]:04d}.tim"
for ichan in range(nchans_extract)
]
out_files = [
self.header.prep_outfile(
filenames[ichan],
updates={"nchans": 1, "nbits": 32, "data_type": "time series"},
nbits=32,
)
for ichan in range(nchans_extract)
]
for nsamps_r, _ii, data in self.read_plan(
gulp=gulp,
start=start,
nsamps=nsamps,
**plan_kwargs,
):
data_2d = data.reshape(nsamps_r, self.header.nchans)
for ifile, out_file in enumerate(out_files):
out_file.cwrite(data_2d[:, chans[ifile]])
for out_file in out_files:
out_file.close()
filenames = [f"{outfile_base}_chan{chan:04d}.tim" for chan in chans]

        # Process in batches so that at most batch_size output files are open at once
for batch_start in range(0, nchans_extract, batch_size):
batch_end = min(batch_start + batch_size, nchans_extract)
batch_chans = chans[batch_start:batch_end]
batch_files = filenames[batch_start:batch_end]

with ExitStack() as stack:
out_files = [
stack.enter_context(
self.header.prep_outfile(
filename,
updates={
"nchans": 1,
"nbits": 32,
"data_type": "time series",
},
nbits=32,
),
)
for filename in batch_files
]
for nsamps_r, _, data in self.read_plan(
gulp=gulp,
start=start,
nsamps=nsamps,
**plan_kwargs,
):
data_2d = data.reshape(nsamps_r, self.header.nchans)
for ifile, out_file in enumerate(out_files):
out_file.cwrite(data_2d[:, batch_chans[ifile]])
return filenames

def extract_bands(
Expand All @@ -703,6 +731,7 @@ def extract_bands(
nchans: int,
chanpersub: int | None = None,
outfile_base: str | None = None,
batch_size: int = 200,
gulp: int = 16384,
start: int = 0,
nsamps: int | None = None,
Expand All @@ -720,6 +749,8 @@ def extract_bands(
number of channels in each sub-band, by default ``nchans``
outfile_base: str, optional
base name of output files, by default ``header.basename``.
batch_size: int, optional
number of sub-bands to extract in each batch, by default 200
gulp : int, optional
number of samples in each read, by default 16384
start : int, optional
Expand Down Expand Up @@ -767,31 +798,42 @@ def extract_bands(
outfile_base = self.header.basename

filenames = [f"{outfile_base}_sub{isub:02d}.fil" for isub in range(nsub)]
out_files = [
self.header.prep_outfile(
filenames[isub],
updates={
"nchans": chanpersub,
"fch1": fstart + isub * chanpersub * self.header.foff,
},
nbits=self.header.nbits,
)
for isub in range(nsub)
]

for nsamps_r, _ii, data in self.read_plan(
gulp=gulp,
start=start,
nsamps=nsamps,
**plan_kwargs,
):
data_2d = data.reshape(nsamps_r, self.header.nchans)
for ifile, out_file in enumerate(out_files):
iband_chanstart = chanstart + ifile * chanpersub
subband_ar = data_2d[:, iband_chanstart : iband_chanstart + chanpersub]
out_file.cwrite(subband_ar.ravel())
for out_file in out_files:
out_file.close()
        # Process in batches so that at most batch_size output files are open at once
for batch_start in range(0, nsub, batch_size):
batch_end = min(batch_start + batch_size, nsub)
batch_files = filenames[batch_start:batch_end]

with ExitStack() as stack:
out_files = [
stack.enter_context(
self.header.prep_outfile(
filename,
updates={
"nchans": chanpersub,
"fch1": fstart
+ (batch_start + i) * chanpersub * self.header.foff,
},
nbits=self.header.nbits,
),
)
for i, filename in enumerate(batch_files)
]

for nsamps_r, _ii, data in self.read_plan(
gulp=gulp,
start=start,
nsamps=nsamps,
**plan_kwargs,
):
data_2d = data.reshape(nsamps_r, self.header.nchans)
for ifile, out_file in enumerate(out_files):
iband_chanstart = chanstart + (batch_start + ifile) * chanpersub
subband_ar = data_2d[
:,
iband_chanstart : iband_chanstart + chanpersub,
]
out_file.cwrite(subband_ar.ravel())
return filenames

def requantize(
Expand Down Expand Up @@ -892,7 +934,7 @@ def remove_zerodm(
if outfile_name is None:
outfile_name = f"{self.header.basename}_noZeroDM.fil"

bpass = self.bandpass(**plan_kwargs)
bpass = self.bandpass(**plan_kwargs).data
chanwts = bpass / bpass.sum()
out_ar = np.empty(
self.header.nsamples * self.header.nchans,
Expand Down
Loading

0 comments on commit 6ac524c

Please sign in to comment.