From 6ceb125f49a2cc76335844458e095b53be892263 Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Thu, 19 Dec 2024 09:17:14 -0500 Subject: [PATCH 1/2] New config and logic to only search RF1 blocks after a given date, and stop all flushing from ingesters --- modules/frontend/search_sharder.go | 15 ++++++++++++++- modules/ingester/config.go | 2 ++ modules/ingester/flush.go | 12 +++++++----- modules/ingester/ingester.go | 16 +++++++++------- modules/querier/config.go | 1 + modules/querier/querier.go | 3 ++- tempodb/encoding/common/interfaces.go | 24 ++++++++++++------------ tempodb/tempodb.go | 17 +++++++++++------ tempodb/tempodb_test.go | 4 ++-- 9 files changed, 60 insertions(+), 34 deletions(-) diff --git a/modules/frontend/search_sharder.go b/modules/frontend/search_sharder.go index ddad508d30f..ef6086ffb45 100644 --- a/modules/frontend/search_sharder.go +++ b/modules/frontend/search_sharder.go @@ -35,6 +35,7 @@ type SearchSharderConfig struct { QueryBackendAfter time.Duration `yaml:"query_backend_after,omitempty"` QueryIngestersUntil time.Duration `yaml:"query_ingesters_until,omitempty"` IngesterShards int `yaml:"ingester_shards,omitempty"` + RF1After time.Time `yaml:"rf1_after"` } type asyncSearchSharder struct { @@ -138,13 +139,25 @@ func (s asyncSearchSharder) RoundTrip(pipelineRequest pipeline.Request) (pipelin // blockMetas returns all relevant blockMetas given a start/end func (s *asyncSearchSharder) blockMetas(start, end int64, tenantID string) []*backend.BlockMeta { + var rfCheck func(m *backend.BlockMeta) bool + if s.cfg.RF1After.IsZero() { + rfCheck = func(m *backend.BlockMeta) bool { + return m.ReplicationFactor == backend.DefaultReplicationFactor + } + } else { + rfCheck = func(m *backend.BlockMeta) bool { + return (m.ReplicationFactor == backend.DefaultReplicationFactor && m.StartTime.Before(s.cfg.RF1After)) || + (m.ReplicationFactor == 1 && m.StartTime.After(s.cfg.RF1After)) + } + } + // reduce metas to those in the requested range allMetas := 
s.reader.BlockMetas(tenantID) metas := make([]*backend.BlockMeta, 0, len(allMetas)/50) // divide by 50 for luck for _, m := range allMetas { if m.StartTime.Unix() <= end && m.EndTime.Unix() >= start && - m.ReplicationFactor == backend.DefaultReplicationFactor { // This check skips generator blocks (RF=1) + rfCheck(m) { metas = append(metas, m) } } diff --git a/modules/ingester/config.go b/modules/ingester/config.go index f64d9bbd6cf..412367f7436 100644 --- a/modules/ingester/config.go +++ b/modules/ingester/config.go @@ -29,6 +29,7 @@ type Config struct { CompleteBlockTimeout time.Duration `yaml:"complete_block_timeout"` OverrideRingKey string `yaml:"override_ring_key"` FlushAllOnShutdown bool `yaml:"flush_all_on_shutdown"` + FlushObjectStorage bool `yaml:"flush_object_storage"` // This config is dynamically injected because defined outside the ingester config. DedicatedColumns backend.DedicatedColumns `yaml:"-"` @@ -49,6 +50,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) cfg.FlushCheckPeriod = 10 * time.Second cfg.FlushOpTimeout = 5 * time.Minute cfg.FlushAllOnShutdown = false + cfg.FlushObjectStorage = true f.DurationVar(&cfg.MaxTraceIdle, prefix+".trace-idle-period", 10*time.Second, "Duration after which to consider a trace complete if no spans have been received") f.DurationVar(&cfg.MaxBlockDuration, prefix+".max-block-duration", 30*time.Minute, "Maximum duration which the head block can be appended to before cutting it.") diff --git a/modules/ingester/flush.go b/modules/ingester/flush.go index 9bb3afd2291..f9f9d9b953a 100644 --- a/modules/ingester/flush.go +++ b/modules/ingester/flush.go @@ -286,11 +286,13 @@ func (i *Ingester) handleComplete(ctx context.Context, op *flushOp) (retry bool, // add a flushOp for the block we just completed // No delay - i.enqueue(&flushOp{ - kind: opKindFlush, - userID: instance.instanceID, - blockID: op.blockID, - }, false) + if i.cfg.FlushObjectStorage { + i.enqueue(&flushOp{ + kind: 
opKindFlush, + userID: instance.instanceID, + blockID: op.blockID, + }, false) + } return false, nil } diff --git a/modules/ingester/ingester.go b/modules/ingester/ingester.go index eac6c8c378d..0ed2fd7610d 100644 --- a/modules/ingester/ingester.go +++ b/modules/ingester/ingester.go @@ -489,13 +489,15 @@ func (i *Ingester) rediscoverLocalBlocks() error { } // Requeue needed flushes - for _, b := range newBlocks { - if b.FlushedTime().IsZero() { - i.enqueue(&flushOp{ - kind: opKindFlush, - userID: t, - blockID: (uuid.UUID)(b.BlockMeta().BlockID), - }, i.replayJitter) + if i.cfg.FlushObjectStorage { + for _, b := range newBlocks { + if b.FlushedTime().IsZero() { + i.enqueue(&flushOp{ + kind: opKindFlush, + userID: t, + blockID: (uuid.UUID)(b.BlockMeta().BlockID), + }, i.replayJitter) + } } } } diff --git a/modules/querier/config.go b/modules/querier/config.go index 824622cc3c2..c70f5e008b5 100644 --- a/modules/querier/config.go +++ b/modules/querier/config.go @@ -43,6 +43,7 @@ type SearchConfig struct { type TraceByIDConfig struct { QueryTimeout time.Duration `yaml:"query_timeout"` + RF1After time.Time `yaml:"rf1_after"` } type MetricsConfig struct { diff --git a/modules/querier/querier.go b/modules/querier/querier.go index 513f1fd0973..6f137b2041b 100644 --- a/modules/querier/querier.go +++ b/modules/querier/querier.go @@ -286,7 +286,8 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque )) opts := common.DefaultSearchOptionsWithMaxBytes(maxBytes) - opts.BlockReplicationFactor = backend.DefaultReplicationFactor + opts.RF1After = q.cfg.TraceByID.RF1After + partialTraces, blockErrs, err := q.store.Find(ctx, userID, req.TraceID, req.BlockStart, req.BlockEnd, timeStart, timeEnd, opts) if err != nil { retErr := fmt.Errorf("error querying store in Querier.FindTraceByID: %w", err) diff --git a/tempodb/encoding/common/interfaces.go b/tempodb/encoding/common/interfaces.go index ef3cd6b9969..831970ffcc9 100644 --- 
a/tempodb/encoding/common/interfaces.go +++ b/tempodb/encoding/common/interfaces.go @@ -2,6 +2,7 @@ package common import ( "context" + "time" "github.com/go-kit/log" @@ -35,14 +36,14 @@ type Searcher interface { } type SearchOptions struct { - ChunkSizeBytes uint32 // Buffer size to read from backend storage. - StartPage int // Controls searching only a subset of the block. Which page to begin searching at. - TotalPages int // Controls searching only a subset of the block. How many pages to search. - MaxBytes int // Max allowable trace size in bytes. Traces exceeding this are not searched. - PrefetchTraceCount int // How many traces to prefetch async. - ReadBufferCount int - ReadBufferSize int - BlockReplicationFactor int // Only blocks with this replication factor will be searched. Set to 1 to search generator blocks (RF=1). + ChunkSizeBytes uint32 // Buffer size to read from backend storage. + StartPage int // Controls searching only a subset of the block. Which page to begin searching at. + TotalPages int // Controls searching only a subset of the block. How many pages to search. + MaxBytes int // Max allowable trace size in bytes. Traces exceeding this are not searched. + PrefetchTraceCount int // How many traces to prefetch async. + ReadBufferCount int + ReadBufferSize int + RF1After time.Time // Blocks starting after this time are searched only if their replication factor is 1; blocks starting before it, and all blocks when this is zero, must have the default replication factor. } // DefaultSearchOptions is used in a lot of places such as local ingester searches. It is important @@ -52,10 +53,9 @@ type SearchOptions struct { // tempodb.SearchConfig{}.ApplyToOptions(&searchOpts). we should consolidate these. 
func DefaultSearchOptions() SearchOptions { return SearchOptions{ - ReadBufferCount: 32, - ReadBufferSize: 1024 * 1024, - ChunkSizeBytes: 4 * 1024 * 1024, - BlockReplicationFactor: backend.DefaultReplicationFactor, + ReadBufferCount: 32, + ReadBufferSize: 1024 * 1024, + ChunkSizeBytes: 4 * 1024 * 1024, } } diff --git a/tempodb/tempodb.go b/tempodb/tempodb.go index 86ceebed9bc..2cb0180614e 100644 --- a/tempodb/tempodb.go +++ b/tempodb/tempodb.go @@ -308,13 +308,13 @@ func (rw *readerWriter) Find(ctx context.Context, tenantID string, id common.ID, compactedBlocksSearched := 0 for _, b := range blocklist { - if includeBlock(b, id, blockStartBytes, blockEndBytes, timeStart, timeEnd, opts.BlockReplicationFactor) { + if includeBlock(b, id, blockStartBytes, blockEndBytes, timeStart, timeEnd, opts.RF1After) { copiedBlocklist = append(copiedBlocklist, b) blocksSearched++ } } for _, c := range compactedBlocklist { - if includeCompactedBlock(c, id, blockStartBytes, blockEndBytes, rw.cfg.BlocklistPoll, timeStart, timeEnd, opts.BlockReplicationFactor) { + if includeCompactedBlock(c, id, blockStartBytes, blockEndBytes, rw.cfg.BlocklistPoll, timeStart, timeEnd, opts.RF1After) { copiedBlocklist = append(copiedBlocklist, &c.BlockMeta) compactedBlocksSearched++ } @@ -599,7 +599,7 @@ func (rw *readerWriter) pollBlocklist() { } // includeBlock indicates whether a given block should be included in a backend search -func includeBlock(b *backend.BlockMeta, _ common.ID, blockStart, blockEnd []byte, timeStart, timeEnd int64, replicationFactor int) bool { +func includeBlock(b *backend.BlockMeta, _ common.ID, blockStart, blockEnd []byte, timeStart, timeEnd int64, rf1After time.Time) bool { // todo: restore this functionality once it works. 
min/max ids are currently not recorded // https://github.com/grafana/tempo/issues/1903 // correctly in a block @@ -620,16 +620,21 @@ func includeBlock(b *backend.BlockMeta, _ common.ID, blockStart, blockEnd []byte return false } - return b.ReplicationFactor == uint32(replicationFactor) + if rf1After.IsZero() { + return b.ReplicationFactor == backend.DefaultReplicationFactor + } else { + return (b.StartTime.Before(rf1After) && b.ReplicationFactor == backend.DefaultReplicationFactor) || + (b.StartTime.After(rf1After) && b.ReplicationFactor == 1) + } } // if block is compacted within lookback period, and is within shard ranges, include it in search -func includeCompactedBlock(c *backend.CompactedBlockMeta, id common.ID, blockStart, blockEnd []byte, poll time.Duration, timeStart, timeEnd int64, replicationFactor int) bool { +func includeCompactedBlock(c *backend.CompactedBlockMeta, id common.ID, blockStart, blockEnd []byte, poll time.Duration, timeStart, timeEnd int64, rf1After time.Time) bool { lookback := time.Now().Add(-(2 * poll)) if c.CompactedTime.Before(lookback) { return false } - return includeBlock(&c.BlockMeta, id, blockStart, blockEnd, timeStart, timeEnd, replicationFactor) + return includeBlock(&c.BlockMeta, id, blockStart, blockEnd, timeStart, timeEnd, rf1After) } // createLegacyCache uses the config to return a cache and a list of roles. 
diff --git a/tempodb/tempodb_test.go b/tempodb/tempodb_test.go index d47374f9047..c7c626fcc97 100644 --- a/tempodb/tempodb_test.go +++ b/tempodb/tempodb_test.go @@ -416,7 +416,7 @@ func TestIncludeBlock(t *testing.T) { e, err := tc.blockEnd.MarshalBinary() require.NoError(t, err) - assert.Equal(t, tc.expected, includeBlock(tc.meta, tc.searchID, s, e, tc.start, tc.end, 0)) + assert.Equal(t, tc.expected, includeBlock(tc.meta, tc.searchID, s, e, tc.start, tc.end, time.Time{})) }) } } @@ -487,7 +487,7 @@ func TestIncludeCompactedBlock(t *testing.T) { e, err := tc.blockEnd.MarshalBinary() require.NoError(t, err) - assert.Equal(t, tc.expected, includeCompactedBlock(tc.meta, tc.searchID, s, e, blocklistPoll, tc.start, tc.end, 0)) + assert.Equal(t, tc.expected, includeCompactedBlock(tc.meta, tc.searchID, s, e, blocklistPoll, tc.start, tc.end, time.Time{})) }) } } From eb8cdfcec526a730b4d5ba234f21a268c0b2d50c Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Thu, 19 Dec 2024 10:16:04 -0500 Subject: [PATCH 2/2] lint --- tempodb/tempodb.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tempodb/tempodb.go b/tempodb/tempodb.go index 2cb0180614e..40545fe213a 100644 --- a/tempodb/tempodb.go +++ b/tempodb/tempodb.go @@ -622,10 +622,10 @@ func includeBlock(b *backend.BlockMeta, _ common.ID, blockStart, blockEnd []byte if rf1After.IsZero() { return b.ReplicationFactor == backend.DefaultReplicationFactor - } else { - return (b.StartTime.Before(rf1After) && b.ReplicationFactor == backend.DefaultReplicationFactor) || - (b.StartTime.After(rf1After) && b.ReplicationFactor == 1) } + + return (b.StartTime.Before(rf1After) && b.ReplicationFactor == backend.DefaultReplicationFactor) || + (b.StartTime.After(rf1After) && b.ReplicationFactor == 1) } // if block is compacted within lookback period, and is within shard ranges, include it in search