Skip to content

Commit

Permalink
Merge pull request #3 from SOF3/aggregator-update-trigger
Browse files Browse the repository at this point in the history
feat(aggregator): add update-trigger
  • Loading branch information
SOF3 authored Nov 24, 2024
2 parents 9c15fbf + 5ef97bf commit cde4f0b
Show file tree
Hide file tree
Showing 12 changed files with 373 additions and 15 deletions.
21 changes: 6 additions & 15 deletions aggregator/aggregator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,11 @@ import (
"github.com/kubewharf/podseidon/util/util"
"github.com/kubewharf/podseidon/util/worker"

"github.com/kubewharf/podseidon/aggregator/constants"
"github.com/kubewharf/podseidon/aggregator/observer"
"github.com/kubewharf/podseidon/aggregator/synctime"
)

const (
CoreClusterName kube.ClusterName = "core"
WorkerClusterName kube.ClusterName = "worker"
)

var NewController = component.Declare(
func(ControllerArgs) string { return "aggregator" },
func(args ControllerArgs, fs *flag.FlagSet) ControllerOptions {
Expand Down Expand Up @@ -90,30 +86,25 @@ var NewController = component.Declare(
}
},
func(args ControllerArgs, requests *component.DepRequests) ControllerDeps {
electorArgs := kube.ElectorArgs{
ElectorName: "aggregator",
ClusterName: WorkerClusterName,
}

return ControllerDeps{
coreClient: component.DepPtr(
requests,
kube.NewClient(kube.ClientArgs{ClusterName: CoreClusterName}),
kube.NewClient(kube.ClientArgs{ClusterName: constants.CoreClusterName}),
),
workerClient: component.DepPtr(
requests,
kube.NewClient(kube.ClientArgs{ClusterName: WorkerClusterName}),
kube.NewClient(kube.ClientArgs{ClusterName: constants.WorkerClusterName}),
),
pprInformer: component.DepPtr(
requests,
pprutil.NewIndexedInformer(pprutil.IndexedInformerArgs{
ClusterName: CoreClusterName,
ClusterName: constants.CoreClusterName,
InformerPhase: "leader",
Elector: optional.Some(electorArgs),
Elector: optional.Some(constants.ElectorArgs),
}),
),
observer: o11y.Request[observer.Observer](requests),
elector: component.DepPtr(requests, kube.NewElector(electorArgs)),
elector: component.DepPtr(requests, kube.NewElector(constants.ElectorArgs)),
worker: component.DepPtr(requests, worker.New[types.NamespacedName](
"aggregator",
args.Clock,
Expand Down
19 changes: 19 additions & 0 deletions aggregator/constants/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package constants

import "github.com/kubewharf/podseidon/util/kube"

const (
CoreClusterName kube.ClusterName = "core"
WorkerClusterName kube.ClusterName = "worker"
)

const LeaderPhase kube.InformerPhase = "main"

const ElectorName kube.ElectorName = "aggregator"

var ElectorArgs = kube.ElectorArgs{
ClusterName: WorkerClusterName,
ElectorName: ElectorName,
}

const AnnotUpdateTriggerTime string = "podseidon.kubewharf.io/update-trigger-time"
2 changes: 2 additions & 0 deletions aggregator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/kubewharf/podseidon/aggregator/aggregator"
aggregatorobserver "github.com/kubewharf/podseidon/aggregator/observer"
"github.com/kubewharf/podseidon/aggregator/updatetrigger"
)

func main() {
Expand All @@ -39,5 +40,6 @@ func main() {
aggregatorobserver.Provide,
pprutilobserver.ProvideInformer,
component.RequireDep(aggregator.DefaultArg()),
component.RequireDep(updatetrigger.New(updatetrigger.Args{})),
)
}
10 changes: 10 additions & 0 deletions aggregator/observer/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ func NewLoggingObserver() Observer {
NextEventPoolCurrentSize: nil,
NextEventPoolCurrentLatency: nil,
NextEventPoolSingleDrain: nil,
TriggerPodCreate: func(ctx context.Context, arg TriggerPodCreate) {
if arg.Err != nil {
klog.FromContext(ctx).Error(arg.Err, "error creating update-trigger pod")
}
},
TriggerPodUpdate: func(ctx context.Context, arg TriggerPodUpdate) {
if arg.Err != nil {
klog.FromContext(ctx).Error(arg.Err, "error updating update-trigger pod")
}
},
}
}

Expand Down
25 changes: 25 additions & 0 deletions aggregator/observer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ func ProvideMetrics() component.Declared[Observer] {

type reconcileStartTime struct{}

type updateTriggerTags struct {
Event string
Error string
}

reconcileHandle := metrics.Register(
deps.Registry(),
"aggregator_reconcile",
Expand Down Expand Up @@ -106,6 +111,14 @@ func ProvideMetrics() component.Declared[Observer] {
metrics.NewReflectTags[util.Empty](),
)

updateTriggerHandle := metrics.Register(
deps.Registry(),
"aggregator_update_trigger_spin",
"A spin of aggregator update-trigger controller",
metrics.IntCounter(),
metrics.NewReflectTags[updateTriggerTags](),
)

return Observer{
StartReconcile: func(ctx context.Context, _ StartReconcile) (context.Context, context.CancelFunc) {
ctx = context.WithValue(ctx, reconcileStartTime{}, time.Now())
Expand Down Expand Up @@ -164,6 +177,18 @@ func ProvideMetrics() component.Declared[Observer] {
nextEventPoolDrainPeriod.Emit(timeSinceLastDrain, util.Empty{})
}
},
TriggerPodCreate: func(_ context.Context, arg TriggerPodCreate) {
updateTriggerHandle.Emit(1, updateTriggerTags{
Event: "create",
Error: errors.SerializeTags(arg.Err),
})
},
TriggerPodUpdate: func(_ context.Context, arg TriggerPodUpdate) {
updateTriggerHandle.Emit(1, updateTriggerTags{
Event: "update",
Error: errors.SerializeTags(arg.Err),
})
},
}
},
)
Expand Down
11 changes: 11 additions & 0 deletions aggregator/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ type Observer struct {
NextEventPoolCurrentSize o11y.MonitorFunc[util.Empty, int]
NextEventPoolCurrentLatency o11y.MonitorFunc[util.Empty, time.Duration]
NextEventPoolSingleDrain o11y.ObserveFunc[NextEventPoolSingleDrain]

TriggerPodCreate o11y.ObserveFunc[TriggerPodCreate]
TriggerPodUpdate o11y.ObserveFunc[TriggerPodUpdate]
}

func (Observer) ComponentName() string { return "aggregator" }
Expand Down Expand Up @@ -95,3 +98,11 @@ type NextEventPoolSingleDrain struct {
// Time since the previous drain, None if this is the first run.
TimeSinceLastDrain optional.Optional[time.Duration]
}

type TriggerPodCreate struct {
Err error
}

type TriggerPodUpdate struct {
Err error
}
9 changes: 9 additions & 0 deletions aggregator/synctime/synctime.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/kubewharf/podseidon/util/errors"
"github.com/kubewharf/podseidon/util/optional"
"github.com/kubewharf/podseidon/util/util"

"github.com/kubewharf/podseidon/aggregator/constants"
)

// PodInterpreter determines the last timestamp a pod was updated from the object.
Expand All @@ -51,6 +53,13 @@ func (StatusPodInterpreter) Interpret(pod *corev1.Pod) (time.Time, error) {
maxTime = pod.DeletionTimestamp.Time
}

if timeStr, isUpdateTrigger := pod.Annotations[constants.AnnotUpdateTriggerTime]; isUpdateTrigger {
updateTime, err := time.Parse(time.RFC3339Nano, timeStr)
if err == nil && updateTime.After(maxTime) {
maxTime = updateTime
}
}

for _, condition := range pod.Status.Conditions {
if condition.LastProbeTime.Time.After(maxTime) {
maxTime = condition.LastProbeTime.Time
Expand Down
174 changes: 174 additions & 0 deletions aggregator/updatetrigger/updatetrigger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package updatetrigger

import (
"context"
"flag"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/kubewharf/podseidon/util/component"
"github.com/kubewharf/podseidon/util/errors"
utilflag "github.com/kubewharf/podseidon/util/flag"
"github.com/kubewharf/podseidon/util/kube"
"github.com/kubewharf/podseidon/util/o11y"
"github.com/kubewharf/podseidon/util/util"

"github.com/kubewharf/podseidon/aggregator/constants"
"github.com/kubewharf/podseidon/aggregator/observer"
)

var New = component.Declare(
func(Args) string { return "update-trigger" },
func(_ Args, fs *flag.FlagSet) Options {
return Options{
Enable: fs.Bool("enable", false, "trigger a dummy pod update periodically to keep the watch stream alive"),
UpdateFrequency: fs.Duration("update-frequency", time.Second, "frequency of generated update requests"),
PodName: fs.String("dummy-pod-name", "podseidon-update-trigger", "name of the dummy pod"),
PodNamespace: fs.String("dummy-pod-namespace", metav1.NamespaceAll, "namespace of the dummy pod"),
PodLabels: utilflag.Map(
fs, "dummy-pod-labels", map[string]string{}, "labels of the dummy pod",
utilflag.StringParser, utilflag.StringParser,
),
PodAnnotations: utilflag.Map(
fs, "dummy-pod-annotations", map[string]string{}, "annotations of the dummy pod",
utilflag.StringParser, utilflag.StringParser,
),
PodImage: fs.String("dummy-pod-image", ":", "image field of the dummy pod; would not get pulled if pod is not scheduled"),
PodScheduler: fs.String(
"dummy-pod-scheduler",
"do-not-schedule",
"schedulerName field of the dummy pod; keep unchanged to avoid scheduling the pod to a node",
),
}
},
func(_ Args, requests *component.DepRequests) Deps {
return Deps{
workerClient: component.DepPtr(
requests,
kube.NewClient(kube.ClientArgs{ClusterName: constants.WorkerClusterName}),
),
elector: component.DepPtr(requests, kube.NewElector(constants.ElectorArgs)),
observer: o11y.Request[observer.Observer](requests),
}
},
func(context.Context, Args, Options, Deps) (*State, error) {
return &State{}, nil
},
component.Lifecycle[Args, Options, Deps, State]{
Start: func(ctx context.Context, _ *Args, options *Options, deps *Deps, _ *State) error {
if !*options.Enable {
return nil
}

go func() {
ctx, err := deps.elector.Get().Await(ctx)
if err != nil {
return
}

spin := spinCreate
obs := func(err error) {
deps.observer.Get().TriggerPodCreate(ctx, observer.TriggerPodCreate{Err: err})
}

wait.UntilWithContext(ctx, func(ctx context.Context) {
err := spin(ctx, deps.workerClient.Get(), options)
obs(err)

if err == nil {
spin = spinUpdate
obs = func(err error) {
deps.observer.Get().TriggerPodUpdate(ctx, observer.TriggerPodUpdate{Err: err})
}
}
}, *options.UpdateFrequency)
}()

return nil
},
Join: nil,
HealthChecks: nil,
},
func(*component.Data[Args, Options, Deps, State]) util.Empty {
return util.Empty{}
},
)

type Args struct{}

type Options struct {
Enable *bool
UpdateFrequency *time.Duration

PodName *string
PodNamespace *string
PodLabels *map[string]string
PodAnnotations *map[string]string
PodImage *string
PodScheduler *string
}

type Deps struct {
workerClient component.Dep[*kube.Client]
elector component.Dep[*kube.Elector]
observer component.Dep[observer.Observer]
}

type State struct{}

func spinCreate(ctx context.Context, client *kube.Client, options *Options) error {
annotations := map[string]string{
constants.AnnotUpdateTriggerTime: time.Now().Format(time.RFC3339Nano),
}
for k, v := range *options.PodAnnotations {
annotations[k] = v
}

_, err := client.NativeClientSet().CoreV1().Pods(*options.PodNamespace).Create(ctx, &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: *options.PodNamespace,
Name: *options.PodName,
Labels: *options.PodLabels,
Annotations: *options.PodAnnotations,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{Name: "main", Image: *options.PodImage},
},
SchedulerName: *options.PodScheduler,
},
}, metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
return errors.TagWrapf("CreatePod", err, "create trigger pod on worker apiserver")
}

return nil
}

func spinUpdate(ctx context.Context, client *kube.Client, options *Options) error {
patchJson, err := json.Marshal([]map[string]string{{
"op": "replace",
"path": "/metadata/annotations/" + strings.ReplaceAll(constants.AnnotUpdateTriggerTime, "/", "~1"),
"value": time.Now().Format(time.RFC3339Nano),
}})
if err != nil {
return errors.TagWrapf("EncodePatch", err, "encode patch json")
}

_, err = client.NativeClientSet().
CoreV1().
Pods(*options.PodNamespace).
Patch(ctx, *options.PodName, types.JSONPatchType, patchJson, metav1.PatchOptions{})
if err != nil {
return errors.TagWrapf("PatchPod", err, "patch trigger pod on worker apiserver")
}

return nil
}
2 changes: 2 additions & 0 deletions allinone/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/kubewharf/podseidon/aggregator/aggregator"
aggregatorobserver "github.com/kubewharf/podseidon/aggregator/observer"
"github.com/kubewharf/podseidon/aggregator/updatetrigger"
"github.com/kubewharf/podseidon/generator/generator"
"github.com/kubewharf/podseidon/generator/monitor"
generatorobserver "github.com/kubewharf/podseidon/generator/observer"
Expand All @@ -48,6 +49,7 @@ func main() {
aggregatorobserver.Provide,
webhookobserver.Provide,
component.RequireDep(aggregator.DefaultArg()),
component.RequireDep(updatetrigger.New(updatetrigger.Args{})),
component.RequireDep(generator.NewController(
generator.ControllerArgs{
Types: []component.Declared[resource.TypeProvider]{
Expand Down
Loading

0 comments on commit cde4f0b

Please sign in to comment.