diff --git a/.github/workflows/ghcr.yml b/.github/workflows/ghcr.yml index 8c17bba..cedd836 100644 --- a/.github/workflows/ghcr.yml +++ b/.github/workflows/ghcr.yml @@ -14,6 +14,14 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 + - id: tag_bump + name: Bump version and push tag + uses: anothrNick/github-tag-action@1.55.0 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + DEFAULT_BUMP: patch + RELEASE_BRANCHES: main + PRERELEASE: true - name: Log in to the Container registry uses: docker/login-action@v2.1.0 with: @@ -29,6 +37,7 @@ jobs: ghcr.io/metrico/qryn-otel-collector tags: | latest + ${{ steps.tag_bump.outputs.new_tag }} - name: Build and push uses: docker/build-push-action@v3.2.0 with: diff --git a/cmd/otel-collector/components.go b/cmd/otel-collector/components.go index 6a53e4c..e4fa6a7 100644 --- a/cmd/otel-collector/components.go +++ b/cmd/otel-collector/components.go @@ -165,6 +165,7 @@ import ( "github.com/metrico/otel-collector/exporter/clickhouseprofileexporter" "github.com/metrico/otel-collector/exporter/qrynexporter" + "github.com/metrico/otel-collector/receiver/chstatsreceiver" "github.com/metrico/otel-collector/receiver/pyroscopereceiver" ) @@ -300,6 +301,7 @@ func components() (otelcol.Factories, error) { zipkinreceiver.NewFactory(), zookeeperreceiver.NewFactory(), pyroscopereceiver.NewFactory(), + chstatsreceiver.NewFactory(), } for _, rcv := range factories.Receivers { receivers = append(receivers, rcv) diff --git a/receiver/chstatsreceiver/README.md b/receiver/chstatsreceiver/README.md new file mode 100644 index 0000000..527cb6e --- /dev/null +++ b/receiver/chstatsreceiver/README.md @@ -0,0 +1,59 @@ +# ClickHouse Stats Receiver + +| Status | | +| ------------------------ |---------| +| Stability | [beta] | +| Supported pipeline types | metrics | + +The chstatsreceiver module is a component of the OpenTelemetry collector that collects and exports metrics from ClickHouse databases. 
It uses the ClickHouse Go client library to connect to the database and execute SQL queries to retrieve metrics data. The module is designed to be highly configurable, allowing users to specify the database connection details, the SQL queries to execute, and the metrics to export. + +## Configuration + +- `dsn`: sets the Data Source Name (DSN) for the ClickHouse database. +The DSN is a string that contains the necessary information to connect to the database, +such as the host, port, and database name. +- `queries`: list of the SQL queries that the receiver will execute against the database to retrieve metrics data. +The queries are specified as a list of strings. Each query is parsed as a Go `text/template` and may reference the `timestamp_ns`, `timestamp_ms` and `timestamp_s` parameters. +- `timeout`: amount of time between two consecutive iterations of stats requests. +The timeout is specified as a duration value like `20s`, `1m`, etc.; it must be at least `15s`, which is also the default. + +## ClickHouse Queries + +Each ClickHouse query should return two fields: +- labels as an array of Tuple(String, String) +- value as Float64 + +Labels should have the `__name__` label with the name of the metric. 
+
+For example
+```sql
+SELECT
+  [('__name__', 'some_metric'), ('label2', 'val2')]::Array(Tuple(String,String)),
+  2::Float64
+```
+
+## Example
+
+```yaml
+receivers:
+  chstatsreceiver:
+    dsn: clickhouse://localhost:9000
+    queries:
+      - |
+        SELECT [
+            ('__name__', 'clickhouse_bytes_on_disk'), ('db', database), ('disk', disk_name), ('host', hostname())
+          ],
+          sum(bytes_on_disk)::Float64
+        FROM system.parts
+        WHERE (active = 1) AND (database NOT IN ('system', '_system'))
+        GROUP BY database, disk_name
+exporters:
+  prometheusremotewrite:
+    endpoint: http://localhost:3100/prom/remote/write
+    timeout: 30s
+service:
+  pipelines:
+    metrics:
+      receivers: [chstatsreceiver]
+      exporters: [prometheusremotewrite]
+```
diff --git a/receiver/chstatsreceiver/config.go b/receiver/chstatsreceiver/config.go
new file mode 100644
index 0000000..7609391
--- /dev/null
+++ b/receiver/chstatsreceiver/config.go
@@ -0,0 +1,37 @@
+package chstatsreceiver
+
+import (
+	"fmt"
+	"net/url"
+	"time"
+
+	"go.opentelemetry.io/collector/component"
+)
+
+// Config represents the receiver config within the collector's config.yaml.
+type Config struct {
+	// DSN is the ClickHouse data source name; Validate requires the clickhouse:// scheme.
+	DSN string `mapstructure:"dsn"`
+	// Timeout is the interval between two consecutive rounds of queries.
+	Timeout time.Duration `mapstructure:"timeout"`
+	// Queries holds the SQL query templates executed on every round.
+	Queries []string `mapstructure:"queries"`
+}
+
+var _ component.Config = (*Config)(nil)
+
+// Validate checks that the receiver configuration is valid: the timeout must
+// be at least 15 seconds and the DSN must be a URL with the clickhouse scheme.
+func (cfg *Config) Validate() error {
+	if cfg.Timeout < 15*time.Second {
+		return fmt.Errorf("timeout must be at least 15 seconds")
+	}
+	chDSN, err := url.Parse(cfg.DSN)
+	if err != nil {
+		return fmt.Errorf("invalid dsn: %w", err)
+	}
+	if chDSN.Scheme != "clickhouse" {
+		return fmt.Errorf("invalid dsn: scheme should be clickhouse://")
+	}
+	return nil
+}
diff --git a/receiver/chstatsreceiver/factory.go b/receiver/chstatsreceiver/factory.go
new file mode 100644
index 0000000..4ca6461
--- /dev/null
+++ b/receiver/chstatsreceiver/factory.go
@@ -0,0 +1,43 @@
+package chstatsreceiver
+
+import (
+	"context"
+	"time"
+
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/consumer"
+	"go.opentelemetry.io/collector/receiver"
+)
+
+const (
+	// typeStr is the component type under which the receiver is registered.
+	typeStr = "chstatsreceiver"
+	// defaultTimeout is the polling interval used when the config does not set one.
+	defaultTimeout = 15 * time.Second
+)
+
+// createDefaultConfig returns the configuration used before user settings are applied.
+func createDefaultConfig() component.Config {
+	return &Config{
+		DSN:     "",
+		Timeout: defaultTimeout,
+		Queries: []string{},
+	}
+}
+
+// createMetricsReceiver builds a chReceiver that forwards the collected metrics to next.
+func createMetricsReceiver(_ context.Context, set receiver.CreateSettings, cfg component.Config, next consumer.Metrics) (receiver.Metrics, error) {
+	return &chReceiver{
+		cfg:      cfg.(*Config),
+		logger:   set.Logger,
+		consumer: next,
+	}, nil
+}
+
+// NewFactory creates the factory for the ClickHouse stats receiver.
+func NewFactory() receiver.Factory {
+	return receiver.NewFactory(
+		component.MustNewType(typeStr),
+		createDefaultConfig,
+		receiver.WithMetrics(createMetricsReceiver, component.StabilityLevelAlpha))
+}
diff --git a/receiver/chstatsreceiver/receiver.go b/receiver/chstatsreceiver/receiver.go
new file mode 100644
index 0000000..42fae50
--- /dev/null
+++ b/receiver/chstatsreceiver/receiver.go
@@ -0,0 +1,197 @@
+package chstatsreceiver
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"fmt"
+	"sync"
+	"text/template"
+	"time"
+
+	"github.com/ClickHouse/clickhouse-go/v2"
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/consumer"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/pmetric"
+	"go.uber.org/zap"
+)
+
+// nameLabel is the label whose value is used as the metric name.
+const nameLabel = "__name__"
+
+// chReceiver periodically runs the configured ClickHouse queries and forwards
+// the returned rows to the next consumer as gauge metrics.
+type chReceiver struct {
+	cfg       *Config
+	db        clickhouse.Conn
+	consumer  consumer.Metrics
+	templates []*template.Template
+	logger    *zap.Logger
+	cancel    context.CancelFunc
+	ticker    *time.Ticker
+	// wg tracks the polling goroutine so Shutdown can wait for it to exit.
+	wg sync.WaitGroup
+}
+
+// Start parses the query templates, opens the ClickHouse connection and
+// launches the polling goroutine. It fails when the DSN or one of the query
+// templates cannot be parsed, or when the connection cannot be opened.
+func (r *chReceiver) Start(_ context.Context, _ component.Host) error {
+	opts, err := clickhouse.ParseDSN(r.cfg.DSN)
+	if err != nil {
+		return err
+	}
+	// Parse the templates before opening the connection so that an invalid
+	// query cannot leave an open connection behind.
+	templates := make([]*template.Template, len(r.cfg.Queries))
+	for i, query := range r.cfg.Queries {
+		templates[i], err = template.New(fmt.Sprintf("tpl-%d", i)).Parse(query)
+		if err != nil {
+			return err
+		}
+	}
+	db, err := clickhouse.Open(opts)
+	if err != nil {
+		return err
+	}
+	r.db = db
+	r.templates = templates
+
+	// The collector may cancel the context passed to Start as soon as Start
+	// returns, so the polling goroutine runs on its own context, which
+	// Shutdown cancels.
+	ctx, cancel := context.WithCancel(context.Background())
+	r.cancel = cancel
+	r.ticker = time.NewTicker(r.cfg.Timeout)
+
+	r.wg.Add(1)
+	go func() {
+		defer r.wg.Done()
+		r.mainLoop(ctx)
+	}()
+	return nil
+}
+
+// mainLoop runs one round of queries on every tick until ctx is cancelled.
+func (r *chReceiver) mainLoop(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			r.logger.Info("tick stop")
+			return
+		case <-r.ticker.C:
+			r.logger.Debug("tick start")
+			if err := r.GetMetrics(ctx); err != nil {
+				r.logger.Error("failed to get metrics", zap.Error(err))
+			}
+			r.logger.Debug("tick end")
+		}
+	}
+}
+
+// GetMetrics runs every configured query once and forwards the results to the
+// consumer. A failing query does not stop the remaining ones; all failures
+// are returned together as one joined error.
+func (r *chReceiver) GetMetrics(ctx context.Context) error {
+	var errs []error
+	for _, tpl := range r.templates {
+		if ctx.Err() != nil {
+			// The receiver is shutting down; do not start further queries.
+			break
+		}
+		if err := r.getMetricsTemplate(ctx, tpl); err != nil {
+			errs = append(errs, err)
+		}
+	}
+	return errors.Join(errs...)
+}
+
+// getMetricsTemplate renders one query template, executes it and sends the
+// resulting rows to the consumer as a single batch of gauge metrics. Every row
+// must hold the labels as an array of (name, value) pairs followed by a
+// Float64 value. Rows without a __name__ label are skipped with a warning.
+func (r *chReceiver) getMetricsTemplate(ctx context.Context, tpl *template.Template) error {
+	// Read the clock once so the three template parameters agree.
+	now := time.Now()
+	params := map[string]any{
+		"timestamp_ns": now.UnixNano(),
+		"timestamp_ms": now.UnixMilli(),
+		"timestamp_s":  now.Unix(),
+	}
+	var queryBuf bytes.Buffer
+	wrapErr := func(err error) error {
+		return fmt.Errorf("failed to execute. Query: %s; error: %w", queryBuf.String(), err)
+	}
+	if err := tpl.Execute(&queryBuf, params); err != nil {
+		return wrapErr(err)
+	}
+	rows, err := r.db.Query(ctx, queryBuf.String())
+	if err != nil {
+		return wrapErr(err)
+	}
+	defer rows.Close()
+
+	metrics := pmetric.NewMetrics()
+	scope := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()
+	for rows.Next() {
+		var (
+			labels [][]string
+			value  float64
+		)
+		if err := rows.Scan(&labels, &value); err != nil {
+			return wrapErr(err)
+		}
+		name := ""
+		for _, label := range labels {
+			if len(label) >= 2 && label[0] == nameLabel {
+				name = label[1]
+			}
+		}
+		if name == "" {
+			r.logger.Warn("skipping row without a metric name", zap.String("query", queryBuf.String()))
+			continue
+		}
+		metric := scope.Metrics().AppendEmpty()
+		metric.SetName(name)
+		point := metric.SetEmptyGauge().DataPoints().AppendEmpty()
+		for _, label := range labels {
+			// Malformed pairs are ignored instead of panicking on the index.
+			if len(label) < 2 || label[0] == nameLabel {
+				continue
+			}
+			point.Attributes().PutStr(label[0], label[1])
+		}
+		point.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
+		point.SetDoubleValue(value)
+	}
+	if err := rows.Err(); err != nil {
+		return wrapErr(err)
+	}
+	if scope.Metrics().Len() == 0 || ctx.Err() != nil {
+		// Nothing to send, or the receiver is shutting down.
+		return nil
+	}
+	if err := r.consumer.ConsumeMetrics(ctx, metrics); err != nil {
+		return wrapErr(err)
+	}
+	return nil
+}
+
+// Shutdown stops the polling goroutine, waits for it to exit and closes the
+// ClickHouse connection. It is safe to call when Start failed or never ran.
+func (r *chReceiver) Shutdown(_ context.Context) error {
+	r.logger.Info("shutting down")
+	if r.cancel != nil {
+		r.cancel()
+	}
+	if r.ticker != nil {
+		r.ticker.Stop()
+	}
+	// Let an in-flight round finish before closing the connection it uses.
+	r.wg.Wait()
+	if r.db == nil {
+		return nil
+	}
+	return r.db.Close()
+}