diff --git a/.env b/.env index 3b7daa4..c1f627c 100644 --- a/.env +++ b/.env @@ -1,22 +1,40 @@ -# 示例见 .env.example +# 示例与解释见 .env.example -# LOG_CHAT: 是否开启日志 +# `LOG_CHAT`: 是否记录日志 LOG_CHAT=false -# OPENAI_BASE_URL: 转发openai风格的任何服务地址,允许指定多个, 以逗号隔开。 +# `OPENAI_BASE_URL`: 转发openai风格的任何服务地址,允许指定多个, 以逗号隔开。 # 如果指定超过一个,则任何OPENAI_ROUTE_PREFIX/EXTRA_ROUTE_PREFIX都不能为根路由/ OPENAI_BASE_URL=https://api.openai.com -# OPENAI_ROUTE_PREFIX: 可指定所有openai风格(为记录日志)服务的转发路由前缀 -OPENAI_ROUTE_PREFIX=/ +# `OPENAI_ROUTE_PREFIX`: 可指定所有openai风格(为记录日志)服务的转发路由前缀 +OPENAI_ROUTE_PREFIX= OPENAI_API_KEY= FORWARD_KEY= -# EXTRA_BASE_URL: 可指定任意服务转发 +# `EXTRA_BASE_URL`: 可指定任意服务转发 EXTRA_BASE_URL= -# EXTRA_ROUTE_PREFIX: 与 EXTRA_BASE_URL 匹配的路由前缀 +# `EXTRA_ROUTE_PREFIX`: 与 EXTRA_BASE_URL 匹配的路由前缀 EXTRA_ROUTE_PREFIX= +# `RATE_LIMIT`: i.e. RPM 对指定路由的请求速率限制 +# format: {route: ratelimit-string} +# ratelimit-string format [count] [per|/] [n (optional)] [second|minute|hour|day|month|year] :ref:`ratelimit-string`: https://limits.readthedocs.io/en/stable/quickstart.html#rate-limit-string-notation +RATE_LIMIT='{ +"/healthz": "1000/2minutes", +"/v1/chat/completions": "300/minute" +}' + +#`RATE_LIMIT_STRATEGY` Options: (fixed-window, fixed-window-elastic-expiry, moving-window) ref: https://limits.readthedocs.io/en/latest/strategies.html +# `fixed-window`: most memory efficient strategy; `moving-window`:most effective for preventing bursts but higher memory cost. +RATE_LIMIT_STRATEGY=moving-window + +# `GLOBAL_RATE_LIMIT`: 所有`RATE_LIMIT`没有指定的路由. 不填默认无限制 +GLOBAL_RATE_LIMIT= + +# TPM: 返回的token速率限制 +TOKEN_RATE_LIMIT=50/second + # 设定时区 TZ=Asia/Shanghai \ No newline at end of file diff --git a/.env.example b/.env.example index 06c0a40..b86908d 100644 --- a/.env.example +++ b/.env.example @@ -1,29 +1,45 @@ # LOG_CHAT: 是否开启日志 LOG_CHAT=true - # OPENAI_BASE_URL: 转发openai风格的任何服务地址,允许指定多个, 以逗号隔开。 # 如果指定超过一个,则任何OPENAI_ROUTE_PREFIX/EXTRA_ROUTE_PREFIX都不能为根路由/ -OPENAI_BASE_URL=https://api.openai.com, http:localhost:8080 +OPENAI_BASE_URL='https://api.openai.com, http:localhost:8080' # OPENAI_ROUTE_PREFIX: 可指定所有openai风格(为记录日志)服务的转发路由前缀 -OPENAI_ROUTE_PREFIX=/openai, /localai +OPENAI_ROUTE_PREFIX='/openai, /localai' # OPENAI_API_KEY:允许输入多个api key, 以逗号隔开, 形成轮询池 -OPENAI_API_KEY=sk-xxx1,sk-xxx2,sk-xxx3 +OPENAI_API_KEY='sk-xxx1, sk-xxx2, sk-xxx3' # FORWARD_KEY: 当前面的OPENAI_API_KEY被设置,就可以设置这里的FORWARD_KEY,客户端调用时可以使用FORWARD_KEY作为api key FORWARD_KEY=fk-xxx1 - # EXTRA_BASE_URL: 可指定任意服务转发 -EXTRA_BASE_URL=http://localhost:8882, http://localhost:8881 +EXTRA_BASE_URL='http://localhost:8882, http://localhost:8881' # EXTRA_ROUTE_PREFIX: 与 EXTRA_BASE_URL 匹配的路由前缀 -EXTRA_ROUTE_PREFIX=/tts,/translate +EXTRA_ROUTE_PREFIX='/tts, /translate' + +# RATE LIMIT: 指定路由的请求速率限制(不区分客户) +# format: {route: ratelimit-string} +# ratelimit-string format [count] [per|/] [n (optional)] [second|minute|hour|day|month|year] :ref:`ratelimit-string`: https://limits.readthedocs.io/en/stable/quickstart.html#rate-limit-string-notation +RATE_LIMIT='{ +"/healthz": "50/3minutes", +"/openai/v1/chat/completions": "1/10seconds", +"/localai/v1/chat/completions": "2/second"}' -# PROXY 配置代理 +# `GLOBAL_RATE_LIMIT`: 所有`RATE_LIMIT`没有指定的路由. 不填默认无限制 +GLOBAL_RATE_LIMIT=2/5seconds + +#`RATE_LIMIT_STRATEGY` Options: (fixed-window, fixed-window-elastic-expiry, moving-window) ref: https://limits.readthedocs.io/en/latest/strategies.html +# `fixed-window`: most memory efficient strategy; `moving-window`:most effective for preventing bursts but higher memory cost. +RATE_LIMIT_STRATEGY=fixed-window + +# `PROXY` http代理 PROXY=http://localhost:7890 +# `TOKEN_RATE_LIMIT` 对每一份流式返回的token速率限制 (注:这里的token并不严格等于gpt中定义的token,而是SSE的chunk) +TOKEN_RATE_LIMIT=16/second + # 设定时区 TZ=Asia/Shanghai \ No newline at end of file diff --git a/.github/images/separators/aqua.png b/.github/images/separators/aqua.png new file mode 100644 index 0000000..3dc6b66 Binary files /dev/null and b/.github/images/separators/aqua.png differ diff --git a/.github/images/separators/aqua.webp b/.github/images/separators/aqua.webp new file mode 100644 index 0000000..1039d5d Binary files /dev/null and b/.github/images/separators/aqua.webp differ diff --git a/README.md b/README.md index 8ca6857..1e6f958 100644 --- a/README.md +++ b/README.md @@ -51,15 +51,17 @@ 本项目用于解决一些地区无法直接访问OpenAI的问题,将该服务部署在可以正常访问OpenAI API的(云)服务器上, 通过该服务转发OpenAI的请求。即搭建反向代理服务; 允许输入多个OpenAI API-KEY 组成轮询池; 可自定义二次分发api key. ---- + + + + + 由本项目搭建的长期代理地址: > https://api.openai-forward.com -> https://cloudflare.worker.openai-forward.com +> https://render.openai-forward.com > https://cloudflare.page.openai-forward.com > https://vercel.openai-forward.com -> https://render.openai-forward.com -> https://railway.openai-forward.com ## 功能 diff --git a/deploy.md b/deploy.md index 6163427..a540973 100644 --- a/deploy.md +++ b/deploy.md @@ -8,10 +8,10 @@ [pip部署](#pip部署) | [docker部署](#docker部署) | -[railway一键部署](#railway-一键部署) | [render一键部署](#render-一键部署) | -[Vercel一键部署](#Vercel-一键部署) | +[railway一键部署](#railway-一键部署) | [cloudflare部署](#cloudflare-部署) | +[Vercel一键部署](#Vercel-一键部署) @@ -33,8 +33,8 @@ openai-forward run # 或者使用别名 aifd run ### 服务调用 -使用方式只需将`https://api.openai.com` 替换为服务所在端口`http://{ip}:{port}` 就可以了。 -比如 +使用方式只需将`https://api.openai.com` 替换为服务所在端口`http://{ip}:{port}` +如 ```bash # 默认 https://api.openai.com/v1/chat/completions @@ -63,7 +63,7 @@ docker run -d -p 9999:8000 beidongjiedeguang/openai-forward:latest ``` 将映射宿主机的9999端口,通过`http://{ip}:9999`访问服务。 -容器内日志路径为`/home/openai-forward/Log/`, 可以启动时将其映射出来。 +容器内日志路径为`/home/openai-forward/Log/`, 可在启动时将其映射出来。 注:同样可以在启动命令中通过-e传入环境变量OPENAI_API_KEY=sk-xxx作为默认api key 启用SSL同上. @@ -84,39 +84,42 @@ openai-forward run # 或使用别名 aifd run --- -## Railway 一键部署 -[![Deploy on Railway](https://railway.app/button.svg)](https://railway.app/template/tejCum?referralCode=U0-kXv) - -1. 点击上面部署按钮进行一键部署 - 也可先fork本仓库,再手动在操作界面导入自己的fork项目 -2. 填写环境变量,必填项`PORT` :`8000`, 可选项 如默认的OPENAI_API_KEY 等 -3. 绑定自定义域名 - -注: Railway 每月提供 $5.0和500小时执行时间的免费计划。这意味着单个免费用户每个月只能使用大约21天 - -> https://railway.openai-forward.com - ---- ## Render 一键部署 [![Deploy to Render](https://render.com/images/deploy-to-render-button.svg)](https://render.com/deploy?repo=https://github.com/beidongjiedeguang/openai-forward) -体验下来,Render应该算是所有部署中最简易的一种, 并且它生成的域名国内可以直接访问! +Render应该算是所有部署中最简易的一种, 并且它生成的域名国内可以直接访问! 1. 点击一键部署按钮 - 如果提示需要绑定卡,则可先fork本仓库 -->到Render的Dashboard上 New Web Services --> Connect 到刚刚fork到仓库 + 如果提示需要绑定卡,则可先fork本仓库 -->到Render的Dashboard上 New Web Services --> Connect 到刚刚fork到仓库 后面步骤均默认即可 2. 填写环境变量,如默认的OPENAI_API_KEY 等,也可以不填 然后等待部署完成即可。 Render的免费计划: 每月750小时免费实例时间(意味着单个实例可以不间断运行)、100G带宽流量、500分钟构建时长. 注:默认render在15分钟内没有服务请求时会自动休眠(好处是休眠后不会占用750h的免费实例时间),休眠后下一次请求会被阻塞 5~10s。 -若不希望服务15分钟自动休眠,可以使用定时脚本(如每14分钟)去请求服务进行保活。 -如果希望零停机部署可以在设置中设置`Health Check Path`为`/healthz` +如果希望服务15分钟不自动休眠,可以使用定时脚本(如每14分钟)去请求服务进行保活。保活脚本参考`scripts/keep_render_alive.py`. +如果希望零停机部署可以在设置中设置`Health Check Path`为`/healthz` + > https://render.openai-forward.com > https://openai-forward.onrender.com +--- + +## Railway 一键部署 +[![Deploy on Railway](https://railway.app/button.svg)](https://railway.app/template/tejCum?referralCode=U0-kXv) + +1. 点击上面部署按钮进行一键部署 + 也可先fork本仓库,再手动在操作界面导入自己的fork项目 +2. 填写环境变量,必填项`PORT` :`8000`, 可选项 如默认的OPENAI_API_KEY 等 +3. 绑定自定义域名 + +注: Railway 每月提供 $5.0和500小时执行时间的免费计划。这意味着单个免费用户每个月只能使用大约21天 + +> https://railway.openai-forward.com + + --- ⚠️下面两种部署方式仅提供简单的转发服务,没有任何额外功能。 @@ -142,7 +145,7 @@ Render的免费计划: 每月750小时免费实例时间(意味着单个实例 * Pages部署: fork本仓库,在[cloudflare](https://dash.cloudflare.com/)上创建应用程序时选择Pages, 然后选择连接到Git, 选择刚刚fork的仓库即可完成部署。 * Workers部署: 在[cloudflare](https://dash.cloudflare.com/)上创建应用程序时选择Workers, 部署好示例代码后,点击快速修改(quick edit)复制[_worker.js](_worker.js) 至代码编辑器即可完成服务部署。 -绑定自定义域名: cloudflare自动分配的域名国内也无法访问,所以也需要绑定自定义域名. (目前Pages部署时自动分配的域名国内还可以访问) +绑定自定义域名: cloudflare自动分配的域名国内也无法访问,所以也需要绑定自定义域名. (**目前Pages部署时自动分配的域名国内可以直接访问**) 绑定自定义域名需要将域名默认nameserver(域名服务器)绑定到cloudflare提供的nameserver,大体上过程是: ```mermaid @@ -155,7 +158,7 @@ stateDiagram-v2 去注册域名机构更改默认nameserver为cloudflare提供的nameserver --> 在cloudflare的worker/page中添加域名: 域名服务器更改验证成功 在cloudflare的worker/page中添加域名 --> 成功 ``` -这种部署方式轻便简洁,支持流式转发. 对于没有vps的用户还是十分推荐的。不过目前[_worker.js](_worker.js)这个简单脚本仅提供转发服务, 不提供额外功能。 +这种部署方式轻便简洁,支持流式转发. 对于没有vps的用户还是十分推荐的。不过目前[_worker.js](_worker.js)这个简单脚本仅提供转发服务, 不支持额外功能。 > https://cloudflare.worker.openai-forward.com > https://cloudflare.page.openai-forward.com diff --git a/openai_forward/__init__.py b/openai_forward/__init__.py index 78d7ef6..4b8f5de 100644 --- a/openai_forward/__init__.py +++ b/openai_forward/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.4.1" +__version__ = "0.5.0-alpha" from dotenv import load_dotenv diff --git a/openai_forward/__main__.py b/openai_forward/__main__.py index 531b336..40864ad 100644 --- a/openai_forward/__main__.py +++ b/openai_forward/__main__.py @@ -16,6 +16,10 @@ def run( openai_route_prefix=None, extra_base_url=None, extra_route_prefix=None, + rate_limit=None, + global_rate_limit=None, + rate_limit_strategy=None, + token_rate_limit=None, ip_whitelist=None, ip_blacklist=None, proxy=None, @@ -37,6 +41,7 @@ def run( ip_whitelist: str, None ip_blacklist: str, None """ + if log_chat: os.environ["LOG_CHAT"] = log_chat if openai_base_url: @@ -51,6 +56,17 @@ def run( os.environ["EXTRA_ROUTE_PREFIX"] = extra_route_prefix if forward_key: os.environ["FORWARD_KEY"] = forward_key + if rate_limit: + import json + + assert isinstance(rate_limit, dict) + os.environ["RATE_LIMIT"] = json.dumps(rate_limit) + if global_rate_limit: + os.environ["GLOBAL_RATE_LIMIT"] = global_rate_limit + if rate_limit_strategy: + os.environ["RATE_LIMIT_STRATEGY"] = rate_limit_strategy + if token_rate_limit: + os.environ["TOKEN_RATE_LIMIT"] = token_rate_limit if ip_whitelist: os.environ["IP_WHITELIST"] = ip_whitelist if ip_blacklist: @@ -71,12 +87,24 @@ def run( ) @staticmethod - def convert(log_folder: str = "./Log/chat", target_path: str = "./Log/chat.json"): + def convert(log_folder: str = None, target_path: str = None): """Convert log folder to jsonl file""" + from openai_forward.forwarding.settings import OPENAI_ROUTE_PREFIX from openai_forward.helper import convert_folder_to_jsonl - print(f"Convert {log_folder}/*.log to {target_path}") - convert_folder_to_jsonl(log_folder, target_path) + print(60 * '-') + if log_folder is None: + _prefix_list = [i.replace("/", "_") for i in OPENAI_ROUTE_PREFIX] + for _prefix in _prefix_list: + log_folder = f"./Log/chat/{_prefix}" + target_path = f"./Log/chat{_prefix}.json" + print(f"Convert {log_folder}/*.log to {target_path}") + convert_folder_to_jsonl(log_folder, target_path) + print(60 * '-') + else: + print(f"Convert {log_folder}/*.log to {target_path}") + convert_folder_to_jsonl(log_folder, target_path) + print(60 * '-') def main(): diff --git a/openai_forward/app.py b/openai_forward/app.py index e800e67..6970a5c 100644 --- a/openai_forward/app.py +++ b/openai_forward/app.py @@ -1,8 +1,20 @@ -from fastapi import FastAPI, status +from fastapi import FastAPI, Request, status +from slowapi import Limiter, _rate_limit_exceeded_handler +from slowapi.errors import RateLimitExceeded from .forwarding import get_fwd_anything_objs, get_fwd_openai_style_objs +from .forwarding.settings import ( + RATE_LIMIT_STRATEGY, + dynamic_rate_limit, + get_limiter_key, +) + +limiter = Limiter(key_func=get_limiter_key, strategy=RATE_LIMIT_STRATEGY) + +app = FastAPI(title="openai_forward", version="0.5") -app = FastAPI(title="openai_forward", version="0.4") +app.state.limiter = limiter +app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) @app.get( @@ -11,13 +23,15 @@ response_description="Return HTTP Status Code 200 (OK)", status_code=status.HTTP_200_OK, ) -def healthz(): +@limiter.limit(dynamic_rate_limit) +def healthz(request: Request): + print(request.scope.get("client")) return "OK" add_route = lambda obj: app.add_route( obj.ROUTE_PREFIX + "{api_path:path}", - obj.reverse_proxy, + limiter.limit(dynamic_rate_limit)(obj.reverse_proxy), methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "HEAD", "PATCH", "TRACE"], ) diff --git a/openai_forward/config.py b/openai_forward/config.py index ef7e12a..cbfb040 100644 --- a/openai_forward/config.py +++ b/openai_forward/config.py @@ -1,3 +1,4 @@ +import functools import logging import os import sys @@ -9,7 +10,10 @@ from rich.table import Table -def print_startup_info(base_url, route_prefix, api_key, no_auth_mode, log_chat): +def print_startup_info(base_url, route_prefix, api_key, fwd_key, log_chat): + """ + Prints the startup information of the application. + """ try: from dotenv import load_dotenv @@ -17,25 +21,54 @@ def print_startup_info(base_url, route_prefix, api_key, no_auth_mode, log_chat): except Exception: ... route_prefix = route_prefix or "/" - api_key_info = True if len(api_key) else False - table = Table(title="", box=None, width=100) - table.add_column("base-url", justify="left", style="#df412f") - table.add_column("route-prefix", justify="center", style="green") - table.add_column("api-key-polling-pool", justify="center", style="green") - table.add_column( - "no-auth-mode", justify="center", style="red" if no_auth_mode else "green" - ) - table.add_column("Log-chat", justify="center", style="green") - table.add_row( - base_url, - route_prefix, - str(api_key_info), - str(no_auth_mode), - str(log_chat), - ) + if isinstance(api_key, str): + api_key = api_key + else: + api_key = str(True if len(api_key) else False) + if isinstance(fwd_key, str): + fwd_key = fwd_key + else: + fwd_key = True if len(fwd_key) else False + table = Table(title="", box=None, width=50) + + matrcs = { + "base url": {'value': base_url, 'style': "#df412f"}, + "route prefix": {'value': route_prefix, 'style': "green"}, + "api keys": {'value': api_key, 'style': "green"}, + "forward keys": {'value': str(fwd_key), 'style': "green" if fwd_key else "red"}, + "Log chat": {'value': str(log_chat), 'style': "green"}, + } + table.add_column("matrcs", justify='left', width=10) + table.add_column("value", justify='left') + for key, value in matrcs.items(): + table.add_row(key, value['value'], style=value['style']) + print(Panel(table, title="🤗 openai-forward is ready to serve! ", expand=False)) +def show_rate_limit_info(rate_limit: dict, strategy: str, **kwargs): + """ + Print rate limit information. + + Parameters: + rate_limit (dict): A dictionary containing rate limit information. + strategy (str): The strategy used for rate limiting. + **kwargs: Additional keyword arguments. + + Returns: + None + """ + table = Table(title="", box=None, width=50) + table.add_column("matrics") + table.add_column("value") + table.add_row("strategy", strategy, style='blue') + for key, value in rate_limit.items(): + table.add_row(key, str(value), style='green') + for key, value in kwargs.items(): + table.add_row(key, str(value), style='green') + print(Panel(table, title="⏱️ Rate Limit configuration", expand=False)) + + class InterceptHandler(logging.Handler): def emit(self, record): # Get corresponding Loguru level if it exists @@ -54,7 +87,15 @@ def emit(self, record): ) -def setting_log(save_file=False, log_name="openai_forward", multi_process=True): +def setting_log( + save_file=False, + openai_route_prefix=None, + log_name="openai_forward", + multi_process=True, +): + """ + Configures the logging settings for the application. + """ # TODO 修复时区配置 if os.environ.get("TZ") == "Asia/Shanghai": os.environ["TZ"] = "UTC-8" @@ -68,28 +109,34 @@ def setting_log(save_file=False, log_name="openai_forward", multi_process=True): config_handlers = [ {"sink": sys.stdout, "level": "DEBUG"}, - { - "sink": f"./Log/chat/chat.log", - "enqueue": multi_process, - "rotation": "50 MB", - "filter": lambda record: "chat" in record["extra"], - "format": "{message}", - }, - { - "sink": f"./Log/whisper/whisper.log", - "enqueue": multi_process, - "rotation": "30 MB", - "filter": lambda record: "whisper" in record["extra"], - "format": "{message}", - }, - { - "sink": f"./Log/extra/extra.log", - "enqueue": multi_process, - "rotation": "50 MB", - "filter": lambda record: "extra" in record["extra"], - "format": "{message}", - }, ] + + def filter_func(_prefix, _postfix, record): + chat_key = f"{_prefix}{_postfix}" + return chat_key in record["extra"] + + for prefix in openai_route_prefix or []: + _prefix = prefix.replace('/', '_') + + config_handlers.extend( + [ + { + "sink": f"./Log/chat/{_prefix}/chat.log", + "enqueue": multi_process, + "rotation": "50 MB", + "filter": functools.partial(filter_func, _prefix, "_chat"), + "format": "{message}", + }, + { + "sink": f"./Log/whisper/{_prefix}/whisper.log", + "enqueue": multi_process, + "rotation": "30 MB", + "filter": functools.partial(filter_func, _prefix, "_whisper"), + "format": "{message}", + }, + ] + ) + if save_file: config_handlers += [ { diff --git a/openai_forward/content/__init__.py b/openai_forward/content/__init__.py index d80933f..d4039d2 100644 --- a/openai_forward/content/__init__.py +++ b/openai_forward/content/__init__.py @@ -1,3 +1,2 @@ from .chat import ChatSaver -from .extra import ExtraForwardingSaver from .whisper import WhisperSaver diff --git a/openai_forward/content/chat.py b/openai_forward/content/chat.py index 3e7a9cd..cf119f1 100644 --- a/openai_forward/content/chat.py +++ b/openai_forward/content/chat.py @@ -7,12 +7,15 @@ from loguru import logger from orjson import JSONDecodeError +from ..helper import get_client_ip from .decode import parse_to_lines class ChatSaver: - def __init__(self): - self.logger = logger.bind(chat=True) + def __init__(self, route_prefix: str): + _prefix = route_prefix.replace('/', '_') + kwargs = {_prefix + "_chat": True} + self.logger = logger.bind(**kwargs) @staticmethod async def parse_payload(request: Request): @@ -23,7 +26,7 @@ async def parse_payload(request: Request): content = { "messages": [{msg["role"]: msg["content"]} for msg in msgs], "model": model, - "forwarded-for": request.headers.get("x-forwarded-for") or "", + "forwarded-for": get_client_ip(request) or "", "uid": uid, "datetime": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), } diff --git a/openai_forward/content/extra.py b/openai_forward/content/extra.py deleted file mode 100644 index 980e57f..0000000 --- a/openai_forward/content/extra.py +++ /dev/null @@ -1,10 +0,0 @@ -from loguru import logger - - -class ExtraForwardingSaver: - def __init__(self): - self.logger = logger.bind(extra=True) - - def add_log(self, bytes_: bytes): - text_content = bytes_.decode("utf-8") - self.logger.debug(text_content) diff --git a/openai_forward/content/whisper.py b/openai_forward/content/whisper.py index cda6cbf..b210d89 100644 --- a/openai_forward/content/whisper.py +++ b/openai_forward/content/whisper.py @@ -2,8 +2,9 @@ class WhisperSaver: - def __init__(self): - self.logger = logger.bind(whisper=True) + def __init__(self, route_prefix: str): + _prefix = route_prefix.replace('/', '_') + self.logger = logger.bind(**{f"{_prefix}_whisper": True}) def add_log(self, bytes_: bytes): text_content = bytes_.decode("utf-8") diff --git a/openai_forward/forwarding/base.py b/openai_forward/forwarding/base.py index 97c822d..c5863ef 100644 --- a/openai_forward/forwarding/base.py +++ b/openai_forward/forwarding/base.py @@ -1,3 +1,4 @@ +import time import traceback import uuid from itertools import cycle @@ -8,7 +9,8 @@ from fastapi.responses import StreamingResponse from loguru import logger -from ..content import ChatSaver, ExtraForwardingSaver, WhisperSaver +from ..content import ChatSaver, WhisperSaver +from ..helper import async_retry, retry from .settings import * @@ -16,6 +18,7 @@ class ForwardingBase: BASE_URL = None ROUTE_PREFIX = None client: httpx.AsyncClient = None + if IP_BLACKLIST or IP_WHITELIST: validate_host = True else: @@ -23,11 +26,17 @@ class ForwardingBase: timeout = 600 - if LOG_CHAT: - extrasaver = ExtraForwardingSaver() - @staticmethod def validate_request_host(ip): + """ + Validates the request host IP address against the IP whitelist and blacklist. + + Parameters: + ip (str): The IP address to be validated. + + Raises: + HTTPException: If the IP address is not in the whitelist or if it is in the blacklist. + """ if IP_WHITELIST and ip not in IP_WHITELIST: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, @@ -39,18 +48,28 @@ def validate_request_host(ip): detail=f"Forbidden, ip={ip} in blacklist!", ) - @classmethod async def aiter_bytes( - cls, r: httpx.Response, **kwargs + self, r: httpx.Response, **kwargs ) -> AsyncGenerator[bytes, Any]: - bytes_ = b"" async for chunk in r.aiter_bytes(): - bytes_ += chunk yield chunk await r.aclose() - cls.extrasaver.add_log(bytes_) + @async_retry(max_retries=3, delay=0.5, backoff=2, exceptions=(HTTPException,)) async def try_send(self, client_config: dict, request: Request): + """ + Try to send the request. + + Args: + client_config (dict): The configuration for the client. + request (Request): The request to be sent. + + Returns: + Response: The response from the client. + + Raises: + HTTPException: If there is a connection error or any other exception occurs. + """ try: req = self.client.build_request( method=request.method, @@ -78,17 +97,34 @@ async def try_send(self, client_config: dict, request: Request): ) def prepare_client(self, request: Request): + """ + Prepares the client configuration based on the given request. + + Args: + request (Request): The request object containing the necessary information. + + Returns: + dict: The client configuration dictionary with the necessary parameters set. + The dictionary has the following keys: + - 'auth': The authorization header value. + - 'headers': The dictionary of headers. + - 'url': The URL object. + - 'url_path': The URL path. + + Raises: + AssertionError: If the `BASE_URL` or `ROUTE_PREFIX` is not set. + """ assert self.BASE_URL is not None assert self.ROUTE_PREFIX is not None if self.validate_host: ip = request.headers.get("x-forwarded-for") or "" self.validate_request_host(ip) - url_path = request.url.path + _url_path = request.url.path prefix_index = 0 if self.ROUTE_PREFIX == '/' else len(self.ROUTE_PREFIX) - route_path = url_path[prefix_index:] - url = httpx.URL(path=route_path, query=request.url.query.encode("utf-8")) + url_path = _url_path[prefix_index:] + url = httpx.URL(path=url_path, query=request.url.query.encode("utf-8")) headers = dict(request.headers) auth = headers.pop("authorization", "") content_type = headers.pop("content-type", "application/json") @@ -97,13 +133,23 @@ def prepare_client(self, request: Request): 'auth': auth, 'headers': auth_headers_dict, 'url': url, - 'url_path': route_path, + 'url_path': url_path, } return client_config async def reverse_proxy(self, request: Request): + """ + Reverse proxies the given request. + + Args: + request (Request): The request to be reverse proxied. + + Returns: + StreamingResponse: The response from the reverse proxied server, as a streaming response. + """ assert self.client is not None + client_config = self.prepare_client(request) r = await self.try_send(client_config, request) @@ -119,13 +165,24 @@ class OpenaiBase(ForwardingBase): _cycle_api_key = cycle(OPENAI_API_KEY) _no_auth_mode = OPENAI_API_KEY != [] and FWD_KEY == set() - if LOG_CHAT: - chatsaver = ChatSaver() - whispersaver = WhisperSaver() + chatsaver: ChatSaver = None + whispersaver: WhisperSaver = None def _add_result_log( self, byte_list: List[bytes], uid: str, route_path: str, request_method: str ): + """ + Adds a result log for the given byte list, uid, route path, and request method. + + Args: + byte_list (List[bytes]): The list of bytes to be processed. + uid (str): The unique identifier. + route_path (str): The route path. + request_method (str): The request method. + + Returns: + None + """ try: if LOG_CHAT and request_method == "POST": if route_path == "/v1/chat/completions": @@ -143,6 +200,24 @@ def _add_result_log( logger.warning(f"log chat (not) error:\n{traceback.format_exc()}") async def _add_payload_log(self, request: Request, url_path: str): + """ + Adds a payload log for the given request. + + Args: + request (Request): The request object. + url_path (str): The URL path of the request. + + Returns: + str: The unique identifier (UID) of the payload log, which is used to match the chat result log. + + Raises: + Suppress all errors. + + Notes: + - If `LOG_CHAT` is True and the request method is "POST", the chat payload will be logged. + - If the `url_path` is "/v1/chat/completions", the chat payload will be parsed and logged. + - If the `url_path` starts with "/v1/audio/", a new UID will be generated. + """ uid = None if LOG_CHAT and request.method == "POST": try: @@ -167,9 +242,31 @@ async def _add_payload_log(self, request: Request, url_path: str): async def aiter_bytes( self, r: httpx.Response, request: Request, route_path: str, uid: str ): + """ + Asynchronously iterates over the bytes of the response and yields each chunk. + + Args: + r (httpx.Response): The HTTP response object. + request (Request): The original request object. + route_path (str): The route path. + uid (str): The unique identifier. + + Returns: + A generator that yields each chunk of bytes from the response. + """ byte_list = [] + start_time = time.perf_counter() + idx = 0 async for chunk in r.aiter_bytes(): + idx += 1 byte_list.append(chunk) + if TOKEN_INTERVAL > 0: + current_time = time.perf_counter() + delta = current_time - start_time + sleep_time = TOKEN_INTERVAL - delta + if sleep_time > 0: + time.sleep(sleep_time) + start_time = time.perf_counter() yield chunk await r.aclose() @@ -182,6 +279,15 @@ async def aiter_bytes( logger.warning(f'uid: {uid}\n' f'{response_info}') async def reverse_proxy(self, request: Request): + """ + Reverse proxies the given requests. + + Args: + request (Request): The incoming request object. + + Returns: + StreamingResponse: The response from the reverse proxied server, as a streaming response. + """ client_config = self.prepare_client(request) url_path = client_config["url_path"] diff --git a/openai_forward/forwarding/extra.py b/openai_forward/forwarding/extra.py index c64cf66..69e5dc1 100644 --- a/openai_forward/forwarding/extra.py +++ b/openai_forward/forwarding/extra.py @@ -1,5 +1,4 @@ -from ..config import print_startup_info -from .base import LOG_CHAT, ForwardingBase +from .base import ForwardingBase class AnyForwarding(ForwardingBase): @@ -11,7 +10,6 @@ def __init__(self, base_url: str, route_prefix: str, proxy=None): self.client = httpx.AsyncClient( base_url=self.BASE_URL, proxies=proxy, http1=True, http2=False ) - print_startup_info(self.BASE_URL, self.ROUTE_PREFIX, [], "\\", LOG_CHAT) def get_fwd_anything_objs(): diff --git a/openai_forward/forwarding/openai.py b/openai_forward/forwarding/openai.py index 13aed15..3e792e2 100644 --- a/openai_forward/forwarding/openai.py +++ b/openai_forward/forwarding/openai.py @@ -1,6 +1,7 @@ -from ..config import print_startup_info -from .base import OpenaiBase -from .settings import LOG_CHAT, OPENAI_API_KEY +import time + +from .base import ChatSaver, OpenaiBase, WhisperSaver +from .settings import LOG_CHAT class OpenaiForwarding(OpenaiBase): @@ -9,16 +10,14 @@ def __init__(self, base_url: str, route_prefix: str, proxy=None): self.BASE_URL = base_url self.ROUTE_PREFIX = route_prefix + if LOG_CHAT: + self.chatsaver = ChatSaver(route_prefix) + self.whispersaver = WhisperSaver(route_prefix) self.client = httpx.AsyncClient( base_url=self.BASE_URL, proxies=proxy, http1=True, http2=False ) - print_startup_info( - self.BASE_URL, - self.ROUTE_PREFIX, - OPENAI_API_KEY, - self._no_auth_mode, - LOG_CHAT, - ) + self.token_counts = 0 + self.token_limit_dict = {'time': time.time(), 'count': 0} def get_fwd_openai_style_objs(): diff --git a/openai_forward/forwarding/settings.py b/openai_forward/forwarding/settings.py index 43bf9b2..3903324 100644 --- a/openai_forward/forwarding/settings.py +++ b/openai_forward/forwarding/settings.py @@ -1,7 +1,10 @@ import os -from ..config import setting_log -from ..helper import env2list, format_route_prefix +import limits +from fastapi import Request + +from ..config import print_startup_info, setting_log, show_rate_limit_info +from ..helper import env2dict, env2list, format_route_prefix, get_client_ip ENV_VAR_SEP = "," OPENAI_BASE_URL = env2list("OPENAI_BASE_URL", sep=ENV_VAR_SEP) or [ @@ -19,7 +22,7 @@ LOG_CHAT = os.environ.get("LOG_CHAT", "False").strip().lower() == "true" if LOG_CHAT: - setting_log(save_file=False) + setting_log(openai_route_prefix=OPENAI_ROUTE_PREFIX) IP_WHITELIST = env2list("IP_WHITELIST", sep=ENV_VAR_SEP) IP_BLACKLIST = env2list("IP_BLACKLIST", sep=ENV_VAR_SEP) @@ -29,3 +32,44 @@ PROXY = os.environ.get("PROXY", "").strip() PROXY = PROXY if PROXY else None + +GLOBAL_RATE_LIMIT = os.environ.get("GLOBAL_RATE_LIMIT", "fixed-window").strip() or None +RATE_LIMIT_STRATEGY = os.environ.get("RATE_LIMIT_STRATEGY", "").strip() or None +rate_limit_conf = env2dict('RATE_LIMIT') + + +def get_limiter_key(request: Request): + limiter_prefix = f"{request.scope.get('root_path')}{request.scope.get('path')}" + key = f"{limiter_prefix}" # -{get_client_ip(request)}" + return key + + +def dynamic_rate_limit(key: str): + for route in rate_limit_conf: + if key.startswith(route): + return rate_limit_conf[route] + return GLOBAL_RATE_LIMIT + + +TOKEN_RATE_LIMIT = os.environ.get("TOKEN_RATE_LIMIT", "").strip() +if TOKEN_RATE_LIMIT: + rate_limit_item = limits.parse(TOKEN_RATE_LIMIT) + TOKEN_INTERVAL = ( + rate_limit_item.multiples * rate_limit_item.GRANULARITY.seconds + ) / rate_limit_item.amount +else: + TOKEN_INTERVAL = 0 + + +for base_url, route_prefix in zip(OPENAI_BASE_URL, OPENAI_ROUTE_PREFIX): + print_startup_info(base_url, route_prefix, OPENAI_API_KEY, FWD_KEY, LOG_CHAT) +for base_url, route_prefix in zip(EXTRA_BASE_URL, EXTRA_ROUTE_PREFIX): + print_startup_info(base_url, route_prefix, "\\", "\\", LOG_CHAT) + +show_rate_limit_info( + rate_limit_conf, + strategy=RATE_LIMIT_STRATEGY, + global_rate_limit=GLOBAL_RATE_LIMIT if GLOBAL_RATE_LIMIT else 'inf', + token_rate_limit=TOKEN_RATE_LIMIT if TOKEN_RATE_LIMIT else 'inf', + token_interval_time=f"{TOKEN_INTERVAL:.4f}s", +) diff --git a/openai_forward/helper.py b/openai_forward/helper.py index 9c16326..b8e54eb 100644 --- a/openai_forward/helper.py +++ b/openai_forward/helper.py @@ -1,13 +1,25 @@ import ast +import asyncio import inspect import os +import time +from functools import wraps from pathlib import Path from typing import Dict, List, Union import orjson +from fastapi import Request from rich import print +def get_client_ip(request: Request): + if "x-forwarded-for" in request.headers: + return request.headers["x-forwarded-for"] + elif not request.client or not request.client.host: + return "127.0.0.1" + return request.client.host + + def relp(rel_path: Union[str, Path], parents=0, return_str=True, strict=False): currentframe = inspect.currentframe() f = currentframe.f_back @@ -34,6 +46,70 @@ def ls(_dir, *patterns, concat='extend', recursive=False): return path_list +def retry(max_retries=3, delay=1, backoff=2, exceptions=(Exception,)): + """ + Retry decorator. + + Parameters: + - max_retries: Maximum number of retries. + - delay: Initial delay in seconds. + - backoff: Multiplier for delay after each retry. + - exceptions: Exceptions to catch and retry on, as a tuple. + + """ + + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + retries = 0 + current_delay = delay + while retries < max_retries: + try: + return func(*args, **kwargs) + except exceptions as e: + retries += 1 + if retries >= max_retries: + raise + time.sleep(current_delay) + current_delay *= backoff + + return wrapper + + return decorator + + +def async_retry(max_retries=3, delay=1, backoff=2, exceptions=(Exception,)): + """ + Retry decorator for asynchronous functions. + + Parameters: + - max_retries: Maximum number of retries. + - delay: Initial delay in seconds. + - backoff: Multiplier for delay after each retry. + - exceptions: Exceptions to catch and retry on, as a tuple. + + """ + + def decorator(func): + @wraps(func) + async def wrapper(*args, **kwargs): + retries = 0 + current_delay = delay + while retries < max_retries: + try: + return await func(*args, **kwargs) + except exceptions as e: + retries += 1 + if retries >= max_retries: + raise + await asyncio.sleep(current_delay) + current_delay *= backoff + + return wrapper + + return decorator + + def json_load(filepath: str, rel=False, mode="rb"): abs_path = relp(filepath, parents=1) if rel else filepath with open(abs_path, mode=mode) as f: @@ -51,6 +127,13 @@ def json_dump( f.write(orjson.dumps(data, option=orjson_option)) +def toml_load(filepath: str, rel=False): + import toml + + abs_path = relp(filepath, parents=1) if rel else filepath + return toml.load(abs_path) + + def str2list(s: str, sep): if s: return [i.strip() for i in s.split(sep) if i.strip()] @@ -62,6 +145,15 @@ def env2list(env_name: str, sep=","): return str2list(os.environ.get(env_name, "").strip(), sep=sep) +def env2dict(env_name: str) -> Dict: + import json + + env_str = os.environ.get(env_name, "").strip() + if not env_str: + return {} + return json.loads(env_str) + + def format_route_prefix(route_prefix: str): if route_prefix: if route_prefix.endswith("/"): diff --git a/pyproject.toml b/pyproject.toml index 747eec0..b2fe1cc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,6 +28,7 @@ dependencies = [ "rich", "fire", "pytz", + "slowapi==0.1.8", ] dynamic = ["version"] @@ -40,9 +41,15 @@ Source = "https://github.com/beidongjiedeguang/openai-forward" [project.optional-dependencies] test = [ - "openai>=0.27.8", "pytest", - "sparrow-python" + "openai>=0.27.8", + "sparrow-python", +] + +dev = [ + "openai>=0.27.8", + "sparrow-python", + "schedule", ] [project.scripts] diff --git a/pytest.ini b/pytest.ini index 0313166..fc6533c 100644 --- a/pytest.ini +++ b/pytest.ini @@ -5,4 +5,4 @@ markers = timeout: marks test timeout duration repeat: marks that test run n times addopts = --doctest-modules --doctest-glob=README.md --doctest-glob=*.py --ignore=setup.py -norecursedirs = Examples ssl +norecursedirs = Examples ssl scripts diff --git a/render.yaml b/render.yaml index 3aa3b3e..8c326de 100644 --- a/render.yaml +++ b/render.yaml @@ -1,4 +1,6 @@ services: - name: openai-forward type: web - env: docker \ No newline at end of file + env: docker + region: singapore + plan: free \ No newline at end of file diff --git a/scripts/keep_render_alive.py b/scripts/keep_render_alive.py new file mode 100644 index 0000000..96bbc16 --- /dev/null +++ b/scripts/keep_render_alive.py @@ -0,0 +1,24 @@ +import time +from urllib.parse import urljoin + +import httpx +import schedule + + +def job(url: str = "https://render.openai-forward.com"): + health_url = urljoin(url, "/healthz") + try: + r = httpx.get(health_url, timeout=5) + result = r.json() + print(result) + assert result == "OK" + except Exception as e: + print(e) + + +if __name__ == "__main__": + job() + schedule.every(10).minutes.do(job) + while True: + schedule.run_pending() + time.sleep(60)