From daa32eaa9e5ddb3bd9b3335aaee92c0a4f5a7c39 Mon Sep 17 00:00:00 2001 From: Tushar Tathgur Date: Tue, 7 Nov 2023 15:42:59 -0800 Subject: [PATCH] Update Flow aggregate field httpVals if exporter sends httpval1 in first export and sends httpvals2 in second we should append both before sending further. Also the function should take care of deduplicating same TxID items. Signed-off-by: Tushar Tathgur --- pkg/intermediate/aggregate.go | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/pkg/intermediate/aggregate.go b/pkg/intermediate/aggregate.go index ffd1dba9..9d8548b3 100644 --- a/pkg/intermediate/aggregate.go +++ b/pkg/intermediate/aggregate.go @@ -16,6 +16,7 @@ package intermediate import ( "container/heap" + "encoding/json" "fmt" "net" "strings" @@ -542,6 +543,14 @@ func (a *AggregationProcess) aggregateRecords(incomingRecord, existingRecord ent incomingVal := ieWithValue.GetStringValue() existingIeWithValue.SetStringValue(incomingVal) } + case "httpVals": + incomingVal := ieWithValue.GetStringValue() + existingVal := existingIeWithValue.GetStringValue() + updatedHttpVals, err := fillHttpVals(incomingVal, existingVal) + if err != nil { + return fmt.Errorf("httpVals could not be updated: %v", err) + } + existingIeWithValue.SetStringValue(updatedHttpVals) default: klog.Errorf("Fields with name %v is not supported in aggregation fields list.", element) } @@ -984,3 +993,29 @@ func isCorrelationRequired(flowType uint8, record entities.Record) bool { } return false } + +func fillHttpVals(incomingHttpVals, existingHttpVals string) (string, error) { + incomingHttpValsJson := make(map[int32]string) + existingHttpValsJson := make(map[int32]string) + + if incomingHttpVals != "" { + if err := json.Unmarshal([]byte(incomingHttpVals), &incomingHttpValsJson); err != nil { + return "", fmt.Errorf("Error parsing JSON: %v", err) + } + } + if existingHttpVals != "" { + if err := json.Unmarshal([]byte(existingHttpVals), &existingHttpValsJson); err != nil { + return "", fmt.Errorf("Error parsing JSON: %v", err) + } + } + if len(existingHttpValsJson) > 0 { + for key, value := range existingHttpValsJson { + incomingHttpValsJson[key] = value + } + } + updatedHttpVals, err := json.Marshal(incomingHttpValsJson) + if err != nil { + return "", fmt.Errorf("Error converting JSON to string: %v", err) + } + return string(updatedHttpVals), nil +}