diff --git a/pkg/intermediate/aggregate.go b/pkg/intermediate/aggregate.go index d63188da..8b5d46b3 100644 --- a/pkg/intermediate/aggregate.go +++ b/pkg/intermediate/aggregate.go @@ -340,9 +340,22 @@ func (a *AggregationProcess) addOrUpdateRecordInMap(flowKey *FlowKey, record ent klog.Warning("FlowType does not exist in current record.") } correlationRequired := isCorrelationRequired(flowType, record) - - currTime := time.Now() aggregationRecord, exist := a.flowKeyRecordMap[*flowKey] + if exist { + if flowTypeIE, _, exist := aggregationRecord.Record.GetInfoElementWithValue("flowType"); exist { + flowType = flowTypeIE.GetUnsigned8Value() + } else { + klog.Warning("FlowType does not exist in current record.") + } + prevCorrelationRequired := isCorrelationRequired(flowType, aggregationRecord.Record) + if prevCorrelationRequired && !correlationRequired { + delete(a.flowKeyRecordMap, *flowKey) + exist = false + } else if !prevCorrelationRequired && correlationRequired { + return nil + } + } + currTime := time.Now() if exist { if correlationRequired { // Do correlation of records if record belongs to inter-node flow and @@ -411,14 +424,9 @@ func (a *AggregationProcess) addOrUpdateRecordInMap(flowKey *FlowKey, record ent if !correlationRequired { aggregationRecord.ReadyToSend = true - // If no correlation is required for an Inter-Node record, K8s metadata is - // expected to be not completely filled. For Intra-Node flows and ToExternal - // flows, areCorrelatedFieldsFilled is set to true by default. - if flowType == registry.FlowTypeInterNode { - aggregationRecord.areCorrelatedFieldsFilled = false - } else { - aggregationRecord.areCorrelatedFieldsFilled = true - } + } + if flowType != registry.FlowTypeInterNode { + aggregationRecord.areCorrelatedFieldsFilled = true } aggregationRecord.areExternalFieldsFilled = false // Push the record to the priority queue.