Skip to content

Commit

Permalink
Merge pull request #399 from DataDog/jamie/autothrottle-admin-rpc
Browse files Browse the repository at this point in the history
autothrottle Kafka native API support
  • Loading branch information
jamiealquiza authored May 3, 2022
2 parents 51fd2df + 3cc1bd8 commit 0052781
Show file tree
Hide file tree
Showing 24 changed files with 1,072 additions and 676 deletions.
92 changes: 50 additions & 42 deletions cmd/autothrottle/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ Autothrottle does this by running a loop that discovers topics undergoing replic

In contrast to where Kafka's out of the box tooling allows users to manually set a static, global inbound and outbound rate, autothrottle determines path-optimal rates on an individual broker basis. It does this by examining a graph representation of replication flows and individually calculating optimal transfer rates for inbound and outbound transfers. This means that if a single broker is near saturation, it doesn't need to slow down the recovery pace of the entire replication group. This allows every broker to run at target network utilization thresholds in any direction, minimizing recovery time and hotspots.

All throttle control logic is applied directly to the cluster through ZooKeeper; autothrottle natively mirrors Kafka's throttle control implementation rather than wrapping CLI tools.
Historically, all throttle control logic has been applied directly to the cluster through ZooKeeper; autothrottle natively ports Kafka's throttle control implementation and manages it directly through the cluster state metadata housed in ZooKeeper. For newer versions of Kafka, autothrottle now supports throttle management via dynamic configurations using the Kafka Admin API, along with KIP-455 compatible reassignment lookups. This feature is enabled with the `--kafka-native-mode` flag and marks the continued support for eventual removal of ZooKeeper as a Kafka dependency (KIP-500).

Finally, autothrottle was designed to work as a piggyback system that doesn't take ownership of your cluster. It can easily be overridden (through the admin API), stopped safely at any time, or even outright disabled. This allows users to quickly revert to using other tools if desired.
Finally, autothrottle was designed to work as a piggyback system that doesn't take ownership of your cluster. It can easily be overridden (through the admin API), stopped safely at any time, or outright disabled. This allows users to quickly revert to using other tools if desired.

**Additional features**:
- Configurable portion of free headroom available for use by replication (`--max-rate`)
Expand Down Expand Up @@ -82,46 +82,54 @@ The variables in brackets are optional env var overrides.

```
Usage of autothrottle:
-api-key string
Datadog API key [AUTOTHROTTLE_API_KEY]
-api-listen string
Admin API listen address:port [AUTOTHROTTLE_API_LISTEN] (default "localhost:8080")
-app-key string
Datadog app key [AUTOTHROTTLE_APP_KEY]
-broker-id-tag string
Datadog host tag for broker ID [AUTOTHROTTLE_BROKER_ID_TAG] (default "broker_id")
-cap-map string
JSON map of instance types to network capacity in MB/s [AUTOTHROTTLE_CAP_MAP]
-change-threshold float
Required change in replication throttle to trigger an update (percent) [AUTOTHROTTLE_CHANGE_THRESHOLD] (default 10)
-cleanup-after int
Number of intervals after which to issue a global throttle unset if no replication is running [AUTOTHROTTLE_CLEANUP_AFTER] (default 60)
-dd-event-tags string
Comma-delimited list of Datadog event tags [AUTOTHROTTLE_DD_EVENT_TAGS]
-failure-threshold int
Number of iterations that throttle determinations can fail before reverting to the min-rate [AUTOTHROTTLE_FAILURE_THRESHOLD] (default 1)
-interval int
Autothrottle check interval (seconds) [AUTOTHROTTLE_INTERVAL] (default 180)
-max-rx-rate float
Maximum inbound replication throttle rate (as a percentage of available capacity) [AUTOTHROTTLE_MAX_RX_RATE] (default 90)
-max-tx-rate float
Maximum outbound replication throttle rate (as a percentage of available capacity) [AUTOTHROTTLE_MAX_TX_RATE] (default 90)
-metrics-window int
Time span of metrics required (seconds) [AUTOTHROTTLE_METRICS_WINDOW] (default 120)
-min-rate float
Minimum replication throttle rate (MB/s) [AUTOTHROTTLE_MIN_RATE] (default 10)
-net-rx-query string
Datadog query for broker inbound bandwidth by host [AUTOTHROTTLE_NET_RX_QUERY] (default "avg:system.net.bytes_rcvd{service:kafka} by {host}")
-net-tx-query string
Datadog query for broker outbound bandwidth by host [AUTOTHROTTLE_NET_TX_QUERY] (default "avg:system.net.bytes_sent{service:kafka} by {host}")
-version
version [AUTOTHROTTLE_VERSION]
-zk-addr string
ZooKeeper connect string (for broker metadata or rebuild-topic lookups) [AUTOTHROTTLE_ZK_ADDR] (default "localhost:2181")
-zk-config-prefix string
ZooKeeper prefix to store autothrottle configuration [AUTOTHROTTLE_ZK_CONFIG_PREFIX] (default "autothrottle")
-zk-prefix string
ZooKeeper namespace prefix [AUTOTHROTTLE_ZK_PREFIX]
-api-key string
Datadog API key [AUTOTHROTTLE_API_KEY]
-api-listen string
Admin API listen address:port [AUTOTHROTTLE_API_LISTEN] (default "localhost:8080")
-app-key string
Datadog app key [AUTOTHROTTLE_APP_KEY]
-bootstrap-servers string
Kafka bootstrap servers [AUTOTHROTTLE_BOOTSTRAP_SERVERS] (default "localhost:9092")
-broker-id-tag string
Datadog host tag for broker ID [AUTOTHROTTLE_BROKER_ID_TAG] (default "broker_id")
-cap-map string
JSON map of instance types to network capacity in MB/s [AUTOTHROTTLE_CAP_MAP]
-change-threshold float
Required change in replication throttle to trigger an update (percent) [AUTOTHROTTLE_CHANGE_THRESHOLD] (default 10)
-cleanup-after int
Number of intervals after which to issue a global throttle unset if no replication is running [AUTOTHROTTLE_CLEANUP_AFTER] (default 60)
-dd-event-tags string
Comma-delimited list of Datadog event tags [AUTOTHROTTLE_DD_EVENT_TAGS]
-failure-threshold int
Number of iterations that throttle determinations can fail before reverting to the min-rate [AUTOTHROTTLE_FAILURE_THRESHOLD] (default 1)
-instance-type-tag string
Datadog tag for instance type [AUTOTHROTTLE_INSTANCE_TYPE_TAG] (default "instance-type")
-interval int
Autothrottle check interval (seconds) [AUTOTHROTTLE_INTERVAL] (default 180)
-kafka-api-request-timeout int
Kafka API request timeout (seconds) [AUTOTHROTTLE_KAFKA_API_REQUEST_TIMEOUT] (default 15)
-kafka-native-mode
Favor native Kafka RPCs over ZooKeeper metadata access [AUTOTHROTTLE_KAFKA_NATIVE_MODE]
-max-rx-rate float
Maximum inbound replication throttle rate (as a percentage of available capacity) [AUTOTHROTTLE_MAX_RX_RATE] (default 90)
-max-tx-rate float
Maximum outbound replication throttle rate (as a percentage of available capacity) [AUTOTHROTTLE_MAX_TX_RATE] (default 90)
-metrics-window int
Time span of metrics required (seconds) [AUTOTHROTTLE_METRICS_WINDOW] (default 120)
-min-rate float
Minimum replication throttle rate (MB/s) [AUTOTHROTTLE_MIN_RATE] (default 10)
-net-rx-query string
Datadog query for broker inbound bandwidth by host [AUTOTHROTTLE_NET_RX_QUERY] (default "avg:system.net.bytes_rcvd{service:kafka} by {host}")
-net-tx-query string
Datadog query for broker outbound bandwidth by host [AUTOTHROTTLE_NET_TX_QUERY] (default "avg:system.net.bytes_sent{service:kafka} by {host}")
-version
version [AUTOTHROTTLE_VERSION]
-zk-addr string
ZooKeeper connect string (for broker metadata or rebuild-topic lookups) [AUTOTHROTTLE_ZK_ADDR] (default "localhost:2181")
-zk-config-prefix string
ZooKeeper prefix to store autothrottle configuration [AUTOTHROTTLE_ZK_CONFIG_PREFIX] (default "autothrottle")
-zk-prefix string
ZooKeeper namespace prefix [AUTOTHROTTLE_ZK_PREFIX]
```

## Detailed: Rate Calculations, Applying Throttles
Expand Down
120 changes: 0 additions & 120 deletions cmd/autothrottle/api_deprecated.go

This file was deleted.

2 changes: 1 addition & 1 deletion cmd/autothrottle/capacities.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (r replicationCapacityByBroker) reset() {
// in the reassignment. For each broker, it determines whether the broker is
// a leader (source) or a follower (destination), and calculates a throttle
// accordingly, returning a replicationCapacityByBroker and error.
func brokerReplicationCapacities(rtc *ReplicationThrottleConfigs, reassigning reassigningBrokers, bm kafkametrics.BrokerMetrics) (replicationCapacityByBroker, error) {
func brokerReplicationCapacities(rtc *ThrottleManager, reassigning reassigningBrokers, bm kafkametrics.BrokerMetrics) (replicationCapacityByBroker, error) {
capacities := replicationCapacityByBroker{}

// For each broker, check whether the it's a source and/or destination,
Expand Down
2 changes: 1 addition & 1 deletion cmd/autothrottle/capacities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestBrokerReplicationCapacities(t *testing.T) {
CapacityMap: map[string]float64{"stub": 200.00},
})

rtc := &ReplicationThrottleConfigs{
rtc := &ThrottleManager{
reassignments: reassignments,
previouslySetThrottles: replicationCapacityByBroker{1000: throttleByRole{float64ptr(20)}},
limits: lim,
Expand Down
Loading

0 comments on commit 0052781

Please sign in to comment.