Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(scheduler): exposing the deleted resource ttl as a CLI param #5994

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions scheduler/cmd/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ var (
tracingConfigPath string
dbPath string
nodeID string
allowPlaintxt bool //scheduler server
allowPlaintxt bool // scheduler server
autoscalingDisabled bool
kafkaConfigPath string
schedulerReadyTimeoutSeconds uint
deletedResourceTTLSeconds uint
)

const (
Expand Down Expand Up @@ -115,6 +116,9 @@ func init() {

// Timeout for scheduler to be ready
flag.UintVar(&schedulerReadyTimeoutSeconds, "scheduler-ready-timeout-seconds", 300, "Timeout for scheduler to be ready")

// This TTL is set in badger DB
flag.UintVar(&deletedResourceTTLSeconds, "deleted-resource-ttl-seconds", 86400, "TTL for deleted experiments and pipelines (in seconds)")
}

func getNamespace() string {
Expand Down Expand Up @@ -211,11 +215,11 @@ func main() {
// Do here after other services created so eventHub events will be handled on pipeline/experiment load
// If we start earlier events will be sent but not received by services that start listening "late" to eventHub
if dbPath != "" {
err := ps.InitialiseOrRestoreDB(dbPath)
err := ps.InitialiseOrRestoreDB(dbPath, deletedResourceTTLSeconds)
if err != nil {
log.WithError(err).Fatalf("Failed to initialise pipeline db at %s", dbPath)
}
err = es.InitialiseOrRestoreDB(dbPath)
err = es.InitialiseOrRestoreDB(dbPath, deletedResourceTTLSeconds)
if err != nil {
log.WithError(err).Fatalf("Failed to initialise experiment db at %s", dbPath)
}
Expand Down
44 changes: 29 additions & 15 deletions scheduler/pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ func TestLoadModel(t *testing.T) {
},
},
},
model: &pb.Model{Meta: &pb.MetaData{Name: "model1"},
model: &pb.Model{
Meta: &pb.MetaData{Name: "model1"},
ModelSpec: &pb.ModelSpec{
Uri: "gs://model",
Requirements: []string{"sklearn"},
Expand Down Expand Up @@ -384,39 +385,54 @@ func TestUnloadModel(t *testing.T) {
{
name: "Simple",
req: []*pba.AgentSubscribeRequest{
{ServerName: "server1", ReplicaIdx: 0, Shared: true, AvailableMemoryBytes: 1000,
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn"}}}},
{
ServerName: "server1", ReplicaIdx: 0, Shared: true, AvailableMemoryBytes: 1000,
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn"}},
},
},
model: &pb.Model{Meta: &pb.MetaData{Name: "model1"}, ModelSpec: &pb.ModelSpec{Uri: "gs://model", Requirements: []string{"sklearn"}, MemoryBytes: &smallMemory}, DeploymentSpec: &pb.DeploymentSpec{Replicas: 1}},
code: codes.OK,
modelState: store.ModelTerminated,
},
{
name: "Multiple",
req: []*pba.AgentSubscribeRequest{
{ServerName: "server1", ReplicaIdx: 0, Shared: true, AvailableMemoryBytes: 1000,
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn", "xgboost"}}}},
{
ServerName: "server1", ReplicaIdx: 0, Shared: true, AvailableMemoryBytes: 1000,
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn", "xgboost"}},
},
},
model: &pb.Model{Meta: &pb.MetaData{Name: "model1"}, ModelSpec: &pb.ModelSpec{Uri: "gs://model", Requirements: []string{"sklearn", "xgboost"}, MemoryBytes: &smallMemory}, DeploymentSpec: &pb.DeploymentSpec{Replicas: 1}},
code: codes.OK,
modelState: store.ModelTerminated,
},
{
name: "TwoReplicas",
req: []*pba.AgentSubscribeRequest{
{ServerName: "server1", ReplicaIdx: 0, Shared: true, AvailableMemoryBytes: 1000,
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn"}}},
{ServerName: "server1", ReplicaIdx: 1, Shared: true, AvailableMemoryBytes: 1000,
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn"}}}},
{
ServerName: "server1", ReplicaIdx: 0, Shared: true, AvailableMemoryBytes: 1000,
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn"}},
},
{
ServerName: "server1", ReplicaIdx: 1, Shared: true, AvailableMemoryBytes: 1000,
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn"}},
},
},
model: &pb.Model{Meta: &pb.MetaData{Name: "model1"}, ModelSpec: &pb.ModelSpec{Uri: "gs://model", Requirements: []string{"sklearn"}, MemoryBytes: &smallMemory}, DeploymentSpec: &pb.DeploymentSpec{Replicas: 2}},
code: codes.OK,
modelState: store.ModelTerminated,
},
{
name: "NotExist",
req: []*pba.AgentSubscribeRequest{
{ServerName: "server1", ReplicaIdx: 0, Shared: true, AvailableMemoryBytes: 1000,
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn"}}}},
{
ServerName: "server1", ReplicaIdx: 0, Shared: true, AvailableMemoryBytes: 1000,
ReplicaConfig: &pba.ReplicaConfig{InferenceSvc: "server1", InferenceHttpPort: 1, Capabilities: []string{"sklearn"}},
},
},
model: nil,
code: codes.FailedPrecondition},
code: codes.FailedPrecondition,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
Expand Down Expand Up @@ -518,7 +534,6 @@ func TestLoadPipeline(t *testing.T) {
}
})
}

}

func TestUnloadPipeline(t *testing.T) {
Expand Down Expand Up @@ -562,7 +577,7 @@ func TestUnloadPipeline(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
path := fmt.Sprintf("%s/db", t.TempDir())
_ = test.server.pipelineHandler.(*pipeline.PipelineStore).InitialiseOrRestoreDB(path)
_ = test.server.pipelineHandler.(*pipeline.PipelineStore).InitialiseOrRestoreDB(path, 10)
if test.loadReq != nil {
err := test.server.pipelineHandler.AddPipeline(test.loadReq.Pipeline)
g.Expect(err).To(BeNil())
Expand All @@ -575,7 +590,6 @@ func TestUnloadPipeline(t *testing.T) {
}
})
}

}

func TestPipelineStatus(t *testing.T) {
Expand Down
19 changes: 11 additions & 8 deletions scheduler/pkg/store/experiment/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ the Change License after the Change Date as each is defined in accordance with t
package experiment

import (
"time"

"github.com/dgraph-io/badger/v3"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
Expand All @@ -25,19 +27,21 @@ const (
)

type ExperimentDBManager struct {
db *badger.DB
logger logrus.FieldLogger
db *badger.DB
logger logrus.FieldLogger
deletedResourceTTL time.Duration
}

func newExperimentDbManager(path string, logger logrus.FieldLogger) (*ExperimentDBManager, error) {
func newExperimentDbManager(path string, logger logrus.FieldLogger, deletedResourceTTL uint) (*ExperimentDBManager, error) {
db, err := utils.Open(path, logger, "experimentDb")
if err != nil {
return nil, err
}

edb := &ExperimentDBManager{
db: db,
logger: logger,
db: db,
logger: logger,
deletedResourceTTL: time.Duration(deletedResourceTTL * uint(time.Second)),
}

version, err := edb.getVersion()
Expand Down Expand Up @@ -73,9 +77,8 @@ func (edb *ExperimentDBManager) save(experiment *Experiment) error {
return err
})
} else {
ttl := utils.DeletedResourceTTL
return edb.db.Update(func(txn *badger.Txn) error {
e := badger.NewEntry([]byte(experiment.Name), experimentBytes).WithTTL(ttl)
e := badger.NewEntry([]byte(experiment.Name), experimentBytes).WithTTL(edb.deletedResourceTTL)
err = txn.SetEntry(e)
return err
})
Expand Down Expand Up @@ -119,7 +122,7 @@ func (edb *ExperimentDBManager) restore(
}
experiment := CreateExperimentFromSnapshot(&snapshot)
if experiment.Deleted {
experiment.DeletedAt = utils.GetDeletedAt(item)
experiment.DeletedAt = utils.GetDeletedAt(item, edb.deletedResourceTTL)
err = stopExperimentCb(experiment)
} else {
// otherwise attempt to start the experiment
Expand Down
18 changes: 9 additions & 9 deletions scheduler/pkg/store/experiment/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestSaveWithTTL(t *testing.T) {

path := fmt.Sprintf("%s/db", t.TempDir())
logger := log.New()
db, err := newExperimentDbManager(getExperimentDbFolder(path), logger)
db, err := newExperimentDbManager(getExperimentDbFolder(path), logger, 10)
g.Expect(err).To(BeNil())
err = db.save(experiment)
g.Expect(err).To(BeNil())
Expand Down Expand Up @@ -307,7 +307,7 @@ func TestSaveAndRestore(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
path := fmt.Sprintf("%s/db", t.TempDir())
logger := log.New()
db, err := newExperimentDbManager(getExperimentDbFolder(path), logger)
db, err := newExperimentDbManager(getExperimentDbFolder(path), logger, 10)
g.Expect(err).To(BeNil())
for _, p := range test.experiments {
err := db.save(p)
Expand All @@ -317,7 +317,7 @@ func TestSaveAndRestore(t *testing.T) {
g.Expect(err).To(BeNil())

es := NewExperimentServer(log.New(), nil, nil, nil)
err = es.InitialiseOrRestoreDB(path)
err = es.InitialiseOrRestoreDB(path, 10)
g.Expect(err).To(BeNil())
for idx, p := range test.experiments {
if !test.errors[idx] {
Expand Down Expand Up @@ -379,7 +379,7 @@ func TestSaveAndRestoreDeletedExperiments(t *testing.T) {
g.Expect(test.experiment.Deleted).To(BeTrue(), "this is a test for deleted experiments")
path := fmt.Sprintf("%s/db", t.TempDir())
logger := log.New()
edb, err := newExperimentDbManager(getExperimentDbFolder(path), logger)
edb, err := newExperimentDbManager(getExperimentDbFolder(path), logger, 10)
g.Expect(err).To(BeNil())
if !test.withTTL {
err = saveWithOutTTL(&test.experiment, edb.db)
Expand All @@ -392,7 +392,7 @@ func TestSaveAndRestoreDeletedExperiments(t *testing.T) {
g.Expect(err).To(BeNil())

es := NewExperimentServer(log.New(), nil, nil, nil)
err = es.InitialiseOrRestoreDB(path)
err = es.InitialiseOrRestoreDB(path, 10)
g.Expect(err).To(BeNil())

if !test.withTTL {
Expand Down Expand Up @@ -538,7 +538,7 @@ func TestGetExperimentFromDB(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
path := fmt.Sprintf("%s/db", t.TempDir())
logger := log.New()
db, err := newExperimentDbManager(getExperimentDbFolder(path), logger)
db, err := newExperimentDbManager(getExperimentDbFolder(path), logger, 10)
g.Expect(err).To(BeNil())
for _, p := range test.experiments {
err := db.save(p)
Expand Down Expand Up @@ -678,7 +678,7 @@ func TestDeleteExperimentFromDB(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
path := fmt.Sprintf("%s/db", t.TempDir())
logger := log.New()
db, err := newExperimentDbManager(getExperimentDbFolder(path), logger)
db, err := newExperimentDbManager(getExperimentDbFolder(path), logger, 10)
g.Expect(err).To(BeNil())
for _, p := range test.experiments {
err := db.save(p)
Expand Down Expand Up @@ -844,7 +844,7 @@ func TestMigrateFromV1ToV2(t *testing.T) {
_ = db.Close()

// migrate
edb, err := newExperimentDbManager(getExperimentDbFolder(path), logger)
edb, err := newExperimentDbManager(getExperimentDbFolder(path), logger, 10)
g.Expect(err).To(BeNil())

g.Expect(err).To(BeNil())
Expand All @@ -856,7 +856,7 @@ func TestMigrateFromV1ToV2(t *testing.T) {

// check that we have no experiments in the db format
es := NewExperimentServer(log.New(), nil, nil, nil)
err = es.InitialiseOrRestoreDB(path)
err = es.InitialiseOrRestoreDB(path, 10)
g.Expect(err).To(BeNil())
g.Expect(len(es.experiments)).To(Equal(0))
})
Expand Down
6 changes: 3 additions & 3 deletions scheduler/pkg/store/experiment/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,15 @@ func (es *ExperimentStore) addExperimentInMap(experiment *Experiment) error {
}
}

func (es *ExperimentStore) InitialiseOrRestoreDB(path string) error {
func (es *ExperimentStore) InitialiseOrRestoreDB(path string, deletedResourceTTL uint) error {
logger := es.logger.WithField("func", "initialiseDB")
experimentDbPath := getExperimentDbFolder(path)
logger.Infof("Initialise DB at %s", experimentDbPath)
err := os.MkdirAll(experimentDbPath, os.ModePerm)
if err != nil {
return err
}
db, err := newExperimentDbManager(experimentDbPath, es.logger)
db, err := newExperimentDbManager(experimentDbPath, es.logger, deletedResourceTTL)
if err != nil {
return err
}
Expand Down Expand Up @@ -484,7 +484,7 @@ func (es *ExperimentStore) cleanupDeletedExperiments() {
es.logger.Warnf("could not update DB TTL for experiment: %s", experiment.Name)
}
}
} else if experiment.DeletedAt.Add(utils.DeletedResourceTTL).Before(time.Now()) {
} else if experiment.DeletedAt.Add(es.db.deletedResourceTTL).Before(time.Now()) {
delete(es.experiments, experiment.Name)
es.logger.Info("cleaning up deleted experiment: %s", experiment.Name)
}
Expand Down
12 changes: 8 additions & 4 deletions scheduler/pkg/store/experiment/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func TestStartExperiment(t *testing.T) {
g.Expect(err).To(BeNil())
server := NewExperimentServer(logger, eventHub, fakeModelStore{}, fakePipelineStore{})
// init db
_ = server.InitialiseOrRestoreDB(path)
_ = server.InitialiseOrRestoreDB(path, 10)
for _, ea := range test.experiments {
err := server.StartExperiment(ea.experiment)
if ea.fail {
Expand Down Expand Up @@ -262,7 +262,7 @@ func TestStopExperiment(t *testing.T) {
path := fmt.Sprintf("%s/db", t.TempDir())

// init db
err := test.store.InitialiseOrRestoreDB(path)
err := test.store.InitialiseOrRestoreDB(path, 1)
g.Expect(err).To(BeNil())
for _, p := range test.store.experiments {
err := test.store.db.save(p)
Expand All @@ -288,6 +288,10 @@ func TestStopExperiment(t *testing.T) {
// check db
experimentFromDB, _ := test.store.db.get(test.experimentName)
g.Expect(experimentFromDB.Deleted).To(BeTrue())

time.Sleep(1 * time.Second)
test.store.cleanupDeletedExperiments()
g.Expect(test.store.experiments[test.experimentName]).To(BeNil())
}
})
}
Expand Down Expand Up @@ -413,7 +417,7 @@ func TestRestoreExperiments(t *testing.T) {
experiments: make(map[string]*Experiment),
}
// init db
err := store.InitialiseOrRestoreDB(path)
err := store.InitialiseOrRestoreDB(path, 10)
g.Expect(err).To(BeNil())
for _, p := range test.experiments {
err := store.db.save(p)
Expand All @@ -422,7 +426,7 @@ func TestRestoreExperiments(t *testing.T) {
_ = store.db.Stop()

// restore from db now that we have state on disk
_ = store.InitialiseOrRestoreDB(path)
_ = store.InitialiseOrRestoreDB(path, 10)

for _, p := range test.experiments {
experimentFromDB, _ := store.db.get(p.Name)
Expand Down
Loading
Loading