Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/prometheusremotewriteexporter] Retry on 5xx status codes using cenkalti/backoff client #23842

Merged
merged 26 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
79d41dd
Add chlog
vasireddy99 Jul 25, 2023
2630d06
Use retryablehttp client
vasireddy99 Jul 27, 2023
31ff420
Modify Test case to count retry attempts
vasireddy99 Jul 27, 2023
8df157c
Use require.NoError
vasireddy99 Jul 28, 2023
4242c3b
Rebased
vasireddy99 Jul 31, 2023
fc6099e
Use retry settings from cfg
vasireddy99 Aug 3, 2023
f03e816
Use cenkalti/backoff for retry
vasireddy99 Aug 10, 2023
243f3ac
Add 4xx test case
vasireddy99 Aug 10, 2023
188c426
Add Retry in testcase to run faster
vasireddy99 Aug 10, 2023
aa9d7bb
Pass retry settings inline
vasireddy99 Aug 11, 2023
e5fa7e9
Wrap to permanent error
vasireddy99 Aug 11, 2023
fe660eb
Return Error
vasireddy99 Aug 15, 2023
aeadb69
Merge branch 'main' into prw5xx
vasireddy99 Aug 15, 2023
9c8840c
Merge branch 'main' into prw5xx
Aneurysm9 Aug 15, 2023
ee0b776
Merge branch 'main' into prw5xx
vasireddy99 Aug 18, 2023
5d08920
Merge branch 'main' into prw5xx
vasireddy99 Aug 18, 2023
80f484a
Merge branch 'main' into prw5xx
Aneurysm9 Aug 18, 2023
8261cfc
Merge branch 'main' into prw5xx
vasireddy99 Aug 21, 2023
70d7dc3
Merge branch 'main' into prw5xx
vasireddy99 Aug 21, 2023
6751b22
Remove withRetry from exporterhelper
vasireddy99 Aug 22, 2023
51ae3d2
Merge branch 'main' into prw5xx
vasireddy99 Aug 22, 2023
43341dd
Merge branch 'main' into prw5xx
vasireddy99 Aug 23, 2023
311966b
Merge branch 'main' into prw5xx
vasireddy99 Aug 28, 2023
fb3c596
Merge branch 'main' into prw5xx
vasireddy99 Aug 28, 2023
eca1af4
Merge branch 'main' into prw5xx
Aneurysm9 Aug 29, 2023
cb139d3
Merge branch 'main' into prw5xx
Aneurysm9 Aug 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .chloggen/prw5xx.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusremotewriteexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Retry on 5xx status codes using `cenkalti/backoff` client

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [20304]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
89 changes: 57 additions & 32 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ import (
"strings"
"sync"

"github.com/cenkalti/backoff/v4"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/prompb"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/multierr"

Expand All @@ -41,6 +43,7 @@ type prwExporter struct {
userAgentHeader string
clientSettings *confighttp.HTTPClientSettings
settings component.TelemetrySettings
retrySettings exporterhelper.RetrySettings

wal *prweWAL
exporterSettings prometheusremotewrite.Settings
Expand Down Expand Up @@ -68,6 +71,7 @@ func newPRWExporter(cfg *Config, set exporter.CreateSettings) (*prwExporter, err
concurrency: cfg.RemoteWriteQueue.NumConsumers,
clientSettings: &cfg.HTTPClientSettings,
settings: set.TelemetrySettings,
retrySettings: cfg.RetrySettings,
vasireddy99 marked this conversation as resolved.
Show resolved Hide resolved
exporterSettings: prometheusremotewrite.Settings{
Namespace: cfg.Namespace,
ExternalLabels: sanitizedLabels,
Expand Down Expand Up @@ -216,46 +220,67 @@ func (prwe *prwExporter) export(ctx context.Context, requests []*prompb.WriteReq
}

func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequest) error {
// Uses proto.Marshal to convert the WriteRequest into bytes array
data, err := proto.Marshal(writeReq)
if err != nil {
return consumererror.NewPermanent(err)
}
buf := make([]byte, len(data), cap(data))
compressedData := snappy.Encode(buf, data)
// Retry function for backoff
retryFunc := func() error {
// Uses proto.Marshal to convert the WriteRequest into bytes array
data, err := proto.Marshal(writeReq)
if err != nil {
return backoff.Permanent(consumererror.NewPermanent(err))
}
buf := make([]byte, len(data), cap(data))
compressedData := snappy.Encode(buf, data)

// Create the HTTP POST request to send to the endpoint
req, err := http.NewRequestWithContext(ctx, "POST", prwe.endpointURL.String(), bytes.NewReader(compressedData))
if err != nil {
return consumererror.NewPermanent(err)
// Create the HTTP POST request to send to the endpoint
req, err := http.NewRequestWithContext(ctx, "POST", prwe.endpointURL.String(), bytes.NewReader(compressedData))
if err != nil {
return backoff.Permanent(consumererror.NewPermanent(err))
}

// Add necessary headers specified by:
// https://cortexmetrics.io/docs/apis/#remote-api
req.Header.Add("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
req.Header.Set("User-Agent", prwe.userAgentHeader)

resp, err := prwe.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

// 2xx status code is considered a success
// 5xx errors are recoverable and the exporter should retry
// Reference for different behavior according to status code:
// https://github.com/prometheus/prometheus/pull/2552/files#diff-ae8db9d16d8057358e49d694522e7186
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
return nil
}

body, err := io.ReadAll(io.LimitReader(resp.Body, 256))
rerr := fmt.Errorf("remote write returned HTTP status %v; err = %w: %s", resp.Status, err, body)
if resp.StatusCode >= 500 && resp.StatusCode < 600 {
return rerr
}
return backoff.Permanent(consumererror.NewPermanent(rerr))
}

// Add necessary headers specified by:
// https://cortexmetrics.io/docs/apis/#remote-api
req.Header.Add("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
req.Header.Set("User-Agent", prwe.userAgentHeader)
// Use the BackOff instance to retry the func with exponential backoff.
err := backoff.Retry(retryFunc, &backoff.ExponentialBackOff{
InitialInterval: prwe.retrySettings.InitialInterval,
RandomizationFactor: prwe.retrySettings.RandomizationFactor,
Multiplier: prwe.retrySettings.Multiplier,
MaxInterval: prwe.retrySettings.MaxInterval,
MaxElapsedTime: prwe.retrySettings.MaxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
})

resp, err := prwe.client.Do(req)
if err != nil {
return consumererror.NewPermanent(err)
}
defer resp.Body.Close()

// 2xx status code is considered a success
// 5xx errors are recoverable and the exporter should retry
// Reference for different behavior according to status code:
// https://github.com/prometheus/prometheus/pull/2552/files#diff-ae8db9d16d8057358e49d694522e7186
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
return nil
}
body, err := io.ReadAll(io.LimitReader(resp.Body, 256))
rerr := fmt.Errorf("remote write returned HTTP status %v; err = %w: %s", resp.Status, err, body)
if resp.StatusCode >= 500 && resp.StatusCode < 600 {
return rerr
}
return consumererror.NewPermanent(rerr)
return err
}

func (prwe *prwExporter) walEnabled() bool { return prwe.wal != nil }
Expand Down
71 changes: 71 additions & 0 deletions exporter/prometheusremotewriteexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/url"
"sync"
"testing"
"time"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
Expand All @@ -22,6 +23,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/exporter/exportertest"
Expand Down Expand Up @@ -667,6 +669,13 @@ func Test_PushMetrics(t *testing.T) {

defer server.Close()

// Adjusted retry settings for faster testing
retrySettings := exporterhelper.RetrySettings{
Enabled: true,
InitialInterval: 100 * time.Millisecond, // Shorter initial interval
MaxInterval: 1 * time.Second, // Shorter max interval
MaxElapsedTime: 2 * time.Second, // Shorter max elapsed time
}
cfg := &Config{
Namespace: "",
HTTPClientSettings: confighttp.HTTPClientSettings{
Expand All @@ -682,6 +691,7 @@ func Test_PushMetrics(t *testing.T) {
CreatedMetric: &CreatedMetric{
Enabled: true,
},
RetrySettings: retrySettings,
}

if useWAL {
Expand Down Expand Up @@ -978,3 +988,64 @@ func TestWALOnExporterRoundTrip(t *testing.T) {
assert.Equal(t, want, gotFromUpload)
assert.Equal(t, gotFromWAL, gotFromUpload)
}

// TestRetryOn5xx checks that execute keeps retrying after 5xx responses and
// reports success once the server finally answers with a 2xx.
func TestRetryOn5xx(t *testing.T) {
	// The mock server answers the first four requests with a 500 and every
	// later request with a 200; failures counts the rejected requests.
	failures := 0
	handler := func(w http.ResponseWriter, _ *http.Request) {
		if failures >= 4 {
			w.WriteHeader(http.StatusOK)
			return
		}
		failures++
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
	}
	srv := httptest.NewServer(http.HandlerFunc(handler))
	defer srv.Close()

	target, err := url.Parse(srv.URL)
	require.NoError(t, err)

	// Only the fields read while sending a request are populated.
	prwe := &prwExporter{
		endpointURL: target,
		client:      http.DefaultClient,
	}

	// A single execute call must absorb all four 5xx responses and end
	// without an error.
	err = prwe.execute(context.Background(), &prompb.WriteRequest{})
	assert.NoError(t, err)
	assert.Equal(t, 4, failures)
}

// TestNoRetryOn4xx checks that a 4xx response is reported as a permanent
// error after exactly one request, i.e. without any retry.
func TestNoRetryOn4xx(t *testing.T) {
	// The mock server rejects the first request with a 400 and would accept
	// any later one, so an unexpected retry would turn the failure into a
	// success and trip the assertions below.
	rejected := 0
	handler := func(w http.ResponseWriter, _ *http.Request) {
		if rejected >= 1 {
			w.WriteHeader(http.StatusOK)
			return
		}
		rejected++
		http.Error(w, "Bad Request", http.StatusBadRequest)
	}
	srv := httptest.NewServer(http.HandlerFunc(handler))
	defer srv.Close()

	target, err := url.Parse(srv.URL)
	require.NoError(t, err)

	// Only the fields read while sending a request are populated.
	prwe := &prwExporter{
		endpointURL: target,
		client:      http.DefaultClient,
	}

	// The 4xx response must surface as a permanent error after one request.
	err = prwe.execute(context.Background(), &prompb.WriteRequest{})
	assert.Error(t, err)
	assert.True(t, consumererror.IsPermanent(err))
	assert.Equal(t, 1, rejected)
}
Loading