From 0a2a8b71508071a87edd2ca0b2b7ae9bb474dd97 Mon Sep 17 00:00:00 2001 From: akvlad Date: Tue, 26 Nov 2024 15:44:17 +0200 Subject: [PATCH 1/4] ch logs receiver init --- receiver/chstatsreceiver/config.go | 6 ++ receiver/chstatsreceiver/factory.go | 17 ++++-- receiver/chstatsreceiver/receiver.go | 86 +++++++++++++++++++++++----- 3 files changed, 92 insertions(+), 17 deletions(-) diff --git a/receiver/chstatsreceiver/config.go b/receiver/chstatsreceiver/config.go index 7609391..d1e02cc 100644 --- a/receiver/chstatsreceiver/config.go +++ b/receiver/chstatsreceiver/config.go @@ -8,9 +8,15 @@ import ( "go.opentelemetry.io/collector/component" ) +const ( + RCV_TYPE_METRICS = "metrics" + RCV_TYPE_LOGS = "logs" +) + // Represents the receiver config within the collector's config.yaml type Config struct { DSN string `mapstructure:"dsn"` + Type string `mapstructure:"type"` Timeout time.Duration `mapstructure:"timeout"` Queries []string `mapstructure:"queries"` } diff --git a/receiver/chstatsreceiver/factory.go b/receiver/chstatsreceiver/factory.go index 2a434bb..71e315a 100644 --- a/receiver/chstatsreceiver/factory.go +++ b/receiver/chstatsreceiver/factory.go @@ -24,9 +24,17 @@ func createDefaultConfig() component.Config { func createMetricsReceiver(_ context.Context, set receiver.Settings, cfg component.Config, consumer consumer.Metrics) (receiver.Metrics, error) { return &chReceiver{ - cfg: cfg.(*Config), - logger: set.Logger, - consumer: consumer, + cfg: cfg.(*Config), + logger: set.Logger, + metricsConsumer: consumer, + }, nil +} + +func createLogsReceiver(_ context.Context, set receiver.Settings, cfg component.Config, consumer consumer.Logs) (receiver.Logs, error) { + return &chReceiver{ + cfg: cfg.(*Config), + logger: set.Logger, + logsConsumer: consumer, }, nil } @@ -34,5 +42,6 @@ func NewFactory() receiver.Factory { return receiver.NewFactory( component.MustNewType(typeStr), createDefaultConfig, - receiver.WithMetrics(createMetricsReceiver, component.StabilityLevelAlpha)) + receiver.WithMetrics(createMetricsReceiver, component.StabilityLevelAlpha), + receiver.WithLogs(createLogsReceiver, component.StabilityLevelAlpha)) } diff --git a/receiver/chstatsreceiver/receiver.go b/receiver/chstatsreceiver/receiver.go index 42fae50..4950873 100644 --- a/receiver/chstatsreceiver/receiver.go +++ b/receiver/chstatsreceiver/receiver.go @@ -4,6 +4,8 @@ import ( "bytes" "context" "fmt" + "go.opentelemetry.io/collector/pdata/plog" + "golang.org/x/sync/errgroup" "text/template" "time" @@ -16,13 +18,14 @@ import ( ) type chReceiver struct { - cfg *Config - db clickhouse.Conn - consumer consumer.Metrics - templates []*template.Template - logger *zap.Logger - cancel context.CancelFunc - ticker *time.Ticker + cfg *Config + db clickhouse.Conn + metricsConsumer consumer.Metrics + logsConsumer consumer.Logs + templates []*template.Template + logger *zap.Logger + cancel context.CancelFunc + ticker *time.Ticker } func (r *chReceiver) Start(ctx context.Context, _ component.Host) error { @@ -70,13 +73,20 @@ func (r *chReceiver) mainLoop(ctx context.Context) { } func (r *chReceiver) GetMetrics(ctx context.Context) error { + g := errgroup.Group{} for _, tpl := range r.templates { - err := r.getMetricsTemplate(ctx, tpl) - if err != nil { - return err - } + _tpl := tpl + g.Go(func() error { + switch r.cfg.Type { + case RCV_TYPE_METRICS: + return r.getMetricsTemplate(ctx, _tpl) + case RCV_TYPE_LOGS: + return r.getLogsTemplate(ctx, _tpl) + } + return nil + }) } - return nil + return g.Wait() } func (r *chReceiver) getMetricsTemplate(ctx context.Context, tpl *template.Template) error { @@ -125,7 +135,57 @@ func (r *chReceiver) getMetricsTemplate(ctx context.Context, tpl *template.Templ case <-ctx.Done(): return nil default: - err = r.consumer.ConsumeMetrics(ctx, metrics) + err = r.metricsConsumer.ConsumeMetrics(ctx, metrics) + if err != nil { + return wrapErr(err) + } + } + } + return nil +} + +func (r *chReceiver) getLogsTemplate(ctx context.Context, tpl *template.Template) error { + queryBuf := bytes.Buffer{} + params := map[string]any{ + "timestamp_ns": time.Now().UnixNano(), + "timestamp_ms": time.Now().UnixMilli(), + "timestamp_s": time.Now().Unix(), + } + err := tpl.Execute(&queryBuf, params) + wrapErr := func(err error) error { + return fmt.Errorf("failed to execute. Query: %s; error: %w", queryBuf.String(), err) + } + if err != nil { + return wrapErr(err) + } + rows, err := r.db.Query(ctx, queryBuf.String()) + if err != nil { + return wrapErr(err) + } + defer rows.Close() + for rows.Next() { + var ( + labels [][]string + value string + ) + err = rows.Scan(&labels, &value) + if err != nil { + return wrapErr(err) + } + logs := plog.NewLogs() + res := logs.ResourceLogs().AppendEmpty() + res.Resource().Attributes() + log := res.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + for _, label := range labels { + log.Attributes().PutStr(label[0], label[1]) + } + log.Body().SetStr(value) + log.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + select { + case <-ctx.Done(): + return nil + default: + err = r.logsConsumer.ConsumeLogs(ctx, logs) if err != nil { return wrapErr(err) } From 0f4ea77ae1b6803e1d5f2b3dd198da7d7b4620de Mon Sep 17 00:00:00 2001 From: akvlad Date: Tue, 26 Nov 2024 16:37:32 +0200 Subject: [PATCH 2/4] debug --- receiver/chstatsreceiver/receiver.go | 1 + 1 file changed, 1 insertion(+) diff --git a/receiver/chstatsreceiver/receiver.go b/receiver/chstatsreceiver/receiver.go index 4950873..72ac444 100644 --- a/receiver/chstatsreceiver/receiver.go +++ b/receiver/chstatsreceiver/receiver.go @@ -79,6 +79,7 @@ func (r *chReceiver) GetMetrics(ctx context.Context) error { g.Go(func() error { switch r.cfg.Type { case RCV_TYPE_METRICS: + case "": return r.getMetricsTemplate(ctx, _tpl) case RCV_TYPE_LOGS: return r.getLogsTemplate(ctx, _tpl) From d94ce5e949709ba444d198182c6b2739c8ba8f95 Mon Sep 17 00:00:00 2001 From: akvlad Date: Thu, 28 Nov 2024 14:14:48 +0200 Subject: [PATCH 3/4] README --- receiver/chstatsreceiver/README.md | 57 ++++++++++++++++++++++++++---- 1 file changed, 50 insertions(+), 7 deletions(-) diff --git a/receiver/chstatsreceiver/README.md b/receiver/chstatsreceiver/README.md index 739bcd2..71855cf 100644 --- a/receiver/chstatsreceiver/README.md +++ b/receiver/chstatsreceiver/README.md @@ -1,17 +1,28 @@ # Clickhouse Statistics Receiver -| Status | | -| ------------------------ |---------| -| Stability | [beta] | -| Supported pipeline types | metrics | +| Status | | +| ------------------------ |----------------| +| Stability | [beta] | +| Supported pipeline types | metrics, logs | + +The chstatsreceiver module is a component of the OpenTelemetry collector that collects and exports +metrics and logs from ClickHouse databases. + +It uses the ClickHouse Go client library to connect to the database and execute SQL queries +to retrieve metrics and log data. + +The module is designed to be highly configurable, allowing users to specify the database connection details, +the SQL queries to execute, and the metrics and logs to export. -The chstatsreceiver module is a component of the OpenTelemetry collector that collects and exports metrics from ClickHouse databases. It uses the ClickHouse Go client library to connect to the database and execute SQL queries to retrieve metrics data. The module is designed to be highly configurable, allowing users to specify the database connection details, the SQL queries to execute, and the metrics to export. ## Configuration - `dsn`: sets the Data Source Name (DSN) for the ClickHouse database. The DSN is a string that contains the necessary information to connect to the database, such as the host, port, and database name +- `type`: specifies the type of data to collect. Valid values are: + - `"metrics"`: collect metrics data (default if not specified) + - `"logs"`: collect log data - `queries`: list of the SQL queries that the receiver will execute against the database to retrieve metrics data. The queries are specified as a list of strings. - `timeout`: amount of time between two consecutive stats requests iterations. @@ -19,6 +30,7 @@ The timeout is specified as the duration value like `20s`, `1m`, etc. ## Clickhouse Queries +### For Metrics (type: "metrics") Each clickhouse query should return two fields: - labels as array of Tuple(String, String) - value Float64 @@ -32,12 +44,27 @@ SELECT 2::Float64 ``` +### For Logs (type: "logs") +Queries for logs should return two fields: +- labels as array of Tuple(String, String) +- message String + +The receiver will automatically convert the query results into log records. + +```sql +SELECT + [('level', 'debug'), ('label2', 'val2')]::Array(Tuple(String,String)), + 'log line to send' +``` + ## Example ```yaml receivers: - chstatsreceiver: + chstatsreceiver/metrics: dsn: clickhouse://localhost:9000 + type: metrics + timeout: 30s queries: - | SELECT [ @@ -47,13 +74,29 @@ receivers: FROM system.parts WHERE (active = 1) AND (database NOT IN ('system', '_system')) GROUP BY database, disk_name + chstatsreceiver/logs: + dsn: clickhouse://localhost:9000 + type: logs + timeout: 1m + queries: + - | + SELECT + [('job', 'clickhouse_query_logs')], + format('id={} query={}', query_id, query) + FROM system.query_id + WHERE event_time > now() - INTERVAL 1 MINUTE exporters: prometheusremotewrite: endpoint: http://localhost:3100/prom/remote/write timeout: 30s + loki: + endpoint: http://localhost:3100/loki/api/v1/push service: pipelines: metrics: - receivers: [chstatsreceiver] + receivers: [chstatsreceiver/metrics] exporters: [prometheusremotewrite] + logs: + receivers: [chstatsreceiver/logs] + exporters: [loki] ``` From b1a6a3cdef4d41073bb851631f4e704c0b3021bf Mon Sep 17 00:00:00 2001 From: akvlad Date: Thu, 28 Nov 2024 14:29:40 +0200 Subject: [PATCH 4/4] goimport --- receiver/chstatsreceiver/receiver.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/receiver/chstatsreceiver/receiver.go b/receiver/chstatsreceiver/receiver.go index 72ac444..6d5fa81 100644 --- a/receiver/chstatsreceiver/receiver.go +++ b/receiver/chstatsreceiver/receiver.go @@ -4,11 +4,12 @@ import ( "bytes" "context" "fmt" - "go.opentelemetry.io/collector/pdata/plog" - "golang.org/x/sync/errgroup" "text/template" "time" + "go.opentelemetry.io/collector/pdata/plog" + "golang.org/x/sync/errgroup" + "github.com/ClickHouse/clickhouse-go/v2" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer"