Skip to content

Commit

Permalink
Fix connection close in exporter process
Browse files Browse the repository at this point in the history
There were a couple of issues:

* ConnToCollector is exposed to the consumers of the library, and in
  Antrea we call ConnToCollector whenever sending a record fails. So the
  library itself should not call ConnToCollector as this is inherently
  racy. For example, it is possible for templateRefCh to be closed
  twice, despite the isChanClosed test (check-then-act race). We could
  add a mutex and a boolean, but I feel like it is better to just let
  consumer call ConnToCollector.
* When the exporter process is sending over UDP, there was a typo where
  a `break` was used instead of `return`. The `break` statement breaks
  from the select case, and not from the surrounding for loop, so the
  goroutine would never terminate.

Signed-off-by: Antonin Bas <antonin.bas@broadcom.com>
  • Loading branch information
antoninbas committed Aug 23, 2024
1 parent a4ae35d commit c7836a4
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 80 deletions.
70 changes: 14 additions & 56 deletions pkg/exporter/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"crypto/x509"
"encoding/json"
"fmt"
"io"
"net"
"sync"
"time"
Expand All @@ -32,7 +31,6 @@ import (
)

const startTemplateID uint16 = 255
const defaultCheckConnInterval = 10 * time.Second
const defaultJSONBufferLen = 5000

type templateValue struct {
Expand All @@ -59,6 +57,7 @@ type ExportingProcess struct {
templateMutex sync.Mutex
sendJSONRecord bool
jsonBufferLen int
wg sync.WaitGroup
}

type ExporterTLSClientConfig struct {
Expand Down Expand Up @@ -156,30 +155,7 @@ func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) {
templatesMap: make(map[uint16]templateValue),
templateRefCh: make(chan struct{}),
sendJSONRecord: input.SendJSONRecord,
}

// Start a goroutine for checking whether connection to collector is still open
if input.CollectorProtocol == "tcp" {
interval := input.CheckConnInterval
if interval == 0 {
interval = defaultCheckConnInterval
}
go func() {
ticker := time.NewTicker(interval)
oneByteForRead := make([]byte, 1)
defer ticker.Stop()
for {
select {
case <-ticker.C:
isConnected := expProc.checkConnToCollector(oneByteForRead)
if !isConnected {
expProc.CloseConnToCollector()
klog.Error("Error when connecting to collector because connection is closed.")
return
}
}
}
}()
wg: sync.WaitGroup{},
}

// Template refresh logic is only for UDP transport.
Expand All @@ -188,19 +164,19 @@ func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) {
// Default value
input.TempRefTimeout = entities.TemplateRefreshTimeOut
}
expProc.wg.Add(1)
go func() {
defer expProc.wg.Done()
ticker := time.NewTicker(time.Duration(input.TempRefTimeout) * time.Second)
defer ticker.Stop()
for {
select {
case <-expProc.templateRefCh:
break
return
case <-ticker.C:
err := expProc.sendRefreshedTemplates()
if err != nil {
// Other option is sending messages through channel to library consumers
klog.Errorf("Error when sending refreshed templates: %v. Closing the connection to IPFIX controller", err)
expProc.CloseConnToCollector()
klog.Errorf("Error when sending refreshed templates: %v", err)
}
}
}
Expand Down Expand Up @@ -254,26 +230,17 @@ func (ep *ExportingProcess) GetMsgSizeLimit() int {
return entities.MaxSocketMsgSize
}

// CloseConnToCollector closes the connection to the collector.
// It should not be called twice. InitExportingProcess can be called again after calling
// CloseConnToCollector.
func (ep *ExportingProcess) CloseConnToCollector() {
if !isChanClosed(ep.templateRefCh) {
close(ep.templateRefCh) // Close template refresh channel
}
err := ep.connToCollector.Close()
// Just log the error that happened when closing the connection. Not returning error as we do not expect library
// consumers to exit their programs with this error.
if err != nil {
close(ep.templateRefCh)
if err := ep.connToCollector.Close(); err != nil {
// Just log the error that happened when closing the connection. Not returning error
// as we do not expect library consumers to exit their programs with this error.
klog.Errorf("Error when closing connection to collector: %v", err)
}
}

// checkConnToCollector checks whether the connection from exporter is still open
// by trying to read from connection. Closed connection will return EOF from read.
func (ep *ExportingProcess) checkConnToCollector(oneByteForRead []byte) bool {
ep.connToCollector.SetReadDeadline(time.Now().Add(time.Millisecond))
if _, err := ep.connToCollector.Read(oneByteForRead); err == io.EOF {
return false
}
return true
ep.wg.Wait()
}

// NewTemplateID is called to get ID when creating new template record.
Expand Down Expand Up @@ -450,15 +417,6 @@ func (ep *ExportingProcess) dataRecSanityCheck(rec entities.Record) error {
return nil
}

func isChanClosed(ch <-chan struct{}) bool {
select {
case <-ch:
return true
default:
}
return false
}

func createClientConfig(config *ExporterTLSClientConfig) (*tls.Config, error) {
roots := x509.NewCertPool()
ok := roots.AppendCertsFromPEM(config.CAData)
Expand Down
24 changes: 0 additions & 24 deletions pkg/exporter/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,27 +744,3 @@ func TestExportingProcess_GetMsgSizeLimit(t *testing.T) {
t.Logf("Created exporter connecting to local server with address: %s", conn.LocalAddr().String())
assert.Equal(t, entities.MaxSocketMsgSize, exporter.GetMsgSizeLimit())
}

func TestExportingProcess_CheckConnToCollector(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("Got error when creating a local server: %v", err)
}
input := ExporterInput{
CollectorAddress: listener.Addr().String(),
CollectorProtocol: listener.Addr().Network(),
}
exporter, err := InitExportingProcess(input)
if err != nil {
t.Fatalf("Got error when connecting to local server %s: %v", listener.Addr().String(), err)
}

defer listener.Close()
conn, _ := listener.Accept()
oneByte := make([]byte, 1)
isOpen := exporter.checkConnToCollector(oneByte)
assert.True(t, isOpen)
conn.Close()
isOpen = exporter.checkConnToCollector(oneByte)
assert.False(t, isOpen)
}

0 comments on commit c7836a4

Please sign in to comment.