Skip to content

Commit

Permalink
send alert to elk adapter directly from relay buffer,and handle inter…
Browse files Browse the repository at this point in the history
…rupts gracefully

Signed-off-by: rksharma95 <ramakant@accuknox.com>
  • Loading branch information
rksharma95 committed Dec 18, 2024
1 parent a93cd44 commit 7ed8440
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 57 deletions.
56 changes: 11 additions & 45 deletions relay-server/elasticsearch/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"log"
"strings"
"sync"
"sync/atomic"
"time"

Expand All @@ -15,9 +16,8 @@ import (
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esutil"
"github.com/google/uuid"
pb "github.com/kubearmor/KubeArmor/protobuf"
kg "github.com/kubearmor/kubearmor-relay-server/relay-server/log"
"github.com/kubearmor/kubearmor-relay-server/relay-server/server"
"golang.org/x/sync/errgroup"
)

var (
Expand All @@ -28,7 +28,6 @@ var (

// ElasticsearchClient Structure
type ElasticsearchClient struct {
kaClient *server.LogClient
esClient *elasticsearch.Client
cancel context.CancelFunc
bulkIndexer esutil.BulkIndexer
Expand All @@ -39,7 +38,7 @@ type ElasticsearchClient struct {
// NewElasticsearchClient creates a new Elasticsearch client with the given Elasticsearch URL
// and kubearmor LogClient with endpoint. It has a retry mechanism for certain HTTP status codes and a backoff function for retry delays.
// It then creates a new NewBulkIndexer with the esClient
func NewElasticsearchClient(esURL, Endpoint string) (*ElasticsearchClient, error) {
func NewElasticsearchClient(esURL string) (*ElasticsearchClient, error) {
retryBackoff := backoff.NewExponentialBackOff()
cfg := elasticsearch.Config{
Addresses: []string{esURL},
Expand Down Expand Up @@ -70,8 +69,7 @@ func NewElasticsearchClient(esURL, Endpoint string) (*ElasticsearchClient, error
log.Fatalf("Error creating the indexer: %s", err)
}
alertCh := make(chan interface{}, 10000)
kaClient := server.NewClient(Endpoint)
return &ElasticsearchClient{kaClient: kaClient, bulkIndexer: bi, esClient: esClient, alertCh: alertCh}, nil
return &ElasticsearchClient{bulkIndexer: bi, esClient: esClient, alertCh: alertCh}, nil
}

// bulkIndex takes an interface and index name and adds the data to the Elasticsearch bulk indexer.
Expand Down Expand Up @@ -109,71 +107,39 @@ func (ecl *ElasticsearchClient) bulkIndex(a interface{}, index string) {
}
}

func (ecl *ElasticsearchClient) SendAlertToBuffer(alert *pb.Alert) {
ecl.alertCh <- alert
}

// Start starts the Elasticsearch client by performing a health check on the gRPC server
// and starting goroutines to consume messages from the alert channel and bulk index them.
// The method starts a goroutine for each stream and waits for messages to be received.
// Additional goroutines consume alert from the alert channel and bulk index them.
func (ecl *ElasticsearchClient) Start() error {
start = time.Now()
client := ecl.kaClient
ecl.ctx, ecl.cancel = context.WithCancel(context.Background())
client.WgServer = &errgroup.Group{}
client.Context = ecl.ctx
// do healthcheck
if ok := client.DoHealthCheck(); !ok {
return fmt.Errorf("failed to check the liveness of the gRPC server")
}
kg.Printf("Checked the liveness of the gRPC server")

client.WgServer.Go(func() error {
for client.Running {
res, err := client.AlertStream.Recv()
if err != nil {
return fmt.Errorf("failed to receive an alert (%s) %s", client.Server, err)
}
tel, _ := json.Marshal(res)
fmt.Printf("%s\n", string(tel))

select {
case ecl.alertCh <- res:
case <-client.Context.Done():
// The context is over, stop processing results
return nil
default:
//not able to add it to Log buffer
}
}
return nil
})
var wg sync.WaitGroup

for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
for {
select {
case alert := <-ecl.alertCh:
ecl.bulkIndex(alert, "alert")
case <-ecl.ctx.Done():
close(ecl.alertCh)
return
}
}
}()
}
wg.Wait()
return nil
}

// Stop stops the Elasticsearch client and performs necessary cleanup operations.
// It stops the Kubearmor Relay client, closes the BulkIndexer and cancels the context.
func (ecl *ElasticsearchClient) Stop() error {
logClient := ecl.kaClient
logClient.Running = false
time.Sleep(2 * time.Second)

//Destoy KubeArmor Relay Client
if err := logClient.DestroyClient(); err != nil {
return fmt.Errorf("failed to destroy the kubearmor relay gRPC client (%s)", err.Error())
}
kg.Printf("Destroyed kubearmor relay gRPC client")

//Close BulkIndexer
if err := ecl.bulkIndexer.Close(ecl.ctx); err != nil {
Expand Down
7 changes: 4 additions & 3 deletions relay-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,14 @@ func main() {

// check and start an elasticsearch client
if enableEsDashboards == "true" {
esCl, err := elasticsearch.NewElasticsearchClient(esUrl, endPoint)
esCl, err := elasticsearch.NewElasticsearchClient(esUrl)
if err != nil {
kg.Warnf("Failed to start a Elasticsearch Client")
return
}
go esCl.Start()
defer esCl.Stop()
relayServer.ELKClient = esCl
go relayServer.ELKClient.Start()
defer relayServer.ELKClient.Stop()
}

// == //
Expand Down
26 changes: 17 additions & 9 deletions relay-server/server/relayServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"google.golang.org/grpc/status"

cfg "github.com/kubearmor/kubearmor-relay-server/relay-server/config"
"github.com/kubearmor/kubearmor-relay-server/relay-server/elasticsearch"
kg "github.com/kubearmor/kubearmor-relay-server/relay-server/log"
)

Expand Down Expand Up @@ -495,6 +496,9 @@ func (rs *RelayServer) AddAlertFromBuffChan() {
tel, _ := json.Marshal(alert)
fmt.Printf("%s\n", string(tel))
}
if rs.ELKClient != nil {
rs.ELKClient.SendAlertToBuffer(alert)
}
AlertLock.RLock()
for uid := range AlertStructs {
select {
Expand Down Expand Up @@ -591,6 +595,9 @@ type RelayServer struct {

// wait group
WgServer sync.WaitGroup

// ELK adapter
ELKClient *elasticsearch.ElasticsearchClient
}

// LogBufferChannel store incoming data from log stream in buffer
Expand Down Expand Up @@ -681,7 +688,6 @@ func (rs *RelayServer) DestroyRelayServer() error {

// wait for other routines
rs.WgServer.Wait()

return nil
}

Expand Down Expand Up @@ -713,7 +719,6 @@ func DeleteClientEntry(nodeIP string) {
// =============== //

func connectToKubeArmor(nodeID, port string) error {

nodeIP, err := extractIP(nodeID)
if err != nil {
return err
Expand Down Expand Up @@ -787,7 +792,6 @@ func connectToKubeArmor(nodeID, port string) error {

kg.Printf("Destroyed the client (%s)", server)
}

return nil
}

Expand Down Expand Up @@ -815,13 +819,17 @@ func (rs *RelayServer) GetFeedsFromNodes() {
}

for Running {
ip := <-ipsChan
ClientListLock.Lock()
if _, ok := ClientList[ip]; !ok {
ClientList[ip] = 1
go connectToKubeArmor(ip, rs.Port)
select {
case ip := <-ipsChan:
ClientListLock.Lock()
if _, ok := ClientList[ip]; !ok {
ClientList[ip] = 1
go connectToKubeArmor(ip, rs.Port)
}
ClientListLock.Unlock()
case <-time.After(time.Second):
// no op
}
ClientListLock.Unlock()
}
}
}

0 comments on commit 7ed8440

Please sign in to comment.