Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ASN Support #22

Merged
merged 4 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,31 +31,53 @@ func init() {
var (
hostname string
metricsPort int
maxmindDBPath string
maxmindCountryDBPath string
maxmindASNDBPath string
logLevel string
requestTimeout time.Duration
disableCentralHarvesterCollection bool
logBlockTimes bool

mysqlHost string
mysqlPort uint16
mysqlUser string
mysqlPass string
mysqlDBName string
mysqlBatchSize uint32
)

cobra.OnInitialize(initConfig)
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is $HOME/.chia-exporter.yaml)")

rootCmd.PersistentFlags().StringVar(&hostname, "hostname", "localhost", "The hostname to connect to")
rootCmd.PersistentFlags().IntVar(&metricsPort, "metrics-port", 9914, "The port the metrics server binds to")
rootCmd.PersistentFlags().StringVar(&maxmindDBPath, "maxmind-db-path", "", "Path to the maxmind database file")
rootCmd.PersistentFlags().StringVar(&maxmindCountryDBPath, "maxmind-country-db-path", "", "Path to the maxmind country database file")
rootCmd.PersistentFlags().StringVar(&maxmindASNDBPath, "maxmind-asn-db-path", "", "Path to the maxmind ASN database file")
rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "info", "How verbose the logs should be. panic, fatal, error, warn, info, debug, trace")
rootCmd.PersistentFlags().DurationVar(&requestTimeout, "rpc-timeout", 10*time.Second, "How long RPC requests will wait before timing out")
rootCmd.PersistentFlags().BoolVar(&disableCentralHarvesterCollection, "disable-central-harvester-collection", false, "Disables collection of harvester information via the farmer. Useful for very large farms where this request is very expensive, or cases where chia-exporter is already installed on all harvesters")
rootCmd.PersistentFlags().BoolVar(&logBlockTimes, "log-block-times", false, "Enables logging of block (pre)validation times to log files.")
rootCmd.PersistentFlags().StringVar(&mysqlHost, "mysql-host", "127.0.0.1", "MySQL host for metrics that get stored to a DB")
rootCmd.PersistentFlags().Uint16Var(&mysqlPort, "mysql-port", 3306, "Port of the MySQL database")
rootCmd.PersistentFlags().StringVar(&mysqlUser, "mysql-user", "root", "The username for the MySQL Database")
rootCmd.PersistentFlags().StringVar(&mysqlPass, "mysql-password", "password", "The password for the MySQL Database")
rootCmd.PersistentFlags().StringVar(&mysqlDBName, "mysql-db-name", "chia-exporter", "The database in MySQL to use for metrics")
rootCmd.PersistentFlags().Uint32Var(&mysqlBatchSize, "mysql-batch-size", 250, "How many records will be batched into a single insert to MySQL")

cobra.CheckErr(viper.BindPFlag("hostname", rootCmd.PersistentFlags().Lookup("hostname")))
cobra.CheckErr(viper.BindPFlag("metrics-port", rootCmd.PersistentFlags().Lookup("metrics-port")))
cobra.CheckErr(viper.BindPFlag("maxmind-db-path", rootCmd.PersistentFlags().Lookup("maxmind-db-path")))
cobra.CheckErr(viper.BindPFlag("maxmind-country-db-path", rootCmd.PersistentFlags().Lookup("maxmind-country-db-path")))
cobra.CheckErr(viper.BindPFlag("maxmind-asn-db-path", rootCmd.PersistentFlags().Lookup("maxmind-asn-db-path")))
cobra.CheckErr(viper.BindPFlag("log-level", rootCmd.PersistentFlags().Lookup("log-level")))
cobra.CheckErr(viper.BindPFlag("rpc-timeout", rootCmd.PersistentFlags().Lookup("rpc-timeout")))
cobra.CheckErr(viper.BindPFlag("disable-central-harvester-collection", rootCmd.PersistentFlags().Lookup("disable-central-harvester-collection")))
cobra.CheckErr(viper.BindPFlag("log-block-times", rootCmd.PersistentFlags().Lookup("log-block-times")))
cobra.CheckErr(viper.BindPFlag("mysql-host", rootCmd.PersistentFlags().Lookup("mysql-host")))
cobra.CheckErr(viper.BindPFlag("mysql-port", rootCmd.PersistentFlags().Lookup("mysql-port")))
cobra.CheckErr(viper.BindPFlag("mysql-user", rootCmd.PersistentFlags().Lookup("mysql-user")))
cobra.CheckErr(viper.BindPFlag("mysql-password", rootCmd.PersistentFlags().Lookup("mysql-password")))
cobra.CheckErr(viper.BindPFlag("mysql-db-name", rootCmd.PersistentFlags().Lookup("mysql-db-name")))
cobra.CheckErr(viper.BindPFlag("mysql-batch-size", rootCmd.PersistentFlags().Lookup("mysql-batch-size")))
}

// initConfig reads in config file and ENV variables if set.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.21
require (
github.com/chia-network/go-chia-libs v0.5.3
github.com/chia-network/go-modules v0.0.4
github.com/go-sql-driver/mysql v1.7.1
github.com/oschwald/maxminddb-golang v1.12.0
github.com/prometheus/client_golang v1.19.0
github.com/sirupsen/logrus v1.9.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHk
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
Expand Down
157 changes: 140 additions & 17 deletions internal/metrics/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"net"
"strings"
"time"

log "github.com/sirupsen/logrus"
Expand All @@ -28,7 +29,8 @@ type CrawlerServiceMetrics struct {
metrics *Metrics

// Interfaces with Maxmind
maxMindDB *maxminddb.Reader
maxMindCountryDB *maxminddb.Reader
maxMindASNDB *maxminddb.Reader

// Crawler Metrics
totalNodes5Days *wrappedPrometheus.LazyGauge
Expand All @@ -55,22 +57,44 @@ func (s *CrawlerServiceMetrics) InitMetrics() {
// Debug Metric
s.debug = s.metrics.newGaugeVec(chiaServiceCrawler, "debug_metrics", "random debugging metrics distinguished by labels", []string{"key"})

err := s.initMaxmindDB()
err := s.initMaxmindCountryDB()
if err != nil {
// Continue on maxmind error - optional/not critical functionality
log.Errorf("Error initializing maxmind DB: %s\n", err.Error())
log.Printf("Error initializing maxmind country DB: %s\n", err.Error())
}

err = s.initMaxmindASNDB()
if err != nil {
// Continue on maxmind error - optional/not critical functionality
log.Printf("Error initializing maxmind ASN DB: %s\n", err.Error())
}
}

// initMaxmindDB loads the maxmind DB if the file is present
// initMaxmindCountryDB loads the maxmind country DB if the file is present
// If the DB is not present, ip/country mapping is skipped
func (s *CrawlerServiceMetrics) initMaxmindDB() error {
func (s *CrawlerServiceMetrics) initMaxmindCountryDB() error {
var err error
dbPath := viper.GetString("maxmind-country-db-path")
if dbPath == "" {
return nil
}
s.maxMindCountryDB, err = maxminddb.Open(dbPath)
if err != nil {
return err
}

return nil
}

// initMaxmindASNDB loads the maxmind ASN DB if the file is present
// If the DB is not present, ip/ASN mapping is skipped
func (s *CrawlerServiceMetrics) initMaxmindASNDB() error {
var err error
dbPath := viper.GetString("maxmind-db-path")
dbPath := viper.GetString("maxmind-asn-db-path")
if dbPath == "" {
return nil
}
s.maxMindDB, err = maxminddb.Open(dbPath)
s.maxMindASNDB, err = maxminddb.Open(dbPath)
if err != nil {
return err
}
Expand Down Expand Up @@ -134,15 +158,16 @@ func (s *CrawlerServiceMetrics) GetPeerCounts(resp *types.WebsocketResponse) {
s.versionBuckets.WithLabelValues(version).Set(float64(count))
}

s.StartIPCountryMapping(peerCounts.TotalLast5Days)
s.StartIPMapping(peerCounts.TotalLast5Days)
}
}

// StartIPCountryMapping starts the process to fetch current IPs from the crawler
// and maps them to countries using maxmind
// StartIPMapping starts the process to fetch current IPs from the crawler
// when a response is received, the IPs are mapped to countries and/or ASNs using maxmind, if databases are provided
// Updates metrics value once all pages have been received
func (s *CrawlerServiceMetrics) StartIPCountryMapping(limit uint) {
if s.maxMindDB == nil {
func (s *CrawlerServiceMetrics) StartIPMapping(limit uint) {
// If we don't have either maxmind DB, bail now
if s.maxMindCountryDB == nil && s.maxMindASNDB == nil {
return
}

Expand All @@ -151,7 +176,7 @@ func (s *CrawlerServiceMetrics) StartIPCountryMapping(limit uint) {
return
}

log.Println("Requesting IP addresses from the past 5 days for country mapping...")
log.Println("Requesting IP addresses from the past 5 days for country and/or ASN mapping...")

ipsAfterTimestamp, _, err := s.metrics.httpClient.CrawlerService.GetIPsAfterTimestamp(&rpc.GetIPsAfterTimestampOptions{
After: time.Now().Add(-5 * time.Hour * 24).Unix(),
Expand All @@ -168,14 +193,21 @@ func (s *CrawlerServiceMetrics) StartIPCountryMapping(limit uint) {
// GetIPsAfterTimestamp processes a response of IPs seen since a timestamp
// Currently assumes all IPs will be in one response
func (s *CrawlerServiceMetrics) GetIPsAfterTimestamp(ips *rpc.GetIPsAfterTimestampResponse) {
if s.maxMindDB == nil {
// If we don't have either maxmind DB, bail now
if s.maxMindCountryDB == nil && s.maxMindASNDB == nil {
return
}

if ips == nil {
return
}

s.ProcessIPCountryMapping(ips)
s.ProcessIPASNMapping(ips)
}

// ProcessIPCountryMapping Processes the list of IPs to countries
func (s *CrawlerServiceMetrics) ProcessIPCountryMapping(ips *rpc.GetIPsAfterTimestampResponse) {
type countStruct struct {
ISOCode string
Name string
Expand Down Expand Up @@ -210,6 +242,73 @@ func (s *CrawlerServiceMetrics) GetIPsAfterTimestamp(ips *rpc.GetIPsAfterTimesta
}
}

// ProcessIPASNMapping Processes the list of IPs to ASNs
func (s *CrawlerServiceMetrics) ProcessIPASNMapping(ips *rpc.GetIPsAfterTimestampResponse) {
// Don't process if we can't store
if s.metrics.mysqlClient == nil {
return
}
type countStruct struct {
ASN uint32
Organization string
Count uint32
}
asnCounts := map[uint32]*countStruct{}

if ipresult, hasIPResult := ips.IPs.Get(); hasIPResult {
for _, ip := range ipresult {
asn, err := s.GetASNForIP(ip)
if err != nil {
continue
}

if _, ok := asnCounts[asn.AutonomousSystemNumber]; !ok {
asnCounts[asn.AutonomousSystemNumber] = &countStruct{
ASN: asn.AutonomousSystemNumber,
Organization: asn.AutonomousSystemOrganization,
Count: 0,
}
}

asnCounts[asn.AutonomousSystemNumber].Count++
}
}

err := s.metrics.DeleteASNRecords()
if err != nil {
log.Errorf("unable to delete old ASN records from the database: %s\n", err.Error())
return
}

batchSize := viper.GetUint32("mysql-batch-size")
var valueStrings []string
var valueArgs []interface{}

for i, asnData := range asnCounts {
if asnData.ASN == 0 {
continue
}

valueStrings = append(valueStrings, "(?, ?, ?)")
valueArgs = append(valueArgs, asnData.ASN, asnData.Organization, asnData.Count)

// Execute the batch insert when reaching the batch size or the end of the slice
if (i+1)%batchSize == 0 || i+1 == uint32(len(asnCounts)) {
_, err := s.metrics.mysqlClient.Exec(
fmt.Sprintf("INSERT INTO asn(asn, organization, count) VALUES %s", strings.Join(valueStrings, ",")),
valueArgs...)

if err != nil {
log.Errorf("error inserting ASN record to mysql for asn:%d count:%d error: %s\n", asnData.ASN, asnData.Count, err.Error())
}

// Reset the slices for the next batch
valueStrings = []string{}
valueArgs = []interface{}{}
}
}
}

// CountryRecord record of a country from maxmind
type CountryRecord struct {
Country Country `maxminddb:"country"`
Expand All @@ -223,15 +322,39 @@ type Country struct {

// GetCountryForIP Gets country data for an ip address
func (s *CrawlerServiceMetrics) GetCountryForIP(ipStr string) (*CountryRecord, error) {
if s.maxMindDB == nil {
return nil, fmt.Errorf("maxmind not initialized")
if s.maxMindCountryDB == nil {
return nil, fmt.Errorf("maxmind country DB not initialized")
}

ip := net.ParseIP(ipStr)

record := &CountryRecord{}

err := s.maxMindDB.Lookup(ip, record)
err := s.maxMindCountryDB.Lookup(ip, record)
if err != nil {
return nil, err
}

return record, nil
}

// ASNRecord record of a country from maxmind
type ASNRecord struct {
AutonomousSystemNumber uint32 `maxminddb:"autonomous_system_number"`
AutonomousSystemOrganization string `maxminddb:"autonomous_system_organization"`
}

// GetASNForIP Gets ASN data for an ip address
func (s *CrawlerServiceMetrics) GetASNForIP(ipStr string) (*ASNRecord, error) {
if s.maxMindASNDB == nil {
return nil, fmt.Errorf("maxmind ASN DB not initialized")
}

ip := net.ParseIP(ipStr)

record := &ASNRecord{}

err := s.maxMindASNDB.Lookup(ip, &record)
if err != nil {
return nil, err
}
Expand Down
33 changes: 33 additions & 0 deletions internal/metrics/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package metrics

// initTables ensures that the tables required exist and have the correct columns present
func (m *Metrics) initTables() error {
if m.mysqlClient == nil {
return nil
}
query := "CREATE TABLE IF NOT EXISTS `asn` (" +
" `asn` int unsigned NOT NULL," +
" `organization` VARCHAR(255) NOT NULL," +
" `count` int unsigned NOT NULL," +
"UNIQUE KEY `asn-unique` (`asn`)" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;"

result, err := m.mysqlClient.Query(query)
if err != nil {
return err
}
return result.Close()
}

// DeleteASNRecords deletes all records from the asn table in the database
func (m *Metrics) DeleteASNRecords() error {
if m.mysqlClient == nil {
return nil
}
query := "DELETE from asn;"
result, err := m.mysqlClient.Query(query)
if err != nil {
return err
}
return result.Close()
}
Loading
Loading