Skip to content

Commit

Permalink
fix: #1874 support workers for ASGI FastStream (#1936)
Browse files Browse the repository at this point in the history
* fix: #1874 support workers for ASGI FastStream

- Add support workers via gunicorn

- Add support fd & unix socket

- Add new external params `--bind`

- Separate `--bind` params from app

* test: extend test for CLI

* docs: generate API References

* fix: change creation of ASGIRunner

* lint: fix CI

---------

Co-authored-by: sehat1137 <edox1j2n@duck.com>
Co-authored-by: Sehat1137 <Sehat1137@users.noreply.github.com>
Co-authored-by: Nikita Pastukhov <diementros@yandex.ru>
  • Loading branch information
4 people authored Nov 29, 2024
1 parent c75f001 commit 90aaf57
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 35 deletions.
1 change: 1 addition & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ search:
- [get_log_level](api/faststream/cli/utils/logs/get_log_level.md)
- [set_log_level](api/faststream/cli/utils/logs/set_log_level.md)
- parser
- [is_bind_arg](api/faststream/cli/utils/parser/is_bind_arg.md)
- [parse_cli_args](api/faststream/cli/utils/parser/parse_cli_args.md)
- [remove_prefix](api/faststream/cli/utils/parser/remove_prefix.md)
- confluent
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/cli/utils/parser/is_bind_arg.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.cli.utils.parser.is_bind_arg
32 changes: 32 additions & 0 deletions docs/docs/en/getting-started/asgi.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,22 @@ uvicorn main:app

It does nothing but launch the app itself as an **ASGI lifespan**.

!!! note
If you want to run your app using several workers, you need to use something else than `uvicorn`.
```shell
faststream run main:app --workers 4
```
```shell
gunicorn -k uvicorn.workers.UvicornWorker main:app --workers=4
```
```shell
granian --interface asgi main:app --workers 4
```
```shell
hypercorn main:app --workers 4
```


### ASGI Routes

It doesn't look very helpful, so let's add some **HTTP** endpoints.
Expand Down Expand Up @@ -137,6 +153,8 @@ app = FastStream(broker).as_asgi(
```shell
faststream run main:app --host 0.0.0.0 --port 8000 --workers 4
```
This possibility built on gunicorn + uvicorn, you need install them to run FastStream ASGI app via CLI.
We send all args directly to gunicorn, you can learn more about it [here](https://github.com/benoitc/gunicorn/blob/master/examples/example_config.py).

## Other ASGI Compatibility

Expand Down Expand Up @@ -166,3 +184,17 @@ app = FastAPI(lifespan=start_broker)
app.mount("/health", make_ping_asgi(broker, timeout=5.0))
app.mount("/asyncapi", make_asyncapi_asgi(FastStream(broker)))
```

!!! tip
You can also bind to unix domain or a file descriptor. FastStream will bind to “127.0.0.1:8000” by default

```shell
faststream run main:app --bind unix:/tmp/socket.sock
```
```shell
faststream run main:app --bind fd://2
```
You can use multiple binds if you want
```shell
faststream run main:app --bind 0.0.0.0:8000 '[::]:8000'
```
65 changes: 46 additions & 19 deletions faststream/asgi/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
Any,
AsyncIterator,
Dict,
List,
Optional,
Sequence,
Tuple,
Expand Down Expand Up @@ -146,25 +147,51 @@ async def run(
run_extra_options: Optional[Dict[str, "SettingField"]] = None,
sleep_time: float = 0.1,
) -> None:
import uvicorn

if not run_extra_options:
run_extra_options = {}
port = int(run_extra_options.pop("port", 8000)) # type: ignore[arg-type]
workers = int(run_extra_options.pop("workers", 1)) # type: ignore[arg-type]
host = str(run_extra_options.pop("host", "localhost"))
fd = int(run_extra_options.pop("fd", -1)) # type: ignore[arg-type]
config = uvicorn.Config(
self,
host=host,
port=port,
log_level=log_level,
workers=workers,
fd=fd if fd != -1 else None,
**run_extra_options,
)
server = uvicorn.Server(config)
await server.serve()
try:
import uvicorn # noqa: F401
from gunicorn.app.base import BaseApplication
except ImportError as e:
raise RuntimeError(
"You need uvicorn and gunicorn to run FastStream ASGI App via CLI. pip install uvicorn gunicorn"
) from e

class ASGIRunner(BaseApplication): # type: ignore[misc]
def __init__(self, options: Dict[str, Any], asgi_app: "ASGIApp") -> None:
self.options = options
self.asgi_app = asgi_app
super().__init__()

def load_config(self) -> None:
for k, v in self.options.items():
if k in self.cfg.settings and v is not None:
self.cfg.set(k.lower(), v)

def load(self) -> "ASGIApp":
return self.asgi_app

run_extra_options = run_extra_options or {}

bindings: List[str] = []
host = run_extra_options.pop("host", None)
port = run_extra_options.pop("port", None)
if host is not None and port is not None:
bindings.append(f"{host}:{port}")
elif host is not None:
bindings.append(f"{host}:8000")
elif port is not None:
bindings.append(f"127.0.0.1:{port}")

bind = run_extra_options.get("bind")
if isinstance(bind, list):
bindings.extend(bind) # type: ignore
elif isinstance(bind, str):
bindings.append(bind)

run_extra_options["bind"] = bindings or "127.0.0.1:8000"
# We use gunicorn with uvicorn workers because uvicorn don't support multiple workers
run_extra_options["worker_class"] = "uvicorn.workers.UvicornWorker"

ASGIRunner(run_extra_options, self).run()

@asynccontextmanager
async def start_lifespan_context(self) -> AsyncIterator[None]:
Expand Down
4 changes: 2 additions & 2 deletions faststream/broker/fastapi/get_dependant.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def _patch_fastapi_dependent(dependant: "Dependant") -> "Dependant":
lambda x: isinstance(x, FieldInfo),
p.field_info.metadata or (),
),
Field(**field_data),
Field(**field_data), # type: ignore[pydantic-field,unused-ignore]
)

else:
Expand All @@ -109,7 +109,7 @@ def _patch_fastapi_dependent(dependant: "Dependant") -> "Dependant":
"le": info.field_info.le,
}
)
f = Field(**field_data)
f = Field(**field_data) # type: ignore[pydantic-field,unused-ignore]

params_unique[p.name] = (
info.annotation,
Expand Down
13 changes: 12 additions & 1 deletion faststream/cli/utils/parser.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
import re
from functools import reduce
from typing import TYPE_CHECKING, Dict, List, Tuple

if TYPE_CHECKING:
from faststream.types import SettingField


def is_bind_arg(arg: str) -> bool:
"""Determine whether the received argument refers to --bind.
bind arguments are like: 0.0.0.0:8000, [::]:8000, fd://2, /tmp/socket.sock
"""
bind_regex = re.compile(r":\d+$|:/+\d|:/[a-zA-Z0-9._-]+/[a-zA-Z0-9._-]+")
return bool(bind_regex.search(arg))


def parse_cli_args(*args: str) -> Tuple[str, Dict[str, "SettingField"]]:
"""Parses command line arguments."""
extra_kwargs: Dict[str, SettingField] = {}
Expand All @@ -22,7 +33,7 @@ def parse_cli_args(*args: str) -> Tuple[str, Dict[str, "SettingField"]]:
),
"-",
]:
if ":" in item:
if ":" in item and not is_bind_arg(item):
app = item

else:
Expand Down
8 changes: 4 additions & 4 deletions faststream/rabbit/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,10 @@ def __init__(
asyncapi_url = str(amqp_url)

# respect ascynapi_url argument scheme
builded_asyncapi_url = urlparse(asyncapi_url)
self.virtual_host = builded_asyncapi_url.path
built_asyncapi_url = urlparse(asyncapi_url)
self.virtual_host = built_asyncapi_url.path
if protocol is None:
protocol = builded_asyncapi_url.scheme
protocol = built_asyncapi_url.scheme

super().__init__(
url=str(amqp_url),
Expand All @@ -268,7 +268,7 @@ def __init__(
# AsyncAPI args
description=description,
asyncapi_url=asyncapi_url,
protocol=protocol or builded_asyncapi_url.scheme,
protocol=protocol or built_asyncapi_url.scheme,
protocol_version=protocol_version,
security=security,
tags=tags,
Expand Down
34 changes: 25 additions & 9 deletions tests/cli/utils/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import pytest

from faststream.cli.utils.parser import parse_cli_args
from faststream.cli.utils.parser import is_bind_arg, parse_cli_args

APPLICATION = "module:app"

Expand All @@ -23,19 +23,20 @@
)
ARG6 = ("--some-key",)
ARG7 = ("--k7", "1", "2", "--k7", "3")
ARG8 = ("--bind", "[::]:8000", "0.0.0.0:8000", "fd://2")


@pytest.mark.parametrize(
"args",
( # noqa: PT007
(APPLICATION, *ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7),
(*ARG1, APPLICATION, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7),
(*ARG1, *ARG2, APPLICATION, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7),
(*ARG1, *ARG2, *ARG3, APPLICATION, *ARG4, *ARG5, *ARG6, *ARG7),
(*ARG1, *ARG2, *ARG3, *ARG4, APPLICATION, *ARG5, *ARG6, *ARG7),
(*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, APPLICATION, *ARG6, *ARG7),
(*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, APPLICATION, *ARG7),
(*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, APPLICATION),
(APPLICATION, *ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8),
(*ARG1, APPLICATION, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8),
(*ARG1, *ARG2, APPLICATION, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8),
(*ARG1, *ARG2, *ARG3, APPLICATION, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8),
(*ARG1, *ARG2, *ARG3, *ARG4, APPLICATION, *ARG5, *ARG6, *ARG7, *ARG8),
(*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, APPLICATION, *ARG6, *ARG7, *ARG8),
(*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, APPLICATION, *ARG7, *ARG8),
(*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8, APPLICATION),
),
)
def test_custom_argument_parsing(args: Tuple[str]):
Expand All @@ -49,4 +50,19 @@ def test_custom_argument_parsing(args: Tuple[str]):
"k5": ["1", "1"],
"some_key": True,
"k7": ["1", "2", "3"],
"bind": ["[::]:8000", "0.0.0.0:8000", "fd://2"],
}


@pytest.mark.parametrize(
"args", ["0.0.0.0:8000", "[::]:8000", "fd://2", "unix:/tmp/socket.sock"]
)
def test_bind_arg(args: str):
assert is_bind_arg(args) is True


@pytest.mark.parametrize(
"args", ["main:app", "src.main:app", "examples.nats.e01_basic:app2"]
)
def test_not_bind_arg(args: str):
assert is_bind_arg(args) is False

0 comments on commit 90aaf57

Please sign in to comment.