Commit 7a1f3c0

Deterministically build blocks for partition sections (#4327)

mapno authored Nov 14, 2024
1 parent 63cbc9b commit 7a1f3c0

Showing 7 changed files with 270 additions and 82 deletions.
56 changes: 34 additions & 22 deletions modules/blockbuilder/blockbuilder.go
@@ -185,9 +185,8 @@ func (b *BlockBuilder) consumeCycle(ctx context.Context, cycleEndTime time.Time)
 			continue
 		}
 
-		err = b.consumePartition(ctx, partition, partitionLag, cycleEndTime)
-		if err != nil {
-			return fmt.Errorf("failed to consume partition: %w", err)
+		if err = b.consumePartition(ctx, partition, partitionLag, cycleEndTime); err != nil {
+			_ = level.Error(b.logger).Log("msg", "failed to consume partition", "partition", partition, "err", err)
 		}
 	}
 	return nil
@@ -196,26 +195,41 @@ func (b *BlockBuilder) consumeCycle(ctx context.Context, cycleEndTime time.Time)
 func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, partitionLag kadm.GroupMemberLag, cycleEndTime time.Time) error {
 	level.Info(b.logger).Log("msg", "consuming partition", "partition", partition)
 
-	partitionProcessor := newPartitionProcessor(b.logger, b.cfg.blockConfig, b.overrides, b.wal, b.enc)
-
 	sectionEndTime := cycleEndTime
 	commitRecTs := time.UnixMilli(max(partitionLag.Commit.At, b.fallbackOffsetMillis))
 	if sectionEndTime.Sub(commitRecTs) > time.Duration(1.5*float64(b.cfg.ConsumeCycleDuration)) {
 		// We're lagging behind or there is no commit, we need to consume in smaller sections.
 		sectionEndTime, _ = nextCycleEnd(commitRecTs, b.cfg.ConsumeCycleDuration)
 	}
+	// Continue consuming in sections until we're caught up.
 	for !sectionEndTime.After(cycleEndTime) {
-		err := b.consumePartitionSection(ctx, partition, sectionEndTime, partitionLag, partitionProcessor)
+		newCommitAt, err := b.consumePartitionSection(ctx, partition, sectionEndTime, partitionLag)
 		if err != nil {
 			return fmt.Errorf("failed to consume partition section: %w", err)
 		}
 		sectionEndTime = sectionEndTime.Add(b.cfg.ConsumeCycleDuration)
+		if newCommitAt > partitionLag.Commit.At {
+			// We've committed a new offset, so we need to update the lag.
+			partitionLag.Commit.At = newCommitAt
+			partitionLag.Lag = partitionLag.End.Offset - newCommitAt
+		}
 	}
 	return nil
 }
 
-func (b *BlockBuilder) consumePartitionSection(ctx context.Context, partition int32, sectionEndTime time.Time, lag kadm.GroupMemberLag, p partitionWriter) error {
-	level.Info(b.logger).Log("msg", "consuming partition section", "partition", partition, "section_end", sectionEndTime)
+func (b *BlockBuilder) consumePartitionSection(ctx context.Context, partition int32, sectionEndTime time.Time, lag kadm.GroupMemberLag) (int64, error) {
+	level.Info(b.logger).Log(
+		"msg", "consuming partition section",
+		"partition", partition,
+		"section_end", sectionEndTime,
+		"commit_offset", lag.Commit.At,
+		"start_offset", lag.Start.Offset,
+		"end_offset", lag.End.Offset,
+		"lag", lag.Lag,
+	)
 
+	// TODO - Review what ts is used here
+	writer := newPartitionSectionWriter(b.logger, sectionEndTime.UnixMilli(), b.cfg.blockConfig, b.overrides, b.wal, b.enc)
+
 	// We always rewind the partition's offset to the commit offset by reassigning the partition to the client (this triggers partition assignment).
 	// This is so the cycle started exactly at the commit offset, and not at what was (potentially over-) consumed previously.
@@ -236,7 +250,7 @@ func (b *BlockBuilder) consumePartitionSection(ctx context.Context, partition int32,
 consumerLoop:
 	for recOffset := int64(-1); recOffset < lag.End.Offset-1; {
 		if err := context.Cause(ctx); err != nil {
-			return err
+			return lag.Commit.At, err
 		}
 
 		// PollFetches can return a non-failed fetch with zero records. In such a case, with only the fetches at hands,
@@ -253,6 +267,7 @@ consumerLoop:
 		for recIter := fetches.RecordIter(); !recIter.Done(); {
 			rec := recIter.Next()
 			recOffset = rec.Offset
+			level.Info(b.logger).Log("msg", "processing record", "partition", rec.Partition, "offset", rec.Offset, "timestamp", rec.Timestamp)
 
 			if firstRec == nil {
 				firstRec = rec
@@ -264,10 +279,10 @@ consumerLoop:
 				break consumerLoop
 			}
 
-			err := b.pushTraces(rec.Key, rec.Value, p) // TODO - Batch pushes by tenant
+			err := b.pushTraces(rec.Key, rec.Value, writer) // TODO - Batch pushes by tenant
 			if err != nil {
 				// All "non-terminal" errors are handled by the TSDBBuilder.
-				return fmt.Errorf("process record in partition %d at offset %d: %w", rec.Partition, rec.Offset, err)
+				return lag.Commit.At, fmt.Errorf("process record in partition %d at offset %d: %w", rec.Partition, rec.Offset, err)
 			}
 			lastRec = rec
 		}
@@ -276,17 +291,17 @@ consumerLoop:
 	// Nothing was consumed from Kafka at all.
 	if firstRec == nil {
 		level.Info(b.logger).Log("msg", "no records were consumed")
-		return nil
+		return lag.Commit.At, nil
 	}
 
 	// No records were processed for this cycle.
 	if lastRec == nil {
 		level.Info(b.logger).Log("msg", "nothing to commit due to first record has a timestamp greater than this section end", "first_rec_offset", firstRec.Offset, "first_rec_ts", firstRec.Timestamp)
-		return nil
+		return lag.Commit.At, nil
 	}
 
-	if err := p.Flush(ctx, b.writer); err != nil {
-		return fmt.Errorf("failed to flush partition to object storage: %w", err)
+	if err := writer.flush(ctx, b.writer); err != nil {
+		return lag.Commit.At, fmt.Errorf("failed to flush partition to object storage: %w", err)
 	}
 
 	commit := kadm.Offset{
@@ -295,7 +310,7 @@ consumerLoop:
 		At:          lastRec.Offset + 1, // offset+1 means everything up to (including) the offset was processed
 		LeaderEpoch: lastRec.LeaderEpoch,
 	}
-	return b.commitState(ctx, commit)
+	return commit.At, b.commitState(ctx, commit)
 }
 
 func (b *BlockBuilder) commitState(ctx context.Context, commit kadm.Offset) error {
@@ -304,26 +319,23 @@ func (b *BlockBuilder) commitState(ctx context.Context, commit kadm.Offset) error {
 
 	// TODO - Commit with backoff
 	adm := kadm.NewClient(b.kafkaClient)
-	res, err := adm.CommitOffsets(ctx, b.cfg.IngestStorageConfig.Kafka.ConsumerGroup, offsets)
+	err := adm.CommitAllOffsets(ctx, b.cfg.IngestStorageConfig.Kafka.ConsumerGroup, offsets)
 	if err != nil {
 		return fmt.Errorf("failed to commit offsets: %w", err)
 	}
-	if res.Error() != nil {
-		return fmt.Errorf("commit offsets error: %w", res.Error())
-	}
 	level.Info(b.logger).Log("msg", "successfully committed offset to kafka", "offset", commit.At)
 
 	return nil
 }
 
-func (b *BlockBuilder) pushTraces(tenantBytes, reqBytes []byte, p partitionWriter) error {
+func (b *BlockBuilder) pushTraces(tenantBytes, reqBytes []byte, p partitionSectionWriter) error {
 	req, err := b.decoder.Decode(reqBytes)
 	if err != nil {
 		return fmt.Errorf("failed to decode trace: %w", err)
 	}
 	defer b.decoder.Reset()
 
-	return p.PushBytes(string(tenantBytes), req)
+	return p.pushBytes(string(tenantBytes), req)
 }
 
 func (b *BlockBuilder) getAssignedActivePartitions() []int32 {
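
The catch-up logic above is easier to see in isolation. When the last commit is more than 1.5 cycles old, consumePartition does not consume straight to the cycle end; it walks forward in fixed ConsumeCycleDuration sections, and each successfully committed section advances the in-memory commit offset and lag. The self-contained Go sketch below illustrates only the section arithmetic. nextCycleEnd is not shown in this diff, so its body here is an assumption (round the timestamp up to the next cycle boundary); the guard mirrors the one in consumePartition.

package main

import (
	"fmt"
	"time"
)

// Stand-in for nextCycleEnd, which this diff does not show: assume it rounds
// a timestamp up to the next multiple of the cycle duration.
func nextCycleEnd(t time.Time, d time.Duration) (time.Time, time.Duration) {
	end := t.Truncate(d).Add(d)
	return end, end.Sub(t)
}

func main() {
	cycle := 5 * time.Second                                      // ConsumeCycleDuration in the test config
	commitRecTs := time.Date(2024, 11, 14, 10, 0, 2, 0, time.UTC) // timestamp of the last committed record
	cycleEndTime := commitRecTs.Add(28 * time.Second)             // end of the current consume cycle

	// Same guard as consumePartition: if we're more than 1.5 cycles behind,
	// start with a small section just past the commit instead of one big one.
	sectionEndTime := cycleEndTime
	if cycleEndTime.Sub(commitRecTs) > time.Duration(1.5*float64(cycle)) {
		sectionEndTime, _ = nextCycleEnd(commitRecTs, cycle)
	}
	for !sectionEndTime.After(cycleEndTime) {
		fmt.Println("consume section ending at", sectionEndTime.Format(time.TimeOnly))
		sectionEndTime = sectionEndTime.Add(cycle)
	}
	// Prints six section ends, 10:00:05 through 10:00:30, five seconds apart:
	// six short sections instead of one oversized one.
}
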
89 changes: 87 additions & 2 deletions modules/blockbuilder/blockbuilder_test.go
@@ -2,6 +2,7 @@ package blockbuilder
 
 import (
 	"context"
+	"crypto/rand"
 	"errors"
 	"testing"
 	"time"
@@ -21,6 +22,7 @@ import (
 	"github.com/grafana/tempo/tempodb/encoding/common"
 	"github.com/grafana/tempo/tempodb/wal"
 	"github.com/stretchr/testify/require"
+	"github.com/twmb/franz-go/pkg/kerr"
 	"github.com/twmb/franz-go/pkg/kgo"
 	"github.com/twmb/franz-go/pkg/kmsg"
 	"go.uber.org/atomic"
@@ -71,6 +73,89 @@ func TestBlockbuilder(t *testing.T) {
 	}, time.Minute, time.Second)
 }
 
+// FIXME - Test is unstable and will fail if records cross two consumption cycles,
+//
+//	because it's asserting that there is exactly two commits, one of which fails.
+//	It can be 3 commits if the records cross two consumption cycles.
+//	🤷
+func TestBlockbuilderDeterministicComsumption(t *testing.T) {
+	ctx, cancel := context.WithCancelCause(context.Background())
+	t.Cleanup(func() { cancel(errors.New("test done")) })
+
+	k, address := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 1, "test-topic")
+	t.Cleanup(k.Close)
+
+	kafkaCommits := atomic.NewInt32(0)
+	k.ControlKey(kmsg.OffsetCommit.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
+		k.KeepControl()
+		defer kafkaCommits.Add(1)
+
+		if kafkaCommits.Load() == 0 { // First commit fails
+			res := kmsg.NewOffsetCommitResponse()
+			res.Version = req.GetVersion()
+			res.Topics = []kmsg.OffsetCommitResponseTopic{
+				{
+					Topic: testTopic,
+					Partitions: []kmsg.OffsetCommitResponseTopicPartition{
+						{
+							Partition: 0,
+							ErrorCode: kerr.RebalanceInProgress.Code,
+						},
+					},
+				},
+			}
+			return &res, nil, true
+		}
+
+		return nil, nil, false
+	})
+
+	store := newStore(t)
+	store.EnablePolling(ctx, nil)
+	cfg := blockbuilderConfig(t, address)
+	logger := test.NewTestingLogger(t)
+
+	client := newKafkaClient(t, cfg.IngestStorageConfig.Kafka)
+	// Send for 1 second, <1 consumption cycles
+	timer := time.NewTimer(1 * time.Second)
+sendLoop:
+	for {
+		select {
+		case <-timer.C:
+			break sendLoop
+		default:
+			traceID := make([]byte, 16)
+			_, err := rand.Read(traceID)
+			require.NoError(t, err)
+
+			req := test.MakePushBytesRequest(t, 10, traceID)
+			records, err := ingest.Encode(0, util.FakeTenantID, req, 1_000_000)
+			require.NoError(t, err)
+
+			res := client.ProduceSync(ctx, records...)
+			require.NoError(t, res.FirstErr())
+
+			time.Sleep(100 * time.Millisecond)
+		}
+	}
+
+	b := New(cfg, logger, newPartitionRingReader(), &mockOverrides{}, store)
+	require.NoError(t, services.StartAndAwaitRunning(ctx, b))
+	t.Cleanup(func() {
+		require.NoError(t, services.StopAndAwaitTerminated(ctx, b))
+	})
+
+	// Wait for record to be consumed and committed.
+	require.Eventually(t, func() bool {
+		return kafkaCommits.Load() == 2 // First commit fails, second commit succeeds
+	}, time.Minute, time.Second)
+
+	// Wait for the block to be flushed.
+	require.Eventually(t, func() bool {
+		return len(store.BlockMetas(util.FakeTenantID)) == 1 // Only one block should be written
+	}, time.Minute, time.Second)
+}
+
 func blockbuilderConfig(_ *testing.T, address string) Config {
 	cfg := Config{}
 	flagext.DefaultValues(&cfg)
@@ -83,7 +168,7 @@ func blockbuilderConfig(_ *testing.T, address string) Config {
 	cfg.IngestStorageConfig.Kafka.ConsumerGroup = "test-consumer-group"
 
 	cfg.AssignedPartitions = []int32{0}
-	cfg.LookbackOnNoCommit = 1 * time.Minute
+	cfg.LookbackOnNoCommit = 15 * time.Second
 	cfg.ConsumeCycleDuration = 5 * time.Second
 
 	return cfg
@@ -108,7 +193,7 @@ func newStore(t *testing.T) storage.Store {
 			WAL: &wal.Config{
 				Filepath: tmpDir,
 			},
-			BlocklistPoll:         5 * time.Second,
+			BlocklistPoll:         50 * time.Second,
 			BlocklistPollFallback: true,
 		},
 	}, nil, test.NewTestingLogger(t))
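
A note on the commit path this test exercises: the control function rejects only the first OffsetCommit with REBALANCE_IN_PROGRESS, so the test expects exactly two commit requests and still exactly one flushed block. On the builder side, commitState now calls kadm.CommitAllOffsets, which folds per-partition errors into its returned error and makes the old res.Error() check redundant. Below is a minimal sketch of that commit call, assuming franz-go's kadm API as used in the diff; the offset fields mirror the kadm.Offset literal in consumePartitionSection.

package sketch

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

// commitSection sketches the post-change commit path. At is lastOffset+1
// because offset+1 means everything up to and including the offset was
// processed; CommitAllOffsets returns an error for any failed partition,
// including retriable ones like REBALANCE_IN_PROGRESS.
func commitSection(ctx context.Context, client *kgo.Client, group, topic string, partition int32, lastOffset int64, leaderEpoch int32) error {
	offsets := make(kadm.Offsets)
	offsets.Add(kadm.Offset{
		Topic:       topic,
		Partition:   partition,
		At:          lastOffset + 1,
		LeaderEpoch: leaderEpoch,
	})
	adm := kadm.NewClient(client)
	if err := adm.CommitAllOffsets(ctx, group, offsets); err != nil {
		return fmt.Errorf("failed to commit offsets: %w", err)
	}
	return nil
}
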
32 changes: 18 additions & 14 deletions modules/blockbuilder/partition_writer.go

@@ -8,20 +8,22 @@ import (
 	"github.com/go-kit/log/level"
 	"github.com/gogo/protobuf/proto"
 	"github.com/grafana/tempo/pkg/tempopb"
+	"github.com/grafana/tempo/pkg/util"
 	"github.com/grafana/tempo/tempodb"
 	"github.com/grafana/tempo/tempodb/encoding"
 	"github.com/grafana/tempo/tempodb/wal"
 )
 
-type partitionWriter interface {
-	PushBytes(tenant string, req *tempopb.PushBytesRequest) error
-	Flush(ctx context.Context, store tempodb.Writer) error
+type partitionSectionWriter interface {
+	pushBytes(tenant string, req *tempopb.PushBytesRequest) error
+	flush(ctx context.Context, store tempodb.Writer) error
 }
 
 type writer struct {
 	logger log.Logger
 
-	blockCfg BlockConfig
+	blockCfg   BlockConfig
+	cycleEndTs int64
 
 	overrides Overrides
 	wal       *wal.WAL
@@ -31,22 +33,24 @@ type writer struct {
 	m map[string]*tenantStore
 }
 
-func newPartitionProcessor(logger log.Logger, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer {
+func newPartitionSectionWriter(logger log.Logger, cycleEndTs int64, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer {
 	return &writer{
-		logger:    logger,
-		blockCfg:  blockCfg,
-		overrides: overrides,
-		wal:       wal,
-		enc:       enc,
-		m:         make(map[string]*tenantStore),
+		logger:     logger,
+		cycleEndTs: cycleEndTs,
+		blockCfg:   blockCfg,
+		overrides:  overrides,
+		wal:        wal,
+		enc:        enc,
+		m:          make(map[string]*tenantStore),
 	}
 }
 
-func (p *writer) PushBytes(tenant string, req *tempopb.PushBytesRequest) error {
+func (p *writer) pushBytes(tenant string, req *tempopb.PushBytesRequest) error {
 	level.Info(p.logger).Log(
 		"msg", "pushing bytes",
 		"tenant", tenant,
 		"num_traces", len(req.Traces),
+		"id", util.TraceIDToHexString(req.Ids[0].Slice),
 	)
 
 	i, err := p.instanceForTenant(tenant)
@@ -68,7 +72,7 @@ func (p *writer) PushBytes(tenant string, req *tempopb.PushBytesRequest) error {
 	return nil
 }
 
-func (p *writer) Flush(ctx context.Context, store tempodb.Writer) error {
+func (p *writer) flush(ctx context.Context, store tempodb.Writer) error {
 	// TODO - Retry with backoff?
 	for _, i := range p.m {
 		level.Info(p.logger).Log("msg", "flushing tenant", "tenant", i.tenantID)
@@ -84,7 +88,7 @@ func (p *writer) instanceForTenant(tenant string) (*tenantStore, error) {
 		return i, nil
 	}
 
-	i, err := newTenantStore(tenant, p.blockCfg, p.logger, p.wal, p.enc, p.overrides)
+	i, err := newTenantStore(tenant, p.cycleEndTs, p.blockCfg, p.logger, p.wal, p.enc, p.overrides)
 	if err != nil {
 		return nil, err
 	}
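
On the "deterministically" in the commit title: the section writer now carries cycleEndTs, set from sectionEndTime.UnixMilli() in consumePartitionSection, and threads it into newTenantStore, so a replayed section runs with the same timestamp as the original attempt. How tenantStore uses the value is not part of this diff. The sketch below is a hypothetical illustration of why a stable per-section input matters: an ID derived only from (tenant, partition, cycleEndTs) is reproducible, so re-consuming a section after a failed commit can regenerate the same block instead of a duplicate with a fresh random ID.

package sketch

import (
	"crypto/sha256"
	"encoding/binary"

	"github.com/google/uuid"
)

// deterministicBlockID is hypothetical, not code from this commit: it derives
// a block ID purely from inputs that are stable across retries of a section,
// so the same (tenant, partition, section end) always yields the same ID.
func deterministicBlockID(tenant string, partition int32, cycleEndTs int64) uuid.UUID {
	h := sha256.New()
	h.Write([]byte(tenant))
	_ = binary.Write(h, binary.BigEndian, partition) // writes to a hash.Hash never fail
	_ = binary.Write(h, binary.BigEndian, cycleEndTs)
	var id uuid.UUID
	copy(id[:], h.Sum(nil)[:16])
	return id
}

Called twice with identical arguments it returns identical IDs, which lines up with the test's assertion that exactly one block is written even though the first commit fails.
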
Diffs for the remaining 4 changed files are not shown.
