Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release 0.10 patches #362

Merged
merged 3 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ All notable changes to this project will be documented in this file. The format
Changelog](https://keepachangelog.com/en/1.0.0/).

## Unreleased
## 0.10.0 09-30-2024
## 0.10.0 08-30-2024
### Added
- Add more features to the test IPFIX collector web API. (#351, @antoninbas)
### Changed
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,5 @@ require (
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
)

retract v0.10.0
69 changes: 61 additions & 8 deletions pkg/exporter/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"crypto/x509"
"encoding/json"
"fmt"
"io"
"net"
"sync"
"sync/atomic"
"time"

"github.com/pion/dtls/v2"
Expand All @@ -31,6 +33,7 @@ import (
)

const startTemplateID uint16 = 255
const defaultCheckConnInterval = 10 * time.Second
const defaultJSONBufferLen = 5000

type templateValue struct {
Expand All @@ -53,11 +56,12 @@ type ExportingProcess struct {
seqNumber uint32
templateID uint16
templatesMap map[uint16]templateValue
templateRefCh chan struct{}
templateMutex sync.Mutex
sendJSONRecord bool
jsonBufferLen int
wg sync.WaitGroup
isClosed atomic.Bool
stopCh chan struct{}
}

type ExporterTLSClientConfig struct {
Expand Down Expand Up @@ -153,9 +157,37 @@ func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) {
seqNumber: 0,
templateID: startTemplateID,
templatesMap: make(map[uint16]templateValue),
templateRefCh: make(chan struct{}),
sendJSONRecord: input.SendJSONRecord,
wg: sync.WaitGroup{},
stopCh: make(chan struct{}),
}

// Start a goroutine to check whether the collector has already closed the TCP connection.
if input.CollectorProtocol == "tcp" {
interval := input.CheckConnInterval
if interval == 0 {
interval = defaultCheckConnInterval
}
expProc.wg.Add(1)
go func() {
defer expProc.wg.Done()
ticker := time.NewTicker(interval)
oneByteForRead := make([]byte, 1)
defer ticker.Stop()
for {
select {
case <-expProc.stopCh:
return
case <-ticker.C:
isConnected := expProc.checkConnToCollector(oneByteForRead)
if !isConnected {
klog.Error("Connector has closed its side of the TCP connection, closing our side")
expProc.closeConnToCollector()
return
}
}
}
}()
}

// Template refresh logic is only for UDP transport.
Expand All @@ -171,12 +203,14 @@ func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) {
defer ticker.Stop()
for {
select {
case <-expProc.templateRefCh:
case <-expProc.stopCh:
return
case <-ticker.C:
err := expProc.sendRefreshedTemplates()
if err != nil {
klog.Errorf("Error when sending refreshed templates: %v", err)
klog.Errorf("Error when sending refreshed templates, closing the connection to the collector: %v", err)
expProc.closeConnToCollector()
return
}
}
}
Expand Down Expand Up @@ -231,16 +265,35 @@ func (ep *ExportingProcess) GetMsgSizeLimit() int {
}

// CloseConnToCollector closes the connection to the collector.
// It should not be called twice. InitExportingProcess can be called again after calling
// CloseConnToCollector.
// It can safely be closed more than once, and subsequent calls will be no-ops.
func (ep *ExportingProcess) CloseConnToCollector() {
close(ep.templateRefCh)
ep.closeConnToCollector()
ep.wg.Wait()
}

// closeConnToCollector is the internal version of CloseConnToCollector. It closes all the resources
// but does not wait for the ep.wg counter to get to 0. Goroutines which need to terminate in order
// for ep.wg to be decremented can safely call closeConnToCollector.
func (ep *ExportingProcess) closeConnToCollector() {
if ep.isClosed.Swap(true) {
return
}
close(ep.stopCh)
if err := ep.connToCollector.Close(); err != nil {
// Just log the error that happened when closing the connection. Not returning error
// as we do not expect library consumers to exit their programs with this error.
klog.Errorf("Error when closing connection to collector: %v", err)
}
ep.wg.Wait()
}

// checkConnToCollector checks whether the connection from exporter is still open
// by trying to read from connection. Closed connection will return EOF from read.
func (ep *ExportingProcess) checkConnToCollector(oneByteForRead []byte) bool {
ep.connToCollector.SetReadDeadline(time.Now().Add(time.Millisecond))
if _, err := ep.connToCollector.Read(oneByteForRead); err == io.EOF {
return false
}
return true
}

// NewTemplateID is called to get ID when creating new template record.
Expand Down
169 changes: 105 additions & 64 deletions pkg/exporter/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package exporter
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"net"
"testing"
Expand All @@ -35,34 +36,54 @@ func init() {
registry.LoadRegistry()
}

func TestExportingProcess_SendingTemplateRecordToLocalTCPServer(t *testing.T) {
// Create local server for testing
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("Got error when creating a local server: %v", err)
}
t.Log("Created local server on random available port for testing")

buffCh := make(chan []byte)
// Create go routine for local server
// TODO: Move this in to different function with byte size as arg
func runTCPServer(t *testing.T, listener net.Listener, stopCh <-chan struct{}, buffCh chan<- []byte) {
defer listener.Close()
go func() {
defer listener.Close()
conn, err := listener.Accept()
if err != nil {
t.Error(err)
return
}
defer conn.Close()
t.Log("Accept the connection from exporter")
buff := make([]byte, 32)
_, err = conn.Read(buff)
if err != nil {
t.Error(err)
}
// Compare only template record part. Remove message header and set header.
buffCh <- buff[20:]
return
go func() {
defer close(buffCh)
buff := make([]byte, 512)
for {
bytes, err := conn.Read(buff)
if err != nil {
return
}
// Remove message header and set header.
buffCh <- buff[20:bytes]
}
}()
<-stopCh
}()
<-stopCh
}

func readWithTimeout[T any](ch <-chan T, timeout time.Duration) (T, error) {
select {
case x, ok := <-ch:
if !ok {
return *new(T), fmt.Errorf("channel was closed")
}
return x, nil
case <-time.After(timeout):
return *new(T), fmt.Errorf("timeout expired")
}
}

func TestExportingProcess_SendingTemplateRecordToLocalTCPServer(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
buffCh := make(chan []byte)
// Create local server for testing
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err, "Error when creating a local server")
go runTCPServer(t, listener, stopCh, buffCh)
t.Log("Created local server on random available port for testing")

// Create exporter using local server info
input := ExporterInput{
Expand Down Expand Up @@ -104,6 +125,9 @@ func TestExportingProcess_SendingTemplateRecordToLocalTCPServer(t *testing.T) {
// 32 is the size of the IPFIX message including all headers
assert.Equal(t, 32, bytesSent)
assert.Equal(t, uint32(0), exporter.seqNumber)
bytesAtServer, err := readWithTimeout(buffCh, 1*time.Second)
assert.NoError(t, err)
assert.Len(t, bytesAtServer, 12)
exporter.CloseConnToCollector()
}

Expand Down Expand Up @@ -195,35 +219,15 @@ func TestExportingProcess_SendingTemplateRecordToLocalUDPServer(t *testing.T) {
}

func TestExportingProcess_SendingDataRecordToLocalTCPServer(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
buffCh := make(chan []byte)
// Create local server for testing
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("Got error when creating a local server: %v", err)
}
require.NoError(t, err, "Error when creating a local server")
go runTCPServer(t, listener, stopCh, buffCh)
t.Log("Created local server on random available port for testing")

buffCh := make(chan []byte)
// Create go routine for local server
// TODO: Move this in to different function with byte size as arg
go func() {
defer listener.Close()
conn, err := listener.Accept()
if err != nil {
return
}
defer conn.Close()
t.Log("Accept the connection from exporter")
buff := make([]byte, 28)
_, err = conn.Read(buff)
if err != nil {
t.Error(err)
}
// Compare only data record part. Remove message header and set header.
// TODO: Verify message header and set header through hardcoded byte values
buffCh <- buff[20:]
return
}()

// Create exporter using local server info
input := ExporterInput{
CollectorAddress: listener.Addr().String(),
Expand Down Expand Up @@ -279,7 +283,9 @@ func TestExportingProcess_SendingDataRecordToLocalTCPServer(t *testing.T) {
assert.NoError(t, err)
// 28 is the size of the IPFIX message including all headers (20 bytes)
assert.Equal(t, 28, bytesSent)
assert.Equal(t, dataRecBuff, <-buffCh)
bytesAtServer, err := readWithTimeout(buffCh, 1*time.Second)
assert.NoError(t, err)
assert.Equal(t, dataRecBuff, bytesAtServer)
assert.Equal(t, uint32(1), exporter.seqNumber)

// Create data set with multiple data records to test invalid message length
Expand Down Expand Up @@ -535,7 +541,8 @@ func TestInitExportingProcessWithTLS(t *testing.T) {
if err == nil {
exporter.CloseConnToCollector()
}
serverErr := <-serverErrCh
serverErr, err := readWithTimeout(serverErrCh, 1*time.Second)
require.NoError(t, err)
if tc.expectedServerErr != "" {
assert.ErrorContains(t, serverErr, tc.expectedServerErr)
} else {
Expand Down Expand Up @@ -566,24 +573,10 @@ func TestExportingProcessWithTLS(t *testing.T) {
return
}

stopCh := make(chan struct{})
defer close(stopCh)
buffCh := make(chan []byte)
go func() {
defer listener.Close()
conn, err := listener.Accept()
if err != nil {
return
}
defer conn.Close()
t.Log("Accept the connection from exporter")
buff := make([]byte, 32)
_, err = conn.Read(buff)
if err != nil {
t.Error(err)
}
// Compare only template record part. Remove message header and set header.
buffCh <- buff[20:]
return
}()
go runTCPServer(t, listener, stopCh, buffCh)

// Create exporter using local server info
input := ExporterInput{
Expand Down Expand Up @@ -744,3 +737,51 @@ func TestExportingProcess_GetMsgSizeLimit(t *testing.T) {
t.Logf("Created exporter connecting to local server with address: %s", conn.LocalAddr().String())
assert.Equal(t, entities.MaxSocketMsgSize, exporter.GetMsgSizeLimit())
}

func TestExportingProcess_CheckConnToCollector(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("Got error when creating a local server: %v", err)
}
input := ExporterInput{
CollectorAddress: listener.Addr().String(),
CollectorProtocol: listener.Addr().Network(),
}
exporter, err := InitExportingProcess(input)
if err != nil {
t.Fatalf("Got error when connecting to local server %s: %v", listener.Addr().String(), err)
}

defer listener.Close()
conn, _ := listener.Accept()
oneByte := make([]byte, 1)
isOpen := exporter.checkConnToCollector(oneByte)
assert.True(t, isOpen)
conn.Close()
isOpen = exporter.checkConnToCollector(oneByte)
assert.False(t, isOpen)
}

func TestExportingProcess_CloseConnToCollectorTwice(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
buffCh := make(chan []byte)
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err, "Error when creating a local server")
go runTCPServer(t, listener, stopCh, buffCh)

input := ExporterInput{
CollectorAddress: listener.Addr().String(),
CollectorProtocol: listener.Addr().Network(),
ObservationDomainID: 1,
TempRefTimeout: 0,
}
exporter, err := InitExportingProcess(input)
if err != nil {
t.Fatalf("Got error when connecting to local server %s: %v", listener.Addr().String(), err)
}
t.Logf("Created exporter connecting to local server with address: %s", listener.Addr().String())

exporter.CloseConnToCollector()
exporter.CloseConnToCollector()
}
Loading