diff --git a/cmd/autothrottle/README.md b/cmd/autothrottle/README.md index 7c054fc..2c1eb6f 100644 --- a/cmd/autothrottle/README.md +++ b/cmd/autothrottle/README.md @@ -5,9 +5,9 @@ Autothrottle does this by running a loop that discovers topics undergoing replic In contrast to where Kafka's out of the box tooling allows users to manually set a static, global inbound and outbound rate, autothrottle determines path-optimal rates on an individual broker basis. It does this by examining a graph representation of replication flows and individually calculating optimal transfer rates for inbound and outbound transfers. This means that if a single broker is near saturation, it doesn't need to slow down the recovery pace of the entire replication group. This allows every broker to run at target network utilization thresholds in any direction, minimizing recovery time and hotspots. -All throttle control logic is applied directly to the cluster through ZooKeeper; autothrottle natively mirrors Kafka's throttle control implementation rather than wrapping CLI tools. +Historically, all throttle control logic has been applied directly to the cluster through ZooKeeper; autothrottle natively ports Kafka's throttle control implementation and manages it directly through the cluster state metadata housed in ZooKeeper. For newer versions of Kafka, autothrottle now supports throttle management via dynamic configurations using the Kafka Admin API, along with KIP-455 compatible reassignment lookups. This feature is enabled with the `--kafka-native-mode` flag and marks the continued support for eventual removal of ZooKeeper as a Kafka dependency (KIP-500). -Finally, autothrottle was designed to work as a piggyback system that doesn't take ownership of your cluster. It can easily be overridden (through the admin API), stopped safely at any time, or even outright disabled. This allows users to quickly revert to using other tools if desired. 
+Finally, autothrottle was designed to work as a piggyback system that doesn't take ownership of your cluster. It can easily be overridden (through the admin API), stopped safely at any time, or outright disabled. This allows users to quickly revert to using other tools if desired. **Additional features**: - Configurable portion of free headroom available for use by replication (`--max-rate`) @@ -82,46 +82,54 @@ The variables in brackets are optional env var overrides. ``` Usage of autothrottle: - -api-key string - Datadog API key [AUTOTHROTTLE_API_KEY] - -api-listen string - Admin API listen address:port [AUTOTHROTTLE_API_LISTEN] (default "localhost:8080") - -app-key string - Datadog app key [AUTOTHROTTLE_APP_KEY] - -broker-id-tag string - Datadog host tag for broker ID [AUTOTHROTTLE_BROKER_ID_TAG] (default "broker_id") - -cap-map string - JSON map of instance types to network capacity in MB/s [AUTOTHROTTLE_CAP_MAP] - -change-threshold float - Required change in replication throttle to trigger an update (percent) [AUTOTHROTTLE_CHANGE_THRESHOLD] (default 10) - -cleanup-after int - Number of intervals after which to issue a global throttle unset if no replication is running [AUTOTHROTTLE_CLEANUP_AFTER] (default 60) - -dd-event-tags string - Comma-delimited list of Datadog event tags [AUTOTHROTTLE_DD_EVENT_TAGS] - -failure-threshold int - Number of iterations that throttle determinations can fail before reverting to the min-rate [AUTOTHROTTLE_FAILURE_THRESHOLD] (default 1) - -interval int - Autothrottle check interval (seconds) [AUTOTHROTTLE_INTERVAL] (default 180) - -max-rx-rate float - Maximum inbound replication throttle rate (as a percentage of available capacity) [AUTOTHROTTLE_MAX_RX_RATE] (default 90) - -max-tx-rate float - Maximum outbound replication throttle rate (as a percentage of available capacity) [AUTOTHROTTLE_MAX_TX_RATE] (default 90) - -metrics-window int - Time span of metrics required (seconds) [AUTOTHROTTLE_METRICS_WINDOW] (default 120) - 
-min-rate float - Minimum replication throttle rate (MB/s) [AUTOTHROTTLE_MIN_RATE] (default 10) - -net-rx-query string - Datadog query for broker inbound bandwidth by host [AUTOTHROTTLE_NET_RX_QUERY] (default "avg:system.net.bytes_rcvd{service:kafka} by {host}") - -net-tx-query string - Datadog query for broker outbound bandwidth by host [AUTOTHROTTLE_NET_TX_QUERY] (default "avg:system.net.bytes_sent{service:kafka} by {host}") - -version - version [AUTOTHROTTLE_VERSION] - -zk-addr string - ZooKeeper connect string (for broker metadata or rebuild-topic lookups) [AUTOTHROTTLE_ZK_ADDR] (default "localhost:2181") - -zk-config-prefix string - ZooKeeper prefix to store autothrottle configuration [AUTOTHROTTLE_ZK_CONFIG_PREFIX] (default "autothrottle") - -zk-prefix string - ZooKeeper namespace prefix [AUTOTHROTTLE_ZK_PREFIX] +-api-key string + Datadog API key [AUTOTHROTTLE_API_KEY] +-api-listen string + Admin API listen address:port [AUTOTHROTTLE_API_LISTEN] (default "localhost:8080") +-app-key string + Datadog app key [AUTOTHROTTLE_APP_KEY] +-bootstrap-servers string + Kafka bootstrap servers [AUTOTHROTTLE_BOOTSTRAP_SERVERS] (default "localhost:9092") +-broker-id-tag string + Datadog host tag for broker ID [AUTOTHROTTLE_BROKER_ID_TAG] (default "broker_id") +-cap-map string + JSON map of instance types to network capacity in MB/s [AUTOTHROTTLE_CAP_MAP] +-change-threshold float + Required change in replication throttle to trigger an update (percent) [AUTOTHROTTLE_CHANGE_THRESHOLD] (default 10) +-cleanup-after int + Number of intervals after which to issue a global throttle unset if no replication is running [AUTOTHROTTLE_CLEANUP_AFTER] (default 60) +-dd-event-tags string + Comma-delimited list of Datadog event tags [AUTOTHROTTLE_DD_EVENT_TAGS] +-failure-threshold int + Number of iterations that throttle determinations can fail before reverting to the min-rate [AUTOTHROTTLE_FAILURE_THRESHOLD] (default 1) +-instance-type-tag string + Datadog tag for instance type 
[AUTOTHROTTLE_INSTANCE_TYPE_TAG] (default "instance-type") +-interval int + Autothrottle check interval (seconds) [AUTOTHROTTLE_INTERVAL] (default 180) +-kafka-api-request-timeout int + Kafka API request timeout (seconds) [AUTOTHROTTLE_KAFKA_API_REQUEST_TIMEOUT] (default 15) +-kafka-native-mode + Favor native Kafka RPCs over ZooKeeper metadata access [AUTOTHROTTLE_KAFKA_NATIVE_MODE] +-max-rx-rate float + Maximum inbound replication throttle rate (as a percentage of available capacity) [AUTOTHROTTLE_MAX_RX_RATE] (default 90) +-max-tx-rate float + Maximum outbound replication throttle rate (as a percentage of available capacity) [AUTOTHROTTLE_MAX_TX_RATE] (default 90) +-metrics-window int + Time span of metrics required (seconds) [AUTOTHROTTLE_METRICS_WINDOW] (default 120) +-min-rate float + Minimum replication throttle rate (MB/s) [AUTOTHROTTLE_MIN_RATE] (default 10) +-net-rx-query string + Datadog query for broker inbound bandwidth by host [AUTOTHROTTLE_NET_RX_QUERY] (default "avg:system.net.bytes_rcvd{service:kafka} by {host}") +-net-tx-query string + Datadog query for broker outbound bandwidth by host [AUTOTHROTTLE_NET_TX_QUERY] (default "avg:system.net.bytes_sent{service:kafka} by {host}") +-version + version [AUTOTHROTTLE_VERSION] +-zk-addr string + ZooKeeper connect string (for broker metadata or rebuild-topic lookups) [AUTOTHROTTLE_ZK_ADDR] (default "localhost:2181") +-zk-config-prefix string + ZooKeeper prefix to store autothrottle configuration [AUTOTHROTTLE_ZK_CONFIG_PREFIX] (default "autothrottle") +-zk-prefix string + ZooKeeper namespace prefix [AUTOTHROTTLE_ZK_PREFIX] ``` ## Detailed: Rate Calculations, Applying Throttles diff --git a/cmd/autothrottle/api_deprecated.go b/cmd/autothrottle/api_deprecated.go deleted file mode 100644 index f445fbb..0000000 --- a/cmd/autothrottle/api_deprecated.go +++ /dev/null @@ -1,120 +0,0 @@ -package main - -import ( - "fmt" - "io" - "net/http" - "strconv" - - "github.com/DataDog/kafka-kit/v3/kafkazk" -) - -var 
errDeprecated = "WARN: this route is deprecated - refer to documentation\n" - -func getThrottleDeprecated(w http.ResponseWriter, req *http.Request, zk kafkazk.Handler) { - logReq(req) - if req.Method != http.MethodGet { - w.WriteHeader(http.StatusMethodNotAllowed) - writeNLError(w, incorrectMethodError) - return - } - - r, err := fetchThrottleOverride(zk, overrideRateZnodePath) - if err != nil { - io.WriteString(w, err.Error()) - return - } - - io.WriteString(w, errDeprecated) - - switch r.Rate { - case 0: - io.WriteString(w, "no throttle override is set\n") - default: - resp := fmt.Sprintf("a throttle override is configured at %dMB/s, autoremove==%v\n", - r.Rate, r.AutoRemove) - io.WriteString(w, resp) - } -} - -func setThrottleDeprecated(w http.ResponseWriter, req *http.Request, zk kafkazk.Handler) { - logReq(req) - if req.Method != http.MethodPost { - w.WriteHeader(http.StatusMethodNotAllowed) - writeNLError(w, incorrectMethodError) - return - } - - // Get rate param. - - r := req.URL.Query().Get("rate") - var rate int - var err error - - rate, err = strconv.Atoi(r) - - io.WriteString(w, errDeprecated) - - switch { - case r == "": - io.WriteString(w, "rate param must be supplied\n") - return - case r == "0": - io.WriteString(w, "rate param must be >0\n") - return - case err != nil: - io.WriteString(w, "rate param must be supplied as an integer\n") - return - } - - // Get automatic rate removal param. - - c := req.URL.Query().Get("autoremove") - var remove bool - - if c != "" { - remove, err = strconv.ParseBool(c) - if err != nil { - io.WriteString(w, "autoremove param must be a bool\n") - return - } - } - - // Populate configs. 
- - rateCfg := ThrottleOverrideConfig{ - Rate: rate, - AutoRemove: remove, - } - - err = storeThrottleOverride(zk, overrideRateZnodePath, rateCfg) - if err != nil { - io.WriteString(w, fmt.Sprintf("%s\n", err)) - } else { - io.WriteString(w, fmt.Sprintf("throttle successfully set to %dMB/s, autoremove==%v\n", - rate, remove)) - } -} - -func removeThrottleDeprecated(w http.ResponseWriter, req *http.Request, zk kafkazk.Handler) { - logReq(req) - if req.Method != http.MethodPost { - w.WriteHeader(http.StatusMethodNotAllowed) - writeNLError(w, incorrectMethodError) - return - } - - c := ThrottleOverrideConfig{ - Rate: 0, - AutoRemove: false, - } - - io.WriteString(w, errDeprecated) - - err := storeThrottleOverride(zk, overrideRateZnodePath, c) - if err != nil { - io.WriteString(w, fmt.Sprintf("%s\n", err)) - } else { - io.WriteString(w, "throttle successfully removed\n") - } -} diff --git a/cmd/autothrottle/capacities.go b/cmd/autothrottle/capacities.go index f5ded71..12f58a1 100644 --- a/cmd/autothrottle/capacities.go +++ b/cmd/autothrottle/capacities.go @@ -58,7 +58,7 @@ func (r replicationCapacityByBroker) reset() { // in the reassignment. For each broker, it determines whether the broker is // a leader (source) or a follower (destination), and calculates a throttle // accordingly, returning a replicationCapacityByBroker and error. 
-func brokerReplicationCapacities(rtc *ReplicationThrottleConfigs, reassigning reassigningBrokers, bm kafkametrics.BrokerMetrics) (replicationCapacityByBroker, error) { +func brokerReplicationCapacities(rtc *ThrottleManager, reassigning reassigningBrokers, bm kafkametrics.BrokerMetrics) (replicationCapacityByBroker, error) { capacities := replicationCapacityByBroker{} // For each broker, check whether the it's a source and/or destination, diff --git a/cmd/autothrottle/capacities_test.go b/cmd/autothrottle/capacities_test.go index 45edd19..4579824 100644 --- a/cmd/autothrottle/capacities_test.go +++ b/cmd/autothrottle/capacities_test.go @@ -18,7 +18,7 @@ func TestBrokerReplicationCapacities(t *testing.T) { CapacityMap: map[string]float64{"stub": 200.00}, }) - rtc := &ReplicationThrottleConfigs{ + rtc := &ThrottleManager{ reassignments: reassignments, previouslySetThrottles: replicationCapacityByBroker{1000: throttleByRole{float64ptr(20)}}, limits: lim, diff --git a/cmd/autothrottle/api.go b/cmd/autothrottle/internal/api/api.go similarity index 84% rename from cmd/autothrottle/api.go rename to cmd/autothrottle/internal/api/api.go index 4859fe0..b4f57c8 100644 --- a/cmd/autothrottle/api.go +++ b/cmd/autothrottle/internal/api/api.go @@ -1,4 +1,4 @@ -package main +package api import ( "errors" @@ -9,6 +9,7 @@ import ( "sort" "strconv" + "github.com/DataDog/kafka-kit/v3/cmd/autothrottle/internal/throttlestore" "github.com/DataDog/kafka-kit/v3/kafkazk" ) @@ -20,19 +21,19 @@ type APIConfig struct { var ( overrideRateZnode = "override_rate" - overrideRateZnodePath string + OverrideRateZnodePath string incorrectMethodError = errors.New("disallowed method") ) -func initAPI(c *APIConfig, zk kafkazk.Handler) { +func Init(c *APIConfig, zk kafkazk.Handler) { chroot := fmt.Sprintf("/%s", c.ZKPrefix) - overrideRateZnodePath = fmt.Sprintf("%s/%s", chroot, overrideRateZnode) + OverrideRateZnodePath = fmt.Sprintf("%s/%s", chroot, overrideRateZnode) m := http.NewServeMux() // Check ZK 
for override rate config znode. var exists bool - for _, path := range []string{chroot, overrideRateZnodePath} { + for _, path := range []string{chroot, OverrideRateZnodePath} { var err error exists, err = zk.Exists(path) if err != nil { @@ -52,10 +53,11 @@ func initAPI(c *APIConfig, zk kafkazk.Handler) { // If it is, update it to the json format. // TODO(jamie): we can probably remove this by now. if exists { - r, _ := zk.Get(overrideRateZnodePath) + r, _ := zk.Get(OverrideRateZnodePath) if rate, err := strconv.Atoi(string(r)); err == nil { // Populate the updated config. - err := storeThrottleOverride(zk, overrideRateZnodePath, ThrottleOverrideConfig{Rate: rate}) + tor := throttlestore.ThrottleOverrideConfig{Rate: rate} + err := throttlestore.StoreThrottleOverride(zk, OverrideRateZnodePath, tor) if err != nil { log.Fatal(err) } @@ -72,11 +74,6 @@ func initAPI(c *APIConfig, zk kafkazk.Handler) { m.HandleFunc("/throttle/remove", func(w http.ResponseWriter, req *http.Request) { throttleRemove(w, req, zk) }) m.HandleFunc("/throttle/remove/", func(w http.ResponseWriter, req *http.Request) { throttleRemove(w, req, zk) }) - // Deprecated routes. - m.HandleFunc("/get_throttle", func(w http.ResponseWriter, req *http.Request) { getThrottleDeprecated(w, req, zk) }) - m.HandleFunc("/set_throttle", func(w http.ResponseWriter, req *http.Request) { setThrottleDeprecated(w, req, zk) }) - m.HandleFunc("/remove_throttle", func(w http.ResponseWriter, req *http.Request) { removeThrottleDeprecated(w, req, zk) }) - // Start listener. go func() { err := http.ListenAndServe(c.Listen, m) @@ -135,7 +132,7 @@ func getThrottle(w http.ResponseWriter, req *http.Request, zk kafkazk.Handler) { } } - configPath := overrideRateZnodePath + configPath := OverrideRateZnodePath // A non-0 ID means that this is broker specific. 
if id != "" { @@ -143,7 +140,7 @@ func getThrottle(w http.ResponseWriter, req *http.Request, zk kafkazk.Handler) { configPath = fmt.Sprintf("%s/%s", configPath, id) } - r, err := fetchThrottleOverride(zk, configPath) + r, err := throttlestore.FetchThrottleOverride(zk, configPath) respMessage := fmt.Sprintf("a throttle override is configured at %dMB/s, autoremove==%v\n", r.Rate, r.AutoRemove) noOverrideMessage := "no throttle override is set\n" @@ -157,7 +154,7 @@ func getThrottle(w http.ResponseWriter, req *http.Request, zk kafkazk.Handler) { // Handle errors. if err != nil { switch err { - case errNoOverrideSet: + case throttlestore.ErrNoOverrideSet: // Do nothing, let the rate condition handle // no override set messages. default: @@ -191,7 +188,7 @@ func setThrottle(w http.ResponseWriter, req *http.Request, zk kafkazk.Handler) { } // Populate configs. - rateCfg := ThrottleOverrideConfig{ + rateCfg := throttlestore.ThrottleOverrideConfig{ Rate: rate, AutoRemove: autoRemove, } @@ -208,7 +205,7 @@ func setThrottle(w http.ResponseWriter, req *http.Request, zk kafkazk.Handler) { } updateMessage := fmt.Sprintf("throttle successfully set to %dMB/s, autoremove==%v\n", rate, autoRemove) - configPath := overrideRateZnodePath + configPath := OverrideRateZnodePath writeOverride(w, id, configPath, updateMessage, err, zk, rateCfg) } @@ -216,7 +213,7 @@ func setThrottle(w http.ResponseWriter, req *http.Request, zk kafkazk.Handler) { // removeThrottle removes the throttle rate for a specific broker, the global rate, or for all brokers. func removeThrottle(w http.ResponseWriter, req *http.Request, zk kafkazk.Handler) { // Removing a rate means setting it to 0. 
- c := ThrottleOverrideConfig{ + c := throttlestore.ThrottleOverrideConfig{ Rate: 0, AutoRemove: false, } @@ -233,7 +230,7 @@ func removeThrottle(w http.ResponseWriter, req *http.Request, zk kafkazk.Handler } } - configPath := overrideRateZnodePath + configPath := OverrideRateZnodePath updateMessage := "throttle removed\n" var err error @@ -241,7 +238,7 @@ func removeThrottle(w http.ResponseWriter, req *http.Request, zk kafkazk.Handler if id == "all" { // Instead of specifying a broker, the string 'all' means clear all overrides we have by setting to 0. var children []string - var parentPath = overrideRateZnodePath + var parentPath = OverrideRateZnodePath children, err = zk.Children(parentPath) sort.Strings(children) @@ -259,17 +256,17 @@ func removeThrottle(w http.ResponseWriter, req *http.Request, zk kafkazk.Handler } } -func writeOverride(w http.ResponseWriter, id string, configPath string, updateMessage string, err error, zk kafkazk.Handler, c ThrottleOverrideConfig) { +func writeOverride(w http.ResponseWriter, id string, configPath string, updateMessage string, err error, zk kafkazk.Handler, c throttlestore.ThrottleOverrideConfig) { // A non-0 ID means that this is broker specific. if id != "" { configPath, updateMessage = formatConfigAndMessage(configPath, id, updateMessage) } - err = storeThrottleOverride(zk, configPath, c) + err = throttlestore.StoreThrottleOverride(zk, configPath, c) if err != nil { switch err { - case errNoOverrideSet: + case throttlestore.ErrNoOverrideSet: // Do nothing. 
default: writeNLError(w, err) diff --git a/cmd/autothrottle/api_parsing.go b/cmd/autothrottle/internal/api/api_parsing.go similarity index 99% rename from cmd/autothrottle/api_parsing.go rename to cmd/autothrottle/internal/api/api_parsing.go index 83076f5..fa0cfe3 100644 --- a/cmd/autothrottle/api_parsing.go +++ b/cmd/autothrottle/internal/api/api_parsing.go @@ -1,4 +1,4 @@ -package main +package api import ( "errors" diff --git a/cmd/autothrottle/api_parsing_test.go b/cmd/autothrottle/internal/api/api_parsing_test.go similarity index 99% rename from cmd/autothrottle/api_parsing_test.go rename to cmd/autothrottle/internal/api/api_parsing_test.go index 7e78a69..fa33efe 100644 --- a/cmd/autothrottle/api_parsing_test.go +++ b/cmd/autothrottle/internal/api/api_parsing_test.go @@ -1,4 +1,4 @@ -package main +package api import ( "fmt" diff --git a/cmd/autothrottle/api_test.go b/cmd/autothrottle/internal/api/api_test.go similarity index 98% rename from cmd/autothrottle/api_test.go rename to cmd/autothrottle/internal/api/api_test.go index 766080e..f1f44c0 100644 --- a/cmd/autothrottle/api_test.go +++ b/cmd/autothrottle/internal/api/api_test.go @@ -1,4 +1,4 @@ -package main +package api import ( "fmt" @@ -150,7 +150,7 @@ func TestRemoveBrokerThrottle(t *testing.T) { func TestRemoveAllBrokerThrottle(t *testing.T) { // GIVEN overrideRateZnode = "override_rate" - overrideRateZnodePath = fmt.Sprintf("%s/%s", "zkChroot", overrideRateZnode) + OverrideRateZnodePath = fmt.Sprintf("%s/%s", "zkChroot", overrideRateZnode) zk := kafkazk.NewZooKeeperStub() setReq, err := http.NewRequest("POST", "/throttle/123?rate=5&autoremove=false", nil) diff --git a/cmd/autothrottle/internal/throttlestore/brokers.go b/cmd/autothrottle/internal/throttlestore/brokers.go new file mode 100644 index 0000000..f8e9050 --- /dev/null +++ b/cmd/autothrottle/internal/throttlestore/brokers.go @@ -0,0 +1,52 @@ +package throttlestore + +// BrokerOverrides is a map of broker ID to BrokerThrottleOverride. 
+type BrokerOverrides map[int]BrokerThrottleOverride + +// BrokerThrottleOverride holds broker-specific overrides. +type BrokerThrottleOverride struct { + // Broker ID. + ID int + // Whether this override is for a broker that's part of a reassignment. + ReassignmentParticipant bool + // The ThrottleOverrideConfig. + Config ThrottleOverrideConfig +} + +// BrokerOverridesFilterFn specifies a filter function. +type BrokerOverridesFilterFn func(BrokerThrottleOverride) bool + +// Copy returns a copy of a BrokerThrottleOverride. +func (b BrokerThrottleOverride) Copy() BrokerThrottleOverride { + return BrokerThrottleOverride{ + ID: b.ID, + ReassignmentParticipant: b.ReassignmentParticipant, + Config: ThrottleOverrideConfig{ + Rate: b.Config.Rate, + AutoRemove: b.Config.AutoRemove, + }, + } +} + +// IDs returns a []int of broker IDs held by the BrokerOverrides. +func (b BrokerOverrides) IDs() []int { + var ids []int + for id := range b { + ids = append(ids, id) + } + + return ids +} + +// Filter takes a BrokerOverridesFilterFn and returns a BrokerOverrides where +// all elements return true as an input to the filter func. 
+func (b BrokerOverrides) Filter(fn BrokerOverridesFilterFn) BrokerOverrides { + var bo = make(BrokerOverrides) + for _, bto := range b { + if fn(bto) { + bo[bto.ID] = bto.Copy() + } + } + + return bo +} diff --git a/cmd/autothrottle/throttles_persistence.go b/cmd/autothrottle/internal/throttlestore/store.go similarity index 76% rename from cmd/autothrottle/throttles_persistence.go rename to cmd/autothrottle/internal/throttlestore/store.go index ef650ca..d53797a 100644 --- a/cmd/autothrottle/throttles_persistence.go +++ b/cmd/autothrottle/internal/throttlestore/store.go @@ -1,4 +1,4 @@ -package main +package throttlestore import ( "encoding/json" @@ -10,18 +10,27 @@ import ( ) var ( - errNoOverrideSet = errors.New("no override set at path") + ErrNoOverrideSet = errors.New("no override set at path") ) +// ThrottleOverrideConfig holds throttle override configurations. +type ThrottleOverrideConfig struct { + // Rate in MB. + Rate int `json:"rate"` + // Whether the override rate should be + // removed when the current reassignments finish. + AutoRemove bool `json:"autoremove"` +} + // fetchThrottleOverride gets a throttle override from path p. -func fetchThrottleOverride(zk kafkazk.Handler, p string) (*ThrottleOverrideConfig, error) { +func FetchThrottleOverride(zk kafkazk.Handler, p string) (*ThrottleOverrideConfig, error) { c := &ThrottleOverrideConfig{} override, err := zk.Get(p) if err != nil { switch err.(type) { case kafkazk.ErrNoNode: - return c, errNoOverrideSet + return c, ErrNoOverrideSet default: return c, fmt.Errorf("error getting throttle override: %s", err) } @@ -39,7 +48,7 @@ func fetchThrottleOverride(zk kafkazk.Handler, p string) (*ThrottleOverrideConfi } // storeThrottleOverride sets a throttle override to path p. 
-func storeThrottleOverride(zk kafkazk.Handler, p string, c ThrottleOverrideConfig) error { +func StoreThrottleOverride(zk kafkazk.Handler, p string, c ThrottleOverrideConfig) error { d, err := json.Marshal(c) if err != nil { return fmt.Errorf("error marshalling override config: %s", err) @@ -65,7 +74,7 @@ func storeThrottleOverride(zk kafkazk.Handler, p string, c ThrottleOverrideConfi } // removeThrottleOverride deletes an override at path p. -func removeThrottleOverride(zk kafkazk.Handler, p string) error { +func RemoveThrottleOverride(zk kafkazk.Handler, p string) error { exists, err := zk.Exists(p) if !exists && err == nil { return nil @@ -79,11 +88,11 @@ func removeThrottleOverride(zk kafkazk.Handler, p string) error { return nil } -// fetchBrokerOverrides returns a BrokerOverrides populated with all brokers +// FetchBrokerOverrides returns a BrokerOverrides populated with all brokers // with overrides set. This function exists as a convenience since the number of // broker overrides can vary, as opposed to the global which has a single, // consistent znode that always exists. -func fetchBrokerOverrides(zk kafkazk.Handler, p string) (BrokerOverrides, error) { +func FetchBrokerOverrides(zk kafkazk.Handler, p string) (BrokerOverrides, error) { overrides := BrokerOverrides{} // Get brokers with overrides configured. diff --git a/cmd/autothrottle/main.go b/cmd/autothrottle/main.go index 9a05c25..a2054d6 100644 --- a/cmd/autothrottle/main.go +++ b/cmd/autothrottle/main.go @@ -11,6 +11,8 @@ import ( "strings" "time" + "github.com/DataDog/kafka-kit/v3/cmd/autothrottle/internal/api" + "github.com/DataDog/kafka-kit/v3/cmd/autothrottle/internal/throttlestore" "github.com/DataDog/kafka-kit/v3/kafkametrics" "github.com/DataDog/kafka-kit/v3/kafkametrics/datadog" "github.com/DataDog/kafka-kit/v3/kafkazk" @@ -22,29 +24,31 @@ var ( // This can be set with -ldflags "-X main.version=x.x.x" version = "0.0.0" - // Config holds configuration - // parameters. 
+ // Config holds configuration parameters. Config struct { - APIKey string - AppKey string - NetworkTXQuery string - NetworkRXQuery string - BrokerIDTag string - InstanceTypeTag string - MetricsWindow int - ZKAddr string - ZKPrefix string - Interval int - APIListen string - ConfigZKPrefix string - DDEventTags string - MinRate float64 - SourceMaxRate float64 - DestinationMaxRate float64 - ChangeThreshold float64 - FailureThreshold int - CapMap map[string]float64 - CleanupAfter int64 + KafkaNativeMode bool + KafkaAPIRequestTimeout int + APIKey string + AppKey string + NetworkTXQuery string + NetworkRXQuery string + BrokerIDTag string + InstanceTypeTag string + MetricsWindow int + BootstrapServers string + ZKAddr string + ZKPrefix string + Interval int + APIListen string + ConfigZKPrefix string + DDEventTags string + MinRate float64 + SourceMaxRate float64 + DestinationMaxRate float64 + ChangeThreshold float64 + FailureThreshold int + CapMap map[string]float64 + CleanupAfter int64 } // Misc. 
@@ -53,6 +57,8 @@ var ( func main() { v := flag.Bool("version", false, "version") + flag.BoolVar(&Config.KafkaNativeMode, "kafka-native-mode", false, "Favor native Kafka RPCs over ZooKeeper metadata access") + flag.IntVar(&Config.KafkaAPIRequestTimeout, "kafka-api-request-timeout", 15, "Kafka API request timeout (seconds)") flag.StringVar(&Config.APIKey, "api-key", "", "Datadog API key") flag.StringVar(&Config.AppKey, "app-key", "", "Datadog app key") flag.StringVar(&Config.NetworkTXQuery, "net-tx-query", "avg:system.net.bytes_sent{service:kafka} by {host}", "Datadog query for broker outbound bandwidth by host") @@ -60,6 +66,7 @@ func main() { flag.StringVar(&Config.BrokerIDTag, "broker-id-tag", "broker_id", "Datadog host tag for broker ID") flag.StringVar(&Config.InstanceTypeTag, "instance-type-tag", "instance-type", "Datadog tag for instance type") flag.IntVar(&Config.MetricsWindow, "metrics-window", 120, "Time span of metrics required (seconds)") + flag.StringVar(&Config.BootstrapServers, "bootstrap-servers", "localhost:9092", "Kafka bootstrap servers") flag.StringVar(&Config.ZKAddr, "zk-addr", "localhost:2181", "ZooKeeper connect string (for broker metadata or rebuild-topic lookups)") flag.StringVar(&Config.ZKPrefix, "zk-prefix", "", "ZooKeeper namespace prefix") flag.IntVar(&Config.Interval, "interval", 180, "Autothrottle check interval (seconds)") @@ -93,8 +100,7 @@ func main() { } log.Println("Autothrottle Running") - // Lazily prevent a tight restart - // loop from thrashing ZK. + // Lazily prevent a tight restart loop from thrashing ZK. time.Sleep(1 * time.Second) // Init ZK. @@ -102,19 +108,20 @@ func main() { Connect: Config.ZKAddr, Prefix: Config.ZKPrefix, }) + if err != nil { + log.Fatal(err) + } + + defer zk.Close() // Init the admin API. 
- apiConfig := &APIConfig{ + apiConfig := &api.APIConfig{ Listen: Config.APIListen, ZKPrefix: Config.ConfigZKPrefix, } - initAPI(apiConfig, zk) + api.Init(apiConfig, zk) log.Printf("Admin API: %s\n", Config.APIListen) - if err != nil { - log.Fatal(err) - } - defer zk.Close() // Init a Kafka metrics fetcher. km, err := datadog.NewHandler(&datadog.Config{ @@ -148,7 +155,7 @@ func main() { tags: tags, } - // Default to true on startup in case throttles were set in an autothrottle + // Default to true on startup in case throttles were set in an autothrottle // process other than the current one. knownThrottles := true @@ -175,25 +182,45 @@ func main() { log.Fatal(err) } - throttleMeta := &ReplicationThrottleConfigs{ + ThrottleManager := &ThrottleManager{ zk: zk, km: km, + kafkaNativeMode: Config.KafkaNativeMode, + kafkaAPIRequestTimeout: Config.KafkaAPIRequestTimeout, events: events, previouslySetThrottles: make(replicationCapacityByBroker), limits: lim, failureThreshold: Config.FailureThreshold, } + // Init a KafkaAdmin Client if needed. + if Config.KafkaNativeMode { + if err := ThrottleManager.InitKafkaAdmin(Config.BootstrapServers); err != nil { + log.Fatal(err) + } + log.Printf("Connected to Kafka: %s\n", Config.BootstrapServers) + } + // Run. var interval int64 var ticker = time.NewTicker(time.Duration(Config.Interval) * time.Second) // TODO(jamie): refactor this loop. - for { + for ; ; <-ticker.C { interval++ // Get topics undergoing reassignment. - reassignments = zk.GetReassignments() // XXX This needs to return an error. + if !Config.KafkaNativeMode { + reassignments = zk.GetReassignments() + } else { + // KIP-455 compatible reassignments lookup. + reassignments, err = zk.ListReassignments() + if err != nil { + fmt.Printf("error fetching reassignments: %s\n", err) + continue + } + } + topicsReplicatingNow = newSet() for t := range reassignments { topicsReplicatingNow.add(t) @@ -215,9 +242,9 @@ func main() { // the Kafka topic throttled replicas list. 
This minimizes // state that must be propagated through the cluster. if topicsReplicatingNow.isSubSet(topicsReplicatingPreviously) { - throttleMeta.DisableTopicUpdates() + ThrottleManager.DisableTopicUpdates() } else { - throttleMeta.EnableTopicUpdates() + ThrottleManager.EnableTopicUpdates() // Unset any previously stored throttle rates. This is done to avoid a // scenario that results in autothrottle being unaware of externally // specified throttles and failing to override them. The condition can be @@ -242,7 +269,7 @@ func main() { // required ChangeThreshold. // // Ensure we're doing option 1 right here: - throttleMeta.previouslySetThrottles.reset() + ThrottleManager.previouslySetThrottles.reset() } // Rebuild topicsReplicatingPreviously with the current replications @@ -250,32 +277,35 @@ func main() { topicsReplicatingPreviously = topicsReplicatingNow.copy() // Check if a global throttle override was configured. - overrideCfg, err := fetchThrottleOverride(zk, overrideRateZnodePath) + overrideCfg, err := throttlestore.FetchThrottleOverride(zk, api.OverrideRateZnodePath) if err != nil { log.Println(err) } // Fetch all broker-specific overrides. - throttleMeta.brokerOverrides, err = fetchBrokerOverrides(zk, overrideRateZnodePath) + bo, err := throttlestore.FetchBrokerOverrides(zk, api.OverrideRateZnodePath) if err != nil { log.Println(err) } // Get the maps of brokers handling reassignments. - throttleMeta.reassigningBrokers, err = getReassigningBrokers(reassignments, zk) + rb, err := getReassigningBrokers(reassignments, zk) if err != nil { log.Println(err) } + ThrottleManager.brokerOverrides = bo + ThrottleManager.reassigningBrokers = rb + // If topics are being reassigned, update the replication throttle. if len(topicsReplicatingNow) > 0 { log.Printf("Topics with ongoing reassignments: %s\n", topicsReplicatingNow.keys()) - // Update the throttleMeta. 
- throttleMeta.overrideRate = overrideCfg.Rate - throttleMeta.reassignments = reassignments + // Update the ThrottleManager. + ThrottleManager.overrideRate = overrideCfg.Rate + ThrottleManager.reassignments = reassignments - err = updateReplicationThrottle(throttleMeta) + err = ThrottleManager.updateReplicationThrottle() if err != nil { log.Println(err) } else { @@ -286,11 +316,11 @@ func main() { // Get brokers with active overrides, ie where the override rate is non-0, // that are also not part of a reassignment. - activeOverrideBrokers := throttleMeta.brokerOverrides.Filter(notReassignmentParticipant) + activeOverrideBrokers := ThrottleManager.brokerOverrides.Filter(notReassignmentParticipant) // Apply any additional broker-specific throttles that were not applied as // part of a reassignment. - if len(throttleMeta.brokerOverrides) > 0 { + if len(ThrottleManager.brokerOverrides) > 0 { // Find all topics that include brokers with static overrides // configured that aren't being reassigned. In order for broker-specific // throttles to be applied, topics being replicated by those brokers @@ -301,7 +331,7 @@ func main() { // have also have a reassignment? We're discovering topics here by // reverse lookup of brokers that are not reassignment participants. var err error - throttleMeta.overrideThrottleLists, err = getTopicsWithThrottledBrokers(throttleMeta) + ThrottleManager.overrideThrottleLists, err = ThrottleManager.getTopicsWithThrottledBrokers() if err != nil { log.Printf("Error fetching topic states: %s\n", err) } @@ -315,15 +345,15 @@ func main() { } if brokersThrottledNow.equal(brokersThrottledPreviously) { - throttleMeta.DisableOverrideTopicUpdates() + ThrottleManager.DisableOverrideTopicUpdates() } else { - throttleMeta.EnableOverrideTopicUpdates() + ThrottleManager.EnableOverrideTopicUpdates() } brokersThrottledPreviously = brokersThrottledNow.copy() // Update throttles. 
- if err := updateOverrideThrottles(throttleMeta); err != nil { + if err := ThrottleManager.updateOverrideThrottles(); err != nil { log.Println(err) } @@ -335,7 +365,7 @@ func main() { } // Remove and delete any broker-specific overrides set to 0. - if errs := purgeOverrideThrottles(throttleMeta); errs != nil { + if errs := ThrottleManager.purgeOverrideThrottles(); errs != nil { log.Println("Error removing persisted broker throttle overrides") for i := range errs { log.Println(errs[i]) @@ -391,7 +421,7 @@ func main() { interval = 0 // Remove all the broker + topic throttle configs. - err := removeAllThrottles(throttleMeta) + err := ThrottleManager.removeAllThrottles() if err != nil { log.Printf("Error removing throttles: %s\n", err.Error()) } else { @@ -401,12 +431,12 @@ func main() { } // Ensure topic throttle updates are re-enabled. - throttleMeta.EnableTopicUpdates() - throttleMeta.EnableOverrideTopicUpdates() + ThrottleManager.EnableTopicUpdates() + ThrottleManager.EnableOverrideTopicUpdates() // Remove any configured throttle overrides if AutoRemove is true. if overrideCfg.AutoRemove { - err := storeThrottleOverride(zk, overrideRateZnodePath, ThrottleOverrideConfig{}) + err := throttlestore.StoreThrottleOverride(zk, api.OverrideRateZnodePath, throttlestore.ThrottleOverrideConfig{}) if err != nil { log.Println(err) } else { @@ -415,7 +445,6 @@ func main() { } } - <-ticker.C } } diff --git a/cmd/autothrottle/throttle_update_legacy.go b/cmd/autothrottle/throttle_update_legacy.go new file mode 100644 index 0000000..e90fdc6 --- /dev/null +++ b/cmd/autothrottle/throttle_update_legacy.go @@ -0,0 +1,193 @@ +// These are legacy methods that perform updates directly through ZooKeeper. 
+package main + +import ( + "errors" + "fmt" + "log" + "sort" + "strconv" + "strings" + "time" + + "github.com/DataDog/kafka-kit/v3/kafkazk" +) + +func (tm *ThrottleManager) legacyApplyBrokerThrottles(configs map[int]kafkazk.KafkaConfig, capacities replicationCapacityByBroker) (chan brokerChangeEvent, []error) { + events := make(chan brokerChangeEvent, len(configs)*2) + var errs []error + + for ID, config := range configs { + changes, err := tm.zk.UpdateKafkaConfig(config) + if err != nil { + errs = append(errs, fmt.Errorf("Error setting throttle on broker %d: %s", ID, err)) + } + + for i, changed := range changes { + if changed { + // This will be either "leader.replication.throttled.rate" or + // "follower.replication.throttled.rate". + throttleConfigString := config.Configs[i][0] + // Split on ".", get "leader" or "follower" string. + role := strings.Split(throttleConfigString, ".")[0] + + log.Printf("Updated throttle on broker %d [%s]\n", ID, role) + + var rate *float64 + + // Store the configured rate. + switch role { + case "leader": + rate = capacities[ID][0] + tm.previouslySetThrottles.storeLeaderCapacity(ID, *rate) + case "follower": + rate = capacities[ID][1] + tm.previouslySetThrottles.storeFollowerCapacity(ID, *rate) + } + + events <- brokerChangeEvent{ + id: ID, + role: role, + rate: *rate, + } + } + } + + // Hard coded sleep to reduce + // ZK load. + time.Sleep(250 * time.Millisecond) + } + + close(events) + + return events, errs +} + +func (tm *ThrottleManager) legacyApplyTopicThrottles(throttled topicThrottledReplicas) []error { + var errs []error + + for t := range throttled { + // Generate config. + config := kafkazk.KafkaConfig{ + Type: "topic", + Name: string(t), + Configs: []kafkazk.KafkaConfigKV{}, + } + + // The sort is important; it avoids unnecessary config updates due to the same + // data but in different orders.
+ sort.Strings(throttled[t]["leaders"]) + sort.Strings(throttled[t]["followers"]) + + leaderList := strings.Join(throttled[t]["leaders"], ",") + if leaderList != "" { + c := kafkazk.KafkaConfigKV{"leader.replication.throttled.replicas", leaderList} + config.Configs = append(config.Configs, c) + } + + followerList := strings.Join(throttled[t]["followers"], ",") + if followerList != "" { + c := kafkazk.KafkaConfigKV{"follower.replication.throttled.replicas", followerList} + config.Configs = append(config.Configs, c) + } + + // Write the config. + _, err := tm.zk.UpdateKafkaConfig(config) + if err != nil { + errs = append(errs, fmt.Errorf("Error setting throttle list on topic %s: %s\n", t, err)) + } + } + + return errs +} + +func (tm *ThrottleManager) legacyRemoveTopicThrottles() error { + topics, err := tm.zk.GetTopics(topicsRegex) + if err != nil { + return err + } + + var errTopics []string + + for _, topic := range topics { + config := kafkazk.KafkaConfig{ + Type: "topic", + Name: topic, + Configs: []kafkazk.KafkaConfigKV{ + {"leader.replication.throttled.replicas", ""}, + {"follower.replication.throttled.replicas", ""}, + }, + } + + // Update the config. + _, err := tm.zk.UpdateKafkaConfig(config) + if err != nil { + errTopics = append(errTopics, topic) + } + + time.Sleep(250 * time.Millisecond) + } + + if errTopics != nil { + names := strings.Join(errTopics, ", ") + return fmt.Errorf("Error removing throttle config on topics: %s\n", names) + } + + return nil +} + +func (tm *ThrottleManager) legacyRemoveBrokerThrottlesByID(ids map[int]struct{}) error { + var unthrottledBrokers []int + var errorEncountered bool + + // Unset throttles.
+ for b := range ids { + config := kafkazk.KafkaConfig{ + Type: "broker", + Name: strconv.Itoa(b), + Configs: []kafkazk.KafkaConfigKV{ + {"leader.replication.throttled.rate", ""}, + {"follower.replication.throttled.rate", ""}, + }, + } + + changed, err := tm.zk.UpdateKafkaConfig(config) + switch err.(type) { + case nil: + case kafkazk.ErrNoNode: + // We'd get an ErrNoNode here only if the parent path for dynamic broker + // configs (/config/brokers) doesn't exist, which can happen in + // new clusters that have never had dynamic configs applied. Rather than + // creating that znode, we'll just ignore errors here; if the znodes + // don't exist, there's not even config to remove. + default: + errorEncountered = true + log.Printf("Error removing throttle on broker %d: %s\n", b, err) + } + + if changed[0] || changed[1] { + unthrottledBrokers = append(unthrottledBrokers, b) + log.Printf("Throttle removed on broker %d\n", b) + + // Unset the previously stored throttle rate. + tm.previouslySetThrottles[b] = [2]*float64{} + } + + // Hardcoded sleep to reduce ZK load. + time.Sleep(250 * time.Millisecond) + } + + // Write event. + if len(unthrottledBrokers) > 0 { + m := fmt.Sprintf("Replication throttle removed on the following brokers: %v", + unthrottledBrokers) + tm.events.Write("Broker replication throttle removed", m) + } + + // Lazily check if any errors were encountered, return a generic error.
+ if errorEncountered { + return errors.New("one or more throttles were not cleared") + } + + return nil +} diff --git a/cmd/autothrottle/throttles.go b/cmd/autothrottle/throttles.go index 2809bdd..12ff60e 100644 --- a/cmd/autothrottle/throttles.go +++ b/cmd/autothrottle/throttles.go @@ -1,20 +1,27 @@ package main import ( + "context" + "time" + + "github.com/DataDog/kafka-kit/v3/cmd/autothrottle/internal/throttlestore" + "github.com/DataDog/kafka-kit/v3/kafkaadmin" "github.com/DataDog/kafka-kit/v3/kafkametrics" "github.com/DataDog/kafka-kit/v3/kafkazk" ) -// ReplicationThrottleConfigs holds all the data needed to call -// updateReplicationThrottle. -type ReplicationThrottleConfigs struct { - reassignments kafkazk.Reassignments - zk kafkazk.Handler - km kafkametrics.Handler - overrideRate int +// ThrottleManager manages Kafka throttle rates. +type ThrottleManager struct { + reassignments kafkazk.Reassignments + zk kafkazk.Handler + km kafkametrics.Handler + ka kafkaadmin.KafkaAdmin + overrideRate int + kafkaNativeMode bool + kafkaAPIRequestTimeout int // The following three fields are for brokers with static overrides set // and a topicThrottledReplicas for topics where those brokers are assigned. - brokerOverrides BrokerOverrides + brokerOverrides throttlestore.BrokerOverrides overrideThrottleLists topicThrottledReplicas skipOverrideTopicUpdates bool reassigningBrokers reassigningBrokers @@ -26,79 +33,30 @@ type ReplicationThrottleConfigs struct { skipTopicUpdates bool } -// ThrottleOverrideConfig holds throttle override configurations. -type ThrottleOverrideConfig struct { - // Rate in MB. - Rate int `json:"rate"` - // Whether the override rate should be - // removed when the current reassignments finish. - AutoRemove bool `json:"autoremove"` -} - -// BrokerOverrides is a map of broker ID to BrokerThrottleOverride. -type BrokerOverrides map[int]BrokerThrottleOverride - -// BrokerThrottleOverride holds broker-specific overrides. 
-type BrokerThrottleOverride struct { - // Broker ID. - ID int - // Whether this override is for a broker that's part of a reassignment. - ReassignmentParticipant bool - // The ThrottleOverrideConfig. - Config ThrottleOverrideConfig -} - -// Copy returns a copy of a BrokerThrottleOverride. -func (b BrokerThrottleOverride) Copy() BrokerThrottleOverride { - return BrokerThrottleOverride{ - ID: b.ID, - ReassignmentParticipant: b.ReassignmentParticipant, - Config: ThrottleOverrideConfig{ - Rate: b.Config.Rate, - AutoRemove: b.Config.AutoRemove, - }, - } -} - -// IDs returns a []int of broker IDs held by the BrokerOverrides. -func (b BrokerOverrides) IDs() []int { - var ids []int - for id := range b { - ids = append(ids, id) +// InitKafkaAdmin takes a csv Kafka broker list and initializes a kafkaadmin +// client. +func (tm *ThrottleManager) InitKafkaAdmin(brokers string) error { + cfg := kafkaadmin.Config{BootstrapServers: brokers} + ka, err := kafkaadmin.NewClient(cfg) + if err != nil { + return err } - return ids + tm.ka = ka + return nil } -// BrokerOverridesFilterFn specifies a filter function. -type BrokerOverridesFilterFn func(BrokerThrottleOverride) bool - -// Filter funcs. - -func hasActiveOverride(bto BrokerThrottleOverride) bool { +func hasActiveOverride(bto throttlestore.BrokerThrottleOverride) bool { return bto.Config.Rate != 0 } -func notReassignmentParticipant(bto BrokerThrottleOverride) bool { +func notReassignmentParticipant(bto throttlestore.BrokerThrottleOverride) bool { return !bto.ReassignmentParticipant && bto.Config.Rate != 0 } -// Filter takes a BrokerOverridesFilterFn and returns a BrokerOverrides where -// all elements return true as an input to the filter func. 
-func (b BrokerOverrides) Filter(fn BrokerOverridesFilterFn) BrokerOverrides { - var bo = make(BrokerOverrides) - for _, bto := range b { - if fn(bto) { - bo[bto.ID] = bto.Copy() - } - } - - return bo -} - // Failure increments the failures count and returns true if the // count exceeds the failures threshold. -func (r *ReplicationThrottleConfigs) Failure() bool { +func (r *ThrottleManager) Failure() bool { r.failures++ if r.failures > r.failureThreshold { @@ -109,31 +67,40 @@ func (r *ReplicationThrottleConfigs) Failure() bool { } // ResetFailures resets the failures count. -func (r *ReplicationThrottleConfigs) ResetFailures() { - r.failures = 0 +func (tm *ThrottleManager) ResetFailures() { + tm.failures = 0 } // DisableTopicUpdates prevents topic throttled replica lists from being // updated in ZooKeeper. -func (r *ReplicationThrottleConfigs) DisableTopicUpdates() { - r.skipTopicUpdates = true +func (tm *ThrottleManager) DisableTopicUpdates() { + tm.skipTopicUpdates = true } // DisableTopicUpdates allows topic throttled replica lists updates in ZooKeeper. -func (r *ReplicationThrottleConfigs) EnableTopicUpdates() { - r.skipTopicUpdates = false +func (tm *ThrottleManager) EnableTopicUpdates() { + tm.skipTopicUpdates = false } // DisableOverrideTopicUpdates prevents topic throttled replica lists for // topics assigned to override brokers from being updated in ZooKeeper. -func (r *ReplicationThrottleConfigs) DisableOverrideTopicUpdates() { - r.skipOverrideTopicUpdates = true +func (tm *ThrottleManager) DisableOverrideTopicUpdates() { + tm.skipOverrideTopicUpdates = true } // EnableOverrideTopicUpdates allows topic throttled replica lists for // topics assigned to override brokers to be updated in ZooKeeper. 
-func (r *ReplicationThrottleConfigs) EnableOverrideTopicUpdates() { - r.skipOverrideTopicUpdates = false +func (tm *ThrottleManager) EnableOverrideTopicUpdates() { + tm.skipOverrideTopicUpdates = false +} + +// kafkaRequestContext returns a context and cancel func with the default +// ThrottleManager Kafka API request timeout. +func (tm *ThrottleManager) kafkaRequestContext() (context.Context, context.CancelFunc) { + return context.WithTimeout( + context.Background(), + time.Duration(tm.kafkaAPIRequestTimeout)*time.Second, + ) } // ThrottledBrokers is a list of brokers with a throttle applied diff --git a/cmd/autothrottle/throttles_update.go b/cmd/autothrottle/throttles_update.go index 0ab193e..cddcf41 100644 --- a/cmd/autothrottle/throttles_update.go +++ b/cmd/autothrottle/throttles_update.go @@ -2,15 +2,15 @@ package main import ( "bytes" - "errors" "fmt" "log" "math" - "sort" "strconv" "strings" - "time" + "github.com/DataDog/kafka-kit/v3/cmd/autothrottle/internal/api" + "github.com/DataDog/kafka-kit/v3/cmd/autothrottle/internal/throttlestore" + "github.com/DataDog/kafka-kit/v3/kafkaadmin" "github.com/DataDog/kafka-kit/v3/kafkametrics" "github.com/DataDog/kafka-kit/v3/kafkazk" ) @@ -23,7 +23,7 @@ type brokerChangeEvent struct { rate float64 } -// updateReplicationThrottle takes a ReplicationThrottleConfigs that holds topics +// updateReplicationThrottle takes a ThrottleManager that holds topics // being replicated, any ZooKeeper/other clients, throttle override params, and // other required metadata. Metrics for brokers participating in any ongoing // replication are fetched to determine replication headroom. The replication @@ -31,10 +31,14 @@ type brokerChangeEvent struct { // that static value is used instead of a dynamically determined value. // Additionally, broker-specific overrides may be specified, which take precedence // over the global override. -// TODO(jamie): this function is absolute Mad Max. Fix. 
-func updateReplicationThrottle(params *ReplicationThrottleConfigs) error { +// TODO(jamie): This function is a massively messy artifact from autothrottle +// quickly growing in complexity from an originally flat, simple program. A +// considerable amount of shared data needs to be better encapsulated so we can +// deconstruct these functions that hold too much of the general autothrottle logic. +// WIP on doing so. +func (tm *ThrottleManager) updateReplicationThrottle() error { // Creates lists from maps. - srcBrokers, dstBrokers, allBrokers := params.reassigningBrokers.lists() + srcBrokers, dstBrokers, allBrokers := tm.reassigningBrokers.lists() log.Printf("Source brokers participating in replication: %v\n", srcBrokers) log.Printf("Destination brokers participating in replication: %v\n", dstBrokers) @@ -49,16 +53,16 @@ func updateReplicationThrottle(params *ReplicationThrottleConfigs) error { var inFailureMode bool var metricErrs []error - if params.overrideRate != 0 { - log.Printf("A global throttle override is set: %dMB/s\n", params.overrideRate) + if tm.overrideRate != 0 { + log.Printf("A global throttle override is set: %dMB/s\n", tm.overrideRate) rateOverride = true - capacities.setAllRatesWithDefault(allBrokers, float64(params.overrideRate)) + capacities.setAllRatesWithDefault(allBrokers, float64(tm.overrideRate)) } if !rateOverride { // Get broker metrics. - brokerMetrics, metricErrs = params.km.GetMetrics() + brokerMetrics, metricErrs = tm.km.GetMetrics() // Even if errors are returned, we can still proceed as long as we have complete // metrics data for all target brokers. If we have broker metrics for all target // brokers, we can ignore any errors. @@ -76,46 +80,46 @@ func updateReplicationThrottle(params *ReplicationThrottleConfigs) error { log.Printf("Errors fetching metrics: %s\n", metricErrs) // Increment and check our failure count against the configured threshold.
- over := params.Failure() + over := tm.Failure() // If we're not over the threshold, return and just retain previous throttles. if !over { log.Printf("Metrics fetch failure count %d doesn't exeed threshold %d, retaining previous throttle\n", - params.failures, params.failureThreshold) + tm.failures, tm.failureThreshold) return nil } // We're over the threshold; failback to the configured minimum. log.Printf("Metrics fetch failure count %d exceeds threshold %d, reverting to min-rate %.2fMB/s\n", - params.failures, params.failureThreshold, params.limits["minimum"]) + tm.failures, tm.failureThreshold, tm.limits["minimum"]) // Set the failback rate. - capacities.setAllRatesWithDefault(allBrokers, params.limits["minimum"]) + capacities.setAllRatesWithDefault(allBrokers, tm.limits["minimum"]) } // Reset the failure counter. We may have incremented in past iterations, but if // we're here now, we can reset the count. if !inFailureMode { - params.ResetFailures() + tm.ResetFailures() } // If there's no override set and we're not in a failure mode, apply the // calculated throttles. if !rateOverride && !inFailureMode { var err error - capacities, err = brokerReplicationCapacities(params, params.reassigningBrokers, brokerMetrics) + capacities, err = brokerReplicationCapacities(tm, tm.reassigningBrokers, brokerMetrics) if err != nil { return err } } // Merge in broker-specific overrides if they're part of the reassignment. - for id := range params.reassigningBrokers.all { - if override, exists := params.brokerOverrides[id]; exists { + for id := range tm.reassigningBrokers.all { + if override, exists := tm.brokerOverrides[id]; exists { // Any brokers with throttle overrides that are being issued as part of a // reassignemnt should be marked as such. override.ReassignmentParticipant = true - params.brokerOverrides[id] = override + tm.brokerOverrides[id] = override rate := override.Config.Rate // A rate of 0 means we intend to remove this throttle override. Skip. 
@@ -130,13 +134,7 @@ func updateReplicationThrottle(params *ReplicationThrottleConfigs) error { } // Set broker throttle configs. - events, errs := applyBrokerThrottles( - params.reassigningBrokers.all, - capacities, - params.previouslySetThrottles, - params.limits, - params.zk, - ) + events, errs := tm.applyBrokerThrottles(tm.reassigningBrokers.all, capacities) for _, e := range errs { // TODO(jamie): revisit whether we should actually be returning rather than @@ -157,29 +155,33 @@ func updateReplicationThrottle(params *ReplicationThrottleConfigs) error { } // Set topic throttle configs. - if !params.skipTopicUpdates { - _, errs = applyTopicThrottles(params.reassigningBrokers.throttledReplicas, params.zk) + if !tm.skipTopicUpdates { + errs := tm.applyTopicThrottles(tm.reassigningBrokers.throttledReplicas) for _, e := range errs { log.Println(e) } + if errs == nil { + topics := tm.reassigningBrokers.throttledReplicas.topics() + log.Printf("updated the throttle replicas configs for topics: %v\n", topics) + } } // Append topic stats to event. var topics []string - for t := range params.reassignments { + for t := range tm.reassignments { topics = append(topics, t) } b.WriteString(fmt.Sprintf("Topics currently undergoing replication: %v", topics)) // Ship it. - params.events.Write("Broker replication throttle set", b.String()) + tm.events.Write("Broker replication throttle set", b.String()) return nil } -// updateOverrideThrottles takes a *ReplicationThrottleConfigs and applies +// updateOverrideThrottles takes a *ThrottleManager and applies // replication throttles for any brokers with overrides set. -func updateOverrideThrottles(params *ReplicationThrottleConfigs) error { +func (tm *ThrottleManager) updateOverrideThrottles() error { // The rate spec we'll be applying, which is the override rates. var capacities = make(replicationCapacityByBroker) // Broker IDs that will have throttles set. 
@@ -187,7 +189,7 @@ func updateOverrideThrottles(params *ReplicationThrottleConfigs) error { // Broker IDs that should have previously set throttles removed. var toRemove = make(map[int]struct{}) - for _, override := range params.brokerOverrides { + for _, override := range tm.brokerOverrides { // ReassignmentParticipant have already had their override rate used as part // of an ongoing reassignment. if !override.ReassignmentParticipant { @@ -209,23 +211,22 @@ func updateOverrideThrottles(params *ReplicationThrottleConfigs) error { } // Set broker throttle configs. - events, errs := applyBrokerThrottles(toAssign, - capacities, - params.previouslySetThrottles, - params.limits, - params.zk, - ) + events, errs := tm.applyBrokerThrottles(toAssign, capacities) for _, e := range errs { log.Println(e) } // Set topic throttle configs. - if !params.skipOverrideTopicUpdates { - _, errs = applyTopicThrottles(params.overrideThrottleLists, params.zk) + if !tm.skipOverrideTopicUpdates { + errs := tm.applyTopicThrottles(tm.overrideThrottleLists) for _, e := range errs { log.Println(e) } + if errs == nil { + topics := tm.overrideThrottleLists.topics() + log.Printf("updated the throttle replicas configs for topics: %v\n", topics) + } } // Append broker throttle info to event. @@ -241,19 +242,19 @@ func updateOverrideThrottles(params *ReplicationThrottleConfigs) error { } // Ship it. - params.events.Write("Broker level throttle override(s) configured", b.String()) + tm.events.Write("Broker level throttle override(s) configured", b.String()) // Unset the broker throttles marked for removal. - return removeBrokerThrottlesByID(params, toRemove) + return tm.removeBrokerThrottlesByID(toRemove) } -// purgeOverrideThrottles takes a *ReplicationThrottleConfigs and removes +// purgeOverrideThrottles takes a *ThrottleManager and removes // broker overrides from ZK that have been set to a value of 0. 
-func purgeOverrideThrottles(params *ReplicationThrottleConfigs) []error { +func (tm *ThrottleManager) purgeOverrideThrottles() []error { // Broker IDs that should have previously set throttles removed. var toRemove = make(map[int]struct{}) - for _, override := range params.brokerOverrides { + for _, override := range tm.brokerOverrides { rate := float64(override.Config.Rate) // Rate == 0 means the rate was removed via the API. if rate == 0 { @@ -264,8 +265,8 @@ func purgeOverrideThrottles(params *ReplicationThrottleConfigs) []error { var errs []error for id := range toRemove { - path := fmt.Sprintf("%s/%d", overrideRateZnodePath, id) - if err := removeThrottleOverride(params.zk, path); err != nil { + path := fmt.Sprintf("%s/%d", api.OverrideRateZnodePath, id) + if err := throttlestore.RemoveThrottleOverride(tm.zk, path); err != nil { errs = append(errs, err) } } @@ -273,43 +274,45 @@ func purgeOverrideThrottles(params *ReplicationThrottleConfigs) []error { return errs } -// applyBrokerThrottles takes a set of brokers, a replication throttle rate -// string, rate, map for tracking applied throttles, and zk kafkazk.Handler -// zookeeper client. For each broker, the throttle rate is applied and if -// successful, the rate is stored in the throttles map for future reference. -// A channel of events and []string of errors is returned. -func applyBrokerThrottles(bs map[int]struct{}, capacities, prevThrottles replicationCapacityByBroker, l Limits, zk kafkazk.Handler) (chan brokerChangeEvent, []string) { - events := make(chan brokerChangeEvent, len(bs)*2) - var errs []string +// applyBrokerThrottles applies broker throttle configs. 
+func (tm *ThrottleManager) applyBrokerThrottles(bs map[int]struct{}, capacities replicationCapacityByBroker) (chan brokerChangeEvent, []error) { + var configs = kafkaadmin.SetThrottleConfig{Brokers: map[int]kafkaadmin.BrokerThrottleConfig{}} + var legacyConfigs = make(map[int]kafkazk.KafkaConfig) - // Set the throttle config for all reassigning brokers. + // Set the throttle config for all reassigning brokers. We currently populate + // both the Kafka native and legacy configs, conditionally applying whichever + // is configured after all rates are calculated. for ID := range bs { - brokerConfig := kafkazk.KafkaConfig{ + + legacyBrokerConfig := kafkazk.KafkaConfig{ Type: "broker", Name: strconv.Itoa(ID), Configs: []kafkazk.KafkaConfigKV{}, } + brokerConfig := kafkaadmin.BrokerThrottleConfig{} + // Check if a rate was determined for each role (leader, follower) type. for i, rate := range capacities[ID] { if rate == nil { continue } - role := roleFromIndex(i) - prevRate := prevThrottles[ID][i] + // Get the previously set throttle rate. + prevRate := tm.previouslySetThrottles[ID][i] if prevRate == nil { v := 0.00 prevRate = &v } + // Get the maximum utilization value for logging purposes. var max float64 switch role { case "leader": - max = l["srcMax"] + max = tm.limits["srcMax"] case "follower": - max = l["dstMax"] + max = tm.limits["dstMax"] } log.Printf("Replication throttle rate for broker %d [%s] (based on a %.0f%% max free capacity utilization): %0.2fMB/s\n", @@ -325,52 +328,90 @@ func applyBrokerThrottles(bs map[int]struct{}, capacities, prevThrottles replica continue } - rateBytesString := fmt.Sprintf("%.0f", *rate*1000000.00) + rateBytes := *rate * 1000000.00 + rateBytesString := fmt.Sprintf("%.0f", rateBytes) + + // Add config. + switch role { + case "leader": + brokerConfig.OutboundLimitBytes = int(math.Round(rateBytes)) + case "follower": + brokerConfig.InboundLimitBytes = int(math.Round(rateBytes)) + } - // Append config. + // Add legacy config. 
c := kafkazk.KafkaConfigKV{fmt.Sprintf("%s.replication.throttled.rate", role), rateBytesString} - brokerConfig.Configs = append(brokerConfig.Configs, c) + legacyBrokerConfig.Configs = append(legacyBrokerConfig.Configs, c) } - // Write the throttle config. - changes, err := zk.UpdateKafkaConfig(brokerConfig) + // Populate each configuration collection. + configs.Brokers[ID] = brokerConfig + legacyConfigs[ID] = legacyBrokerConfig + } + + // Write the throttle configs. + + if !tm.kafkaNativeMode { + // Use the direct ZooKeeper config update method. + return tm.legacyApplyBrokerThrottles(legacyConfigs, capacities) + } + + return tm.applyBrokerThrottlesSequential(configs, capacities) +} + +// KafkaAdmin applies these sequentially under the hood, but from an API perspective +// it's a single batch job: if one fails, a single error is returned. We break +// these into sequential KafkaAdmin SetThrottle calls so that we can individually +// report errors/successes. +func (tm *ThrottleManager) applyBrokerThrottlesSequential(configs kafkaadmin.SetThrottleConfig, capacities replicationCapacityByBroker) (chan brokerChangeEvent, []error) { + events := make(chan brokerChangeEvent, len(configs.Brokers)*2) + var errs []error + + // For each broker, create a new config that only holds that broker and apply it. + for id, config := range configs.Brokers { + cfg := kafkaadmin.SetThrottleConfig{ + Brokers: map[int]kafkaadmin.BrokerThrottleConfig{ + id: config, + }} + + ctx, cancelFn := tm.kafkaRequestContext() + defer cancelFn() + + // Apply. + err := tm.ka.SetThrottle(ctx, cfg) if err != nil { - errs = append(errs, fmt.Sprintf("Error setting throttle on broker %d: %s", ID, err)) + errs = append(errs, fmt.Errorf("Error setting throttle on broker %d: %s", id, err)) + // Continue to the next broker if we encounter an error. 
+ continue } - for i, changed := range changes { - if changed { - // This will be either "leader.replication.throttled.rate" or - // "follower.replication.throttled.rate". - throttleConfigString := brokerConfig.Configs[i][0] - // Split on ".", get "leader" or "follower" string. - role := strings.Split(throttleConfigString, ".")[0] - - log.Printf("Updated throttle on broker %d [%s]\n", ID, role) - - var rate *float64 - - // Store the configured rate. - switch role { - case "leader": - rate = capacities[ID][0] - prevThrottles.storeLeaderCapacity(ID, *rate) - case "follower": - rate = capacities[ID][1] - prevThrottles.storeFollowerCapacity(ID, *rate) - } - - events <- brokerChangeEvent{ - id: ID, - role: role, - rate: *rate, - } + // Store the configured rates in the previously set throttles map. + + // Store and log leader configs, if any. + if cfg.Brokers[id].OutboundLimitBytes != 0 { + rate := capacities[id][0] + tm.previouslySetThrottles.storeLeaderCapacity(id, *rate) + + log.Printf("Updated throttle on broker %d [leader]\n", id) + events <- brokerChangeEvent{ + id: id, + role: "leader", + rate: *rate, } } - // Hard coded sleep to reduce - // ZK load. - time.Sleep(250 * time.Millisecond) + // Store and log follower configs, if any. + if cfg.Brokers[id].InboundLimitBytes != 0 { + rate := capacities[id][1] + tm.previouslySetThrottles.storeFollowerCapacity(id, *rate) + + log.Printf("Updated throttle on broker %d [follower]\n", id) + events <- brokerChangeEvent{ + id: id, + role: "follower", + rate: *rate, + } + } } close(events) @@ -379,71 +420,37 @@ func applyBrokerThrottles(bs map[int]struct{}, capacities, prevThrottles replica } // applyTopicThrottles updates a throttledReplicas for all topics undergoing -// replication, returning a channel of events and []string of errors. +// replication. 
// TODO(jamie) review whether the throttled replicas list changes as replication // finishes; each time the list changes here, we probably update the config then // propagate a watch to all the brokers in the cluster. -func applyTopicThrottles(throttled topicThrottledReplicas, zk kafkazk.Handler) (chan string, []string) { - events := make(chan string, len(throttled)) - var errs []string - - for t := range throttled { - // Generate config. - config := kafkazk.KafkaConfig{ - Type: "topic", - Name: string(t), - Configs: []kafkazk.KafkaConfigKV{}, - } - - // The sort is important; it avoids unecessary config updates due to the same - // data but in different orders. - sort.Strings(throttled[t]["leaders"]) - sort.Strings(throttled[t]["followers"]) - - leaderList := strings.Join(throttled[t]["leaders"], ",") - if leaderList != "" { - c := kafkazk.KafkaConfigKV{"leader.replication.throttled.replicas", leaderList} - config.Configs = append(config.Configs, c) - } - - followerList := strings.Join(throttled[t]["followers"], ",") - if followerList != "" { - c := kafkazk.KafkaConfigKV{"follower.replication.throttled.replicas", followerList} - config.Configs = append(config.Configs, c) - } +func (tm *ThrottleManager) applyTopicThrottles(throttledTopics topicThrottledReplicas) []error { + if !tm.kafkaNativeMode { + // Use the direct ZooKeeper config update method. + return tm.legacyApplyTopicThrottles(throttledTopics) + } - // Write the config. - changes, err := zk.UpdateKafkaConfig(config) - if err != nil { - errs = append(errs, fmt.Sprintf("Error setting throttle list on topic %s: %s\n", t, err)) - } + // Populate the config with all topics named in the topicThrottledReplicas. 
+ ctx, cancel := tm.kafkaRequestContext() + defer cancel() - var anyChanges bool - for _, changed := range changes { - if changed { - anyChanges = true - } - } + var throttleCfg = kafkaadmin.SetThrottleConfig{Topics: throttledTopics.topics()} - if anyChanges { - // TODO(jamie): we don't use these events yet, but this probably isn't - // actually the format we want anyway. - events <- fmt.Sprintf("updated throttled brokers list for %s", string(t)) - } + // Apply the config. + if err := tm.ka.SetThrottle(ctx, throttleCfg); err != nil { + return []error{err} } - close(events) - - return events, errs + return nil } // removeAllThrottles calls removeTopicThrottles and removeBrokerThrottles in sequence. -func removeAllThrottles(params *ReplicationThrottleConfigs) error { - for _, fn := range []func(*ReplicationThrottleConfigs) error{ - removeTopicThrottles, - removeBrokerThrottles, +func (tm *ThrottleManager) removeAllThrottles() error { + for _, fn := range []func() error{ + tm.removeTopicThrottles, + tm.removeBrokerThrottles, } { - if err := fn(params); err != nil { + if err := fn(); err != nil { return err } } @@ -452,98 +459,78 @@ func removeAllThrottles(params *ReplicationThrottleConfigs) error { } // removeTopicThrottles removes all topic throttle configs. -func removeTopicThrottles(params *ReplicationThrottleConfigs) error { - // Get all topics. - topics, err := params.zk.GetTopics(topicsRegex) +func (tm *ThrottleManager) removeTopicThrottles() error { + // ZooKeeper method. + if !tm.kafkaNativeMode { + return tm.legacyRemoveTopicThrottles() + } + + // Get all topic states. 
+ ctx, cancel := tm.kafkaRequestContext() + defer cancel() + + tstates, err := tm.ka.DescribeTopics(ctx, []string{".*"}) if err != nil { return err } - for _, topic := range topics { - config := kafkazk.KafkaConfig{ - Type: "topic", - Name: topic, - Configs: []kafkazk.KafkaConfigKV{ - {"leader.replication.throttled.replicas", ""}, - {"follower.replication.throttled.replicas", ""}, - }, - } + // States to []string of names. + var topics []string + for name := range tstates { + topics = append(topics, name) + } - // Update the config. - _, err := params.zk.UpdateKafkaConfig(config) - if err != nil { - log.Printf("Error removing throttle config on topic %s: %s\n", topic, err) - } + ctx, cancel = tm.kafkaRequestContext() + defer cancel() - time.Sleep(250 * time.Millisecond) + cfg := kafkaadmin.RemoveThrottleConfig{ + Topics: topics, + } + + // Issue the remove. + if err := tm.ka.RemoveThrottle(ctx, cfg); err != nil { + return err } return nil } // removeBrokerThrottlesByID removes broker throttle configs for the specified IDs. -func removeBrokerThrottlesByID(params *ReplicationThrottleConfigs, ids map[int]struct{}) error { - var unthrottledBrokers []int - var errorEncountered bool - - // Unset throttles. - for b := range ids { - config := kafkazk.KafkaConfig{ - Type: "broker", - Name: strconv.Itoa(b), - Configs: []kafkazk.KafkaConfigKV{ - {"leader.replication.throttled.rate", ""}, - {"follower.replication.throttled.rate", ""}, - }, - } - - changed, err := params.zk.UpdateKafkaConfig(config) - switch err.(type) { - case nil: - case kafkazk.ErrNoNode: - // We'd get an ErrNoNode here only if the parent path for dynamic broker - // configs (/config/brokers) if it doesn't exist, which can happen in - // new clusters that have never had dynamic configs applied. Rather than - // creating that znode, we'll just ignore errors here; if the znodes - // don't exist, there's not even config to remove. 
- default: - errorEncountered = true - log.Printf("Error removing throttle on broker %d: %s\n", b, err) - } - - if changed[0] || changed[1] { - unthrottledBrokers = append(unthrottledBrokers, b) - log.Printf("Throttle removed on broker %d\n", b) - } - - // Hardcoded sleep to reduce ZK load. - time.Sleep(250 * time.Millisecond) +func (tm *ThrottleManager) removeBrokerThrottlesByID(ids map[int]struct{}) error { + // ZooKeeper method. + if !tm.kafkaNativeMode { + return tm.legacyRemoveBrokerThrottlesByID(ids) } - // Write event. - if len(unthrottledBrokers) > 0 { - m := fmt.Sprintf("Replication throttle removed on the following brokers: %v", - unthrottledBrokers) - params.events.Write("Broker replication throttle removed", m) + // Set to list. + var brokers []int + for id := range ids { + brokers = append(brokers, id) } - // Lazily check if any errors were encountered, return a generic error. - if errorEncountered { - return errors.New("one or more throttles were not cleared") + ctx, cancel := tm.kafkaRequestContext() + defer cancel() + + cfg := kafkaadmin.RemoveThrottleConfig{ + Brokers: brokers, } - // Unset all stored throttle rates. - for ID := range params.previouslySetThrottles { - params.previouslySetThrottles[ID] = [2]*float64{} + // Issue the remove. + if err := tm.ka.RemoveThrottle(ctx, cfg); err != nil { + return fmt.Errorf("Error removing broker throttles: %s", err) } + listStr := strings.Trim(strings.Join(strings.Fields(fmt.Sprint(brokers)), ", "), "[]") + log.Printf("Throttles removed on brokers: %s\n", listStr) + return nil } // removeBrokerThrottles removes all broker throttle configs. -func removeBrokerThrottles(params *ReplicationThrottleConfigs) error { +func (tm *ThrottleManager) removeBrokerThrottles() error { // Fetch brokers. - brokers, errs := params.zk.GetAllBrokerMeta(false) + // TODO(jamie): Switch this to a KafkaAdmin lookup. 
+ brokers, errs := tm.zk.GetAllBrokerMeta(false) if errs != nil { return errs[0] } @@ -551,7 +538,7 @@ func removeBrokerThrottles(params *ReplicationThrottleConfigs) error { var ids = make(map[int]struct{}) for id := range brokers { // Skip brokers with an override where AutoRemove is false. - if override, exists := params.brokerOverrides[id]; exists { + if override, exists := tm.brokerOverrides[id]; exists { if !override.Config.AutoRemove { continue } @@ -560,5 +547,5 @@ func removeBrokerThrottles(params *ReplicationThrottleConfigs) error { ids[id] = struct{}{} } - return removeBrokerThrottlesByID(params, ids) + return tm.removeBrokerThrottlesByID(ids) } diff --git a/cmd/autothrottle/topics.go b/cmd/autothrottle/topics.go index 7d62d14..1fbd7ec 100644 --- a/cmd/autothrottle/topics.go +++ b/cmd/autothrottle/topics.go @@ -4,8 +4,6 @@ import ( "errors" "fmt" "strconv" - - "github.com/DataDog/kafka-kit/v3/kafkazk" ) var ( @@ -35,6 +33,15 @@ var acceptedReplicaTypes = map[replicaType]struct{}{ "followers": {}, } +// topics returns the topic names held in the topicThrottledReplicas. +func (ttr topicThrottledReplicas) topics() []string { + var names []string + for topic := range ttr { + names = append(names, string(topic)) + } + return names +} + // addReplica takes a topic, partition number, role (leader, follower), and // broker ID and adds the configuration to the topicThrottledReplicas. func (ttr topicThrottledReplicas) addReplica(topic topic, partn string, role replicaType, id string) error { @@ -70,61 +77,51 @@ func (ttr topicThrottledReplicas) addReplica(topic topic, partn string, role rep return nil } -// TopicStates is a map of topic names to kafakzk.TopicState. -type TopicStates map[string]kafkazk.TopicState - -// TopicStatesFilterFn specifies a filter function. -type TopicStatesFilterFn func(kafkazk.TopicState) bool - -// Filter takes a TopicStatesFilterFn and returns a TopicStates where -// all elements return true as an input to the filter func. 
-func (t TopicStates) Filter(fn TopicStatesFilterFn) TopicStates { - var ts = make(TopicStates) - for name, state := range t { - if fn(state) { - ts[name] = state - } - } - - return ts -} - // getTopicsWithThrottledBrokers returns a topicThrottledReplicas that includes // any topics that have partitions assigned to brokers with a static throttle // rate set. -func getTopicsWithThrottledBrokers(params *ReplicationThrottleConfigs) (topicThrottledReplicas, error) { - // Fetch all topic states. - states, err := getAllTopicStates(params.zk) - if err != nil { - return nil, err +func (tm *ThrottleManager) getTopicsWithThrottledBrokers() (topicThrottledReplicas, error) { + if !tm.kafkaNativeMode { + // Use the direct ZooKeeper config update method. + return tm.legacyGetTopicsWithThrottledBrokers() } // Lookup brokers with overrides set that are not a reassignment participant. - throttledBrokers := params.brokerOverrides.Filter(notReassignmentParticipant) + throttledBrokers := tm.brokerOverrides.Filter(notReassignmentParticipant) // Construct a topicThrottledReplicas that includes any topics with replicas // assigned to brokers with overrides. The throttled list only includes brokers // with throttles set rather than all configured replicas. var throttleLists = make(topicThrottledReplicas) - // For each topic... + ctx, cancelFn := tm.kafkaRequestContext() + defer cancelFn() + + // Get topic states. + states, err := tm.ka.DescribeTopics(ctx, []string{".*"}) + if err != nil { + return nil, err + } + + // For each topic, check the replica assignment for all partitions. If any + // partition has an assigned broker with a static throttle rate set, append it + // to the throttleLists. for topicName, state := range states { // TODO(jamie): make this configurable. if topicName == "__consumer_offsets" { continue } - // For each partition... - for partn, replicas := range state.Partitions { - // For each replica assignment... 
-			for _, assignedID := range replicas {
-				// If the replica is a throttled broker, append that broker to the
-				// throttled list for this {topic, partition}.
-				if _, exists := throttledBrokers[assignedID]; exists {
+		for partition, partitionState := range state.PartitionStates {
+			for _, brokerID := range partitionState.Replicas {
+				// Look up the broker in the throttled brokers set.
+				if _, hasThrottle := throttledBrokers[int(brokerID)]; hasThrottle {
+					// Add it to the throttleLists.
 					throttleLists.addReplica(
 						topic(topicName),
-						partn,
+						strconv.Itoa(partition),
 						replicaType("followers"),
-						strconv.Itoa(assignedID))
+						strconv.Itoa(int(brokerID)),
+					)
 				}
 			}
 		}
@@ -132,25 +129,3 @@ func getTopicsWithThrottledBrokers(params *ReplicationThrottleConfigs) (topicThr
 
 	return throttleLists, nil
 }
-
-// getAllTopicStates returns a TopicStates for all topics in Kafka.
-func getAllTopicStates(zk kafkazk.Handler) (TopicStates, error) {
-	var states = make(TopicStates)
-
-	// Get all topics.
-	topics, err := zk.GetTopics(topicsRegex)
-	if err != nil {
-		return nil, err
-	}
-
-	// Fetch state for each topic.
-	for _, topic := range topics {
-		state, err := zk.GetTopicState(topic)
-		if err != nil {
-			return nil, err
-		}
-		states[topic] = *state
-	}
-
-	return states, nil
-}
diff --git a/cmd/autothrottle/topics_legacy.go b/cmd/autothrottle/topics_legacy.go
new file mode 100644
index 0000000..826eae5
--- /dev/null
+++ b/cmd/autothrottle/topics_legacy.go
@@ -0,0 +1,98 @@
+package main
+
+import (
+	"strconv"
+
+	"github.com/DataDog/kafka-kit/v3/kafkazk"
+)
+
+// TopicStates is a map of topic names to kafkazk.TopicState.
+type TopicStates map[string]kafkazk.TopicState
+
+// legacyGetTopicsWithThrottledBrokers returns a topicThrottledReplicas that
+// includes any topics that have partitions assigned to brokers with a static
+// throttle rate set.
+func (tm *ThrottleManager) legacyGetTopicsWithThrottledBrokers() (topicThrottledReplicas, error) {
+	// Fetch all topic states.
+ states, err := tm.legacyGetAllTopicStates() + if err != nil { + return nil, err + } + + // Lookup brokers with overrides set that are not a reassignment participant. + throttledBrokers := tm.brokerOverrides.Filter(notReassignmentParticipant) + + // Construct a topicThrottledReplicas that includes any topics with replicas + // assigned to brokers with overrides. The throttled list only includes brokers + // with throttles set rather than all configured replicas. + var throttleLists = make(topicThrottledReplicas) + + // For each topic... + for topicName, state := range states { + // TODO(jamie): make this configurable. + if topicName == "__consumer_offsets" { + continue + } + // For each partition... + for partn, replicas := range state.Partitions { + // For each replica assignment... + for _, assignedID := range replicas { + // If the replica is a throttled broker, append that broker to the + // throttled list for this {topic, partition}. + if _, exists := throttledBrokers[assignedID]; exists { + throttleLists.addReplica( + topic(topicName), + partn, + replicaType("followers"), + strconv.Itoa(assignedID)) + } + } + } + } + + return throttleLists, nil +} + +// legacyGetAllTopicStates returns a TopicStates for all topics in Kafka. +func (tm *ThrottleManager) legacyGetAllTopicStates() (TopicStates, error) { + var states = make(TopicStates) + + // Get all topics. + topics, err := tm.zk.GetTopics(topicsRegex) + if err != nil { + return nil, err + } + + // Fetch state for each topic. + for _, topic := range topics { + state, err := tm.zk.GetTopicState(topic) + if err != nil { + return nil, err + } + states[topic] = *state + } + + return states, nil +} + +/* + + This code isn't currently accessed, but holding it as a comment until we + remove all the deprecated bits. + +// TopicStatesFilterFn specifies a filter function. 
+type TopicStatesFilterFn func(kafkazk.TopicState) bool + +// Filter takes a TopicStatesFilterFn and returns a TopicStates where +// all elements return true as an input to the filter func. +func (t TopicStates) Filter(fn TopicStatesFilterFn) TopicStates { + var ts = make(TopicStates) + for name, state := range t { + if fn(state) { + ts[name] = state + } + } + + return ts +} +*/ diff --git a/cmd/autothrottle/topics_legacy_test.go b/cmd/autothrottle/topics_legacy_test.go new file mode 100644 index 0000000..218fad9 --- /dev/null +++ b/cmd/autothrottle/topics_legacy_test.go @@ -0,0 +1,99 @@ +package main + +import ( + "testing" + + "github.com/DataDog/kafka-kit/v3/cmd/autothrottle/internal/throttlestore" + "github.com/DataDog/kafka-kit/v3/kafkazk" +) + +func TestLegacyGetTopicsWithThrottledBrokers(t *testing.T) { + rtf := &ThrottleManager{ + zk: &kafkazk.Stub{}, + } + + // Minimally populate the ThrottleManager. + rtf.brokerOverrides = throttlestore.BrokerOverrides{ + 1001: throttlestore.BrokerThrottleOverride{ + ID: 1001, + ReassignmentParticipant: false, + Config: throttlestore.ThrottleOverrideConfig{ + Rate: 50, + }, + }, + // Topics that include this broker shouldn't be included; the + // BrokerThrottleOverride.Filter called in getTopicsWithThrottledBrokers + // excludes any topics mapped to brokers where ReassignmentParticipant + // == true. + 1002: throttlestore.BrokerThrottleOverride{ + ID: 1002, + ReassignmentParticipant: true, + Config: throttlestore.ThrottleOverrideConfig{ + Rate: 50, + }, + }, + } + + // Call. 
+ topicThrottledBrokers, _ := rtf.legacyGetTopicsWithThrottledBrokers() + + expected := topicThrottledReplicas{ + "test_topic": throttled{"followers": brokerIDs{"0:1001"}}, + "test_topic2": throttled{"followers": brokerIDs{"0:1001"}}, + } + + if len(topicThrottledBrokers) != len(expected) { + t.Fatalf("Expected len %d, got %d", len(expected), len(topicThrottledBrokers)) + } + + for topic := range expected { + output, exist := topicThrottledBrokers[topic] + if !exist { + t.Fatalf("Expected topic '%s' in output", topic) + } + + got := output["followers"][0] + expectedOut := expected[topic]["followers"][0] + if got != expectedOut { + t.Errorf("Expected followers '%s', got '%s'", expectedOut, got) + } + } +} + +/* +func TestFilter(t *testing.T) { + zk := &kafkazk.Stub{} + state, _ := zk.GetTopicState("test_topic") + + topicStates := make(TopicStates) + topicStates["test_topic"] = *state + + matchID := 1000 + + // Our filter func. returns any topic that includes matchID as a replica. + fn := func(ts kafkazk.TopicState) bool { + // The stub partition state here is []int{1000,1001}. + for _, id := range ts.Partitions["0"] { + if id == matchID { + return true + } + } + return false + } + + // We should get back one topic. + filtered := topicStates.Filter(fn) + _, match := filtered["test_topic"] + if len(filtered) != 1 && !match { + t.Errorf("Expected key 'test_topic'") + } + + matchID = 9999 + + // We should now have no matched topics. 
+ filtered = topicStates.Filter(fn) + if len(filtered) != 0 { + t.Errorf("Expected nil filtered result") + } +} +*/ diff --git a/cmd/autothrottle/topics_test.go b/cmd/autothrottle/topics_test.go index 9acc685..e65dec1 100644 --- a/cmd/autothrottle/topics_test.go +++ b/cmd/autothrottle/topics_test.go @@ -3,7 +3,8 @@ package main import ( "testing" - "github.com/DataDog/kafka-kit/v3/kafkazk" + "github.com/DataDog/kafka-kit/v3/cmd/autothrottle/internal/throttlestore" + "github.com/DataDog/kafka-kit/v3/kafkaadmin/stub" ) func TestAddReplica(t *testing.T) { @@ -39,53 +40,18 @@ func TestAddReplica(t *testing.T) { } } -func TestFilter(t *testing.T) { - zk := &kafkazk.Stub{} - state, _ := zk.GetTopicState("test_topic") - - topicStates := make(TopicStates) - topicStates["test_topic"] = *state - - matchID := 1000 - - // Our filter func. returns any topic that includes matchID as a replica. - fn := func(ts kafkazk.TopicState) bool { - // The stub partition state here is []int{1000,1001}. - for _, id := range ts.Partitions["0"] { - if id == matchID { - return true - } - } - return false - } - - // We should get back one topic. - filtered := topicStates.Filter(fn) - _, match := filtered["test_topic"] - if len(filtered) != 1 && !match { - t.Errorf("Expected key 'test_topic'") - } - - matchID = 9999 - - // We should now have no matched topics. - filtered = topicStates.Filter(fn) - if len(filtered) != 0 { - t.Errorf("Expected nil filtered result") - } -} - func TestGetTopicsWithThrottledBrokers(t *testing.T) { - rtf := &ReplicationThrottleConfigs{ - zk: &kafkazk.Stub{}, + rtf := &ThrottleManager{ + kafkaNativeMode: true, + ka: stub.Client{}, } - // Minimally populate the ReplicationThrottleConfigs. - rtf.brokerOverrides = BrokerOverrides{ - 1001: BrokerThrottleOverride{ + // Minimally populate the ThrottleManager. 
+	rtf.brokerOverrides = throttlestore.BrokerOverrides{
+		1001: throttlestore.BrokerThrottleOverride{
 			ID:                      1001,
 			ReassignmentParticipant: false,
-			Config: ThrottleOverrideConfig{
+			Config: throttlestore.ThrottleOverrideConfig{
 				Rate: 50,
 			},
 		},
@@ -93,21 +59,20 @@ func TestGetTopicsWithThrottledBrokers(t *testing.T) {
 		// BrokerThrottleOverride.Filter called in getTopicsWithThrottledBrokers
 		// excludes any topics mapped to brokers where ReassignmentParticipant
 		// == true.
-		1002: BrokerThrottleOverride{
+		1002: throttlestore.BrokerThrottleOverride{
 			ID:                      1002,
 			ReassignmentParticipant: true,
-			Config: ThrottleOverrideConfig{
+			Config: throttlestore.ThrottleOverrideConfig{
 				Rate: 50,
 			},
 		},
 	}
 
 	// Call.
-	topicThrottledBrokers, _ := getTopicsWithThrottledBrokers(rtf)
+	topicThrottledBrokers, _ := rtf.getTopicsWithThrottledBrokers()
 
 	expected := topicThrottledReplicas{
-		"test_topic":  throttled{"followers": brokerIDs{"0:1001"}},
-		"test_topic2": throttled{"followers": brokerIDs{"0:1001"}},
+		"test1": throttled{"followers": brokerIDs{"0:1001"}},
 	}
 
 	if len(topicThrottledBrokers) != len(expected) {
diff --git a/kafkaadmin/stub/kafkaadmin.go b/kafkaadmin/stub/kafkaadmin.go
new file mode 100644
index 0000000..af2ad25
--- /dev/null
+++ b/kafkaadmin/stub/kafkaadmin.go
@@ -0,0 +1,34 @@
+package stub
+
+import (
+	"context"
+
+	"github.com/DataDog/kafka-kit/v3/kafkaadmin"
+)
+
+// Client is a stubbed implementation of KafkaAdminClient.
+type Client struct{}
+
+func (s Client) Close() {
+	return
+}
+
+func (s Client) CreateTopic(context.Context, kafkaadmin.CreateTopicConfig) error {
+	return nil
+}
+func (s Client) DeleteTopic(context.Context, string) error {
+	return nil
+}
+func (s Client) DescribeTopics(context.Context, []string) (kafkaadmin.TopicStates, error) {
+	return kafkaadmin.TopicStatesFromMetadata(fakeKafkaMetadata())
+}
+
+func (s Client) SetThrottle(context.Context, kafkaadmin.SetThrottleConfig) error {
+	return nil
+}
+func (s Client) RemoveThrottle(context.Context, kafkaadmin.RemoveThrottleConfig) error {
+	return nil
+}
+func (s Client) GetDynamicConfigs(context.Context, string, []string) (kafkaadmin.ResourceConfigs, error) {
+	return nil, nil
+}
diff --git a/kafkaadmin/stub/metadata.go b/kafkaadmin/stub/metadata.go
new file mode 100644
index 0000000..49c3fc0
--- /dev/null
+++ b/kafkaadmin/stub/metadata.go
@@ -0,0 +1,95 @@
+package stub
+
+import (
+	"github.com/DataDog/kafka-kit/v3/kafkaadmin"
+
+	"github.com/confluentinc/confluent-kafka-go/kafka"
+)
+
+// fakeTopicState takes a topic name and desired number of partitions and returns
+// a TopicState. Note that each PartitionState holds only its partition ID; the
+// remaining fields are to be filled as needed in each test.
+func fakeTopicState(name string, partitions int32) kafkaadmin.TopicState { + ts := kafkaadmin.NewTopicState(name) + ts.Partitions = partitions + ts.ReplicationFactor = 2 + ts.PartitionStates = map[int]kafkaadmin.PartitionState{} + for i := int32(0); i < partitions; i++ { + ts.PartitionStates[int(i)] = kafkaadmin.PartitionState{ + ID: i, + } + } + + return ts +} + +func fakeKafkaMetadata() *kafka.Metadata { + var noErr = kafka.NewError(kafka.ErrNoError, "Success", false) + + return &kafka.Metadata{ + Brokers: []kafka.BrokerMetadata{ + { + ID: 1001, + Host: "host-a", + Port: 9092, + }, + { + ID: 1002, + Host: "host-b", + Port: 9092, + }, + { + ID: 1003, + Host: "host-c", + Port: 9092, + }, + }, + Topics: map[string]kafka.TopicMetadata{ + "test1": { + Topic: "test1", + Partitions: []kafka.PartitionMetadata{ + { + ID: 0, + Error: noErr, + Leader: 1001, + Replicas: []int32{1001, 1002}, + Isrs: []int32{1001, 1002}, + }, + { + ID: 1, + Error: noErr, + Leader: 1002, + Replicas: []int32{1002}, + Isrs: []int32{1002}, + }, + }, + Error: noErr, + }, + "test2": { + Topic: "test2", + Partitions: []kafka.PartitionMetadata{ + { + ID: 0, + Error: noErr, + Leader: 1003, + Replicas: []int32{1003, 1002}, + Isrs: []int32{1003, 1002}, + }, + { + ID: 1, + Error: noErr, + Leader: 1003, + Replicas: []int32{1002, 1003}, + Isrs: []int32{1003, 1002}, + }, + }, + Error: noErr, + }, + }, + OriginatingBroker: kafka.BrokerMetadata{ + ID: 1001, + Host: "host-a", + Port: 9092, + }, + } +} diff --git a/kafkaadmin/throttles.go b/kafkaadmin/throttles.go index 79b5bc8..1d0c69b 100644 --- a/kafkaadmin/throttles.go +++ b/kafkaadmin/throttles.go @@ -173,9 +173,13 @@ func (c Client) RemoveThrottle(ctx context.Context, cfg RemoveThrottleConfig) er } } - if len(throttleConfigs) > 0 { - // Apply the configs. - if _, err = c.c.AlterConfigs(ctx, throttleConfigs); err != nil { + // Apply the configs in sequence. 
+ for _, config := range throttleConfigs { + // TODO(jamie) perform these in batch once the 'Only one ConfigResource of + // type BROKER is allowed per call' error is no longer encountered. + // TODO(jamie) review whether the kafka.SetAdminIncremental AlterConfigsAdminOption + // actually works here. + if _, err = c.c.AlterConfigs(ctx, []kafka.ConfigResource{config}); err != nil { return ErrRemoveThrottle{Message: err.Error()} } } @@ -261,14 +265,18 @@ func populateBrokerThrottleConfigs(brokers map[int]BrokerThrottleConfig, configs txRate := fmt.Sprintf("%d", throttleRates.OutboundLimitBytes) rxRate := fmt.Sprintf("%d", throttleRates.InboundLimitBytes) - // Write configs. - err = configs.AddConfig(id, brokerTXThrottleCfgName, txRate) - if err != nil { - return err + // Write configs. We skip any zero configs which are interpreted as unset. + if throttleRates.OutboundLimitBytes != 0 { + err = configs.AddConfig(id, brokerTXThrottleCfgName, txRate) + if err != nil { + return err + } } - err = configs.AddConfig(id, brokerRXThrottleCfgName, rxRate) - if err != nil { - return err + if throttleRates.InboundLimitBytes != 0 { + err = configs.AddConfig(id, brokerRXThrottleCfgName, rxRate) + if err != nil { + return err + } } } diff --git a/kafkaadmin/topics.go b/kafkaadmin/topics.go index a6d3ca3..dd500f5 100644 --- a/kafkaadmin/topics.go +++ b/kafkaadmin/topics.go @@ -109,7 +109,7 @@ func (c Client) DescribeTopics(ctx context.Context, topics []string) (TopicState filterMatches(md, topicNamesRegex) - return topicStatesFromMetadata(md) + return TopicStatesFromMetadata(md) } func filterMatches(md *kafka.Metadata, re []*regexp.Regexp) { @@ -126,7 +126,7 @@ func filterMatches(md *kafka.Metadata, re []*regexp.Regexp) { } } -func topicStatesFromMetadata(md *kafka.Metadata) (TopicStates, error) { +func TopicStatesFromMetadata(md *kafka.Metadata) (TopicStates, error) { if len(md.Topics) == 0 { return nil, ErrNoData } diff --git a/kafkaadmin/topics_test.go b/kafkaadmin/topics_test.go 
index 9afa5ee..d37c5fb 100644 --- a/kafkaadmin/topics_test.go +++ b/kafkaadmin/topics_test.go @@ -11,7 +11,7 @@ func TestTopicStatesFromMetadata(t *testing.T) { // Mock metadata. md := fakeKafkaMetadata() // Get a TopicStates. - ts, err := topicStatesFromMetadata(md) + ts, err := TopicStatesFromMetadata(md) assert.Nil(t, err) // Expected results. diff --git a/kafkazk/README.md b/kafkazk/README.md index 9f3a00a..5a5010e 100644 --- a/kafkazk/README.md +++ b/kafkazk/README.md @@ -1 +1 @@ -[![GoDoc](https://godoc.org/github.com/DataDog/kafka-kit/kafkazk?status.svg)](https://godoc.org/github.com/DataDog/kafka-kit/kafkazk) +[![GoDoc](https://godoc.org/github.com/DataDog/kafka-kit/kafkazk?status.svg)](https://godoc.org/github.com/DataDog/kafka-kit/v3/kafkazk)