Skip to content

Commit

Permalink
Simple implementation of ASGI lifespan protocol (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
mliezun authored Jun 5, 2024
1 parent 39d6a38 commit c5fa380
Show file tree
Hide file tree
Showing 10 changed files with 259 additions and 16 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/integration_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ jobs:
- name: Run integration tests
working-directory: tests/${{ matrix.tool-name }}/
run: |
./caddy run --config Caddyfile 2>/dev/null &
./caddy run --config Caddyfile > caddy.log 2>&1 &
sleep 2
source venv/bin/activate
python main_test.py
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ $ curl http://localhost:9080/hello-world
Hello world!
```

> NOTE: It's possible to enable/disable [lifespan events](https://fastapi.tiangolo.com/advanced/events/) by adding the `lifespan on|off` directive to your Caddy configuration. In the above case the lifespan events are disabled because the directive was omitted.
See how to setup [Hot Reloading](#hot-reloading)

## Use docker image
Expand Down
52 changes: 52 additions & 0 deletions caddysnake.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ static PyObject *asyncio_Loop;
static PyObject *asyncio_run_coroutine_threadsafe;
static PyObject *build_receive;
static PyObject *build_send;
static PyObject *build_lifespan;

char *concatenate_strings(const char *str1, const char *str2) {
size_t new_str_len = strlen(str1) + strlen(str2) + 1;
Expand Down Expand Up @@ -418,6 +419,9 @@ static struct PyModuleDef CaddysnakeModule = {
// ASGI 3.0 protocol implementation
struct AsgiApp {
PyObject *handler;
PyObject *state;

PyObject *lifespan_shutdown;
};

AsgiApp *AsgiApp_import(const char *module_name, const char *app_name,
Expand All @@ -426,6 +430,7 @@ AsgiApp *AsgiApp_import(const char *module_name, const char *app_name,
if (app == NULL) {
return NULL;
}
app->lifespan_shutdown = NULL;
PyGILState_STATE gstate = PyGILState_Ensure();

// Add venv_path into sys.path list
Expand All @@ -449,11 +454,51 @@ AsgiApp *AsgiApp_import(const char *module_name, const char *app_name,
PyGILState_Release(gstate);
return NULL;
}
app->state = PyDict_New();

PyGILState_Release(gstate);
return app;
}

uint8_t AsgiApp_lifespan_startup(AsgiApp *app) {
PyGILState_STATE gstate = PyGILState_Ensure();

PyObject *args = PyTuple_New(2);
PyTuple_SetItem(args, 0, app->handler);
PyTuple_SetItem(args, 1, app->state);
PyObject *result = PyObject_Call(build_lifespan, args, NULL);
Py_DECREF(args);

PyObject *lifespan_startup = PyTuple_GetItem(result, 0);
app->lifespan_shutdown = PyTuple_GetItem(result, 1);

result = PyObject_CallNoArgs(lifespan_startup);

uint8_t status = result == Py_True;

Py_DECREF(lifespan_startup);

PyGILState_Release(gstate);

return status;
}

uint8_t AsgiApp_lifespan_shutdown(AsgiApp *app) {
if (app->lifespan_shutdown == NULL) {
return 1;
}

PyGILState_STATE gstate = PyGILState_Ensure();

PyObject *result = PyObject_CallNoArgs(app->lifespan_shutdown);

uint8_t status = result == Py_True;

PyGILState_Release(gstate);

return status;
}

struct AsgiEvent {
PyObject_HEAD AsgiApp *app;
uint64_t request_id;
Expand Down Expand Up @@ -683,6 +728,10 @@ void AsgiApp_handle_request(AsgiApp *app, uint64_t request_id, MapKeyVal *scope,
PyDict_SetItemString(scope_dict, "server", server_tuple);
Py_DECREF(server_tuple);

PyObject *state = PyDict_Copy(app->state);
PyDict_SetItemString(scope_dict, "state", state);
Py_DECREF(state);

AsgiEvent *asgi_event =
(AsgiEvent *)PyObject_CallObject((PyObject *)&AsgiEventType, NULL);
asgi_event->app = app;
Expand Down Expand Up @@ -733,6 +782,8 @@ void AsgiApp_handle_request(AsgiApp *app, uint64_t request_id, MapKeyVal *scope,
void AsgiApp_cleanup(AsgiApp *app) {
PyGILState_STATE gstate = PyGILState_Ensure();
Py_XDECREF(app->handler);
Py_XDECREF(app->state);
Py_XDECREF(app->lifespan_shutdown);
PyGILState_Release(gstate);
free(app);
}
Expand Down Expand Up @@ -805,6 +856,7 @@ void Py_init_and_release_gil(const char *setup_py) {
asyncio_Event_ts = PyTuple_GetItem(asgi_setup_result, 0);
build_receive = PyTuple_GetItem(asgi_setup_result, 1);
build_send = PyTuple_GetItem(asgi_setup_result, 2);
build_lifespan = PyTuple_GetItem(asgi_setup_result, 3);
PyRun_SimpleString("del caddysnake_setup_asgi");
// Setup ASGI version
asgi_version = PyDict_New();
Expand Down
92 changes: 79 additions & 13 deletions caddysnake.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ var caddysnake_py string

// AppServer defines the interface to interacting with a WSGI or ASGI server
type AppServer interface {
Cleanup()
Cleanup() error
HandleRequest(w http.ResponseWriter, r *http.Request) error
}

// CaddySnake module that communicates with a Python app
type CaddySnake struct {
ModuleWsgi string `json:"module_wsgi,omitempty"`
ModuleAsgi string `json:"module_asgi,omitempty"`
Lifespan string `json:"lifespan,omitempty"`
VenvPath string `json:"venv_path,omitempty"`
logger *zap.Logger
app AppServer
Expand All @@ -64,6 +65,10 @@ func (f *CaddySnake) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
if !d.Args(&f.ModuleWsgi) {
return d.Errf("expected exactly one argument for module_wsgi")
}
case "lifespan":
if !d.Args(&f.Lifespan) || (f.Lifespan != "on" && f.Lifespan != "off") {
return d.Errf("expected exactly one argument for lifespan: on|off")
}
case "venv":
if !d.Args(&f.VenvPath) {
return d.Errf("expected exactly one argument for venv")
Expand Down Expand Up @@ -95,15 +100,17 @@ func (f *CaddySnake) Provision(ctx caddy.Context) error {
if err != nil {
return err
}
if f.Lifespan != "" {
f.logger.Warn("lifespan is only used in ASGI mode", zap.String("lifespan", f.Lifespan))
}
f.logger.Info("imported wsgi app", zap.String("module_wsgi", f.ModuleWsgi), zap.String("venv_path", f.VenvPath))
f.app = w
} else if f.ModuleAsgi != "" {
a, err := NewAsgi(f.ModuleAsgi, f.VenvPath)
var err error
f.app, err = NewAsgi(f.ModuleAsgi, f.VenvPath, f.Lifespan == "on")
if err != nil {
return err
}
f.logger.Info("imported asgi app", zap.String("module_asgi", f.ModuleAsgi), zap.String("venv_path", f.VenvPath))
f.app = a
}
return nil
}
Expand All @@ -117,7 +124,7 @@ func (m *CaddySnake) Validate() error {
func (m *CaddySnake) Cleanup() error {
if m.app != nil {
m.logger.Info("cleaning up module")
m.app.Cleanup()
return m.app.Cleanup()
}
return nil
}
Expand Down Expand Up @@ -216,11 +223,21 @@ func findPythonDirectory(libPath string) (string, error) {

// Wsgi stores a reference to a Python Wsgi application
type Wsgi struct {
app *C.WsgiApp
app *C.WsgiApp
wsgi_pattern string
}

var wsgiapp_cache map[string]*Wsgi = map[string]*Wsgi{}

// NewWsgi imports a WSGI app
func NewWsgi(wsgi_pattern string, venv_path string) (*Wsgi, error) {
wsgi_lock.Lock()
defer wsgi_lock.Unlock()

if app, ok := wsgiapp_cache[wsgi_pattern]; ok {
return app, nil
}

module_app := strings.Split(wsgi_pattern, ":")
if len(module_app) != 2 {
return nil, errors.New("expected pattern $(MODULE_NAME):$(VARIABLE_NAME)")
Expand All @@ -246,16 +263,28 @@ func NewWsgi(wsgi_pattern string, venv_path string) (*Wsgi, error) {
if app == nil {
return nil, errors.New("failed to import module")
}
return &Wsgi{app}, nil

result := &Wsgi{app, wsgi_pattern}
wsgiapp_cache[wsgi_pattern] = result
return result, nil
}

// Cleanup deallocates CGO resources used by Wsgi app
func (m *Wsgi) Cleanup() {
func (m *Wsgi) Cleanup() error {
if m.app != nil {
wsgi_lock.Lock()
if _, ok := wsgiapp_cache[m.wsgi_pattern]; !ok {
wsgi_lock.Unlock()
return nil
}
delete(wsgiapp_cache, m.wsgi_pattern)
wsgi_lock.Unlock()

runtime.LockOSThread()
defer runtime.UnlockOSThread()
C.WsgiApp_cleanup(m.app)
}
return nil
}

// from golang cgi
Expand Down Expand Up @@ -414,12 +443,22 @@ func wsgi_write_response(request_id C.int64_t, status_code C.int, headers *C.Map

// Asgi stores a reference to a Python Asgi application
type Asgi struct {
app *C.AsgiApp
app *C.AsgiApp
asgi_pattern string
}

var asgiapp_cache map[string]*Asgi = map[string]*Asgi{}

// NewAsgi imports a Python ASGI app
func NewAsgi(wsgi_pattern string, venv_path string) (*Asgi, error) {
module_app := strings.Split(wsgi_pattern, ":")
func NewAsgi(asgi_pattern string, venv_path string, lifespan bool) (*Asgi, error) {
asgi_lock.Lock()
defer asgi_lock.Unlock()

if app, ok := asgiapp_cache[asgi_pattern]; ok {
return app, nil
}

module_app := strings.Split(asgi_pattern, ":")
if len(module_app) != 2 {
return nil, errors.New("expected pattern $(MODULE_NAME):$(VARIABLE_NAME)")
}
Expand All @@ -444,16 +483,43 @@ func NewAsgi(wsgi_pattern string, venv_path string) (*Asgi, error) {
if app == nil {
return nil, errors.New("failed to import module")
}
return &Asgi{app}, nil

var err error

if lifespan {
status := C.AsgiApp_lifespan_startup(app)
if uint8(status) == 0 {
err = errors.New("startup failed")
}
}

result := &Asgi{app, asgi_pattern}
asgiapp_cache[asgi_pattern] = result
return result, err
}

// Cleanup deallocates CGO resources used by Asgi app
func (m *Asgi) Cleanup() {
func (m *Asgi) Cleanup() (err error) {
if m.app != nil {
asgi_lock.Lock()
if _, ok := asgiapp_cache[m.asgi_pattern]; !ok {
asgi_lock.Unlock()
return
}
delete(asgiapp_cache, m.asgi_pattern)
asgi_lock.Unlock()

runtime.LockOSThread()
defer runtime.UnlockOSThread()

status := C.AsgiApp_lifespan_shutdown(m.app)
if uint8(status) == 0 {
err = errors.New("shutdown failure")
}

C.AsgiApp_cleanup(m.app)
}
return
}

// AsgiRequestHandler stores pointers to the request and the response writer
Expand Down
2 changes: 2 additions & 0 deletions caddysnake.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ extern void wsgi_write_response(int64_t, int, MapKeyVal *, char *);
typedef struct AsgiApp AsgiApp;
typedef struct AsgiEvent AsgiEvent;
AsgiApp *AsgiApp_import(const char *, const char *, const char *);
uint8_t AsgiApp_lifespan_startup(AsgiApp *);
uint8_t AsgiApp_lifespan_shutdown(AsgiApp *);
void AsgiApp_handle_request(AsgiApp *, uint64_t, MapKeyVal *, MapKeyVal *,
const char *, int, const char *, int);
void AsgiEvent_set(AsgiEvent *, const char *);
Expand Down
71 changes: 70 additions & 1 deletion caddysnake.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,75 @@ async def send(data):

return send

def build_lifespan(app, state):
import sys
import warnings

scope = {
"type": "lifespan",
"asgi": {
"version": "3.0",
"spec_version": "2.3",
},
"state": state,
}

startup_ok = asyncio.Future(loop=loop)
shutdown_ok = asyncio.Future(loop=loop)

async def send(data):
if data.get("message") and data["type"].endswith("failed"):
print(data["message"], file=sys.stderr)

ok = data["type"].endswith(".complete")
if "startup" in data["type"]:
startup_ok.set_result(ok)
if "shutdown" in data["type"]:
shutdown_ok.set_result(ok)

if sys.version_info[1] < 10:
# Ignore loop arg deprecation warning
with warnings.catch_warnings():
warnings.simplefilter("ignore")
receive_queue = asyncio.Queue(loop=loop)
else:
# Loop is not needed on Python 3.10 and onwards
receive_queue = asyncio.Queue()

async def receive():
return await receive_queue.get()

def wrap_future(future):
async def wrapper():
return await future

return wrapper()

def lifespan_startup():
loop.call_soon_threadsafe(
receive_queue.put_nowait, {"type": "lifespan.startup"}
)
coro = wrap_future(startup_ok)
fut = asyncio.run_coroutine_threadsafe(coro, loop=loop)
return fut.result()

def lifespan_shutdown():
loop.call_soon_threadsafe(
receive_queue.put_nowait, {"type": "lifespan.shutdown"}
)
coro = wrap_future(shutdown_ok)
fut = asyncio.run_coroutine_threadsafe(coro, loop=loop)
return fut.result()

def run_lifespan():
coro = app(scope, receive, send)
fut = asyncio.run_coroutine_threadsafe(coro, loop)
fut.result()

Thread(target=run_lifespan).start()

return lifespan_startup, lifespan_shutdown

Thread(target=loop.run_forever).start()

return Event_ts, build_receive, build_send
return Event_ts, build_receive, build_send, build_lifespan
Loading

0 comments on commit c5fa380

Please sign in to comment.