diff --git a/cmd/autothrottle/main.go b/cmd/autothrottle/main.go index 7399e807..bd5e28e2 100644 --- a/cmd/autothrottle/main.go +++ b/cmd/autothrottle/main.go @@ -25,6 +25,7 @@ var ( AppKey string NetworkTXQuery string BrokerIDTag string + InstanceTypeTag string MetricsWindow int ZKAddr string ZKPrefix string @@ -51,6 +52,7 @@ func init() { flag.StringVar(&Config.AppKey, "app-key", "", "Datadog app key") flag.StringVar(&Config.NetworkTXQuery, "net-tx-query", "avg:system.net.bytes_sent{service:kafka} by {host}", "Datadog query for broker outbound bandwidth by host") flag.StringVar(&Config.BrokerIDTag, "broker-id-tag", "broker_id", "Datadog host tag for broker ID") + flag.StringVar(&Config.InstanceTypeTag, "instance-type-tag", "instance-type", "Datadog instance-type tag for broker") flag.IntVar(&Config.MetricsWindow, "metrics-window", 120, "Time span of metrics required (seconds)") flag.StringVar(&Config.ZKAddr, "zk-addr", "localhost:2181", "ZooKeeper connect string (for broker metadata or rebuild-topic lookups)") flag.StringVar(&Config.ZKPrefix, "zk-prefix", "", "ZooKeeper namespace prefix") @@ -106,11 +108,12 @@ func main() { // Init a Kafka metrics fetcher. km, err := datadog.NewHandler(&datadog.Config{ - APIKey: Config.APIKey, - AppKey: Config.AppKey, - NetworkTXQuery: Config.NetworkTXQuery, - BrokerIDTag: Config.BrokerIDTag, - MetricsWindow: Config.MetricsWindow, + APIKey: Config.APIKey, + AppKey: Config.AppKey, + NetworkTXQuery: Config.NetworkTXQuery, + BrokerIDTag: Config.BrokerIDTag, + InstanceTypeTag: Config.InstanceTypeTag, + MetricsWindow: Config.MetricsWindow, }) if err != nil { log.Fatal(err) diff --git a/kafkametrics/datadog/datadog.go b/kafkametrics/datadog/datadog.go index 95ed21d8..002c0b57 100644 --- a/kafkametrics/datadog/datadog.go +++ b/kafkametrics/datadog/datadog.go @@ -27,6 +27,9 @@ type Config struct { // BrokerIDTag is the host tag name // for Kafka broker IDs. BrokerIDTag string + // InstanceTypeTag is the instance type tag name + // for Kafka broker Instance. + InstanceTypeTag string // MetricsWindow specifies the window size of // timeseries data to evaluate in seconds. // All values for the window are averaged. @@ -34,13 +37,14 @@ type Config struct { } type ddHandler struct { - c *dd.Client - netTXQuery string - brokerIDTag string - metricsWindow int - tagCache map[string][]string - keysRegex *regexp.Regexp - redactionSub []byte + c *dd.Client + netTXQuery string + brokerIDTag string + instanceTypeTag string + metricsWindow int + tagCache map[string][]string + keysRegex *regexp.Regexp + redactionSub []byte } // NewHandler takes a *Config and @@ -57,12 +61,13 @@ func NewHandler(c *Config) (kafkametrics.Handler, error) { keysRegex := regexp.MustCompile(fmt.Sprintf("%s|%s", c.APIKey, c.AppKey)) h := &ddHandler{ - netTXQuery: createNetTXQuery(c), - metricsWindow: c.MetricsWindow, - brokerIDTag: c.BrokerIDTag, - tagCache: make(map[string][]string), - keysRegex: keysRegex, - redactionSub: []byte("xxx"), + netTXQuery: createNetTXQuery(c), + metricsWindow: c.MetricsWindow, + brokerIDTag: c.BrokerIDTag, + instanceTypeTag: c.InstanceTypeTag, + tagCache: make(map[string][]string), + keysRegex: keysRegex, + redactionSub: []byte("xxx"), } client := dd.NewClient(c.APIKey, c.AppKey) diff --git a/kafkametrics/datadog/datadog_test.go b/kafkametrics/datadog/datadog_test.go index d5e1637d..bf0921b4 100644 --- a/kafkametrics/datadog/datadog_test.go +++ b/kafkametrics/datadog/datadog_test.go @@ -98,7 +98,7 @@ func TestPopulateFromTagMap(t *testing.T) { // Test with complete input. tagMap := mockTagMap() - err := populateFromTagMap(b, map[string][]string{}, tagMap, "broker_id") + err := populateFromTagMap(b, map[string][]string{}, tagMap, "broker_id", "instance-type") if err != nil { t.Errorf("Unexpected error: %s\n", err) } @@ -119,7 +119,7 @@ func TestPopulateFromTagMap(t *testing.T) { // Test with incomplete input. tagMap[rndBroker] = tagMap[rndBroker][1:] - err = populateFromTagMap(b, map[string][]string{}, tagMap, "broker_id") + err = populateFromTagMap(b, map[string][]string{}, tagMap, "broker_id", "instance-type") if err == nil { t.Errorf("Expected error, got nil") } diff --git a/kafkametrics/datadog/helpers.go b/kafkametrics/datadog/helpers.go index b3e1edc5..64159d15 100644 --- a/kafkametrics/datadog/helpers.go +++ b/kafkametrics/datadog/helpers.go @@ -65,7 +65,7 @@ func (h *ddHandler) brokerMetricsFromList(l []*kafkametrics.Broker) (kafkametric } brokers := kafkametrics.BrokerMetrics{} - errs = populateFromTagMap(brokers, h.tagCache, tags, h.brokerIDTag) + errs = populateFromTagMap(brokers, h.tagCache, tags, h.brokerIDTag, h.instanceTypeTag) if errs != nil { errs = append(errors, errs...) } @@ -112,7 +112,7 @@ func (h *ddHandler) getHostTagMap(l []*kafkametrics.Broker) (map[*kafkametrics.B // to []string unparsed host tag key:value pairs, and a broker ID tag key // populates the kafkametrics.BrokerMetrics with tags of interest. // An error describing any missing tags is returned. -func populateFromTagMap(bm kafkametrics.BrokerMetrics, c map[string][]string, t map[*kafkametrics.Broker][]string, btag string) []error { +func populateFromTagMap(bm kafkametrics.BrokerMetrics, c map[string][]string, t map[*kafkametrics.Broker][]string, btag string, itag string) []error { var missingTags bytes.Buffer for b, ht := range t { @@ -133,7 +133,7 @@ func populateFromTagMap(bm kafkametrics.BrokerMetrics, c map[string][]string, t } // Get instance type. - it = valFromTags(ht, "instance-type") + it = valFromTags(ht, itag) if it != "" { // Cache this broker's tags. In case additional tags are populated // in the future, we should only cache brokers that have @@ -153,6 +153,7 @@ func populateFromTagMap(bm kafkametrics.BrokerMetrics, c map[string][]string, t b.ID = id b.InstanceType = it bm[id] = b + } if missingTags.String() != "" {