-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SC-20509 #4
base: add-sematext-exporter
Are you sure you want to change the base?
SC-20509 #4
Changes from 41 commits
7098a62
2160a71
c0ca7cb
9179c5e
915f691
3d8390a
cf6f43e
e86c09f
02d5643
a94b20b
ff00870
a8b67a6
17c525b
e0b95cd
5cfc6e1
9ad149b
e999c1f
5303ef0
14d74d2
d3bb694
b31b205
886b1ac
17e4f25
8260265
75725c0
4513518
27ceed9
9bd25b1
5035763
ef17936
8bdc051
11ba415
91c7494
8be0231
cde2d8f
ddd7911
7b06688
5978836
acfa996
7bc9caf
d4a6453
1fbb2bc
b0c1208
f087bd5
748519a
8c698d3
de4f331
63406b6
814b006
032d36e
3f84e20
5e33287
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package sematextexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sematextexporter" | ||
import ( | ||
"fmt" | ||
"os" | ||
"reflect" | ||
"strings" | ||
"time" | ||
|
||
json "github.com/json-iterator/go" | ||
"github.com/olivere/elastic/v7" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Still using a deprecated lib. Replace with https://github.com/elastic/go-elasticsearch There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
"github.com/sirupsen/logrus" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "github.com/sirupsen/logrus" is in maintenance mode. Lets use something that is being used by other exporters as well like "go.uber.org/zap". See them for the appropriate usage. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
"golang.org/x/net/context" | ||
) | ||
|
||
// artificialDocType designates a syntenic doc type for ES documents | ||
const artificialDocType = "_doc" | ||
|
||
type group struct { | ||
client *elastic.Client | ||
token string | ||
} | ||
|
||
type client struct { | ||
clients map[string]group | ||
config *Config | ||
logger *logrus.Logger | ||
writer FlatWriter | ||
hostname string | ||
} | ||
|
||
// Client represents a minimal interface client implementation has to satisfy. | ||
type Client interface { | ||
Bulk(body any, config *Config) error | ||
} | ||
|
||
// NewClient creates a new instance of ES client that internally stores a reference | ||
// to both, event and log receivers. | ||
func newClient(config *Config, logger *logrus.Logger, writer FlatWriter) (Client, error) { | ||
clients := make(map[string]group) | ||
|
||
// client for shipping to logsene | ||
if config.LogsConfig.AppToken != "" { | ||
c, err := elastic.NewClient(elastic.SetURL(config.LogsEndpoint), elastic.SetSniff(false), elastic.SetHealthcheckTimeout(time.Second*2)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
defer c.Stop() | ||
clients[config.LogsEndpoint] = group{ | ||
client: c, | ||
token: config.LogsConfig.AppToken, | ||
} | ||
} | ||
hostname := getHostname() | ||
|
||
return &client{ | ||
clients: clients, | ||
config: config, | ||
logger: logger, | ||
writer: writer, | ||
hostname: hostname, | ||
}, nil | ||
} | ||
|
||
// Bulk processes a batch of documents and sends them to the specified LogsEndpoint. | ||
func (c *client) Bulk(body any, config *Config) error { | ||
if grp, ok := c.clients[config.LogsEndpoint]; ok { | ||
bulkRequest := grp.client.Bulk() | ||
if reflect.TypeOf(body).Kind() == reflect.Slice { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is using reflect really necessary here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes |
||
v := reflect.ValueOf(body) | ||
for i := 0; i < v.Len(); i++ { | ||
doc := v.Index(i).Interface() | ||
if docMap, ok := doc.(map[string]any); ok { | ||
docMap["os.host"] = c.hostname | ||
} | ||
|
||
req := elastic.NewBulkIndexRequest(). | ||
Index(grp.token). | ||
Type(artificialDocType). | ||
Doc(doc) | ||
bulkRequest.Add(req) | ||
} | ||
} | ||
|
||
if bulkRequest.NumberOfActions() > 0 { | ||
payloadBytes, err := json.Marshal(body) | ||
if err != nil { | ||
return fmt.Errorf("failed to serialize payload: %w", err) | ||
} | ||
|
||
// Print or log the payload(Will delete this once everything is good) | ||
fmt.Printf("Payload being sent to Sematext:\n%s\n", string(payloadBytes)) | ||
|
||
if c.config.LogRequests { | ||
c.logger.Infof("Sending bulk to %s", config.LogsEndpoint) | ||
} | ||
|
||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) | ||
defer cancel() | ||
res, err := bulkRequest.Do(ctx) | ||
if err != nil { | ||
c.writePayload(string(payloadBytes), err.Error()) | ||
return err | ||
} | ||
if res.Errors { | ||
for _, item := range res.Failed() { | ||
if item.Error != nil { | ||
c.logger.Errorf("Document %s failed to index: %s - %s", item.Id, item.Error.Type, item.Error.Reason) | ||
} | ||
} | ||
} | ||
|
||
c.writePayload(string(payloadBytes), "200") | ||
return nil | ||
} | ||
} | ||
return fmt.Errorf("no client known for %s endpoint", config.LogsEndpoint) | ||
} | ||
|
||
// writePayload writes a formatted payload along with its status to the configured writer. | ||
func (c *client) writePayload(payload string, status string) { | ||
if c.config.WriteEvents.Load() { | ||
c.writer.Write(formatl(payload, status)) | ||
} else { | ||
c.logger.Debugf("WriteEvents disabled. Payload: %s, Status: %s", payload, status) | ||
} | ||
} | ||
|
||
// Formatl delimits and formats the response returned by receiver. | ||
func formatl(payload string, status string) string { | ||
s := strings.TrimLeft(status, "\n") | ||
i := strings.Index(s, "\n") | ||
if i > 0 { | ||
s = fmt.Sprintf("%s...", s[:i]) | ||
} | ||
return fmt.Sprintf("%s %s", strings.TrimSpace(payload), s) | ||
} | ||
|
||
// getHostname retrieves the current machine's hostname. | ||
func getHostname() string { | ||
hostname, err := os.Hostname() | ||
if err != nil { | ||
return "None" | ||
} | ||
return hostname | ||
} | ||
akshatagarwl marked this conversation as resolved.
Show resolved
Hide resolved
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: Why is WriteEvents an
atomic.Bool
? Is it something that can be updated in multiple threads concurrently?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this was what was defined in the agent so I literally copied and pasted
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Read up on Atomic Counters in Go. It'll help you make a decision if this is really required or not
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just read up on it and yes I believe its required