Skip to content

Commit

Permalink
Merge pull request #95 from vinted/compactor_gc
Browse files Browse the repository at this point in the history
compactor: hold lock for a shorter amount of time
  • Loading branch information
GiedriusS authored Apr 5, 2024
2 parents f5b0c74 + 75b1ee2 commit f779b38
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 11 deletions.
8 changes: 5 additions & 3 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,8 +585,6 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *FetcherMetrics, filter
metrics.SyncFailures.Inc()
}
}()
metrics.Syncs.Inc()
metrics.ResetTx()

// Run this in thread safe run group.
// TODO(bwplotka): Consider custom singleflight with ttl.
Expand Down Expand Up @@ -617,7 +615,6 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *FetcherMetrics, filter
}

metrics.Synced.WithLabelValues(LoadedMeta).Set(float64(len(metas)))
metrics.Submit()

if len(resp.metaErrs) > 0 {
return metas, resp.partial, errors.Wrap(resp.metaErrs.Err(), "incomplete view")
Expand Down Expand Up @@ -650,6 +647,9 @@ type MetaFetcher struct {
//
// Returned error indicates a failure in fetching metadata. Returned meta can be assumed as correct, with some blocks missing.
func (f *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error) {
f.metrics.Syncs.Inc()
f.metrics.ResetTx()

metas, partial, err = f.wrapped.fetch(ctx, f.metrics, f.filters)
if f.listener != nil {
blocks := make([]metadata.Meta, 0, len(metas))
Expand All @@ -658,6 +658,8 @@ func (f *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.
}
f.listener(blocks, err)
}

f.metrics.Submit()
return metas, partial, err
}

Expand Down
25 changes: 17 additions & 8 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/golang/groupcache/singleflight"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
Expand Down Expand Up @@ -59,6 +60,8 @@ type Syncer struct {
metrics *SyncerMetrics
duplicateBlocksFilter block.DeduplicateFilter
ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter

g singleflight.Group
}

// SyncerMetrics holds metrics tracked by the syncer. This struct and its fields are exported
Expand Down Expand Up @@ -134,15 +137,22 @@ func UntilNextDownsampling(m *metadata.Meta) (time.Duration, error) {

// SyncMetas synchronizes local state of block metas with what we have in the bucket.
func (s *Syncer) SyncMetas(ctx context.Context) error {
s.mtx.Lock()
defer s.mtx.Unlock()
type metasContainer struct {
metas map[ulid.ULID]*metadata.Meta
partial map[ulid.ULID]error
}

metas, partial, err := s.fetcher.Fetch(ctx)
container, err := s.g.Do("", func() (interface{}, error) {
metas, partial, err := s.fetcher.Fetch(ctx)
return metasContainer{metas, partial}, err
})
if err != nil {
return retry(err)
}
s.blocks = metas
s.partial = partial
s.mtx.Lock()
s.blocks = container.(metasContainer).metas
s.partial = container.(metasContainer).partial
s.mtx.Unlock()
return nil
}

Expand Down Expand Up @@ -171,9 +181,6 @@ func (s *Syncer) Metas() map[ulid.ULID]*metadata.Meta {
// block with a higher compaction level.
// Call to SyncMetas function is required to populate duplicateIDs in duplicateBlocksFilter.
func (s *Syncer) GarbageCollect(ctx context.Context) error {
s.mtx.Lock()
defer s.mtx.Unlock()

begin := time.Now()

// Ignore filter exists before deduplicate filter.
Expand Down Expand Up @@ -208,7 +215,9 @@ func (s *Syncer) GarbageCollect(ctx context.Context) error {

// Immediately update our in-memory state so no further call to SyncMetas is needed
// after running garbage collection.
s.mtx.Lock()
delete(s.blocks, id)
s.mtx.Unlock()
s.metrics.GarbageCollectedBlocks.Inc()
}
s.metrics.GarbageCollections.Inc()
Expand Down

0 comments on commit f779b38

Please sign in to comment.