From b49492162814e8d16ec714f39d20c6d89300dcb7 Mon Sep 17 00:00:00 2001 From: Antonin Bas Date: Fri, 29 Sep 2023 12:25:57 -0700 Subject: [PATCH] Avoid error logs in BenchmarkMultipleExportersToCollector * Gracefully shutdown exporter connections before stopping collector. * Do not log an error for graceful client shutdown (EOF error). Signed-off-by: Antonin Bas --- pkg/collector/process.go | 2 +- pkg/collector/tcp.go | 15 ++++++++++----- pkg/test/collector_perf_test.go | 11 ++++++++++- 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/pkg/collector/process.go b/pkg/collector/process.go index ef4e23b7..cb59ffca 100644 --- a/pkg/collector/process.go +++ b/pkg/collector/process.go @@ -383,7 +383,7 @@ func getMessageLength(reader *bufio.Reader) (int, error) { var msgLen uint16 err = util.Decode(bytes.NewBuffer(partialHeader[2:]), binary.BigEndian, &msgLen) if err != nil { - return 0, fmt.Errorf("cannot decode message: %v", err) + return 0, fmt.Errorf("cannot decode message: %w", err) } return int(msgLen), nil } diff --git a/pkg/collector/tcp.go b/pkg/collector/tcp.go index 3f3b2acc..d4b39fba 100644 --- a/pkg/collector/tcp.go +++ b/pkg/collector/tcp.go @@ -5,6 +5,7 @@ import ( "bytes" "crypto/tls" "crypto/x509" + "errors" "fmt" "io" "net" @@ -70,25 +71,29 @@ func (cp *CollectingProcess) handleTCPClient(conn net.Conn) { reader := bufio.NewReader(conn) for { length, err := getMessageLength(reader) + if errors.Is(err, io.EOF) { + klog.V(2).InfoS("Connection was closed by client") + return + } if err != nil { - klog.Errorf("error when retrieving message length: %v", err) + klog.ErrorS(err, "Error when retrieving message length") cp.deleteClient(address) return } buff := make([]byte, length) _, err = io.ReadFull(reader, buff) if err != nil { - klog.Errorf("error when reading the message: %v", err) + klog.ErrorS(err, "Error when reading the message") cp.deleteClient(address) return } message, err := cp.decodePacket(bytes.NewBuffer(buff), address) if err != nil { - klog.Error(err) + klog.ErrorS(err, "Error when decoding packet") continue } - klog.V(4).Infof("Processed message from exporter with observation domain ID: %v ser type: %v number of records: %v", - message.GetObsDomainID(), message.GetSet().GetSetType(), message.GetSet().GetNumberOfRecords()) + klog.V(4).InfoS("Processed message from exporter", + "observationDomainID", message.GetObsDomainID(), "setType", message.GetSet().GetSetType(), "numRecords", message.GetSet().GetNumberOfRecords()) } }() <-cp.stopChan diff --git a/pkg/test/collector_perf_test.go b/pkg/test/collector_perf_test.go index 4ebb3d21..ee517a8c 100644 --- a/pkg/test/collector_perf_test.go +++ b/pkg/test/collector_perf_test.go @@ -71,6 +71,7 @@ func BenchmarkMultipleExportersToCollector(b *testing.B) { } go cp.Start() waitForCollectorStatus(b, cp, true) + exporters := make([]*exporter.ExportingProcess, 0, numOfExporters) b.ResetTimer() for i := 0; i < numOfExporters; i++ { b.StartTimer() @@ -89,6 +90,7 @@ func BenchmarkMultipleExportersToCollector(b *testing.B) { exporter.SendSet(createDataSet(templateID, true, false, false)) } b.StopTimer() + exporters = append(exporters, exporter) time.Sleep(time.Millisecond) } b.StartTimer() @@ -96,10 +98,17 @@ func BenchmarkMultipleExportersToCollector(b *testing.B) { for range cp.GetMsgChan() { count++ if count == numOfRecords*numOfExporters { - cp.Stop() break } } + b.StopTimer() + // Gracefully shutdown all the exporters to avoid "use of closed network connection" error + // logs. + for i := 0; i < numOfExporters; i++ { + exporters[i].CloseConnToCollector() + } + b.StartTimer() + cp.Stop() waitForCollectorStatus(b, cp, false) }