Skip to content

Commit

Permalink
Move apply function to internal/k8s
Browse files Browse the repository at this point in the history
  • Loading branch information
ykadowak committed Feb 8, 2024
1 parent ceeefd7 commit 8ecd778
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 76 deletions.
84 changes: 84 additions & 0 deletions internal/k8s/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,24 @@ package client

import (
"context"
"errors"
"fmt"

snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
"github.com/vdaas/vald/internal/log"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/watch"
applycorev1 "k8s.io/client-go/applyconfigurations/core/v1"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
cli "sigs.k8s.io/controller-runtime/pkg/client"
)
Expand Down Expand Up @@ -174,3 +181,80 @@ func (*client) LabelSelector(key string, op selection.Operator, vals []string) (
}
return labels.NewSelector().Add(*requirements), nil
}

type Ssa struct {
client Client
fieldManager string
}

func NewSsa(fieldManager string) (Ssa, error) {
client, err := New()
if err != nil {
return Ssa{}, err
}

return Ssa{
client: client,
fieldManager: fieldManager,
}, nil
}

func (s *Ssa) ApplyPodAnnotations(ctx context.Context, name, namespace string, entries map[string]string) error {
var podList corev1.PodList
if err := s.client.List(ctx, &podList, &cli.ListOptions{
Namespace: namespace,
FieldSelector: fields.OneTermEqualSelector("metadata.name", name),
}); err != nil {
return err
}

if len(podList.Items) == 0 {
return errors.New("agent pod not found on exporting metrics")
}

if len(podList.Items) >= 2 {
return errors.New("multiple agent pods found on exporting metrics. pods with same name exist in the same namespace?")
}

pod := podList.Items[0]

// FIXME: delete this
log.Debugf("found pod: %s", pod.GetName())

curApplyConfig, err := applycorev1.ExtractPod(&pod, s.fieldManager)
if err != nil {
return err
}

// try server side apply
annotations := pod.GetObjectMeta().GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}
for k, v := range entries {
annotations[k] = v
}
expectPod := applycorev1.Pod(name, namespace).
WithAnnotations(annotations)

obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(expectPod)
if err != nil {
return err
}

if equality.Semantic.DeepEqual(expectPod, curApplyConfig) {
// FIXME: delete this
log.Debug("no change in pod spec")
return nil
}

patch := &unstructured.Unstructured{Object: obj}
if err := s.client.Patch(ctx, patch, cli.Apply, &cli.PatchOptions{
FieldManager: s.fieldManager,
Force: ptr.To(true),
}); err != nil {
return err
}

return nil
}
84 changes: 8 additions & 76 deletions pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@ import (
"github.com/vdaas/vald/pkg/agent/internal/kvs"
"github.com/vdaas/vald/pkg/agent/internal/metadata"
"github.com/vdaas/vald/pkg/agent/internal/vqueue"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
kruntime "k8s.io/apimachinery/pkg/runtime"
applycorev1 "k8s.io/client-go/applyconfigurations/core/v1"
)

type NGT interface {
Expand Down Expand Up @@ -154,7 +149,7 @@ type ngt struct {
isReadReplica bool
enableExportIndexInfo bool
exportIndexInfoDuration time.Duration
client client.Client
serverSideApply client.Ssa
}

const (
Expand All @@ -166,6 +161,7 @@ const (
originIndexDirName = "origin"
brokenIndexDirName = "broken"

fieldManager = "vald-agent-index-controller"
uncommittedAnnotationsKey = "vald.vdaas.org/uncommitted"
unsavedProcessedVqAnnotationsKey = "vald.vdaas.org/unsaved-processed-vq"
unsavedCreateIndexExecutionNumAnnotationsKey = "vald.vdaas.org/unsaved-create-index-execution-num"
Expand Down Expand Up @@ -237,11 +233,11 @@ func New(cfg *config.NGT, opts ...Option) (nn NGT, err error) {
n.saving.Store(false)

if n.enableExportIndexInfo {
c, err := client.New()
ssa, err := client.NewSsa(fieldManager)
if err != nil {
return nil, fmt.Errorf("failed to create kubernetes client: %w", err)
return nil, fmt.Errorf("failed to create server side apply client: %w", err)
}
n.client = c
n.serverSideApply = ssa
}

return n, nil
Expand Down Expand Up @@ -906,7 +902,7 @@ func (n *ngt) Start(ctx context.Context) <-chan error {
if n.enableExportIndexInfo {
log.Debug("k8sTick: flush index info to k8s resource")
k, v := n.uncommittedEntry()
err = n.updatePodAnnotations(ctx, map[string]string{k: v})
err = n.serverSideApply.ApplyPodAnnotations(ctx, n.podName, n.podNamespace, map[string]string{k: v})
}
}
if err != nil && err != errors.ErrUncommittedIndexNotFound {
Expand Down Expand Up @@ -1844,70 +1840,6 @@ func (n *ngt) toSearchResponse(sr []algorithm.SearchResult) (res *payload.Search
return res, nil
}

// TODO: add processed vq num, lastSavedTimestamp etc....
//
// wrap up common logics -> entriesを受け取りそれをただSSAで書き込むだけの部分に分ける
func (n *ngt) updatePodAnnotations(ctx context.Context, entries map[string]string) error {
// FIXME: define as const somewhere
fieldManager := "vald-agent-index-controller"

var podList client.PodList
if err := n.client.List(ctx, &podList, &client.ListOptions{
Namespace: n.podNamespace,
FieldSelector: fields.OneTermEqualSelector("metadata.name", n.podName),
}); err != nil {
return err
}

if len(podList.Items) == 0 {
return errors.New("agent pod not found on exporting metrics")
}

if len(podList.Items) >= 2 {
return errors.New("multiple agent pods found on exporting metrics. pods with same name exist in the same namespace?")
}

pod := podList.Items[0]
log.Debugf("found pod: %s", pod.GetName())

curApplyConfig, err := applycorev1.ExtractPod(&pod, fieldManager)
if err != nil {
return err
}

// try server side apply
// FIXME: move this to internal/k8s
annotations := pod.GetObjectMeta().GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}
for k, v := range entries {
annotations[k] = v
}
expectPod := applycorev1.Pod(n.podName, n.podNamespace).
WithAnnotations(annotations)

obj, err := kruntime.DefaultUnstructuredConverter.ToUnstructured(expectPod)
if err != nil {
return err
}

if equality.Semantic.DeepEqual(expectPod, curApplyConfig) {
log.Debug("no change in pod spec")
return nil
}

patch := &unstructured.Unstructured{Object: obj}
if err := n.client.Patch(ctx, patch, client.ServerSideApply, &client.PatchOptions{
FieldManager: fieldManager,
Force: client.PointerBool(true),
}); err != nil {
return err
}

return nil
}

func (n *ngt) uncommittedEntry() (k, v string) {
return uncommittedAnnotationsKey, strconv.FormatUint(n.InsertVQueueBufferLen()+n.DeleteVQueueBufferLen(), 10)
}
Expand Down Expand Up @@ -1937,7 +1869,7 @@ func (n *ngt) exportMetricsOnCreateIndex(ctx context.Context) error {
k, v = n.unsavedNumberOfCreateIndexExecutionEntry()
entries[k] = v

return n.updatePodAnnotations(ctx, entries)
return n.serverSideApply.ApplyPodAnnotations(ctx, n.podName, n.podNamespace, entries)
}

func (n *ngt) exportMetricsOnSaveIndex(ctx context.Context) error {
Expand All @@ -1952,5 +1884,5 @@ func (n *ngt) exportMetricsOnSaveIndex(ctx context.Context) error {
k, v = n.processedVqEntries()
entries[k] = v

return n.updatePodAnnotations(ctx, entries)
return n.serverSideApply.ApplyPodAnnotations(ctx, n.podName, n.podNamespace, entries)
}

0 comments on commit 8ecd778

Please sign in to comment.