SC-20509 #4

Open
wants to merge 52 commits into
base: add-sematext-exporter
7098a62
Start working on structure for logs
AkhigbeEromo Nov 19, 2024
2160a71
Work on logs config
AkhigbeEromo Nov 22, 2024
c0ca7cb
Removed LogStash settings
AkhigbeEromo Nov 22, 2024
9179c5e
This is what our logs config will look like
AkhigbeEromo Nov 22, 2024
915f691
Add queue settings to log config
AkhigbeEromo Nov 22, 2024
3d8390a
Correct Metrics config
AkhigbeEromo Nov 22, 2024
cf6f43e
remove unnecessary things
AkhigbeEromo Nov 22, 2024
e86c09f
complete Logs Endpoint
AkhigbeEromo Nov 22, 2024
02d5643
A lot of experimentation (Still in progress) is being done, so code i…
AkhigbeEromo Nov 27, 2024
a94b20b
Fixed some typos
AkhigbeEromo Nov 28, 2024
ff00870
Another heavy commit
AkhigbeEromo Nov 29, 2024
a8b67a6
First test failed so i had to do some tinkering and refining and put …
AkhigbeEromo Dec 2, 2024
17c525b
Add hostname tag
AkhigbeEromo Dec 3, 2024
e0b95cd
Clean up code
AkhigbeEromo Dec 3, 2024
5cfc6e1
Work on config.go test
AkhigbeEromo Dec 4, 2024
9ad149b
Add tests for logs support part of writer.go
AkhigbeEromo Dec 4, 2024
e999c1f
Write tests for falt_formatter.go
AkhigbeEromo Dec 4, 2024
5303ef0
Fix lint issues
AkhigbeEromo Dec 4, 2024
14d74d2
Fix lint issues
AkhigbeEromo Dec 5, 2024
d3bb694
Remove unnecessary alias
AkhigbeEromo Dec 5, 2024
b31b205
Add mapstructure for write events
AkhigbeEromo Dec 5, 2024
886b1ac
Fix otel issues
AkhigbeEromo Dec 5, 2024
17e4f25
Fix more lint issues
AkhigbeEromo Dec 5, 2024
8260265
Fix CI issues
AkhigbeEromo Dec 5, 2024
75725c0
App token is not being passed as ._index
AkhigbeEromo Dec 6, 2024
4513518
Fix CI issues, removed "custom" from being an option in the regions
AkhigbeEromo Dec 9, 2024
27ceed9
Fix CI issues
AkhigbeEromo Dec 9, 2024
9bd25b1
Fix lint issues
AkhigbeEromo Dec 9, 2024
5035763
Fix CI issues
AkhigbeEromo Dec 9, 2024
ef17936
Update gendistributions
AkhigbeEromo Dec 9, 2024
8bdc051
Remove unnecessary error return value
AkhigbeEromo Dec 9, 2024
11ba415
Make anchor links lowercase
AkhigbeEromo Dec 9, 2024
91c7494
Fix anchor links
AkhigbeEromo Dec 9, 2024
8be0231
Fix anchor link
AkhigbeEromo Dec 9, 2024
cde2d8f
remove links
AkhigbeEromo Dec 9, 2024
ddd7911
Write more unit tests
AkhigbeEromo Dec 10, 2024
7b06688
Remove unused prameter
AkhigbeEromo Dec 10, 2024
5978836
Fix lint issues
AkhigbeEromo Dec 12, 2024
acfa996
Implement fix that Akshat suggested
AkhigbeEromo Dec 18, 2024
7bc9caf
Correct endpoints
AkhigbeEromo Dec 18, 2024
d4a6453
Fix lint issues
AkhigbeEromo Dec 18, 2024
1fbb2bc
Remove getHostName
AkhigbeEromo Dec 19, 2024
b0c1208
Remove deprecated library
AkhigbeEromo Dec 23, 2024
f087bd5
Replace logrus with zap
AkhigbeEromo Dec 23, 2024
748519a
Fix lint issues
AkhigbeEromo Dec 23, 2024
8c698d3
Add more tests
AkhigbeEromo Dec 24, 2024
de4f331
Remove logrus
AkhigbeEromo Dec 27, 2024
63406b6
Handle errors
AkhigbeEromo Dec 27, 2024
814b006
Fix issue with writer
AkhigbeEromo Dec 27, 2024
032d36e
Remove flatwriter
AkhigbeEromo Dec 27, 2024
3f84e20
Fix lint issues
AkhigbeEromo Dec 27, 2024
5e33287
Remove unnecessary things from exporter
AkhigbeEromo Dec 27, 2024
13 changes: 11 additions & 2 deletions exporter/sematextexporter/README.md
@@ -1,5 +1,14 @@
# Sematext Exporter
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [development]: metrics, logs |
| Distributions | [contrib] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aexporter%2Fsematext%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aexporter%2Fsematext) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aexporter%2Fsematext%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aexporter%2Fsematext) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@AkhigbeEromo](https://www.github.com/AkhigbeEromo) |

[development]: https://github.com/open-telemetry/opentelemetry-collector#development
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
<!-- end autogenerated section -->

This exporter supports sending metrics to [Sematext Cloud](https://sematext.com/) in Influx line protocol format
@@ -15,11 +24,11 @@ The following configuration options are supported:
* `payload_max_lines` (default = 1_000) Maximum number of lines allowed per HTTP POST request
* `payload_max_bytes` (default = 300_000) Maximum number of bytes allowed per HTTP POST request
* `metrics_schema` (default = telegraf-prometheus-v2) The chosen metrics schema to write
* `sending_queue` [details here](https://github.com/open-telemetry/opentelemetry-collector/blob/v0.25.0/exporter/exporterhelper/README.md#configuration)
* `sending_queue`
* `enabled` (default = true)
* `num_consumers` (default = 10) The number of consumers from the queue
* `queue_size` (default = 1000) Maximum number of batches allowed in queue at a given time
* `retry_on_failure` [details here](https://github.com/open-telemetry/opentelemetry-collector/blob/v0.25.0/exporter/exporterhelper/README.md#configuration)
* `retry_on_failure`
* `enabled` (default = true)
* `initial_interval` (default = 5s) Time to wait after the first failure before retrying
* `max_interval` (default = 30s) Upper bound on backoff interval
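
The `payload_max_lines` and `payload_max_bytes` options above cap how much line protocol goes into one HTTP POST. The exporter's actual batching code is not part of this diff, so the following is only a minimal sketch of how such limits could be applied. The helper name `splitPayloads` and the rule of counting one extra byte per line for the newline are assumptions made for illustration.

```go
package main

import "fmt"

// splitPayloads is a hypothetical helper (not taken from the exporter).
// It groups line-protocol lines into payloads so that no payload holds more
// than maxLines lines or more than maxBytes bytes, counting each line as its
// length plus one byte for the trailing newline. A single line that is larger
// than maxBytes is still emitted on its own instead of being dropped.
func splitPayloads(lines []string, maxLines, maxBytes int) [][]string {
	var payloads [][]string
	var current []string
	size := 0
	for _, line := range lines {
		lineSize := len(line) + 1
		// Flush the current payload when adding this line would break a limit.
		if len(current) > 0 && (len(current) >= maxLines || size+lineSize > maxBytes) {
			payloads = append(payloads, current)
			current = nil
			size = 0
		}
		current = append(current, line)
		size += lineSize
	}
	if len(current) > 0 {
		payloads = append(payloads, current)
	}
	return payloads
}

func main() {
	lines := []string{"cpu value=1", "mem value=2", "disk value=3"}
	for i, p := range splitPayloads(lines, 2, 300000) {
		fmt.Printf("payload %d: %d lines\n", i, len(p))
	}
}
```

With the documented defaults (1_000 lines, 300_000 bytes) whichever limit is reached first closes the current payload.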
51 changes: 38 additions & 13 deletions exporter/sematextexporter/config.go
@@ -12,6 +12,15 @@ import (
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

const (
euRegion = "eu"
usRegion = "us"
euMetricsEndpoint = "https://spm-receiver.eu.sematext.com"
euLogsEndpoint = "https://logsene-receiver.eu.sematext.com"
usMetricsEndpoint = "https://spm-receiver.sematext.com"
usLogsEndpoint = "https://logsene-receiver.sematext.com"
)

type Config struct {
confighttp.ClientConfig `mapstructure:",squash"`
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
@@ -20,37 +29,53 @@ type Config struct {
// - EU
// - US
Region string `mapstructure:"region"`
// MetricsConfig defines the configuration specific to metrics
MetricsConfig `mapstructure:"metrics"`
// LogsConfig defines the configuration specific to logs
LogsConfig `mapstructure:"logs"`
}

type MetricsConfig struct {
// App token is the token of Sematext Monitoring App to which you want to send the metrics.
AppToken string `mapstructure:"app_token"`
AppToken string `mapstructure:"app_token"`
// MetricsEndpoint specifies the endpoint for receiving metrics in Sematext
MetricsEndpoint string `mapstructure:"metrics_endpoint"`
QueueSettings exporterhelper.QueueConfig `mapstructure:"sending_queue"`
MetricsEndpoint string `mapstructure:"metrics_endpoint"`
QueueSettings exporterhelper.QueueConfig `mapstructure:"sending_queue"`
// MetricsSchema indicates the metrics schema to emit to line protocol.
// Default: telegraf-prometheus-v2
MetricsSchema string `mapstructure:"metrics_schema"`
MetricsSchema string `mapstructure:"metrics_schema"`
// PayloadMaxLines is the maximum number of line protocol lines to POST in a single request.
PayloadMaxLines int `mapstructure:"payload_max_lines"`
PayloadMaxLines int `mapstructure:"payload_max_lines"`
// PayloadMaxBytes is the maximum number of line protocol bytes to POST in a single request.
PayloadMaxBytes int `mapstructure:"payload_max_bytes"`
PayloadMaxBytes int `mapstructure:"payload_max_bytes"`
}
type LogsConfig struct {
// AppToken is the token of the Sematext Logs App to which you want to send the logs.
AppToken string `mapstructure:"app_token"`
// LogsEndpoint specifies the endpoint for receiving logs in Sematext
LogsEndpoint string `mapstructure:"logs_endpoint"`
// LogRequests determines whether request tracking is enabled
// LogRequests bool `mapstructure:"logs_requests"`
}

// Validate checks for invalid or missing entries in the configuration.
func (cfg *Config) Validate() error {
if strings.ToLower(cfg.Region) != "eu" && strings.ToLower(cfg.Region) != "us" && strings.ToLower(cfg.Region) != "custom"{
if strings.ToLower(cfg.Region) != euRegion && strings.ToLower(cfg.Region) != usRegion {
return fmt.Errorf("invalid region: %s. please use either 'EU' or 'US'", cfg.Region)
}
if len(cfg.AppToken) != 36 {
return fmt.Errorf("invalid app_token: %s. app_token should be 36 characters", cfg.AppToken)
if len(cfg.MetricsConfig.AppToken) != 36 {
return fmt.Errorf("invalid metrics app_token: %s. app_token should be 36 characters", cfg.MetricsConfig.AppToken)
}
if len(cfg.LogsConfig.AppToken) != 36 {
return fmt.Errorf("invalid logs app_token: %s. app_token should be 36 characters", cfg.LogsConfig.AppToken)
}
if strings.ToLower(cfg.Region) == "eu" {
cfg.MetricsEndpoint ="https://spm-receiver.eu.sematext.com"
if strings.ToLower(cfg.Region) == euRegion {
cfg.MetricsEndpoint = euMetricsEndpoint
cfg.LogsEndpoint = euLogsEndpoint
}
if strings.ToLower(cfg.Region) == "us"{
cfg.MetricsEndpoint ="https://spm-receiver.sematext.com"
if strings.ToLower(cfg.Region) == usRegion {
cfg.MetricsEndpoint = usMetricsEndpoint
cfg.LogsEndpoint = usLogsEndpoint
}

return nil
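
The region handling in `Validate` lowercases `cfg.Region` several times and then assigns endpoints in two separate `if` blocks. As a possible simplification, the sketch below does the same lookup through a single map. The names `endpoints`, `regionEndpoints` and `resolveEndpoints` are hypothetical and do not appear in the PR; the URLs and the error text are copied from the constants and message in the diff above.

```go
package main

import (
	"fmt"
	"strings"
)

// endpoints pairs the metrics and logs receivers for one region.
// Hypothetical type, introduced only for this sketch.
type endpoints struct {
	metrics string
	logs    string
}

// regionEndpoints maps a lowercase region name to its receivers.
// The URLs are the ones declared as constants in config.go.
var regionEndpoints = map[string]endpoints{
	"eu": {
		metrics: "https://spm-receiver.eu.sematext.com",
		logs:    "https://logsene-receiver.eu.sematext.com",
	},
	"us": {
		metrics: "https://spm-receiver.sematext.com",
		logs:    "https://logsene-receiver.sematext.com",
	},
}

// resolveEndpoints normalizes the region once and returns its endpoints,
// or an error for any region other than EU or US.
func resolveEndpoints(region string) (endpoints, error) {
	ep, ok := regionEndpoints[strings.ToLower(region)]
	if !ok {
		return endpoints{}, fmt.Errorf("invalid region: %s. please use either 'EU' or 'US'", region)
	}
	return ep, nil
}

func main() {
	for _, region := range []string{"EU", "us", "ASIA"} {
		ep, err := resolveEndpoints(region)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s: metrics=%s logs=%s\n", region, ep.metrics, ep.logs)
	}
}
```

A lookup table keeps validation and endpoint selection in one place, so adding a region would only need a new map entry.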
98 changes: 93 additions & 5 deletions exporter/sematextexporter/config_test.go
@@ -39,8 +39,8 @@ func TestLoadConfig(t *testing.T) {
id: component.NewIDWithName(metadata.Type, "override-config"),
expected: &Config{
ClientConfig: confighttp.ClientConfig{
Timeout: 500 * time.Millisecond,
Headers: map[string]configopaque.String{"User-Agent": "OpenTelemetry -> Sematext"},
Timeout: 500 * time.Millisecond,
Headers: map[string]configopaque.String{"User-Agent": "OpenTelemetry -> Sematext"},
},
MetricsConfig: MetricsConfig{
MetricsEndpoint: "https://spm-receiver.sematext.com",
@@ -49,12 +49,16 @@ func TestLoadConfig(t *testing.T) {
NumConsumers: 3,
QueueSize: 10,
},
AppToken: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
AppToken: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
MetricsSchema: "telegraf-prometheus-v2",
PayloadMaxLines: 72,
PayloadMaxBytes: 27,
},

LogsConfig: LogsConfig{
AppToken: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
LogsEndpoint: "https://logsene-receiver.sematext.com",
},

BackOffConfig: configretry.BackOffConfig{
Enabled: true,
InitialInterval: 1 * time.Second,
@@ -63,7 +67,7 @@ func TestLoadConfig(t *testing.T) {
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
},
Region: "US",
Region: "us",
},
},
}
@@ -82,3 +86,87 @@ func TestLoadConfig(t *testing.T) {
})
}
}
func TestConfigValidation(t *testing.T) {
tests := []struct {
name string
config *Config
expectError bool
}{
{
name: "Valid configuration 1",
config: &Config{
Region: "US",
MetricsConfig: MetricsConfig{
AppToken: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
},
LogsConfig: LogsConfig{
AppToken: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
},
},
expectError: false,
},
{
name: "Valid configuration 2",
config: &Config{
Region: "EU",
MetricsConfig: MetricsConfig{
AppToken: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
},
LogsConfig: LogsConfig{
AppToken: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
},
},
expectError: false,
},
{
name: "Invalid region",
config: &Config{
Region: "ASIA",
MetricsConfig: MetricsConfig{
AppToken: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
},
LogsConfig: LogsConfig{
AppToken: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
},
},
expectError: true,
},
{
name: "Invalid metrics AppToken length",
config: &Config{
Region: "US",
MetricsConfig: MetricsConfig{
AppToken: "short-token",
},
LogsConfig: LogsConfig{
AppToken: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
},
},
expectError: true,
},
{
name: "Invalid logs AppToken length",
config: &Config{
Region: "EU",
MetricsConfig: MetricsConfig{
AppToken: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
},
LogsConfig: LogsConfig{
AppToken: "short-token",
},
},
expectError: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.config.Validate()
if tt.expectError {
assert.Error(t, err, "Expected an error for invalid configuration")
} else {
assert.NoError(t, err, "Expected no error for valid configuration")
}
})
}
}
124 changes: 124 additions & 0 deletions exporter/sematextexporter/es.go
@@ -0,0 +1,124 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package sematextexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sematextexporter"

import (
"bytes"
"context"
"encoding/json"
"fmt"
"os"
"reflect"
"time"

"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
"go.uber.org/zap"
)

type group struct {
client *elasticsearch.Client
token string
}

type client struct {
clients map[string]group
config *Config
logger *zap.Logger
hostname string
}

// Client is the minimal interface that a client implementation has to satisfy.
type Client interface {
Bulk(body any, config *Config) error
}

// newClient creates an Elasticsearch client for the configured logs endpoint
// and stores it, together with the logs app token, keyed by that endpoint.
func newClient(config *Config, logger *zap.Logger) (Client, error) {
clients := make(map[string]group)

// Client for shipping to logsene
if config.LogsConfig.AppToken != "" {
c, err := elasticsearch.NewClient(elasticsearch.Config{
Addresses: []string{config.LogsEndpoint},
})
if err != nil {
logger.Error("Failed to create Elasticsearch client", zap.Error(err))
return nil, fmt.Errorf("elasticsearch client creation failed: %w", err)
}
clients[config.LogsEndpoint] = group{
client: c,
token: config.LogsConfig.AppToken,
}
}

hostname, err := os.Hostname()
if err != nil {
logger.Warn("Could not retrieve hostname", zap.Error(err))
}

return &client{
clients: clients,
config: config,
logger: logger,
hostname: hostname,
}, nil
}

// Bulk processes a batch of documents and sends them to the specified LogsEndpoint.
func (c *client) Bulk(body any, config *Config) error {
grp, ok := c.clients[config.LogsEndpoint]
if !ok {
return fmt.Errorf("no client known for %s endpoint", config.LogsEndpoint)
}

var bulkBuffer bytes.Buffer

if reflect.TypeOf(body).Kind() == reflect.Slice {
v := reflect.ValueOf(body)
for i := 0; i < v.Len(); i++ {
doc := v.Index(i).Interface()
if docMap, ok := doc.(map[string]any); ok {
docMap["os.host"] = c.hostname
}

meta := map[string]map[string]string{
"index": {"_index": grp.token},
}
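metaBytes, err := json.Marshal(meta)
if err != nil {
return fmt.Errorf("failed to marshal bulk metadata: %w", err)
}
docBytes, err := json.Marshal(doc)
if err != nil {
return fmt.Errorf("failed to marshal document: %w", err)
}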

bulkBuffer.Write(metaBytes)
bulkBuffer.WriteByte('\n')
bulkBuffer.Write(docBytes)
bulkBuffer.WriteByte('\n')
}
}

if bulkBuffer.Len() > 0 {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

req := esapi.BulkRequest{
Body: bytes.NewReader(bulkBuffer.Bytes()),
}

res, err := req.Do(ctx, grp.client)
if err != nil {
c.logger.Error("Bulk request failed", zap.Error(err))
return err
}
defer res.Body.Close()

if res.IsError() {
c.logger.Error("Bulk request returned an error", zap.String("response", res.String()))
return fmt.Errorf("bulk request error: %s", res.String())
}

c.logger.Info("Bulk request successful", zap.String("response", res.String()))
}

return nil
}
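
`Bulk` builds its request body as newline-delimited JSON: one `index` action line carrying the app token as `_index`, followed by one document line. The sketch below isolates that step using only the standard library, so it can be tested without an Elasticsearch client. The function name `buildBulkBody` is hypothetical. Unlike the code in the diff, it takes a typed slice instead of using `reflect`, copies each document rather than mutating the caller's map, and returns marshalling errors.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// buildBulkBody is a hypothetical helper (not part of the PR). It renders
// docs as a bulk payload: for every document it writes an "index" action
// line followed by the document itself, each terminated by a newline.
// The hostname is added to each document under the "os.host" key.
func buildBulkBody(docs []map[string]any, index, hostname string) ([]byte, error) {
	meta, err := json.Marshal(map[string]map[string]string{
		"index": {"_index": index},
	})
	if err != nil {
		return nil, fmt.Errorf("marshal bulk metadata: %w", err)
	}

	var buf bytes.Buffer
	for i, doc := range docs {
		// Copy the document so the caller's map is left untouched.
		enriched := make(map[string]any, len(doc)+1)
		for k, v := range doc {
			enriched[k] = v
		}
		enriched["os.host"] = hostname

		body, err := json.Marshal(enriched)
		if err != nil {
			return nil, fmt.Errorf("marshal document %d: %w", i, err)
		}
		buf.Write(meta)
		buf.WriteByte('\n')
		buf.Write(body)
		buf.WriteByte('\n')
	}
	return buf.Bytes(), nil
}

func main() {
	docs := []map[string]any{{"message": "hello"}}
	body, err := buildBulkBody(docs, "my-app-token", "host-1")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Print(string(body))
}
```

Separating payload construction from the HTTP call would also let the unit tests check the exact bytes sent for a given batch.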