Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added dependency support to k8s #1487

Merged
merged 3 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang-jwt/jwt/v5 v5.2.1 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,8 @@ github.com/oracle/oci-go-sdk v24.3.0+incompatible h1:x4mcfb4agelf1O4/1/auGlZ1lr9
github.com/oracle/oci-go-sdk v24.3.0+incompatible/go.mod h1:VQb79nF8Z2cwLkLS35ukwStZIg5F66tcBccjip/j888=
github.com/ovh/go-ovh v1.4.3 h1:Gs3V823zwTFpzgGLZNI6ILS4rmxZgJwJCz54Er9LwD0=
github.com/ovh/go-ovh v1.4.3/go.mod h1:AkPXVtgwB6xlKblMjRKJJmjRp+ogrE7fz2lVgcQY8SY=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pelletier/go-toml/v2 v2.1.1 h1:LWAJwfNvjQZCFIDKWYQaM62NcYeYViCmWIwmOStowAI=
github.com/pelletier/go-toml/v2 v2.1.1/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
Expand Down
8 changes: 8 additions & 0 deletions handlers/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/BurntSushi/toml"
"github.com/patrickmn/go-cache"
tccommon "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common"
tccvm "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/cvm/v20170312"

Expand Down Expand Up @@ -50,6 +51,11 @@ import (
"github.com/tailwarden/komiser/utils"
)

const (
CACHE_DURATION = 3
CLEANUP_DURATION = 4
)

var mu sync.Mutex

func triggerFetchingWorkflow(ctx context.Context, client providers.ProviderClient, provider string, db *bun.DB, regions []string, wp *providers.WorkerPool) {
Expand Down Expand Up @@ -247,8 +253,10 @@ func makeClientFromAccount(account models.Account) (*providers.ProviderClient, e
OpencostBaseUrl: account.Credentials["opencostBaseUrl"],
}

cache := cache.New(CACHE_DURATION, CLEANUP_DURATION)
return &providers.ProviderClient{
K8sClient: &client,
Cache: cache, // Alpha feature for dependency
Name: account.Name,
}, nil
}
Expand Down
4 changes: 4 additions & 0 deletions internal/config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"github.com/mongodb-forks/digest"
"github.com/oracle/oci-go-sdk/common"
"github.com/ovh/go-ovh/ovh"
"github.com/patrickmn/go-cache"
"github.com/scaleway/scaleway-sdk-go/scw"
"github.com/tailwarden/komiser/handlers"
"github.com/tailwarden/komiser/models"
"github.com/tailwarden/komiser/providers"
"github.com/tailwarden/komiser/utils"
Expand Down Expand Up @@ -241,8 +243,10 @@ func Load(configPath string, telemetry bool, analytics utils.Analytics) (*models
OpencostBaseUrl: account.OpencostBaseUrl,
}

cache := cache.New(handlers.CACHE_DURATION, handlers.CACHE_DURATION)
clients = append(clients, providers.ProviderClient{
K8sClient: &client,
Cache: cache,
Name: account.Name,
})
}
Expand Down
89 changes: 89 additions & 0 deletions providers/k8s/core/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/tailwarden/komiser/models"
"github.com/tailwarden/komiser/providers"
oc "github.com/tailwarden/komiser/providers/k8s/opencost"
"github.com/tailwarden/komiser/utils"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -68,6 +70,7 @@ func Pods(ctx context.Context, client providers.ProviderClient) ([]models.Resour
ResourceId: string(pod.UID),
Name: pod.Name,
Region: pod.Namespace,
Relations: getPodRelation(pod, client),
Cost: cost,
CreatedAt: pod.CreationTimestamp.Time,
FetchedAt: time.Now(),
Expand All @@ -90,3 +93,89 @@ func Pods(ctx context.Context, client providers.ProviderClient) ([]models.Resour
}).Info("Fetched resources")
return resources, nil
}

func getPodRelation(pod v1.Pod, client providers.ProviderClient) []models.Link {

var rel []models.Link
var err error
owners := pod.GetOwnerReferences()
for _, owner := range owners {
rel = append(rel, models.Link{
ResourceID: string(owner.UID),
Type: owner.Kind,
Name: owner.Name,
Relation: "USES",
})
}

// check if service was cached before
// if not call the service cache function, get list of services and cahce it for further use
services, ok := client.Cache.Get(utils.SERVICES)
if !ok {
services, err = utils.K8s_Cache(&client, utils.SERVICES)
}

if err == nil {
serviceList, _ := services.(*v1.ServiceList)

for _, service := range serviceList.Items {
selector := service.Spec.Selector
if selectorMatchesPodLabels(selector, pod.Labels) {
rel = append(rel, models.Link{
ResourceID: string(service.UID),
Type: "Service",
Name: service.Name,
Relation: "USES",
})
}
}
}

// for nodes
node, err := client.K8sClient.Client.CoreV1().Nodes().Get(context.TODO(), pod.Spec.NodeName, metav1.GetOptions{})
if err == nil {
rel = append(rel, models.Link{
ResourceID: string(node.UID),
Type: "Node",
Name: node.Name,
Relation: "USES",
})
}

// for namespace
namespace, err := client.K8sClient.Client.CoreV1().Namespaces().Get(context.TODO(), pod.Namespace, metav1.GetOptions{})
if err == nil {
rel = append(rel, models.Link{
ResourceID: string(namespace.UID),
Type: "Namespace",
Name: namespace.Name,
Relation: "USES",
})
}

// for serviceAccount
sa, err := client.K8sClient.Client.CoreV1().ServiceAccounts(namespace.Name).Get(context.TODO(), pod.Spec.ServiceAccountName, metav1.GetOptions{})
if err == nil {
rel = append(rel, models.Link{
ResourceID: string(sa.UID),
Type: "ServiceAccount",
Name: sa.Name,
Relation: "USES",
})
}

return rel
}

func selectorMatchesPodLabels(selector, podLabels map[string]string) bool {
if len(selector) == 0 {
return false
}

for key, value := range selector {
if podValue, exists := podLabels[key]; !exists || podValue != value {
return false
}
}
return true
}
99 changes: 99 additions & 0 deletions providers/k8s/core/replicasets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package core

import (
"context"
"time"

log "github.com/sirupsen/logrus"

"github.com/tailwarden/komiser/models"
"github.com/tailwarden/komiser/providers"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/api/apps/v1"
)

func Replicasets(ctx context.Context, client providers.ProviderClient) ([]models.Resource, error) {
resources := make([]models.Resource, 0)

var config metav1.ListOptions

for {
res, err := client.K8sClient.Client.AppsV1().ReplicaSets("").List(ctx, config)
if err != nil {
return nil, err
}

for _, rs := range res.Items {
tags := make([]models.Tag, 0)

for key, value := range rs.Labels {
tags = append(tags, models.Tag{
Key: key,
Value: value,
})
}

if len(rs.OwnerReferences) > 0 {
// we use the owner kind of first owner only as the owner tag
ownerTags := []models.Tag{
{
Key: "owner_kind",
Value: rs.OwnerReferences[0].Kind,
},
{
Key: "owner_name",
Value: rs.OwnerReferences[0].Name,
},
}
tags = append(tags, ownerTags...)
}


resources = append(resources, models.Resource{
Provider: "Kubernetes",
Account: client.Name,
Service: "Replicaset",
ResourceId: string(rs.UID),
Name: rs.Name,
Region: rs.Namespace,
Relations: getReplicasetRelation(rs),
Cost: 0,
CreatedAt: rs.CreationTimestamp.Time,
FetchedAt: time.Now(),
Tags: tags,
})
}

if res.GetContinue() == "" {
break
}

config.Continue = res.GetContinue()
}

log.WithFields(log.Fields{
"provider": "Kubernetes",
"account": client.Name,
"service": "Replicaset",
"resources": len(resources),
}).Info("Fetched resources")
return resources, nil
}


func getReplicasetRelation(rs v1.ReplicaSet) []models.Link {

var rel []models.Link

owners := rs.GetOwnerReferences()
for _, owner := range owners {
rel = append(rel, models.Link{
ResourceID: string(owner.UID),
Type: owner.Kind,
Name: owner.Name,
Relation: "USES",
})
}

return rel
}
2 changes: 1 addition & 1 deletion providers/k8s/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func FetchResources(ctx context.Context, client providers.ProviderClient, db *bu
log.Printf("[%s][K8s] %s", client.Name, err)
} else {
for _, resource := range resources {
_, err = db.NewInsert().Model(&resource).On("CONFLICT (resource_id) DO UPDATE").Set("cost = EXCLUDED.cost").Exec(context.Background())
_, err = db.NewInsert().Model(&resource).On("CONFLICT (resource_id) DO UPDATE").Set("cost = EXCLUDED.cost, relations=EXCLUDED.relations").Exec(context.Background())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

explain

Copy link
Collaborator Author

@AvineshTripathi AvineshTripathi Aug 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the resource or the relation was deleted, in the next cycle it will not be updated if this is not added

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cost = EXCLUDED.cost means that if a conflict occurs, the cost column of the existing row will be updated with the value from the new row being inserted. Same goes for relations

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thats correct and we want to ensure this field gets updated

if err != nil {
logrus.WithError(err).Errorf("db trigger failed")
}
Expand Down
2 changes: 2 additions & 0 deletions providers/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/linode/linodego"
"github.com/oracle/oci-go-sdk/common"
"github.com/ovh/go-ovh/ovh"
"github.com/patrickmn/go-cache"
"github.com/scaleway/scaleway-sdk-go/scw"
"github.com/tailwarden/komiser/models"
tccvm "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/cvm/v20170312"
Expand All @@ -34,6 +35,7 @@ type ProviderClient struct {
MongoDBAtlasClient *mongodbatlas.Client
GCPClient *GCPClient
OVHClient *ovh.Client
Cache *cache.Cache
Name string
}

Expand Down
28 changes: 28 additions & 0 deletions utils/k8s_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package utils

import (
"context"

"github.com/tailwarden/komiser/providers"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
SERVICES = "SERVICES"
)

func K8s_Cache(client *providers.ProviderClient, callerType string) (interface{}, error) {
var response interface{}
var err error
switch callerType {
case SERVICES:
response, err = client.K8sClient.Client.CoreV1().Services("").List(context.Background(), metav1.ListOptions{})
if err != nil {
return response, err
}
}

client.Cache.Set(callerType, response, 0)

return response, nil
}
Loading