diff --git a/Makefile b/Makefile index 129c6c118..76c2d6126 100644 --- a/Makefile +++ b/Makefile @@ -9,6 +9,10 @@ ruff = ruff check $(pkg_src) setup.py --line-length 140 --select E,W,F,N,I,C,B,U start: python $(pkg_src) +.PHONY: celery ## Start the celery worker +celery: + celery -A $(pkg_src).dev_celery worker --loglevel=INFO --concurrency=8 -O fair -P prefork + .PHONY: all ## Perform the most common development-time rules all: format lint test diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 000000000..55cc55399 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,8 @@ +version: '3.0' + +services: + redis: + image: redis:7.4-alpine + ports: + - '6379:6379' + command: redis-server --requirepass admin diff --git a/requirements.txt b/requirements.txt index 720889b6c..66df29597 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,8 @@ # a2wsgi==1.6.0 # This WSIGMiddleware is not compatible with starlette_context alembic==1.13.2 cachetools==5.3.3 +celery>=5.0.0,<6.0.0 +celery-types~=0.22.0 fastapi==0.112.2 json-cfg==0.4.2 openpyxl==3.1.5 @@ -18,9 +20,10 @@ psycopg==3.2.1 psycopg2==2.9.9 pydantic==1.10.17 pyjwt[crypto]==2.9.0 -pytest-postgresql==6.0.1 +pytest-postgresql==6.1.1 python-dateutil==2.9.0.post0 requests==2.32.3 +redis>=5.0.0,<6.0.0 sentry-sdk~=2.13.0 SQLAlchemy>=1.4.40,<=1.4.53 starlette-context==0.3.6 diff --git a/visyn_core/celery/app.py b/visyn_core/celery/app.py new file mode 100644 index 000000000..f416bef93 --- /dev/null +++ b/visyn_core/celery/app.py @@ -0,0 +1,44 @@ +import logging +import logging.config + +from celery import Celery + +from ..plugin.parser import EntryPointPlugin + + +def init_celery_manager(*, plugins: list[EntryPointPlugin]): + """ + Create a new Celery app and initialize it with the given plugins. + """ + from .. import manager + + _log = logging.getLogger(__name__) + + if not manager.settings.visyn_core.celery: + _log.warning("No Celery settings found in configuration, skipping Celery initialization") + return None + + manager.celery = Celery("visyn", **manager.settings.visyn_core.celery) + + _log.info("Initializing celery app") + + # Discover tasks from all plugins, i.e. visyn_core.tasks, visyn_plugin.tasks + manager.celery.autodiscover_tasks([p.id for p in plugins]) + + return manager.celery + + +def create_celery_worker_app(*, workspace_config: dict | None = None): + """ + Create a new Celery app in standalone mode, i.e. without a FastAPI instance. + """ + from .. import manager + from ..server.visyn_server import create_visyn_server + + # Create the whole FastAPI instance to ensure that the configuration and plugins are loaded, extension points are registered, database migrations are executed, ... + create_visyn_server(workspace_config=workspace_config) + + _log = logging.getLogger(__name__) + _log.info("Starting celery worker") + + return init_celery_manager(plugins=manager.registry.plugins) diff --git a/visyn_core/celery/main.py b/visyn_core/celery/main.py new file mode 100644 index 000000000..46f7a2215 --- /dev/null +++ b/visyn_core/celery/main.py @@ -0,0 +1,3 @@ +from .app import create_celery_worker_app + +celery_app = create_celery_worker_app() diff --git a/visyn_core/dev_app.py b/visyn_core/dev_app.py index 6919b2217..f896b8cec 100644 --- a/visyn_core/dev_app.py +++ b/visyn_core/dev_app.py @@ -1,10 +1,8 @@ -import os +import pathlib import sys from .server.visyn_server import create_visyn_server # This app is either started via the uvicorn runner in __main__.py, # or as module to execute commands via `python -m .dev_app db-migration exec ...` -app = create_visyn_server( - start_cmd=" ".join(sys.argv[1:]), workspace_config={"_env_file": os.path.join(os.path.dirname(os.path.realpath(__file__)), ".env")} -) +app = create_visyn_server(start_cmd=" ".join(sys.argv[1:]), workspace_config={"_env_file": pathlib.Path(__file__).parent / ".env"}) diff --git a/visyn_core/dev_celery.py b/visyn_core/dev_celery.py new file mode 100644 index 000000000..b41490ff7 --- /dev/null +++ b/visyn_core/dev_celery.py @@ -0,0 +1,5 @@ +import pathlib + +from .celery.app import create_celery_worker_app + +celery_app = create_celery_worker_app(workspace_config={"_env_file": pathlib.Path(__file__).parent / ".env"}) diff --git a/visyn_core/manager.py b/visyn_core/manager.py index 926d733e4..e81ce4479 100644 --- a/visyn_core/manager.py +++ b/visyn_core/manager.py @@ -5,6 +5,10 @@ # Additionally, we have to wrap our types/classes in '', such that they are evaluated lazily. # See https://peps.python.org/pep-0563/#runtime-annotation-resolution-and-type-checking for more information. if TYPE_CHECKING: + # Monkey-patch Celery to support proper type hints: https://pypi.org/project/celery-types/ + from celery.app.task import Task + + from .celery.app import Celery from .dbmanager import DBManager from .dbmigration.manager import DBMigrationManager from .id_mapping.manager import MappingManager @@ -12,6 +16,8 @@ from .security.manager import SecurityManager from .settings.model import GlobalSettings + Task.__class_getitem__ = classmethod(lambda cls, *args, **kwargs: cls) # type: ignore[attr-defined] + db: "DBManager" = None # type: ignore db_migration: "DBMigrationManager" = None # type: ignore @@ -19,3 +25,4 @@ security: "SecurityManager" = None # type: ignore registry: "Registry" = None # type: ignore settings: "GlobalSettings" = None # type: ignore +celery: "Celery" = None # type: ignore diff --git a/visyn_core/server/utils.py b/visyn_core/server/utils.py index eff3efc34..5342e1b20 100644 --- a/visyn_core/server/utils.py +++ b/visyn_core/server/utils.py @@ -1,9 +1,12 @@ import http import logging +import logging.config import time import traceback from fastapi import HTTPException +from pydantic import create_model +from pydantic.utils import deep_update from .. import manager @@ -17,6 +20,44 @@ _log = logging.getLogger(__name__) +def init_settings_manager(workspace_config: dict | None = None): + from ..settings.model import GlobalSettings + from ..settings.utils import load_workspace_config + + # Load the workspace config.json and initialize the global settings + workspace_config = workspace_config if isinstance(workspace_config, dict) else load_workspace_config() + # Temporary backwards compatibility: if no visyn_core config entry is found, copy the one from tdp_core. + if "visyn_core" not in workspace_config and "tdp_core" in workspace_config: + logging.warn('You are still using "tdp_core" config entries instead of "visyn_core" entries. Please migrate as soon as possible!') + workspace_config["visyn_core"] = workspace_config["tdp_core"] + + manager.settings = GlobalSettings(**workspace_config) + + # Initialize the logging + logging_config = manager.settings.visyn_core.logging + + if manager.settings.visyn_core.log_level: + try: + logging_config["root"]["level"] = manager.settings.visyn_core.log_level + except KeyError: + logging.warn("You have set visyn_core.log_level, but no root logger is defined in visyn_core.logging") + + logging.config.dictConfig(logging_config) + + # Load the initial plugins + from ..plugin.parser import get_config_from_plugins, load_all_plugins + + plugins = load_all_plugins() + + # With all the plugins, load the corresponding configuration files and create a new model based on the global settings, with all plugin models as sub-models + [plugin_config_files, plugin_settings_models] = get_config_from_plugins(plugins) + visyn_server_settings = create_model("VisynServerSettings", __base__=GlobalSettings, **plugin_settings_models) # type: ignore + # Patch the global settings by instantiating the new settings model with the global config, all config.json(s), and pydantic models + manager.settings: GlobalSettings = visyn_server_settings(**deep_update(*plugin_config_files, workspace_config)) # type: ignore + + return plugins + + init_legacy_app = None try: # Flask is an optional dependency and must be added to the requirements for legacy apps. diff --git a/visyn_core/server/visyn_server.py b/visyn_core/server/visyn_server.py index 593370edc..6e21fda2a 100644 --- a/visyn_core/server/visyn_server.py +++ b/visyn_core/server/visyn_server.py @@ -4,21 +4,13 @@ import threading from typing import Any -from fastapi import FastAPI -from fastapi.middleware.wsgi import WSGIMiddleware -from pydantic import create_model -from pydantic.utils import deep_update -from starlette_context.middleware import RawContextMiddleware - from ..settings.constants import default_logging_dict # Initialize the logging very early as otherwise the already created loggers receive a default loglevel WARN, leading to logs not being shown. logging.config.dictConfig(default_logging_dict) -def create_visyn_server( - *, fast_api_args: dict[str, Any] | None = None, start_cmd: str | None = None, workspace_config: dict | None = None -) -> FastAPI: +def create_visyn_server(*, fast_api_args: dict[str, Any] | None = None, start_cmd: str | None = None, workspace_config: dict | None = None): """ Create a new FastAPI instance while ensuring that the configuration and plugins are loaded, extension points are registered, database migrations are executed, ... @@ -30,48 +22,14 @@ def create_visyn_server( if fast_api_args is None: fast_api_args = {} from .. import manager - from ..settings.model import GlobalSettings - from ..settings.utils import load_workspace_config - - # Load the workspace config.json and initialize the global settings - workspace_config = workspace_config if isinstance(workspace_config, dict) else load_workspace_config() - # Temporary backwards compatibility: if no visyn_core config entry is found, copy the one from tdp_core. - if "visyn_core" not in workspace_config and "tdp_core" in workspace_config: - logging.warn('You are still using "tdp_core" config entries instead of "visyn_core" entries. Please migrate as soon as possible!') - workspace_config["visyn_core"] = workspace_config["tdp_core"] - - manager.settings = GlobalSettings(**workspace_config) - - # Initialize the logging - logging_config = manager.settings.visyn_core.logging - - if manager.settings.visyn_core.log_level: - try: - logging_config["root"]["level"] = manager.settings.visyn_core.log_level - except KeyError: - logging.warn("You have set visyn_core.log_level, but no root logger is defined in visyn_core.logging") - - logging.config.dictConfig(logging_config) + from .utils import init_settings_manager - # Filter out the metrics endpoint from the access log - class EndpointFilter(logging.Filter): - def filter(self, record: logging.LogRecord) -> bool: - return "GET /api/metrics" and "GET /api/health" and "GET /metrics" and "GET /health" not in record.getMessage() - - logging.getLogger("uvicorn.access").addFilter(EndpointFilter()) + plugins = init_settings_manager(workspace_config=workspace_config) _log = logging.getLogger(__name__) - _log.info(f"Starting visyn_server in {manager.settings.env} mode") + _log.info(f"Starting in {manager.settings.env} mode") - # Load the initial plugins - from ..plugin.parser import get_config_from_plugins, load_all_plugins - - plugins = load_all_plugins() - # With all the plugins, load the corresponding configuration files and create a new model based on the global settings, with all plugin models as sub-models - [plugin_config_files, plugin_settings_models] = get_config_from_plugins(plugins) - visyn_server_settings = create_model("VisynServerSettings", __base__=GlobalSettings, **plugin_settings_models) # type: ignore - # Patch the global settings by instantiating the new settings model with the global config, all config.json(s), and pydantic models - manager.settings = visyn_server_settings(**deep_update(*plugin_config_files, workspace_config)) + from fastapi import FastAPI app = FastAPI( debug=manager.settings.is_development_mode, @@ -84,6 +42,13 @@ def filter(self, record: logging.LogRecord) -> bool: **fast_api_args, ) + # Filter out the metrics endpoint from the access log + class EndpointFilter(logging.Filter): + def filter(self, record: logging.LogRecord) -> bool: + return "GET /api/metrics" and "GET /api/health" and "GET /metrics" and "GET /health" not in record.getMessage() + + logging.getLogger("uvicorn.access").addFilter(EndpointFilter()) + from ..middleware.exception_handler_middleware import ExceptionHandlerMiddleware # TODO: For some reason, a @app.exception_handler(Exception) is not called here. We use a middleware instead. @@ -113,6 +78,10 @@ def filter(self, record: logging.LogRecord) -> bool: ) # Initialize global managers. + from ..celery.app import init_celery_manager + + app.state.celery = init_celery_manager(plugins=plugins) + from ..plugin.registry import Registry app.state.registry = manager.registry = Registry() @@ -149,6 +118,8 @@ def filter(self, record: logging.LogRecord) -> bool: sys.exit(0) # Load all namespace plugins as WSGIMiddleware plugins + from fastapi.middleware.wsgi import WSGIMiddleware + from .utils import init_legacy_app, load_after_server_started_hooks namespace_plugins = manager.registry.list("namespace") @@ -203,6 +174,8 @@ async def change_anyio_total_tokens(): init_client_config(app) + from starlette_context.middleware import RawContextMiddleware + from ..middleware.request_context_plugin import RequestContextPlugin # Use starlette-context to store the current request globally, i.e. accessible via context['request'] diff --git a/visyn_core/settings/model.py b/visyn_core/settings/model.py index 1fee6ef36..b5e83a9be 100644 --- a/visyn_core/settings/model.py +++ b/visyn_core/settings/model.py @@ -180,6 +180,10 @@ class VisynCoreSettings(BaseModel): """ sentry: SentrySettings = SentrySettings() """ + Settings for celery. If not set, celery will not be initialized. + """ + celery: dict[str, Any] | None = None + """ Settings for Sentry. DSN will be shared via the client config. """ cypress: bool = False diff --git a/visyn_core/tests/fixtures/app.py b/visyn_core/tests/fixtures/app.py index 4ff11e186..be808a9db 100644 --- a/visyn_core/tests/fixtures/app.py +++ b/visyn_core/tests/fixtures/app.py @@ -23,7 +23,13 @@ def mock_current_user_in_manager(self): @pytest.fixture() def workspace_config() -> dict: return { - "visyn_core": {"enabled_plugins": ["visyn_core"]}, + "visyn_core": { + "enabled_plugins": ["visyn_core"], + "celery": { + "broker": "memory://localhost/", + "task_always_eager": True, + }, + }, } diff --git a/visyn_core/tests/test_celery.py b/visyn_core/tests/test_celery.py new file mode 100644 index 000000000..910a8e094 --- /dev/null +++ b/visyn_core/tests/test_celery.py @@ -0,0 +1,13 @@ +from fastapi.testclient import TestClient + +from visyn_core import manager + + +def test_celery_worker(client: TestClient): + + @manager.celery.task + def add(x, y): + return x + y + + result = add.delay(2, 4) + assert result.get(timeout=1) == 6