Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(scheduler): Fix deleting models that are still progressing #5143

Merged
merged 14 commits into from
Oct 2, 2023
3 changes: 0 additions & 3 deletions operator/scheduler/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,6 @@ func (s *SchedulerClient) SubscribeModelEvents(ctx context.Context, conn *grpc.C
)
return nil
}
if !latestModel.ObjectMeta.DeletionTimestamp.IsZero() { // Model is being deleted
return nil
}

// Handle status update
modelStatus := latestVersionStatus.GetState()
Expand Down
62 changes: 50 additions & 12 deletions scheduler/pkg/store/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,51 +617,89 @@ func (m *MemoryStore) addServerReplicaImpl(request *agent.AgentSubscribeRequest)
}

// RemoveServerReplica removes replica replicaIdx of the server named serverName
// by delegating to removeServerReplicaImpl. When an event hub is configured it
// then publishes every model event returned by that call, using
// modelUpdateEventSource as the source.
//
// It returns the model names reported by removeServerReplicaImpl, or nil and
// the error if that call fails (no events are published in that case).
//
// NOTE(review): this listing is a rendered diff without +/- markers, so old and
// new lines are interleaved. The Lock/Unlock pair and the single-line return
// directly below appear to be the removed pre-change body, and the three-value
// call that follows appears to be its replacement — confirm against the merged file.
func (m *MemoryStore) RemoveServerReplica(serverName string, replicaIdx int) ([]string, error) {
m.mu.Lock()
defer m.mu.Unlock()

return m.removeServerReplicaImpl(serverName, replicaIdx)
models, evts, err := m.removeServerReplicaImpl(serverName, replicaIdx)
if err != nil {
return nil, err
}
// The event hub is optional; skip publishing entirely when it is not set.
if m.eventHub != nil {
for _, evt := range evts {
m.eventHub.PublishModelEvent(
modelUpdateEventSource,
evt,
)
}
}
return models, nil
}

// removeServerReplicaImpl deletes replica replicaIdx from the server named
// serverName, drops the server itself from the store once it has no replicas
// left, and detaches the replica from every model version listed in the
// replica's loadedModels and loadingModels sets.
//
// It returns the concatenated model names and model events produced by
// removeModelfromServerReplica for those two sets (loaded first, then loading).
// An error is returned, with nil results, when the server or the replica
// cannot be found.
//
// NOTE(review): this listing is a rendered diff without +/- markers. Where two
// variants of a line appear back to back (the two signatures, the paired
// return statements, the paired removeModelfromServerReplica calls), the first
// appears to be the removed version and the second its replacement — confirm
// against the merged file.
func (m *MemoryStore) removeServerReplicaImpl(serverName string, replicaIdx int) ([]string, error) {
func (m *MemoryStore) removeServerReplicaImpl(serverName string, replicaIdx int) ([]string, []coordinator.ModelEventMsg, error) {
// Hold the store mutex until this function returns.
m.mu.Lock()
defer m.mu.Unlock()

server, ok := m.store.servers[serverName]
if !ok {
return nil, fmt.Errorf("Failed to find server %s", serverName)
return nil, nil, fmt.Errorf("Failed to find server %s", serverName)
}
serverReplica, ok := server.replicas[replicaIdx]
if !ok {
return nil, fmt.Errorf("Failed to find replica %d for server %s", replicaIdx, serverName)
return nil, nil, fmt.Errorf("Failed to find replica %d for server %s", replicaIdx, serverName)
}
delete(server.replicas, replicaIdx)
//TODO we should not reschedule models on servers with dedicated models, e.g. non shareable servers
// A server with no remaining replicas is removed from the store altogether.
if len(server.replicas) == 0 {
delete(m.store.servers, serverName)
}
loadedModelsRemoved := m.removeModelfromServerReplica(serverReplica.loadedModels, replicaIdx)
loadingModelsRemoved := m.removeModelfromServerReplica(serverReplica.loadingModels, replicaIdx)
loadedModelsRemoved, loadedEvts := m.removeModelfromServerReplica(serverReplica.loadedModels, replicaIdx)
loadingModelsRemoved, loadingEtvs := m.removeModelfromServerReplica(serverReplica.loadingModels, replicaIdx)

// Merge the results for loaded and loading models, loaded entries first.
modelsRemoved := append(loadedModelsRemoved, loadingModelsRemoved...)
evts := append(loadedEvts, loadingEtvs...)

return modelsRemoved, nil
return modelsRemoved, evts, nil
}

// removeModelfromServerReplica detaches replica replicaIdx from every model
// version keyed in lModels and sorts the affected models into two results:
//
//   - model names for models that are not deleted (per the existing comment
//     below, these are the models to reschedule);
//   - model events for models that are deleted, which the in-line comment
//     describes as needed to progress the deletion.
//
// Entries whose model name is absent from m.store.models are skipped silently;
// entries whose version cannot be found on the model are logged as a warning.
//
// NOTE(review): this listing is a rendered diff without +/- markers. The first
// of the two signatures, the unconditional append right after DeleteReplica,
// and the first of the two final returns appear to be removed pre-change lines
// — confirm against the merged file.
func (m *MemoryStore) removeModelfromServerReplica(lModels map[ModelVersionID]bool, replicaIdx int) []string {
func (m *MemoryStore) removeModelfromServerReplica(lModels map[ModelVersionID]bool, replicaIdx int) ([]string, []coordinator.ModelEventMsg) {
// Log entries are tagged with "RemoveServerReplica" rather than this helper's name.
logger := m.logger.WithField("func", "RemoveServerReplica")
var modelNames []string
var evts []coordinator.ModelEventMsg
// Find models to reschedule due to this server replica being removed
for modelVersionID := range lModels {
model, ok := m.store.models[modelVersionID.Name]
if ok {
modelVersion := model.GetVersion(modelVersionID.Version)
if modelVersion != nil {
modelVersion.DeleteReplica(replicaIdx)
modelNames = append(modelNames, modelVersionID.Name)
if model.IsDeleted() {
// In some cases we found that the user can ask for a model to be deleted and the model replica
// is still in the process of being loaded. In this case we should not reschedule the model.
logger.Debugf(
"Model %s is being deleted and server replica %d is disconnected, skipping",
modelVersionID.Name, replicaIdx,
)
// Mark the replica as Unloaded, then refresh the model status while holding the per-model lock.
modelVersion.SetReplicaState(replicaIdx, Unloaded, "model is being deleted and server replica was removed")
m.LockModel(modelVersionID.Name)
// The first argument is true when modelVersion is the model's latest version.
m.updateModelStatus(
model.Latest().GetVersion() == modelVersion.GetVersion(),
model.IsDeleted(), modelVersion, model.GetLastAvailableModelVersion())
m.UnlockModel(modelVersionID.Name)
// send an event to progress the deletion
evts = append(
evts,
coordinator.ModelEventMsg{
ModelName: modelVersion.GetMeta().GetName(),
ModelVersion: modelVersion.GetVersion(),
},
)
} else {
// Not deleted: report the model name back to the caller.
modelNames = append(modelNames, modelVersionID.Name)
}
} else {
logger.Warnf("Can't find model version %s", modelVersionID.String())
}
}
}
return modelNames
return modelNames, evts
}

func (m *MemoryStore) DrainServerReplica(serverName string, replicaIdx int) ([]string, error) {
Expand Down
Loading