diff --git a/python/lsst/consdb/pqclient.py b/python/lsst/consdb/pqclient.py
index fa7d8fcb..70688b1a 100644
--- a/python/lsst/consdb/pqclient.py
+++ b/python/lsst/consdb/pqclient.py
@@ -1,58 +1,242 @@
+# This file is part of consdb.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (http://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import logging
import os
-from pandas import DataFrame
+from typing import Any
+from urllib.parse import quote
+
import requests
-from requests.exceptions import RequestException
-from typing import Any, Iterable
-from urllib.parse import urljoin
-
-session = requests.Session()
-base_url = os.environ["CONSDB_URL"]
-
-
-def insert(table: str, values: dict[str, Any], **kwargs):
- values.update(kwargs)
- # check values against schema for table
- data = {"table": table, "values": values}
- url = urljoin(base_url, "insert")
- try:
- response = requests.post(url, json=data)
- except RequestException as e:
- raise e
- response.raise_for_status()
-
-
-def query(
- tables: str | Iterable[str],
- columns: str | Iterable[str],
- *,
- where: str | None = None,
- join: str | None = None
-) -> list[Any]:
- if isinstance(tables, str):
- tables = [tables]
- if isinstance(columns, str):
- columns = [columns]
- url = urljoin(base_url, "query")
- data = {"tables": tables, "columns": columns, "where": where, "join": join}
- try:
- response = requests.post(url, json=data)
- except RequestException as e:
- raise e
- try:
+from astropy.table import Table
+
+__all__ = ["ConsDbClient"]
+
+
+logger = logging.getLogger(__name__)
+
+
+def urljoin(*args: str) -> str:
+ """Join parts of a URL with slashes.
+
+ Does not do any quoting. Mostly to remove a level of list-making.
+
+ Parameters
+ ----------
+ *args: `str`
+ Each parameter is a URL part.
+
+ Returns
+ -------
+ url: `str`
+ The joined URL.
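+
+    Examples
+    --------
+    >>> urljoin("http://example.org/consdb", "query")
+    'http://example.org/consdb/query'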
+ """
+ return "/".join(args)
+
+
+class ConsDbClient:
+ """A client library for accessing the Consolidated Database.
+
+ This library provides a basic interface for using flexible metadata
+ (key/value pairs associated with observation ids from an observation
+ type table), determining the schema of ConsDB tables, querying the
+ ConsDB using a general SQL SELECT statement, and inserting into
+ ConsDB tables.
+
+ Parameters
+ ----------
+ url: `str`, optional
+ Base URL of the Web service, defaults to the value of environment
+ variable ``LSST_CONSDB_PQ_URL``.
+
+ Notes
+ -----
+ This client is a thin layer over a Web service, which avoids having a
+ dependency on database drivers.
+
+ It enforces the return of query results as Astropy Tables.
+
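+    Examples
+    --------
+    A minimal usage sketch; the URL and table name below are placeholders
+    for a real service endpoint and table:
+
+    >>> client = ConsDbClient("http://example.org/consdb")
+    >>> table = client.query(
+    ...     "SELECT * FROM cdb_latiss.exposure_flexdata"
+    ... )  # doctest: +SKIP
+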
+ """
+
+ def __init__(self, url: str | None = None):
+ self.session = requests.Session()
+ if url is None:
+ self.url = os.environ["LSST_CONSDB_PQ_URL"]
+ else:
+ self.url = url
+ self.url = self.url.rstrip("/")
+
+ def _handle_get(
+ self, url: str, query: dict[str, str | list[str]] | None = None
+ ) -> Any:
+ """Utility function to submit GET requests.
+
+ Parameters
+ ----------
+ url: `str`
+ URL to GET.
+ query: `dict` [`str`, `str` | `list` [`str`]], optional
+ Query parameters to attach to the URL.
+
+ Raises
+ ------
+ requests.exceptions.RequestException
+ Raised if any kind of connection error occurs.
+ requests.exceptions.HTTPError
+ Raised if a non-successful status is returned.
+ requests.exceptions.JSONDecodeError
+ Raised if the result does not decode as JSON.
+
+ Returns
+ -------
+ result: `Any`
+ Result of decoding the Web service result content as JSON.
+ """
+ logger.debug(f"GET {url}")
+ try:
+ response = self.session.get(url, params=query)
+ except requests.exceptions.RequestException as e:
+ raise e
+ response.raise_for_status()
+ return response.json()
+
+ def _handle_post(self, url: str, data: dict[str, Any]) -> requests.Response:
+ """Utility function to submit POST requests.
+
+ Parameters
+ ----------
+ url: `str`
+ URL to POST.
+ data: `dict` [`str`, `Any`]
+ Key/value pairs of data to POST.
+
+ Raises
+ ------
+ requests.exceptions.RequestException
+ Raised if any kind of connection error occurs.
+ requests.exceptions.HTTPError
+ Raised if a non-successful status is returned.
+
+ Returns
+ -------
+ result: `requests.Response`
+ The raw Web service result object.
+ """
+ logger.debug(f"POST {url}: {data}")
+ try:
+ response = self.session.post(url, json=data)
+ except requests.exceptions.RequestException as e:
+ raise e
response.raise_for_status()
- except Exception as ex:
- print(response.content.decode())
- raise ex
- arr = response.json()
- return DataFrame(arr[1:], columns=arr[0])
-
-
-def schema(table: str):
- url = urljoin(base_url, "schema/")
- url = urljoin(url, table)
- try:
- response = requests.get(url)
- except RequestException as e:
- raise e
- response.raise_for_status()
- return response.json()
+ return response
+
+ @staticmethod
+ def compute_flexible_metadata_table_name(instrument: str, obs_type: str) -> str:
+ """Public utility for computing flexible metadata table names.
+
+ Each instrument and observation type made with that instrument can
+ have a flexible metadata table. This function is useful when
+ issuing SQL queries, and it avoids a round-trip to the server.
+
+ Parameters
+ ----------
+ instrument: `str`
+ Name of the instrument (e.g. ``LATISS``).
+ obs_type: `str`
+ Name of the observation type (e.g. ``Exposure``).
+
+ Returns
+ -------
+ table_name: `str`
+ Name of the appropriate flexible metadata table.
+
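+        Examples
+        --------
+        >>> ConsDbClient.compute_flexible_metadata_table_name("LATISS", "Exposure")
+        'cdb_LATISS.Exposure_flexdata'
+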
+ """
+ return f"cdb_{instrument}.{obs_type}_flexdata"
+
+ def add_flexible_metadata_key(
+ self, instrument: str, obs_type: str, key: str, dtype: str, doc: str
+ ) -> requests.Response:
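+        """Add a key to the flexible metadata for an observation type.
+
+        ``dtype`` should be one of the types accepted by the server
+        (``bool``, ``int``, ``float``, or ``str``).
+        """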
+ data = {"key": key, "dtype": dtype, "doc": doc}
+ url = urljoin(self.url, "flex", quote(instrument), quote(obs_type), "addkey")
+ return self._handle_post(url, data)
+
+ def get_flexible_metadata_keys(
+ self, instrument: str, obs_type: str
+ ) -> list[tuple[str, str, str]]:
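+        """Retrieve the (key, dtype, doc) definitions of a flexible
+        metadata table."""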
+ url = urljoin(self.url, "flex", quote(instrument), quote(obs_type), "schema")
+ return self._handle_get(url)
+
+ def get_flexible_metadata(
+ self, instrument: str, obs_type: str, obs_id: int, keys: list[str] | None = None
+ ) -> list[Any]:
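+        """Get the flexible metadata for one observation, optionally
+        restricted to the given keys."""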
+ url = urljoin(
+ self.url,
+ "flex",
+ quote(instrument),
+ quote(obs_type),
+ "obs",
+ quote(str(obs_id)),
+ )
+ return self._handle_get(url, {"k": keys} if keys else None)
+
+ def insert_flexible_metadata(
+ self,
+ instrument: str,
+ obs_type: str,
+ obs_id: int,
+ values: dict[str, Any],
+ **kwargs,
+ ) -> requests.Response:
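+        """Set flexible metadata values for one observation.
+
+        Additional keyword arguments are merged into ``values``.
+        """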
+ values.update(kwargs)
+ data = values
+ url = urljoin(
+ self.url,
+ "flex",
+ quote(instrument),
+ quote(obs_type),
+ "obs",
+ quote(str(obs_id)),
+ )
+ return self._handle_post(url, data)
+
+ def insert(
+ self,
+ instrument: str,
+ table: str,
+ obs_id: int,
+ values: dict[str, Any],
+        allow_update: bool = False,
+ **kwargs,
+ ) -> requests.Response:
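+        """Insert a row into a ConsDB table.
+
+        If ``allow_update`` is true, an existing row with the same
+        ``obs_id`` may be updated. Additional keyword arguments are
+        merged into ``values``.
+        """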
+ values.update(kwargs)
+ data = {"table": table, "obs_id": obs_id, "values": values}
+ op = "upsert" if allow_update else "insert"
+ url = urljoin(self.url, op, quote(instrument))
+ return self._handle_post(url, data)
+
+ def query(self, query: str) -> Table:
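+        """Run a SQL SELECT statement via the Web service and return the
+        results as an Astropy Table."""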
+ url = urljoin(self.url, "query")
+ data = {"query": query}
+        result = self._handle_post(url, data).json()
+        return Table(rows=result[1:], names=result[0])
+
+ def schema(self, instrument: str, table: str) -> list[tuple[str, str, str]]:
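+        """Retrieve the column names, types, and documentation strings
+        for a ConsDB table."""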
+ url = urljoin(self.url, "schema", quote(instrument), quote(table))
+ return self._handle_get(url)
diff --git a/python/lsst/consdb/pqserver.py b/python/lsst/consdb/pqserver.py
index d10bcdb3..85f59bcf 100644
--- a/python/lsst/consdb/pqserver.py
+++ b/python/lsst/consdb/pqserver.py
@@ -1,45 +1,237 @@
-from flask import Flask, request
-from sqlalchemy import create_engine, MetaData
+# This file is part of consdb.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (http://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
import sqlalchemy.exc
+from flask import Flask, jsonify, request
+from sqlalchemy import MetaData
+
+from utils import setup_logging, setup_postgres
+
+INSTRUMENT_LIST = ["latiss"]
+OBS_TYPE_LIST = ["exposure", "visit", "ccdexposure", "ccdvisit"]
+DTYPE_LIST = ["bool", "int", "float", "str"]
+
+
+####################
+# Global app setup #
+####################
-from utils import setup_postgres
app = Flask(__name__)
engine = setup_postgres()
-metadata_obj = MetaData(schema="cdb_latiss")
-metadata_obj.reflect(engine)
+logger = setup_logging(__name__)
+
+
+########################
+# Schema preload class #
+########################
+
+
+class InstrumentTables:
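+    """Cache of reflected table metadata and flexible metadata key
+    definitions for each configured instrument."""
+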
+ def __init__(self):
+        self.table_names = set()
+        self.schemas = dict()
+        self.flexible_metadata_schemas = dict()
+ for instrument in INSTRUMENT_LIST:
+ md = MetaData(schema=f"cdb_{instrument}")
+ md.reflect(engine)
+ self.table_names.update([str(table) for table in md.tables])
+ self.schemas[instrument] = md
+ self.flexible_metadata_schemas[instrument] = dict()
+ for obs_type in OBS_TYPE_LIST:
+ table_name = f"cdb_{instrument}.{obs_type}_flexdata"
+ schema_table_name = table_name + "_schema"
+            if (
+                table_name in self.table_names
+                and schema_table_name in self.table_names
+            ):
+ logger.debug(f"SELECT key, dtype, doc FROM {schema_table_name}")
+ # query = engine.select(["key", "dtype", "doc"]).table(schema_name)
+ # self.flexible_metadata_schemas[instrument][obs_type] = {
+ # key: (dtype, doc) for key, dtype, doc in engine.execute(query)
+ # }
+
+ def compute_flexible_metadata_table_name(self, instrument: str, obs_type: str):
+ if instrument not in self.flexible_metadata_schemas:
+ raise BadValueException(
+ "instrument", instrument, self.flexible_metadata_schemas.keys()
+ )
+ if obs_type not in self.flexible_metadata_schemas[instrument]:
+            raise BadValueException(
+                "observation type",
+                obs_type,
+                self.flexible_metadata_schemas[instrument].keys(),
+            )
+ return f"cdb_{instrument}.{obs_type}_flexdata"
+
+ def compute_flexible_metadata_table_schema_name(
+ self, instrument: str, obs_type: str
+ ):
+ table_name = self.compute_flexible_metadata_table_name(instrument, obs_type)
+ return table_name + "_schema"
+
+
+instrument_tables = InstrumentTables()
+
+
+##################
+# Error handling #
+##################
+
+
+class BadValueException(Exception):
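+    """Exception raised when a request parameter has an unrecognized value.
+
+    Reports the list of valid values when one is available.
+    """
+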
+ status_code = 404
+
+ def __init__(self, kind, value, valid=None):
+ self.kind = kind
+ self.value = value
+ self.valid = valid
+
+ def to_dict(self):
+ data = {
+ "message": f"Unknown {self.kind}",
+ "value": self.value,
+ "valid": self.valid,
+ }
+ return data
+
+
+@app.errorhandler(BadValueException)
+def handle_bad_value(e: BadValueException):
+ return jsonify(e.to_dict()), e.status_code
+
+
+###################################
+# Web service application methods #
+###################################
+
+
+@app.post("/flex/<instrument>/<obs_type>/addkey")
+def add_flexible_metadata_key(instrument: str, obs_type: str):
+ info = request.json
+    schema_table = instrument_tables.compute_flexible_metadata_table_schema_name(
+ instrument, obs_type
+ )
+ key = info["key"]
+ dtype = info["dtype"]
+ if dtype not in DTYPE_LIST:
+ raise BadValueException("dtype", dtype, DTYPE_LIST)
+ doc = info["doc"]
+ logger.debug(
+ f"INSERT key, dtype, doc INTO {schema_table} VALUES ('{key}', '{dtype}', '{doc}')"
+ )
+ # with engine.conn() as session:
+ # session.insert(schema_table, [key, dtype, doc])
+ return ("OK", 200)
+
+
+@app.get("/flex/<instrument>/<obs_type>/schema")
+def get_flexible_metadata_keys(instrument: str, obs_type: str):
+    table = instrument_tables.compute_flexible_metadata_table_schema_name(
+        instrument, obs_type
+    )
+ key_descriptions = []
+ logger.debug(f"SELECT key, dtype, doc FROM {table}")
+ # key_descriptions = select key, dtype, doc from table
+ return jsonify(key_descriptions)
+
+
+@app.get("/flex/<instrument>/<obs_type>/obs/<int:obs_id>")
+def get_flexible_metadata(instrument: str, obs_type: str, obs_id: int):
+ table = instrument_tables.compute_flexible_metadata_table_name(instrument, obs_type)
+ result = dict()
+ if request.args:
+        cols = request.args.getlist("k")
+ logger.debug(
+ f"SELECT key, value FROM {table} WHERE obs_id = {obs_id} AND key in ('{cols}')"
+ )
+ # query = engine.select(["key", "value"]).from(table).where("obs_id = :obs_id and key in (:cols)", obs_id, cols)
+ else:
+ logger.debug(f"SELECT key, value FROM {table} WHERE obs_id = {obs_id}")
+ # query = engine.select(["key", "value"]).from(table).where("obs_id = :obs_id", obs_id)
+ # for key, value in engine.execute(query):
+ # result[key] = value
+ return jsonify(result)
+
+
+@app.post("/flex/<instrument>/<obs_type>/obs/<int:obs_id>")
+def insert_flexible_metadata(instrument: str, obs_type: str, obs_id: int):
+ info = request.json
+ table = instrument_tables.compute_flexible_metadata_table_name(instrument, obs_type)
+ schema = instrument_tables.flexible_metadata_schemas[instrument][obs_type]
+    for key, value in info.items():
+        if key not in schema:
+            raise BadValueException("key", key, schema.keys())
+        # check value against dtype
+        logger.debug(
+            f"INSERT obs_id, key, value INTO {table}"
+            f" VALUES ({obs_id}, '{key}', '{value}')"
+        )
+        # engine.insert(table, obs_id, key, value)
+ return ("OK", 200)
-@app.post("/insert")
-def insert():
+@app.post("/insert/<instrument>")
+def insert(instrument: str):
+ if instrument not in instrument_tables.schemas:
+ raise BadValueException(
+ "instrument", instrument, instrument_tables.schemas.keys()
+ )
info = request.json
- table = info["table"]
+ table = instrument + "." + info["table"]
valdict = info["values"]
keylist = list(valdict.keys())
valuelist = list(valdict.values())
- placeholders = ",".join(["?"] * len(valdict))
# check schema
- with engine.begin() as conn:
- conn.exec_driver_sql(
- f"INSERT OR UPDATE INTO ? ({placeholders}) VALUES ({placeholders})",
- [table] + keylist + valuelist,
+ logger.debug(f"INSERT {keylist} INTO {table} VALUES ({valuelist})")
+ # with engine.begin() as conn:
+ # conn.exec_driver_sql(
+ # f"INSERT INTO ? ({placeholders}) VALUES ({placeholders})",
+ # [table] + keylist + valuelist,
+ # )
+ return ("OK", 200)
+
+
+@app.post("/upsert/<instrument>")
+def upsert(instrument: str):
+ if instrument not in instrument_tables.schemas:
+ raise BadValueException(
+ "instrument", instrument, instrument_tables.schemas.keys()
)
+ info = request.json
+ table = instrument + "." + info["table"]
+ valdict = info["values"]
+ keylist = list(valdict.keys())
+ valuelist = list(valdict.values())
+ # check schema
+
+ logger.debug(f"UPDATE {keylist} INTO {table} VALUES ({valuelist})")
+ # with engine.begin() as conn:
+ # conn.exec_driver_sql(
+ # f"UPDATE INTO {table} ({placeholders}) VALUES ({placeholders})",
+ # [table] + keylist + valuelist,
+ # )
return ("OK", 200)
@app.post("/query")
def query():
info = request.json
- tables = ",".join(info["tables"])
- columns = ",".join(info["columns"])
- if "where" in info:
- where = "WHERE " + info["where"]
- if ";" in where:
- return ("Cannot create query containing more than one statement", 403)
with engine.begin() as conn:
try:
- cursor = conn.exec_driver_sql(f"SELECT {columns} FROM {tables} {where}")
+ cursor = conn.exec_driver_sql(info["query"])
first = True
result = []
for row in cursor:
@@ -47,11 +239,19 @@ def query():
result.append(row._fields)
first = False
result.append(list(row))
- return result
+ return jsonify(result)
except sqlalchemy.exc.DBAPIError as e:
return (str(e), 500)
-@app.get("/schema/<table>")
-def schema(table: str):
- return [(c.name, str(c.type), c.doc) for c in metadata_obj.tables[table.lower()].columns]
+@app.get("/schema/<instrument>/<table>")
+def schema(instrument: str, table: str):
+ if instrument not in instrument_tables.schemas:
+ raise BadValueException(
+ "instrument", instrument, instrument_tables.schemas.keys()
+ )
+ schema = instrument_tables.schemas[instrument]
+ table = table.lower()
+ if table not in schema.tables:
+ raise BadValueException("table", table, schema.tables.keys())
+ return [(c.name, str(c.type), c.doc) for c in schema.tables[table].columns]
diff --git a/python/lsst/consdb/utils.py b/python/lsst/consdb/utils.py
index da966361..3a3d0345 100644
--- a/python/lsst/consdb/utils.py
+++ b/python/lsst/consdb/utils.py
@@ -1,9 +1,48 @@
+# This file is part of consdb.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (http://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+"""
+Utility functions for consdb services.
+"""
import logging
import os
+import re
+import sys
import sqlalchemy
+__all__ = ["setup_postgres", "setup_logging"]
+
+
def setup_postgres() -> sqlalchemy.Engine:
+ """Set up a SQLAlchemy Engine to talk to Postgres.
+
+ Uses environment variables to get connection information, with an internal
+ default.
+
+ Returns
+ -------
+ engine: ``sqlalchemy.Engine``
+ A SQLAlchemy Engine.
+ """
host = os.environ.get("DB_HOST")
passwd = os.environ.get("DB_PASS")
user = os.environ.get("DB_USER")
@@ -17,4 +56,45 @@ def setup_postgres() -> sqlalchemy.Engine:
"POSTGRES_URL", "postgresql://usdf-butler.slac.stanford.edu:5432/lsstdb1"
)
logging.info(f"Using POSTGRES_URL {user} {host} {dbname}")
- return create_engine(pg_url)
+ return sqlalchemy.create_engine(pg_url)
+
+
+def setup_logging(module: str) -> logging.Logger:
+ """Set up logging for a service.
+
+ Levels for any logger component may be set via the LOG_CONFIG environment
+ variable as a comma-separated list of "component=LEVEL" settings. The
+ special "." component refers to the root level of the LSST Science
+ Pipelines loggers.
+
+    Parameters
+    ----------
+ module: `str`
+ The ``__name__`` of the calling module.
+
+ Returns
+ -------
+ logger: `logging.Logger`
+ The logger to use for the module.
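+
+    Examples
+    --------
+    A hypothetical setting of ``LOG_CONFIG="sqlalchemy.engine=WARNING,.=DEBUG"``
+    sets the ``sqlalchemy.engine`` logger to WARNING and the LSST root
+    logger (``lsst``) to DEBUG.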
+ """
+
+ logging.basicConfig(
+ level=logging.INFO,
+ format="{levelname} {asctime} {name} ({filename}:{lineno}) - {message}",
+ style="{",
+ stream=sys.stderr,
+ force=True,
+ )
+
+ logger = logging.getLogger(module)
+
+ logspec = os.environ.get("LOG_CONFIG")
+ if logspec:
+ # One-line "component=LEVEL" logging specification parser.
+ for component, level in re.findall(r"(?:([\w.]*)=)?(\w+)", logspec):
+ if component == ".":
+ # Specially handle "." as a component to mean the lsst root
+ component = "lsst"
+ logging.getLogger(component).setLevel(level)
+
+ return logger