diff --git a/pkg/exporter/process_test.go b/pkg/exporter/process_test.go index 0cf6975..ee5f4ba 100644 --- a/pkg/exporter/process_test.go +++ b/pkg/exporter/process_test.go @@ -17,6 +17,7 @@ package exporter import ( "crypto/tls" "crypto/x509" + "fmt" "io" "net" "testing" @@ -35,34 +36,54 @@ func init() { registry.LoadRegistry() } -func TestExportingProcess_SendingTemplateRecordToLocalTCPServer(t *testing.T) { - // Create local server for testing - listener, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatalf("Got error when creating a local server: %v", err) - } - t.Log("Created local server on random available port for testing") - - buffCh := make(chan []byte) - // Create go routine for local server - // TODO: Move this in to different function with byte size as arg +func runTCPServer(t *testing.T, listener net.Listener, stopCh <-chan struct{}, buffCh chan<- []byte) { + defer listener.Close() go func() { - defer listener.Close() conn, err := listener.Accept() if err != nil { + t.Error(err) return } defer conn.Close() t.Log("Accept the connection from exporter") - buff := make([]byte, 32) - _, err = conn.Read(buff) - if err != nil { - t.Error(err) - } - // Compare only template record part. Remove message header and set header. - buffCh <- buff[20:] - return + go func() { + defer close(buffCh) + buff := make([]byte, 512) + for { + bytes, err := conn.Read(buff) + if err != nil { + return + } + // Remove message header and set header. + buffCh <- buff[20:bytes] + } + }() + <-stopCh }() + <-stopCh +} + +func readWithTimeout[T any](ch <-chan T, timeout time.Duration) (T, error) { + select { + case x, ok := <-ch: + if !ok { + return *new(T), fmt.Errorf("channel was closed") + } + return x, nil + case <-time.After(timeout): + return *new(T), fmt.Errorf("timeout expired") + } +} + +func TestExportingProcess_SendingTemplateRecordToLocalTCPServer(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + buffCh := make(chan []byte) + // Create local server for testing + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err, "Error when creating a local server") + go runTCPServer(t, listener, stopCh, buffCh) + t.Log("Created local server on random available port for testing") // Create exporter using local server info input := ExporterInput{ @@ -104,6 +125,9 @@ func TestExportingProcess_SendingTemplateRecordToLocalTCPServer(t *testing.T) { // 32 is the size of the IPFIX message including all headers assert.Equal(t, 32, bytesSent) assert.Equal(t, uint32(0), exporter.seqNumber) + bytesAtServer, err := readWithTimeout(buffCh, 1*time.Second) + assert.NoError(t, err) + assert.Len(t, bytesAtServer, 12) exporter.CloseConnToCollector() } @@ -195,35 +219,15 @@ func TestExportingProcess_SendingTemplateRecordToLocalUDPServer(t *testing.T) { } func TestExportingProcess_SendingDataRecordToLocalTCPServer(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + buffCh := make(chan []byte) // Create local server for testing listener, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatalf("Got error when creating a local server: %v", err) - } + require.NoError(t, err, "Error when creating a local server") + go runTCPServer(t, listener, stopCh, buffCh) t.Log("Created local server on random available port for testing") - buffCh := make(chan []byte) - // Create go routine for local server - // TODO: Move this in to different function with byte size as arg - go func() { - defer listener.Close() - conn, err := listener.Accept() - if err != nil { - return - } - defer conn.Close() - t.Log("Accept the connection from exporter") - buff := make([]byte, 28) - _, err = conn.Read(buff) - if err != nil { - t.Error(err) - } - // Compare only data record part. Remove message header and set header. - // TODO: Verify message header and set header through hardcoded byte values - buffCh <- buff[20:] - return - }() - // Create exporter using local server info input := ExporterInput{ CollectorAddress: listener.Addr().String(), @@ -279,7 +283,9 @@ func TestExportingProcess_SendingDataRecordToLocalTCPServer(t *testing.T) { assert.NoError(t, err) // 28 is the size of the IPFIX message including all headers (20 bytes) assert.Equal(t, 28, bytesSent) - assert.Equal(t, dataRecBuff, <-buffCh) + bytesAtServer, err := readWithTimeout(buffCh, 1*time.Second) + assert.NoError(t, err) + assert.Equal(t, dataRecBuff, bytesAtServer) assert.Equal(t, uint32(1), exporter.seqNumber) // Create data set with multiple data records to test invalid message length @@ -566,24 +572,10 @@ func TestExportingProcessWithTLS(t *testing.T) { return } + stopCh := make(chan struct{}) + defer close(stopCh) buffCh := make(chan []byte) - go func() { - defer listener.Close() - conn, err := listener.Accept() - if err != nil { - return - } - defer conn.Close() - t.Log("Accept the connection from exporter") - buff := make([]byte, 32) - _, err = conn.Read(buff) - if err != nil { - t.Error(err) - } - // Compare only template record part. Remove message header and set header. - buffCh <- buff[20:] - return - }() + go runTCPServer(t, listener, stopCh, buffCh) // Create exporter using local server info input := ExporterInput{ @@ -768,3 +760,27 @@ func TestExportingProcess_CheckConnToCollector(t *testing.T) { isOpen = exporter.checkConnToCollector(oneByte) assert.False(t, isOpen) } + +func TestExportingProcess_CloseConnToCollectorTwice(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + buffCh := make(chan []byte) + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err, "Error when creating a local server") + go runTCPServer(t, listener, stopCh, buffCh) + + input := ExporterInput{ + CollectorAddress: listener.Addr().String(), + CollectorProtocol: listener.Addr().Network(), + ObservationDomainID: 1, + TempRefTimeout: 0, + } + exporter, err := InitExportingProcess(input) + if err != nil { + t.Fatalf("Got error when connecting to local server %s: %v", listener.Addr().String(), err) + } + t.Logf("Created exporter connecting to local server with address: %s", listener.Addr().String()) + + exporter.CloseConnToCollector() + exporter.CloseConnToCollector() +}