Skip to content

Commit

Permalink
Merge pull request #122 from vinted/upd8_thanos_1111
Browse files Browse the repository at this point in the history
*: pull fixes
  • Loading branch information
GiedriusS authored Nov 11, 2024
2 parents 6e5ce60 + f5514c8 commit 45dd5cc
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 7 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7814](https://github.com/thanos-io/thanos/pull/7814) Store: label_values: if matchers contain **name**=="something", do not add <labelname> != "" to fetch less postings.
- [#7679](https://github.com/thanos-io/thanos/pull/7679) Query: respect store.limit.* flags when evaluating queries
- [#7821](https://github.com/thanos-io/thanos/pull/7821) Query/Receive: Fix coroutine leak introduced in https://github.com/thanos-io/thanos/pull/7796.
- [#7843](https://github.com/thanos-io/thanos/pull/7843) Query Frontend: fix slow query logging for non-query endpoints.
- [#7852](https://github.com/thanos-io/thanos/pull/7852) Query Frontend: pass "stats" parameter forward to queriers and fix Prometheus stats merging.
- [#7832](https://github.com/thanos-io/thanos/pull/7832) Query Frontend: Fix cache keys for dynamic split intervals.
- [#7885](https://github.com/thanos-io/thanos/pull/7885) Store: Return chunks to the pool after completing a Series call.

### Added
- [#7763](https://github.com/thanos-io/thanos/pull/7763) Ruler: use native histograms for client latency metrics.
Expand Down
6 changes: 6 additions & 0 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,11 @@ func runCompact(
cf.UpdateOnChange(func(blocks []metadata.Meta, err error) {
api.SetLoaded(blocks, err)
})

var syncMetasTimeout = conf.waitInterval
if !conf.wait {
syncMetasTimeout = 0
}
sy, err = compact.NewMetaSyncer(
logger,
reg,
Expand All @@ -297,6 +302,7 @@ func runCompact(
ignoreDeletionMarkFilter,
compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, ""),
compactMetrics.garbageCollectedBlocks,
syncMetasTimeout,
)
if err != nil {
return errors.Wrap(err, "create syncer")
Expand Down
2 changes: 2 additions & 0 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat
ignoreDeletionMarkFilter,
stubCounter,
stubCounter,
0,
)
if err != nil {
return errors.Wrap(err, "create syncer")
Expand Down Expand Up @@ -1413,6 +1414,7 @@ func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.P
ignoreDeletionMarkFilter,
stubCounter,
stubCounter,
0,
)
if err != nil {
return errors.Wrap(err, "create syncer")
Expand Down
21 changes: 18 additions & 3 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type Syncer struct {
metrics *SyncerMetrics
duplicateBlocksFilter block.DeduplicateFilter
ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter
syncMetasTimeout time.Duration

g singleflight.Group
}
Expand Down Expand Up @@ -101,15 +102,23 @@ func NewSyncerMetrics(reg prometheus.Registerer, blocksMarkedForDeletion, garbag

// NewMetaSyncer returns a new Syncer for the given Bucket and directory.
// Blocks must be at least as old as the sync delay for being considered.
func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, duplicateBlocksFilter block.DeduplicateFilter, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks prometheus.Counter) (*Syncer, error) {
return NewMetaSyncerWithMetrics(logger, NewSyncerMetrics(reg, blocksMarkedForDeletion, garbageCollectedBlocks), bkt, fetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter)
// NewMetaSyncer returns a new Syncer for the given Bucket and directory.
// Blocks must be at least as old as the sync delay for being considered.
// A syncMetasTimeout of zero leaves SyncMetas calls unbounded.
func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, duplicateBlocksFilter block.DeduplicateFilter, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks prometheus.Counter, syncMetasTimeout time.Duration) (*Syncer, error) {
	// Build the metrics bundle from the registerer, then delegate to the
	// metrics-aware constructor.
	metrics := NewSyncerMetrics(reg, blocksMarkedForDeletion, garbageCollectedBlocks)
	return NewMetaSyncerWithMetrics(logger, metrics, bkt, fetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, syncMetasTimeout)
}

func NewMetaSyncerWithMetrics(logger log.Logger, metrics *SyncerMetrics, bkt objstore.Bucket, fetcher block.MetadataFetcher, duplicateBlocksFilter block.DeduplicateFilter, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter) (*Syncer, error) {
func NewMetaSyncerWithMetrics(logger log.Logger, metrics *SyncerMetrics, bkt objstore.Bucket, fetcher block.MetadataFetcher, duplicateBlocksFilter block.DeduplicateFilter, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter, syncMetasTimeout time.Duration) (*Syncer, error) {
if logger == nil {
logger = log.NewNopLogger()
}
return &Syncer{
syncMetasTimeout: syncMetasTimeout,
logger: logger,
bkt: bkt,
fetcher: fetcher,
Expand Down Expand Up @@ -138,6 +147,12 @@ func UntilNextDownsampling(m *metadata.Meta) (time.Duration, error) {

// SyncMetas synchronizes local state of block metas with what we have in the bucket.
func (s *Syncer) SyncMetas(ctx context.Context) error {
var cancel func() = func() {}
if s.syncMetasTimeout > 0 {
ctx, cancel = context.WithTimeout(ctx, s.syncMetasTimeout)
}
defer cancel()

type metasContainer struct {
metas map[ulid.ULID]*metadata.Meta
partial map[ulid.ULID]error
Expand Down
6 changes: 3 additions & 3 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
garbageCollectedBlocks := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
blockMarkedForNoCompact := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(nil, nil, 48*time.Hour, fetcherConcurrency)
sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks)
sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 0)
testutil.Ok(t, err)

// Do one initial synchronization with the bucket.
Expand Down Expand Up @@ -209,7 +209,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg
blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
blocksMaredForNoCompact := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
garbageCollectedBlocks := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks)
sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 0)
testutil.Ok(t, err)

comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, mergeFunc)
Expand Down Expand Up @@ -493,7 +493,7 @@ func TestGarbageCollectDoesntCreateEmptyBlocksWithDeletionMarksOnly(t *testing.T
})
testutil.Ok(t, err)

sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks)
sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 0)
testutil.Ok(t, err)

// Do one initial synchronization with the bucket.
Expand Down
4 changes: 3 additions & 1 deletion pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1688,10 +1688,12 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
s.metrics.seriesBlocksQueried.WithLabelValues(tenant).Observe(float64(stats.blocksQueried))
}

lt := NewProxyResponseLoserTree(respSets...)
defer lt.Close()
// Merge the sub-results from each selected block.
tracing.DoInSpan(ctx, "bucket_store_merge_all", func(ctx context.Context) {
begin := time.Now()
set := NewResponseDeduplicator(NewProxyResponseLoserTree(respSets...))
set := NewResponseDeduplicator(lt)
i := 0
for set.Next() {
i++
Expand Down

0 comments on commit 45dd5cc

Please sign in to comment.