Skip to content

Commit

Permalink
chore: follow-up on upstream changes
Browse files Browse the repository at this point in the history
Signed-off-by: Bence Csati <bence.csati@axoflow.com>
  • Loading branch information
csatib02 committed Dec 9, 2024
1 parent 419cd55 commit f1b7283
Show file tree
Hide file tree
Showing 10 changed files with 185 additions and 124 deletions.
17 changes: 12 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ Forward is the protocol used by Fluentd to route message between peers.

| Property | Default value | Type | Description |
|---|---|---|---|
| endpoint | | string | **MANDATORY** Target URL to send `Forward` log streams to |
| endpoint.tcp_addr | | string | **MANDATORY** Target URL to send `Forward` log streams to |
| endpoint.validate_tcp_resolution | false | bool | Controls whether to validate the tcp address and fail at startup. |
| connection_timeout | 30s | time.Duration | Maximum amount of time a dial will wait for a connect to complete |
| tls.insecure | true | bool | If set to **true**, the connexion is not secured with TLS. |
| tls.insecure_skip_verify | false | bool | Controls whether the exporter verifies the server's certificate chain and host name. If **true**, any certificate is accepted and any host name. This mode is susceptible to man-in-the-middle attacks |
Expand All @@ -34,6 +35,8 @@ Forward is the protocol used by Fluentd to route message between peers.
| tag | "tag" | string | Fluentd tag is a string separated by '.'s (e.g. myapp.access), and is used as the directions for Fluentd's internal routing engine |
| compress_gzip | false | bool | Transparent data compression. You can use this feature to reduce the transferred payload size |
| default_labels_enabled | true | map[string]bool | If omitted then default labels will be added. If one of the labels is omitted then this label will be added |
| kubernetes_metadata.key | | string | KubernetesMetadata includes kubernetes metadata as a nested object. It leverages resources attributes provided by k8sattributesprocessor |
| kubernetes_metadata.include_pod_labels | | bool | Whether pod labels should be added to the nested object |

See the default values in the method `createDefaultConfig()` in [factory.go](factory.go) file.

Expand All @@ -42,7 +45,8 @@ Example, for `default_labels_enabled` that will add only the `time` attribute in
```yaml
exporters:
fluentforward:
endpoint: a.new.fluentforward.target:24224
endpoint:
tcp_addr: a.new.fluentforward.target:24224
connection_timeout: 10s
require_ack: true
tag: nginx
Expand All @@ -59,7 +63,8 @@ Example with TLS enabled and shared key:
```yaml
exporters:
fluentforward:
endpoint: a.new.fluentforward.target:24224
endpoint:
tcp_addr: a.new.fluentforward.target:24224
connection_timeout: 10s
tls:
insecure: false
Expand All @@ -71,7 +76,8 @@ Example with mutual TLS authentication (mTLS):
```yaml
exporters:
fluentforward:
endpoint: a.new.fluentforward.target:24224
endpoint:
tcp_addr: a.new.fluentforward.target:24224
connection_timeout: 10s
tls:
insecure: false
Expand All @@ -93,7 +99,8 @@ Example usage:
```yaml
exporters:
fluentforward:
endpoint: a.new.fluentforward.target:24224
endpoint:
tcp_addr: a.new.fluentforward.target:24224
connection_timeout: 10s
retry_on_failure:
enabled: true
Expand Down
23 changes: 13 additions & 10 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (

// TCPClientSettings defines common settings for a TCP client.
type TCPClientSettings struct {
// The target endpoint URI to send data to (e.g.: some.url:24224).
Endpoint string `mapstructure:"endpoint"`
// Endpoint to send logs to.
Endpoint `mapstructure:"endpoint"`

// Connection Timeout parameter configures `net.Dialer`.
ConnectionTimeout time.Duration `mapstructure:"connection_timeout"`
Expand All @@ -33,10 +33,6 @@ type TCPClientSettings struct {
type Config struct {
TCPClientSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.

// SkipFailOnInvalidTCPEndpoint controls whether to fail if the endpoint is invalid.
// This is useful for cases where the collector is started before the endpoint becomes available.
SkipFailOnInvalidTCPEndpoint bool `mapstructure:"skip_fail_on_invalid_tcp_endpoint"`

// RequireAck enables the acknowledgement feature.
RequireAck bool `mapstructure:"require_ack"`

Expand Down Expand Up @@ -76,6 +72,13 @@ type Config struct {
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
}

type Endpoint struct {
// TCPAddr is the address of the server to connect to.
TCPAddr string `mapstructure:"tcp_addr"`
// Controls whether to validate the tcp address.
ValidateTCPResolution bool `mapstructure:"validate_tcp_resolution"`
}

type KubernetesMetadata struct {
Key string `mapstructure:"key"`
IncludePodLabels bool `mapstructure:"include_pod_labels"`
Expand All @@ -89,10 +92,10 @@ func (config *Config) Validate() error {
return fmt.Errorf("queue settings has invalid configuration: %w", err)
}

// Resolve TCP address just to ensure that it is a valid one. It is better
// to fail here than at when the exporter is started.
if !config.SkipFailOnInvalidTCPEndpoint {
if _, err := net.ResolveTCPAddr("tcp", config.Endpoint); err != nil {
if config.TCPClientSettings.Endpoint.ValidateTCPResolution {
// Resolve TCP address just to ensure that it is a valid one. It is better
// to fail here than at when the exporter is started.
if _, err := net.ResolveTCPAddr("tcp", config.Endpoint.TCPAddr); err != nil {
return fmt.Errorf("exporter has an invalid TCP endpoint: %w", err)
}
}
Expand Down
32 changes: 24 additions & 8 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ func TestLoadConfigNewExporter(t *testing.T) {
id: component.NewIDWithName(metadata.Type, "allsettings"),
expected: &Config{
TCPClientSettings: TCPClientSettings{
Endpoint: validEndpoint,
Endpoint: Endpoint{
TCPAddr: validEndpoint,
ValidateTCPResolution: false,
},
ConnectionTimeout: time.Second * 30,
ClientConfig: configtls.ClientConfig{
Insecure: true,
Expand Down Expand Up @@ -97,35 +100,48 @@ func TestConfigValidate(t *testing.T) {
}{
{
desc: "QueueSettings are invalid",
cfg: &Config{QueueConfig: exporterhelper.QueueConfig{QueueSize: -1, Enabled: true}},
err: fmt.Errorf("queue settings has invalid configuration"),
cfg: &Config{
QueueConfig: exporterhelper.QueueConfig{
QueueSize: -1,
Enabled: true,
},
},
err: fmt.Errorf("queue settings has invalid configuration"),
},
{
desc: "Endpoint is invalid",
cfg: &Config{
TCPClientSettings: TCPClientSettings{
Endpoint: "http://localhost:24224",
Endpoint: Endpoint{
TCPAddr: "http://localhost:24224",
ValidateTCPResolution: true,
},
ConnectionTimeout: time.Second * 30,
},
},
err: fmt.Errorf("exporter has an invalid TCP endpoint: address http://localhost:24224: too many colons in address"),
},
{
desc: "Endpoint is invalid but SkipFailOnInvalidTCPEndpoint is false",
desc: "Endpoint is invalid with ValidateTCPResolution false throw no error",
cfg: &Config{
TCPClientSettings: TCPClientSettings{
Endpoint: "http://localhost:24224",
Endpoint: Endpoint{
TCPAddr: "http://localhost:24224",
ValidateTCPResolution: false,
},
ConnectionTimeout: time.Second * 30,
},
SkipFailOnInvalidTCPEndpoint: true,
},
err: nil,
},
{
desc: "Config is valid",
cfg: &Config{
TCPClientSettings: TCPClientSettings{
Endpoint: validEndpoint,
Endpoint: Endpoint{
TCPAddr: validEndpoint,
ValidateTCPResolution: true,
},
ConnectionTimeout: time.Second * 30,
},
},
Expand Down
10 changes: 5 additions & 5 deletions exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (f *fluentforwardExporter) start(ctx context.Context, host component.Host)
return err
}
connFactory := &fclient.ConnFactory{
Address: f.config.Endpoint,
Address: f.config.Endpoint.TCPAddr,
Timeout: f.config.ConnectionTimeout,
TLSConfig: tlsConfig,
}
Expand Down Expand Up @@ -70,14 +70,14 @@ func (f *fluentforwardExporter) stop(context.Context) (err error) {
// connectForward connects to the Fluent Forward endpoint and keep running otel even if the connection is failing
func (f *fluentforwardExporter) connectForward() {
if err := f.client.Connect(); err != nil {
f.settings.Logger.Error(fmt.Sprintf("Failed to connect to the endpoint %s", f.config.Endpoint))
f.settings.Logger.Error(fmt.Sprintf("Failed to connect to the endpoint %s", f.config.Endpoint.TCPAddr))
return
}
f.settings.Logger.Info(fmt.Sprintf("Successfull connection to the endpoint %s", f.config.Endpoint))
f.settings.Logger.Info(fmt.Sprintf("Successfull connection to the endpoint %s", f.config.Endpoint.TCPAddr))

if f.config.SharedKey != "" {
if err := f.client.Handshake(); err != nil {
f.settings.Logger.Error(fmt.Sprintf("Failed shared key handshake with the endpoint %s", f.config.Endpoint))
f.settings.Logger.Error(fmt.Sprintf("Failed shared key handshake with the endpoint %s", f.config.Endpoint.TCPAddr))
return
}
f.settings.Logger.Info("Successfull shared key handshake with the endpoint")
Expand Down Expand Up @@ -182,7 +182,7 @@ func (f *fluentforwardExporter) send(sendMethod sendFunc, entries []protocol.Ent
if errr := f.client.Disconnect(); errr != nil {
return errr
}
f.settings.Logger.Warn(fmt.Sprintf("Failed to send data to the endpoint %s, trying to reconnect", f.config.Endpoint))
f.settings.Logger.Warn(fmt.Sprintf("Failed to send data to the endpoint %s, trying to reconnect", f.config.Endpoint.TCPAddr))
f.connectForward()
err = sendMethod(f.config.Tag, entries)
if err != nil {
Expand Down
13 changes: 10 additions & 3 deletions exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
func TestNewExporter(t *testing.T) {
config := &Config{
TCPClientSettings: TCPClientSettings{
Endpoint: validEndpoint,
Endpoint: Endpoint{
TCPAddr: validEndpoint,
},
ConnectionTimeout: time.Second * 30,
},
}
Expand All @@ -37,7 +39,9 @@ func TestNewExporter(t *testing.T) {
func TestStart(t *testing.T) {
config := &Config{
TCPClientSettings: TCPClientSettings{
Endpoint: validEndpoint,
Endpoint: Endpoint{
TCPAddr: validEndpoint,
},
ConnectionTimeout: time.Second * 30,
},
}
Expand All @@ -62,7 +66,10 @@ func TestStartInvalidEndpointErrorLog(t *testing.T) {

config := &Config{
TCPClientSettings: TCPClientSettings{
Endpoint: "invalidEndpoint",
Endpoint: Endpoint{
TCPAddr: "invalidEndpoint",
ValidateTCPResolution: true,
},
ConnectionTimeout: time.Second * 30,
},
}
Expand Down
5 changes: 4 additions & 1 deletion factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ func NewFactory() exporter.Factory {
func createDefaultConfig() component.Config {
return &Config{
TCPClientSettings: TCPClientSettings{
Endpoint: "localhost:24224",
Endpoint: Endpoint{
TCPAddr: "localhost:24224",
ValidateTCPResolution: false,
},
ConnectionTimeout: time.Second * 30,
ClientConfig: configtls.ClientConfig{
Insecure: true,
Expand Down
17 changes: 13 additions & 4 deletions factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ func TestNewExporterMinimalConfig(t *testing.T) {
t.Run("with valid config", func(t *testing.T) {
config := &Config{
TCPClientSettings: TCPClientSettings{
Endpoint: validEndpoint,
Endpoint: Endpoint{
TCPAddr: validEndpoint,
},
ConnectionTimeout: time.Second * 30,
},
}
Expand All @@ -43,7 +45,10 @@ func TestNewExporterFullConfig(t *testing.T) {
t.Run("with valid config", func(t *testing.T) {
config := &Config{
TCPClientSettings: TCPClientSettings{
Endpoint: validEndpoint,
Endpoint: Endpoint{
TCPAddr: validEndpoint,
ValidateTCPResolution: true,
},
ConnectionTimeout: time.Second * 30,
ClientConfig: configtls.ClientConfig{
Insecure: true,
Expand Down Expand Up @@ -82,7 +87,9 @@ func TestNewExporterFullConfig(t *testing.T) {
func TestStartAlwaysReturnsNil(t *testing.T) {
config := &Config{
TCPClientSettings: TCPClientSettings{
Endpoint: validEndpoint,
Endpoint: Endpoint{
TCPAddr: validEndpoint,
},
ConnectionTimeout: time.Second * 30,
},
}
Expand All @@ -94,7 +101,9 @@ func TestStartAlwaysReturnsNil(t *testing.T) {
func TestStopAlwaysReturnsNil(t *testing.T) {
config := &Config{
TCPClientSettings: TCPClientSettings{
Endpoint: validEndpoint,
Endpoint: Endpoint{
TCPAddr: validEndpoint,
},
ConnectionTimeout: time.Second * 30,
},
}
Expand Down
51 changes: 27 additions & 24 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,55 +7,58 @@ toolchain go1.23.2
require (
github.com/IBM/fluent-forward-go v0.2.2
github.com/cenkalti/backoff/v4 v4.3.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.112.0
go.opentelemetry.io/collector/config/configretry v1.18.0
go.opentelemetry.io/collector/config/configtls v1.18.0
go.opentelemetry.io/collector/confmap v1.18.0
go.opentelemetry.io/collector/exporter v0.112.0
go.opentelemetry.io/collector/pdata v1.18.0
go.opentelemetry.io/otel/metric v1.31.0
go.opentelemetry.io/otel/trace v1.31.0
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/component v0.115.0
go.opentelemetry.io/collector/component/componenttest v0.115.0
go.opentelemetry.io/collector/config/configretry v1.21.0
go.opentelemetry.io/collector/config/configtls v1.21.0
go.opentelemetry.io/collector/confmap v1.21.0
go.opentelemetry.io/collector/exporter v0.115.0
go.opentelemetry.io/collector/pdata v1.21.0
go.opentelemetry.io/otel/metric v1.32.0
go.opentelemetry.io/otel/trace v1.32.0
go.uber.org/zap v1.27.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/fsnotify/fsnotify v1.8.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
github.com/knadh/koanf/v2 v2.1.1 // indirect
github.com/knadh/koanf/v2 v2.1.2 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/philhofer/fwd v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/tinylib/msgp v1.1.6 // indirect
go.opentelemetry.io/collector/config/configopaque v1.18.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.112.0 // indirect
go.opentelemetry.io/collector/consumer v0.112.0 // indirect
go.opentelemetry.io/collector/consumer/consumererror v0.112.0 // indirect
go.opentelemetry.io/collector/extension v0.112.0 // indirect
go.opentelemetry.io/collector/extension/experimental/storage v0.112.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.112.0 // indirect
go.opentelemetry.io/collector/pipeline v0.112.0 // indirect
go.opentelemetry.io/otel v1.31.0 // indirect
go.opentelemetry.io/otel/sdk v1.31.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.31.0 // indirect
go.opentelemetry.io/collector/config/configopaque v1.21.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.115.0 // indirect
go.opentelemetry.io/collector/consumer v1.21.0 // indirect
go.opentelemetry.io/collector/consumer/consumererror v0.115.0 // indirect
go.opentelemetry.io/collector/extension v0.115.0 // indirect
go.opentelemetry.io/collector/extension/experimental/storage v0.115.0 // indirect
go.opentelemetry.io/collector/featuregate v1.21.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.115.0 // indirect
go.opentelemetry.io/collector/pipeline v0.115.0 // indirect
go.opentelemetry.io/otel v1.32.0 // indirect
go.opentelemetry.io/otel/sdk v1.32.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.32.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/sys v0.27.0 // indirect
golang.org/x/text v0.17.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect
google.golang.org/grpc v1.67.1 // indirect
google.golang.org/protobuf v1.35.1 // indirect
google.golang.org/protobuf v1.35.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit f1b7283

Please sign in to comment.