diff --git a/internal/abstractions/endpoint/endpoint.go b/internal/abstractions/endpoint/endpoint.go index 78e3c31d0..b6b0e62fa 100644 --- a/internal/abstractions/endpoint/endpoint.go +++ b/internal/abstractions/endpoint/endpoint.go @@ -46,10 +46,10 @@ type HttpEndpointService struct { var ( endpointContainerPrefix string = "endpoint" endpointRoutePrefix string = "/endpoint" - endpointRingBufferSize int = 10000000 endpointRequestTimeoutS int = 180 endpointServeContainerTimeout time.Duration = 120 * time.Second endpointRequestHeartbeatInterval time.Duration = 30 * time.Second + endpointMinRequestBufferSize int = 10 ) type EndpointServiceOpts struct { @@ -99,7 +99,7 @@ func NewEndpointService( go es.handleContainerEvents() - es.taskDispatcher.Register(string(types.ExecutorTaskQueue), es.endpointTaskFactory) + es.taskDispatcher.Register(string(types.ExecutorEndpoint), es.endpointTaskFactory) // Register HTTP routes authMiddleware := auth.AuthMiddleware(es.backendRepo) @@ -255,7 +255,7 @@ func (es *HttpEndpointService) forwardRequest( instance, _ = es.endpointInstances.Get(stubId) } - task, err := es.taskDispatcher.Send(ctx.Request().Context(), string(types.ExecutorTaskQueue), instance.workspace.Name, stubId, payload, types.TaskPolicy{ + task, err := es.taskDispatcher.Send(ctx.Request().Context(), string(types.ExecutorEndpoint), instance.workspace.Name, stubId, payload, types.TaskPolicy{ MaxRetries: 0, Timeout: endpointRequestTimeoutS, Expires: time.Now().Add(time.Duration(endpointRequestTimeoutS) * time.Second), @@ -292,6 +292,11 @@ func (es *HttpEndpointService) createEndpointInstance(stubId string, options ... ctx, cancelFunc := context.WithCancel(es.ctx) lock := common.NewRedisLock(es.rdb) + requestBufferSize := int(stubConfig.MaxPendingTasks) + if requestBufferSize < endpointMinRequestBufferSize { + requestBufferSize = endpointMinRequestBufferSize + } + // Create endpoint instance & override any default options instance := &endpointInstance{ ctx: ctx, @@ -310,7 +315,7 @@ func (es *HttpEndpointService) createEndpointInstance(stubId string, options ... containers: make(map[string]bool), scaleEventChan: make(chan int, 1), rdb: es.rdb, - buffer: NewRequestBuffer(ctx, es.rdb, &stub.Workspace, stubId, endpointRingBufferSize, es.containerRepo, stubConfig), + buffer: NewRequestBuffer(ctx, es.rdb, &stub.Workspace, stubId, requestBufferSize, es.containerRepo, stubConfig), } for _, o := range options { o(instance)