From 1dbce89df1ee9fc1e7f79c19e8d8b38538ed292a Mon Sep 17 00:00:00 2001 From: Tushar Tathgur Date: Tue, 7 Nov 2023 15:42:59 -0800 Subject: [PATCH] Update Flow aggregate field httpVals if exporter sends httpval1 in first export and sends httpvals2 in second we should append both before sending further. Also the function should take care of deduplicating same TxID items. Signed-off-by: Tushar Tathgur --- pkg/intermediate/aggregate.go | 37 ++++++++++++++++++++++++++++++ pkg/intermediate/aggregate_test.go | 19 +++++++++++---- 2 files changed, 51 insertions(+), 5 deletions(-) diff --git a/pkg/intermediate/aggregate.go b/pkg/intermediate/aggregate.go index ffd1dba9..6827d9f1 100644 --- a/pkg/intermediate/aggregate.go +++ b/pkg/intermediate/aggregate.go @@ -16,6 +16,7 @@ package intermediate import ( "container/heap" + "encoding/json" "fmt" "net" "strings" @@ -542,6 +543,16 @@ func (a *AggregationProcess) aggregateRecords(incomingRecord, existingRecord ent incomingVal := ieWithValue.GetStringValue() existingIeWithValue.SetStringValue(incomingVal) } + case "httpVals": + incomingVal := ieWithValue.GetStringValue() + existingVal := existingIeWithValue.GetStringValue() + updatedHttpVals, err := fillHttpVals(incomingVal, existingVal) + if err != nil { + klog.Errorf("httpVals could not be updated from jsons, err: %v", err) + existingIeWithValue.SetStringValue(incomingVal) + } else { + existingIeWithValue.SetStringValue(updatedHttpVals) + } default: klog.Errorf("Fields with name %v is not supported in aggregation fields list.", element) } @@ -984,3 +995,29 @@ func isCorrelationRequired(flowType uint8, record entities.Record) bool { } return false } + +func fillHttpVals(incomingHttpVals, existingHttpVals string) (string, error) { + incomingHttpValsJson := make(map[int32]string) + existingHttpValsJson := make(map[int32]string) + + if incomingHttpVals != "" { + if err := json.Unmarshal([]byte(incomingHttpVals), &incomingHttpValsJson); err != nil { + return "", fmt.Errorf("error parsing JSON: %v", err) + } + } + if existingHttpVals != "" { + if err := json.Unmarshal([]byte(existingHttpVals), &existingHttpValsJson); err != nil { + return "", fmt.Errorf("error parsing JSON: %v", err) + } + } + if len(existingHttpValsJson) > 0 { + for key, value := range existingHttpValsJson { + incomingHttpValsJson[key] = value + } + } + updatedHttpVals, err := json.Marshal(incomingHttpValsJson) + if err != nil { + return "", fmt.Errorf("error converting JSON to string: %v", err) + } + return string(updatedHttpVals), nil +} diff --git a/pkg/intermediate/aggregate_test.go b/pkg/intermediate/aggregate_test.go index 75d1e08d..8dc573f5 100644 --- a/pkg/intermediate/aggregate_test.go +++ b/pkg/intermediate/aggregate_test.go @@ -47,6 +47,7 @@ var ( "flowEndSeconds", "flowEndReason", "tcpState", + "httpVals", } statsElementList = []string{ "packetTotalCount", @@ -165,7 +166,7 @@ func createDataMsgForSrc(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR ie6 := entities.NewStringInfoElement(entities.NewInfoElement("sourcePodName", 101, 13, registry.AntreaEnterpriseID, 65535), srcPod) ie7 := entities.NewStringInfoElement(entities.NewInfoElement("destinationPodName", 103, 13, registry.AntreaEnterpriseID, 65535), dstPod) ie9 := entities.NewUnsigned16InfoElement(entities.NewInfoElement("destinationServicePort", 107, 2, registry.AntreaEnterpriseID, 2), uint16(4739)) - var ie1, ie2, ie8, ie10, ie11, ie12, ie13, ie14, ie15, ie16 entities.InfoElementWithValue + var ie1, ie2, ie8, ie10, ie11, ie12, ie13, ie14, ie15, ie16, ie17, ie18 entities.InfoElementWithValue if !isIPv6 { ie1 = entities.NewIPAddressInfoElement(entities.NewInfoElement("sourceIPv4Address", 8, 18, 0, 4), net.ParseIP("10.0.0.1").To4()) ie2 = entities.NewIPAddressInfoElement(entities.NewInfoElement("destinationIPv4Address", 12, 18, 0, 4), net.ParseIP("10.0.0.2").To4()) @@ -179,15 +180,18 @@ func createDataMsgForSrc(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR tmpFlowEndSecs, _ := registry.GetInfoElement("flowEndSeconds", registry.IANAEnterpriseID) tmpFlowEndReason, _ := registry.GetInfoElement("flowEndReason", registry.IANAEnterpriseID) tmpTCPState, _ := registry.GetInfoElement("tcpState", registry.AntreaEnterpriseID) + tmpHttpVals, _ := registry.GetInfoElement("httpVals", registry.AntreaEnterpriseID) if !isUpdatedRecord { ie10 = entities.NewDateTimeSecondsInfoElement(tmpFlowEndSecs, uint32(1)) ie12 = entities.NewUnsigned8InfoElement(tmpFlowEndReason, registry.ActiveTimeoutReason) ie13 = entities.NewStringInfoElement(tmpTCPState, "ESTABLISHED") + ie18 = entities.NewStringInfoElement(tmpHttpVals, "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}") } else { ie10 = entities.NewDateTimeSecondsInfoElement(tmpFlowEndSecs, uint32(10)) ie12 = entities.NewUnsigned8InfoElement(tmpFlowEndReason, registry.EndOfFlowReason) ie13 = entities.NewStringInfoElement(tmpTCPState, "TIME_WAIT") + ie18 = entities.NewStringInfoElement(tmpHttpVals, "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}") } if isToExternal { @@ -207,9 +211,9 @@ func createDataMsgForSrc(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR } else { ie15 = entities.NewUnsigned8InfoElement(entities.NewInfoElement("egressNetworkPolicyRuleAction", 140, 1, registry.AntreaEnterpriseID, 1), registry.NetworkPolicyRuleActionNoAction) } - ie17 := entities.NewDateTimeSecondsInfoElement(tmpFlowStartSecs, uint32(0)) + ie17 = entities.NewDateTimeSecondsInfoElement(tmpFlowStartSecs, uint32(0)) - elements = append(elements, ie1, ie2, ie3, ie4, ie5, ie6, ie7, ie8, ie9, ie10, ie11, ie12, ie13, ie14, ie15, ie16, ie17) + elements = append(elements, ie1, ie2, ie3, ie4, ie5, ie6, ie7, ie8, ie9, ie10, ie11, ie12, ie13, ie14, ie15, ie16, ie17, ie18) // Add all elements in statsElements. for _, element := range statsElementList { var e *entities.InfoElement @@ -269,7 +273,7 @@ func createDataMsgForDst(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR var srcAddr, dstAddr, svcAddr []byte var flowStartTime, flowEndTime uint32 var flowEndReason, ingressNetworkPolicyRuleAction, antreaFlowType uint8 - var srcPod, dstPod, tcpState string + var srcPod, dstPod, tcpState, httpVals string var svcPort uint16 srcPort := uint16(1234) dstPort := uint16(5678) @@ -323,10 +327,12 @@ func createDataMsgForDst(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR flowEndTime = uint32(1) flowEndReason = registry.ActiveTimeoutReason tcpState = "ESTABLISHED" + httpVals = "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}" } else { flowEndTime = uint32(10) flowEndReason = registry.EndOfFlowReason tcpState = "TIME_WAIT" + httpVals = "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}" } tmpElement, _ := registry.GetInfoElement("flowStartSeconds", registry.IANAEnterpriseID) ie17 := entities.NewDateTimeSecondsInfoElement(tmpElement, flowStartTime) @@ -345,8 +351,10 @@ func createDataMsgForDst(t testing.TB, isIPv6 bool, isIntraNode bool, isUpdatedR ie14 := entities.NewUnsigned8InfoElement(entities.NewInfoElement("ingressNetworkPolicyRuleAction", 139, 1, registry.AntreaEnterpriseID, 1), ingressNetworkPolicyRuleAction) ie15 := entities.NewUnsigned8InfoElement(entities.NewInfoElement("egressNetworkPolicyRuleAction", 140, 1, registry.AntreaEnterpriseID, 1), egressNetworkPolicyRuleAction) ie16 := entities.NewSigned32InfoElement(entities.NewInfoElement("ingressNetworkPolicyRulePriority", 116, 7, registry.AntreaEnterpriseID, 4), ingressNetworkPolicyRulePriority) + tmpElement, _ = registry.GetInfoElement("httpVals", registry.AntreaEnterpriseID) + ie18 := entities.NewStringInfoElement(tmpElement, httpVals) - elements = append(elements, ie1, ie2, ie3, ie4, ie5, ie6, ie7, ie8, ie9, ie10, ie11, ie12, ie13, ie14, ie15, ie16, ie17) + elements = append(elements, ie1, ie2, ie3, ie4, ie5, ie6, ie7, ie8, ie9, ie10, ie11, ie12, ie13, ie14, ie15, ie16, ie17, ie18) // Add all elements in statsElements. for _, element := range statsElementList { var e *entities.InfoElement @@ -802,6 +810,7 @@ func assertElementMap(t *testing.T, record map[string]interface{}, ipv6 bool) { assert.Equal(t, uint64(0), record["packetDeltaCount"]) assert.Equal(t, uint64(502), record["reversePacketTotalCount"]) assert.Equal(t, uint64(0), record["reversePacketDeltaCount"]) + assert.Equal(t, "{\"0\":\"{hostname:10.10.0.1,url:/public/,http_user_agent:curl/7.74.0,http_content_type:text/html,http_method:GET,protocol:HTTP/1.1,status:200,length:153}\"}", record["httpVals"]) } func TestGetRecords(t *testing.T) {