diff --git a/pkg/intermediate/aggregate.go b/pkg/intermediate/aggregate.go index ffd1dba9..9d8548b3 100644 --- a/pkg/intermediate/aggregate.go +++ b/pkg/intermediate/aggregate.go @@ -16,6 +16,7 @@ package intermediate import ( "container/heap" + "encoding/json" "fmt" "net" "strings" @@ -542,6 +543,14 @@ func (a *AggregationProcess) aggregateRecords(incomingRecord, existingRecord ent incomingVal := ieWithValue.GetStringValue() existingIeWithValue.SetStringValue(incomingVal) } + case "httpVals": + incomingVal := ieWithValue.GetStringValue() + existingVal := existingIeWithValue.GetStringValue() + updatedHttpVals, err := fillHttpVals(incomingVal, existingVal) + if err != nil { + return fmt.Errorf("httpVals could not be updated: %v", err) + } + existingIeWithValue.SetStringValue(updatedHttpVals) default: klog.Errorf("Fields with name %v is not supported in aggregation fields list.", element) } @@ -984,3 +993,29 @@ func isCorrelationRequired(flowType uint8, record entities.Record) bool { } return false } + +func fillHttpVals(incomingHttpVals, existingHttpVals string) (string, error) { + incomingHttpValsJson := make(map[int32]string) + existingHttpValsJson := make(map[int32]string) + + if incomingHttpVals != "" { + if err := json.Unmarshal([]byte(incomingHttpVals), &incomingHttpValsJson); err != nil { + return "", fmt.Errorf("Error parsing JSON: %v", err) + } + } + if existingHttpVals != "" { + if err := json.Unmarshal([]byte(existingHttpVals), &existingHttpValsJson); err != nil { + return "", fmt.Errorf("Error parsing JSON: %v", err) + } + } + if len(existingHttpValsJson) > 0 { + for key, value := range existingHttpValsJson { + incomingHttpValsJson[key] = value + } + } + updatedHttpVals, err := json.Marshal(incomingHttpValsJson) + if err != nil { + return "", fmt.Errorf("Error converting JSON to string: %v", err) + } + return string(updatedHttpVals), nil +}