Skip to content

Commit

Permalink
feat: Celery integration (#649)
Browse files Browse the repository at this point in the history
* feat: Celery integration

* feat: start the whole server instead of selectively init app

* Bump pytest-postgresql

* Update requirements.txt
  • Loading branch information
puehringer authored Dec 6, 2024
1 parent 6434da5 commit 8579ae7
Show file tree
Hide file tree
Showing 13 changed files with 162 additions and 53 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ ruff = ruff check $(pkg_src) setup.py --line-length 140 --select E,W,F,N,I,C,B,U
start:
python $(pkg_src)

.PHONY: celery ## Start the celery worker
celery:
celery -A $(pkg_src).dev_celery worker --loglevel=INFO --concurrency=8 -O fair -P prefork

.PHONY: all ## Perform the most common development-time rules
all: format lint test

Expand Down
8 changes: 8 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
version: '3.0'

services:
redis:
image: redis:7.4-alpine
ports:
- '6379:6379'
command: redis-server --requirepass admin
5 changes: 4 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# a2wsgi==1.6.0 # This WSGIMiddleware is not compatible with starlette_context
alembic==1.13.2
cachetools==5.3.3
celery>=5.0.0,<6.0.0
celery-types~=0.22.0
fastapi==0.112.2
json-cfg==0.4.2
openpyxl==3.1.5
Expand All @@ -18,9 +20,10 @@ psycopg==3.2.1
psycopg2==2.9.9
pydantic==1.10.17
pyjwt[crypto]==2.9.0
pytest-postgresql==6.0.1
pytest-postgresql==6.1.1
python-dateutil==2.9.0.post0
requests==2.32.3
redis>=5.0.0,<6.0.0
sentry-sdk~=2.13.0
SQLAlchemy>=1.4.40,<=1.4.53
starlette-context==0.3.6
Expand Down
44 changes: 44 additions & 0 deletions visyn_core/celery/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import logging
import logging.config

from celery import Celery

from ..plugin.parser import EntryPointPlugin


def init_celery_manager(*, plugins: list[EntryPointPlugin]):
    """
    Build the global Celery app from the configured settings and hook up task discovery for the plugins.

    The `visyn_core.celery` settings are forwarded as keyword arguments to the `Celery` constructor and
    the resulting app is stored in `manager.celery`. If no Celery settings are configured, a warning is
    logged, `manager.celery` is left untouched, and `None` is returned.

    :param plugins: Plugins whose ids are handed to `autodiscover_tasks` as package names.
    :return: The created Celery app, or `None` if Celery is not configured.
    """
    from .. import manager

    logger = logging.getLogger(__name__)
    celery_settings = manager.settings.visyn_core.celery

    if celery_settings:
        celery_app = Celery("visyn", **celery_settings)
        manager.celery = celery_app

        logger.info("Initializing celery app")

        # Look for a `tasks` module in every plugin package, i.e. visyn_core.tasks, visyn_plugin.tasks
        plugin_packages = [plugin.id for plugin in plugins]
        celery_app.autodiscover_tasks(plugin_packages)

        return celery_app

    logger.warning("No Celery settings found in configuration, skipping Celery initialization")
    return None


def create_celery_worker_app(*, workspace_config: dict | None = None):
    """
    Build the Celery app for a standalone worker, i.e. a process that does not serve the FastAPI app.

    :param workspace_config: Optional workspace configuration, passed through to `create_visyn_server`.
    :return: Whatever `init_celery_manager` returns for the plugins of the global registry.
    """
    from .. import manager
    from ..server.visyn_server import create_visyn_server

    # The server is created only for its side effects (configuration and plugins are loaded, extension points
    # are registered, database migrations are executed, ...); the returned FastAPI instance is not used.
    create_visyn_server(workspace_config=workspace_config)

    logging.getLogger(__name__).info("Starting celery worker")

    # NOTE(review): create_visyn_server appears to call init_celery_manager itself, so this call creates
    # a second Celery app and rebinds manager.celery - confirm that this is intended.
    return init_celery_manager(plugins=manager.registry.plugins)
3 changes: 3 additions & 0 deletions visyn_core/celery/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .app import create_celery_worker_app

celery_app = create_celery_worker_app()
6 changes: 2 additions & 4 deletions visyn_core/dev_app.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import os
import pathlib
import sys

from .server.visyn_server import create_visyn_server

# This app is either started via the uvicorn runner in __main__.py,
# or as module to execute commands via `python -m <app>.dev_app db-migration exec ...`
app = create_visyn_server(
start_cmd=" ".join(sys.argv[1:]), workspace_config={"_env_file": os.path.join(os.path.dirname(os.path.realpath(__file__)), ".env")}
)
app = create_visyn_server(start_cmd=" ".join(sys.argv[1:]), workspace_config={"_env_file": pathlib.Path(__file__).parent / ".env"})
5 changes: 5 additions & 0 deletions visyn_core/dev_celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import pathlib

from .celery.app import create_celery_worker_app

celery_app = create_celery_worker_app(workspace_config={"_env_file": pathlib.Path(__file__).parent / ".env"})
7 changes: 7 additions & 0 deletions visyn_core/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,24 @@
# Additionally, we have to wrap our types/classes in '', such that they are evaluated lazily.
# See https://peps.python.org/pep-0563/#runtime-annotation-resolution-and-type-checking for more information.
if TYPE_CHECKING:
    from .celery.app import Celery
    from .dbmanager import DBManager
    from .dbmigration.manager import DBMigrationManager
    from .id_mapping.manager import MappingManager
    from .plugin.registry import Registry
    from .security.manager import SecurityManager
    from .settings.model import GlobalSettings

# Monkey-patch Celery to support proper type hints: https://pypi.org/project/celery-types/
# This must not live inside the TYPE_CHECKING block: that block is never executed at runtime,
# so the patch would be dead code and subscripting Task (i.e. Task[...]) would still fail.
from celery.app.task import Task  # noqa: E402

Task.__class_getitem__ = classmethod(lambda cls, *args, **kwargs: cls)  # type: ignore[attr-defined]


db: "DBManager" = None # type: ignore
db_migration: "DBMigrationManager" = None # type: ignore
id_mapping: "MappingManager" = None # type: ignore
security: "SecurityManager" = None # type: ignore
registry: "Registry" = None # type: ignore
settings: "GlobalSettings" = None # type: ignore
celery: "Celery" = None # type: ignore
41 changes: 41 additions & 0 deletions visyn_core/server/utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import http
import logging
import logging.config
import time
import traceback

from fastapi import HTTPException
from pydantic import create_model
from pydantic.utils import deep_update

from .. import manager

Expand All @@ -17,6 +20,44 @@
_log = logging.getLogger(__name__)


def init_settings_manager(workspace_config: dict | None = None):
    """
    Initialize `manager.settings`, configure the logging, and load all plugins.

    The settings are created in two passes: first from the workspace config alone (which is enough to
    configure the logging), then again from a settings model extended with the plugin settings models,
    using the plugin config files deep-updated with the workspace config.

    :param workspace_config: Workspace configuration to use. If it is not a dict, the configuration is
        loaded via `load_workspace_config()`. The passed dict is not modified.
    :return: The plugins returned by `load_all_plugins()`.
    """
    from ..settings.model import GlobalSettings
    from ..settings.utils import load_workspace_config

    # Load the workspace config.json and initialize the global settings
    if not isinstance(workspace_config, dict):
        workspace_config = load_workspace_config()

    # Temporary backwards compatibility: if no visyn_core config entry is found, copy the one from tdp_core.
    if "visyn_core" not in workspace_config and "tdp_core" in workspace_config:
        logging.warning('You are still using "tdp_core" config entries instead of "visyn_core" entries. Please migrate as soon as possible!')
        # Work on a shallow copy such that the dict of the caller is not modified.
        workspace_config = {**workspace_config, "visyn_core": workspace_config["tdp_core"]}

    manager.settings = GlobalSettings(**workspace_config)

    # Initialize the logging
    logging_config = manager.settings.visyn_core.logging

    if manager.settings.visyn_core.log_level:
        try:
            logging_config["root"]["level"] = manager.settings.visyn_core.log_level
        except KeyError:
            logging.warning("You have set visyn_core.log_level, but no root logger is defined in visyn_core.logging")

    logging.config.dictConfig(logging_config)

    # Load the initial plugins. The import is kept after the logging configuration, as in the original code.
    from ..plugin.parser import get_config_from_plugins, load_all_plugins

    plugins = load_all_plugins()

    # With all the plugins, load the corresponding configuration files and create a new model based on the global settings, with all plugin models as sub-models
    plugin_config_files, plugin_settings_models = get_config_from_plugins(plugins)
    visyn_server_settings = create_model("VisynServerSettings", __base__=GlobalSettings, **plugin_settings_models)  # type: ignore
    # Patch the global settings by instantiating the new settings model with the global config, all config.json(s), and pydantic models
    manager.settings = visyn_server_settings(**deep_update(*plugin_config_files, workspace_config))  # type: ignore

    return plugins


init_legacy_app = None
try:
# Flask is an optional dependency and must be added to the requirements for legacy apps.
Expand Down
67 changes: 20 additions & 47 deletions visyn_core/server/visyn_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,13 @@
import threading
from typing import Any

from fastapi import FastAPI
from fastapi.middleware.wsgi import WSGIMiddleware
from pydantic import create_model
from pydantic.utils import deep_update
from starlette_context.middleware import RawContextMiddleware

from ..settings.constants import default_logging_dict

# Initialize the logging very early as otherwise the already created loggers receive a default loglevel WARN, leading to logs not being shown.
logging.config.dictConfig(default_logging_dict)


def create_visyn_server(
*, fast_api_args: dict[str, Any] | None = None, start_cmd: str | None = None, workspace_config: dict | None = None
) -> FastAPI:
def create_visyn_server(*, fast_api_args: dict[str, Any] | None = None, start_cmd: str | None = None, workspace_config: dict | None = None):
"""
Create a new FastAPI instance while ensuring that the configuration and plugins are loaded, extension points are registered, database migrations are executed, ...
Expand All @@ -30,48 +22,14 @@ def create_visyn_server(
if fast_api_args is None:
fast_api_args = {}
from .. import manager
from ..settings.model import GlobalSettings
from ..settings.utils import load_workspace_config

# Load the workspace config.json and initialize the global settings
workspace_config = workspace_config if isinstance(workspace_config, dict) else load_workspace_config()
# Temporary backwards compatibility: if no visyn_core config entry is found, copy the one from tdp_core.
if "visyn_core" not in workspace_config and "tdp_core" in workspace_config:
logging.warn('You are still using "tdp_core" config entries instead of "visyn_core" entries. Please migrate as soon as possible!')
workspace_config["visyn_core"] = workspace_config["tdp_core"]

manager.settings = GlobalSettings(**workspace_config)

# Initialize the logging
logging_config = manager.settings.visyn_core.logging

if manager.settings.visyn_core.log_level:
try:
logging_config["root"]["level"] = manager.settings.visyn_core.log_level
except KeyError:
logging.warn("You have set visyn_core.log_level, but no root logger is defined in visyn_core.logging")

logging.config.dictConfig(logging_config)
from .utils import init_settings_manager

# Filter out the metrics endpoint from the access log
class EndpointFilter(logging.Filter):
    """Logging filter that drops access-log records of the metrics and health endpoints."""

    # Request prefixes that should not show up in the access log.
    _IGNORED_REQUESTS = ("GET /api/metrics", "GET /api/health", "GET /metrics", "GET /health")

    def filter(self, record: logging.LogRecord) -> bool:
        """Return False (drop the record) if its message contains one of the ignored requests."""
        # Each request has to be checked individually: chaining the string literals with `and`
        # evaluates to the last operand only, i.e. only "GET /health" would be filtered.
        message = record.getMessage()
        return not any(request in message for request in self._IGNORED_REQUESTS)

logging.getLogger("uvicorn.access").addFilter(EndpointFilter())
plugins = init_settings_manager(workspace_config=workspace_config)

_log = logging.getLogger(__name__)
_log.info(f"Starting visyn_server in {manager.settings.env} mode")
_log.info(f"Starting in {manager.settings.env} mode")

# Load the initial plugins
from ..plugin.parser import get_config_from_plugins, load_all_plugins

plugins = load_all_plugins()
# With all the plugins, load the corresponding configuration files and create a new model based on the global settings, with all plugin models as sub-models
[plugin_config_files, plugin_settings_models] = get_config_from_plugins(plugins)
visyn_server_settings = create_model("VisynServerSettings", __base__=GlobalSettings, **plugin_settings_models) # type: ignore
# Patch the global settings by instantiating the new settings model with the global config, all config.json(s), and pydantic models
manager.settings = visyn_server_settings(**deep_update(*plugin_config_files, workspace_config))
from fastapi import FastAPI

app = FastAPI(
debug=manager.settings.is_development_mode,
Expand All @@ -84,6 +42,13 @@ def filter(self, record: logging.LogRecord) -> bool:
**fast_api_args,
)

# Filter out the metrics endpoint from the access log
class EndpointFilter(logging.Filter):
    """Logging filter that drops access-log records of the metrics and health endpoints."""

    # Request prefixes that should not show up in the access log.
    _IGNORED_REQUESTS = ("GET /api/metrics", "GET /api/health", "GET /metrics", "GET /health")

    def filter(self, record: logging.LogRecord) -> bool:
        """Return False (drop the record) if its message contains one of the ignored requests."""
        # Each request has to be checked individually: chaining the string literals with `and`
        # evaluates to the last operand only, i.e. only "GET /health" would be filtered.
        message = record.getMessage()
        return not any(request in message for request in self._IGNORED_REQUESTS)

logging.getLogger("uvicorn.access").addFilter(EndpointFilter())

from ..middleware.exception_handler_middleware import ExceptionHandlerMiddleware

# TODO: For some reason, a @app.exception_handler(Exception) is not called here. We use a middleware instead.
Expand Down Expand Up @@ -113,6 +78,10 @@ def filter(self, record: logging.LogRecord) -> bool:
)

# Initialize global managers.
from ..celery.app import init_celery_manager

app.state.celery = init_celery_manager(plugins=plugins)

from ..plugin.registry import Registry

app.state.registry = manager.registry = Registry()
Expand Down Expand Up @@ -149,6 +118,8 @@ def filter(self, record: logging.LogRecord) -> bool:
sys.exit(0)

# Load all namespace plugins as WSGIMiddleware plugins
from fastapi.middleware.wsgi import WSGIMiddleware

from .utils import init_legacy_app, load_after_server_started_hooks

namespace_plugins = manager.registry.list("namespace")
Expand Down Expand Up @@ -203,6 +174,8 @@ async def change_anyio_total_tokens():

init_client_config(app)

from starlette_context.middleware import RawContextMiddleware

from ..middleware.request_context_plugin import RequestContextPlugin

# Use starlette-context to store the current request globally, i.e. accessible via context['request']
Expand Down
4 changes: 4 additions & 0 deletions visyn_core/settings/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ class VisynCoreSettings(BaseModel):
"""
sentry: SentrySettings = SentrySettings()
"""
Settings for Sentry. DSN will be shared via the client config.
"""
celery: dict[str, Any] | None = None
"""
Settings for celery. If not set, celery will not be initialized.
"""
cypress: bool = False
Expand Down
8 changes: 7 additions & 1 deletion visyn_core/tests/fixtures/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,13 @@ def mock_current_user_in_manager(self):
@pytest.fixture()
def workspace_config() -> dict:
    """
    Return the workspace configuration used by the tests.

    Only the `visyn_core` plugin is enabled, and Celery is configured with a `memory://` broker
    and `task_always_eager` set to True.
    """
    return {
        "visyn_core": {
            "enabled_plugins": ["visyn_core"],
            "celery": {
                "broker": "memory://localhost/",
                "task_always_eager": True,
            },
        },
    }


Expand Down
13 changes: 13 additions & 0 deletions visyn_core/tests/test_celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from fastapi.testclient import TestClient

from visyn_core import manager


def test_celery_worker(client: TestClient):
    """
    Register a task on the global Celery app and check that calling it via `delay` yields the result.

    The `client` fixture is requested but not used directly; presumably it ensures that the app
    (and with it `manager.celery`) is initialized - verify against the fixture.
    """

    @manager.celery.task
    def add(x, y):
        return x + y

    # The result has to be available within one second.
    assert add.delay(2, 4).get(timeout=1) == 6

0 comments on commit 8579ae7

Please sign in to comment.