Skip to content

Commit

Permalink
Merge pull request #79 from metrico/pre-release
Browse files Browse the repository at this point in the history
Pre release
  • Loading branch information
akvlad authored Mar 28, 2024
2 parents a7399e2 + 4f1f658 commit 64afdad
Show file tree
Hide file tree
Showing 16 changed files with 1,324 additions and 1,204 deletions.
19 changes: 5 additions & 14 deletions cmd/otel-collector/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opencensusexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/parquetexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/zipkinexporter"
Expand All @@ -27,7 +26,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/zipkinencodingextension"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/headerssetterextension"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/httpforwarder"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/httpforwarderextension"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2clientauthextension"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/dockerobserver"
Expand Down Expand Up @@ -59,8 +58,6 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/routingprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/servicegraphprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanmetricsprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor"
Expand Down Expand Up @@ -88,7 +85,6 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/expvarreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filelogreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filereceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filestatsreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/flinkmetricsreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/fluentforwardreceiver"
Expand Down Expand Up @@ -154,11 +150,10 @@ import (
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/connector/forwardconnector"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/loggingexporter"
"go.opentelemetry.io/collector/exporter/debugexporter"
"go.opentelemetry.io/collector/exporter/otlpexporter"
"go.opentelemetry.io/collector/exporter/otlphttpexporter"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/extension/ballastextension"
"go.opentelemetry.io/collector/extension/zpagesextension"
"go.opentelemetry.io/collector/otelcol"
"go.opentelemetry.io/collector/processor"
Expand Down Expand Up @@ -187,7 +182,7 @@ func components() (otelcol.Factories, error) {
bearertokenauthextension.NewFactory(),
headerssetterextension.NewFactory(),
healthcheckextension.NewFactory(),
httpforwarder.NewFactory(),
httpforwarderextension.NewFactory(),
jaegerremotesampling.NewFactory(),
oauth2clientauthextension.NewFactory(),
ecsobserver.NewFactory(),
Expand Down Expand Up @@ -243,7 +238,6 @@ func components() (otelcol.Factories, error) {
expvarreceiver.NewFactory(),
filelogreceiver.NewFactory(),
filestatsreceiver.NewFactory(),
filereceiver.NewFactory(),
flinkmetricsreceiver.NewFactory(),
fluentforwardreceiver.NewFactory(),
googlecloudpubsubreceiver.NewFactory(),
Expand Down Expand Up @@ -323,7 +317,6 @@ func components() (otelcol.Factories, error) {
kafkaexporter.NewFactory(),
loadbalancingexporter.NewFactory(),
opencensusexporter.NewFactory(),
parquetexporter.NewFactory(),
prometheusexporter.NewFactory(),
prometheusremotewriteexporter.NewFactory(),
zipkinexporter.NewFactory(),
Expand Down Expand Up @@ -353,8 +346,6 @@ func components() (otelcol.Factories, error) {
resourceprocessor.NewFactory(),
routingprocessor.NewFactory(),
schemaprocessor.NewFactory(),
servicegraphprocessor.NewFactory(),
spanmetricsprocessor.NewFactory(),
spanprocessor.NewFactory(),
tailsamplingprocessor.NewFactory(),
transformprocessor.NewFactory(),
Expand All @@ -372,6 +363,7 @@ func components() (otelcol.Factories, error) {
datadogconnector.NewFactory(),
exceptionsconnector.NewFactory(),
routingconnector.NewFactory(),
// it is previously in processor now it is in connector
servicegraphconnector.NewFactory(),
spanmetricsconnector.NewFactory(),
}
Expand All @@ -394,7 +386,6 @@ func CoreComponents() (

extensions, err := extension.MakeFactoryMap(
zpagesextension.NewFactory(),
ballastextension.NewFactory(),
)
errs = multierr.Append(errs, err)

Expand All @@ -404,7 +395,7 @@ func CoreComponents() (
errs = multierr.Append(errs, err)

exporters, err := exporter.MakeFactoryMap(
loggingexporter.NewFactory(),
debugexporter.NewFactory(),
otlpexporter.NewFactory(),
otlphttpexporter.NewFactory(),
)
Expand Down
15 changes: 1 addition & 14 deletions exporter/clickhouseprofileexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,17 @@ import (
type Config struct {
exporterhelper.TimeoutSettings `mapstructure:",squash"`
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
QueueSettings `mapstructure:"sending_queue"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`

// DSN is the ClickHouse server Data Source Name.
// For tcp protocol reference: [ClickHouse/clickhouse-go#dsn](https://github.com/ClickHouse/clickhouse-go#dsn).
// For http protocol reference: [mailru/go-clickhouse/#dsn](https://github.com/mailru/go-clickhouse/#dsn).
Dsn string `mapstructure:"dsn"`
}

type QueueSettings struct {
// Length of the sending queue
QueueSize int `mapstructure:"queue_size"`
}

var _ component.Config = (*Config)(nil)

// Checks that the receiver configuration is valid
func (cfg *Config) Validate() error {
return nil
}

func (cfg *Config) enforceQueueSettings() exporterhelper.QueueSettings {
return exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 1,
QueueSize: cfg.QueueSettings.QueueSize,
}
}
6 changes: 3 additions & 3 deletions exporter/clickhouseprofileexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
func createDefaultConfig() component.Config {
return &Config{
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
QueueSettings: QueueSettings{QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize},
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
BackOffConfig: configretry.NewDefaultBackOffConfig(),
Dsn: defaultDsn,
}
Expand All @@ -37,7 +37,7 @@ func createLogsExporter(ctx context.Context, set exporter.CreateSettings, cfg co
cfg,
exp.send,
exporterhelper.WithShutdown(exp.Shutdown),
exporterhelper.WithQueue(c.enforceQueueSettings()),
exporterhelper.WithQueue(c.QueueSettings),
exporterhelper.WithTimeout(c.TimeoutSettings),
exporterhelper.WithRetry(c.BackOffConfig),
)
Expand All @@ -46,7 +46,7 @@ func createLogsExporter(ctx context.Context, set exporter.CreateSettings, cfg co
// Creates a factory for the clickhouse profile exporter.
func NewFactory() exporter.Factory {
return exporter.NewFactory(
typeStr,
component.MustNewType(typeStr),
createDefaultConfig,
exporter.WithLogs(createLogsExporter, component.StabilityLevelAlpha),
)
Expand Down
18 changes: 1 addition & 17 deletions exporter/qrynexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ const (
type Config struct {
exporterhelper.TimeoutSettings `mapstructure:",squash"`
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
// QueueSettings is a subset of exporterhelper.QueueSettings,
// because only QueueSize is user-settable.
QueueSettings QueueSettings `mapstructure:"sending_queue"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`

ClusteredClickhouse bool `mapstructure:"clustered_clickhouse"`

Expand All @@ -45,12 +43,6 @@ type Config struct {
Metrics MetricsConfig `mapstructure:"metrics"`
}

// QueueSettings is a subset of exporterhelper.QueueSettings.
type QueueSettings struct {
// QueueSize set the length of the sending queue
QueueSize int `mapstructure:"queue_size"`
}

// LogsConfig holds the configuration for log data.
type LogsConfig struct {
// AttributeLabels is the string representing attribute labels.
Expand All @@ -74,11 +66,3 @@ var _ component.Config = (*Config)(nil)
func (cfg *Config) Validate() error {
return nil
}

func (cfg *Config) enforcedQueueSettings() exporterhelper.QueueSettings {
return exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 1,
QueueSize: cfg.QueueSettings.QueueSize,
}
}
7 changes: 5 additions & 2 deletions exporter/qrynexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@ func TestLoadConfig(t *testing.T) {
RandomizationFactor: 0.5,
Multiplier: 1.5,
},
QueueSettings: QueueSettings{
QueueSize: 100,
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
QueueSize: 100,
NumConsumers: 10,
StorageID: nil,
},
},
},
Expand Down
20 changes: 10 additions & 10 deletions exporter/qrynexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (
// NewFactory creates a factory for Logging exporter
func NewFactory() exporter.Factory {
return exporter.NewFactory(
typeStr,
component.MustNewType(typeStr),
createDefaultConfig,
exporter.WithTraces(createTracesExporter, stability),
exporter.WithLogs(createLogsExporter, stability),
Expand All @@ -42,7 +42,7 @@ func NewFactory() exporter.Factory {
func createDefaultConfig() component.Config {
return &Config{
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
QueueSettings: QueueSettings{QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize},
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
BackOffConfig: configretry.NewDefaultBackOffConfig(),
DSN: defaultDSN,
}
Expand All @@ -52,23 +52,23 @@ func createDefaultConfig() component.Config {
// Traces are directly insert into clickhouse.
func createTracesExporter(
ctx context.Context,
params exporter.CreateSettings,
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Traces, error) {
c := cfg.(*Config)
oce, err := newTracesExporter(params.Logger, c)
oce, err := newTracesExporter(set.Logger, c, &set)
if err != nil {
return nil, fmt.Errorf("cannot configure qryn traces exporter: %w", err)
}

return exporterhelper.NewTracesExporter(
ctx,
params,
set,
cfg,
oce.pushTraceData,
exporterhelper.WithShutdown(oce.Shutdown),
exporterhelper.WithTimeout(c.TimeoutSettings),
exporterhelper.WithQueue(c.enforcedQueueSettings()),
exporterhelper.WithQueue(c.QueueSettings),
exporterhelper.WithRetry(c.BackOffConfig),
)
}
Expand All @@ -81,7 +81,7 @@ func createLogsExporter(
cfg component.Config,
) (exporter.Logs, error) {
c := cfg.(*Config)
exporter, err := newLogsExporter(set.Logger, c)
exporter, err := newLogsExporter(set.Logger, c, &set)
if err != nil {
return nil, fmt.Errorf("cannot configure qryn logs exporter: %w", err)
}
Expand All @@ -93,7 +93,7 @@ func createLogsExporter(
exporter.pushLogsData,
exporterhelper.WithShutdown(exporter.Shutdown),
exporterhelper.WithTimeout(c.TimeoutSettings),
exporterhelper.WithQueue(c.enforcedQueueSettings()),
exporterhelper.WithQueue(c.QueueSettings),
exporterhelper.WithRetry(c.BackOffConfig),
)
}
Expand All @@ -106,7 +106,7 @@ func createMetricsExporter(
cfg component.Config,
) (exporter.Metrics, error) {
c := cfg.(*Config)
exporter, err := newMetricsExporter(set.Logger, c)
exporter, err := newMetricsExporter(set.Logger, c, &set)
if err != nil {
return nil, fmt.Errorf("cannot configure qryn logs exporter: %w", err)
}
Expand All @@ -118,7 +118,7 @@ func createMetricsExporter(
exporter.pushMetricsData,
exporterhelper.WithShutdown(exporter.Shutdown),
exporterhelper.WithTimeout(c.TimeoutSettings),
exporterhelper.WithQueue(c.enforcedQueueSettings()),
exporterhelper.WithQueue(c.QueueSettings),
exporterhelper.WithRetry(c.BackOffConfig),
)
}
20 changes: 15 additions & 5 deletions exporter/qrynexporter/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/go-logfmt/logfmt"
"github.com/prometheus/common/model"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
)

Expand All @@ -30,6 +32,7 @@ const (

type logsExporter struct {
logger *zap.Logger
meter metric.Meter

db clickhouse.Conn

Expand All @@ -39,7 +42,7 @@ type logsExporter struct {
cluster bool
}

func newLogsExporter(logger *zap.Logger, cfg *Config) (*logsExporter, error) {
func newLogsExporter(logger *zap.Logger, cfg *Config, set *exporter.CreateSettings) (*logsExporter, error) {
opts, err := clickhouse.ParseDSN(cfg.DSN)
if err != nil {
return nil, err
Expand All @@ -48,14 +51,20 @@ func newLogsExporter(logger *zap.Logger, cfg *Config) (*logsExporter, error) {
if err != nil {
return nil, err
}
return &logsExporter{
exp := &logsExporter{
logger: logger,
meter: set.MeterProvider.Meter(typeStr),
db: db,
format: cfg.Logs.Format,
attributeLabels: cfg.Logs.AttributeLabels,
resourceLabels: cfg.Logs.ResourceLabels,
cluster: cfg.ClusteredClickhouse,
}, nil
}
if err := initMetrics(exp.meter); err != nil {
exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error()))
return exp, err
}
return exp, nil
}

// Shutdown will shutdown the exporter.
Expand Down Expand Up @@ -433,11 +442,12 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
}

if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil {
otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeLogs)))
e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error()))
return err
}

otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeSuccess, dataTypeLogs)))
e.logger.Debug("pushLogsData", zap.Int("samples", len(samples)), zap.Int("timeseries", len(timeSeries)), zap.String("cost", time.Since(start).String()))

return nil
}

Expand Down
Loading

0 comments on commit 64afdad

Please sign in to comment.