diff --git a/docs/docs/SUMMARY.md b/docs/docs/SUMMARY.md index 58d0f9c3f7..c2f9915daf 100644 --- a/docs/docs/SUMMARY.md +++ b/docs/docs/SUMMARY.md @@ -230,6 +230,7 @@ search: - [make_ping_asgi](api/faststream/asgi/make_ping_asgi.md) - app - [AsgiFastStream](api/faststream/asgi/app/AsgiFastStream.md) + - [cast_uvicorn_params](api/faststream/asgi/app/cast_uvicorn_params.md) - factories - [make_asyncapi_asgi](api/faststream/asgi/factories/make_asyncapi_asgi.md) - [make_ping_asgi](api/faststream/asgi/factories/make_ping_asgi.md) @@ -448,6 +449,8 @@ search: - [run](api/faststream/cli/main/run.md) - [version_callback](api/faststream/cli/main/version_callback.md) - supervisors + - asgi_multiprocess + - [ASGIMultiprocess](api/faststream/cli/supervisors/asgi_multiprocess/ASGIMultiprocess.md) - basereload - [BaseReload](api/faststream/cli/supervisors/basereload/BaseReload.md) - multiprocess diff --git a/docs/docs/en/api/faststream/asgi/app/cast_uvicorn_params.md b/docs/docs/en/api/faststream/asgi/app/cast_uvicorn_params.md new file mode 100644 index 0000000000..1431e2c833 --- /dev/null +++ b/docs/docs/en/api/faststream/asgi/app/cast_uvicorn_params.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.asgi.app.cast_uvicorn_params diff --git a/docs/docs/en/api/faststream/cli/supervisors/asgi_multiprocess/ASGIMultiprocess.md b/docs/docs/en/api/faststream/cli/supervisors/asgi_multiprocess/ASGIMultiprocess.md new file mode 100644 index 0000000000..8424b2d5fa --- /dev/null +++ b/docs/docs/en/api/faststream/cli/supervisors/asgi_multiprocess/ASGIMultiprocess.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.cli.supervisors.asgi_multiprocess.ASGIMultiprocess diff --git a/faststream/asgi/app.py b/faststream/asgi/app.py index db9eeee27a..9f662ff1b9 100644 --- a/faststream/asgi/app.py +++ b/faststream/asgi/app.py @@ -1,3 +1,4 @@ +import inspect import logging import traceback from contextlib import asynccontextmanager @@ -6,7 +7,6 @@ Any, AsyncIterator, Dict, - List, Optional, Sequence, Tuple, @@ -44,6 +44,14 @@ ) +def cast_uvicorn_params(params: Dict[str, Any]) -> Dict[str, Any]: + if port := params.get("port"): + params["port"] = int(port) + if fd := params.get("fd"): + params["fd"] = int(fd) + return params + + class AsgiFastStream(Application): def __init__( self, @@ -148,50 +156,28 @@ async def run( sleep_time: float = 0.1, ) -> None: try: - import uvicorn # noqa: F401 - from gunicorn.app.base import BaseApplication + import uvicorn except ImportError as e: raise RuntimeError( - "You need uvicorn and gunicorn to run FastStream ASGI App via CLI. pip install uvicorn gunicorn" + "You need uvicorn to run FastStream ASGI App via CLI. pip install uvicorn" ) from e - class ASGIRunner(BaseApplication): # type: ignore[misc] - def __init__(self, options: Dict[str, Any], asgi_app: "ASGIApp") -> None: - self.options = options - self.asgi_app = asgi_app - super().__init__() - - def load_config(self) -> None: - for k, v in self.options.items(): - if k in self.cfg.settings and v is not None: - self.cfg.set(k.lower(), v) - - def load(self) -> "ASGIApp": - return self.asgi_app - - run_extra_options = run_extra_options or {} - - bindings: List[str] = [] - host = run_extra_options.pop("host", None) - port = run_extra_options.pop("port", None) - if host is not None and port is not None: - bindings.append(f"{host}:{port}") - elif host is not None: - bindings.append(f"{host}:8000") - elif port is not None: - bindings.append(f"127.0.0.1:{port}") - - bind = run_extra_options.get("bind") - if isinstance(bind, list): - bindings.extend(bind) # type: ignore - elif isinstance(bind, str): - bindings.append(bind) - - run_extra_options["bind"] = bindings or "127.0.0.1:8000" - # We use gunicorn with uvicorn workers because uvicorn don't support multiple workers - run_extra_options["worker_class"] = "uvicorn.workers.UvicornWorker" - - ASGIRunner(run_extra_options, self).run() + run_extra_options = cast_uvicorn_params(run_extra_options or {}) + + uvicorn_config_params = set(inspect.signature(uvicorn.Config).parameters.keys()) + + config = uvicorn.Config( + app=self, + log_level=log_level, + **{ + key: v + for key, v in run_extra_options.items() + if key in uvicorn_config_params + }, + ) + + server = uvicorn.Server(config) + await server.serve() @asynccontextmanager async def start_lifespan_context(self) -> AsyncIterator[None]: diff --git a/faststream/cli/main.py b/faststream/cli/main.py index d6336a5803..b149e7b8d0 100644 --- a/faststream/cli/main.py +++ b/faststream/cli/main.py @@ -12,6 +12,7 @@ from faststream import FastStream from faststream.__about__ import __version__ from faststream._internal.application import Application +from faststream.asgi.app import AsgiFastStream from faststream.cli.docs.app import docs_app from faststream.cli.utils.imports import import_from_string from faststream.cli.utils.logs import LogLevels, get_log_level, set_log_level @@ -146,17 +147,26 @@ def run( ).run() elif workers > 1: - from faststream.cli.supervisors.multiprocess import Multiprocess - if isinstance(app_obj, FastStream): + from faststream.cli.supervisors.multiprocess import Multiprocess + Multiprocess( target=_run, args=(*args, logging.DEBUG), workers=workers, ).run() + elif isinstance(app_obj, AsgiFastStream): + from faststream.cli.supervisors.asgi_multiprocess import ASGIMultiprocess + + ASGIMultiprocess( + target=app, + args=args, # type: ignore[arg-type] + workers=workers, + ).run() else: - args[1]["workers"] = workers # type: ignore[assignment] - _run(*args) + raise typer.BadParameter( + f"Unexpected app type, expected FastStream or AsgiFastStream, got: {type(app_obj)}." + ) else: _run_imported_app( diff --git a/faststream/cli/supervisors/asgi_multiprocess.py b/faststream/cli/supervisors/asgi_multiprocess.py new file mode 100644 index 0000000000..2696398f8a --- /dev/null +++ b/faststream/cli/supervisors/asgi_multiprocess.py @@ -0,0 +1,38 @@ +import inspect +from typing import Dict, Tuple + +from faststream.asgi.app import cast_uvicorn_params + + +class ASGIMultiprocess: + def __init__( + self, target: str, args: Tuple[str, Dict[str, str], bool, int], workers: int + ) -> None: + _, uvicorn_kwargs, is_factory, log_level = args + self._target = target + self._uvicorn_kwargs = cast_uvicorn_params(uvicorn_kwargs or {}) + self._workers = workers + self._is_factory = is_factory + self._log_level = log_level + + def run(self) -> None: + try: + import uvicorn + except ImportError as e: + raise RuntimeError( + "You need uvicorn to run FastStream ASGI App via CLI. pip install uvicorn" + ) from e + + uvicorn_params = set(inspect.signature(uvicorn.run).parameters.keys()) + + uvicorn.run( + self._target, + factory=self._is_factory, + workers=self._workers, + log_level=self._log_level, + **{ + key: v + for key, v in self._uvicorn_kwargs.items() + if key in uvicorn_params + }, + ) diff --git a/tests/cli/test_run.py b/tests/cli/test_run.py index b080cf7285..eb736baaeb 100644 --- a/tests/cli/test_run.py +++ b/tests/cli/test_run.py @@ -8,6 +8,7 @@ from faststream.app import FastStream from faststream.asgi import AsgiFastStream from faststream.cli.main import cli as faststream_app +from faststream.cli.utils.logs import get_log_level @pytest.mark.parametrize( @@ -36,13 +37,42 @@ def test_run(runner: CliRunner, app: Application): assert result.exit_code == 0 -@pytest.mark.parametrize("workers", [1, 2, 5]) @pytest.mark.parametrize("app", [pytest.param(AsgiFastStream())]) -def test_run_as_asgi_with_workers(runner: CliRunner, workers: int, app: Application): +def test_run_as_asgi_with_single_worker(runner: CliRunner, app: Application): app.run = AsyncMock() with patch( "faststream.cli.utils.imports._import_obj_or_factory", return_value=(None, app) + ): + result = runner.invoke( + faststream_app, + [ + "run", + "faststream:app", + "--host", + "0.0.0.0", + "--port", + "8000", + "--workers", + "1", + ], + ) + app.run.assert_awaited_once_with( + logging.INFO, {"host": "0.0.0.0", "port": "8000"} + ) + assert result.exit_code == 0 + + +@pytest.mark.parametrize("workers", [3, 5, 7]) +@pytest.mark.parametrize("app", [pytest.param(AsgiFastStream())]) +def test_run_as_asgi_with_many_workers( + runner: CliRunner, workers: int, app: Application +): + asgi_multiprocess = "faststream.cli.supervisors.asgi_multiprocess.ASGIMultiprocess" + _import_obj_or_factory = "faststream.cli.utils.imports._import_obj_or_factory" + + with patch(asgi_multiprocess) as asgi_runner, patch( + _import_obj_or_factory, return_value=(None, app) ): result = runner.invoke( faststream_app, @@ -57,13 +87,61 @@ def test_run_as_asgi_with_workers(runner: CliRunner, workers: int, app: Applicat str(workers), ], ) - extra = {"workers": workers} if workers > 1 else {} + assert result.exit_code == 0 - app.run.assert_awaited_once_with( - logging.INFO, {"host": "0.0.0.0", "port": "8000", **extra} + asgi_runner.assert_called_once() + asgi_runner.assert_called_once_with( + target="faststream:app", + args=("faststream:app", {"host": "0.0.0.0", "port": "8000"}, False, 0), + workers=workers, + ) + asgi_runner().run.assert_called_once() + + +@pytest.mark.parametrize( + "log_level", + ["critical", "fatal", "error", "warning", "warn", "info", "debug", "notset"], +) +@pytest.mark.parametrize("app", [pytest.param(AsgiFastStream())]) +def test_run_as_asgi_mp_with_log_level( + runner: CliRunner, app: Application, log_level: str +): + asgi_multiprocess = "faststream.cli.supervisors.asgi_multiprocess.ASGIMultiprocess" + _import_obj_or_factory = "faststream.cli.utils.imports._import_obj_or_factory" + + with patch(asgi_multiprocess) as asgi_runner, patch( + _import_obj_or_factory, return_value=(None, app) + ): + result = runner.invoke( + faststream_app, + [ + "run", + "faststream:app", + "--host", + "0.0.0.0", + "--port", + "8000", + "--workers", + "3", + "--log-level", + log_level, + ], ) assert result.exit_code == 0 + asgi_runner.assert_called_once() + asgi_runner.assert_called_once_with( + target="faststream:app", + args=( + "faststream:app", + {"host": "0.0.0.0", "port": "8000"}, + False, + get_log_level(log_level), + ), + workers=3, + ) + asgi_runner().run.assert_called_once() + @pytest.mark.parametrize( "app", [pytest.param(FastStream()), pytest.param(AsgiFastStream())]