diff --git a/main.go b/main.go index a1ccc789..0c2bb780 100644 --- a/main.go +++ b/main.go @@ -131,7 +131,6 @@ func run(c *cli.Context) error { metricsConfig.ServerTLSConfig = config.ServerTLSConfig go metrics.Serve(ctx, metricsConfig) - config.MetricsRegisterer = metrics.Registry _, err := endpoint.Listen(ctx, config) if err != nil { return err diff --git a/pkg/drivers/dqlite/dqlite.go b/pkg/drivers/dqlite/dqlite.go index 9c7602dd..a5b68cf9 100644 --- a/pkg/drivers/dqlite/dqlite.go +++ b/pkg/drivers/dqlite/dqlite.go @@ -69,7 +69,7 @@ outer: return nil } -func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) { +func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) { opts, err := parseOpts(datasourceName) if err != nil { return nil, err diff --git a/pkg/drivers/dqlite/no_dqlite.go b/pkg/drivers/dqlite/no_dqlite.go index 751e9d78..b0f48a41 100644 --- a/pkg/drivers/dqlite/no_dqlite.go +++ b/pkg/drivers/dqlite/no_dqlite.go @@ -9,9 +9,8 @@ import ( "github.com/k3s-io/kine/pkg/drivers/generic" "github.com/k3s-io/kine/pkg/server" - "github.com/prometheus/client_golang/prometheus" ) -func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) { +func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) { return nil, errors.New(`this binary is built without dqlite support, compile with "-tags dqlite"`) } diff --git a/pkg/drivers/generic/generic.go b/pkg/drivers/generic/generic.go index ef206f56..24f01c25 100644 --- a/pkg/drivers/generic/generic.go +++ b/pkg/drivers/generic/generic.go @@ -16,7 +16,6 @@ import ( "github.com/k3s-io/kine/pkg/metrics" "github.com/k3s-io/kine/pkg/server" "github.com/k3s-io/kine/pkg/util" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" "github.com/sirupsen/logrus" ) @@ -171,7 +170,7 @@ func openAndTest(driverName, dataSourceName string) (*sql.DB, error) { return db, nil } -func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig ConnectionPoolConfig, paramCharacter string, numbered bool, metricsRegisterer prometheus.Registerer) (*Generic, error) { +func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig ConnectionPoolConfig, paramCharacter string, numbered bool) (*Generic, error) { var ( db *sql.DB err error @@ -193,8 +192,8 @@ func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig configureConnectionPooling(connPoolConfig, db, driverName) - if metricsRegisterer != nil { - metricsRegisterer.MustRegister(collectors.NewDBStatsCollector(db, "kine")) + if metrics.DefaultRegisterer != nil { + metrics.DefaultRegisterer.MustRegister(collectors.NewDBStatsCollector(db, "kine")) } return &Generic{ diff --git a/pkg/drivers/mysql/mysql.go b/pkg/drivers/mysql/mysql.go index f9745630..af935be6 100644 --- a/pkg/drivers/mysql/mysql.go +++ b/pkg/drivers/mysql/mysql.go @@ -9,7 +9,6 @@ import ( "strconv" "github.com/go-sql-driver/mysql" - "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "github.com/k3s-io/kine/pkg/drivers/generic" @@ -52,7 +51,7 @@ var ( createDB = "CREATE DATABASE IF NOT EXISTS " ) -func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) { +func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) { tlsConfig, err := tlsInfo.ClientConfig() if err != nil { return nil, err @@ -71,7 +70,7 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo return nil, err } - dialect, err := generic.Open(ctx, "mysql", parsedDSN, connPoolConfig, "?", false, metricsRegisterer) + dialect, err := generic.Open(ctx, "mysql", parsedDSN, connPoolConfig, "?", false) if err != nil { return nil, err } diff --git a/pkg/drivers/pgsql/pgsql.go b/pkg/drivers/pgsql/pgsql.go index 7928d3b3..6fc2b91a 100644 --- a/pkg/drivers/pgsql/pgsql.go +++ b/pkg/drivers/pgsql/pgsql.go @@ -19,7 +19,6 @@ import ( "github.com/k3s-io/kine/pkg/server" "github.com/k3s-io/kine/pkg/tls" "github.com/k3s-io/kine/pkg/util" - "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ -53,7 +52,7 @@ var ( createDB = "CREATE DATABASE " ) -func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) { +func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) { parsedDSN, err := prepareDSN(dataSourceName, tlsInfo) if err != nil { return nil, err @@ -63,7 +62,7 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo return nil, err } - dialect, err := generic.Open(ctx, "pgx", parsedDSN, connPoolConfig, "$", true, metricsRegisterer) + dialect, err := generic.Open(ctx, "pgx", parsedDSN, connPoolConfig, "$", true) if err != nil { return nil, err } diff --git a/pkg/drivers/sqlite/sqlite.go b/pkg/drivers/sqlite/sqlite.go index 1313b298..0f349856 100644 --- a/pkg/drivers/sqlite/sqlite.go +++ b/pkg/drivers/sqlite/sqlite.go @@ -17,7 +17,6 @@ import ( "github.com/k3s-io/kine/pkg/util" "github.com/mattn/go-sqlite3" "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" // sqlite db driver @@ -47,12 +46,12 @@ var ( } ) -func New(ctx context.Context, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) { - backend, _, err := NewVariant(ctx, "sqlite3", dataSourceName, connPoolConfig, metricsRegisterer) +func New(ctx context.Context, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) { + backend, _, err := NewVariant(ctx, "sqlite3", dataSourceName, connPoolConfig) return backend, err } -func NewVariant(ctx context.Context, driverName, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, *generic.Generic, error) { +func NewVariant(ctx context.Context, driverName, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, *generic.Generic, error) { if dataSourceName == "" { if err := os.MkdirAll("./db", 0700); err != nil { return nil, nil, err @@ -60,7 +59,7 @@ func NewVariant(ctx context.Context, driverName, dataSourceName string, connPool dataSourceName = "./db/state.db?_journal=WAL&cache=shared&_busy_timeout=30000" } - dialect, err := generic.Open(ctx, driverName, dataSourceName, connPoolConfig, "?", false, metricsRegisterer) + dialect, err := generic.Open(ctx, driverName, dataSourceName, connPoolConfig, "?", false) if err != nil { return nil, nil, err } diff --git a/pkg/endpoint/endpoint.go b/pkg/endpoint/endpoint.go index 81dedecd..701dde3b 100644 --- a/pkg/endpoint/endpoint.go +++ b/pkg/endpoint/endpoint.go @@ -14,11 +14,9 @@ import ( "github.com/k3s-io/kine/pkg/drivers/nats" "github.com/k3s-io/kine/pkg/drivers/pgsql" "github.com/k3s-io/kine/pkg/drivers/sqlite" - "github.com/k3s-io/kine/pkg/metrics" "github.com/k3s-io/kine/pkg/server" "github.com/k3s-io/kine/pkg/tls" "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "go.etcd.io/etcd/server/v3/embed" "google.golang.org/grpc" @@ -44,7 +42,6 @@ type Config struct { ConnectionPoolConfig generic.ConnectionPoolConfig ServerTLSConfig tls.Config BackendTLSConfig tls.Config - MetricsRegisterer prometheus.Registerer NotifyInterval time.Duration EmulatedETCDVersion string } @@ -70,14 +67,6 @@ func Listen(ctx context.Context, config Config) (ETCDConfig, error) { return ETCDConfig{}, errors.Wrap(err, "building kine") } - if config.MetricsRegisterer != nil { - config.MetricsRegisterer.MustRegister( - metrics.SQLTotal, - metrics.SQLTime, - metrics.CompactTotal, - ) - } - if err := backend.Start(ctx); err != nil { return ETCDConfig{}, errors.Wrap(err, "starting kine backend") } @@ -214,13 +203,13 @@ func getKineStorageBackend(ctx context.Context, driver, dsn string, cfg Config) switch driver { case SQLiteBackend: leaderElect = false - backend, err = sqlite.New(ctx, dsn, cfg.ConnectionPoolConfig, cfg.MetricsRegisterer) + backend, err = sqlite.New(ctx, dsn, cfg.ConnectionPoolConfig) case DQLiteBackend: - backend, err = dqlite.New(ctx, dsn, cfg.ConnectionPoolConfig, cfg.MetricsRegisterer) + backend, err = dqlite.New(ctx, dsn, cfg.ConnectionPoolConfig) case PostgresBackend: - backend, err = pgsql.New(ctx, dsn, cfg.BackendTLSConfig, cfg.ConnectionPoolConfig, cfg.MetricsRegisterer) + backend, err = pgsql.New(ctx, dsn, cfg.BackendTLSConfig, cfg.ConnectionPoolConfig) case MySQLBackend: - backend, err = mysql.New(ctx, dsn, cfg.BackendTLSConfig, cfg.ConnectionPoolConfig, cfg.MetricsRegisterer) + backend, err = mysql.New(ctx, dsn, cfg.BackendTLSConfig, cfg.ConnectionPoolConfig) case JetStreamBackend: backend, err = nats.NewLegacy(ctx, dsn, cfg.BackendTLSConfig) case NATSBackend: diff --git a/pkg/metrics/mvcc.go b/pkg/metrics/mvcc.go new file mode 100644 index 00000000..f728d736 --- /dev/null +++ b/pkg/metrics/mvcc.go @@ -0,0 +1,293 @@ +// Copyright 2015 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "sync" + + "github.com/prometheus/client_golang/prometheus" +) + +// mvcc metrics cribbed from https://github.com/etcd-io/etcd/blob/v3.5.11/server/mvcc/metrics.go +var ( + RangeCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "mvcc", + Name: "range_total", + Help: "Total number of ranges seen by this member.", + }) + + PutCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "mvcc", + Name: "put_total", + Help: "Total number of puts seen by this member.", + }) + + DeleteCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "mvcc", + Name: "delete_total", + Help: "Total number of deletes seen by this member.", + }) + + TxnCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "mvcc", + Name: "txn_total", + Help: "Total number of txns seen by this member.", + }) + + KeysGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "etcd_debugging", + Subsystem: "mvcc", + Name: "keys_total", + Help: "Total number of keys.", + }) + + WatchStreamGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "etcd_debugging", + Subsystem: "mvcc", + Name: "watch_stream_total", + Help: "Total number of watch streams.", + }) + + WatcherGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "etcd_debugging", + Subsystem: "mvcc", + Name: "watcher_total", + Help: "Total number of watchers.", + }) + + SlowWatcherGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "etcd_debugging", + Subsystem: "mvcc", + Name: "slow_watcher_total", + Help: "Total number of unsynced slow watchers.", + }) + + TotalEventsCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "etcd_debugging", + Subsystem: "mvcc", + Name: "events_total", + Help: "Total number of events sent by this member.", + }) + + PendingEventsGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "etcd_debugging", + Subsystem: "mvcc", + Name: "pending_events_total", + Help: "Total number of pending events to be sent.", + }) + + IndexCompactionPauseMs = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "etcd_debugging", + Subsystem: "mvcc", + Name: "index_compaction_pause_duration_milliseconds", + Help: "Bucketed histogram of index compaction pause duration.", + + // lowest bucket start of upper bound 0.5 ms with factor 2 + // highest bucket start of 0.5 ms * 2^13 == 4.096 sec + Buckets: prometheus.ExponentialBuckets(0.5, 2, 14), + }) + + DbCompactionPauseMs = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "etcd_debugging", + Subsystem: "mvcc", + Name: "db_compaction_pause_duration_milliseconds", + Help: "Bucketed histogram of db compaction pause duration.", + + // lowest bucket start of upper bound 1 ms with factor 2 + // highest bucket start of 1 ms * 2^12 == 4.096 sec + Buckets: prometheus.ExponentialBuckets(1, 2, 13), + }) + + DbCompactionTotalMs = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "etcd_debugging", + Subsystem: "mvcc", + Name: "db_compaction_total_duration_milliseconds", + Help: "Bucketed histogram of db compaction total duration.", + + // lowest bucket start of upper bound 100 ms with factor 2 + // highest bucket start of 100 ms * 2^13 == 8.192 sec + Buckets: prometheus.ExponentialBuckets(100, 2, 14), + }) + + DbCompactionLast = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "etcd_debugging", + Subsystem: "mvcc", + Name: "db_compaction_last", + Help: "The unix time of the last db compaction. Resets to 0 on start.", + }) + + DbCompactionKeysCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "etcd_debugging", + Subsystem: "mvcc", + Name: "db_compaction_keys_total", + Help: "Total number of db keys compacted.", + }) + + DbTotalSize = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "mvcc", + Name: "db_total_size_in_bytes", + Help: "Total size of the underlying database physically allocated in bytes.", + }, + func() float64 { + ReportDbTotalSizeInBytesMu.RLock() + defer ReportDbTotalSizeInBytesMu.RUnlock() + return ReportDbTotalSizeInBytes() + }, + ) + // overridden by mvcc initialization + ReportDbTotalSizeInBytesMu sync.RWMutex + ReportDbTotalSizeInBytes = func() float64 { return 0 } + + DbTotalSizeInUse = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "mvcc", + Name: "db_total_size_in_use_in_bytes", + Help: "Total size of the underlying database logically in use in bytes.", + }, + func() float64 { + ReportDbTotalSizeInUseInBytesMu.RLock() + defer ReportDbTotalSizeInUseInBytesMu.RUnlock() + return ReportDbTotalSizeInUseInBytes() + }, + ) + // overridden by mvcc initialization + ReportDbTotalSizeInUseInBytesMu sync.RWMutex + ReportDbTotalSizeInUseInBytes = func() float64 { return 0 } + + DbOpenReadTxN = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "mvcc", + Name: "db_open_read_transactions", + Help: "The number of currently open read transactions", + }, + + func() float64 { + ReportDbOpenReadTxNMu.RLock() + defer ReportDbOpenReadTxNMu.RUnlock() + return ReportDbOpenReadTxN() + }, + ) + // overridden by mvcc initialization + ReportDbOpenReadTxNMu sync.RWMutex + ReportDbOpenReadTxN = func() float64 { return 0 } + + CurrentRev = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "etcd_debugging", + Subsystem: "mvcc", + Name: "current_revision", + Help: "The current revision of store.", + }, + func() float64 { + ReportCurrentRevMu.RLock() + defer ReportCurrentRevMu.RUnlock() + return ReportCurrentRev() + }, + ) + // overridden by mvcc initialization + ReportCurrentRevMu sync.RWMutex + ReportCurrentRev = func() float64 { return 0 } + + CompactRev = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "etcd_debugging", + Subsystem: "mvcc", + Name: "compact_revision", + Help: "The revision of the last compaction in store.", + }, + func() float64 { + ReportCompactRevMu.RLock() + defer ReportCompactRevMu.RUnlock() + return ReportCompactRev() + }, + ) + // overridden by mvcc initialization + ReportCompactRevMu sync.RWMutex + ReportCompactRev = func() float64 { return 0 } + + TotalPutSizeGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "etcd_debugging", + Subsystem: "mvcc", + Name: "total_put_size_in_bytes", + Help: "The total size of put kv pairs seen by this member.", + }) +) + +// server metrics cribbed from https://github.com/etcd-io/etcd/blob/v3.5.11/server/etcdserver/metrics.go +var ( + HasLeader = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "server", + Name: "has_leader", + Help: "Whether or not a leader exists. 1 is existence, 0 is not.", + }) + + IsLeader = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "server", + Name: "is_leader", + Help: "Whether or not this member is a leader. 1 if is, 0 otherwise.", + }) + + IsLearner = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "server", + Name: "is_learner", + Help: "Whether or not this member is a learner. 1 if is, 0 otherwise.", + }) + + CurrentVersion = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "server", + Name: "version", + Help: "Which version is running. 1 for 'server_version' label with current version.", + }, + []string{"server_version"}) + + CurrentGoVersion = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "server", + Name: "go_version", + Help: "Which Go version server is running with. 1 for 'server_go_version' label with current version.", + }, + []string{"server_go_version"}) + + ServerID = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "server", + Name: "id", + Help: "Server or member ID in hexadecimal format. 1 for 'server_id' label with current ID.", + }, + []string{"server_id"}) +) diff --git a/pkg/metrics/registry.go b/pkg/metrics/registry.go index 159c457f..9c588d5e 100644 --- a/pkg/metrics/registry.go +++ b/pkg/metrics/registry.go @@ -2,21 +2,54 @@ package metrics import ( "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/collectors" ) -type RegistererGatherer interface { - prometheus.Registerer - prometheus.Gatherer -} - -var Registry RegistererGatherer = prometheus.NewRegistry() +var ( + // DefaultRegisterer and DefaultGatherer are the implementations of the + // prometheus Registerer and Gatherer interfaces that all metrics operations + // will use. They are variables so that packages that embed this library can + // replace them at runtime, instead of having to pass around specific + // registries. + DefaultRegisterer = prometheus.DefaultRegisterer + DefaultGatherer = prometheus.DefaultGatherer +) -func init() { - Registry.MustRegister( - // expose process metrics like CPU, Memory, file descriptor usage etc. - collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), - // expose Go runtime metrics like GC stats, memory stats etc. - collectors.NewGoCollector(), - ) +func Register() { + if DefaultRegisterer != nil { + DefaultRegisterer.MustRegister( + // kine sql metrics + SQLTotal, + SQLTime, + CompactTotal, + // etcd mvcc metrics + RangeCounter, + PutCounter, + DeleteCounter, + TxnCounter, + KeysGauge, + WatchStreamGauge, + WatcherGauge, + SlowWatcherGauge, + TotalEventsCounter, + PendingEventsGauge, + IndexCompactionPauseMs, + DbCompactionPauseMs, + DbCompactionTotalMs, + DbCompactionLast, + DbCompactionKeysCounter, + DbTotalSize, + DbTotalSizeInUse, + DbOpenReadTxN, + CurrentRev, + CompactRev, + TotalPutSizeGauge, + // etcd server metrics + HasLeader, + IsLeader, + IsLearner, + CurrentVersion, + CurrentGoVersion, + ServerID, + ) + } } diff --git a/pkg/metrics/server.go b/pkg/metrics/server.go index 348ea740..0a9180e8 100644 --- a/pkg/metrics/server.go +++ b/pkg/metrics/server.go @@ -27,6 +27,7 @@ func Serve(ctx context.Context, config Config) { config.ServerAddress = defaultBindAddress } if config.ServerAddress == "0" { + DefaultRegisterer = nil return } @@ -36,7 +37,8 @@ func Serve(ctx context.Context, config Config) { logrus.Fatalf("error creating the metrics listener: %v", err) } - handler := promhttp.HandlerFor(Registry, promhttp.HandlerOpts{ + Register() + handler := promhttp.HandlerFor(DefaultGatherer, promhttp.HandlerOpts{ ErrorHandling: promhttp.HTTPErrorOnError, }) mux := http.NewServeMux()