Skip to content

Commit

Permalink
Restore check for closed TCP connection in exporter process
Browse files Browse the repository at this point in the history
This is a partial reversal of d072ed8.
It turns out that the check can actually be useful as it can detect that
the collector ("server" side) has closed its side of the connection, and
this can be used as a signal to close our side of the connection as
well. This can happen when a process using our collector implementation
is closed, but no TCP RST is received when sendign a data set (in
particular, this can happen when running in K8s and using a virtual IP
to connect to the collector). This check can detect the issue much
faster than relying on a keep-alive timeout. Furthermore, a client of
this library could end up blocking if the connection has not timed out
yet and the send buffer is full.

Signed-off-by: Antonin Bas <antonin.bas@broadcom.com>
  • Loading branch information
antoninbas committed Sep 4, 2024
1 parent 8c4efa4 commit ebfb249
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 8 deletions.
69 changes: 61 additions & 8 deletions pkg/exporter/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"crypto/x509"
"encoding/json"
"fmt"
"io"
"net"
"sync"
"sync/atomic"
"time"

"github.com/pion/dtls/v2"
Expand All @@ -31,6 +33,7 @@ import (
)

const startTemplateID uint16 = 255
const defaultCheckConnInterval = 10 * time.Second
const defaultJSONBufferLen = 5000

type templateValue struct {
Expand All @@ -53,11 +56,12 @@ type ExportingProcess struct {
seqNumber uint32
templateID uint16
templatesMap map[uint16]templateValue
templateRefCh chan struct{}
templateMutex sync.Mutex
sendJSONRecord bool
jsonBufferLen int
wg sync.WaitGroup
isClosed atomic.Bool
stopCh chan struct{}
}

type ExporterTLSClientConfig struct {
Expand Down Expand Up @@ -153,9 +157,37 @@ func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) {
seqNumber: 0,
templateID: startTemplateID,
templatesMap: make(map[uint16]templateValue),
templateRefCh: make(chan struct{}),
sendJSONRecord: input.SendJSONRecord,
wg: sync.WaitGroup{},
stopCh: make(chan struct{}),
}

// Start a goroutine to check whether the collector has already closed the TCP connection.
if input.CollectorProtocol == "tcp" {
interval := input.CheckConnInterval
if interval == 0 {
interval = defaultCheckConnInterval
}
expProc.wg.Add(1)
go func() {
defer expProc.wg.Done()
ticker := time.NewTicker(interval)
oneByteForRead := make([]byte, 1)
defer ticker.Stop()
for {
select {
case <-expProc.stopCh:
return
case <-ticker.C:
isConnected := expProc.checkConnToCollector(oneByteForRead)
if !isConnected {
klog.Error("Connector has closed its side of the TCP connection, closing our side")
expProc.closeConnToCollector()
return
}
}
}
}()
}

// Template refresh logic is only for UDP transport.
Expand All @@ -171,12 +203,14 @@ func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) {
defer ticker.Stop()
for {
select {
case <-expProc.templateRefCh:
case <-expProc.stopCh:
return
case <-ticker.C:
err := expProc.sendRefreshedTemplates()
if err != nil {
klog.Errorf("Error when sending refreshed templates: %v", err)
klog.Errorf("Error when sending refreshed templates, closing the connection to the collector: %v", err)
expProc.closeConnToCollector()
return
}
}
}
Expand Down Expand Up @@ -231,16 +265,35 @@ func (ep *ExportingProcess) GetMsgSizeLimit() int {
}

// CloseConnToCollector closes the connection to the collector.
// It should not be called twice. InitExportingProcess can be called again after calling
// CloseConnToCollector.
// It can safely be closed more than once, and subsequent calls will be no-ops.
func (ep *ExportingProcess) CloseConnToCollector() {
close(ep.templateRefCh)
ep.closeConnToCollector()
ep.wg.Wait()
}

// closeConnToCollector is the internal version of CloseConnToCollector. It closes all the resources
// but does not wait for the ep.wg counter to get to 0. Goroutines which need to terminate in order
// for ep.wg to be decremented can safely call closeConnToCollector.
func (ep *ExportingProcess) closeConnToCollector() {
if ep.isClosed.Swap(true) {
return
}
close(ep.stopCh)
if err := ep.connToCollector.Close(); err != nil {
// Just log the error that happened when closing the connection. Not returning error
// as we do not expect library consumers to exit their programs with this error.
klog.Errorf("Error when closing connection to collector: %v", err)
}
ep.wg.Wait()
}

// checkConnToCollector checks whether the connection from exporter is still open
// by trying to read from connection. Closed connection will return EOF from read.
func (ep *ExportingProcess) checkConnToCollector(oneByteForRead []byte) bool {
ep.connToCollector.SetReadDeadline(time.Now().Add(time.Millisecond))
if _, err := ep.connToCollector.Read(oneByteForRead); err == io.EOF {
return false
}
return true
}

// NewTemplateID is called to get ID when creating new template record.
Expand Down
24 changes: 24 additions & 0 deletions pkg/exporter/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,3 +744,27 @@ func TestExportingProcess_GetMsgSizeLimit(t *testing.T) {
t.Logf("Created exporter connecting to local server with address: %s", conn.LocalAddr().String())
assert.Equal(t, entities.MaxSocketMsgSize, exporter.GetMsgSizeLimit())
}

func TestExportingProcess_CheckConnToCollector(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("Got error when creating a local server: %v", err)
}
input := ExporterInput{
CollectorAddress: listener.Addr().String(),
CollectorProtocol: listener.Addr().Network(),
}
exporter, err := InitExportingProcess(input)
if err != nil {
t.Fatalf("Got error when connecting to local server %s: %v", listener.Addr().String(), err)
}

defer listener.Close()
conn, _ := listener.Accept()
oneByte := make([]byte, 1)
isOpen := exporter.checkConnToCollector(oneByte)
assert.True(t, isOpen)
conn.Close()
isOpen = exporter.checkConnToCollector(oneByte)
assert.False(t, isOpen)
}

0 comments on commit ebfb249

Please sign in to comment.