diff --git a/package.json b/package.json index 87de20050..997473e11 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "tdp_core", "description": "Target discovery platform for exploring rankings of genes, disease models, and other entities.", - "version": "20.1.0", + "version": "20.1.1", "author": { "name": "datavisyn GmbH", "email": "contact@datavisyn.io", @@ -72,7 +72,6 @@ "d3v3": "npm:d3@~3.5.17", "d3v7": "npm:d3@^7.4.0", "jquery": "~3.5.1", - "lineupjs": "4.7.0", "lodash": "~4.17.20", "marked": "~3.0.2", "md5": "^2.3.0", diff --git a/tdp_core/db.py b/tdp_core/db.py index fe9674e03..f519b8d1e 100644 --- a/tdp_core/db.py +++ b/tdp_core/db.py @@ -2,6 +2,7 @@ from typing import Any from flask import abort +from opentelemetry import trace from sqlalchemy.exc import OperationalError, SQLAlchemyError from sqlalchemy.orm import Session from visyn_core import manager @@ -10,6 +11,7 @@ from .sql_filter import filter_logic from .utils import clean_query, secure_replacements +tracer = trace.get_tracer(__name__) _log = logging.getLogger(__name__) @@ -107,18 +109,19 @@ def __init__(self, engine): session wrapper of sql alchemy with auto cleanup :param engine: """ - self._engine = engine - import uuid - - self._name = uuid.uuid4() - _log.debug("%s - engine status before: %s", self._name, engine.pool.status()) - _log.debug("%s - creating session", self._name) - # add connection count and session count with SQLALCHEMY_POOL_SIZE and SQLALCHEMY_MAX_OVERFLOW - # https://stackoverflow.com/questions/34775501/how-could-i-check-the-number-of-active-sqlalchemy-connections-in-a-pool-at-any-g - self._session: Session = manager.db.create_session(engine) - _log.debug("%s - session created", self._name) - self._supports_array_parameter = _supports_sql_parameters(engine.name) - _log.debug("%s - supports array parameter: %s", self._name, self._supports_array_parameter) + with tracer.start_as_current_span("WrappedSession.__init__", attributes={"db.pool_status": engine.pool.status()}): + self._engine = engine + import uuid + + self._name = uuid.uuid4() + _log.debug("%s - engine status before: %s", self._name, engine.pool.status()) + _log.debug("%s - creating session", self._name) + # add connection count and session count with SQLALCHEMY_POOL_SIZE and SQLALCHEMY_MAX_OVERFLOW + # https://stackoverflow.com/questions/34775501/how-could-i-check-the-number-of-active-sqlalchemy-connections-in-a-pool-at-any-g + self._session: Session = manager.db.create_session(engine) + _log.debug("%s - session created", self._name) + self._supports_array_parameter = _supports_sql_parameters(engine.name) + _log.debug("%s - supports array parameter: %s", self._name, self._supports_array_parameter) def execute(self, sql, **kwargs): """ @@ -127,17 +130,18 @@ def execute(self, sql, **kwargs): :param kwargs: additional args to replace :return: the session result """ - _log.debug("%s - replace array parameter in sql query: %s", self._name, sql) - parsed = to_query(sql, self._supports_array_parameter, kwargs) - _log.debug("%s - execute the given query with the given args: %s", self._name, sql) - _log.debug("%s (%s)", parsed, kwargs) - try: - return self._session.execute(parsed, kwargs) - except OperationalError as error: - _log.error("OperationalError: %s", error) - abort(408, error) - except SQLAlchemyError as error: - _log.error("SQLAlchemyError: %s", error) + with tracer.start_as_current_span("WrappedSession.execute", attributes={"db.pool_status": self._engine.pool.status()}): + _log.debug("%s - replace array parameter in sql query: %s", self._name, sql) + parsed = to_query(sql, self._supports_array_parameter, kwargs) + _log.debug("%s - execute the given query with the given args: %s", self._name, sql) + _log.debug("%s (%s)", parsed, kwargs) + try: + return self._session.execute(parsed, kwargs) + except OperationalError as error: + _log.error("OperationalError: %s", error) + abort(408, error) + except SQLAlchemyError as error: + _log.error("SQLAlchemyError: %s", error) def run(self, sql, **kwargs): """ @@ -146,40 +150,49 @@ def run(self, sql, **kwargs): :param kwargs: args for this query :return: list of dicts """ - _log.debug("%s - run sql statement: %s", self._name, sql) - result = self.execute(sql, **kwargs) - _log.debug("%s - ran sql statement: %s", self._name, sql) - columns = result.keys() # type: ignore - return [{c: r[c] for c in columns} for r in result] # type: ignore + with tracer.start_as_current_span("WrappedSession.run"): + _log.debug("%s - run sql statement: %s", self._name, sql) + result = self.execute(sql, **kwargs) + _log.debug("%s - ran sql statement: %s", self._name, sql) + columns = result.keys() # type: ignore + return [{c: r[c] for c in columns} for r in result] # type: ignore def __call__(self, sql, **kwargs): - return self.run(sql, **kwargs) + with tracer.start_as_current_span("WrappedSession.__call__"): + return self.run(sql, **kwargs) def __enter__(self): - return self + with tracer.start_as_current_span("WrappedSession.__enter__"): + return self def commit(self): - self._session.commit() + with tracer.start_as_current_span("WrappedSession.commit"): + self._session.commit() def flush(self): - self._session.flush() + with tracer.start_as_current_span("WrappedSession.flush"): + self._session.flush() def rollback(self): - self._session.rollback() + with tracer.start_as_current_span("WrappedSession.rollback"): + self._session.rollback() def _destroy(self): - if self._session: - _log.debug("%s - removing session", self._name) - self._session.close() - self._session = None # type: ignore - _log.debug("%s - removed session", self._name) - _log.debug("%s - engine status after destroy: %s", self._name, self._engine.pool.status()) + with tracer.start_as_current_span("WrappedSession._destroy", attributes={"db.pool_status": self._engine.pool.status()}): + if self._session: + _log.debug("%s - removing session", self._name) + self._session.close() + self._session = None # type: ignore + _log.debug("%s - removed session", self._name) + _log.debug("%s - engine status after destroy: %s", self._name, self._engine.pool.status()) def __del__(self): - self._destroy() + with tracer.start_as_current_span("WrappedSession.__del__"): + self._destroy() def __exit__(self, exc_type, exc_val, exc_tb): - self._destroy() + with tracer.start_as_current_span("WrappedSession.__exit__"): + self._destroy() def session(engine): @@ -353,84 +366,90 @@ def get_data( :param filters: the dict of dynamically build filter :return: (r, view) tuple of the resulting rows and the resolved view """ - config, engine, view = resolve_view(database, view_name) + with tracer.start_as_current_span("db.get_data"): + config, engine, view = resolve_view(database, view_name) - kwargs, replace = prepare_arguments(view, config, replacements, arguments, extra_sql_argument) + kwargs, replace = prepare_arguments(view, config, replacements, arguments, extra_sql_argument) - query = view.query + query = view.query - if callable(query): - _log.debug("GET DATA with callback variant") - # callback variant - return query(engine, arguments, filters), view + if callable(query): + with tracer.start_as_current_span("db.get_data with callback"): + _log.debug("GET DATA with callback variant") + # callback variant + return query(engine, arguments, filters), view - with session(engine) as sess: - _log.debug("%s - GET DATA with session", sess._name) - if config.statement_timeout and config.statement_timeout_query: - _log.debug("set statement_timeout to {}".format(config.statement_timeout)) - sess.execute(config.statement_timeout_query.format(config.statement_timeout)) - _log.debug("%s - GET DATA before run", sess._name) - r = sess.run(query.format(**replace), **kwargs) - _log.debug("%s - GET DATA after run", sess._name) - return r, view + with session(engine) as sess: + _log.debug("%s - GET DATA with session", sess._name) + if config.statement_timeout and config.statement_timeout_query: + _log.debug("set statement_timeout to {}".format(config.statement_timeout)) + sess.execute(config.statement_timeout_query.format(config.statement_timeout)) + _log.debug("%s - GET DATA before run", sess._name) + r = sess.run(query.format(**replace), **kwargs) + _log.debug("%s - GET DATA after run", sess._name) + return r, view def get_query(database, view_name, replacements=None, arguments=None, extra_sql_argument=None): - config, engine, view = resolve_view(database, view_name) + with tracer.start_as_current_span("db.get_query"): + config, engine, view = resolve_view(database, view_name) - kwargs, replace = prepare_arguments(view, config, replacements, arguments, extra_sql_argument) + kwargs, replace = prepare_arguments(view, config, replacements, arguments, extra_sql_argument) - query = view.query + query = view.query - if callable(query): - return {"query": "custom function", "args": kwargs} + if callable(query): + return {"query": "custom function", "args": kwargs} - return {"query": clean_query(query.format(**replace)), "args": kwargs} + return {"query": clean_query(query.format(**replace)), "args": kwargs} def get_filtered_data(database, view_name, args): - config, _, view = resolve_view(database, view_name) - # convert to index lookup - # row id start with 1 - try: - replacements, processed_args, extra_args, where_clause = filter_logic(view, args) - except RuntimeError as error: - abort(400, error) + with tracer.start_as_current_span("db.get_filtered_data"): + config, _, view = resolve_view(database, view_name) + # convert to index lookup + # row id start with 1 + try: + replacements, processed_args, extra_args, where_clause = filter_logic(view, args) + except RuntimeError as error: + abort(400, error) - return get_data(database, view_name, replacements, processed_args, extra_args, where_clause) + return get_data(database, view_name, replacements, processed_args, extra_args, where_clause) def get_filtered_query(database, view_name, args): - config, _, view = resolve_view(database, view_name) - # convert to index lookup - # row id start with 1 - try: - replacements, processed_args, extra_args, where_clause = filter_logic(view, args) - except RuntimeError as error: - abort(400, error) + with tracer.start_as_current_span("db.get_filtered_query"): + config, _, view = resolve_view(database, view_name) + # convert to index lookup + # row id start with 1 + try: + replacements, processed_args, extra_args, where_clause = filter_logic(view, args) + except RuntimeError as error: + abort(400, error) - return get_query(database, view_name, replacements, processed_args, extra_args) + return get_query(database, view_name, replacements, processed_args, extra_args) def _get_count(database, view_name, args): - config, engine, view = resolve_view(database, view_name) + with tracer.start_as_current_span("db._get_count"): + config, engine, view = resolve_view(database, view_name) - try: - replacements, processed_args, extra_args, where_clause = filter_logic(view, args) - except RuntimeError as error: - abort(400, error) + try: + replacements, processed_args, extra_args, where_clause = filter_logic(view, args) + except RuntimeError as error: + abort(400, error) - kwargs, replace = prepare_arguments(view, config, replacements, processed_args, extra_args) + kwargs, replace = prepare_arguments(view, config, replacements, processed_args, extra_args) - if "count" in view.queries: - count_query = view.queries["count"] - elif view.table: - count_query = "SELECT count(d.*) as count FROM {table} d {{joins}} {{where}}".format(table=view.table) - else: - count_query = None - abort(500, "invalid view configuration, missing count query and cannot derive it") + if "count" in view.queries: + count_query = view.queries["count"] + elif view.table: + count_query = "SELECT count(d.*) as count FROM {table} d {{joins}} {{where}}".format(table=view.table) + else: + count_query = None + abort(500, "invalid view configuration, missing count query and cannot derive it") - return config, engine, count_query, processed_args, where_clause, replace, kwargs + return config, engine, count_query, processed_args, where_clause, replace, kwargs def get_count(database, view_name, args): @@ -440,154 +459,168 @@ def get_count(database, view_name, args): :param view_name: view name :return: the count of results """ + with tracer.start_as_current_span("db.get_count"): + ( + config, + engine, + count_query, + processed_args, + where_clause, + replace, + kwargs, + ) = _get_count(database, view_name, args) + + if callable(count_query): + with tracer.start_as_current_span("db.get_count with callback"): + # callback variant + return count_query(engine, processed_args, where_clause) - ( - config, - engine, - count_query, - processed_args, - where_clause, - replace, - kwargs, - ) = _get_count(database, view_name, args) - - if callable(count_query): - # callback variant - return count_query(engine, processed_args, where_clause) - - with session(engine) as sess: - _log.debug("%s - GET COUNT with session", sess._name) - if config.statement_timeout and config.statement_timeout_query: - _log.debug("set statement_timeout to {}".format(config.statement_timeout)) - sess.execute(config.statement_timeout_query.format(config.statement_timeout)) - _log.debug("%s - GET COUNT before run", sess._name) - r = sess.run(count_query.format(**replace), **kwargs) - _log.debug("%s - GET COUNT after run", sess._name) - if r: - return r[0]["count"] - return 0 + with session(engine) as sess: + _log.debug("%s - GET COUNT with session", sess._name) + if config.statement_timeout and config.statement_timeout_query: + _log.debug("set statement_timeout to {}".format(config.statement_timeout)) + sess.execute(config.statement_timeout_query.format(config.statement_timeout)) + _log.debug("%s - GET COUNT before run", sess._name) + r = sess.run(count_query.format(**replace), **kwargs) + _log.debug("%s - GET COUNT after run", sess._name) + if r: + return r[0]["count"] + return 0 def get_count_query(database, view_name, args): - ( - config, - engine, - count_query, - processed_args, - where_clause, - replace, - kwargs, - ) = _get_count(database, view_name, args) + with tracer.start_as_current_span("db.get_count_query"): + ( + config, + engine, + count_query, + processed_args, + where_clause, + replace, + kwargs, + ) = _get_count(database, view_name, args) - if callable(count_query): - return {"query": "custom function", "args": kwargs} + if callable(count_query): + return {"query": "custom function", "args": kwargs} - return {"query": count_query.format(**replace), "args": kwargs} + return {"query": count_query.format(**replace), "args": kwargs} def derive_columns(table_name, engine, columns=None): """ helper function to derive the columns of a table """ - columns = columns or {} - - for col in get_columns(engine, table_name): - name = col["column"] - if name in columns: - # merge - old = columns[name] - for k, v in col.items(): - if k not in old: - old[k] = v - else: - columns[name] = col - - # derive the missing domains and categories - number_columns = [k for k, col in columns.items() if col["type"] == "number" and ("min" not in col or "max" not in col)] - categorical_columns = [ - k for k, col in columns.items() if (col["type"] == "categorical" or col["type"] == "set") and "categories" not in col - ] - if number_columns or categorical_columns: - with session(engine) as sess: - _log.debug("%s - DERIVE COLUMNS with session", sess._name) - if number_columns: - template = "min({col}) as {col}_min, max({col}) as {col}_max" - minmax = ", ".join(template.format(col=col) for col in number_columns) - _log.debug("%s - DERIVE COLUMNS number columns before run", sess._name) - row = next(iter(sess.execute("""SELECT {minmax} FROM {table}""".format(table=table_name, minmax=minmax)))) # type: ignore - _log.debug("%s - DERIVE COLUMNS number columns after run", sess._name) - for num_col in number_columns: - columns[num_col]["min"] = row[num_col + "_min"] - columns[num_col]["max"] = row[num_col + "_max"] - for col in categorical_columns: - template = """SELECT distinct {col} as cat FROM {table} WHERE {col} is not NULL""" - if _differentiates_empty_string_and_null(engine.name): - template += """ AND {col} <> ''""" - template += """ ORDER BY {col} ASC""" - _log.debug("%s - DERIVE COLUMNS categorical columns before run: %s, %s", sess._name, table_name, col) - cats = sess.execute(template.format(col=col, table=table_name)) - _log.debug("%s - DERIVE COLUMNS categorical columns after run: %s, %s", sess._name, table_name, col) - categories = [str(r["cat"]) for r in cats if r["cat"] is not None] # type: ignore - if columns[col]["type"] == "set": - separator = getattr(columns[col], "separator", ";") - separated_categories = [category.split(separator) for category in categories] - # flatten array - categories = list({category for sublist in separated_categories for category in sublist}) - categories.sort() # sort list to avoid random order with each run - columns[col]["categories"] = categories - _log.debug("%s - DERIVE COLUMNS done", sess._name) - - return columns + with tracer.start_as_current_span("db.derive_columns"): + columns = columns or {} + + for col in get_columns(engine, table_name): + name = col["column"] + if name in columns: + # merge + old = columns[name] + for k, v in col.items(): + if k not in old: + old[k] = v + else: + columns[name] = col + + # derive the missing domains and categories + number_columns = [k for k, col in columns.items() if col["type"] == "number" and ("min" not in col or "max" not in col)] + categorical_columns = [ + k for k, col in columns.items() if (col["type"] == "categorical" or col["type"] == "set") and "categories" not in col + ] + if number_columns or categorical_columns: + with tracer.start_as_current_span( + "db.derive_column", + attributes={ + "db.table_name": table_name, + "db.number_columns": number_columns, + "db.categorical_columns": categorical_columns, + }, + ), session(engine) as sess: + _log.debug("%s - DERIVE COLUMNS with session", sess._name) + if number_columns: + template = "min({col}) as {col}_min, max({col}) as {col}_max" + minmax = ", ".join(template.format(col=col) for col in number_columns) + _log.debug("%s - DERIVE COLUMNS number columns before run", sess._name) + row = next(iter(sess.execute("""SELECT {minmax} FROM {table}""".format(table=table_name, minmax=minmax)))) # type: ignore + _log.debug("%s - DERIVE COLUMNS number columns after run", sess._name) + for num_col in number_columns: + columns[num_col]["min"] = row[num_col + "_min"] + columns[num_col]["max"] = row[num_col + "_max"] + for col in categorical_columns: + template = """SELECT distinct {col} as cat FROM {table} WHERE {col} is not NULL""" + if _differentiates_empty_string_and_null(engine.name): + template += """ AND {col} <> ''""" + template += """ ORDER BY {col} ASC""" + _log.debug("%s - DERIVE COLUMNS categorical columns before run: %s, %s", sess._name, table_name, col) + cats = sess.execute(template.format(col=col, table=table_name)) + _log.debug("%s - DERIVE COLUMNS categorical columns after run: %s, %s", sess._name, table_name, col) + categories = [str(r["cat"]) for r in cats if r["cat"] is not None] # type: ignore + if columns[col]["type"] == "set": + separator = getattr(columns[col], "separator", ";") + separated_categories = [category.split(separator) for category in categories] + # flatten array + categories = list({category for sublist in separated_categories for category in sublist}) + categories.sort() # sort list to avoid random order with each run + columns[col]["categories"] = categories + _log.debug("%s - DERIVE COLUMNS done", sess._name) + + return columns def _fill_up_columns(view, engine): - _log.debug("fill up view %s", view) - # update the real object - view.columns = derive_columns(view.table, engine, view.columns) - view.columns_filled_up = True + with tracer.start_as_current_span("db._fill_up_columns"): + _log.debug("fill up view %s", view) + # update the real object + view.columns = derive_columns(view.table, engine, view.columns) + view.columns_filled_up = True def _lookup(database, view_name, query, page, limit, args): - config, engine, view = resolve_view(database, view_name) + with tracer.start_as_current_span("db._lookup"): + config, engine, view = resolve_view(database, view_name) - arguments = MultiDict(args) - offset = page * limit - # replace with wildcard version - arguments["query"] = "%{}%".format(query) - arguments["query_end"] = "%{}".format(query) - arguments["query_start"] = "{}%".format(query) - arguments["query_match"] = "{}".format(query) - # add 1 for checking if we have more - replacements = {"limit": limit + 1, "offset": offset, "offset2": (offset + limit + 1)} + arguments = MultiDict(args) + offset = page * limit + # replace with wildcard version + arguments["query"] = "%{}%".format(query) + arguments["query_end"] = "%{}".format(query) + arguments["query_start"] = "{}%".format(query) + arguments["query_match"] = "{}".format(query) + # add 1 for checking if we have more + replacements = {"limit": limit + 1, "offset": offset, "offset2": (offset + limit + 1)} - kwargs, replace = prepare_arguments(view, config, replacements, arguments) + kwargs, replace = prepare_arguments(view, config, replacements, arguments) - return engine, view, view.query, replace, kwargs + return engine, view, view.query, replace, kwargs def lookup_query(database, view_name, query, page, limit, args): - engine, _, sql, replace, kwargs = _lookup(database, view_name, query, page, limit, args) + with tracer.start_as_current_span("db.lookup_query"): + engine, _, sql, replace, kwargs = _lookup(database, view_name, query, page, limit, args) - if callable(sql): - return {"query": "custom function", "args": kwargs} + if callable(sql): + return {"query": "custom function", "args": kwargs} - return {"query": sql.format(**replace), "args": kwargs} + return {"query": sql.format(**replace), "args": kwargs} def lookup(database, view_name, query, page, limit, args): - engine, view, sql, replace, kwargs = _lookup(database, view_name, query, page, limit, args) + with tracer.start_as_current_span("db.lookup"): + engine, view, sql, replace, kwargs = _lookup(database, view_name, query, page, limit, args) - if callable(sql): - kwargs.update(replace) - # callback variant - return sql(engine, kwargs, None) + if callable(sql): + kwargs.update(replace) + # callback variant + return sql(engine, kwargs, None) - with session(engine) as sess: - r_items = sess.run(sql.format(**replace), **kwargs) + with session(engine) as sess: + r_items = sess.run(sql.format(**replace), **kwargs) - more = len(r_items) > limit - if more: - # hit the boundary of more remove the artificial one - del r_items[-1] + more = len(r_items) > limit + if more: + # hit the boundary of more remove the artificial one + del r_items[-1] - return r_items, more, view + return r_items, more, view diff --git a/tdp_core/dbview.py b/tdp_core/dbview.py index 7df2e3b32..36e1eb1bd 100644 --- a/tdp_core/dbview.py +++ b/tdp_core/dbview.py @@ -4,12 +4,15 @@ from typing import Any import sqlalchemy +from opentelemetry import trace from sqlalchemy.engine import Engine from sqlalchemy.orm import sessionmaker +from sqlalchemy.pool import QueuePool from visyn_core.security import current_user, is_logged_in from .utils import clean_query +tracer = trace.get_tracer(__name__) _log = logging.getLogger(__name__) REGEX_TYPE = type(re.compile("")) @@ -607,28 +610,41 @@ def __init__(self, views=None, agg_score=None, mappings=None): :param agg_score: optional specify how aggregation should be handled :param mappings: optional database mappings """ - _log.debug("create db connector") - self.agg_score = agg_score or default_agg_score - self.views = views or {} - self.dburl: str = None # type: ignore - self.mappings = mappings - self.statement_timeout = None - self.statement_timeout_query: str | None = None - self.description = "" + with tracer.start_as_current_span("DBConnector.init"): + _log.debug("create db connector") + self.agg_score = agg_score or default_agg_score + self.views = views or {} + self.dburl: str = None # type: ignore + self.mappings = mappings + self.statement_timeout = None + self.statement_timeout_query: str | None = None + self.description = "" def dump(self, name): return OrderedDict(name=name, description=self.description) def create_engine(self, config) -> Engine: - engine_options = { - # Increase the pool size to 30 to avoid "too many clients" errors - "pool_size": 30, - "pool_pre_ping": True, - } + import importlib + + poolclass_name = config.get("poolclass", "QueuePool") # set to SQLAlchemy default poolclass = QueuePool + try: + poolclass = getattr(importlib.import_module("sqlalchemy.pool"), poolclass_name) + except AttributeError: + _log.warning("db connector: poolclass %s not found, using default QueuePool", poolclass_name) + poolclass = QueuePool + + _log.info("db connector: using poolclass %s", poolclass) + + # Set some default engine options for QueuePool to be backwards compatible with previous tdp_core code + engine_options = {"pool_size": 30, "pool_pre_ping": True} if poolclass_name == "QueuePool" or poolclass == QueuePool else {} + engine_options.update(config.get("engine", {})) _log.debug("db connector: create engine with options %s", engine_options) - return sqlalchemy.create_engine(self.dburl, **engine_options) + + with tracer.start_as_current_span("DBConnector.create_engine"): + return sqlalchemy.create_engine(self.dburl, poolclass=poolclass, **engine_options) def create_sessionmaker(self, engine) -> sessionmaker: - _log.debug("db connector: create_sessionmaker") - return sessionmaker(bind=engine) + with tracer.start_as_current_span("DBConnector.create_sessionmaker"): + _log.debug("db connector: create_sessionmaker") + return sessionmaker(bind=engine)