From 37f3eb978e4ba9827a35695b95f998c4cc352fbe Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Wed, 5 Jun 2024 18:45:03 +0100 Subject: [PATCH] Suggestions from code review --- .../components/otelcol.exporter.kafka.md | 112 ++---------- .../components/otelcol.receiver.kafka.md | 94 +---------- .../otelcol-kafka-authentication-kerberos.md | 24 +++ .../otelcol-kafka-authentication-plaintext.md | 13 ++ ...elcol-kafka-authentication-sasl-aws_msk.md | 14 ++ .../otelcol-kafka-authentication-sasl.md | 26 +++ .../otelcol-kafka-authentication.md | 8 + .../otelcol-kafka-metadata-retry.md | 14 ++ .../components/otelcol-kafka-metadata.md | 22 +++ internal/component/otelcol/config_kafka.go | 155 +++++++++++++++++ .../component/otelcol/exporter/kafka/kafka.go | 15 +- .../component/otelcol/receiver/kafka/kafka.go | 159 +----------------- .../otelcolconvert/converter_kafkaexporter.go | 2 +- .../otelcolconvert/converter_kafkareceiver.go | 30 ++-- 14 files changed, 329 insertions(+), 359 deletions(-) create mode 100644 docs/sources/shared/reference/components/otelcol-kafka-authentication-kerberos.md create mode 100644 docs/sources/shared/reference/components/otelcol-kafka-authentication-plaintext.md create mode 100644 docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl-aws_msk.md create mode 100644 docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl.md create mode 100644 docs/sources/shared/reference/components/otelcol-kafka-authentication.md create mode 100644 docs/sources/shared/reference/components/otelcol-kafka-metadata-retry.md create mode 100644 docs/sources/shared/reference/components/otelcol-kafka-metadata.md create mode 100644 internal/component/otelcol/config_kafka.go diff --git a/docs/sources/reference/components/otelcol.exporter.kafka.md b/docs/sources/reference/components/otelcol.exporter.kafka.md index a13b20c1e2..56df3d7afe 100644 --- a/docs/sources/reference/components/otelcol.exporter.kafka.md +++ b/docs/sources/reference/components/otelcol.exporter.kafka.md @@ -9,6 +9,12 @@ title: otelcol.exporter.kafka `otelcol.exporter.kafka` accepts logs, metrics, and traces telemetry data from other `otelcol` components and sends it to Kafka. +It is important to use `otelcol.exporter.kafka` together with `otelcol.processor.batch` +in order to make sure `otelcol.exporter.kafka` doesn't slow down due to sending Kafka a huge number of small payloads. + +This exporter uses a synchronous producer that blocks and does not batch messages, +therefore it should be used with batch and queued retry processors for higher throughput and resiliency. + > **NOTE**: `otelcol.exporter.kafka` is a wrapper over the upstream > OpenTelemetry Collector `kafka` exporter from the `otelcol-contrib` > distribution. Bug reports or feature requests will be redirected to the @@ -33,7 +39,7 @@ Name | Type | Description ------------------------------------------ | --------------- | ----------------------------------------------------------------------------------- | -------------------- | -------- `protocol_version` | `string` | Kafka protocol version to use. | | yes `brokers` | `list(string)` | Kafka brokers to connect to. | `["localhost:9092"]` | no -`topic` | `string` | Kafka topic to read from. | _See below_ | no +`topic` | `string` | Kafka topic to send to. | _See below_ | no `topic_from_attribute` | `string` | A resource attribute whose value should be used as the message's topic. | `""` | no `encoding` | `string` | Encoding of payload read from Kafka. | `"otlp_proto"` | no `client_id` | `string` | Consumer client ID to use. The ID will be used for all produce requests. | `"sarama"` | no @@ -44,13 +50,11 @@ Name | Type | Description If `topic` is not set, different topics will be used for different telemetry signals: -* Metrics will be received from an `otlp_metrics` topic. -* Traces will be received from an `otlp_spans` topic. -* Logs will be received from an `otlp_logs` topic. +* Metrics will be sent to an `otlp_metrics` topic. +* Traces will be sent to an `otlp_spans` topic. +* Logs will be sent to an `otlp_logs` topic. -If `topic` is set to a specific value, then only the signal type that corresponds to the data stored in the topic must be set in the output block. -For example, if `topic` is set to `"my_telemetry"`, then the `"my_telemetry"` topic can only contain either metrics, logs, or traces. -If it contains only metrics, then `otelcol.exporter.kafka` should be configured to process only metrics. +If topic is set, the same topic will be used for all telemetry signals - metrics, logs, and traces. When `topic_from_attribute` is set, it will take precedence over `topic`. @@ -106,56 +110,19 @@ For example, `authentication > tls` refers to a `tls` block defined inside an `a ### authentication block -The `authentication` block holds the definition of different authentication -mechanisms to use when connecting to Kafka brokers. It doesn't support any -arguments and is configured fully through inner blocks. +{{< docs/shared lookup="reference/components/otelcol-kafka-authentication.md" source="alloy" version="" >}} ### plaintext block -The `plaintext` block configures `PLAIN` authentication against Kafka brokers. - -The following arguments are supported: - -Name | Type | Description | Default | Required ----- | ---- | ----------- | ------- | -------- -`username` | `string` | Username to use for `PLAIN` authentication. | | yes -`password` | `secret` | Password to use for `PLAIN` authentication. | | yes +{{< docs/shared lookup="reference/components/otelcol-kafka-authentication-plaintext.md" source="alloy" version="" >}} ### sasl block -The `sasl` block configures SASL authentication against Kafka brokers. - -The following arguments are supported: - -Name | Type | Description | Default | Required ----- | ---- | ----------- | ------- | -------- -`username` | `string` | Username to use for SASL authentication. | | yes -`password` | `secret` | Password to use for SASL authentication. | | yes -`mechanism` | `string` | SASL mechanism to use when authenticating. | | yes -`version` | `number` | Version of the SASL Protocol to use when authenticating. | `0` | no - -The `mechanism` argument can be set to one of the following strings: - -* `"PLAIN"` -* `"AWS_MSK_IAM"` -* `"SCRAM-SHA-256"` -* `"SCRAM-SHA-512"` - -When `mechanism` is set to `"AWS_MSK_IAM"`, the [`aws_msk` child block][aws_msk] must also be provided. - -The `version` argument can be set to either `0` or `1`. +{{< docs/shared lookup="reference/components/otelcol-kafka-authentication-sasl.md" source="alloy" version="" >}} ### aws_msk block -The `aws_msk` block configures extra parameters for SASL authentication when -using the `AWS_MSK_IAM` mechanism. - -The following arguments are supported: - -Name | Type | Description | Default | Required ----- | ---- | ----------- | ------- | -------- -`region` | `string` | AWS region the MSK cluster is based in. | | yes -`broker_addr` | `string` | MSK address to connect to for authentication. | | yes +{{< docs/shared lookup="reference/components/otelcol-kafka-authentication-sasl-aws_msk.md" source="alloy" version="" >}} ### tls block @@ -167,58 +134,15 @@ communication. ### kerberos block -The `kerberos` block configures Kerberos authentication against the Kafka -broker. - -The following arguments are supported: - -Name | Type | Description | Default | Required ----- | ---- | ----------- | ------- | -------- -`service_name` | `string` | Kerberos service name. | | no -`realm` | `string` | Kerberos realm. | | no -`use_keytab` | `string` | Enables using keytab instead of password. | | no -`username` | `string` | Kerberos username to authenticate as. | | yes -`password` | `secret` | Kerberos password to authenticate with. | | no -`config_file` | `string` | Path to Kerberos location (for example, `/etc/krb5.conf`). | | no -`keytab_file` | `string` | Path to keytab file (for example, `/etc/security/kafka.keytab`). | | no - -When `use_keytab` is `false`, the `password` argument is required. When -`use_keytab` is `true`, the file pointed to by the `keytab_file` argument is -used for authentication instead. At most one of `password` or `keytab_file` -must be provided. +{{< docs/shared lookup="reference/components/otelcol-kafka-authentication-kerberos.md" source="alloy" version="" >}} ### metadata block -The `metadata` block configures how to retrieve and store metadata from the -Kafka broker. - -The following arguments are supported: - -Name | Type | Description | Default | Required ----- | ---- | ----------- | ------- | -------- -`include_all_topics` | `bool` | When true, maintains metadata for all topics. | `true` | no - -If the `include_all_topics` argument is `true`, `otelcol.exporter.kafka` -maintains a full set of metadata for all topics rather than the minimal set -that has been necessary so far. Including the full set of metadata is more -convenient for users but can consume a substantial amount of memory if you have -many topics and partitions. - -Retrieving metadata may fail if the Kafka broker is starting up at the same -time as the `otelcol.exporter.kafka` component. The [`retry` child -block][retry] can be provided to customize retry behavior. +{{< docs/shared lookup="reference/components/otelcol-kafka-metadata.md" source="alloy" version="" >}} ### retry block -The `retry` block configures how to retry retrieving metadata when retrieval -fails. - -The following arguments are supported: - -Name | Type | Description | Default | Required ----- | ---- | ----------- | ------- | -------- -`max_retries` | `number` | How many times to reattempt retrieving metadata. | `3` | no -`backoff` | `duration` | Time to wait between retries. | `"250ms"` | no +{{< docs/shared lookup="reference/components/otelcol-kafka-metadata-retry.md" source="alloy" version="" >}} ### retry_on_failure block diff --git a/docs/sources/reference/components/otelcol.receiver.kafka.md b/docs/sources/reference/components/otelcol.receiver.kafka.md index acd01b63ac..ba71f33030 100644 --- a/docs/sources/reference/components/otelcol.receiver.kafka.md +++ b/docs/sources/reference/components/otelcol.receiver.kafka.md @@ -118,56 +118,19 @@ The `>` symbol indicates deeper levels of nesting. For example, ### authentication block -The `authentication` block holds the definition of different authentication -mechanisms to use when connecting to Kafka brokers. It doesn't support any -arguments and is configured fully through inner blocks. +{{< docs/shared lookup="reference/components/otelcol-kafka-authentication.md" source="alloy" version="" >}} ### plaintext block -The `plaintext` block configures `PLAIN` authentication against Kafka brokers. - -The following arguments are supported: - -Name | Type | Description | Default | Required ----- | ---- | ----------- | ------- | -------- -`username` | `string` | Username to use for `PLAIN` authentication. | | yes -`password` | `secret` | Password to use for `PLAIN` authentication. | | yes +{{< docs/shared lookup="reference/components/otelcol-kafka-authentication-plaintext.md" source="alloy" version="" >}} ### sasl block -The `sasl` block configures SASL authentication against Kafka brokers. - -The following arguments are supported: - -Name | Type | Description | Default | Required ----- | ---- | ----------- | ------- | -------- -`username` | `string` | Username to use for SASL authentication. | | yes -`password` | `secret` | Password to use for SASL authentication. | | yes -`mechanism` | `string` | SASL mechanism to use when authenticating. | | yes -`version` | `number` | Version of the SASL Protocol to use when authenticating. | `0` | no - -The `mechanism` argument can be set to one of the following strings: - -* `"PLAIN"` -* `"AWS_MSK_IAM"` -* `"SCRAM-SHA-256"` -* `"SCRAM-SHA-512"` - -When `mechanism` is set to `"AWS_MSK_IAM"`, the [`aws_msk` child block][aws_msk] must also be provided. - -The `version` argument can be set to either `0` or `1`. +{{< docs/shared lookup="reference/components/otelcol-kafka-authentication-sasl.md" source="alloy" version="" >}} ### aws_msk block -The `aws_msk` block configures extra parameters for SASL authentication when -using the `AWS_MSK_IAM` mechanism. - -The following arguments are supported: - -Name | Type | Description | Default | Required ----- | ---- | ----------- | ------- | -------- -`region` | `string` | AWS region the MSK cluster is based in. | | yes -`broker_addr` | `string` | MSK address to connect to for authentication. | | yes +{{< docs/shared lookup="reference/components/otelcol-kafka-authentication-sasl-aws_msk.md" source="alloy" version="" >}} ### tls block @@ -179,58 +142,15 @@ communication. ### kerberos block -The `kerberos` block configures Kerberos authentication against the Kafka -broker. - -The following arguments are supported: - -Name | Type | Description | Default | Required ----- | ---- | ----------- | ------- | -------- -`service_name` | `string` | Kerberos service name. | | no -`realm` | `string` | Kerberos realm. | | no -`use_keytab` | `string` | Enables using keytab instead of password. | | no -`username` | `string` | Kerberos username to authenticate as. | | yes -`password` | `secret` | Kerberos password to authenticate with. | | no -`config_file` | `string` | Path to Kerberos location (for example, `/etc/krb5.conf`). | | no -`keytab_file` | `string` | Path to keytab file (for example, `/etc/security/kafka.keytab`). | | no - -When `use_keytab` is `false`, the `password` argument is required. When -`use_keytab` is `true`, the file pointed to by the `keytab_file` argument is -used for authentication instead. At most one of `password` or `keytab_file` -must be provided. +{{< docs/shared lookup="reference/components/otelcol-kafka-authentication-kerberos.md" source="alloy" version="" >}} ### metadata block -The `metadata` block configures how to retrieve and store metadata from the -Kafka broker. - -The following arguments are supported: - -Name | Type | Description | Default | Required ----- | ---- | ----------- | ------- | -------- -`include_all_topics` | `bool` | When true, maintains metadata for all topics. | `true` | no - -If the `include_all_topics` argument is `true`, `otelcol.receiver.kafka` -maintains a full set of metadata for all topics rather than the minimal set -that has been necessary so far. Including the full set of metadata is more -convenient for users but can consume a substantial amount of memory if you have -many topics and partitions. - -Retrieving metadata may fail if the Kafka broker is starting up at the same -time as the `otelcol.receiver.kafka` component. The [`retry` child -block][retry] can be provided to customize retry behavior. +{{< docs/shared lookup="reference/components/otelcol-kafka-metadata.md" source="alloy" version="" >}} ### retry block -The `retry` block configures how to retry retrieving metadata when retrieval -fails. - -The following arguments are supported: - -Name | Type | Description | Default | Required ----- | ---- | ----------- | ------- | -------- -`max_retries` | `number` | How many times to reattempt retrieving metadata. | `3` | no -`backoff` | `duration` | Time to wait between retries. | `"250ms"` | no +{{< docs/shared lookup="reference/components/otelcol-kafka-metadata-retry.md" source="alloy" version="" >}} ### autocommit block diff --git a/docs/sources/shared/reference/components/otelcol-kafka-authentication-kerberos.md b/docs/sources/shared/reference/components/otelcol-kafka-authentication-kerberos.md new file mode 100644 index 0000000000..2fcbfea2c7 --- /dev/null +++ b/docs/sources/shared/reference/components/otelcol-kafka-authentication-kerberos.md @@ -0,0 +1,24 @@ +--- +description: Shared content, otelcol kafka kerberos authentication +headless: true +--- + +The `kerberos` block configures Kerberos authentication against the Kafka +broker. + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`service_name` | `string` | Kerberos service name. | | no +`realm` | `string` | Kerberos realm. | | no +`use_keytab` | `string` | Enables using keytab instead of password. | | no +`username` | `string` | Kerberos username to authenticate as. | | yes +`password` | `secret` | Kerberos password to authenticate with. | | no +`config_file` | `string` | Path to Kerberos location (for example, `/etc/krb5.conf`). | | no +`keytab_file` | `string` | Path to keytab file (for example, `/etc/security/kafka.keytab`). | | no + +When `use_keytab` is `false`, the `password` argument is required. When +`use_keytab` is `true`, the file pointed to by the `keytab_file` argument is +used for authentication instead. At most one of `password` or `keytab_file` +must be provided. diff --git a/docs/sources/shared/reference/components/otelcol-kafka-authentication-plaintext.md b/docs/sources/shared/reference/components/otelcol-kafka-authentication-plaintext.md new file mode 100644 index 0000000000..e4c0338fe1 --- /dev/null +++ b/docs/sources/shared/reference/components/otelcol-kafka-authentication-plaintext.md @@ -0,0 +1,13 @@ +--- +description: Shared content, otelcol kafka plain text authentication +headless: true +--- + +The `plaintext` block configures `PLAIN` authentication against Kafka brokers. + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`username` | `string` | Username to use for `PLAIN` authentication. | | yes +`password` | `secret` | Password to use for `PLAIN` authentication. | | yes diff --git a/docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl-aws_msk.md b/docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl-aws_msk.md new file mode 100644 index 0000000000..8f35ef3552 --- /dev/null +++ b/docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl-aws_msk.md @@ -0,0 +1,14 @@ +--- +description: Shared content, otelcol kafka sasl aws_msk authentication +headless: true +--- + +The `aws_msk` block configures extra parameters for SASL authentication when +using the `AWS_MSK_IAM` mechanism. + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`region` | `string` | AWS region the MSK cluster is based in. | | yes +`broker_addr` | `string` | MSK address to connect to for authentication. | | yes diff --git a/docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl.md b/docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl.md new file mode 100644 index 0000000000..64df1f0d7a --- /dev/null +++ b/docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl.md @@ -0,0 +1,26 @@ +--- +description: Shared content, otelcol kafka sasl authentication +headless: true +--- + +The `sasl` block configures SASL authentication against Kafka brokers. + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`username` | `string` | Username to use for SASL authentication. | | yes +`password` | `secret` | Password to use for SASL authentication. | | yes +`mechanism` | `string` | SASL mechanism to use when authenticating. | | yes +`version` | `number` | Version of the SASL Protocol to use when authenticating. | `0` | no + +The `mechanism` argument can be set to one of the following strings: + +* `"PLAIN"` +* `"AWS_MSK_IAM"` +* `"SCRAM-SHA-256"` +* `"SCRAM-SHA-512"` + +When `mechanism` is set to `"AWS_MSK_IAM"`, the [`aws_msk` child block][aws_msk] must also be provided. + +The `version` argument can be set to either `0` or `1`. diff --git a/docs/sources/shared/reference/components/otelcol-kafka-authentication.md b/docs/sources/shared/reference/components/otelcol-kafka-authentication.md new file mode 100644 index 0000000000..df02b37731 --- /dev/null +++ b/docs/sources/shared/reference/components/otelcol-kafka-authentication.md @@ -0,0 +1,8 @@ +--- +description: Shared content, otelcol kafka authentication +headless: true +--- + +The `authentication` block holds the definition of different authentication +mechanisms to use when connecting to Kafka brokers. It doesn't support any +arguments and is configured fully through inner blocks. diff --git a/docs/sources/shared/reference/components/otelcol-kafka-metadata-retry.md b/docs/sources/shared/reference/components/otelcol-kafka-metadata-retry.md new file mode 100644 index 0000000000..52465b8192 --- /dev/null +++ b/docs/sources/shared/reference/components/otelcol-kafka-metadata-retry.md @@ -0,0 +1,14 @@ +--- +description: Shared content, otelcol kafka metadata retry +headless: true +--- + +The `retry` block configures how to retry retrieving metadata when retrieval +fails. + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`max_retries` | `number` | How many times to reattempt retrieving metadata. | `3` | no +`backoff` | `duration` | Time to wait between retries. | `"250ms"` | no diff --git a/docs/sources/shared/reference/components/otelcol-kafka-metadata.md b/docs/sources/shared/reference/components/otelcol-kafka-metadata.md new file mode 100644 index 0000000000..3d37760fc2 --- /dev/null +++ b/docs/sources/shared/reference/components/otelcol-kafka-metadata.md @@ -0,0 +1,22 @@ +--- +description: Shared content, otelcol kafka metadata +headless: true +--- + +The `metadata` block configures how to retrieve and store metadata from the +Kafka broker. + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`include_all_topics` | `bool` | When true, maintains metadata for all topics. | `true` | no + +If the `include_all_topics` argument is `true`, +a full set of metadata for all topics is maintained rather than the minimal set +that has been necessary so far. Including the full set of metadata is more +convenient for users but can consume a substantial amount of memory if you have +many topics and partitions. + +Retrieving metadata may fail if the Kafka broker is starting up at the same +time as the Alloy component. The [`retry` child block][retry] can be provided to customize retry behavior. diff --git a/internal/component/otelcol/config_kafka.go b/internal/component/otelcol/config_kafka.go new file mode 100644 index 0000000000..1178e85e4c --- /dev/null +++ b/internal/component/otelcol/config_kafka.go @@ -0,0 +1,155 @@ +package otelcol + +import ( + "time" + + "github.com/grafana/alloy/syntax/alloytypes" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" +) + +// KafkaAuthenticationArguments configures how to authenticate to the Kafka broker. +type KafkaAuthenticationArguments struct { + Plaintext *KafkaPlaintextArguments `alloy:"plaintext,block,optional"` + SASL *KafkaSASLArguments `alloy:"sasl,block,optional"` + TLS *TLSClientArguments `alloy:"tls,block,optional"` + Kerberos *KafkaKerberosArguments `alloy:"kerberos,block,optional"` +} + +// Convert converts args into the upstream type. +func (args KafkaAuthenticationArguments) Convert() map[string]interface{} { + auth := make(map[string]interface{}) + + if args.Plaintext != nil { + conv := args.Plaintext.Convert() + auth["plain_text"] = &conv + } + if args.SASL != nil { + conv := args.SASL.Convert() + auth["sasl"] = &conv + } + if args.TLS != nil { + auth["tls"] = args.TLS.Convert() + } + if args.Kerberos != nil { + conv := args.Kerberos.Convert() + auth["kerberos"] = &conv + } + + return auth +} + +// KafkaPlaintextArguments configures plaintext authentication against the Kafka +// broker. +type KafkaPlaintextArguments struct { + Username string `alloy:"username,attr"` + Password alloytypes.Secret `alloy:"password,attr"` +} + +// Convert converts args into the upstream type. +func (args KafkaPlaintextArguments) Convert() map[string]interface{} { + return map[string]interface{}{ + "username": args.Username, + "password": string(args.Password), + } +} + +// KafkaSASLArguments configures SASL authentication against the Kafka broker. +type KafkaSASLArguments struct { + Username string `alloy:"username,attr"` + Password alloytypes.Secret `alloy:"password,attr"` + Mechanism string `alloy:"mechanism,attr"` + Version int `alloy:"version,attr,optional"` + AWSMSK KafkaAWSMSKArguments `alloy:"aws_msk,block,optional"` +} + +// Convert converts args into the upstream type. +func (args KafkaSASLArguments) Convert() map[string]interface{} { + return map[string]interface{}{ + "username": args.Username, + "password": string(args.Password), + "mechanism": args.Mechanism, + "version": args.Version, + "aws_msk": args.AWSMSK.Convert(), + } +} + +// KafkaAWSMSKArguments exposes additional SASL authentication measures required to +// use the AWS_MSK_IAM mechanism. +type KafkaAWSMSKArguments struct { + Region string `alloy:"region,attr"` + BrokerAddr string `alloy:"broker_addr,attr"` +} + +// Convert converts args into the upstream type. +func (args KafkaAWSMSKArguments) Convert() map[string]interface{} { + return map[string]interface{}{ + "region": args.Region, + "broker_addr": args.BrokerAddr, + } +} + +// KafkaKerberosArguments configures Kerberos authentication against the Kafka +// broker. +type KafkaKerberosArguments struct { + ServiceName string `alloy:"service_name,attr,optional"` + Realm string `alloy:"realm,attr,optional"` + UseKeyTab bool `alloy:"use_keytab,attr,optional"` + Username string `alloy:"username,attr"` + Password alloytypes.Secret `alloy:"password,attr,optional"` + ConfigPath string `alloy:"config_file,attr,optional"` + KeyTabPath string `alloy:"keytab_file,attr,optional"` +} + +// Convert converts args into the upstream type. +func (args KafkaKerberosArguments) Convert() map[string]interface{} { + return map[string]interface{}{ + "service_name": args.ServiceName, + "realm": args.Realm, + "use_keytab": args.UseKeyTab, + "username": args.Username, + "password": string(args.Password), + "config_file": args.ConfigPath, + "keytab_file": args.KeyTabPath, + } +} + +// KafkaMetadataArguments configures how the Alloy component will +// retrieve metadata from the Kafka broker. +type KafkaMetadataArguments struct { + IncludeAllTopics bool `alloy:"include_all_topics,attr,optional"` + Retry KafkaMetadataRetryArguments `alloy:"retry,block,optional"` +} + +func (args *KafkaMetadataArguments) SetToDefault() { + *args = KafkaMetadataArguments{ + IncludeAllTopics: true, + Retry: KafkaMetadataRetryArguments{ + MaxRetries: 3, + Backoff: 250 * time.Millisecond, + }, + } +} + +// Convert converts args into the upstream type. +func (args KafkaMetadataArguments) Convert() kafkaexporter.Metadata { + return kafkaexporter.Metadata{ + Full: args.IncludeAllTopics, + Retry: args.Retry.Convert(), + } +} + +// KafkaMetadataRetryArguments configures how to retry retrieving metadata from the +// Kafka broker. Retrying is useful to avoid race conditions when the Kafka +// broker is starting at the same time as the Alloy component. +type KafkaMetadataRetryArguments struct { + MaxRetries int `alloy:"max_retries,attr,optional"` + Backoff time.Duration `alloy:"backoff,attr,optional"` +} + +// Convert converts args into the upstream type. +func (args KafkaMetadataRetryArguments) Convert() kafkaexporter.MetadataRetry { + return kafkaexporter.MetadataRetry{ + Max: args.MaxRetries, + Backoff: args.Backoff, + } +} diff --git a/internal/component/otelcol/exporter/kafka/kafka.go b/internal/component/otelcol/exporter/kafka/kafka.go index 2b0a5f0d8c..01260863dd 100644 --- a/internal/component/otelcol/exporter/kafka/kafka.go +++ b/internal/component/otelcol/exporter/kafka/kafka.go @@ -9,7 +9,6 @@ import ( "github.com/grafana/alloy/internal/component/otelcol" otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" "github.com/grafana/alloy/internal/component/otelcol/exporter" - alloy_kafka_receiver "github.com/grafana/alloy/internal/component/otelcol/receiver/kafka" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/syntax" "github.com/mitchellh/mapstructure" @@ -46,11 +45,11 @@ type Arguments struct { PartitionMetricsByResourceAttributes bool `alloy:"partition_metrics_by_resource_attributes,attr,optional"` Timeout time.Duration `alloy:"timeout,attr,optional"` - Authentication alloy_kafka_receiver.AuthenticationArguments `alloy:"authentication,block,optional"` - Metadata alloy_kafka_receiver.MetadataArguments `alloy:"metadata,block,optional"` - Retry otelcol.RetryArguments `alloy:"retry_on_failure,block,optional"` - Queue otelcol.QueueArguments `alloy:"sending_queue,block,optional"` - Producer Producer `alloy:"producer,block,optional"` + Authentication otelcol.KafkaAuthenticationArguments `alloy:"authentication,block,optional"` + Metadata otelcol.KafkaMetadataArguments `alloy:"metadata,block,optional"` + Retry otelcol.RetryArguments `alloy:"retry_on_failure,block,optional"` + Queue otelcol.QueueArguments `alloy:"sending_queue,block,optional"` + Producer Producer `alloy:"producer,block,optional"` // DebugMetrics configures component internal metrics. Optional. DebugMetrics otelcolCfg.DebugMetricsArguments `alloy:"debug_metrics,block,optional"` @@ -103,9 +102,9 @@ func (args *Arguments) SetToDefault() { Brokers: []string{"localhost:9092"}, ClientID: "sarama", Timeout: 5 * time.Second, - Metadata: alloy_kafka_receiver.MetadataArguments{ + Metadata: otelcol.KafkaMetadataArguments{ IncludeAllTopics: true, - Retry: alloy_kafka_receiver.MetadataRetryArguments{ + Retry: otelcol.KafkaMetadataRetryArguments{ MaxRetries: 3, Backoff: 250 * time.Millisecond, }, diff --git a/internal/component/otelcol/receiver/kafka/kafka.go b/internal/component/otelcol/receiver/kafka/kafka.go index b6ff5917e8..186f381f4c 100644 --- a/internal/component/otelcol/receiver/kafka/kafka.go +++ b/internal/component/otelcol/receiver/kafka/kafka.go @@ -11,9 +11,7 @@ import ( otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" "github.com/grafana/alloy/internal/component/otelcol/receiver" "github.com/grafana/alloy/internal/featuregate" - "github.com/grafana/alloy/syntax/alloytypes" "github.com/mitchellh/mapstructure" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver" otelcomponent "go.opentelemetry.io/collector/component" otelextension "go.opentelemetry.io/collector/extension" @@ -44,11 +42,11 @@ type Arguments struct { ResolveCanonicalBootstrapServersOnly bool `alloy:"resolve_canonical_bootstrap_servers_only,attr,optional"` - Authentication AuthenticationArguments `alloy:"authentication,block,optional"` - Metadata MetadataArguments `alloy:"metadata,block,optional"` - AutoCommit AutoCommitArguments `alloy:"autocommit,block,optional"` - MessageMarking MessageMarkingArguments `alloy:"message_marking,block,optional"` - HeaderExtraction HeaderExtraction `alloy:"header_extraction,block,optional"` + Authentication otelcol.KafkaAuthenticationArguments `alloy:"authentication,block,optional"` + Metadata otelcol.KafkaMetadataArguments `alloy:"metadata,block,optional"` + AutoCommit AutoCommitArguments `alloy:"autocommit,block,optional"` + MessageMarking MessageMarkingArguments `alloy:"message_marking,block,optional"` + HeaderExtraction HeaderExtraction `alloy:"header_extraction,block,optional"` // DebugMetrics configures component internal metrics. Optional. DebugMetrics otelcolCfg.DebugMetricsArguments `alloy:"debug_metrics,block,optional"` @@ -142,153 +140,6 @@ func (args Arguments) NextConsumers() *otelcol.ConsumerArguments { return args.Output } -// AuthenticationArguments configures how to authenticate to the Kafka broker. -type AuthenticationArguments struct { - Plaintext *PlaintextArguments `alloy:"plaintext,block,optional"` - SASL *SASLArguments `alloy:"sasl,block,optional"` - TLS *otelcol.TLSClientArguments `alloy:"tls,block,optional"` - Kerberos *KerberosArguments `alloy:"kerberos,block,optional"` -} - -// Convert converts args into the upstream type. -func (args AuthenticationArguments) Convert() map[string]interface{} { - auth := make(map[string]interface{}) - - if args.Plaintext != nil { - conv := args.Plaintext.Convert() - auth["plain_text"] = &conv - } - if args.SASL != nil { - conv := args.SASL.Convert() - auth["sasl"] = &conv - } - if args.TLS != nil { - auth["tls"] = args.TLS.Convert() - } - if args.Kerberos != nil { - conv := args.Kerberos.Convert() - auth["kerberos"] = &conv - } - - return auth -} - -// PlaintextArguments configures plaintext authentication against the Kafka -// broker. -type PlaintextArguments struct { - Username string `alloy:"username,attr"` - Password alloytypes.Secret `alloy:"password,attr"` -} - -// Convert converts args into the upstream type. -func (args PlaintextArguments) Convert() map[string]interface{} { - return map[string]interface{}{ - "username": args.Username, - "password": string(args.Password), - } -} - -// SASLArguments configures SASL authentication against the Kafka broker. -type SASLArguments struct { - Username string `alloy:"username,attr"` - Password alloytypes.Secret `alloy:"password,attr"` - Mechanism string `alloy:"mechanism,attr"` - Version int `alloy:"version,attr,optional"` - AWSMSK AWSMSKArguments `alloy:"aws_msk,block,optional"` -} - -// Convert converts args into the upstream type. -func (args SASLArguments) Convert() map[string]interface{} { - return map[string]interface{}{ - "username": args.Username, - "password": string(args.Password), - "mechanism": args.Mechanism, - "version": args.Version, - "aws_msk": args.AWSMSK.Convert(), - } -} - -// AWSMSKArguments exposes additional SASL authentication measures required to -// use the AWS_MSK_IAM mechanism. -type AWSMSKArguments struct { - Region string `alloy:"region,attr"` - BrokerAddr string `alloy:"broker_addr,attr"` -} - -// Convert converts args into the upstream type. -func (args AWSMSKArguments) Convert() map[string]interface{} { - return map[string]interface{}{ - "region": args.Region, - "broker_addr": args.BrokerAddr, - } -} - -// KerberosArguments configures Kerberos authentication against the Kafka -// broker. -type KerberosArguments struct { - ServiceName string `alloy:"service_name,attr,optional"` - Realm string `alloy:"realm,attr,optional"` - UseKeyTab bool `alloy:"use_keytab,attr,optional"` - Username string `alloy:"username,attr"` - Password alloytypes.Secret `alloy:"password,attr,optional"` - ConfigPath string `alloy:"config_file,attr,optional"` - KeyTabPath string `alloy:"keytab_file,attr,optional"` -} - -// Convert converts args into the upstream type. -func (args KerberosArguments) Convert() map[string]interface{} { - return map[string]interface{}{ - "service_name": args.ServiceName, - "realm": args.Realm, - "use_keytab": args.UseKeyTab, - "username": args.Username, - "password": string(args.Password), - "config_file": args.ConfigPath, - "keytab_file": args.KeyTabPath, - } -} - -// MetadataArguments configures how the otelcol.receiver.kafka component will -// retrieve metadata from the Kafka broker. -type MetadataArguments struct { - IncludeAllTopics bool `alloy:"include_all_topics,attr,optional"` - Retry MetadataRetryArguments `alloy:"retry,block,optional"` -} - -func (args *MetadataArguments) SetToDefault() { - *args = MetadataArguments{ - IncludeAllTopics: true, - Retry: MetadataRetryArguments{ - MaxRetries: 3, - Backoff: 250 * time.Millisecond, - }, - } -} - -// Convert converts args into the upstream type. -func (args MetadataArguments) Convert() kafkaexporter.Metadata { - return kafkaexporter.Metadata{ - Full: args.IncludeAllTopics, - Retry: args.Retry.Convert(), - } -} - -// MetadataRetryArguments configures how to retry retrieving metadata from the -// Kafka broker. Retrying is useful to avoid race conditions when the Kafka -// broker is starting at the same time as the otelcol.receiver.kafka component. -type MetadataRetryArguments struct { - MaxRetries int `alloy:"max_retries,attr,optional"` - Backoff time.Duration `alloy:"backoff,attr,optional"` -} - -// Convert converts args into the upstream type. -func (args MetadataRetryArguments) Convert() kafkaexporter.MetadataRetry { - return kafkaexporter.MetadataRetry{ - Max: args.MaxRetries, - Backoff: args.Backoff, - } -} - // AutoCommitArguments configures how to automatically commit updated topic // offsets back to the Kafka broker. type AutoCommitArguments struct { diff --git a/internal/converter/internal/otelcolconvert/converter_kafkaexporter.go b/internal/converter/internal/otelcolconvert/converter_kafkaexporter.go index 90d845d281..0c735ee45e 100644 --- a/internal/converter/internal/otelcolconvert/converter_kafkaexporter.go +++ b/internal/converter/internal/otelcolconvert/converter_kafkaexporter.go @@ -27,7 +27,7 @@ func (kafkaExporterConverter) ConvertAndAppend(state *State, id component.Instan label := state.AlloyComponentLabel() - args := toKafkaExporter(state, id, cfg.(*kafkaexporter.Config)) + args := toKafkaExporter(cfg.(*kafkaexporter.Config)) block := common.NewBlockWithOverride([]string{"otelcol", "exporter", "kafka"}, label, args) diags.Add( diff --git a/internal/converter/internal/otelcolconvert/converter_kafkareceiver.go b/internal/converter/internal/otelcolconvert/converter_kafkareceiver.go index 7888c04f4a..654227f81c 100644 --- a/internal/converter/internal/otelcolconvert/converter_kafkareceiver.go +++ b/internal/converter/internal/otelcolconvert/converter_kafkareceiver.go @@ -76,8 +76,8 @@ func toKafkaReceiver(state *State, id component.InstanceID, cfg *kafkareceiver.C } } -func toKafkaAuthentication(cfg map[string]any) kafka.AuthenticationArguments { - return kafka.AuthenticationArguments{ +func toKafkaAuthentication(cfg map[string]any) otelcol.KafkaAuthenticationArguments { + return otelcol.KafkaAuthenticationArguments{ Plaintext: toKafkaPlaintext(encodeMapstruct(cfg["plain_text"])), SASL: toKafkaSASL(encodeMapstruct(cfg["sasl"])), TLS: toKafkaTLSClientArguments(encodeMapstruct(cfg["tls"])), @@ -85,23 +85,23 @@ func toKafkaAuthentication(cfg map[string]any) kafka.AuthenticationArguments { } } -func toKafkaPlaintext(cfg map[string]any) *kafka.PlaintextArguments { +func toKafkaPlaintext(cfg map[string]any) *otelcol.KafkaPlaintextArguments { if cfg == nil { return nil } - return &kafka.PlaintextArguments{ + return &otelcol.KafkaPlaintextArguments{ Username: cfg["username"].(string), Password: alloytypes.Secret(cfg["password"].(string)), } } -func toKafkaSASL(cfg map[string]any) *kafka.SASLArguments { +func toKafkaSASL(cfg map[string]any) *otelcol.KafkaSASLArguments { if cfg == nil { return nil } - return &kafka.SASLArguments{ + return &otelcol.KafkaSASLArguments{ Username: cfg["username"].(string), Password: alloytypes.Secret(cfg["password"].(string)), Mechanism: cfg["mechanism"].(string), @@ -110,12 +110,12 @@ func toKafkaSASL(cfg map[string]any) *kafka.SASLArguments { } } -func toKafkaAWSMSK(cfg map[string]any) kafka.AWSMSKArguments { +func toKafkaAWSMSK(cfg map[string]any) otelcol.KafkaAWSMSKArguments { if cfg == nil { - return kafka.AWSMSKArguments{} + return otelcol.KafkaAWSMSKArguments{} } - return kafka.AWSMSKArguments{ + return otelcol.KafkaAWSMSKArguments{ Region: cfg["region"].(string), BrokerAddr: cfg["broker_addr"].(string), } @@ -136,12 +136,12 @@ func toKafkaTLSClientArguments(cfg map[string]any) *otelcol.TLSClientArguments { return &res } -func toKafkaKerberos(cfg map[string]any) *kafka.KerberosArguments { +func toKafkaKerberos(cfg map[string]any) *otelcol.KafkaKerberosArguments { if cfg == nil { return nil } - return &kafka.KerberosArguments{ + return &otelcol.KafkaKerberosArguments{ ServiceName: cfg["service_name"].(string), Realm: cfg["realm"].(string), UseKeyTab: cfg["use_keytab"].(bool), @@ -152,15 +152,15 @@ func toKafkaKerberos(cfg map[string]any) *kafka.KerberosArguments { } } -func toKafkaMetadata(cfg kafkaexporter.Metadata) kafka.MetadataArguments { - return kafka.MetadataArguments{ +func toKafkaMetadata(cfg kafkaexporter.Metadata) otelcol.KafkaMetadataArguments { + return otelcol.KafkaMetadataArguments{ IncludeAllTopics: cfg.Full, Retry: toKafkaRetry(cfg.Retry), } } -func toKafkaRetry(cfg kafkaexporter.MetadataRetry) kafka.MetadataRetryArguments { - return kafka.MetadataRetryArguments{ +func toKafkaRetry(cfg kafkaexporter.MetadataRetry) otelcol.KafkaMetadataRetryArguments { + return otelcol.KafkaMetadataRetryArguments{ MaxRetries: cfg.Max, Backoff: cfg.Backoff, }