From 5ef97bfffa44ec013a80364119fe47bc4188725b Mon Sep 17 00:00:00 2001 From: chankyin Date: Tue, 12 Nov 2024 18:05:34 +0800 Subject: [PATCH] feat(aggregator): add update-trigger --- aggregator/aggregator/controller.go | 21 +-- aggregator/constants/constants.go | 19 +++ aggregator/main.go | 2 + aggregator/observer/logging.go | 10 ++ aggregator/observer/metrics.go | 25 ++++ aggregator/observer/observer.go | 11 ++ aggregator/synctime/synctime.go | 9 ++ aggregator/updatetrigger/updatetrigger.go | 174 ++++++++++++++++++++++ allinone/main.go | 2 + chart/templates/_aggregator.yaml | 13 ++ chart/values.yaml | 12 ++ util/flag/map.go | 90 +++++++++++ 12 files changed, 373 insertions(+), 15 deletions(-) create mode 100644 aggregator/constants/constants.go create mode 100644 aggregator/updatetrigger/updatetrigger.go create mode 100644 util/flag/map.go diff --git a/aggregator/aggregator/controller.go b/aggregator/aggregator/controller.go index c1895a8..fe922ac 100644 --- a/aggregator/aggregator/controller.go +++ b/aggregator/aggregator/controller.go @@ -46,15 +46,11 @@ import ( "github.com/kubewharf/podseidon/util/util" "github.com/kubewharf/podseidon/util/worker" + "github.com/kubewharf/podseidon/aggregator/constants" "github.com/kubewharf/podseidon/aggregator/observer" "github.com/kubewharf/podseidon/aggregator/synctime" ) -const ( - CoreClusterName kube.ClusterName = "core" - WorkerClusterName kube.ClusterName = "worker" -) - var NewController = component.Declare( func(ControllerArgs) string { return "aggregator" }, func(args ControllerArgs, fs *flag.FlagSet) ControllerOptions { @@ -90,30 +86,25 @@ var NewController = component.Declare( } }, func(args ControllerArgs, requests *component.DepRequests) ControllerDeps { - electorArgs := kube.ElectorArgs{ - ElectorName: "aggregator", - ClusterName: WorkerClusterName, - } - return ControllerDeps{ coreClient: component.DepPtr( requests, - kube.NewClient(kube.ClientArgs{ClusterName: CoreClusterName}), + kube.NewClient(kube.ClientArgs{ClusterName: constants.CoreClusterName}), ), workerClient: component.DepPtr( requests, - kube.NewClient(kube.ClientArgs{ClusterName: WorkerClusterName}), + kube.NewClient(kube.ClientArgs{ClusterName: constants.WorkerClusterName}), ), pprInformer: component.DepPtr( requests, pprutil.NewIndexedInformer(pprutil.IndexedInformerArgs{ - ClusterName: CoreClusterName, + ClusterName: constants.CoreClusterName, InformerPhase: "leader", - Elector: optional.Some(electorArgs), + Elector: optional.Some(constants.ElectorArgs), }), ), observer: o11y.Request[observer.Observer](requests), - elector: component.DepPtr(requests, kube.NewElector(electorArgs)), + elector: component.DepPtr(requests, kube.NewElector(constants.ElectorArgs)), worker: component.DepPtr(requests, worker.New[types.NamespacedName]( "aggregator", args.Clock, diff --git a/aggregator/constants/constants.go b/aggregator/constants/constants.go new file mode 100644 index 0000000..293286f --- /dev/null +++ b/aggregator/constants/constants.go @@ -0,0 +1,19 @@ +package constants + +import "github.com/kubewharf/podseidon/util/kube" + +const ( + CoreClusterName kube.ClusterName = "core" + WorkerClusterName kube.ClusterName = "worker" +) + +const LeaderPhase kube.InformerPhase = "main" + +const ElectorName kube.ElectorName = "aggregator" + +var ElectorArgs = kube.ElectorArgs{ + ClusterName: WorkerClusterName, + ElectorName: ElectorName, +} + +const AnnotUpdateTriggerTime string = "podseidon.kubewharf.io/update-trigger-time" diff --git a/aggregator/main.go b/aggregator/main.go index 367ccd5..d2333dd 100644 --- a/aggregator/main.go +++ b/aggregator/main.go @@ -27,6 +27,7 @@ import ( "github.com/kubewharf/podseidon/aggregator/aggregator" aggregatorobserver "github.com/kubewharf/podseidon/aggregator/observer" + "github.com/kubewharf/podseidon/aggregator/updatetrigger" ) func main() { @@ -39,5 +40,6 @@ func main() { aggregatorobserver.Provide, pprutilobserver.ProvideInformer, component.RequireDep(aggregator.DefaultArg()), + component.RequireDep(updatetrigger.New(updatetrigger.Args{})), ) } diff --git a/aggregator/observer/logging.go b/aggregator/observer/logging.go index 65612f6..6617c2d 100644 --- a/aggregator/observer/logging.go +++ b/aggregator/observer/logging.go @@ -63,6 +63,16 @@ func NewLoggingObserver() Observer { NextEventPoolCurrentSize: nil, NextEventPoolCurrentLatency: nil, NextEventPoolSingleDrain: nil, + TriggerPodCreate: func(ctx context.Context, arg TriggerPodCreate) { + if arg.Err != nil { + klog.FromContext(ctx).Error(arg.Err, "error creating update-trigger pod") + } + }, + TriggerPodUpdate: func(ctx context.Context, arg TriggerPodUpdate) { + if arg.Err != nil { + klog.FromContext(ctx).Error(arg.Err, "error updating update-trigger pod") + } + }, } } diff --git a/aggregator/observer/metrics.go b/aggregator/observer/metrics.go index 56b1ad4..62ff330 100644 --- a/aggregator/observer/metrics.go +++ b/aggregator/observer/metrics.go @@ -46,6 +46,11 @@ func ProvideMetrics() component.Declared[Observer] { type reconcileStartTime struct{} + type updateTriggerTags struct { + Event string + Error string + } + reconcileHandle := metrics.Register( deps.Registry(), "aggregator_reconcile", @@ -106,6 +111,14 @@ func ProvideMetrics() component.Declared[Observer] { metrics.NewReflectTags[util.Empty](), ) + updateTriggerHandle := metrics.Register( + deps.Registry(), + "aggregator_update_trigger_spin", + "A spin of aggregator update-trigger controller", + metrics.IntCounter(), + metrics.NewReflectTags[updateTriggerTags](), + ) + return Observer{ StartReconcile: func(ctx context.Context, _ StartReconcile) (context.Context, context.CancelFunc) { ctx = context.WithValue(ctx, reconcileStartTime{}, time.Now()) @@ -164,6 +177,18 @@ func ProvideMetrics() component.Declared[Observer] { nextEventPoolDrainPeriod.Emit(timeSinceLastDrain, util.Empty{}) } }, + TriggerPodCreate: func(_ context.Context, arg TriggerPodCreate) { + updateTriggerHandle.Emit(1, updateTriggerTags{ + Event: "create", + Error: errors.SerializeTags(arg.Err), + }) + }, + TriggerPodUpdate: func(_ context.Context, arg TriggerPodUpdate) { + updateTriggerHandle.Emit(1, updateTriggerTags{ + Event: "update", + Error: errors.SerializeTags(arg.Err), + }) + }, } }, ) diff --git a/aggregator/observer/observer.go b/aggregator/observer/observer.go index aedc74a..f25e81d 100644 --- a/aggregator/observer/observer.go +++ b/aggregator/observer/observer.go @@ -42,6 +42,9 @@ type Observer struct { NextEventPoolCurrentSize o11y.MonitorFunc[util.Empty, int] NextEventPoolCurrentLatency o11y.MonitorFunc[util.Empty, time.Duration] NextEventPoolSingleDrain o11y.ObserveFunc[NextEventPoolSingleDrain] + + TriggerPodCreate o11y.ObserveFunc[TriggerPodCreate] + TriggerPodUpdate o11y.ObserveFunc[TriggerPodUpdate] } func (Observer) ComponentName() string { return "aggregator" } @@ -95,3 +98,11 @@ type NextEventPoolSingleDrain struct { // Time since the previous drain, None if this is the first run. TimeSinceLastDrain optional.Optional[time.Duration] } + +type TriggerPodCreate struct { + Err error +} + +type TriggerPodUpdate struct { + Err error +} diff --git a/aggregator/synctime/synctime.go b/aggregator/synctime/synctime.go index 47df7fd..206d219 100644 --- a/aggregator/synctime/synctime.go +++ b/aggregator/synctime/synctime.go @@ -25,6 +25,8 @@ import ( "github.com/kubewharf/podseidon/util/errors" "github.com/kubewharf/podseidon/util/optional" "github.com/kubewharf/podseidon/util/util" + + "github.com/kubewharf/podseidon/aggregator/constants" ) // PodInterpreter determines the last timestamp a pod was updated from the object. @@ -51,6 +53,13 @@ func (StatusPodInterpreter) Interpret(pod *corev1.Pod) (time.Time, error) { maxTime = pod.DeletionTimestamp.Time } + if timeStr, isUpdateTrigger := pod.Annotations[constants.AnnotUpdateTriggerTime]; isUpdateTrigger { + updateTime, err := time.Parse(time.RFC3339Nano, timeStr) + if err == nil && updateTime.After(maxTime) { + maxTime = updateTime + } + } + for _, condition := range pod.Status.Conditions { if condition.LastProbeTime.Time.After(maxTime) { maxTime = condition.LastProbeTime.Time diff --git a/aggregator/updatetrigger/updatetrigger.go b/aggregator/updatetrigger/updatetrigger.go new file mode 100644 index 0000000..decc7f7 --- /dev/null +++ b/aggregator/updatetrigger/updatetrigger.go @@ -0,0 +1,174 @@ +package updatetrigger + +import ( + "context" + "flag" + "strings" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/json" + "k8s.io/apimachinery/pkg/util/wait" + + "github.com/kubewharf/podseidon/util/component" + "github.com/kubewharf/podseidon/util/errors" + utilflag "github.com/kubewharf/podseidon/util/flag" + "github.com/kubewharf/podseidon/util/kube" + "github.com/kubewharf/podseidon/util/o11y" + "github.com/kubewharf/podseidon/util/util" + + "github.com/kubewharf/podseidon/aggregator/constants" + "github.com/kubewharf/podseidon/aggregator/observer" +) + +var New = component.Declare( + func(Args) string { return "update-trigger" }, + func(_ Args, fs *flag.FlagSet) Options { + return Options{ + Enable: fs.Bool("enable", false, "trigger a dummy pod update periodically to keep the watch stream alive"), + UpdateFrequency: fs.Duration("update-frequency", time.Second, "frequency of generated update requests"), + PodName: fs.String("dummy-pod-name", "podseidon-update-trigger", "name of the dummy pod"), + PodNamespace: fs.String("dummy-pod-namespace", metav1.NamespaceAll, "namespace of the dummy pod"), + PodLabels: utilflag.Map( + fs, "dummy-pod-labels", map[string]string{}, "labels of the dummy pod", + utilflag.StringParser, utilflag.StringParser, + ), + PodAnnotations: utilflag.Map( + fs, "dummy-pod-annotations", map[string]string{}, "annotations of the dummy pod", + utilflag.StringParser, utilflag.StringParser, + ), + PodImage: fs.String("dummy-pod-image", ":", "image field of the dummy pod; would not get pulled if pod is not scheduled"), + PodScheduler: fs.String( + "dummy-pod-scheduler", + "do-not-schedule", + "schedulerName field of the dummy pod; keep unchanged to avoid scheduling the pod to a node", + ), + } + }, + func(_ Args, requests *component.DepRequests) Deps { + return Deps{ + workerClient: component.DepPtr( + requests, + kube.NewClient(kube.ClientArgs{ClusterName: constants.WorkerClusterName}), + ), + elector: component.DepPtr(requests, kube.NewElector(constants.ElectorArgs)), + observer: o11y.Request[observer.Observer](requests), + } + }, + func(context.Context, Args, Options, Deps) (*State, error) { + return &State{}, nil + }, + component.Lifecycle[Args, Options, Deps, State]{ + Start: func(ctx context.Context, _ *Args, options *Options, deps *Deps, _ *State) error { + if !*options.Enable { + return nil + } + + go func() { + ctx, err := deps.elector.Get().Await(ctx) + if err != nil { + return + } + + spin := spinCreate + obs := func(err error) { + deps.observer.Get().TriggerPodCreate(ctx, observer.TriggerPodCreate{Err: err}) + } + + wait.UntilWithContext(ctx, func(ctx context.Context) { + err := spin(ctx, deps.workerClient.Get(), options) + obs(err) + + if err == nil { + spin = spinUpdate + obs = func(err error) { + deps.observer.Get().TriggerPodUpdate(ctx, observer.TriggerPodUpdate{Err: err}) + } + } + }, *options.UpdateFrequency) + }() + + return nil + }, + Join: nil, + HealthChecks: nil, + }, + func(*component.Data[Args, Options, Deps, State]) util.Empty { + return util.Empty{} + }, +) + +type Args struct{} + +type Options struct { + Enable *bool + UpdateFrequency *time.Duration + + PodName *string + PodNamespace *string + PodLabels *map[string]string + PodAnnotations *map[string]string + PodImage *string + PodScheduler *string +} + +type Deps struct { + workerClient component.Dep[*kube.Client] + elector component.Dep[*kube.Elector] + observer component.Dep[observer.Observer] +} + +type State struct{} + +func spinCreate(ctx context.Context, client *kube.Client, options *Options) error { + annotations := map[string]string{ + constants.AnnotUpdateTriggerTime: time.Now().Format(time.RFC3339Nano), + } + for k, v := range *options.PodAnnotations { + annotations[k] = v + } + + _, err := client.NativeClientSet().CoreV1().Pods(*options.PodNamespace).Create(ctx, &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: *options.PodNamespace, + Name: *options.PodName, + Labels: *options.PodLabels, + Annotations: *options.PodAnnotations, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "main", Image: *options.PodImage}, + }, + SchedulerName: *options.PodScheduler, + }, + }, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { + return errors.TagWrapf("CreatePod", err, "create trigger pod on worker apiserver") + } + + return nil +} + +func spinUpdate(ctx context.Context, client *kube.Client, options *Options) error { + patchJson, err := json.Marshal([]map[string]string{{ + "op": "replace", + "path": "/metadata/annotations/" + strings.ReplaceAll(constants.AnnotUpdateTriggerTime, "/", "~1"), + "value": time.Now().Format(time.RFC3339Nano), + }}) + if err != nil { + return errors.TagWrapf("EncodePatch", err, "encode patch json") + } + + _, err = client.NativeClientSet(). + CoreV1(). + Pods(*options.PodNamespace). + Patch(ctx, *options.PodName, types.JSONPatchType, patchJson, metav1.PatchOptions{}) + if err != nil { + return errors.TagWrapf("PatchPod", err, "patch trigger pod on worker apiserver") + } + + return nil +} diff --git a/allinone/main.go b/allinone/main.go index a83612a..8449bce 100644 --- a/allinone/main.go +++ b/allinone/main.go @@ -27,6 +27,7 @@ import ( "github.com/kubewharf/podseidon/aggregator/aggregator" aggregatorobserver "github.com/kubewharf/podseidon/aggregator/observer" + "github.com/kubewharf/podseidon/aggregator/updatetrigger" "github.com/kubewharf/podseidon/generator/generator" generatorobserver "github.com/kubewharf/podseidon/generator/observer" "github.com/kubewharf/podseidon/generator/resource" @@ -47,6 +48,7 @@ func main() { aggregatorobserver.Provide, webhookobserver.Provide, component.RequireDep(aggregator.DefaultArg()), + component.RequireDep(updatetrigger.New(updatetrigger.Args{})), component.RequireDep(generator.NewController( generator.ControllerArgs{ Types: []component.Declared[resource.TypeProvider]{ diff --git a/chart/templates/_aggregator.yaml b/chart/templates/_aggregator.yaml index 8ef22e1..430494b 100644 --- a/chart/templates/_aggregator.yaml +++ b/chart/templates/_aggregator.yaml @@ -56,6 +56,19 @@ aggregator-worker-concurrency: {{toJson .main.Values.aggregator.workerCount}} aggregator-cell-id: {{toJson .main.Values.release.workerCellId}} aggregator-pod-label-selector: {{toJson .main.Values.aggregator.podLabelSelector}} aggregator-informer-synctime-algorithm: {{toJson .main.Values.aggregator.syncTimeAlgorithm}} + +{{- with .main.Values.aggregator.updateTrigger}} +update-trigger-enable: {{toJson .enable}} +{{- if .enable}} +update-trigger-update-frequency: {{toJson .updateFrequency}} +update-trigger-dummy-pod-name: {{toJson .dummyPod.name}} +update-trigger-dummy-pod-namespace: {{toJson .dummyPod.namespace}} +update-trigger-dummy-pod-labels: {{toJson .dummyPod.labels}} +update-trigger-dummy-pod-annotations: {{toJson .dummyPod.annotations}} +update-trigger-dummy-pod-image: {{toJson .dummyPod.image}} +update-trigger-dummy-pod-scheduler: {{toJson .dummyPod.scheduler}} +{{- end}} +{{- end}} {{- end}} {{- define "podseidon.aggregator.env.yaml"}} diff --git a/chart/values.yaml b/chart/values.yaml index dca8d3a..bcb61b5 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -123,6 +123,18 @@ aggregator: syncTimeAlgorithm: clock # "clock" to use informer event time, "status" to use last inferred timestamp + updateTrigger: # periodically update a pod to trigger watch events + enable: false + updateFrequency: 1s # update frequency for the dummy pod + dummyPod: # metadata of the dummy pod created + name: podseidon-update-trigger + namespace: default + labels: # ensure that `labels` is matched by `podLabelSelector` above + podseidon-update-trigger-dummy-pod: "true" + annotations: {} + image: ":" # an arbitrary value for the image field, must be non-empty to allow creation + scheduler: do-not-schedule # an arbitrary name that prevents actual schedulers from assigning this pod to a node + webhook: replicas: 3 minReadySeconds: 60 diff --git a/util/flag/map.go b/util/flag/map.go new file mode 100644 index 0000000..7f3e5fa --- /dev/null +++ b/util/flag/map.go @@ -0,0 +1,90 @@ +package utilflag + +import ( + "flag" + "fmt" + "strings" + + "github.com/kubewharf/podseidon/util/errors" + "github.com/kubewharf/podseidon/util/util" +) + +var ErrNoEqual = errors.TagErrorf("ErrNoEqual", "map options expect values in the form k1=v1,k2=v2,k3=v3") + +type Parser[T any] func(string) (T, error) + +var StringParser Parser[string] = func(s string) (string, error) { return s, nil } + +// Registers a flag that accepts a comma-separated list of equal-delimited map entries. +// If the same key is specified multiple times, the last occurrence wins. +func Map[K comparable, V any]( + fs *flag.FlagSet, + name string, + defaultValue map[K]V, + usage string, + keyParser Parser[K], + valueParser Parser[V], +) *map[K]V { + value := &mapValue[K, V]{value: defaultValue, keyParser: keyParser, valueParser: valueParser} + fs.Var(value, name, usage) + + return &value.value +} + +type mapValue[K comparable, V any] struct { + value map[K]V + keyParser Parser[K] + valueParser Parser[V] +} + +func (*mapValue[K, V]) Type() string { + return fmt.Sprintf("map[%s]%s", util.TypeName[K](), util.TypeName[V]()) +} + +func (mv *mapValue[K, V]) String() string { + var output strings.Builder + + for entryKey, entryValue := range mv.value { + if output.Len() == 0 { + _, _ = output.WriteRune(',') + } + + // strings.Builder is infallible + _, _ = output.WriteString(fmt.Sprint(entryKey)) + _, _ = output.WriteRune('=') + _, _ = output.WriteString(fmt.Sprint(entryValue)) + } + + return output.String() +} + +func (mv *mapValue[K, V]) Set(input string) error { + if input == "" { + mv.value = map[K]V{} + return nil + } + + entries := strings.Split(input, ",") + mv.value = make(map[K]V, len(entries)) + + for _, entry := range entries { + eq := strings.IndexRune(entry, '=') + if eq == -1 { + return ErrNoEqual + } + + entryKey, err := mv.keyParser(entry[:eq]) + if err != nil { + return err + } + + entryValue, err := mv.valueParser(entry[eq+1:]) + if err != nil { + return err + } + + mv.value[entryKey] = entryValue + } + + return nil +}