Skip to content

Commit

Permalink
Store ASN data to mysql, if enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
cmmarslender committed Mar 6, 2024
1 parent bb25cab commit 5421151
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 10 deletions.
19 changes: 19 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ func init() {
requestTimeout time.Duration
disableCentralHarvesterCollection bool
logBlockTimes bool

mysqlHost string
mysqlPort uint16
mysqlUser string
mysqlPass string
mysqlDBName string
mysqlBatchSize uint32
)

cobra.OnInitialize(initConfig)
Expand All @@ -50,6 +57,12 @@ func init() {
rootCmd.PersistentFlags().DurationVar(&requestTimeout, "rpc-timeout", 10*time.Second, "How long RPC requests will wait before timing out")
rootCmd.PersistentFlags().BoolVar(&disableCentralHarvesterCollection, "disable-central-harvester-collection", false, "Disables collection of harvester information via the farmer. Useful for very large farms where this request is very expensive, or cases where chia-exporter is already installed on all harvesters")
rootCmd.PersistentFlags().BoolVar(&logBlockTimes, "log-block-times", false, "Enables logging of block (pre)validation times to log files.")
rootCmd.PersistentFlags().StringVar(&mysqlHost, "mysql-host", "127.0.0.1", "MySQL host for metrics that get stored to a DB")
rootCmd.PersistentFlags().Uint16Var(&mysqlPort, "mysql-port", 3306, "Port of the MySQL database")
rootCmd.PersistentFlags().StringVar(&mysqlUser, "mysql-user", "root", "The username for the MySQL Database")
rootCmd.PersistentFlags().StringVar(&mysqlPass, "mysql-password", "password", "The password for the MySQL Database")
rootCmd.PersistentFlags().StringVar(&mysqlDBName, "mysql-db-name", "chia-exporter", "The database in MySQL to use for metrics")
rootCmd.PersistentFlags().Uint32Var(&mysqlBatchSize, "mysql-batch-size", 250, "How many records will be batched into a single insert to MySQL")

cobra.CheckErr(viper.BindPFlag("hostname", rootCmd.PersistentFlags().Lookup("hostname")))
cobra.CheckErr(viper.BindPFlag("metrics-port", rootCmd.PersistentFlags().Lookup("metrics-port")))
Expand All @@ -59,6 +72,12 @@ func init() {
cobra.CheckErr(viper.BindPFlag("rpc-timeout", rootCmd.PersistentFlags().Lookup("rpc-timeout")))
cobra.CheckErr(viper.BindPFlag("disable-central-harvester-collection", rootCmd.PersistentFlags().Lookup("disable-central-harvester-collection")))
cobra.CheckErr(viper.BindPFlag("log-block-times", rootCmd.PersistentFlags().Lookup("log-block-times")))
cobra.CheckErr(viper.BindPFlag("mysql-host", rootCmd.PersistentFlags().Lookup("mysql-host")))
cobra.CheckErr(viper.BindPFlag("mysql-port", rootCmd.PersistentFlags().Lookup("mysql-port")))
cobra.CheckErr(viper.BindPFlag("mysql-user", rootCmd.PersistentFlags().Lookup("mysql-user")))
cobra.CheckErr(viper.BindPFlag("mysql-password", rootCmd.PersistentFlags().Lookup("mysql-password")))
cobra.CheckErr(viper.BindPFlag("mysql-db-name", rootCmd.PersistentFlags().Lookup("mysql-db-name")))
cobra.CheckErr(viper.BindPFlag("mysql-batch-size", rootCmd.PersistentFlags().Lookup("mysql-batch-size")))
}

// initConfig reads in config file and ENV variables if set.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.21
require (
github.com/chia-network/go-chia-libs v0.5.3
github.com/chia-network/go-modules v0.0.4
github.com/go-sql-driver/mysql v1.7.1
github.com/oschwald/maxminddb-golang v1.12.0
github.com/prometheus/client_golang v1.19.0
github.com/sirupsen/logrus v1.9.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHk
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
Expand Down
51 changes: 42 additions & 9 deletions internal/metrics/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"net"
"strings"
"time"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -38,7 +39,6 @@ type CrawlerServiceMetrics struct {
ipv6Nodes5Days *wrappedPrometheus.LazyGauge
versionBuckets *prometheus.GaugeVec
countryNodeCountBuckets *prometheus.GaugeVec
asnNodeCountBuckets *prometheus.GaugeVec

// Debug Metric
debug *prometheus.GaugeVec
Expand All @@ -53,7 +53,6 @@ func (s *CrawlerServiceMetrics) InitMetrics() {
s.ipv6Nodes5Days = s.metrics.newGauge(chiaServiceCrawler, "ipv6_nodes_5_days", "Total number of IPv6 nodes that have been gossiped around the network with a timestamp in the last 5 days. The crawler did not necessarily connect to all of these peers itself.")
s.versionBuckets = s.metrics.newGaugeVec(chiaServiceCrawler, "peer_version", "Number of peers for each version. Only peers the crawler was able to connect to are included here.", []string{"version"})
s.countryNodeCountBuckets = s.metrics.newGaugeVec(chiaServiceCrawler, "country_node_count", "Number of peers gossiped in the last 5 days from each country.", []string{"country", "country_display"})
s.asnNodeCountBuckets = s.metrics.newGaugeVec(chiaServiceCrawler, "asn_node_count", "Number of peers gossiped in the last 5 days from each asn.", []string{"asn", "organization"})

// Debug Metric
s.debug = s.metrics.newGaugeVec(chiaServiceCrawler, "debug_metrics", "random debugging metrics distinguished by labels", []string{"key"})
Expand Down Expand Up @@ -177,7 +176,7 @@ func (s *CrawlerServiceMetrics) StartIPMapping(limit uint) {
return
}

log.Println("Requesting IP addresses from the past 5 days for country mapping...")
log.Println("Requesting IP addresses from the past 5 days for country and/or ASN mapping...")

ipsAfterTimestamp, _, err := s.metrics.httpClient.CrawlerService.GetIPsAfterTimestamp(&rpc.GetIPsAfterTimestampOptions{
After: time.Now().Add(-5 * time.Hour * 24).Unix(),
Expand Down Expand Up @@ -245,12 +244,16 @@ func (s *CrawlerServiceMetrics) ProcessIPCountryMapping(ips *rpc.GetIPsAfterTime

// ProcessIPASNMapping Processes the list of IPs to ASNs
func (s *CrawlerServiceMetrics) ProcessIPASNMapping(ips *rpc.GetIPsAfterTimestampResponse) {
// Don't process if we can't store
if s.metrics.mysqlClient == nil {
return
}
type countStruct struct {
ASN int
ASN uint32
Organization string
Count float64
Count uint32
}
asnCounts := map[int]*countStruct{}
asnCounts := map[uint32]*countStruct{}

if ipresult, hasIPResult := ips.IPs.Get(); hasIPResult {
for _, ip := range ipresult {
Expand All @@ -271,8 +274,38 @@ func (s *CrawlerServiceMetrics) ProcessIPASNMapping(ips *rpc.GetIPsAfterTimestam
}
}

for _, asnData := range asnCounts {
s.asnNodeCountBuckets.WithLabelValues(fmt.Sprintf("%d", asnData.ASN), asnData.Organization).Set(asnData.Count)
err := s.metrics.DeleteASNRecords()
if err != nil {
log.Errorf("unable to delete old ASN records from the database: %s\n", err.Error())
return
}

batchSize := viper.GetUint32("mysql-batch-size")
var valueStrings []string
var valueArgs []interface{}

for i, asnData := range asnCounts {
if asnData.ASN == 0 {
continue
}

valueStrings = append(valueStrings, "(?, ?, ?)")
valueArgs = append(valueArgs, asnData.ASN, asnData.Organization, asnData.Count)

// Execute the batch insert when reaching the batch size or the end of the slice
if (i+1)%batchSize == 0 || i+1 == uint32(len(asnCounts)) {
_, err := s.metrics.mysqlClient.Exec(
fmt.Sprintf("INSERT INTO asn(asn, organization, count) VALUES %s", strings.Join(valueStrings, ",")),
valueArgs...)

if err != nil {
log.Errorf("error inserting ASN record to mysql for asn:%d count:%d error: %s\n", asnData.ASN, asnData.Count, err.Error())
}

// Reset the slices for the next batch
valueStrings = []string{}
valueArgs = []interface{}{}
}
}
}

Expand Down Expand Up @@ -307,7 +340,7 @@ func (s *CrawlerServiceMetrics) GetCountryForIP(ipStr string) (*CountryRecord, e

// ASNRecord record of a country from maxmind
type ASNRecord struct {
AutonomousSystemNumber int `maxminddb:"autonomous_system_number"`
AutonomousSystemNumber uint32 `maxminddb:"autonomous_system_number"`
AutonomousSystemOrganization string `maxminddb:"autonomous_system_organization"`
}

Expand Down
33 changes: 33 additions & 0 deletions internal/metrics/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package metrics

// initTables ensures that the tables required exist and have the correct columns present
func (m *Metrics) initTables() error {
if m.mysqlClient == nil {
return nil
}
query := "CREATE TABLE IF NOT EXISTS `asn` (" +
" `asn` int unsigned NOT NULL," +
" `organization` VARCHAR(255) NOT NULL," +
" `count` int unsigned NOT NULL," +
"UNIQUE KEY `asn-unique` (`asn`)" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;"

result, err := m.mysqlClient.Query(query)
if err != nil {
return err
}
return result.Close()
}

// DeleteASNRecords deletes all records from the asn table in the database
func (m *Metrics) DeleteASNRecords() error {
if m.mysqlClient == nil {
return nil
}
query := "DELETE from asn;"
result, err := m.mysqlClient.Query(query)
if err != nil {
return err
}
return result.Close()
}
40 changes: 39 additions & 1 deletion internal/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package metrics

import (
"database/sql"
"encoding/json"
"fmt"
"net/http"
"net/url"
"time"

"github.com/go-sql-driver/mysql"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"

Expand Down Expand Up @@ -64,6 +67,9 @@ type Metrics struct {
// This holds a custom prometheus registry so that only our metrics are exported, and not the default go metrics
registry *prometheus.Registry

// Holds a MySQL DB Instance if configured
mysqlClient *sql.DB

// All the serviceMetrics interfaces that are registered
serviceMetrics map[chiaService]serviceMetrics
}
Expand Down Expand Up @@ -99,8 +105,17 @@ func NewMetrics(port uint16, logLevel log.Level) (*Metrics, error) {
log.Errorf("Error creating http client: %s\n", err.Error())
}

// Register each service's metrics
err = metrics.createDBClient()
if err != nil {
log.Debugf("ERROR creating MySQL Client. Will not store any metrics to MySQL")
}
err = metrics.initTables()
if err != nil {
log.Debugf("ERROR ensuring tables exist in MySQL. Will not process or store any MySQL only metrics")
metrics.mysqlClient = nil
}

// Register each service's metrics
metrics.serviceMetrics[chiaServiceFullNode] = &FullNodeServiceMetrics{metrics: metrics}
metrics.serviceMetrics[chiaServiceWallet] = &WalletServiceMetrics{metrics: metrics}
metrics.serviceMetrics[chiaServiceCrawler] = &CrawlerServiceMetrics{metrics: metrics}
Expand All @@ -116,6 +131,29 @@ func NewMetrics(port uint16, logLevel log.Level) (*Metrics, error) {
return metrics, nil
}

func (m *Metrics) createDBClient() error {
var err error

cfg := mysql.Config{
User: viper.GetString("mysql-user"),
Passwd: viper.GetString("mysql-password"),
Net: "tcp",
Addr: fmt.Sprintf("%s:%d", viper.GetString("mysql-host"), viper.GetUint16("mysql-port")),
DBName: viper.GetString("mysql-db-name"),
AllowNativePasswords: true,
}
m.mysqlClient, err = sql.Open("mysql", cfg.FormatDSN())
if err != nil {
return err
}

m.mysqlClient.SetConnMaxLifetime(time.Minute * 3)
m.mysqlClient.SetMaxOpenConns(10)
m.mysqlClient.SetMaxIdleConns(10)

return nil
}

// newGauge returns a lazy gauge that follows naming conventions
func (m *Metrics) newGauge(service chiaService, name string, help string) *wrappedPrometheus.LazyGauge {
opts := prometheus.GaugeOpts{
Expand Down

0 comments on commit 5421151

Please sign in to comment.