Skip to content

Commit

Permalink
Restore check for closed TCP connection in exporter process (#360)
Browse files Browse the repository at this point in the history
This is a partial reversal of d072ed8.
It turns out that the check can actually be useful as it can detect that
the collector ("server" side) has closed its side of the connection, and
this can be used as a signal to close our side of the connection as
well. This can happen when a process using our collector implementation
is closed, but no TCP RST is received when sendign a data set (in
particular, this can happen when running in K8s and using a virtual IP
to connect to the collector). This check can detect the issue much
faster than relying on a keep-alive timeout. Furthermore, a client of
this library could end up blocking if the connection has not timed out
yet and the send buffer is full.

Signed-off-by: Antonin Bas <antonin.bas@broadcom.com>
  • Loading branch information
antoninbas authored Sep 5, 2024
1 parent 14ced93 commit 2c76c1e
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 72 deletions.
69 changes: 61 additions & 8 deletions pkg/exporter/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"crypto/x509"
"encoding/json"
"fmt"
"io"
"net"
"sync"
"sync/atomic"
"time"

"github.com/pion/dtls/v2"
Expand All @@ -31,6 +33,7 @@ import (
)

const startTemplateID uint16 = 255
const defaultCheckConnInterval = 10 * time.Second
const defaultJSONBufferLen = 5000

type templateValue struct {
Expand All @@ -53,11 +56,12 @@ type ExportingProcess struct {
seqNumber uint32
templateID uint16
templatesMap map[uint16]templateValue
templateRefCh chan struct{}
templateMutex sync.Mutex
sendJSONRecord bool
jsonBufferLen int
wg sync.WaitGroup
isClosed atomic.Bool
stopCh chan struct{}
}

type ExporterTLSClientConfig struct {
Expand Down Expand Up @@ -153,9 +157,37 @@ func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) {
seqNumber: 0,
templateID: startTemplateID,
templatesMap: make(map[uint16]templateValue),
templateRefCh: make(chan struct{}),
sendJSONRecord: input.SendJSONRecord,
wg: sync.WaitGroup{},
stopCh: make(chan struct{}),
}

// Start a goroutine to check whether the collector has already closed the TCP connection.
if input.CollectorProtocol == "tcp" {
interval := input.CheckConnInterval
if interval == 0 {
interval = defaultCheckConnInterval
}
expProc.wg.Add(1)
go func() {
defer expProc.wg.Done()
ticker := time.NewTicker(interval)
oneByteForRead := make([]byte, 1)
defer ticker.Stop()
for {
select {
case <-expProc.stopCh:
return
case <-ticker.C:
isConnected := expProc.checkConnToCollector(oneByteForRead)
if !isConnected {
klog.Error("Connector has closed its side of the TCP connection, closing our side")
expProc.closeConnToCollector()
return
}
}
}
}()
}

// Template refresh logic is only for UDP transport.
Expand All @@ -171,12 +203,14 @@ func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) {
defer ticker.Stop()
for {
select {
case <-expProc.templateRefCh:
case <-expProc.stopCh:
return
case <-ticker.C:
err := expProc.sendRefreshedTemplates()
if err != nil {
klog.Errorf("Error when sending refreshed templates: %v", err)
klog.Errorf("Error when sending refreshed templates, closing the connection to the collector: %v", err)
expProc.closeConnToCollector()
return
}
}
}
Expand Down Expand Up @@ -231,16 +265,35 @@ func (ep *ExportingProcess) GetMsgSizeLimit() int {
}

// CloseConnToCollector closes the connection to the collector.
// It should not be called twice. InitExportingProcess can be called again after calling
// CloseConnToCollector.
// It can safely be closed more than once, and subsequent calls will be no-ops.
func (ep *ExportingProcess) CloseConnToCollector() {
close(ep.templateRefCh)
ep.closeConnToCollector()
ep.wg.Wait()
}

// closeConnToCollector is the internal version of CloseConnToCollector. It closes all the resources
// but does not wait for the ep.wg counter to get to 0. Goroutines which need to terminate in order
// for ep.wg to be decremented can safely call closeConnToCollector.
func (ep *ExportingProcess) closeConnToCollector() {
if ep.isClosed.Swap(true) {
return
}
close(ep.stopCh)
if err := ep.connToCollector.Close(); err != nil {
// Just log the error that happened when closing the connection. Not returning error
// as we do not expect library consumers to exit their programs with this error.
klog.Errorf("Error when closing connection to collector: %v", err)
}
ep.wg.Wait()
}

// checkConnToCollector checks whether the connection from exporter is still open
// by trying to read from connection. Closed connection will return EOF from read.
func (ep *ExportingProcess) checkConnToCollector(oneByteForRead []byte) bool {
ep.connToCollector.SetReadDeadline(time.Now().Add(time.Millisecond))
if _, err := ep.connToCollector.Read(oneByteForRead); err == io.EOF {
return false
}
return true
}

// NewTemplateID is called to get ID when creating new template record.
Expand Down
169 changes: 105 additions & 64 deletions pkg/exporter/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package exporter
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"net"
"testing"
Expand All @@ -35,34 +36,54 @@ func init() {
registry.LoadRegistry()
}

func TestExportingProcess_SendingTemplateRecordToLocalTCPServer(t *testing.T) {
// Create local server for testing
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("Got error when creating a local server: %v", err)
}
t.Log("Created local server on random available port for testing")

buffCh := make(chan []byte)
// Create go routine for local server
// TODO: Move this in to different function with byte size as arg
func runTCPServer(t *testing.T, listener net.Listener, stopCh <-chan struct{}, buffCh chan<- []byte) {
defer listener.Close()
go func() {
defer listener.Close()
conn, err := listener.Accept()
if err != nil {
t.Error(err)
return
}
defer conn.Close()
t.Log("Accept the connection from exporter")
buff := make([]byte, 32)
_, err = conn.Read(buff)
if err != nil {
t.Error(err)
}
// Compare only template record part. Remove message header and set header.
buffCh <- buff[20:]
return
go func() {
defer close(buffCh)
buff := make([]byte, 512)
for {
bytes, err := conn.Read(buff)
if err != nil {
return
}
// Remove message header and set header.
buffCh <- buff[20:bytes]
}
}()
<-stopCh
}()
<-stopCh
}

func readWithTimeout[T any](ch <-chan T, timeout time.Duration) (T, error) {
select {
case x, ok := <-ch:
if !ok {
return *new(T), fmt.Errorf("channel was closed")
}
return x, nil
case <-time.After(timeout):
return *new(T), fmt.Errorf("timeout expired")
}
}

func TestExportingProcess_SendingTemplateRecordToLocalTCPServer(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
buffCh := make(chan []byte)
// Create local server for testing
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err, "Error when creating a local server")
go runTCPServer(t, listener, stopCh, buffCh)
t.Log("Created local server on random available port for testing")

// Create exporter using local server info
input := ExporterInput{
Expand Down Expand Up @@ -104,6 +125,9 @@ func TestExportingProcess_SendingTemplateRecordToLocalTCPServer(t *testing.T) {
// 32 is the size of the IPFIX message including all headers
assert.Equal(t, 32, bytesSent)
assert.Equal(t, uint32(0), exporter.seqNumber)
bytesAtServer, err := readWithTimeout(buffCh, 1*time.Second)
assert.NoError(t, err)
assert.Len(t, bytesAtServer, 12)
exporter.CloseConnToCollector()
}

Expand Down Expand Up @@ -195,35 +219,15 @@ func TestExportingProcess_SendingTemplateRecordToLocalUDPServer(t *testing.T) {
}

func TestExportingProcess_SendingDataRecordToLocalTCPServer(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
buffCh := make(chan []byte)
// Create local server for testing
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("Got error when creating a local server: %v", err)
}
require.NoError(t, err, "Error when creating a local server")
go runTCPServer(t, listener, stopCh, buffCh)
t.Log("Created local server on random available port for testing")

buffCh := make(chan []byte)
// Create go routine for local server
// TODO: Move this in to different function with byte size as arg
go func() {
defer listener.Close()
conn, err := listener.Accept()
if err != nil {
return
}
defer conn.Close()
t.Log("Accept the connection from exporter")
buff := make([]byte, 28)
_, err = conn.Read(buff)
if err != nil {
t.Error(err)
}
// Compare only data record part. Remove message header and set header.
// TODO: Verify message header and set header through hardcoded byte values
buffCh <- buff[20:]
return
}()

// Create exporter using local server info
input := ExporterInput{
CollectorAddress: listener.Addr().String(),
Expand Down Expand Up @@ -279,7 +283,9 @@ func TestExportingProcess_SendingDataRecordToLocalTCPServer(t *testing.T) {
assert.NoError(t, err)
// 28 is the size of the IPFIX message including all headers (20 bytes)
assert.Equal(t, 28, bytesSent)
assert.Equal(t, dataRecBuff, <-buffCh)
bytesAtServer, err := readWithTimeout(buffCh, 1*time.Second)
assert.NoError(t, err)
assert.Equal(t, dataRecBuff, bytesAtServer)
assert.Equal(t, uint32(1), exporter.seqNumber)

// Create data set with multiple data records to test invalid message length
Expand Down Expand Up @@ -535,7 +541,8 @@ func TestInitExportingProcessWithTLS(t *testing.T) {
if err == nil {
exporter.CloseConnToCollector()
}
serverErr := <-serverErrCh
serverErr, err := readWithTimeout(serverErrCh, 1*time.Second)
require.NoError(t, err)
if tc.expectedServerErr != "" {
assert.ErrorContains(t, serverErr, tc.expectedServerErr)
} else {
Expand Down Expand Up @@ -566,24 +573,10 @@ func TestExportingProcessWithTLS(t *testing.T) {
return
}

stopCh := make(chan struct{})
defer close(stopCh)
buffCh := make(chan []byte)
go func() {
defer listener.Close()
conn, err := listener.Accept()
if err != nil {
return
}
defer conn.Close()
t.Log("Accept the connection from exporter")
buff := make([]byte, 32)
_, err = conn.Read(buff)
if err != nil {
t.Error(err)
}
// Compare only template record part. Remove message header and set header.
buffCh <- buff[20:]
return
}()
go runTCPServer(t, listener, stopCh, buffCh)

// Create exporter using local server info
input := ExporterInput{
Expand Down Expand Up @@ -744,3 +737,51 @@ func TestExportingProcess_GetMsgSizeLimit(t *testing.T) {
t.Logf("Created exporter connecting to local server with address: %s", conn.LocalAddr().String())
assert.Equal(t, entities.MaxSocketMsgSize, exporter.GetMsgSizeLimit())
}

func TestExportingProcess_CheckConnToCollector(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("Got error when creating a local server: %v", err)
}
input := ExporterInput{
CollectorAddress: listener.Addr().String(),
CollectorProtocol: listener.Addr().Network(),
}
exporter, err := InitExportingProcess(input)
if err != nil {
t.Fatalf("Got error when connecting to local server %s: %v", listener.Addr().String(), err)
}

defer listener.Close()
conn, _ := listener.Accept()
oneByte := make([]byte, 1)
isOpen := exporter.checkConnToCollector(oneByte)
assert.True(t, isOpen)
conn.Close()
isOpen = exporter.checkConnToCollector(oneByte)
assert.False(t, isOpen)
}

func TestExportingProcess_CloseConnToCollectorTwice(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
buffCh := make(chan []byte)
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err, "Error when creating a local server")
go runTCPServer(t, listener, stopCh, buffCh)

input := ExporterInput{
CollectorAddress: listener.Addr().String(),
CollectorProtocol: listener.Addr().Network(),
ObservationDomainID: 1,
TempRefTimeout: 0,
}
exporter, err := InitExportingProcess(input)
if err != nil {
t.Fatalf("Got error when connecting to local server %s: %v", listener.Addr().String(), err)
}
t.Logf("Created exporter connecting to local server with address: %s", listener.Addr().String())

exporter.CloseConnToCollector()
exporter.CloseConnToCollector()
}

0 comments on commit 2c76c1e

Please sign in to comment.