Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Add a TTL on build containers for cleanup #819

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion pkg/abstractions/image/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Builder struct {
registry *common.ImageRegistry
containerRepo repository.ContainerRepository
tailscale *network.Tailscale
rdb *common.RedisClient
}

type BuildStep struct {
Expand Down Expand Up @@ -130,13 +131,14 @@ func (o *BuildOpts) addPythonRequirements() {
o.PythonPackages = append(filteredPythonPackages, baseRequirementsSlice...)
}

func NewBuilder(config types.AppConfig, registry *common.ImageRegistry, scheduler *scheduler.Scheduler, tailscale *network.Tailscale, containerRepo repository.ContainerRepository) (*Builder, error) {
// NewBuilder wires a Builder together from its collaborators. rdb is the Redis
// client the builder uses to write and refresh build-container TTL keys. The
// returned error is always nil.
func NewBuilder(config types.AppConfig, registry *common.ImageRegistry, scheduler *scheduler.Scheduler, tailscale *network.Tailscale, containerRepo repository.ContainerRepository, rdb *common.RedisClient) (*Builder, error) {
	builder := new(Builder)

	builder.config = config
	builder.registry = registry
	builder.scheduler = scheduler
	builder.tailscale = tailscale
	builder.containerRepo = containerRepo
	builder.rdb = rdb

	return builder, nil
}

Expand Down Expand Up @@ -285,6 +287,14 @@ func (b *Builder) Build(ctx context.Context, opts *BuildOpts, outputChan chan co
return err
}

err = b.rdb.Set(ctx, Keys.imageBuildContainerTTL(containerId), "1", time.Duration(imageContainerTtlS)*time.Second).Err()
if err != nil {
outputChan <- common.OutputMsg{Done: true, Success: false, Msg: "Failed to connect to build container.\n"}
return err
}

go b.keepAlive(ctx, containerId, ctx.Done())

conn, err := network.ConnectToHost(ctx, hostname, time.Second*30, b.tailscale, b.config.Tailscale)
if err != nil {
outputChan <- common.OutputMsg{Done: true, Success: false, Msg: "Failed to connect to build container.\n"}
Expand Down Expand Up @@ -439,6 +449,22 @@ func (b *Builder) Exists(ctx context.Context, imageId string) bool {
return b.registry.Exists(ctx, imageId)
}

// keepAlive rewrites the TTL key of the given build container every
// buildContainerKeepAliveIntervalS seconds, each time with an expiry of
// imageContainerTtlS seconds. It returns as soon as ctx is cancelled or done
// is closed; after that the key is no longer refreshed and is left to expire.
func (b *Builder) keepAlive(ctx context.Context, containerId string, done <-chan struct{}) {
	// Both values are loop-invariant, so compute them once up front.
	ttlKey := Keys.imageBuildContainerTTL(containerId)
	ttl := time.Duration(imageContainerTtlS) * time.Second

	refresh := time.NewTicker(time.Duration(buildContainerKeepAliveIntervalS) * time.Second)
	defer refresh.Stop()

	for {
		select {
		case <-refresh.C:
			// Best effort: a failed refresh is not reported; the next tick
			// simply tries again.
			_ = b.rdb.Set(ctx, ttlKey, "1", ttl).Err()
		case <-done:
			return
		case <-ctx.Done():
			return
		}
	}
}

var imageNamePattern = regexp.MustCompile(
`^` + // Assert position at the start of the string
`(?:(?P<Registry>(?:(?:localhost|[\w.-]+(?:\.[\w.-]+)+)(?::\d+)?)|[\w]+:\d+)\/)?` + // Optional registry, which can be localhost, a domain with optional port, or a simple registry with port
Expand Down
81 changes: 72 additions & 9 deletions pkg/abstractions/image/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package image
import (
"context"
"fmt"
"strings"

"github.com/beam-cloud/beta9/pkg/auth"
"github.com/beam-cloud/beta9/pkg/common"
Expand All @@ -23,9 +24,12 @@ type ImageService interface {

// RuncImageService embeds pb.UnimplementedImageServiceServer and holds the
// dependencies used by the image service: the image builder, the application
// config, the backend repository, and the Redis plumbing used to watch build
// containers.
type RuncImageService struct {
pb.UnimplementedImageServiceServer
builder *Builder
config types.AppConfig
backendRepo repository.BackendRepository
builder *Builder
config types.AppConfig
backendRepo repository.BackendRepository
// rdb is the Redis client; monitorImageContainers uses it to check whether a
// container has a build-container TTL key.
rdb *common.RedisClient
// keyEventChan carries the key events consumed by monitorImageContainers.
keyEventChan chan common.KeyEvent
// keyEventManager subscribes to key patterns and strips the keyspace prefix
// from event keys.
keyEventManager *common.KeyEventManager
}

type ImageServiceOpts struct {
Expand All @@ -34,8 +38,12 @@ type ImageServiceOpts struct {
BackendRepo repository.BackendRepository
Scheduler *scheduler.Scheduler
Tailscale *network.Tailscale
RedisClient *common.RedisClient
}

// Timing parameters, in seconds, for the build-container TTL key.
const (
	// buildContainerKeepAliveIntervalS is the period at which Builder.keepAlive
	// rewrites a build container's TTL key.
	buildContainerKeepAliveIntervalS int = 10

	// imageContainerTtlS is the expiry applied to a build container's TTL key
	// every time the key is written.
	imageContainerTtlS int = 60
)

func NewRuncImageService(
ctx context.Context,
opts ImageServiceOpts,
Expand All @@ -45,16 +53,30 @@ func NewRuncImageService(
return nil, err
}

builder, err := NewBuilder(opts.Config, registry, opts.Scheduler, opts.Tailscale, opts.ContainerRepo)
builder, err := NewBuilder(opts.Config, registry, opts.Scheduler, opts.Tailscale, opts.ContainerRepo, opts.RedisClient)
if err != nil {
return nil, err
}

return &RuncImageService{
builder: builder,
config: opts.Config,
backendRepo: opts.BackendRepo,
}, nil
keyEventManager, err := common.NewKeyEventManager(opts.RedisClient)
if err != nil {
return nil, err
}

is := RuncImageService{
builder: builder,
config: opts.Config,
backendRepo: opts.BackendRepo,
keyEventChan: make(chan common.KeyEvent),
keyEventManager: keyEventManager,
rdb: opts.RedisClient,
}

go is.monitorImageContainers(ctx)
go is.keyEventManager.ListenForPattern(ctx, Keys.imageBuildContainerTTL("*"), is.keyEventChan)
go is.keyEventManager.ListenForPattern(ctx, common.RedisKeys.SchedulerContainerState("*"), is.keyEventChan)

return &is, nil
}

func (is *RuncImageService) VerifyImageBuild(ctx context.Context, in *pb.VerifyImageBuildRequest) (*pb.VerifyImageBuildResponse, error) {
Expand Down Expand Up @@ -184,6 +206,35 @@ func (is *RuncImageService) retrieveBuildSecrets(ctx context.Context, secrets []
return buildSecrets, nil
}

// monitorImageContainers consumes key events from keyEventChan until ctx is
// cancelled and force-stops containers based on them:
//
//   - When a scheduler container-state key is set and no build-container TTL
//     key exists for that container ID, the container is stopped.
//   - When a build-container TTL key expires, the corresponding container is
//     stopped.
//
// Events whose key does not belong to the expected key family are ignored.
func (is *RuncImageService) monitorImageContainers(ctx context.Context) {
	statePrefix := common.RedisKeys.SchedulerContainerState("")
	ttlPrefix := Keys.imageBuildContainerTTL("")

	for {
		select {
		case <-ctx.Done():
			return
		case event := <-is.keyEventChan:
			switch event.Operation {
			case common.KeyOperationSet:
				if !strings.Contains(event.Key, statePrefix) {
					continue
				}
				containerId := strings.TrimPrefix(is.keyEventManager.TrimKeyspacePrefix(event.Key), statePrefix)

				// Only act on a definite "key is absent" answer. A Redis error
				// (including a cancelled ctx) must not be mistaken for a
				// missing TTL key, or a live container would be force-stopped.
				ttlKeys, err := is.rdb.Exists(ctx, Keys.imageBuildContainerTTL(containerId)).Result()
				if err != nil {
					continue
				}

				// NOTE(review): this branch does not itself verify that the
				// container is a build container; confirm that the key
				// subscription only delivers build-container state keys.
				if ttlKeys == 0 {
					is.builder.scheduler.Stop(&types.StopContainerArgs{
						ContainerId: containerId,
						Force:       true,
					})
				}
			case common.KeyOperationExpired:
				// Both key families share keyEventChan, so an expiry can also
				// be reported for a scheduler state key. Only a TTL key
				// yields a valid container ID here.
				key := is.keyEventManager.TrimKeyspacePrefix(event.Key)
				if !strings.HasPrefix(key, ttlPrefix) {
					continue
				}

				is.builder.scheduler.Stop(&types.StopContainerArgs{
					ContainerId: strings.TrimPrefix(key, ttlPrefix),
					Force:       true,
				})
			}
		}
	}
}

func convertBuildSteps(buildSteps []*pb.BuildStep) []BuildStep {
steps := make([]BuildStep, len(buildSteps))
for i, s := range buildSteps {
Expand All @@ -194,3 +245,15 @@ func convertBuildSteps(buildSteps []*pb.BuildStep) []BuildStep {
}
return steps
}

// imageBuildContainerTTL is the fmt layout of the Redis key that tracks a
// build container's TTL; the single %s verb is replaced by the container ID.
var imageBuildContainerTTL = "image:build_container_ttl:%s"

// keys groups the Redis key builders of this package.
type keys struct{}

// Keys is the package-wide key builder instance.
var Keys = new(keys)

// imageBuildContainerTTL returns the TTL key for containerId. Passing "" yields
// the bare key prefix.
func (k *keys) imageBuildContainerTTL(containerId string) string {
	key := fmt.Sprintf(imageBuildContainerTTL, containerId)
	return key
}
8 changes: 4 additions & 4 deletions pkg/common/key_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ func NewKeyEventManager(rdb *RedisClient) (*KeyEventManager, error) {
return &KeyEventManager{rdb: rdb}, nil
}

// TrimKeyspacePrefix returns key with a leading keyspacePrefix removed. A key
// that does not start with keyspacePrefix is returned unchanged.
func (kem *KeyEventManager) TrimKeyspacePrefix(key string) string {
return strings.TrimPrefix(key, keyspacePrefix)
}

func (kem *KeyEventManager) fetchExistingKeys(patternPrefix string) ([]string, error) {
pattern := fmt.Sprintf("%s*", patternPrefix)

Expand All @@ -49,10 +53,6 @@ func (kem *KeyEventManager) fetchExistingKeys(patternPrefix string) ([]string, e
return trimmedKeys, nil
}

// TrimKeyspacePrefix returns key with a leading keyspacePrefix removed. A key
// that does not start with keyspacePrefix is returned unchanged.
func (kem *KeyEventManager) TrimKeyspacePrefix(key string) string {
return strings.TrimPrefix(key, keyspacePrefix)
}

func (kem *KeyEventManager) ListenForPattern(ctx context.Context, patternPrefix string, keyEventChan chan KeyEvent) error {
existingKeys, err := kem.fetchExistingKeys(patternPrefix)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ func (g *Gateway) registerServices() error {
Scheduler: g.Scheduler,
Tailscale: g.Tailscale,
BackendRepo: g.BackendRepo,
RedisClient: g.RedisClient,
})
if err != nil {
return err
Expand Down
Loading