Skip to content

Commit

Permalink
NETOBSERV-1407 - Loki labels for deduper merge (netobserv#529)
Browse files Browse the repository at this point in the history
* manage loki labels for deduper merge

* fix console flags defaults

* manage errors + testing
  • Loading branch information
jpinsonneau authored Jan 25, 2024
1 parent 9d50e16 commit fe8ae8c
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 38 deletions.
40 changes: 25 additions & 15 deletions controllers/consoleplugin/consoleplugin_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/netobserv/network-observability-operator/controllers/constants"
"github.com/netobserv/network-observability-operator/controllers/ebpf"
"github.com/netobserv/network-observability-operator/pkg/helper"
"github.com/netobserv/network-observability-operator/pkg/loki"
"github.com/netobserv/network-observability-operator/pkg/volumes"
)

Expand Down Expand Up @@ -310,12 +311,7 @@ func (b *builder) setLokiConfig(lconf *config.LokiConfig) {
if lconf.URL != statusURL {
lconf.StatusURL = statusURL
}
// check for connection traking to list indexes
indexFields := constants.LokiIndexFields
if b.desired.Processor.LogTypes != nil && *b.desired.Processor.LogTypes != flowslatest.LogTypeFlows {
indexFields = append(indexFields, constants.LokiConnectionIndexFields...)
}
lconf.Labels = indexFields
lconf.Labels = loki.GetLokiLabels(b.desired)
lconf.TenantID = b.loki.TenantID
lconf.ForwardUserToken = b.loki.UseForwardToken()
if b.loki.TLS.Enable {
Expand Down Expand Up @@ -347,8 +343,16 @@ func (b *builder) setLokiConfig(lconf *config.LokiConfig) {
}
}

func (b *builder) setFrontendConfig(fconf *config.FrontendConfig) {
var dedupJustMark, dedupMerge bool
func (b *builder) setFrontendConfig(fconf *config.FrontendConfig) error {
var err error
dedupJustMark, err := strconv.ParseBool(ebpf.DedupeJustMarkDefault)
if err != nil {
return err
}
dedupMerge, err := strconv.ParseBool(ebpf.DedupeMergeDefault)
if err != nil {
return err
}
if helper.UseEBPF(b.desired) {
if helper.IsPktDropEnabled(&b.desired.Agent.EBPF) {
fconf.Features = append(fconf.Features, "pktDrop")
Expand All @@ -364,15 +368,17 @@ func (b *builder) setFrontendConfig(fconf *config.FrontendConfig) {

if b.desired.Agent.EBPF.Advanced != nil {
if v, ok := b.desired.Agent.EBPF.Advanced.Env[ebpf.EnvDedupeJustMark]; ok {
dedupJustMark, _ = strconv.ParseBool(v)
} else {
dedupJustMark, _ = strconv.ParseBool(ebpf.DedupeJustMarkDefault)
dedupJustMark, err = strconv.ParseBool(v)
if err != nil {
return err
}
}

if v, ok := b.desired.Agent.EBPF.Advanced.Env[ebpf.EnvDedupeMerge]; ok {
dedupMerge, _ = strconv.ParseBool(v)
} else {
dedupMerge, _ = strconv.ParseBool(ebpf.DedupeMergeDefault)
dedupMerge, err = strconv.ParseBool(v)
if err != nil {
return err
}
}
}
}
Expand All @@ -385,6 +391,7 @@ func (b *builder) setFrontendConfig(fconf *config.FrontendConfig) {
Mark: dedupJustMark,
Merge: dedupMerge,
}
return nil
}

//go:embed config/static-frontend-config.yaml
Expand All @@ -407,7 +414,10 @@ func (b *builder) configMap() (*corev1.ConfigMap, string, error) {
if err != nil {
return nil, "", err
}
b.setFrontendConfig(&config.Frontend)
err = b.setFrontendConfig(&config.Frontend)
if err != nil {
return nil, "", err
}

var configStr string
bs, err := yaml.Marshal(config)
Expand Down
96 changes: 96 additions & 0 deletions controllers/consoleplugin/consoleplugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
"sigs.k8s.io/yaml"

flowslatest "github.com/netobserv/network-observability-operator/apis/flowcollector/v1beta2"
config "github.com/netobserv/network-observability-operator/controllers/consoleplugin/config"
"github.com/netobserv/network-observability-operator/controllers/constants"
"github.com/netobserv/network-observability-operator/pkg/helper"
)
Expand Down Expand Up @@ -285,6 +287,100 @@ func TestConfigMapUpdateWithLokistackMode(t *testing.T) {
assert.NotEqual(old.Data, nEw.Data)
}

func TestConfigMapContent(t *testing.T) {
assert := assert.New(t)

agentSpec := flowslatest.FlowCollectorAgent{
Type: "eBPF",
EBPF: flowslatest.FlowCollectorEBPF{
Sampling: ptr.To(int32(1)),
},
}
lokiSpec := flowslatest.FlowCollectorLoki{
Mode: flowslatest.LokiModeLokiStack,
LokiStack: flowslatest.LokiStackRef{Name: "lokistack", Namespace: "ls-namespace"},
}
loki := helper.NewLokiConfig(&lokiSpec, "any")
spec := flowslatest.FlowCollectorSpec{
Agent: agentSpec,
ConsolePlugin: getPluginConfig(),
Loki: lokiSpec,
}
builder := newBuilder(testNamespace, testImage, &spec, &loki)
cm, _, err := builder.configMap()
assert.NotNil(cm)
assert.Nil(err)

// parse output config and check expected values
var config config.PluginConfig
err = yaml.Unmarshal([]byte(cm.Data["config.yaml"]), &config)
assert.Nil(err)

// loki config
assert.Equal(config.Loki.URL, "https://lokistack-gateway-http.ls-namespace.svc:8080/api/logs/v1/network/")
assert.Equal(config.Loki.StatusURL, "https://lokistack-query-frontend-http.ls-namespace.svc:3100/")

// frontend params
assert.Equal(config.Frontend.RecordTypes, []string{"flowLog"})
assert.Empty(config.Frontend.Features)
assert.NotEmpty(config.Frontend.Columns)
assert.NotEmpty(config.Frontend.Filters)
assert.Equal(config.Frontend.Sampling, 1)
assert.Equal(config.Frontend.Deduper.Mark, true)
assert.Equal(config.Frontend.Deduper.Merge, false)
}

func TestConfigMapError(t *testing.T) {
assert := assert.New(t)

agentSpec := flowslatest.FlowCollectorAgent{
Type: "eBPF",
EBPF: flowslatest.FlowCollectorEBPF{
Sampling: ptr.To(int32(1)),
Advanced: &flowslatest.AdvancedAgentConfig{
Env: map[string]string{
"DEDUPER_JUST_MARK": "invalid",
},
},
},
}
lokiSpec := flowslatest.FlowCollectorLoki{}
loki := helper.NewLokiConfig(&lokiSpec, "any")

// spec with invalid flag
spec := flowslatest.FlowCollectorSpec{
Agent: agentSpec,
ConsolePlugin: getPluginConfig(),
Loki: lokiSpec,
}
builder := newBuilder(testNamespace, testImage, &spec, &loki)
cm, _, err := builder.configMap()
assert.Nil(cm)
assert.NotNil(err)

// update to valid flags
agentSpec.EBPF.Advanced.Env = map[string]string{
"DEDUPER_JUST_MARK": "false",
"DEDUPER_MERGE": "true",
}
spec = flowslatest.FlowCollectorSpec{
Agent: agentSpec,
ConsolePlugin: getPluginConfig(),
Loki: lokiSpec,
}
builder = newBuilder(testNamespace, testImage, &spec, &loki)
cm, _, err = builder.configMap()
assert.NotNil(cm)
assert.Nil(err)

// parse output config and check expected values
var config config.PluginConfig
err = yaml.Unmarshal([]byte(cm.Data["config.yaml"]), &config)
assert.Nil(err)
assert.Equal(config.Frontend.Deduper.Mark, false)
assert.Equal(config.Frontend.Deduper.Merge, true)
}

func TestServiceUpdateCheck(t *testing.T) {
assert := assert.New(t)
old := getServiceSpecs()
Expand Down
5 changes: 4 additions & 1 deletion controllers/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const (
HeartbeatType = "heartbeat"
EndConnectionType = "endConnection"

ClusterNameLabelName = "K8S_ClusterName"

MonitoringNamespace = "openshift-monitoring"
MonitoringServiceAccount = "prometheus-k8s"

Expand All @@ -45,8 +47,9 @@ const (
LokiCRReader = "netobserv-reader"
)

var LokiIndexFields = []string{"SrcK8S_Namespace", "SrcK8S_OwnerName", "SrcK8S_Type", "DstK8S_Namespace", "DstK8S_OwnerName", "DstK8S_Type", "FlowDirection", "Duplicate"}
var LokiIndexFields = []string{"SrcK8S_Namespace", "SrcK8S_OwnerName", "SrcK8S_Type", "DstK8S_Namespace", "DstK8S_OwnerName", "DstK8S_Type"}
var LokiConnectionIndexFields = []string{"_RecordType"}
var LokiDeduperMarkIndexFields = []string{"FlowDirection", "Duplicate"}
var FlowCollectorName = types.NamespacedName{Name: "cluster"}
var EnvNoHTTP2 = corev1.EnvVar{
Name: "GODEBUG",
Expand Down
7 changes: 7 additions & 0 deletions controllers/flowcollector_controller_certificates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,13 @@ func flowCollectorCertificatesSpecs() {
DeploymentModel: flowslatest.DeploymentModelKafka,
Agent: flowslatest.FlowCollectorAgent{
Type: "eBPF",
EBPF: flowslatest.FlowCollectorEBPF{
Advanced: &flowslatest.AdvancedAgentConfig{
Env: map[string]string{
"DEDUPER_JUST_MARK": "true",
},
},
},
},
Loki: flowslatest.FlowCollectorLoki{
Enable: ptr.To(true),
Expand Down
23 changes: 6 additions & 17 deletions controllers/flp/flp_pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,11 @@ import (
"github.com/netobserv/network-observability-operator/controllers/constants"
"github.com/netobserv/network-observability-operator/pkg/filters"
"github.com/netobserv/network-observability-operator/pkg/helper"
"github.com/netobserv/network-observability-operator/pkg/loki"
"github.com/netobserv/network-observability-operator/pkg/metrics"
"github.com/netobserv/network-observability-operator/pkg/volumes"
)

const (
clusterNameLabelName = "K8S_ClusterName"
)

type PipelineBuilder struct {
*config.PipelineBuilderStage
desired *flowslatest.FlowCollectorSpec
Expand Down Expand Up @@ -52,15 +49,8 @@ func newPipelineBuilder(

func (b *PipelineBuilder) AddProcessorStages() error {
lastStage := *b.PipelineBuilderStage
indexFields := constants.LokiIndexFields

lastStage = b.addTransformFilter(lastStage)

indexFields, lastStage = b.addConnectionTracking(indexFields, lastStage)

if b.desired.Processor.MultiClusterDeployment != nil && *b.desired.Processor.MultiClusterDeployment {
indexFields = append(indexFields, clusterNameLabelName)
}
lastStage = b.addConnectionTracking(lastStage)

// enrich stage (transform) configuration
enrichedStage := lastStage.TransformNetwork("enrich", api.TransformNetwork{
Expand Down Expand Up @@ -88,7 +78,7 @@ func (b *PipelineBuilder) AddProcessorStages() error {
debugConfig := helper.GetAdvancedLokiConfig(b.desired.Loki.Advanced)
if helper.UseLoki(b.desired) {
lokiWrite := api.WriteLoki{
Labels: indexFields,
Labels: loki.GetLokiLabels(b.desired),
BatchSize: int(b.desired.Loki.WriteBatchSize),
BatchWait: helper.UnstructuredDuration(b.desired.Loki.WriteBatchWait),
MaxBackoff: helper.UnstructuredDuration(debugConfig.WriteMaxBackoff),
Expand Down Expand Up @@ -200,7 +190,7 @@ func flowMetricToFLP(flowMetric *metricslatest.FlowMetricSpec) (*api.PromMetrics
return m, nil
}

func (b *PipelineBuilder) addConnectionTracking(indexFields []string, lastStage config.PipelineBuilderStage) ([]string, config.PipelineBuilderStage) {
func (b *PipelineBuilder) addConnectionTracking(lastStage config.PipelineBuilderStage) config.PipelineBuilderStage {
outputFields := []api.OutputField{
{
Name: "Bytes",
Expand Down Expand Up @@ -307,7 +297,6 @@ func (b *PipelineBuilder) addConnectionTracking(indexFields []string, lastStage

// Connection tracking stage (only if LogTypes is not FLOWS)
if b.desired.Processor.LogTypes != nil && *b.desired.Processor.LogTypes != flowslatest.LogTypeFlows {
indexFields = append(indexFields, constants.LokiConnectionIndexFields...)
outputRecordTypes := helper.GetRecordTypes(&b.desired.Processor)
debugConfig := helper.GetAdvancedProcessorConfig(b.desired.Processor.Advanced)
lastStage = lastStage.ConnTrack("extract_conntrack", api.ConnTrack{
Expand Down Expand Up @@ -342,7 +331,7 @@ func (b *PipelineBuilder) addConnectionTracking(indexFields []string, lastStage
},
})
}
return indexFields, lastStage
return lastStage
}

func (b *PipelineBuilder) addTransformFilter(lastStage config.PipelineBuilderStage) config.PipelineBuilderStage {
Expand All @@ -359,7 +348,7 @@ func (b *PipelineBuilder) addTransformFilter(lastStage config.PipelineBuilderSta
if clusterName != "" {
transformFilterRules = []api.TransformFilterRule{
{
Input: clusterNameLabelName,
Input: constants.ClusterNameLabelName,
Type: "add_field_if_doesnt_exist",
Value: clusterName,
},
Expand Down
7 changes: 2 additions & 5 deletions controllers/flp/flp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,8 +655,6 @@ func TestConfigMapShouldDeserializeAsJSONWithLokiManual(t *testing.T) {
"DstK8S_Namespace",
"DstK8S_OwnerName",
"DstK8S_Type",
"FlowDirection",
"Duplicate",
"_RecordType",
}, lokiCfg.Labels)
assert.Equal(`{app="netobserv-flowcollector"}`, fmt.Sprintf("%v", lokiCfg.StaticLabels))
Expand All @@ -670,7 +668,7 @@ func TestConfigMapShouldDeserializeAsJSONWithLokiStack(t *testing.T) {
ns := "namespace"
cfg := getConfig()
useLokiStack(&cfg)
cfg.Agent.Type = flowslatest.AgentIPFIX
cfg.Agent.Type = flowslatest.AgentEBPF
b := monoBuilder(ns, &cfg)
cm, digest, err := b.configMap()
assert.NoError(err)
Expand All @@ -689,7 +687,6 @@ func TestConfigMapShouldDeserializeAsJSONWithLokiStack(t *testing.T) {

params := decoded.Parameters
assert.Len(params, 6)
assert.Equal(*cfg.Processor.Advanced.Port, int32(params[0].Ingest.Collector.Port))

lokiCfg := params[3].Write.Loki
assert.Equal("https://lokistack-gateway-http.ls-namespace.svc:8080/api/logs/v1/network/", lokiCfg.URL)
Expand All @@ -705,7 +702,7 @@ func TestConfigMapShouldDeserializeAsJSONWithLokiStack(t *testing.T) {
assert.Equal(cfg.Loki.Advanced.WriteMinBackoff.Duration.String(), lokiCfg.MinBackoff)
assert.Equal(cfg.Loki.Advanced.WriteMaxBackoff.Duration.String(), lokiCfg.MaxBackoff)
assert.EqualValues(*cfg.Loki.Advanced.WriteMaxRetries, lokiCfg.MaxRetries)
assert.EqualValues([]string{"SrcK8S_Namespace", "SrcK8S_OwnerName", "SrcK8S_Type", "DstK8S_Namespace", "DstK8S_OwnerName", "DstK8S_Type", "FlowDirection", "Duplicate", "_RecordType"}, lokiCfg.Labels)
assert.EqualValues([]string{"SrcK8S_Namespace", "SrcK8S_OwnerName", "SrcK8S_Type", "DstK8S_Namespace", "DstK8S_OwnerName", "DstK8S_Type", "_RecordType", "FlowDirection", "Duplicate"}, lokiCfg.Labels)
assert.Equal(`{app="netobserv-flowcollector"}`, fmt.Sprintf("%v", lokiCfg.StaticLabels))

assert.Equal(cfg.Processor.Metrics.Server.Port, int32(decoded.MetricsSettings.Port))
Expand Down
36 changes: 36 additions & 0 deletions pkg/loki/labels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package loki

import (
"strconv"

flowslatest "github.com/netobserv/network-observability-operator/apis/flowcollector/v1beta2"
"github.com/netobserv/network-observability-operator/controllers/constants"
"github.com/netobserv/network-observability-operator/controllers/ebpf"
"github.com/netobserv/network-observability-operator/pkg/helper"
)

func GetLokiLabels(desired *flowslatest.FlowCollectorSpec) []string {
indexFields := constants.LokiIndexFields

if desired.Processor.LogTypes != nil && *desired.Processor.LogTypes != flowslatest.LogTypeFlows {
indexFields = append(indexFields, constants.LokiConnectionIndexFields...)
}

if desired.Processor.MultiClusterDeployment != nil && *desired.Processor.MultiClusterDeployment {
indexFields = append(indexFields, constants.ClusterNameLabelName)
}

if helper.UseEBPF(desired) {
dedupJustMark, _ := strconv.ParseBool(ebpf.DedupeJustMarkDefault)
if desired.Agent.EBPF.Advanced != nil {
if v, ok := desired.Agent.EBPF.Advanced.Env[ebpf.EnvDedupeJustMark]; ok {
dedupJustMark, _ = strconv.ParseBool(v)
}
}
if dedupJustMark {
indexFields = append(indexFields, constants.LokiDeduperMarkIndexFields...)
}
}

return indexFields
}

0 comments on commit fe8ae8c

Please sign in to comment.