Skip to content

Commit

Permalink
Add suuport for CLI stacking
Browse files Browse the repository at this point in the history
  • Loading branch information
robertdstein committed May 11, 2024
1 parent 967d998 commit 527c7e6
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 24 deletions.
2 changes: 1 addition & 1 deletion mirar/pipelines/base_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def get_error_output_path(self) -> Path:
"""
error_output_path = Path(
get_output_path(
base_name=f"{self.night}_error_stack.txt",
base_name=f"{Path(self.night).name}_error_stack.txt",
dir_root=self.night_sub_dir,
)
)
Expand Down
6 changes: 4 additions & 2 deletions mirar/pipelines/winter/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
LATEST_SAVE_KEY,
MAX_DITHER_KEY,
OBSCLASS_KEY,
PROC_HISTORY_KEY,
RAW_IMG_KEY,
SOURCE_HISTORY_KEY,
SOURCE_NAME_KEY,
Expand Down Expand Up @@ -134,6 +135,7 @@
from mirar.processors.utils import (
CustomImageBatchModifier,
HeaderAnnotator,
HeaderEditor,
ImageBatcher,
ImageDebatcher,
ImageLoader,
Expand Down Expand Up @@ -510,8 +512,8 @@
# Stack stacks together

stack_stacks = [
ImageLoader(input_sub_dir="final", input_img_dir=base_output_dir),
ImageRebatcher([BASE_NAME_KEY]),
HeaderEditor(PROC_HISTORY_KEY, "load"),
ImageSaver(output_dir_name="restack_masks", write_mask=True),
Sextractor(
**sextractor_astrometry_config,
Expand All @@ -529,7 +531,7 @@
ImageSaver(output_dir_name="post_scamp"),
ImageDebatcher(),
HeaderAnnotator(input_keys=["TARGNAME", "FIELDID"], output_key=TARGET_KEY),
ImageRebatcher(["SUBCOORD", "FILTER", "EXPTIME", "BOARD_ID", TARGET_KEY]),
ImageRebatcher(["SUBCOORD", "FILTER", TARGET_KEY]),
Swarp(
swarp_config_path=swarp_config_path,
calculate_dims_in_swarp=True,
Expand Down
173 changes: 173 additions & 0 deletions mirar/pipelines/winter/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
"""
Module containing WINTER CLI commands
"""

import argparse
import logging
import sys
import tempfile
from pathlib import Path

import numpy as np
from astropy.time import Time
from sqlalchemy import Select, text
from sqlalchemy.sql import func

from mirar.data import Dataset, ImageBatch, cache
from mirar.database.transactions.select import run_select
from mirar.io import open_raw_image
from mirar.paths import TEMP_DIR
from mirar.pipelines.winter.models import ExposuresTable, RawsTable, StacksTable
from mirar.pipelines.winter.winter_pipeline import WINTERPipeline
from mirar.processors.utils.image_loader import load_from_list

logger = logging.getLogger(__name__)


def run_winter(
config: str,
dataset: Dataset | None = None,
log_level: str = "INFO",
subdir: str = None,
):
"""
Run a WINTER pipeline
:param config:
:param dataset:
:param log_level:
:param subdir:
:return:
"""

# Set up logging

log = logging.getLogger("mirar")

handler = logging.StreamHandler(sys.stdout)

formatter = logging.Formatter(
"%(name)s [l %(lineno)d] - %(levelname)s - %(message)s"
)
handler.setFormatter(formatter)
log.addHandler(handler)
log.setLevel(log_level)

pipe = WINTERPipeline(
selected_configurations=config,
night=subdir,
)

_, errorstack = pipe.reduce_images(
dataset=dataset,
catch_all_errors=True,
)

errorstack.summarise_error_stack()


def run_stack_of_stacks():
"""
CLI wrapper for stacking pre-existing stack images of a target
:return:
"""
parser = argparse.ArgumentParser(description="Stack of stacks")
parser.add_argument("-t", "--target", help="Target to stack", type=str)
parser.add_argument("-f", "--fieldid", help="Field ID", type=str)
parser.add_argument("--level", help="Logging level", type=str, default="INFO")
parser.add_argument(
"--bindays", help="Window, in days, for bin", type=int, default=None
)
args = parser.parse_args()

if args.target is not None:
db_constraint = f"targname = '{args.target}'"
subdir = args.target
if args.fieldid is not None:
err = "Cannot specify both target and fieldid"
logger.error(err)
raise ValueError(err)
elif args.fieldid is not None:
db_constraint = f"fieldid = '{args.fieldid}'"
subdir = args.fieldid
else:
err = "Must specify either target or fieldid"
logger.error(err)
raise ValueError(err)

sel = (
Select(
StacksTable.savepath,
func.min(ExposuresTable.utctime).label("utctime"),
)
.join(StacksTable.raw)
.join(RawsTable.exposure_ids)
.group_by(StacksTable.savepath)
.where(text(db_constraint))
)

df = run_select(sel, StacksTable)
df.sort_values(by="utctime", inplace=True)
df.reset_index(drop=True, inplace=True)

with tempfile.TemporaryDirectory(dir=TEMP_DIR) as temp_dir_path:
print(f"Using cache {temp_dir_path}")

cache.set_cache_dir(temp_dir_path)

savepaths = []

for x in df["savepath"]:
path = Path(x)
if not path.exists():
err = f"Stack file {path} does not exist"
logger.warning(err)

savepaths.append(path)

print(f"Found {len(savepaths)} stack images")

savepaths = [Path(x) for x in df["savepath"]]

img_batch = load_from_list(savepaths, open_raw_image)

if args.bindays is not None:

times = np.array(
[
Time(str(x["utctime"].to_datetime64()), format="isot").mjd
for _, x in df.iterrows()
]
)

bins = np.arange(min(times), max(times) + args.bindays, step=args.bindays)

new_batch = ImageBatch()

images = np.array(img_batch.get_batch())

for i, t_start in enumerate(bins[:-1]):
t_end = bins[i + 1]

mask = (times >= t_start) & (times < t_end)

for img in images[mask]:
targ_name = f"{subdir}_{i}"
img["TARGNAME"] = targ_name
img["RESTACKN"] = i
img["BINSTART"] = t_start
img["BINEND"] = t_end

new_batch.append(img)

img_batch = new_batch

dataset = Dataset(img_batch)

run_winter(
config="stack_stacks_db",
dataset=dataset,
log_level=args.level,
subdir=f"restacks/{subdir}",
)
3 changes: 2 additions & 1 deletion mirar/pipelines/winter/winter_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ class WINTERPipeline(Pipeline):
+ detect_candidates
+ process_candidates
+ avro_broadcast,
"stack_stacks": stack_stacks,
"stack_stacks": load_final_stack + stack_stacks,
"stack_stacks_db": stack_stacks,
"focus_cals": focus_cals,
"mosaic": mosaic,
"log": load_raw + extract_all + csvlog,
Expand Down
4 changes: 2 additions & 2 deletions mirar/processors/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
Module for general utility processors such as I/O and interacting with metadata
"""

from mirar.processors.utils.header_annotate import HeaderAnnotator
from mirar.processors.utils.header_annotate import HeaderAnnotator, HeaderEditor
from mirar.processors.utils.header_reader import HeaderReader
from mirar.processors.utils.image_loader import ImageLoader, MEFLoader
from mirar.processors.utils.image_loader import ImageListLoader, ImageLoader, MEFLoader
from mirar.processors.utils.image_modifier import CustomImageBatchModifier
from mirar.processors.utils.image_plotter import ImagePlotter
from mirar.processors.utils.image_rejector import ImageRejector
Expand Down
93 changes: 75 additions & 18 deletions mirar/processors/utils/image_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,31 +47,17 @@ def unzip(zipped_list: list[str]) -> list[str]:
return unzipped_list


def load_from_dir(
input_dir: str | Path,
def load_from_list(
img_list: list[str | Path],
open_f: Callable[[str | Path], Image | list[Image]],
) -> ImageBatch:
"""
Function to load all images in a directory
Load images from a list of files
:param input_dir: Input directory
:param img_list: Image list
:param open_f: Function to open images
:return: ImageBatch object
"""
img_list = sorted(glob(f"{input_dir}/*.fits"))

# check for zipped files too
zipped_list = sorted(glob(f"{input_dir}/*.fz"))
if len(zipped_list) > 0:
unzipped_list = unzip(zipped_list)
for file in unzipped_list:
img_list.append(file)

if len(img_list) < 1:
err = f"No images found in {input_dir}. Please check path is correct!"
logger.error(err)
raise ImageNotFoundError(err)

images = ImageBatch()

for path in tqdm(img_list):
Expand All @@ -98,6 +84,34 @@ def load_from_dir(
return images


def load_from_dir(
input_dir: str | Path,
open_f: Callable[[str | Path], Image | list[Image]],
) -> ImageBatch:
"""
Function to load all images in a directory
:param input_dir: Input directory
:param open_f: Function to open images
:return: ImageBatch object
"""
img_list = sorted(glob(f"{input_dir}/*.fits"))

# check for zipped files too
zipped_list = sorted(glob(f"{input_dir}/*.fz"))
if len(zipped_list) > 0:
unzipped_list = unzip(zipped_list)
for file in unzipped_list:
img_list.append(file)

if len(img_list) < 1:
err = f"No images found in {input_dir}. Please check path is correct!"
logger.error(err)
raise ImageNotFoundError(err)

return load_from_list(img_list, open_f)


class ImageLoader(BaseImageProcessor):
"""Processor to load raw images."""

Expand Down Expand Up @@ -181,6 +195,49 @@ def _apply_to_images(
return new_batch


class ImageListLoader(BaseImageProcessor):
"""Processor to load raw images."""

base_key = "loadlist"

image_type = Image
default_load_image = staticmethod(open_raw_image)

def __init__(
self,
img_list: list[Path],
load_image: Callable[[str], Image | list[Image]] = None,
):
super().__init__()
self.img_list = img_list
if len(self.img_list) < 1:
err = "No images found in list. Please check path is correct!"
logger.error(err)
raise ImageNotFoundError(err)
if load_image is None:
load_image = self.default_load_image
self.load_image = load_image

def __str__(self):
return f"Processor to load {len(self.img_list)} images from list"

def _apply_to_images(self, batch: ImageBatch) -> ImageBatch:
"""
Load images from a list of files
:param batch: Batch of images
:return: New batch of images
"""

if len(batch) > 0:
logger.warning("Batch is not empty. Overwriting images!")

return load_from_list(
self.img_list,
open_f=self.load_image,
)


class MEFLoader(ImageLoader):
"""Processor to load MEF images."""

Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ extension-pkg-whitelist=["pydantic"]
[tool.pylint.typecheck]
generated-members=["u.*"]

[tool.poetry.scripts]
mirar-run = 'mirar.__main__:main'
winter-stack = 'mirar.pipelines.winter.run:run_stack_of_stacks'

[build-system]
requires = ["setuptools", "wheel", "poetry-core>=1.2.0",]
build-backend = "poetry.core.masonry.api"

0 comments on commit 527c7e6

Please sign in to comment.