From 174f4d7eeb2cb9702da8cb1acdebdf264b6fc237 Mon Sep 17 00:00:00 2001 From: tushartathgur <79219394+tushartathgur@users.noreply.github.com> Date: Thu, 28 Sep 2023 14:59:41 -0700 Subject: [PATCH 1/8] Changing the datatype for flowEndSecondsFromSourceNode and flowEndSecondsFromDestinationNode (#320) flowEndSecondsFromDestinationNode and flowEndSecondsFromDestinationNode are changed to the type DateTimesecond Signed-off-by: Tushar Tathgur Co-authored-by: Tushar Tathgur --- pkg/registry/registry_antrea.csv | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/registry/registry_antrea.csv b/pkg/registry/registry_antrea.csv index fca62e45..5819096b 100644 --- a/pkg/registry/registry_antrea.csv +++ b/pkg/registry/registry_antrea.csv @@ -50,7 +50,7 @@ ElementID,Name,Abstract Data Type,Data Type Semantics,Status,Description,Units,R 148,throughputFromDestinationNode,unsigned64,,current,,,,,,,,56506, 149,reverseThroughputFromSourceNode,unsigned64,,current,,,,,,,,56506, 150,reverseThroughputFromDestinationNode,unsigned64,,current,,,,,,,,56506, -151,flowEndSecondsFromSourceNode,unsigned32,,current,,,,,,,,56506, -152,flowEndSecondsFromDestinationNode,unsigned32,,current,,,,,,,,56506, +151,flowEndSecondsFromSourceNode,dateTimeSeconds,,current,,,,,,,,56506, +152,flowEndSecondsFromDestinationNode,dateTimeSeconds,,current,,,,,,,,56506, 153,egressName,string,,current,,,,,,,,56506, 154,egressIP,string,,current,,,,,,,,56506, From 152e5e9b21d93caeaa2d72f7a8294065fcc006a4 Mon Sep 17 00:00:00 2001 From: Antonin Bas Date: Fri, 29 Sep 2023 11:02:25 -0700 Subject: [PATCH 2/8] Improve handling of string IEs (#322) * Use the copy built-in to copy string bytes to buffer, instead of copying bytes one-by-one with a for loop. This is more efficient (confirmed with new Benchmark functions) and more correct (in case the string includes UTF-8 characters which use more than 1 byte). * Increase max string length from 65,534 to 65,535 (as per the IPFIX RFC 7011). Signed-off-by: Antonin Bas --- pkg/entities/ie.go | 28 +++++++++++++--------------- pkg/entities/ie_test.go | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 15 deletions(-) diff --git a/pkg/entities/ie.go b/pkg/entities/ie.go index de72b331..871ee449 100644 --- a/pkg/entities/ie.go +++ b/pkg/entities/ie.go @@ -485,16 +485,14 @@ func EncodeToIEDataType(dataType IEDataType, val interface{}) ([]byte, error) { if len(v) < 255 { encodedBytes = make([]byte, len(v)+1) encodedBytes[0] = uint8(len(v)) - for i, b := range v { - encodedBytes[i+1] = byte(b) - } - } else if len(v) < 65535 { + copy(encodedBytes[1:], v) + } else if len(v) <= math.MaxUint16 { encodedBytes = make([]byte, len(v)+3) encodedBytes[0] = byte(255) binary.BigEndian.PutUint16(encodedBytes[1:3], uint16(len(v))) - for i, b := range v { - encodedBytes[i+3] = byte(b) - } + copy(encodedBytes[3:], v) + } else { + return nil, fmt.Errorf("provided String value is too long and cannot be encoded: len=%d, maxlen=%d", len(v), math.MaxUint16) } return encodedBytes, nil } @@ -560,15 +558,15 @@ func encodeInfoElementValueToBuff(element InfoElementWithValue, buffer []byte, i v := element.GetStringValue() if len(v) < 255 { buffer[index] = uint8(len(v)) - for i, b := range v { - buffer[i+index+1] = byte(b) - } - } else if len(v) < 65535 { - buffer[index] = byte(255) + // See https://pkg.go.dev/builtin#copy + // As a special case, it also will copy bytes from a string to a slice of bytes. + copy(buffer[index+1:], v) + } else if len(v) <= math.MaxUint16 { + buffer[index] = byte(255) // marker byte for long strings binary.BigEndian.PutUint16(buffer[index+1:index+3], uint16(len(v))) - for i, b := range v { - buffer[i+index+3] = byte(b) - } + copy(buffer[index+3:], v) + } else { + return fmt.Errorf("provided String value is too long and cannot be encoded: len=%d, maxlen=%d", len(v), math.MaxUint16) } default: return fmt.Errorf("API supports only valid information elements with datatypes given in RFC7011") diff --git a/pkg/entities/ie_test.go b/pkg/entities/ie_test.go index 9f95355e..43ae6a02 100644 --- a/pkg/entities/ie_test.go +++ b/pkg/entities/ie_test.go @@ -2,9 +2,11 @@ package entities import ( "net" + "strings" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) var macAddress, _ = net.ParseMAC("aa:bb:cc:dd:ee:ff") @@ -72,3 +74,37 @@ func TestNewInfoElementWithValue(t *testing.T) { assert.Equal(t, element.GetInfoElement().Name, "sourceIPv4Address") assert.Equal(t, element.GetIPAddressValue(), ip) } + +func BenchmarkEncodeInfoElementValueToBuffShortString(b *testing.B) { + // a short string has a max length of 254 + str := strings.Repeat("x", 128) + element := NewStringInfoElement(NewInfoElement("interfaceDescription", 83, 13, 0, 65535), str) + const numCopies = 1000 + length := element.GetLength() + buffer := make([]byte, numCopies*length) + b.ResetTimer() + for i := 0; i < b.N; i++ { + index := 0 + for j := 0; j < numCopies; j++ { + require.NoError(b, encodeInfoElementValueToBuff(element, buffer, index)) + index += length + } + } +} + +func BenchmarkEncodeInfoElementValueToBuffLongString(b *testing.B) { + // a long string has a max length of 65535 + str := strings.Repeat("x", 10000) + element := NewStringInfoElement(NewInfoElement("interfaceDescription", 83, 13, 0, 65535), str) + const numCopies = 1000 + length := element.GetLength() + buffer := make([]byte, numCopies*length) + b.ResetTimer() + for i := 0; i < b.N; i++ { + index := 0 + for j := 0; j < numCopies; j++ { + require.NoError(b, encodeInfoElementValueToBuff(element, buffer, index)) + index += length + } + } +} From b141f8b74dc5213e1caa900ec2f06016953ce58d Mon Sep 17 00:00:00 2001 From: tushartathgur <79219394+tushartathgur@users.noreply.github.com> Date: Fri, 29 Sep 2023 12:42:29 -0700 Subject: [PATCH 3/8] Adding L7 visibility fields (#315) * L7 Visibility support in Antrea Signed-off-by: Tushar Tathgur * Replaced isL7 bool with l7ProtocolName string Signed-off-by: Tushar Tathgur --------- Signed-off-by: Tushar Tathgur Co-authored-by: Tushar Tathgur --- pkg/registry/registry_antrea.csv | 2 ++ pkg/registry/registry_antrea.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/pkg/registry/registry_antrea.csv b/pkg/registry/registry_antrea.csv index 5819096b..20c9cd81 100644 --- a/pkg/registry/registry_antrea.csv +++ b/pkg/registry/registry_antrea.csv @@ -54,3 +54,5 @@ ElementID,Name,Abstract Data Type,Data Type Semantics,Status,Description,Units,R 152,flowEndSecondsFromDestinationNode,dateTimeSeconds,,current,,,,,,,,56506, 153,egressName,string,,current,,,,,,,,56506, 154,egressIP,string,,current,,,,,,,,56506, +155,l7ProtocolName,string,,current,,,,,,,,56506, +156,httpVals,string,,current,,,,,,,,56506, diff --git a/pkg/registry/registry_antrea.go b/pkg/registry/registry_antrea.go index 38610365..6587d8df 100644 --- a/pkg/registry/registry_antrea.go +++ b/pkg/registry/registry_antrea.go @@ -76,4 +76,6 @@ func loadAntreaRegistry() { registerInfoElement(*entities.NewInfoElement("flowEndSecondsFromDestinationNode", 152, 14, 56506, 4), 56506) registerInfoElement(*entities.NewInfoElement("egressName", 153, 13, 56506, 65535), 56506) registerInfoElement(*entities.NewInfoElement("egressIP", 154, 13, 56506, 65535), 56506) + registerInfoElement(*entities.NewInfoElement("l7ProtocolName", 155, 13, 56506, 65535), 56506) + registerInfoElement(*entities.NewInfoElement("httpVals", 156, 13, 56506, 65535), 56506) } From 6d01e0ef05fb45c3fce16fa7baf1b7f7c0333570 Mon Sep 17 00:00:00 2001 From: Antonin Bas Date: Fri, 29 Sep 2023 15:09:11 -0700 Subject: [PATCH 4/8] Avoid error logs in BenchmarkMultipleExportersToCollector (#323) * Gracefully shutdown exporter connections before stopping collector. * Do not log an error for graceful client shutdown (EOF error). Signed-off-by: Antonin Bas --- pkg/collector/process.go | 2 +- pkg/collector/tcp.go | 15 ++++++++++----- pkg/test/collector_perf_test.go | 11 ++++++++++- 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/pkg/collector/process.go b/pkg/collector/process.go index ef4e23b7..cb59ffca 100644 --- a/pkg/collector/process.go +++ b/pkg/collector/process.go @@ -383,7 +383,7 @@ func getMessageLength(reader *bufio.Reader) (int, error) { var msgLen uint16 err = util.Decode(bytes.NewBuffer(partialHeader[2:]), binary.BigEndian, &msgLen) if err != nil { - return 0, fmt.Errorf("cannot decode message: %v", err) + return 0, fmt.Errorf("cannot decode message: %w", err) } return int(msgLen), nil } diff --git a/pkg/collector/tcp.go b/pkg/collector/tcp.go index 3f3b2acc..d4b39fba 100644 --- a/pkg/collector/tcp.go +++ b/pkg/collector/tcp.go @@ -5,6 +5,7 @@ import ( "bytes" "crypto/tls" "crypto/x509" + "errors" "fmt" "io" "net" @@ -70,25 +71,29 @@ func (cp *CollectingProcess) handleTCPClient(conn net.Conn) { reader := bufio.NewReader(conn) for { length, err := getMessageLength(reader) + if errors.Is(err, io.EOF) { + klog.V(2).InfoS("Connection was closed by client") + return + } if err != nil { - klog.Errorf("error when retrieving message length: %v", err) + klog.ErrorS(err, "Error when retrieving message length") cp.deleteClient(address) return } buff := make([]byte, length) _, err = io.ReadFull(reader, buff) if err != nil { - klog.Errorf("error when reading the message: %v", err) + klog.ErrorS(err, "Error when reading the message") cp.deleteClient(address) return } message, err := cp.decodePacket(bytes.NewBuffer(buff), address) if err != nil { - klog.Error(err) + klog.ErrorS(err, "Error when decoding packet") continue } - klog.V(4).Infof("Processed message from exporter with observation domain ID: %v ser type: %v number of records: %v", - message.GetObsDomainID(), message.GetSet().GetSetType(), message.GetSet().GetNumberOfRecords()) + klog.V(4).InfoS("Processed message from exporter", + "observationDomainID", message.GetObsDomainID(), "setType", message.GetSet().GetSetType(), "numRecords", message.GetSet().GetNumberOfRecords()) } }() <-cp.stopChan diff --git a/pkg/test/collector_perf_test.go b/pkg/test/collector_perf_test.go index 4ebb3d21..ee517a8c 100644 --- a/pkg/test/collector_perf_test.go +++ b/pkg/test/collector_perf_test.go @@ -71,6 +71,7 @@ func BenchmarkMultipleExportersToCollector(b *testing.B) { } go cp.Start() waitForCollectorStatus(b, cp, true) + exporters := make([]*exporter.ExportingProcess, 0, numOfExporters) b.ResetTimer() for i := 0; i < numOfExporters; i++ { b.StartTimer() @@ -89,6 +90,7 @@ func BenchmarkMultipleExportersToCollector(b *testing.B) { exporter.SendSet(createDataSet(templateID, true, false, false)) } b.StopTimer() + exporters = append(exporters, exporter) time.Sleep(time.Millisecond) } b.StartTimer() @@ -96,10 +98,17 @@ func BenchmarkMultipleExportersToCollector(b *testing.B) { for range cp.GetMsgChan() { count++ if count == numOfRecords*numOfExporters { - cp.Stop() break } } + b.StopTimer() + // Gracefully shutdown all the exporters to avoid "use of closed network connection" error + // logs. + for i := 0; i < numOfExporters; i++ { + exporters[i].CloseConnToCollector() + } + b.StartTimer() + cp.Stop() waitForCollectorStatus(b, cp, false) } From e89e7378b036e686608bd95074fc965a26d2582b Mon Sep 17 00:00:00 2001 From: Antonin Bas Date: Mon, 2 Oct 2023 12:08:08 -0700 Subject: [PATCH 5/8] Fix aggregation bug for throughput common fields (#324) Just like for stats, throughput common fields should only be updated for the "latest" record (based on "flowEndSeconds"). This bug explains why the TestCollectorToIntermediate integration test has been flaky for a long time. In some cases, the record from the destination was processed before the record from the source (we have 2 aggregation workers, so records are not guaranteed to be processed in the order in which they are received). While the destination record has a larger "flowEndSeconds", the "throughput" and "reverseThroughput" common elements were overriden by values from the source record. Signed-off-by: Antonin Bas --- pkg/intermediate/aggregate.go | 8 +++++--- pkg/test/collector_intermediate_test.go | 10 ++++++---- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/pkg/intermediate/aggregate.go b/pkg/intermediate/aggregate.go index 349b63c4..ffd1dba9 100644 --- a/pkg/intermediate/aggregate.go +++ b/pkg/intermediate/aggregate.go @@ -637,8 +637,10 @@ func (a *AggregationProcess) aggregateRecords(incomingRecord, existingRecord ent ie, _, _ := existingRecord.GetInfoElementWithValue(antreaDestinationThroughputElements[i]) ie.SetUnsigned64Value(throughputVals[i]) } - ie, _, _ := existingRecord.GetInfoElementWithValue(element) - ie.SetUnsigned64Value(throughputVals[i]) + if isLatest { + ie, _, _ := existingRecord.GetInfoElementWithValue(element) + ie.SetUnsigned64Value(throughputVals[i]) + } } return nil } @@ -756,7 +758,7 @@ func (a *AggregationProcess) addFieldsForThroughputCalculation(record entities.R return err } value := uint32(0) - if fillSrcStats && strings.Contains(ieName, "Source") || fillDstStats && strings.Contains(ieName, "Destination") { + if (fillSrcStats && strings.Contains(ieName, "Source")) || (fillDstStats && strings.Contains(ieName, "Destination")) { value = timeEnd } if err = record.AddInfoElement(entities.NewUnsigned32InfoElement(ie, value)); err != nil { diff --git a/pkg/test/collector_intermediate_test.go b/pkg/test/collector_intermediate_test.go index d4a5119e..49b9b24e 100644 --- a/pkg/test/collector_intermediate_test.go +++ b/pkg/test/collector_intermediate_test.go @@ -36,7 +36,7 @@ import ( // Run TestSingleRecordTCPTransport and TestSingleRecordTCPTransportIPv6 along with // debug log for the message in pkg/exporter/process.go before sending it to get following // raw bytes for template and data packets. -// Following data packets are generated with getTestRecord in exporter_collector_test.go +// Following data packets are generated with getTestRecord in util.go // dataPacket1IPv4: getTestRecord(true, false) // dataPacket2IPv4: getTestRecord(false, false) // dataPacket1IPv6: getTestRecord(true, true) @@ -164,7 +164,8 @@ func testCollectorToIntermediate(t *testing.T, address net.Addr, isIPv6 bool) { ap, _ := intermediate.InitAggregationProcess(apInput) go cp.Start() waitForCollectorReady(t, cp) - go func() { + go ap.Start() + func() { collectorAddr, _ := net.ResolveTCPAddr("tcp", cp.GetAddress().String()) conn, err := net.DialTCP("tcp", nil, collectorAddr) if err != nil { @@ -181,7 +182,6 @@ func testCollectorToIntermediate(t *testing.T, address net.Addr, isIPv6 bool) { conn.Write(dataPacket2IPv4) } }() - go ap.Start() if isIPv6 { waitForAggregationToFinish(t, ap, flowKey2) } else { @@ -276,9 +276,11 @@ func waitForCollectorReady(t *testing.T, cp *collector.CollectingProcess) { if strings.Split(cp.GetAddress().String(), ":")[1] == "0" { return false, fmt.Errorf("random port is not resolved") } - if _, err := net.Dial(cp.GetAddress().Network(), cp.GetAddress().String()); err != nil { + conn, err := net.Dial(cp.GetAddress().Network(), cp.GetAddress().String()) + if err != nil { return false, err } + conn.Close() return true, nil } if err := wait.Poll(100*time.Millisecond, 500*time.Millisecond, checkConn); err != nil { From 06f6636929c49f54e9aaeb16128bc11331e38238 Mon Sep 17 00:00:00 2001 From: Antonin Bas Date: Mon, 2 Oct 2023 14:49:32 -0700 Subject: [PATCH 6/8] Improvements in integration tests (#325) * Use more meaningful byte counts in test records (count and reverseCount have no reason to be the same). * Generate test data packets automatically using a function, instead of having to generate packet data manually. * Add integration build tag for linters Signed-off-by: Antonin Bas --- .golangci.yml | 2 + pkg/collector/process_test.go | 18 ++++---- pkg/exporter/msg.go | 56 +++++++++++++++++++++++++ pkg/exporter/process.go | 33 ++------------- pkg/exporter/process_test.go | 32 +++++++------- pkg/kafka/producer/kafka_test.go | 16 +++---- pkg/test/{ => certs}/certs.go | 2 +- pkg/test/collector_intermediate_test.go | 28 ++----------- pkg/test/collector_perf_test.go | 6 --- pkg/test/collector_producer_test.go | 52 ++++++++++++----------- pkg/test/exporter_collector_test.go | 36 ++++++++-------- pkg/test/util.go | 45 +++++++++++++++++++- 12 files changed, 188 insertions(+), 138 deletions(-) create mode 100644 pkg/exporter/msg.go rename pkg/test/{ => certs}/certs.go (99%) diff --git a/.golangci.yml b/.golangci.yml index 8db73f6c..d06a1bef 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -3,6 +3,8 @@ run: tests: true timeout: 10m skip-dirs-use-default: true + build-tags: + - integration linters-settings: goimports: diff --git a/pkg/collector/process_test.go b/pkg/collector/process_test.go index b6c1fd19..c7808b7c 100644 --- a/pkg/collector/process_test.go +++ b/pkg/collector/process_test.go @@ -29,7 +29,7 @@ import ( "github.com/vmware/go-ipfix/pkg/entities" "github.com/vmware/go-ipfix/pkg/registry" - "github.com/vmware/go-ipfix/pkg/test" + testcerts "github.com/vmware/go-ipfix/pkg/test/certs" ) var validTemplatePacket = []byte{0, 10, 0, 40, 95, 154, 107, 127, 0, 0, 0, 0, 0, 0, 0, 1, 0, 2, 0, 24, 1, 0, 0, 3, 0, 8, 0, 4, 0, 12, 0, 4, 128, 101, 255, 255, 0, 0, 220, 186} @@ -376,11 +376,11 @@ func TestTLSCollectingProcess(t *testing.T) { var config *tls.Config go func() { roots := x509.NewCertPool() - ok := roots.AppendCertsFromPEM([]byte(test.FakeCACert)) + ok := roots.AppendCertsFromPEM([]byte(testcerts.FakeCACert)) if !ok { t.Error("Failed to parse root certificate") } - cert, err := tls.X509KeyPair([]byte(test.FakeClientCert), []byte(test.FakeClientKey)) + cert, err := tls.X509KeyPair([]byte(testcerts.FakeClientCert), []byte(testcerts.FakeClientKey)) if err != nil { t.Error(err) } @@ -424,7 +424,7 @@ func TestDTLSCollectingProcess(t *testing.T) { collectorAddr, _ := net.ResolveUDPAddr("udp", cp.GetAddress().String()) go func() { roots := x509.NewCertPool() - ok := roots.AppendCertsFromPEM([]byte(test.FakeCert2)) + ok := roots.AppendCertsFromPEM([]byte(testcerts.FakeCert2)) if !ok { t.Error("Failed to parse root certificate") } @@ -517,9 +517,9 @@ func getCollectorInput(network string, isEncrypted bool, isIPv6 bool) CollectorI MaxBufferSize: 1024, TemplateTTL: 0, IsEncrypted: true, - CACert: []byte(test.FakeCACert), - ServerCert: []byte(test.FakeCert), - ServerKey: []byte(test.FakeKey), + CACert: []byte(testcerts.FakeCACert), + ServerCert: []byte(testcerts.FakeCert), + ServerKey: []byte(testcerts.FakeKey), } } else { return CollectorInput{ @@ -542,8 +542,8 @@ func getCollectorInput(network string, isEncrypted bool, isIPv6 bool) CollectorI MaxBufferSize: 1024, TemplateTTL: 0, IsEncrypted: true, - ServerCert: []byte(test.FakeCert2), - ServerKey: []byte(test.FakeKey2), + ServerCert: []byte(testcerts.FakeCert2), + ServerKey: []byte(testcerts.FakeKey2), } } else { return CollectorInput{ diff --git a/pkg/exporter/msg.go b/pkg/exporter/msg.go new file mode 100644 index 00000000..1fb25612 --- /dev/null +++ b/pkg/exporter/msg.go @@ -0,0 +1,56 @@ +// Copyright 2023 VMware, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exporter + +import ( + "fmt" + "time" + + "github.com/vmware/go-ipfix/pkg/entities" +) + +func CreateIPFIXMsg(set entities.Set, obsDomainID uint32, seqNumber uint32, exportTime time.Time) ([]byte, error) { + // Create a new message and use it to send the set. + msg := entities.NewMessage(false) + + // Check if message is exceeding the limit after adding the set. Include message + // header length too. + msgLen := entities.MsgHeaderLength + set.GetSetLength() + if msgLen > entities.MaxSocketMsgSize { + // This is applicable for both TCP and UDP sockets. + return nil, fmt.Errorf("message size exceeds max socket buffer size") + } + + // Set the fields in the message header. + // IPFIX version number is 10. + // https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-version-numbers + msg.SetVersion(10) + msg.SetObsDomainID(obsDomainID) + msg.SetMessageLen(uint16(msgLen)) + msg.SetExportTime(uint32(exportTime.Unix())) + msg.SetSequenceNum(seqNumber) + + bytesSlice := make([]byte, msgLen) + copy(bytesSlice[:entities.MsgHeaderLength], msg.GetMsgHeader()) + copy(bytesSlice[entities.MsgHeaderLength:entities.MsgHeaderLength+entities.SetHeaderLen], set.GetHeaderBuffer()) + index := entities.MsgHeaderLength + entities.SetHeaderLen + for _, record := range set.GetRecords() { + len := record.GetRecordLength() + copy(bytesSlice[index:index+len], record.GetBuffer()) + index += len + } + + return bytesSlice, nil +} diff --git a/pkg/exporter/process.go b/pkg/exporter/process.go index ed714e0d..bce94d2e 100644 --- a/pkg/exporter/process.go +++ b/pkg/exporter/process.go @@ -285,37 +285,12 @@ func (ep *ExportingProcess) NewTemplateID() uint16 { // createAndSendIPFIXMsg takes in a set as input, creates the IPFIX message, and sends it out. // TODO: This method will change when we support sending multiple sets. func (ep *ExportingProcess) createAndSendIPFIXMsg(set entities.Set) (int, error) { - // Create a new message and use it to send the set. - msg := entities.NewMessage(false) - - // Check if message is exceeding the limit after adding the set. Include message - // header length too. - msgLen := entities.MsgHeaderLength + set.GetSetLength() - if msgLen > entities.MaxSocketMsgSize { - // This is applicable for both TCP and UDP sockets. - return 0, fmt.Errorf("message size exceeds max socket buffer size") - } - - // Set the fields in the message header. - // IPFIX version number is 10. - // https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-version-numbers - msg.SetVersion(10) - msg.SetObsDomainID(ep.obsDomainID) - msg.SetMessageLen(uint16(msgLen)) - msg.SetExportTime(uint32(time.Now().Unix())) if set.GetSetType() == entities.Data { ep.seqNumber = ep.seqNumber + set.GetNumberOfRecords() } - msg.SetSequenceNum(ep.seqNumber) - - bytesSlice := make([]byte, msgLen) - copy(bytesSlice[:entities.MsgHeaderLength], msg.GetMsgHeader()) - copy(bytesSlice[entities.MsgHeaderLength:entities.MsgHeaderLength+entities.SetHeaderLen], set.GetHeaderBuffer()) - index := entities.MsgHeaderLength + entities.SetHeaderLen - for _, record := range set.GetRecords() { - len := record.GetRecordLength() - copy(bytesSlice[index:index+len], record.GetBuffer()) - index += len + bytesSlice, err := CreateIPFIXMsg(set, ep.obsDomainID, ep.seqNumber, time.Now()) + if err != nil { + return 0, err } // Send the message on the exporter connection. @@ -323,7 +298,7 @@ func (ep *ExportingProcess) createAndSendIPFIXMsg(set entities.Set) (int, error) if err != nil { return bytesSent, fmt.Errorf("error when sending message on the connection: %v", err) - } else if bytesSent != msgLen { + } else if bytesSent != len(bytesSlice) { return bytesSent, fmt.Errorf("could not send the complete message on the connection") } diff --git a/pkg/exporter/process_test.go b/pkg/exporter/process_test.go index 5839c236..0cf6975d 100644 --- a/pkg/exporter/process_test.go +++ b/pkg/exporter/process_test.go @@ -28,7 +28,7 @@ import ( "github.com/vmware/go-ipfix/pkg/entities" "github.com/vmware/go-ipfix/pkg/registry" - "github.com/vmware/go-ipfix/pkg/test" + testcerts "github.com/vmware/go-ipfix/pkg/test/certs" ) func init() { @@ -396,14 +396,14 @@ func TestExportingProcess_SendingDataRecordToLocalUDPServer(t *testing.T) { } func TestInitExportingProcessWithTLS(t *testing.T) { - caCert, caKey, caData, err := test.GenerateCACert() + caCert, caKey, caData, err := testcerts.GenerateCACert() require.NoError(t, err, "Error when generating CA cert") - clientCertData, clientKeyData, err := test.GenerateClientCert(caCert, caKey) + clientCertData, clientKeyData, err := testcerts.GenerateClientCert(caCert, caKey) require.NoError(t, err, "Error when generating client cert") testCases := []struct { name string - serverCertOptions []test.CertificateOption + serverCertOptions []testcerts.CertificateOption withClientAuth bool tlsClientConfig *ExporterTLSClientConfig expectedErr string @@ -419,14 +419,14 @@ func TestInitExportingProcessWithTLS(t *testing.T) { }, { name: "IP SAN", - serverCertOptions: []test.CertificateOption{test.AddIPAddress(net.ParseIP("127.0.0.1"))}, + serverCertOptions: []testcerts.CertificateOption{testcerts.AddIPAddress(net.ParseIP("127.0.0.1"))}, tlsClientConfig: &ExporterTLSClientConfig{ CAData: caData, }, }, { name: "name SAN with matching ServerName", - serverCertOptions: []test.CertificateOption{test.AddDNSName("foobar")}, + serverCertOptions: []testcerts.CertificateOption{testcerts.AddDNSName("foobar")}, tlsClientConfig: &ExporterTLSClientConfig{ CAData: caData, ServerName: "foobar", @@ -434,7 +434,7 @@ func TestInitExportingProcessWithTLS(t *testing.T) { }, { name: "name SAN with mismatching ServerName", - serverCertOptions: []test.CertificateOption{test.AddDNSName("foobar")}, + serverCertOptions: []testcerts.CertificateOption{testcerts.AddDNSName("foobar")}, tlsClientConfig: &ExporterTLSClientConfig{ CAData: caData, ServerName: "badname", @@ -444,7 +444,7 @@ func TestInitExportingProcessWithTLS(t *testing.T) { }, { name: "name SAN without ServerName", - serverCertOptions: []test.CertificateOption{test.AddDNSName("foobar")}, + serverCertOptions: []testcerts.CertificateOption{testcerts.AddDNSName("foobar")}, tlsClientConfig: &ExporterTLSClientConfig{ CAData: caData, }, @@ -453,7 +453,7 @@ func TestInitExportingProcessWithTLS(t *testing.T) { }, { name: "client auth with no cert", - serverCertOptions: []test.CertificateOption{test.AddIPAddress(net.ParseIP("127.0.0.1"))}, + serverCertOptions: []testcerts.CertificateOption{testcerts.AddIPAddress(net.ParseIP("127.0.0.1"))}, withClientAuth: true, tlsClientConfig: &ExporterTLSClientConfig{ CAData: caData, @@ -462,7 +462,7 @@ func TestInitExportingProcessWithTLS(t *testing.T) { }, { name: "client auth with cert", - serverCertOptions: []test.CertificateOption{test.AddIPAddress(net.ParseIP("127.0.0.1"))}, + serverCertOptions: []testcerts.CertificateOption{testcerts.AddIPAddress(net.ParseIP("127.0.0.1"))}, withClientAuth: true, tlsClientConfig: &ExporterTLSClientConfig{ CAData: caData, @@ -472,7 +472,7 @@ func TestInitExportingProcessWithTLS(t *testing.T) { }, { name: "client auth and ServerName", - serverCertOptions: []test.CertificateOption{test.AddDNSName("foobar")}, + serverCertOptions: []testcerts.CertificateOption{testcerts.AddDNSName("foobar")}, withClientAuth: true, tlsClientConfig: &ExporterTLSClientConfig{ CAData: caData, @@ -489,7 +489,7 @@ func TestInitExportingProcessWithTLS(t *testing.T) { // Create local server for testing address, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0") require.NoError(t, err) - serverCertData, serverKeyData, err := test.GenerateServerCert(caCert, caKey, tc.serverCertOptions...) + serverCertData, serverKeyData, err := testcerts.GenerateServerCert(caCert, caKey, tc.serverCertOptions...) require.NoError(t, err, "Error when generating server cert") serverCert, err := tls.X509KeyPair(serverCertData, serverKeyData) require.NoError(t, err) @@ -551,7 +551,7 @@ func TestExportingProcessWithTLS(t *testing.T) { if err != nil { t.Fatalf("Got error when resolving tcp address: %v", err) } - cer, err := tls.X509KeyPair([]byte(test.FakeCert), []byte(test.FakeKey)) + cer, err := tls.X509KeyPair([]byte(testcerts.FakeCert), []byte(testcerts.FakeKey)) if err != nil { t.Error(err) return @@ -592,7 +592,7 @@ func TestExportingProcessWithTLS(t *testing.T) { ObservationDomainID: 1, TempRefTimeout: 0, TLSClientConfig: &ExporterTLSClientConfig{ - CAData: []byte(test.FakeCACert), + CAData: []byte(testcerts.FakeCACert), ServerName: "127.0.0.1", }, } @@ -638,7 +638,7 @@ func TestExportingProcessWithDTLS(t *testing.T) { if err != nil { t.Fatalf("Got error when resolving udp address: %v", err) } - cert, err := tls.X509KeyPair([]byte(test.FakeCert2), []byte(test.FakeKey2)) + cert, err := tls.X509KeyPair([]byte(testcerts.FakeCert2), []byte(testcerts.FakeKey2)) if err != nil { t.Error(err) return @@ -679,7 +679,7 @@ func TestExportingProcessWithDTLS(t *testing.T) { ObservationDomainID: 1, TempRefTimeout: 0, TLSClientConfig: &ExporterTLSClientConfig{ - CAData: []byte(test.FakeCert2), + CAData: []byte(testcerts.FakeCert2), }, } exporter, err := InitExportingProcess(input) diff --git a/pkg/kafka/producer/kafka_test.go b/pkg/kafka/producer/kafka_test.go index a9574ee9..2d214e2d 100644 --- a/pkg/kafka/producer/kafka_test.go +++ b/pkg/kafka/producer/kafka_test.go @@ -22,13 +22,13 @@ import ( "github.com/Shopify/sarama" "github.com/stretchr/testify/assert" - "github.com/vmware/go-ipfix/pkg/test" + testcerts "github.com/vmware/go-ipfix/pkg/test/certs" ) func TestInitKafkaProducerWithTLS(t *testing.T) { - caCertFile := createTmpFileAndWrite(t, test.FakeCACert, "ca-") - certFile := createTmpFileAndWrite(t, test.FakeCert, "cert-") - keyFile := createTmpFileAndWrite(t, test.FakeKey, "key-") + caCertFile := createTmpFileAndWrite(t, testcerts.FakeCACert, "ca-") + certFile := createTmpFileAndWrite(t, testcerts.FakeCert, "cert-") + keyFile := createTmpFileAndWrite(t, testcerts.FakeKey, "key-") defer closeTmpFile(t, caCertFile) defer closeTmpFile(t, certFile) defer closeTmpFile(t, keyFile) @@ -36,7 +36,7 @@ func TestInitKafkaProducerWithTLS(t *testing.T) { serverTLSConfig, err := setupTLSConfig(caCertFile.Name(), certFile.Name(), keyFile.Name(), false) assert.NoError(t, err) - doListenerTLSTest(t, serverTLSConfig, test.FakeCACert, test.FakeClientCert, test.FakeClientKey) + doListenerTLSTest(t, serverTLSConfig, testcerts.FakeCACert, testcerts.FakeClientCert, testcerts.FakeClientKey) } func doListenerTLSTest(t *testing.T, serverTLSConfig *tls.Config, caCert, clientCert, clientKey string) { @@ -49,9 +49,9 @@ func doListenerTLSTest(t *testing.T, serverTLSConfig *tls.Config, caCert, client seedBroker.Returns(new(sarama.MetadataResponse)) - caCertFile := createTmpFileAndWrite(t, test.FakeCACert, "ca-") - certFile := createTmpFileAndWrite(t, test.FakeClientCert, "clientCert-") - keyFile := createTmpFileAndWrite(t, test.FakeClientKey, "clientKey-") + caCertFile := createTmpFileAndWrite(t, testcerts.FakeCACert, "ca-") + certFile := createTmpFileAndWrite(t, testcerts.FakeClientCert, "clientCert-") + keyFile := createTmpFileAndWrite(t, testcerts.FakeClientKey, "clientKey-") defer closeTmpFile(t, caCertFile) defer closeTmpFile(t, certFile) defer closeTmpFile(t, keyFile) diff --git a/pkg/test/certs.go b/pkg/test/certs/certs.go similarity index 99% rename from pkg/test/certs.go rename to pkg/test/certs/certs.go index e175681c..6b5bdc1d 100644 --- a/pkg/test/certs.go +++ b/pkg/test/certs/certs.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package test +package certs import ( "bytes" diff --git a/pkg/test/collector_intermediate_test.go b/pkg/test/collector_intermediate_test.go index 49b9b24e..ec54af84 100644 --- a/pkg/test/collector_intermediate_test.go +++ b/pkg/test/collector_intermediate_test.go @@ -33,21 +33,6 @@ import ( "github.com/vmware/go-ipfix/pkg/registry" ) -// Run TestSingleRecordTCPTransport and TestSingleRecordTCPTransportIPv6 along with -// debug log for the message in pkg/exporter/process.go before sending it to get following -// raw bytes for template and data packets. -// Following data packets are generated with getTestRecord in util.go -// dataPacket1IPv4: getTestRecord(true, false) -// dataPacket2IPv4: getTestRecord(false, false) -// dataPacket1IPv6: getTestRecord(true, true) -// dataPacket2IPv6: getTestRecord(false, true) -var templatePacketIPv4 = []byte{0x00, 0x0a, 0x00, 0x8c, 0x61, 0x84, 0xa6, 0xd4, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x02, 0x00, 0x7c, 0x01, 0x00, 0x00, 0x14, 0x00, 0x08, 0x00, 0x04, 0x00, 0x0c, 0x00, 0x04, 0x00, 0x07, 0x00, 0x02, 0x00, 0x0b, 0x00, 0x02, 0x00, 0x04, 0x00, 0x01, 0x00, 0x96, 0x00, 0x04, 0x00, 0x97, 0x00, 0x04, 0x00, 0x88, 0x00, 0x01, 0x00, 0x56, 0x00, 0x08, 0x00, 0x02, 0x00, 0x08, 0x00, 0x55, 0x00, 0x08, 0x80, 0x65, 0xff, 0xff, 0x00, 0x00, 0xdc, 0xba, 0x80, 0x67, 0xff, 0xff, 0x00, 0x00, 0xdc, 0xba, 0x80, 0x6c, 0x00, 0x02, 0x00, 0x00, 0xdc, 0xba, 0x80, 0x89, 0x00, 0x01, 0x00, 0x00, 0xdc, 0xba, 0x80, 0x88, 0xff, 0xff, 0x00, 0x00, 0xdc, 0xba, 0x80, 0x6a, 0x00, 0x04, 0x00, 0x00, 0xdc, 0xba, 0x80, 0x56, 0x00, 0x08, 0x00, 0x00, 0x72, 0x79, 0x80, 0x02, 0x00, 0x08, 0x00, 0x00, 0x72, 0x79, 0x80, 0x55, 0x00, 0x08, 0x00, 0x00, 0x72, 0x79} -var dataPacket1IPv4 = []byte{0x00, 0x0a, 0x00, 0x73, 0x61, 0x84, 0xa6, 0xd4, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x63, 0x0a, 0x00, 0x00, 0x01, 0x0a, 0x00, 0x00, 0x02, 0x04, 0xd2, 0x16, 0x2e, 0x06, 0x4a, 0xf9, 0xec, 0x88, 0x4a, 0xf9, 0xf0, 0x70, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0xf4, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x35, 0x00, 0x04, 0x70, 0x6f, 0x64, 0x31, 0x00, 0x12, 0x83, 0x02, 0x0b, 0x45, 0x53, 0x54, 0x41, 0x42, 0x4c, 0x49, 0x53, 0x48, 0x45, 0x44, 0x0a, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x2c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x96, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x35, 0x00} -var dataPacket2IPv4 = []byte{0x00, 0x0a, 0x00, 0x73, 0x61, 0x84, 0xa6, 0xd4, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x63, 0x0a, 0x00, 0x00, 0x01, 0x0a, 0x00, 0x00, 0x02, 0x04, 0xd2, 0x16, 0x2e, 0x06, 0x4a, 0xf9, 0xec, 0x88, 0x4a, 0xf9, 0xf8, 0x40, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xe8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0xf4, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0f, 0x42, 0x40, 0x00, 0x04, 0x70, 0x6f, 0x64, 0x32, 0x00, 0x00, 0x02, 0x0b, 0x45, 0x53, 0x54, 0x41, 0x42, 0x4c, 0x49, 0x53, 0x48, 0x45, 0x44, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x90, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xc8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0f, 0x42, 0x40} -var templatePacketIPv6 = []byte{0x00, 0x0a, 0x00, 0x8c, 0x61, 0x84, 0xad, 0x94, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x02, 0x00, 0x7c, 0x01, 0x00, 0x00, 0x14, 0x00, 0x1b, 0x00, 0x10, 0x00, 0x1c, 0x00, 0x10, 0x00, 0x07, 0x00, 0x02, 0x00, 0x0b, 0x00, 0x02, 0x00, 0x04, 0x00, 0x01, 0x00, 0x96, 0x00, 0x04, 0x00, 0x97, 0x00, 0x04, 0x00, 0x88, 0x00, 0x01, 0x00, 0x56, 0x00, 0x08, 0x00, 0x02, 0x00, 0x08, 0x00, 0x55, 0x00, 0x08, 0x80, 0x65, 0xff, 0xff, 0x00, 0x00, 0xdc, 0xba, 0x80, 0x67, 0xff, 0xff, 0x00, 0x00, 0xdc, 0xba, 0x80, 0x6c, 0x00, 0x02, 0x00, 0x00, 0xdc, 0xba, 0x80, 0x89, 0x00, 0x01, 0x00, 0x00, 0xdc, 0xba, 0x80, 0x88, 0xff, 0xff, 0x00, 0x00, 0xdc, 0xba, 0x80, 0x6b, 0x00, 0x10, 0x00, 0x00, 0xdc, 0xba, 0x80, 0x56, 0x00, 0x08, 0x00, 0x00, 0x72, 0x79, 0x80, 0x02, 0x00, 0x08, 0x00, 0x00, 0x72, 0x79, 0x80, 0x55, 0x00, 0x08, 0x00, 0x00, 0x72, 0x79} -var dataPacket1IPv6 = []byte{0x00, 0x0a, 0x00, 0x97, 0x61, 0x84, 0xad, 0x94, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x87, 0x20, 0x01, 0x00, 0x00, 0x32, 0x38, 0xdf, 0xe1, 0x00, 0x63, 0x00, 0x00, 0x00, 0x00, 0xfe, 0xfb, 0x20, 0x01, 0x00, 0x00, 0x32, 0x38, 0xdf, 0xe1, 0x00, 0x63, 0x00, 0x00, 0x00, 0x00, 0xfe, 0xfc, 0x04, 0xd2, 0x16, 0x2e, 0x06, 0x4a, 0xf9, 0xec, 0x88, 0x4a, 0xf9, 0xf0, 0x70, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0xf4, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x35, 0x00, 0x04, 0x70, 0x6f, 0x64, 0x31, 0x00, 0x12, 0x83, 0x02, 0x0b, 0x45, 0x53, 0x54, 0x41, 0x42, 0x4c, 0x49, 0x53, 0x48, 0x45, 0x44, 0x20, 0x01, 0x00, 0x00, 0x32, 0x38, 0xbb, 0xbb, 0x00, 0x63, 0x00, 0x00, 0x00, 0x00, 0xaa, 0xaa, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x2c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x96, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x35, 0x00} -var dataPacket2IPv6 = []byte{0x00, 0x0a, 0x00, 0x97, 0x61, 0x84, 0xad, 0x94, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x87, 0x20, 0x01, 0x00, 0x00, 0x32, 0x38, 0xdf, 0xe1, 0x00, 0x63, 0x00, 0x00, 0x00, 0x00, 0xfe, 0xfb, 0x20, 0x01, 0x00, 0x00, 0x32, 0x38, 0xdf, 0xe1, 0x00, 0x63, 0x00, 0x00, 0x00, 0x00, 0xfe, 0xfc, 0x04, 0xd2, 0x16, 0x2e, 0x06, 0x4a, 0xf9, 0xec, 0x88, 0x4a, 0xf9, 0xf8, 0x40, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xe8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0xf4, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0f, 0x42, 0x40, 0x00, 0x04, 0x70, 0x6f, 0x64, 0x32, 0x00, 0x00, 0x02, 0x0b, 0x45, 0x53, 0x54, 0x41, 0x42, 0x4c, 0x49, 0x53, 0x48, 0x45, 0x44, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x90, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xc8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0f, 0x42, 0x40} - var ( flowKeyRecordMap = make(map[intermediate.FlowKey]intermediate.AggregationFlowRecord) flowKey1 = intermediate.FlowKey{SourceAddress: "10.0.0.1", DestinationAddress: "10.0.0.2", Protocol: 6, SourcePort: 1234, DestinationPort: 5678} @@ -111,11 +96,6 @@ var ( aggregationWorkerNum = 2 ) -func init() { - // Load the global registry - registry.LoadRegistry() -} - func TestCollectorToIntermediateIPv4(t *testing.T) { address, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0") if err != nil { @@ -249,19 +229,19 @@ func testCollectorToIntermediate(t *testing.T, address net.Addr, isIPv6 bool) { case "octetTotalCount": assert.Equal(t, uint64(1000000), element.GetUnsigned64Value()) case "reverseOctetTotalCount": - assert.Equal(t, uint64(1000000), element.GetUnsigned64Value()) + assert.Equal(t, uint64(400000), element.GetUnsigned64Value()) case "throughput": assert.Equal(t, uint64(1000000*8/3000), element.GetUnsigned64Value()) case "reverseThroughput": - assert.Equal(t, uint64(1000000*8/3000), element.GetUnsigned64Value()) + assert.Equal(t, uint64(400000*8/3000), element.GetUnsigned64Value()) case "throughputFromSourceNode": assert.Equal(t, uint64(800000*8/1000), element.GetUnsigned64Value()) case "throughputFromDestinationNode": assert.Equal(t, uint64(1000000*8/3000), element.GetUnsigned64Value()) case "reverseThroughputFromSourceNode": - assert.Equal(t, uint64(800000*8/1000), element.GetUnsigned64Value()) + assert.Equal(t, uint64(300000*8/1000), element.GetUnsigned64Value()) case "reverseThroughputFromDestinationNode": - assert.Equal(t, uint64(1000000*8/3000), element.GetUnsigned64Value()) + assert.Equal(t, uint64(400000*8/3000), element.GetUnsigned64Value()) } } } diff --git a/pkg/test/collector_perf_test.go b/pkg/test/collector_perf_test.go index ee517a8c..17d9a37a 100644 --- a/pkg/test/collector_perf_test.go +++ b/pkg/test/collector_perf_test.go @@ -29,14 +29,8 @@ import ( "github.com/vmware/go-ipfix/pkg/collector" "github.com/vmware/go-ipfix/pkg/exporter" - "github.com/vmware/go-ipfix/pkg/registry" ) -func init() { - // Load the global registry - registry.LoadRegistry() -} - const ( numOfExporters = 100 numOfRecords = 600 diff --git a/pkg/test/collector_producer_test.go b/pkg/test/collector_producer_test.go index 44a17970..bd15c4ec 100644 --- a/pkg/test/collector_producer_test.go +++ b/pkg/test/collector_producer_test.go @@ -12,51 +12,48 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build integration // +build integration package test import ( "net" + "sync" "testing" "github.com/Shopify/sarama" saramamock "github.com/Shopify/sarama/mocks" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/vmware/go-ipfix/pkg/collector" "github.com/vmware/go-ipfix/pkg/kafka/producer" convertortest "github.com/vmware/go-ipfix/pkg/kafka/producer/convertor/test" - "github.com/vmware/go-ipfix/pkg/registry" ) var ( - msg1InBytes = []byte{0x0, 0x0, 0x0, 0x6a, 0x8, 0xd4, 0xcd, 0x92, 0x8c, 0x6, 0x10, - 0x1, 0x18, 0x1, 0x20, 0x88, 0xd9, 0xe7, 0xd7, 0x4, 0x28, 0xf0, 0xe0, 0xe7, 0xd7, - 0x4, 0x32, 0x8, 0x31, 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x31, 0x3a, 0x8, 0x31, - 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x32, 0x40, 0xd2, 0x9, 0x48, 0xae, 0x2c, - 0x50, 0x6, 0x58, 0xa0, 0x6, 0x60, 0x80, 0xea, 0x30, 0x68, 0xf4, 0x3, 0x78, 0xac, - 0x2, 0x80, 0x1, 0x80, 0xea, 0x30, 0x88, 0x1, 0x96, 0x1, 0x9a, 0x1, 0x4, 0x70, - 0x6f, 0x64, 0x31, 0xca, 0x1, 0x8, 0x31, 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x33, - 0x8a, 0x2, 0x9, 0x31, 0x32, 0x37, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x31, 0x90, 0x2, - 0x83, 0x25} - msg2InBytes = []byte{0x0, 0x0, 0x0, 0x65, 0x8, 0xd4, 0xcd, 0x92, 0x8c, 0x6, 0x10, - 0x1, 0x18, 0x1, 0x20, 0x88, 0xd9, 0xe7, 0xd7, 0x4, 0x28, 0xc0, 0xf0, 0xe7, 0xd7, - 0x4, 0x32, 0x8, 0x31, 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x31, 0x3a, 0x8, 0x31, - 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x32, 0x40, 0xd2, 0x9, 0x48, 0xae, 0x2c, 0x50, - 0x6, 0x58, 0xe8, 0x7, 0x60, 0xc0, 0x84, 0x3d, 0x68, 0xf4, 0x3, 0x78, 0x90, 0x3, - 0x80, 0x1, 0xc0, 0x84, 0x3d, 0x88, 0x1, 0xc8, 0x1, 0xb2, 0x1, 0x4, 0x70, 0x6f, - 0x64, 0x32, 0xca, 0x1, 0x7, 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x8a, 0x2, - 0x9, 0x31, 0x32, 0x37, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x31} + msg1InBytes = []byte{0x0, 0x0, 0x0, 0x68, 0x8, 0xca, 0xe0, 0xcb, 0xee, 0x5, 0x18, 0x1, 0x20, + 0x88, 0xd9, 0xe7, 0xd7, 0x4, 0x28, 0xf0, 0xe0, 0xe7, 0xd7, 0x4, 0x32, 0x8, 0x31, + 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x31, 0x3a, 0x8, 0x31, 0x30, 0x2e, 0x30, 0x2e, + 0x30, 0x2e, 0x32, 0x40, 0xd2, 0x9, 0x48, 0xae, 0x2c, 0x50, 0x6, 0x58, 0xa0, 0x6, + 0x60, 0x80, 0xea, 0x30, 0x68, 0xf4, 0x3, 0x78, 0xac, 0x2, 0x80, 0x1, 0xe0, 0xa7, + 0x12, 0x88, 0x1, 0x96, 0x1, 0x9a, 0x1, 0x4, 0x70, 0x6f, 0x64, 0x31, 0xca, 0x1, 0x8, + 0x31, 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x33, 0x8a, 0x2, 0x9, 0x31, 0x32, 0x37, + 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x31, 0x90, 0x2, 0x83, 0x25} + msg2InBytes = []byte{0x0, 0x0, 0x0, 0x63, 0x8, 0xca, 0xe0, 0xcb, 0xee, 0x5, 0x18, 0x1, 0x20, + 0x88, 0xd9, 0xe7, 0xd7, 0x4, 0x28, 0xc0, 0xf0, 0xe7, 0xd7, 0x4, 0x32, 0x8, 0x31, + 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x31, 0x3a, 0x8, 0x31, 0x30, 0x2e, 0x30, 0x2e, + 0x30, 0x2e, 0x32, 0x40, 0xd2, 0x9, 0x48, 0xae, 0x2c, 0x50, 0x6, 0x58, 0xe8, 0x7, + 0x60, 0xc0, 0x84, 0x3d, 0x68, 0xf4, 0x3, 0x78, 0x90, 0x3, 0x80, 0x1, 0x80, 0xb5, + 0x18, 0x88, 0x1, 0xc8, 0x1, 0xb2, 0x1, 0x4, 0x70, 0x6f, 0x64, 0x32, 0xca, 0x1, 0x7, + 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x8a, 0x2, 0x9, 0x31, 0x32, 0x37, 0x2e, + 0x30, 0x2e, 0x30, 0x2e, 0x31} ) func TestCollectorToProducer(t *testing.T) { - // Initialize required objects. - registry.LoadRegistry() address, err := net.ResolveUDPAddr("udp", "0.0.0.0:4739") - if err != nil { - t.Error(err) - } + require.NoError(t, err) // Initialize collecting process cpInput := collector.CollectorInput{ Address: address.String(), @@ -82,15 +79,19 @@ func TestCollectorToProducer(t *testing.T) { } mockSaramaProducer := saramamock.NewAsyncProducer(t, kafkaConfig) kafkaProducer, err := producer.NewKafkaProducer(testInput) - assert.NoError(t, err) + require.NoError(t, err) kafkaProducer.SetSaramaProducer(mockSaramaProducer) go cp.Start() waitForCollectorReady(t, cp) + var wg sync.WaitGroup + wg.Add(1) go func() { + defer wg.Done() conn, err := net.DialUDP("udp", nil, address) if err != nil { - t.Errorf("UDP Collecting Process does not start correctly.") + t.Errorf("UDP Collecting Process did not start correctly.") + return } defer conn.Close() // Using the packets from collector_intermediate_test.go @@ -112,6 +113,7 @@ func TestCollectorToProducer(t *testing.T) { kafkaMsg2InBytes, _ := kafkaMsg2.Value.Encode() assert.Equalf(t, msg2InBytes, kafkaMsg2InBytes, "kafka msg should be equal to expected bytes") + wg.Wait() // making sure client connection is closed before stopping collector cp.Stop() if err := mockSaramaProducer.Close(); err != nil { t.Fatal(err) diff --git a/pkg/test/exporter_collector_test.go b/pkg/test/exporter_collector_test.go index 114d7fad..1d6d8456 100644 --- a/pkg/test/exporter_collector_test.go +++ b/pkg/test/exporter_collector_test.go @@ -23,19 +23,16 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/util/wait" "github.com/vmware/go-ipfix/pkg/collector" "github.com/vmware/go-ipfix/pkg/entities" "github.com/vmware/go-ipfix/pkg/exporter" "github.com/vmware/go-ipfix/pkg/registry" + testcerts "github.com/vmware/go-ipfix/pkg/test/certs" ) -func init() { - // Load the global registry - registry.LoadRegistry() -} - func TestSingleRecordUDPTransport(t *testing.T) { address, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") if err != nil { @@ -106,7 +103,7 @@ func TestDTLSTransport(t *testing.T) { func testExporterToCollector(address net.Addr, isSrcNode, isIPv6 bool, isMultipleRecord bool, isEncrypted bool, t *testing.T) { // Initialize collecting process - messages := make([]*entities.Message, 0) + messages := make([]*entities.Message, 2) cpInput := collector.CollectorInput{ Address: address.String(), Protocol: address.Network(), @@ -118,12 +115,12 @@ func testExporterToCollector(address net.Addr, isSrcNode, isIPv6 bool, isMultipl } if isEncrypted { if address.Network() == "tcp" { - cpInput.CACert = []byte(FakeCACert) - cpInput.ServerCert = []byte(FakeCert) - cpInput.ServerKey = []byte(FakeKey) + cpInput.CACert = []byte(testcerts.FakeCACert) + cpInput.ServerCert = []byte(testcerts.FakeCert) + cpInput.ServerKey = []byte(testcerts.FakeKey) } else if address.Network() == "udp" { - cpInput.ServerCert = []byte(FakeCert2) - cpInput.ServerKey = []byte(FakeKey2) + cpInput.ServerCert = []byte(testcerts.FakeCert2) + cpInput.ServerKey = []byte(testcerts.FakeKey2) } } cp, _ := collector.InitCollectingProcess(cpInput) @@ -141,11 +138,11 @@ func testExporterToCollector(address net.Addr, isSrcNode, isIPv6 bool, isMultipl if isEncrypted { tlsClientConfig := &exporter.ExporterTLSClientConfig{} if address.Network() == "tcp" { // use TLS - tlsClientConfig.CAData = []byte(FakeCACert) - tlsClientConfig.CertData = []byte(FakeClientCert) - tlsClientConfig.KeyData = []byte(FakeClientKey) + tlsClientConfig.CAData = []byte(testcerts.FakeCACert) + tlsClientConfig.CertData = []byte(testcerts.FakeClientCert) + tlsClientConfig.KeyData = []byte(testcerts.FakeClientKey) } else if address.Network() == "udp" { // use DTLS - tlsClientConfig.CAData = []byte(FakeCert2) + tlsClientConfig.CAData = []byte(testcerts.FakeCert2) } epInput.TLSClientConfig = tlsClientConfig } @@ -168,13 +165,16 @@ func testExporterToCollector(address net.Addr, isSrcNode, isIPv6 bool, isMultipl } set := dataSet + messageIdx := 0 for message := range cp.GetMsgChan() { - messages = append(messages, message) - if len(messages) == 2 { + messages[messageIdx] = message + messageIdx++ + if messageIdx == 2 { cp.CloseMsgChan() } } cp.Stop() // Close collecting process + require.Equal(t, 2, messageIdx) templateMsg := messages[0] assert.Equal(t, uint16(10), templateMsg.GetVersion(), "Version of flow record (template) should be 10.") assert.Equal(t, uint32(1), templateMsg.GetObsDomainID(), "ObsDomainID (template) should be 1.") @@ -219,7 +219,7 @@ func testExporterToCollector(address net.Addr, isSrcNode, isIPv6 bool, isMultipl } } if err = wait.Poll(time.Millisecond, 10*time.Millisecond, checkError); err != nil { - t.Errorf("Collector process does not close correctly.") + t.Errorf("Collector process did not close correctly.") } } diff --git a/pkg/test/util.go b/pkg/test/util.go index 052677e1..26d35a2a 100644 --- a/pkg/test/util.go +++ b/pkg/test/util.go @@ -16,11 +16,19 @@ package test import ( "net" + "time" "github.com/vmware/go-ipfix/pkg/entities" + "github.com/vmware/go-ipfix/pkg/exporter" "github.com/vmware/go-ipfix/pkg/registry" ) +var ( + // First release of Antrea (v0.1.0) at KubeCon NA 2019 (San Diego) :) + sanDiegoLocation, _ = time.LoadLocation("America/Los_Angeles") + testTime = time.Date(2019, time.November, 18, 11, 26, 2, 0, sanDiegoLocation) +) + var ( commonFields = []string{ "sourceTransportPort", @@ -61,6 +69,9 @@ var ( } ) +// will be initialized in init() after loading the registry +var templatePacketIPv4, dataPacket1IPv4, dataPacket2IPv4, templatePacketIPv6, dataPacket1IPv6, dataPacket2IPv6 []byte + type testRecord struct { srcIP net.IP dstIP net.IP @@ -107,7 +118,7 @@ func getTestRecord(isSrcNode, isIPv6 bool) testRecord { record.pktCount = uint64(1000) record.pktDelta = uint64(500) record.bytCount = uint64(1000000) - record.revBytCount = uint64(1000000) + record.revBytCount = uint64(400000) record.dstSvcPort = uint16(0) record.srcPod = "" record.dstPod = "pod2" @@ -123,7 +134,7 @@ func getTestRecord(isSrcNode, isIPv6 bool) testRecord { record.pktCount = uint64(800) record.pktDelta = uint64(500) record.bytCount = uint64(800000) - record.revBytCount = uint64(800000) + record.revBytCount = uint64(300000) record.dstSvcPort = uint16(4739) record.srcPod = "pod1" record.dstPod = "" @@ -262,3 +273,33 @@ func getDataRecordElements(isSrcNode, isIPv6 bool) []entities.InfoElementWithVal } return elements } + +func getTestTemplatePacket(isIPv6 bool) []byte { + set := createTemplateSet(1 /* templateID */, isIPv6) + bytes, err := exporter.CreateIPFIXMsg(set, 1 /* obsDomainID */, 0 /* seqNumber */, testTime) + if err != nil { + panic("failed to create test template packet") + } + return bytes +} + +func getTestDataPacket(isSrcNode bool, isIPv6 bool) []byte { + set := createDataSet(1 /* templateID */, isSrcNode, isIPv6, false /* isMultipleRecord */) + bytes, err := exporter.CreateIPFIXMsg(set, 1 /* obsDomainID */, 0 /* seqNumber */, testTime) + if err != nil { + panic("failed to create test data packet") + } + return bytes +} + +func init() { + // Load the global registry + registry.LoadRegistry() + + templatePacketIPv4 = getTestTemplatePacket(false) + dataPacket1IPv4 = getTestDataPacket(true, false) + dataPacket2IPv4 = getTestDataPacket(false, false) + templatePacketIPv6 = getTestTemplatePacket(true) + dataPacket1IPv6 = getTestDataPacket(true, true) + dataPacket2IPv6 = getTestDataPacket(false, true) +} From 6d624762816b10ed349b47d07aae7c0f7e5c8db4 Mon Sep 17 00:00:00 2001 From: Yongming Ding Date: Mon, 2 Oct 2023 15:23:15 -0700 Subject: [PATCH 7/8] Update CHANGELOG for v0.8.0 Signed-off-by: Yongming Ding --- CHANGELOG.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index cbb32502..f88ff411 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,18 @@ All notable changes to this project will be documented in this file. The format Changelog](https://keepachangelog.com/en/1.0.0/). ## Unreleased +## 0.8.0 10-02-2023 +### Added +- Add L7 visibility fields. (#315, @tushartathgur) +### Changed +- Change datatype of flowEndSecondsFromSourceNode and + flowEndSecondsFromDestinationNode. (#320, @tushartathgur) +- Improve handling of string IEs. (#322, @antoninbas) +- Avoid error logs in BenchmarkMultipleExportersToCollector. (#323, + @antoninbas) +- Improve integration tests. (#325, @antoninbas) +### Fixed +- Fix aggregation bug for throughput common fields. (#324, @antoninbas) ## 0.7.0 09-21-2023 ### Changed - Upgrade Go to v1.21, as Go v1.19 is no longer maintained. (#317, @antoninbas) From 6a646f55c26ff86788309a88fa3709cecfb5650a Mon Sep 17 00:00:00 2001 From: Yongming Ding Date: Mon, 2 Oct 2023 15:23:58 -0700 Subject: [PATCH 8/8] Update VERSION Signed-off-by: Yongming Ding --- VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/VERSION b/VERSION index 8b20e485..b19b5211 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -v0.7.0 +v0.8.0