Skip to content

Commit

Permalink
database_observability: report health of component and collectors (#2392
Browse files Browse the repository at this point in the history
)

Report unhealthy in case of errors when starting up the collectors or
if any collector is stopped during operations.
  • Loading branch information
cristiangreco authored Jan 14, 2025
1 parent cb9f7b0 commit 55d952e
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 15 deletions.
11 changes: 6 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Main (unreleased)
### Enhancements

- Update `prometheus.write.queue` to support v2 for cpu performance. (@mattdurham)
- (_Experimental_) Add health reporting to `database_observability.mysql` component (@cristiangreco)

v1.6.0-rc.0
-----------------
Expand Down Expand Up @@ -132,26 +133,26 @@ v1.6.0-rc.0
https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/36071
- `otelcol.exporter.datadog`: Stop prefixing `http_server_duration`, `http_server_request_size` and `http_server_response_size` with `otelcol`.
https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/36265
These metrics can be from SDKs rather than collector. Stop prefixing them to be consistent with
These metrics can be from SDKs rather than collector. Stop prefixing them to be consistent with
https://opentelemetry.io/docs/collector/internal-telemetry/#lists-of-internal-metrics
- `otelcol.receiver.datadog`: Add json handling for the `api/v2/series` endpoint in the datadogreceiver.
https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/36218
- `otelcol.processor.span`: Add a new `keep_original_name` configuration argument
- `otelcol.processor.span`: Add a new `keep_original_name` configuration argument
to keep the original span name when extracting attributes from the span name.
https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/36397
- `pkg/ottl`: Respect the `depth` option when flattening slices using `flatten`.
The `depth` option is also now required to be at least `1`.
https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/36198
- `otelcol.exporter.loadbalancing`: Shutdown exporters during collector shutdown. This fixes a memory leak.
https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/36024
- `otelcol.processor.k8sattributes`: New `wait_for_metadata` and `wait_for_metadata_timeout` configuration arguments,
- `otelcol.processor.k8sattributes`: New `wait_for_metadata` and `wait_for_metadata_timeout` configuration arguments,
which block the processor startup until metadata is received from Kubernetes.
https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32556
- `otelcol.processor.k8sattributes`: Enable the `k8sattr.fieldExtractConfigRegex.disallow` for all Alloy instances,
- `otelcol.processor.k8sattributes`: Enable the `k8sattr.fieldExtractConfigRegex.disallow` for all Alloy instances,
to retain the behavior of `regex` argument in the `annotation` and `label` blocks.
When the feature gate is "deprecated" in the upstream Collector, Alloy users will need to use the transform processor instead.
https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/25128
- `otelcol.receiver.vcenter`: The existing code did not honor TLS settings beyond 'insecure'.
- `otelcol.receiver.vcenter`: The existing code did not honor TLS settings beyond 'insecure'.
All TLS client config should now be honored.
https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/36482
- `otelcol.receiver.opencensus`: Do not report error message when OpenCensus receiver is shutdown cleanly.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/go-sql-driver/mysql"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
)

var rdsRegex = regexp.MustCompile(`(?P<identifier>[^\.]+)\.([^\.]+)\.(?P<region>[^\.]+)\.rds\.amazonaws\.com`)
Expand All @@ -21,6 +22,8 @@ type ConnectionInfo struct {
DSN string
Registry *prometheus.Registry
InfoMetric *prometheus.GaugeVec

running *atomic.Bool
}

func NewConnectionInfo(args ConnectionInfoArguments) (*ConnectionInfo, error) {
Expand All @@ -35,15 +38,22 @@ func NewConnectionInfo(args ConnectionInfoArguments) (*ConnectionInfo, error) {
DSN: args.DSN,
Registry: args.Registry,
InfoMetric: infoMetric,
running: &atomic.Bool{},
}, nil
}

// Name returns the fixed identifier of this collector, "ConnectionInfo".
func (c *ConnectionInfo) Name() string {
return "ConnectionInfo"
}

func (c *ConnectionInfo) Start(ctx context.Context) error {
cfg, err := mysql.ParseDSN(c.DSN)
if err != nil {
return err
}

c.running.Store(true)

var (
providerName = "unknown"
providerRegion = "unknown"
Expand All @@ -66,6 +76,11 @@ func (c *ConnectionInfo) Start(ctx context.Context) error {
return nil
}

// Stopped reports whether the collector is currently not running.
// The running flag is set by Start once the DSN has been parsed and is
// cleared again by Stop, so a collector that was never started also
// reports true here.
func (c *ConnectionInfo) Stopped() bool {
return !c.running.Load()
}

// Stop unregisters InfoMetric from the collector's Registry and then
// clears the running flag, after which Stopped returns true.
func (c *ConnectionInfo) Stop() {
c.Registry.Unregister(c.InfoMetric)
c.running.Store(false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import (
"database/sql"
"fmt"
"strings"
"sync/atomic"
"time"

"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/xwb1989/sqlparser"
"go.uber.org/atomic"

"github.com/grafana/alloy/internal/component/common/loki"
"github.com/grafana/alloy/internal/component/database_observability"
Expand Down Expand Up @@ -57,11 +57,15 @@ func NewQuerySample(args QuerySampleArguments) (*QuerySample, error) {
dbConnection: args.DB,
collectInterval: args.CollectInterval,
entryHandler: args.EntryHandler,
logger: args.Logger,
logger: log.With(args.Logger, "collector", "QuerySample"),
running: &atomic.Bool{},
}, nil
}

// Name returns the fixed identifier of this collector, "QuerySample".
func (c *QuerySample) Name() string {
return "QuerySample"
}

func (c *QuerySample) Start(ctx context.Context) error {
level.Debug(c.logger).Log("msg", "QuerySample collector started")

Expand All @@ -81,6 +85,7 @@ func (c *QuerySample) Start(ctx context.Context) error {
for {
if err := c.fetchQuerySamples(c.ctx); err != nil {
level.Error(c.logger).Log("msg", "collector stopping due to error", "err", err)
c.Stop()
break
}

Expand Down Expand Up @@ -127,7 +132,7 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
}

if strings.HasSuffix(sampleText, "...") {
level.Info(c.logger).Log("msg", "skipping parsing truncated query", "digest", digest)
level.Debug(c.logger).Log("msg", "skipping parsing truncated query", "digest", digest)
continue
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/prometheus/common/model"
"go.uber.org/atomic"

"github.com/grafana/alloy/internal/component/common/loki"
"github.com/grafana/alloy/internal/component/database_observability"
Expand Down Expand Up @@ -66,10 +67,10 @@ type SchemaTable struct {
// TODO(cristian): allow configuring cache size (currently unlimited).
cache *expirable.LRU[string, tableInfo]

logger log.Logger

ctx context.Context
cancel context.CancelFunc
logger log.Logger
running *atomic.Bool
ctx context.Context
cancel context.CancelFunc
}

type tableInfo struct {
Expand All @@ -86,18 +87,29 @@ func NewSchemaTable(args SchemaTableArguments) (*SchemaTable, error) {
collectInterval: args.CollectInterval,
entryHandler: args.EntryHandler,
cache: expirable.NewLRU[string, tableInfo](0, nil, args.CacheTTL),
logger: args.Logger,
logger: log.With(args.Logger, "collector", "SchemaTable"),
running: &atomic.Bool{},
}, nil
}

// Name returns the fixed identifier of this collector, "SchemaTable".
func (c *SchemaTable) Name() string {
return "SchemaTable"
}

func (c *SchemaTable) Start(ctx context.Context) error {
level.Debug(c.logger).Log("msg", "SchemaTable collector started")

c.running.Store(true)
ctx, cancel := context.WithCancel(ctx)
c.ctx = ctx
c.cancel = cancel

go func() {
defer func() {
c.Stop()
c.running.Store(false)
}()

ticker := time.NewTicker(c.collectInterval)

for {
Expand All @@ -119,6 +131,10 @@ func (c *SchemaTable) Start(ctx context.Context) error {
return nil
}

// Stopped reports whether the collector is currently not running.
// The running flag is set at the beginning of Start and cleared when the
// background collection goroutine exits.
func (c *SchemaTable) Stopped() bool {
return !c.running.Load()
}

// Stop should be kept idempotent
func (c *SchemaTable) Stop() {
c.cancel()
Expand Down Expand Up @@ -155,6 +171,11 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error {
}
}

if len(schemas) == 0 {
level.Info(c.logger).Log("msg", "no schema detected from information_schema.schemata")
return nil
}

tables := []tableInfo{}

for _, schema := range schemas {
Expand Down
54 changes: 52 additions & 2 deletions internal/component/database_observability/mysql/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/model"
"go.uber.org/atomic"

"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/common/loki"
Expand Down Expand Up @@ -79,12 +80,15 @@ type Exports struct {
}

var (
_ component.Component = (*Component)(nil)
_ http_service.Component = (*Component)(nil)
_ component.Component = (*Component)(nil)
_ http_service.Component = (*Component)(nil)
_ component.HealthComponent = (*Component)(nil)
)

// Collector is the contract every database_observability collector managed
// by Component must satisfy.
type Collector interface {
// Name returns the collector's identifier; CurrentHealth uses it to list
// collectors that have stopped.
Name() string
// Start begins collection and returns an error if the collector could
// not be started.
Start(context.Context) error
// Stopped reports whether the collector is not running; a stopped
// collector makes the component report itself as unhealthy.
Stopped() bool
// Stop halts the collector.
Stop()
}

Expand All @@ -98,6 +102,7 @@ type Component struct {
baseTarget discovery.Target
collectors []Collector
dbConnection *sql.DB
healthErr *atomic.String
}

func New(opts component.Options, args Arguments) (*Component, error) {
Expand All @@ -107,6 +112,7 @@ func New(opts component.Options, args Arguments) (*Component, error) {
receivers: args.ForwardTo,
handler: loki.NewLogsReceiver(),
registry: prometheus.NewRegistry(),
healthErr: atomic.NewString(""),
}

baseTarget, err := c.getBaseTarget()
Expand Down Expand Up @@ -184,6 +190,16 @@ func (c *Component) Update(args component.Arguments) error {

c.args = args.(Arguments)

if err := c.startCollectors(); err != nil {
c.healthErr.Store(err.Error())
return err
}

c.healthErr.Store("")
return nil
}

func (c *Component) startCollectors() error {
dbConnection, err := sql.Open("mysql", formatDSN(string(c.args.DataSourceName), "parseTime=true"))
if err != nil {
return err
Expand Down Expand Up @@ -254,6 +270,40 @@ func (c *Component) Handler() http.Handler {
return promhttp.HandlerFor(c.registry, promhttp.HandlerOpts{})
}

// CurrentHealth reports the health of the component.
//
// The result is unhealthy when either of the following holds, checked in
// this order:
//   - an error message was recorded in healthErr (Update stores one when
//     starting the collectors fails and clears it on success);
//   - at least one registered collector reports Stopped() == true.
//
// Otherwise the component is reported as healthy. UpdateTime is always the
// time of the call.
func (c *Component) CurrentHealth() component.Health {
	// A recorded startup error takes precedence over per-collector state.
	startupErr := c.healthErr.Load()
	if startupErr != "" {
		return component.Health{
			Health:     component.HealthTypeUnhealthy,
			Message:    startupErr,
			UpdateTime: time.Now(),
		}
	}

	// Gather the names of stopped collectors while holding the read lock;
	// the closure scopes the lock to exactly this iteration.
	stopped := func() []string {
		c.mut.RLock()
		defer c.mut.RUnlock()

		var names []string
		for _, col := range c.collectors {
			if col.Stopped() {
				names = append(names, col.Name())
			}
		}
		return names
	}()

	// Start from the healthy verdict and downgrade it if anything stopped.
	result := component.Health{
		Health:  component.HealthTypeHealthy,
		Message: "All collectors are healthy",
	}
	if len(stopped) != 0 {
		result.Health = component.HealthTypeUnhealthy
		result.Message = "One or more collectors are unhealthy: [" + strings.Join(stopped, ", ") + "]"
	}
	result.UpdateTime = time.Now()

	return result
}

// instanceKey returns network(hostname:port)/dbname of the MySQL server.
// This is the same key as used by the mysqld_exporter integration.
func (c *Component) instanceKey() string {
Expand Down

0 comments on commit 55d952e

Please sign in to comment.