Skip to content

Commit

Permalink
[autothrottle] optionally skip auto-delete throttle
Browse files Browse the repository at this point in the history
  • Loading branch information
mjd95 committed Nov 21, 2023
1 parent b298cd9 commit 11e5a41
Showing 1 changed file with 48 additions and 42 deletions.
90 changes: 48 additions & 42 deletions cmd/autothrottle/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,30 @@ var (

// Config holds configuration parameters.
Config struct {
KafkaNativeMode bool
KafkaAPIRequestTimeout int
APIKey string
AppKey string
NetworkTXQuery string
NetworkRXQuery string
BrokerIDTag string
InstanceTypeTag string
MetricsWindow int
BootstrapServers string
ZKAddr string
ZKPrefix string
Interval int
APIListen string
ConfigZKPrefix string
DDEventTags string
MinRate float64
SourceMaxRate float64
DestinationMaxRate float64
ChangeThreshold float64
FailureThreshold int
CapMap map[string]float64
CleanupAfter int64
KafkaNativeMode bool
KafkaAPIRequestTimeout int
APIKey string
AppKey string
NetworkTXQuery string
NetworkRXQuery string
BrokerIDTag string
InstanceTypeTag string
MetricsWindow int
BootstrapServers string
ZKAddr string
ZKPrefix string
Interval int
APIListen string
ConfigZKPrefix string
DDEventTags string
MinRate float64
SourceMaxRate float64
DestinationMaxRate float64
ChangeThreshold float64
FailureThreshold int
CapMap map[string]float64
CleanupAfter int64
SkipAutoDeleteThrottles bool
}
)

Expand Down Expand Up @@ -78,6 +79,7 @@ func main() {
flag.IntVar(&Config.FailureThreshold, "failure-threshold", 1, "Number of iterations that throttle determinations can fail before reverting to the min-rate")
m := flag.String("cap-map", "", "JSON map of instance types to network capacity in MB/s")
flag.Int64Var(&Config.CleanupAfter, "cleanup-after", 60, "Number of intervals after which to issue a global throttle unset if no replication is running")
flag.BoolVar(&Config.SkipAutoDeleteThrottles, "skip-auto-delete-throttles", false, "Skip automatic throttle removal")

envy.Parse("AUTOTHROTTLE")
flag.Parse()
Expand Down Expand Up @@ -422,31 +424,35 @@ func main() {

// If there's previously set throttles but no topics reassigning nor
// broker overrides set, we can issue a global throttle removal.
if throttlesToClear && !topicsReassigning && !brokerOverridesSet {
if !topicsReassigning && throttlesToClear && !brokerOverridesSet {
// Reset the interval count.
interval = 0

// Remove all the broker + topic throttle configs.
err := throttleManager.RemoveAllThrottles()
if err != nil {
log.Printf("Error removing throttles: %s\n", err.Error())
if Config.SkipAutoDeleteThrottles {
log.Println("There may be throttles eligible for removal, but skipping automatic removal since skip-auto-delete-throttles is set")
} else {
// Only set knownThrottles to false if we've removed all
// without error.
knownThrottles = false
}

// Ensure topic throttle updates are re-enabled.
throttleManager.EnableTopicUpdates()
throttleManager.EnableOverrideTopicUpdates()

// Remove any configured throttle overrides if AutoRemove is true.
if overrideCfg.AutoRemove {
err := throttlestore.StoreThrottleOverride(zk, api.OverrideRateZnodePath, throttlestore.ThrottleOverrideConfig{})
// Remove all the broker + topic throttle configs.
err := throttleManager.RemoveAllThrottles()
if err != nil {
log.Println(err)
log.Printf("Error removing throttles: %s\n", err.Error())
} else {
log.Println("Global throttle override removed")
// Only set knownThrottles to false if we've removed all
// without error.
knownThrottles = false
}

// Ensure topic throttle updates are re-enabled.
throttleManager.EnableTopicUpdates()
throttleManager.EnableOverrideTopicUpdates()

// Remove any configured throttle overrides if AutoRemove is true.
if overrideCfg.AutoRemove {
err := throttlestore.StoreThrottleOverride(zk, api.OverrideRateZnodePath, throttlestore.ThrottleOverrideConfig{})
if err != nil {
log.Println(err)
} else {
log.Println("Global throttle override removed")
}
}
}
}
Expand Down

0 comments on commit 11e5a41

Please sign in to comment.