feat: upgrade to otel collector v0.102.1 (#132)
* feat: upgrade to otel collector v0.102.1

* make modified kafka exporter consistent with original

* update go-grpc-compression pkg to 1.2.3 to fix vulnerability

* fix even more vulnerabilities
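
The go-grpc-compression bump corresponds to a one-line requirement change in go.mod. A sketch of the resulting line — the module path is assumed to be the mostynb fork the collector ecosystem uses, and the prior version is not visible in this view:

```
// go.mod (sketch) — module path assumed, not shown in this diff
require github.com/mostynb/go-grpc-compression v1.2.3
```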
tim-mwangi authored Jun 12, 2024
1 parent 4543e39 commit ee29249
Showing 66 changed files with 2,237 additions and 1,475 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ jobs:
uses: actions/checkout@v4
- if: matrix.os == 'ubuntu-latest'
name: golangci-lint
uses: golangci/golangci-lint-action@v4
uses: golangci/golangci-lint-action@v6
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.58.1
version: v1.59.0
skip-pkg-cache: true
only-new-issues: true
- name: Run unit tests
Expand Down
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
run:
deadline: 5m
timeout: 5m

linters:
disable-all: true
Expand Down
12 changes: 5 additions & 7 deletions exporter/kafkaexporter/README.md
Original file line number Diff line number Diff line change
@@ -1,22 +1,18 @@
**IMPORTANT:** This component is copied from https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.96.0/exporter/kafkaexporter and
**IMPORTANT:** This component is copied from https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.102.0/exporter/kafkaexporter and
adapted to accept compression settings and also do span curing on large spans.
# Kafka Exporter

<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [beta]: traces, metrics, logs |
| Distributions | [core], [contrib], [aws], [observiq], [splunk], [sumo] |
| Distributions | [core], [contrib] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aexporter%2Fkafka%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aexporter%2Fkafka) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aexporter%2Fkafka%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aexporter%2Fkafka) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@pavolloffay](https://www.github.com/pavolloffay), [@MovieStoreGuy](https://www.github.com/MovieStoreGuy) |

[beta]: https://github.com/open-telemetry/opentelemetry-collector#beta
[core]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
[aws]: https://github.com/aws-observability/aws-otel-collector
[observiq]: https://github.com/observIQ/observiq-otel-collector
[splunk]: https://github.com/signalfx/splunk-otel-collector
[sumo]: https://github.com/SumoLogic/sumologic-otel-collector
<!-- end autogenerated section -->

Kafka exporter exports logs, metrics, and traces to Kafka. This exporter uses a synchronous producer
Expand All @@ -31,6 +27,7 @@ The following settings can be optionally configured:
- `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup.
- `client_id` (default = "sarama"): The client ID to configure the Sarama Kafka client with. The client ID will be used for all produce requests.
- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the kafka topic to export to.
- `topic_from_attribute` (default = ""): Specify the resource attribute whose value should be used as the message's topic. This option, when set, will take precedence over the default topic. If `topic_from_attribute` is not set, the message's topic will be set to the value of the configuration option `topic` instead.
- `encoding` (default = otlp_proto): The encoding of the traces sent to kafka. All available encodings:
- `otlp_proto`: payload is Protobuf serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
- `otlp_json`: payload is JSON serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
Expand All @@ -42,6 +39,7 @@ The following settings can be optionally configured:
- The following encodings are valid *only* for **logs**.
- `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded.
- `partition_traces_by_id` (default = false): configures the exporter to include the trace ID as the message key in trace messages sent to kafka. *Please note:* this setting does not have any effect on Jaeger encoding exporters since Jaeger exporters include trace ID as the message key by default.
- `partition_metrics_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka.
- `auth`
- `plain_text`
- `username`: The username to use.
Expand All @@ -53,7 +51,7 @@ The following settings can be optionally configured:
- `version` (default = 0): The SASL protocol version to use (0 or 1)
- `aws_msk.region`: AWS Region in case of AWS_MSK_IAM mechanism
- `aws_msk.broker_addr`: MSK Broker address in case of AWS_MSK_IAM mechanism
- `tls`
- `tls`: see [TLS Configuration Settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md) for the full set of available options.
- `ca_file`: path to the CA cert. For a client this verifies the server certificate. Should
only be used if `insecure` is set to false.
- `cert_file`: path to the TLS cert to use for TLS required connections. Should
Expand Down
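
For orientation, an illustrative (not exhaustive) exporter configuration exercising the two options added in this upgrade. The broker address and the `tenant.id` attribute are placeholders, not values from this repo:

```yaml
exporters:
  kafka:
    brokers:
      - localhost:9092              # placeholder broker address
    protocol_version: 2.0.0         # Kafka protocol version to assume
    topic: otlp_spans               # static fallback topic
    topic_from_attribute: tenant.id # new: take the topic from this resource attribute
    partition_metrics_by_resource_attributes: true  # new: key metrics by hashed resource attributes
    encoding: otlp_proto
```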
5 changes: 5 additions & 0 deletions exporter/kafkaexporter/config.go_original
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type Config struct {
// The name of the kafka topic to export to (default otlp_spans for traces, otlp_metrics for metrics)
Topic string `mapstructure:"topic"`

// TopicFromAttribute is the name of the attribute to use as the topic name.
TopicFromAttribute string `mapstructure:"topic_from_attribute"`

// Encoding of messages (default "otlp_proto")
Encoding string `mapstructure:"encoding"`

Expand All @@ -48,6 +51,8 @@ type Config struct {
// trace ID as the message key by default.
PartitionTracesByID bool `mapstructure:"partition_traces_by_id"`

PartitionMetricsByResourceAttributes bool `mapstructure:"partition_metrics_by_resource_attributes"`

// Metadata is the namespace for metadata management properties used by the
// Client, and shared by the Producer/Consumer.
Metadata Metadata `mapstructure:"metadata"`
Expand Down
5 changes: 5 additions & 0 deletions exporter/kafkaexporter/config.modified.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type Config struct {
// The name of the kafka topic to export to (default otlp_spans for traces, otlp_metrics for metrics)
Topic string `mapstructure:"topic"`

// TopicFromAttribute is the name of the attribute to use as the topic name.
TopicFromAttribute string `mapstructure:"topic_from_attribute"`

// Encoding of messages (default "otlp_proto")
Encoding string `mapstructure:"encoding"`

Expand All @@ -48,6 +51,8 @@ type Config struct {
// trace ID as the message key by default.
PartitionTracesByID bool `mapstructure:"partition_traces_by_id"`

PartitionMetricsByResourceAttributes bool `mapstructure:"partition_metrics_by_resource_attributes"`

// Metadata is the namespace for metadata management properties used by the
// Client, and shared by the Producer/Consumer.
Metadata Metadata `mapstructure:"metadata"`
Expand Down
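
Both copies of the config gain the same two mapstructure-tagged fields. As a rough illustration of what `topic_from_attribute` implies at send time — the helper and the `kafka.topic` attribute below are hypothetical, not this repo's code:

```go
package main

import "fmt"

// topicConfig is a minimal stand-in for the two exporter config fields involved.
type topicConfig struct {
	Topic              string
	TopicFromAttribute string
}

// resolveTopic prefers the resource attribute named by TopicFromAttribute and
// falls back to the static Topic, matching the precedence the README describes.
func resolveTopic(cfg topicConfig, resourceAttrs map[string]string) string {
	if cfg.TopicFromAttribute != "" {
		if v, ok := resourceAttrs[cfg.TopicFromAttribute]; ok && v != "" {
			return v
		}
	}
	return cfg.Topic
}

func main() {
	cfg := topicConfig{Topic: "otlp_spans", TopicFromAttribute: "kafka.topic"}
	fmt.Println(resolveTopic(cfg, map[string]string{"kafka.topic": "tenant-a-spans"})) // tenant-a-spans
	fmt.Println(resolveTopic(cfg, nil))                                                // otlp_spans
}
```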
25 changes: 14 additions & 11 deletions exporter/kafkaexporter/config_modified_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestLoadConfig(t *testing.T) {
}{
{
id: component.NewIDWithName(metadata.Type, ""),
option: func(conf *Config) {
option: func(_ *Config) {
// intentionally left blank so we use default config
},
expected: &Config{
Expand All @@ -55,11 +55,12 @@ func TestLoadConfig(t *testing.T) {
NumConsumers: 2,
QueueSize: 10,
},
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
PartitionMetricsByResourceAttributes: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Authentication: kafka.Authentication{
PlainText: &kafka.PlainTextConfig{
Username: "jdoe",
Expand Down Expand Up @@ -119,11 +120,12 @@ func TestLoadConfig(t *testing.T) {
NumConsumers: 2,
QueueSize: 10,
},
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
PartitionMetricsByResourceAttributes: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Authentication: kafka.Authentication{
PlainText: &kafka.PlainTextConfig{
Username: "jdoe",
Expand Down Expand Up @@ -185,6 +187,7 @@ func TestLoadConfig(t *testing.T) {
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
PartitionMetricsByResourceAttributes: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
ResolveCanonicalBootstrapServersOnly: true,
Expand Down
25 changes: 14 additions & 11 deletions exporter/kafkaexporter/config_test.go_original
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestLoadConfig(t *testing.T) {
}{
{
id: component.NewIDWithName(metadata.Type, ""),
option: func(conf *Config) {
option: func(_ *Config) {
// intentionally left blank so we use default config
},
expected: &Config{
Expand All @@ -55,11 +55,12 @@ func TestLoadConfig(t *testing.T) {
NumConsumers: 2,
QueueSize: 10,
},
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
PartitionMetricsByResourceAttributes: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Authentication: kafka.Authentication{
PlainText: &kafka.PlainTextConfig{
Username: "jdoe",
Expand Down Expand Up @@ -109,11 +110,12 @@ func TestLoadConfig(t *testing.T) {
NumConsumers: 2,
QueueSize: 10,
},
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
PartitionMetricsByResourceAttributes: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Authentication: kafka.Authentication{
PlainText: &kafka.PlainTextConfig{
Username: "jdoe",
Expand Down Expand Up @@ -165,6 +167,7 @@ func TestLoadConfig(t *testing.T) {
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
PartitionMetricsByResourceAttributes: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
ResolveCanonicalBootstrapServersOnly: true,
Expand Down
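
Both test fixtures now set `PartitionMetricsByResourceAttributes: true`. Per the README, this keys each metrics message by a hash of the sorted resource attributes, so all metrics from one resource land on one partition. A standalone sketch of that idea — illustrative only, not this repo's actual implementation:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// partitionKey hashes resource attributes in sorted-key order so that equal
// attribute sets always produce the same message key (and thus the same partition).
func partitionKey(attrs map[string]string) []byte {
	keys := make([]string, 0, len(attrs))
	for k := range attrs {
		keys = append(keys, k)
	}
	sort.Strings(keys) // sorting makes the hash independent of map iteration order
	h := fnv.New64a()
	for _, k := range keys {
		h.Write([]byte(k))
		h.Write([]byte(attrs[k]))
	}
	return h.Sum(nil)
}

func main() {
	a := map[string]string{"service.name": "checkout", "host.name": "n1"}
	b := map[string]string{"host.name": "n1", "service.name": "checkout"}
	fmt.Printf("%x\n%x\n", partitionKey(a), partitionKey(b)) // identical keys
}
```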
10 changes: 8 additions & 2 deletions exporter/kafkaexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ const (
defaultCompression = "none"
// default from sarama.NewConfig()
defaultFluxMaxMessages = 0
// partitioning metrics by resource attributes is disabled by default
defaultPartitionMetricsByResourceAttributesEnabled = false
)

// FactoryOption applies changes to kafkaExporterFactory.
Expand Down Expand Up @@ -97,8 +99,9 @@ func createDefaultConfig() component.Config {
Brokers: []string{defaultBroker},
ClientID: defaultClientID,
// using an empty topic to track when it has not been set by user, default is based on traces or metrics.
Topic: "",
Encoding: defaultEncoding,
Topic: "",
Encoding: defaultEncoding,
PartitionMetricsByResourceAttributes: defaultPartitionMetricsByResourceAttributesEnabled,
Metadata: Metadata{
Full: defaultMetadataFull,
Retry: MetadataRetry{
Expand Down Expand Up @@ -148,6 +151,7 @@ func (f *kafkaExporterFactory) createTracesExporter(
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(oCfg.BackOffConfig),
exporterhelper.WithQueue(oCfg.QueueSettings),
exporterhelper.WithStart(exp.start),
exporterhelper.WithShutdown(exp.Close))
}

Expand Down Expand Up @@ -178,6 +182,7 @@ func (f *kafkaExporterFactory) createMetricsExporter(
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(oCfg.BackOffConfig),
exporterhelper.WithQueue(oCfg.QueueSettings),
exporterhelper.WithStart(exp.start),
exporterhelper.WithShutdown(exp.Close))
}

Expand Down Expand Up @@ -208,5 +213,6 @@ func (f *kafkaExporterFactory) createLogsExporter(
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(oCfg.BackOffConfig),
exporterhelper.WithQueue(oCfg.QueueSettings),
exporterhelper.WithStart(exp.start),
exporterhelper.WithShutdown(exp.Close))
}
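
The three `exporterhelper.WithStart(exp.start)` registrations mirror an upstream change: the Kafka producer is now created when the collector starts, not when the factory builds the exporter, so broker and configuration failures surface from `Start`. A self-contained sketch of that lifecycle pattern — the types and the dial function are stand-ins, not this repo's code:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// fakeProducer stands in for a sarama.SyncProducer.
type fakeProducer struct{ brokers []string }

// dialProducer stands in for producer construction, the step that can fail.
func dialProducer(brokers []string) (*fakeProducer, error) {
	if len(brokers) == 0 {
		return nil, errors.New("kafka: no brokers configured")
	}
	return &fakeProducer{brokers: brokers}, nil
}

// kafkaExporter defers all fallible work out of the constructor and into start,
// the hook that exporterhelper.WithStart wires into the component lifecycle.
type kafkaExporter struct {
	brokers  []string
	producer *fakeProducer
}

func (e *kafkaExporter) start(_ context.Context) error {
	p, err := dialProducer(e.brokers) // errors now surface at collector startup
	if err != nil {
		return err
	}
	e.producer = p
	return nil
}

func main() {
	exp := &kafkaExporter{}                      // construction always succeeds
	fmt.Println(exp.start(context.Background())) // kafka: no brokers configured
}
```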
13 changes: 10 additions & 3 deletions exporter/kafkaexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/IBM/sarama"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/plog"
Expand Down Expand Up @@ -124,9 +125,11 @@ func TestCreateMetricExporter(t *testing.T) {
exportertest.NewNopCreateSettings(),
tc.conf,
)
require.NoError(t, err)
assert.NotNil(t, exporter, "Must return valid exporter")
err = exporter.Start(context.Background(), componenttest.NewNopHost())
if tc.err != nil {
assert.ErrorAs(t, err, &tc.err, "Must match the expected error")
assert.Nil(t, exporter, "Must return nil value for invalid exporter")
return
}
assert.NoError(t, err, "Must not error")
Expand Down Expand Up @@ -199,9 +202,11 @@ func TestCreateLogExporter(t *testing.T) {
exportertest.NewNopCreateSettings(),
tc.conf,
)
require.NoError(t, err)
assert.NotNil(t, exporter, "Must return valid exporter")
err = exporter.Start(context.Background(), componenttest.NewNopHost())
if tc.err != nil {
assert.ErrorAs(t, err, &tc.err, "Must match the expected error")
assert.Nil(t, exporter, "Must return nil value for invalid exporter")
return
}
assert.NoError(t, err, "Must not error")
Expand Down Expand Up @@ -274,9 +279,11 @@ func TestCreateTraceExporter(t *testing.T) {
exportertest.NewNopCreateSettings(),
tc.conf,
)
require.NoError(t, err)
assert.NotNil(t, exporter, "Must return valid exporter")
err = exporter.Start(context.Background(), componenttest.NewNopHost())
if tc.err != nil {
assert.ErrorAs(t, err, &tc.err, "Must match the expected error")
assert.Nil(t, exporter, "Must return nil value for invalid exporter")
return
}
assert.NoError(t, err, "Must not error")
Expand Down
… (diff truncated; the remaining changed files are not shown in this view)
