From 0e1a2cfdb451f3c3b8b7b4a0eeea863c2ed2c9ef Mon Sep 17 00:00:00 2001 From: AvineshTripathi Date: Thu, 8 Aug 2024 00:39:53 +0530 Subject: [PATCH] feat: added dependency graph support to k8s --- go.mod | 1 + go.sum | 2 + handlers/helper.go | 8 +++ internal/config/load.go | 4 ++ providers/k8s/core/pods.go | 89 +++++++++++++++++++++++++++ providers/k8s/core/replicasets.go | 99 +++++++++++++++++++++++++++++++ providers/k8s/k8s.go | 2 +- providers/providers.go | 2 + utils/k8s_cache.go | 28 +++++++++ 9 files changed, 234 insertions(+), 1 deletion(-) create mode 100644 providers/k8s/core/replicasets.go create mode 100644 utils/k8s_cache.go diff --git a/go.mod b/go.mod index 156d5d38f..530924274 100644 --- a/go.mod +++ b/go.mod @@ -105,6 +105,7 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/golang-jwt/jwt/v5 v5.2.1 // indirect github.com/google/s2a-go v0.1.7 // indirect + github.com/patrickmn/go-cache v2.1.0+incompatible // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect go.opentelemetry.io/otel v1.24.0 // indirect diff --git a/go.sum b/go.sum index b4523d826..e4ddefe21 100644 --- a/go.sum +++ b/go.sum @@ -420,6 +420,8 @@ github.com/oracle/oci-go-sdk v24.3.0+incompatible h1:x4mcfb4agelf1O4/1/auGlZ1lr9 github.com/oracle/oci-go-sdk v24.3.0+incompatible/go.mod h1:VQb79nF8Z2cwLkLS35ukwStZIg5F66tcBccjip/j888= github.com/ovh/go-ovh v1.4.3 h1:Gs3V823zwTFpzgGLZNI6ILS4rmxZgJwJCz54Er9LwD0= github.com/ovh/go-ovh v1.4.3/go.mod h1:AkPXVtgwB6xlKblMjRKJJmjRp+ogrE7fz2lVgcQY8SY= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pelletier/go-toml/v2 v2.1.1 h1:LWAJwfNvjQZCFIDKWYQaM62NcYeYViCmWIwmOStowAI= github.com/pelletier/go-toml/v2 v2.1.1/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= diff --git a/handlers/helper.go b/handlers/helper.go index 9fc840ac9..4b65be5b1 100644 --- a/handlers/helper.go +++ b/handlers/helper.go @@ -10,6 +10,7 @@ import ( "time" "github.com/BurntSushi/toml" + "github.com/patrickmn/go-cache" tccommon "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common" tccvm "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/cvm/v20170312" @@ -50,6 +51,11 @@ import ( "github.com/tailwarden/komiser/utils" ) +const ( + CACHE_DURATION = 3 + CLEANUP_DURATION = 4 +) + var mu sync.Mutex func triggerFetchingWorkflow(ctx context.Context, client providers.ProviderClient, provider string, db *bun.DB, regions []string, wp *providers.WorkerPool) { @@ -247,8 +253,10 @@ func makeClientFromAccount(account models.Account) (*providers.ProviderClient, e OpencostBaseUrl: account.Credentials["opencostBaseUrl"], } + cache := cache.New(CACHE_DURATION, CLEANUP_DURATION) return &providers.ProviderClient{ K8sClient: &client, + Cache: cache, // Alpha feature for dependency Name: account.Name, }, nil } diff --git a/internal/config/load.go b/internal/config/load.go index 9f11119ec..05ae373b6 100644 --- a/internal/config/load.go +++ b/internal/config/load.go @@ -19,7 +19,9 @@ import ( "github.com/mongodb-forks/digest" "github.com/oracle/oci-go-sdk/common" "github.com/ovh/go-ovh/ovh" + "github.com/patrickmn/go-cache" "github.com/scaleway/scaleway-sdk-go/scw" + "github.com/tailwarden/komiser/handlers" "github.com/tailwarden/komiser/models" "github.com/tailwarden/komiser/providers" "github.com/tailwarden/komiser/utils" @@ -241,8 +243,10 @@ func Load(configPath string, telemetry bool, analytics utils.Analytics) (*models OpencostBaseUrl: account.OpencostBaseUrl, } + cache := cache.New(handlers.CACHE_DURATION, handlers.CACHE_DURATION) clients = append(clients, providers.ProviderClient{ K8sClient: &client, + Cache: cache, Name: account.Name, }) } diff --git a/providers/k8s/core/pods.go b/providers/k8s/core/pods.go index aa157ad22..f85b9f9f3 100644 --- a/providers/k8s/core/pods.go +++ b/providers/k8s/core/pods.go @@ -9,6 +9,8 @@ import ( "github.com/tailwarden/komiser/models" "github.com/tailwarden/komiser/providers" oc "github.com/tailwarden/komiser/providers/k8s/opencost" + "github.com/tailwarden/komiser/utils" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -68,6 +70,7 @@ func Pods(ctx context.Context, client providers.ProviderClient) ([]models.Resour ResourceId: string(pod.UID), Name: pod.Name, Region: pod.Namespace, + Relations: getPodRelation(pod, client), Cost: cost, CreatedAt: pod.CreationTimestamp.Time, FetchedAt: time.Now(), @@ -90,3 +93,89 @@ func Pods(ctx context.Context, client providers.ProviderClient) ([]models.Resour }).Info("Fetched resources") return resources, nil } + +func getPodRelation(pod v1.Pod, client providers.ProviderClient) []models.Link { + + var rel []models.Link + var err error + owners := pod.GetOwnerReferences() + for _, owner := range owners { + rel = append(rel, models.Link{ + ResourceID: string(owner.UID), + Type: owner.Kind, + Name: owner.Name, + Relation: "USES", + }) + } + + // check if service was cached before + // if not call the service cache function, get list of services and cahce it for further use + services, ok := client.Cache.Get(utils.SERVICES) + if !ok { + services, err = utils.K8s_Cache(&client, utils.SERVICES) + } + + if err == nil { + serviceList, _ := services.(*v1.ServiceList) + + for _, service := range serviceList.Items { + selector := service.Spec.Selector + if selectorMatchesPodLabels(selector, pod.Labels) { + rel = append(rel, models.Link{ + ResourceID: string(service.UID), + Type: "Service", + Name: service.Name, + Relation: "USES", + }) + } + } + } + + // for nodes + node, err := client.K8sClient.Client.CoreV1().Nodes().Get(context.TODO(), pod.Spec.NodeName, metav1.GetOptions{}) + if err == nil { + rel = append(rel, models.Link{ + ResourceID: string(node.UID), + Type: "Node", + Name: node.Name, + Relation: "USES", + }) + } + + // for namespace + namespace, err := client.K8sClient.Client.CoreV1().Namespaces().Get(context.TODO(), pod.Namespace, metav1.GetOptions{}) + if err == nil { + rel = append(rel, models.Link{ + ResourceID: string(namespace.UID), + Type: "Namespace", + Name: namespace.Name, + Relation: "USES", + }) + } + + // for serviceAccount + sa, err := client.K8sClient.Client.CoreV1().ServiceAccounts(namespace.Name).Get(context.TODO(), pod.Spec.ServiceAccountName, metav1.GetOptions{}) + if err == nil { + rel = append(rel, models.Link{ + ResourceID: string(sa.UID), + Type: "ServiceAccount", + Name: sa.Name, + Relation: "USES", + }) + } + + return rel +} + +func selectorMatchesPodLabels(selector, podLabels map[string]string) bool { + if len(selector) == 0 { + return false + } + + for key, value := range selector { + if podValue, exists := podLabels[key]; !exists || podValue != value { + return false + } + } + return true +} diff --git a/providers/k8s/core/replicasets.go b/providers/k8s/core/replicasets.go new file mode 100644 index 000000000..57b0ce6b6 --- /dev/null +++ b/providers/k8s/core/replicasets.go @@ -0,0 +1,99 @@ +package core + +import ( + "context" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/tailwarden/komiser/models" + "github.com/tailwarden/komiser/providers" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/api/apps/v1" +) + +func Replicasets(ctx context.Context, client providers.ProviderClient) ([]models.Resource, error) { + resources := make([]models.Resource, 0) + + var config metav1.ListOptions + + for { + res, err := client.K8sClient.Client.AppsV1().ReplicaSets("").List(ctx, config) + if err != nil { + return nil, err + } + + for _, rs := range res.Items { + tags := make([]models.Tag, 0) + + for key, value := range rs.Labels { + tags = append(tags, models.Tag{ + Key: key, + Value: value, + }) + } + + if len(rs.OwnerReferences) > 0 { + // we use the owner kind of first owner only as the owner tag + ownerTags := []models.Tag{ + { + Key: "owner_kind", + Value: rs.OwnerReferences[0].Kind, + }, + { + Key: "owner_name", + Value: rs.OwnerReferences[0].Name, + }, + } + tags = append(tags, ownerTags...) + } + + + resources = append(resources, models.Resource{ + Provider: "Kubernetes", + Account: client.Name, + Service: "Replicaset", + ResourceId: string(rs.UID), + Name: rs.Name, + Region: rs.Namespace, + Relations: getReplicasetRelation(rs), + Cost: 0, + CreatedAt: rs.CreationTimestamp.Time, + FetchedAt: time.Now(), + Tags: tags, + }) + } + + if res.GetContinue() == "" { + break + } + + config.Continue = res.GetContinue() + } + + log.WithFields(log.Fields{ + "provider": "Kubernetes", + "account": client.Name, + "service": "Replicaset", + "resources": len(resources), + }).Info("Fetched resources") + return resources, nil +} + + +func getReplicasetRelation(rs v1.ReplicaSet) []models.Link { + + var rel []models.Link + + owners := rs.GetOwnerReferences() + for _, owner := range owners { + rel = append(rel, models.Link{ + ResourceID: string(owner.UID), + Type: owner.Kind, + Name: owner.Name, + Relation: "USES", + }) + } + + return rel +} \ No newline at end of file diff --git a/providers/k8s/k8s.go b/providers/k8s/k8s.go index b8c792d91..530d98942 100644 --- a/providers/k8s/k8s.go +++ b/providers/k8s/k8s.go @@ -37,7 +37,7 @@ func FetchResources(ctx context.Context, client providers.ProviderClient, db *bu log.Printf("[%s][K8s] %s", client.Name, err) } else { for _, resource := range resources { - _, err = db.NewInsert().Model(&resource).On("CONFLICT (resource_id) DO UPDATE").Set("cost = EXCLUDED.cost").Exec(context.Background()) + _, err = db.NewInsert().Model(&resource).On("CONFLICT (resource_id) DO UPDATE").Set("cost = EXCLUDED.cost, relations=EXCLUDED.relations").Exec(context.Background()) if err != nil { logrus.WithError(err).Errorf("db trigger failed") } diff --git a/providers/providers.go b/providers/providers.go index c3ad76191..965f55c52 100644 --- a/providers/providers.go +++ b/providers/providers.go @@ -11,6 +11,7 @@ import ( "github.com/linode/linodego" "github.com/oracle/oci-go-sdk/common" "github.com/ovh/go-ovh/ovh" + "github.com/patrickmn/go-cache" "github.com/scaleway/scaleway-sdk-go/scw" "github.com/tailwarden/komiser/models" tccvm "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/cvm/v20170312" @@ -34,6 +35,7 @@ type ProviderClient struct { MongoDBAtlasClient *mongodbatlas.Client GCPClient *GCPClient OVHClient *ovh.Client + Cache *cache.Cache Name string } diff --git a/utils/k8s_cache.go b/utils/k8s_cache.go new file mode 100644 index 000000000..725fc959c --- /dev/null +++ b/utils/k8s_cache.go @@ -0,0 +1,28 @@ +package utils + +import ( + "context" + + "github.com/tailwarden/komiser/providers" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + SERVICES = "SERVICES" +) + +func K8s_Cache(client *providers.ProviderClient, callerType string) (interface{}, error) { + var response interface{} + var err error + switch callerType { + case SERVICES: + response, err = client.K8sClient.Client.CoreV1().Services("").List(context.Background(), metav1.ListOptions{}) + if err != nil { + return response, err + } + } + + client.Cache.Set(callerType, response, 0) + + return response, nil +}