Skip to content

Commit

Permalink
ASN Support (#22)
Browse files Browse the repository at this point in the history
* Update readme for country + ASN db support. Change flag to include country in the name

* Support maxmind ASN DB

* Update readme with ASN/MySQL Note

* Store ASN data to mysql, if enabled
  • Loading branch information
cmmarslender authored Mar 6, 2024
1 parent 72ca948 commit d1dcd7f
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 23 deletions.
28 changes: 25 additions & 3 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,31 +31,53 @@ func init() {
var (
hostname string
metricsPort int
maxmindDBPath string
maxmindCountryDBPath string
maxmindASNDBPath string
logLevel string
requestTimeout time.Duration
disableCentralHarvesterCollection bool
logBlockTimes bool

mysqlHost string
mysqlPort uint16
mysqlUser string
mysqlPass string
mysqlDBName string
mysqlBatchSize uint32
)

cobra.OnInitialize(initConfig)
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is $HOME/.chia-exporter.yaml)")

rootCmd.PersistentFlags().StringVar(&hostname, "hostname", "localhost", "The hostname to connect to")
rootCmd.PersistentFlags().IntVar(&metricsPort, "metrics-port", 9914, "The port the metrics server binds to")
rootCmd.PersistentFlags().StringVar(&maxmindDBPath, "maxmind-db-path", "", "Path to the maxmind database file")
rootCmd.PersistentFlags().StringVar(&maxmindCountryDBPath, "maxmind-country-db-path", "", "Path to the maxmind country database file")
rootCmd.PersistentFlags().StringVar(&maxmindASNDBPath, "maxmind-asn-db-path", "", "Path to the maxmind ASN database file")
rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "info", "How verbose the logs should be. panic, fatal, error, warn, info, debug, trace")
rootCmd.PersistentFlags().DurationVar(&requestTimeout, "rpc-timeout", 10*time.Second, "How long RPC requests will wait before timing out")
rootCmd.PersistentFlags().BoolVar(&disableCentralHarvesterCollection, "disable-central-harvester-collection", false, "Disables collection of harvester information via the farmer. Useful for very large farms where this request is very expensive, or cases where chia-exporter is already installed on all harvesters")
rootCmd.PersistentFlags().BoolVar(&logBlockTimes, "log-block-times", false, "Enables logging of block (pre)validation times to log files.")
rootCmd.PersistentFlags().StringVar(&mysqlHost, "mysql-host", "127.0.0.1", "MySQL host for metrics that get stored to a DB")
rootCmd.PersistentFlags().Uint16Var(&mysqlPort, "mysql-port", 3306, "Port of the MySQL database")
rootCmd.PersistentFlags().StringVar(&mysqlUser, "mysql-user", "root", "The username for the MySQL Database")
rootCmd.PersistentFlags().StringVar(&mysqlPass, "mysql-password", "password", "The password for the MySQL Database")
rootCmd.PersistentFlags().StringVar(&mysqlDBName, "mysql-db-name", "chia-exporter", "The database in MySQL to use for metrics")
rootCmd.PersistentFlags().Uint32Var(&mysqlBatchSize, "mysql-batch-size", 250, "How many records will be batched into a single insert to MySQL")

cobra.CheckErr(viper.BindPFlag("hostname", rootCmd.PersistentFlags().Lookup("hostname")))
cobra.CheckErr(viper.BindPFlag("metrics-port", rootCmd.PersistentFlags().Lookup("metrics-port")))
cobra.CheckErr(viper.BindPFlag("maxmind-db-path", rootCmd.PersistentFlags().Lookup("maxmind-db-path")))
cobra.CheckErr(viper.BindPFlag("maxmind-country-db-path", rootCmd.PersistentFlags().Lookup("maxmind-country-db-path")))
cobra.CheckErr(viper.BindPFlag("maxmind-asn-db-path", rootCmd.PersistentFlags().Lookup("maxmind-asn-db-path")))
cobra.CheckErr(viper.BindPFlag("log-level", rootCmd.PersistentFlags().Lookup("log-level")))
cobra.CheckErr(viper.BindPFlag("rpc-timeout", rootCmd.PersistentFlags().Lookup("rpc-timeout")))
cobra.CheckErr(viper.BindPFlag("disable-central-harvester-collection", rootCmd.PersistentFlags().Lookup("disable-central-harvester-collection")))
cobra.CheckErr(viper.BindPFlag("log-block-times", rootCmd.PersistentFlags().Lookup("log-block-times")))
cobra.CheckErr(viper.BindPFlag("mysql-host", rootCmd.PersistentFlags().Lookup("mysql-host")))
cobra.CheckErr(viper.BindPFlag("mysql-port", rootCmd.PersistentFlags().Lookup("mysql-port")))
cobra.CheckErr(viper.BindPFlag("mysql-user", rootCmd.PersistentFlags().Lookup("mysql-user")))
cobra.CheckErr(viper.BindPFlag("mysql-password", rootCmd.PersistentFlags().Lookup("mysql-password")))
cobra.CheckErr(viper.BindPFlag("mysql-db-name", rootCmd.PersistentFlags().Lookup("mysql-db-name")))
cobra.CheckErr(viper.BindPFlag("mysql-batch-size", rootCmd.PersistentFlags().Lookup("mysql-batch-size")))
}

// initConfig reads in config file and ENV variables if set.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.21
require (
github.com/chia-network/go-chia-libs v0.5.3
github.com/chia-network/go-modules v0.0.4
github.com/go-sql-driver/mysql v1.7.1
github.com/oschwald/maxminddb-golang v1.12.0
github.com/prometheus/client_golang v1.19.0
github.com/sirupsen/logrus v1.9.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHk
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
Expand Down
157 changes: 140 additions & 17 deletions internal/metrics/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"net"
"strings"
"time"

log "github.com/sirupsen/logrus"
Expand All @@ -28,7 +29,8 @@ type CrawlerServiceMetrics struct {
metrics *Metrics

// Interfaces with Maxmind
maxMindDB *maxminddb.Reader
maxMindCountryDB *maxminddb.Reader
maxMindASNDB *maxminddb.Reader

// Crawler Metrics
totalNodes5Days *wrappedPrometheus.LazyGauge
Expand All @@ -55,22 +57,44 @@ func (s *CrawlerServiceMetrics) InitMetrics() {
// Debug Metric
s.debug = s.metrics.newGaugeVec(chiaServiceCrawler, "debug_metrics", "random debugging metrics distinguished by labels", []string{"key"})

err := s.initMaxmindDB()
err := s.initMaxmindCountryDB()
if err != nil {
// Continue on maxmind error - optional/not critical functionality
log.Errorf("Error initializing maxmind DB: %s\n", err.Error())
log.Printf("Error initializing maxmind country DB: %s\n", err.Error())
}

err = s.initMaxmindASNDB()
if err != nil {
// Continue on maxmind error - optional/not critical functionality
log.Printf("Error initializing maxmind ASN DB: %s\n", err.Error())
}
}

// initMaxmindDB loads the maxmind DB if the file is present
// initMaxmindCountryDB loads the maxmind country DB if the file is present
// If the DB is not present, ip/country mapping is skipped
func (s *CrawlerServiceMetrics) initMaxmindDB() error {
func (s *CrawlerServiceMetrics) initMaxmindCountryDB() error {
var err error
dbPath := viper.GetString("maxmind-country-db-path")
if dbPath == "" {
return nil
}
s.maxMindCountryDB, err = maxminddb.Open(dbPath)
if err != nil {
return err
}

return nil
}

// initMaxmindASNDB loads the maxmind ASN DB if the file is present
// If the DB is not present, ip/ASN mapping is skipped
func (s *CrawlerServiceMetrics) initMaxmindASNDB() error {
var err error
dbPath := viper.GetString("maxmind-db-path")
dbPath := viper.GetString("maxmind-asn-db-path")
if dbPath == "" {
return nil
}
s.maxMindDB, err = maxminddb.Open(dbPath)
s.maxMindASNDB, err = maxminddb.Open(dbPath)
if err != nil {
return err
}
Expand Down Expand Up @@ -134,15 +158,16 @@ func (s *CrawlerServiceMetrics) GetPeerCounts(resp *types.WebsocketResponse) {
s.versionBuckets.WithLabelValues(version).Set(float64(count))
}

s.StartIPCountryMapping(peerCounts.TotalLast5Days)
s.StartIPMapping(peerCounts.TotalLast5Days)
}
}

// StartIPCountryMapping starts the process to fetch current IPs from the crawler
// and maps them to countries using maxmind
// StartIPMapping starts the process to fetch current IPs from the crawler
// when a response is received, the IPs are mapped to countries and/or ASNs using maxmind, if databases are provided
// Updates metrics value once all pages have been received
func (s *CrawlerServiceMetrics) StartIPCountryMapping(limit uint) {
if s.maxMindDB == nil {
func (s *CrawlerServiceMetrics) StartIPMapping(limit uint) {
// If we don't have either maxmind DB, bail now
if s.maxMindCountryDB == nil && s.maxMindASNDB == nil {
return
}

Expand All @@ -151,7 +176,7 @@ func (s *CrawlerServiceMetrics) StartIPCountryMapping(limit uint) {
return
}

log.Println("Requesting IP addresses from the past 5 days for country mapping...")
log.Println("Requesting IP addresses from the past 5 days for country and/or ASN mapping...")

ipsAfterTimestamp, _, err := s.metrics.httpClient.CrawlerService.GetIPsAfterTimestamp(&rpc.GetIPsAfterTimestampOptions{
After: time.Now().Add(-5 * time.Hour * 24).Unix(),
Expand All @@ -168,14 +193,21 @@ func (s *CrawlerServiceMetrics) StartIPCountryMapping(limit uint) {
// GetIPsAfterTimestamp processes a response of IPs seen since a timestamp
// Currently assumes all IPs will be in one response
func (s *CrawlerServiceMetrics) GetIPsAfterTimestamp(ips *rpc.GetIPsAfterTimestampResponse) {
if s.maxMindDB == nil {
// If we don't have either maxmind DB, bail now
if s.maxMindCountryDB == nil && s.maxMindASNDB == nil {
return
}

if ips == nil {
return
}

s.ProcessIPCountryMapping(ips)
s.ProcessIPASNMapping(ips)
}

// ProcessIPCountryMapping Processes the list of IPs to countries
func (s *CrawlerServiceMetrics) ProcessIPCountryMapping(ips *rpc.GetIPsAfterTimestampResponse) {
type countStruct struct {
ISOCode string
Name string
Expand Down Expand Up @@ -210,6 +242,73 @@ func (s *CrawlerServiceMetrics) GetIPsAfterTimestamp(ips *rpc.GetIPsAfterTimesta
}
}

// ProcessIPASNMapping Processes the list of IPs to ASNs
func (s *CrawlerServiceMetrics) ProcessIPASNMapping(ips *rpc.GetIPsAfterTimestampResponse) {
// Don't process if we can't store
if s.metrics.mysqlClient == nil {
return
}
type countStruct struct {
ASN uint32
Organization string
Count uint32
}
asnCounts := map[uint32]*countStruct{}

if ipresult, hasIPResult := ips.IPs.Get(); hasIPResult {
for _, ip := range ipresult {
asn, err := s.GetASNForIP(ip)
if err != nil {
continue
}

if _, ok := asnCounts[asn.AutonomousSystemNumber]; !ok {
asnCounts[asn.AutonomousSystemNumber] = &countStruct{
ASN: asn.AutonomousSystemNumber,
Organization: asn.AutonomousSystemOrganization,
Count: 0,
}
}

asnCounts[asn.AutonomousSystemNumber].Count++
}
}

err := s.metrics.DeleteASNRecords()
if err != nil {
log.Errorf("unable to delete old ASN records from the database: %s\n", err.Error())
return
}

batchSize := viper.GetUint32("mysql-batch-size")
var valueStrings []string
var valueArgs []interface{}

for i, asnData := range asnCounts {
if asnData.ASN == 0 {
continue
}

valueStrings = append(valueStrings, "(?, ?, ?)")
valueArgs = append(valueArgs, asnData.ASN, asnData.Organization, asnData.Count)

// Execute the batch insert when reaching the batch size or the end of the slice
if (i+1)%batchSize == 0 || i+1 == uint32(len(asnCounts)) {
_, err := s.metrics.mysqlClient.Exec(
fmt.Sprintf("INSERT INTO asn(asn, organization, count) VALUES %s", strings.Join(valueStrings, ",")),
valueArgs...)

if err != nil {
log.Errorf("error inserting ASN record to mysql for asn:%d count:%d error: %s\n", asnData.ASN, asnData.Count, err.Error())
}

// Reset the slices for the next batch
valueStrings = []string{}
valueArgs = []interface{}{}
}
}
}

// CountryRecord record of a country from maxmind
type CountryRecord struct {
Country Country `maxminddb:"country"`
Expand All @@ -223,15 +322,39 @@ type Country struct {

// GetCountryForIP Gets country data for an ip address
func (s *CrawlerServiceMetrics) GetCountryForIP(ipStr string) (*CountryRecord, error) {
if s.maxMindDB == nil {
return nil, fmt.Errorf("maxmind not initialized")
if s.maxMindCountryDB == nil {
return nil, fmt.Errorf("maxmind country DB not initialized")
}

ip := net.ParseIP(ipStr)

record := &CountryRecord{}

err := s.maxMindDB.Lookup(ip, record)
err := s.maxMindCountryDB.Lookup(ip, record)
if err != nil {
return nil, err
}

return record, nil
}

// ASNRecord record of a country from maxmind
type ASNRecord struct {
AutonomousSystemNumber uint32 `maxminddb:"autonomous_system_number"`
AutonomousSystemOrganization string `maxminddb:"autonomous_system_organization"`
}

// GetASNForIP Gets ASN data for an ip address
func (s *CrawlerServiceMetrics) GetASNForIP(ipStr string) (*ASNRecord, error) {
if s.maxMindASNDB == nil {
return nil, fmt.Errorf("maxmind ASN DB not initialized")
}

ip := net.ParseIP(ipStr)

record := &ASNRecord{}

err := s.maxMindASNDB.Lookup(ip, &record)
if err != nil {
return nil, err
}
Expand Down
33 changes: 33 additions & 0 deletions internal/metrics/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package metrics

// initTables ensures that the tables required exist and have the correct columns present
func (m *Metrics) initTables() error {
if m.mysqlClient == nil {
return nil
}
query := "CREATE TABLE IF NOT EXISTS `asn` (" +
" `asn` int unsigned NOT NULL," +
" `organization` VARCHAR(255) NOT NULL," +
" `count` int unsigned NOT NULL," +
"UNIQUE KEY `asn-unique` (`asn`)" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;"

result, err := m.mysqlClient.Query(query)
if err != nil {
return err
}
return result.Close()
}

// DeleteASNRecords deletes all records from the asn table in the database
func (m *Metrics) DeleteASNRecords() error {
if m.mysqlClient == nil {
return nil
}
query := "DELETE from asn;"
result, err := m.mysqlClient.Query(query)
if err != nil {
return err
}
return result.Close()
}
Loading

0 comments on commit d1dcd7f

Please sign in to comment.