Skip to content

Commit

Permalink
scheduler: optimize performance by refining transformer extension poi…
Browse files Browse the repository at this point in the history
…nts and using Skip status

Signed-off-by: saintube <saintube@foxmail.com>
  • Loading branch information
saintube authored and saintube committed Sep 20, 2024
1 parent c6cddf3 commit c9f28f3
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 31 deletions.
77 changes: 53 additions & 24 deletions pkg/scheduler/frameworkext/framework_extender.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ package frameworkext
import (
"context"
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
k8sfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
schedconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/metrics"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
koordinatorclientset "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned"
Expand All @@ -51,9 +54,12 @@ type frameworkExtenderImpl struct {
koordinatorClientSet koordinatorclientset.Interface
koordinatorSharedInformerFactory koordinatorinformers.SharedInformerFactory

preFilterTransformers map[string]PreFilterTransformer
filterTransformers map[string]FilterTransformer
scoreTransformers map[string]ScoreTransformer
preFilterTransformers map[string]PreFilterTransformer
filterTransformers map[string]FilterTransformer
scoreTransformers map[string]ScoreTransformer
preFilterTransformersEnabled []PreFilterTransformer
filterTransformersEnabled []FilterTransformer
scoreTransformersEnabled []ScoreTransformer

reservationNominator ReservationNominator
reservationFilterPlugins []ReservationFilterPlugin
Expand All @@ -66,6 +72,8 @@ type frameworkExtenderImpl struct {

numaTopologyHintProviders []topologymanager.NUMATopologyHintProvider
topologyManager topologymanager.Interface

metricsRecorder *metrics.MetricAsyncRecorder
}

func NewFrameworkExtender(f *FrameworkExtenderFactory, fw framework.Framework) FrameworkExtender {
Expand All @@ -85,6 +93,7 @@ func NewFrameworkExtender(f *FrameworkExtenderFactory, fw framework.Framework) F
filterTransformers: map[string]FilterTransformer{},
scoreTransformers: map[string]ScoreTransformer{},
preBindExtensionsPlugins: map[string]PreBindExtensions{},
metricsRecorder: metrics.NewMetricsAsyncRecorder(1000, time.Second, wait.NeverStop),
}
frameworkExtender.topologyManager = topologymanager.New(frameworkExtender)
return frameworkExtender
Expand Down Expand Up @@ -143,6 +152,29 @@ func (ext *frameworkExtenderImpl) updatePlugins(pl framework.Plugin) {

// SetConfiguredPlugins records the plugin configuration and rebuilds the
// ordered lists of enabled transformers for the PreFilter, Filter and Score
// extension points.
//
// For each extension point, every enabled plugin name is looked up in the
// matching registered-transformer map; plugins that have no registered
// transformer are skipped. The resulting slices keep the order of the Enabled
// list, so the Run* methods can range over them directly instead of doing a
// map lookup per plugin on every call.
//
// The cached slices are reset on each call, so invoking this method more than
// once never leaves duplicate transformers behind (which would make every
// Before*/After* hook run multiple times). Passing nil plugins clears the
// caches instead of dereferencing a nil pointer.
func (ext *frameworkExtenderImpl) SetConfiguredPlugins(plugins *schedconfig.Plugins) {
	ext.configuredPlugins = plugins

	// Start from empty caches so a repeated call replaces, rather than extends,
	// the previously computed lists.
	ext.preFilterTransformersEnabled = nil
	ext.filterTransformersEnabled = nil
	ext.scoreTransformersEnabled = nil
	if plugins == nil {
		return
	}

	for _, pl := range plugins.PreFilter.Enabled {
		if transformer := ext.preFilterTransformers[pl.Name]; transformer != nil {
			ext.preFilterTransformersEnabled = append(ext.preFilterTransformersEnabled, transformer)
		}
	}
	for _, pl := range plugins.Filter.Enabled {
		if transformer := ext.filterTransformers[pl.Name]; transformer != nil {
			ext.filterTransformersEnabled = append(ext.filterTransformersEnabled, transformer)
		}
	}
	for _, pl := range plugins.Score.Enabled {
		if transformer := ext.scoreTransformers[pl.Name]; transformer != nil {
			ext.scoreTransformersEnabled = append(ext.scoreTransformersEnabled, transformer)
		}
	}
	klog.V(5).InfoS("Set configured transformer plugins",
		"PreFilterTransformer", len(ext.preFilterTransformersEnabled),
		"FilterTransformer", len(ext.filterTransformersEnabled),
		"ScoreTransformer", len(ext.scoreTransformersEnabled))
}

func (ext *frameworkExtenderImpl) KoordinatorClientSet() koordinatorclientset.Interface {
Expand All @@ -166,12 +198,10 @@ func (ext *frameworkExtenderImpl) GetReservationNominator() ReservationNominator

// RunPreFilterPlugins transforms the PreFilter phase of framework with pre-filter transformers.
func (ext *frameworkExtenderImpl) RunPreFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod) (*framework.PreFilterResult, *framework.Status) {
for _, pl := range ext.configuredPlugins.PreFilter.Enabled {
transformer := ext.preFilterTransformers[pl.Name]
if transformer == nil {
continue
}
for _, transformer := range ext.preFilterTransformersEnabled {
startTime := time.Now()
newPod, transformed, status := transformer.BeforePreFilter(ctx, cycleState, pod)
ext.metricsRecorder.ObservePluginDurationAsync("BeforePreFilter", transformer.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
if !status.IsSuccess() {
klog.ErrorS(status.AsError(), "Failed to run BeforePreFilter", "pod", klog.KObj(pod), "plugin", transformer.Name())
return nil, status
Expand All @@ -187,12 +217,11 @@ func (ext *frameworkExtenderImpl) RunPreFilterPlugins(ctx context.Context, cycle
return result, status
}

for _, pl := range ext.configuredPlugins.PreFilter.Enabled {
transformer := ext.preFilterTransformers[pl.Name]
if transformer == nil {
continue
}
if status := transformer.AfterPreFilter(ctx, cycleState, pod); !status.IsSuccess() {
for _, transformer := range ext.preFilterTransformersEnabled {
startTime := time.Now()
status = transformer.AfterPreFilter(ctx, cycleState, pod)
ext.metricsRecorder.ObservePluginDurationAsync("AfterPreFilter", transformer.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
if !status.IsSuccess() {
klog.ErrorS(status.AsError(), "Failed to run AfterPreFilter", "pod", klog.KObj(pod), "plugin", transformer.Name())
return nil, status
}
Expand All @@ -203,12 +232,10 @@ func (ext *frameworkExtenderImpl) RunPreFilterPlugins(ctx context.Context, cycle
// RunFilterPluginsWithNominatedPods transforms the Filter phase of framework with filter transformers.
// We don't transform RunFilterPlugins since framework's RunFilterPluginsWithNominatedPods just calls its RunFilterPlugins.
func (ext *frameworkExtenderImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
for _, pl := range ext.configuredPlugins.Filter.Enabled {
transformer := ext.filterTransformers[pl.Name]
if transformer == nil {
continue
}
for _, transformer := range ext.filterTransformersEnabled {
startTime := time.Now()
newPod, newNodeInfo, transformed, status := transformer.BeforeFilter(ctx, cycleState, pod, nodeInfo)
ext.metricsRecorder.ObservePluginDurationAsync("BeforeFilter", transformer.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
if !status.IsSuccess() {
klog.ErrorS(status.AsError(), "Failed to run BeforeFilter", "pod", klog.KObj(pod), "plugin", transformer.Name())
return status
Expand All @@ -227,12 +254,10 @@ func (ext *frameworkExtenderImpl) RunFilterPluginsWithNominatedPods(ctx context.
}

func (ext *frameworkExtenderImpl) RunScorePlugins(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodes []*corev1.Node) ([]framework.NodePluginScores, *framework.Status) {
for _, pl := range ext.configuredPlugins.Score.Enabled {
transformer := ext.scoreTransformers[pl.Name]
if transformer == nil {
continue
}
for _, transformer := range ext.scoreTransformersEnabled {
startTime := time.Now()
newPod, newNodes, transformed, status := transformer.BeforeScore(ctx, state, pod, nodes)
ext.metricsRecorder.ObservePluginDurationAsync("BeforeScore", transformer.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
if !status.IsSuccess() {
klog.ErrorS(status.AsError(), "Failed to run BeforeScore", "pod", klog.KObj(pod), "plugin", transformer.Name())
return nil, status
Expand Down Expand Up @@ -278,7 +303,9 @@ func (ext *frameworkExtenderImpl) RunPreBindPlugins(ctx context.Context, state *
reservation = reservation.DeepCopy()
reservation.Status.NodeName = nodeName
for _, pl := range ext.reservationPreBindPlugins {
startTime := time.Now()
status := pl.PreBindReservation(ctx, state, reservation, nodeName)
ext.metricsRecorder.ObservePluginDurationAsync("PreBindReservation", pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
if !status.IsSuccess() {
err := status.AsError()
klog.ErrorS(err, "Failed running ReservationPreBindPlugin plugin", "plugin", pl.Name(), "reservation", klog.KObj(reservation))
Expand Down Expand Up @@ -465,7 +492,9 @@ func (ext *frameworkExtenderImpl) RunReservePluginsReserve(ctx context.Context,

func (ext *frameworkExtenderImpl) RunResizePod(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status {
for _, pl := range ext.resizePodPlugins {
startTime := time.Now()
status := pl.ResizePod(ctx, cycleState, pod, nodeName)
ext.metricsRecorder.ObservePluginDurationAsync("ResizePod", pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
if !status.IsSuccess() {
return status
}
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/frameworkext/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ type ReservationScoreExtensions interface {
// ResizePodPlugin is an interface that resizes the pod resource spec after reserve.
// To use this feature, the feature gate must be enabled with ResizePod=true.
type ResizePodPlugin interface {
// framework.Plugin is embedded so that every ResizePodPlugin is also a
// scheduler framework plugin.
framework.Plugin
// ResizePod is invoked with the scheduling cycle state, the pod and the
// name of the node; it returns a framework status describing the outcome.
ResizePod(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/elasticquota/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (g *Plugin) PreFilter(ctx context.Context, cycleState *framework.CycleState
quotaName, treeID := g.getPodAssociateQuotaNameAndTreeID(pod)
if quotaName == "" {
g.skipPostFilterState(cycleState)
return nil, framework.NewStatus(framework.Success, "")
return nil, framework.NewStatus(framework.Skip)
}

mgr := g.GetGroupQuotaManagerForTree(treeID)
Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/plugins/reservation/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,10 @@ func (pl *Plugin) PreFilter(ctx context.Context, cycleState *framework.CycleStat
for nodeName := range state.nodeReservationStates {
preResult.NodeNames.Insert(nodeName)
}
} else if len(state.nodeReservationStates) <= 0 { // neither an available reservation nor a reserve pod
return nil, framework.NewStatus(framework.Skip)
}

return preResult, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/reservation/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func TestPreFilter(t *testing.T) {
Name: "not-reserve",
},
},
wantStatus: nil,
wantStatus: framework.NewStatus(framework.Skip),
wantPreRes: nil,
},
{
Expand Down
6 changes: 3 additions & 3 deletions pkg/scheduler/plugins/reservation/scoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,17 @@ const (

func (pl *Plugin) PreScore(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodes []*corev1.Node) *framework.Status {
if reservationutil.IsReservePod(pod) {
return nil
return framework.NewStatus(framework.Skip)
}

// if the pod is reservation-ignored, it does not want a nominated reservation
if apiext.IsReservationIgnored(pod) {
return nil
return framework.NewStatus(framework.Skip)
}

state := getStateData(cycleState)
if len(state.nodeReservationStates) == 0 {
return nil
return framework.NewStatus(framework.Skip)
}

ctx, cancel := context.WithCancel(ctx)
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/plugins/reservation/scoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func TestScore(t *testing.T) {
suit.start()

status := pl.PreScore(context.TODO(), cycleState, tt.pod, []*corev1.Node{node})
assert.True(t, status.IsSuccess())
assert.True(t, status.IsSuccess() || status.IsSkip())

score, status := pl.Score(context.TODO(), cycleState, tt.pod, node.Name)
assert.True(t, status.IsSuccess())
Expand Down Expand Up @@ -843,7 +843,7 @@ func TestPreScoreWithNominateReservation(t *testing.T) {
suit.start()

status := pl.PreScore(context.TODO(), cycleState, tt.pod, nodes)
assert.Equal(t, tt.wantStatus, status.IsSuccess())
assert.Equal(t, tt.wantStatus, status.IsSuccess() || status.IsSkip())

for nodeName, wantReservationInfo := range tt.wantReservation {
sort.Slice(wantReservationInfo.ResourceNames, func(i, j int) bool {
Expand Down

0 comments on commit c9f28f3

Please sign in to comment.