Skip to content

Commit

Permalink
Use max pending tasks for ring buffer size (#137)
Browse files Browse the repository at this point in the history
Co-authored-by: Luke Lombardi <luke@beam.cloud>
  • Loading branch information
luke-lombardi and Luke Lombardi authored Apr 16, 2024
1 parent f7759b6 commit 64249ae
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions internal/abstractions/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ type HttpEndpointService struct {
var (
endpointContainerPrefix string = "endpoint"
endpointRoutePrefix string = "/endpoint"
endpointRingBufferSize int = 10000000
endpointRequestTimeoutS int = 180
endpointServeContainerTimeout time.Duration = 120 * time.Second
endpointRequestHeartbeatInterval time.Duration = 30 * time.Second
endpointMinRequestBufferSize int = 10
)

type EndpointServiceOpts struct {
Expand Down Expand Up @@ -99,7 +99,7 @@ func NewEndpointService(

go es.handleContainerEvents()

es.taskDispatcher.Register(string(types.ExecutorTaskQueue), es.endpointTaskFactory)
es.taskDispatcher.Register(string(types.ExecutorEndpoint), es.endpointTaskFactory)

// Register HTTP routes
authMiddleware := auth.AuthMiddleware(es.backendRepo)
Expand Down Expand Up @@ -255,7 +255,7 @@ func (es *HttpEndpointService) forwardRequest(
instance, _ = es.endpointInstances.Get(stubId)
}

task, err := es.taskDispatcher.Send(ctx.Request().Context(), string(types.ExecutorTaskQueue), instance.workspace.Name, stubId, payload, types.TaskPolicy{
task, err := es.taskDispatcher.Send(ctx.Request().Context(), string(types.ExecutorEndpoint), instance.workspace.Name, stubId, payload, types.TaskPolicy{
MaxRetries: 0,
Timeout: endpointRequestTimeoutS,
Expires: time.Now().Add(time.Duration(endpointRequestTimeoutS) * time.Second),
Expand Down Expand Up @@ -292,6 +292,11 @@ func (es *HttpEndpointService) createEndpointInstance(stubId string, options ...
ctx, cancelFunc := context.WithCancel(es.ctx)
lock := common.NewRedisLock(es.rdb)

requestBufferSize := int(stubConfig.MaxPendingTasks)
if requestBufferSize < endpointMinRequestBufferSize {
requestBufferSize = endpointMinRequestBufferSize
}

// Create endpoint instance & override any default options
instance := &endpointInstance{
ctx: ctx,
Expand All @@ -310,7 +315,7 @@ func (es *HttpEndpointService) createEndpointInstance(stubId string, options ...
containers: make(map[string]bool),
scaleEventChan: make(chan int, 1),
rdb: es.rdb,
buffer: NewRequestBuffer(ctx, es.rdb, &stub.Workspace, stubId, endpointRingBufferSize, es.containerRepo, stubConfig),
buffer: NewRequestBuffer(ctx, es.rdb, &stub.Workspace, stubId, requestBufferSize, es.containerRepo, stubConfig),
}
for _, o := range options {
o(instance)
Expand Down

0 comments on commit 64249ae

Please sign in to comment.