From d83013adf64a47ac099fceb812f9f057d706142c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Wed, 6 Nov 2024 16:26:54 +0200 Subject: [PATCH 1/2] compact: add SyncMetas() timeout (#7887) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add wait_interval*3 timeout to SyncMetas(). We had an incident in production where object storage had had some problems and the syncer got stuck due to no timeout. The timeout value is arbitrary but just exists so that it wouldn't get stuck for eternity. Signed-off-by: Giedrius Statkevičius --- cmd/thanos/compact.go | 6 ++++++ cmd/thanos/tools_bucket.go | 2 ++ pkg/compact/compact.go | 21 ++++++++++++++++++--- pkg/compact/compact_e2e_test.go | 6 +++--- 4 files changed, 29 insertions(+), 6 deletions(-) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 06d3ffaaa3..3a5f04f639 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -288,6 +288,11 @@ func runCompact( cf.UpdateOnChange(func(blocks []metadata.Meta, err error) { api.SetLoaded(blocks, err) }) + + var syncMetasTimeout = conf.waitInterval + if !conf.wait { + syncMetasTimeout = 0 + } sy, err = compact.NewMetaSyncer( logger, reg, @@ -297,6 +302,7 @@ func runCompact( ignoreDeletionMarkFilter, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, ""), compactMetrics.garbageCollectedBlocks, + syncMetasTimeout, ) if err != nil { return errors.Wrap(err, "create syncer") diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index 7e711168cf..7a09dd23a2 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -870,6 +870,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat ignoreDeletionMarkFilter, stubCounter, stubCounter, + 0, ) if err != nil { return errors.Wrap(err, "create syncer") @@ -1413,6 +1414,7 @@ func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.P ignoreDeletionMarkFilter, stubCounter, stubCounter, + 0, ) if err != nil { return errors.Wrap(err, "create syncer") diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 7f08297671..a20544b2f4 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -61,6 +61,7 @@ type Syncer struct { metrics *SyncerMetrics duplicateBlocksFilter block.DeduplicateFilter ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter + syncMetasTimeout time.Duration g singleflight.Group } @@ -101,15 +102,23 @@ func NewSyncerMetrics(reg prometheus.Registerer, blocksMarkedForDeletion, garbag // NewMetaSyncer returns a new Syncer for the given Bucket and directory. // Blocks must be at least as old as the sync delay for being considered. -func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, duplicateBlocksFilter block.DeduplicateFilter, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks prometheus.Counter) (*Syncer, error) { - return NewMetaSyncerWithMetrics(logger, NewSyncerMetrics(reg, blocksMarkedForDeletion, garbageCollectedBlocks), bkt, fetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter) +func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, duplicateBlocksFilter block.DeduplicateFilter, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks prometheus.Counter, syncMetasTimeout time.Duration) (*Syncer, error) { + return NewMetaSyncerWithMetrics(logger, + NewSyncerMetrics(reg, blocksMarkedForDeletion, garbageCollectedBlocks), + bkt, + fetcher, + duplicateBlocksFilter, + ignoreDeletionMarkFilter, + syncMetasTimeout, + ) } -func NewMetaSyncerWithMetrics(logger log.Logger, metrics *SyncerMetrics, bkt objstore.Bucket, fetcher block.MetadataFetcher, duplicateBlocksFilter block.DeduplicateFilter, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter) (*Syncer, error) { +func NewMetaSyncerWithMetrics(logger log.Logger, metrics *SyncerMetrics, bkt objstore.Bucket, fetcher block.MetadataFetcher, duplicateBlocksFilter block.DeduplicateFilter, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter, syncMetasTimeout time.Duration) (*Syncer, error) { if logger == nil { logger = log.NewNopLogger() } return &Syncer{ + syncMetasTimeout: syncMetasTimeout, logger: logger, bkt: bkt, fetcher: fetcher, @@ -138,6 +147,12 @@ func UntilNextDownsampling(m *metadata.Meta) (time.Duration, error) { // SyncMetas synchronizes local state of block metas with what we have in the bucket. func (s *Syncer) SyncMetas(ctx context.Context) error { + var cancel func() = func() {} + if s.syncMetasTimeout > 0 { + ctx, cancel = context.WithTimeout(ctx, s.syncMetasTimeout) + } + defer cancel() + type metasContainer struct { metas map[ulid.ULID]*metadata.Meta partial map[ulid.ULID]error diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index e4f146e6eb..c0232547ca 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -106,7 +106,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { garbageCollectedBlocks := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) blockMarkedForNoCompact := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(nil, nil, 48*time.Hour, fetcherConcurrency) - sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks) + sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 0) testutil.Ok(t, err) // Do one initial synchronization with the bucket. @@ -209,7 +209,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) blocksMaredForNoCompact := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) garbageCollectedBlocks := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) - sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks) + sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 0) testutil.Ok(t, err) comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, mergeFunc) @@ -493,7 +493,7 @@ func TestGarbageCollectDoesntCreateEmptyBlocksWithDeletionMarksOnly(t *testing.T }) testutil.Ok(t, err) - sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks) + sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 0) testutil.Ok(t, err) // Do one initial synchronization with the bucket. From f5514c8045ab8de2396912b0379d9a5ca2909a96 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Thu, 7 Nov 2024 12:00:47 +0100 Subject: [PATCH 2/2] Merge pull request #7885 from fpetkovski/close-loser-tree Fix bug in Bucket Series --- CHANGELOG.md | 4 ++++ pkg/store/bucket.go | 4 +++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e1314b242..1ef69c15ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7814](https://github.com/thanos-io/thanos/pull/7814) Store: label_values: if matchers contain **name**=="something", do not add != "" to fetch less postings. - [#7679](https://github.com/thanos-io/thanos/pull/7679) Query: respect store.limit.* flags when evaluating queries - [#7821](https://github.com/thanos-io/thanos/pull/7679) Query/Receive: Fix coroutine leak introduced in https://github.com/thanos-io/thanos/pull/7796. +- [#7843](https://github.com/thanos-io/thanos/pull/7843) Query Frontend: fix slow query logging for non-query endpoints. +- [#7852](https://github.com/thanos-io/thanos/pull/7852) Query Frontend: pass "stats" parameter forward to queriers and fix Prometheus stats merging. +- [#7832](https://github.com/thanos-io/thanos/pull/7832) Query Frontend: Fix cache keys for dynamic split intervals. +- [#7885](https://github.com/thanos-io/thanos/pull/7885) Store: Return chunks to the pool after completing a Series call. ### Added - [#7763](https://github.com/thanos-io/thanos/pull/7763) Ruler: use native histograms for client latency metrics. diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 7ab47f4cbe..eec1de1005 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1688,10 +1688,12 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store s.metrics.seriesBlocksQueried.WithLabelValues(tenant).Observe(float64(stats.blocksQueried)) } + lt := NewProxyResponseLoserTree(respSets...) + defer lt.Close() // Merge the sub-results from each selected block. tracing.DoInSpan(ctx, "bucket_store_merge_all", func(ctx context.Context) { begin := time.Now() - set := NewResponseDeduplicator(NewProxyResponseLoserTree(respSets...)) + set := NewResponseDeduplicator(lt) i := 0 for set.Next() { i++