From 5da01b565261b1dbdbf02b48be8d77228f7cdf94 Mon Sep 17 00:00:00 2001 From: Robert Stein Date: Sat, 11 May 2024 12:23:40 -0700 Subject: [PATCH] Add support for CLI stacking --- mirar/pipelines/base_pipeline.py | 2 +- mirar/pipelines/winter/blocks.py | 6 +- mirar/pipelines/winter/run.py | 173 ++++++++++++++++++++++ mirar/pipelines/winter/winter_pipeline.py | 3 +- mirar/processors/utils/__init__.py | 4 +- mirar/processors/utils/image_loader.py | 93 +++++++++--- pyproject.toml | 4 + 7 files changed, 261 insertions(+), 24 deletions(-) create mode 100644 mirar/pipelines/winter/run.py diff --git a/mirar/pipelines/base_pipeline.py b/mirar/pipelines/base_pipeline.py index 907112b73..47f8778db 100644 --- a/mirar/pipelines/base_pipeline.py +++ b/mirar/pipelines/base_pipeline.py @@ -203,7 +203,7 @@ def get_error_output_path(self) -> Path: """ error_output_path = Path( get_output_path( - base_name=f"{self.night}_error_stack.txt", + base_name=f"{Path(self.night).name}_error_stack.txt", dir_root=self.night_sub_dir, ) ) diff --git a/mirar/pipelines/winter/blocks.py b/mirar/pipelines/winter/blocks.py index 24344aa3b..93e111c3a 100644 --- a/mirar/pipelines/winter/blocks.py +++ b/mirar/pipelines/winter/blocks.py @@ -14,6 +14,7 @@ LATEST_SAVE_KEY, MAX_DITHER_KEY, OBSCLASS_KEY, + PROC_HISTORY_KEY, RAW_IMG_KEY, SOURCE_HISTORY_KEY, SOURCE_NAME_KEY, @@ -134,6 +135,7 @@ from mirar.processors.utils import ( CustomImageBatchModifier, HeaderAnnotator, + HeaderEditor, ImageBatcher, ImageDebatcher, ImageLoader, @@ -510,8 +512,8 @@ # Stack stacks together stack_stacks = [ - ImageLoader(input_sub_dir="final", input_img_dir=base_output_dir), ImageRebatcher([BASE_NAME_KEY]), + HeaderEditor(PROC_HISTORY_KEY, "load"), ImageSaver(output_dir_name="restack_masks", write_mask=True), Sextractor( **sextractor_astrometry_config, @@ -529,7 +531,7 @@ ImageSaver(output_dir_name="post_scamp"), ImageDebatcher(), HeaderAnnotator(input_keys=["TARGNAME", "FIELDID"], output_key=TARGET_KEY), - 
ImageRebatcher(["SUBCOORD", "FILTER", "EXPTIME", "BOARD_ID", TARGET_KEY]), + ImageRebatcher(["SUBCOORD", "FILTER", TARGET_KEY]), Swarp( swarp_config_path=swarp_config_path, calculate_dims_in_swarp=True, diff --git a/mirar/pipelines/winter/run.py b/mirar/pipelines/winter/run.py new file mode 100644 index 000000000..8dfb6c147 --- /dev/null +++ b/mirar/pipelines/winter/run.py @@ -0,0 +1,173 @@ +""" +Module containing WINTER CLI commands +""" + +import argparse +import logging +import sys +import tempfile +from pathlib import Path + +import numpy as np +from astropy.time import Time +from sqlalchemy import Select, text +from sqlalchemy.sql import func + +from mirar.data import Dataset, ImageBatch, cache +from mirar.database.transactions.select import run_select +from mirar.io import open_raw_image +from mirar.paths import TEMP_DIR +from mirar.pipelines.winter.models import ExposuresTable, RawsTable, StacksTable +from mirar.pipelines.winter.winter_pipeline import WINTERPipeline +from mirar.processors.utils.image_loader import load_from_list + +logger = logging.getLogger(__name__) + + +def run_winter( + config: str, + dataset: Dataset | None = None, + log_level: str = "INFO", + subdir: str = None, +): + """ + Run a WINTER pipeline + + :param config: + :param dataset: + :param log_level: + :param subdir: + :return: + """ + + # Set up logging + + log = logging.getLogger("mirar") + + handler = logging.StreamHandler(sys.stdout) + + formatter = logging.Formatter( + "%(name)s [l %(lineno)d] - %(levelname)s - %(message)s" + ) + handler.setFormatter(formatter) + log.addHandler(handler) + log.setLevel(log_level) + + pipe = WINTERPipeline( + selected_configurations=config, + night=subdir, + ) + + _, errorstack = pipe.reduce_images( + dataset=dataset, + catch_all_errors=True, + ) + + errorstack.summarise_error_stack() + + +def run_stack_of_stacks(): + """ + CLI wrapper for stacking pre-existing stack images of a target + + :return: + """ + parser = 
argparse.ArgumentParser(description="Stack of stacks") + parser.add_argument("-t", "--target", help="Target to stack", type=str) + parser.add_argument("-f", "--fieldid", help="Field ID", type=str) + parser.add_argument("--level", help="Logging level", type=str, default="INFO") + parser.add_argument( + "--bindays", help="Window, in days, for bin", type=int, default=None + ) + args = parser.parse_args() + + if args.target is not None: + db_constraint = f"targname = '{args.target}'" + subdir = args.target + if args.fieldid is not None: + err = "Cannot specify both target and fieldid" + logger.error(err) + raise ValueError(err) + elif args.fieldid is not None: + db_constraint = f"fieldid = '{args.fieldid}'" + subdir = args.fieldid + else: + err = "Must specify either target or fieldid" + logger.error(err) + raise ValueError(err) + + sel = ( + Select( + StacksTable.savepath, + func.min(ExposuresTable.utctime).label("utctime"), + ) + .join(StacksTable.raw) + .join(RawsTable.exposure_ids) + .group_by(StacksTable.savepath) + .where(text(db_constraint)) + ) + + df = run_select(sel, StacksTable) + df.sort_values(by="utctime", inplace=True) + df.reset_index(drop=True, inplace=True) + + with tempfile.TemporaryDirectory(dir=TEMP_DIR) as temp_dir_path: + print(f"Using cache {temp_dir_path}") + + cache.set_cache_dir(temp_dir_path) + + savepaths = [] + + for x in df["savepath"]: + path = Path(x) + if not path.exists(): + err = f"Stack file {path} does not exist" + logger.warning(err) + + savepaths.append(path) + + print(f"Found {len(savepaths)} stack images") + + savepaths = [Path(x) for x in df["savepath"]] + + img_batch = load_from_list(savepaths, open_raw_image) + + if args.bindays is not None: + + times = np.array( + [ + Time(str(x["utctime"].to_datetime64()), format="isot").mjd + for _, x in df.iterrows() + ] + ) + + bins = np.arange(min(times), max(times) + args.bindays, step=args.bindays) + + new_batch = ImageBatch() + + images = np.array(img_batch.get_batch()) + + for i, 
t_start in enumerate(bins[:-1]): + t_end = bins[i + 1] + + mask = (times >= t_start) & (times < t_end) + + for img in images[mask]: + targ_name = f"{subdir}_{i}" + img["TARGNAME"] = targ_name + img["RESTACKN"] = i + img["BINSTART"] = t_start + img["BINEND"] = t_end + + new_batch.append(img) + + img_batch = new_batch + + dataset = Dataset(img_batch) + + run_winter( + config="stack_stacks_db", + dataset=dataset, + log_level=args.level, + subdir=f"restacks/{subdir}", + ) diff --git a/mirar/pipelines/winter/winter_pipeline.py b/mirar/pipelines/winter/winter_pipeline.py index 9516a26f7..25e956196 100644 --- a/mirar/pipelines/winter/winter_pipeline.py +++ b/mirar/pipelines/winter/winter_pipeline.py @@ -121,7 +121,8 @@ class WINTERPipeline(Pipeline): + detect_candidates + process_candidates + avro_broadcast, - "stack_stacks": stack_stacks, + "stack_stacks": load_final_stack + stack_stacks, + "stack_stacks_db": stack_stacks, "focus_cals": focus_cals, "mosaic": mosaic, "log": load_raw + extract_all + csvlog, diff --git a/mirar/processors/utils/__init__.py b/mirar/processors/utils/__init__.py index 6117bac40..e1b578d5f 100644 --- a/mirar/processors/utils/__init__.py +++ b/mirar/processors/utils/__init__.py @@ -2,9 +2,9 @@ Module for general utility processors such as I/O and interacting with metadata """ -from mirar.processors.utils.header_annotate import HeaderAnnotator +from mirar.processors.utils.header_annotate import HeaderAnnotator, HeaderEditor from mirar.processors.utils.header_reader import HeaderReader -from mirar.processors.utils.image_loader import ImageLoader, MEFLoader +from mirar.processors.utils.image_loader import ImageListLoader, ImageLoader, MEFLoader from mirar.processors.utils.image_modifier import CustomImageBatchModifier from mirar.processors.utils.image_plotter import ImagePlotter from mirar.processors.utils.image_rejector import ImageRejector diff --git a/mirar/processors/utils/image_loader.py b/mirar/processors/utils/image_loader.py index 
722856ee3..495ecc77c 100644 --- a/mirar/processors/utils/image_loader.py +++ b/mirar/processors/utils/image_loader.py @@ -47,31 +47,17 @@ def unzip(zipped_list: list[str]) -> list[str]: return unzipped_list -def load_from_dir( - input_dir: str | Path, +def load_from_list( + img_list: list[str | Path], open_f: Callable[[str | Path], Image | list[Image]], ) -> ImageBatch: """ - Function to load all images in a directory + Load images from a list of files - :param input_dir: Input directory + :param img_list: Image list :param open_f: Function to open images :return: ImageBatch object """ - img_list = sorted(glob(f"{input_dir}/*.fits")) - - # check for zipped files too - zipped_list = sorted(glob(f"{input_dir}/*.fz")) - if len(zipped_list) > 0: - unzipped_list = unzip(zipped_list) - for file in unzipped_list: - img_list.append(file) - - if len(img_list) < 1: - err = f"No images found in {input_dir}. Please check path is correct!" - logger.error(err) - raise ImageNotFoundError(err) - images = ImageBatch() for path in tqdm(img_list): @@ -98,6 +84,34 @@ def load_from_dir( return images +def load_from_dir( + input_dir: str | Path, + open_f: Callable[[str | Path], Image | list[Image]], +) -> ImageBatch: + """ + Function to load all images in a directory + + :param input_dir: Input directory + :param open_f: Function to open images + :return: ImageBatch object + """ + img_list = sorted(glob(f"{input_dir}/*.fits")) + + # check for zipped files too + zipped_list = sorted(glob(f"{input_dir}/*.fz")) + if len(zipped_list) > 0: + unzipped_list = unzip(zipped_list) + for file in unzipped_list: + img_list.append(file) + + if len(img_list) < 1: + err = f"No images found in {input_dir}. Please check path is correct!" 
+ logger.error(err) + raise ImageNotFoundError(err) + + return load_from_list(img_list, open_f) + + class ImageLoader(BaseImageProcessor): """Processor to load raw images.""" @@ -181,6 +195,49 @@ def _apply_to_images( return new_batch +class ImageListLoader(BaseImageProcessor): + """Processor to load raw images.""" + + base_key = "loadlist" + + image_type = Image + default_load_image = staticmethod(open_raw_image) + + def __init__( + self, + img_list: list[Path], + load_image: Callable[[str], Image | list[Image]] = None, + ): + super().__init__() + self.img_list = img_list + if len(self.img_list) < 1: + err = "No images found in list. Please check path is correct!" + logger.error(err) + raise ImageNotFoundError(err) + if load_image is None: + load_image = self.default_load_image + self.load_image = load_image + + def __str__(self): + return f"Processor to load {len(self.img_list)} images from list" + + def _apply_to_images(self, batch: ImageBatch) -> ImageBatch: + """ + Load images from a list of files + + :param batch: Batch of images + :return: New batch of images + """ + + if len(batch) > 0: + logger.warning("Batch is not empty. Overwriting images!") + + return load_from_list( + self.img_list, + open_f=self.load_image, + ) + + class MEFLoader(ImageLoader): """Processor to load MEF images.""" diff --git a/pyproject.toml b/pyproject.toml index e82fc52bd..433f2c622 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -107,6 +107,10 @@ extension-pkg-whitelist=["pydantic"] [tool.pylint.typecheck] generated-members=["u.*"] +[tool.poetry.scripts] +mirar-run = 'mirar.__main__:main' +winter-stack = 'mirar.pipelines.winter.run:run_stack_of_stacks' + [build-system] requires = ["setuptools", "wheel", "poetry-core>=1.2.0",] build-backend = "poetry.core.masonry.api"