From b36b7422ebfa9ccf3a83ce38af08f64a5fac7524 Mon Sep 17 00:00:00 2001 From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com> Date: Sat, 21 Dec 2024 17:11:27 -0500 Subject: [PATCH] Fix: fix memory leak in endpoint buffer (#808) NOTE: Still need to reproduce this locally before merging --- Makefile | 5 +++- {build_tests => e2e/build_tests}/.beta9ignore | 0 {build_tests => e2e/build_tests}/app.py | 0 e2e/load_tests/throughput.js | 30 +++++++++++++++++++ pkg/abstractions/endpoint/buffer.go | 20 ++++++++----- pkg/abstractions/endpoint/endpoint.go | 4 +-- pkg/abstractions/endpoint/task.go | 4 ++- 7 files changed, 51 insertions(+), 12 deletions(-) rename {build_tests => e2e/build_tests}/.beta9ignore (100%) rename {build_tests => e2e/build_tests}/app.py (100%) create mode 100644 e2e/load_tests/throughput.js diff --git a/Makefile b/Makefile index 9ff953b46..466c45348 100644 --- a/Makefile +++ b/Makefile @@ -72,4 +72,7 @@ build-test: poetry config virtualenvs.in-project true poetry install -C sdk poetry shell -C sdk - cd build_tests && python app.py $(MODE) + cd e2e/build_tests && python app.py $(MODE) + +load-test: + cd e2e/load_tests && k6 run --env URL=$(URL) --env TOKEN=$(TOKEN) throughput.js diff --git a/build_tests/.beta9ignore b/e2e/build_tests/.beta9ignore similarity index 100% rename from build_tests/.beta9ignore rename to e2e/build_tests/.beta9ignore diff --git a/build_tests/app.py b/e2e/build_tests/app.py similarity index 100% rename from build_tests/app.py rename to e2e/build_tests/app.py diff --git a/e2e/load_tests/throughput.js b/e2e/load_tests/throughput.js new file mode 100644 index 000000000..887edc65a --- /dev/null +++ b/e2e/load_tests/throughput.js @@ -0,0 +1,30 @@ +import http from "k6/http"; +import { check, sleep } from "k6"; + +const url = __ENV.URL; +const headers = { + Connection: "keep-alive", + "Content-Type": "application/json", + Authorization: `Bearer ${__ENV.TOKEN || "default_token"}`, +}; + +const payload = JSON.stringify({ + data: "x".repeat(1024 * 1024 * 2), // 2MB payload +}); + +export let options = { + stages: [ + { duration: "30s", target: 100 }, // Ramp-up to 100 VUs in 30 seconds + { duration: "1m", target: 100 }, // Stay at 100 VUs for 1 minute + { duration: "30s", target: 0 }, // Ramp-down to 0 VUs in 30 seconds + ], +}; + +export default function () { + const res = http.post(url, payload, { headers }); + check(res, { + "status is 200": (r) => r.status === 200, + }); + + sleep(0.01); // Adjust this to control request rate +} diff --git a/pkg/abstractions/endpoint/buffer.go b/pkg/abstractions/endpoint/buffer.go index b450c8f51..1f24ee6d1 100644 --- a/pkg/abstractions/endpoint/buffer.go +++ b/pkg/abstractions/endpoint/buffer.go @@ -131,16 +131,13 @@ func (rb *RequestBuffer) handleHeartbeatEvents() { } } -func (rb *RequestBuffer) ForwardRequest(ctx echo.Context, task *EndpointTask) error { +func (rb *RequestBuffer) ForwardRequest(ctx echo.Context, task *EndpointTask, payload *types.TaskPayload) error { done := make(chan bool) req := &request{ - ctx: ctx, - done: done, - payload: &types.TaskPayload{ - Args: task.msg.Args, - Kwargs: task.msg.Kwargs, - }, - task: task, + ctx: ctx, + done: done, + payload: payload, + task: task, } rb.buffer.Push(req, false) @@ -176,6 +173,12 @@ func (rb *RequestBuffer) processRequests() { continue } + if req.ctx.Request().Context().Err() != nil { + rb.cancelInFlightTask(req.task) + req.payload = nil + continue + } + go rb.handleRequest(req) } } @@ -534,6 +537,7 @@ func (rb *RequestBuffer) heartBeat(req *request, containerId string) { func (rb *RequestBuffer) afterRequest(req *request, containerId string) { defer func() { req.done <- true + req.payload = nil }() defer rb.releaseRequestToken(containerId, req.task.msg.TaskId) diff --git a/pkg/abstractions/endpoint/endpoint.go b/pkg/abstractions/endpoint/endpoint.go index 818b0c12e..185e2f22f 100644 --- a/pkg/abstractions/endpoint/endpoint.go +++ b/pkg/abstractions/endpoint/endpoint.go @@ -200,7 +200,7 @@ func (es *HttpEndpointService) forwardRequest( ttl = DefaultEndpointRequestTTL } - task, err := es.taskDispatcher.Send(ctx.Request().Context(), string(types.ExecutorEndpoint), authInfo, stubId, payload, types.TaskPolicy{ + task, err := es.taskDispatcher.Send(ctx.Request().Context(), string(types.ExecutorEndpoint), authInfo, stubId, &types.TaskPayload{}, types.TaskPolicy{ MaxRetries: 0, Timeout: instance.StubConfig.TaskPolicy.Timeout, Expires: time.Now().Add(time.Duration(ttl) * time.Second), @@ -209,7 +209,7 @@ func (es *HttpEndpointService) forwardRequest( return err } - return task.Execute(ctx.Request().Context(), ctx) + return task.Execute(ctx.Request().Context(), ctx, payload) } func (es *HttpEndpointService) InstanceFactory(stubId string, options ...func(abstractions.IAutoscaledInstance)) (abstractions.IAutoscaledInstance, error) { diff --git a/pkg/abstractions/endpoint/task.go b/pkg/abstractions/endpoint/task.go index 563f35cb3..149bd5256 100644 --- a/pkg/abstractions/endpoint/task.go +++ b/pkg/abstractions/endpoint/task.go @@ -16,6 +16,8 @@ type EndpointTask struct { func (t *EndpointTask) Execute(ctx context.Context, options ...interface{}) error { var err error = nil echoCtx := options[0].(echo.Context) + payload := options[1].(*types.TaskPayload) + instance, err := t.es.getOrCreateEndpointInstance(ctx, t.msg.StubId) if err != nil { return err @@ -30,7 +32,7 @@ func (t *EndpointTask) Execute(ctx context.Context, options ...interface{}) erro return err } - return instance.buffer.ForwardRequest(echoCtx, t) + return instance.buffer.ForwardRequest(echoCtx, t, payload) } func (t *EndpointTask) Retry(ctx context.Context) error {