diff --git a/internal/k8s/client/client.go b/internal/k8s/client/client.go index f7b372ab1c3..9ded25a9cfc 100644 --- a/internal/k8s/client/client.go +++ b/internal/k8s/client/client.go @@ -19,17 +19,24 @@ package client import ( "context" + "errors" "fmt" snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1" + "github.com/vdaas/vald/internal/log" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/watch" + applycorev1 "k8s.io/client-go/applyconfigurations/core/v1" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/utils/pointer" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" cli "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -174,3 +181,80 @@ func (*client) LabelSelector(key string, op selection.Operator, vals []string) ( } return labels.NewSelector().Add(*requirements), nil } + +type Ssa struct { + client Client + fieldManager string +} + +func NewSsa(fieldManager string) (Ssa, error) { + client, err := New() + if err != nil { + return Ssa{}, err + } + + return Ssa{ + client: client, + fieldManager: fieldManager, + }, nil +} + +func (s *Ssa) ApplyPodAnnotations(ctx context.Context, name, namespace string, entries map[string]string) error { + var podList corev1.PodList + if err := s.client.List(ctx, &podList, &cli.ListOptions{ + Namespace: namespace, + FieldSelector: fields.OneTermEqualSelector("metadata.name", name), + }); err != nil { + return err + } + + if len(podList.Items) == 0 { + return errors.New("agent pod not found on exporting metrics") + } + + if len(podList.Items) >= 2 { + return errors.New("multiple agent pods found on exporting metrics. pods with same name exist in the same namespace?") + } + + pod := podList.Items[0] + + // FIXME: delete this + log.Debugf("found pod: %s", pod.GetName()) + + curApplyConfig, err := applycorev1.ExtractPod(&pod, s.fieldManager) + if err != nil { + return err + } + + // try server side apply + annotations := pod.GetObjectMeta().GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + } + for k, v := range entries { + annotations[k] = v + } + expectPod := applycorev1.Pod(name, namespace). + WithAnnotations(annotations) + + obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(expectPod) + if err != nil { + return err + } + + if equality.Semantic.DeepEqual(expectPod, curApplyConfig) { + // FIXME: delete this + log.Debug("no change in pod spec") + return nil + } + + patch := &unstructured.Unstructured{Object: obj} + if err := s.client.Patch(ctx, patch, cli.Apply, &cli.PatchOptions{ + FieldManager: s.fieldManager, + Force: ptr.To(true), + }); err != nil { + return err + } + + return nil +} diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index 6dfac331a5b..4fbcc142b02 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -49,11 +49,6 @@ import ( "github.com/vdaas/vald/pkg/agent/internal/kvs" "github.com/vdaas/vald/pkg/agent/internal/metadata" "github.com/vdaas/vald/pkg/agent/internal/vqueue" - "k8s.io/apimachinery/pkg/api/equality" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/fields" - kruntime "k8s.io/apimachinery/pkg/runtime" - applycorev1 "k8s.io/client-go/applyconfigurations/core/v1" ) type NGT interface { @@ -154,7 +149,7 @@ type ngt struct { isReadReplica bool enableExportIndexInfo bool exportIndexInfoDuration time.Duration - client client.Client + serverSideApply client.Ssa } const ( @@ -166,6 +161,7 @@ const ( originIndexDirName = "origin" brokenIndexDirName = "broken" + fieldManager = "vald-agent-index-controller" uncommittedAnnotationsKey = "vald.vdaas.org/uncommitted" unsavedProcessedVqAnnotationsKey = "vald.vdaas.org/unsaved-processed-vq" unsavedCreateIndexExecutionNumAnnotationsKey = "vald.vdaas.org/unsaved-create-index-execution-num" @@ -237,11 +233,11 @@ func New(cfg *config.NGT, opts ...Option) (nn NGT, err error) { n.saving.Store(false) if n.enableExportIndexInfo { - c, err := client.New() + ssa, err := client.NewSsa(fieldManager) if err != nil { - return nil, fmt.Errorf("failed to create kubernetes client: %w", err) + return nil, fmt.Errorf("failed to create server side apply client: %w", err) } - n.client = c + n.serverSideApply = ssa } return n, nil @@ -906,7 +902,7 @@ func (n *ngt) Start(ctx context.Context) <-chan error { if n.enableExportIndexInfo { log.Debug("k8sTick: flush index info to k8s resource") k, v := n.uncommittedEntry() - err = n.updatePodAnnotations(ctx, map[string]string{k: v}) + err = n.serverSideApply.ApplyPodAnnotations(ctx, n.podName, n.podNamespace, map[string]string{k: v}) } } if err != nil && err != errors.ErrUncommittedIndexNotFound { @@ -1844,70 +1840,6 @@ func (n *ngt) toSearchResponse(sr []algorithm.SearchResult) (res *payload.Search return res, nil } -// TODO: add processed vq num, lastSavedTimestamp etc.... -// -// wrap up common logics -> entriesを受け取りそれをただSSAで書き込むだけの部分に分ける -func (n *ngt) updatePodAnnotations(ctx context.Context, entries map[string]string) error { - // FIXME: define as const somewhere - fieldManager := "vald-agent-index-controller" - - var podList client.PodList - if err := n.client.List(ctx, &podList, &client.ListOptions{ - Namespace: n.podNamespace, - FieldSelector: fields.OneTermEqualSelector("metadata.name", n.podName), - }); err != nil { - return err - } - - if len(podList.Items) == 0 { - return errors.New("agent pod not found on exporting metrics") - } - - if len(podList.Items) >= 2 { - return errors.New("multiple agent pods found on exporting metrics. pods with same name exist in the same namespace?") - } - - pod := podList.Items[0] - log.Debugf("found pod: %s", pod.GetName()) - - curApplyConfig, err := applycorev1.ExtractPod(&pod, fieldManager) - if err != nil { - return err - } - - // try server side apply - // FIXME: move this to internal/k8s - annotations := pod.GetObjectMeta().GetAnnotations() - if annotations == nil { - annotations = make(map[string]string) - } - for k, v := range entries { - annotations[k] = v - } - expectPod := applycorev1.Pod(n.podName, n.podNamespace). - WithAnnotations(annotations) - - obj, err := kruntime.DefaultUnstructuredConverter.ToUnstructured(expectPod) - if err != nil { - return err - } - - if equality.Semantic.DeepEqual(expectPod, curApplyConfig) { - log.Debug("no change in pod spec") - return nil - } - - patch := &unstructured.Unstructured{Object: obj} - if err := n.client.Patch(ctx, patch, client.ServerSideApply, &client.PatchOptions{ - FieldManager: fieldManager, - Force: client.PointerBool(true), - }); err != nil { - return err - } - - return nil -} - func (n *ngt) uncommittedEntry() (k, v string) { return uncommittedAnnotationsKey, strconv.FormatUint(n.InsertVQueueBufferLen()+n.DeleteVQueueBufferLen(), 10) } @@ -1937,7 +1869,7 @@ func (n *ngt) exportMetricsOnCreateIndex(ctx context.Context) error { k, v = n.unsavedNumberOfCreateIndexExecutionEntry() entries[k] = v - return n.updatePodAnnotations(ctx, entries) + return n.serverSideApply.ApplyPodAnnotations(ctx, n.podName, n.podNamespace, entries) } func (n *ngt) exportMetricsOnSaveIndex(ctx context.Context) error { @@ -1952,5 +1884,5 @@ func (n *ngt) exportMetricsOnSaveIndex(ctx context.Context) error { k, v = n.processedVqEntries() entries[k] = v - return n.updatePodAnnotations(ctx, entries) + return n.serverSideApply.ApplyPodAnnotations(ctx, n.podName, n.podNamespace, entries) }