Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: unite divergent changes #6

Merged
merged 4 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.20'
go-version: '1.22'

- name: Build
run: go build -v ./...
Expand Down
40 changes: 35 additions & 5 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ import (
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// TCPClientSettings defines common settings for a TCP client.
type TCPClientSettings struct {
// The target endpoint URI to send data to (e.g.: some.url:24224).
Endpoint string `mapstructure:"endpoint"`

// Connection Timeout parameter configures `net.Dialer`.
ConnectionTimeout time.Duration `mapstructure:"connection_timeout"`

// TLSSetting struct exposes TLS client configuration.
TLSSetting configtls.ClientConfig `mapstructure:"tls"`
// ClientConfig struct exposes TLS client configuration.
ClientConfig configtls.ClientConfig `mapstructure:"tls"`

// SharedKey is used for authorization with the server that knows it.
SharedKey string `mapstructure:"shared_key"`
Expand All @@ -44,14 +45,43 @@ type Config struct {
// DefaultLabelsEnabled is a map of default attributes to be added to each log record.
DefaultLabelsEnabled map[string]bool `mapstructure:"default_labels_enabled"`

exporterhelper.QueueSettings `mapstructure:"sending_queue"`
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
// KubernetesMetadata includes kubernetes metadata as a nested object.
// It leverages resources attributes provided by k8sattributesprocessor
//
// Configuration example
// ```
// kubernetes_metadata:
// key: kubernetes
// include_pod_labels: true
// ```
//
// Resulting record structure:
// ```
// kubernetes:
// namespace_name: default
// container_name: nginx
// pod_name: nginx-59f678c4b-p6lk6
// labels:
// app.kubernetes.io/name: nginx
// host: gke-dev-node-pool-8-cf541dd4-98ro
// ```
//
KubernetesMetadata *KubernetesMetadata `mapstructure:"kubernetes_metadata,omitempty"`

exporterhelper.QueueConfig `mapstructure:"sending_queue"`
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
}

type KubernetesMetadata struct {
Key string `mapstructure:"key"`
IncludePodLabels bool `mapstructure:"include_pod_labels"`
}

var _ component.Config = (*Config)(nil)

// Validate checks if the configuration is valid
func (config *Config) Validate() error {
if err := config.QueueSettings.Validate(); err != nil {
if err := config.QueueConfig.Validate(); err != nil {
return fmt.Errorf("queue settings has invalid configuration: %w", err)
}

Expand Down
42 changes: 21 additions & 21 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,38 +37,38 @@ func TestLoadConfigNewExporter(t *testing.T) {
TCPClientSettings: TCPClientSettings{
Endpoint: validEndpoint,
ConnectionTimeout: time.Second * 30,
TLSSetting: configtls.ClientConfig{
Insecure: false,
InsecureSkipVerify: true,
ClientConfig: configtls.ClientConfig{
Insecure: true,
InsecureSkipVerify: false,
Config: configtls.Config{
CAFile: "ca.crt",
CertFile: "client.crt",
KeyFile: "client.key",
CAFile: "",
CertFile: "",
KeyFile: "",
},
},
SharedKey: "otelcol-dev",
SharedKey: "",
},
RequireAck: true,
Tag: "nginx",
CompressGzip: true,
RequireAck: false,
Tag: "tag",
CompressGzip: false,
DefaultLabelsEnabled: map[string]bool{
"time": true,
"exporter": false,
"job": false,
"instance": false,
"exporter": true,
"job": true,
"instance": true,
},
BackOffConfig: configretry.BackOffConfig{
Enabled: true,
InitialInterval: 10 * time.Second,
MaxInterval: 1 * time.Minute,
MaxElapsedTime: 10 * time.Minute,
InitialInterval: 5 * time.Second,
MaxInterval: 30 * time.Second,
MaxElapsedTime: 5 * time.Minute,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
},
QueueSettings: exporterhelper.QueueSettings{
QueueConfig: exporterhelper.QueueConfig{
Enabled: true,
NumConsumers: 2,
QueueSize: 10,
NumConsumers: 10,
QueueSize: 1000,
},
},
},
Expand All @@ -81,7 +81,7 @@ func TestLoadConfigNewExporter(t *testing.T) {

sub, err := cm.Sub(tt.id.String())
require.NoError(t, err)
require.NoError(t, component.UnmarshalConfig(sub, cfg))
require.NoError(t, sub.Unmarshal(cfg))

assert.NoError(t, component.ValidateConfig(cfg))
assert.Equal(t, tt.expected, cfg)
Expand All @@ -97,7 +97,7 @@ func TestConfigValidate(t *testing.T) {
}{
{
desc: "QueueSettings are invalid",
cfg: &Config{QueueSettings: exporterhelper.QueueSettings{QueueSize: -1, Enabled: true}},
cfg: &Config{QueueConfig: exporterhelper.QueueConfig{QueueSize: -1, Enabled: true}},
err: fmt.Errorf("queue settings has invalid configuration"),
},
{
Expand Down
57 changes: 53 additions & 4 deletions exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ package fluentforwardexporter // import "github.com/r0mdau/fluentforwardexporter
import (
"context"
"fmt"
"strings"
"sync"

fclient "github.com/IBM/fluent-forward-go/fluent/client"
"github.com/IBM/fluent-forward-go/fluent/protocol"
fproto "github.com/IBM/fluent-forward-go/fluent/protocol"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
)

Expand All @@ -31,12 +33,12 @@ func newExporter(config *Config, settings component.TelemetrySettings) *fluentfo
}
}

func (f *fluentforwardExporter) start(_ context.Context, host component.Host) error {
func (f *fluentforwardExporter) start(ctx context.Context, host component.Host) error {
connOptions := fclient.ConnectionOptions{
RequireAck: f.config.RequireAck,
}

tlsConfig, err := f.config.TLSSetting.LoadTLSConfig()
tlsConfig, err := f.config.ClientConfig.LoadTLSConfig(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -96,7 +98,7 @@ func (f *fluentforwardExporter) pushLogData(ctx context.Context, ld plog.Logs) e
log := logs.At(k)
entry := fproto.EntryExt{
Timestamp: fproto.EventTimeNow(),
Record: f.convertLogToMap(log),
Record: f.convertLogToMap(log, rls.At(i)),
}
entries = append(entries, entry)
}
Expand All @@ -109,7 +111,7 @@ func (f *fluentforwardExporter) pushLogData(ctx context.Context, ld plog.Logs) e
return f.sendForward(entries)
}

func (f *fluentforwardExporter) convertLogToMap(lr plog.LogRecord) map[string]interface{} {
func (f *fluentforwardExporter) convertLogToMap(lr plog.LogRecord, res plog.ResourceLogs) map[string]interface{} {
// move function into a translator
m := make(map[string]interface{})
m["severity"] = lr.SeverityText()
Expand All @@ -122,6 +124,53 @@ func (f *fluentforwardExporter) convertLogToMap(lr plog.LogRecord) map[string]in
}
}
}
if f.config.KubernetesMetadata != nil {
key := f.config.KubernetesMetadata.Key
if f.config.KubernetesMetadata.Key == "" {
key = "kubernetes"
}
var namespace, container, pod, node string
var labels map[string]string
res.Resource().Attributes().Range(func(k string, v pcommon.Value) bool {
if k == "k8s.namespace.name" {
namespace = v.AsString()
return true
}
if k == "k8s.container.name" {
container = v.AsString()
}
if k == "k8s.pod.name" {
pod = v.AsString()
}
if k == "k8s.node.name" {
node = v.AsString()
}
if f.config.KubernetesMetadata.IncludePodLabels && strings.HasPrefix(k, "k8s.pod.labels.") {
if labels == nil {
labels = make(map[string]string)
}
labelKey := strings.TrimPrefix(k, "k8s.pod.labels.")
labels[labelKey] = v.AsString()
}
return true
})

k8sMetadata := map[string]interface{}{
"namespace_name": namespace,
"container_name": container,
"pod_name": pod,
"host": node,
}

if f.config.KubernetesMetadata.IncludePodLabels {
k8sMetadata["labels"] = labels
}

m[key] = k8sMetadata
}

f.settings.Logger.Debug(fmt.Sprintf("message %+v", m))

return m
}

Expand Down
1 change: 1 addition & 0 deletions exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

package fluentforwardexporter // import "github.com/r0mdau/fluentforwardexporter"

import (
"context"
"testing"
Expand Down
10 changes: 5 additions & 5 deletions factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func createDefaultConfig() component.Config {
TCPClientSettings: TCPClientSettings{
Endpoint: "localhost:24224",
ConnectionTimeout: time.Second * 30,
TLSSetting: configtls.ClientConfig{
ClientConfig: configtls.ClientConfig{
Insecure: true,
InsecureSkipVerify: false,
Config: configtls.Config{
Expand All @@ -54,11 +54,11 @@ func createDefaultConfig() component.Config {
"instance": true,
},
BackOffConfig: configretry.NewDefaultBackOffConfig(),
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
QueueConfig: exporterhelper.NewDefaultQueueConfig(),
}
}

func createLogsExporter(ctx context.Context, set exporter.CreateSettings, config component.Config) (exporter.Logs, error) {
func createLogsExporter(ctx context.Context, set exporter.Settings, config component.Config) (exporter.Logs, error) {
exporterConfig := config.(*Config)
exp := newExporter(exporterConfig, set.TelemetrySettings)

Expand All @@ -68,9 +68,9 @@ func createLogsExporter(ctx context.Context, set exporter.CreateSettings, config
config,
exp.pushLogData,
// explicitly disable since we rely on net.Dialer timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}),
exporterhelper.WithRetry(exporterConfig.BackOffConfig),
exporterhelper.WithQueue(exporterConfig.QueueSettings),
exporterhelper.WithQueue(exporterConfig.QueueConfig),
exporterhelper.WithStart(exp.start),
exporterhelper.WithShutdown(exp.stop),
)
Expand Down
4 changes: 2 additions & 2 deletions factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestNewExporterFullConfig(t *testing.T) {
TCPClientSettings: TCPClientSettings{
Endpoint: validEndpoint,
ConnectionTimeout: time.Second * 30,
TLSSetting: configtls.TLSClientSetting{
ClientConfig: configtls.ClientConfig{
Insecure: true,
InsecureSkipVerify: false,
},
Expand All @@ -68,7 +68,7 @@ func TestNewExporterFullConfig(t *testing.T) {
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
},
QueueSettings: exporterhelper.QueueSettings{
QueueConfig: exporterhelper.QueueConfig{
Enabled: true,
NumConsumers: 2,
QueueSize: 10,
Expand Down
Loading
Loading