Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid error logs in BenchmarkMultipleExportersToCollector #323

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/collector/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@
var msgLen uint16
err = util.Decode(bytes.NewBuffer(partialHeader[2:]), binary.BigEndian, &msgLen)
if err != nil {
return 0, fmt.Errorf("cannot decode message: %v", err)
return 0, fmt.Errorf("cannot decode message: %w", err)

Check warning on line 386 in pkg/collector/process.go

View check run for this annotation

Codecov / codecov/patch

pkg/collector/process.go#L386

Added line #L386 was not covered by tests
}
return int(msgLen), nil
}
Expand Down
15 changes: 10 additions & 5 deletions pkg/collector/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"bytes"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -70,25 +71,29 @@
reader := bufio.NewReader(conn)
for {
length, err := getMessageLength(reader)
if errors.Is(err, io.EOF) {
klog.V(2).InfoS("Connection was closed by client")
return
}
if err != nil {
klog.Errorf("error when retrieving message length: %v", err)
klog.ErrorS(err, "Error when retrieving message length")
cp.deleteClient(address)
return
}
buff := make([]byte, length)
_, err = io.ReadFull(reader, buff)
if err != nil {
klog.Errorf("error when reading the message: %v", err)
klog.ErrorS(err, "Error when reading the message")

Check warning on line 86 in pkg/collector/tcp.go

View check run for this annotation

Codecov / codecov/patch

pkg/collector/tcp.go#L86

Added line #L86 was not covered by tests
cp.deleteClient(address)
return
}
message, err := cp.decodePacket(bytes.NewBuffer(buff), address)
if err != nil {
klog.Error(err)
klog.ErrorS(err, "Error when decoding packet")

Check warning on line 92 in pkg/collector/tcp.go

View check run for this annotation

Codecov / codecov/patch

pkg/collector/tcp.go#L92

Added line #L92 was not covered by tests
continue
}
klog.V(4).Infof("Processed message from exporter with observation domain ID: %v ser type: %v number of records: %v",
message.GetObsDomainID(), message.GetSet().GetSetType(), message.GetSet().GetNumberOfRecords())
klog.V(4).InfoS("Processed message from exporter",
"observationDomainID", message.GetObsDomainID(), "setType", message.GetSet().GetSetType(), "numRecords", message.GetSet().GetNumberOfRecords())
}
}()
<-cp.stopChan
Expand Down
11 changes: 10 additions & 1 deletion pkg/test/collector_perf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func BenchmarkMultipleExportersToCollector(b *testing.B) {
}
go cp.Start()
waitForCollectorStatus(b, cp, true)
exporters := make([]*exporter.ExportingProcess, 0, numOfExporters)
b.ResetTimer()
for i := 0; i < numOfExporters; i++ {
b.StartTimer()
Expand All @@ -89,17 +90,25 @@ func BenchmarkMultipleExportersToCollector(b *testing.B) {
exporter.SendSet(createDataSet(templateID, true, false, false))
}
b.StopTimer()
exporters = append(exporters, exporter)
time.Sleep(time.Millisecond)
}
b.StartTimer()
count := 0
for range cp.GetMsgChan() {
count++
if count == numOfRecords*numOfExporters {
cp.Stop()
break
}
}
b.StopTimer()
// Gracefully shutdown all the exporters to avoid "use of closed network connection" error
// logs.
for i := 0; i < numOfExporters; i++ {
exporters[i].CloseConnToCollector()
}
b.StartTimer()
cp.Stop()
waitForCollectorStatus(b, cp, false)
}

Expand Down
Loading