diff --git a/exporter/qrynexporter/README.md b/exporter/qrynexporter/README.md index b2b0b4f..f22c2c9 100644 --- a/exporter/qrynexporter/README.md +++ b/exporter/qrynexporter/README.md @@ -8,11 +8,18 @@ # Configuration options: +- `dsn` (required): Data Source Name for Clickhouse. + - Example: `tcp://localhost:9000/qryn` + +- `clustered_clickhouse` (required): + - Type: boolean + - Description: Set to `true` if using a Clickhouse cluster; otherwise, set to `false`. + +- `client_side_trace_processing` (optional): + - Type: boolean + - Default: `true` + - Description: Enables client-side processing of trace data; only effective when `clustered_clickhouse` is `true`. This can improve performance but may increase client-side resource usage. -- `dsn` (required): Clickhouse's dsn. -- `clustered_clickhouse` (required): true if clickhouse cluster is used -- `client_side_trace_processing`: use improved traces ingestion algorythm for clickhouse clusters. -Data ingestion is sess performant but more evenly distributed # Example: ## Simple Trace Data diff --git a/exporter/qrynexporter/config.go b/exporter/qrynexporter/config.go index b025e7a..d0e2477 100644 --- a/exporter/qrynexporter/config.go +++ b/exporter/qrynexporter/config.go @@ -30,6 +30,7 @@ type Config struct { configretry.BackOffConfig `mapstructure:"retry_on_failure"` exporterhelper.QueueSettings `mapstructure:"sending_queue"` + // ClientSideTraceProcessing is a boolean that indicates whether to process traces on the client side. 
ClientSideTraceProcessing bool `mapstructure:"client_side_trace_processing"` ClusteredClickhouse bool `mapstructure:"clustered_clickhouse"` diff --git a/exporter/qrynexporter/logs.go b/exporter/qrynexporter/logs.go index cc8eafc..7791965 100644 --- a/exporter/qrynexporter/logs.go +++ b/exporter/qrynexporter/logs.go @@ -441,7 +441,7 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { } } - if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil { + if err := batchSamplesAndTimeSeries(context.WithValue(ctx, clusterKey, e.cluster), e.db, samples, timeSeries); err != nil { otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeLogs))) e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error())) return err diff --git a/exporter/qrynexporter/metrics.go b/exporter/qrynexporter/metrics.go index c2f0fd4..d1c4618 100644 --- a/exporter/qrynexporter/metrics.go +++ b/exporter/qrynexporter/metrics.go @@ -491,7 +491,7 @@ func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metric } } - if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil { + if err := batchSamplesAndTimeSeries(context.WithValue(ctx, clusterKey, e.cluster), e.db, samples, timeSeries); err != nil { otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeMetrics))) e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error())) return err diff --git a/exporter/qrynexporter/schema.go b/exporter/qrynexporter/schema.go index 34dce51..c84c745 100644 --- a/exporter/qrynexporter/schema.go +++ b/exporter/qrynexporter/schema.go @@ -20,7 +20,7 @@ import ( ) var ( - tracesInputSQL = func(clustered 
bool) string { + tracesInputSQL = func(_ bool) string { return `INSERT INTO traces_input ( trace_id, span_id, @@ -110,8 +110,8 @@ func TracesTagsV2InputSQL(clustered bool) string { // // ) Engine=Null -// Trace represent trace model -type Trace struct { +// TraceInput represents the trace model +type TraceInput struct { TraceID string `ch:"trace_id"` SpanID string `ch:"span_id"` ParentID string `ch:"parent_id"` @@ -124,7 +124,7 @@ type Trace struct { Tags [][]string `ch:"tags"` } -type TraceV2 struct { +type TempoTrace struct { OID string `ch:"oid"` TraceID []byte `ch:"trace_id"` SpanID []byte `ch:"span_id"` @@ -137,7 +137,7 @@ type TraceV2 struct { Payload string `ch:"payload"` } -type TraceTagsV2 struct { +type TempoTraceTag struct { OID string `ch:"oid"` Date time.Time `ch:"date"` Key string `ch:"key"` diff --git a/exporter/qrynexporter/traces_v2.go b/exporter/qrynexporter/trace_batch_processor.go similarity index 61% rename from exporter/qrynexporter/traces_v2.go rename to exporter/qrynexporter/trace_batch_processor.go index 37da8a9..6fcde08 100644 --- a/exporter/qrynexporter/traces_v2.go +++ b/exporter/qrynexporter/trace_batch_processor.go @@ -3,60 +3,61 @@ package qrynexporter import ( "encoding/hex" "fmt" - "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "time" + + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" ) -type batchV2 struct { +type traceWithTagsBatch struct { driver.Batch - subBatch driver.Batch + tagsBatch driver.Batch } -func (b *batchV2) AppendStruct(data any) error { - _data, ok := data.(*Trace) +func (b *traceWithTagsBatch) AppendStruct(v any) error { + ti, ok := v.(*TraceInput) if !ok { - return fmt.Errorf("invalid data type, expected *Trace, got %T", data) + return fmt.Errorf("invalid data type, expected *TraceInput, got %T", v) } - binTraceId, err := unhexAndPad(_data.TraceID, 16) + binTraceId, err := unhexAndPad(ti.TraceID, 16) if err != nil { return err } - binParentID, err := unhexAndPad(_data.ParentID, 8) + binParentID, err := 
unhexAndPad(ti.ParentID, 8) if err != nil { return err } - binSpanID, err := unhexAndPad(_data.SpanID, 8) + binSpanID, err := unhexAndPad(ti.SpanID, 8) if err != nil { return err } - trace := &TraceV2{ + trace := &TempoTrace{ OID: "0", TraceID: binTraceId, SpanID: binSpanID, ParentID: binParentID, - Name: _data.Name, - TimestampNs: _data.TimestampNs, - DurationNs: _data.DurationNs, - ServiceName: _data.ServiceName, - PayloadType: _data.PayloadType, - Payload: _data.Payload, + Name: ti.Name, + TimestampNs: ti.TimestampNs, + DurationNs: ti.DurationNs, + ServiceName: ti.ServiceName, + PayloadType: ti.PayloadType, + Payload: ti.Payload, } err = b.Batch.AppendStruct(trace) if err != nil { return err } - for _, tag := range _data.Tags { - attr := &TraceTagsV2{ + for _, tag := range ti.Tags { + attr := &TempoTraceTag{ OID: "0", Date: time.Unix(0, trace.TimestampNs).Truncate(time.Hour * 24), Key: tag[0], Val: tag[1], TraceID: binTraceId, SpanID: binSpanID, - TimestampNs: _data.TimestampNs, - DurationNs: _data.DurationNs, + TimestampNs: ti.TimestampNs, + DurationNs: ti.DurationNs, } - err = b.subBatch.AppendStruct(attr) + err = b.tagsBatch.AppendStruct(attr) if err != nil { return err } @@ -64,10 +65,10 @@ func (b *batchV2) AppendStruct(data any) error { return nil } -func (b *batchV2) Abort() error { +func (b *traceWithTagsBatch) Abort() error { var errs [2]error errs[0] = b.Batch.Abort() - errs[1] = b.subBatch.Abort() + errs[1] = b.tagsBatch.Abort() for _, err := range errs { if err != nil { return err @@ -76,10 +77,10 @@ func (b *batchV2) Abort() error { return nil } -func (b *batchV2) Send() error { +func (b *traceWithTagsBatch) Send() error { var errs [2]error errs[0] = b.Batch.Send() - errs[1] = b.subBatch.Send() + errs[1] = b.tagsBatch.Send() for _, err := range errs { if err != nil { return err diff --git a/exporter/qrynexporter/traces.go b/exporter/qrynexporter/traces.go index b1e71f2..dfe6d7e 100644 --- a/exporter/qrynexporter/traces.go +++ 
b/exporter/qrynexporter/traces.go @@ -34,8 +34,11 @@ import ( "google.golang.org/protobuf/encoding/protojson" ) +type contextKey string + const ( - spanLinkDataFormat = "%s|%s|%s|%s|%d" + spanLinkDataFormat = "%s|%s|%s|%s|%d" + clusterKey contextKey = "cluster" ) var delegate = &protojson.MarshalOptions{ @@ -48,9 +51,9 @@ type tracesExporter struct { logger *zap.Logger meter metric.Meter - db clickhouse.Conn - cluster bool - v2 bool + db clickhouse.Conn + cluster bool + clientSide bool } // newTracesExporter returns a SpanWriter for the database @@ -64,11 +67,11 @@ func newTracesExporter(logger *zap.Logger, cfg *Config, set *exporter.Settings) return nil, err } exp := &tracesExporter{ - logger: logger, - meter: set.MeterProvider.Meter(typeStr), - db: db, - cluster: cfg.ClusteredClickhouse, - v2: cfg.ClusteredClickhouse && cfg.ClientSideTraceProcessing, + logger: logger, + meter: set.MeterProvider.Meter(typeStr), + db: db, + cluster: cfg.ClusteredClickhouse, + clientSide: cfg.ClusteredClickhouse && cfg.ClientSideTraceProcessing, } if err := initMetrics(exp.meter); err != nil { exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error())) @@ -165,11 +168,11 @@ func extractScopeTags(il pcommon.InstrumentationScope, tags map[string]string) { } func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans ptrace.ResourceSpansSlice) error { - isCluster := ctx.Value("cluster").(bool) + isCluster := ctx.Value(clusterKey).(bool) var batch driver.Batch var err error - if e.v2 { - batch, err = e.prepareBatchV2(ctx) + if e.clientSide { + batch, err = e.prepareBatchClientSide(ctx) } else { batch, err = e.db.PrepareBatch(ctx, tracesInputSQL(isCluster)) } @@ -196,7 +199,7 @@ func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans return nil } -func (e *tracesExporter) prepareBatchV2(ctx context.Context) (driver.Batch, error) { +func (e *tracesExporter) prepareBatchClientSide(ctx context.Context) (driver.Batch, error) { batch, 
err := e.db.PrepareBatch(ctx, TracesV2InputSQL(e.cluster)) if err != nil { return nil, err @@ -206,15 +209,15 @@ func (e *tracesExporter) prepareBatchV2(ctx context.Context) (driver.Batch, erro batch.Abort() return nil, err } - return &batchV2{ - Batch: batch, - subBatch: subBatch, + return &traceWithTagsBatch{ + Batch: batch, + tagsBatch: subBatch, }, nil } // traceDataPusher implements OTEL exporterhelper.traceDataPusher func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error { - _ctx := context.WithValue(ctx, "cluster", e.cluster) + _ctx := context.WithValue(ctx, clusterKey, e.cluster) start := time.Now() if err := e.exportResourceSapns(_ctx, td.ResourceSpans()); err != nil { otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeTraces))) @@ -387,7 +390,7 @@ func marshalSpanToJSON(span ptrace.Span, mergedAttributes pcommon.Map) ([]byte, return delegate.Marshal(otlpSpan) } -func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName string, tags map[string]string) (*Trace, error) { +func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName string, tags map[string]string) (*TraceInput, error) { durationNano := uint64(span.EndTimestamp() - span.StartTimestamp()) tags = aggregateSpanTags(span, tags) tags["service.name"] = serviceName @@ -404,7 +407,7 @@ func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName return nil, fmt.Errorf("failed to marshal span: %w", err) } - trace := &Trace{ + trace := &TraceInput{ TraceID: span.TraceID().String(), SpanID: span.SpanID().String(), ParentID: span.ParentSpanID().String(),