From e4b1b89c5619727fc0316f9ae09c5a6fc13c2e5a Mon Sep 17 00:00:00 2001 From: Thomas Labarussias Date: Wed, 31 Jan 2024 15:58:10 +0100 Subject: [PATCH] add index template creation for elasticsearch Signed-off-by: Thomas Labarussias --- README.md | 19 +- actionners/kubernetes/exec/exec.go | 4 +- actionners/kubernetes/labelize/labelize.go | 4 +- actionners/kubernetes/log/log.go | 4 +- .../kubernetes/networkpolicy/networkpolicy.go | 4 +- actionners/kubernetes/script/script.go | 4 +- actionners/kubernetes/terminate/terminate.go | 4 +- notifiers/elasticsearch/elasticsearch.go | 47 +++- notifiers/elasticsearch/mapping.go | 203 ++++++++++-------- notifiers/http/client.go | 13 +- notifiers/k8sevents/k8sevents.go | 6 +- notifiers/loki/loki.go | 2 +- notifiers/slack/slack.go | 2 +- notifiers/webhook/webhook.go | 2 +- utils/utils.go | 27 ++- 15 files changed, 209 insertions(+), 136 deletions(-) diff --git a/README.md b/README.md index 67850e66..aaf30a3c 100644 --- a/README.md +++ b/README.md @@ -163,14 +163,17 @@ Results: ### Elasticsearch -| Setting | Default | Description | -| ---------------- | ------------- | --------------------------------------------------------------------------------- | -| `host_port` | n/a | http://{domain or ip}:{port} | -| `user` | n/a | User for Grafana Logs | -| `password` | n/a | Password for Grafana Logs | -| `index` | `falco-talon` | Elasticsearch index | -| `suffix` | `daily` | Date suffix for index rotation : `daily` (default), `monthly`, `annually`, `none` | -| `custom_headers` | n/a | Custom HTTP Headers | +| Setting | Default | Description | +| ----------------------- | ------------- | --------------------------------------------------------------------------------- | +| `host_port` | n/a | http://{domain or ip}:{port} | +| `user` | n/a | User for Grafana Logs | +| `password` | n/a | Password for Grafana Logs | +| `index` | `falco-talon` | Elasticsearch index | +| `suffix` | `daily` | Date suffix for index rotation : `daily` (default), `monthly`, `annually`, `none` | +| `create_index_template` | `true` | Create the index template at the init if it doesn't exist | +| `number_of_shards` | `3` | Number of shards for the index (if `create_index_template` is `true`) | +| `number_of_replicas` | `3` | Number of replicas for the index (if `create_index_template` is `true`) | +| `custom_headers` | n/a | Custom HTTP Headers | ### SMTP diff --git a/actionners/kubernetes/exec/exec.go b/actionners/kubernetes/exec/exec.go index ae40eaaf..2637c440 100644 --- a/actionners/kubernetes/exec/exec.go +++ b/actionners/kubernetes/exec/exec.go @@ -21,8 +21,8 @@ var Action = func(rule *rules.Rule, action *rules.Action, event *events.Event) ( namespace := event.GetNamespaceName() objects := map[string]string{ - "Pod": pod, - "Namespace": namespace, + "pod": pod, + "namespace": namespace, } parameters := action.GetParameters() diff --git a/actionners/kubernetes/labelize/labelize.go b/actionners/kubernetes/labelize/labelize.go index 1c0bb401..7e5f21d4 100644 --- a/actionners/kubernetes/labelize/labelize.go +++ b/actionners/kubernetes/labelize/labelize.go @@ -30,8 +30,8 @@ var Action = func(rule *rules.Rule, action *rules.Action, event *events.Event) ( namespace := event.GetNamespaceName() objects := map[string]string{ - "Pod": pod, - "Namespace": namespace, + "pod": pod, + "namespace": namespace, } payload := make([]patch, 0) diff --git a/actionners/kubernetes/log/log.go b/actionners/kubernetes/log/log.go index 11fffe9c..b57c5693 100644 --- a/actionners/kubernetes/log/log.go +++ b/actionners/kubernetes/log/log.go @@ -19,8 +19,8 @@ var Action = func(rule *rules.Rule, action *rules.Action, event *events.Event) ( namespace := event.GetNamespaceName() objects := map[string]string{ - "Pod": pod, - "Namespace": namespace, + "pod": pod, + "namespace": namespace, } parameters := action.GetParameters() diff --git a/actionners/kubernetes/networkpolicy/networkpolicy.go b/actionners/kubernetes/networkpolicy/networkpolicy.go index 5d7f43b2..88e30a1c 100644 --- a/actionners/kubernetes/networkpolicy/networkpolicy.go +++ b/actionners/kubernetes/networkpolicy/networkpolicy.go @@ -21,8 +21,8 @@ var Action = func(rule *rules.Rule, action *rules.Action, event *events.Event) ( namespace := event.GetNamespaceName() objects := map[string]string{ - "Pod": podName, - "Namespace": namespace, + "pod": podName, + "namespace": namespace, } client := kubernetes.GetClient() diff --git a/actionners/kubernetes/script/script.go b/actionners/kubernetes/script/script.go index ba1838b8..d486efe4 100644 --- a/actionners/kubernetes/script/script.go +++ b/actionners/kubernetes/script/script.go @@ -23,8 +23,8 @@ var Action = func(rule *rules.Rule, action *rules.Action, event *events.Event) ( namespace := event.GetNamespaceName() objects := map[string]string{ - "Pod": pod, - "Namespace": namespace, + "pod": pod, + "namespace": namespace, } parameters := action.GetParameters() diff --git a/actionners/kubernetes/terminate/terminate.go b/actionners/kubernetes/terminate/terminate.go index b56ee119..31522640 100644 --- a/actionners/kubernetes/terminate/terminate.go +++ b/actionners/kubernetes/terminate/terminate.go @@ -20,8 +20,8 @@ var Action = func(rule *rules.Rule, action *rules.Action, event *events.Event) ( namespace := event.GetNamespaceName() objects := map[string]string{ - "Pod": podName, - "Namespace": namespace, + "pod": podName, + "namespace": namespace, } parameters := action.GetParameters() diff --git a/notifiers/elasticsearch/elasticsearch.go b/notifiers/elasticsearch/elasticsearch.go index fe81c393..2acca2aa 100644 --- a/notifiers/elasticsearch/elasticsearch.go +++ b/notifiers/elasticsearch/elasticsearch.go @@ -1,7 +1,10 @@ package elasticsearch import ( + "encoding/json" "errors" + "fmt" + "strings" "time" "github.com/Issif/falco-talon/notifiers/http" @@ -9,15 +12,19 @@ import ( ) type Settings struct { - CustomHeaders map[string]string `field:"custom_headers"` - URL string `field:"url"` - User string `field:"user"` - Password string `field:"password"` - Suffix string `field:"suffix" default:"daily"` - Index string `field:"index" default:"falco-talon"` + CustomHeaders map[string]string `field:"custom_headers"` + URL string `field:"url"` + User string `field:"user"` + Password string `field:"password"` + Suffix string `field:"suffix" default:"daily"` + CreateIndexTemplate bool `field:"create_index_template" default:"true"` + NumberOfShards int `field:"number_of_shards" default:"3"` + NumberOfReplicas int `field:"number_of_replicas" default:"3"` + Index string `field:"index" default:"falco-talon"` } const docType string = "/_doc" +const indexTemplate string = "/_index_template/falco-talon" var settings *Settings @@ -27,6 +34,26 @@ var Init = func(fields map[string]interface{}) error { if err := checkSettings(settings); err != nil { return err } + if settings.CreateIndexTemplate { + client := http.NewClient("GET", "", "", settings.CustomHeaders) + if settings.User != "" && settings.Password != "" { + client.SetBasicAuth(settings.User, settings.Password) + } + if err := client.Request(settings.URL+indexTemplate, nil); err != nil { + if err.Error() == "resource not found" { + client.SetHTTPMethod("PUT") + m := strings.ReplaceAll(mapping, "${SHARDS}", fmt.Sprintf("%v", settings.NumberOfShards)) + m = strings.ReplaceAll(m, "${REPLICAS}", fmt.Sprintf("%v", settings.NumberOfReplicas)) + j := make(map[string]interface{}) + if err := json.Unmarshal([]byte(m), &j); err != nil { + return err + } + if err := client.Request(settings.URL+indexTemplate, j); err != nil { + return err + } + } + } + } return nil } @@ -49,7 +76,7 @@ var Notify = func(log utils.LogLine) error { log.Time = time.Now().Format(time.RFC3339) - if err := client.Post(u, log); err != nil { + if err := client.Request(u, log); err != nil { return err } @@ -60,6 +87,12 @@ func checkSettings(settings *Settings) error { if settings.URL == "" { return errors.New("wrong `url` setting") } + if settings.NumberOfShards < 1 { + return errors.New("wrong `number_of_shards` setting") + } + if settings.NumberOfReplicas < 1 { + return errors.New("wrong `number_of_replcicas` setting") + } if err := http.CheckURL(settings.URL); err != nil { return err diff --git a/notifiers/elasticsearch/mapping.go b/notifiers/elasticsearch/mapping.go index bd65d678..0eff5838 100644 --- a/notifiers/elasticsearch/mapping.go +++ b/notifiers/elasticsearch/mapping.go @@ -2,74 +2,18 @@ package elasticsearch var mapping = ` { - "mappings": { - "properties": { - "action": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 128 - } - } - }, - "actionner": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 128 - } - } - }, - "error": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "event": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 1024 - } - } + "index_patterns": ["falco-talon*"], + "template": { + "settings": { + "number_of_shards": ${SHARDS}, + "number_of_replicas": ${REPLICAS} }, - "message": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 128 - } - } - }, - "output": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 2048 - } - } - }, - "result": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 2048 - } - } - }, - "objects": { + "mappings": { + "_source": { + "enabled": true + }, "properties": { - "Namespace": { + "action": { "type": "text", "fields": { "keyword": { @@ -78,7 +22,7 @@ var mapping = ` } } }, - "Pod": { + "actionner": { "type": "text", "fields": { "keyword": { @@ -86,37 +30,106 @@ var mapping = ` "ignore_above": 128 } } - } - } - }, - "rule": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 128 - } - } - }, - "status": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 128 - } - } - }, - "trace_id": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 + }, + "error": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "event": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + "message": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 128 + } + } + }, + "output": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 2048 + } + } + }, + "result": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 2048 + } + } + }, + "objects": { + "properties": { + "namespace": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 128 + } + } + }, + "pod": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 128 + } + } + } + } + }, + "rule": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 128 + } + } + }, + "status": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 128 + } + } + }, + "trace_id": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } } } } + }, + "_meta": { + "description": "index template for falco talon logs" } } -} ` diff --git a/notifiers/http/client.go b/notifiers/http/client.go index 722de66f..b73f8ef4 100644 --- a/notifiers/http/client.go +++ b/notifiers/http/client.go @@ -59,6 +59,10 @@ func DefaultClient() Client { } } +func (c *Client) SetHTTPMethod(hm string) { + c.HTTPMethod = hm +} + func (c *Client) SetContentType(ct string) { c.Headers.Set("Content-Type", ct) } @@ -106,7 +110,7 @@ func NewClient(httpMethod, contentType, userAgent string, headers map[string]str } } -func (c *Client) Post(u string, payload interface{}) error { +func (c *Client) Request(u string, payload interface{}) error { // defer + recover to catch panic if output doesn't respond config := configuration.GetConfiguration() defer func() { @@ -116,8 +120,11 @@ func (c *Client) Post(u string, payload interface{}) error { }() body := new(bytes.Buffer) - if err := json.NewEncoder(body).Encode(payload); err != nil { - return err + + if c.HTTPMethod != "GET" { + if err := json.NewEncoder(body).Encode(payload); err != nil { + return err + } } client := &http.Client{ diff --git a/notifiers/k8sevents/k8sevents.go b/notifiers/k8sevents/k8sevents.go index abe6bcc8..9b905141 100644 --- a/notifiers/k8sevents/k8sevents.go +++ b/notifiers/k8sevents/k8sevents.go @@ -70,8 +70,8 @@ var Notify = func(log utils.LogLine) error { }, InvolvedObject: corev1.ObjectReference{ Kind: "Pod", - Namespace: log.Objects["Namespace"], - Name: log.Objects["Pod"], + Namespace: log.Objects["namespace"], + Name: log.Objects["pod"], }, Reason: falcoTalon + ":" + log.Actionner + ":" + log.Status, Message: strings.ReplaceAll(message, `'`, `"`), @@ -85,7 +85,7 @@ var Notify = func(log utils.LogLine) error { Action: log.Actionner, } k8sclient := kubernetes.GetClient() - _, err = k8sclient.CoreV1().Events(log.Objects["Namespace"]).Create(context.TODO(), k8sevent, metav1.CreateOptions{}) + _, err = k8sclient.CoreV1().Events(log.Objects["namespace"]).Create(context.TODO(), k8sevent, metav1.CreateOptions{}) if err != nil { return err } diff --git a/notifiers/loki/loki.go b/notifiers/loki/loki.go index 7a160d9a..117a99e0 100644 --- a/notifiers/loki/loki.go +++ b/notifiers/loki/loki.go @@ -61,7 +61,7 @@ var Notify = func(log utils.LogLine) error { client.SetHeader("X-Scope-OrgID", settings.Tenant) } - err := client.Post(settings.HostPort+"/loki/api/v1/push", NewPayload(log)) + err := client.Request(settings.HostPort+"/loki/api/v1/push", NewPayload(log)) if err != nil { return err } diff --git a/notifiers/slack/slack.go b/notifiers/slack/slack.go index 74a70b72..9982b64a 100644 --- a/notifiers/slack/slack.go +++ b/notifiers/slack/slack.go @@ -65,7 +65,7 @@ var Init = func(fields map[string]interface{}) error { var Notify = func(log utils.LogLine) error { client := http.DefaultClient() - err := client.Post(settings.WebhookURL, NewPayload(log)) + err := client.Request(settings.WebhookURL, NewPayload(log)) if err != nil { return err } diff --git a/notifiers/webhook/webhook.go b/notifiers/webhook/webhook.go index 98671fe2..314be9e7 100644 --- a/notifiers/webhook/webhook.go +++ b/notifiers/webhook/webhook.go @@ -43,7 +43,7 @@ var Notify = func(log utils.LogLine) error { config.CustomHeaders, ) - err := client.Post(config.URL, log) + err := client.Request(config.URL, log) if err != nil { return err } diff --git a/utils/utils.go b/utils/utils.go index cba59fa6..718df63a 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -121,7 +121,7 @@ func PrintLog(level, format string, line LogLine) { } if len(line.Objects) > 0 { for i, j := range line.Objects { - l.Str(i, j) + l.Str(strings.ToLower(i), j) } } if line.Error != "" { @@ -147,12 +147,29 @@ func SetFields(structure interface{}, fields map[string]interface{}) interface{} case StringStr: valueOf.Field(i).SetString(fmt.Sprint(fields[field])) case IntStr, Int64Str: - d := int64(fields[field].(int)) - valueOf.Field(i).SetInt(d) + d, err := strconv.Atoi(fmt.Sprintf("%v", fields[field])) + if err == nil { + valueOf.Field(i).SetInt(int64(d)) + } else if deflt != "" { + d, _ := strconv.Atoi(deflt) + valueOf.Field(i).SetInt(int64(d)) + } case FloatStr, Float64Str: - valueOf.Field(i).SetFloat(fields[field].(float64)) + d, err := strconv.ParseFloat(fmt.Sprintf("%v", fields[field]), 64) + if err == nil { + valueOf.Field(i).SetFloat(d) + } else if deflt != "" { + d, _ := strconv.ParseFloat(deflt, 64) + valueOf.Field(i).SetFloat(d) + } case BoolStr: - valueOf.Field(i).SetBool(fields[field].(bool)) + d, err := strconv.ParseBool(fmt.Sprintf("%v", fields[field])) + if err == nil { + valueOf.Field(i).SetBool(d) + } else if deflt != "" { + d, _ := strconv.ParseBool(deflt) + valueOf.Field(i).SetBool(d) + } case MapStringStr: valueOf.Field(i).SetMapIndex(reflect.ValueOf(fields[field]), reflect.ValueOf(fields[field]).Elem()) }