From 25813ff43b6725e8bf477801d4e4e9cf6883faec Mon Sep 17 00:00:00 2001 From: Anurag Rajawat Date: Thu, 11 Jul 2024 19:17:12 +0530 Subject: [PATCH] fix: Optimise performance Signed-off-by: Anurag Rajawat --- relay-server/common/common.go | 11 ---- relay-server/go.mod | 5 +- relay-server/server/k8sHandler.go | 93 ++++++++++++++++++------------ relay-server/server/relayServer.go | 55 ++++++++---------- 4 files changed, 81 insertions(+), 83 deletions(-) diff --git a/relay-server/common/common.go b/relay-server/common/common.go index 889c368..c3a2669 100644 --- a/relay-server/common/common.go +++ b/relay-server/common/common.go @@ -4,21 +4,10 @@ package common import ( - "encoding/json" "os" "path/filepath" ) -// ============ // -// == Common == // -// ============ // - -// Clone Function -func Clone(src, dst interface{}) error { - arr, _ := json.Marshal(src) - return json.Unmarshal(arr, dst) -} - // ================ // // == Kubernetes == // // ================ // diff --git a/relay-server/go.mod b/relay-server/go.mod index bbcb47e..8414ba5 100644 --- a/relay-server/go.mod +++ b/relay-server/go.mod @@ -13,6 +13,7 @@ require ( github.com/cenkalti/backoff/v4 v4.2.1 github.com/dustin/go-humanize v1.0.1 github.com/elastic/go-elasticsearch/v7 v7.17.10 + github.com/golang/protobuf v1.5.4 github.com/google/uuid v1.6.0 github.com/kubearmor/KubeArmor/KubeArmor v0.0.0-20240412061210-e4422dd02342 github.com/kubearmor/KubeArmor/protobuf v0.0.0-20240315075053-fee50c9428b9 @@ -20,6 +21,7 @@ require ( go.uber.org/zap v1.27.0 golang.org/x/sync v0.7.0 google.golang.org/grpc v1.63.2 + k8s.io/api v0.29.2 k8s.io/apimachinery v0.29.2 k8s.io/client-go v0.29.2 ) @@ -33,8 +35,8 @@ require ( github.com/go-openapi/jsonreference v0.21.0 // indirect github.com/go-openapi/swag v0.23.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 // indirect + github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 // indirect github.com/hashicorp/hcl v1.0.0 // indirect @@ -71,7 +73,6 @@ require ( gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/api v0.29.2 // indirect k8s.io/klog/v2 v2.120.1 // indirect k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect k8s.io/utils v0.0.0-20240310230437-4693a0247e57 // indirect diff --git a/relay-server/server/k8sHandler.go b/relay-server/server/k8sHandler.go index da73177..f19d436 100644 --- a/relay-server/server/k8sHandler.go +++ b/relay-server/server/k8sHandler.go @@ -14,16 +14,20 @@ import ( "net/http" "os" "path/filepath" - "reflect" "strings" + "sync" "time" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" + kl "github.com/kubearmor/kubearmor-relay-server/relay-server/common" kg "github.com/kubearmor/kubearmor-relay-server/relay-server/log" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" - rest "k8s.io/client-go/rest" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" ) @@ -228,52 +232,65 @@ func (kh *K8sHandler) DoRequest(cmd string, data interface{}, path string) ([]by return resBody, nil } -// ========== // -// == Pods == // -// ========== // +func (kh *K8sHandler) WatchKubeArmorPods(ctx context.Context, wg *sync.WaitGroup, ipsChan chan string) { + defer func() { + close(ipsChan) + wg.Done() + }() -func containsElement(slice interface{}, element interface{}) bool { - switch reflect.TypeOf(slice).Kind() { - case reflect.Slice: - s := reflect.ValueOf(slice) + // Get the KubeArmor pods IP that were added before relay itself. + once := sync.Once{} + once.Do(func() { + kh.findExistingKaPodsIp(ctx, ipsChan) + }) - for i := 0; i < s.Len(); i++ { - val := s.Index(i).Interface() - if reflect.DeepEqual(val, element) { - return true - } - } - } - return false + podInformer := kh.getKaPodInformer(ipsChan) + podInformer.Run(ctx.Done()) } -// GetKubeArmorNodes Function -func (kh *K8sHandler) GetKubeArmorNodes() []string { - nodeIPs := []string{} +func (kh *K8sHandler) getKaPodInformer(ipsChan chan string) cache.SharedIndexInformer { + option := informers.WithTweakListOptions(func(lo *metav1.ListOptions) { + lo.LabelSelector = "kubearmor-app=kubearmor" + }) + + factory := informers.NewSharedInformerFactoryWithOptions(kh.K8sClient, 0, option) + informer := factory.Core().V1().Pods().Informer() + + _, _ = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + pod, ok := obj.(*corev1.Pod) + if ok { + if pod.Status.PodIP != "" { + ipsChan <- pod.Status.PodIP + } + } + }, + UpdateFunc: func(old, new interface{}) { + newPod, ok := new.(*corev1.Pod) + if ok { + if newPod.Status.PodIP != "" { + ipsChan <- newPod.Status.PodIP + } + } + }, + }) + + return informer +} - if !kl.IsK8sEnv() { // not Kubernetes - return nodeIPs - } +func (kh *K8sHandler) findExistingKaPodsIp(ctx context.Context, ipsChan chan string) { + pods, err := kh.K8sClient.CoreV1().Pods("").List(ctx, metav1.ListOptions{ + LabelSelector: "kubearmor-app=kubearmor", + }) - pods, err := kh.K8sClient.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{}) if err != nil { - return nodeIPs + kg.Errf("failed to list KubeArmor pods: %v", err) + return } for _, pod := range pods.Items { - if val, ok := pod.ObjectMeta.Labels["kubearmor-app"]; !ok { - continue - } else if val != "kubearmor" { - continue - } - if pod.Status.PodIP == "" { - kg.Printf("pod.Status=%+v", pod.Status) - } - - if pod.Status.PodIP != "" && !containsElement(nodeIPs, pod.Status.PodIP) { - nodeIPs = append(nodeIPs, pod.Status.PodIP) + if pod.Status.PodIP != "" { + ipsChan <- pod.Status.PodIP } } - - return nodeIPs } diff --git a/relay-server/server/relayServer.go b/relay-server/server/relayServer.go index 6c46030..5770faf 100644 --- a/relay-server/server/relayServer.go +++ b/relay-server/server/relayServer.go @@ -23,7 +23,6 @@ import ( "google.golang.org/grpc/keepalive" "google.golang.org/grpc/status" - kl "github.com/kubearmor/kubearmor-relay-server/relay-server/common" cfg "github.com/kubearmor/kubearmor-relay-server/relay-server/config" kg "github.com/kubearmor/kubearmor-relay-server/relay-server/log" ) @@ -434,13 +433,7 @@ func (rs *RelayServer) AddMsgFromBuffChan() { for Running { select { - case res := <-MsgBufferChannel: - msg := pb.Message{} - - if err := kl.Clone(*res, &msg); err != nil { - kg.Warnf("Failed to clone a message (%v)", *res) - continue - } + case msg := <-MsgBufferChannel: if stdoutmsg { tel, _ := json.Marshal(msg) fmt.Printf("%s\n", string(tel)) @@ -448,7 +441,7 @@ func (rs *RelayServer) AddMsgFromBuffChan() { MsgLock.RLock() for uid := range MsgStructs { select { - case MsgStructs[uid].Broadcast <- (&msg): + case MsgStructs[uid].Broadcast <- msg: default: } } @@ -493,18 +486,12 @@ func (lc *LogClient) WatchAlerts(wg *sync.WaitGroup, stop chan struct{}, errCh c kg.Print("Stopped watching alerts from " + lc.Server) } -// AddAlertFromBuffChan Adds ALert from AlertBufferChannel into AlertStructs +// AddAlertFromBuffChan Adds Alert from AlertBufferChannel into AlertStructs func (rs *RelayServer) AddAlertFromBuffChan() { for Running { select { - case res := <-AlertBufferChannel: - alert := pb.Alert{} - - if err := kl.Clone(*res, &alert); err != nil { - kg.Warnf("Failed to clone an alert (%v)", *res) - continue - } + case alert := <-AlertBufferChannel: if stdoutalerts { tel, _ := json.Marshal(alert) fmt.Printf("%s\n", string(tel)) @@ -512,7 +499,7 @@ func (rs *RelayServer) AddAlertFromBuffChan() { AlertLock.RLock() for uid := range AlertStructs { select { - case AlertStructs[uid].Broadcast <- (&alert): + case AlertStructs[uid].Broadcast <- alert: default: } } @@ -561,18 +548,14 @@ func (rs *RelayServer) AddLogFromBuffChan() { for Running { select { - case res := <-LogBufferChannel: - log := pb.Log{} - if err := kl.Clone(*res, &log); err != nil { - kg.Warnf("Failed to clone a log (%v)", *res) - } + case log := <-LogBufferChannel: if stdoutlogs { tel, _ := json.Marshal(log) fmt.Printf("%s\n", string(tel)) } for uid := range LogStructs { select { - case LogStructs[uid].Broadcast <- (&log): + case LogStructs[uid].Broadcast <- log: default: } } @@ -816,19 +799,27 @@ func (rs *RelayServer) GetFeedsFromNodes() { if K8s.InitK8sClient() { kg.Print("Initialized the Kubernetes client") + ipsChan := make(chan string) + if Running { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + rs.WgServer.Add(1) + go K8s.WatchKubeArmorPods(ctx, &rs.WgServer, ipsChan) + } else { + close(ipsChan) + } + for Running { - newNodes := K8s.GetKubeArmorNodes() - for _, nodeIP := range newNodes { + select { + case ip := <-ipsChan: ClientListLock.Lock() - if _, ok := ClientList[nodeIP]; !ok { - ClientList[nodeIP] = 1 - go connectToKubeArmor(nodeIP, rs.Port) + if _, ok := ClientList[ip]; !ok { + ClientList[ip] = 1 + go connectToKubeArmor(ip, rs.Port) } ClientListLock.Unlock() } - - time.Sleep(time.Second * 1) + time.Sleep(10 * time.Second) } - } }