Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: improve the code structure and documentation of qrynexporter. #107

Merged
merged 4 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions exporter/qrynexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,18 @@


# Configuration options:
- `dsn` (required): Data Source Name for Clickhouse.
- Example: `tcp://localhost:9000/qryn`

- `clustered_clickhouse` (required):
- Type: boolean
- Description: Set to `true` if using a Clickhouse cluster; otherwise, set to `false`.

- `client_side_trace_processing` (required):
- Type: boolean
- Default: `true`
- Description: Enables client-side processing of trace data. This can improve performance but may increase client-side resource usage.

- `dsn` (required): Clickhouse's dsn.
- `clustered_clickhouse` (required): true if clickhouse cluster is used
- `client_side_trace_processing`: use improved traces ingestion algorithm for clickhouse clusters.
  Data ingestion is less performant but more evenly distributed

# Example:
## Simple Trace Data
Expand Down
1 change: 1 addition & 0 deletions exporter/qrynexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Config struct {
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`

// ClientSideTraceProcessing is a boolean that indicates whether to process traces on the client side.
ClientSideTraceProcessing bool `mapstructure:"client_side_trace_processing"`

ClusteredClickhouse bool `mapstructure:"clustered_clickhouse"`
Expand Down
2 changes: 1 addition & 1 deletion exporter/qrynexporter/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
}
}

if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil {
if err := batchSamplesAndTimeSeries(context.WithValue(ctx, clusterKey, e.cluster), e.db, samples, timeSeries); err != nil {
otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeLogs)))
e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error()))
return err
Expand Down
2 changes: 1 addition & 1 deletion exporter/qrynexporter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metric
}
}

if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil {
if err := batchSamplesAndTimeSeries(context.WithValue(ctx, clusterKey, e.cluster), e.db, samples, timeSeries); err != nil {
otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeMetrics)))
e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error()))
return err
Expand Down
10 changes: 5 additions & 5 deletions exporter/qrynexporter/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
)

var (
tracesInputSQL = func(clustered bool) string {
tracesInputSQL = func(_ bool) string {
return `INSERT INTO traces_input (
trace_id,
span_id,
Expand Down Expand Up @@ -110,8 +110,8 @@ func TracesTagsV2InputSQL(clustered bool) string {
//
// ) Engine=Null

// Trace represent trace model
type Trace struct {
// TraceInput represent trace model
type TraceInput struct {
TraceID string `ch:"trace_id"`
SpanID string `ch:"span_id"`
ParentID string `ch:"parent_id"`
Expand All @@ -124,7 +124,7 @@ type Trace struct {
Tags [][]string `ch:"tags"`
}

type TraceV2 struct {
type TempoTrace struct {
OID string `ch:"oid"`
TraceID []byte `ch:"trace_id"`
SpanID []byte `ch:"span_id"`
Expand All @@ -137,7 +137,7 @@ type TraceV2 struct {
Payload string `ch:"payload"`
}

type TraceTagsV2 struct {
type TempoTraceTag struct {
OID string `ch:"oid"`
Date time.Time `ch:"date"`
Key string `ch:"key"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,71 +3,72 @@ package qrynexporter
import (
"encoding/hex"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"time"

"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

type batchV2 struct {
type traceWithTagsBatch struct {
driver.Batch
subBatch driver.Batch
tagsBatch driver.Batch
}

func (b *batchV2) AppendStruct(data any) error {
_data, ok := data.(*Trace)
func (b *traceWithTagsBatch) AppendStruct(v any) error {
ti, ok := v.(*TraceInput)
if !ok {
return fmt.Errorf("invalid data type, expected *Trace, got %T", data)
return fmt.Errorf("invalid data type, expected *Trace, got %T", v)
}
binTraceId, err := unhexAndPad(_data.TraceID, 16)
binTraceId, err := unhexAndPad(ti.TraceID, 16)
if err != nil {
return err
}
binParentID, err := unhexAndPad(_data.ParentID, 8)
binParentID, err := unhexAndPad(ti.ParentID, 8)
if err != nil {
return err
}
binSpanID, err := unhexAndPad(_data.SpanID, 8)
binSpanID, err := unhexAndPad(ti.SpanID, 8)
if err != nil {
return err
}
trace := &TraceV2{
trace := &TempoTrace{
OID: "0",
TraceID: binTraceId,
SpanID: binSpanID,
ParentID: binParentID,
Name: _data.Name,
TimestampNs: _data.TimestampNs,
DurationNs: _data.DurationNs,
ServiceName: _data.ServiceName,
PayloadType: _data.PayloadType,
Payload: _data.Payload,
Name: ti.Name,
TimestampNs: ti.TimestampNs,
DurationNs: ti.DurationNs,
ServiceName: ti.ServiceName,
PayloadType: ti.PayloadType,
Payload: ti.Payload,
}
err = b.Batch.AppendStruct(trace)
if err != nil {
return err
}
for _, tag := range _data.Tags {
attr := &TraceTagsV2{
for _, tag := range ti.Tags {
attr := &TempoTraceTag{
OID: "0",
Date: time.Unix(0, trace.TimestampNs).Truncate(time.Hour * 24),
Key: tag[0],
Val: tag[1],
TraceID: binTraceId,
SpanID: binSpanID,
TimestampNs: _data.TimestampNs,
DurationNs: _data.DurationNs,
TimestampNs: ti.TimestampNs,
DurationNs: ti.DurationNs,
}
err = b.subBatch.AppendStruct(attr)
err = b.tagsBatch.AppendStruct(attr)
if err != nil {
return err
}
}
return nil
}

func (b *batchV2) Abort() error {
func (b *traceWithTagsBatch) Abort() error {
var errs [2]error
errs[0] = b.Batch.Abort()
errs[1] = b.subBatch.Abort()
errs[1] = b.tagsBatch.Abort()
for _, err := range errs {
if err != nil {
return err
Expand All @@ -76,10 +77,10 @@ func (b *batchV2) Abort() error {
return nil
}

func (b *batchV2) Send() error {
func (b *traceWithTagsBatch) Send() error {
var errs [2]error
errs[0] = b.Batch.Send()
errs[1] = b.subBatch.Send()
errs[1] = b.tagsBatch.Send()
for _, err := range errs {
if err != nil {
return err
Expand Down
41 changes: 22 additions & 19 deletions exporter/qrynexporter/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ import (
"google.golang.org/protobuf/encoding/protojson"
)

// contextKey is a private type for context values set by this package,
// avoiding collisions with string keys used by other packages
// (as recommended by the context package documentation).
type contextKey string

const (
	// spanLinkDataFormat encodes a span link as traceID|spanID|traceState|attributes|droppedCount.
	spanLinkDataFormat = "%s|%s|%s|%s|%d"
	// clusterKey carries the "is clustered ClickHouse" flag through the context.
	clusterKey contextKey = "cluster"
)

var delegate = &protojson.MarshalOptions{
Expand All @@ -48,9 +51,9 @@ type tracesExporter struct {
logger *zap.Logger
meter metric.Meter

db clickhouse.Conn
cluster bool
v2 bool
db clickhouse.Conn
cluster bool
clientSide bool
}

// newTracesExporter returns a SpanWriter for the database
Expand All @@ -64,11 +67,11 @@ func newTracesExporter(logger *zap.Logger, cfg *Config, set *exporter.Settings)
return nil, err
}
exp := &tracesExporter{
logger: logger,
meter: set.MeterProvider.Meter(typeStr),
db: db,
cluster: cfg.ClusteredClickhouse,
v2: cfg.ClusteredClickhouse && cfg.ClientSideTraceProcessing,
logger: logger,
meter: set.MeterProvider.Meter(typeStr),
db: db,
cluster: cfg.ClusteredClickhouse,
clientSide: cfg.ClusteredClickhouse && cfg.ClientSideTraceProcessing,
}
if err := initMetrics(exp.meter); err != nil {
exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error()))
Expand Down Expand Up @@ -165,11 +168,11 @@ func extractScopeTags(il pcommon.InstrumentationScope, tags map[string]string) {
}

func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans ptrace.ResourceSpansSlice) error {
isCluster := ctx.Value("cluster").(bool)
isCluster := ctx.Value(clusterKey).(bool)
var batch driver.Batch
var err error
if e.v2 {
batch, err = e.prepareBatchV2(ctx)
if e.clientSide {
batch, err = e.prepareBatchClientSide(ctx)
} else {
batch, err = e.db.PrepareBatch(ctx, tracesInputSQL(isCluster))
}
Expand All @@ -196,7 +199,7 @@ func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans
return nil
}

func (e *tracesExporter) prepareBatchV2(ctx context.Context) (driver.Batch, error) {
func (e *tracesExporter) prepareBatchClientSide(ctx context.Context) (driver.Batch, error) {
batch, err := e.db.PrepareBatch(ctx, TracesV2InputSQL(e.cluster))
if err != nil {
return nil, err
Expand All @@ -206,15 +209,15 @@ func (e *tracesExporter) prepareBatchV2(ctx context.Context) (driver.Batch, erro
batch.Abort()
return nil, err
}
return &batchV2{
Batch: batch,
subBatch: subBatch,
return &traceWithTagsBatch{
Batch: batch,
tagsBatch: subBatch,
}, nil
}

// traceDataPusher implements OTEL exporterhelper.traceDataPusher
func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error {
_ctx := context.WithValue(ctx, "cluster", e.cluster)
_ctx := context.WithValue(ctx, clusterKey, e.cluster)
start := time.Now()
if err := e.exportResourceSapns(_ctx, td.ResourceSpans()); err != nil {
otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeTraces)))
Expand Down Expand Up @@ -387,7 +390,7 @@ func marshalSpanToJSON(span ptrace.Span, mergedAttributes pcommon.Map) ([]byte,
return delegate.Marshal(otlpSpan)
}

func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName string, tags map[string]string) (*Trace, error) {
func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName string, tags map[string]string) (*TraceInput, error) {
durationNano := uint64(span.EndTimestamp() - span.StartTimestamp())
tags = aggregateSpanTags(span, tags)
tags["service.name"] = serviceName
Expand All @@ -404,7 +407,7 @@ func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName
return nil, fmt.Errorf("failed to marshal span: %w", err)
}

trace := &Trace{
trace := &TraceInput{
TraceID: span.TraceID().String(),
SpanID: span.SpanID().String(),
ParentID: span.ParentSpanID().String(),
Expand Down
Loading