From 45fdef0d5f82108f28e44cfa11360d10c2bc4e35 Mon Sep 17 00:00:00 2001 From: Cluas Date: Mon, 21 Oct 2024 22:14:43 +0800 Subject: [PATCH 1/4] refactor: improve the code structure and documentation of qrynexporter. Rename the batchV2 struct to traceWithTagsBatch to improve code readability. Update the names of struct fields to make them more descriptive. Rename the traces_v2.go file to trace_batch_processor.go. Use a custom contextKey type in the pushTraceData function to resolve the SA1029 warning. Optimize README.md to provide more detailed configuration instructions. These changes are aimed at improving code quality, maintainability, and documentation clarity. --- exporter/qrynexporter/README.md | 14 +++++ exporter/qrynexporter/config.go | 4 ++ exporter/qrynexporter/logs.go | 2 +- exporter/qrynexporter/metrics.go | 2 +- exporter/qrynexporter/schema.go | 10 ++-- ...{traces_v2.go => trace_batch_processor.go} | 51 ++++++++++--------- exporter/qrynexporter/traces.go | 39 +++++++++----- 7 files changed, 76 insertions(+), 46 deletions(-) rename exporter/qrynexporter/{traces_v2.go => trace_batch_processor.go} (61%) diff --git a/exporter/qrynexporter/README.md b/exporter/qrynexporter/README.md index b2b0b4f..ce0a325 100644 --- a/exporter/qrynexporter/README.md +++ b/exporter/qrynexporter/README.md @@ -8,11 +8,25 @@ # Configuration options: +- `dsn` (required): Data Source Name for Clickhouse. + - Example: `tcp://localhost:9000/?database=cloki` +- `clustered_clickhouse` (required): + - Type: boolean + - Description: Set to `true` if using a Clickhouse cluster; otherwise, set to `false`. + +- `client_side_trace_processing` (required): + - Type: boolean + - Default: `true` + - Description: Enables client-side processing of trace data. This can improve performance but may increase client-side resource usage. + +<<<<<<< HEAD - `dsn` (required): Clickhouse's dsn. - `clustered_clickhouse` (required): true if clickhouse cluster is used - `client_side_trace_processing`: use improved traces ingestion algorythm for clickhouse clusters. Data ingestion is sess performant but more evenly distributed +======= +>>>>>>> e35202d (refactor: improve the code structure and documentation of qrynexporter.) # Example: ## Simple Trace Data diff --git a/exporter/qrynexporter/config.go b/exporter/qrynexporter/config.go index b025e7a..176c28c 100644 --- a/exporter/qrynexporter/config.go +++ b/exporter/qrynexporter/config.go @@ -30,6 +30,10 @@ type Config struct { configretry.BackOffConfig `mapstructure:"retry_on_failure"` exporterhelper.QueueSettings `mapstructure:"sending_queue"` +<<<<<<< HEAD +======= + // ClientSideTraceProcessing is a boolean that indicates whether to process traces on the client side. +>>>>>>> e35202d (refactor: improve the code structure and documentation of qrynexporter.) ClientSideTraceProcessing bool `mapstructure:"client_side_trace_processing"` ClusteredClickhouse bool `mapstructure:"clustered_clickhouse"` diff --git a/exporter/qrynexporter/logs.go b/exporter/qrynexporter/logs.go index cc8eafc..7791965 100644 --- a/exporter/qrynexporter/logs.go +++ b/exporter/qrynexporter/logs.go @@ -441,7 +441,7 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { } } - if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil { + if err := batchSamplesAndTimeSeries(context.WithValue(ctx, clusterKey, e.cluster), e.db, samples, timeSeries); err != nil { otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeLogs))) e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error())) return err diff --git a/exporter/qrynexporter/metrics.go b/exporter/qrynexporter/metrics.go index c2f0fd4..d1c4618 100644 --- a/exporter/qrynexporter/metrics.go +++ b/exporter/qrynexporter/metrics.go @@ -491,7 +491,7 @@ func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metric } } - if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil { + if err := batchSamplesAndTimeSeries(context.WithValue(ctx, clusterKey, e.cluster), e.db, samples, timeSeries); err != nil { otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeMetrics))) e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error())) return err diff --git a/exporter/qrynexporter/schema.go b/exporter/qrynexporter/schema.go index 34dce51..c84c745 100644 --- a/exporter/qrynexporter/schema.go +++ b/exporter/qrynexporter/schema.go @@ -20,7 +20,7 @@ import ( ) var ( - tracesInputSQL = func(clustered bool) string { + tracesInputSQL = func(_ bool) string { return `INSERT INTO traces_input ( trace_id, span_id, @@ -110,8 +110,8 @@ func TracesTagsV2InputSQL(clustered bool) string { // // ) Engine=Null -// Trace represent trace model -type Trace struct { +// TraceInput represent trace model +type TraceInput struct { TraceID string `ch:"trace_id"` SpanID string `ch:"span_id"` ParentID string `ch:"parent_id"` @@ -124,7 +124,7 @@ type Trace struct { Tags [][]string `ch:"tags"` } -type TraceV2 struct { +type TempoTrace struct { OID string `ch:"oid"` TraceID []byte `ch:"trace_id"` SpanID []byte `ch:"span_id"` @@ -137,7 +137,7 @@ type TraceV2 struct { Payload string `ch:"payload"` } -type TraceTagsV2 struct { +type TempoTraceTag struct { OID string `ch:"oid"` Date time.Time `ch:"date"` Key string `ch:"key"` diff --git a/exporter/qrynexporter/traces_v2.go b/exporter/qrynexporter/trace_batch_processor.go similarity index 61% rename from exporter/qrynexporter/traces_v2.go rename to exporter/qrynexporter/trace_batch_processor.go index 37da8a9..6fcde08 100644 --- a/exporter/qrynexporter/traces_v2.go +++ b/exporter/qrynexporter/trace_batch_processor.go @@ -3,60 +3,61 @@ package qrynexporter import ( "encoding/hex" "fmt" - "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "time" + + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" ) -type batchV2 struct { +type traceWithTagsBatch struct { driver.Batch - subBatch driver.Batch + tagsBatch driver.Batch } -func (b *batchV2) AppendStruct(data any) error { - _data, ok := data.(*Trace) +func (b *traceWithTagsBatch) AppendStruct(v any) error { + ti, ok := v.(*TraceInput) if !ok { - return fmt.Errorf("invalid data type, expected *Trace, got %T", data) + return fmt.Errorf("invalid data type, expected *Trace, got %T", v) } - binTraceId, err := unhexAndPad(_data.TraceID, 16) + binTraceId, err := unhexAndPad(ti.TraceID, 16) if err != nil { return err } - binParentID, err := unhexAndPad(_data.ParentID, 8) + binParentID, err := unhexAndPad(ti.ParentID, 8) if err != nil { return err } - binSpanID, err := unhexAndPad(_data.SpanID, 8) + binSpanID, err := unhexAndPad(ti.SpanID, 8) if err != nil { return err } - trace := &TraceV2{ + trace := &TempoTrace{ OID: "0", TraceID: binTraceId, SpanID: binSpanID, ParentID: binParentID, - Name: _data.Name, - TimestampNs: _data.TimestampNs, - DurationNs: _data.DurationNs, - ServiceName: _data.ServiceName, - PayloadType: _data.PayloadType, - Payload: _data.Payload, + Name: ti.Name, + TimestampNs: ti.TimestampNs, + DurationNs: ti.DurationNs, + ServiceName: ti.ServiceName, + PayloadType: ti.PayloadType, + Payload: ti.Payload, } err = b.Batch.AppendStruct(trace) if err != nil { return err } - for _, tag := range _data.Tags { - attr := &TraceTagsV2{ + for _, tag := range ti.Tags { + attr := &TempoTraceTag{ OID: "0", Date: time.Unix(0, trace.TimestampNs).Truncate(time.Hour * 24), Key: tag[0], Val: tag[1], TraceID: binTraceId, SpanID: binSpanID, - TimestampNs: _data.TimestampNs, - DurationNs: _data.DurationNs, + TimestampNs: ti.TimestampNs, + DurationNs: ti.DurationNs, } - err = b.subBatch.AppendStruct(attr) + err = b.tagsBatch.AppendStruct(attr) if err != nil { return err } @@ -64,10 +65,10 @@ func (b *batchV2) AppendStruct(data any) error { return nil } -func (b *batchV2) Abort() error { +func (b *traceWithTagsBatch) Abort() error { var errs [2]error errs[0] = b.Batch.Abort() - errs[1] = b.subBatch.Abort() + errs[1] = b.tagsBatch.Abort() for _, err := range errs { if err != nil { return err @@ -76,10 +77,10 @@ func (b *batchV2) Abort() error { return nil } -func (b *batchV2) Send() error { +func (b *traceWithTagsBatch) Send() error { var errs [2]error errs[0] = b.Batch.Send() - errs[1] = b.subBatch.Send() + errs[1] = b.tagsBatch.Send() for _, err := range errs { if err != nil { return err diff --git a/exporter/qrynexporter/traces.go b/exporter/qrynexporter/traces.go index b1e71f2..a63e27e 100644 --- a/exporter/qrynexporter/traces.go +++ b/exporter/qrynexporter/traces.go @@ -34,8 +34,11 @@ import ( "google.golang.org/protobuf/encoding/protojson" ) +type contextKey string + const ( - spanLinkDataFormat = "%s|%s|%s|%s|%d" + spanLinkDataFormat = "%s|%s|%s|%s|%d" + clusterKey contextKey = "cluster" ) var delegate = &protojson.MarshalOptions{ @@ -48,9 +51,9 @@ type tracesExporter struct { logger *zap.Logger meter metric.Meter - db clickhouse.Conn - cluster bool - v2 bool + db clickhouse.Conn + cluster bool + clientSide bool } // newTracesExporter returns a SpanWriter for the database @@ -64,11 +67,19 @@ func newTracesExporter(logger *zap.Logger, cfg *Config, set *exporter.Settings) return nil, err } exp := &tracesExporter{ +<<<<<<< HEAD logger: logger, meter: set.MeterProvider.Meter(typeStr), db: db, cluster: cfg.ClusteredClickhouse, v2: cfg.ClusteredClickhouse && cfg.ClientSideTraceProcessing, +======= + logger: logger, + meter: set.MeterProvider.Meter(typeStr), + db: db, + cluster: cfg.ClusteredClickhouse, + clientSide: cfg.ClientSideTraceProcessing, +>>>>>>> e35202d (refactor: improve the code structure and documentation of qrynexporter.) } if err := initMetrics(exp.meter); err != nil { exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error())) @@ -165,11 +176,11 @@ func extractScopeTags(il pcommon.InstrumentationScope, tags map[string]string) { } func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans ptrace.ResourceSpansSlice) error { - isCluster := ctx.Value("cluster").(bool) + isCluster := ctx.Value(clusterKey).(bool) var batch driver.Batch var err error - if e.v2 { - batch, err = e.prepareBatchV2(ctx) + if e.clientSide { + batch, err = e.prepareBatchClientSide(ctx) } else { batch, err = e.db.PrepareBatch(ctx, tracesInputSQL(isCluster)) } @@ -196,7 +207,7 @@ func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans return nil } -func (e *tracesExporter) prepareBatchV2(ctx context.Context) (driver.Batch, error) { +func (e *tracesExporter) prepareBatchClientSide(ctx context.Context) (driver.Batch, error) { batch, err := e.db.PrepareBatch(ctx, TracesV2InputSQL(e.cluster)) if err != nil { return nil, err @@ -206,15 +217,15 @@ func (e *tracesExporter) prepareBatchV2(ctx context.Context) (driver.Batch, erro batch.Abort() return nil, err } - return &batchV2{ - Batch: batch, - subBatch: subBatch, + return &traceWithTagsBatch{ + Batch: batch, + tagsBatch: subBatch, }, nil } // traceDataPusher implements OTEL exporterhelper.traceDataPusher func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error { - _ctx := context.WithValue(ctx, "cluster", e.cluster) + _ctx := context.WithValue(ctx, clusterKey, e.cluster) start := time.Now() if err := e.exportResourceSapns(_ctx, td.ResourceSpans()); err != nil { otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeTraces))) @@ -387,7 +398,7 @@ func marshalSpanToJSON(span ptrace.Span, mergedAttributes pcommon.Map) ([]byte, return delegate.Marshal(otlpSpan) } -func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName string, tags map[string]string) (*Trace, error) { +func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName string, tags map[string]string) (*TraceInput, error) { durationNano := uint64(span.EndTimestamp() - span.StartTimestamp()) tags = aggregateSpanTags(span, tags) tags["service.name"] = serviceName @@ -404,7 +415,7 @@ func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName return nil, fmt.Errorf("failed to marshal span: %w", err) } - trace := &Trace{ + trace := &TraceInput{ TraceID: span.TraceID().String(), SpanID: span.SpanID().String(), ParentID: span.ParentSpanID().String(), From e4bf5c0a03d6ae554ed6c6a90eff96a13d1270b7 Mon Sep 17 00:00:00 2001 From: Cluas Date: Mon, 21 Oct 2024 22:31:46 +0800 Subject: [PATCH 2/4] fix: rebase --- exporter/qrynexporter/README.md | 7 ------- exporter/qrynexporter/config.go | 3 --- exporter/qrynexporter/traces.go | 8 -------- 3 files changed, 18 deletions(-) diff --git a/exporter/qrynexporter/README.md b/exporter/qrynexporter/README.md index ce0a325..a844bb0 100644 --- a/exporter/qrynexporter/README.md +++ b/exporter/qrynexporter/README.md @@ -20,13 +20,6 @@ - Default: `true` - Description: Enables client-side processing of trace data. This can improve performance but may increase client-side resource usage. -<<<<<<< HEAD -- `dsn` (required): Clickhouse's dsn. -- `clustered_clickhouse` (required): true if clickhouse cluster is used -- `client_side_trace_processing`: use improved traces ingestion algorythm for clickhouse clusters. -Data ingestion is sess performant but more evenly distributed -======= ->>>>>>> e35202d (refactor: improve the code structure and documentation of qrynexporter.) # Example: ## Simple Trace Data diff --git a/exporter/qrynexporter/config.go b/exporter/qrynexporter/config.go index 176c28c..d0e2477 100644 --- a/exporter/qrynexporter/config.go +++ b/exporter/qrynexporter/config.go @@ -30,10 +30,7 @@ type Config struct { configretry.BackOffConfig `mapstructure:"retry_on_failure"` exporterhelper.QueueSettings `mapstructure:"sending_queue"` -<<<<<<< HEAD -======= // ClientSideTraceProcessing is a boolean that indicates whether to process traces on the client side. ->>>>>>> e35202d (refactor: improve the code structure and documentation of qrynexporter.) ClientSideTraceProcessing bool `mapstructure:"client_side_trace_processing"` ClusteredClickhouse bool `mapstructure:"clustered_clickhouse"` diff --git a/exporter/qrynexporter/traces.go b/exporter/qrynexporter/traces.go index a63e27e..fd56330 100644 --- a/exporter/qrynexporter/traces.go +++ b/exporter/qrynexporter/traces.go @@ -67,19 +67,11 @@ func newTracesExporter(logger *zap.Logger, cfg *Config, set *exporter.Settings) return nil, err } exp := &tracesExporter{ -<<<<<<< HEAD - logger: logger, - meter: set.MeterProvider.Meter(typeStr), - db: db, - cluster: cfg.ClusteredClickhouse, - v2: cfg.ClusteredClickhouse && cfg.ClientSideTraceProcessing, -======= logger: logger, meter: set.MeterProvider.Meter(typeStr), db: db, cluster: cfg.ClusteredClickhouse, clientSide: cfg.ClientSideTraceProcessing, ->>>>>>> e35202d (refactor: improve the code structure and documentation of qrynexporter.) } if err := initMetrics(exp.meter); err != nil { exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error())) From a4911bc8dd0fd4506749efed315462d5d1017a28 Mon Sep 17 00:00:00 2001 From: Cluas Date: Mon, 21 Oct 2024 22:33:47 +0800 Subject: [PATCH 3/4] docs: fix --- exporter/qrynexporter/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/qrynexporter/README.md b/exporter/qrynexporter/README.md index a844bb0..f22c2c9 100644 --- a/exporter/qrynexporter/README.md +++ b/exporter/qrynexporter/README.md @@ -9,7 +9,7 @@ # Configuration options: - `dsn` (required): Data Source Name for Clickhouse. - - Example: `tcp://localhost:9000/?database=cloki` + - Example: `tcp://localhost:9000/qryn` - `clustered_clickhouse` (required): - Type: boolean From cb44a9995f8cd0cbf529ed5cc211d0d218d3804c Mon Sep 17 00:00:00 2001 From: Cluas Date: Mon, 21 Oct 2024 22:37:37 +0800 Subject: [PATCH 4/4] feat: need cfg.ClusteredClickhouse --- exporter/qrynexporter/traces.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/qrynexporter/traces.go b/exporter/qrynexporter/traces.go index fd56330..dfe6d7e 100644 --- a/exporter/qrynexporter/traces.go +++ b/exporter/qrynexporter/traces.go @@ -71,7 +71,7 @@ func newTracesExporter(logger *zap.Logger, cfg *Config, set *exporter.Settings) meter: set.MeterProvider.Meter(typeStr), db: db, cluster: cfg.ClusteredClickhouse, - clientSide: cfg.ClientSideTraceProcessing, + clientSide: cfg.ClusteredClickhouse && cfg.ClientSideTraceProcessing, } if err := initMetrics(exp.meter); err != nil { exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error()))