Skip to content

Commit

Permalink
Support maxmind ASN DB
Browse files Browse the repository at this point in the history
  • Loading branch information
cmmarslender committed Mar 6, 2024
1 parent 0650f45 commit 59a2bfa
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 17 deletions.
7 changes: 5 additions & 2 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ func init() {
var (
hostname string
metricsPort int
maxmindDBPath string
maxmindCountryDBPath string
maxmindASNDBPath string
logLevel string
requestTimeout time.Duration
disableCentralHarvesterCollection bool
Expand All @@ -43,7 +44,8 @@ func init() {

rootCmd.PersistentFlags().StringVar(&hostname, "hostname", "localhost", "The hostname to connect to")
rootCmd.PersistentFlags().IntVar(&metricsPort, "metrics-port", 9914, "The port the metrics server binds to")
rootCmd.PersistentFlags().StringVar(&maxmindDBPath, "maxmind-country-db-path", "", "Path to the maxmind country database file")
rootCmd.PersistentFlags().StringVar(&maxmindCountryDBPath, "maxmind-country-db-path", "", "Path to the maxmind country database file")
rootCmd.PersistentFlags().StringVar(&maxmindASNDBPath, "maxmind-asn-db-path", "", "Path to the maxmind ASN database file")
rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "info", "How verbose the logs should be. panic, fatal, error, warn, info, debug, trace")
rootCmd.PersistentFlags().DurationVar(&requestTimeout, "rpc-timeout", 10*time.Second, "How long RPC requests will wait before timing out")
rootCmd.PersistentFlags().BoolVar(&disableCentralHarvesterCollection, "disable-central-harvester-collection", false, "Disables collection of harvester information via the farmer. Useful for very large farms where this request is very expensive, or cases where chia-exporter is already installed on all harvesters")
Expand All @@ -52,6 +54,7 @@ func init() {
cobra.CheckErr(viper.BindPFlag("hostname", rootCmd.PersistentFlags().Lookup("hostname")))
cobra.CheckErr(viper.BindPFlag("metrics-port", rootCmd.PersistentFlags().Lookup("metrics-port")))
cobra.CheckErr(viper.BindPFlag("maxmind-country-db-path", rootCmd.PersistentFlags().Lookup("maxmind-country-db-path")))
cobra.CheckErr(viper.BindPFlag("maxmind-asn-db-path", rootCmd.PersistentFlags().Lookup("maxmind-asn-db-path")))
cobra.CheckErr(viper.BindPFlag("log-level", rootCmd.PersistentFlags().Lookup("log-level")))
cobra.CheckErr(viper.BindPFlag("rpc-timeout", rootCmd.PersistentFlags().Lookup("rpc-timeout")))
cobra.CheckErr(viper.BindPFlag("disable-central-harvester-collection", rootCmd.PersistentFlags().Lookup("disable-central-harvester-collection")))
Expand Down
120 changes: 105 additions & 15 deletions internal/metrics/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ type CrawlerServiceMetrics struct {
metrics *Metrics

// Interfaces with Maxmind
maxMindDB *maxminddb.Reader
maxMindCountryDB *maxminddb.Reader
maxMindASNDB *maxminddb.Reader

// Crawler Metrics
totalNodes5Days *wrappedPrometheus.LazyGauge
Expand All @@ -37,6 +38,7 @@ type CrawlerServiceMetrics struct {
ipv6Nodes5Days *wrappedPrometheus.LazyGauge
versionBuckets *prometheus.GaugeVec
countryNodeCountBuckets *prometheus.GaugeVec
asnNodeCountBuckets *prometheus.GaugeVec

// Debug Metric
debug *prometheus.GaugeVec
Expand All @@ -51,26 +53,49 @@ func (s *CrawlerServiceMetrics) InitMetrics() {
s.ipv6Nodes5Days = s.metrics.newGauge(chiaServiceCrawler, "ipv6_nodes_5_days", "Total number of IPv6 nodes that have been gossiped around the network with a timestamp in the last 5 days. The crawler did not necessarily connect to all of these peers itself.")
s.versionBuckets = s.metrics.newGaugeVec(chiaServiceCrawler, "peer_version", "Number of peers for each version. Only peers the crawler was able to connect to are included here.", []string{"version"})
s.countryNodeCountBuckets = s.metrics.newGaugeVec(chiaServiceCrawler, "country_node_count", "Number of peers gossiped in the last 5 days from each country.", []string{"country", "country_display"})
s.asnNodeCountBuckets = s.metrics.newGaugeVec(chiaServiceCrawler, "asn_node_count", "Number of peers gossiped in the last 5 days from each asn.", []string{"asn", "organization"})

// Debug Metric
s.debug = s.metrics.newGaugeVec(chiaServiceCrawler, "debug_metrics", "random debugging metrics distinguished by labels", []string{"key"})

err := s.initMaxmindDB()
err := s.initMaxmindCountryDB()
if err != nil {
// Continue on maxmind error - optional/not critical functionality
log.Errorf("Error initializing maxmind DB: %s\n", err.Error())
log.Printf("Error initializing maxmind country DB: %s\n", err.Error())
}

err = s.initMaxmindASNDB()
if err != nil {
// Continue on maxmind error - optional/not critical functionality
log.Printf("Error initializing maxmind ASN DB: %s\n", err.Error())
}
}

// initMaxmindDB loads the maxmind DB if the file is present
// initMaxmindCountryDB loads the maxmind country DB if the file is present
// If the DB is not present, ip/country mapping is skipped
func (s *CrawlerServiceMetrics) initMaxmindDB() error {
func (s *CrawlerServiceMetrics) initMaxmindCountryDB() error {
var err error
dbPath := viper.GetString("maxmind-country-db-path")
if dbPath == "" {
return nil
}
s.maxMindDB, err = maxminddb.Open(dbPath)
s.maxMindCountryDB, err = maxminddb.Open(dbPath)
if err != nil {
return err
}

return nil
}

// initMaxmindASNDB loads the maxmind ASN DB if the file is present
// If the DB is not present, ip/ASN mapping is skipped
func (s *CrawlerServiceMetrics) initMaxmindASNDB() error {
var err error
dbPath := viper.GetString("maxmind-asn-db-path")
if dbPath == "" {
return nil
}
s.maxMindASNDB, err = maxminddb.Open(dbPath)
if err != nil {
return err
}
Expand Down Expand Up @@ -134,15 +159,16 @@ func (s *CrawlerServiceMetrics) GetPeerCounts(resp *types.WebsocketResponse) {
s.versionBuckets.WithLabelValues(version).Set(float64(count))
}

s.StartIPCountryMapping(peerCounts.TotalLast5Days)
s.StartIPMapping(peerCounts.TotalLast5Days)
}
}

// StartIPCountryMapping starts the process to fetch current IPs from the crawler
// and maps them to countries using maxmind
// StartIPMapping starts the process to fetch current IPs from the crawler
// when a response is received, the IPs are mapped to countries and/or ASNs using maxmind, if databases are provided
// Updates metrics value once all pages have been received
func (s *CrawlerServiceMetrics) StartIPCountryMapping(limit uint) {
if s.maxMindDB == nil {
func (s *CrawlerServiceMetrics) StartIPMapping(limit uint) {
// If we don't have either maxmind DB, bail now
if s.maxMindCountryDB == nil && s.maxMindASNDB == nil {
return
}

Expand All @@ -168,14 +194,21 @@ func (s *CrawlerServiceMetrics) StartIPCountryMapping(limit uint) {
// GetIPsAfterTimestamp processes a response of IPs seen since a timestamp
// Currently assumes all IPs will be in one response
func (s *CrawlerServiceMetrics) GetIPsAfterTimestamp(ips *rpc.GetIPsAfterTimestampResponse) {
if s.maxMindDB == nil {
// If we don't have either maxmind DB, bail now
if s.maxMindCountryDB == nil && s.maxMindASNDB == nil {
return
}

if ips == nil {
return
}

s.ProcessIPCountryMapping(ips)
s.ProcessIPASNMapping(ips)
}

// ProcessIPCountryMapping Processes the list of IPs to countries
func (s *CrawlerServiceMetrics) ProcessIPCountryMapping(ips *rpc.GetIPsAfterTimestampResponse) {
type countStruct struct {
ISOCode string
Name string
Expand Down Expand Up @@ -210,6 +243,39 @@ func (s *CrawlerServiceMetrics) GetIPsAfterTimestamp(ips *rpc.GetIPsAfterTimesta
}
}

// ProcessIPASNMapping Processes the list of IPs to ASNs
func (s *CrawlerServiceMetrics) ProcessIPASNMapping(ips *rpc.GetIPsAfterTimestampResponse) {
type countStruct struct {
ASN int
Organization string
Count float64
}
asnCounts := map[int]*countStruct{}

if ipresult, hasIPResult := ips.IPs.Get(); hasIPResult {
for _, ip := range ipresult {
asn, err := s.GetASNForIP(ip)
if err != nil {
continue
}

if _, ok := asnCounts[asn.AutonomousSystemNumber]; !ok {
asnCounts[asn.AutonomousSystemNumber] = &countStruct{
ASN: asn.AutonomousSystemNumber,
Organization: asn.AutonomousSystemOrganization,
Count: 0,
}
}

asnCounts[asn.AutonomousSystemNumber].Count++
}
}

for _, asnData := range asnCounts {
s.asnNodeCountBuckets.WithLabelValues(fmt.Sprintf("%d", asnData.ASN), asnData.Organization).Set(asnData.Count)
}
}

// CountryRecord record of a country from maxmind
type CountryRecord struct {
Country Country `maxminddb:"country"`
Expand All @@ -223,15 +289,39 @@ type Country struct {

// GetCountryForIP Gets country data for an ip address
func (s *CrawlerServiceMetrics) GetCountryForIP(ipStr string) (*CountryRecord, error) {
if s.maxMindDB == nil {
return nil, fmt.Errorf("maxmind not initialized")
if s.maxMindCountryDB == nil {
return nil, fmt.Errorf("maxmind country DB not initialized")
}

ip := net.ParseIP(ipStr)

record := &CountryRecord{}

err := s.maxMindDB.Lookup(ip, record)
err := s.maxMindCountryDB.Lookup(ip, record)
if err != nil {
return nil, err
}

return record, nil
}

// ASNRecord record of a country from maxmind
type ASNRecord struct {
AutonomousSystemNumber int `maxminddb:"autonomous_system_number"`
AutonomousSystemOrganization string `maxminddb:"autonomous_system_organization"`
}

// GetASNForIP Gets ASN data for an ip address
func (s *CrawlerServiceMetrics) GetASNForIP(ipStr string) (*ASNRecord, error) {
if s.maxMindASNDB == nil {
return nil, fmt.Errorf("maxmind ASN DB not initialized")
}

ip := net.ParseIP(ipStr)

record := &ASNRecord{}

err := s.maxMindASNDB.Lookup(ip, &record)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 59a2bfa

Please sign in to comment.