SC-20509 #4

Open
wants to merge 52 commits into
base: add-sematext-exporter
Changes from 41 commits
7098a62
Start working on structure for logs
AkhigbeEromo Nov 19, 2024
2160a71
Work on logs config
AkhigbeEromo Nov 22, 2024
c0ca7cb
Removed LogStash settings
AkhigbeEromo Nov 22, 2024
9179c5e
This is what our logs config will look like
AkhigbeEromo Nov 22, 2024
915f691
Add queue settings to log config
AkhigbeEromo Nov 22, 2024
3d8390a
Correct Metrics config
AkhigbeEromo Nov 22, 2024
cf6f43e
remove unnecessary things
AkhigbeEromo Nov 22, 2024
e86c09f
complete Logs Endpoint
AkhigbeEromo Nov 22, 2024
02d5643
A lot of experimentation (Still in progress) is being done, so code i…
AkhigbeEromo Nov 27, 2024
a94b20b
Fixed some typos
AkhigbeEromo Nov 28, 2024
ff00870
Another heavy commit
AkhigbeEromo Nov 29, 2024
a8b67a6
First test failed so i had to do some tinkering and refining and put …
AkhigbeEromo Dec 2, 2024
17c525b
Add hostname tag
AkhigbeEromo Dec 3, 2024
e0b95cd
Clean up code
AkhigbeEromo Dec 3, 2024
5cfc6e1
Work on config.go test
AkhigbeEromo Dec 4, 2024
9ad149b
Add tests for logs support part of writer.go
AkhigbeEromo Dec 4, 2024
e999c1f
Write tests for falt_formatter.go
AkhigbeEromo Dec 4, 2024
5303ef0
Fix lint issues
AkhigbeEromo Dec 4, 2024
14d74d2
Fix lint issues
AkhigbeEromo Dec 5, 2024
d3bb694
Remove unnecessary alias
AkhigbeEromo Dec 5, 2024
b31b205
Add mapstructure for write events
AkhigbeEromo Dec 5, 2024
886b1ac
Fix otel issues
AkhigbeEromo Dec 5, 2024
17e4f25
Fix more lint issues
AkhigbeEromo Dec 5, 2024
8260265
Fix CI issues
AkhigbeEromo Dec 5, 2024
75725c0
App token is not being passed as ._index
AkhigbeEromo Dec 6, 2024
4513518
Fix CI issues, removed "custom" from being an option in the regions
AkhigbeEromo Dec 9, 2024
27ceed9
Fix CI issues
AkhigbeEromo Dec 9, 2024
9bd25b1
Fix lint issues
AkhigbeEromo Dec 9, 2024
5035763
Fix CI issues
AkhigbeEromo Dec 9, 2024
ef17936
Update gendistributions
AkhigbeEromo Dec 9, 2024
8bdc051
Remove unnecessary error return value
AkhigbeEromo Dec 9, 2024
11ba415
Make anchor links lowercase
AkhigbeEromo Dec 9, 2024
91c7494
Fix anchor links
AkhigbeEromo Dec 9, 2024
8be0231
Fix anchor link
AkhigbeEromo Dec 9, 2024
cde2d8f
remove links
AkhigbeEromo Dec 9, 2024
ddd7911
Write more unit tests
AkhigbeEromo Dec 10, 2024
7b06688
Remove unused prameter
AkhigbeEromo Dec 10, 2024
5978836
Fix lint issues
AkhigbeEromo Dec 12, 2024
acfa996
Implement fix that Akshat suggested
AkhigbeEromo Dec 18, 2024
7bc9caf
Correct endpoints
AkhigbeEromo Dec 18, 2024
d4a6453
Fix lint issues
AkhigbeEromo Dec 18, 2024
1fbb2bc
Remove getHostName
AkhigbeEromo Dec 19, 2024
b0c1208
Remove deprecated library
AkhigbeEromo Dec 23, 2024
f087bd5
Replace logrus with zap
AkhigbeEromo Dec 23, 2024
748519a
Fix lint issues
AkhigbeEromo Dec 23, 2024
8c698d3
Add more tests
AkhigbeEromo Dec 24, 2024
de4f331
Remove logrus
AkhigbeEromo Dec 27, 2024
63406b6
Handle errors
AkhigbeEromo Dec 27, 2024
814b006
Fix issue with writer
AkhigbeEromo Dec 27, 2024
032d36e
Remove flatwriter
AkhigbeEromo Dec 27, 2024
3f84e20
Fix lint issues
AkhigbeEromo Dec 27, 2024
5e33287
Remove unnecessary things from exporter
AkhigbeEromo Dec 27, 2024
13 changes: 11 additions & 2 deletions exporter/sematextexporter/README.md
@@ -1,5 +1,14 @@
# Sematext Exporter
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [development]: metrics, logs |
| Distributions | [contrib] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aexporter%2Fsematext%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aexporter%2Fsematext) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aexporter%2Fsematext%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aexporter%2Fsematext) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@AkhigbeEromo](https://www.github.com/AkhigbeEromo) |

[development]: https://github.com/open-telemetry/opentelemetry-collector#development
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
<!-- end autogenerated section -->

This exporter supports sending metrics to [Sematext Cloud](https://sematext.com/) in Influx line protocol format
@@ -15,11 +24,11 @@ The following configuration options are supported:
* `payload_max_lines` (default = 1_000) Maximum number of lines allowed per HTTP POST request
* `payload_max_bytes` (default = 300_000) Maximum number of bytes allowed per HTTP POST request
* `metrics_schema` (default = telegraf-prometheus-v2) The chosen metrics schema to write
- * `sending_queue` [details here](https://github.com/open-telemetry/opentelemetry-collector/blob/v0.25.0/exporter/exporterhelper/README.md#configuration)
+ * `sending_queue`
* `enabled` (default = true)
* `num_consumers` (default = 10) The number of consumers from the queue
* `queue_size` (default = 1000) Maximum number of batches allowed in queue at a given time
- * `retry_on_failure` [details here](https://github.com/open-telemetry/opentelemetry-collector/blob/v0.25.0/exporter/exporterhelper/README.md#configuration)
+ * `retry_on_failure`
* `enabled` (default = true)
* `initial_interval` (default = 5s) Time to wait after the first failure before retrying
* `max_interval` (default = 30s) Upper bound on backoff interval
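As a rough illustration of how `initial_interval` and `max_interval` interact, the sketch below grows the wait by a fixed multiplier and caps it at the upper bound. The `backoffSchedule` helper and the multiplier passed to it are assumptions made for illustration; the collector's retry logic also applies a randomization factor, which is left out here.

```go
package main

import "time"

// backoffSchedule is a hypothetical helper that lists the waits before each
// of the first `attempts` retries. It starts at `initial`, multiplies the
// wait after every attempt, and never exceeds `max`. Jitter is omitted.
func backoffSchedule(initial, max time.Duration, multiplier float64, attempts int) []time.Duration {
	waits := make([]time.Duration, 0, attempts)
	cur := initial
	for i := 0; i < attempts; i++ {
		waits = append(waits, cur)
		cur = time.Duration(float64(cur) * multiplier)
		if cur > max {
			cur = max // max_interval is the upper bound on the backoff
		}
	}
	return waits
}
```

With the README defaults of 5s and 30s and an assumed multiplier of 2, the first four waits would be 5s, 10s, 20s, and 30s.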
60 changes: 47 additions & 13 deletions exporter/sematextexporter/config.go
@@ -6,12 +6,22 @@ package sematextexporter // import "github.com/open-telemetry/opentelemetry-coll
import (
"fmt"
"strings"
"sync/atomic"

"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

const (
euRegion = "eu"
usRegion = "us"
euMetricsEndpoint = "https://spm-receiver.eu.sematext.com"
euLogsEndpoint = "https://logsene-receiver.eu.sematext.com"
usMetricsEndpoint = "https://spm-receiver.sematext.com"
usLogsEndpoint = "https://logsene-receiver.sematext.com"
)

type Config struct {
confighttp.ClientConfig `mapstructure:",squash"`
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
@@ -20,37 +30,61 @@ type Config struct {
// - EU
// - US
Region string `mapstructure:"region"`
// MetricsConfig defines the configuration specific to metrics
MetricsConfig `mapstructure:"metrics"`
// LogsConfig defines the configuration specific to logs
LogsConfig `mapstructure:"logs"`
}

type MetricsConfig struct {
// App token is the token of Sematext Monitoring App to which you want to send the metrics.
AppToken string `mapstructure:"app_token"`
AppToken string `mapstructure:"app_token"`
// MetricsEndpoint specifies the endpoint for receiving metrics in Sematext
MetricsEndpoint string `mapstructure:"metrics_endpoint"`
QueueSettings exporterhelper.QueueConfig `mapstructure:"sending_queue"`
MetricsEndpoint string `mapstructure:"metrics_endpoint"`
QueueSettings exporterhelper.QueueConfig `mapstructure:"sending_queue"`
// MetricsSchema indicates the metrics schema to emit to line protocol.
// Default: telegraf-prometheus-v2
MetricsSchema string `mapstructure:"metrics_schema"`
MetricsSchema string `mapstructure:"metrics_schema"`
// PayloadMaxLines is the maximum number of line protocol lines to POST in a single request.
PayloadMaxLines int `mapstructure:"payload_max_lines"`
PayloadMaxLines int `mapstructure:"payload_max_lines"`
// PayloadMaxBytes is the maximum number of line protocol bytes to POST in a single request.
PayloadMaxBytes int `mapstructure:"payload_max_bytes"`
PayloadMaxBytes int `mapstructure:"payload_max_bytes"`
}
type LogsConfig struct {
// App token is the token of Sematext Monitoring App to which you want to send the logs.
AppToken string `mapstructure:"app_token"`
// LogsEndpoint specifies the endpoint for receiving logs in Sematext
LogsEndpoint string `mapstructure:"logs_endpoint"`
// LogRequests determines whether request tracking is enabled
LogRequests bool `mapstructure:"logs_requests"`
// LogMaxAge is the max number of days to retain old log files
LogMaxAge int `mapstructure:"logs_max_age"`
// LogMaxBackups is the maximum number of old log files to retain.
LogMaxBackups int `mapstructure:"logs_max_backups"`
// LogMaxSize is the maximum size in megabytes of the log file before it gets rotated
LogMaxSize int `mapstructure:"logs_max_size"`
// WriteEvents determines if events are logged
WriteEvents atomic.Bool `mapstructure:"write_events"`
Member
Question: Why is WriteEvents an atomic.Bool? Is it something that can be updated in multiple threads concurrently?

Collaborator Author
So this was what was defined in the agent so I literally copied and pasted

Member
Read up on Atomic Counters in Go. It'll help you make a decision if this is really required or not

Collaborator Author
Just read up on it and yes, I believe it's required.

}
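The thread on `WriteEvents` comes down to whether the flag is ever changed while other goroutines read it. A minimal sketch of the case where `atomic.Bool` is the right tool, using a hypothetical `eventToggle` type that is not part of this PR: a flag that can be flipped at runtime while workers check it. If the value is only set once when the config is loaded, a plain `bool` would likely be enough; the source does not settle which case applies to the exporter.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// eventToggle is a hypothetical stand-in for a runtime flag such as WriteEvents.
type eventToggle struct {
	enabled atomic.Bool
}

// countWrites starts `workers` goroutines that each check the flag once and
// returns how many of them saw it enabled. Load is safe to call concurrently
// with Store, which is the guarantee a plain bool does not give.
func countWrites(t *eventToggle, workers int) int64 {
	var seen atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if t.enabled.Load() {
				seen.Add(1)
			}
		}()
	}
	wg.Wait()
	return seen.Load()
}
```

One design point worth weighing: a struct that contains an `atomic.Bool` should be passed by pointer, because the atomic types are not meant to be copied after first use.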

// Validate checks for invalid or missing entries in the configuration.
func (cfg *Config) Validate() error {
- if strings.ToLower(cfg.Region) != "eu" && strings.ToLower(cfg.Region) != "us" && strings.ToLower(cfg.Region) != "custom"{
+ if strings.ToLower(cfg.Region) != euRegion && strings.ToLower(cfg.Region) != usRegion {
return fmt.Errorf("invalid region: %s. please use either 'EU' or 'US'", cfg.Region)
}
- if len(cfg.AppToken) != 36 {
- return fmt.Errorf("invalid app_token: %s. app_token should be 36 characters", cfg.AppToken)
+ if len(cfg.MetricsConfig.AppToken) != 36 {
+ return fmt.Errorf("invalid metrics app_token: %s. app_token should be 36 characters", cfg.MetricsConfig.AppToken)
}
+ if len(cfg.LogsConfig.AppToken) != 36 {
+ return fmt.Errorf("invalid logs app_token: %s. app_token should be 36 characters", cfg.LogsConfig.AppToken)
+ }
- if strings.ToLower(cfg.Region) == "eu" {
- cfg.MetricsEndpoint ="https://spm-receiver.eu.sematext.com"
+ if strings.ToLower(cfg.Region) == euRegion {
+ cfg.MetricsEndpoint = euMetricsEndpoint
+ cfg.LogsEndpoint = euLogsEndpoint
}
- if strings.ToLower(cfg.Region) == "us"{
- cfg.MetricsEndpoint ="https://spm-receiver.sematext.com"
+ if strings.ToLower(cfg.Region) == usRegion {
+ cfg.MetricsEndpoint = usMetricsEndpoint
+ cfg.LogsEndpoint = usLogsEndpoint
}

return nil
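`Validate` lowercases `cfg.Region` in every comparison and then sets both endpoints per region. One possible alternative shape, sketched here with hypothetical names (`endpoints`, `regionEndpoints`, `resolveRegion`) and the receiver URLs from the const block in config.go, is a single map lookup that both validates the region and yields its endpoints.

```go
package main

import (
	"fmt"
	"strings"
)

// endpoints pairs the metrics and logs receivers for one region.
type endpoints struct {
	metrics string
	logs    string
}

// regionEndpoints mirrors the URLs declared as constants in config.go.
var regionEndpoints = map[string]endpoints{
	"eu": {metrics: "https://spm-receiver.eu.sematext.com", logs: "https://logsene-receiver.eu.sematext.com"},
	"us": {metrics: "https://spm-receiver.sematext.com", logs: "https://logsene-receiver.sematext.com"},
}

// resolveRegion is a hypothetical helper: it lowercases the region once and
// returns its endpoints, or an error for anything other than EU or US.
func resolveRegion(region string) (endpoints, error) {
	ep, ok := regionEndpoints[strings.ToLower(region)]
	if !ok {
		return endpoints{}, fmt.Errorf("invalid region: %s. please use either 'EU' or 'US'", region)
	}
	return ep, nil
}
```

Adding a region would then mean adding one map entry rather than another pair of `if` blocks.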
18 changes: 13 additions & 5 deletions exporter/sematextexporter/config_test.go
@@ -39,8 +39,8 @@ func TestLoadConfig(t *testing.T) {
id: component.NewIDWithName(metadata.Type, "override-config"),
expected: &Config{
ClientConfig: confighttp.ClientConfig{
Timeout: 500 * time.Millisecond,
Headers: map[string]configopaque.String{"User-Agent": "OpenTelemetry -> Sematext"},
Timeout: 500 * time.Millisecond,
Headers: map[string]configopaque.String{"User-Agent": "OpenTelemetry -> Sematext"},
},
MetricsConfig: MetricsConfig{
MetricsEndpoint: "https://spm-receiver.sematext.com",
@@ -49,12 +49,20 @@ func TestLoadConfig(t *testing.T) {
NumConsumers: 3,
QueueSize: 10,
},
AppToken: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
AppToken: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
MetricsSchema: "telegraf-prometheus-v2",
PayloadMaxLines: 72,
PayloadMaxBytes: 27,
},

LogsConfig: LogsConfig{
AppToken: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
LogsEndpoint: "https://logsene-receiver.sematext.com",
LogRequests: true,
LogMaxAge: 2,
LogMaxBackups: 10,
LogMaxSize: 10,
},

BackOffConfig: configretry.BackOffConfig{
Enabled: true,
InitialInterval: 1 * time.Second,
@@ -63,7 +71,7 @@ func TestLoadConfig(t *testing.T) {
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
},
Region: "US",
Region: "US",
},
},
}
148 changes: 148 additions & 0 deletions exporter/sematextexporter/es.go
@@ -0,0 +1,148 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package sematextexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sematextexporter"
import (
"fmt"
"os"
"reflect"
"strings"
"time"

json "github.com/json-iterator/go"
"github.com/olivere/elastic/v7"
Member
Still using a deprecated lib. Replace with https://github.com/elastic/go-elasticsearch

Collaborator Author
Done

"github.com/sirupsen/logrus"
Member
"github.com/sirupsen/logrus" is in maintenance mode. Lets use something that is being used by other exporters as well like "go.uber.org/zap". See them for the appropriate usage.

Collaborator Author
Done

"golang.org/x/net/context"
)

// artificialDocType designates a synthetic doc type for ES documents
const artificialDocType = "_doc"

type group struct {
client *elastic.Client
token string
}

type client struct {
clients map[string]group
config *Config
logger *logrus.Logger
writer FlatWriter
hostname string
}

// Client represents a minimal interface client implementation has to satisfy.
type Client interface {
Bulk(body any, config *Config) error
}

// newClient creates a new instance of the ES client that internally stores a reference
// to both event and log receivers.
func newClient(config *Config, logger *logrus.Logger, writer FlatWriter) (Client, error) {
clients := make(map[string]group)

// client for shipping to logsene
if config.LogsConfig.AppToken != "" {
c, err := elastic.NewClient(elastic.SetURL(config.LogsEndpoint), elastic.SetSniff(false), elastic.SetHealthcheckTimeout(time.Second*2))
if err != nil {
return nil, err
}
defer c.Stop()
clients[config.LogsEndpoint] = group{
client: c,
token: config.LogsConfig.AppToken,
}
}
hostname := getHostname()

return &client{
clients: clients,
config: config,
logger: logger,
writer: writer,
hostname: hostname,
}, nil
}

// Bulk processes a batch of documents and sends them to the specified LogsEndpoint.
func (c *client) Bulk(body any, config *Config) error {
if grp, ok := c.clients[config.LogsEndpoint]; ok {
bulkRequest := grp.client.Bulk()
if reflect.TypeOf(body).Kind() == reflect.Slice {
Member
Is using reflect really necessary here?

Collaborator Author
@AkhigbeEromo Dec 18, 2024
Yes. But what would you suggest? I tried making changes and it broke my code.

v := reflect.ValueOf(body)
for i := 0; i < v.Len(); i++ {
doc := v.Index(i).Interface()
if docMap, ok := doc.(map[string]any); ok {
docMap["os.host"] = c.hostname
}

req := elastic.NewBulkIndexRequest().
Index(grp.token).
Type(artificialDocType).
Doc(doc)
bulkRequest.Add(req)
}
}

if bulkRequest.NumberOfActions() > 0 {
payloadBytes, err := json.Marshal(body)
if err != nil {
return fmt.Errorf("failed to serialize payload: %w", err)
}

// Print or log the payload (will delete this once everything is good)
fmt.Printf("Payload being sent to Sematext:\n%s\n", string(payloadBytes))

if c.config.LogRequests {
c.logger.Infof("Sending bulk to %s", config.LogsEndpoint)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
res, err := bulkRequest.Do(ctx)
if err != nil {
c.writePayload(string(payloadBytes), err.Error())
return err
}
if res.Errors {
for _, item := range res.Failed() {
if item.Error != nil {
c.logger.Errorf("Document %s failed to index: %s - %s", item.Id, item.Error.Type, item.Error.Reason)
}
}
}

c.writePayload(string(payloadBytes), "200")
return nil
}
}
return fmt.Errorf("no client known for %s endpoint", config.LogsEndpoint)
}
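On the question of whether `reflect` is needed in `Bulk`: if the callers only ever pass a small, known set of slice types, a type switch can do the same tagging without reflection. The sketch below assumes the body is either `[]map[string]any` or `[]any`; that assumption and the `tagDocs` name are illustrative and not taken from the PR.

```go
package main

// tagDocs is a hypothetical helper that adds the "os.host" tag to every map
// document in a batch without using reflect. It returns the documents as a
// []any and reports whether body had one of the supported slice types.
func tagDocs(body any, hostname string) ([]any, bool) {
	switch docs := body.(type) {
	case []map[string]any:
		out := make([]any, 0, len(docs))
		for _, d := range docs {
			if d != nil { // writing to a nil map would panic
				d["os.host"] = hostname
			}
			out = append(out, d)
		}
		return out, true
	case []any:
		for _, d := range docs {
			if m, ok := d.(map[string]any); ok && m != nil {
				m["os.host"] = hostname
			}
		}
		return docs, true
	default:
		return nil, false
	}
}
```

The trade-off is that `reflect` accepts any slice type, while the type switch only accepts the listed ones and makes unsupported input an explicit case.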

// writePayload writes a formatted payload along with its status to the configured writer.
func (c *client) writePayload(payload string, status string) {
if c.config.WriteEvents.Load() {
c.writer.Write(formatl(payload, status))
} else {
c.logger.Debugf("WriteEvents disabled. Payload: %s, Status: %s", payload, status)
}
}

// formatl delimits and formats the response returned by receiver.
func formatl(payload string, status string) string {
s := strings.TrimLeft(status, "\n")
i := strings.Index(s, "\n")
if i > 0 {
s = fmt.Sprintf("%s...", s[:i])
}
return fmt.Sprintf("%s %s", strings.TrimSpace(payload), s)
}
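A small usage sketch of `formatl`, with the function reproduced from the diff so that the block compiles on its own. The sample payloads and status strings in the note that follows are made up for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// formatl is copied from es.go: it trims the payload, keeps only the first
// line of the status (marking the cut with "..."), and joins the two.
func formatl(payload string, status string) string {
	s := strings.TrimLeft(status, "\n")
	i := strings.Index(s, "\n")
	if i > 0 {
		s = fmt.Sprintf("%s...", s[:i])
	}
	return fmt.Sprintf("%s %s", strings.TrimSpace(payload), s)
}
```

For example, `formatl("doc", "200")` returns `doc 200`, and `formatl("  {\"a\":1}\n", "\nbad request\ndetails")` returns `{"a":1} bad request...`, since everything after the first line of a multi-line status is dropped.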

// getHostname retrieves the current machine's hostname.
func getHostname() string {
hostname, err := os.Hostname()
if err != nil {
return "None"
}
return hostname
}
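The `Bulk` method in es.go indexes every document under the app token. As a point of reference for the move away from `olivere/elastic`, the body of an Elasticsearch bulk request is newline-delimited JSON: an action line followed by the document, each terminated by a newline. The sketch below builds such a body with only the standard library. `buildBulkBody` and `bulkAction` are hypothetical names, and whether the Sematext receiver accepts exactly this shape is an assumption to verify against its documentation.

```go
package main

import (
	"bytes"
	"encoding/json"
)

// bulkAction is the metadata line that precedes each document in a bulk body.
type bulkAction struct {
	Index struct {
		Index string `json:"_index"`
	} `json:"index"`
}

// buildBulkBody is a hypothetical helper that renders docs as a bulk request
// body, using the app token as the target index for every document.
func buildBulkBody(appToken string, docs []map[string]any) ([]byte, error) {
	var action bulkAction
	action.Index.Index = appToken

	var buf bytes.Buffer
	enc := json.NewEncoder(&buf) // Encode writes a trailing "\n" after each value
	for _, doc := range docs {
		if err := enc.Encode(action); err != nil {
			return nil, err
		}
		if err := enc.Encode(doc); err != nil {
			return nil, err
		}
	}
	return buf.Bytes(), nil
}
```

Building the body this way also means the exact bytes that are sent are available for debug logging without a second `json.Marshal` of the batch.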
akshatagarwl marked this conversation as resolved.