Skip to content

Commit

Permalink
Merge pull request #119 from Nithunikzz/shedule
Browse files Browse the repository at this point in the history
added shedule and image scanning
  • Loading branch information
avinashkna4 authored Jul 30, 2023
2 parents bcb4e1d + d871e2d commit 78bf20f
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 31 deletions.
71 changes: 47 additions & 24 deletions agent/kubviz/k8smetrics_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@ package main

import (
"encoding/json"
"github.com/intelops/kubviz/constants"
"github.com/nats-io/nats.go"
"log"
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/go-co-op/gocron"
"github.com/nats-io/nats.go"

"context"

"github.com/intelops/kubviz/constants"
"github.com/intelops/kubviz/model"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -32,6 +35,8 @@ import (
"k8s.io/client-go/tools/clientcmd"
)

// constants for jetstream

type RuningEnv int

const (
Expand All @@ -48,7 +53,8 @@ var (
//for local testing provide the location of kubeconfig
// inside the civo file paste your kubeconfig
// uncomment this line from Dockerfile.Kubviz (COPY --from=builder /workspace/civo /etc/myapp/civo)
cluster_conf_loc string = os.Getenv("CONFIG_LOCATION")
cluster_conf_loc string = os.Getenv("CONFIG_LOCATION")
schedulingIntervalStr string = os.Getenv("SCHEDULING_INTERVAL")
)

func main() {
Expand All @@ -57,17 +63,18 @@ func main() {
outdatedErrChan := make(chan error, 1)
kubePreUpgradeChan := make(chan error, 1)
getAllResourceChan := make(chan error, 1)
trivyK8sMetricsChan := make(chan error, 1)
clusterMetricsChan := make(chan error, 1)
kubescoreMetricsChan := make(chan error, 1)
trivyK8sMetricsChan := make(chan error, 1)
trivyImagescanChan := make(chan error, 1)
RakeesErrChan := make(chan error, 1)
var (
wg sync.WaitGroup
config *rest.Config
clientset *kubernetes.Clientset
)
// waiting for 7 go routines
wg.Add(7)
// waiting for 4 go routines
wg.Add(8)
// connecting with nats ...
nc, err := nats.Connect(natsurl, nats.Name("K8s Metrics"), nats.Token(token))
checkErr(err)
Expand All @@ -92,23 +99,35 @@ func main() {
clientset = getK8sClient(config)
}
// starting all the go routines
go outDatedImages(config, js, &wg, outdatedErrChan)
go KubePreUpgradeDetector(config, js, &wg, kubePreUpgradeChan)
go GetAllResources(config, js, &wg, getAllResourceChan)
go RakeesOutput(config, js, &wg, RakeesErrChan)
go getK8sEvents(clientset)
go publishMetrics(clientset, js, &wg, clusterMetricsChan)
go RunKubeScore(clientset, js, &wg, kubescoreMetricsChan)
go RunTrivyK8sClusterScan(&wg, js, trivyK8sMetricsChan)
wg.Wait()
// once the go routines completes we will close the error channels
close(outdatedErrChan)
close(kubePreUpgradeChan)
close(getAllResourceChan)
close(clusterMetricsChan)
close(kubescoreMetricsChan)
close(RakeesErrChan)
close(trivyK8sMetricsChan)
collectAndPublishMetrics := func() {
go outDatedImages(config, js, &wg, outdatedErrChan)
go KubePreUpgradeDetector(config, js, &wg, kubePreUpgradeChan)
go GetAllResources(config, js, &wg, getAllResourceChan)
go RakeesOutput(config, js, &wg, RakeesErrChan)
go getK8sEvents(clientset)
go RunTrivyImageScans(config, js, &wg, trivyImagescanChan)
go publishMetrics(clientset, js, &wg, clusterMetricsChan)
go RunKubeScore(clientset, js, &wg, kubescoreMetricsChan)
go RunTrivyK8sClusterScan(&wg, js, trivyK8sMetricsChan)
wg.Wait()
// once the go routines completes we will close the error channels
close(outdatedErrChan)
close(kubePreUpgradeChan)
close(getAllResourceChan)
close(clusterMetricsChan)
close(kubescoreMetricsChan)
close(trivyImagescanChan)
close(trivyK8sMetricsChan)
close(RakeesErrChan)
}
collectAndPublishMetrics()
schedulingInterval, err := time.ParseDuration(schedulingIntervalStr)
if err != nil {
log.Fatalf("Failed to parse SCHEDULING_INTERVAL: %v", err)
}
s := gocron.NewScheduler(time.UTC)
s.Every(schedulingInterval).Do(collectAndPublishMetrics) // Adjust the interval as needed
s.StartAsync()
// for loop will wait for the error channels
// logs if any error occurs
for {
Expand All @@ -133,14 +152,18 @@ func main() {
if err != nil {
log.Println(err)
}
case err := <-RakeesErrChan:
case err := <-trivyImagescanChan:
if err != nil {
log.Println(err)
}
case err := <-trivyK8sMetricsChan:
if err != nil {
log.Println(err)
}
case err := <-RakeesErrChan:
if err != nil {
log.Println(err)
}
}
}

Expand Down
67 changes: 67 additions & 0 deletions agent/kubviz/trivy_image.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package main

import (
"encoding/json"
"log"
"strings"
"sync"

"github.com/aquasecurity/trivy/pkg/types"
"github.com/google/uuid"
"github.com/intelops/kubviz/constants"
"github.com/intelops/kubviz/model"
"github.com/nats-io/nats.go"
"k8s.io/client-go/rest"
)

func RunTrivyImageScans(config *rest.Config, js nats.JetStreamContext, wg *sync.WaitGroup, errCh chan error) {
defer wg.Done()
images, err := ListImages(config)
if err != nil {
log.Fatal(err)
}

for _, image := range images {
var report types.Report
out, err := executeCommand("trivy image " + image.PullableImage + " --timeout 60m -f json -q --cache-dir /tmp/.cache")
if err != nil {
log.Printf("Error scanning image %s: %v", image.PullableImage, err)
continue // Move on to the next image in case of an error
}

parts := strings.SplitN(out, "{", 2)
if len(parts) <= 1 {
log.Println("No output from command", err)
continue // Move on to the next image if there's no output
}

log.Println("Command logs", parts[0])
jsonPart := "{" + parts[1]
log.Println("First 200 lines output", jsonPart[:200])
log.Println("Last 200 lines output", jsonPart[len(jsonPart)-200:])

err = json.Unmarshal([]byte(jsonPart), &report)
if err != nil {
log.Printf("Error occurred while Unmarshalling json: %v", err)
continue // Move on to the next image in case of an error
}
publishImageScanReports(report, js, errCh)
// If you want to publish the report or perform any other action with it, you can do it here

}
}

func publishImageScanReports(report types.Report, js nats.JetStreamContext, errCh chan error) {
metrics := model.TrivyImage{
ID: uuid.New().String(),
ClusterName: ClusterName,
Report: report,
}
metricsJson, _ := json.Marshal(metrics)
_, err := js.Publish(constants.TRIVY_IMAGE_SUBJECT, metricsJson)
if err != nil {
errCh <- err
}
log.Printf("Trivy report with ID:%s has been published\n", metrics.ID)
errCh <- nil
}
33 changes: 32 additions & 1 deletion client/pkg/clickhouse/db_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type DBInterface interface {
InsertKubvizEvent(model.Metrics)
InsertGitEvent(string)
InsertKubeScoreMetrics(model.KubeScoreRecommendations)
InsertTrivyImageMetrics(metrics model.TrivyImage)
InsertTrivyMetrics(metrics model.Trivy)
RetriveKetallEvent() ([]model.Resource, error)
RetriveOutdatedEvent() ([]model.CheckResultfinal, error)
Expand Down Expand Up @@ -69,7 +70,7 @@ func NewDBClient(conf *config.Config) (DBInterface, error) {
return nil, err
}

tables := []DBStatement{kubvizTable, rakeesTable, kubePugDepricatedTable, kubepugDeletedTable, ketallTable, outdateTable, clickhouseExperimental, containerDockerhubTable, containerGithubTable, kubescoreTable, trivyTableVul, trivyTableMisconfig, dockerHubBuildTable, DBStatement(dbstatement.AzureDevopsTable), DBStatement(dbstatement.GithubTable), DBStatement(dbstatement.GitlabTable), DBStatement(dbstatement.BitbucketTable), DBStatement(dbstatement.GiteaTable)}
tables := []DBStatement{kubvizTable, rakeesTable, kubePugDepricatedTable, kubepugDeletedTable, ketallTable, trivyTableImage, outdateTable, clickhouseExperimental, containerDockerhubTable, containerGithubTable, kubescoreTable, trivyTableVul, trivyTableMisconfig, dockerHubBuildTable, DBStatement(dbstatement.AzureDevopsTable), DBStatement(dbstatement.GithubTable), DBStatement(dbstatement.GitlabTable), DBStatement(dbstatement.BitbucketTable), DBStatement(dbstatement.GiteaTable)}
for _, table := range tables {
if err = splconn.Exec(context.Background(), string(table)); err != nil {
return nil, err
Expand Down Expand Up @@ -352,7 +353,37 @@ func (c *DBClient) InsertTrivyMetrics(metrics model.Trivy) {
}

}
func (c *DBClient) InsertTrivyImageMetrics(metrics model.TrivyImage) {
for _, result := range metrics.Report.Results {
for _, vulnerability := range result.Vulnerabilities {
var (
tx, _ = c.conn.Begin()
stmt, _ = tx.Prepare(InsertTrivyImage)
)
if _, err := stmt.Exec(
metrics.ID,
metrics.ClusterName,
metrics.Report.ArtifactName,
vulnerability.VulnerabilityID,
vulnerability.PkgID,
vulnerability.PkgName,
vulnerability.InstalledVersion,
vulnerability.FixedVersion,
vulnerability.Title,
vulnerability.Severity,
vulnerability.PublishedDate,
vulnerability.LastModifiedDate,
); err != nil {
log.Fatal(err)
}
if err := tx.Commit(); err != nil {
log.Fatal(err)
}
stmt.Close()
}

}
}
func (c *DBClient) Close() {
_ = c.conn.Close()
}
Expand Down
18 changes: 17 additions & 1 deletion client/pkg/clickhouse/statements.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,22 @@ const trivyTableMisconfig DBStatement = `
misconfig_status String
) engine=File(TabSeparated)
`

const trivyTableImage DBStatement = `
CREATE TABLE IF NOT EXISTS trivyimage (
id UUID,
cluster_name String,
artifact_name String,
vul_id String,
vul_pkg_id String,
vul_pkg_name String,
vul_installed_version String,
vul_fixed_version String,
vul_title String,
vul_severity String,
vul_published_date DateTime('UTC'),
vul_last_modified_date DateTime('UTC')
) engine=File(TabSeparated)
`
const dockerHubBuildTable DBStatement = `
CREATE TABLE IF NOT EXISTS dockerhubbuild (
PushedBy String,
Expand All @@ -143,4 +158,5 @@ const containerDockerhubTable DBStatement = `CREATE table IF NOT EXISTS containe
const containerGithubTable DBStatement = `CREATE table IF NOT EXISTS container_github(event JSON) ENGINE = MergeTree ORDER BY tuple();`
const InsertKubeScore string = "INSERT INTO kubescore (id, namespace, cluster_name, recommendations) VALUES (?, ?, ?, ?)"
const InsertTrivyVul string = "INSERT INTO trivy_vul (id, cluster_name, namespace, kind, name, vul_id, vul_vendor_ids, vul_pkg_id, vul_pkg_name, vul_pkg_path, vul_installed_version, vul_fixed_version, vul_title, vul_severity, vul_published_date, vul_last_modified_date) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?. ?)"
const InsertTrivyImage string = "INSERT INTO trivyimage (id, cluster_name, artifact_name, vul_id, vul_pkg_id, vul_pkg_name, vul_installed_version, vul_fixed_version, vul_title, vul_severity, vul_published_date, vul_last_modified_date) VALUES ( ?, ?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
const InsertTrivyMisconfig string = "INSERT INTO trivy_misconfig (id, cluster_name, namespace, kind, name, misconfig_id, misconfig_avdid, misconfig_type, misconfig_title, misconfig_desc, misconfig_msg, misconfig_query, misconfig_resolution, misconfig_severity, misconfig_status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?. ?, ?)"
18 changes: 17 additions & 1 deletion client/pkg/clients/kubviz_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package clients

import (
"encoding/json"
"log"

"github.com/intelops/kubviz/constants"
"github.com/nats-io/nats.go"
"log"

"github.com/intelops/kubviz/client/pkg/clickhouse"
"github.com/intelops/kubviz/model"
Expand Down Expand Up @@ -97,6 +98,21 @@ func (n *NATSContext) SubscribeAllKubvizNats(conn clickhouse.DBInterface) {
log.Println()
},
},
{
Subject: constants.TRIVY_IMAGE_SUBJECT,
Consumer: constants.Trivy_Image_Consumer,
Handler: func(msg *nats.Msg) {
msg.Ack()
var metrics model.TrivyImage
err := json.Unmarshal(msg.Data, &metrics)
if err != nil {
log.Fatal(err)
}
log.Printf("Trivy Metrics Received: %#v,", metrics)
conn.InsertTrivyImageMetrics(metrics)
log.Println()
},
},
{
Subject: constants.KubvizSubject,
Consumer: constants.KubvizConsumer,
Expand Down
2 changes: 2 additions & 0 deletions constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ const (
KubvizConsumer = "KUBVIZ_EVENTS_CONSUMER"
KubscoreConsumer = "KUBSCORE_CONSUMER"
TrivyConsumer = "TRIVY_CONSUMER"
TRIVY_IMAGE_SUBJECT = "METRICS.trivyimage"
Trivy_Image_Consumer = "TRIVY_IMAGE_CONSUMER"
)
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/ghodss/yaml v1.0.0
github.com/gin-gonic/gin v1.9.1
github.com/go-chi/chi/v5 v5.0.8
github.com/go-playground/webhooks/v6 v6.2.0
github.com/go-co-op/gocron v1.30.1
github.com/google/uuid v1.3.0
github.com/hashicorp/go-version v1.6.0
github.com/kelseyhightower/envconfig v1.4.0
Expand Down Expand Up @@ -105,6 +105,7 @@ require (
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/samber/lo v1.38.1 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
Expand Down
Loading

0 comments on commit 78bf20f

Please sign in to comment.