Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tempo-cli: support dropping multiple traces in a single operation #4266

Merged
merged 2 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
## main / unreleased
* [FEATURE] tempo-cli: support dropping multiple traces in a single operation [#4266](https://github.com/grafana/tempo/pull/4266) (@ndk)
* [CHANGE] **BREAKING CHANGE** The Tempo serverless is now deprecated and will be removed in an upcoming release [#4017](https://github.com/grafana/tempo/pull/4017/) @electron0zero
* [CHANGE] **BREAKING CHANGE** Change the AWS Lambda serverless build tooling output from "main" to "bootstrap". Refer to https://aws.amazon.com/blogs/compute/migrating-aws-lambda-functions-from-the-go1-x-runtime-to-the-custom-runtime-on-amazon-linux-2/ for migration steps [#3852](https://github.com/grafana/tempo/pull/3852) (@zatlodan)
* [CHANGE] Add throughput and SLO metrics in the tags and tag values endpoints [#4148](https://github.com/grafana/tempo/pull/4148) (@electron0zero)
Expand Down
118 changes: 76 additions & 42 deletions cmd/tempo-cli/cmd-rewrite-blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"os"
"strconv"
"strings"

"github.com/go-kit/log"
"github.com/google/uuid"
Expand All @@ -19,16 +20,16 @@ import (
"github.com/grafana/tempo/tempodb/encoding/common"
)

type dropTraceCmd struct {
type dropTracesCmd struct {
backendOptions

TraceID string `arg:"" help:"trace ID to retrieve"`
TenantID string `arg:"" help:"tenant ID to search"`
TraceIDs string `arg:"" help:"Trace IDs to drop"`
DropTrace bool `name:"drop-trace" help:"actually attempt to drop the trace" default:"false"`
}

func (cmd *dropTraceCmd) Run(ctx *globalOptions) error {
fmt.Printf("beginning process to drop trace %v from tenant %v\n", cmd.TraceID, cmd.TenantID)
func (cmd *dropTracesCmd) Run(opts *globalOptions) error {
fmt.Printf("beginning process to drop traces %v from tenant %v\n", cmd.TraceIDs, cmd.TenantID)
fmt.Println("**warning**: compaction must be disabled or a compactor may duplicate a block as this process is rewriting it")
fmt.Println("")
if cmd.DropTrace {
Expand All @@ -38,62 +39,86 @@ func (cmd *dropTraceCmd) Run(ctx *globalOptions) error {
fmt.Println("")
}

r, w, c, err := loadBackend(&cmd.backendOptions, ctx)
if err != nil {
return err
}
ctx := context.Background()

id, err := util.HexStringToTraceID(cmd.TraceID)
r, w, c, err := loadBackend(&cmd.backendOptions, opts)
if err != nil {
return err
}

blocks, err := blocksWithTraceID(context.Background(), r, cmd.TenantID, id)
if err != nil {
return err
type pair struct {
traceIDs []common.ID
blockMeta *backend.BlockMeta
}
tracesByBlock := map[backend.UUID]pair{}

if len(blocks) == 0 {
fmt.Println("\ntrace not found in any block. aborting")
return nil
}
// Group trace IDs by blocks
ids := strings.Split(cmd.TraceIDs, ",")
for _, id := range ids {
traceID, err := util.HexStringToTraceID(id)
if err != nil {
return err
}

// print out blocks that have the trace id
fmt.Println("\n\ntrace found in:")
for _, block := range blocks {
fmt.Printf(" %v sz: %d traces: %d\n", block.BlockID, block.Size_, block.TotalObjects)
}
// It might be significantly improved if common.BackendBlock supported bulk searches.
blocks, err := blocksWithTraceID(ctx, r, cmd.TenantID, traceID)
if err != nil {
return err
}

if !cmd.DropTrace {
fmt.Println("**not dropping trace, use --drop-trace to actually drop**")
return nil
if len(blocks) == 0 {
fmt.Printf("\ntrace %s not found in any block. skipping\n", util.TraceIDToHexString(traceID))
}
for _, block := range blocks {
p, ok := tracesByBlock[block.BlockID]
if !ok {
p = pair{blockMeta: block}
}
p.traceIDs = append(p.traceIDs, traceID)
tracesByBlock[block.BlockID] = p
}
}

fmt.Println("rewriting blocks:")
for _, block := range blocks {
fmt.Printf(" rewriting %v\n", block.BlockID)
newBlock, err := rewriteBlock(context.Background(), r, w, block, id)
// Remove traces from blocks
for _, p := range tracesByBlock {
// print out trace IDs to be removed in the block
ndk marked this conversation as resolved.
Show resolved Hide resolved
strTraceIDs := make([]string, len(p.traceIDs))
for i, tid := range p.traceIDs {
strTraceIDs[i] = util.TraceIDToHexString(tid)
}
fmt.Printf("\nFound %d traces: %v in block: %v\n", len(strTraceIDs), strTraceIDs, p.blockMeta.BlockID)
fmt.Printf("blockInfo: ID: %v, Size: %d Total Traces: %d\n", p.blockMeta.BlockID, p.blockMeta.Size_, p.blockMeta.TotalObjects)

if !cmd.DropTrace {
fmt.Println("**not dropping trace, use --drop-trace to actually drop**")
continue
}

fmt.Printf(" rewriting %v\n", p.blockMeta.BlockID)
newMeta, err := rewriteBlock(ctx, r, w, p.blockMeta, p.traceIDs)
if err != nil {
return err
}
fmt.Printf(" rewrote to new block: %v\n", newBlock.BlockID)
}
if newMeta == nil {
fmt.Printf(" block %v was removed\n", p.blockMeta.BlockID)
} else {
fmt.Printf(" rewrote to new block: %v\n", newMeta.BlockID)
}

fmt.Println("marking old blocks compacted")
for _, block := range blocks {
fmt.Printf(" marking %v\n", block.BlockID)
err = c.MarkBlockCompacted((uuid.UUID)(block.BlockID), block.TenantID)
fmt.Printf(" marking %v compacted\n", p.blockMeta.BlockID)
err = c.MarkBlockCompacted((uuid.UUID)(p.blockMeta.BlockID), p.blockMeta.TenantID)
if err != nil {
return err
}
}

fmt.Println("successfully rewrote blocks dropping requested trace")
if cmd.DropTrace {
fmt.Printf("successfully rewrote blocks dropping requested traces: %v from tenant: %v\n", cmd.TraceIDs, cmd.TenantID)
}

return nil
}

func rewriteBlock(ctx context.Context, r backend.Reader, w backend.Writer, meta *backend.BlockMeta, traceID common.ID) (*backend.BlockMeta, error) {
func rewriteBlock(ctx context.Context, r backend.Reader, w backend.Writer, meta *backend.BlockMeta, traceIDs []common.ID) (*backend.BlockMeta, error) {
enc, err := encoding.FromVersion(meta.Version)
if err != nil {
return nil, fmt.Errorf("error getting encoder: %w", err)
Expand Down Expand Up @@ -131,7 +156,12 @@ func rewriteBlock(ctx context.Context, r backend.Reader, w backend.Writer, meta

// hook to drop the trace
DropObject: func(id common.ID) bool {
return bytes.Equal(id, traceID)
for _, tid := range traceIDs {
if bytes.Equal(id, tid) {
ndk marked this conversation as resolved.
Show resolved Hide resolved
return true
}
}
return false
},

// setting to prevent panics. should we track and report these?
Expand All @@ -153,20 +183,24 @@ func rewriteBlock(ctx context.Context, r backend.Reader, w backend.Writer, meta
}

if len(out) != 1 {
if meta.TotalObjects == int64(len(traceIDs)) {
// we removed all traces from the block
return nil, nil
}
return nil, fmt.Errorf("expected 1 block, got %d", len(out))
}

newMeta := out[0]

if newMeta.TotalObjects != meta.TotalObjects-1 {
if newMeta.TotalObjects != meta.TotalObjects-int64(len(traceIDs)) {
return nil, fmt.Errorf("expected output to have one less object then in. out: %d in: %d", newMeta.TotalObjects, meta.TotalObjects)
}

return newMeta, nil
}

func blocksWithTraceID(ctx context.Context, r backend.Reader, tenantID string, traceID common.ID) ([]*backend.BlockMeta, error) {
blockIDs, _, err := r.Blocks(context.Background(), tenantID)
blockIDs, _, err := r.Blocks(ctx, tenantID)
if err != nil {
return nil, err
}
Expand All @@ -184,7 +218,7 @@ func blocksWithTraceID(ctx context.Context, r backend.Reader, tenantID string, t
// search here
meta, err := isInBlock(ctx, r, blockNum2, id2, tenantID, traceID)
if err != nil {
fmt.Println("Error querying block:", err)
fmt.Println("\nError querying block:", err)
return
}

Expand All @@ -211,7 +245,7 @@ func isInBlock(ctx context.Context, r backend.Reader, blockNum int, id uuid.UUID
fmt.Print(strconv.Itoa(blockNum))
}

meta, err := r.BlockMeta(context.Background(), id, tenantID)
meta, err := r.BlockMeta(ctx, id, tenantID)
if err != nil && !errors.Is(err, backend.ErrDoesNotExist) {
return nil, err
}
Expand Down
Loading