Skip to content

Commit

Permalink
Stop endpoint serves and improve concurrency (#150)
Browse files Browse the repository at this point in the history
- Propagate timeout to endpoints
- Fix endpoint concurrency to be truly multi-process
- Change name of EndpointServe to StartEndpointServe

---------

Co-authored-by: Luke Lombardi <luke@beam.cloud>
  • Loading branch information
luke-lombardi and Luke Lombardi authored Apr 19, 2024
1 parent 52b0f93 commit 233a98d
Show file tree
Hide file tree
Showing 12 changed files with 544 additions and 151 deletions.
10 changes: 9 additions & 1 deletion internal/abstractions/endpoint/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,15 @@ func endpointDeploymentScaleFunc(i *endpointInstance, sample *endpointAutoscaler
func endpointServeScaleFunc(i *endpointInstance, sample *endpointAutoscalerSample) *abstractions.AutoscalerResult {
desiredContainers := 1

if sample.TotalRequests == 0 {
timeoutKey := Keys.endpointServeLock(i.workspace.Name, i.stub.ExternalId)
exists, err := i.rdb.Exists(i.ctx, timeoutKey).Result()
if err != nil {
return &abstractions.AutoscalerResult{
ResultValid: false,
}
}

if sample.TotalRequests == 0 && exists == 0 {
desiredContainers = 0
}

Expand Down
16 changes: 8 additions & 8 deletions internal/abstractions/endpoint/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ type request struct {
}

type container struct {
id string
address string
inFlight int
id string
address string
inFlightRequests int
}

type RequestBuffer struct {
Expand Down Expand Up @@ -168,16 +168,16 @@ func (rb *RequestBuffer) discoverContainers() {
return
}

inFlight, err := rb.requestsInFlight(cs.ContainerId)
inFlightRequests, err := rb.requestsInFlight(cs.ContainerId)
if err != nil {
return
}

if rb.checkAddressIsReady(containerAddress) {
availableContainersChan <- container{
id: cs.ContainerId,
address: containerAddress,
inFlight: inFlight,
id: cs.ContainerId,
address: containerAddress,
inFlightRequests: inFlightRequests,
}
return
}
Expand All @@ -195,7 +195,7 @@ func (rb *RequestBuffer) discoverContainers() {

// Sort availableContainers by # of in-flight requests (ascending)
sort.Slice(availableContainers, func(i, j int) bool {
return availableContainers[i].inFlight < availableContainers[j].inFlight
return availableContainers[i].inFlightRequests < availableContainers[j].inFlightRequests
})

rb.availableContainersLock.Lock()
Expand Down
106 changes: 91 additions & 15 deletions internal/abstractions/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

type EndpointService interface {
pb.EndpointServiceServer
EndpointServe(in *pb.EndpointServeRequest, stream pb.EndpointService_EndpointServeServer) error
StartEndpointServe(in *pb.StartEndpointServeRequest, stream pb.EndpointService_StartEndpointServeServer) error
}

type HttpEndpointService struct {
Expand All @@ -44,12 +44,13 @@ type HttpEndpointService struct {
}

var (
endpointContainerPrefix string = "endpoint"
endpointRoutePrefix string = "/endpoint"
endpointRequestTimeoutS int = 180
endpointServeContainerTimeout time.Duration = 120 * time.Second
endpointRequestHeartbeatInterval time.Duration = 30 * time.Second
endpointMinRequestBufferSize int = 10
endpointContainerPrefix string = "endpoint"
endpointRoutePrefix string = "/endpoint"
endpointRequestTimeoutS int = 180
endpointServeContainerTimeout time.Duration = 600 * time.Second
endpointServeContainerKeepaliveInterval time.Duration = 30 * time.Second
endpointRequestHeartbeatInterval time.Duration = 30 * time.Second
endpointMinRequestBufferSize int = 10
)

type EndpointServiceOpts struct {
Expand Down Expand Up @@ -116,7 +117,7 @@ func (es *HttpEndpointService) endpointTaskFactory(ctx context.Context, msg type
}, nil
}

func (es *HttpEndpointService) EndpointServe(in *pb.EndpointServeRequest, stream pb.EndpointService_EndpointServeServer) error {
func (es *HttpEndpointService) StartEndpointServe(in *pb.StartEndpointServeRequest, stream pb.EndpointService_StartEndpointServeServer) error {
ctx := stream.Context()
authInfo, _ := auth.AuthInfoFromContext(ctx)

Expand All @@ -132,32 +133,56 @@ func (es *HttpEndpointService) EndpointServe(in *pb.EndpointServeRequest, stream
return err
}

// Set lock (used by autoscaler to scale up the single serve container)
instance, _ := es.endpointInstances.Get(in.StubId)
err = instance.startContainers(1)
if err != nil {
return err
}
instance.rdb.SetEx(
context.Background(),
Keys.endpointServeLock(instance.workspace.Name, instance.stub.ExternalId),
1,
endpointServeContainerTimeout,
)

container, err := es.waitForContainer(ctx, in.StubId)
if err != nil {
return err
}

sendCallback := func(o common.OutputMsg) error {
if err := stream.Send(&pb.EndpointServeResponse{Output: o.Msg, Done: o.Done}); err != nil {
if err := stream.Send(&pb.StartEndpointServeResponse{Output: o.Msg, Done: o.Done}); err != nil {
return err
}

return nil
}

exitCallback := func(exitCode int32) error {
if err := stream.Send(&pb.EndpointServeResponse{Done: true, ExitCode: int32(exitCode)}); err != nil {
if err := stream.Send(&pb.StartEndpointServeResponse{Done: true, ExitCode: int32(exitCode)}); err != nil {
return err
}
return nil
}

// Keep serve container active for as long as user has their terminal open
// We can handle timeouts on the client side
go func() {
ticker := time.NewTicker(endpointServeContainerKeepaliveInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
instance.rdb.SetEx(
context.Background(),
Keys.endpointServeLock(instance.workspace.Name, instance.stub.ExternalId),
1,
endpointServeContainerTimeout,
)
}
}
}()

logStream, err := abstractions.NewLogStream(abstractions.LogStreamOpts{
SendCallback: sendCallback,
ExitCallback: exitCallback,
Expand All @@ -173,6 +198,52 @@ func (es *HttpEndpointService) EndpointServe(in *pb.EndpointServeRequest, stream
return logStream.Stream(ctx, authInfo, container.ContainerId)
}

// StopEndpointServe ends the serve session for the stub identified by in.StubId.
//
// It looks up the endpoint instance for the stub (creating it with the serve
// entry point and serve autoscaler when it is not registered yet), deletes the
// serve lock key, and then deletes the keep-warm lock of every active container
// of the stub that is neither stopping nor pending.
//
// Returns Ok=false with a nil error when the instance cannot be created or
// found. Returns a nil response and a non-nil error when the active containers
// cannot be listed or when a lock key cannot be deleted.
func (es *HttpEndpointService) StopEndpointServe(ctx context.Context, in *pb.StopEndpointServeRequest) (*pb.StopEndpointServeResponse, error) {
	instance, exists := es.endpointInstances.Get(in.StubId)
	if !exists {
		err := es.createEndpointInstance(in.StubId,
			withEntryPoint(func(instance *endpointInstance) []string {
				return []string{instance.stubConfig.PythonVersion, "-m", "beta9.runner.serve"}
			}),
			withAutoscaler(func(instance *endpointInstance) *abstractions.AutoScaler[*endpointInstance, *endpointAutoscalerSample] {
				return abstractions.NewAutoscaler(instance, endpointSampleFunc, endpointServeScaleFunc)
			}),
		)
		if err != nil {
			return &pb.StopEndpointServeResponse{Ok: false}, nil
		}

		// Re-read the instance and verify it is really there before using it;
		// dereferencing a missing instance below would panic.
		instance, exists = es.endpointInstances.Get(in.StubId)
		if !exists {
			return &pb.StopEndpointServeResponse{Ok: false}, nil
		}
	}

	// Delete serve timeout lock. If this fails the lock is still set, so the
	// failure is reported instead of claiming the serve was stopped.
	serveLockKey := Keys.endpointServeLock(instance.workspace.Name, instance.stub.ExternalId)
	if err := instance.rdb.Del(context.Background(), serveLockKey).Err(); err != nil {
		return nil, err
	}

	// Delete all keep warms
	// With serves, there should only ever be one container running, but this is the easiest way to find that container
	containers, err := instance.containerRepo.GetActiveContainersByStubId(instance.stub.ExternalId)
	if err != nil {
		return nil, err
	}

	// Keep going after a failed delete so the remaining locks are still
	// removed; the first failure is reported once the loop is done.
	var firstErr error
	for _, container := range containers {
		if container.Status == types.ContainerStatusStopping || container.Status == types.ContainerStatusPending {
			continue
		}

		keepWarmKey := Keys.endpointKeepWarmLock(instance.workspace.Name, instance.stub.ExternalId, container.ContainerId)
		if err := instance.rdb.Del(context.Background(), keepWarmKey).Err(); err != nil && firstErr == nil {
			firstErr = err
		}
	}

	if firstErr != nil {
		return nil, firstErr
	}

	return &pb.StopEndpointServeResponse{Ok: true}, nil
}

func (es *HttpEndpointService) waitForContainer(ctx context.Context, stubId string) (*types.ContainerState, error) {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
Expand Down Expand Up @@ -258,7 +329,7 @@ func (es *HttpEndpointService) forwardRequest(

task, err := es.taskDispatcher.Send(ctx.Request().Context(), string(types.ExecutorEndpoint), instance.workspace.Name, stubId, payload, types.TaskPolicy{
MaxRetries: 0,
Timeout: endpointRequestTimeoutS,
Timeout: instance.stubConfig.TaskPolicy.Timeout,
Expires: time.Now().Add(time.Duration(endpointRequestTimeoutS) * time.Second),
})
if err != nil {
Expand Down Expand Up @@ -356,6 +427,7 @@ var (
endpointInstanceLock string = "endpoint:%s:%s:instance_lock"
endpointRequestsInFlight string = "endpoint:%s:%s:requests_in_flight:%s"
endpointRequestHeartbeat string = "endpoint:%s:%s:request_heartbeat:%s"
endpointServeLock string = "endpoint:%s:%s:serve_lock"
)

func (k *keys) endpointKeepWarmLock(workspaceName, stubId, containerId string) string {
Expand All @@ -373,3 +445,7 @@ func (k *keys) endpointRequestsInFlight(workspaceName, stubId, containerId strin
// endpointRequestHeartbeat formats the request-heartbeat key for the given
// workspace name, stub id and task id using the endpointRequestHeartbeat template.
func (k *keys) endpointRequestHeartbeat(workspaceName, stubId, taskId string) string {
	key := fmt.Sprintf(endpointRequestHeartbeat, workspaceName, stubId, taskId)
	return key
}

// endpointServeLock formats the serve-lock key for the given workspace name
// and stub id using the endpointServeLock template.
func (k *keys) endpointServeLock(workspaceName, stubId string) string {
	key := fmt.Sprintf(endpointServeLock, workspaceName, stubId)
	return key
}
13 changes: 9 additions & 4 deletions internal/abstractions/endpoint/endpoint.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@ option go_package = "github.com/beam-cloud/beta9/proto";
package endpoint;

service EndpointService {
rpc EndpointServe(EndpointServeRequest)
returns (stream EndpointServeResponse) {}
rpc StartEndpointServe(StartEndpointServeRequest)
returns (stream StartEndpointServeResponse) {}
rpc StopEndpointServe(StopEndpointServeRequest)
returns (StopEndpointServeResponse) {}
}

message EndpointServeRequest { string stub_id = 1; }
// Request for StartEndpointServe; stub_id selects the stub to serve.
message StartEndpointServeRequest {
  string stub_id = 1;
}

message EndpointServeResponse {
// One message of the StartEndpointServe response stream.
message StartEndpointServeResponse {
// Output text forwarded from the serve log stream.
string output = 1;
// Set when the stream has finished; always true on the exit message.
bool done = 2;
// Exit code reported by the exit callback.
int32 exit_code = 3;
}

// Request for StopEndpointServe; stub_id selects the stub whose serve is stopped.
message StopEndpointServeRequest {
  string stub_id = 1;
}

// Response for StopEndpointServe; ok reports whether the stop succeeded.
message StopEndpointServeResponse {
  bool ok = 1;
}
1 change: 1 addition & 0 deletions internal/abstractions/endpoint/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func (i *endpointInstance) startContainers(containersToRun int) error {
fmt.Sprintf("CONCURRENCY=%d", i.stubConfig.Concurrency),
fmt.Sprintf("KEEP_WARM_SECONDS=%d", i.stubConfig.KeepWarmSeconds),
fmt.Sprintf("PYTHON_VERSION=%s", i.stubConfig.PythonVersion),
fmt.Sprintf("TIMEOUT=%d", i.stubConfig.TaskPolicy.Timeout),
},
Cpu: i.stubConfig.Runtime.Cpu,
Memory: i.stubConfig.Runtime.Memory,
Expand Down
2 changes: 1 addition & 1 deletion internal/abstractions/image/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (b *Builder) Build(ctx context.Context, opts *BuildOpts, outputChan chan co
continue
}

if r, err := client.Exec(containerId, cmd); !r.Ok || err != nil {
if r, err := client.Exec(containerId, cmd); err != nil || !r.Ok {
log.Printf("failed to execute command for container <%v>: \"%v\" - %v\n", containerId, cmd, err)

errMsg := ""
Expand Down
Loading

0 comments on commit 233a98d

Please sign in to comment.