Skip to content

Commit

Permalink
propagate changes to old parquet encondings
Browse files Browse the repository at this point in the history
  • Loading branch information
javiermolinar committed Dec 20, 2024
1 parent f3d66cb commit e5c6c05
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 9 deletions.
2 changes: 1 addition & 1 deletion modules/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (b *BlockBuilder) consumePartitionSection(ctx context.Context, partition in
}(time.Now())

// TODO - Review what endTimestamp is used here
writer := newPartitionSectionWriter(b.logger, int64(partition), sectionEndTime, sectionStartTime, b.cfg.BlockConfig, b.overrides, b.wal, b.enc)
writer := newPartitionSectionWriter(b.logger, int64(partition), sectionStartTime, sectionEndTime, b.cfg.BlockConfig, b.overrides, b.wal, b.enc)

// We always rewind the partition's offset to the commit offset by reassigning the partition to the client (this triggers partition assignment).
// This is so the cycle started exactly at the commit offset, and not at what was (potentially over-) consumed previously.
Expand Down
2 changes: 1 addition & 1 deletion modules/blockbuilder/partition_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type writer struct {
m map[string]*tenantStore
}

func newPartitionSectionWriter(logger log.Logger, partition int64, endSectionTime, startSectionTime time.Time, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer {
func newPartitionSectionWriter(logger log.Logger, partition int64, startSectionTime, endSectionTime time.Time, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer {
return &writer{
logger: logger,
partition: partition,
Expand Down
2 changes: 1 addition & 1 deletion modules/blockbuilder/partition_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func getPartitionWriter(t *testing.T) *writer {
})
require.NoError(t, err)

return newPartitionSectionWriter(logger, 1, endTime, startTime, blockCfg, &mockOverrides{}, w, encoding.DefaultEncoding())
return newPartitionSectionWriter(logger, 1, startTime, endTime, blockCfg, &mockOverrides{}, w, encoding.DefaultEncoding())
}

func TestPushBytes(t *testing.T) {
Expand Down
4 changes: 1 addition & 3 deletions tempodb/encoding/vparquet2/wal_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,15 +321,13 @@ func (b *walBlock) Append(id common.ID, buff []byte, start, end uint32) error {
if err != nil {
return fmt.Errorf("error preparing trace for read: %w", err)
}

start, end = b.adjustTimeRangeForSlack(start, end)
return b.AppendTrace(id, trace, start, end)
}

func (b *walBlock) AppendTrace(id common.ID, trace *tempopb.Trace, start, end uint32) error {
b.buffer = traceToParquet(id, trace, b.buffer)

start, end = b.adjustTimeRangeForSlack(start, end)

// add to current
_, err := b.writer.Write([]*Trace{b.buffer})
if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions tempodb/encoding/vparquet3/wal_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (b *walBlock) Append(id common.ID, buff []byte, start, end uint32) error {
if err != nil {
return fmt.Errorf("error preparing trace for read: %w", err)
}

start, end = b.adjustTimeRangeForSlack(start, end)
return b.AppendTrace(id, trace, start, end)
}

Expand All @@ -339,8 +339,6 @@ func (b *walBlock) AppendTrace(id common.ID, trace *tempopb.Trace, start, end ui
dataquality.WarnRootlessTrace(b.meta.TenantID, dataquality.PhaseTraceFlushedToWal)
}

start, end = b.adjustTimeRangeForSlack(start, end)

// add to current
_, err := b.writer.Write([]*Trace{b.buffer})
if err != nil {
Expand Down

0 comments on commit e5c6c05

Please sign in to comment.