From 06f6636929c49f54e9aaeb16128bc11331e38238 Mon Sep 17 00:00:00 2001 From: Antonin Bas Date: Mon, 2 Oct 2023 14:49:32 -0700 Subject: [PATCH] Improvements in integration tests (#325) * Use more meaningful byte counts in test records (count and reverseCount have no reason to be the same). * Generate test data packets automatically using a function, instead of having to generate packet data manually. * Add integration build tag for linters Signed-off-by: Antonin Bas --- .golangci.yml | 2 + pkg/collector/process_test.go | 18 ++++---- pkg/exporter/msg.go | 56 +++++++++++++++++++++++++ pkg/exporter/process.go | 33 ++------------- pkg/exporter/process_test.go | 32 +++++++------- pkg/kafka/producer/kafka_test.go | 16 +++---- pkg/test/{ => certs}/certs.go | 2 +- pkg/test/collector_intermediate_test.go | 28 ++----------- pkg/test/collector_perf_test.go | 6 --- pkg/test/collector_producer_test.go | 52 ++++++++++++----------- pkg/test/exporter_collector_test.go | 36 ++++++++-------- pkg/test/util.go | 45 +++++++++++++++++++- 12 files changed, 188 insertions(+), 138 deletions(-) create mode 100644 pkg/exporter/msg.go rename pkg/test/{ => certs}/certs.go (99%) diff --git a/.golangci.yml b/.golangci.yml index 8db73f6c..d06a1bef 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -3,6 +3,8 @@ run: tests: true timeout: 10m skip-dirs-use-default: true + build-tags: + - integration linters-settings: goimports: diff --git a/pkg/collector/process_test.go b/pkg/collector/process_test.go index b6c1fd19..c7808b7c 100644 --- a/pkg/collector/process_test.go +++ b/pkg/collector/process_test.go @@ -29,7 +29,7 @@ import ( "github.com/vmware/go-ipfix/pkg/entities" "github.com/vmware/go-ipfix/pkg/registry" - "github.com/vmware/go-ipfix/pkg/test" + testcerts "github.com/vmware/go-ipfix/pkg/test/certs" ) var validTemplatePacket = []byte{0, 10, 0, 40, 95, 154, 107, 127, 0, 0, 0, 0, 0, 0, 0, 1, 0, 2, 0, 24, 1, 0, 0, 3, 0, 8, 0, 4, 0, 12, 0, 4, 128, 101, 255, 255, 0, 0, 220, 186} @@ -376,11 +376,11 @@ func TestTLSCollectingProcess(t *testing.T) { var config *tls.Config go func() { roots := x509.NewCertPool() - ok := roots.AppendCertsFromPEM([]byte(test.FakeCACert)) + ok := roots.AppendCertsFromPEM([]byte(testcerts.FakeCACert)) if !ok { t.Error("Failed to parse root certificate") } - cert, err := tls.X509KeyPair([]byte(test.FakeClientCert), []byte(test.FakeClientKey)) + cert, err := tls.X509KeyPair([]byte(testcerts.FakeClientCert), []byte(testcerts.FakeClientKey)) if err != nil { t.Error(err) } @@ -424,7 +424,7 @@ func TestDTLSCollectingProcess(t *testing.T) { collectorAddr, _ := net.ResolveUDPAddr("udp", cp.GetAddress().String()) go func() { roots := x509.NewCertPool() - ok := roots.AppendCertsFromPEM([]byte(test.FakeCert2)) + ok := roots.AppendCertsFromPEM([]byte(testcerts.FakeCert2)) if !ok { t.Error("Failed to parse root certificate") } @@ -517,9 +517,9 @@ func getCollectorInput(network string, isEncrypted bool, isIPv6 bool) CollectorI MaxBufferSize: 1024, TemplateTTL: 0, IsEncrypted: true, - CACert: []byte(test.FakeCACert), - ServerCert: []byte(test.FakeCert), - ServerKey: []byte(test.FakeKey), + CACert: []byte(testcerts.FakeCACert), + ServerCert: []byte(testcerts.FakeCert), + ServerKey: []byte(testcerts.FakeKey), } } else { return CollectorInput{ @@ -542,8 +542,8 @@ func getCollectorInput(network string, isEncrypted bool, isIPv6 bool) CollectorI MaxBufferSize: 1024, TemplateTTL: 0, IsEncrypted: true, - ServerCert: []byte(test.FakeCert2), - ServerKey: []byte(test.FakeKey2), + ServerCert: []byte(testcerts.FakeCert2), + ServerKey: []byte(testcerts.FakeKey2), } } else { return CollectorInput{ diff --git a/pkg/exporter/msg.go b/pkg/exporter/msg.go new file mode 100644 index 00000000..1fb25612 --- /dev/null +++ b/pkg/exporter/msg.go @@ -0,0 +1,56 @@ +// Copyright 2023 VMware, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exporter + +import ( + "fmt" + "time" + + "github.com/vmware/go-ipfix/pkg/entities" +) + +func CreateIPFIXMsg(set entities.Set, obsDomainID uint32, seqNumber uint32, exportTime time.Time) ([]byte, error) { + // Create a new message and use it to send the set. + msg := entities.NewMessage(false) + + // Check if message is exceeding the limit after adding the set. Include message + // header length too. + msgLen := entities.MsgHeaderLength + set.GetSetLength() + if msgLen > entities.MaxSocketMsgSize { + // This is applicable for both TCP and UDP sockets. + return nil, fmt.Errorf("message size exceeds max socket buffer size") + } + + // Set the fields in the message header. + // IPFIX version number is 10. + // https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-version-numbers + msg.SetVersion(10) + msg.SetObsDomainID(obsDomainID) + msg.SetMessageLen(uint16(msgLen)) + msg.SetExportTime(uint32(exportTime.Unix())) + msg.SetSequenceNum(seqNumber) + + bytesSlice := make([]byte, msgLen) + copy(bytesSlice[:entities.MsgHeaderLength], msg.GetMsgHeader()) + copy(bytesSlice[entities.MsgHeaderLength:entities.MsgHeaderLength+entities.SetHeaderLen], set.GetHeaderBuffer()) + index := entities.MsgHeaderLength + entities.SetHeaderLen + for _, record := range set.GetRecords() { + len := record.GetRecordLength() + copy(bytesSlice[index:index+len], record.GetBuffer()) + index += len + } + + return bytesSlice, nil +} diff --git a/pkg/exporter/process.go b/pkg/exporter/process.go index ed714e0d..bce94d2e 100644 --- a/pkg/exporter/process.go +++ b/pkg/exporter/process.go @@ -285,37 +285,12 @@ func (ep *ExportingProcess) NewTemplateID() uint16 { // createAndSendIPFIXMsg takes in a set as input, creates the IPFIX message, and sends it out. // TODO: This method will change when we support sending multiple sets. func (ep *ExportingProcess) createAndSendIPFIXMsg(set entities.Set) (int, error) { - // Create a new message and use it to send the set. - msg := entities.NewMessage(false) - - // Check if message is exceeding the limit after adding the set. Include message - // header length too. - msgLen := entities.MsgHeaderLength + set.GetSetLength() - if msgLen > entities.MaxSocketMsgSize { - // This is applicable for both TCP and UDP sockets. - return 0, fmt.Errorf("message size exceeds max socket buffer size") - } - - // Set the fields in the message header. - // IPFIX version number is 10. - // https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-version-numbers - msg.SetVersion(10) - msg.SetObsDomainID(ep.obsDomainID) - msg.SetMessageLen(uint16(msgLen)) - msg.SetExportTime(uint32(time.Now().Unix())) if set.GetSetType() == entities.Data { ep.seqNumber = ep.seqNumber + set.GetNumberOfRecords() } - msg.SetSequenceNum(ep.seqNumber) - - bytesSlice := make([]byte, msgLen) - copy(bytesSlice[:entities.MsgHeaderLength], msg.GetMsgHeader()) - copy(bytesSlice[entities.MsgHeaderLength:entities.MsgHeaderLength+entities.SetHeaderLen], set.GetHeaderBuffer()) - index := entities.MsgHeaderLength + entities.SetHeaderLen - for _, record := range set.GetRecords() { - len := record.GetRecordLength() - copy(bytesSlice[index:index+len], record.GetBuffer()) - index += len + bytesSlice, err := CreateIPFIXMsg(set, ep.obsDomainID, ep.seqNumber, time.Now()) + if err != nil { + return 0, err } // Send the message on the exporter connection. @@ -323,7 +298,7 @@ func (ep *ExportingProcess) createAndSendIPFIXMsg(set entities.Set) (int, error) if err != nil { return bytesSent, fmt.Errorf("error when sending message on the connection: %v", err) - } else if bytesSent != msgLen { + } else if bytesSent != len(bytesSlice) { return bytesSent, fmt.Errorf("could not send the complete message on the connection") } diff --git a/pkg/exporter/process_test.go b/pkg/exporter/process_test.go index 5839c236..0cf6975d 100644 --- a/pkg/exporter/process_test.go +++ b/pkg/exporter/process_test.go @@ -28,7 +28,7 @@ import ( "github.com/vmware/go-ipfix/pkg/entities" "github.com/vmware/go-ipfix/pkg/registry" - "github.com/vmware/go-ipfix/pkg/test" + testcerts "github.com/vmware/go-ipfix/pkg/test/certs" ) func init() { @@ -396,14 +396,14 @@ func TestExportingProcess_SendingDataRecordToLocalUDPServer(t *testing.T) { } func TestInitExportingProcessWithTLS(t *testing.T) { - caCert, caKey, caData, err := test.GenerateCACert() + caCert, caKey, caData, err := testcerts.GenerateCACert() require.NoError(t, err, "Error when generating CA cert") - clientCertData, clientKeyData, err := test.GenerateClientCert(caCert, caKey) + clientCertData, clientKeyData, err := testcerts.GenerateClientCert(caCert, caKey) require.NoError(t, err, "Error when generating client cert") testCases := []struct { name string - serverCertOptions []test.CertificateOption + serverCertOptions []testcerts.CertificateOption withClientAuth bool tlsClientConfig *ExporterTLSClientConfig expectedErr string @@ -419,14 +419,14 @@ func TestInitExportingProcessWithTLS(t *testing.T) { }, { name: "IP SAN", - serverCertOptions: []test.CertificateOption{test.AddIPAddress(net.ParseIP("127.0.0.1"))}, + serverCertOptions: []testcerts.CertificateOption{testcerts.AddIPAddress(net.ParseIP("127.0.0.1"))}, tlsClientConfig: &ExporterTLSClientConfig{ CAData: caData, }, }, { name: "name SAN with matching ServerName", - serverCertOptions: []test.CertificateOption{test.AddDNSName("foobar")}, + serverCertOptions: []testcerts.CertificateOption{testcerts.AddDNSName("foobar")}, tlsClientConfig: &ExporterTLSClientConfig{ CAData: caData, ServerName: "foobar", @@ -434,7 +434,7 @@ func TestInitExportingProcessWithTLS(t *testing.T) { }, { name: "name SAN with mismatching ServerName", - serverCertOptions: []test.CertificateOption{test.AddDNSName("foobar")}, + serverCertOptions: []testcerts.CertificateOption{testcerts.AddDNSName("foobar")}, tlsClientConfig: &ExporterTLSClientConfig{ CAData: caData, ServerName: "badname", @@ -444,7 +444,7 @@ func TestInitExportingProcessWithTLS(t *testing.T) { }, { name: "name SAN without ServerName", - serverCertOptions: []test.CertificateOption{test.AddDNSName("foobar")}, + serverCertOptions: []testcerts.CertificateOption{testcerts.AddDNSName("foobar")}, tlsClientConfig: &ExporterTLSClientConfig{ CAData: caData, }, @@ -453,7 +453,7 @@ func TestInitExportingProcessWithTLS(t *testing.T) { }, { name: "client auth with no cert", - serverCertOptions: []test.CertificateOption{test.AddIPAddress(net.ParseIP("127.0.0.1"))}, + serverCertOptions: []testcerts.CertificateOption{testcerts.AddIPAddress(net.ParseIP("127.0.0.1"))}, withClientAuth: true, tlsClientConfig: &ExporterTLSClientConfig{ CAData: caData, @@ -462,7 +462,7 @@ func TestInitExportingProcessWithTLS(t *testing.T) { }, { name: "client auth with cert", - serverCertOptions: []test.CertificateOption{test.AddIPAddress(net.ParseIP("127.0.0.1"))}, + serverCertOptions: []testcerts.CertificateOption{testcerts.AddIPAddress(net.ParseIP("127.0.0.1"))}, withClientAuth: true, tlsClientConfig: &ExporterTLSClientConfig{ CAData: caData, @@ -472,7 +472,7 @@ func TestInitExportingProcessWithTLS(t *testing.T) { }, { name: "client auth and ServerName", - serverCertOptions: []test.CertificateOption{test.AddDNSName("foobar")}, + serverCertOptions: []testcerts.CertificateOption{testcerts.AddDNSName("foobar")}, withClientAuth: true, tlsClientConfig: &ExporterTLSClientConfig{ CAData: caData, @@ -489,7 +489,7 @@ func TestInitExportingProcessWithTLS(t *testing.T) { // Create local server for testing address, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0") require.NoError(t, err) - serverCertData, serverKeyData, err := test.GenerateServerCert(caCert, caKey, tc.serverCertOptions...) + serverCertData, serverKeyData, err := testcerts.GenerateServerCert(caCert, caKey, tc.serverCertOptions...) require.NoError(t, err, "Error when generating server cert") serverCert, err := tls.X509KeyPair(serverCertData, serverKeyData) require.NoError(t, err) @@ -551,7 +551,7 @@ func TestExportingProcessWithTLS(t *testing.T) { if err != nil { t.Fatalf("Got error when resolving tcp address: %v", err) } - cer, err := tls.X509KeyPair([]byte(test.FakeCert), []byte(test.FakeKey)) + cer, err := tls.X509KeyPair([]byte(testcerts.FakeCert), []byte(testcerts.FakeKey)) if err != nil { t.Error(err) return @@ -592,7 +592,7 @@ func TestExportingProcessWithTLS(t *testing.T) { ObservationDomainID: 1, TempRefTimeout: 0, TLSClientConfig: &ExporterTLSClientConfig{ - CAData: []byte(test.FakeCACert), + CAData: []byte(testcerts.FakeCACert), ServerName: "127.0.0.1", }, } @@ -638,7 +638,7 @@ func TestExportingProcessWithDTLS(t *testing.T) { if err != nil { t.Fatalf("Got error when resolving udp address: %v", err) } - cert, err := tls.X509KeyPair([]byte(test.FakeCert2), []byte(test.FakeKey2)) + cert, err := tls.X509KeyPair([]byte(testcerts.FakeCert2), []byte(testcerts.FakeKey2)) if err != nil { t.Error(err) return @@ -679,7 +679,7 @@ func TestExportingProcessWithDTLS(t *testing.T) { ObservationDomainID: 1, TempRefTimeout: 0, TLSClientConfig: &ExporterTLSClientConfig{ - CAData: []byte(test.FakeCert2), + CAData: []byte(testcerts.FakeCert2), }, } exporter, err := InitExportingProcess(input) diff --git a/pkg/kafka/producer/kafka_test.go b/pkg/kafka/producer/kafka_test.go index a9574ee9..2d214e2d 100644 --- a/pkg/kafka/producer/kafka_test.go +++ b/pkg/kafka/producer/kafka_test.go @@ -22,13 +22,13 @@ import ( "github.com/Shopify/sarama" "github.com/stretchr/testify/assert" - "github.com/vmware/go-ipfix/pkg/test" + testcerts "github.com/vmware/go-ipfix/pkg/test/certs" ) func TestInitKafkaProducerWithTLS(t *testing.T) { - caCertFile := createTmpFileAndWrite(t, test.FakeCACert, "ca-") - certFile := createTmpFileAndWrite(t, test.FakeCert, "cert-") - keyFile := createTmpFileAndWrite(t, test.FakeKey, "key-") + caCertFile := createTmpFileAndWrite(t, testcerts.FakeCACert, "ca-") + certFile := createTmpFileAndWrite(t, testcerts.FakeCert, "cert-") + keyFile := createTmpFileAndWrite(t, testcerts.FakeKey, "key-") defer closeTmpFile(t, caCertFile) defer closeTmpFile(t, certFile) defer closeTmpFile(t, keyFile) @@ -36,7 +36,7 @@ func TestInitKafkaProducerWithTLS(t *testing.T) { serverTLSConfig, err := setupTLSConfig(caCertFile.Name(), certFile.Name(), keyFile.Name(), false) assert.NoError(t, err) - doListenerTLSTest(t, serverTLSConfig, test.FakeCACert, test.FakeClientCert, test.FakeClientKey) + doListenerTLSTest(t, serverTLSConfig, testcerts.FakeCACert, testcerts.FakeClientCert, testcerts.FakeClientKey) } func doListenerTLSTest(t *testing.T, serverTLSConfig *tls.Config, caCert, clientCert, clientKey string) { @@ -49,9 +49,9 @@ func doListenerTLSTest(t *testing.T, serverTLSConfig *tls.Config, caCert, client seedBroker.Returns(new(sarama.MetadataResponse)) - caCertFile := createTmpFileAndWrite(t, test.FakeCACert, "ca-") - certFile := createTmpFileAndWrite(t, test.FakeClientCert, "clientCert-") - keyFile := createTmpFileAndWrite(t, test.FakeClientKey, "clientKey-") + caCertFile := createTmpFileAndWrite(t, testcerts.FakeCACert, "ca-") + certFile := createTmpFileAndWrite(t, testcerts.FakeClientCert, "clientCert-") + keyFile := createTmpFileAndWrite(t, testcerts.FakeClientKey, "clientKey-") defer closeTmpFile(t, caCertFile) defer closeTmpFile(t, certFile) defer closeTmpFile(t, keyFile) diff --git a/pkg/test/certs.go b/pkg/test/certs/certs.go similarity index 99% rename from pkg/test/certs.go rename to pkg/test/certs/certs.go index e175681c..6b5bdc1d 100644 --- a/pkg/test/certs.go +++ b/pkg/test/certs/certs.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package test +package certs import ( "bytes" diff --git a/pkg/test/collector_intermediate_test.go b/pkg/test/collector_intermediate_test.go index 49b9b24e..ec54af84 100644 --- a/pkg/test/collector_intermediate_test.go +++ b/pkg/test/collector_intermediate_test.go @@ -33,21 +33,6 @@ import ( "github.com/vmware/go-ipfix/pkg/registry" ) -// Run TestSingleRecordTCPTransport and TestSingleRecordTCPTransportIPv6 along with -// debug log for the message in pkg/exporter/process.go before sending it to get following -// raw bytes for template and data packets. -// Following data packets are generated with getTestRecord in util.go -// dataPacket1IPv4: getTestRecord(true, false) -// dataPacket2IPv4: getTestRecord(false, false) -// dataPacket1IPv6: getTestRecord(true, true) -// dataPacket2IPv6: getTestRecord(false, true) -var templatePacketIPv4 = []byte{0x00, 0x0a, 0x00, 0x8c, 0x61, 0x84, 0xa6, 0xd4, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x02, 0x00, 0x7c, 0x01, 0x00, 0x00, 0x14, 0x00, 0x08, 0x00, 0x04, 0x00, 0x0c, 0x00, 0x04, 0x00, 0x07, 0x00, 0x02, 0x00, 0x0b, 0x00, 0x02, 0x00, 0x04, 0x00, 0x01, 0x00, 0x96, 0x00, 0x04, 0x00, 0x97, 0x00, 0x04, 0x00, 0x88, 0x00, 0x01, 0x00, 0x56, 0x00, 0x08, 0x00, 0x02, 0x00, 0x08, 0x00, 0x55, 0x00, 0x08, 0x80, 0x65, 0xff, 0xff, 0x00, 0x00, 0xdc, 0xba, 0x80, 0x67, 0xff, 0xff, 0x00, 0x00, 0xdc, 0xba, 0x80, 0x6c, 0x00, 0x02, 0x00, 0x00, 0xdc, 0xba, 0x80, 0x89, 0x00, 0x01, 0x00, 0x00, 0xdc, 0xba, 0x80, 0x88, 0xff, 0xff, 0x00, 0x00, 0xdc, 0xba, 0x80, 0x6a, 0x00, 0x04, 0x00, 0x00, 0xdc, 0xba, 0x80, 0x56, 0x00, 0x08, 0x00, 0x00, 0x72, 0x79, 0x80, 0x02, 0x00, 0x08, 0x00, 0x00, 0x72, 0x79, 0x80, 0x55, 0x00, 0x08, 0x00, 0x00, 0x72, 0x79} -var dataPacket1IPv4 = []byte{0x00, 0x0a, 0x00, 0x73, 0x61, 0x84, 0xa6, 0xd4, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x63, 0x0a, 0x00, 0x00, 0x01, 0x0a, 0x00, 0x00, 0x02, 0x04, 0xd2, 0x16, 0x2e, 0x06, 0x4a, 0xf9, 0xec, 0x88, 0x4a, 0xf9, 0xf0, 0x70, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0xf4, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x35, 0x00, 0x04, 0x70, 0x6f, 0x64, 0x31, 0x00, 0x12, 0x83, 0x02, 0x0b, 0x45, 0x53, 0x54, 0x41, 0x42, 0x4c, 0x49, 0x53, 0x48, 0x45, 0x44, 0x0a, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x2c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x96, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x35, 0x00} -var dataPacket2IPv4 = []byte{0x00, 0x0a, 0x00, 0x73, 0x61, 0x84, 0xa6, 0xd4, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x63, 0x0a, 0x00, 0x00, 0x01, 0x0a, 0x00, 0x00, 0x02, 0x04, 0xd2, 0x16, 0x2e, 0x06, 0x4a, 0xf9, 0xec, 0x88, 0x4a, 0xf9, 0xf8, 0x40, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xe8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0xf4, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0f, 0x42, 0x40, 0x00, 0x04, 0x70, 0x6f, 0x64, 0x32, 0x00, 0x00, 0x02, 0x0b, 0x45, 0x53, 0x54, 0x41, 0x42, 0x4c, 0x49, 0x53, 0x48, 0x45, 0x44, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x90, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xc8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0f, 0x42, 0x40} -var templatePacketIPv6 = []byte{0x00, 0x0a, 0x00, 0x8c, 0x61, 0x84, 0xad, 0x94, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x02, 0x00, 0x7c, 0x01, 0x00, 0x00, 0x14, 0x00, 0x1b, 0x00, 0x10, 0x00, 0x1c, 0x00, 0x10, 0x00, 0x07, 0x00, 0x02, 0x00, 0x0b, 0x00, 0x02, 0x00, 0x04, 0x00, 0x01, 0x00, 0x96, 0x00, 0x04, 0x00, 0x97, 0x00, 0x04, 0x00, 0x88, 0x00, 0x01, 0x00, 0x56, 0x00, 0x08, 0x00, 0x02, 0x00, 0x08, 0x00, 0x55, 0x00, 0x08, 0x80, 0x65, 0xff, 0xff, 0x00, 0x00, 0xdc, 0xba, 0x80, 0x67, 0xff, 0xff, 0x00, 0x00, 0xdc, 0xba, 0x80, 0x6c, 0x00, 0x02, 0x00, 0x00, 0xdc, 0xba, 0x80, 0x89, 0x00, 0x01, 0x00, 0x00, 0xdc, 0xba, 0x80, 0x88, 0xff, 0xff, 0x00, 0x00, 0xdc, 0xba, 0x80, 0x6b, 0x00, 0x10, 0x00, 0x00, 0xdc, 0xba, 0x80, 0x56, 0x00, 0x08, 0x00, 0x00, 0x72, 0x79, 0x80, 0x02, 0x00, 0x08, 0x00, 0x00, 0x72, 0x79, 0x80, 0x55, 0x00, 0x08, 0x00, 0x00, 0x72, 0x79} -var dataPacket1IPv6 = []byte{0x00, 0x0a, 0x00, 0x97, 0x61, 0x84, 0xad, 0x94, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x87, 0x20, 0x01, 0x00, 0x00, 0x32, 0x38, 0xdf, 0xe1, 0x00, 0x63, 0x00, 0x00, 0x00, 0x00, 0xfe, 0xfb, 0x20, 0x01, 0x00, 0x00, 0x32, 0x38, 0xdf, 0xe1, 0x00, 0x63, 0x00, 0x00, 0x00, 0x00, 0xfe, 0xfc, 0x04, 0xd2, 0x16, 0x2e, 0x06, 0x4a, 0xf9, 0xec, 0x88, 0x4a, 0xf9, 0xf0, 0x70, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0xf4, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x35, 0x00, 0x04, 0x70, 0x6f, 0x64, 0x31, 0x00, 0x12, 0x83, 0x02, 0x0b, 0x45, 0x53, 0x54, 0x41, 0x42, 0x4c, 0x49, 0x53, 0x48, 0x45, 0x44, 0x20, 0x01, 0x00, 0x00, 0x32, 0x38, 0xbb, 0xbb, 0x00, 0x63, 0x00, 0x00, 0x00, 0x00, 0xaa, 0xaa, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x2c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x96, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x35, 0x00} -var dataPacket2IPv6 = []byte{0x00, 0x0a, 0x00, 0x97, 0x61, 0x84, 0xad, 0x94, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x87, 0x20, 0x01, 0x00, 0x00, 0x32, 0x38, 0xdf, 0xe1, 0x00, 0x63, 0x00, 0x00, 0x00, 0x00, 0xfe, 0xfb, 0x20, 0x01, 0x00, 0x00, 0x32, 0x38, 0xdf, 0xe1, 0x00, 0x63, 0x00, 0x00, 0x00, 0x00, 0xfe, 0xfc, 0x04, 0xd2, 0x16, 0x2e, 0x06, 0x4a, 0xf9, 0xec, 0x88, 0x4a, 0xf9, 0xf8, 0x40, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xe8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0xf4, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0f, 0x42, 0x40, 0x00, 0x04, 0x70, 0x6f, 0x64, 0x32, 0x00, 0x00, 0x02, 0x0b, 0x45, 0x53, 0x54, 0x41, 0x42, 0x4c, 0x49, 0x53, 0x48, 0x45, 0x44, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x90, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xc8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0f, 0x42, 0x40} - var ( flowKeyRecordMap = make(map[intermediate.FlowKey]intermediate.AggregationFlowRecord) flowKey1 = intermediate.FlowKey{SourceAddress: "10.0.0.1", DestinationAddress: "10.0.0.2", Protocol: 6, SourcePort: 1234, DestinationPort: 5678} @@ -111,11 +96,6 @@ var ( aggregationWorkerNum = 2 ) -func init() { - // Load the global registry - registry.LoadRegistry() -} - func TestCollectorToIntermediateIPv4(t *testing.T) { address, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0") if err != nil { @@ -249,19 +229,19 @@ func testCollectorToIntermediate(t *testing.T, address net.Addr, isIPv6 bool) { case "octetTotalCount": assert.Equal(t, uint64(1000000), element.GetUnsigned64Value()) case "reverseOctetTotalCount": - assert.Equal(t, uint64(1000000), element.GetUnsigned64Value()) + assert.Equal(t, uint64(400000), element.GetUnsigned64Value()) case "throughput": assert.Equal(t, uint64(1000000*8/3000), element.GetUnsigned64Value()) case "reverseThroughput": - assert.Equal(t, uint64(1000000*8/3000), element.GetUnsigned64Value()) + assert.Equal(t, uint64(400000*8/3000), element.GetUnsigned64Value()) case "throughputFromSourceNode": assert.Equal(t, uint64(800000*8/1000), element.GetUnsigned64Value()) case "throughputFromDestinationNode": assert.Equal(t, uint64(1000000*8/3000), element.GetUnsigned64Value()) case "reverseThroughputFromSourceNode": - assert.Equal(t, uint64(800000*8/1000), element.GetUnsigned64Value()) + assert.Equal(t, uint64(300000*8/1000), element.GetUnsigned64Value()) case "reverseThroughputFromDestinationNode": - assert.Equal(t, uint64(1000000*8/3000), element.GetUnsigned64Value()) + assert.Equal(t, uint64(400000*8/3000), element.GetUnsigned64Value()) } } } diff --git a/pkg/test/collector_perf_test.go b/pkg/test/collector_perf_test.go index ee517a8c..17d9a37a 100644 --- a/pkg/test/collector_perf_test.go +++ b/pkg/test/collector_perf_test.go @@ -29,14 +29,8 @@ import ( "github.com/vmware/go-ipfix/pkg/collector" "github.com/vmware/go-ipfix/pkg/exporter" - "github.com/vmware/go-ipfix/pkg/registry" ) -func init() { - // Load the global registry - registry.LoadRegistry() -} - const ( numOfExporters = 100 numOfRecords = 600 diff --git a/pkg/test/collector_producer_test.go b/pkg/test/collector_producer_test.go index 44a17970..bd15c4ec 100644 --- a/pkg/test/collector_producer_test.go +++ b/pkg/test/collector_producer_test.go @@ -12,51 +12,48 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build integration // +build integration package test import ( "net" + "sync" "testing" "github.com/Shopify/sarama" saramamock "github.com/Shopify/sarama/mocks" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/vmware/go-ipfix/pkg/collector" "github.com/vmware/go-ipfix/pkg/kafka/producer" convertortest "github.com/vmware/go-ipfix/pkg/kafka/producer/convertor/test" - "github.com/vmware/go-ipfix/pkg/registry" ) var ( - msg1InBytes = []byte{0x0, 0x0, 0x0, 0x6a, 0x8, 0xd4, 0xcd, 0x92, 0x8c, 0x6, 0x10, - 0x1, 0x18, 0x1, 0x20, 0x88, 0xd9, 0xe7, 0xd7, 0x4, 0x28, 0xf0, 0xe0, 0xe7, 0xd7, - 0x4, 0x32, 0x8, 0x31, 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x31, 0x3a, 0x8, 0x31, - 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x32, 0x40, 0xd2, 0x9, 0x48, 0xae, 0x2c, - 0x50, 0x6, 0x58, 0xa0, 0x6, 0x60, 0x80, 0xea, 0x30, 0x68, 0xf4, 0x3, 0x78, 0xac, - 0x2, 0x80, 0x1, 0x80, 0xea, 0x30, 0x88, 0x1, 0x96, 0x1, 0x9a, 0x1, 0x4, 0x70, - 0x6f, 0x64, 0x31, 0xca, 0x1, 0x8, 0x31, 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x33, - 0x8a, 0x2, 0x9, 0x31, 0x32, 0x37, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x31, 0x90, 0x2, - 0x83, 0x25} - msg2InBytes = []byte{0x0, 0x0, 0x0, 0x65, 0x8, 0xd4, 0xcd, 0x92, 0x8c, 0x6, 0x10, - 0x1, 0x18, 0x1, 0x20, 0x88, 0xd9, 0xe7, 0xd7, 0x4, 0x28, 0xc0, 0xf0, 0xe7, 0xd7, - 0x4, 0x32, 0x8, 0x31, 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x31, 0x3a, 0x8, 0x31, - 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x32, 0x40, 0xd2, 0x9, 0x48, 0xae, 0x2c, 0x50, - 0x6, 0x58, 0xe8, 0x7, 0x60, 0xc0, 0x84, 0x3d, 0x68, 0xf4, 0x3, 0x78, 0x90, 0x3, - 0x80, 0x1, 0xc0, 0x84, 0x3d, 0x88, 0x1, 0xc8, 0x1, 0xb2, 0x1, 0x4, 0x70, 0x6f, - 0x64, 0x32, 0xca, 0x1, 0x7, 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x8a, 0x2, - 0x9, 0x31, 0x32, 0x37, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x31} + msg1InBytes = []byte{0x0, 0x0, 0x0, 0x68, 0x8, 0xca, 0xe0, 0xcb, 0xee, 0x5, 0x18, 0x1, 0x20, + 0x88, 0xd9, 0xe7, 0xd7, 0x4, 0x28, 0xf0, 0xe0, 0xe7, 0xd7, 0x4, 0x32, 0x8, 0x31, + 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x31, 0x3a, 0x8, 0x31, 0x30, 0x2e, 0x30, 0x2e, + 0x30, 0x2e, 0x32, 0x40, 0xd2, 0x9, 0x48, 0xae, 0x2c, 0x50, 0x6, 0x58, 0xa0, 0x6, + 0x60, 0x80, 0xea, 0x30, 0x68, 0xf4, 0x3, 0x78, 0xac, 0x2, 0x80, 0x1, 0xe0, 0xa7, + 0x12, 0x88, 0x1, 0x96, 0x1, 0x9a, 0x1, 0x4, 0x70, 0x6f, 0x64, 0x31, 0xca, 0x1, 0x8, + 0x31, 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x33, 0x8a, 0x2, 0x9, 0x31, 0x32, 0x37, + 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x31, 0x90, 0x2, 0x83, 0x25} + msg2InBytes = []byte{0x0, 0x0, 0x0, 0x63, 0x8, 0xca, 0xe0, 0xcb, 0xee, 0x5, 0x18, 0x1, 0x20, + 0x88, 0xd9, 0xe7, 0xd7, 0x4, 0x28, 0xc0, 0xf0, 0xe7, 0xd7, 0x4, 0x32, 0x8, 0x31, + 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x31, 0x3a, 0x8, 0x31, 0x30, 0x2e, 0x30, 0x2e, + 0x30, 0x2e, 0x32, 0x40, 0xd2, 0x9, 0x48, 0xae, 0x2c, 0x50, 0x6, 0x58, 0xe8, 0x7, + 0x60, 0xc0, 0x84, 0x3d, 0x68, 0xf4, 0x3, 0x78, 0x90, 0x3, 0x80, 0x1, 0x80, 0xb5, + 0x18, 0x88, 0x1, 0xc8, 0x1, 0xb2, 0x1, 0x4, 0x70, 0x6f, 0x64, 0x32, 0xca, 0x1, 0x7, + 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x8a, 0x2, 0x9, 0x31, 0x32, 0x37, 0x2e, + 0x30, 0x2e, 0x30, 0x2e, 0x31} ) func TestCollectorToProducer(t *testing.T) { - // Initialize required objects. - registry.LoadRegistry() address, err := net.ResolveUDPAddr("udp", "0.0.0.0:4739") - if err != nil { - t.Error(err) - } + require.NoError(t, err) // Initialize collecting process cpInput := collector.CollectorInput{ Address: address.String(), @@ -82,15 +79,19 @@ func TestCollectorToProducer(t *testing.T) { } mockSaramaProducer := saramamock.NewAsyncProducer(t, kafkaConfig) kafkaProducer, err := producer.NewKafkaProducer(testInput) - assert.NoError(t, err) + require.NoError(t, err) kafkaProducer.SetSaramaProducer(mockSaramaProducer) go cp.Start() waitForCollectorReady(t, cp) + var wg sync.WaitGroup + wg.Add(1) go func() { + defer wg.Done() conn, err := net.DialUDP("udp", nil, address) if err != nil { - t.Errorf("UDP Collecting Process does not start correctly.") + t.Errorf("UDP Collecting Process did not start correctly.") + return } defer conn.Close() // Using the packets from collector_intermediate_test.go @@ -112,6 +113,7 @@ func TestCollectorToProducer(t *testing.T) { kafkaMsg2InBytes, _ := kafkaMsg2.Value.Encode() assert.Equalf(t, msg2InBytes, kafkaMsg2InBytes, "kafka msg should be equal to expected bytes") + wg.Wait() // making sure client connection is closed before stopping collector cp.Stop() if err := mockSaramaProducer.Close(); err != nil { t.Fatal(err) diff --git a/pkg/test/exporter_collector_test.go b/pkg/test/exporter_collector_test.go index 114d7fad..1d6d8456 100644 --- a/pkg/test/exporter_collector_test.go +++ b/pkg/test/exporter_collector_test.go @@ -23,19 +23,16 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/util/wait" "github.com/vmware/go-ipfix/pkg/collector" "github.com/vmware/go-ipfix/pkg/entities" "github.com/vmware/go-ipfix/pkg/exporter" "github.com/vmware/go-ipfix/pkg/registry" + testcerts "github.com/vmware/go-ipfix/pkg/test/certs" ) -func init() { - // Load the global registry - registry.LoadRegistry() -} - func TestSingleRecordUDPTransport(t *testing.T) { address, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") if err != nil { @@ -106,7 +103,7 @@ func TestDTLSTransport(t *testing.T) { func testExporterToCollector(address net.Addr, isSrcNode, isIPv6 bool, isMultipleRecord bool, isEncrypted bool, t *testing.T) { // Initialize collecting process - messages := make([]*entities.Message, 0) + messages := make([]*entities.Message, 2) cpInput := collector.CollectorInput{ Address: address.String(), Protocol: address.Network(), @@ -118,12 +115,12 @@ func testExporterToCollector(address net.Addr, isSrcNode, isIPv6 bool, isMultipl } if isEncrypted { if address.Network() == "tcp" { - cpInput.CACert = []byte(FakeCACert) - cpInput.ServerCert = []byte(FakeCert) - cpInput.ServerKey = []byte(FakeKey) + cpInput.CACert = []byte(testcerts.FakeCACert) + cpInput.ServerCert = []byte(testcerts.FakeCert) + cpInput.ServerKey = []byte(testcerts.FakeKey) } else if address.Network() == "udp" { - cpInput.ServerCert = []byte(FakeCert2) - cpInput.ServerKey = []byte(FakeKey2) + cpInput.ServerCert = []byte(testcerts.FakeCert2) + cpInput.ServerKey = []byte(testcerts.FakeKey2) } } cp, _ := collector.InitCollectingProcess(cpInput) @@ -141,11 +138,11 @@ func testExporterToCollector(address net.Addr, isSrcNode, isIPv6 bool, isMultipl if isEncrypted { tlsClientConfig := &exporter.ExporterTLSClientConfig{} if address.Network() == "tcp" { // use TLS - tlsClientConfig.CAData = []byte(FakeCACert) - tlsClientConfig.CertData = []byte(FakeClientCert) - tlsClientConfig.KeyData = []byte(FakeClientKey) + tlsClientConfig.CAData = []byte(testcerts.FakeCACert) + tlsClientConfig.CertData = []byte(testcerts.FakeClientCert) + tlsClientConfig.KeyData = []byte(testcerts.FakeClientKey) } else if address.Network() == "udp" { // use DTLS - tlsClientConfig.CAData = []byte(FakeCert2) + tlsClientConfig.CAData = []byte(testcerts.FakeCert2) } epInput.TLSClientConfig = tlsClientConfig } @@ -168,13 +165,16 @@ func testExporterToCollector(address net.Addr, isSrcNode, isIPv6 bool, isMultipl } set := dataSet + messageIdx := 0 for message := range cp.GetMsgChan() { - messages = append(messages, message) - if len(messages) == 2 { + messages[messageIdx] = message + messageIdx++ + if messageIdx == 2 { cp.CloseMsgChan() } } cp.Stop() // Close collecting process + require.Equal(t, 2, messageIdx) templateMsg := messages[0] assert.Equal(t, uint16(10), templateMsg.GetVersion(), "Version of flow record (template) should be 10.") assert.Equal(t, uint32(1), templateMsg.GetObsDomainID(), "ObsDomainID (template) should be 1.") @@ -219,7 +219,7 @@ func testExporterToCollector(address net.Addr, isSrcNode, isIPv6 bool, isMultipl } } if err = wait.Poll(time.Millisecond, 10*time.Millisecond, checkError); err != nil { - t.Errorf("Collector process does not close correctly.") + t.Errorf("Collector process did not close correctly.") } } diff --git a/pkg/test/util.go b/pkg/test/util.go index 052677e1..26d35a2a 100644 --- a/pkg/test/util.go +++ b/pkg/test/util.go @@ -16,11 +16,19 @@ package test import ( "net" + "time" "github.com/vmware/go-ipfix/pkg/entities" + "github.com/vmware/go-ipfix/pkg/exporter" "github.com/vmware/go-ipfix/pkg/registry" ) +var ( + // First release of Antrea (v0.1.0) at KubeCon NA 2019 (San Diego) :) + sanDiegoLocation, _ = time.LoadLocation("America/Los_Angeles") + testTime = time.Date(2019, time.November, 18, 11, 26, 2, 0, sanDiegoLocation) +) + var ( commonFields = []string{ "sourceTransportPort", @@ -61,6 +69,9 @@ var ( } ) +// will be initialized in init() after loading the registry +var templatePacketIPv4, dataPacket1IPv4, dataPacket2IPv4, templatePacketIPv6, dataPacket1IPv6, dataPacket2IPv6 []byte + type testRecord struct { srcIP net.IP dstIP net.IP @@ -107,7 +118,7 @@ func getTestRecord(isSrcNode, isIPv6 bool) testRecord { record.pktCount = uint64(1000) record.pktDelta = uint64(500) record.bytCount = uint64(1000000) - record.revBytCount = uint64(1000000) + record.revBytCount = uint64(400000) record.dstSvcPort = uint16(0) record.srcPod = "" record.dstPod = "pod2" @@ -123,7 +134,7 @@ func getTestRecord(isSrcNode, isIPv6 bool) testRecord { record.pktCount = uint64(800) record.pktDelta = uint64(500) record.bytCount = uint64(800000) - record.revBytCount = uint64(800000) + record.revBytCount = uint64(300000) record.dstSvcPort = uint16(4739) record.srcPod = "pod1" record.dstPod = "" @@ -262,3 +273,33 @@ func getDataRecordElements(isSrcNode, isIPv6 bool) []entities.InfoElementWithVal } return elements } + +func getTestTemplatePacket(isIPv6 bool) []byte { + set := createTemplateSet(1 /* templateID */, isIPv6) + bytes, err := exporter.CreateIPFIXMsg(set, 1 /* obsDomainID */, 0 /* seqNumber */, testTime) + if err != nil { + panic("failed to create test template packet") + } + return bytes +} + +func getTestDataPacket(isSrcNode bool, isIPv6 bool) []byte { + set := createDataSet(1 /* templateID */, isSrcNode, isIPv6, false /* isMultipleRecord */) + bytes, err := exporter.CreateIPFIXMsg(set, 1 /* obsDomainID */, 0 /* seqNumber */, testTime) + if err != nil { + panic("failed to create test data packet") + } + return bytes +} + +func init() { + // Load the global registry + registry.LoadRegistry() + + templatePacketIPv4 = getTestTemplatePacket(false) + dataPacket1IPv4 = getTestDataPacket(true, false) + dataPacket2IPv4 = getTestDataPacket(false, false) + templatePacketIPv6 = getTestTemplatePacket(true) + dataPacket1IPv6 = getTestDataPacket(true, true) + dataPacket2IPv6 = getTestDataPacket(false, true) +}