Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes for release v0.8.0 #326

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ run:
tests: true
timeout: 10m
skip-dirs-use-default: true
build-tags:
- integration

linters-settings:
goimports:
Expand Down
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,18 @@ All notable changes to this project will be documented in this file. The format
Changelog](https://keepachangelog.com/en/1.0.0/).

## Unreleased
## 0.8.0 10-02-2023
### Added
- Add L7 visibility fields. (#315, @tushartathgur)
### Changed
- Change datatype of flowEndSecondsFromSourceNode and
flowEndSecondsFromDestinationNode. (#320, @tushartathgur)
- Improve handling of string IEs. (#322, @antoninbas)
- Avoid error logs in BenchmarkMultipleExportersToCollector. (#323,
@antoninbas)
- Improve integration tests. (#325, @antoninbas)
### Fixed
- Fix aggregation bug for throughput common fields. (#324, @antoninbas)
## 0.7.0 09-21-2023
### Changed
- Upgrade Go to v1.21, as Go v1.19 is no longer maintained. (#317, @antoninbas)
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v0.7.0
v0.8.0
2 changes: 1 addition & 1 deletion pkg/collector/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@
var msgLen uint16
err = util.Decode(bytes.NewBuffer(partialHeader[2:]), binary.BigEndian, &msgLen)
if err != nil {
return 0, fmt.Errorf("cannot decode message: %v", err)
return 0, fmt.Errorf("cannot decode message: %w", err)

Check warning on line 386 in pkg/collector/process.go

View check run for this annotation

Codecov / codecov/patch

pkg/collector/process.go#L386

Added line #L386 was not covered by tests
}
return int(msgLen), nil
}
Expand Down
18 changes: 9 additions & 9 deletions pkg/collector/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

"github.com/vmware/go-ipfix/pkg/entities"
"github.com/vmware/go-ipfix/pkg/registry"
"github.com/vmware/go-ipfix/pkg/test"
testcerts "github.com/vmware/go-ipfix/pkg/test/certs"
)

var validTemplatePacket = []byte{0, 10, 0, 40, 95, 154, 107, 127, 0, 0, 0, 0, 0, 0, 0, 1, 0, 2, 0, 24, 1, 0, 0, 3, 0, 8, 0, 4, 0, 12, 0, 4, 128, 101, 255, 255, 0, 0, 220, 186}
Expand Down Expand Up @@ -376,11 +376,11 @@ func TestTLSCollectingProcess(t *testing.T) {
var config *tls.Config
go func() {
roots := x509.NewCertPool()
ok := roots.AppendCertsFromPEM([]byte(test.FakeCACert))
ok := roots.AppendCertsFromPEM([]byte(testcerts.FakeCACert))
if !ok {
t.Error("Failed to parse root certificate")
}
cert, err := tls.X509KeyPair([]byte(test.FakeClientCert), []byte(test.FakeClientKey))
cert, err := tls.X509KeyPair([]byte(testcerts.FakeClientCert), []byte(testcerts.FakeClientKey))
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -424,7 +424,7 @@ func TestDTLSCollectingProcess(t *testing.T) {
collectorAddr, _ := net.ResolveUDPAddr("udp", cp.GetAddress().String())
go func() {
roots := x509.NewCertPool()
ok := roots.AppendCertsFromPEM([]byte(test.FakeCert2))
ok := roots.AppendCertsFromPEM([]byte(testcerts.FakeCert2))
if !ok {
t.Error("Failed to parse root certificate")
}
Expand Down Expand Up @@ -517,9 +517,9 @@ func getCollectorInput(network string, isEncrypted bool, isIPv6 bool) CollectorI
MaxBufferSize: 1024,
TemplateTTL: 0,
IsEncrypted: true,
CACert: []byte(test.FakeCACert),
ServerCert: []byte(test.FakeCert),
ServerKey: []byte(test.FakeKey),
CACert: []byte(testcerts.FakeCACert),
ServerCert: []byte(testcerts.FakeCert),
ServerKey: []byte(testcerts.FakeKey),
}
} else {
return CollectorInput{
Expand All @@ -542,8 +542,8 @@ func getCollectorInput(network string, isEncrypted bool, isIPv6 bool) CollectorI
MaxBufferSize: 1024,
TemplateTTL: 0,
IsEncrypted: true,
ServerCert: []byte(test.FakeCert2),
ServerKey: []byte(test.FakeKey2),
ServerCert: []byte(testcerts.FakeCert2),
ServerKey: []byte(testcerts.FakeKey2),
}
} else {
return CollectorInput{
Expand Down
15 changes: 10 additions & 5 deletions pkg/collector/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"bytes"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -70,25 +71,29 @@
reader := bufio.NewReader(conn)
for {
length, err := getMessageLength(reader)
if errors.Is(err, io.EOF) {
klog.V(2).InfoS("Connection was closed by client")
return
}
if err != nil {
klog.Errorf("error when retrieving message length: %v", err)
klog.ErrorS(err, "Error when retrieving message length")
cp.deleteClient(address)
return
}
buff := make([]byte, length)
_, err = io.ReadFull(reader, buff)
if err != nil {
klog.Errorf("error when reading the message: %v", err)
klog.ErrorS(err, "Error when reading the message")

Check warning on line 86 in pkg/collector/tcp.go

View check run for this annotation

Codecov / codecov/patch

pkg/collector/tcp.go#L86

Added line #L86 was not covered by tests
cp.deleteClient(address)
return
}
message, err := cp.decodePacket(bytes.NewBuffer(buff), address)
if err != nil {
klog.Error(err)
klog.ErrorS(err, "Error when decoding packet")

Check warning on line 92 in pkg/collector/tcp.go

View check run for this annotation

Codecov / codecov/patch

pkg/collector/tcp.go#L92

Added line #L92 was not covered by tests
continue
}
klog.V(4).Infof("Processed message from exporter with observation domain ID: %v ser type: %v number of records: %v",
message.GetObsDomainID(), message.GetSet().GetSetType(), message.GetSet().GetNumberOfRecords())
klog.V(4).InfoS("Processed message from exporter",
"observationDomainID", message.GetObsDomainID(), "setType", message.GetSet().GetSetType(), "numRecords", message.GetSet().GetNumberOfRecords())
}
}()
<-cp.stopChan
Expand Down
28 changes: 13 additions & 15 deletions pkg/entities/ie.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,16 +485,14 @@
if len(v) < 255 {
encodedBytes = make([]byte, len(v)+1)
encodedBytes[0] = uint8(len(v))
for i, b := range v {
encodedBytes[i+1] = byte(b)
}
} else if len(v) < 65535 {
copy(encodedBytes[1:], v)
} else if len(v) <= math.MaxUint16 {
encodedBytes = make([]byte, len(v)+3)
encodedBytes[0] = byte(255)
binary.BigEndian.PutUint16(encodedBytes[1:3], uint16(len(v)))
for i, b := range v {
encodedBytes[i+3] = byte(b)
}
copy(encodedBytes[3:], v)
} else {
return nil, fmt.Errorf("provided String value is too long and cannot be encoded: len=%d, maxlen=%d", len(v), math.MaxUint16)

Check warning on line 495 in pkg/entities/ie.go

View check run for this annotation

Codecov / codecov/patch

pkg/entities/ie.go#L493-L495

Added lines #L493 - L495 were not covered by tests
}
return encodedBytes, nil
}
Expand Down Expand Up @@ -560,15 +558,15 @@
v := element.GetStringValue()
if len(v) < 255 {
buffer[index] = uint8(len(v))
for i, b := range v {
buffer[i+index+1] = byte(b)
}
} else if len(v) < 65535 {
buffer[index] = byte(255)
// See https://pkg.go.dev/builtin#copy
// As a special case, it also will copy bytes from a string to a slice of bytes.
copy(buffer[index+1:], v)
} else if len(v) <= math.MaxUint16 {
buffer[index] = byte(255) // marker byte for long strings

Check warning on line 565 in pkg/entities/ie.go

View check run for this annotation

Codecov / codecov/patch

pkg/entities/ie.go#L565

Added line #L565 was not covered by tests
binary.BigEndian.PutUint16(buffer[index+1:index+3], uint16(len(v)))
for i, b := range v {
buffer[i+index+3] = byte(b)
}
copy(buffer[index+3:], v)
} else {
return fmt.Errorf("provided String value is too long and cannot be encoded: len=%d, maxlen=%d", len(v), math.MaxUint16)

Check warning on line 569 in pkg/entities/ie.go

View check run for this annotation

Codecov / codecov/patch

pkg/entities/ie.go#L567-L569

Added lines #L567 - L569 were not covered by tests
}
default:
return fmt.Errorf("API supports only valid information elements with datatypes given in RFC7011")
Expand Down
36 changes: 36 additions & 0 deletions pkg/entities/ie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package entities

import (
"net"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var macAddress, _ = net.ParseMAC("aa:bb:cc:dd:ee:ff")
Expand Down Expand Up @@ -72,3 +74,37 @@ func TestNewInfoElementWithValue(t *testing.T) {
assert.Equal(t, element.GetInfoElement().Name, "sourceIPv4Address")
assert.Equal(t, element.GetIPAddressValue(), ip)
}

func BenchmarkEncodeInfoElementValueToBuffShortString(b *testing.B) {
// a short string has a max length of 254
str := strings.Repeat("x", 128)
element := NewStringInfoElement(NewInfoElement("interfaceDescription", 83, 13, 0, 65535), str)
const numCopies = 1000
length := element.GetLength()
buffer := make([]byte, numCopies*length)
b.ResetTimer()
for i := 0; i < b.N; i++ {
index := 0
for j := 0; j < numCopies; j++ {
require.NoError(b, encodeInfoElementValueToBuff(element, buffer, index))
index += length
}
}
}

func BenchmarkEncodeInfoElementValueToBuffLongString(b *testing.B) {
// a long string has a max length of 65535
str := strings.Repeat("x", 10000)
element := NewStringInfoElement(NewInfoElement("interfaceDescription", 83, 13, 0, 65535), str)
const numCopies = 1000
length := element.GetLength()
buffer := make([]byte, numCopies*length)
b.ResetTimer()
for i := 0; i < b.N; i++ {
index := 0
for j := 0; j < numCopies; j++ {
require.NoError(b, encodeInfoElementValueToBuff(element, buffer, index))
index += length
}
}
}
56 changes: 56 additions & 0 deletions pkg/exporter/msg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2023 VMware, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package exporter

import (
"fmt"
"time"

"github.com/vmware/go-ipfix/pkg/entities"
)

func CreateIPFIXMsg(set entities.Set, obsDomainID uint32, seqNumber uint32, exportTime time.Time) ([]byte, error) {
// Create a new message and use it to send the set.
msg := entities.NewMessage(false)

// Check if message is exceeding the limit after adding the set. Include message
// header length too.
msgLen := entities.MsgHeaderLength + set.GetSetLength()
if msgLen > entities.MaxSocketMsgSize {
// This is applicable for both TCP and UDP sockets.
return nil, fmt.Errorf("message size exceeds max socket buffer size")
}

// Set the fields in the message header.
// IPFIX version number is 10.
// https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-version-numbers
msg.SetVersion(10)
msg.SetObsDomainID(obsDomainID)
msg.SetMessageLen(uint16(msgLen))
msg.SetExportTime(uint32(exportTime.Unix()))
msg.SetSequenceNum(seqNumber)

bytesSlice := make([]byte, msgLen)
copy(bytesSlice[:entities.MsgHeaderLength], msg.GetMsgHeader())
copy(bytesSlice[entities.MsgHeaderLength:entities.MsgHeaderLength+entities.SetHeaderLen], set.GetHeaderBuffer())
index := entities.MsgHeaderLength + entities.SetHeaderLen
for _, record := range set.GetRecords() {
len := record.GetRecordLength()
copy(bytesSlice[index:index+len], record.GetBuffer())
index += len
}

return bytesSlice, nil
}
33 changes: 4 additions & 29 deletions pkg/exporter/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,45 +285,20 @@ func (ep *ExportingProcess) NewTemplateID() uint16 {
// createAndSendIPFIXMsg takes in a set as input, creates the IPFIX message, and sends it out.
// TODO: This method will change when we support sending multiple sets.
func (ep *ExportingProcess) createAndSendIPFIXMsg(set entities.Set) (int, error) {
// Create a new message and use it to send the set.
msg := entities.NewMessage(false)

// Check if message is exceeding the limit after adding the set. Include message
// header length too.
msgLen := entities.MsgHeaderLength + set.GetSetLength()
if msgLen > entities.MaxSocketMsgSize {
// This is applicable for both TCP and UDP sockets.
return 0, fmt.Errorf("message size exceeds max socket buffer size")
}

// Set the fields in the message header.
// IPFIX version number is 10.
// https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-version-numbers
msg.SetVersion(10)
msg.SetObsDomainID(ep.obsDomainID)
msg.SetMessageLen(uint16(msgLen))
msg.SetExportTime(uint32(time.Now().Unix()))
if set.GetSetType() == entities.Data {
ep.seqNumber = ep.seqNumber + set.GetNumberOfRecords()
}
msg.SetSequenceNum(ep.seqNumber)

bytesSlice := make([]byte, msgLen)
copy(bytesSlice[:entities.MsgHeaderLength], msg.GetMsgHeader())
copy(bytesSlice[entities.MsgHeaderLength:entities.MsgHeaderLength+entities.SetHeaderLen], set.GetHeaderBuffer())
index := entities.MsgHeaderLength + entities.SetHeaderLen
for _, record := range set.GetRecords() {
len := record.GetRecordLength()
copy(bytesSlice[index:index+len], record.GetBuffer())
index += len
bytesSlice, err := CreateIPFIXMsg(set, ep.obsDomainID, ep.seqNumber, time.Now())
if err != nil {
return 0, err
}

// Send the message on the exporter connection.
bytesSent, err := ep.connToCollector.Write(bytesSlice)

if err != nil {
return bytesSent, fmt.Errorf("error when sending message on the connection: %v", err)
} else if bytesSent != msgLen {
} else if bytesSent != len(bytesSlice) {
return bytesSent, fmt.Errorf("could not send the complete message on the connection")
}

Expand Down
Loading
Loading