Skip to content

Commit

Permalink
updates to backoff strategy and min backoff multiplier
Browse files Browse the repository at this point in the history
- strategies return whether to increase the backoff level
- config: min backoff multiplier changed from 0 to 1ms
- backoff calculation changed from 2^n*multiplier to 2^(n-1)*multiplier
  - n is always >= 1
- fixes #64
  • Loading branch information
judwhite committed Oct 10, 2016
1 parent f778f6c commit dbf821e
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 35 deletions.
73 changes: 64 additions & 9 deletions NsqSharp.Tests/ConfigTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void TestMinValues()
c.Set("default_requeue_delay", TimeSpan.Zero);
c.Set("backoff_strategy", "exponential");
c.Set("max_backoff_duration", TimeSpan.Zero);
c.Set("backoff_multiplier", 0);
c.Set("backoff_multiplier", "1ms");
c.Set("max_attempts", 0);
c.Set("low_rdy_idle_timeout", TimeSpan.FromSeconds(1));
c.Set("rdy_redistribute_interval", TimeSpan.FromMilliseconds(1));
Expand All @@ -164,7 +164,7 @@ public void TestMinValues()
Assert.AreEqual(TimeSpan.Zero, c.DefaultRequeueDelay, "default_requeue_delay");
Assert.AreEqual(typeof(ExponentialStrategy), c.BackoffStrategy.GetType(), "backoff_strategy");
Assert.AreEqual(TimeSpan.Zero, c.MaxBackoffDuration, "max_backoff_duration");
Assert.AreEqual(TimeSpan.Zero, c.BackoffMultiplier, "backoff_multiplier");
Assert.AreEqual(TimeSpan.FromMilliseconds(1), c.BackoffMultiplier, "backoff_multiplier");
Assert.AreEqual(0, c.MaxAttempts, "max_attempts");
Assert.AreEqual(TimeSpan.FromSeconds(1), c.LowRdyIdleTimeout, "low_rdy_idle_timeout");
Assert.AreEqual(TimeSpan.FromMilliseconds(1), c.RDYRedistributeInterval, "rdy_redistribute_interval");
Expand Down Expand Up @@ -258,7 +258,7 @@ public void TestValidatesLessThanMinValues()
Assert.Throws<Exception>(() => c.Set("default_requeue_delay", TimeSpan.Zero - tick), "default_requeue_delay");
Assert.Throws<Exception>(() => c.Set("backoff_strategy", "invalid"), "backoff_strategy");
Assert.Throws<Exception>(() => c.Set("max_backoff_duration", TimeSpan.Zero - tick), "max_backoff_duration");
Assert.Throws<Exception>(() => c.Set("backoff_multiplier", TimeSpan.Zero - tick), "backoff_multiplier");
Assert.Throws<Exception>(() => c.Set("backoff_multiplier", TimeSpan.FromMilliseconds(1) - tick), "backoff_multiplier");
Assert.Throws<Exception>(() => c.Set("max_attempts", 0 - 1), "max_attempts");
Assert.Throws<Exception>(() => c.Set("low_rdy_idle_timeout", TimeSpan.FromSeconds(1) - tick), "low_rdy_idle_timeout");
Assert.Throws<Exception>(() => c.Set("rdy_redistribute_interval", TimeSpan.FromMilliseconds(1) - tick), "rdy_redistribute_interval");
Expand Down Expand Up @@ -456,13 +456,15 @@ public void TestExponentialBackoff()
var backoffStrategy = new ExponentialStrategy();

var config = new Config();
config.BackoffMultiplier = TimeSpan.FromSeconds(1);

var attempts = new[] { 0, 1, 3, 5 };
var attempts = new[] { 1, 2, 4, 6 };
for (int i = 0; i < attempts.Length; i++)
{
var result = backoffStrategy.Calculate(config, attempts[i]);

Assert.AreEqual(expected[i], result, string.Format("wrong backoff duration for attempt {0}", attempts[i]));
Assert.AreEqual(expected[i], result.Duration,
string.Format("wrong backoff duration for attempt {0}", attempts[i]));
}
}

Expand All @@ -484,16 +486,69 @@ public void TestFullJitterBackoff()
var config = new Config();
config.BackoffMultiplier = TimeSpan.FromSeconds(0.5);

var attempts = new[] { 0, 1, 3, 5 };
var attempts = new[] { 1, 2, 4, 6 };
for (int count = 0; count < 50000; count++)
{
for (int i = 0; i < attempts.Length; i++)
{
int attempt = attempts[i];
var result = backoffStrategy.Calculate(config, attempt);
var result = backoffStrategy.Calculate(config, attempt).Duration;

Assert.LessOrEqual(result, maxExpected[i], string.Format("wrong backoff duration for attempt {0}", attempt));
//Console.WriteLine(string.Format("{0} {1}", result, maxExpected[i]));
Assert.LessOrEqual(result, maxExpected[i],
string.Format("wrong backoff duration for attempt {0}", attempt));
}
}
}

[Test]
public void TestExponentialMaxBackoffLevel()
{
TestBackoffMaxLevel(new ExponentialStrategy());
}

[Test]
public void TestFullJitterMaxBackoffLevel()
{
TestBackoffMaxLevel(new FullJitterStrategy());
}

private void TestBackoffMaxLevel(IBackoffStrategy backoffStrategy)
{
for (int maxBackoff = 0; maxBackoff <= 16; maxBackoff++)
{
for (int multiplier = 1; multiplier <= 16; multiplier++)
{
var config = new Config
{
MaxBackoffDuration = TimeSpan.FromMilliseconds(maxBackoff),
BackoffMultiplier = TimeSpan.FromMilliseconds(multiplier)
};
config.Validate();

int expectedMaxLevel = 1;
if (maxBackoff != 0 && multiplier != 0)
{
var x = config.MaxBackoffDuration.TotalSeconds / config.BackoffMultiplier.TotalSeconds;
expectedMaxLevel = (int)Math.Ceiling(Math.Log(x, 2)) + 1;
if (expectedMaxLevel <= 0)
expectedMaxLevel = 1;
}

bool increaseBackoffLevel;
if (expectedMaxLevel > 1)
{
increaseBackoffLevel = backoffStrategy.Calculate(config, expectedMaxLevel - 1).IncreaseBackoffLevel;

Assert.IsTrue(increaseBackoffLevel,
string.Format("increaseBackoff max={0} multiplier={1} level={2} expectedMaxLevel={3}",
config.MaxBackoffDuration, config.BackoffMultiplier, expectedMaxLevel - 1, expectedMaxLevel));
}

increaseBackoffLevel = backoffStrategy.Calculate(config, expectedMaxLevel).IncreaseBackoffLevel;

Assert.IsFalse(increaseBackoffLevel,
string.Format("increaseBackoff max={0} multiplier={1} level={2} expectedMaxLevel={3}",
config.MaxBackoffDuration, config.BackoffMultiplier, expectedMaxLevel, expectedMaxLevel));
}
}
}
Expand Down
85 changes: 63 additions & 22 deletions NsqSharp/Config.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,48 +25,79 @@ internal interface defaultsHandler : configHandler
}

/// <summary>
/// Read only configuration values related to backoff.
/// Read only configuration values related to backoff. See <see cref="IBackoffStrategy"/>.
/// </summary>
public interface IBackoffConfig
{
/// <summary>Unit of time for calculating consumer backoff.</summary>
TimeSpan BackoffMultiplier { get; }
/// <summary>
/// The max backoff duration used for calculating whether the backoff level should increase.
/// See <see cref="IBackoffStrategy.Calculate"/>.
/// </summary>
TimeSpan MaxBackoffDuration { get; }
}

/// <summary>
/// <see cref="IBackoffStrategy" /> defines a strategy for calculating the duration of time
/// a consumer should backoff for a given attempt
/// a consumer should backoff for a given attempt. See <see cref="ExponentialStrategy"/>
/// and <see cref="FullJitterStrategy"/>.
/// </summary>
public interface IBackoffStrategy
{
/// <summary>Calculates the backoff time.</summary>
/// <param name="backoffConfig">Read only configuration values related to backoff.</param>
/// <param name="attempt">The number of times this message has been attempted (first attempt = 1).</param>
/// <returns>The <see cref="TimeSpan"/> to backoff.</returns>
TimeSpan Calculate(IBackoffConfig backoffConfig, int attempt);
/// <param name="backoffLevel">
/// The backoff level (>= 1) used to calculate backoff duration.
/// <paramref name="backoffLevel"/> increases/decreases with successive failures/successes.
/// </param>
/// <returns>A <see cref="BackoffCalculation"/> object with the backoff duration and whether to increase
/// the backoff level.</returns>
BackoffCalculation Calculate(IBackoffConfig backoffConfig, int backoffLevel);
}

/// <summary>
/// <see cref="BackoffCalculation"/> is the return value from <see cref="IBackoffStrategy.Calculate"/>.
/// </summary>
public class BackoffCalculation
{
/// <summary>The backoff duration.</summary>
public TimeSpan Duration { get; set; }
/// <summary>Indicates whether the caller should increase the backoff level.</summary>
public bool IncreaseBackoffLevel { get; set; }
}

/// <summary>
/// <see cref="ExponentialStrategy"/> implements an exponential backoff strategy (default)
/// <see cref="ExponentialStrategy"/> implements an exponential backoff strategy (default).
/// </summary>
public class ExponentialStrategy : IBackoffStrategy
{
/// <summary>
/// Calculate returns a duration of time: 2 ^ attempt * <see cref="IBackoffConfig.BackoffMultiplier"/>.
/// Calculate returns a duration of time: 2^(backoffLevel-1) * <see cref="IBackoffConfig.BackoffMultiplier"/>.
/// </summary>
/// <param name="backoffConfig">Read only configuration values related to backoff.</param>
/// <param name="attempt">The number of times this message has been attempted (first attempt = 1).</param>
/// <returns>The <see cref="TimeSpan"/> to backoff.</returns>
public TimeSpan Calculate(IBackoffConfig backoffConfig, int attempt)
/// <param name="backoffLevel">
/// The backoff level (>= 1) used to calculate backoff duration.
/// <paramref name="backoffLevel"/> increases/decreases with successive failures/successes.
/// </param>
/// <returns>A <see cref="BackoffCalculation"/> object with the backoff duration and whether to increase
/// the backoff level.</returns>
public BackoffCalculation Calculate(IBackoffConfig backoffConfig, int backoffLevel)
{
var backoffDuration = new TimeSpan(backoffConfig.BackoffMultiplier.Ticks *
(long)Math.Pow(2, attempt));
return backoffDuration;
(long)Math.Pow(2, backoffLevel - 1));

return new BackoffCalculation
{
Duration = backoffDuration,
IncreaseBackoffLevel = backoffDuration < backoffConfig.MaxBackoffDuration
};
}
}

/// <summary>
/// FullJitterStrategy returns a random duration of time [0, 2 ^ attempt].
/// FullJitterStrategy returns a random duration of time in the
/// range [0, 2^(backoffLevel-1) * <see cref="IBackoffConfig.BackoffMultiplier"/>).
/// Implements http://www.awsarchitectureblog.com/2015/03/backoff.html.
/// </summary>
public class FullJitterStrategy : IBackoffStrategy
Expand All @@ -75,12 +106,17 @@ public class FullJitterStrategy : IBackoffStrategy
private RNGCryptoServiceProvider rng;

/// <summary>
/// Calculate returns a random duration of time [0, 2 ^ attempt] * <see cref="IBackoffConfig.BackoffMultiplier"/>.
/// Calculate returns a random duration of time in the
/// range [0, 2^(backoffLevel-1) * <see cref="IBackoffConfig.BackoffMultiplier"/>).
/// </summary>
/// <param name="backoffConfig">Read only configuration values related to backoff.</param>
/// <param name="attempt">The number of times this message has been attempted (first attempt = 1).</param>
/// <returns>The <see cref="TimeSpan"/> to backoff.</returns>
public TimeSpan Calculate(IBackoffConfig backoffConfig, int attempt)
/// <param name="backoffLevel">
/// The backoff level (>= 1) used to calculate backoff duration.
/// <paramref name="backoffLevel"/> increases/decreases with successive failures/successes.
/// </param>
/// <returns>A <see cref="BackoffCalculation"/> object with the backoff duration and whether to increase
/// the backoff level.</returns>
public BackoffCalculation Calculate(IBackoffConfig backoffConfig, int backoffLevel)
{
rngOnce.Do(() =>
{
Expand All @@ -92,11 +128,16 @@ public TimeSpan Calculate(IBackoffConfig backoffConfig, int attempt)
);

var backoffDuration = new TimeSpan(backoffConfig.BackoffMultiplier.Ticks *
(long)Math.Pow(2, attempt));
(long)Math.Pow(2, backoffLevel - 1));

int maxBackoffMilliseconds = (int)backoffDuration.TotalMilliseconds;
int backoffMilliseconds = maxBackoffMilliseconds == 0 ? 0 : rng.Intn(maxBackoffMilliseconds);
return TimeSpan.FromMilliseconds(backoffMilliseconds);

return new BackoffCalculation
{
Duration = TimeSpan.FromMilliseconds(backoffMilliseconds),
IncreaseBackoffLevel = backoffDuration < backoffConfig.MaxBackoffDuration
};
}
}

Expand Down Expand Up @@ -166,11 +207,11 @@ public class Config : IBackoffConfig
[Opt("max_backoff_duration"), Min("0"), Max("60m"), Default("2m")]
public TimeSpan MaxBackoffDuration { get; set; }
/// <summary>Unit of time for calculating consumer backoff.
/// Default backoff calculation: <see cref="BackoffMultiplier"/> * (2 ^ backoff counter).
/// Default backoff calculation: 2^(backoffLevel-1) * <see cref="BackoffMultiplier"/>.
/// Will not exceed <see cref="MaxBackoffDuration"/>.
/// See: <see cref="BackoffStrategy"/>
/// Range: 0-60m Default: 1s</summary>
[Opt("backoff_multiplier"), Min("0"), Max("60m"), Default("1s")]
/// Range: 1ms-60m Default: 1s</summary>
[Opt("backoff_multiplier"), Min("1ms"), Max("60m"), Default("1s")]
public TimeSpan BackoffMultiplier { get; set; }

/// <summary>Maximum number of times this consumer will attempt to process a message before giving up.
Expand Down
13 changes: 9 additions & 4 deletions NsqSharp/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1096,8 +1096,13 @@ private void startStopContinueBackoff(BackoffSignal signal)
}
break;
case BackoffSignal.BackoffFlag:
var nextBackoff = _config.BackoffStrategy.Calculate(_config, backoffCounter + 1);
if (nextBackoff <= _config.MaxBackoffDuration)
bool increaseBackoffLevel = (backoffCounter == 0);
if (!increaseBackoffLevel)
{
increaseBackoffLevel = _config.BackoffStrategy.Calculate(_config, backoffCounter)
.IncreaseBackoffLevel;
}
if (increaseBackoffLevel)
{
backoffCounter++;
backoffUpdated = true;
Expand All @@ -1119,15 +1124,15 @@ private void startStopContinueBackoff(BackoffSignal signal)
else if (backoffCounter > 0)
{
// start or continue backoff
var backoffDuration = _config.BackoffStrategy.Calculate(_config, backoffCounter);
var backoffDuration = _config.BackoffStrategy.Calculate(_config, backoffCounter).Duration;

if (backoffDuration > _config.MaxBackoffDuration)
{
backoffDuration = _config.MaxBackoffDuration;
}

log(LogLevel.Warning,
string.Format("backing off for {0:0.0000} seconds (backoff level {1}), setting all to RDY 0",
string.Format("backing off for {0:0.000} seconds (backoff level {1}), setting all to RDY 0",
backoffDuration.TotalSeconds, backoffCounter
));

Expand Down

0 comments on commit dbf821e

Please sign in to comment.