diff --git a/.chloggen/prw5xx.yaml b/.chloggen/prw5xx.yaml new file mode 100755 index 000000000000..9febfe6a2265 --- /dev/null +++ b/.chloggen/prw5xx.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. +# If your change doesn't affect end users, such as a test fix or a tooling change, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: prometheusremotewriteexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Retry on 5xx status codes using `cenkalti/backoff` client + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [20304] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index 296e13d3dac8..dd2899ade7f1 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -15,6 +15,7 @@ import ( "strings" "sync" + "github.com/cenkalti/backoff/v4" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" "github.com/prometheus/prometheus/prompb" @@ -22,6 +23,7 @@ import ( "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/multierr" @@ -41,6 +43,7 @@ type prwExporter struct { userAgentHeader string clientSettings *confighttp.HTTPClientSettings settings component.TelemetrySettings + retrySettings exporterhelper.RetrySettings wal *prweWAL exporterSettings prometheusremotewrite.Settings @@ -68,6 +71,7 @@ func newPRWExporter(cfg *Config, set exporter.CreateSettings) (*prwExporter, err concurrency: cfg.RemoteWriteQueue.NumConsumers, clientSettings: &cfg.HTTPClientSettings, settings: set.TelemetrySettings, + retrySettings: cfg.RetrySettings, exporterSettings: prometheusremotewrite.Settings{ Namespace: cfg.Namespace, ExternalLabels: sanitizedLabels, @@ -216,46 +220,67 @@ func (prwe *prwExporter) export(ctx context.Context, requests []*prompb.WriteReq } func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequest) error { - // Uses proto.Marshal to convert the WriteRequest into bytes array - data, err := proto.Marshal(writeReq) - if err != nil { - return consumererror.NewPermanent(err) - } - buf := make([]byte, len(data), cap(data)) - compressedData := snappy.Encode(buf, data) + // Retry function for backoff + retryFunc := func() error { + // Uses proto.Marshal to convert the WriteRequest into bytes array + data, err := proto.Marshal(writeReq) + if err != nil { + return backoff.Permanent(consumererror.NewPermanent(err)) + } + buf := make([]byte, len(data), cap(data)) + compressedData := snappy.Encode(buf, data) - // Create the HTTP POST request to send to the endpoint - req, err := http.NewRequestWithContext(ctx, "POST", prwe.endpointURL.String(), bytes.NewReader(compressedData)) - if err != nil { - return consumererror.NewPermanent(err) + // Create the HTTP POST request to send to the endpoint + req, err := http.NewRequestWithContext(ctx, "POST", prwe.endpointURL.String(), bytes.NewReader(compressedData)) + if err != nil { + return backoff.Permanent(consumererror.NewPermanent(err)) + } + + // Add necessary headers specified by: + // https://cortexmetrics.io/docs/apis/#remote-api + req.Header.Add("Content-Encoding", "snappy") + req.Header.Set("Content-Type", "application/x-protobuf") + req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") + req.Header.Set("User-Agent", prwe.userAgentHeader) + + resp, err := prwe.client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + // 2xx status code is considered a success + // 5xx errors are recoverable and the exporter should retry + // Reference for different behavior according to status code: + // https://github.com/prometheus/prometheus/pull/2552/files#diff-ae8db9d16d8057358e49d694522e7186 + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + return nil + } + + body, err := io.ReadAll(io.LimitReader(resp.Body, 256)) + rerr := fmt.Errorf("remote write returned HTTP status %v; err = %w: %s", resp.Status, err, body) + if resp.StatusCode >= 500 && resp.StatusCode < 600 { + return rerr + } + return backoff.Permanent(consumererror.NewPermanent(rerr)) } - // Add necessary headers specified by: - // https://cortexmetrics.io/docs/apis/#remote-api - req.Header.Add("Content-Encoding", "snappy") - req.Header.Set("Content-Type", "application/x-protobuf") - req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") - req.Header.Set("User-Agent", prwe.userAgentHeader) + // Use the BackOff instance to retry the func with exponential backoff. + err := backoff.Retry(retryFunc, &backoff.ExponentialBackOff{ + InitialInterval: prwe.retrySettings.InitialInterval, + RandomizationFactor: prwe.retrySettings.RandomizationFactor, + Multiplier: prwe.retrySettings.Multiplier, + MaxInterval: prwe.retrySettings.MaxInterval, + MaxElapsedTime: prwe.retrySettings.MaxElapsedTime, + Stop: backoff.Stop, + Clock: backoff.SystemClock, + }) - resp, err := prwe.client.Do(req) if err != nil { return consumererror.NewPermanent(err) } - defer resp.Body.Close() - // 2xx status code is considered a success - // 5xx errors are recoverable and the exporter should retry - // Reference for different behavior according to status code: - // https://github.com/prometheus/prometheus/pull/2552/files#diff-ae8db9d16d8057358e49d694522e7186 - if resp.StatusCode >= 200 && resp.StatusCode < 300 { - return nil - } - body, err := io.ReadAll(io.LimitReader(resp.Body, 256)) - rerr := fmt.Errorf("remote write returned HTTP status %v; err = %w: %s", resp.Status, err, body) - if resp.StatusCode >= 500 && resp.StatusCode < 600 { - return rerr - } - return consumererror.NewPermanent(rerr) + return err } func (prwe *prwExporter) walEnabled() bool { return prwe.wal != nil } diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go index 35ff6028e427..476f2e2c44d0 100644 --- a/exporter/prometheusremotewriteexporter/exporter_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_test.go @@ -11,6 +11,7 @@ import ( "net/url" "sync" "testing" + "time" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" @@ -22,6 +23,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configtls" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/exporter/exportertest" @@ -667,6 +669,13 @@ func Test_PushMetrics(t *testing.T) { defer server.Close() + // Adjusted retry settings for faster testing + retrySettings := exporterhelper.RetrySettings{ + Enabled: true, + InitialInterval: 100 * time.Millisecond, // Shorter initial interval + MaxInterval: 1 * time.Second, // Shorter max interval + MaxElapsedTime: 2 * time.Second, // Shorter max elapsed time + } cfg := &Config{ Namespace: "", HTTPClientSettings: confighttp.HTTPClientSettings{ @@ -682,6 +691,7 @@ func Test_PushMetrics(t *testing.T) { CreatedMetric: &CreatedMetric{ Enabled: true, }, + RetrySettings: retrySettings, } if useWAL { @@ -978,3 +988,64 @@ func TestWALOnExporterRoundTrip(t *testing.T) { assert.Equal(t, want, gotFromUpload) assert.Equal(t, gotFromWAL, gotFromUpload) } + +func TestRetryOn5xx(t *testing.T) { + // Create a mock HTTP server with a counter to simulate a 5xx error on the first attempt and a 2xx success on the second attempt + attempts := 0 + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if attempts < 4 { + attempts++ + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + } else { + w.WriteHeader(http.StatusOK) + } + })) + defer mockServer.Close() + + endpointURL, err := url.Parse(mockServer.URL) + require.NoError(t, err) + + // Create the prwExporter + exporter := &prwExporter{ + endpointURL: endpointURL, + client: http.DefaultClient, + } + + ctx := context.Background() + + // Execute the write request and verify that the exporter returns a non-permanent error on the first attempt. + err = exporter.execute(ctx, &prompb.WriteRequest{}) + assert.NoError(t, err) + assert.Equal(t, 4, attempts) +} + +func TestNoRetryOn4xx(t *testing.T) { + // Create a mock HTTP server with a counter to simulate a 4xx error + attempts := 0 + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if attempts < 1 { + attempts++ + http.Error(w, "Bad Request", http.StatusBadRequest) + } else { + w.WriteHeader(http.StatusOK) + } + })) + defer mockServer.Close() + + endpointURL, err := url.Parse(mockServer.URL) + require.NoError(t, err) + + // Create the prwExporter + exporter := &prwExporter{ + endpointURL: endpointURL, + client: http.DefaultClient, + } + + ctx := context.Background() + + // Execute the write request and verify that the exporter returns an error due to the 4xx response. + err = exporter.execute(ctx, &prompb.WriteRequest{}) + assert.Error(t, err) + assert.True(t, consumererror.IsPermanent(err)) + assert.Equal(t, 1, attempts) +} diff --git a/exporter/prometheusremotewriteexporter/factory.go b/exporter/prometheusremotewriteexporter/factory.go index 859612a8a36a..0e3aa6fc898a 100644 --- a/exporter/prometheusremotewriteexporter/factory.go +++ b/exporter/prometheusremotewriteexporter/factory.go @@ -57,7 +57,6 @@ func createMetricsExporter(ctx context.Context, set exporter.CreateSettings, NumConsumers: 1, QueueSize: prwCfg.RemoteWriteQueue.QueueSize, }), - exporterhelper.WithRetry(prwCfg.RetrySettings), exporterhelper.WithStart(prwe.Start), exporterhelper.WithShutdown(prwe.Shutdown), )