diff --git a/.gitignore b/.gitignore index 84d4a9e0..a38bc724 100755 --- a/.gitignore +++ b/.gitignore @@ -70,6 +70,7 @@ instance/ # Sphinx documentation docs/_build/ +docs/source/user_guide/observatory_info.csv # PyBuilder target/ diff --git a/.readthedocs.yaml b/.readthedocs.yaml index 5f90ac59..c0fd1df7 100644 --- a/.readthedocs.yaml +++ b/.readthedocs.yaml @@ -23,8 +23,8 @@ sphinx: fail_on_warning: true # Optionally build your docs in additional formats such as PDF and ePub -formats: - - pdf +# formats: + # - pdf # Optional but recommended, declare the Python requirements required # to build your documentation diff --git a/docs/source/conf.py b/docs/source/conf.py index 1ceb1f04..0ab0b817 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -8,6 +8,10 @@ from packaging.version import parse from sphinx_astropy.conf.v2 import * +sys.path.insert(0, pathlib.Path(__file__).parents[0].resolve().as_posix()) + +import headerCSVGenerator + sys.path.insert(0, pathlib.Path(__file__).parents[2].resolve().as_posix()) import pyscope @@ -57,6 +61,13 @@ extensions = list(map(lambda x: x.replace("viewcode", "linkcode"), extensions)) +# Generate CSV for header info +print("Generating CSV for header info...") +targetPath = os.path.join( + os.path.dirname(__file__), "user_guide", "observatory_info.csv" +) +headerCSVGenerator.HeaderCSVGenerator().generate_csv(targetPath) + def linkcode_resolve(domain, info): """ diff --git a/docs/source/headerCSVGenerator.py b/docs/source/headerCSVGenerator.py new file mode 100644 index 00000000..549f7306 --- /dev/null +++ b/docs/source/headerCSVGenerator.py @@ -0,0 +1,118 @@ +import ast +import csv +import inspect +import re + +from pyscope.observatory import Observatory + + +class HeaderCSVGenerator: + """Generates a CSV file containing the header information for the Observatory class. + + The CSV file contains the following columns: + - Header Key: The key of the header + - Header Value: The value of the header + - Header Description: The description of the header + + The CSV file is generated by parsing the Observatory class for the info dictionaries + and then combining them into a master dictionary. The master dictionary is then + output to a CSV file. + """ + + def __init__(self): + pass + + def get_info_dicts(self): + descriptors = inspect.getmembers( + Observatory, predicate=inspect.isdatadescriptor + ) + info = [] + for descriptor in descriptors: + if "info" in descriptor[0]: + info.append(descriptor) + + source_list = [] + for descriptor in info: + source_list.append(inspect.getsource(descriptor[1].fget)) + + # Split source into lines, remove tabs + source_lines = [] + for source in source_list: + source_lines.append(source.split("\n")) + + # Remove leading whitespace + stripped_lines = [] + for source in source_lines: + for line in source: + stripped_lines.append(line.lstrip()) + + # Return parts of source_list that contain 'info = {...}' + info_dict = [] + for property in source_list: + # Use regex to find info = {[^}]} and add it to info_dict + info_dict.append(re.findall(r"info = {[^}]*}", property)[0]) + + # Remove 'info = ' from each string + for i, property in enumerate(info_dict): + info_dict[i] = property[7:] + + # Encase any references to self. in quotes + for i, property in enumerate(info_dict): + info_dict[i] = re.sub(r"self\.([a-zA-Z0-9_.\[\]]+)", r'"\1"', property) + + # Find references to str() + for i, property in enumerate(info_dict): + info_dict[i] = re.sub(r"(str\(([a-zA-Z\"\_\.\[\]]+)\))", r"\2", property) + + # Replace any references to self.etc. with None + for i, property in enumerate(info_dict): + # Use regex "\(\s+(.*?\],)"gm + # info_dict[i] = re.sub(r'\(\\n\s+(.*?\],)', 'None', property) + group = re.findall(r"\(\n\s+([\s\S]*?\],)", property) + # replace the group with None + if group: + info_dict[i] = property.replace(group[0], "None,") + + # Enclose any parts with (sep="dms") in quotes + for i, property in enumerate(info_dict): + info_dict[i] = re.sub( + r"(\"\S+\(sep=\"dms\"\))", + lambda m: '"' + m.group(1).replace('"', " ") + '"', + property, + ) + + # Remove any parts matching \% i(?=\)) (or replace with "") + for i, property in enumerate(info_dict): + info_dict[i] = re.sub(r"( \% i(?=\)))", "", property) + + # Enclose in quotes any parts matching 'not \"\S+\" is None' + for i, property in enumerate(info_dict): + info_dict[i] = re.sub( + r"(not \"\S+\" is None)", + lambda m: '"' + m.group(1).replace('"', " ") + '"', + property, + ) + + # Pass each info_dict through ast.literal_eval to convert to dictionary + info_dict_parsed = [] + for info in info_dict: + info_dict_parsed.append(ast.literal_eval(info)) + + return info_dict_parsed + + def generate_csv(self, filename): + info_dicts = self.get_info_dicts() + # Add each dictionary to make a master dictionary + master_dict = {} + for info in info_dicts: + master_dict.update(info) + # Output to csv in the format key, value, description + # key is the key of the dictionary + # value is the first part of the tuple (the value) + # description is the second part of the tuple (the description) + with open(filename, "w", newline="") as csv_file: + writer = csv.writer(csv_file) + # Write header + writer.writerow(["Header Key", "Header Value", "Header Description"]) + for key, value in master_dict.items(): + writer.writerow([key, value[0], value[1]]) diff --git a/docs/source/user_guide/header.rst b/docs/source/user_guide/header.rst new file mode 100644 index 00000000..21fed342 --- /dev/null +++ b/docs/source/user_guide/header.rst @@ -0,0 +1,15 @@ +Header +====== + +This is a page listing all potential header keywords for an `Observatory` object. + +.. Note:: + This is auto-generated from the `info` dictionaries in the `Observatory` class. Some + of the information is not available for all observatories, and some header values may + contain nonsense due to the auto-generation script. + +.. csv-table:: Sample Header + :file: observatory_info.csv + :widths: 4, 6, 10 + :header-rows: 1 + diff --git a/docs/source/user_guide/index.rst b/docs/source/user_guide/index.rst index 90b29f94..b9022f7a 100644 --- a/docs/source/user_guide/index.rst +++ b/docs/source/user_guide/index.rst @@ -7,6 +7,7 @@ User Guide :maxdepth: 2 examples + header logging config help diff --git a/pyscope/observatory/observatory.py b/pyscope/observatory/observatory.py index ba1077a1..1350324b 100644 --- a/pyscope/observatory/observatory.py +++ b/pyscope/observatory/observatory.py @@ -2540,7 +2540,8 @@ def _read_out_kwargs(self, dictionary): @property def autofocus_info(self): logger.debug("Observatory.autofocus_info() called") - return {"AUTODRIV": self.autofocus_driver} + info = {"AUTODRIV": (self.autofocus_driver, "Autofocus driver")} + return info @property def camera_info(self): @@ -3050,7 +3051,7 @@ def focuser_info(self): @property def observatory_info(self): logger.debug("Observatory.observatory_info() called") - return { + info = { "OBSNAME": (self.site_name, "Observatory name"), "OBSINSTN": (self.instrument_name, "Instrument name"), "OBSINSTD": (self.instrument_description, "Instrument description"), @@ -3062,6 +3063,7 @@ def observatory_info(self): "XPIXSCAL": (self.pixel_scale[0], "Observatory x-pixel scale"), "YPIXSCAL": (self.pixel_scale[1], "Observatory y-pixel scale"), } + return info @property def observing_conditions_info(self): @@ -3392,39 +3394,41 @@ def safety_monitor_info(self, index=None): for i in range(len(self.safety_monitor)): try: self.safety_monitor[i].Connected = True + # Should likely be broken into multiple try/except blocks + info = { + ("SM%iCONN" % i): (True, "Safety monitor connected"), + ("SM%iISSAF" % i): ( + self.safety_monitor[i].IsSafe, + "Safety monitor safe", + ), + ("SM%iNAME" % i): ( + self.safety_monitor[i].Name, + "Safety monitor name", + ), + ("SM%iDRVER" % i): ( + self.safety_monitor[i].DriverVersion, + "Safety monitor driver version", + ), + ("SM%iDRV" % i): ( + str(self.safety_monitor[i].DriverInfo), + "Safety monitor driver name", + ), + ("SM%iINTF" % i): ( + self.safety_monitor[i].InterfaceVersion, + "Safety monitor interface version", + ), + ("SM%iDESC" % i): ( + self.safety_monitor[i].Description, + "Safety monitor description", + ), + ("SM%iSUPAC" % i): ( + str(self.safety_monitor[i].SupportedActions), + "Safety monitor supported actions", + ), + } except: info = {"SM%iCONN" % i: (False, "Safety monitor connected")} - info = { - ("SM%iCONN" % i): (True, "Safety monitor connected"), - ("SM%iISSAF" % i): ( - self.safety_monitor[i].IsSafe, - "Safety monitor safe", - ), - ("SM%iNAME" % i): ( - self.safety_monitor[i].Name, - "Safety monitor name", - ), - ("SM%iDRVER" % i): ( - self.safety_monitor[i].DriverVersion, - "Safety monitor driver version", - ), - ("SM%iDRV" % i): ( - str(self.safety_monitor[i].DriverInfo), - "Safety monitor driver name", - ), - ("SM%iINTF" % i): ( - self.safety_monitor[i].InterfaceVersion, - "Safety monitor interface version", - ), - ("SM%iDESC" % i): ( - self.safety_monitor[i].Description, - "Safety monitor description", - ), - ("SM%iSUPAC" % i): ( - str(self.safety_monitor[i].SupportedActions), - "Safety monitor supported actions", - ), - } + all_info.append(info) else: return {"SM0CONN": (False, "Safety monitor connected")} @@ -3444,61 +3448,69 @@ def switch_info(self, index=None): for i in range(len(self.switch)): try: self.switch.Connected = True + try: + info = { + ("SW%iCONN" % i): (True, "Switch connected"), + ("SW%iNAME" % i): (self.switch[i].Name, "Switch name"), + ("SW%iDRVER" % i): ( + self.switch[i].DriverVersion, + "Switch driver version", + ), + ("SW%iDRV" % i): ( + str(self.switch[i].DriverInfo), + "Switch driver name", + ), + ("SW%iINTF" % i): ( + self.switch[i].InterfaceVersion, + "Switch interface version", + ), + ("SW%iDESC" % i): ( + self.switch[i].Description, + "Switch description", + ), + ("SW%iSUPAC" % i): ( + str(self.switch[i].SupportedActions), + "Switch supported actions", + ), + ("SW%iMAXSW" % i): ( + self.switch[i].MaxSwitch, + "Switch maximum switch", + ), + } + for j in range(self.switch[i].MaxSwitch): + try: + info[("SW%iSW%iNM" % (i, j))] = ( + self.switch[i].GetSwitchName(j), + "Switch %i Device %i name" % (i, j), + ) + info[("SW%iSW%iDS" % (i, j))] = ( + self.switch[i].GetSwitchDescription(j), + "Switch %i Device %i description" % (i, j), + ) + info[("SW%iSW%i" % (i, j))] = ( + self.switch[i].GetSwitch(j), + "Switch %i Device %i state" % (i, j), + ) + info[("SW%iSW%iMN" % (i, j))] = ( + self.switch[i].MinSwitchValue(j), + "Switch %i Device %i minimum value" % (i, j), + ) + info[("SW%iSW%iMX" % (i, j))] = ( + self.switch[i].MaxSwitchValue(j), + "Switch %i Device %i maximum value" % (i, j), + ) + info[("SW%iSW%iST" % (i, j))] = ( + self.switch[i].SwitchStep(j), + "Switch %i Device %i step" % (i, j), + ) + except Exception as e: + logger.debug( + f"Sub-switch {j} of switch {i} gave the following error: {e}" + ) + except Exception as e: + logger.debug(f"Switch {i} gives the following error: {e}") except: info = {("SW%iCONN" % i): (False, "Switch connected")} - info = { - ("SW%iCONN" % i): (True, "Switch connected"), - ("SW%iNAME" % i): (self.switch[i].Name, "Switch name"), - ("SW%iDRVER" % i): ( - self.switch[i].DriverVersion, - "Switch driver version", - ), - ("SW%iDRV" % i): ( - str(self.switch[i].DriverInfo), - "Switch driver name", - ), - ("SW%iINTF" % i): ( - self.switch[i].InterfaceVersion, - "Switch interface version", - ), - ("SW%iDESC" % i): ( - self.switch[i].Description, - "Switch description", - ), - ("SW%iSUPAC" % i): ( - str(self.switch[i].SupportedActions), - "Switch supported actions", - ), - ("SW%iMAXSW" % i): ( - self.switch[i].MaxSwitch, - "Switch maximum switch", - ), - } - for j in range(self.switch[i].MaxSwitch): - info[("SW%iSW%iNM" % (i, j))] = ( - self.switch[i].GetSwitchName(j), - "Switch %i Device %i name" % (i, j), - ) - info[("SW%iSW%iDS" % (i, j))] = ( - self.switch[i].GetSwitchDescription(j), - "Switch %i Device %i description" % (i, j), - ) - info[("SW%iSW%i" % (i, j))] = ( - self.switch[i].GetSwitch(j), - "Switch %i Device %i state" % (i, j), - ) - info[("SW%iSW%iMN" % (i, j))] = ( - self.switch[i].MinSwitchValue(j), - "Switch %i Device %i minimum value" % (i, j), - ) - info[("SW%iSW%iMX" % (i, j))] = ( - self.switch[i].MaxSwitchValue(j), - "Switch %i Device %i maximum value" % (i, j), - ) - info[("SW%iSW%iST" % (i, j))] = ( - self.switch[i].SwitchStep(j), - "Switch %i Device %i step" % (i, j), - ) all_info.append(info) else: @@ -3827,7 +3839,7 @@ def telescope_info(self): @property def threads_info(self): logger.debug("Observatory.threads_info() called") - return { + info = { "DEROTATE": ( not self._derotation_thread is None, "Is derotation thread active", @@ -3841,11 +3853,13 @@ def threads_info(self): "Is status monitor thread active", ), } + return info @property def wcs_info(self): logger.debug("Observatory.wcs_info() called") - return {"WCSDRV": (str(self.wcs_driver), "WCS driver")} + info = {"WCSDRV": (str(self.wcs_driver), "WCS driver")} + return info @property def observatory_location(self): diff --git a/pyscope/utils/__init__.py b/pyscope/utils/__init__.py index 112b7c3b..0b752d78 100644 --- a/pyscope/utils/__init__.py +++ b/pyscope/utils/__init__.py @@ -3,9 +3,11 @@ from ._get_image_source_catalog import _get_image_source_catalog from ._html_line_parser import _get_number_from_line from .airmass import airmass +from .pinpoint_solve import pinpoint_solve from .pyscope_exception import PyscopeException __all__ = [ "airmass", + "pinpoint_solve", "PyscopeException", ] diff --git a/pyscope/utils/pinpoint_solve.py b/pyscope/utils/pinpoint_solve.py new file mode 100644 index 00000000..cf297024 --- /dev/null +++ b/pyscope/utils/pinpoint_solve.py @@ -0,0 +1,137 @@ +import logging +import os +import platform +import time + +import click + +logger = logging.getLogger(__name__) + + +def save_image(filepath): + maxim.SaveFile(filepath, 3, False, 1) + + +def open_image(filepath): + maxim.OpenFile(filepath) + + +def platesolve_image(filepath, new_filepath): + open_image(filepath) + maxim.PinPointSolve() + logger.info(f"Attempting to solve {filepath}") + try: + while maxim.PinPointStatus == 3: + time.sleep(0.1) + if maxim.PinPointStatus == 2: + logger.info("Solve successful") + else: + logger.info("Solve failed") + maxim.PinPointStop() + except Exception as e: + logger.error(f"Solve failed: {e}, saving unsolved image") + save_image(new_filepath) + logger.info(f"Saved to {new_filepath}") + + +@click.command() +@click.option( + "-i", + "input_dir", + type=click.Path(exists=True), + help="""Directory containing images to solve.""", +) +@click.option( + "-o", + "--out-dir", + "output_dir", + default=None, + type=click.Path(), + help="""Directory to save solved images to. If not specified, solved images will be saved to the same directory as the input images.""", +) +@click.option( + "-v", + "--verbose", + count=True, + type=click.IntRange(0, 1), + default=0, + help="""Verbosity level. -v prints info messages""", +) +def pinpoint_solve_cli(input_dir, output_dir=None, verbose=-1): + """Platesolve images in input_dir and save them to output_dir using PinPoint in MaxIm. \b + + Platesolve images in input_dir and save them to output_dir. If output_dir is not specified, solved images will be saved to the same directory as the input images. + + CLI Usage: `python pinpoint_solve.py -i input_dir -o output_dir` + + .. Note:: + This script requires MaxIm DL to be installed on your system, as it uses the PinPoint solver in MaxIm DL. \b + + Parameters + ---------- + input_dir : str + Directory containing images to solve. + output_dir : str (optional) + Directory to save solved images to. If not specified, solved images will be saved to the same directory as the input images. + verboxe : int (optional), default=-1 + Verbosity level. -v prints info messages + + Returns + ------- + None + + Examples + -------- + File directory structure:: + + cwd/ + test_images/ + image1.fit + image2.fit + image3.fit + solved_images/ + + Command + `python pinpoint_solve.py -i "test_images" -o "solved_images"` + + .. Note:: + You may also pass in absolute paths for `input_dir` and `output_dir`. + """ + if verbose > -1: + logger.setLevel(int(10 * (2 - verbose))) + logger.addHandler(logging.StreamHandler()) + logger.debug(f"Starting pinpoint_solve_cli({input_dir}, {output_dir})") + if platform.system() != "Windows": + raise Exception("PinPoint is only available on Windows.") + else: + from win32com.client import Dispatch + # Add input_dir to the end of the current working directory if it is not an absolute path + if not os.path.isabs(input_dir): + input_dir = os.path.join(os.getcwd(), input_dir) + if output_dir is None: + output_dir = input_dir + else: + if not os.path.isabs(output_dir): + output_dir = os.path.join(os.getcwd(), output_dir) + day_images = os.listdir(input_dir) + day_filepaths = [os.path.join(input_dir, filename) for filename in day_images] + new_filepaths = [os.path.join(output_dir, filename) for filename in day_images] + + global maxim + maxim = Dispatch("Maxim.Document") + + # Create end_dir if it doesn't exist + if not os.path.exists(output_dir): + os.makedirs(output_dir) + + for filepath, new_filepath in zip(day_filepaths, new_filepaths): + platesolve_image(filepath, new_filepath) + + logger.debug(f"Finished pinpoint_solve_cli({input_dir}, {output_dir})") + + +pinpoint_solve = pinpoint_solve_cli.callback + + +# if __name__ == "__main__": +# pinpoint_solve_cli() diff --git a/requirements.txt b/requirements.txt index bc4aa266..2b70a6b2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,7 +5,7 @@ astroquery == 0.4.6 astroscrappy == 1.1.0 click == 8.1.7 cmcrameri == 1.7.0 -markdown == 3.5.1 +markdown == 3.5.2 matplotlib == 3.8.2 numpy == 1.26.3 paramiko == 3.4.0