Skip to content

Commit

Permalink
Update Flow aggregate field httpVals
Browse files Browse the repository at this point in the history
if exporter sends httpval1 in first export and sends httpvals2 in second
we should append both before sending further.
Also the function should take care of deduplicating same TxID items.

Signed-off-by: Tushar Tathgur <tathgurt@tathgurtPNQHP.vmware.com>
  • Loading branch information
Tushar Tathgur authored and Tushar Tathgur committed Nov 8, 2023
1 parent 2c9df0b commit daa32ea
Showing 1 changed file with 35 additions and 0 deletions.
35 changes: 35 additions & 0 deletions pkg/intermediate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package intermediate

import (
"container/heap"
"encoding/json"
"fmt"
"net"
"strings"
Expand Down Expand Up @@ -542,6 +543,14 @@ func (a *AggregationProcess) aggregateRecords(incomingRecord, existingRecord ent
incomingVal := ieWithValue.GetStringValue()
existingIeWithValue.SetStringValue(incomingVal)
}
case "httpVals":
incomingVal := ieWithValue.GetStringValue()
existingVal := existingIeWithValue.GetStringValue()
updatedHttpVals, err := fillHttpVals(incomingVal, existingVal)
if err != nil {
return fmt.Errorf("httpVals could not be updated: %v", err)

Check warning on line 551 in pkg/intermediate/aggregate.go

View check run for this annotation

Codecov / codecov/patch

pkg/intermediate/aggregate.go#L546-L551

Added lines #L546 - L551 were not covered by tests
}
existingIeWithValue.SetStringValue(updatedHttpVals)

Check warning on line 553 in pkg/intermediate/aggregate.go

View check run for this annotation

Codecov / codecov/patch

pkg/intermediate/aggregate.go#L553

Added line #L553 was not covered by tests
default:
klog.Errorf("Fields with name %v is not supported in aggregation fields list.", element)
}
Expand Down Expand Up @@ -984,3 +993,29 @@ func isCorrelationRequired(flowType uint8, record entities.Record) bool {
}
return false
}

func fillHttpVals(incomingHttpVals, existingHttpVals string) (string, error) {
incomingHttpValsJson := make(map[int32]string)
existingHttpValsJson := make(map[int32]string)

Check warning on line 999 in pkg/intermediate/aggregate.go

View check run for this annotation

Codecov / codecov/patch

pkg/intermediate/aggregate.go#L998-L999

Added lines #L998 - L999 were not covered by tests

if incomingHttpVals != "" {
if err := json.Unmarshal([]byte(incomingHttpVals), &incomingHttpValsJson); err != nil {
return "", fmt.Errorf("Error parsing JSON: %v", err)

Check warning on line 1003 in pkg/intermediate/aggregate.go

View check run for this annotation

Codecov / codecov/patch

pkg/intermediate/aggregate.go#L1001-L1003

Added lines #L1001 - L1003 were not covered by tests
}
}
if existingHttpVals != "" {
if err := json.Unmarshal([]byte(existingHttpVals), &existingHttpValsJson); err != nil {
return "", fmt.Errorf("Error parsing JSON: %v", err)

Check warning on line 1008 in pkg/intermediate/aggregate.go

View check run for this annotation

Codecov / codecov/patch

pkg/intermediate/aggregate.go#L1006-L1008

Added lines #L1006 - L1008 were not covered by tests
}
}
if len(existingHttpValsJson) > 0 {

Check warning on line 1011 in pkg/intermediate/aggregate.go

View check run for this annotation

Codecov / codecov/patch

pkg/intermediate/aggregate.go#L1011

Added line #L1011 was not covered by tests
for key, value := range existingHttpValsJson {
incomingHttpValsJson[key] = value

Check warning on line 1013 in pkg/intermediate/aggregate.go

View check run for this annotation

Codecov / codecov/patch

pkg/intermediate/aggregate.go#L1013

Added line #L1013 was not covered by tests
}
}
updatedHttpVals, err := json.Marshal(incomingHttpValsJson)
if err != nil {
return "", fmt.Errorf("Error converting JSON to string: %v", err)

Check warning on line 1018 in pkg/intermediate/aggregate.go

View check run for this annotation

Codecov / codecov/patch

pkg/intermediate/aggregate.go#L1016-L1018

Added lines #L1016 - L1018 were not covered by tests
}
return string(updatedHttpVals), nil

Check warning on line 1020 in pkg/intermediate/aggregate.go

View check run for this annotation

Codecov / codecov/patch

pkg/intermediate/aggregate.go#L1020

Added line #L1020 was not covered by tests
}

0 comments on commit daa32ea

Please sign in to comment.