Skip to content

Commit

Permalink
Merge pull request #99
Browse files Browse the repository at this point in the history
Fix #98
  • Loading branch information
chenzhiguo authored Oct 16, 2024
2 parents 956448b + 699dff2 commit 0605d19
Show file tree
Hide file tree
Showing 10 changed files with 217 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public abstract class AbstractRateLimiter implements RateLimiter {
*/
public AbstractRateLimiter(RateLimitPolicy policy) {
this.policy = policy;
this.option = MapOption.of(policy.getActionParameters());
this.option = MapOption.of(policy.getParameters());
this.timeout = Duration.ofMillis(policy.getMaxWaitMs());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
*/
public interface RateLimiter {

String MAX_BURST_SECONDS = "maxBurstSeconds";

/**
* Try to get a permit return the result
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright (C) 2012 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.jd.live.agent.governance.invoke.ratelimit.leakybucket;

import com.jd.live.agent.governance.invoke.ratelimit.tokenbucket.SmoothTokenBucketLimiter;
import com.jd.live.agent.governance.policy.service.limit.RateLimitPolicy;
import com.jd.live.agent.governance.policy.service.limit.SlidingWindow;

import java.util.concurrent.atomic.AtomicLong;

/**
* LeakyBucketLimiter
*
* @since 1.4.0
*/
public class LeakyBucketLimiter extends SmoothTokenBucketLimiter {

private static final String KEY_CAPACITY = "capacity";

private final long capacity;

private final AtomicLong requests = new AtomicLong(0);

public LeakyBucketLimiter(RateLimitPolicy limitPolicy, SlidingWindow slidingWindow) {
super(limitPolicy, slidingWindow);
this.capacity = option.getLong(KEY_CAPACITY, 0L);
}

@Override
protected boolean isAvailable(long nowMicros, long timeoutMicros) {
return super.isAvailable(nowMicros, timeoutMicros) && (capacity <= 0 || requests.get() < capacity);
}

@Override
protected void doAcquire(int permits, long nowMicros) {
requests.incrementAndGet();
try {
super.doAcquire(permits, nowMicros);
} finally {
requests.decrementAndGet();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.governance.invoke.ratelimit.leakybucket;

import com.jd.live.agent.core.extension.annotation.Extension;
import com.jd.live.agent.core.inject.annotation.Injectable;
import com.jd.live.agent.governance.invoke.ratelimit.AbstractRateLimiterFactory;
import com.jd.live.agent.governance.invoke.ratelimit.RateLimiter;
import com.jd.live.agent.governance.policy.service.limit.RateLimitPolicy;
import com.jd.live.agent.governance.policy.service.limit.SlidingWindow;

import java.util.List;

/**
* LeakyBucketLimiterFactory
*
* @since 1.4.0
*/
@Injectable
@Extension(value = "LeakyBucket")
public class LeakyBucketLimiterFactory extends AbstractRateLimiterFactory {

@Override
protected RateLimiter create(RateLimitPolicy policy) {
List<SlidingWindow> windows = policy.getSlidingWindows();
if (windows.size() == 1) {
return new LeakyBucketLimiter(policy, windows.get(0));
}
return new LeakyBucketLimiterGroup(policy);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.governance.invoke.ratelimit.leakybucket;

import com.jd.live.agent.governance.invoke.ratelimit.AbstractRateLimiterGroup;
import com.jd.live.agent.governance.invoke.ratelimit.RateLimiter;
import com.jd.live.agent.governance.policy.service.limit.RateLimitPolicy;
import com.jd.live.agent.governance.policy.service.limit.SlidingWindow;

/**
* LeakyBucketLimiterGroup
*
* @since 1.4.0
*/
public class LeakyBucketLimiterGroup extends AbstractRateLimiterGroup {

public LeakyBucketLimiterGroup(RateLimitPolicy policy) {
super(policy);
}

@Override
protected RateLimiter create(SlidingWindow window, String name) {
return new LeakyBucketLimiter(policy, window);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,24 @@
*/
public class SmoothTokenBucketLimiter extends AbstractRateLimiter {

private final SleepingStopwatch stopwatch;
private final static String KEY_MAX_BURST_SECONDS = "maxBurstSeconds";

protected final SleepingStopwatch stopwatch;

/**
* The maximum number of stored permits.
*/
private final double maxPermits;
protected final double maxPermits;

/**
* The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
* per second has a stable interval of 200ms.
*/
private final double stableIntervalMicros;
protected final double stableIntervalMicros;

private volatile Object mutexDoNotUseDirectly;
protected long nextFreeTicketMicros;

private long nextFreeTicketMicros;
protected volatile Object mutexDoNotUseDirectly;

/**
* The currently stored permits.
Expand All @@ -56,7 +58,7 @@ public class SmoothTokenBucketLimiter extends AbstractRateLimiter {

public SmoothTokenBucketLimiter(RateLimitPolicy limitPolicy, SlidingWindow slidingWindow) {
super(limitPolicy);
int maxBurstSeconds = option.getInteger(MAX_BURST_SECONDS, 5);
int maxBurstSeconds = option.getInteger(KEY_MAX_BURST_SECONDS, 5);
double secondPermits = slidingWindow.getSecondPermits();
this.stopwatch = SleepingStopwatch.createFromSystemTimer();
this.stableIntervalMicros = secondPermits <= 0 ? 100 : TimeUnit.SECONDS.toMicros(1L) / secondPermits;
Expand All @@ -83,38 +85,74 @@ public boolean doAcquire(int permits, long timeout, TimeUnit timeUnit) {
long timeoutMicros = timeUnit.toMicros(timeout < 0 ? 0 : timeout);
long nowMicros = stopwatch.readMicros();
synchronized (mutex()) {
if (!canAcquire(nowMicros, timeoutMicros)) {
if (!isAvailable(nowMicros, timeoutMicros)) {
return false;
}
// always pay in advance
stopwatch.sleepMicrosUninterruptibly(reserveAndGetWaitLength(permits, nowMicros));
doAcquire(permits, nowMicros);
}
return true;
}

private boolean canAcquire(long nowMicros, long timeoutMicros) {
/**
* Acquires the specified number of permits, blocking until they become available.
*
* @param permits the number of permits to acquire
* @param nowMicros the current time in microseconds
*/
protected void doAcquire(int permits, long nowMicros) {
// always pay in advance
stopwatch.sleepMicrosUninterruptibly(waitFor(permits, nowMicros));
}

/**
* Checks if the current time is before the timeout time for acquiring a free ticket.
*
* @param nowMicros the current time in microseconds
* @param timeoutMicros the timeout time in microseconds
* @return true if the current time is before the timeout time, false otherwise
*/
protected boolean isAvailable(long nowMicros, long timeoutMicros) {
return nextFreeTicketMicros - timeoutMicros <= nowMicros;
}

private long reserveAndGetWaitLength(long permits, long nowMicros) {
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
/**
* Waits for the specified number of permits to become available.
*
* @param permits the number of permits to wait for
* @param nowMicros the current time in microseconds
* @return the time waited in microseconds, or 0 if no wait was necessary
*/
protected long waitFor(long permits, long nowMicros) {
long momentAvailable = waitForEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0);
}

private long reserveEarliestAvailable(long requiredPermits, long nowMicros) {
reSync(nowMicros);
/**
* Waits for the specified number of permits to become available, returning the time at which the earliest permit will be available.
*
* @param permits the number of permits to wait for
* @param nowMicros the current time in microseconds
* @return the time at which the earliest permit will be available, in microseconds
*/
protected long waitForEarliestAvailable(long permits, long nowMicros) {
update(nowMicros);
long returnValue = nextFreeTicketMicros;
double storedPermitsToSpend = min(requiredPermits, storedPermits);
double available = min(permits, storedPermits);

double freshPermits = requiredPermits - storedPermitsToSpend;
long waitMicros = (long) (freshPermits * stableIntervalMicros);
double lack = permits - available;
long waitMicros = (long) (lack * stableIntervalMicros);

nextFreeTicketMicros = saturatedAdd(nextFreeTicketMicros, waitMicros);
storedPermits -= storedPermitsToSpend;
storedPermits -= available;
return returnValue;
}

private void reSync(long nowMicros) {
/**
* Updates the internal state of the rate limiter based on the current time.
*
* @param nowMicros the current time in microseconds
*/
protected void update(long nowMicros) {
// if nextFreeTicket is in the past, reSync to now
if (nowMicros > nextFreeTicketMicros) {
double newPermits = (nowMicros - nextFreeTicketMicros) / stableIntervalMicros;
Expand All @@ -123,6 +161,11 @@ private void reSync(long nowMicros) {
}
}

/**
* Returns the mutex object used for synchronization.
*
* @return the mutex object
*/
private Object mutex() {
Object mutex = mutexDoNotUseDirectly;
if (mutex == null) {
Expand All @@ -136,7 +179,14 @@ private Object mutex() {
return mutex;
}

private long saturatedAdd(long a, long b) {
/**
* Adds two long values, handling overflow by returning the maximum or minimum value.
*
* @param a the first value to add
* @param b the second value to add
* @return the sum of the two values, or Long.MAX_VALUE if the result overflows, or Long.MIN_VALUE if the result underflows
*/
protected long saturatedAdd(long a, long b) {
long naiveSum = a + b;
if ((a ^ b) < 0 | (a ^ naiveSum) >= 0) {
// If a and b have different signs or a has the same sign as the result then there was no
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
import java.util.List;

/**
* TokenBucketLimiterFactory
* SmoothTokenBucketLimiterFactory
*
* @since 1.0.0
*/
@Injectable
@Extension(value = "TokenBucket")
public class TokenBucketRateLimiterFactory extends AbstractRateLimiterFactory {
public class SmoothTokenBucketLimiterFactory extends AbstractRateLimiterFactory {

@Override
protected RateLimiter create(RateLimitPolicy policy) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.jd.live.agent.governance.policy.service.limit;

import com.jd.live.agent.core.parser.json.JsonAlias;
import com.jd.live.agent.governance.policy.PolicyId;
import com.jd.live.agent.governance.rule.RelationType;
import com.jd.live.agent.governance.rule.tag.TagCondition;
Expand Down Expand Up @@ -72,7 +73,8 @@ public abstract class AbstractLimitPolicy extends PolicyId implements LimitPolic
/**
* A map of parameters that further customize the action of the limiting strategy.
*/
private Map<String, String> actionParameters;
@JsonAlias("actionParameters")
private Map<String, String> parameters;

/**
* The version of the limiting policy.
Expand Down Expand Up @@ -147,8 +149,8 @@ public void supplement(AbstractLimitPolicy source) {
if (realizeType == null) {
realizeType = source.realizeType;
}
if (actionParameters == null) {
actionParameters = source.actionParameters;
if (parameters == null) {
parameters = source.parameters;
}
if (version <= 0) {
version = source.version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ default String getName() {
*
* @return a map of action parameters, where each key is the parameter name and each value is the parameter value.
*/
Map<String, String> getActionParameters();
Map<String, String> getParameters();

}

Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
com.jd.live.agent.governance.invoke.ratelimit.tokenbucket.TokenBucketRateLimiterFactory
com.jd.live.agent.governance.invoke.ratelimit.tokenbucket.SmoothTokenBucketLimiterFactory
com.jd.live.agent.governance.invoke.ratelimit.leakybucket.LeakyBucketLimiterFactory

0 comments on commit 0605d19

Please sign in to comment.