From d9e4f30a000ffd0ca481d5224fa06dcf2de06770 Mon Sep 17 00:00:00 2001 From: Andrey Karpov Date: Sat, 9 Nov 2024 13:22:14 +0100 Subject: [PATCH] tempo-cli: support dropping multiple traces in a single operation --- CHANGELOG.md | 1 + cmd/tempo-cli/cmd-rewrite-blocks.go | 111 +++++++---- cmd/tempo-cli/cmd-rewrite-blocks_test.go | 202 +++++++++++++++++++++ cmd/tempo-cli/main.go | 2 +- docs/sources/tempo/operations/tempo_cli.md | 17 +- 5 files changed, 288 insertions(+), 45 deletions(-) create mode 100644 cmd/tempo-cli/cmd-rewrite-blocks_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index a9b8bbb742b..7ff37651a0d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ ## main / unreleased +* [FEATURE] tempo-cli: support dropping multiple traces in a single operation [#4266](https://github.com/grafana/tempo/pull/4266) (@ndk) * [CHANGE] **BREAKING CHANGE** Change the AWS Lambda serverless build tooling output from "main" to "bootstrap". Refer to https://aws.amazon.com/blogs/compute/migrating-aws-lambda-functions-from-the-go1-x-runtime-to-the-custom-runtime-on-amazon-linux-2/ for migration steps [#3852](https://github.com/grafana/tempo/pull/3852) (@zatlodan) * [ENHANCEMENT] The span multiplier now also sources its value from the resource attributes. 
[#4210](https://github.com/grafana/tempo/pull/4210) * [FEATURE] Export cost attribution usage metrics from distributor [#4162](https://github.com/grafana/tempo/pull/4162) (@mdisibio) diff --git a/cmd/tempo-cli/cmd-rewrite-blocks.go b/cmd/tempo-cli/cmd-rewrite-blocks.go index ea5f22069be..72398ed60df 100644 --- a/cmd/tempo-cli/cmd-rewrite-blocks.go +++ b/cmd/tempo-cli/cmd-rewrite-blocks.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "strconv" + "strings" "github.com/go-kit/log" "github.com/google/uuid" @@ -19,16 +20,16 @@ import ( "github.com/grafana/tempo/tempodb/encoding/common" ) -type dropTraceCmd struct { +type dropTracesCmd struct { backendOptions - TraceID string `arg:"" help:"trace ID to retrieve"` + TraceIDs string `arg:"" help:"Trace IDs to drop"` TenantID string `arg:"" help:"tenant ID to search"` DropTrace bool `name:"drop-trace" help:"actually attempt to drop the trace" default:"false"` } -func (cmd *dropTraceCmd) Run(ctx *globalOptions) error { - fmt.Printf("beginning process to drop trace %v from tenant %v\n", cmd.TraceID, cmd.TenantID) +func (cmd *dropTracesCmd) Run(opts *globalOptions) error { + fmt.Printf("beginning process to drop traces %v from tenant %v\n", cmd.TraceIDs, cmd.TenantID) fmt.Println("**warning**: compaction must be disabled or a compactor may duplicate a block as this process is rewriting it") fmt.Println("") if cmd.DropTrace { @@ -38,51 +39,74 @@ func (cmd *dropTraceCmd) Run(ctx *globalOptions) error { fmt.Println("") } - r, w, c, err := loadBackend(&cmd.backendOptions, ctx) - if err != nil { - return err - } + ctx := context.Background() - id, err := util.HexStringToTraceID(cmd.TraceID) + r, w, c, err := loadBackend(&cmd.backendOptions, opts) if err != nil { return err } - blocks, err := blocksWithTraceID(context.Background(), r, cmd.TenantID, id) - if err != nil { - return err + type pair struct { + traceIDs []common.ID + blockMeta *backend.BlockMeta } + tracesByBlock := map[backend.UUID]pair{} - if len(blocks) == 0 { - 
fmt.Println("\ntrace not found in any block. aborting") - return nil - } + // Group trace IDs by blocks + ids := strings.Split(cmd.TraceIDs, ",") + for _, id := range ids { + traceID, err := util.HexStringToTraceID(id) + if err != nil { + return err + } - // print out blocks that have the trace id - fmt.Println("\n\ntrace found in:") - for _, block := range blocks { - fmt.Printf(" %v sz: %d traces: %d\n", block.BlockID, block.Size_, block.TotalObjects) - } + // It might be significantly improved if common.BackendBlock supported bulk searches. + blocks, err := blocksWithTraceID(ctx, r, cmd.TenantID, traceID) + if err != nil { + return err + } - if !cmd.DropTrace { - fmt.Println("**not dropping trace, use --drop-trace to actually drop**") - return nil + if len(blocks) == 0 { + fmt.Printf("\ntrace %s not found in any block. skipping\n", util.TraceIDToHexString(traceID)) + } + for _, block := range blocks { + p, ok := tracesByBlock[block.BlockID] + if !ok { + p = pair{blockMeta: block} + } + p.traceIDs = append(p.traceIDs, traceID) + tracesByBlock[block.BlockID] = p + } } - fmt.Println("rewriting blocks:") - for _, block := range blocks { - fmt.Printf(" rewriting %v\n", block.BlockID) - newBlock, err := rewriteBlock(context.Background(), r, w, block, id) + // Remove traces from blocks + for _, p := range tracesByBlock { + // print out trace IDs to be removed in the block + strTraceIDs := make([]string, len(p.traceIDs)) + for i, tid := range p.traceIDs { + strTraceIDs[i] = util.TraceIDToHexString(tid) + } + fmt.Printf("\nFound %d traces: %v in block: %v\n", len(strTraceIDs), strTraceIDs, p.blockMeta.BlockID) + fmt.Printf("blockInfo: ID: %v, Size: %d Total Traces: %d\n", p.blockMeta.BlockID, p.blockMeta.Size_, p.blockMeta.TotalObjects) + + if !cmd.DropTrace { + fmt.Println("**not dropping trace, use --drop-trace to actually drop**") + continue + } + + fmt.Printf(" rewriting %v\n", p.blockMeta.BlockID) + newMeta, err := rewriteBlock(ctx, r, w, p.blockMeta, p.traceIDs) if 
err != nil { return err } - fmt.Printf(" rewrote to new block: %v\n", newBlock.BlockID) - } + if newMeta == nil { + fmt.Printf(" block %v was removed\n", p.blockMeta.BlockID) + } else { + fmt.Printf(" rewrote to new block: %v\n", newMeta.BlockID) + } - fmt.Println("marking old blocks compacted") - for _, block := range blocks { - fmt.Printf(" marking %v\n", block.BlockID) - err = c.MarkBlockCompacted((uuid.UUID)(block.BlockID), block.TenantID) + fmt.Printf(" marking %v compacted\n", p.blockMeta.BlockID) + err = c.MarkBlockCompacted((uuid.UUID)(p.blockMeta.BlockID), p.blockMeta.TenantID) if err != nil { return err } @@ -93,7 +117,7 @@ func (cmd *dropTraceCmd) Run(ctx *globalOptions) error { return nil } -func rewriteBlock(ctx context.Context, r backend.Reader, w backend.Writer, meta *backend.BlockMeta, traceID common.ID) (*backend.BlockMeta, error) { +func rewriteBlock(ctx context.Context, r backend.Reader, w backend.Writer, meta *backend.BlockMeta, traceIDs []common.ID) (*backend.BlockMeta, error) { enc, err := encoding.FromVersion(meta.Version) if err != nil { return nil, fmt.Errorf("error getting encoder: %w", err) @@ -131,7 +155,12 @@ func rewriteBlock(ctx context.Context, r backend.Reader, w backend.Writer, meta // hook to drop the trace DropObject: func(id common.ID) bool { - return bytes.Equal(id, traceID) + for _, tid := range traceIDs { + if bytes.Equal(id, tid) { + return true + } + } + return false }, // setting to prevent panics. should we track and report these? 
@@ -152,12 +181,16 @@ func rewriteBlock(ctx context.Context, r backend.Reader, w backend.Writer, meta } if len(out) != 1 { + if meta.TotalObjects == int64(len(traceIDs)) { + // we removed all traces from the block + return nil, nil + } return nil, fmt.Errorf("expected 1 block, got %d", len(out)) } newMeta := out[0] - if newMeta.TotalObjects != meta.TotalObjects-1 { + if newMeta.TotalObjects != meta.TotalObjects-int64(len(traceIDs)) { return nil, fmt.Errorf("expected output to have one less object then in. out: %d in: %d", newMeta.TotalObjects, meta.TotalObjects) } @@ -165,7 +198,7 @@ func rewriteBlock(ctx context.Context, r backend.Reader, w backend.Writer, meta } func blocksWithTraceID(ctx context.Context, r backend.Reader, tenantID string, traceID common.ID) ([]*backend.BlockMeta, error) { - blockIDs, _, err := r.Blocks(context.Background(), tenantID) + blockIDs, _, err := r.Blocks(ctx, tenantID) if err != nil { return nil, err } @@ -210,7 +243,7 @@ func isInBlock(ctx context.Context, r backend.Reader, blockNum int, id uuid.UUID fmt.Print(strconv.Itoa(blockNum)) } - meta, err := r.BlockMeta(context.Background(), id, tenantID) + meta, err := r.BlockMeta(ctx, id, tenantID) if err != nil && !errors.Is(err, backend.ErrDoesNotExist) { return nil, err } diff --git a/cmd/tempo-cli/cmd-rewrite-blocks_test.go b/cmd/tempo-cli/cmd-rewrite-blocks_test.go new file mode 100644 index 00000000000..33e3f5c479f --- /dev/null +++ b/cmd/tempo-cli/cmd-rewrite-blocks_test.go @@ -0,0 +1,202 @@ +package main + +import ( + "bytes" + "context" + "io" + "sort" + "strings" + "testing" + + "github.com/google/uuid" + "github.com/parquet-go/parquet-go" + "github.com/stretchr/testify/require" + + tempo_io "github.com/grafana/tempo/pkg/io" + "github.com/grafana/tempo/pkg/parquetquery" + "github.com/grafana/tempo/pkg/tempopb" + "github.com/grafana/tempo/pkg/util" + "github.com/grafana/tempo/pkg/util/test" + "github.com/grafana/tempo/tempodb/backend" + 
"github.com/grafana/tempo/tempodb/backend/local" + "github.com/grafana/tempo/tempodb/encoding/common" + "github.com/grafana/tempo/tempodb/encoding/vparquet4" +) + +func TestDropTraceCmd(t *testing.T) { + testCase := func(t *testing.T, blocksNum int, tracesNum int, deleteEvery int) { + cmd := dropTracesCmd{ + backendOptions: backendOptions{ + Backend: "local", + Bucket: t.TempDir(), + }, + TenantID: "single-tenant", + DropTrace: true, + } + generateTestBlocks(t, cmd.backendOptions.Bucket, cmd.TenantID, blocksNum, tracesNum) + + before := getAllTraceIDs(t, cmd.backendOptions.Bucket, cmd.TenantID) + + var expectedAfter, toRemove []string + for i, traceID := range before { + if i%deleteEvery == 0 { + toRemove = append(toRemove, traceID) + } else { + expectedAfter = append(expectedAfter, traceID) + } + } + cmd.TraceIDs = strings.Join(toRemove, ",") + + err := cmd.Run(&globalOptions{}) + require.NoError(t, err) + + after := getAllTraceIDs(t, cmd.backendOptions.Bucket, cmd.TenantID) + + require.ElementsMatch(t, after, expectedAfter) + } + + testCase(t, 1, 10, 3) + testCase(t, 2, 5, 3) + testCase(t, 2, 5, 1) +} + +func generateTestBlocks(t *testing.T, tempDir string, tenantID string, blockCount int, traceCount int) { + t.Helper() + + rawR, rawW, _, err := local.New(&local.Config{ + Path: tempDir, + }) + require.NoError(t, err) + + r := backend.NewReader(rawR) + w := backend.NewWriter(rawW) + ctx := context.Background() + + cfg := &common.BlockConfig{ + BloomFP: 0.01, + BloomShardSizeBytes: 100 * 1024, + } + + for bn := 0; bn < blockCount; bn++ { + traces := newTestTraces(traceCount) + iter := &testIterator{traces: traces} + meta := backend.NewBlockMeta(tenantID, uuid.New(), vparquet4.VersionString, backend.EncNone, "") + meta.TotalObjects = int64(len(iter.traces)) + _, err := vparquet4.CreateBlock(ctx, cfg, meta, iter, r, w) + require.NoError(t, err) + } +} + +func getAllTraceIDs(t *testing.T, dir string, tenant string) []string { + t.Helper() + + rawR, _, _, err := 
local.New(&local.Config{ + Path: dir, + }) + require.NoError(t, err) + + reader := backend.NewReader(rawR) + ctx := context.Background() + + tenants, err := reader.Tenants(ctx) + require.NoError(t, err) + require.Equal(t, []string{tenant}, tenants) + + blocks, _, err := reader.Blocks(ctx, tenant) + require.NoError(t, err) + + var traceIDs []string + for _, block := range blocks { + meta, err := reader.BlockMeta(ctx, block, tenant) + require.NoError(t, err) + rr := vparquet4.NewBackendReaderAt(ctx, reader, vparquet4.DataFileName, meta) + br := tempo_io.NewBufferedReaderAt(rr, int64(meta.Size_), 2*1024*1024, 64) + parquetSchema := parquet.SchemaOf(&vparquet4.Trace{}) + o := []parquet.FileOption{ + parquet.SkipBloomFilters(true), + parquet.SkipPageIndex(true), + parquet.FileSchema(parquetSchema), + parquet.FileReadMode(parquet.ReadModeAsync), + } + pf, err := parquet.OpenFile(br, int64(meta.Size_), o...) + require.NoError(t, err) + r := parquet.NewReader(pf, parquetSchema) + defer func() { + err := r.Close() + require.NoError(t, err) + }() + traceIDIndex, _ := parquetquery.GetColumnIndexByPath(pf, vparquet4.TraceIDColumnName) + require.GreaterOrEqual(t, traceIDIndex, 0) + defer func() { + err := r.Close() + require.NoError(t, err) + }() + + for read := int64(0); read < r.NumRows(); { + rows := make([]parquet.Row, r.NumRows()) + n, err := r.ReadRows(rows) + require.NoError(t, err) + require.Greater(t, n, 0) + rows = rows[:n] + read += int64(n) + + getTraceID := func(row parquet.Row) common.ID { + for _, v := range row { + if v.Column() == traceIDIndex { + return v.ByteArray() + } + } + + return nil + } + + for _, row := range rows { + traceID := getTraceID(row) + traceIDs = append(traceIDs, util.TraceIDToHexString(traceID)) + } + } + + // Ensure that we read all rows + _, err = r.ReadRows([]parquet.Row{{}}) + require.ErrorIs(t, err, io.EOF) + } + + return traceIDs +} + +type testTrace struct { + traceID common.ID + trace *tempopb.Trace +} + +type testIterator struct { 
+ traces []testTrace +} + +func newTestTraces(traceCount int) []testTrace { + traces := make([]testTrace, 0, traceCount) + + for i := 0; i < traceCount; i++ { + traceID := test.ValidTraceID(nil) + trace := test.MakeTraceWithTags(traceID, "megaservice", int64(i)) + traces = append(traces, testTrace{traceID: traceID, trace: trace}) + } + + sort.Slice(traces, func(i, j int) bool { + return bytes.Compare(traces[i].traceID, traces[j].traceID) == -1 + }) + + return traces +} + +func (i *testIterator) Next(context.Context) (common.ID, *tempopb.Trace, error) { + if len(i.traces) == 0 { + return nil, nil, io.EOF + } + tr := i.traces[0] + i.traces = i.traces[1:] + return tr.traceID, tr.trace, nil +} + +func (i *testIterator) Close() { +} diff --git a/cmd/tempo-cli/main.go b/cmd/tempo-cli/main.go index 3d046e79dd5..651f6d09586 100644 --- a/cmd/tempo-cli/main.go +++ b/cmd/tempo-cli/main.go @@ -76,7 +76,7 @@ var cli struct { } `cmd:""` RewriteBlocks struct { - DropTrace dropTraceCmd `cmd:"" help:"rewrite blocks with a given trace id redacted"` + DropTraces dropTracesCmd `cmd:"" help:"rewrite blocks with given trace ids redacted"` } `cmd:""` Parquet struct { diff --git a/docs/sources/tempo/operations/tempo_cli.md b/docs/sources/tempo/operations/tempo_cli.md index 695583d0715..a946677e489 100644 --- a/docs/sources/tempo/operations/tempo_cli.md +++ b/docs/sources/tempo/operations/tempo_cli.md @@ -506,20 +506,27 @@ Options: tempo-cli analyse blocks --backend=local --bucket=./cmd/tempo-cli/test-data/ single-tenant ``` -## Drop trace by id +## Drop traces by id -Rewrites all blocks for a tenant that contain a specific trace id. The trace is dropped from +Rewrites all blocks for a tenant that contain a specific trace ids. The traces are dropped from the new blocks and the rewritten blocks are marked compacted so they will be cleaned up. Arguments: - `tenant-id` The tenant ID. Use `single-tenant` for single tenant setups. 
-- `trace-id` The trace id to drop +- `trace-ids` The comma-separated trace IDs to drop (a single trace ID is also supported) Options: - [Backend options](#backend-options) -- `--drop-traces` By default this command runs in dry run mode. Supplying this argument causes it to actually rewrite blocks with the trace dropped. +- `--drop-trace` By default this command runs in dry run mode. Supplying this argument causes it to actually rewrite blocks with the traces dropped. **Example:** + +Dropping one trace +```bash +tempo-cli rewrite-blocks drop-traces --backend=local --bucket=./cmd/tempo-cli/test-data/ single-tenant 04d5f549746c96e4f3daed6202571db2 +``` + +Dropping multiple traces ```bash -tempo-cli rewrite-blocks drop-trace --backend=local --bucket=./cmd/tempo-cli/test-data/ single-tenant a188ea38aa3a83d74523774ad6728cc8 +tempo-cli rewrite-blocks drop-traces --backend=local --bucket=./cmd/tempo-cli/test-data/ single-tenant 04d5f549746c96e4f3daed6202571db2,111fa1850042aea83c17cd7e674210b8 ```