Skip to content

Commit

Permalink
fix: support only uvicorn ASGI Runner (#1965)
Browse files Browse the repository at this point in the history
* fix: support only uvicorn ASGI Runner

* docs: generate API References

* remove unused "type: ignore"

---------

Co-authored-by: Sehat1137 <edox1j2n@duck.com>
Co-authored-by: Sehat1137 <Sehat1137@users.noreply.github.com>
  • Loading branch information
3 people authored Dec 4, 2024
1 parent 777794e commit cea4452
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 50 deletions.
3 changes: 3 additions & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ search:
- [make_ping_asgi](api/faststream/asgi/make_ping_asgi.md)
- app
- [AsgiFastStream](api/faststream/asgi/app/AsgiFastStream.md)
- [cast_uvicorn_params](api/faststream/asgi/app/cast_uvicorn_params.md)
- factories
- [make_asyncapi_asgi](api/faststream/asgi/factories/make_asyncapi_asgi.md)
- [make_ping_asgi](api/faststream/asgi/factories/make_ping_asgi.md)
Expand Down Expand Up @@ -448,6 +449,8 @@ search:
- [run](api/faststream/cli/main/run.md)
- [version_callback](api/faststream/cli/main/version_callback.md)
- supervisors
- asgi_multiprocess
- [ASGIMultiprocess](api/faststream/cli/supervisors/asgi_multiprocess/ASGIMultiprocess.md)
- basereload
- [BaseReload](api/faststream/cli/supervisors/basereload/BaseReload.md)
- multiprocess
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/asgi/app/cast_uvicorn_params.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.asgi.app.cast_uvicorn_params
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.cli.supervisors.asgi_multiprocess.ASGIMultiprocess
68 changes: 27 additions & 41 deletions faststream/asgi/app.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import inspect
import logging
import traceback
from contextlib import asynccontextmanager
Expand All @@ -6,7 +7,6 @@
Any,
AsyncIterator,
Dict,
List,
Optional,
Sequence,
Tuple,
Expand Down Expand Up @@ -44,6 +44,14 @@
)


def cast_uvicorn_params(params: Dict[str, Any]) -> Dict[str, Any]:
if port := params.get("port"):
params["port"] = int(port)
if fd := params.get("fd"):
params["fd"] = int(fd)
return params


class AsgiFastStream(Application):
def __init__(
self,
Expand Down Expand Up @@ -148,50 +156,28 @@ async def run(
sleep_time: float = 0.1,
) -> None:
try:
import uvicorn # noqa: F401
from gunicorn.app.base import BaseApplication
import uvicorn
except ImportError as e:
raise RuntimeError(
"You need uvicorn and gunicorn to run FastStream ASGI App via CLI. pip install uvicorn gunicorn"
"You need uvicorn to run FastStream ASGI App via CLI. pip install uvicorn"
) from e

class ASGIRunner(BaseApplication): # type: ignore[misc]
def __init__(self, options: Dict[str, Any], asgi_app: "ASGIApp") -> None:
self.options = options
self.asgi_app = asgi_app
super().__init__()

def load_config(self) -> None:
for k, v in self.options.items():
if k in self.cfg.settings and v is not None:
self.cfg.set(k.lower(), v)

def load(self) -> "ASGIApp":
return self.asgi_app

run_extra_options = run_extra_options or {}

bindings: List[str] = []
host = run_extra_options.pop("host", None)
port = run_extra_options.pop("port", None)
if host is not None and port is not None:
bindings.append(f"{host}:{port}")
elif host is not None:
bindings.append(f"{host}:8000")
elif port is not None:
bindings.append(f"127.0.0.1:{port}")

bind = run_extra_options.get("bind")
if isinstance(bind, list):
bindings.extend(bind) # type: ignore
elif isinstance(bind, str):
bindings.append(bind)

run_extra_options["bind"] = bindings or "127.0.0.1:8000"
# We use gunicorn with uvicorn workers because uvicorn don't support multiple workers
run_extra_options["worker_class"] = "uvicorn.workers.UvicornWorker"

ASGIRunner(run_extra_options, self).run()
run_extra_options = cast_uvicorn_params(run_extra_options or {})

uvicorn_config_params = set(inspect.signature(uvicorn.Config).parameters.keys())

config = uvicorn.Config(
app=self,
log_level=log_level,
**{
key: v
for key, v in run_extra_options.items()
if key in uvicorn_config_params
},
)

server = uvicorn.Server(config)
await server.serve()

@asynccontextmanager
async def start_lifespan_context(self) -> AsyncIterator[None]:
Expand Down
18 changes: 14 additions & 4 deletions faststream/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from faststream import FastStream
from faststream.__about__ import __version__
from faststream._internal.application import Application
from faststream.asgi.app import AsgiFastStream
from faststream.cli.docs.app import docs_app
from faststream.cli.utils.imports import import_from_string
from faststream.cli.utils.logs import LogLevels, get_log_level, set_log_level
Expand Down Expand Up @@ -146,17 +147,26 @@ def run(
).run()

elif workers > 1:
from faststream.cli.supervisors.multiprocess import Multiprocess

if isinstance(app_obj, FastStream):
from faststream.cli.supervisors.multiprocess import Multiprocess

Multiprocess(
target=_run,
args=(*args, logging.DEBUG),
workers=workers,
).run()
elif isinstance(app_obj, AsgiFastStream):
from faststream.cli.supervisors.asgi_multiprocess import ASGIMultiprocess

ASGIMultiprocess(
target=app,
args=args, # type: ignore[arg-type]
workers=workers,
).run()
else:
args[1]["workers"] = workers # type: ignore[assignment]
_run(*args)
raise typer.BadParameter(
f"Unexpected app type, expected FastStream or AsgiFastStream, got: {type(app_obj)}."
)

else:
_run_imported_app(
Expand Down
38 changes: 38 additions & 0 deletions faststream/cli/supervisors/asgi_multiprocess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import inspect
from typing import Dict, Tuple

from faststream.asgi.app import cast_uvicorn_params


class ASGIMultiprocess:
def __init__(
self, target: str, args: Tuple[str, Dict[str, str], bool, int], workers: int
) -> None:
_, uvicorn_kwargs, is_factory, log_level = args
self._target = target
self._uvicorn_kwargs = cast_uvicorn_params(uvicorn_kwargs or {})
self._workers = workers
self._is_factory = is_factory
self._log_level = log_level

def run(self) -> None:
try:
import uvicorn
except ImportError as e:
raise RuntimeError(
"You need uvicorn to run FastStream ASGI App via CLI. pip install uvicorn"
) from e

uvicorn_params = set(inspect.signature(uvicorn.run).parameters.keys())

uvicorn.run(
self._target,
factory=self._is_factory,
workers=self._workers,
log_level=self._log_level,
**{
key: v
for key, v in self._uvicorn_kwargs.items()
if key in uvicorn_params
},
)
88 changes: 83 additions & 5 deletions tests/cli/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from faststream.app import FastStream
from faststream.asgi import AsgiFastStream
from faststream.cli.main import cli as faststream_app
from faststream.cli.utils.logs import get_log_level


@pytest.mark.parametrize(
Expand Down Expand Up @@ -36,13 +37,42 @@ def test_run(runner: CliRunner, app: Application):
assert result.exit_code == 0


@pytest.mark.parametrize("workers", [1, 2, 5])
@pytest.mark.parametrize("app", [pytest.param(AsgiFastStream())])
def test_run_as_asgi_with_workers(runner: CliRunner, workers: int, app: Application):
def test_run_as_asgi_with_single_worker(runner: CliRunner, app: Application):
app.run = AsyncMock()

with patch(
"faststream.cli.utils.imports._import_obj_or_factory", return_value=(None, app)
):
result = runner.invoke(
faststream_app,
[
"run",
"faststream:app",
"--host",
"0.0.0.0",
"--port",
"8000",
"--workers",
"1",
],
)
app.run.assert_awaited_once_with(
logging.INFO, {"host": "0.0.0.0", "port": "8000"}
)
assert result.exit_code == 0


@pytest.mark.parametrize("workers", [3, 5, 7])
@pytest.mark.parametrize("app", [pytest.param(AsgiFastStream())])
def test_run_as_asgi_with_many_workers(
runner: CliRunner, workers: int, app: Application
):
asgi_multiprocess = "faststream.cli.supervisors.asgi_multiprocess.ASGIMultiprocess"
_import_obj_or_factory = "faststream.cli.utils.imports._import_obj_or_factory"

with patch(asgi_multiprocess) as asgi_runner, patch(
_import_obj_or_factory, return_value=(None, app)
):
result = runner.invoke(
faststream_app,
Expand All @@ -57,13 +87,61 @@ def test_run_as_asgi_with_workers(runner: CliRunner, workers: int, app: Applicat
str(workers),
],
)
extra = {"workers": workers} if workers > 1 else {}
assert result.exit_code == 0

app.run.assert_awaited_once_with(
logging.INFO, {"host": "0.0.0.0", "port": "8000", **extra}
asgi_runner.assert_called_once()
asgi_runner.assert_called_once_with(
target="faststream:app",
args=("faststream:app", {"host": "0.0.0.0", "port": "8000"}, False, 0),
workers=workers,
)
asgi_runner().run.assert_called_once()


@pytest.mark.parametrize(
"log_level",
["critical", "fatal", "error", "warning", "warn", "info", "debug", "notset"],
)
@pytest.mark.parametrize("app", [pytest.param(AsgiFastStream())])
def test_run_as_asgi_mp_with_log_level(
runner: CliRunner, app: Application, log_level: str
):
asgi_multiprocess = "faststream.cli.supervisors.asgi_multiprocess.ASGIMultiprocess"
_import_obj_or_factory = "faststream.cli.utils.imports._import_obj_or_factory"

with patch(asgi_multiprocess) as asgi_runner, patch(
_import_obj_or_factory, return_value=(None, app)
):
result = runner.invoke(
faststream_app,
[
"run",
"faststream:app",
"--host",
"0.0.0.0",
"--port",
"8000",
"--workers",
"3",
"--log-level",
log_level,
],
)
assert result.exit_code == 0

asgi_runner.assert_called_once()
asgi_runner.assert_called_once_with(
target="faststream:app",
args=(
"faststream:app",
{"host": "0.0.0.0", "port": "8000"},
False,
get_log_level(log_level),
),
workers=3,
)
asgi_runner().run.assert_called_once()


@pytest.mark.parametrize(
"app", [pytest.param(FastStream()), pytest.param(AsgiFastStream())]
Expand Down

0 comments on commit cea4452

Please sign in to comment.