From 5421151e108ebed002018149c58ef3b9e9087954 Mon Sep 17 00:00:00 2001 From: Chris Marslender Date: Tue, 5 Mar 2024 20:43:19 -0600 Subject: [PATCH] Store ASN data to mysql, if enabled --- cmd/root.go | 19 ++++++++++++++ go.mod | 1 + go.sum | 2 ++ internal/metrics/crawler.go | 51 +++++++++++++++++++++++++++++------- internal/metrics/database.go | 33 +++++++++++++++++++++++ internal/metrics/metrics.go | 40 +++++++++++++++++++++++++++- 6 files changed, 136 insertions(+), 10 deletions(-) create mode 100644 internal/metrics/database.go diff --git a/cmd/root.go b/cmd/root.go index 3988166..a5ce992 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -37,6 +37,13 @@ func init() { requestTimeout time.Duration disableCentralHarvesterCollection bool logBlockTimes bool + + mysqlHost string + mysqlPort uint16 + mysqlUser string + mysqlPass string + mysqlDBName string + mysqlBatchSize uint32 ) cobra.OnInitialize(initConfig) @@ -50,6 +57,12 @@ func init() { rootCmd.PersistentFlags().DurationVar(&requestTimeout, "rpc-timeout", 10*time.Second, "How long RPC requests will wait before timing out") rootCmd.PersistentFlags().BoolVar(&disableCentralHarvesterCollection, "disable-central-harvester-collection", false, "Disables collection of harvester information via the farmer. Useful for very large farms where this request is very expensive, or cases where chia-exporter is already installed on all harvesters") rootCmd.PersistentFlags().BoolVar(&logBlockTimes, "log-block-times", false, "Enables logging of block (pre)validation times to log files.") + rootCmd.PersistentFlags().StringVar(&mysqlHost, "mysql-host", "127.0.0.1", "MySQL host for metrics that get stored to a DB") + rootCmd.PersistentFlags().Uint16Var(&mysqlPort, "mysql-port", 3306, "Port of the MySQL database") + rootCmd.PersistentFlags().StringVar(&mysqlUser, "mysql-user", "root", "The username for the MySQL Database") + rootCmd.PersistentFlags().StringVar(&mysqlPass, "mysql-password", "password", "The password for the MySQL Database") + rootCmd.PersistentFlags().StringVar(&mysqlDBName, "mysql-db-name", "chia-exporter", "The database in MySQL to use for metrics") + rootCmd.PersistentFlags().Uint32Var(&mysqlBatchSize, "mysql-batch-size", 250, "How many records will be batched into a single insert to MySQL") cobra.CheckErr(viper.BindPFlag("hostname", rootCmd.PersistentFlags().Lookup("hostname"))) cobra.CheckErr(viper.BindPFlag("metrics-port", rootCmd.PersistentFlags().Lookup("metrics-port"))) @@ -59,6 +72,12 @@ func init() { cobra.CheckErr(viper.BindPFlag("rpc-timeout", rootCmd.PersistentFlags().Lookup("rpc-timeout"))) cobra.CheckErr(viper.BindPFlag("disable-central-harvester-collection", rootCmd.PersistentFlags().Lookup("disable-central-harvester-collection"))) cobra.CheckErr(viper.BindPFlag("log-block-times", rootCmd.PersistentFlags().Lookup("log-block-times"))) + cobra.CheckErr(viper.BindPFlag("mysql-host", rootCmd.PersistentFlags().Lookup("mysql-host"))) + cobra.CheckErr(viper.BindPFlag("mysql-port", rootCmd.PersistentFlags().Lookup("mysql-port"))) + cobra.CheckErr(viper.BindPFlag("mysql-user", rootCmd.PersistentFlags().Lookup("mysql-user"))) + cobra.CheckErr(viper.BindPFlag("mysql-password", rootCmd.PersistentFlags().Lookup("mysql-password"))) + cobra.CheckErr(viper.BindPFlag("mysql-db-name", rootCmd.PersistentFlags().Lookup("mysql-db-name"))) + cobra.CheckErr(viper.BindPFlag("mysql-batch-size", rootCmd.PersistentFlags().Lookup("mysql-batch-size"))) } // initConfig reads in config file and ENV variables if set. diff --git a/go.mod b/go.mod index 2cc9eeb..283cefa 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.21 require ( github.com/chia-network/go-chia-libs v0.5.3 github.com/chia-network/go-modules v0.0.4 + github.com/go-sql-driver/mysql v1.7.1 github.com/oschwald/maxminddb-golang v1.12.0 github.com/prometheus/client_golang v1.19.0 github.com/sirupsen/logrus v1.9.3 diff --git a/go.sum b/go.sum index 57b22e1..0409163 100644 --- a/go.sum +++ b/go.sum @@ -15,6 +15,8 @@ github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHk github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= +github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= +github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= diff --git a/internal/metrics/crawler.go b/internal/metrics/crawler.go index d4e7736..3ff04ac 100644 --- a/internal/metrics/crawler.go +++ b/internal/metrics/crawler.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "net" + "strings" "time" log "github.com/sirupsen/logrus" @@ -38,7 +39,6 @@ type CrawlerServiceMetrics struct { ipv6Nodes5Days *wrappedPrometheus.LazyGauge versionBuckets *prometheus.GaugeVec countryNodeCountBuckets *prometheus.GaugeVec - asnNodeCountBuckets *prometheus.GaugeVec // Debug Metric debug *prometheus.GaugeVec @@ -53,7 +53,6 @@ func (s *CrawlerServiceMetrics) InitMetrics() { s.ipv6Nodes5Days = s.metrics.newGauge(chiaServiceCrawler, "ipv6_nodes_5_days", "Total number of IPv6 nodes that have been gossiped around the network with a timestamp in the last 5 days. The crawler did not necessarily connect to all of these peers itself.") s.versionBuckets = s.metrics.newGaugeVec(chiaServiceCrawler, "peer_version", "Number of peers for each version. Only peers the crawler was able to connect to are included here.", []string{"version"}) s.countryNodeCountBuckets = s.metrics.newGaugeVec(chiaServiceCrawler, "country_node_count", "Number of peers gossiped in the last 5 days from each country.", []string{"country", "country_display"}) - s.asnNodeCountBuckets = s.metrics.newGaugeVec(chiaServiceCrawler, "asn_node_count", "Number of peers gossiped in the last 5 days from each asn.", []string{"asn", "organization"}) // Debug Metric s.debug = s.metrics.newGaugeVec(chiaServiceCrawler, "debug_metrics", "random debugging metrics distinguished by labels", []string{"key"}) @@ -177,7 +176,7 @@ func (s *CrawlerServiceMetrics) StartIPMapping(limit uint) { return } - log.Println("Requesting IP addresses from the past 5 days for country mapping...") + log.Println("Requesting IP addresses from the past 5 days for country and/or ASN mapping...") ipsAfterTimestamp, _, err := s.metrics.httpClient.CrawlerService.GetIPsAfterTimestamp(&rpc.GetIPsAfterTimestampOptions{ After: time.Now().Add(-5 * time.Hour * 24).Unix(), @@ -245,12 +244,16 @@ func (s *CrawlerServiceMetrics) ProcessIPCountryMapping(ips *rpc.GetIPsAfterTime // ProcessIPASNMapping Processes the list of IPs to ASNs func (s *CrawlerServiceMetrics) ProcessIPASNMapping(ips *rpc.GetIPsAfterTimestampResponse) { + // Don't process if we can't store + if s.metrics.mysqlClient == nil { + return + } type countStruct struct { - ASN int + ASN uint32 Organization string - Count float64 + Count uint32 } - asnCounts := map[int]*countStruct{} + asnCounts := map[uint32]*countStruct{} if ipresult, hasIPResult := ips.IPs.Get(); hasIPResult { for _, ip := range ipresult { @@ -271,8 +274,38 @@ func (s *CrawlerServiceMetrics) ProcessIPASNMapping(ips *rpc.GetIPsAfterTimestam } } - for _, asnData := range asnCounts { - s.asnNodeCountBuckets.WithLabelValues(fmt.Sprintf("%d", asnData.ASN), asnData.Organization).Set(asnData.Count) + err := s.metrics.DeleteASNRecords() + if err != nil { + log.Errorf("unable to delete old ASN records from the database: %s\n", err.Error()) + return + } + + batchSize := viper.GetUint32("mysql-batch-size") + var valueStrings []string + var valueArgs []interface{} + + for i, asnData := range asnCounts { + if asnData.ASN == 0 { + continue + } + + valueStrings = append(valueStrings, "(?, ?, ?)") + valueArgs = append(valueArgs, asnData.ASN, asnData.Organization, asnData.Count) + + // Execute the batch insert when reaching the batch size or the end of the slice + if (i+1)%batchSize == 0 || i+1 == uint32(len(asnCounts)) { + _, err := s.metrics.mysqlClient.Exec( + fmt.Sprintf("INSERT INTO asn(asn, organization, count) VALUES %s", strings.Join(valueStrings, ",")), + valueArgs...) + + if err != nil { + log.Errorf("error inserting ASN record to mysql for asn:%d count:%d error: %s\n", asnData.ASN, asnData.Count, err.Error()) + } + + // Reset the slices for the next batch + valueStrings = []string{} + valueArgs = []interface{}{} + } } } @@ -307,7 +340,7 @@ func (s *CrawlerServiceMetrics) GetCountryForIP(ipStr string) (*CountryRecord, e // ASNRecord record of a country from maxmind type ASNRecord struct { - AutonomousSystemNumber int `maxminddb:"autonomous_system_number"` + AutonomousSystemNumber uint32 `maxminddb:"autonomous_system_number"` AutonomousSystemOrganization string `maxminddb:"autonomous_system_organization"` } diff --git a/internal/metrics/database.go b/internal/metrics/database.go new file mode 100644 index 0000000..b25300a --- /dev/null +++ b/internal/metrics/database.go @@ -0,0 +1,33 @@ +package metrics + +// initTables ensures that the tables required exist and have the correct columns present +func (m *Metrics) initTables() error { + if m.mysqlClient == nil { + return nil + } + query := "CREATE TABLE IF NOT EXISTS `asn` (" + + " `asn` int unsigned NOT NULL," + + " `organization` VARCHAR(255) NOT NULL," + + " `count` int unsigned NOT NULL," + + "UNIQUE KEY `asn-unique` (`asn`)" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;" + + result, err := m.mysqlClient.Query(query) + if err != nil { + return err + } + return result.Close() +} + +// DeleteASNRecords deletes all records from the asn table in the database +func (m *Metrics) DeleteASNRecords() error { + if m.mysqlClient == nil { + return nil + } + query := "DELETE from asn;" + result, err := m.mysqlClient.Query(query) + if err != nil { + return err + } + return result.Close() +} diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index f1a2184..6167d7a 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -1,11 +1,14 @@ package metrics import ( + "database/sql" "encoding/json" "fmt" "net/http" "net/url" + "time" + "github.com/go-sql-driver/mysql" log "github.com/sirupsen/logrus" "github.com/spf13/viper" @@ -64,6 +67,9 @@ type Metrics struct { // This holds a custom prometheus registry so that only our metrics are exported, and not the default go metrics registry *prometheus.Registry + // Holds a MySQL DB Instance if configured + mysqlClient *sql.DB + // All the serviceMetrics interfaces that are registered serviceMetrics map[chiaService]serviceMetrics } @@ -99,8 +105,17 @@ func NewMetrics(port uint16, logLevel log.Level) (*Metrics, error) { log.Errorf("Error creating http client: %s\n", err.Error()) } - // Register each service's metrics + err = metrics.createDBClient() + if err != nil { + log.Debugf("ERROR creating MySQL Client. Will not store any metrics to MySQL") + } + err = metrics.initTables() + if err != nil { + log.Debugf("ERROR ensuring tables exist in MySQL. Will not process or store any MySQL only metrics") + metrics.mysqlClient = nil + } + // Register each service's metrics metrics.serviceMetrics[chiaServiceFullNode] = &FullNodeServiceMetrics{metrics: metrics} metrics.serviceMetrics[chiaServiceWallet] = &WalletServiceMetrics{metrics: metrics} metrics.serviceMetrics[chiaServiceCrawler] = &CrawlerServiceMetrics{metrics: metrics} @@ -116,6 +131,29 @@ func NewMetrics(port uint16, logLevel log.Level) (*Metrics, error) { return metrics, nil } +func (m *Metrics) createDBClient() error { + var err error + + cfg := mysql.Config{ + User: viper.GetString("mysql-user"), + Passwd: viper.GetString("mysql-password"), + Net: "tcp", + Addr: fmt.Sprintf("%s:%d", viper.GetString("mysql-host"), viper.GetUint16("mysql-port")), + DBName: viper.GetString("mysql-db-name"), + AllowNativePasswords: true, + } + m.mysqlClient, err = sql.Open("mysql", cfg.FormatDSN()) + if err != nil { + return err + } + + m.mysqlClient.SetConnMaxLifetime(time.Minute * 3) + m.mysqlClient.SetMaxOpenConns(10) + m.mysqlClient.SetMaxIdleConns(10) + + return nil +} + // newGauge returns a lazy gauge that follows naming conventions func (m *Metrics) newGauge(service chiaService, name string, help string) *wrappedPrometheus.LazyGauge { opts := prometheus.GaugeOpts{