diff --git a/CHANGELOG.md b/CHANGELOG.md index d3c1e68..d67b8cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ All notable changes to this project will be documented in this file. The format Changelog](https://keepachangelog.com/en/1.0.0/). ## Unreleased -## 0.10.0 09-30-2024 +## 0.10.0 08-30-2024 ### Added - Add more features to the test IPFIX collector web API. (#351, @antoninbas) ### Changed diff --git a/go.mod b/go.mod index 2235502..c37d311 100644 --- a/go.mod +++ b/go.mod @@ -54,3 +54,5 @@ require ( sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect ) + +retract v0.10.0 diff --git a/pkg/exporter/process.go b/pkg/exporter/process.go index 60cb48e..674ff60 100644 --- a/pkg/exporter/process.go +++ b/pkg/exporter/process.go @@ -20,8 +20,10 @@ import ( "crypto/x509" "encoding/json" "fmt" + "io" "net" "sync" + "sync/atomic" "time" "github.com/pion/dtls/v2" @@ -31,6 +33,7 @@ import ( ) const startTemplateID uint16 = 255 +const defaultCheckConnInterval = 10 * time.Second const defaultJSONBufferLen = 5000 type templateValue struct { @@ -53,11 +56,12 @@ type ExportingProcess struct { seqNumber uint32 templateID uint16 templatesMap map[uint16]templateValue - templateRefCh chan struct{} templateMutex sync.Mutex sendJSONRecord bool jsonBufferLen int wg sync.WaitGroup + isClosed atomic.Bool + stopCh chan struct{} } type ExporterTLSClientConfig struct { @@ -153,9 +157,37 @@ func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) { seqNumber: 0, templateID: startTemplateID, templatesMap: make(map[uint16]templateValue), - templateRefCh: make(chan struct{}), sendJSONRecord: input.SendJSONRecord, wg: sync.WaitGroup{}, + stopCh: make(chan struct{}), + } + + // Start a goroutine to check whether the collector has already closed the TCP connection. + if input.CollectorProtocol == "tcp" { + interval := input.CheckConnInterval + if interval == 0 { + interval = defaultCheckConnInterval + } + expProc.wg.Add(1) + go func() { + defer expProc.wg.Done() + ticker := time.NewTicker(interval) + oneByteForRead := make([]byte, 1) + defer ticker.Stop() + for { + select { + case <-expProc.stopCh: + return + case <-ticker.C: + isConnected := expProc.checkConnToCollector(oneByteForRead) + if !isConnected { + klog.Error("Connector has closed its side of the TCP connection, closing our side") + expProc.closeConnToCollector() + return + } + } + } + }() } // Template refresh logic is only for UDP transport. @@ -171,12 +203,14 @@ func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) { defer ticker.Stop() for { select { - case <-expProc.templateRefCh: + case <-expProc.stopCh: return case <-ticker.C: err := expProc.sendRefreshedTemplates() if err != nil { - klog.Errorf("Error when sending refreshed templates: %v", err) + klog.Errorf("Error when sending refreshed templates, closing the connection to the collector: %v", err) + expProc.closeConnToCollector() + return } } } @@ -231,16 +265,35 @@ func (ep *ExportingProcess) GetMsgSizeLimit() int { } // CloseConnToCollector closes the connection to the collector. -// It should not be called twice. InitExportingProcess can be called again after calling -// CloseConnToCollector. +// It can safely be closed more than once, and subsequent calls will be no-ops. func (ep *ExportingProcess) CloseConnToCollector() { - close(ep.templateRefCh) + ep.closeConnToCollector() + ep.wg.Wait() +} + +// closeConnToCollector is the internal version of CloseConnToCollector. It closes all the resources +// but does not wait for the ep.wg counter to get to 0. Goroutines which need to terminate in order +// for ep.wg to be decremented can safely call closeConnToCollector. +func (ep *ExportingProcess) closeConnToCollector() { + if ep.isClosed.Swap(true) { + return + } + close(ep.stopCh) if err := ep.connToCollector.Close(); err != nil { // Just log the error that happened when closing the connection. Not returning error // as we do not expect library consumers to exit their programs with this error. klog.Errorf("Error when closing connection to collector: %v", err) } - ep.wg.Wait() +} + +// checkConnToCollector checks whether the connection from exporter is still open +// by trying to read from connection. Closed connection will return EOF from read. +func (ep *ExportingProcess) checkConnToCollector(oneByteForRead []byte) bool { + ep.connToCollector.SetReadDeadline(time.Now().Add(time.Millisecond)) + if _, err := ep.connToCollector.Read(oneByteForRead); err == io.EOF { + return false + } + return true } // NewTemplateID is called to get ID when creating new template record. diff --git a/pkg/exporter/process_test.go b/pkg/exporter/process_test.go index 26e41ed..66a245e 100644 --- a/pkg/exporter/process_test.go +++ b/pkg/exporter/process_test.go @@ -17,6 +17,7 @@ package exporter import ( "crypto/tls" "crypto/x509" + "fmt" "io" "net" "testing" @@ -35,34 +36,54 @@ func init() { registry.LoadRegistry() } -func TestExportingProcess_SendingTemplateRecordToLocalTCPServer(t *testing.T) { - // Create local server for testing - listener, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatalf("Got error when creating a local server: %v", err) - } - t.Log("Created local server on random available port for testing") - - buffCh := make(chan []byte) - // Create go routine for local server - // TODO: Move this in to different function with byte size as arg +func runTCPServer(t *testing.T, listener net.Listener, stopCh <-chan struct{}, buffCh chan<- []byte) { + defer listener.Close() go func() { - defer listener.Close() conn, err := listener.Accept() if err != nil { + t.Error(err) return } defer conn.Close() t.Log("Accept the connection from exporter") - buff := make([]byte, 32) - _, err = conn.Read(buff) - if err != nil { - t.Error(err) - } - // Compare only template record part. Remove message header and set header. - buffCh <- buff[20:] - return + go func() { + defer close(buffCh) + buff := make([]byte, 512) + for { + bytes, err := conn.Read(buff) + if err != nil { + return + } + // Remove message header and set header. + buffCh <- buff[20:bytes] + } + }() + <-stopCh }() + <-stopCh +} + +func readWithTimeout[T any](ch <-chan T, timeout time.Duration) (T, error) { + select { + case x, ok := <-ch: + if !ok { + return *new(T), fmt.Errorf("channel was closed") + } + return x, nil + case <-time.After(timeout): + return *new(T), fmt.Errorf("timeout expired") + } +} + +func TestExportingProcess_SendingTemplateRecordToLocalTCPServer(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + buffCh := make(chan []byte) + // Create local server for testing + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err, "Error when creating a local server") + go runTCPServer(t, listener, stopCh, buffCh) + t.Log("Created local server on random available port for testing") // Create exporter using local server info input := ExporterInput{ @@ -104,6 +125,9 @@ func TestExportingProcess_SendingTemplateRecordToLocalTCPServer(t *testing.T) { // 32 is the size of the IPFIX message including all headers assert.Equal(t, 32, bytesSent) assert.Equal(t, uint32(0), exporter.seqNumber) + bytesAtServer, err := readWithTimeout(buffCh, 1*time.Second) + assert.NoError(t, err) + assert.Len(t, bytesAtServer, 12) exporter.CloseConnToCollector() } @@ -195,35 +219,15 @@ func TestExportingProcess_SendingTemplateRecordToLocalUDPServer(t *testing.T) { } func TestExportingProcess_SendingDataRecordToLocalTCPServer(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + buffCh := make(chan []byte) // Create local server for testing listener, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatalf("Got error when creating a local server: %v", err) - } + require.NoError(t, err, "Error when creating a local server") + go runTCPServer(t, listener, stopCh, buffCh) t.Log("Created local server on random available port for testing") - buffCh := make(chan []byte) - // Create go routine for local server - // TODO: Move this in to different function with byte size as arg - go func() { - defer listener.Close() - conn, err := listener.Accept() - if err != nil { - return - } - defer conn.Close() - t.Log("Accept the connection from exporter") - buff := make([]byte, 28) - _, err = conn.Read(buff) - if err != nil { - t.Error(err) - } - // Compare only data record part. Remove message header and set header. - // TODO: Verify message header and set header through hardcoded byte values - buffCh <- buff[20:] - return - }() - // Create exporter using local server info input := ExporterInput{ CollectorAddress: listener.Addr().String(), @@ -279,7 +283,9 @@ func TestExportingProcess_SendingDataRecordToLocalTCPServer(t *testing.T) { assert.NoError(t, err) // 28 is the size of the IPFIX message including all headers (20 bytes) assert.Equal(t, 28, bytesSent) - assert.Equal(t, dataRecBuff, <-buffCh) + bytesAtServer, err := readWithTimeout(buffCh, 1*time.Second) + assert.NoError(t, err) + assert.Equal(t, dataRecBuff, bytesAtServer) assert.Equal(t, uint32(1), exporter.seqNumber) // Create data set with multiple data records to test invalid message length @@ -535,7 +541,8 @@ func TestInitExportingProcessWithTLS(t *testing.T) { if err == nil { exporter.CloseConnToCollector() } - serverErr := <-serverErrCh + serverErr, err := readWithTimeout(serverErrCh, 1*time.Second) + require.NoError(t, err) if tc.expectedServerErr != "" { assert.ErrorContains(t, serverErr, tc.expectedServerErr) } else { @@ -566,24 +573,10 @@ func TestExportingProcessWithTLS(t *testing.T) { return } + stopCh := make(chan struct{}) + defer close(stopCh) buffCh := make(chan []byte) - go func() { - defer listener.Close() - conn, err := listener.Accept() - if err != nil { - return - } - defer conn.Close() - t.Log("Accept the connection from exporter") - buff := make([]byte, 32) - _, err = conn.Read(buff) - if err != nil { - t.Error(err) - } - // Compare only template record part. Remove message header and set header. - buffCh <- buff[20:] - return - }() + go runTCPServer(t, listener, stopCh, buffCh) // Create exporter using local server info input := ExporterInput{ @@ -744,3 +737,51 @@ func TestExportingProcess_GetMsgSizeLimit(t *testing.T) { t.Logf("Created exporter connecting to local server with address: %s", conn.LocalAddr().String()) assert.Equal(t, entities.MaxSocketMsgSize, exporter.GetMsgSizeLimit()) } + +func TestExportingProcess_CheckConnToCollector(t *testing.T) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("Got error when creating a local server: %v", err) + } + input := ExporterInput{ + CollectorAddress: listener.Addr().String(), + CollectorProtocol: listener.Addr().Network(), + } + exporter, err := InitExportingProcess(input) + if err != nil { + t.Fatalf("Got error when connecting to local server %s: %v", listener.Addr().String(), err) + } + + defer listener.Close() + conn, _ := listener.Accept() + oneByte := make([]byte, 1) + isOpen := exporter.checkConnToCollector(oneByte) + assert.True(t, isOpen) + conn.Close() + isOpen = exporter.checkConnToCollector(oneByte) + assert.False(t, isOpen) +} + +func TestExportingProcess_CloseConnToCollectorTwice(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + buffCh := make(chan []byte) + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err, "Error when creating a local server") + go runTCPServer(t, listener, stopCh, buffCh) + + input := ExporterInput{ + CollectorAddress: listener.Addr().String(), + CollectorProtocol: listener.Addr().Network(), + ObservationDomainID: 1, + TempRefTimeout: 0, + } + exporter, err := InitExportingProcess(input) + if err != nil { + t.Fatalf("Got error when connecting to local server %s: %v", listener.Addr().String(), err) + } + t.Logf("Created exporter connecting to local server with address: %s", listener.Addr().String()) + + exporter.CloseConnToCollector() + exporter.CloseConnToCollector() +}