Skip to content

Commit

Permalink
Merge pull request #84 from metrico/feat/chstats
Browse files Browse the repository at this point in the history
feat: clickhouse stat scraper
  • Loading branch information
akvlad authored May 15, 2024
2 parents e2edb79 + c92097b commit 59700e5
Show file tree
Hide file tree
Showing 6 changed files with 284 additions and 0 deletions.
9 changes: 9 additions & 0 deletions .github/workflows/ghcr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- id: tag_bump
name: Bump version and push tag
uses: anothrNick/github-tag-action@1.55.0
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
DEFAULT_BUMP: patch
RELEASE_BRANCHES: main
PRERELEASE: true
- name: Log in to the Container registry
uses: docker/login-action@v2.1.0
with:
Expand All @@ -29,6 +37,7 @@ jobs:
ghcr.io/metrico/qryn-otel-collector
tags: |
latest
${{ steps.tag_bump.outputs.new_tag }}
- name: Build and push
uses: docker/build-push-action@v3.2.0
with:
Expand Down
2 changes: 2 additions & 0 deletions cmd/otel-collector/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ import (

"github.com/metrico/otel-collector/exporter/clickhouseprofileexporter"
"github.com/metrico/otel-collector/exporter/qrynexporter"
"github.com/metrico/otel-collector/receiver/chstatsreceiver"
"github.com/metrico/otel-collector/receiver/pyroscopereceiver"
)

Expand Down Expand Up @@ -300,6 +301,7 @@ func components() (otelcol.Factories, error) {
zipkinreceiver.NewFactory(),
zookeeperreceiver.NewFactory(),
pyroscopereceiver.NewFactory(),
chstatsreceiver.NewFactory(),
}
for _, rcv := range factories.Receivers {
receivers = append(receivers, rcv)
Expand Down
59 changes: 59 additions & 0 deletions receiver/chstatsreceiver/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Pyroscope Receiver

| Status | |
| ------------------------ |---------|
| Stability | [beta] |
| Supported pipeline types | metrics |

The chstatsreceiver module is a component of the OpenTelemetry collector that collects and exports metrics from ClickHouse databases. It uses the ClickHouse Go client library to connect to the database and execute SQL queries to retrieve metrics data. The module is designed to be highly configurable, allowing users to specify the database connection details, the SQL queries to execute, and the metrics to export.

## Configuration

- `dsn`: sets the Data Source Name (DSN) for the ClickHouse database.
The DSN is a string that contains the necessary information to connect to the database,
such as the host, port, and database name
- `queries`: list of the SQL queries that the receiver will execute against the database to retrieve metrics data.
The queries are specified as a list of strings.
- `timeout`: amount of time between two consecutive stats requests iterations.
The timeout is specified as the duration value like `20s`, `1m`, etc.

## Clickhouse Queries

Each clickhouse query should return two fields:
- labels as array of Tuple(String, String)
- value Float64

Labels should have the `__name__` label with the name of the metric.

For example
```sql
SELECT
[('__name__', 'some_metric'), ('label2', 'val2')]::Array(Tuple(String,String)),
2::Float64
```

## Example

```yaml
receivers:
chstatsreceiver:
dsn: clickhouse://localhost:9000
queries:
- |
SELECT [
('__name__', 'clickhouse_bytes_on_disk'), ('db', database), ('disk', disk_name), ('host', hostname())
],
sum(bytes_on_disk)::Float64
FROM system.parts
WHERE (active = 1) AND (database NOT IN ('system', '_system'))
GROUP BY database, disk_name
exporters:
prometheusremotewrite:
endpoint: http://localhost:3100/prom/remote/write
timeout: 30s
service:
pipelines:
metrics:
receivers: [chstatsreceiver]
exporters: [prometheusremotewrite]
```
33 changes: 33 additions & 0 deletions receiver/chstatsreceiver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package chstatsreceiver

import (
"fmt"
"net/url"
"time"

"go.opentelemetry.io/collector/component"
)

// Represents the receiver config within the collector's config.yaml
type Config struct {
DSN string `mapstructure:"dsn"`
Timeout time.Duration `mapstructure:"timeout"`
Queries []string `mapstructure:"queries"`
}

var _ component.Config = (*Config)(nil)

// Checks that the receiver configuration is valid
func (cfg *Config) Validate() error {
if cfg.Timeout < 15*time.Second {
return fmt.Errorf("timeout must be at least 15 seconds")
}
chDSN, err := url.Parse(cfg.DSN)
if err != nil {
return fmt.Errorf("invalid dsn: %w", err)
}
if chDSN.Scheme != "clickhouse" {
return fmt.Errorf("invalid dsn: scheme should be clickhouse://")
}
return nil
}
38 changes: 38 additions & 0 deletions receiver/chstatsreceiver/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package chstatsreceiver

import (
"context"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
)

const (
typeStr = "chstatsreceiver"
defaultTimeout = 15 * time.Second
)

func createDefaultConfig() component.Config {
return &Config{
DSN: "",
Timeout: defaultTimeout,
Queries: []string{},
}
}

func createMetricsReceiver(_ context.Context, set receiver.CreateSettings, cfg component.Config, consumer consumer.Metrics) (receiver.Metrics, error) {
return &chReceiver{
cfg: cfg.(*Config),
logger: set.Logger,
consumer: consumer,
}, nil
}

func NewFactory() receiver.Factory {
return receiver.NewFactory(
component.MustNewType(typeStr),
createDefaultConfig,
receiver.WithMetrics(createMetricsReceiver, component.StabilityLevelAlpha))
}
143 changes: 143 additions & 0 deletions receiver/chstatsreceiver/receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package chstatsreceiver

import (
"bytes"
"context"
"fmt"
"text/template"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
)

type chReceiver struct {
cfg *Config
db clickhouse.Conn
consumer consumer.Metrics
templates []*template.Template
logger *zap.Logger
cancel context.CancelFunc
ticker *time.Ticker
}

func (r *chReceiver) Start(ctx context.Context, _ component.Host) error {
opts, err := clickhouse.ParseDSN(r.cfg.DSN)
if err != nil {
return err
}
db, err := clickhouse.Open(opts)
if err != nil {
return err
}
r.db = db
r.templates = make([]*template.Template, len(r.cfg.Queries))
for i, query := range r.cfg.Queries {
r.templates[i], err = template.New(fmt.Sprintf("tpl-%d", i)).Parse(query)
if err != nil {
return err
}
}

_ctx, cancel := context.WithCancel(ctx)
r.cancel = cancel

r.ticker = time.NewTicker(r.cfg.Timeout)

go r.mainLoop(_ctx)
return nil
}

func (r *chReceiver) mainLoop(ctx context.Context) {
for {
r.logger.Info("tick start")
select {
case <-ctx.Done():
fmt.Println("tick stop")
return
case <-r.ticker.C:
err := r.GetMetrics(ctx)
if err != nil {
r.logger.Error("failed to get metrics", zap.Error(err))
}
}
r.logger.Info("tick end")
}
}

func (r *chReceiver) GetMetrics(ctx context.Context) error {
for _, tpl := range r.templates {
err := r.getMetricsTemplate(ctx, tpl)
if err != nil {
return err
}
}
return nil
}

func (r *chReceiver) getMetricsTemplate(ctx context.Context, tpl *template.Template) error {
queryBuf := bytes.Buffer{}
params := map[string]any{
"timestamp_ns": time.Now().UnixNano(),
"timestamp_ms": time.Now().UnixMilli(),
"timestamp_s": time.Now().Unix(),
}
err := tpl.Execute(&queryBuf, params)
wrapErr := func(err error) error {
return fmt.Errorf("failed to execute. Query: %s; error: %w", queryBuf.String(), err)
}
if err != nil {
return wrapErr(err)
}
rows, err := r.db.Query(ctx, queryBuf.String())
if err != nil {
return wrapErr(err)
}
defer rows.Close()
for rows.Next() {
var (
labels [][]string
value float64
)
err = rows.Scan(&labels, &value)
if err != nil {
return wrapErr(err)
}
metrics := pmetric.NewMetrics()
res := metrics.ResourceMetrics().AppendEmpty()
res.Resource().Attributes()
metric := res.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
data := metric.SetEmptyGauge().DataPoints().AppendEmpty()
for _, label := range labels {
if label[0] == "__name__" {
metric.SetName(label[1])
continue
}
data.Attributes().PutStr(label[0], label[1])
}
data.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
data.SetDoubleValue(value)
select {
case <-ctx.Done():
return nil
default:
err = r.consumer.ConsumeMetrics(ctx, metrics)
if err != nil {
return wrapErr(err)
}
}
}
return nil
}

func (r *chReceiver) Shutdown(_ context.Context) error {
fmt.Println("shutting down")
r.cancel()
r.ticker.Stop()
_ = r.db.Close()
return nil
}

0 comments on commit 59700e5

Please sign in to comment.