Skip to content

Commit

Permalink
Add otel (#330)
Browse files Browse the repository at this point in the history
* add otel exporter for traces

* add span between request, events and actions

* add otel traces for notifiers, add extra data on output

* fix otel connection timeout

* add context fetching in traces, add status and additional attributes for traces

* refactor context enricher helper function

* add event.traceid after generating traceId

* fix error handling behavior, add checker for otel enablement before creating exporter

* rename falcoContext to talonContext, small changes on config defaults, update config_example with OTEL

---------

Signed-off-by: Thomas Labarussias <issif+github@gadz.org>
Co-authored-by: Thomas Labarussias <issif+github@gadz.org>
  • Loading branch information
IgorEulalio and Issif authored Jul 31, 2024
1 parent 5a66d6f commit b9964be
Show file tree
Hide file tree
Showing 21 changed files with 1,914 additions and 69 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ dist/
Memory
.idea/**
tmp
DO-NOT-COMMIT-local-setup.yaml
DO-NOT-COMMIT-local-setup.yaml
deployment/compose/minio
.env
108 changes: 82 additions & 26 deletions actionners/actionners.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
package actionners

import (
"context"
"encoding/json"
"fmt"
"reflect"
"runtime"

"go.opentelemetry.io/otel/codes"

"github.com/falco-talon/falco-talon/internal/otlp/traces"

lambdaInvoke "github.com/falco-talon/falco-talon/actionners/aws/lambda"
"github.com/falco-talon/falco-talon/outputs"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

calicoNetworkpolicy "github.com/falco-talon/falco-talon/actionners/calico/networkpolicy"
ciliumNetworkPolicy "github.com/falco-talon/falco-talon/actionners/cilium/networkpolicy"
k8sCordon "github.com/falco-talon/falco-talon/actionners/kubernetes/cordon"
Expand All @@ -25,12 +35,13 @@ import (
aws "github.com/falco-talon/falco-talon/internal/aws/client"
calico "github.com/falco-talon/falco-talon/internal/calico/client"
cilium "github.com/falco-talon/falco-talon/internal/cilium/client"
"github.com/falco-talon/falco-talon/internal/context"
talonContext "github.com/falco-talon/falco-talon/internal/context"
"github.com/falco-talon/falco-talon/internal/events"
k8sChecks "github.com/falco-talon/falco-talon/internal/kubernetes/checks"
k8s "github.com/falco-talon/falco-talon/internal/kubernetes/client"
"github.com/falco-talon/falco-talon/internal/nats"
"github.com/falco-talon/falco-talon/internal/otlp/metrics"
"github.com/falco-talon/falco-talon/internal/rules"
"github.com/falco-talon/falco-talon/metrics"
"github.com/falco-talon/falco-talon/notifiers"
"github.com/falco-talon/falco-talon/outputs/model"
"github.com/falco-talon/falco-talon/utils"
Expand Down Expand Up @@ -192,9 +203,10 @@ func GetDefaultActionners() *Actionners {
Checks: []checkActionner{
k8sChecks.CheckPodExist,
},
CheckParameters: k8sTcpdump.CheckParameters,
Action: k8sTcpdump.Action,
RequireOutput: true,
AllowAdditionalContexts: true,
CheckParameters: k8sTcpdump.CheckParameters,
Action: k8sTcpdump.Action,
RequireOutput: true,
},
&Actionner{
Category: "aws",
Expand Down Expand Up @@ -330,7 +342,9 @@ func (actionner *Actionner) AllowAdditionalContext() bool {
return actionner.AllowAdditionalContexts
}

func runAction(rule *rules.Rule, action *rules.Action, event *events.Event) error {
func runAction(ictx context.Context, rule *rules.Rule, action *rules.Action, event *events.Event) (err error) {
tracer := traces.GetTracer()

actionners := GetActionners()
if actionners == nil {
return nil
Expand All @@ -348,7 +362,7 @@ func runAction(rule *rules.Rule, action *rules.Action, event *events.Event) erro
if rule.DryRun == trueStr {
log.Output = "no action, dry-run is enabled"
utils.PrintLog("info", log)
return nil
return err
}

actionner := actionners.FindActionner(action.GetActionner())
Expand All @@ -360,15 +374,30 @@ func runAction(rule *rules.Rule, action *rules.Action, event *events.Event) erro

if checks := actionner.Checks; len(checks) != 0 {
for _, i := range checks {
if err := i(event, action); err != nil {
_, span := tracer.Start(ictx, "checks",
trace.WithAttributes(attribute.String("check.name", runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name())))
if err = i(event, action); err != nil {
span.SetStatus(codes.Error, "Failed to run checks")
span.RecordError(err)
log.Error = err.Error()
utils.PrintLog("error", log)
span.End()
return err
}
span.End()
}
}

ctx, span := tracer.Start(ictx, "action",
trace.WithAttributes(attribute.String("action.name", action.Name)),
trace.WithAttributes(attribute.String("action.actionner", action.Actionner)),
trace.WithAttributes(attribute.String("action.description", action.Description)),
)
defer span.End()
result, data, err := actionner.Action(action, event)
span.SetAttributes(attribute.String("action.result", result.Status))
span.SetAttributes(attribute.String("action.output", result.Output))

log.Status = result.Status
if len(result.Objects) != 0 {
log.Objects = result.Objects
Expand All @@ -388,13 +417,16 @@ func runAction(rule *rules.Rule, action *rules.Action, event *events.Event) erro
metrics.IncreaseCounter(log)

if err != nil {
span.SetStatus(codes.Error, "Failed to run action")
span.RecordError(err)
utils.PrintLog("error", log)
notifiers.Notify(rule, action, event, log)
_ = notifiers.Notify(ctx, rule, action, event, log)
return err
}
span.SetStatus(codes.Ok, "Action completed successfully")

utils.PrintLog("info", log)
notifiers.Notify(rule, action, event, log)
ctx = notifiers.Notify(ctx, rule, action, event, log)

if actionner.IsOutputRequired() {
log = utils.LogLine{
Expand All @@ -412,7 +444,7 @@ func runAction(rule *rules.Rule, action *rules.Action, event *events.Event) erro
log.Error = err.Error()
utils.PrintLog("error", log)
metrics.IncreaseCounter(log)
notifiers.Notify(rule, action, event, log)
_ = notifiers.Notify(ctx, rule, action, event, log)
return err
}
target := output.GetTarget()
Expand All @@ -422,7 +454,7 @@ func runAction(rule *rules.Rule, action *rules.Action, event *events.Event) erro
log.Error = err.Error()
utils.PrintLog("error", log)
metrics.IncreaseCounter(log)
notifiers.Notify(rule, action, event, log)
_ = notifiers.Notify(ctx, rule, action, event, log)
return err
}

Expand All @@ -435,12 +467,17 @@ func runAction(rule *rules.Rule, action *rules.Action, event *events.Event) erro
log.Status = "failure"
utils.PrintLog("error", log)
metrics.IncreaseCounter(log)
notifiers.Notify(rule, action, event, log)
return err2
_ = notifiers.Notify(ctx, rule, action, event, log)
return err
}
}
}

ctx, span = tracer.Start(ctx, "output",
trace.WithAttributes(attribute.String("output.name", o.GetName())),
trace.WithAttributes(attribute.String("output.category", o.GetCategory())),
trace.WithAttributes(attribute.String("output.target", output.GetTarget())),
)
defer span.End()
result, err = o.Output(output, data)
log.Status = result.Status
log.Objects = result.Objects
Expand All @@ -454,13 +491,18 @@ func runAction(rule *rules.Rule, action *rules.Action, event *events.Event) erro
metrics.IncreaseCounter(log)

if err != nil {
span.SetStatus(codes.Error, "Failed to run output")
span.RecordError(err)
utils.PrintLog("error", log)
notifiers.Notify(rule, action, event, log)
_ = notifiers.Notify(ctx, rule, action, event, log)
return err
}
span.SetStatus(codes.Ok, "Output completed successfully")
span.SetAttributes(attribute.String("output.status", result.Status))
span.SetAttributes(attribute.String("output.message", result.Output))

utils.PrintLog("info", log)
notifiers.Notify(rule, action, event, log)
_ = notifiers.Notify(ctx, rule, action, event, log)
return nil
}

Expand All @@ -470,7 +512,7 @@ func runAction(rule *rules.Rule, action *rules.Action, event *events.Event) erro
log.Error = err.Error()
utils.PrintLog("error", log)
metrics.IncreaseCounter(log)
notifiers.Notify(rule, action, event, log)
_ = notifiers.Notify(ctx, rule, action, event, log)
return err
}
log = utils.LogLine{
Expand All @@ -485,10 +527,16 @@ func runAction(rule *rules.Rule, action *rules.Action, event *events.Event) erro
err = fmt.Errorf("unknown target '%v'", target)
log.Error = err.Error()
utils.PrintLog("error", log)
notifiers.Notify(rule, action, event, log)
_ = notifiers.Notify(ctx, rule, action, event, log)
return err
}
log.Target = target
ctx, span = tracer.Start(ctx, "output",
trace.WithAttributes(attribute.String("output.name", o.GetName())),
trace.WithAttributes(attribute.String("output.category", o.GetCategory())),
trace.WithAttributes(attribute.String("output.target", output.GetTarget())),
)
defer span.End()
result, err = o.Output(output, data)
log.Status = result.Status
log.Objects = result.Objects
Expand All @@ -502,25 +550,32 @@ func runAction(rule *rules.Rule, action *rules.Action, event *events.Event) erro
metrics.IncreaseCounter(log)

if err != nil {
span.SetStatus(codes.Error, "Failed to run output")
span.RecordError(err)
utils.PrintLog("error", log)
notifiers.Notify(rule, action, event, log)
_ = notifiers.Notify(ctx, rule, action, event, log)
return err
}
span.SetStatus(codes.Ok, "Output completed successfully")
span.SetAttributes(attribute.String("output.status", result.Status))
span.SetAttributes(attribute.String("output.message", result.Output))

utils.PrintLog("info", log)
notifiers.Notify(rule, action, event, log)
_ = notifiers.Notify(ctx, rule, action, event, log)
return nil
}

return nil
}

func StartConsumer(eventsC <-chan string) {
func StartConsumer(eventsC <-chan nats.MessageWithContext) {
config := configuration.GetConfiguration()
for {
e := <-eventsC
m := <-eventsC
e := m.Data
ctx := m.Ctx
var event *events.Event
err := json.Unmarshal([]byte(e), &event)
err := json.Unmarshal(e, &event)
if err != nil {
continue
}
Expand Down Expand Up @@ -567,7 +622,7 @@ func StartConsumer(eventsC <-chan string) {
if GetDefaultActionners().FindActionner(a.GetActionner()).AllowAdditionalContext() &&
len(a.GetAdditionalContexts()) != 0 {
for _, i := range a.GetAdditionalContexts() {
elements, err := context.GetContext(i, e)
elements, err := talonContext.GetContext(ctx, i, e)
if err != nil {
log := utils.LogLine{
Message: "context",
Expand All @@ -584,7 +639,8 @@ func StartConsumer(eventsC <-chan string) {
}
}
}
if err := runAction(i, a, e); err != nil && a.IgnoreErrors == falseStr {
err := runAction(ctx, i, a, e)
if err != nil && a.IgnoreErrors == falseStr {
break
}
if a.Continue == falseStr || a.Continue != trueStr && !GetDefaultActionners().FindActionner(a.GetActionner()).MustDefaultContinue() {
Expand Down
1 change: 1 addition & 0 deletions actionners/kubernetes/drain/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func Action(action *rules.Action, event *events.Event) (utils.LogLine, *model.Da

for _, p := range pods.Items {
wg.Add(1)
p := p // loopclosure: loop variable p captured by func literal
go func() {
defer wg.Done()

Expand Down
51 changes: 42 additions & 9 deletions cmd/server.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
package cmd

import (
"context"
"fmt"
"net/http"
"time"

"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"

"github.com/falco-talon/falco-talon/internal/handler"
"github.com/falco-talon/falco-talon/internal/otlp/metrics"
"github.com/falco-talon/falco-talon/internal/otlp/traces"

"github.com/fsnotify/fsnotify"

"github.com/falco-talon/falco-talon/actionners"
"github.com/falco-talon/falco-talon/configuration"
"github.com/falco-talon/falco-talon/internal/handler"
k8s "github.com/falco-talon/falco-talon/internal/kubernetes/client"
"github.com/falco-talon/falco-talon/internal/nats"
ruleengine "github.com/falco-talon/falco-talon/internal/rules"
"github.com/falco-talon/falco-talon/metrics"
"github.com/falco-talon/falco-talon/notifiers"
"github.com/falco-talon/falco-talon/outputs"
"github.com/falco-talon/falco-talon/utils"
Expand Down Expand Up @@ -105,11 +110,6 @@ var serverCmd = &cobra.Command{
utils.PrintLog("info", utils.LogLine{Result: fmt.Sprintf("%v rule(s) has/have been successfully loaded", len(*rules)), Message: "init"})
}

http.HandleFunc("/", handler.MainHandler)
http.HandleFunc("/healthz", handler.HealthHandler)
http.HandleFunc("/rules", handler.RulesHandler)
http.Handle("/metrics", metrics.Handler())

if config.WatchRules {
utils.PrintLog("info", utils.LogLine{Result: "watch of rules enabled", Message: "init"})
}
Expand All @@ -118,7 +118,7 @@ var serverCmd = &cobra.Command{
Addr: fmt.Sprintf("%s:%d", config.ListenAddress, config.ListenPort),
ReadTimeout: 2 * time.Second,
WriteTimeout: 2 * time.Second,
Handler: nil,
Handler: newHTTPHandler(),
}

if config.WatchRules {
Expand Down Expand Up @@ -212,7 +212,6 @@ var serverCmd = &cobra.Command{
}
}()
}

// start the local NATS
ns, err := nats.StartServer(config.Deduplication.TimeWindowSeconds)
if err != nil {
Expand Down Expand Up @@ -247,19 +246,53 @@ var serverCmd = &cobra.Command{

// start the consumer for the actionners
c, err := nats.GetConsumer().ConsumeMsg()

if err != nil {
utils.PrintLog("fatal", utils.LogLine{Error: err.Error(), Message: "nats"})
}
go actionners.StartConsumer(c)

utils.PrintLog("info", utils.LogLine{Result: fmt.Sprintf("Falco Talon is up and listening on %s:%d", config.ListenAddress, config.ListenPort), Message: "http"})

ctx := context.Background()
otelShutdown, err := traces.SetupOTelSDK(ctx)
if err != nil {
utils.PrintLog("warn", utils.LogLine{Error: err.Error(), Message: "otel-traces"})
}
defer func() {
if err := otelShutdown(ctx); err != nil {
utils.PrintLog("warn", utils.LogLine{Error: err.Error(), Message: "otel-traces"})
}
}()

if err := srv.ListenAndServe(); err != nil {
utils.PrintLog("fatal", utils.LogLine{Error: err.Error(), Message: "http"})
}
},
}

func newHTTPHandler() http.Handler {
mux := http.NewServeMux()
mux.Handle("/metrics", metrics.Handler())

handleFunc := func(pattern string, handlerFunc func(http.ResponseWriter, *http.Request)) {
otelHandler := otelhttp.WithRouteTag(pattern, http.HandlerFunc(handlerFunc))
mux.Handle(pattern, otelHandler)
}

handleFunc("/", handler.MainHandler)
handleFunc("/healthz", handler.HealthHandler)
handleFunc("/rules", handler.RulesHandler)

otelHandler := otelhttp.NewHandler(
mux,
"/",
otelhttp.WithFilter(func(req *http.Request) bool {
return req.URL.Path == "/"
}))
return otelHandler
}

func init() {
RootCmd.AddCommand(serverCmd)
}
Loading

0 comments on commit b9964be

Please sign in to comment.