From d1dcd7f3bfd86883a9954a9c4d33d85a19d6fdc5 Mon Sep 17 00:00:00 2001 From: Chris Marslender Date: Tue, 5 Mar 2024 21:06:58 -0600 Subject: [PATCH] ASN Support (#22) * Update readme for country + ASN db support. Change flag to include country in the name * Support maxmind ASN DB * Update readme with ASN/MySQL Note * Store ASN data to mysql, if enabled --- cmd/root.go | 28 ++++++- go.mod | 1 + go.sum | 2 + internal/metrics/crawler.go | 157 +++++++++++++++++++++++++++++++---- internal/metrics/database.go | 33 ++++++++ internal/metrics/metrics.go | 40 ++++++++- readme.md | 10 ++- 7 files changed, 248 insertions(+), 23 deletions(-) create mode 100644 internal/metrics/database.go diff --git a/cmd/root.go b/cmd/root.go index 54b5aba..a5ce992 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -31,11 +31,19 @@ func init() { var ( hostname string metricsPort int - maxmindDBPath string + maxmindCountryDBPath string + maxmindASNDBPath string logLevel string requestTimeout time.Duration disableCentralHarvesterCollection bool logBlockTimes bool + + mysqlHost string + mysqlPort uint16 + mysqlUser string + mysqlPass string + mysqlDBName string + mysqlBatchSize uint32 ) cobra.OnInitialize(initConfig) @@ -43,19 +51,33 @@ func init() { rootCmd.PersistentFlags().StringVar(&hostname, "hostname", "localhost", "The hostname to connect to") rootCmd.PersistentFlags().IntVar(&metricsPort, "metrics-port", 9914, "The port the metrics server binds to") - rootCmd.PersistentFlags().StringVar(&maxmindDBPath, "maxmind-db-path", "", "Path to the maxmind database file") + rootCmd.PersistentFlags().StringVar(&maxmindCountryDBPath, "maxmind-country-db-path", "", "Path to the maxmind country database file") + rootCmd.PersistentFlags().StringVar(&maxmindASNDBPath, "maxmind-asn-db-path", "", "Path to the maxmind ASN database file") rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "info", "How verbose the logs should be. 
panic, fatal, error, warn, info, debug, trace") rootCmd.PersistentFlags().DurationVar(&requestTimeout, "rpc-timeout", 10*time.Second, "How long RPC requests will wait before timing out") rootCmd.PersistentFlags().BoolVar(&disableCentralHarvesterCollection, "disable-central-harvester-collection", false, "Disables collection of harvester information via the farmer. Useful for very large farms where this request is very expensive, or cases where chia-exporter is already installed on all harvesters") rootCmd.PersistentFlags().BoolVar(&logBlockTimes, "log-block-times", false, "Enables logging of block (pre)validation times to log files.") + rootCmd.PersistentFlags().StringVar(&mysqlHost, "mysql-host", "127.0.0.1", "MySQL host for metrics that get stored to a DB") + rootCmd.PersistentFlags().Uint16Var(&mysqlPort, "mysql-port", 3306, "Port of the MySQL database") + rootCmd.PersistentFlags().StringVar(&mysqlUser, "mysql-user", "root", "The username for the MySQL Database") + rootCmd.PersistentFlags().StringVar(&mysqlPass, "mysql-password", "password", "The password for the MySQL Database") + rootCmd.PersistentFlags().StringVar(&mysqlDBName, "mysql-db-name", "chia-exporter", "The database in MySQL to use for metrics") + rootCmd.PersistentFlags().Uint32Var(&mysqlBatchSize, "mysql-batch-size", 250, "How many records will be batched into a single insert to MySQL") cobra.CheckErr(viper.BindPFlag("hostname", rootCmd.PersistentFlags().Lookup("hostname"))) cobra.CheckErr(viper.BindPFlag("metrics-port", rootCmd.PersistentFlags().Lookup("metrics-port"))) - cobra.CheckErr(viper.BindPFlag("maxmind-db-path", rootCmd.PersistentFlags().Lookup("maxmind-db-path"))) + cobra.CheckErr(viper.BindPFlag("maxmind-country-db-path", rootCmd.PersistentFlags().Lookup("maxmind-country-db-path"))) + cobra.CheckErr(viper.BindPFlag("maxmind-asn-db-path", rootCmd.PersistentFlags().Lookup("maxmind-asn-db-path"))) cobra.CheckErr(viper.BindPFlag("log-level", rootCmd.PersistentFlags().Lookup("log-level"))) 
cobra.CheckErr(viper.BindPFlag("rpc-timeout", rootCmd.PersistentFlags().Lookup("rpc-timeout"))) cobra.CheckErr(viper.BindPFlag("disable-central-harvester-collection", rootCmd.PersistentFlags().Lookup("disable-central-harvester-collection"))) cobra.CheckErr(viper.BindPFlag("log-block-times", rootCmd.PersistentFlags().Lookup("log-block-times"))) + cobra.CheckErr(viper.BindPFlag("mysql-host", rootCmd.PersistentFlags().Lookup("mysql-host"))) + cobra.CheckErr(viper.BindPFlag("mysql-port", rootCmd.PersistentFlags().Lookup("mysql-port"))) + cobra.CheckErr(viper.BindPFlag("mysql-user", rootCmd.PersistentFlags().Lookup("mysql-user"))) + cobra.CheckErr(viper.BindPFlag("mysql-password", rootCmd.PersistentFlags().Lookup("mysql-password"))) + cobra.CheckErr(viper.BindPFlag("mysql-db-name", rootCmd.PersistentFlags().Lookup("mysql-db-name"))) + cobra.CheckErr(viper.BindPFlag("mysql-batch-size", rootCmd.PersistentFlags().Lookup("mysql-batch-size"))) } // initConfig reads in config file and ENV variables if set. 
diff --git a/go.mod b/go.mod index 2cc9eeb..283cefa 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.21 require ( github.com/chia-network/go-chia-libs v0.5.3 github.com/chia-network/go-modules v0.0.4 + github.com/go-sql-driver/mysql v1.7.1 github.com/oschwald/maxminddb-golang v1.12.0 github.com/prometheus/client_golang v1.19.0 github.com/sirupsen/logrus v1.9.3 diff --git a/go.sum b/go.sum index 57b22e1..0409163 100644 --- a/go.sum +++ b/go.sum @@ -15,6 +15,8 @@ github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHk github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= +github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= +github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= diff --git a/internal/metrics/crawler.go b/internal/metrics/crawler.go index cf74e2a..3ff04ac 100644 --- a/internal/metrics/crawler.go +++ b/internal/metrics/crawler.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "net" + "strings" "time" log "github.com/sirupsen/logrus" @@ -28,7 +29,8 @@ type CrawlerServiceMetrics struct { metrics *Metrics // Interfaces with Maxmind - maxMindDB *maxminddb.Reader + maxMindCountryDB *maxminddb.Reader + maxMindASNDB *maxminddb.Reader // Crawler Metrics totalNodes5Days *wrappedPrometheus.LazyGauge @@ -55,22 +57,44 @@ func (s *CrawlerServiceMetrics) InitMetrics() { // Debug Metric s.debug = s.metrics.newGaugeVec(chiaServiceCrawler, "debug_metrics", "random debugging metrics distinguished by labels", 
[]string{"key"}) - err := s.initMaxmindDB() + err := s.initMaxmindCountryDB() if err != nil { // Continue on maxmind error - optional/not critical functionality - log.Errorf("Error initializing maxmind DB: %s\n", err.Error()) + log.Printf("Error initializing maxmind country DB: %s\n", err.Error()) + } + + err = s.initMaxmindASNDB() + if err != nil { + // Continue on maxmind error - optional/not critical functionality + log.Printf("Error initializing maxmind ASN DB: %s\n", err.Error()) } } -// initMaxmindDB loads the maxmind DB if the file is present +// initMaxmindCountryDB loads the maxmind country DB if the file is present // If the DB is not present, ip/country mapping is skipped -func (s *CrawlerServiceMetrics) initMaxmindDB() error { +func (s *CrawlerServiceMetrics) initMaxmindCountryDB() error { + var err error + dbPath := viper.GetString("maxmind-country-db-path") + if dbPath == "" { + return nil + } + s.maxMindCountryDB, err = maxminddb.Open(dbPath) + if err != nil { + return err + } + + return nil +} + +// initMaxmindASNDB loads the maxmind ASN DB if the file is present +// If the DB is not present, ip/ASN mapping is skipped +func (s *CrawlerServiceMetrics) initMaxmindASNDB() error { var err error - dbPath := viper.GetString("maxmind-db-path") + dbPath := viper.GetString("maxmind-asn-db-path") if dbPath == "" { return nil } - s.maxMindDB, err = maxminddb.Open(dbPath) + s.maxMindASNDB, err = maxminddb.Open(dbPath) if err != nil { return err } @@ -134,15 +158,16 @@ func (s *CrawlerServiceMetrics) GetPeerCounts(resp *types.WebsocketResponse) { s.versionBuckets.WithLabelValues(version).Set(float64(count)) } - s.StartIPCountryMapping(peerCounts.TotalLast5Days) + s.StartIPMapping(peerCounts.TotalLast5Days) } } -// StartIPCountryMapping starts the process to fetch current IPs from the crawler -// and maps them to countries using maxmind +// StartIPMapping starts the process to fetch current IPs from the crawler +// when a response is received, the IPs are mapped 
to countries and/or ASNs using maxmind, if databases are provided // Updates metrics value once all pages have been received -func (s *CrawlerServiceMetrics) StartIPCountryMapping(limit uint) { - if s.maxMindDB == nil { +func (s *CrawlerServiceMetrics) StartIPMapping(limit uint) { + // If we don't have either maxmind DB, bail now + if s.maxMindCountryDB == nil && s.maxMindASNDB == nil { return } @@ -151,7 +176,7 @@ func (s *CrawlerServiceMetrics) StartIPCountryMapping(limit uint) { return } - log.Println("Requesting IP addresses from the past 5 days for country mapping...") + log.Println("Requesting IP addresses from the past 5 days for country and/or ASN mapping...") ipsAfterTimestamp, _, err := s.metrics.httpClient.CrawlerService.GetIPsAfterTimestamp(&rpc.GetIPsAfterTimestampOptions{ After: time.Now().Add(-5 * time.Hour * 24).Unix(), @@ -168,7 +193,8 @@ func (s *CrawlerServiceMetrics) StartIPCountryMapping(limit uint) { // GetIPsAfterTimestamp processes a response of IPs seen since a timestamp // Currently assumes all IPs will be in one response func (s *CrawlerServiceMetrics) GetIPsAfterTimestamp(ips *rpc.GetIPsAfterTimestampResponse) { - if s.maxMindDB == nil { + // If we don't have either maxmind DB, bail now + if s.maxMindCountryDB == nil && s.maxMindASNDB == nil { return } @@ -176,6 +202,12 @@ func (s *CrawlerServiceMetrics) GetIPsAfterTimestamp(ips *rpc.GetIPsAfterTimesta return } + s.ProcessIPCountryMapping(ips) + s.ProcessIPASNMapping(ips) +} + +// ProcessIPCountryMapping Processes the list of IPs to countries +func (s *CrawlerServiceMetrics) ProcessIPCountryMapping(ips *rpc.GetIPsAfterTimestampResponse) { type countStruct struct { ISOCode string Name string @@ -210,6 +242,73 @@ func (s *CrawlerServiceMetrics) GetIPsAfterTimestamp(ips *rpc.GetIPsAfterTimesta } } +// ProcessIPASNMapping Processes the list of IPs to ASNs +func (s *CrawlerServiceMetrics) ProcessIPASNMapping(ips *rpc.GetIPsAfterTimestampResponse) { + // Don't process if we can't store + if 
s.metrics.mysqlClient == nil { + return + } + type countStruct struct { + ASN uint32 + Organization string + Count uint32 + } + asnCounts := map[uint32]*countStruct{} + + if ipresult, hasIPResult := ips.IPs.Get(); hasIPResult { + for _, ip := range ipresult { + asn, err := s.GetASNForIP(ip) + if err != nil { + continue + } + + if _, ok := asnCounts[asn.AutonomousSystemNumber]; !ok { + asnCounts[asn.AutonomousSystemNumber] = &countStruct{ + ASN: asn.AutonomousSystemNumber, + Organization: asn.AutonomousSystemOrganization, + Count: 0, + } + } + + asnCounts[asn.AutonomousSystemNumber].Count++ + } + } + + err := s.metrics.DeleteASNRecords() + if err != nil { + log.Errorf("unable to delete old ASN records from the database: %s\n", err.Error()) + return + } + + batchSize := viper.GetUint32("mysql-batch-size") + var valueStrings []string + var valueArgs []interface{} + + for i, asnData := range asnCounts { + if asnData.ASN == 0 { + continue + } + + valueStrings = append(valueStrings, "(?, ?, ?)") + valueArgs = append(valueArgs, asnData.ASN, asnData.Organization, asnData.Count) + + // Execute the batch insert when reaching the batch size or the end of the slice + if (i+1)%batchSize == 0 || i+1 == uint32(len(asnCounts)) { + _, err := s.metrics.mysqlClient.Exec( + fmt.Sprintf("INSERT INTO asn(asn, organization, count) VALUES %s", strings.Join(valueStrings, ",")), + valueArgs...) 
+ + if err != nil { + log.Errorf("error inserting ASN record to mysql for asn:%d count:%d error: %s\n", asnData.ASN, asnData.Count, err.Error()) + } + + // Reset the slices for the next batch + valueStrings = []string{} + valueArgs = []interface{}{} + } + } +} + // CountryRecord record of a country from maxmind type CountryRecord struct { Country Country `maxminddb:"country"` @@ -223,15 +322,39 @@ type Country struct { // GetCountryForIP Gets country data for an ip address func (s *CrawlerServiceMetrics) GetCountryForIP(ipStr string) (*CountryRecord, error) { - if s.maxMindDB == nil { - return nil, fmt.Errorf("maxmind not initialized") + if s.maxMindCountryDB == nil { + return nil, fmt.Errorf("maxmind country DB not initialized") } ip := net.ParseIP(ipStr) record := &CountryRecord{} - err := s.maxMindDB.Lookup(ip, record) + err := s.maxMindCountryDB.Lookup(ip, record) + if err != nil { + return nil, err + } + + return record, nil +} + +// ASNRecord record of a country from maxmind +type ASNRecord struct { + AutonomousSystemNumber uint32 `maxminddb:"autonomous_system_number"` + AutonomousSystemOrganization string `maxminddb:"autonomous_system_organization"` +} + +// GetASNForIP Gets ASN data for an ip address +func (s *CrawlerServiceMetrics) GetASNForIP(ipStr string) (*ASNRecord, error) { + if s.maxMindASNDB == nil { + return nil, fmt.Errorf("maxmind ASN DB not initialized") + } + + ip := net.ParseIP(ipStr) + + record := &ASNRecord{} + + err := s.maxMindASNDB.Lookup(ip, &record) if err != nil { return nil, err } diff --git a/internal/metrics/database.go b/internal/metrics/database.go new file mode 100644 index 0000000..b25300a --- /dev/null +++ b/internal/metrics/database.go @@ -0,0 +1,33 @@ +package metrics + +// initTables ensures that the tables required exist and have the correct columns present +func (m *Metrics) initTables() error { + if m.mysqlClient == nil { + return nil + } + query := "CREATE TABLE IF NOT EXISTS `asn` (" + + " `asn` int unsigned NOT NULL," 
+ + " `organization` VARCHAR(255) NOT NULL," + + " `count` int unsigned NOT NULL," + + "UNIQUE KEY `asn-unique` (`asn`)" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;" + + result, err := m.mysqlClient.Query(query) + if err != nil { + return err + } + return result.Close() +} + +// DeleteASNRecords deletes all records from the asn table in the database +func (m *Metrics) DeleteASNRecords() error { + if m.mysqlClient == nil { + return nil + } + query := "DELETE from asn;" + result, err := m.mysqlClient.Query(query) + if err != nil { + return err + } + return result.Close() +} diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index f1a2184..6167d7a 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -1,11 +1,14 @@ package metrics import ( + "database/sql" "encoding/json" "fmt" "net/http" "net/url" + "time" + "github.com/go-sql-driver/mysql" log "github.com/sirupsen/logrus" "github.com/spf13/viper" @@ -64,6 +67,9 @@ type Metrics struct { // This holds a custom prometheus registry so that only our metrics are exported, and not the default go metrics registry *prometheus.Registry + // Holds a MySQL DB Instance if configured + mysqlClient *sql.DB + // All the serviceMetrics interfaces that are registered serviceMetrics map[chiaService]serviceMetrics } @@ -99,8 +105,17 @@ func NewMetrics(port uint16, logLevel log.Level) (*Metrics, error) { log.Errorf("Error creating http client: %s\n", err.Error()) } - // Register each service's metrics + err = metrics.createDBClient() + if err != nil { + log.Debugf("ERROR creating MySQL Client. Will not store any metrics to MySQL") + } + err = metrics.initTables() + if err != nil { + log.Debugf("ERROR ensuring tables exist in MySQL. 
Will not process or store any MySQL only metrics") + metrics.mysqlClient = nil + } + // Register each service's metrics metrics.serviceMetrics[chiaServiceFullNode] = &FullNodeServiceMetrics{metrics: metrics} metrics.serviceMetrics[chiaServiceWallet] = &WalletServiceMetrics{metrics: metrics} metrics.serviceMetrics[chiaServiceCrawler] = &CrawlerServiceMetrics{metrics: metrics} @@ -116,6 +131,29 @@ func NewMetrics(port uint16, logLevel log.Level) (*Metrics, error) { return metrics, nil } +func (m *Metrics) createDBClient() error { + var err error + + cfg := mysql.Config{ + User: viper.GetString("mysql-user"), + Passwd: viper.GetString("mysql-password"), + Net: "tcp", + Addr: fmt.Sprintf("%s:%d", viper.GetString("mysql-host"), viper.GetUint16("mysql-port")), + DBName: viper.GetString("mysql-db-name"), + AllowNativePasswords: true, + } + m.mysqlClient, err = sql.Open("mysql", cfg.FormatDSN()) + if err != nil { + return err + } + + m.mysqlClient.SetConnMaxLifetime(time.Minute * 3) + m.mysqlClient.SetMaxOpenConns(10) + m.mysqlClient.SetMaxIdleConns(10) + + return nil +} + // newGauge returns a lazy gauge that follows naming conventions func (m *Metrics) newGauge(service chiaService, name string, help string) *wrappedPrometheus.LazyGauge { opts := prometheus.GaugeOpts{ diff --git a/readme.md b/readme.md index 1664e49..3f26551 100644 --- a/readme.md +++ b/readme.md @@ -84,6 +84,12 @@ To use a config file, create a new yaml file and place any configuration options metrics-port: 9914 ``` -## Country Data +## Country and ASN Data -When running alongside the crawler, the exporter can optionally export metrics indicating how many peers have been discovered in each country, based on IP address. To enable this functionality, you will need to download the MaxMind GeoLite2 Country database and provide the path to the MaxMind database to the exporter application. 
The path can be provided with a command line flag `--maxmind-db-path /path/to/GeoLite2-Country.mmdb`, an entry in the config yaml file `maxmind-db-path: /path/to/GeoLite2-Country.mmdb`, or an environment variable `CHIA_EXPORTER_MAXMIND_DB_PATH=/path/to/GeoLite2-Country.mmdb`. To gain access to the MaxMind DB, you can [register here](https://www.maxmind.com/en/geolite2/signup). +When running alongside the crawler, the exporter can optionally export metrics indicating how many peers have been discovered in each country (based on IP address) and/or how many peers are in each ASN. To enable this functionality, you will need to download the appropriate MaxMind GeoLite2 database (Country and/or ASN) and provide its path to the exporter application. Each path can be provided with a command line flag, an entry in the config yaml file, or an environment variable; the examples at the end of this section show all three forms for the country database, and the ASN database uses the `maxmind-asn-db-path` option instead. To gain access to the MaxMind DBs, you can [register here](https://www.maxmind.com/en/geolite2/signup). + +ASN data can cause a huge explosion of labels in Prometheus, and as such, it is not recommended to export ASN data to Prometheus. We support writing ASN data to MySQL instead, which can be used as a Grafana datasource. + +`--maxmind-country-db-path /path/to/GeoLite2-Country.mmdb` +`maxmind-country-db-path: /path/to/GeoLite2-Country.mmdb` +`CHIA_EXPORTER_MAXMIND_COUNTRY_DB_PATH=/path/to/GeoLite2-Country.mmdb`