Merge pull request #38 from tomershafir/feat/grafana-pyroscope
Add initial pyroscope java pipeline
akvlad authored Jan 3, 2024
2 parents ff0f0b2 + 5ae5bb6 commit 490afcf
Showing 26 changed files with 4,592 additions and 34 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -17,3 +17,4 @@
.vscode
.DS_Store
.idea
otel-collector
2 changes: 1 addition & 1 deletion Makefile
@@ -33,7 +33,7 @@ build:

.PHONY: run
run:
-go run cmd/otel-collector/*.go --config ${CONFIG_FILE}
+go run cmd/otel-collector/*.go --config ${CONFIG_FILE} --feature-gates=telemetry.useOtelForInternalMetrics

.PHONY: fmt
fmt:
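The added --feature-gates=telemetry.useOtelForInternalMetrics flag switches the collector's internal telemetry from OpenCensus to the OpenTelemetry Go SDK; the new clickhouseprofileexporter presumably depends on this, since it records its flush-time metric through the set.MeterProvider handed to it by the collector (see exporter.go below).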
4 changes: 4 additions & 0 deletions cmd/otel-collector/components.go
@@ -134,7 +134,9 @@ import (
"go.opentelemetry.io/collector/receiver/otlpreceiver"
"go.uber.org/multierr"

"github.com/metrico/otel-collector/exporter/clickhouseprofileexporter"
"github.com/metrico/otel-collector/exporter/qrynexporter"
"github.com/metrico/otel-collector/receiver/pyroscopereceiver"
)

func components() (otelcol.Factories, error) {
@@ -230,6 +232,7 @@ func components() (otelcol.Factories, error) {
zipkinreceiver.NewFactory(),
zookeeperreceiver.NewFactory(),
lokireceiver.NewFactory(),
pyroscopereceiver.NewFactory(),
}
for _, rcv := range factories.Receivers {
receivers = append(receivers, rcv)
@@ -241,6 +244,7 @@

exporters := []exporter.Factory{
qrynexporter.NewFactory(),
clickhouseprofileexporter.NewFactory(),
carbonexporter.NewFactory(),
fileexporter.NewFactory(),
jaegerexporter.NewFactory(),
3 changes: 3 additions & 0 deletions cmd/otel-collector/main.go
@@ -6,6 +6,9 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/otelcol"

_ "github.com/KimMachineGun/automemlimit" // default == 0.9 * cgroup_memory_limit
_ "go.uber.org/automaxprocs" // default == cgroup_cpu_limit
)

func main() {
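The two blank imports tune the Go runtime to the container's cgroup limits at process start. A minimal sketch of the effect, assuming readable cgroup limits (applyCgroupLimits and its arguments are illustrative, not part of either library's API):

package limits

import (
	"runtime"
	"runtime/debug"
)

// applyCgroupLimits is a hypothetical stand-in for what the two imports do at init:
// automemlimit sets a soft GC memory limit, automaxprocs caps parallelism.
func applyCgroupLimits(cgroupMemBytes int64, cgroupCPUQuota int) {
	debug.SetMemoryLimit(int64(0.9 * float64(cgroupMemBytes))) // GOMEMLIMIT == 0.9 * cgroup_memory_limit
	runtime.GOMAXPROCS(cgroupCPUQuota)                         // GOMAXPROCS == cgroup_cpu_limit
}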
26 changes: 22 additions & 4 deletions config/example-config.yaml
@@ -1,7 +1,6 @@
receivers:
fluentforward:
endpoint: 0.0.0.0:24224
-  prometheus:
otlp:
protocols:
grpc:
@@ -10,11 +9,12 @@ receivers:
protocols:
grpc:
endpoint: 'localhost:12345'
pyroscopereceiver:

processors:
batch:
send_batch_size: 1000
-    timeout: 5s
+    timeout: 10s
memory_limiter:
check_interval: 2s
limit_mib: 1800
@@ -51,6 +51,7 @@ processors:
- include: latency
action: update
new_name: traces_spanmetrics_latency

exporters:
qryn:
dsn: tcp://clickhouse-server:9000/cloki?username=qryn&password=demo
@@ -62,6 +63,16 @@ exporters:
initial_interval: 5s
max_interval: 30s
max_elapsed_time: 300s
clickhouseprofileexporter:
dsn: tcp://0.0.0.0:9000/cloki
timeout: 10s
sending_queue:
queue_size: 100
retry_on_failure:
enabled: true
initial_interval: 5s
max_interval: 30s
max_elapsed_time: 300s
otlp:
endpoint: localhost:4317
tls:
@@ -73,14 +84,22 @@ extensions:
zpages:
memory_ballast:
size_mib: 1000

service:
telemetry:
metrics:
level: detailed
# logs:
# level: debug
extensions: [pprof, zpages, health_check]
pipelines:
logs:
receivers: [fluentforward, otlp]
processors: [memory_limiter, resourcedetection/system, batch]
exporters: [qryn]
logs/profiles:
receivers: [pyroscopereceiver]
processors: [batch]
exporters: [clickhouseprofileexporter]
traces:
receivers: [otlp]
processors: [resourcedetection/system, spanmetrics, batch]
@@ -97,4 +116,3 @@ service:
processors:
[memory_limiter, resourcedetection/system, metricstransform, batch]
exporters: [qryn]

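To exercise the new logs/profiles pipeline end to end, an agent pushes a profile to the pyroscope receiver. A minimal sketch, assuming the receiver implements Pyroscope's HTTP /ingest API and listens on localhost:8062 (the port and the collapsed-format body are placeholders; pyroscope-java itself ships JFR):

package main

import (
	"bytes"
	"fmt"
	"net/http"
	"net/url"
	"time"
)

func main() {
	// Toy profile in collapsed-stack format: one line per stack, a space, then the sample count.
	body := []byte("main;work 100\nmain;idle 42\n")

	q := url.Values{}
	q.Set("name", "example-app") // becomes service_name downstream
	q.Set("from", fmt.Sprint(time.Now().Add(-10*time.Second).Unix()))
	q.Set("until", fmt.Sprint(time.Now().Unix()))

	// Placeholder endpoint: the real listen address comes from the receiver config.
	resp, err := http.Post("http://localhost:8062/ingest?"+q.Encode(), "text/plain", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}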
139 changes: 139 additions & 0 deletions exporter/clickhouseprofileexporter/ch/access_native_columnar.go
@@ -0,0 +1,139 @@
package ch

import (
"context"
"fmt"
"strconv"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
)

// schema reference: https://github.com/metrico/qryn/blob/master/lib/db/maintain/scripts.js
type clickhouseAccessNativeColumnar struct {
conn driver.Conn

logger *zap.Logger
}

type tuple []any

// Connects to clickhouse and checks the connection's health, returning a new native client
func NewClickhouseAccessNativeColumnar(opts *clickhouse.Options, logger *zap.Logger) (*clickhouseAccessNativeColumnar, error) {
c, err := clickhouse.Open(opts)
if err != nil {
return nil, fmt.Errorf("failed to connect to clickhouse: %w", err)
}
nc := &clickhouseAccessNativeColumnar{
conn: c,
logger: logger,
}
if err = c.Ping(context.Background()); err != nil {
nc.logger.Warn(fmt.Sprintf("failed to ping clickhouse server: %s", err.Error()))
}
return nc, nil
}

// Inserts a profile batch into the clickhouse server using the columnar native protocol
func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error {
b, err := ch.conn.PrepareBatch(context.Background(), "INSERT INTO profiles_input")
if err != nil {
return fmt.Errorf("failed to prepare batch: %w", err)
}

// this implementation is tightly coupled to how pyroscope-java and pyroscopereceiver work,
// specifically receiving a single profile at a time from the agent,
// and thus each batched resource logs slice contains a single log record
rl := ls.ResourceLogs()
sz := rl.Len()

timestamp_ns := make([]uint64, sz)
typ := make([]string, sz)
service_name := make([]string, sz)
period_type := make([]string, sz)
period_unit := make([]string, sz)
tags := make([][]tuple, sz)
duration_ns := make([]uint64, sz)
payload_type := make([]string, sz)
payload := make([][]byte, sz)

var (
r plog.LogRecord
m pcommon.Map
tmp pcommon.Value
tm map[string]any
)
for i := 0; i < sz; i++ {
r = rl.At(i).ScopeLogs().At(0).LogRecords().At(0)
m = r.Attributes()

timestamp_ns[i] = uint64(r.Timestamp())

tmp, _ = m.Get("type")
typ[i] = tmp.AsString()

tmp, _ = m.Get("service_name")
service_name[i] = tmp.AsString()

tmp, _ = m.Get("period_type")
period_type[i] = tmp.AsString()

tmp, _ = m.Get("period_unit")
period_unit[i] = tmp.AsString()

tmp, _ = m.Get("tags")
tm = tmp.Map().AsRaw()
tag, j := make([]tuple, len(tm)), 0
for k, v := range tm {
tag[j] = tuple{k, v.(string)}
j++
}
tags[i] = tag

tmp, _ = m.Get("duration_ns")
duration_ns[i], _ = strconv.ParseUint(tmp.Str(), 10, 64)

tmp, _ = m.Get("payload_type")
payload_type[i] = tmp.AsString()

payload[i] = r.Body().Bytes().AsRaw()
}

// column order here should match table column order
if err := b.Column(0).Append(timestamp_ns); err != nil {
return err
}
if err := b.Column(1).Append(typ); err != nil {
return err
}
if err := b.Column(2).Append(service_name); err != nil {
return err
}
if err := b.Column(3).Append(period_type); err != nil {
return err
}
if err := b.Column(4).Append(period_unit); err != nil {
return err
}
if err := b.Column(5).Append(tags); err != nil {
return err
}
if err := b.Column(6).Append(duration_ns); err != nil {
return err
}
if err := b.Column(7).Append(payload_type); err != nil {
return err
}
if err := b.Column(8).Append(payload); err != nil {
return err
}
return b.Send()
}

// Closes the clickhouse connection pool
func (ch *clickhouseAccessNativeColumnar) Shutdown() error {
return ch.conn.Close()
}
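The nine Append calls above imply a profiles_input table of roughly the following shape. This DDL is an assumption reconstructed from the Go column order; the authoritative schema lives in the qryn maintenance script linked at the top of the file:

package ch

import "context"

// Hypothetical profiles_input shape inferred from the Append calls in InsertBatch;
// the ClickHouse types and engine are assumptions, not taken from this commit.
const createProfilesInput = `
CREATE TABLE IF NOT EXISTS profiles_input (
    timestamp_ns UInt64,
    type         LowCardinality(String),
    service_name LowCardinality(String),
    period_type  LowCardinality(String),
    period_unit  LowCardinality(String),
    tags         Array(Tuple(String, String)),
    duration_ns  UInt64,
    payload_type LowCardinality(String),
    payload      String
) ENGINE = Null`

// ensureSchema is a hypothetical helper that would create the table if missing.
func (ch *clickhouseAccessNativeColumnar) ensureSchema(ctx context.Context) error {
	return ch.conn.Exec(ctx, createProfilesInput)
}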
39 changes: 39 additions & 0 deletions exporter/clickhouseprofileexporter/config.go
@@ -0,0 +1,39 @@
package clickhouseprofileexporter

import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// Represents the exporter config within the collector's config.yaml
type Config struct {
exporterhelper.TimeoutSettings `mapstructure:",squash"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`
QueueSettings `mapstructure:"sending_queue"`

ClusteredClickhouse bool `mapstructure:"clustered_clickhouse"`
// DSN is the ClickHouse server Data Source Name.
// For tcp protocol reference: [ClickHouse/clickhouse-go#dsn](https://github.com/ClickHouse/clickhouse-go#dsn).
// For http protocol reference: [mailru/go-clickhouse/#dsn](https://github.com/mailru/go-clickhouse/#dsn).
Dsn string `mapstructure:"dsn"`
}

type QueueSettings struct {
// Length of the sending queue
QueueSize int `mapstructure:"queue_size"`
}

var _ component.Config = (*Config)(nil)

// Checks that the exporter configuration is valid
func (cfg *Config) Validate() error {
return nil
}

func (cfg *Config) enforceQueueSettings() exporterhelper.QueueSettings {
return exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 1,
QueueSize: cfg.QueueSettings.QueueSize,
}
}
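Note the design choice in enforceQueueSettings: the queue is always enabled and NumConsumers is pinned to 1, so only the queue length is user-configurable. Presumably this serializes batch inserts toward ClickHouse, trading send parallelism for ordered, one-at-a-time flushes.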
87 changes: 87 additions & 0 deletions exporter/clickhouseprofileexporter/exporter.go
@@ -0,0 +1,87 @@
package clickhouseprofileexporter

import (
"context"
"fmt"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/metrico/otel-collector/exporter/clickhouseprofileexporter/ch"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
)

const (
errorCodeError = "1"
errorCodeSuccess = ""
)

type clickhouseProfileExporter struct {
cfg *Config
set *exporter.CreateSettings
logger *zap.Logger
meter metric.Meter

ch clickhouseAccess
}

type clickhouseAccess interface {
// Inserts a profile batch into the clickhouse server
InsertBatch(profiles plog.Logs) error

// Shuts down the clickhouse connection
Shutdown() error
}

// TODO: batch like this https://github.com/open-telemetry/opentelemetry-collector/issues/8122
func newClickhouseProfileExporter(ctx context.Context, set *exporter.CreateSettings, cfg *Config) (*clickhouseProfileExporter, error) {
exp := &clickhouseProfileExporter{
cfg: cfg,
set: set,
logger: set.Logger,
meter: set.MeterProvider.Meter(typeStr),
}
opts, err := clickhouse.ParseDSN(cfg.Dsn)
if err != nil {
return nil, fmt.Errorf("failed to parse clickhouse dsn: %w", err)
}
ch, err := ch.NewClickhouseAccessNativeColumnar(opts, exp.logger)
if err != nil {
return nil, fmt.Errorf("failed to init native ch storage: %w", err)
}
exp.ch = ch
if err := initMetrics(exp.meter); err != nil {
exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error()))
return exp, err
}
return exp, nil
}

// Sends the profiles to the clickhouse server using the configured connection
func (exp *clickhouseProfileExporter) send(ctx context.Context, logs plog.Logs) error {
start := time.Now().UnixMilli()
if err := exp.ch.InsertBatch(logs); err != nil {
otelcolExporterClickhouseProfileFlushTimeMillis.Record(ctx, time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError)))
exp.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error()))
return err
}
otelcolExporterClickhouseProfileFlushTimeMillis.Record(ctx, time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeSuccess)))
exp.logger.Info("inserted batch", zap.Int("size", logs.ResourceLogs().Len()))
return nil
}

func newOtelcolAttrSetBatch(errorCode string) *attribute.Set {
s := attribute.NewSet(attribute.KeyValue{Key: "error_code", Value: attribute.StringValue(errorCode)})
return &s
}

// Shuts down the exporter by shutting down the ch connection pool
func (exp *clickhouseProfileExporter) Shutdown(ctx context.Context) error {
if err := exp.ch.Shutdown(); err != nil {
return fmt.Errorf("failed to shutdown: %w", err)
}
return nil
}
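The factory itself is not among the files shown in this diff. A hypothetical sketch of how this exporter would typically be wired with exporterhelper, assuming typeStr is "clickhouseprofileexporter" and a default queue size of 100 (both assumptions):

package clickhouseprofileexporter

import (
	"context"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/exporter"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
)

const typeStr = "clickhouseprofileexporter" // assumed; the real constant lives elsewhere in the package

func createDefaultConfig() component.Config {
	return &Config{
		TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
		RetrySettings:   exporterhelper.NewDefaultRetrySettings(),
		QueueSettings:   QueueSettings{QueueSize: 100}, // assumed default
	}
}

func createLogsExporter(ctx context.Context, set exporter.CreateSettings, cfg component.Config) (exporter.Logs, error) {
	c := cfg.(*Config)
	exp, err := newClickhouseProfileExporter(ctx, &set, c)
	if err != nil {
		return nil, err
	}
	// Profiles ride the logs data model, so the helper-wrapped exporter is a logs exporter.
	return exporterhelper.NewLogsExporter(ctx, set, cfg,
		exp.send,
		exporterhelper.WithTimeout(c.TimeoutSettings),
		exporterhelper.WithRetry(c.RetrySettings),
		exporterhelper.WithQueue(c.enforceQueueSettings()),
		exporterhelper.WithShutdown(exp.Shutdown),
	)
}

// NewFactory registers the exporter type with the collector, matching the
// clickhouseprofileexporter.NewFactory() call added in components.go.
func NewFactory() exporter.Factory {
	return exporter.NewFactory(
		typeStr,
		createDefaultConfig,
		exporter.WithLogs(createLogsExporter, component.StabilityLevelAlpha),
	)
}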