Skip to content

Commit

Permalink
Update Flow aggregate field httpVals
Browse files Browse the repository at this point in the history
if exporter sends httpval1 in first export and sends httpvals2 in second
we should append both before sending further.
Also the function should take care of deduplicating same TxID items.

Signed-off-by: Tushar Tathgur <tathgurt@tathgurtPNQHP.vmware.com>
  • Loading branch information
Tushar Tathgur authored and Tushar Tathgur committed Nov 17, 2023
1 parent d5ea241 commit 871fef5
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 5 deletions.
37 changes: 37 additions & 0 deletions pkg/intermediate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package intermediate

import (
"container/heap"
"encoding/json"
"fmt"
"net"
"strings"
Expand Down Expand Up @@ -542,6 +543,16 @@ func (a *AggregationProcess) aggregateRecords(incomingRecord, existingRecord ent
incomingVal := ieWithValue.GetStringValue()
existingIeWithValue.SetStringValue(incomingVal)
}
case "httpVals":
incomingVal := ieWithValue.GetStringValue()
existingVal := existingIeWithValue.GetStringValue()
updatedHttpVals, err := fillHttpVals(incomingVal, existingVal)
if err != nil {
klog.Errorf("httpVals could not be updated from jsons, err: %v", err)
existingIeWithValue.SetStringValue(incomingVal)
} else {
existingIeWithValue.SetStringValue(updatedHttpVals)
}
default:
klog.Errorf("Fields with name %v is not supported in aggregation fields list.", element)
}
Expand Down Expand Up @@ -984,3 +995,29 @@ func isCorrelationRequired(flowType uint8, record entities.Record) bool {
}
return false
}

func fillHttpVals(incomingHttpVals, existingHttpVals string) (string, error) {
incomingHttpValsJson := make(map[int32]string)
existingHttpValsJson := make(map[int32]string)

if incomingHttpVals != "" {
if err := json.Unmarshal([]byte(incomingHttpVals), &incomingHttpValsJson); err != nil {
return "", fmt.Errorf("error parsing JSON: %v", err)
}
}
if existingHttpVals != "" {
if err := json.Unmarshal([]byte(existingHttpVals), &existingHttpValsJson); err != nil {
return "", fmt.Errorf("error parsing JSON: %v", err)
}
}
if len(existingHttpValsJson) > 0 {
for key, value := range existingHttpValsJson {
incomingHttpValsJson[key] = value
}
}
updatedHttpVals, err := json.Marshal(incomingHttpValsJson)
if err != nil {
return "", fmt.Errorf("error converting JSON to string: %v", err)
}
return string(updatedHttpVals), nil
}
19 changes: 14 additions & 5 deletions pkg/intermediate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var (
"flowEndSeconds",
"flowEndReason",
"tcpState",
"httpVals",
}
statsElementList = []string{
"packetTotalCount",
Expand Down Expand Up @@ -165,7 +166,7 @@ func createDataMsgForSrc(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR
ie6 := entities.NewStringInfoElement(entities.NewInfoElement("sourcePodName", 101, 13, registry.AntreaEnterpriseID, 65535), srcPod)
ie7 := entities.NewStringInfoElement(entities.NewInfoElement("destinationPodName", 103, 13, registry.AntreaEnterpriseID, 65535), dstPod)
ie9 := entities.NewUnsigned16InfoElement(entities.NewInfoElement("destinationServicePort", 107, 2, registry.AntreaEnterpriseID, 2), uint16(4739))
var ie1, ie2, ie8, ie10, ie11, ie12, ie13, ie14, ie15, ie16 entities.InfoElementWithValue
var ie1, ie2, ie8, ie10, ie11, ie12, ie13, ie14, ie15, ie16, ie17, ie18 entities.InfoElementWithValue
if !isIPv6 {
ie1 = entities.NewIPAddressInfoElement(entities.NewInfoElement("sourceIPv4Address", 8, 18, 0, 4), net.ParseIP("10.0.0.1").To4())
ie2 = entities.NewIPAddressInfoElement(entities.NewInfoElement("destinationIPv4Address", 12, 18, 0, 4), net.ParseIP("10.0.0.2").To4())
Expand All @@ -179,15 +180,18 @@ func createDataMsgForSrc(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR
tmpFlowEndSecs, _ := registry.GetInfoElement("flowEndSeconds", registry.IANAEnterpriseID)
tmpFlowEndReason, _ := registry.GetInfoElement("flowEndReason", registry.IANAEnterpriseID)
tmpTCPState, _ := registry.GetInfoElement("tcpState", registry.AntreaEnterpriseID)
tmpHttpVals, _ := registry.GetInfoElement("httpVals", registry.AntreaEnterpriseID)

if !isUpdatedRecord {
ie10 = entities.NewDateTimeSecondsInfoElement(tmpFlowEndSecs, uint32(1))
ie12 = entities.NewUnsigned8InfoElement(tmpFlowEndReason, registry.ActiveTimeoutReason)
ie13 = entities.NewStringInfoElement(tmpTCPState, "ESTABLISHED")
ie18 = entities.NewStringInfoElement(tmpHttpVals, "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}")
} else {
ie10 = entities.NewDateTimeSecondsInfoElement(tmpFlowEndSecs, uint32(10))
ie12 = entities.NewUnsigned8InfoElement(tmpFlowEndReason, registry.EndOfFlowReason)
ie13 = entities.NewStringInfoElement(tmpTCPState, "TIME_WAIT")
ie18 = entities.NewStringInfoElement(tmpHttpVals, "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}")
}

if isToExternal {
Expand All @@ -207,9 +211,9 @@ func createDataMsgForSrc(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR
} else {
ie15 = entities.NewUnsigned8InfoElement(entities.NewInfoElement("egressNetworkPolicyRuleAction", 140, 1, registry.AntreaEnterpriseID, 1), registry.NetworkPolicyRuleActionNoAction)
}
ie17 := entities.NewDateTimeSecondsInfoElement(tmpFlowStartSecs, uint32(0))
ie17 = entities.NewDateTimeSecondsInfoElement(tmpFlowStartSecs, uint32(0))

elements = append(elements, ie1, ie2, ie3, ie4, ie5, ie6, ie7, ie8, ie9, ie10, ie11, ie12, ie13, ie14, ie15, ie16, ie17)
elements = append(elements, ie1, ie2, ie3, ie4, ie5, ie6, ie7, ie8, ie9, ie10, ie11, ie12, ie13, ie14, ie15, ie16, ie17, ie18)
// Add all elements in statsElements.
for _, element := range statsElementList {
var e *entities.InfoElement
Expand Down Expand Up @@ -269,7 +273,7 @@ func createDataMsgForDst(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR
var srcAddr, dstAddr, svcAddr []byte
var flowStartTime, flowEndTime uint32
var flowEndReason, ingressNetworkPolicyRuleAction, antreaFlowType uint8
var srcPod, dstPod, tcpState string
var srcPod, dstPod, tcpState, httpVals string
var svcPort uint16
srcPort := uint16(1234)
dstPort := uint16(5678)
Expand Down Expand Up @@ -323,10 +327,12 @@ func createDataMsgForDst(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR
flowEndTime = uint32(1)
flowEndReason = registry.ActiveTimeoutReason
tcpState = "ESTABLISHED"
httpVals = "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}"
} else {
flowEndTime = uint32(10)
flowEndReason = registry.EndOfFlowReason
tcpState = "TIME_WAIT"
httpVals = "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}"
}
tmpElement, _ := registry.GetInfoElement("flowStartSeconds", registry.IANAEnterpriseID)
ie17 := entities.NewDateTimeSecondsInfoElement(tmpElement, flowStartTime)
Expand All @@ -345,8 +351,10 @@ func createDataMsgForDst(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR
ie14 := entities.NewUnsigned8InfoElement(entities.NewInfoElement("ingressNetworkPolicyRuleAction", 139, 1, registry.AntreaEnterpriseID, 1), ingressNetworkPolicyRuleAction)
ie15 := entities.NewUnsigned8InfoElement(entities.NewInfoElement("egressNetworkPolicyRuleAction", 140, 1, registry.AntreaEnterpriseID, 1), egressNetworkPolicyRuleAction)
ie16 := entities.NewSigned32InfoElement(entities.NewInfoElement("ingressNetworkPolicyRulePriority", 116, 7, registry.AntreaEnterpriseID, 4), ingressNetworkPolicyRulePriority)
tmpElement, _ = registry.GetInfoElement("httpVals", registry.AntreaEnterpriseID)
ie18 := entities.NewStringInfoElement(tmpElement, httpVals)

elements = append(elements, ie1, ie2, ie3, ie4, ie5, ie6, ie7, ie8, ie9, ie10, ie11, ie12, ie13, ie14, ie15, ie16, ie17)
elements = append(elements, ie1, ie2, ie3, ie4, ie5, ie6, ie7, ie8, ie9, ie10, ie11, ie12, ie13, ie14, ie15, ie16, ie17, ie18)
// Add all elements in statsElements.
for _, element := range statsElementList {
var e *entities.InfoElement
Expand Down Expand Up @@ -802,6 +810,7 @@ func assertElementMap(t *testing.T, record map[string]interface{}, ipv6 bool) {
assert.Equal(t, uint64(0), record["packetDeltaCount"])
assert.Equal(t, uint64(502), record["reversePacketTotalCount"])
assert.Equal(t, uint64(0), record["reversePacketDeltaCount"])
assert.Equal(t, "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}", record["httpVals"])
}

func TestGetRecords(t *testing.T) {
Expand Down

0 comments on commit 871fef5

Please sign in to comment.