diff --git a/internal/component/prometheus/remote/queue/network/benchmark_test.go b/internal/component/prometheus/remote/queue/network/benchmark_test.go new file mode 100644 index 000000000..a1c22328c --- /dev/null +++ b/internal/component/prometheus/remote/queue/network/benchmark_test.go @@ -0,0 +1,24 @@ +package network + +import ( + "context" + "testing" + + "github.com/vladopajic/go-actor/actor" +) + +func BenchmarkMailbox(b *testing.B) { + // This should be 260 ns roughly or 3m messages a second. + mbx := actor.NewMailbox[struct{}]() + mbx.Start() + defer mbx.Stop() + go func() { + for { + <-mbx.ReceiveC() + } + }() + ctx := context.Background() + for i := 0; i < b.N; i++ { + mbx.Send(ctx, struct{}{}) + } +} diff --git a/internal/component/prometheus/remote/queue/network/loop.go b/internal/component/prometheus/remote/queue/network/loop.go new file mode 100644 index 000000000..cbe94f2e4 --- /dev/null +++ b/internal/component/prometheus/remote/queue/network/loop.go @@ -0,0 +1,370 @@ +package network + +import ( + "bufio" + "bytes" + "context" + "fmt" + "io" + "net/http" + "strconv" + "strings" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/golang/protobuf/proto" + "github.com/golang/snappy" + "github.com/grafana/alloy/internal/component/prometheus/remote/queue/types" + "github.com/prometheus/prometheus/prompb" + "github.com/vladopajic/go-actor/actor" + "go.uber.org/atomic" +) + +var _ actor.Worker = (*loop)(nil) + +// loop handles the low level sending of data. It's conceptually a queue. +// loop makes no attempt to save or restore signals in the queue. +// loop config cannot be updated, it is easier to recreate. This does mean we lose any signals in the queue. +type loop struct { + isMeta bool + seriesMbx actor.Mailbox[*types.TimeSeriesBinary] + client *http.Client + cfg types.ConnectionConfig + log log.Logger + lastSend time.Time + statsFunc func(s types.NetworkStats) + stopCalled atomic.Bool + externalLabels map[string]string + series []*types.TimeSeriesBinary + self actor.Actor + ticker *time.Ticker + req *prompb.WriteRequest + buf *proto.Buffer + sendBuffer []byte +} + +func newLoop(cc types.ConnectionConfig, isMetaData bool, l log.Logger, stats func(s types.NetworkStats)) *loop { + // TODO @mattdurham add TLS support afer the initial push. + return &loop{ + isMeta: isMetaData, + // In general we want a healthy queue of items, in this case we want to have 2x our maximum send sized ready. + seriesMbx: actor.NewMailbox[*types.TimeSeriesBinary](actor.OptCapacity(2 * cc.BatchCount)), + client: &http.Client{}, + cfg: cc, + log: log.With(l, "name", "loop", "url", cc.URL), + statsFunc: stats, + externalLabels: cc.ExternalLabels, + ticker: time.NewTicker(1 * time.Second), + buf: proto.NewBuffer(nil), + sendBuffer: make([]byte, 0), + req: &prompb.WriteRequest{ + // We know BatchCount is the most we will ever send. + Timeseries: make([]prompb.TimeSeries, 0, cc.BatchCount), + }, + } +} + +func (l *loop) Start() { + l.self = actor.Combine(l.actors()...).Build() + l.self.Start() +} + +func (l *loop) Stop() { + l.stopCalled.Store(true) + l.self.Stop() +} + +func (l *loop) actors() []actor.Actor { + return []actor.Actor{ + actor.New(l), + l.seriesMbx, + } +} + +func (l *loop) DoWork(ctx actor.Context) actor.WorkerStatus { + // Main select loop + select { + case <-ctx.Done(): + return actor.WorkerEnd + // Ticker is to ensure the flush timer is called. 
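+	// The ticker fires every second; when buffered data has been waiting longer
+	// than cfg.FlushFrequency, the partial batch is sent rather than waiting for
+	// it to reach BatchCount.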
+ case <-l.ticker.C: + if len(l.series) == 0 { + return actor.WorkerContinue + } + if time.Since(l.lastSend) > l.cfg.FlushFrequency { + l.trySend(ctx) + } + return actor.WorkerContinue + case series, ok := <-l.seriesMbx.ReceiveC(): + if !ok { + return actor.WorkerEnd + } + l.series = append(l.series, series) + if len(l.series) >= l.cfg.BatchCount { + l.trySend(ctx) + } + return actor.WorkerContinue + } +} + +// trySend is the core functionality for sending data to a endpoint. It will attempt retries as defined in MaxRetryBackoffAttempts. +func (l *loop) trySend(ctx context.Context) { + attempts := 0 + for { + start := time.Now() + result := l.send(ctx, attempts) + duration := time.Since(start) + l.statsFunc(types.NetworkStats{ + SendDuration: duration, + }) + if result.err != nil { + level.Error(l.log).Log("msg", "error in sending telemetry", "err", result.err.Error()) + } + if result.successful { + l.sendingCleanup() + return + } + if !result.recoverableError { + l.sendingCleanup() + return + } + attempts++ + if attempts > int(l.cfg.MaxRetryBackoffAttempts) && l.cfg.MaxRetryBackoffAttempts > 0 { + level.Debug(l.log).Log("msg", "max retry attempts reached", "attempts", attempts) + l.sendingCleanup() + return + } + // This helps us short circuit the loop if we are stopping. + if l.stopCalled.Load() { + return + } + // Sleep between attempts. + time.Sleep(result.retryAfter) + } +} + +type sendResult struct { + err error + successful bool + recoverableError bool + retryAfter time.Duration + statusCode int + networkError bool +} + +func (l *loop) sendingCleanup() { + types.PutTimeSeriesSliceIntoPool(l.series) + l.sendBuffer = l.sendBuffer[:0] + l.series = make([]*types.TimeSeriesBinary, 0, l.cfg.BatchCount) + l.lastSend = time.Now() +} + +// send is the main work loop of the loop. +func (l *loop) send(ctx context.Context, retryCount int) sendResult { + result := sendResult{} + defer func() { + recordStats(l.series, l.isMeta, l.statsFunc, result, len(l.sendBuffer)) + }() + // Check to see if this is a retry and we can reuse the buffer. + // I wonder if we should do this, its possible we are sending things that have exceeded the TTL. + if len(l.sendBuffer) == 0 { + var data []byte + var wrErr error + if l.isMeta { + data, wrErr = createWriteRequestMetadata(l.log, l.req, l.series, l.buf) + } else { + data, wrErr = createWriteRequest(l.req, l.series, l.externalLabels, l.buf) + } + if wrErr != nil { + result.err = wrErr + result.recoverableError = false + return result + } + l.sendBuffer = snappy.Encode(l.sendBuffer, data) + } + + httpReq, err := http.NewRequest("POST", l.cfg.URL, bytes.NewReader(l.sendBuffer)) + if err != nil { + result.err = err + result.recoverableError = true + result.networkError = true + return result + } + httpReq.Header.Add("Content-Encoding", "snappy") + httpReq.Header.Set("Content-Type", "application/x-protobuf") + httpReq.Header.Set("User-Agent", l.cfg.UserAgent) + httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") + httpReq.SetBasicAuth(l.cfg.Username, l.cfg.Password) + + if retryCount > 0 { + httpReq.Header.Set("Retry-Attempt", strconv.Itoa(retryCount)) + } + ctx, cncl := context.WithTimeout(ctx, l.cfg.Timeout) + defer cncl() + resp, err := l.client.Do(httpReq.WithContext(ctx)) + // Network errors are recoverable. 
+ if err != nil { + result.err = err + result.networkError = true + result.recoverableError = true + result.retryAfter = l.cfg.RetryBackoff + return result + } + result.statusCode = resp.StatusCode + defer resp.Body.Close() + // 500 errors are considered recoverable. + if resp.StatusCode/100 == 5 || resp.StatusCode == http.StatusTooManyRequests { + result.err = fmt.Errorf("server responded with status code %d", resp.StatusCode) + result.retryAfter = retryAfterDuration(l.cfg.RetryBackoff, resp.Header.Get("Retry-After")) + result.recoverableError = true + return result + } + // Status Codes that are not 500 or 200 are not recoverable and dropped. + if resp.StatusCode/100 != 2 { + scanner := bufio.NewScanner(io.LimitReader(resp.Body, 1_000)) + line := "" + if scanner.Scan() { + line = scanner.Text() + } + result.err = fmt.Errorf("server returned HTTP status %s: %s", resp.Status, line) + return result + } + + result.successful = true + return result +} + +func createWriteRequest(wr *prompb.WriteRequest, series []*types.TimeSeriesBinary, externalLabels map[string]string, data *proto.Buffer) ([]byte, error) { + if cap(wr.Timeseries) < len(series) { + wr.Timeseries = make([]prompb.TimeSeries, len(series)) + } + wr.Timeseries = wr.Timeseries[:len(series)] + + for i, tsBuf := range series { + ts := wr.Timeseries[i] + if cap(ts.Labels) < len(tsBuf.Labels) { + ts.Labels = make([]prompb.Label, 0, len(tsBuf.Labels)) + } + ts.Labels = ts.Labels[:len(tsBuf.Labels)] + for k, v := range tsBuf.Labels { + ts.Labels[k].Name = v.Name + ts.Labels[k].Value = v.Value + } + + // By default each sample only has a histogram, float histogram or sample. + if cap(ts.Histograms) == 0 { + ts.Histograms = make([]prompb.Histogram, 1) + } else { + ts.Histograms = ts.Histograms[:0] + } + if tsBuf.Histograms.Histogram != nil { + ts.Histograms = ts.Histograms[:1] + ts.Histograms[0] = tsBuf.Histograms.Histogram.ToPromHistogram() + } + if tsBuf.Histograms.FloatHistogram != nil { + ts.Histograms = ts.Histograms[:1] + ts.Histograms[0] = tsBuf.Histograms.FloatHistogram.ToPromFloatHistogram() + } + + if tsBuf.Histograms.Histogram == nil && tsBuf.Histograms.FloatHistogram == nil { + ts.Histograms = ts.Histograms[:0] + } + + // Encode the external labels inside if needed. + for k, v := range externalLabels { + found := false + for j, lbl := range ts.Labels { + if lbl.Name == k { + ts.Labels[j].Value = v + found = true + break + } + } + if !found { + ts.Labels = append(ts.Labels, prompb.Label{ + Name: k, + Value: v, + }) + } + } + // By default each TimeSeries only has one sample. + if len(ts.Samples) == 0 { + ts.Samples = make([]prompb.Sample, 1) + } + ts.Samples[0].Value = tsBuf.Value + ts.Samples[0].Timestamp = tsBuf.TS + wr.Timeseries[i] = ts + } + defer func() { + for i := 0; i < len(wr.Timeseries); i++ { + wr.Timeseries[i].Histograms = wr.Timeseries[i].Histograms[:0] + wr.Timeseries[i].Labels = wr.Timeseries[i].Labels[:0] + wr.Timeseries[i].Exemplars = wr.Timeseries[i].Exemplars[:0] + } + }() + // Reset the buffer for reuse. 
+ data.Reset() + err := data.Marshal(wr) + return data.Bytes(), err +} + +func createWriteRequestMetadata(l log.Logger, wr *prompb.WriteRequest, series []*types.TimeSeriesBinary, data *proto.Buffer) ([]byte, error) { + if cap(wr.Metadata) < len(series) { + wr.Metadata = make([]prompb.MetricMetadata, len(series)) + } else { + wr.Metadata = wr.Metadata[:len(series)] + } + + for i, ts := range series { + mt, valid := toMetadata(ts) + if !valid { + level.Error(l).Log("msg", "invalid metadata was found", "labels", ts.Labels.String()) + continue + } + wr.Metadata[i] = mt + } + data.Reset() + err := data.Marshal(wr) + return data.Bytes(), err +} + +func getMetadataCount(tss []*types.TimeSeriesBinary) int { + var cnt int + for _, ts := range tss { + if isMetadata(ts) { + cnt++ + } + } + return cnt +} + +func isMetadata(ts *types.TimeSeriesBinary) bool { + return ts.Labels.Has(types.MetaType) && + ts.Labels.Has(types.MetaUnit) && + ts.Labels.Has(types.MetaHelp) +} + +func toMetadata(ts *types.TimeSeriesBinary) (prompb.MetricMetadata, bool) { + if !isMetadata(ts) { + return prompb.MetricMetadata{}, false + } + return prompb.MetricMetadata{ + Type: prompb.MetricMetadata_MetricType(prompb.MetricMetadata_MetricType_value[strings.ToUpper(ts.Labels.Get(types.MetaType))]), + Help: ts.Labels.Get(types.MetaHelp), + Unit: ts.Labels.Get(types.MetaUnit), + MetricFamilyName: ts.Labels.Get("__name__"), + }, true +} + +func retryAfterDuration(defaultDuration time.Duration, t string) time.Duration { + if parsedTime, err := time.Parse(http.TimeFormat, t); err == nil { + return time.Until(parsedTime) + } + // The duration can be in seconds. + d, err := strconv.Atoi(t) + if err != nil { + return defaultDuration + } + return time.Duration(d) * time.Second +} diff --git a/internal/component/prometheus/remote/queue/network/manager.go b/internal/component/prometheus/remote/queue/network/manager.go new file mode 100644 index 000000000..19e4f5b96 --- /dev/null +++ b/internal/component/prometheus/remote/queue/network/manager.go @@ -0,0 +1,167 @@ +package network + +import ( + "context" + + "github.com/grafana/alloy/internal/runtime/logging/level" + + "github.com/go-kit/log" + "github.com/grafana/alloy/internal/component/prometheus/remote/queue/types" + "github.com/vladopajic/go-actor/actor" +) + +// manager manages loops. Mostly it exists to control their lifecycle and send work to them. +type manager struct { + loops []*loop + metadata *loop + logger log.Logger + inbox actor.Mailbox[*types.TimeSeriesBinary] + metaInbox actor.Mailbox[*types.TimeSeriesBinary] + configInbox actor.Mailbox[types.ConnectionConfig] + self actor.Actor + cfg types.ConnectionConfig + stats func(types.NetworkStats) + metaStats func(types.NetworkStats) +} + +var _ types.NetworkClient = (*manager)(nil) + +var _ actor.Worker = (*manager)(nil) + +func New(cc types.ConnectionConfig, logger log.Logger, seriesStats, metadataStats func(types.NetworkStats)) (types.NetworkClient, error) { + s := &manager{ + loops: make([]*loop, 0, cc.Connections), + logger: logger, + // This provides blocking to only handle one at a time, so that if a queue blocks + // it will stop the filequeue from feeding more. + inbox: actor.NewMailbox[*types.TimeSeriesBinary](actor.OptCapacity(1)), + metaInbox: actor.NewMailbox[*types.TimeSeriesBinary](actor.OptCapacity(1)), + configInbox: actor.NewMailbox[types.ConnectionConfig](), + stats: seriesStats, + cfg: cc, + } + + // start kicks off a number of concurrent connections. 
+ for i := uint64(0); i < s.cfg.Connections; i++ { + l := newLoop(cc, false, logger, seriesStats) + l.self = actor.New(l) + s.loops = append(s.loops, l) + } + + s.metadata = newLoop(cc, true, logger, metadataStats) + s.metadata.self = actor.New(s.metadata) + return s, nil +} + +func (s *manager) Start() { + s.startLoops() + s.configInbox.Start() + s.metaInbox.Start() + s.inbox.Start() + s.self = actor.New(s) + s.self.Start() +} + +func (s *manager) SendSeries(ctx context.Context, data *types.TimeSeriesBinary) error { + return s.inbox.Send(ctx, data) +} + +func (s *manager) SendMetadata(ctx context.Context, data *types.TimeSeriesBinary) error { + return s.metaInbox.Send(ctx, data) +} + +func (s *manager) UpdateConfig(ctx context.Context, cc types.ConnectionConfig) error { + return s.configInbox.Send(ctx, cc) +} + +func (s *manager) DoWork(ctx actor.Context) actor.WorkerStatus { + // This acts as a priority queue, always check for configuration changes first. + select { + case cfg, ok := <-s.configInbox.ReceiveC(): + if !ok { + return actor.WorkerEnd + } + s.updateConfig(cfg) + return actor.WorkerContinue + default: + } + select { + case <-ctx.Done(): + s.Stop() + return actor.WorkerEnd + case ts, ok := <-s.inbox.ReceiveC(): + if !ok { + return actor.WorkerEnd + } + s.queue(ctx, ts) + return actor.WorkerContinue + case ts, ok := <-s.metaInbox.ReceiveC(): + if !ok { + return actor.WorkerEnd + } + err := s.metadata.seriesMbx.Send(ctx, ts) + if err != nil { + level.Error(s.logger).Log("msg", "failed to send to metadata loop", "err", err) + } + return actor.WorkerContinue + } +} + +func (s *manager) updateConfig(cc types.ConnectionConfig) { + // No need to do anything if the configuration is the same. + if s.cfg.Equals(cc) { + return + } + s.cfg = cc + // TODO @mattdurham make this smarter, at the moment any samples in the loops are lost. + // Ideally we would drain the queues and re add them but that is a future need. + // In practice this shouldn't change often so data loss should be minimal. + // For the moment we will stop all the items and recreate them. + level.Debug(s.logger).Log("msg", "dropping all series in loops and creating queue due to config change") + s.stopLoops() + s.loops = make([]*loop, 0, s.cfg.Connections) + var i uint64 + for ; i < s.cfg.Connections; i++ { + l := newLoop(cc, false, s.logger, s.stats) + l.self = actor.New(l) + + s.loops = append(s.loops, l) + } + + s.metadata = newLoop(cc, true, s.logger, s.metaStats) + s.metadata.self = actor.New(s.metadata) + s.startLoops() +} + +func (s *manager) Stop() { + s.stopLoops() + s.configInbox.Stop() + s.metaInbox.Stop() + s.inbox.Stop() + s.self.Stop() +} + +func (s *manager) stopLoops() { + for _, l := range s.loops { + l.Stop() + } + s.metadata.Stop() +} + +func (s *manager) startLoops() { + for _, l := range s.loops { + l.Start() + } + s.metadata.Start() +} + +// Queue adds anything thats not metadata to the queue. +func (s *manager) queue(ctx context.Context, ts *types.TimeSeriesBinary) { + // Based on a hash which is the label hash add to the queue. + queueNum := ts.Hash % s.cfg.Connections + // This will block if the queue is full. 
+ err := s.loops[queueNum].seriesMbx.Send(ctx, ts) + if err != nil { + level.Error(s.logger).Log("msg", "failed to send to loop", "err", err) + } +} diff --git a/internal/component/prometheus/remote/queue/network/manager_test.go b/internal/component/prometheus/remote/queue/network/manager_test.go new file mode 100644 index 000000000..47001fda9 --- /dev/null +++ b/internal/component/prometheus/remote/queue/network/manager_test.go @@ -0,0 +1,312 @@ +package network + +import ( + "context" + "io" + "math/rand" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/golang/snappy" + "github.com/grafana/alloy/internal/component/prometheus/remote/queue/types" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/prompb" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + "go.uber.org/goleak" +) + +func TestSending(t *testing.T) { + defer goleak.VerifyNone(t) + + recordsFound := atomic.Uint32{} + svr := httptest.NewServer(handler(t, http.StatusOK, func(wr *prompb.WriteRequest) { + recordsFound.Add(uint32(len(wr.Timeseries))) + })) + + defer svr.Close() + ctx := context.Background() + ctx, cncl := context.WithCancel(ctx) + defer cncl() + + cc := types.ConnectionConfig{ + URL: svr.URL, + Timeout: 1 * time.Second, + BatchCount: 10, + FlushFrequency: 1 * time.Second, + Connections: 4, + } + + logger := log.NewNopLogger() + wr, err := New(cc, logger, func(s types.NetworkStats) {}, func(s types.NetworkStats) {}) + wr.Start() + defer wr.Stop() + require.NoError(t, err) + for i := 0; i < 1_000; i++ { + send(t, wr, ctx) + } + require.Eventually(t, func() bool { + return recordsFound.Load() == 1_000 + }, 10*time.Second, 100*time.Millisecond) +} + +func TestUpdatingConfig(t *testing.T) { + defer goleak.VerifyNone(t) + + recordsFound := atomic.Uint32{} + lastBatchSize := atomic.Uint32{} + svr := httptest.NewServer(handler(t, http.StatusOK, func(wr *prompb.WriteRequest) { + lastBatchSize.Store(uint32(len(wr.Timeseries))) + recordsFound.Add(uint32(len(wr.Timeseries))) + })) + + defer svr.Close() + ctx := context.Background() + ctx, cncl := context.WithCancel(ctx) + defer cncl() + + cc := types.ConnectionConfig{ + URL: svr.URL, + Timeout: 1 * time.Second, + BatchCount: 10, + FlushFrequency: 1 * time.Second, + Connections: 4, + } + + logger := log.NewNopLogger() + wr, err := New(cc, logger, func(s types.NetworkStats) {}, func(s types.NetworkStats) {}) + wr.Start() + defer wr.Stop() + + cc2 := types.ConnectionConfig{ + URL: svr.URL, + Timeout: 5 * time.Second, + BatchCount: 100, + FlushFrequency: 1 * time.Second, + Connections: 4, + } + + err = wr.UpdateConfig(context.Background(), cc2) + require.NoError(t, err) + for i := 0; i < 1_000; i++ { + send(t, wr, ctx) + } + require.Eventuallyf(t, func() bool { + return recordsFound.Load() == 1_000 + }, 10*time.Second, 1*time.Second, "record count should be 1000 but is %d", recordsFound.Load()) + + require.Truef(t, lastBatchSize.Load() == 100, "batch_count should be 100 but is %d", lastBatchSize.Load()) +} + +func TestRetry(t *testing.T) { + defer goleak.VerifyNone(t) + + retries := atomic.Uint32{} + var previous *prompb.WriteRequest + svr := httptest.NewServer(handler(t, http.StatusTooManyRequests, func(wr *prompb.WriteRequest) { + retries.Add(1) + // Check that we are getting the same sample back. 
+ if previous == nil { + previous = wr + } else { + require.True(t, previous.Timeseries[0].Labels[0].Value == wr.Timeseries[0].Labels[0].Value) + } + })) + defer svr.Close() + ctx := context.Background() + ctx, cncl := context.WithCancel(ctx) + defer cncl() + + cc := types.ConnectionConfig{ + URL: svr.URL, + Timeout: 1 * time.Second, + BatchCount: 1, + FlushFrequency: 1 * time.Second, + RetryBackoff: 100 * time.Millisecond, + Connections: 1, + } + + logger := log.NewNopLogger() + wr, err := New(cc, logger, func(s types.NetworkStats) {}, func(s types.NetworkStats) {}) + require.NoError(t, err) + wr.Start() + defer wr.Stop() + + for i := 0; i < 10; i++ { + send(t, wr, ctx) + } + require.Eventually(t, func() bool { + done := retries.Load() > 5 + return done + }, 10*time.Second, 1*time.Second) +} + +func TestRetryBounded(t *testing.T) { + defer goleak.VerifyNone(t) + + sends := atomic.Uint32{} + svr := httptest.NewServer(handler(t, http.StatusTooManyRequests, func(wr *prompb.WriteRequest) { + sends.Add(1) + })) + + defer svr.Close() + ctx := context.Background() + ctx, cncl := context.WithCancel(ctx) + defer cncl() + + cc := types.ConnectionConfig{ + URL: svr.URL, + Timeout: 1 * time.Second, + BatchCount: 1, + FlushFrequency: 1 * time.Second, + RetryBackoff: 100 * time.Millisecond, + MaxRetryBackoffAttempts: 1, + Connections: 1, + } + + logger := log.NewNopLogger() + wr, err := New(cc, logger, func(s types.NetworkStats) {}, func(s types.NetworkStats) {}) + wr.Start() + defer wr.Stop() + require.NoError(t, err) + for i := 0; i < 10; i++ { + send(t, wr, ctx) + } + require.Eventually(t, func() bool { + // We send 10 but each one gets retried once so 20 total. + return sends.Load() == 10*2 + }, 2*time.Second, 100*time.Millisecond) + time.Sleep(2 * time.Second) + // Ensure we dont get any more. + require.True(t, sends.Load() == 10*2) +} + +func TestRecoverable(t *testing.T) { + defer goleak.VerifyNone(t) + + recoverable := atomic.Uint32{} + svr := httptest.NewServer(handler(t, http.StatusInternalServerError, func(wr *prompb.WriteRequest) { + })) + defer svr.Close() + ctx := context.Background() + ctx, cncl := context.WithCancel(ctx) + defer cncl() + + cc := types.ConnectionConfig{ + URL: svr.URL, + Timeout: 1 * time.Second, + BatchCount: 1, + FlushFrequency: 1 * time.Second, + RetryBackoff: 100 * time.Millisecond, + MaxRetryBackoffAttempts: 1, + Connections: 1, + } + + logger := log.NewNopLogger() + wr, err := New(cc, logger, func(s types.NetworkStats) { + recoverable.Add(uint32(s.Total5XX())) + }, func(s types.NetworkStats) {}) + require.NoError(t, err) + wr.Start() + defer wr.Stop() + for i := 0; i < 10; i++ { + send(t, wr, ctx) + } + require.Eventually(t, func() bool { + // We send 10 but each one gets retried once so 20 total. + return recoverable.Load() == 10*2 + }, 2*time.Second, 100*time.Millisecond) + time.Sleep(2 * time.Second) + // Ensure we dont get any more. 
+ require.True(t, recoverable.Load() == 10*2) +} + +func TestNonRecoverable(t *testing.T) { + defer goleak.VerifyNone(t) + + nonRecoverable := atomic.Uint32{} + svr := httptest.NewServer(handler(t, http.StatusBadRequest, func(wr *prompb.WriteRequest) { + })) + + defer svr.Close() + ctx := context.Background() + ctx, cncl := context.WithCancel(ctx) + defer cncl() + + cc := types.ConnectionConfig{ + URL: svr.URL, + Timeout: 1 * time.Second, + BatchCount: 1, + FlushFrequency: 1 * time.Second, + RetryBackoff: 100 * time.Millisecond, + MaxRetryBackoffAttempts: 1, + Connections: 1, + } + + logger := log.NewNopLogger() + wr, err := New(cc, logger, func(s types.NetworkStats) { + nonRecoverable.Add(uint32(s.TotalFailed())) + }, func(s types.NetworkStats) {}) + wr.Start() + defer wr.Stop() + require.NoError(t, err) + for i := 0; i < 10; i++ { + send(t, wr, ctx) + } + require.Eventually(t, func() bool { + return nonRecoverable.Load() == 10 + }, 2*time.Second, 100*time.Millisecond) + time.Sleep(2 * time.Second) + // Ensure we dont get any more. + require.True(t, nonRecoverable.Load() == 10) +} + +func send(t *testing.T, wr types.NetworkClient, ctx context.Context) { + ts := createSeries(t) + // The actual hash is only used for queueing into different buckets. + err := wr.SendSeries(ctx, ts) + require.NoError(t, err) +} + +func handler(t *testing.T, code int, callback func(wr *prompb.WriteRequest)) http.HandlerFunc { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + buf, err := io.ReadAll(r.Body) + require.NoError(t, err) + defer r.Body.Close() + decoded, err := snappy.Decode(nil, buf) + require.NoError(t, err) + + wr := &prompb.WriteRequest{} + err = wr.Unmarshal(decoded) + require.NoError(t, err) + callback(wr) + w.WriteHeader(code) + }) +} + +func createSeries(_ *testing.T) *types.TimeSeriesBinary { + ts := &types.TimeSeriesBinary{ + TS: time.Now().Unix(), + Value: 1, + Labels: []labels.Label{ + { + Name: "__name__", + Value: randSeq(10), + }, + }, + } + return ts +} + +var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + +func randSeq(n int) string { + b := make([]rune, n) + for i := range b { + b[i] = letters[rand.Intn(len(letters))] + } + return string(b) +} diff --git a/internal/component/prometheus/remote/queue/network/stats.go b/internal/component/prometheus/remote/queue/network/stats.go new file mode 100644 index 000000000..4b384f51f --- /dev/null +++ b/internal/component/prometheus/remote/queue/network/stats.go @@ -0,0 +1,126 @@ +package network + +import ( + "net/http" + + "github.com/grafana/alloy/internal/component/prometheus/remote/queue/types" +) + +// recordStats determines what values to send to the stats function. This allows for any +// number of metrics/signals libraries to be used. Prometheus, OTel, and any other. +func recordStats(series []*types.TimeSeriesBinary, isMeta bool, stats func(s types.NetworkStats), r sendResult, bytesSent int) { + seriesCount := getSeriesCount(series) + histogramCount := getHistogramCount(series) + metadataCount := getMetadataCount(series) + switch { + case r.networkError: + stats(types.NetworkStats{ + Series: types.CategoryStats{ + NetworkSamplesFailed: seriesCount, + }, + Histogram: types.CategoryStats{ + NetworkSamplesFailed: histogramCount, + }, + Metadata: types.CategoryStats{ + NetworkSamplesFailed: metadataCount, + }, + }) + case r.successful: + // Need to grab the newest series. 
+ var newestTS int64 + for _, ts := range series { + if ts.TS > newestTS { + newestTS = ts.TS + } + } + var sampleBytesSent int + var metaBytesSent int + // Each loop is explicitly a normal signal or metadata sender. + if isMeta { + metaBytesSent = bytesSent + } else { + sampleBytesSent = bytesSent + } + stats(types.NetworkStats{ + Series: types.CategoryStats{ + SeriesSent: seriesCount, + }, + Histogram: types.CategoryStats{ + SeriesSent: histogramCount, + }, + Metadata: types.CategoryStats{ + SeriesSent: metadataCount, + }, + MetadataBytes: metaBytesSent, + SeriesBytes: sampleBytesSent, + NewestTimestamp: newestTS, + }) + case r.statusCode == http.StatusTooManyRequests: + stats(types.NetworkStats{ + Series: types.CategoryStats{ + RetriedSamples: seriesCount, + RetriedSamples429: seriesCount, + }, + Histogram: types.CategoryStats{ + RetriedSamples: histogramCount, + RetriedSamples429: histogramCount, + }, + Metadata: types.CategoryStats{ + RetriedSamples: metadataCount, + RetriedSamples429: metadataCount, + }, + }) + case r.statusCode/100 == 5: + stats(types.NetworkStats{ + Series: types.CategoryStats{ + RetriedSamples5XX: seriesCount, + }, + Histogram: types.CategoryStats{ + RetriedSamples5XX: histogramCount, + }, + Metadata: types.CategoryStats{ + RetriedSamples: metadataCount, + }, + }) + case r.statusCode != 200: + stats(types.NetworkStats{ + Series: types.CategoryStats{ + FailedSamples: seriesCount, + }, + Histogram: types.CategoryStats{ + FailedSamples: histogramCount, + }, + Metadata: types.CategoryStats{ + FailedSamples: metadataCount, + }, + }) + } + +} + +func getSeriesCount(tss []*types.TimeSeriesBinary) int { + cnt := 0 + for _, ts := range tss { + // This is metadata + if isMetadata(ts) { + continue + } + if ts.Histograms.Histogram == nil && ts.Histograms.FloatHistogram == nil { + cnt++ + } + } + return cnt +} + +func getHistogramCount(tss []*types.TimeSeriesBinary) int { + cnt := 0 + for _, ts := range tss { + if isMetadata(ts) { + continue + } + if ts.Histograms.Histogram != nil || ts.Histograms.FloatHistogram != nil { + cnt++ + } + } + return cnt +} diff --git a/internal/component/prometheus/remote/queue/serialization/appender.go b/internal/component/prometheus/remote/queue/serialization/appender.go index b9248c00c..e2cdd5627 100644 --- a/internal/component/prometheus/remote/queue/serialization/appender.go +++ b/internal/component/prometheus/remote/queue/serialization/appender.go @@ -104,15 +104,15 @@ func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m meta // to ensure its efficient it makes sense to encode metadata into it. 
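+	// The metadata is attached as labels using the shared constants
+	// (types.MetaType, types.MetaHelp, types.MetaUnit) so the network side can
+	// recognize metadata series via isMetadata and rebuild prompb.MetricMetadata
+	// from the same labels.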
combinedLabels := l.Copy() combinedLabels = append(combinedLabels, labels.Label{ - Name: "__alloy_metadata_type__", + Name: types.MetaType, Value: string(m.Type), }) combinedLabels = append(combinedLabels, labels.Label{ - Name: "__alloy_metadata_help__", + Name: types.MetaHelp, Value: m.Help, }) combinedLabels = append(combinedLabels, labels.Label{ - Name: "__alloy_metadata_unit__", + Name: types.MetaUnit, Value: m.Unit, }) ts.Labels = combinedLabels diff --git a/internal/component/prometheus/remote/queue/types/network.go b/internal/component/prometheus/remote/queue/types/network.go new file mode 100644 index 000000000..024f8a312 --- /dev/null +++ b/internal/component/prometheus/remote/queue/types/network.go @@ -0,0 +1,32 @@ +package types + +import ( + "context" + "reflect" + "time" +) + +type NetworkClient interface { + Start() + Stop() + SendSeries(ctx context.Context, d *TimeSeriesBinary) error + SendMetadata(ctx context.Context, d *TimeSeriesBinary) error + UpdateConfig(ctx context.Context, cfg ConnectionConfig) error +} +type ConnectionConfig struct { + URL string + Username string + Password string + UserAgent string + Timeout time.Duration + RetryBackoff time.Duration + MaxRetryBackoffAttempts time.Duration + BatchCount int + FlushFrequency time.Duration + ExternalLabels map[string]string + Connections uint64 +} + +func (cc ConnectionConfig) Equals(bb ConnectionConfig) bool { + return reflect.DeepEqual(cc, bb) +} diff --git a/internal/component/prometheus/remote/queue/types/serialization.go b/internal/component/prometheus/remote/queue/types/serialization.go index 777705543..80b2282f7 100644 --- a/internal/component/prometheus/remote/queue/types/serialization.go +++ b/internal/component/prometheus/remote/queue/types/serialization.go @@ -10,6 +10,10 @@ import ( "go.uber.org/atomic" ) +const MetaType = "__alloy_metadata_type__" +const MetaUnit = "__alloy_metadata_unit__" +const MetaHelp = "__alloy_metadata_help__" + // SeriesGroup is the holder for TimeSeries, Metadata, and the strings array. // When serialized the Labels Key,Value array will be transformed into // LabelNames and LabelsValues that point to the index in Strings. @@ -20,6 +24,9 @@ type SeriesGroup struct { Metadata []*TimeSeriesBinary } +// TimeSeriesBinary is an optimized format for handling metrics and metadata. It should never be instantiated directly +// but instead use GetTimeSeriesFromPool and PutTimeSeriesSliceIntoPool. This allows us to reuse these objects and avoid +// allocations. type TimeSeriesBinary struct { // Labels are not serialized to msgp, instead we store separately a dictionary of strings and use `LabelNames` and `LabelValues` to refer to the dictionary by ID. 
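+	// Storing the strings once in the SeriesGroup Strings array means label
+	// names and values repeated across series are only serialized a single time.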
Labels labels.Labels `msg:"-"` diff --git a/internal/component/prometheus/remote/queue/types/stats.go b/internal/component/prometheus/remote/queue/types/stats.go index c74f0953a..4107d8089 100644 --- a/internal/component/prometheus/remote/queue/types/stats.go +++ b/internal/component/prometheus/remote/queue/types/stats.go @@ -1,8 +1,279 @@ package types +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" +) + type SerializerStats struct { SeriesStored int MetadataStored int Errors int NewestTimestamp int64 } + +type PrometheusStats struct { + // Network Stats + NetworkSeriesSent prometheus.Counter + NetworkFailures prometheus.Counter + NetworkRetries prometheus.Counter + NetworkRetries429 prometheus.Counter + NetworkRetries5XX prometheus.Counter + NetworkSentDuration prometheus.Histogram + NetworkErrors prometheus.Counter + NetworkNewestOutTimeStampSeconds prometheus.Gauge + + // Filequeue Stats + FilequeueInSeries prometheus.Counter + FilequeueNewestInTimeStampSeconds prometheus.Gauge + FilequeueErrors prometheus.Counter + + // Backwards compatibility metrics + SamplesTotal prometheus.Counter + HistogramsTotal prometheus.Counter + MetadataTotal prometheus.Counter + + FailedSamplesTotal prometheus.Counter + FailedHistogramsTotal prometheus.Counter + FailedMetadataTotal prometheus.Counter + + RetriedSamplesTotal prometheus.Counter + RetriedHistogramsTotal prometheus.Counter + RetriedMetadataTotal prometheus.Counter + + EnqueueRetriesTotal prometheus.Counter + SentBatchDuration prometheus.Histogram + HighestSentTimestamp prometheus.Gauge + + SentBytesTotal prometheus.Counter + MetadataBytesTotal prometheus.Counter + RemoteStorageInTimestamp prometheus.Gauge + RemoteStorageOutTimestamp prometheus.Gauge + RemoteStorageDuration prometheus.Histogram +} + +func NewStats(namespace, subsystem string, registry prometheus.Registerer) *PrometheusStats { + s := &PrometheusStats{ + FilequeueInSeries: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "filequeue_incoming", + }), + FilequeueNewestInTimeStampSeconds: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "filequeue_incoming_timestamp_seconds", + }), + FilequeueErrors: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "filequeue_errors", + }), + NetworkNewestOutTimeStampSeconds: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "network_timestamp_seconds", + }), + RemoteStorageDuration: prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "prometheus_remote_storage_queue_duration_seconds", + }), + NetworkSeriesSent: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "network_sent", + }), + NetworkFailures: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "network_failed", + }), + NetworkRetries: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "network_retried", + }), + NetworkRetries429: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "network_retried_429", + }), + NetworkRetries5XX: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "network_retried_5xx", + }), + NetworkSentDuration: prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: 
subsystem, + Name: "network_duration_seconds", + NativeHistogramBucketFactor: 1.1, + }), + NetworkErrors: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "network_errors", + }), + RemoteStorageOutTimestamp: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_remote_storage_queue_highest_sent_timestamp_seconds", + }), + RemoteStorageInTimestamp: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_remote_storage_highest_timestamp_in_seconds", + }), + SamplesTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_remote_storage_samples_total", + Help: "Total number of samples sent to remote storage.", + }), + HistogramsTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_remote_storage_histograms_total", + Help: "Total number of histograms sent to remote storage.", + }), + MetadataTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_remote_storage_metadata_total", + Help: "Total number of metadata sent to remote storage.", + }), + FailedSamplesTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_remote_storage_samples_failed_total", + Help: "Total number of samples which failed on send to remote storage, non-recoverable errors.", + }), + FailedHistogramsTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_remote_storage_histograms_failed_total", + Help: "Total number of histograms which failed on send to remote storage, non-recoverable errors.", + }), + FailedMetadataTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_remote_storage_metadata_failed_total", + Help: "Total number of metadata entries which failed on send to remote storage, non-recoverable errors.", + }), + + RetriedSamplesTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_remote_storage_samples_retried_total", + Help: "Total number of samples which failed on send to remote storage but were retried because the send error was recoverable.", + }), + RetriedHistogramsTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_remote_storage_histograms_retried_total", + Help: "Total number of histograms which failed on send to remote storage but were retried because the send error was recoverable.", + }), + RetriedMetadataTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_remote_storage_metadata_retried_total", + Help: "Total number of metadata entries which failed on send to remote storage but were retried because the send error was recoverable.", + }), + SentBytesTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_remote_storage_sent_bytes_total", + Help: "The total number of bytes of data (not metadata) sent by the queue after compression. 
Note that when exemplars over remote write is enabled the exemplars included in a remote write request count towards this metric.", + }), + MetadataBytesTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_remote_storage_metadata_bytes_total", + Help: "The total number of bytes of metadata sent by the queue after compression.", + }), + } + registry.MustRegister( + s.NetworkSentDuration, + s.NetworkRetries5XX, + s.NetworkRetries429, + s.NetworkRetries, + s.NetworkFailures, + s.NetworkSeriesSent, + s.NetworkErrors, + s.NetworkNewestOutTimeStampSeconds, + s.FilequeueInSeries, + s.FilequeueErrors, + s.FilequeueNewestInTimeStampSeconds, + ) + return s +} + +func (s *PrometheusStats) BackwardsCompatibility(registry prometheus.Registerer) { + registry.MustRegister( + s.RemoteStorageDuration, + s.RemoteStorageInTimestamp, + s.RemoteStorageOutTimestamp, + s.SamplesTotal, + s.HistogramsTotal, + s.MetadataTotal, + s.FailedSamplesTotal, + s.FailedHistogramsTotal, + s.FailedMetadataTotal, + s.RetriedSamplesTotal, + s.RetriedHistogramsTotal, + s.RetriedMetadataTotal, + s.SentBytesTotal, + s.MetadataBytesTotal, + ) +} + +func (s *PrometheusStats) UpdateNetwork(stats NetworkStats) { + s.NetworkSeriesSent.Add(float64(stats.TotalSent())) + s.NetworkRetries.Add(float64(stats.TotalRetried())) + s.NetworkFailures.Add(float64(stats.TotalFailed())) + s.NetworkRetries429.Add(float64(stats.Total429())) + s.NetworkRetries5XX.Add(float64(stats.Total5XX())) + s.NetworkSentDuration.Observe(stats.SendDuration.Seconds()) + s.RemoteStorageDuration.Observe(stats.SendDuration.Seconds()) + // The newest timestamp is no always sent. + if stats.NewestTimestamp != 0 { + s.RemoteStorageOutTimestamp.Set(float64(stats.NewestTimestamp)) + } + + s.SamplesTotal.Add(float64(stats.Series.SeriesSent)) + s.MetadataTotal.Add(float64(stats.Metadata.SeriesSent)) + s.HistogramsTotal.Add(float64(stats.Histogram.SeriesSent)) + + s.FailedSamplesTotal.Add(float64(stats.Series.FailedSamples)) + s.FailedMetadataTotal.Add(float64(stats.Metadata.FailedSamples)) + s.FailedHistogramsTotal.Add(float64(stats.Histogram.FailedSamples)) + + s.RetriedSamplesTotal.Add(float64(stats.Series.RetriedSamples)) + s.RetriedHistogramsTotal.Add(float64(stats.Histogram.RetriedSamples)) + s.RetriedMetadataTotal.Add(float64(stats.Metadata.RetriedSamples)) + + s.MetadataBytesTotal.Add(float64(stats.MetadataBytes)) + s.SentBytesTotal.Add(float64(stats.SeriesBytes)) +} + +func (s *PrometheusStats) UpdateFileQueue(stats SerializerStats) { + s.FilequeueInSeries.Add(float64(stats.SeriesStored)) + s.FilequeueErrors.Add(float64(stats.Errors)) + if stats.NewestTimestamp != 0 { + s.FilequeueNewestInTimeStampSeconds.Set(float64(stats.NewestTimestamp)) + s.RemoteStorageInTimestamp.Set(float64(stats.NewestTimestamp)) + } +} + +type NetworkStats struct { + Series CategoryStats + Histogram CategoryStats + Metadata CategoryStats + SendDuration time.Duration + NewestTimestamp int64 + SeriesBytes int + MetadataBytes int +} + +func (ns NetworkStats) TotalSent() int { + return ns.Series.SeriesSent + ns.Histogram.SeriesSent + ns.Metadata.SeriesSent +} + +func (ns NetworkStats) TotalRetried() int { + return ns.Series.RetriedSamples + ns.Histogram.RetriedSamples + ns.Metadata.RetriedSamples +} + +func (ns NetworkStats) TotalFailed() int { + return ns.Series.FailedSamples + ns.Histogram.FailedSamples + ns.Metadata.FailedSamples +} + +func (ns NetworkStats) Total429() int { + return ns.Series.RetriedSamples429 + ns.Histogram.RetriedSamples429 + ns.Metadata.RetriedSamples429 
+} + +func (ns NetworkStats) Total5XX() int { + return ns.Series.RetriedSamples5XX + ns.Histogram.RetriedSamples5XX + ns.Metadata.RetriedSamples5XX +} + +type CategoryStats struct { + RetriedSamples int + RetriedSamples429 int + RetriedSamples5XX int + SeriesSent int + FailedSamples int + NetworkSamplesFailed int +}
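+
+// Example wiring (illustrative only, not part of this change): the component
+// that owns the queue is expected to build the stats once and hand the update
+// methods to the network manager as callbacks. The namespace, subsystem, and
+// variable names below are placeholders.
+//
+//	seriesStats := types.NewStats("alloy", "queue_series_network", reg)
+//	seriesStats.BackwardsCompatibility(reg)
+//	metaStats := types.NewStats("alloy", "queue_metadata_network", reg)
+//	client, err := network.New(cc, logger, seriesStats.UpdateNetwork, metaStats.UpdateNetwork)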