Skip to content

Commit

Permalink
feat: added dependency graph support to k8s
Browse files Browse the repository at this point in the history
  • Loading branch information
AvineshTripathi committed Aug 16, 2024
1 parent 7315cad commit 0e1a2cf
Show file tree
Hide file tree
Showing 9 changed files with 234 additions and 1 deletion.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang-jwt/jwt/v5 v5.2.1 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,8 @@ github.com/oracle/oci-go-sdk v24.3.0+incompatible h1:x4mcfb4agelf1O4/1/auGlZ1lr9
github.com/oracle/oci-go-sdk v24.3.0+incompatible/go.mod h1:VQb79nF8Z2cwLkLS35ukwStZIg5F66tcBccjip/j888=
github.com/ovh/go-ovh v1.4.3 h1:Gs3V823zwTFpzgGLZNI6ILS4rmxZgJwJCz54Er9LwD0=
github.com/ovh/go-ovh v1.4.3/go.mod h1:AkPXVtgwB6xlKblMjRKJJmjRp+ogrE7fz2lVgcQY8SY=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pelletier/go-toml/v2 v2.1.1 h1:LWAJwfNvjQZCFIDKWYQaM62NcYeYViCmWIwmOStowAI=
github.com/pelletier/go-toml/v2 v2.1.1/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
Expand Down
8 changes: 8 additions & 0 deletions handlers/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/BurntSushi/toml"
"github.com/patrickmn/go-cache"
tccommon "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common"
tccvm "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/cvm/v20170312"

Expand Down Expand Up @@ -50,6 +51,11 @@ import (
"github.com/tailwarden/komiser/utils"
)

const (
CACHE_DURATION = 3
CLEANUP_DURATION = 4
)

var mu sync.Mutex

func triggerFetchingWorkflow(ctx context.Context, client providers.ProviderClient, provider string, db *bun.DB, regions []string, wp *providers.WorkerPool) {
Expand Down Expand Up @@ -247,8 +253,10 @@ func makeClientFromAccount(account models.Account) (*providers.ProviderClient, e
OpencostBaseUrl: account.Credentials["opencostBaseUrl"],
}

cache := cache.New(CACHE_DURATION, CLEANUP_DURATION)
return &providers.ProviderClient{
K8sClient: &client,
Cache: cache, // Alpha feature for dependency
Name: account.Name,
}, nil
}
Expand Down
4 changes: 4 additions & 0 deletions internal/config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"github.com/mongodb-forks/digest"
"github.com/oracle/oci-go-sdk/common"
"github.com/ovh/go-ovh/ovh"
"github.com/patrickmn/go-cache"
"github.com/scaleway/scaleway-sdk-go/scw"
"github.com/tailwarden/komiser/handlers"
"github.com/tailwarden/komiser/models"
"github.com/tailwarden/komiser/providers"
"github.com/tailwarden/komiser/utils"
Expand Down Expand Up @@ -241,8 +243,10 @@ func Load(configPath string, telemetry bool, analytics utils.Analytics) (*models
OpencostBaseUrl: account.OpencostBaseUrl,
}

cache := cache.New(handlers.CACHE_DURATION, handlers.CACHE_DURATION)
clients = append(clients, providers.ProviderClient{
K8sClient: &client,
Cache: cache,
Name: account.Name,
})
}
Expand Down
89 changes: 89 additions & 0 deletions providers/k8s/core/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/tailwarden/komiser/models"
"github.com/tailwarden/komiser/providers"
oc "github.com/tailwarden/komiser/providers/k8s/opencost"
"github.com/tailwarden/komiser/utils"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -68,6 +70,7 @@ func Pods(ctx context.Context, client providers.ProviderClient) ([]models.Resour
ResourceId: string(pod.UID),
Name: pod.Name,
Region: pod.Namespace,
Relations: getPodRelation(pod, client),
Cost: cost,
CreatedAt: pod.CreationTimestamp.Time,
FetchedAt: time.Now(),
Expand All @@ -90,3 +93,89 @@ func Pods(ctx context.Context, client providers.ProviderClient) ([]models.Resour
}).Info("Fetched resources")
return resources, nil
}

func getPodRelation(pod v1.Pod, client providers.ProviderClient) []models.Link {

var rel []models.Link
var err error
owners := pod.GetOwnerReferences()
for _, owner := range owners {
rel = append(rel, models.Link{
ResourceID: string(owner.UID),
Type: owner.Kind,
Name: owner.Name,
Relation: "USES",
})
}

// check if service was cached before
// if not call the service cache function, get list of services and cahce it for further use
services, ok := client.Cache.Get(utils.SERVICES)
if !ok {
services, err = utils.K8s_Cache(&client, utils.SERVICES)
}

if err == nil {
serviceList, _ := services.(*v1.ServiceList)

for _, service := range serviceList.Items {
selector := service.Spec.Selector
if selectorMatchesPodLabels(selector, pod.Labels) {
rel = append(rel, models.Link{
ResourceID: string(service.UID),
Type: "Service",
Name: service.Name,
Relation: "USES",
})
}
}
}

// for nodes
node, err := client.K8sClient.Client.CoreV1().Nodes().Get(context.TODO(), pod.Spec.NodeName, metav1.GetOptions{})
if err == nil {
rel = append(rel, models.Link{
ResourceID: string(node.UID),
Type: "Node",
Name: node.Name,
Relation: "USES",
})
}

// for namespace
namespace, err := client.K8sClient.Client.CoreV1().Namespaces().Get(context.TODO(), pod.Namespace, metav1.GetOptions{})
if err == nil {
rel = append(rel, models.Link{
ResourceID: string(namespace.UID),
Type: "Namespace",
Name: namespace.Name,
Relation: "USES",
})
}

// for serviceAccount
sa, err := client.K8sClient.Client.CoreV1().ServiceAccounts(namespace.Name).Get(context.TODO(), pod.Spec.ServiceAccountName, metav1.GetOptions{})
if err == nil {
rel = append(rel, models.Link{
ResourceID: string(sa.UID),
Type: "ServiceAccount",
Name: sa.Name,
Relation: "USES",
})
}

return rel
}

func selectorMatchesPodLabels(selector, podLabels map[string]string) bool {
if len(selector) == 0 {
return false
}

for key, value := range selector {
if podValue, exists := podLabels[key]; !exists || podValue != value {
return false
}
}
return true
}
99 changes: 99 additions & 0 deletions providers/k8s/core/replicasets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package core

import (
"context"
"time"

log "github.com/sirupsen/logrus"

"github.com/tailwarden/komiser/models"
"github.com/tailwarden/komiser/providers"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/api/apps/v1"
)

func Replicasets(ctx context.Context, client providers.ProviderClient) ([]models.Resource, error) {
resources := make([]models.Resource, 0)

var config metav1.ListOptions

for {
res, err := client.K8sClient.Client.AppsV1().ReplicaSets("").List(ctx, config)
if err != nil {
return nil, err
}

for _, rs := range res.Items {
tags := make([]models.Tag, 0)

for key, value := range rs.Labels {
tags = append(tags, models.Tag{
Key: key,
Value: value,
})
}

if len(rs.OwnerReferences) > 0 {
// we use the owner kind of first owner only as the owner tag
ownerTags := []models.Tag{
{
Key: "owner_kind",
Value: rs.OwnerReferences[0].Kind,
},
{
Key: "owner_name",
Value: rs.OwnerReferences[0].Name,
},
}
tags = append(tags, ownerTags...)
}


resources = append(resources, models.Resource{
Provider: "Kubernetes",
Account: client.Name,
Service: "Replicaset",
ResourceId: string(rs.UID),
Name: rs.Name,
Region: rs.Namespace,
Relations: getReplicasetRelation(rs),
Cost: 0,
CreatedAt: rs.CreationTimestamp.Time,
FetchedAt: time.Now(),
Tags: tags,
})
}

if res.GetContinue() == "" {
break
}

config.Continue = res.GetContinue()
}

log.WithFields(log.Fields{
"provider": "Kubernetes",
"account": client.Name,
"service": "Replicaset",
"resources": len(resources),
}).Info("Fetched resources")
return resources, nil
}


func getReplicasetRelation(rs v1.ReplicaSet) []models.Link {

var rel []models.Link

owners := rs.GetOwnerReferences()
for _, owner := range owners {
rel = append(rel, models.Link{
ResourceID: string(owner.UID),
Type: owner.Kind,
Name: owner.Name,
Relation: "USES",
})
}

return rel
}
2 changes: 1 addition & 1 deletion providers/k8s/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func FetchResources(ctx context.Context, client providers.ProviderClient, db *bu
log.Printf("[%s][K8s] %s", client.Name, err)
} else {
for _, resource := range resources {
_, err = db.NewInsert().Model(&resource).On("CONFLICT (resource_id) DO UPDATE").Set("cost = EXCLUDED.cost").Exec(context.Background())
_, err = db.NewInsert().Model(&resource).On("CONFLICT (resource_id) DO UPDATE").Set("cost = EXCLUDED.cost, relations=EXCLUDED.relations").Exec(context.Background())
if err != nil {
logrus.WithError(err).Errorf("db trigger failed")
}
Expand Down
2 changes: 2 additions & 0 deletions providers/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/linode/linodego"
"github.com/oracle/oci-go-sdk/common"
"github.com/ovh/go-ovh/ovh"
"github.com/patrickmn/go-cache"
"github.com/scaleway/scaleway-sdk-go/scw"
"github.com/tailwarden/komiser/models"
tccvm "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/cvm/v20170312"
Expand All @@ -34,6 +35,7 @@ type ProviderClient struct {
MongoDBAtlasClient *mongodbatlas.Client
GCPClient *GCPClient
OVHClient *ovh.Client
Cache *cache.Cache
Name string
}

Expand Down
28 changes: 28 additions & 0 deletions utils/k8s_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package utils

import (
"context"

"github.com/tailwarden/komiser/providers"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
SERVICES = "SERVICES"
)

func K8s_Cache(client *providers.ProviderClient, callerType string) (interface{}, error) {
var response interface{}
var err error
switch callerType {
case SERVICES:
response, err = client.K8sClient.Client.CoreV1().Services("").List(context.Background(), metav1.ListOptions{})
if err != nil {
return response, err
}
}

client.Cache.Set(callerType, response, 0)

return response, nil
}

0 comments on commit 0e1a2cf

Please sign in to comment.