From 64c2190c7dd0b0c525010b78cb2a69734be99161 Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Fri, 26 Apr 2024 23:17:48 -0700 Subject: [PATCH] Refactor and extend publish/query. --- python/lsst/consdb/pqclient.py | 294 +++++++++++++++++++++++++++------ python/lsst/consdb/pqserver.py | 248 ++++++++++++++++++++++++--- python/lsst/consdb/utils.py | 82 ++++++++- 3 files changed, 544 insertions(+), 80 deletions(-) diff --git a/python/lsst/consdb/pqclient.py b/python/lsst/consdb/pqclient.py index fa7d8fcb..70688b1a 100644 --- a/python/lsst/consdb/pqclient.py +++ b/python/lsst/consdb/pqclient.py @@ -1,58 +1,242 @@ +# This file is part of consdb. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (http://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import logging import os -from pandas import DataFrame +from typing import Any +from urllib.parse import quote + import requests -from requests.exceptions import RequestException -from typing import Any, Iterable -from urllib.parse import urljoin - -session = requests.Session() -base_url = os.environ["CONSDB_URL"] - - -def insert(table: str, values: dict[str, Any], **kwargs): - values.update(kwargs) - # check values against schema for table - data = {"table": table, "values": values} - url = urljoin(base_url, "insert") - try: - response = requests.post(url, json=data) - except RequestException as e: - raise e - response.raise_for_status() - - -def query( - tables: str | Iterable[str], - columns: str | Iterable[str], - *, - where: str | None = None, - join: str | None = None -) -> list[Any]: - if isinstance(tables, str): - tables = [tables] - if isinstance(columns, str): - columns = [columns] - url = urljoin(base_url, "query") - data = {"tables": tables, "columns": columns, "where": where, "join": join} - try: - response = requests.post(url, json=data) - except RequestException as e: - raise e - try: +from astropy.table import Table + +__all__ = ["ConsDbClient"] + + +logger = logging.getLogger(__name__) + + +def urljoin(*args: str) -> str: + """Join parts of a URL with slashes. + + Does not do any quoting. Mostly to remove a level of list-making. + + Parameters + ---------- + *args: `str` + Each parameter is a URL part. + + Returns + ------- + url: `str` + The joined URL. + """ + return "/".join(args) + + +class ConsDbClient: + """A client library for accessing the Consolidated Database. + + This library provides a basic interface for using flexible metadata + (key/value pairs associated with observation ids from an observation + type table), determining the schema of ConsDB tables, querying the + ConsDB using a general SQL SELECT statement, and inserting into + ConsDB tables. + + Parameters + ---------- + url: `str`, optional + Base URL of the Web service, defaults to the value of environment + variable ``LSST_CONSDB_PQ_URL``. + + Notes + ----- + This client is a thin layer over a Web service, which avoids having a + dependency on database drivers. + + It enforces the return of query results as Astropy Tables. + + """ + + def __init__(self, url: str | None = None): + self.session = requests.Session() + if url is None: + self.url = os.environ["LSST_CONSDB_PQ_URL"] + else: + self.url = url + self.url = self.url.rstrip("/") + + def _handle_get( + self, url: str, query: dict[str, str | list[str]] | None = None + ) -> Any: + """Utility function to submit GET requests. + + Parameters + ---------- + url: `str` + URL to GET. + query: `dict` [`str`, `str` | `list` [`str`]], optional + Query parameters to attach to the URL. + + Raises + ------ + requests.exceptions.RequestException + Raised if any kind of connection error occurs. + requests.exceptions.HTTPError + Raised if a non-successful status is returned. + requests.exceptions.JSONDecodeError + Raised if the result does not decode as JSON. + + Returns + ------- + result: `Any` + Result of decoding the Web service result content as JSON. + """ + logger.debug(f"GET {url}") + try: + response = self.session.get(url, params=query) + except requests.exceptions.RequestException as e: + raise e + response.raise_for_status() + return response.json() + + def _handle_post(self, url: str, data: dict[str, Any]) -> requests.Response: + """Utility function to submit POST requests. + + Parameters + ---------- + url: `str` + URL to POST. + data: `dict` [`str`, `Any`] + Key/value pairs of data to POST. + + Raises + ------ + requests.exceptions.RequestException + Raised if any kind of connection error occurs. + requests.exceptions.HTTPError + Raised if a non-successful status is returned. + + Returns + ------- + result: `requests.Response` + The raw Web service result object. + """ + logger.debug(f"POST {url}: {data}") + try: + response = self.session.post(url, json=data) + except requests.exceptions.RequestException as e: + raise e response.raise_for_status() - except Exception as ex: - print(response.content.decode()) - raise ex - arr = response.json() - return DataFrame(arr[1:], columns=arr[0]) - - -def schema(table: str): - url = urljoin(base_url, "schema/") - url = urljoin(url, table) - try: - response = requests.get(url) - except RequestException as e: - raise e - response.raise_for_status() - return response.json() + return response + + @staticmethod + def compute_flexible_metadata_table_name(instrument: str, obs_type: str) -> str: + """Public utility for computing flexible metadata table names. + + Each instrument and observation type made with that instrument can + have a flexible metadata table. This function is useful when + issuing SQL queries, and it avoids a round-trip to the server. + + Parameters + ---------- + instrument: `str` + Name of the instrument (e.g. ``LATISS``). + obs_type: `str` + Name of the observation type (e.g. ``Exposure``). + + Returns + ------- + table_name: `str` + Name of the appropriate flexible metadata table. + + """ + return f"cdb_{instrument}.{obs_type}_flexdata" + + def add_flexible_metadata_key( + self, instrument: str, obs_type: str, key: str, dtype: str, doc: str + ) -> requests.Response: + data = {"key": key, "dtype": dtype, "doc": doc} + url = urljoin(self.url, "flex", quote(instrument), quote(obs_type), "addkey") + return self._handle_post(url, data) + + def get_flexible_metadata_keys( + self, instrument: str, obs_type: str + ) -> list[tuple[str, str, str]]: + url = urljoin(self.url, "flex", quote(instrument), quote(obs_type), "schema") + return self._handle_get(url) + + def get_flexible_metadata( + self, instrument: str, obs_type: str, obs_id: int, keys: list[str] | None = None + ) -> list[Any]: + url = urljoin( + self.url, + "flex", + quote(instrument), + quote(obs_type), + "obs", + quote(str(obs_id)), + ) + return self._handle_get(url, {"k": keys} if keys else None) + + def insert_flexible_metadata( + self, + instrument: str, + obs_type: str, + obs_id: int, + values: dict[str, Any], + **kwargs, + ) -> requests.Response: + values.update(kwargs) + data = values + url = urljoin( + self.url, + "flex", + quote(instrument), + quote(obs_type), + "obs", + quote(str(obs_id)), + ) + return self._handle_post(url, data) + + def insert( + self, + instrument: str, + table: str, + obs_id: int, + values: dict[str, Any], + allow_update=False, + **kwargs, + ) -> requests.Response: + values.update(kwargs) + data = {"table": table, "obs_id": obs_id, "values": values} + op = "upsert" if allow_update else "insert" + url = urljoin(self.url, op, quote(instrument)) + return self._handle_post(url, data) + + def query(self, query: str) -> Table: + url = urljoin(self.url, "query") + data = {"query": query} + result = self._handle_post(url, data).json() + return Table(rows=result["data"], names=result["columns"]) + + def schema(self, instrument: str, table: str) -> list[tuple[str, str, str]]: + url = urljoin(self.url, "schema", quote(instrument), quote(table)) + return self._handle_get(url) diff --git a/python/lsst/consdb/pqserver.py b/python/lsst/consdb/pqserver.py index d10bcdb3..85f59bcf 100644 --- a/python/lsst/consdb/pqserver.py +++ b/python/lsst/consdb/pqserver.py @@ -1,45 +1,237 @@ -from flask import Flask, request -from sqlalchemy import create_engine, MetaData +# This file is part of consdb. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (http://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + import sqlalchemy.exc +from flask import Flask, jsonify, request +from sqlalchemy import MetaData + +from utils import setup_logging, setup_postgres + +INSTRUMENT_LIST = ["latiss"] +OBS_TYPE_LIST = ["exposure", "visit", "ccdexposure", "ccdvisit"] +DTYPE_LIST = ["bool", "int", "float", "str"] + + +#################### +# Global app setup # +#################### -from utils import setup_postgres app = Flask(__name__) engine = setup_postgres() -metadata_obj = MetaData(schema="cdb_latiss") -metadata_obj.reflect(engine) +logger = setup_logging(__name__) + + +######################## +# Schema preload class # +######################## + + +class InstrumentTables: + def __init__(self): + self.table_names = set() + self.schemas = dict() + for instrument in INSTRUMENT_LIST: + md = MetaData(schema=f"cdb_{instrument}") + md.reflect(engine) + self.table_names.update([str(table) for table in md.tables]) + self.schemas[instrument] = md + self.flexible_metadata_schemas[instrument] = dict() + for obs_type in OBS_TYPE_LIST: + table_name = f"cdb_{instrument}.{obs_type}_flexdata" + schema_table_name = table_name + "_schema" + if table_name in self.schemas and schema_table_name in self.schemas: + logger.debug(f"SELECT key, dtype, doc FROM {schema_table_name}") + # query = engine.select(["key", "dtype", "doc"]).table(schema_name) + # self.flexible_metadata_schemas[instrument][obs_type] = { + # key: (dtype, doc) for key, dtype, doc in engine.execute(query) + # } + + def compute_flexible_metadata_table_name(self, instrument: str, obs_type: str): + if instrument not in self.flexible_metadata_schemas: + raise BadValueException( + "instrument", instrument, self.flexible_metadata_schemas.keys() + ) + if obs_type not in self.flexible_metadata_schemas[instrument]: + raise BadValueException( + "observation type", obs_type, self.flexible_metadata_schemas.keys() + ) + return f"cdb_{instrument}.{obs_type}_flexdata" + + def compute_flexible_metadata_table_schema_name( + self, instrument: str, obs_type: str + ): + table_name = self.compute_flexible_metadata_table_name(instrument, obs_type) + return table_name + "_schema" + + +instrument_tables = InstrumentTables() + + +################## +# Error handling # +################## + + +class BadValueException(Exception): + status_code = 404 + + def __init__(self, kind, value, valid=None): + self.kind = kind + self.value = value + self.valid = valid + + def to_dict(self): + data = { + "message": f"Unknown {self.kind}", + "value": self.value, + "valid": self.valid, + } + return data + + +@app.errorhandler(BadValueException) +def handle_bad_value(e: BadValueException): + return jsonify(e.to_dict()), e.status_code + + +################################### +# Web service application methods # +################################### + + +@app.post("/flex///addkey") +def add_flexible_metadata_key(instrument: str, obs_type: str): + info = request.json + schema_table = instrument_tables.compute_flexible_metadata_schema_table_name( + instrument, obs_type + ) + key = info["key"] + dtype = info["dtype"] + if dtype not in DTYPE_LIST: + raise BadValueException("dtype", dtype, DTYPE_LIST) + doc = info["doc"] + logger.debug( + f"INSERT key, dtype, doc INTO {schema_table} VALUES ('{key}', '{dtype}', '{doc}')" + ) + # with engine.conn() as session: + # session.insert(schema_table, [key, dtype, doc]) + return ("OK", 200) + + +@app.get("/flex///schema") +def get_flexible_metadata_keys(instrument: str, obs_type: str): + table = instrument_tables.compute_flexible_metadata_table_name(instrument, obs_type) + key_descriptions = [] + logger.debug(f"SELECT key, dtype, doc FROM {table}") + # key_descriptions = select key, dtype, doc from table + return jsonify(key_descriptions) + + +@app.get("/flex///obs/") +def get_flexible_metadata(instrument: str, obs_type: str, obs_id: int): + table = instrument_tables.compute_flexible_metadata_table_name(instrument, obs_type) + result = dict() + if request.args: + cols = request.args["k"] + logger.debug( + f"SELECT key, value FROM {table} WHERE obs_id = {obs_id} AND key in ('{cols}')" + ) + # query = engine.select(["key", "value"]).from(table).where("obs_id = :obs_id and key in (:cols)", obs_id, cols) + else: + logger.debug(f"SELECT key, value FROM {table} WHERE obs_id = {obs_id}") + # query = engine.select(["key", "value"]).from(table).where("obs_id = :obs_id", obs_id) + # for key, value in engine.execute(query): + # result[key] = value + return jsonify(result) + + +@app.post("/flex///obs/") +def insert_flexible_metadata(instrument: str, obs_type: str, obs_id: int): + info = request.json + table = instrument_tables.compute_flexible_metadata_table_name(instrument, obs_type) + schema = instrument_tables.flexible_metadata_schemas[instrument][obs_type] + key = info["key"] + if key not in schema: + raise BadValueException("key", key, schema.keys()) + value = info["value"] + # check value against dtype + logger.debug( + f"INSERT obs_id, key, value INTO {table} VALUES ({obs_id}, '{key}', '{value}')" + ) + # for key, value in info.items(): + # engine.insert(table, obs_id, key, value) + return ("OK", 200) -@app.post("/insert") -def insert(): +@app.post("/insert/") +def insert(instrument: str): + if instrument not in instrument_tables.schemas: + raise BadValueException( + "instrument", instrument, instrument_tables.schemas.keys() + ) info = request.json - table = info["table"] + table = instrument + "." + info["table"] valdict = info["values"] keylist = list(valdict.keys()) valuelist = list(valdict.values()) - placeholders = ",".join(["?"] * len(valdict)) # check schema - with engine.begin() as conn: - conn.exec_driver_sql( - f"INSERT OR UPDATE INTO ? ({placeholders}) VALUES ({placeholders})", - [table] + keylist + valuelist, + logger.debug(f"INSERT {keylist} INTO {table} VALUES ({valuelist})") + # with engine.begin() as conn: + # conn.exec_driver_sql( + # f"INSERT INTO ? ({placeholders}) VALUES ({placeholders})", + # [table] + keylist + valuelist, + # ) + return ("OK", 200) + + +@app.post("/upsert/") +def upsert(instrument: str): + if instrument not in instrument_tables.schemas: + raise BadValueException( + "instrument", instrument, instrument_tables.schemas.keys() ) + info = request.json + table = instrument + "." + info["table"] + valdict = info["values"] + keylist = list(valdict.keys()) + valuelist = list(valdict.values()) + # check schema + + logger.debug(f"UPDATE {keylist} INTO {table} VALUES ({valuelist})") + # with engine.begin() as conn: + # conn.exec_driver_sql( + # f"UPDATE INTO {table} ({placeholders}) VALUES ({placeholders})", + # [table] + keylist + valuelist, + # ) return ("OK", 200) @app.post("/query") def query(): info = request.json - tables = ",".join(info["tables"]) - columns = ",".join(info["columns"]) - if "where" in info: - where = "WHERE " + info["where"] - if ";" in where: - return ("Cannot create query containing more than one statement", 403) with engine.begin() as conn: try: - cursor = conn.exec_driver_sql(f"SELECT {columns} FROM {tables} {where}") + cursor = conn.exec_driver_sql(info["query"]) first = True result = [] for row in cursor: @@ -47,11 +239,19 @@ def query(): result.append(row._fields) first = False result.append(list(row)) - return result + return jsonify(result) except sqlalchemy.exc.DBAPIError as e: return (str(e), 500) -@app.get("/schema/") -def schema(table: str): - return [(c.name, str(c.type), c.doc) for c in metadata_obj.tables[table.lower()].columns] +@app.get("/schema//
") +def schema(instrument: str, table: str): + if instrument not in instrument_tables.schemas: + raise BadValueException( + "instrument", instrument, instrument_tables.schemas.keys() + ) + schema = instrument_tables.schemas[instrument] + table = table.lower() + if table not in schema.tables: + raise BadValueException("table", table, schema.tables.keys()) + return [(c.name, str(c.type), c.doc) for c in schema.tables[table].columns] diff --git a/python/lsst/consdb/utils.py b/python/lsst/consdb/utils.py index da966361..3a3d0345 100644 --- a/python/lsst/consdb/utils.py +++ b/python/lsst/consdb/utils.py @@ -1,9 +1,48 @@ +# This file is part of consdb. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (http://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Utility functions for consdb services. +""" import logging import os +import re +import sys import sqlalchemy +__all__ = ["setup_postgres", "setup_logging"] + + def setup_postgres() -> sqlalchemy.Engine: + """Set up a SQLAlchemy Engine to talk to Postgres. + + Uses environment variables to get connection information, with an internal + default. + + Returns + ------- + engine: ``sqlalchemy.Engine`` + A SQLAlchemy Engine. + """ host = os.environ.get("DB_HOST") passwd = os.environ.get("DB_PASS") user = os.environ.get("DB_USER") @@ -17,4 +56,45 @@ def setup_postgres() -> sqlalchemy.Engine: "POSTGRES_URL", "postgresql://usdf-butler.slac.stanford.edu:5432/lsstdb1" ) logging.info(f"Using POSTGRES_URL {user} {host} {dbname}") - return create_engine(pg_url) + return sqlalchemy.create_engine(pg_url) + + +def setup_logging(module: str) -> logging.Logger: + """Set up logging for a service. + + Levels for any logger component may be set via the LOG_CONFIG environment + variable as a comma-separated list of "component=LEVEL" settings. The + special "." component refers to the root level of the LSST Science + Pipelines loggers. + + Parameter + --------- + module: `str` + The ``__name__`` of the calling module. + + Returns + ------- + logger: `logging.Logger` + The logger to use for the module. + """ + + logging.basicConfig( + level=logging.INFO, + format="{levelname} {asctime} {name} ({filename}:{lineno}) - {message}", + style="{", + stream=sys.stderr, + force=True, + ) + + logger = logging.getLogger(module) + + logspec = os.environ.get("LOG_CONFIG") + if logspec: + # One-line "component=LEVEL" logging specification parser. + for component, level in re.findall(r"(?:([\w.]*)=)?(\w+)", logspec): + if component == ".": + # Specially handle "." as a component to mean the lsst root + component = "lsst" + logging.getLogger(component).setLevel(level) + + return logger